]> git.saurik.com Git - redis.git/blame_incremental - redis.c
support for writev implemented but currently ifdef-ed in order to understan why I...
[redis.git] / redis.c
... / ...
CommitLineData
1/*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30#define REDIS_VERSION "1.050"
31
32#include "fmacros.h"
33#include "config.h"
34
35#include <stdio.h>
36#include <stdlib.h>
37#include <string.h>
38#include <time.h>
39#include <unistd.h>
40#define __USE_POSIX199309
41#include <signal.h>
42
43#ifdef HAVE_BACKTRACE
44#include <execinfo.h>
45#include <ucontext.h>
46#endif /* HAVE_BACKTRACE */
47
48#include <sys/wait.h>
49#include <errno.h>
50#include <assert.h>
51#include <ctype.h>
52#include <stdarg.h>
53#include <inttypes.h>
54#include <arpa/inet.h>
55#include <sys/stat.h>
56#include <fcntl.h>
57#include <sys/time.h>
58#include <sys/resource.h>
59#include <sys/uio.h>
60#include <limits.h>
61#include <math.h>
62
63#if defined(__sun)
64#include "solarisfixes.h"
65#endif
66
67#include "redis.h"
68#include "ae.h" /* Event driven programming library */
69#include "sds.h" /* Dynamic safe strings */
70#include "anet.h" /* Networking the easy way */
71#include "dict.h" /* Hash tables */
72#include "adlist.h" /* Linked lists */
73#include "zmalloc.h" /* total memory usage aware version of malloc/free */
74#include "lzf.h" /* LZF compression library */
75#include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
77/* Error codes */
78#define REDIS_OK 0
79#define REDIS_ERR -1
80
81/* Static server configuration */
82#define REDIS_SERVERPORT 6379 /* TCP port */
83#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84#define REDIS_IOBUF_LEN 1024
85#define REDIS_LOADBUF_LEN 1024
86#define REDIS_STATIC_ARGS 4
87#define REDIS_DEFAULT_DBNUM 16
88#define REDIS_CONFIGLINE_MAX 1024
89#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
94
/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96#define REDIS_WRITEV_THRESHOLD 3
97/* Max number of iovecs used for each writev call */
98#define REDIS_WRITEV_IOVEC_COUNT 256
99
100/* Hash table parameters */
101#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
102
103/* Command flags */
104#define REDIS_CMD_BULK 1 /* Bulk write command */
105#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short, these commands are denied in low memory conditions. */
110#define REDIS_CMD_DENYOOM 4
111
112/* Object types */
113#define REDIS_STRING 0
114#define REDIS_LIST 1
115#define REDIS_SET 2
116#define REDIS_ZSET 3
117#define REDIS_HASH 4
118
119/* Objects encoding */
120#define REDIS_ENCODING_RAW 0 /* Raw representation */
121#define REDIS_ENCODING_INT 1 /* Encoded as integer */
122
123/* Object types only used for dumping to disk */
124#define REDIS_EXPIRETIME 253
125#define REDIS_SELECTDB 254
126#define REDIS_EOF 255
127
/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys requires a lot of space, so we check the most significant 2 bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specifies the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte, most DB keys, and many
 * values, will fit inside. */
141#define REDIS_RDB_6BITLEN 0
142#define REDIS_RDB_14BITLEN 1
143#define REDIS_RDB_32BITLEN 2
144#define REDIS_RDB_ENCVAL 3
145#define REDIS_RDB_LENERR UINT_MAX
146
/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
150#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
154
155/* Client flags */
156#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
157#define REDIS_SLAVE 2 /* This client is a slave server */
158#define REDIS_MASTER 4 /* This client is a master server */
159#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
160
161/* Slave replication state - slave side */
162#define REDIS_REPL_NONE 0 /* No active replication */
163#define REDIS_REPL_CONNECT 1 /* Must connect to master */
164#define REDIS_REPL_CONNECTED 2 /* Connected to master */
165
166/* Slave replication state - from the point of view of master
167 * Note that in SEND_BULK and ONLINE state the slave receives new updates
168 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
169 * to start the next background saving in order to send updates to it. */
170#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
171#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
172#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
173#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
174
175/* List related stuff */
176#define REDIS_HEAD 0
177#define REDIS_TAIL 1
178
179/* Sort operations */
180#define REDIS_SORT_GET 0
181#define REDIS_SORT_ASC 1
182#define REDIS_SORT_DESC 2
183#define REDIS_SORTKEY_MAX 1024
184
185/* Log levels */
186#define REDIS_DEBUG 0
187#define REDIS_NOTICE 1
188#define REDIS_WARNING 2
189
190/* Anti-warning macro... */
191#define REDIS_NOTUSED(V) ((void) V)
192
193#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
194#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
195
196/* Append only defines */
197#define APPENDFSYNC_NO 0
198#define APPENDFSYNC_ALWAYS 1
199#define APPENDFSYNC_EVERYSEC 2
200
201/*================================= Data types ============================== */
202
/* A redis object: a typed, reference counted handle able to hold a
 * string / list / set. */
typedef struct redisObject {
    /* Payload; how to interpret it depends on 'type' and 'encoding'. */
    void *ptr;
    /* Object type. NOTE(review): presumably one of the REDIS_STRING ...
     * REDIS_HASH defines -- confirm against createObject(). */
    unsigned char type;
    /* Representation of 'ptr', compared against REDIS_ENCODING_* values. */
    unsigned char encoding;
    /* Unused bytes. */
    unsigned char notused[2];
    /* Reference count. */
    int refcount;
} robj;
211
212typedef struct redisDb {
213 dict *dict;
214 dict *expires;
215 int id;
216} redisDb;
217
218/* With multiplexing we need to take per-clinet state.
219 * Clients are taken in a liked list. */
220typedef struct redisClient {
221 int fd;
222 redisDb *db;
223 int dictid;
224 sds querybuf;
225 robj **argv, **mbargv;
226 int argc, mbargc;
227 int bulklen; /* bulk read len. -1 if not in bulk read mode */
228 int multibulk; /* multi bulk command format active */
229 list *reply;
230 int sentlen;
231 time_t lastinteraction; /* time of the last interaction, used for timeout */
232 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
233 int slaveseldb; /* slave selected db, if this client is a slave */
234 int authenticated; /* when requirepass is non-NULL */
235 int replstate; /* replication state if this is a slave */
236 int repldbfd; /* replication DB file descriptor */
237 long repldboff; /* replication DB file offset */
238 off_t repldbsize; /* replication DB file size */
239} redisClient;
240
/* One automatic save rule: a background save is started once at least
 * 'changes' modifications are pending and more than 'seconds' seconds have
 * elapsed since the last successful save. */
struct saveparam {
    time_t seconds;     /* minimum age of the last save */
    int changes;        /* minimum number of pending changes */
};
245
246/* Global server state structure */
247struct redisServer {
248 int port;
249 int fd;
250 redisDb *db;
251 dict *sharingpool;
252 unsigned int sharingpoolsize;
253 long long dirty; /* changes to DB from the last save */
254 list *clients;
255 list *slaves, *monitors;
256 char neterr[ANET_ERR_LEN];
257 aeEventLoop *el;
258 int cronloops; /* number of times the cron function run */
259 list *objfreelist; /* A list of freed objects to avoid malloc() */
260 time_t lastsave; /* Unix time of last save succeeede */
261 size_t usedmemory; /* Used memory in megabytes */
262 /* Fields used only for stats */
263 time_t stat_starttime; /* server start time */
264 long long stat_numcommands; /* number of processed commands */
265 long long stat_numconnections; /* number of connections received */
266 /* Configuration */
267 int verbosity;
268 int glueoutputbuf;
269 int maxidletime;
270 int dbnum;
271 int daemonize;
272 int appendonly;
273 int appendfsync;
274 time_t lastfsync;
275 int appendfd;
276 int appendseldb;
277 char *pidfile;
278 int bgsaveinprogress;
279 pid_t bgsavechildpid;
280 struct saveparam *saveparams;
281 int saveparamslen;
282 char *logfile;
283 char *bindaddr;
284 char *dbfilename;
285 char *appendfilename;
286 char *requirepass;
287 int shareobjects;
288 /* Replication related */
289 int isslave;
290 char *masterauth;
291 char *masterhost;
292 int masterport;
293 redisClient *master; /* client that is master for this slave */
294 int replstate;
295 unsigned int maxclients;
296 unsigned long maxmemory;
297 /* Sort parameters - qsort_r() is only available under BSD so we
298 * have to take this state global, in order to pass it to sortCompare() */
299 int sort_desc;
300 int sort_alpha;
301 int sort_bypattern;
302};
303
304typedef void redisCommandProc(redisClient *c);
305struct redisCommand {
306 char *name;
307 redisCommandProc *proc;
308 int arity;
309 int flags;
310};
311
/* Pairs a symbol name with a numeric pointer value.
 * NOTE(review): presumably used to map addresses to function names in
 * crash reports -- confirm against the users of this struct. */
struct redisFunctionSym {
    char *name;             /* symbol name */
    unsigned long pointer;  /* pointer value stored as an integer */
};
316
317typedef struct _redisSortObject {
318 robj *obj;
319 union {
320 double score;
321 robj *cmpobj;
322 } u;
323} redisSortObject;
324
325typedef struct _redisSortOperation {
326 int type;
327 robj *pattern;
328} redisSortOperation;
329
330/* ZSETs use a specialized version of Skiplists */
331
332typedef struct zskiplistNode {
333 struct zskiplistNode **forward;
334 struct zskiplistNode *backward;
335 double score;
336 robj *obj;
337} zskiplistNode;
338
/* The skiplist itself: its two end nodes, element count and level. */
typedef struct zskiplist {
    struct zskiplistNode *header;
    struct zskiplistNode *tail;
    unsigned long length;   /* number of elements */
    int level;              /* NOTE(review): presumably the highest level in use */
} zskiplist;
344
345typedef struct zset {
346 dict *dict;
347 zskiplist *zsl;
348} zset;
349
350/* Our shared "common" objects */
351
352struct sharedObjectsStruct {
353 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
354 *colon, *nullbulk, *nullmultibulk,
355 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
356 *outofrangeerr, *plus,
357 *select0, *select1, *select2, *select3, *select4,
358 *select5, *select6, *select7, *select8, *select9;
359} shared;
360
/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */
364
365static double R_Zero, R_PosInf, R_NegInf, R_Nan;
366
367/*================================ Prototypes =============================== */
368
369static void freeStringObject(robj *o);
370static void freeListObject(robj *o);
371static void freeSetObject(robj *o);
372static void decrRefCount(void *o);
373static robj *createObject(int type, void *ptr);
374static void freeClient(redisClient *c);
375static int rdbLoad(char *filename);
376static void addReply(redisClient *c, robj *obj);
377static void addReplySds(redisClient *c, sds s);
378static void incrRefCount(robj *o);
379static int rdbSaveBackground(char *filename);
380static robj *createStringObject(char *ptr, size_t len);
381static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
382static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
383static int syncWithMaster(void);
384static robj *tryObjectSharing(robj *o);
385static int tryObjectEncoding(robj *o);
386static robj *getDecodedObject(const robj *o);
387static int removeExpire(redisDb *db, robj *key);
388static int expireIfNeeded(redisDb *db, robj *key);
389static int deleteIfVolatile(redisDb *db, robj *key);
390static int deleteKey(redisDb *db, robj *key);
391static time_t getExpire(redisDb *db, robj *key);
392static int setExpire(redisDb *db, robj *key, time_t when);
393static void updateSlavesWaitingBgsave(int bgsaveerr);
394static void freeMemoryIfNeeded(void);
395static int processCommand(redisClient *c);
396static void setupSigSegvAction(void);
397static void rdbRemoveTempFile(pid_t childpid);
398static size_t stringObjectLen(robj *o);
399static void processInputBuffer(redisClient *c);
400static zskiplist *zslCreate(void);
401static void zslFree(zskiplist *zsl);
402static void zslInsert(zskiplist *zsl, double score, robj *obj);
403static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
404
405static void authCommand(redisClient *c);
406static void pingCommand(redisClient *c);
407static void echoCommand(redisClient *c);
408static void setCommand(redisClient *c);
409static void setnxCommand(redisClient *c);
410static void getCommand(redisClient *c);
411static void delCommand(redisClient *c);
412static void existsCommand(redisClient *c);
413static void incrCommand(redisClient *c);
414static void decrCommand(redisClient *c);
415static void incrbyCommand(redisClient *c);
416static void decrbyCommand(redisClient *c);
417static void selectCommand(redisClient *c);
418static void randomkeyCommand(redisClient *c);
419static void keysCommand(redisClient *c);
420static void dbsizeCommand(redisClient *c);
421static void lastsaveCommand(redisClient *c);
422static void saveCommand(redisClient *c);
423static void bgsaveCommand(redisClient *c);
424static void shutdownCommand(redisClient *c);
425static void moveCommand(redisClient *c);
426static void renameCommand(redisClient *c);
427static void renamenxCommand(redisClient *c);
428static void lpushCommand(redisClient *c);
429static void rpushCommand(redisClient *c);
430static void lpopCommand(redisClient *c);
431static void rpopCommand(redisClient *c);
432static void llenCommand(redisClient *c);
433static void lindexCommand(redisClient *c);
434static void lrangeCommand(redisClient *c);
435static void ltrimCommand(redisClient *c);
436static void typeCommand(redisClient *c);
437static void lsetCommand(redisClient *c);
438static void saddCommand(redisClient *c);
439static void sremCommand(redisClient *c);
440static void smoveCommand(redisClient *c);
441static void sismemberCommand(redisClient *c);
442static void scardCommand(redisClient *c);
443static void spopCommand(redisClient *c);
444static void srandmemberCommand(redisClient *c);
445static void sinterCommand(redisClient *c);
446static void sinterstoreCommand(redisClient *c);
447static void sunionCommand(redisClient *c);
448static void sunionstoreCommand(redisClient *c);
449static void sdiffCommand(redisClient *c);
450static void sdiffstoreCommand(redisClient *c);
451static void syncCommand(redisClient *c);
452static void flushdbCommand(redisClient *c);
453static void flushallCommand(redisClient *c);
454static void sortCommand(redisClient *c);
455static void lremCommand(redisClient *c);
456static void rpoplpushcommand(redisClient *c);
457static void infoCommand(redisClient *c);
458static void mgetCommand(redisClient *c);
459static void monitorCommand(redisClient *c);
460static void expireCommand(redisClient *c);
461static void expireatCommand(redisClient *c);
462static void getsetCommand(redisClient *c);
463static void ttlCommand(redisClient *c);
464static void slaveofCommand(redisClient *c);
465static void debugCommand(redisClient *c);
466static void msetCommand(redisClient *c);
467static void msetnxCommand(redisClient *c);
468static void zaddCommand(redisClient *c);
469static void zrangeCommand(redisClient *c);
470static void zrangebyscoreCommand(redisClient *c);
471static void zrevrangeCommand(redisClient *c);
472static void zcardCommand(redisClient *c);
473static void zremCommand(redisClient *c);
474static void zscoreCommand(redisClient *c);
475static void zremrangebyscoreCommand(redisClient *c);
476
477/*================================= Globals ================================= */
478
479/* Global vars */
480static struct redisServer server; /* server global state */
481static struct redisCommand cmdTable[] = {
482 {"get",getCommand,2,REDIS_CMD_INLINE},
483 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
484 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
485 {"del",delCommand,-2,REDIS_CMD_INLINE},
486 {"exists",existsCommand,2,REDIS_CMD_INLINE},
487 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
488 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
489 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
490 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
491 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
492 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
493 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
494 {"llen",llenCommand,2,REDIS_CMD_INLINE},
495 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
496 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
497 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
498 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
499 {"lrem",lremCommand,4,REDIS_CMD_BULK},
500 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK},
501 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
502 {"srem",sremCommand,3,REDIS_CMD_BULK},
503 {"smove",smoveCommand,4,REDIS_CMD_BULK},
504 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
505 {"scard",scardCommand,2,REDIS_CMD_INLINE},
506 {"spop",spopCommand,2,REDIS_CMD_INLINE},
507 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
508 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
509 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
510 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
511 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
512 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
513 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
514 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
515 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
516 {"zrem",zremCommand,3,REDIS_CMD_BULK},
517 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
518 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
519 {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
520 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
521 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
522 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
523 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
524 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
525 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
526 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
527 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
528 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
529 {"select",selectCommand,2,REDIS_CMD_INLINE},
530 {"move",moveCommand,3,REDIS_CMD_INLINE},
531 {"rename",renameCommand,3,REDIS_CMD_INLINE},
532 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
533 {"expire",expireCommand,3,REDIS_CMD_INLINE},
534 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
535 {"keys",keysCommand,2,REDIS_CMD_INLINE},
536 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
537 {"auth",authCommand,2,REDIS_CMD_INLINE},
538 {"ping",pingCommand,1,REDIS_CMD_INLINE},
539 {"echo",echoCommand,2,REDIS_CMD_BULK},
540 {"save",saveCommand,1,REDIS_CMD_INLINE},
541 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
542 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
543 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
544 {"type",typeCommand,2,REDIS_CMD_INLINE},
545 {"sync",syncCommand,1,REDIS_CMD_INLINE},
546 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
547 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
548 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
549 {"info",infoCommand,1,REDIS_CMD_INLINE},
550 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
551 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
552 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
553 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
554 {NULL,NULL,0,0}
555};
556
557/*============================ Utility functions ============================ */
558
/* Glob-style pattern matching.
 *
 * Matches the first 'stringLen' bytes of 'string' against the first
 * 'patternLen' bytes of 'pattern'. Neither buffer needs to be NUL
 * terminated: no byte past the given lengths is read.
 *
 * Supported syntax:
 *   *       any sequence of characters, including the empty one
 *   ?       exactly one character
 *   [abc]   one character of the set; [^abc] negates it, [a-z] is a range
 *   \x      the character x taken literally
 *
 * If 'nocase' is non-zero the comparison is case insensitive.
 *
 * Returns 1 if the whole string matches the whole pattern, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* Collapse runs of '*' without looking past the pattern end. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* a trailing '*' matches whatever is left */
            /* Try to match the rest of the pattern at every position. */
            while (stringLen > 0) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen <= 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A class always consumes one character: with nothing left
             * in the string there is nothing it could match. */
            if (stringLen <= 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back one byte so that the
                     * shared advance after the switch ends at the pattern
                     * end instead of beyond it. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    /* Fold case before ordering the bounds, so that the
                     * swap below sees the values actually compared. */
                    if (nocase) {
                        start = tolower((unsigned char)start);
                        end = tolower((unsigned char)end);
                        c = tolower((unsigned char)c);
                    }
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        /* tolower() requires an unsigned char value. */
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            /* A backslash that is the last pattern byte stands for itself. */
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen <= 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* The string is exhausted: only trailing '*' may remain. */
            while (patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
681
682static void redisLog(int level, const char *fmt, ...) {
683 va_list ap;
684 FILE *fp;
685
686 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
687 if (!fp) return;
688
689 va_start(ap, fmt);
690 if (level >= server.verbosity) {
691 char *c = ".-*";
692 char buf[64];
693 time_t now;
694
695 now = time(NULL);
696 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
697 fprintf(fp,"%s %c ",buf,c[level]);
698 vfprintf(fp, fmt, ap);
699 fprintf(fp,"\n");
700 fflush(fp);
701 }
702 va_end(ap);
703
704 if (server.logfile) fclose(fp);
705}
706
707/*====================== Hash table type implementation ==================== */
708
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and redis objects as values (objects can hold SDS strings,
 * lists, sets). */
712
/* Dict value destructor for values that are plain allocations: the value
 * is handed to zfree(). 'privdata' is not used. */
static void dictVanillaFree(void *privdata, void *val)
{
    zfree(val);
    DICT_NOTUSED(privdata);
}
718
719static int sdsDictKeyCompare(void *privdata, const void *key1,
720 const void *key2)
721{
722 int l1,l2;
723 DICT_NOTUSED(privdata);
724
725 l1 = sdslen((sds)key1);
726 l2 = sdslen((sds)key2);
727 if (l1 != l2) return 0;
728 return memcmp(key1, key2, l1) == 0;
729}
730
/* Dict destructor for redis objects: drops the reference the dict holds
 * by calling decrRefCount(). 'privdata' is not used. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    decrRefCount(val);
    DICT_NOTUSED(privdata);
}
737
738static int dictObjKeyCompare(void *privdata, const void *key1,
739 const void *key2)
740{
741 const robj *o1 = key1, *o2 = key2;
742 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
743}
744
745static unsigned int dictObjHash(const void *key) {
746 const robj *o = key;
747 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
748}
749
750static int dictEncObjKeyCompare(void *privdata, const void *key1,
751 const void *key2)
752{
753 const robj *o1 = key1, *o2 = key2;
754
755 if (o1->encoding == REDIS_ENCODING_RAW &&
756 o2->encoding == REDIS_ENCODING_RAW)
757 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
758 else {
759 robj *dec1, *dec2;
760 int cmp;
761
762 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
763 getDecodedObject(o1) : (robj*)o1;
764 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
765 getDecodedObject(o2) : (robj*)o2;
766 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
767 if (dec1 != o1) decrRefCount(dec1);
768 if (dec2 != o2) decrRefCount(dec2);
769 return cmp;
770 }
771}
772
773static unsigned int dictEncObjHash(const void *key) {
774 const robj *o = key;
775
776 if (o->encoding == REDIS_ENCODING_RAW)
777 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
778 else {
779 robj *dec = getDecodedObject(o);
780 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
781 decrRefCount(dec);
782 return hash;
783 }
784}
785
786static dictType setDictType = {
787 dictEncObjHash, /* hash function */
788 NULL, /* key dup */
789 NULL, /* val dup */
790 dictEncObjKeyCompare, /* key compare */
791 dictRedisObjectDestructor, /* key destructor */
792 NULL /* val destructor */
793};
794
795static dictType zsetDictType = {
796 dictEncObjHash, /* hash function */
797 NULL, /* key dup */
798 NULL, /* val dup */
799 dictEncObjKeyCompare, /* key compare */
800 dictRedisObjectDestructor, /* key destructor */
801 dictVanillaFree /* val destructor */
802};
803
804static dictType hashDictType = {
805 dictObjHash, /* hash function */
806 NULL, /* key dup */
807 NULL, /* val dup */
808 dictObjKeyCompare, /* key compare */
809 dictRedisObjectDestructor, /* key destructor */
810 dictRedisObjectDestructor /* val destructor */
811};
812
813/* ========================= Random utility functions ======================= */
814
/* Out of memory handler.
 *
 * Redis generally does not try to recover from out of memory conditions
 * when allocating objects or strings: it is not clear that the condition
 * could even be reported to the client, since the networking layer itself
 * relies on heap allocation for its send buffers. So the process prints
 * "<msg>: Out of memory" on stderr, waits one second and aborts. */
static void oom(const char *msg) {
    FILE *out = stderr;

    fprintf(out, "%s: Out of memory\n",msg);
    fflush(out);
    sleep(1);
    abort();
}
826
827/* ====================== Redis server networking stuff ===================== */
828static void closeTimedoutClients(void) {
829 redisClient *c;
830 listNode *ln;
831 time_t now = time(NULL);
832
833 listRewind(server.clients);
834 while ((ln = listYield(server.clients)) != NULL) {
835 c = listNodeValue(ln);
836 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
837 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
838 (now - c->lastinteraction > server.maxidletime)) {
839 redisLog(REDIS_DEBUG,"Closing idle client");
840 freeClient(c);
841 }
842 }
843}
844
845static int htNeedsResize(dict *dict) {
846 long long size, used;
847
848 size = dictSlots(dict);
849 used = dictSize(dict);
850 return (size && used && size > DICT_HT_INITIAL_SIZE &&
851 (used*100/size < REDIS_HT_MINFILL));
852}
853
854/* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
855 * we resize the hash table to save memory */
856static void tryResizeHashTables(void) {
857 int j;
858
859 for (j = 0; j < server.dbnum; j++) {
860 if (htNeedsResize(server.db[j].dict)) {
861 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
862 dictResize(server.db[j].dict);
863 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
864 }
865 if (htNeedsResize(server.db[j].expires))
866 dictResize(server.db[j].expires);
867 }
868}
869
870static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
871 int j, loops = server.cronloops++;
872 REDIS_NOTUSED(eventLoop);
873 REDIS_NOTUSED(id);
874 REDIS_NOTUSED(clientData);
875
876 /* Update the global state with the amount of used memory */
877 server.usedmemory = zmalloc_used_memory();
878
879 /* Show some info about non-empty databases */
880 for (j = 0; j < server.dbnum; j++) {
881 long long size, used, vkeys;
882
883 size = dictSlots(server.db[j].dict);
884 used = dictSize(server.db[j].dict);
885 vkeys = dictSize(server.db[j].expires);
886 if (!(loops % 5) && (used || vkeys)) {
887 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
888 /* dictPrintStats(server.dict); */
889 }
890 }
891
892 /* We don't want to resize the hash tables while a bacground saving
893 * is in progress: the saving child is created using fork() that is
894 * implemented with a copy-on-write semantic in most modern systems, so
895 * if we resize the HT while there is the saving child at work actually
896 * a lot of memory movements in the parent will cause a lot of pages
897 * copied. */
898 if (!server.bgsaveinprogress) tryResizeHashTables();
899
900 /* Show information about connected clients */
901 if (!(loops % 5)) {
902 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
903 listLength(server.clients)-listLength(server.slaves),
904 listLength(server.slaves),
905 server.usedmemory,
906 dictSize(server.sharingpool));
907 }
908
909 /* Close connections of timedout clients */
910 if (server.maxidletime && !(loops % 10))
911 closeTimedoutClients();
912
913 /* Check if a background saving in progress terminated */
914 if (server.bgsaveinprogress) {
915 int statloc;
916 if (wait3(&statloc,WNOHANG,NULL)) {
917 int exitcode = WEXITSTATUS(statloc);
918 int bysignal = WIFSIGNALED(statloc);
919
920 if (!bysignal && exitcode == 0) {
921 redisLog(REDIS_NOTICE,
922 "Background saving terminated with success");
923 server.dirty = 0;
924 server.lastsave = time(NULL);
925 } else if (!bysignal && exitcode != 0) {
926 redisLog(REDIS_WARNING, "Background saving error");
927 } else {
928 redisLog(REDIS_WARNING,
929 "Background saving terminated by signal");
930 rdbRemoveTempFile(server.bgsavechildpid);
931 }
932 server.bgsaveinprogress = 0;
933 server.bgsavechildpid = -1;
934 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
935 }
936 } else {
937 /* If there is not a background saving in progress check if
938 * we have to save now */
939 time_t now = time(NULL);
940 for (j = 0; j < server.saveparamslen; j++) {
941 struct saveparam *sp = server.saveparams+j;
942
943 if (server.dirty >= sp->changes &&
944 now-server.lastsave > sp->seconds) {
945 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
946 sp->changes, sp->seconds);
947 rdbSaveBackground(server.dbfilename);
948 break;
949 }
950 }
951 }
952
953 /* Try to expire a few timed out keys. The algorithm used is adaptive and
954 * will use few CPU cycles if there are few expiring keys, otherwise
955 * it will get more aggressive to avoid that too much memory is used by
956 * keys that can be removed from the keyspace. */
957 for (j = 0; j < server.dbnum; j++) {
958 int expired;
959 redisDb *db = server.db+j;
960
961 /* Continue to expire if at the end of the cycle more than 25%
962 * of the keys were expired. */
963 do {
964 int num = dictSize(db->expires);
965 time_t now = time(NULL);
966
967 expired = 0;
968 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
969 num = REDIS_EXPIRELOOKUPS_PER_CRON;
970 while (num--) {
971 dictEntry *de;
972 time_t t;
973
974 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
975 t = (time_t) dictGetEntryVal(de);
976 if (now > t) {
977 deleteKey(db,dictGetEntryKey(de));
978 expired++;
979 }
980 }
981 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
982 }
983
984 /* Check if we should connect to a MASTER */
985 if (server.replstate == REDIS_REPL_CONNECT) {
986 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
987 if (syncWithMaster() == REDIS_OK) {
988 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
989 }
990 }
991 return 1000;
992}
993
994static void createSharedObjects(void) {
995 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
996 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
997 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
998 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
999 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1000 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1001 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1002 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1003 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1004 /* no such key */
1005 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1006 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1007 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1008 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1009 "-ERR no such key\r\n"));
1010 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1011 "-ERR syntax error\r\n"));
1012 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1013 "-ERR source and destination objects are the same\r\n"));
1014 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1015 "-ERR index out of range\r\n"));
1016 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1017 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1018 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1019 shared.select0 = createStringObject("select 0\r\n",10);
1020 shared.select1 = createStringObject("select 1\r\n",10);
1021 shared.select2 = createStringObject("select 2\r\n",10);
1022 shared.select3 = createStringObject("select 3\r\n",10);
1023 shared.select4 = createStringObject("select 4\r\n",10);
1024 shared.select5 = createStringObject("select 5\r\n",10);
1025 shared.select6 = createStringObject("select 6\r\n",10);
1026 shared.select7 = createStringObject("select 7\r\n",10);
1027 shared.select8 = createStringObject("select 8\r\n",10);
1028 shared.select9 = createStringObject("select 9\r\n",10);
1029}
1030
1031static void appendServerSaveParams(time_t seconds, int changes) {
1032 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1033 server.saveparams[server.saveparamslen].seconds = seconds;
1034 server.saveparams[server.saveparamslen].changes = changes;
1035 server.saveparamslen++;
1036}
1037
1038static void resetServerSaveParams() {
1039 zfree(server.saveparams);
1040 server.saveparams = NULL;
1041 server.saveparamslen = 0;
1042}
1043
1044static void initServerConfig() {
1045 server.dbnum = REDIS_DEFAULT_DBNUM;
1046 server.port = REDIS_SERVERPORT;
1047 server.verbosity = REDIS_DEBUG;
1048 server.maxidletime = REDIS_MAXIDLETIME;
1049 server.saveparams = NULL;
1050 server.logfile = NULL; /* NULL = log on standard output */
1051 server.bindaddr = NULL;
1052 server.glueoutputbuf = 1;
1053 server.daemonize = 0;
1054 server.appendonly = 0;
1055 server.appendfsync = APPENDFSYNC_ALWAYS;
1056 server.lastfsync = time(NULL);
1057 server.appendfd = -1;
1058 server.appendseldb = -1; /* Make sure the first time will not match */
1059 server.pidfile = "/var/run/redis.pid";
1060 server.dbfilename = "dump.rdb";
1061 server.appendfilename = "appendonly.log";
1062 server.requirepass = NULL;
1063 server.shareobjects = 0;
1064 server.sharingpoolsize = 1024;
1065 server.maxclients = 0;
1066 server.maxmemory = 0;
1067 resetServerSaveParams();
1068
1069 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1070 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1071 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1072 /* Replication related */
1073 server.isslave = 0;
1074 server.masterauth = NULL;
1075 server.masterhost = NULL;
1076 server.masterport = 6379;
1077 server.master = NULL;
1078 server.replstate = REDIS_REPL_NONE;
1079
1080 /* Double constants initialization */
1081 R_Zero = 0.0;
1082 R_PosInf = 1.0/R_Zero;
1083 R_NegInf = -1.0/R_Zero;
1084 R_Nan = R_Zero/R_Zero;
1085}
1086
1087static void initServer() {
1088 int j;
1089
1090 signal(SIGHUP, SIG_IGN);
1091 signal(SIGPIPE, SIG_IGN);
1092 setupSigSegvAction();
1093
1094 server.clients = listCreate();
1095 server.slaves = listCreate();
1096 server.monitors = listCreate();
1097 server.objfreelist = listCreate();
1098 createSharedObjects();
1099 server.el = aeCreateEventLoop();
1100 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1101 server.sharingpool = dictCreate(&setDictType,NULL);
1102 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1103 if (server.fd == -1) {
1104 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1105 exit(1);
1106 }
1107 for (j = 0; j < server.dbnum; j++) {
1108 server.db[j].dict = dictCreate(&hashDictType,NULL);
1109 server.db[j].expires = dictCreate(&setDictType,NULL);
1110 server.db[j].id = j;
1111 }
1112 server.cronloops = 0;
1113 server.bgsaveinprogress = 0;
1114 server.bgsavechildpid = -1;
1115 server.lastsave = time(NULL);
1116 server.dirty = 0;
1117 server.usedmemory = 0;
1118 server.stat_numcommands = 0;
1119 server.stat_numconnections = 0;
1120 server.stat_starttime = time(NULL);
1121 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1122
1123 if (server.appendonly) {
1124 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1125 if (server.appendfd == -1) {
1126 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1127 strerror(errno));
1128 exit(1);
1129 }
1130 }
1131}
1132
1133/* Empty the whole database */
1134static long long emptyDb() {
1135 int j;
1136 long long removed = 0;
1137
1138 for (j = 0; j < server.dbnum; j++) {
1139 removed += dictSize(server.db[j].dict);
1140 dictEmpty(server.db[j].dict);
1141 dictEmpty(server.db[j].expires);
1142 }
1143 return removed;
1144}
1145
/* Translate a "yes"/"no" string (compared ignoring case) into 1 or 0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1151
1152/* I agree, this is a very rudimental way to load a configuration...
1153 will improve later if the config gets more complex */
1154static void loadServerConfig(char *filename) {
1155 FILE *fp;
1156 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1157 int linenum = 0;
1158 sds line = NULL;
1159
1160 if (filename[0] == '-' && filename[1] == '\0')
1161 fp = stdin;
1162 else {
1163 if ((fp = fopen(filename,"r")) == NULL) {
1164 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1165 exit(1);
1166 }
1167 }
1168
1169 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1170 sds *argv;
1171 int argc, j;
1172
1173 linenum++;
1174 line = sdsnew(buf);
1175 line = sdstrim(line," \t\r\n");
1176
1177 /* Skip comments and blank lines*/
1178 if (line[0] == '#' || line[0] == '\0') {
1179 sdsfree(line);
1180 continue;
1181 }
1182
1183 /* Split into arguments */
1184 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1185 sdstolower(argv[0]);
1186
1187 /* Execute config directives */
1188 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1189 server.maxidletime = atoi(argv[1]);
1190 if (server.maxidletime < 0) {
1191 err = "Invalid timeout value"; goto loaderr;
1192 }
1193 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1194 server.port = atoi(argv[1]);
1195 if (server.port < 1 || server.port > 65535) {
1196 err = "Invalid port"; goto loaderr;
1197 }
1198 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1199 server.bindaddr = zstrdup(argv[1]);
1200 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1201 int seconds = atoi(argv[1]);
1202 int changes = atoi(argv[2]);
1203 if (seconds < 1 || changes < 0) {
1204 err = "Invalid save parameters"; goto loaderr;
1205 }
1206 appendServerSaveParams(seconds,changes);
1207 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1208 if (chdir(argv[1]) == -1) {
1209 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1210 argv[1], strerror(errno));
1211 exit(1);
1212 }
1213 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1214 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1215 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1216 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1217 else {
1218 err = "Invalid log level. Must be one of debug, notice, warning";
1219 goto loaderr;
1220 }
1221 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1222 FILE *logfp;
1223
1224 server.logfile = zstrdup(argv[1]);
1225 if (!strcasecmp(server.logfile,"stdout")) {
1226 zfree(server.logfile);
1227 server.logfile = NULL;
1228 }
1229 if (server.logfile) {
1230 /* Test if we are able to open the file. The server will not
1231 * be able to abort just for this problem later... */
1232 logfp = fopen(server.logfile,"a");
1233 if (logfp == NULL) {
1234 err = sdscatprintf(sdsempty(),
1235 "Can't open the log file: %s", strerror(errno));
1236 goto loaderr;
1237 }
1238 fclose(logfp);
1239 }
1240 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1241 server.dbnum = atoi(argv[1]);
1242 if (server.dbnum < 1) {
1243 err = "Invalid number of databases"; goto loaderr;
1244 }
1245 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1246 server.maxclients = atoi(argv[1]);
1247 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1248 server.maxmemory = strtoll(argv[1], NULL, 10);
1249 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1250 server.masterhost = sdsnew(argv[1]);
1251 server.masterport = atoi(argv[2]);
1252 server.replstate = REDIS_REPL_CONNECT;
1253 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1254 server.masterauth = zstrdup(argv[1]);
1255 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1256 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1257 err = "argument must be 'yes' or 'no'"; goto loaderr;
1258 }
1259 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1260 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1261 err = "argument must be 'yes' or 'no'"; goto loaderr;
1262 }
1263 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1264 server.sharingpoolsize = atoi(argv[1]);
1265 if (server.sharingpoolsize < 1) {
1266 err = "invalid object sharing pool size"; goto loaderr;
1267 }
1268 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1269 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1270 err = "argument must be 'yes' or 'no'"; goto loaderr;
1271 }
1272 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1273 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1274 err = "argument must be 'yes' or 'no'"; goto loaderr;
1275 }
1276 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1277 if (!strcasecmp(argv[1],"no")) {
1278 server.appendfsync = APPENDFSYNC_NO;
1279 } else if (!strcasecmp(argv[1],"always")) {
1280 server.appendfsync = APPENDFSYNC_ALWAYS;
1281 } else if (!strcasecmp(argv[1],"everysec")) {
1282 server.appendfsync = APPENDFSYNC_EVERYSEC;
1283 } else {
1284 err = "argument must be 'no', 'always' or 'everysec'";
1285 goto loaderr;
1286 }
1287 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1288 server.requirepass = zstrdup(argv[1]);
1289 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1290 server.pidfile = zstrdup(argv[1]);
1291 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1292 server.dbfilename = zstrdup(argv[1]);
1293 } else {
1294 err = "Bad directive or wrong number of arguments"; goto loaderr;
1295 }
1296 for (j = 0; j < argc; j++)
1297 sdsfree(argv[j]);
1298 zfree(argv);
1299 sdsfree(line);
1300 }
1301 if (fp != stdin) fclose(fp);
1302 return;
1303
1304loaderr:
1305 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1306 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1307 fprintf(stderr, ">>> '%s'\n", line);
1308 fprintf(stderr, "%s\n", err);
1309 exit(1);
1310}
1311
1312static void freeClientArgv(redisClient *c) {
1313 int j;
1314
1315 for (j = 0; j < c->argc; j++)
1316 decrRefCount(c->argv[j]);
1317 for (j = 0; j < c->mbargc; j++)
1318 decrRefCount(c->mbargv[j]);
1319 c->argc = 0;
1320 c->mbargc = 0;
1321}
1322
1323static void freeClient(redisClient *c) {
1324 listNode *ln;
1325
1326 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1327 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1328 sdsfree(c->querybuf);
1329 listRelease(c->reply);
1330 freeClientArgv(c);
1331 close(c->fd);
1332 ln = listSearchKey(server.clients,c);
1333 assert(ln != NULL);
1334 listDelNode(server.clients,ln);
1335 if (c->flags & REDIS_SLAVE) {
1336 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1337 close(c->repldbfd);
1338 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1339 ln = listSearchKey(l,c);
1340 assert(ln != NULL);
1341 listDelNode(l,ln);
1342 }
1343 if (c->flags & REDIS_MASTER) {
1344 server.master = NULL;
1345 server.replstate = REDIS_REPL_CONNECT;
1346 }
1347 zfree(c->argv);
1348 zfree(c->mbargv);
1349 zfree(c);
1350}
1351
1352static void glueReplyBuffersIfNeeded(redisClient *c) {
1353 int totlen = 0;
1354 listNode *ln;
1355 robj *o;
1356
1357 listRewind(c->reply);
1358 while((ln = listYield(c->reply))) {
1359 o = ln->value;
1360 totlen += sdslen(o->ptr);
1361 /* This optimization makes more sense if we don't have to copy
1362 * too much data */
1363 if (totlen > 1024) return;
1364 }
1365 if (totlen > 0) {
1366 char buf[1024];
1367 int copylen = 0;
1368
1369 listRewind(c->reply);
1370 while((ln = listYield(c->reply))) {
1371 o = ln->value;
1372 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1373 copylen += sdslen(o->ptr);
1374 listDelNode(c->reply,ln);
1375 }
1376 /* Now the output buffer is empty, add the new single element */
1377 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1378 listAddNodeTail(c->reply,o);
1379 }
1380}
1381
1382static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1383 redisClient *c = privdata;
1384 int nwritten = 0, totwritten = 0, objlen;
1385 robj *o;
1386 REDIS_NOTUSED(el);
1387 REDIS_NOTUSED(mask);
1388
1389 if (server.glueoutputbuf && listLength(c->reply) > 1)
1390 glueReplyBuffersIfNeeded(c);
1391
1392 /* Use writev() if we have enough buffers to send */
1393#if 0
1394 if (listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1395 !(c->flags & REDIS_MASTER))
1396 {
1397 sendReplyToClientWritev(el, fd, privdata, mask);
1398 return;
1399 }
1400#endif
1401
1402 while(listLength(c->reply)) {
1403 o = listNodeValue(listFirst(c->reply));
1404 objlen = sdslen(o->ptr);
1405
1406 if (objlen == 0) {
1407 listDelNode(c->reply,listFirst(c->reply));
1408 continue;
1409 }
1410
1411 if (c->flags & REDIS_MASTER) {
1412 /* Don't reply to a master */
1413 nwritten = objlen - c->sentlen;
1414 } else {
1415 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1416 if (nwritten <= 0) break;
1417 }
1418 c->sentlen += nwritten;
1419 totwritten += nwritten;
1420 /* If we fully sent the object on head go to the next one */
1421 if (c->sentlen == objlen) {
1422 listDelNode(c->reply,listFirst(c->reply));
1423 c->sentlen = 0;
1424 }
1425 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1426 * bytes, in a single threaded server it's a good idea to serve
1427 * other clients as well, even if a very large request comes from
1428 * super fast link that is always able to accept data (in real world
1429 * scenario think about 'KEYS *' against the loopback interfae) */
1430 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1431 }
1432 if (nwritten == -1) {
1433 if (errno == EAGAIN) {
1434 nwritten = 0;
1435 } else {
1436 redisLog(REDIS_DEBUG,
1437 "Error writing to client: %s", strerror(errno));
1438 freeClient(c);
1439 return;
1440 }
1441 }
1442 if (totwritten > 0) c->lastinteraction = time(NULL);
1443 if (listLength(c->reply) == 0) {
1444 c->sentlen = 0;
1445 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1446 }
1447}
1448
1449static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1450{
1451 redisClient *c = privdata;
1452 int nwritten = 0, totwritten = 0, objlen, willwrite;
1453 robj *o;
1454 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1455 int offset, ion = 0;
1456 REDIS_NOTUSED(el);
1457 REDIS_NOTUSED(mask);
1458
1459 listNode *node;
1460 while (listLength(c->reply)) {
1461 offset = c->sentlen;
1462 ion = 0;
1463 willwrite = 0;
1464
1465 /* fill-in the iov[] array */
1466 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1467 o = listNodeValue(node);
1468 objlen = sdslen(o->ptr);
1469
1470 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1471 break;
1472
1473 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1474 break; /* no more iovecs */
1475
1476 iov[ion].iov_base = ((char*)o->ptr) + offset;
1477 iov[ion].iov_len = objlen - offset;
1478 willwrite += objlen - offset;
1479 offset = 0; /* just for the first item */
1480 ion++;
1481 }
1482
1483 if(willwrite == 0)
1484 break;
1485
1486 /* write all collected blocks at once */
1487 if((nwritten = writev(fd, iov, ion)) < 0) {
1488 if (errno != EAGAIN) {
1489 redisLog(REDIS_DEBUG,
1490 "Error writing to client: %s", strerror(errno));
1491 freeClient(c);
1492 return;
1493 }
1494 break;
1495 }
1496
1497 totwritten += nwritten;
1498 offset = c->sentlen;
1499
1500 /* remove written robjs from c->reply */
1501 while (nwritten && listLength(c->reply)) {
1502 o = listNodeValue(listFirst(c->reply));
1503 objlen = sdslen(o->ptr);
1504
1505 if(nwritten >= objlen - offset) {
1506 listDelNode(c->reply, listFirst(c->reply));
1507 nwritten -= objlen - offset;
1508 c->sentlen = 0;
1509 } else {
1510 /* partial write */
1511 c->sentlen += nwritten;
1512 break;
1513 }
1514 offset = 0;
1515 }
1516 }
1517
1518 if (totwritten > 0)
1519 c->lastinteraction = time(NULL);
1520
1521 if (listLength(c->reply) == 0) {
1522 c->sentlen = 0;
1523 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1524 }
1525}
1526
1527static struct redisCommand *lookupCommand(char *name) {
1528 int j = 0;
1529 while(cmdTable[j].name != NULL) {
1530 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1531 j++;
1532 }
1533 return NULL;
1534}
1535
1536/* resetClient prepare the client to process the next command */
1537static void resetClient(redisClient *c) {
1538 freeClientArgv(c);
1539 c->bulklen = -1;
1540 c->multibulk = 0;
1541}
1542
1543/* If this function gets called we already read a whole
1544 * command, argments are in the client argv/argc fields.
1545 * processCommand() execute the command or prepare the
1546 * server for a bulk read from the client.
1547 *
1548 * If 1 is returned the client is still alive and valid and
1549 * and other operations can be performed by the caller. Otherwise
1550 * if 0 is returned the client was destroied (i.e. after QUIT). */
1551static int processCommand(redisClient *c) {
1552 struct redisCommand *cmd;
1553 long long dirty;
1554
1555 /* Free some memory if needed (maxmemory setting) */
1556 if (server.maxmemory) freeMemoryIfNeeded();
1557
1558 /* Handle the multi bulk command type. This is an alternative protocol
1559 * supported by Redis in order to receive commands that are composed of
1560 * multiple binary-safe "bulk" arguments. The latency of processing is
1561 * a bit higher but this allows things like multi-sets, so if this
1562 * protocol is used only for MSET and similar commands this is a big win. */
1563 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1564 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1565 if (c->multibulk <= 0) {
1566 resetClient(c);
1567 return 1;
1568 } else {
1569 decrRefCount(c->argv[c->argc-1]);
1570 c->argc--;
1571 return 1;
1572 }
1573 } else if (c->multibulk) {
1574 if (c->bulklen == -1) {
1575 if (((char*)c->argv[0]->ptr)[0] != '$') {
1576 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1577 resetClient(c);
1578 return 1;
1579 } else {
1580 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1581 decrRefCount(c->argv[0]);
1582 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1583 c->argc--;
1584 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1585 resetClient(c);
1586 return 1;
1587 }
1588 c->argc--;
1589 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1590 return 1;
1591 }
1592 } else {
1593 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1594 c->mbargv[c->mbargc] = c->argv[0];
1595 c->mbargc++;
1596 c->argc--;
1597 c->multibulk--;
1598 if (c->multibulk == 0) {
1599 robj **auxargv;
1600 int auxargc;
1601
1602 /* Here we need to swap the multi-bulk argc/argv with the
1603 * normal argc/argv of the client structure. */
1604 auxargv = c->argv;
1605 c->argv = c->mbargv;
1606 c->mbargv = auxargv;
1607
1608 auxargc = c->argc;
1609 c->argc = c->mbargc;
1610 c->mbargc = auxargc;
1611
1612 /* We need to set bulklen to something different than -1
1613 * in order for the code below to process the command without
1614 * to try to read the last argument of a bulk command as
1615 * a special argument. */
1616 c->bulklen = 0;
1617 /* continue below and process the command */
1618 } else {
1619 c->bulklen = -1;
1620 return 1;
1621 }
1622 }
1623 }
1624 /* -- end of multi bulk commands processing -- */
1625
1626 /* The QUIT command is handled as a special case. Normal command
1627 * procs are unable to close the client connection safely */
1628 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1629 freeClient(c);
1630 return 0;
1631 }
1632 cmd = lookupCommand(c->argv[0]->ptr);
1633 if (!cmd) {
1634 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1635 resetClient(c);
1636 return 1;
1637 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1638 (c->argc < -cmd->arity)) {
1639 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1640 resetClient(c);
1641 return 1;
1642 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1643 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1644 resetClient(c);
1645 return 1;
1646 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1647 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1648
1649 decrRefCount(c->argv[c->argc-1]);
1650 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1651 c->argc--;
1652 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1653 resetClient(c);
1654 return 1;
1655 }
1656 c->argc--;
1657 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1658 /* It is possible that the bulk read is already in the
1659 * buffer. Check this condition and handle it accordingly.
1660 * This is just a fast path, alternative to call processInputBuffer().
1661 * It's a good idea since the code is small and this condition
1662 * happens most of the times. */
1663 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1664 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1665 c->argc++;
1666 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1667 } else {
1668 return 1;
1669 }
1670 }
1671 /* Let's try to share objects on the command arguments vector */
1672 if (server.shareobjects) {
1673 int j;
1674 for(j = 1; j < c->argc; j++)
1675 c->argv[j] = tryObjectSharing(c->argv[j]);
1676 }
1677 /* Let's try to encode the bulk object to save space. */
1678 if (cmd->flags & REDIS_CMD_BULK)
1679 tryObjectEncoding(c->argv[c->argc-1]);
1680
1681 /* Check if the user is authenticated */
1682 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1683 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1684 resetClient(c);
1685 return 1;
1686 }
1687
1688 /* Exec the command */
1689 dirty = server.dirty;
1690 cmd->proc(c);
1691 if (server.appendonly && server.dirty-dirty)
1692 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1693 if (server.dirty-dirty && listLength(server.slaves))
1694 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1695 if (listLength(server.monitors))
1696 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1697 server.stat_numcommands++;
1698
1699 /* Prepare the client for the next command */
1700 if (c->flags & REDIS_CLOSE) {
1701 freeClient(c);
1702 return 0;
1703 }
1704 resetClient(c);
1705 return 1;
1706}
1707
1708static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1709 listNode *ln;
1710 int outc = 0, j;
1711 robj **outv;
1712 /* (args*2)+1 is enough room for args, spaces, newlines */
1713 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1714
1715 if (argc <= REDIS_STATIC_ARGS) {
1716 outv = static_outv;
1717 } else {
1718 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1719 }
1720
1721 for (j = 0; j < argc; j++) {
1722 if (j != 0) outv[outc++] = shared.space;
1723 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1724 robj *lenobj;
1725
1726 lenobj = createObject(REDIS_STRING,
1727 sdscatprintf(sdsempty(),"%d\r\n",
1728 stringObjectLen(argv[j])));
1729 lenobj->refcount = 0;
1730 outv[outc++] = lenobj;
1731 }
1732 outv[outc++] = argv[j];
1733 }
1734 outv[outc++] = shared.crlf;
1735
1736 /* Increment all the refcounts at start and decrement at end in order to
1737 * be sure to free objects if there is no slave in a replication state
1738 * able to be feed with commands */
1739 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1740 listRewind(slaves);
1741 while((ln = listYield(slaves))) {
1742 redisClient *slave = ln->value;
1743
1744 /* Don't feed slaves that are still waiting for BGSAVE to start */
1745 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1746
1747 /* Feed all the other slaves, MONITORs and so on */
1748 if (slave->slaveseldb != dictid) {
1749 robj *selectcmd;
1750
1751 switch(dictid) {
1752 case 0: selectcmd = shared.select0; break;
1753 case 1: selectcmd = shared.select1; break;
1754 case 2: selectcmd = shared.select2; break;
1755 case 3: selectcmd = shared.select3; break;
1756 case 4: selectcmd = shared.select4; break;
1757 case 5: selectcmd = shared.select5; break;
1758 case 6: selectcmd = shared.select6; break;
1759 case 7: selectcmd = shared.select7; break;
1760 case 8: selectcmd = shared.select8; break;
1761 case 9: selectcmd = shared.select9; break;
1762 default:
1763 selectcmd = createObject(REDIS_STRING,
1764 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1765 selectcmd->refcount = 0;
1766 break;
1767 }
1768 addReply(slave,selectcmd);
1769 slave->slaveseldb = dictid;
1770 }
1771 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1772 }
1773 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1774 if (outv != static_outv) zfree(outv);
1775}
1776
1777static void processInputBuffer(redisClient *c) {
1778again:
1779 if (c->bulklen == -1) {
1780 /* Read the first line of the query */
1781 char *p = strchr(c->querybuf,'\n');
1782 size_t querylen;
1783
1784 if (p) {
1785 sds query, *argv;
1786 int argc, j;
1787
1788 query = c->querybuf;
1789 c->querybuf = sdsempty();
1790 querylen = 1+(p-(query));
1791 if (sdslen(query) > querylen) {
1792 /* leave data after the first line of the query in the buffer */
1793 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1794 }
1795 *p = '\0'; /* remove "\n" */
1796 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1797 sdsupdatelen(query);
1798
1799 /* Now we can split the query in arguments */
1800 if (sdslen(query) == 0) {
1801 /* Ignore empty query */
1802 sdsfree(query);
1803 return;
1804 }
1805 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1806 sdsfree(query);
1807
1808 if (c->argv) zfree(c->argv);
1809 c->argv = zmalloc(sizeof(robj*)*argc);
1810
1811 for (j = 0; j < argc; j++) {
1812 if (sdslen(argv[j])) {
1813 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1814 c->argc++;
1815 } else {
1816 sdsfree(argv[j]);
1817 }
1818 }
1819 zfree(argv);
1820 /* Execute the command. If the client is still valid
1821 * after processCommand() return and there is something
1822 * on the query buffer try to process the next command. */
1823 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1824 return;
1825 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1826 redisLog(REDIS_DEBUG, "Client protocol error");
1827 freeClient(c);
1828 return;
1829 }
1830 } else {
1831 /* Bulk read handling. Note that if we are at this point
1832 the client already sent a command terminated with a newline,
1833 we are reading the bulk data that is actually the last
1834 argument of the command. */
1835 int qbl = sdslen(c->querybuf);
1836
1837 if (c->bulklen <= qbl) {
1838 /* Copy everything but the final CRLF as final argument */
1839 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1840 c->argc++;
1841 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1842 /* Process the command. If the client is still valid after
1843 * the processing and there is more data in the buffer
1844 * try to parse it. */
1845 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1846 return;
1847 }
1848 }
1849}
1850
1851static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1852 redisClient *c = (redisClient*) privdata;
1853 char buf[REDIS_IOBUF_LEN];
1854 int nread;
1855 REDIS_NOTUSED(el);
1856 REDIS_NOTUSED(mask);
1857
1858 nread = read(fd, buf, REDIS_IOBUF_LEN);
1859 if (nread == -1) {
1860 if (errno == EAGAIN) {
1861 nread = 0;
1862 } else {
1863 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1864 freeClient(c);
1865 return;
1866 }
1867 } else if (nread == 0) {
1868 redisLog(REDIS_DEBUG, "Client closed connection");
1869 freeClient(c);
1870 return;
1871 }
1872 if (nread) {
1873 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1874 c->lastinteraction = time(NULL);
1875 } else {
1876 return;
1877 }
1878 processInputBuffer(c);
1879}
1880
1881static int selectDb(redisClient *c, int id) {
1882 if (id < 0 || id >= server.dbnum)
1883 return REDIS_ERR;
1884 c->db = &server.db[id];
1885 return REDIS_OK;
1886}
1887
1888static void *dupClientReplyValue(void *o) {
1889 incrRefCount((robj*)o);
1890 return 0;
1891}
1892
1893static redisClient *createClient(int fd) {
1894 redisClient *c = zmalloc(sizeof(*c));
1895
1896 anetNonBlock(NULL,fd);
1897 anetTcpNoDelay(NULL,fd);
1898 if (!c) return NULL;
1899 selectDb(c,0);
1900 c->fd = fd;
1901 c->querybuf = sdsempty();
1902 c->argc = 0;
1903 c->argv = NULL;
1904 c->bulklen = -1;
1905 c->multibulk = 0;
1906 c->mbargc = 0;
1907 c->mbargv = NULL;
1908 c->sentlen = 0;
1909 c->flags = 0;
1910 c->lastinteraction = time(NULL);
1911 c->authenticated = 0;
1912 c->replstate = REDIS_REPL_NONE;
1913 c->reply = listCreate();
1914 listSetFreeMethod(c->reply,decrRefCount);
1915 listSetDupMethod(c->reply,dupClientReplyValue);
1916 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1917 readQueryFromClient, c, NULL) == AE_ERR) {
1918 freeClient(c);
1919 return NULL;
1920 }
1921 listAddNodeTail(server.clients,c);
1922 return c;
1923}
1924
1925static void addReply(redisClient *c, robj *obj) {
1926 if (listLength(c->reply) == 0 &&
1927 (c->replstate == REDIS_REPL_NONE ||
1928 c->replstate == REDIS_REPL_ONLINE) &&
1929 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1930 sendReplyToClient, c, NULL) == AE_ERR) return;
1931 if (obj->encoding != REDIS_ENCODING_RAW) {
1932 obj = getDecodedObject(obj);
1933 } else {
1934 incrRefCount(obj);
1935 }
1936 listAddNodeTail(c->reply,obj);
1937}
1938
1939static void addReplySds(redisClient *c, sds s) {
1940 robj *o = createObject(REDIS_STRING,s);
1941 addReply(c,o);
1942 decrRefCount(o);
1943}
1944
1945static void addReplyBulkLen(redisClient *c, robj *obj) {
1946 size_t len;
1947
1948 if (obj->encoding == REDIS_ENCODING_RAW) {
1949 len = sdslen(obj->ptr);
1950 } else {
1951 long n = (long)obj->ptr;
1952
1953 len = 1;
1954 if (n < 0) {
1955 len++;
1956 n = -n;
1957 }
1958 while((n = n/10) != 0) {
1959 len++;
1960 }
1961 }
1962 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1963}
1964
1965static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1966 int cport, cfd;
1967 char cip[128];
1968 redisClient *c;
1969 REDIS_NOTUSED(el);
1970 REDIS_NOTUSED(mask);
1971 REDIS_NOTUSED(privdata);
1972
1973 cfd = anetAccept(server.neterr, fd, cip, &cport);
1974 if (cfd == AE_ERR) {
1975 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1976 return;
1977 }
1978 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1979 if ((c = createClient(cfd)) == NULL) {
1980 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1981 close(cfd); /* May be already closed, just ingore errors */
1982 return;
1983 }
1984 /* If maxclient directive is set and this is one client more... close the
1985 * connection. Note that we create the client instead to check before
1986 * for this condition, since now the socket is already set in nonblocking
1987 * mode and we can send an error for free using the Kernel I/O */
1988 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1989 char *err = "-ERR max number of clients reached\r\n";
1990
1991 /* That's a best effort error message, don't check write errors */
1992 if (write(c->fd,err,strlen(err)) == -1) {
1993 /* Nothing to do, Just to avoid the warning... */
1994 }
1995 freeClient(c);
1996 return;
1997 }
1998 server.stat_numconnections++;
1999}
2000
2001/* ======================= Redis objects implementation ===================== */
2002
2003static robj *createObject(int type, void *ptr) {
2004 robj *o;
2005
2006 if (listLength(server.objfreelist)) {
2007 listNode *head = listFirst(server.objfreelist);
2008 o = listNodeValue(head);
2009 listDelNode(server.objfreelist,head);
2010 } else {
2011 o = zmalloc(sizeof(*o));
2012 }
2013 o->type = type;
2014 o->encoding = REDIS_ENCODING_RAW;
2015 o->ptr = ptr;
2016 o->refcount = 1;
2017 return o;
2018}
2019
2020static robj *createStringObject(char *ptr, size_t len) {
2021 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2022}
2023
2024static robj *createListObject(void) {
2025 list *l = listCreate();
2026
2027 listSetFreeMethod(l,decrRefCount);
2028 return createObject(REDIS_LIST,l);
2029}
2030
2031static robj *createSetObject(void) {
2032 dict *d = dictCreate(&setDictType,NULL);
2033 return createObject(REDIS_SET,d);
2034}
2035
2036static robj *createZsetObject(void) {
2037 zset *zs = zmalloc(sizeof(*zs));
2038
2039 zs->dict = dictCreate(&zsetDictType,NULL);
2040 zs->zsl = zslCreate();
2041 return createObject(REDIS_ZSET,zs);
2042}
2043
2044static void freeStringObject(robj *o) {
2045 if (o->encoding == REDIS_ENCODING_RAW) {
2046 sdsfree(o->ptr);
2047 }
2048}
2049
2050static void freeListObject(robj *o) {
2051 listRelease((list*) o->ptr);
2052}
2053
2054static void freeSetObject(robj *o) {
2055 dictRelease((dict*) o->ptr);
2056}
2057
2058static void freeZsetObject(robj *o) {
2059 zset *zs = o->ptr;
2060
2061 dictRelease(zs->dict);
2062 zslFree(zs->zsl);
2063 zfree(zs);
2064}
2065
2066static void freeHashObject(robj *o) {
2067 dictRelease((dict*) o->ptr);
2068}
2069
2070static void incrRefCount(robj *o) {
2071 o->refcount++;
2072#ifdef DEBUG_REFCOUNT
2073 if (o->type == REDIS_STRING)
2074 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2075#endif
2076}
2077
2078static void decrRefCount(void *obj) {
2079 robj *o = obj;
2080
2081#ifdef DEBUG_REFCOUNT
2082 if (o->type == REDIS_STRING)
2083 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2084#endif
2085 if (--(o->refcount) == 0) {
2086 switch(o->type) {
2087 case REDIS_STRING: freeStringObject(o); break;
2088 case REDIS_LIST: freeListObject(o); break;
2089 case REDIS_SET: freeSetObject(o); break;
2090 case REDIS_ZSET: freeZsetObject(o); break;
2091 case REDIS_HASH: freeHashObject(o); break;
2092 default: assert(0 != 0); break;
2093 }
2094 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2095 !listAddNodeHead(server.objfreelist,o))
2096 zfree(o);
2097 }
2098}
2099
2100static robj *lookupKey(redisDb *db, robj *key) {
2101 dictEntry *de = dictFind(db->dict,key);
2102 return de ? dictGetEntryVal(de) : NULL;
2103}
2104
2105static robj *lookupKeyRead(redisDb *db, robj *key) {
2106 expireIfNeeded(db,key);
2107 return lookupKey(db,key);
2108}
2109
2110static robj *lookupKeyWrite(redisDb *db, robj *key) {
2111 deleteIfVolatile(db,key);
2112 return lookupKey(db,key);
2113}
2114
2115static int deleteKey(redisDb *db, robj *key) {
2116 int retval;
2117
2118 /* We need to protect key from destruction: after the first dictDelete()
2119 * it may happen that 'key' is no longer valid if we don't increment
2120 * it's count. This may happen when we get the object reference directly
2121 * from the hash table with dictRandomKey() or dict iterators */
2122 incrRefCount(key);
2123 if (dictSize(db->expires)) dictDelete(db->expires,key);
2124 retval = dictDelete(db->dict,key);
2125 decrRefCount(key);
2126
2127 return retval == DICT_OK;
2128}
2129
2130/* Try to share an object against the shared objects pool */
2131static robj *tryObjectSharing(robj *o) {
2132 struct dictEntry *de;
2133 unsigned long c;
2134
2135 if (o == NULL || server.shareobjects == 0) return o;
2136
2137 assert(o->type == REDIS_STRING);
2138 de = dictFind(server.sharingpool,o);
2139 if (de) {
2140 robj *shared = dictGetEntryKey(de);
2141
2142 c = ((unsigned long) dictGetEntryVal(de))+1;
2143 dictGetEntryVal(de) = (void*) c;
2144 incrRefCount(shared);
2145 decrRefCount(o);
2146 return shared;
2147 } else {
2148 /* Here we are using a stream algorihtm: Every time an object is
2149 * shared we increment its count, everytime there is a miss we
2150 * recrement the counter of a random object. If this object reaches
2151 * zero we remove the object and put the current object instead. */
2152 if (dictSize(server.sharingpool) >=
2153 server.sharingpoolsize) {
2154 de = dictGetRandomKey(server.sharingpool);
2155 assert(de != NULL);
2156 c = ((unsigned long) dictGetEntryVal(de))-1;
2157 dictGetEntryVal(de) = (void*) c;
2158 if (c == 0) {
2159 dictDelete(server.sharingpool,de->key);
2160 }
2161 } else {
2162 c = 0; /* If the pool is empty we want to add this object */
2163 }
2164 if (c == 0) {
2165 int retval;
2166
2167 retval = dictAdd(server.sharingpool,o,(void*)1);
2168 assert(retval == DICT_OK);
2169 incrRefCount(o);
2170 }
2171 return o;
2172 }
2173}
2174
2175/* Check if the nul-terminated string 's' can be represented by a long
2176 * (that is, is a number that fits into long without any other space or
2177 * character before or after the digits).
2178 *
2179 * If so, the function returns REDIS_OK and *longval is set to the value
2180 * of the number. Otherwise REDIS_ERR is returned */
2181static int isStringRepresentableAsLong(sds s, long *longval) {
2182 char buf[32], *endptr;
2183 long value;
2184 int slen;
2185
2186 value = strtol(s, &endptr, 10);
2187 if (endptr[0] != '\0') return REDIS_ERR;
2188 slen = snprintf(buf,32,"%ld",value);
2189
2190 /* If the number converted back into a string is not identical
2191 * then it's not possible to encode the string as integer */
2192 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2193 if (longval) *longval = value;
2194 return REDIS_OK;
2195}
2196
2197/* Try to encode a string object in order to save space */
2198static int tryObjectEncoding(robj *o) {
2199 long value;
2200 sds s = o->ptr;
2201
2202 if (o->encoding != REDIS_ENCODING_RAW)
2203 return REDIS_ERR; /* Already encoded */
2204
2205 /* It's not save to encode shared objects: shared objects can be shared
2206 * everywhere in the "object space" of Redis. Encoded objects can only
2207 * appear as "values" (and not, for instance, as keys) */
2208 if (o->refcount > 1) return REDIS_ERR;
2209
2210 /* Currently we try to encode only strings */
2211 assert(o->type == REDIS_STRING);
2212
2213 /* Check if we can represent this string as a long integer */
2214 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2215
2216 /* Ok, this object can be encoded */
2217 o->encoding = REDIS_ENCODING_INT;
2218 sdsfree(o->ptr);
2219 o->ptr = (void*) value;
2220 return REDIS_OK;
2221}
2222
2223/* Get a decoded version of an encoded object (returned as a new object) */
2224static robj *getDecodedObject(const robj *o) {
2225 robj *dec;
2226
2227 assert(o->encoding != REDIS_ENCODING_RAW);
2228 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2229 char buf[32];
2230
2231 snprintf(buf,32,"%ld",(long)o->ptr);
2232 dec = createStringObject(buf,strlen(buf));
2233 return dec;
2234 } else {
2235 assert(1 != 1);
2236 }
2237}
2238
2239/* Compare two string objects via strcmp() or alike.
2240 * Note that the objects may be integer-encoded. In such a case we
2241 * use snprintf() to get a string representation of the numbers on the stack
2242 * and compare the strings, it's much faster than calling getDecodedObject(). */
2243static int compareStringObjects(robj *a, robj *b) {
2244 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2245 char bufa[128], bufb[128], *astr, *bstr;
2246 int bothsds = 1;
2247
2248 if (a == b) return 0;
2249 if (a->encoding != REDIS_ENCODING_RAW) {
2250 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2251 astr = bufa;
2252 bothsds = 0;
2253 } else {
2254 astr = a->ptr;
2255 }
2256 if (b->encoding != REDIS_ENCODING_RAW) {
2257 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2258 bstr = bufb;
2259 bothsds = 0;
2260 } else {
2261 bstr = b->ptr;
2262 }
2263 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2264}
2265
2266static size_t stringObjectLen(robj *o) {
2267 assert(o->type == REDIS_STRING);
2268 if (o->encoding == REDIS_ENCODING_RAW) {
2269 return sdslen(o->ptr);
2270 } else {
2271 char buf[32];
2272
2273 return snprintf(buf,32,"%ld",(long)o->ptr);
2274 }
2275}
2276
2277/*============================ DB saving/loading ============================ */
2278
/* Write the one byte 'type' opcode to 'fp'. Returns 0 on success, -1 on
 * write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return fwrite(&type,1,1,fp) == 0 ? -1 : 0;
}
2283
/* Write 't' to 'fp' as a 32 bit signed integer in host byte order.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    return fwrite(&stamp,4,1,fp) == 0 ? -1 : 0;
}
2289
2290/* check rdbLoadLen() comments for more info */
2291static int rdbSaveLen(FILE *fp, uint32_t len) {
2292 unsigned char buf[2];
2293
2294 if (len < (1<<6)) {
2295 /* Save a 6 bit len */
2296 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2297 if (fwrite(buf,1,1,fp) == 0) return -1;
2298 } else if (len < (1<<14)) {
2299 /* Save a 14 bit len */
2300 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2301 buf[1] = len&0xFF;
2302 if (fwrite(buf,2,1,fp) == 0) return -1;
2303 } else {
2304 /* Save a 32 bit len */
2305 buf[0] = (REDIS_RDB_32BITLEN<<6);
2306 if (fwrite(buf,1,1,fp) == 0) return -1;
2307 len = htonl(len);
2308 if (fwrite(&len,4,1,fp) == 0) return -1;
2309 }
2310 return 0;
2311}
2312
2313/* String objects in the form "2391" "-100" without any space and with a
2314 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2315 * encoded as integers to save space */
2316static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2317 long long value;
2318 char *endptr, buf[32];
2319
2320 /* Check if it's possible to encode this value as a number */
2321 value = strtoll(s, &endptr, 10);
2322 if (endptr[0] != '\0') return 0;
2323 snprintf(buf,32,"%lld",value);
2324
2325 /* If the number converted back into a string is not identical
2326 * then it's not possible to encode the string as integer */
2327 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2328
2329 /* Finally check if it fits in our ranges */
2330 if (value >= -(1<<7) && value <= (1<<7)-1) {
2331 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2332 enc[1] = value&0xFF;
2333 return 2;
2334 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2335 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2336 enc[1] = value&0xFF;
2337 enc[2] = (value>>8)&0xFF;
2338 return 3;
2339 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2340 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2341 enc[1] = value&0xFF;
2342 enc[2] = (value>>8)&0xFF;
2343 enc[3] = (value>>16)&0xFF;
2344 enc[4] = (value>>24)&0xFF;
2345 return 5;
2346 } else {
2347 return 0;
2348 }
2349}
2350
2351static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2352 unsigned int comprlen, outlen;
2353 unsigned char byte;
2354 void *out;
2355
2356 /* We require at least four bytes compression for this to be worth it */
2357 outlen = sdslen(obj->ptr)-4;
2358 if (outlen <= 0) return 0;
2359 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2360 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2361 if (comprlen == 0) {
2362 zfree(out);
2363 return 0;
2364 }
2365 /* Data compressed! Let's save it on disk */
2366 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2367 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2368 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2369 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2370 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2371 zfree(out);
2372 return comprlen;
2373
2374writeerr:
2375 zfree(out);
2376 return -1;
2377}
2378
2379/* Save a string objet as [len][data] on disk. If the object is a string
2380 * representation of an integer value we try to safe it in a special form */
2381static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2382 size_t len;
2383 int enclen;
2384
2385 len = sdslen(obj->ptr);
2386
2387 /* Try integer encoding */
2388 if (len <= 11) {
2389 unsigned char buf[5];
2390 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2391 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2392 return 0;
2393 }
2394 }
2395
2396 /* Try LZF compression - under 20 bytes it's unable to compress even
2397 * aaaaaaaaaaaaaaaaaa so skip it */
2398 if (len > 20) {
2399 int retval;
2400
2401 retval = rdbSaveLzfStringObject(fp,obj);
2402 if (retval == -1) return -1;
2403 if (retval > 0) return 0;
2404 /* retval == 0 means data can't be compressed, save the old way */
2405 }
2406
2407 /* Store verbatim */
2408 if (rdbSaveLen(fp,len) == -1) return -1;
2409 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2410 return 0;
2411}
2412
2413/* Like rdbSaveStringObjectRaw() but handle encoded objects */
2414static int rdbSaveStringObject(FILE *fp, robj *obj) {
2415 int retval;
2416 robj *dec;
2417
2418 if (obj->encoding != REDIS_ENCODING_RAW) {
2419 dec = getDecodedObject(obj);
2420 retval = rdbSaveStringObjectRaw(fp,dec);
2421 decrRefCount(dec);
2422 return retval;
2423 } else {
2424 return rdbSaveStringObjectRaw(fp,obj);
2425 }
2426}
2427
/* Save a double value. Doubles are saved as strings prefixed by an unsigned
 * 8 bit integer specifying the length of the representation.
 * This 8 bit integer has special values in order to specify the following
 * conditions:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 *
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len;

    if (isnan(val)) {
        buf[0] = 253;
        len = 1;
    } else if (!isfinite(val)) {
        len = 1;
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
        /* The representation starts at buf+1. Measuring from buf would
         * include the length byte itself, which is not even initialized
         * yet at this point. */
        buf[0] = strlen((char*)buf+1);
        len = buf[0]+1;
    }
    if (fwrite(buf,len,1,fp) == 0) return -1;
    return 0;
}
2454
2455/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2456static int rdbSave(char *filename) {
2457 dictIterator *di = NULL;
2458 dictEntry *de;
2459 FILE *fp;
2460 char tmpfile[256];
2461 int j;
2462 time_t now = time(NULL);
2463
2464 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2465 fp = fopen(tmpfile,"w");
2466 if (!fp) {
2467 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2468 return REDIS_ERR;
2469 }
2470 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2471 for (j = 0; j < server.dbnum; j++) {
2472 redisDb *db = server.db+j;
2473 dict *d = db->dict;
2474 if (dictSize(d) == 0) continue;
2475 di = dictGetIterator(d);
2476 if (!di) {
2477 fclose(fp);
2478 return REDIS_ERR;
2479 }
2480
2481 /* Write the SELECT DB opcode */
2482 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2483 if (rdbSaveLen(fp,j) == -1) goto werr;
2484
2485 /* Iterate this DB writing every entry */
2486 while((de = dictNext(di)) != NULL) {
2487 robj *key = dictGetEntryKey(de);
2488 robj *o = dictGetEntryVal(de);
2489 time_t expiretime = getExpire(db,key);
2490
2491 /* Save the expire time */
2492 if (expiretime != -1) {
2493 /* If this key is already expired skip it */
2494 if (expiretime < now) continue;
2495 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2496 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2497 }
2498 /* Save the key and associated value */
2499 if (rdbSaveType(fp,o->type) == -1) goto werr;
2500 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2501 if (o->type == REDIS_STRING) {
2502 /* Save a string value */
2503 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2504 } else if (o->type == REDIS_LIST) {
2505 /* Save a list value */
2506 list *list = o->ptr;
2507 listNode *ln;
2508
2509 listRewind(list);
2510 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2511 while((ln = listYield(list))) {
2512 robj *eleobj = listNodeValue(ln);
2513
2514 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2515 }
2516 } else if (o->type == REDIS_SET) {
2517 /* Save a set value */
2518 dict *set = o->ptr;
2519 dictIterator *di = dictGetIterator(set);
2520 dictEntry *de;
2521
2522 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2523 while((de = dictNext(di)) != NULL) {
2524 robj *eleobj = dictGetEntryKey(de);
2525
2526 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2527 }
2528 dictReleaseIterator(di);
2529 } else if (o->type == REDIS_ZSET) {
2530 /* Save a set value */
2531 zset *zs = o->ptr;
2532 dictIterator *di = dictGetIterator(zs->dict);
2533 dictEntry *de;
2534
2535 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2536 while((de = dictNext(di)) != NULL) {
2537 robj *eleobj = dictGetEntryKey(de);
2538 double *score = dictGetEntryVal(de);
2539
2540 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2541 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2542 }
2543 dictReleaseIterator(di);
2544 } else {
2545 assert(0 != 0);
2546 }
2547 }
2548 dictReleaseIterator(di);
2549 }
2550 /* EOF opcode */
2551 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2552
2553 /* Make sure data will not remain on the OS's output buffers */
2554 fflush(fp);
2555 fsync(fileno(fp));
2556 fclose(fp);
2557
2558 /* Use RENAME to make sure the DB file is changed atomically only
2559 * if the generate DB file is ok. */
2560 if (rename(tmpfile,filename) == -1) {
2561 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2562 unlink(tmpfile);
2563 return REDIS_ERR;
2564 }
2565 redisLog(REDIS_NOTICE,"DB saved on disk");
2566 server.dirty = 0;
2567 server.lastsave = time(NULL);
2568 return REDIS_OK;
2569
2570werr:
2571 fclose(fp);
2572 unlink(tmpfile);
2573 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2574 if (di) dictReleaseIterator(di);
2575 return REDIS_ERR;
2576}
2577
2578static int rdbSaveBackground(char *filename) {
2579 pid_t childpid;
2580
2581 if (server.bgsaveinprogress) return REDIS_ERR;
2582 if ((childpid = fork()) == 0) {
2583 /* Child */
2584 close(server.fd);
2585 if (rdbSave(filename) == REDIS_OK) {
2586 exit(0);
2587 } else {
2588 exit(1);
2589 }
2590 } else {
2591 /* Parent */
2592 if (childpid == -1) {
2593 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2594 strerror(errno));
2595 return REDIS_ERR;
2596 }
2597 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2598 server.bgsaveinprogress = 1;
2599 server.bgsavechildpid = childpid;
2600 return REDIS_OK;
2601 }
2602 return REDIS_OK; /* unreached */
2603}
2604
/* Remove the temp file "temp-<pid>.rdb" named after process 'childpid'. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb", (int) childpid);
    unlink(path);
}
2611
/* Read a one byte type opcode from 'fp'. Returns the byte value (0-255),
 * or -1 if it can't be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char opcode;

    return fread(&opcode,1,1,fp) == 0 ? -1 : opcode;
}
2617
/* Read a time saved by rdbSaveTime(): a 32 bit signed integer in host byte
 * order. Returns -1 if it can't be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    return fread(&stamp,4,1,fp) == 0 ? -1 : (time_t) stamp;
}
2623
2624/* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2625 * of this file for a description of how this are stored on disk.
2626 *
2627 * isencoded is set to 1 if the readed length is not actually a length but
2628 * an "encoding type", check the above comments for more info */
2629static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2630 unsigned char buf[2];
2631 uint32_t len;
2632
2633 if (isencoded) *isencoded = 0;
2634 if (rdbver == 0) {
2635 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2636 return ntohl(len);
2637 } else {
2638 int type;
2639
2640 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2641 type = (buf[0]&0xC0)>>6;
2642 if (type == REDIS_RDB_6BITLEN) {
2643 /* Read a 6 bit len */
2644 return buf[0]&0x3F;
2645 } else if (type == REDIS_RDB_ENCVAL) {
2646 /* Read a 6 bit len encoding type */
2647 if (isencoded) *isencoded = 1;
2648 return buf[0]&0x3F;
2649 } else if (type == REDIS_RDB_14BITLEN) {
2650 /* Read a 14 bit len */
2651 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2652 return ((buf[0]&0x3F)<<8)|buf[1];
2653 } else {
2654 /* Read a 32 bit len */
2655 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2656 return ntohl(len);
2657 }
2658 }
2659}
2660
2661static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2662 unsigned char enc[4];
2663 long long val;
2664
2665 if (enctype == REDIS_RDB_ENC_INT8) {
2666 if (fread(enc,1,1,fp) == 0) return NULL;
2667 val = (signed char)enc[0];
2668 } else if (enctype == REDIS_RDB_ENC_INT16) {
2669 uint16_t v;
2670 if (fread(enc,2,1,fp) == 0) return NULL;
2671 v = enc[0]|(enc[1]<<8);
2672 val = (int16_t)v;
2673 } else if (enctype == REDIS_RDB_ENC_INT32) {
2674 uint32_t v;
2675 if (fread(enc,4,1,fp) == 0) return NULL;
2676 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2677 val = (int32_t)v;
2678 } else {
2679 val = 0; /* anti-warning */
2680 assert(0!=0);
2681 }
2682 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2683}
2684
2685static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2686 unsigned int len, clen;
2687 unsigned char *c = NULL;
2688 sds val = NULL;
2689
2690 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2691 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2692 if ((c = zmalloc(clen)) == NULL) goto err;
2693 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2694 if (fread(c,clen,1,fp) == 0) goto err;
2695 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2696 zfree(c);
2697 return createObject(REDIS_STRING,val);
2698err:
2699 zfree(c);
2700 sdsfree(val);
2701 return NULL;
2702}
2703
2704static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2705 int isencoded;
2706 uint32_t len;
2707 sds val;
2708
2709 len = rdbLoadLen(fp,rdbver,&isencoded);
2710 if (isencoded) {
2711 switch(len) {
2712 case REDIS_RDB_ENC_INT8:
2713 case REDIS_RDB_ENC_INT16:
2714 case REDIS_RDB_ENC_INT32:
2715 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2716 case REDIS_RDB_ENC_LZF:
2717 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2718 default:
2719 assert(0!=0);
2720 }
2721 }
2722
2723 if (len == REDIS_RDB_LENERR) return NULL;
2724 val = sdsnewlen(NULL,len);
2725 if (len && fread(val,len,1,fp) == 0) {
2726 sdsfree(val);
2727 return NULL;
2728 }
2729 return tryObjectSharing(createObject(REDIS_STRING,val));
2730}
2731
2732/* For information about double serialization check rdbSaveDoubleValue() */
2733static int rdbLoadDoubleValue(FILE *fp, double *val) {
2734 char buf[128];
2735 unsigned char len;
2736
2737 if (fread(&len,1,1,fp) == 0) return -1;
2738 switch(len) {
2739 case 255: *val = R_NegInf; return 0;
2740 case 254: *val = R_PosInf; return 0;
2741 case 253: *val = R_Nan; return 0;
2742 default:
2743 if (fread(buf,len,1,fp) == 0) return -1;
2744 sscanf(buf, "%lg", val);
2745 return 0;
2746 }
2747}
2748
2749static int rdbLoad(char *filename) {
2750 FILE *fp;
2751 robj *keyobj = NULL;
2752 uint32_t dbid;
2753 int type, retval, rdbver;
2754 dict *d = server.db[0].dict;
2755 redisDb *db = server.db+0;
2756 char buf[1024];
2757 time_t expiretime = -1, now = time(NULL);
2758
2759 fp = fopen(filename,"r");
2760 if (!fp) return REDIS_ERR;
2761 if (fread(buf,9,1,fp) == 0) goto eoferr;
2762 buf[9] = '\0';
2763 if (memcmp(buf,"REDIS",5) != 0) {
2764 fclose(fp);
2765 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2766 return REDIS_ERR;
2767 }
2768 rdbver = atoi(buf+5);
2769 if (rdbver > 1) {
2770 fclose(fp);
2771 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2772 return REDIS_ERR;
2773 }
2774 while(1) {
2775 robj *o;
2776
2777 /* Read type. */
2778 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2779 if (type == REDIS_EXPIRETIME) {
2780 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2781 /* We read the time so we need to read the object type again */
2782 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2783 }
2784 if (type == REDIS_EOF) break;
2785 /* Handle SELECT DB opcode as a special case */
2786 if (type == REDIS_SELECTDB) {
2787 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2788 goto eoferr;
2789 if (dbid >= (unsigned)server.dbnum) {
2790 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2791 exit(1);
2792 }
2793 db = server.db+dbid;
2794 d = db->dict;
2795 continue;
2796 }
2797 /* Read key */
2798 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2799
2800 if (type == REDIS_STRING) {
2801 /* Read string value */
2802 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2803 tryObjectEncoding(o);
2804 } else if (type == REDIS_LIST || type == REDIS_SET) {
2805 /* Read list/set value */
2806 uint32_t listlen;
2807
2808 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2809 goto eoferr;
2810 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2811 /* Load every single element of the list/set */
2812 while(listlen--) {
2813 robj *ele;
2814
2815 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2816 tryObjectEncoding(ele);
2817 if (type == REDIS_LIST) {
2818 listAddNodeTail((list*)o->ptr,ele);
2819 } else {
2820 dictAdd((dict*)o->ptr,ele,NULL);
2821 }
2822 }
2823 } else if (type == REDIS_ZSET) {
2824 /* Read list/set value */
2825 uint32_t zsetlen;
2826 zset *zs;
2827
2828 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2829 goto eoferr;
2830 o = createZsetObject();
2831 zs = o->ptr;
2832 /* Load every single element of the list/set */
2833 while(zsetlen--) {
2834 robj *ele;
2835 double *score = zmalloc(sizeof(double));
2836
2837 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2838 tryObjectEncoding(ele);
2839 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2840 dictAdd(zs->dict,ele,score);
2841 zslInsert(zs->zsl,*score,ele);
2842 incrRefCount(ele); /* added to skiplist */
2843 }
2844 } else {
2845 assert(0 != 0);
2846 }
2847 /* Add the new object in the hash table */
2848 retval = dictAdd(d,keyobj,o);
2849 if (retval == DICT_ERR) {
2850 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2851 exit(1);
2852 }
2853 /* Set the expire time if needed */
2854 if (expiretime != -1) {
2855 setExpire(db,keyobj,expiretime);
2856 /* Delete this key if already expired */
2857 if (expiretime < now) deleteKey(db,keyobj);
2858 expiretime = -1;
2859 }
2860 keyobj = o = NULL;
2861 }
2862 fclose(fp);
2863 return REDIS_OK;
2864
2865eoferr: /* unexpected end of file is handled here with a fatal exit */
2866 if (keyobj) decrRefCount(keyobj);
2867 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2868 exit(1);
2869 return REDIS_ERR; /* Just to avoid warning */
2870}
2871
2872/*================================== Commands =============================== */
2873
2874static void authCommand(redisClient *c) {
2875 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2876 c->authenticated = 1;
2877 addReply(c,shared.ok);
2878 } else {
2879 c->authenticated = 0;
2880 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2881 }
2882}
2883
2884static void pingCommand(redisClient *c) {
2885 addReply(c,shared.pong);
2886}
2887
2888static void echoCommand(redisClient *c) {
2889 addReplyBulkLen(c,c->argv[1]);
2890 addReply(c,c->argv[1]);
2891 addReply(c,shared.crlf);
2892}
2893
2894/*=================================== Strings =============================== */
2895
2896static void setGenericCommand(redisClient *c, int nx) {
2897 int retval;
2898
2899 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2900 if (retval == DICT_ERR) {
2901 if (!nx) {
2902 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2903 incrRefCount(c->argv[2]);
2904 } else {
2905 addReply(c,shared.czero);
2906 return;
2907 }
2908 } else {
2909 incrRefCount(c->argv[1]);
2910 incrRefCount(c->argv[2]);
2911 }
2912 server.dirty++;
2913 removeExpire(c->db,c->argv[1]);
2914 addReply(c, nx ? shared.cone : shared.ok);
2915}
2916
2917static void setCommand(redisClient *c) {
2918 setGenericCommand(c,0);
2919}
2920
2921static void setnxCommand(redisClient *c) {
2922 setGenericCommand(c,1);
2923}
2924
2925static void getCommand(redisClient *c) {
2926 robj *o = lookupKeyRead(c->db,c->argv[1]);
2927
2928 if (o == NULL) {
2929 addReply(c,shared.nullbulk);
2930 } else {
2931 if (o->type != REDIS_STRING) {
2932 addReply(c,shared.wrongtypeerr);
2933 } else {
2934 addReplyBulkLen(c,o);
2935 addReply(c,o);
2936 addReply(c,shared.crlf);
2937 }
2938 }
2939}
2940
2941static void getsetCommand(redisClient *c) {
2942 getCommand(c);
2943 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2944 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2945 } else {
2946 incrRefCount(c->argv[1]);
2947 }
2948 incrRefCount(c->argv[2]);
2949 server.dirty++;
2950 removeExpire(c->db,c->argv[1]);
2951}
2952
2953static void mgetCommand(redisClient *c) {
2954 int j;
2955
2956 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2957 for (j = 1; j < c->argc; j++) {
2958 robj *o = lookupKeyRead(c->db,c->argv[j]);
2959 if (o == NULL) {
2960 addReply(c,shared.nullbulk);
2961 } else {
2962 if (o->type != REDIS_STRING) {
2963 addReply(c,shared.nullbulk);
2964 } else {
2965 addReplyBulkLen(c,o);
2966 addReply(c,o);
2967 addReply(c,shared.crlf);
2968 }
2969 }
2970 }
2971}
2972
2973static void incrDecrCommand(redisClient *c, long long incr) {
2974 long long value;
2975 int retval;
2976 robj *o;
2977
2978 o = lookupKeyWrite(c->db,c->argv[1]);
2979 if (o == NULL) {
2980 value = 0;
2981 } else {
2982 if (o->type != REDIS_STRING) {
2983 value = 0;
2984 } else {
2985 char *eptr;
2986
2987 if (o->encoding == REDIS_ENCODING_RAW)
2988 value = strtoll(o->ptr, &eptr, 10);
2989 else if (o->encoding == REDIS_ENCODING_INT)
2990 value = (long)o->ptr;
2991 else
2992 assert(1 != 1);
2993 }
2994 }
2995
2996 value += incr;
2997 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2998 tryObjectEncoding(o);
2999 retval = dictAdd(c->db->dict,c->argv[1],o);
3000 if (retval == DICT_ERR) {
3001 dictReplace(c->db->dict,c->argv[1],o);
3002 removeExpire(c->db,c->argv[1]);
3003 } else {
3004 incrRefCount(c->argv[1]);
3005 }
3006 server.dirty++;
3007 addReply(c,shared.colon);
3008 addReply(c,o);
3009 addReply(c,shared.crlf);
3010}
3011
3012static void incrCommand(redisClient *c) {
3013 incrDecrCommand(c,1);
3014}
3015
3016static void decrCommand(redisClient *c) {
3017 incrDecrCommand(c,-1);
3018}
3019
3020static void incrbyCommand(redisClient *c) {
3021 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3022 incrDecrCommand(c,incr);
3023}
3024
3025static void decrbyCommand(redisClient *c) {
3026 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3027 incrDecrCommand(c,-incr);
3028}
3029
3030/* ========================= Type agnostic commands ========================= */
3031
3032static void delCommand(redisClient *c) {
3033 int deleted = 0, j;
3034
3035 for (j = 1; j < c->argc; j++) {
3036 if (deleteKey(c->db,c->argv[j])) {
3037 server.dirty++;
3038 deleted++;
3039 }
3040 }
3041 switch(deleted) {
3042 case 0:
3043 addReply(c,shared.czero);
3044 break;
3045 case 1:
3046 addReply(c,shared.cone);
3047 break;
3048 default:
3049 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3050 break;
3051 }
3052}
3053
3054static void existsCommand(redisClient *c) {
3055 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3056}
3057
3058static void selectCommand(redisClient *c) {
3059 int id = atoi(c->argv[1]->ptr);
3060
3061 if (selectDb(c,id) == REDIS_ERR) {
3062 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3063 } else {
3064 addReply(c,shared.ok);
3065 }
3066}
3067
3068static void randomkeyCommand(redisClient *c) {
3069 dictEntry *de;
3070
3071 while(1) {
3072 de = dictGetRandomKey(c->db->dict);
3073 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3074 }
3075 if (de == NULL) {
3076 addReply(c,shared.plus);
3077 addReply(c,shared.crlf);
3078 } else {
3079 addReply(c,shared.plus);
3080 addReply(c,dictGetEntryKey(de));
3081 addReply(c,shared.crlf);
3082 }
3083}
3084
3085static void keysCommand(redisClient *c) {
3086 dictIterator *di;
3087 dictEntry *de;
3088 sds pattern = c->argv[1]->ptr;
3089 int plen = sdslen(pattern);
3090 int numkeys = 0, keyslen = 0;
3091 robj *lenobj = createObject(REDIS_STRING,NULL);
3092
3093 di = dictGetIterator(c->db->dict);
3094 addReply(c,lenobj);
3095 decrRefCount(lenobj);
3096 while((de = dictNext(di)) != NULL) {
3097 robj *keyobj = dictGetEntryKey(de);
3098
3099 sds key = keyobj->ptr;
3100 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3101 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3102 if (expireIfNeeded(c->db,keyobj) == 0) {
3103 if (numkeys != 0)
3104 addReply(c,shared.space);
3105 addReply(c,keyobj);
3106 numkeys++;
3107 keyslen += sdslen(key);
3108 }
3109 }
3110 }
3111 dictReleaseIterator(di);
3112 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3113 addReply(c,shared.crlf);
3114}
3115
3116static void dbsizeCommand(redisClient *c) {
3117 addReplySds(c,
3118 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3119}
3120
3121static void lastsaveCommand(redisClient *c) {
3122 addReplySds(c,
3123 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3124}
3125
3126static void typeCommand(redisClient *c) {
3127 robj *o;
3128 char *type;
3129
3130 o = lookupKeyRead(c->db,c->argv[1]);
3131 if (o == NULL) {
3132 type = "+none";
3133 } else {
3134 switch(o->type) {
3135 case REDIS_STRING: type = "+string"; break;
3136 case REDIS_LIST: type = "+list"; break;
3137 case REDIS_SET: type = "+set"; break;
3138 case REDIS_ZSET: type = "+zset"; break;
3139 default: type = "unknown"; break;
3140 }
3141 }
3142 addReplySds(c,sdsnew(type));
3143 addReply(c,shared.crlf);
3144}
3145
3146static void saveCommand(redisClient *c) {
3147 if (server.bgsaveinprogress) {
3148 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3149 return;
3150 }
3151 if (rdbSave(server.dbfilename) == REDIS_OK) {
3152 addReply(c,shared.ok);
3153 } else {
3154 addReply(c,shared.err);
3155 }
3156}
3157
3158static void bgsaveCommand(redisClient *c) {
3159 if (server.bgsaveinprogress) {
3160 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3161 return;
3162 }
3163 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3164 addReply(c,shared.ok);
3165 } else {
3166 addReply(c,shared.err);
3167 }
3168}
3169
3170static void shutdownCommand(redisClient *c) {
3171 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3172 /* Kill the saving child if there is a background saving in progress.
3173 We want to avoid race conditions, for instance our saving child may
3174 overwrite the synchronous saving did by SHUTDOWN. */
3175 if (server.bgsaveinprogress) {
3176 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3177 kill(server.bgsavechildpid,SIGKILL);
3178 rdbRemoveTempFile(server.bgsavechildpid);
3179 }
3180 /* SYNC SAVE */
3181 if (rdbSave(server.dbfilename) == REDIS_OK) {
3182 if (server.daemonize)
3183 unlink(server.pidfile);
3184 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3185 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3186 exit(1);
3187 } else {
3188 /* Ooops.. error saving! The best we can do is to continue operating.
3189 * Note that if there was a background saving process, in the next
3190 * cron() Redis will be notified that the background saving aborted,
3191 * handling special stuff like slaves pending for synchronization... */
3192 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3193 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3194 }
3195}
3196
3197static void renameGenericCommand(redisClient *c, int nx) {
3198 robj *o;
3199
3200 /* To use the same key as src and dst is probably an error */
3201 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3202 addReply(c,shared.sameobjecterr);
3203 return;
3204 }
3205
3206 o = lookupKeyWrite(c->db,c->argv[1]);
3207 if (o == NULL) {
3208 addReply(c,shared.nokeyerr);
3209 return;
3210 }
3211 incrRefCount(o);
3212 deleteIfVolatile(c->db,c->argv[2]);
3213 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3214 if (nx) {
3215 decrRefCount(o);
3216 addReply(c,shared.czero);
3217 return;
3218 }
3219 dictReplace(c->db->dict,c->argv[2],o);
3220 } else {
3221 incrRefCount(c->argv[2]);
3222 }
3223 deleteKey(c->db,c->argv[1]);
3224 server.dirty++;
3225 addReply(c,nx ? shared.cone : shared.ok);
3226}
3227
3228static void renameCommand(redisClient *c) {
3229 renameGenericCommand(c,0);
3230}
3231
3232static void renamenxCommand(redisClient *c) {
3233 renameGenericCommand(c,1);
3234}
3235
3236static void moveCommand(redisClient *c) {
3237 robj *o;
3238 redisDb *src, *dst;
3239 int srcid;
3240
3241 /* Obtain source and target DB pointers */
3242 src = c->db;
3243 srcid = c->db->id;
3244 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3245 addReply(c,shared.outofrangeerr);
3246 return;
3247 }
3248 dst = c->db;
3249 selectDb(c,srcid); /* Back to the source DB */
3250
3251 /* If the user is moving using as target the same
3252 * DB as the source DB it is probably an error. */
3253 if (src == dst) {
3254 addReply(c,shared.sameobjecterr);
3255 return;
3256 }
3257
3258 /* Check if the element exists and get a reference */
3259 o = lookupKeyWrite(c->db,c->argv[1]);
3260 if (!o) {
3261 addReply(c,shared.czero);
3262 return;
3263 }
3264
3265 /* Try to add the element to the target DB */
3266 deleteIfVolatile(dst,c->argv[1]);
3267 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3268 addReply(c,shared.czero);
3269 return;
3270 }
3271 incrRefCount(c->argv[1]);
3272 incrRefCount(o);
3273
3274 /* OK! key moved, free the entry in the source DB */
3275 deleteKey(src,c->argv[1]);
3276 server.dirty++;
3277 addReply(c,shared.cone);
3278}
3279
3280/* =================================== Lists ================================ */
3281static void pushGenericCommand(redisClient *c, int where) {
3282 robj *lobj;
3283 list *list;
3284
3285 lobj = lookupKeyWrite(c->db,c->argv[1]);
3286 if (lobj == NULL) {
3287 lobj = createListObject();
3288 list = lobj->ptr;
3289 if (where == REDIS_HEAD) {
3290 listAddNodeHead(list,c->argv[2]);
3291 } else {
3292 listAddNodeTail(list,c->argv[2]);
3293 }
3294 dictAdd(c->db->dict,c->argv[1],lobj);
3295 incrRefCount(c->argv[1]);
3296 incrRefCount(c->argv[2]);
3297 } else {
3298 if (lobj->type != REDIS_LIST) {
3299 addReply(c,shared.wrongtypeerr);
3300 return;
3301 }
3302 list = lobj->ptr;
3303 if (where == REDIS_HEAD) {
3304 listAddNodeHead(list,c->argv[2]);
3305 } else {
3306 listAddNodeTail(list,c->argv[2]);
3307 }
3308 incrRefCount(c->argv[2]);
3309 }
3310 server.dirty++;
3311 addReply(c,shared.ok);
3312}
3313
3314static void lpushCommand(redisClient *c) {
3315 pushGenericCommand(c,REDIS_HEAD);
3316}
3317
3318static void rpushCommand(redisClient *c) {
3319 pushGenericCommand(c,REDIS_TAIL);
3320}
3321
3322static void llenCommand(redisClient *c) {
3323 robj *o;
3324 list *l;
3325
3326 o = lookupKeyRead(c->db,c->argv[1]);
3327 if (o == NULL) {
3328 addReply(c,shared.czero);
3329 return;
3330 } else {
3331 if (o->type != REDIS_LIST) {
3332 addReply(c,shared.wrongtypeerr);
3333 } else {
3334 l = o->ptr;
3335 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3336 }
3337 }
3338}
3339
3340static void lindexCommand(redisClient *c) {
3341 robj *o;
3342 int index = atoi(c->argv[2]->ptr);
3343
3344 o = lookupKeyRead(c->db,c->argv[1]);
3345 if (o == NULL) {
3346 addReply(c,shared.nullbulk);
3347 } else {
3348 if (o->type != REDIS_LIST) {
3349 addReply(c,shared.wrongtypeerr);
3350 } else {
3351 list *list = o->ptr;
3352 listNode *ln;
3353
3354 ln = listIndex(list, index);
3355 if (ln == NULL) {
3356 addReply(c,shared.nullbulk);
3357 } else {
3358 robj *ele = listNodeValue(ln);
3359 addReplyBulkLen(c,ele);
3360 addReply(c,ele);
3361 addReply(c,shared.crlf);
3362 }
3363 }
3364 }
3365}
3366
3367static void lsetCommand(redisClient *c) {
3368 robj *o;
3369 int index = atoi(c->argv[2]->ptr);
3370
3371 o = lookupKeyWrite(c->db,c->argv[1]);
3372 if (o == NULL) {
3373 addReply(c,shared.nokeyerr);
3374 } else {
3375 if (o->type != REDIS_LIST) {
3376 addReply(c,shared.wrongtypeerr);
3377 } else {
3378 list *list = o->ptr;
3379 listNode *ln;
3380
3381 ln = listIndex(list, index);
3382 if (ln == NULL) {
3383 addReply(c,shared.outofrangeerr);
3384 } else {
3385 robj *ele = listNodeValue(ln);
3386
3387 decrRefCount(ele);
3388 listNodeValue(ln) = c->argv[3];
3389 incrRefCount(c->argv[3]);
3390 addReply(c,shared.ok);
3391 server.dirty++;
3392 }
3393 }
3394 }
3395}
3396
3397static void popGenericCommand(redisClient *c, int where) {
3398 robj *o;
3399
3400 o = lookupKeyWrite(c->db,c->argv[1]);
3401 if (o == NULL) {
3402 addReply(c,shared.nullbulk);
3403 } else {
3404 if (o->type != REDIS_LIST) {
3405 addReply(c,shared.wrongtypeerr);
3406 } else {
3407 list *list = o->ptr;
3408 listNode *ln;
3409
3410 if (where == REDIS_HEAD)
3411 ln = listFirst(list);
3412 else
3413 ln = listLast(list);
3414
3415 if (ln == NULL) {
3416 addReply(c,shared.nullbulk);
3417 } else {
3418 robj *ele = listNodeValue(ln);
3419 addReplyBulkLen(c,ele);
3420 addReply(c,ele);
3421 addReply(c,shared.crlf);
3422 listDelNode(list,ln);
3423 server.dirty++;
3424 }
3425 }
3426 }
3427}
3428
3429static void lpopCommand(redisClient *c) {
3430 popGenericCommand(c,REDIS_HEAD);
3431}
3432
3433static void rpopCommand(redisClient *c) {
3434 popGenericCommand(c,REDIS_TAIL);
3435}
3436
3437static void lrangeCommand(redisClient *c) {
3438 robj *o;
3439 int start = atoi(c->argv[2]->ptr);
3440 int end = atoi(c->argv[3]->ptr);
3441
3442 o = lookupKeyRead(c->db,c->argv[1]);
3443 if (o == NULL) {
3444 addReply(c,shared.nullmultibulk);
3445 } else {
3446 if (o->type != REDIS_LIST) {
3447 addReply(c,shared.wrongtypeerr);
3448 } else {
3449 list *list = o->ptr;
3450 listNode *ln;
3451 int llen = listLength(list);
3452 int rangelen, j;
3453 robj *ele;
3454
3455 /* convert negative indexes */
3456 if (start < 0) start = llen+start;
3457 if (end < 0) end = llen+end;
3458 if (start < 0) start = 0;
3459 if (end < 0) end = 0;
3460
3461 /* indexes sanity checks */
3462 if (start > end || start >= llen) {
3463 /* Out of range start or start > end result in empty list */
3464 addReply(c,shared.emptymultibulk);
3465 return;
3466 }
3467 if (end >= llen) end = llen-1;
3468 rangelen = (end-start)+1;
3469
3470 /* Return the result in form of a multi-bulk reply */
3471 ln = listIndex(list, start);
3472 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3473 for (j = 0; j < rangelen; j++) {
3474 ele = listNodeValue(ln);
3475 addReplyBulkLen(c,ele);
3476 addReply(c,ele);
3477 addReply(c,shared.crlf);
3478 ln = ln->next;
3479 }
3480 }
3481 }
3482}
3483
3484static void ltrimCommand(redisClient *c) {
3485 robj *o;
3486 int start = atoi(c->argv[2]->ptr);
3487 int end = atoi(c->argv[3]->ptr);
3488
3489 o = lookupKeyWrite(c->db,c->argv[1]);
3490 if (o == NULL) {
3491 addReply(c,shared.nokeyerr);
3492 } else {
3493 if (o->type != REDIS_LIST) {
3494 addReply(c,shared.wrongtypeerr);
3495 } else {
3496 list *list = o->ptr;
3497 listNode *ln;
3498 int llen = listLength(list);
3499 int j, ltrim, rtrim;
3500
3501 /* convert negative indexes */
3502 if (start < 0) start = llen+start;
3503 if (end < 0) end = llen+end;
3504 if (start < 0) start = 0;
3505 if (end < 0) end = 0;
3506
3507 /* indexes sanity checks */
3508 if (start > end || start >= llen) {
3509 /* Out of range start or start > end result in empty list */
3510 ltrim = llen;
3511 rtrim = 0;
3512 } else {
3513 if (end >= llen) end = llen-1;
3514 ltrim = start;
3515 rtrim = llen-end-1;
3516 }
3517
3518 /* Remove list elements to perform the trim */
3519 for (j = 0; j < ltrim; j++) {
3520 ln = listFirst(list);
3521 listDelNode(list,ln);
3522 }
3523 for (j = 0; j < rtrim; j++) {
3524 ln = listLast(list);
3525 listDelNode(list,ln);
3526 }
3527 server.dirty++;
3528 addReply(c,shared.ok);
3529 }
3530 }
3531}
3532
3533static void lremCommand(redisClient *c) {
3534 robj *o;
3535
3536 o = lookupKeyWrite(c->db,c->argv[1]);
3537 if (o == NULL) {
3538 addReply(c,shared.czero);
3539 } else {
3540 if (o->type != REDIS_LIST) {
3541 addReply(c,shared.wrongtypeerr);
3542 } else {
3543 list *list = o->ptr;
3544 listNode *ln, *next;
3545 int toremove = atoi(c->argv[2]->ptr);
3546 int removed = 0;
3547 int fromtail = 0;
3548
3549 if (toremove < 0) {
3550 toremove = -toremove;
3551 fromtail = 1;
3552 }
3553 ln = fromtail ? list->tail : list->head;
3554 while (ln) {
3555 robj *ele = listNodeValue(ln);
3556
3557 next = fromtail ? ln->prev : ln->next;
3558 if (compareStringObjects(ele,c->argv[3]) == 0) {
3559 listDelNode(list,ln);
3560 server.dirty++;
3561 removed++;
3562 if (toremove && removed == toremove) break;
3563 }
3564 ln = next;
3565 }
3566 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3567 }
3568 }
3569}
3570
3571/* This is the semantic of this command:
3572 * RPOPLPUSH srclist dstlist:
3573 * IF LLEN(srclist) > 0
3574 * element = RPOP srclist
3575 * LPUSH dstlist element
3576 * RETURN element
3577 * ELSE
3578 * RETURN nil
3579 * END
3580 * END
3581 *
3582 * The idea is to be able to get an element from a list in a reliable way
3583 * since the element is not just returned but pushed against another list
3584 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3585 */
3586static void rpoplpushcommand(redisClient *c) {
3587 robj *sobj;
3588
3589 sobj = lookupKeyWrite(c->db,c->argv[1]);
3590 if (sobj == NULL) {
3591 addReply(c,shared.nullbulk);
3592 } else {
3593 if (sobj->type != REDIS_LIST) {
3594 addReply(c,shared.wrongtypeerr);
3595 } else {
3596 list *srclist = sobj->ptr;
3597 listNode *ln = listLast(srclist);
3598
3599 if (ln == NULL) {
3600 addReply(c,shared.nullbulk);
3601 } else {
3602 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3603 robj *ele = listNodeValue(ln);
3604 list *dstlist;
3605
3606 if (dobj == NULL) {
3607
3608 /* Create the list if the key does not exist */
3609 dobj = createListObject();
3610 dictAdd(c->db->dict,c->argv[2],dobj);
3611 incrRefCount(c->argv[2]);
3612 } else if (dobj->type != REDIS_LIST) {
3613 addReply(c,shared.wrongtypeerr);
3614 return;
3615 }
3616 /* Add the element to the target list */
3617 dstlist = dobj->ptr;
3618 listAddNodeHead(dstlist,ele);
3619 incrRefCount(ele);
3620
3621 /* Send the element to the client as reply as well */
3622 addReplyBulkLen(c,ele);
3623 addReply(c,ele);
3624 addReply(c,shared.crlf);
3625
3626 /* Finally remove the element from the source list */
3627 listDelNode(srclist,ln);
3628 server.dirty++;
3629 }
3630 }
3631 }
3632}
3633
3634
3635/* ==================================== Sets ================================ */
3636
3637static void saddCommand(redisClient *c) {
3638 robj *set;
3639
3640 set = lookupKeyWrite(c->db,c->argv[1]);
3641 if (set == NULL) {
3642 set = createSetObject();
3643 dictAdd(c->db->dict,c->argv[1],set);
3644 incrRefCount(c->argv[1]);
3645 } else {
3646 if (set->type != REDIS_SET) {
3647 addReply(c,shared.wrongtypeerr);
3648 return;
3649 }
3650 }
3651 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3652 incrRefCount(c->argv[2]);
3653 server.dirty++;
3654 addReply(c,shared.cone);
3655 } else {
3656 addReply(c,shared.czero);
3657 }
3658}
3659
3660static void sremCommand(redisClient *c) {
3661 robj *set;
3662
3663 set = lookupKeyWrite(c->db,c->argv[1]);
3664 if (set == NULL) {
3665 addReply(c,shared.czero);
3666 } else {
3667 if (set->type != REDIS_SET) {
3668 addReply(c,shared.wrongtypeerr);
3669 return;
3670 }
3671 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3672 server.dirty++;
3673 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3674 addReply(c,shared.cone);
3675 } else {
3676 addReply(c,shared.czero);
3677 }
3678 }
3679}
3680
3681static void smoveCommand(redisClient *c) {
3682 robj *srcset, *dstset;
3683
3684 srcset = lookupKeyWrite(c->db,c->argv[1]);
3685 dstset = lookupKeyWrite(c->db,c->argv[2]);
3686
3687 /* If the source key does not exist return 0, if it's of the wrong type
3688 * raise an error */
3689 if (srcset == NULL || srcset->type != REDIS_SET) {
3690 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3691 return;
3692 }
3693 /* Error if the destination key is not a set as well */
3694 if (dstset && dstset->type != REDIS_SET) {
3695 addReply(c,shared.wrongtypeerr);
3696 return;
3697 }
3698 /* Remove the element from the source set */
3699 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3700 /* Key not found in the src set! return zero */
3701 addReply(c,shared.czero);
3702 return;
3703 }
3704 server.dirty++;
3705 /* Add the element to the destination set */
3706 if (!dstset) {
3707 dstset = createSetObject();
3708 dictAdd(c->db->dict,c->argv[2],dstset);
3709 incrRefCount(c->argv[2]);
3710 }
3711 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3712 incrRefCount(c->argv[3]);
3713 addReply(c,shared.cone);
3714}
3715
3716static void sismemberCommand(redisClient *c) {
3717 robj *set;
3718
3719 set = lookupKeyRead(c->db,c->argv[1]);
3720 if (set == NULL) {
3721 addReply(c,shared.czero);
3722 } else {
3723 if (set->type != REDIS_SET) {
3724 addReply(c,shared.wrongtypeerr);
3725 return;
3726 }
3727 if (dictFind(set->ptr,c->argv[2]))
3728 addReply(c,shared.cone);
3729 else
3730 addReply(c,shared.czero);
3731 }
3732}
3733
3734static void scardCommand(redisClient *c) {
3735 robj *o;
3736 dict *s;
3737
3738 o = lookupKeyRead(c->db,c->argv[1]);
3739 if (o == NULL) {
3740 addReply(c,shared.czero);
3741 return;
3742 } else {
3743 if (o->type != REDIS_SET) {
3744 addReply(c,shared.wrongtypeerr);
3745 } else {
3746 s = o->ptr;
3747 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3748 dictSize(s)));
3749 }
3750 }
3751}
3752
3753static void spopCommand(redisClient *c) {
3754 robj *set;
3755 dictEntry *de;
3756
3757 set = lookupKeyWrite(c->db,c->argv[1]);
3758 if (set == NULL) {
3759 addReply(c,shared.nullbulk);
3760 } else {
3761 if (set->type != REDIS_SET) {
3762 addReply(c,shared.wrongtypeerr);
3763 return;
3764 }
3765 de = dictGetRandomKey(set->ptr);
3766 if (de == NULL) {
3767 addReply(c,shared.nullbulk);
3768 } else {
3769 robj *ele = dictGetEntryKey(de);
3770
3771 addReplyBulkLen(c,ele);
3772 addReply(c,ele);
3773 addReply(c,shared.crlf);
3774 dictDelete(set->ptr,ele);
3775 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3776 server.dirty++;
3777 }
3778 }
3779}
3780
3781static void srandmemberCommand(redisClient *c) {
3782 robj *set;
3783 dictEntry *de;
3784
3785 set = lookupKeyRead(c->db,c->argv[1]);
3786 if (set == NULL) {
3787 addReply(c,shared.nullbulk);
3788 } else {
3789 if (set->type != REDIS_SET) {
3790 addReply(c,shared.wrongtypeerr);
3791 return;
3792 }
3793 de = dictGetRandomKey(set->ptr);
3794 if (de == NULL) {
3795 addReply(c,shared.nullbulk);
3796 } else {
3797 robj *ele = dictGetEntryKey(de);
3798
3799 addReplyBulkLen(c,ele);
3800 addReply(c,ele);
3801 addReply(c,shared.crlf);
3802 }
3803 }
3804}
3805
3806static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3807 dict **d1 = (void*) s1, **d2 = (void*) s2;
3808
3809 return dictSize(*d1)-dictSize(*d2);
3810}
3811
3812static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3813 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3814 dictIterator *di;
3815 dictEntry *de;
3816 robj *lenobj = NULL, *dstset = NULL;
3817 int j, cardinality = 0;
3818
3819 for (j = 0; j < setsnum; j++) {
3820 robj *setobj;
3821
3822 setobj = dstkey ?
3823 lookupKeyWrite(c->db,setskeys[j]) :
3824 lookupKeyRead(c->db,setskeys[j]);
3825 if (!setobj) {
3826 zfree(dv);
3827 if (dstkey) {
3828 deleteKey(c->db,dstkey);
3829 addReply(c,shared.ok);
3830 } else {
3831 addReply(c,shared.nullmultibulk);
3832 }
3833 return;
3834 }
3835 if (setobj->type != REDIS_SET) {
3836 zfree(dv);
3837 addReply(c,shared.wrongtypeerr);
3838 return;
3839 }
3840 dv[j] = setobj->ptr;
3841 }
3842 /* Sort sets from the smallest to largest, this will improve our
3843 * algorithm's performace */
3844 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3845
3846 /* The first thing we should output is the total number of elements...
3847 * since this is a multi-bulk write, but at this stage we don't know
3848 * the intersection set size, so we use a trick, append an empty object
3849 * to the output list and save the pointer to later modify it with the
3850 * right length */
3851 if (!dstkey) {
3852 lenobj = createObject(REDIS_STRING,NULL);
3853 addReply(c,lenobj);
3854 decrRefCount(lenobj);
3855 } else {
3856 /* If we have a target key where to store the resulting set
3857 * create this key with an empty set inside */
3858 dstset = createSetObject();
3859 }
3860
3861 /* Iterate all the elements of the first (smallest) set, and test
3862 * the element against all the other sets, if at least one set does
3863 * not include the element it is discarded */
3864 di = dictGetIterator(dv[0]);
3865
3866 while((de = dictNext(di)) != NULL) {
3867 robj *ele;
3868
3869 for (j = 1; j < setsnum; j++)
3870 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3871 if (j != setsnum)
3872 continue; /* at least one set does not contain the member */
3873 ele = dictGetEntryKey(de);
3874 if (!dstkey) {
3875 addReplyBulkLen(c,ele);
3876 addReply(c,ele);
3877 addReply(c,shared.crlf);
3878 cardinality++;
3879 } else {
3880 dictAdd(dstset->ptr,ele,NULL);
3881 incrRefCount(ele);
3882 }
3883 }
3884 dictReleaseIterator(di);
3885
3886 if (dstkey) {
3887 /* Store the resulting set into the target */
3888 deleteKey(c->db,dstkey);
3889 dictAdd(c->db->dict,dstkey,dstset);
3890 incrRefCount(dstkey);
3891 }
3892
3893 if (!dstkey) {
3894 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3895 } else {
3896 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3897 dictSize((dict*)dstset->ptr)));
3898 server.dirty++;
3899 }
3900 zfree(dv);
3901}
3902
3903static void sinterCommand(redisClient *c) {
3904 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3905}
3906
3907static void sinterstoreCommand(redisClient *c) {
3908 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3909}
3910
3911#define REDIS_OP_UNION 0
3912#define REDIS_OP_DIFF 1
3913
3914static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3915 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3916 dictIterator *di;
3917 dictEntry *de;
3918 robj *dstset = NULL;
3919 int j, cardinality = 0;
3920
3921 for (j = 0; j < setsnum; j++) {
3922 robj *setobj;
3923
3924 setobj = dstkey ?
3925 lookupKeyWrite(c->db,setskeys[j]) :
3926 lookupKeyRead(c->db,setskeys[j]);
3927 if (!setobj) {
3928 dv[j] = NULL;
3929 continue;
3930 }
3931 if (setobj->type != REDIS_SET) {
3932 zfree(dv);
3933 addReply(c,shared.wrongtypeerr);
3934 return;
3935 }
3936 dv[j] = setobj->ptr;
3937 }
3938
3939 /* We need a temp set object to store our union. If the dstkey
3940 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3941 * this set object will be the resulting object to set into the target key*/
3942 dstset = createSetObject();
3943
3944 /* Iterate all the elements of all the sets, add every element a single
3945 * time to the result set */
3946 for (j = 0; j < setsnum; j++) {
3947 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3948 if (!dv[j]) continue; /* non existing keys are like empty sets */
3949
3950 di = dictGetIterator(dv[j]);
3951
3952 while((de = dictNext(di)) != NULL) {
3953 robj *ele;
3954
3955 /* dictAdd will not add the same element multiple times */
3956 ele = dictGetEntryKey(de);
3957 if (op == REDIS_OP_UNION || j == 0) {
3958 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3959 incrRefCount(ele);
3960 cardinality++;
3961 }
3962 } else if (op == REDIS_OP_DIFF) {
3963 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3964 cardinality--;
3965 }
3966 }
3967 }
3968 dictReleaseIterator(di);
3969
3970 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3971 }
3972
3973 /* Output the content of the resulting set, if not in STORE mode */
3974 if (!dstkey) {
3975 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3976 di = dictGetIterator(dstset->ptr);
3977 while((de = dictNext(di)) != NULL) {
3978 robj *ele;
3979
3980 ele = dictGetEntryKey(de);
3981 addReplyBulkLen(c,ele);
3982 addReply(c,ele);
3983 addReply(c,shared.crlf);
3984 }
3985 dictReleaseIterator(di);
3986 } else {
3987 /* If we have a target key where to store the resulting set
3988 * create this key with the result set inside */
3989 deleteKey(c->db,dstkey);
3990 dictAdd(c->db->dict,dstkey,dstset);
3991 incrRefCount(dstkey);
3992 }
3993
3994 /* Cleanup */
3995 if (!dstkey) {
3996 decrRefCount(dstset);
3997 } else {
3998 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3999 dictSize((dict*)dstset->ptr)));
4000 server.dirty++;
4001 }
4002 zfree(dv);
4003}
4004
4005static void sunionCommand(redisClient *c) {
4006 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4007}
4008
4009static void sunionstoreCommand(redisClient *c) {
4010 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4011}
4012
4013static void sdiffCommand(redisClient *c) {
4014 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4015}
4016
4017static void sdiffstoreCommand(redisClient *c) {
4018 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4019}
4020
4021/* ==================================== ZSets =============================== */
4022
4023/* ZSETs are ordered sets using two data structures to hold the same elements
4024 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4025 * data structure.
4026 *
 * The elements are added to a hash table mapping Redis objects to scores.
4028 * At the same time the elements are added to a skip list mapping scores
4029 * to Redis objects (so objects are sorted by scores in this "view"). */
4030
4031/* This skiplist implementation is almost a C translation of the original
4032 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4033 * Alternative to Balanced Trees", modified in three ways:
4034 * a) this implementation allows for repeated values.
4035 * b) the comparison is not just by key (our 'score') but by satellite data.
4036 * c) there is a back pointer, so it's a doubly linked list with the back
4037 * pointers being only at "level 1". This allows to traverse the list
4038 * from tail to head, useful for ZREVRANGE. */
4039
4040static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4041 zskiplistNode *zn = zmalloc(sizeof(*zn));
4042
4043 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4044 zn->score = score;
4045 zn->obj = obj;
4046 return zn;
4047}
4048
4049static zskiplist *zslCreate(void) {
4050 int j;
4051 zskiplist *zsl;
4052
4053 zsl = zmalloc(sizeof(*zsl));
4054 zsl->level = 1;
4055 zsl->length = 0;
4056 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4057 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4058 zsl->header->forward[j] = NULL;
4059 zsl->header->backward = NULL;
4060 zsl->tail = NULL;
4061 return zsl;
4062}
4063
4064static void zslFreeNode(zskiplistNode *node) {
4065 decrRefCount(node->obj);
4066 zfree(node->forward);
4067 zfree(node);
4068}
4069
4070static void zslFree(zskiplist *zsl) {
4071 zskiplistNode *node = zsl->header->forward[0], *next;
4072
4073 zfree(zsl->header->forward);
4074 zfree(zsl->header);
4075 while(node) {
4076 next = node->forward[0];
4077 zslFreeNode(node);
4078 node = next;
4079 }
4080 zfree(zsl);
4081}
4082
4083static int zslRandomLevel(void) {
4084 int level = 1;
4085 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4086 level += 1;
4087 return level;
4088}
4089
4090static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4091 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4092 int i, level;
4093
4094 x = zsl->header;
4095 for (i = zsl->level-1; i >= 0; i--) {
4096 while (x->forward[i] &&
4097 (x->forward[i]->score < score ||
4098 (x->forward[i]->score == score &&
4099 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4100 x = x->forward[i];
4101 update[i] = x;
4102 }
4103 /* we assume the key is not already inside, since we allow duplicated
4104 * scores, and the re-insertion of score and redis object should never
4105 * happpen since the caller of zslInsert() should test in the hash table
4106 * if the element is already inside or not. */
4107 level = zslRandomLevel();
4108 if (level > zsl->level) {
4109 for (i = zsl->level; i < level; i++)
4110 update[i] = zsl->header;
4111 zsl->level = level;
4112 }
4113 x = zslCreateNode(level,score,obj);
4114 for (i = 0; i < level; i++) {
4115 x->forward[i] = update[i]->forward[i];
4116 update[i]->forward[i] = x;
4117 }
4118 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4119 if (x->forward[0])
4120 x->forward[0]->backward = x;
4121 else
4122 zsl->tail = x;
4123 zsl->length++;
4124}
4125
4126/* Delete an element with matching score/object from the skiplist. */
4127static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4128 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4129 int i;
4130
4131 x = zsl->header;
4132 for (i = zsl->level-1; i >= 0; i--) {
4133 while (x->forward[i] &&
4134 (x->forward[i]->score < score ||
4135 (x->forward[i]->score == score &&
4136 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4137 x = x->forward[i];
4138 update[i] = x;
4139 }
4140 /* We may have multiple elements with the same score, what we need
4141 * is to find the element with both the right score and object. */
4142 x = x->forward[0];
4143 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4144 for (i = 0; i < zsl->level; i++) {
4145 if (update[i]->forward[i] != x) break;
4146 update[i]->forward[i] = x->forward[i];
4147 }
4148 if (x->forward[0]) {
4149 x->forward[0]->backward = (x->backward == zsl->header) ?
4150 NULL : x->backward;
4151 } else {
4152 zsl->tail = x->backward;
4153 }
4154 zslFreeNode(x);
4155 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4156 zsl->level--;
4157 zsl->length--;
4158 return 1;
4159 } else {
4160 return 0; /* not found */
4161 }
4162 return 0; /* not found */
4163}
4164
4165/* Delete all the elements with score between min and max from the skiplist.
4166 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4167 * Note that this function takes the reference to the hash table view of the
4168 * sorted set, in order to remove the elements from the hash table too. */
4169static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4170 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4171 unsigned long removed = 0;
4172 int i;
4173
4174 x = zsl->header;
4175 for (i = zsl->level-1; i >= 0; i--) {
4176 while (x->forward[i] && x->forward[i]->score < min)
4177 x = x->forward[i];
4178 update[i] = x;
4179 }
4180 /* We may have multiple elements with the same score, what we need
4181 * is to find the element with both the right score and object. */
4182 x = x->forward[0];
4183 while (x && x->score <= max) {
4184 zskiplistNode *next;
4185
4186 for (i = 0; i < zsl->level; i++) {
4187 if (update[i]->forward[i] != x) break;
4188 update[i]->forward[i] = x->forward[i];
4189 }
4190 if (x->forward[0]) {
4191 x->forward[0]->backward = (x->backward == zsl->header) ?
4192 NULL : x->backward;
4193 } else {
4194 zsl->tail = x->backward;
4195 }
4196 next = x->forward[0];
4197 dictDelete(dict,x->obj);
4198 zslFreeNode(x);
4199 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4200 zsl->level--;
4201 zsl->length--;
4202 removed++;
4203 x = next;
4204 }
4205 return removed; /* not found */
4206}
4207
4208/* Find the first node having a score equal or greater than the specified one.
4209 * Returns NULL if there is no match. */
4210static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4211 zskiplistNode *x;
4212 int i;
4213
4214 x = zsl->header;
4215 for (i = zsl->level-1; i >= 0; i--) {
4216 while (x->forward[i] && x->forward[i]->score < score)
4217 x = x->forward[i];
4218 }
4219 /* We may have multiple elements with the same score, what we need
4220 * is to find the element with both the right score and object. */
4221 return x->forward[0];
4222}
4223
4224/* The actual Z-commands implementations */
4225
4226static void zaddCommand(redisClient *c) {
4227 robj *zsetobj;
4228 zset *zs;
4229 double *score;
4230
4231 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4232 if (zsetobj == NULL) {
4233 zsetobj = createZsetObject();
4234 dictAdd(c->db->dict,c->argv[1],zsetobj);
4235 incrRefCount(c->argv[1]);
4236 } else {
4237 if (zsetobj->type != REDIS_ZSET) {
4238 addReply(c,shared.wrongtypeerr);
4239 return;
4240 }
4241 }
4242 score = zmalloc(sizeof(double));
4243 *score = strtod(c->argv[2]->ptr,NULL);
4244 zs = zsetobj->ptr;
4245 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
4246 /* case 1: New element */
4247 incrRefCount(c->argv[3]); /* added to hash */
4248 zslInsert(zs->zsl,*score,c->argv[3]);
4249 incrRefCount(c->argv[3]); /* added to skiplist */
4250 server.dirty++;
4251 addReply(c,shared.cone);
4252 } else {
4253 dictEntry *de;
4254 double *oldscore;
4255
4256 /* case 2: Score update operation */
4257 de = dictFind(zs->dict,c->argv[3]);
4258 assert(de != NULL);
4259 oldscore = dictGetEntryVal(de);
4260 if (*score != *oldscore) {
4261 int deleted;
4262
4263 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
4264 assert(deleted != 0);
4265 zslInsert(zs->zsl,*score,c->argv[3]);
4266 incrRefCount(c->argv[3]);
4267 dictReplace(zs->dict,c->argv[3],score);
4268 server.dirty++;
4269 } else {
4270 zfree(score);
4271 }
4272 addReply(c,shared.czero);
4273 }
4274}
4275
4276static void zremCommand(redisClient *c) {
4277 robj *zsetobj;
4278 zset *zs;
4279
4280 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4281 if (zsetobj == NULL) {
4282 addReply(c,shared.czero);
4283 } else {
4284 dictEntry *de;
4285 double *oldscore;
4286 int deleted;
4287
4288 if (zsetobj->type != REDIS_ZSET) {
4289 addReply(c,shared.wrongtypeerr);
4290 return;
4291 }
4292 zs = zsetobj->ptr;
4293 de = dictFind(zs->dict,c->argv[2]);
4294 if (de == NULL) {
4295 addReply(c,shared.czero);
4296 return;
4297 }
4298 /* Delete from the skiplist */
4299 oldscore = dictGetEntryVal(de);
4300 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4301 assert(deleted != 0);
4302
4303 /* Delete from the hash table */
4304 dictDelete(zs->dict,c->argv[2]);
4305 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4306 server.dirty++;
4307 addReply(c,shared.cone);
4308 }
4309}
4310
4311static void zremrangebyscoreCommand(redisClient *c) {
4312 double min = strtod(c->argv[2]->ptr,NULL);
4313 double max = strtod(c->argv[3]->ptr,NULL);
4314 robj *zsetobj;
4315 zset *zs;
4316
4317 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4318 if (zsetobj == NULL) {
4319 addReply(c,shared.czero);
4320 } else {
4321 long deleted;
4322
4323 if (zsetobj->type != REDIS_ZSET) {
4324 addReply(c,shared.wrongtypeerr);
4325 return;
4326 }
4327 zs = zsetobj->ptr;
4328 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4329 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4330 server.dirty += deleted;
4331 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4332 }
4333}
4334
4335static void zrangeGenericCommand(redisClient *c, int reverse) {
4336 robj *o;
4337 int start = atoi(c->argv[2]->ptr);
4338 int end = atoi(c->argv[3]->ptr);
4339
4340 o = lookupKeyRead(c->db,c->argv[1]);
4341 if (o == NULL) {
4342 addReply(c,shared.nullmultibulk);
4343 } else {
4344 if (o->type != REDIS_ZSET) {
4345 addReply(c,shared.wrongtypeerr);
4346 } else {
4347 zset *zsetobj = o->ptr;
4348 zskiplist *zsl = zsetobj->zsl;
4349 zskiplistNode *ln;
4350
4351 int llen = zsl->length;
4352 int rangelen, j;
4353 robj *ele;
4354
4355 /* convert negative indexes */
4356 if (start < 0) start = llen+start;
4357 if (end < 0) end = llen+end;
4358 if (start < 0) start = 0;
4359 if (end < 0) end = 0;
4360
4361 /* indexes sanity checks */
4362 if (start > end || start >= llen) {
4363 /* Out of range start or start > end result in empty list */
4364 addReply(c,shared.emptymultibulk);
4365 return;
4366 }
4367 if (end >= llen) end = llen-1;
4368 rangelen = (end-start)+1;
4369
4370 /* Return the result in form of a multi-bulk reply */
4371 if (reverse) {
4372 ln = zsl->tail;
4373 while (start--)
4374 ln = ln->backward;
4375 } else {
4376 ln = zsl->header->forward[0];
4377 while (start--)
4378 ln = ln->forward[0];
4379 }
4380
4381 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4382 for (j = 0; j < rangelen; j++) {
4383 ele = ln->obj;
4384 addReplyBulkLen(c,ele);
4385 addReply(c,ele);
4386 addReply(c,shared.crlf);
4387 ln = reverse ? ln->backward : ln->forward[0];
4388 }
4389 }
4390 }
4391}
4392
4393static void zrangeCommand(redisClient *c) {
4394 zrangeGenericCommand(c,0);
4395}
4396
4397static void zrevrangeCommand(redisClient *c) {
4398 zrangeGenericCommand(c,1);
4399}
4400
4401static void zrangebyscoreCommand(redisClient *c) {
4402 robj *o;
4403 double min = strtod(c->argv[2]->ptr,NULL);
4404 double max = strtod(c->argv[3]->ptr,NULL);
4405
4406 o = lookupKeyRead(c->db,c->argv[1]);
4407 if (o == NULL) {
4408 addReply(c,shared.nullmultibulk);
4409 } else {
4410 if (o->type != REDIS_ZSET) {
4411 addReply(c,shared.wrongtypeerr);
4412 } else {
4413 zset *zsetobj = o->ptr;
4414 zskiplist *zsl = zsetobj->zsl;
4415 zskiplistNode *ln;
4416 robj *ele, *lenobj;
4417 unsigned int rangelen = 0;
4418
4419 /* Get the first node with the score >= min */
4420 ln = zslFirstWithScore(zsl,min);
4421 if (ln == NULL) {
4422 /* No element matching the speciifed interval */
4423 addReply(c,shared.emptymultibulk);
4424 return;
4425 }
4426
4427 /* We don't know in advance how many matching elements there
4428 * are in the list, so we push this object that will represent
4429 * the multi-bulk length in the output buffer, and will "fix"
4430 * it later */
4431 lenobj = createObject(REDIS_STRING,NULL);
4432 addReply(c,lenobj);
4433
4434 while(ln && ln->score <= max) {
4435 ele = ln->obj;
4436 addReplyBulkLen(c,ele);
4437 addReply(c,ele);
4438 addReply(c,shared.crlf);
4439 ln = ln->forward[0];
4440 rangelen++;
4441 }
4442 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4443 }
4444 }
4445}
4446
4447static void zcardCommand(redisClient *c) {
4448 robj *o;
4449 zset *zs;
4450
4451 o = lookupKeyRead(c->db,c->argv[1]);
4452 if (o == NULL) {
4453 addReply(c,shared.czero);
4454 return;
4455 } else {
4456 if (o->type != REDIS_ZSET) {
4457 addReply(c,shared.wrongtypeerr);
4458 } else {
4459 zs = o->ptr;
4460 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4461 }
4462 }
4463}
4464
4465static void zscoreCommand(redisClient *c) {
4466 robj *o;
4467 zset *zs;
4468
4469 o = lookupKeyRead(c->db,c->argv[1]);
4470 if (o == NULL) {
4471 addReply(c,shared.nullbulk);
4472 return;
4473 } else {
4474 if (o->type != REDIS_ZSET) {
4475 addReply(c,shared.wrongtypeerr);
4476 } else {
4477 dictEntry *de;
4478
4479 zs = o->ptr;
4480 de = dictFind(zs->dict,c->argv[2]);
4481 if (!de) {
4482 addReply(c,shared.nullbulk);
4483 } else {
4484 char buf[128];
4485 double *score = dictGetEntryVal(de);
4486
4487 snprintf(buf,sizeof(buf),"%.17g",*score);
4488 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
4489 strlen(buf),buf));
4490 }
4491 }
4492 }
4493}
4494
4495/* ========================= Non type-specific commands ==================== */
4496
4497static void flushdbCommand(redisClient *c) {
4498 server.dirty += dictSize(c->db->dict);
4499 dictEmpty(c->db->dict);
4500 dictEmpty(c->db->expires);
4501 addReply(c,shared.ok);
4502}
4503
4504static void flushallCommand(redisClient *c) {
4505 server.dirty += emptyDb();
4506 addReply(c,shared.ok);
4507 rdbSave(server.dbfilename);
4508 server.dirty++;
4509}
4510
4511static redisSortOperation *createSortOperation(int type, robj *pattern) {
4512 redisSortOperation *so = zmalloc(sizeof(*so));
4513 so->type = type;
4514 so->pattern = pattern;
4515 return so;
4516}
4517
4518/* Return the value associated to the key with a name obtained
4519 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4520static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4521 char *p;
4522 sds spat, ssub;
4523 robj keyobj;
4524 int prefixlen, sublen, postfixlen;
4525 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4526 struct {
4527 long len;
4528 long free;
4529 char buf[REDIS_SORTKEY_MAX+1];
4530 } keyname;
4531
4532 if (subst->encoding == REDIS_ENCODING_RAW)
4533 incrRefCount(subst);
4534 else {
4535 subst = getDecodedObject(subst);
4536 }
4537
4538 spat = pattern->ptr;
4539 ssub = subst->ptr;
4540 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4541 p = strchr(spat,'*');
4542 if (!p) return NULL;
4543
4544 prefixlen = p-spat;
4545 sublen = sdslen(ssub);
4546 postfixlen = sdslen(spat)-(prefixlen+1);
4547 memcpy(keyname.buf,spat,prefixlen);
4548 memcpy(keyname.buf+prefixlen,ssub,sublen);
4549 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4550 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4551 keyname.len = prefixlen+sublen+postfixlen;
4552
4553 keyobj.refcount = 1;
4554 keyobj.type = REDIS_STRING;
4555 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4556
4557 decrRefCount(subst);
4558
4559 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4560 return lookupKeyRead(db,&keyobj);
4561}
4562
4563/* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4564 * the additional parameter is not standard but a BSD-specific we have to
4565 * pass sorting parameters via the global 'server' structure */
4566static int sortCompare(const void *s1, const void *s2) {
4567 const redisSortObject *so1 = s1, *so2 = s2;
4568 int cmp;
4569
4570 if (!server.sort_alpha) {
4571 /* Numeric sorting. Here it's trivial as we precomputed scores */
4572 if (so1->u.score > so2->u.score) {
4573 cmp = 1;
4574 } else if (so1->u.score < so2->u.score) {
4575 cmp = -1;
4576 } else {
4577 cmp = 0;
4578 }
4579 } else {
4580 /* Alphanumeric sorting */
4581 if (server.sort_bypattern) {
4582 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4583 /* At least one compare object is NULL */
4584 if (so1->u.cmpobj == so2->u.cmpobj)
4585 cmp = 0;
4586 else if (so1->u.cmpobj == NULL)
4587 cmp = -1;
4588 else
4589 cmp = 1;
4590 } else {
4591 /* We have both the objects, use strcoll */
4592 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4593 }
4594 } else {
4595 /* Compare elements directly */
4596 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4597 so2->obj->encoding == REDIS_ENCODING_RAW) {
4598 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4599 } else {
4600 robj *dec1, *dec2;
4601
4602 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4603 so1->obj : getDecodedObject(so1->obj);
4604 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4605 so2->obj : getDecodedObject(so2->obj);
4606 cmp = strcoll(dec1->ptr,dec2->ptr);
4607 if (dec1 != so1->obj) decrRefCount(dec1);
4608 if (dec2 != so2->obj) decrRefCount(dec2);
4609 }
4610 }
4611 }
4612 return server.sort_desc ? -cmp : cmp;
4613}
4614
/* The SORT command is the most complex command in Redis. Warning: this code
 * is optimized for speed and a bit less for readability.
 *
 * Syntax, as parsed below (options are case insensitive and may appear in
 * any order):
 *
 *   SORT key [ASC|DESC] [ALPHA] [LIMIT start count] [BY pattern]
 *            [GET pattern ...] [STORE dstkey]
 *
 * The key must hold a List or a Set. Without STORE the result is sent to
 * the client as a multi-bulk reply; with STORE the result is saved as a
 * List at 'dstkey' and the number of stored elements is returned. */
static void sortCommand(redisClient *c) {
    list *operations;
    int outputlen = 0;
    int desc = 0, alpha = 0;
    int limit_start = 0, limit_count = -1, start, end;
    int j, dontsort = 0, vectorlen;
    int getop = 0; /* GET operation counter */
    robj *sortval, *sortby = NULL, *storekey = NULL;
    redisSortObject *vector; /* Resulting vector to sort */

    /* Lookup the key to sort. It must be of the right types */
    sortval = lookupKeyRead(c->db,c->argv[1]);
    if (sortval == NULL) {
        addReply(c,shared.nokeyerr);
        return;
    }
    if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
        addReply(c,shared.wrongtypeerr);
        return;
    }

    /* Create a list of operations to perform for every sorted element.
     * Only GET operations are created by the parser below. The list owns
     * the operation structures: they are released with zfree() when the
     * list is released. */
    operations = listCreate();
    listSetFreeMethod(operations,zfree);
    j = 2;

    /* Now we need to protect sortval incrementing its count, in the future
     * SORT may have options able to overwrite/delete keys during the sorting
     * and the sorted key itself may get destroyed */
    incrRefCount(sortval);

    /* The SORT command has an SQL-alike syntax, parse it */
    while(j < c->argc) {
        /* Number of arguments following the current one. */
        int leftargs = c->argc-j-1;
        if (!strcasecmp(c->argv[j]->ptr,"asc")) {
            desc = 0;
        } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
            desc = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
            alpha = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
            limit_start = atoi(c->argv[j+1]->ptr);
            limit_count = atoi(c->argv[j+2]->ptr);
            j+=2;
        } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
            storekey = c->argv[j+1];
            j++;
        } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
            sortby = c->argv[j+1];
            /* If the BY pattern does not contain '*', i.e. it is constant,
             * we don't need to sort nor to lookup the weight keys. */
            if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
            j++;
        } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
            listAddNodeTail(operations,createSortOperation(
                REDIS_SORT_GET,c->argv[j+1]));
            getop++;
            j++;
        } else {
            /* Unknown option, or option with missing arguments: undo
             * what was done so far and report a syntax error. */
            decrRefCount(sortval);
            listRelease(operations);
            addReply(c,shared.syntaxerr);
            return;
        }
        j++;
    }

    /* Load the sorting vector with all the objects to sort */
    vectorlen = (sortval->type == REDIS_LIST) ?
        listLength((list*)sortval->ptr) :
        dictSize((dict*)sortval->ptr);
    vector = zmalloc(sizeof(redisSortObject)*vectorlen);
    j = 0;
    if (sortval->type == REDIS_LIST) {
        list *list = sortval->ptr;
        listNode *ln;

        listRewind(list);
        while((ln = listYield(list))) {
            robj *ele = ln->value;
            vector[j].obj = ele;
            vector[j].u.score = 0;
            vector[j].u.cmpobj = NULL;
            j++;
        }
    } else {
        dict *set = sortval->ptr;
        dictIterator *di;
        dictEntry *setele;

        di = dictGetIterator(set);
        while((setele = dictNext(di)) != NULL) {
            vector[j].obj = dictGetEntryKey(setele);
            vector[j].u.score = 0;
            vector[j].u.cmpobj = NULL;
            j++;
        }
        dictReleaseIterator(di);
    }
    assert(j == vectorlen);

    /* Now it's time to load the right scores in the sorting vector */
    if (dontsort == 0) {
        for (j = 0; j < vectorlen; j++) {
            if (sortby) {
                robj *byval;

                /* Sort by the value of the key obtained from the BY
                 * pattern. Elements whose weight key is missing or is
                 * not a string keep the zero/NULL weight set above. */
                byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
                if (!byval || byval->type != REDIS_STRING) continue;
                if (alpha) {
                    /* The reference obtained here on cmpobj is released
                     * in the cleanup stage at the end of the function. */
                    if (byval->encoding == REDIS_ENCODING_RAW) {
                        vector[j].u.cmpobj = byval;
                        incrRefCount(byval);
                    } else {
                        vector[j].u.cmpobj = getDecodedObject(byval);
                    }
                } else {
                    if (byval->encoding == REDIS_ENCODING_RAW) {
                        vector[j].u.score = strtod(byval->ptr,NULL);
                    } else {
                        if (byval->encoding == REDIS_ENCODING_INT) {
                            /* Integer encoded: the value is stored
                             * directly in the pointer field. */
                            vector[j].u.score = (long)byval->ptr;
                        } else
                            assert(1 != 1);
                    }
                }
            } else {
                /* No BY option: for numeric sorting the score is the
                 * element itself converted to a number. For alpha sorting
                 * nothing is precomputed, sortCompare() compares the
                 * elements directly. */
                if (!alpha) {
                    if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
                        vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
                    else {
                        if (vector[j].obj->encoding == REDIS_ENCODING_INT)
                            vector[j].u.score = (long) vector[j].obj->ptr;
                        else
                            assert(1 != 1);
                    }
                }
            }
        }
    }

    /* We are ready to sort the vector... perform a bit of sanity check
     * on the LIMIT option too. We'll use a partial version of quicksort.
     * After this block [start,end] is the inclusive range of vector
     * indexes to output; it is empty when end < start. */
    start = (limit_start < 0) ? 0 : limit_start;
    end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
    if (start >= vectorlen) {
        start = vectorlen-1;
        end = vectorlen-2;
    }
    if (end >= vectorlen) end = vectorlen-1;

    if (dontsort == 0) {
        /* Parameters for sortCompare() are passed via the global state. */
        server.sort_desc = desc;
        server.sort_alpha = alpha;
        server.sort_bypattern = sortby ? 1 : 0;
        if (sortby && (start != 0 || end != vectorlen-1))
            pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
        else
            qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
    }

    /* Send command output to the output buffer, performing the specified
     * GET operations if any. With GET operations every element in the
     * range produces one output item per GET. */
    outputlen = getop ? getop*(end-start+1) : end-start+1;
    if (storekey == NULL) {
        /* STORE option not specified, send the sorting result to client */
        addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
        for (j = start; j <= end; j++) {
            listNode *ln;
            if (!getop) {
                addReplyBulkLen(c,vector[j].obj);
                addReply(c,vector[j].obj);
                addReply(c,shared.crlf);
            }
            listRewind(operations);
            while((ln = listYield(operations))) {
                redisSortOperation *sop = ln->value;
                robj *val = lookupKeyByPattern(c->db,sop->pattern,
                    vector[j].obj);

                if (sop->type == REDIS_SORT_GET) {
                    if (!val || val->type != REDIS_STRING) {
                        addReply(c,shared.nullbulk);
                    } else {
                        addReplyBulkLen(c,val);
                        addReply(c,val);
                        addReply(c,shared.crlf);
                    }
                } else {
                    assert(sop->type == REDIS_SORT_GET); /* always fails */
                }
            }
        }
    } else {
        robj *listObject = createListObject();
        list *listPtr = (list*) listObject->ptr;

        /* STORE option specified, set the sorting result as a List object */
        for (j = start; j <= end; j++) {
            listNode *ln;
            if (!getop) {
                listAddNodeTail(listPtr,vector[j].obj);
                incrRefCount(vector[j].obj);
            }
            listRewind(operations);
            while((ln = listYield(operations))) {
                redisSortOperation *sop = ln->value;
                robj *val = lookupKeyByPattern(c->db,sop->pattern,
                    vector[j].obj);

                if (sop->type == REDIS_SORT_GET) {
                    if (!val || val->type != REDIS_STRING) {
                        /* Missing values are stored as empty strings. */
                        listAddNodeTail(listPtr,createStringObject("",0));
                    } else {
                        listAddNodeTail(listPtr,val);
                        incrRefCount(val);
                    }
                } else {
                    assert(sop->type == REDIS_SORT_GET); /* always fails */
                }
            }
        }
        /* NOTE(review): the key object gets a new reference only when
         * dictReplace() returns non-zero, presumably meaning that a new
         * entry was added rather than an old value replaced -- confirm
         * against the dict implementation. */
        if (dictReplace(c->db->dict,storekey,listObject)) {
            incrRefCount(storekey);
        }
        /* Note: we add 1 because the DB is dirty anyway since even if the
         * SORT result is empty a new key is set and maybe the old content
         * replaced. */
        server.dirty += 1+outputlen;
        addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
    }

    /* Cleanup */
    decrRefCount(sortval);
    listRelease(operations);
    for (j = 0; j < vectorlen; j++) {
        /* cmpobj holds a reference only when sorting BY pattern + ALPHA. */
        if (sortby && alpha && vector[j].u.cmpobj)
            decrRefCount(vector[j].u.cmpobj);
    }
    zfree(vector);
}
4859
4860static void infoCommand(redisClient *c) {
4861 sds info;
4862 time_t uptime = time(NULL)-server.stat_starttime;
4863 int j;
4864
4865 info = sdscatprintf(sdsempty(),
4866 "redis_version:%s\r\n"
4867 "arch_bits:%s\r\n"
4868 "uptime_in_seconds:%d\r\n"
4869 "uptime_in_days:%d\r\n"
4870 "connected_clients:%d\r\n"
4871 "connected_slaves:%d\r\n"
4872 "used_memory:%zu\r\n"
4873 "changes_since_last_save:%lld\r\n"
4874 "bgsave_in_progress:%d\r\n"
4875 "last_save_time:%d\r\n"
4876 "total_connections_received:%lld\r\n"
4877 "total_commands_processed:%lld\r\n"
4878 "role:%s\r\n"
4879 ,REDIS_VERSION,
4880 (sizeof(long) == 8) ? "64" : "32",
4881 uptime,
4882 uptime/(3600*24),
4883 listLength(server.clients)-listLength(server.slaves),
4884 listLength(server.slaves),
4885 server.usedmemory,
4886 server.dirty,
4887 server.bgsaveinprogress,
4888 server.lastsave,
4889 server.stat_numconnections,
4890 server.stat_numcommands,
4891 server.masterhost == NULL ? "master" : "slave"
4892 );
4893 if (server.masterhost) {
4894 info = sdscatprintf(info,
4895 "master_host:%s\r\n"
4896 "master_port:%d\r\n"
4897 "master_link_status:%s\r\n"
4898 "master_last_io_seconds_ago:%d\r\n"
4899 ,server.masterhost,
4900 server.masterport,
4901 (server.replstate == REDIS_REPL_CONNECTED) ?
4902 "up" : "down",
4903 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
4904 );
4905 }
4906 for (j = 0; j < server.dbnum; j++) {
4907 long long keys, vkeys;
4908
4909 keys = dictSize(server.db[j].dict);
4910 vkeys = dictSize(server.db[j].expires);
4911 if (keys || vkeys) {
4912 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4913 j, keys, vkeys);
4914 }
4915 }
4916 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4917 addReplySds(c,info);
4918 addReply(c,shared.crlf);
4919}
4920
4921static void monitorCommand(redisClient *c) {
4922 /* ignore MONITOR if aleady slave or in monitor mode */
4923 if (c->flags & REDIS_SLAVE) return;
4924
4925 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4926 c->slaveseldb = 0;
4927 listAddNodeTail(server.monitors,c);
4928 addReply(c,shared.ok);
4929}
4930
4931/* ================================= Expire ================================= */
4932static int removeExpire(redisDb *db, robj *key) {
4933 if (dictDelete(db->expires,key) == DICT_OK) {
4934 return 1;
4935 } else {
4936 return 0;
4937 }
4938}
4939
4940static int setExpire(redisDb *db, robj *key, time_t when) {
4941 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4942 return 0;
4943 } else {
4944 incrRefCount(key);
4945 return 1;
4946 }
4947}
4948
4949/* Return the expire time of the specified key, or -1 if no expire
4950 * is associated with this key (i.e. the key is non volatile) */
4951static time_t getExpire(redisDb *db, robj *key) {
4952 dictEntry *de;
4953
4954 /* No expire? return ASAP */
4955 if (dictSize(db->expires) == 0 ||
4956 (de = dictFind(db->expires,key)) == NULL) return -1;
4957
4958 return (time_t) dictGetEntryVal(de);
4959}
4960
4961static int expireIfNeeded(redisDb *db, robj *key) {
4962 time_t when;
4963 dictEntry *de;
4964
4965 /* No expire? return ASAP */
4966 if (dictSize(db->expires) == 0 ||
4967 (de = dictFind(db->expires,key)) == NULL) return 0;
4968
4969 /* Lookup the expire */
4970 when = (time_t) dictGetEntryVal(de);
4971 if (time(NULL) <= when) return 0;
4972
4973 /* Delete the key */
4974 dictDelete(db->expires,key);
4975 return dictDelete(db->dict,key) == DICT_OK;
4976}
4977
4978static int deleteIfVolatile(redisDb *db, robj *key) {
4979 dictEntry *de;
4980
4981 /* No expire? return ASAP */
4982 if (dictSize(db->expires) == 0 ||
4983 (de = dictFind(db->expires,key)) == NULL) return 0;
4984
4985 /* Delete the key */
4986 server.dirty++;
4987 dictDelete(db->expires,key);
4988 return dictDelete(db->dict,key) == DICT_OK;
4989}
4990
4991static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
4992 dictEntry *de;
4993
4994 de = dictFind(c->db->dict,key);
4995 if (de == NULL) {
4996 addReply(c,shared.czero);
4997 return;
4998 }
4999 if (seconds < 0) {
5000 if (deleteKey(c->db,key)) server.dirty++;
5001 addReply(c, shared.cone);
5002 return;
5003 } else {
5004 time_t when = time(NULL)+seconds;
5005 if (setExpire(c->db,key,when)) {
5006 addReply(c,shared.cone);
5007 server.dirty++;
5008 } else {
5009 addReply(c,shared.czero);
5010 }
5011 return;
5012 }
5013}
5014
5015static void expireCommand(redisClient *c) {
5016 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5017}
5018
5019static void expireatCommand(redisClient *c) {
5020 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5021}
5022
5023static void ttlCommand(redisClient *c) {
5024 time_t expire;
5025 int ttl = -1;
5026
5027 expire = getExpire(c->db,c->argv[1]);
5028 if (expire != -1) {
5029 ttl = (int) (expire-time(NULL));
5030 if (ttl < 0) ttl = -1;
5031 }
5032 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5033}
5034
5035static void msetGenericCommand(redisClient *c, int nx) {
5036 int j;
5037
5038 if ((c->argc % 2) == 0) {
5039 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
5040 return;
5041 }
5042 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
5043 * set nothing at all if at least one already key exists. */
5044 if (nx) {
5045 for (j = 1; j < c->argc; j += 2) {
5046 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
5047 addReply(c, shared.czero);
5048 return;
5049 }
5050 }
5051 }
5052
5053 for (j = 1; j < c->argc; j += 2) {
5054 int retval;
5055
5056 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
5057 if (retval == DICT_ERR) {
5058 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
5059 incrRefCount(c->argv[j+1]);
5060 } else {
5061 incrRefCount(c->argv[j]);
5062 incrRefCount(c->argv[j+1]);
5063 }
5064 removeExpire(c->db,c->argv[j]);
5065 }
5066 server.dirty += (c->argc-1)/2;
5067 addReply(c, nx ? shared.cone : shared.ok);
5068}
5069
5070static void msetCommand(redisClient *c) {
5071 msetGenericCommand(c,0);
5072}
5073
5074static void msetnxCommand(redisClient *c) {
5075 msetGenericCommand(c,1);
5076}
5077
5078/* =============================== Replication ============================= */
5079
5080static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5081 ssize_t nwritten, ret = size;
5082 time_t start = time(NULL);
5083
5084 timeout++;
5085 while(size) {
5086 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5087 nwritten = write(fd,ptr,size);
5088 if (nwritten == -1) return -1;
5089 ptr += nwritten;
5090 size -= nwritten;
5091 }
5092 if ((time(NULL)-start) > timeout) {
5093 errno = ETIMEDOUT;
5094 return -1;
5095 }
5096 }
5097 return ret;
5098}
5099
5100static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5101 ssize_t nread, totread = 0;
5102 time_t start = time(NULL);
5103
5104 timeout++;
5105 while(size) {
5106 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5107 nread = read(fd,ptr,size);
5108 if (nread == -1) return -1;
5109 ptr += nread;
5110 size -= nread;
5111 totread += nread;
5112 }
5113 if ((time(NULL)-start) > timeout) {
5114 errno = ETIMEDOUT;
5115 return -1;
5116 }
5117 }
5118 return totread;
5119}
5120
/* Read a line from 'fd' into the buffer 'ptr' of 'size' bytes, one byte
 * at a time via syncRead() (the timeout applies to every single byte).
 *
 * The terminating "\n" or "\r\n" is not stored, and the buffer is kept
 * null terminated after every stored byte. At most size-1 bytes are
 * stored: if the line is longer the function returns what was read so far.
 * Returns the number of bytes stored, or -1 if syncRead() fails. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    size--; /* Reserve room for the null term. */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* One byte less available: without this the loop never stops on
         * long lines and writes past the end of the buffer. */
        size--;
    }
    return nread;
}
5141
5142static void syncCommand(redisClient *c) {
5143 /* ignore SYNC if aleady slave or in monitor mode */
5144 if (c->flags & REDIS_SLAVE) return;
5145
5146 /* SYNC can't be issued when the server has pending data to send to
5147 * the client about already issued commands. We need a fresh reply
5148 * buffer registering the differences between the BGSAVE and the current
5149 * dataset, so that we can copy to other slaves if needed. */
5150 if (listLength(c->reply) != 0) {
5151 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5152 return;
5153 }
5154
5155 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5156 /* Here we need to check if there is a background saving operation
5157 * in progress, or if it is required to start one */
5158 if (server.bgsaveinprogress) {
5159 /* Ok a background save is in progress. Let's check if it is a good
5160 * one for replication, i.e. if there is another slave that is
5161 * registering differences since the server forked to save */
5162 redisClient *slave;
5163 listNode *ln;
5164
5165 listRewind(server.slaves);
5166 while((ln = listYield(server.slaves))) {
5167 slave = ln->value;
5168 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5169 }
5170 if (ln) {
5171 /* Perfect, the server is already registering differences for
5172 * another slave. Set the right state, and copy the buffer. */
5173 listRelease(c->reply);
5174 c->reply = listDup(slave->reply);
5175 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5176 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5177 } else {
5178 /* No way, we need to wait for the next BGSAVE in order to
5179 * register differences */
5180 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5181 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5182 }
5183 } else {
5184 /* Ok we don't have a BGSAVE in progress, let's start one */
5185 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5186 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5187 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5188 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5189 return;
5190 }
5191 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5192 }
5193 c->repldbfd = -1;
5194 c->flags |= REDIS_SLAVE;
5195 c->slaveseldb = 0;
5196 listAddNodeTail(server.slaves,c);
5197 return;
5198}
5199
5200static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5201 redisClient *slave = privdata;
5202 REDIS_NOTUSED(el);
5203 REDIS_NOTUSED(mask);
5204 char buf[REDIS_IOBUF_LEN];
5205 ssize_t nwritten, buflen;
5206
5207 if (slave->repldboff == 0) {
5208 /* Write the bulk write count before to transfer the DB. In theory here
5209 * we don't know how much room there is in the output buffer of the
5210 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5211 * operations) will never be smaller than the few bytes we need. */
5212 sds bulkcount;
5213
5214 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5215 slave->repldbsize);
5216 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5217 {
5218 sdsfree(bulkcount);
5219 freeClient(slave);
5220 return;
5221 }
5222 sdsfree(bulkcount);
5223 }
5224 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5225 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5226 if (buflen <= 0) {
5227 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5228 (buflen == 0) ? "premature EOF" : strerror(errno));
5229 freeClient(slave);
5230 return;
5231 }
5232 if ((nwritten = write(fd,buf,buflen)) == -1) {
5233 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5234 strerror(errno));
5235 freeClient(slave);
5236 return;
5237 }
5238 slave->repldboff += nwritten;
5239 if (slave->repldboff == slave->repldbsize) {
5240 close(slave->repldbfd);
5241 slave->repldbfd = -1;
5242 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5243 slave->replstate = REDIS_REPL_ONLINE;
5244 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5245 sendReplyToClient, slave, NULL) == AE_ERR) {
5246 freeClient(slave);
5247 return;
5248 }
5249 addReplySds(slave,sdsempty());
5250 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5251 }
5252}
5253
5254/* This function is called at the end of every backgrond saving.
5255 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5256 * otherwise REDIS_ERR is passed to the function.
5257 *
5258 * The goal of this function is to handle slaves waiting for a successful
5259 * background saving in order to perform non-blocking synchronization. */
5260static void updateSlavesWaitingBgsave(int bgsaveerr) {
5261 listNode *ln;
5262 int startbgsave = 0;
5263
5264 listRewind(server.slaves);
5265 while((ln = listYield(server.slaves))) {
5266 redisClient *slave = ln->value;
5267
5268 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5269 startbgsave = 1;
5270 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5271 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5272 struct redis_stat buf;
5273
5274 if (bgsaveerr != REDIS_OK) {
5275 freeClient(slave);
5276 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5277 continue;
5278 }
5279 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5280 redis_fstat(slave->repldbfd,&buf) == -1) {
5281 freeClient(slave);
5282 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5283 continue;
5284 }
5285 slave->repldboff = 0;
5286 slave->repldbsize = buf.st_size;
5287 slave->replstate = REDIS_REPL_SEND_BULK;
5288 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5289 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
5290 freeClient(slave);
5291 continue;
5292 }
5293 }
5294 }
5295 if (startbgsave) {
5296 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5297 listRewind(server.slaves);
5298 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5299 while((ln = listYield(server.slaves))) {
5300 redisClient *slave = ln->value;
5301
5302 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5303 freeClient(slave);
5304 }
5305 }
5306 }
5307}
5308
5309static int syncWithMaster(void) {
5310 char buf[1024], tmpfile[256], authcmd[1024];
5311 int dumpsize;
5312 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5313 int dfd;
5314
5315 if (fd == -1) {
5316 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5317 strerror(errno));
5318 return REDIS_ERR;
5319 }
5320
5321 /* AUTH with the master if required. */
5322 if(server.masterauth) {
5323 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5324 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5325 close(fd);
5326 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5327 strerror(errno));
5328 return REDIS_ERR;
5329 }
5330 /* Read the AUTH result. */
5331 if (syncReadLine(fd,buf,1024,3600) == -1) {
5332 close(fd);
5333 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5334 strerror(errno));
5335 return REDIS_ERR;
5336 }
5337 if (buf[0] != '+') {
5338 close(fd);
5339 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5340 return REDIS_ERR;
5341 }
5342 }
5343
5344 /* Issue the SYNC command */
5345 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5346 close(fd);
5347 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5348 strerror(errno));
5349 return REDIS_ERR;
5350 }
5351 /* Read the bulk write count */
5352 if (syncReadLine(fd,buf,1024,3600) == -1) {
5353 close(fd);
5354 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5355 strerror(errno));
5356 return REDIS_ERR;
5357 }
5358 if (buf[0] != '$') {
5359 close(fd);
5360 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5361 return REDIS_ERR;
5362 }
5363 dumpsize = atoi(buf+1);
5364 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5365 /* Read the bulk write data on a temp file */
5366 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5367 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5368 if (dfd == -1) {
5369 close(fd);
5370 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5371 return REDIS_ERR;
5372 }
5373 while(dumpsize) {
5374 int nread, nwritten;
5375
5376 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5377 if (nread == -1) {
5378 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5379 strerror(errno));
5380 close(fd);
5381 close(dfd);
5382 return REDIS_ERR;
5383 }
5384 nwritten = write(dfd,buf,nread);
5385 if (nwritten == -1) {
5386 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5387 close(fd);
5388 close(dfd);
5389 return REDIS_ERR;
5390 }
5391 dumpsize -= nread;
5392 }
5393 close(dfd);
5394 if (rename(tmpfile,server.dbfilename) == -1) {
5395 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5396 unlink(tmpfile);
5397 close(fd);
5398 return REDIS_ERR;
5399 }
5400 emptyDb();
5401 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5402 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5403 close(fd);
5404 return REDIS_ERR;
5405 }
5406 server.master = createClient(fd);
5407 server.master->flags |= REDIS_MASTER;
5408 server.replstate = REDIS_REPL_CONNECTED;
5409 return REDIS_OK;
5410}
5411
5412static void slaveofCommand(redisClient *c) {
5413 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5414 !strcasecmp(c->argv[2]->ptr,"one")) {
5415 if (server.masterhost) {
5416 sdsfree(server.masterhost);
5417 server.masterhost = NULL;
5418 if (server.master) freeClient(server.master);
5419 server.replstate = REDIS_REPL_NONE;
5420 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5421 }
5422 } else {
5423 sdsfree(server.masterhost);
5424 server.masterhost = sdsdup(c->argv[1]->ptr);
5425 server.masterport = atoi(c->argv[2]->ptr);
5426 if (server.master) freeClient(server.master);
5427 server.replstate = REDIS_REPL_CONNECT;
5428 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5429 server.masterhost, server.masterport);
5430 }
5431 addReply(c,shared.ok);
5432}
5433
5434/* ============================ Maxmemory directive ======================== */
5435
5436/* This function gets called when 'maxmemory' is set on the config file to limit
5437 * the max memory used by the server, and we are out of memory.
5438 * This function will try to, in order:
5439 *
5440 * - Free objects from the free list
5441 * - Try to remove keys with an EXPIRE set
5442 *
5443 * It is not possible to free enough memory to reach used-memory < maxmemory
5444 * the server will start refusing commands that will enlarge even more the
5445 * memory usage.
5446 */
5447static void freeMemoryIfNeeded(void) {
5448 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5449 if (listLength(server.objfreelist)) {
5450 robj *o;
5451
5452 listNode *head = listFirst(server.objfreelist);
5453 o = listNodeValue(head);
5454 listDelNode(server.objfreelist,head);
5455 zfree(o);
5456 } else {
5457 int j, k, freed = 0;
5458
5459 for (j = 0; j < server.dbnum; j++) {
5460 int minttl = -1;
5461 robj *minkey = NULL;
5462 struct dictEntry *de;
5463
5464 if (dictSize(server.db[j].expires)) {
5465 freed = 1;
5466 /* From a sample of three keys drop the one nearest to
5467 * the natural expire */
5468 for (k = 0; k < 3; k++) {
5469 time_t t;
5470
5471 de = dictGetRandomKey(server.db[j].expires);
5472 t = (time_t) dictGetEntryVal(de);
5473 if (minttl == -1 || t < minttl) {
5474 minkey = dictGetEntryKey(de);
5475 minttl = t;
5476 }
5477 }
5478 deleteKey(server.db+j,minkey);
5479 }
5480 }
5481 if (!freed) return; /* nothing to free... */
5482 }
5483 }
5484}
5485
5486/* ============================== Append Only file ========================== */
5487
5488static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5489 sds buf = sdsempty();
5490 int j;
5491 ssize_t nwritten;
5492 time_t now;
5493 robj *tmpargv[3];
5494
5495 /* The DB this command was targetting is not the same as the last command
5496 * we appendend. To issue a SELECT command is needed. */
5497 if (dictid != server.appendseldb) {
5498 char seldb[64];
5499
5500 snprintf(seldb,sizeof(seldb),"%d",dictid);
5501 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
5502 strlen(seldb),seldb);
5503 server.appendseldb = dictid;
5504 }
5505
5506 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5507 * EXPIREs into EXPIREATs calls */
5508 if (cmd->proc == expireCommand) {
5509 long when;
5510
5511 tmpargv[0] = createStringObject("EXPIREAT",8);
5512 tmpargv[1] = argv[1];
5513 incrRefCount(argv[1]);
5514 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5515 tmpargv[2] = createObject(REDIS_STRING,
5516 sdscatprintf(sdsempty(),"%ld",when));
5517 argv = tmpargv;
5518 }
5519
5520 /* Append the actual command */
5521 buf = sdscatprintf(buf,"*%d\r\n",argc);
5522 for (j = 0; j < argc; j++) {
5523 robj *o = argv[j];
5524
5525 if (o->encoding != REDIS_ENCODING_RAW)
5526 o = getDecodedObject(o);
5527 buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr));
5528 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5529 buf = sdscatlen(buf,"\r\n",2);
5530 if (o != argv[j])
5531 decrRefCount(o);
5532 }
5533
5534 /* Free the objects from the modified argv for EXPIREAT */
5535 if (cmd->proc == expireCommand) {
5536 for (j = 0; j < 3; j++)
5537 decrRefCount(argv[j]);
5538 }
5539
5540 /* We want to perform a single write. This should be guaranteed atomic
5541 * at least if the filesystem we are writing is a real physical one.
5542 * While this will save us against the server being killed I don't think
5543 * there is much to do about the whole server stopping for power problems
5544 * or alike */
5545 nwritten = write(server.appendfd,buf,sdslen(buf));
5546 if (nwritten != (signed)sdslen(buf)) {
5547 /* Ooops, we are in troubles. The best thing to do for now is
5548 * to simply exit instead to give the illusion that everything is
5549 * working as expected. */
5550 if (nwritten == -1) {
5551 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5552 } else {
5553 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5554 }
5555 exit(1);
5556 }
5557 now = time(NULL);
5558 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5559 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5560 now-server.lastfsync > 1))
5561 {
5562 fsync(server.appendfd); /* Let's try to get this data on the disk */
5563 server.lastfsync = now;
5564 }
5565}
5566
5567/* In Redis commands are always executed in the context of a client, so in
5568 * order to load the append only file we need to create a fake client. */
5569static struct redisClient *createFakeClient(void) {
5570 struct redisClient *c = zmalloc(sizeof(*c));
5571
5572 selectDb(c,0);
5573 c->fd = -1;
5574 c->querybuf = sdsempty();
5575 c->argc = 0;
5576 c->argv = NULL;
5577 c->flags = 0;
5578 /* We set the fake client as a slave waiting for the synchronization
5579 * so that Redis will not try to send replies to this client. */
5580 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5581 c->reply = listCreate();
5582 listSetFreeMethod(c->reply,decrRefCount);
5583 listSetDupMethod(c->reply,dupClientReplyValue);
5584 return c;
5585}
5586
5587static void freeFakeClient(struct redisClient *c) {
5588 sdsfree(c->querybuf);
5589 listRelease(c->reply);
5590 zfree(c);
5591}
5592
5593/* Replay the append log file. On error REDIS_OK is returned. On non fatal
5594 * error (the append only file is zero-length) REDIS_ERR is returned. On
5595 * fatal error an error message is logged and the program exists. */
5596int loadAppendOnlyFile(char *filename) {
5597 struct redisClient *fakeClient;
5598 FILE *fp = fopen(filename,"r");
5599 struct redis_stat sb;
5600
5601 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5602 return REDIS_ERR;
5603
5604 if (fp == NULL) {
5605 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5606 exit(1);
5607 }
5608
5609 fakeClient = createFakeClient();
5610 while(1) {
5611 int argc, j;
5612 unsigned long len;
5613 robj **argv;
5614 char buf[128];
5615 sds argsds;
5616 struct redisCommand *cmd;
5617
5618 if (fgets(buf,sizeof(buf),fp) == NULL) {
5619 if (feof(fp))
5620 break;
5621 else
5622 goto readerr;
5623 }
5624 if (buf[0] != '*') goto fmterr;
5625 argc = atoi(buf+1);
5626 argv = zmalloc(sizeof(robj*)*argc);
5627 for (j = 0; j < argc; j++) {
5628 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5629 if (buf[0] != '$') goto fmterr;
5630 len = strtol(buf+1,NULL,10);
5631 argsds = sdsnewlen(NULL,len);
5632 if (fread(argsds,len,1,fp) == 0) goto fmterr;
5633 argv[j] = createObject(REDIS_STRING,argsds);
5634 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5635 }
5636
5637 /* Command lookup */
5638 cmd = lookupCommand(argv[0]->ptr);
5639 if (!cmd) {
5640 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5641 exit(1);
5642 }
5643 /* Try object sharing and encoding */
5644 if (server.shareobjects) {
5645 int j;
5646 for(j = 1; j < argc; j++)
5647 argv[j] = tryObjectSharing(argv[j]);
5648 }
5649 if (cmd->flags & REDIS_CMD_BULK)
5650 tryObjectEncoding(argv[argc-1]);
5651 /* Run the command in the context of a fake client */
5652 fakeClient->argc = argc;
5653 fakeClient->argv = argv;
5654 cmd->proc(fakeClient);
5655 /* Discard the reply objects list from the fake client */
5656 while(listLength(fakeClient->reply))
5657 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5658 /* Clean up, ready for the next command */
5659 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5660 zfree(argv);
5661 }
5662 fclose(fp);
5663 freeFakeClient(fakeClient);
5664 return REDIS_OK;
5665
5666readerr:
5667 if (feof(fp)) {
5668 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5669 } else {
5670 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5671 }
5672 exit(1);
5673fmterr:
5674 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5675 exit(1);
5676}
5677
5678/* ================================= Debugging ============================== */
5679
5680static void debugCommand(redisClient *c) {
5681 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
5682 *((char*)-1) = 'x';
5683 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
5684 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
5685 robj *key, *val;
5686
5687 if (!de) {
5688 addReply(c,shared.nokeyerr);
5689 return;
5690 }
5691 key = dictGetEntryKey(de);
5692 val = dictGetEntryVal(de);
5693 addReplySds(c,sdscatprintf(sdsempty(),
5694 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
5695 key, key->refcount, val, val->refcount, val->encoding));
5696 } else {
5697 addReplySds(c,sdsnew(
5698 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
5699 }
5700}
5701
5702/* =================================== Main! ================================ */
5703
5704#ifdef __linux__
/* Return the integer found on the first line of
 * /proc/sys/vm/overcommit_memory, as parsed by atoi(), or -1 if the file
 * can't be opened or no line can be read from it. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return -1;
    if (fgets(line,64,procfile) != NULL) value = atoi(line);
    fclose(procfile);
    return value;
}
5718
5719void linuxOvercommitMemoryWarning(void) {
5720 if (linuxOvercommitMemoryValue() == 0) {
5721 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5722 }
5723}
5724#endif /* __linux__ */
5725
5726static void daemonize(void) {
5727 int fd;
5728 FILE *fp;
5729
5730 if (fork() != 0) exit(0); /* parent exits */
5731 setsid(); /* create a new session */
5732
5733 /* Every output goes to /dev/null. If Redis is daemonized but
5734 * the 'logfile' is set to 'stdout' in the configuration file
5735 * it will not log at all. */
5736 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5737 dup2(fd, STDIN_FILENO);
5738 dup2(fd, STDOUT_FILENO);
5739 dup2(fd, STDERR_FILENO);
5740 if (fd > STDERR_FILENO) close(fd);
5741 }
5742 /* Try to write the pid file */
5743 fp = fopen(server.pidfile,"w");
5744 if (fp) {
5745 fprintf(fp,"%d\n",getpid());
5746 fclose(fp);
5747 }
5748}
5749
5750int main(int argc, char **argv) {
5751 initServerConfig();
5752 if (argc == 2) {
5753 resetServerSaveParams();
5754 loadServerConfig(argv[1]);
5755 } else if (argc > 2) {
5756 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5757 exit(1);
5758 } else {
5759 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5760 }
5761 initServer();
5762 if (server.daemonize) daemonize();
5763 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5764#ifdef __linux__
5765 linuxOvercommitMemoryWarning();
5766#endif
5767 if (server.appendonly) {
5768 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
5769 redisLog(REDIS_NOTICE,"DB loaded from append only file");
5770 } else {
5771 if (rdbLoad(server.dbfilename) == REDIS_OK)
5772 redisLog(REDIS_NOTICE,"DB loaded from disk");
5773 }
5774 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5775 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5776 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5777 aeMain(server.el);
5778 aeDeleteEventLoop(server.el);
5779 return 0;
5780}
5781
5782/* ============================= Backtrace support ========================= */
5783
5784#ifdef HAVE_BACKTRACE
5785static char *findFuncName(void *pointer, unsigned long *offset);
5786
5787static void *getMcontextEip(ucontext_t *uc) {
5788#if defined(__FreeBSD__)
5789 return (void*) uc->uc_mcontext.mc_eip;
5790#elif defined(__dietlibc__)
5791 return (void*) uc->uc_mcontext.eip;
5792#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
5793 return (void*) uc->uc_mcontext->__ss.__eip;
5794#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
5795 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
5796 return (void*) uc->uc_mcontext->__ss.__rip;
5797 #else
5798 return (void*) uc->uc_mcontext->__ss.__eip;
5799 #endif
5800#elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
5801 return (void*) uc->uc_mcontext.gregs[REG_EIP];
5802#elif defined(__ia64__) /* Linux IA64 */
5803 return (void*) uc->uc_mcontext.sc_ip;
5804#else
5805 return NULL;
5806#endif
5807}
5808
5809static void segvHandler(int sig, siginfo_t *info, void *secret) {
5810 void *trace[100];
5811 char **messages = NULL;
5812 int i, trace_size = 0;
5813 unsigned long offset=0;
5814 time_t uptime = time(NULL)-server.stat_starttime;
5815 ucontext_t *uc = (ucontext_t*) secret;
5816 REDIS_NOTUSED(info);
5817
5818 redisLog(REDIS_WARNING,
5819 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5820 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5821 "redis_version:%s; "
5822 "uptime_in_seconds:%d; "
5823 "connected_clients:%d; "
5824 "connected_slaves:%d; "
5825 "used_memory:%zu; "
5826 "changes_since_last_save:%lld; "
5827 "bgsave_in_progress:%d; "
5828 "last_save_time:%d; "
5829 "total_connections_received:%lld; "
5830 "total_commands_processed:%lld; "
5831 "role:%s;"
5832 ,REDIS_VERSION,
5833 uptime,
5834 listLength(server.clients)-listLength(server.slaves),
5835 listLength(server.slaves),
5836 server.usedmemory,
5837 server.dirty,
5838 server.bgsaveinprogress,
5839 server.lastsave,
5840 server.stat_numconnections,
5841 server.stat_numcommands,
5842 server.masterhost == NULL ? "master" : "slave"
5843 ));
5844
5845 trace_size = backtrace(trace, 100);
5846 /* overwrite sigaction with caller's address */
5847 if (getMcontextEip(uc) != NULL) {
5848 trace[1] = getMcontextEip(uc);
5849 }
5850 messages = backtrace_symbols(trace, trace_size);
5851
5852 for (i=1; i<trace_size; ++i) {
5853 char *fn = findFuncName(trace[i], &offset), *p;
5854
5855 p = strchr(messages[i],'+');
5856 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5857 redisLog(REDIS_WARNING,"%s", messages[i]);
5858 } else {
5859 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5860 }
5861 }
5862 free(messages);
5863 exit(0);
5864}
5865
5866static void setupSigSegvAction(void) {
5867 struct sigaction act;
5868
5869 sigemptyset (&act.sa_mask);
5870 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5871 * is used. Otherwise, sa_handler is used */
5872 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5873 act.sa_sigaction = segvHandler;
5874 sigaction (SIGSEGV, &act, NULL);
5875 sigaction (SIGBUS, &act, NULL);
5876 sigaction (SIGFPE, &act, NULL);
5877 sigaction (SIGILL, &act, NULL);
5878 sigaction (SIGBUS, &act, NULL);
5879 return;
5880}
5881
5882#include "staticsymbols.h"
5883/* This function try to convert a pointer into a function name. It's used in
5884 * oreder to provide a backtrace under segmentation fault that's able to
5885 * display functions declared as static (otherwise the backtrace is useless). */
5886static char *findFuncName(void *pointer, unsigned long *offset){
5887 int i, ret = -1;
5888 unsigned long off, minoff = 0;
5889
5890 /* Try to match against the Symbol with the smallest offset */
5891 for (i=0; symsTable[i].pointer; i++) {
5892 unsigned long lp = (unsigned long) pointer;
5893
5894 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
5895 off=lp-symsTable[i].pointer;
5896 if (ret < 0 || off < minoff) {
5897 minoff=off;
5898 ret=i;
5899 }
5900 }
5901 }
5902 if (ret == -1) return NULL;
5903 *offset = minoff;
5904 return symsTable[ret].name;
5905}
5906#else /* HAVE_BACKTRACE */
/* Without backtrace support no signal handler is installed: this is a
 * no-op. */
static void setupSigSegvAction(void) {
    return;
}
5909#endif /* HAVE_BACKTRACE */
5910
5911
5912
5913/* The End */
5914
5915
5916