]> git.saurik.com Git - redis.git/blob - redis.c
e26e676f64cdd397ac706dcb2514a4a34264952b
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string of this server build. */
#define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60 #include <math.h>
61 #if defined(__sun) && defined(__GNUC__)
62 #include "solarisfixes.h"
63 #endif
64
65 #include "redis.h"
66 #include "ae.h" /* Event driven programming library */
67 #include "sds.h" /* Dynamic safe strings */
68 #include "anet.h" /* Networking the easy way */
69 #include "dict.h" /* Hash tables */
70 #include "adlist.h" /* Linked lists */
71 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
72 #include "lzf.h" /* LZF compression library */
73 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
74
/* Error codes returned by internal functions */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout (presumably seconds, i.e. 5 minutes) */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* max volatile keys sampled per DB on each serverCron() run (~100 keys/second if the cron runs once per second) */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
92
/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10%: below this percentage of used slots htNeedsResize() asks for a shrink */

/* Command flags */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short, these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4

/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding */
#define REDIS_ENCODING_RAW 0 /* Raw representation (ptr is an sds string) */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255
120
/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys requires a lot of space, so we check the most significant 2 bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX /* presumably returned by the length loader on read error */

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
147
/* Client flags (bits of redisClient.flags) */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side (server.replstate) */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master (serverCron() retries) */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * (redisClient.replstate of a slave client).
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits for bgsave to end before starting bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
167
/* List related stuff: which end of a list an operation works on */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_DEL 1
#define REDIS_SORT_INCR 2
#define REDIS_SORT_DECR 3
#define REDIS_SORT_ASC 4
#define REDIS_SORT_DESC 5
#define REDIS_SORTKEY_MAX 1024

/* Log levels. redisLog() prints a message only when its level is
 * >= server.verbosity, tagged with '.', '-' or '*' respectively. */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro: marks a parameter as intentionally unused */
#define REDIS_NOTUSED(V) ((void) V)

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
191
192 /*================================= Data types ============================== */
193
/* A redis object, that is a type able to hold a string / list / set.
 * Objects are reference counted (incrRefCount() / decrRefCount()). */
typedef struct redisObject {
    void *ptr;                /* payload; an sds string when encoding is RAW */
    unsigned char type;       /* REDIS_STRING, REDIS_LIST, REDIS_SET, ... */
    unsigned char encoding;   /* REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
    unsigned char notused[2]; /* unused (presumably explicit padding) */
    int refcount;
} robj;

/* One logical database. */
typedef struct redisDb {
    dict *dict;    /* the keys of this DB and their values */
    dict *expires; /* volatile keys -> expire time (a time_t stored as the entry value) */
    int id;        /* presumably the DB index (0 .. server.dbnum-1) */
} redisDb;

/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list (server.clients). */
typedef struct redisClient {
    int fd;
    redisDb *db;             /* presumably the currently selected DB */
    int dictid;
    sds querybuf;
    robj **argv, **mbargv;   /* mbargv/mbargc: presumably the multi-bulk argument vector */
    int argc, mbargc;
    int bulklen; /* bulk read len. -1 if not in bulk read mode */
    int multibulk; /* multi bulk command format active */
    list *reply;
    int sentlen;
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
    int slaveseldb; /* slave selected db, if this client is a slave */
    int authenticated; /* when requirepass is non-NULL */
    int replstate; /* replication state if this is a slave */
    int repldbfd; /* replication DB file descriptor */
    long repldboff; /* replication DB file offset */
    off_t repldbsize; /* replication DB file size */
} redisClient;
231
/* A save point: serverCron() starts a background save when at least
 * 'changes' writes happened and more than 'seconds' seconds elapsed since
 * the last successful save. */
struct saveparam {
    time_t seconds;
    int changes;
};

/* Global server state structure */
struct redisServer {
    int port;
    int fd;
    redisDb *db;                  /* array of server.dbnum databases */
    dict *sharingpool;
    unsigned int sharingpoolsize;
    long long dirty; /* changes to DB from the last save */
    list *clients;
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];
    aeEventLoop *el;
    int cronloops; /* number of times the cron function ran */
    list *objfreelist; /* A list of freed objects to avoid malloc() */
    time_t lastsave; /* Unix time of last successful save */
    size_t usedmemory; /* Used memory in bytes (zmalloc_used_memory(), refreshed by serverCron()) */
    /* Fields used only for stats */
    time_t stat_starttime; /* server start time */
    long long stat_numcommands; /* number of processed commands */
    long long stat_numconnections; /* number of connections received */
    /* Configuration */
    int verbosity;                /* minimum level printed by redisLog() */
    int glueoutputbuf;
    int maxidletime;              /* client idle timeout in seconds, 0 = never time out */
    int dbnum;
    int daemonize;
    char *pidfile;
    int bgsaveinprogress;         /* non-zero while a background save child runs */
    pid_t bgsavechildpid;         /* pid of that child, -1 when none */
    struct saveparam *saveparams; /* save points, checked by serverCron() */
    int saveparamslen;
    char *logfile;                /* NULL means log to stdout */
    char *bindaddr;
    char *dbfilename;             /* file written by background saves */
    char *requirepass;
    int shareobjects;
    /* Replication related */
    int isslave;
    char *masterhost;
    int masterport;
    redisClient *master; /* client that is master for this slave */
    int replstate;                /* REDIS_REPL_* (slave side) */
    unsigned int maxclients;
    unsigned long maxmemory;      /* see REDIS_CMD_DENYOOM */
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to take this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
};
287
/* Signature of every command implementation. */
typedef void redisCommandProc(redisClient *c);

/* One entry of cmdTable. */
struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;  /* argument count including the command name; negative for
                 * variadic commands (presumably "at least -arity") */
    int flags;  /* REDIS_CMD_BULK / REDIS_CMD_INLINE, optionally | REDIS_CMD_DENYOOM */
};

/* Name/address pair (presumably a symbol table used to resolve function
 * addresses, e.g. in crash reports -- verify against its users). */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};

/* An element being sorted by SORT together with its sort key. */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;   /* presumably used for numeric sorting */
        robj *cmpobj;   /* presumably used for ALPHA / BY-pattern sorting */
    } u;
} redisSortObject;

/* A GET/DEL/INCR/DECR-style operation requested by SORT (REDIS_SORT_*). */
typedef struct _redisSortOperation {
    int type;
    robj *pattern;
} redisSortOperation;
313
/* ZSETs use a specialized version of Skiplists */

typedef struct zskiplistNode {
    struct zskiplistNode **forward;  /* one next-pointer per level */
    struct zskiplistNode *backward;  /* previous node */
    double score;
    robj *obj;
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    long length;
    int level;
} zskiplist;

/* A sorted set: a dict for member lookup (zsetDictType) plus a skiplist
 * ordered by score. */
typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;

/* Our shared "common" objects: preallocated reply objects created once by
 * createSharedObjects() */

struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;

/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */

static double R_Zero, R_PosInf, R_NegInf, R_Nan;
350
351 /*================================ Prototypes =============================== */
352
/* Forward declarations of static functions defined later in this file, so
 * they can be referenced (e.g. by cmdTable) before their definition. */
static void freeStringObject(robj *o);
static void freeListObject(robj *o);
static void freeSetObject(robj *o);
static void decrRefCount(void *o);
static robj *createObject(int type, void *ptr);
static void freeClient(redisClient *c);
static int rdbLoad(char *filename);
static void addReply(redisClient *c, robj *obj);
static void addReplySds(redisClient *c, sds s);
static void incrRefCount(robj *o);
static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);
static int tryObjectEncoding(robj *o);
static robj *getDecodedObject(const robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static int deleteIfVolatile(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);
static void updateSlavesWaitingBgsave(int bgsaveerr);
static void freeMemoryIfNeeded(void);
static int processCommand(redisClient *c);
static void setupSigSegvAction(void);
static void rdbRemoveTempFile(pid_t childpid);
static size_t stringObjectLen(robj *o);
static void processInputBuffer(redisClient *c);
static zskiplist *zslCreate(void);
static void zslFree(zskiplist *zsl);
static void zslInsert(zskiplist *zsl, double score, robj *obj);

/* Command implementations, all referenced from cmdTable. */
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
static void setCommand(redisClient *c);
static void setnxCommand(redisClient *c);
static void getCommand(redisClient *c);
static void delCommand(redisClient *c);
static void existsCommand(redisClient *c);
static void incrCommand(redisClient *c);
static void decrCommand(redisClient *c);
static void incrbyCommand(redisClient *c);
static void decrbyCommand(redisClient *c);
static void selectCommand(redisClient *c);
static void randomkeyCommand(redisClient *c);
static void keysCommand(redisClient *c);
static void dbsizeCommand(redisClient *c);
static void lastsaveCommand(redisClient *c);
static void saveCommand(redisClient *c);
static void bgsaveCommand(redisClient *c);
static void shutdownCommand(redisClient *c);
static void moveCommand(redisClient *c);
static void renameCommand(redisClient *c);
static void renamenxCommand(redisClient *c);
static void lpushCommand(redisClient *c);
static void rpushCommand(redisClient *c);
static void lpopCommand(redisClient *c);
static void rpopCommand(redisClient *c);
static void llenCommand(redisClient *c);
static void lindexCommand(redisClient *c);
static void lrangeCommand(redisClient *c);
static void ltrimCommand(redisClient *c);
static void typeCommand(redisClient *c);
static void lsetCommand(redisClient *c);
static void saddCommand(redisClient *c);
static void sremCommand(redisClient *c);
static void smoveCommand(redisClient *c);
static void sismemberCommand(redisClient *c);
static void scardCommand(redisClient *c);
static void spopCommand(redisClient *c);
static void srandmemberCommand(redisClient *c);
static void sinterCommand(redisClient *c);
static void sinterstoreCommand(redisClient *c);
static void sunionCommand(redisClient *c);
static void sunionstoreCommand(redisClient *c);
static void sdiffCommand(redisClient *c);
static void sdiffstoreCommand(redisClient *c);
static void syncCommand(redisClient *c);
static void flushdbCommand(redisClient *c);
static void flushallCommand(redisClient *c);
static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c);
static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
static void expireCommand(redisClient *c);
static void getsetCommand(redisClient *c);
static void ttlCommand(redisClient *c);
static void slaveofCommand(redisClient *c);
static void debugCommand(redisClient *c);
static void msetCommand(redisClient *c);
static void msetnxCommand(redisClient *c);
static void zaddCommand(redisClient *c);
static void zrangeCommand(redisClient *c);
static void zrangebyscoreCommand(redisClient *c);
static void zrevrangeCommand(redisClient *c);
static void zlenCommand(redisClient *c);
static void zremCommand(redisClient *c);
static void zscoreCommand(redisClient *c);
455
456 /*================================= Globals ================================= */
457
458 /* Global vars */
459 static struct redisServer server; /* server global state */
460 static struct redisCommand cmdTable[] = {
461 {"get",getCommand,2,REDIS_CMD_INLINE},
462 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
463 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
464 {"del",delCommand,-2,REDIS_CMD_INLINE},
465 {"exists",existsCommand,2,REDIS_CMD_INLINE},
466 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
467 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
468 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
469 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
470 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
471 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
472 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
473 {"llen",llenCommand,2,REDIS_CMD_INLINE},
474 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
475 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
476 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
477 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
478 {"lrem",lremCommand,4,REDIS_CMD_BULK},
479 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
480 {"srem",sremCommand,3,REDIS_CMD_BULK},
481 {"smove",smoveCommand,4,REDIS_CMD_BULK},
482 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
483 {"scard",scardCommand,2,REDIS_CMD_INLINE},
484 {"spop",spopCommand,2,REDIS_CMD_INLINE},
485 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
486 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
487 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
488 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
489 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
490 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
491 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
492 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
493 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
494 {"zrem",zremCommand,3,REDIS_CMD_BULK},
495 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
496 {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
497 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
498 {"zlen",zlenCommand,2,REDIS_CMD_INLINE},
499 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
500 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
501 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
502 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
503 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
504 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
505 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
506 {"select",selectCommand,2,REDIS_CMD_INLINE},
507 {"move",moveCommand,3,REDIS_CMD_INLINE},
508 {"rename",renameCommand,3,REDIS_CMD_INLINE},
509 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
510 {"expire",expireCommand,3,REDIS_CMD_INLINE},
511 {"keys",keysCommand,2,REDIS_CMD_INLINE},
512 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
513 {"auth",authCommand,2,REDIS_CMD_INLINE},
514 {"ping",pingCommand,1,REDIS_CMD_INLINE},
515 {"echo",echoCommand,2,REDIS_CMD_BULK},
516 {"save",saveCommand,1,REDIS_CMD_INLINE},
517 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
518 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
519 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
520 {"type",typeCommand,2,REDIS_CMD_INLINE},
521 {"sync",syncCommand,1,REDIS_CMD_INLINE},
522 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
523 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
524 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
525 {"info",infoCommand,1,REDIS_CMD_INLINE},
526 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
527 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
528 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
529 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
530 {NULL,NULL,0,0}
531 };
532 /*============================ Utility functions ============================ */
533
/* Glob-style pattern matching.
 *
 * Returns 1 if the first stringLen bytes of 'string' match the first
 * patternLen bytes of 'pattern', 0 otherwise. Neither buffer has to be NUL
 * terminated: every read is bounded by the explicit lengths.
 *
 * Syntax:
 *   *       any sequence of bytes, including the empty one
 *   ?       exactly one byte
 *   [...]   one byte from a set: '^' right after '[' negates the set,
 *           'a-z' is an inclusive byte range (endpoints in either order),
 *           '\x' matches x literally; an unterminated set extends to the
 *           end of the pattern
 *   \x      the literal byte x
 *
 * When 'nocase' is non-zero, bytes are compared after tolower() (escaped
 * bytes inside a [...] set are always compared exactly). Bytes are handled
 * as unsigned char, so ranges follow byte order and tolower() never sees a
 * negative value. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* A run of '*' matches the same as a single one. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A set consumes exactly one byte, so it can't match "". */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back so that the pattern++
                     * after the switch lands exactly on the pattern end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Input exhausted: only trailing '*' can still match. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
656
657 static void redisLog(int level, const char *fmt, ...) {
658 va_list ap;
659 FILE *fp;
660
661 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
662 if (!fp) return;
663
664 va_start(ap, fmt);
665 if (level >= server.verbosity) {
666 char *c = ".-*";
667 char buf[64];
668 time_t now;
669
670 now = time(NULL);
671 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
672 fprintf(fp,"%s %c ",buf,c[level]);
673 vfprintf(fp, fmt, ap);
674 fprintf(fp,"\n");
675 fflush(fp);
676 }
677 va_end(ap);
678
679 if (server.logfile) fclose(fp);
680 }
681
682 /*====================== Hash table type implementation ==================== */
683
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and Redis objects as values (objects can hold SDS strings,
 * lists, sets). */
687
/* Value destructor for dicts whose values are plain zmalloc()ed buffers.
 * The dict's private data is not needed. */
static void dictVanillaFree(void *privdata, void *val)
{
    void *buffer = val;

    DICT_NOTUSED(privdata);
    zfree(buffer);
}
693
694 static int sdsDictKeyCompare(void *privdata, const void *key1,
695 const void *key2)
696 {
697 int l1,l2;
698 DICT_NOTUSED(privdata);
699
700 l1 = sdslen((sds)key1);
701 l2 = sdslen((sds)key2);
702 if (l1 != l2) return 0;
703 return memcmp(key1, key2, l1) == 0;
704 }
705
706 static void dictRedisObjectDestructor(void *privdata, void *val)
707 {
708 DICT_NOTUSED(privdata);
709
710 decrRefCount(val);
711 }
712
713 static int dictObjKeyCompare(void *privdata, const void *key1,
714 const void *key2)
715 {
716 const robj *o1 = key1, *o2 = key2;
717 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
718 }
719
720 static unsigned int dictObjHash(const void *key) {
721 const robj *o = key;
722 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
723 }
724
725 static int dictEncObjKeyCompare(void *privdata, const void *key1,
726 const void *key2)
727 {
728 const robj *o1 = key1, *o2 = key2;
729
730 if (o1->encoding == REDIS_ENCODING_RAW &&
731 o2->encoding == REDIS_ENCODING_RAW)
732 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
733 else {
734 robj *dec1, *dec2;
735 int cmp;
736
737 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
738 getDecodedObject(o1) : (robj*)o1;
739 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
740 getDecodedObject(o2) : (robj*)o2;
741 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
742 if (dec1 != o1) decrRefCount(dec1);
743 if (dec2 != o2) decrRefCount(dec2);
744 return cmp;
745 }
746 }
747
748 static unsigned int dictEncObjHash(const void *key) {
749 const robj *o = key;
750
751 if (o->encoding == REDIS_ENCODING_RAW)
752 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
753 else {
754 robj *dec = getDecodedObject(o);
755 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
756 decrRefCount(dec);
757 return hash;
758 }
759 }
760
/* Sets: keys are member objects, possibly integer encoded (hence the Enc
 * hash/compare callbacks); the dict owns the keys and frees no values. */
static dictType setDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL                       /* val destructor */
};

/* Sorted sets: keys are member objects (possibly integer encoded); values
 * are plain zmalloc()ed buffers released with zfree() (presumably the
 * member's score -- verify in zaddCommand). */
static dictType zsetDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictVanillaFree            /* val destructor */
};

/* Keys and values are both redis objects; keys must be RAW, since
 * dictObjHash/dictObjKeyCompare read ptr as an sds string. */
static dictType hashDictType = {
    dictObjHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictObjKeyCompare,         /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictRedisObjectDestructor  /* val destructor */
};
787
788 /* ========================= Random utility functions ======================= */
789
/* Out-of-memory handler: report which allocation failed and abort.
 *
 * Redis does not try to recover from out of memory conditions when
 * allocating objects or strings. The networking layer itself allocates its
 * send buffers on the heap, so it is unclear whether the error could even be
 * reported to the client; aborting keeps the code simple. */
static void oom(const char *msg) {
    FILE *out = stderr;

    fprintf(out, "%s: Out of memory\n",msg);
    fflush(out);
    sleep(1);
    abort();
}
801
802 /* ====================== Redis server networking stuff ===================== */
803 static void closeTimedoutClients(void) {
804 redisClient *c;
805 listNode *ln;
806 time_t now = time(NULL);
807
808 listRewind(server.clients);
809 while ((ln = listYield(server.clients)) != NULL) {
810 c = listNodeValue(ln);
811 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
812 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
813 (now - c->lastinteraction > server.maxidletime)) {
814 redisLog(REDIS_DEBUG,"Closing idle client");
815 freeClient(c);
816 }
817 }
818 }
819
820 static int htNeedsResize(dict *dict) {
821 long long size, used;
822
823 size = dictSlots(dict);
824 used = dictSize(dict);
825 return (size && used && size > DICT_HT_INITIAL_SIZE &&
826 (used*100/size < REDIS_HT_MINFILL));
827 }
828
829 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
830 * we resize the hash table to save memory */
831 static void tryResizeHashTables(void) {
832 int j;
833
834 for (j = 0; j < server.dbnum; j++) {
835 if (htNeedsResize(server.db[j].dict)) {
836 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
837 dictResize(server.db[j].dict);
838 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
839 }
840 if (htNeedsResize(server.db[j].expires))
841 dictResize(server.db[j].expires);
842 }
843 }
844
845 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
846 int j, loops = server.cronloops++;
847 REDIS_NOTUSED(eventLoop);
848 REDIS_NOTUSED(id);
849 REDIS_NOTUSED(clientData);
850
851 /* Update the global state with the amount of used memory */
852 server.usedmemory = zmalloc_used_memory();
853
854 /* Show some info about non-empty databases */
855 for (j = 0; j < server.dbnum; j++) {
856 long long size, used, vkeys;
857
858 size = dictSlots(server.db[j].dict);
859 used = dictSize(server.db[j].dict);
860 vkeys = dictSize(server.db[j].expires);
861 if (!(loops % 5) && (used || vkeys)) {
862 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
863 /* dictPrintStats(server.dict); */
864 }
865 }
866
867 /* We don't want to resize the hash tables while a bacground saving
868 * is in progress: the saving child is created using fork() that is
869 * implemented with a copy-on-write semantic in most modern systems, so
870 * if we resize the HT while there is the saving child at work actually
871 * a lot of memory movements in the parent will cause a lot of pages
872 * copied. */
873 if (!server.bgsaveinprogress) tryResizeHashTables();
874
875 /* Show information about connected clients */
876 if (!(loops % 5)) {
877 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
878 listLength(server.clients)-listLength(server.slaves),
879 listLength(server.slaves),
880 server.usedmemory,
881 dictSize(server.sharingpool));
882 }
883
884 /* Close connections of timedout clients */
885 if (server.maxidletime && !(loops % 10))
886 closeTimedoutClients();
887
888 /* Check if a background saving in progress terminated */
889 if (server.bgsaveinprogress) {
890 int statloc;
891 if (wait4(-1,&statloc,WNOHANG,NULL)) {
892 int exitcode = WEXITSTATUS(statloc);
893 int bysignal = WIFSIGNALED(statloc);
894
895 if (!bysignal && exitcode == 0) {
896 redisLog(REDIS_NOTICE,
897 "Background saving terminated with success");
898 server.dirty = 0;
899 server.lastsave = time(NULL);
900 } else if (!bysignal && exitcode != 0) {
901 redisLog(REDIS_WARNING, "Background saving error");
902 } else {
903 redisLog(REDIS_WARNING,
904 "Background saving terminated by signal");
905 rdbRemoveTempFile(server.bgsavechildpid);
906 }
907 server.bgsaveinprogress = 0;
908 server.bgsavechildpid = -1;
909 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
910 }
911 } else {
912 /* If there is not a background saving in progress check if
913 * we have to save now */
914 time_t now = time(NULL);
915 for (j = 0; j < server.saveparamslen; j++) {
916 struct saveparam *sp = server.saveparams+j;
917
918 if (server.dirty >= sp->changes &&
919 now-server.lastsave > sp->seconds) {
920 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
921 sp->changes, sp->seconds);
922 rdbSaveBackground(server.dbfilename);
923 break;
924 }
925 }
926 }
927
928 /* Try to expire a few timed out keys */
929 for (j = 0; j < server.dbnum; j++) {
930 redisDb *db = server.db+j;
931 int num = dictSize(db->expires);
932
933 if (num) {
934 time_t now = time(NULL);
935
936 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
937 num = REDIS_EXPIRELOOKUPS_PER_CRON;
938 while (num--) {
939 dictEntry *de;
940 time_t t;
941
942 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
943 t = (time_t) dictGetEntryVal(de);
944 if (now > t) {
945 deleteKey(db,dictGetEntryKey(de));
946 }
947 }
948 }
949 }
950
951 /* Check if we should connect to a MASTER */
952 if (server.replstate == REDIS_REPL_CONNECT) {
953 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
954 if (syncWithMaster() == REDIS_OK) {
955 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
956 }
957 }
958 return 1000;
959 }
960
961 static void createSharedObjects(void) {
962 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
963 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
964 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
965 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
966 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
967 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
968 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
969 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
970 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
971 /* no such key */
972 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
973 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
974 "-ERR Operation against a key holding the wrong kind of value\r\n"));
975 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
976 "-ERR no such key\r\n"));
977 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
978 "-ERR syntax error\r\n"));
979 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
980 "-ERR source and destination objects are the same\r\n"));
981 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
982 "-ERR index out of range\r\n"));
983 shared.space = createObject(REDIS_STRING,sdsnew(" "));
984 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
985 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
986 shared.select0 = createStringObject("select 0\r\n",10);
987 shared.select1 = createStringObject("select 1\r\n",10);
988 shared.select2 = createStringObject("select 2\r\n",10);
989 shared.select3 = createStringObject("select 3\r\n",10);
990 shared.select4 = createStringObject("select 4\r\n",10);
991 shared.select5 = createStringObject("select 5\r\n",10);
992 shared.select6 = createStringObject("select 6\r\n",10);
993 shared.select7 = createStringObject("select 7\r\n",10);
994 shared.select8 = createStringObject("select 8\r\n",10);
995 shared.select9 = createStringObject("select 9\r\n",10);
996 }
997
998 static void appendServerSaveParams(time_t seconds, int changes) {
999 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1000 server.saveparams[server.saveparamslen].seconds = seconds;
1001 server.saveparams[server.saveparamslen].changes = changes;
1002 server.saveparamslen++;
1003 }
1004
1005 static void ResetServerSaveParams() {
1006 zfree(server.saveparams);
1007 server.saveparams = NULL;
1008 server.saveparamslen = 0;
1009 }
1010
1011 static void initServerConfig() {
1012 server.dbnum = REDIS_DEFAULT_DBNUM;
1013 server.port = REDIS_SERVERPORT;
1014 server.verbosity = REDIS_DEBUG;
1015 server.maxidletime = REDIS_MAXIDLETIME;
1016 server.saveparams = NULL;
1017 server.logfile = NULL; /* NULL = log on standard output */
1018 server.bindaddr = NULL;
1019 server.glueoutputbuf = 1;
1020 server.daemonize = 0;
1021 server.pidfile = "/var/run/redis.pid";
1022 server.dbfilename = "dump.rdb";
1023 server.requirepass = NULL;
1024 server.shareobjects = 0;
1025 server.sharingpoolsize = 1024;
1026 server.maxclients = 0;
1027 server.maxmemory = 0;
1028 ResetServerSaveParams();
1029
1030 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1031 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1032 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1033 /* Replication related */
1034 server.isslave = 0;
1035 server.masterhost = NULL;
1036 server.masterport = 6379;
1037 server.master = NULL;
1038 server.replstate = REDIS_REPL_NONE;
1039
1040 /* Double constants initialization */
1041 R_Zero = 0.0;
1042 R_PosInf = 1.0/R_Zero;
1043 R_NegInf = -1.0/R_Zero;
1044 R_Nan = R_Zero/R_Zero;
1045 }
1046
1047 static void initServer() {
1048 int j;
1049
1050 signal(SIGHUP, SIG_IGN);
1051 signal(SIGPIPE, SIG_IGN);
1052 setupSigSegvAction();
1053
1054 server.clients = listCreate();
1055 server.slaves = listCreate();
1056 server.monitors = listCreate();
1057 server.objfreelist = listCreate();
1058 createSharedObjects();
1059 server.el = aeCreateEventLoop();
1060 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1061 server.sharingpool = dictCreate(&setDictType,NULL);
1062 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1063 if (server.fd == -1) {
1064 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1065 exit(1);
1066 }
1067 for (j = 0; j < server.dbnum; j++) {
1068 server.db[j].dict = dictCreate(&hashDictType,NULL);
1069 server.db[j].expires = dictCreate(&setDictType,NULL);
1070 server.db[j].id = j;
1071 }
1072 server.cronloops = 0;
1073 server.bgsaveinprogress = 0;
1074 server.bgsavechildpid = -1;
1075 server.lastsave = time(NULL);
1076 server.dirty = 0;
1077 server.usedmemory = 0;
1078 server.stat_numcommands = 0;
1079 server.stat_numconnections = 0;
1080 server.stat_starttime = time(NULL);
1081 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1082 }
1083
1084 /* Empty the whole database */
1085 static long long emptyDb() {
1086 int j;
1087 long long removed = 0;
1088
1089 for (j = 0; j < server.dbnum; j++) {
1090 removed += dictSize(server.db[j].dict);
1091 dictEmpty(server.db[j].dict);
1092 dictEmpty(server.db[j].expires);
1093 }
1094 return removed;
1095 }
1096
/* Parse a boolean config value, ignoring case.
 * Returns 1 for "yes", 0 for "no" and -1 for anything else. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1102
/* Load the server configuration from 'filename' ("-" means read stdin).
 *
 * The input is read line by line. Each line is trimmed of spaces, tabs
 * and CR/LF. Blank lines and lines starting with '#' are skipped. The rest
 * is split on single spaces into a directive (argv[0]) and its arguments,
 * and each recognized directive updates a field of the global 'server'
 * struct. An unknown directive, a wrong argument count or an invalid value
 * prints a diagnostic to stderr (line number, offending line, reason) and
 * terminates the process with exit(1). A failing "dir" directive also
 * exits, but reports through redisLog() instead.
 *
 * I agree, this is a very rudimentary way to load a configuration...
 * will improve later if the config gets more complex.
 *
 * NOTE(review): "save" appends to the save rules already present and does
 * not clear the defaults set by initServerConfig(); presumably the caller
 * resets them before loading a file. Confirm at the call site.
 * NOTE(review): repeating a string directive (bind, logfile, requirepass,
 * pidfile, dbfilename, slaveof) overwrites the previous value without
 * freeing it. */
static void loadServerConfig(char *filename) {
    FILE *fp;
    /* +1 leaves room for the NUL that fgets() always writes. Lines longer
     * than REDIS_CONFIGLINE_MAX are returned by fgets() in pieces and each
     * piece is parsed as its own line. */
    char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
    int linenum = 0;
    sds line = NULL;

    if (filename[0] == '-' && filename[1] == '\0')
        fp = stdin;
    else {
        if ((fp = fopen(filename,"r")) == NULL) {
            redisLog(REDIS_WARNING,"Fatal error, can't open config file");
            exit(1);
        }
    }

    while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
        sds *argv;
        int argc, j;

        linenum++;
        line = sdsnew(buf);
        line = sdstrim(line," \t\r\n");

        /* Skip comments and blank lines */
        if (line[0] == '#' || line[0] == '\0') {
            sdsfree(line);
            continue;
        }

        /* Split into arguments */
        argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
        sdstolower(argv[0]);

        /* Execute config directives. Every error path sets 'err' and jumps
         * to loaderr, which never returns. */
        if (!strcasecmp(argv[0],"timeout") && argc == 2) {
            server.maxidletime = atoi(argv[1]);
            if (server.maxidletime < 0) {
                err = "Invalid timeout value"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"port") && argc == 2) {
            server.port = atoi(argv[1]);
            if (server.port < 1 || server.port > 65535) {
                err = "Invalid port"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
            server.bindaddr = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"save") && argc == 3) {
            int seconds = atoi(argv[1]);
            int changes = atoi(argv[2]);
            if (seconds < 1 || changes < 0) {
                err = "Invalid save parameters"; goto loaderr;
            }
            appendServerSaveParams(seconds,changes);
        } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
            if (chdir(argv[1]) == -1) {
                redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
                    argv[1], strerror(errno));
                exit(1);
            }
        } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
            if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
            else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
            else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
            else {
                err = "Invalid log level. Must be one of debug, notice, warning";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
            FILE *logfp;

            /* The special value "stdout" maps to NULL (log on stdout) */
            server.logfile = zstrdup(argv[1]);
            if (!strcasecmp(server.logfile,"stdout")) {
                zfree(server.logfile);
                server.logfile = NULL;
            }
            if (server.logfile) {
                /* Test if we are able to open the file. The server will not
                 * be able to abort just for this problem later... */
                logfp = fopen(server.logfile,"a");
                if (logfp == NULL) {
                    err = sdscatprintf(sdsempty(),
                        "Can't open the log file: %s", strerror(errno));
                    goto loaderr;
                }
                fclose(logfp);
            }
        } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
            server.dbnum = atoi(argv[1]);
            if (server.dbnum < 1) {
                err = "Invalid number of databases"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
            server.maxclients = atoi(argv[1]);
        } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
            server.maxmemory = strtoll(argv[1], NULL, 10);
        } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
            /* Mark the server as needing to connect to the given master */
            server.masterhost = sdsnew(argv[1]);
            server.masterport = atoi(argv[2]);
            server.replstate = REDIS_REPL_CONNECT;
        } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
            if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
            if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
            server.sharingpoolsize = atoi(argv[1]);
            if (server.sharingpoolsize < 1) {
                err = "invalid object sharing pool size"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
            if ((server.daemonize = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
            server.requirepass = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
            server.pidfile = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
            server.dbfilename = zstrdup(argv[1]);
        } else {
            err = "Bad directive or wrong number of arguments"; goto loaderr;
        }
        /* Release the per-line token vector and the line itself */
        for (j = 0; j < argc; j++)
            sdsfree(argv[j]);
        zfree(argv);
        sdsfree(line);
    }
    if (fp != stdin) fclose(fp);
    return;

loaderr:
    /* Fatal: report where parsing failed and terminate the process */
    fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
    fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
    fprintf(stderr, ">>> '%s'\n", line);
    fprintf(stderr, "%s\n", err);
    exit(1);
}
1245
1246 static void freeClientArgv(redisClient *c) {
1247 int j;
1248
1249 for (j = 0; j < c->argc; j++)
1250 decrRefCount(c->argv[j]);
1251 for (j = 0; j < c->mbargc; j++)
1252 decrRefCount(c->mbargv[j]);
1253 c->argc = 0;
1254 c->mbargc = 0;
1255 }
1256
1257 static void freeClient(redisClient *c) {
1258 listNode *ln;
1259
1260 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1261 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1262 sdsfree(c->querybuf);
1263 listRelease(c->reply);
1264 freeClientArgv(c);
1265 close(c->fd);
1266 ln = listSearchKey(server.clients,c);
1267 assert(ln != NULL);
1268 listDelNode(server.clients,ln);
1269 if (c->flags & REDIS_SLAVE) {
1270 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1271 close(c->repldbfd);
1272 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1273 ln = listSearchKey(l,c);
1274 assert(ln != NULL);
1275 listDelNode(l,ln);
1276 }
1277 if (c->flags & REDIS_MASTER) {
1278 server.master = NULL;
1279 server.replstate = REDIS_REPL_CONNECT;
1280 }
1281 zfree(c->argv);
1282 zfree(c->mbargv);
1283 zfree(c);
1284 }
1285
1286 static void glueReplyBuffersIfNeeded(redisClient *c) {
1287 int totlen = 0;
1288 listNode *ln;
1289 robj *o;
1290
1291 listRewind(c->reply);
1292 while((ln = listYield(c->reply))) {
1293 o = ln->value;
1294 totlen += sdslen(o->ptr);
1295 /* This optimization makes more sense if we don't have to copy
1296 * too much data */
1297 if (totlen > 1024) return;
1298 }
1299 if (totlen > 0) {
1300 char buf[1024];
1301 int copylen = 0;
1302
1303 listRewind(c->reply);
1304 while((ln = listYield(c->reply))) {
1305 o = ln->value;
1306 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1307 copylen += sdslen(o->ptr);
1308 listDelNode(c->reply,ln);
1309 }
1310 /* Now the output buffer is empty, add the new single element */
1311 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1312 listAddNodeTail(c->reply,o);
1313 }
1314 }
1315
/* Writable-event handler: flush as much of the client's pending reply list
 * (c->reply) as the socket accepts.
 *
 * c->sentlen counts the bytes of the head reply object already written.
 * Fully sent objects are removed from the list. Replies addressed to our
 * master connection are not written at all, only marked as sent. A single
 * call stops once more than REDIS_MAX_WRITE_PER_EVENT bytes have been
 * written. A write error other than EAGAIN frees the client. When the reply
 * list becomes empty the writable event is removed. */
static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen;
    robj *o;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    /* Optionally coalesce several small reply objects into one buffer */
    if (server.glueoutputbuf && listLength(c->reply) > 1)
        glueReplyBuffersIfNeeded(c);
    while(listLength(c->reply)) {
        o = listNodeValue(listFirst(c->reply));
        objlen = sdslen(o->ptr);

        /* Empty objects carry nothing to send: just drop them */
        if (objlen == 0) {
            listDelNode(c->reply,listFirst(c->reply));
            continue;
        }

        if (c->flags & REDIS_MASTER) {
            /* Don't reply to a master: pretend the rest was written */
            nwritten = objlen - c->sentlen;
        } else {
            nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
            /* A -1 result (error or EAGAIN) is examined after the loop */
            if (nwritten <= 0) break;
        }
        c->sentlen += nwritten;
        totwritten += nwritten;
        /* If we fully sent the object on head go to the next one */
        if (c->sentlen == objlen) {
            listDelNode(c->reply,listFirst(c->reply));
            c->sentlen = 0;
        }
        /* Note that we avoid sending more than REDIS_MAX_WRITE_PER_EVENT
         * bytes: in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from a
         * super fast link that is always able to accept data (in real world
         * terms think of 'KEYS *' against the loopback interface) */
        if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
    }
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            /* Socket buffer full: retry on the next writable event */
            nwritten = 0;
        } else {
            redisLog(REDIS_DEBUG,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }
    if (totwritten > 0) c->lastinteraction = time(NULL);
    if (listLength(c->reply) == 0) {
        /* Nothing left to send: stop asking for writable events */
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    }
}
1371
1372 static struct redisCommand *lookupCommand(char *name) {
1373 int j = 0;
1374 while(cmdTable[j].name != NULL) {
1375 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1376 j++;
1377 }
1378 return NULL;
1379 }
1380
1381 /* resetClient prepare the client to process the next command */
1382 static void resetClient(redisClient *c) {
1383 freeClientArgv(c);
1384 c->bulklen = -1;
1385 c->multibulk = 0;
1386 }
1387
1388 /* If this function gets called we already read a whole
1389 * command, argments are in the client argv/argc fields.
1390 * processCommand() execute the command or prepare the
1391 * server for a bulk read from the client.
1392 *
1393 * If 1 is returned the client is still alive and valid and
1394 * and other operations can be performed by the caller. Otherwise
1395 * if 0 is returned the client was destroied (i.e. after QUIT). */
1396 static int processCommand(redisClient *c) {
1397 struct redisCommand *cmd;
1398 long long dirty;
1399
1400 /* Free some memory if needed (maxmemory setting) */
1401 if (server.maxmemory) freeMemoryIfNeeded();
1402
1403 /* Handle the multi bulk command type. This is an alternative protocol
1404 * supported by Redis in order to receive commands that are composed of
1405 * multiple binary-safe "bulk" arguments. The latency of processing is
1406 * a bit higher but this allows things like multi-sets, so if this
1407 * protocol is used only for MSET and similar commands this is a big win. */
1408 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1409 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1410 if (c->multibulk <= 0) {
1411 resetClient(c);
1412 return 1;
1413 } else {
1414 decrRefCount(c->argv[c->argc-1]);
1415 c->argc--;
1416 return 1;
1417 }
1418 } else if (c->multibulk) {
1419 if (c->bulklen == -1) {
1420 if (((char*)c->argv[0]->ptr)[0] != '$') {
1421 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1422 resetClient(c);
1423 return 1;
1424 } else {
1425 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1426 decrRefCount(c->argv[0]);
1427 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1428 c->argc--;
1429 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1430 resetClient(c);
1431 return 1;
1432 }
1433 c->argc--;
1434 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1435 return 1;
1436 }
1437 } else {
1438 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1439 c->mbargv[c->mbargc] = c->argv[0];
1440 c->mbargc++;
1441 c->argc--;
1442 c->multibulk--;
1443 if (c->multibulk == 0) {
1444 robj **auxargv;
1445 int auxargc;
1446
1447 /* Here we need to swap the multi-bulk argc/argv with the
1448 * normal argc/argv of the client structure. */
1449 auxargv = c->argv;
1450 c->argv = c->mbargv;
1451 c->mbargv = auxargv;
1452
1453 auxargc = c->argc;
1454 c->argc = c->mbargc;
1455 c->mbargc = auxargc;
1456
1457 /* We need to set bulklen to something different than -1
1458 * in order for the code below to process the command without
1459 * to try to read the last argument of a bulk command as
1460 * a special argument. */
1461 c->bulklen = 0;
1462 /* continue below and process the command */
1463 } else {
1464 c->bulklen = -1;
1465 return 1;
1466 }
1467 }
1468 }
1469 /* -- end of multi bulk commands processing -- */
1470
1471 /* The QUIT command is handled as a special case. Normal command
1472 * procs are unable to close the client connection safely */
1473 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1474 freeClient(c);
1475 return 0;
1476 }
1477 cmd = lookupCommand(c->argv[0]->ptr);
1478 if (!cmd) {
1479 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1480 resetClient(c);
1481 return 1;
1482 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1483 (c->argc < -cmd->arity)) {
1484 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1485 resetClient(c);
1486 return 1;
1487 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1488 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1489 resetClient(c);
1490 return 1;
1491 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1492 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1493
1494 decrRefCount(c->argv[c->argc-1]);
1495 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1496 c->argc--;
1497 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1498 resetClient(c);
1499 return 1;
1500 }
1501 c->argc--;
1502 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1503 /* It is possible that the bulk read is already in the
1504 * buffer. Check this condition and handle it accordingly.
1505 * This is just a fast path, alternative to call processInputBuffer().
1506 * It's a good idea since the code is small and this condition
1507 * happens most of the times. */
1508 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1509 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1510 c->argc++;
1511 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1512 } else {
1513 return 1;
1514 }
1515 }
1516 /* Let's try to share objects on the command arguments vector */
1517 if (server.shareobjects) {
1518 int j;
1519 for(j = 1; j < c->argc; j++)
1520 c->argv[j] = tryObjectSharing(c->argv[j]);
1521 }
1522 /* Let's try to encode the bulk object to save space. */
1523 if (cmd->flags & REDIS_CMD_BULK)
1524 tryObjectEncoding(c->argv[c->argc-1]);
1525
1526 /* Check if the user is authenticated */
1527 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1528 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1529 resetClient(c);
1530 return 1;
1531 }
1532
1533 /* Exec the command */
1534 dirty = server.dirty;
1535 cmd->proc(c);
1536 if (server.dirty-dirty != 0 && listLength(server.slaves))
1537 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1538 if (listLength(server.monitors))
1539 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1540 server.stat_numcommands++;
1541
1542 /* Prepare the client for the next command */
1543 if (c->flags & REDIS_CLOSE) {
1544 freeClient(c);
1545 return 0;
1546 }
1547 resetClient(c);
1548 return 1;
1549 }
1550
/* Propagate a command to every client in 'slaves'. It is called for both
 * server.slaves and server.monitors.
 *
 * The command is re-serialized in the inline protocol: arguments separated
 * by shared.space and terminated by shared.crlf. For REDIS_CMD_BULK
 * commands the last argument is preceded by a "<len>\r\n" object, producing
 * e.g. "SET key <len>\r\n<value>\r\n". Clients still in
 * REDIS_REPL_WAIT_BGSAVE_START are skipped. When a client's last selected
 * DB differs from 'dictid', a "select <dictid>" command is queued first. */
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    listNode *ln;
    int outc = 0, j;
    robj **outv;
    /* (args*2)+1 is enough room for args, spaces, newlines (for bulk
     * commands: argc args + argc-1 spaces + 1 length object + 1 CRLF) */
    robj *static_outv[REDIS_STATIC_ARGS*2+1];

    /* Use the stack buffer for small commands, the heap otherwise */
    if (argc <= REDIS_STATIC_ARGS) {
        outv = static_outv;
    } else {
        outv = zmalloc(sizeof(robj*)*(argc*2+1));
    }

    for (j = 0; j < argc; j++) {
        if (j != 0) outv[outc++] = shared.space;
        if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
            robj *lenobj;

            lenobj = createObject(REDIS_STRING,
                sdscatprintf(sdsempty(),"%d\r\n",
                    stringObjectLen(argv[j])));
            /* Starts at 0: the increment/decrement pair below frees it
             * unless some slave took its own reference in addReply(). */
            lenobj->refcount = 0;
            outv[outc++] = lenobj;
        }
        outv[outc++] = argv[j];
    }
    outv[outc++] = shared.crlf;

    /* Increment all the refcounts at start and decrement at end in order to
     * be sure to free objects if there is no slave in a replication state
     * able to be fed with commands */
    for (j = 0; j < outc; j++) incrRefCount(outv[j]);
    listRewind(slaves);
    while((ln = listYield(slaves))) {
        redisClient *slave = ln->value;

        /* Don't feed slaves that are still waiting for BGSAVE to start */
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;

        /* Feed all the other slaves, MONITORs and so on */
        if (slave->slaveseldb != dictid) {
            robj *selectcmd;

            /* DBs 0..9 have pre-built shared SELECT objects */
            switch(dictid) {
            case 0: selectcmd = shared.select0; break;
            case 1: selectcmd = shared.select1; break;
            case 2: selectcmd = shared.select2; break;
            case 3: selectcmd = shared.select3; break;
            case 4: selectcmd = shared.select4; break;
            case 5: selectcmd = shared.select5; break;
            case 6: selectcmd = shared.select6; break;
            case 7: selectcmd = shared.select7; break;
            case 8: selectcmd = shared.select8; break;
            case 9: selectcmd = shared.select9; break;
            default:
                selectcmd = createObject(REDIS_STRING,
                    sdscatprintf(sdsempty(),"select %d\r\n",dictid));
                /* Owned only by the reply list once addReply() takes a
                 * reference. NOTE(review): if addReply() returns early
                 * without queuing it, this object is never freed. */
                selectcmd->refcount = 0;
                break;
            }
            addReply(slave,selectcmd);
            slave->slaveseldb = dictid;
        }
        for (j = 0; j < outc; j++) addReply(slave,outv[j]);
    }
    for (j = 0; j < outc; j++) decrRefCount(outv[j]);
    if (outv != static_outv) zfree(outv);
}
1619
1620 static void processInputBuffer(redisClient *c) {
1621 again:
1622 if (c->bulklen == -1) {
1623 /* Read the first line of the query */
1624 char *p = strchr(c->querybuf,'\n');
1625 size_t querylen;
1626
1627 if (p) {
1628 sds query, *argv;
1629 int argc, j;
1630
1631 query = c->querybuf;
1632 c->querybuf = sdsempty();
1633 querylen = 1+(p-(query));
1634 if (sdslen(query) > querylen) {
1635 /* leave data after the first line of the query in the buffer */
1636 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1637 }
1638 *p = '\0'; /* remove "\n" */
1639 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1640 sdsupdatelen(query);
1641
1642 /* Now we can split the query in arguments */
1643 if (sdslen(query) == 0) {
1644 /* Ignore empty query */
1645 sdsfree(query);
1646 return;
1647 }
1648 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1649 sdsfree(query);
1650
1651 if (c->argv) zfree(c->argv);
1652 c->argv = zmalloc(sizeof(robj*)*argc);
1653
1654 for (j = 0; j < argc; j++) {
1655 if (sdslen(argv[j])) {
1656 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1657 c->argc++;
1658 } else {
1659 sdsfree(argv[j]);
1660 }
1661 }
1662 zfree(argv);
1663 /* Execute the command. If the client is still valid
1664 * after processCommand() return and there is something
1665 * on the query buffer try to process the next command. */
1666 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1667 return;
1668 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1669 redisLog(REDIS_DEBUG, "Client protocol error");
1670 freeClient(c);
1671 return;
1672 }
1673 } else {
1674 /* Bulk read handling. Note that if we are at this point
1675 the client already sent a command terminated with a newline,
1676 we are reading the bulk data that is actually the last
1677 argument of the command. */
1678 int qbl = sdslen(c->querybuf);
1679
1680 if (c->bulklen <= qbl) {
1681 /* Copy everything but the final CRLF as final argument */
1682 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1683 c->argc++;
1684 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1685 /* Process the command. If the client is still valid after
1686 * the processing and there is more data in the buffer
1687 * try to parse it. */
1688 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1689 return;
1690 }
1691 }
1692 }
1693
1694 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1695 redisClient *c = (redisClient*) privdata;
1696 char buf[REDIS_IOBUF_LEN];
1697 int nread;
1698 REDIS_NOTUSED(el);
1699 REDIS_NOTUSED(mask);
1700
1701 nread = read(fd, buf, REDIS_IOBUF_LEN);
1702 if (nread == -1) {
1703 if (errno == EAGAIN) {
1704 nread = 0;
1705 } else {
1706 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1707 freeClient(c);
1708 return;
1709 }
1710 } else if (nread == 0) {
1711 redisLog(REDIS_DEBUG, "Client closed connection");
1712 freeClient(c);
1713 return;
1714 }
1715 if (nread) {
1716 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1717 c->lastinteraction = time(NULL);
1718 } else {
1719 return;
1720 }
1721 processInputBuffer(c);
1722 }
1723
1724 static int selectDb(redisClient *c, int id) {
1725 if (id < 0 || id >= server.dbnum)
1726 return REDIS_ERR;
1727 c->db = &server.db[id];
1728 return REDIS_OK;
1729 }
1730
1731 static void *dupClientReplyValue(void *o) {
1732 incrRefCount((robj*)o);
1733 return 0;
1734 }
1735
1736 static redisClient *createClient(int fd) {
1737 redisClient *c = zmalloc(sizeof(*c));
1738
1739 anetNonBlock(NULL,fd);
1740 anetTcpNoDelay(NULL,fd);
1741 if (!c) return NULL;
1742 selectDb(c,0);
1743 c->fd = fd;
1744 c->querybuf = sdsempty();
1745 c->argc = 0;
1746 c->argv = NULL;
1747 c->bulklen = -1;
1748 c->multibulk = 0;
1749 c->mbargc = 0;
1750 c->mbargv = NULL;
1751 c->sentlen = 0;
1752 c->flags = 0;
1753 c->lastinteraction = time(NULL);
1754 c->authenticated = 0;
1755 c->replstate = REDIS_REPL_NONE;
1756 c->reply = listCreate();
1757 listSetFreeMethod(c->reply,decrRefCount);
1758 listSetDupMethod(c->reply,dupClientReplyValue);
1759 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1760 readQueryFromClient, c, NULL) == AE_ERR) {
1761 freeClient(c);
1762 return NULL;
1763 }
1764 listAddNodeTail(server.clients,c);
1765 return c;
1766 }
1767
1768 static void addReply(redisClient *c, robj *obj) {
1769 if (listLength(c->reply) == 0 &&
1770 (c->replstate == REDIS_REPL_NONE ||
1771 c->replstate == REDIS_REPL_ONLINE) &&
1772 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1773 sendReplyToClient, c, NULL) == AE_ERR) return;
1774 if (obj->encoding != REDIS_ENCODING_RAW) {
1775 obj = getDecodedObject(obj);
1776 } else {
1777 incrRefCount(obj);
1778 }
1779 listAddNodeTail(c->reply,obj);
1780 }
1781
1782 static void addReplySds(redisClient *c, sds s) {
1783 robj *o = createObject(REDIS_STRING,s);
1784 addReply(c,o);
1785 decrRefCount(o);
1786 }
1787
1788 static void addReplyBulkLen(redisClient *c, robj *obj) {
1789 size_t len;
1790
1791 if (obj->encoding == REDIS_ENCODING_RAW) {
1792 len = sdslen(obj->ptr);
1793 } else {
1794 long n = (long)obj->ptr;
1795
1796 len = 1;
1797 if (n < 0) {
1798 len++;
1799 n = -n;
1800 }
1801 while((n = n/10) != 0) {
1802 len++;
1803 }
1804 }
1805 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1806 }
1807
1808 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1809 int cport, cfd;
1810 char cip[128];
1811 redisClient *c;
1812 REDIS_NOTUSED(el);
1813 REDIS_NOTUSED(mask);
1814 REDIS_NOTUSED(privdata);
1815
1816 cfd = anetAccept(server.neterr, fd, cip, &cport);
1817 if (cfd == AE_ERR) {
1818 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1819 return;
1820 }
1821 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1822 if ((c = createClient(cfd)) == NULL) {
1823 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1824 close(cfd); /* May be already closed, just ingore errors */
1825 return;
1826 }
1827 /* If maxclient directive is set and this is one client more... close the
1828 * connection. Note that we create the client instead to check before
1829 * for this condition, since now the socket is already set in nonblocking
1830 * mode and we can send an error for free using the Kernel I/O */
1831 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1832 char *err = "-ERR max number of clients reached\r\n";
1833
1834 /* That's a best effort error message, don't check write errors */
1835 (void) write(c->fd,err,strlen(err));
1836 freeClient(c);
1837 return;
1838 }
1839 server.stat_numconnections++;
1840 }
1841
1842 /* ======================= Redis objects implementation ===================== */
1843
1844 static robj *createObject(int type, void *ptr) {
1845 robj *o;
1846
1847 if (listLength(server.objfreelist)) {
1848 listNode *head = listFirst(server.objfreelist);
1849 o = listNodeValue(head);
1850 listDelNode(server.objfreelist,head);
1851 } else {
1852 o = zmalloc(sizeof(*o));
1853 }
1854 o->type = type;
1855 o->encoding = REDIS_ENCODING_RAW;
1856 o->ptr = ptr;
1857 o->refcount = 1;
1858 return o;
1859 }
1860
1861 static robj *createStringObject(char *ptr, size_t len) {
1862 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1863 }
1864
1865 static robj *createListObject(void) {
1866 list *l = listCreate();
1867
1868 listSetFreeMethod(l,decrRefCount);
1869 return createObject(REDIS_LIST,l);
1870 }
1871
1872 static robj *createSetObject(void) {
1873 dict *d = dictCreate(&setDictType,NULL);
1874 return createObject(REDIS_SET,d);
1875 }
1876
1877 static robj *createZsetObject(void) {
1878 zset *zs = zmalloc(sizeof(*zs));
1879
1880 zs->dict = dictCreate(&zsetDictType,NULL);
1881 zs->zsl = zslCreate();
1882 return createObject(REDIS_ZSET,zs);
1883 }
1884
1885 static void freeStringObject(robj *o) {
1886 if (o->encoding == REDIS_ENCODING_RAW) {
1887 sdsfree(o->ptr);
1888 }
1889 }
1890
1891 static void freeListObject(robj *o) {
1892 listRelease((list*) o->ptr);
1893 }
1894
1895 static void freeSetObject(robj *o) {
1896 dictRelease((dict*) o->ptr);
1897 }
1898
1899 static void freeZsetObject(robj *o) {
1900 zset *zs = o->ptr;
1901
1902 dictRelease(zs->dict);
1903 zslFree(zs->zsl);
1904 zfree(zs);
1905 }
1906
1907 static void freeHashObject(robj *o) {
1908 dictRelease((dict*) o->ptr);
1909 }
1910
1911 static void incrRefCount(robj *o) {
1912 o->refcount++;
1913 #ifdef DEBUG_REFCOUNT
1914 if (o->type == REDIS_STRING)
1915 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1916 #endif
1917 }
1918
1919 static void decrRefCount(void *obj) {
1920 robj *o = obj;
1921
1922 #ifdef DEBUG_REFCOUNT
1923 if (o->type == REDIS_STRING)
1924 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1925 #endif
1926 if (--(o->refcount) == 0) {
1927 switch(o->type) {
1928 case REDIS_STRING: freeStringObject(o); break;
1929 case REDIS_LIST: freeListObject(o); break;
1930 case REDIS_SET: freeSetObject(o); break;
1931 case REDIS_ZSET: freeZsetObject(o); break;
1932 case REDIS_HASH: freeHashObject(o); break;
1933 default: assert(0 != 0); break;
1934 }
1935 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1936 !listAddNodeHead(server.objfreelist,o))
1937 zfree(o);
1938 }
1939 }
1940
1941 static robj *lookupKey(redisDb *db, robj *key) {
1942 dictEntry *de = dictFind(db->dict,key);
1943 return de ? dictGetEntryVal(de) : NULL;
1944 }
1945
1946 static robj *lookupKeyRead(redisDb *db, robj *key) {
1947 expireIfNeeded(db,key);
1948 return lookupKey(db,key);
1949 }
1950
1951 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1952 deleteIfVolatile(db,key);
1953 return lookupKey(db,key);
1954 }
1955
1956 static int deleteKey(redisDb *db, robj *key) {
1957 int retval;
1958
1959 /* We need to protect key from destruction: after the first dictDelete()
1960 * it may happen that 'key' is no longer valid if we don't increment
1961 * it's count. This may happen when we get the object reference directly
1962 * from the hash table with dictRandomKey() or dict iterators */
1963 incrRefCount(key);
1964 if (dictSize(db->expires)) dictDelete(db->expires,key);
1965 retval = dictDelete(db->dict,key);
1966 decrRefCount(key);
1967
1968 return retval == DICT_OK;
1969 }
1970
1971 /* Try to share an object against the shared objects pool */
1972 static robj *tryObjectSharing(robj *o) {
1973 struct dictEntry *de;
1974 unsigned long c;
1975
1976 if (o == NULL || server.shareobjects == 0) return o;
1977
1978 assert(o->type == REDIS_STRING);
1979 de = dictFind(server.sharingpool,o);
1980 if (de) {
1981 robj *shared = dictGetEntryKey(de);
1982
1983 c = ((unsigned long) dictGetEntryVal(de))+1;
1984 dictGetEntryVal(de) = (void*) c;
1985 incrRefCount(shared);
1986 decrRefCount(o);
1987 return shared;
1988 } else {
1989 /* Here we are using a stream algorihtm: Every time an object is
1990 * shared we increment its count, everytime there is a miss we
1991 * recrement the counter of a random object. If this object reaches
1992 * zero we remove the object and put the current object instead. */
1993 if (dictSize(server.sharingpool) >=
1994 server.sharingpoolsize) {
1995 de = dictGetRandomKey(server.sharingpool);
1996 assert(de != NULL);
1997 c = ((unsigned long) dictGetEntryVal(de))-1;
1998 dictGetEntryVal(de) = (void*) c;
1999 if (c == 0) {
2000 dictDelete(server.sharingpool,de->key);
2001 }
2002 } else {
2003 c = 0; /* If the pool is empty we want to add this object */
2004 }
2005 if (c == 0) {
2006 int retval;
2007
2008 retval = dictAdd(server.sharingpool,o,(void*)1);
2009 assert(retval == DICT_OK);
2010 incrRefCount(o);
2011 }
2012 return o;
2013 }
2014 }
2015
2016 /* Check if the nul-terminated string 's' can be represented by a long
2017 * (that is, is a number that fits into long without any other space or
2018 * character before or after the digits).
2019 *
2020 * If so, the function returns REDIS_OK and *longval is set to the value
2021 * of the number. Otherwise REDIS_ERR is returned */
2022 static int isStringRepresentableAsLong(sds s, long *longval) {
2023 char buf[32], *endptr;
2024 long value;
2025 int slen;
2026
2027 value = strtol(s, &endptr, 10);
2028 if (endptr[0] != '\0') return REDIS_ERR;
2029 slen = snprintf(buf,32,"%ld",value);
2030
2031 /* If the number converted back into a string is not identical
2032 * then it's not possible to encode the string as integer */
2033 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2034 if (longval) *longval = value;
2035 return REDIS_OK;
2036 }
2037
2038 /* Try to encode a string object in order to save space */
2039 static int tryObjectEncoding(robj *o) {
2040 long value;
2041 sds s = o->ptr;
2042
2043 if (o->encoding != REDIS_ENCODING_RAW)
2044 return REDIS_ERR; /* Already encoded */
2045
2046 /* It's not save to encode shared objects: shared objects can be shared
2047 * everywhere in the "object space" of Redis. Encoded objects can only
2048 * appear as "values" (and not, for instance, as keys) */
2049 if (o->refcount > 1) return REDIS_ERR;
2050
2051 /* Currently we try to encode only strings */
2052 assert(o->type == REDIS_STRING);
2053
2054 /* Check if we can represent this string as a long integer */
2055 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2056
2057 /* Ok, this object can be encoded */
2058 o->encoding = REDIS_ENCODING_INT;
2059 sdsfree(o->ptr);
2060 o->ptr = (void*) value;
2061 return REDIS_OK;
2062 }
2063
2064 /* Get a decoded version of an encoded object (returned as a new object) */
2065 static robj *getDecodedObject(const robj *o) {
2066 robj *dec;
2067
2068 assert(o->encoding != REDIS_ENCODING_RAW);
2069 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2070 char buf[32];
2071
2072 snprintf(buf,32,"%ld",(long)o->ptr);
2073 dec = createStringObject(buf,strlen(buf));
2074 return dec;
2075 } else {
2076 assert(1 != 1);
2077 }
2078 }
2079
2080 /* Compare two string objects via strcmp() or alike.
2081 * Note that the objects may be integer-encoded. In such a case we
2082 * use snprintf() to get a string representation of the numbers on the stack
2083 * and compare the strings, it's much faster than calling getDecodedObject(). */
2084 static int compareStringObjects(robj *a, robj *b) {
2085 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2086 char bufa[128], bufb[128], *astr, *bstr;
2087 int bothsds = 1;
2088
2089 if (a == b) return 0;
2090 if (a->encoding != REDIS_ENCODING_RAW) {
2091 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2092 astr = bufa;
2093 bothsds = 0;
2094 } else {
2095 astr = a->ptr;
2096 }
2097 if (b->encoding != REDIS_ENCODING_RAW) {
2098 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2099 bstr = bufb;
2100 bothsds = 0;
2101 } else {
2102 bstr = b->ptr;
2103 }
2104 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2105 }
2106
2107 static size_t stringObjectLen(robj *o) {
2108 assert(o->type == REDIS_STRING);
2109 if (o->encoding == REDIS_ENCODING_RAW) {
2110 return sdslen(o->ptr);
2111 } else {
2112 char buf[32];
2113
2114 return snprintf(buf,32,"%ld",(long)o->ptr);
2115 }
2116 }
2117
2118 /*============================ DB saving/loading ============================ */
2119
/* Write a one byte type/opcode. Returns 0 on success, -1 on write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 1) ? 0 : -1;
}
2124
/* Write 't' truncated to a 32 bit signed integer in host byte order.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stored = (int32_t) t;

    return (fwrite(&stored,4,1,fp) == 1) ? 0 : -1;
}
2130
2131 /* check rdbLoadLen() comments for more info */
2132 static int rdbSaveLen(FILE *fp, uint32_t len) {
2133 unsigned char buf[2];
2134
2135 if (len < (1<<6)) {
2136 /* Save a 6 bit len */
2137 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2138 if (fwrite(buf,1,1,fp) == 0) return -1;
2139 } else if (len < (1<<14)) {
2140 /* Save a 14 bit len */
2141 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2142 buf[1] = len&0xFF;
2143 if (fwrite(buf,2,1,fp) == 0) return -1;
2144 } else {
2145 /* Save a 32 bit len */
2146 buf[0] = (REDIS_RDB_32BITLEN<<6);
2147 if (fwrite(buf,1,1,fp) == 0) return -1;
2148 len = htonl(len);
2149 if (fwrite(&len,4,1,fp) == 0) return -1;
2150 }
2151 return 0;
2152 }
2153
2154 /* String objects in the form "2391" "-100" without any space and with a
2155 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2156 * encoded as integers to save space */
2157 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2158 long long value;
2159 char *endptr, buf[32];
2160
2161 /* Check if it's possible to encode this value as a number */
2162 value = strtoll(s, &endptr, 10);
2163 if (endptr[0] != '\0') return 0;
2164 snprintf(buf,32,"%lld",value);
2165
2166 /* If the number converted back into a string is not identical
2167 * then it's not possible to encode the string as integer */
2168 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2169
2170 /* Finally check if it fits in our ranges */
2171 if (value >= -(1<<7) && value <= (1<<7)-1) {
2172 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2173 enc[1] = value&0xFF;
2174 return 2;
2175 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2176 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2177 enc[1] = value&0xFF;
2178 enc[2] = (value>>8)&0xFF;
2179 return 3;
2180 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2181 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2182 enc[1] = value&0xFF;
2183 enc[2] = (value>>8)&0xFF;
2184 enc[3] = (value>>16)&0xFF;
2185 enc[4] = (value>>24)&0xFF;
2186 return 5;
2187 } else {
2188 return 0;
2189 }
2190 }
2191
2192 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2193 unsigned int comprlen, outlen;
2194 unsigned char byte;
2195 void *out;
2196
2197 /* We require at least four bytes compression for this to be worth it */
2198 outlen = sdslen(obj->ptr)-4;
2199 if (outlen <= 0) return 0;
2200 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2201 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2202 if (comprlen == 0) {
2203 zfree(out);
2204 return 0;
2205 }
2206 /* Data compressed! Let's save it on disk */
2207 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2208 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2209 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2210 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2211 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2212 zfree(out);
2213 return comprlen;
2214
2215 writeerr:
2216 zfree(out);
2217 return -1;
2218 }
2219
2220 /* Save a string objet as [len][data] on disk. If the object is a string
2221 * representation of an integer value we try to safe it in a special form */
2222 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2223 size_t len;
2224 int enclen;
2225
2226 len = sdslen(obj->ptr);
2227
2228 /* Try integer encoding */
2229 if (len <= 11) {
2230 unsigned char buf[5];
2231 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2232 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2233 return 0;
2234 }
2235 }
2236
2237 /* Try LZF compression - under 20 bytes it's unable to compress even
2238 * aaaaaaaaaaaaaaaaaa so skip it */
2239 if (len > 20) {
2240 int retval;
2241
2242 retval = rdbSaveLzfStringObject(fp,obj);
2243 if (retval == -1) return -1;
2244 if (retval > 0) return 0;
2245 /* retval == 0 means data can't be compressed, save the old way */
2246 }
2247
2248 /* Store verbatim */
2249 if (rdbSaveLen(fp,len) == -1) return -1;
2250 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2251 return 0;
2252 }
2253
2254 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2255 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2256 int retval;
2257 robj *dec;
2258
2259 if (obj->encoding != REDIS_ENCODING_RAW) {
2260 dec = getDecodedObject(obj);
2261 retval = rdbSaveStringObjectRaw(fp,dec);
2262 decrRefCount(dec);
2263 return retval;
2264 } else {
2265 return rdbSaveStringObjectRaw(fp,obj);
2266 }
2267 }
2268
/* Save a double value. Doubles are saved as strings prefixed by an unsigned
 * 8 bit integer specifying the length of the representation (the prefix
 * byte itself is not counted).
 * This 8 bit integer has special values in order to specify the following
 * conditions (no string follows them):
 * 253: not a number
 * 254: + inf
 * 255: - inf
 *
 * Finite values are printed with "%.17g": 17 significant digits are
 * required for every double to round-trip exactly through sscanf/strtod
 * (e.g. 0.1+0.2 would be saved as "0.3" with 16 digits).
 *
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len;

    if (isnan(val)) {
        buf[0] = 253;
        len = 1;
    } else if (!isfinite(val)) {
        len = 1;
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
        /* Measure the text after the prefix byte: buf[0] is not yet set,
         * so strlen(buf) would read an uninitialized byte. */
        buf[0] = strlen((char*)buf+1);
        len = buf[0]+1;
    }
    if (fwrite(buf,len,1,fp) == 0) return -1;
    return 0;
}
2295
2296 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2297 static int rdbSave(char *filename) {
2298 dictIterator *di = NULL;
2299 dictEntry *de;
2300 FILE *fp;
2301 char tmpfile[256];
2302 int j;
2303 time_t now = time(NULL);
2304
2305 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2306 fp = fopen(tmpfile,"w");
2307 if (!fp) {
2308 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2309 return REDIS_ERR;
2310 }
2311 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2312 for (j = 0; j < server.dbnum; j++) {
2313 redisDb *db = server.db+j;
2314 dict *d = db->dict;
2315 if (dictSize(d) == 0) continue;
2316 di = dictGetIterator(d);
2317 if (!di) {
2318 fclose(fp);
2319 return REDIS_ERR;
2320 }
2321
2322 /* Write the SELECT DB opcode */
2323 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2324 if (rdbSaveLen(fp,j) == -1) goto werr;
2325
2326 /* Iterate this DB writing every entry */
2327 while((de = dictNext(di)) != NULL) {
2328 robj *key = dictGetEntryKey(de);
2329 robj *o = dictGetEntryVal(de);
2330 time_t expiretime = getExpire(db,key);
2331
2332 /* Save the expire time */
2333 if (expiretime != -1) {
2334 /* If this key is already expired skip it */
2335 if (expiretime < now) continue;
2336 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2337 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2338 }
2339 /* Save the key and associated value */
2340 if (rdbSaveType(fp,o->type) == -1) goto werr;
2341 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2342 if (o->type == REDIS_STRING) {
2343 /* Save a string value */
2344 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2345 } else if (o->type == REDIS_LIST) {
2346 /* Save a list value */
2347 list *list = o->ptr;
2348 listNode *ln;
2349
2350 listRewind(list);
2351 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2352 while((ln = listYield(list))) {
2353 robj *eleobj = listNodeValue(ln);
2354
2355 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2356 }
2357 } else if (o->type == REDIS_SET) {
2358 /* Save a set value */
2359 dict *set = o->ptr;
2360 dictIterator *di = dictGetIterator(set);
2361 dictEntry *de;
2362
2363 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2364 while((de = dictNext(di)) != NULL) {
2365 robj *eleobj = dictGetEntryKey(de);
2366
2367 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2368 }
2369 dictReleaseIterator(di);
2370 } else if (o->type == REDIS_ZSET) {
2371 /* Save a set value */
2372 zset *zs = o->ptr;
2373 dictIterator *di = dictGetIterator(zs->dict);
2374 dictEntry *de;
2375
2376 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2377 while((de = dictNext(di)) != NULL) {
2378 robj *eleobj = dictGetEntryKey(de);
2379 double *score = dictGetEntryVal(de);
2380
2381 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2382 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2383 }
2384 dictReleaseIterator(di);
2385 } else {
2386 assert(0 != 0);
2387 }
2388 }
2389 dictReleaseIterator(di);
2390 }
2391 /* EOF opcode */
2392 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2393
2394 /* Make sure data will not remain on the OS's output buffers */
2395 fflush(fp);
2396 fsync(fileno(fp));
2397 fclose(fp);
2398
2399 /* Use RENAME to make sure the DB file is changed atomically only
2400 * if the generate DB file is ok. */
2401 if (rename(tmpfile,filename) == -1) {
2402 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2403 unlink(tmpfile);
2404 return REDIS_ERR;
2405 }
2406 redisLog(REDIS_NOTICE,"DB saved on disk");
2407 server.dirty = 0;
2408 server.lastsave = time(NULL);
2409 return REDIS_OK;
2410
2411 werr:
2412 fclose(fp);
2413 unlink(tmpfile);
2414 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2415 if (di) dictReleaseIterator(di);
2416 return REDIS_ERR;
2417 }
2418
2419 static int rdbSaveBackground(char *filename) {
2420 pid_t childpid;
2421
2422 if (server.bgsaveinprogress) return REDIS_ERR;
2423 if ((childpid = fork()) == 0) {
2424 /* Child */
2425 close(server.fd);
2426 if (rdbSave(filename) == REDIS_OK) {
2427 exit(0);
2428 } else {
2429 exit(1);
2430 }
2431 } else {
2432 /* Parent */
2433 if (childpid == -1) {
2434 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2435 strerror(errno));
2436 return REDIS_ERR;
2437 }
2438 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2439 server.bgsaveinprogress = 1;
2440 server.bgsavechildpid = childpid;
2441 return REDIS_OK;
2442 }
2443 return REDIS_OK; /* unreached */
2444 }
2445
/* Remove the "temp-<pid>.rdb" file a saving child with pid 'childpid' may
 * have left behind. Errors (e.g. file not found) are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb", (int) childpid);
    unlink(path);
}
2452
/* Read a one byte type/opcode. Returns it as 0..255, or -1 on EOF/error. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;

    return (fread(&type,1,1,fp) == 1) ? type : -1;
}
2458
/* Read a 32 bit signed time value in host byte order (the format written by
 * rdbSaveTime()). Returns -1 on EOF/error. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stored;

    if (fread(&stored,4,1,fp) != 1) return -1;
    return (time_t) stored;
}
2464
2465 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2466 * of this file for a description of how this are stored on disk.
2467 *
2468 * isencoded is set to 1 if the readed length is not actually a length but
2469 * an "encoding type", check the above comments for more info */
2470 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2471 unsigned char buf[2];
2472 uint32_t len;
2473
2474 if (isencoded) *isencoded = 0;
2475 if (rdbver == 0) {
2476 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2477 return ntohl(len);
2478 } else {
2479 int type;
2480
2481 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2482 type = (buf[0]&0xC0)>>6;
2483 if (type == REDIS_RDB_6BITLEN) {
2484 /* Read a 6 bit len */
2485 return buf[0]&0x3F;
2486 } else if (type == REDIS_RDB_ENCVAL) {
2487 /* Read a 6 bit len encoding type */
2488 if (isencoded) *isencoded = 1;
2489 return buf[0]&0x3F;
2490 } else if (type == REDIS_RDB_14BITLEN) {
2491 /* Read a 14 bit len */
2492 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2493 return ((buf[0]&0x3F)<<8)|buf[1];
2494 } else {
2495 /* Read a 32 bit len */
2496 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2497 return ntohl(len);
2498 }
2499 }
2500 }
2501
2502 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2503 unsigned char enc[4];
2504 long long val;
2505
2506 if (enctype == REDIS_RDB_ENC_INT8) {
2507 if (fread(enc,1,1,fp) == 0) return NULL;
2508 val = (signed char)enc[0];
2509 } else if (enctype == REDIS_RDB_ENC_INT16) {
2510 uint16_t v;
2511 if (fread(enc,2,1,fp) == 0) return NULL;
2512 v = enc[0]|(enc[1]<<8);
2513 val = (int16_t)v;
2514 } else if (enctype == REDIS_RDB_ENC_INT32) {
2515 uint32_t v;
2516 if (fread(enc,4,1,fp) == 0) return NULL;
2517 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2518 val = (int32_t)v;
2519 } else {
2520 val = 0; /* anti-warning */
2521 assert(0!=0);
2522 }
2523 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2524 }
2525
2526 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2527 unsigned int len, clen;
2528 unsigned char *c = NULL;
2529 sds val = NULL;
2530
2531 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2532 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2533 if ((c = zmalloc(clen)) == NULL) goto err;
2534 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2535 if (fread(c,clen,1,fp) == 0) goto err;
2536 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2537 zfree(c);
2538 return createObject(REDIS_STRING,val);
2539 err:
2540 zfree(c);
2541 sdsfree(val);
2542 return NULL;
2543 }
2544
2545 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2546 int isencoded;
2547 uint32_t len;
2548 sds val;
2549
2550 len = rdbLoadLen(fp,rdbver,&isencoded);
2551 if (isencoded) {
2552 switch(len) {
2553 case REDIS_RDB_ENC_INT8:
2554 case REDIS_RDB_ENC_INT16:
2555 case REDIS_RDB_ENC_INT32:
2556 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2557 case REDIS_RDB_ENC_LZF:
2558 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2559 default:
2560 assert(0!=0);
2561 }
2562 }
2563
2564 if (len == REDIS_RDB_LENERR) return NULL;
2565 val = sdsnewlen(NULL,len);
2566 if (len && fread(val,len,1,fp) == 0) {
2567 sdsfree(val);
2568 return NULL;
2569 }
2570 return tryObjectSharing(createObject(REDIS_STRING,val));
2571 }
2572
2573 /* For information about double serialization check rdbSaveDoubleValue() */
2574 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2575 char buf[128];
2576 unsigned char len;
2577
2578 if (fread(&len,1,1,fp) == 0) return -1;
2579 switch(len) {
2580 case 255: *val = R_NegInf; return 0;
2581 case 254: *val = R_PosInf; return 0;
2582 case 253: *val = R_Nan; return 0;
2583 default:
2584 if (fread(buf,len,1,fp) == 0) return -1;
2585 sscanf(buf, "%lg", val);
2586 return 0;
2587 }
2588 }
2589
2590 static int rdbLoad(char *filename) {
2591 FILE *fp;
2592 robj *keyobj = NULL;
2593 uint32_t dbid;
2594 int type, retval, rdbver;
2595 dict *d = server.db[0].dict;
2596 redisDb *db = server.db+0;
2597 char buf[1024];
2598 time_t expiretime = -1, now = time(NULL);
2599
2600 fp = fopen(filename,"r");
2601 if (!fp) return REDIS_ERR;
2602 if (fread(buf,9,1,fp) == 0) goto eoferr;
2603 buf[9] = '\0';
2604 if (memcmp(buf,"REDIS",5) != 0) {
2605 fclose(fp);
2606 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2607 return REDIS_ERR;
2608 }
2609 rdbver = atoi(buf+5);
2610 if (rdbver > 1) {
2611 fclose(fp);
2612 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2613 return REDIS_ERR;
2614 }
2615 while(1) {
2616 robj *o;
2617
2618 /* Read type. */
2619 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2620 if (type == REDIS_EXPIRETIME) {
2621 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2622 /* We read the time so we need to read the object type again */
2623 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2624 }
2625 if (type == REDIS_EOF) break;
2626 /* Handle SELECT DB opcode as a special case */
2627 if (type == REDIS_SELECTDB) {
2628 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2629 goto eoferr;
2630 if (dbid >= (unsigned)server.dbnum) {
2631 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2632 exit(1);
2633 }
2634 db = server.db+dbid;
2635 d = db->dict;
2636 continue;
2637 }
2638 /* Read key */
2639 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2640
2641 if (type == REDIS_STRING) {
2642 /* Read string value */
2643 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2644 tryObjectEncoding(o);
2645 } else if (type == REDIS_LIST || type == REDIS_SET) {
2646 /* Read list/set value */
2647 uint32_t listlen;
2648
2649 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2650 goto eoferr;
2651 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2652 /* Load every single element of the list/set */
2653 while(listlen--) {
2654 robj *ele;
2655
2656 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2657 tryObjectEncoding(ele);
2658 if (type == REDIS_LIST) {
2659 listAddNodeTail((list*)o->ptr,ele);
2660 } else {
2661 dictAdd((dict*)o->ptr,ele,NULL);
2662 }
2663 }
2664 } else if (type == REDIS_ZSET) {
2665 /* Read list/set value */
2666 uint32_t zsetlen;
2667 zset *zs;
2668
2669 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2670 goto eoferr;
2671 o = createZsetObject();
2672 zs = o->ptr;
2673 /* Load every single element of the list/set */
2674 while(zsetlen--) {
2675 robj *ele;
2676 double *score = zmalloc(sizeof(double));
2677
2678 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2679 tryObjectEncoding(ele);
2680 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2681 dictAdd(zs->dict,ele,score);
2682 zslInsert(zs->zsl,*score,ele);
2683 incrRefCount(ele); /* added to skiplist */
2684 }
2685 } else {
2686 assert(0 != 0);
2687 }
2688 /* Add the new object in the hash table */
2689 retval = dictAdd(d,keyobj,o);
2690 if (retval == DICT_ERR) {
2691 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2692 exit(1);
2693 }
2694 /* Set the expire time if needed */
2695 if (expiretime != -1) {
2696 setExpire(db,keyobj,expiretime);
2697 /* Delete this key if already expired */
2698 if (expiretime < now) deleteKey(db,keyobj);
2699 expiretime = -1;
2700 }
2701 keyobj = o = NULL;
2702 }
2703 fclose(fp);
2704 return REDIS_OK;
2705
2706 eoferr: /* unexpected end of file is handled here with a fatal exit */
2707 if (keyobj) decrRefCount(keyobj);
2708 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2709 exit(1);
2710 return REDIS_ERR; /* Just to avoid warning */
2711 }
2712
2713 /*================================== Commands =============================== */
2714
2715 static void authCommand(redisClient *c) {
2716 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2717 c->authenticated = 1;
2718 addReply(c,shared.ok);
2719 } else {
2720 c->authenticated = 0;
2721 addReply(c,shared.err);
2722 }
2723 }
2724
2725 static void pingCommand(redisClient *c) {
2726 addReply(c,shared.pong);
2727 }
2728
2729 static void echoCommand(redisClient *c) {
2730 addReplyBulkLen(c,c->argv[1]);
2731 addReply(c,c->argv[1]);
2732 addReply(c,shared.crlf);
2733 }
2734
2735 /*=================================== Strings =============================== */
2736
2737 static void setGenericCommand(redisClient *c, int nx) {
2738 int retval;
2739
2740 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2741 if (retval == DICT_ERR) {
2742 if (!nx) {
2743 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2744 incrRefCount(c->argv[2]);
2745 } else {
2746 addReply(c,shared.czero);
2747 return;
2748 }
2749 } else {
2750 incrRefCount(c->argv[1]);
2751 incrRefCount(c->argv[2]);
2752 }
2753 server.dirty++;
2754 removeExpire(c->db,c->argv[1]);
2755 addReply(c, nx ? shared.cone : shared.ok);
2756 }
2757
2758 static void setCommand(redisClient *c) {
2759 setGenericCommand(c,0);
2760 }
2761
2762 static void setnxCommand(redisClient *c) {
2763 setGenericCommand(c,1);
2764 }
2765
2766 static void getCommand(redisClient *c) {
2767 robj *o = lookupKeyRead(c->db,c->argv[1]);
2768
2769 if (o == NULL) {
2770 addReply(c,shared.nullbulk);
2771 } else {
2772 if (o->type != REDIS_STRING) {
2773 addReply(c,shared.wrongtypeerr);
2774 } else {
2775 addReplyBulkLen(c,o);
2776 addReply(c,o);
2777 addReply(c,shared.crlf);
2778 }
2779 }
2780 }
2781
2782 static void getsetCommand(redisClient *c) {
2783 getCommand(c);
2784 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2785 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2786 } else {
2787 incrRefCount(c->argv[1]);
2788 }
2789 incrRefCount(c->argv[2]);
2790 server.dirty++;
2791 removeExpire(c->db,c->argv[1]);
2792 }
2793
2794 static void mgetCommand(redisClient *c) {
2795 int j;
2796
2797 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2798 for (j = 1; j < c->argc; j++) {
2799 robj *o = lookupKeyRead(c->db,c->argv[j]);
2800 if (o == NULL) {
2801 addReply(c,shared.nullbulk);
2802 } else {
2803 if (o->type != REDIS_STRING) {
2804 addReply(c,shared.nullbulk);
2805 } else {
2806 addReplyBulkLen(c,o);
2807 addReply(c,o);
2808 addReply(c,shared.crlf);
2809 }
2810 }
2811 }
2812 }
2813
2814 static void incrDecrCommand(redisClient *c, long long incr) {
2815 long long value;
2816 int retval;
2817 robj *o;
2818
2819 o = lookupKeyWrite(c->db,c->argv[1]);
2820 if (o == NULL) {
2821 value = 0;
2822 } else {
2823 if (o->type != REDIS_STRING) {
2824 value = 0;
2825 } else {
2826 char *eptr;
2827
2828 if (o->encoding == REDIS_ENCODING_RAW)
2829 value = strtoll(o->ptr, &eptr, 10);
2830 else if (o->encoding == REDIS_ENCODING_INT)
2831 value = (long)o->ptr;
2832 else
2833 assert(1 != 1);
2834 }
2835 }
2836
2837 value += incr;
2838 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2839 tryObjectEncoding(o);
2840 retval = dictAdd(c->db->dict,c->argv[1],o);
2841 if (retval == DICT_ERR) {
2842 dictReplace(c->db->dict,c->argv[1],o);
2843 removeExpire(c->db,c->argv[1]);
2844 } else {
2845 incrRefCount(c->argv[1]);
2846 }
2847 server.dirty++;
2848 addReply(c,shared.colon);
2849 addReply(c,o);
2850 addReply(c,shared.crlf);
2851 }
2852
2853 static void incrCommand(redisClient *c) {
2854 incrDecrCommand(c,1);
2855 }
2856
2857 static void decrCommand(redisClient *c) {
2858 incrDecrCommand(c,-1);
2859 }
2860
2861 static void incrbyCommand(redisClient *c) {
2862 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2863 incrDecrCommand(c,incr);
2864 }
2865
2866 static void decrbyCommand(redisClient *c) {
2867 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2868 incrDecrCommand(c,-incr);
2869 }
2870
2871 /* ========================= Type agnostic commands ========================= */
2872
2873 static void delCommand(redisClient *c) {
2874 int deleted = 0, j;
2875
2876 for (j = 1; j < c->argc; j++) {
2877 if (deleteKey(c->db,c->argv[j])) {
2878 server.dirty++;
2879 deleted++;
2880 }
2881 }
2882 switch(deleted) {
2883 case 0:
2884 addReply(c,shared.czero);
2885 break;
2886 case 1:
2887 addReply(c,shared.cone);
2888 break;
2889 default:
2890 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2891 break;
2892 }
2893 }
2894
2895 static void existsCommand(redisClient *c) {
2896 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2897 }
2898
2899 static void selectCommand(redisClient *c) {
2900 int id = atoi(c->argv[1]->ptr);
2901
2902 if (selectDb(c,id) == REDIS_ERR) {
2903 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2904 } else {
2905 addReply(c,shared.ok);
2906 }
2907 }
2908
2909 static void randomkeyCommand(redisClient *c) {
2910 dictEntry *de;
2911
2912 while(1) {
2913 de = dictGetRandomKey(c->db->dict);
2914 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2915 }
2916 if (de == NULL) {
2917 addReply(c,shared.plus);
2918 addReply(c,shared.crlf);
2919 } else {
2920 addReply(c,shared.plus);
2921 addReply(c,dictGetEntryKey(de));
2922 addReply(c,shared.crlf);
2923 }
2924 }
2925
2926 static void keysCommand(redisClient *c) {
2927 dictIterator *di;
2928 dictEntry *de;
2929 sds pattern = c->argv[1]->ptr;
2930 int plen = sdslen(pattern);
2931 int numkeys = 0, keyslen = 0;
2932 robj *lenobj = createObject(REDIS_STRING,NULL);
2933
2934 di = dictGetIterator(c->db->dict);
2935 addReply(c,lenobj);
2936 decrRefCount(lenobj);
2937 while((de = dictNext(di)) != NULL) {
2938 robj *keyobj = dictGetEntryKey(de);
2939
2940 sds key = keyobj->ptr;
2941 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2942 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2943 if (expireIfNeeded(c->db,keyobj) == 0) {
2944 if (numkeys != 0)
2945 addReply(c,shared.space);
2946 addReply(c,keyobj);
2947 numkeys++;
2948 keyslen += sdslen(key);
2949 }
2950 }
2951 }
2952 dictReleaseIterator(di);
2953 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2954 addReply(c,shared.crlf);
2955 }
2956
2957 static void dbsizeCommand(redisClient *c) {
2958 addReplySds(c,
2959 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2960 }
2961
2962 static void lastsaveCommand(redisClient *c) {
2963 addReplySds(c,
2964 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2965 }
2966
2967 static void typeCommand(redisClient *c) {
2968 robj *o;
2969 char *type;
2970
2971 o = lookupKeyRead(c->db,c->argv[1]);
2972 if (o == NULL) {
2973 type = "+none";
2974 } else {
2975 switch(o->type) {
2976 case REDIS_STRING: type = "+string"; break;
2977 case REDIS_LIST: type = "+list"; break;
2978 case REDIS_SET: type = "+set"; break;
2979 default: type = "unknown"; break;
2980 }
2981 }
2982 addReplySds(c,sdsnew(type));
2983 addReply(c,shared.crlf);
2984 }
2985
2986 static void saveCommand(redisClient *c) {
2987 if (server.bgsaveinprogress) {
2988 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2989 return;
2990 }
2991 if (rdbSave(server.dbfilename) == REDIS_OK) {
2992 addReply(c,shared.ok);
2993 } else {
2994 addReply(c,shared.err);
2995 }
2996 }
2997
2998 static void bgsaveCommand(redisClient *c) {
2999 if (server.bgsaveinprogress) {
3000 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3001 return;
3002 }
3003 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3004 addReply(c,shared.ok);
3005 } else {
3006 addReply(c,shared.err);
3007 }
3008 }
3009
3010 static void shutdownCommand(redisClient *c) {
3011 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3012 /* Kill the saving child if there is a background saving in progress.
3013 We want to avoid race conditions, for instance our saving child may
3014 overwrite the synchronous saving did by SHUTDOWN. */
3015 if (server.bgsaveinprogress) {
3016 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3017 kill(server.bgsavechildpid,SIGKILL);
3018 rdbRemoveTempFile(server.bgsavechildpid);
3019 }
3020 /* SYNC SAVE */
3021 if (rdbSave(server.dbfilename) == REDIS_OK) {
3022 if (server.daemonize)
3023 unlink(server.pidfile);
3024 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3025 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3026 exit(1);
3027 } else {
3028 /* Ooops.. error saving! The best we can do is to continue operating.
3029 * Note that if there was a background saving process, in the next
3030 * cron() Redis will be notified that the background saving aborted,
3031 * handling special stuff like slaves pending for synchronization... */
3032 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3033 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3034 }
3035 }
3036
3037 static void renameGenericCommand(redisClient *c, int nx) {
3038 robj *o;
3039
3040 /* To use the same key as src and dst is probably an error */
3041 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3042 addReply(c,shared.sameobjecterr);
3043 return;
3044 }
3045
3046 o = lookupKeyWrite(c->db,c->argv[1]);
3047 if (o == NULL) {
3048 addReply(c,shared.nokeyerr);
3049 return;
3050 }
3051 incrRefCount(o);
3052 deleteIfVolatile(c->db,c->argv[2]);
3053 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3054 if (nx) {
3055 decrRefCount(o);
3056 addReply(c,shared.czero);
3057 return;
3058 }
3059 dictReplace(c->db->dict,c->argv[2],o);
3060 } else {
3061 incrRefCount(c->argv[2]);
3062 }
3063 deleteKey(c->db,c->argv[1]);
3064 server.dirty++;
3065 addReply(c,nx ? shared.cone : shared.ok);
3066 }
3067
3068 static void renameCommand(redisClient *c) {
3069 renameGenericCommand(c,0);
3070 }
3071
3072 static void renamenxCommand(redisClient *c) {
3073 renameGenericCommand(c,1);
3074 }
3075
3076 static void moveCommand(redisClient *c) {
3077 robj *o;
3078 redisDb *src, *dst;
3079 int srcid;
3080
3081 /* Obtain source and target DB pointers */
3082 src = c->db;
3083 srcid = c->db->id;
3084 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3085 addReply(c,shared.outofrangeerr);
3086 return;
3087 }
3088 dst = c->db;
3089 selectDb(c,srcid); /* Back to the source DB */
3090
3091 /* If the user is moving using as target the same
3092 * DB as the source DB it is probably an error. */
3093 if (src == dst) {
3094 addReply(c,shared.sameobjecterr);
3095 return;
3096 }
3097
3098 /* Check if the element exists and get a reference */
3099 o = lookupKeyWrite(c->db,c->argv[1]);
3100 if (!o) {
3101 addReply(c,shared.czero);
3102 return;
3103 }
3104
3105 /* Try to add the element to the target DB */
3106 deleteIfVolatile(dst,c->argv[1]);
3107 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3108 addReply(c,shared.czero);
3109 return;
3110 }
3111 incrRefCount(c->argv[1]);
3112 incrRefCount(o);
3113
3114 /* OK! key moved, free the entry in the source DB */
3115 deleteKey(src,c->argv[1]);
3116 server.dirty++;
3117 addReply(c,shared.cone);
3118 }
3119
3120 /* =================================== Lists ================================ */
3121 static void pushGenericCommand(redisClient *c, int where) {
3122 robj *lobj;
3123 list *list;
3124
3125 lobj = lookupKeyWrite(c->db,c->argv[1]);
3126 if (lobj == NULL) {
3127 lobj = createListObject();
3128 list = lobj->ptr;
3129 if (where == REDIS_HEAD) {
3130 listAddNodeHead(list,c->argv[2]);
3131 } else {
3132 listAddNodeTail(list,c->argv[2]);
3133 }
3134 dictAdd(c->db->dict,c->argv[1],lobj);
3135 incrRefCount(c->argv[1]);
3136 incrRefCount(c->argv[2]);
3137 } else {
3138 if (lobj->type != REDIS_LIST) {
3139 addReply(c,shared.wrongtypeerr);
3140 return;
3141 }
3142 list = lobj->ptr;
3143 if (where == REDIS_HEAD) {
3144 listAddNodeHead(list,c->argv[2]);
3145 } else {
3146 listAddNodeTail(list,c->argv[2]);
3147 }
3148 incrRefCount(c->argv[2]);
3149 }
3150 server.dirty++;
3151 addReply(c,shared.ok);
3152 }
3153
3154 static void lpushCommand(redisClient *c) {
3155 pushGenericCommand(c,REDIS_HEAD);
3156 }
3157
3158 static void rpushCommand(redisClient *c) {
3159 pushGenericCommand(c,REDIS_TAIL);
3160 }
3161
3162 static void llenCommand(redisClient *c) {
3163 robj *o;
3164 list *l;
3165
3166 o = lookupKeyRead(c->db,c->argv[1]);
3167 if (o == NULL) {
3168 addReply(c,shared.czero);
3169 return;
3170 } else {
3171 if (o->type != REDIS_LIST) {
3172 addReply(c,shared.wrongtypeerr);
3173 } else {
3174 l = o->ptr;
3175 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3176 }
3177 }
3178 }
3179
3180 static void lindexCommand(redisClient *c) {
3181 robj *o;
3182 int index = atoi(c->argv[2]->ptr);
3183
3184 o = lookupKeyRead(c->db,c->argv[1]);
3185 if (o == NULL) {
3186 addReply(c,shared.nullbulk);
3187 } else {
3188 if (o->type != REDIS_LIST) {
3189 addReply(c,shared.wrongtypeerr);
3190 } else {
3191 list *list = o->ptr;
3192 listNode *ln;
3193
3194 ln = listIndex(list, index);
3195 if (ln == NULL) {
3196 addReply(c,shared.nullbulk);
3197 } else {
3198 robj *ele = listNodeValue(ln);
3199 addReplyBulkLen(c,ele);
3200 addReply(c,ele);
3201 addReply(c,shared.crlf);
3202 }
3203 }
3204 }
3205 }
3206
3207 static void lsetCommand(redisClient *c) {
3208 robj *o;
3209 int index = atoi(c->argv[2]->ptr);
3210
3211 o = lookupKeyWrite(c->db,c->argv[1]);
3212 if (o == NULL) {
3213 addReply(c,shared.nokeyerr);
3214 } else {
3215 if (o->type != REDIS_LIST) {
3216 addReply(c,shared.wrongtypeerr);
3217 } else {
3218 list *list = o->ptr;
3219 listNode *ln;
3220
3221 ln = listIndex(list, index);
3222 if (ln == NULL) {
3223 addReply(c,shared.outofrangeerr);
3224 } else {
3225 robj *ele = listNodeValue(ln);
3226
3227 decrRefCount(ele);
3228 listNodeValue(ln) = c->argv[3];
3229 incrRefCount(c->argv[3]);
3230 addReply(c,shared.ok);
3231 server.dirty++;
3232 }
3233 }
3234 }
3235 }
3236
3237 static void popGenericCommand(redisClient *c, int where) {
3238 robj *o;
3239
3240 o = lookupKeyWrite(c->db,c->argv[1]);
3241 if (o == NULL) {
3242 addReply(c,shared.nullbulk);
3243 } else {
3244 if (o->type != REDIS_LIST) {
3245 addReply(c,shared.wrongtypeerr);
3246 } else {
3247 list *list = o->ptr;
3248 listNode *ln;
3249
3250 if (where == REDIS_HEAD)
3251 ln = listFirst(list);
3252 else
3253 ln = listLast(list);
3254
3255 if (ln == NULL) {
3256 addReply(c,shared.nullbulk);
3257 } else {
3258 robj *ele = listNodeValue(ln);
3259 addReplyBulkLen(c,ele);
3260 addReply(c,ele);
3261 addReply(c,shared.crlf);
3262 listDelNode(list,ln);
3263 server.dirty++;
3264 }
3265 }
3266 }
3267 }
3268
3269 static void lpopCommand(redisClient *c) {
3270 popGenericCommand(c,REDIS_HEAD);
3271 }
3272
3273 static void rpopCommand(redisClient *c) {
3274 popGenericCommand(c,REDIS_TAIL);
3275 }
3276
3277 static void lrangeCommand(redisClient *c) {
3278 robj *o;
3279 int start = atoi(c->argv[2]->ptr);
3280 int end = atoi(c->argv[3]->ptr);
3281
3282 o = lookupKeyRead(c->db,c->argv[1]);
3283 if (o == NULL) {
3284 addReply(c,shared.nullmultibulk);
3285 } else {
3286 if (o->type != REDIS_LIST) {
3287 addReply(c,shared.wrongtypeerr);
3288 } else {
3289 list *list = o->ptr;
3290 listNode *ln;
3291 int llen = listLength(list);
3292 int rangelen, j;
3293 robj *ele;
3294
3295 /* convert negative indexes */
3296 if (start < 0) start = llen+start;
3297 if (end < 0) end = llen+end;
3298 if (start < 0) start = 0;
3299 if (end < 0) end = 0;
3300
3301 /* indexes sanity checks */
3302 if (start > end || start >= llen) {
3303 /* Out of range start or start > end result in empty list */
3304 addReply(c,shared.emptymultibulk);
3305 return;
3306 }
3307 if (end >= llen) end = llen-1;
3308 rangelen = (end-start)+1;
3309
3310 /* Return the result in form of a multi-bulk reply */
3311 ln = listIndex(list, start);
3312 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3313 for (j = 0; j < rangelen; j++) {
3314 ele = listNodeValue(ln);
3315 addReplyBulkLen(c,ele);
3316 addReply(c,ele);
3317 addReply(c,shared.crlf);
3318 ln = ln->next;
3319 }
3320 }
3321 }
3322 }
3323
3324 static void ltrimCommand(redisClient *c) {
3325 robj *o;
3326 int start = atoi(c->argv[2]->ptr);
3327 int end = atoi(c->argv[3]->ptr);
3328
3329 o = lookupKeyWrite(c->db,c->argv[1]);
3330 if (o == NULL) {
3331 addReply(c,shared.nokeyerr);
3332 } else {
3333 if (o->type != REDIS_LIST) {
3334 addReply(c,shared.wrongtypeerr);
3335 } else {
3336 list *list = o->ptr;
3337 listNode *ln;
3338 int llen = listLength(list);
3339 int j, ltrim, rtrim;
3340
3341 /* convert negative indexes */
3342 if (start < 0) start = llen+start;
3343 if (end < 0) end = llen+end;
3344 if (start < 0) start = 0;
3345 if (end < 0) end = 0;
3346
3347 /* indexes sanity checks */
3348 if (start > end || start >= llen) {
3349 /* Out of range start or start > end result in empty list */
3350 ltrim = llen;
3351 rtrim = 0;
3352 } else {
3353 if (end >= llen) end = llen-1;
3354 ltrim = start;
3355 rtrim = llen-end-1;
3356 }
3357
3358 /* Remove list elements to perform the trim */
3359 for (j = 0; j < ltrim; j++) {
3360 ln = listFirst(list);
3361 listDelNode(list,ln);
3362 }
3363 for (j = 0; j < rtrim; j++) {
3364 ln = listLast(list);
3365 listDelNode(list,ln);
3366 }
3367 server.dirty++;
3368 addReply(c,shared.ok);
3369 }
3370 }
3371 }
3372
3373 static void lremCommand(redisClient *c) {
3374 robj *o;
3375
3376 o = lookupKeyWrite(c->db,c->argv[1]);
3377 if (o == NULL) {
3378 addReply(c,shared.czero);
3379 } else {
3380 if (o->type != REDIS_LIST) {
3381 addReply(c,shared.wrongtypeerr);
3382 } else {
3383 list *list = o->ptr;
3384 listNode *ln, *next;
3385 int toremove = atoi(c->argv[2]->ptr);
3386 int removed = 0;
3387 int fromtail = 0;
3388
3389 if (toremove < 0) {
3390 toremove = -toremove;
3391 fromtail = 1;
3392 }
3393 ln = fromtail ? list->tail : list->head;
3394 while (ln) {
3395 robj *ele = listNodeValue(ln);
3396
3397 next = fromtail ? ln->prev : ln->next;
3398 if (compareStringObjects(ele,c->argv[3]) == 0) {
3399 listDelNode(list,ln);
3400 server.dirty++;
3401 removed++;
3402 if (toremove && removed == toremove) break;
3403 }
3404 ln = next;
3405 }
3406 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3407 }
3408 }
3409 }
3410
3411 /* ==================================== Sets ================================ */
3412
3413 static void saddCommand(redisClient *c) {
3414 robj *set;
3415
3416 set = lookupKeyWrite(c->db,c->argv[1]);
3417 if (set == NULL) {
3418 set = createSetObject();
3419 dictAdd(c->db->dict,c->argv[1],set);
3420 incrRefCount(c->argv[1]);
3421 } else {
3422 if (set->type != REDIS_SET) {
3423 addReply(c,shared.wrongtypeerr);
3424 return;
3425 }
3426 }
3427 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3428 incrRefCount(c->argv[2]);
3429 server.dirty++;
3430 addReply(c,shared.cone);
3431 } else {
3432 addReply(c,shared.czero);
3433 }
3434 }
3435
3436 static void sremCommand(redisClient *c) {
3437 robj *set;
3438
3439 set = lookupKeyWrite(c->db,c->argv[1]);
3440 if (set == NULL) {
3441 addReply(c,shared.czero);
3442 } else {
3443 if (set->type != REDIS_SET) {
3444 addReply(c,shared.wrongtypeerr);
3445 return;
3446 }
3447 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3448 server.dirty++;
3449 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3450 addReply(c,shared.cone);
3451 } else {
3452 addReply(c,shared.czero);
3453 }
3454 }
3455 }
3456
3457 static void smoveCommand(redisClient *c) {
3458 robj *srcset, *dstset;
3459
3460 srcset = lookupKeyWrite(c->db,c->argv[1]);
3461 dstset = lookupKeyWrite(c->db,c->argv[2]);
3462
3463 /* If the source key does not exist return 0, if it's of the wrong type
3464 * raise an error */
3465 if (srcset == NULL || srcset->type != REDIS_SET) {
3466 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3467 return;
3468 }
3469 /* Error if the destination key is not a set as well */
3470 if (dstset && dstset->type != REDIS_SET) {
3471 addReply(c,shared.wrongtypeerr);
3472 return;
3473 }
3474 /* Remove the element from the source set */
3475 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3476 /* Key not found in the src set! return zero */
3477 addReply(c,shared.czero);
3478 return;
3479 }
3480 server.dirty++;
3481 /* Add the element to the destination set */
3482 if (!dstset) {
3483 dstset = createSetObject();
3484 dictAdd(c->db->dict,c->argv[2],dstset);
3485 incrRefCount(c->argv[2]);
3486 }
3487 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3488 incrRefCount(c->argv[3]);
3489 addReply(c,shared.cone);
3490 }
3491
3492 static void sismemberCommand(redisClient *c) {
3493 robj *set;
3494
3495 set = lookupKeyRead(c->db,c->argv[1]);
3496 if (set == NULL) {
3497 addReply(c,shared.czero);
3498 } else {
3499 if (set->type != REDIS_SET) {
3500 addReply(c,shared.wrongtypeerr);
3501 return;
3502 }
3503 if (dictFind(set->ptr,c->argv[2]))
3504 addReply(c,shared.cone);
3505 else
3506 addReply(c,shared.czero);
3507 }
3508 }
3509
3510 static void scardCommand(redisClient *c) {
3511 robj *o;
3512 dict *s;
3513
3514 o = lookupKeyRead(c->db,c->argv[1]);
3515 if (o == NULL) {
3516 addReply(c,shared.czero);
3517 return;
3518 } else {
3519 if (o->type != REDIS_SET) {
3520 addReply(c,shared.wrongtypeerr);
3521 } else {
3522 s = o->ptr;
3523 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3524 dictSize(s)));
3525 }
3526 }
3527 }
3528
3529 static void spopCommand(redisClient *c) {
3530 robj *set;
3531 dictEntry *de;
3532
3533 set = lookupKeyWrite(c->db,c->argv[1]);
3534 if (set == NULL) {
3535 addReply(c,shared.nullbulk);
3536 } else {
3537 if (set->type != REDIS_SET) {
3538 addReply(c,shared.wrongtypeerr);
3539 return;
3540 }
3541 de = dictGetRandomKey(set->ptr);
3542 if (de == NULL) {
3543 addReply(c,shared.nullbulk);
3544 } else {
3545 robj *ele = dictGetEntryKey(de);
3546
3547 addReplyBulkLen(c,ele);
3548 addReply(c,ele);
3549 addReply(c,shared.crlf);
3550 dictDelete(set->ptr,ele);
3551 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3552 server.dirty++;
3553 }
3554 }
3555 }
3556
3557 static void srandmemberCommand(redisClient *c) {
3558 robj *set;
3559 dictEntry *de;
3560
3561 set = lookupKeyRead(c->db,c->argv[1]);
3562 if (set == NULL) {
3563 addReply(c,shared.nullbulk);
3564 } else {
3565 if (set->type != REDIS_SET) {
3566 addReply(c,shared.wrongtypeerr);
3567 return;
3568 }
3569 de = dictGetRandomKey(set->ptr);
3570 if (de == NULL) {
3571 addReply(c,shared.nullbulk);
3572 } else {
3573 robj *ele = dictGetEntryKey(de);
3574
3575 addReplyBulkLen(c,ele);
3576 addReply(c,ele);
3577 addReply(c,shared.crlf);
3578 }
3579 }
3580 }
3581
3582 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3583 dict **d1 = (void*) s1, **d2 = (void*) s2;
3584
3585 return dictSize(*d1)-dictSize(*d2);
3586 }
3587
3588 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3589 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3590 dictIterator *di;
3591 dictEntry *de;
3592 robj *lenobj = NULL, *dstset = NULL;
3593 int j, cardinality = 0;
3594
3595 for (j = 0; j < setsnum; j++) {
3596 robj *setobj;
3597
3598 setobj = dstkey ?
3599 lookupKeyWrite(c->db,setskeys[j]) :
3600 lookupKeyRead(c->db,setskeys[j]);
3601 if (!setobj) {
3602 zfree(dv);
3603 if (dstkey) {
3604 deleteKey(c->db,dstkey);
3605 addReply(c,shared.ok);
3606 } else {
3607 addReply(c,shared.nullmultibulk);
3608 }
3609 return;
3610 }
3611 if (setobj->type != REDIS_SET) {
3612 zfree(dv);
3613 addReply(c,shared.wrongtypeerr);
3614 return;
3615 }
3616 dv[j] = setobj->ptr;
3617 }
3618 /* Sort sets from the smallest to largest, this will improve our
3619 * algorithm's performace */
3620 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3621
3622 /* The first thing we should output is the total number of elements...
3623 * since this is a multi-bulk write, but at this stage we don't know
3624 * the intersection set size, so we use a trick, append an empty object
3625 * to the output list and save the pointer to later modify it with the
3626 * right length */
3627 if (!dstkey) {
3628 lenobj = createObject(REDIS_STRING,NULL);
3629 addReply(c,lenobj);
3630 decrRefCount(lenobj);
3631 } else {
3632 /* If we have a target key where to store the resulting set
3633 * create this key with an empty set inside */
3634 dstset = createSetObject();
3635 }
3636
3637 /* Iterate all the elements of the first (smallest) set, and test
3638 * the element against all the other sets, if at least one set does
3639 * not include the element it is discarded */
3640 di = dictGetIterator(dv[0]);
3641
3642 while((de = dictNext(di)) != NULL) {
3643 robj *ele;
3644
3645 for (j = 1; j < setsnum; j++)
3646 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3647 if (j != setsnum)
3648 continue; /* at least one set does not contain the member */
3649 ele = dictGetEntryKey(de);
3650 if (!dstkey) {
3651 addReplyBulkLen(c,ele);
3652 addReply(c,ele);
3653 addReply(c,shared.crlf);
3654 cardinality++;
3655 } else {
3656 dictAdd(dstset->ptr,ele,NULL);
3657 incrRefCount(ele);
3658 }
3659 }
3660 dictReleaseIterator(di);
3661
3662 if (dstkey) {
3663 /* Store the resulting set into the target */
3664 deleteKey(c->db,dstkey);
3665 dictAdd(c->db->dict,dstkey,dstset);
3666 incrRefCount(dstkey);
3667 }
3668
3669 if (!dstkey) {
3670 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3671 } else {
3672 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3673 dictSize((dict*)dstset->ptr)));
3674 server.dirty++;
3675 }
3676 zfree(dv);
3677 }
3678
3679 static void sinterCommand(redisClient *c) {
3680 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3681 }
3682
3683 static void sinterstoreCommand(redisClient *c) {
3684 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3685 }
3686
/* Operation selector passed to sunionDiffGenericCommand(). */
enum {
    REDIS_OP_UNION = 0, /* SUNION / SUNIONSTORE */
    REDIS_OP_DIFF = 1   /* SDIFF / SDIFFSTORE */
};
3689
3690 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3691 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3692 dictIterator *di;
3693 dictEntry *de;
3694 robj *dstset = NULL;
3695 int j, cardinality = 0;
3696
3697 for (j = 0; j < setsnum; j++) {
3698 robj *setobj;
3699
3700 setobj = dstkey ?
3701 lookupKeyWrite(c->db,setskeys[j]) :
3702 lookupKeyRead(c->db,setskeys[j]);
3703 if (!setobj) {
3704 dv[j] = NULL;
3705 continue;
3706 }
3707 if (setobj->type != REDIS_SET) {
3708 zfree(dv);
3709 addReply(c,shared.wrongtypeerr);
3710 return;
3711 }
3712 dv[j] = setobj->ptr;
3713 }
3714
3715 /* We need a temp set object to store our union. If the dstkey
3716 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3717 * this set object will be the resulting object to set into the target key*/
3718 dstset = createSetObject();
3719
3720 /* Iterate all the elements of all the sets, add every element a single
3721 * time to the result set */
3722 for (j = 0; j < setsnum; j++) {
3723 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3724 if (!dv[j]) continue; /* non existing keys are like empty sets */
3725
3726 di = dictGetIterator(dv[j]);
3727
3728 while((de = dictNext(di)) != NULL) {
3729 robj *ele;
3730
3731 /* dictAdd will not add the same element multiple times */
3732 ele = dictGetEntryKey(de);
3733 if (op == REDIS_OP_UNION || j == 0) {
3734 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3735 incrRefCount(ele);
3736 cardinality++;
3737 }
3738 } else if (op == REDIS_OP_DIFF) {
3739 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3740 cardinality--;
3741 }
3742 }
3743 }
3744 dictReleaseIterator(di);
3745
3746 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3747 }
3748
3749 /* Output the content of the resulting set, if not in STORE mode */
3750 if (!dstkey) {
3751 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3752 di = dictGetIterator(dstset->ptr);
3753 while((de = dictNext(di)) != NULL) {
3754 robj *ele;
3755
3756 ele = dictGetEntryKey(de);
3757 addReplyBulkLen(c,ele);
3758 addReply(c,ele);
3759 addReply(c,shared.crlf);
3760 }
3761 dictReleaseIterator(di);
3762 } else {
3763 /* If we have a target key where to store the resulting set
3764 * create this key with the result set inside */
3765 deleteKey(c->db,dstkey);
3766 dictAdd(c->db->dict,dstkey,dstset);
3767 incrRefCount(dstkey);
3768 }
3769
3770 /* Cleanup */
3771 if (!dstkey) {
3772 decrRefCount(dstset);
3773 } else {
3774 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3775 dictSize((dict*)dstset->ptr)));
3776 server.dirty++;
3777 }
3778 zfree(dv);
3779 }
3780
3781 static void sunionCommand(redisClient *c) {
3782 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3783 }
3784
3785 static void sunionstoreCommand(redisClient *c) {
3786 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3787 }
3788
3789 static void sdiffCommand(redisClient *c) {
3790 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3791 }
3792
3793 static void sdiffstoreCommand(redisClient *c) {
3794 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3795 }
3796
3797 /* ==================================== ZSets =============================== */
3798
3799 /* ZSETs are ordered sets using two data structures to hold the same elements
3800 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
3801 * data structure.
3802 *
3803 * The elements are added to a hash table mapping Redis objects to scores.
3804 * At the same time the elements are added to a skip list mapping scores
3805 * to Redis objects (so objects are sorted by scores in this "view"). */
3806
3807 /* This skiplist implementation is almost a C translation of the original
3808 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
3809 * Alternative to Balanced Trees", modified in three ways:
3810 * a) this implementation allows for repeated values.
3811 * b) the comparison is not just by key (our 'score') but by satellite data.
3812 * c) there is a back pointer, so it's a doubly linked list with the back
3813 * pointers being only at "level 1". This allows to traverse the list
3814 * from tail to head, useful for ZREVRANGE. */
3815
3816 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
3817 zskiplistNode *zn = zmalloc(sizeof(*zn));
3818
3819 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
3820 zn->score = score;
3821 zn->obj = obj;
3822 return zn;
3823 }
3824
3825 static zskiplist *zslCreate(void) {
3826 int j;
3827 zskiplist *zsl;
3828
3829 zsl = zmalloc(sizeof(*zsl));
3830 zsl->level = 1;
3831 zsl->length = 0;
3832 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
3833 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
3834 zsl->header->forward[j] = NULL;
3835 zsl->header->backward = NULL;
3836 zsl->tail = NULL;
3837 return zsl;
3838 }
3839
3840 static void zslFreeNode(zskiplistNode *node) {
3841 decrRefCount(node->obj);
3842 zfree(node->forward);
3843 zfree(node);
3844 }
3845
3846 static void zslFree(zskiplist *zsl) {
3847 zskiplistNode *node = zsl->header->forward[0], *next;
3848
3849 zfree(zsl->header->forward);
3850 zfree(zsl->header);
3851 while(node) {
3852 next = node->forward[0];
3853 zslFreeNode(node);
3854 node = next;
3855 }
3856 zfree(zsl);
3857 }
3858
3859 static int zslRandomLevel(void) {
3860 int level = 1;
3861 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
3862 level += 1;
3863 return level;
3864 }
3865
3866 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
3867 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3868 int i, level;
3869
3870 x = zsl->header;
3871 for (i = zsl->level-1; i >= 0; i--) {
3872 while (x->forward[i] &&
3873 (x->forward[i]->score < score ||
3874 (x->forward[i]->score == score &&
3875 compareStringObjects(x->forward[i]->obj,obj) < 0)))
3876 x = x->forward[i];
3877 update[i] = x;
3878 }
3879 /* we assume the key is not already inside, since we allow duplicated
3880 * scores, and the re-insertion of score and redis object should never
3881 * happpen since the caller of zslInsert() should test in the hash table
3882 * if the element is already inside or not. */
3883 level = zslRandomLevel();
3884 if (level > zsl->level) {
3885 for (i = zsl->level; i < level; i++)
3886 update[i] = zsl->header;
3887 zsl->level = level;
3888 }
3889 x = zslCreateNode(level,score,obj);
3890 for (i = 0; i < level; i++) {
3891 x->forward[i] = update[i]->forward[i];
3892 update[i]->forward[i] = x;
3893 }
3894 x->backward = (update[0] == zsl->header) ? NULL : update[0];
3895 if (x->forward[0])
3896 x->forward[0]->backward = x;
3897 else
3898 zsl->tail = x;
3899 zsl->length++;
3900 }
3901
3902 /* Delete an element with matching score/object from the skiplist. */
3903 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
3904 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3905 int i;
3906
3907 x = zsl->header;
3908 for (i = zsl->level-1; i >= 0; i--) {
3909 while (x->forward[i] &&
3910 (x->forward[i]->score < score ||
3911 (x->forward[i]->score == score &&
3912 compareStringObjects(x->forward[i]->obj,obj) < 0)))
3913 x = x->forward[i];
3914 update[i] = x;
3915 }
3916 /* We may have multiple elements with the same score, what we need
3917 * is to find the element with both the right score and object. */
3918 x = x->forward[0];
3919 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
3920 for (i = 0; i < zsl->level; i++) {
3921 if (update[i]->forward[i] != x) break;
3922 update[i]->forward[i] = x->forward[i];
3923 }
3924 if (x->forward[0]) {
3925 x->forward[0]->backward = (x->backward == zsl->header) ?
3926 NULL : x->backward;
3927 } else {
3928 zsl->tail = x->backward;
3929 }
3930 zslFreeNode(x);
3931 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
3932 zsl->level--;
3933 zsl->length--;
3934 return 1;
3935 } else {
3936 return 0; /* not found */
3937 }
3938 return 0; /* not found */
3939 }
3940
3941 /* Find the first node having a score equal or greater than the specified one.
3942 * Returns NULL if there is no match. */
3943 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
3944 zskiplistNode *x;
3945 int i;
3946
3947 x = zsl->header;
3948 for (i = zsl->level-1; i >= 0; i--) {
3949 while (x->forward[i] && x->forward[i]->score < score)
3950 x = x->forward[i];
3951 }
3952 /* We may have multiple elements with the same score, what we need
3953 * is to find the element with both the right score and object. */
3954 return x->forward[0];
3955 }
3956
3957 /* The actual Z-commands implementations */
3958
3959 static void zaddCommand(redisClient *c) {
3960 robj *zsetobj;
3961 zset *zs;
3962 double *score;
3963
3964 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
3965 if (zsetobj == NULL) {
3966 zsetobj = createZsetObject();
3967 dictAdd(c->db->dict,c->argv[1],zsetobj);
3968 incrRefCount(c->argv[1]);
3969 } else {
3970 if (zsetobj->type != REDIS_ZSET) {
3971 addReply(c,shared.wrongtypeerr);
3972 return;
3973 }
3974 }
3975 score = zmalloc(sizeof(double));
3976 *score = strtod(c->argv[2]->ptr,NULL);
3977 zs = zsetobj->ptr;
3978 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
3979 /* case 1: New element */
3980 incrRefCount(c->argv[3]); /* added to hash */
3981 zslInsert(zs->zsl,*score,c->argv[3]);
3982 incrRefCount(c->argv[3]); /* added to skiplist */
3983 server.dirty++;
3984 addReply(c,shared.cone);
3985 } else {
3986 dictEntry *de;
3987 double *oldscore;
3988
3989 /* case 2: Score update operation */
3990 de = dictFind(zs->dict,c->argv[3]);
3991 assert(de != NULL);
3992 oldscore = dictGetEntryVal(de);
3993 if (*score != *oldscore) {
3994 int deleted;
3995
3996 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
3997 assert(deleted != 0);
3998 zslInsert(zs->zsl,*score,c->argv[3]);
3999 incrRefCount(c->argv[3]);
4000 dictReplace(zs->dict,c->argv[3],score);
4001 server.dirty++;
4002 } else {
4003 zfree(score);
4004 }
4005 addReply(c,shared.czero);
4006 }
4007 }
4008
4009 static void zremCommand(redisClient *c) {
4010 robj *zsetobj;
4011 zset *zs;
4012
4013 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4014 if (zsetobj == NULL) {
4015 addReply(c,shared.czero);
4016 } else {
4017 dictEntry *de;
4018 double *oldscore;
4019 int deleted;
4020
4021 if (zsetobj->type != REDIS_ZSET) {
4022 addReply(c,shared.wrongtypeerr);
4023 return;
4024 }
4025 zs = zsetobj->ptr;
4026 de = dictFind(zs->dict,c->argv[2]);
4027 if (de == NULL) {
4028 addReply(c,shared.czero);
4029 return;
4030 }
4031 /* Delete from the skiplist */
4032 oldscore = dictGetEntryVal(de);
4033 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4034 assert(deleted != 0);
4035
4036 /* Delete from the hash table */
4037 dictDelete(zs->dict,c->argv[2]);
4038 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4039 server.dirty++;
4040 addReply(c,shared.cone);
4041 }
4042 }
4043
4044 static void zrangeGenericCommand(redisClient *c, int reverse) {
4045 robj *o;
4046 int start = atoi(c->argv[2]->ptr);
4047 int end = atoi(c->argv[3]->ptr);
4048
4049 o = lookupKeyRead(c->db,c->argv[1]);
4050 if (o == NULL) {
4051 addReply(c,shared.nullmultibulk);
4052 } else {
4053 if (o->type != REDIS_ZSET) {
4054 addReply(c,shared.wrongtypeerr);
4055 } else {
4056 zset *zsetobj = o->ptr;
4057 zskiplist *zsl = zsetobj->zsl;
4058 zskiplistNode *ln;
4059
4060 int llen = zsl->length;
4061 int rangelen, j;
4062 robj *ele;
4063
4064 /* convert negative indexes */
4065 if (start < 0) start = llen+start;
4066 if (end < 0) end = llen+end;
4067 if (start < 0) start = 0;
4068 if (end < 0) end = 0;
4069
4070 /* indexes sanity checks */
4071 if (start > end || start >= llen) {
4072 /* Out of range start or start > end result in empty list */
4073 addReply(c,shared.emptymultibulk);
4074 return;
4075 }
4076 if (end >= llen) end = llen-1;
4077 rangelen = (end-start)+1;
4078
4079 /* Return the result in form of a multi-bulk reply */
4080 if (reverse) {
4081 ln = zsl->tail;
4082 while (start--)
4083 ln = ln->backward;
4084 } else {
4085 ln = zsl->header->forward[0];
4086 while (start--)
4087 ln = ln->forward[0];
4088 }
4089
4090 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4091 for (j = 0; j < rangelen; j++) {
4092 ele = ln->obj;
4093 addReplyBulkLen(c,ele);
4094 addReply(c,ele);
4095 addReply(c,shared.crlf);
4096 ln = reverse ? ln->backward : ln->forward[0];
4097 }
4098 }
4099 }
4100 }
4101
4102 static void zrangeCommand(redisClient *c) {
4103 zrangeGenericCommand(c,0);
4104 }
4105
4106 static void zrevrangeCommand(redisClient *c) {
4107 zrangeGenericCommand(c,1);
4108 }
4109
4110 static void zrangebyscoreCommand(redisClient *c) {
4111 robj *o;
4112 double min = strtod(c->argv[2]->ptr,NULL);
4113 double max = strtod(c->argv[3]->ptr,NULL);
4114
4115 o = lookupKeyRead(c->db,c->argv[1]);
4116 if (o == NULL) {
4117 addReply(c,shared.nullmultibulk);
4118 } else {
4119 if (o->type != REDIS_ZSET) {
4120 addReply(c,shared.wrongtypeerr);
4121 } else {
4122 zset *zsetobj = o->ptr;
4123 zskiplist *zsl = zsetobj->zsl;
4124 zskiplistNode *ln;
4125 robj *ele, *lenobj;
4126 unsigned int rangelen = 0;
4127
4128 /* Get the first node with the score >= min */
4129 ln = zslFirstWithScore(zsl,min);
4130 if (ln == NULL) {
4131 /* No element matching the speciifed interval */
4132 addReply(c,shared.emptymultibulk);
4133 return;
4134 }
4135
4136 /* We don't know in advance how many matching elements there
4137 * are in the list, so we push this object that will represent
4138 * the multi-bulk length in the output buffer, and will "fix"
4139 * it later */
4140 lenobj = createObject(REDIS_STRING,NULL);
4141 addReply(c,lenobj);
4142
4143 while(ln && ln->score <= max) {
4144 ele = ln->obj;
4145 addReplyBulkLen(c,ele);
4146 addReply(c,ele);
4147 addReply(c,shared.crlf);
4148 ln = ln->forward[0];
4149 rangelen++;
4150 }
4151 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4152 }
4153 }
4154 }
4155
4156 static void zlenCommand(redisClient *c) {
4157 robj *o;
4158 zset *zs;
4159
4160 o = lookupKeyRead(c->db,c->argv[1]);
4161 if (o == NULL) {
4162 addReply(c,shared.czero);
4163 return;
4164 } else {
4165 if (o->type != REDIS_ZSET) {
4166 addReply(c,shared.wrongtypeerr);
4167 } else {
4168 zs = o->ptr;
4169 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4170 }
4171 }
4172 }
4173
4174 static void zscoreCommand(redisClient *c) {
4175 robj *o;
4176 zset *zs;
4177
4178 o = lookupKeyRead(c->db,c->argv[1]);
4179 if (o == NULL) {
4180 addReply(c,shared.czero);
4181 return;
4182 } else {
4183 if (o->type != REDIS_ZSET) {
4184 addReply(c,shared.wrongtypeerr);
4185 } else {
4186 dictEntry *de;
4187
4188 zs = o->ptr;
4189 de = dictFind(zs->dict,c->argv[2]);
4190 if (!de) {
4191 addReply(c,shared.nullbulk);
4192 } else {
4193 char buf[128];
4194 double *score = dictGetEntryVal(de);
4195
4196 snprintf(buf,sizeof(buf),"%.16g",*score);
4197 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
4198 strlen(buf),buf));
4199 }
4200 }
4201 }
4202 }
4203
4204 /* ========================= Non type-specific commands ==================== */
4205
4206 static void flushdbCommand(redisClient *c) {
4207 server.dirty += dictSize(c->db->dict);
4208 dictEmpty(c->db->dict);
4209 dictEmpty(c->db->expires);
4210 addReply(c,shared.ok);
4211 }
4212
4213 static void flushallCommand(redisClient *c) {
4214 server.dirty += emptyDb();
4215 addReply(c,shared.ok);
4216 rdbSave(server.dbfilename);
4217 server.dirty++;
4218 }
4219
4220 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4221 redisSortOperation *so = zmalloc(sizeof(*so));
4222 so->type = type;
4223 so->pattern = pattern;
4224 return so;
4225 }
4226
4227 /* Return the value associated to the key with a name obtained
4228 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4229 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4230 char *p;
4231 sds spat, ssub;
4232 robj keyobj;
4233 int prefixlen, sublen, postfixlen;
4234 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4235 struct {
4236 long len;
4237 long free;
4238 char buf[REDIS_SORTKEY_MAX+1];
4239 } keyname;
4240
4241 if (subst->encoding == REDIS_ENCODING_RAW)
4242 incrRefCount(subst);
4243 else {
4244 subst = getDecodedObject(subst);
4245 }
4246
4247 spat = pattern->ptr;
4248 ssub = subst->ptr;
4249 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4250 p = strchr(spat,'*');
4251 if (!p) return NULL;
4252
4253 prefixlen = p-spat;
4254 sublen = sdslen(ssub);
4255 postfixlen = sdslen(spat)-(prefixlen+1);
4256 memcpy(keyname.buf,spat,prefixlen);
4257 memcpy(keyname.buf+prefixlen,ssub,sublen);
4258 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4259 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4260 keyname.len = prefixlen+sublen+postfixlen;
4261
4262 keyobj.refcount = 1;
4263 keyobj.type = REDIS_STRING;
4264 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4265
4266 decrRefCount(subst);
4267
4268 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4269 return lookupKeyRead(db,&keyobj);
4270 }
4271
4272 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4273 * the additional parameter is not standard but a BSD-specific we have to
4274 * pass sorting parameters via the global 'server' structure */
4275 static int sortCompare(const void *s1, const void *s2) {
4276 const redisSortObject *so1 = s1, *so2 = s2;
4277 int cmp;
4278
4279 if (!server.sort_alpha) {
4280 /* Numeric sorting. Here it's trivial as we precomputed scores */
4281 if (so1->u.score > so2->u.score) {
4282 cmp = 1;
4283 } else if (so1->u.score < so2->u.score) {
4284 cmp = -1;
4285 } else {
4286 cmp = 0;
4287 }
4288 } else {
4289 /* Alphanumeric sorting */
4290 if (server.sort_bypattern) {
4291 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4292 /* At least one compare object is NULL */
4293 if (so1->u.cmpobj == so2->u.cmpobj)
4294 cmp = 0;
4295 else if (so1->u.cmpobj == NULL)
4296 cmp = -1;
4297 else
4298 cmp = 1;
4299 } else {
4300 /* We have both the objects, use strcoll */
4301 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4302 }
4303 } else {
4304 /* Compare elements directly */
4305 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4306 so2->obj->encoding == REDIS_ENCODING_RAW) {
4307 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4308 } else {
4309 robj *dec1, *dec2;
4310
4311 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4312 so1->obj : getDecodedObject(so1->obj);
4313 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4314 so2->obj : getDecodedObject(so2->obj);
4315 cmp = strcoll(dec1->ptr,dec2->ptr);
4316 if (dec1 != so1->obj) decrRefCount(dec1);
4317 if (dec2 != so2->obj) decrRefCount(dec2);
4318 }
4319 }
4320 }
4321 return server.sort_desc ? -cmp : cmp;
4322 }
4323
4324 /* The SORT command is the most complex command in Redis. Warning: this code
4325 * is optimized for speed and a bit less for readability */
4326 static void sortCommand(redisClient *c) {
4327 list *operations;
4328 int outputlen = 0;
4329 int desc = 0, alpha = 0;
4330 int limit_start = 0, limit_count = -1, start, end;
4331 int j, dontsort = 0, vectorlen;
4332 int getop = 0; /* GET operation counter */
4333 robj *sortval, *sortby = NULL;
4334 redisSortObject *vector; /* Resulting vector to sort */
4335
4336 /* Lookup the key to sort. It must be of the right types */
4337 sortval = lookupKeyRead(c->db,c->argv[1]);
4338 if (sortval == NULL) {
4339 addReply(c,shared.nokeyerr);
4340 return;
4341 }
4342 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4343 addReply(c,shared.wrongtypeerr);
4344 return;
4345 }
4346
4347 /* Create a list of operations to perform for every sorted element.
4348 * Operations can be GET/DEL/INCR/DECR */
4349 operations = listCreate();
4350 listSetFreeMethod(operations,zfree);
4351 j = 2;
4352
4353 /* Now we need to protect sortval incrementing its count, in the future
4354 * SORT may have options able to overwrite/delete keys during the sorting
4355 * and the sorted key itself may get destroied */
4356 incrRefCount(sortval);
4357
4358 /* The SORT command has an SQL-alike syntax, parse it */
4359 while(j < c->argc) {
4360 int leftargs = c->argc-j-1;
4361 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4362 desc = 0;
4363 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4364 desc = 1;
4365 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4366 alpha = 1;
4367 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4368 limit_start = atoi(c->argv[j+1]->ptr);
4369 limit_count = atoi(c->argv[j+2]->ptr);
4370 j+=2;
4371 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4372 sortby = c->argv[j+1];
4373 /* If the BY pattern does not contain '*', i.e. it is constant,
4374 * we don't need to sort nor to lookup the weight keys. */
4375 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4376 j++;
4377 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4378 listAddNodeTail(operations,createSortOperation(
4379 REDIS_SORT_GET,c->argv[j+1]));
4380 getop++;
4381 j++;
4382 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
4383 listAddNodeTail(operations,createSortOperation(
4384 REDIS_SORT_DEL,c->argv[j+1]));
4385 j++;
4386 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
4387 listAddNodeTail(operations,createSortOperation(
4388 REDIS_SORT_INCR,c->argv[j+1]));
4389 j++;
4390 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4391 listAddNodeTail(operations,createSortOperation(
4392 REDIS_SORT_DECR,c->argv[j+1]));
4393 j++;
4394 } else {
4395 decrRefCount(sortval);
4396 listRelease(operations);
4397 addReply(c,shared.syntaxerr);
4398 return;
4399 }
4400 j++;
4401 }
4402
4403 /* Load the sorting vector with all the objects to sort */
4404 vectorlen = (sortval->type == REDIS_LIST) ?
4405 listLength((list*)sortval->ptr) :
4406 dictSize((dict*)sortval->ptr);
4407 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4408 j = 0;
4409 if (sortval->type == REDIS_LIST) {
4410 list *list = sortval->ptr;
4411 listNode *ln;
4412
4413 listRewind(list);
4414 while((ln = listYield(list))) {
4415 robj *ele = ln->value;
4416 vector[j].obj = ele;
4417 vector[j].u.score = 0;
4418 vector[j].u.cmpobj = NULL;
4419 j++;
4420 }
4421 } else {
4422 dict *set = sortval->ptr;
4423 dictIterator *di;
4424 dictEntry *setele;
4425
4426 di = dictGetIterator(set);
4427 while((setele = dictNext(di)) != NULL) {
4428 vector[j].obj = dictGetEntryKey(setele);
4429 vector[j].u.score = 0;
4430 vector[j].u.cmpobj = NULL;
4431 j++;
4432 }
4433 dictReleaseIterator(di);
4434 }
4435 assert(j == vectorlen);
4436
4437 /* Now it's time to load the right scores in the sorting vector */
4438 if (dontsort == 0) {
4439 for (j = 0; j < vectorlen; j++) {
4440 if (sortby) {
4441 robj *byval;
4442
4443 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4444 if (!byval || byval->type != REDIS_STRING) continue;
4445 if (alpha) {
4446 if (byval->encoding == REDIS_ENCODING_RAW) {
4447 vector[j].u.cmpobj = byval;
4448 incrRefCount(byval);
4449 } else {
4450 vector[j].u.cmpobj = getDecodedObject(byval);
4451 }
4452 } else {
4453 if (byval->encoding == REDIS_ENCODING_RAW) {
4454 vector[j].u.score = strtod(byval->ptr,NULL);
4455 } else {
4456 if (byval->encoding == REDIS_ENCODING_INT) {
4457 vector[j].u.score = (long)byval->ptr;
4458 } else
4459 assert(1 != 1);
4460 }
4461 }
4462 } else {
4463 if (!alpha) {
4464 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4465 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4466 else {
4467 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4468 vector[j].u.score = (long) vector[j].obj->ptr;
4469 else
4470 assert(1 != 1);
4471 }
4472 }
4473 }
4474 }
4475 }
4476
4477 /* We are ready to sort the vector... perform a bit of sanity check
4478 * on the LIMIT option too. We'll use a partial version of quicksort. */
4479 start = (limit_start < 0) ? 0 : limit_start;
4480 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4481 if (start >= vectorlen) {
4482 start = vectorlen-1;
4483 end = vectorlen-2;
4484 }
4485 if (end >= vectorlen) end = vectorlen-1;
4486
4487 if (dontsort == 0) {
4488 server.sort_desc = desc;
4489 server.sort_alpha = alpha;
4490 server.sort_bypattern = sortby ? 1 : 0;
4491 if (sortby && (start != 0 || end != vectorlen-1))
4492 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4493 else
4494 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4495 }
4496
4497 /* Send command output to the output buffer, performing the specified
4498 * GET/DEL/INCR/DECR operations if any. */
4499 outputlen = getop ? getop*(end-start+1) : end-start+1;
4500 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4501 for (j = start; j <= end; j++) {
4502 listNode *ln;
4503 if (!getop) {
4504 addReplyBulkLen(c,vector[j].obj);
4505 addReply(c,vector[j].obj);
4506 addReply(c,shared.crlf);
4507 }
4508 listRewind(operations);
4509 while((ln = listYield(operations))) {
4510 redisSortOperation *sop = ln->value;
4511 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4512 vector[j].obj);
4513
4514 if (sop->type == REDIS_SORT_GET) {
4515 if (!val || val->type != REDIS_STRING) {
4516 addReply(c,shared.nullbulk);
4517 } else {
4518 addReplyBulkLen(c,val);
4519 addReply(c,val);
4520 addReply(c,shared.crlf);
4521 }
4522 } else if (sop->type == REDIS_SORT_DEL) {
4523 /* TODO */
4524 }
4525 }
4526 }
4527
4528 /* Cleanup */
4529 decrRefCount(sortval);
4530 listRelease(operations);
4531 for (j = 0; j < vectorlen; j++) {
4532 if (sortby && alpha && vector[j].u.cmpobj)
4533 decrRefCount(vector[j].u.cmpobj);
4534 }
4535 zfree(vector);
4536 }
4537
4538 static void infoCommand(redisClient *c) {
4539 sds info;
4540 time_t uptime = time(NULL)-server.stat_starttime;
4541 int j;
4542
4543 info = sdscatprintf(sdsempty(),
4544 "redis_version:%s\r\n"
4545 "arch_bits:%s\r\n"
4546 "uptime_in_seconds:%d\r\n"
4547 "uptime_in_days:%d\r\n"
4548 "connected_clients:%d\r\n"
4549 "connected_slaves:%d\r\n"
4550 "used_memory:%zu\r\n"
4551 "changes_since_last_save:%lld\r\n"
4552 "bgsave_in_progress:%d\r\n"
4553 "last_save_time:%d\r\n"
4554 "total_connections_received:%lld\r\n"
4555 "total_commands_processed:%lld\r\n"
4556 "role:%s\r\n"
4557 ,REDIS_VERSION,
4558 (sizeof(long) == 8) ? "64" : "32",
4559 uptime,
4560 uptime/(3600*24),
4561 listLength(server.clients)-listLength(server.slaves),
4562 listLength(server.slaves),
4563 server.usedmemory,
4564 server.dirty,
4565 server.bgsaveinprogress,
4566 server.lastsave,
4567 server.stat_numconnections,
4568 server.stat_numcommands,
4569 server.masterhost == NULL ? "master" : "slave"
4570 );
4571 if (server.masterhost) {
4572 info = sdscatprintf(info,
4573 "master_host:%s\r\n"
4574 "master_port:%d\r\n"
4575 "master_link_status:%s\r\n"
4576 "master_last_io_seconds_ago:%d\r\n"
4577 ,server.masterhost,
4578 server.masterport,
4579 (server.replstate == REDIS_REPL_CONNECTED) ?
4580 "up" : "down",
4581 (int)(time(NULL)-server.master->lastinteraction)
4582 );
4583 }
4584 for (j = 0; j < server.dbnum; j++) {
4585 long long keys, vkeys;
4586
4587 keys = dictSize(server.db[j].dict);
4588 vkeys = dictSize(server.db[j].expires);
4589 if (keys || vkeys) {
4590 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4591 j, keys, vkeys);
4592 }
4593 }
4594 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4595 addReplySds(c,info);
4596 addReply(c,shared.crlf);
4597 }
4598
4599 static void monitorCommand(redisClient *c) {
4600 /* ignore MONITOR if aleady slave or in monitor mode */
4601 if (c->flags & REDIS_SLAVE) return;
4602
4603 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4604 c->slaveseldb = 0;
4605 listAddNodeTail(server.monitors,c);
4606 addReply(c,shared.ok);
4607 }
4608
4609 /* ================================= Expire ================================= */
4610 static int removeExpire(redisDb *db, robj *key) {
4611 if (dictDelete(db->expires,key) == DICT_OK) {
4612 return 1;
4613 } else {
4614 return 0;
4615 }
4616 }
4617
4618 static int setExpire(redisDb *db, robj *key, time_t when) {
4619 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4620 return 0;
4621 } else {
4622 incrRefCount(key);
4623 return 1;
4624 }
4625 }
4626
4627 /* Return the expire time of the specified key, or -1 if no expire
4628 * is associated with this key (i.e. the key is non volatile) */
4629 static time_t getExpire(redisDb *db, robj *key) {
4630 dictEntry *de;
4631
4632 /* No expire? return ASAP */
4633 if (dictSize(db->expires) == 0 ||
4634 (de = dictFind(db->expires,key)) == NULL) return -1;
4635
4636 return (time_t) dictGetEntryVal(de);
4637 }
4638
4639 static int expireIfNeeded(redisDb *db, robj *key) {
4640 time_t when;
4641 dictEntry *de;
4642
4643 /* No expire? return ASAP */
4644 if (dictSize(db->expires) == 0 ||
4645 (de = dictFind(db->expires,key)) == NULL) return 0;
4646
4647 /* Lookup the expire */
4648 when = (time_t) dictGetEntryVal(de);
4649 if (time(NULL) <= when) return 0;
4650
4651 /* Delete the key */
4652 dictDelete(db->expires,key);
4653 return dictDelete(db->dict,key) == DICT_OK;
4654 }
4655
4656 static int deleteIfVolatile(redisDb *db, robj *key) {
4657 dictEntry *de;
4658
4659 /* No expire? return ASAP */
4660 if (dictSize(db->expires) == 0 ||
4661 (de = dictFind(db->expires,key)) == NULL) return 0;
4662
4663 /* Delete the key */
4664 server.dirty++;
4665 dictDelete(db->expires,key);
4666 return dictDelete(db->dict,key) == DICT_OK;
4667 }
4668
4669 static void expireCommand(redisClient *c) {
4670 dictEntry *de;
4671 int seconds = atoi(c->argv[2]->ptr);
4672
4673 de = dictFind(c->db->dict,c->argv[1]);
4674 if (de == NULL) {
4675 addReply(c,shared.czero);
4676 return;
4677 }
4678 if (seconds <= 0) {
4679 addReply(c, shared.czero);
4680 return;
4681 } else {
4682 time_t when = time(NULL)+seconds;
4683 if (setExpire(c->db,c->argv[1],when)) {
4684 addReply(c,shared.cone);
4685 server.dirty++;
4686 } else {
4687 addReply(c,shared.czero);
4688 }
4689 return;
4690 }
4691 }
4692
4693 static void ttlCommand(redisClient *c) {
4694 time_t expire;
4695 int ttl = -1;
4696
4697 expire = getExpire(c->db,c->argv[1]);
4698 if (expire != -1) {
4699 ttl = (int) (expire-time(NULL));
4700 if (ttl < 0) ttl = -1;
4701 }
4702 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4703 }
4704
4705 static void msetGenericCommand(redisClient *c, int nx) {
4706 int j;
4707
4708 if ((c->argc % 2) == 0) {
4709 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4710 return;
4711 }
4712 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4713 * set nothing at all if at least one already key exists. */
4714 if (nx) {
4715 for (j = 1; j < c->argc; j += 2) {
4716 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4717 addReply(c, shared.czero);
4718 return;
4719 }
4720 }
4721 }
4722
4723 for (j = 1; j < c->argc; j += 2) {
4724 int retval;
4725
4726 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4727 if (retval == DICT_ERR) {
4728 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4729 incrRefCount(c->argv[j+1]);
4730 } else {
4731 incrRefCount(c->argv[j]);
4732 incrRefCount(c->argv[j+1]);
4733 }
4734 removeExpire(c->db,c->argv[j]);
4735 }
4736 server.dirty += (c->argc-1)/2;
4737 addReply(c, nx ? shared.cone : shared.ok);
4738 }
4739
4740 static void msetCommand(redisClient *c) {
4741 msetGenericCommand(c,0);
4742 }
4743
4744 static void msetnxCommand(redisClient *c) {
4745 msetGenericCommand(c,1);
4746 }
4747
4748 /* =============================== Replication ============================= */
4749
4750 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4751 ssize_t nwritten, ret = size;
4752 time_t start = time(NULL);
4753
4754 timeout++;
4755 while(size) {
4756 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4757 nwritten = write(fd,ptr,size);
4758 if (nwritten == -1) return -1;
4759 ptr += nwritten;
4760 size -= nwritten;
4761 }
4762 if ((time(NULL)-start) > timeout) {
4763 errno = ETIMEDOUT;
4764 return -1;
4765 }
4766 }
4767 return ret;
4768 }
4769
4770 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4771 ssize_t nread, totread = 0;
4772 time_t start = time(NULL);
4773
4774 timeout++;
4775 while(size) {
4776 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4777 nread = read(fd,ptr,size);
4778 if (nread == -1) return -1;
4779 ptr += nread;
4780 size -= nread;
4781 totread += nread;
4782 }
4783 if ((time(NULL)-start) > timeout) {
4784 errno = ETIMEDOUT;
4785 return -1;
4786 }
4787 }
4788 return totread;
4789 }
4790
4791 static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
4792 ssize_t nread = 0;
4793
4794 size--;
4795 while(size) {
4796 char c;
4797
4798 if (syncRead(fd,&c,1,timeout) == -1) return -1;
4799 if (c == '\n') {
4800 *ptr = '\0';
4801 if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
4802 return nread;
4803 } else {
4804 *ptr++ = c;
4805 *ptr = '\0';
4806 nread++;
4807 }
4808 }
4809 return nread;
4810 }
4811
4812 static void syncCommand(redisClient *c) {
4813 /* ignore SYNC if aleady slave or in monitor mode */
4814 if (c->flags & REDIS_SLAVE) return;
4815
4816 /* SYNC can't be issued when the server has pending data to send to
4817 * the client about already issued commands. We need a fresh reply
4818 * buffer registering the differences between the BGSAVE and the current
4819 * dataset, so that we can copy to other slaves if needed. */
4820 if (listLength(c->reply) != 0) {
4821 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4822 return;
4823 }
4824
4825 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4826 /* Here we need to check if there is a background saving operation
4827 * in progress, or if it is required to start one */
4828 if (server.bgsaveinprogress) {
4829 /* Ok a background save is in progress. Let's check if it is a good
4830 * one for replication, i.e. if there is another slave that is
4831 * registering differences since the server forked to save */
4832 redisClient *slave;
4833 listNode *ln;
4834
4835 listRewind(server.slaves);
4836 while((ln = listYield(server.slaves))) {
4837 slave = ln->value;
4838 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4839 }
4840 if (ln) {
4841 /* Perfect, the server is already registering differences for
4842 * another slave. Set the right state, and copy the buffer. */
4843 listRelease(c->reply);
4844 c->reply = listDup(slave->reply);
4845 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4846 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
4847 } else {
4848 /* No way, we need to wait for the next BGSAVE in order to
4849 * register differences */
4850 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
4851 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
4852 }
4853 } else {
4854 /* Ok we don't have a BGSAVE in progress, let's start one */
4855 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
4856 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4857 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
4858 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
4859 return;
4860 }
4861 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4862 }
4863 c->repldbfd = -1;
4864 c->flags |= REDIS_SLAVE;
4865 c->slaveseldb = 0;
4866 listAddNodeTail(server.slaves,c);
4867 return;
4868 }
4869
4870 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
4871 redisClient *slave = privdata;
4872 REDIS_NOTUSED(el);
4873 REDIS_NOTUSED(mask);
4874 char buf[REDIS_IOBUF_LEN];
4875 ssize_t nwritten, buflen;
4876
4877 if (slave->repldboff == 0) {
4878 /* Write the bulk write count before to transfer the DB. In theory here
4879 * we don't know how much room there is in the output buffer of the
4880 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4881 * operations) will never be smaller than the few bytes we need. */
4882 sds bulkcount;
4883
4884 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4885 slave->repldbsize);
4886 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
4887 {
4888 sdsfree(bulkcount);
4889 freeClient(slave);
4890 return;
4891 }
4892 sdsfree(bulkcount);
4893 }
4894 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
4895 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
4896 if (buflen <= 0) {
4897 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
4898 (buflen == 0) ? "premature EOF" : strerror(errno));
4899 freeClient(slave);
4900 return;
4901 }
4902 if ((nwritten = write(fd,buf,buflen)) == -1) {
4903 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
4904 strerror(errno));
4905 freeClient(slave);
4906 return;
4907 }
4908 slave->repldboff += nwritten;
4909 if (slave->repldboff == slave->repldbsize) {
4910 close(slave->repldbfd);
4911 slave->repldbfd = -1;
4912 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4913 slave->replstate = REDIS_REPL_ONLINE;
4914 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
4915 sendReplyToClient, slave, NULL) == AE_ERR) {
4916 freeClient(slave);
4917 return;
4918 }
4919 addReplySds(slave,sdsempty());
4920 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
4921 }
4922 }
4923
4924 /* This function is called at the end of every backgrond saving.
4925 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
4926 * otherwise REDIS_ERR is passed to the function.
4927 *
4928 * The goal of this function is to handle slaves waiting for a successful
4929 * background saving in order to perform non-blocking synchronization. */
4930 static void updateSlavesWaitingBgsave(int bgsaveerr) {
4931 listNode *ln;
4932 int startbgsave = 0;
4933
4934 listRewind(server.slaves);
4935 while((ln = listYield(server.slaves))) {
4936 redisClient *slave = ln->value;
4937
4938 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
4939 startbgsave = 1;
4940 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4941 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
4942 struct redis_stat buf;
4943
4944 if (bgsaveerr != REDIS_OK) {
4945 freeClient(slave);
4946 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
4947 continue;
4948 }
4949 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
4950 redis_fstat(slave->repldbfd,&buf) == -1) {
4951 freeClient(slave);
4952 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
4953 continue;
4954 }
4955 slave->repldboff = 0;
4956 slave->repldbsize = buf.st_size;
4957 slave->replstate = REDIS_REPL_SEND_BULK;
4958 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4959 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
4960 freeClient(slave);
4961 continue;
4962 }
4963 }
4964 }
4965 if (startbgsave) {
4966 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4967 listRewind(server.slaves);
4968 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
4969 while((ln = listYield(server.slaves))) {
4970 redisClient *slave = ln->value;
4971
4972 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
4973 freeClient(slave);
4974 }
4975 }
4976 }
4977 }
4978
4979 static int syncWithMaster(void) {
4980 char buf[1024], tmpfile[256];
4981 int dumpsize;
4982 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
4983 int dfd;
4984
4985 if (fd == -1) {
4986 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
4987 strerror(errno));
4988 return REDIS_ERR;
4989 }
4990 /* Issue the SYNC command */
4991 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
4992 close(fd);
4993 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
4994 strerror(errno));
4995 return REDIS_ERR;
4996 }
4997 /* Read the bulk write count */
4998 if (syncReadLine(fd,buf,1024,3600) == -1) {
4999 close(fd);
5000 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5001 strerror(errno));
5002 return REDIS_ERR;
5003 }
5004 dumpsize = atoi(buf+1);
5005 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5006 /* Read the bulk write data on a temp file */
5007 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5008 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5009 if (dfd == -1) {
5010 close(fd);
5011 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5012 return REDIS_ERR;
5013 }
5014 while(dumpsize) {
5015 int nread, nwritten;
5016
5017 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5018 if (nread == -1) {
5019 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5020 strerror(errno));
5021 close(fd);
5022 close(dfd);
5023 return REDIS_ERR;
5024 }
5025 nwritten = write(dfd,buf,nread);
5026 if (nwritten == -1) {
5027 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5028 close(fd);
5029 close(dfd);
5030 return REDIS_ERR;
5031 }
5032 dumpsize -= nread;
5033 }
5034 close(dfd);
5035 if (rename(tmpfile,server.dbfilename) == -1) {
5036 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5037 unlink(tmpfile);
5038 close(fd);
5039 return REDIS_ERR;
5040 }
5041 emptyDb();
5042 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5043 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5044 close(fd);
5045 return REDIS_ERR;
5046 }
5047 server.master = createClient(fd);
5048 server.master->flags |= REDIS_MASTER;
5049 server.replstate = REDIS_REPL_CONNECTED;
5050 return REDIS_OK;
5051 }
5052
5053 static void slaveofCommand(redisClient *c) {
5054 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5055 !strcasecmp(c->argv[2]->ptr,"one")) {
5056 if (server.masterhost) {
5057 sdsfree(server.masterhost);
5058 server.masterhost = NULL;
5059 if (server.master) freeClient(server.master);
5060 server.replstate = REDIS_REPL_NONE;
5061 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5062 }
5063 } else {
5064 sdsfree(server.masterhost);
5065 server.masterhost = sdsdup(c->argv[1]->ptr);
5066 server.masterport = atoi(c->argv[2]->ptr);
5067 if (server.master) freeClient(server.master);
5068 server.replstate = REDIS_REPL_CONNECT;
5069 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5070 server.masterhost, server.masterport);
5071 }
5072 addReply(c,shared.ok);
5073 }
5074
5075 /* ============================ Maxmemory directive ======================== */
5076
5077 /* This function gets called when 'maxmemory' is set on the config file to limit
5078 * the max memory used by the server, and we are out of memory.
5079 * This function will try to, in order:
5080 *
5081 * - Free objects from the free list
5082 * - Try to remove keys with an EXPIRE set
5083 *
5084 * It is not possible to free enough memory to reach used-memory < maxmemory
5085 * the server will start refusing commands that will enlarge even more the
5086 * memory usage.
5087 */
5088 static void freeMemoryIfNeeded(void) {
5089 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5090 if (listLength(server.objfreelist)) {
5091 robj *o;
5092
5093 listNode *head = listFirst(server.objfreelist);
5094 o = listNodeValue(head);
5095 listDelNode(server.objfreelist,head);
5096 zfree(o);
5097 } else {
5098 int j, k, freed = 0;
5099
5100 for (j = 0; j < server.dbnum; j++) {
5101 int minttl = -1;
5102 robj *minkey = NULL;
5103 struct dictEntry *de;
5104
5105 if (dictSize(server.db[j].expires)) {
5106 freed = 1;
5107 /* From a sample of three keys drop the one nearest to
5108 * the natural expire */
5109 for (k = 0; k < 3; k++) {
5110 time_t t;
5111
5112 de = dictGetRandomKey(server.db[j].expires);
5113 t = (time_t) dictGetEntryVal(de);
5114 if (minttl == -1 || t < minttl) {
5115 minkey = dictGetEntryKey(de);
5116 minttl = t;
5117 }
5118 }
5119 deleteKey(server.db+j,minkey);
5120 }
5121 }
5122 if (!freed) return; /* nothing to free... */
5123 }
5124 }
5125 }
5126
5127 /* ================================= Debugging ============================== */
5128
5129 static void debugCommand(redisClient *c) {
5130 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
5131 *((char*)-1) = 'x';
5132 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
5133 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
5134 robj *key, *val;
5135
5136 if (!de) {
5137 addReply(c,shared.nokeyerr);
5138 return;
5139 }
5140 key = dictGetEntryKey(de);
5141 val = dictGetEntryVal(de);
5142 addReplySds(c,sdscatprintf(sdsempty(),
5143 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
5144 key, key->refcount, val, val->refcount, val->encoding));
5145 } else {
5146 addReplySds(c,sdsnew(
5147 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
5148 }
5149 }
5150
5151 #ifdef HAVE_BACKTRACE
5152 static struct redisFunctionSym symsTable[] = {
5153 {"compareStringObjects", (unsigned long)compareStringObjects},
5154 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
5155 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
5156 {"dictEncObjHash", (unsigned long)dictEncObjHash},
5157 {"incrDecrCommand", (unsigned long)incrDecrCommand},
5158 {"freeStringObject", (unsigned long)freeStringObject},
5159 {"freeListObject", (unsigned long)freeListObject},
5160 {"freeSetObject", (unsigned long)freeSetObject},
5161 {"decrRefCount", (unsigned long)decrRefCount},
5162 {"createObject", (unsigned long)createObject},
5163 {"freeClient", (unsigned long)freeClient},
5164 {"rdbLoad", (unsigned long)rdbLoad},
5165 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
5166 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
5167 {"addReply", (unsigned long)addReply},
5168 {"addReplySds", (unsigned long)addReplySds},
5169 {"incrRefCount", (unsigned long)incrRefCount},
5170 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
5171 {"createStringObject", (unsigned long)createStringObject},
5172 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
5173 {"syncWithMaster", (unsigned long)syncWithMaster},
5174 {"tryObjectSharing", (unsigned long)tryObjectSharing},
5175 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
5176 {"getDecodedObject", (unsigned long)getDecodedObject},
5177 {"removeExpire", (unsigned long)removeExpire},
5178 {"expireIfNeeded", (unsigned long)expireIfNeeded},
5179 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
5180 {"deleteKey", (unsigned long)deleteKey},
5181 {"getExpire", (unsigned long)getExpire},
5182 {"setExpire", (unsigned long)setExpire},
5183 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
5184 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
5185 {"authCommand", (unsigned long)authCommand},
5186 {"pingCommand", (unsigned long)pingCommand},
5187 {"echoCommand", (unsigned long)echoCommand},
5188 {"setCommand", (unsigned long)setCommand},
5189 {"setnxCommand", (unsigned long)setnxCommand},
5190 {"getCommand", (unsigned long)getCommand},
5191 {"delCommand", (unsigned long)delCommand},
5192 {"existsCommand", (unsigned long)existsCommand},
5193 {"incrCommand", (unsigned long)incrCommand},
5194 {"decrCommand", (unsigned long)decrCommand},
5195 {"incrbyCommand", (unsigned long)incrbyCommand},
5196 {"decrbyCommand", (unsigned long)decrbyCommand},
5197 {"selectCommand", (unsigned long)selectCommand},
5198 {"randomkeyCommand", (unsigned long)randomkeyCommand},
5199 {"keysCommand", (unsigned long)keysCommand},
5200 {"dbsizeCommand", (unsigned long)dbsizeCommand},
5201 {"lastsaveCommand", (unsigned long)lastsaveCommand},
5202 {"saveCommand", (unsigned long)saveCommand},
5203 {"bgsaveCommand", (unsigned long)bgsaveCommand},
5204 {"shutdownCommand", (unsigned long)shutdownCommand},
5205 {"moveCommand", (unsigned long)moveCommand},
5206 {"renameCommand", (unsigned long)renameCommand},
5207 {"renamenxCommand", (unsigned long)renamenxCommand},
5208 {"lpushCommand", (unsigned long)lpushCommand},
5209 {"rpushCommand", (unsigned long)rpushCommand},
5210 {"lpopCommand", (unsigned long)lpopCommand},
5211 {"rpopCommand", (unsigned long)rpopCommand},
5212 {"llenCommand", (unsigned long)llenCommand},
5213 {"lindexCommand", (unsigned long)lindexCommand},
5214 {"lrangeCommand", (unsigned long)lrangeCommand},
5215 {"ltrimCommand", (unsigned long)ltrimCommand},
5216 {"typeCommand", (unsigned long)typeCommand},
5217 {"lsetCommand", (unsigned long)lsetCommand},
5218 {"saddCommand", (unsigned long)saddCommand},
5219 {"sremCommand", (unsigned long)sremCommand},
5220 {"smoveCommand", (unsigned long)smoveCommand},
5221 {"sismemberCommand", (unsigned long)sismemberCommand},
5222 {"scardCommand", (unsigned long)scardCommand},
5223 {"spopCommand", (unsigned long)spopCommand},
5224 {"srandmemberCommand", (unsigned long)srandmemberCommand},
5225 {"sinterCommand", (unsigned long)sinterCommand},
5226 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
5227 {"sunionCommand", (unsigned long)sunionCommand},
5228 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
5229 {"sdiffCommand", (unsigned long)sdiffCommand},
5230 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
5231 {"syncCommand", (unsigned long)syncCommand},
5232 {"flushdbCommand", (unsigned long)flushdbCommand},
5233 {"flushallCommand", (unsigned long)flushallCommand},
5234 {"sortCommand", (unsigned long)sortCommand},
5235 {"lremCommand", (unsigned long)lremCommand},
5236 {"infoCommand", (unsigned long)infoCommand},
5237 {"mgetCommand", (unsigned long)mgetCommand},
5238 {"monitorCommand", (unsigned long)monitorCommand},
5239 {"expireCommand", (unsigned long)expireCommand},
5240 {"getsetCommand", (unsigned long)getsetCommand},
5241 {"ttlCommand", (unsigned long)ttlCommand},
5242 {"slaveofCommand", (unsigned long)slaveofCommand},
5243 {"debugCommand", (unsigned long)debugCommand},
5244 {"processCommand", (unsigned long)processCommand},
5245 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
5246 {"readQueryFromClient", (unsigned long)readQueryFromClient},
5247 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
5248 {"msetGenericCommand", (unsigned long)msetGenericCommand},
5249 {"msetCommand", (unsigned long)msetCommand},
5250 {"msetnxCommand", (unsigned long)msetnxCommand},
5251 {"zslCreateNode", (unsigned long)zslCreateNode},
5252 {"zslCreate", (unsigned long)zslCreate},
5253 {"zslFreeNode",(unsigned long)zslFreeNode},
5254 {"zslFree",(unsigned long)zslFree},
5255 {"zslRandomLevel",(unsigned long)zslRandomLevel},
5256 {"zslInsert",(unsigned long)zslInsert},
5257 {"zslDelete",(unsigned long)zslDelete},
5258 {"createZsetObject",(unsigned long)createZsetObject},
5259 {"zaddCommand",(unsigned long)zaddCommand},
5260 {"zrangeGenericCommand",(unsigned long)zrangeGenericCommand},
5261 {"zrangeCommand",(unsigned long)zrangeCommand},
5262 {"zrevrangeCommand",(unsigned long)zrevrangeCommand},
5263 {"zremCommand",(unsigned long)zremCommand},
5264 {"rdbSaveDoubleValue",(unsigned long)rdbSaveDoubleValue},
5265 {"rdbLoadDoubleValue",(unsigned long)rdbLoadDoubleValue},
5266 {NULL,0}
5267 };
5268
5269 /* This function try to convert a pointer into a function name. It's used in
5270 * oreder to provide a backtrace under segmentation fault that's able to
5271 * display functions declared as static (otherwise the backtrace is useless). */
5272 static char *findFuncName(void *pointer, unsigned long *offset){
5273 int i, ret = -1;
5274 unsigned long off, minoff = 0;
5275
5276 /* Try to match against the Symbol with the smallest offset */
5277 for (i=0; symsTable[i].pointer; i++) {
5278 unsigned long lp = (unsigned long) pointer;
5279
5280 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
5281 off=lp-symsTable[i].pointer;
5282 if (ret < 0 || off < minoff) {
5283 minoff=off;
5284 ret=i;
5285 }
5286 }
5287 }
5288 if (ret == -1) return NULL;
5289 *offset = minoff;
5290 return symsTable[ret].name;
5291 }
5292
/* Return the program counter saved in the signal context 'uc', that is the
 * address of the instruction executing when the signal was delivered.
 * The ucontext_t layout is platform specific, so every supported OS/CPU pair
 * has its own branch. NULL is returned on unsupported platforms, and the
 * caller then keeps the plain backtrace.
 *
 * Linux x86-64 used to fall through to NULL: it tested the misspelled
 * __X86_64__ (compilers predefine __x86_64__), and REG_EIP is not an x86-64
 * register index anyway. */
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__)
    return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    return (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
  #if defined(__x86_64__)
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
  #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__i386__)
    /* Linux x86: index 14 is REG_EIP. The numeric index is used because
     * glibc only exports the REG_* names when _GNU_SOURCE is defined. */
    return (void*) uc->uc_mcontext.gregs[14];
#elif defined(__X86_64__) || defined(__x86_64__)
    /* Linux x86-64: index 16 is REG_RIP (see the note above about REG_*). */
    return (void*) uc->uc_mcontext.gregs[16];
#elif defined(__ia64__) /* Linux IA64 */
    return (void*) uc->uc_mcontext.sc_ip;
#else
    return NULL;
#endif
}
5314
5315 static void segvHandler(int sig, siginfo_t *info, void *secret) {
5316 void *trace[100];
5317 char **messages = NULL;
5318 int i, trace_size = 0;
5319 unsigned long offset=0;
5320 time_t uptime = time(NULL)-server.stat_starttime;
5321 ucontext_t *uc = (ucontext_t*) secret;
5322 REDIS_NOTUSED(info);
5323
5324 redisLog(REDIS_WARNING,
5325 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5326 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5327 "redis_version:%s; "
5328 "uptime_in_seconds:%d; "
5329 "connected_clients:%d; "
5330 "connected_slaves:%d; "
5331 "used_memory:%zu; "
5332 "changes_since_last_save:%lld; "
5333 "bgsave_in_progress:%d; "
5334 "last_save_time:%d; "
5335 "total_connections_received:%lld; "
5336 "total_commands_processed:%lld; "
5337 "role:%s;"
5338 ,REDIS_VERSION,
5339 uptime,
5340 listLength(server.clients)-listLength(server.slaves),
5341 listLength(server.slaves),
5342 server.usedmemory,
5343 server.dirty,
5344 server.bgsaveinprogress,
5345 server.lastsave,
5346 server.stat_numconnections,
5347 server.stat_numcommands,
5348 server.masterhost == NULL ? "master" : "slave"
5349 ));
5350
5351 trace_size = backtrace(trace, 100);
5352 /* overwrite sigaction with caller's address */
5353 if (getMcontextEip(uc) != NULL) {
5354 trace[1] = getMcontextEip(uc);
5355 }
5356 messages = backtrace_symbols(trace, trace_size);
5357
5358 for (i=1; i<trace_size; ++i) {
5359 char *fn = findFuncName(trace[i], &offset), *p;
5360
5361 p = strchr(messages[i],'+');
5362 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5363 redisLog(REDIS_WARNING,"%s", messages[i]);
5364 } else {
5365 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5366 }
5367 }
5368 free(messages);
5369 exit(0);
5370 }
5371
5372 static void setupSigSegvAction(void) {
5373 struct sigaction act;
5374
5375 sigemptyset (&act.sa_mask);
5376 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5377 * is used. Otherwise, sa_handler is used */
5378 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5379 act.sa_sigaction = segvHandler;
5380 sigaction (SIGSEGV, &act, NULL);
5381 sigaction (SIGBUS, &act, NULL);
5382 sigaction (SIGFPE, &act, NULL);
5383 sigaction (SIGILL, &act, NULL);
5384 sigaction (SIGBUS, &act, NULL);
5385 return;
5386 }
5387 #else /* HAVE_BACKTRACE */
static void setupSigSegvAction(void) {
    /* Built without backtrace() support: no crash handler is installed and
     * the default signal dispositions stay in place. */
    return;
}
5390 #endif /* HAVE_BACKTRACE */
5391
5392 /* =================================== Main! ================================ */
5393
5394 #ifdef __linux__
5395 int linuxOvercommitMemoryValue(void) {
5396 FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
5397 char buf[64];
5398
5399 if (!fp) return -1;
5400 if (fgets(buf,64,fp) == NULL) {
5401 fclose(fp);
5402 return -1;
5403 }
5404 fclose(fp);
5405
5406 return atoi(buf);
5407 }
5408
5409 void linuxOvercommitMemoryWarning(void) {
5410 if (linuxOvercommitMemoryValue() == 0) {
5411 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5412 }
5413 }
5414 #endif /* __linux__ */
5415
5416 static void daemonize(void) {
5417 int fd;
5418 FILE *fp;
5419
5420 if (fork() != 0) exit(0); /* parent exits */
5421 setsid(); /* create a new session */
5422
5423 /* Every output goes to /dev/null. If Redis is daemonized but
5424 * the 'logfile' is set to 'stdout' in the configuration file
5425 * it will not log at all. */
5426 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5427 dup2(fd, STDIN_FILENO);
5428 dup2(fd, STDOUT_FILENO);
5429 dup2(fd, STDERR_FILENO);
5430 if (fd > STDERR_FILENO) close(fd);
5431 }
5432 /* Try to write the pid file */
5433 fp = fopen(server.pidfile,"w");
5434 if (fp) {
5435 fprintf(fp,"%d\n",getpid());
5436 fclose(fp);
5437 }
5438 }
5439
5440 int main(int argc, char **argv) {
5441 initServerConfig();
5442 if (argc == 2) {
5443 ResetServerSaveParams();
5444 loadServerConfig(argv[1]);
5445 } else if (argc > 2) {
5446 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5447 exit(1);
5448 } else {
5449 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5450 }
5451 initServer();
5452 if (server.daemonize) daemonize();
5453 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5454 #ifdef __linux__
5455 linuxOvercommitMemoryWarning();
5456 #endif
5457 if (rdbLoad(server.dbfilename) == REDIS_OK)
5458 redisLog(REDIS_NOTICE,"DB loaded from disk");
5459 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5460 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5461 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5462 aeMain(server.el);
5463 aeDeleteEventLoop(server.el);
5464 return 0;
5465 }