]> git.saurik.com Git - redis.git/blob - redis.c
86a06af16a728ce3065a5475da6f4e2bd8f63260
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string of this Redis release. */
30 #define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60 #include <math.h>
61
62 #if defined(__sun)
63 #include "solarisfixes.h"
64 #endif
65
66 #include "redis.h"
67 #include "ae.h" /* Event driven programming library */
68 #include "sds.h" /* Dynamic safe strings */
69 #include "anet.h" /* Networking the easy way */
70 #include "dict.h" /* Hash tables */
71 #include "adlist.h" /* Linked lists */
72 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
73 #include "lzf.h" /* LZF compression library */
74 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
75
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR (-1)

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */

/* Command flags */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short, these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4

/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys requires a lot of space, so we check the two most significant bits
 * of the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte: most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */

/* Client flags */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_DEL 1
#define REDIS_SORT_INCR 2
#define REDIS_SORT_DECR 3
#define REDIS_SORT_ASC 4
#define REDIS_SORT_DESC 5
#define REDIS_SORTKEY_MAX 1024

/* Log levels */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro. The argument is parenthesized so that expressions
 * such as REDIS_NOTUSED(a+b) cast the whole expression, not just 'a'. */
#define REDIS_NOTUSED(V) ((void) (V))

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
192
193 /*================================= Data types ============================== */
194
195 /* A redis object, that is a type able to hold a string / list / set */
196 typedef struct redisObject {
197 void *ptr;
198 unsigned char type;
199 unsigned char encoding;
200 unsigned char notused[2];
201 int refcount;
202 } robj;
203
204 typedef struct redisDb {
205 dict *dict;
206 dict *expires;
207 int id;
208 } redisDb;
209
210 /* With multiplexing we need to take per-clinet state.
211 * Clients are taken in a liked list. */
212 typedef struct redisClient {
213 int fd;
214 redisDb *db;
215 int dictid;
216 sds querybuf;
217 robj **argv, **mbargv;
218 int argc, mbargc;
219 int bulklen; /* bulk read len. -1 if not in bulk read mode */
220 int multibulk; /* multi bulk command format active */
221 list *reply;
222 int sentlen;
223 time_t lastinteraction; /* time of the last interaction, used for timeout */
224 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
225 int slaveseldb; /* slave selected db, if this client is a slave */
226 int authenticated; /* when requirepass is non-NULL */
227 int replstate; /* replication state if this is a slave */
228 int repldbfd; /* replication DB file descriptor */
229 long repldboff; /* replication DB file offset */
230 off_t repldbsize; /* replication DB file size */
231 } redisClient;
232
233 struct saveparam {
234 time_t seconds;
235 int changes;
236 };
237
238 /* Global server state structure */
239 struct redisServer {
240 int port;
241 int fd;
242 redisDb *db;
243 dict *sharingpool;
244 unsigned int sharingpoolsize;
245 long long dirty; /* changes to DB from the last save */
246 list *clients;
247 list *slaves, *monitors;
248 char neterr[ANET_ERR_LEN];
249 aeEventLoop *el;
250 int cronloops; /* number of times the cron function run */
251 list *objfreelist; /* A list of freed objects to avoid malloc() */
252 time_t lastsave; /* Unix time of last save succeeede */
253 size_t usedmemory; /* Used memory in megabytes */
254 /* Fields used only for stats */
255 time_t stat_starttime; /* server start time */
256 long long stat_numcommands; /* number of processed commands */
257 long long stat_numconnections; /* number of connections received */
258 /* Configuration */
259 int verbosity;
260 int glueoutputbuf;
261 int maxidletime;
262 int dbnum;
263 int daemonize;
264 char *pidfile;
265 int bgsaveinprogress;
266 pid_t bgsavechildpid;
267 struct saveparam *saveparams;
268 int saveparamslen;
269 char *logfile;
270 char *bindaddr;
271 char *dbfilename;
272 char *requirepass;
273 int shareobjects;
274 /* Replication related */
275 int isslave;
276 char *masterhost;
277 int masterport;
278 redisClient *master; /* client that is master for this slave */
279 int replstate;
280 unsigned int maxclients;
281 unsigned long maxmemory;
282 /* Sort parameters - qsort_r() is only available under BSD so we
283 * have to take this state global, in order to pass it to sortCompare() */
284 int sort_desc;
285 int sort_alpha;
286 int sort_bypattern;
287 };
288
289 typedef void redisCommandProc(redisClient *c);
290 struct redisCommand {
291 char *name;
292 redisCommandProc *proc;
293 int arity;
294 int flags;
295 };
296
297 struct redisFunctionSym {
298 char *name;
299 unsigned long pointer;
300 };
301
302 typedef struct _redisSortObject {
303 robj *obj;
304 union {
305 double score;
306 robj *cmpobj;
307 } u;
308 } redisSortObject;
309
310 typedef struct _redisSortOperation {
311 int type;
312 robj *pattern;
313 } redisSortOperation;
314
315 /* ZSETs use a specialized version of Skiplists */
316
317 typedef struct zskiplistNode {
318 struct zskiplistNode **forward;
319 struct zskiplistNode *backward;
320 double score;
321 robj *obj;
322 } zskiplistNode;
323
324 typedef struct zskiplist {
325 struct zskiplistNode *header, *tail;
326 unsigned long length;
327 int level;
328 } zskiplist;
329
330 typedef struct zset {
331 dict *dict;
332 zskiplist *zsl;
333 } zset;
334
335 /* Our shared "common" objects */
336
337 struct sharedObjectsStruct {
338 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
339 *colon, *nullbulk, *nullmultibulk,
340 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
341 *outofrangeerr, *plus,
342 *select0, *select1, *select2, *select3, *select4,
343 *select5, *select6, *select7, *select8, *select9;
344 } shared;
345
346 /* Global vars that are actally used as constants. The following double
347 * values are used for double on-disk serialization, and are initialized
348 * at runtime to avoid strange compiler optimizations. */
349
350 static double R_Zero, R_PosInf, R_NegInf, R_Nan;
351
352 /*================================ Prototypes =============================== */
353
354 static void freeStringObject(robj *o);
355 static void freeListObject(robj *o);
356 static void freeSetObject(robj *o);
357 static void decrRefCount(void *o);
358 static robj *createObject(int type, void *ptr);
359 static void freeClient(redisClient *c);
360 static int rdbLoad(char *filename);
361 static void addReply(redisClient *c, robj *obj);
362 static void addReplySds(redisClient *c, sds s);
363 static void incrRefCount(robj *o);
364 static int rdbSaveBackground(char *filename);
365 static robj *createStringObject(char *ptr, size_t len);
366 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
367 static int syncWithMaster(void);
368 static robj *tryObjectSharing(robj *o);
369 static int tryObjectEncoding(robj *o);
370 static robj *getDecodedObject(const robj *o);
371 static int removeExpire(redisDb *db, robj *key);
372 static int expireIfNeeded(redisDb *db, robj *key);
373 static int deleteIfVolatile(redisDb *db, robj *key);
374 static int deleteKey(redisDb *db, robj *key);
375 static time_t getExpire(redisDb *db, robj *key);
376 static int setExpire(redisDb *db, robj *key, time_t when);
377 static void updateSlavesWaitingBgsave(int bgsaveerr);
378 static void freeMemoryIfNeeded(void);
379 static int processCommand(redisClient *c);
380 static void setupSigSegvAction(void);
381 static void rdbRemoveTempFile(pid_t childpid);
382 static size_t stringObjectLen(robj *o);
383 static void processInputBuffer(redisClient *c);
384 static zskiplist *zslCreate(void);
385 static void zslFree(zskiplist *zsl);
386 static void zslInsert(zskiplist *zsl, double score, robj *obj);
387
388 static void authCommand(redisClient *c);
389 static void pingCommand(redisClient *c);
390 static void echoCommand(redisClient *c);
391 static void setCommand(redisClient *c);
392 static void setnxCommand(redisClient *c);
393 static void getCommand(redisClient *c);
394 static void delCommand(redisClient *c);
395 static void existsCommand(redisClient *c);
396 static void incrCommand(redisClient *c);
397 static void decrCommand(redisClient *c);
398 static void incrbyCommand(redisClient *c);
399 static void decrbyCommand(redisClient *c);
400 static void selectCommand(redisClient *c);
401 static void randomkeyCommand(redisClient *c);
402 static void keysCommand(redisClient *c);
403 static void dbsizeCommand(redisClient *c);
404 static void lastsaveCommand(redisClient *c);
405 static void saveCommand(redisClient *c);
406 static void bgsaveCommand(redisClient *c);
407 static void shutdownCommand(redisClient *c);
408 static void moveCommand(redisClient *c);
409 static void renameCommand(redisClient *c);
410 static void renamenxCommand(redisClient *c);
411 static void lpushCommand(redisClient *c);
412 static void rpushCommand(redisClient *c);
413 static void lpopCommand(redisClient *c);
414 static void rpopCommand(redisClient *c);
415 static void llenCommand(redisClient *c);
416 static void lindexCommand(redisClient *c);
417 static void lrangeCommand(redisClient *c);
418 static void ltrimCommand(redisClient *c);
419 static void typeCommand(redisClient *c);
420 static void lsetCommand(redisClient *c);
421 static void saddCommand(redisClient *c);
422 static void sremCommand(redisClient *c);
423 static void smoveCommand(redisClient *c);
424 static void sismemberCommand(redisClient *c);
425 static void scardCommand(redisClient *c);
426 static void spopCommand(redisClient *c);
427 static void srandmemberCommand(redisClient *c);
428 static void sinterCommand(redisClient *c);
429 static void sinterstoreCommand(redisClient *c);
430 static void sunionCommand(redisClient *c);
431 static void sunionstoreCommand(redisClient *c);
432 static void sdiffCommand(redisClient *c);
433 static void sdiffstoreCommand(redisClient *c);
434 static void syncCommand(redisClient *c);
435 static void flushdbCommand(redisClient *c);
436 static void flushallCommand(redisClient *c);
437 static void sortCommand(redisClient *c);
438 static void lremCommand(redisClient *c);
439 static void infoCommand(redisClient *c);
440 static void mgetCommand(redisClient *c);
441 static void monitorCommand(redisClient *c);
442 static void expireCommand(redisClient *c);
443 static void expireatCommand(redisClient *c);
444 static void getsetCommand(redisClient *c);
445 static void ttlCommand(redisClient *c);
446 static void slaveofCommand(redisClient *c);
447 static void debugCommand(redisClient *c);
448 static void msetCommand(redisClient *c);
449 static void msetnxCommand(redisClient *c);
450 static void zaddCommand(redisClient *c);
451 static void zrangeCommand(redisClient *c);
452 static void zrangebyscoreCommand(redisClient *c);
453 static void zrevrangeCommand(redisClient *c);
454 static void zcardCommand(redisClient *c);
455 static void zremCommand(redisClient *c);
456 static void zscoreCommand(redisClient *c);
457 static void zremrangebyscoreCommand(redisClient *c);
458
459 /*================================= Globals ================================= */
460
461 /* Global vars */
462 static struct redisServer server; /* server global state */
463 static struct redisCommand cmdTable[] = {
464 {"get",getCommand,2,REDIS_CMD_INLINE},
465 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
466 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
467 {"del",delCommand,-2,REDIS_CMD_INLINE},
468 {"exists",existsCommand,2,REDIS_CMD_INLINE},
469 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
470 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
471 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
472 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
473 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
474 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
475 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
476 {"llen",llenCommand,2,REDIS_CMD_INLINE},
477 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
478 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
479 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
480 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
481 {"lrem",lremCommand,4,REDIS_CMD_BULK},
482 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
483 {"srem",sremCommand,3,REDIS_CMD_BULK},
484 {"smove",smoveCommand,4,REDIS_CMD_BULK},
485 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
486 {"scard",scardCommand,2,REDIS_CMD_INLINE},
487 {"spop",spopCommand,2,REDIS_CMD_INLINE},
488 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
489 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
490 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
491 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
492 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
493 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
494 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
495 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
496 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
497 {"zrem",zremCommand,3,REDIS_CMD_BULK},
498 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
499 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
500 {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
501 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
502 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
503 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
504 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
505 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
506 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
507 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
508 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
509 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
510 {"select",selectCommand,2,REDIS_CMD_INLINE},
511 {"move",moveCommand,3,REDIS_CMD_INLINE},
512 {"rename",renameCommand,3,REDIS_CMD_INLINE},
513 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
514 {"expire",expireCommand,3,REDIS_CMD_INLINE},
515 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
516 {"keys",keysCommand,2,REDIS_CMD_INLINE},
517 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
518 {"auth",authCommand,2,REDIS_CMD_INLINE},
519 {"ping",pingCommand,1,REDIS_CMD_INLINE},
520 {"echo",echoCommand,2,REDIS_CMD_BULK},
521 {"save",saveCommand,1,REDIS_CMD_INLINE},
522 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
523 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
524 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
525 {"type",typeCommand,2,REDIS_CMD_INLINE},
526 {"sync",syncCommand,1,REDIS_CMD_INLINE},
527 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
528 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
529 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
530 {"info",infoCommand,1,REDIS_CMD_INLINE},
531 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
532 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
533 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
534 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
535 {NULL,NULL,0,0}
536 };
537 /*============================ Utility functions ============================ */
538
/* Glob-style pattern matching.
 *
 * Returns 1 if the first stringLen bytes of 'string' match the first
 * patternLen bytes of 'pattern', 0 otherwise. Neither buffer has to be
 * NUL-terminated: pattern and string accesses are bounded by the explicit
 * lengths, so slices of larger buffers can be matched safely.
 *
 * Supported syntax:
 *   *        any sequence of characters, including the empty one
 *   ?        exactly one character
 *   [abc]    one character of the set; [^abc] negates it; [a-z] is an
 *            inclusive range; \x inside a set matches x literally
 *   \x       matches x literally
 *
 * When 'nocase' is non-zero, characters are compared case-insensitively.
 * Bytes are handled as unsigned char, so ranges behave identically on
 * platforms where plain char is signed or unsigned, and tolower() is never
 * handed a negative value. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* A run of '*' is equivalent to a single one. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A character class always consumes one character. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back one char so that the
                     * pattern++ after the switch lands exactly at the end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            /* A literal needs a character to match against. */
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* String exhausted: only trailing '*' may still match it. The
             * patternLen check keeps us inside the pattern slice. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    return patternLen == 0 && stringLen == 0;
}
661
662 static void redisLog(int level, const char *fmt, ...) {
663 va_list ap;
664 FILE *fp;
665
666 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
667 if (!fp) return;
668
669 va_start(ap, fmt);
670 if (level >= server.verbosity) {
671 char *c = ".-*";
672 char buf[64];
673 time_t now;
674
675 now = time(NULL);
676 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
677 fprintf(fp,"%s %c ",buf,c[level]);
678 vfprintf(fp, fmt, ap);
679 fprintf(fp,"\n");
680 fflush(fp);
681 }
682 va_end(ap);
683
684 if (server.logfile) fclose(fp);
685 }
686
687 /*====================== Hash table type implementation ==================== */
688
689 /* This is a hash table type that uses the SDS dynamic strings library as
690 * keys and redis objects as values (objects can hold SDS strings,
691 * lists, sets). */
692
/* Dict value destructor for values that are plain zmalloc()ed buffers:
 * release the buffer. The dict's private data is not used. */
static void dictVanillaFree(void *privdata, void *val)
{
    zfree(val);
    DICT_NOTUSED(privdata);
}
698
699 static int sdsDictKeyCompare(void *privdata, const void *key1,
700 const void *key2)
701 {
702 int l1,l2;
703 DICT_NOTUSED(privdata);
704
705 l1 = sdslen((sds)key1);
706 l2 = sdslen((sds)key2);
707 if (l1 != l2) return 0;
708 return memcmp(key1, key2, l1) == 0;
709 }
710
/* Dict key/value destructor for redis objects: drops one reference via
 * decrRefCount(). The dict's private data is not used. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    decrRefCount(val);
    DICT_NOTUSED(privdata);
}
717
718 static int dictObjKeyCompare(void *privdata, const void *key1,
719 const void *key2)
720 {
721 const robj *o1 = key1, *o2 = key2;
722 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
723 }
724
725 static unsigned int dictObjHash(const void *key) {
726 const robj *o = key;
727 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
728 }
729
730 static int dictEncObjKeyCompare(void *privdata, const void *key1,
731 const void *key2)
732 {
733 const robj *o1 = key1, *o2 = key2;
734
735 if (o1->encoding == REDIS_ENCODING_RAW &&
736 o2->encoding == REDIS_ENCODING_RAW)
737 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
738 else {
739 robj *dec1, *dec2;
740 int cmp;
741
742 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
743 getDecodedObject(o1) : (robj*)o1;
744 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
745 getDecodedObject(o2) : (robj*)o2;
746 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
747 if (dec1 != o1) decrRefCount(dec1);
748 if (dec2 != o2) decrRefCount(dec2);
749 return cmp;
750 }
751 }
752
753 static unsigned int dictEncObjHash(const void *key) {
754 const robj *o = key;
755
756 if (o->encoding == REDIS_ENCODING_RAW)
757 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
758 else {
759 robj *dec = getDecodedObject(o);
760 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
761 decrRefCount(dec);
762 return hash;
763 }
764 }
765
766 static dictType setDictType = {
767 dictEncObjHash, /* hash function */
768 NULL, /* key dup */
769 NULL, /* val dup */
770 dictEncObjKeyCompare, /* key compare */
771 dictRedisObjectDestructor, /* key destructor */
772 NULL /* val destructor */
773 };
774
775 static dictType zsetDictType = {
776 dictEncObjHash, /* hash function */
777 NULL, /* key dup */
778 NULL, /* val dup */
779 dictEncObjKeyCompare, /* key compare */
780 dictRedisObjectDestructor, /* key destructor */
781 dictVanillaFree /* val destructor */
782 };
783
784 static dictType hashDictType = {
785 dictObjHash, /* hash function */
786 NULL, /* key dup */
787 NULL, /* val dup */
788 dictObjKeyCompare, /* key compare */
789 dictRedisObjectDestructor, /* key destructor */
790 dictRedisObjectDestructor /* val destructor */
791 };
792
793 /* ========================= Random utility functions ======================= */
794
/* Out-of-memory handler. Redis does not try to recover from allocation
 * failures of objects or strings: it is unclear whether the condition
 * could even be reported to the client, since the networking layer itself
 * allocates its send buffers on the heap. So we print a message naming
 * the failing site, wait one second and abort(). This keeps the rest of
 * the code simpler to read. */
static void oom(const char *msg) {
    fprintf(stderr, "%s: Out of memory\n",msg);
    fflush(stderr);     /* make sure the message is out before dying */
    sleep(1);
    abort();            /* never returns */
}
806
807 /* ====================== Redis server networking stuff ===================== */
808 static void closeTimedoutClients(void) {
809 redisClient *c;
810 listNode *ln;
811 time_t now = time(NULL);
812
813 listRewind(server.clients);
814 while ((ln = listYield(server.clients)) != NULL) {
815 c = listNodeValue(ln);
816 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
817 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
818 (now - c->lastinteraction > server.maxidletime)) {
819 redisLog(REDIS_DEBUG,"Closing idle client");
820 freeClient(c);
821 }
822 }
823 }
824
825 static int htNeedsResize(dict *dict) {
826 long long size, used;
827
828 size = dictSlots(dict);
829 used = dictSize(dict);
830 return (size && used && size > DICT_HT_INITIAL_SIZE &&
831 (used*100/size < REDIS_HT_MINFILL));
832 }
833
834 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
835 * we resize the hash table to save memory */
836 static void tryResizeHashTables(void) {
837 int j;
838
839 for (j = 0; j < server.dbnum; j++) {
840 if (htNeedsResize(server.db[j].dict)) {
841 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
842 dictResize(server.db[j].dict);
843 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
844 }
845 if (htNeedsResize(server.db[j].expires))
846 dictResize(server.db[j].expires);
847 }
848 }
849
850 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
851 int j, loops = server.cronloops++;
852 REDIS_NOTUSED(eventLoop);
853 REDIS_NOTUSED(id);
854 REDIS_NOTUSED(clientData);
855
856 /* Update the global state with the amount of used memory */
857 server.usedmemory = zmalloc_used_memory();
858
859 /* Show some info about non-empty databases */
860 for (j = 0; j < server.dbnum; j++) {
861 long long size, used, vkeys;
862
863 size = dictSlots(server.db[j].dict);
864 used = dictSize(server.db[j].dict);
865 vkeys = dictSize(server.db[j].expires);
866 if (!(loops % 5) && (used || vkeys)) {
867 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
868 /* dictPrintStats(server.dict); */
869 }
870 }
871
872 /* We don't want to resize the hash tables while a bacground saving
873 * is in progress: the saving child is created using fork() that is
874 * implemented with a copy-on-write semantic in most modern systems, so
875 * if we resize the HT while there is the saving child at work actually
876 * a lot of memory movements in the parent will cause a lot of pages
877 * copied. */
878 if (!server.bgsaveinprogress) tryResizeHashTables();
879
880 /* Show information about connected clients */
881 if (!(loops % 5)) {
882 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
883 listLength(server.clients)-listLength(server.slaves),
884 listLength(server.slaves),
885 server.usedmemory,
886 dictSize(server.sharingpool));
887 }
888
889 /* Close connections of timedout clients */
890 if (server.maxidletime && !(loops % 10))
891 closeTimedoutClients();
892
893 /* Check if a background saving in progress terminated */
894 if (server.bgsaveinprogress) {
895 int statloc;
896 if (wait4(-1,&statloc,WNOHANG,NULL)) {
897 int exitcode = WEXITSTATUS(statloc);
898 int bysignal = WIFSIGNALED(statloc);
899
900 if (!bysignal && exitcode == 0) {
901 redisLog(REDIS_NOTICE,
902 "Background saving terminated with success");
903 server.dirty = 0;
904 server.lastsave = time(NULL);
905 } else if (!bysignal && exitcode != 0) {
906 redisLog(REDIS_WARNING, "Background saving error");
907 } else {
908 redisLog(REDIS_WARNING,
909 "Background saving terminated by signal");
910 rdbRemoveTempFile(server.bgsavechildpid);
911 }
912 server.bgsaveinprogress = 0;
913 server.bgsavechildpid = -1;
914 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
915 }
916 } else {
917 /* If there is not a background saving in progress check if
918 * we have to save now */
919 time_t now = time(NULL);
920 for (j = 0; j < server.saveparamslen; j++) {
921 struct saveparam *sp = server.saveparams+j;
922
923 if (server.dirty >= sp->changes &&
924 now-server.lastsave > sp->seconds) {
925 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
926 sp->changes, sp->seconds);
927 rdbSaveBackground(server.dbfilename);
928 break;
929 }
930 }
931 }
932
933 /* Try to expire a few timed out keys */
934 for (j = 0; j < server.dbnum; j++) {
935 redisDb *db = server.db+j;
936 int num = dictSize(db->expires);
937
938 if (num) {
939 time_t now = time(NULL);
940
941 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
942 num = REDIS_EXPIRELOOKUPS_PER_CRON;
943 while (num--) {
944 dictEntry *de;
945 time_t t;
946
947 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
948 t = (time_t) dictGetEntryVal(de);
949 if (now > t) {
950 deleteKey(db,dictGetEntryKey(de));
951 }
952 }
953 }
954 }
955
956 /* Check if we should connect to a MASTER */
957 if (server.replstate == REDIS_REPL_CONNECT) {
958 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
959 if (syncWithMaster() == REDIS_OK) {
960 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
961 }
962 }
963 return 1000;
964 }
965
966 static void createSharedObjects(void) {
967 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
968 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
969 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
970 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
971 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
972 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
973 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
974 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
975 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
976 /* no such key */
977 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
978 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
979 "-ERR Operation against a key holding the wrong kind of value\r\n"));
980 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
981 "-ERR no such key\r\n"));
982 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
983 "-ERR syntax error\r\n"));
984 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
985 "-ERR source and destination objects are the same\r\n"));
986 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
987 "-ERR index out of range\r\n"));
988 shared.space = createObject(REDIS_STRING,sdsnew(" "));
989 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
990 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
991 shared.select0 = createStringObject("select 0\r\n",10);
992 shared.select1 = createStringObject("select 1\r\n",10);
993 shared.select2 = createStringObject("select 2\r\n",10);
994 shared.select3 = createStringObject("select 3\r\n",10);
995 shared.select4 = createStringObject("select 4\r\n",10);
996 shared.select5 = createStringObject("select 5\r\n",10);
997 shared.select6 = createStringObject("select 6\r\n",10);
998 shared.select7 = createStringObject("select 7\r\n",10);
999 shared.select8 = createStringObject("select 8\r\n",10);
1000 shared.select9 = createStringObject("select 9\r\n",10);
1001 }
1002
1003 static void appendServerSaveParams(time_t seconds, int changes) {
1004 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1005 server.saveparams[server.saveparamslen].seconds = seconds;
1006 server.saveparams[server.saveparamslen].changes = changes;
1007 server.saveparamslen++;
1008 }
1009
1010 static void ResetServerSaveParams() {
1011 zfree(server.saveparams);
1012 server.saveparams = NULL;
1013 server.saveparamslen = 0;
1014 }
1015
1016 static void initServerConfig() {
1017 server.dbnum = REDIS_DEFAULT_DBNUM;
1018 server.port = REDIS_SERVERPORT;
1019 server.verbosity = REDIS_DEBUG;
1020 server.maxidletime = REDIS_MAXIDLETIME;
1021 server.saveparams = NULL;
1022 server.logfile = NULL; /* NULL = log on standard output */
1023 server.bindaddr = NULL;
1024 server.glueoutputbuf = 1;
1025 server.daemonize = 0;
1026 server.pidfile = "/var/run/redis.pid";
1027 server.dbfilename = "dump.rdb";
1028 server.requirepass = NULL;
1029 server.shareobjects = 0;
1030 server.sharingpoolsize = 1024;
1031 server.maxclients = 0;
1032 server.maxmemory = 0;
1033 ResetServerSaveParams();
1034
1035 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1036 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1037 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1038 /* Replication related */
1039 server.isslave = 0;
1040 server.masterhost = NULL;
1041 server.masterport = 6379;
1042 server.master = NULL;
1043 server.replstate = REDIS_REPL_NONE;
1044
1045 /* Double constants initialization */
1046 R_Zero = 0.0;
1047 R_PosInf = 1.0/R_Zero;
1048 R_NegInf = -1.0/R_Zero;
1049 R_Nan = R_Zero/R_Zero;
1050 }
1051
1052 static void initServer() {
1053 int j;
1054
1055 signal(SIGHUP, SIG_IGN);
1056 signal(SIGPIPE, SIG_IGN);
1057 setupSigSegvAction();
1058
1059 server.clients = listCreate();
1060 server.slaves = listCreate();
1061 server.monitors = listCreate();
1062 server.objfreelist = listCreate();
1063 createSharedObjects();
1064 server.el = aeCreateEventLoop();
1065 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1066 server.sharingpool = dictCreate(&setDictType,NULL);
1067 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1068 if (server.fd == -1) {
1069 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1070 exit(1);
1071 }
1072 for (j = 0; j < server.dbnum; j++) {
1073 server.db[j].dict = dictCreate(&hashDictType,NULL);
1074 server.db[j].expires = dictCreate(&setDictType,NULL);
1075 server.db[j].id = j;
1076 }
1077 server.cronloops = 0;
1078 server.bgsaveinprogress = 0;
1079 server.bgsavechildpid = -1;
1080 server.lastsave = time(NULL);
1081 server.dirty = 0;
1082 server.usedmemory = 0;
1083 server.stat_numcommands = 0;
1084 server.stat_numconnections = 0;
1085 server.stat_starttime = time(NULL);
1086 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1087 }
1088
1089 /* Empty the whole database */
1090 static long long emptyDb() {
1091 int j;
1092 long long removed = 0;
1093
1094 for (j = 0; j < server.dbnum; j++) {
1095 removed += dictSize(server.db[j].dict);
1096 dictEmpty(server.db[j].dict);
1097 dictEmpty(server.db[j].expires);
1098 }
1099 return removed;
1100 }
1101
/* Convert a boolean config argument to an int: "yes" -> 1, "no" -> 0
 * (case-insensitive); any other string -> -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1107
1108 /* I agree, this is a very rudimental way to load a configuration...
1109 will improve later if the config gets more complex */
1110 static void loadServerConfig(char *filename) {
1111 FILE *fp;
1112 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1113 int linenum = 0;
1114 sds line = NULL;
1115
1116 if (filename[0] == '-' && filename[1] == '\0')
1117 fp = stdin;
1118 else {
1119 if ((fp = fopen(filename,"r")) == NULL) {
1120 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1121 exit(1);
1122 }
1123 }
1124
1125 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1126 sds *argv;
1127 int argc, j;
1128
1129 linenum++;
1130 line = sdsnew(buf);
1131 line = sdstrim(line," \t\r\n");
1132
1133 /* Skip comments and blank lines*/
1134 if (line[0] == '#' || line[0] == '\0') {
1135 sdsfree(line);
1136 continue;
1137 }
1138
1139 /* Split into arguments */
1140 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1141 sdstolower(argv[0]);
1142
1143 /* Execute config directives */
1144 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1145 server.maxidletime = atoi(argv[1]);
1146 if (server.maxidletime < 0) {
1147 err = "Invalid timeout value"; goto loaderr;
1148 }
1149 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1150 server.port = atoi(argv[1]);
1151 if (server.port < 1 || server.port > 65535) {
1152 err = "Invalid port"; goto loaderr;
1153 }
1154 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1155 server.bindaddr = zstrdup(argv[1]);
1156 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1157 int seconds = atoi(argv[1]);
1158 int changes = atoi(argv[2]);
1159 if (seconds < 1 || changes < 0) {
1160 err = "Invalid save parameters"; goto loaderr;
1161 }
1162 appendServerSaveParams(seconds,changes);
1163 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1164 if (chdir(argv[1]) == -1) {
1165 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1166 argv[1], strerror(errno));
1167 exit(1);
1168 }
1169 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1170 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1171 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1172 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1173 else {
1174 err = "Invalid log level. Must be one of debug, notice, warning";
1175 goto loaderr;
1176 }
1177 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1178 FILE *logfp;
1179
1180 server.logfile = zstrdup(argv[1]);
1181 if (!strcasecmp(server.logfile,"stdout")) {
1182 zfree(server.logfile);
1183 server.logfile = NULL;
1184 }
1185 if (server.logfile) {
1186 /* Test if we are able to open the file. The server will not
1187 * be able to abort just for this problem later... */
1188 logfp = fopen(server.logfile,"a");
1189 if (logfp == NULL) {
1190 err = sdscatprintf(sdsempty(),
1191 "Can't open the log file: %s", strerror(errno));
1192 goto loaderr;
1193 }
1194 fclose(logfp);
1195 }
1196 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1197 server.dbnum = atoi(argv[1]);
1198 if (server.dbnum < 1) {
1199 err = "Invalid number of databases"; goto loaderr;
1200 }
1201 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1202 server.maxclients = atoi(argv[1]);
1203 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1204 server.maxmemory = strtoll(argv[1], NULL, 10);
1205 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1206 server.masterhost = sdsnew(argv[1]);
1207 server.masterport = atoi(argv[2]);
1208 server.replstate = REDIS_REPL_CONNECT;
1209 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1210 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1211 err = "argument must be 'yes' or 'no'"; goto loaderr;
1212 }
1213 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1214 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1215 err = "argument must be 'yes' or 'no'"; goto loaderr;
1216 }
1217 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1218 server.sharingpoolsize = atoi(argv[1]);
1219 if (server.sharingpoolsize < 1) {
1220 err = "invalid object sharing pool size"; goto loaderr;
1221 }
1222 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1223 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1224 err = "argument must be 'yes' or 'no'"; goto loaderr;
1225 }
1226 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1227 server.requirepass = zstrdup(argv[1]);
1228 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1229 server.pidfile = zstrdup(argv[1]);
1230 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1231 server.dbfilename = zstrdup(argv[1]);
1232 } else {
1233 err = "Bad directive or wrong number of arguments"; goto loaderr;
1234 }
1235 for (j = 0; j < argc; j++)
1236 sdsfree(argv[j]);
1237 zfree(argv);
1238 sdsfree(line);
1239 }
1240 if (fp != stdin) fclose(fp);
1241 return;
1242
1243 loaderr:
1244 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1245 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1246 fprintf(stderr, ">>> '%s'\n", line);
1247 fprintf(stderr, "%s\n", err);
1248 exit(1);
1249 }
1250
1251 static void freeClientArgv(redisClient *c) {
1252 int j;
1253
1254 for (j = 0; j < c->argc; j++)
1255 decrRefCount(c->argv[j]);
1256 for (j = 0; j < c->mbargc; j++)
1257 decrRefCount(c->mbargv[j]);
1258 c->argc = 0;
1259 c->mbargc = 0;
1260 }
1261
1262 static void freeClient(redisClient *c) {
1263 listNode *ln;
1264
1265 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1266 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1267 sdsfree(c->querybuf);
1268 listRelease(c->reply);
1269 freeClientArgv(c);
1270 close(c->fd);
1271 ln = listSearchKey(server.clients,c);
1272 assert(ln != NULL);
1273 listDelNode(server.clients,ln);
1274 if (c->flags & REDIS_SLAVE) {
1275 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1276 close(c->repldbfd);
1277 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1278 ln = listSearchKey(l,c);
1279 assert(ln != NULL);
1280 listDelNode(l,ln);
1281 }
1282 if (c->flags & REDIS_MASTER) {
1283 server.master = NULL;
1284 server.replstate = REDIS_REPL_CONNECT;
1285 }
1286 zfree(c->argv);
1287 zfree(c->mbargv);
1288 zfree(c);
1289 }
1290
1291 static void glueReplyBuffersIfNeeded(redisClient *c) {
1292 int totlen = 0;
1293 listNode *ln;
1294 robj *o;
1295
1296 listRewind(c->reply);
1297 while((ln = listYield(c->reply))) {
1298 o = ln->value;
1299 totlen += sdslen(o->ptr);
1300 /* This optimization makes more sense if we don't have to copy
1301 * too much data */
1302 if (totlen > 1024) return;
1303 }
1304 if (totlen > 0) {
1305 char buf[1024];
1306 int copylen = 0;
1307
1308 listRewind(c->reply);
1309 while((ln = listYield(c->reply))) {
1310 o = ln->value;
1311 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1312 copylen += sdslen(o->ptr);
1313 listDelNode(c->reply,ln);
1314 }
1315 /* Now the output buffer is empty, add the new single element */
1316 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1317 listAddNodeTail(c->reply,o);
1318 }
1319 }
1320
1321 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1322 redisClient *c = privdata;
1323 int nwritten = 0, totwritten = 0, objlen;
1324 robj *o;
1325 REDIS_NOTUSED(el);
1326 REDIS_NOTUSED(mask);
1327
1328 if (server.glueoutputbuf && listLength(c->reply) > 1)
1329 glueReplyBuffersIfNeeded(c);
1330 while(listLength(c->reply)) {
1331 o = listNodeValue(listFirst(c->reply));
1332 objlen = sdslen(o->ptr);
1333
1334 if (objlen == 0) {
1335 listDelNode(c->reply,listFirst(c->reply));
1336 continue;
1337 }
1338
1339 if (c->flags & REDIS_MASTER) {
1340 /* Don't reply to a master */
1341 nwritten = objlen - c->sentlen;
1342 } else {
1343 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1344 if (nwritten <= 0) break;
1345 }
1346 c->sentlen += nwritten;
1347 totwritten += nwritten;
1348 /* If we fully sent the object on head go to the next one */
1349 if (c->sentlen == objlen) {
1350 listDelNode(c->reply,listFirst(c->reply));
1351 c->sentlen = 0;
1352 }
1353 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1354 * bytes, in a single threaded server it's a good idea to server
1355 * other clients as well, even if a very large request comes from
1356 * super fast link that is always able to accept data (in real world
1357 * terms think to 'KEYS *' against the loopback interfae) */
1358 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1359 }
1360 if (nwritten == -1) {
1361 if (errno == EAGAIN) {
1362 nwritten = 0;
1363 } else {
1364 redisLog(REDIS_DEBUG,
1365 "Error writing to client: %s", strerror(errno));
1366 freeClient(c);
1367 return;
1368 }
1369 }
1370 if (totwritten > 0) c->lastinteraction = time(NULL);
1371 if (listLength(c->reply) == 0) {
1372 c->sentlen = 0;
1373 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1374 }
1375 }
1376
1377 static struct redisCommand *lookupCommand(char *name) {
1378 int j = 0;
1379 while(cmdTable[j].name != NULL) {
1380 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1381 j++;
1382 }
1383 return NULL;
1384 }
1385
1386 /* resetClient prepare the client to process the next command */
1387 static void resetClient(redisClient *c) {
1388 freeClientArgv(c);
1389 c->bulklen = -1;
1390 c->multibulk = 0;
1391 }
1392
1393 /* If this function gets called we already read a whole
1394 * command, argments are in the client argv/argc fields.
1395 * processCommand() execute the command or prepare the
1396 * server for a bulk read from the client.
1397 *
1398 * If 1 is returned the client is still alive and valid and
1399 * and other operations can be performed by the caller. Otherwise
1400 * if 0 is returned the client was destroied (i.e. after QUIT). */
1401 static int processCommand(redisClient *c) {
1402 struct redisCommand *cmd;
1403 long long dirty;
1404
1405 /* Free some memory if needed (maxmemory setting) */
1406 if (server.maxmemory) freeMemoryIfNeeded();
1407
1408 /* Handle the multi bulk command type. This is an alternative protocol
1409 * supported by Redis in order to receive commands that are composed of
1410 * multiple binary-safe "bulk" arguments. The latency of processing is
1411 * a bit higher but this allows things like multi-sets, so if this
1412 * protocol is used only for MSET and similar commands this is a big win. */
1413 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1414 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1415 if (c->multibulk <= 0) {
1416 resetClient(c);
1417 return 1;
1418 } else {
1419 decrRefCount(c->argv[c->argc-1]);
1420 c->argc--;
1421 return 1;
1422 }
1423 } else if (c->multibulk) {
1424 if (c->bulklen == -1) {
1425 if (((char*)c->argv[0]->ptr)[0] != '$') {
1426 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1427 resetClient(c);
1428 return 1;
1429 } else {
1430 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1431 decrRefCount(c->argv[0]);
1432 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1433 c->argc--;
1434 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1435 resetClient(c);
1436 return 1;
1437 }
1438 c->argc--;
1439 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1440 return 1;
1441 }
1442 } else {
1443 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1444 c->mbargv[c->mbargc] = c->argv[0];
1445 c->mbargc++;
1446 c->argc--;
1447 c->multibulk--;
1448 if (c->multibulk == 0) {
1449 robj **auxargv;
1450 int auxargc;
1451
1452 /* Here we need to swap the multi-bulk argc/argv with the
1453 * normal argc/argv of the client structure. */
1454 auxargv = c->argv;
1455 c->argv = c->mbargv;
1456 c->mbargv = auxargv;
1457
1458 auxargc = c->argc;
1459 c->argc = c->mbargc;
1460 c->mbargc = auxargc;
1461
1462 /* We need to set bulklen to something different than -1
1463 * in order for the code below to process the command without
1464 * to try to read the last argument of a bulk command as
1465 * a special argument. */
1466 c->bulklen = 0;
1467 /* continue below and process the command */
1468 } else {
1469 c->bulklen = -1;
1470 return 1;
1471 }
1472 }
1473 }
1474 /* -- end of multi bulk commands processing -- */
1475
1476 /* The QUIT command is handled as a special case. Normal command
1477 * procs are unable to close the client connection safely */
1478 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1479 freeClient(c);
1480 return 0;
1481 }
1482 cmd = lookupCommand(c->argv[0]->ptr);
1483 if (!cmd) {
1484 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1485 resetClient(c);
1486 return 1;
1487 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1488 (c->argc < -cmd->arity)) {
1489 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1490 resetClient(c);
1491 return 1;
1492 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1493 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1494 resetClient(c);
1495 return 1;
1496 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1497 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1498
1499 decrRefCount(c->argv[c->argc-1]);
1500 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1501 c->argc--;
1502 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1503 resetClient(c);
1504 return 1;
1505 }
1506 c->argc--;
1507 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1508 /* It is possible that the bulk read is already in the
1509 * buffer. Check this condition and handle it accordingly.
1510 * This is just a fast path, alternative to call processInputBuffer().
1511 * It's a good idea since the code is small and this condition
1512 * happens most of the times. */
1513 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1514 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1515 c->argc++;
1516 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1517 } else {
1518 return 1;
1519 }
1520 }
1521 /* Let's try to share objects on the command arguments vector */
1522 if (server.shareobjects) {
1523 int j;
1524 for(j = 1; j < c->argc; j++)
1525 c->argv[j] = tryObjectSharing(c->argv[j]);
1526 }
1527 /* Let's try to encode the bulk object to save space. */
1528 if (cmd->flags & REDIS_CMD_BULK)
1529 tryObjectEncoding(c->argv[c->argc-1]);
1530
1531 /* Check if the user is authenticated */
1532 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1533 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1534 resetClient(c);
1535 return 1;
1536 }
1537
1538 /* Exec the command */
1539 dirty = server.dirty;
1540 cmd->proc(c);
1541 if (server.dirty-dirty != 0 && listLength(server.slaves))
1542 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1543 if (listLength(server.monitors))
1544 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1545 server.stat_numcommands++;
1546
1547 /* Prepare the client for the next command */
1548 if (c->flags & REDIS_CLOSE) {
1549 freeClient(c);
1550 return 0;
1551 }
1552 resetClient(c);
1553 return 1;
1554 }
1555
1556 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1557 listNode *ln;
1558 int outc = 0, j;
1559 robj **outv;
1560 /* (args*2)+1 is enough room for args, spaces, newlines */
1561 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1562
1563 if (argc <= REDIS_STATIC_ARGS) {
1564 outv = static_outv;
1565 } else {
1566 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1567 }
1568
1569 for (j = 0; j < argc; j++) {
1570 if (j != 0) outv[outc++] = shared.space;
1571 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1572 robj *lenobj;
1573
1574 lenobj = createObject(REDIS_STRING,
1575 sdscatprintf(sdsempty(),"%d\r\n",
1576 stringObjectLen(argv[j])));
1577 lenobj->refcount = 0;
1578 outv[outc++] = lenobj;
1579 }
1580 outv[outc++] = argv[j];
1581 }
1582 outv[outc++] = shared.crlf;
1583
1584 /* Increment all the refcounts at start and decrement at end in order to
1585 * be sure to free objects if there is no slave in a replication state
1586 * able to be feed with commands */
1587 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1588 listRewind(slaves);
1589 while((ln = listYield(slaves))) {
1590 redisClient *slave = ln->value;
1591
1592 /* Don't feed slaves that are still waiting for BGSAVE to start */
1593 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1594
1595 /* Feed all the other slaves, MONITORs and so on */
1596 if (slave->slaveseldb != dictid) {
1597 robj *selectcmd;
1598
1599 switch(dictid) {
1600 case 0: selectcmd = shared.select0; break;
1601 case 1: selectcmd = shared.select1; break;
1602 case 2: selectcmd = shared.select2; break;
1603 case 3: selectcmd = shared.select3; break;
1604 case 4: selectcmd = shared.select4; break;
1605 case 5: selectcmd = shared.select5; break;
1606 case 6: selectcmd = shared.select6; break;
1607 case 7: selectcmd = shared.select7; break;
1608 case 8: selectcmd = shared.select8; break;
1609 case 9: selectcmd = shared.select9; break;
1610 default:
1611 selectcmd = createObject(REDIS_STRING,
1612 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1613 selectcmd->refcount = 0;
1614 break;
1615 }
1616 addReply(slave,selectcmd);
1617 slave->slaveseldb = dictid;
1618 }
1619 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1620 }
1621 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1622 if (outv != static_outv) zfree(outv);
1623 }
1624
1625 static void processInputBuffer(redisClient *c) {
1626 again:
1627 if (c->bulklen == -1) {
1628 /* Read the first line of the query */
1629 char *p = strchr(c->querybuf,'\n');
1630 size_t querylen;
1631
1632 if (p) {
1633 sds query, *argv;
1634 int argc, j;
1635
1636 query = c->querybuf;
1637 c->querybuf = sdsempty();
1638 querylen = 1+(p-(query));
1639 if (sdslen(query) > querylen) {
1640 /* leave data after the first line of the query in the buffer */
1641 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1642 }
1643 *p = '\0'; /* remove "\n" */
1644 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1645 sdsupdatelen(query);
1646
1647 /* Now we can split the query in arguments */
1648 if (sdslen(query) == 0) {
1649 /* Ignore empty query */
1650 sdsfree(query);
1651 return;
1652 }
1653 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1654 sdsfree(query);
1655
1656 if (c->argv) zfree(c->argv);
1657 c->argv = zmalloc(sizeof(robj*)*argc);
1658
1659 for (j = 0; j < argc; j++) {
1660 if (sdslen(argv[j])) {
1661 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1662 c->argc++;
1663 } else {
1664 sdsfree(argv[j]);
1665 }
1666 }
1667 zfree(argv);
1668 /* Execute the command. If the client is still valid
1669 * after processCommand() return and there is something
1670 * on the query buffer try to process the next command. */
1671 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1672 return;
1673 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1674 redisLog(REDIS_DEBUG, "Client protocol error");
1675 freeClient(c);
1676 return;
1677 }
1678 } else {
1679 /* Bulk read handling. Note that if we are at this point
1680 the client already sent a command terminated with a newline,
1681 we are reading the bulk data that is actually the last
1682 argument of the command. */
1683 int qbl = sdslen(c->querybuf);
1684
1685 if (c->bulklen <= qbl) {
1686 /* Copy everything but the final CRLF as final argument */
1687 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1688 c->argc++;
1689 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1690 /* Process the command. If the client is still valid after
1691 * the processing and there is more data in the buffer
1692 * try to parse it. */
1693 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1694 return;
1695 }
1696 }
1697 }
1698
1699 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1700 redisClient *c = (redisClient*) privdata;
1701 char buf[REDIS_IOBUF_LEN];
1702 int nread;
1703 REDIS_NOTUSED(el);
1704 REDIS_NOTUSED(mask);
1705
1706 nread = read(fd, buf, REDIS_IOBUF_LEN);
1707 if (nread == -1) {
1708 if (errno == EAGAIN) {
1709 nread = 0;
1710 } else {
1711 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1712 freeClient(c);
1713 return;
1714 }
1715 } else if (nread == 0) {
1716 redisLog(REDIS_DEBUG, "Client closed connection");
1717 freeClient(c);
1718 return;
1719 }
1720 if (nread) {
1721 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1722 c->lastinteraction = time(NULL);
1723 } else {
1724 return;
1725 }
1726 processInputBuffer(c);
1727 }
1728
1729 static int selectDb(redisClient *c, int id) {
1730 if (id < 0 || id >= server.dbnum)
1731 return REDIS_ERR;
1732 c->db = &server.db[id];
1733 return REDIS_OK;
1734 }
1735
1736 static void *dupClientReplyValue(void *o) {
1737 incrRefCount((robj*)o);
1738 return 0;
1739 }
1740
1741 static redisClient *createClient(int fd) {
1742 redisClient *c = zmalloc(sizeof(*c));
1743
1744 anetNonBlock(NULL,fd);
1745 anetTcpNoDelay(NULL,fd);
1746 if (!c) return NULL;
1747 selectDb(c,0);
1748 c->fd = fd;
1749 c->querybuf = sdsempty();
1750 c->argc = 0;
1751 c->argv = NULL;
1752 c->bulklen = -1;
1753 c->multibulk = 0;
1754 c->mbargc = 0;
1755 c->mbargv = NULL;
1756 c->sentlen = 0;
1757 c->flags = 0;
1758 c->lastinteraction = time(NULL);
1759 c->authenticated = 0;
1760 c->replstate = REDIS_REPL_NONE;
1761 c->reply = listCreate();
1762 listSetFreeMethod(c->reply,decrRefCount);
1763 listSetDupMethod(c->reply,dupClientReplyValue);
1764 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1765 readQueryFromClient, c, NULL) == AE_ERR) {
1766 freeClient(c);
1767 return NULL;
1768 }
1769 listAddNodeTail(server.clients,c);
1770 return c;
1771 }
1772
1773 static void addReply(redisClient *c, robj *obj) {
1774 if (listLength(c->reply) == 0 &&
1775 (c->replstate == REDIS_REPL_NONE ||
1776 c->replstate == REDIS_REPL_ONLINE) &&
1777 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1778 sendReplyToClient, c, NULL) == AE_ERR) return;
1779 if (obj->encoding != REDIS_ENCODING_RAW) {
1780 obj = getDecodedObject(obj);
1781 } else {
1782 incrRefCount(obj);
1783 }
1784 listAddNodeTail(c->reply,obj);
1785 }
1786
1787 static void addReplySds(redisClient *c, sds s) {
1788 robj *o = createObject(REDIS_STRING,s);
1789 addReply(c,o);
1790 decrRefCount(o);
1791 }
1792
1793 static void addReplyBulkLen(redisClient *c, robj *obj) {
1794 size_t len;
1795
1796 if (obj->encoding == REDIS_ENCODING_RAW) {
1797 len = sdslen(obj->ptr);
1798 } else {
1799 long n = (long)obj->ptr;
1800
1801 len = 1;
1802 if (n < 0) {
1803 len++;
1804 n = -n;
1805 }
1806 while((n = n/10) != 0) {
1807 len++;
1808 }
1809 }
1810 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1811 }
1812
1813 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1814 int cport, cfd;
1815 char cip[128];
1816 redisClient *c;
1817 REDIS_NOTUSED(el);
1818 REDIS_NOTUSED(mask);
1819 REDIS_NOTUSED(privdata);
1820
1821 cfd = anetAccept(server.neterr, fd, cip, &cport);
1822 if (cfd == AE_ERR) {
1823 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1824 return;
1825 }
1826 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1827 if ((c = createClient(cfd)) == NULL) {
1828 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1829 close(cfd); /* May be already closed, just ingore errors */
1830 return;
1831 }
1832 /* If maxclient directive is set and this is one client more... close the
1833 * connection. Note that we create the client instead to check before
1834 * for this condition, since now the socket is already set in nonblocking
1835 * mode and we can send an error for free using the Kernel I/O */
1836 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1837 char *err = "-ERR max number of clients reached\r\n";
1838
1839 /* That's a best effort error message, don't check write errors */
1840 (void) write(c->fd,err,strlen(err));
1841 freeClient(c);
1842 return;
1843 }
1844 server.stat_numconnections++;
1845 }
1846
1847 /* ======================= Redis objects implementation ===================== */
1848
1849 static robj *createObject(int type, void *ptr) {
1850 robj *o;
1851
1852 if (listLength(server.objfreelist)) {
1853 listNode *head = listFirst(server.objfreelist);
1854 o = listNodeValue(head);
1855 listDelNode(server.objfreelist,head);
1856 } else {
1857 o = zmalloc(sizeof(*o));
1858 }
1859 o->type = type;
1860 o->encoding = REDIS_ENCODING_RAW;
1861 o->ptr = ptr;
1862 o->refcount = 1;
1863 return o;
1864 }
1865
1866 static robj *createStringObject(char *ptr, size_t len) {
1867 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1868 }
1869
1870 static robj *createListObject(void) {
1871 list *l = listCreate();
1872
1873 listSetFreeMethod(l,decrRefCount);
1874 return createObject(REDIS_LIST,l);
1875 }
1876
1877 static robj *createSetObject(void) {
1878 dict *d = dictCreate(&setDictType,NULL);
1879 return createObject(REDIS_SET,d);
1880 }
1881
1882 static robj *createZsetObject(void) {
1883 zset *zs = zmalloc(sizeof(*zs));
1884
1885 zs->dict = dictCreate(&zsetDictType,NULL);
1886 zs->zsl = zslCreate();
1887 return createObject(REDIS_ZSET,zs);
1888 }
1889
1890 static void freeStringObject(robj *o) {
1891 if (o->encoding == REDIS_ENCODING_RAW) {
1892 sdsfree(o->ptr);
1893 }
1894 }
1895
1896 static void freeListObject(robj *o) {
1897 listRelease((list*) o->ptr);
1898 }
1899
1900 static void freeSetObject(robj *o) {
1901 dictRelease((dict*) o->ptr);
1902 }
1903
1904 static void freeZsetObject(robj *o) {
1905 zset *zs = o->ptr;
1906
1907 dictRelease(zs->dict);
1908 zslFree(zs->zsl);
1909 zfree(zs);
1910 }
1911
1912 static void freeHashObject(robj *o) {
1913 dictRelease((dict*) o->ptr);
1914 }
1915
1916 static void incrRefCount(robj *o) {
1917 o->refcount++;
1918 #ifdef DEBUG_REFCOUNT
1919 if (o->type == REDIS_STRING)
1920 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1921 #endif
1922 }
1923
1924 static void decrRefCount(void *obj) {
1925 robj *o = obj;
1926
1927 #ifdef DEBUG_REFCOUNT
1928 if (o->type == REDIS_STRING)
1929 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1930 #endif
1931 if (--(o->refcount) == 0) {
1932 switch(o->type) {
1933 case REDIS_STRING: freeStringObject(o); break;
1934 case REDIS_LIST: freeListObject(o); break;
1935 case REDIS_SET: freeSetObject(o); break;
1936 case REDIS_ZSET: freeZsetObject(o); break;
1937 case REDIS_HASH: freeHashObject(o); break;
1938 default: assert(0 != 0); break;
1939 }
1940 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1941 !listAddNodeHead(server.objfreelist,o))
1942 zfree(o);
1943 }
1944 }
1945
1946 static robj *lookupKey(redisDb *db, robj *key) {
1947 dictEntry *de = dictFind(db->dict,key);
1948 return de ? dictGetEntryVal(de) : NULL;
1949 }
1950
1951 static robj *lookupKeyRead(redisDb *db, robj *key) {
1952 expireIfNeeded(db,key);
1953 return lookupKey(db,key);
1954 }
1955
1956 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1957 deleteIfVolatile(db,key);
1958 return lookupKey(db,key);
1959 }
1960
1961 static int deleteKey(redisDb *db, robj *key) {
1962 int retval;
1963
1964 /* We need to protect key from destruction: after the first dictDelete()
1965 * it may happen that 'key' is no longer valid if we don't increment
1966 * it's count. This may happen when we get the object reference directly
1967 * from the hash table with dictRandomKey() or dict iterators */
1968 incrRefCount(key);
1969 if (dictSize(db->expires)) dictDelete(db->expires,key);
1970 retval = dictDelete(db->dict,key);
1971 decrRefCount(key);
1972
1973 return retval == DICT_OK;
1974 }
1975
1976 /* Try to share an object against the shared objects pool */
1977 static robj *tryObjectSharing(robj *o) {
1978 struct dictEntry *de;
1979 unsigned long c;
1980
1981 if (o == NULL || server.shareobjects == 0) return o;
1982
1983 assert(o->type == REDIS_STRING);
1984 de = dictFind(server.sharingpool,o);
1985 if (de) {
1986 robj *shared = dictGetEntryKey(de);
1987
1988 c = ((unsigned long) dictGetEntryVal(de))+1;
1989 dictGetEntryVal(de) = (void*) c;
1990 incrRefCount(shared);
1991 decrRefCount(o);
1992 return shared;
1993 } else {
1994 /* Here we are using a stream algorihtm: Every time an object is
1995 * shared we increment its count, everytime there is a miss we
1996 * recrement the counter of a random object. If this object reaches
1997 * zero we remove the object and put the current object instead. */
1998 if (dictSize(server.sharingpool) >=
1999 server.sharingpoolsize) {
2000 de = dictGetRandomKey(server.sharingpool);
2001 assert(de != NULL);
2002 c = ((unsigned long) dictGetEntryVal(de))-1;
2003 dictGetEntryVal(de) = (void*) c;
2004 if (c == 0) {
2005 dictDelete(server.sharingpool,de->key);
2006 }
2007 } else {
2008 c = 0; /* If the pool is empty we want to add this object */
2009 }
2010 if (c == 0) {
2011 int retval;
2012
2013 retval = dictAdd(server.sharingpool,o,(void*)1);
2014 assert(retval == DICT_OK);
2015 incrRefCount(o);
2016 }
2017 return o;
2018 }
2019 }
2020
2021 /* Check if the nul-terminated string 's' can be represented by a long
2022 * (that is, is a number that fits into long without any other space or
2023 * character before or after the digits).
2024 *
2025 * If so, the function returns REDIS_OK and *longval is set to the value
2026 * of the number. Otherwise REDIS_ERR is returned */
2027 static int isStringRepresentableAsLong(sds s, long *longval) {
2028 char buf[32], *endptr;
2029 long value;
2030 int slen;
2031
2032 value = strtol(s, &endptr, 10);
2033 if (endptr[0] != '\0') return REDIS_ERR;
2034 slen = snprintf(buf,32,"%ld",value);
2035
2036 /* If the number converted back into a string is not identical
2037 * then it's not possible to encode the string as integer */
2038 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2039 if (longval) *longval = value;
2040 return REDIS_OK;
2041 }
2042
2043 /* Try to encode a string object in order to save space */
2044 static int tryObjectEncoding(robj *o) {
2045 long value;
2046 sds s = o->ptr;
2047
2048 if (o->encoding != REDIS_ENCODING_RAW)
2049 return REDIS_ERR; /* Already encoded */
2050
2051 /* It's not save to encode shared objects: shared objects can be shared
2052 * everywhere in the "object space" of Redis. Encoded objects can only
2053 * appear as "values" (and not, for instance, as keys) */
2054 if (o->refcount > 1) return REDIS_ERR;
2055
2056 /* Currently we try to encode only strings */
2057 assert(o->type == REDIS_STRING);
2058
2059 /* Check if we can represent this string as a long integer */
2060 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2061
2062 /* Ok, this object can be encoded */
2063 o->encoding = REDIS_ENCODING_INT;
2064 sdsfree(o->ptr);
2065 o->ptr = (void*) value;
2066 return REDIS_OK;
2067 }
2068
2069 /* Get a decoded version of an encoded object (returned as a new object) */
2070 static robj *getDecodedObject(const robj *o) {
2071 robj *dec;
2072
2073 assert(o->encoding != REDIS_ENCODING_RAW);
2074 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2075 char buf[32];
2076
2077 snprintf(buf,32,"%ld",(long)o->ptr);
2078 dec = createStringObject(buf,strlen(buf));
2079 return dec;
2080 } else {
2081 assert(1 != 1);
2082 }
2083 }
2084
2085 /* Compare two string objects via strcmp() or alike.
2086 * Note that the objects may be integer-encoded. In such a case we
2087 * use snprintf() to get a string representation of the numbers on the stack
2088 * and compare the strings, it's much faster than calling getDecodedObject(). */
2089 static int compareStringObjects(robj *a, robj *b) {
2090 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2091 char bufa[128], bufb[128], *astr, *bstr;
2092 int bothsds = 1;
2093
2094 if (a == b) return 0;
2095 if (a->encoding != REDIS_ENCODING_RAW) {
2096 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2097 astr = bufa;
2098 bothsds = 0;
2099 } else {
2100 astr = a->ptr;
2101 }
2102 if (b->encoding != REDIS_ENCODING_RAW) {
2103 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2104 bstr = bufb;
2105 bothsds = 0;
2106 } else {
2107 bstr = b->ptr;
2108 }
2109 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2110 }
2111
2112 static size_t stringObjectLen(robj *o) {
2113 assert(o->type == REDIS_STRING);
2114 if (o->encoding == REDIS_ENCODING_RAW) {
2115 return sdslen(o->ptr);
2116 } else {
2117 char buf[32];
2118
2119 return snprintf(buf,32,"%ld",(long)o->ptr);
2120 }
2121 }
2122
2123 /*============================ DB saving/loading ============================ */
2124
/* Write the one-byte type/opcode 'type' to 'fp'.
 * Returns 0 on success, -1 if the write failed. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
2129
/* Write 't' as a 32 bit signed integer in host byte order (values outside
 * the int32_t range are truncated by the cast).
 * Returns 0 on success, -1 if the write failed. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    return fwrite(&t32,4,1,fp) ? 0 : -1;
}
2135
2136 /* check rdbLoadLen() comments for more info */
2137 static int rdbSaveLen(FILE *fp, uint32_t len) {
2138 unsigned char buf[2];
2139
2140 if (len < (1<<6)) {
2141 /* Save a 6 bit len */
2142 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2143 if (fwrite(buf,1,1,fp) == 0) return -1;
2144 } else if (len < (1<<14)) {
2145 /* Save a 14 bit len */
2146 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2147 buf[1] = len&0xFF;
2148 if (fwrite(buf,2,1,fp) == 0) return -1;
2149 } else {
2150 /* Save a 32 bit len */
2151 buf[0] = (REDIS_RDB_32BITLEN<<6);
2152 if (fwrite(buf,1,1,fp) == 0) return -1;
2153 len = htonl(len);
2154 if (fwrite(&len,4,1,fp) == 0) return -1;
2155 }
2156 return 0;
2157 }
2158
2159 /* String objects in the form "2391" "-100" without any space and with a
2160 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2161 * encoded as integers to save space */
2162 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2163 long long value;
2164 char *endptr, buf[32];
2165
2166 /* Check if it's possible to encode this value as a number */
2167 value = strtoll(s, &endptr, 10);
2168 if (endptr[0] != '\0') return 0;
2169 snprintf(buf,32,"%lld",value);
2170
2171 /* If the number converted back into a string is not identical
2172 * then it's not possible to encode the string as integer */
2173 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2174
2175 /* Finally check if it fits in our ranges */
2176 if (value >= -(1<<7) && value <= (1<<7)-1) {
2177 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2178 enc[1] = value&0xFF;
2179 return 2;
2180 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2181 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2182 enc[1] = value&0xFF;
2183 enc[2] = (value>>8)&0xFF;
2184 return 3;
2185 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2186 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2187 enc[1] = value&0xFF;
2188 enc[2] = (value>>8)&0xFF;
2189 enc[3] = (value>>16)&0xFF;
2190 enc[4] = (value>>24)&0xFF;
2191 return 5;
2192 } else {
2193 return 0;
2194 }
2195 }
2196
2197 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2198 unsigned int comprlen, outlen;
2199 unsigned char byte;
2200 void *out;
2201
2202 /* We require at least four bytes compression for this to be worth it */
2203 outlen = sdslen(obj->ptr)-4;
2204 if (outlen <= 0) return 0;
2205 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2206 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2207 if (comprlen == 0) {
2208 zfree(out);
2209 return 0;
2210 }
2211 /* Data compressed! Let's save it on disk */
2212 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2213 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2214 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2215 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2216 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2217 zfree(out);
2218 return comprlen;
2219
2220 writeerr:
2221 zfree(out);
2222 return -1;
2223 }
2224
2225 /* Save a string objet as [len][data] on disk. If the object is a string
2226 * representation of an integer value we try to safe it in a special form */
2227 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2228 size_t len;
2229 int enclen;
2230
2231 len = sdslen(obj->ptr);
2232
2233 /* Try integer encoding */
2234 if (len <= 11) {
2235 unsigned char buf[5];
2236 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2237 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2238 return 0;
2239 }
2240 }
2241
2242 /* Try LZF compression - under 20 bytes it's unable to compress even
2243 * aaaaaaaaaaaaaaaaaa so skip it */
2244 if (len > 20) {
2245 int retval;
2246
2247 retval = rdbSaveLzfStringObject(fp,obj);
2248 if (retval == -1) return -1;
2249 if (retval > 0) return 0;
2250 /* retval == 0 means data can't be compressed, save the old way */
2251 }
2252
2253 /* Store verbatim */
2254 if (rdbSaveLen(fp,len) == -1) return -1;
2255 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2256 return 0;
2257 }
2258
2259 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2260 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2261 int retval;
2262 robj *dec;
2263
2264 if (obj->encoding != REDIS_ENCODING_RAW) {
2265 dec = getDecodedObject(obj);
2266 retval = rdbSaveStringObjectRaw(fp,dec);
2267 decrRefCount(dec);
2268 return retval;
2269 } else {
2270 return rdbSaveStringObjectRaw(fp,obj);
2271 }
2272 }
2273
/* Save a double value. Doubles are saved as strings ("%.16g") prefixed by
 * an unsigned 8 bit integer specifying the length of the representation.
 * This 8 bit integer has special values in order to specify the following
 * conditions (no string follows them):
 * 253: not a number
 * 254: + inf
 * 255: - inf
 *
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len;

    if (isnan(val)) {
        buf[0] = 253;
        len = 1;
    } else if (!isfinite(val)) {
        buf[0] = (val < 0) ? 255 : 254;
        len = 1;
    } else {
        /* The text is written after the length byte. Its length must be
         * measured from buf+1: buf[0] is not initialized yet, so measuring
         * from buf (as before) wrote a garbage length prefix. */
        int slen = snprintf((char*)buf+1,sizeof(buf)-1,"%.16g",val);

        if (slen < 0 || (size_t)slen >= sizeof(buf)-1) return -1;
        buf[0] = (unsigned char) slen;
        len = slen+1;
    }
    if (fwrite(buf,len,1,fp) == 0) return -1;
    return 0;
}
2300
2301 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2302 static int rdbSave(char *filename) {
2303 dictIterator *di = NULL;
2304 dictEntry *de;
2305 FILE *fp;
2306 char tmpfile[256];
2307 int j;
2308 time_t now = time(NULL);
2309
2310 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2311 fp = fopen(tmpfile,"w");
2312 if (!fp) {
2313 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2314 return REDIS_ERR;
2315 }
2316 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2317 for (j = 0; j < server.dbnum; j++) {
2318 redisDb *db = server.db+j;
2319 dict *d = db->dict;
2320 if (dictSize(d) == 0) continue;
2321 di = dictGetIterator(d);
2322 if (!di) {
2323 fclose(fp);
2324 return REDIS_ERR;
2325 }
2326
2327 /* Write the SELECT DB opcode */
2328 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2329 if (rdbSaveLen(fp,j) == -1) goto werr;
2330
2331 /* Iterate this DB writing every entry */
2332 while((de = dictNext(di)) != NULL) {
2333 robj *key = dictGetEntryKey(de);
2334 robj *o = dictGetEntryVal(de);
2335 time_t expiretime = getExpire(db,key);
2336
2337 /* Save the expire time */
2338 if (expiretime != -1) {
2339 /* If this key is already expired skip it */
2340 if (expiretime < now) continue;
2341 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2342 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2343 }
2344 /* Save the key and associated value */
2345 if (rdbSaveType(fp,o->type) == -1) goto werr;
2346 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2347 if (o->type == REDIS_STRING) {
2348 /* Save a string value */
2349 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2350 } else if (o->type == REDIS_LIST) {
2351 /* Save a list value */
2352 list *list = o->ptr;
2353 listNode *ln;
2354
2355 listRewind(list);
2356 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2357 while((ln = listYield(list))) {
2358 robj *eleobj = listNodeValue(ln);
2359
2360 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2361 }
2362 } else if (o->type == REDIS_SET) {
2363 /* Save a set value */
2364 dict *set = o->ptr;
2365 dictIterator *di = dictGetIterator(set);
2366 dictEntry *de;
2367
2368 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2369 while((de = dictNext(di)) != NULL) {
2370 robj *eleobj = dictGetEntryKey(de);
2371
2372 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2373 }
2374 dictReleaseIterator(di);
2375 } else if (o->type == REDIS_ZSET) {
2376 /* Save a set value */
2377 zset *zs = o->ptr;
2378 dictIterator *di = dictGetIterator(zs->dict);
2379 dictEntry *de;
2380
2381 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2382 while((de = dictNext(di)) != NULL) {
2383 robj *eleobj = dictGetEntryKey(de);
2384 double *score = dictGetEntryVal(de);
2385
2386 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2387 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2388 }
2389 dictReleaseIterator(di);
2390 } else {
2391 assert(0 != 0);
2392 }
2393 }
2394 dictReleaseIterator(di);
2395 }
2396 /* EOF opcode */
2397 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2398
2399 /* Make sure data will not remain on the OS's output buffers */
2400 fflush(fp);
2401 fsync(fileno(fp));
2402 fclose(fp);
2403
2404 /* Use RENAME to make sure the DB file is changed atomically only
2405 * if the generate DB file is ok. */
2406 if (rename(tmpfile,filename) == -1) {
2407 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2408 unlink(tmpfile);
2409 return REDIS_ERR;
2410 }
2411 redisLog(REDIS_NOTICE,"DB saved on disk");
2412 server.dirty = 0;
2413 server.lastsave = time(NULL);
2414 return REDIS_OK;
2415
2416 werr:
2417 fclose(fp);
2418 unlink(tmpfile);
2419 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2420 if (di) dictReleaseIterator(di);
2421 return REDIS_ERR;
2422 }
2423
2424 static int rdbSaveBackground(char *filename) {
2425 pid_t childpid;
2426
2427 if (server.bgsaveinprogress) return REDIS_ERR;
2428 if ((childpid = fork()) == 0) {
2429 /* Child */
2430 close(server.fd);
2431 if (rdbSave(filename) == REDIS_OK) {
2432 exit(0);
2433 } else {
2434 exit(1);
2435 }
2436 } else {
2437 /* Parent */
2438 if (childpid == -1) {
2439 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2440 strerror(errno));
2441 return REDIS_ERR;
2442 }
2443 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2444 server.bgsaveinprogress = 1;
2445 server.bgsavechildpid = childpid;
2446 return REDIS_OK;
2447 }
2448 return REDIS_OK; /* unreached */
2449 }
2450
/* Remove "temp-<childpid>.rdb", the temp file rdbSave() writes when run by
 * process 'childpid'. unlink() errors (e.g. file already gone) are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int) childpid);
    unlink(path);
}
2457
/* Read a one-byte type/opcode. Returns it as 0..255, or -1 on EOF/error. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;

    return (fread(&type,1,1,fp) == 0) ? -1 : (int) type;
}
2463
/* Read a 32 bit signed time written by rdbSaveTime() (host byte order).
 * Returns -1 on short read, which callers treat as an error. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t t32;

    if (fread(&t32,4,1,fp) != 1) return -1;
    return (time_t) t32;
}
2469
2470 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2471 * of this file for a description of how this are stored on disk.
2472 *
2473 * isencoded is set to 1 if the readed length is not actually a length but
2474 * an "encoding type", check the above comments for more info */
2475 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2476 unsigned char buf[2];
2477 uint32_t len;
2478
2479 if (isencoded) *isencoded = 0;
2480 if (rdbver == 0) {
2481 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2482 return ntohl(len);
2483 } else {
2484 int type;
2485
2486 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2487 type = (buf[0]&0xC0)>>6;
2488 if (type == REDIS_RDB_6BITLEN) {
2489 /* Read a 6 bit len */
2490 return buf[0]&0x3F;
2491 } else if (type == REDIS_RDB_ENCVAL) {
2492 /* Read a 6 bit len encoding type */
2493 if (isencoded) *isencoded = 1;
2494 return buf[0]&0x3F;
2495 } else if (type == REDIS_RDB_14BITLEN) {
2496 /* Read a 14 bit len */
2497 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2498 return ((buf[0]&0x3F)<<8)|buf[1];
2499 } else {
2500 /* Read a 32 bit len */
2501 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2502 return ntohl(len);
2503 }
2504 }
2505 }
2506
2507 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2508 unsigned char enc[4];
2509 long long val;
2510
2511 if (enctype == REDIS_RDB_ENC_INT8) {
2512 if (fread(enc,1,1,fp) == 0) return NULL;
2513 val = (signed char)enc[0];
2514 } else if (enctype == REDIS_RDB_ENC_INT16) {
2515 uint16_t v;
2516 if (fread(enc,2,1,fp) == 0) return NULL;
2517 v = enc[0]|(enc[1]<<8);
2518 val = (int16_t)v;
2519 } else if (enctype == REDIS_RDB_ENC_INT32) {
2520 uint32_t v;
2521 if (fread(enc,4,1,fp) == 0) return NULL;
2522 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2523 val = (int32_t)v;
2524 } else {
2525 val = 0; /* anti-warning */
2526 assert(0!=0);
2527 }
2528 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2529 }
2530
2531 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2532 unsigned int len, clen;
2533 unsigned char *c = NULL;
2534 sds val = NULL;
2535
2536 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2537 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2538 if ((c = zmalloc(clen)) == NULL) goto err;
2539 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2540 if (fread(c,clen,1,fp) == 0) goto err;
2541 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2542 zfree(c);
2543 return createObject(REDIS_STRING,val);
2544 err:
2545 zfree(c);
2546 sdsfree(val);
2547 return NULL;
2548 }
2549
2550 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2551 int isencoded;
2552 uint32_t len;
2553 sds val;
2554
2555 len = rdbLoadLen(fp,rdbver,&isencoded);
2556 if (isencoded) {
2557 switch(len) {
2558 case REDIS_RDB_ENC_INT8:
2559 case REDIS_RDB_ENC_INT16:
2560 case REDIS_RDB_ENC_INT32:
2561 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2562 case REDIS_RDB_ENC_LZF:
2563 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2564 default:
2565 assert(0!=0);
2566 }
2567 }
2568
2569 if (len == REDIS_RDB_LENERR) return NULL;
2570 val = sdsnewlen(NULL,len);
2571 if (len && fread(val,len,1,fp) == 0) {
2572 sdsfree(val);
2573 return NULL;
2574 }
2575 return tryObjectSharing(createObject(REDIS_STRING,val));
2576 }
2577
2578 /* For information about double serialization check rdbSaveDoubleValue() */
2579 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2580 char buf[128];
2581 unsigned char len;
2582
2583 if (fread(&len,1,1,fp) == 0) return -1;
2584 switch(len) {
2585 case 255: *val = R_NegInf; return 0;
2586 case 254: *val = R_PosInf; return 0;
2587 case 253: *val = R_Nan; return 0;
2588 default:
2589 if (fread(buf,len,1,fp) == 0) return -1;
2590 sscanf(buf, "%lg", val);
2591 return 0;
2592 }
2593 }
2594
/* Load the DB from the dump file 'filename' into server.db[].
 *
 * Returns REDIS_ERR when the file can't be opened, when it doesn't start
 * with "REDIS", or when its format version is greater than 1. Returns
 * REDIS_OK once the REDIS_EOF opcode has been read. Every other failure is
 * fatal and terminates the process with exit(1):
 * - a short read or a failed object load ("Short read or OOM");
 * - a SELECT DB index not below server.dbnum;
 * - a key appearing twice. */
static int rdbLoad(char *filename) {
    FILE *fp;
    robj *keyobj = NULL;
    uint32_t dbid;
    int type, retval, rdbver;
    dict *d = server.db[0].dict;    /* loading starts in DB 0 */
    redisDb *db = server.db+0;
    char buf[1024];
    time_t expiretime = -1, now = time(NULL);

    fp = fopen(filename,"r");
    if (!fp) return REDIS_ERR;
    /* 9 byte header: "REDIS" followed by the format version digits */
    if (fread(buf,9,1,fp) == 0) goto eoferr;
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
        return REDIS_ERR;
    }
    rdbver = atoi(buf+5);
    if (rdbver > 1) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
        return REDIS_ERR;
    }
    while(1) {
        robj *o;

        /* Read type. */
        if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        if (type == REDIS_EXPIRETIME) {
            /* Optional expire time prefix, applied to the key that follows */
            if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again */
            if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        }
        if (type == REDIS_EOF) break;
        /* Handle SELECT DB opcode as a special case */
        if (type == REDIS_SELECTDB) {
            if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            db = server.db+dbid;
            d = db->dict;
            continue;
        }
        /* Read key */
        if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;

        if (type == REDIS_STRING) {
            /* Read string value */
            if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
            tryObjectEncoding(o);
        } else if (type == REDIS_LIST || type == REDIS_SET) {
            /* Read list/set value: element count, then the elements */
            uint32_t listlen;

            if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            o = (type == REDIS_LIST) ? createListObject() : createSetObject();
            /* Load every single element of the list/set */
            while(listlen--) {
                robj *ele;

                if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
                tryObjectEncoding(ele);
                if (type == REDIS_LIST) {
                    listAddNodeTail((list*)o->ptr,ele);
                } else {
                    /* NOTE(review): the dictAdd() result is ignored, so a
                     * duplicated set member in the file would leak 'ele'. */
                    dictAdd((dict*)o->ptr,ele,NULL);
                }
            }
        } else if (type == REDIS_ZSET) {
            /* Read sorted set value: element count, then (member, score) pairs */
            uint32_t zsetlen;
            zset *zs;

            if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            o = createZsetObject();
            zs = o->ptr;
            /* Load every single element of the sorted set */
            while(zsetlen--) {
                robj *ele;
                double *score = zmalloc(sizeof(double));

                if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
                tryObjectEncoding(ele);
                if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
                /* The member is referenced by both the dict and the skiplist */
                dictAdd(zs->dict,ele,score);
                zslInsert(zs->zsl,*score,ele);
                incrRefCount(ele); /* added to skiplist */
            }
        } else {
            assert(0 != 0);
        }
        /* Add the new object in the hash table */
        retval = dictAdd(d,keyobj,o);
        if (retval == DICT_ERR) {
            redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
            exit(1);
        }
        /* Set the expire time if needed */
        if (expiretime != -1) {
            setExpire(db,keyobj,expiretime);
            /* Delete this key if already expired */
            if (expiretime < now) deleteKey(db,keyobj);
            expiretime = -1;
        }
        /* The DB dict now holds key and value: forget the local references */
        keyobj = o = NULL;
    }
    fclose(fp);
    return REDIS_OK;

eoferr: /* unexpected end of file is handled here with a fatal exit */
    if (keyobj) decrRefCount(keyobj);
    redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
    exit(1);
    return REDIS_ERR; /* Just to avoid warning */
}
2717
2718 /*================================== Commands =============================== */
2719
2720 static void authCommand(redisClient *c) {
2721 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2722 c->authenticated = 1;
2723 addReply(c,shared.ok);
2724 } else {
2725 c->authenticated = 0;
2726 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2727 }
2728 }
2729
2730 static void pingCommand(redisClient *c) {
2731 addReply(c,shared.pong);
2732 }
2733
2734 static void echoCommand(redisClient *c) {
2735 addReplyBulkLen(c,c->argv[1]);
2736 addReply(c,c->argv[1]);
2737 addReply(c,shared.crlf);
2738 }
2739
2740 /*=================================== Strings =============================== */
2741
2742 static void setGenericCommand(redisClient *c, int nx) {
2743 int retval;
2744
2745 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2746 if (retval == DICT_ERR) {
2747 if (!nx) {
2748 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2749 incrRefCount(c->argv[2]);
2750 } else {
2751 addReply(c,shared.czero);
2752 return;
2753 }
2754 } else {
2755 incrRefCount(c->argv[1]);
2756 incrRefCount(c->argv[2]);
2757 }
2758 server.dirty++;
2759 removeExpire(c->db,c->argv[1]);
2760 addReply(c, nx ? shared.cone : shared.ok);
2761 }
2762
2763 static void setCommand(redisClient *c) {
2764 setGenericCommand(c,0);
2765 }
2766
2767 static void setnxCommand(redisClient *c) {
2768 setGenericCommand(c,1);
2769 }
2770
2771 static void getCommand(redisClient *c) {
2772 robj *o = lookupKeyRead(c->db,c->argv[1]);
2773
2774 if (o == NULL) {
2775 addReply(c,shared.nullbulk);
2776 } else {
2777 if (o->type != REDIS_STRING) {
2778 addReply(c,shared.wrongtypeerr);
2779 } else {
2780 addReplyBulkLen(c,o);
2781 addReply(c,o);
2782 addReply(c,shared.crlf);
2783 }
2784 }
2785 }
2786
2787 static void getsetCommand(redisClient *c) {
2788 getCommand(c);
2789 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2790 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2791 } else {
2792 incrRefCount(c->argv[1]);
2793 }
2794 incrRefCount(c->argv[2]);
2795 server.dirty++;
2796 removeExpire(c->db,c->argv[1]);
2797 }
2798
2799 static void mgetCommand(redisClient *c) {
2800 int j;
2801
2802 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2803 for (j = 1; j < c->argc; j++) {
2804 robj *o = lookupKeyRead(c->db,c->argv[j]);
2805 if (o == NULL) {
2806 addReply(c,shared.nullbulk);
2807 } else {
2808 if (o->type != REDIS_STRING) {
2809 addReply(c,shared.nullbulk);
2810 } else {
2811 addReplyBulkLen(c,o);
2812 addReply(c,o);
2813 addReply(c,shared.crlf);
2814 }
2815 }
2816 }
2817 }
2818
2819 static void incrDecrCommand(redisClient *c, long long incr) {
2820 long long value;
2821 int retval;
2822 robj *o;
2823
2824 o = lookupKeyWrite(c->db,c->argv[1]);
2825 if (o == NULL) {
2826 value = 0;
2827 } else {
2828 if (o->type != REDIS_STRING) {
2829 value = 0;
2830 } else {
2831 char *eptr;
2832
2833 if (o->encoding == REDIS_ENCODING_RAW)
2834 value = strtoll(o->ptr, &eptr, 10);
2835 else if (o->encoding == REDIS_ENCODING_INT)
2836 value = (long)o->ptr;
2837 else
2838 assert(1 != 1);
2839 }
2840 }
2841
2842 value += incr;
2843 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2844 tryObjectEncoding(o);
2845 retval = dictAdd(c->db->dict,c->argv[1],o);
2846 if (retval == DICT_ERR) {
2847 dictReplace(c->db->dict,c->argv[1],o);
2848 removeExpire(c->db,c->argv[1]);
2849 } else {
2850 incrRefCount(c->argv[1]);
2851 }
2852 server.dirty++;
2853 addReply(c,shared.colon);
2854 addReply(c,o);
2855 addReply(c,shared.crlf);
2856 }
2857
2858 static void incrCommand(redisClient *c) {
2859 incrDecrCommand(c,1);
2860 }
2861
2862 static void decrCommand(redisClient *c) {
2863 incrDecrCommand(c,-1);
2864 }
2865
2866 static void incrbyCommand(redisClient *c) {
2867 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2868 incrDecrCommand(c,incr);
2869 }
2870
2871 static void decrbyCommand(redisClient *c) {
2872 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2873 incrDecrCommand(c,-incr);
2874 }
2875
2876 /* ========================= Type agnostic commands ========================= */
2877
2878 static void delCommand(redisClient *c) {
2879 int deleted = 0, j;
2880
2881 for (j = 1; j < c->argc; j++) {
2882 if (deleteKey(c->db,c->argv[j])) {
2883 server.dirty++;
2884 deleted++;
2885 }
2886 }
2887 switch(deleted) {
2888 case 0:
2889 addReply(c,shared.czero);
2890 break;
2891 case 1:
2892 addReply(c,shared.cone);
2893 break;
2894 default:
2895 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2896 break;
2897 }
2898 }
2899
2900 static void existsCommand(redisClient *c) {
2901 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2902 }
2903
2904 static void selectCommand(redisClient *c) {
2905 int id = atoi(c->argv[1]->ptr);
2906
2907 if (selectDb(c,id) == REDIS_ERR) {
2908 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2909 } else {
2910 addReply(c,shared.ok);
2911 }
2912 }
2913
2914 static void randomkeyCommand(redisClient *c) {
2915 dictEntry *de;
2916
2917 while(1) {
2918 de = dictGetRandomKey(c->db->dict);
2919 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2920 }
2921 if (de == NULL) {
2922 addReply(c,shared.plus);
2923 addReply(c,shared.crlf);
2924 } else {
2925 addReply(c,shared.plus);
2926 addReply(c,dictGetEntryKey(de));
2927 addReply(c,shared.crlf);
2928 }
2929 }
2930
2931 static void keysCommand(redisClient *c) {
2932 dictIterator *di;
2933 dictEntry *de;
2934 sds pattern = c->argv[1]->ptr;
2935 int plen = sdslen(pattern);
2936 int numkeys = 0, keyslen = 0;
2937 robj *lenobj = createObject(REDIS_STRING,NULL);
2938
2939 di = dictGetIterator(c->db->dict);
2940 addReply(c,lenobj);
2941 decrRefCount(lenobj);
2942 while((de = dictNext(di)) != NULL) {
2943 robj *keyobj = dictGetEntryKey(de);
2944
2945 sds key = keyobj->ptr;
2946 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2947 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2948 if (expireIfNeeded(c->db,keyobj) == 0) {
2949 if (numkeys != 0)
2950 addReply(c,shared.space);
2951 addReply(c,keyobj);
2952 numkeys++;
2953 keyslen += sdslen(key);
2954 }
2955 }
2956 }
2957 dictReleaseIterator(di);
2958 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2959 addReply(c,shared.crlf);
2960 }
2961
2962 static void dbsizeCommand(redisClient *c) {
2963 addReplySds(c,
2964 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2965 }
2966
2967 static void lastsaveCommand(redisClient *c) {
2968 addReplySds(c,
2969 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2970 }
2971
2972 static void typeCommand(redisClient *c) {
2973 robj *o;
2974 char *type;
2975
2976 o = lookupKeyRead(c->db,c->argv[1]);
2977 if (o == NULL) {
2978 type = "+none";
2979 } else {
2980 switch(o->type) {
2981 case REDIS_STRING: type = "+string"; break;
2982 case REDIS_LIST: type = "+list"; break;
2983 case REDIS_SET: type = "+set"; break;
2984 default: type = "unknown"; break;
2985 }
2986 }
2987 addReplySds(c,sdsnew(type));
2988 addReply(c,shared.crlf);
2989 }
2990
2991 static void saveCommand(redisClient *c) {
2992 if (server.bgsaveinprogress) {
2993 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2994 return;
2995 }
2996 if (rdbSave(server.dbfilename) == REDIS_OK) {
2997 addReply(c,shared.ok);
2998 } else {
2999 addReply(c,shared.err);
3000 }
3001 }
3002
3003 static void bgsaveCommand(redisClient *c) {
3004 if (server.bgsaveinprogress) {
3005 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3006 return;
3007 }
3008 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3009 addReply(c,shared.ok);
3010 } else {
3011 addReply(c,shared.err);
3012 }
3013 }
3014
3015 static void shutdownCommand(redisClient *c) {
3016 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3017 /* Kill the saving child if there is a background saving in progress.
3018 We want to avoid race conditions, for instance our saving child may
3019 overwrite the synchronous saving did by SHUTDOWN. */
3020 if (server.bgsaveinprogress) {
3021 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3022 kill(server.bgsavechildpid,SIGKILL);
3023 rdbRemoveTempFile(server.bgsavechildpid);
3024 }
3025 /* SYNC SAVE */
3026 if (rdbSave(server.dbfilename) == REDIS_OK) {
3027 if (server.daemonize)
3028 unlink(server.pidfile);
3029 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3030 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3031 exit(1);
3032 } else {
3033 /* Ooops.. error saving! The best we can do is to continue operating.
3034 * Note that if there was a background saving process, in the next
3035 * cron() Redis will be notified that the background saving aborted,
3036 * handling special stuff like slaves pending for synchronization... */
3037 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3038 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3039 }
3040 }
3041
3042 static void renameGenericCommand(redisClient *c, int nx) {
3043 robj *o;
3044
3045 /* To use the same key as src and dst is probably an error */
3046 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3047 addReply(c,shared.sameobjecterr);
3048 return;
3049 }
3050
3051 o = lookupKeyWrite(c->db,c->argv[1]);
3052 if (o == NULL) {
3053 addReply(c,shared.nokeyerr);
3054 return;
3055 }
3056 incrRefCount(o);
3057 deleteIfVolatile(c->db,c->argv[2]);
3058 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3059 if (nx) {
3060 decrRefCount(o);
3061 addReply(c,shared.czero);
3062 return;
3063 }
3064 dictReplace(c->db->dict,c->argv[2],o);
3065 } else {
3066 incrRefCount(c->argv[2]);
3067 }
3068 deleteKey(c->db,c->argv[1]);
3069 server.dirty++;
3070 addReply(c,nx ? shared.cone : shared.ok);
3071 }
3072
3073 static void renameCommand(redisClient *c) {
3074 renameGenericCommand(c,0);
3075 }
3076
3077 static void renamenxCommand(redisClient *c) {
3078 renameGenericCommand(c,1);
3079 }
3080
3081 static void moveCommand(redisClient *c) {
3082 robj *o;
3083 redisDb *src, *dst;
3084 int srcid;
3085
3086 /* Obtain source and target DB pointers */
3087 src = c->db;
3088 srcid = c->db->id;
3089 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3090 addReply(c,shared.outofrangeerr);
3091 return;
3092 }
3093 dst = c->db;
3094 selectDb(c,srcid); /* Back to the source DB */
3095
3096 /* If the user is moving using as target the same
3097 * DB as the source DB it is probably an error. */
3098 if (src == dst) {
3099 addReply(c,shared.sameobjecterr);
3100 return;
3101 }
3102
3103 /* Check if the element exists and get a reference */
3104 o = lookupKeyWrite(c->db,c->argv[1]);
3105 if (!o) {
3106 addReply(c,shared.czero);
3107 return;
3108 }
3109
3110 /* Try to add the element to the target DB */
3111 deleteIfVolatile(dst,c->argv[1]);
3112 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3113 addReply(c,shared.czero);
3114 return;
3115 }
3116 incrRefCount(c->argv[1]);
3117 incrRefCount(o);
3118
3119 /* OK! key moved, free the entry in the source DB */
3120 deleteKey(src,c->argv[1]);
3121 server.dirty++;
3122 addReply(c,shared.cone);
3123 }
3124
3125 /* =================================== Lists ================================ */
3126 static void pushGenericCommand(redisClient *c, int where) {
3127 robj *lobj;
3128 list *list;
3129
3130 lobj = lookupKeyWrite(c->db,c->argv[1]);
3131 if (lobj == NULL) {
3132 lobj = createListObject();
3133 list = lobj->ptr;
3134 if (where == REDIS_HEAD) {
3135 listAddNodeHead(list,c->argv[2]);
3136 } else {
3137 listAddNodeTail(list,c->argv[2]);
3138 }
3139 dictAdd(c->db->dict,c->argv[1],lobj);
3140 incrRefCount(c->argv[1]);
3141 incrRefCount(c->argv[2]);
3142 } else {
3143 if (lobj->type != REDIS_LIST) {
3144 addReply(c,shared.wrongtypeerr);
3145 return;
3146 }
3147 list = lobj->ptr;
3148 if (where == REDIS_HEAD) {
3149 listAddNodeHead(list,c->argv[2]);
3150 } else {
3151 listAddNodeTail(list,c->argv[2]);
3152 }
3153 incrRefCount(c->argv[2]);
3154 }
3155 server.dirty++;
3156 addReply(c,shared.ok);
3157 }
3158
3159 static void lpushCommand(redisClient *c) {
3160 pushGenericCommand(c,REDIS_HEAD);
3161 }
3162
3163 static void rpushCommand(redisClient *c) {
3164 pushGenericCommand(c,REDIS_TAIL);
3165 }
3166
3167 static void llenCommand(redisClient *c) {
3168 robj *o;
3169 list *l;
3170
3171 o = lookupKeyRead(c->db,c->argv[1]);
3172 if (o == NULL) {
3173 addReply(c,shared.czero);
3174 return;
3175 } else {
3176 if (o->type != REDIS_LIST) {
3177 addReply(c,shared.wrongtypeerr);
3178 } else {
3179 l = o->ptr;
3180 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3181 }
3182 }
3183 }
3184
3185 static void lindexCommand(redisClient *c) {
3186 robj *o;
3187 int index = atoi(c->argv[2]->ptr);
3188
3189 o = lookupKeyRead(c->db,c->argv[1]);
3190 if (o == NULL) {
3191 addReply(c,shared.nullbulk);
3192 } else {
3193 if (o->type != REDIS_LIST) {
3194 addReply(c,shared.wrongtypeerr);
3195 } else {
3196 list *list = o->ptr;
3197 listNode *ln;
3198
3199 ln = listIndex(list, index);
3200 if (ln == NULL) {
3201 addReply(c,shared.nullbulk);
3202 } else {
3203 robj *ele = listNodeValue(ln);
3204 addReplyBulkLen(c,ele);
3205 addReply(c,ele);
3206 addReply(c,shared.crlf);
3207 }
3208 }
3209 }
3210 }
3211
3212 static void lsetCommand(redisClient *c) {
3213 robj *o;
3214 int index = atoi(c->argv[2]->ptr);
3215
3216 o = lookupKeyWrite(c->db,c->argv[1]);
3217 if (o == NULL) {
3218 addReply(c,shared.nokeyerr);
3219 } else {
3220 if (o->type != REDIS_LIST) {
3221 addReply(c,shared.wrongtypeerr);
3222 } else {
3223 list *list = o->ptr;
3224 listNode *ln;
3225
3226 ln = listIndex(list, index);
3227 if (ln == NULL) {
3228 addReply(c,shared.outofrangeerr);
3229 } else {
3230 robj *ele = listNodeValue(ln);
3231
3232 decrRefCount(ele);
3233 listNodeValue(ln) = c->argv[3];
3234 incrRefCount(c->argv[3]);
3235 addReply(c,shared.ok);
3236 server.dirty++;
3237 }
3238 }
3239 }
3240 }
3241
3242 static void popGenericCommand(redisClient *c, int where) {
3243 robj *o;
3244
3245 o = lookupKeyWrite(c->db,c->argv[1]);
3246 if (o == NULL) {
3247 addReply(c,shared.nullbulk);
3248 } else {
3249 if (o->type != REDIS_LIST) {
3250 addReply(c,shared.wrongtypeerr);
3251 } else {
3252 list *list = o->ptr;
3253 listNode *ln;
3254
3255 if (where == REDIS_HEAD)
3256 ln = listFirst(list);
3257 else
3258 ln = listLast(list);
3259
3260 if (ln == NULL) {
3261 addReply(c,shared.nullbulk);
3262 } else {
3263 robj *ele = listNodeValue(ln);
3264 addReplyBulkLen(c,ele);
3265 addReply(c,ele);
3266 addReply(c,shared.crlf);
3267 listDelNode(list,ln);
3268 server.dirty++;
3269 }
3270 }
3271 }
3272 }
3273
3274 static void lpopCommand(redisClient *c) {
3275 popGenericCommand(c,REDIS_HEAD);
3276 }
3277
3278 static void rpopCommand(redisClient *c) {
3279 popGenericCommand(c,REDIS_TAIL);
3280 }
3281
3282 static void lrangeCommand(redisClient *c) {
3283 robj *o;
3284 int start = atoi(c->argv[2]->ptr);
3285 int end = atoi(c->argv[3]->ptr);
3286
3287 o = lookupKeyRead(c->db,c->argv[1]);
3288 if (o == NULL) {
3289 addReply(c,shared.nullmultibulk);
3290 } else {
3291 if (o->type != REDIS_LIST) {
3292 addReply(c,shared.wrongtypeerr);
3293 } else {
3294 list *list = o->ptr;
3295 listNode *ln;
3296 int llen = listLength(list);
3297 int rangelen, j;
3298 robj *ele;
3299
3300 /* convert negative indexes */
3301 if (start < 0) start = llen+start;
3302 if (end < 0) end = llen+end;
3303 if (start < 0) start = 0;
3304 if (end < 0) end = 0;
3305
3306 /* indexes sanity checks */
3307 if (start > end || start >= llen) {
3308 /* Out of range start or start > end result in empty list */
3309 addReply(c,shared.emptymultibulk);
3310 return;
3311 }
3312 if (end >= llen) end = llen-1;
3313 rangelen = (end-start)+1;
3314
3315 /* Return the result in form of a multi-bulk reply */
3316 ln = listIndex(list, start);
3317 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3318 for (j = 0; j < rangelen; j++) {
3319 ele = listNodeValue(ln);
3320 addReplyBulkLen(c,ele);
3321 addReply(c,ele);
3322 addReply(c,shared.crlf);
3323 ln = ln->next;
3324 }
3325 }
3326 }
3327 }
3328
3329 static void ltrimCommand(redisClient *c) {
3330 robj *o;
3331 int start = atoi(c->argv[2]->ptr);
3332 int end = atoi(c->argv[3]->ptr);
3333
3334 o = lookupKeyWrite(c->db,c->argv[1]);
3335 if (o == NULL) {
3336 addReply(c,shared.nokeyerr);
3337 } else {
3338 if (o->type != REDIS_LIST) {
3339 addReply(c,shared.wrongtypeerr);
3340 } else {
3341 list *list = o->ptr;
3342 listNode *ln;
3343 int llen = listLength(list);
3344 int j, ltrim, rtrim;
3345
3346 /* convert negative indexes */
3347 if (start < 0) start = llen+start;
3348 if (end < 0) end = llen+end;
3349 if (start < 0) start = 0;
3350 if (end < 0) end = 0;
3351
3352 /* indexes sanity checks */
3353 if (start > end || start >= llen) {
3354 /* Out of range start or start > end result in empty list */
3355 ltrim = llen;
3356 rtrim = 0;
3357 } else {
3358 if (end >= llen) end = llen-1;
3359 ltrim = start;
3360 rtrim = llen-end-1;
3361 }
3362
3363 /* Remove list elements to perform the trim */
3364 for (j = 0; j < ltrim; j++) {
3365 ln = listFirst(list);
3366 listDelNode(list,ln);
3367 }
3368 for (j = 0; j < rtrim; j++) {
3369 ln = listLast(list);
3370 listDelNode(list,ln);
3371 }
3372 server.dirty++;
3373 addReply(c,shared.ok);
3374 }
3375 }
3376 }
3377
3378 static void lremCommand(redisClient *c) {
3379 robj *o;
3380
3381 o = lookupKeyWrite(c->db,c->argv[1]);
3382 if (o == NULL) {
3383 addReply(c,shared.czero);
3384 } else {
3385 if (o->type != REDIS_LIST) {
3386 addReply(c,shared.wrongtypeerr);
3387 } else {
3388 list *list = o->ptr;
3389 listNode *ln, *next;
3390 int toremove = atoi(c->argv[2]->ptr);
3391 int removed = 0;
3392 int fromtail = 0;
3393
3394 if (toremove < 0) {
3395 toremove = -toremove;
3396 fromtail = 1;
3397 }
3398 ln = fromtail ? list->tail : list->head;
3399 while (ln) {
3400 robj *ele = listNodeValue(ln);
3401
3402 next = fromtail ? ln->prev : ln->next;
3403 if (compareStringObjects(ele,c->argv[3]) == 0) {
3404 listDelNode(list,ln);
3405 server.dirty++;
3406 removed++;
3407 if (toremove && removed == toremove) break;
3408 }
3409 ln = next;
3410 }
3411 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3412 }
3413 }
3414 }
3415
3416 /* ==================================== Sets ================================ */
3417
3418 static void saddCommand(redisClient *c) {
3419 robj *set;
3420
3421 set = lookupKeyWrite(c->db,c->argv[1]);
3422 if (set == NULL) {
3423 set = createSetObject();
3424 dictAdd(c->db->dict,c->argv[1],set);
3425 incrRefCount(c->argv[1]);
3426 } else {
3427 if (set->type != REDIS_SET) {
3428 addReply(c,shared.wrongtypeerr);
3429 return;
3430 }
3431 }
3432 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3433 incrRefCount(c->argv[2]);
3434 server.dirty++;
3435 addReply(c,shared.cone);
3436 } else {
3437 addReply(c,shared.czero);
3438 }
3439 }
3440
3441 static void sremCommand(redisClient *c) {
3442 robj *set;
3443
3444 set = lookupKeyWrite(c->db,c->argv[1]);
3445 if (set == NULL) {
3446 addReply(c,shared.czero);
3447 } else {
3448 if (set->type != REDIS_SET) {
3449 addReply(c,shared.wrongtypeerr);
3450 return;
3451 }
3452 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3453 server.dirty++;
3454 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3455 addReply(c,shared.cone);
3456 } else {
3457 addReply(c,shared.czero);
3458 }
3459 }
3460 }
3461
3462 static void smoveCommand(redisClient *c) {
3463 robj *srcset, *dstset;
3464
3465 srcset = lookupKeyWrite(c->db,c->argv[1]);
3466 dstset = lookupKeyWrite(c->db,c->argv[2]);
3467
3468 /* If the source key does not exist return 0, if it's of the wrong type
3469 * raise an error */
3470 if (srcset == NULL || srcset->type != REDIS_SET) {
3471 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3472 return;
3473 }
3474 /* Error if the destination key is not a set as well */
3475 if (dstset && dstset->type != REDIS_SET) {
3476 addReply(c,shared.wrongtypeerr);
3477 return;
3478 }
3479 /* Remove the element from the source set */
3480 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3481 /* Key not found in the src set! return zero */
3482 addReply(c,shared.czero);
3483 return;
3484 }
3485 server.dirty++;
3486 /* Add the element to the destination set */
3487 if (!dstset) {
3488 dstset = createSetObject();
3489 dictAdd(c->db->dict,c->argv[2],dstset);
3490 incrRefCount(c->argv[2]);
3491 }
3492 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3493 incrRefCount(c->argv[3]);
3494 addReply(c,shared.cone);
3495 }
3496
3497 static void sismemberCommand(redisClient *c) {
3498 robj *set;
3499
3500 set = lookupKeyRead(c->db,c->argv[1]);
3501 if (set == NULL) {
3502 addReply(c,shared.czero);
3503 } else {
3504 if (set->type != REDIS_SET) {
3505 addReply(c,shared.wrongtypeerr);
3506 return;
3507 }
3508 if (dictFind(set->ptr,c->argv[2]))
3509 addReply(c,shared.cone);
3510 else
3511 addReply(c,shared.czero);
3512 }
3513 }
3514
3515 static void scardCommand(redisClient *c) {
3516 robj *o;
3517 dict *s;
3518
3519 o = lookupKeyRead(c->db,c->argv[1]);
3520 if (o == NULL) {
3521 addReply(c,shared.czero);
3522 return;
3523 } else {
3524 if (o->type != REDIS_SET) {
3525 addReply(c,shared.wrongtypeerr);
3526 } else {
3527 s = o->ptr;
3528 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3529 dictSize(s)));
3530 }
3531 }
3532 }
3533
3534 static void spopCommand(redisClient *c) {
3535 robj *set;
3536 dictEntry *de;
3537
3538 set = lookupKeyWrite(c->db,c->argv[1]);
3539 if (set == NULL) {
3540 addReply(c,shared.nullbulk);
3541 } else {
3542 if (set->type != REDIS_SET) {
3543 addReply(c,shared.wrongtypeerr);
3544 return;
3545 }
3546 de = dictGetRandomKey(set->ptr);
3547 if (de == NULL) {
3548 addReply(c,shared.nullbulk);
3549 } else {
3550 robj *ele = dictGetEntryKey(de);
3551
3552 addReplyBulkLen(c,ele);
3553 addReply(c,ele);
3554 addReply(c,shared.crlf);
3555 dictDelete(set->ptr,ele);
3556 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3557 server.dirty++;
3558 }
3559 }
3560 }
3561
3562 static void srandmemberCommand(redisClient *c) {
3563 robj *set;
3564 dictEntry *de;
3565
3566 set = lookupKeyRead(c->db,c->argv[1]);
3567 if (set == NULL) {
3568 addReply(c,shared.nullbulk);
3569 } else {
3570 if (set->type != REDIS_SET) {
3571 addReply(c,shared.wrongtypeerr);
3572 return;
3573 }
3574 de = dictGetRandomKey(set->ptr);
3575 if (de == NULL) {
3576 addReply(c,shared.nullbulk);
3577 } else {
3578 robj *ele = dictGetEntryKey(de);
3579
3580 addReplyBulkLen(c,ele);
3581 addReply(c,ele);
3582 addReply(c,shared.crlf);
3583 }
3584 }
3585 }
3586
3587 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3588 dict **d1 = (void*) s1, **d2 = (void*) s2;
3589
3590 return dictSize(*d1)-dictSize(*d2);
3591 }
3592
3593 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3594 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3595 dictIterator *di;
3596 dictEntry *de;
3597 robj *lenobj = NULL, *dstset = NULL;
3598 int j, cardinality = 0;
3599
3600 for (j = 0; j < setsnum; j++) {
3601 robj *setobj;
3602
3603 setobj = dstkey ?
3604 lookupKeyWrite(c->db,setskeys[j]) :
3605 lookupKeyRead(c->db,setskeys[j]);
3606 if (!setobj) {
3607 zfree(dv);
3608 if (dstkey) {
3609 deleteKey(c->db,dstkey);
3610 addReply(c,shared.ok);
3611 } else {
3612 addReply(c,shared.nullmultibulk);
3613 }
3614 return;
3615 }
3616 if (setobj->type != REDIS_SET) {
3617 zfree(dv);
3618 addReply(c,shared.wrongtypeerr);
3619 return;
3620 }
3621 dv[j] = setobj->ptr;
3622 }
3623 /* Sort sets from the smallest to largest, this will improve our
3624 * algorithm's performace */
3625 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3626
3627 /* The first thing we should output is the total number of elements...
3628 * since this is a multi-bulk write, but at this stage we don't know
3629 * the intersection set size, so we use a trick, append an empty object
3630 * to the output list and save the pointer to later modify it with the
3631 * right length */
3632 if (!dstkey) {
3633 lenobj = createObject(REDIS_STRING,NULL);
3634 addReply(c,lenobj);
3635 decrRefCount(lenobj);
3636 } else {
3637 /* If we have a target key where to store the resulting set
3638 * create this key with an empty set inside */
3639 dstset = createSetObject();
3640 }
3641
3642 /* Iterate all the elements of the first (smallest) set, and test
3643 * the element against all the other sets, if at least one set does
3644 * not include the element it is discarded */
3645 di = dictGetIterator(dv[0]);
3646
3647 while((de = dictNext(di)) != NULL) {
3648 robj *ele;
3649
3650 for (j = 1; j < setsnum; j++)
3651 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3652 if (j != setsnum)
3653 continue; /* at least one set does not contain the member */
3654 ele = dictGetEntryKey(de);
3655 if (!dstkey) {
3656 addReplyBulkLen(c,ele);
3657 addReply(c,ele);
3658 addReply(c,shared.crlf);
3659 cardinality++;
3660 } else {
3661 dictAdd(dstset->ptr,ele,NULL);
3662 incrRefCount(ele);
3663 }
3664 }
3665 dictReleaseIterator(di);
3666
3667 if (dstkey) {
3668 /* Store the resulting set into the target */
3669 deleteKey(c->db,dstkey);
3670 dictAdd(c->db->dict,dstkey,dstset);
3671 incrRefCount(dstkey);
3672 }
3673
3674 if (!dstkey) {
3675 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3676 } else {
3677 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3678 dictSize((dict*)dstset->ptr)));
3679 server.dirty++;
3680 }
3681 zfree(dv);
3682 }
3683
3684 static void sinterCommand(redisClient *c) {
3685 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3686 }
3687
3688 static void sinterstoreCommand(redisClient *c) {
3689 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3690 }
3691
3692 #define REDIS_OP_UNION 0
3693 #define REDIS_OP_DIFF 1
3694
3695 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3696 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3697 dictIterator *di;
3698 dictEntry *de;
3699 robj *dstset = NULL;
3700 int j, cardinality = 0;
3701
3702 for (j = 0; j < setsnum; j++) {
3703 robj *setobj;
3704
3705 setobj = dstkey ?
3706 lookupKeyWrite(c->db,setskeys[j]) :
3707 lookupKeyRead(c->db,setskeys[j]);
3708 if (!setobj) {
3709 dv[j] = NULL;
3710 continue;
3711 }
3712 if (setobj->type != REDIS_SET) {
3713 zfree(dv);
3714 addReply(c,shared.wrongtypeerr);
3715 return;
3716 }
3717 dv[j] = setobj->ptr;
3718 }
3719
3720 /* We need a temp set object to store our union. If the dstkey
3721 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3722 * this set object will be the resulting object to set into the target key*/
3723 dstset = createSetObject();
3724
3725 /* Iterate all the elements of all the sets, add every element a single
3726 * time to the result set */
3727 for (j = 0; j < setsnum; j++) {
3728 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3729 if (!dv[j]) continue; /* non existing keys are like empty sets */
3730
3731 di = dictGetIterator(dv[j]);
3732
3733 while((de = dictNext(di)) != NULL) {
3734 robj *ele;
3735
3736 /* dictAdd will not add the same element multiple times */
3737 ele = dictGetEntryKey(de);
3738 if (op == REDIS_OP_UNION || j == 0) {
3739 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3740 incrRefCount(ele);
3741 cardinality++;
3742 }
3743 } else if (op == REDIS_OP_DIFF) {
3744 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3745 cardinality--;
3746 }
3747 }
3748 }
3749 dictReleaseIterator(di);
3750
3751 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3752 }
3753
3754 /* Output the content of the resulting set, if not in STORE mode */
3755 if (!dstkey) {
3756 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3757 di = dictGetIterator(dstset->ptr);
3758 while((de = dictNext(di)) != NULL) {
3759 robj *ele;
3760
3761 ele = dictGetEntryKey(de);
3762 addReplyBulkLen(c,ele);
3763 addReply(c,ele);
3764 addReply(c,shared.crlf);
3765 }
3766 dictReleaseIterator(di);
3767 } else {
3768 /* If we have a target key where to store the resulting set
3769 * create this key with the result set inside */
3770 deleteKey(c->db,dstkey);
3771 dictAdd(c->db->dict,dstkey,dstset);
3772 incrRefCount(dstkey);
3773 }
3774
3775 /* Cleanup */
3776 if (!dstkey) {
3777 decrRefCount(dstset);
3778 } else {
3779 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3780 dictSize((dict*)dstset->ptr)));
3781 server.dirty++;
3782 }
3783 zfree(dv);
3784 }
3785
3786 static void sunionCommand(redisClient *c) {
3787 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3788 }
3789
3790 static void sunionstoreCommand(redisClient *c) {
3791 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3792 }
3793
3794 static void sdiffCommand(redisClient *c) {
3795 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3796 }
3797
3798 static void sdiffstoreCommand(redisClient *c) {
3799 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3800 }
3801
3802 /* ==================================== ZSets =============================== */
3803
/* ZSETs are ordered sets that use two data structures holding the same
 * elements, in order to get O(log(N)) INSERT and REMOVE operations on a
 * sorted data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by score in this "view"). */
3811
/* This skiplist implementation is almost a C translation of the original
 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
 * Alternative to Balanced Trees", modified in three ways:
 * a) this implementation allows for repeated scores.
 * b) the comparison is not just by key (our 'score') but also by satellite
 *    data (the object).
 * c) there is a back pointer, so it's a doubly linked list, with the back
 *    pointers only at the lowest level. This allows traversing the list
 *    from tail to head, which is useful for ZREVRANGE. */
3820
3821 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
3822 zskiplistNode *zn = zmalloc(sizeof(*zn));
3823
3824 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
3825 zn->score = score;
3826 zn->obj = obj;
3827 return zn;
3828 }
3829
3830 static zskiplist *zslCreate(void) {
3831 int j;
3832 zskiplist *zsl;
3833
3834 zsl = zmalloc(sizeof(*zsl));
3835 zsl->level = 1;
3836 zsl->length = 0;
3837 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
3838 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
3839 zsl->header->forward[j] = NULL;
3840 zsl->header->backward = NULL;
3841 zsl->tail = NULL;
3842 return zsl;
3843 }
3844
3845 static void zslFreeNode(zskiplistNode *node) {
3846 decrRefCount(node->obj);
3847 zfree(node->forward);
3848 zfree(node);
3849 }
3850
3851 static void zslFree(zskiplist *zsl) {
3852 zskiplistNode *node = zsl->header->forward[0], *next;
3853
3854 zfree(zsl->header->forward);
3855 zfree(zsl->header);
3856 while(node) {
3857 next = node->forward[0];
3858 zslFreeNode(node);
3859 node = next;
3860 }
3861 zfree(zsl);
3862 }
3863
3864 static int zslRandomLevel(void) {
3865 int level = 1;
3866 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
3867 level += 1;
3868 return level;
3869 }
3870
3871 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
3872 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3873 int i, level;
3874
3875 x = zsl->header;
3876 for (i = zsl->level-1; i >= 0; i--) {
3877 while (x->forward[i] &&
3878 (x->forward[i]->score < score ||
3879 (x->forward[i]->score == score &&
3880 compareStringObjects(x->forward[i]->obj,obj) < 0)))
3881 x = x->forward[i];
3882 update[i] = x;
3883 }
3884 /* we assume the key is not already inside, since we allow duplicated
3885 * scores, and the re-insertion of score and redis object should never
3886 * happpen since the caller of zslInsert() should test in the hash table
3887 * if the element is already inside or not. */
3888 level = zslRandomLevel();
3889 if (level > zsl->level) {
3890 for (i = zsl->level; i < level; i++)
3891 update[i] = zsl->header;
3892 zsl->level = level;
3893 }
3894 x = zslCreateNode(level,score,obj);
3895 for (i = 0; i < level; i++) {
3896 x->forward[i] = update[i]->forward[i];
3897 update[i]->forward[i] = x;
3898 }
3899 x->backward = (update[0] == zsl->header) ? NULL : update[0];
3900 if (x->forward[0])
3901 x->forward[0]->backward = x;
3902 else
3903 zsl->tail = x;
3904 zsl->length++;
3905 }
3906
3907 /* Delete an element with matching score/object from the skiplist. */
3908 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
3909 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3910 int i;
3911
3912 x = zsl->header;
3913 for (i = zsl->level-1; i >= 0; i--) {
3914 while (x->forward[i] &&
3915 (x->forward[i]->score < score ||
3916 (x->forward[i]->score == score &&
3917 compareStringObjects(x->forward[i]->obj,obj) < 0)))
3918 x = x->forward[i];
3919 update[i] = x;
3920 }
3921 /* We may have multiple elements with the same score, what we need
3922 * is to find the element with both the right score and object. */
3923 x = x->forward[0];
3924 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
3925 for (i = 0; i < zsl->level; i++) {
3926 if (update[i]->forward[i] != x) break;
3927 update[i]->forward[i] = x->forward[i];
3928 }
3929 if (x->forward[0]) {
3930 x->forward[0]->backward = (x->backward == zsl->header) ?
3931 NULL : x->backward;
3932 } else {
3933 zsl->tail = x->backward;
3934 }
3935 zslFreeNode(x);
3936 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
3937 zsl->level--;
3938 zsl->length--;
3939 return 1;
3940 } else {
3941 return 0; /* not found */
3942 }
3943 return 0; /* not found */
3944 }
3945
3946 /* Delete all the elements with score between min and max from the skiplist.
3947 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
3948 * Note that this function takes the reference to the hash table view of the
3949 * sorted set, in order to remove the elements from the hash table too. */
3950 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
3951 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3952 unsigned long removed = 0;
3953 int i;
3954
3955 x = zsl->header;
3956 for (i = zsl->level-1; i >= 0; i--) {
3957 while (x->forward[i] && x->forward[i]->score < min)
3958 x = x->forward[i];
3959 update[i] = x;
3960 }
3961 /* We may have multiple elements with the same score, what we need
3962 * is to find the element with both the right score and object. */
3963 x = x->forward[0];
3964 while (x && x->score <= max) {
3965 zskiplistNode *next;
3966
3967 for (i = 0; i < zsl->level; i++) {
3968 if (update[i]->forward[i] != x) break;
3969 update[i]->forward[i] = x->forward[i];
3970 }
3971 if (x->forward[0]) {
3972 x->forward[0]->backward = (x->backward == zsl->header) ?
3973 NULL : x->backward;
3974 } else {
3975 zsl->tail = x->backward;
3976 }
3977 next = x->forward[0];
3978 dictDelete(dict,x->obj);
3979 zslFreeNode(x);
3980 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
3981 zsl->level--;
3982 zsl->length--;
3983 removed++;
3984 x = next;
3985 }
3986 return removed; /* not found */
3987 }
3988
3989 /* Find the first node having a score equal or greater than the specified one.
3990 * Returns NULL if there is no match. */
3991 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
3992 zskiplistNode *x;
3993 int i;
3994
3995 x = zsl->header;
3996 for (i = zsl->level-1; i >= 0; i--) {
3997 while (x->forward[i] && x->forward[i]->score < score)
3998 x = x->forward[i];
3999 }
4000 /* We may have multiple elements with the same score, what we need
4001 * is to find the element with both the right score and object. */
4002 return x->forward[0];
4003 }
4004
4005 /* The actual Z-commands implementations */
4006
4007 static void zaddCommand(redisClient *c) {
4008 robj *zsetobj;
4009 zset *zs;
4010 double *score;
4011
4012 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4013 if (zsetobj == NULL) {
4014 zsetobj = createZsetObject();
4015 dictAdd(c->db->dict,c->argv[1],zsetobj);
4016 incrRefCount(c->argv[1]);
4017 } else {
4018 if (zsetobj->type != REDIS_ZSET) {
4019 addReply(c,shared.wrongtypeerr);
4020 return;
4021 }
4022 }
4023 score = zmalloc(sizeof(double));
4024 *score = strtod(c->argv[2]->ptr,NULL);
4025 zs = zsetobj->ptr;
4026 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
4027 /* case 1: New element */
4028 incrRefCount(c->argv[3]); /* added to hash */
4029 zslInsert(zs->zsl,*score,c->argv[3]);
4030 incrRefCount(c->argv[3]); /* added to skiplist */
4031 server.dirty++;
4032 addReply(c,shared.cone);
4033 } else {
4034 dictEntry *de;
4035 double *oldscore;
4036
4037 /* case 2: Score update operation */
4038 de = dictFind(zs->dict,c->argv[3]);
4039 assert(de != NULL);
4040 oldscore = dictGetEntryVal(de);
4041 if (*score != *oldscore) {
4042 int deleted;
4043
4044 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
4045 assert(deleted != 0);
4046 zslInsert(zs->zsl,*score,c->argv[3]);
4047 incrRefCount(c->argv[3]);
4048 dictReplace(zs->dict,c->argv[3],score);
4049 server.dirty++;
4050 } else {
4051 zfree(score);
4052 }
4053 addReply(c,shared.czero);
4054 }
4055 }
4056
4057 static void zremCommand(redisClient *c) {
4058 robj *zsetobj;
4059 zset *zs;
4060
4061 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4062 if (zsetobj == NULL) {
4063 addReply(c,shared.czero);
4064 } else {
4065 dictEntry *de;
4066 double *oldscore;
4067 int deleted;
4068
4069 if (zsetobj->type != REDIS_ZSET) {
4070 addReply(c,shared.wrongtypeerr);
4071 return;
4072 }
4073 zs = zsetobj->ptr;
4074 de = dictFind(zs->dict,c->argv[2]);
4075 if (de == NULL) {
4076 addReply(c,shared.czero);
4077 return;
4078 }
4079 /* Delete from the skiplist */
4080 oldscore = dictGetEntryVal(de);
4081 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4082 assert(deleted != 0);
4083
4084 /* Delete from the hash table */
4085 dictDelete(zs->dict,c->argv[2]);
4086 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4087 server.dirty++;
4088 addReply(c,shared.cone);
4089 }
4090 }
4091
4092 static void zremrangebyscoreCommand(redisClient *c) {
4093 double min = strtod(c->argv[2]->ptr,NULL);
4094 double max = strtod(c->argv[3]->ptr,NULL);
4095 robj *zsetobj;
4096 zset *zs;
4097
4098 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4099 if (zsetobj == NULL) {
4100 addReply(c,shared.czero);
4101 } else {
4102 long deleted;
4103
4104 if (zsetobj->type != REDIS_ZSET) {
4105 addReply(c,shared.wrongtypeerr);
4106 return;
4107 }
4108 zs = zsetobj->ptr;
4109 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4110 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4111 server.dirty += deleted;
4112 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4113 }
4114 }
4115
4116 static void zrangeGenericCommand(redisClient *c, int reverse) {
4117 robj *o;
4118 int start = atoi(c->argv[2]->ptr);
4119 int end = atoi(c->argv[3]->ptr);
4120
4121 o = lookupKeyRead(c->db,c->argv[1]);
4122 if (o == NULL) {
4123 addReply(c,shared.nullmultibulk);
4124 } else {
4125 if (o->type != REDIS_ZSET) {
4126 addReply(c,shared.wrongtypeerr);
4127 } else {
4128 zset *zsetobj = o->ptr;
4129 zskiplist *zsl = zsetobj->zsl;
4130 zskiplistNode *ln;
4131
4132 int llen = zsl->length;
4133 int rangelen, j;
4134 robj *ele;
4135
4136 /* convert negative indexes */
4137 if (start < 0) start = llen+start;
4138 if (end < 0) end = llen+end;
4139 if (start < 0) start = 0;
4140 if (end < 0) end = 0;
4141
4142 /* indexes sanity checks */
4143 if (start > end || start >= llen) {
4144 /* Out of range start or start > end result in empty list */
4145 addReply(c,shared.emptymultibulk);
4146 return;
4147 }
4148 if (end >= llen) end = llen-1;
4149 rangelen = (end-start)+1;
4150
4151 /* Return the result in form of a multi-bulk reply */
4152 if (reverse) {
4153 ln = zsl->tail;
4154 while (start--)
4155 ln = ln->backward;
4156 } else {
4157 ln = zsl->header->forward[0];
4158 while (start--)
4159 ln = ln->forward[0];
4160 }
4161
4162 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4163 for (j = 0; j < rangelen; j++) {
4164 ele = ln->obj;
4165 addReplyBulkLen(c,ele);
4166 addReply(c,ele);
4167 addReply(c,shared.crlf);
4168 ln = reverse ? ln->backward : ln->forward[0];
4169 }
4170 }
4171 }
4172 }
4173
4174 static void zrangeCommand(redisClient *c) {
4175 zrangeGenericCommand(c,0);
4176 }
4177
4178 static void zrevrangeCommand(redisClient *c) {
4179 zrangeGenericCommand(c,1);
4180 }
4181
4182 static void zrangebyscoreCommand(redisClient *c) {
4183 robj *o;
4184 double min = strtod(c->argv[2]->ptr,NULL);
4185 double max = strtod(c->argv[3]->ptr,NULL);
4186
4187 o = lookupKeyRead(c->db,c->argv[1]);
4188 if (o == NULL) {
4189 addReply(c,shared.nullmultibulk);
4190 } else {
4191 if (o->type != REDIS_ZSET) {
4192 addReply(c,shared.wrongtypeerr);
4193 } else {
4194 zset *zsetobj = o->ptr;
4195 zskiplist *zsl = zsetobj->zsl;
4196 zskiplistNode *ln;
4197 robj *ele, *lenobj;
4198 unsigned int rangelen = 0;
4199
4200 /* Get the first node with the score >= min */
4201 ln = zslFirstWithScore(zsl,min);
4202 if (ln == NULL) {
4203 /* No element matching the speciifed interval */
4204 addReply(c,shared.emptymultibulk);
4205 return;
4206 }
4207
4208 /* We don't know in advance how many matching elements there
4209 * are in the list, so we push this object that will represent
4210 * the multi-bulk length in the output buffer, and will "fix"
4211 * it later */
4212 lenobj = createObject(REDIS_STRING,NULL);
4213 addReply(c,lenobj);
4214
4215 while(ln && ln->score <= max) {
4216 ele = ln->obj;
4217 addReplyBulkLen(c,ele);
4218 addReply(c,ele);
4219 addReply(c,shared.crlf);
4220 ln = ln->forward[0];
4221 rangelen++;
4222 }
4223 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4224 }
4225 }
4226 }
4227
4228 static void zcardCommand(redisClient *c) {
4229 robj *o;
4230 zset *zs;
4231
4232 o = lookupKeyRead(c->db,c->argv[1]);
4233 if (o == NULL) {
4234 addReply(c,shared.czero);
4235 return;
4236 } else {
4237 if (o->type != REDIS_ZSET) {
4238 addReply(c,shared.wrongtypeerr);
4239 } else {
4240 zs = o->ptr;
4241 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4242 }
4243 }
4244 }
4245
4246 static void zscoreCommand(redisClient *c) {
4247 robj *o;
4248 zset *zs;
4249
4250 o = lookupKeyRead(c->db,c->argv[1]);
4251 if (o == NULL) {
4252 addReply(c,shared.czero);
4253 return;
4254 } else {
4255 if (o->type != REDIS_ZSET) {
4256 addReply(c,shared.wrongtypeerr);
4257 } else {
4258 dictEntry *de;
4259
4260 zs = o->ptr;
4261 de = dictFind(zs->dict,c->argv[2]);
4262 if (!de) {
4263 addReply(c,shared.nullbulk);
4264 } else {
4265 char buf[128];
4266 double *score = dictGetEntryVal(de);
4267
4268 snprintf(buf,sizeof(buf),"%.16g",*score);
4269 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
4270 strlen(buf),buf));
4271 }
4272 }
4273 }
4274 }
4275
4276 /* ========================= Non type-specific commands ==================== */
4277
4278 static void flushdbCommand(redisClient *c) {
4279 server.dirty += dictSize(c->db->dict);
4280 dictEmpty(c->db->dict);
4281 dictEmpty(c->db->expires);
4282 addReply(c,shared.ok);
4283 }
4284
4285 static void flushallCommand(redisClient *c) {
4286 server.dirty += emptyDb();
4287 addReply(c,shared.ok);
4288 rdbSave(server.dbfilename);
4289 server.dirty++;
4290 }
4291
4292 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4293 redisSortOperation *so = zmalloc(sizeof(*so));
4294 so->type = type;
4295 so->pattern = pattern;
4296 return so;
4297 }
4298
4299 /* Return the value associated to the key with a name obtained
4300 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4301 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4302 char *p;
4303 sds spat, ssub;
4304 robj keyobj;
4305 int prefixlen, sublen, postfixlen;
4306 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4307 struct {
4308 long len;
4309 long free;
4310 char buf[REDIS_SORTKEY_MAX+1];
4311 } keyname;
4312
4313 if (subst->encoding == REDIS_ENCODING_RAW)
4314 incrRefCount(subst);
4315 else {
4316 subst = getDecodedObject(subst);
4317 }
4318
4319 spat = pattern->ptr;
4320 ssub = subst->ptr;
4321 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4322 p = strchr(spat,'*');
4323 if (!p) return NULL;
4324
4325 prefixlen = p-spat;
4326 sublen = sdslen(ssub);
4327 postfixlen = sdslen(spat)-(prefixlen+1);
4328 memcpy(keyname.buf,spat,prefixlen);
4329 memcpy(keyname.buf+prefixlen,ssub,sublen);
4330 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4331 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4332 keyname.len = prefixlen+sublen+postfixlen;
4333
4334 keyobj.refcount = 1;
4335 keyobj.type = REDIS_STRING;
4336 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4337
4338 decrRefCount(subst);
4339
4340 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4341 return lookupKeyRead(db,&keyobj);
4342 }
4343
4344 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4345 * the additional parameter is not standard but a BSD-specific we have to
4346 * pass sorting parameters via the global 'server' structure */
4347 static int sortCompare(const void *s1, const void *s2) {
4348 const redisSortObject *so1 = s1, *so2 = s2;
4349 int cmp;
4350
4351 if (!server.sort_alpha) {
4352 /* Numeric sorting. Here it's trivial as we precomputed scores */
4353 if (so1->u.score > so2->u.score) {
4354 cmp = 1;
4355 } else if (so1->u.score < so2->u.score) {
4356 cmp = -1;
4357 } else {
4358 cmp = 0;
4359 }
4360 } else {
4361 /* Alphanumeric sorting */
4362 if (server.sort_bypattern) {
4363 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4364 /* At least one compare object is NULL */
4365 if (so1->u.cmpobj == so2->u.cmpobj)
4366 cmp = 0;
4367 else if (so1->u.cmpobj == NULL)
4368 cmp = -1;
4369 else
4370 cmp = 1;
4371 } else {
4372 /* We have both the objects, use strcoll */
4373 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4374 }
4375 } else {
4376 /* Compare elements directly */
4377 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4378 so2->obj->encoding == REDIS_ENCODING_RAW) {
4379 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4380 } else {
4381 robj *dec1, *dec2;
4382
4383 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4384 so1->obj : getDecodedObject(so1->obj);
4385 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4386 so2->obj : getDecodedObject(so2->obj);
4387 cmp = strcoll(dec1->ptr,dec2->ptr);
4388 if (dec1 != so1->obj) decrRefCount(dec1);
4389 if (dec2 != so2->obj) decrRefCount(dec2);
4390 }
4391 }
4392 }
4393 return server.sort_desc ? -cmp : cmp;
4394 }
4395
4396 /* The SORT command is the most complex command in Redis. Warning: this code
4397 * is optimized for speed and a bit less for readability */
4398 static void sortCommand(redisClient *c) {
4399 list *operations;
4400 int outputlen = 0;
4401 int desc = 0, alpha = 0;
4402 int limit_start = 0, limit_count = -1, start, end;
4403 int j, dontsort = 0, vectorlen;
4404 int getop = 0; /* GET operation counter */
4405 robj *sortval, *sortby = NULL;
4406 redisSortObject *vector; /* Resulting vector to sort */
4407
4408 /* Lookup the key to sort. It must be of the right types */
4409 sortval = lookupKeyRead(c->db,c->argv[1]);
4410 if (sortval == NULL) {
4411 addReply(c,shared.nokeyerr);
4412 return;
4413 }
4414 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4415 addReply(c,shared.wrongtypeerr);
4416 return;
4417 }
4418
4419 /* Create a list of operations to perform for every sorted element.
4420 * Operations can be GET/DEL/INCR/DECR */
4421 operations = listCreate();
4422 listSetFreeMethod(operations,zfree);
4423 j = 2;
4424
4425 /* Now we need to protect sortval incrementing its count, in the future
4426 * SORT may have options able to overwrite/delete keys during the sorting
4427 * and the sorted key itself may get destroied */
4428 incrRefCount(sortval);
4429
4430 /* The SORT command has an SQL-alike syntax, parse it */
4431 while(j < c->argc) {
4432 int leftargs = c->argc-j-1;
4433 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4434 desc = 0;
4435 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4436 desc = 1;
4437 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4438 alpha = 1;
4439 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4440 limit_start = atoi(c->argv[j+1]->ptr);
4441 limit_count = atoi(c->argv[j+2]->ptr);
4442 j+=2;
4443 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4444 sortby = c->argv[j+1];
4445 /* If the BY pattern does not contain '*', i.e. it is constant,
4446 * we don't need to sort nor to lookup the weight keys. */
4447 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4448 j++;
4449 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4450 listAddNodeTail(operations,createSortOperation(
4451 REDIS_SORT_GET,c->argv[j+1]));
4452 getop++;
4453 j++;
4454 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
4455 listAddNodeTail(operations,createSortOperation(
4456 REDIS_SORT_DEL,c->argv[j+1]));
4457 j++;
4458 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
4459 listAddNodeTail(operations,createSortOperation(
4460 REDIS_SORT_INCR,c->argv[j+1]));
4461 j++;
4462 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4463 listAddNodeTail(operations,createSortOperation(
4464 REDIS_SORT_DECR,c->argv[j+1]));
4465 j++;
4466 } else {
4467 decrRefCount(sortval);
4468 listRelease(operations);
4469 addReply(c,shared.syntaxerr);
4470 return;
4471 }
4472 j++;
4473 }
4474
4475 /* Load the sorting vector with all the objects to sort */
4476 vectorlen = (sortval->type == REDIS_LIST) ?
4477 listLength((list*)sortval->ptr) :
4478 dictSize((dict*)sortval->ptr);
4479 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4480 j = 0;
4481 if (sortval->type == REDIS_LIST) {
4482 list *list = sortval->ptr;
4483 listNode *ln;
4484
4485 listRewind(list);
4486 while((ln = listYield(list))) {
4487 robj *ele = ln->value;
4488 vector[j].obj = ele;
4489 vector[j].u.score = 0;
4490 vector[j].u.cmpobj = NULL;
4491 j++;
4492 }
4493 } else {
4494 dict *set = sortval->ptr;
4495 dictIterator *di;
4496 dictEntry *setele;
4497
4498 di = dictGetIterator(set);
4499 while((setele = dictNext(di)) != NULL) {
4500 vector[j].obj = dictGetEntryKey(setele);
4501 vector[j].u.score = 0;
4502 vector[j].u.cmpobj = NULL;
4503 j++;
4504 }
4505 dictReleaseIterator(di);
4506 }
4507 assert(j == vectorlen);
4508
4509 /* Now it's time to load the right scores in the sorting vector */
4510 if (dontsort == 0) {
4511 for (j = 0; j < vectorlen; j++) {
4512 if (sortby) {
4513 robj *byval;
4514
4515 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4516 if (!byval || byval->type != REDIS_STRING) continue;
4517 if (alpha) {
4518 if (byval->encoding == REDIS_ENCODING_RAW) {
4519 vector[j].u.cmpobj = byval;
4520 incrRefCount(byval);
4521 } else {
4522 vector[j].u.cmpobj = getDecodedObject(byval);
4523 }
4524 } else {
4525 if (byval->encoding == REDIS_ENCODING_RAW) {
4526 vector[j].u.score = strtod(byval->ptr,NULL);
4527 } else {
4528 if (byval->encoding == REDIS_ENCODING_INT) {
4529 vector[j].u.score = (long)byval->ptr;
4530 } else
4531 assert(1 != 1);
4532 }
4533 }
4534 } else {
4535 if (!alpha) {
4536 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4537 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4538 else {
4539 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4540 vector[j].u.score = (long) vector[j].obj->ptr;
4541 else
4542 assert(1 != 1);
4543 }
4544 }
4545 }
4546 }
4547 }
4548
4549 /* We are ready to sort the vector... perform a bit of sanity check
4550 * on the LIMIT option too. We'll use a partial version of quicksort. */
4551 start = (limit_start < 0) ? 0 : limit_start;
4552 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4553 if (start >= vectorlen) {
4554 start = vectorlen-1;
4555 end = vectorlen-2;
4556 }
4557 if (end >= vectorlen) end = vectorlen-1;
4558
4559 if (dontsort == 0) {
4560 server.sort_desc = desc;
4561 server.sort_alpha = alpha;
4562 server.sort_bypattern = sortby ? 1 : 0;
4563 if (sortby && (start != 0 || end != vectorlen-1))
4564 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4565 else
4566 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4567 }
4568
4569 /* Send command output to the output buffer, performing the specified
4570 * GET/DEL/INCR/DECR operations if any. */
4571 outputlen = getop ? getop*(end-start+1) : end-start+1;
4572 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4573 for (j = start; j <= end; j++) {
4574 listNode *ln;
4575 if (!getop) {
4576 addReplyBulkLen(c,vector[j].obj);
4577 addReply(c,vector[j].obj);
4578 addReply(c,shared.crlf);
4579 }
4580 listRewind(operations);
4581 while((ln = listYield(operations))) {
4582 redisSortOperation *sop = ln->value;
4583 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4584 vector[j].obj);
4585
4586 if (sop->type == REDIS_SORT_GET) {
4587 if (!val || val->type != REDIS_STRING) {
4588 addReply(c,shared.nullbulk);
4589 } else {
4590 addReplyBulkLen(c,val);
4591 addReply(c,val);
4592 addReply(c,shared.crlf);
4593 }
4594 } else if (sop->type == REDIS_SORT_DEL) {
4595 /* TODO */
4596 }
4597 }
4598 }
4599
4600 /* Cleanup */
4601 decrRefCount(sortval);
4602 listRelease(operations);
4603 for (j = 0; j < vectorlen; j++) {
4604 if (sortby && alpha && vector[j].u.cmpobj)
4605 decrRefCount(vector[j].u.cmpobj);
4606 }
4607 zfree(vector);
4608 }
4609
4610 static void infoCommand(redisClient *c) {
4611 sds info;
4612 time_t uptime = time(NULL)-server.stat_starttime;
4613 int j;
4614
4615 info = sdscatprintf(sdsempty(),
4616 "redis_version:%s\r\n"
4617 "arch_bits:%s\r\n"
4618 "uptime_in_seconds:%d\r\n"
4619 "uptime_in_days:%d\r\n"
4620 "connected_clients:%d\r\n"
4621 "connected_slaves:%d\r\n"
4622 "used_memory:%zu\r\n"
4623 "changes_since_last_save:%lld\r\n"
4624 "bgsave_in_progress:%d\r\n"
4625 "last_save_time:%d\r\n"
4626 "total_connections_received:%lld\r\n"
4627 "total_commands_processed:%lld\r\n"
4628 "role:%s\r\n"
4629 ,REDIS_VERSION,
4630 (sizeof(long) == 8) ? "64" : "32",
4631 uptime,
4632 uptime/(3600*24),
4633 listLength(server.clients)-listLength(server.slaves),
4634 listLength(server.slaves),
4635 server.usedmemory,
4636 server.dirty,
4637 server.bgsaveinprogress,
4638 server.lastsave,
4639 server.stat_numconnections,
4640 server.stat_numcommands,
4641 server.masterhost == NULL ? "master" : "slave"
4642 );
4643 if (server.masterhost) {
4644 info = sdscatprintf(info,
4645 "master_host:%s\r\n"
4646 "master_port:%d\r\n"
4647 "master_link_status:%s\r\n"
4648 "master_last_io_seconds_ago:%d\r\n"
4649 ,server.masterhost,
4650 server.masterport,
4651 (server.replstate == REDIS_REPL_CONNECTED) ?
4652 "up" : "down",
4653 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
4654 );
4655 }
4656 for (j = 0; j < server.dbnum; j++) {
4657 long long keys, vkeys;
4658
4659 keys = dictSize(server.db[j].dict);
4660 vkeys = dictSize(server.db[j].expires);
4661 if (keys || vkeys) {
4662 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4663 j, keys, vkeys);
4664 }
4665 }
4666 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4667 addReplySds(c,info);
4668 addReply(c,shared.crlf);
4669 }
4670
4671 static void monitorCommand(redisClient *c) {
4672 /* ignore MONITOR if aleady slave or in monitor mode */
4673 if (c->flags & REDIS_SLAVE) return;
4674
4675 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4676 c->slaveseldb = 0;
4677 listAddNodeTail(server.monitors,c);
4678 addReply(c,shared.ok);
4679 }
4680
4681 /* ================================= Expire ================================= */
4682 static int removeExpire(redisDb *db, robj *key) {
4683 if (dictDelete(db->expires,key) == DICT_OK) {
4684 return 1;
4685 } else {
4686 return 0;
4687 }
4688 }
4689
4690 static int setExpire(redisDb *db, robj *key, time_t when) {
4691 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4692 return 0;
4693 } else {
4694 incrRefCount(key);
4695 return 1;
4696 }
4697 }
4698
4699 /* Return the expire time of the specified key, or -1 if no expire
4700 * is associated with this key (i.e. the key is non volatile) */
4701 static time_t getExpire(redisDb *db, robj *key) {
4702 dictEntry *de;
4703
4704 /* No expire? return ASAP */
4705 if (dictSize(db->expires) == 0 ||
4706 (de = dictFind(db->expires,key)) == NULL) return -1;
4707
4708 return (time_t) dictGetEntryVal(de);
4709 }
4710
4711 static int expireIfNeeded(redisDb *db, robj *key) {
4712 time_t when;
4713 dictEntry *de;
4714
4715 /* No expire? return ASAP */
4716 if (dictSize(db->expires) == 0 ||
4717 (de = dictFind(db->expires,key)) == NULL) return 0;
4718
4719 /* Lookup the expire */
4720 when = (time_t) dictGetEntryVal(de);
4721 if (time(NULL) <= when) return 0;
4722
4723 /* Delete the key */
4724 dictDelete(db->expires,key);
4725 return dictDelete(db->dict,key) == DICT_OK;
4726 }
4727
4728 static int deleteIfVolatile(redisDb *db, robj *key) {
4729 dictEntry *de;
4730
4731 /* No expire? return ASAP */
4732 if (dictSize(db->expires) == 0 ||
4733 (de = dictFind(db->expires,key)) == NULL) return 0;
4734
4735 /* Delete the key */
4736 server.dirty++;
4737 dictDelete(db->expires,key);
4738 return dictDelete(db->dict,key) == DICT_OK;
4739 }
4740
4741 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
4742 dictEntry *de;
4743
4744 de = dictFind(c->db->dict,key);
4745 if (de == NULL) {
4746 addReply(c,shared.czero);
4747 return;
4748 }
4749 if (seconds <= 0) {
4750 addReply(c, shared.czero);
4751 return;
4752 } else {
4753 time_t when = time(NULL)+seconds;
4754 if (setExpire(c->db,key,when)) {
4755 addReply(c,shared.cone);
4756 server.dirty++;
4757 } else {
4758 addReply(c,shared.czero);
4759 }
4760 return;
4761 }
4762 }
4763
4764 static void expireCommand(redisClient *c) {
4765 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
4766 }
4767
4768 static void expireatCommand(redisClient *c) {
4769 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
4770 }
4771
4772 static void ttlCommand(redisClient *c) {
4773 time_t expire;
4774 int ttl = -1;
4775
4776 expire = getExpire(c->db,c->argv[1]);
4777 if (expire != -1) {
4778 ttl = (int) (expire-time(NULL));
4779 if (ttl < 0) ttl = -1;
4780 }
4781 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4782 }
4783
4784 static void msetGenericCommand(redisClient *c, int nx) {
4785 int j;
4786
4787 if ((c->argc % 2) == 0) {
4788 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4789 return;
4790 }
4791 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4792 * set nothing at all if at least one already key exists. */
4793 if (nx) {
4794 for (j = 1; j < c->argc; j += 2) {
4795 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4796 addReply(c, shared.czero);
4797 return;
4798 }
4799 }
4800 }
4801
4802 for (j = 1; j < c->argc; j += 2) {
4803 int retval;
4804
4805 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4806 if (retval == DICT_ERR) {
4807 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4808 incrRefCount(c->argv[j+1]);
4809 } else {
4810 incrRefCount(c->argv[j]);
4811 incrRefCount(c->argv[j+1]);
4812 }
4813 removeExpire(c->db,c->argv[j]);
4814 }
4815 server.dirty += (c->argc-1)/2;
4816 addReply(c, nx ? shared.cone : shared.ok);
4817 }
4818
4819 static void msetCommand(redisClient *c) {
4820 msetGenericCommand(c,0);
4821 }
4822
4823 static void msetnxCommand(redisClient *c) {
4824 msetGenericCommand(c,1);
4825 }
4826
4827 /* =============================== Replication ============================= */
4828
4829 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4830 ssize_t nwritten, ret = size;
4831 time_t start = time(NULL);
4832
4833 timeout++;
4834 while(size) {
4835 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4836 nwritten = write(fd,ptr,size);
4837 if (nwritten == -1) return -1;
4838 ptr += nwritten;
4839 size -= nwritten;
4840 }
4841 if ((time(NULL)-start) > timeout) {
4842 errno = ETIMEDOUT;
4843 return -1;
4844 }
4845 }
4846 return ret;
4847 }
4848
4849 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4850 ssize_t nread, totread = 0;
4851 time_t start = time(NULL);
4852
4853 timeout++;
4854 while(size) {
4855 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4856 nread = read(fd,ptr,size);
4857 if (nread == -1) return -1;
4858 ptr += nread;
4859 size -= nread;
4860 totread += nread;
4861 }
4862 if ((time(NULL)-start) > timeout) {
4863 errno = ETIMEDOUT;
4864 return -1;
4865 }
4866 }
4867 return totread;
4868 }
4869
/* Read one line from 'fd' into 'ptr', a buffer of 'size' bytes, one byte at
 * a time via syncRead().
 *
 * The terminating "\n" (and a "\r" right before it, if any) is not kept, and
 * the buffer is always NUL-terminated. At most size-1 bytes are stored: if
 * the line is longer, reading stops there and the truncated line is
 * returned. Returns the number of bytes consumed before "\n" (a stripped
 * "\r" is counted), or -1 on I/O error, timeout, or a buffer smaller than
 * one byte (errno set to EINVAL). */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size < 1) {
        /* No room even for the terminator (size 0 also used to loop
         * forever, since size-- made it -1). */
        errno = EINVAL;
        return -1;
    }

    size--;         /* reserve room for the terminating NUL */
    *ptr = '\0';    /* terminated even if nothing fits (size == 1) */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* Account for the stored byte. This was missing, so a line longer
         * than the buffer overflowed it. */
        size--;
    }
    return nread;
}
4890
4891 static void syncCommand(redisClient *c) {
4892 /* ignore SYNC if aleady slave or in monitor mode */
4893 if (c->flags & REDIS_SLAVE) return;
4894
4895 /* SYNC can't be issued when the server has pending data to send to
4896 * the client about already issued commands. We need a fresh reply
4897 * buffer registering the differences between the BGSAVE and the current
4898 * dataset, so that we can copy to other slaves if needed. */
4899 if (listLength(c->reply) != 0) {
4900 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
4901 return;
4902 }
4903
4904 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
4905 /* Here we need to check if there is a background saving operation
4906 * in progress, or if it is required to start one */
4907 if (server.bgsaveinprogress) {
4908 /* Ok a background save is in progress. Let's check if it is a good
4909 * one for replication, i.e. if there is another slave that is
4910 * registering differences since the server forked to save */
4911 redisClient *slave;
4912 listNode *ln;
4913
4914 listRewind(server.slaves);
4915 while((ln = listYield(server.slaves))) {
4916 slave = ln->value;
4917 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
4918 }
4919 if (ln) {
4920 /* Perfect, the server is already registering differences for
4921 * another slave. Set the right state, and copy the buffer. */
4922 listRelease(c->reply);
4923 c->reply = listDup(slave->reply);
4924 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4925 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
4926 } else {
4927 /* No way, we need to wait for the next BGSAVE in order to
4928 * register differences */
4929 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
4930 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
4931 }
4932 } else {
4933 /* Ok we don't have a BGSAVE in progress, let's start one */
4934 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
4935 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
4936 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
4937 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
4938 return;
4939 }
4940 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
4941 }
4942 c->repldbfd = -1;
4943 c->flags |= REDIS_SLAVE;
4944 c->slaveseldb = 0;
4945 listAddNodeTail(server.slaves,c);
4946 return;
4947 }
4948
4949 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
4950 redisClient *slave = privdata;
4951 REDIS_NOTUSED(el);
4952 REDIS_NOTUSED(mask);
4953 char buf[REDIS_IOBUF_LEN];
4954 ssize_t nwritten, buflen;
4955
4956 if (slave->repldboff == 0) {
4957 /* Write the bulk write count before to transfer the DB. In theory here
4958 * we don't know how much room there is in the output buffer of the
4959 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
4960 * operations) will never be smaller than the few bytes we need. */
4961 sds bulkcount;
4962
4963 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
4964 slave->repldbsize);
4965 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
4966 {
4967 sdsfree(bulkcount);
4968 freeClient(slave);
4969 return;
4970 }
4971 sdsfree(bulkcount);
4972 }
4973 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
4974 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
4975 if (buflen <= 0) {
4976 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
4977 (buflen == 0) ? "premature EOF" : strerror(errno));
4978 freeClient(slave);
4979 return;
4980 }
4981 if ((nwritten = write(fd,buf,buflen)) == -1) {
4982 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
4983 strerror(errno));
4984 freeClient(slave);
4985 return;
4986 }
4987 slave->repldboff += nwritten;
4988 if (slave->repldboff == slave->repldbsize) {
4989 close(slave->repldbfd);
4990 slave->repldbfd = -1;
4991 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
4992 slave->replstate = REDIS_REPL_ONLINE;
4993 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
4994 sendReplyToClient, slave, NULL) == AE_ERR) {
4995 freeClient(slave);
4996 return;
4997 }
4998 addReplySds(slave,sdsempty());
4999 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5000 }
5001 }
5002
5003 /* This function is called at the end of every backgrond saving.
5004 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5005 * otherwise REDIS_ERR is passed to the function.
5006 *
5007 * The goal of this function is to handle slaves waiting for a successful
5008 * background saving in order to perform non-blocking synchronization. */
5009 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5010 listNode *ln;
5011 int startbgsave = 0;
5012
5013 listRewind(server.slaves);
5014 while((ln = listYield(server.slaves))) {
5015 redisClient *slave = ln->value;
5016
5017 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5018 startbgsave = 1;
5019 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5020 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5021 struct redis_stat buf;
5022
5023 if (bgsaveerr != REDIS_OK) {
5024 freeClient(slave);
5025 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5026 continue;
5027 }
5028 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5029 redis_fstat(slave->repldbfd,&buf) == -1) {
5030 freeClient(slave);
5031 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5032 continue;
5033 }
5034 slave->repldboff = 0;
5035 slave->repldbsize = buf.st_size;
5036 slave->replstate = REDIS_REPL_SEND_BULK;
5037 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5038 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
5039 freeClient(slave);
5040 continue;
5041 }
5042 }
5043 }
5044 if (startbgsave) {
5045 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5046 listRewind(server.slaves);
5047 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5048 while((ln = listYield(server.slaves))) {
5049 redisClient *slave = ln->value;
5050
5051 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5052 freeClient(slave);
5053 }
5054 }
5055 }
5056 }
5057
5058 static int syncWithMaster(void) {
5059 char buf[1024], tmpfile[256];
5060 int dumpsize;
5061 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5062 int dfd;
5063
5064 if (fd == -1) {
5065 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5066 strerror(errno));
5067 return REDIS_ERR;
5068 }
5069 /* Issue the SYNC command */
5070 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5071 close(fd);
5072 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5073 strerror(errno));
5074 return REDIS_ERR;
5075 }
5076 /* Read the bulk write count */
5077 if (syncReadLine(fd,buf,1024,3600) == -1) {
5078 close(fd);
5079 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5080 strerror(errno));
5081 return REDIS_ERR;
5082 }
5083 if (buf[0] != '$') {
5084 close(fd);
5085 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5086 return REDIS_ERR;
5087 }
5088 dumpsize = atoi(buf+1);
5089 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5090 /* Read the bulk write data on a temp file */
5091 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5092 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5093 if (dfd == -1) {
5094 close(fd);
5095 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5096 return REDIS_ERR;
5097 }
5098 while(dumpsize) {
5099 int nread, nwritten;
5100
5101 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5102 if (nread == -1) {
5103 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5104 strerror(errno));
5105 close(fd);
5106 close(dfd);
5107 return REDIS_ERR;
5108 }
5109 nwritten = write(dfd,buf,nread);
5110 if (nwritten == -1) {
5111 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5112 close(fd);
5113 close(dfd);
5114 return REDIS_ERR;
5115 }
5116 dumpsize -= nread;
5117 }
5118 close(dfd);
5119 if (rename(tmpfile,server.dbfilename) == -1) {
5120 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5121 unlink(tmpfile);
5122 close(fd);
5123 return REDIS_ERR;
5124 }
5125 emptyDb();
5126 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5127 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5128 close(fd);
5129 return REDIS_ERR;
5130 }
5131 server.master = createClient(fd);
5132 server.master->flags |= REDIS_MASTER;
5133 server.replstate = REDIS_REPL_CONNECTED;
5134 return REDIS_OK;
5135 }
5136
5137 static void slaveofCommand(redisClient *c) {
5138 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5139 !strcasecmp(c->argv[2]->ptr,"one")) {
5140 if (server.masterhost) {
5141 sdsfree(server.masterhost);
5142 server.masterhost = NULL;
5143 if (server.master) freeClient(server.master);
5144 server.replstate = REDIS_REPL_NONE;
5145 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5146 }
5147 } else {
5148 sdsfree(server.masterhost);
5149 server.masterhost = sdsdup(c->argv[1]->ptr);
5150 server.masterport = atoi(c->argv[2]->ptr);
5151 if (server.master) freeClient(server.master);
5152 server.replstate = REDIS_REPL_CONNECT;
5153 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5154 server.masterhost, server.masterport);
5155 }
5156 addReply(c,shared.ok);
5157 }
5158
5159 /* ============================ Maxmemory directive ======================== */
5160
5161 /* This function gets called when 'maxmemory' is set on the config file to limit
5162 * the max memory used by the server, and we are out of memory.
5163 * This function will try to, in order:
5164 *
5165 * - Free objects from the free list
5166 * - Try to remove keys with an EXPIRE set
5167 *
5168 * It is not possible to free enough memory to reach used-memory < maxmemory
5169 * the server will start refusing commands that will enlarge even more the
5170 * memory usage.
5171 */
5172 static void freeMemoryIfNeeded(void) {
5173 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5174 if (listLength(server.objfreelist)) {
5175 robj *o;
5176
5177 listNode *head = listFirst(server.objfreelist);
5178 o = listNodeValue(head);
5179 listDelNode(server.objfreelist,head);
5180 zfree(o);
5181 } else {
5182 int j, k, freed = 0;
5183
5184 for (j = 0; j < server.dbnum; j++) {
5185 int minttl = -1;
5186 robj *minkey = NULL;
5187 struct dictEntry *de;
5188
5189 if (dictSize(server.db[j].expires)) {
5190 freed = 1;
5191 /* From a sample of three keys drop the one nearest to
5192 * the natural expire */
5193 for (k = 0; k < 3; k++) {
5194 time_t t;
5195
5196 de = dictGetRandomKey(server.db[j].expires);
5197 t = (time_t) dictGetEntryVal(de);
5198 if (minttl == -1 || t < minttl) {
5199 minkey = dictGetEntryKey(de);
5200 minttl = t;
5201 }
5202 }
5203 deleteKey(server.db+j,minkey);
5204 }
5205 }
5206 if (!freed) return; /* nothing to free... */
5207 }
5208 }
5209 }
5210
5211 /* ================================= Debugging ============================== */
5212
5213 static void debugCommand(redisClient *c) {
5214 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
5215 *((char*)-1) = 'x';
5216 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
5217 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
5218 robj *key, *val;
5219
5220 if (!de) {
5221 addReply(c,shared.nokeyerr);
5222 return;
5223 }
5224 key = dictGetEntryKey(de);
5225 val = dictGetEntryVal(de);
5226 addReplySds(c,sdscatprintf(sdsempty(),
5227 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
5228 key, key->refcount, val, val->refcount, val->encoding));
5229 } else {
5230 addReplySds(c,sdsnew(
5231 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
5232 }
5233 }
5234
5235 #ifdef HAVE_BACKTRACE
5236 static struct redisFunctionSym symsTable[] = {
5237 {"compareStringObjects", (unsigned long)compareStringObjects},
5238 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
5239 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
5240 {"dictEncObjHash", (unsigned long)dictEncObjHash},
5241 {"incrDecrCommand", (unsigned long)incrDecrCommand},
5242 {"freeStringObject", (unsigned long)freeStringObject},
5243 {"freeListObject", (unsigned long)freeListObject},
5244 {"freeSetObject", (unsigned long)freeSetObject},
5245 {"decrRefCount", (unsigned long)decrRefCount},
5246 {"createObject", (unsigned long)createObject},
5247 {"freeClient", (unsigned long)freeClient},
5248 {"rdbLoad", (unsigned long)rdbLoad},
5249 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
5250 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
5251 {"addReply", (unsigned long)addReply},
5252 {"addReplySds", (unsigned long)addReplySds},
5253 {"incrRefCount", (unsigned long)incrRefCount},
5254 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
5255 {"createStringObject", (unsigned long)createStringObject},
5256 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
5257 {"syncWithMaster", (unsigned long)syncWithMaster},
5258 {"tryObjectSharing", (unsigned long)tryObjectSharing},
5259 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
5260 {"getDecodedObject", (unsigned long)getDecodedObject},
5261 {"removeExpire", (unsigned long)removeExpire},
5262 {"expireIfNeeded", (unsigned long)expireIfNeeded},
5263 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
5264 {"deleteKey", (unsigned long)deleteKey},
5265 {"getExpire", (unsigned long)getExpire},
5266 {"setExpire", (unsigned long)setExpire},
5267 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
5268 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
5269 {"authCommand", (unsigned long)authCommand},
5270 {"pingCommand", (unsigned long)pingCommand},
5271 {"echoCommand", (unsigned long)echoCommand},
5272 {"setCommand", (unsigned long)setCommand},
5273 {"setnxCommand", (unsigned long)setnxCommand},
5274 {"getCommand", (unsigned long)getCommand},
5275 {"delCommand", (unsigned long)delCommand},
5276 {"existsCommand", (unsigned long)existsCommand},
5277 {"incrCommand", (unsigned long)incrCommand},
5278 {"decrCommand", (unsigned long)decrCommand},
5279 {"incrbyCommand", (unsigned long)incrbyCommand},
5280 {"decrbyCommand", (unsigned long)decrbyCommand},
5281 {"selectCommand", (unsigned long)selectCommand},
5282 {"randomkeyCommand", (unsigned long)randomkeyCommand},
5283 {"keysCommand", (unsigned long)keysCommand},
5284 {"dbsizeCommand", (unsigned long)dbsizeCommand},
5285 {"lastsaveCommand", (unsigned long)lastsaveCommand},
5286 {"saveCommand", (unsigned long)saveCommand},
5287 {"bgsaveCommand", (unsigned long)bgsaveCommand},
5288 {"shutdownCommand", (unsigned long)shutdownCommand},
5289 {"moveCommand", (unsigned long)moveCommand},
5290 {"renameCommand", (unsigned long)renameCommand},
5291 {"renamenxCommand", (unsigned long)renamenxCommand},
5292 {"lpushCommand", (unsigned long)lpushCommand},
5293 {"rpushCommand", (unsigned long)rpushCommand},
5294 {"lpopCommand", (unsigned long)lpopCommand},
5295 {"rpopCommand", (unsigned long)rpopCommand},
5296 {"llenCommand", (unsigned long)llenCommand},
5297 {"lindexCommand", (unsigned long)lindexCommand},
5298 {"lrangeCommand", (unsigned long)lrangeCommand},
5299 {"ltrimCommand", (unsigned long)ltrimCommand},
5300 {"typeCommand", (unsigned long)typeCommand},
5301 {"lsetCommand", (unsigned long)lsetCommand},
5302 {"saddCommand", (unsigned long)saddCommand},
5303 {"sremCommand", (unsigned long)sremCommand},
5304 {"smoveCommand", (unsigned long)smoveCommand},
5305 {"sismemberCommand", (unsigned long)sismemberCommand},
5306 {"scardCommand", (unsigned long)scardCommand},
5307 {"spopCommand", (unsigned long)spopCommand},
5308 {"srandmemberCommand", (unsigned long)srandmemberCommand},
5309 {"sinterCommand", (unsigned long)sinterCommand},
5310 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
5311 {"sunionCommand", (unsigned long)sunionCommand},
5312 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
5313 {"sdiffCommand", (unsigned long)sdiffCommand},
5314 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
5315 {"syncCommand", (unsigned long)syncCommand},
5316 {"flushdbCommand", (unsigned long)flushdbCommand},
5317 {"flushallCommand", (unsigned long)flushallCommand},
5318 {"sortCommand", (unsigned long)sortCommand},
5319 {"lremCommand", (unsigned long)lremCommand},
5320 {"infoCommand", (unsigned long)infoCommand},
5321 {"mgetCommand", (unsigned long)mgetCommand},
5322 {"monitorCommand", (unsigned long)monitorCommand},
5323 {"expireCommand", (unsigned long)expireCommand},
5324 {"expireatCommand", (unsigned long)expireatCommand},
5325 {"getsetCommand", (unsigned long)getsetCommand},
5326 {"ttlCommand", (unsigned long)ttlCommand},
5327 {"slaveofCommand", (unsigned long)slaveofCommand},
5328 {"debugCommand", (unsigned long)debugCommand},
5329 {"processCommand", (unsigned long)processCommand},
5330 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
5331 {"readQueryFromClient", (unsigned long)readQueryFromClient},
5332 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
5333 {"msetGenericCommand", (unsigned long)msetGenericCommand},
5334 {"msetCommand", (unsigned long)msetCommand},
5335 {"msetnxCommand", (unsigned long)msetnxCommand},
5336 {"zslCreateNode", (unsigned long)zslCreateNode},
5337 {"zslCreate", (unsigned long)zslCreate},
5338 {"zslFreeNode",(unsigned long)zslFreeNode},
5339 {"zslFree",(unsigned long)zslFree},
5340 {"zslRandomLevel",(unsigned long)zslRandomLevel},
5341 {"zslInsert",(unsigned long)zslInsert},
5342 {"zslDelete",(unsigned long)zslDelete},
5343 {"createZsetObject",(unsigned long)createZsetObject},
5344 {"zaddCommand",(unsigned long)zaddCommand},
5345 {"zrangeGenericCommand",(unsigned long)zrangeGenericCommand},
5346 {"zrangeCommand",(unsigned long)zrangeCommand},
5347 {"zrevrangeCommand",(unsigned long)zrevrangeCommand},
5348 {"zremCommand",(unsigned long)zremCommand},
5349 {"rdbSaveDoubleValue",(unsigned long)rdbSaveDoubleValue},
5350 {"rdbLoadDoubleValue",(unsigned long)rdbLoadDoubleValue},
5351 {NULL,0}
5352 };
5353
5354 /* This function try to convert a pointer into a function name. It's used in
5355 * oreder to provide a backtrace under segmentation fault that's able to
5356 * display functions declared as static (otherwise the backtrace is useless). */
5357 static char *findFuncName(void *pointer, unsigned long *offset){
5358 int i, ret = -1;
5359 unsigned long off, minoff = 0;
5360
5361 /* Try to match against the Symbol with the smallest offset */
5362 for (i=0; symsTable[i].pointer; i++) {
5363 unsigned long lp = (unsigned long) pointer;
5364
5365 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
5366 off=lp-symsTable[i].pointer;
5367 if (ret < 0 || off < minoff) {
5368 minoff=off;
5369 ret=i;
5370 }
5371 }
5372 }
5373 if (ret == -1) return NULL;
5374 *offset = minoff;
5375 return symsTable[ret].name;
5376 }
5377
5378 static void *getMcontextEip(ucontext_t *uc) {
5379 #if defined(__FreeBSD__)
5380 return (void*) uc->uc_mcontext.mc_eip;
5381 #elif defined(__dietlibc__)
5382 return (void*) uc->uc_mcontext.eip;
5383 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
5384 return (void*) uc->uc_mcontext->__ss.__eip;
5385 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
5386 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
5387 return (void*) uc->uc_mcontext->__ss.__rip;
5388 #else
5389 return (void*) uc->uc_mcontext->__ss.__eip;
5390 #endif
5391 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
5392 return (void*) uc->uc_mcontext.gregs[REG_EIP];
5393 #elif defined(__ia64__) /* Linux IA64 */
5394 return (void*) uc->uc_mcontext.sc_ip;
5395 #else
5396 return NULL;
5397 #endif
5398 }
5399
5400 static void segvHandler(int sig, siginfo_t *info, void *secret) {
5401 void *trace[100];
5402 char **messages = NULL;
5403 int i, trace_size = 0;
5404 unsigned long offset=0;
5405 time_t uptime = time(NULL)-server.stat_starttime;
5406 ucontext_t *uc = (ucontext_t*) secret;
5407 REDIS_NOTUSED(info);
5408
5409 redisLog(REDIS_WARNING,
5410 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5411 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5412 "redis_version:%s; "
5413 "uptime_in_seconds:%d; "
5414 "connected_clients:%d; "
5415 "connected_slaves:%d; "
5416 "used_memory:%zu; "
5417 "changes_since_last_save:%lld; "
5418 "bgsave_in_progress:%d; "
5419 "last_save_time:%d; "
5420 "total_connections_received:%lld; "
5421 "total_commands_processed:%lld; "
5422 "role:%s;"
5423 ,REDIS_VERSION,
5424 uptime,
5425 listLength(server.clients)-listLength(server.slaves),
5426 listLength(server.slaves),
5427 server.usedmemory,
5428 server.dirty,
5429 server.bgsaveinprogress,
5430 server.lastsave,
5431 server.stat_numconnections,
5432 server.stat_numcommands,
5433 server.masterhost == NULL ? "master" : "slave"
5434 ));
5435
5436 trace_size = backtrace(trace, 100);
5437 /* overwrite sigaction with caller's address */
5438 if (getMcontextEip(uc) != NULL) {
5439 trace[1] = getMcontextEip(uc);
5440 }
5441 messages = backtrace_symbols(trace, trace_size);
5442
5443 for (i=1; i<trace_size; ++i) {
5444 char *fn = findFuncName(trace[i], &offset), *p;
5445
5446 p = strchr(messages[i],'+');
5447 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5448 redisLog(REDIS_WARNING,"%s", messages[i]);
5449 } else {
5450 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5451 }
5452 }
5453 free(messages);
5454 exit(0);
5455 }
5456
5457 static void setupSigSegvAction(void) {
5458 struct sigaction act;
5459
5460 sigemptyset (&act.sa_mask);
5461 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5462 * is used. Otherwise, sa_handler is used */
5463 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5464 act.sa_sigaction = segvHandler;
5465 sigaction (SIGSEGV, &act, NULL);
5466 sigaction (SIGBUS, &act, NULL);
5467 sigaction (SIGFPE, &act, NULL);
5468 sigaction (SIGILL, &act, NULL);
5469 sigaction (SIGBUS, &act, NULL);
5470 return;
5471 }
5472 #else /* HAVE_BACKTRACE */
/* Fallback used when backtrace() is unavailable: no crash handler is
 * installed, so fatal signals keep their default action. */
static void setupSigSegvAction(void) {
    return; /* intentionally a no-op */
}
5475 #endif /* HAVE_BACKTRACE */
5476
5477 /* =================================== Main! ================================ */
5478
5479 #ifdef __linux__
/* Read /proc/sys/vm/overcommit_memory and return its first line converted
 * with atoi(). Returns -1 if the file cannot be opened or is empty or
 * unreadable. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int gotline;
    FILE *proc = fopen("/proc/sys/vm/overcommit_memory","r");

    if (proc == NULL) return -1;
    gotline = fgets(line,sizeof(line),proc) != NULL;
    fclose(proc);
    return gotline ? atoi(line) : -1;
}
5493
5494 void linuxOvercommitMemoryWarning(void) {
5495 if (linuxOvercommitMemoryValue() == 0) {
5496 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5497 }
5498 }
5499 #endif /* __linux__ */
5500
5501 static void daemonize(void) {
5502 int fd;
5503 FILE *fp;
5504
5505 if (fork() != 0) exit(0); /* parent exits */
5506 setsid(); /* create a new session */
5507
5508 /* Every output goes to /dev/null. If Redis is daemonized but
5509 * the 'logfile' is set to 'stdout' in the configuration file
5510 * it will not log at all. */
5511 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5512 dup2(fd, STDIN_FILENO);
5513 dup2(fd, STDOUT_FILENO);
5514 dup2(fd, STDERR_FILENO);
5515 if (fd > STDERR_FILENO) close(fd);
5516 }
5517 /* Try to write the pid file */
5518 fp = fopen(server.pidfile,"w");
5519 if (fp) {
5520 fprintf(fp,"%d\n",getpid());
5521 fclose(fp);
5522 }
5523 }
5524
5525 int main(int argc, char **argv) {
5526 initServerConfig();
5527 if (argc == 2) {
5528 ResetServerSaveParams();
5529 loadServerConfig(argv[1]);
5530 } else if (argc > 2) {
5531 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5532 exit(1);
5533 } else {
5534 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5535 }
5536 initServer();
5537 if (server.daemonize) daemonize();
5538 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5539 #ifdef __linux__
5540 linuxOvercommitMemoryWarning();
5541 #endif
5542 if (rdbLoad(server.dbfilename) == REDIS_OK)
5543 redisLog(REDIS_NOTICE,"DB loaded from disk");
5544 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5545 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5546 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5547 aeMain(server.el);
5548 aeDeleteEventLoop(server.el);
5549 return 0;
5550 }