]> git.saurik.com Git - redis.git/blob - redis.c
da7937ad93c14fae6ac22aab0fab7ad26fc870a0
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <limits.h>
60 #include <math.h>
61
62 #if defined(__sun)
63 #include "solarisfixes.h"
64 #endif
65
66 #include "redis.h"
67 #include "ae.h" /* Event driven programming library */
68 #include "sds.h" /* Dynamic safe strings */
69 #include "anet.h" /* Networking the easy way */
70 #include "dict.h" /* Hash tables */
71 #include "adlist.h" /* Linked lists */
72 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
73 #include "lzf.h" /* LZF compression library */
74 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
75
76 /* Error codes */
77 #define REDIS_OK 0
78 #define REDIS_ERR -1
79
80 /* Static server configuration */
81 #define REDIS_SERVERPORT 6379 /* TCP port */
82 #define REDIS_MAXIDLETIME (60*5) /* default client timeout (presumably seconds: idle time is measured with time() deltas) */
83 #define REDIS_IOBUF_LEN 1024
84 #define REDIS_LOADBUF_LEN 1024
85 #define REDIS_STATIC_ARGS 4
86 #define REDIS_DEFAULT_DBNUM 16
87 #define REDIS_CONFIGLINE_MAX 1024
88 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
89 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
90 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* max random expire lookups per DB on each serverCron() call */
91 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
92 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
93
94 /* Hash table parameters */
95 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
96
97 /* Command flags */
98 #define REDIS_CMD_BULK 1 /* Bulk command (not only writes: e.g. "echo" and "sismember" carry it in cmdTable) */
99 #define REDIS_CMD_INLINE 2 /* Inline command */
100 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
101 this flag will return an error when the 'maxmemory' option is set in the
102 config file and the server is using more than maxmemory bytes of memory.
103 In short these commands are denied on low memory conditions. */
104 #define REDIS_CMD_DENYOOM 4
105
106 /* Object types */
107 #define REDIS_STRING 0
108 #define REDIS_LIST 1
109 #define REDIS_SET 2
110 #define REDIS_ZSET 3
111 #define REDIS_HASH 4
112
113 /* Objects encoding */
114 #define REDIS_ENCODING_RAW 0 /* Raw representation */
115 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
116
117 /* Object types only used for dumping to disk */
118 #define REDIS_EXPIRETIME 253
119 #define REDIS_SELECTDB 254
120 #define REDIS_EOF 255
121
122 /* Defines related to the dump file format. To store 32 bits lengths for short
123 * keys requires a lot of space, so we check the most significant 2 bits of
124 * the first byte to interpret the length:
125 *
126 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
127 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
128 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
129 * 11|000000 this means: specially encoded object will follow. The six bits
130 * number specify the kind of object that follows.
131 * See the REDIS_RDB_ENC_* defines.
132 *
133 * Lengths up to 63 are stored using a single byte, most DB keys, and many
134 * values, will fit inside. */
135 #define REDIS_RDB_6BITLEN 0
136 #define REDIS_RDB_14BITLEN 1
137 #define REDIS_RDB_32BITLEN 2
138 #define REDIS_RDB_ENCVAL 3
139 #define REDIS_RDB_LENERR UINT_MAX
140
141 /* When a length of a string object stored on disk has the first two bits
142 * set, the remaining six bits specify a special encoding for the object
143 * accordingly to the following defines: */
144 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
145 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
146 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
147 #define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF (see lzf.h) */
148
149 /* Client flags (bits OR-ed together in redisClient.flags) */
150 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
151 #define REDIS_SLAVE 2 /* This client is a slave server */
152 #define REDIS_MASTER 4 /* This client is a master server */
153 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
154
155 /* Slave replication state - slave side */
156 #define REDIS_REPL_NONE 0 /* No active replication */
157 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
158 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
159
160 /* Slave replication state - from the point of view of master
161 * Note that in SEND_BULK and ONLINE state the slave receives new updates
162 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
163 * to start the next background saving in order to send updates to it. */
164 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
165 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
166 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
167 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
168
169 /* List related stuff */
170 #define REDIS_HEAD 0
171 #define REDIS_TAIL 1
172
173 /* Sort operations */
174 #define REDIS_SORT_GET 0
175 #define REDIS_SORT_DEL 1
176 #define REDIS_SORT_INCR 2
177 #define REDIS_SORT_DECR 3
178 #define REDIS_SORT_ASC 4
179 #define REDIS_SORT_DESC 5
180 #define REDIS_SORTKEY_MAX 1024
181
182 /* Log levels: redisLog() drops messages whose level is below
 * server.verbosity and tags the others with '.', '-' and '*' respectively */
183 #define REDIS_DEBUG 0
184 #define REDIS_NOTICE 1
185 #define REDIS_WARNING 2
186
187 /* Anti-warning macro: evaluates V as a void expression so that a parameter
 * that is otherwise unused counts as referenced */
188 #define REDIS_NOTUSED(V) ((void) V)
189
190 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
191 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
192
193 /* Append only defines */
194 #define APPENDFSYNC_NO 0
195 #define APPENDFSYNC_ALWAYS 1
196 #define APPENDFSYNC_EVERYSEC 2
197
198 /*================================= Data types ============================== */
199
200 /* A redis object: a typed container with a reference counter. The object
 * types defined in this file are string, list, set, zset and hash. */
201 typedef struct redisObject {
202 void *ptr; /* payload; the dict callbacks in this file read it as an sds string when encoding is REDIS_ENCODING_RAW */
203 unsigned char type; /* presumably one of the REDIS_STRING ... REDIS_HASH object types */
204 unsigned char encoding; /* REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
205 unsigned char notused[2]; /* NOTE(review): looks like explicit padding - confirm */
206 int refcount; /* presumably maintained by incrRefCount() / decrRefCount() */
207 } robj;
208
209 typedef struct redisDb { /* one logical database; server.db is an array of these */
210 dict *dict; /* main table of the DB; serverCron() reports its size as the number of keys */
211 dict *expires; /* keys with a timeout; serverCron() reads each value as the time_t after which the key gets deleted */
212 int id;
213 } redisDb;
214
215 /* With multiplexing we need to keep per-client state.
216 * Clients are kept in a linked list (server.clients). */
217 typedef struct redisClient {
218 int fd;
219 redisDb *db;
220 int dictid;
221 sds querybuf;
222 robj **argv, **mbargv;
223 int argc, mbargc;
224 int bulklen; /* bulk read len. -1 if not in bulk read mode */
225 int multibulk; /* multi bulk command format active */
226 list *reply;
227 int sentlen;
228 time_t lastinteraction; /* time of the last interaction, used for timeout */
229 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
230 int slaveseldb; /* slave selected db, if this client is a slave */
231 int authenticated; /* when requirepass is non-NULL */
232 int replstate; /* replication state if this is a slave */
233 int repldbfd; /* replication DB file descriptor */
234 long repldboff; /* replication DB file offset */
235 off_t repldbsize; /* replication DB file size */
236 } redisClient;
237
238 struct saveparam { /* one automatic save rule, evaluated by serverCron() */
239 time_t seconds; /* the rule fires only when more than this many seconds passed since server.lastsave ... */
240 int changes; /* ... and server.dirty reached at least this many changes */
241 };
242
243 /* Global server state structure */
244 struct redisServer {
245 int port;
246 int fd;
247 redisDb *db; /* array of databases, indexed from 0 to dbnum-1 */
248 dict *sharingpool;
249 unsigned int sharingpoolsize;
250 long long dirty; /* changes to DB from the last save */
251 list *clients;
252 list *slaves, *monitors;
253 char neterr[ANET_ERR_LEN];
254 aeEventLoop *el;
255 int cronloops; /* number of times the cron function ran */
256 list *objfreelist; /* A list of freed objects to avoid malloc() */
257 time_t lastsave; /* Unix time of the last successful save */
258 size_t usedmemory; /* Used memory in bytes, as returned by zmalloc_used_memory() */
259 /* Fields used only for stats */
260 time_t stat_starttime; /* server start time */
261 long long stat_numcommands; /* number of processed commands */
262 long long stat_numconnections; /* number of connections received */
263 /* Configuration */
264 int verbosity; /* redisLog() discards messages whose level is below this value */
265 int glueoutputbuf;
266 int maxidletime; /* idle time after which clients are closed; 0 disables the check in serverCron() */
267 int dbnum; /* number of entries in db */
268 int daemonize;
269 int appendonly;
270 int appendfsync;
271 time_t lastfsync;
272 int appendfd;
273 int appendseldb;
274 char *pidfile;
275 int bgsaveinprogress; /* non-zero while serverCron() has to wait for a saving child */
276 pid_t bgsavechildpid; /* reset to -1 by serverCron() once the child terminated */
277 struct saveparam *saveparams; /* array of saveparamslen automatic save rules */
278 int saveparamslen;
279 char *logfile; /* NULL means that redisLog() writes to stdout */
280 char *bindaddr;
281 char *dbfilename; /* passed to rdbSaveBackground() by serverCron() */
282 char *appendfilename;
283 char *requirepass;
284 int shareobjects;
285 /* Replication related */
286 int isslave;
287 char *masterhost;
288 int masterport;
289 redisClient *master; /* client that is master for this slave */
290 int replstate; /* REDIS_REPL_CONNECT makes serverCron() call syncWithMaster() */
291 unsigned int maxclients;
292 unsigned long maxmemory;
293 /* Sort parameters - qsort_r() is only available under BSD so we
294 * have to take this state global, in order to pass it to sortCompare() */
295 int sort_desc;
296 int sort_alpha;
297 int sort_bypattern;
298 };
299
300 typedef void redisCommandProc(redisClient *c); /* signature shared by all the *Command() implementations */
301 struct redisCommand { /* one entry of cmdTable */
302 char *name; /* command name as listed in cmdTable; NULL in the terminating entry */
303 redisCommandProc *proc; /* function implementing the command */
304 int arity; /* argument count; NOTE(review): negative values in cmdTable presumably mean "at least N" - confirm */
305 int flags; /* OR of REDIS_CMD_BULK / REDIS_CMD_INLINE / REDIS_CMD_DENYOOM */
306 };
307
308 struct redisFunctionSym { /* pairs a function name with an address; NOTE(review): presumably used to resolve symbols in crash backtraces - confirm */
309 char *name;
310 unsigned long pointer;
311 };
312
313 typedef struct _redisSortObject { /* one element handled by SORT, with the key it is compared by */
314 robj *obj;
315 union { /* only one member is meaningful at a time; NOTE(review): presumably selected by the server.sort_* settings - confirm */
316 double score;
317 robj *cmpobj;
318 } u;
319 } redisSortObject;
320
321 typedef struct _redisSortOperation {
322 int type; /* presumably one of the REDIS_SORT_* operation codes */
323 robj *pattern;
324 } redisSortOperation;
325
326 /* ZSETs use a specialized version of Skiplists */
327
328 typedef struct zskiplistNode {
329 struct zskiplistNode **forward; /* array of forward pointers; NOTE(review): presumably one per level of the node - confirm */
330 struct zskiplistNode *backward;
331 double score;
332 robj *obj;
333 } zskiplistNode;
334
335 typedef struct zskiplist {
336 struct zskiplistNode *header, *tail;
337 unsigned long length;
338 int level;
339 } zskiplist;
340
341 typedef struct zset { /* a sorted set pairs a dictionary with a skiplist */
342 dict *dict;
343 zskiplist *zsl;
344 } zset;
345
346 /* Our shared "common" objects: preallocated by createSharedObjects(), e.g.
 * shared.ok holds "+OK\r\n" and shared.crlf holds "\r\n" */
347
348 struct sharedObjectsStruct {
349 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
350 *colon, *nullbulk, *nullmultibulk,
351 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
352 *outofrangeerr, *plus,
353 *select0, *select1, *select2, *select3, *select4,
354 *select5, *select6, *select7, *select8, *select9;
355 } shared;
356
357 /* Global vars that are actually used as constants. The following double
358 * values are used for double on-disk serialization, and are initialized
359 * at runtime to avoid strange compiler optimizations. */
360
361 static double R_Zero, R_PosInf, R_NegInf, R_Nan;
362
363 /*================================ Prototypes =============================== */
364 /* Forward declarations of static functions that this file uses before the point where they are defined */
365 static void freeStringObject(robj *o);
366 static void freeListObject(robj *o);
367 static void freeSetObject(robj *o);
368 static void decrRefCount(void *o);
369 static robj *createObject(int type, void *ptr);
370 static void freeClient(redisClient *c);
371 static int rdbLoad(char *filename);
372 static void addReply(redisClient *c, robj *obj);
373 static void addReplySds(redisClient *c, sds s);
374 static void incrRefCount(robj *o);
375 static int rdbSaveBackground(char *filename);
376 static robj *createStringObject(char *ptr, size_t len);
377 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
378 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
379 static int syncWithMaster(void);
380 static robj *tryObjectSharing(robj *o);
381 static int tryObjectEncoding(robj *o);
382 static robj *getDecodedObject(const robj *o);
383 static int removeExpire(redisDb *db, robj *key);
384 static int expireIfNeeded(redisDb *db, robj *key);
385 static int deleteIfVolatile(redisDb *db, robj *key);
386 static int deleteKey(redisDb *db, robj *key);
387 static time_t getExpire(redisDb *db, robj *key);
388 static int setExpire(redisDb *db, robj *key, time_t when);
389 static void updateSlavesWaitingBgsave(int bgsaveerr);
390 static void freeMemoryIfNeeded(void);
391 static int processCommand(redisClient *c);
392 static void setupSigSegvAction(void);
393 static void rdbRemoveTempFile(pid_t childpid);
394 static size_t stringObjectLen(robj *o);
395 static void processInputBuffer(redisClient *c);
396 static zskiplist *zslCreate(void);
397 static void zslFree(zskiplist *zsl);
398 static void zslInsert(zskiplist *zsl, double score, robj *obj);
399 /* Command implementations: each one is bound to a command name in cmdTable */
400 static void authCommand(redisClient *c);
401 static void pingCommand(redisClient *c);
402 static void echoCommand(redisClient *c);
403 static void setCommand(redisClient *c);
404 static void setnxCommand(redisClient *c);
405 static void getCommand(redisClient *c);
406 static void delCommand(redisClient *c);
407 static void existsCommand(redisClient *c);
408 static void incrCommand(redisClient *c);
409 static void decrCommand(redisClient *c);
410 static void incrbyCommand(redisClient *c);
411 static void decrbyCommand(redisClient *c);
412 static void selectCommand(redisClient *c);
413 static void randomkeyCommand(redisClient *c);
414 static void keysCommand(redisClient *c);
415 static void dbsizeCommand(redisClient *c);
416 static void lastsaveCommand(redisClient *c);
417 static void saveCommand(redisClient *c);
418 static void bgsaveCommand(redisClient *c);
419 static void shutdownCommand(redisClient *c);
420 static void moveCommand(redisClient *c);
421 static void renameCommand(redisClient *c);
422 static void renamenxCommand(redisClient *c);
423 static void lpushCommand(redisClient *c);
424 static void rpushCommand(redisClient *c);
425 static void lpopCommand(redisClient *c);
426 static void rpopCommand(redisClient *c);
427 static void llenCommand(redisClient *c);
428 static void lindexCommand(redisClient *c);
429 static void lrangeCommand(redisClient *c);
430 static void ltrimCommand(redisClient *c);
431 static void typeCommand(redisClient *c);
432 static void lsetCommand(redisClient *c);
433 static void saddCommand(redisClient *c);
434 static void sremCommand(redisClient *c);
435 static void smoveCommand(redisClient *c);
436 static void sismemberCommand(redisClient *c);
437 static void scardCommand(redisClient *c);
438 static void spopCommand(redisClient *c);
439 static void srandmemberCommand(redisClient *c);
440 static void sinterCommand(redisClient *c);
441 static void sinterstoreCommand(redisClient *c);
442 static void sunionCommand(redisClient *c);
443 static void sunionstoreCommand(redisClient *c);
444 static void sdiffCommand(redisClient *c);
445 static void sdiffstoreCommand(redisClient *c);
446 static void syncCommand(redisClient *c);
447 static void flushdbCommand(redisClient *c);
448 static void flushallCommand(redisClient *c);
449 static void sortCommand(redisClient *c);
450 static void lremCommand(redisClient *c);
451 static void infoCommand(redisClient *c);
452 static void mgetCommand(redisClient *c);
453 static void monitorCommand(redisClient *c);
454 static void expireCommand(redisClient *c);
455 static void expireatCommand(redisClient *c);
456 static void getsetCommand(redisClient *c);
457 static void ttlCommand(redisClient *c);
458 static void slaveofCommand(redisClient *c);
459 static void debugCommand(redisClient *c);
460 static void msetCommand(redisClient *c);
461 static void msetnxCommand(redisClient *c);
462 static void zaddCommand(redisClient *c);
463 static void zrangeCommand(redisClient *c);
464 static void zrangebyscoreCommand(redisClient *c);
465 static void zrevrangeCommand(redisClient *c);
466 static void zcardCommand(redisClient *c);
467 static void zremCommand(redisClient *c);
468 static void zscoreCommand(redisClient *c);
469 static void zremrangebyscoreCommand(redisClient *c);
470
471 /*================================= Globals ================================= */
472
473 /* Global vars */
474 static struct redisServer server; /* server global state */
/* Table of the supported commands. Every entry follows the field order of
 * struct redisCommand: {name, implementation, arity, flags}. The table is
 * terminated by an all-NULL/zero entry.
 * NOTE(review): a negative arity presumably means "at least that many
 * arguments" - confirm against the command lookup code.
 * "smembers" deliberately shares sinterCommand with "sinter". */
475 static struct redisCommand cmdTable[] = {
476 {"get",getCommand,2,REDIS_CMD_INLINE},
477 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
478 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
479 {"del",delCommand,-2,REDIS_CMD_INLINE},
480 {"exists",existsCommand,2,REDIS_CMD_INLINE},
481 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
482 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
483 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
484 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
485 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
486 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
487 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
488 {"llen",llenCommand,2,REDIS_CMD_INLINE},
489 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
490 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
491 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
492 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
493 {"lrem",lremCommand,4,REDIS_CMD_BULK},
494 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
495 {"srem",sremCommand,3,REDIS_CMD_BULK},
496 {"smove",smoveCommand,4,REDIS_CMD_BULK},
497 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
498 {"scard",scardCommand,2,REDIS_CMD_INLINE},
499 {"spop",spopCommand,2,REDIS_CMD_INLINE},
500 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
501 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
502 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
503 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
504 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
505 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
506 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
507 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
508 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
509 {"zrem",zremCommand,3,REDIS_CMD_BULK},
510 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
511 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
512 {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
513 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
514 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
515 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
516 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
517 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
518 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
519 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
520 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
521 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
522 {"select",selectCommand,2,REDIS_CMD_INLINE},
523 {"move",moveCommand,3,REDIS_CMD_INLINE},
524 {"rename",renameCommand,3,REDIS_CMD_INLINE},
525 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
526 {"expire",expireCommand,3,REDIS_CMD_INLINE},
527 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
528 {"keys",keysCommand,2,REDIS_CMD_INLINE},
529 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
530 {"auth",authCommand,2,REDIS_CMD_INLINE},
531 {"ping",pingCommand,1,REDIS_CMD_INLINE},
532 {"echo",echoCommand,2,REDIS_CMD_BULK},
533 {"save",saveCommand,1,REDIS_CMD_INLINE},
534 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
535 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
536 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
537 {"type",typeCommand,2,REDIS_CMD_INLINE},
538 {"sync",syncCommand,1,REDIS_CMD_INLINE},
539 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
540 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
541 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
542 {"info",infoCommand,1,REDIS_CMD_INLINE},
543 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
544 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
545 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
546 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
547 {NULL,NULL,0,0}
548 };
549 /*============================ Utility functions ============================ */
550
/* Glob-style pattern matching.
 *
 * Matches the first 'stringLen' bytes of 'string' against the first
 * 'patternLen' bytes of 'pattern'. Neither buffer has to be null terminated:
 * only the bytes inside the given lengths are read.
 *
 * Supported syntax:
 *   *       any sequence of bytes, including the empty one
 *   ?       exactly one byte
 *   [abc]   one byte out of a set; [^abc] negates the set, [a-z] is a range
 *           and a backslash makes the next byte of the set literal
 *   \x      the byte x taken literally
 *
 * When 'nocase' is non-zero literal bytes and ranges are compared after
 * tolower(). Returns 1 if the whole string matches the whole pattern,
 * 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    /* Every case below except '*' consumes one byte of the string, so we
     * have to stop as soon as the string is exhausted: going on would read
     * past its end and make stringLen negative. */
    while (patternLen && stringLen) {
        switch (pattern[0]) {
        case '*':
            /* Collapse runs of '*' without looking past the pattern end */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            while (stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            pattern++;
            patternLen--;
            not = patternLen && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back so that the shared
                     * advance after the switch lands exactly on the end
                     * of the pattern. This test comes first so that
                     * pattern[0] is never read out of bounds. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((int)pattern[0]) == tolower((int)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            /* A trailing backslash is matched as a literal backslash */
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((int)pattern[0]) != tolower((int)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
    }
    /* Once the string is exhausted only trailing '*' can still match */
    if (stringLen == 0) {
        while (patternLen && pattern[0] == '*') {
            pattern++;
            patternLen--;
        }
    }
    return (patternLen == 0 && stringLen == 0) ? 1 : 0;
}
673
674 static void redisLog(int level, const char *fmt, ...) {
675 va_list ap;
676 FILE *fp;
677
678 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
679 if (!fp) return;
680
681 va_start(ap, fmt);
682 if (level >= server.verbosity) {
683 char *c = ".-*";
684 char buf[64];
685 time_t now;
686
687 now = time(NULL);
688 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
689 fprintf(fp,"%s %c ",buf,c[level]);
690 vfprintf(fp, fmt, ap);
691 fprintf(fp,"\n");
692 fflush(fp);
693 }
694 va_end(ap);
695
696 if (server.logfile) fclose(fp);
697 }
698
699 /*====================== Hash table type implementation ==================== */
700
701 /* This is a hash table type that uses the SDS dynamic strings library as
702 * keys and redis objects as values (objects can hold SDS strings,
703 * lists, sets). */
704
/* Dict destructor callback that releases 'val' with zfree().
 * 'privdata' is not used. */
static void dictVanillaFree(void *privdata, void *val)
{
    zfree(val);
    DICT_NOTUSED(privdata);
}
710
711 static int sdsDictKeyCompare(void *privdata, const void *key1,
712 const void *key2)
713 {
714 int l1,l2;
715 DICT_NOTUSED(privdata);
716
717 l1 = sdslen((sds)key1);
718 l2 = sdslen((sds)key2);
719 if (l1 != l2) return 0;
720 return memcmp(key1, key2, l1) == 0;
721 }
722
/* Dict destructor callback for redis objects: calls decrRefCount() on
 * 'val'. 'privdata' is not used. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    decrRefCount(val);
    DICT_NOTUSED(privdata);
}
729
730 static int dictObjKeyCompare(void *privdata, const void *key1,
731 const void *key2)
732 {
733 const robj *o1 = key1, *o2 = key2;
734 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
735 }
736
737 static unsigned int dictObjHash(const void *key) {
738 const robj *o = key;
739 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
740 }
741
742 static int dictEncObjKeyCompare(void *privdata, const void *key1,
743 const void *key2)
744 {
745 const robj *o1 = key1, *o2 = key2;
746
747 if (o1->encoding == REDIS_ENCODING_RAW &&
748 o2->encoding == REDIS_ENCODING_RAW)
749 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
750 else {
751 robj *dec1, *dec2;
752 int cmp;
753
754 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
755 getDecodedObject(o1) : (robj*)o1;
756 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
757 getDecodedObject(o2) : (robj*)o2;
758 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
759 if (dec1 != o1) decrRefCount(dec1);
760 if (dec2 != o2) decrRefCount(dec2);
761 return cmp;
762 }
763 }
764
765 static unsigned int dictEncObjHash(const void *key) {
766 const robj *o = key;
767
768 if (o->encoding == REDIS_ENCODING_RAW)
769 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
770 else {
771 robj *dec = getDecodedObject(o);
772 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
773 decrRefCount(dec);
774 return hash;
775 }
776 }
777
/* Dict type with possibly encoded redis objects as keys and no value
 * destructor. NOTE(review): presumably used for SET objects - confirm. */
778 static dictType setDictType = {
779 dictEncObjHash, /* hash function */
780 NULL, /* key dup */
781 NULL, /* val dup */
782 dictEncObjKeyCompare, /* key compare */
783 dictRedisObjectDestructor, /* key destructor */
784 NULL /* val destructor */
785 };
786
/* Same keys as setDictType, but values are released with zfree().
 * NOTE(review): presumably maps ZSET members to their scores - confirm. */
787 static dictType zsetDictType = {
788 dictEncObjHash, /* hash function */
789 NULL, /* key dup */
790 NULL, /* val dup */
791 dictEncObjKeyCompare, /* key compare */
792 dictRedisObjectDestructor, /* key destructor */
793 dictVanillaFree /* val destructor */
794 };
795
/* Dict type where both keys and values are redis objects; keys are hashed
 * and compared as plain sds strings, without decoding. */
796 static dictType hashDictType = {
797 dictObjHash, /* hash function */
798 NULL, /* key dup */
799 NULL, /* val dup */
800 dictObjKeyCompare, /* key compare */
801 dictRedisObjectDestructor, /* key destructor */
802 dictRedisObjectDestructor /* val destructor */
803 };
804
805 /* ========================= Random utility functions ======================= */
806
/* Out of memory handler.
 *
 * Redis does not try to recover when the allocation of an object or of a
 * string fails: even reporting the error to the client may be impossible,
 * because the networking layer allocates its send buffers on the heap too.
 * So we just print 'msg' followed by ": Out of memory" on stderr, wait one
 * second and abort(). This also keeps the code simpler to read. */
static void oom(const char *msg)
{
    fprintf(stderr, "%s: Out of memory\n",msg);
    fflush(stderr);

    sleep(1);
    abort();
}
818
819 /* ====================== Redis server networking stuff ===================== */
820 static void closeTimedoutClients(void) {
821 redisClient *c;
822 listNode *ln;
823 time_t now = time(NULL);
824
825 listRewind(server.clients);
826 while ((ln = listYield(server.clients)) != NULL) {
827 c = listNodeValue(ln);
828 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
829 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
830 (now - c->lastinteraction > server.maxidletime)) {
831 redisLog(REDIS_DEBUG,"Closing idle client");
832 freeClient(c);
833 }
834 }
835 }
836
837 static int htNeedsResize(dict *dict) {
838 long long size, used;
839
840 size = dictSlots(dict);
841 used = dictSize(dict);
842 return (size && used && size > DICT_HT_INITIAL_SIZE &&
843 (used*100/size < REDIS_HT_MINFILL));
844 }
845
846 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
847 * we resize the hash table to save memory */
848 static void tryResizeHashTables(void) {
849 int j;
850
851 for (j = 0; j < server.dbnum; j++) {
852 if (htNeedsResize(server.db[j].dict)) {
853 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
854 dictResize(server.db[j].dict);
855 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
856 }
857 if (htNeedsResize(server.db[j].expires))
858 dictResize(server.db[j].expires);
859 }
860 }
861
862 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
863 int j, loops = server.cronloops++;
864 REDIS_NOTUSED(eventLoop);
865 REDIS_NOTUSED(id);
866 REDIS_NOTUSED(clientData);
867
868 /* Update the global state with the amount of used memory */
869 server.usedmemory = zmalloc_used_memory();
870
871 /* Show some info about non-empty databases */
872 for (j = 0; j < server.dbnum; j++) {
873 long long size, used, vkeys;
874
875 size = dictSlots(server.db[j].dict);
876 used = dictSize(server.db[j].dict);
877 vkeys = dictSize(server.db[j].expires);
878 if (!(loops % 5) && (used || vkeys)) {
879 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
880 /* dictPrintStats(server.dict); */
881 }
882 }
883
884 /* We don't want to resize the hash tables while a bacground saving
885 * is in progress: the saving child is created using fork() that is
886 * implemented with a copy-on-write semantic in most modern systems, so
887 * if we resize the HT while there is the saving child at work actually
888 * a lot of memory movements in the parent will cause a lot of pages
889 * copied. */
890 if (!server.bgsaveinprogress) tryResizeHashTables();
891
892 /* Show information about connected clients */
893 if (!(loops % 5)) {
894 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
895 listLength(server.clients)-listLength(server.slaves),
896 listLength(server.slaves),
897 server.usedmemory,
898 dictSize(server.sharingpool));
899 }
900
901 /* Close connections of timedout clients */
902 if (server.maxidletime && !(loops % 10))
903 closeTimedoutClients();
904
905 /* Check if a background saving in progress terminated */
906 if (server.bgsaveinprogress) {
907 int statloc;
908 if (wait4(-1,&statloc,WNOHANG,NULL)) {
909 int exitcode = WEXITSTATUS(statloc);
910 int bysignal = WIFSIGNALED(statloc);
911
912 if (!bysignal && exitcode == 0) {
913 redisLog(REDIS_NOTICE,
914 "Background saving terminated with success");
915 server.dirty = 0;
916 server.lastsave = time(NULL);
917 } else if (!bysignal && exitcode != 0) {
918 redisLog(REDIS_WARNING, "Background saving error");
919 } else {
920 redisLog(REDIS_WARNING,
921 "Background saving terminated by signal");
922 rdbRemoveTempFile(server.bgsavechildpid);
923 }
924 server.bgsaveinprogress = 0;
925 server.bgsavechildpid = -1;
926 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
927 }
928 } else {
929 /* If there is not a background saving in progress check if
930 * we have to save now */
931 time_t now = time(NULL);
932 for (j = 0; j < server.saveparamslen; j++) {
933 struct saveparam *sp = server.saveparams+j;
934
935 if (server.dirty >= sp->changes &&
936 now-server.lastsave > sp->seconds) {
937 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
938 sp->changes, sp->seconds);
939 rdbSaveBackground(server.dbfilename);
940 break;
941 }
942 }
943 }
944
945 /* Try to expire a few timed out keys */
946 for (j = 0; j < server.dbnum; j++) {
947 redisDb *db = server.db+j;
948 int num = dictSize(db->expires);
949
950 if (num) {
951 time_t now = time(NULL);
952
953 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
954 num = REDIS_EXPIRELOOKUPS_PER_CRON;
955 while (num--) {
956 dictEntry *de;
957 time_t t;
958
959 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
960 t = (time_t) dictGetEntryVal(de);
961 if (now > t) {
962 deleteKey(db,dictGetEntryKey(de));
963 }
964 }
965 }
966 }
967
968 /* Check if we should connect to a MASTER */
969 if (server.replstate == REDIS_REPL_CONNECT) {
970 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
971 if (syncWithMaster() == REDIS_OK) {
972 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
973 }
974 }
975 return 1000;
976 }
977
978 static void createSharedObjects(void) {
979 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
980 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
981 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
982 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
983 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
984 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
985 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
986 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
987 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
988 /* no such key */
989 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
990 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
991 "-ERR Operation against a key holding the wrong kind of value\r\n"));
992 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
993 "-ERR no such key\r\n"));
994 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
995 "-ERR syntax error\r\n"));
996 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
997 "-ERR source and destination objects are the same\r\n"));
998 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
999 "-ERR index out of range\r\n"));
1000 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1001 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1002 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1003 shared.select0 = createStringObject("select 0\r\n",10);
1004 shared.select1 = createStringObject("select 1\r\n",10);
1005 shared.select2 = createStringObject("select 2\r\n",10);
1006 shared.select3 = createStringObject("select 3\r\n",10);
1007 shared.select4 = createStringObject("select 4\r\n",10);
1008 shared.select5 = createStringObject("select 5\r\n",10);
1009 shared.select6 = createStringObject("select 6\r\n",10);
1010 shared.select7 = createStringObject("select 7\r\n",10);
1011 shared.select8 = createStringObject("select 8\r\n",10);
1012 shared.select9 = createStringObject("select 9\r\n",10);
1013 }
1014
1015 static void appendServerSaveParams(time_t seconds, int changes) {
1016 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1017 server.saveparams[server.saveparamslen].seconds = seconds;
1018 server.saveparams[server.saveparamslen].changes = changes;
1019 server.saveparamslen++;
1020 }
1021
1022 static void ResetServerSaveParams() {
1023 zfree(server.saveparams);
1024 server.saveparams = NULL;
1025 server.saveparamslen = 0;
1026 }
1027
1028 static void initServerConfig() {
1029 server.dbnum = REDIS_DEFAULT_DBNUM;
1030 server.port = REDIS_SERVERPORT;
1031 server.verbosity = REDIS_DEBUG;
1032 server.maxidletime = REDIS_MAXIDLETIME;
1033 server.saveparams = NULL;
1034 server.logfile = NULL; /* NULL = log on standard output */
1035 server.bindaddr = NULL;
1036 server.glueoutputbuf = 1;
1037 server.daemonize = 0;
1038 server.appendonly = 0;
1039 server.appendfsync = APPENDFSYNC_ALWAYS;
1040 server.lastfsync = time(NULL);
1041 server.appendfd = -1;
1042 server.appendseldb = -1; /* Make sure the first time will not match */
1043 server.pidfile = "/var/run/redis.pid";
1044 server.dbfilename = "dump.rdb";
1045 server.appendfilename = "appendonly.log";
1046 server.requirepass = NULL;
1047 server.shareobjects = 0;
1048 server.sharingpoolsize = 1024;
1049 server.maxclients = 0;
1050 server.maxmemory = 0;
1051 ResetServerSaveParams();
1052
1053 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1054 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1055 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1056 /* Replication related */
1057 server.isslave = 0;
1058 server.masterhost = NULL;
1059 server.masterport = 6379;
1060 server.master = NULL;
1061 server.replstate = REDIS_REPL_NONE;
1062
1063 /* Double constants initialization */
1064 R_Zero = 0.0;
1065 R_PosInf = 1.0/R_Zero;
1066 R_NegInf = -1.0/R_Zero;
1067 R_Nan = R_Zero/R_Zero;
1068 }
1069
1070 static void initServer() {
1071 int j;
1072
1073 signal(SIGHUP, SIG_IGN);
1074 signal(SIGPIPE, SIG_IGN);
1075 setupSigSegvAction();
1076
1077 server.clients = listCreate();
1078 server.slaves = listCreate();
1079 server.monitors = listCreate();
1080 server.objfreelist = listCreate();
1081 createSharedObjects();
1082 server.el = aeCreateEventLoop();
1083 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1084 server.sharingpool = dictCreate(&setDictType,NULL);
1085 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1086 if (server.fd == -1) {
1087 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1088 exit(1);
1089 }
1090 for (j = 0; j < server.dbnum; j++) {
1091 server.db[j].dict = dictCreate(&hashDictType,NULL);
1092 server.db[j].expires = dictCreate(&setDictType,NULL);
1093 server.db[j].id = j;
1094 }
1095 server.cronloops = 0;
1096 server.bgsaveinprogress = 0;
1097 server.bgsavechildpid = -1;
1098 server.lastsave = time(NULL);
1099 server.dirty = 0;
1100 server.usedmemory = 0;
1101 server.stat_numcommands = 0;
1102 server.stat_numconnections = 0;
1103 server.stat_starttime = time(NULL);
1104 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1105
1106 if (server.appendonly) {
1107 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1108 if (server.appendfd == -1) {
1109 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1110 strerror(errno));
1111 exit(1);
1112 }
1113 }
1114 }
1115
1116 /* Empty the whole database */
1117 static long long emptyDb() {
1118 int j;
1119 long long removed = 0;
1120
1121 for (j = 0; j < server.dbnum; j++) {
1122 removed += dictSize(server.db[j].dict);
1123 dictEmpty(server.db[j].dict);
1124 dictEmpty(server.db[j].expires);
1125 }
1126 return removed;
1127 }
1128
/* Convert a "yes"/"no" string (compared case-insensitively) into 1 or 0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1134
1135 /* I agree, this is a very rudimental way to load a configuration...
1136 will improve later if the config gets more complex */
1137 static void loadServerConfig(char *filename) {
1138 FILE *fp;
1139 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1140 int linenum = 0;
1141 sds line = NULL;
1142
1143 if (filename[0] == '-' && filename[1] == '\0')
1144 fp = stdin;
1145 else {
1146 if ((fp = fopen(filename,"r")) == NULL) {
1147 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1148 exit(1);
1149 }
1150 }
1151
1152 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1153 sds *argv;
1154 int argc, j;
1155
1156 linenum++;
1157 line = sdsnew(buf);
1158 line = sdstrim(line," \t\r\n");
1159
1160 /* Skip comments and blank lines*/
1161 if (line[0] == '#' || line[0] == '\0') {
1162 sdsfree(line);
1163 continue;
1164 }
1165
1166 /* Split into arguments */
1167 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1168 sdstolower(argv[0]);
1169
1170 /* Execute config directives */
1171 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1172 server.maxidletime = atoi(argv[1]);
1173 if (server.maxidletime < 0) {
1174 err = "Invalid timeout value"; goto loaderr;
1175 }
1176 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1177 server.port = atoi(argv[1]);
1178 if (server.port < 1 || server.port > 65535) {
1179 err = "Invalid port"; goto loaderr;
1180 }
1181 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1182 server.bindaddr = zstrdup(argv[1]);
1183 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1184 int seconds = atoi(argv[1]);
1185 int changes = atoi(argv[2]);
1186 if (seconds < 1 || changes < 0) {
1187 err = "Invalid save parameters"; goto loaderr;
1188 }
1189 appendServerSaveParams(seconds,changes);
1190 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1191 if (chdir(argv[1]) == -1) {
1192 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1193 argv[1], strerror(errno));
1194 exit(1);
1195 }
1196 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1197 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1198 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1199 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1200 else {
1201 err = "Invalid log level. Must be one of debug, notice, warning";
1202 goto loaderr;
1203 }
1204 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1205 FILE *logfp;
1206
1207 server.logfile = zstrdup(argv[1]);
1208 if (!strcasecmp(server.logfile,"stdout")) {
1209 zfree(server.logfile);
1210 server.logfile = NULL;
1211 }
1212 if (server.logfile) {
1213 /* Test if we are able to open the file. The server will not
1214 * be able to abort just for this problem later... */
1215 logfp = fopen(server.logfile,"a");
1216 if (logfp == NULL) {
1217 err = sdscatprintf(sdsempty(),
1218 "Can't open the log file: %s", strerror(errno));
1219 goto loaderr;
1220 }
1221 fclose(logfp);
1222 }
1223 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1224 server.dbnum = atoi(argv[1]);
1225 if (server.dbnum < 1) {
1226 err = "Invalid number of databases"; goto loaderr;
1227 }
1228 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1229 server.maxclients = atoi(argv[1]);
1230 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1231 server.maxmemory = strtoll(argv[1], NULL, 10);
1232 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1233 server.masterhost = sdsnew(argv[1]);
1234 server.masterport = atoi(argv[2]);
1235 server.replstate = REDIS_REPL_CONNECT;
1236 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1237 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1238 err = "argument must be 'yes' or 'no'"; goto loaderr;
1239 }
1240 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1241 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1242 err = "argument must be 'yes' or 'no'"; goto loaderr;
1243 }
1244 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1245 server.sharingpoolsize = atoi(argv[1]);
1246 if (server.sharingpoolsize < 1) {
1247 err = "invalid object sharing pool size"; goto loaderr;
1248 }
1249 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1250 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1251 err = "argument must be 'yes' or 'no'"; goto loaderr;
1252 }
1253 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1254 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1255 err = "argument must be 'yes' or 'no'"; goto loaderr;
1256 }
1257 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1258 if (strcasecmp(argv[1],"no")) {
1259 server.appendfsync = APPENDFSYNC_NO;
1260 } else if (strcasecmp(argv[1],"always")) {
1261 server.appendfsync = APPENDFSYNC_ALWAYS;
1262 } else if (strcasecmp(argv[1],"everysec")) {
1263 server.appendfsync = APPENDFSYNC_EVERYSEC;
1264 } else {
1265 err = "argument must be 'no', 'always' or 'everysec'";
1266 goto loaderr;
1267 }
1268 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1269 server.requirepass = zstrdup(argv[1]);
1270 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1271 server.pidfile = zstrdup(argv[1]);
1272 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1273 server.dbfilename = zstrdup(argv[1]);
1274 } else {
1275 err = "Bad directive or wrong number of arguments"; goto loaderr;
1276 }
1277 for (j = 0; j < argc; j++)
1278 sdsfree(argv[j]);
1279 zfree(argv);
1280 sdsfree(line);
1281 }
1282 if (fp != stdin) fclose(fp);
1283 return;
1284
1285 loaderr:
1286 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1287 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1288 fprintf(stderr, ">>> '%s'\n", line);
1289 fprintf(stderr, "%s\n", err);
1290 exit(1);
1291 }
1292
1293 static void freeClientArgv(redisClient *c) {
1294 int j;
1295
1296 for (j = 0; j < c->argc; j++)
1297 decrRefCount(c->argv[j]);
1298 for (j = 0; j < c->mbargc; j++)
1299 decrRefCount(c->mbargv[j]);
1300 c->argc = 0;
1301 c->mbargc = 0;
1302 }
1303
1304 static void freeClient(redisClient *c) {
1305 listNode *ln;
1306
1307 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1308 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1309 sdsfree(c->querybuf);
1310 listRelease(c->reply);
1311 freeClientArgv(c);
1312 close(c->fd);
1313 ln = listSearchKey(server.clients,c);
1314 assert(ln != NULL);
1315 listDelNode(server.clients,ln);
1316 if (c->flags & REDIS_SLAVE) {
1317 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1318 close(c->repldbfd);
1319 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1320 ln = listSearchKey(l,c);
1321 assert(ln != NULL);
1322 listDelNode(l,ln);
1323 }
1324 if (c->flags & REDIS_MASTER) {
1325 server.master = NULL;
1326 server.replstate = REDIS_REPL_CONNECT;
1327 }
1328 zfree(c->argv);
1329 zfree(c->mbargv);
1330 zfree(c);
1331 }
1332
1333 static void glueReplyBuffersIfNeeded(redisClient *c) {
1334 int totlen = 0;
1335 listNode *ln;
1336 robj *o;
1337
1338 listRewind(c->reply);
1339 while((ln = listYield(c->reply))) {
1340 o = ln->value;
1341 totlen += sdslen(o->ptr);
1342 /* This optimization makes more sense if we don't have to copy
1343 * too much data */
1344 if (totlen > 1024) return;
1345 }
1346 if (totlen > 0) {
1347 char buf[1024];
1348 int copylen = 0;
1349
1350 listRewind(c->reply);
1351 while((ln = listYield(c->reply))) {
1352 o = ln->value;
1353 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1354 copylen += sdslen(o->ptr);
1355 listDelNode(c->reply,ln);
1356 }
1357 /* Now the output buffer is empty, add the new single element */
1358 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1359 listAddNodeTail(c->reply,o);
1360 }
1361 }
1362
1363 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1364 redisClient *c = privdata;
1365 int nwritten = 0, totwritten = 0, objlen;
1366 robj *o;
1367 REDIS_NOTUSED(el);
1368 REDIS_NOTUSED(mask);
1369
1370 if (server.glueoutputbuf && listLength(c->reply) > 1)
1371 glueReplyBuffersIfNeeded(c);
1372 while(listLength(c->reply)) {
1373 o = listNodeValue(listFirst(c->reply));
1374 objlen = sdslen(o->ptr);
1375
1376 if (objlen == 0) {
1377 listDelNode(c->reply,listFirst(c->reply));
1378 continue;
1379 }
1380
1381 if (c->flags & REDIS_MASTER) {
1382 /* Don't reply to a master */
1383 nwritten = objlen - c->sentlen;
1384 } else {
1385 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1386 if (nwritten <= 0) break;
1387 }
1388 c->sentlen += nwritten;
1389 totwritten += nwritten;
1390 /* If we fully sent the object on head go to the next one */
1391 if (c->sentlen == objlen) {
1392 listDelNode(c->reply,listFirst(c->reply));
1393 c->sentlen = 0;
1394 }
1395 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1396 * bytes, in a single threaded server it's a good idea to server
1397 * other clients as well, even if a very large request comes from
1398 * super fast link that is always able to accept data (in real world
1399 * terms think to 'KEYS *' against the loopback interfae) */
1400 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1401 }
1402 if (nwritten == -1) {
1403 if (errno == EAGAIN) {
1404 nwritten = 0;
1405 } else {
1406 redisLog(REDIS_DEBUG,
1407 "Error writing to client: %s", strerror(errno));
1408 freeClient(c);
1409 return;
1410 }
1411 }
1412 if (totwritten > 0) c->lastinteraction = time(NULL);
1413 if (listLength(c->reply) == 0) {
1414 c->sentlen = 0;
1415 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1416 }
1417 }
1418
1419 static struct redisCommand *lookupCommand(char *name) {
1420 int j = 0;
1421 while(cmdTable[j].name != NULL) {
1422 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1423 j++;
1424 }
1425 return NULL;
1426 }
1427
1428 /* resetClient prepare the client to process the next command */
1429 static void resetClient(redisClient *c) {
1430 freeClientArgv(c);
1431 c->bulklen = -1;
1432 c->multibulk = 0;
1433 }
1434
1435 /* If this function gets called we already read a whole
1436 * command, argments are in the client argv/argc fields.
1437 * processCommand() execute the command or prepare the
1438 * server for a bulk read from the client.
1439 *
1440 * If 1 is returned the client is still alive and valid and
1441 * and other operations can be performed by the caller. Otherwise
1442 * if 0 is returned the client was destroied (i.e. after QUIT). */
1443 static int processCommand(redisClient *c) {
1444 struct redisCommand *cmd;
1445 long long dirty;
1446
1447 /* Free some memory if needed (maxmemory setting) */
1448 if (server.maxmemory) freeMemoryIfNeeded();
1449
1450 /* Handle the multi bulk command type. This is an alternative protocol
1451 * supported by Redis in order to receive commands that are composed of
1452 * multiple binary-safe "bulk" arguments. The latency of processing is
1453 * a bit higher but this allows things like multi-sets, so if this
1454 * protocol is used only for MSET and similar commands this is a big win. */
1455 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1456 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1457 if (c->multibulk <= 0) {
1458 resetClient(c);
1459 return 1;
1460 } else {
1461 decrRefCount(c->argv[c->argc-1]);
1462 c->argc--;
1463 return 1;
1464 }
1465 } else if (c->multibulk) {
1466 if (c->bulklen == -1) {
1467 if (((char*)c->argv[0]->ptr)[0] != '$') {
1468 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1469 resetClient(c);
1470 return 1;
1471 } else {
1472 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1473 decrRefCount(c->argv[0]);
1474 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1475 c->argc--;
1476 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1477 resetClient(c);
1478 return 1;
1479 }
1480 c->argc--;
1481 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1482 return 1;
1483 }
1484 } else {
1485 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1486 c->mbargv[c->mbargc] = c->argv[0];
1487 c->mbargc++;
1488 c->argc--;
1489 c->multibulk--;
1490 if (c->multibulk == 0) {
1491 robj **auxargv;
1492 int auxargc;
1493
1494 /* Here we need to swap the multi-bulk argc/argv with the
1495 * normal argc/argv of the client structure. */
1496 auxargv = c->argv;
1497 c->argv = c->mbargv;
1498 c->mbargv = auxargv;
1499
1500 auxargc = c->argc;
1501 c->argc = c->mbargc;
1502 c->mbargc = auxargc;
1503
1504 /* We need to set bulklen to something different than -1
1505 * in order for the code below to process the command without
1506 * to try to read the last argument of a bulk command as
1507 * a special argument. */
1508 c->bulklen = 0;
1509 /* continue below and process the command */
1510 } else {
1511 c->bulklen = -1;
1512 return 1;
1513 }
1514 }
1515 }
1516 /* -- end of multi bulk commands processing -- */
1517
1518 /* The QUIT command is handled as a special case. Normal command
1519 * procs are unable to close the client connection safely */
1520 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1521 freeClient(c);
1522 return 0;
1523 }
1524 cmd = lookupCommand(c->argv[0]->ptr);
1525 if (!cmd) {
1526 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1527 resetClient(c);
1528 return 1;
1529 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1530 (c->argc < -cmd->arity)) {
1531 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1532 resetClient(c);
1533 return 1;
1534 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1535 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1536 resetClient(c);
1537 return 1;
1538 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1539 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1540
1541 decrRefCount(c->argv[c->argc-1]);
1542 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1543 c->argc--;
1544 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1545 resetClient(c);
1546 return 1;
1547 }
1548 c->argc--;
1549 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1550 /* It is possible that the bulk read is already in the
1551 * buffer. Check this condition and handle it accordingly.
1552 * This is just a fast path, alternative to call processInputBuffer().
1553 * It's a good idea since the code is small and this condition
1554 * happens most of the times. */
1555 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1556 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1557 c->argc++;
1558 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1559 } else {
1560 return 1;
1561 }
1562 }
1563 /* Let's try to share objects on the command arguments vector */
1564 if (server.shareobjects) {
1565 int j;
1566 for(j = 1; j < c->argc; j++)
1567 c->argv[j] = tryObjectSharing(c->argv[j]);
1568 }
1569 /* Let's try to encode the bulk object to save space. */
1570 if (cmd->flags & REDIS_CMD_BULK)
1571 tryObjectEncoding(c->argv[c->argc-1]);
1572
1573 /* Check if the user is authenticated */
1574 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1575 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1576 resetClient(c);
1577 return 1;
1578 }
1579
1580 /* Exec the command */
1581 dirty = server.dirty;
1582 cmd->proc(c);
1583 if (server.appendonly != 0)
1584 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1585 if (server.dirty-dirty != 0 && listLength(server.slaves))
1586 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1587 if (listLength(server.monitors))
1588 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1589 server.stat_numcommands++;
1590
1591 /* Prepare the client for the next command */
1592 if (c->flags & REDIS_CLOSE) {
1593 freeClient(c);
1594 return 0;
1595 }
1596 resetClient(c);
1597 return 1;
1598 }
1599
1600 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1601 listNode *ln;
1602 int outc = 0, j;
1603 robj **outv;
1604 /* (args*2)+1 is enough room for args, spaces, newlines */
1605 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1606
1607 if (argc <= REDIS_STATIC_ARGS) {
1608 outv = static_outv;
1609 } else {
1610 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1611 }
1612
1613 for (j = 0; j < argc; j++) {
1614 if (j != 0) outv[outc++] = shared.space;
1615 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1616 robj *lenobj;
1617
1618 lenobj = createObject(REDIS_STRING,
1619 sdscatprintf(sdsempty(),"%d\r\n",
1620 stringObjectLen(argv[j])));
1621 lenobj->refcount = 0;
1622 outv[outc++] = lenobj;
1623 }
1624 outv[outc++] = argv[j];
1625 }
1626 outv[outc++] = shared.crlf;
1627
1628 /* Increment all the refcounts at start and decrement at end in order to
1629 * be sure to free objects if there is no slave in a replication state
1630 * able to be feed with commands */
1631 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1632 listRewind(slaves);
1633 while((ln = listYield(slaves))) {
1634 redisClient *slave = ln->value;
1635
1636 /* Don't feed slaves that are still waiting for BGSAVE to start */
1637 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1638
1639 /* Feed all the other slaves, MONITORs and so on */
1640 if (slave->slaveseldb != dictid) {
1641 robj *selectcmd;
1642
1643 switch(dictid) {
1644 case 0: selectcmd = shared.select0; break;
1645 case 1: selectcmd = shared.select1; break;
1646 case 2: selectcmd = shared.select2; break;
1647 case 3: selectcmd = shared.select3; break;
1648 case 4: selectcmd = shared.select4; break;
1649 case 5: selectcmd = shared.select5; break;
1650 case 6: selectcmd = shared.select6; break;
1651 case 7: selectcmd = shared.select7; break;
1652 case 8: selectcmd = shared.select8; break;
1653 case 9: selectcmd = shared.select9; break;
1654 default:
1655 selectcmd = createObject(REDIS_STRING,
1656 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1657 selectcmd->refcount = 0;
1658 break;
1659 }
1660 addReply(slave,selectcmd);
1661 slave->slaveseldb = dictid;
1662 }
1663 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1664 }
1665 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1666 if (outv != static_outv) zfree(outv);
1667 }
1668
1669 /* TODO: translate EXPIREs into EXPIRETOs */
1670 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1671 sds buf = sdsempty();
1672 int j;
1673 ssize_t nwritten;
1674 time_t now;
1675 robj *tmpargv[3];
1676
1677 /* The DB this command was targetting is not the same as the last command
1678 * we appendend. To issue a SELECT command is needed. */
1679 if (dictid != server.appendseldb) {
1680 char seldb[64];
1681
1682 snprintf(seldb,sizeof(seldb),"%d",dictid);
1683 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
1684 strlen(seldb),seldb);
1685 server.appendseldb = dictid;
1686 }
1687
1688 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
1689 * EXPIREs into EXPIREATs calls */
1690 if (cmd->proc == expireCommand) {
1691 long when;
1692
1693 tmpargv[0] = createStringObject("EXPIREAT",8);
1694 tmpargv[1] = argv[1];
1695 incrRefCount(argv[1]);
1696 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
1697 tmpargv[2] = createObject(REDIS_STRING,
1698 sdscatprintf(sdsempty(),"%ld",when));
1699 argv = tmpargv;
1700 }
1701
1702 /* Append the actual command */
1703 buf = sdscatprintf(buf,"*%d\r\n",argc);
1704 for (j = 0; j < argc; j++) {
1705 robj *o = argv[j];
1706
1707 if (o->encoding != REDIS_ENCODING_RAW)
1708 o = getDecodedObject(o);
1709 buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr));
1710 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
1711 buf = sdscatlen(buf,"\r\n",2);
1712 if (o != argv[j])
1713 decrRefCount(o);
1714 }
1715
1716 /* Free the objects from the modified argv for EXPIREAT */
1717 if (cmd->proc == expireCommand) {
1718 for (j = 0; j < 3; j++)
1719 decrRefCount(argv[j]);
1720 }
1721
1722 /* We want to perform a single write. This should be guaranteed atomic
1723 * at least if the filesystem we are writing is a real physical one.
1724 * While this will save us against the server being killed I don't think
1725 * there is much to do about the whole server stopping for power problems
1726 * or alike */
1727 nwritten = write(server.appendfd,buf,sdslen(buf));
1728 if (nwritten != (signed)sdslen(buf)) {
1729 /* Ooops, we are in troubles. The best thing to do for now is
1730 * to simply exit instead to give the illusion that everything is
1731 * working as expected. */
1732 if (nwritten == -1) {
1733 redisLog(REDIS_WARNING,"Aborting on error writing to the append-only file: %s",strerror(errno));
1734 } else {
1735 redisLog(REDIS_WARNING,"Aborting on short write while writing to the append-only file: %s",strerror(errno));
1736 }
1737 abort();
1738 }
1739 now = time(NULL);
1740 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
1741 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
1742 now-server.lastfsync > 1))
1743 {
1744 fsync(server.appendfd); /* Let's try to get this data on the disk */
1745 server.lastfsync = now;
1746 }
1747 }
1748
1749 static void processInputBuffer(redisClient *c) {
1750 again:
1751 if (c->bulklen == -1) {
1752 /* Read the first line of the query */
1753 char *p = strchr(c->querybuf,'\n');
1754 size_t querylen;
1755
1756 if (p) {
1757 sds query, *argv;
1758 int argc, j;
1759
1760 query = c->querybuf;
1761 c->querybuf = sdsempty();
1762 querylen = 1+(p-(query));
1763 if (sdslen(query) > querylen) {
1764 /* leave data after the first line of the query in the buffer */
1765 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1766 }
1767 *p = '\0'; /* remove "\n" */
1768 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1769 sdsupdatelen(query);
1770
1771 /* Now we can split the query in arguments */
1772 if (sdslen(query) == 0) {
1773 /* Ignore empty query */
1774 sdsfree(query);
1775 return;
1776 }
1777 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1778 sdsfree(query);
1779
1780 if (c->argv) zfree(c->argv);
1781 c->argv = zmalloc(sizeof(robj*)*argc);
1782
1783 for (j = 0; j < argc; j++) {
1784 if (sdslen(argv[j])) {
1785 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1786 c->argc++;
1787 } else {
1788 sdsfree(argv[j]);
1789 }
1790 }
1791 zfree(argv);
1792 /* Execute the command. If the client is still valid
1793 * after processCommand() return and there is something
1794 * on the query buffer try to process the next command. */
1795 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1796 return;
1797 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1798 redisLog(REDIS_DEBUG, "Client protocol error");
1799 freeClient(c);
1800 return;
1801 }
1802 } else {
1803 /* Bulk read handling. Note that if we are at this point
1804 the client already sent a command terminated with a newline,
1805 we are reading the bulk data that is actually the last
1806 argument of the command. */
1807 int qbl = sdslen(c->querybuf);
1808
1809 if (c->bulklen <= qbl) {
1810 /* Copy everything but the final CRLF as final argument */
1811 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1812 c->argc++;
1813 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1814 /* Process the command. If the client is still valid after
1815 * the processing and there is more data in the buffer
1816 * try to parse it. */
1817 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1818 return;
1819 }
1820 }
1821 }
1822
1823 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1824 redisClient *c = (redisClient*) privdata;
1825 char buf[REDIS_IOBUF_LEN];
1826 int nread;
1827 REDIS_NOTUSED(el);
1828 REDIS_NOTUSED(mask);
1829
1830 nread = read(fd, buf, REDIS_IOBUF_LEN);
1831 if (nread == -1) {
1832 if (errno == EAGAIN) {
1833 nread = 0;
1834 } else {
1835 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1836 freeClient(c);
1837 return;
1838 }
1839 } else if (nread == 0) {
1840 redisLog(REDIS_DEBUG, "Client closed connection");
1841 freeClient(c);
1842 return;
1843 }
1844 if (nread) {
1845 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1846 c->lastinteraction = time(NULL);
1847 } else {
1848 return;
1849 }
1850 processInputBuffer(c);
1851 }
1852
1853 static int selectDb(redisClient *c, int id) {
1854 if (id < 0 || id >= server.dbnum)
1855 return REDIS_ERR;
1856 c->db = &server.db[id];
1857 return REDIS_OK;
1858 }
1859
1860 static void *dupClientReplyValue(void *o) {
1861 incrRefCount((robj*)o);
1862 return 0;
1863 }
1864
1865 static redisClient *createClient(int fd) {
1866 redisClient *c = zmalloc(sizeof(*c));
1867
1868 anetNonBlock(NULL,fd);
1869 anetTcpNoDelay(NULL,fd);
1870 if (!c) return NULL;
1871 selectDb(c,0);
1872 c->fd = fd;
1873 c->querybuf = sdsempty();
1874 c->argc = 0;
1875 c->argv = NULL;
1876 c->bulklen = -1;
1877 c->multibulk = 0;
1878 c->mbargc = 0;
1879 c->mbargv = NULL;
1880 c->sentlen = 0;
1881 c->flags = 0;
1882 c->lastinteraction = time(NULL);
1883 c->authenticated = 0;
1884 c->replstate = REDIS_REPL_NONE;
1885 c->reply = listCreate();
1886 listSetFreeMethod(c->reply,decrRefCount);
1887 listSetDupMethod(c->reply,dupClientReplyValue);
1888 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1889 readQueryFromClient, c, NULL) == AE_ERR) {
1890 freeClient(c);
1891 return NULL;
1892 }
1893 listAddNodeTail(server.clients,c);
1894 return c;
1895 }
1896
1897 static void addReply(redisClient *c, robj *obj) {
1898 if (listLength(c->reply) == 0 &&
1899 (c->replstate == REDIS_REPL_NONE ||
1900 c->replstate == REDIS_REPL_ONLINE) &&
1901 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1902 sendReplyToClient, c, NULL) == AE_ERR) return;
1903 if (obj->encoding != REDIS_ENCODING_RAW) {
1904 obj = getDecodedObject(obj);
1905 } else {
1906 incrRefCount(obj);
1907 }
1908 listAddNodeTail(c->reply,obj);
1909 }
1910
1911 static void addReplySds(redisClient *c, sds s) {
1912 robj *o = createObject(REDIS_STRING,s);
1913 addReply(c,o);
1914 decrRefCount(o);
1915 }
1916
1917 static void addReplyBulkLen(redisClient *c, robj *obj) {
1918 size_t len;
1919
1920 if (obj->encoding == REDIS_ENCODING_RAW) {
1921 len = sdslen(obj->ptr);
1922 } else {
1923 long n = (long)obj->ptr;
1924
1925 len = 1;
1926 if (n < 0) {
1927 len++;
1928 n = -n;
1929 }
1930 while((n = n/10) != 0) {
1931 len++;
1932 }
1933 }
1934 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1935 }
1936
1937 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1938 int cport, cfd;
1939 char cip[128];
1940 redisClient *c;
1941 REDIS_NOTUSED(el);
1942 REDIS_NOTUSED(mask);
1943 REDIS_NOTUSED(privdata);
1944
1945 cfd = anetAccept(server.neterr, fd, cip, &cport);
1946 if (cfd == AE_ERR) {
1947 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1948 return;
1949 }
1950 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1951 if ((c = createClient(cfd)) == NULL) {
1952 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1953 close(cfd); /* May be already closed, just ingore errors */
1954 return;
1955 }
1956 /* If maxclient directive is set and this is one client more... close the
1957 * connection. Note that we create the client instead to check before
1958 * for this condition, since now the socket is already set in nonblocking
1959 * mode and we can send an error for free using the Kernel I/O */
1960 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1961 char *err = "-ERR max number of clients reached\r\n";
1962
1963 /* That's a best effort error message, don't check write errors */
1964 (void) write(c->fd,err,strlen(err));
1965 freeClient(c);
1966 return;
1967 }
1968 server.stat_numconnections++;
1969 }
1970
1971 /* ======================= Redis objects implementation ===================== */
1972
1973 static robj *createObject(int type, void *ptr) {
1974 robj *o;
1975
1976 if (listLength(server.objfreelist)) {
1977 listNode *head = listFirst(server.objfreelist);
1978 o = listNodeValue(head);
1979 listDelNode(server.objfreelist,head);
1980 } else {
1981 o = zmalloc(sizeof(*o));
1982 }
1983 o->type = type;
1984 o->encoding = REDIS_ENCODING_RAW;
1985 o->ptr = ptr;
1986 o->refcount = 1;
1987 return o;
1988 }
1989
1990 static robj *createStringObject(char *ptr, size_t len) {
1991 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1992 }
1993
1994 static robj *createListObject(void) {
1995 list *l = listCreate();
1996
1997 listSetFreeMethod(l,decrRefCount);
1998 return createObject(REDIS_LIST,l);
1999 }
2000
2001 static robj *createSetObject(void) {
2002 dict *d = dictCreate(&setDictType,NULL);
2003 return createObject(REDIS_SET,d);
2004 }
2005
2006 static robj *createZsetObject(void) {
2007 zset *zs = zmalloc(sizeof(*zs));
2008
2009 zs->dict = dictCreate(&zsetDictType,NULL);
2010 zs->zsl = zslCreate();
2011 return createObject(REDIS_ZSET,zs);
2012 }
2013
2014 static void freeStringObject(robj *o) {
2015 if (o->encoding == REDIS_ENCODING_RAW) {
2016 sdsfree(o->ptr);
2017 }
2018 }
2019
2020 static void freeListObject(robj *o) {
2021 listRelease((list*) o->ptr);
2022 }
2023
2024 static void freeSetObject(robj *o) {
2025 dictRelease((dict*) o->ptr);
2026 }
2027
2028 static void freeZsetObject(robj *o) {
2029 zset *zs = o->ptr;
2030
2031 dictRelease(zs->dict);
2032 zslFree(zs->zsl);
2033 zfree(zs);
2034 }
2035
2036 static void freeHashObject(robj *o) {
2037 dictRelease((dict*) o->ptr);
2038 }
2039
2040 static void incrRefCount(robj *o) {
2041 o->refcount++;
2042 #ifdef DEBUG_REFCOUNT
2043 if (o->type == REDIS_STRING)
2044 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2045 #endif
2046 }
2047
2048 static void decrRefCount(void *obj) {
2049 robj *o = obj;
2050
2051 #ifdef DEBUG_REFCOUNT
2052 if (o->type == REDIS_STRING)
2053 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2054 #endif
2055 if (--(o->refcount) == 0) {
2056 switch(o->type) {
2057 case REDIS_STRING: freeStringObject(o); break;
2058 case REDIS_LIST: freeListObject(o); break;
2059 case REDIS_SET: freeSetObject(o); break;
2060 case REDIS_ZSET: freeZsetObject(o); break;
2061 case REDIS_HASH: freeHashObject(o); break;
2062 default: assert(0 != 0); break;
2063 }
2064 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2065 !listAddNodeHead(server.objfreelist,o))
2066 zfree(o);
2067 }
2068 }
2069
2070 static robj *lookupKey(redisDb *db, robj *key) {
2071 dictEntry *de = dictFind(db->dict,key);
2072 return de ? dictGetEntryVal(de) : NULL;
2073 }
2074
2075 static robj *lookupKeyRead(redisDb *db, robj *key) {
2076 expireIfNeeded(db,key);
2077 return lookupKey(db,key);
2078 }
2079
2080 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2081 deleteIfVolatile(db,key);
2082 return lookupKey(db,key);
2083 }
2084
2085 static int deleteKey(redisDb *db, robj *key) {
2086 int retval;
2087
2088 /* We need to protect key from destruction: after the first dictDelete()
2089 * it may happen that 'key' is no longer valid if we don't increment
2090 * it's count. This may happen when we get the object reference directly
2091 * from the hash table with dictRandomKey() or dict iterators */
2092 incrRefCount(key);
2093 if (dictSize(db->expires)) dictDelete(db->expires,key);
2094 retval = dictDelete(db->dict,key);
2095 decrRefCount(key);
2096
2097 return retval == DICT_OK;
2098 }
2099
2100 /* Try to share an object against the shared objects pool */
2101 static robj *tryObjectSharing(robj *o) {
2102 struct dictEntry *de;
2103 unsigned long c;
2104
2105 if (o == NULL || server.shareobjects == 0) return o;
2106
2107 assert(o->type == REDIS_STRING);
2108 de = dictFind(server.sharingpool,o);
2109 if (de) {
2110 robj *shared = dictGetEntryKey(de);
2111
2112 c = ((unsigned long) dictGetEntryVal(de))+1;
2113 dictGetEntryVal(de) = (void*) c;
2114 incrRefCount(shared);
2115 decrRefCount(o);
2116 return shared;
2117 } else {
2118 /* Here we are using a stream algorihtm: Every time an object is
2119 * shared we increment its count, everytime there is a miss we
2120 * recrement the counter of a random object. If this object reaches
2121 * zero we remove the object and put the current object instead. */
2122 if (dictSize(server.sharingpool) >=
2123 server.sharingpoolsize) {
2124 de = dictGetRandomKey(server.sharingpool);
2125 assert(de != NULL);
2126 c = ((unsigned long) dictGetEntryVal(de))-1;
2127 dictGetEntryVal(de) = (void*) c;
2128 if (c == 0) {
2129 dictDelete(server.sharingpool,de->key);
2130 }
2131 } else {
2132 c = 0; /* If the pool is empty we want to add this object */
2133 }
2134 if (c == 0) {
2135 int retval;
2136
2137 retval = dictAdd(server.sharingpool,o,(void*)1);
2138 assert(retval == DICT_OK);
2139 incrRefCount(o);
2140 }
2141 return o;
2142 }
2143 }
2144
2145 /* Check if the nul-terminated string 's' can be represented by a long
2146 * (that is, is a number that fits into long without any other space or
2147 * character before or after the digits).
2148 *
2149 * If so, the function returns REDIS_OK and *longval is set to the value
2150 * of the number. Otherwise REDIS_ERR is returned */
2151 static int isStringRepresentableAsLong(sds s, long *longval) {
2152 char buf[32], *endptr;
2153 long value;
2154 int slen;
2155
2156 value = strtol(s, &endptr, 10);
2157 if (endptr[0] != '\0') return REDIS_ERR;
2158 slen = snprintf(buf,32,"%ld",value);
2159
2160 /* If the number converted back into a string is not identical
2161 * then it's not possible to encode the string as integer */
2162 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2163 if (longval) *longval = value;
2164 return REDIS_OK;
2165 }
2166
2167 /* Try to encode a string object in order to save space */
2168 static int tryObjectEncoding(robj *o) {
2169 long value;
2170 sds s = o->ptr;
2171
2172 if (o->encoding != REDIS_ENCODING_RAW)
2173 return REDIS_ERR; /* Already encoded */
2174
2175 /* It's not save to encode shared objects: shared objects can be shared
2176 * everywhere in the "object space" of Redis. Encoded objects can only
2177 * appear as "values" (and not, for instance, as keys) */
2178 if (o->refcount > 1) return REDIS_ERR;
2179
2180 /* Currently we try to encode only strings */
2181 assert(o->type == REDIS_STRING);
2182
2183 /* Check if we can represent this string as a long integer */
2184 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2185
2186 /* Ok, this object can be encoded */
2187 o->encoding = REDIS_ENCODING_INT;
2188 sdsfree(o->ptr);
2189 o->ptr = (void*) value;
2190 return REDIS_OK;
2191 }
2192
2193 /* Get a decoded version of an encoded object (returned as a new object) */
2194 static robj *getDecodedObject(const robj *o) {
2195 robj *dec;
2196
2197 assert(o->encoding != REDIS_ENCODING_RAW);
2198 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2199 char buf[32];
2200
2201 snprintf(buf,32,"%ld",(long)o->ptr);
2202 dec = createStringObject(buf,strlen(buf));
2203 return dec;
2204 } else {
2205 assert(1 != 1);
2206 }
2207 }
2208
2209 /* Compare two string objects via strcmp() or alike.
2210 * Note that the objects may be integer-encoded. In such a case we
2211 * use snprintf() to get a string representation of the numbers on the stack
2212 * and compare the strings, it's much faster than calling getDecodedObject(). */
2213 static int compareStringObjects(robj *a, robj *b) {
2214 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2215 char bufa[128], bufb[128], *astr, *bstr;
2216 int bothsds = 1;
2217
2218 if (a == b) return 0;
2219 if (a->encoding != REDIS_ENCODING_RAW) {
2220 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2221 astr = bufa;
2222 bothsds = 0;
2223 } else {
2224 astr = a->ptr;
2225 }
2226 if (b->encoding != REDIS_ENCODING_RAW) {
2227 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2228 bstr = bufb;
2229 bothsds = 0;
2230 } else {
2231 bstr = b->ptr;
2232 }
2233 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2234 }
2235
2236 static size_t stringObjectLen(robj *o) {
2237 assert(o->type == REDIS_STRING);
2238 if (o->encoding == REDIS_ENCODING_RAW) {
2239 return sdslen(o->ptr);
2240 } else {
2241 char buf[32];
2242
2243 return snprintf(buf,32,"%ld",(long)o->ptr);
2244 }
2245 }
2246
2247 /*============================ DB saving/loading ============================ */
2248
/* Write the one byte 'type' opcode to 'fp'.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
2253
/* Write the time 't' to 'fp' as a 32 bit signed integer in host byte
 * order. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp;

    stamp = (int32_t) t;
    return (fwrite(&stamp,4,1,fp) == 0) ? -1 : 0;
}
2259
2260 /* check rdbLoadLen() comments for more info */
2261 static int rdbSaveLen(FILE *fp, uint32_t len) {
2262 unsigned char buf[2];
2263
2264 if (len < (1<<6)) {
2265 /* Save a 6 bit len */
2266 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2267 if (fwrite(buf,1,1,fp) == 0) return -1;
2268 } else if (len < (1<<14)) {
2269 /* Save a 14 bit len */
2270 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2271 buf[1] = len&0xFF;
2272 if (fwrite(buf,2,1,fp) == 0) return -1;
2273 } else {
2274 /* Save a 32 bit len */
2275 buf[0] = (REDIS_RDB_32BITLEN<<6);
2276 if (fwrite(buf,1,1,fp) == 0) return -1;
2277 len = htonl(len);
2278 if (fwrite(&len,4,1,fp) == 0) return -1;
2279 }
2280 return 0;
2281 }
2282
2283 /* String objects in the form "2391" "-100" without any space and with a
2284 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2285 * encoded as integers to save space */
2286 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2287 long long value;
2288 char *endptr, buf[32];
2289
2290 /* Check if it's possible to encode this value as a number */
2291 value = strtoll(s, &endptr, 10);
2292 if (endptr[0] != '\0') return 0;
2293 snprintf(buf,32,"%lld",value);
2294
2295 /* If the number converted back into a string is not identical
2296 * then it's not possible to encode the string as integer */
2297 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2298
2299 /* Finally check if it fits in our ranges */
2300 if (value >= -(1<<7) && value <= (1<<7)-1) {
2301 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2302 enc[1] = value&0xFF;
2303 return 2;
2304 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2305 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2306 enc[1] = value&0xFF;
2307 enc[2] = (value>>8)&0xFF;
2308 return 3;
2309 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2310 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2311 enc[1] = value&0xFF;
2312 enc[2] = (value>>8)&0xFF;
2313 enc[3] = (value>>16)&0xFF;
2314 enc[4] = (value>>24)&0xFF;
2315 return 5;
2316 } else {
2317 return 0;
2318 }
2319 }
2320
2321 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2322 unsigned int comprlen, outlen;
2323 unsigned char byte;
2324 void *out;
2325
2326 /* We require at least four bytes compression for this to be worth it */
2327 outlen = sdslen(obj->ptr)-4;
2328 if (outlen <= 0) return 0;
2329 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2330 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2331 if (comprlen == 0) {
2332 zfree(out);
2333 return 0;
2334 }
2335 /* Data compressed! Let's save it on disk */
2336 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2337 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2338 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2339 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2340 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2341 zfree(out);
2342 return comprlen;
2343
2344 writeerr:
2345 zfree(out);
2346 return -1;
2347 }
2348
2349 /* Save a string objet as [len][data] on disk. If the object is a string
2350 * representation of an integer value we try to safe it in a special form */
2351 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2352 size_t len;
2353 int enclen;
2354
2355 len = sdslen(obj->ptr);
2356
2357 /* Try integer encoding */
2358 if (len <= 11) {
2359 unsigned char buf[5];
2360 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2361 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2362 return 0;
2363 }
2364 }
2365
2366 /* Try LZF compression - under 20 bytes it's unable to compress even
2367 * aaaaaaaaaaaaaaaaaa so skip it */
2368 if (len > 20) {
2369 int retval;
2370
2371 retval = rdbSaveLzfStringObject(fp,obj);
2372 if (retval == -1) return -1;
2373 if (retval > 0) return 0;
2374 /* retval == 0 means data can't be compressed, save the old way */
2375 }
2376
2377 /* Store verbatim */
2378 if (rdbSaveLen(fp,len) == -1) return -1;
2379 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2380 return 0;
2381 }
2382
2383 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2384 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2385 int retval;
2386 robj *dec;
2387
2388 if (obj->encoding != REDIS_ENCODING_RAW) {
2389 dec = getDecodedObject(obj);
2390 retval = rdbSaveStringObjectRaw(fp,dec);
2391 decrRefCount(dec);
2392 return retval;
2393 } else {
2394 return rdbSaveStringObjectRaw(fp,obj);
2395 }
2396 }
2397
/* Save a double value. Doubles are saved as strings prefixed by an unsigned
 * 8 bit integer specifying the length of the representation.
 * This 8 bit integer has special values in order to specify the following
 * conditions:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 *
 * Returns 0 on success, -1 on write error.
 *
 * The length prefix used to be computed with strlen() on the start of the
 * buffer, that is, on the still uninitialized prefix byte itself, instead
 * of on the text stored right after it. The length is now measured from
 * buf+1 where the representation actually lives. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len;

    if (isnan(val)) {
        buf[0] = 253;
        len = 1;
    } else if (!isfinite(val)) {
        len = 1;
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        /* Text goes after the prefix byte, then the prefix is filled in */
        snprintf((char*)buf+1,sizeof(buf)-1,"%.16g",val);
        buf[0] = strlen((char*)buf+1);
        len = buf[0]+1;
    }
    if (fwrite(buf,len,1,fp) == 0) return -1;
    return 0;
}
2424
2425 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2426 static int rdbSave(char *filename) {
2427 dictIterator *di = NULL;
2428 dictEntry *de;
2429 FILE *fp;
2430 char tmpfile[256];
2431 int j;
2432 time_t now = time(NULL);
2433
2434 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2435 fp = fopen(tmpfile,"w");
2436 if (!fp) {
2437 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2438 return REDIS_ERR;
2439 }
2440 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2441 for (j = 0; j < server.dbnum; j++) {
2442 redisDb *db = server.db+j;
2443 dict *d = db->dict;
2444 if (dictSize(d) == 0) continue;
2445 di = dictGetIterator(d);
2446 if (!di) {
2447 fclose(fp);
2448 return REDIS_ERR;
2449 }
2450
2451 /* Write the SELECT DB opcode */
2452 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2453 if (rdbSaveLen(fp,j) == -1) goto werr;
2454
2455 /* Iterate this DB writing every entry */
2456 while((de = dictNext(di)) != NULL) {
2457 robj *key = dictGetEntryKey(de);
2458 robj *o = dictGetEntryVal(de);
2459 time_t expiretime = getExpire(db,key);
2460
2461 /* Save the expire time */
2462 if (expiretime != -1) {
2463 /* If this key is already expired skip it */
2464 if (expiretime < now) continue;
2465 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2466 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2467 }
2468 /* Save the key and associated value */
2469 if (rdbSaveType(fp,o->type) == -1) goto werr;
2470 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2471 if (o->type == REDIS_STRING) {
2472 /* Save a string value */
2473 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2474 } else if (o->type == REDIS_LIST) {
2475 /* Save a list value */
2476 list *list = o->ptr;
2477 listNode *ln;
2478
2479 listRewind(list);
2480 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2481 while((ln = listYield(list))) {
2482 robj *eleobj = listNodeValue(ln);
2483
2484 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2485 }
2486 } else if (o->type == REDIS_SET) {
2487 /* Save a set value */
2488 dict *set = o->ptr;
2489 dictIterator *di = dictGetIterator(set);
2490 dictEntry *de;
2491
2492 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2493 while((de = dictNext(di)) != NULL) {
2494 robj *eleobj = dictGetEntryKey(de);
2495
2496 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2497 }
2498 dictReleaseIterator(di);
2499 } else if (o->type == REDIS_ZSET) {
2500 /* Save a set value */
2501 zset *zs = o->ptr;
2502 dictIterator *di = dictGetIterator(zs->dict);
2503 dictEntry *de;
2504
2505 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2506 while((de = dictNext(di)) != NULL) {
2507 robj *eleobj = dictGetEntryKey(de);
2508 double *score = dictGetEntryVal(de);
2509
2510 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2511 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2512 }
2513 dictReleaseIterator(di);
2514 } else {
2515 assert(0 != 0);
2516 }
2517 }
2518 dictReleaseIterator(di);
2519 }
2520 /* EOF opcode */
2521 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2522
2523 /* Make sure data will not remain on the OS's output buffers */
2524 fflush(fp);
2525 fsync(fileno(fp));
2526 fclose(fp);
2527
2528 /* Use RENAME to make sure the DB file is changed atomically only
2529 * if the generate DB file is ok. */
2530 if (rename(tmpfile,filename) == -1) {
2531 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2532 unlink(tmpfile);
2533 return REDIS_ERR;
2534 }
2535 redisLog(REDIS_NOTICE,"DB saved on disk");
2536 server.dirty = 0;
2537 server.lastsave = time(NULL);
2538 return REDIS_OK;
2539
2540 werr:
2541 fclose(fp);
2542 unlink(tmpfile);
2543 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2544 if (di) dictReleaseIterator(di);
2545 return REDIS_ERR;
2546 }
2547
2548 static int rdbSaveBackground(char *filename) {
2549 pid_t childpid;
2550
2551 if (server.bgsaveinprogress) return REDIS_ERR;
2552 if ((childpid = fork()) == 0) {
2553 /* Child */
2554 close(server.fd);
2555 if (rdbSave(filename) == REDIS_OK) {
2556 exit(0);
2557 } else {
2558 exit(1);
2559 }
2560 } else {
2561 /* Parent */
2562 if (childpid == -1) {
2563 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2564 strerror(errno));
2565 return REDIS_ERR;
2566 }
2567 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2568 server.bgsaveinprogress = 1;
2569 server.bgsavechildpid = childpid;
2570 return REDIS_OK;
2571 }
2572 return REDIS_OK; /* unreached */
2573 }
2574
/* Remove the temporary dump file "temp-<pid>.rdb" that belongs to the
 * saving process 'childpid'. Errors from unlink() are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb", (int) childpid);
    unlink(path);
}
2581
/* Read a one byte type opcode from 'fp'.
 * Returns the byte as a value in the range 0-255, or -1 if nothing could
 * be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char opcode;
    size_t got = fread(&opcode,1,1,fp);

    if (got == 0) return -1;
    return opcode;
}
2587
/* Read a time saved by rdbSaveTime(): a 32 bit signed integer in host byte
 * order. Returns the time, or -1 if the four bytes could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;
    size_t got = fread(&stamp,4,1,fp);

    if (got == 0) return -1;
    return (time_t) stamp;
}
2593
2594 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2595 * of this file for a description of how this are stored on disk.
2596 *
2597 * isencoded is set to 1 if the readed length is not actually a length but
2598 * an "encoding type", check the above comments for more info */
2599 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2600 unsigned char buf[2];
2601 uint32_t len;
2602
2603 if (isencoded) *isencoded = 0;
2604 if (rdbver == 0) {
2605 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2606 return ntohl(len);
2607 } else {
2608 int type;
2609
2610 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2611 type = (buf[0]&0xC0)>>6;
2612 if (type == REDIS_RDB_6BITLEN) {
2613 /* Read a 6 bit len */
2614 return buf[0]&0x3F;
2615 } else if (type == REDIS_RDB_ENCVAL) {
2616 /* Read a 6 bit len encoding type */
2617 if (isencoded) *isencoded = 1;
2618 return buf[0]&0x3F;
2619 } else if (type == REDIS_RDB_14BITLEN) {
2620 /* Read a 14 bit len */
2621 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2622 return ((buf[0]&0x3F)<<8)|buf[1];
2623 } else {
2624 /* Read a 32 bit len */
2625 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2626 return ntohl(len);
2627 }
2628 }
2629 }
2630
2631 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2632 unsigned char enc[4];
2633 long long val;
2634
2635 if (enctype == REDIS_RDB_ENC_INT8) {
2636 if (fread(enc,1,1,fp) == 0) return NULL;
2637 val = (signed char)enc[0];
2638 } else if (enctype == REDIS_RDB_ENC_INT16) {
2639 uint16_t v;
2640 if (fread(enc,2,1,fp) == 0) return NULL;
2641 v = enc[0]|(enc[1]<<8);
2642 val = (int16_t)v;
2643 } else if (enctype == REDIS_RDB_ENC_INT32) {
2644 uint32_t v;
2645 if (fread(enc,4,1,fp) == 0) return NULL;
2646 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2647 val = (int32_t)v;
2648 } else {
2649 val = 0; /* anti-warning */
2650 assert(0!=0);
2651 }
2652 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2653 }
2654
2655 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2656 unsigned int len, clen;
2657 unsigned char *c = NULL;
2658 sds val = NULL;
2659
2660 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2661 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2662 if ((c = zmalloc(clen)) == NULL) goto err;
2663 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2664 if (fread(c,clen,1,fp) == 0) goto err;
2665 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2666 zfree(c);
2667 return createObject(REDIS_STRING,val);
2668 err:
2669 zfree(c);
2670 sdsfree(val);
2671 return NULL;
2672 }
2673
2674 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2675 int isencoded;
2676 uint32_t len;
2677 sds val;
2678
2679 len = rdbLoadLen(fp,rdbver,&isencoded);
2680 if (isencoded) {
2681 switch(len) {
2682 case REDIS_RDB_ENC_INT8:
2683 case REDIS_RDB_ENC_INT16:
2684 case REDIS_RDB_ENC_INT32:
2685 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2686 case REDIS_RDB_ENC_LZF:
2687 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2688 default:
2689 assert(0!=0);
2690 }
2691 }
2692
2693 if (len == REDIS_RDB_LENERR) return NULL;
2694 val = sdsnewlen(NULL,len);
2695 if (len && fread(val,len,1,fp) == 0) {
2696 sdsfree(val);
2697 return NULL;
2698 }
2699 return tryObjectSharing(createObject(REDIS_STRING,val));
2700 }
2701
2702 /* For information about double serialization check rdbSaveDoubleValue() */
2703 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2704 char buf[128];
2705 unsigned char len;
2706
2707 if (fread(&len,1,1,fp) == 0) return -1;
2708 switch(len) {
2709 case 255: *val = R_NegInf; return 0;
2710 case 254: *val = R_PosInf; return 0;
2711 case 253: *val = R_Nan; return 0;
2712 default:
2713 if (fread(buf,len,1,fp) == 0) return -1;
2714 sscanf(buf, "%lg", val);
2715 return 0;
2716 }
2717 }
2718
2719 static int rdbLoad(char *filename) {
2720 FILE *fp;
2721 robj *keyobj = NULL;
2722 uint32_t dbid;
2723 int type, retval, rdbver;
2724 dict *d = server.db[0].dict;
2725 redisDb *db = server.db+0;
2726 char buf[1024];
2727 time_t expiretime = -1, now = time(NULL);
2728
2729 fp = fopen(filename,"r");
2730 if (!fp) return REDIS_ERR;
2731 if (fread(buf,9,1,fp) == 0) goto eoferr;
2732 buf[9] = '\0';
2733 if (memcmp(buf,"REDIS",5) != 0) {
2734 fclose(fp);
2735 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2736 return REDIS_ERR;
2737 }
2738 rdbver = atoi(buf+5);
2739 if (rdbver > 1) {
2740 fclose(fp);
2741 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2742 return REDIS_ERR;
2743 }
2744 while(1) {
2745 robj *o;
2746
2747 /* Read type. */
2748 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2749 if (type == REDIS_EXPIRETIME) {
2750 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2751 /* We read the time so we need to read the object type again */
2752 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2753 }
2754 if (type == REDIS_EOF) break;
2755 /* Handle SELECT DB opcode as a special case */
2756 if (type == REDIS_SELECTDB) {
2757 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2758 goto eoferr;
2759 if (dbid >= (unsigned)server.dbnum) {
2760 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2761 exit(1);
2762 }
2763 db = server.db+dbid;
2764 d = db->dict;
2765 continue;
2766 }
2767 /* Read key */
2768 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2769
2770 if (type == REDIS_STRING) {
2771 /* Read string value */
2772 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2773 tryObjectEncoding(o);
2774 } else if (type == REDIS_LIST || type == REDIS_SET) {
2775 /* Read list/set value */
2776 uint32_t listlen;
2777
2778 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2779 goto eoferr;
2780 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2781 /* Load every single element of the list/set */
2782 while(listlen--) {
2783 robj *ele;
2784
2785 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2786 tryObjectEncoding(ele);
2787 if (type == REDIS_LIST) {
2788 listAddNodeTail((list*)o->ptr,ele);
2789 } else {
2790 dictAdd((dict*)o->ptr,ele,NULL);
2791 }
2792 }
2793 } else if (type == REDIS_ZSET) {
2794 /* Read list/set value */
2795 uint32_t zsetlen;
2796 zset *zs;
2797
2798 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2799 goto eoferr;
2800 o = createZsetObject();
2801 zs = o->ptr;
2802 /* Load every single element of the list/set */
2803 while(zsetlen--) {
2804 robj *ele;
2805 double *score = zmalloc(sizeof(double));
2806
2807 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2808 tryObjectEncoding(ele);
2809 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2810 dictAdd(zs->dict,ele,score);
2811 zslInsert(zs->zsl,*score,ele);
2812 incrRefCount(ele); /* added to skiplist */
2813 }
2814 } else {
2815 assert(0 != 0);
2816 }
2817 /* Add the new object in the hash table */
2818 retval = dictAdd(d,keyobj,o);
2819 if (retval == DICT_ERR) {
2820 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2821 exit(1);
2822 }
2823 /* Set the expire time if needed */
2824 if (expiretime != -1) {
2825 setExpire(db,keyobj,expiretime);
2826 /* Delete this key if already expired */
2827 if (expiretime < now) deleteKey(db,keyobj);
2828 expiretime = -1;
2829 }
2830 keyobj = o = NULL;
2831 }
2832 fclose(fp);
2833 return REDIS_OK;
2834
2835 eoferr: /* unexpected end of file is handled here with a fatal exit */
2836 if (keyobj) decrRefCount(keyobj);
2837 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2838 exit(1);
2839 return REDIS_ERR; /* Just to avoid warning */
2840 }
2841
2842 /*================================== Commands =============================== */
2843
2844 static void authCommand(redisClient *c) {
2845 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2846 c->authenticated = 1;
2847 addReply(c,shared.ok);
2848 } else {
2849 c->authenticated = 0;
2850 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2851 }
2852 }
2853
2854 static void pingCommand(redisClient *c) {
2855 addReply(c,shared.pong);
2856 }
2857
2858 static void echoCommand(redisClient *c) {
2859 addReplyBulkLen(c,c->argv[1]);
2860 addReply(c,c->argv[1]);
2861 addReply(c,shared.crlf);
2862 }
2863
2864 /*=================================== Strings =============================== */
2865
2866 static void setGenericCommand(redisClient *c, int nx) {
2867 int retval;
2868
2869 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2870 if (retval == DICT_ERR) {
2871 if (!nx) {
2872 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2873 incrRefCount(c->argv[2]);
2874 } else {
2875 addReply(c,shared.czero);
2876 return;
2877 }
2878 } else {
2879 incrRefCount(c->argv[1]);
2880 incrRefCount(c->argv[2]);
2881 }
2882 server.dirty++;
2883 removeExpire(c->db,c->argv[1]);
2884 addReply(c, nx ? shared.cone : shared.ok);
2885 }
2886
2887 static void setCommand(redisClient *c) {
2888 setGenericCommand(c,0);
2889 }
2890
2891 static void setnxCommand(redisClient *c) {
2892 setGenericCommand(c,1);
2893 }
2894
2895 static void getCommand(redisClient *c) {
2896 robj *o = lookupKeyRead(c->db,c->argv[1]);
2897
2898 if (o == NULL) {
2899 addReply(c,shared.nullbulk);
2900 } else {
2901 if (o->type != REDIS_STRING) {
2902 addReply(c,shared.wrongtypeerr);
2903 } else {
2904 addReplyBulkLen(c,o);
2905 addReply(c,o);
2906 addReply(c,shared.crlf);
2907 }
2908 }
2909 }
2910
2911 static void getsetCommand(redisClient *c) {
2912 getCommand(c);
2913 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2914 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2915 } else {
2916 incrRefCount(c->argv[1]);
2917 }
2918 incrRefCount(c->argv[2]);
2919 server.dirty++;
2920 removeExpire(c->db,c->argv[1]);
2921 }
2922
2923 static void mgetCommand(redisClient *c) {
2924 int j;
2925
2926 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2927 for (j = 1; j < c->argc; j++) {
2928 robj *o = lookupKeyRead(c->db,c->argv[j]);
2929 if (o == NULL) {
2930 addReply(c,shared.nullbulk);
2931 } else {
2932 if (o->type != REDIS_STRING) {
2933 addReply(c,shared.nullbulk);
2934 } else {
2935 addReplyBulkLen(c,o);
2936 addReply(c,o);
2937 addReply(c,shared.crlf);
2938 }
2939 }
2940 }
2941 }
2942
2943 static void incrDecrCommand(redisClient *c, long long incr) {
2944 long long value;
2945 int retval;
2946 robj *o;
2947
2948 o = lookupKeyWrite(c->db,c->argv[1]);
2949 if (o == NULL) {
2950 value = 0;
2951 } else {
2952 if (o->type != REDIS_STRING) {
2953 value = 0;
2954 } else {
2955 char *eptr;
2956
2957 if (o->encoding == REDIS_ENCODING_RAW)
2958 value = strtoll(o->ptr, &eptr, 10);
2959 else if (o->encoding == REDIS_ENCODING_INT)
2960 value = (long)o->ptr;
2961 else
2962 assert(1 != 1);
2963 }
2964 }
2965
2966 value += incr;
2967 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2968 tryObjectEncoding(o);
2969 retval = dictAdd(c->db->dict,c->argv[1],o);
2970 if (retval == DICT_ERR) {
2971 dictReplace(c->db->dict,c->argv[1],o);
2972 removeExpire(c->db,c->argv[1]);
2973 } else {
2974 incrRefCount(c->argv[1]);
2975 }
2976 server.dirty++;
2977 addReply(c,shared.colon);
2978 addReply(c,o);
2979 addReply(c,shared.crlf);
2980 }
2981
2982 static void incrCommand(redisClient *c) {
2983 incrDecrCommand(c,1);
2984 }
2985
2986 static void decrCommand(redisClient *c) {
2987 incrDecrCommand(c,-1);
2988 }
2989
2990 static void incrbyCommand(redisClient *c) {
2991 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2992 incrDecrCommand(c,incr);
2993 }
2994
2995 static void decrbyCommand(redisClient *c) {
2996 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2997 incrDecrCommand(c,-incr);
2998 }
2999
3000 /* ========================= Type agnostic commands ========================= */
3001
3002 static void delCommand(redisClient *c) {
3003 int deleted = 0, j;
3004
3005 for (j = 1; j < c->argc; j++) {
3006 if (deleteKey(c->db,c->argv[j])) {
3007 server.dirty++;
3008 deleted++;
3009 }
3010 }
3011 switch(deleted) {
3012 case 0:
3013 addReply(c,shared.czero);
3014 break;
3015 case 1:
3016 addReply(c,shared.cone);
3017 break;
3018 default:
3019 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3020 break;
3021 }
3022 }
3023
3024 static void existsCommand(redisClient *c) {
3025 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3026 }
3027
3028 static void selectCommand(redisClient *c) {
3029 int id = atoi(c->argv[1]->ptr);
3030
3031 if (selectDb(c,id) == REDIS_ERR) {
3032 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3033 } else {
3034 addReply(c,shared.ok);
3035 }
3036 }
3037
3038 static void randomkeyCommand(redisClient *c) {
3039 dictEntry *de;
3040
3041 while(1) {
3042 de = dictGetRandomKey(c->db->dict);
3043 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3044 }
3045 if (de == NULL) {
3046 addReply(c,shared.plus);
3047 addReply(c,shared.crlf);
3048 } else {
3049 addReply(c,shared.plus);
3050 addReply(c,dictGetEntryKey(de));
3051 addReply(c,shared.crlf);
3052 }
3053 }
3054
3055 static void keysCommand(redisClient *c) {
3056 dictIterator *di;
3057 dictEntry *de;
3058 sds pattern = c->argv[1]->ptr;
3059 int plen = sdslen(pattern);
3060 int numkeys = 0, keyslen = 0;
3061 robj *lenobj = createObject(REDIS_STRING,NULL);
3062
3063 di = dictGetIterator(c->db->dict);
3064 addReply(c,lenobj);
3065 decrRefCount(lenobj);
3066 while((de = dictNext(di)) != NULL) {
3067 robj *keyobj = dictGetEntryKey(de);
3068
3069 sds key = keyobj->ptr;
3070 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3071 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3072 if (expireIfNeeded(c->db,keyobj) == 0) {
3073 if (numkeys != 0)
3074 addReply(c,shared.space);
3075 addReply(c,keyobj);
3076 numkeys++;
3077 keyslen += sdslen(key);
3078 }
3079 }
3080 }
3081 dictReleaseIterator(di);
3082 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3083 addReply(c,shared.crlf);
3084 }
3085
3086 static void dbsizeCommand(redisClient *c) {
3087 addReplySds(c,
3088 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3089 }
3090
3091 static void lastsaveCommand(redisClient *c) {
3092 addReplySds(c,
3093 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3094 }
3095
3096 static void typeCommand(redisClient *c) {
3097 robj *o;
3098 char *type;
3099
3100 o = lookupKeyRead(c->db,c->argv[1]);
3101 if (o == NULL) {
3102 type = "+none";
3103 } else {
3104 switch(o->type) {
3105 case REDIS_STRING: type = "+string"; break;
3106 case REDIS_LIST: type = "+list"; break;
3107 case REDIS_SET: type = "+set"; break;
3108 case REDIS_ZSET: type = "+zset"; break;
3109 default: type = "unknown"; break;
3110 }
3111 }
3112 addReplySds(c,sdsnew(type));
3113 addReply(c,shared.crlf);
3114 }
3115
3116 static void saveCommand(redisClient *c) {
3117 if (server.bgsaveinprogress) {
3118 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3119 return;
3120 }
3121 if (rdbSave(server.dbfilename) == REDIS_OK) {
3122 addReply(c,shared.ok);
3123 } else {
3124 addReply(c,shared.err);
3125 }
3126 }
3127
3128 static void bgsaveCommand(redisClient *c) {
3129 if (server.bgsaveinprogress) {
3130 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3131 return;
3132 }
3133 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3134 addReply(c,shared.ok);
3135 } else {
3136 addReply(c,shared.err);
3137 }
3138 }
3139
3140 static void shutdownCommand(redisClient *c) {
3141 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3142 /* Kill the saving child if there is a background saving in progress.
3143 We want to avoid race conditions, for instance our saving child may
3144 overwrite the synchronous saving did by SHUTDOWN. */
3145 if (server.bgsaveinprogress) {
3146 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3147 kill(server.bgsavechildpid,SIGKILL);
3148 rdbRemoveTempFile(server.bgsavechildpid);
3149 }
3150 /* SYNC SAVE */
3151 if (rdbSave(server.dbfilename) == REDIS_OK) {
3152 if (server.daemonize)
3153 unlink(server.pidfile);
3154 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3155 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3156 exit(1);
3157 } else {
3158 /* Ooops.. error saving! The best we can do is to continue operating.
3159 * Note that if there was a background saving process, in the next
3160 * cron() Redis will be notified that the background saving aborted,
3161 * handling special stuff like slaves pending for synchronization... */
3162 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3163 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3164 }
3165 }
3166
3167 static void renameGenericCommand(redisClient *c, int nx) {
3168 robj *o;
3169
3170 /* To use the same key as src and dst is probably an error */
3171 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3172 addReply(c,shared.sameobjecterr);
3173 return;
3174 }
3175
3176 o = lookupKeyWrite(c->db,c->argv[1]);
3177 if (o == NULL) {
3178 addReply(c,shared.nokeyerr);
3179 return;
3180 }
3181 incrRefCount(o);
3182 deleteIfVolatile(c->db,c->argv[2]);
3183 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3184 if (nx) {
3185 decrRefCount(o);
3186 addReply(c,shared.czero);
3187 return;
3188 }
3189 dictReplace(c->db->dict,c->argv[2],o);
3190 } else {
3191 incrRefCount(c->argv[2]);
3192 }
3193 deleteKey(c->db,c->argv[1]);
3194 server.dirty++;
3195 addReply(c,nx ? shared.cone : shared.ok);
3196 }
3197
3198 static void renameCommand(redisClient *c) {
3199 renameGenericCommand(c,0);
3200 }
3201
3202 static void renamenxCommand(redisClient *c) {
3203 renameGenericCommand(c,1);
3204 }
3205
3206 static void moveCommand(redisClient *c) {
3207 robj *o;
3208 redisDb *src, *dst;
3209 int srcid;
3210
3211 /* Obtain source and target DB pointers */
3212 src = c->db;
3213 srcid = c->db->id;
3214 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3215 addReply(c,shared.outofrangeerr);
3216 return;
3217 }
3218 dst = c->db;
3219 selectDb(c,srcid); /* Back to the source DB */
3220
3221 /* If the user is moving using as target the same
3222 * DB as the source DB it is probably an error. */
3223 if (src == dst) {
3224 addReply(c,shared.sameobjecterr);
3225 return;
3226 }
3227
3228 /* Check if the element exists and get a reference */
3229 o = lookupKeyWrite(c->db,c->argv[1]);
3230 if (!o) {
3231 addReply(c,shared.czero);
3232 return;
3233 }
3234
3235 /* Try to add the element to the target DB */
3236 deleteIfVolatile(dst,c->argv[1]);
3237 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3238 addReply(c,shared.czero);
3239 return;
3240 }
3241 incrRefCount(c->argv[1]);
3242 incrRefCount(o);
3243
3244 /* OK! key moved, free the entry in the source DB */
3245 deleteKey(src,c->argv[1]);
3246 server.dirty++;
3247 addReply(c,shared.cone);
3248 }
3249
3250 /* =================================== Lists ================================ */
3251 static void pushGenericCommand(redisClient *c, int where) {
3252 robj *lobj;
3253 list *list;
3254
3255 lobj = lookupKeyWrite(c->db,c->argv[1]);
3256 if (lobj == NULL) {
3257 lobj = createListObject();
3258 list = lobj->ptr;
3259 if (where == REDIS_HEAD) {
3260 listAddNodeHead(list,c->argv[2]);
3261 } else {
3262 listAddNodeTail(list,c->argv[2]);
3263 }
3264 dictAdd(c->db->dict,c->argv[1],lobj);
3265 incrRefCount(c->argv[1]);
3266 incrRefCount(c->argv[2]);
3267 } else {
3268 if (lobj->type != REDIS_LIST) {
3269 addReply(c,shared.wrongtypeerr);
3270 return;
3271 }
3272 list = lobj->ptr;
3273 if (where == REDIS_HEAD) {
3274 listAddNodeHead(list,c->argv[2]);
3275 } else {
3276 listAddNodeTail(list,c->argv[2]);
3277 }
3278 incrRefCount(c->argv[2]);
3279 }
3280 server.dirty++;
3281 addReply(c,shared.ok);
3282 }
3283
3284 static void lpushCommand(redisClient *c) {
3285 pushGenericCommand(c,REDIS_HEAD);
3286 }
3287
3288 static void rpushCommand(redisClient *c) {
3289 pushGenericCommand(c,REDIS_TAIL);
3290 }
3291
3292 static void llenCommand(redisClient *c) {
3293 robj *o;
3294 list *l;
3295
3296 o = lookupKeyRead(c->db,c->argv[1]);
3297 if (o == NULL) {
3298 addReply(c,shared.czero);
3299 return;
3300 } else {
3301 if (o->type != REDIS_LIST) {
3302 addReply(c,shared.wrongtypeerr);
3303 } else {
3304 l = o->ptr;
3305 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3306 }
3307 }
3308 }
3309
3310 static void lindexCommand(redisClient *c) {
3311 robj *o;
3312 int index = atoi(c->argv[2]->ptr);
3313
3314 o = lookupKeyRead(c->db,c->argv[1]);
3315 if (o == NULL) {
3316 addReply(c,shared.nullbulk);
3317 } else {
3318 if (o->type != REDIS_LIST) {
3319 addReply(c,shared.wrongtypeerr);
3320 } else {
3321 list *list = o->ptr;
3322 listNode *ln;
3323
3324 ln = listIndex(list, index);
3325 if (ln == NULL) {
3326 addReply(c,shared.nullbulk);
3327 } else {
3328 robj *ele = listNodeValue(ln);
3329 addReplyBulkLen(c,ele);
3330 addReply(c,ele);
3331 addReply(c,shared.crlf);
3332 }
3333 }
3334 }
3335 }
3336
3337 static void lsetCommand(redisClient *c) {
3338 robj *o;
3339 int index = atoi(c->argv[2]->ptr);
3340
3341 o = lookupKeyWrite(c->db,c->argv[1]);
3342 if (o == NULL) {
3343 addReply(c,shared.nokeyerr);
3344 } else {
3345 if (o->type != REDIS_LIST) {
3346 addReply(c,shared.wrongtypeerr);
3347 } else {
3348 list *list = o->ptr;
3349 listNode *ln;
3350
3351 ln = listIndex(list, index);
3352 if (ln == NULL) {
3353 addReply(c,shared.outofrangeerr);
3354 } else {
3355 robj *ele = listNodeValue(ln);
3356
3357 decrRefCount(ele);
3358 listNodeValue(ln) = c->argv[3];
3359 incrRefCount(c->argv[3]);
3360 addReply(c,shared.ok);
3361 server.dirty++;
3362 }
3363 }
3364 }
3365 }
3366
3367 static void popGenericCommand(redisClient *c, int where) {
3368 robj *o;
3369
3370 o = lookupKeyWrite(c->db,c->argv[1]);
3371 if (o == NULL) {
3372 addReply(c,shared.nullbulk);
3373 } else {
3374 if (o->type != REDIS_LIST) {
3375 addReply(c,shared.wrongtypeerr);
3376 } else {
3377 list *list = o->ptr;
3378 listNode *ln;
3379
3380 if (where == REDIS_HEAD)
3381 ln = listFirst(list);
3382 else
3383 ln = listLast(list);
3384
3385 if (ln == NULL) {
3386 addReply(c,shared.nullbulk);
3387 } else {
3388 robj *ele = listNodeValue(ln);
3389 addReplyBulkLen(c,ele);
3390 addReply(c,ele);
3391 addReply(c,shared.crlf);
3392 listDelNode(list,ln);
3393 server.dirty++;
3394 }
3395 }
3396 }
3397 }
3398
3399 static void lpopCommand(redisClient *c) {
3400 popGenericCommand(c,REDIS_HEAD);
3401 }
3402
3403 static void rpopCommand(redisClient *c) {
3404 popGenericCommand(c,REDIS_TAIL);
3405 }
3406
3407 static void lrangeCommand(redisClient *c) {
3408 robj *o;
3409 int start = atoi(c->argv[2]->ptr);
3410 int end = atoi(c->argv[3]->ptr);
3411
3412 o = lookupKeyRead(c->db,c->argv[1]);
3413 if (o == NULL) {
3414 addReply(c,shared.nullmultibulk);
3415 } else {
3416 if (o->type != REDIS_LIST) {
3417 addReply(c,shared.wrongtypeerr);
3418 } else {
3419 list *list = o->ptr;
3420 listNode *ln;
3421 int llen = listLength(list);
3422 int rangelen, j;
3423 robj *ele;
3424
3425 /* convert negative indexes */
3426 if (start < 0) start = llen+start;
3427 if (end < 0) end = llen+end;
3428 if (start < 0) start = 0;
3429 if (end < 0) end = 0;
3430
3431 /* indexes sanity checks */
3432 if (start > end || start >= llen) {
3433 /* Out of range start or start > end result in empty list */
3434 addReply(c,shared.emptymultibulk);
3435 return;
3436 }
3437 if (end >= llen) end = llen-1;
3438 rangelen = (end-start)+1;
3439
3440 /* Return the result in form of a multi-bulk reply */
3441 ln = listIndex(list, start);
3442 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3443 for (j = 0; j < rangelen; j++) {
3444 ele = listNodeValue(ln);
3445 addReplyBulkLen(c,ele);
3446 addReply(c,ele);
3447 addReply(c,shared.crlf);
3448 ln = ln->next;
3449 }
3450 }
3451 }
3452 }
3453
3454 static void ltrimCommand(redisClient *c) {
3455 robj *o;
3456 int start = atoi(c->argv[2]->ptr);
3457 int end = atoi(c->argv[3]->ptr);
3458
3459 o = lookupKeyWrite(c->db,c->argv[1]);
3460 if (o == NULL) {
3461 addReply(c,shared.nokeyerr);
3462 } else {
3463 if (o->type != REDIS_LIST) {
3464 addReply(c,shared.wrongtypeerr);
3465 } else {
3466 list *list = o->ptr;
3467 listNode *ln;
3468 int llen = listLength(list);
3469 int j, ltrim, rtrim;
3470
3471 /* convert negative indexes */
3472 if (start < 0) start = llen+start;
3473 if (end < 0) end = llen+end;
3474 if (start < 0) start = 0;
3475 if (end < 0) end = 0;
3476
3477 /* indexes sanity checks */
3478 if (start > end || start >= llen) {
3479 /* Out of range start or start > end result in empty list */
3480 ltrim = llen;
3481 rtrim = 0;
3482 } else {
3483 if (end >= llen) end = llen-1;
3484 ltrim = start;
3485 rtrim = llen-end-1;
3486 }
3487
3488 /* Remove list elements to perform the trim */
3489 for (j = 0; j < ltrim; j++) {
3490 ln = listFirst(list);
3491 listDelNode(list,ln);
3492 }
3493 for (j = 0; j < rtrim; j++) {
3494 ln = listLast(list);
3495 listDelNode(list,ln);
3496 }
3497 server.dirty++;
3498 addReply(c,shared.ok);
3499 }
3500 }
3501 }
3502
3503 static void lremCommand(redisClient *c) {
3504 robj *o;
3505
3506 o = lookupKeyWrite(c->db,c->argv[1]);
3507 if (o == NULL) {
3508 addReply(c,shared.czero);
3509 } else {
3510 if (o->type != REDIS_LIST) {
3511 addReply(c,shared.wrongtypeerr);
3512 } else {
3513 list *list = o->ptr;
3514 listNode *ln, *next;
3515 int toremove = atoi(c->argv[2]->ptr);
3516 int removed = 0;
3517 int fromtail = 0;
3518
3519 if (toremove < 0) {
3520 toremove = -toremove;
3521 fromtail = 1;
3522 }
3523 ln = fromtail ? list->tail : list->head;
3524 while (ln) {
3525 robj *ele = listNodeValue(ln);
3526
3527 next = fromtail ? ln->prev : ln->next;
3528 if (compareStringObjects(ele,c->argv[3]) == 0) {
3529 listDelNode(list,ln);
3530 server.dirty++;
3531 removed++;
3532 if (toremove && removed == toremove) break;
3533 }
3534 ln = next;
3535 }
3536 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3537 }
3538 }
3539 }
3540
3541 /* ==================================== Sets ================================ */
3542
3543 static void saddCommand(redisClient *c) {
3544 robj *set;
3545
3546 set = lookupKeyWrite(c->db,c->argv[1]);
3547 if (set == NULL) {
3548 set = createSetObject();
3549 dictAdd(c->db->dict,c->argv[1],set);
3550 incrRefCount(c->argv[1]);
3551 } else {
3552 if (set->type != REDIS_SET) {
3553 addReply(c,shared.wrongtypeerr);
3554 return;
3555 }
3556 }
3557 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3558 incrRefCount(c->argv[2]);
3559 server.dirty++;
3560 addReply(c,shared.cone);
3561 } else {
3562 addReply(c,shared.czero);
3563 }
3564 }
3565
3566 static void sremCommand(redisClient *c) {
3567 robj *set;
3568
3569 set = lookupKeyWrite(c->db,c->argv[1]);
3570 if (set == NULL) {
3571 addReply(c,shared.czero);
3572 } else {
3573 if (set->type != REDIS_SET) {
3574 addReply(c,shared.wrongtypeerr);
3575 return;
3576 }
3577 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3578 server.dirty++;
3579 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3580 addReply(c,shared.cone);
3581 } else {
3582 addReply(c,shared.czero);
3583 }
3584 }
3585 }
3586
3587 static void smoveCommand(redisClient *c) {
3588 robj *srcset, *dstset;
3589
3590 srcset = lookupKeyWrite(c->db,c->argv[1]);
3591 dstset = lookupKeyWrite(c->db,c->argv[2]);
3592
3593 /* If the source key does not exist return 0, if it's of the wrong type
3594 * raise an error */
3595 if (srcset == NULL || srcset->type != REDIS_SET) {
3596 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3597 return;
3598 }
3599 /* Error if the destination key is not a set as well */
3600 if (dstset && dstset->type != REDIS_SET) {
3601 addReply(c,shared.wrongtypeerr);
3602 return;
3603 }
3604 /* Remove the element from the source set */
3605 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3606 /* Key not found in the src set! return zero */
3607 addReply(c,shared.czero);
3608 return;
3609 }
3610 server.dirty++;
3611 /* Add the element to the destination set */
3612 if (!dstset) {
3613 dstset = createSetObject();
3614 dictAdd(c->db->dict,c->argv[2],dstset);
3615 incrRefCount(c->argv[2]);
3616 }
3617 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3618 incrRefCount(c->argv[3]);
3619 addReply(c,shared.cone);
3620 }
3621
3622 static void sismemberCommand(redisClient *c) {
3623 robj *set;
3624
3625 set = lookupKeyRead(c->db,c->argv[1]);
3626 if (set == NULL) {
3627 addReply(c,shared.czero);
3628 } else {
3629 if (set->type != REDIS_SET) {
3630 addReply(c,shared.wrongtypeerr);
3631 return;
3632 }
3633 if (dictFind(set->ptr,c->argv[2]))
3634 addReply(c,shared.cone);
3635 else
3636 addReply(c,shared.czero);
3637 }
3638 }
3639
3640 static void scardCommand(redisClient *c) {
3641 robj *o;
3642 dict *s;
3643
3644 o = lookupKeyRead(c->db,c->argv[1]);
3645 if (o == NULL) {
3646 addReply(c,shared.czero);
3647 return;
3648 } else {
3649 if (o->type != REDIS_SET) {
3650 addReply(c,shared.wrongtypeerr);
3651 } else {
3652 s = o->ptr;
3653 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3654 dictSize(s)));
3655 }
3656 }
3657 }
3658
3659 static void spopCommand(redisClient *c) {
3660 robj *set;
3661 dictEntry *de;
3662
3663 set = lookupKeyWrite(c->db,c->argv[1]);
3664 if (set == NULL) {
3665 addReply(c,shared.nullbulk);
3666 } else {
3667 if (set->type != REDIS_SET) {
3668 addReply(c,shared.wrongtypeerr);
3669 return;
3670 }
3671 de = dictGetRandomKey(set->ptr);
3672 if (de == NULL) {
3673 addReply(c,shared.nullbulk);
3674 } else {
3675 robj *ele = dictGetEntryKey(de);
3676
3677 addReplyBulkLen(c,ele);
3678 addReply(c,ele);
3679 addReply(c,shared.crlf);
3680 dictDelete(set->ptr,ele);
3681 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3682 server.dirty++;
3683 }
3684 }
3685 }
3686
3687 static void srandmemberCommand(redisClient *c) {
3688 robj *set;
3689 dictEntry *de;
3690
3691 set = lookupKeyRead(c->db,c->argv[1]);
3692 if (set == NULL) {
3693 addReply(c,shared.nullbulk);
3694 } else {
3695 if (set->type != REDIS_SET) {
3696 addReply(c,shared.wrongtypeerr);
3697 return;
3698 }
3699 de = dictGetRandomKey(set->ptr);
3700 if (de == NULL) {
3701 addReply(c,shared.nullbulk);
3702 } else {
3703 robj *ele = dictGetEntryKey(de);
3704
3705 addReplyBulkLen(c,ele);
3706 addReply(c,ele);
3707 addReply(c,shared.crlf);
3708 }
3709 }
3710 }
3711
3712 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3713 dict **d1 = (void*) s1, **d2 = (void*) s2;
3714
3715 return dictSize(*d1)-dictSize(*d2);
3716 }
3717
3718 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3719 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3720 dictIterator *di;
3721 dictEntry *de;
3722 robj *lenobj = NULL, *dstset = NULL;
3723 int j, cardinality = 0;
3724
3725 for (j = 0; j < setsnum; j++) {
3726 robj *setobj;
3727
3728 setobj = dstkey ?
3729 lookupKeyWrite(c->db,setskeys[j]) :
3730 lookupKeyRead(c->db,setskeys[j]);
3731 if (!setobj) {
3732 zfree(dv);
3733 if (dstkey) {
3734 deleteKey(c->db,dstkey);
3735 addReply(c,shared.ok);
3736 } else {
3737 addReply(c,shared.nullmultibulk);
3738 }
3739 return;
3740 }
3741 if (setobj->type != REDIS_SET) {
3742 zfree(dv);
3743 addReply(c,shared.wrongtypeerr);
3744 return;
3745 }
3746 dv[j] = setobj->ptr;
3747 }
3748 /* Sort sets from the smallest to largest, this will improve our
3749 * algorithm's performace */
3750 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3751
3752 /* The first thing we should output is the total number of elements...
3753 * since this is a multi-bulk write, but at this stage we don't know
3754 * the intersection set size, so we use a trick, append an empty object
3755 * to the output list and save the pointer to later modify it with the
3756 * right length */
3757 if (!dstkey) {
3758 lenobj = createObject(REDIS_STRING,NULL);
3759 addReply(c,lenobj);
3760 decrRefCount(lenobj);
3761 } else {
3762 /* If we have a target key where to store the resulting set
3763 * create this key with an empty set inside */
3764 dstset = createSetObject();
3765 }
3766
3767 /* Iterate all the elements of the first (smallest) set, and test
3768 * the element against all the other sets, if at least one set does
3769 * not include the element it is discarded */
3770 di = dictGetIterator(dv[0]);
3771
3772 while((de = dictNext(di)) != NULL) {
3773 robj *ele;
3774
3775 for (j = 1; j < setsnum; j++)
3776 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3777 if (j != setsnum)
3778 continue; /* at least one set does not contain the member */
3779 ele = dictGetEntryKey(de);
3780 if (!dstkey) {
3781 addReplyBulkLen(c,ele);
3782 addReply(c,ele);
3783 addReply(c,shared.crlf);
3784 cardinality++;
3785 } else {
3786 dictAdd(dstset->ptr,ele,NULL);
3787 incrRefCount(ele);
3788 }
3789 }
3790 dictReleaseIterator(di);
3791
3792 if (dstkey) {
3793 /* Store the resulting set into the target */
3794 deleteKey(c->db,dstkey);
3795 dictAdd(c->db->dict,dstkey,dstset);
3796 incrRefCount(dstkey);
3797 }
3798
3799 if (!dstkey) {
3800 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3801 } else {
3802 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3803 dictSize((dict*)dstset->ptr)));
3804 server.dirty++;
3805 }
3806 zfree(dv);
3807 }
3808
3809 static void sinterCommand(redisClient *c) {
3810 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3811 }
3812
3813 static void sinterstoreCommand(redisClient *c) {
3814 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3815 }
3816
3817 #define REDIS_OP_UNION 0
3818 #define REDIS_OP_DIFF 1
3819
3820 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3821 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3822 dictIterator *di;
3823 dictEntry *de;
3824 robj *dstset = NULL;
3825 int j, cardinality = 0;
3826
3827 for (j = 0; j < setsnum; j++) {
3828 robj *setobj;
3829
3830 setobj = dstkey ?
3831 lookupKeyWrite(c->db,setskeys[j]) :
3832 lookupKeyRead(c->db,setskeys[j]);
3833 if (!setobj) {
3834 dv[j] = NULL;
3835 continue;
3836 }
3837 if (setobj->type != REDIS_SET) {
3838 zfree(dv);
3839 addReply(c,shared.wrongtypeerr);
3840 return;
3841 }
3842 dv[j] = setobj->ptr;
3843 }
3844
3845 /* We need a temp set object to store our union. If the dstkey
3846 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3847 * this set object will be the resulting object to set into the target key*/
3848 dstset = createSetObject();
3849
3850 /* Iterate all the elements of all the sets, add every element a single
3851 * time to the result set */
3852 for (j = 0; j < setsnum; j++) {
3853 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3854 if (!dv[j]) continue; /* non existing keys are like empty sets */
3855
3856 di = dictGetIterator(dv[j]);
3857
3858 while((de = dictNext(di)) != NULL) {
3859 robj *ele;
3860
3861 /* dictAdd will not add the same element multiple times */
3862 ele = dictGetEntryKey(de);
3863 if (op == REDIS_OP_UNION || j == 0) {
3864 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3865 incrRefCount(ele);
3866 cardinality++;
3867 }
3868 } else if (op == REDIS_OP_DIFF) {
3869 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3870 cardinality--;
3871 }
3872 }
3873 }
3874 dictReleaseIterator(di);
3875
3876 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3877 }
3878
3879 /* Output the content of the resulting set, if not in STORE mode */
3880 if (!dstkey) {
3881 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3882 di = dictGetIterator(dstset->ptr);
3883 while((de = dictNext(di)) != NULL) {
3884 robj *ele;
3885
3886 ele = dictGetEntryKey(de);
3887 addReplyBulkLen(c,ele);
3888 addReply(c,ele);
3889 addReply(c,shared.crlf);
3890 }
3891 dictReleaseIterator(di);
3892 } else {
3893 /* If we have a target key where to store the resulting set
3894 * create this key with the result set inside */
3895 deleteKey(c->db,dstkey);
3896 dictAdd(c->db->dict,dstkey,dstset);
3897 incrRefCount(dstkey);
3898 }
3899
3900 /* Cleanup */
3901 if (!dstkey) {
3902 decrRefCount(dstset);
3903 } else {
3904 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3905 dictSize((dict*)dstset->ptr)));
3906 server.dirty++;
3907 }
3908 zfree(dv);
3909 }
3910
3911 static void sunionCommand(redisClient *c) {
3912 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3913 }
3914
3915 static void sunionstoreCommand(redisClient *c) {
3916 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3917 }
3918
3919 static void sdiffCommand(redisClient *c) {
3920 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3921 }
3922
3923 static void sdiffstoreCommand(redisClient *c) {
3924 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3925 }
3926
3927 /* ==================================== ZSets =============================== */
3928
/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
3936
3937 /* This skiplist implementation is almost a C translation of the original
3938 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
3939 * Alternative to Balanced Trees", modified in three ways:
3940 * a) this implementation allows for repeated values.
3941 * b) the comparison is not just by key (our 'score') but by satellite data.
3942 * c) there is a back pointer, so it's a doubly linked list with the back
3943 * pointers being only at "level 1". This allows to traverse the list
3944 * from tail to head, useful for ZREVRANGE. */
3945
3946 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
3947 zskiplistNode *zn = zmalloc(sizeof(*zn));
3948
3949 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
3950 zn->score = score;
3951 zn->obj = obj;
3952 return zn;
3953 }
3954
3955 static zskiplist *zslCreate(void) {
3956 int j;
3957 zskiplist *zsl;
3958
3959 zsl = zmalloc(sizeof(*zsl));
3960 zsl->level = 1;
3961 zsl->length = 0;
3962 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
3963 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
3964 zsl->header->forward[j] = NULL;
3965 zsl->header->backward = NULL;
3966 zsl->tail = NULL;
3967 return zsl;
3968 }
3969
3970 static void zslFreeNode(zskiplistNode *node) {
3971 decrRefCount(node->obj);
3972 zfree(node->forward);
3973 zfree(node);
3974 }
3975
3976 static void zslFree(zskiplist *zsl) {
3977 zskiplistNode *node = zsl->header->forward[0], *next;
3978
3979 zfree(zsl->header->forward);
3980 zfree(zsl->header);
3981 while(node) {
3982 next = node->forward[0];
3983 zslFreeNode(node);
3984 node = next;
3985 }
3986 zfree(zsl);
3987 }
3988
3989 static int zslRandomLevel(void) {
3990 int level = 1;
3991 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
3992 level += 1;
3993 return level;
3994 }
3995
3996 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
3997 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
3998 int i, level;
3999
4000 x = zsl->header;
4001 for (i = zsl->level-1; i >= 0; i--) {
4002 while (x->forward[i] &&
4003 (x->forward[i]->score < score ||
4004 (x->forward[i]->score == score &&
4005 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4006 x = x->forward[i];
4007 update[i] = x;
4008 }
4009 /* we assume the key is not already inside, since we allow duplicated
4010 * scores, and the re-insertion of score and redis object should never
4011 * happpen since the caller of zslInsert() should test in the hash table
4012 * if the element is already inside or not. */
4013 level = zslRandomLevel();
4014 if (level > zsl->level) {
4015 for (i = zsl->level; i < level; i++)
4016 update[i] = zsl->header;
4017 zsl->level = level;
4018 }
4019 x = zslCreateNode(level,score,obj);
4020 for (i = 0; i < level; i++) {
4021 x->forward[i] = update[i]->forward[i];
4022 update[i]->forward[i] = x;
4023 }
4024 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4025 if (x->forward[0])
4026 x->forward[0]->backward = x;
4027 else
4028 zsl->tail = x;
4029 zsl->length++;
4030 }
4031
4032 /* Delete an element with matching score/object from the skiplist. */
4033 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4034 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4035 int i;
4036
4037 x = zsl->header;
4038 for (i = zsl->level-1; i >= 0; i--) {
4039 while (x->forward[i] &&
4040 (x->forward[i]->score < score ||
4041 (x->forward[i]->score == score &&
4042 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4043 x = x->forward[i];
4044 update[i] = x;
4045 }
4046 /* We may have multiple elements with the same score, what we need
4047 * is to find the element with both the right score and object. */
4048 x = x->forward[0];
4049 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4050 for (i = 0; i < zsl->level; i++) {
4051 if (update[i]->forward[i] != x) break;
4052 update[i]->forward[i] = x->forward[i];
4053 }
4054 if (x->forward[0]) {
4055 x->forward[0]->backward = (x->backward == zsl->header) ?
4056 NULL : x->backward;
4057 } else {
4058 zsl->tail = x->backward;
4059 }
4060 zslFreeNode(x);
4061 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4062 zsl->level--;
4063 zsl->length--;
4064 return 1;
4065 } else {
4066 return 0; /* not found */
4067 }
4068 return 0; /* not found */
4069 }
4070
4071 /* Delete all the elements with score between min and max from the skiplist.
4072 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4073 * Note that this function takes the reference to the hash table view of the
4074 * sorted set, in order to remove the elements from the hash table too. */
4075 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4076 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4077 unsigned long removed = 0;
4078 int i;
4079
4080 x = zsl->header;
4081 for (i = zsl->level-1; i >= 0; i--) {
4082 while (x->forward[i] && x->forward[i]->score < min)
4083 x = x->forward[i];
4084 update[i] = x;
4085 }
4086 /* We may have multiple elements with the same score, what we need
4087 * is to find the element with both the right score and object. */
4088 x = x->forward[0];
4089 while (x && x->score <= max) {
4090 zskiplistNode *next;
4091
4092 for (i = 0; i < zsl->level; i++) {
4093 if (update[i]->forward[i] != x) break;
4094 update[i]->forward[i] = x->forward[i];
4095 }
4096 if (x->forward[0]) {
4097 x->forward[0]->backward = (x->backward == zsl->header) ?
4098 NULL : x->backward;
4099 } else {
4100 zsl->tail = x->backward;
4101 }
4102 next = x->forward[0];
4103 dictDelete(dict,x->obj);
4104 zslFreeNode(x);
4105 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4106 zsl->level--;
4107 zsl->length--;
4108 removed++;
4109 x = next;
4110 }
4111 return removed; /* not found */
4112 }
4113
4114 /* Find the first node having a score equal or greater than the specified one.
4115 * Returns NULL if there is no match. */
4116 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4117 zskiplistNode *x;
4118 int i;
4119
4120 x = zsl->header;
4121 for (i = zsl->level-1; i >= 0; i--) {
4122 while (x->forward[i] && x->forward[i]->score < score)
4123 x = x->forward[i];
4124 }
4125 /* We may have multiple elements with the same score, what we need
4126 * is to find the element with both the right score and object. */
4127 return x->forward[0];
4128 }
4129
4130 /* The actual Z-commands implementations */
4131
4132 static void zaddCommand(redisClient *c) {
4133 robj *zsetobj;
4134 zset *zs;
4135 double *score;
4136
4137 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4138 if (zsetobj == NULL) {
4139 zsetobj = createZsetObject();
4140 dictAdd(c->db->dict,c->argv[1],zsetobj);
4141 incrRefCount(c->argv[1]);
4142 } else {
4143 if (zsetobj->type != REDIS_ZSET) {
4144 addReply(c,shared.wrongtypeerr);
4145 return;
4146 }
4147 }
4148 score = zmalloc(sizeof(double));
4149 *score = strtod(c->argv[2]->ptr,NULL);
4150 zs = zsetobj->ptr;
4151 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
4152 /* case 1: New element */
4153 incrRefCount(c->argv[3]); /* added to hash */
4154 zslInsert(zs->zsl,*score,c->argv[3]);
4155 incrRefCount(c->argv[3]); /* added to skiplist */
4156 server.dirty++;
4157 addReply(c,shared.cone);
4158 } else {
4159 dictEntry *de;
4160 double *oldscore;
4161
4162 /* case 2: Score update operation */
4163 de = dictFind(zs->dict,c->argv[3]);
4164 assert(de != NULL);
4165 oldscore = dictGetEntryVal(de);
4166 if (*score != *oldscore) {
4167 int deleted;
4168
4169 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
4170 assert(deleted != 0);
4171 zslInsert(zs->zsl,*score,c->argv[3]);
4172 incrRefCount(c->argv[3]);
4173 dictReplace(zs->dict,c->argv[3],score);
4174 server.dirty++;
4175 } else {
4176 zfree(score);
4177 }
4178 addReply(c,shared.czero);
4179 }
4180 }
4181
4182 static void zremCommand(redisClient *c) {
4183 robj *zsetobj;
4184 zset *zs;
4185
4186 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4187 if (zsetobj == NULL) {
4188 addReply(c,shared.czero);
4189 } else {
4190 dictEntry *de;
4191 double *oldscore;
4192 int deleted;
4193
4194 if (zsetobj->type != REDIS_ZSET) {
4195 addReply(c,shared.wrongtypeerr);
4196 return;
4197 }
4198 zs = zsetobj->ptr;
4199 de = dictFind(zs->dict,c->argv[2]);
4200 if (de == NULL) {
4201 addReply(c,shared.czero);
4202 return;
4203 }
4204 /* Delete from the skiplist */
4205 oldscore = dictGetEntryVal(de);
4206 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4207 assert(deleted != 0);
4208
4209 /* Delete from the hash table */
4210 dictDelete(zs->dict,c->argv[2]);
4211 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4212 server.dirty++;
4213 addReply(c,shared.cone);
4214 }
4215 }
4216
4217 static void zremrangebyscoreCommand(redisClient *c) {
4218 double min = strtod(c->argv[2]->ptr,NULL);
4219 double max = strtod(c->argv[3]->ptr,NULL);
4220 robj *zsetobj;
4221 zset *zs;
4222
4223 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4224 if (zsetobj == NULL) {
4225 addReply(c,shared.czero);
4226 } else {
4227 long deleted;
4228
4229 if (zsetobj->type != REDIS_ZSET) {
4230 addReply(c,shared.wrongtypeerr);
4231 return;
4232 }
4233 zs = zsetobj->ptr;
4234 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4235 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4236 server.dirty += deleted;
4237 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4238 }
4239 }
4240
4241 static void zrangeGenericCommand(redisClient *c, int reverse) {
4242 robj *o;
4243 int start = atoi(c->argv[2]->ptr);
4244 int end = atoi(c->argv[3]->ptr);
4245
4246 o = lookupKeyRead(c->db,c->argv[1]);
4247 if (o == NULL) {
4248 addReply(c,shared.nullmultibulk);
4249 } else {
4250 if (o->type != REDIS_ZSET) {
4251 addReply(c,shared.wrongtypeerr);
4252 } else {
4253 zset *zsetobj = o->ptr;
4254 zskiplist *zsl = zsetobj->zsl;
4255 zskiplistNode *ln;
4256
4257 int llen = zsl->length;
4258 int rangelen, j;
4259 robj *ele;
4260
4261 /* convert negative indexes */
4262 if (start < 0) start = llen+start;
4263 if (end < 0) end = llen+end;
4264 if (start < 0) start = 0;
4265 if (end < 0) end = 0;
4266
4267 /* indexes sanity checks */
4268 if (start > end || start >= llen) {
4269 /* Out of range start or start > end result in empty list */
4270 addReply(c,shared.emptymultibulk);
4271 return;
4272 }
4273 if (end >= llen) end = llen-1;
4274 rangelen = (end-start)+1;
4275
4276 /* Return the result in form of a multi-bulk reply */
4277 if (reverse) {
4278 ln = zsl->tail;
4279 while (start--)
4280 ln = ln->backward;
4281 } else {
4282 ln = zsl->header->forward[0];
4283 while (start--)
4284 ln = ln->forward[0];
4285 }
4286
4287 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4288 for (j = 0; j < rangelen; j++) {
4289 ele = ln->obj;
4290 addReplyBulkLen(c,ele);
4291 addReply(c,ele);
4292 addReply(c,shared.crlf);
4293 ln = reverse ? ln->backward : ln->forward[0];
4294 }
4295 }
4296 }
4297 }
4298
4299 static void zrangeCommand(redisClient *c) {
4300 zrangeGenericCommand(c,0);
4301 }
4302
4303 static void zrevrangeCommand(redisClient *c) {
4304 zrangeGenericCommand(c,1);
4305 }
4306
4307 static void zrangebyscoreCommand(redisClient *c) {
4308 robj *o;
4309 double min = strtod(c->argv[2]->ptr,NULL);
4310 double max = strtod(c->argv[3]->ptr,NULL);
4311
4312 o = lookupKeyRead(c->db,c->argv[1]);
4313 if (o == NULL) {
4314 addReply(c,shared.nullmultibulk);
4315 } else {
4316 if (o->type != REDIS_ZSET) {
4317 addReply(c,shared.wrongtypeerr);
4318 } else {
4319 zset *zsetobj = o->ptr;
4320 zskiplist *zsl = zsetobj->zsl;
4321 zskiplistNode *ln;
4322 robj *ele, *lenobj;
4323 unsigned int rangelen = 0;
4324
4325 /* Get the first node with the score >= min */
4326 ln = zslFirstWithScore(zsl,min);
4327 if (ln == NULL) {
4328 /* No element matching the speciifed interval */
4329 addReply(c,shared.emptymultibulk);
4330 return;
4331 }
4332
4333 /* We don't know in advance how many matching elements there
4334 * are in the list, so we push this object that will represent
4335 * the multi-bulk length in the output buffer, and will "fix"
4336 * it later */
4337 lenobj = createObject(REDIS_STRING,NULL);
4338 addReply(c,lenobj);
4339
4340 while(ln && ln->score <= max) {
4341 ele = ln->obj;
4342 addReplyBulkLen(c,ele);
4343 addReply(c,ele);
4344 addReply(c,shared.crlf);
4345 ln = ln->forward[0];
4346 rangelen++;
4347 }
4348 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4349 }
4350 }
4351 }
4352
4353 static void zcardCommand(redisClient *c) {
4354 robj *o;
4355 zset *zs;
4356
4357 o = lookupKeyRead(c->db,c->argv[1]);
4358 if (o == NULL) {
4359 addReply(c,shared.czero);
4360 return;
4361 } else {
4362 if (o->type != REDIS_ZSET) {
4363 addReply(c,shared.wrongtypeerr);
4364 } else {
4365 zs = o->ptr;
4366 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4367 }
4368 }
4369 }
4370
4371 static void zscoreCommand(redisClient *c) {
4372 robj *o;
4373 zset *zs;
4374
4375 o = lookupKeyRead(c->db,c->argv[1]);
4376 if (o == NULL) {
4377 addReply(c,shared.czero);
4378 return;
4379 } else {
4380 if (o->type != REDIS_ZSET) {
4381 addReply(c,shared.wrongtypeerr);
4382 } else {
4383 dictEntry *de;
4384
4385 zs = o->ptr;
4386 de = dictFind(zs->dict,c->argv[2]);
4387 if (!de) {
4388 addReply(c,shared.nullbulk);
4389 } else {
4390 char buf[128];
4391 double *score = dictGetEntryVal(de);
4392
4393 snprintf(buf,sizeof(buf),"%.16g",*score);
4394 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
4395 strlen(buf),buf));
4396 }
4397 }
4398 }
4399 }
4400
4401 /* ========================= Non type-specific commands ==================== */
4402
4403 static void flushdbCommand(redisClient *c) {
4404 server.dirty += dictSize(c->db->dict);
4405 dictEmpty(c->db->dict);
4406 dictEmpty(c->db->expires);
4407 addReply(c,shared.ok);
4408 }
4409
4410 static void flushallCommand(redisClient *c) {
4411 server.dirty += emptyDb();
4412 addReply(c,shared.ok);
4413 rdbSave(server.dbfilename);
4414 server.dirty++;
4415 }
4416
4417 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4418 redisSortOperation *so = zmalloc(sizeof(*so));
4419 so->type = type;
4420 so->pattern = pattern;
4421 return so;
4422 }
4423
4424 /* Return the value associated to the key with a name obtained
4425 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4426 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4427 char *p;
4428 sds spat, ssub;
4429 robj keyobj;
4430 int prefixlen, sublen, postfixlen;
4431 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4432 struct {
4433 long len;
4434 long free;
4435 char buf[REDIS_SORTKEY_MAX+1];
4436 } keyname;
4437
4438 if (subst->encoding == REDIS_ENCODING_RAW)
4439 incrRefCount(subst);
4440 else {
4441 subst = getDecodedObject(subst);
4442 }
4443
4444 spat = pattern->ptr;
4445 ssub = subst->ptr;
4446 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4447 p = strchr(spat,'*');
4448 if (!p) return NULL;
4449
4450 prefixlen = p-spat;
4451 sublen = sdslen(ssub);
4452 postfixlen = sdslen(spat)-(prefixlen+1);
4453 memcpy(keyname.buf,spat,prefixlen);
4454 memcpy(keyname.buf+prefixlen,ssub,sublen);
4455 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4456 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4457 keyname.len = prefixlen+sublen+postfixlen;
4458
4459 keyobj.refcount = 1;
4460 keyobj.type = REDIS_STRING;
4461 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4462
4463 decrRefCount(subst);
4464
4465 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4466 return lookupKeyRead(db,&keyobj);
4467 }
4468
4469 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4470 * the additional parameter is not standard but a BSD-specific we have to
4471 * pass sorting parameters via the global 'server' structure */
4472 static int sortCompare(const void *s1, const void *s2) {
4473 const redisSortObject *so1 = s1, *so2 = s2;
4474 int cmp;
4475
4476 if (!server.sort_alpha) {
4477 /* Numeric sorting. Here it's trivial as we precomputed scores */
4478 if (so1->u.score > so2->u.score) {
4479 cmp = 1;
4480 } else if (so1->u.score < so2->u.score) {
4481 cmp = -1;
4482 } else {
4483 cmp = 0;
4484 }
4485 } else {
4486 /* Alphanumeric sorting */
4487 if (server.sort_bypattern) {
4488 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4489 /* At least one compare object is NULL */
4490 if (so1->u.cmpobj == so2->u.cmpobj)
4491 cmp = 0;
4492 else if (so1->u.cmpobj == NULL)
4493 cmp = -1;
4494 else
4495 cmp = 1;
4496 } else {
4497 /* We have both the objects, use strcoll */
4498 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4499 }
4500 } else {
4501 /* Compare elements directly */
4502 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4503 so2->obj->encoding == REDIS_ENCODING_RAW) {
4504 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4505 } else {
4506 robj *dec1, *dec2;
4507
4508 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4509 so1->obj : getDecodedObject(so1->obj);
4510 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4511 so2->obj : getDecodedObject(so2->obj);
4512 cmp = strcoll(dec1->ptr,dec2->ptr);
4513 if (dec1 != so1->obj) decrRefCount(dec1);
4514 if (dec2 != so2->obj) decrRefCount(dec2);
4515 }
4516 }
4517 }
4518 return server.sort_desc ? -cmp : cmp;
4519 }
4520
4521 /* The SORT command is the most complex command in Redis. Warning: this code
4522 * is optimized for speed and a bit less for readability */
4523 static void sortCommand(redisClient *c) {
4524 list *operations;
4525 int outputlen = 0;
4526 int desc = 0, alpha = 0;
4527 int limit_start = 0, limit_count = -1, start, end;
4528 int j, dontsort = 0, vectorlen;
4529 int getop = 0; /* GET operation counter */
4530 robj *sortval, *sortby = NULL;
4531 redisSortObject *vector; /* Resulting vector to sort */
4532
4533 /* Lookup the key to sort. It must be of the right types */
4534 sortval = lookupKeyRead(c->db,c->argv[1]);
4535 if (sortval == NULL) {
4536 addReply(c,shared.nokeyerr);
4537 return;
4538 }
4539 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4540 addReply(c,shared.wrongtypeerr);
4541 return;
4542 }
4543
4544 /* Create a list of operations to perform for every sorted element.
4545 * Operations can be GET/DEL/INCR/DECR */
4546 operations = listCreate();
4547 listSetFreeMethod(operations,zfree);
4548 j = 2;
4549
4550 /* Now we need to protect sortval incrementing its count, in the future
4551 * SORT may have options able to overwrite/delete keys during the sorting
4552 * and the sorted key itself may get destroied */
4553 incrRefCount(sortval);
4554
4555 /* The SORT command has an SQL-alike syntax, parse it */
4556 while(j < c->argc) {
4557 int leftargs = c->argc-j-1;
4558 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4559 desc = 0;
4560 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4561 desc = 1;
4562 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4563 alpha = 1;
4564 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4565 limit_start = atoi(c->argv[j+1]->ptr);
4566 limit_count = atoi(c->argv[j+2]->ptr);
4567 j+=2;
4568 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4569 sortby = c->argv[j+1];
4570 /* If the BY pattern does not contain '*', i.e. it is constant,
4571 * we don't need to sort nor to lookup the weight keys. */
4572 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4573 j++;
4574 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4575 listAddNodeTail(operations,createSortOperation(
4576 REDIS_SORT_GET,c->argv[j+1]));
4577 getop++;
4578 j++;
4579 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
4580 listAddNodeTail(operations,createSortOperation(
4581 REDIS_SORT_DEL,c->argv[j+1]));
4582 j++;
4583 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
4584 listAddNodeTail(operations,createSortOperation(
4585 REDIS_SORT_INCR,c->argv[j+1]));
4586 j++;
4587 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4588 listAddNodeTail(operations,createSortOperation(
4589 REDIS_SORT_DECR,c->argv[j+1]));
4590 j++;
4591 } else {
4592 decrRefCount(sortval);
4593 listRelease(operations);
4594 addReply(c,shared.syntaxerr);
4595 return;
4596 }
4597 j++;
4598 }
4599
4600 /* Load the sorting vector with all the objects to sort */
4601 vectorlen = (sortval->type == REDIS_LIST) ?
4602 listLength((list*)sortval->ptr) :
4603 dictSize((dict*)sortval->ptr);
4604 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4605 j = 0;
4606 if (sortval->type == REDIS_LIST) {
4607 list *list = sortval->ptr;
4608 listNode *ln;
4609
4610 listRewind(list);
4611 while((ln = listYield(list))) {
4612 robj *ele = ln->value;
4613 vector[j].obj = ele;
4614 vector[j].u.score = 0;
4615 vector[j].u.cmpobj = NULL;
4616 j++;
4617 }
4618 } else {
4619 dict *set = sortval->ptr;
4620 dictIterator *di;
4621 dictEntry *setele;
4622
4623 di = dictGetIterator(set);
4624 while((setele = dictNext(di)) != NULL) {
4625 vector[j].obj = dictGetEntryKey(setele);
4626 vector[j].u.score = 0;
4627 vector[j].u.cmpobj = NULL;
4628 j++;
4629 }
4630 dictReleaseIterator(di);
4631 }
4632 assert(j == vectorlen);
4633
4634 /* Now it's time to load the right scores in the sorting vector */
4635 if (dontsort == 0) {
4636 for (j = 0; j < vectorlen; j++) {
4637 if (sortby) {
4638 robj *byval;
4639
4640 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4641 if (!byval || byval->type != REDIS_STRING) continue;
4642 if (alpha) {
4643 if (byval->encoding == REDIS_ENCODING_RAW) {
4644 vector[j].u.cmpobj = byval;
4645 incrRefCount(byval);
4646 } else {
4647 vector[j].u.cmpobj = getDecodedObject(byval);
4648 }
4649 } else {
4650 if (byval->encoding == REDIS_ENCODING_RAW) {
4651 vector[j].u.score = strtod(byval->ptr,NULL);
4652 } else {
4653 if (byval->encoding == REDIS_ENCODING_INT) {
4654 vector[j].u.score = (long)byval->ptr;
4655 } else
4656 assert(1 != 1);
4657 }
4658 }
4659 } else {
4660 if (!alpha) {
4661 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4662 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4663 else {
4664 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4665 vector[j].u.score = (long) vector[j].obj->ptr;
4666 else
4667 assert(1 != 1);
4668 }
4669 }
4670 }
4671 }
4672 }
4673
4674 /* We are ready to sort the vector... perform a bit of sanity check
4675 * on the LIMIT option too. We'll use a partial version of quicksort. */
4676 start = (limit_start < 0) ? 0 : limit_start;
4677 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4678 if (start >= vectorlen) {
4679 start = vectorlen-1;
4680 end = vectorlen-2;
4681 }
4682 if (end >= vectorlen) end = vectorlen-1;
4683
4684 if (dontsort == 0) {
4685 server.sort_desc = desc;
4686 server.sort_alpha = alpha;
4687 server.sort_bypattern = sortby ? 1 : 0;
4688 if (sortby && (start != 0 || end != vectorlen-1))
4689 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4690 else
4691 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4692 }
4693
4694 /* Send command output to the output buffer, performing the specified
4695 * GET/DEL/INCR/DECR operations if any. */
4696 outputlen = getop ? getop*(end-start+1) : end-start+1;
4697 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4698 for (j = start; j <= end; j++) {
4699 listNode *ln;
4700 if (!getop) {
4701 addReplyBulkLen(c,vector[j].obj);
4702 addReply(c,vector[j].obj);
4703 addReply(c,shared.crlf);
4704 }
4705 listRewind(operations);
4706 while((ln = listYield(operations))) {
4707 redisSortOperation *sop = ln->value;
4708 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4709 vector[j].obj);
4710
4711 if (sop->type == REDIS_SORT_GET) {
4712 if (!val || val->type != REDIS_STRING) {
4713 addReply(c,shared.nullbulk);
4714 } else {
4715 addReplyBulkLen(c,val);
4716 addReply(c,val);
4717 addReply(c,shared.crlf);
4718 }
4719 } else if (sop->type == REDIS_SORT_DEL) {
4720 /* TODO */
4721 }
4722 }
4723 }
4724
4725 /* Cleanup */
4726 decrRefCount(sortval);
4727 listRelease(operations);
4728 for (j = 0; j < vectorlen; j++) {
4729 if (sortby && alpha && vector[j].u.cmpobj)
4730 decrRefCount(vector[j].u.cmpobj);
4731 }
4732 zfree(vector);
4733 }
4734
4735 static void infoCommand(redisClient *c) {
4736 sds info;
4737 time_t uptime = time(NULL)-server.stat_starttime;
4738 int j;
4739
4740 info = sdscatprintf(sdsempty(),
4741 "redis_version:%s\r\n"
4742 "arch_bits:%s\r\n"
4743 "uptime_in_seconds:%d\r\n"
4744 "uptime_in_days:%d\r\n"
4745 "connected_clients:%d\r\n"
4746 "connected_slaves:%d\r\n"
4747 "used_memory:%zu\r\n"
4748 "changes_since_last_save:%lld\r\n"
4749 "bgsave_in_progress:%d\r\n"
4750 "last_save_time:%d\r\n"
4751 "total_connections_received:%lld\r\n"
4752 "total_commands_processed:%lld\r\n"
4753 "role:%s\r\n"
4754 ,REDIS_VERSION,
4755 (sizeof(long) == 8) ? "64" : "32",
4756 uptime,
4757 uptime/(3600*24),
4758 listLength(server.clients)-listLength(server.slaves),
4759 listLength(server.slaves),
4760 server.usedmemory,
4761 server.dirty,
4762 server.bgsaveinprogress,
4763 server.lastsave,
4764 server.stat_numconnections,
4765 server.stat_numcommands,
4766 server.masterhost == NULL ? "master" : "slave"
4767 );
4768 if (server.masterhost) {
4769 info = sdscatprintf(info,
4770 "master_host:%s\r\n"
4771 "master_port:%d\r\n"
4772 "master_link_status:%s\r\n"
4773 "master_last_io_seconds_ago:%d\r\n"
4774 ,server.masterhost,
4775 server.masterport,
4776 (server.replstate == REDIS_REPL_CONNECTED) ?
4777 "up" : "down",
4778 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
4779 );
4780 }
4781 for (j = 0; j < server.dbnum; j++) {
4782 long long keys, vkeys;
4783
4784 keys = dictSize(server.db[j].dict);
4785 vkeys = dictSize(server.db[j].expires);
4786 if (keys || vkeys) {
4787 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4788 j, keys, vkeys);
4789 }
4790 }
4791 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4792 addReplySds(c,info);
4793 addReply(c,shared.crlf);
4794 }
4795
4796 static void monitorCommand(redisClient *c) {
4797 /* ignore MONITOR if aleady slave or in monitor mode */
4798 if (c->flags & REDIS_SLAVE) return;
4799
4800 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4801 c->slaveseldb = 0;
4802 listAddNodeTail(server.monitors,c);
4803 addReply(c,shared.ok);
4804 }
4805
4806 /* ================================= Expire ================================= */
4807 static int removeExpire(redisDb *db, robj *key) {
4808 if (dictDelete(db->expires,key) == DICT_OK) {
4809 return 1;
4810 } else {
4811 return 0;
4812 }
4813 }
4814
4815 static int setExpire(redisDb *db, robj *key, time_t when) {
4816 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4817 return 0;
4818 } else {
4819 incrRefCount(key);
4820 return 1;
4821 }
4822 }
4823
4824 /* Return the expire time of the specified key, or -1 if no expire
4825 * is associated with this key (i.e. the key is non volatile) */
4826 static time_t getExpire(redisDb *db, robj *key) {
4827 dictEntry *de;
4828
4829 /* No expire? return ASAP */
4830 if (dictSize(db->expires) == 0 ||
4831 (de = dictFind(db->expires,key)) == NULL) return -1;
4832
4833 return (time_t) dictGetEntryVal(de);
4834 }
4835
4836 static int expireIfNeeded(redisDb *db, robj *key) {
4837 time_t when;
4838 dictEntry *de;
4839
4840 /* No expire? return ASAP */
4841 if (dictSize(db->expires) == 0 ||
4842 (de = dictFind(db->expires,key)) == NULL) return 0;
4843
4844 /* Lookup the expire */
4845 when = (time_t) dictGetEntryVal(de);
4846 if (time(NULL) <= when) return 0;
4847
4848 /* Delete the key */
4849 dictDelete(db->expires,key);
4850 return dictDelete(db->dict,key) == DICT_OK;
4851 }
4852
4853 static int deleteIfVolatile(redisDb *db, robj *key) {
4854 dictEntry *de;
4855
4856 /* No expire? return ASAP */
4857 if (dictSize(db->expires) == 0 ||
4858 (de = dictFind(db->expires,key)) == NULL) return 0;
4859
4860 /* Delete the key */
4861 server.dirty++;
4862 dictDelete(db->expires,key);
4863 return dictDelete(db->dict,key) == DICT_OK;
4864 }
4865
4866 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
4867 dictEntry *de;
4868
4869 de = dictFind(c->db->dict,key);
4870 if (de == NULL) {
4871 addReply(c,shared.czero);
4872 return;
4873 }
4874 if (seconds < 0) {
4875 if (deleteKey(c->db,key)) server.dirty++;
4876 addReply(c, shared.cone);
4877 return;
4878 } else {
4879 time_t when = time(NULL)+seconds;
4880 if (setExpire(c->db,key,when)) {
4881 addReply(c,shared.cone);
4882 server.dirty++;
4883 } else {
4884 addReply(c,shared.czero);
4885 }
4886 return;
4887 }
4888 }
4889
4890 static void expireCommand(redisClient *c) {
4891 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
4892 }
4893
4894 static void expireatCommand(redisClient *c) {
4895 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
4896 }
4897
4898 static void ttlCommand(redisClient *c) {
4899 time_t expire;
4900 int ttl = -1;
4901
4902 expire = getExpire(c->db,c->argv[1]);
4903 if (expire != -1) {
4904 ttl = (int) (expire-time(NULL));
4905 if (ttl < 0) ttl = -1;
4906 }
4907 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
4908 }
4909
4910 static void msetGenericCommand(redisClient *c, int nx) {
4911 int j;
4912
4913 if ((c->argc % 2) == 0) {
4914 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
4915 return;
4916 }
4917 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
4918 * set nothing at all if at least one already key exists. */
4919 if (nx) {
4920 for (j = 1; j < c->argc; j += 2) {
4921 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
4922 addReply(c, shared.czero);
4923 return;
4924 }
4925 }
4926 }
4927
4928 for (j = 1; j < c->argc; j += 2) {
4929 int retval;
4930
4931 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
4932 if (retval == DICT_ERR) {
4933 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
4934 incrRefCount(c->argv[j+1]);
4935 } else {
4936 incrRefCount(c->argv[j]);
4937 incrRefCount(c->argv[j+1]);
4938 }
4939 removeExpire(c->db,c->argv[j]);
4940 }
4941 server.dirty += (c->argc-1)/2;
4942 addReply(c, nx ? shared.cone : shared.ok);
4943 }
4944
4945 static void msetCommand(redisClient *c) {
4946 msetGenericCommand(c,0);
4947 }
4948
4949 static void msetnxCommand(redisClient *c) {
4950 msetGenericCommand(c,1);
4951 }
4952
4953 /* =============================== Replication ============================= */
4954
4955 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
4956 ssize_t nwritten, ret = size;
4957 time_t start = time(NULL);
4958
4959 timeout++;
4960 while(size) {
4961 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
4962 nwritten = write(fd,ptr,size);
4963 if (nwritten == -1) return -1;
4964 ptr += nwritten;
4965 size -= nwritten;
4966 }
4967 if ((time(NULL)-start) > timeout) {
4968 errno = ETIMEDOUT;
4969 return -1;
4970 }
4971 }
4972 return ret;
4973 }
4974
4975 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
4976 ssize_t nread, totread = 0;
4977 time_t start = time(NULL);
4978
4979 timeout++;
4980 while(size) {
4981 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
4982 nread = read(fd,ptr,size);
4983 if (nread == -1) return -1;
4984 ptr += nread;
4985 size -= nread;
4986 totread += nread;
4987 }
4988 if ((time(NULL)-start) > timeout) {
4989 errno = ETIMEDOUT;
4990 return -1;
4991 }
4992 }
4993 return totread;
4994 }
4995
/* Read a line from 'fd' one byte at a time into the buffer 'ptr' of 'size'
 * bytes, stopping at the first '\n'. The newline is not stored and a
 * trailing '\r' is stripped; the buffer is always null terminated.
 *
 * Returns the number of bytes stored before the newline was found (the
 * '\r', if any, is included in the count), or -1 if syncRead() fails.
 * At most size-1 bytes are stored: if the line is longer, reading stops
 * when the buffer is full and what was read so far is returned. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    /* No room even for the terminator: nothing can be stored. */
    if (size <= 0) return 0;
    *ptr = '\0';

    size--; /* reserve one byte for the null term */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
            /* Account for the byte just stored. Without this the loop
             * never ends on its own and a long line is written past the
             * end of the caller's buffer. */
            size--;
        }
    }
    return nread;
}
5016
5017 static void syncCommand(redisClient *c) {
5018 /* ignore SYNC if aleady slave or in monitor mode */
5019 if (c->flags & REDIS_SLAVE) return;
5020
5021 /* SYNC can't be issued when the server has pending data to send to
5022 * the client about already issued commands. We need a fresh reply
5023 * buffer registering the differences between the BGSAVE and the current
5024 * dataset, so that we can copy to other slaves if needed. */
5025 if (listLength(c->reply) != 0) {
5026 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5027 return;
5028 }
5029
5030 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5031 /* Here we need to check if there is a background saving operation
5032 * in progress, or if it is required to start one */
5033 if (server.bgsaveinprogress) {
5034 /* Ok a background save is in progress. Let's check if it is a good
5035 * one for replication, i.e. if there is another slave that is
5036 * registering differences since the server forked to save */
5037 redisClient *slave;
5038 listNode *ln;
5039
5040 listRewind(server.slaves);
5041 while((ln = listYield(server.slaves))) {
5042 slave = ln->value;
5043 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5044 }
5045 if (ln) {
5046 /* Perfect, the server is already registering differences for
5047 * another slave. Set the right state, and copy the buffer. */
5048 listRelease(c->reply);
5049 c->reply = listDup(slave->reply);
5050 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5051 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5052 } else {
5053 /* No way, we need to wait for the next BGSAVE in order to
5054 * register differences */
5055 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5056 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5057 }
5058 } else {
5059 /* Ok we don't have a BGSAVE in progress, let's start one */
5060 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5061 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5062 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5063 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5064 return;
5065 }
5066 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5067 }
5068 c->repldbfd = -1;
5069 c->flags |= REDIS_SLAVE;
5070 c->slaveseldb = 0;
5071 listAddNodeTail(server.slaves,c);
5072 return;
5073 }
5074
5075 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5076 redisClient *slave = privdata;
5077 REDIS_NOTUSED(el);
5078 REDIS_NOTUSED(mask);
5079 char buf[REDIS_IOBUF_LEN];
5080 ssize_t nwritten, buflen;
5081
5082 if (slave->repldboff == 0) {
5083 /* Write the bulk write count before to transfer the DB. In theory here
5084 * we don't know how much room there is in the output buffer of the
5085 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5086 * operations) will never be smaller than the few bytes we need. */
5087 sds bulkcount;
5088
5089 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5090 slave->repldbsize);
5091 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5092 {
5093 sdsfree(bulkcount);
5094 freeClient(slave);
5095 return;
5096 }
5097 sdsfree(bulkcount);
5098 }
5099 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5100 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5101 if (buflen <= 0) {
5102 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5103 (buflen == 0) ? "premature EOF" : strerror(errno));
5104 freeClient(slave);
5105 return;
5106 }
5107 if ((nwritten = write(fd,buf,buflen)) == -1) {
5108 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5109 strerror(errno));
5110 freeClient(slave);
5111 return;
5112 }
5113 slave->repldboff += nwritten;
5114 if (slave->repldboff == slave->repldbsize) {
5115 close(slave->repldbfd);
5116 slave->repldbfd = -1;
5117 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5118 slave->replstate = REDIS_REPL_ONLINE;
5119 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5120 sendReplyToClient, slave, NULL) == AE_ERR) {
5121 freeClient(slave);
5122 return;
5123 }
5124 addReplySds(slave,sdsempty());
5125 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5126 }
5127 }
5128
5129 /* This function is called at the end of every backgrond saving.
5130 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5131 * otherwise REDIS_ERR is passed to the function.
5132 *
5133 * The goal of this function is to handle slaves waiting for a successful
5134 * background saving in order to perform non-blocking synchronization. */
5135 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5136 listNode *ln;
5137 int startbgsave = 0;
5138
5139 listRewind(server.slaves);
5140 while((ln = listYield(server.slaves))) {
5141 redisClient *slave = ln->value;
5142
5143 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5144 startbgsave = 1;
5145 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5146 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5147 struct redis_stat buf;
5148
5149 if (bgsaveerr != REDIS_OK) {
5150 freeClient(slave);
5151 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5152 continue;
5153 }
5154 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5155 redis_fstat(slave->repldbfd,&buf) == -1) {
5156 freeClient(slave);
5157 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5158 continue;
5159 }
5160 slave->repldboff = 0;
5161 slave->repldbsize = buf.st_size;
5162 slave->replstate = REDIS_REPL_SEND_BULK;
5163 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5164 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
5165 freeClient(slave);
5166 continue;
5167 }
5168 }
5169 }
5170 if (startbgsave) {
5171 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5172 listRewind(server.slaves);
5173 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5174 while((ln = listYield(server.slaves))) {
5175 redisClient *slave = ln->value;
5176
5177 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5178 freeClient(slave);
5179 }
5180 }
5181 }
5182 }
5183
5184 static int syncWithMaster(void) {
5185 char buf[1024], tmpfile[256];
5186 int dumpsize;
5187 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5188 int dfd;
5189
5190 if (fd == -1) {
5191 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5192 strerror(errno));
5193 return REDIS_ERR;
5194 }
5195 /* Issue the SYNC command */
5196 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5197 close(fd);
5198 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5199 strerror(errno));
5200 return REDIS_ERR;
5201 }
5202 /* Read the bulk write count */
5203 if (syncReadLine(fd,buf,1024,3600) == -1) {
5204 close(fd);
5205 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5206 strerror(errno));
5207 return REDIS_ERR;
5208 }
5209 if (buf[0] != '$') {
5210 close(fd);
5211 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5212 return REDIS_ERR;
5213 }
5214 dumpsize = atoi(buf+1);
5215 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5216 /* Read the bulk write data on a temp file */
5217 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5218 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5219 if (dfd == -1) {
5220 close(fd);
5221 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5222 return REDIS_ERR;
5223 }
5224 while(dumpsize) {
5225 int nread, nwritten;
5226
5227 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5228 if (nread == -1) {
5229 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5230 strerror(errno));
5231 close(fd);
5232 close(dfd);
5233 return REDIS_ERR;
5234 }
5235 nwritten = write(dfd,buf,nread);
5236 if (nwritten == -1) {
5237 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5238 close(fd);
5239 close(dfd);
5240 return REDIS_ERR;
5241 }
5242 dumpsize -= nread;
5243 }
5244 close(dfd);
5245 if (rename(tmpfile,server.dbfilename) == -1) {
5246 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5247 unlink(tmpfile);
5248 close(fd);
5249 return REDIS_ERR;
5250 }
5251 emptyDb();
5252 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5253 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5254 close(fd);
5255 return REDIS_ERR;
5256 }
5257 server.master = createClient(fd);
5258 server.master->flags |= REDIS_MASTER;
5259 server.replstate = REDIS_REPL_CONNECTED;
5260 return REDIS_OK;
5261 }
5262
5263 static void slaveofCommand(redisClient *c) {
5264 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5265 !strcasecmp(c->argv[2]->ptr,"one")) {
5266 if (server.masterhost) {
5267 sdsfree(server.masterhost);
5268 server.masterhost = NULL;
5269 if (server.master) freeClient(server.master);
5270 server.replstate = REDIS_REPL_NONE;
5271 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5272 }
5273 } else {
5274 sdsfree(server.masterhost);
5275 server.masterhost = sdsdup(c->argv[1]->ptr);
5276 server.masterport = atoi(c->argv[2]->ptr);
5277 if (server.master) freeClient(server.master);
5278 server.replstate = REDIS_REPL_CONNECT;
5279 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5280 server.masterhost, server.masterport);
5281 }
5282 addReply(c,shared.ok);
5283 }
5284
5285 /* ============================ Maxmemory directive ======================== */
5286
5287 /* This function gets called when 'maxmemory' is set on the config file to limit
5288 * the max memory used by the server, and we are out of memory.
5289 * This function will try to, in order:
5290 *
5291 * - Free objects from the free list
5292 * - Try to remove keys with an EXPIRE set
5293 *
5294 * It is not possible to free enough memory to reach used-memory < maxmemory
5295 * the server will start refusing commands that will enlarge even more the
5296 * memory usage.
5297 */
5298 static void freeMemoryIfNeeded(void) {
5299 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5300 if (listLength(server.objfreelist)) {
5301 robj *o;
5302
5303 listNode *head = listFirst(server.objfreelist);
5304 o = listNodeValue(head);
5305 listDelNode(server.objfreelist,head);
5306 zfree(o);
5307 } else {
5308 int j, k, freed = 0;
5309
5310 for (j = 0; j < server.dbnum; j++) {
5311 int minttl = -1;
5312 robj *minkey = NULL;
5313 struct dictEntry *de;
5314
5315 if (dictSize(server.db[j].expires)) {
5316 freed = 1;
5317 /* From a sample of three keys drop the one nearest to
5318 * the natural expire */
5319 for (k = 0; k < 3; k++) {
5320 time_t t;
5321
5322 de = dictGetRandomKey(server.db[j].expires);
5323 t = (time_t) dictGetEntryVal(de);
5324 if (minttl == -1 || t < minttl) {
5325 minkey = dictGetEntryKey(de);
5326 minttl = t;
5327 }
5328 }
5329 deleteKey(server.db+j,minkey);
5330 }
5331 }
5332 if (!freed) return; /* nothing to free... */
5333 }
5334 }
5335 }
5336
5337 /* ================================= Debugging ============================== */
5338
5339 static void debugCommand(redisClient *c) {
5340 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
5341 *((char*)-1) = 'x';
5342 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
5343 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
5344 robj *key, *val;
5345
5346 if (!de) {
5347 addReply(c,shared.nokeyerr);
5348 return;
5349 }
5350 key = dictGetEntryKey(de);
5351 val = dictGetEntryVal(de);
5352 addReplySds(c,sdscatprintf(sdsempty(),
5353 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
5354 key, key->refcount, val, val->refcount, val->encoding));
5355 } else {
5356 addReplySds(c,sdsnew(
5357 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
5358 }
5359 }
5360
5361 #ifdef HAVE_BACKTRACE
5362 static struct redisFunctionSym symsTable[] = {
5363 {"compareStringObjects", (unsigned long)compareStringObjects},
5364 {"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
5365 {"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
5366 {"dictEncObjHash", (unsigned long)dictEncObjHash},
5367 {"incrDecrCommand", (unsigned long)incrDecrCommand},
5368 {"freeStringObject", (unsigned long)freeStringObject},
5369 {"freeListObject", (unsigned long)freeListObject},
5370 {"freeSetObject", (unsigned long)freeSetObject},
5371 {"decrRefCount", (unsigned long)decrRefCount},
5372 {"createObject", (unsigned long)createObject},
5373 {"freeClient", (unsigned long)freeClient},
5374 {"rdbLoad", (unsigned long)rdbLoad},
5375 {"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
5376 {"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
5377 {"addReply", (unsigned long)addReply},
5378 {"addReplySds", (unsigned long)addReplySds},
5379 {"incrRefCount", (unsigned long)incrRefCount},
5380 {"rdbSaveBackground", (unsigned long)rdbSaveBackground},
5381 {"createStringObject", (unsigned long)createStringObject},
5382 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
5383 {"syncWithMaster", (unsigned long)syncWithMaster},
5384 {"tryObjectSharing", (unsigned long)tryObjectSharing},
5385 {"tryObjectEncoding", (unsigned long)tryObjectEncoding},
5386 {"getDecodedObject", (unsigned long)getDecodedObject},
5387 {"removeExpire", (unsigned long)removeExpire},
5388 {"expireIfNeeded", (unsigned long)expireIfNeeded},
5389 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
5390 {"deleteKey", (unsigned long)deleteKey},
5391 {"getExpire", (unsigned long)getExpire},
5392 {"setExpire", (unsigned long)setExpire},
5393 {"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
5394 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
5395 {"authCommand", (unsigned long)authCommand},
5396 {"pingCommand", (unsigned long)pingCommand},
5397 {"echoCommand", (unsigned long)echoCommand},
5398 {"setCommand", (unsigned long)setCommand},
5399 {"setnxCommand", (unsigned long)setnxCommand},
5400 {"getCommand", (unsigned long)getCommand},
5401 {"delCommand", (unsigned long)delCommand},
5402 {"existsCommand", (unsigned long)existsCommand},
5403 {"incrCommand", (unsigned long)incrCommand},
5404 {"decrCommand", (unsigned long)decrCommand},
5405 {"incrbyCommand", (unsigned long)incrbyCommand},
5406 {"decrbyCommand", (unsigned long)decrbyCommand},
5407 {"selectCommand", (unsigned long)selectCommand},
5408 {"randomkeyCommand", (unsigned long)randomkeyCommand},
5409 {"keysCommand", (unsigned long)keysCommand},
5410 {"dbsizeCommand", (unsigned long)dbsizeCommand},
5411 {"lastsaveCommand", (unsigned long)lastsaveCommand},
5412 {"saveCommand", (unsigned long)saveCommand},
5413 {"bgsaveCommand", (unsigned long)bgsaveCommand},
5414 {"shutdownCommand", (unsigned long)shutdownCommand},
5415 {"moveCommand", (unsigned long)moveCommand},
5416 {"renameCommand", (unsigned long)renameCommand},
5417 {"renamenxCommand", (unsigned long)renamenxCommand},
5418 {"lpushCommand", (unsigned long)lpushCommand},
5419 {"rpushCommand", (unsigned long)rpushCommand},
5420 {"lpopCommand", (unsigned long)lpopCommand},
5421 {"rpopCommand", (unsigned long)rpopCommand},
5422 {"llenCommand", (unsigned long)llenCommand},
5423 {"lindexCommand", (unsigned long)lindexCommand},
5424 {"lrangeCommand", (unsigned long)lrangeCommand},
5425 {"ltrimCommand", (unsigned long)ltrimCommand},
5426 {"typeCommand", (unsigned long)typeCommand},
5427 {"lsetCommand", (unsigned long)lsetCommand},
5428 {"saddCommand", (unsigned long)saddCommand},
5429 {"sremCommand", (unsigned long)sremCommand},
5430 {"smoveCommand", (unsigned long)smoveCommand},
5431 {"sismemberCommand", (unsigned long)sismemberCommand},
5432 {"scardCommand", (unsigned long)scardCommand},
5433 {"spopCommand", (unsigned long)spopCommand},
5434 {"srandmemberCommand", (unsigned long)srandmemberCommand},
5435 {"sinterCommand", (unsigned long)sinterCommand},
5436 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
5437 {"sunionCommand", (unsigned long)sunionCommand},
5438 {"sunionstoreCommand", (unsigned long)sunionstoreCommand},
5439 {"sdiffCommand", (unsigned long)sdiffCommand},
5440 {"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
5441 {"syncCommand", (unsigned long)syncCommand},
5442 {"flushdbCommand", (unsigned long)flushdbCommand},
5443 {"flushallCommand", (unsigned long)flushallCommand},
5444 {"sortCommand", (unsigned long)sortCommand},
5445 {"lremCommand", (unsigned long)lremCommand},
5446 {"infoCommand", (unsigned long)infoCommand},
5447 {"mgetCommand", (unsigned long)mgetCommand},
5448 {"monitorCommand", (unsigned long)monitorCommand},
5449 {"expireCommand", (unsigned long)expireCommand},
5450 {"expireatCommand", (unsigned long)expireatCommand},
5451 {"getsetCommand", (unsigned long)getsetCommand},
5452 {"ttlCommand", (unsigned long)ttlCommand},
5453 {"slaveofCommand", (unsigned long)slaveofCommand},
5454 {"debugCommand", (unsigned long)debugCommand},
5455 {"processCommand", (unsigned long)processCommand},
5456 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
5457 {"readQueryFromClient", (unsigned long)readQueryFromClient},
5458 {"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
5459 {"msetGenericCommand", (unsigned long)msetGenericCommand},
5460 {"msetCommand", (unsigned long)msetCommand},
5461 {"msetnxCommand", (unsigned long)msetnxCommand},
5462 {"zslCreateNode", (unsigned long)zslCreateNode},
5463 {"zslCreate", (unsigned long)zslCreate},
5464 {"zslFreeNode",(unsigned long)zslFreeNode},
5465 {"zslFree",(unsigned long)zslFree},
5466 {"zslRandomLevel",(unsigned long)zslRandomLevel},
5467 {"zslInsert",(unsigned long)zslInsert},
5468 {"zslDelete",(unsigned long)zslDelete},
5469 {"createZsetObject",(unsigned long)createZsetObject},
5470 {"zaddCommand",(unsigned long)zaddCommand},
5471 {"zrangeGenericCommand",(unsigned long)zrangeGenericCommand},
5472 {"zrangeCommand",(unsigned long)zrangeCommand},
5473 {"zrevrangeCommand",(unsigned long)zrevrangeCommand},
5474 {"zremCommand",(unsigned long)zremCommand},
5475 {"rdbSaveDoubleValue",(unsigned long)rdbSaveDoubleValue},
5476 {"rdbLoadDoubleValue",(unsigned long)rdbLoadDoubleValue},
5477 {"feedAppendOnlyFile",(unsigned long)feedAppendOnlyFile},
5478 {NULL,0}
5479 };
5480
5481 /* This function try to convert a pointer into a function name. It's used in
5482 * oreder to provide a backtrace under segmentation fault that's able to
5483 * display functions declared as static (otherwise the backtrace is useless). */
5484 static char *findFuncName(void *pointer, unsigned long *offset){
5485 int i, ret = -1;
5486 unsigned long off, minoff = 0;
5487
5488 /* Try to match against the Symbol with the smallest offset */
5489 for (i=0; symsTable[i].pointer; i++) {
5490 unsigned long lp = (unsigned long) pointer;
5491
5492 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
5493 off=lp-symsTable[i].pointer;
5494 if (ret < 0 || off < minoff) {
5495 minoff=off;
5496 ret=i;
5497 }
5498 }
5499 }
5500 if (ret == -1) return NULL;
5501 *offset = minoff;
5502 return symsTable[ret].name;
5503 }
5504
5505 static void *getMcontextEip(ucontext_t *uc) {
5506 #if defined(__FreeBSD__)
5507 return (void*) uc->uc_mcontext.mc_eip;
5508 #elif defined(__dietlibc__)
5509 return (void*) uc->uc_mcontext.eip;
5510 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
5511 return (void*) uc->uc_mcontext->__ss.__eip;
5512 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
5513 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
5514 return (void*) uc->uc_mcontext->__ss.__rip;
5515 #else
5516 return (void*) uc->uc_mcontext->__ss.__eip;
5517 #endif
5518 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
5519 return (void*) uc->uc_mcontext.gregs[REG_EIP];
5520 #elif defined(__ia64__) /* Linux IA64 */
5521 return (void*) uc->uc_mcontext.sc_ip;
5522 #else
5523 return NULL;
5524 #endif
5525 }
5526
5527 static void segvHandler(int sig, siginfo_t *info, void *secret) {
5528 void *trace[100];
5529 char **messages = NULL;
5530 int i, trace_size = 0;
5531 unsigned long offset=0;
5532 time_t uptime = time(NULL)-server.stat_starttime;
5533 ucontext_t *uc = (ucontext_t*) secret;
5534 REDIS_NOTUSED(info);
5535
5536 redisLog(REDIS_WARNING,
5537 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5538 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5539 "redis_version:%s; "
5540 "uptime_in_seconds:%d; "
5541 "connected_clients:%d; "
5542 "connected_slaves:%d; "
5543 "used_memory:%zu; "
5544 "changes_since_last_save:%lld; "
5545 "bgsave_in_progress:%d; "
5546 "last_save_time:%d; "
5547 "total_connections_received:%lld; "
5548 "total_commands_processed:%lld; "
5549 "role:%s;"
5550 ,REDIS_VERSION,
5551 uptime,
5552 listLength(server.clients)-listLength(server.slaves),
5553 listLength(server.slaves),
5554 server.usedmemory,
5555 server.dirty,
5556 server.bgsaveinprogress,
5557 server.lastsave,
5558 server.stat_numconnections,
5559 server.stat_numcommands,
5560 server.masterhost == NULL ? "master" : "slave"
5561 ));
5562
5563 trace_size = backtrace(trace, 100);
5564 /* overwrite sigaction with caller's address */
5565 if (getMcontextEip(uc) != NULL) {
5566 trace[1] = getMcontextEip(uc);
5567 }
5568 messages = backtrace_symbols(trace, trace_size);
5569
5570 for (i=1; i<trace_size; ++i) {
5571 char *fn = findFuncName(trace[i], &offset), *p;
5572
5573 p = strchr(messages[i],'+');
5574 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5575 redisLog(REDIS_WARNING,"%s", messages[i]);
5576 } else {
5577 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5578 }
5579 }
5580 free(messages);
5581 exit(0);
5582 }
5583
5584 static void setupSigSegvAction(void) {
5585 struct sigaction act;
5586
5587 sigemptyset (&act.sa_mask);
5588 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5589 * is used. Otherwise, sa_handler is used */
5590 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5591 act.sa_sigaction = segvHandler;
5592 sigaction (SIGSEGV, &act, NULL);
5593 sigaction (SIGBUS, &act, NULL);
5594 sigaction (SIGFPE, &act, NULL);
5595 sigaction (SIGILL, &act, NULL);
5596 sigaction (SIGBUS, &act, NULL);
5597 return;
5598 }
5599 #else /* HAVE_BACKTRACE */
/* Without backtrace support there is no crash handler to install. */
static void setupSigSegvAction(void) {
    return;
}
5602 #endif /* HAVE_BACKTRACE */
5603
5604 /* =================================== Main! ================================ */
5605
5606 #ifdef __linux__
/* Read /proc/sys/vm/overcommit_memory and return its content converted with
 * atoi(). Returns -1 if the file can't be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    char *got;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return -1;
    got = fgets(line,64,procfile);
    fclose(procfile);

    return (got != NULL) ? atoi(line) : -1;
}
5620
5621 void linuxOvercommitMemoryWarning(void) {
5622 if (linuxOvercommitMemoryValue() == 0) {
5623 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5624 }
5625 }
5626 #endif /* __linux__ */
5627
5628 static void daemonize(void) {
5629 int fd;
5630 FILE *fp;
5631
5632 if (fork() != 0) exit(0); /* parent exits */
5633 setsid(); /* create a new session */
5634
5635 /* Every output goes to /dev/null. If Redis is daemonized but
5636 * the 'logfile' is set to 'stdout' in the configuration file
5637 * it will not log at all. */
5638 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5639 dup2(fd, STDIN_FILENO);
5640 dup2(fd, STDOUT_FILENO);
5641 dup2(fd, STDERR_FILENO);
5642 if (fd > STDERR_FILENO) close(fd);
5643 }
5644 /* Try to write the pid file */
5645 fp = fopen(server.pidfile,"w");
5646 if (fp) {
5647 fprintf(fp,"%d\n",getpid());
5648 fclose(fp);
5649 }
5650 }
5651
5652 int main(int argc, char **argv) {
5653 initServerConfig();
5654 if (argc == 2) {
5655 ResetServerSaveParams();
5656 loadServerConfig(argv[1]);
5657 } else if (argc > 2) {
5658 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5659 exit(1);
5660 } else {
5661 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5662 }
5663 initServer();
5664 if (server.daemonize) daemonize();
5665 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5666 #ifdef __linux__
5667 linuxOvercommitMemoryWarning();
5668 #endif
5669 if (rdbLoad(server.dbfilename) == REDIS_OK)
5670 redisLog(REDIS_NOTICE,"DB loaded from disk");
5671 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5672 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5673 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5674 aeMain(server.el);
5675 aeDeleteEventLoop(server.el);
5676 return 0;
5677 }