]> git.saurik.com Git - redis.git/blob - redis.c
daf2c9bcee6eef51bbecc6c2a48a92ab543baf14
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string of this source tree. */
#define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
/* Status codes */
#define REDIS_OK                        0
#define REDIS_ERR                       -1

/* Compile-time server defaults and limits */
#define REDIS_SERVERPORT                6379    /* TCP port */
#define REDIS_MAXIDLETIME               (60*5)  /* default client timeout */
#define REDIS_IOBUF_LEN                 1024
#define REDIS_LOADBUF_LEN               1024
#define REDIS_STATIC_ARGS               4
#define REDIS_DEFAULT_DBNUM             16
#define REDIS_CONFIGLINE_MAX            1024
#define REDIS_OBJFREELIST_MAX           1000000 /* max number of objects to cache */
#define REDIS_MAX_SYNC_TIME             60      /* a slave can't take longer than this to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON    100     /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT       (1024*64)
#define REDIS_REQUEST_MAX_SIZE          (1024*1024*256) /* max bytes in inline command */

/* When more than REDIS_WRITEV_THRESHOLD write packets are pending, use writev */
#define REDIS_WRITEV_THRESHOLD          3
/* Upper bound on the number of iovecs passed to a single writev call */
#define REDIS_WRITEV_IOVEC_COUNT        256
99
/* Hash table parameters */
#define REDIS_HT_MINFILL        10      /* minimal hash table fill: 10% */

/* Command flags */
#define REDIS_CMD_BULK          1       /* bulk write command */
#define REDIS_CMD_INLINE        2       /* inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: every command marked with
 * this flag returns an error when the 'maxmemory' option is set in the
 * config file and the server is using more than maxmemory bytes of memory.
 * In short, these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM       4

/* Object types */
#define REDIS_STRING            0
#define REDIS_LIST              1
#define REDIS_SET               2
#define REDIS_ZSET              3
#define REDIS_HASH              4

/* Object encodings */
#define REDIS_ENCODING_RAW      0       /* raw representation */
#define REDIS_ENCODING_INT      1       /* encoded as integer */

/* Object types that are only used when dumping to disk */
#define REDIS_EXPIRETIME        253
#define REDIS_SELECTDB          254
#define REDIS_EOF               255
127
/* Dump file length encoding. Storing a full 32 bit length for every short
 * key would waste a lot of space, so the two most significant bits of the
 * first byte select how the length is to be interpreted:
 *
 * 00|000000 => the length is the low 6 bits of this byte
 * 01|000000 00000000 => the length is 14 bits: 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => a full 32 bit length follows
 * 11|000000 => a specially encoded object follows; the low six bits tell
 *              which kind of object it is (see the REDIS_RDB_ENC_* defines).
 *
 * Lengths up to 63 take a single byte; most DB keys, and many values,
 * fit in that. */
#define REDIS_RDB_6BITLEN       0
#define REDIS_RDB_14BITLEN      1
#define REDIS_RDB_32BITLEN      2
#define REDIS_RDB_ENCVAL        3
#define REDIS_RDB_LENERR        UINT_MAX

/* When the length of a string object stored on disk has its first two bits
 * set, the remaining six bits select one of the following special
 * encodings for the object: */
#define REDIS_RDB_ENC_INT8      0       /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16     1       /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32     2       /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF       3       /* string compressed with FASTLZ */
154
/* Client flags */
#define REDIS_CLOSE     1       /* this client connection should be closed ASAP */
#define REDIS_SLAVE     2       /* this client is a slave server */
#define REDIS_MASTER    4       /* this client is a master server */
#define REDIS_MONITOR   8       /* this client is a slave monitor, see MONITOR */

/* Replication state, as seen by the slave */
#define REDIS_REPL_NONE         0       /* no active replication */
#define REDIS_REPL_CONNECT      1       /* must connect to master */
#define REDIS_REPL_CONNECTED    2       /* connected to master */

/* Replication state of a slave, as seen by the master.
 * In the SEND_BULK and ONLINE states the slave receives new updates in its
 * output queue. In the WAIT_BGSAVE state the server is instead waiting to
 * start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START    3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END      4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK            5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE               6 /* bulk DB already transmitted, receive updates */

/* List ends */
#define REDIS_HEAD      0
#define REDIS_TAIL      1

/* Sort operations */
#define REDIS_SORT_GET          0
#define REDIS_SORT_ASC          1
#define REDIS_SORT_DESC         2
#define REDIS_SORTKEY_MAX       1024

/* Log levels */
#define REDIS_DEBUG     0
#define REDIS_NOTICE    1
#define REDIS_WARNING   2

/* Silences "unused variable/parameter" warnings */
#define REDIS_NOTUSED(V) ((void) V)

#define ZSKIPLIST_MAXLEVEL      32      /* should be enough for 2^32 elements */
#define ZSKIPLIST_P             0.25    /* skiplist P = 1/4 */

/* Append only file fsync policies */
#define APPENDFSYNC_NO          0
#define APPENDFSYNC_ALWAYS      1
#define APPENDFSYNC_EVERYSEC    2
200
201 /*================================= Data types ============================== */
202
/* A redis object: a generic container able to hold a string / list / set.
 * 'ptr' points to the payload, 'type' and 'encoding' describe it, and
 * 'refcount' counts the references held on the object. */
typedef struct redisObject {
    void *ptr;
    unsigned char type;
    unsigned char encoding;
    unsigned char notused[2];
    int refcount;
} robj;
211
212 typedef struct redisDb {
213 dict *dict;
214 dict *expires;
215 int id;
216 } redisDb;
217
218 /* With multiplexing we need to take per-clinet state.
219 * Clients are taken in a liked list. */
220 typedef struct redisClient {
221 int fd;
222 redisDb *db;
223 int dictid;
224 sds querybuf;
225 robj **argv, **mbargv;
226 int argc, mbargc;
227 int bulklen; /* bulk read len. -1 if not in bulk read mode */
228 int multibulk; /* multi bulk command format active */
229 list *reply;
230 int sentlen;
231 time_t lastinteraction; /* time of the last interaction, used for timeout */
232 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
233 int slaveseldb; /* slave selected db, if this client is a slave */
234 int authenticated; /* when requirepass is non-NULL */
235 int replstate; /* replication state if this is a slave */
236 int repldbfd; /* replication DB file descriptor */
237 long repldboff; /* replication DB file offset */
238 off_t repldbsize; /* replication DB file size */
239 } redisClient;
240
/* One automatic-save rule: serverCron starts a background save when at
 * least 'changes' changes are pending and more than 'seconds' seconds
 * have passed since the last save. */
struct saveparam {
    time_t seconds;
    int changes;
};
245
246 /* Global server state structure */
247 struct redisServer {
248 int port;
249 int fd;
250 redisDb *db;
251 dict *sharingpool;
252 unsigned int sharingpoolsize;
253 long long dirty; /* changes to DB from the last save */
254 list *clients;
255 list *slaves, *monitors;
256 char neterr[ANET_ERR_LEN];
257 aeEventLoop *el;
258 int cronloops; /* number of times the cron function run */
259 list *objfreelist; /* A list of freed objects to avoid malloc() */
260 time_t lastsave; /* Unix time of last save succeeede */
261 size_t usedmemory; /* Used memory in megabytes */
262 /* Fields used only for stats */
263 time_t stat_starttime; /* server start time */
264 long long stat_numcommands; /* number of processed commands */
265 long long stat_numconnections; /* number of connections received */
266 /* Configuration */
267 int verbosity;
268 int glueoutputbuf;
269 int maxidletime;
270 int dbnum;
271 int daemonize;
272 int appendonly;
273 int appendfsync;
274 time_t lastfsync;
275 int appendfd;
276 int appendseldb;
277 char *pidfile;
278 int bgsaveinprogress;
279 pid_t bgsavechildpid;
280 struct saveparam *saveparams;
281 int saveparamslen;
282 char *logfile;
283 char *bindaddr;
284 char *dbfilename;
285 char *appendfilename;
286 char *requirepass;
287 int shareobjects;
288 /* Replication related */
289 int isslave;
290 char *masterauth;
291 char *masterhost;
292 int masterport;
293 redisClient *master; /* client that is master for this slave */
294 int replstate;
295 unsigned int maxclients;
296 unsigned long maxmemory;
297 /* Sort parameters - qsort_r() is only available under BSD so we
298 * have to take this state global, in order to pass it to sortCompare() */
299 int sort_desc;
300 int sort_alpha;
301 int sort_bypattern;
302 };
303
304 typedef void redisCommandProc(redisClient *c);
305 struct redisCommand {
306 char *name;
307 redisCommandProc *proc;
308 int arity;
309 int flags;
310 };
311
/* Associates a function name with a pointer value stored as an integer. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};
316
317 typedef struct _redisSortObject {
318 robj *obj;
319 union {
320 double score;
321 robj *cmpobj;
322 } u;
323 } redisSortObject;
324
325 typedef struct _redisSortOperation {
326 int type;
327 robj *pattern;
328 } redisSortOperation;
329
330 /* ZSETs use a specialized version of Skiplists */
331
332 typedef struct zskiplistNode {
333 struct zskiplistNode **forward;
334 struct zskiplistNode *backward;
335 double score;
336 robj *obj;
337 } zskiplistNode;
338
339 typedef struct zskiplist {
340 struct zskiplistNode *header, *tail;
341 unsigned long length;
342 int level;
343 } zskiplist;
344
345 typedef struct zset {
346 dict *dict;
347 zskiplist *zsl;
348 } zset;
349
350 /* Our shared "common" objects */
351
352 struct sharedObjectsStruct {
353 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
354 *colon, *nullbulk, *nullmultibulk,
355 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
356 *outofrangeerr, *plus,
357 *select0, *select1, *select2, *select3, *select4,
358 *select5, *select6, *select7, *select8, *select9;
359 } shared;
360
/* Global vars that are actually used as constants. The following double
 * values are used for on-disk serialization of doubles, and are
 * initialized at runtime to avoid strange compiler optimizations. */

static double R_Zero;
static double R_PosInf;
static double R_NegInf;
static double R_Nan;
366
367 /*================================ Prototypes =============================== */
368
369 static void freeStringObject(robj *o);
370 static void freeListObject(robj *o);
371 static void freeSetObject(robj *o);
372 static void decrRefCount(void *o);
373 static robj *createObject(int type, void *ptr);
374 static void freeClient(redisClient *c);
375 static int rdbLoad(char *filename);
376 static void addReply(redisClient *c, robj *obj);
377 static void addReplySds(redisClient *c, sds s);
378 static void incrRefCount(robj *o);
379 static int rdbSaveBackground(char *filename);
380 static robj *createStringObject(char *ptr, size_t len);
381 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
382 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
383 static int syncWithMaster(void);
384 static robj *tryObjectSharing(robj *o);
385 static int tryObjectEncoding(robj *o);
386 static robj *getDecodedObject(const robj *o);
387 static int removeExpire(redisDb *db, robj *key);
388 static int expireIfNeeded(redisDb *db, robj *key);
389 static int deleteIfVolatile(redisDb *db, robj *key);
390 static int deleteKey(redisDb *db, robj *key);
391 static time_t getExpire(redisDb *db, robj *key);
392 static int setExpire(redisDb *db, robj *key, time_t when);
393 static void updateSlavesWaitingBgsave(int bgsaveerr);
394 static void freeMemoryIfNeeded(void);
395 static int processCommand(redisClient *c);
396 static void setupSigSegvAction(void);
397 static void rdbRemoveTempFile(pid_t childpid);
398 static size_t stringObjectLen(robj *o);
399 static void processInputBuffer(redisClient *c);
400 static zskiplist *zslCreate(void);
401 static void zslFree(zskiplist *zsl);
402 static void zslInsert(zskiplist *zsl, double score, robj *obj);
403 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
404
405 static void authCommand(redisClient *c);
406 static void pingCommand(redisClient *c);
407 static void echoCommand(redisClient *c);
408 static void setCommand(redisClient *c);
409 static void setnxCommand(redisClient *c);
410 static void getCommand(redisClient *c);
411 static void delCommand(redisClient *c);
412 static void existsCommand(redisClient *c);
413 static void incrCommand(redisClient *c);
414 static void decrCommand(redisClient *c);
415 static void incrbyCommand(redisClient *c);
416 static void decrbyCommand(redisClient *c);
417 static void selectCommand(redisClient *c);
418 static void randomkeyCommand(redisClient *c);
419 static void keysCommand(redisClient *c);
420 static void dbsizeCommand(redisClient *c);
421 static void lastsaveCommand(redisClient *c);
422 static void saveCommand(redisClient *c);
423 static void bgsaveCommand(redisClient *c);
424 static void shutdownCommand(redisClient *c);
425 static void moveCommand(redisClient *c);
426 static void renameCommand(redisClient *c);
427 static void renamenxCommand(redisClient *c);
428 static void lpushCommand(redisClient *c);
429 static void rpushCommand(redisClient *c);
430 static void lpopCommand(redisClient *c);
431 static void rpopCommand(redisClient *c);
432 static void llenCommand(redisClient *c);
433 static void lindexCommand(redisClient *c);
434 static void lrangeCommand(redisClient *c);
435 static void ltrimCommand(redisClient *c);
436 static void typeCommand(redisClient *c);
437 static void lsetCommand(redisClient *c);
438 static void saddCommand(redisClient *c);
439 static void sremCommand(redisClient *c);
440 static void smoveCommand(redisClient *c);
441 static void sismemberCommand(redisClient *c);
442 static void scardCommand(redisClient *c);
443 static void spopCommand(redisClient *c);
444 static void srandmemberCommand(redisClient *c);
445 static void sinterCommand(redisClient *c);
446 static void sinterstoreCommand(redisClient *c);
447 static void sunionCommand(redisClient *c);
448 static void sunionstoreCommand(redisClient *c);
449 static void sdiffCommand(redisClient *c);
450 static void sdiffstoreCommand(redisClient *c);
451 static void syncCommand(redisClient *c);
452 static void flushdbCommand(redisClient *c);
453 static void flushallCommand(redisClient *c);
454 static void sortCommand(redisClient *c);
455 static void lremCommand(redisClient *c);
456 static void rpoplpushcommand(redisClient *c);
457 static void infoCommand(redisClient *c);
458 static void mgetCommand(redisClient *c);
459 static void monitorCommand(redisClient *c);
460 static void expireCommand(redisClient *c);
461 static void expireatCommand(redisClient *c);
462 static void getsetCommand(redisClient *c);
463 static void ttlCommand(redisClient *c);
464 static void slaveofCommand(redisClient *c);
465 static void debugCommand(redisClient *c);
466 static void msetCommand(redisClient *c);
467 static void msetnxCommand(redisClient *c);
468 static void zaddCommand(redisClient *c);
469 static void zrangeCommand(redisClient *c);
470 static void zrangebyscoreCommand(redisClient *c);
471 static void zrevrangeCommand(redisClient *c);
472 static void zcardCommand(redisClient *c);
473 static void zremCommand(redisClient *c);
474 static void zscoreCommand(redisClient *c);
475 static void zremrangebyscoreCommand(redisClient *c);
476
477 /*================================= Globals ================================= */
478
479 /* Global vars */
480 static struct redisServer server; /* server global state */
481 static struct redisCommand cmdTable[] = {
482 {"get",getCommand,2,REDIS_CMD_INLINE},
483 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
484 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
485 {"del",delCommand,-2,REDIS_CMD_INLINE},
486 {"exists",existsCommand,2,REDIS_CMD_INLINE},
487 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
488 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
489 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
490 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
491 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
492 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
493 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
494 {"llen",llenCommand,2,REDIS_CMD_INLINE},
495 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
496 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
497 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
498 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
499 {"lrem",lremCommand,4,REDIS_CMD_BULK},
500 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK},
501 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
502 {"srem",sremCommand,3,REDIS_CMD_BULK},
503 {"smove",smoveCommand,4,REDIS_CMD_BULK},
504 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
505 {"scard",scardCommand,2,REDIS_CMD_INLINE},
506 {"spop",spopCommand,2,REDIS_CMD_INLINE},
507 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
508 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
509 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
510 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
511 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
512 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
513 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
514 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
515 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
516 {"zrem",zremCommand,3,REDIS_CMD_BULK},
517 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
518 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
519 {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
520 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
521 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
522 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
523 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
524 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
525 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
526 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
527 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
528 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
529 {"select",selectCommand,2,REDIS_CMD_INLINE},
530 {"move",moveCommand,3,REDIS_CMD_INLINE},
531 {"rename",renameCommand,3,REDIS_CMD_INLINE},
532 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
533 {"expire",expireCommand,3,REDIS_CMD_INLINE},
534 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
535 {"keys",keysCommand,2,REDIS_CMD_INLINE},
536 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
537 {"auth",authCommand,2,REDIS_CMD_INLINE},
538 {"ping",pingCommand,1,REDIS_CMD_INLINE},
539 {"echo",echoCommand,2,REDIS_CMD_BULK},
540 {"save",saveCommand,1,REDIS_CMD_INLINE},
541 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
542 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
543 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
544 {"type",typeCommand,2,REDIS_CMD_INLINE},
545 {"sync",syncCommand,1,REDIS_CMD_INLINE},
546 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
547 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
548 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
549 {"info",infoCommand,1,REDIS_CMD_INLINE},
550 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
551 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
552 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
553 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
554 {NULL,NULL,0,0}
555 };
556
557 /*============================ Utility functions ============================ */
558
/* Glob-style pattern matching.
 *
 * Matches the first 'stringLen' bytes of 'string' against the first
 * 'patternLen' bytes of 'pattern'. Neither buffer needs to be NUL
 * terminated: no byte at or past the given lengths is ever read.
 *
 * Supported pattern syntax:
 *   *        any sequence of bytes, including the empty one
 *   ?        exactly one byte
 *   [abc]    one byte out of the listed ones
 *   [^abc]   one byte that is not listed
 *   [a-z]    one byte inside the range (bounds may be given in any order)
 *   \x       the byte x taken literally, also inside [...]
 *
 * A '[' without a closing ']' is treated as if the class ended together
 * with the pattern. When 'nocase' is non-zero bytes are compared through
 * tolower(), with the exception of backslash-escaped bytes inside [...],
 * which are always compared exactly.
 *
 * Returns 1 if the whole string matches the whole pattern, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* Collapse a run of '*' without looking past the pattern end. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            while (stringLen > 0) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen <= 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A class always consumes one byte of the string. Bailing out
             * here keeps stringLen from going negative, which would turn
             * a later '*' into an unbounded out of bounds scan. */
            if (stringLen <= 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen >= 2 && pattern[0] == '\\') {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (patternLen == 0) {
                    /* Unterminated class: step back so that the common
                     * advance after the switch lands on the pattern end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((int)pattern[0]) == tolower((int)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            /* A literal needs one byte of the string to compare with. */
            if (stringLen <= 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((int)pattern[0]) != tolower((int)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* String exhausted: only trailing '*' may still match. */
            while (patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
681
682 static void redisLog(int level, const char *fmt, ...) {
683 va_list ap;
684 FILE *fp;
685
686 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
687 if (!fp) return;
688
689 va_start(ap, fmt);
690 if (level >= server.verbosity) {
691 char *c = ".-*";
692 char buf[64];
693 time_t now;
694
695 now = time(NULL);
696 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
697 fprintf(fp,"%s %c ",buf,c[level]);
698 vfprintf(fp, fmt, ap);
699 fprintf(fp,"\n");
700 fflush(fp);
701 }
702 va_end(ap);
703
704 if (server.logfile) fclose(fp);
705 }
706
707 /*====================== Hash table type implementation ==================== */
708
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and redis objects as values (objects can hold SDS strings,
 * lists, sets). */
712
/* Dictionary destructor that releases 'val' with zfree().
 * 'privdata' is not used. */
static void dictVanillaFree(void *privdata, void *val)
{
    zfree(val);
    DICT_NOTUSED(privdata);
}
718
719 static int sdsDictKeyCompare(void *privdata, const void *key1,
720 const void *key2)
721 {
722 int l1,l2;
723 DICT_NOTUSED(privdata);
724
725 l1 = sdslen((sds)key1);
726 l2 = sdslen((sds)key2);
727 if (l1 != l2) return 0;
728 return memcmp(key1, key2, l1) == 0;
729 }
730
/* Dictionary destructor for redis objects: drops one reference from
 * 'val' through decrRefCount(). 'privdata' is not used. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    decrRefCount(val);
    DICT_NOTUSED(privdata);
}
737
738 static int dictObjKeyCompare(void *privdata, const void *key1,
739 const void *key2)
740 {
741 const robj *o1 = key1, *o2 = key2;
742 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
743 }
744
745 static unsigned int dictObjHash(const void *key) {
746 const robj *o = key;
747 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
748 }
749
750 static int dictEncObjKeyCompare(void *privdata, const void *key1,
751 const void *key2)
752 {
753 const robj *o1 = key1, *o2 = key2;
754
755 if (o1->encoding == REDIS_ENCODING_RAW &&
756 o2->encoding == REDIS_ENCODING_RAW)
757 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
758 else {
759 robj *dec1, *dec2;
760 int cmp;
761
762 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
763 getDecodedObject(o1) : (robj*)o1;
764 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
765 getDecodedObject(o2) : (robj*)o2;
766 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
767 if (dec1 != o1) decrRefCount(dec1);
768 if (dec2 != o2) decrRefCount(dec2);
769 return cmp;
770 }
771 }
772
773 static unsigned int dictEncObjHash(const void *key) {
774 const robj *o = key;
775
776 if (o->encoding == REDIS_ENCODING_RAW)
777 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
778 else {
779 robj *dec = getDecodedObject(o);
780 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
781 decrRefCount(dec);
782 return hash;
783 }
784 }
785
786 static dictType setDictType = {
787 dictEncObjHash, /* hash function */
788 NULL, /* key dup */
789 NULL, /* val dup */
790 dictEncObjKeyCompare, /* key compare */
791 dictRedisObjectDestructor, /* key destructor */
792 NULL /* val destructor */
793 };
794
795 static dictType zsetDictType = {
796 dictEncObjHash, /* hash function */
797 NULL, /* key dup */
798 NULL, /* val dup */
799 dictEncObjKeyCompare, /* key compare */
800 dictRedisObjectDestructor, /* key destructor */
801 dictVanillaFree /* val destructor */
802 };
803
804 static dictType hashDictType = {
805 dictObjHash, /* hash function */
806 NULL, /* key dup */
807 NULL, /* val dup */
808 dictObjKeyCompare, /* key compare */
809 dictRedisObjectDestructor, /* key destructor */
810 dictRedisObjectDestructor /* val destructor */
811 };
812
813 /* ========================= Random utility functions ======================= */
814
/* Redis generally does not try to recover from out of memory conditions
 * when allocating objects or strings. It is not clear that the condition
 * could even be reported to the client, since the networking layer itself
 * relies on heap allocation for its send buffers, so we simply abort.
 * At least the code is simpler to read...
 *
 * Prints "<msg>: Out of memory" on stderr, sleeps for one second and
 * then calls abort(). */
static void oom(const char *msg) {
    fprintf(stderr, "%s: Out of memory\n",msg);
    fflush(stderr);
    sleep(1);
    abort();
}
826
827 /* ====================== Redis server networking stuff ===================== */
828 static void closeTimedoutClients(void) {
829 redisClient *c;
830 listNode *ln;
831 time_t now = time(NULL);
832
833 listRewind(server.clients);
834 while ((ln = listYield(server.clients)) != NULL) {
835 c = listNodeValue(ln);
836 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
837 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
838 (now - c->lastinteraction > server.maxidletime)) {
839 redisLog(REDIS_DEBUG,"Closing idle client");
840 freeClient(c);
841 }
842 }
843 }
844
845 static int htNeedsResize(dict *dict) {
846 long long size, used;
847
848 size = dictSlots(dict);
849 used = dictSize(dict);
850 return (size && used && size > DICT_HT_INITIAL_SIZE &&
851 (used*100/size < REDIS_HT_MINFILL));
852 }
853
854 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
855 * we resize the hash table to save memory */
856 static void tryResizeHashTables(void) {
857 int j;
858
859 for (j = 0; j < server.dbnum; j++) {
860 if (htNeedsResize(server.db[j].dict)) {
861 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
862 dictResize(server.db[j].dict);
863 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
864 }
865 if (htNeedsResize(server.db[j].expires))
866 dictResize(server.db[j].expires);
867 }
868 }
869
870 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
871 int j, loops = server.cronloops++;
872 REDIS_NOTUSED(eventLoop);
873 REDIS_NOTUSED(id);
874 REDIS_NOTUSED(clientData);
875
876 /* Update the global state with the amount of used memory */
877 server.usedmemory = zmalloc_used_memory();
878
879 /* Show some info about non-empty databases */
880 for (j = 0; j < server.dbnum; j++) {
881 long long size, used, vkeys;
882
883 size = dictSlots(server.db[j].dict);
884 used = dictSize(server.db[j].dict);
885 vkeys = dictSize(server.db[j].expires);
886 if (!(loops % 5) && (used || vkeys)) {
887 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
888 /* dictPrintStats(server.dict); */
889 }
890 }
891
892 /* We don't want to resize the hash tables while a bacground saving
893 * is in progress: the saving child is created using fork() that is
894 * implemented with a copy-on-write semantic in most modern systems, so
895 * if we resize the HT while there is the saving child at work actually
896 * a lot of memory movements in the parent will cause a lot of pages
897 * copied. */
898 if (!server.bgsaveinprogress) tryResizeHashTables();
899
900 /* Show information about connected clients */
901 if (!(loops % 5)) {
902 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
903 listLength(server.clients)-listLength(server.slaves),
904 listLength(server.slaves),
905 server.usedmemory,
906 dictSize(server.sharingpool));
907 }
908
909 /* Close connections of timedout clients */
910 if (server.maxidletime && !(loops % 10))
911 closeTimedoutClients();
912
913 /* Check if a background saving in progress terminated */
914 if (server.bgsaveinprogress) {
915 int statloc;
916 if (wait3(&statloc,WNOHANG,NULL)) {
917 int exitcode = WEXITSTATUS(statloc);
918 int bysignal = WIFSIGNALED(statloc);
919
920 if (!bysignal && exitcode == 0) {
921 redisLog(REDIS_NOTICE,
922 "Background saving terminated with success");
923 server.dirty = 0;
924 server.lastsave = time(NULL);
925 } else if (!bysignal && exitcode != 0) {
926 redisLog(REDIS_WARNING, "Background saving error");
927 } else {
928 redisLog(REDIS_WARNING,
929 "Background saving terminated by signal");
930 rdbRemoveTempFile(server.bgsavechildpid);
931 }
932 server.bgsaveinprogress = 0;
933 server.bgsavechildpid = -1;
934 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
935 }
936 } else {
937 /* If there is not a background saving in progress check if
938 * we have to save now */
939 time_t now = time(NULL);
940 for (j = 0; j < server.saveparamslen; j++) {
941 struct saveparam *sp = server.saveparams+j;
942
943 if (server.dirty >= sp->changes &&
944 now-server.lastsave > sp->seconds) {
945 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
946 sp->changes, sp->seconds);
947 rdbSaveBackground(server.dbfilename);
948 break;
949 }
950 }
951 }
952
953 /* Try to expire a few timed out keys. The algorithm used is adaptive and
954 * will use few CPU cycles if there are few expiring keys, otherwise
955 * it will get more aggressive to avoid that too much memory is used by
956 * keys that can be removed from the keyspace. */
957 for (j = 0; j < server.dbnum; j++) {
958 int expired;
959 redisDb *db = server.db+j;
960
961 /* Continue to expire if at the end of the cycle more than 25%
962 * of the keys were expired. */
963 do {
964 int num = dictSize(db->expires);
965 time_t now = time(NULL);
966
967 expired = 0;
968 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
969 num = REDIS_EXPIRELOOKUPS_PER_CRON;
970 while (num--) {
971 dictEntry *de;
972 time_t t;
973
974 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
975 t = (time_t) dictGetEntryVal(de);
976 if (now > t) {
977 deleteKey(db,dictGetEntryKey(de));
978 expired++;
979 }
980 }
981 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
982 }
983
984 /* Check if we should connect to a MASTER */
985 if (server.replstate == REDIS_REPL_CONNECT) {
986 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
987 if (syncWithMaster() == REDIS_OK) {
988 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
989 }
990 }
991 return 1000;
992 }
993
994 static void createSharedObjects(void) {
995 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
996 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
997 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
998 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
999 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1000 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1001 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1002 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1003 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1004 /* no such key */
1005 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1006 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1007 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1008 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1009 "-ERR no such key\r\n"));
1010 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1011 "-ERR syntax error\r\n"));
1012 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1013 "-ERR source and destination objects are the same\r\n"));
1014 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1015 "-ERR index out of range\r\n"));
1016 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1017 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1018 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1019 shared.select0 = createStringObject("select 0\r\n",10);
1020 shared.select1 = createStringObject("select 1\r\n",10);
1021 shared.select2 = createStringObject("select 2\r\n",10);
1022 shared.select3 = createStringObject("select 3\r\n",10);
1023 shared.select4 = createStringObject("select 4\r\n",10);
1024 shared.select5 = createStringObject("select 5\r\n",10);
1025 shared.select6 = createStringObject("select 6\r\n",10);
1026 shared.select7 = createStringObject("select 7\r\n",10);
1027 shared.select8 = createStringObject("select 8\r\n",10);
1028 shared.select9 = createStringObject("select 9\r\n",10);
1029 }
1030
1031 static void appendServerSaveParams(time_t seconds, int changes) {
1032 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1033 server.saveparams[server.saveparamslen].seconds = seconds;
1034 server.saveparams[server.saveparamslen].changes = changes;
1035 server.saveparamslen++;
1036 }
1037
1038 static void resetServerSaveParams() {
1039 zfree(server.saveparams);
1040 server.saveparams = NULL;
1041 server.saveparamslen = 0;
1042 }
1043
1044 static void initServerConfig() {
1045 server.dbnum = REDIS_DEFAULT_DBNUM;
1046 server.port = REDIS_SERVERPORT;
1047 server.verbosity = REDIS_DEBUG;
1048 server.maxidletime = REDIS_MAXIDLETIME;
1049 server.saveparams = NULL;
1050 server.logfile = NULL; /* NULL = log on standard output */
1051 server.bindaddr = NULL;
1052 server.glueoutputbuf = 1;
1053 server.daemonize = 0;
1054 server.appendonly = 0;
1055 server.appendfsync = APPENDFSYNC_ALWAYS;
1056 server.lastfsync = time(NULL);
1057 server.appendfd = -1;
1058 server.appendseldb = -1; /* Make sure the first time will not match */
1059 server.pidfile = "/var/run/redis.pid";
1060 server.dbfilename = "dump.rdb";
1061 server.appendfilename = "appendonly.log";
1062 server.requirepass = NULL;
1063 server.shareobjects = 0;
1064 server.sharingpoolsize = 1024;
1065 server.maxclients = 0;
1066 server.maxmemory = 0;
1067 resetServerSaveParams();
1068
1069 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1070 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1071 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1072 /* Replication related */
1073 server.isslave = 0;
1074 server.masterauth = NULL;
1075 server.masterhost = NULL;
1076 server.masterport = 6379;
1077 server.master = NULL;
1078 server.replstate = REDIS_REPL_NONE;
1079
1080 /* Double constants initialization */
1081 R_Zero = 0.0;
1082 R_PosInf = 1.0/R_Zero;
1083 R_NegInf = -1.0/R_Zero;
1084 R_Nan = R_Zero/R_Zero;
1085 }
1086
1087 static void initServer() {
1088 int j;
1089
1090 signal(SIGHUP, SIG_IGN);
1091 signal(SIGPIPE, SIG_IGN);
1092 setupSigSegvAction();
1093
1094 server.clients = listCreate();
1095 server.slaves = listCreate();
1096 server.monitors = listCreate();
1097 server.objfreelist = listCreate();
1098 createSharedObjects();
1099 server.el = aeCreateEventLoop();
1100 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1101 server.sharingpool = dictCreate(&setDictType,NULL);
1102 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1103 if (server.fd == -1) {
1104 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1105 exit(1);
1106 }
1107 for (j = 0; j < server.dbnum; j++) {
1108 server.db[j].dict = dictCreate(&hashDictType,NULL);
1109 server.db[j].expires = dictCreate(&setDictType,NULL);
1110 server.db[j].id = j;
1111 }
1112 server.cronloops = 0;
1113 server.bgsaveinprogress = 0;
1114 server.bgsavechildpid = -1;
1115 server.lastsave = time(NULL);
1116 server.dirty = 0;
1117 server.usedmemory = 0;
1118 server.stat_numcommands = 0;
1119 server.stat_numconnections = 0;
1120 server.stat_starttime = time(NULL);
1121 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1122
1123 if (server.appendonly) {
1124 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1125 if (server.appendfd == -1) {
1126 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1127 strerror(errno));
1128 exit(1);
1129 }
1130 }
1131 }
1132
1133 /* Empty the whole database */
1134 static long long emptyDb() {
1135 int j;
1136 long long removed = 0;
1137
1138 for (j = 0; j < server.dbnum; j++) {
1139 removed += dictSize(server.db[j].dict);
1140 dictEmpty(server.db[j].dict);
1141 dictEmpty(server.db[j].expires);
1142 }
1143 return removed;
1144 }
1145
/* Convert a "yes"/"no" string (compared ignoring case) into 1/0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    return (strcasecmp(s,"no") == 0) ? 0 : -1;
}
1151
1152 /* I agree, this is a very rudimental way to load a configuration...
1153 will improve later if the config gets more complex */
1154 static void loadServerConfig(char *filename) {
1155 FILE *fp;
1156 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1157 int linenum = 0;
1158 sds line = NULL;
1159
1160 if (filename[0] == '-' && filename[1] == '\0')
1161 fp = stdin;
1162 else {
1163 if ((fp = fopen(filename,"r")) == NULL) {
1164 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1165 exit(1);
1166 }
1167 }
1168
1169 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1170 sds *argv;
1171 int argc, j;
1172
1173 linenum++;
1174 line = sdsnew(buf);
1175 line = sdstrim(line," \t\r\n");
1176
1177 /* Skip comments and blank lines*/
1178 if (line[0] == '#' || line[0] == '\0') {
1179 sdsfree(line);
1180 continue;
1181 }
1182
1183 /* Split into arguments */
1184 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1185 sdstolower(argv[0]);
1186
1187 /* Execute config directives */
1188 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1189 server.maxidletime = atoi(argv[1]);
1190 if (server.maxidletime < 0) {
1191 err = "Invalid timeout value"; goto loaderr;
1192 }
1193 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1194 server.port = atoi(argv[1]);
1195 if (server.port < 1 || server.port > 65535) {
1196 err = "Invalid port"; goto loaderr;
1197 }
1198 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1199 server.bindaddr = zstrdup(argv[1]);
1200 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1201 int seconds = atoi(argv[1]);
1202 int changes = atoi(argv[2]);
1203 if (seconds < 1 || changes < 0) {
1204 err = "Invalid save parameters"; goto loaderr;
1205 }
1206 appendServerSaveParams(seconds,changes);
1207 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1208 if (chdir(argv[1]) == -1) {
1209 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1210 argv[1], strerror(errno));
1211 exit(1);
1212 }
1213 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1214 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1215 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1216 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1217 else {
1218 err = "Invalid log level. Must be one of debug, notice, warning";
1219 goto loaderr;
1220 }
1221 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1222 FILE *logfp;
1223
1224 server.logfile = zstrdup(argv[1]);
1225 if (!strcasecmp(server.logfile,"stdout")) {
1226 zfree(server.logfile);
1227 server.logfile = NULL;
1228 }
1229 if (server.logfile) {
1230 /* Test if we are able to open the file. The server will not
1231 * be able to abort just for this problem later... */
1232 logfp = fopen(server.logfile,"a");
1233 if (logfp == NULL) {
1234 err = sdscatprintf(sdsempty(),
1235 "Can't open the log file: %s", strerror(errno));
1236 goto loaderr;
1237 }
1238 fclose(logfp);
1239 }
1240 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1241 server.dbnum = atoi(argv[1]);
1242 if (server.dbnum < 1) {
1243 err = "Invalid number of databases"; goto loaderr;
1244 }
1245 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1246 server.maxclients = atoi(argv[1]);
1247 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1248 server.maxmemory = strtoll(argv[1], NULL, 10);
1249 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1250 server.masterhost = sdsnew(argv[1]);
1251 server.masterport = atoi(argv[2]);
1252 server.replstate = REDIS_REPL_CONNECT;
1253 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1254 server.masterauth = zstrdup(argv[1]);
1255 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1256 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1257 err = "argument must be 'yes' or 'no'"; goto loaderr;
1258 }
1259 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1260 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1261 err = "argument must be 'yes' or 'no'"; goto loaderr;
1262 }
1263 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1264 server.sharingpoolsize = atoi(argv[1]);
1265 if (server.sharingpoolsize < 1) {
1266 err = "invalid object sharing pool size"; goto loaderr;
1267 }
1268 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1269 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1270 err = "argument must be 'yes' or 'no'"; goto loaderr;
1271 }
1272 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1273 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1274 err = "argument must be 'yes' or 'no'"; goto loaderr;
1275 }
1276 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1277 if (!strcasecmp(argv[1],"no")) {
1278 server.appendfsync = APPENDFSYNC_NO;
1279 } else if (!strcasecmp(argv[1],"always")) {
1280 server.appendfsync = APPENDFSYNC_ALWAYS;
1281 } else if (!strcasecmp(argv[1],"everysec")) {
1282 server.appendfsync = APPENDFSYNC_EVERYSEC;
1283 } else {
1284 err = "argument must be 'no', 'always' or 'everysec'";
1285 goto loaderr;
1286 }
1287 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1288 server.requirepass = zstrdup(argv[1]);
1289 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1290 server.pidfile = zstrdup(argv[1]);
1291 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1292 server.dbfilename = zstrdup(argv[1]);
1293 } else {
1294 err = "Bad directive or wrong number of arguments"; goto loaderr;
1295 }
1296 for (j = 0; j < argc; j++)
1297 sdsfree(argv[j]);
1298 zfree(argv);
1299 sdsfree(line);
1300 }
1301 if (fp != stdin) fclose(fp);
1302 return;
1303
1304 loaderr:
1305 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1306 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1307 fprintf(stderr, ">>> '%s'\n", line);
1308 fprintf(stderr, "%s\n", err);
1309 exit(1);
1310 }
1311
1312 static void freeClientArgv(redisClient *c) {
1313 int j;
1314
1315 for (j = 0; j < c->argc; j++)
1316 decrRefCount(c->argv[j]);
1317 for (j = 0; j < c->mbargc; j++)
1318 decrRefCount(c->mbargv[j]);
1319 c->argc = 0;
1320 c->mbargc = 0;
1321 }
1322
1323 static void freeClient(redisClient *c) {
1324 listNode *ln;
1325
1326 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1327 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1328 sdsfree(c->querybuf);
1329 listRelease(c->reply);
1330 freeClientArgv(c);
1331 close(c->fd);
1332 ln = listSearchKey(server.clients,c);
1333 assert(ln != NULL);
1334 listDelNode(server.clients,ln);
1335 if (c->flags & REDIS_SLAVE) {
1336 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1337 close(c->repldbfd);
1338 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1339 ln = listSearchKey(l,c);
1340 assert(ln != NULL);
1341 listDelNode(l,ln);
1342 }
1343 if (c->flags & REDIS_MASTER) {
1344 server.master = NULL;
1345 server.replstate = REDIS_REPL_CONNECT;
1346 }
1347 zfree(c->argv);
1348 zfree(c->mbargv);
1349 zfree(c);
1350 }
1351
1352 #define GLUEREPLY_UP_TO (1024)
1353 static void glueReplyBuffersIfNeeded(redisClient *c) {
1354 int copylen = 0;
1355 char buf[GLUEREPLY_UP_TO];
1356 listNode *ln;
1357 robj *o;
1358
1359 listRewind(c->reply);
1360 while((ln = listYield(c->reply))) {
1361 int objlen;
1362
1363 o = ln->value;
1364 objlen = sdslen(o->ptr);
1365 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1366 memcpy(buf+copylen,o->ptr,objlen);
1367 copylen += objlen;
1368 listDelNode(c->reply,ln);
1369 } else {
1370 if (copylen == 0) return;
1371 break;
1372 }
1373 }
1374 /* Now the output buffer is empty, add the new single element */
1375 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1376 listAddNodeHead(c->reply,o);
1377 }
1378
1379 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1380 redisClient *c = privdata;
1381 int nwritten = 0, totwritten = 0, objlen;
1382 robj *o;
1383 REDIS_NOTUSED(el);
1384 REDIS_NOTUSED(mask);
1385
1386
1387 /* Use writev() if we have enough buffers to send */
1388 #if 0
1389 if (listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1390 !(c->flags & REDIS_MASTER))
1391 {
1392 sendReplyToClientWritev(el, fd, privdata, mask);
1393 return;
1394 }
1395 #endif
1396
1397 while(listLength(c->reply)) {
1398 if (server.glueoutputbuf && listLength(c->reply) > 1)
1399 glueReplyBuffersIfNeeded(c);
1400
1401 o = listNodeValue(listFirst(c->reply));
1402 objlen = sdslen(o->ptr);
1403
1404 if (objlen == 0) {
1405 listDelNode(c->reply,listFirst(c->reply));
1406 continue;
1407 }
1408
1409 if (c->flags & REDIS_MASTER) {
1410 /* Don't reply to a master */
1411 nwritten = objlen - c->sentlen;
1412 } else {
1413 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1414 if (nwritten <= 0) break;
1415 }
1416 c->sentlen += nwritten;
1417 totwritten += nwritten;
1418 /* If we fully sent the object on head go to the next one */
1419 if (c->sentlen == objlen) {
1420 listDelNode(c->reply,listFirst(c->reply));
1421 c->sentlen = 0;
1422 }
1423 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1424 * bytes, in a single threaded server it's a good idea to serve
1425 * other clients as well, even if a very large request comes from
1426 * super fast link that is always able to accept data (in real world
1427 * scenario think about 'KEYS *' against the loopback interfae) */
1428 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1429 }
1430 if (nwritten == -1) {
1431 if (errno == EAGAIN) {
1432 nwritten = 0;
1433 } else {
1434 redisLog(REDIS_DEBUG,
1435 "Error writing to client: %s", strerror(errno));
1436 freeClient(c);
1437 return;
1438 }
1439 }
1440 if (totwritten > 0) c->lastinteraction = time(NULL);
1441 if (listLength(c->reply) == 0) {
1442 c->sentlen = 0;
1443 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1444 }
1445 }
1446
1447 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1448 {
1449 redisClient *c = privdata;
1450 int nwritten = 0, totwritten = 0, objlen, willwrite;
1451 robj *o;
1452 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1453 int offset, ion = 0;
1454 REDIS_NOTUSED(el);
1455 REDIS_NOTUSED(mask);
1456
1457 listNode *node;
1458 while (listLength(c->reply)) {
1459 offset = c->sentlen;
1460 ion = 0;
1461 willwrite = 0;
1462
1463 /* fill-in the iov[] array */
1464 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1465 o = listNodeValue(node);
1466 objlen = sdslen(o->ptr);
1467
1468 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1469 break;
1470
1471 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1472 break; /* no more iovecs */
1473
1474 iov[ion].iov_base = ((char*)o->ptr) + offset;
1475 iov[ion].iov_len = objlen - offset;
1476 willwrite += objlen - offset;
1477 offset = 0; /* just for the first item */
1478 ion++;
1479 }
1480
1481 if(willwrite == 0)
1482 break;
1483
1484 /* write all collected blocks at once */
1485 if((nwritten = writev(fd, iov, ion)) < 0) {
1486 if (errno != EAGAIN) {
1487 redisLog(REDIS_DEBUG,
1488 "Error writing to client: %s", strerror(errno));
1489 freeClient(c);
1490 return;
1491 }
1492 break;
1493 }
1494
1495 totwritten += nwritten;
1496 offset = c->sentlen;
1497
1498 /* remove written robjs from c->reply */
1499 while (nwritten && listLength(c->reply)) {
1500 o = listNodeValue(listFirst(c->reply));
1501 objlen = sdslen(o->ptr);
1502
1503 if(nwritten >= objlen - offset) {
1504 listDelNode(c->reply, listFirst(c->reply));
1505 nwritten -= objlen - offset;
1506 c->sentlen = 0;
1507 } else {
1508 /* partial write */
1509 c->sentlen += nwritten;
1510 break;
1511 }
1512 offset = 0;
1513 }
1514 }
1515
1516 if (totwritten > 0)
1517 c->lastinteraction = time(NULL);
1518
1519 if (listLength(c->reply) == 0) {
1520 c->sentlen = 0;
1521 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1522 }
1523 }
1524
1525 static struct redisCommand *lookupCommand(char *name) {
1526 int j = 0;
1527 while(cmdTable[j].name != NULL) {
1528 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1529 j++;
1530 }
1531 return NULL;
1532 }
1533
1534 /* resetClient prepare the client to process the next command */
1535 static void resetClient(redisClient *c) {
1536 freeClientArgv(c);
1537 c->bulklen = -1;
1538 c->multibulk = 0;
1539 }
1540
1541 /* If this function gets called we already read a whole
1542 * command, arguments are in the client argv/argc fields.
1543 * processCommand() executes the command or prepares the
1544 * server for a bulk read from the client.
1545 *
1546 * If 1 is returned the client is still alive and valid
1547 * and other operations can be performed by the caller. Otherwise
1548 * if 0 is returned the client was destroyed (i.e. after QUIT). */
1549 static int processCommand(redisClient *c) {
1550 struct redisCommand *cmd;
1551 long long dirty;
1552
1553 /* Free some memory if needed (maxmemory setting) */
1554 if (server.maxmemory) freeMemoryIfNeeded();
1555
1556 /* Handle the multi bulk command type. This is an alternative protocol
1557 * supported by Redis in order to receive commands that are composed of
1558 * multiple binary-safe "bulk" arguments. The latency of processing is
1559 * a bit higher but this allows things like multi-sets, so if this
1560 * protocol is used only for MSET and similar commands this is a big win. */
1561 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
/* "*<count>" line: remember how many bulk arguments will follow. A count
 * that is zero or negative just resets the client. */
1562 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1563 if (c->multibulk <= 0) {
1564 resetClient(c);
1565 return 1;
1566 } else {
1567 decrRefCount(c->argv[c->argc-1]);
1568 c->argc--;
1569 return 1;
1570 }
1571 } else if (c->multibulk) {
1572 if (c->bulklen == -1) {
/* No bulk read is pending, so this line must be the "$<len>" header of the
 * next multi bulk argument. */
1573 if (((char*)c->argv[0]->ptr)[0] != '$') {
1574 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1575 resetClient(c);
1576 return 1;
1577 } else {
1578 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
/* NOTE(review): argv[0] is released here while the argc-- below drops the
 * LAST slot; the two match only when argc == 1. Confirm that a "$<len>"
 * line can never reach this point with additional arguments. */
1579 decrRefCount(c->argv[0]);
1580 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1581 c->argc--;
1582 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1583 resetClient(c);
1584 return 1;
1585 }
1586 c->argc--;
1587 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1588 return 1;
1589 }
1590 } else {
/* A bulk argument was just read into argv[0]: move it at the end of the
 * multi bulk vector and count one argument less to wait for. */
1591 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1592 c->mbargv[c->mbargc] = c->argv[0];
1593 c->mbargc++;
1594 c->argc--;
1595 c->multibulk--;
1596 if (c->multibulk == 0) {
1597 robj **auxargv;
1598 int auxargc;
1599
1600 /* Here we need to swap the multi-bulk argc/argv with the
1601 * normal argc/argv of the client structure. */
1602 auxargv = c->argv;
1603 c->argv = c->mbargv;
1604 c->mbargv = auxargv;
1605
1606 auxargc = c->argc;
1607 c->argc = c->mbargc;
1608 c->mbargc = auxargc;
1609
1610 /* We need to set bulklen to something different than -1
1611 * in order for the code below to process the command without
1612 * to try to read the last argument of a bulk command as
1613 * a special argument. */
1614 c->bulklen = 0;
1615 /* continue below and process the command */
1616 } else {
1617 c->bulklen = -1;
1618 return 1;
1619 }
1620 }
1621 }
1622 /* -- end of multi bulk commands processing -- */
1623
1624 /* The QUIT command is handled as a special case. Normal command
1625 * procs are unable to close the client connection safely */
1626 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1627 freeClient(c);
1628 return 0;
1629 }
/* Look up the command and validate it: it must exist, the number of
 * arguments must match its arity (a negative arity is a minimum), and
 * commands flagged REDIS_CMD_DENYOOM are refused above 'maxmemory'. */
1630 cmd = lookupCommand(c->argv[0]->ptr);
1631 if (!cmd) {
1632 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1633 resetClient(c);
1634 return 1;
1635 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1636 (c->argc < -cmd->arity)) {
1637 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1638 resetClient(c);
1639 return 1;
1640 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1641 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1642 resetClient(c);
1643 return 1;
1644 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
/* Bulk command whose payload was not read yet: the last argument of the
 * command line is the payload length. */
1645 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1646
1647 decrRefCount(c->argv[c->argc-1]);
1648 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1649 c->argc--;
1650 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1651 resetClient(c);
1652 return 1;
1653 }
1654 c->argc--;
1655 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1656 /* It is possible that the bulk read is already in the
1657 * buffer. Check this condition and handle it accordingly.
1658 * This is just a fast path, alternative to call processInputBuffer().
1659 * It's a good idea since the code is small and this condition
1660 * happens most of the times. */
1661 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1662 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1663 c->argc++;
1664 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1665 } else {
1666 return 1;
1667 }
1668 }
1669 /* Let's try to share objects on the command arguments vector */
1670 if (server.shareobjects) {
1671 int j;
1672 for(j = 1; j < c->argc; j++)
1673 c->argv[j] = tryObjectSharing(c->argv[j]);
1674 }
1675 /* Let's try to encode the bulk object to save space. */
1676 if (cmd->flags & REDIS_CMD_BULK)
1677 tryObjectEncoding(c->argv[c->argc-1]);
1678
1679 /* Check if the user is authenticated */
1680 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1681 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1682 resetClient(c);
1683 return 1;
1684 }
1685
1686 /* Exec the command */
/* server.dirty is sampled before the call: if the command changed it, the
 * command is fed to the append only file and to the slaves. Monitors are
 * fed with every executed command. */
1687 dirty = server.dirty;
1688 cmd->proc(c);
1689 if (server.appendonly && server.dirty-dirty)
1690 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1691 if (server.dirty-dirty && listLength(server.slaves))
1692 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1693 if (listLength(server.monitors))
1694 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1695 server.stat_numcommands++;
1696
1697 /* Prepare the client for the next command */
1698 if (c->flags & REDIS_CLOSE) {
1699 freeClient(c);
1700 return 0;
1701 }
1702 resetClient(c);
1703 return 1;
1704 }
1705
1706 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1707 listNode *ln;
1708 int outc = 0, j;
1709 robj **outv;
1710 /* (args*2)+1 is enough room for args, spaces, newlines */
1711 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1712
1713 if (argc <= REDIS_STATIC_ARGS) {
1714 outv = static_outv;
1715 } else {
1716 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1717 }
1718
1719 for (j = 0; j < argc; j++) {
1720 if (j != 0) outv[outc++] = shared.space;
1721 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1722 robj *lenobj;
1723
1724 lenobj = createObject(REDIS_STRING,
1725 sdscatprintf(sdsempty(),"%d\r\n",
1726 stringObjectLen(argv[j])));
1727 lenobj->refcount = 0;
1728 outv[outc++] = lenobj;
1729 }
1730 outv[outc++] = argv[j];
1731 }
1732 outv[outc++] = shared.crlf;
1733
1734 /* Increment all the refcounts at start and decrement at end in order to
1735 * be sure to free objects if there is no slave in a replication state
1736 * able to be feed with commands */
1737 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1738 listRewind(slaves);
1739 while((ln = listYield(slaves))) {
1740 redisClient *slave = ln->value;
1741
1742 /* Don't feed slaves that are still waiting for BGSAVE to start */
1743 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1744
1745 /* Feed all the other slaves, MONITORs and so on */
1746 if (slave->slaveseldb != dictid) {
1747 robj *selectcmd;
1748
1749 switch(dictid) {
1750 case 0: selectcmd = shared.select0; break;
1751 case 1: selectcmd = shared.select1; break;
1752 case 2: selectcmd = shared.select2; break;
1753 case 3: selectcmd = shared.select3; break;
1754 case 4: selectcmd = shared.select4; break;
1755 case 5: selectcmd = shared.select5; break;
1756 case 6: selectcmd = shared.select6; break;
1757 case 7: selectcmd = shared.select7; break;
1758 case 8: selectcmd = shared.select8; break;
1759 case 9: selectcmd = shared.select9; break;
1760 default:
1761 selectcmd = createObject(REDIS_STRING,
1762 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1763 selectcmd->refcount = 0;
1764 break;
1765 }
1766 addReply(slave,selectcmd);
1767 slave->slaveseldb = dictid;
1768 }
1769 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1770 }
1771 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1772 if (outv != static_outv) zfree(outv);
1773 }
1774
1775 static void processInputBuffer(redisClient *c) {
1776 again:
1777 if (c->bulklen == -1) {
1778 /* Read the first line of the query */
1779 char *p = strchr(c->querybuf,'\n');
1780 size_t querylen;
1781
1782 if (p) {
1783 sds query, *argv;
1784 int argc, j;
1785
1786 query = c->querybuf;
1787 c->querybuf = sdsempty();
1788 querylen = 1+(p-(query));
1789 if (sdslen(query) > querylen) {
1790 /* leave data after the first line of the query in the buffer */
1791 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1792 }
1793 *p = '\0'; /* remove "\n" */
1794 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1795 sdsupdatelen(query);
1796
1797 /* Now we can split the query in arguments */
1798 if (sdslen(query) == 0) {
1799 /* Ignore empty query */
1800 sdsfree(query);
1801 return;
1802 }
1803 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1804 sdsfree(query);
1805
1806 if (c->argv) zfree(c->argv);
1807 c->argv = zmalloc(sizeof(robj*)*argc);
1808
1809 for (j = 0; j < argc; j++) {
1810 if (sdslen(argv[j])) {
1811 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1812 c->argc++;
1813 } else {
1814 sdsfree(argv[j]);
1815 }
1816 }
1817 zfree(argv);
1818 /* Execute the command. If the client is still valid
1819 * after processCommand() return and there is something
1820 * on the query buffer try to process the next command. */
1821 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1822 return;
1823 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1824 redisLog(REDIS_DEBUG, "Client protocol error");
1825 freeClient(c);
1826 return;
1827 }
1828 } else {
1829 /* Bulk read handling. Note that if we are at this point
1830 the client already sent a command terminated with a newline,
1831 we are reading the bulk data that is actually the last
1832 argument of the command. */
1833 int qbl = sdslen(c->querybuf);
1834
1835 if (c->bulklen <= qbl) {
1836 /* Copy everything but the final CRLF as final argument */
1837 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1838 c->argc++;
1839 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1840 /* Process the command. If the client is still valid after
1841 * the processing and there is more data in the buffer
1842 * try to parse it. */
1843 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1844 return;
1845 }
1846 }
1847 }
1848
1849 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1850 redisClient *c = (redisClient*) privdata;
1851 char buf[REDIS_IOBUF_LEN];
1852 int nread;
1853 REDIS_NOTUSED(el);
1854 REDIS_NOTUSED(mask);
1855
1856 nread = read(fd, buf, REDIS_IOBUF_LEN);
1857 if (nread == -1) {
1858 if (errno == EAGAIN) {
1859 nread = 0;
1860 } else {
1861 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1862 freeClient(c);
1863 return;
1864 }
1865 } else if (nread == 0) {
1866 redisLog(REDIS_DEBUG, "Client closed connection");
1867 freeClient(c);
1868 return;
1869 }
1870 if (nread) {
1871 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1872 c->lastinteraction = time(NULL);
1873 } else {
1874 return;
1875 }
1876 processInputBuffer(c);
1877 }
1878
1879 static int selectDb(redisClient *c, int id) {
1880 if (id < 0 || id >= server.dbnum)
1881 return REDIS_ERR;
1882 c->db = &server.db[id];
1883 return REDIS_OK;
1884 }
1885
1886 static void *dupClientReplyValue(void *o) {
1887 incrRefCount((robj*)o);
1888 return 0;
1889 }
1890
1891 static redisClient *createClient(int fd) {
1892 redisClient *c = zmalloc(sizeof(*c));
1893
1894 anetNonBlock(NULL,fd);
1895 anetTcpNoDelay(NULL,fd);
1896 if (!c) return NULL;
1897 selectDb(c,0);
1898 c->fd = fd;
1899 c->querybuf = sdsempty();
1900 c->argc = 0;
1901 c->argv = NULL;
1902 c->bulklen = -1;
1903 c->multibulk = 0;
1904 c->mbargc = 0;
1905 c->mbargv = NULL;
1906 c->sentlen = 0;
1907 c->flags = 0;
1908 c->lastinteraction = time(NULL);
1909 c->authenticated = 0;
1910 c->replstate = REDIS_REPL_NONE;
1911 c->reply = listCreate();
1912 listSetFreeMethod(c->reply,decrRefCount);
1913 listSetDupMethod(c->reply,dupClientReplyValue);
1914 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1915 readQueryFromClient, c, NULL) == AE_ERR) {
1916 freeClient(c);
1917 return NULL;
1918 }
1919 listAddNodeTail(server.clients,c);
1920 return c;
1921 }
1922
1923 static void addReply(redisClient *c, robj *obj) {
1924 if (listLength(c->reply) == 0 &&
1925 (c->replstate == REDIS_REPL_NONE ||
1926 c->replstate == REDIS_REPL_ONLINE) &&
1927 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1928 sendReplyToClient, c, NULL) == AE_ERR) return;
1929 if (obj->encoding != REDIS_ENCODING_RAW) {
1930 obj = getDecodedObject(obj);
1931 } else {
1932 incrRefCount(obj);
1933 }
1934 listAddNodeTail(c->reply,obj);
1935 }
1936
1937 static void addReplySds(redisClient *c, sds s) {
1938 robj *o = createObject(REDIS_STRING,s);
1939 addReply(c,o);
1940 decrRefCount(o);
1941 }
1942
1943 static void addReplyBulkLen(redisClient *c, robj *obj) {
1944 size_t len;
1945
1946 if (obj->encoding == REDIS_ENCODING_RAW) {
1947 len = sdslen(obj->ptr);
1948 } else {
1949 long n = (long)obj->ptr;
1950
1951 len = 1;
1952 if (n < 0) {
1953 len++;
1954 n = -n;
1955 }
1956 while((n = n/10) != 0) {
1957 len++;
1958 }
1959 }
1960 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1961 }
1962
1963 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1964 int cport, cfd;
1965 char cip[128];
1966 redisClient *c;
1967 REDIS_NOTUSED(el);
1968 REDIS_NOTUSED(mask);
1969 REDIS_NOTUSED(privdata);
1970
1971 cfd = anetAccept(server.neterr, fd, cip, &cport);
1972 if (cfd == AE_ERR) {
1973 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1974 return;
1975 }
1976 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1977 if ((c = createClient(cfd)) == NULL) {
1978 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1979 close(cfd); /* May be already closed, just ingore errors */
1980 return;
1981 }
1982 /* If maxclient directive is set and this is one client more... close the
1983 * connection. Note that we create the client instead to check before
1984 * for this condition, since now the socket is already set in nonblocking
1985 * mode and we can send an error for free using the Kernel I/O */
1986 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1987 char *err = "-ERR max number of clients reached\r\n";
1988
1989 /* That's a best effort error message, don't check write errors */
1990 if (write(c->fd,err,strlen(err)) == -1) {
1991 /* Nothing to do, Just to avoid the warning... */
1992 }
1993 freeClient(c);
1994 return;
1995 }
1996 server.stat_numconnections++;
1997 }
1998
1999 /* ======================= Redis objects implementation ===================== */
2000
2001 static robj *createObject(int type, void *ptr) {
2002 robj *o;
2003
2004 if (listLength(server.objfreelist)) {
2005 listNode *head = listFirst(server.objfreelist);
2006 o = listNodeValue(head);
2007 listDelNode(server.objfreelist,head);
2008 } else {
2009 o = zmalloc(sizeof(*o));
2010 }
2011 o->type = type;
2012 o->encoding = REDIS_ENCODING_RAW;
2013 o->ptr = ptr;
2014 o->refcount = 1;
2015 return o;
2016 }
2017
2018 static robj *createStringObject(char *ptr, size_t len) {
2019 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2020 }
2021
2022 static robj *createListObject(void) {
2023 list *l = listCreate();
2024
2025 listSetFreeMethod(l,decrRefCount);
2026 return createObject(REDIS_LIST,l);
2027 }
2028
2029 static robj *createSetObject(void) {
2030 dict *d = dictCreate(&setDictType,NULL);
2031 return createObject(REDIS_SET,d);
2032 }
2033
2034 static robj *createZsetObject(void) {
2035 zset *zs = zmalloc(sizeof(*zs));
2036
2037 zs->dict = dictCreate(&zsetDictType,NULL);
2038 zs->zsl = zslCreate();
2039 return createObject(REDIS_ZSET,zs);
2040 }
2041
2042 static void freeStringObject(robj *o) {
2043 if (o->encoding == REDIS_ENCODING_RAW) {
2044 sdsfree(o->ptr);
2045 }
2046 }
2047
2048 static void freeListObject(robj *o) {
2049 listRelease((list*) o->ptr);
2050 }
2051
2052 static void freeSetObject(robj *o) {
2053 dictRelease((dict*) o->ptr);
2054 }
2055
2056 static void freeZsetObject(robj *o) {
2057 zset *zs = o->ptr;
2058
2059 dictRelease(zs->dict);
2060 zslFree(zs->zsl);
2061 zfree(zs);
2062 }
2063
2064 static void freeHashObject(robj *o) {
2065 dictRelease((dict*) o->ptr);
2066 }
2067
2068 static void incrRefCount(robj *o) {
2069 o->refcount++;
2070 #ifdef DEBUG_REFCOUNT
2071 if (o->type == REDIS_STRING)
2072 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2073 #endif
2074 }
2075
2076 static void decrRefCount(void *obj) {
2077 robj *o = obj;
2078
2079 #ifdef DEBUG_REFCOUNT
2080 if (o->type == REDIS_STRING)
2081 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2082 #endif
2083 if (--(o->refcount) == 0) {
2084 switch(o->type) {
2085 case REDIS_STRING: freeStringObject(o); break;
2086 case REDIS_LIST: freeListObject(o); break;
2087 case REDIS_SET: freeSetObject(o); break;
2088 case REDIS_ZSET: freeZsetObject(o); break;
2089 case REDIS_HASH: freeHashObject(o); break;
2090 default: assert(0 != 0); break;
2091 }
2092 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2093 !listAddNodeHead(server.objfreelist,o))
2094 zfree(o);
2095 }
2096 }
2097
2098 static robj *lookupKey(redisDb *db, robj *key) {
2099 dictEntry *de = dictFind(db->dict,key);
2100 return de ? dictGetEntryVal(de) : NULL;
2101 }
2102
2103 static robj *lookupKeyRead(redisDb *db, robj *key) {
2104 expireIfNeeded(db,key);
2105 return lookupKey(db,key);
2106 }
2107
2108 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2109 deleteIfVolatile(db,key);
2110 return lookupKey(db,key);
2111 }
2112
2113 static int deleteKey(redisDb *db, robj *key) {
2114 int retval;
2115
2116 /* We need to protect key from destruction: after the first dictDelete()
2117 * it may happen that 'key' is no longer valid if we don't increment
2118 * it's count. This may happen when we get the object reference directly
2119 * from the hash table with dictRandomKey() or dict iterators */
2120 incrRefCount(key);
2121 if (dictSize(db->expires)) dictDelete(db->expires,key);
2122 retval = dictDelete(db->dict,key);
2123 decrRefCount(key);
2124
2125 return retval == DICT_OK;
2126 }
2127
2128 /* Try to share an object against the shared objects pool */
2129 static robj *tryObjectSharing(robj *o) {
2130 struct dictEntry *de;
2131 unsigned long c;
2132
2133 if (o == NULL || server.shareobjects == 0) return o;
2134
2135 assert(o->type == REDIS_STRING);
2136 de = dictFind(server.sharingpool,o);
2137 if (de) {
2138 robj *shared = dictGetEntryKey(de);
2139
2140 c = ((unsigned long) dictGetEntryVal(de))+1;
2141 dictGetEntryVal(de) = (void*) c;
2142 incrRefCount(shared);
2143 decrRefCount(o);
2144 return shared;
2145 } else {
2146 /* Here we are using a stream algorihtm: Every time an object is
2147 * shared we increment its count, everytime there is a miss we
2148 * recrement the counter of a random object. If this object reaches
2149 * zero we remove the object and put the current object instead. */
2150 if (dictSize(server.sharingpool) >=
2151 server.sharingpoolsize) {
2152 de = dictGetRandomKey(server.sharingpool);
2153 assert(de != NULL);
2154 c = ((unsigned long) dictGetEntryVal(de))-1;
2155 dictGetEntryVal(de) = (void*) c;
2156 if (c == 0) {
2157 dictDelete(server.sharingpool,de->key);
2158 }
2159 } else {
2160 c = 0; /* If the pool is empty we want to add this object */
2161 }
2162 if (c == 0) {
2163 int retval;
2164
2165 retval = dictAdd(server.sharingpool,o,(void*)1);
2166 assert(retval == DICT_OK);
2167 incrRefCount(o);
2168 }
2169 return o;
2170 }
2171 }
2172
2173 /* Check if the nul-terminated string 's' can be represented by a long
2174 * (that is, is a number that fits into long without any other space or
2175 * character before or after the digits).
2176 *
2177 * If so, the function returns REDIS_OK and *longval is set to the value
2178 * of the number. Otherwise REDIS_ERR is returned */
2179 static int isStringRepresentableAsLong(sds s, long *longval) {
2180 char buf[32], *endptr;
2181 long value;
2182 int slen;
2183
2184 value = strtol(s, &endptr, 10);
2185 if (endptr[0] != '\0') return REDIS_ERR;
2186 slen = snprintf(buf,32,"%ld",value);
2187
2188 /* If the number converted back into a string is not identical
2189 * then it's not possible to encode the string as integer */
2190 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2191 if (longval) *longval = value;
2192 return REDIS_OK;
2193 }
2194
2195 /* Try to encode a string object in order to save space */
2196 static int tryObjectEncoding(robj *o) {
2197 long value;
2198 sds s = o->ptr;
2199
2200 if (o->encoding != REDIS_ENCODING_RAW)
2201 return REDIS_ERR; /* Already encoded */
2202
2203 /* It's not save to encode shared objects: shared objects can be shared
2204 * everywhere in the "object space" of Redis. Encoded objects can only
2205 * appear as "values" (and not, for instance, as keys) */
2206 if (o->refcount > 1) return REDIS_ERR;
2207
2208 /* Currently we try to encode only strings */
2209 assert(o->type == REDIS_STRING);
2210
2211 /* Check if we can represent this string as a long integer */
2212 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2213
2214 /* Ok, this object can be encoded */
2215 o->encoding = REDIS_ENCODING_INT;
2216 sdsfree(o->ptr);
2217 o->ptr = (void*) value;
2218 return REDIS_OK;
2219 }
2220
2221 /* Get a decoded version of an encoded object (returned as a new object) */
2222 static robj *getDecodedObject(const robj *o) {
2223 robj *dec;
2224
2225 assert(o->encoding != REDIS_ENCODING_RAW);
2226 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2227 char buf[32];
2228
2229 snprintf(buf,32,"%ld",(long)o->ptr);
2230 dec = createStringObject(buf,strlen(buf));
2231 return dec;
2232 } else {
2233 assert(1 != 1);
2234 }
2235 }
2236
2237 /* Compare two string objects via strcmp() or alike.
2238 * Note that the objects may be integer-encoded. In such a case we
2239 * use snprintf() to get a string representation of the numbers on the stack
2240 * and compare the strings, it's much faster than calling getDecodedObject(). */
2241 static int compareStringObjects(robj *a, robj *b) {
2242 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2243 char bufa[128], bufb[128], *astr, *bstr;
2244 int bothsds = 1;
2245
2246 if (a == b) return 0;
2247 if (a->encoding != REDIS_ENCODING_RAW) {
2248 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2249 astr = bufa;
2250 bothsds = 0;
2251 } else {
2252 astr = a->ptr;
2253 }
2254 if (b->encoding != REDIS_ENCODING_RAW) {
2255 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2256 bstr = bufb;
2257 bothsds = 0;
2258 } else {
2259 bstr = b->ptr;
2260 }
2261 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2262 }
2263
2264 static size_t stringObjectLen(robj *o) {
2265 assert(o->type == REDIS_STRING);
2266 if (o->encoding == REDIS_ENCODING_RAW) {
2267 return sdslen(o->ptr);
2268 } else {
2269 char buf[32];
2270
2271 return snprintf(buf,32,"%ld",(long)o->ptr);
2272 }
2273 }
2274
2275 /*============================ DB saving/loading ============================ */
2276
/* Write the one byte 'type' code to 'fp'.
 * Returns 0 on success, -1 if the byte could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
2281
/* Write the time 't' to 'fp' as a 32 bit signed integer in the byte order
 * of the host (the value is truncated to 32 bits).
 * Returns 0 on success, -1 if the value could not be written. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    return (fwrite(&stamp,4,1,fp) == 0) ? -1 : 0;
}
2287
2288 /* check rdbLoadLen() comments for more info */
2289 static int rdbSaveLen(FILE *fp, uint32_t len) {
2290 unsigned char buf[2];
2291
2292 if (len < (1<<6)) {
2293 /* Save a 6 bit len */
2294 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2295 if (fwrite(buf,1,1,fp) == 0) return -1;
2296 } else if (len < (1<<14)) {
2297 /* Save a 14 bit len */
2298 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2299 buf[1] = len&0xFF;
2300 if (fwrite(buf,2,1,fp) == 0) return -1;
2301 } else {
2302 /* Save a 32 bit len */
2303 buf[0] = (REDIS_RDB_32BITLEN<<6);
2304 if (fwrite(buf,1,1,fp) == 0) return -1;
2305 len = htonl(len);
2306 if (fwrite(&len,4,1,fp) == 0) return -1;
2307 }
2308 return 0;
2309 }
2310
2311 /* String objects in the form "2391" "-100" without any space and with a
2312 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2313 * encoded as integers to save space */
2314 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2315 long long value;
2316 char *endptr, buf[32];
2317
2318 /* Check if it's possible to encode this value as a number */
2319 value = strtoll(s, &endptr, 10);
2320 if (endptr[0] != '\0') return 0;
2321 snprintf(buf,32,"%lld",value);
2322
2323 /* If the number converted back into a string is not identical
2324 * then it's not possible to encode the string as integer */
2325 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2326
2327 /* Finally check if it fits in our ranges */
2328 if (value >= -(1<<7) && value <= (1<<7)-1) {
2329 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2330 enc[1] = value&0xFF;
2331 return 2;
2332 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2333 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2334 enc[1] = value&0xFF;
2335 enc[2] = (value>>8)&0xFF;
2336 return 3;
2337 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2338 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2339 enc[1] = value&0xFF;
2340 enc[2] = (value>>8)&0xFF;
2341 enc[3] = (value>>16)&0xFF;
2342 enc[4] = (value>>24)&0xFF;
2343 return 5;
2344 } else {
2345 return 0;
2346 }
2347 }
2348
2349 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2350 unsigned int comprlen, outlen;
2351 unsigned char byte;
2352 void *out;
2353
2354 /* We require at least four bytes compression for this to be worth it */
2355 outlen = sdslen(obj->ptr)-4;
2356 if (outlen <= 0) return 0;
2357 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2358 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2359 if (comprlen == 0) {
2360 zfree(out);
2361 return 0;
2362 }
2363 /* Data compressed! Let's save it on disk */
2364 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2365 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2366 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2367 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2368 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2369 zfree(out);
2370 return comprlen;
2371
2372 writeerr:
2373 zfree(out);
2374 return -1;
2375 }
2376
2377 /* Save a string objet as [len][data] on disk. If the object is a string
2378 * representation of an integer value we try to safe it in a special form */
2379 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2380 size_t len;
2381 int enclen;
2382
2383 len = sdslen(obj->ptr);
2384
2385 /* Try integer encoding */
2386 if (len <= 11) {
2387 unsigned char buf[5];
2388 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2389 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2390 return 0;
2391 }
2392 }
2393
2394 /* Try LZF compression - under 20 bytes it's unable to compress even
2395 * aaaaaaaaaaaaaaaaaa so skip it */
2396 if (len > 20) {
2397 int retval;
2398
2399 retval = rdbSaveLzfStringObject(fp,obj);
2400 if (retval == -1) return -1;
2401 if (retval > 0) return 0;
2402 /* retval == 0 means data can't be compressed, save the old way */
2403 }
2404
2405 /* Store verbatim */
2406 if (rdbSaveLen(fp,len) == -1) return -1;
2407 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2408 return 0;
2409 }
2410
2411 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2412 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2413 int retval;
2414 robj *dec;
2415
2416 if (obj->encoding != REDIS_ENCODING_RAW) {
2417 dec = getDecodedObject(obj);
2418 retval = rdbSaveStringObjectRaw(fp,dec);
2419 decrRefCount(dec);
2420 return retval;
2421 } else {
2422 return rdbSaveStringObjectRaw(fp,obj);
2423 }
2424 }
2425
/* Save a double value. Doubles are saved as strings prefixed by an unsigned
 * 8 bit integer specifying the length of the representation.
 * This 8 bit integer has special values in order to specify the following
 * conditions:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 *
 * Finite values are printed with "%.17g". Returns 0 on success, -1 on
 * write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len;

    if (isnan(val)) {
        buf[0] = 253;
        len = 1;
    } else if (!isfinite(val)) {
        buf[0] = (val < 0) ? 255 : 254;
        len = 1;
    } else {
        snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
        /* Measure the representation only: it starts at buf+1, and buf[0]
         * (the length prefix being computed) is not initialized yet. */
        buf[0] = strlen((char*)buf+1);
        len = buf[0]+1;
    }
    if (fwrite(buf,len,1,fp) == 0) return -1;
    return 0;
}
2452
2453 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2454 static int rdbSave(char *filename) {
2455 dictIterator *di = NULL;
2456 dictEntry *de;
2457 FILE *fp;
2458 char tmpfile[256];
2459 int j;
2460 time_t now = time(NULL);
2461
2462 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2463 fp = fopen(tmpfile,"w");
2464 if (!fp) {
2465 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2466 return REDIS_ERR;
2467 }
2468 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2469 for (j = 0; j < server.dbnum; j++) {
2470 redisDb *db = server.db+j;
2471 dict *d = db->dict;
2472 if (dictSize(d) == 0) continue;
2473 di = dictGetIterator(d);
2474 if (!di) {
2475 fclose(fp);
2476 return REDIS_ERR;
2477 }
2478
2479 /* Write the SELECT DB opcode */
2480 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2481 if (rdbSaveLen(fp,j) == -1) goto werr;
2482
2483 /* Iterate this DB writing every entry */
2484 while((de = dictNext(di)) != NULL) {
2485 robj *key = dictGetEntryKey(de);
2486 robj *o = dictGetEntryVal(de);
2487 time_t expiretime = getExpire(db,key);
2488
2489 /* Save the expire time */
2490 if (expiretime != -1) {
2491 /* If this key is already expired skip it */
2492 if (expiretime < now) continue;
2493 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2494 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2495 }
2496 /* Save the key and associated value */
2497 if (rdbSaveType(fp,o->type) == -1) goto werr;
2498 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2499 if (o->type == REDIS_STRING) {
2500 /* Save a string value */
2501 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2502 } else if (o->type == REDIS_LIST) {
2503 /* Save a list value */
2504 list *list = o->ptr;
2505 listNode *ln;
2506
2507 listRewind(list);
2508 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2509 while((ln = listYield(list))) {
2510 robj *eleobj = listNodeValue(ln);
2511
2512 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2513 }
2514 } else if (o->type == REDIS_SET) {
2515 /* Save a set value */
2516 dict *set = o->ptr;
2517 dictIterator *di = dictGetIterator(set);
2518 dictEntry *de;
2519
2520 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2521 while((de = dictNext(di)) != NULL) {
2522 robj *eleobj = dictGetEntryKey(de);
2523
2524 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2525 }
2526 dictReleaseIterator(di);
2527 } else if (o->type == REDIS_ZSET) {
2528 /* Save a set value */
2529 zset *zs = o->ptr;
2530 dictIterator *di = dictGetIterator(zs->dict);
2531 dictEntry *de;
2532
2533 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2534 while((de = dictNext(di)) != NULL) {
2535 robj *eleobj = dictGetEntryKey(de);
2536 double *score = dictGetEntryVal(de);
2537
2538 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2539 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2540 }
2541 dictReleaseIterator(di);
2542 } else {
2543 assert(0 != 0);
2544 }
2545 }
2546 dictReleaseIterator(di);
2547 }
2548 /* EOF opcode */
2549 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2550
2551 /* Make sure data will not remain on the OS's output buffers */
2552 fflush(fp);
2553 fsync(fileno(fp));
2554 fclose(fp);
2555
2556 /* Use RENAME to make sure the DB file is changed atomically only
2557 * if the generate DB file is ok. */
2558 if (rename(tmpfile,filename) == -1) {
2559 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2560 unlink(tmpfile);
2561 return REDIS_ERR;
2562 }
2563 redisLog(REDIS_NOTICE,"DB saved on disk");
2564 server.dirty = 0;
2565 server.lastsave = time(NULL);
2566 return REDIS_OK;
2567
2568 werr:
2569 fclose(fp);
2570 unlink(tmpfile);
2571 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2572 if (di) dictReleaseIterator(di);
2573 return REDIS_ERR;
2574 }
2575
2576 static int rdbSaveBackground(char *filename) {
2577 pid_t childpid;
2578
2579 if (server.bgsaveinprogress) return REDIS_ERR;
2580 if ((childpid = fork()) == 0) {
2581 /* Child */
2582 close(server.fd);
2583 if (rdbSave(filename) == REDIS_OK) {
2584 exit(0);
2585 } else {
2586 exit(1);
2587 }
2588 } else {
2589 /* Parent */
2590 if (childpid == -1) {
2591 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2592 strerror(errno));
2593 return REDIS_ERR;
2594 }
2595 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2596 server.bgsaveinprogress = 1;
2597 server.bgsavechildpid = childpid;
2598 return REDIS_OK;
2599 }
2600 return REDIS_OK; /* unreached */
2601 }
2602
/* Remove "temp-<childpid>.rdb", the temporary dump file name that
 * rdbSave() uses in the process with pid 'childpid'. Errors from unlink()
 * are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb", (int) childpid);
    unlink(path);
}
2609
/* Read a one byte type code from 'fp'.
 * Returns the code (0-255), or -1 if no byte could be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char code;

    return (fread(&code,1,1,fp) == 0) ? -1 : code;
}
2615
/* Read a time value stored by rdbSaveTime(): a 32 bit signed integer in
 * the byte order of the host.
 * Returns the time, or -1 if the four bytes could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;
    size_t items = fread(&stamp,4,1,fp);

    if (items == 0) return -1;
    return (time_t) stamp;
}
2621
2622 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2623 * of this file for a description of how this are stored on disk.
2624 *
2625 * isencoded is set to 1 if the readed length is not actually a length but
2626 * an "encoding type", check the above comments for more info */
2627 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2628 unsigned char buf[2];
2629 uint32_t len;
2630
2631 if (isencoded) *isencoded = 0;
2632 if (rdbver == 0) {
2633 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2634 return ntohl(len);
2635 } else {
2636 int type;
2637
2638 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2639 type = (buf[0]&0xC0)>>6;
2640 if (type == REDIS_RDB_6BITLEN) {
2641 /* Read a 6 bit len */
2642 return buf[0]&0x3F;
2643 } else if (type == REDIS_RDB_ENCVAL) {
2644 /* Read a 6 bit len encoding type */
2645 if (isencoded) *isencoded = 1;
2646 return buf[0]&0x3F;
2647 } else if (type == REDIS_RDB_14BITLEN) {
2648 /* Read a 14 bit len */
2649 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2650 return ((buf[0]&0x3F)<<8)|buf[1];
2651 } else {
2652 /* Read a 32 bit len */
2653 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2654 return ntohl(len);
2655 }
2656 }
2657 }
2658
2659 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2660 unsigned char enc[4];
2661 long long val;
2662
2663 if (enctype == REDIS_RDB_ENC_INT8) {
2664 if (fread(enc,1,1,fp) == 0) return NULL;
2665 val = (signed char)enc[0];
2666 } else if (enctype == REDIS_RDB_ENC_INT16) {
2667 uint16_t v;
2668 if (fread(enc,2,1,fp) == 0) return NULL;
2669 v = enc[0]|(enc[1]<<8);
2670 val = (int16_t)v;
2671 } else if (enctype == REDIS_RDB_ENC_INT32) {
2672 uint32_t v;
2673 if (fread(enc,4,1,fp) == 0) return NULL;
2674 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2675 val = (int32_t)v;
2676 } else {
2677 val = 0; /* anti-warning */
2678 assert(0!=0);
2679 }
2680 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2681 }
2682
2683 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2684 unsigned int len, clen;
2685 unsigned char *c = NULL;
2686 sds val = NULL;
2687
2688 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2689 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2690 if ((c = zmalloc(clen)) == NULL) goto err;
2691 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2692 if (fread(c,clen,1,fp) == 0) goto err;
2693 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2694 zfree(c);
2695 return createObject(REDIS_STRING,val);
2696 err:
2697 zfree(c);
2698 sdsfree(val);
2699 return NULL;
2700 }
2701
2702 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2703 int isencoded;
2704 uint32_t len;
2705 sds val;
2706
2707 len = rdbLoadLen(fp,rdbver,&isencoded);
2708 if (isencoded) {
2709 switch(len) {
2710 case REDIS_RDB_ENC_INT8:
2711 case REDIS_RDB_ENC_INT16:
2712 case REDIS_RDB_ENC_INT32:
2713 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2714 case REDIS_RDB_ENC_LZF:
2715 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2716 default:
2717 assert(0!=0);
2718 }
2719 }
2720
2721 if (len == REDIS_RDB_LENERR) return NULL;
2722 val = sdsnewlen(NULL,len);
2723 if (len && fread(val,len,1,fp) == 0) {
2724 sdsfree(val);
2725 return NULL;
2726 }
2727 return tryObjectSharing(createObject(REDIS_STRING,val));
2728 }
2729
2730 /* For information about double serialization check rdbSaveDoubleValue() */
2731 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2732 char buf[128];
2733 unsigned char len;
2734
2735 if (fread(&len,1,1,fp) == 0) return -1;
2736 switch(len) {
2737 case 255: *val = R_NegInf; return 0;
2738 case 254: *val = R_PosInf; return 0;
2739 case 253: *val = R_Nan; return 0;
2740 default:
2741 if (fread(buf,len,1,fp) == 0) return -1;
2742 sscanf(buf, "%lg", val);
2743 return 0;
2744 }
2745 }
2746
2747 static int rdbLoad(char *filename) {
2748 FILE *fp;
2749 robj *keyobj = NULL;
2750 uint32_t dbid;
2751 int type, retval, rdbver;
2752 dict *d = server.db[0].dict;
2753 redisDb *db = server.db+0;
2754 char buf[1024];
2755 time_t expiretime = -1, now = time(NULL);
2756
2757 fp = fopen(filename,"r");
2758 if (!fp) return REDIS_ERR;
2759 if (fread(buf,9,1,fp) == 0) goto eoferr;
2760 buf[9] = '\0';
2761 if (memcmp(buf,"REDIS",5) != 0) {
2762 fclose(fp);
2763 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2764 return REDIS_ERR;
2765 }
2766 rdbver = atoi(buf+5);
2767 if (rdbver > 1) {
2768 fclose(fp);
2769 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2770 return REDIS_ERR;
2771 }
2772 while(1) {
2773 robj *o;
2774
2775 /* Read type. */
2776 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2777 if (type == REDIS_EXPIRETIME) {
2778 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2779 /* We read the time so we need to read the object type again */
2780 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2781 }
2782 if (type == REDIS_EOF) break;
2783 /* Handle SELECT DB opcode as a special case */
2784 if (type == REDIS_SELECTDB) {
2785 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2786 goto eoferr;
2787 if (dbid >= (unsigned)server.dbnum) {
2788 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2789 exit(1);
2790 }
2791 db = server.db+dbid;
2792 d = db->dict;
2793 continue;
2794 }
2795 /* Read key */
2796 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2797
2798 if (type == REDIS_STRING) {
2799 /* Read string value */
2800 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2801 tryObjectEncoding(o);
2802 } else if (type == REDIS_LIST || type == REDIS_SET) {
2803 /* Read list/set value */
2804 uint32_t listlen;
2805
2806 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2807 goto eoferr;
2808 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2809 /* Load every single element of the list/set */
2810 while(listlen--) {
2811 robj *ele;
2812
2813 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2814 tryObjectEncoding(ele);
2815 if (type == REDIS_LIST) {
2816 listAddNodeTail((list*)o->ptr,ele);
2817 } else {
2818 dictAdd((dict*)o->ptr,ele,NULL);
2819 }
2820 }
2821 } else if (type == REDIS_ZSET) {
2822 /* Read list/set value */
2823 uint32_t zsetlen;
2824 zset *zs;
2825
2826 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2827 goto eoferr;
2828 o = createZsetObject();
2829 zs = o->ptr;
2830 /* Load every single element of the list/set */
2831 while(zsetlen--) {
2832 robj *ele;
2833 double *score = zmalloc(sizeof(double));
2834
2835 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2836 tryObjectEncoding(ele);
2837 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2838 dictAdd(zs->dict,ele,score);
2839 zslInsert(zs->zsl,*score,ele);
2840 incrRefCount(ele); /* added to skiplist */
2841 }
2842 } else {
2843 assert(0 != 0);
2844 }
2845 /* Add the new object in the hash table */
2846 retval = dictAdd(d,keyobj,o);
2847 if (retval == DICT_ERR) {
2848 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2849 exit(1);
2850 }
2851 /* Set the expire time if needed */
2852 if (expiretime != -1) {
2853 setExpire(db,keyobj,expiretime);
2854 /* Delete this key if already expired */
2855 if (expiretime < now) deleteKey(db,keyobj);
2856 expiretime = -1;
2857 }
2858 keyobj = o = NULL;
2859 }
2860 fclose(fp);
2861 return REDIS_OK;
2862
2863 eoferr: /* unexpected end of file is handled here with a fatal exit */
2864 if (keyobj) decrRefCount(keyobj);
2865 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2866 exit(1);
2867 return REDIS_ERR; /* Just to avoid warning */
2868 }
2869
2870 /*================================== Commands =============================== */
2871
2872 static void authCommand(redisClient *c) {
2873 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2874 c->authenticated = 1;
2875 addReply(c,shared.ok);
2876 } else {
2877 c->authenticated = 0;
2878 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2879 }
2880 }
2881
2882 static void pingCommand(redisClient *c) {
2883 addReply(c,shared.pong);
2884 }
2885
2886 static void echoCommand(redisClient *c) {
2887 addReplyBulkLen(c,c->argv[1]);
2888 addReply(c,c->argv[1]);
2889 addReply(c,shared.crlf);
2890 }
2891
2892 /*=================================== Strings =============================== */
2893
2894 static void setGenericCommand(redisClient *c, int nx) {
2895 int retval;
2896
2897 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2898 if (retval == DICT_ERR) {
2899 if (!nx) {
2900 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2901 incrRefCount(c->argv[2]);
2902 } else {
2903 addReply(c,shared.czero);
2904 return;
2905 }
2906 } else {
2907 incrRefCount(c->argv[1]);
2908 incrRefCount(c->argv[2]);
2909 }
2910 server.dirty++;
2911 removeExpire(c->db,c->argv[1]);
2912 addReply(c, nx ? shared.cone : shared.ok);
2913 }
2914
2915 static void setCommand(redisClient *c) {
2916 setGenericCommand(c,0);
2917 }
2918
2919 static void setnxCommand(redisClient *c) {
2920 setGenericCommand(c,1);
2921 }
2922
2923 static void getCommand(redisClient *c) {
2924 robj *o = lookupKeyRead(c->db,c->argv[1]);
2925
2926 if (o == NULL) {
2927 addReply(c,shared.nullbulk);
2928 } else {
2929 if (o->type != REDIS_STRING) {
2930 addReply(c,shared.wrongtypeerr);
2931 } else {
2932 addReplyBulkLen(c,o);
2933 addReply(c,o);
2934 addReply(c,shared.crlf);
2935 }
2936 }
2937 }
2938
2939 static void getsetCommand(redisClient *c) {
2940 getCommand(c);
2941 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2942 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2943 } else {
2944 incrRefCount(c->argv[1]);
2945 }
2946 incrRefCount(c->argv[2]);
2947 server.dirty++;
2948 removeExpire(c->db,c->argv[1]);
2949 }
2950
2951 static void mgetCommand(redisClient *c) {
2952 int j;
2953
2954 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2955 for (j = 1; j < c->argc; j++) {
2956 robj *o = lookupKeyRead(c->db,c->argv[j]);
2957 if (o == NULL) {
2958 addReply(c,shared.nullbulk);
2959 } else {
2960 if (o->type != REDIS_STRING) {
2961 addReply(c,shared.nullbulk);
2962 } else {
2963 addReplyBulkLen(c,o);
2964 addReply(c,o);
2965 addReply(c,shared.crlf);
2966 }
2967 }
2968 }
2969 }
2970
2971 static void incrDecrCommand(redisClient *c, long long incr) {
2972 long long value;
2973 int retval;
2974 robj *o;
2975
2976 o = lookupKeyWrite(c->db,c->argv[1]);
2977 if (o == NULL) {
2978 value = 0;
2979 } else {
2980 if (o->type != REDIS_STRING) {
2981 value = 0;
2982 } else {
2983 char *eptr;
2984
2985 if (o->encoding == REDIS_ENCODING_RAW)
2986 value = strtoll(o->ptr, &eptr, 10);
2987 else if (o->encoding == REDIS_ENCODING_INT)
2988 value = (long)o->ptr;
2989 else
2990 assert(1 != 1);
2991 }
2992 }
2993
2994 value += incr;
2995 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2996 tryObjectEncoding(o);
2997 retval = dictAdd(c->db->dict,c->argv[1],o);
2998 if (retval == DICT_ERR) {
2999 dictReplace(c->db->dict,c->argv[1],o);
3000 removeExpire(c->db,c->argv[1]);
3001 } else {
3002 incrRefCount(c->argv[1]);
3003 }
3004 server.dirty++;
3005 addReply(c,shared.colon);
3006 addReply(c,o);
3007 addReply(c,shared.crlf);
3008 }
3009
3010 static void incrCommand(redisClient *c) {
3011 incrDecrCommand(c,1);
3012 }
3013
3014 static void decrCommand(redisClient *c) {
3015 incrDecrCommand(c,-1);
3016 }
3017
3018 static void incrbyCommand(redisClient *c) {
3019 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3020 incrDecrCommand(c,incr);
3021 }
3022
3023 static void decrbyCommand(redisClient *c) {
3024 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3025 incrDecrCommand(c,-incr);
3026 }
3027
3028 /* ========================= Type agnostic commands ========================= */
3029
3030 static void delCommand(redisClient *c) {
3031 int deleted = 0, j;
3032
3033 for (j = 1; j < c->argc; j++) {
3034 if (deleteKey(c->db,c->argv[j])) {
3035 server.dirty++;
3036 deleted++;
3037 }
3038 }
3039 switch(deleted) {
3040 case 0:
3041 addReply(c,shared.czero);
3042 break;
3043 case 1:
3044 addReply(c,shared.cone);
3045 break;
3046 default:
3047 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3048 break;
3049 }
3050 }
3051
3052 static void existsCommand(redisClient *c) {
3053 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3054 }
3055
3056 static void selectCommand(redisClient *c) {
3057 int id = atoi(c->argv[1]->ptr);
3058
3059 if (selectDb(c,id) == REDIS_ERR) {
3060 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3061 } else {
3062 addReply(c,shared.ok);
3063 }
3064 }
3065
3066 static void randomkeyCommand(redisClient *c) {
3067 dictEntry *de;
3068
3069 while(1) {
3070 de = dictGetRandomKey(c->db->dict);
3071 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3072 }
3073 if (de == NULL) {
3074 addReply(c,shared.plus);
3075 addReply(c,shared.crlf);
3076 } else {
3077 addReply(c,shared.plus);
3078 addReply(c,dictGetEntryKey(de));
3079 addReply(c,shared.crlf);
3080 }
3081 }
3082
3083 static void keysCommand(redisClient *c) {
3084 dictIterator *di;
3085 dictEntry *de;
3086 sds pattern = c->argv[1]->ptr;
3087 int plen = sdslen(pattern);
3088 int numkeys = 0, keyslen = 0;
3089 robj *lenobj = createObject(REDIS_STRING,NULL);
3090
3091 di = dictGetIterator(c->db->dict);
3092 addReply(c,lenobj);
3093 decrRefCount(lenobj);
3094 while((de = dictNext(di)) != NULL) {
3095 robj *keyobj = dictGetEntryKey(de);
3096
3097 sds key = keyobj->ptr;
3098 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3099 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3100 if (expireIfNeeded(c->db,keyobj) == 0) {
3101 if (numkeys != 0)
3102 addReply(c,shared.space);
3103 addReply(c,keyobj);
3104 numkeys++;
3105 keyslen += sdslen(key);
3106 }
3107 }
3108 }
3109 dictReleaseIterator(di);
3110 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3111 addReply(c,shared.crlf);
3112 }
3113
3114 static void dbsizeCommand(redisClient *c) {
3115 addReplySds(c,
3116 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3117 }
3118
3119 static void lastsaveCommand(redisClient *c) {
3120 addReplySds(c,
3121 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3122 }
3123
3124 static void typeCommand(redisClient *c) {
3125 robj *o;
3126 char *type;
3127
3128 o = lookupKeyRead(c->db,c->argv[1]);
3129 if (o == NULL) {
3130 type = "+none";
3131 } else {
3132 switch(o->type) {
3133 case REDIS_STRING: type = "+string"; break;
3134 case REDIS_LIST: type = "+list"; break;
3135 case REDIS_SET: type = "+set"; break;
3136 case REDIS_ZSET: type = "+zset"; break;
3137 default: type = "unknown"; break;
3138 }
3139 }
3140 addReplySds(c,sdsnew(type));
3141 addReply(c,shared.crlf);
3142 }
3143
3144 static void saveCommand(redisClient *c) {
3145 if (server.bgsaveinprogress) {
3146 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3147 return;
3148 }
3149 if (rdbSave(server.dbfilename) == REDIS_OK) {
3150 addReply(c,shared.ok);
3151 } else {
3152 addReply(c,shared.err);
3153 }
3154 }
3155
3156 static void bgsaveCommand(redisClient *c) {
3157 if (server.bgsaveinprogress) {
3158 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3159 return;
3160 }
3161 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3162 addReply(c,shared.ok);
3163 } else {
3164 addReply(c,shared.err);
3165 }
3166 }
3167
3168 static void shutdownCommand(redisClient *c) {
3169 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3170 /* Kill the saving child if there is a background saving in progress.
3171 We want to avoid race conditions, for instance our saving child may
3172 overwrite the synchronous saving did by SHUTDOWN. */
3173 if (server.bgsaveinprogress) {
3174 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3175 kill(server.bgsavechildpid,SIGKILL);
3176 rdbRemoveTempFile(server.bgsavechildpid);
3177 }
3178 /* SYNC SAVE */
3179 if (rdbSave(server.dbfilename) == REDIS_OK) {
3180 if (server.daemonize)
3181 unlink(server.pidfile);
3182 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3183 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3184 exit(1);
3185 } else {
3186 /* Ooops.. error saving! The best we can do is to continue operating.
3187 * Note that if there was a background saving process, in the next
3188 * cron() Redis will be notified that the background saving aborted,
3189 * handling special stuff like slaves pending for synchronization... */
3190 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3191 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3192 }
3193 }
3194
3195 static void renameGenericCommand(redisClient *c, int nx) {
3196 robj *o;
3197
3198 /* To use the same key as src and dst is probably an error */
3199 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3200 addReply(c,shared.sameobjecterr);
3201 return;
3202 }
3203
3204 o = lookupKeyWrite(c->db,c->argv[1]);
3205 if (o == NULL) {
3206 addReply(c,shared.nokeyerr);
3207 return;
3208 }
3209 incrRefCount(o);
3210 deleteIfVolatile(c->db,c->argv[2]);
3211 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3212 if (nx) {
3213 decrRefCount(o);
3214 addReply(c,shared.czero);
3215 return;
3216 }
3217 dictReplace(c->db->dict,c->argv[2],o);
3218 } else {
3219 incrRefCount(c->argv[2]);
3220 }
3221 deleteKey(c->db,c->argv[1]);
3222 server.dirty++;
3223 addReply(c,nx ? shared.cone : shared.ok);
3224 }
3225
3226 static void renameCommand(redisClient *c) {
3227 renameGenericCommand(c,0);
3228 }
3229
3230 static void renamenxCommand(redisClient *c) {
3231 renameGenericCommand(c,1);
3232 }
3233
3234 static void moveCommand(redisClient *c) {
3235 robj *o;
3236 redisDb *src, *dst;
3237 int srcid;
3238
3239 /* Obtain source and target DB pointers */
3240 src = c->db;
3241 srcid = c->db->id;
3242 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3243 addReply(c,shared.outofrangeerr);
3244 return;
3245 }
3246 dst = c->db;
3247 selectDb(c,srcid); /* Back to the source DB */
3248
3249 /* If the user is moving using as target the same
3250 * DB as the source DB it is probably an error. */
3251 if (src == dst) {
3252 addReply(c,shared.sameobjecterr);
3253 return;
3254 }
3255
3256 /* Check if the element exists and get a reference */
3257 o = lookupKeyWrite(c->db,c->argv[1]);
3258 if (!o) {
3259 addReply(c,shared.czero);
3260 return;
3261 }
3262
3263 /* Try to add the element to the target DB */
3264 deleteIfVolatile(dst,c->argv[1]);
3265 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3266 addReply(c,shared.czero);
3267 return;
3268 }
3269 incrRefCount(c->argv[1]);
3270 incrRefCount(o);
3271
3272 /* OK! key moved, free the entry in the source DB */
3273 deleteKey(src,c->argv[1]);
3274 server.dirty++;
3275 addReply(c,shared.cone);
3276 }
3277
3278 /* =================================== Lists ================================ */
3279 static void pushGenericCommand(redisClient *c, int where) {
3280 robj *lobj;
3281 list *list;
3282
3283 lobj = lookupKeyWrite(c->db,c->argv[1]);
3284 if (lobj == NULL) {
3285 lobj = createListObject();
3286 list = lobj->ptr;
3287 if (where == REDIS_HEAD) {
3288 listAddNodeHead(list,c->argv[2]);
3289 } else {
3290 listAddNodeTail(list,c->argv[2]);
3291 }
3292 dictAdd(c->db->dict,c->argv[1],lobj);
3293 incrRefCount(c->argv[1]);
3294 incrRefCount(c->argv[2]);
3295 } else {
3296 if (lobj->type != REDIS_LIST) {
3297 addReply(c,shared.wrongtypeerr);
3298 return;
3299 }
3300 list = lobj->ptr;
3301 if (where == REDIS_HEAD) {
3302 listAddNodeHead(list,c->argv[2]);
3303 } else {
3304 listAddNodeTail(list,c->argv[2]);
3305 }
3306 incrRefCount(c->argv[2]);
3307 }
3308 server.dirty++;
3309 addReply(c,shared.ok);
3310 }
3311
3312 static void lpushCommand(redisClient *c) {
3313 pushGenericCommand(c,REDIS_HEAD);
3314 }
3315
3316 static void rpushCommand(redisClient *c) {
3317 pushGenericCommand(c,REDIS_TAIL);
3318 }
3319
3320 static void llenCommand(redisClient *c) {
3321 robj *o;
3322 list *l;
3323
3324 o = lookupKeyRead(c->db,c->argv[1]);
3325 if (o == NULL) {
3326 addReply(c,shared.czero);
3327 return;
3328 } else {
3329 if (o->type != REDIS_LIST) {
3330 addReply(c,shared.wrongtypeerr);
3331 } else {
3332 l = o->ptr;
3333 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3334 }
3335 }
3336 }
3337
3338 static void lindexCommand(redisClient *c) {
3339 robj *o;
3340 int index = atoi(c->argv[2]->ptr);
3341
3342 o = lookupKeyRead(c->db,c->argv[1]);
3343 if (o == NULL) {
3344 addReply(c,shared.nullbulk);
3345 } else {
3346 if (o->type != REDIS_LIST) {
3347 addReply(c,shared.wrongtypeerr);
3348 } else {
3349 list *list = o->ptr;
3350 listNode *ln;
3351
3352 ln = listIndex(list, index);
3353 if (ln == NULL) {
3354 addReply(c,shared.nullbulk);
3355 } else {
3356 robj *ele = listNodeValue(ln);
3357 addReplyBulkLen(c,ele);
3358 addReply(c,ele);
3359 addReply(c,shared.crlf);
3360 }
3361 }
3362 }
3363 }
3364
3365 static void lsetCommand(redisClient *c) {
3366 robj *o;
3367 int index = atoi(c->argv[2]->ptr);
3368
3369 o = lookupKeyWrite(c->db,c->argv[1]);
3370 if (o == NULL) {
3371 addReply(c,shared.nokeyerr);
3372 } else {
3373 if (o->type != REDIS_LIST) {
3374 addReply(c,shared.wrongtypeerr);
3375 } else {
3376 list *list = o->ptr;
3377 listNode *ln;
3378
3379 ln = listIndex(list, index);
3380 if (ln == NULL) {
3381 addReply(c,shared.outofrangeerr);
3382 } else {
3383 robj *ele = listNodeValue(ln);
3384
3385 decrRefCount(ele);
3386 listNodeValue(ln) = c->argv[3];
3387 incrRefCount(c->argv[3]);
3388 addReply(c,shared.ok);
3389 server.dirty++;
3390 }
3391 }
3392 }
3393 }
3394
3395 static void popGenericCommand(redisClient *c, int where) {
3396 robj *o;
3397
3398 o = lookupKeyWrite(c->db,c->argv[1]);
3399 if (o == NULL) {
3400 addReply(c,shared.nullbulk);
3401 } else {
3402 if (o->type != REDIS_LIST) {
3403 addReply(c,shared.wrongtypeerr);
3404 } else {
3405 list *list = o->ptr;
3406 listNode *ln;
3407
3408 if (where == REDIS_HEAD)
3409 ln = listFirst(list);
3410 else
3411 ln = listLast(list);
3412
3413 if (ln == NULL) {
3414 addReply(c,shared.nullbulk);
3415 } else {
3416 robj *ele = listNodeValue(ln);
3417 addReplyBulkLen(c,ele);
3418 addReply(c,ele);
3419 addReply(c,shared.crlf);
3420 listDelNode(list,ln);
3421 server.dirty++;
3422 }
3423 }
3424 }
3425 }
3426
3427 static void lpopCommand(redisClient *c) {
3428 popGenericCommand(c,REDIS_HEAD);
3429 }
3430
3431 static void rpopCommand(redisClient *c) {
3432 popGenericCommand(c,REDIS_TAIL);
3433 }
3434
3435 static void lrangeCommand(redisClient *c) {
3436 robj *o;
3437 int start = atoi(c->argv[2]->ptr);
3438 int end = atoi(c->argv[3]->ptr);
3439
3440 o = lookupKeyRead(c->db,c->argv[1]);
3441 if (o == NULL) {
3442 addReply(c,shared.nullmultibulk);
3443 } else {
3444 if (o->type != REDIS_LIST) {
3445 addReply(c,shared.wrongtypeerr);
3446 } else {
3447 list *list = o->ptr;
3448 listNode *ln;
3449 int llen = listLength(list);
3450 int rangelen, j;
3451 robj *ele;
3452
3453 /* convert negative indexes */
3454 if (start < 0) start = llen+start;
3455 if (end < 0) end = llen+end;
3456 if (start < 0) start = 0;
3457 if (end < 0) end = 0;
3458
3459 /* indexes sanity checks */
3460 if (start > end || start >= llen) {
3461 /* Out of range start or start > end result in empty list */
3462 addReply(c,shared.emptymultibulk);
3463 return;
3464 }
3465 if (end >= llen) end = llen-1;
3466 rangelen = (end-start)+1;
3467
3468 /* Return the result in form of a multi-bulk reply */
3469 ln = listIndex(list, start);
3470 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3471 for (j = 0; j < rangelen; j++) {
3472 ele = listNodeValue(ln);
3473 addReplyBulkLen(c,ele);
3474 addReply(c,ele);
3475 addReply(c,shared.crlf);
3476 ln = ln->next;
3477 }
3478 }
3479 }
3480 }
3481
3482 static void ltrimCommand(redisClient *c) {
3483 robj *o;
3484 int start = atoi(c->argv[2]->ptr);
3485 int end = atoi(c->argv[3]->ptr);
3486
3487 o = lookupKeyWrite(c->db,c->argv[1]);
3488 if (o == NULL) {
3489 addReply(c,shared.nokeyerr);
3490 } else {
3491 if (o->type != REDIS_LIST) {
3492 addReply(c,shared.wrongtypeerr);
3493 } else {
3494 list *list = o->ptr;
3495 listNode *ln;
3496 int llen = listLength(list);
3497 int j, ltrim, rtrim;
3498
3499 /* convert negative indexes */
3500 if (start < 0) start = llen+start;
3501 if (end < 0) end = llen+end;
3502 if (start < 0) start = 0;
3503 if (end < 0) end = 0;
3504
3505 /* indexes sanity checks */
3506 if (start > end || start >= llen) {
3507 /* Out of range start or start > end result in empty list */
3508 ltrim = llen;
3509 rtrim = 0;
3510 } else {
3511 if (end >= llen) end = llen-1;
3512 ltrim = start;
3513 rtrim = llen-end-1;
3514 }
3515
3516 /* Remove list elements to perform the trim */
3517 for (j = 0; j < ltrim; j++) {
3518 ln = listFirst(list);
3519 listDelNode(list,ln);
3520 }
3521 for (j = 0; j < rtrim; j++) {
3522 ln = listLast(list);
3523 listDelNode(list,ln);
3524 }
3525 server.dirty++;
3526 addReply(c,shared.ok);
3527 }
3528 }
3529 }
3530
3531 static void lremCommand(redisClient *c) {
3532 robj *o;
3533
3534 o = lookupKeyWrite(c->db,c->argv[1]);
3535 if (o == NULL) {
3536 addReply(c,shared.czero);
3537 } else {
3538 if (o->type != REDIS_LIST) {
3539 addReply(c,shared.wrongtypeerr);
3540 } else {
3541 list *list = o->ptr;
3542 listNode *ln, *next;
3543 int toremove = atoi(c->argv[2]->ptr);
3544 int removed = 0;
3545 int fromtail = 0;
3546
3547 if (toremove < 0) {
3548 toremove = -toremove;
3549 fromtail = 1;
3550 }
3551 ln = fromtail ? list->tail : list->head;
3552 while (ln) {
3553 robj *ele = listNodeValue(ln);
3554
3555 next = fromtail ? ln->prev : ln->next;
3556 if (compareStringObjects(ele,c->argv[3]) == 0) {
3557 listDelNode(list,ln);
3558 server.dirty++;
3559 removed++;
3560 if (toremove && removed == toremove) break;
3561 }
3562 ln = next;
3563 }
3564 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3565 }
3566 }
3567 }
3568
3569 /* This is the semantic of this command:
3570 * RPOPLPUSH srclist dstlist:
3571 * IF LLEN(srclist) > 0
3572 * element = RPOP srclist
3573 * LPUSH dstlist element
3574 * RETURN element
3575 * ELSE
3576 * RETURN nil
3577 * END
3578 * END
3579 *
3580 * The idea is to be able to get an element from a list in a reliable way
3581 * since the element is not just returned but pushed against another list
3582 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3583 */
3584 static void rpoplpushcommand(redisClient *c) {
3585 robj *sobj;
3586
3587 sobj = lookupKeyWrite(c->db,c->argv[1]);
3588 if (sobj == NULL) {
3589 addReply(c,shared.nullbulk);
3590 } else {
3591 if (sobj->type != REDIS_LIST) {
3592 addReply(c,shared.wrongtypeerr);
3593 } else {
3594 list *srclist = sobj->ptr;
3595 listNode *ln = listLast(srclist);
3596
3597 if (ln == NULL) {
3598 addReply(c,shared.nullbulk);
3599 } else {
3600 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3601 robj *ele = listNodeValue(ln);
3602 list *dstlist;
3603
3604 if (dobj == NULL) {
3605
3606 /* Create the list if the key does not exist */
3607 dobj = createListObject();
3608 dictAdd(c->db->dict,c->argv[2],dobj);
3609 incrRefCount(c->argv[2]);
3610 } else if (dobj->type != REDIS_LIST) {
3611 addReply(c,shared.wrongtypeerr);
3612 return;
3613 }
3614 /* Add the element to the target list */
3615 dstlist = dobj->ptr;
3616 listAddNodeHead(dstlist,ele);
3617 incrRefCount(ele);
3618
3619 /* Send the element to the client as reply as well */
3620 addReplyBulkLen(c,ele);
3621 addReply(c,ele);
3622 addReply(c,shared.crlf);
3623
3624 /* Finally remove the element from the source list */
3625 listDelNode(srclist,ln);
3626 server.dirty++;
3627 }
3628 }
3629 }
3630 }
3631
3632
3633 /* ==================================== Sets ================================ */
3634
3635 static void saddCommand(redisClient *c) {
3636 robj *set;
3637
3638 set = lookupKeyWrite(c->db,c->argv[1]);
3639 if (set == NULL) {
3640 set = createSetObject();
3641 dictAdd(c->db->dict,c->argv[1],set);
3642 incrRefCount(c->argv[1]);
3643 } else {
3644 if (set->type != REDIS_SET) {
3645 addReply(c,shared.wrongtypeerr);
3646 return;
3647 }
3648 }
3649 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3650 incrRefCount(c->argv[2]);
3651 server.dirty++;
3652 addReply(c,shared.cone);
3653 } else {
3654 addReply(c,shared.czero);
3655 }
3656 }
3657
3658 static void sremCommand(redisClient *c) {
3659 robj *set;
3660
3661 set = lookupKeyWrite(c->db,c->argv[1]);
3662 if (set == NULL) {
3663 addReply(c,shared.czero);
3664 } else {
3665 if (set->type != REDIS_SET) {
3666 addReply(c,shared.wrongtypeerr);
3667 return;
3668 }
3669 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3670 server.dirty++;
3671 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3672 addReply(c,shared.cone);
3673 } else {
3674 addReply(c,shared.czero);
3675 }
3676 }
3677 }
3678
3679 static void smoveCommand(redisClient *c) {
3680 robj *srcset, *dstset;
3681
3682 srcset = lookupKeyWrite(c->db,c->argv[1]);
3683 dstset = lookupKeyWrite(c->db,c->argv[2]);
3684
3685 /* If the source key does not exist return 0, if it's of the wrong type
3686 * raise an error */
3687 if (srcset == NULL || srcset->type != REDIS_SET) {
3688 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3689 return;
3690 }
3691 /* Error if the destination key is not a set as well */
3692 if (dstset && dstset->type != REDIS_SET) {
3693 addReply(c,shared.wrongtypeerr);
3694 return;
3695 }
3696 /* Remove the element from the source set */
3697 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3698 /* Key not found in the src set! return zero */
3699 addReply(c,shared.czero);
3700 return;
3701 }
3702 server.dirty++;
3703 /* Add the element to the destination set */
3704 if (!dstset) {
3705 dstset = createSetObject();
3706 dictAdd(c->db->dict,c->argv[2],dstset);
3707 incrRefCount(c->argv[2]);
3708 }
3709 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3710 incrRefCount(c->argv[3]);
3711 addReply(c,shared.cone);
3712 }
3713
3714 static void sismemberCommand(redisClient *c) {
3715 robj *set;
3716
3717 set = lookupKeyRead(c->db,c->argv[1]);
3718 if (set == NULL) {
3719 addReply(c,shared.czero);
3720 } else {
3721 if (set->type != REDIS_SET) {
3722 addReply(c,shared.wrongtypeerr);
3723 return;
3724 }
3725 if (dictFind(set->ptr,c->argv[2]))
3726 addReply(c,shared.cone);
3727 else
3728 addReply(c,shared.czero);
3729 }
3730 }
3731
3732 static void scardCommand(redisClient *c) {
3733 robj *o;
3734 dict *s;
3735
3736 o = lookupKeyRead(c->db,c->argv[1]);
3737 if (o == NULL) {
3738 addReply(c,shared.czero);
3739 return;
3740 } else {
3741 if (o->type != REDIS_SET) {
3742 addReply(c,shared.wrongtypeerr);
3743 } else {
3744 s = o->ptr;
3745 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3746 dictSize(s)));
3747 }
3748 }
3749 }
3750
3751 static void spopCommand(redisClient *c) {
3752 robj *set;
3753 dictEntry *de;
3754
3755 set = lookupKeyWrite(c->db,c->argv[1]);
3756 if (set == NULL) {
3757 addReply(c,shared.nullbulk);
3758 } else {
3759 if (set->type != REDIS_SET) {
3760 addReply(c,shared.wrongtypeerr);
3761 return;
3762 }
3763 de = dictGetRandomKey(set->ptr);
3764 if (de == NULL) {
3765 addReply(c,shared.nullbulk);
3766 } else {
3767 robj *ele = dictGetEntryKey(de);
3768
3769 addReplyBulkLen(c,ele);
3770 addReply(c,ele);
3771 addReply(c,shared.crlf);
3772 dictDelete(set->ptr,ele);
3773 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3774 server.dirty++;
3775 }
3776 }
3777 }
3778
3779 static void srandmemberCommand(redisClient *c) {
3780 robj *set;
3781 dictEntry *de;
3782
3783 set = lookupKeyRead(c->db,c->argv[1]);
3784 if (set == NULL) {
3785 addReply(c,shared.nullbulk);
3786 } else {
3787 if (set->type != REDIS_SET) {
3788 addReply(c,shared.wrongtypeerr);
3789 return;
3790 }
3791 de = dictGetRandomKey(set->ptr);
3792 if (de == NULL) {
3793 addReply(c,shared.nullbulk);
3794 } else {
3795 robj *ele = dictGetEntryKey(de);
3796
3797 addReplyBulkLen(c,ele);
3798 addReply(c,ele);
3799 addReply(c,shared.crlf);
3800 }
3801 }
3802 }
3803
3804 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3805 dict **d1 = (void*) s1, **d2 = (void*) s2;
3806
3807 return dictSize(*d1)-dictSize(*d2);
3808 }
3809
3810 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3811 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3812 dictIterator *di;
3813 dictEntry *de;
3814 robj *lenobj = NULL, *dstset = NULL;
3815 int j, cardinality = 0;
3816
3817 for (j = 0; j < setsnum; j++) {
3818 robj *setobj;
3819
3820 setobj = dstkey ?
3821 lookupKeyWrite(c->db,setskeys[j]) :
3822 lookupKeyRead(c->db,setskeys[j]);
3823 if (!setobj) {
3824 zfree(dv);
3825 if (dstkey) {
3826 deleteKey(c->db,dstkey);
3827 addReply(c,shared.ok);
3828 } else {
3829 addReply(c,shared.nullmultibulk);
3830 }
3831 return;
3832 }
3833 if (setobj->type != REDIS_SET) {
3834 zfree(dv);
3835 addReply(c,shared.wrongtypeerr);
3836 return;
3837 }
3838 dv[j] = setobj->ptr;
3839 }
3840 /* Sort sets from the smallest to largest, this will improve our
3841 * algorithm's performace */
3842 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3843
3844 /* The first thing we should output is the total number of elements...
3845 * since this is a multi-bulk write, but at this stage we don't know
3846 * the intersection set size, so we use a trick, append an empty object
3847 * to the output list and save the pointer to later modify it with the
3848 * right length */
3849 if (!dstkey) {
3850 lenobj = createObject(REDIS_STRING,NULL);
3851 addReply(c,lenobj);
3852 decrRefCount(lenobj);
3853 } else {
3854 /* If we have a target key where to store the resulting set
3855 * create this key with an empty set inside */
3856 dstset = createSetObject();
3857 }
3858
3859 /* Iterate all the elements of the first (smallest) set, and test
3860 * the element against all the other sets, if at least one set does
3861 * not include the element it is discarded */
3862 di = dictGetIterator(dv[0]);
3863
3864 while((de = dictNext(di)) != NULL) {
3865 robj *ele;
3866
3867 for (j = 1; j < setsnum; j++)
3868 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3869 if (j != setsnum)
3870 continue; /* at least one set does not contain the member */
3871 ele = dictGetEntryKey(de);
3872 if (!dstkey) {
3873 addReplyBulkLen(c,ele);
3874 addReply(c,ele);
3875 addReply(c,shared.crlf);
3876 cardinality++;
3877 } else {
3878 dictAdd(dstset->ptr,ele,NULL);
3879 incrRefCount(ele);
3880 }
3881 }
3882 dictReleaseIterator(di);
3883
3884 if (dstkey) {
3885 /* Store the resulting set into the target */
3886 deleteKey(c->db,dstkey);
3887 dictAdd(c->db->dict,dstkey,dstset);
3888 incrRefCount(dstkey);
3889 }
3890
3891 if (!dstkey) {
3892 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3893 } else {
3894 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3895 dictSize((dict*)dstset->ptr)));
3896 server.dirty++;
3897 }
3898 zfree(dv);
3899 }
3900
3901 static void sinterCommand(redisClient *c) {
3902 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3903 }
3904
3905 static void sinterstoreCommand(redisClient *c) {
3906 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3907 }
3908
3909 #define REDIS_OP_UNION 0
3910 #define REDIS_OP_DIFF 1
3911
3912 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3913 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3914 dictIterator *di;
3915 dictEntry *de;
3916 robj *dstset = NULL;
3917 int j, cardinality = 0;
3918
3919 for (j = 0; j < setsnum; j++) {
3920 robj *setobj;
3921
3922 setobj = dstkey ?
3923 lookupKeyWrite(c->db,setskeys[j]) :
3924 lookupKeyRead(c->db,setskeys[j]);
3925 if (!setobj) {
3926 dv[j] = NULL;
3927 continue;
3928 }
3929 if (setobj->type != REDIS_SET) {
3930 zfree(dv);
3931 addReply(c,shared.wrongtypeerr);
3932 return;
3933 }
3934 dv[j] = setobj->ptr;
3935 }
3936
3937 /* We need a temp set object to store our union. If the dstkey
3938 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3939 * this set object will be the resulting object to set into the target key*/
3940 dstset = createSetObject();
3941
3942 /* Iterate all the elements of all the sets, add every element a single
3943 * time to the result set */
3944 for (j = 0; j < setsnum; j++) {
3945 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3946 if (!dv[j]) continue; /* non existing keys are like empty sets */
3947
3948 di = dictGetIterator(dv[j]);
3949
3950 while((de = dictNext(di)) != NULL) {
3951 robj *ele;
3952
3953 /* dictAdd will not add the same element multiple times */
3954 ele = dictGetEntryKey(de);
3955 if (op == REDIS_OP_UNION || j == 0) {
3956 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3957 incrRefCount(ele);
3958 cardinality++;
3959 }
3960 } else if (op == REDIS_OP_DIFF) {
3961 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3962 cardinality--;
3963 }
3964 }
3965 }
3966 dictReleaseIterator(di);
3967
3968 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3969 }
3970
3971 /* Output the content of the resulting set, if not in STORE mode */
3972 if (!dstkey) {
3973 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3974 di = dictGetIterator(dstset->ptr);
3975 while((de = dictNext(di)) != NULL) {
3976 robj *ele;
3977
3978 ele = dictGetEntryKey(de);
3979 addReplyBulkLen(c,ele);
3980 addReply(c,ele);
3981 addReply(c,shared.crlf);
3982 }
3983 dictReleaseIterator(di);
3984 } else {
3985 /* If we have a target key where to store the resulting set
3986 * create this key with the result set inside */
3987 deleteKey(c->db,dstkey);
3988 dictAdd(c->db->dict,dstkey,dstset);
3989 incrRefCount(dstkey);
3990 }
3991
3992 /* Cleanup */
3993 if (!dstkey) {
3994 decrRefCount(dstset);
3995 } else {
3996 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3997 dictSize((dict*)dstset->ptr)));
3998 server.dirty++;
3999 }
4000 zfree(dv);
4001 }
4002
4003 static void sunionCommand(redisClient *c) {
4004 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4005 }
4006
4007 static void sunionstoreCommand(redisClient *c) {
4008 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4009 }
4010
4011 static void sdiffCommand(redisClient *c) {
4012 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4013 }
4014
4015 static void sdiffstoreCommand(redisClient *c) {
4016 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4017 }
4018
4019 /* ==================================== ZSets =============================== */
4020
/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
4028
4029 /* This skiplist implementation is almost a C translation of the original
4030 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4031 * Alternative to Balanced Trees", modified in three ways:
4032 * a) this implementation allows for repeated values.
4033 * b) the comparison is not just by key (our 'score') but by satellite data.
4034 * c) there is a back pointer, so it's a doubly linked list with the back
4035 * pointers being only at "level 1". This allows to traverse the list
4036 * from tail to head, useful for ZREVRANGE. */
4037
4038 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4039 zskiplistNode *zn = zmalloc(sizeof(*zn));
4040
4041 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4042 zn->score = score;
4043 zn->obj = obj;
4044 return zn;
4045 }
4046
4047 static zskiplist *zslCreate(void) {
4048 int j;
4049 zskiplist *zsl;
4050
4051 zsl = zmalloc(sizeof(*zsl));
4052 zsl->level = 1;
4053 zsl->length = 0;
4054 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4055 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4056 zsl->header->forward[j] = NULL;
4057 zsl->header->backward = NULL;
4058 zsl->tail = NULL;
4059 return zsl;
4060 }
4061
4062 static void zslFreeNode(zskiplistNode *node) {
4063 decrRefCount(node->obj);
4064 zfree(node->forward);
4065 zfree(node);
4066 }
4067
4068 static void zslFree(zskiplist *zsl) {
4069 zskiplistNode *node = zsl->header->forward[0], *next;
4070
4071 zfree(zsl->header->forward);
4072 zfree(zsl->header);
4073 while(node) {
4074 next = node->forward[0];
4075 zslFreeNode(node);
4076 node = next;
4077 }
4078 zfree(zsl);
4079 }
4080
4081 static int zslRandomLevel(void) {
4082 int level = 1;
4083 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4084 level += 1;
4085 return level;
4086 }
4087
4088 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4089 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4090 int i, level;
4091
4092 x = zsl->header;
4093 for (i = zsl->level-1; i >= 0; i--) {
4094 while (x->forward[i] &&
4095 (x->forward[i]->score < score ||
4096 (x->forward[i]->score == score &&
4097 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4098 x = x->forward[i];
4099 update[i] = x;
4100 }
4101 /* we assume the key is not already inside, since we allow duplicated
4102 * scores, and the re-insertion of score and redis object should never
4103 * happpen since the caller of zslInsert() should test in the hash table
4104 * if the element is already inside or not. */
4105 level = zslRandomLevel();
4106 if (level > zsl->level) {
4107 for (i = zsl->level; i < level; i++)
4108 update[i] = zsl->header;
4109 zsl->level = level;
4110 }
4111 x = zslCreateNode(level,score,obj);
4112 for (i = 0; i < level; i++) {
4113 x->forward[i] = update[i]->forward[i];
4114 update[i]->forward[i] = x;
4115 }
4116 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4117 if (x->forward[0])
4118 x->forward[0]->backward = x;
4119 else
4120 zsl->tail = x;
4121 zsl->length++;
4122 }
4123
4124 /* Delete an element with matching score/object from the skiplist. */
4125 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4126 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4127 int i;
4128
4129 x = zsl->header;
4130 for (i = zsl->level-1; i >= 0; i--) {
4131 while (x->forward[i] &&
4132 (x->forward[i]->score < score ||
4133 (x->forward[i]->score == score &&
4134 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4135 x = x->forward[i];
4136 update[i] = x;
4137 }
4138 /* We may have multiple elements with the same score, what we need
4139 * is to find the element with both the right score and object. */
4140 x = x->forward[0];
4141 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4142 for (i = 0; i < zsl->level; i++) {
4143 if (update[i]->forward[i] != x) break;
4144 update[i]->forward[i] = x->forward[i];
4145 }
4146 if (x->forward[0]) {
4147 x->forward[0]->backward = (x->backward == zsl->header) ?
4148 NULL : x->backward;
4149 } else {
4150 zsl->tail = x->backward;
4151 }
4152 zslFreeNode(x);
4153 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4154 zsl->level--;
4155 zsl->length--;
4156 return 1;
4157 } else {
4158 return 0; /* not found */
4159 }
4160 return 0; /* not found */
4161 }
4162
4163 /* Delete all the elements with score between min and max from the skiplist.
4164 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4165 * Note that this function takes the reference to the hash table view of the
4166 * sorted set, in order to remove the elements from the hash table too. */
4167 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4168 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4169 unsigned long removed = 0;
4170 int i;
4171
4172 x = zsl->header;
4173 for (i = zsl->level-1; i >= 0; i--) {
4174 while (x->forward[i] && x->forward[i]->score < min)
4175 x = x->forward[i];
4176 update[i] = x;
4177 }
4178 /* We may have multiple elements with the same score, what we need
4179 * is to find the element with both the right score and object. */
4180 x = x->forward[0];
4181 while (x && x->score <= max) {
4182 zskiplistNode *next;
4183
4184 for (i = 0; i < zsl->level; i++) {
4185 if (update[i]->forward[i] != x) break;
4186 update[i]->forward[i] = x->forward[i];
4187 }
4188 if (x->forward[0]) {
4189 x->forward[0]->backward = (x->backward == zsl->header) ?
4190 NULL : x->backward;
4191 } else {
4192 zsl->tail = x->backward;
4193 }
4194 next = x->forward[0];
4195 dictDelete(dict,x->obj);
4196 zslFreeNode(x);
4197 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4198 zsl->level--;
4199 zsl->length--;
4200 removed++;
4201 x = next;
4202 }
4203 return removed; /* not found */
4204 }
4205
4206 /* Find the first node having a score equal or greater than the specified one.
4207 * Returns NULL if there is no match. */
4208 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4209 zskiplistNode *x;
4210 int i;
4211
4212 x = zsl->header;
4213 for (i = zsl->level-1; i >= 0; i--) {
4214 while (x->forward[i] && x->forward[i]->score < score)
4215 x = x->forward[i];
4216 }
4217 /* We may have multiple elements with the same score, what we need
4218 * is to find the element with both the right score and object. */
4219 return x->forward[0];
4220 }
4221
4222 /* The actual Z-commands implementations */
4223
4224 static void zaddCommand(redisClient *c) {
4225 robj *zsetobj;
4226 zset *zs;
4227 double *score;
4228
4229 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4230 if (zsetobj == NULL) {
4231 zsetobj = createZsetObject();
4232 dictAdd(c->db->dict,c->argv[1],zsetobj);
4233 incrRefCount(c->argv[1]);
4234 } else {
4235 if (zsetobj->type != REDIS_ZSET) {
4236 addReply(c,shared.wrongtypeerr);
4237 return;
4238 }
4239 }
4240 score = zmalloc(sizeof(double));
4241 *score = strtod(c->argv[2]->ptr,NULL);
4242 zs = zsetobj->ptr;
4243 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
4244 /* case 1: New element */
4245 incrRefCount(c->argv[3]); /* added to hash */
4246 zslInsert(zs->zsl,*score,c->argv[3]);
4247 incrRefCount(c->argv[3]); /* added to skiplist */
4248 server.dirty++;
4249 addReply(c,shared.cone);
4250 } else {
4251 dictEntry *de;
4252 double *oldscore;
4253
4254 /* case 2: Score update operation */
4255 de = dictFind(zs->dict,c->argv[3]);
4256 assert(de != NULL);
4257 oldscore = dictGetEntryVal(de);
4258 if (*score != *oldscore) {
4259 int deleted;
4260
4261 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
4262 assert(deleted != 0);
4263 zslInsert(zs->zsl,*score,c->argv[3]);
4264 incrRefCount(c->argv[3]);
4265 dictReplace(zs->dict,c->argv[3],score);
4266 server.dirty++;
4267 } else {
4268 zfree(score);
4269 }
4270 addReply(c,shared.czero);
4271 }
4272 }
4273
4274 static void zremCommand(redisClient *c) {
4275 robj *zsetobj;
4276 zset *zs;
4277
4278 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4279 if (zsetobj == NULL) {
4280 addReply(c,shared.czero);
4281 } else {
4282 dictEntry *de;
4283 double *oldscore;
4284 int deleted;
4285
4286 if (zsetobj->type != REDIS_ZSET) {
4287 addReply(c,shared.wrongtypeerr);
4288 return;
4289 }
4290 zs = zsetobj->ptr;
4291 de = dictFind(zs->dict,c->argv[2]);
4292 if (de == NULL) {
4293 addReply(c,shared.czero);
4294 return;
4295 }
4296 /* Delete from the skiplist */
4297 oldscore = dictGetEntryVal(de);
4298 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4299 assert(deleted != 0);
4300
4301 /* Delete from the hash table */
4302 dictDelete(zs->dict,c->argv[2]);
4303 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4304 server.dirty++;
4305 addReply(c,shared.cone);
4306 }
4307 }
4308
4309 static void zremrangebyscoreCommand(redisClient *c) {
4310 double min = strtod(c->argv[2]->ptr,NULL);
4311 double max = strtod(c->argv[3]->ptr,NULL);
4312 robj *zsetobj;
4313 zset *zs;
4314
4315 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4316 if (zsetobj == NULL) {
4317 addReply(c,shared.czero);
4318 } else {
4319 long deleted;
4320
4321 if (zsetobj->type != REDIS_ZSET) {
4322 addReply(c,shared.wrongtypeerr);
4323 return;
4324 }
4325 zs = zsetobj->ptr;
4326 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4327 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4328 server.dirty += deleted;
4329 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4330 }
4331 }
4332
4333 static void zrangeGenericCommand(redisClient *c, int reverse) {
4334 robj *o;
4335 int start = atoi(c->argv[2]->ptr);
4336 int end = atoi(c->argv[3]->ptr);
4337
4338 o = lookupKeyRead(c->db,c->argv[1]);
4339 if (o == NULL) {
4340 addReply(c,shared.nullmultibulk);
4341 } else {
4342 if (o->type != REDIS_ZSET) {
4343 addReply(c,shared.wrongtypeerr);
4344 } else {
4345 zset *zsetobj = o->ptr;
4346 zskiplist *zsl = zsetobj->zsl;
4347 zskiplistNode *ln;
4348
4349 int llen = zsl->length;
4350 int rangelen, j;
4351 robj *ele;
4352
4353 /* convert negative indexes */
4354 if (start < 0) start = llen+start;
4355 if (end < 0) end = llen+end;
4356 if (start < 0) start = 0;
4357 if (end < 0) end = 0;
4358
4359 /* indexes sanity checks */
4360 if (start > end || start >= llen) {
4361 /* Out of range start or start > end result in empty list */
4362 addReply(c,shared.emptymultibulk);
4363 return;
4364 }
4365 if (end >= llen) end = llen-1;
4366 rangelen = (end-start)+1;
4367
4368 /* Return the result in form of a multi-bulk reply */
4369 if (reverse) {
4370 ln = zsl->tail;
4371 while (start--)
4372 ln = ln->backward;
4373 } else {
4374 ln = zsl->header->forward[0];
4375 while (start--)
4376 ln = ln->forward[0];
4377 }
4378
4379 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4380 for (j = 0; j < rangelen; j++) {
4381 ele = ln->obj;
4382 addReplyBulkLen(c,ele);
4383 addReply(c,ele);
4384 addReply(c,shared.crlf);
4385 ln = reverse ? ln->backward : ln->forward[0];
4386 }
4387 }
4388 }
4389 }
4390
4391 static void zrangeCommand(redisClient *c) {
4392 zrangeGenericCommand(c,0);
4393 }
4394
4395 static void zrevrangeCommand(redisClient *c) {
4396 zrangeGenericCommand(c,1);
4397 }
4398
4399 static void zrangebyscoreCommand(redisClient *c) {
4400 robj *o;
4401 double min = strtod(c->argv[2]->ptr,NULL);
4402 double max = strtod(c->argv[3]->ptr,NULL);
4403
4404 o = lookupKeyRead(c->db,c->argv[1]);
4405 if (o == NULL) {
4406 addReply(c,shared.nullmultibulk);
4407 } else {
4408 if (o->type != REDIS_ZSET) {
4409 addReply(c,shared.wrongtypeerr);
4410 } else {
4411 zset *zsetobj = o->ptr;
4412 zskiplist *zsl = zsetobj->zsl;
4413 zskiplistNode *ln;
4414 robj *ele, *lenobj;
4415 unsigned int rangelen = 0;
4416
4417 /* Get the first node with the score >= min */
4418 ln = zslFirstWithScore(zsl,min);
4419 if (ln == NULL) {
4420 /* No element matching the speciifed interval */
4421 addReply(c,shared.emptymultibulk);
4422 return;
4423 }
4424
4425 /* We don't know in advance how many matching elements there
4426 * are in the list, so we push this object that will represent
4427 * the multi-bulk length in the output buffer, and will "fix"
4428 * it later */
4429 lenobj = createObject(REDIS_STRING,NULL);
4430 addReply(c,lenobj);
4431
4432 while(ln && ln->score <= max) {
4433 ele = ln->obj;
4434 addReplyBulkLen(c,ele);
4435 addReply(c,ele);
4436 addReply(c,shared.crlf);
4437 ln = ln->forward[0];
4438 rangelen++;
4439 }
4440 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4441 }
4442 }
4443 }
4444
4445 static void zcardCommand(redisClient *c) {
4446 robj *o;
4447 zset *zs;
4448
4449 o = lookupKeyRead(c->db,c->argv[1]);
4450 if (o == NULL) {
4451 addReply(c,shared.czero);
4452 return;
4453 } else {
4454 if (o->type != REDIS_ZSET) {
4455 addReply(c,shared.wrongtypeerr);
4456 } else {
4457 zs = o->ptr;
4458 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4459 }
4460 }
4461 }
4462
4463 static void zscoreCommand(redisClient *c) {
4464 robj *o;
4465 zset *zs;
4466
4467 o = lookupKeyRead(c->db,c->argv[1]);
4468 if (o == NULL) {
4469 addReply(c,shared.nullbulk);
4470 return;
4471 } else {
4472 if (o->type != REDIS_ZSET) {
4473 addReply(c,shared.wrongtypeerr);
4474 } else {
4475 dictEntry *de;
4476
4477 zs = o->ptr;
4478 de = dictFind(zs->dict,c->argv[2]);
4479 if (!de) {
4480 addReply(c,shared.nullbulk);
4481 } else {
4482 char buf[128];
4483 double *score = dictGetEntryVal(de);
4484
4485 snprintf(buf,sizeof(buf),"%.17g",*score);
4486 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
4487 strlen(buf),buf));
4488 }
4489 }
4490 }
4491 }
4492
4493 /* ========================= Non type-specific commands ==================== */
4494
4495 static void flushdbCommand(redisClient *c) {
4496 server.dirty += dictSize(c->db->dict);
4497 dictEmpty(c->db->dict);
4498 dictEmpty(c->db->expires);
4499 addReply(c,shared.ok);
4500 }
4501
4502 static void flushallCommand(redisClient *c) {
4503 server.dirty += emptyDb();
4504 addReply(c,shared.ok);
4505 rdbSave(server.dbfilename);
4506 server.dirty++;
4507 }
4508
4509 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4510 redisSortOperation *so = zmalloc(sizeof(*so));
4511 so->type = type;
4512 so->pattern = pattern;
4513 return so;
4514 }
4515
4516 /* Return the value associated to the key with a name obtained
4517 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4518 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4519 char *p;
4520 sds spat, ssub;
4521 robj keyobj;
4522 int prefixlen, sublen, postfixlen;
4523 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4524 struct {
4525 long len;
4526 long free;
4527 char buf[REDIS_SORTKEY_MAX+1];
4528 } keyname;
4529
4530 if (subst->encoding == REDIS_ENCODING_RAW)
4531 incrRefCount(subst);
4532 else {
4533 subst = getDecodedObject(subst);
4534 }
4535
4536 spat = pattern->ptr;
4537 ssub = subst->ptr;
4538 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4539 p = strchr(spat,'*');
4540 if (!p) return NULL;
4541
4542 prefixlen = p-spat;
4543 sublen = sdslen(ssub);
4544 postfixlen = sdslen(spat)-(prefixlen+1);
4545 memcpy(keyname.buf,spat,prefixlen);
4546 memcpy(keyname.buf+prefixlen,ssub,sublen);
4547 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4548 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4549 keyname.len = prefixlen+sublen+postfixlen;
4550
4551 keyobj.refcount = 1;
4552 keyobj.type = REDIS_STRING;
4553 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4554
4555 decrRefCount(subst);
4556
4557 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4558 return lookupKeyRead(db,&keyobj);
4559 }
4560
4561 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4562 * the additional parameter is not standard but a BSD-specific we have to
4563 * pass sorting parameters via the global 'server' structure */
4564 static int sortCompare(const void *s1, const void *s2) {
4565 const redisSortObject *so1 = s1, *so2 = s2;
4566 int cmp;
4567
4568 if (!server.sort_alpha) {
4569 /* Numeric sorting. Here it's trivial as we precomputed scores */
4570 if (so1->u.score > so2->u.score) {
4571 cmp = 1;
4572 } else if (so1->u.score < so2->u.score) {
4573 cmp = -1;
4574 } else {
4575 cmp = 0;
4576 }
4577 } else {
4578 /* Alphanumeric sorting */
4579 if (server.sort_bypattern) {
4580 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4581 /* At least one compare object is NULL */
4582 if (so1->u.cmpobj == so2->u.cmpobj)
4583 cmp = 0;
4584 else if (so1->u.cmpobj == NULL)
4585 cmp = -1;
4586 else
4587 cmp = 1;
4588 } else {
4589 /* We have both the objects, use strcoll */
4590 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4591 }
4592 } else {
4593 /* Compare elements directly */
4594 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4595 so2->obj->encoding == REDIS_ENCODING_RAW) {
4596 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4597 } else {
4598 robj *dec1, *dec2;
4599
4600 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4601 so1->obj : getDecodedObject(so1->obj);
4602 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4603 so2->obj : getDecodedObject(so2->obj);
4604 cmp = strcoll(dec1->ptr,dec2->ptr);
4605 if (dec1 != so1->obj) decrRefCount(dec1);
4606 if (dec2 != so2->obj) decrRefCount(dec2);
4607 }
4608 }
4609 }
4610 return server.sort_desc ? -cmp : cmp;
4611 }
4612
4613 /* The SORT command is the most complex command in Redis. Warning: this code
 * is optimized for speed and a bit less for readability.
 *
 * Options parsed below (case insensitive, in any order after the key):
 *   ASC | DESC | ALPHA | LIMIT <start> <count> | STORE <key> |
 *   BY <pattern> | GET <pattern>   (GET may be repeated)
 *
 * Only lists and sets can be sorted. Without STORE the result is sent to
 * the client as a multi-bulk reply; with STORE it is saved as a list under
 * the given key and the number of stored elements is returned as an
 * integer reply. A missing key yields shared.nokeyerr, a key of another
 * type shared.wrongtypeerr, an unknown option shared.syntaxerr. */
4615 static void sortCommand(redisClient *c) {
4616 list *operations;
4617 int outputlen = 0;
4618 int desc = 0, alpha = 0;
4619 int limit_start = 0, limit_count = -1, start, end;
4620 int j, dontsort = 0, vectorlen;
4621 int getop = 0; /* GET operation counter */
4622 robj *sortval, *sortby = NULL, *storekey = NULL;
4623 redisSortObject *vector; /* Resulting vector to sort */
4624
4625 /* Lookup the key to sort. It must be of the right types */
4626 sortval = lookupKeyRead(c->db,c->argv[1]);
4627 if (sortval == NULL) {
4628 addReply(c,shared.nokeyerr);
4629 return;
4630 }
4631 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4632 addReply(c,shared.wrongtypeerr);
4633 return;
4634 }
4635
4636 /* Create a list of operations to perform for every sorted element.
4637 * The only operation created by the parser below is GET. The list owns
     * the operation descriptors (freed with zfree on listRelease). */
4638 operations = listCreate();
4639 listSetFreeMethod(operations,zfree);
4640 j = 2;
4641
4642 /* Now we need to protect sortval incrementing its count, in the future
4643 * SORT may have options able to overwrite/delete keys during the sorting
4644 * and the sorted key itself may get destroyed */
4645 incrRefCount(sortval);
4646
4647 /* The SORT command has an SQL-alike syntax, parse it */
4648 while(j < c->argc) {
    /* Number of arguments following the current one. */
4649 int leftargs = c->argc-j-1;
4650 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4651 desc = 0;
4652 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4653 desc = 1;
4654 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4655 alpha = 1;
4656 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4657 limit_start = atoi(c->argv[j+1]->ptr);
4658 limit_count = atoi(c->argv[j+2]->ptr);
4659 j+=2;
4660 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
4661 storekey = c->argv[j+1];
4662 j++;
4663 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4664 sortby = c->argv[j+1];
4665 /* If the BY pattern does not contain '*', i.e. it is constant,
4666 * we don't need to sort nor to lookup the weight keys. */
4667 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4668 j++;
4669 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4670 listAddNodeTail(operations,createSortOperation(
4671 REDIS_SORT_GET,c->argv[j+1]));
4672 getop++;
4673 j++;
4674 } else {
    /* Unknown option, or option missing its arguments: undo the
     * setup done so far and report a syntax error. */
4675 decrRefCount(sortval);
4676 listRelease(operations);
4677 addReply(c,shared.syntaxerr);
4678 return;
4679 }
4680 j++;
4681 }
4682
4683 /* Load the sorting vector with all the objects to sort */
4684 vectorlen = (sortval->type == REDIS_LIST) ?
4685 listLength((list*)sortval->ptr) :
4686 dictSize((dict*)sortval->ptr);
4687 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4688 j = 0;
4689 if (sortval->type == REDIS_LIST) {
4690 list *list = sortval->ptr;
4691 listNode *ln;
4692
4693 listRewind(list);
4694 while((ln = listYield(list))) {
4695 robj *ele = ln->value;
4696 vector[j].obj = ele;
4697 vector[j].u.score = 0;
4698 vector[j].u.cmpobj = NULL;
4699 j++;
4700 }
4701 } else {
4702 dict *set = sortval->ptr;
4703 dictIterator *di;
4704 dictEntry *setele;
4705
4706 di = dictGetIterator(set);
4707 while((setele = dictNext(di)) != NULL) {
4708 vector[j].obj = dictGetEntryKey(setele);
4709 vector[j].u.score = 0;
4710 vector[j].u.cmpobj = NULL;
4711 j++;
4712 }
4713 dictReleaseIterator(di);
4714 }
4715 assert(j == vectorlen);
4716
4717 /* Now it's time to load the right scores in the sorting vector.
     * With BY, the weight is looked up through the pattern; elements whose
     * weight key is missing or is not a string keep the zero/NULL value
     * set above. Without BY, the element itself is converted to a number
     * unless ALPHA was requested. */
4718 if (dontsort == 0) {
4719 for (j = 0; j < vectorlen; j++) {
4720 if (sortby) {
4721 robj *byval;
4722
4723 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4724 if (!byval || byval->type != REDIS_STRING) continue;
4725 if (alpha) {
    /* In both branches vector[j].u.cmpobj ends up holding a
     * reference that is released in the cleanup loop at the
     * end of this function. */
4726 if (byval->encoding == REDIS_ENCODING_RAW) {
4727 vector[j].u.cmpobj = byval;
4728 incrRefCount(byval);
4729 } else {
4730 vector[j].u.cmpobj = getDecodedObject(byval);
4731 }
4732 } else {
4733 if (byval->encoding == REDIS_ENCODING_RAW) {
4734 vector[j].u.score = strtod(byval->ptr,NULL);
4735 } else {
4736 if (byval->encoding == REDIS_ENCODING_INT) {
4737 vector[j].u.score = (long)byval->ptr;
4738 } else
4739 assert(1 != 1);
4740 }
4741 }
4742 } else {
4743 if (!alpha) {
4744 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4745 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4746 else {
4747 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4748 vector[j].u.score = (long) vector[j].obj->ptr;
4749 else
4750 assert(1 != 1);
4751 }
4752 }
4753 }
4754 }
4755 }
4756
4757 /* We are ready to sort the vector... perform a bit of sanity check
4758 * on the LIMIT option too. We'll use a partial version of quicksort.
     * 'start' and 'end' are inclusive indexes into the vector. */
4759 start = (limit_start < 0) ? 0 : limit_start;
4760 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4761 if (start >= vectorlen) {
    /* Start past the end: pick start/end so that end-start+1 == 0,
     * i.e. an empty output range. */
4762 start = vectorlen-1;
4763 end = vectorlen-2;
4764 }
4765 if (end >= vectorlen) end = vectorlen-1;
4766
4767 if (dontsort == 0) {
    /* sortCompare() reads its parameters from the 'server' global. */
4768 server.sort_desc = desc;
4769 server.sort_alpha = alpha;
4770 server.sort_bypattern = sortby ? 1 : 0;
4771 if (sortby && (start != 0 || end != vectorlen-1))
4772 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4773 else
4774 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4775 }
4776
4777 /* Send command output to the output buffer, performing the specified
4778 * GET operations if any. With GET, each element in range produces one
     * output item per GET pattern instead of the element itself. */
4779 outputlen = getop ? getop*(end-start+1) : end-start+1;
4780 if (storekey == NULL) {
4781 /* STORE option not specified, send the sorting result to client */
4782 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4783 for (j = start; j <= end; j++) {
4784 listNode *ln;
4785 if (!getop) {
4786 addReplyBulkLen(c,vector[j].obj);
4787 addReply(c,vector[j].obj);
4788 addReply(c,shared.crlf);
4789 }
4790 listRewind(operations);
4791 while((ln = listYield(operations))) {
4792 redisSortOperation *sop = ln->value;
4793 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4794 vector[j].obj);
4795
4796 if (sop->type == REDIS_SORT_GET) {
4797 if (!val || val->type != REDIS_STRING) {
4798 addReply(c,shared.nullbulk);
4799 } else {
4800 addReplyBulkLen(c,val);
4801 addReply(c,val);
4802 addReply(c,shared.crlf);
4803 }
4804 } else {
4805 assert(sop->type == REDIS_SORT_GET); /* always fails */
4806 }
4807 }
4808 }
4809 } else {
4810 robj *listObject = createListObject();
4811 list *listPtr = (list*) listObject->ptr;
4812
4813 /* STORE option specified, set the sorting result as a List object.
     * Missing or non-string GET values are stored as empty strings. */
4814 for (j = start; j <= end; j++) {
4815 listNode *ln;
4816 if (!getop) {
4817 listAddNodeTail(listPtr,vector[j].obj);
4818 incrRefCount(vector[j].obj);
4819 }
4820 listRewind(operations);
4821 while((ln = listYield(operations))) {
4822 redisSortOperation *sop = ln->value;
4823 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4824 vector[j].obj);
4825
4826 if (sop->type == REDIS_SORT_GET) {
4827 if (!val || val->type != REDIS_STRING) {
4828 listAddNodeTail(listPtr,createStringObject("",0));
4829 } else {
4830 listAddNodeTail(listPtr,val);
4831 incrRefCount(val);
4832 }
4833 } else {
4834 assert(sop->type == REDIS_SORT_GET); /* always fails */
4835 }
4836 }
4837 }
    /* NOTE(review): the key object is retained only when dictReplace()
     * returns non-zero -- presumably meaning a new entry was added
     * rather than an existing one overwritten; confirm in dict.c. */
4838 if (dictReplace(c->db->dict,storekey,listObject)) {
4839 incrRefCount(storekey);
4840 }
4841 /* Note: we add 1 because the DB is dirty anyway since even if the
4842 * SORT result is empty a new key is set and maybe the old content
4843 * replaced. */
4844 server.dirty += 1+outputlen;
4845 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
4846 }
4847
4848 /* Cleanup: drop the protection reference on the sorted value, free the
     * operations list, and release the comparison objects referenced by
     * the vector when sorting BY pattern in ALPHA mode. */
4849 decrRefCount(sortval);
4850 listRelease(operations);
4851 for (j = 0; j < vectorlen; j++) {
4852 if (sortby && alpha && vector[j].u.cmpobj)
4853 decrRefCount(vector[j].u.cmpobj);
4854 }
4855 zfree(vector);
4856 }
4857
4858 static void infoCommand(redisClient *c) {
4859 sds info;
4860 time_t uptime = time(NULL)-server.stat_starttime;
4861 int j;
4862
4863 info = sdscatprintf(sdsempty(),
4864 "redis_version:%s\r\n"
4865 "arch_bits:%s\r\n"
4866 "uptime_in_seconds:%d\r\n"
4867 "uptime_in_days:%d\r\n"
4868 "connected_clients:%d\r\n"
4869 "connected_slaves:%d\r\n"
4870 "used_memory:%zu\r\n"
4871 "changes_since_last_save:%lld\r\n"
4872 "bgsave_in_progress:%d\r\n"
4873 "last_save_time:%d\r\n"
4874 "total_connections_received:%lld\r\n"
4875 "total_commands_processed:%lld\r\n"
4876 "role:%s\r\n"
4877 ,REDIS_VERSION,
4878 (sizeof(long) == 8) ? "64" : "32",
4879 uptime,
4880 uptime/(3600*24),
4881 listLength(server.clients)-listLength(server.slaves),
4882 listLength(server.slaves),
4883 server.usedmemory,
4884 server.dirty,
4885 server.bgsaveinprogress,
4886 server.lastsave,
4887 server.stat_numconnections,
4888 server.stat_numcommands,
4889 server.masterhost == NULL ? "master" : "slave"
4890 );
4891 if (server.masterhost) {
4892 info = sdscatprintf(info,
4893 "master_host:%s\r\n"
4894 "master_port:%d\r\n"
4895 "master_link_status:%s\r\n"
4896 "master_last_io_seconds_ago:%d\r\n"
4897 ,server.masterhost,
4898 server.masterport,
4899 (server.replstate == REDIS_REPL_CONNECTED) ?
4900 "up" : "down",
4901 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
4902 );
4903 }
4904 for (j = 0; j < server.dbnum; j++) {
4905 long long keys, vkeys;
4906
4907 keys = dictSize(server.db[j].dict);
4908 vkeys = dictSize(server.db[j].expires);
4909 if (keys || vkeys) {
4910 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4911 j, keys, vkeys);
4912 }
4913 }
4914 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4915 addReplySds(c,info);
4916 addReply(c,shared.crlf);
4917 }
4918
4919 static void monitorCommand(redisClient *c) {
4920 /* ignore MONITOR if aleady slave or in monitor mode */
4921 if (c->flags & REDIS_SLAVE) return;
4922
4923 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4924 c->slaveseldb = 0;
4925 listAddNodeTail(server.monitors,c);
4926 addReply(c,shared.ok);
4927 }
4928
4929 /* ================================= Expire ================================= */
4930 static int removeExpire(redisDb *db, robj *key) {
4931 if (dictDelete(db->expires,key) == DICT_OK) {
4932 return 1;
4933 } else {
4934 return 0;
4935 }
4936 }
4937
4938 static int setExpire(redisDb *db, robj *key, time_t when) {
4939 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4940 return 0;
4941 } else {
4942 incrRefCount(key);
4943 return 1;
4944 }
4945 }
4946
4947 /* Return the expire time of the specified key, or -1 if no expire
4948 * is associated with this key (i.e. the key is non volatile) */
4949 static time_t getExpire(redisDb *db, robj *key) {
4950 dictEntry *de;
4951
4952 /* No expire? return ASAP */
4953 if (dictSize(db->expires) == 0 ||
4954 (de = dictFind(db->expires,key)) == NULL) return -1;
4955
4956 return (time_t) dictGetEntryVal(de);
4957 }
4958
4959 static int expireIfNeeded(redisDb *db, robj *key) {
4960 time_t when;
4961 dictEntry *de;
4962
4963 /* No expire? return ASAP */
4964 if (dictSize(db->expires) == 0 ||
4965 (de = dictFind(db->expires,key)) == NULL) return 0;
4966
4967 /* Lookup the expire */
4968 when = (time_t) dictGetEntryVal(de);
4969 if (time(NULL) <= when) return 0;
4970
4971 /* Delete the key */
4972 dictDelete(db->expires,key);
4973 return dictDelete(db->dict,key) == DICT_OK;
4974 }
4975
4976 static int deleteIfVolatile(redisDb *db, robj *key) {
4977 dictEntry *de;
4978
4979 /* No expire? return ASAP */
4980 if (dictSize(db->expires) == 0 ||
4981 (de = dictFind(db->expires,key)) == NULL) return 0;
4982
4983 /* Delete the key */
4984 server.dirty++;
4985 dictDelete(db->expires,key);
4986 return dictDelete(db->dict,key) == DICT_OK;
4987 }
4988
4989 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
4990 dictEntry *de;
4991
4992 de = dictFind(c->db->dict,key);
4993 if (de == NULL) {
4994 addReply(c,shared.czero);
4995 return;
4996 }
4997 if (seconds < 0) {
4998 if (deleteKey(c->db,key)) server.dirty++;
4999 addReply(c, shared.cone);
5000 return;
5001 } else {
5002 time_t when = time(NULL)+seconds;
5003 if (setExpire(c->db,key,when)) {
5004 addReply(c,shared.cone);
5005 server.dirty++;
5006 } else {
5007 addReply(c,shared.czero);
5008 }
5009 return;
5010 }
5011 }
5012
5013 static void expireCommand(redisClient *c) {
5014 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5015 }
5016
5017 static void expireatCommand(redisClient *c) {
5018 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5019 }
5020
5021 static void ttlCommand(redisClient *c) {
5022 time_t expire;
5023 int ttl = -1;
5024
5025 expire = getExpire(c->db,c->argv[1]);
5026 if (expire != -1) {
5027 ttl = (int) (expire-time(NULL));
5028 if (ttl < 0) ttl = -1;
5029 }
5030 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5031 }
5032
5033 static void msetGenericCommand(redisClient *c, int nx) {
5034 int j;
5035
5036 if ((c->argc % 2) == 0) {
5037 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
5038 return;
5039 }
5040 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
5041 * set nothing at all if at least one already key exists. */
5042 if (nx) {
5043 for (j = 1; j < c->argc; j += 2) {
5044 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
5045 addReply(c, shared.czero);
5046 return;
5047 }
5048 }
5049 }
5050
5051 for (j = 1; j < c->argc; j += 2) {
5052 int retval;
5053
5054 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
5055 if (retval == DICT_ERR) {
5056 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
5057 incrRefCount(c->argv[j+1]);
5058 } else {
5059 incrRefCount(c->argv[j]);
5060 incrRefCount(c->argv[j+1]);
5061 }
5062 removeExpire(c->db,c->argv[j]);
5063 }
5064 server.dirty += (c->argc-1)/2;
5065 addReply(c, nx ? shared.cone : shared.ok);
5066 }
5067
5068 static void msetCommand(redisClient *c) {
5069 msetGenericCommand(c,0);
5070 }
5071
5072 static void msetnxCommand(redisClient *c) {
5073 msetGenericCommand(c,1);
5074 }
5075
5076 /* =============================== Replication ============================= */
5077
5078 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5079 ssize_t nwritten, ret = size;
5080 time_t start = time(NULL);
5081
5082 timeout++;
5083 while(size) {
5084 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5085 nwritten = write(fd,ptr,size);
5086 if (nwritten == -1) return -1;
5087 ptr += nwritten;
5088 size -= nwritten;
5089 }
5090 if ((time(NULL)-start) > timeout) {
5091 errno = ETIMEDOUT;
5092 return -1;
5093 }
5094 }
5095 return ret;
5096 }
5097
5098 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5099 ssize_t nread, totread = 0;
5100 time_t start = time(NULL);
5101
5102 timeout++;
5103 while(size) {
5104 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5105 nread = read(fd,ptr,size);
5106 if (nread == -1) return -1;
5107 ptr += nread;
5108 size -= nread;
5109 totread += nread;
5110 }
5111 if ((time(NULL)-start) > timeout) {
5112 errno = ETIMEDOUT;
5113 return -1;
5114 }
5115 }
5116 return totread;
5117 }
5118
/* Read a line from 'fd' into the buffer 'ptr' of 'size' bytes, one byte
 * at a time through syncRead() (so 'timeout' applies to every byte).
 *
 * Reading stops at the first '\n', which is not stored; a '\r' right
 * before it is stripped as well. At most size-1 bytes are stored so that
 * there is always room for the terminating null byte: a longer line is
 * truncated and the rest is left unread.
 *
 * Returns the number of bytes stored before the line terminator was seen
 * (a stripped '\r' is still counted), or -1 if syncRead() failed. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    size--; /* reserve one byte for the null terminator */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
            /* Account for the byte just stored: without this the loop
             * never terminates on its own and a line longer than the
             * buffer is written past its end. */
            size--;
        }
    }
    return nread;
}
5139
5140 static void syncCommand(redisClient *c) {
5141 /* ignore SYNC if aleady slave or in monitor mode */
5142 if (c->flags & REDIS_SLAVE) return;
5143
5144 /* SYNC can't be issued when the server has pending data to send to
5145 * the client about already issued commands. We need a fresh reply
5146 * buffer registering the differences between the BGSAVE and the current
5147 * dataset, so that we can copy to other slaves if needed. */
5148 if (listLength(c->reply) != 0) {
5149 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5150 return;
5151 }
5152
5153 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5154 /* Here we need to check if there is a background saving operation
5155 * in progress, or if it is required to start one */
5156 if (server.bgsaveinprogress) {
5157 /* Ok a background save is in progress. Let's check if it is a good
5158 * one for replication, i.e. if there is another slave that is
5159 * registering differences since the server forked to save */
5160 redisClient *slave;
5161 listNode *ln;
5162
5163 listRewind(server.slaves);
5164 while((ln = listYield(server.slaves))) {
5165 slave = ln->value;
5166 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5167 }
5168 if (ln) {
5169 /* Perfect, the server is already registering differences for
5170 * another slave. Set the right state, and copy the buffer. */
5171 listRelease(c->reply);
5172 c->reply = listDup(slave->reply);
5173 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5174 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5175 } else {
5176 /* No way, we need to wait for the next BGSAVE in order to
5177 * register differences */
5178 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5179 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5180 }
5181 } else {
5182 /* Ok we don't have a BGSAVE in progress, let's start one */
5183 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5184 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5185 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5186 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5187 return;
5188 }
5189 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5190 }
5191 c->repldbfd = -1;
5192 c->flags |= REDIS_SLAVE;
5193 c->slaveseldb = 0;
5194 listAddNodeTail(server.slaves,c);
5195 return;
5196 }
5197
5198 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5199 redisClient *slave = privdata;
5200 REDIS_NOTUSED(el);
5201 REDIS_NOTUSED(mask);
5202 char buf[REDIS_IOBUF_LEN];
5203 ssize_t nwritten, buflen;
5204
5205 if (slave->repldboff == 0) {
5206 /* Write the bulk write count before to transfer the DB. In theory here
5207 * we don't know how much room there is in the output buffer of the
5208 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5209 * operations) will never be smaller than the few bytes we need. */
5210 sds bulkcount;
5211
5212 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5213 slave->repldbsize);
5214 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5215 {
5216 sdsfree(bulkcount);
5217 freeClient(slave);
5218 return;
5219 }
5220 sdsfree(bulkcount);
5221 }
5222 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5223 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5224 if (buflen <= 0) {
5225 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5226 (buflen == 0) ? "premature EOF" : strerror(errno));
5227 freeClient(slave);
5228 return;
5229 }
5230 if ((nwritten = write(fd,buf,buflen)) == -1) {
5231 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5232 strerror(errno));
5233 freeClient(slave);
5234 return;
5235 }
5236 slave->repldboff += nwritten;
5237 if (slave->repldboff == slave->repldbsize) {
5238 close(slave->repldbfd);
5239 slave->repldbfd = -1;
5240 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5241 slave->replstate = REDIS_REPL_ONLINE;
5242 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5243 sendReplyToClient, slave, NULL) == AE_ERR) {
5244 freeClient(slave);
5245 return;
5246 }
5247 addReplySds(slave,sdsempty());
5248 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5249 }
5250 }
5251
5252 /* This function is called at the end of every backgrond saving.
5253 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5254 * otherwise REDIS_ERR is passed to the function.
5255 *
5256 * The goal of this function is to handle slaves waiting for a successful
5257 * background saving in order to perform non-blocking synchronization. */
5258 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5259 listNode *ln;
5260 int startbgsave = 0;
5261
5262 listRewind(server.slaves);
5263 while((ln = listYield(server.slaves))) {
5264 redisClient *slave = ln->value;
5265
5266 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5267 startbgsave = 1;
5268 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5269 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5270 struct redis_stat buf;
5271
5272 if (bgsaveerr != REDIS_OK) {
5273 freeClient(slave);
5274 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5275 continue;
5276 }
5277 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5278 redis_fstat(slave->repldbfd,&buf) == -1) {
5279 freeClient(slave);
5280 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5281 continue;
5282 }
5283 slave->repldboff = 0;
5284 slave->repldbsize = buf.st_size;
5285 slave->replstate = REDIS_REPL_SEND_BULK;
5286 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5287 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
5288 freeClient(slave);
5289 continue;
5290 }
5291 }
5292 }
5293 if (startbgsave) {
5294 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5295 listRewind(server.slaves);
5296 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5297 while((ln = listYield(server.slaves))) {
5298 redisClient *slave = ln->value;
5299
5300 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5301 freeClient(slave);
5302 }
5303 }
5304 }
5305 }
5306
5307 static int syncWithMaster(void) {
5308 char buf[1024], tmpfile[256], authcmd[1024];
5309 int dumpsize;
5310 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5311 int dfd;
5312
5313 if (fd == -1) {
5314 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5315 strerror(errno));
5316 return REDIS_ERR;
5317 }
5318
5319 /* AUTH with the master if required. */
5320 if(server.masterauth) {
5321 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5322 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5323 close(fd);
5324 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5325 strerror(errno));
5326 return REDIS_ERR;
5327 }
5328 /* Read the AUTH result. */
5329 if (syncReadLine(fd,buf,1024,3600) == -1) {
5330 close(fd);
5331 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5332 strerror(errno));
5333 return REDIS_ERR;
5334 }
5335 if (buf[0] != '+') {
5336 close(fd);
5337 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5338 return REDIS_ERR;
5339 }
5340 }
5341
5342 /* Issue the SYNC command */
5343 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5344 close(fd);
5345 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5346 strerror(errno));
5347 return REDIS_ERR;
5348 }
5349 /* Read the bulk write count */
5350 if (syncReadLine(fd,buf,1024,3600) == -1) {
5351 close(fd);
5352 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5353 strerror(errno));
5354 return REDIS_ERR;
5355 }
5356 if (buf[0] != '$') {
5357 close(fd);
5358 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5359 return REDIS_ERR;
5360 }
5361 dumpsize = atoi(buf+1);
5362 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5363 /* Read the bulk write data on a temp file */
5364 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5365 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5366 if (dfd == -1) {
5367 close(fd);
5368 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5369 return REDIS_ERR;
5370 }
5371 while(dumpsize) {
5372 int nread, nwritten;
5373
5374 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5375 if (nread == -1) {
5376 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5377 strerror(errno));
5378 close(fd);
5379 close(dfd);
5380 return REDIS_ERR;
5381 }
5382 nwritten = write(dfd,buf,nread);
5383 if (nwritten == -1) {
5384 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5385 close(fd);
5386 close(dfd);
5387 return REDIS_ERR;
5388 }
5389 dumpsize -= nread;
5390 }
5391 close(dfd);
5392 if (rename(tmpfile,server.dbfilename) == -1) {
5393 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5394 unlink(tmpfile);
5395 close(fd);
5396 return REDIS_ERR;
5397 }
5398 emptyDb();
5399 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5400 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5401 close(fd);
5402 return REDIS_ERR;
5403 }
5404 server.master = createClient(fd);
5405 server.master->flags |= REDIS_MASTER;
5406 server.replstate = REDIS_REPL_CONNECTED;
5407 return REDIS_OK;
5408 }
5409
5410 static void slaveofCommand(redisClient *c) {
5411 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5412 !strcasecmp(c->argv[2]->ptr,"one")) {
5413 if (server.masterhost) {
5414 sdsfree(server.masterhost);
5415 server.masterhost = NULL;
5416 if (server.master) freeClient(server.master);
5417 server.replstate = REDIS_REPL_NONE;
5418 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5419 }
5420 } else {
5421 sdsfree(server.masterhost);
5422 server.masterhost = sdsdup(c->argv[1]->ptr);
5423 server.masterport = atoi(c->argv[2]->ptr);
5424 if (server.master) freeClient(server.master);
5425 server.replstate = REDIS_REPL_CONNECT;
5426 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5427 server.masterhost, server.masterport);
5428 }
5429 addReply(c,shared.ok);
5430 }
5431
5432 /* ============================ Maxmemory directive ======================== */
5433
5434 /* This function gets called when 'maxmemory' is set on the config file to limit
5435 * the max memory used by the server, and we are out of memory.
5436 * This function will try to, in order:
5437 *
5438 * - Free objects from the free list
5439 * - Try to remove keys with an EXPIRE set
5440 *
5441 * It is not possible to free enough memory to reach used-memory < maxmemory
5442 * the server will start refusing commands that will enlarge even more the
5443 * memory usage.
5444 */
5445 static void freeMemoryIfNeeded(void) {
5446 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5447 if (listLength(server.objfreelist)) {
5448 robj *o;
5449
5450 listNode *head = listFirst(server.objfreelist);
5451 o = listNodeValue(head);
5452 listDelNode(server.objfreelist,head);
5453 zfree(o);
5454 } else {
5455 int j, k, freed = 0;
5456
5457 for (j = 0; j < server.dbnum; j++) {
5458 int minttl = -1;
5459 robj *minkey = NULL;
5460 struct dictEntry *de;
5461
5462 if (dictSize(server.db[j].expires)) {
5463 freed = 1;
5464 /* From a sample of three keys drop the one nearest to
5465 * the natural expire */
5466 for (k = 0; k < 3; k++) {
5467 time_t t;
5468
5469 de = dictGetRandomKey(server.db[j].expires);
5470 t = (time_t) dictGetEntryVal(de);
5471 if (minttl == -1 || t < minttl) {
5472 minkey = dictGetEntryKey(de);
5473 minttl = t;
5474 }
5475 }
5476 deleteKey(server.db+j,minkey);
5477 }
5478 }
5479 if (!freed) return; /* nothing to free... */
5480 }
5481 }
5482 }
5483
5484 /* ============================== Append Only file ========================== */
5485
5486 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5487 sds buf = sdsempty();
5488 int j;
5489 ssize_t nwritten;
5490 time_t now;
5491 robj *tmpargv[3];
5492
5493 /* The DB this command was targetting is not the same as the last command
5494 * we appendend. To issue a SELECT command is needed. */
5495 if (dictid != server.appendseldb) {
5496 char seldb[64];
5497
5498 snprintf(seldb,sizeof(seldb),"%d",dictid);
5499 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
5500 strlen(seldb),seldb);
5501 server.appendseldb = dictid;
5502 }
5503
5504 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5505 * EXPIREs into EXPIREATs calls */
5506 if (cmd->proc == expireCommand) {
5507 long when;
5508
5509 tmpargv[0] = createStringObject("EXPIREAT",8);
5510 tmpargv[1] = argv[1];
5511 incrRefCount(argv[1]);
5512 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5513 tmpargv[2] = createObject(REDIS_STRING,
5514 sdscatprintf(sdsempty(),"%ld",when));
5515 argv = tmpargv;
5516 }
5517
5518 /* Append the actual command */
5519 buf = sdscatprintf(buf,"*%d\r\n",argc);
5520 for (j = 0; j < argc; j++) {
5521 robj *o = argv[j];
5522
5523 if (o->encoding != REDIS_ENCODING_RAW)
5524 o = getDecodedObject(o);
5525 buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr));
5526 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5527 buf = sdscatlen(buf,"\r\n",2);
5528 if (o != argv[j])
5529 decrRefCount(o);
5530 }
5531
5532 /* Free the objects from the modified argv for EXPIREAT */
5533 if (cmd->proc == expireCommand) {
5534 for (j = 0; j < 3; j++)
5535 decrRefCount(argv[j]);
5536 }
5537
5538 /* We want to perform a single write. This should be guaranteed atomic
5539 * at least if the filesystem we are writing is a real physical one.
5540 * While this will save us against the server being killed I don't think
5541 * there is much to do about the whole server stopping for power problems
5542 * or alike */
5543 nwritten = write(server.appendfd,buf,sdslen(buf));
5544 if (nwritten != (signed)sdslen(buf)) {
5545 /* Ooops, we are in troubles. The best thing to do for now is
5546 * to simply exit instead to give the illusion that everything is
5547 * working as expected. */
5548 if (nwritten == -1) {
5549 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5550 } else {
5551 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5552 }
5553 exit(1);
5554 }
5555 now = time(NULL);
5556 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5557 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5558 now-server.lastfsync > 1))
5559 {
5560 fsync(server.appendfd); /* Let's try to get this data on the disk */
5561 server.lastfsync = now;
5562 }
5563 }
5564
5565 /* In Redis commands are always executed in the context of a client, so in
5566 * order to load the append only file we need to create a fake client. */
5567 static struct redisClient *createFakeClient(void) {
5568 struct redisClient *c = zmalloc(sizeof(*c));
5569
5570 selectDb(c,0);
5571 c->fd = -1;
5572 c->querybuf = sdsempty();
5573 c->argc = 0;
5574 c->argv = NULL;
5575 c->flags = 0;
5576 /* We set the fake client as a slave waiting for the synchronization
5577 * so that Redis will not try to send replies to this client. */
5578 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5579 c->reply = listCreate();
5580 listSetFreeMethod(c->reply,decrRefCount);
5581 listSetDupMethod(c->reply,dupClientReplyValue);
5582 return c;
5583 }
5584
5585 static void freeFakeClient(struct redisClient *c) {
5586 sdsfree(c->querybuf);
5587 listRelease(c->reply);
5588 zfree(c);
5589 }
5590
5591 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5592 * error (the append only file is zero-length) REDIS_ERR is returned. On
5593 * fatal error an error message is logged and the program exists. */
5594 int loadAppendOnlyFile(char *filename) {
5595 struct redisClient *fakeClient;
5596 FILE *fp = fopen(filename,"r");
5597 struct redis_stat sb;
5598
5599 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5600 return REDIS_ERR;
5601
5602 if (fp == NULL) {
5603 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5604 exit(1);
5605 }
5606
5607 fakeClient = createFakeClient();
5608 while(1) {
5609 int argc, j;
5610 unsigned long len;
5611 robj **argv;
5612 char buf[128];
5613 sds argsds;
5614 struct redisCommand *cmd;
5615
5616 if (fgets(buf,sizeof(buf),fp) == NULL) {
5617 if (feof(fp))
5618 break;
5619 else
5620 goto readerr;
5621 }
5622 if (buf[0] != '*') goto fmterr;
5623 argc = atoi(buf+1);
5624 argv = zmalloc(sizeof(robj*)*argc);
5625 for (j = 0; j < argc; j++) {
5626 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5627 if (buf[0] != '$') goto fmterr;
5628 len = strtol(buf+1,NULL,10);
5629 argsds = sdsnewlen(NULL,len);
5630 if (fread(argsds,len,1,fp) == 0) goto fmterr;
5631 argv[j] = createObject(REDIS_STRING,argsds);
5632 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5633 }
5634
5635 /* Command lookup */
5636 cmd = lookupCommand(argv[0]->ptr);
5637 if (!cmd) {
5638 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5639 exit(1);
5640 }
5641 /* Try object sharing and encoding */
5642 if (server.shareobjects) {
5643 int j;
5644 for(j = 1; j < argc; j++)
5645 argv[j] = tryObjectSharing(argv[j]);
5646 }
5647 if (cmd->flags & REDIS_CMD_BULK)
5648 tryObjectEncoding(argv[argc-1]);
5649 /* Run the command in the context of a fake client */
5650 fakeClient->argc = argc;
5651 fakeClient->argv = argv;
5652 cmd->proc(fakeClient);
5653 /* Discard the reply objects list from the fake client */
5654 while(listLength(fakeClient->reply))
5655 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5656 /* Clean up, ready for the next command */
5657 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5658 zfree(argv);
5659 }
5660 fclose(fp);
5661 freeFakeClient(fakeClient);
5662 return REDIS_OK;
5663
5664 readerr:
5665 if (feof(fp)) {
5666 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5667 } else {
5668 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5669 }
5670 exit(1);
5671 fmterr:
5672 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5673 exit(1);
5674 }
5675
5676 /* ================================= Debugging ============================== */
5677
5678 static void debugCommand(redisClient *c) {
5679 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
5680 *((char*)-1) = 'x';
5681 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
5682 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
5683 robj *key, *val;
5684
5685 if (!de) {
5686 addReply(c,shared.nokeyerr);
5687 return;
5688 }
5689 key = dictGetEntryKey(de);
5690 val = dictGetEntryVal(de);
5691 addReplySds(c,sdscatprintf(sdsempty(),
5692 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
5693 key, key->refcount, val, val->refcount, val->encoding));
5694 } else {
5695 addReplySds(c,sdsnew(
5696 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
5697 }
5698 }
5699
5700 /* =================================== Main! ================================ */
5701
5702 #ifdef __linux__
/* Return the integer found in the first line of
 * /proc/sys/vm/overcommit_memory, as converted by atoi(), or -1 if the
 * file can't be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return -1;
    if (fgets(line,64,procfile) != NULL) value = atoi(line);
    fclose(procfile);
    return value;
}
5716
5717 void linuxOvercommitMemoryWarning(void) {
5718 if (linuxOvercommitMemoryValue() == 0) {
5719 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5720 }
5721 }
5722 #endif /* __linux__ */
5723
5724 static void daemonize(void) {
5725 int fd;
5726 FILE *fp;
5727
5728 if (fork() != 0) exit(0); /* parent exits */
5729 setsid(); /* create a new session */
5730
5731 /* Every output goes to /dev/null. If Redis is daemonized but
5732 * the 'logfile' is set to 'stdout' in the configuration file
5733 * it will not log at all. */
5734 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5735 dup2(fd, STDIN_FILENO);
5736 dup2(fd, STDOUT_FILENO);
5737 dup2(fd, STDERR_FILENO);
5738 if (fd > STDERR_FILENO) close(fd);
5739 }
5740 /* Try to write the pid file */
5741 fp = fopen(server.pidfile,"w");
5742 if (fp) {
5743 fprintf(fp,"%d\n",getpid());
5744 fclose(fp);
5745 }
5746 }
5747
5748 int main(int argc, char **argv) {
5749 initServerConfig();
5750 if (argc == 2) {
5751 resetServerSaveParams();
5752 loadServerConfig(argv[1]);
5753 } else if (argc > 2) {
5754 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5755 exit(1);
5756 } else {
5757 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5758 }
5759 initServer();
5760 if (server.daemonize) daemonize();
5761 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5762 #ifdef __linux__
5763 linuxOvercommitMemoryWarning();
5764 #endif
5765 if (server.appendonly) {
5766 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
5767 redisLog(REDIS_NOTICE,"DB loaded from append only file");
5768 } else {
5769 if (rdbLoad(server.dbfilename) == REDIS_OK)
5770 redisLog(REDIS_NOTICE,"DB loaded from disk");
5771 }
5772 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5773 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5774 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5775 aeMain(server.el);
5776 aeDeleteEventLoop(server.el);
5777 return 0;
5778 }
5779
5780 /* ============================= Backtrace support ========================= */
5781
5782 #ifdef HAVE_BACKTRACE
5783 static char *findFuncName(void *pointer, unsigned long *offset);
5784
/* Return the instruction pointer saved in the signal context 'uc', or
 * NULL when the platform is not one of the supported ones. The field to
 * read is selected at compile time.
 *
 * NOTE(review): the Linux x86 test uses the spelling __X86_64__; compilers
 * usually define __x86_64__ instead, so on 64 bit Linux this probably
 * falls through to the NULL case -- confirm before relying on it. */
static void *getMcontextEip(ucontext_t *uc) {
    void *ip = NULL;

#if defined(__FreeBSD__)
    ip = (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    ip = (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
    ip = (void*) uc->uc_mcontext->__ss.__eip;
#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
  #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
    ip = (void*) uc->uc_mcontext->__ss.__rip;
  #else
    ip = (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
    ip = (void*) uc->uc_mcontext.gregs[REG_EIP];
#elif defined(__ia64__) /* Linux IA64 */
    ip = (void*) uc->uc_mcontext.sc_ip;
#endif
    return ip;
}
5806
5807 static void segvHandler(int sig, siginfo_t *info, void *secret) {
5808 void *trace[100];
5809 char **messages = NULL;
5810 int i, trace_size = 0;
5811 unsigned long offset=0;
5812 time_t uptime = time(NULL)-server.stat_starttime;
5813 ucontext_t *uc = (ucontext_t*) secret;
5814 REDIS_NOTUSED(info);
5815
5816 redisLog(REDIS_WARNING,
5817 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5818 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5819 "redis_version:%s; "
5820 "uptime_in_seconds:%d; "
5821 "connected_clients:%d; "
5822 "connected_slaves:%d; "
5823 "used_memory:%zu; "
5824 "changes_since_last_save:%lld; "
5825 "bgsave_in_progress:%d; "
5826 "last_save_time:%d; "
5827 "total_connections_received:%lld; "
5828 "total_commands_processed:%lld; "
5829 "role:%s;"
5830 ,REDIS_VERSION,
5831 uptime,
5832 listLength(server.clients)-listLength(server.slaves),
5833 listLength(server.slaves),
5834 server.usedmemory,
5835 server.dirty,
5836 server.bgsaveinprogress,
5837 server.lastsave,
5838 server.stat_numconnections,
5839 server.stat_numcommands,
5840 server.masterhost == NULL ? "master" : "slave"
5841 ));
5842
5843 trace_size = backtrace(trace, 100);
5844 /* overwrite sigaction with caller's address */
5845 if (getMcontextEip(uc) != NULL) {
5846 trace[1] = getMcontextEip(uc);
5847 }
5848 messages = backtrace_symbols(trace, trace_size);
5849
5850 for (i=1; i<trace_size; ++i) {
5851 char *fn = findFuncName(trace[i], &offset), *p;
5852
5853 p = strchr(messages[i],'+');
5854 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5855 redisLog(REDIS_WARNING,"%s", messages[i]);
5856 } else {
5857 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5858 }
5859 }
5860 free(messages);
5861 exit(0);
5862 }
5863
5864 static void setupSigSegvAction(void) {
5865 struct sigaction act;
5866
5867 sigemptyset (&act.sa_mask);
5868 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5869 * is used. Otherwise, sa_handler is used */
5870 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5871 act.sa_sigaction = segvHandler;
5872 sigaction (SIGSEGV, &act, NULL);
5873 sigaction (SIGBUS, &act, NULL);
5874 sigaction (SIGFPE, &act, NULL);
5875 sigaction (SIGILL, &act, NULL);
5876 sigaction (SIGBUS, &act, NULL);
5877 return;
5878 }
5879
5880 #include "staticsymbols.h"
5881 /* This function try to convert a pointer into a function name. It's used in
5882 * oreder to provide a backtrace under segmentation fault that's able to
5883 * display functions declared as static (otherwise the backtrace is useless). */
5884 static char *findFuncName(void *pointer, unsigned long *offset){
5885 int i, ret = -1;
5886 unsigned long off, minoff = 0;
5887
5888 /* Try to match against the Symbol with the smallest offset */
5889 for (i=0; symsTable[i].pointer; i++) {
5890 unsigned long lp = (unsigned long) pointer;
5891
5892 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
5893 off=lp-symsTable[i].pointer;
5894 if (ret < 0 || off < minoff) {
5895 minoff=off;
5896 ret=i;
5897 }
5898 }
5899 }
5900 if (ret == -1) return NULL;
5901 *offset = minoff;
5902 return symsTable[ret].name;
5903 }
5904 #else /* HAVE_BACKTRACE */
/* Without HAVE_BACKTRACE there is no crash handler to install: this empty
 * version only exists so that callers don't need to be conditional. */
static void setupSigSegvAction(void) {
    return;
}
5907 #endif /* HAVE_BACKTRACE */
5908
5909
5910
5911 /* The End */
5912
5913
5914