]> git.saurik.com Git - redis.git/blob - redis.c
eb44138f3a97aa9951046696065616cbef533914
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string of this server build. */
#define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60 #include <math.h>
61
62 #include "redis.h"
63 #include "ae.h" /* Event driven programming library */
64 #include "sds.h" /* Dynamic safe strings */
65 #include "anet.h" /* Networking the easy way */
66 #include "dict.h" /* Hash tables */
67 #include "adlist.h" /* Linked lists */
68 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
69 #include "lzf.h" /* LZF compression library */
70 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
71
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout (presumably seconds, like server.maxidletime) */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
/* Upper bound on the random volatile keys sampled per DB on each
 * serverCron() run. */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* Hash table parameters */
/* htNeedsResize() asks for a shrink when fewer than this percentage of the
 * slots are used. */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */

/* Command flags */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4

/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255
117
/* Defines related to the dump file format. To store 32 bits lengths for short
 * keys requires a lot of space, so we check the most significant 2 bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte, most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
/* NOTE(review): presumably the sentinel returned by the length loader on a
 * read error — confirm in the rdb loading code. */
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
144
/* Client flags (bit mask stored in redisClient.flags) */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side (server.replstate) */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_DEL 1
#define REDIS_SORT_INCR 2
#define REDIS_SORT_DECR 3
#define REDIS_SORT_ASC 4
#define REDIS_SORT_DESC 5
#define REDIS_SORTKEY_MAX 1024

/* Log levels, in increasing severity. redisLog() prints a message only when
 * its level is >= server.verbosity. */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro... */
#define REDIS_NOTUSED(V) ((void) V)

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
188
189 /*================================= Data types ============================== */
190
/* A redis object, that is a type able to hold a string / list / set / zset /
 * hash. 'type' is one of REDIS_STRING..REDIS_HASH, 'encoding' one of the
 * REDIS_ENCODING_* values. Objects are released through reference counting
 * (incrRefCount() / decrRefCount()). */
typedef struct redisObject {
    void *ptr;
    unsigned char type;
    unsigned char encoding;
    unsigned char notused[2]; /* unused; presumably explicit padding */
    int refcount;
} robj;

/* One logical database: the key space plus the expire table, which maps
 * volatile keys to their expire time (see the expire loop in serverCron()). */
typedef struct redisDb {
    dict *dict;
    dict *expires;
    int id;        /* presumably the database index — confirm */
} redisDb;

/* With multiplexing we need to take per-client state.
 * Clients are kept in a linked list (server.clients). */
typedef struct redisClient {
    int fd;
    redisDb *db;                /* presumably the currently SELECTed database */
    int dictid;
    sds querybuf;               /* presumably input not yet parsed into argv */
    robj **argv, **mbargv;      /* mbargv/mbargc: presumably multi-bulk argument accumulation */
    int argc, mbargc;
    int bulklen; /* bulk read len. -1 if not in bulk read mode */
    int multibulk; /* multi bulk command format active */
    list *reply;                /* presumably the queue of pending reply objects */
    int sentlen;                /* presumably bytes of the head reply already written */
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
    int slaveseldb; /* slave selected db, if this client is a slave */
    int authenticated; /* when requirepass is non-NULL */
    int replstate; /* replication state if this is a slave */
    int repldbfd; /* replication DB file descriptor */
    long repldboff; /* replication DB file offset */
    off_t repldbsize; /* replication DB file size */
} redisClient;
228
/* A "save" rule: serverCron() starts a background save once at least
 * 'changes' changes were made and more than 'seconds' seconds elapsed since
 * the last successful save. */
struct saveparam {
    time_t seconds;
    int changes;
};

/* Global server state structure */
struct redisServer {
    int port;
    int fd;
    redisDb *db;                /* array of 'dbnum' databases */
    dict *sharingpool;
    unsigned int sharingpoolsize;
    long long dirty; /* changes to DB from the last save */
    list *clients;
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];
    aeEventLoop *el;
    int cronloops; /* number of times the cron function run */
    list *objfreelist; /* A list of freed objects to avoid malloc() */
    time_t lastsave; /* Unix time of last successful save */
    size_t usedmemory; /* Used memory in bytes (zmalloc_used_memory()) */
    /* Fields used only for stats */
    time_t stat_starttime; /* server start time */
    long long stat_numcommands; /* number of processed commands */
    long long stat_numconnections; /* number of connections received */
    /* Configuration */
    int verbosity;              /* minimum log level written by redisLog() */
    int glueoutputbuf;
    int maxidletime;            /* client idle timeout in seconds, 0 = disabled */
    int dbnum;
    int daemonize;
    char *pidfile;
    int bgsaveinprogress;       /* non-zero while a background save child runs */
    pid_t bgsavechildpid;       /* pid of that child, -1 when none */
    struct saveparam *saveparams;
    int saveparamslen;
    char *logfile;              /* NULL means log to stdout */
    char *bindaddr;
    char *dbfilename;
    char *requirepass;
    int shareobjects;
    /* Replication related */
    int isslave;
    char *masterhost;
    int masterport;
    redisClient *master; /* client that is master for this slave */
    int replstate;              /* REDIS_REPL_NONE / _CONNECT / _CONNECTED */
    unsigned int maxclients;
    unsigned long maxmemory;
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to take this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
};
284
/* Signature shared by every command implementation listed in cmdTable. */
typedef void redisCommandProc(redisClient *c);
/* One entry of the command table. */
struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;  /* NOTE(review): negative values appear to mean "at least
                   -arity arguments" (e.g. "del" is -2) — confirm */
    int flags;  /* REDIS_CMD_BULK / REDIS_CMD_INLINE, optionally | REDIS_CMD_DENYOOM */
};

/* Name/address pair. NOTE(review): presumably used to symbolize addresses in
 * crash reports (setupSigSegvAction) — confirm. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};

/* An element being sorted by SORT together with its sort key: presumably a
 * numeric score, or a comparison object for ALPHA/BY sorting. */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;
        robj *cmpobj;
    } u;
} redisSortObject;

/* An extra operation requested by SORT; 'type' is presumably one of the
 * REDIS_SORT_GET..REDIS_SORT_DECR values, applied with 'pattern'. */
typedef struct _redisSortOperation {
    int type;
    robj *pattern;
} redisSortOperation;
310
/* ZSETs use a specialized version of Skiplists */

typedef struct zskiplistNode {
    struct zskiplistNode **forward;  /* next node, one pointer per level */
    struct zskiplistNode *backward;  /* presumably the previous node on level 0 */
    double score;
    robj *obj;
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    long length;
    int level;   /* presumably the highest level in use (<= ZSKIPLIST_MAXLEVEL) */
} zskiplist;

/* A sorted set pairs a dict (zsetDictType: robj member keys, zmalloc()ed
 * values — presumably the score) with a score-ordered skiplist. */
typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;
330
/* Our shared "common" objects: reply fragments and error messages built once
 * by createSharedObjects(). NOTE(review): select0..select9 are presumably
 * pre-built SELECT commands for DBs 0-9 — confirm. */

struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;
341
/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */

static double R_Zero, R_PosInf, R_NegInf, R_Nan;
347
348 /*================================ Prototypes =============================== */
349
350 static void freeStringObject(robj *o);
351 static void freeListObject(robj *o);
352 static void freeSetObject(robj *o);
353 static void decrRefCount(void *o);
354 static robj *createObject(int type, void *ptr);
355 static void freeClient(redisClient *c);
356 static int rdbLoad(char *filename);
357 static void addReply(redisClient *c, robj *obj);
358 static void addReplySds(redisClient *c, sds s);
359 static void incrRefCount(robj *o);
360 static int rdbSaveBackground(char *filename);
361 static robj *createStringObject(char *ptr, size_t len);
362 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
363 static int syncWithMaster(void);
364 static robj *tryObjectSharing(robj *o);
365 static int tryObjectEncoding(robj *o);
366 static robj *getDecodedObject(const robj *o);
367 static int removeExpire(redisDb *db, robj *key);
368 static int expireIfNeeded(redisDb *db, robj *key);
369 static int deleteIfVolatile(redisDb *db, robj *key);
370 static int deleteKey(redisDb *db, robj *key);
371 static time_t getExpire(redisDb *db, robj *key);
372 static int setExpire(redisDb *db, robj *key, time_t when);
373 static void updateSlavesWaitingBgsave(int bgsaveerr);
374 static void freeMemoryIfNeeded(void);
375 static int processCommand(redisClient *c);
376 static void setupSigSegvAction(void);
377 static void rdbRemoveTempFile(pid_t childpid);
378 static size_t stringObjectLen(robj *o);
379 static void processInputBuffer(redisClient *c);
380 static zskiplist *zslCreate(void);
381 static void zslFree(zskiplist *zsl);
382 static void zslInsert(zskiplist *zsl, double score, robj *obj);
383
384 static void authCommand(redisClient *c);
385 static void pingCommand(redisClient *c);
386 static void echoCommand(redisClient *c);
387 static void setCommand(redisClient *c);
388 static void setnxCommand(redisClient *c);
389 static void getCommand(redisClient *c);
390 static void delCommand(redisClient *c);
391 static void existsCommand(redisClient *c);
392 static void incrCommand(redisClient *c);
393 static void decrCommand(redisClient *c);
394 static void incrbyCommand(redisClient *c);
395 static void decrbyCommand(redisClient *c);
396 static void selectCommand(redisClient *c);
397 static void randomkeyCommand(redisClient *c);
398 static void keysCommand(redisClient *c);
399 static void dbsizeCommand(redisClient *c);
400 static void lastsaveCommand(redisClient *c);
401 static void saveCommand(redisClient *c);
402 static void bgsaveCommand(redisClient *c);
403 static void shutdownCommand(redisClient *c);
404 static void moveCommand(redisClient *c);
405 static void renameCommand(redisClient *c);
406 static void renamenxCommand(redisClient *c);
407 static void lpushCommand(redisClient *c);
408 static void rpushCommand(redisClient *c);
409 static void lpopCommand(redisClient *c);
410 static void rpopCommand(redisClient *c);
411 static void llenCommand(redisClient *c);
412 static void lindexCommand(redisClient *c);
413 static void lrangeCommand(redisClient *c);
414 static void ltrimCommand(redisClient *c);
415 static void typeCommand(redisClient *c);
416 static void lsetCommand(redisClient *c);
417 static void saddCommand(redisClient *c);
418 static void sremCommand(redisClient *c);
419 static void smoveCommand(redisClient *c);
420 static void sismemberCommand(redisClient *c);
421 static void scardCommand(redisClient *c);
422 static void spopCommand(redisClient *c);
423 static void srandmemberCommand(redisClient *c);
424 static void sinterCommand(redisClient *c);
425 static void sinterstoreCommand(redisClient *c);
426 static void sunionCommand(redisClient *c);
427 static void sunionstoreCommand(redisClient *c);
428 static void sdiffCommand(redisClient *c);
429 static void sdiffstoreCommand(redisClient *c);
430 static void syncCommand(redisClient *c);
431 static void flushdbCommand(redisClient *c);
432 static void flushallCommand(redisClient *c);
433 static void sortCommand(redisClient *c);
434 static void lremCommand(redisClient *c);
435 static void infoCommand(redisClient *c);
436 static void mgetCommand(redisClient *c);
437 static void monitorCommand(redisClient *c);
438 static void expireCommand(redisClient *c);
439 static void getsetCommand(redisClient *c);
440 static void ttlCommand(redisClient *c);
441 static void slaveofCommand(redisClient *c);
442 static void debugCommand(redisClient *c);
443 static void msetCommand(redisClient *c);
444 static void msetnxCommand(redisClient *c);
445 static void zaddCommand(redisClient *c);
446 static void zrangeCommand(redisClient *c);
447 static void zrangebyscoreCommand(redisClient *c);
448 static void zrevrangeCommand(redisClient *c);
449 static void zlenCommand(redisClient *c);
450 static void zremCommand(redisClient *c);
451 static void zscoreCommand(redisClient *c);
452
453 /*================================= Globals ================================= */
454
/* Global vars */
static struct redisServer server; /* server global state */

/* Command table: {name, implementation, arity, flags}, terminated by a NULL
 * entry. Arity appears to count the command name itself (GET key is 2,
 * PING is 1). NOTE(review): negative arity seems to mean "at least -arity
 * arguments" (del, mget, sinter, ...) — confirm against processCommand().
 * NOTE(review): zscore is flagged REDIS_CMD_DENYOOM although its name
 * suggests a read-only lookup — confirm intent. */
static struct redisCommand cmdTable[] = {
    {"get",getCommand,2,REDIS_CMD_INLINE},
    {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"del",delCommand,-2,REDIS_CMD_INLINE},
    {"exists",existsCommand,2,REDIS_CMD_INLINE},
    {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
    {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
    {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
    {"llen",llenCommand,2,REDIS_CMD_INLINE},
    {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
    {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
    {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
    {"lrem",lremCommand,4,REDIS_CMD_BULK},
    {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"srem",sremCommand,3,REDIS_CMD_BULK},
    {"smove",smoveCommand,4,REDIS_CMD_BULK},
    {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
    {"scard",scardCommand,2,REDIS_CMD_INLINE},
    {"spop",spopCommand,2,REDIS_CMD_INLINE},
    {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
    {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    /* SMEMBERS key is served by the SINTER implementation on a single set. */
    {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
    {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"zrem",zremCommand,3,REDIS_CMD_BULK},
    {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
    {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
    {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
    {"zlen",zlenCommand,2,REDIS_CMD_INLINE},
    {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
    {"select",selectCommand,2,REDIS_CMD_INLINE},
    {"move",moveCommand,3,REDIS_CMD_INLINE},
    {"rename",renameCommand,3,REDIS_CMD_INLINE},
    {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
    {"expire",expireCommand,3,REDIS_CMD_INLINE},
    {"keys",keysCommand,2,REDIS_CMD_INLINE},
    {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
    {"auth",authCommand,2,REDIS_CMD_INLINE},
    {"ping",pingCommand,1,REDIS_CMD_INLINE},
    {"echo",echoCommand,2,REDIS_CMD_BULK},
    {"save",saveCommand,1,REDIS_CMD_INLINE},
    {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
    {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
    {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
    {"type",typeCommand,2,REDIS_CMD_INLINE},
    {"sync",syncCommand,1,REDIS_CMD_INLINE},
    {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
    {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
    {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"info",infoCommand,1,REDIS_CMD_INLINE},
    {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
    {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
    {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
    {"debug",debugCommand,-2,REDIS_CMD_INLINE},
    {NULL,NULL,0,0}
};
529 /*============================ Utility functions ============================ */
530
/* Glob-style pattern matching.
 *
 * Returns 1 if the first stringLen bytes of 'string' match the first
 * patternLen bytes of 'pattern', 0 otherwise. Lengths are always honoured,
 * so neither buffer needs to be NUL-terminated and no byte past either
 * length is read. Supported syntax:
 *
 *   *        any sequence of bytes, including the empty one
 *   ?        exactly one byte
 *   [abc]    one byte from the set; "[^abc]" negates, "[a-z]" is a range
 *   \x       the literal byte x
 *
 * An unterminated class ("[abc" at the end of the pattern) is treated as if
 * it were closed. When 'nocase' is non-zero comparisons use tolower().
 */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* Collapse runs of '*' without looking past the pattern end. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A character class always consumes one input byte. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back onto the last pattern
                     * byte so the pattern++ after the switch ends exactly
                     * at the end of the pattern. Checked first so we never
                     * read pattern[0] past the end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == ']') {
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    /* Escaped byte; a trailing lone '\' is literal below. */
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        /* tolower() requires an unsigned char value. */
                        start = tolower((unsigned char)start);
                        end = tolower((unsigned char)end);
                        c = tolower((unsigned char)c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Trailing '*'s may match the empty remainder of the string;
             * stop at the pattern length rather than at a '*' sentinel. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
653
654 static void redisLog(int level, const char *fmt, ...) {
655 va_list ap;
656 FILE *fp;
657
658 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
659 if (!fp) return;
660
661 va_start(ap, fmt);
662 if (level >= server.verbosity) {
663 char *c = ".-*";
664 char buf[64];
665 time_t now;
666
667 now = time(NULL);
668 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
669 fprintf(fp,"%s %c ",buf,c[level]);
670 vfprintf(fp, fmt, ap);
671 fprintf(fp,"\n");
672 fflush(fp);
673 }
674 va_end(ap);
675
676 if (server.logfile) fclose(fp);
677 }
678
679 /*====================== Hash table type implementation ==================== */
680
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and Redis objects as values (objects can hold SDS strings,
 * lists, sets). */
684
685 static void dictVanillaFree(void *privdata, void *val)
686 {
687 DICT_NOTUSED(privdata);
688 zfree(val);
689 }
690
691 static int sdsDictKeyCompare(void *privdata, const void *key1,
692 const void *key2)
693 {
694 int l1,l2;
695 DICT_NOTUSED(privdata);
696
697 l1 = sdslen((sds)key1);
698 l2 = sdslen((sds)key2);
699 if (l1 != l2) return 0;
700 return memcmp(key1, key2, l1) == 0;
701 }
702
703 static void dictRedisObjectDestructor(void *privdata, void *val)
704 {
705 DICT_NOTUSED(privdata);
706
707 decrRefCount(val);
708 }
709
710 static int dictObjKeyCompare(void *privdata, const void *key1,
711 const void *key2)
712 {
713 const robj *o1 = key1, *o2 = key2;
714 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
715 }
716
717 static unsigned int dictObjHash(const void *key) {
718 const robj *o = key;
719 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
720 }
721
722 static int dictEncObjKeyCompare(void *privdata, const void *key1,
723 const void *key2)
724 {
725 const robj *o1 = key1, *o2 = key2;
726
727 if (o1->encoding == REDIS_ENCODING_RAW &&
728 o2->encoding == REDIS_ENCODING_RAW)
729 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
730 else {
731 robj *dec1, *dec2;
732 int cmp;
733
734 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
735 getDecodedObject(o1) : (robj*)o1;
736 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
737 getDecodedObject(o2) : (robj*)o2;
738 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
739 if (dec1 != o1) decrRefCount(dec1);
740 if (dec2 != o2) decrRefCount(dec2);
741 return cmp;
742 }
743 }
744
745 static unsigned int dictEncObjHash(const void *key) {
746 const robj *o = key;
747
748 if (o->encoding == REDIS_ENCODING_RAW)
749 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
750 else {
751 robj *dec = getDecodedObject(o);
752 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
753 decrRefCount(dec);
754 return hash;
755 }
756 }
757
/* Sets: robj members as keys, hashed/compared in an encoding-aware way so
 * integer-encoded members work. No value destructor: values are presumably
 * unused. */
static dictType setDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL                       /* val destructor */
};

/* Sorted sets: robj members as keys (encoding-aware), zmalloc()ed values
 * released with zfree() — presumably the member's score. */
static dictType zsetDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictVanillaFree            /* val destructor */
};

/* robj keys and robj values, both released with decrRefCount(). Keys must be
 * raw sds-encoded because dictObjHash() reads o->ptr as an sds string.
 * NOTE(review): presumably used for the DB key space — confirm. */
static dictType hashDictType = {
    dictObjHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictObjKeyCompare,          /* key compare */
    dictRedisObjectDestructor,  /* key destructor */
    dictRedisObjectDestructor   /* val destructor */
};
784
785 /* ========================= Random utility functions ======================= */
786
/* Out-of-memory handler: print "<msg>: Out of memory" on stderr and abort.
 *
 * Redis generally does not try to recover from out of memory conditions
 * when allocating objects or strings, it is not clear if it will be possible
 * to report this condition to the client since the networking layer itself
 * is based on heap allocation for send buffers, so we simply abort.
 * At least the code will be simpler to read... */
static void oom(const char *msg) {
    fputs(msg, stderr);
    fputs(": Out of memory\n", stderr);
    fflush(stderr);
    sleep(1);
    abort();
}
798
799 /* ====================== Redis server networking stuff ===================== */
800 static void closeTimedoutClients(void) {
801 redisClient *c;
802 listNode *ln;
803 time_t now = time(NULL);
804
805 listRewind(server.clients);
806 while ((ln = listYield(server.clients)) != NULL) {
807 c = listNodeValue(ln);
808 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
809 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
810 (now - c->lastinteraction > server.maxidletime)) {
811 redisLog(REDIS_DEBUG,"Closing idle client");
812 freeClient(c);
813 }
814 }
815 }
816
817 static int htNeedsResize(dict *dict) {
818 long long size, used;
819
820 size = dictSlots(dict);
821 used = dictSize(dict);
822 return (size && used && size > DICT_HT_INITIAL_SIZE &&
823 (used*100/size < REDIS_HT_MINFILL));
824 }
825
826 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
827 * we resize the hash table to save memory */
828 static void tryResizeHashTables(void) {
829 int j;
830
831 for (j = 0; j < server.dbnum; j++) {
832 if (htNeedsResize(server.db[j].dict)) {
833 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
834 dictResize(server.db[j].dict);
835 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
836 }
837 if (htNeedsResize(server.db[j].expires))
838 dictResize(server.db[j].expires);
839 }
840 }
841
842 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
843 int j, loops = server.cronloops++;
844 REDIS_NOTUSED(eventLoop);
845 REDIS_NOTUSED(id);
846 REDIS_NOTUSED(clientData);
847
848 /* Update the global state with the amount of used memory */
849 server.usedmemory = zmalloc_used_memory();
850
851 /* Show some info about non-empty databases */
852 for (j = 0; j < server.dbnum; j++) {
853 long long size, used, vkeys;
854
855 size = dictSlots(server.db[j].dict);
856 used = dictSize(server.db[j].dict);
857 vkeys = dictSize(server.db[j].expires);
858 if (!(loops % 5) && (used || vkeys)) {
859 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
860 /* dictPrintStats(server.dict); */
861 }
862 }
863
864 /* We don't want to resize the hash tables while a bacground saving
865 * is in progress: the saving child is created using fork() that is
866 * implemented with a copy-on-write semantic in most modern systems, so
867 * if we resize the HT while there is the saving child at work actually
868 * a lot of memory movements in the parent will cause a lot of pages
869 * copied. */
870 if (!server.bgsaveinprogress) tryResizeHashTables();
871
872 /* Show information about connected clients */
873 if (!(loops % 5)) {
874 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
875 listLength(server.clients)-listLength(server.slaves),
876 listLength(server.slaves),
877 server.usedmemory,
878 dictSize(server.sharingpool));
879 }
880
881 /* Close connections of timedout clients */
882 if (server.maxidletime && !(loops % 10))
883 closeTimedoutClients();
884
885 /* Check if a background saving in progress terminated */
886 if (server.bgsaveinprogress) {
887 int statloc;
888 if (wait4(-1,&statloc,WNOHANG,NULL)) {
889 int exitcode = WEXITSTATUS(statloc);
890 int bysignal = WIFSIGNALED(statloc);
891
892 if (!bysignal && exitcode == 0) {
893 redisLog(REDIS_NOTICE,
894 "Background saving terminated with success");
895 server.dirty = 0;
896 server.lastsave = time(NULL);
897 } else if (!bysignal && exitcode != 0) {
898 redisLog(REDIS_WARNING, "Background saving error");
899 } else {
900 redisLog(REDIS_WARNING,
901 "Background saving terminated by signal");
902 rdbRemoveTempFile(server.bgsavechildpid);
903 }
904 server.bgsaveinprogress = 0;
905 server.bgsavechildpid = -1;
906 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
907 }
908 } else {
909 /* If there is not a background saving in progress check if
910 * we have to save now */
911 time_t now = time(NULL);
912 for (j = 0; j < server.saveparamslen; j++) {
913 struct saveparam *sp = server.saveparams+j;
914
915 if (server.dirty >= sp->changes &&
916 now-server.lastsave > sp->seconds) {
917 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
918 sp->changes, sp->seconds);
919 rdbSaveBackground(server.dbfilename);
920 break;
921 }
922 }
923 }
924
925 /* Try to expire a few timed out keys */
926 for (j = 0; j < server.dbnum; j++) {
927 redisDb *db = server.db+j;
928 int num = dictSize(db->expires);
929
930 if (num) {
931 time_t now = time(NULL);
932
933 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
934 num = REDIS_EXPIRELOOKUPS_PER_CRON;
935 while (num--) {
936 dictEntry *de;
937 time_t t;
938
939 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
940 t = (time_t) dictGetEntryVal(de);
941 if (now > t) {
942 deleteKey(db,dictGetEntryKey(de));
943 }
944 }
945 }
946 }
947
948 /* Check if we should connect to a MASTER */
949 if (server.replstate == REDIS_REPL_CONNECT) {
950 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
951 if (syncWithMaster() == REDIS_OK) {
952 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
953 }
954 }
955 return 1000;
956 }
957
958 static void createSharedObjects(void) {
959 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
960 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
961 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
962 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
963 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
964 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
965 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
966 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
967 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
968 /* no such key */
969 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
970 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
971 "-ERR Operation against a key holding the wrong kind of value\r\n"));
972 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
973 "-ERR no such key\r\n"));
974 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
975 "-ERR syntax error\r\n"));
976 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
977 "-ERR source and destination objects are the same\r\n"));
978 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
979 "-ERR index out of range\r\n"));
980 shared.space = createObject(REDIS_STRING,sdsnew(" "));
981 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
982 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
983 shared.select0 = createStringObject("select 0\r\n",10);
984 shared.select1 = createStringObject("select 1\r\n",10);
985 shared.select2 = createStringObject("select 2\r\n",10);
986 shared.select3 = createStringObject("select 3\r\n",10);
987 shared.select4 = createStringObject("select 4\r\n",10);
988 shared.select5 = createStringObject("select 5\r\n",10);
989 shared.select6 = createStringObject("select 6\r\n",10);
990 shared.select7 = createStringObject("select 7\r\n",10);
991 shared.select8 = createStringObject("select 8\r\n",10);
992 shared.select9 = createStringObject("select 9\r\n",10);
993 }
994
995 static void appendServerSaveParams(time_t seconds, int changes) {
996 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
997 server.saveparams[server.saveparamslen].seconds = seconds;
998 server.saveparams[server.saveparamslen].changes = changes;
999 server.saveparamslen++;
1000 }
1001
1002 static void ResetServerSaveParams() {
1003 zfree(server.saveparams);
1004 server.saveparams = NULL;
1005 server.saveparamslen = 0;
1006 }
1007
1008 static void initServerConfig() {
1009 server.dbnum = REDIS_DEFAULT_DBNUM;
1010 server.port = REDIS_SERVERPORT;
1011 server.verbosity = REDIS_DEBUG;
1012 server.maxidletime = REDIS_MAXIDLETIME;
1013 server.saveparams = NULL;
1014 server.logfile = NULL; /* NULL = log on standard output */
1015 server.bindaddr = NULL;
1016 server.glueoutputbuf = 1;
1017 server.daemonize = 0;
1018 server.pidfile = "/var/run/redis.pid";
1019 server.dbfilename = "dump.rdb";
1020 server.requirepass = NULL;
1021 server.shareobjects = 0;
1022 server.sharingpoolsize = 1024;
1023 server.maxclients = 0;
1024 server.maxmemory = 0;
1025 ResetServerSaveParams();
1026
1027 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1028 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1029 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1030 /* Replication related */
1031 server.isslave = 0;
1032 server.masterhost = NULL;
1033 server.masterport = 6379;
1034 server.master = NULL;
1035 server.replstate = REDIS_REPL_NONE;
1036
1037 /* Double constants initialization */
1038 R_Zero = 0.0;
1039 R_PosInf = 1.0/R_Zero;
1040 R_NegInf = -1.0/R_Zero;
1041 R_Nan = R_Zero/R_Zero;
1042 }
1043
1044 static void initServer() {
1045 int j;
1046
1047 signal(SIGHUP, SIG_IGN);
1048 signal(SIGPIPE, SIG_IGN);
1049 setupSigSegvAction();
1050
1051 server.clients = listCreate();
1052 server.slaves = listCreate();
1053 server.monitors = listCreate();
1054 server.objfreelist = listCreate();
1055 createSharedObjects();
1056 server.el = aeCreateEventLoop();
1057 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1058 server.sharingpool = dictCreate(&setDictType,NULL);
1059 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1060 if (server.fd == -1) {
1061 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1062 exit(1);
1063 }
1064 for (j = 0; j < server.dbnum; j++) {
1065 server.db[j].dict = dictCreate(&hashDictType,NULL);
1066 server.db[j].expires = dictCreate(&setDictType,NULL);
1067 server.db[j].id = j;
1068 }
1069 server.cronloops = 0;
1070 server.bgsaveinprogress = 0;
1071 server.bgsavechildpid = -1;
1072 server.lastsave = time(NULL);
1073 server.dirty = 0;
1074 server.usedmemory = 0;
1075 server.stat_numcommands = 0;
1076 server.stat_numconnections = 0;
1077 server.stat_starttime = time(NULL);
1078 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1079 }
1080
1081 /* Empty the whole database */
1082 static long long emptyDb() {
1083 int j;
1084 long long removed = 0;
1085
1086 for (j = 0; j < server.dbnum; j++) {
1087 removed += dictSize(server.db[j].dict);
1088 dictEmpty(server.db[j].dict);
1089 dictEmpty(server.db[j].expires);
1090 }
1091 return removed;
1092 }
1093
/* Parse a boolean config value, case-insensitively.
 * Returns 1 for "yes", 0 for "no", and -1 for anything else. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1099
1100 /* I agree, this is a very rudimental way to load a configuration...
1101 will improve later if the config gets more complex */
1102 static void loadServerConfig(char *filename) {
1103 FILE *fp;
1104 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1105 int linenum = 0;
1106 sds line = NULL;
1107
1108 if (filename[0] == '-' && filename[1] == '\0')
1109 fp = stdin;
1110 else {
1111 if ((fp = fopen(filename,"r")) == NULL) {
1112 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1113 exit(1);
1114 }
1115 }
1116
1117 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1118 sds *argv;
1119 int argc, j;
1120
1121 linenum++;
1122 line = sdsnew(buf);
1123 line = sdstrim(line," \t\r\n");
1124
1125 /* Skip comments and blank lines*/
1126 if (line[0] == '#' || line[0] == '\0') {
1127 sdsfree(line);
1128 continue;
1129 }
1130
1131 /* Split into arguments */
1132 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1133 sdstolower(argv[0]);
1134
1135 /* Execute config directives */
1136 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1137 server.maxidletime = atoi(argv[1]);
1138 if (server.maxidletime < 0) {
1139 err = "Invalid timeout value"; goto loaderr;
1140 }
1141 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1142 server.port = atoi(argv[1]);
1143 if (server.port < 1 || server.port > 65535) {
1144 err = "Invalid port"; goto loaderr;
1145 }
1146 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1147 server.bindaddr = zstrdup(argv[1]);
1148 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1149 int seconds = atoi(argv[1]);
1150 int changes = atoi(argv[2]);
1151 if (seconds < 1 || changes < 0) {
1152 err = "Invalid save parameters"; goto loaderr;
1153 }
1154 appendServerSaveParams(seconds,changes);
1155 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1156 if (chdir(argv[1]) == -1) {
1157 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1158 argv[1], strerror(errno));
1159 exit(1);
1160 }
1161 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1162 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1163 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1164 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1165 else {
1166 err = "Invalid log level. Must be one of debug, notice, warning";
1167 goto loaderr;
1168 }
1169 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1170 FILE *logfp;
1171
1172 server.logfile = zstrdup(argv[1]);
1173 if (!strcasecmp(server.logfile,"stdout")) {
1174 zfree(server.logfile);
1175 server.logfile = NULL;
1176 }
1177 if (server.logfile) {
1178 /* Test if we are able to open the file. The server will not
1179 * be able to abort just for this problem later... */
1180 logfp = fopen(server.logfile,"a");
1181 if (logfp == NULL) {
1182 err = sdscatprintf(sdsempty(),
1183 "Can't open the log file: %s", strerror(errno));
1184 goto loaderr;
1185 }
1186 fclose(logfp);
1187 }
1188 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1189 server.dbnum = atoi(argv[1]);
1190 if (server.dbnum < 1) {
1191 err = "Invalid number of databases"; goto loaderr;
1192 }
1193 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1194 server.maxclients = atoi(argv[1]);
1195 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1196 server.maxmemory = strtoll(argv[1], NULL, 10);
1197 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1198 server.masterhost = sdsnew(argv[1]);
1199 server.masterport = atoi(argv[2]);
1200 server.replstate = REDIS_REPL_CONNECT;
1201 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1202 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1203 err = "argument must be 'yes' or 'no'"; goto loaderr;
1204 }
1205 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1206 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1207 err = "argument must be 'yes' or 'no'"; goto loaderr;
1208 }
1209 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1210 server.sharingpoolsize = atoi(argv[1]);
1211 if (server.sharingpoolsize < 1) {
1212 err = "invalid object sharing pool size"; goto loaderr;
1213 }
1214 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1215 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1216 err = "argument must be 'yes' or 'no'"; goto loaderr;
1217 }
1218 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1219 server.requirepass = zstrdup(argv[1]);
1220 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1221 server.pidfile = zstrdup(argv[1]);
1222 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1223 server.dbfilename = zstrdup(argv[1]);
1224 } else {
1225 err = "Bad directive or wrong number of arguments"; goto loaderr;
1226 }
1227 for (j = 0; j < argc; j++)
1228 sdsfree(argv[j]);
1229 zfree(argv);
1230 sdsfree(line);
1231 }
1232 if (fp != stdin) fclose(fp);
1233 return;
1234
1235 loaderr:
1236 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1237 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1238 fprintf(stderr, ">>> '%s'\n", line);
1239 fprintf(stderr, "%s\n", err);
1240 exit(1);
1241 }
1242
1243 static void freeClientArgv(redisClient *c) {
1244 int j;
1245
1246 for (j = 0; j < c->argc; j++)
1247 decrRefCount(c->argv[j]);
1248 for (j = 0; j < c->mbargc; j++)
1249 decrRefCount(c->mbargv[j]);
1250 c->argc = 0;
1251 c->mbargc = 0;
1252 }
1253
1254 static void freeClient(redisClient *c) {
1255 listNode *ln;
1256
1257 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1258 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1259 sdsfree(c->querybuf);
1260 listRelease(c->reply);
1261 freeClientArgv(c);
1262 close(c->fd);
1263 ln = listSearchKey(server.clients,c);
1264 assert(ln != NULL);
1265 listDelNode(server.clients,ln);
1266 if (c->flags & REDIS_SLAVE) {
1267 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1268 close(c->repldbfd);
1269 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1270 ln = listSearchKey(l,c);
1271 assert(ln != NULL);
1272 listDelNode(l,ln);
1273 }
1274 if (c->flags & REDIS_MASTER) {
1275 server.master = NULL;
1276 server.replstate = REDIS_REPL_CONNECT;
1277 }
1278 zfree(c->argv);
1279 zfree(c->mbargv);
1280 zfree(c);
1281 }
1282
1283 static void glueReplyBuffersIfNeeded(redisClient *c) {
1284 int totlen = 0;
1285 listNode *ln;
1286 robj *o;
1287
1288 listRewind(c->reply);
1289 while((ln = listYield(c->reply))) {
1290 o = ln->value;
1291 totlen += sdslen(o->ptr);
1292 /* This optimization makes more sense if we don't have to copy
1293 * too much data */
1294 if (totlen > 1024) return;
1295 }
1296 if (totlen > 0) {
1297 char buf[1024];
1298 int copylen = 0;
1299
1300 listRewind(c->reply);
1301 while((ln = listYield(c->reply))) {
1302 o = ln->value;
1303 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1304 copylen += sdslen(o->ptr);
1305 listDelNode(c->reply,ln);
1306 }
1307 /* Now the output buffer is empty, add the new single element */
1308 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1309 listAddNodeTail(c->reply,o);
1310 }
1311 }
1312
1313 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1314 redisClient *c = privdata;
1315 int nwritten = 0, totwritten = 0, objlen;
1316 robj *o;
1317 REDIS_NOTUSED(el);
1318 REDIS_NOTUSED(mask);
1319
1320 if (server.glueoutputbuf && listLength(c->reply) > 1)
1321 glueReplyBuffersIfNeeded(c);
1322 while(listLength(c->reply)) {
1323 o = listNodeValue(listFirst(c->reply));
1324 objlen = sdslen(o->ptr);
1325
1326 if (objlen == 0) {
1327 listDelNode(c->reply,listFirst(c->reply));
1328 continue;
1329 }
1330
1331 if (c->flags & REDIS_MASTER) {
1332 /* Don't reply to a master */
1333 nwritten = objlen - c->sentlen;
1334 } else {
1335 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1336 if (nwritten <= 0) break;
1337 }
1338 c->sentlen += nwritten;
1339 totwritten += nwritten;
1340 /* If we fully sent the object on head go to the next one */
1341 if (c->sentlen == objlen) {
1342 listDelNode(c->reply,listFirst(c->reply));
1343 c->sentlen = 0;
1344 }
1345 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1346 * bytes, in a single threaded server it's a good idea to server
1347 * other clients as well, even if a very large request comes from
1348 * super fast link that is always able to accept data (in real world
1349 * terms think to 'KEYS *' against the loopback interfae) */
1350 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1351 }
1352 if (nwritten == -1) {
1353 if (errno == EAGAIN) {
1354 nwritten = 0;
1355 } else {
1356 redisLog(REDIS_DEBUG,
1357 "Error writing to client: %s", strerror(errno));
1358 freeClient(c);
1359 return;
1360 }
1361 }
1362 if (totwritten > 0) c->lastinteraction = time(NULL);
1363 if (listLength(c->reply) == 0) {
1364 c->sentlen = 0;
1365 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1366 }
1367 }
1368
1369 static struct redisCommand *lookupCommand(char *name) {
1370 int j = 0;
1371 while(cmdTable[j].name != NULL) {
1372 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1373 j++;
1374 }
1375 return NULL;
1376 }
1377
1378 /* resetClient prepare the client to process the next command */
1379 static void resetClient(redisClient *c) {
1380 freeClientArgv(c);
1381 c->bulklen = -1;
1382 c->multibulk = 0;
1383 }
1384
1385 /* If this function gets called we already read a whole
1386 * command, argments are in the client argv/argc fields.
1387 * processCommand() execute the command or prepare the
1388 * server for a bulk read from the client.
1389 *
1390 * If 1 is returned the client is still alive and valid and
1391 * and other operations can be performed by the caller. Otherwise
1392 * if 0 is returned the client was destroied (i.e. after QUIT). */
1393 static int processCommand(redisClient *c) {
1394 struct redisCommand *cmd;
1395 long long dirty;
1396
1397 /* Free some memory if needed (maxmemory setting) */
1398 if (server.maxmemory) freeMemoryIfNeeded();
1399
1400 /* Handle the multi bulk command type. This is an alternative protocol
1401 * supported by Redis in order to receive commands that are composed of
1402 * multiple binary-safe "bulk" arguments. The latency of processing is
1403 * a bit higher but this allows things like multi-sets, so if this
1404 * protocol is used only for MSET and similar commands this is a big win. */
1405 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1406 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1407 if (c->multibulk <= 0) {
1408 resetClient(c);
1409 return 1;
1410 } else {
1411 decrRefCount(c->argv[c->argc-1]);
1412 c->argc--;
1413 return 1;
1414 }
1415 } else if (c->multibulk) {
1416 if (c->bulklen == -1) {
1417 if (((char*)c->argv[0]->ptr)[0] != '$') {
1418 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1419 resetClient(c);
1420 return 1;
1421 } else {
1422 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1423 decrRefCount(c->argv[0]);
1424 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1425 c->argc--;
1426 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1427 resetClient(c);
1428 return 1;
1429 }
1430 c->argc--;
1431 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1432 return 1;
1433 }
1434 } else {
1435 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1436 c->mbargv[c->mbargc] = c->argv[0];
1437 c->mbargc++;
1438 c->argc--;
1439 c->multibulk--;
1440 if (c->multibulk == 0) {
1441 robj **auxargv;
1442 int auxargc;
1443
1444 /* Here we need to swap the multi-bulk argc/argv with the
1445 * normal argc/argv of the client structure. */
1446 auxargv = c->argv;
1447 c->argv = c->mbargv;
1448 c->mbargv = auxargv;
1449
1450 auxargc = c->argc;
1451 c->argc = c->mbargc;
1452 c->mbargc = auxargc;
1453
1454 /* We need to set bulklen to something different than -1
1455 * in order for the code below to process the command without
1456 * to try to read the last argument of a bulk command as
1457 * a special argument. */
1458 c->bulklen = 0;
1459 /* continue below and process the command */
1460 } else {
1461 c->bulklen = -1;
1462 return 1;
1463 }
1464 }
1465 }
1466 /* -- end of multi bulk commands processing -- */
1467
1468 /* The QUIT command is handled as a special case. Normal command
1469 * procs are unable to close the client connection safely */
1470 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1471 freeClient(c);
1472 return 0;
1473 }
1474 cmd = lookupCommand(c->argv[0]->ptr);
1475 if (!cmd) {
1476 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1477 resetClient(c);
1478 return 1;
1479 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1480 (c->argc < -cmd->arity)) {
1481 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1482 resetClient(c);
1483 return 1;
1484 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1485 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1486 resetClient(c);
1487 return 1;
1488 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1489 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1490
1491 decrRefCount(c->argv[c->argc-1]);
1492 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1493 c->argc--;
1494 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1495 resetClient(c);
1496 return 1;
1497 }
1498 c->argc--;
1499 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1500 /* It is possible that the bulk read is already in the
1501 * buffer. Check this condition and handle it accordingly.
1502 * This is just a fast path, alternative to call processInputBuffer().
1503 * It's a good idea since the code is small and this condition
1504 * happens most of the times. */
1505 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1506 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1507 c->argc++;
1508 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1509 } else {
1510 return 1;
1511 }
1512 }
1513 /* Let's try to share objects on the command arguments vector */
1514 if (server.shareobjects) {
1515 int j;
1516 for(j = 1; j < c->argc; j++)
1517 c->argv[j] = tryObjectSharing(c->argv[j]);
1518 }
1519 /* Let's try to encode the bulk object to save space. */
1520 if (cmd->flags & REDIS_CMD_BULK)
1521 tryObjectEncoding(c->argv[c->argc-1]);
1522
1523 /* Check if the user is authenticated */
1524 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1525 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1526 resetClient(c);
1527 return 1;
1528 }
1529
1530 /* Exec the command */
1531 dirty = server.dirty;
1532 cmd->proc(c);
1533 if (server.dirty-dirty != 0 && listLength(server.slaves))
1534 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1535 if (listLength(server.monitors))
1536 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1537 server.stat_numcommands++;
1538
1539 /* Prepare the client for the next command */
1540 if (c->flags & REDIS_CLOSE) {
1541 freeClient(c);
1542 return 0;
1543 }
1544 resetClient(c);
1545 return 1;
1546 }
1547
1548 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1549 listNode *ln;
1550 int outc = 0, j;
1551 robj **outv;
1552 /* (args*2)+1 is enough room for args, spaces, newlines */
1553 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1554
1555 if (argc <= REDIS_STATIC_ARGS) {
1556 outv = static_outv;
1557 } else {
1558 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1559 }
1560
1561 for (j = 0; j < argc; j++) {
1562 if (j != 0) outv[outc++] = shared.space;
1563 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1564 robj *lenobj;
1565
1566 lenobj = createObject(REDIS_STRING,
1567 sdscatprintf(sdsempty(),"%d\r\n",
1568 stringObjectLen(argv[j])));
1569 lenobj->refcount = 0;
1570 outv[outc++] = lenobj;
1571 }
1572 outv[outc++] = argv[j];
1573 }
1574 outv[outc++] = shared.crlf;
1575
1576 /* Increment all the refcounts at start and decrement at end in order to
1577 * be sure to free objects if there is no slave in a replication state
1578 * able to be feed with commands */
1579 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1580 listRewind(slaves);
1581 while((ln = listYield(slaves))) {
1582 redisClient *slave = ln->value;
1583
1584 /* Don't feed slaves that are still waiting for BGSAVE to start */
1585 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1586
1587 /* Feed all the other slaves, MONITORs and so on */
1588 if (slave->slaveseldb != dictid) {
1589 robj *selectcmd;
1590
1591 switch(dictid) {
1592 case 0: selectcmd = shared.select0; break;
1593 case 1: selectcmd = shared.select1; break;
1594 case 2: selectcmd = shared.select2; break;
1595 case 3: selectcmd = shared.select3; break;
1596 case 4: selectcmd = shared.select4; break;
1597 case 5: selectcmd = shared.select5; break;
1598 case 6: selectcmd = shared.select6; break;
1599 case 7: selectcmd = shared.select7; break;
1600 case 8: selectcmd = shared.select8; break;
1601 case 9: selectcmd = shared.select9; break;
1602 default:
1603 selectcmd = createObject(REDIS_STRING,
1604 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1605 selectcmd->refcount = 0;
1606 break;
1607 }
1608 addReply(slave,selectcmd);
1609 slave->slaveseldb = dictid;
1610 }
1611 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1612 }
1613 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1614 if (outv != static_outv) zfree(outv);
1615 }
1616
1617 static void processInputBuffer(redisClient *c) {
1618 again:
1619 if (c->bulklen == -1) {
1620 /* Read the first line of the query */
1621 char *p = strchr(c->querybuf,'\n');
1622 size_t querylen;
1623
1624 if (p) {
1625 sds query, *argv;
1626 int argc, j;
1627
1628 query = c->querybuf;
1629 c->querybuf = sdsempty();
1630 querylen = 1+(p-(query));
1631 if (sdslen(query) > querylen) {
1632 /* leave data after the first line of the query in the buffer */
1633 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1634 }
1635 *p = '\0'; /* remove "\n" */
1636 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1637 sdsupdatelen(query);
1638
1639 /* Now we can split the query in arguments */
1640 if (sdslen(query) == 0) {
1641 /* Ignore empty query */
1642 sdsfree(query);
1643 return;
1644 }
1645 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1646 sdsfree(query);
1647
1648 if (c->argv) zfree(c->argv);
1649 c->argv = zmalloc(sizeof(robj*)*argc);
1650
1651 for (j = 0; j < argc; j++) {
1652 if (sdslen(argv[j])) {
1653 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1654 c->argc++;
1655 } else {
1656 sdsfree(argv[j]);
1657 }
1658 }
1659 zfree(argv);
1660 /* Execute the command. If the client is still valid
1661 * after processCommand() return and there is something
1662 * on the query buffer try to process the next command. */
1663 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1664 return;
1665 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1666 redisLog(REDIS_DEBUG, "Client protocol error");
1667 freeClient(c);
1668 return;
1669 }
1670 } else {
1671 /* Bulk read handling. Note that if we are at this point
1672 the client already sent a command terminated with a newline,
1673 we are reading the bulk data that is actually the last
1674 argument of the command. */
1675 int qbl = sdslen(c->querybuf);
1676
1677 if (c->bulklen <= qbl) {
1678 /* Copy everything but the final CRLF as final argument */
1679 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1680 c->argc++;
1681 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1682 /* Process the command. If the client is still valid after
1683 * the processing and there is more data in the buffer
1684 * try to parse it. */
1685 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1686 return;
1687 }
1688 }
1689 }
1690
1691 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1692 redisClient *c = (redisClient*) privdata;
1693 char buf[REDIS_IOBUF_LEN];
1694 int nread;
1695 REDIS_NOTUSED(el);
1696 REDIS_NOTUSED(mask);
1697
1698 nread = read(fd, buf, REDIS_IOBUF_LEN);
1699 if (nread == -1) {
1700 if (errno == EAGAIN) {
1701 nread = 0;
1702 } else {
1703 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1704 freeClient(c);
1705 return;
1706 }
1707 } else if (nread == 0) {
1708 redisLog(REDIS_DEBUG, "Client closed connection");
1709 freeClient(c);
1710 return;
1711 }
1712 if (nread) {
1713 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1714 c->lastinteraction = time(NULL);
1715 } else {
1716 return;
1717 }
1718 processInputBuffer(c);
1719 }
1720
1721 static int selectDb(redisClient *c, int id) {
1722 if (id < 0 || id >= server.dbnum)
1723 return REDIS_ERR;
1724 c->db = &server.db[id];
1725 return REDIS_OK;
1726 }
1727
1728 static void *dupClientReplyValue(void *o) {
1729 incrRefCount((robj*)o);
1730 return 0;
1731 }
1732
1733 static redisClient *createClient(int fd) {
1734 redisClient *c = zmalloc(sizeof(*c));
1735
1736 anetNonBlock(NULL,fd);
1737 anetTcpNoDelay(NULL,fd);
1738 if (!c) return NULL;
1739 selectDb(c,0);
1740 c->fd = fd;
1741 c->querybuf = sdsempty();
1742 c->argc = 0;
1743 c->argv = NULL;
1744 c->bulklen = -1;
1745 c->multibulk = 0;
1746 c->mbargc = 0;
1747 c->mbargv = NULL;
1748 c->sentlen = 0;
1749 c->flags = 0;
1750 c->lastinteraction = time(NULL);
1751 c->authenticated = 0;
1752 c->replstate = REDIS_REPL_NONE;
1753 c->reply = listCreate();
1754 listSetFreeMethod(c->reply,decrRefCount);
1755 listSetDupMethod(c->reply,dupClientReplyValue);
1756 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1757 readQueryFromClient, c, NULL) == AE_ERR) {
1758 freeClient(c);
1759 return NULL;
1760 }
1761 listAddNodeTail(server.clients,c);
1762 return c;
1763 }
1764
1765 static void addReply(redisClient *c, robj *obj) {
1766 if (listLength(c->reply) == 0 &&
1767 (c->replstate == REDIS_REPL_NONE ||
1768 c->replstate == REDIS_REPL_ONLINE) &&
1769 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1770 sendReplyToClient, c, NULL) == AE_ERR) return;
1771 if (obj->encoding != REDIS_ENCODING_RAW) {
1772 obj = getDecodedObject(obj);
1773 } else {
1774 incrRefCount(obj);
1775 }
1776 listAddNodeTail(c->reply,obj);
1777 }
1778
1779 static void addReplySds(redisClient *c, sds s) {
1780 robj *o = createObject(REDIS_STRING,s);
1781 addReply(c,o);
1782 decrRefCount(o);
1783 }
1784
1785 static void addReplyBulkLen(redisClient *c, robj *obj) {
1786 size_t len;
1787
1788 if (obj->encoding == REDIS_ENCODING_RAW) {
1789 len = sdslen(obj->ptr);
1790 } else {
1791 long n = (long)obj->ptr;
1792
1793 len = 1;
1794 if (n < 0) {
1795 len++;
1796 n = -n;
1797 }
1798 while((n = n/10) != 0) {
1799 len++;
1800 }
1801 }
1802 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1803 }
1804
1805 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1806 int cport, cfd;
1807 char cip[128];
1808 redisClient *c;
1809 REDIS_NOTUSED(el);
1810 REDIS_NOTUSED(mask);
1811 REDIS_NOTUSED(privdata);
1812
1813 cfd = anetAccept(server.neterr, fd, cip, &cport);
1814 if (cfd == AE_ERR) {
1815 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1816 return;
1817 }
1818 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1819 if ((c = createClient(cfd)) == NULL) {
1820 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1821 close(cfd); /* May be already closed, just ingore errors */
1822 return;
1823 }
1824 /* If maxclient directive is set and this is one client more... close the
1825 * connection. Note that we create the client instead to check before
1826 * for this condition, since now the socket is already set in nonblocking
1827 * mode and we can send an error for free using the Kernel I/O */
1828 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1829 char *err = "-ERR max number of clients reached\r\n";
1830
1831 /* That's a best effort error message, don't check write errors */
1832 (void) write(c->fd,err,strlen(err));
1833 freeClient(c);
1834 return;
1835 }
1836 server.stat_numconnections++;
1837 }
1838
1839 /* ======================= Redis objects implementation ===================== */
1840
1841 static robj *createObject(int type, void *ptr) {
1842 robj *o;
1843
1844 if (listLength(server.objfreelist)) {
1845 listNode *head = listFirst(server.objfreelist);
1846 o = listNodeValue(head);
1847 listDelNode(server.objfreelist,head);
1848 } else {
1849 o = zmalloc(sizeof(*o));
1850 }
1851 o->type = type;
1852 o->encoding = REDIS_ENCODING_RAW;
1853 o->ptr = ptr;
1854 o->refcount = 1;
1855 return o;
1856 }
1857
1858 static robj *createStringObject(char *ptr, size_t len) {
1859 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1860 }
1861
1862 static robj *createListObject(void) {
1863 list *l = listCreate();
1864
1865 listSetFreeMethod(l,decrRefCount);
1866 return createObject(REDIS_LIST,l);
1867 }
1868
1869 static robj *createSetObject(void) {
1870 dict *d = dictCreate(&setDictType,NULL);
1871 return createObject(REDIS_SET,d);
1872 }
1873
1874 static robj *createZsetObject(void) {
1875 zset *zs = zmalloc(sizeof(*zs));
1876
1877 zs->dict = dictCreate(&zsetDictType,NULL);
1878 zs->zsl = zslCreate();
1879 return createObject(REDIS_ZSET,zs);
1880 }
1881
1882 static void freeStringObject(robj *o) {
1883 if (o->encoding == REDIS_ENCODING_RAW) {
1884 sdsfree(o->ptr);
1885 }
1886 }
1887
1888 static void freeListObject(robj *o) {
1889 listRelease((list*) o->ptr);
1890 }
1891
1892 static void freeSetObject(robj *o) {
1893 dictRelease((dict*) o->ptr);
1894 }
1895
1896 static void freeZsetObject(robj *o) {
1897 zset *zs = o->ptr;
1898
1899 dictRelease(zs->dict);
1900 zslFree(zs->zsl);
1901 zfree(zs);
1902 }
1903
1904 static void freeHashObject(robj *o) {
1905 dictRelease((dict*) o->ptr);
1906 }
1907
1908 static void incrRefCount(robj *o) {
1909 o->refcount++;
1910 #ifdef DEBUG_REFCOUNT
1911 if (o->type == REDIS_STRING)
1912 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1913 #endif
1914 }
1915
1916 static void decrRefCount(void *obj) {
1917 robj *o = obj;
1918
1919 #ifdef DEBUG_REFCOUNT
1920 if (o->type == REDIS_STRING)
1921 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1922 #endif
1923 if (--(o->refcount) == 0) {
1924 switch(o->type) {
1925 case REDIS_STRING: freeStringObject(o); break;
1926 case REDIS_LIST: freeListObject(o); break;
1927 case REDIS_SET: freeSetObject(o); break;
1928 case REDIS_ZSET: freeZsetObject(o); break;
1929 case REDIS_HASH: freeHashObject(o); break;
1930 default: assert(0 != 0); break;
1931 }
1932 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1933 !listAddNodeHead(server.objfreelist,o))
1934 zfree(o);
1935 }
1936 }
1937
1938 static robj *lookupKey(redisDb *db, robj *key) {
1939 dictEntry *de = dictFind(db->dict,key);
1940 return de ? dictGetEntryVal(de) : NULL;
1941 }
1942
1943 static robj *lookupKeyRead(redisDb *db, robj *key) {
1944 expireIfNeeded(db,key);
1945 return lookupKey(db,key);
1946 }
1947
1948 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1949 deleteIfVolatile(db,key);
1950 return lookupKey(db,key);
1951 }
1952
1953 static int deleteKey(redisDb *db, robj *key) {
1954 int retval;
1955
1956 /* We need to protect key from destruction: after the first dictDelete()
1957 * it may happen that 'key' is no longer valid if we don't increment
1958 * it's count. This may happen when we get the object reference directly
1959 * from the hash table with dictRandomKey() or dict iterators */
1960 incrRefCount(key);
1961 if (dictSize(db->expires)) dictDelete(db->expires,key);
1962 retval = dictDelete(db->dict,key);
1963 decrRefCount(key);
1964
1965 return retval == DICT_OK;
1966 }
1967
1968 /* Try to share an object against the shared objects pool */
1969 static robj *tryObjectSharing(robj *o) {
1970 struct dictEntry *de;
1971 unsigned long c;
1972
1973 if (o == NULL || server.shareobjects == 0) return o;
1974
1975 assert(o->type == REDIS_STRING);
1976 de = dictFind(server.sharingpool,o);
1977 if (de) {
1978 robj *shared = dictGetEntryKey(de);
1979
1980 c = ((unsigned long) dictGetEntryVal(de))+1;
1981 dictGetEntryVal(de) = (void*) c;
1982 incrRefCount(shared);
1983 decrRefCount(o);
1984 return shared;
1985 } else {
1986 /* Here we are using a stream algorihtm: Every time an object is
1987 * shared we increment its count, everytime there is a miss we
1988 * recrement the counter of a random object. If this object reaches
1989 * zero we remove the object and put the current object instead. */
1990 if (dictSize(server.sharingpool) >=
1991 server.sharingpoolsize) {
1992 de = dictGetRandomKey(server.sharingpool);
1993 assert(de != NULL);
1994 c = ((unsigned long) dictGetEntryVal(de))-1;
1995 dictGetEntryVal(de) = (void*) c;
1996 if (c == 0) {
1997 dictDelete(server.sharingpool,de->key);
1998 }
1999 } else {
2000 c = 0; /* If the pool is empty we want to add this object */
2001 }
2002 if (c == 0) {
2003 int retval;
2004
2005 retval = dictAdd(server.sharingpool,o,(void*)1);
2006 assert(retval == DICT_OK);
2007 incrRefCount(o);
2008 }
2009 return o;
2010 }
2011 }
2012
2013 /* Check if the nul-terminated string 's' can be represented by a long
2014 * (that is, is a number that fits into long without any other space or
2015 * character before or after the digits).
2016 *
2017 * If so, the function returns REDIS_OK and *longval is set to the value
2018 * of the number. Otherwise REDIS_ERR is returned */
2019 static int isStringRepresentableAsLong(sds s, long *longval) {
2020 char buf[32], *endptr;
2021 long value;
2022 int slen;
2023
2024 value = strtol(s, &endptr, 10);
2025 if (endptr[0] != '\0') return REDIS_ERR;
2026 slen = snprintf(buf,32,"%ld",value);
2027
2028 /* If the number converted back into a string is not identical
2029 * then it's not possible to encode the string as integer */
2030 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2031 if (longval) *longval = value;
2032 return REDIS_OK;
2033 }
2034
2035 /* Try to encode a string object in order to save space */
2036 static int tryObjectEncoding(robj *o) {
2037 long value;
2038 sds s = o->ptr;
2039
2040 if (o->encoding != REDIS_ENCODING_RAW)
2041 return REDIS_ERR; /* Already encoded */
2042
2043 /* It's not save to encode shared objects: shared objects can be shared
2044 * everywhere in the "object space" of Redis. Encoded objects can only
2045 * appear as "values" (and not, for instance, as keys) */
2046 if (o->refcount > 1) return REDIS_ERR;
2047
2048 /* Currently we try to encode only strings */
2049 assert(o->type == REDIS_STRING);
2050
2051 /* Check if we can represent this string as a long integer */
2052 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2053
2054 /* Ok, this object can be encoded */
2055 o->encoding = REDIS_ENCODING_INT;
2056 sdsfree(o->ptr);
2057 o->ptr = (void*) value;
2058 return REDIS_OK;
2059 }
2060
2061 /* Get a decoded version of an encoded object (returned as a new object) */
2062 static robj *getDecodedObject(const robj *o) {
2063 robj *dec;
2064
2065 assert(o->encoding != REDIS_ENCODING_RAW);
2066 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2067 char buf[32];
2068
2069 snprintf(buf,32,"%ld",(long)o->ptr);
2070 dec = createStringObject(buf,strlen(buf));
2071 return dec;
2072 } else {
2073 assert(1 != 1);
2074 }
2075 }
2076
2077 /* Compare two string objects via strcmp() or alike.
2078 * Note that the objects may be integer-encoded. In such a case we
2079 * use snprintf() to get a string representation of the numbers on the stack
2080 * and compare the strings, it's much faster than calling getDecodedObject(). */
2081 static int compareStringObjects(robj *a, robj *b) {
2082 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2083 char bufa[128], bufb[128], *astr, *bstr;
2084 int bothsds = 1;
2085
2086 if (a == b) return 0;
2087 if (a->encoding != REDIS_ENCODING_RAW) {
2088 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2089 astr = bufa;
2090 bothsds = 0;
2091 } else {
2092 astr = a->ptr;
2093 }
2094 if (b->encoding != REDIS_ENCODING_RAW) {
2095 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2096 bstr = bufb;
2097 bothsds = 0;
2098 } else {
2099 bstr = b->ptr;
2100 }
2101 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2102 }
2103
2104 static size_t stringObjectLen(robj *o) {
2105 assert(o->type == REDIS_STRING);
2106 if (o->encoding == REDIS_ENCODING_RAW) {
2107 return sdslen(o->ptr);
2108 } else {
2109 char buf[32];
2110
2111 return snprintf(buf,32,"%ld",(long)o->ptr);
2112 }
2113 }
2114
2115 /*============================ DB saving/loading ============================ */
2116
/* Write the single byte 'type' (an opcode or object type) to 'fp'.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 1) ? 0 : -1;
}
2121
/* Write 't' as a 32-bit signed integer in host byte order (the value is
 * narrowed to int32_t). Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t seconds = (int32_t) t;

    if (fwrite(&seconds,sizeof(seconds),1,fp) != 1) return -1;
    return 0;
}
2127
2128 /* check rdbLoadLen() comments for more info */
2129 static int rdbSaveLen(FILE *fp, uint32_t len) {
2130 unsigned char buf[2];
2131
2132 if (len < (1<<6)) {
2133 /* Save a 6 bit len */
2134 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2135 if (fwrite(buf,1,1,fp) == 0) return -1;
2136 } else if (len < (1<<14)) {
2137 /* Save a 14 bit len */
2138 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2139 buf[1] = len&0xFF;
2140 if (fwrite(buf,2,1,fp) == 0) return -1;
2141 } else {
2142 /* Save a 32 bit len */
2143 buf[0] = (REDIS_RDB_32BITLEN<<6);
2144 if (fwrite(buf,1,1,fp) == 0) return -1;
2145 len = htonl(len);
2146 if (fwrite(&len,4,1,fp) == 0) return -1;
2147 }
2148 return 0;
2149 }
2150
2151 /* String objects in the form "2391" "-100" without any space and with a
2152 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2153 * encoded as integers to save space */
2154 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2155 long long value;
2156 char *endptr, buf[32];
2157
2158 /* Check if it's possible to encode this value as a number */
2159 value = strtoll(s, &endptr, 10);
2160 if (endptr[0] != '\0') return 0;
2161 snprintf(buf,32,"%lld",value);
2162
2163 /* If the number converted back into a string is not identical
2164 * then it's not possible to encode the string as integer */
2165 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2166
2167 /* Finally check if it fits in our ranges */
2168 if (value >= -(1<<7) && value <= (1<<7)-1) {
2169 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2170 enc[1] = value&0xFF;
2171 return 2;
2172 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2173 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2174 enc[1] = value&0xFF;
2175 enc[2] = (value>>8)&0xFF;
2176 return 3;
2177 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2178 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2179 enc[1] = value&0xFF;
2180 enc[2] = (value>>8)&0xFF;
2181 enc[3] = (value>>16)&0xFF;
2182 enc[4] = (value>>24)&0xFF;
2183 return 5;
2184 } else {
2185 return 0;
2186 }
2187 }
2188
2189 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2190 unsigned int comprlen, outlen;
2191 unsigned char byte;
2192 void *out;
2193
2194 /* We require at least four bytes compression for this to be worth it */
2195 outlen = sdslen(obj->ptr)-4;
2196 if (outlen <= 0) return 0;
2197 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2198 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2199 if (comprlen == 0) {
2200 zfree(out);
2201 return 0;
2202 }
2203 /* Data compressed! Let's save it on disk */
2204 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2205 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2206 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2207 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2208 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2209 zfree(out);
2210 return comprlen;
2211
2212 writeerr:
2213 zfree(out);
2214 return -1;
2215 }
2216
2217 /* Save a string objet as [len][data] on disk. If the object is a string
2218 * representation of an integer value we try to safe it in a special form */
2219 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2220 size_t len;
2221 int enclen;
2222
2223 len = sdslen(obj->ptr);
2224
2225 /* Try integer encoding */
2226 if (len <= 11) {
2227 unsigned char buf[5];
2228 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2229 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2230 return 0;
2231 }
2232 }
2233
2234 /* Try LZF compression - under 20 bytes it's unable to compress even
2235 * aaaaaaaaaaaaaaaaaa so skip it */
2236 if (len > 20) {
2237 int retval;
2238
2239 retval = rdbSaveLzfStringObject(fp,obj);
2240 if (retval == -1) return -1;
2241 if (retval > 0) return 0;
2242 /* retval == 0 means data can't be compressed, save the old way */
2243 }
2244
2245 /* Store verbatim */
2246 if (rdbSaveLen(fp,len) == -1) return -1;
2247 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2248 return 0;
2249 }
2250
2251 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2252 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2253 int retval;
2254 robj *dec;
2255
2256 if (obj->encoding != REDIS_ENCODING_RAW) {
2257 dec = getDecodedObject(obj);
2258 retval = rdbSaveStringObjectRaw(fp,dec);
2259 decrRefCount(dec);
2260 return retval;
2261 } else {
2262 return rdbSaveStringObjectRaw(fp,obj);
2263 }
2264 }
2265
/* Save a double value. Doubles are saved as strings prefixed by an unsigned
 * 8 bit integer specifying the length of the representation.
 * This 8 bit integer has special values for the following conditions (no
 * text follows them):
 * 253: not a number
 * 254: + inf
 * 255: - inf
 * Finite values are printed with "%.17g", which is enough significant
 * digits for the text to parse back to exactly the same double.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len;

    if (isnan(val)) {
        buf[0] = 253;
        len = 1;
    } else if (!isfinite(val)) {
        buf[0] = (val < 0) ? 255 : 254;
        len = 1;
    } else {
        snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
        /* Measure the text after the length byte: buf[0] is not written yet,
         * so strlen(buf) would read an uninitialized byte. */
        buf[0] = strlen((char*)buf+1);
        len = buf[0]+1;
    }
    if (fwrite(buf,len,1,fp) == 0) return -1;
    return 0;
}
2292
2293 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2294 static int rdbSave(char *filename) {
2295 dictIterator *di = NULL;
2296 dictEntry *de;
2297 FILE *fp;
2298 char tmpfile[256];
2299 int j;
2300 time_t now = time(NULL);
2301
2302 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2303 fp = fopen(tmpfile,"w");
2304 if (!fp) {
2305 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2306 return REDIS_ERR;
2307 }
2308 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2309 for (j = 0; j < server.dbnum; j++) {
2310 redisDb *db = server.db+j;
2311 dict *d = db->dict;
2312 if (dictSize(d) == 0) continue;
2313 di = dictGetIterator(d);
2314 if (!di) {
2315 fclose(fp);
2316 return REDIS_ERR;
2317 }
2318
2319 /* Write the SELECT DB opcode */
2320 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2321 if (rdbSaveLen(fp,j) == -1) goto werr;
2322
2323 /* Iterate this DB writing every entry */
2324 while((de = dictNext(di)) != NULL) {
2325 robj *key = dictGetEntryKey(de);
2326 robj *o = dictGetEntryVal(de);
2327 time_t expiretime = getExpire(db,key);
2328
2329 /* Save the expire time */
2330 if (expiretime != -1) {
2331 /* If this key is already expired skip it */
2332 if (expiretime < now) continue;
2333 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2334 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2335 }
2336 /* Save the key and associated value */
2337 if (rdbSaveType(fp,o->type) == -1) goto werr;
2338 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2339 if (o->type == REDIS_STRING) {
2340 /* Save a string value */
2341 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2342 } else if (o->type == REDIS_LIST) {
2343 /* Save a list value */
2344 list *list = o->ptr;
2345 listNode *ln;
2346
2347 listRewind(list);
2348 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2349 while((ln = listYield(list))) {
2350 robj *eleobj = listNodeValue(ln);
2351
2352 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2353 }
2354 } else if (o->type == REDIS_SET) {
2355 /* Save a set value */
2356 dict *set = o->ptr;
2357 dictIterator *di = dictGetIterator(set);
2358 dictEntry *de;
2359
2360 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2361 while((de = dictNext(di)) != NULL) {
2362 robj *eleobj = dictGetEntryKey(de);
2363
2364 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2365 }
2366 dictReleaseIterator(di);
2367 } else if (o->type == REDIS_ZSET) {
2368 /* Save a set value */
2369 zset *zs = o->ptr;
2370 dictIterator *di = dictGetIterator(zs->dict);
2371 dictEntry *de;
2372
2373 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2374 while((de = dictNext(di)) != NULL) {
2375 robj *eleobj = dictGetEntryKey(de);
2376 double *score = dictGetEntryVal(de);
2377
2378 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2379 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2380 }
2381 dictReleaseIterator(di);
2382 } else {
2383 assert(0 != 0);
2384 }
2385 }
2386 dictReleaseIterator(di);
2387 }
2388 /* EOF opcode */
2389 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2390
2391 /* Make sure data will not remain on the OS's output buffers */
2392 fflush(fp);
2393 fsync(fileno(fp));
2394 fclose(fp);
2395
2396 /* Use RENAME to make sure the DB file is changed atomically only
2397 * if the generate DB file is ok. */
2398 if (rename(tmpfile,filename) == -1) {
2399 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2400 unlink(tmpfile);
2401 return REDIS_ERR;
2402 }
2403 redisLog(REDIS_NOTICE,"DB saved on disk");
2404 server.dirty = 0;
2405 server.lastsave = time(NULL);
2406 return REDIS_OK;
2407
2408 werr:
2409 fclose(fp);
2410 unlink(tmpfile);
2411 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2412 if (di) dictReleaseIterator(di);
2413 return REDIS_ERR;
2414 }
2415
2416 static int rdbSaveBackground(char *filename) {
2417 pid_t childpid;
2418
2419 if (server.bgsaveinprogress) return REDIS_ERR;
2420 if ((childpid = fork()) == 0) {
2421 /* Child */
2422 close(server.fd);
2423 if (rdbSave(filename) == REDIS_OK) {
2424 exit(0);
2425 } else {
2426 exit(1);
2427 }
2428 } else {
2429 /* Parent */
2430 if (childpid == -1) {
2431 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2432 strerror(errno));
2433 return REDIS_ERR;
2434 }
2435 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2436 server.bgsaveinprogress = 1;
2437 server.bgsavechildpid = childpid;
2438 return REDIS_OK;
2439 }
2440 return REDIS_OK; /* unreached */
2441 }
2442
/* Remove the temporary dump "temp-<childpid>.rdb" left behind by a saving
 * child (the same name rdbSave() uses in that process). Errors are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char tmpfile[256];

    snprintf(tmpfile,sizeof(tmpfile),"temp-%d.rdb", (int) childpid);
    unlink(tmpfile);
}
2449
/* Read a single opcode/type byte from 'fp'.
 * Returns it as a value in 0..255, or -1 on EOF or read error. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;

    if (fread(&type,1,1,fp) != 1) return -1;
    return type;
}
2455
/* Read a time saved by rdbSaveTime(): a 32-bit signed integer in host byte
 * order. Returns it as a time_t, or -1 on EOF or read error. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t seconds;

    if (fread(&seconds,sizeof(seconds),1,fp) != 1) return -1;
    return (time_t) seconds;
}
2461
2462 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2463 * of this file for a description of how this are stored on disk.
2464 *
2465 * isencoded is set to 1 if the readed length is not actually a length but
2466 * an "encoding type", check the above comments for more info */
2467 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2468 unsigned char buf[2];
2469 uint32_t len;
2470
2471 if (isencoded) *isencoded = 0;
2472 if (rdbver == 0) {
2473 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2474 return ntohl(len);
2475 } else {
2476 int type;
2477
2478 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2479 type = (buf[0]&0xC0)>>6;
2480 if (type == REDIS_RDB_6BITLEN) {
2481 /* Read a 6 bit len */
2482 return buf[0]&0x3F;
2483 } else if (type == REDIS_RDB_ENCVAL) {
2484 /* Read a 6 bit len encoding type */
2485 if (isencoded) *isencoded = 1;
2486 return buf[0]&0x3F;
2487 } else if (type == REDIS_RDB_14BITLEN) {
2488 /* Read a 14 bit len */
2489 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2490 return ((buf[0]&0x3F)<<8)|buf[1];
2491 } else {
2492 /* Read a 32 bit len */
2493 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2494 return ntohl(len);
2495 }
2496 }
2497 }
2498
2499 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2500 unsigned char enc[4];
2501 long long val;
2502
2503 if (enctype == REDIS_RDB_ENC_INT8) {
2504 if (fread(enc,1,1,fp) == 0) return NULL;
2505 val = (signed char)enc[0];
2506 } else if (enctype == REDIS_RDB_ENC_INT16) {
2507 uint16_t v;
2508 if (fread(enc,2,1,fp) == 0) return NULL;
2509 v = enc[0]|(enc[1]<<8);
2510 val = (int16_t)v;
2511 } else if (enctype == REDIS_RDB_ENC_INT32) {
2512 uint32_t v;
2513 if (fread(enc,4,1,fp) == 0) return NULL;
2514 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2515 val = (int32_t)v;
2516 } else {
2517 val = 0; /* anti-warning */
2518 assert(0!=0);
2519 }
2520 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2521 }
2522
2523 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2524 unsigned int len, clen;
2525 unsigned char *c = NULL;
2526 sds val = NULL;
2527
2528 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2529 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2530 if ((c = zmalloc(clen)) == NULL) goto err;
2531 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2532 if (fread(c,clen,1,fp) == 0) goto err;
2533 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2534 zfree(c);
2535 return createObject(REDIS_STRING,val);
2536 err:
2537 zfree(c);
2538 sdsfree(val);
2539 return NULL;
2540 }
2541
2542 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2543 int isencoded;
2544 uint32_t len;
2545 sds val;
2546
2547 len = rdbLoadLen(fp,rdbver,&isencoded);
2548 if (isencoded) {
2549 switch(len) {
2550 case REDIS_RDB_ENC_INT8:
2551 case REDIS_RDB_ENC_INT16:
2552 case REDIS_RDB_ENC_INT32:
2553 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2554 case REDIS_RDB_ENC_LZF:
2555 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2556 default:
2557 assert(0!=0);
2558 }
2559 }
2560
2561 if (len == REDIS_RDB_LENERR) return NULL;
2562 val = sdsnewlen(NULL,len);
2563 if (len && fread(val,len,1,fp) == 0) {
2564 sdsfree(val);
2565 return NULL;
2566 }
2567 return tryObjectSharing(createObject(REDIS_STRING,val));
2568 }
2569
2570 /* For information about double serialization check rdbSaveDoubleValue() */
2571 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2572 char buf[128];
2573 unsigned char len;
2574
2575 if (fread(&len,1,1,fp) == 0) return -1;
2576 switch(len) {
2577 case 255: *val = R_NegInf; return 0;
2578 case 254: *val = R_PosInf; return 0;
2579 case 253: *val = R_Nan; return 0;
2580 default:
2581 if (fread(buf,len,1,fp) == 0) return -1;
2582 sscanf(buf, "%lg", val);
2583 return 0;
2584 }
2585 }
2586
2587 static int rdbLoad(char *filename) {
2588 FILE *fp;
2589 robj *keyobj = NULL;
2590 uint32_t dbid;
2591 int type, retval, rdbver;
2592 dict *d = server.db[0].dict;
2593 redisDb *db = server.db+0;
2594 char buf[1024];
2595 time_t expiretime = -1, now = time(NULL);
2596
2597 fp = fopen(filename,"r");
2598 if (!fp) return REDIS_ERR;
2599 if (fread(buf,9,1,fp) == 0) goto eoferr;
2600 buf[9] = '\0';
2601 if (memcmp(buf,"REDIS",5) != 0) {
2602 fclose(fp);
2603 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2604 return REDIS_ERR;
2605 }
2606 rdbver = atoi(buf+5);
2607 if (rdbver > 1) {
2608 fclose(fp);
2609 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2610 return REDIS_ERR;
2611 }
2612 while(1) {
2613 robj *o;
2614
2615 /* Read type. */
2616 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2617 if (type == REDIS_EXPIRETIME) {
2618 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2619 /* We read the time so we need to read the object type again */
2620 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2621 }
2622 if (type == REDIS_EOF) break;
2623 /* Handle SELECT DB opcode as a special case */
2624 if (type == REDIS_SELECTDB) {
2625 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2626 goto eoferr;
2627 if (dbid >= (unsigned)server.dbnum) {
2628 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2629 exit(1);
2630 }
2631 db = server.db+dbid;
2632 d = db->dict;
2633 continue;
2634 }
2635 /* Read key */
2636 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2637
2638 if (type == REDIS_STRING) {
2639 /* Read string value */
2640 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2641 tryObjectEncoding(o);
2642 } else if (type == REDIS_LIST || type == REDIS_SET) {
2643 /* Read list/set value */
2644 uint32_t listlen;
2645
2646 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2647 goto eoferr;
2648 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2649 /* Load every single element of the list/set */
2650 while(listlen--) {
2651 robj *ele;
2652
2653 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2654 tryObjectEncoding(ele);
2655 if (type == REDIS_LIST) {
2656 listAddNodeTail((list*)o->ptr,ele);
2657 } else {
2658 dictAdd((dict*)o->ptr,ele,NULL);
2659 }
2660 }
2661 } else if (type == REDIS_ZSET) {
2662 /* Read list/set value */
2663 uint32_t zsetlen;
2664 zset *zs;
2665
2666 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2667 goto eoferr;
2668 o = createZsetObject();
2669 zs = o->ptr;
2670 /* Load every single element of the list/set */
2671 while(zsetlen--) {
2672 robj *ele;
2673 double *score = zmalloc(sizeof(double));
2674
2675 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2676 tryObjectEncoding(ele);
2677 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2678 dictAdd(zs->dict,ele,score);
2679 zslInsert(zs->zsl,*score,ele);
2680 incrRefCount(ele); /* added to skiplist */
2681 }
2682 } else {
2683 assert(0 != 0);
2684 }
2685 /* Add the new object in the hash table */
2686 retval = dictAdd(d,keyobj,o);
2687 if (retval == DICT_ERR) {
2688 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2689 exit(1);
2690 }
2691 /* Set the expire time if needed */
2692 if (expiretime != -1) {
2693 setExpire(db,keyobj,expiretime);
2694 /* Delete this key if already expired */
2695 if (expiretime < now) deleteKey(db,keyobj);
2696 expiretime = -1;
2697 }
2698 keyobj = o = NULL;
2699 }
2700 fclose(fp);
2701 return REDIS_OK;
2702
2703 eoferr: /* unexpected end of file is handled here with a fatal exit */
2704 if (keyobj) decrRefCount(keyobj);
2705 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2706 exit(1);
2707 return REDIS_ERR; /* Just to avoid warning */
2708 }
2709
2710 /*================================== Commands =============================== */
2711
2712 static void authCommand(redisClient *c) {
2713 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2714 c->authenticated = 1;
2715 addReply(c,shared.ok);
2716 } else {
2717 c->authenticated = 0;
2718 addReply(c,shared.err);
2719 }
2720 }
2721
2722 static void pingCommand(redisClient *c) {
2723 addReply(c,shared.pong);
2724 }
2725
2726 static void echoCommand(redisClient *c) {
2727 addReplyBulkLen(c,c->argv[1]);
2728 addReply(c,c->argv[1]);
2729 addReply(c,shared.crlf);
2730 }
2731
2732 /*=================================== Strings =============================== */
2733
2734 static void setGenericCommand(redisClient *c, int nx) {
2735 int retval;
2736
2737 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2738 if (retval == DICT_ERR) {
2739 if (!nx) {
2740 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2741 incrRefCount(c->argv[2]);
2742 } else {
2743 addReply(c,shared.czero);
2744 return;
2745 }
2746 } else {
2747 incrRefCount(c->argv[1]);
2748 incrRefCount(c->argv[2]);
2749 }
2750 server.dirty++;
2751 removeExpire(c->db,c->argv[1]);
2752 addReply(c, nx ? shared.cone : shared.ok);
2753 }
2754
2755 static void setCommand(redisClient *c) {
2756 setGenericCommand(c,0);
2757 }
2758
2759 static void setnxCommand(redisClient *c) {
2760 setGenericCommand(c,1);
2761 }
2762
2763 static void getCommand(redisClient *c) {
2764 robj *o = lookupKeyRead(c->db,c->argv[1]);
2765
2766 if (o == NULL) {
2767 addReply(c,shared.nullbulk);
2768 } else {
2769 if (o->type != REDIS_STRING) {
2770 addReply(c,shared.wrongtypeerr);
2771 } else {
2772 addReplyBulkLen(c,o);
2773 addReply(c,o);
2774 addReply(c,shared.crlf);
2775 }
2776 }
2777 }
2778
2779 static void getsetCommand(redisClient *c) {
2780 getCommand(c);
2781 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2782 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2783 } else {
2784 incrRefCount(c->argv[1]);
2785 }
2786 incrRefCount(c->argv[2]);
2787 server.dirty++;
2788 removeExpire(c->db,c->argv[1]);
2789 }
2790
2791 static void mgetCommand(redisClient *c) {
2792 int j;
2793
2794 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2795 for (j = 1; j < c->argc; j++) {
2796 robj *o = lookupKeyRead(c->db,c->argv[j]);
2797 if (o == NULL) {
2798 addReply(c,shared.nullbulk);
2799 } else {
2800 if (o->type != REDIS_STRING) {
2801 addReply(c,shared.nullbulk);
2802 } else {
2803 addReplyBulkLen(c,o);
2804 addReply(c,o);
2805 addReply(c,shared.crlf);
2806 }
2807 }
2808 }
2809 }
2810
2811 static void incrDecrCommand(redisClient *c, long long incr) {
2812 long long value;
2813 int retval;
2814 robj *o;
2815
2816 o = lookupKeyWrite(c->db,c->argv[1]);
2817 if (o == NULL) {
2818 value = 0;
2819 } else {
2820 if (o->type != REDIS_STRING) {
2821 value = 0;
2822 } else {
2823 char *eptr;
2824
2825 if (o->encoding == REDIS_ENCODING_RAW)
2826 value = strtoll(o->ptr, &eptr, 10);
2827 else if (o->encoding == REDIS_ENCODING_INT)
2828 value = (long)o->ptr;
2829 else
2830 assert(1 != 1);
2831 }
2832 }
2833
2834 value += incr;
2835 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2836 tryObjectEncoding(o);
2837 retval = dictAdd(c->db->dict,c->argv[1],o);
2838 if (retval == DICT_ERR) {
2839 dictReplace(c->db->dict,c->argv[1],o);
2840 removeExpire(c->db,c->argv[1]);
2841 } else {
2842 incrRefCount(c->argv[1]);
2843 }
2844 server.dirty++;
2845 addReply(c,shared.colon);
2846 addReply(c,o);
2847 addReply(c,shared.crlf);
2848 }
2849
2850 static void incrCommand(redisClient *c) {
2851 incrDecrCommand(c,1);
2852 }
2853
2854 static void decrCommand(redisClient *c) {
2855 incrDecrCommand(c,-1);
2856 }
2857
2858 static void incrbyCommand(redisClient *c) {
2859 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2860 incrDecrCommand(c,incr);
2861 }
2862
2863 static void decrbyCommand(redisClient *c) {
2864 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2865 incrDecrCommand(c,-incr);
2866 }
2867
2868 /* ========================= Type agnostic commands ========================= */
2869
2870 static void delCommand(redisClient *c) {
2871 int deleted = 0, j;
2872
2873 for (j = 1; j < c->argc; j++) {
2874 if (deleteKey(c->db,c->argv[j])) {
2875 server.dirty++;
2876 deleted++;
2877 }
2878 }
2879 switch(deleted) {
2880 case 0:
2881 addReply(c,shared.czero);
2882 break;
2883 case 1:
2884 addReply(c,shared.cone);
2885 break;
2886 default:
2887 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2888 break;
2889 }
2890 }
2891
2892 static void existsCommand(redisClient *c) {
2893 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2894 }
2895
2896 static void selectCommand(redisClient *c) {
2897 int id = atoi(c->argv[1]->ptr);
2898
2899 if (selectDb(c,id) == REDIS_ERR) {
2900 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2901 } else {
2902 addReply(c,shared.ok);
2903 }
2904 }
2905
2906 static void randomkeyCommand(redisClient *c) {
2907 dictEntry *de;
2908
2909 while(1) {
2910 de = dictGetRandomKey(c->db->dict);
2911 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2912 }
2913 if (de == NULL) {
2914 addReply(c,shared.plus);
2915 addReply(c,shared.crlf);
2916 } else {
2917 addReply(c,shared.plus);
2918 addReply(c,dictGetEntryKey(de));
2919 addReply(c,shared.crlf);
2920 }
2921 }
2922
2923 static void keysCommand(redisClient *c) {
2924 dictIterator *di;
2925 dictEntry *de;
2926 sds pattern = c->argv[1]->ptr;
2927 int plen = sdslen(pattern);
2928 int numkeys = 0, keyslen = 0;
2929 robj *lenobj = createObject(REDIS_STRING,NULL);
2930
2931 di = dictGetIterator(c->db->dict);
2932 addReply(c,lenobj);
2933 decrRefCount(lenobj);
2934 while((de = dictNext(di)) != NULL) {
2935 robj *keyobj = dictGetEntryKey(de);
2936
2937 sds key = keyobj->ptr;
2938 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2939 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2940 if (expireIfNeeded(c->db,keyobj) == 0) {
2941 if (numkeys != 0)
2942 addReply(c,shared.space);
2943 addReply(c,keyobj);
2944 numkeys++;
2945 keyslen += sdslen(key);
2946 }
2947 }
2948 }
2949 dictReleaseIterator(di);
2950 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2951 addReply(c,shared.crlf);
2952 }
2953
2954 static void dbsizeCommand(redisClient *c) {
2955 addReplySds(c,
2956 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2957 }
2958
2959 static void lastsaveCommand(redisClient *c) {
2960 addReplySds(c,
2961 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2962 }
2963
2964 static void typeCommand(redisClient *c) {
2965 robj *o;
2966 char *type;
2967
2968 o = lookupKeyRead(c->db,c->argv[1]);
2969 if (o == NULL) {
2970 type = "+none";
2971 } else {
2972 switch(o->type) {
2973 case REDIS_STRING: type = "+string"; break;
2974 case REDIS_LIST: type = "+list"; break;
2975 case REDIS_SET: type = "+set"; break;
2976 default: type = "unknown"; break;
2977 }
2978 }
2979 addReplySds(c,sdsnew(type));
2980 addReply(c,shared.crlf);
2981 }
2982
2983 static void saveCommand(redisClient *c) {
2984 if (server.bgsaveinprogress) {
2985 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2986 return;
2987 }
2988 if (rdbSave(server.dbfilename) == REDIS_OK) {
2989 addReply(c,shared.ok);
2990 } else {
2991 addReply(c,shared.err);
2992 }
2993 }
2994
2995 static void bgsaveCommand(redisClient *c) {
2996 if (server.bgsaveinprogress) {
2997 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2998 return;
2999 }
3000 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3001 addReply(c,shared.ok);
3002 } else {
3003 addReply(c,shared.err);
3004 }
3005 }
3006
3007 static void shutdownCommand(redisClient *c) {
3008 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3009 /* Kill the saving child if there is a background saving in progress.
3010 We want to avoid race conditions, for instance our saving child may
3011 overwrite the synchronous saving did by SHUTDOWN. */
3012 if (server.bgsaveinprogress) {
3013 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3014 kill(server.bgsavechildpid,SIGKILL);
3015 rdbRemoveTempFile(server.bgsavechildpid);
3016 }
3017 /* SYNC SAVE */
3018 if (rdbSave(server.dbfilename) == REDIS_OK) {
3019 if (server.daemonize)
3020 unlink(server.pidfile);
3021 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3022 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3023 exit(1);
3024 } else {
3025 /* Ooops.. error saving! The best we can do is to continue operating.
3026 * Note that if there was a background saving process, in the next
3027 * cron() Redis will be notified that the background saving aborted,
3028 * handling special stuff like slaves pending for synchronization... */
3029 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3030 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3031 }
3032 }
3033
3034 static void renameGenericCommand(redisClient *c, int nx) {
3035 robj *o;
3036
3037 /* To use the same key as src and dst is probably an error */
3038 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3039 addReply(c,shared.sameobjecterr);
3040 return;
3041 }
3042
3043 o = lookupKeyWrite(c->db,c->argv[1]);
3044 if (o == NULL) {
3045 addReply(c,shared.nokeyerr);
3046 return;
3047 }
3048 incrRefCount(o);
3049 deleteIfVolatile(c->db,c->argv[2]);
3050 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3051 if (nx) {
3052 decrRefCount(o);
3053 addReply(c,shared.czero);
3054 return;
3055 }
3056 dictReplace(c->db->dict,c->argv[2],o);
3057 } else {
3058 incrRefCount(c->argv[2]);
3059 }
3060 deleteKey(c->db,c->argv[1]);
3061 server.dirty++;
3062 addReply(c,nx ? shared.cone : shared.ok);
3063 }
3064
3065 static void renameCommand(redisClient *c) {
3066 renameGenericCommand(c,0);
3067 }
3068
3069 static void renamenxCommand(redisClient *c) {
3070 renameGenericCommand(c,1);
3071 }
3072
3073 static void moveCommand(redisClient *c) {
3074 robj *o;
3075 redisDb *src, *dst;
3076 int srcid;
3077
3078 /* Obtain source and target DB pointers */
3079 src = c->db;
3080 srcid = c->db->id;
3081 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3082 addReply(c,shared.outofrangeerr);
3083 return;
3084 }
3085 dst = c->db;
3086 selectDb(c,srcid); /* Back to the source DB */
3087
3088 /* If the user is moving using as target the same
3089 * DB as the source DB it is probably an error. */
3090 if (src == dst) {
3091 addReply(c,shared.sameobjecterr);
3092 return;
3093 }
3094
3095 /* Check if the element exists and get a reference */
3096 o = lookupKeyWrite(c->db,c->argv[1]);
3097 if (!o) {
3098 addReply(c,shared.czero);
3099 return;
3100 }
3101
3102 /* Try to add the element to the target DB */
3103 deleteIfVolatile(dst,c->argv[1]);
3104 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3105 addReply(c,shared.czero);
3106 return;
3107 }
3108 incrRefCount(c->argv[1]);
3109 incrRefCount(o);
3110
3111 /* OK! key moved, free the entry in the source DB */
3112 deleteKey(src,c->argv[1]);
3113 server.dirty++;
3114 addReply(c,shared.cone);
3115 }
3116
3117 /* =================================== Lists ================================ */
3118 static void pushGenericCommand(redisClient *c, int where) {
3119 robj *lobj;
3120 list *list;
3121
3122 lobj = lookupKeyWrite(c->db,c->argv[1]);
3123 if (lobj == NULL) {
3124 lobj = createListObject();
3125 list = lobj->ptr;
3126 if (where == REDIS_HEAD) {
3127 listAddNodeHead(list,c->argv[2]);
3128 } else {
3129 listAddNodeTail(list,c->argv[2]);
3130 }
3131 dictAdd(c->db->dict,c->argv[1],lobj);
3132 incrRefCount(c->argv[1]);
3133 incrRefCount(c->argv[2]);
3134 } else {
3135 if (lobj->type != REDIS_LIST) {
3136 addReply(c,shared.wrongtypeerr);
3137 return;
3138 }
3139 list = lobj->ptr;
3140 if (where == REDIS_HEAD) {
3141 listAddNodeHead(list,c->argv[2]);
3142 } else {
3143 listAddNodeTail(list,c->argv[2]);
3144 }
3145 incrRefCount(c->argv[2]);
3146 }
3147 server.dirty++;
3148 addReply(c,shared.ok);
3149 }
3150
3151 static void lpushCommand(redisClient *c) {
3152 pushGenericCommand(c,REDIS_HEAD);
3153 }
3154
3155 static void rpushCommand(redisClient *c) {
3156 pushGenericCommand(c,REDIS_TAIL);
3157 }
3158
3159 static void llenCommand(redisClient *c) {
3160 robj *o;
3161 list *l;
3162
3163 o = lookupKeyRead(c->db,c->argv[1]);
3164 if (o == NULL) {
3165 addReply(c,shared.czero);
3166 return;
3167 } else {
3168 if (o->type != REDIS_LIST) {
3169 addReply(c,shared.wrongtypeerr);
3170 } else {
3171 l = o->ptr;
3172 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3173 }
3174 }
3175 }
3176
3177 static void lindexCommand(redisClient *c) {
3178 robj *o;
3179 int index = atoi(c->argv[2]->ptr);
3180
3181 o = lookupKeyRead(c->db,c->argv[1]);
3182 if (o == NULL) {
3183 addReply(c,shared.nullbulk);
3184 } else {
3185 if (o->type != REDIS_LIST) {
3186 addReply(c,shared.wrongtypeerr);
3187 } else {
3188 list *list = o->ptr;
3189 listNode *ln;
3190
3191 ln = listIndex(list, index);
3192 if (ln == NULL) {
3193 addReply(c,shared.nullbulk);
3194 } else {
3195 robj *ele = listNodeValue(ln);
3196 addReplyBulkLen(c,ele);
3197 addReply(c,ele);
3198 addReply(c,shared.crlf);
3199 }
3200 }
3201 }
3202 }
3203
3204 static void lsetCommand(redisClient *c) {
3205 robj *o;
3206 int index = atoi(c->argv[2]->ptr);
3207
3208 o = lookupKeyWrite(c->db,c->argv[1]);
3209 if (o == NULL) {
3210 addReply(c,shared.nokeyerr);
3211 } else {
3212 if (o->type != REDIS_LIST) {
3213 addReply(c,shared.wrongtypeerr);
3214 } else {
3215 list *list = o->ptr;
3216 listNode *ln;
3217
3218 ln = listIndex(list, index);
3219 if (ln == NULL) {
3220 addReply(c,shared.outofrangeerr);
3221 } else {
3222 robj *ele = listNodeValue(ln);
3223
3224 decrRefCount(ele);
3225 listNodeValue(ln) = c->argv[3];
3226 incrRefCount(c->argv[3]);
3227 addReply(c,shared.ok);
3228 server.dirty++;
3229 }
3230 }
3231 }
3232 }
3233
3234 static void popGenericCommand(redisClient *c, int where) {
3235 robj *o;
3236
3237 o = lookupKeyWrite(c->db,c->argv[1]);
3238 if (o == NULL) {
3239 addReply(c,shared.nullbulk);
3240 } else {
3241 if (o->type != REDIS_LIST) {
3242 addReply(c,shared.wrongtypeerr);
3243 } else {
3244 list *list = o->ptr;
3245 listNode *ln;
3246
3247 if (where == REDIS_HEAD)
3248 ln = listFirst(list);
3249 else
3250 ln = listLast(list);
3251
3252 if (ln == NULL) {
3253 addReply(c,shared.nullbulk);
3254 } else {
3255 robj *ele = listNodeValue(ln);
3256 addReplyBulkLen(c,ele);
3257 addReply(c,ele);
3258 addReply(c,shared.crlf);
3259 listDelNode(list,ln);
3260 server.dirty++;
3261 }
3262 }
3263 }
3264 }
3265
3266 static void lpopCommand(redisClient *c) {
3267 popGenericCommand(c,REDIS_HEAD);
3268 }
3269
3270 static void rpopCommand(redisClient *c) {
3271 popGenericCommand(c,REDIS_TAIL);
3272 }
3273
3274 static void lrangeCommand(redisClient *c) {
3275 robj *o;
3276 int start = atoi(c->argv[2]->ptr);
3277 int end = atoi(c->argv[3]->ptr);
3278
3279 o = lookupKeyRead(c->db,c->argv[1]);
3280 if (o == NULL) {
3281 addReply(c,shared.nullmultibulk);
3282 } else {
3283 if (o->type != REDIS_LIST) {
3284 addReply(c,shared.wrongtypeerr);
3285 } else {
3286 list *list = o->ptr;
3287 listNode *ln;
3288 int llen = listLength(list);
3289 int rangelen, j;
3290 robj *ele;
3291
3292 /* convert negative indexes */
3293 if (start < 0) start = llen+start;
3294 if (end < 0) end = llen+end;
3295 if (start < 0) start = 0;
3296 if (end < 0) end = 0;
3297
3298 /* indexes sanity checks */
3299 if (start > end || start >= llen) {
3300 /* Out of range start or start > end result in empty list */
3301 addReply(c,shared.emptymultibulk);
3302 return;
3303 }
3304 if (end >= llen) end = llen-1;
3305 rangelen = (end-start)+1;
3306
3307 /* Return the result in form of a multi-bulk reply */
3308 ln = listIndex(list, start);
3309 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3310 for (j = 0; j < rangelen; j++) {
3311 ele = listNodeValue(ln);
3312 addReplyBulkLen(c,ele);
3313 addReply(c,ele);
3314 addReply(c,shared.crlf);
3315 ln = ln->next;
3316 }
3317 }
3318 }
3319 }
3320
3321 static void ltrimCommand(redisClient *c) {
3322 robj *o;
3323 int start = atoi(c->argv[2]->ptr);
3324 int end = atoi(c->argv[3]->ptr);
3325
3326 o = lookupKeyWrite(c->db,c->argv[1]);
3327 if (o == NULL) {
3328 addReply(c,shared.nokeyerr);
3329 } else {
3330 if (o->type != REDIS_LIST) {
3331 addReply(c,shared.wrongtypeerr);
3332 } else {
3333 list *list = o->ptr;
3334 listNode *ln;
3335 int llen = listLength(list);
3336 int j, ltrim, rtrim;
3337
3338 /* convert negative indexes */
3339 if (start < 0) start = llen+start;
3340 if (end < 0) end = llen+end;
3341 if (start < 0) start = 0;
3342 if (end < 0) end = 0;
3343
3344 /* indexes sanity checks */
3345 if (start > end || start >= llen) {
3346 /* Out of range start or start > end result in empty list */
3347 ltrim = llen;
3348 rtrim = 0;
3349 } else {
3350 if (end >= llen) end = llen-1;
3351 ltrim = start;
3352 rtrim = llen-end-1;
3353 }
3354
3355 /* Remove list elements to perform the trim */
3356 for (j = 0; j < ltrim; j++) {
3357 ln = listFirst(list);
3358 listDelNode(list,ln);
3359 }
3360 for (j = 0; j < rtrim; j++) {
3361 ln = listLast(list);
3362 listDelNode(list,ln);
3363 }
3364 server.dirty++;
3365 addReply(c,shared.ok);
3366 }
3367 }
3368 }
3369
3370 static void lremCommand(redisClient *c) {
3371 robj *o;
3372
3373 o = lookupKeyWrite(c->db,c->argv[1]);
3374 if (o == NULL) {
3375 addReply(c,shared.czero);
3376 } else {
3377 if (o->type != REDIS_LIST) {
3378 addReply(c,shared.wrongtypeerr);
3379 } else {
3380 list *list = o->ptr;
3381 listNode *ln, *next;
3382 int toremove = atoi(c->argv[2]->ptr);
3383 int removed = 0;
3384 int fromtail = 0;
3385
3386 if (toremove < 0) {
3387 toremove = -toremove;
3388 fromtail = 1;
3389 }
3390 ln = fromtail ? list->tail : list->head;
3391 while (ln) {
3392 robj *ele = listNodeValue(ln);
3393
3394 next = fromtail ? ln->prev : ln->next;
3395 if (compareStringObjects(ele,c->argv[3]) == 0) {
3396 listDelNode(list,ln);
3397 server.dirty++;
3398 removed++;
3399 if (toremove && removed == toremove) break;
3400 }
3401 ln = next;
3402 }
3403 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3404 }
3405 }
3406 }
3407
3408 /* ==================================== Sets ================================ */
3409
3410 static void saddCommand(redisClient *c) {
3411 robj *set;
3412
3413 set = lookupKeyWrite(c->db,c->argv[1]);
3414 if (set == NULL) {
3415 set = createSetObject();
3416 dictAdd(c->db->dict,c->argv[1],set);
3417 incrRefCount(c->argv[1]);
3418 } else {
3419 if (set->type != REDIS_SET) {
3420 addReply(c,shared.wrongtypeerr);
3421 return;
3422 }
3423 }
3424 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3425 incrRefCount(c->argv[2]);
3426 server.dirty++;
3427 addReply(c,shared.cone);
3428 } else {
3429 addReply(c,shared.czero);
3430 }
3431 }
3432
3433 static void sremCommand(redisClient *c) {
3434 robj *set;
3435
3436 set = lookupKeyWrite(c->db,c->argv[1]);
3437 if (set == NULL) {
3438 addReply(c,shared.czero);
3439 } else {
3440 if (set->type != REDIS_SET) {
3441 addReply(c,shared.wrongtypeerr);
3442 return;
3443 }
3444 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3445 server.dirty++;
3446 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3447 addReply(c,shared.cone);
3448 } else {
3449 addReply(c,shared.czero);
3450 }
3451 }
3452 }
3453
3454 static void smoveCommand(redisClient *c) {
3455 robj *srcset, *dstset;
3456
3457 srcset = lookupKeyWrite(c->db,c->argv[1]);
3458 dstset = lookupKeyWrite(c->db,c->argv[2]);
3459
3460 /* If the source key does not exist return 0, if it's of the wrong type
3461 * raise an error */
3462 if (srcset == NULL || srcset->type != REDIS_SET) {
3463 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3464 return;
3465 }
3466 /* Error if the destination key is not a set as well */
3467 if (dstset && dstset->type != REDIS_SET) {
3468 addReply(c,shared.wrongtypeerr);
3469 return;
3470 }
3471 /* Remove the element from the source set */
3472 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3473 /* Key not found in the src set! return zero */
3474 addReply(c,shared.czero);
3475 return;
3476 }
3477 server.dirty++;
3478 /* Add the element to the destination set */
3479 if (!dstset) {
3480 dstset = createSetObject();
3481 dictAdd(c->db->dict,c->argv[2],dstset);
3482 incrRefCount(c->argv[2]);
3483 }
3484 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3485 incrRefCount(c->argv[3]);
3486 addReply(c,shared.cone);
3487 }
3488
3489 static void sismemberCommand(redisClient *c) {
3490 robj *set;
3491
3492 set = lookupKeyRead(c->db,c->argv[1]);
3493 if (set == NULL) {
3494 addReply(c,shared.czero);
3495 } else {
3496 if (set->type != REDIS_SET) {
3497 addReply(c,shared.wrongtypeerr);
3498 return;
3499 }
3500 if (dictFind(set->ptr,c->argv[2]))
3501 addReply(c,shared.cone);
3502 else
3503 addReply(c,shared.czero);
3504 }
3505 }
3506
3507 static void scardCommand(redisClient *c) {
3508 robj *o;
3509 dict *s;
3510
3511 o = lookupKeyRead(c->db,c->argv[1]);
3512 if (o == NULL) {
3513 addReply(c,shared.czero);
3514 return;
3515 } else {
3516 if (o->type != REDIS_SET) {
3517 addReply(c,shared.wrongtypeerr);
3518 } else {
3519 s = o->ptr;
3520 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3521 dictSize(s)));
3522 }
3523 }
3524 }
3525
3526 static void spopCommand(redisClient *c) {
3527 robj *set;
3528 dictEntry *de;
3529
3530 set = lookupKeyWrite(c->db,c->argv[1]);
3531 if (set == NULL) {
3532 addReply(c,shared.nullbulk);
3533 } else {
3534 if (set->type != REDIS_SET) {
3535 addReply(c,shared.wrongtypeerr);
3536 return;
3537 }
3538 de = dictGetRandomKey(set->ptr);
3539 if (de == NULL) {
3540 addReply(c,shared.nullbulk);
3541 } else {
3542 robj *ele = dictGetEntryKey(de);
3543
3544 addReplyBulkLen(c,ele);
3545 addReply(c,ele);
3546 addReply(c,shared.crlf);
3547 dictDelete(set->ptr,ele);
3548 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3549 server.dirty++;
3550 }
3551 }
3552 }
3553
3554 static void srandmemberCommand(redisClient *c) {
3555 robj *set;
3556 dictEntry *de;
3557
3558 set = lookupKeyRead(c->db,c->argv[1]);
3559 if (set == NULL) {
3560 addReply(c,shared.nullbulk);
3561 } else {
3562 if (set->type != REDIS_SET) {
3563 addReply(c,shared.wrongtypeerr);
3564 return;
3565 }
3566 de = dictGetRandomKey(set->ptr);
3567 if (de == NULL) {
3568 addReply(c,shared.nullbulk);
3569 } else {
3570 robj *ele = dictGetEntryKey(de);
3571
3572 addReplyBulkLen(c,ele);
3573 addReply(c,ele);
3574 addReply(c,shared.crlf);
3575 }
3576 }
3577 }
3578
3579 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3580 dict **d1 = (void*) s1, **d2 = (void*) s2;
3581
3582 return dictSize(*d1)-dictSize(*d2);
3583 }
3584
3585 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3586 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3587 dictIterator *di;
3588 dictEntry *de;
3589 robj *lenobj = NULL, *dstset = NULL;
3590 int j, cardinality = 0;
3591
3592 for (j = 0; j < setsnum; j++) {
3593 robj *setobj;
3594
3595 setobj = dstkey ?
3596 lookupKeyWrite(c->db,setskeys[j]) :
3597 lookupKeyRead(c->db,setskeys[j]);
3598 if (!setobj) {
3599 zfree(dv);
3600 if (dstkey) {
3601 deleteKey(c->db,dstkey);
3602 addReply(c,shared.ok);
3603 } else {
3604 addReply(c,shared.nullmultibulk);
3605 }
3606 return;
3607 }
3608 if (setobj->type != REDIS_SET) {
3609 zfree(dv);
3610 addReply(c,shared.wrongtypeerr);
3611 return;
3612 }
3613 dv[j] = setobj->ptr;
3614 }
3615 /* Sort sets from the smallest to largest, this will improve our
3616 * algorithm's performace */
3617 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3618
3619 /* The first thing we should output is the total number of elements...
3620 * since this is a multi-bulk write, but at this stage we don't know
3621 * the intersection set size, so we use a trick, append an empty object
3622 * to the output list and save the pointer to later modify it with the
3623 * right length */
3624 if (!dstkey) {
3625 lenobj = createObject(REDIS_STRING,NULL);
3626 addReply(c,lenobj);
3627 decrRefCount(lenobj);
3628 } else {
3629 /* If we have a target key where to store the resulting set
3630 * create this key with an empty set inside */
3631 dstset = createSetObject();
3632 }
3633
3634 /* Iterate all the elements of the first (smallest) set, and test
3635 * the element against all the other sets, if at least one set does
3636 * not include the element it is discarded */
3637 di = dictGetIterator(dv[0]);
3638
3639 while((de = dictNext(di)) != NULL) {
3640 robj *ele;
3641
3642 for (j = 1; j < setsnum; j++)
3643 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3644 if (j != setsnum)
3645 continue; /* at least one set does not contain the member */
3646 ele = dictGetEntryKey(de);
3647 if (!dstkey) {
3648 addReplyBulkLen(c,ele);
3649 addReply(c,ele);
3650 addReply(c,shared.crlf);
3651 cardinality++;
3652 } else {
3653 dictAdd(dstset->ptr,ele,NULL);
3654 incrRefCount(ele);
3655 }
3656 }
3657 dictReleaseIterator(di);
3658
3659 if (dstkey) {
3660 /* Store the resulting set into the target */
3661 deleteKey(c->db,dstkey);
3662 dictAdd(c->db->dict,dstkey,dstset);
3663 incrRefCount(dstkey);
3664 }
3665
3666 if (!dstkey) {
3667 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3668 } else {
3669 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3670 dictSize((dict*)dstset->ptr)));
3671 server.dirty++;
3672 }
3673 zfree(dv);
3674 }
3675
3676 static void sinterCommand(redisClient *c) {
3677 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3678 }
3679
3680 static void sinterstoreCommand(redisClient *c) {
3681 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3682 }
3683
/* Operation selector passed to sunionDiffGenericCommand(). */
enum {
    REDIS_OP_UNION = 0,
    REDIS_OP_DIFF = 1
};
3686
3687 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3688 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3689 dictIterator *di;
3690 dictEntry *de;
3691 robj *dstset = NULL;
3692 int j, cardinality = 0;
3693
3694 for (j = 0; j < setsnum; j++) {
3695 robj *setobj;
3696
3697 setobj = dstkey ?
3698 lookupKeyWrite(c->db,setskeys[j]) :
3699 lookupKeyRead(c->db,setskeys[j]);
3700 if (!setobj) {
3701 dv[j] = NULL;
3702 continue;
3703 }
3704 if (setobj->type != REDIS_SET) {
3705 zfree(dv);
3706 addReply(c,shared.wrongtypeerr);
3707 return;
3708 }
3709 dv[j] = setobj->ptr;
3710 }
3711
3712 /* We need a temp set object to store our union. If the dstkey
3713 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3714 * this set object will be the resulting object to set into the target key*/
3715 dstset = createSetObject();
3716
3717 /* Iterate all the elements of all the sets, add every element a single
3718 * time to the result set */
3719 for (j = 0; j < setsnum; j++) {
3720 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3721 if (!dv[j]) continue; /* non existing keys are like empty sets */
3722
3723 di = dictGetIterator(dv[j]);
3724
3725 while((de = dictNext(di)) != NULL) {
3726 robj *ele;
3727
3728 /* dictAdd will not add the same element multiple times */
3729 ele = dictGetEntryKey(de);
3730 if (op == REDIS_OP_UNION || j == 0) {
3731 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3732 incrRefCount(ele);
3733 cardinality++;
3734 }
3735 } else if (op == REDIS_OP_DIFF) {
3736 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3737 cardinality--;
3738 }
3739 }
3740 }
3741 dictReleaseIterator(di);
3742
3743 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3744 }
3745
3746 /* Output the content of the resulting set, if not in STORE mode */
3747 if (!dstkey) {
3748 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3749 di = dictGetIterator(dstset->ptr);
3750 while((de = dictNext(di)) != NULL) {
3751 robj *ele;
3752
3753 ele = dictGetEntryKey(de);
3754 addReplyBulkLen(c,ele);
3755 addReply(c,ele);
3756 addReply(c,shared.crlf);
3757 }
3758 dictReleaseIterator(di);
3759 } else {
3760 /* If we have a target key where to store the resulting set
3761 * create this key with the result set inside */
3762 deleteKey(c->db,dstkey);
3763 dictAdd(c->db->dict,dstkey,dstset);
3764 incrRefCount(dstkey);
3765 }
3766
3767 /* Cleanup */
3768 if (!dstkey) {
3769 decrRefCount(dstset);
3770 } else {
3771 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3772 dictSize((dict*)dstset->ptr)));
3773 server.dirty++;
3774 }
3775 zfree(dv);
3776 }
3777
3778 static void sunionCommand(redisClient *c) {
3779 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3780 }
3781
3782 static void sunionstoreCommand(redisClient *c) {
3783 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3784 }
3785
3786 static void sdiffCommand(redisClient *c) {
3787 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3788 }
3789
3790 static void sdiffstoreCommand(redisClient *c) {
3791 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3792 }
3793
3794 /* ==================================== ZSets =============================== */
3795
/* ZSETs are ordered sets that keep the same elements in two data structures
 * at once, in order to get O(log(N)) INSERT and REMOVE operations on a
 * sorted data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
3803
3804 /* This skiplist implementation is almost a C translation of the original
3805 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
3806 * Alternative to Balanced Trees", modified in three ways:
3807 * a) this implementation allows for repeated values.
3808 * b) the comparison is not just by key (our 'score') but by satellite data.
3809 * c) there is a back pointer, so it's a doubly linked list with the back
3810 * pointers being only at "level 1". This allows to traverse the list
3811 * from tail to head, useful for ZREVRANGE. */
3812
3813 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
3814 zskiplistNode *zn = zmalloc(sizeof(*zn));
3815
3816 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
3817 zn->score = score;
3818 zn->obj = obj;
3819 return zn;
3820 }
3821
3822 static zskiplist *zslCreate(void) {
3823 int j;
3824 zskiplist *zsl;
3825
3826 zsl = zmalloc(sizeof(*zsl));
3827 zsl->level = 1;
3828 zsl->length = 0;
3829 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
3830 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
3831 zsl->header->forward[j] = NULL;
3832 zsl->header->backward = NULL;
3833 zsl->tail = NULL;
3834 return zsl;
3835 }
3836
3837 static void zslFreeNode(zskiplistNode *node) {
3838 decrRefCount(node->obj);
3839 zfree(node->forward);
3840 zfree(node);
3841 }
3842
3843 static void zslFree(zskiplist *zsl) {
3844 zskiplistNode *node = zsl->header->forward[0], *next;
3845
3846 zfree(zsl->header->forward);
3847 zfree(zsl->header);
3848 while(node) {
3849 next = node->forward[0];
3850 zslFreeNode(node);
3851 node = next;
3852 }
3853 zfree(zsl);
3854 }
3855
3856 static int zslRandomLevel(void) {
3857 int level = 1;
3858 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
3859 level += 1;
3860 return level;
3861 }
3862
3863 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
3864 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3865 int i, level;
3866
3867 x = zsl->header;
3868 for (i = zsl->level-1; i >= 0; i--) {
3869 while (x->forward[i] &&
3870 (x->forward[i]->score < score ||
3871 (x->forward[i]->score == score &&
3872 compareStringObjects(x->forward[i]->obj,obj) < 0)))
3873 x = x->forward[i];
3874 update[i] = x;
3875 }
3876 /* we assume the key is not already inside, since we allow duplicated
3877 * scores, and the re-insertion of score and redis object should never
3878 * happpen since the caller of zslInsert() should test in the hash table
3879 * if the element is already inside or not. */
3880 level = zslRandomLevel();
3881 if (level > zsl->level) {
3882 for (i = zsl->level; i < level; i++)
3883 update[i] = zsl->header;
3884 zsl->level = level;
3885 }
3886 x = zslCreateNode(level,score,obj);
3887 for (i = 0; i < level; i++) {
3888 x->forward[i] = update[i]->forward[i];
3889 update[i]->forward[i] = x;
3890 }
3891 x->backward = (update[0] == zsl->header) ? NULL : update[0];
3892 if (x->forward[0])
3893 x->forward[0]->backward = x;
3894 else
3895 zsl->tail = x;
3896 zsl->length++;
3897 }
3898
3899 /* Delete an element with matching score/object from the skiplist. */
3900 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
3901 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3902 int i;
3903
3904 x = zsl->header;
3905 for (i = zsl->level-1; i >= 0; i--) {
3906 while (x->forward[i] &&
3907 (x->forward[i]->score < score ||
3908 (x->forward[i]->score == score &&
3909 compareStringObjects(x->forward[i]->obj,obj) < 0)))
3910 x = x->forward[i];
3911 update[i] = x;
3912 }
3913 /* We may have multiple elements with the same score, what we need
3914 * is to find the element with both the right score and object. */
3915 x = x->forward[0];
3916 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
3917 for (i = 0; i < zsl->level; i++) {
3918 if (update[i]->forward[i] != x) break;
3919 update[i]->forward[i] = x->forward[i];
3920 }
3921 if (x->forward[0]) {
3922 x->forward[0]->backward = (x->backward == zsl->header) ?
3923 NULL : x->backward;
3924 } else {
3925 zsl->tail = x->backward;
3926 }
3927 zslFreeNode(x);
3928 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
3929 zsl->level--;
3930 zsl->length--;
3931 return 1;
3932 } else {
3933 return 0; /* not found */
3934 }
3935 return 0; /* not found */
3936 }
3937
3938 /* Find the first node having a score equal or greater than the specified one.
3939 * Returns NULL if there is no match. */
3940 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
3941 zskiplistNode *x;
3942 int i;
3943
3944 x = zsl->header;
3945 for (i = zsl->level-1; i >= 0; i--) {
3946 while (x->forward[i] && x->forward[i]->score < score)
3947 x = x->forward[i];
3948 }
3949 /* We may have multiple elements with the same score, what we need
3950 * is to find the element with both the right score and object. */
3951 return x->forward[0];
3952 }
3953
3954 /* The actual Z-commands implementations */
3955
3956 static void zaddCommand(redisClient *c) {
3957 robj *zsetobj;
3958 zset *zs;
3959 double *score;
3960
3961 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
3962 if (zsetobj == NULL) {
3963 zsetobj = createZsetObject();
3964 dictAdd(c->db->dict,c->argv[1],zsetobj);
3965 incrRefCount(c->argv[1]);
3966 } else {
3967 if (zsetobj->type != REDIS_ZSET) {
3968 addReply(c,shared.wrongtypeerr);
3969 return;
3970 }
3971 }
3972 score = zmalloc(sizeof(double));
3973 *score = strtod(c->argv[2]->ptr,NULL);
3974 zs = zsetobj->ptr;
3975 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
3976 /* case 1: New element */
3977 incrRefCount(c->argv[3]); /* added to hash */
3978 zslInsert(zs->zsl,*score,c->argv[3]);
3979 incrRefCount(c->argv[3]); /* added to skiplist */
3980 server.dirty++;
3981 addReply(c,shared.cone);
3982 } else {
3983 dictEntry *de;
3984 double *oldscore;
3985
3986 /* case 2: Score update operation */
3987 de = dictFind(zs->dict,c->argv[3]);
3988 assert(de != NULL);
3989 oldscore = dictGetEntryVal(de);
3990 if (*score != *oldscore) {
3991 int deleted;
3992
3993 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
3994 assert(deleted != 0);
3995 zslInsert(zs->zsl,*score,c->argv[3]);
3996 incrRefCount(c->argv[3]);
3997 dictReplace(zs->dict,c->argv[3],score);
3998 server.dirty++;
3999 } else {
4000 zfree(score);
4001 }
4002 addReply(c,shared.czero);
4003 }
4004 }
4005
4006 static void zremCommand(redisClient *c) {
4007 robj *zsetobj;
4008 zset *zs;
4009
4010 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4011 if (zsetobj == NULL) {
4012 addReply(c,shared.czero);
4013 } else {
4014 dictEntry *de;
4015 double *oldscore;
4016 int deleted;
4017
4018 if (zsetobj->type != REDIS_ZSET) {
4019 addReply(c,shared.wrongtypeerr);
4020 return;
4021 }
4022 zs = zsetobj->ptr;
4023 de = dictFind(zs->dict,c->argv[2]);
4024 if (de == NULL) {
4025 addReply(c,shared.czero);
4026 return;
4027 }
4028 /* Delete from the skiplist */
4029 oldscore = dictGetEntryVal(de);
4030 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4031 assert(deleted != 0);
4032
4033 /* Delete from the hash table */
4034 dictDelete(zs->dict,c->argv[2]);
4035 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4036 server.dirty++;
4037 addReply(c,shared.cone);
4038 }
4039 }
4040
4041 static void zrangeGenericCommand(redisClient *c, int reverse) {
4042 robj *o;
4043 int start = atoi(c->argv[2]->ptr);
4044 int end = atoi(c->argv[3]->ptr);
4045
4046 o = lookupKeyRead(c->db,c->argv[1]);
4047 if (o == NULL) {
4048 addReply(c,shared.nullmultibulk);
4049 } else {
4050 if (o->type != REDIS_ZSET) {
4051 addReply(c,shared.wrongtypeerr);
4052 } else {
4053 zset *zsetobj = o->ptr;
4054 zskiplist *zsl = zsetobj->zsl;
4055 zskiplistNode *ln;
4056
4057 int llen = zsl->length;
4058 int rangelen, j;
4059 robj *ele;
4060
4061 /* convert negative indexes */
4062 if (start < 0) start = llen+start;
4063 if (end < 0) end = llen+end;
4064 if (start < 0) start = 0;
4065 if (end < 0) end = 0;
4066
4067 /* indexes sanity checks */
4068 if (start > end || start >= llen) {
4069 /* Out of range start or start > end result in empty list */
4070 addReply(c,shared.emptymultibulk);
4071 return;
4072 }
4073 if (end >= llen) end = llen-1;
4074 rangelen = (end-start)+1;
4075
4076 /* Return the result in form of a multi-bulk reply */
4077 if (reverse) {
4078 ln = zsl->tail;
4079 while (start--)
4080 ln = ln->backward;
4081 } else {
4082 ln = zsl->header->forward[0];
4083 while (start--)
4084 ln = ln->forward[0];
4085 }
4086
4087 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4088 for (j = 0; j < rangelen; j++) {
4089 ele = ln->obj;
4090 addReplyBulkLen(c,ele);
4091 addReply(c,ele);
4092 addReply(c,shared.crlf);
4093 ln = reverse ? ln->backward : ln->forward[0];
4094 }
4095 }
4096 }
4097 }
4098
4099 static void zrangeCommand(redisClient *c) {
4100 zrangeGenericCommand(c,0);
4101 }
4102
4103 static void zrevrangeCommand(redisClient *c) {
4104 zrangeGenericCommand(c,1);
4105 }
4106
4107 static void zrangebyscoreCommand(redisClient *c) {
4108 robj *o;
4109 double min = strtod(c->argv[2]->ptr,NULL);
4110 double max = strtod(c->argv[3]->ptr,NULL);
4111
4112 o = lookupKeyRead(c->db,c->argv[1]);
4113 if (o == NULL) {
4114 addReply(c,shared.nullmultibulk);
4115 } else {
4116 if (o->type != REDIS_ZSET) {
4117 addReply(c,shared.wrongtypeerr);
4118 } else {
4119 zset *zsetobj = o->ptr;
4120 zskiplist *zsl = zsetobj->zsl;
4121 zskiplistNode *ln;
4122 robj *ele, *lenobj;
4123 unsigned int rangelen = 0;
4124
4125 /* Get the first node with the score >= min */
4126 ln = zslFirstWithScore(zsl,min);
4127 if (ln == NULL) {
4128 /* No element matching the speciifed interval */
4129 addReply(c,shared.emptymultibulk);
4130 return;
4131 }
4132
4133 /* We don't know in advance how many matching elements there
4134 * are in the list, so we push this object that will represent
4135 * the multi-bulk length in the output buffer, and will "fix"
4136 * it later */
4137 lenobj = createObject(REDIS_STRING,NULL);
4138 addReply(c,lenobj);
4139
4140 while(ln && ln->score <= max) {
4141 ele = ln->obj;
4142 addReplyBulkLen(c,ele);
4143 addReply(c,ele);
4144 addReply(c,shared.crlf);
4145 ln = ln->forward[0];
4146 rangelen++;
4147 }
4148 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4149 }
4150 }
4151 }
4152
4153 static void zlenCommand(redisClient *c) {
4154 robj *o;
4155 zset *zs;
4156
4157 o = lookupKeyRead(c->db,c->argv[1]);
4158 if (o == NULL) {
4159 addReply(c,shared.czero);
4160 return;
4161 } else {
4162 if (o->type != REDIS_ZSET) {
4163 addReply(c,shared.wrongtypeerr);
4164 } else {
4165 zs = o->ptr;
4166 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4167 }
4168 }
4169 }
4170
4171 static void zscoreCommand(redisClient *c) {
4172 robj *o;
4173 zset *zs;
4174
4175 o = lookupKeyRead(c->db,c->argv[1]);
4176 if (o == NULL) {
4177 addReply(c,shared.czero);
4178 return;
4179 } else {
4180 if (o->type != REDIS_ZSET) {
4181 addReply(c,shared.wrongtypeerr);
4182 } else {
4183 dictEntry *de;
4184
4185 zs = o->ptr;
4186 de = dictFind(zs->dict,c->argv[2]);
4187 if (!de) {
4188 addReply(c,shared.nullbulk);
4189 } else {
4190 char buf[128];
4191 double *score = dictGetEntryVal(de);
4192
4193 snprintf(buf,sizeof(buf),"%.16g",*score);
4194 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
4195 strlen(buf),buf));
4196 }
4197 }
4198 }
4199 }
4200
4201 /* ========================= Non type-specific commands ==================== */
4202
4203 static void flushdbCommand(redisClient *c) {
4204 server.dirty += dictSize(c->db->dict);
4205 dictEmpty(c->db->dict);
4206 dictEmpty(c->db->expires);
4207 addReply(c,shared.ok);
4208 }
4209
4210 static void flushallCommand(redisClient *c) {
4211 server.dirty += emptyDb();
4212 addReply(c,shared.ok);
4213 rdbSave(server.dbfilename);
4214 server.dirty++;
4215 }
4216
4217 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4218 redisSortOperation *so = zmalloc(sizeof(*so));
4219 so->type = type;
4220 so->pattern = pattern;
4221 return so;
4222 }
4223
4224 /* Return the value associated to the key with a name obtained
4225 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4226 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4227 char *p;
4228 sds spat, ssub;
4229 robj keyobj;
4230 int prefixlen, sublen, postfixlen;
4231 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4232 struct {
4233 long len;
4234 long free;
4235 char buf[REDIS_SORTKEY_MAX+1];
4236 } keyname;
4237
4238 if (subst->encoding == REDIS_ENCODING_RAW)
4239 incrRefCount(subst);
4240 else {
4241 subst = getDecodedObject(subst);
4242 }
4243
4244 spat = pattern->ptr;
4245 ssub = subst->ptr;
4246 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4247 p = strchr(spat,'*');
4248 if (!p) return NULL;
4249
4250 prefixlen = p-spat;
4251 sublen = sdslen(ssub);
4252 postfixlen = sdslen(spat)-(prefixlen+1);
4253 memcpy(keyname.buf,spat,prefixlen);
4254 memcpy(keyname.buf+prefixlen,ssub,sublen);
4255 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4256 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4257 keyname.len = prefixlen+sublen+postfixlen;
4258
4259 keyobj.refcount = 1;
4260 keyobj.type = REDIS_STRING;
4261 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4262
4263 decrRefCount(subst);
4264
4265 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4266 return lookupKeyRead(db,&keyobj);
4267 }
4268
4269 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4270 * the additional parameter is not standard but a BSD-specific we have to
4271 * pass sorting parameters via the global 'server' structure */
4272 static int sortCompare(const void *s1, const void *s2) {
4273 const redisSortObject *so1 = s1, *so2 = s2;
4274 int cmp;
4275
4276 if (!server.sort_alpha) {
4277 /* Numeric sorting. Here it's trivial as we precomputed scores */
4278 if (so1->u.score > so2->u.score) {
4279 cmp = 1;
4280 } else if (so1->u.score < so2->u.score) {
4281 cmp = -1;
4282 } else {
4283 cmp = 0;
4284 }
4285 } else {
4286 /* Alphanumeric sorting */
4287 if (server.sort_bypattern) {
4288 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4289 /* At least one compare object is NULL */
4290 if (so1->u.cmpobj == so2->u.cmpobj)
4291 cmp = 0;
4292 else if (so1->u.cmpobj == NULL)
4293 cmp = -1;
4294 else
4295 cmp = 1;
4296 } else {
4297 /* We have both the objects, use strcoll */
4298 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4299 }
4300 } else {
4301 /* Compare elements directly */
4302 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4303 so2->obj->encoding == REDIS_ENCODING_RAW) {
4304 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4305 } else {
4306 robj *dec1, *dec2;
4307
4308 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4309 so1->obj : getDecodedObject(so1->obj);
4310 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4311 so2->obj : getDecodedObject(so2->obj);
4312 cmp = strcoll(dec1->ptr,dec2->ptr);
4313 if (dec1 != so1->obj) decrRefCount(dec1);
4314 if (dec2 != so2->obj) decrRefCount(dec2);
4315 }
4316 }
4317 }
4318 return server.sort_desc ? -cmp : cmp;
4319 }
4320
4321 /* The SORT command is the most complex command in Redis. Warning: this code
4322 * is optimized for speed and a bit less for readability */
4323 static void sortCommand(redisClient *c) {
4324 list *operations;
4325 int outputlen = 0;
4326 int desc = 0, alpha = 0;
4327 int limit_start = 0, limit_count = -1, start, end;
4328 int j, dontsort = 0, vectorlen;
4329 int getop = 0; /* GET operation counter */
4330 robj *sortval, *sortby = NULL;
4331 redisSortObject *vector; /* Resulting vector to sort */
4332
4333 /* Lookup the key to sort. It must be of the right types */
4334 sortval = lookupKeyRead(c->db,c->argv[1]);
4335 if (sortval == NULL) {
4336 addReply(c,shared.nokeyerr);
4337 return;
4338 }
4339 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4340 addReply(c,shared.wrongtypeerr);
4341 return;
4342 }
4343
4344 /* Create a list of operations to perform for every sorted element.
4345 * Operations can be GET/DEL/INCR/DECR */
4346 operations = listCreate();
4347 listSetFreeMethod(operations,zfree);
4348 j = 2;
4349
4350 /* Now we need to protect sortval incrementing its count, in the future
4351 * SORT may have options able to overwrite/delete keys during the sorting
4352 * and the sorted key itself may get destroied */
4353 incrRefCount(sortval);
4354
4355 /* The SORT command has an SQL-alike syntax, parse it */
4356 while(j < c->argc) {
4357 int leftargs = c->argc-j-1;
4358 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4359 desc = 0;
4360 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4361 desc = 1;
4362 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4363 alpha = 1;
4364 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4365 limit_start = atoi(c->argv[j+1]->ptr);
4366 limit_count = atoi(c->argv[j+2]->ptr);
4367 j+=2;
4368 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4369 sortby = c->argv[j+1];
4370 /* If the BY pattern does not contain '*', i.e. it is constant,
4371 * we don't need to sort nor to lookup the weight keys. */
4372 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4373 j++;
4374 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4375 listAddNodeTail(operations,createSortOperation(
4376 REDIS_SORT_GET,c->argv[j+1]));
4377 getop++;
4378 j++;
4379 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
4380 listAddNodeTail(operations,createSortOperation(
4381 REDIS_SORT_DEL,c->argv[j+1]));
4382 j++;
4383 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
4384 listAddNodeTail(operations,createSortOperation(
4385 REDIS_SORT_INCR,c->argv[j+1]));
4386 j++;
4387 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4388 listAddNodeTail(operations,createSortOperation(
4389 REDIS_SORT_DECR,c->argv[j+1]));
4390 j++;
4391 } else {
4392 decrRefCount(sortval);
4393 listRelease(operations);
4394 addReply(c,shared.syntaxerr);
4395 return;
4396 }
4397 j++;
4398 }
4399
4400 /* Load the sorting vector with all the objects to sort */
4401 vectorlen = (sortval->type == REDIS_LIST) ?
4402 listLength((list*)sortval->ptr) :
4403 dictSize((dict*)sortval->ptr);
4404 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4405 j = 0;
4406 if (sortval->type == REDIS_LIST) {
4407 list *list = sortval->ptr;
4408 listNode *ln;
4409
4410 listRewind(list);
4411 while((ln = listYield(list))) {
4412 robj *ele = ln->value;
4413 vector[j].obj = ele;
4414 vector[j].u.score = 0;
4415 vector[j].u.cmpobj = NULL;
4416 j++;
4417 }
4418 } else {
4419 dict *set = sortval->ptr;
4420 dictIterator *di;
4421 dictEntry *setele;
4422
4423 di = dictGetIterator(set);
4424 while((setele = dictNext(di)) != NULL) {
4425 vector[j].obj = dictGetEntryKey(setele);
4426 vector[j].u.score = 0;
4427 vector[j].u.cmpobj = NULL;
4428 j++;
4429 }
4430 dictReleaseIterator(di);
4431 }
4432 assert(j == vectorlen);
4433
4434 /* Now it's time to load the right scores in the sorting vector */
4435 if (dontsort == 0) {
4436 for (j = 0; j < vectorlen; j++) {
4437 if (sortby) {
4438 robj *byval;
4439
4440 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4441 if (!byval || byval->type != REDIS_STRING) continue;
4442 if (alpha) {
4443 if (byval->encoding == REDIS_ENCODING_RAW) {
4444 vector[j].u.cmpobj = byval;
4445 incrRefCount(byval);
4446 } else {
4447 vector[j].u.cmpobj = getDecodedObject(byval);
4448 }
4449 } else {
4450 if (byval->encoding == REDIS_ENCODING_RAW) {
4451 vector[j].u.score = strtod(byval->ptr,NULL);
4452 } else {
4453 if (byval->encoding == REDIS_ENCODING_INT) {
4454 vector[j].u.score = (long)byval->ptr;
4455 } else
4456 assert(1 != 1);
4457 }
4458 }
4459 } else {
4460 if (!alpha) {
4461 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4462 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4463 else {
4464 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4465 vector[j].u.score = (long) vector[j].obj->ptr;
4466 else
4467 assert(1 != 1);
4468 }
4469 }
4470 }
4471 }
4472 }
4473
4474 /* We are ready to sort the vector... perform a bit of sanity check
4475 * on the LIMIT option too. We'll use a partial version of quicksort. */
4476 start = (limit_start < 0) ? 0 : limit_start;
4477 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4478 if (start >= vectorlen) {
4479 start = vectorlen-1;
4480 end = vectorlen-2;
4481 }
4482 if (end >= vectorlen) end = vectorlen-1;
4483
4484 if (dontsort == 0) {
4485 server.sort_desc = desc;
4486 server.sort_alpha = alpha;
4487 server.sort_bypattern = sortby ? 1 : 0;
4488 if (sortby && (start != 0 || end != vectorlen-1))
4489 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4490 else
4491 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4492 }
4493
4494 /* Send command output to the output buffer, performing the specified
4495 * GET/DEL/INCR/DECR operations if any. */
4496 outputlen = getop ? getop*(end-start+1) : end-start+1;
4497 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4498 for (j = start; j <= end; j++) {
4499 listNode *ln;
4500 if (!getop) {
4501 addReplyBulkLen(c,vector[j].obj);
4502 addReply(c,vector[j].obj);
4503 addReply(c,shared.crlf);
4504 }
4505 listRewind(operations);
4506 while((ln = listYield(operations))) {
4507 redisSortOperation *sop = ln->value;
4508 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4509 vector[j].obj);
4510
4511 if (sop->type == REDIS_SORT_GET) {
4512 if (!val || val->type != REDIS_STRING) {
4513 addReply(c,shared.nullbulk);
4514 } else {
4515 addReplyBulkLen(c,val);
4516 addReply(c,val);
4517 addReply(c,shared.crlf);
4518 }
4519 } else if (sop->type == REDIS_SORT_DEL) {
4520 /* TODO */
4521 }
4522 }
4523 }
4524
4525 /* Cleanup */
4526 decrRefCount(sortval);
4527 listRelease(operations);
4528 for (j = 0; j < vectorlen; j++) {
4529 if (sortby && alpha && vector[j].u.cmpobj)
4530 decrRefCount(vector[j].u.cmpobj);
4531 }
4532 zfree(vector);
4533 }
4534
4535 static void infoCommand(redisClient *c) {
4536 sds info;
4537 time_t uptime = time(NULL)-server.stat_starttime;
4538 int j;
4539
4540 info = sdscatprintf(sdsempty(),
4541 "redis_version:%s\r\n"
4542 "arch_bits:%s\r\n"
4543 "uptime_in_seconds:%d\r\n"
4544 "uptime_in_days:%d\r\n"
4545 "connected_clients:%d\r\n"
4546 "connected_slaves:%d\r\n"
4547 "used_memory:%zu\r\n"
4548 "changes_since_last_save:%lld\r\n"
4549 "bgsave_in_progress:%d\r\n"
4550 "last_save_time:%d\r\n"
4551 "total_connections_received:%lld\r\n"
4552 "total_commands_processed:%lld\r\n"
4553 "role:%s\r\n"
4554 ,REDIS_VERSION,
4555 (sizeof(long) == 8) ? "64" : "32",
4556 uptime,
4557 uptime/(3600*24),
4558 listLength(server.clients)-listLength(server.slaves),
4559 listLength(server.slaves),
4560 server.usedmemory,
4561 server.dirty,
4562 server.bgsaveinprogress,
4563 server.lastsave,
4564 server.stat_numconnections,
4565 server.stat_numcommands,
4566 server.masterhost == NULL ? "master" : "slave"
4567 );
4568 if (server.masterhost) {
4569 info = sdscatprintf(info,
4570 "master_host:%s\r\n"
4571 "master_port:%d\r\n"
4572 "master_link_status:%s\r\n"
4573 "master_last_io_seconds_ago:%d\r\n"
4574 ,server.masterhost,
4575 server.masterport,
4576 (server.replstate == REDIS_REPL_CONNECTED) ?
4577 "up" : "down",
4578 (int)(time(NULL)-server.master->lastinteraction)
4579 );
4580 }
4581 for (j = 0; j < server.dbnum; j++) {
4582 long long keys, vkeys;
4583
4584 keys = dictSize(server.db[j].dict);
4585 vkeys = dictSize(server.db[j].expires);
4586 if (keys || vkeys) {
4587 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4588 j, keys, vkeys);
4589 }
4590 }
4591 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4592 addReplySds(c,info);
4593 addReply(c,shared.crlf);
4594 }
4595
4596 static void monitorCommand(redisClient *c) {
4597 /* ignore MONITOR if aleady slave or in monitor mode */
4598 if (c->flags & REDIS_SLAVE) return;
4599
4600 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4601 c->slaveseldb = 0;
4602 listAddNodeTail(server.monitors,c);
4603 addReply(c,shared.ok);
4604 }
4605
4606 /* ================================= Expire ================================= */
4607 static int removeExpire(redisDb *db, robj *key) {
4608 if (dictDelete(db->expires,key) == DICT_OK) {
4609 return 1;
4610 } else {
4611 return 0;
4612 }
4613 }
4614
4615 static int setExpire(redisDb *db, robj *key, time_t when) {
4616 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4617 return 0;
4618 } else {
4619 incrRefCount(key);
4620 return 1;
4621 }
4622 }
4623
4624 /* Return the expire time of the specified key, or -1 if no expire
4625 * is associated with this key (i.e. the key is non volatile) */
4626 static time_t getExpire(redisDb *db, robj *key) {
4627 dictEntry *de;
4628
4629 /* No expire? return ASAP */
4630 if (dictSize(db->expires) == 0 ||
4631 (de = dictFind(db->expires,key)) == NULL) return -1;
4632
4633 return (time_t) dictGetEntryVal(de);
4634 }
4635
4636 static int expireIfNeeded(redisDb *db, robj *key) {
4637 time_t when;
4638 dictEntry *de;
4639
4640 /* No expire? return ASAP */
4641 if (dictSize(db->expires) == 0 ||
4642 (de = dictFind(db->expires,key)) == NULL) return 0;
4643
4644 /* Lookup the expire */
4645 when = (time_t) dictGetEntryVal(de);
4646 if (time(NULL) <= when) return 0;
4647
4648 /* Delete the key */
4649 dictDelete(db->expires,key);
4650 return dictDelete(db->dict,key) == DICT_OK;
4651 }
4652
4653 static int deleteIfVolatile(redisDb *db, robj *key) {
4654 dictEntry *de;
4655
4656 /* No expire? return ASAP */
4657 if (dictSize(db->expires) == 0 ||
4658 (de = dictFind(db->expires,key)) == NULL) return 0;
4659
4660 /* Delete the key */
4661 server.dirty++;
4662 dictDelete(db->expires,key);
4663 return dictDelete(db->dict,key) == DICT_OK;
4664 }
4665
4666 static void expireCommand(redisClient *c) {
4667 dictEntry *de;
4668 int seconds = atoi(c->argv[2]->ptr);
4669
4670 de = dictFind(c->db->dict,c->argv[1]);
4671 if (de == NULL) {
4672 addReply(c,shared.czero);
4673 return;
4674 }
4675 if (seconds <= 0) {
4676 addReply(c, shared.czero);
4677 return;
4678 } else {
4679 time_t when = time(NULL)+seconds;
4680 if (setExpire(c->db,c->argv[1],when)) {
4681 addReply(c,shared.cone);
4682 server.dirty++;
4683 } else {
4684 addReply(c,shared.czero);
4685 }
4686 return;
4687 }
4688 }
4689
4690 static void ttlCommand(redisClient *c) {
4691 time_t expire;
4692 int ttl = -1;
4693
4694 expire = getExpire(c->db,c->argv[1]);
4695 if (expire != -1) {
4696 ttl = (int) (expire-time(NULL));
4697 if (ttl < 0) ttl = -1;
4698 }
4699 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4700 }
4701
4702 static void msetGenericCommand(redisClient *c, int nx) {
4703 int j;
4704
4705 if ((c->argc % 2) == 0) {
4706 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4707 return;
4708 }
4709 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4710 * set nothing at all if at least one already key exists. */
4711 if (nx) {
4712 for (j = 1; j < c->argc; j += 2) {
4713 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4714 addReply(c, shared.czero);
4715 return;
4716 }
4717 }
4718 }
4719
4720 for (j = 1; j < c->argc; j += 2) {
4721 int retval;
4722
4723 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4724 if (retval == DICT_ERR) {
4725 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4726 incrRefCount(c->argv[j+1]);
4727 } else {
4728 incrRefCount(c->argv[j]);
4729 incrRefCount(c->argv[j+1]);
4730 }
4731 removeExpire(c->db,c->argv[j]);
4732 }
4733 server.dirty += (c->argc-1)/2;
4734 addReply(c, nx ? shared.cone : shared.ok);
4735 }
4736
4737 static void msetCommand(redisClient *c) {
4738 msetGenericCommand(c,0);
4739 }
4740
4741 static void msetnxCommand(redisClient *c) {
4742 msetGenericCommand(c,1);
4743 }
4744
4745 /* =============================== Replication ============================= */
4746
4747 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4748 ssize_t nwritten, ret = size;
4749 time_t start = time(NULL);
4750
4751 timeout++;
4752 while(size) {
4753 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4754 nwritten = write(fd,ptr,size);
4755 if (nwritten == -1) return -1;
4756 ptr += nwritten;
4757 size -= nwritten;
4758 }
4759 if ((time(NULL)-start) > timeout) {
4760 errno = ETIMEDOUT;
4761 return -1;
4762 }
4763 }
4764 return ret;
4765 }
4766
4767 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4768 ssize_t nread, totread = 0;
4769 time_t start = time(NULL);
4770
4771 timeout++;
4772 while(size) {
4773 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4774 nread = read(fd,ptr,size);
4775 if (nread == -1) return -1;
4776 ptr += nread;
4777 size -= nread;
4778 totread += nread;
4779 }
4780 if ((time(NULL)-start) > timeout) {
4781 errno = ETIMEDOUT;
4782 return -1;
4783 }
4784 }
4785 return totread;
4786 }
4787
/* Read a '\n' terminated line from 'fd' into 'ptr', a buffer of 'size'
 * bytes, one byte at a time through syncRead() (each byte with the given
 * 'timeout'). The newline is not stored, a '\r' right before it is replaced
 * by a NUL, and the buffer is always NUL terminated. At most size-1 bytes
 * are stored: if the buffer fills up first, the truncated line is returned
 * and the rest of it is left unread on the socket.
 *
 * Returns the number of bytes stored before the newline (a stripped '\r'
 * is still counted), or -1 on I/O error / timeout. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size < 1) return 0;   /* no room even for the terminator */
    *ptr = '\0';
    size--;                   /* reserve one byte for the terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* Account for the byte just stored: without this the loop never
         * ended on a long line and overflowed the caller's buffer. */
        size--;
    }
    return nread;
}
4808
/* SYNC: start a master -> slave synchronization for client 'c'.
 *
 * On success the client is flagged as REDIS_SLAVE, appended to
 * server.slaves, and its replstate is chosen from the BGSAVE situation:
 *
 * - no BGSAVE running: one is started and the slave waits for its end
 *   (REDIS_REPL_WAIT_BGSAVE_END);
 * - a BGSAVE is running and another slave is already in
 *   WAIT_BGSAVE_END: that slave's accumulated reply list is duplicated,
 *   and this slave waits for the same save to end;
 * - a BGSAVE is running but no slave is accumulating differences: the
 *   slave waits for the next one (REDIS_REPL_WAIT_BGSAVE_START).
 *
 * Nothing is replied on success; the dump is transferred later by the
 * replication code. An error is replied when the client has pending
 * output or when the BGSAVE cannot be started. */
static void syncCommand(redisClient *c) {
    /* ignore SYNC if already slave or in monitor mode */
    if (c->flags & REDIS_SLAVE) return;

    /* SYNC can't be issued when the server has pending data to send to
     * the client about already issued commands. We need a fresh reply
     * buffer registering the differences between the BGSAVE and the current
     * dataset, so that we can copy to other slaves if needed. */
    if (listLength(c->reply) != 0) {
        addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
        return;
    }

    redisLog(REDIS_NOTICE,"Slave ask for synchronization");
    /* Here we need to check if there is a background saving operation
     * in progress, or if it is required to start one */
    if (server.bgsaveinprogress) {
        /* Ok a background save is in progress. Let's check if it is a good
         * one for replication, i.e. if there is another slave that is
         * registering differences since the server forked to save */
        redisClient *slave;
        listNode *ln;

        listRewind(server.slaves);
        while((ln = listYield(server.slaves))) {
            slave = ln->value;
            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
        }
        /* ln is non-NULL only if the loop stopped on a matching slave. */
        if (ln) {
            /* Perfect, the server is already registering differences for
             * another slave. Set the right state, and copy the buffer. */
            /* NOTE(review): the listDup() result is not checked for NULL
             * (allocation failure) — confirm listDup's failure contract. */
            listRelease(c->reply);
            c->reply = listDup(slave->reply);
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            /* No way, we need to wait for the next BGSAVE in order to
             * register differences */
            c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
            redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
        }
    } else {
        /* Ok we don't have a BGSAVE in progress, let's start one */
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
        if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
            return;
        }
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
    }
    /* -1: no dump file is open for this slave yet. */
    c->repldbfd = -1;
    c->flags |= REDIS_SLAVE;
    c->slaveseldb = 0;
    listAddNodeTail(server.slaves,c);
    return;
}
4866
/* Writable-event handler that streams the dump file (slave->repldbfd) to a
 * slave, one REDIS_IOBUF_LEN chunk per invocation.
 *
 * When repldboff is 0 the "$<repldbsize>\r\n" bulk header is written
 * first. Each call then seeks the dump file to repldboff, reads one chunk,
 * writes as much of it as the socket accepts, and advances repldboff by
 * the bytes actually written. Once repldbsize bytes have been sent the file
 * is closed, the slave becomes REDIS_REPL_ONLINE and this handler is
 * replaced by sendReplyToClient. Any read or write error frees the slave.
 * 'el' and 'mask' are unused. */
static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *slave = privdata;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    char buf[REDIS_IOBUF_LEN];
    ssize_t nwritten, buflen;

    if (slave->repldboff == 0) {
        /* Write the bulk count before transferring the DB. In theory here
         * we don't know how much room there is in the output buffer of the
         * socket, but in practice SO_SNDLOWAT (the minimum count for output
         * operations) will never be smaller than the few bytes we need.
         * A short write is treated as an error. */
        sds bulkcount;

        bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
            slave->repldbsize);
        if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
        {
            sdsfree(bulkcount);
            freeClient(slave);
            return;
        }
        sdsfree(bulkcount);
    }
    /* Resume from where the previous call stopped: repldboff counts the
     * bytes actually accepted by the socket, not the bytes read. */
    lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
    buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
    if (buflen <= 0) {
        redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
            (buflen == 0) ? "premature EOF" : strerror(errno));
        freeClient(slave);
        return;
    }
    if ((nwritten = write(fd,buf,buflen)) == -1) {
        redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
            strerror(errno));
        freeClient(slave);
        return;
    }
    slave->repldboff += nwritten;
    if (slave->repldboff == slave->repldbsize) {
        /* Transfer complete: switch the writable handler from bulk
         * streaming to the normal reply writer. */
        close(slave->repldbfd);
        slave->repldbfd = -1;
        aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
        slave->replstate = REDIS_REPL_ONLINE;
        if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
            sendReplyToClient, slave, NULL) == AE_ERR) {
            freeClient(slave);
            return;
        }
        /* NOTE(review): queues an empty reply; presumably meant to make
         * sure the replies buffered during the BGSAVE get flushed — the
         * exact intent is not visible here. */
        addReplySds(slave,sdsempty());
        redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
    }
}
4920
4921 /* This function is called at the end of every backgrond saving.
4922 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4923 * otherwise REDIS_ERR is passed to the function.
4924 *
4925 * The goal of this function is to handle slaves waiting for a successful
4926 * background saving in order to perform non-blocking synchronization. */
4927 static void updateSlavesWaitingBgsave(int bgsaveerr) {
4928 listNode *ln;
4929 int startbgsave = 0;
4930
4931 listRewind(server.slaves);
4932 while((ln = listYield(server.slaves))) {
4933 redisClient *slave = ln->value;
4934
4935 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
4936 startbgsave = 1;
4937 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4938 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
4939 struct redis_stat buf;
4940
4941 if (bgsaveerr != REDIS_OK) {
4942 freeClient(slave);
4943 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
4944 continue;
4945 }
4946 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
4947 redis_fstat(slave->repldbfd,&buf) == -1) {
4948 freeClient(slave);
4949 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
4950 continue;
4951 }
4952 slave->repldboff = 0;
4953 slave->repldbsize = buf.st_size;
4954 slave->replstate = REDIS_REPL_SEND_BULK;
4955 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4956 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
4957 freeClient(slave);
4958 continue;
4959 }
4960 }
4961 }
4962 if (startbgsave) {
4963 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4964 listRewind(server.slaves);
4965 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
4966 while((ln = listYield(server.slaves))) {
4967 redisClient *slave = ln->value;
4968
4969 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
4970 freeClient(slave);
4971 }
4972 }
4973 }
4974 }
4975
4976 static int syncWithMaster(void) {
4977 char buf[1024], tmpfile[256];
4978 int dumpsize;
4979 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
4980 int dfd;
4981
4982 if (fd == -1) {
4983 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
4984 strerror(errno));
4985 return REDIS_ERR;
4986 }
4987 /* Issue the SYNC command */
4988 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
4989 close(fd);
4990 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
4991 strerror(errno));
4992 return REDIS_ERR;
4993 }
4994 /* Read the bulk write count */
4995 if (syncReadLine(fd,buf,1024,3600) == -1) {
4996 close(fd);
4997 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
4998 strerror(errno));
4999 return REDIS_ERR;
5000 }
5001 dumpsize = atoi(buf+1);
5002 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5003 /* Read the bulk write data on a temp file */
5004 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5005 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5006 if (dfd == -1) {
5007 close(fd);
5008 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5009 return REDIS_ERR;
5010 }
5011 while(dumpsize) {
5012 int nread, nwritten;
5013
5014 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5015 if (nread == -1) {
5016 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5017 strerror(errno));
5018 close(fd);
5019 close(dfd);
5020 return REDIS_ERR;
5021 }
5022 nwritten = write(dfd,buf,nread);
5023 if (nwritten == -1) {
5024 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5025 close(fd);
5026 close(dfd);
5027 return REDIS_ERR;
5028 }
5029 dumpsize -= nread;
5030 }
5031 close(dfd);
5032 if (rename(tmpfile,server.dbfilename) == -1) {
5033 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5034 unlink(tmpfile);
5035 close(fd);
5036 return REDIS_ERR;
5037 }
5038 emptyDb();
5039 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5040 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5041 close(fd);
5042 return REDIS_ERR;
5043 }
5044 server.master = createClient(fd);
5045 server.master->flags |= REDIS_MASTER;
5046 server.replstate = REDIS_REPL_CONNECTED;
5047 return REDIS_OK;
5048 }
5049
5050 static void slaveofCommand(redisClient *c) {
5051 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5052 !strcasecmp(c->argv[2]->ptr,"one")) {
5053 if (server.masterhost) {
5054 sdsfree(server.masterhost);
5055 server.masterhost = NULL;
5056 if (server.master) freeClient(server.master);
5057 server.replstate = REDIS_REPL_NONE;
5058 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5059 }
5060 } else {
5061 sdsfree(server.masterhost);
5062 server.masterhost = sdsdup(c->argv[1]->ptr);
5063 server.masterport = atoi(c->argv[2]->ptr);
5064 if (server.master) freeClient(server.master);
5065 server.replstate = REDIS_REPL_CONNECT;
5066 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5067 server.masterhost, server.masterport);
5068 }
5069 addReply(c,shared.ok);
5070 }
5071
5072 /* ============================ Maxmemory directive ======================== */
5073
5074 /* This function gets called when 'maxmemory' is set on the config file to limit
5075 * the max memory used by the server, and we are out of memory.
5076 * This function will try to, in order:
5077 *
5078 * - Free objects from the free list
5079 * - Try to remove keys with an EXPIRE set
5080 *
5081 * It is not possible to free enough memory to reach used-memory < maxmemory
5082 * the server will start refusing commands that will enlarge even more the
5083 * memory usage.
5084 */
5085 static void freeMemoryIfNeeded(void) {
5086 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5087 if (listLength(server.objfreelist)) {
5088 robj *o;
5089
5090 listNode *head = listFirst(server.objfreelist);
5091 o = listNodeValue(head);
5092 listDelNode(server.objfreelist,head);
5093 zfree(o);
5094 } else {
5095 int j, k, freed = 0;
5096
5097 for (j = 0; j < server.dbnum; j++) {
5098 int minttl = -1;
5099 robj *minkey = NULL;
5100 struct dictEntry *de;
5101
5102 if (dictSize(server.db[j].expires)) {
5103 freed = 1;
5104 /* From a sample of three keys drop the one nearest to
5105 * the natural expire */
5106 for (k = 0; k < 3; k++) {
5107 time_t t;
5108
5109 de = dictGetRandomKey(server.db[j].expires);
5110 t = (time_t) dictGetEntryVal(de);
5111 if (minttl == -1 || t < minttl) {
5112 minkey = dictGetEntryKey(de);
5113 minttl = t;
5114 }
5115 }
5116 deleteKey(server.db+j,minkey);
5117 }
5118 }
5119 if (!freed) return; /* nothing to free... */
5120 }
5121 }
5122 }
5123
5124 /* ================================= Debugging ============================== */
5125
5126 static void debugCommand(redisClient *c) {
5127 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
5128 *((char*)-1) = 'x';
5129 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
5130 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
5131 robj *key, *val;
5132
5133 if (!de) {
5134 addReply(c,shared.nokeyerr);
5135 return;
5136 }
5137 key = dictGetEntryKey(de);
5138 val = dictGetEntryVal(de);
5139 addReplySds(c,sdscatprintf(sdsempty(),
5140 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
5141 key, key->refcount, val, val->refcount, val->encoding));
5142 } else {
5143 addReplySds(c,sdsnew(
5144 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
5145 }
5146 }
5147
5148 #ifdef HAVE_BACKTRACE
5149 static struct redisFunctionSym symsTable[] = {
5150 {"compareStringObjects", (unsigned long)compareStringObjects},
5151 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
5152 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
5153 {"dictEncObjHash", (unsigned long)dictEncObjHash},
5154 {"incrDecrCommand", (unsigned long)incrDecrCommand},
5155 {"freeStringObject", (unsigned long)freeStringObject},
5156 {"freeListObject", (unsigned long)freeListObject},
5157 {"freeSetObject", (unsigned long)freeSetObject},
5158 {"decrRefCount", (unsigned long)decrRefCount},
5159 {"createObject", (unsigned long)createObject},
5160 {"freeClient", (unsigned long)freeClient},
5161 {"rdbLoad", (unsigned long)rdbLoad},
5162 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
5163 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
5164 {"addReply", (unsigned long)addReply},
5165 {"addReplySds", (unsigned long)addReplySds},
5166 {"incrRefCount", (unsigned long)incrRefCount},
5167 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
5168 {"createStringObject", (unsigned long)createStringObject},
5169 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
5170 {"syncWithMaster", (unsigned long)syncWithMaster},
5171 {"tryObjectSharing", (unsigned long)tryObjectSharing},
5172 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
5173 {"getDecodedObject", (unsigned long)getDecodedObject},
5174 {"removeExpire", (unsigned long)removeExpire},
5175 {"expireIfNeeded", (unsigned long)expireIfNeeded},
5176 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
5177 {"deleteKey", (unsigned long)deleteKey},
5178 {"getExpire", (unsigned long)getExpire},
5179 {"setExpire", (unsigned long)setExpire},
5180 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
5181 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
5182 {"authCommand", (unsigned long)authCommand},
5183 {"pingCommand", (unsigned long)pingCommand},
5184 {"echoCommand", (unsigned long)echoCommand},
5185 {"setCommand", (unsigned long)setCommand},
5186 {"setnxCommand", (unsigned long)setnxCommand},
5187 {"getCommand", (unsigned long)getCommand},
5188 {"delCommand", (unsigned long)delCommand},
5189 {"existsCommand", (unsigned long)existsCommand},
5190 {"incrCommand", (unsigned long)incrCommand},
5191 {"decrCommand", (unsigned long)decrCommand},
5192 {"incrbyCommand", (unsigned long)incrbyCommand},
5193 {"decrbyCommand", (unsigned long)decrbyCommand},
5194 {"selectCommand", (unsigned long)selectCommand},
5195 {"randomkeyCommand", (unsigned long)randomkeyCommand},
5196 {"keysCommand", (unsigned long)keysCommand},
5197 {"dbsizeCommand", (unsigned long)dbsizeCommand},
5198 {"lastsaveCommand", (unsigned long)lastsaveCommand},
5199 {"saveCommand", (unsigned long)saveCommand},
5200 {"bgsaveCommand", (unsigned long)bgsaveCommand},
5201 {"shutdownCommand", (unsigned long)shutdownCommand},
5202 {"moveCommand", (unsigned long)moveCommand},
5203 {"renameCommand", (unsigned long)renameCommand},
5204 {"renamenxCommand", (unsigned long)renamenxCommand},
5205 {"lpushCommand", (unsigned long)lpushCommand},
5206 {"rpushCommand", (unsigned long)rpushCommand},
5207 {"lpopCommand", (unsigned long)lpopCommand},
5208 {"rpopCommand", (unsigned long)rpopCommand},
5209 {"llenCommand", (unsigned long)llenCommand},
5210 {"lindexCommand", (unsigned long)lindexCommand},
5211 {"lrangeCommand", (unsigned long)lrangeCommand},
5212 {"ltrimCommand", (unsigned long)ltrimCommand},
5213 {"typeCommand", (unsigned long)typeCommand},
5214 {"lsetCommand", (unsigned long)lsetCommand},
5215 {"saddCommand", (unsigned long)saddCommand},
5216 {"sremCommand", (unsigned long)sremCommand},
5217 {"smoveCommand", (unsigned long)smoveCommand},
5218 {"sismemberCommand", (unsigned long)sismemberCommand},
5219 {"scardCommand", (unsigned long)scardCommand},
5220 {"spopCommand", (unsigned long)spopCommand},
5221 {"srandmemberCommand", (unsigned long)srandmemberCommand},
5222 {"sinterCommand", (unsigned long)sinterCommand},
5223 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
5224 {"sunionCommand", (unsigned long)sunionCommand},
5225 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
5226 {"sdiffCommand", (unsigned long)sdiffCommand},
5227 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
5228 {"syncCommand", (unsigned long)syncCommand},
5229 {"flushdbCommand", (unsigned long)flushdbCommand},
5230 {"flushallCommand", (unsigned long)flushallCommand},
5231 {"sortCommand", (unsigned long)sortCommand},
5232 {"lremCommand", (unsigned long)lremCommand},
5233 {"infoCommand", (unsigned long)infoCommand},
5234 {"mgetCommand", (unsigned long)mgetCommand},
5235 {"monitorCommand", (unsigned long)monitorCommand},
5236 {"expireCommand", (unsigned long)expireCommand},
5237 {"getsetCommand", (unsigned long)getsetCommand},
5238 {"ttlCommand", (unsigned long)ttlCommand},
5239 {"slaveofCommand", (unsigned long)slaveofCommand},
5240 {"debugCommand", (unsigned long)debugCommand},
5241 {"processCommand", (unsigned long)processCommand},
5242 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
5243 {"readQueryFromClient", (unsigned long)readQueryFromClient},
5244 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
5245 {"msetGenericCommand", (unsigned long)msetGenericCommand},
5246 {"msetCommand", (unsigned long)msetCommand},
5247 {"msetnxCommand", (unsigned long)msetnxCommand},
5248 {"zslCreateNode", (unsigned long)zslCreateNode},
5249 {"zslCreate", (unsigned long)zslCreate},
5250 {"zslFreeNode",(unsigned long)zslFreeNode},
5251 {"zslFree",(unsigned long)zslFree},
5252 {"zslRandomLevel",(unsigned long)zslRandomLevel},
5253 {"zslInsert",(unsigned long)zslInsert},
5254 {"zslDelete",(unsigned long)zslDelete},
5255 {"createZsetObject",(unsigned long)createZsetObject},
5256 {"zaddCommand",(unsigned long)zaddCommand},
5257 {"zrangeGenericCommand",(unsigned long)zrangeGenericCommand},
5258 {"zrangeCommand",(unsigned long)zrangeCommand},
5259 {"zrevrangeCommand",(unsigned long)zrevrangeCommand},
5260 {"zremCommand",(unsigned long)zremCommand},
5261 {"rdbSaveDoubleValue",(unsigned long)rdbSaveDoubleValue},
5262 {"rdbLoadDoubleValue",(unsigned long)rdbLoadDoubleValue},
5263 {NULL,0}
5264 };
5265
5266 /* This function try to convert a pointer into a function name. It's used in
5267 * oreder to provide a backtrace under segmentation fault that's able to
5268 * display functions declared as static (otherwise the backtrace is useless). */
5269 static char *findFuncName(void *pointer, unsigned long *offset){
5270 int i, ret = -1;
5271 unsigned long off, minoff = 0;
5272
5273 /* Try to match against the Symbol with the smallest offset */
5274 for (i=0; symsTable[i].pointer; i++) {
5275 unsigned long lp = (unsigned long) pointer;
5276
5277 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
5278 off=lp-symsTable[i].pointer;
5279 if (ret < 0 || off < minoff) {
5280 minoff=off;
5281 ret=i;
5282 }
5283 }
5284 }
5285 if (ret == -1) return NULL;
5286 *offset = minoff;
5287 return symsTable[ret].name;
5288 }
5289
/* Return the program counter saved in the signal context 'uc', i.e. the
 * address of the instruction that was executing when the signal arrived.
 * Returns NULL on platforms where we do not know how to extract it.
 *
 * On Linux the general purpose registers are read by index: 14 is glibc's
 * REG_EIP on i386 and 16 is REG_RIP on x86-64. Literal indexes are used so
 * the code does not depend on the REG_* names, which glibc only exposes
 * with _GNU_SOURCE. Compilers define __x86_64__ (lower case) for x86-64;
 * the upper-case spelling is kept only for backward compatibility. */
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__)
    return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    return (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
    return (void*) uc->uc_mcontext->__ss.__eip;
#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
  #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__i386__) /* Linux x86 */
    return (void*) uc->uc_mcontext.gregs[14]; /* REG_EIP */
#elif defined(__x86_64__) || defined(__X86_64__) /* Linux x86-64 */
    return (void*) uc->uc_mcontext.gregs[16]; /* REG_RIP */
#elif defined(__ia64__) /* Linux IA64 */
    return (void*) uc->uc_mcontext.sc_ip;
#else
    return NULL;
#endif
}
5311
5312 static void segvHandler(int sig, siginfo_t *info, void *secret) {
5313 void *trace[100];
5314 char **messages = NULL;
5315 int i, trace_size = 0;
5316 unsigned long offset=0;
5317 time_t uptime = time(NULL)-server.stat_starttime;
5318 ucontext_t *uc = (ucontext_t*) secret;
5319 REDIS_NOTUSED(info);
5320
5321 redisLog(REDIS_WARNING,
5322 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5323 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5324 "redis_version:%s; "
5325 "uptime_in_seconds:%d; "
5326 "connected_clients:%d; "
5327 "connected_slaves:%d; "
5328 "used_memory:%zu; "
5329 "changes_since_last_save:%lld; "
5330 "bgsave_in_progress:%d; "
5331 "last_save_time:%d; "
5332 "total_connections_received:%lld; "
5333 "total_commands_processed:%lld; "
5334 "role:%s;"
5335 ,REDIS_VERSION,
5336 uptime,
5337 listLength(server.clients)-listLength(server.slaves),
5338 listLength(server.slaves),
5339 server.usedmemory,
5340 server.dirty,
5341 server.bgsaveinprogress,
5342 server.lastsave,
5343 server.stat_numconnections,
5344 server.stat_numcommands,
5345 server.masterhost == NULL ? "master" : "slave"
5346 ));
5347
5348 trace_size = backtrace(trace, 100);
5349 /* overwrite sigaction with caller's address */
5350 if (getMcontextEip(uc) != NULL) {
5351 trace[1] = getMcontextEip(uc);
5352 }
5353 messages = backtrace_symbols(trace, trace_size);
5354
5355 for (i=1; i<trace_size; ++i) {
5356 char *fn = findFuncName(trace[i], &offset), *p;
5357
5358 p = strchr(messages[i],'+');
5359 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5360 redisLog(REDIS_WARNING,"%s", messages[i]);
5361 } else {
5362 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5363 }
5364 }
5365 free(messages);
5366 exit(0);
5367 }
5368
5369 static void setupSigSegvAction(void) {
5370 struct sigaction act;
5371
5372 sigemptyset (&act.sa_mask);
5373 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5374 * is used. Otherwise, sa_handler is used */
5375 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5376 act.sa_sigaction = segvHandler;
5377 sigaction (SIGSEGV, &act, NULL);
5378 sigaction (SIGBUS, &act, NULL);
5379 sigaction (SIGFPE, &act, NULL);
5380 sigaction (SIGILL, &act, NULL);
5381 sigaction (SIGBUS, &act, NULL);
5382 return;
5383 }
5384 #else /* HAVE_BACKTRACE */
/* Fallback used when HAVE_BACKTRACE is not defined: no backtrace can be
 * produced, so no fatal-signal handler is installed. */
static void setupSigSegvAction(void) {
    return;
}
5387 #endif /* HAVE_BACKTRACE */
5388
5389 /* =================================== Main! ================================ */
5390
5391 #ifdef __linux__
/* Read the kernel's vm.overcommit_memory setting.
 *
 * Returns the integer found at the start of
 * /proc/sys/vm/overcommit_memory, or -1 if the file cannot be opened or
 * read, or does not begin with a number. */
int linuxOvercommitMemoryValue(void) {
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
    char buf[64];
    char *end;
    long val;

    if (!fp) return -1;
    if (fgets(buf,sizeof(buf),fp) == NULL) {
        fclose(fp);
        return -1;
    }
    fclose(fp);

    /* atoi() would silently turn unparseable content into 0, which the
     * caller interprets as "overcommit_memory is 0" and warns about. */
    val = strtol(buf,&end,10);
    if (end == buf) return -1;
    return (int) val;
}
5405
5406 void linuxOvercommitMemoryWarning(void) {
5407 if (linuxOvercommitMemoryValue() == 0) {
5408 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5409 }
5410 }
5411 #endif /* __linux__ */
5412
5413 static void daemonize(void) {
5414 int fd;
5415 FILE *fp;
5416
5417 if (fork() != 0) exit(0); /* parent exits */
5418 setsid(); /* create a new session */
5419
5420 /* Every output goes to /dev/null. If Redis is daemonized but
5421 * the 'logfile' is set to 'stdout' in the configuration file
5422 * it will not log at all. */
5423 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5424 dup2(fd, STDIN_FILENO);
5425 dup2(fd, STDOUT_FILENO);
5426 dup2(fd, STDERR_FILENO);
5427 if (fd > STDERR_FILENO) close(fd);
5428 }
5429 /* Try to write the pid file */
5430 fp = fopen(server.pidfile,"w");
5431 if (fp) {
5432 fprintf(fp,"%d\n",getpid());
5433 fclose(fp);
5434 }
5435 }
5436
5437 int main(int argc, char **argv) {
5438 initServerConfig();
5439 if (argc == 2) {
5440 ResetServerSaveParams();
5441 loadServerConfig(argv[1]);
5442 } else if (argc > 2) {
5443 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5444 exit(1);
5445 } else {
5446 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5447 }
5448 initServer();
5449 if (server.daemonize) daemonize();
5450 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5451 #ifdef __linux__
5452 linuxOvercommitMemoryWarning();
5453 #endif
5454 if (rdbLoad(server.dbfilename) == REDIS_OK)
5455 redisLog(REDIS_NOTICE,"DB loaded from disk");
5456 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5457 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5458 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5459 aeMain(server.el);
5460 aeDeleteEventLoop(server.el);
5461 return 0;
5462 }