]> git.saurik.com Git - redis.git/blob - redis.c
52143e56735e271dd0a909239530a2192ea3284f
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string of this Redis source tree. */
#define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout, in seconds */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
#define REDIS_WRITEV_THRESHOLD 3
/* Max number of iovecs used for each writev call */
#define REDIS_WRITEV_IOVEC_COUNT 256

/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */

/* Command flags */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4

/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. Storing every length as a 32 bit
 * integer would waste a lot of space for short keys, so the two most
 * significant bits of the first byte select how the length is encoded:
 *
 * 00|000000 => the length is the low 6 bits of this byte
 * 01|000000 00000000 => the length is 14 bits: the low 6 bits of this byte
 *                       plus the 8 bits of the next byte
 * 10|000000 [32 bit integer] => a full 32 bit length follows
 * 11|000000 => a specially encoded object follows. The low six bits
 *              specify the kind of object that follows.
 *              See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When the length of a string object stored on disk has the first two bits
 * set (REDIS_RDB_ENCVAL), the remaining bits specify a special encoding for
 * the object according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */

/* Client flags (bit mask stored in redisClient.flags) */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_ASC 1
#define REDIS_SORT_DESC 2
#define REDIS_SORTKEY_MAX 1024

/* Log levels. redisLog() also uses the level as an index into its
 * three-character level marker string, so the values must stay 0..2. */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro: marks a variable or parameter as intentionally unused */
#define REDIS_NOTUSED(V) ((void) V)

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */

/* Append only defines: values for redisServer.appendfsync */
#define APPENDFSYNC_NO 0
#define APPENDFSYNC_ALWAYS 1
#define APPENDFSYNC_EVERYSEC 2
201 /*================================= Data types ============================== */
202
/* A redis object: a typed wrapper around the actual value, that is a type
 * able to hold a string / list / set. The refcount field is managed through
 * incrRefCount() / decrRefCount(). */
typedef struct redisObject {
    void *ptr;                  /* the wrapped value; for strings in RAW
                                 * encoding the code treats it as an sds */
    unsigned char type;         /* one of the "Object types" defines
                                 * (REDIS_STRING, REDIS_LIST, ...) */
    unsigned char encoding;     /* one of the REDIS_ENCODING_* defines */
    unsigned char notused[2];   /* not used, as the name says */
    int refcount;               /* number of references to this object */
} robj;
211
/* One logical database: the server holds an array of server.dbnum of
 * these in server.db. */
typedef struct redisDb {
    dict *dict;     /* the keys stored in this DB */
    dict *expires;  /* entries counted as "volatile" keys by the cron log;
                     * presumably key -> expire time, confirm in setExpire() */
    int id;
} redisDb;
217
218 /* With multiplexing we need to take per-clinet state.
219 * Clients are taken in a liked list. */
220 typedef struct redisClient {
221 int fd;
222 redisDb *db;
223 int dictid;
224 sds querybuf;
225 robj **argv, **mbargv;
226 int argc, mbargc;
227 int bulklen; /* bulk read len. -1 if not in bulk read mode */
228 int multibulk; /* multi bulk command format active */
229 list *reply;
230 int sentlen;
231 time_t lastinteraction; /* time of the last interaction, used for timeout */
232 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
233 int slaveseldb; /* slave selected db, if this client is a slave */
234 int authenticated; /* when requirepass is non-NULL */
235 int replstate; /* replication state if this is a slave */
236 int repldbfd; /* replication DB file descriptor */
237 long repldboff; /* replication DB file offset */
238 off_t repldbsize; /* replication DB file size */
239 } redisClient;
240
/* One automatic-save rule. The cron function starts a background save when
 * at least 'changes' modifications are pending (server.dirty) and more than
 * 'seconds' seconds elapsed since the last successful save. */
struct saveparam {
    time_t seconds;
    int changes;
};
245
246 /* Global server state structure */
247 struct redisServer {
248 int port;
249 int fd;
250 redisDb *db;
251 dict *sharingpool;
252 unsigned int sharingpoolsize;
253 long long dirty; /* changes to DB from the last save */
254 list *clients;
255 list *slaves, *monitors;
256 char neterr[ANET_ERR_LEN];
257 aeEventLoop *el;
258 int cronloops; /* number of times the cron function run */
259 list *objfreelist; /* A list of freed objects to avoid malloc() */
260 time_t lastsave; /* Unix time of last save succeeede */
261 size_t usedmemory; /* Used memory in megabytes */
262 /* Fields used only for stats */
263 time_t stat_starttime; /* server start time */
264 long long stat_numcommands; /* number of processed commands */
265 long long stat_numconnections; /* number of connections received */
266 /* Configuration */
267 int verbosity;
268 int glueoutputbuf;
269 int maxidletime;
270 int dbnum;
271 int daemonize;
272 int appendonly;
273 int appendfsync;
274 time_t lastfsync;
275 int appendfd;
276 int appendseldb;
277 char *pidfile;
278 int bgsaveinprogress;
279 pid_t bgsavechildpid;
280 struct saveparam *saveparams;
281 int saveparamslen;
282 char *logfile;
283 char *bindaddr;
284 char *dbfilename;
285 char *appendfilename;
286 char *requirepass;
287 int shareobjects;
288 /* Replication related */
289 int isslave;
290 char *masterauth;
291 char *masterhost;
292 int masterport;
293 redisClient *master; /* client that is master for this slave */
294 int replstate;
295 unsigned int maxclients;
296 unsigned long maxmemory;
297 /* Sort parameters - qsort_r() is only available under BSD so we
298 * have to take this state global, in order to pass it to sortCompare() */
299 int sort_desc;
300 int sort_alpha;
301 int sort_bypattern;
302 };
303
/* Signature shared by every command implementation. */
typedef void redisCommandProc(redisClient *c);

/* One entry of the command table (see cmdTable). */
struct redisCommand {
    char *name;             /* command name as listed in cmdTable */
    redisCommandProc *proc; /* function implementing the command */
    int arity;              /* NOTE(review): the table uses negative values for
                             * variadic commands, presumably "at least -arity
                             * arguments" - confirm in processCommand() */
    int flags;              /* bit mask of the REDIS_CMD_* defines */
};
311
/* A (name, address) pair describing a function symbol.
 * NOTE(review): presumably used to map addresses back to function names
 * when printing a backtrace - confirm against the users of this struct. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};
316
/* An element being sorted together with its sort key: either a numeric
 * score or an object to compare against.
 * NOTE(review): which union member is valid presumably depends on the
 * server.sort_alpha / sort_bypattern settings - confirm in sortCommand(). */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;
        robj *cmpobj;
    } u;
} redisSortObject;
324
/* A single operation requested to SORT, with the pattern it applies to. */
typedef struct _redisSortOperation {
    int type;       /* presumably one of the REDIS_SORT_* defines - confirm */
    robj *pattern;
} redisSortOperation;
329
/* ZSETs use a specialized version of Skiplists */

/* A skiplist node: an object and its score, an array of forward pointers
 * and a single backward pointer. */
typedef struct zskiplistNode {
    struct zskiplistNode **forward;
    struct zskiplistNode *backward;
    double score;
    robj *obj;
} zskiplistNode;
338
/* The skiplist itself: pointers to both ends plus bookkeeping. */
typedef struct zskiplist {
    struct zskiplistNode *header;
    struct zskiplistNode *tail;
    unsigned long length;
    int level;
} zskiplist;
344
/* A sorted set is made of a hash table plus a skiplist. */
typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;
349
350 /* Our shared "common" objects */
351
352 struct sharedObjectsStruct {
353 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
354 *colon, *nullbulk, *nullmultibulk,
355 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
356 *outofrangeerr, *plus,
357 *select0, *select1, *select2, *select3, *select4,
358 *select5, *select6, *select7, *select8, *select9;
359 } shared;
360
/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */

static double R_Zero, R_PosInf, R_NegInf, R_Nan;
366
367 /*================================ Prototypes =============================== */
368
/* Forward declarations of internal helpers that are used before their
 * definition. */
static void freeStringObject(robj *o);
static void freeListObject(robj *o);
static void freeSetObject(robj *o);
static void decrRefCount(void *o);
static robj *createObject(int type, void *ptr);
static void freeClient(redisClient *c);
static int rdbLoad(char *filename);
static void addReply(redisClient *c, robj *obj);
static void addReplySds(redisClient *c, sds s);
static void incrRefCount(robj *o);
static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);
static int tryObjectEncoding(robj *o);
static robj *getDecodedObject(const robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static int deleteIfVolatile(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);
static void updateSlavesWaitingBgsave(int bgsaveerr);
static void freeMemoryIfNeeded(void);
static int processCommand(redisClient *c);
static void setupSigSegvAction(void);
static void rdbRemoveTempFile(pid_t childpid);
static size_t stringObjectLen(robj *o);
static void processInputBuffer(redisClient *c);
static zskiplist *zslCreate(void);
static void zslFree(zskiplist *zsl);
static void zslInsert(zskiplist *zsl, double score, robj *obj);
static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);

/* Command implementations referenced by the command table. They all share
 * the redisCommandProc signature. */
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
static void setCommand(redisClient *c);
static void setnxCommand(redisClient *c);
static void getCommand(redisClient *c);
static void delCommand(redisClient *c);
static void existsCommand(redisClient *c);
static void incrCommand(redisClient *c);
static void decrCommand(redisClient *c);
static void incrbyCommand(redisClient *c);
static void decrbyCommand(redisClient *c);
static void selectCommand(redisClient *c);
static void randomkeyCommand(redisClient *c);
static void keysCommand(redisClient *c);
static void dbsizeCommand(redisClient *c);
static void lastsaveCommand(redisClient *c);
static void saveCommand(redisClient *c);
static void bgsaveCommand(redisClient *c);
static void shutdownCommand(redisClient *c);
static void moveCommand(redisClient *c);
static void renameCommand(redisClient *c);
static void renamenxCommand(redisClient *c);
static void lpushCommand(redisClient *c);
static void rpushCommand(redisClient *c);
static void lpopCommand(redisClient *c);
static void rpopCommand(redisClient *c);
static void llenCommand(redisClient *c);
static void lindexCommand(redisClient *c);
static void lrangeCommand(redisClient *c);
static void ltrimCommand(redisClient *c);
static void typeCommand(redisClient *c);
static void lsetCommand(redisClient *c);
static void saddCommand(redisClient *c);
static void sremCommand(redisClient *c);
static void smoveCommand(redisClient *c);
static void sismemberCommand(redisClient *c);
static void scardCommand(redisClient *c);
static void spopCommand(redisClient *c);
static void srandmemberCommand(redisClient *c);
static void sinterCommand(redisClient *c);
static void sinterstoreCommand(redisClient *c);
static void sunionCommand(redisClient *c);
static void sunionstoreCommand(redisClient *c);
static void sdiffCommand(redisClient *c);
static void sdiffstoreCommand(redisClient *c);
static void syncCommand(redisClient *c);
static void flushdbCommand(redisClient *c);
static void flushallCommand(redisClient *c);
static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c);
/* Note the lowercase 'c' in "command": this spelling is the one used by the
 * command table, so it has to match the definition. */
static void rpoplpushcommand(redisClient *c);
static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
static void expireCommand(redisClient *c);
static void expireatCommand(redisClient *c);
static void getsetCommand(redisClient *c);
static void ttlCommand(redisClient *c);
static void slaveofCommand(redisClient *c);
static void debugCommand(redisClient *c);
static void msetCommand(redisClient *c);
static void msetnxCommand(redisClient *c);
static void zaddCommand(redisClient *c);
static void zrangeCommand(redisClient *c);
static void zrangebyscoreCommand(redisClient *c);
static void zrevrangeCommand(redisClient *c);
static void zcardCommand(redisClient *c);
static void zremCommand(redisClient *c);
static void zscoreCommand(redisClient *c);
static void zremrangebyscoreCommand(redisClient *c);
476
477 /*================================= Globals ================================= */
478
/* Global vars */
static struct redisServer server; /* server global state */

/* The command table. Every entry is {name, implementation, arity, flags}
 * where flags is a combination of the REDIS_CMD_* defines. The table is
 * terminated by an all-NULL/zero entry.
 * NOTE(review): negative arity values presumably mean "at least -arity
 * arguments" - confirm in processCommand().
 * Note that "smembers" is served by sinterCommand. */
static struct redisCommand cmdTable[] = {
    {"get",getCommand,2,REDIS_CMD_INLINE},
    {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"del",delCommand,-2,REDIS_CMD_INLINE},
    {"exists",existsCommand,2,REDIS_CMD_INLINE},
    {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
    {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
    {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
    {"llen",llenCommand,2,REDIS_CMD_INLINE},
    {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
    {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
    {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
    {"lrem",lremCommand,4,REDIS_CMD_BULK},
    {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK},
    {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"srem",sremCommand,3,REDIS_CMD_BULK},
    {"smove",smoveCommand,4,REDIS_CMD_BULK},
    {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
    {"scard",scardCommand,2,REDIS_CMD_INLINE},
    {"spop",spopCommand,2,REDIS_CMD_INLINE},
    {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
    {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
    {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"zrem",zremCommand,3,REDIS_CMD_BULK},
    {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
    {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
    {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
    {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
    {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
    {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
    {"select",selectCommand,2,REDIS_CMD_INLINE},
    {"move",moveCommand,3,REDIS_CMD_INLINE},
    {"rename",renameCommand,3,REDIS_CMD_INLINE},
    {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
    {"expire",expireCommand,3,REDIS_CMD_INLINE},
    {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
    {"keys",keysCommand,2,REDIS_CMD_INLINE},
    {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
    {"auth",authCommand,2,REDIS_CMD_INLINE},
    {"ping",pingCommand,1,REDIS_CMD_INLINE},
    {"echo",echoCommand,2,REDIS_CMD_BULK},
    {"save",saveCommand,1,REDIS_CMD_INLINE},
    {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
    {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
    {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
    {"type",typeCommand,2,REDIS_CMD_INLINE},
    {"sync",syncCommand,1,REDIS_CMD_INLINE},
    {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
    {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
    {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"info",infoCommand,1,REDIS_CMD_INLINE},
    {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
    {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
    {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
    {"debug",debugCommand,-2,REDIS_CMD_INLINE},
    {NULL,NULL,0,0}
};
556
557 /*============================ Utility functions ============================ */
558
/* Glob-style pattern matching.
 *
 * Matches the first 'stringLen' bytes of 'string' against the first
 * 'patternLen' bytes of 'pattern'. Both buffers are addressed by their
 * explicit (non-negative) length: neither has to be NUL terminated and no
 * byte outside of the given lengths is read.
 *
 * Supported syntax:
 *   *        matches any sequence of bytes, including the empty one
 *   ?        matches exactly one byte
 *   [abc]    matches one byte out of the listed set
 *   [^abc]   matches one byte NOT in the listed set
 *   [a-z]    matches one byte in the inclusive range; the bounds may be
 *            given in either order and are compared as unsigned bytes
 *   \x       matches the byte x literally, inside and outside of a class
 *
 * A class that is not closed by ']' extends to the end of the pattern.
 *
 * If 'nocase' is non-zero literal bytes and ranges are compared after
 * tolower(); an escaped byte inside a class is always compared exactly.
 *
 * Returns 1 if the whole string matches the whole pattern, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen) {
        switch (pattern[0]) {
        case '*':
            /* Collapse a run of '*' into a single one. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* trailing '*' matches whatever is left */
            /* Try to match the rest of the pattern at every position. */
            while (stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A class always consumes exactly one byte of the string. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so that the common
                     * pattern++ after the switch lands on the end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (patternLen >= 2 && pattern[0] == '\\') {
                    /* Escaped byte: always an exact comparison. A lone
                     * trailing backslash is handled as a literal below. */
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];

                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            /* Skip the backslash unless it is the last pattern byte, in
             * which case it matches itself. */
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            /* A literal needs one byte of the string to compare against. */
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* The string is over: only trailing '*' may still match. */
            while (patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
681
682 static void redisLog(int level, const char *fmt, ...) {
683 va_list ap;
684 FILE *fp;
685
686 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
687 if (!fp) return;
688
689 va_start(ap, fmt);
690 if (level >= server.verbosity) {
691 char *c = ".-*";
692 char buf[64];
693 time_t now;
694
695 now = time(NULL);
696 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
697 fprintf(fp,"%s %c ",buf,c[level]);
698 vfprintf(fp, fmt, ap);
699 fprintf(fp,"\n");
700 fflush(fp);
701 }
702 va_end(ap);
703
704 if (server.logfile) fclose(fp);
705 }
706
707 /*====================== Hash table type implementation ==================== */
708
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and redis objects as values (objects can hold SDS strings,
 * lists, sets). */
712
/* Dict destructor callback releasing the value with zfree().
 * 'privdata' is not used. */
static void dictVanillaFree(void *privdata, void *val)
{
    (void) privdata;
    zfree(val);
}
718
719 static int sdsDictKeyCompare(void *privdata, const void *key1,
720 const void *key2)
721 {
722 int l1,l2;
723 DICT_NOTUSED(privdata);
724
725 l1 = sdslen((sds)key1);
726 l2 = sdslen((sds)key2);
727 if (l1 != l2) return 0;
728 return memcmp(key1, key2, l1) == 0;
729 }
730
/* Dict destructor callback for redis objects: drops one reference from
 * 'val' via decrRefCount(). 'privdata' is not used. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    (void) privdata;
    decrRefCount(val);
}
737
738 static int dictObjKeyCompare(void *privdata, const void *key1,
739 const void *key2)
740 {
741 const robj *o1 = key1, *o2 = key2;
742 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
743 }
744
745 static unsigned int dictObjHash(const void *key) {
746 const robj *o = key;
747 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
748 }
749
750 static int dictEncObjKeyCompare(void *privdata, const void *key1,
751 const void *key2)
752 {
753 const robj *o1 = key1, *o2 = key2;
754
755 if (o1->encoding == REDIS_ENCODING_RAW &&
756 o2->encoding == REDIS_ENCODING_RAW)
757 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
758 else {
759 robj *dec1, *dec2;
760 int cmp;
761
762 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
763 getDecodedObject(o1) : (robj*)o1;
764 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
765 getDecodedObject(o2) : (robj*)o2;
766 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
767 if (dec1 != o1) decrRefCount(dec1);
768 if (dec2 != o2) decrRefCount(dec2);
769 return cmp;
770 }
771 }
772
773 static unsigned int dictEncObjHash(const void *key) {
774 const robj *o = key;
775
776 if (o->encoding == REDIS_ENCODING_RAW)
777 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
778 else {
779 robj *dec = getDecodedObject(o);
780 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
781 decrRefCount(dec);
782 return hash;
783 }
784 }
785
/* Dict type with redis objects as keys (possibly in a non-raw encoding,
 * see dictEncObjHash) and no managed values. Keys are released with
 * decrRefCount() when removed. */
static dictType setDictType = {
    dictEncObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictEncObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL /* val destructor */
};
794
/* Same as setDictType, but every value is a heap block released with
 * zfree() (presumably the score of the element - confirm in zaddCommand). */
static dictType zsetDictType = {
    dictEncObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictEncObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictVanillaFree /* val destructor */
};
803
/* Dict type with redis objects as both keys and values. Keys are hashed and
 * compared through their ptr field taken as an sds string, without any
 * decoding; keys and values are released with decrRefCount(). */
static dictType hashDictType = {
    dictObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictRedisObjectDestructor /* val destructor */
};
812
813 /* ========================= Random utility functions ======================= */
814
/* Redis generally does not try to recover from out of memory conditions
 * when allocating objects or strings: it is not clear whether the condition
 * could even be reported to the client, since the networking layer itself
 * relies on heap allocation for its send buffers. So we simply print
 * "<msg>: Out of memory" on standard error, wait one second and abort.
 * At least the code will be simpler to read... */
static void oom(const char *msg) {
    FILE *err = stderr;

    fprintf(err, "%s: Out of memory\n",msg);
    fflush(err);
    sleep(1);
    abort();
}
826
827 /* ====================== Redis server networking stuff ===================== */
828 static void closeTimedoutClients(void) {
829 redisClient *c;
830 listNode *ln;
831 time_t now = time(NULL);
832
833 listRewind(server.clients);
834 while ((ln = listYield(server.clients)) != NULL) {
835 c = listNodeValue(ln);
836 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
837 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
838 (now - c->lastinteraction > server.maxidletime)) {
839 redisLog(REDIS_DEBUG,"Closing idle client");
840 freeClient(c);
841 }
842 }
843 }
844
845 static int htNeedsResize(dict *dict) {
846 long long size, used;
847
848 size = dictSlots(dict);
849 used = dictSize(dict);
850 return (size && used && size > DICT_HT_INITIAL_SIZE &&
851 (used*100/size < REDIS_HT_MINFILL));
852 }
853
854 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
855 * we resize the hash table to save memory */
856 static void tryResizeHashTables(void) {
857 int j;
858
859 for (j = 0; j < server.dbnum; j++) {
860 if (htNeedsResize(server.db[j].dict)) {
861 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
862 dictResize(server.db[j].dict);
863 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
864 }
865 if (htNeedsResize(server.db[j].expires))
866 dictResize(server.db[j].expires);
867 }
868 }
869
870 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
871 int j, loops = server.cronloops++;
872 REDIS_NOTUSED(eventLoop);
873 REDIS_NOTUSED(id);
874 REDIS_NOTUSED(clientData);
875
876 /* Update the global state with the amount of used memory */
877 server.usedmemory = zmalloc_used_memory();
878
879 /* Show some info about non-empty databases */
880 for (j = 0; j < server.dbnum; j++) {
881 long long size, used, vkeys;
882
883 size = dictSlots(server.db[j].dict);
884 used = dictSize(server.db[j].dict);
885 vkeys = dictSize(server.db[j].expires);
886 if (!(loops % 5) && (used || vkeys)) {
887 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
888 /* dictPrintStats(server.dict); */
889 }
890 }
891
892 /* We don't want to resize the hash tables while a bacground saving
893 * is in progress: the saving child is created using fork() that is
894 * implemented with a copy-on-write semantic in most modern systems, so
895 * if we resize the HT while there is the saving child at work actually
896 * a lot of memory movements in the parent will cause a lot of pages
897 * copied. */
898 if (!server.bgsaveinprogress) tryResizeHashTables();
899
900 /* Show information about connected clients */
901 if (!(loops % 5)) {
902 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
903 listLength(server.clients)-listLength(server.slaves),
904 listLength(server.slaves),
905 server.usedmemory,
906 dictSize(server.sharingpool));
907 }
908
909 /* Close connections of timedout clients */
910 if (server.maxidletime && !(loops % 10))
911 closeTimedoutClients();
912
913 /* Check if a background saving in progress terminated */
914 if (server.bgsaveinprogress) {
915 int statloc;
916 if (wait3(&statloc,WNOHANG,NULL)) {
917 int exitcode = WEXITSTATUS(statloc);
918 int bysignal = WIFSIGNALED(statloc);
919
920 if (!bysignal && exitcode == 0) {
921 redisLog(REDIS_NOTICE,
922 "Background saving terminated with success");
923 server.dirty = 0;
924 server.lastsave = time(NULL);
925 } else if (!bysignal && exitcode != 0) {
926 redisLog(REDIS_WARNING, "Background saving error");
927 } else {
928 redisLog(REDIS_WARNING,
929 "Background saving terminated by signal");
930 rdbRemoveTempFile(server.bgsavechildpid);
931 }
932 server.bgsaveinprogress = 0;
933 server.bgsavechildpid = -1;
934 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
935 }
936 } else {
937 /* If there is not a background saving in progress check if
938 * we have to save now */
939 time_t now = time(NULL);
940 for (j = 0; j < server.saveparamslen; j++) {
941 struct saveparam *sp = server.saveparams+j;
942
943 if (server.dirty >= sp->changes &&
944 now-server.lastsave > sp->seconds) {
945 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
946 sp->changes, sp->seconds);
947 rdbSaveBackground(server.dbfilename);
948 break;
949 }
950 }
951 }
952
953 /* Try to expire a few timed out keys. The algorithm used is adaptive and
954 * will use few CPU cycles if there are few expiring keys, otherwise
955 * it will get more aggressive to avoid that too much memory is used by
956 * keys that can be removed from the keyspace. */
957 for (j = 0; j < server.dbnum; j++) {
958 int expired;
959 redisDb *db = server.db+j;
960
961 /* Continue to expire if at the end of the cycle more than 25%
962 * of the keys were expired. */
963 do {
964 int num = dictSize(db->expires);
965 time_t now = time(NULL);
966
967 expired = 0;
968 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
969 num = REDIS_EXPIRELOOKUPS_PER_CRON;
970 while (num--) {
971 dictEntry *de;
972 time_t t;
973
974 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
975 t = (time_t) dictGetEntryVal(de);
976 if (now > t) {
977 deleteKey(db,dictGetEntryKey(de));
978 expired++;
979 }
980 }
981 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
982 }
983
984 /* Check if we should connect to a MASTER */
985 if (server.replstate == REDIS_REPL_CONNECT) {
986 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
987 if (syncWithMaster() == REDIS_OK) {
988 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
989 }
990 }
991 return 1000;
992 }
993
994 static void createSharedObjects(void) {
995 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
996 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
997 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
998 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
999 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1000 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1001 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1002 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1003 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1004 /* no such key */
1005 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1006 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1007 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1008 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1009 "-ERR no such key\r\n"));
1010 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1011 "-ERR syntax error\r\n"));
1012 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1013 "-ERR source and destination objects are the same\r\n"));
1014 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1015 "-ERR index out of range\r\n"));
1016 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1017 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1018 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1019 shared.select0 = createStringObject("select 0\r\n",10);
1020 shared.select1 = createStringObject("select 1\r\n",10);
1021 shared.select2 = createStringObject("select 2\r\n",10);
1022 shared.select3 = createStringObject("select 3\r\n",10);
1023 shared.select4 = createStringObject("select 4\r\n",10);
1024 shared.select5 = createStringObject("select 5\r\n",10);
1025 shared.select6 = createStringObject("select 6\r\n",10);
1026 shared.select7 = createStringObject("select 7\r\n",10);
1027 shared.select8 = createStringObject("select 8\r\n",10);
1028 shared.select9 = createStringObject("select 9\r\n",10);
1029 }
1030
1031 static void appendServerSaveParams(time_t seconds, int changes) {
1032 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1033 server.saveparams[server.saveparamslen].seconds = seconds;
1034 server.saveparams[server.saveparamslen].changes = changes;
1035 server.saveparamslen++;
1036 }
1037
1038 static void resetServerSaveParams() {
1039 zfree(server.saveparams);
1040 server.saveparams = NULL;
1041 server.saveparamslen = 0;
1042 }
1043
1044 static void initServerConfig() {
1045 server.dbnum = REDIS_DEFAULT_DBNUM;
1046 server.port = REDIS_SERVERPORT;
1047 server.verbosity = REDIS_DEBUG;
1048 server.maxidletime = REDIS_MAXIDLETIME;
1049 server.saveparams = NULL;
1050 server.logfile = NULL; /* NULL = log on standard output */
1051 server.bindaddr = NULL;
1052 server.glueoutputbuf = 1;
1053 server.daemonize = 0;
1054 server.appendonly = 0;
1055 server.appendfsync = APPENDFSYNC_ALWAYS;
1056 server.lastfsync = time(NULL);
1057 server.appendfd = -1;
1058 server.appendseldb = -1; /* Make sure the first time will not match */
1059 server.pidfile = "/var/run/redis.pid";
1060 server.dbfilename = "dump.rdb";
1061 server.appendfilename = "appendonly.log";
1062 server.requirepass = NULL;
1063 server.shareobjects = 0;
1064 server.sharingpoolsize = 1024;
1065 server.maxclients = 0;
1066 server.maxmemory = 0;
1067 resetServerSaveParams();
1068
1069 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1070 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1071 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1072 /* Replication related */
1073 server.isslave = 0;
1074 server.masterauth = NULL;
1075 server.masterhost = NULL;
1076 server.masterport = 6379;
1077 server.master = NULL;
1078 server.replstate = REDIS_REPL_NONE;
1079
1080 /* Double constants initialization */
1081 R_Zero = 0.0;
1082 R_PosInf = 1.0/R_Zero;
1083 R_NegInf = -1.0/R_Zero;
1084 R_Nan = R_Zero/R_Zero;
1085 }
1086
1087 static void initServer() {
1088 int j;
1089
1090 signal(SIGHUP, SIG_IGN);
1091 signal(SIGPIPE, SIG_IGN);
1092 setupSigSegvAction();
1093
1094 server.clients = listCreate();
1095 server.slaves = listCreate();
1096 server.monitors = listCreate();
1097 server.objfreelist = listCreate();
1098 createSharedObjects();
1099 server.el = aeCreateEventLoop();
1100 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1101 server.sharingpool = dictCreate(&setDictType,NULL);
1102 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1103 if (server.fd == -1) {
1104 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1105 exit(1);
1106 }
1107 for (j = 0; j < server.dbnum; j++) {
1108 server.db[j].dict = dictCreate(&hashDictType,NULL);
1109 server.db[j].expires = dictCreate(&setDictType,NULL);
1110 server.db[j].id = j;
1111 }
1112 server.cronloops = 0;
1113 server.bgsaveinprogress = 0;
1114 server.bgsavechildpid = -1;
1115 server.lastsave = time(NULL);
1116 server.dirty = 0;
1117 server.usedmemory = 0;
1118 server.stat_numcommands = 0;
1119 server.stat_numconnections = 0;
1120 server.stat_starttime = time(NULL);
1121 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1122
1123 if (server.appendonly) {
1124 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1125 if (server.appendfd == -1) {
1126 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1127 strerror(errno));
1128 exit(1);
1129 }
1130 }
1131 }
1132
1133 /* Empty the whole database */
1134 static long long emptyDb() {
1135 int j;
1136 long long removed = 0;
1137
1138 for (j = 0; j < server.dbnum; j++) {
1139 removed += dictSize(server.db[j].dict);
1140 dictEmpty(server.db[j].dict);
1141 dictEmpty(server.db[j].expires);
1142 }
1143 return removed;
1144 }
1145
/* Convert a "yes" / "no" string (compared ignoring case) into 1 / 0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1151
1152 /* I agree, this is a very rudimental way to load a configuration...
1153 will improve later if the config gets more complex */
1154 static void loadServerConfig(char *filename) {
1155 FILE *fp;
1156 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1157 int linenum = 0;
1158 sds line = NULL;
1159
1160 if (filename[0] == '-' && filename[1] == '\0')
1161 fp = stdin;
1162 else {
1163 if ((fp = fopen(filename,"r")) == NULL) {
1164 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1165 exit(1);
1166 }
1167 }
1168
1169 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1170 sds *argv;
1171 int argc, j;
1172
1173 linenum++;
1174 line = sdsnew(buf);
1175 line = sdstrim(line," \t\r\n");
1176
1177 /* Skip comments and blank lines*/
1178 if (line[0] == '#' || line[0] == '\0') {
1179 sdsfree(line);
1180 continue;
1181 }
1182
1183 /* Split into arguments */
1184 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1185 sdstolower(argv[0]);
1186
1187 /* Execute config directives */
1188 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1189 server.maxidletime = atoi(argv[1]);
1190 if (server.maxidletime < 0) {
1191 err = "Invalid timeout value"; goto loaderr;
1192 }
1193 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1194 server.port = atoi(argv[1]);
1195 if (server.port < 1 || server.port > 65535) {
1196 err = "Invalid port"; goto loaderr;
1197 }
1198 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1199 server.bindaddr = zstrdup(argv[1]);
1200 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1201 int seconds = atoi(argv[1]);
1202 int changes = atoi(argv[2]);
1203 if (seconds < 1 || changes < 0) {
1204 err = "Invalid save parameters"; goto loaderr;
1205 }
1206 appendServerSaveParams(seconds,changes);
1207 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1208 if (chdir(argv[1]) == -1) {
1209 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1210 argv[1], strerror(errno));
1211 exit(1);
1212 }
1213 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1214 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1215 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1216 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1217 else {
1218 err = "Invalid log level. Must be one of debug, notice, warning";
1219 goto loaderr;
1220 }
1221 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1222 FILE *logfp;
1223
1224 server.logfile = zstrdup(argv[1]);
1225 if (!strcasecmp(server.logfile,"stdout")) {
1226 zfree(server.logfile);
1227 server.logfile = NULL;
1228 }
1229 if (server.logfile) {
1230 /* Test if we are able to open the file. The server will not
1231 * be able to abort just for this problem later... */
1232 logfp = fopen(server.logfile,"a");
1233 if (logfp == NULL) {
1234 err = sdscatprintf(sdsempty(),
1235 "Can't open the log file: %s", strerror(errno));
1236 goto loaderr;
1237 }
1238 fclose(logfp);
1239 }
1240 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1241 server.dbnum = atoi(argv[1]);
1242 if (server.dbnum < 1) {
1243 err = "Invalid number of databases"; goto loaderr;
1244 }
1245 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1246 server.maxclients = atoi(argv[1]);
1247 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1248 server.maxmemory = strtoll(argv[1], NULL, 10);
1249 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1250 server.masterhost = sdsnew(argv[1]);
1251 server.masterport = atoi(argv[2]);
1252 server.replstate = REDIS_REPL_CONNECT;
1253 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1254 server.masterauth = zstrdup(argv[1]);
1255 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1256 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1257 err = "argument must be 'yes' or 'no'"; goto loaderr;
1258 }
1259 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1260 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1261 err = "argument must be 'yes' or 'no'"; goto loaderr;
1262 }
1263 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1264 server.sharingpoolsize = atoi(argv[1]);
1265 if (server.sharingpoolsize < 1) {
1266 err = "invalid object sharing pool size"; goto loaderr;
1267 }
1268 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1269 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1270 err = "argument must be 'yes' or 'no'"; goto loaderr;
1271 }
1272 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1273 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1274 err = "argument must be 'yes' or 'no'"; goto loaderr;
1275 }
1276 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1277 if (!strcasecmp(argv[1],"no")) {
1278 server.appendfsync = APPENDFSYNC_NO;
1279 } else if (!strcasecmp(argv[1],"always")) {
1280 server.appendfsync = APPENDFSYNC_ALWAYS;
1281 } else if (!strcasecmp(argv[1],"everysec")) {
1282 server.appendfsync = APPENDFSYNC_EVERYSEC;
1283 } else {
1284 err = "argument must be 'no', 'always' or 'everysec'";
1285 goto loaderr;
1286 }
1287 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1288 server.requirepass = zstrdup(argv[1]);
1289 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1290 server.pidfile = zstrdup(argv[1]);
1291 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1292 server.dbfilename = zstrdup(argv[1]);
1293 } else {
1294 err = "Bad directive or wrong number of arguments"; goto loaderr;
1295 }
1296 for (j = 0; j < argc; j++)
1297 sdsfree(argv[j]);
1298 zfree(argv);
1299 sdsfree(line);
1300 }
1301 if (fp != stdin) fclose(fp);
1302 return;
1303
1304 loaderr:
1305 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1306 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1307 fprintf(stderr, ">>> '%s'\n", line);
1308 fprintf(stderr, "%s\n", err);
1309 exit(1);
1310 }
1311
1312 static void freeClientArgv(redisClient *c) {
1313 int j;
1314
1315 for (j = 0; j < c->argc; j++)
1316 decrRefCount(c->argv[j]);
1317 for (j = 0; j < c->mbargc; j++)
1318 decrRefCount(c->mbargv[j]);
1319 c->argc = 0;
1320 c->mbargc = 0;
1321 }
1322
1323 static void freeClient(redisClient *c) {
1324 listNode *ln;
1325
1326 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1327 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1328 sdsfree(c->querybuf);
1329 listRelease(c->reply);
1330 freeClientArgv(c);
1331 close(c->fd);
1332 ln = listSearchKey(server.clients,c);
1333 assert(ln != NULL);
1334 listDelNode(server.clients,ln);
1335 if (c->flags & REDIS_SLAVE) {
1336 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1337 close(c->repldbfd);
1338 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1339 ln = listSearchKey(l,c);
1340 assert(ln != NULL);
1341 listDelNode(l,ln);
1342 }
1343 if (c->flags & REDIS_MASTER) {
1344 server.master = NULL;
1345 server.replstate = REDIS_REPL_CONNECT;
1346 }
1347 zfree(c->argv);
1348 zfree(c->mbargv);
1349 zfree(c);
1350 }
1351
1352 #define GLUEREPLY_UP_TO (1024)
1353 static void glueReplyBuffersIfNeeded(redisClient *c) {
1354 int totlen = 0;
1355 listNode *ln;
1356 robj *o;
1357
1358 listRewind(c->reply);
1359 while((ln = listYield(c->reply))) {
1360 o = ln->value;
1361 totlen += sdslen(o->ptr);
1362 /* This optimization makes more sense if we don't have to copy
1363 * too much data */
1364 if (totlen > GLUEREPLY_UP_TO) return;
1365 }
1366 if (totlen > 0) {
1367 char buf[GLUEREPLY_UP_TO];
1368 int copylen = 0;
1369
1370 listRewind(c->reply);
1371 while((ln = listYield(c->reply))) {
1372 o = ln->value;
1373 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1374 copylen += sdslen(o->ptr);
1375 listDelNode(c->reply,ln);
1376 }
1377 /* Now the output buffer is empty, add the new single element */
1378 o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
1379 listAddNodeTail(c->reply,o);
1380 }
1381 }
1382
1383 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1384 redisClient *c = privdata;
1385 int nwritten = 0, totwritten = 0, objlen;
1386 robj *o;
1387 REDIS_NOTUSED(el);
1388 REDIS_NOTUSED(mask);
1389
1390 if (server.glueoutputbuf && listLength(c->reply) > 1)
1391 glueReplyBuffersIfNeeded(c);
1392
1393 /* Use writev() if we have enough buffers to send */
1394 #if 0
1395 if (listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1396 !(c->flags & REDIS_MASTER))
1397 {
1398 sendReplyToClientWritev(el, fd, privdata, mask);
1399 return;
1400 }
1401 #endif
1402
1403 while(listLength(c->reply)) {
1404 o = listNodeValue(listFirst(c->reply));
1405 objlen = sdslen(o->ptr);
1406
1407 if (objlen == 0) {
1408 listDelNode(c->reply,listFirst(c->reply));
1409 continue;
1410 }
1411
1412 if (c->flags & REDIS_MASTER) {
1413 /* Don't reply to a master */
1414 nwritten = objlen - c->sentlen;
1415 } else {
1416 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1417 if (nwritten <= 0) break;
1418 }
1419 c->sentlen += nwritten;
1420 totwritten += nwritten;
1421 /* If we fully sent the object on head go to the next one */
1422 if (c->sentlen == objlen) {
1423 listDelNode(c->reply,listFirst(c->reply));
1424 c->sentlen = 0;
1425 }
1426 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1427 * bytes, in a single threaded server it's a good idea to serve
1428 * other clients as well, even if a very large request comes from
1429 * super fast link that is always able to accept data (in real world
1430 * scenario think about 'KEYS *' against the loopback interfae) */
1431 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1432 }
1433 if (nwritten == -1) {
1434 if (errno == EAGAIN) {
1435 nwritten = 0;
1436 } else {
1437 redisLog(REDIS_DEBUG,
1438 "Error writing to client: %s", strerror(errno));
1439 freeClient(c);
1440 return;
1441 }
1442 }
1443 if (totwritten > 0) c->lastinteraction = time(NULL);
1444 if (listLength(c->reply) == 0) {
1445 c->sentlen = 0;
1446 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1447 }
1448 }
1449
1450 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1451 {
1452 redisClient *c = privdata;
1453 int nwritten = 0, totwritten = 0, objlen, willwrite;
1454 robj *o;
1455 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1456 int offset, ion = 0;
1457 REDIS_NOTUSED(el);
1458 REDIS_NOTUSED(mask);
1459
1460 listNode *node;
1461 while (listLength(c->reply)) {
1462 offset = c->sentlen;
1463 ion = 0;
1464 willwrite = 0;
1465
1466 /* fill-in the iov[] array */
1467 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1468 o = listNodeValue(node);
1469 objlen = sdslen(o->ptr);
1470
1471 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1472 break;
1473
1474 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1475 break; /* no more iovecs */
1476
1477 iov[ion].iov_base = ((char*)o->ptr) + offset;
1478 iov[ion].iov_len = objlen - offset;
1479 willwrite += objlen - offset;
1480 offset = 0; /* just for the first item */
1481 ion++;
1482 }
1483
1484 if(willwrite == 0)
1485 break;
1486
1487 /* write all collected blocks at once */
1488 if((nwritten = writev(fd, iov, ion)) < 0) {
1489 if (errno != EAGAIN) {
1490 redisLog(REDIS_DEBUG,
1491 "Error writing to client: %s", strerror(errno));
1492 freeClient(c);
1493 return;
1494 }
1495 break;
1496 }
1497
1498 totwritten += nwritten;
1499 offset = c->sentlen;
1500
1501 /* remove written robjs from c->reply */
1502 while (nwritten && listLength(c->reply)) {
1503 o = listNodeValue(listFirst(c->reply));
1504 objlen = sdslen(o->ptr);
1505
1506 if(nwritten >= objlen - offset) {
1507 listDelNode(c->reply, listFirst(c->reply));
1508 nwritten -= objlen - offset;
1509 c->sentlen = 0;
1510 } else {
1511 /* partial write */
1512 c->sentlen += nwritten;
1513 break;
1514 }
1515 offset = 0;
1516 }
1517 }
1518
1519 if (totwritten > 0)
1520 c->lastinteraction = time(NULL);
1521
1522 if (listLength(c->reply) == 0) {
1523 c->sentlen = 0;
1524 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1525 }
1526 }
1527
1528 static struct redisCommand *lookupCommand(char *name) {
1529 int j = 0;
1530 while(cmdTable[j].name != NULL) {
1531 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1532 j++;
1533 }
1534 return NULL;
1535 }
1536
1537 /* resetClient prepare the client to process the next command */
1538 static void resetClient(redisClient *c) {
1539 freeClientArgv(c);
1540 c->bulklen = -1;
1541 c->multibulk = 0;
1542 }
1543
1544 /* If this function gets called we already read a whole
1545 * command, argments are in the client argv/argc fields.
1546 * processCommand() execute the command or prepare the
1547 * server for a bulk read from the client.
1548 *
1549 * If 1 is returned the client is still alive and valid and
1550 * and other operations can be performed by the caller. Otherwise
1551 * if 0 is returned the client was destroied (i.e. after QUIT). */
1552 static int processCommand(redisClient *c) {
1553 struct redisCommand *cmd;
1554 long long dirty;
1555
1556 /* Free some memory if needed (maxmemory setting) */
1557 if (server.maxmemory) freeMemoryIfNeeded();
1558
1559 /* Handle the multi bulk command type. This is an alternative protocol
1560 * supported by Redis in order to receive commands that are composed of
1561 * multiple binary-safe "bulk" arguments. The latency of processing is
1562 * a bit higher but this allows things like multi-sets, so if this
1563 * protocol is used only for MSET and similar commands this is a big win. */
1564 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1565 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1566 if (c->multibulk <= 0) {
1567 resetClient(c);
1568 return 1;
1569 } else {
1570 decrRefCount(c->argv[c->argc-1]);
1571 c->argc--;
1572 return 1;
1573 }
1574 } else if (c->multibulk) {
1575 if (c->bulklen == -1) {
1576 if (((char*)c->argv[0]->ptr)[0] != '$') {
1577 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1578 resetClient(c);
1579 return 1;
1580 } else {
1581 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1582 decrRefCount(c->argv[0]);
1583 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1584 c->argc--;
1585 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1586 resetClient(c);
1587 return 1;
1588 }
1589 c->argc--;
1590 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1591 return 1;
1592 }
1593 } else {
1594 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1595 c->mbargv[c->mbargc] = c->argv[0];
1596 c->mbargc++;
1597 c->argc--;
1598 c->multibulk--;
1599 if (c->multibulk == 0) {
1600 robj **auxargv;
1601 int auxargc;
1602
1603 /* Here we need to swap the multi-bulk argc/argv with the
1604 * normal argc/argv of the client structure. */
1605 auxargv = c->argv;
1606 c->argv = c->mbargv;
1607 c->mbargv = auxargv;
1608
1609 auxargc = c->argc;
1610 c->argc = c->mbargc;
1611 c->mbargc = auxargc;
1612
1613 /* We need to set bulklen to something different than -1
1614 * in order for the code below to process the command without
1615 * to try to read the last argument of a bulk command as
1616 * a special argument. */
1617 c->bulklen = 0;
1618 /* continue below and process the command */
1619 } else {
1620 c->bulklen = -1;
1621 return 1;
1622 }
1623 }
1624 }
1625 /* -- end of multi bulk commands processing -- */
1626
1627 /* The QUIT command is handled as a special case. Normal command
1628 * procs are unable to close the client connection safely */
1629 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1630 freeClient(c);
1631 return 0;
1632 }
1633 cmd = lookupCommand(c->argv[0]->ptr);
1634 if (!cmd) {
1635 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1636 resetClient(c);
1637 return 1;
1638 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1639 (c->argc < -cmd->arity)) {
1640 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1641 resetClient(c);
1642 return 1;
1643 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1644 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1645 resetClient(c);
1646 return 1;
1647 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1648 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1649
1650 decrRefCount(c->argv[c->argc-1]);
1651 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1652 c->argc--;
1653 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1654 resetClient(c);
1655 return 1;
1656 }
1657 c->argc--;
1658 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1659 /* It is possible that the bulk read is already in the
1660 * buffer. Check this condition and handle it accordingly.
1661 * This is just a fast path, alternative to call processInputBuffer().
1662 * It's a good idea since the code is small and this condition
1663 * happens most of the times. */
1664 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1665 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1666 c->argc++;
1667 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1668 } else {
1669 return 1;
1670 }
1671 }
1672 /* Let's try to share objects on the command arguments vector */
1673 if (server.shareobjects) {
1674 int j;
1675 for(j = 1; j < c->argc; j++)
1676 c->argv[j] = tryObjectSharing(c->argv[j]);
1677 }
1678 /* Let's try to encode the bulk object to save space. */
1679 if (cmd->flags & REDIS_CMD_BULK)
1680 tryObjectEncoding(c->argv[c->argc-1]);
1681
1682 /* Check if the user is authenticated */
1683 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1684 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1685 resetClient(c);
1686 return 1;
1687 }
1688
1689 /* Exec the command */
1690 dirty = server.dirty;
1691 cmd->proc(c);
1692 if (server.appendonly && server.dirty-dirty)
1693 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1694 if (server.dirty-dirty && listLength(server.slaves))
1695 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1696 if (listLength(server.monitors))
1697 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1698 server.stat_numcommands++;
1699
1700 /* Prepare the client for the next command */
1701 if (c->flags & REDIS_CLOSE) {
1702 freeClient(c);
1703 return 0;
1704 }
1705 resetClient(c);
1706 return 1;
1707 }
1708
1709 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1710 listNode *ln;
1711 int outc = 0, j;
1712 robj **outv;
1713 /* (args*2)+1 is enough room for args, spaces, newlines */
1714 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1715
1716 if (argc <= REDIS_STATIC_ARGS) {
1717 outv = static_outv;
1718 } else {
1719 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1720 }
1721
1722 for (j = 0; j < argc; j++) {
1723 if (j != 0) outv[outc++] = shared.space;
1724 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1725 robj *lenobj;
1726
1727 lenobj = createObject(REDIS_STRING,
1728 sdscatprintf(sdsempty(),"%d\r\n",
1729 stringObjectLen(argv[j])));
1730 lenobj->refcount = 0;
1731 outv[outc++] = lenobj;
1732 }
1733 outv[outc++] = argv[j];
1734 }
1735 outv[outc++] = shared.crlf;
1736
1737 /* Increment all the refcounts at start and decrement at end in order to
1738 * be sure to free objects if there is no slave in a replication state
1739 * able to be feed with commands */
1740 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1741 listRewind(slaves);
1742 while((ln = listYield(slaves))) {
1743 redisClient *slave = ln->value;
1744
1745 /* Don't feed slaves that are still waiting for BGSAVE to start */
1746 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1747
1748 /* Feed all the other slaves, MONITORs and so on */
1749 if (slave->slaveseldb != dictid) {
1750 robj *selectcmd;
1751
1752 switch(dictid) {
1753 case 0: selectcmd = shared.select0; break;
1754 case 1: selectcmd = shared.select1; break;
1755 case 2: selectcmd = shared.select2; break;
1756 case 3: selectcmd = shared.select3; break;
1757 case 4: selectcmd = shared.select4; break;
1758 case 5: selectcmd = shared.select5; break;
1759 case 6: selectcmd = shared.select6; break;
1760 case 7: selectcmd = shared.select7; break;
1761 case 8: selectcmd = shared.select8; break;
1762 case 9: selectcmd = shared.select9; break;
1763 default:
1764 selectcmd = createObject(REDIS_STRING,
1765 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1766 selectcmd->refcount = 0;
1767 break;
1768 }
1769 addReply(slave,selectcmd);
1770 slave->slaveseldb = dictid;
1771 }
1772 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1773 }
1774 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1775 if (outv != static_outv) zfree(outv);
1776 }
1777
1778 static void processInputBuffer(redisClient *c) {
1779 again:
1780 if (c->bulklen == -1) {
1781 /* Read the first line of the query */
1782 char *p = strchr(c->querybuf,'\n');
1783 size_t querylen;
1784
1785 if (p) {
1786 sds query, *argv;
1787 int argc, j;
1788
1789 query = c->querybuf;
1790 c->querybuf = sdsempty();
1791 querylen = 1+(p-(query));
1792 if (sdslen(query) > querylen) {
1793 /* leave data after the first line of the query in the buffer */
1794 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1795 }
1796 *p = '\0'; /* remove "\n" */
1797 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1798 sdsupdatelen(query);
1799
1800 /* Now we can split the query in arguments */
1801 if (sdslen(query) == 0) {
1802 /* Ignore empty query */
1803 sdsfree(query);
1804 return;
1805 }
1806 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1807 sdsfree(query);
1808
1809 if (c->argv) zfree(c->argv);
1810 c->argv = zmalloc(sizeof(robj*)*argc);
1811
1812 for (j = 0; j < argc; j++) {
1813 if (sdslen(argv[j])) {
1814 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1815 c->argc++;
1816 } else {
1817 sdsfree(argv[j]);
1818 }
1819 }
1820 zfree(argv);
1821 /* Execute the command. If the client is still valid
1822 * after processCommand() return and there is something
1823 * on the query buffer try to process the next command. */
1824 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1825 return;
1826 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1827 redisLog(REDIS_DEBUG, "Client protocol error");
1828 freeClient(c);
1829 return;
1830 }
1831 } else {
1832 /* Bulk read handling. Note that if we are at this point
1833 the client already sent a command terminated with a newline,
1834 we are reading the bulk data that is actually the last
1835 argument of the command. */
1836 int qbl = sdslen(c->querybuf);
1837
1838 if (c->bulklen <= qbl) {
1839 /* Copy everything but the final CRLF as final argument */
1840 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1841 c->argc++;
1842 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1843 /* Process the command. If the client is still valid after
1844 * the processing and there is more data in the buffer
1845 * try to parse it. */
1846 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1847 return;
1848 }
1849 }
1850 }
1851
1852 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1853 redisClient *c = (redisClient*) privdata;
1854 char buf[REDIS_IOBUF_LEN];
1855 int nread;
1856 REDIS_NOTUSED(el);
1857 REDIS_NOTUSED(mask);
1858
1859 nread = read(fd, buf, REDIS_IOBUF_LEN);
1860 if (nread == -1) {
1861 if (errno == EAGAIN) {
1862 nread = 0;
1863 } else {
1864 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1865 freeClient(c);
1866 return;
1867 }
1868 } else if (nread == 0) {
1869 redisLog(REDIS_DEBUG, "Client closed connection");
1870 freeClient(c);
1871 return;
1872 }
1873 if (nread) {
1874 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1875 c->lastinteraction = time(NULL);
1876 } else {
1877 return;
1878 }
1879 processInputBuffer(c);
1880 }
1881
1882 static int selectDb(redisClient *c, int id) {
1883 if (id < 0 || id >= server.dbnum)
1884 return REDIS_ERR;
1885 c->db = &server.db[id];
1886 return REDIS_OK;
1887 }
1888
1889 static void *dupClientReplyValue(void *o) {
1890 incrRefCount((robj*)o);
1891 return 0;
1892 }
1893
1894 static redisClient *createClient(int fd) {
1895 redisClient *c = zmalloc(sizeof(*c));
1896
1897 anetNonBlock(NULL,fd);
1898 anetTcpNoDelay(NULL,fd);
1899 if (!c) return NULL;
1900 selectDb(c,0);
1901 c->fd = fd;
1902 c->querybuf = sdsempty();
1903 c->argc = 0;
1904 c->argv = NULL;
1905 c->bulklen = -1;
1906 c->multibulk = 0;
1907 c->mbargc = 0;
1908 c->mbargv = NULL;
1909 c->sentlen = 0;
1910 c->flags = 0;
1911 c->lastinteraction = time(NULL);
1912 c->authenticated = 0;
1913 c->replstate = REDIS_REPL_NONE;
1914 c->reply = listCreate();
1915 listSetFreeMethod(c->reply,decrRefCount);
1916 listSetDupMethod(c->reply,dupClientReplyValue);
1917 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1918 readQueryFromClient, c, NULL) == AE_ERR) {
1919 freeClient(c);
1920 return NULL;
1921 }
1922 listAddNodeTail(server.clients,c);
1923 return c;
1924 }
1925
1926 static void addReply(redisClient *c, robj *obj) {
1927 if (listLength(c->reply) == 0 &&
1928 (c->replstate == REDIS_REPL_NONE ||
1929 c->replstate == REDIS_REPL_ONLINE) &&
1930 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1931 sendReplyToClient, c, NULL) == AE_ERR) return;
1932 if (obj->encoding != REDIS_ENCODING_RAW) {
1933 obj = getDecodedObject(obj);
1934 } else {
1935 incrRefCount(obj);
1936 }
1937 listAddNodeTail(c->reply,obj);
1938 }
1939
1940 static void addReplySds(redisClient *c, sds s) {
1941 robj *o = createObject(REDIS_STRING,s);
1942 addReply(c,o);
1943 decrRefCount(o);
1944 }
1945
1946 static void addReplyBulkLen(redisClient *c, robj *obj) {
1947 size_t len;
1948
1949 if (obj->encoding == REDIS_ENCODING_RAW) {
1950 len = sdslen(obj->ptr);
1951 } else {
1952 long n = (long)obj->ptr;
1953
1954 len = 1;
1955 if (n < 0) {
1956 len++;
1957 n = -n;
1958 }
1959 while((n = n/10) != 0) {
1960 len++;
1961 }
1962 }
1963 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1964 }
1965
1966 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1967 int cport, cfd;
1968 char cip[128];
1969 redisClient *c;
1970 REDIS_NOTUSED(el);
1971 REDIS_NOTUSED(mask);
1972 REDIS_NOTUSED(privdata);
1973
1974 cfd = anetAccept(server.neterr, fd, cip, &cport);
1975 if (cfd == AE_ERR) {
1976 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1977 return;
1978 }
1979 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1980 if ((c = createClient(cfd)) == NULL) {
1981 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1982 close(cfd); /* May be already closed, just ingore errors */
1983 return;
1984 }
1985 /* If maxclient directive is set and this is one client more... close the
1986 * connection. Note that we create the client instead to check before
1987 * for this condition, since now the socket is already set in nonblocking
1988 * mode and we can send an error for free using the Kernel I/O */
1989 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1990 char *err = "-ERR max number of clients reached\r\n";
1991
1992 /* That's a best effort error message, don't check write errors */
1993 if (write(c->fd,err,strlen(err)) == -1) {
1994 /* Nothing to do, Just to avoid the warning... */
1995 }
1996 freeClient(c);
1997 return;
1998 }
1999 server.stat_numconnections++;
2000 }
2001
2002 /* ======================= Redis objects implementation ===================== */
2003
2004 static robj *createObject(int type, void *ptr) {
2005 robj *o;
2006
2007 if (listLength(server.objfreelist)) {
2008 listNode *head = listFirst(server.objfreelist);
2009 o = listNodeValue(head);
2010 listDelNode(server.objfreelist,head);
2011 } else {
2012 o = zmalloc(sizeof(*o));
2013 }
2014 o->type = type;
2015 o->encoding = REDIS_ENCODING_RAW;
2016 o->ptr = ptr;
2017 o->refcount = 1;
2018 return o;
2019 }
2020
2021 static robj *createStringObject(char *ptr, size_t len) {
2022 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2023 }
2024
2025 static robj *createListObject(void) {
2026 list *l = listCreate();
2027
2028 listSetFreeMethod(l,decrRefCount);
2029 return createObject(REDIS_LIST,l);
2030 }
2031
2032 static robj *createSetObject(void) {
2033 dict *d = dictCreate(&setDictType,NULL);
2034 return createObject(REDIS_SET,d);
2035 }
2036
2037 static robj *createZsetObject(void) {
2038 zset *zs = zmalloc(sizeof(*zs));
2039
2040 zs->dict = dictCreate(&zsetDictType,NULL);
2041 zs->zsl = zslCreate();
2042 return createObject(REDIS_ZSET,zs);
2043 }
2044
2045 static void freeStringObject(robj *o) {
2046 if (o->encoding == REDIS_ENCODING_RAW) {
2047 sdsfree(o->ptr);
2048 }
2049 }
2050
2051 static void freeListObject(robj *o) {
2052 listRelease((list*) o->ptr);
2053 }
2054
2055 static void freeSetObject(robj *o) {
2056 dictRelease((dict*) o->ptr);
2057 }
2058
2059 static void freeZsetObject(robj *o) {
2060 zset *zs = o->ptr;
2061
2062 dictRelease(zs->dict);
2063 zslFree(zs->zsl);
2064 zfree(zs);
2065 }
2066
2067 static void freeHashObject(robj *o) {
2068 dictRelease((dict*) o->ptr);
2069 }
2070
2071 static void incrRefCount(robj *o) {
2072 o->refcount++;
2073 #ifdef DEBUG_REFCOUNT
2074 if (o->type == REDIS_STRING)
2075 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2076 #endif
2077 }
2078
2079 static void decrRefCount(void *obj) {
2080 robj *o = obj;
2081
2082 #ifdef DEBUG_REFCOUNT
2083 if (o->type == REDIS_STRING)
2084 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2085 #endif
2086 if (--(o->refcount) == 0) {
2087 switch(o->type) {
2088 case REDIS_STRING: freeStringObject(o); break;
2089 case REDIS_LIST: freeListObject(o); break;
2090 case REDIS_SET: freeSetObject(o); break;
2091 case REDIS_ZSET: freeZsetObject(o); break;
2092 case REDIS_HASH: freeHashObject(o); break;
2093 default: assert(0 != 0); break;
2094 }
2095 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2096 !listAddNodeHead(server.objfreelist,o))
2097 zfree(o);
2098 }
2099 }
2100
2101 static robj *lookupKey(redisDb *db, robj *key) {
2102 dictEntry *de = dictFind(db->dict,key);
2103 return de ? dictGetEntryVal(de) : NULL;
2104 }
2105
2106 static robj *lookupKeyRead(redisDb *db, robj *key) {
2107 expireIfNeeded(db,key);
2108 return lookupKey(db,key);
2109 }
2110
2111 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2112 deleteIfVolatile(db,key);
2113 return lookupKey(db,key);
2114 }
2115
2116 static int deleteKey(redisDb *db, robj *key) {
2117 int retval;
2118
2119 /* We need to protect key from destruction: after the first dictDelete()
2120 * it may happen that 'key' is no longer valid if we don't increment
2121 * it's count. This may happen when we get the object reference directly
2122 * from the hash table with dictRandomKey() or dict iterators */
2123 incrRefCount(key);
2124 if (dictSize(db->expires)) dictDelete(db->expires,key);
2125 retval = dictDelete(db->dict,key);
2126 decrRefCount(key);
2127
2128 return retval == DICT_OK;
2129 }
2130
2131 /* Try to share an object against the shared objects pool */
2132 static robj *tryObjectSharing(robj *o) {
2133 struct dictEntry *de;
2134 unsigned long c;
2135
2136 if (o == NULL || server.shareobjects == 0) return o;
2137
2138 assert(o->type == REDIS_STRING);
2139 de = dictFind(server.sharingpool,o);
2140 if (de) {
2141 robj *shared = dictGetEntryKey(de);
2142
2143 c = ((unsigned long) dictGetEntryVal(de))+1;
2144 dictGetEntryVal(de) = (void*) c;
2145 incrRefCount(shared);
2146 decrRefCount(o);
2147 return shared;
2148 } else {
2149 /* Here we are using a stream algorihtm: Every time an object is
2150 * shared we increment its count, everytime there is a miss we
2151 * recrement the counter of a random object. If this object reaches
2152 * zero we remove the object and put the current object instead. */
2153 if (dictSize(server.sharingpool) >=
2154 server.sharingpoolsize) {
2155 de = dictGetRandomKey(server.sharingpool);
2156 assert(de != NULL);
2157 c = ((unsigned long) dictGetEntryVal(de))-1;
2158 dictGetEntryVal(de) = (void*) c;
2159 if (c == 0) {
2160 dictDelete(server.sharingpool,de->key);
2161 }
2162 } else {
2163 c = 0; /* If the pool is empty we want to add this object */
2164 }
2165 if (c == 0) {
2166 int retval;
2167
2168 retval = dictAdd(server.sharingpool,o,(void*)1);
2169 assert(retval == DICT_OK);
2170 incrRefCount(o);
2171 }
2172 return o;
2173 }
2174 }
2175
2176 /* Check if the nul-terminated string 's' can be represented by a long
2177 * (that is, is a number that fits into long without any other space or
2178 * character before or after the digits).
2179 *
2180 * If so, the function returns REDIS_OK and *longval is set to the value
2181 * of the number. Otherwise REDIS_ERR is returned */
2182 static int isStringRepresentableAsLong(sds s, long *longval) {
2183 char buf[32], *endptr;
2184 long value;
2185 int slen;
2186
2187 value = strtol(s, &endptr, 10);
2188 if (endptr[0] != '\0') return REDIS_ERR;
2189 slen = snprintf(buf,32,"%ld",value);
2190
2191 /* If the number converted back into a string is not identical
2192 * then it's not possible to encode the string as integer */
2193 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2194 if (longval) *longval = value;
2195 return REDIS_OK;
2196 }
2197
2198 /* Try to encode a string object in order to save space */
2199 static int tryObjectEncoding(robj *o) {
2200 long value;
2201 sds s = o->ptr;
2202
2203 if (o->encoding != REDIS_ENCODING_RAW)
2204 return REDIS_ERR; /* Already encoded */
2205
2206 /* It's not save to encode shared objects: shared objects can be shared
2207 * everywhere in the "object space" of Redis. Encoded objects can only
2208 * appear as "values" (and not, for instance, as keys) */
2209 if (o->refcount > 1) return REDIS_ERR;
2210
2211 /* Currently we try to encode only strings */
2212 assert(o->type == REDIS_STRING);
2213
2214 /* Check if we can represent this string as a long integer */
2215 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2216
2217 /* Ok, this object can be encoded */
2218 o->encoding = REDIS_ENCODING_INT;
2219 sdsfree(o->ptr);
2220 o->ptr = (void*) value;
2221 return REDIS_OK;
2222 }
2223
2224 /* Get a decoded version of an encoded object (returned as a new object) */
2225 static robj *getDecodedObject(const robj *o) {
2226 robj *dec;
2227
2228 assert(o->encoding != REDIS_ENCODING_RAW);
2229 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2230 char buf[32];
2231
2232 snprintf(buf,32,"%ld",(long)o->ptr);
2233 dec = createStringObject(buf,strlen(buf));
2234 return dec;
2235 } else {
2236 assert(1 != 1);
2237 }
2238 }
2239
2240 /* Compare two string objects via strcmp() or alike.
2241 * Note that the objects may be integer-encoded. In such a case we
2242 * use snprintf() to get a string representation of the numbers on the stack
2243 * and compare the strings, it's much faster than calling getDecodedObject(). */
2244 static int compareStringObjects(robj *a, robj *b) {
2245 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2246 char bufa[128], bufb[128], *astr, *bstr;
2247 int bothsds = 1;
2248
2249 if (a == b) return 0;
2250 if (a->encoding != REDIS_ENCODING_RAW) {
2251 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2252 astr = bufa;
2253 bothsds = 0;
2254 } else {
2255 astr = a->ptr;
2256 }
2257 if (b->encoding != REDIS_ENCODING_RAW) {
2258 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2259 bstr = bufb;
2260 bothsds = 0;
2261 } else {
2262 bstr = b->ptr;
2263 }
2264 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2265 }
2266
2267 static size_t stringObjectLen(robj *o) {
2268 assert(o->type == REDIS_STRING);
2269 if (o->encoding == REDIS_ENCODING_RAW) {
2270 return sdslen(o->ptr);
2271 } else {
2272 char buf[32];
2273
2274 return snprintf(buf,32,"%ld",(long)o->ptr);
2275 }
2276 }
2277
2278 /*============================ DB saving/loading ============================ */
2279
/* Write the one byte type/opcode 'type' to 'fp'.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
2284
/* Write the time 't' to 'fp' as a 32 bit signed integer in host byte
 * order. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    return (fwrite(&stamp,sizeof(stamp),1,fp) == 0) ? -1 : 0;
}
2290
2291 /* check rdbLoadLen() comments for more info */
2292 static int rdbSaveLen(FILE *fp, uint32_t len) {
2293 unsigned char buf[2];
2294
2295 if (len < (1<<6)) {
2296 /* Save a 6 bit len */
2297 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2298 if (fwrite(buf,1,1,fp) == 0) return -1;
2299 } else if (len < (1<<14)) {
2300 /* Save a 14 bit len */
2301 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2302 buf[1] = len&0xFF;
2303 if (fwrite(buf,2,1,fp) == 0) return -1;
2304 } else {
2305 /* Save a 32 bit len */
2306 buf[0] = (REDIS_RDB_32BITLEN<<6);
2307 if (fwrite(buf,1,1,fp) == 0) return -1;
2308 len = htonl(len);
2309 if (fwrite(&len,4,1,fp) == 0) return -1;
2310 }
2311 return 0;
2312 }
2313
2314 /* String objects in the form "2391" "-100" without any space and with a
2315 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2316 * encoded as integers to save space */
2317 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2318 long long value;
2319 char *endptr, buf[32];
2320
2321 /* Check if it's possible to encode this value as a number */
2322 value = strtoll(s, &endptr, 10);
2323 if (endptr[0] != '\0') return 0;
2324 snprintf(buf,32,"%lld",value);
2325
2326 /* If the number converted back into a string is not identical
2327 * then it's not possible to encode the string as integer */
2328 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2329
2330 /* Finally check if it fits in our ranges */
2331 if (value >= -(1<<7) && value <= (1<<7)-1) {
2332 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2333 enc[1] = value&0xFF;
2334 return 2;
2335 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2336 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2337 enc[1] = value&0xFF;
2338 enc[2] = (value>>8)&0xFF;
2339 return 3;
2340 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2341 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2342 enc[1] = value&0xFF;
2343 enc[2] = (value>>8)&0xFF;
2344 enc[3] = (value>>16)&0xFF;
2345 enc[4] = (value>>24)&0xFF;
2346 return 5;
2347 } else {
2348 return 0;
2349 }
2350 }
2351
2352 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2353 unsigned int comprlen, outlen;
2354 unsigned char byte;
2355 void *out;
2356
2357 /* We require at least four bytes compression for this to be worth it */
2358 outlen = sdslen(obj->ptr)-4;
2359 if (outlen <= 0) return 0;
2360 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2361 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2362 if (comprlen == 0) {
2363 zfree(out);
2364 return 0;
2365 }
2366 /* Data compressed! Let's save it on disk */
2367 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2368 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2369 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2370 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2371 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2372 zfree(out);
2373 return comprlen;
2374
2375 writeerr:
2376 zfree(out);
2377 return -1;
2378 }
2379
2380 /* Save a string objet as [len][data] on disk. If the object is a string
2381 * representation of an integer value we try to safe it in a special form */
2382 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2383 size_t len;
2384 int enclen;
2385
2386 len = sdslen(obj->ptr);
2387
2388 /* Try integer encoding */
2389 if (len <= 11) {
2390 unsigned char buf[5];
2391 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2392 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2393 return 0;
2394 }
2395 }
2396
2397 /* Try LZF compression - under 20 bytes it's unable to compress even
2398 * aaaaaaaaaaaaaaaaaa so skip it */
2399 if (len > 20) {
2400 int retval;
2401
2402 retval = rdbSaveLzfStringObject(fp,obj);
2403 if (retval == -1) return -1;
2404 if (retval > 0) return 0;
2405 /* retval == 0 means data can't be compressed, save the old way */
2406 }
2407
2408 /* Store verbatim */
2409 if (rdbSaveLen(fp,len) == -1) return -1;
2410 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2411 return 0;
2412 }
2413
2414 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2415 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2416 int retval;
2417 robj *dec;
2418
2419 if (obj->encoding != REDIS_ENCODING_RAW) {
2420 dec = getDecodedObject(obj);
2421 retval = rdbSaveStringObjectRaw(fp,dec);
2422 decrRefCount(dec);
2423 return retval;
2424 } else {
2425 return rdbSaveStringObjectRaw(fp,obj);
2426 }
2427 }
2428
/* Save a double value. Doubles are saved as strings prefixed by an
 * unsigned 8 bit integer specifying the number of bytes that follow.
 * This 8 bit integer has special values in order to specify the following
 * conditions:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 *
 * For finite values the bytes that follow are the "%.17g" representation
 * including its terminating NUL, so a loader that hands the bytes it read
 * straight to sscanf() always sees a terminated string. The length is
 * computed from the text itself (buf+1): measuring from buf would read
 * buf[0] before it has been assigned.
 *
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len;

    if (isnan(val)) {
        buf[0] = 253;
        len = 1;
    } else if (!isfinite(val)) {
        len = 1;
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
        buf[0] = strlen((char*)buf+1)+1; /* text + terminating NUL */
        len = buf[0]+1;                  /* plus the length byte itself */
    }
    if (fwrite(buf,len,1,fp) == 0) return -1;
    return 0;
}
2455
2456 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2457 static int rdbSave(char *filename) {
2458 dictIterator *di = NULL;
2459 dictEntry *de;
2460 FILE *fp;
2461 char tmpfile[256];
2462 int j;
2463 time_t now = time(NULL);
2464
2465 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2466 fp = fopen(tmpfile,"w");
2467 if (!fp) {
2468 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2469 return REDIS_ERR;
2470 }
2471 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2472 for (j = 0; j < server.dbnum; j++) {
2473 redisDb *db = server.db+j;
2474 dict *d = db->dict;
2475 if (dictSize(d) == 0) continue;
2476 di = dictGetIterator(d);
2477 if (!di) {
2478 fclose(fp);
2479 return REDIS_ERR;
2480 }
2481
2482 /* Write the SELECT DB opcode */
2483 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2484 if (rdbSaveLen(fp,j) == -1) goto werr;
2485
2486 /* Iterate this DB writing every entry */
2487 while((de = dictNext(di)) != NULL) {
2488 robj *key = dictGetEntryKey(de);
2489 robj *o = dictGetEntryVal(de);
2490 time_t expiretime = getExpire(db,key);
2491
2492 /* Save the expire time */
2493 if (expiretime != -1) {
2494 /* If this key is already expired skip it */
2495 if (expiretime < now) continue;
2496 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2497 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2498 }
2499 /* Save the key and associated value */
2500 if (rdbSaveType(fp,o->type) == -1) goto werr;
2501 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2502 if (o->type == REDIS_STRING) {
2503 /* Save a string value */
2504 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2505 } else if (o->type == REDIS_LIST) {
2506 /* Save a list value */
2507 list *list = o->ptr;
2508 listNode *ln;
2509
2510 listRewind(list);
2511 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2512 while((ln = listYield(list))) {
2513 robj *eleobj = listNodeValue(ln);
2514
2515 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2516 }
2517 } else if (o->type == REDIS_SET) {
2518 /* Save a set value */
2519 dict *set = o->ptr;
2520 dictIterator *di = dictGetIterator(set);
2521 dictEntry *de;
2522
2523 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2524 while((de = dictNext(di)) != NULL) {
2525 robj *eleobj = dictGetEntryKey(de);
2526
2527 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2528 }
2529 dictReleaseIterator(di);
2530 } else if (o->type == REDIS_ZSET) {
2531 /* Save a set value */
2532 zset *zs = o->ptr;
2533 dictIterator *di = dictGetIterator(zs->dict);
2534 dictEntry *de;
2535
2536 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2537 while((de = dictNext(di)) != NULL) {
2538 robj *eleobj = dictGetEntryKey(de);
2539 double *score = dictGetEntryVal(de);
2540
2541 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2542 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2543 }
2544 dictReleaseIterator(di);
2545 } else {
2546 assert(0 != 0);
2547 }
2548 }
2549 dictReleaseIterator(di);
2550 }
2551 /* EOF opcode */
2552 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2553
2554 /* Make sure data will not remain on the OS's output buffers */
2555 fflush(fp);
2556 fsync(fileno(fp));
2557 fclose(fp);
2558
2559 /* Use RENAME to make sure the DB file is changed atomically only
2560 * if the generate DB file is ok. */
2561 if (rename(tmpfile,filename) == -1) {
2562 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2563 unlink(tmpfile);
2564 return REDIS_ERR;
2565 }
2566 redisLog(REDIS_NOTICE,"DB saved on disk");
2567 server.dirty = 0;
2568 server.lastsave = time(NULL);
2569 return REDIS_OK;
2570
2571 werr:
2572 fclose(fp);
2573 unlink(tmpfile);
2574 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2575 if (di) dictReleaseIterator(di);
2576 return REDIS_ERR;
2577 }
2578
2579 static int rdbSaveBackground(char *filename) {
2580 pid_t childpid;
2581
2582 if (server.bgsaveinprogress) return REDIS_ERR;
2583 if ((childpid = fork()) == 0) {
2584 /* Child */
2585 close(server.fd);
2586 if (rdbSave(filename) == REDIS_OK) {
2587 exit(0);
2588 } else {
2589 exit(1);
2590 }
2591 } else {
2592 /* Parent */
2593 if (childpid == -1) {
2594 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2595 strerror(errno));
2596 return REDIS_ERR;
2597 }
2598 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2599 server.bgsaveinprogress = 1;
2600 server.bgsavechildpid = childpid;
2601 return REDIS_OK;
2602 }
2603 return REDIS_OK; /* unreached */
2604 }
2605
/* Remove the temporary dump file named after the process id 'childpid'
 * (the same "temp-<pid>.rdb" name rdbSave() builds from getpid()). */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb", (int) childpid);
    unlink(path);
}
2612
/* Read a one byte type/opcode from 'fp'.
 * Returns the byte value (0-255), or -1 if nothing could be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char opcode;

    if (fread(&opcode,1,1,fp) != 0) return opcode;
    return -1;
}
2618
/* Read a time saved by rdbSaveTime(): a 32 bit signed integer in host byte
 * order. Returns the time, or -1 if it could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,sizeof(stamp),1,fp) != 0) return (time_t) stamp;
    return -1;
}
2624
2625 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2626 * of this file for a description of how this are stored on disk.
2627 *
2628 * isencoded is set to 1 if the readed length is not actually a length but
2629 * an "encoding type", check the above comments for more info */
2630 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2631 unsigned char buf[2];
2632 uint32_t len;
2633
2634 if (isencoded) *isencoded = 0;
2635 if (rdbver == 0) {
2636 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2637 return ntohl(len);
2638 } else {
2639 int type;
2640
2641 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2642 type = (buf[0]&0xC0)>>6;
2643 if (type == REDIS_RDB_6BITLEN) {
2644 /* Read a 6 bit len */
2645 return buf[0]&0x3F;
2646 } else if (type == REDIS_RDB_ENCVAL) {
2647 /* Read a 6 bit len encoding type */
2648 if (isencoded) *isencoded = 1;
2649 return buf[0]&0x3F;
2650 } else if (type == REDIS_RDB_14BITLEN) {
2651 /* Read a 14 bit len */
2652 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2653 return ((buf[0]&0x3F)<<8)|buf[1];
2654 } else {
2655 /* Read a 32 bit len */
2656 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2657 return ntohl(len);
2658 }
2659 }
2660 }
2661
2662 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2663 unsigned char enc[4];
2664 long long val;
2665
2666 if (enctype == REDIS_RDB_ENC_INT8) {
2667 if (fread(enc,1,1,fp) == 0) return NULL;
2668 val = (signed char)enc[0];
2669 } else if (enctype == REDIS_RDB_ENC_INT16) {
2670 uint16_t v;
2671 if (fread(enc,2,1,fp) == 0) return NULL;
2672 v = enc[0]|(enc[1]<<8);
2673 val = (int16_t)v;
2674 } else if (enctype == REDIS_RDB_ENC_INT32) {
2675 uint32_t v;
2676 if (fread(enc,4,1,fp) == 0) return NULL;
2677 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2678 val = (int32_t)v;
2679 } else {
2680 val = 0; /* anti-warning */
2681 assert(0!=0);
2682 }
2683 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2684 }
2685
2686 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2687 unsigned int len, clen;
2688 unsigned char *c = NULL;
2689 sds val = NULL;
2690
2691 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2692 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2693 if ((c = zmalloc(clen)) == NULL) goto err;
2694 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2695 if (fread(c,clen,1,fp) == 0) goto err;
2696 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2697 zfree(c);
2698 return createObject(REDIS_STRING,val);
2699 err:
2700 zfree(c);
2701 sdsfree(val);
2702 return NULL;
2703 }
2704
2705 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2706 int isencoded;
2707 uint32_t len;
2708 sds val;
2709
2710 len = rdbLoadLen(fp,rdbver,&isencoded);
2711 if (isencoded) {
2712 switch(len) {
2713 case REDIS_RDB_ENC_INT8:
2714 case REDIS_RDB_ENC_INT16:
2715 case REDIS_RDB_ENC_INT32:
2716 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2717 case REDIS_RDB_ENC_LZF:
2718 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2719 default:
2720 assert(0!=0);
2721 }
2722 }
2723
2724 if (len == REDIS_RDB_LENERR) return NULL;
2725 val = sdsnewlen(NULL,len);
2726 if (len && fread(val,len,1,fp) == 0) {
2727 sdsfree(val);
2728 return NULL;
2729 }
2730 return tryObjectSharing(createObject(REDIS_STRING,val));
2731 }
2732
2733 /* For information about double serialization check rdbSaveDoubleValue() */
2734 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2735 char buf[128];
2736 unsigned char len;
2737
2738 if (fread(&len,1,1,fp) == 0) return -1;
2739 switch(len) {
2740 case 255: *val = R_NegInf; return 0;
2741 case 254: *val = R_PosInf; return 0;
2742 case 253: *val = R_Nan; return 0;
2743 default:
2744 if (fread(buf,len,1,fp) == 0) return -1;
2745 sscanf(buf, "%lg", val);
2746 return 0;
2747 }
2748 }
2749
2750 static int rdbLoad(char *filename) {
2751 FILE *fp;
2752 robj *keyobj = NULL;
2753 uint32_t dbid;
2754 int type, retval, rdbver;
2755 dict *d = server.db[0].dict;
2756 redisDb *db = server.db+0;
2757 char buf[1024];
2758 time_t expiretime = -1, now = time(NULL);
2759
2760 fp = fopen(filename,"r");
2761 if (!fp) return REDIS_ERR;
2762 if (fread(buf,9,1,fp) == 0) goto eoferr;
2763 buf[9] = '\0';
2764 if (memcmp(buf,"REDIS",5) != 0) {
2765 fclose(fp);
2766 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2767 return REDIS_ERR;
2768 }
2769 rdbver = atoi(buf+5);
2770 if (rdbver > 1) {
2771 fclose(fp);
2772 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2773 return REDIS_ERR;
2774 }
2775 while(1) {
2776 robj *o;
2777
2778 /* Read type. */
2779 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2780 if (type == REDIS_EXPIRETIME) {
2781 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2782 /* We read the time so we need to read the object type again */
2783 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2784 }
2785 if (type == REDIS_EOF) break;
2786 /* Handle SELECT DB opcode as a special case */
2787 if (type == REDIS_SELECTDB) {
2788 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2789 goto eoferr;
2790 if (dbid >= (unsigned)server.dbnum) {
2791 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2792 exit(1);
2793 }
2794 db = server.db+dbid;
2795 d = db->dict;
2796 continue;
2797 }
2798 /* Read key */
2799 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2800
2801 if (type == REDIS_STRING) {
2802 /* Read string value */
2803 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2804 tryObjectEncoding(o);
2805 } else if (type == REDIS_LIST || type == REDIS_SET) {
2806 /* Read list/set value */
2807 uint32_t listlen;
2808
2809 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2810 goto eoferr;
2811 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2812 /* Load every single element of the list/set */
2813 while(listlen--) {
2814 robj *ele;
2815
2816 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2817 tryObjectEncoding(ele);
2818 if (type == REDIS_LIST) {
2819 listAddNodeTail((list*)o->ptr,ele);
2820 } else {
2821 dictAdd((dict*)o->ptr,ele,NULL);
2822 }
2823 }
2824 } else if (type == REDIS_ZSET) {
2825 /* Read list/set value */
2826 uint32_t zsetlen;
2827 zset *zs;
2828
2829 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2830 goto eoferr;
2831 o = createZsetObject();
2832 zs = o->ptr;
2833 /* Load every single element of the list/set */
2834 while(zsetlen--) {
2835 robj *ele;
2836 double *score = zmalloc(sizeof(double));
2837
2838 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2839 tryObjectEncoding(ele);
2840 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2841 dictAdd(zs->dict,ele,score);
2842 zslInsert(zs->zsl,*score,ele);
2843 incrRefCount(ele); /* added to skiplist */
2844 }
2845 } else {
2846 assert(0 != 0);
2847 }
2848 /* Add the new object in the hash table */
2849 retval = dictAdd(d,keyobj,o);
2850 if (retval == DICT_ERR) {
2851 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2852 exit(1);
2853 }
2854 /* Set the expire time if needed */
2855 if (expiretime != -1) {
2856 setExpire(db,keyobj,expiretime);
2857 /* Delete this key if already expired */
2858 if (expiretime < now) deleteKey(db,keyobj);
2859 expiretime = -1;
2860 }
2861 keyobj = o = NULL;
2862 }
2863 fclose(fp);
2864 return REDIS_OK;
2865
2866 eoferr: /* unexpected end of file is handled here with a fatal exit */
2867 if (keyobj) decrRefCount(keyobj);
2868 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2869 exit(1);
2870 return REDIS_ERR; /* Just to avoid warning */
2871 }
2872
2873 /*================================== Commands =============================== */
2874
2875 static void authCommand(redisClient *c) {
2876 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2877 c->authenticated = 1;
2878 addReply(c,shared.ok);
2879 } else {
2880 c->authenticated = 0;
2881 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2882 }
2883 }
2884
2885 static void pingCommand(redisClient *c) {
2886 addReply(c,shared.pong);
2887 }
2888
2889 static void echoCommand(redisClient *c) {
2890 addReplyBulkLen(c,c->argv[1]);
2891 addReply(c,c->argv[1]);
2892 addReply(c,shared.crlf);
2893 }
2894
2895 /*=================================== Strings =============================== */
2896
2897 static void setGenericCommand(redisClient *c, int nx) {
2898 int retval;
2899
2900 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2901 if (retval == DICT_ERR) {
2902 if (!nx) {
2903 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2904 incrRefCount(c->argv[2]);
2905 } else {
2906 addReply(c,shared.czero);
2907 return;
2908 }
2909 } else {
2910 incrRefCount(c->argv[1]);
2911 incrRefCount(c->argv[2]);
2912 }
2913 server.dirty++;
2914 removeExpire(c->db,c->argv[1]);
2915 addReply(c, nx ? shared.cone : shared.ok);
2916 }
2917
/* Unconditional set: setGenericCommand() with nx = 0. */
static void setCommand(redisClient *c) {
    setGenericCommand(c,0);
}
2921
/* Set only if the key does not exist: setGenericCommand() with nx = 1. */
static void setnxCommand(redisClient *c) {
    setGenericCommand(c,1);
}
2925
2926 static void getCommand(redisClient *c) {
2927 robj *o = lookupKeyRead(c->db,c->argv[1]);
2928
2929 if (o == NULL) {
2930 addReply(c,shared.nullbulk);
2931 } else {
2932 if (o->type != REDIS_STRING) {
2933 addReply(c,shared.wrongtypeerr);
2934 } else {
2935 addReplyBulkLen(c,o);
2936 addReply(c,o);
2937 addReply(c,shared.crlf);
2938 }
2939 }
2940 }
2941
2942 static void getsetCommand(redisClient *c) {
2943 getCommand(c);
2944 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2945 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2946 } else {
2947 incrRefCount(c->argv[1]);
2948 }
2949 incrRefCount(c->argv[2]);
2950 server.dirty++;
2951 removeExpire(c->db,c->argv[1]);
2952 }
2953
2954 static void mgetCommand(redisClient *c) {
2955 int j;
2956
2957 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2958 for (j = 1; j < c->argc; j++) {
2959 robj *o = lookupKeyRead(c->db,c->argv[j]);
2960 if (o == NULL) {
2961 addReply(c,shared.nullbulk);
2962 } else {
2963 if (o->type != REDIS_STRING) {
2964 addReply(c,shared.nullbulk);
2965 } else {
2966 addReplyBulkLen(c,o);
2967 addReply(c,o);
2968 addReply(c,shared.crlf);
2969 }
2970 }
2971 }
2972 }
2973
2974 static void incrDecrCommand(redisClient *c, long long incr) {
2975 long long value;
2976 int retval;
2977 robj *o;
2978
2979 o = lookupKeyWrite(c->db,c->argv[1]);
2980 if (o == NULL) {
2981 value = 0;
2982 } else {
2983 if (o->type != REDIS_STRING) {
2984 value = 0;
2985 } else {
2986 char *eptr;
2987
2988 if (o->encoding == REDIS_ENCODING_RAW)
2989 value = strtoll(o->ptr, &eptr, 10);
2990 else if (o->encoding == REDIS_ENCODING_INT)
2991 value = (long)o->ptr;
2992 else
2993 assert(1 != 1);
2994 }
2995 }
2996
2997 value += incr;
2998 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2999 tryObjectEncoding(o);
3000 retval = dictAdd(c->db->dict,c->argv[1],o);
3001 if (retval == DICT_ERR) {
3002 dictReplace(c->db->dict,c->argv[1],o);
3003 removeExpire(c->db,c->argv[1]);
3004 } else {
3005 incrRefCount(c->argv[1]);
3006 }
3007 server.dirty++;
3008 addReply(c,shared.colon);
3009 addReply(c,o);
3010 addReply(c,shared.crlf);
3011 }
3012
/* Increment by one: incrDecrCommand() with a delta of 1. */
static void incrCommand(redisClient *c) {
    incrDecrCommand(c,1);
}
3016
/* Decrement by one: incrDecrCommand() with a delta of -1. */
static void decrCommand(redisClient *c) {
    incrDecrCommand(c,-1);
}
3020
/* Increment by argv[2], parsed as a base-10 long long. No parse-error
 * detection is done (the strtoll end pointer is NULL). */
static void incrbyCommand(redisClient *c) {
    long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
    incrDecrCommand(c,incr);
}
3025
/* Decrement by argv[2], parsed as a base-10 long long and negated before
 * being passed to incrDecrCommand(). No parse-error detection is done. */
static void decrbyCommand(redisClient *c) {
    long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
    incrDecrCommand(c,-incr);
}
3030
3031 /* ========================= Type agnostic commands ========================= */
3032
3033 static void delCommand(redisClient *c) {
3034 int deleted = 0, j;
3035
3036 for (j = 1; j < c->argc; j++) {
3037 if (deleteKey(c->db,c->argv[j])) {
3038 server.dirty++;
3039 deleted++;
3040 }
3041 }
3042 switch(deleted) {
3043 case 0:
3044 addReply(c,shared.czero);
3045 break;
3046 case 1:
3047 addReply(c,shared.cone);
3048 break;
3049 default:
3050 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3051 break;
3052 }
3053 }
3054
3055 static void existsCommand(redisClient *c) {
3056 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3057 }
3058
3059 static void selectCommand(redisClient *c) {
3060 int id = atoi(c->argv[1]->ptr);
3061
3062 if (selectDb(c,id) == REDIS_ERR) {
3063 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3064 } else {
3065 addReply(c,shared.ok);
3066 }
3067 }
3068
3069 static void randomkeyCommand(redisClient *c) {
3070 dictEntry *de;
3071
3072 while(1) {
3073 de = dictGetRandomKey(c->db->dict);
3074 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3075 }
3076 if (de == NULL) {
3077 addReply(c,shared.plus);
3078 addReply(c,shared.crlf);
3079 } else {
3080 addReply(c,shared.plus);
3081 addReply(c,dictGetEntryKey(de));
3082 addReply(c,shared.crlf);
3083 }
3084 }
3085
3086 static void keysCommand(redisClient *c) {
3087 dictIterator *di;
3088 dictEntry *de;
3089 sds pattern = c->argv[1]->ptr;
3090 int plen = sdslen(pattern);
3091 int numkeys = 0, keyslen = 0;
3092 robj *lenobj = createObject(REDIS_STRING,NULL);
3093
3094 di = dictGetIterator(c->db->dict);
3095 addReply(c,lenobj);
3096 decrRefCount(lenobj);
3097 while((de = dictNext(di)) != NULL) {
3098 robj *keyobj = dictGetEntryKey(de);
3099
3100 sds key = keyobj->ptr;
3101 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3102 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3103 if (expireIfNeeded(c->db,keyobj) == 0) {
3104 if (numkeys != 0)
3105 addReply(c,shared.space);
3106 addReply(c,keyobj);
3107 numkeys++;
3108 keyslen += sdslen(key);
3109 }
3110 }
3111 }
3112 dictReleaseIterator(di);
3113 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3114 addReply(c,shared.crlf);
3115 }
3116
3117 static void dbsizeCommand(redisClient *c) {
3118 addReplySds(c,
3119 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3120 }
3121
3122 static void lastsaveCommand(redisClient *c) {
3123 addReplySds(c,
3124 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3125 }
3126
3127 static void typeCommand(redisClient *c) {
3128 robj *o;
3129 char *type;
3130
3131 o = lookupKeyRead(c->db,c->argv[1]);
3132 if (o == NULL) {
3133 type = "+none";
3134 } else {
3135 switch(o->type) {
3136 case REDIS_STRING: type = "+string"; break;
3137 case REDIS_LIST: type = "+list"; break;
3138 case REDIS_SET: type = "+set"; break;
3139 case REDIS_ZSET: type = "+zset"; break;
3140 default: type = "unknown"; break;
3141 }
3142 }
3143 addReplySds(c,sdsnew(type));
3144 addReply(c,shared.crlf);
3145 }
3146
3147 static void saveCommand(redisClient *c) {
3148 if (server.bgsaveinprogress) {
3149 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3150 return;
3151 }
3152 if (rdbSave(server.dbfilename) == REDIS_OK) {
3153 addReply(c,shared.ok);
3154 } else {
3155 addReply(c,shared.err);
3156 }
3157 }
3158
3159 static void bgsaveCommand(redisClient *c) {
3160 if (server.bgsaveinprogress) {
3161 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3162 return;
3163 }
3164 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3165 addReply(c,shared.ok);
3166 } else {
3167 addReply(c,shared.err);
3168 }
3169 }
3170
3171 static void shutdownCommand(redisClient *c) {
3172 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3173 /* Kill the saving child if there is a background saving in progress.
3174 We want to avoid race conditions, for instance our saving child may
3175 overwrite the synchronous saving did by SHUTDOWN. */
3176 if (server.bgsaveinprogress) {
3177 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3178 kill(server.bgsavechildpid,SIGKILL);
3179 rdbRemoveTempFile(server.bgsavechildpid);
3180 }
3181 /* SYNC SAVE */
3182 if (rdbSave(server.dbfilename) == REDIS_OK) {
3183 if (server.daemonize)
3184 unlink(server.pidfile);
3185 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3186 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3187 exit(1);
3188 } else {
3189 /* Ooops.. error saving! The best we can do is to continue operating.
3190 * Note that if there was a background saving process, in the next
3191 * cron() Redis will be notified that the background saving aborted,
3192 * handling special stuff like slaves pending for synchronization... */
3193 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3194 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3195 }
3196 }
3197
3198 static void renameGenericCommand(redisClient *c, int nx) {
3199 robj *o;
3200
3201 /* To use the same key as src and dst is probably an error */
3202 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3203 addReply(c,shared.sameobjecterr);
3204 return;
3205 }
3206
3207 o = lookupKeyWrite(c->db,c->argv[1]);
3208 if (o == NULL) {
3209 addReply(c,shared.nokeyerr);
3210 return;
3211 }
3212 incrRefCount(o);
3213 deleteIfVolatile(c->db,c->argv[2]);
3214 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3215 if (nx) {
3216 decrRefCount(o);
3217 addReply(c,shared.czero);
3218 return;
3219 }
3220 dictReplace(c->db->dict,c->argv[2],o);
3221 } else {
3222 incrRefCount(c->argv[2]);
3223 }
3224 deleteKey(c->db,c->argv[1]);
3225 server.dirty++;
3226 addReply(c,nx ? shared.cone : shared.ok);
3227 }
3228
/* Rename, overwriting the destination: renameGenericCommand() with nx = 0. */
static void renameCommand(redisClient *c) {
    renameGenericCommand(c,0);
}
3232
/* Rename only if the destination does not exist: renameGenericCommand()
 * with nx = 1. */
static void renamenxCommand(redisClient *c) {
    renameGenericCommand(c,1);
}
3236
3237 static void moveCommand(redisClient *c) {
3238 robj *o;
3239 redisDb *src, *dst;
3240 int srcid;
3241
3242 /* Obtain source and target DB pointers */
3243 src = c->db;
3244 srcid = c->db->id;
3245 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3246 addReply(c,shared.outofrangeerr);
3247 return;
3248 }
3249 dst = c->db;
3250 selectDb(c,srcid); /* Back to the source DB */
3251
3252 /* If the user is moving using as target the same
3253 * DB as the source DB it is probably an error. */
3254 if (src == dst) {
3255 addReply(c,shared.sameobjecterr);
3256 return;
3257 }
3258
3259 /* Check if the element exists and get a reference */
3260 o = lookupKeyWrite(c->db,c->argv[1]);
3261 if (!o) {
3262 addReply(c,shared.czero);
3263 return;
3264 }
3265
3266 /* Try to add the element to the target DB */
3267 deleteIfVolatile(dst,c->argv[1]);
3268 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3269 addReply(c,shared.czero);
3270 return;
3271 }
3272 incrRefCount(c->argv[1]);
3273 incrRefCount(o);
3274
3275 /* OK! key moved, free the entry in the source DB */
3276 deleteKey(src,c->argv[1]);
3277 server.dirty++;
3278 addReply(c,shared.cone);
3279 }
3280
3281 /* =================================== Lists ================================ */
3282 static void pushGenericCommand(redisClient *c, int where) {
3283 robj *lobj;
3284 list *list;
3285
3286 lobj = lookupKeyWrite(c->db,c->argv[1]);
3287 if (lobj == NULL) {
3288 lobj = createListObject();
3289 list = lobj->ptr;
3290 if (where == REDIS_HEAD) {
3291 listAddNodeHead(list,c->argv[2]);
3292 } else {
3293 listAddNodeTail(list,c->argv[2]);
3294 }
3295 dictAdd(c->db->dict,c->argv[1],lobj);
3296 incrRefCount(c->argv[1]);
3297 incrRefCount(c->argv[2]);
3298 } else {
3299 if (lobj->type != REDIS_LIST) {
3300 addReply(c,shared.wrongtypeerr);
3301 return;
3302 }
3303 list = lobj->ptr;
3304 if (where == REDIS_HEAD) {
3305 listAddNodeHead(list,c->argv[2]);
3306 } else {
3307 listAddNodeTail(list,c->argv[2]);
3308 }
3309 incrRefCount(c->argv[2]);
3310 }
3311 server.dirty++;
3312 addReply(c,shared.ok);
3313 }
3314
/* Push at the head of the list: pushGenericCommand() with REDIS_HEAD. */
static void lpushCommand(redisClient *c) {
    pushGenericCommand(c,REDIS_HEAD);
}
3318
/* Push at the tail of the list: pushGenericCommand() with REDIS_TAIL. */
static void rpushCommand(redisClient *c) {
    pushGenericCommand(c,REDIS_TAIL);
}
3322
3323 static void llenCommand(redisClient *c) {
3324 robj *o;
3325 list *l;
3326
3327 o = lookupKeyRead(c->db,c->argv[1]);
3328 if (o == NULL) {
3329 addReply(c,shared.czero);
3330 return;
3331 } else {
3332 if (o->type != REDIS_LIST) {
3333 addReply(c,shared.wrongtypeerr);
3334 } else {
3335 l = o->ptr;
3336 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3337 }
3338 }
3339 }
3340
3341 static void lindexCommand(redisClient *c) {
3342 robj *o;
3343 int index = atoi(c->argv[2]->ptr);
3344
3345 o = lookupKeyRead(c->db,c->argv[1]);
3346 if (o == NULL) {
3347 addReply(c,shared.nullbulk);
3348 } else {
3349 if (o->type != REDIS_LIST) {
3350 addReply(c,shared.wrongtypeerr);
3351 } else {
3352 list *list = o->ptr;
3353 listNode *ln;
3354
3355 ln = listIndex(list, index);
3356 if (ln == NULL) {
3357 addReply(c,shared.nullbulk);
3358 } else {
3359 robj *ele = listNodeValue(ln);
3360 addReplyBulkLen(c,ele);
3361 addReply(c,ele);
3362 addReply(c,shared.crlf);
3363 }
3364 }
3365 }
3366 }
3367
3368 static void lsetCommand(redisClient *c) {
3369 robj *o;
3370 int index = atoi(c->argv[2]->ptr);
3371
3372 o = lookupKeyWrite(c->db,c->argv[1]);
3373 if (o == NULL) {
3374 addReply(c,shared.nokeyerr);
3375 } else {
3376 if (o->type != REDIS_LIST) {
3377 addReply(c,shared.wrongtypeerr);
3378 } else {
3379 list *list = o->ptr;
3380 listNode *ln;
3381
3382 ln = listIndex(list, index);
3383 if (ln == NULL) {
3384 addReply(c,shared.outofrangeerr);
3385 } else {
3386 robj *ele = listNodeValue(ln);
3387
3388 decrRefCount(ele);
3389 listNodeValue(ln) = c->argv[3];
3390 incrRefCount(c->argv[3]);
3391 addReply(c,shared.ok);
3392 server.dirty++;
3393 }
3394 }
3395 }
3396 }
3397
3398 static void popGenericCommand(redisClient *c, int where) {
3399 robj *o;
3400
3401 o = lookupKeyWrite(c->db,c->argv[1]);
3402 if (o == NULL) {
3403 addReply(c,shared.nullbulk);
3404 } else {
3405 if (o->type != REDIS_LIST) {
3406 addReply(c,shared.wrongtypeerr);
3407 } else {
3408 list *list = o->ptr;
3409 listNode *ln;
3410
3411 if (where == REDIS_HEAD)
3412 ln = listFirst(list);
3413 else
3414 ln = listLast(list);
3415
3416 if (ln == NULL) {
3417 addReply(c,shared.nullbulk);
3418 } else {
3419 robj *ele = listNodeValue(ln);
3420 addReplyBulkLen(c,ele);
3421 addReply(c,ele);
3422 addReply(c,shared.crlf);
3423 listDelNode(list,ln);
3424 server.dirty++;
3425 }
3426 }
3427 }
3428 }
3429
/* Pop from the head of the list: popGenericCommand() with REDIS_HEAD. */
static void lpopCommand(redisClient *c) {
    popGenericCommand(c,REDIS_HEAD);
}
3433
/* Pop from the tail of the list: popGenericCommand() with REDIS_TAIL. */
static void rpopCommand(redisClient *c) {
    popGenericCommand(c,REDIS_TAIL);
}
3437
3438 static void lrangeCommand(redisClient *c) {
3439 robj *o;
3440 int start = atoi(c->argv[2]->ptr);
3441 int end = atoi(c->argv[3]->ptr);
3442
3443 o = lookupKeyRead(c->db,c->argv[1]);
3444 if (o == NULL) {
3445 addReply(c,shared.nullmultibulk);
3446 } else {
3447 if (o->type != REDIS_LIST) {
3448 addReply(c,shared.wrongtypeerr);
3449 } else {
3450 list *list = o->ptr;
3451 listNode *ln;
3452 int llen = listLength(list);
3453 int rangelen, j;
3454 robj *ele;
3455
3456 /* convert negative indexes */
3457 if (start < 0) start = llen+start;
3458 if (end < 0) end = llen+end;
3459 if (start < 0) start = 0;
3460 if (end < 0) end = 0;
3461
3462 /* indexes sanity checks */
3463 if (start > end || start >= llen) {
3464 /* Out of range start or start > end result in empty list */
3465 addReply(c,shared.emptymultibulk);
3466 return;
3467 }
3468 if (end >= llen) end = llen-1;
3469 rangelen = (end-start)+1;
3470
3471 /* Return the result in form of a multi-bulk reply */
3472 ln = listIndex(list, start);
3473 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3474 for (j = 0; j < rangelen; j++) {
3475 ele = listNodeValue(ln);
3476 addReplyBulkLen(c,ele);
3477 addReply(c,ele);
3478 addReply(c,shared.crlf);
3479 ln = ln->next;
3480 }
3481 }
3482 }
3483 }
3484
3485 static void ltrimCommand(redisClient *c) {
3486 robj *o;
3487 int start = atoi(c->argv[2]->ptr);
3488 int end = atoi(c->argv[3]->ptr);
3489
3490 o = lookupKeyWrite(c->db,c->argv[1]);
3491 if (o == NULL) {
3492 addReply(c,shared.nokeyerr);
3493 } else {
3494 if (o->type != REDIS_LIST) {
3495 addReply(c,shared.wrongtypeerr);
3496 } else {
3497 list *list = o->ptr;
3498 listNode *ln;
3499 int llen = listLength(list);
3500 int j, ltrim, rtrim;
3501
3502 /* convert negative indexes */
3503 if (start < 0) start = llen+start;
3504 if (end < 0) end = llen+end;
3505 if (start < 0) start = 0;
3506 if (end < 0) end = 0;
3507
3508 /* indexes sanity checks */
3509 if (start > end || start >= llen) {
3510 /* Out of range start or start > end result in empty list */
3511 ltrim = llen;
3512 rtrim = 0;
3513 } else {
3514 if (end >= llen) end = llen-1;
3515 ltrim = start;
3516 rtrim = llen-end-1;
3517 }
3518
3519 /* Remove list elements to perform the trim */
3520 for (j = 0; j < ltrim; j++) {
3521 ln = listFirst(list);
3522 listDelNode(list,ln);
3523 }
3524 for (j = 0; j < rtrim; j++) {
3525 ln = listLast(list);
3526 listDelNode(list,ln);
3527 }
3528 server.dirty++;
3529 addReply(c,shared.ok);
3530 }
3531 }
3532 }
3533
3534 static void lremCommand(redisClient *c) {
3535 robj *o;
3536
3537 o = lookupKeyWrite(c->db,c->argv[1]);
3538 if (o == NULL) {
3539 addReply(c,shared.czero);
3540 } else {
3541 if (o->type != REDIS_LIST) {
3542 addReply(c,shared.wrongtypeerr);
3543 } else {
3544 list *list = o->ptr;
3545 listNode *ln, *next;
3546 int toremove = atoi(c->argv[2]->ptr);
3547 int removed = 0;
3548 int fromtail = 0;
3549
3550 if (toremove < 0) {
3551 toremove = -toremove;
3552 fromtail = 1;
3553 }
3554 ln = fromtail ? list->tail : list->head;
3555 while (ln) {
3556 robj *ele = listNodeValue(ln);
3557
3558 next = fromtail ? ln->prev : ln->next;
3559 if (compareStringObjects(ele,c->argv[3]) == 0) {
3560 listDelNode(list,ln);
3561 server.dirty++;
3562 removed++;
3563 if (toremove && removed == toremove) break;
3564 }
3565 ln = next;
3566 }
3567 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3568 }
3569 }
3570 }
3571
3572 /* This is the semantic of this command:
3573 * RPOPLPUSH srclist dstlist:
3574 * IF LLEN(srclist) > 0
3575 * element = RPOP srclist
3576 * LPUSH dstlist element
3577 * RETURN element
3578 * ELSE
3579 * RETURN nil
3580 * END
3581 * END
3582 *
3583 * The idea is to be able to get an element from a list in a reliable way
3584 * since the element is not just returned but pushed against another list
3585 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3586 */
3587 static void rpoplpushcommand(redisClient *c) {
3588 robj *sobj;
3589
3590 sobj = lookupKeyWrite(c->db,c->argv[1]);
3591 if (sobj == NULL) {
3592 addReply(c,shared.nullbulk);
3593 } else {
3594 if (sobj->type != REDIS_LIST) {
3595 addReply(c,shared.wrongtypeerr);
3596 } else {
3597 list *srclist = sobj->ptr;
3598 listNode *ln = listLast(srclist);
3599
3600 if (ln == NULL) {
3601 addReply(c,shared.nullbulk);
3602 } else {
3603 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3604 robj *ele = listNodeValue(ln);
3605 list *dstlist;
3606
3607 if (dobj == NULL) {
3608
3609 /* Create the list if the key does not exist */
3610 dobj = createListObject();
3611 dictAdd(c->db->dict,c->argv[2],dobj);
3612 incrRefCount(c->argv[2]);
3613 } else if (dobj->type != REDIS_LIST) {
3614 addReply(c,shared.wrongtypeerr);
3615 return;
3616 }
3617 /* Add the element to the target list */
3618 dstlist = dobj->ptr;
3619 listAddNodeHead(dstlist,ele);
3620 incrRefCount(ele);
3621
3622 /* Send the element to the client as reply as well */
3623 addReplyBulkLen(c,ele);
3624 addReply(c,ele);
3625 addReply(c,shared.crlf);
3626
3627 /* Finally remove the element from the source list */
3628 listDelNode(srclist,ln);
3629 server.dirty++;
3630 }
3631 }
3632 }
3633 }
3634
3635
3636 /* ==================================== Sets ================================ */
3637
3638 static void saddCommand(redisClient *c) {
3639 robj *set;
3640
3641 set = lookupKeyWrite(c->db,c->argv[1]);
3642 if (set == NULL) {
3643 set = createSetObject();
3644 dictAdd(c->db->dict,c->argv[1],set);
3645 incrRefCount(c->argv[1]);
3646 } else {
3647 if (set->type != REDIS_SET) {
3648 addReply(c,shared.wrongtypeerr);
3649 return;
3650 }
3651 }
3652 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3653 incrRefCount(c->argv[2]);
3654 server.dirty++;
3655 addReply(c,shared.cone);
3656 } else {
3657 addReply(c,shared.czero);
3658 }
3659 }
3660
3661 static void sremCommand(redisClient *c) {
3662 robj *set;
3663
3664 set = lookupKeyWrite(c->db,c->argv[1]);
3665 if (set == NULL) {
3666 addReply(c,shared.czero);
3667 } else {
3668 if (set->type != REDIS_SET) {
3669 addReply(c,shared.wrongtypeerr);
3670 return;
3671 }
3672 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3673 server.dirty++;
3674 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3675 addReply(c,shared.cone);
3676 } else {
3677 addReply(c,shared.czero);
3678 }
3679 }
3680 }
3681
3682 static void smoveCommand(redisClient *c) {
3683 robj *srcset, *dstset;
3684
3685 srcset = lookupKeyWrite(c->db,c->argv[1]);
3686 dstset = lookupKeyWrite(c->db,c->argv[2]);
3687
3688 /* If the source key does not exist return 0, if it's of the wrong type
3689 * raise an error */
3690 if (srcset == NULL || srcset->type != REDIS_SET) {
3691 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3692 return;
3693 }
3694 /* Error if the destination key is not a set as well */
3695 if (dstset && dstset->type != REDIS_SET) {
3696 addReply(c,shared.wrongtypeerr);
3697 return;
3698 }
3699 /* Remove the element from the source set */
3700 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3701 /* Key not found in the src set! return zero */
3702 addReply(c,shared.czero);
3703 return;
3704 }
3705 server.dirty++;
3706 /* Add the element to the destination set */
3707 if (!dstset) {
3708 dstset = createSetObject();
3709 dictAdd(c->db->dict,c->argv[2],dstset);
3710 incrRefCount(c->argv[2]);
3711 }
3712 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3713 incrRefCount(c->argv[3]);
3714 addReply(c,shared.cone);
3715 }
3716
3717 static void sismemberCommand(redisClient *c) {
3718 robj *set;
3719
3720 set = lookupKeyRead(c->db,c->argv[1]);
3721 if (set == NULL) {
3722 addReply(c,shared.czero);
3723 } else {
3724 if (set->type != REDIS_SET) {
3725 addReply(c,shared.wrongtypeerr);
3726 return;
3727 }
3728 if (dictFind(set->ptr,c->argv[2]))
3729 addReply(c,shared.cone);
3730 else
3731 addReply(c,shared.czero);
3732 }
3733 }
3734
3735 static void scardCommand(redisClient *c) {
3736 robj *o;
3737 dict *s;
3738
3739 o = lookupKeyRead(c->db,c->argv[1]);
3740 if (o == NULL) {
3741 addReply(c,shared.czero);
3742 return;
3743 } else {
3744 if (o->type != REDIS_SET) {
3745 addReply(c,shared.wrongtypeerr);
3746 } else {
3747 s = o->ptr;
3748 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3749 dictSize(s)));
3750 }
3751 }
3752 }
3753
3754 static void spopCommand(redisClient *c) {
3755 robj *set;
3756 dictEntry *de;
3757
3758 set = lookupKeyWrite(c->db,c->argv[1]);
3759 if (set == NULL) {
3760 addReply(c,shared.nullbulk);
3761 } else {
3762 if (set->type != REDIS_SET) {
3763 addReply(c,shared.wrongtypeerr);
3764 return;
3765 }
3766 de = dictGetRandomKey(set->ptr);
3767 if (de == NULL) {
3768 addReply(c,shared.nullbulk);
3769 } else {
3770 robj *ele = dictGetEntryKey(de);
3771
3772 addReplyBulkLen(c,ele);
3773 addReply(c,ele);
3774 addReply(c,shared.crlf);
3775 dictDelete(set->ptr,ele);
3776 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3777 server.dirty++;
3778 }
3779 }
3780 }
3781
3782 static void srandmemberCommand(redisClient *c) {
3783 robj *set;
3784 dictEntry *de;
3785
3786 set = lookupKeyRead(c->db,c->argv[1]);
3787 if (set == NULL) {
3788 addReply(c,shared.nullbulk);
3789 } else {
3790 if (set->type != REDIS_SET) {
3791 addReply(c,shared.wrongtypeerr);
3792 return;
3793 }
3794 de = dictGetRandomKey(set->ptr);
3795 if (de == NULL) {
3796 addReply(c,shared.nullbulk);
3797 } else {
3798 robj *ele = dictGetEntryKey(de);
3799
3800 addReplyBulkLen(c,ele);
3801 addReply(c,ele);
3802 addReply(c,shared.crlf);
3803 }
3804 }
3805 }
3806
3807 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3808 dict **d1 = (void*) s1, **d2 = (void*) s2;
3809
3810 return dictSize(*d1)-dictSize(*d2);
3811 }
3812
3813 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3814 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3815 dictIterator *di;
3816 dictEntry *de;
3817 robj *lenobj = NULL, *dstset = NULL;
3818 int j, cardinality = 0;
3819
3820 for (j = 0; j < setsnum; j++) {
3821 robj *setobj;
3822
3823 setobj = dstkey ?
3824 lookupKeyWrite(c->db,setskeys[j]) :
3825 lookupKeyRead(c->db,setskeys[j]);
3826 if (!setobj) {
3827 zfree(dv);
3828 if (dstkey) {
3829 deleteKey(c->db,dstkey);
3830 addReply(c,shared.ok);
3831 } else {
3832 addReply(c,shared.nullmultibulk);
3833 }
3834 return;
3835 }
3836 if (setobj->type != REDIS_SET) {
3837 zfree(dv);
3838 addReply(c,shared.wrongtypeerr);
3839 return;
3840 }
3841 dv[j] = setobj->ptr;
3842 }
3843 /* Sort sets from the smallest to largest, this will improve our
3844 * algorithm's performace */
3845 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3846
3847 /* The first thing we should output is the total number of elements...
3848 * since this is a multi-bulk write, but at this stage we don't know
3849 * the intersection set size, so we use a trick, append an empty object
3850 * to the output list and save the pointer to later modify it with the
3851 * right length */
3852 if (!dstkey) {
3853 lenobj = createObject(REDIS_STRING,NULL);
3854 addReply(c,lenobj);
3855 decrRefCount(lenobj);
3856 } else {
3857 /* If we have a target key where to store the resulting set
3858 * create this key with an empty set inside */
3859 dstset = createSetObject();
3860 }
3861
3862 /* Iterate all the elements of the first (smallest) set, and test
3863 * the element against all the other sets, if at least one set does
3864 * not include the element it is discarded */
3865 di = dictGetIterator(dv[0]);
3866
3867 while((de = dictNext(di)) != NULL) {
3868 robj *ele;
3869
3870 for (j = 1; j < setsnum; j++)
3871 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3872 if (j != setsnum)
3873 continue; /* at least one set does not contain the member */
3874 ele = dictGetEntryKey(de);
3875 if (!dstkey) {
3876 addReplyBulkLen(c,ele);
3877 addReply(c,ele);
3878 addReply(c,shared.crlf);
3879 cardinality++;
3880 } else {
3881 dictAdd(dstset->ptr,ele,NULL);
3882 incrRefCount(ele);
3883 }
3884 }
3885 dictReleaseIterator(di);
3886
3887 if (dstkey) {
3888 /* Store the resulting set into the target */
3889 deleteKey(c->db,dstkey);
3890 dictAdd(c->db->dict,dstkey,dstset);
3891 incrRefCount(dstkey);
3892 }
3893
3894 if (!dstkey) {
3895 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3896 } else {
3897 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3898 dictSize((dict*)dstset->ptr)));
3899 server.dirty++;
3900 }
3901 zfree(dv);
3902 }
3903
/* Intersect the sets named by argv[1..argc-1] and send the result to the
 * client (no destination key). */
static void sinterCommand(redisClient *c) {
    sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
}
3907
/* Intersect the sets named by argv[2..argc-1] and store the result under
 * the destination key argv[1]. */
static void sinterstoreCommand(redisClient *c) {
    sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
}
3911
3912 #define REDIS_OP_UNION 0
3913 #define REDIS_OP_DIFF 1
3914
3915 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3916 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3917 dictIterator *di;
3918 dictEntry *de;
3919 robj *dstset = NULL;
3920 int j, cardinality = 0;
3921
3922 for (j = 0; j < setsnum; j++) {
3923 robj *setobj;
3924
3925 setobj = dstkey ?
3926 lookupKeyWrite(c->db,setskeys[j]) :
3927 lookupKeyRead(c->db,setskeys[j]);
3928 if (!setobj) {
3929 dv[j] = NULL;
3930 continue;
3931 }
3932 if (setobj->type != REDIS_SET) {
3933 zfree(dv);
3934 addReply(c,shared.wrongtypeerr);
3935 return;
3936 }
3937 dv[j] = setobj->ptr;
3938 }
3939
3940 /* We need a temp set object to store our union. If the dstkey
3941 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3942 * this set object will be the resulting object to set into the target key*/
3943 dstset = createSetObject();
3944
3945 /* Iterate all the elements of all the sets, add every element a single
3946 * time to the result set */
3947 for (j = 0; j < setsnum; j++) {
3948 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3949 if (!dv[j]) continue; /* non existing keys are like empty sets */
3950
3951 di = dictGetIterator(dv[j]);
3952
3953 while((de = dictNext(di)) != NULL) {
3954 robj *ele;
3955
3956 /* dictAdd will not add the same element multiple times */
3957 ele = dictGetEntryKey(de);
3958 if (op == REDIS_OP_UNION || j == 0) {
3959 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3960 incrRefCount(ele);
3961 cardinality++;
3962 }
3963 } else if (op == REDIS_OP_DIFF) {
3964 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3965 cardinality--;
3966 }
3967 }
3968 }
3969 dictReleaseIterator(di);
3970
3971 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3972 }
3973
3974 /* Output the content of the resulting set, if not in STORE mode */
3975 if (!dstkey) {
3976 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3977 di = dictGetIterator(dstset->ptr);
3978 while((de = dictNext(di)) != NULL) {
3979 robj *ele;
3980
3981 ele = dictGetEntryKey(de);
3982 addReplyBulkLen(c,ele);
3983 addReply(c,ele);
3984 addReply(c,shared.crlf);
3985 }
3986 dictReleaseIterator(di);
3987 } else {
3988 /* If we have a target key where to store the resulting set
3989 * create this key with the result set inside */
3990 deleteKey(c->db,dstkey);
3991 dictAdd(c->db->dict,dstkey,dstset);
3992 incrRefCount(dstkey);
3993 }
3994
3995 /* Cleanup */
3996 if (!dstkey) {
3997 decrRefCount(dstset);
3998 } else {
3999 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
4000 dictSize((dict*)dstset->ptr)));
4001 server.dirty++;
4002 }
4003 zfree(dv);
4004 }
4005
/* Union of the sets named by argv[1..argc-1], sent to the client. */
static void sunionCommand(redisClient *c) {
    sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
}
4009
/* Union of the sets named by argv[2..argc-1], stored under argv[1]. */
static void sunionstoreCommand(redisClient *c) {
    sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
}
4013
/* Difference of the sets named by argv[1..argc-1], sent to the client. */
static void sdiffCommand(redisClient *c) {
    sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
}
4017
/* Difference of the sets named by argv[2..argc-1], stored under argv[1]. */
static void sdiffstoreCommand(redisClient *c) {
    sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
}
4021
4022 /* ==================================== ZSets =============================== */
4023
/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
4031
/* This skiplist implementation is almost a C translation of the original
 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
 * Alternative to Balanced Trees", modified in three ways:
 * a) this implementation allows for repeated values.
 * b) the comparison is not just by key (our 'score') but also by satellite
 * data.
 * c) there is a back pointer, so it's a doubly linked list with the back
 * pointers being only at "level 1". This allows traversing the list
 * from tail to head, which is useful for ZREVRANGE. */
4040
4041 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4042 zskiplistNode *zn = zmalloc(sizeof(*zn));
4043
4044 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4045 zn->score = score;
4046 zn->obj = obj;
4047 return zn;
4048 }
4049
4050 static zskiplist *zslCreate(void) {
4051 int j;
4052 zskiplist *zsl;
4053
4054 zsl = zmalloc(sizeof(*zsl));
4055 zsl->level = 1;
4056 zsl->length = 0;
4057 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4058 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4059 zsl->header->forward[j] = NULL;
4060 zsl->header->backward = NULL;
4061 zsl->tail = NULL;
4062 return zsl;
4063 }
4064
/* Release a skiplist node: drop the node's reference to its object, then
 * free the forward-pointer array and the node itself. */
static void zslFreeNode(zskiplistNode *node) {
    decrRefCount(node->obj);
    zfree(node->forward);
    zfree(node);
}
4070
4071 static void zslFree(zskiplist *zsl) {
4072 zskiplistNode *node = zsl->header->forward[0], *next;
4073
4074 zfree(zsl->header->forward);
4075 zfree(zsl->header);
4076 while(node) {
4077 next = node->forward[0];
4078 zslFreeNode(node);
4079 node = next;
4080 }
4081 zfree(zsl);
4082 }
4083
4084 static int zslRandomLevel(void) {
4085 int level = 1;
4086 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4087 level += 1;
4088 return level;
4089 }
4090
4091 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4092 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4093 int i, level;
4094
4095 x = zsl->header;
4096 for (i = zsl->level-1; i >= 0; i--) {
4097 while (x->forward[i] &&
4098 (x->forward[i]->score < score ||
4099 (x->forward[i]->score == score &&
4100 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4101 x = x->forward[i];
4102 update[i] = x;
4103 }
4104 /* we assume the key is not already inside, since we allow duplicated
4105 * scores, and the re-insertion of score and redis object should never
4106 * happpen since the caller of zslInsert() should test in the hash table
4107 * if the element is already inside or not. */
4108 level = zslRandomLevel();
4109 if (level > zsl->level) {
4110 for (i = zsl->level; i < level; i++)
4111 update[i] = zsl->header;
4112 zsl->level = level;
4113 }
4114 x = zslCreateNode(level,score,obj);
4115 for (i = 0; i < level; i++) {
4116 x->forward[i] = update[i]->forward[i];
4117 update[i]->forward[i] = x;
4118 }
4119 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4120 if (x->forward[0])
4121 x->forward[0]->backward = x;
4122 else
4123 zsl->tail = x;
4124 zsl->length++;
4125 }
4126
4127 /* Delete an element with matching score/object from the skiplist. */
4128 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4129 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4130 int i;
4131
4132 x = zsl->header;
4133 for (i = zsl->level-1; i >= 0; i--) {
4134 while (x->forward[i] &&
4135 (x->forward[i]->score < score ||
4136 (x->forward[i]->score == score &&
4137 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4138 x = x->forward[i];
4139 update[i] = x;
4140 }
4141 /* We may have multiple elements with the same score, what we need
4142 * is to find the element with both the right score and object. */
4143 x = x->forward[0];
4144 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4145 for (i = 0; i < zsl->level; i++) {
4146 if (update[i]->forward[i] != x) break;
4147 update[i]->forward[i] = x->forward[i];
4148 }
4149 if (x->forward[0]) {
4150 x->forward[0]->backward = (x->backward == zsl->header) ?
4151 NULL : x->backward;
4152 } else {
4153 zsl->tail = x->backward;
4154 }
4155 zslFreeNode(x);
4156 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4157 zsl->level--;
4158 zsl->length--;
4159 return 1;
4160 } else {
4161 return 0; /* not found */
4162 }
4163 return 0; /* not found */
4164 }
4165
4166 /* Delete all the elements with score between min and max from the skiplist.
4167 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4168 * Note that this function takes the reference to the hash table view of the
4169 * sorted set, in order to remove the elements from the hash table too. */
4170 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4171 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4172 unsigned long removed = 0;
4173 int i;
4174
4175 x = zsl->header;
4176 for (i = zsl->level-1; i >= 0; i--) {
4177 while (x->forward[i] && x->forward[i]->score < min)
4178 x = x->forward[i];
4179 update[i] = x;
4180 }
4181 /* We may have multiple elements with the same score, what we need
4182 * is to find the element with both the right score and object. */
4183 x = x->forward[0];
4184 while (x && x->score <= max) {
4185 zskiplistNode *next;
4186
4187 for (i = 0; i < zsl->level; i++) {
4188 if (update[i]->forward[i] != x) break;
4189 update[i]->forward[i] = x->forward[i];
4190 }
4191 if (x->forward[0]) {
4192 x->forward[0]->backward = (x->backward == zsl->header) ?
4193 NULL : x->backward;
4194 } else {
4195 zsl->tail = x->backward;
4196 }
4197 next = x->forward[0];
4198 dictDelete(dict,x->obj);
4199 zslFreeNode(x);
4200 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4201 zsl->level--;
4202 zsl->length--;
4203 removed++;
4204 x = next;
4205 }
4206 return removed; /* not found */
4207 }
4208
4209 /* Find the first node having a score equal or greater than the specified one.
4210 * Returns NULL if there is no match. */
4211 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4212 zskiplistNode *x;
4213 int i;
4214
4215 x = zsl->header;
4216 for (i = zsl->level-1; i >= 0; i--) {
4217 while (x->forward[i] && x->forward[i]->score < score)
4218 x = x->forward[i];
4219 }
4220 /* We may have multiple elements with the same score, what we need
4221 * is to find the element with both the right score and object. */
4222 return x->forward[0];
4223 }
4224
4225 /* The actual Z-commands implementations */
4226
4227 static void zaddCommand(redisClient *c) {
4228 robj *zsetobj;
4229 zset *zs;
4230 double *score;
4231
4232 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4233 if (zsetobj == NULL) {
4234 zsetobj = createZsetObject();
4235 dictAdd(c->db->dict,c->argv[1],zsetobj);
4236 incrRefCount(c->argv[1]);
4237 } else {
4238 if (zsetobj->type != REDIS_ZSET) {
4239 addReply(c,shared.wrongtypeerr);
4240 return;
4241 }
4242 }
4243 score = zmalloc(sizeof(double));
4244 *score = strtod(c->argv[2]->ptr,NULL);
4245 zs = zsetobj->ptr;
4246 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
4247 /* case 1: New element */
4248 incrRefCount(c->argv[3]); /* added to hash */
4249 zslInsert(zs->zsl,*score,c->argv[3]);
4250 incrRefCount(c->argv[3]); /* added to skiplist */
4251 server.dirty++;
4252 addReply(c,shared.cone);
4253 } else {
4254 dictEntry *de;
4255 double *oldscore;
4256
4257 /* case 2: Score update operation */
4258 de = dictFind(zs->dict,c->argv[3]);
4259 assert(de != NULL);
4260 oldscore = dictGetEntryVal(de);
4261 if (*score != *oldscore) {
4262 int deleted;
4263
4264 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
4265 assert(deleted != 0);
4266 zslInsert(zs->zsl,*score,c->argv[3]);
4267 incrRefCount(c->argv[3]);
4268 dictReplace(zs->dict,c->argv[3],score);
4269 server.dirty++;
4270 } else {
4271 zfree(score);
4272 }
4273 addReply(c,shared.czero);
4274 }
4275 }
4276
4277 static void zremCommand(redisClient *c) {
4278 robj *zsetobj;
4279 zset *zs;
4280
4281 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4282 if (zsetobj == NULL) {
4283 addReply(c,shared.czero);
4284 } else {
4285 dictEntry *de;
4286 double *oldscore;
4287 int deleted;
4288
4289 if (zsetobj->type != REDIS_ZSET) {
4290 addReply(c,shared.wrongtypeerr);
4291 return;
4292 }
4293 zs = zsetobj->ptr;
4294 de = dictFind(zs->dict,c->argv[2]);
4295 if (de == NULL) {
4296 addReply(c,shared.czero);
4297 return;
4298 }
4299 /* Delete from the skiplist */
4300 oldscore = dictGetEntryVal(de);
4301 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4302 assert(deleted != 0);
4303
4304 /* Delete from the hash table */
4305 dictDelete(zs->dict,c->argv[2]);
4306 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4307 server.dirty++;
4308 addReply(c,shared.cone);
4309 }
4310 }
4311
4312 static void zremrangebyscoreCommand(redisClient *c) {
4313 double min = strtod(c->argv[2]->ptr,NULL);
4314 double max = strtod(c->argv[3]->ptr,NULL);
4315 robj *zsetobj;
4316 zset *zs;
4317
4318 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4319 if (zsetobj == NULL) {
4320 addReply(c,shared.czero);
4321 } else {
4322 long deleted;
4323
4324 if (zsetobj->type != REDIS_ZSET) {
4325 addReply(c,shared.wrongtypeerr);
4326 return;
4327 }
4328 zs = zsetobj->ptr;
4329 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4330 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4331 server.dirty += deleted;
4332 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4333 }
4334 }
4335
4336 static void zrangeGenericCommand(redisClient *c, int reverse) {
4337 robj *o;
4338 int start = atoi(c->argv[2]->ptr);
4339 int end = atoi(c->argv[3]->ptr);
4340
4341 o = lookupKeyRead(c->db,c->argv[1]);
4342 if (o == NULL) {
4343 addReply(c,shared.nullmultibulk);
4344 } else {
4345 if (o->type != REDIS_ZSET) {
4346 addReply(c,shared.wrongtypeerr);
4347 } else {
4348 zset *zsetobj = o->ptr;
4349 zskiplist *zsl = zsetobj->zsl;
4350 zskiplistNode *ln;
4351
4352 int llen = zsl->length;
4353 int rangelen, j;
4354 robj *ele;
4355
4356 /* convert negative indexes */
4357 if (start < 0) start = llen+start;
4358 if (end < 0) end = llen+end;
4359 if (start < 0) start = 0;
4360 if (end < 0) end = 0;
4361
4362 /* indexes sanity checks */
4363 if (start > end || start >= llen) {
4364 /* Out of range start or start > end result in empty list */
4365 addReply(c,shared.emptymultibulk);
4366 return;
4367 }
4368 if (end >= llen) end = llen-1;
4369 rangelen = (end-start)+1;
4370
4371 /* Return the result in form of a multi-bulk reply */
4372 if (reverse) {
4373 ln = zsl->tail;
4374 while (start--)
4375 ln = ln->backward;
4376 } else {
4377 ln = zsl->header->forward[0];
4378 while (start--)
4379 ln = ln->forward[0];
4380 }
4381
4382 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4383 for (j = 0; j < rangelen; j++) {
4384 ele = ln->obj;
4385 addReplyBulkLen(c,ele);
4386 addReply(c,ele);
4387 addReply(c,shared.crlf);
4388 ln = reverse ? ln->backward : ln->forward[0];
4389 }
4390 }
4391 }
4392 }
4393
4394 static void zrangeCommand(redisClient *c) {
4395 zrangeGenericCommand(c,0);
4396 }
4397
4398 static void zrevrangeCommand(redisClient *c) {
4399 zrangeGenericCommand(c,1);
4400 }
4401
4402 static void zrangebyscoreCommand(redisClient *c) {
4403 robj *o;
4404 double min = strtod(c->argv[2]->ptr,NULL);
4405 double max = strtod(c->argv[3]->ptr,NULL);
4406
4407 o = lookupKeyRead(c->db,c->argv[1]);
4408 if (o == NULL) {
4409 addReply(c,shared.nullmultibulk);
4410 } else {
4411 if (o->type != REDIS_ZSET) {
4412 addReply(c,shared.wrongtypeerr);
4413 } else {
4414 zset *zsetobj = o->ptr;
4415 zskiplist *zsl = zsetobj->zsl;
4416 zskiplistNode *ln;
4417 robj *ele, *lenobj;
4418 unsigned int rangelen = 0;
4419
4420 /* Get the first node with the score >= min */
4421 ln = zslFirstWithScore(zsl,min);
4422 if (ln == NULL) {
4423 /* No element matching the speciifed interval */
4424 addReply(c,shared.emptymultibulk);
4425 return;
4426 }
4427
4428 /* We don't know in advance how many matching elements there
4429 * are in the list, so we push this object that will represent
4430 * the multi-bulk length in the output buffer, and will "fix"
4431 * it later */
4432 lenobj = createObject(REDIS_STRING,NULL);
4433 addReply(c,lenobj);
4434
4435 while(ln && ln->score <= max) {
4436 ele = ln->obj;
4437 addReplyBulkLen(c,ele);
4438 addReply(c,ele);
4439 addReply(c,shared.crlf);
4440 ln = ln->forward[0];
4441 rangelen++;
4442 }
4443 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4444 }
4445 }
4446 }
4447
4448 static void zcardCommand(redisClient *c) {
4449 robj *o;
4450 zset *zs;
4451
4452 o = lookupKeyRead(c->db,c->argv[1]);
4453 if (o == NULL) {
4454 addReply(c,shared.czero);
4455 return;
4456 } else {
4457 if (o->type != REDIS_ZSET) {
4458 addReply(c,shared.wrongtypeerr);
4459 } else {
4460 zs = o->ptr;
4461 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4462 }
4463 }
4464 }
4465
4466 static void zscoreCommand(redisClient *c) {
4467 robj *o;
4468 zset *zs;
4469
4470 o = lookupKeyRead(c->db,c->argv[1]);
4471 if (o == NULL) {
4472 addReply(c,shared.nullbulk);
4473 return;
4474 } else {
4475 if (o->type != REDIS_ZSET) {
4476 addReply(c,shared.wrongtypeerr);
4477 } else {
4478 dictEntry *de;
4479
4480 zs = o->ptr;
4481 de = dictFind(zs->dict,c->argv[2]);
4482 if (!de) {
4483 addReply(c,shared.nullbulk);
4484 } else {
4485 char buf[128];
4486 double *score = dictGetEntryVal(de);
4487
4488 snprintf(buf,sizeof(buf),"%.17g",*score);
4489 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
4490 strlen(buf),buf));
4491 }
4492 }
4493 }
4494 }
4495
4496 /* ========================= Non type-specific commands ==================== */
4497
4498 static void flushdbCommand(redisClient *c) {
4499 server.dirty += dictSize(c->db->dict);
4500 dictEmpty(c->db->dict);
4501 dictEmpty(c->db->expires);
4502 addReply(c,shared.ok);
4503 }
4504
4505 static void flushallCommand(redisClient *c) {
4506 server.dirty += emptyDb();
4507 addReply(c,shared.ok);
4508 rdbSave(server.dbfilename);
4509 server.dirty++;
4510 }
4511
4512 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4513 redisSortOperation *so = zmalloc(sizeof(*so));
4514 so->type = type;
4515 so->pattern = pattern;
4516 return so;
4517 }
4518
4519 /* Return the value associated to the key with a name obtained
4520 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4521 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4522 char *p;
4523 sds spat, ssub;
4524 robj keyobj;
4525 int prefixlen, sublen, postfixlen;
4526 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4527 struct {
4528 long len;
4529 long free;
4530 char buf[REDIS_SORTKEY_MAX+1];
4531 } keyname;
4532
4533 if (subst->encoding == REDIS_ENCODING_RAW)
4534 incrRefCount(subst);
4535 else {
4536 subst = getDecodedObject(subst);
4537 }
4538
4539 spat = pattern->ptr;
4540 ssub = subst->ptr;
4541 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4542 p = strchr(spat,'*');
4543 if (!p) return NULL;
4544
4545 prefixlen = p-spat;
4546 sublen = sdslen(ssub);
4547 postfixlen = sdslen(spat)-(prefixlen+1);
4548 memcpy(keyname.buf,spat,prefixlen);
4549 memcpy(keyname.buf+prefixlen,ssub,sublen);
4550 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4551 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4552 keyname.len = prefixlen+sublen+postfixlen;
4553
4554 keyobj.refcount = 1;
4555 keyobj.type = REDIS_STRING;
4556 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4557
4558 decrRefCount(subst);
4559
4560 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4561 return lookupKeyRead(db,&keyobj);
4562 }
4563
4564 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4565 * the additional parameter is not standard but a BSD-specific we have to
4566 * pass sorting parameters via the global 'server' structure */
4567 static int sortCompare(const void *s1, const void *s2) {
4568 const redisSortObject *so1 = s1, *so2 = s2;
4569 int cmp;
4570
4571 if (!server.sort_alpha) {
4572 /* Numeric sorting. Here it's trivial as we precomputed scores */
4573 if (so1->u.score > so2->u.score) {
4574 cmp = 1;
4575 } else if (so1->u.score < so2->u.score) {
4576 cmp = -1;
4577 } else {
4578 cmp = 0;
4579 }
4580 } else {
4581 /* Alphanumeric sorting */
4582 if (server.sort_bypattern) {
4583 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4584 /* At least one compare object is NULL */
4585 if (so1->u.cmpobj == so2->u.cmpobj)
4586 cmp = 0;
4587 else if (so1->u.cmpobj == NULL)
4588 cmp = -1;
4589 else
4590 cmp = 1;
4591 } else {
4592 /* We have both the objects, use strcoll */
4593 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4594 }
4595 } else {
4596 /* Compare elements directly */
4597 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4598 so2->obj->encoding == REDIS_ENCODING_RAW) {
4599 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4600 } else {
4601 robj *dec1, *dec2;
4602
4603 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4604 so1->obj : getDecodedObject(so1->obj);
4605 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4606 so2->obj : getDecodedObject(so2->obj);
4607 cmp = strcoll(dec1->ptr,dec2->ptr);
4608 if (dec1 != so1->obj) decrRefCount(dec1);
4609 if (dec2 != so2->obj) decrRefCount(dec2);
4610 }
4611 }
4612 }
4613 return server.sort_desc ? -cmp : cmp;
4614 }
4615
4616 /* The SORT command is the most complex command in Redis. Warning: this code
4617 * is optimized for speed and a bit less for readability */
4618 static void sortCommand(redisClient *c) {
4619 list *operations;
4620 int outputlen = 0;
4621 int desc = 0, alpha = 0;
4622 int limit_start = 0, limit_count = -1, start, end;
4623 int j, dontsort = 0, vectorlen;
4624 int getop = 0; /* GET operation counter */
4625 robj *sortval, *sortby = NULL, *storekey = NULL;
4626 redisSortObject *vector; /* Resulting vector to sort */
4627
4628 /* Lookup the key to sort. It must be of the right types */
4629 sortval = lookupKeyRead(c->db,c->argv[1]);
4630 if (sortval == NULL) {
4631 addReply(c,shared.nokeyerr);
4632 return;
4633 }
4634 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4635 addReply(c,shared.wrongtypeerr);
4636 return;
4637 }
4638
4639 /* Create a list of operations to perform for every sorted element.
4640 * Operations can be GET/DEL/INCR/DECR */
4641 operations = listCreate();
4642 listSetFreeMethod(operations,zfree);
4643 j = 2;
4644
4645 /* Now we need to protect sortval incrementing its count, in the future
4646 * SORT may have options able to overwrite/delete keys during the sorting
4647 * and the sorted key itself may get destroied */
4648 incrRefCount(sortval);
4649
4650 /* The SORT command has an SQL-alike syntax, parse it */
4651 while(j < c->argc) {
4652 int leftargs = c->argc-j-1;
4653 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4654 desc = 0;
4655 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4656 desc = 1;
4657 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4658 alpha = 1;
4659 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4660 limit_start = atoi(c->argv[j+1]->ptr);
4661 limit_count = atoi(c->argv[j+2]->ptr);
4662 j+=2;
4663 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
4664 storekey = c->argv[j+1];
4665 j++;
4666 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4667 sortby = c->argv[j+1];
4668 /* If the BY pattern does not contain '*', i.e. it is constant,
4669 * we don't need to sort nor to lookup the weight keys. */
4670 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4671 j++;
4672 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4673 listAddNodeTail(operations,createSortOperation(
4674 REDIS_SORT_GET,c->argv[j+1]));
4675 getop++;
4676 j++;
4677 } else {
4678 decrRefCount(sortval);
4679 listRelease(operations);
4680 addReply(c,shared.syntaxerr);
4681 return;
4682 }
4683 j++;
4684 }
4685
4686 /* Load the sorting vector with all the objects to sort */
4687 vectorlen = (sortval->type == REDIS_LIST) ?
4688 listLength((list*)sortval->ptr) :
4689 dictSize((dict*)sortval->ptr);
4690 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4691 j = 0;
4692 if (sortval->type == REDIS_LIST) {
4693 list *list = sortval->ptr;
4694 listNode *ln;
4695
4696 listRewind(list);
4697 while((ln = listYield(list))) {
4698 robj *ele = ln->value;
4699 vector[j].obj = ele;
4700 vector[j].u.score = 0;
4701 vector[j].u.cmpobj = NULL;
4702 j++;
4703 }
4704 } else {
4705 dict *set = sortval->ptr;
4706 dictIterator *di;
4707 dictEntry *setele;
4708
4709 di = dictGetIterator(set);
4710 while((setele = dictNext(di)) != NULL) {
4711 vector[j].obj = dictGetEntryKey(setele);
4712 vector[j].u.score = 0;
4713 vector[j].u.cmpobj = NULL;
4714 j++;
4715 }
4716 dictReleaseIterator(di);
4717 }
4718 assert(j == vectorlen);
4719
4720 /* Now it's time to load the right scores in the sorting vector */
4721 if (dontsort == 0) {
4722 for (j = 0; j < vectorlen; j++) {
4723 if (sortby) {
4724 robj *byval;
4725
4726 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4727 if (!byval || byval->type != REDIS_STRING) continue;
4728 if (alpha) {
4729 if (byval->encoding == REDIS_ENCODING_RAW) {
4730 vector[j].u.cmpobj = byval;
4731 incrRefCount(byval);
4732 } else {
4733 vector[j].u.cmpobj = getDecodedObject(byval);
4734 }
4735 } else {
4736 if (byval->encoding == REDIS_ENCODING_RAW) {
4737 vector[j].u.score = strtod(byval->ptr,NULL);
4738 } else {
4739 if (byval->encoding == REDIS_ENCODING_INT) {
4740 vector[j].u.score = (long)byval->ptr;
4741 } else
4742 assert(1 != 1);
4743 }
4744 }
4745 } else {
4746 if (!alpha) {
4747 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4748 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4749 else {
4750 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4751 vector[j].u.score = (long) vector[j].obj->ptr;
4752 else
4753 assert(1 != 1);
4754 }
4755 }
4756 }
4757 }
4758 }
4759
4760 /* We are ready to sort the vector... perform a bit of sanity check
4761 * on the LIMIT option too. We'll use a partial version of quicksort. */
4762 start = (limit_start < 0) ? 0 : limit_start;
4763 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4764 if (start >= vectorlen) {
4765 start = vectorlen-1;
4766 end = vectorlen-2;
4767 }
4768 if (end >= vectorlen) end = vectorlen-1;
4769
4770 if (dontsort == 0) {
4771 server.sort_desc = desc;
4772 server.sort_alpha = alpha;
4773 server.sort_bypattern = sortby ? 1 : 0;
4774 if (sortby && (start != 0 || end != vectorlen-1))
4775 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4776 else
4777 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4778 }
4779
4780 /* Send command output to the output buffer, performing the specified
4781 * GET/DEL/INCR/DECR operations if any. */
4782 outputlen = getop ? getop*(end-start+1) : end-start+1;
4783 if (storekey == NULL) {
4784 /* STORE option not specified, sent the sorting result to client */
4785 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4786 for (j = start; j <= end; j++) {
4787 listNode *ln;
4788 if (!getop) {
4789 addReplyBulkLen(c,vector[j].obj);
4790 addReply(c,vector[j].obj);
4791 addReply(c,shared.crlf);
4792 }
4793 listRewind(operations);
4794 while((ln = listYield(operations))) {
4795 redisSortOperation *sop = ln->value;
4796 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4797 vector[j].obj);
4798
4799 if (sop->type == REDIS_SORT_GET) {
4800 if (!val || val->type != REDIS_STRING) {
4801 addReply(c,shared.nullbulk);
4802 } else {
4803 addReplyBulkLen(c,val);
4804 addReply(c,val);
4805 addReply(c,shared.crlf);
4806 }
4807 } else {
4808 assert(sop->type == REDIS_SORT_GET); /* always fails */
4809 }
4810 }
4811 }
4812 } else {
4813 robj *listObject = createListObject();
4814 list *listPtr = (list*) listObject->ptr;
4815
4816 /* STORE option specified, set the sorting result as a List object */
4817 for (j = start; j <= end; j++) {
4818 listNode *ln;
4819 if (!getop) {
4820 listAddNodeTail(listPtr,vector[j].obj);
4821 incrRefCount(vector[j].obj);
4822 }
4823 listRewind(operations);
4824 while((ln = listYield(operations))) {
4825 redisSortOperation *sop = ln->value;
4826 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4827 vector[j].obj);
4828
4829 if (sop->type == REDIS_SORT_GET) {
4830 if (!val || val->type != REDIS_STRING) {
4831 listAddNodeTail(listPtr,createStringObject("",0));
4832 } else {
4833 listAddNodeTail(listPtr,val);
4834 incrRefCount(val);
4835 }
4836 } else {
4837 assert(sop->type == REDIS_SORT_GET); /* always fails */
4838 }
4839 }
4840 }
4841 if (dictReplace(c->db->dict,storekey,listObject)) {
4842 incrRefCount(storekey);
4843 }
4844 /* Note: we add 1 because the DB is dirty anyway since even if the
4845 * SORT result is empty a new key is set and maybe the old content
4846 * replaced. */
4847 server.dirty += 1+outputlen;
4848 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
4849 }
4850
4851 /* Cleanup */
4852 decrRefCount(sortval);
4853 listRelease(operations);
4854 for (j = 0; j < vectorlen; j++) {
4855 if (sortby && alpha && vector[j].u.cmpobj)
4856 decrRefCount(vector[j].u.cmpobj);
4857 }
4858 zfree(vector);
4859 }
4860
4861 static void infoCommand(redisClient *c) {
4862 sds info;
4863 time_t uptime = time(NULL)-server.stat_starttime;
4864 int j;
4865
4866 info = sdscatprintf(sdsempty(),
4867 "redis_version:%s\r\n"
4868 "arch_bits:%s\r\n"
4869 "uptime_in_seconds:%d\r\n"
4870 "uptime_in_days:%d\r\n"
4871 "connected_clients:%d\r\n"
4872 "connected_slaves:%d\r\n"
4873 "used_memory:%zu\r\n"
4874 "changes_since_last_save:%lld\r\n"
4875 "bgsave_in_progress:%d\r\n"
4876 "last_save_time:%d\r\n"
4877 "total_connections_received:%lld\r\n"
4878 "total_commands_processed:%lld\r\n"
4879 "role:%s\r\n"
4880 ,REDIS_VERSION,
4881 (sizeof(long) == 8) ? "64" : "32",
4882 uptime,
4883 uptime/(3600*24),
4884 listLength(server.clients)-listLength(server.slaves),
4885 listLength(server.slaves),
4886 server.usedmemory,
4887 server.dirty,
4888 server.bgsaveinprogress,
4889 server.lastsave,
4890 server.stat_numconnections,
4891 server.stat_numcommands,
4892 server.masterhost == NULL ? "master" : "slave"
4893 );
4894 if (server.masterhost) {
4895 info = sdscatprintf(info,
4896 "master_host:%s\r\n"
4897 "master_port:%d\r\n"
4898 "master_link_status:%s\r\n"
4899 "master_last_io_seconds_ago:%d\r\n"
4900 ,server.masterhost,
4901 server.masterport,
4902 (server.replstate == REDIS_REPL_CONNECTED) ?
4903 "up" : "down",
4904 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
4905 );
4906 }
4907 for (j = 0; j < server.dbnum; j++) {
4908 long long keys, vkeys;
4909
4910 keys = dictSize(server.db[j].dict);
4911 vkeys = dictSize(server.db[j].expires);
4912 if (keys || vkeys) {
4913 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4914 j, keys, vkeys);
4915 }
4916 }
4917 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4918 addReplySds(c,info);
4919 addReply(c,shared.crlf);
4920 }
4921
4922 static void monitorCommand(redisClient *c) {
4923 /* ignore MONITOR if aleady slave or in monitor mode */
4924 if (c->flags & REDIS_SLAVE) return;
4925
4926 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4927 c->slaveseldb = 0;
4928 listAddNodeTail(server.monitors,c);
4929 addReply(c,shared.ok);
4930 }
4931
4932 /* ================================= Expire ================================= */
4933 static int removeExpire(redisDb *db, robj *key) {
4934 if (dictDelete(db->expires,key) == DICT_OK) {
4935 return 1;
4936 } else {
4937 return 0;
4938 }
4939 }
4940
4941 static int setExpire(redisDb *db, robj *key, time_t when) {
4942 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4943 return 0;
4944 } else {
4945 incrRefCount(key);
4946 return 1;
4947 }
4948 }
4949
4950 /* Return the expire time of the specified key, or -1 if no expire
4951 * is associated with this key (i.e. the key is non volatile) */
4952 static time_t getExpire(redisDb *db, robj *key) {
4953 dictEntry *de;
4954
4955 /* No expire? return ASAP */
4956 if (dictSize(db->expires) == 0 ||
4957 (de = dictFind(db->expires,key)) == NULL) return -1;
4958
4959 return (time_t) dictGetEntryVal(de);
4960 }
4961
4962 static int expireIfNeeded(redisDb *db, robj *key) {
4963 time_t when;
4964 dictEntry *de;
4965
4966 /* No expire? return ASAP */
4967 if (dictSize(db->expires) == 0 ||
4968 (de = dictFind(db->expires,key)) == NULL) return 0;
4969
4970 /* Lookup the expire */
4971 when = (time_t) dictGetEntryVal(de);
4972 if (time(NULL) <= when) return 0;
4973
4974 /* Delete the key */
4975 dictDelete(db->expires,key);
4976 return dictDelete(db->dict,key) == DICT_OK;
4977 }
4978
4979 static int deleteIfVolatile(redisDb *db, robj *key) {
4980 dictEntry *de;
4981
4982 /* No expire? return ASAP */
4983 if (dictSize(db->expires) == 0 ||
4984 (de = dictFind(db->expires,key)) == NULL) return 0;
4985
4986 /* Delete the key */
4987 server.dirty++;
4988 dictDelete(db->expires,key);
4989 return dictDelete(db->dict,key) == DICT_OK;
4990 }
4991
4992 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
4993 dictEntry *de;
4994
4995 de = dictFind(c->db->dict,key);
4996 if (de == NULL) {
4997 addReply(c,shared.czero);
4998 return;
4999 }
5000 if (seconds < 0) {
5001 if (deleteKey(c->db,key)) server.dirty++;
5002 addReply(c, shared.cone);
5003 return;
5004 } else {
5005 time_t when = time(NULL)+seconds;
5006 if (setExpire(c->db,key,when)) {
5007 addReply(c,shared.cone);
5008 server.dirty++;
5009 } else {
5010 addReply(c,shared.czero);
5011 }
5012 return;
5013 }
5014 }
5015
5016 static void expireCommand(redisClient *c) {
5017 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5018 }
5019
5020 static void expireatCommand(redisClient *c) {
5021 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5022 }
5023
5024 static void ttlCommand(redisClient *c) {
5025 time_t expire;
5026 int ttl = -1;
5027
5028 expire = getExpire(c->db,c->argv[1]);
5029 if (expire != -1) {
5030 ttl = (int) (expire-time(NULL));
5031 if (ttl < 0) ttl = -1;
5032 }
5033 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5034 }
5035
5036 static void msetGenericCommand(redisClient *c, int nx) {
5037 int j;
5038
5039 if ((c->argc % 2) == 0) {
5040 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
5041 return;
5042 }
5043 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
5044 * set nothing at all if at least one already key exists. */
5045 if (nx) {
5046 for (j = 1; j < c->argc; j += 2) {
5047 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
5048 addReply(c, shared.czero);
5049 return;
5050 }
5051 }
5052 }
5053
5054 for (j = 1; j < c->argc; j += 2) {
5055 int retval;
5056
5057 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
5058 if (retval == DICT_ERR) {
5059 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
5060 incrRefCount(c->argv[j+1]);
5061 } else {
5062 incrRefCount(c->argv[j]);
5063 incrRefCount(c->argv[j+1]);
5064 }
5065 removeExpire(c->db,c->argv[j]);
5066 }
5067 server.dirty += (c->argc-1)/2;
5068 addReply(c, nx ? shared.cone : shared.ok);
5069 }
5070
5071 static void msetCommand(redisClient *c) {
5072 msetGenericCommand(c,0);
5073 }
5074
5075 static void msetnxCommand(redisClient *c) {
5076 msetGenericCommand(c,1);
5077 }
5078
5079 /* =============================== Replication ============================= */
5080
5081 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5082 ssize_t nwritten, ret = size;
5083 time_t start = time(NULL);
5084
5085 timeout++;
5086 while(size) {
5087 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5088 nwritten = write(fd,ptr,size);
5089 if (nwritten == -1) return -1;
5090 ptr += nwritten;
5091 size -= nwritten;
5092 }
5093 if ((time(NULL)-start) > timeout) {
5094 errno = ETIMEDOUT;
5095 return -1;
5096 }
5097 }
5098 return ret;
5099 }
5100
5101 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5102 ssize_t nread, totread = 0;
5103 time_t start = time(NULL);
5104
5105 timeout++;
5106 while(size) {
5107 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5108 nread = read(fd,ptr,size);
5109 if (nread == -1) return -1;
5110 ptr += nread;
5111 size -= nread;
5112 totread += nread;
5113 }
5114 if ((time(NULL)-start) > timeout) {
5115 errno = ETIMEDOUT;
5116 return -1;
5117 }
5118 }
5119 return totread;
5120 }
5121
/* Read a line from 'fd' into the buffer 'ptr' of 'size' bytes, one character
 * at a time using syncRead() with the given timeout.
 *
 * Reading stops at the first '\n', which is not stored; a '\r' right before
 * it is replaced by the string terminator. At most size-1 characters are
 * stored, so the buffer is always null terminated and never overflowed: if
 * the line is longer, reading stops when the buffer is full.
 *
 * Returns the number of characters stored before the newline was seen (a
 * trailing '\r' is included in the count), or -1 on error. A buffer with no
 * room for the terminator (size <= 0) is reported as an error with errno set
 * to EINVAL. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) {
        errno = EINVAL;
        return -1;
    }
    *ptr = '\0';

    size--; /* reserve room for the terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
            /* One less free byte: this is what bounds the loop. */
            size--;
        }
    }
    return nread;
}
5142
5143 static void syncCommand(redisClient *c) {
5144 /* ignore SYNC if aleady slave or in monitor mode */
5145 if (c->flags & REDIS_SLAVE) return;
5146
5147 /* SYNC can't be issued when the server has pending data to send to
5148 * the client about already issued commands. We need a fresh reply
5149 * buffer registering the differences between the BGSAVE and the current
5150 * dataset, so that we can copy to other slaves if needed. */
5151 if (listLength(c->reply) != 0) {
5152 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5153 return;
5154 }
5155
5156 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5157 /* Here we need to check if there is a background saving operation
5158 * in progress, or if it is required to start one */
5159 if (server.bgsaveinprogress) {
5160 /* Ok a background save is in progress. Let's check if it is a good
5161 * one for replication, i.e. if there is another slave that is
5162 * registering differences since the server forked to save */
5163 redisClient *slave;
5164 listNode *ln;
5165
5166 listRewind(server.slaves);
5167 while((ln = listYield(server.slaves))) {
5168 slave = ln->value;
5169 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5170 }
5171 if (ln) {
5172 /* Perfect, the server is already registering differences for
5173 * another slave. Set the right state, and copy the buffer. */
5174 listRelease(c->reply);
5175 c->reply = listDup(slave->reply);
5176 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5177 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5178 } else {
5179 /* No way, we need to wait for the next BGSAVE in order to
5180 * register differences */
5181 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5182 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5183 }
5184 } else {
5185 /* Ok we don't have a BGSAVE in progress, let's start one */
5186 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5187 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5188 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5189 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5190 return;
5191 }
5192 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5193 }
5194 c->repldbfd = -1;
5195 c->flags |= REDIS_SLAVE;
5196 c->slaveseldb = 0;
5197 listAddNodeTail(server.slaves,c);
5198 return;
5199 }
5200
5201 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5202 redisClient *slave = privdata;
5203 REDIS_NOTUSED(el);
5204 REDIS_NOTUSED(mask);
5205 char buf[REDIS_IOBUF_LEN];
5206 ssize_t nwritten, buflen;
5207
5208 if (slave->repldboff == 0) {
5209 /* Write the bulk write count before to transfer the DB. In theory here
5210 * we don't know how much room there is in the output buffer of the
5211 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5212 * operations) will never be smaller than the few bytes we need. */
5213 sds bulkcount;
5214
5215 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5216 slave->repldbsize);
5217 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5218 {
5219 sdsfree(bulkcount);
5220 freeClient(slave);
5221 return;
5222 }
5223 sdsfree(bulkcount);
5224 }
5225 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5226 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5227 if (buflen <= 0) {
5228 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5229 (buflen == 0) ? "premature EOF" : strerror(errno));
5230 freeClient(slave);
5231 return;
5232 }
5233 if ((nwritten = write(fd,buf,buflen)) == -1) {
5234 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5235 strerror(errno));
5236 freeClient(slave);
5237 return;
5238 }
5239 slave->repldboff += nwritten;
5240 if (slave->repldboff == slave->repldbsize) {
5241 close(slave->repldbfd);
5242 slave->repldbfd = -1;
5243 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5244 slave->replstate = REDIS_REPL_ONLINE;
5245 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5246 sendReplyToClient, slave, NULL) == AE_ERR) {
5247 freeClient(slave);
5248 return;
5249 }
5250 addReplySds(slave,sdsempty());
5251 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5252 }
5253 }
5254
5255 /* This function is called at the end of every backgrond saving.
5256 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5257 * otherwise REDIS_ERR is passed to the function.
5258 *
5259 * The goal of this function is to handle slaves waiting for a successful
5260 * background saving in order to perform non-blocking synchronization. */
5261 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5262 listNode *ln;
5263 int startbgsave = 0;
5264
5265 listRewind(server.slaves);
5266 while((ln = listYield(server.slaves))) {
5267 redisClient *slave = ln->value;
5268
5269 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5270 startbgsave = 1;
5271 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5272 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5273 struct redis_stat buf;
5274
5275 if (bgsaveerr != REDIS_OK) {
5276 freeClient(slave);
5277 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5278 continue;
5279 }
5280 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5281 redis_fstat(slave->repldbfd,&buf) == -1) {
5282 freeClient(slave);
5283 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5284 continue;
5285 }
5286 slave->repldboff = 0;
5287 slave->repldbsize = buf.st_size;
5288 slave->replstate = REDIS_REPL_SEND_BULK;
5289 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5290 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
5291 freeClient(slave);
5292 continue;
5293 }
5294 }
5295 }
5296 if (startbgsave) {
5297 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5298 listRewind(server.slaves);
5299 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5300 while((ln = listYield(server.slaves))) {
5301 redisClient *slave = ln->value;
5302
5303 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5304 freeClient(slave);
5305 }
5306 }
5307 }
5308 }
5309
5310 static int syncWithMaster(void) {
5311 char buf[1024], tmpfile[256], authcmd[1024];
5312 int dumpsize;
5313 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5314 int dfd;
5315
5316 if (fd == -1) {
5317 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5318 strerror(errno));
5319 return REDIS_ERR;
5320 }
5321
5322 /* AUTH with the master if required. */
5323 if(server.masterauth) {
5324 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5325 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5326 close(fd);
5327 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5328 strerror(errno));
5329 return REDIS_ERR;
5330 }
5331 /* Read the AUTH result. */
5332 if (syncReadLine(fd,buf,1024,3600) == -1) {
5333 close(fd);
5334 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5335 strerror(errno));
5336 return REDIS_ERR;
5337 }
5338 if (buf[0] != '+') {
5339 close(fd);
5340 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5341 return REDIS_ERR;
5342 }
5343 }
5344
5345 /* Issue the SYNC command */
5346 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5347 close(fd);
5348 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5349 strerror(errno));
5350 return REDIS_ERR;
5351 }
5352 /* Read the bulk write count */
5353 if (syncReadLine(fd,buf,1024,3600) == -1) {
5354 close(fd);
5355 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5356 strerror(errno));
5357 return REDIS_ERR;
5358 }
5359 if (buf[0] != '$') {
5360 close(fd);
5361 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5362 return REDIS_ERR;
5363 }
5364 dumpsize = atoi(buf+1);
5365 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5366 /* Read the bulk write data on a temp file */
5367 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5368 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5369 if (dfd == -1) {
5370 close(fd);
5371 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5372 return REDIS_ERR;
5373 }
5374 while(dumpsize) {
5375 int nread, nwritten;
5376
5377 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5378 if (nread == -1) {
5379 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5380 strerror(errno));
5381 close(fd);
5382 close(dfd);
5383 return REDIS_ERR;
5384 }
5385 nwritten = write(dfd,buf,nread);
5386 if (nwritten == -1) {
5387 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5388 close(fd);
5389 close(dfd);
5390 return REDIS_ERR;
5391 }
5392 dumpsize -= nread;
5393 }
5394 close(dfd);
5395 if (rename(tmpfile,server.dbfilename) == -1) {
5396 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5397 unlink(tmpfile);
5398 close(fd);
5399 return REDIS_ERR;
5400 }
5401 emptyDb();
5402 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5403 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5404 close(fd);
5405 return REDIS_ERR;
5406 }
5407 server.master = createClient(fd);
5408 server.master->flags |= REDIS_MASTER;
5409 server.replstate = REDIS_REPL_CONNECTED;
5410 return REDIS_OK;
5411 }
5412
5413 static void slaveofCommand(redisClient *c) {
5414 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5415 !strcasecmp(c->argv[2]->ptr,"one")) {
5416 if (server.masterhost) {
5417 sdsfree(server.masterhost);
5418 server.masterhost = NULL;
5419 if (server.master) freeClient(server.master);
5420 server.replstate = REDIS_REPL_NONE;
5421 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5422 }
5423 } else {
5424 sdsfree(server.masterhost);
5425 server.masterhost = sdsdup(c->argv[1]->ptr);
5426 server.masterport = atoi(c->argv[2]->ptr);
5427 if (server.master) freeClient(server.master);
5428 server.replstate = REDIS_REPL_CONNECT;
5429 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5430 server.masterhost, server.masterport);
5431 }
5432 addReply(c,shared.ok);
5433 }
5434
5435 /* ============================ Maxmemory directive ======================== */
5436
5437 /* This function gets called when 'maxmemory' is set on the config file to limit
5438 * the max memory used by the server, and we are out of memory.
5439 * This function will try to, in order:
5440 *
5441 * - Free objects from the free list
5442 * - Try to remove keys with an EXPIRE set
5443 *
5444 * It is not possible to free enough memory to reach used-memory < maxmemory
5445 * the server will start refusing commands that will enlarge even more the
5446 * memory usage.
5447 */
5448 static void freeMemoryIfNeeded(void) {
5449 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5450 if (listLength(server.objfreelist)) {
5451 robj *o;
5452
5453 listNode *head = listFirst(server.objfreelist);
5454 o = listNodeValue(head);
5455 listDelNode(server.objfreelist,head);
5456 zfree(o);
5457 } else {
5458 int j, k, freed = 0;
5459
5460 for (j = 0; j < server.dbnum; j++) {
5461 int minttl = -1;
5462 robj *minkey = NULL;
5463 struct dictEntry *de;
5464
5465 if (dictSize(server.db[j].expires)) {
5466 freed = 1;
5467 /* From a sample of three keys drop the one nearest to
5468 * the natural expire */
5469 for (k = 0; k < 3; k++) {
5470 time_t t;
5471
5472 de = dictGetRandomKey(server.db[j].expires);
5473 t = (time_t) dictGetEntryVal(de);
5474 if (minttl == -1 || t < minttl) {
5475 minkey = dictGetEntryKey(de);
5476 minttl = t;
5477 }
5478 }
5479 deleteKey(server.db+j,minkey);
5480 }
5481 }
5482 if (!freed) return; /* nothing to free... */
5483 }
5484 }
5485 }
5486
5487 /* ============================== Append Only file ========================== */
5488
5489 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5490 sds buf = sdsempty();
5491 int j;
5492 ssize_t nwritten;
5493 time_t now;
5494 robj *tmpargv[3];
5495
5496 /* The DB this command was targetting is not the same as the last command
5497 * we appendend. To issue a SELECT command is needed. */
5498 if (dictid != server.appendseldb) {
5499 char seldb[64];
5500
5501 snprintf(seldb,sizeof(seldb),"%d",dictid);
5502 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
5503 strlen(seldb),seldb);
5504 server.appendseldb = dictid;
5505 }
5506
5507 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5508 * EXPIREs into EXPIREATs calls */
5509 if (cmd->proc == expireCommand) {
5510 long when;
5511
5512 tmpargv[0] = createStringObject("EXPIREAT",8);
5513 tmpargv[1] = argv[1];
5514 incrRefCount(argv[1]);
5515 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5516 tmpargv[2] = createObject(REDIS_STRING,
5517 sdscatprintf(sdsempty(),"%ld",when));
5518 argv = tmpargv;
5519 }
5520
5521 /* Append the actual command */
5522 buf = sdscatprintf(buf,"*%d\r\n",argc);
5523 for (j = 0; j < argc; j++) {
5524 robj *o = argv[j];
5525
5526 if (o->encoding != REDIS_ENCODING_RAW)
5527 o = getDecodedObject(o);
5528 buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr));
5529 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5530 buf = sdscatlen(buf,"\r\n",2);
5531 if (o != argv[j])
5532 decrRefCount(o);
5533 }
5534
5535 /* Free the objects from the modified argv for EXPIREAT */
5536 if (cmd->proc == expireCommand) {
5537 for (j = 0; j < 3; j++)
5538 decrRefCount(argv[j]);
5539 }
5540
5541 /* We want to perform a single write. This should be guaranteed atomic
5542 * at least if the filesystem we are writing is a real physical one.
5543 * While this will save us against the server being killed I don't think
5544 * there is much to do about the whole server stopping for power problems
5545 * or alike */
5546 nwritten = write(server.appendfd,buf,sdslen(buf));
5547 if (nwritten != (signed)sdslen(buf)) {
5548 /* Ooops, we are in troubles. The best thing to do for now is
5549 * to simply exit instead to give the illusion that everything is
5550 * working as expected. */
5551 if (nwritten == -1) {
5552 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5553 } else {
5554 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5555 }
5556 exit(1);
5557 }
5558 now = time(NULL);
5559 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5560 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5561 now-server.lastfsync > 1))
5562 {
5563 fsync(server.appendfd); /* Let's try to get this data on the disk */
5564 server.lastfsync = now;
5565 }
5566 }
5567
5568 /* In Redis commands are always executed in the context of a client, so in
5569 * order to load the append only file we need to create a fake client. */
5570 static struct redisClient *createFakeClient(void) {
5571 struct redisClient *c = zmalloc(sizeof(*c));
5572
5573 selectDb(c,0);
5574 c->fd = -1;
5575 c->querybuf = sdsempty();
5576 c->argc = 0;
5577 c->argv = NULL;
5578 c->flags = 0;
5579 /* We set the fake client as a slave waiting for the synchronization
5580 * so that Redis will not try to send replies to this client. */
5581 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5582 c->reply = listCreate();
5583 listSetFreeMethod(c->reply,decrRefCount);
5584 listSetDupMethod(c->reply,dupClientReplyValue);
5585 return c;
5586 }
5587
5588 static void freeFakeClient(struct redisClient *c) {
5589 sdsfree(c->querybuf);
5590 listRelease(c->reply);
5591 zfree(c);
5592 }
5593
5594 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5595 * error (the append only file is zero-length) REDIS_ERR is returned. On
5596 * fatal error an error message is logged and the program exists. */
5597 int loadAppendOnlyFile(char *filename) {
5598 struct redisClient *fakeClient;
5599 FILE *fp = fopen(filename,"r");
5600 struct redis_stat sb;
5601
5602 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5603 return REDIS_ERR;
5604
5605 if (fp == NULL) {
5606 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5607 exit(1);
5608 }
5609
5610 fakeClient = createFakeClient();
5611 while(1) {
5612 int argc, j;
5613 unsigned long len;
5614 robj **argv;
5615 char buf[128];
5616 sds argsds;
5617 struct redisCommand *cmd;
5618
5619 if (fgets(buf,sizeof(buf),fp) == NULL) {
5620 if (feof(fp))
5621 break;
5622 else
5623 goto readerr;
5624 }
5625 if (buf[0] != '*') goto fmterr;
5626 argc = atoi(buf+1);
5627 argv = zmalloc(sizeof(robj*)*argc);
5628 for (j = 0; j < argc; j++) {
5629 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5630 if (buf[0] != '$') goto fmterr;
5631 len = strtol(buf+1,NULL,10);
5632 argsds = sdsnewlen(NULL,len);
5633 if (fread(argsds,len,1,fp) == 0) goto fmterr;
5634 argv[j] = createObject(REDIS_STRING,argsds);
5635 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5636 }
5637
5638 /* Command lookup */
5639 cmd = lookupCommand(argv[0]->ptr);
5640 if (!cmd) {
5641 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5642 exit(1);
5643 }
5644 /* Try object sharing and encoding */
5645 if (server.shareobjects) {
5646 int j;
5647 for(j = 1; j < argc; j++)
5648 argv[j] = tryObjectSharing(argv[j]);
5649 }
5650 if (cmd->flags & REDIS_CMD_BULK)
5651 tryObjectEncoding(argv[argc-1]);
5652 /* Run the command in the context of a fake client */
5653 fakeClient->argc = argc;
5654 fakeClient->argv = argv;
5655 cmd->proc(fakeClient);
5656 /* Discard the reply objects list from the fake client */
5657 while(listLength(fakeClient->reply))
5658 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5659 /* Clean up, ready for the next command */
5660 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5661 zfree(argv);
5662 }
5663 fclose(fp);
5664 freeFakeClient(fakeClient);
5665 return REDIS_OK;
5666
5667 readerr:
5668 if (feof(fp)) {
5669 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5670 } else {
5671 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5672 }
5673 exit(1);
5674 fmterr:
5675 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5676 exit(1);
5677 }
5678
5679 /* ================================= Debugging ============================== */
5680
5681 static void debugCommand(redisClient *c) {
5682 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
5683 *((char*)-1) = 'x';
5684 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
5685 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
5686 robj *key, *val;
5687
5688 if (!de) {
5689 addReply(c,shared.nokeyerr);
5690 return;
5691 }
5692 key = dictGetEntryKey(de);
5693 val = dictGetEntryVal(de);
5694 addReplySds(c,sdscatprintf(sdsempty(),
5695 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
5696 key, key->refcount, val, val->refcount, val->encoding));
5697 } else {
5698 addReplySds(c,sdsnew(
5699 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
5700 }
5701 }
5702
5703 /* =================================== Main! ================================ */
5704
5705 #ifdef __linux__
/* Return the integer stored in /proc/sys/vm/overcommit_memory, or -1 if the
 * file can't be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return -1;
    if (fgets(line,64,procfile) != NULL) value = atoi(line);
    fclose(procfile);
    return value;
}
5719
5720 void linuxOvercommitMemoryWarning(void) {
5721 if (linuxOvercommitMemoryValue() == 0) {
5722 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5723 }
5724 }
5725 #endif /* __linux__ */
5726
5727 static void daemonize(void) {
5728 int fd;
5729 FILE *fp;
5730
5731 if (fork() != 0) exit(0); /* parent exits */
5732 setsid(); /* create a new session */
5733
5734 /* Every output goes to /dev/null. If Redis is daemonized but
5735 * the 'logfile' is set to 'stdout' in the configuration file
5736 * it will not log at all. */
5737 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5738 dup2(fd, STDIN_FILENO);
5739 dup2(fd, STDOUT_FILENO);
5740 dup2(fd, STDERR_FILENO);
5741 if (fd > STDERR_FILENO) close(fd);
5742 }
5743 /* Try to write the pid file */
5744 fp = fopen(server.pidfile,"w");
5745 if (fp) {
5746 fprintf(fp,"%d\n",getpid());
5747 fclose(fp);
5748 }
5749 }
5750
5751 int main(int argc, char **argv) {
5752 initServerConfig();
5753 if (argc == 2) {
5754 resetServerSaveParams();
5755 loadServerConfig(argv[1]);
5756 } else if (argc > 2) {
5757 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5758 exit(1);
5759 } else {
5760 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5761 }
5762 initServer();
5763 if (server.daemonize) daemonize();
5764 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5765 #ifdef __linux__
5766 linuxOvercommitMemoryWarning();
5767 #endif
5768 if (server.appendonly) {
5769 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
5770 redisLog(REDIS_NOTICE,"DB loaded from append only file");
5771 } else {
5772 if (rdbLoad(server.dbfilename) == REDIS_OK)
5773 redisLog(REDIS_NOTICE,"DB loaded from disk");
5774 }
5775 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5776 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5777 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5778 aeMain(server.el);
5779 aeDeleteEventLoop(server.el);
5780 return 0;
5781 }
5782
5783 /* ============================= Backtrace support ========================= */
5784
5785 #ifdef HAVE_BACKTRACE
5786 static char *findFuncName(void *pointer, unsigned long *offset);
5787
/* Return the instruction pointer saved in the signal context 'uc', i.e. the
 * address that was executing when the signal was raised. The field holding
 * it is platform specific, hence the conditional ladder. NULL is returned
 * on platforms not handled below. */
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__)
    return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    return (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
    return (void*) uc->uc_mcontext->__ss.__eip;
#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
    /* 64 bit builds read the 64 bit register, 32 bit builds the 32 bit one */
  #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
/* NOTE(review): compilers normally spell the 64 bit macro __x86_64__
 * (lowercase); verify whether __X86_64__ is ever defined, and whether
 * REG_EIP is the right register index for a 64 bit build. */
#elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
    return (void*) uc->uc_mcontext.gregs[REG_EIP];
#elif defined(__ia64__) /* Linux IA64 */
    return (void*) uc->uc_mcontext.sc_ip;
#else
    return NULL;
#endif
}
5809
5810 static void segvHandler(int sig, siginfo_t *info, void *secret) {
5811 void *trace[100];
5812 char **messages = NULL;
5813 int i, trace_size = 0;
5814 unsigned long offset=0;
5815 time_t uptime = time(NULL)-server.stat_starttime;
5816 ucontext_t *uc = (ucontext_t*) secret;
5817 REDIS_NOTUSED(info);
5818
5819 redisLog(REDIS_WARNING,
5820 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5821 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5822 "redis_version:%s; "
5823 "uptime_in_seconds:%d; "
5824 "connected_clients:%d; "
5825 "connected_slaves:%d; "
5826 "used_memory:%zu; "
5827 "changes_since_last_save:%lld; "
5828 "bgsave_in_progress:%d; "
5829 "last_save_time:%d; "
5830 "total_connections_received:%lld; "
5831 "total_commands_processed:%lld; "
5832 "role:%s;"
5833 ,REDIS_VERSION,
5834 uptime,
5835 listLength(server.clients)-listLength(server.slaves),
5836 listLength(server.slaves),
5837 server.usedmemory,
5838 server.dirty,
5839 server.bgsaveinprogress,
5840 server.lastsave,
5841 server.stat_numconnections,
5842 server.stat_numcommands,
5843 server.masterhost == NULL ? "master" : "slave"
5844 ));
5845
5846 trace_size = backtrace(trace, 100);
5847 /* overwrite sigaction with caller's address */
5848 if (getMcontextEip(uc) != NULL) {
5849 trace[1] = getMcontextEip(uc);
5850 }
5851 messages = backtrace_symbols(trace, trace_size);
5852
5853 for (i=1; i<trace_size; ++i) {
5854 char *fn = findFuncName(trace[i], &offset), *p;
5855
5856 p = strchr(messages[i],'+');
5857 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5858 redisLog(REDIS_WARNING,"%s", messages[i]);
5859 } else {
5860 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5861 }
5862 }
5863 free(messages);
5864 exit(0);
5865 }
5866
5867 static void setupSigSegvAction(void) {
5868 struct sigaction act;
5869
5870 sigemptyset (&act.sa_mask);
5871 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5872 * is used. Otherwise, sa_handler is used */
5873 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5874 act.sa_sigaction = segvHandler;
5875 sigaction (SIGSEGV, &act, NULL);
5876 sigaction (SIGBUS, &act, NULL);
5877 sigaction (SIGFPE, &act, NULL);
5878 sigaction (SIGILL, &act, NULL);
5879 sigaction (SIGBUS, &act, NULL);
5880 return;
5881 }
5882
5883 #include "staticsymbols.h"
5884 /* This function try to convert a pointer into a function name. It's used in
5885 * oreder to provide a backtrace under segmentation fault that's able to
5886 * display functions declared as static (otherwise the backtrace is useless). */
5887 static char *findFuncName(void *pointer, unsigned long *offset){
5888 int i, ret = -1;
5889 unsigned long off, minoff = 0;
5890
5891 /* Try to match against the Symbol with the smallest offset */
5892 for (i=0; symsTable[i].pointer; i++) {
5893 unsigned long lp = (unsigned long) pointer;
5894
5895 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
5896 off=lp-symsTable[i].pointer;
5897 if (ret < 0 || off < minoff) {
5898 minoff=off;
5899 ret=i;
5900 }
5901 }
5902 }
5903 if (ret == -1) return NULL;
5904 *offset = minoff;
5905 return symsTable[ret].name;
5906 }
5907 #else /* HAVE_BACKTRACE */
/* Without backtrace support there is nothing to install: this stub only
 * exists so that callers don't need their own conditional compilation. */
static void setupSigSegvAction(void) {
    return;
}
5910 #endif /* HAVE_BACKTRACE */
5911
5912
5913
5914 /* The End */
5915
5916
5917