]> git.saurik.com Git - redis.git/blob - redis.c
ca88add3d8bc34c6b1ea096009ff7b6271b43050
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.07"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout (presumably seconds: client idle time is measured with time(NULL)) */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
#define REDIS_WRITEV_THRESHOLD 3
/* Max number of iovecs used for each writev call */
#define REDIS_WRITEV_IOVEC_COUNT 256

/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */

/* Command flags */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short, these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4

/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys would waste a lot of space, so the two most significant bits of the
 * first byte select how the length is encoded:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: a specially encoded object will follow. The six
 *           remaining bits specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */

/* Client flags */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_ASC 1
#define REDIS_SORT_DESC 2
#define REDIS_SORTKEY_MAX 1024

/* Log levels (indexes into the ".-*" marker string used by redisLog()) */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro... */
#define REDIS_NOTUSED(V) ((void) V)

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */

/* Append only defines */
#define APPENDFSYNC_NO 0
#define APPENDFSYNC_ALWAYS 1
#define APPENDFSYNC_EVERYSEC 2
200
201 /*================================= Data types ============================== */
202
/* A redis object, that is a type able to hold a string / list / set.
 * 'type' holds one of the REDIS_STRING..REDIS_HASH constants and
 * 'encoding' one of the REDIS_ENCODING_* constants. */
typedef struct redisObject {
    void *ptr;
    unsigned char type;
    unsigned char encoding;
    unsigned char notused[2];
    int refcount; /* see incrRefCount() / decrRefCount() */
} robj;

/* One logical database. 'dict' holds the keyspace; 'expires' presumably
 * maps keys to their expire time (see setExpire()/getExpire()) — confirm.
 * 'id' is the database number. */
typedef struct redisDb {
    dict *dict;
    dict *expires;
    int id;
} redisDb;

/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list (server.clients). */
typedef struct redisClient {
    int fd;
    redisDb *db;
    int dictid;
    sds querybuf;
    robj **argv, **mbargv;
    int argc, mbargc;
    int bulklen; /* bulk read len. -1 if not in bulk read mode */
    int multibulk; /* multi bulk command format active */
    list *reply;
    int sentlen;
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
    int slaveseldb; /* slave selected db, if this client is a slave */
    int authenticated; /* when requirepass is non-NULL */
    int replstate; /* replication state (REDIS_REPL_*) if this is a slave */
    int repldbfd; /* replication DB file descriptor */
    long repldboff; /* replication DB file offset */
    off_t repldbsize; /* replication DB file size */
} redisClient;

/* One "save <seconds> <changes>" rule. */
struct saveparam {
    time_t seconds;
    int changes;
};
245
/* Global server state structure */
struct redisServer {
    int port;
    int fd;
    redisDb *db; /* array of 'dbnum' databases */
    dict *sharingpool;
    unsigned int sharingpoolsize;
    long long dirty; /* changes to DB from the last save */
    list *clients;
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];
    aeEventLoop *el;
    int cronloops; /* number of times the cron function ran */
    list *objfreelist; /* A list of freed objects to avoid malloc() */
    time_t lastsave; /* Unix time of the last successful save */
    size_t usedmemory; /* Used memory, as returned by zmalloc_used_memory() */
    /* Fields used only for stats */
    time_t stat_starttime; /* server start time */
    long long stat_numcommands; /* number of processed commands */
    long long stat_numconnections; /* number of connections received */
    /* Configuration */
    int verbosity; /* redisLog() drops messages below this level */
    int glueoutputbuf;
    int maxidletime; /* idle time after which a normal client is closed */
    int dbnum;
    int daemonize;
    int appendonly;
    int appendfsync;
    time_t lastfsync;
    int appendfd; /* -1 when append only mode is not active */
    int appendseldb;
    char *pidfile;
    pid_t bgsavechildpid; /* -1 when no BGSAVE child is running */
    pid_t bgrewritechildpid; /* -1 when no BGREWRITEAOF child is running */
    sds bgrewritebuf; /* buffer taken by parent during append only rewrite */
    struct saveparam *saveparams;
    int saveparamslen;
    char *logfile; /* NULL means log to stdout */
    char *bindaddr;
    char *dbfilename;
    char *appendfilename;
    char *requirepass;
    int shareobjects;
    /* Replication related */
    int isslave;
    char *masterauth;
    char *masterhost;
    int masterport;
    redisClient *master; /* client that is master for this slave */
    int replstate;
    unsigned int maxclients;
    unsigned long maxmemory;
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to take this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
};
304
/* Every command is implemented by a function with this signature. */
typedef void redisCommandProc(redisClient *c);

/* One entry of the command table. 'arity' is the exact number of
 * arguments including the command name; a negative value -N (as used by
 * "del", "mget", ...) appears to mean "at least N" — confirm in
 * processCommand(). 'flags' is a mask of REDIS_CMD_* values. */
struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;
    int flags;
};

/* Name/address pair for a function symbol. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};

/* Element being sorted by SORT, with the key it is compared by. */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;
        robj *cmpobj;
    } u;
} redisSortObject;

/* A SORT operation (REDIS_SORT_GET, ...) and its pattern argument. */
typedef struct _redisSortOperation {
    int type;
    robj *pattern;
} redisSortOperation;

/* ZSETs use a specialized version of Skiplists */

typedef struct zskiplistNode {
    struct zskiplistNode **forward;
    struct zskiplistNode *backward;
    double score;
    robj *obj;
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

/* A sorted set is made of a dict plus a skiplist. */
typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;
350
/* Our shared "common" objects: preallocated replies and protocol fragments
 * reused across clients instead of being allocated per reply. */

struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;

/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */

static double R_Zero, R_PosInf, R_NegInf, R_Nan;
367
368 /*================================ Prototypes =============================== */
369
/* Internal helpers (objects, persistence, replication, expires, zsets, I/O) */
static void freeStringObject(robj *o);
static void freeListObject(robj *o);
static void freeSetObject(robj *o);
static void decrRefCount(void *o);
static robj *createObject(int type, void *ptr);
static void freeClient(redisClient *c);
static int rdbLoad(char *filename);
static void addReply(redisClient *c, robj *obj);
static void addReplySds(redisClient *c, sds s);
static void incrRefCount(robj *o);
static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);
static int tryObjectEncoding(robj *o);
static robj *getDecodedObject(robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static int deleteIfVolatile(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);
static void updateSlavesWaitingBgsave(int bgsaveerr);
static void freeMemoryIfNeeded(void);
static int processCommand(redisClient *c);
static void setupSigSegvAction(void);
static void rdbRemoveTempFile(pid_t childpid);
static void aofRemoveTempFile(pid_t childpid);
static size_t stringObjectLen(robj *o);
static void processInputBuffer(redisClient *c);
static zskiplist *zslCreate(void);
static void zslFree(zskiplist *zsl);
static void zslInsert(zskiplist *zsl, double score, robj *obj);
static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);

/* Command implementations, referenced by cmdTable[] */
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
static void setCommand(redisClient *c);
static void setnxCommand(redisClient *c);
static void getCommand(redisClient *c);
static void delCommand(redisClient *c);
static void existsCommand(redisClient *c);
static void incrCommand(redisClient *c);
static void decrCommand(redisClient *c);
static void incrbyCommand(redisClient *c);
static void decrbyCommand(redisClient *c);
static void selectCommand(redisClient *c);
static void randomkeyCommand(redisClient *c);
static void keysCommand(redisClient *c);
static void dbsizeCommand(redisClient *c);
static void lastsaveCommand(redisClient *c);
static void saveCommand(redisClient *c);
static void bgsaveCommand(redisClient *c);
static void bgrewriteaofCommand(redisClient *c);
static void shutdownCommand(redisClient *c);
static void moveCommand(redisClient *c);
static void renameCommand(redisClient *c);
static void renamenxCommand(redisClient *c);
static void lpushCommand(redisClient *c);
static void rpushCommand(redisClient *c);
static void lpopCommand(redisClient *c);
static void rpopCommand(redisClient *c);
static void llenCommand(redisClient *c);
static void lindexCommand(redisClient *c);
static void lrangeCommand(redisClient *c);
static void ltrimCommand(redisClient *c);
static void typeCommand(redisClient *c);
static void lsetCommand(redisClient *c);
static void saddCommand(redisClient *c);
static void sremCommand(redisClient *c);
static void smoveCommand(redisClient *c);
static void sismemberCommand(redisClient *c);
static void scardCommand(redisClient *c);
static void spopCommand(redisClient *c);
static void srandmemberCommand(redisClient *c);
static void sinterCommand(redisClient *c);
static void sinterstoreCommand(redisClient *c);
static void sunionCommand(redisClient *c);
static void sunionstoreCommand(redisClient *c);
static void sdiffCommand(redisClient *c);
static void sdiffstoreCommand(redisClient *c);
static void syncCommand(redisClient *c);
static void flushdbCommand(redisClient *c);
static void flushallCommand(redisClient *c);
static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c);
static void rpoplpushcommand(redisClient *c);
static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
static void expireCommand(redisClient *c);
static void expireatCommand(redisClient *c);
static void getsetCommand(redisClient *c);
static void ttlCommand(redisClient *c);
static void slaveofCommand(redisClient *c);
static void debugCommand(redisClient *c);
static void msetCommand(redisClient *c);
static void msetnxCommand(redisClient *c);
static void zaddCommand(redisClient *c);
static void zincrbyCommand(redisClient *c);
static void zrangeCommand(redisClient *c);
static void zrangebyscoreCommand(redisClient *c);
static void zrevrangeCommand(redisClient *c);
static void zcardCommand(redisClient *c);
static void zremCommand(redisClient *c);
static void zscoreCommand(redisClient *c);
static void zremrangebyscoreCommand(redisClient *c);
480
481 /*================================= Globals ================================= */
482
483 /* Global vars */
484 static struct redisServer server; /* server global state */
485 static struct redisCommand cmdTable[] = {
486 {"get",getCommand,2,REDIS_CMD_INLINE},
487 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
488 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
489 {"del",delCommand,-2,REDIS_CMD_INLINE},
490 {"exists",existsCommand,2,REDIS_CMD_INLINE},
491 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
492 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
493 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
494 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
495 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
496 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
497 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
498 {"llen",llenCommand,2,REDIS_CMD_INLINE},
499 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
500 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
501 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
502 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
503 {"lrem",lremCommand,4,REDIS_CMD_BULK},
504 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK},
505 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
506 {"srem",sremCommand,3,REDIS_CMD_BULK},
507 {"smove",smoveCommand,4,REDIS_CMD_BULK},
508 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
509 {"scard",scardCommand,2,REDIS_CMD_INLINE},
510 {"spop",spopCommand,2,REDIS_CMD_INLINE},
511 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
512 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
513 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
514 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
515 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
516 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
517 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
518 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
519 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
520 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
521 {"zrem",zremCommand,3,REDIS_CMD_BULK},
522 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
523 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
524 {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
525 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
526 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
527 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
528 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
529 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
530 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
531 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
532 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
533 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
534 {"select",selectCommand,2,REDIS_CMD_INLINE},
535 {"move",moveCommand,3,REDIS_CMD_INLINE},
536 {"rename",renameCommand,3,REDIS_CMD_INLINE},
537 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
538 {"expire",expireCommand,3,REDIS_CMD_INLINE},
539 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
540 {"keys",keysCommand,2,REDIS_CMD_INLINE},
541 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
542 {"auth",authCommand,2,REDIS_CMD_INLINE},
543 {"ping",pingCommand,1,REDIS_CMD_INLINE},
544 {"echo",echoCommand,2,REDIS_CMD_BULK},
545 {"save",saveCommand,1,REDIS_CMD_INLINE},
546 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
547 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
548 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
549 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
550 {"type",typeCommand,2,REDIS_CMD_INLINE},
551 {"sync",syncCommand,1,REDIS_CMD_INLINE},
552 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
553 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
554 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
555 {"info",infoCommand,1,REDIS_CMD_INLINE},
556 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
557 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
558 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
559 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
560 {NULL,NULL,0,0}
561 };
562
563 /*============================ Utility functions ============================ */
564
/* Glob-style pattern matching.
 *
 * Returns 1 if the first stringLen bytes of 'string' match the first
 * patternLen bytes of 'pattern', 0 otherwise. Supported syntax:
 *
 *   *       any sequence of characters, including the empty one
 *   ?       exactly one character
 *   [abc]   one character out of the set; [^abc] negates the set and
 *           [a-c] is an inclusive range (bounds may be given in any order)
 *   \x      the character x taken literally (also honoured inside [...])
 *
 * When 'nocase' is non-zero characters are compared case-insensitively.
 *
 * Every read is bounded by the explicit lengths, so neither buffer needs to
 * be NUL-terminated and 'pattern' may be a prefix of a longer buffer. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* "**" is equivalent to "*": collapse runs of stars, without
             * looking past the end of the pattern. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A character class always consumes exactly one character,
             * so it can never match an exhausted string. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back onto the last pattern
                     * byte so the pattern++ after the switch lands exactly
                     * at the end of the pattern. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    /* Escaped character inside the class. */
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            /* A literal character needs a character to compare against. */
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* The string is exhausted: only trailing stars may still
             * match. Stay within patternLen while skipping them. */
            while(patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
687
688 static void redisLog(int level, const char *fmt, ...) {
689 va_list ap;
690 FILE *fp;
691
692 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
693 if (!fp) return;
694
695 va_start(ap, fmt);
696 if (level >= server.verbosity) {
697 char *c = ".-*";
698 char buf[64];
699 time_t now;
700
701 now = time(NULL);
702 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
703 fprintf(fp,"%s %c ",buf,c[level]);
704 vfprintf(fp, fmt, ap);
705 fprintf(fp,"\n");
706 fflush(fp);
707 }
708 va_end(ap);
709
710 if (server.logfile) fclose(fp);
711 }
712
713 /*====================== Hash table type implementation ==================== */
714
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and Redis objects as values (objects can hold SDS strings,
 * lists, sets). */
718
/* dictType value destructor for plain buffers allocated with zmalloc().
 * 'privdata' is part of the dictType callback signature and is ignored. */
static void dictVanillaFree(void *privdata, void *val) {
    DICT_NOTUSED(privdata);
    zfree(val); /* release the raw allocation */
}
724
725 static int sdsDictKeyCompare(void *privdata, const void *key1,
726 const void *key2)
727 {
728 int l1,l2;
729 DICT_NOTUSED(privdata);
730
731 l1 = sdslen((sds)key1);
732 l2 = sdslen((sds)key2);
733 if (l1 != l2) return 0;
734 return memcmp(key1, key2, l1) == 0;
735 }
736
/* dictType destructor for robj keys/values: drops one reference to the
 * object via decrRefCount(). 'privdata' is ignored. */
static void dictRedisObjectDestructor(void *privdata, void *val) {
    DICT_NOTUSED(privdata);
    decrRefCount(val); /* the dict gives up its reference */
}
743
744 static int dictObjKeyCompare(void *privdata, const void *key1,
745 const void *key2)
746 {
747 const robj *o1 = key1, *o2 = key2;
748 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
749 }
750
751 static unsigned int dictObjHash(const void *key) {
752 const robj *o = key;
753 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
754 }
755
756 static int dictEncObjKeyCompare(void *privdata, const void *key1,
757 const void *key2)
758 {
759 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
760 int cmp;
761
762 o1 = getDecodedObject(o1);
763 o2 = getDecodedObject(o2);
764 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
765 decrRefCount(o1);
766 decrRefCount(o2);
767 return cmp;
768 }
769
770 static unsigned int dictEncObjHash(const void *key) {
771 robj *o = (robj*) key;
772
773 o = getDecodedObject(o);
774 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
775 decrRefCount(o);
776 return hash;
777 }
778
/* Sets: robj keys that may be integer encoded, hence the decoding
 * (dictEncObj*) hash and compare callbacks. Keys are released with
 * decrRefCount(); there is no value destructor, so values are not owned
 * by the dict. */
static dictType setDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL                       /* val destructor */
};

/* Sorted sets (the dict half of struct zset): same key handling as sets;
 * values are released with zfree() — presumably heap-allocated scores,
 * confirm in the zset commands. */
static dictType zsetDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictVanillaFree            /* val destructor */
};

/* Keys are robj compared/hashed by their sds ptr without decoding; both
 * keys and values are robj released with decrRefCount(). */
static dictType hashDictType = {
    dictObjHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictObjKeyCompare,          /* key compare */
    dictRedisObjectDestructor,  /* key destructor */
    dictRedisObjectDestructor   /* val destructor */
};
805
806 /* ========================= Random utility functions ======================= */
807
/* Out-of-memory handler.
 *
 * Redis generally does not try to recover from out of memory conditions:
 * it is not clear it would even be possible to report the condition to the
 * client, since the networking layer itself allocates its send buffers on
 * the heap. So the process reports 'msg' on stderr and aborts, which keeps
 * the rest of the code simpler to read. The short sleep gives the message
 * a chance to be seen before abort() terminates the process. */
static void oom(const char *msg)
{
    fprintf(stderr, "%s: Out of memory\n", msg);
    fflush(stderr);

    sleep(1);
    abort();
}
819
820 /* ====================== Redis server networking stuff ===================== */
821 static void closeTimedoutClients(void) {
822 redisClient *c;
823 listNode *ln;
824 time_t now = time(NULL);
825
826 listRewind(server.clients);
827 while ((ln = listYield(server.clients)) != NULL) {
828 c = listNodeValue(ln);
829 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
830 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
831 (now - c->lastinteraction > server.maxidletime)) {
832 redisLog(REDIS_DEBUG,"Closing idle client");
833 freeClient(c);
834 }
835 }
836 }
837
838 static int htNeedsResize(dict *dict) {
839 long long size, used;
840
841 size = dictSlots(dict);
842 used = dictSize(dict);
843 return (size && used && size > DICT_HT_INITIAL_SIZE &&
844 (used*100/size < REDIS_HT_MINFILL));
845 }
846
847 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
848 * we resize the hash table to save memory */
849 static void tryResizeHashTables(void) {
850 int j;
851
852 for (j = 0; j < server.dbnum; j++) {
853 if (htNeedsResize(server.db[j].dict)) {
854 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
855 dictResize(server.db[j].dict);
856 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
857 }
858 if (htNeedsResize(server.db[j].expires))
859 dictResize(server.db[j].expires);
860 }
861 }
862
863 /* A background saving child (BGSAVE) terminated its work. Handle this. */
864 void backgroundSaveDoneHandler(int statloc) {
865 int exitcode = WEXITSTATUS(statloc);
866 int bysignal = WIFSIGNALED(statloc);
867
868 if (!bysignal && exitcode == 0) {
869 redisLog(REDIS_NOTICE,
870 "Background saving terminated with success");
871 server.dirty = 0;
872 server.lastsave = time(NULL);
873 } else if (!bysignal && exitcode != 0) {
874 redisLog(REDIS_WARNING, "Background saving error");
875 } else {
876 redisLog(REDIS_WARNING,
877 "Background saving terminated by signal");
878 rdbRemoveTempFile(server.bgsavechildpid);
879 }
880 server.bgsavechildpid = -1;
881 /* Possibly there are slaves waiting for a BGSAVE in order to be served
882 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
883 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
884 }
885
886 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
887 * Handle this. */
888 void backgroundRewriteDoneHandler(int statloc) {
889 int exitcode = WEXITSTATUS(statloc);
890 int bysignal = WIFSIGNALED(statloc);
891
892 if (!bysignal && exitcode == 0) {
893 int fd;
894 char tmpfile[256];
895
896 redisLog(REDIS_NOTICE,
897 "Background append only file rewriting terminated with success");
898 /* Now it's time to flush the differences accumulated by the parent */
899 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
900 fd = open(tmpfile,O_WRONLY|O_APPEND);
901 if (fd == -1) {
902 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
903 goto cleanup;
904 }
905 /* Flush our data... */
906 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
907 (signed) sdslen(server.bgrewritebuf)) {
908 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
909 close(fd);
910 goto cleanup;
911 }
912 redisLog(REDIS_WARNING,"Parent diff flushed into the new append log file with success");
913 /* Now our work is to rename the temp file into the stable file. And
914 * switch the file descriptor used by the server for append only. */
915 if (rename(tmpfile,server.appendfilename) == -1) {
916 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
917 close(fd);
918 goto cleanup;
919 }
920 /* Mission completed... almost */
921 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
922 if (server.appendfd != -1) {
923 /* If append only is actually enabled... */
924 close(server.appendfd);
925 server.appendfd = fd;
926 fsync(fd);
927 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
928 } else {
929 /* If append only is disabled we just generate a dump in this
930 * format. Why not? */
931 close(fd);
932 }
933 } else if (!bysignal && exitcode != 0) {
934 redisLog(REDIS_WARNING, "Background append only file rewriting error");
935 } else {
936 redisLog(REDIS_WARNING,
937 "Background append only file rewriting terminated by signal");
938 }
939 cleanup:
940 sdsfree(server.bgrewritebuf);
941 server.bgrewritebuf = sdsempty();
942 aofRemoveTempFile(server.bgrewritechildpid);
943 server.bgrewritechildpid = -1;
944 }
945
946 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
947 int j, loops = server.cronloops++;
948 REDIS_NOTUSED(eventLoop);
949 REDIS_NOTUSED(id);
950 REDIS_NOTUSED(clientData);
951
952 /* Update the global state with the amount of used memory */
953 server.usedmemory = zmalloc_used_memory();
954
955 /* Show some info about non-empty databases */
956 for (j = 0; j < server.dbnum; j++) {
957 long long size, used, vkeys;
958
959 size = dictSlots(server.db[j].dict);
960 used = dictSize(server.db[j].dict);
961 vkeys = dictSize(server.db[j].expires);
962 if (!(loops % 5) && (used || vkeys)) {
963 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
964 /* dictPrintStats(server.dict); */
965 }
966 }
967
968 /* We don't want to resize the hash tables while a bacground saving
969 * is in progress: the saving child is created using fork() that is
970 * implemented with a copy-on-write semantic in most modern systems, so
971 * if we resize the HT while there is the saving child at work actually
972 * a lot of memory movements in the parent will cause a lot of pages
973 * copied. */
974 if (server.bgsavechildpid == -1) tryResizeHashTables();
975
976 /* Show information about connected clients */
977 if (!(loops % 5)) {
978 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
979 listLength(server.clients)-listLength(server.slaves),
980 listLength(server.slaves),
981 server.usedmemory,
982 dictSize(server.sharingpool));
983 }
984
985 /* Close connections of timedout clients */
986 if (server.maxidletime && !(loops % 10))
987 closeTimedoutClients();
988
989 /* Check if a background saving or AOF rewrite in progress terminated */
990 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
991 int statloc;
992 pid_t pid;
993
994 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
995 if (pid == server.bgsavechildpid) {
996 backgroundSaveDoneHandler(statloc);
997 } else {
998 backgroundRewriteDoneHandler(statloc);
999 }
1000 }
1001 } else {
1002 /* If there is not a background saving in progress check if
1003 * we have to save now */
1004 time_t now = time(NULL);
1005 for (j = 0; j < server.saveparamslen; j++) {
1006 struct saveparam *sp = server.saveparams+j;
1007
1008 if (server.dirty >= sp->changes &&
1009 now-server.lastsave > sp->seconds) {
1010 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1011 sp->changes, sp->seconds);
1012 rdbSaveBackground(server.dbfilename);
1013 break;
1014 }
1015 }
1016 }
1017
1018 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1019 * will use few CPU cycles if there are few expiring keys, otherwise
1020 * it will get more aggressive to avoid that too much memory is used by
1021 * keys that can be removed from the keyspace. */
1022 for (j = 0; j < server.dbnum; j++) {
1023 int expired;
1024 redisDb *db = server.db+j;
1025
1026 /* Continue to expire if at the end of the cycle more than 25%
1027 * of the keys were expired. */
1028 do {
1029 int num = dictSize(db->expires);
1030 time_t now = time(NULL);
1031
1032 expired = 0;
1033 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1034 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1035 while (num--) {
1036 dictEntry *de;
1037 time_t t;
1038
1039 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1040 t = (time_t) dictGetEntryVal(de);
1041 if (now > t) {
1042 deleteKey(db,dictGetEntryKey(de));
1043 expired++;
1044 }
1045 }
1046 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
1047 }
1048
1049 /* Check if we should connect to a MASTER */
1050 if (server.replstate == REDIS_REPL_CONNECT) {
1051 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1052 if (syncWithMaster() == REDIS_OK) {
1053 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1054 }
1055 }
1056 return 1000;
1057 }
1058
1059 static void createSharedObjects(void) {
1060 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1061 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1062 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
1063 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1064 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1065 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1066 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1067 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1068 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1069 /* no such key */
1070 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1071 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1072 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1073 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1074 "-ERR no such key\r\n"));
1075 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1076 "-ERR syntax error\r\n"));
1077 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1078 "-ERR source and destination objects are the same\r\n"));
1079 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1080 "-ERR index out of range\r\n"));
1081 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1082 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1083 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1084 shared.select0 = createStringObject("select 0\r\n",10);
1085 shared.select1 = createStringObject("select 1\r\n",10);
1086 shared.select2 = createStringObject("select 2\r\n",10);
1087 shared.select3 = createStringObject("select 3\r\n",10);
1088 shared.select4 = createStringObject("select 4\r\n",10);
1089 shared.select5 = createStringObject("select 5\r\n",10);
1090 shared.select6 = createStringObject("select 6\r\n",10);
1091 shared.select7 = createStringObject("select 7\r\n",10);
1092 shared.select8 = createStringObject("select 8\r\n",10);
1093 shared.select9 = createStringObject("select 9\r\n",10);
1094 }
1095
1096 static void appendServerSaveParams(time_t seconds, int changes) {
1097 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1098 server.saveparams[server.saveparamslen].seconds = seconds;
1099 server.saveparams[server.saveparamslen].changes = changes;
1100 server.saveparamslen++;
1101 }
1102
1103 static void resetServerSaveParams() {
1104 zfree(server.saveparams);
1105 server.saveparams = NULL;
1106 server.saveparamslen = 0;
1107 }
1108
1109 static void initServerConfig() {
1110 server.dbnum = REDIS_DEFAULT_DBNUM;
1111 server.port = REDIS_SERVERPORT;
1112 server.verbosity = REDIS_DEBUG;
1113 server.maxidletime = REDIS_MAXIDLETIME;
1114 server.saveparams = NULL;
1115 server.logfile = NULL; /* NULL = log on standard output */
1116 server.bindaddr = NULL;
1117 server.glueoutputbuf = 1;
1118 server.daemonize = 0;
1119 server.appendonly = 0;
1120 server.appendfsync = APPENDFSYNC_ALWAYS;
1121 server.lastfsync = time(NULL);
1122 server.appendfd = -1;
1123 server.appendseldb = -1; /* Make sure the first time will not match */
1124 server.pidfile = "/var/run/redis.pid";
1125 server.dbfilename = "dump.rdb";
1126 server.appendfilename = "appendonly.aof";
1127 server.requirepass = NULL;
1128 server.shareobjects = 0;
1129 server.sharingpoolsize = 1024;
1130 server.maxclients = 0;
1131 server.maxmemory = 0;
1132 resetServerSaveParams();
1133
1134 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1135 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1136 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1137 /* Replication related */
1138 server.isslave = 0;
1139 server.masterauth = NULL;
1140 server.masterhost = NULL;
1141 server.masterport = 6379;
1142 server.master = NULL;
1143 server.replstate = REDIS_REPL_NONE;
1144
1145 /* Double constants initialization */
1146 R_Zero = 0.0;
1147 R_PosInf = 1.0/R_Zero;
1148 R_NegInf = -1.0/R_Zero;
1149 R_Nan = R_Zero/R_Zero;
1150 }
1151
1152 static void initServer() {
1153 int j;
1154
1155 signal(SIGHUP, SIG_IGN);
1156 signal(SIGPIPE, SIG_IGN);
1157 setupSigSegvAction();
1158
1159 server.clients = listCreate();
1160 server.slaves = listCreate();
1161 server.monitors = listCreate();
1162 server.objfreelist = listCreate();
1163 createSharedObjects();
1164 server.el = aeCreateEventLoop();
1165 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1166 server.sharingpool = dictCreate(&setDictType,NULL);
1167 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1168 if (server.fd == -1) {
1169 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1170 exit(1);
1171 }
1172 for (j = 0; j < server.dbnum; j++) {
1173 server.db[j].dict = dictCreate(&hashDictType,NULL);
1174 server.db[j].expires = dictCreate(&setDictType,NULL);
1175 server.db[j].id = j;
1176 }
1177 server.cronloops = 0;
1178 server.bgsavechildpid = -1;
1179 server.bgrewritechildpid = -1;
1180 server.bgrewritebuf = sdsempty();
1181 server.lastsave = time(NULL);
1182 server.dirty = 0;
1183 server.usedmemory = 0;
1184 server.stat_numcommands = 0;
1185 server.stat_numconnections = 0;
1186 server.stat_starttime = time(NULL);
1187 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1188
1189 if (server.appendonly) {
1190 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1191 if (server.appendfd == -1) {
1192 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1193 strerror(errno));
1194 exit(1);
1195 }
1196 }
1197 }
1198
1199 /* Empty the whole database */
1200 static long long emptyDb() {
1201 int j;
1202 long long removed = 0;
1203
1204 for (j = 0; j < server.dbnum; j++) {
1205 removed += dictSize(server.db[j].dict);
1206 dictEmpty(server.db[j].dict);
1207 dictEmpty(server.db[j].expires);
1208 }
1209 return removed;
1210 }
1211
/* Parse a config-file boolean. Returns 1 for "yes", 0 for "no"
 * (both case-insensitive) and -1 for anything else. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0)
        return 1;
    if (strcasecmp(s,"no") == 0)
        return 0;
    return -1;
}
1217
1218 /* I agree, this is a very rudimental way to load a configuration...
1219 will improve later if the config gets more complex */
1220 static void loadServerConfig(char *filename) {
1221 FILE *fp;
1222 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1223 int linenum = 0;
1224 sds line = NULL;
1225
1226 if (filename[0] == '-' && filename[1] == '\0')
1227 fp = stdin;
1228 else {
1229 if ((fp = fopen(filename,"r")) == NULL) {
1230 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1231 exit(1);
1232 }
1233 }
1234
1235 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1236 sds *argv;
1237 int argc, j;
1238
1239 linenum++;
1240 line = sdsnew(buf);
1241 line = sdstrim(line," \t\r\n");
1242
1243 /* Skip comments and blank lines*/
1244 if (line[0] == '#' || line[0] == '\0') {
1245 sdsfree(line);
1246 continue;
1247 }
1248
1249 /* Split into arguments */
1250 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1251 sdstolower(argv[0]);
1252
1253 /* Execute config directives */
1254 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1255 server.maxidletime = atoi(argv[1]);
1256 if (server.maxidletime < 0) {
1257 err = "Invalid timeout value"; goto loaderr;
1258 }
1259 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1260 server.port = atoi(argv[1]);
1261 if (server.port < 1 || server.port > 65535) {
1262 err = "Invalid port"; goto loaderr;
1263 }
1264 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1265 server.bindaddr = zstrdup(argv[1]);
1266 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1267 int seconds = atoi(argv[1]);
1268 int changes = atoi(argv[2]);
1269 if (seconds < 1 || changes < 0) {
1270 err = "Invalid save parameters"; goto loaderr;
1271 }
1272 appendServerSaveParams(seconds,changes);
1273 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1274 if (chdir(argv[1]) == -1) {
1275 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1276 argv[1], strerror(errno));
1277 exit(1);
1278 }
1279 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1280 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1281 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1282 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1283 else {
1284 err = "Invalid log level. Must be one of debug, notice, warning";
1285 goto loaderr;
1286 }
1287 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1288 FILE *logfp;
1289
1290 server.logfile = zstrdup(argv[1]);
1291 if (!strcasecmp(server.logfile,"stdout")) {
1292 zfree(server.logfile);
1293 server.logfile = NULL;
1294 }
1295 if (server.logfile) {
1296 /* Test if we are able to open the file. The server will not
1297 * be able to abort just for this problem later... */
1298 logfp = fopen(server.logfile,"a");
1299 if (logfp == NULL) {
1300 err = sdscatprintf(sdsempty(),
1301 "Can't open the log file: %s", strerror(errno));
1302 goto loaderr;
1303 }
1304 fclose(logfp);
1305 }
1306 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1307 server.dbnum = atoi(argv[1]);
1308 if (server.dbnum < 1) {
1309 err = "Invalid number of databases"; goto loaderr;
1310 }
1311 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1312 server.maxclients = atoi(argv[1]);
1313 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1314 server.maxmemory = strtoll(argv[1], NULL, 10);
1315 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1316 server.masterhost = sdsnew(argv[1]);
1317 server.masterport = atoi(argv[2]);
1318 server.replstate = REDIS_REPL_CONNECT;
1319 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1320 server.masterauth = zstrdup(argv[1]);
1321 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1322 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1323 err = "argument must be 'yes' or 'no'"; goto loaderr;
1324 }
1325 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1326 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1327 err = "argument must be 'yes' or 'no'"; goto loaderr;
1328 }
1329 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1330 server.sharingpoolsize = atoi(argv[1]);
1331 if (server.sharingpoolsize < 1) {
1332 err = "invalid object sharing pool size"; goto loaderr;
1333 }
1334 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1335 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1336 err = "argument must be 'yes' or 'no'"; goto loaderr;
1337 }
1338 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1339 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1340 err = "argument must be 'yes' or 'no'"; goto loaderr;
1341 }
1342 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1343 if (!strcasecmp(argv[1],"no")) {
1344 server.appendfsync = APPENDFSYNC_NO;
1345 } else if (!strcasecmp(argv[1],"always")) {
1346 server.appendfsync = APPENDFSYNC_ALWAYS;
1347 } else if (!strcasecmp(argv[1],"everysec")) {
1348 server.appendfsync = APPENDFSYNC_EVERYSEC;
1349 } else {
1350 err = "argument must be 'no', 'always' or 'everysec'";
1351 goto loaderr;
1352 }
1353 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1354 server.requirepass = zstrdup(argv[1]);
1355 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1356 server.pidfile = zstrdup(argv[1]);
1357 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1358 server.dbfilename = zstrdup(argv[1]);
1359 } else {
1360 err = "Bad directive or wrong number of arguments"; goto loaderr;
1361 }
1362 for (j = 0; j < argc; j++)
1363 sdsfree(argv[j]);
1364 zfree(argv);
1365 sdsfree(line);
1366 }
1367 if (fp != stdin) fclose(fp);
1368 return;
1369
1370 loaderr:
1371 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1372 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1373 fprintf(stderr, ">>> '%s'\n", line);
1374 fprintf(stderr, "%s\n", err);
1375 exit(1);
1376 }
1377
1378 static void freeClientArgv(redisClient *c) {
1379 int j;
1380
1381 for (j = 0; j < c->argc; j++)
1382 decrRefCount(c->argv[j]);
1383 for (j = 0; j < c->mbargc; j++)
1384 decrRefCount(c->mbargv[j]);
1385 c->argc = 0;
1386 c->mbargc = 0;
1387 }
1388
1389 static void freeClient(redisClient *c) {
1390 listNode *ln;
1391
1392 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1393 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1394 sdsfree(c->querybuf);
1395 listRelease(c->reply);
1396 freeClientArgv(c);
1397 close(c->fd);
1398 ln = listSearchKey(server.clients,c);
1399 assert(ln != NULL);
1400 listDelNode(server.clients,ln);
1401 if (c->flags & REDIS_SLAVE) {
1402 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1403 close(c->repldbfd);
1404 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1405 ln = listSearchKey(l,c);
1406 assert(ln != NULL);
1407 listDelNode(l,ln);
1408 }
1409 if (c->flags & REDIS_MASTER) {
1410 server.master = NULL;
1411 server.replstate = REDIS_REPL_CONNECT;
1412 }
1413 zfree(c->argv);
1414 zfree(c->mbargv);
1415 zfree(c);
1416 }
1417
1418 #define GLUEREPLY_UP_TO (1024)
1419 static void glueReplyBuffersIfNeeded(redisClient *c) {
1420 int copylen = 0;
1421 char buf[GLUEREPLY_UP_TO];
1422 listNode *ln;
1423 robj *o;
1424
1425 listRewind(c->reply);
1426 while((ln = listYield(c->reply))) {
1427 int objlen;
1428
1429 o = ln->value;
1430 objlen = sdslen(o->ptr);
1431 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1432 memcpy(buf+copylen,o->ptr,objlen);
1433 copylen += objlen;
1434 listDelNode(c->reply,ln);
1435 } else {
1436 if (copylen == 0) return;
1437 break;
1438 }
1439 }
1440 /* Now the output buffer is empty, add the new single element */
1441 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1442 listAddNodeHead(c->reply,o);
1443 }
1444
1445 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1446 redisClient *c = privdata;
1447 int nwritten = 0, totwritten = 0, objlen;
1448 robj *o;
1449 REDIS_NOTUSED(el);
1450 REDIS_NOTUSED(mask);
1451
1452 /* Use writev() if we have enough buffers to send */
1453 if (!server.glueoutputbuf &&
1454 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1455 !(c->flags & REDIS_MASTER))
1456 {
1457 sendReplyToClientWritev(el, fd, privdata, mask);
1458 return;
1459 }
1460
1461 while(listLength(c->reply)) {
1462 if (server.glueoutputbuf && listLength(c->reply) > 1)
1463 glueReplyBuffersIfNeeded(c);
1464
1465 o = listNodeValue(listFirst(c->reply));
1466 objlen = sdslen(o->ptr);
1467
1468 if (objlen == 0) {
1469 listDelNode(c->reply,listFirst(c->reply));
1470 continue;
1471 }
1472
1473 if (c->flags & REDIS_MASTER) {
1474 /* Don't reply to a master */
1475 nwritten = objlen - c->sentlen;
1476 } else {
1477 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1478 if (nwritten <= 0) break;
1479 }
1480 c->sentlen += nwritten;
1481 totwritten += nwritten;
1482 /* If we fully sent the object on head go to the next one */
1483 if (c->sentlen == objlen) {
1484 listDelNode(c->reply,listFirst(c->reply));
1485 c->sentlen = 0;
1486 }
1487 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1488 * bytes, in a single threaded server it's a good idea to serve
1489 * other clients as well, even if a very large request comes from
1490 * super fast link that is always able to accept data (in real world
1491 * scenario think about 'KEYS *' against the loopback interfae) */
1492 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1493 }
1494 if (nwritten == -1) {
1495 if (errno == EAGAIN) {
1496 nwritten = 0;
1497 } else {
1498 redisLog(REDIS_DEBUG,
1499 "Error writing to client: %s", strerror(errno));
1500 freeClient(c);
1501 return;
1502 }
1503 }
1504 if (totwritten > 0) c->lastinteraction = time(NULL);
1505 if (listLength(c->reply) == 0) {
1506 c->sentlen = 0;
1507 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1508 }
1509 }
1510
1511 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1512 {
1513 redisClient *c = privdata;
1514 int nwritten = 0, totwritten = 0, objlen, willwrite;
1515 robj *o;
1516 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1517 int offset, ion = 0;
1518 REDIS_NOTUSED(el);
1519 REDIS_NOTUSED(mask);
1520
1521 listNode *node;
1522 while (listLength(c->reply)) {
1523 offset = c->sentlen;
1524 ion = 0;
1525 willwrite = 0;
1526
1527 /* fill-in the iov[] array */
1528 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1529 o = listNodeValue(node);
1530 objlen = sdslen(o->ptr);
1531
1532 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1533 break;
1534
1535 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1536 break; /* no more iovecs */
1537
1538 iov[ion].iov_base = ((char*)o->ptr) + offset;
1539 iov[ion].iov_len = objlen - offset;
1540 willwrite += objlen - offset;
1541 offset = 0; /* just for the first item */
1542 ion++;
1543 }
1544
1545 if(willwrite == 0)
1546 break;
1547
1548 /* write all collected blocks at once */
1549 if((nwritten = writev(fd, iov, ion)) < 0) {
1550 if (errno != EAGAIN) {
1551 redisLog(REDIS_DEBUG,
1552 "Error writing to client: %s", strerror(errno));
1553 freeClient(c);
1554 return;
1555 }
1556 break;
1557 }
1558
1559 totwritten += nwritten;
1560 offset = c->sentlen;
1561
1562 /* remove written robjs from c->reply */
1563 while (nwritten && listLength(c->reply)) {
1564 o = listNodeValue(listFirst(c->reply));
1565 objlen = sdslen(o->ptr);
1566
1567 if(nwritten >= objlen - offset) {
1568 listDelNode(c->reply, listFirst(c->reply));
1569 nwritten -= objlen - offset;
1570 c->sentlen = 0;
1571 } else {
1572 /* partial write */
1573 c->sentlen += nwritten;
1574 break;
1575 }
1576 offset = 0;
1577 }
1578 }
1579
1580 if (totwritten > 0)
1581 c->lastinteraction = time(NULL);
1582
1583 if (listLength(c->reply) == 0) {
1584 c->sentlen = 0;
1585 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1586 }
1587 }
1588
1589 static struct redisCommand *lookupCommand(char *name) {
1590 int j = 0;
1591 while(cmdTable[j].name != NULL) {
1592 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1593 j++;
1594 }
1595 return NULL;
1596 }
1597
1598 /* resetClient prepare the client to process the next command */
1599 static void resetClient(redisClient *c) {
1600 freeClientArgv(c);
1601 c->bulklen = -1;
1602 c->multibulk = 0;
1603 }
1604
1605 /* If this function gets called we already read a whole
1606 * command, argments are in the client argv/argc fields.
1607 * processCommand() execute the command or prepare the
1608 * server for a bulk read from the client.
1609 *
1610 * If 1 is returned the client is still alive and valid and
1611 * and other operations can be performed by the caller. Otherwise
1612 * if 0 is returned the client was destroied (i.e. after QUIT). */
1613 static int processCommand(redisClient *c) {
1614 struct redisCommand *cmd;
1615 long long dirty;
1616
1617 /* Free some memory if needed (maxmemory setting) */
1618 if (server.maxmemory) freeMemoryIfNeeded();
1619
1620 /* Handle the multi bulk command type. This is an alternative protocol
1621 * supported by Redis in order to receive commands that are composed of
1622 * multiple binary-safe "bulk" arguments. The latency of processing is
1623 * a bit higher but this allows things like multi-sets, so if this
1624 * protocol is used only for MSET and similar commands this is a big win. */
1625 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1626 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1627 if (c->multibulk <= 0) {
1628 resetClient(c);
1629 return 1;
1630 } else {
1631 decrRefCount(c->argv[c->argc-1]);
1632 c->argc--;
1633 return 1;
1634 }
1635 } else if (c->multibulk) {
1636 if (c->bulklen == -1) {
1637 if (((char*)c->argv[0]->ptr)[0] != '$') {
1638 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1639 resetClient(c);
1640 return 1;
1641 } else {
1642 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1643 decrRefCount(c->argv[0]);
1644 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1645 c->argc--;
1646 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1647 resetClient(c);
1648 return 1;
1649 }
1650 c->argc--;
1651 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1652 return 1;
1653 }
1654 } else {
1655 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1656 c->mbargv[c->mbargc] = c->argv[0];
1657 c->mbargc++;
1658 c->argc--;
1659 c->multibulk--;
1660 if (c->multibulk == 0) {
1661 robj **auxargv;
1662 int auxargc;
1663
1664 /* Here we need to swap the multi-bulk argc/argv with the
1665 * normal argc/argv of the client structure. */
1666 auxargv = c->argv;
1667 c->argv = c->mbargv;
1668 c->mbargv = auxargv;
1669
1670 auxargc = c->argc;
1671 c->argc = c->mbargc;
1672 c->mbargc = auxargc;
1673
1674 /* We need to set bulklen to something different than -1
1675 * in order for the code below to process the command without
1676 * to try to read the last argument of a bulk command as
1677 * a special argument. */
1678 c->bulklen = 0;
1679 /* continue below and process the command */
1680 } else {
1681 c->bulklen = -1;
1682 return 1;
1683 }
1684 }
1685 }
1686 /* -- end of multi bulk commands processing -- */
1687
1688 /* The QUIT command is handled as a special case. Normal command
1689 * procs are unable to close the client connection safely */
1690 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1691 freeClient(c);
1692 return 0;
1693 }
1694 cmd = lookupCommand(c->argv[0]->ptr);
1695 if (!cmd) {
1696 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1697 resetClient(c);
1698 return 1;
1699 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1700 (c->argc < -cmd->arity)) {
1701 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1702 resetClient(c);
1703 return 1;
1704 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1705 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1706 resetClient(c);
1707 return 1;
1708 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1709 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1710
1711 decrRefCount(c->argv[c->argc-1]);
1712 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1713 c->argc--;
1714 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1715 resetClient(c);
1716 return 1;
1717 }
1718 c->argc--;
1719 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1720 /* It is possible that the bulk read is already in the
1721 * buffer. Check this condition and handle it accordingly.
1722 * This is just a fast path, alternative to call processInputBuffer().
1723 * It's a good idea since the code is small and this condition
1724 * happens most of the times. */
1725 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1726 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1727 c->argc++;
1728 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1729 } else {
1730 return 1;
1731 }
1732 }
1733 /* Let's try to share objects on the command arguments vector */
1734 if (server.shareobjects) {
1735 int j;
1736 for(j = 1; j < c->argc; j++)
1737 c->argv[j] = tryObjectSharing(c->argv[j]);
1738 }
1739 /* Let's try to encode the bulk object to save space. */
1740 if (cmd->flags & REDIS_CMD_BULK)
1741 tryObjectEncoding(c->argv[c->argc-1]);
1742
1743 /* Check if the user is authenticated */
1744 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1745 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1746 resetClient(c);
1747 return 1;
1748 }
1749
1750 /* Exec the command */
1751 dirty = server.dirty;
1752 cmd->proc(c);
1753 if (server.appendonly && server.dirty-dirty)
1754 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1755 if (server.dirty-dirty && listLength(server.slaves))
1756 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1757 if (listLength(server.monitors))
1758 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1759 server.stat_numcommands++;
1760
1761 /* Prepare the client for the next command */
1762 if (c->flags & REDIS_CLOSE) {
1763 freeClient(c);
1764 return 0;
1765 }
1766 resetClient(c);
1767 return 1;
1768 }
1769
1770 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1771 listNode *ln;
1772 int outc = 0, j;
1773 robj **outv;
1774 /* (args*2)+1 is enough room for args, spaces, newlines */
1775 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1776
1777 if (argc <= REDIS_STATIC_ARGS) {
1778 outv = static_outv;
1779 } else {
1780 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1781 }
1782
1783 for (j = 0; j < argc; j++) {
1784 if (j != 0) outv[outc++] = shared.space;
1785 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1786 robj *lenobj;
1787
1788 lenobj = createObject(REDIS_STRING,
1789 sdscatprintf(sdsempty(),"%d\r\n",
1790 stringObjectLen(argv[j])));
1791 lenobj->refcount = 0;
1792 outv[outc++] = lenobj;
1793 }
1794 outv[outc++] = argv[j];
1795 }
1796 outv[outc++] = shared.crlf;
1797
1798 /* Increment all the refcounts at start and decrement at end in order to
1799 * be sure to free objects if there is no slave in a replication state
1800 * able to be feed with commands */
1801 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1802 listRewind(slaves);
1803 while((ln = listYield(slaves))) {
1804 redisClient *slave = ln->value;
1805
1806 /* Don't feed slaves that are still waiting for BGSAVE to start */
1807 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1808
1809 /* Feed all the other slaves, MONITORs and so on */
1810 if (slave->slaveseldb != dictid) {
1811 robj *selectcmd;
1812
1813 switch(dictid) {
1814 case 0: selectcmd = shared.select0; break;
1815 case 1: selectcmd = shared.select1; break;
1816 case 2: selectcmd = shared.select2; break;
1817 case 3: selectcmd = shared.select3; break;
1818 case 4: selectcmd = shared.select4; break;
1819 case 5: selectcmd = shared.select5; break;
1820 case 6: selectcmd = shared.select6; break;
1821 case 7: selectcmd = shared.select7; break;
1822 case 8: selectcmd = shared.select8; break;
1823 case 9: selectcmd = shared.select9; break;
1824 default:
1825 selectcmd = createObject(REDIS_STRING,
1826 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1827 selectcmd->refcount = 0;
1828 break;
1829 }
1830 addReply(slave,selectcmd);
1831 slave->slaveseldb = dictid;
1832 }
1833 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1834 }
1835 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1836 if (outv != static_outv) zfree(outv);
1837 }
1838
1839 static void processInputBuffer(redisClient *c) {
1840 again:
1841 if (c->bulklen == -1) {
1842 /* Read the first line of the query */
1843 char *p = strchr(c->querybuf,'\n');
1844 size_t querylen;
1845
1846 if (p) {
1847 sds query, *argv;
1848 int argc, j;
1849
1850 query = c->querybuf;
1851 c->querybuf = sdsempty();
1852 querylen = 1+(p-(query));
1853 if (sdslen(query) > querylen) {
1854 /* leave data after the first line of the query in the buffer */
1855 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1856 }
1857 *p = '\0'; /* remove "\n" */
1858 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1859 sdsupdatelen(query);
1860
1861 /* Now we can split the query in arguments */
1862 if (sdslen(query) == 0) {
1863 /* Ignore empty query */
1864 sdsfree(query);
1865 return;
1866 }
1867 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1868 sdsfree(query);
1869
1870 if (c->argv) zfree(c->argv);
1871 c->argv = zmalloc(sizeof(robj*)*argc);
1872
1873 for (j = 0; j < argc; j++) {
1874 if (sdslen(argv[j])) {
1875 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1876 c->argc++;
1877 } else {
1878 sdsfree(argv[j]);
1879 }
1880 }
1881 zfree(argv);
1882 /* Execute the command. If the client is still valid
1883 * after processCommand() return and there is something
1884 * on the query buffer try to process the next command. */
1885 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1886 return;
1887 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1888 redisLog(REDIS_DEBUG, "Client protocol error");
1889 freeClient(c);
1890 return;
1891 }
1892 } else {
1893 /* Bulk read handling. Note that if we are at this point
1894 the client already sent a command terminated with a newline,
1895 we are reading the bulk data that is actually the last
1896 argument of the command. */
1897 int qbl = sdslen(c->querybuf);
1898
1899 if (c->bulklen <= qbl) {
1900 /* Copy everything but the final CRLF as final argument */
1901 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1902 c->argc++;
1903 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1904 /* Process the command. If the client is still valid after
1905 * the processing and there is more data in the buffer
1906 * try to parse it. */
1907 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1908 return;
1909 }
1910 }
1911 }
1912
1913 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1914 redisClient *c = (redisClient*) privdata;
1915 char buf[REDIS_IOBUF_LEN];
1916 int nread;
1917 REDIS_NOTUSED(el);
1918 REDIS_NOTUSED(mask);
1919
1920 nread = read(fd, buf, REDIS_IOBUF_LEN);
1921 if (nread == -1) {
1922 if (errno == EAGAIN) {
1923 nread = 0;
1924 } else {
1925 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1926 freeClient(c);
1927 return;
1928 }
1929 } else if (nread == 0) {
1930 redisLog(REDIS_DEBUG, "Client closed connection");
1931 freeClient(c);
1932 return;
1933 }
1934 if (nread) {
1935 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1936 c->lastinteraction = time(NULL);
1937 } else {
1938 return;
1939 }
1940 processInputBuffer(c);
1941 }
1942
1943 static int selectDb(redisClient *c, int id) {
1944 if (id < 0 || id >= server.dbnum)
1945 return REDIS_ERR;
1946 c->db = &server.db[id];
1947 return REDIS_OK;
1948 }
1949
1950 static void *dupClientReplyValue(void *o) {
1951 incrRefCount((robj*)o);
1952 return 0;
1953 }
1954
1955 static redisClient *createClient(int fd) {
1956 redisClient *c = zmalloc(sizeof(*c));
1957
1958 anetNonBlock(NULL,fd);
1959 anetTcpNoDelay(NULL,fd);
1960 if (!c) return NULL;
1961 selectDb(c,0);
1962 c->fd = fd;
1963 c->querybuf = sdsempty();
1964 c->argc = 0;
1965 c->argv = NULL;
1966 c->bulklen = -1;
1967 c->multibulk = 0;
1968 c->mbargc = 0;
1969 c->mbargv = NULL;
1970 c->sentlen = 0;
1971 c->flags = 0;
1972 c->lastinteraction = time(NULL);
1973 c->authenticated = 0;
1974 c->replstate = REDIS_REPL_NONE;
1975 c->reply = listCreate();
1976 listSetFreeMethod(c->reply,decrRefCount);
1977 listSetDupMethod(c->reply,dupClientReplyValue);
1978 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1979 readQueryFromClient, c) == AE_ERR) {
1980 freeClient(c);
1981 return NULL;
1982 }
1983 listAddNodeTail(server.clients,c);
1984 return c;
1985 }
1986
1987 static void addReply(redisClient *c, robj *obj) {
1988 if (listLength(c->reply) == 0 &&
1989 (c->replstate == REDIS_REPL_NONE ||
1990 c->replstate == REDIS_REPL_ONLINE) &&
1991 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1992 sendReplyToClient, c) == AE_ERR) return;
1993 listAddNodeTail(c->reply,getDecodedObject(obj));
1994 }
1995
1996 static void addReplySds(redisClient *c, sds s) {
1997 robj *o = createObject(REDIS_STRING,s);
1998 addReply(c,o);
1999 decrRefCount(o);
2000 }
2001
2002 static void addReplyDouble(redisClient *c, double d) {
2003 char buf[128];
2004
2005 snprintf(buf,sizeof(buf),"%.17g",d);
2006 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
2007 strlen(buf),buf));
2008 }
2009
2010 static void addReplyBulkLen(redisClient *c, robj *obj) {
2011 size_t len;
2012
2013 if (obj->encoding == REDIS_ENCODING_RAW) {
2014 len = sdslen(obj->ptr);
2015 } else {
2016 long n = (long)obj->ptr;
2017
2018 len = 1;
2019 if (n < 0) {
2020 len++;
2021 n = -n;
2022 }
2023 while((n = n/10) != 0) {
2024 len++;
2025 }
2026 }
2027 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
2028 }
2029
2030 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2031 int cport, cfd;
2032 char cip[128];
2033 redisClient *c;
2034 REDIS_NOTUSED(el);
2035 REDIS_NOTUSED(mask);
2036 REDIS_NOTUSED(privdata);
2037
2038 cfd = anetAccept(server.neterr, fd, cip, &cport);
2039 if (cfd == AE_ERR) {
2040 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2041 return;
2042 }
2043 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
2044 if ((c = createClient(cfd)) == NULL) {
2045 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2046 close(cfd); /* May be already closed, just ingore errors */
2047 return;
2048 }
2049 /* If maxclient directive is set and this is one client more... close the
2050 * connection. Note that we create the client instead to check before
2051 * for this condition, since now the socket is already set in nonblocking
2052 * mode and we can send an error for free using the Kernel I/O */
2053 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2054 char *err = "-ERR max number of clients reached\r\n";
2055
2056 /* That's a best effort error message, don't check write errors */
2057 if (write(c->fd,err,strlen(err)) == -1) {
2058 /* Nothing to do, Just to avoid the warning... */
2059 }
2060 freeClient(c);
2061 return;
2062 }
2063 server.stat_numconnections++;
2064 }
2065
2066 /* ======================= Redis objects implementation ===================== */
2067
2068 static robj *createObject(int type, void *ptr) {
2069 robj *o;
2070
2071 if (listLength(server.objfreelist)) {
2072 listNode *head = listFirst(server.objfreelist);
2073 o = listNodeValue(head);
2074 listDelNode(server.objfreelist,head);
2075 } else {
2076 o = zmalloc(sizeof(*o));
2077 }
2078 o->type = type;
2079 o->encoding = REDIS_ENCODING_RAW;
2080 o->ptr = ptr;
2081 o->refcount = 1;
2082 return o;
2083 }
2084
2085 static robj *createStringObject(char *ptr, size_t len) {
2086 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2087 }
2088
2089 static robj *createListObject(void) {
2090 list *l = listCreate();
2091
2092 listSetFreeMethod(l,decrRefCount);
2093 return createObject(REDIS_LIST,l);
2094 }
2095
2096 static robj *createSetObject(void) {
2097 dict *d = dictCreate(&setDictType,NULL);
2098 return createObject(REDIS_SET,d);
2099 }
2100
2101 static robj *createZsetObject(void) {
2102 zset *zs = zmalloc(sizeof(*zs));
2103
2104 zs->dict = dictCreate(&zsetDictType,NULL);
2105 zs->zsl = zslCreate();
2106 return createObject(REDIS_ZSET,zs);
2107 }
2108
2109 static void freeStringObject(robj *o) {
2110 if (o->encoding == REDIS_ENCODING_RAW) {
2111 sdsfree(o->ptr);
2112 }
2113 }
2114
2115 static void freeListObject(robj *o) {
2116 listRelease((list*) o->ptr);
2117 }
2118
2119 static void freeSetObject(robj *o) {
2120 dictRelease((dict*) o->ptr);
2121 }
2122
2123 static void freeZsetObject(robj *o) {
2124 zset *zs = o->ptr;
2125
2126 dictRelease(zs->dict);
2127 zslFree(zs->zsl);
2128 zfree(zs);
2129 }
2130
2131 static void freeHashObject(robj *o) {
2132 dictRelease((dict*) o->ptr);
2133 }
2134
2135 static void incrRefCount(robj *o) {
2136 o->refcount++;
2137 #ifdef DEBUG_REFCOUNT
2138 if (o->type == REDIS_STRING)
2139 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2140 #endif
2141 }
2142
2143 static void decrRefCount(void *obj) {
2144 robj *o = obj;
2145
2146 #ifdef DEBUG_REFCOUNT
2147 if (o->type == REDIS_STRING)
2148 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2149 #endif
2150 if (--(o->refcount) == 0) {
2151 switch(o->type) {
2152 case REDIS_STRING: freeStringObject(o); break;
2153 case REDIS_LIST: freeListObject(o); break;
2154 case REDIS_SET: freeSetObject(o); break;
2155 case REDIS_ZSET: freeZsetObject(o); break;
2156 case REDIS_HASH: freeHashObject(o); break;
2157 default: assert(0 != 0); break;
2158 }
2159 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2160 !listAddNodeHead(server.objfreelist,o))
2161 zfree(o);
2162 }
2163 }
2164
2165 static robj *lookupKey(redisDb *db, robj *key) {
2166 dictEntry *de = dictFind(db->dict,key);
2167 return de ? dictGetEntryVal(de) : NULL;
2168 }
2169
2170 static robj *lookupKeyRead(redisDb *db, robj *key) {
2171 expireIfNeeded(db,key);
2172 return lookupKey(db,key);
2173 }
2174
2175 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2176 deleteIfVolatile(db,key);
2177 return lookupKey(db,key);
2178 }
2179
2180 static int deleteKey(redisDb *db, robj *key) {
2181 int retval;
2182
2183 /* We need to protect key from destruction: after the first dictDelete()
2184 * it may happen that 'key' is no longer valid if we don't increment
2185 * it's count. This may happen when we get the object reference directly
2186 * from the hash table with dictRandomKey() or dict iterators */
2187 incrRefCount(key);
2188 if (dictSize(db->expires)) dictDelete(db->expires,key);
2189 retval = dictDelete(db->dict,key);
2190 decrRefCount(key);
2191
2192 return retval == DICT_OK;
2193 }
2194
2195 /* Try to share an object against the shared objects pool */
2196 static robj *tryObjectSharing(robj *o) {
2197 struct dictEntry *de;
2198 unsigned long c;
2199
2200 if (o == NULL || server.shareobjects == 0) return o;
2201
2202 assert(o->type == REDIS_STRING);
2203 de = dictFind(server.sharingpool,o);
2204 if (de) {
2205 robj *shared = dictGetEntryKey(de);
2206
2207 c = ((unsigned long) dictGetEntryVal(de))+1;
2208 dictGetEntryVal(de) = (void*) c;
2209 incrRefCount(shared);
2210 decrRefCount(o);
2211 return shared;
2212 } else {
2213 /* Here we are using a stream algorihtm: Every time an object is
2214 * shared we increment its count, everytime there is a miss we
2215 * recrement the counter of a random object. If this object reaches
2216 * zero we remove the object and put the current object instead. */
2217 if (dictSize(server.sharingpool) >=
2218 server.sharingpoolsize) {
2219 de = dictGetRandomKey(server.sharingpool);
2220 assert(de != NULL);
2221 c = ((unsigned long) dictGetEntryVal(de))-1;
2222 dictGetEntryVal(de) = (void*) c;
2223 if (c == 0) {
2224 dictDelete(server.sharingpool,de->key);
2225 }
2226 } else {
2227 c = 0; /* If the pool is empty we want to add this object */
2228 }
2229 if (c == 0) {
2230 int retval;
2231
2232 retval = dictAdd(server.sharingpool,o,(void*)1);
2233 assert(retval == DICT_OK);
2234 incrRefCount(o);
2235 }
2236 return o;
2237 }
2238 }
2239
2240 /* Check if the nul-terminated string 's' can be represented by a long
2241 * (that is, is a number that fits into long without any other space or
2242 * character before or after the digits).
2243 *
2244 * If so, the function returns REDIS_OK and *longval is set to the value
2245 * of the number. Otherwise REDIS_ERR is returned */
2246 static int isStringRepresentableAsLong(sds s, long *longval) {
2247 char buf[32], *endptr;
2248 long value;
2249 int slen;
2250
2251 value = strtol(s, &endptr, 10);
2252 if (endptr[0] != '\0') return REDIS_ERR;
2253 slen = snprintf(buf,32,"%ld",value);
2254
2255 /* If the number converted back into a string is not identical
2256 * then it's not possible to encode the string as integer */
2257 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2258 if (longval) *longval = value;
2259 return REDIS_OK;
2260 }
2261
2262 /* Try to encode a string object in order to save space */
2263 static int tryObjectEncoding(robj *o) {
2264 long value;
2265 sds s = o->ptr;
2266
2267 if (o->encoding != REDIS_ENCODING_RAW)
2268 return REDIS_ERR; /* Already encoded */
2269
2270 /* It's not save to encode shared objects: shared objects can be shared
2271 * everywhere in the "object space" of Redis. Encoded objects can only
2272 * appear as "values" (and not, for instance, as keys) */
2273 if (o->refcount > 1) return REDIS_ERR;
2274
2275 /* Currently we try to encode only strings */
2276 assert(o->type == REDIS_STRING);
2277
2278 /* Check if we can represent this string as a long integer */
2279 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2280
2281 /* Ok, this object can be encoded */
2282 o->encoding = REDIS_ENCODING_INT;
2283 sdsfree(o->ptr);
2284 o->ptr = (void*) value;
2285 return REDIS_OK;
2286 }
2287
2288 /* Get a decoded version of an encoded object (returned as a new object).
2289 * If the object is already raw-encoded just increment the ref count. */
2290 static robj *getDecodedObject(robj *o) {
2291 robj *dec;
2292
2293 if (o->encoding == REDIS_ENCODING_RAW) {
2294 incrRefCount(o);
2295 return o;
2296 }
2297 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2298 char buf[32];
2299
2300 snprintf(buf,32,"%ld",(long)o->ptr);
2301 dec = createStringObject(buf,strlen(buf));
2302 return dec;
2303 } else {
2304 assert(1 != 1);
2305 }
2306 }
2307
2308 /* Compare two string objects via strcmp() or alike.
2309 * Note that the objects may be integer-encoded. In such a case we
2310 * use snprintf() to get a string representation of the numbers on the stack
2311 * and compare the strings, it's much faster than calling getDecodedObject(). */
2312 static int compareStringObjects(robj *a, robj *b) {
2313 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2314 char bufa[128], bufb[128], *astr, *bstr;
2315 int bothsds = 1;
2316
2317 if (a == b) return 0;
2318 if (a->encoding != REDIS_ENCODING_RAW) {
2319 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2320 astr = bufa;
2321 bothsds = 0;
2322 } else {
2323 astr = a->ptr;
2324 }
2325 if (b->encoding != REDIS_ENCODING_RAW) {
2326 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2327 bstr = bufb;
2328 bothsds = 0;
2329 } else {
2330 bstr = b->ptr;
2331 }
2332 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2333 }
2334
2335 static size_t stringObjectLen(robj *o) {
2336 assert(o->type == REDIS_STRING);
2337 if (o->encoding == REDIS_ENCODING_RAW) {
2338 return sdslen(o->ptr);
2339 } else {
2340 char buf[32];
2341
2342 return snprintf(buf,32,"%ld",(long)o->ptr);
2343 }
2344 }
2345
2346 /*============================ DB saving/loading ============================ */
2347
/* Write the single byte 'type' (an object type or opcode) to fp.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
2352
/* Write t to fp as a 32 bit signed integer in host byte order.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    if (fwrite(&t32,sizeof(t32),1,fp) != 1) return -1;
    return 0;
}
2358
2359 /* check rdbLoadLen() comments for more info */
2360 static int rdbSaveLen(FILE *fp, uint32_t len) {
2361 unsigned char buf[2];
2362
2363 if (len < (1<<6)) {
2364 /* Save a 6 bit len */
2365 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2366 if (fwrite(buf,1,1,fp) == 0) return -1;
2367 } else if (len < (1<<14)) {
2368 /* Save a 14 bit len */
2369 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2370 buf[1] = len&0xFF;
2371 if (fwrite(buf,2,1,fp) == 0) return -1;
2372 } else {
2373 /* Save a 32 bit len */
2374 buf[0] = (REDIS_RDB_32BITLEN<<6);
2375 if (fwrite(buf,1,1,fp) == 0) return -1;
2376 len = htonl(len);
2377 if (fwrite(&len,4,1,fp) == 0) return -1;
2378 }
2379 return 0;
2380 }
2381
2382 /* String objects in the form "2391" "-100" without any space and with a
2383 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2384 * encoded as integers to save space */
2385 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2386 long long value;
2387 char *endptr, buf[32];
2388
2389 /* Check if it's possible to encode this value as a number */
2390 value = strtoll(s, &endptr, 10);
2391 if (endptr[0] != '\0') return 0;
2392 snprintf(buf,32,"%lld",value);
2393
2394 /* If the number converted back into a string is not identical
2395 * then it's not possible to encode the string as integer */
2396 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2397
2398 /* Finally check if it fits in our ranges */
2399 if (value >= -(1<<7) && value <= (1<<7)-1) {
2400 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2401 enc[1] = value&0xFF;
2402 return 2;
2403 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2404 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2405 enc[1] = value&0xFF;
2406 enc[2] = (value>>8)&0xFF;
2407 return 3;
2408 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2409 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2410 enc[1] = value&0xFF;
2411 enc[2] = (value>>8)&0xFF;
2412 enc[3] = (value>>16)&0xFF;
2413 enc[4] = (value>>24)&0xFF;
2414 return 5;
2415 } else {
2416 return 0;
2417 }
2418 }
2419
2420 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2421 unsigned int comprlen, outlen;
2422 unsigned char byte;
2423 void *out;
2424
2425 /* We require at least four bytes compression for this to be worth it */
2426 outlen = sdslen(obj->ptr)-4;
2427 if (outlen <= 0) return 0;
2428 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2429 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2430 if (comprlen == 0) {
2431 zfree(out);
2432 return 0;
2433 }
2434 /* Data compressed! Let's save it on disk */
2435 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2436 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2437 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2438 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2439 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2440 zfree(out);
2441 return comprlen;
2442
2443 writeerr:
2444 zfree(out);
2445 return -1;
2446 }
2447
2448 /* Save a string objet as [len][data] on disk. If the object is a string
2449 * representation of an integer value we try to safe it in a special form */
2450 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2451 size_t len;
2452 int enclen;
2453
2454 len = sdslen(obj->ptr);
2455
2456 /* Try integer encoding */
2457 if (len <= 11) {
2458 unsigned char buf[5];
2459 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2460 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2461 return 0;
2462 }
2463 }
2464
2465 /* Try LZF compression - under 20 bytes it's unable to compress even
2466 * aaaaaaaaaaaaaaaaaa so skip it */
2467 if (len > 20) {
2468 int retval;
2469
2470 retval = rdbSaveLzfStringObject(fp,obj);
2471 if (retval == -1) return -1;
2472 if (retval > 0) return 0;
2473 /* retval == 0 means data can't be compressed, save the old way */
2474 }
2475
2476 /* Store verbatim */
2477 if (rdbSaveLen(fp,len) == -1) return -1;
2478 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2479 return 0;
2480 }
2481
2482 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2483 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2484 int retval;
2485
2486 obj = getDecodedObject(obj);
2487 retval = rdbSaveStringObjectRaw(fp,obj);
2488 decrRefCount(obj);
2489 return retval;
2490 }
2491
/* Save a double as a length-prefixed "%.17g" string. The unsigned 8 bit
 * prefix is the string length, except for three reserved values that are
 * written alone:
 *   253: not a number
 *   254: +inf
 *   255: -inf
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len;

    if (isfinite(val)) {
        snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
        buf[0] = strlen((char*)buf+1);
        len = buf[0]+1;
    } else if (isnan(val)) {
        buf[0] = 253;
        len = 1;
    } else {
        buf[0] = (val < 0) ? 255 : 254;
        len = 1;
    }
    return (fwrite(buf,len,1,fp) == 0) ? -1 : 0;
}
2518
2519 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2520 static int rdbSave(char *filename) {
2521 dictIterator *di = NULL;
2522 dictEntry *de;
2523 FILE *fp;
2524 char tmpfile[256];
2525 int j;
2526 time_t now = time(NULL);
2527
2528 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2529 fp = fopen(tmpfile,"w");
2530 if (!fp) {
2531 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2532 return REDIS_ERR;
2533 }
2534 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2535 for (j = 0; j < server.dbnum; j++) {
2536 redisDb *db = server.db+j;
2537 dict *d = db->dict;
2538 if (dictSize(d) == 0) continue;
2539 di = dictGetIterator(d);
2540 if (!di) {
2541 fclose(fp);
2542 return REDIS_ERR;
2543 }
2544
2545 /* Write the SELECT DB opcode */
2546 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2547 if (rdbSaveLen(fp,j) == -1) goto werr;
2548
2549 /* Iterate this DB writing every entry */
2550 while((de = dictNext(di)) != NULL) {
2551 robj *key = dictGetEntryKey(de);
2552 robj *o = dictGetEntryVal(de);
2553 time_t expiretime = getExpire(db,key);
2554
2555 /* Save the expire time */
2556 if (expiretime != -1) {
2557 /* If this key is already expired skip it */
2558 if (expiretime < now) continue;
2559 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2560 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2561 }
2562 /* Save the key and associated value */
2563 if (rdbSaveType(fp,o->type) == -1) goto werr;
2564 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2565 if (o->type == REDIS_STRING) {
2566 /* Save a string value */
2567 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2568 } else if (o->type == REDIS_LIST) {
2569 /* Save a list value */
2570 list *list = o->ptr;
2571 listNode *ln;
2572
2573 listRewind(list);
2574 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2575 while((ln = listYield(list))) {
2576 robj *eleobj = listNodeValue(ln);
2577
2578 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2579 }
2580 } else if (o->type == REDIS_SET) {
2581 /* Save a set value */
2582 dict *set = o->ptr;
2583 dictIterator *di = dictGetIterator(set);
2584 dictEntry *de;
2585
2586 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2587 while((de = dictNext(di)) != NULL) {
2588 robj *eleobj = dictGetEntryKey(de);
2589
2590 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2591 }
2592 dictReleaseIterator(di);
2593 } else if (o->type == REDIS_ZSET) {
2594 /* Save a set value */
2595 zset *zs = o->ptr;
2596 dictIterator *di = dictGetIterator(zs->dict);
2597 dictEntry *de;
2598
2599 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2600 while((de = dictNext(di)) != NULL) {
2601 robj *eleobj = dictGetEntryKey(de);
2602 double *score = dictGetEntryVal(de);
2603
2604 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2605 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2606 }
2607 dictReleaseIterator(di);
2608 } else {
2609 assert(0 != 0);
2610 }
2611 }
2612 dictReleaseIterator(di);
2613 }
2614 /* EOF opcode */
2615 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2616
2617 /* Make sure data will not remain on the OS's output buffers */
2618 fflush(fp);
2619 fsync(fileno(fp));
2620 fclose(fp);
2621
2622 /* Use RENAME to make sure the DB file is changed atomically only
2623 * if the generate DB file is ok. */
2624 if (rename(tmpfile,filename) == -1) {
2625 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2626 unlink(tmpfile);
2627 return REDIS_ERR;
2628 }
2629 redisLog(REDIS_NOTICE,"DB saved on disk");
2630 server.dirty = 0;
2631 server.lastsave = time(NULL);
2632 return REDIS_OK;
2633
2634 werr:
2635 fclose(fp);
2636 unlink(tmpfile);
2637 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2638 if (di) dictReleaseIterator(di);
2639 return REDIS_ERR;
2640 }
2641
2642 static int rdbSaveBackground(char *filename) {
2643 pid_t childpid;
2644
2645 if (server.bgsavechildpid != -1) return REDIS_ERR;
2646 if ((childpid = fork()) == 0) {
2647 /* Child */
2648 close(server.fd);
2649 if (rdbSave(filename) == REDIS_OK) {
2650 exit(0);
2651 } else {
2652 exit(1);
2653 }
2654 } else {
2655 /* Parent */
2656 if (childpid == -1) {
2657 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2658 strerror(errno));
2659 return REDIS_ERR;
2660 }
2661 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2662 server.bgsavechildpid = childpid;
2663 return REDIS_OK;
2664 }
2665 return REDIS_OK; /* unreached */
2666 }
2667
/* Delete the "temp-<pid>.rdb" file that rdbSave() running in process
 * 'childpid' would have created. Errors (e.g. no such file) are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int) childpid);
    unlink(path);
}
2674
/* Read one type/opcode byte from fp.
 * Returns its value (0-255), or -1 on EOF or read error. */
static int rdbLoadType(FILE *fp) {
    int ch = fgetc(fp);

    return (ch == EOF) ? -1 : ch;
}
2680
/* Read a 32 bit signed time value, as written by rdbSaveTime(), from fp.
 * Returns it as a time_t, or -1 on EOF or read error. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t t32;

    return (fread(&t32,sizeof(t32),1,fp) == 1) ? (time_t) t32 : -1;
}
2686
2687 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2688 * of this file for a description of how this are stored on disk.
2689 *
2690 * isencoded is set to 1 if the readed length is not actually a length but
2691 * an "encoding type", check the above comments for more info */
2692 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2693 unsigned char buf[2];
2694 uint32_t len;
2695
2696 if (isencoded) *isencoded = 0;
2697 if (rdbver == 0) {
2698 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2699 return ntohl(len);
2700 } else {
2701 int type;
2702
2703 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2704 type = (buf[0]&0xC0)>>6;
2705 if (type == REDIS_RDB_6BITLEN) {
2706 /* Read a 6 bit len */
2707 return buf[0]&0x3F;
2708 } else if (type == REDIS_RDB_ENCVAL) {
2709 /* Read a 6 bit len encoding type */
2710 if (isencoded) *isencoded = 1;
2711 return buf[0]&0x3F;
2712 } else if (type == REDIS_RDB_14BITLEN) {
2713 /* Read a 14 bit len */
2714 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2715 return ((buf[0]&0x3F)<<8)|buf[1];
2716 } else {
2717 /* Read a 32 bit len */
2718 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2719 return ntohl(len);
2720 }
2721 }
2722 }
2723
2724 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2725 unsigned char enc[4];
2726 long long val;
2727
2728 if (enctype == REDIS_RDB_ENC_INT8) {
2729 if (fread(enc,1,1,fp) == 0) return NULL;
2730 val = (signed char)enc[0];
2731 } else if (enctype == REDIS_RDB_ENC_INT16) {
2732 uint16_t v;
2733 if (fread(enc,2,1,fp) == 0) return NULL;
2734 v = enc[0]|(enc[1]<<8);
2735 val = (int16_t)v;
2736 } else if (enctype == REDIS_RDB_ENC_INT32) {
2737 uint32_t v;
2738 if (fread(enc,4,1,fp) == 0) return NULL;
2739 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2740 val = (int32_t)v;
2741 } else {
2742 val = 0; /* anti-warning */
2743 assert(0!=0);
2744 }
2745 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2746 }
2747
2748 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2749 unsigned int len, clen;
2750 unsigned char *c = NULL;
2751 sds val = NULL;
2752
2753 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2754 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2755 if ((c = zmalloc(clen)) == NULL) goto err;
2756 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2757 if (fread(c,clen,1,fp) == 0) goto err;
2758 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2759 zfree(c);
2760 return createObject(REDIS_STRING,val);
2761 err:
2762 zfree(c);
2763 sdsfree(val);
2764 return NULL;
2765 }
2766
2767 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2768 int isencoded;
2769 uint32_t len;
2770 sds val;
2771
2772 len = rdbLoadLen(fp,rdbver,&isencoded);
2773 if (isencoded) {
2774 switch(len) {
2775 case REDIS_RDB_ENC_INT8:
2776 case REDIS_RDB_ENC_INT16:
2777 case REDIS_RDB_ENC_INT32:
2778 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2779 case REDIS_RDB_ENC_LZF:
2780 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2781 default:
2782 assert(0!=0);
2783 }
2784 }
2785
2786 if (len == REDIS_RDB_LENERR) return NULL;
2787 val = sdsnewlen(NULL,len);
2788 if (len && fread(val,len,1,fp) == 0) {
2789 sdsfree(val);
2790 return NULL;
2791 }
2792 return tryObjectSharing(createObject(REDIS_STRING,val));
2793 }
2794
2795 /* For information about double serialization check rdbSaveDoubleValue() */
2796 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2797 char buf[128];
2798 unsigned char len;
2799
2800 if (fread(&len,1,1,fp) == 0) return -1;
2801 switch(len) {
2802 case 255: *val = R_NegInf; return 0;
2803 case 254: *val = R_PosInf; return 0;
2804 case 253: *val = R_Nan; return 0;
2805 default:
2806 if (fread(buf,len,1,fp) == 0) return -1;
2807 sscanf(buf, "%lg", val);
2808 return 0;
2809 }
2810 }
2811
2812 static int rdbLoad(char *filename) {
2813 FILE *fp;
2814 robj *keyobj = NULL;
2815 uint32_t dbid;
2816 int type, retval, rdbver;
2817 dict *d = server.db[0].dict;
2818 redisDb *db = server.db+0;
2819 char buf[1024];
2820 time_t expiretime = -1, now = time(NULL);
2821
2822 fp = fopen(filename,"r");
2823 if (!fp) return REDIS_ERR;
2824 if (fread(buf,9,1,fp) == 0) goto eoferr;
2825 buf[9] = '\0';
2826 if (memcmp(buf,"REDIS",5) != 0) {
2827 fclose(fp);
2828 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2829 return REDIS_ERR;
2830 }
2831 rdbver = atoi(buf+5);
2832 if (rdbver > 1) {
2833 fclose(fp);
2834 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2835 return REDIS_ERR;
2836 }
2837 while(1) {
2838 robj *o;
2839
2840 /* Read type. */
2841 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2842 if (type == REDIS_EXPIRETIME) {
2843 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2844 /* We read the time so we need to read the object type again */
2845 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2846 }
2847 if (type == REDIS_EOF) break;
2848 /* Handle SELECT DB opcode as a special case */
2849 if (type == REDIS_SELECTDB) {
2850 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2851 goto eoferr;
2852 if (dbid >= (unsigned)server.dbnum) {
2853 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2854 exit(1);
2855 }
2856 db = server.db+dbid;
2857 d = db->dict;
2858 continue;
2859 }
2860 /* Read key */
2861 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2862
2863 if (type == REDIS_STRING) {
2864 /* Read string value */
2865 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2866 tryObjectEncoding(o);
2867 } else if (type == REDIS_LIST || type == REDIS_SET) {
2868 /* Read list/set value */
2869 uint32_t listlen;
2870
2871 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2872 goto eoferr;
2873 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2874 /* Load every single element of the list/set */
2875 while(listlen--) {
2876 robj *ele;
2877
2878 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2879 tryObjectEncoding(ele);
2880 if (type == REDIS_LIST) {
2881 listAddNodeTail((list*)o->ptr,ele);
2882 } else {
2883 dictAdd((dict*)o->ptr,ele,NULL);
2884 }
2885 }
2886 } else if (type == REDIS_ZSET) {
2887 /* Read list/set value */
2888 uint32_t zsetlen;
2889 zset *zs;
2890
2891 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2892 goto eoferr;
2893 o = createZsetObject();
2894 zs = o->ptr;
2895 /* Load every single element of the list/set */
2896 while(zsetlen--) {
2897 robj *ele;
2898 double *score = zmalloc(sizeof(double));
2899
2900 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2901 tryObjectEncoding(ele);
2902 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2903 dictAdd(zs->dict,ele,score);
2904 zslInsert(zs->zsl,*score,ele);
2905 incrRefCount(ele); /* added to skiplist */
2906 }
2907 } else {
2908 assert(0 != 0);
2909 }
2910 /* Add the new object in the hash table */
2911 retval = dictAdd(d,keyobj,o);
2912 if (retval == DICT_ERR) {
2913 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2914 exit(1);
2915 }
2916 /* Set the expire time if needed */
2917 if (expiretime != -1) {
2918 setExpire(db,keyobj,expiretime);
2919 /* Delete this key if already expired */
2920 if (expiretime < now) deleteKey(db,keyobj);
2921 expiretime = -1;
2922 }
2923 keyobj = o = NULL;
2924 }
2925 fclose(fp);
2926 return REDIS_OK;
2927
2928 eoferr: /* unexpected end of file is handled here with a fatal exit */
2929 if (keyobj) decrRefCount(keyobj);
2930 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2931 exit(1);
2932 return REDIS_ERR; /* Just to avoid warning */
2933 }
2934
2935 /*================================== Commands =============================== */
2936
2937 static void authCommand(redisClient *c) {
2938 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2939 c->authenticated = 1;
2940 addReply(c,shared.ok);
2941 } else {
2942 c->authenticated = 0;
2943 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2944 }
2945 }
2946
2947 static void pingCommand(redisClient *c) {
2948 addReply(c,shared.pong);
2949 }
2950
2951 static void echoCommand(redisClient *c) {
2952 addReplyBulkLen(c,c->argv[1]);
2953 addReply(c,c->argv[1]);
2954 addReply(c,shared.crlf);
2955 }
2956
2957 /*=================================== Strings =============================== */
2958
2959 static void setGenericCommand(redisClient *c, int nx) {
2960 int retval;
2961
2962 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2963 if (retval == DICT_ERR) {
2964 if (!nx) {
2965 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2966 incrRefCount(c->argv[2]);
2967 } else {
2968 addReply(c,shared.czero);
2969 return;
2970 }
2971 } else {
2972 incrRefCount(c->argv[1]);
2973 incrRefCount(c->argv[2]);
2974 }
2975 server.dirty++;
2976 removeExpire(c->db,c->argv[1]);
2977 addReply(c, nx ? shared.cone : shared.ok);
2978 }
2979
2980 static void setCommand(redisClient *c) {
2981 setGenericCommand(c,0);
2982 }
2983
2984 static void setnxCommand(redisClient *c) {
2985 setGenericCommand(c,1);
2986 }
2987
2988 static void getCommand(redisClient *c) {
2989 robj *o = lookupKeyRead(c->db,c->argv[1]);
2990
2991 if (o == NULL) {
2992 addReply(c,shared.nullbulk);
2993 } else {
2994 if (o->type != REDIS_STRING) {
2995 addReply(c,shared.wrongtypeerr);
2996 } else {
2997 addReplyBulkLen(c,o);
2998 addReply(c,o);
2999 addReply(c,shared.crlf);
3000 }
3001 }
3002 }
3003
3004 static void getsetCommand(redisClient *c) {
3005 getCommand(c);
3006 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3007 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3008 } else {
3009 incrRefCount(c->argv[1]);
3010 }
3011 incrRefCount(c->argv[2]);
3012 server.dirty++;
3013 removeExpire(c->db,c->argv[1]);
3014 }
3015
3016 static void mgetCommand(redisClient *c) {
3017 int j;
3018
3019 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
3020 for (j = 1; j < c->argc; j++) {
3021 robj *o = lookupKeyRead(c->db,c->argv[j]);
3022 if (o == NULL) {
3023 addReply(c,shared.nullbulk);
3024 } else {
3025 if (o->type != REDIS_STRING) {
3026 addReply(c,shared.nullbulk);
3027 } else {
3028 addReplyBulkLen(c,o);
3029 addReply(c,o);
3030 addReply(c,shared.crlf);
3031 }
3032 }
3033 }
3034 }
3035
3036 static void msetGenericCommand(redisClient *c, int nx) {
3037 int j;
3038
3039 if ((c->argc % 2) == 0) {
3040 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
3041 return;
3042 }
3043 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3044 * set nothing at all if at least one already key exists. */
3045 if (nx) {
3046 for (j = 1; j < c->argc; j += 2) {
3047 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
3048 addReply(c, shared.czero);
3049 return;
3050 }
3051 }
3052 }
3053
3054 for (j = 1; j < c->argc; j += 2) {
3055 int retval;
3056
3057 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3058 if (retval == DICT_ERR) {
3059 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3060 incrRefCount(c->argv[j+1]);
3061 } else {
3062 incrRefCount(c->argv[j]);
3063 incrRefCount(c->argv[j+1]);
3064 }
3065 removeExpire(c->db,c->argv[j]);
3066 }
3067 server.dirty += (c->argc-1)/2;
3068 addReply(c, nx ? shared.cone : shared.ok);
3069 }
3070
3071 static void msetCommand(redisClient *c) {
3072 msetGenericCommand(c,0);
3073 }
3074
3075 static void msetnxCommand(redisClient *c) {
3076 msetGenericCommand(c,1);
3077 }
3078
3079 static void incrDecrCommand(redisClient *c, long long incr) {
3080 long long value;
3081 int retval;
3082 robj *o;
3083
3084 o = lookupKeyWrite(c->db,c->argv[1]);
3085 if (o == NULL) {
3086 value = 0;
3087 } else {
3088 if (o->type != REDIS_STRING) {
3089 value = 0;
3090 } else {
3091 char *eptr;
3092
3093 if (o->encoding == REDIS_ENCODING_RAW)
3094 value = strtoll(o->ptr, &eptr, 10);
3095 else if (o->encoding == REDIS_ENCODING_INT)
3096 value = (long)o->ptr;
3097 else
3098 assert(1 != 1);
3099 }
3100 }
3101
3102 value += incr;
3103 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
3104 tryObjectEncoding(o);
3105 retval = dictAdd(c->db->dict,c->argv[1],o);
3106 if (retval == DICT_ERR) {
3107 dictReplace(c->db->dict,c->argv[1],o);
3108 removeExpire(c->db,c->argv[1]);
3109 } else {
3110 incrRefCount(c->argv[1]);
3111 }
3112 server.dirty++;
3113 addReply(c,shared.colon);
3114 addReply(c,o);
3115 addReply(c,shared.crlf);
3116 }
3117
3118 static void incrCommand(redisClient *c) {
3119 incrDecrCommand(c,1);
3120 }
3121
3122 static void decrCommand(redisClient *c) {
3123 incrDecrCommand(c,-1);
3124 }
3125
3126 static void incrbyCommand(redisClient *c) {
3127 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3128 incrDecrCommand(c,incr);
3129 }
3130
3131 static void decrbyCommand(redisClient *c) {
3132 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3133 incrDecrCommand(c,-incr);
3134 }
3135
3136 /* ========================= Type agnostic commands ========================= */
3137
3138 static void delCommand(redisClient *c) {
3139 int deleted = 0, j;
3140
3141 for (j = 1; j < c->argc; j++) {
3142 if (deleteKey(c->db,c->argv[j])) {
3143 server.dirty++;
3144 deleted++;
3145 }
3146 }
3147 switch(deleted) {
3148 case 0:
3149 addReply(c,shared.czero);
3150 break;
3151 case 1:
3152 addReply(c,shared.cone);
3153 break;
3154 default:
3155 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3156 break;
3157 }
3158 }
3159
3160 static void existsCommand(redisClient *c) {
3161 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3162 }
3163
3164 static void selectCommand(redisClient *c) {
3165 int id = atoi(c->argv[1]->ptr);
3166
3167 if (selectDb(c,id) == REDIS_ERR) {
3168 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3169 } else {
3170 addReply(c,shared.ok);
3171 }
3172 }
3173
3174 static void randomkeyCommand(redisClient *c) {
3175 dictEntry *de;
3176
3177 while(1) {
3178 de = dictGetRandomKey(c->db->dict);
3179 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3180 }
3181 if (de == NULL) {
3182 addReply(c,shared.plus);
3183 addReply(c,shared.crlf);
3184 } else {
3185 addReply(c,shared.plus);
3186 addReply(c,dictGetEntryKey(de));
3187 addReply(c,shared.crlf);
3188 }
3189 }
3190
3191 static void keysCommand(redisClient *c) {
3192 dictIterator *di;
3193 dictEntry *de;
3194 sds pattern = c->argv[1]->ptr;
3195 int plen = sdslen(pattern);
3196 int numkeys = 0, keyslen = 0;
3197 robj *lenobj = createObject(REDIS_STRING,NULL);
3198
3199 di = dictGetIterator(c->db->dict);
3200 addReply(c,lenobj);
3201 decrRefCount(lenobj);
3202 while((de = dictNext(di)) != NULL) {
3203 robj *keyobj = dictGetEntryKey(de);
3204
3205 sds key = keyobj->ptr;
3206 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3207 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3208 if (expireIfNeeded(c->db,keyobj) == 0) {
3209 if (numkeys != 0)
3210 addReply(c,shared.space);
3211 addReply(c,keyobj);
3212 numkeys++;
3213 keyslen += sdslen(key);
3214 }
3215 }
3216 }
3217 dictReleaseIterator(di);
3218 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3219 addReply(c,shared.crlf);
3220 }
3221
3222 static void dbsizeCommand(redisClient *c) {
3223 addReplySds(c,
3224 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3225 }
3226
3227 static void lastsaveCommand(redisClient *c) {
3228 addReplySds(c,
3229 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3230 }
3231
3232 static void typeCommand(redisClient *c) {
3233 robj *o;
3234 char *type;
3235
3236 o = lookupKeyRead(c->db,c->argv[1]);
3237 if (o == NULL) {
3238 type = "+none";
3239 } else {
3240 switch(o->type) {
3241 case REDIS_STRING: type = "+string"; break;
3242 case REDIS_LIST: type = "+list"; break;
3243 case REDIS_SET: type = "+set"; break;
3244 case REDIS_ZSET: type = "+zset"; break;
3245 default: type = "unknown"; break;
3246 }
3247 }
3248 addReplySds(c,sdsnew(type));
3249 addReply(c,shared.crlf);
3250 }
3251
3252 static void saveCommand(redisClient *c) {
3253 if (server.bgsavechildpid != -1) {
3254 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3255 return;
3256 }
3257 if (rdbSave(server.dbfilename) == REDIS_OK) {
3258 addReply(c,shared.ok);
3259 } else {
3260 addReply(c,shared.err);
3261 }
3262 }
3263
3264 static void bgsaveCommand(redisClient *c) {
3265 if (server.bgsavechildpid != -1) {
3266 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3267 return;
3268 }
3269 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3270 addReply(c,shared.ok);
3271 } else {
3272 addReply(c,shared.err);
3273 }
3274 }
3275
3276 static void shutdownCommand(redisClient *c) {
3277 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3278 /* Kill the saving child if there is a background saving in progress.
3279 We want to avoid race conditions, for instance our saving child may
3280 overwrite the synchronous saving did by SHUTDOWN. */
3281 if (server.bgsavechildpid != -1) {
3282 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3283 kill(server.bgsavechildpid,SIGKILL);
3284 rdbRemoveTempFile(server.bgsavechildpid);
3285 }
3286 /* SYNC SAVE */
3287 if (rdbSave(server.dbfilename) == REDIS_OK) {
3288 if (server.daemonize)
3289 unlink(server.pidfile);
3290 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3291 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3292 exit(1);
3293 } else {
3294 /* Ooops.. error saving! The best we can do is to continue operating.
3295 * Note that if there was a background saving process, in the next
3296 * cron() Redis will be notified that the background saving aborted,
3297 * handling special stuff like slaves pending for synchronization... */
3298 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3299 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3300 }
3301 }
3302
3303 static void renameGenericCommand(redisClient *c, int nx) {
3304 robj *o;
3305
3306 /* To use the same key as src and dst is probably an error */
3307 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3308 addReply(c,shared.sameobjecterr);
3309 return;
3310 }
3311
3312 o = lookupKeyWrite(c->db,c->argv[1]);
3313 if (o == NULL) {
3314 addReply(c,shared.nokeyerr);
3315 return;
3316 }
3317 incrRefCount(o);
3318 deleteIfVolatile(c->db,c->argv[2]);
3319 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3320 if (nx) {
3321 decrRefCount(o);
3322 addReply(c,shared.czero);
3323 return;
3324 }
3325 dictReplace(c->db->dict,c->argv[2],o);
3326 } else {
3327 incrRefCount(c->argv[2]);
3328 }
3329 deleteKey(c->db,c->argv[1]);
3330 server.dirty++;
3331 addReply(c,nx ? shared.cone : shared.ok);
3332 }
3333
3334 static void renameCommand(redisClient *c) {
3335 renameGenericCommand(c,0);
3336 }
3337
3338 static void renamenxCommand(redisClient *c) {
3339 renameGenericCommand(c,1);
3340 }
3341
3342 static void moveCommand(redisClient *c) {
3343 robj *o;
3344 redisDb *src, *dst;
3345 int srcid;
3346
3347 /* Obtain source and target DB pointers */
3348 src = c->db;
3349 srcid = c->db->id;
3350 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3351 addReply(c,shared.outofrangeerr);
3352 return;
3353 }
3354 dst = c->db;
3355 selectDb(c,srcid); /* Back to the source DB */
3356
3357 /* If the user is moving using as target the same
3358 * DB as the source DB it is probably an error. */
3359 if (src == dst) {
3360 addReply(c,shared.sameobjecterr);
3361 return;
3362 }
3363
3364 /* Check if the element exists and get a reference */
3365 o = lookupKeyWrite(c->db,c->argv[1]);
3366 if (!o) {
3367 addReply(c,shared.czero);
3368 return;
3369 }
3370
3371 /* Try to add the element to the target DB */
3372 deleteIfVolatile(dst,c->argv[1]);
3373 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3374 addReply(c,shared.czero);
3375 return;
3376 }
3377 incrRefCount(c->argv[1]);
3378 incrRefCount(o);
3379
3380 /* OK! key moved, free the entry in the source DB */
3381 deleteKey(src,c->argv[1]);
3382 server.dirty++;
3383 addReply(c,shared.cone);
3384 }
3385
3386 /* =================================== Lists ================================ */
3387 static void pushGenericCommand(redisClient *c, int where) {
3388 robj *lobj;
3389 list *list;
3390
3391 lobj = lookupKeyWrite(c->db,c->argv[1]);
3392 if (lobj == NULL) {
3393 lobj = createListObject();
3394 list = lobj->ptr;
3395 if (where == REDIS_HEAD) {
3396 listAddNodeHead(list,c->argv[2]);
3397 } else {
3398 listAddNodeTail(list,c->argv[2]);
3399 }
3400 dictAdd(c->db->dict,c->argv[1],lobj);
3401 incrRefCount(c->argv[1]);
3402 incrRefCount(c->argv[2]);
3403 } else {
3404 if (lobj->type != REDIS_LIST) {
3405 addReply(c,shared.wrongtypeerr);
3406 return;
3407 }
3408 list = lobj->ptr;
3409 if (where == REDIS_HEAD) {
3410 listAddNodeHead(list,c->argv[2]);
3411 } else {
3412 listAddNodeTail(list,c->argv[2]);
3413 }
3414 incrRefCount(c->argv[2]);
3415 }
3416 server.dirty++;
3417 addReply(c,shared.ok);
3418 }
3419
3420 static void lpushCommand(redisClient *c) {
3421 pushGenericCommand(c,REDIS_HEAD);
3422 }
3423
3424 static void rpushCommand(redisClient *c) {
3425 pushGenericCommand(c,REDIS_TAIL);
3426 }
3427
3428 static void llenCommand(redisClient *c) {
3429 robj *o;
3430 list *l;
3431
3432 o = lookupKeyRead(c->db,c->argv[1]);
3433 if (o == NULL) {
3434 addReply(c,shared.czero);
3435 return;
3436 } else {
3437 if (o->type != REDIS_LIST) {
3438 addReply(c,shared.wrongtypeerr);
3439 } else {
3440 l = o->ptr;
3441 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3442 }
3443 }
3444 }
3445
3446 static void lindexCommand(redisClient *c) {
3447 robj *o;
3448 int index = atoi(c->argv[2]->ptr);
3449
3450 o = lookupKeyRead(c->db,c->argv[1]);
3451 if (o == NULL) {
3452 addReply(c,shared.nullbulk);
3453 } else {
3454 if (o->type != REDIS_LIST) {
3455 addReply(c,shared.wrongtypeerr);
3456 } else {
3457 list *list = o->ptr;
3458 listNode *ln;
3459
3460 ln = listIndex(list, index);
3461 if (ln == NULL) {
3462 addReply(c,shared.nullbulk);
3463 } else {
3464 robj *ele = listNodeValue(ln);
3465 addReplyBulkLen(c,ele);
3466 addReply(c,ele);
3467 addReply(c,shared.crlf);
3468 }
3469 }
3470 }
3471 }
3472
3473 static void lsetCommand(redisClient *c) {
3474 robj *o;
3475 int index = atoi(c->argv[2]->ptr);
3476
3477 o = lookupKeyWrite(c->db,c->argv[1]);
3478 if (o == NULL) {
3479 addReply(c,shared.nokeyerr);
3480 } else {
3481 if (o->type != REDIS_LIST) {
3482 addReply(c,shared.wrongtypeerr);
3483 } else {
3484 list *list = o->ptr;
3485 listNode *ln;
3486
3487 ln = listIndex(list, index);
3488 if (ln == NULL) {
3489 addReply(c,shared.outofrangeerr);
3490 } else {
3491 robj *ele = listNodeValue(ln);
3492
3493 decrRefCount(ele);
3494 listNodeValue(ln) = c->argv[3];
3495 incrRefCount(c->argv[3]);
3496 addReply(c,shared.ok);
3497 server.dirty++;
3498 }
3499 }
3500 }
3501 }
3502
3503 static void popGenericCommand(redisClient *c, int where) {
3504 robj *o;
3505
3506 o = lookupKeyWrite(c->db,c->argv[1]);
3507 if (o == NULL) {
3508 addReply(c,shared.nullbulk);
3509 } else {
3510 if (o->type != REDIS_LIST) {
3511 addReply(c,shared.wrongtypeerr);
3512 } else {
3513 list *list = o->ptr;
3514 listNode *ln;
3515
3516 if (where == REDIS_HEAD)
3517 ln = listFirst(list);
3518 else
3519 ln = listLast(list);
3520
3521 if (ln == NULL) {
3522 addReply(c,shared.nullbulk);
3523 } else {
3524 robj *ele = listNodeValue(ln);
3525 addReplyBulkLen(c,ele);
3526 addReply(c,ele);
3527 addReply(c,shared.crlf);
3528 listDelNode(list,ln);
3529 server.dirty++;
3530 }
3531 }
3532 }
3533 }
3534
3535 static void lpopCommand(redisClient *c) {
3536 popGenericCommand(c,REDIS_HEAD);
3537 }
3538
3539 static void rpopCommand(redisClient *c) {
3540 popGenericCommand(c,REDIS_TAIL);
3541 }
3542
3543 static void lrangeCommand(redisClient *c) {
3544 robj *o;
3545 int start = atoi(c->argv[2]->ptr);
3546 int end = atoi(c->argv[3]->ptr);
3547
3548 o = lookupKeyRead(c->db,c->argv[1]);
3549 if (o == NULL) {
3550 addReply(c,shared.nullmultibulk);
3551 } else {
3552 if (o->type != REDIS_LIST) {
3553 addReply(c,shared.wrongtypeerr);
3554 } else {
3555 list *list = o->ptr;
3556 listNode *ln;
3557 int llen = listLength(list);
3558 int rangelen, j;
3559 robj *ele;
3560
3561 /* convert negative indexes */
3562 if (start < 0) start = llen+start;
3563 if (end < 0) end = llen+end;
3564 if (start < 0) start = 0;
3565 if (end < 0) end = 0;
3566
3567 /* indexes sanity checks */
3568 if (start > end || start >= llen) {
3569 /* Out of range start or start > end result in empty list */
3570 addReply(c,shared.emptymultibulk);
3571 return;
3572 }
3573 if (end >= llen) end = llen-1;
3574 rangelen = (end-start)+1;
3575
3576 /* Return the result in form of a multi-bulk reply */
3577 ln = listIndex(list, start);
3578 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3579 for (j = 0; j < rangelen; j++) {
3580 ele = listNodeValue(ln);
3581 addReplyBulkLen(c,ele);
3582 addReply(c,ele);
3583 addReply(c,shared.crlf);
3584 ln = ln->next;
3585 }
3586 }
3587 }
3588 }
3589
3590 static void ltrimCommand(redisClient *c) {
3591 robj *o;
3592 int start = atoi(c->argv[2]->ptr);
3593 int end = atoi(c->argv[3]->ptr);
3594
3595 o = lookupKeyWrite(c->db,c->argv[1]);
3596 if (o == NULL) {
3597 addReply(c,shared.nokeyerr);
3598 } else {
3599 if (o->type != REDIS_LIST) {
3600 addReply(c,shared.wrongtypeerr);
3601 } else {
3602 list *list = o->ptr;
3603 listNode *ln;
3604 int llen = listLength(list);
3605 int j, ltrim, rtrim;
3606
3607 /* convert negative indexes */
3608 if (start < 0) start = llen+start;
3609 if (end < 0) end = llen+end;
3610 if (start < 0) start = 0;
3611 if (end < 0) end = 0;
3612
3613 /* indexes sanity checks */
3614 if (start > end || start >= llen) {
3615 /* Out of range start or start > end result in empty list */
3616 ltrim = llen;
3617 rtrim = 0;
3618 } else {
3619 if (end >= llen) end = llen-1;
3620 ltrim = start;
3621 rtrim = llen-end-1;
3622 }
3623
3624 /* Remove list elements to perform the trim */
3625 for (j = 0; j < ltrim; j++) {
3626 ln = listFirst(list);
3627 listDelNode(list,ln);
3628 }
3629 for (j = 0; j < rtrim; j++) {
3630 ln = listLast(list);
3631 listDelNode(list,ln);
3632 }
3633 server.dirty++;
3634 addReply(c,shared.ok);
3635 }
3636 }
3637 }
3638
3639 static void lremCommand(redisClient *c) {
3640 robj *o;
3641
3642 o = lookupKeyWrite(c->db,c->argv[1]);
3643 if (o == NULL) {
3644 addReply(c,shared.czero);
3645 } else {
3646 if (o->type != REDIS_LIST) {
3647 addReply(c,shared.wrongtypeerr);
3648 } else {
3649 list *list = o->ptr;
3650 listNode *ln, *next;
3651 int toremove = atoi(c->argv[2]->ptr);
3652 int removed = 0;
3653 int fromtail = 0;
3654
3655 if (toremove < 0) {
3656 toremove = -toremove;
3657 fromtail = 1;
3658 }
3659 ln = fromtail ? list->tail : list->head;
3660 while (ln) {
3661 robj *ele = listNodeValue(ln);
3662
3663 next = fromtail ? ln->prev : ln->next;
3664 if (compareStringObjects(ele,c->argv[3]) == 0) {
3665 listDelNode(list,ln);
3666 server.dirty++;
3667 removed++;
3668 if (toremove && removed == toremove) break;
3669 }
3670 ln = next;
3671 }
3672 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3673 }
3674 }
3675 }
3676
3677 /* This is the semantic of this command:
3678 * RPOPLPUSH srclist dstlist:
3679 * IF LLEN(srclist) > 0
3680 * element = RPOP srclist
3681 * LPUSH dstlist element
3682 * RETURN element
3683 * ELSE
3684 * RETURN nil
3685 * END
3686 * END
3687 *
3688 * The idea is to be able to get an element from a list in a reliable way
3689 * since the element is not just returned but pushed against another list
3690 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3691 */
3692 static void rpoplpushcommand(redisClient *c) {
3693 robj *sobj;
3694
3695 sobj = lookupKeyWrite(c->db,c->argv[1]);
3696 if (sobj == NULL) {
3697 addReply(c,shared.nullbulk);
3698 } else {
3699 if (sobj->type != REDIS_LIST) {
3700 addReply(c,shared.wrongtypeerr);
3701 } else {
3702 list *srclist = sobj->ptr;
3703 listNode *ln = listLast(srclist);
3704
3705 if (ln == NULL) {
3706 addReply(c,shared.nullbulk);
3707 } else {
3708 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3709 robj *ele = listNodeValue(ln);
3710 list *dstlist;
3711
3712 if (dobj == NULL) {
3713
3714 /* Create the list if the key does not exist */
3715 dobj = createListObject();
3716 dictAdd(c->db->dict,c->argv[2],dobj);
3717 incrRefCount(c->argv[2]);
3718 } else if (dobj->type != REDIS_LIST) {
3719 addReply(c,shared.wrongtypeerr);
3720 return;
3721 }
3722 /* Add the element to the target list */
3723 dstlist = dobj->ptr;
3724 listAddNodeHead(dstlist,ele);
3725 incrRefCount(ele);
3726
3727 /* Send the element to the client as reply as well */
3728 addReplyBulkLen(c,ele);
3729 addReply(c,ele);
3730 addReply(c,shared.crlf);
3731
3732 /* Finally remove the element from the source list */
3733 listDelNode(srclist,ln);
3734 server.dirty++;
3735 }
3736 }
3737 }
3738 }
3739
3740
3741 /* ==================================== Sets ================================ */
3742
3743 static void saddCommand(redisClient *c) {
3744 robj *set;
3745
3746 set = lookupKeyWrite(c->db,c->argv[1]);
3747 if (set == NULL) {
3748 set = createSetObject();
3749 dictAdd(c->db->dict,c->argv[1],set);
3750 incrRefCount(c->argv[1]);
3751 } else {
3752 if (set->type != REDIS_SET) {
3753 addReply(c,shared.wrongtypeerr);
3754 return;
3755 }
3756 }
3757 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3758 incrRefCount(c->argv[2]);
3759 server.dirty++;
3760 addReply(c,shared.cone);
3761 } else {
3762 addReply(c,shared.czero);
3763 }
3764 }
3765
3766 static void sremCommand(redisClient *c) {
3767 robj *set;
3768
3769 set = lookupKeyWrite(c->db,c->argv[1]);
3770 if (set == NULL) {
3771 addReply(c,shared.czero);
3772 } else {
3773 if (set->type != REDIS_SET) {
3774 addReply(c,shared.wrongtypeerr);
3775 return;
3776 }
3777 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3778 server.dirty++;
3779 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3780 addReply(c,shared.cone);
3781 } else {
3782 addReply(c,shared.czero);
3783 }
3784 }
3785 }
3786
3787 static void smoveCommand(redisClient *c) {
3788 robj *srcset, *dstset;
3789
3790 srcset = lookupKeyWrite(c->db,c->argv[1]);
3791 dstset = lookupKeyWrite(c->db,c->argv[2]);
3792
3793 /* If the source key does not exist return 0, if it's of the wrong type
3794 * raise an error */
3795 if (srcset == NULL || srcset->type != REDIS_SET) {
3796 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3797 return;
3798 }
3799 /* Error if the destination key is not a set as well */
3800 if (dstset && dstset->type != REDIS_SET) {
3801 addReply(c,shared.wrongtypeerr);
3802 return;
3803 }
3804 /* Remove the element from the source set */
3805 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3806 /* Key not found in the src set! return zero */
3807 addReply(c,shared.czero);
3808 return;
3809 }
3810 server.dirty++;
3811 /* Add the element to the destination set */
3812 if (!dstset) {
3813 dstset = createSetObject();
3814 dictAdd(c->db->dict,c->argv[2],dstset);
3815 incrRefCount(c->argv[2]);
3816 }
3817 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3818 incrRefCount(c->argv[3]);
3819 addReply(c,shared.cone);
3820 }
3821
3822 static void sismemberCommand(redisClient *c) {
3823 robj *set;
3824
3825 set = lookupKeyRead(c->db,c->argv[1]);
3826 if (set == NULL) {
3827 addReply(c,shared.czero);
3828 } else {
3829 if (set->type != REDIS_SET) {
3830 addReply(c,shared.wrongtypeerr);
3831 return;
3832 }
3833 if (dictFind(set->ptr,c->argv[2]))
3834 addReply(c,shared.cone);
3835 else
3836 addReply(c,shared.czero);
3837 }
3838 }
3839
3840 static void scardCommand(redisClient *c) {
3841 robj *o;
3842 dict *s;
3843
3844 o = lookupKeyRead(c->db,c->argv[1]);
3845 if (o == NULL) {
3846 addReply(c,shared.czero);
3847 return;
3848 } else {
3849 if (o->type != REDIS_SET) {
3850 addReply(c,shared.wrongtypeerr);
3851 } else {
3852 s = o->ptr;
3853 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3854 dictSize(s)));
3855 }
3856 }
3857 }
3858
3859 static void spopCommand(redisClient *c) {
3860 robj *set;
3861 dictEntry *de;
3862
3863 set = lookupKeyWrite(c->db,c->argv[1]);
3864 if (set == NULL) {
3865 addReply(c,shared.nullbulk);
3866 } else {
3867 if (set->type != REDIS_SET) {
3868 addReply(c,shared.wrongtypeerr);
3869 return;
3870 }
3871 de = dictGetRandomKey(set->ptr);
3872 if (de == NULL) {
3873 addReply(c,shared.nullbulk);
3874 } else {
3875 robj *ele = dictGetEntryKey(de);
3876
3877 addReplyBulkLen(c,ele);
3878 addReply(c,ele);
3879 addReply(c,shared.crlf);
3880 dictDelete(set->ptr,ele);
3881 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3882 server.dirty++;
3883 }
3884 }
3885 }
3886
3887 static void srandmemberCommand(redisClient *c) {
3888 robj *set;
3889 dictEntry *de;
3890
3891 set = lookupKeyRead(c->db,c->argv[1]);
3892 if (set == NULL) {
3893 addReply(c,shared.nullbulk);
3894 } else {
3895 if (set->type != REDIS_SET) {
3896 addReply(c,shared.wrongtypeerr);
3897 return;
3898 }
3899 de = dictGetRandomKey(set->ptr);
3900 if (de == NULL) {
3901 addReply(c,shared.nullbulk);
3902 } else {
3903 robj *ele = dictGetEntryKey(de);
3904
3905 addReplyBulkLen(c,ele);
3906 addReply(c,ele);
3907 addReply(c,shared.crlf);
3908 }
3909 }
3910 }
3911
3912 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3913 dict **d1 = (void*) s1, **d2 = (void*) s2;
3914
3915 return dictSize(*d1)-dictSize(*d2);
3916 }
3917
3918 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3919 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3920 dictIterator *di;
3921 dictEntry *de;
3922 robj *lenobj = NULL, *dstset = NULL;
3923 int j, cardinality = 0;
3924
3925 for (j = 0; j < setsnum; j++) {
3926 robj *setobj;
3927
3928 setobj = dstkey ?
3929 lookupKeyWrite(c->db,setskeys[j]) :
3930 lookupKeyRead(c->db,setskeys[j]);
3931 if (!setobj) {
3932 zfree(dv);
3933 if (dstkey) {
3934 deleteKey(c->db,dstkey);
3935 addReply(c,shared.ok);
3936 } else {
3937 addReply(c,shared.nullmultibulk);
3938 }
3939 return;
3940 }
3941 if (setobj->type != REDIS_SET) {
3942 zfree(dv);
3943 addReply(c,shared.wrongtypeerr);
3944 return;
3945 }
3946 dv[j] = setobj->ptr;
3947 }
3948 /* Sort sets from the smallest to largest, this will improve our
3949 * algorithm's performace */
3950 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3951
3952 /* The first thing we should output is the total number of elements...
3953 * since this is a multi-bulk write, but at this stage we don't know
3954 * the intersection set size, so we use a trick, append an empty object
3955 * to the output list and save the pointer to later modify it with the
3956 * right length */
3957 if (!dstkey) {
3958 lenobj = createObject(REDIS_STRING,NULL);
3959 addReply(c,lenobj);
3960 decrRefCount(lenobj);
3961 } else {
3962 /* If we have a target key where to store the resulting set
3963 * create this key with an empty set inside */
3964 dstset = createSetObject();
3965 }
3966
3967 /* Iterate all the elements of the first (smallest) set, and test
3968 * the element against all the other sets, if at least one set does
3969 * not include the element it is discarded */
3970 di = dictGetIterator(dv[0]);
3971
3972 while((de = dictNext(di)) != NULL) {
3973 robj *ele;
3974
3975 for (j = 1; j < setsnum; j++)
3976 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3977 if (j != setsnum)
3978 continue; /* at least one set does not contain the member */
3979 ele = dictGetEntryKey(de);
3980 if (!dstkey) {
3981 addReplyBulkLen(c,ele);
3982 addReply(c,ele);
3983 addReply(c,shared.crlf);
3984 cardinality++;
3985 } else {
3986 dictAdd(dstset->ptr,ele,NULL);
3987 incrRefCount(ele);
3988 }
3989 }
3990 dictReleaseIterator(di);
3991
3992 if (dstkey) {
3993 /* Store the resulting set into the target */
3994 deleteKey(c->db,dstkey);
3995 dictAdd(c->db->dict,dstkey,dstset);
3996 incrRefCount(dstkey);
3997 }
3998
3999 if (!dstkey) {
4000 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
4001 } else {
4002 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
4003 dictSize((dict*)dstset->ptr)));
4004 server.dirty++;
4005 }
4006 zfree(dv);
4007 }
4008
4009 static void sinterCommand(redisClient *c) {
4010 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4011 }
4012
4013 static void sinterstoreCommand(redisClient *c) {
4014 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4015 }
4016
4017 #define REDIS_OP_UNION 0
4018 #define REDIS_OP_DIFF 1
4019
/* Shared implementation of SUNION, SUNIONSTORE, SDIFF and SDIFFSTORE.
 *
 * setskeys/setsnum: the keys of the input sets.
 * dstkey: NULL to reply with the resulting members (multi-bulk), otherwise
 *         the key the result set is stored into; the reply is then the
 *         cardinality of the stored set.
 * op: REDIS_OP_UNION, or REDIS_OP_DIFF (members of the first set that are
 *     in none of the following sets).
 *
 * Missing keys behave like empty sets. If any existing key is not a set the
 * command replies with a wrong-type error before building any result. */
static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
    dict **dv = zmalloc(sizeof(dict*)*setsnum);
    dictIterator *di;
    dictEntry *de;
    robj *dstset = NULL;
    int j, cardinality = 0;

    /* Resolve every key to its set dict (NULL for missing keys). In STORE
     * mode write lookups are used, otherwise read lookups. */
    for (j = 0; j < setsnum; j++) {
        robj *setobj;

        setobj = dstkey ?
                    lookupKeyWrite(c->db,setskeys[j]) :
                    lookupKeyRead(c->db,setskeys[j]);
        if (!setobj) {
            dv[j] = NULL;
            continue;
        }
        if (setobj->type != REDIS_SET) {
            zfree(dv);
            addReply(c,shared.wrongtypeerr);
            return;
        }
        dv[j] = setobj->ptr;
    }

    /* We need a temp set object to store our union/difference. If the
     * dstkey is not NULL (that is, we are inside an SUNIONSTORE or
     * SDIFFSTORE operation) then this set object will be the resulting
     * object to set into the target key */
    dstset = createSetObject();

    /* Iterate all the elements of all the sets. For UNION every element is
     * added a single time; for DIFF the first set seeds the result and the
     * elements of the following sets are removed from it. */
    for (j = 0; j < setsnum; j++) {
        if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
        if (!dv[j]) continue; /* non existing keys are like empty sets */

        di = dictGetIterator(dv[j]);

        while((de = dictNext(di)) != NULL) {
            robj *ele;

            /* dictAdd will not add the same element multiple times */
            ele = dictGetEntryKey(de);
            if (op == REDIS_OP_UNION || j == 0) {
                if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
                    /* The result set holds its own reference. */
                    incrRefCount(ele);
                    cardinality++;
                }
            } else if (op == REDIS_OP_DIFF) {
                if (dictDelete(dstset->ptr,ele) == DICT_OK) {
                    cardinality--;
                }
            }
        }
        dictReleaseIterator(di);

        /* Once a DIFF result is empty, later sets cannot change it. */
        if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
    }

    /* Output the content of the resulting set, if not in STORE mode */
    if (!dstkey) {
        addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
        di = dictGetIterator(dstset->ptr);
        while((de = dictNext(di)) != NULL) {
            robj *ele;

            ele = dictGetEntryKey(de);
            addReplyBulkLen(c,ele);
            addReply(c,ele);
            addReply(c,shared.crlf);
        }
        dictReleaseIterator(di);
    } else {
        /* If we have a target key where to store the resulting set
         * create this key with the result set inside */
        deleteKey(c->db,dstkey);
        dictAdd(c->db->dict,dstkey,dstset);
        incrRefCount(dstkey);
    }

    /* Cleanup: the temporary set is released only when it was not stored
     * into the DB; in STORE mode reply with the stored set's size. */
    if (!dstkey) {
        decrRefCount(dstset);
    } else {
        addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
            dictSize((dict*)dstset->ptr)));
        server.dirty++;
    }
    zfree(dv);
}
4110
4111 static void sunionCommand(redisClient *c) {
4112 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4113 }
4114
4115 static void sunionstoreCommand(redisClient *c) {
4116 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4117 }
4118
4119 static void sdiffCommand(redisClient *c) {
4120 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4121 }
4122
4123 static void sdiffstoreCommand(redisClient *c) {
4124 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4125 }
4126
4127 /* ==================================== ZSets =============================== */
4128
/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
4136
/* This skiplist implementation is almost a C translation of the original
 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
 * Alternative to Balanced Trees", modified in three ways:
 * a) this implementation allows for repeated scores.
 * b) the comparison is not just by key (our 'score') but also by the
 *    element object itself, which breaks ties between equal scores.
 * c) there is a back pointer, so it's a doubly linked list with the back
 *    pointers being only at "level 1". This allows traversing the list
 *    from tail to head, useful for ZREVRANGE. */
4145
4146 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4147 zskiplistNode *zn = zmalloc(sizeof(*zn));
4148
4149 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4150 zn->score = score;
4151 zn->obj = obj;
4152 return zn;
4153 }
4154
4155 static zskiplist *zslCreate(void) {
4156 int j;
4157 zskiplist *zsl;
4158
4159 zsl = zmalloc(sizeof(*zsl));
4160 zsl->level = 1;
4161 zsl->length = 0;
4162 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4163 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4164 zsl->header->forward[j] = NULL;
4165 zsl->header->backward = NULL;
4166 zsl->tail = NULL;
4167 return zsl;
4168 }
4169
4170 static void zslFreeNode(zskiplistNode *node) {
4171 decrRefCount(node->obj);
4172 zfree(node->forward);
4173 zfree(node);
4174 }
4175
4176 static void zslFree(zskiplist *zsl) {
4177 zskiplistNode *node = zsl->header->forward[0], *next;
4178
4179 zfree(zsl->header->forward);
4180 zfree(zsl->header);
4181 while(node) {
4182 next = node->forward[0];
4183 zslFreeNode(node);
4184 node = next;
4185 }
4186 zfree(zsl);
4187 }
4188
4189 static int zslRandomLevel(void) {
4190 int level = 1;
4191 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4192 level += 1;
4193 return level;
4194 }
4195
/* Insert a new node holding (score, obj) into the skiplist, keeping nodes
 * ordered by score and, for equal scores, by compareStringObjects().
 * The node stores 'obj' as-is and zslFreeNode() later calls decrRefCount()
 * on it, so the caller is expected to provide the reference (e.g.
 * zaddGenericCommand() calls incrRefCount() right after inserting). */
static void zslInsert(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i, level;

    /* Walk down from the highest level, recording in update[i] the last
     * node at level i that sorts before (score, obj). */
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->forward[i] &&
            (x->forward[i]->score < score ||
                (x->forward[i]->score == score &&
                compareStringObjects(x->forward[i]->obj,obj) < 0)))
            x = x->forward[i];
        update[i] = x;
    }
    /* we assume the key is not already inside, since we allow duplicated
     * scores, and the re-insertion of score and redis object should never
     * happen since the caller of zslInsert() should test in the hash table
     * if the element is already inside or not. */
    /* NOTE(review): update[] has ZSKIPLIST_MAXLEVEL slots, so this relies on
     * zslRandomLevel() never returning more than ZSKIPLIST_MAXLEVEL. */
    level = zslRandomLevel();
    if (level > zsl->level) {
        /* Levels above the current list height are linked from the header. */
        for (i = zsl->level; i < level; i++)
            update[i] = zsl->header;
        zsl->level = level;
    }
    x = zslCreateNode(level,score,obj);
    /* Splice the new node in right after update[i] on every level it spans. */
    for (i = 0; i < level; i++) {
        x->forward[i] = update[i]->forward[i];
        update[i]->forward[i] = x;
    }
    /* Backward links exist only at level 0. The first node gets NULL rather
     * than a pointer to the header. */
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->forward[0])
        x->forward[0]->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
}
4231
4232 /* Delete an element with matching score/object from the skiplist. */
4233 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4234 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4235 int i;
4236
4237 x = zsl->header;
4238 for (i = zsl->level-1; i >= 0; i--) {
4239 while (x->forward[i] &&
4240 (x->forward[i]->score < score ||
4241 (x->forward[i]->score == score &&
4242 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4243 x = x->forward[i];
4244 update[i] = x;
4245 }
4246 /* We may have multiple elements with the same score, what we need
4247 * is to find the element with both the right score and object. */
4248 x = x->forward[0];
4249 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4250 for (i = 0; i < zsl->level; i++) {
4251 if (update[i]->forward[i] != x) break;
4252 update[i]->forward[i] = x->forward[i];
4253 }
4254 if (x->forward[0]) {
4255 x->forward[0]->backward = (x->backward == zsl->header) ?
4256 NULL : x->backward;
4257 } else {
4258 zsl->tail = x->backward;
4259 }
4260 zslFreeNode(x);
4261 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4262 zsl->level--;
4263 zsl->length--;
4264 return 1;
4265 } else {
4266 return 0; /* not found */
4267 }
4268 return 0; /* not found */
4269 }
4270
/* Delete all the elements with score between min and max from the skiplist.
 * min and max are inclusive, so every element with min <= score <= max is
 * deleted. Note that this function takes the reference to the hash table
 * view of the sorted set, in order to remove the elements from the hash
 * table too. Returns the number of elements removed. */
static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned long removed = 0;
    int i;

    /* Record, at each level, the last node whose score is < min. */
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->forward[i] && x->forward[i]->score < min)
            x = x->forward[i];
        update[i] = x;
    }
    /* x is now the first node with score >= min (or NULL). Remove nodes one
     * at a time while they are still inside the range. update[] stays valid
     * across iterations because every node between update[i] and the next
     * candidate has already been removed. */
    x = x->forward[0];
    while (x && x->score <= max) {
        zskiplistNode *next;

        for (i = 0; i < zsl->level; i++) {
            if (update[i]->forward[i] != x) break;
            update[i]->forward[i] = x->forward[i];
        }
        if (x->forward[0]) {
            x->forward[0]->backward = (x->backward == zsl->header) ?
                                        NULL : x->backward;
        } else {
            zsl->tail = x->backward;
        }
        next = x->forward[0];
        /* Remove the element from the hash table view too, then free the
         * skiplist node (which releases the skiplist's object reference). */
        dictDelete(dict,x->obj);
        zslFreeNode(x);
        while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
            zsl->level--;
        zsl->length--;
        removed++;
        x = next;
    }
    return removed;
}
4313
4314 /* Find the first node having a score equal or greater than the specified one.
4315 * Returns NULL if there is no match. */
4316 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4317 zskiplistNode *x;
4318 int i;
4319
4320 x = zsl->header;
4321 for (i = zsl->level-1; i >= 0; i--) {
4322 while (x->forward[i] && x->forward[i]->score < score)
4323 x = x->forward[i];
4324 }
4325 /* We may have multiple elements with the same score, what we need
4326 * is to find the element with both the right score and object. */
4327 return x->forward[0];
4328 }
4329
4330 /* The actual Z-commands implementations */
4331
4332 /* This generic command implements both ZADD and ZINCRBY.
4333 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4334 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4335 static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
4336 robj *zsetobj;
4337 zset *zs;
4338 double *score;
4339
4340 zsetobj = lookupKeyWrite(c->db,key);
4341 if (zsetobj == NULL) {
4342 zsetobj = createZsetObject();
4343 dictAdd(c->db->dict,key,zsetobj);
4344 incrRefCount(key);
4345 } else {
4346 if (zsetobj->type != REDIS_ZSET) {
4347 addReply(c,shared.wrongtypeerr);
4348 return;
4349 }
4350 }
4351 zs = zsetobj->ptr;
4352
4353 /* Ok now since we implement both ZADD and ZINCRBY here the code
4354 * needs to handle the two different conditions. It's all about setting
4355 * '*score', that is, the new score to set, to the right value. */
4356 score = zmalloc(sizeof(double));
4357 if (doincrement) {
4358 dictEntry *de;
4359
4360 /* Read the old score. If the element was not present starts from 0 */
4361 de = dictFind(zs->dict,ele);
4362 if (de) {
4363 double *oldscore = dictGetEntryVal(de);
4364 *score = *oldscore + scoreval;
4365 } else {
4366 *score = scoreval;
4367 }
4368 } else {
4369 *score = scoreval;
4370 }
4371
4372 /* What follows is a simple remove and re-insert operation that is common
4373 * to both ZADD and ZINCRBY... */
4374 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
4375 /* case 1: New element */
4376 incrRefCount(ele); /* added to hash */
4377 zslInsert(zs->zsl,*score,ele);
4378 incrRefCount(ele); /* added to skiplist */
4379 server.dirty++;
4380 if (doincrement)
4381 addReplyDouble(c,*score);
4382 else
4383 addReply(c,shared.cone);
4384 } else {
4385 dictEntry *de;
4386 double *oldscore;
4387
4388 /* case 2: Score update operation */
4389 de = dictFind(zs->dict,ele);
4390 assert(de != NULL);
4391 oldscore = dictGetEntryVal(de);
4392 if (*score != *oldscore) {
4393 int deleted;
4394
4395 /* Remove and insert the element in the skip list with new score */
4396 deleted = zslDelete(zs->zsl,*oldscore,ele);
4397 assert(deleted != 0);
4398 zslInsert(zs->zsl,*score,ele);
4399 incrRefCount(ele);
4400 /* Update the score in the hash table */
4401 dictReplace(zs->dict,ele,score);
4402 server.dirty++;
4403 } else {
4404 zfree(score);
4405 }
4406 if (doincrement)
4407 addReplyDouble(c,*score);
4408 else
4409 addReply(c,shared.czero);
4410 }
4411 }
4412
4413 static void zaddCommand(redisClient *c) {
4414 double scoreval;
4415
4416 scoreval = strtod(c->argv[2]->ptr,NULL);
4417 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4418 }
4419
4420 static void zincrbyCommand(redisClient *c) {
4421 double scoreval;
4422
4423 scoreval = strtod(c->argv[2]->ptr,NULL);
4424 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4425 }
4426
4427 static void zremCommand(redisClient *c) {
4428 robj *zsetobj;
4429 zset *zs;
4430
4431 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4432 if (zsetobj == NULL) {
4433 addReply(c,shared.czero);
4434 } else {
4435 dictEntry *de;
4436 double *oldscore;
4437 int deleted;
4438
4439 if (zsetobj->type != REDIS_ZSET) {
4440 addReply(c,shared.wrongtypeerr);
4441 return;
4442 }
4443 zs = zsetobj->ptr;
4444 de = dictFind(zs->dict,c->argv[2]);
4445 if (de == NULL) {
4446 addReply(c,shared.czero);
4447 return;
4448 }
4449 /* Delete from the skiplist */
4450 oldscore = dictGetEntryVal(de);
4451 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4452 assert(deleted != 0);
4453
4454 /* Delete from the hash table */
4455 dictDelete(zs->dict,c->argv[2]);
4456 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4457 server.dirty++;
4458 addReply(c,shared.cone);
4459 }
4460 }
4461
4462 static void zremrangebyscoreCommand(redisClient *c) {
4463 double min = strtod(c->argv[2]->ptr,NULL);
4464 double max = strtod(c->argv[3]->ptr,NULL);
4465 robj *zsetobj;
4466 zset *zs;
4467
4468 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4469 if (zsetobj == NULL) {
4470 addReply(c,shared.czero);
4471 } else {
4472 long deleted;
4473
4474 if (zsetobj->type != REDIS_ZSET) {
4475 addReply(c,shared.wrongtypeerr);
4476 return;
4477 }
4478 zs = zsetobj->ptr;
4479 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4480 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4481 server.dirty += deleted;
4482 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4483 }
4484 }
4485
4486 static void zrangeGenericCommand(redisClient *c, int reverse) {
4487 robj *o;
4488 int start = atoi(c->argv[2]->ptr);
4489 int end = atoi(c->argv[3]->ptr);
4490
4491 o = lookupKeyRead(c->db,c->argv[1]);
4492 if (o == NULL) {
4493 addReply(c,shared.nullmultibulk);
4494 } else {
4495 if (o->type != REDIS_ZSET) {
4496 addReply(c,shared.wrongtypeerr);
4497 } else {
4498 zset *zsetobj = o->ptr;
4499 zskiplist *zsl = zsetobj->zsl;
4500 zskiplistNode *ln;
4501
4502 int llen = zsl->length;
4503 int rangelen, j;
4504 robj *ele;
4505
4506 /* convert negative indexes */
4507 if (start < 0) start = llen+start;
4508 if (end < 0) end = llen+end;
4509 if (start < 0) start = 0;
4510 if (end < 0) end = 0;
4511
4512 /* indexes sanity checks */
4513 if (start > end || start >= llen) {
4514 /* Out of range start or start > end result in empty list */
4515 addReply(c,shared.emptymultibulk);
4516 return;
4517 }
4518 if (end >= llen) end = llen-1;
4519 rangelen = (end-start)+1;
4520
4521 /* Return the result in form of a multi-bulk reply */
4522 if (reverse) {
4523 ln = zsl->tail;
4524 while (start--)
4525 ln = ln->backward;
4526 } else {
4527 ln = zsl->header->forward[0];
4528 while (start--)
4529 ln = ln->forward[0];
4530 }
4531
4532 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4533 for (j = 0; j < rangelen; j++) {
4534 ele = ln->obj;
4535 addReplyBulkLen(c,ele);
4536 addReply(c,ele);
4537 addReply(c,shared.crlf);
4538 ln = reverse ? ln->backward : ln->forward[0];
4539 }
4540 }
4541 }
4542 }
4543
4544 static void zrangeCommand(redisClient *c) {
4545 zrangeGenericCommand(c,0);
4546 }
4547
4548 static void zrevrangeCommand(redisClient *c) {
4549 zrangeGenericCommand(c,1);
4550 }
4551
4552 static void zrangebyscoreCommand(redisClient *c) {
4553 robj *o;
4554 double min = strtod(c->argv[2]->ptr,NULL);
4555 double max = strtod(c->argv[3]->ptr,NULL);
4556
4557 o = lookupKeyRead(c->db,c->argv[1]);
4558 if (o == NULL) {
4559 addReply(c,shared.nullmultibulk);
4560 } else {
4561 if (o->type != REDIS_ZSET) {
4562 addReply(c,shared.wrongtypeerr);
4563 } else {
4564 zset *zsetobj = o->ptr;
4565 zskiplist *zsl = zsetobj->zsl;
4566 zskiplistNode *ln;
4567 robj *ele, *lenobj;
4568 unsigned int rangelen = 0;
4569
4570 /* Get the first node with the score >= min */
4571 ln = zslFirstWithScore(zsl,min);
4572 if (ln == NULL) {
4573 /* No element matching the speciifed interval */
4574 addReply(c,shared.emptymultibulk);
4575 return;
4576 }
4577
4578 /* We don't know in advance how many matching elements there
4579 * are in the list, so we push this object that will represent
4580 * the multi-bulk length in the output buffer, and will "fix"
4581 * it later */
4582 lenobj = createObject(REDIS_STRING,NULL);
4583 addReply(c,lenobj);
4584
4585 while(ln && ln->score <= max) {
4586 ele = ln->obj;
4587 addReplyBulkLen(c,ele);
4588 addReply(c,ele);
4589 addReply(c,shared.crlf);
4590 ln = ln->forward[0];
4591 rangelen++;
4592 }
4593 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4594 }
4595 }
4596 }
4597
4598 static void zcardCommand(redisClient *c) {
4599 robj *o;
4600 zset *zs;
4601
4602 o = lookupKeyRead(c->db,c->argv[1]);
4603 if (o == NULL) {
4604 addReply(c,shared.czero);
4605 return;
4606 } else {
4607 if (o->type != REDIS_ZSET) {
4608 addReply(c,shared.wrongtypeerr);
4609 } else {
4610 zs = o->ptr;
4611 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4612 }
4613 }
4614 }
4615
4616 static void zscoreCommand(redisClient *c) {
4617 robj *o;
4618 zset *zs;
4619
4620 o = lookupKeyRead(c->db,c->argv[1]);
4621 if (o == NULL) {
4622 addReply(c,shared.nullbulk);
4623 return;
4624 } else {
4625 if (o->type != REDIS_ZSET) {
4626 addReply(c,shared.wrongtypeerr);
4627 } else {
4628 dictEntry *de;
4629
4630 zs = o->ptr;
4631 de = dictFind(zs->dict,c->argv[2]);
4632 if (!de) {
4633 addReply(c,shared.nullbulk);
4634 } else {
4635 double *score = dictGetEntryVal(de);
4636
4637 addReplyDouble(c,*score);
4638 }
4639 }
4640 }
4641 }
4642
4643 /* ========================= Non type-specific commands ==================== */
4644
4645 static void flushdbCommand(redisClient *c) {
4646 server.dirty += dictSize(c->db->dict);
4647 dictEmpty(c->db->dict);
4648 dictEmpty(c->db->expires);
4649 addReply(c,shared.ok);
4650 }
4651
4652 static void flushallCommand(redisClient *c) {
4653 server.dirty += emptyDb();
4654 addReply(c,shared.ok);
4655 rdbSave(server.dbfilename);
4656 server.dirty++;
4657 }
4658
4659 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4660 redisSortOperation *so = zmalloc(sizeof(*so));
4661 so->type = type;
4662 so->pattern = pattern;
4663 return so;
4664 }
4665
4666 /* Return the value associated to the key with a name obtained
4667 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4668 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4669 char *p;
4670 sds spat, ssub;
4671 robj keyobj;
4672 int prefixlen, sublen, postfixlen;
4673 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4674 struct {
4675 long len;
4676 long free;
4677 char buf[REDIS_SORTKEY_MAX+1];
4678 } keyname;
4679
4680 /* If the pattern is "#" return the substitution object itself in order
4681 * to implement the "SORT ... GET #" feature. */
4682 spat = pattern->ptr;
4683 if (spat[0] == '#' && spat[1] == '\0') {
4684 return subst;
4685 }
4686
4687 /* The substitution object may be specially encoded. If so we create
4688 * a decoded object on the fly. Otherwise getDecodedObject will just
4689 * increment the ref count, that we'll decrement later. */
4690 subst = getDecodedObject(subst);
4691
4692 ssub = subst->ptr;
4693 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4694 p = strchr(spat,'*');
4695 if (!p) {
4696 decrRefCount(subst);
4697 return NULL;
4698 }
4699
4700 prefixlen = p-spat;
4701 sublen = sdslen(ssub);
4702 postfixlen = sdslen(spat)-(prefixlen+1);
4703 memcpy(keyname.buf,spat,prefixlen);
4704 memcpy(keyname.buf+prefixlen,ssub,sublen);
4705 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4706 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4707 keyname.len = prefixlen+sublen+postfixlen;
4708
4709 keyobj.refcount = 1;
4710 keyobj.type = REDIS_STRING;
4711 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4712
4713 decrRefCount(subst);
4714
4715 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4716 return lookupKeyRead(db,&keyobj);
4717 }
4718
4719 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4720 * the additional parameter is not standard but a BSD-specific we have to
4721 * pass sorting parameters via the global 'server' structure */
4722 static int sortCompare(const void *s1, const void *s2) {
4723 const redisSortObject *so1 = s1, *so2 = s2;
4724 int cmp;
4725
4726 if (!server.sort_alpha) {
4727 /* Numeric sorting. Here it's trivial as we precomputed scores */
4728 if (so1->u.score > so2->u.score) {
4729 cmp = 1;
4730 } else if (so1->u.score < so2->u.score) {
4731 cmp = -1;
4732 } else {
4733 cmp = 0;
4734 }
4735 } else {
4736 /* Alphanumeric sorting */
4737 if (server.sort_bypattern) {
4738 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4739 /* At least one compare object is NULL */
4740 if (so1->u.cmpobj == so2->u.cmpobj)
4741 cmp = 0;
4742 else if (so1->u.cmpobj == NULL)
4743 cmp = -1;
4744 else
4745 cmp = 1;
4746 } else {
4747 /* We have both the objects, use strcoll */
4748 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4749 }
4750 } else {
4751 /* Compare elements directly */
4752 robj *dec1, *dec2;
4753
4754 dec1 = getDecodedObject(so1->obj);
4755 dec2 = getDecodedObject(so2->obj);
4756 cmp = strcoll(dec1->ptr,dec2->ptr);
4757 decrRefCount(dec1);
4758 decrRefCount(dec2);
4759 }
4760 }
4761 return server.sort_desc ? -cmp : cmp;
4762 }
4763
/* The SORT command is the most complex command in Redis. Warning: this code
 * is optimized for speed and a bit less for readability.
 *
 * SORT key [BY pattern] [LIMIT start count] [GET pattern ...] [ASC|DESC]
 *          [ALPHA] [STORE dstkey]
 *
 * Sorts the elements of a list or set. It replies with a multi-bulk of the
 * results, or, with STORE, saves them as a list into dstkey and replies
 * with their count. */
static void sortCommand(redisClient *c) {
    list *operations;
    int outputlen = 0;
    int desc = 0, alpha = 0;
    int limit_start = 0, limit_count = -1, start, end;
    int j, dontsort = 0, vectorlen;
    int getop = 0; /* GET operation counter */
    robj *sortval, *sortby = NULL, *storekey = NULL;
    redisSortObject *vector; /* Resulting vector to sort */

    /* Lookup the key to sort. It must be of the right types */
    sortval = lookupKeyRead(c->db,c->argv[1]);
    if (sortval == NULL) {
        addReply(c,shared.nokeyerr);
        return;
    }
    if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
        addReply(c,shared.wrongtypeerr);
        return;
    }

    /* Create a list of operations to perform for every sorted element.
     * Only GET operations are currently parsed below. */
    operations = listCreate();
    listSetFreeMethod(operations,zfree);
    j = 2;

    /* Now we need to protect sortval incrementing its count, in the future
     * SORT may have options able to overwrite/delete keys during the sorting
     * and the sorted key itself may get destroyed */
    incrRefCount(sortval);

    /* The SORT command has an SQL-alike syntax, parse it */
    while(j < c->argc) {
        int leftargs = c->argc-j-1;
        if (!strcasecmp(c->argv[j]->ptr,"asc")) {
            desc = 0;
        } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
            desc = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
            alpha = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
            limit_start = atoi(c->argv[j+1]->ptr);
            limit_count = atoi(c->argv[j+2]->ptr);
            j+=2;
        } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
            storekey = c->argv[j+1];
            j++;
        } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
            sortby = c->argv[j+1];
            /* If the BY pattern does not contain '*', i.e. it is constant,
             * we don't need to sort nor to lookup the weight keys. */
            if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
            j++;
        } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
            listAddNodeTail(operations,createSortOperation(
                REDIS_SORT_GET,c->argv[j+1]));
            getop++;
            j++;
        } else {
            /* Unknown option or missing arguments: undo and bail out. */
            decrRefCount(sortval);
            listRelease(operations);
            addReply(c,shared.syntaxerr);
            return;
        }
        j++;
    }

    /* Load the sorting vector with all the objects to sort. The vector
     * borrows the element pointers; it does not take references. */
    vectorlen = (sortval->type == REDIS_LIST) ?
        listLength((list*)sortval->ptr) :
        dictSize((dict*)sortval->ptr);
    vector = zmalloc(sizeof(redisSortObject)*vectorlen);
    j = 0;
    if (sortval->type == REDIS_LIST) {
        list *list = sortval->ptr;
        listNode *ln;

        listRewind(list);
        while((ln = listYield(list))) {
            robj *ele = ln->value;
            vector[j].obj = ele;
            vector[j].u.score = 0;
            vector[j].u.cmpobj = NULL;
            j++;
        }
    } else {
        dict *set = sortval->ptr;
        dictIterator *di;
        dictEntry *setele;

        di = dictGetIterator(set);
        while((setele = dictNext(di)) != NULL) {
            vector[j].obj = dictGetEntryKey(setele);
            vector[j].u.score = 0;
            vector[j].u.cmpobj = NULL;
            j++;
        }
        dictReleaseIterator(di);
    }
    assert(j == vectorlen);

    /* Now it's time to load the right scores in the sorting vector: from
     * the BY-pattern keys when given, otherwise from the elements. With
     * ALPHA and no BY pattern nothing is precomputed (sortCompare() decodes
     * the elements itself). */
    if (dontsort == 0) {
        for (j = 0; j < vectorlen; j++) {
            if (sortby) {
                robj *byval;

                byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
                /* Missing or non-string weights keep the default set above. */
                if (!byval || byval->type != REDIS_STRING) continue;
                if (alpha) {
                    /* Released in the cleanup loop at the end. */
                    vector[j].u.cmpobj = getDecodedObject(byval);
                } else {
                    if (byval->encoding == REDIS_ENCODING_RAW) {
                        vector[j].u.score = strtod(byval->ptr,NULL);
                    } else {
                        /* Don't need to decode the object if it's
                         * integer-encoded (the only encoding supported) so
                         * far. We can just cast it */
                        if (byval->encoding == REDIS_ENCODING_INT) {
                            vector[j].u.score = (long)byval->ptr;
                        } else
                            assert(1 != 1);
                    }
                }
            } else {
                if (!alpha) {
                    if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
                        vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
                    else {
                        if (vector[j].obj->encoding == REDIS_ENCODING_INT)
                            vector[j].u.score = (long) vector[j].obj->ptr;
                        else
                            assert(1 != 1);
                    }
                }
            }
        }
    }

    /* We are ready to sort the vector... perform a bit of sanity check
     * on the LIMIT option too. We'll use a partial version of quicksort.
     * An out-of-range start yields end == start-1, i.e. an empty output. */
    start = (limit_start < 0) ? 0 : limit_start;
    end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
    if (start >= vectorlen) {
        start = vectorlen-1;
        end = vectorlen-2;
    }
    if (end >= vectorlen) end = vectorlen-1;

    if (dontsort == 0) {
        /* sortCompare() reads its options from these globals. */
        server.sort_desc = desc;
        server.sort_alpha = alpha;
        server.sort_bypattern = sortby ? 1 : 0;
        if (sortby && (start != 0 || end != vectorlen-1))
            pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
        else
            qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
    }

    /* Send command output to the output buffer, performing the specified
     * GET operations if any. Each GET contributes one value per element;
     * without GET each element itself is emitted. */
    outputlen = getop ? getop*(end-start+1) : end-start+1;
    if (storekey == NULL) {
        /* STORE option not specified, sent the sorting result to client */
        addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
        for (j = start; j <= end; j++) {
            listNode *ln;
            if (!getop) {
                addReplyBulkLen(c,vector[j].obj);
                addReply(c,vector[j].obj);
                addReply(c,shared.crlf);
            }
            listRewind(operations);
            while((ln = listYield(operations))) {
                redisSortOperation *sop = ln->value;
                robj *val = lookupKeyByPattern(c->db,sop->pattern,
                    vector[j].obj);

                if (sop->type == REDIS_SORT_GET) {
                    if (!val || val->type != REDIS_STRING) {
                        addReply(c,shared.nullbulk);
                    } else {
                        addReplyBulkLen(c,val);
                        addReply(c,val);
                        addReply(c,shared.crlf);
                    }
                } else {
                    assert(sop->type == REDIS_SORT_GET); /* always fails */
                }
            }
        }
    } else {
        robj *listObject = createListObject();
        list *listPtr = (list*) listObject->ptr;

        /* STORE option specified, set the sorting result as a List object.
         * Missing GET values are stored as empty strings. */
        for (j = start; j <= end; j++) {
            listNode *ln;
            if (!getop) {
                listAddNodeTail(listPtr,vector[j].obj);
                incrRefCount(vector[j].obj);
            }
            listRewind(operations);
            while((ln = listYield(operations))) {
                redisSortOperation *sop = ln->value;
                robj *val = lookupKeyByPattern(c->db,sop->pattern,
                    vector[j].obj);

                if (sop->type == REDIS_SORT_GET) {
                    if (!val || val->type != REDIS_STRING) {
                        listAddNodeTail(listPtr,createStringObject("",0));
                    } else {
                        listAddNodeTail(listPtr,val);
                        incrRefCount(val);
                    }
                } else {
                    assert(sop->type == REDIS_SORT_GET); /* always fails */
                }
            }
        }
        /* The key object gets an extra reference only when dictReplace()
         * reports a non-zero result. */
        if (dictReplace(c->db->dict,storekey,listObject)) {
            incrRefCount(storekey);
        }
        /* Note: we add 1 because the DB is dirty anyway since even if the
         * SORT result is empty a new key is set and maybe the old content
         * replaced. */
        server.dirty += 1+outputlen;
        addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
    }

    /* Cleanup: release sortval, the operations, and the decoded BY values
     * taken for ALPHA sorting. */
    decrRefCount(sortval);
    listRelease(operations);
    for (j = 0; j < vectorlen; j++) {
        if (sortby && alpha && vector[j].u.cmpobj)
            decrRefCount(vector[j].u.cmpobj);
    }
    zfree(vector);
}
5006
5007 static void infoCommand(redisClient *c) {
5008 sds info;
5009 time_t uptime = time(NULL)-server.stat_starttime;
5010 int j;
5011
5012 info = sdscatprintf(sdsempty(),
5013 "redis_version:%s\r\n"
5014 "arch_bits:%s\r\n"
5015 "uptime_in_seconds:%d\r\n"
5016 "uptime_in_days:%d\r\n"
5017 "connected_clients:%d\r\n"
5018 "connected_slaves:%d\r\n"
5019 "used_memory:%zu\r\n"
5020 "changes_since_last_save:%lld\r\n"
5021 "bgsave_in_progress:%d\r\n"
5022 "last_save_time:%d\r\n"
5023 "total_connections_received:%lld\r\n"
5024 "total_commands_processed:%lld\r\n"
5025 "role:%s\r\n"
5026 ,REDIS_VERSION,
5027 (sizeof(long) == 8) ? "64" : "32",
5028 uptime,
5029 uptime/(3600*24),
5030 listLength(server.clients)-listLength(server.slaves),
5031 listLength(server.slaves),
5032 server.usedmemory,
5033 server.dirty,
5034 server.bgsavechildpid != -1,
5035 server.lastsave,
5036 server.stat_numconnections,
5037 server.stat_numcommands,
5038 server.masterhost == NULL ? "master" : "slave"
5039 );
5040 if (server.masterhost) {
5041 info = sdscatprintf(info,
5042 "master_host:%s\r\n"
5043 "master_port:%d\r\n"
5044 "master_link_status:%s\r\n"
5045 "master_last_io_seconds_ago:%d\r\n"
5046 ,server.masterhost,
5047 server.masterport,
5048 (server.replstate == REDIS_REPL_CONNECTED) ?
5049 "up" : "down",
5050 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
5051 );
5052 }
5053 for (j = 0; j < server.dbnum; j++) {
5054 long long keys, vkeys;
5055
5056 keys = dictSize(server.db[j].dict);
5057 vkeys = dictSize(server.db[j].expires);
5058 if (keys || vkeys) {
5059 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
5060 j, keys, vkeys);
5061 }
5062 }
5063 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
5064 addReplySds(c,info);
5065 addReply(c,shared.crlf);
5066 }
5067
5068 static void monitorCommand(redisClient *c) {
5069 /* ignore MONITOR if aleady slave or in monitor mode */
5070 if (c->flags & REDIS_SLAVE) return;
5071
5072 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5073 c->slaveseldb = 0;
5074 listAddNodeTail(server.monitors,c);
5075 addReply(c,shared.ok);
5076 }
5077
5078 /* ================================= Expire ================================= */
5079 static int removeExpire(redisDb *db, robj *key) {
5080 if (dictDelete(db->expires,key) == DICT_OK) {
5081 return 1;
5082 } else {
5083 return 0;
5084 }
5085 }
5086
5087 static int setExpire(redisDb *db, robj *key, time_t when) {
5088 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5089 return 0;
5090 } else {
5091 incrRefCount(key);
5092 return 1;
5093 }
5094 }
5095
5096 /* Return the expire time of the specified key, or -1 if no expire
5097 * is associated with this key (i.e. the key is non volatile) */
5098 static time_t getExpire(redisDb *db, robj *key) {
5099 dictEntry *de;
5100
5101 /* No expire? return ASAP */
5102 if (dictSize(db->expires) == 0 ||
5103 (de = dictFind(db->expires,key)) == NULL) return -1;
5104
5105 return (time_t) dictGetEntryVal(de);
5106 }
5107
5108 static int expireIfNeeded(redisDb *db, robj *key) {
5109 time_t when;
5110 dictEntry *de;
5111
5112 /* No expire? return ASAP */
5113 if (dictSize(db->expires) == 0 ||
5114 (de = dictFind(db->expires,key)) == NULL) return 0;
5115
5116 /* Lookup the expire */
5117 when = (time_t) dictGetEntryVal(de);
5118 if (time(NULL) <= when) return 0;
5119
5120 /* Delete the key */
5121 dictDelete(db->expires,key);
5122 return dictDelete(db->dict,key) == DICT_OK;
5123 }
5124
5125 static int deleteIfVolatile(redisDb *db, robj *key) {
5126 dictEntry *de;
5127
5128 /* No expire? return ASAP */
5129 if (dictSize(db->expires) == 0 ||
5130 (de = dictFind(db->expires,key)) == NULL) return 0;
5131
5132 /* Delete the key */
5133 server.dirty++;
5134 dictDelete(db->expires,key);
5135 return dictDelete(db->dict,key) == DICT_OK;
5136 }
5137
5138 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
5139 dictEntry *de;
5140
5141 de = dictFind(c->db->dict,key);
5142 if (de == NULL) {
5143 addReply(c,shared.czero);
5144 return;
5145 }
5146 if (seconds < 0) {
5147 if (deleteKey(c->db,key)) server.dirty++;
5148 addReply(c, shared.cone);
5149 return;
5150 } else {
5151 time_t when = time(NULL)+seconds;
5152 if (setExpire(c->db,key,when)) {
5153 addReply(c,shared.cone);
5154 server.dirty++;
5155 } else {
5156 addReply(c,shared.czero);
5157 }
5158 return;
5159 }
5160 }
5161
5162 static void expireCommand(redisClient *c) {
5163 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5164 }
5165
5166 static void expireatCommand(redisClient *c) {
5167 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5168 }
5169
5170 static void ttlCommand(redisClient *c) {
5171 time_t expire;
5172 int ttl = -1;
5173
5174 expire = getExpire(c->db,c->argv[1]);
5175 if (expire != -1) {
5176 ttl = (int) (expire-time(NULL));
5177 if (ttl < 0) ttl = -1;
5178 }
5179 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5180 }
5181
5182 /* =============================== Replication ============================= */
5183
5184 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5185 ssize_t nwritten, ret = size;
5186 time_t start = time(NULL);
5187
5188 timeout++;
5189 while(size) {
5190 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5191 nwritten = write(fd,ptr,size);
5192 if (nwritten == -1) return -1;
5193 ptr += nwritten;
5194 size -= nwritten;
5195 }
5196 if ((time(NULL)-start) > timeout) {
5197 errno = ETIMEDOUT;
5198 return -1;
5199 }
5200 }
5201 return ret;
5202 }
5203
5204 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5205 ssize_t nread, totread = 0;
5206 time_t start = time(NULL);
5207
5208 timeout++;
5209 while(size) {
5210 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5211 nread = read(fd,ptr,size);
5212 if (nread == -1) return -1;
5213 ptr += nread;
5214 size -= nread;
5215 totread += nread;
5216 }
5217 if ((time(NULL)-start) > timeout) {
5218 errno = ETIMEDOUT;
5219 return -1;
5220 }
5221 }
5222 return totread;
5223 }
5224
/* Read one '\n'-terminated line from 'fd' into 'ptr' (capacity 'size' bytes),
 * one byte at a time through syncRead() (each byte bounded by 'timeout').
 * The '\n' is not stored and a trailing '\r' is replaced by '\0'. At most
 * size-1 bytes are stored and the buffer is always NUL terminated; if the
 * buffer fills up before a newline, the truncated line is returned and the
 * rest stays unread.
 * Returns the number of bytes stored (a stripped '\r' is still counted),
 * or -1 on I/O error/timeout, or when size < 1 (errno = EINVAL). */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size < 1) {
        errno = EINVAL;
        return -1;
    }
    *ptr = '\0';    /* valid (empty) string even if nothing fits */
    size--;         /* reserve room for the terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        }
        *ptr++ = c;
        *ptr = '\0';
        nread++;
        size--;     /* bound the copy: long lines must not overflow 'ptr' */
    }
    return nread;
}
5245
5246 static void syncCommand(redisClient *c) {
5247 /* ignore SYNC if aleady slave or in monitor mode */
5248 if (c->flags & REDIS_SLAVE) return;
5249
5250 /* SYNC can't be issued when the server has pending data to send to
5251 * the client about already issued commands. We need a fresh reply
5252 * buffer registering the differences between the BGSAVE and the current
5253 * dataset, so that we can copy to other slaves if needed. */
5254 if (listLength(c->reply) != 0) {
5255 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5256 return;
5257 }
5258
5259 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5260 /* Here we need to check if there is a background saving operation
5261 * in progress, or if it is required to start one */
5262 if (server.bgsavechildpid != -1) {
5263 /* Ok a background save is in progress. Let's check if it is a good
5264 * one for replication, i.e. if there is another slave that is
5265 * registering differences since the server forked to save */
5266 redisClient *slave;
5267 listNode *ln;
5268
5269 listRewind(server.slaves);
5270 while((ln = listYield(server.slaves))) {
5271 slave = ln->value;
5272 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5273 }
5274 if (ln) {
5275 /* Perfect, the server is already registering differences for
5276 * another slave. Set the right state, and copy the buffer. */
5277 listRelease(c->reply);
5278 c->reply = listDup(slave->reply);
5279 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5280 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5281 } else {
5282 /* No way, we need to wait for the next BGSAVE in order to
5283 * register differences */
5284 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5285 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5286 }
5287 } else {
5288 /* Ok we don't have a BGSAVE in progress, let's start one */
5289 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5290 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5291 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5292 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5293 return;
5294 }
5295 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5296 }
5297 c->repldbfd = -1;
5298 c->flags |= REDIS_SLAVE;
5299 c->slaveseldb = 0;
5300 listAddNodeTail(server.slaves,c);
5301 return;
5302 }
5303
5304 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5305 redisClient *slave = privdata;
5306 REDIS_NOTUSED(el);
5307 REDIS_NOTUSED(mask);
5308 char buf[REDIS_IOBUF_LEN];
5309 ssize_t nwritten, buflen;
5310
5311 if (slave->repldboff == 0) {
5312 /* Write the bulk write count before to transfer the DB. In theory here
5313 * we don't know how much room there is in the output buffer of the
5314 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5315 * operations) will never be smaller than the few bytes we need. */
5316 sds bulkcount;
5317
5318 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5319 slave->repldbsize);
5320 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5321 {
5322 sdsfree(bulkcount);
5323 freeClient(slave);
5324 return;
5325 }
5326 sdsfree(bulkcount);
5327 }
5328 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5329 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5330 if (buflen <= 0) {
5331 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5332 (buflen == 0) ? "premature EOF" : strerror(errno));
5333 freeClient(slave);
5334 return;
5335 }
5336 if ((nwritten = write(fd,buf,buflen)) == -1) {
5337 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5338 strerror(errno));
5339 freeClient(slave);
5340 return;
5341 }
5342 slave->repldboff += nwritten;
5343 if (slave->repldboff == slave->repldbsize) {
5344 close(slave->repldbfd);
5345 slave->repldbfd = -1;
5346 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5347 slave->replstate = REDIS_REPL_ONLINE;
5348 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5349 sendReplyToClient, slave) == AE_ERR) {
5350 freeClient(slave);
5351 return;
5352 }
5353 addReplySds(slave,sdsempty());
5354 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5355 }
5356 }
5357
5358 /* This function is called at the end of every backgrond saving.
5359 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5360 * otherwise REDIS_ERR is passed to the function.
5361 *
5362 * The goal of this function is to handle slaves waiting for a successful
5363 * background saving in order to perform non-blocking synchronization. */
5364 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5365 listNode *ln;
5366 int startbgsave = 0;
5367
5368 listRewind(server.slaves);
5369 while((ln = listYield(server.slaves))) {
5370 redisClient *slave = ln->value;
5371
5372 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5373 startbgsave = 1;
5374 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5375 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5376 struct redis_stat buf;
5377
5378 if (bgsaveerr != REDIS_OK) {
5379 freeClient(slave);
5380 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5381 continue;
5382 }
5383 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5384 redis_fstat(slave->repldbfd,&buf) == -1) {
5385 freeClient(slave);
5386 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5387 continue;
5388 }
5389 slave->repldboff = 0;
5390 slave->repldbsize = buf.st_size;
5391 slave->replstate = REDIS_REPL_SEND_BULK;
5392 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5393 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
5394 freeClient(slave);
5395 continue;
5396 }
5397 }
5398 }
5399 if (startbgsave) {
5400 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5401 listRewind(server.slaves);
5402 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5403 while((ln = listYield(server.slaves))) {
5404 redisClient *slave = ln->value;
5405
5406 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5407 freeClient(slave);
5408 }
5409 }
5410 }
5411 }
5412
5413 static int syncWithMaster(void) {
5414 char buf[1024], tmpfile[256], authcmd[1024];
5415 int dumpsize;
5416 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5417 int dfd;
5418
5419 if (fd == -1) {
5420 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5421 strerror(errno));
5422 return REDIS_ERR;
5423 }
5424
5425 /* AUTH with the master if required. */
5426 if(server.masterauth) {
5427 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5428 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5429 close(fd);
5430 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5431 strerror(errno));
5432 return REDIS_ERR;
5433 }
5434 /* Read the AUTH result. */
5435 if (syncReadLine(fd,buf,1024,3600) == -1) {
5436 close(fd);
5437 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5438 strerror(errno));
5439 return REDIS_ERR;
5440 }
5441 if (buf[0] != '+') {
5442 close(fd);
5443 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5444 return REDIS_ERR;
5445 }
5446 }
5447
5448 /* Issue the SYNC command */
5449 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5450 close(fd);
5451 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5452 strerror(errno));
5453 return REDIS_ERR;
5454 }
5455 /* Read the bulk write count */
5456 if (syncReadLine(fd,buf,1024,3600) == -1) {
5457 close(fd);
5458 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5459 strerror(errno));
5460 return REDIS_ERR;
5461 }
5462 if (buf[0] != '$') {
5463 close(fd);
5464 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5465 return REDIS_ERR;
5466 }
5467 dumpsize = atoi(buf+1);
5468 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5469 /* Read the bulk write data on a temp file */
5470 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5471 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5472 if (dfd == -1) {
5473 close(fd);
5474 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5475 return REDIS_ERR;
5476 }
5477 while(dumpsize) {
5478 int nread, nwritten;
5479
5480 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5481 if (nread == -1) {
5482 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5483 strerror(errno));
5484 close(fd);
5485 close(dfd);
5486 return REDIS_ERR;
5487 }
5488 nwritten = write(dfd,buf,nread);
5489 if (nwritten == -1) {
5490 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5491 close(fd);
5492 close(dfd);
5493 return REDIS_ERR;
5494 }
5495 dumpsize -= nread;
5496 }
5497 close(dfd);
5498 if (rename(tmpfile,server.dbfilename) == -1) {
5499 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5500 unlink(tmpfile);
5501 close(fd);
5502 return REDIS_ERR;
5503 }
5504 emptyDb();
5505 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5506 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5507 close(fd);
5508 return REDIS_ERR;
5509 }
5510 server.master = createClient(fd);
5511 server.master->flags |= REDIS_MASTER;
5512 server.replstate = REDIS_REPL_CONNECTED;
5513 return REDIS_OK;
5514 }
5515
5516 static void slaveofCommand(redisClient *c) {
5517 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5518 !strcasecmp(c->argv[2]->ptr,"one")) {
5519 if (server.masterhost) {
5520 sdsfree(server.masterhost);
5521 server.masterhost = NULL;
5522 if (server.master) freeClient(server.master);
5523 server.replstate = REDIS_REPL_NONE;
5524 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5525 }
5526 } else {
5527 sdsfree(server.masterhost);
5528 server.masterhost = sdsdup(c->argv[1]->ptr);
5529 server.masterport = atoi(c->argv[2]->ptr);
5530 if (server.master) freeClient(server.master);
5531 server.replstate = REDIS_REPL_CONNECT;
5532 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5533 server.masterhost, server.masterport);
5534 }
5535 addReply(c,shared.ok);
5536 }
5537
5538 /* ============================ Maxmemory directive ======================== */
5539
5540 /* This function gets called when 'maxmemory' is set on the config file to limit
5541 * the max memory used by the server, and we are out of memory.
5542 * This function will try to, in order:
5543 *
5544 * - Free objects from the free list
5545 * - Try to remove keys with an EXPIRE set
5546 *
5547 * It is not possible to free enough memory to reach used-memory < maxmemory
5548 * the server will start refusing commands that will enlarge even more the
5549 * memory usage.
5550 */
5551 static void freeMemoryIfNeeded(void) {
5552 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5553 if (listLength(server.objfreelist)) {
5554 robj *o;
5555
5556 listNode *head = listFirst(server.objfreelist);
5557 o = listNodeValue(head);
5558 listDelNode(server.objfreelist,head);
5559 zfree(o);
5560 } else {
5561 int j, k, freed = 0;
5562
5563 for (j = 0; j < server.dbnum; j++) {
5564 int minttl = -1;
5565 robj *minkey = NULL;
5566 struct dictEntry *de;
5567
5568 if (dictSize(server.db[j].expires)) {
5569 freed = 1;
5570 /* From a sample of three keys drop the one nearest to
5571 * the natural expire */
5572 for (k = 0; k < 3; k++) {
5573 time_t t;
5574
5575 de = dictGetRandomKey(server.db[j].expires);
5576 t = (time_t) dictGetEntryVal(de);
5577 if (minttl == -1 || t < minttl) {
5578 minkey = dictGetEntryKey(de);
5579 minttl = t;
5580 }
5581 }
5582 deleteKey(server.db+j,minkey);
5583 }
5584 }
5585 if (!freed) return; /* nothing to free... */
5586 }
5587 }
5588 }
5589
5590 /* ============================== Append Only file ========================== */
5591
5592 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5593 sds buf = sdsempty();
5594 int j;
5595 ssize_t nwritten;
5596 time_t now;
5597 robj *tmpargv[3];
5598
5599 /* The DB this command was targetting is not the same as the last command
5600 * we appendend. To issue a SELECT command is needed. */
5601 if (dictid != server.appendseldb) {
5602 char seldb[64];
5603
5604 snprintf(seldb,sizeof(seldb),"%d",dictid);
5605 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
5606 strlen(seldb),seldb);
5607 server.appendseldb = dictid;
5608 }
5609
5610 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5611 * EXPIREs into EXPIREATs calls */
5612 if (cmd->proc == expireCommand) {
5613 long when;
5614
5615 tmpargv[0] = createStringObject("EXPIREAT",8);
5616 tmpargv[1] = argv[1];
5617 incrRefCount(argv[1]);
5618 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5619 tmpargv[2] = createObject(REDIS_STRING,
5620 sdscatprintf(sdsempty(),"%ld",when));
5621 argv = tmpargv;
5622 }
5623
5624 /* Append the actual command */
5625 buf = sdscatprintf(buf,"*%d\r\n",argc);
5626 for (j = 0; j < argc; j++) {
5627 robj *o = argv[j];
5628
5629 o = getDecodedObject(o);
5630 buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr));
5631 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5632 buf = sdscatlen(buf,"\r\n",2);
5633 decrRefCount(o);
5634 }
5635
5636 /* Free the objects from the modified argv for EXPIREAT */
5637 if (cmd->proc == expireCommand) {
5638 for (j = 0; j < 3; j++)
5639 decrRefCount(argv[j]);
5640 }
5641
5642 /* We want to perform a single write. This should be guaranteed atomic
5643 * at least if the filesystem we are writing is a real physical one.
5644 * While this will save us against the server being killed I don't think
5645 * there is much to do about the whole server stopping for power problems
5646 * or alike */
5647 nwritten = write(server.appendfd,buf,sdslen(buf));
5648 if (nwritten != (signed)sdslen(buf)) {
5649 /* Ooops, we are in troubles. The best thing to do for now is
5650 * to simply exit instead to give the illusion that everything is
5651 * working as expected. */
5652 if (nwritten == -1) {
5653 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5654 } else {
5655 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5656 }
5657 exit(1);
5658 }
5659 now = time(NULL);
5660 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5661 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5662 now-server.lastfsync > 1))
5663 {
5664 fsync(server.appendfd); /* Let's try to get this data on the disk */
5665 server.lastfsync = now;
5666 }
5667 }
5668
5669 /* In Redis commands are always executed in the context of a client, so in
5670 * order to load the append only file we need to create a fake client. */
5671 static struct redisClient *createFakeClient(void) {
5672 struct redisClient *c = zmalloc(sizeof(*c));
5673
5674 selectDb(c,0);
5675 c->fd = -1;
5676 c->querybuf = sdsempty();
5677 c->argc = 0;
5678 c->argv = NULL;
5679 c->flags = 0;
5680 /* We set the fake client as a slave waiting for the synchronization
5681 * so that Redis will not try to send replies to this client. */
5682 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5683 c->reply = listCreate();
5684 listSetFreeMethod(c->reply,decrRefCount);
5685 listSetDupMethod(c->reply,dupClientReplyValue);
5686 return c;
5687 }
5688
5689 static void freeFakeClient(struct redisClient *c) {
5690 sdsfree(c->querybuf);
5691 listRelease(c->reply);
5692 zfree(c);
5693 }
5694
5695 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5696 * error (the append only file is zero-length) REDIS_ERR is returned. On
5697 * fatal error an error message is logged and the program exists. */
5698 int loadAppendOnlyFile(char *filename) {
5699 struct redisClient *fakeClient;
5700 FILE *fp = fopen(filename,"r");
5701 struct redis_stat sb;
5702
5703 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5704 return REDIS_ERR;
5705
5706 if (fp == NULL) {
5707 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5708 exit(1);
5709 }
5710
5711 fakeClient = createFakeClient();
5712 while(1) {
5713 int argc, j;
5714 unsigned long len;
5715 robj **argv;
5716 char buf[128];
5717 sds argsds;
5718 struct redisCommand *cmd;
5719
5720 if (fgets(buf,sizeof(buf),fp) == NULL) {
5721 if (feof(fp))
5722 break;
5723 else
5724 goto readerr;
5725 }
5726 if (buf[0] != '*') goto fmterr;
5727 argc = atoi(buf+1);
5728 argv = zmalloc(sizeof(robj*)*argc);
5729 for (j = 0; j < argc; j++) {
5730 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5731 if (buf[0] != '$') goto fmterr;
5732 len = strtol(buf+1,NULL,10);
5733 argsds = sdsnewlen(NULL,len);
5734 if (fread(argsds,len,1,fp) == 0) goto fmterr;
5735 argv[j] = createObject(REDIS_STRING,argsds);
5736 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5737 }
5738
5739 /* Command lookup */
5740 cmd = lookupCommand(argv[0]->ptr);
5741 if (!cmd) {
5742 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5743 exit(1);
5744 }
5745 /* Try object sharing and encoding */
5746 if (server.shareobjects) {
5747 int j;
5748 for(j = 1; j < argc; j++)
5749 argv[j] = tryObjectSharing(argv[j]);
5750 }
5751 if (cmd->flags & REDIS_CMD_BULK)
5752 tryObjectEncoding(argv[argc-1]);
5753 /* Run the command in the context of a fake client */
5754 fakeClient->argc = argc;
5755 fakeClient->argv = argv;
5756 cmd->proc(fakeClient);
5757 /* Discard the reply objects list from the fake client */
5758 while(listLength(fakeClient->reply))
5759 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5760 /* Clean up, ready for the next command */
5761 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5762 zfree(argv);
5763 }
5764 fclose(fp);
5765 freeFakeClient(fakeClient);
5766 return REDIS_OK;
5767
5768 readerr:
5769 if (feof(fp)) {
5770 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5771 } else {
5772 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5773 }
5774 exit(1);
5775 fmterr:
5776 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5777 exit(1);
5778 }
5779
5780 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
5781 static int fwriteBulk(FILE *fp, robj *obj) {
5782 char buf[128];
5783 obj = getDecodedObject(obj);
5784 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
5785 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
5786 if (fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0) goto err;
5787 if (fwrite("\r\n",2,1,fp) == 0) goto err;
5788 decrRefCount(obj);
5789 return 1;
5790 err:
5791 decrRefCount(obj);
5792 return 0;
5793 }
5794
/* Write the double 'd' in bulk format $<count>\r\n<payload>\r\n, printed with
 * "%.17g" (17 significant digits). Returns 1 on success, 0 if a write fails. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char hdr[128], val[128];
    size_t vlen;

    snprintf(val,sizeof(val),"%.17g\r\n",d);
    vlen = strlen(val);
    /* The advertised length excludes the trailing CRLF. */
    snprintf(hdr,sizeof(hdr),"$%lu\r\n",(unsigned long)(vlen-2));
    return fwrite(hdr,strlen(hdr),1,fp) != 0 &&
           fwrite(val,vlen,1,fp) != 0;
}
5805
/* Write the long 'l' in bulk format $<count>\r\n<payload>\r\n.
 * Returns 1 on success, 0 if a write fails. */
static int fwriteBulkLong(FILE *fp, long l) {
    char digits[128], header[128];
    size_t total;

    snprintf(digits,sizeof(digits),"%ld\r\n",l);
    total = strlen(digits);
    /* Bulk length counts the digits only, not the CRLF. */
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)(total-2));
    if (fwrite(header,strlen(header),1,fp) == 0) return 0;
    if (fwrite(digits,total,1,fp) == 0) return 0;
    return 1;
}
5816
5817 /* Write a sequence of commands able to fully rebuild the dataset into
5818 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
5819 static int rewriteAppendOnlyFile(char *filename) {
5820 dictIterator *di = NULL;
5821 dictEntry *de;
5822 FILE *fp;
5823 char tmpfile[256];
5824 int j;
5825 time_t now = time(NULL);
5826
5827 /* Note that we have to use a different temp name here compared to the
5828 * one used by rewriteAppendOnlyFileBackground() function. */
5829 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
5830 fp = fopen(tmpfile,"w");
5831 if (!fp) {
5832 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
5833 return REDIS_ERR;
5834 }
5835 for (j = 0; j < server.dbnum; j++) {
5836 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
5837 redisDb *db = server.db+j;
5838 dict *d = db->dict;
5839 if (dictSize(d) == 0) continue;
5840 di = dictGetIterator(d);
5841 if (!di) {
5842 fclose(fp);
5843 return REDIS_ERR;
5844 }
5845
5846 /* SELECT the new DB */
5847 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
5848 if (fwriteBulkLong(fp,j+1) == 0) goto werr;
5849
5850 /* Iterate this DB writing every entry */
5851 while((de = dictNext(di)) != NULL) {
5852 robj *key = dictGetEntryKey(de);
5853 robj *o = dictGetEntryVal(de);
5854 time_t expiretime = getExpire(db,key);
5855
5856 /* Save the key and associated value */
5857 if (rdbSaveStringObject(fp,key) == -1) goto werr;
5858 if (o->type == REDIS_STRING) {
5859 /* Emit a SET command */
5860 char cmd[]="*3\r\n$3\r\nSET\r\n";
5861 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5862 /* Key and value */
5863 if (fwriteBulk(fp,key) == 0) goto werr;
5864 if (fwriteBulk(fp,o) == 0) goto werr;
5865 } else if (o->type == REDIS_LIST) {
5866 /* Emit the RPUSHes needed to rebuild the list */
5867 list *list = o->ptr;
5868 listNode *ln;
5869
5870 listRewind(list);
5871 while((ln = listYield(list))) {
5872 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
5873 robj *eleobj = listNodeValue(ln);
5874
5875 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5876 if (fwriteBulk(fp,key) == 0) goto werr;
5877 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5878 }
5879 } else if (o->type == REDIS_SET) {
5880 /* Emit the SADDs needed to rebuild the set */
5881 dict *set = o->ptr;
5882 dictIterator *di = dictGetIterator(set);
5883 dictEntry *de;
5884
5885 while((de = dictNext(di)) != NULL) {
5886 char cmd[]="*3\r\n$4\r\nSADD\r\n";
5887 robj *eleobj = dictGetEntryKey(de);
5888
5889 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5890 if (fwriteBulk(fp,key) == 0) goto werr;
5891 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5892 }
5893 dictReleaseIterator(di);
5894 } else if (o->type == REDIS_ZSET) {
5895 /* Emit the ZADDs needed to rebuild the sorted set */
5896 zset *zs = o->ptr;
5897 dictIterator *di = dictGetIterator(zs->dict);
5898 dictEntry *de;
5899
5900 while((de = dictNext(di)) != NULL) {
5901 char cmd[]="*4\r\n$4\r\nZADD\r\n";
5902 robj *eleobj = dictGetEntryKey(de);
5903 double *score = dictGetEntryVal(de);
5904
5905 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5906 if (fwriteBulk(fp,key) == 0) goto werr;
5907 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
5908 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5909 }
5910 dictReleaseIterator(di);
5911 } else {
5912 assert(0 != 0);
5913 }
5914 /* Save the expire time */
5915 if (expiretime != -1) {
5916 char cmd[]="*3\r\n$6\r\nEXPIRE\r\n";
5917 /* If this key is already expired skip it */
5918 if (expiretime < now) continue;
5919 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5920 if (fwriteBulk(fp,key) == 0) goto werr;
5921 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
5922 }
5923 }
5924 dictReleaseIterator(di);
5925 }
5926
5927 /* Make sure data will not remain on the OS's output buffers */
5928 fflush(fp);
5929 fsync(fileno(fp));
5930 fclose(fp);
5931
5932 /* Use RENAME to make sure the DB file is changed atomically only
5933 * if the generate DB file is ok. */
5934 if (rename(tmpfile,filename) == -1) {
5935 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
5936 unlink(tmpfile);
5937 return REDIS_ERR;
5938 }
5939 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
5940 return REDIS_OK;
5941
5942 werr:
5943 fclose(fp);
5944 unlink(tmpfile);
5945 redisLog(REDIS_WARNING,"Write error writing append only fileon disk: %s", strerror(errno));
5946 if (di) dictReleaseIterator(di);
5947 return REDIS_ERR;
5948 }
5949
5950 /* This is how rewriting of the append only file in background works:
5951 *
5952 * 1) The user calls BGREWRITEAOF
5953 * 2) Redis calls this function, that forks():
5954 * 2a) the child rewrite the append only file in a temp file.
5955 * 2b) the parent accumulates differences in server.bgrewritebuf.
5956 * 3) When the child finished '2a' exists.
5957 * 4) The parent will trap the exit code, if it's OK, will append the
5958 * data accumulated into server.bgrewritebuf into the temp file, and
5959 * finally will rename(2) the temp file in the actual file name.
5960 * The the new file is reopened as the new append only file. Profit!
5961 */
5962 static int rewriteAppendOnlyFileBackground(void) {
5963 pid_t childpid;
5964
5965 if (server.bgrewritechildpid != -1) return REDIS_ERR;
5966 if ((childpid = fork()) == 0) {
5967 /* Child */
5968 char tmpfile[256];
5969 close(server.fd);
5970
5971 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
5972 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
5973 exit(0);
5974 } else {
5975 exit(1);
5976 }
5977 } else {
5978 /* Parent */
5979 if (childpid == -1) {
5980 redisLog(REDIS_WARNING,
5981 "Can't rewrite append only file in background: fork: %s",
5982 strerror(errno));
5983 return REDIS_ERR;
5984 }
5985 redisLog(REDIS_NOTICE,
5986 "Background append only file rewriting started by pid %d",childpid);
5987 server.bgrewritechildpid = childpid;
5988 return REDIS_OK;
5989 }
5990 return REDIS_OK; /* unreached */
5991 }
5992
5993 static void bgrewriteaofCommand(redisClient *c) {
5994 if (server.bgrewritechildpid != -1) {
5995 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
5996 return;
5997 }
5998 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
5999 addReply(c,shared.ok);
6000 } else {
6001 addReply(c,shared.err);
6002 }
6003 }
6004
/* Remove "temp-rewriteaof-bg-<childpid>.aof", the temp file written by a
 * BGREWRITEAOF child with that pid. unlink() errors (e.g. the file does not
 * exist) are ignored. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path, sizeof(path), "temp-rewriteaof-bg-%d.aof", (int) childpid);
    unlink(path);
}
6011
6012 /* ================================= Debugging ============================== */
6013
6014 static void debugCommand(redisClient *c) {
6015 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6016 *((char*)-1) = 'x';
6017 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6018 if (rdbSave(server.dbfilename) != REDIS_OK) {
6019 addReply(c,shared.err);
6020 return;
6021 }
6022 emptyDb();
6023 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6024 addReply(c,shared.err);
6025 return;
6026 }
6027 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6028 addReply(c,shared.ok);
6029 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6030 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6031 robj *key, *val;
6032
6033 if (!de) {
6034 addReply(c,shared.nokeyerr);
6035 return;
6036 }
6037 key = dictGetEntryKey(de);
6038 val = dictGetEntryVal(de);
6039 addReplySds(c,sdscatprintf(sdsempty(),
6040 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6041 key, key->refcount, val, val->refcount, val->encoding));
6042 } else {
6043 addReplySds(c,sdsnew(
6044 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6045 }
6046 }
6047
6048 /* =================================== Main! ================================ */
6049
6050 #ifdef __linux__
6051 int linuxOvercommitMemoryValue(void) {
6052 FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
6053 char buf[64];
6054
6055 if (!fp) return -1;
6056 if (fgets(buf,64,fp) == NULL) {
6057 fclose(fp);
6058 return -1;
6059 }
6060 fclose(fp);
6061
6062 return atoi(buf);
6063 }
6064
6065 void linuxOvercommitMemoryWarning(void) {
6066 if (linuxOvercommitMemoryValue() == 0) {
6067 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6068 }
6069 }
6070 #endif /* __linux__ */
6071
6072 static void daemonize(void) {
6073 int fd;
6074 FILE *fp;
6075
6076 if (fork() != 0) exit(0); /* parent exits */
6077 setsid(); /* create a new session */
6078
6079 /* Every output goes to /dev/null. If Redis is daemonized but
6080 * the 'logfile' is set to 'stdout' in the configuration file
6081 * it will not log at all. */
6082 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6083 dup2(fd, STDIN_FILENO);
6084 dup2(fd, STDOUT_FILENO);
6085 dup2(fd, STDERR_FILENO);
6086 if (fd > STDERR_FILENO) close(fd);
6087 }
6088 /* Try to write the pid file */
6089 fp = fopen(server.pidfile,"w");
6090 if (fp) {
6091 fprintf(fp,"%d\n",getpid());
6092 fclose(fp);
6093 }
6094 }
6095
6096 int main(int argc, char **argv) {
6097 initServerConfig();
6098 if (argc == 2) {
6099 resetServerSaveParams();
6100 loadServerConfig(argv[1]);
6101 } else if (argc > 2) {
6102 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6103 exit(1);
6104 } else {
6105 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6106 }
6107 initServer();
6108 if (server.daemonize) daemonize();
6109 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6110 #ifdef __linux__
6111 linuxOvercommitMemoryWarning();
6112 #endif
6113 if (server.appendonly) {
6114 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6115 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6116 } else {
6117 if (rdbLoad(server.dbfilename) == REDIS_OK)
6118 redisLog(REDIS_NOTICE,"DB loaded from disk");
6119 }
6120 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
6121 acceptHandler, NULL) == AE_ERR) oom("creating file event");
6122 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6123 aeMain(server.el);
6124 aeDeleteEventLoop(server.el);
6125 return 0;
6126 }
6127
6128 /* ============================= Backtrace support ========================= */
6129
6130 #ifdef HAVE_BACKTRACE
6131 static char *findFuncName(void *pointer, unsigned long *offset);
6132
6133 static void *getMcontextEip(ucontext_t *uc) {
6134 #if defined(__FreeBSD__)
6135 return (void*) uc->uc_mcontext.mc_eip;
6136 #elif defined(__dietlibc__)
6137 return (void*) uc->uc_mcontext.eip;
6138 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6139 return (void*) uc->uc_mcontext->__ss.__eip;
6140 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6141 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6142 return (void*) uc->uc_mcontext->__ss.__rip;
6143 #else
6144 return (void*) uc->uc_mcontext->__ss.__eip;
6145 #endif
6146 #elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
6147 return (void*) uc->uc_mcontext.gregs[REG_EIP];
6148 #elif defined(__ia64__) /* Linux IA64 */
6149 return (void*) uc->uc_mcontext.sc_ip;
6150 #else
6151 return NULL;
6152 #endif
6153 }
6154
6155 static void segvHandler(int sig, siginfo_t *info, void *secret) {
6156 void *trace[100];
6157 char **messages = NULL;
6158 int i, trace_size = 0;
6159 unsigned long offset=0;
6160 time_t uptime = time(NULL)-server.stat_starttime;
6161 ucontext_t *uc = (ucontext_t*) secret;
6162 REDIS_NOTUSED(info);
6163
6164 redisLog(REDIS_WARNING,
6165 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
6166 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
6167 "redis_version:%s; "
6168 "uptime_in_seconds:%d; "
6169 "connected_clients:%d; "
6170 "connected_slaves:%d; "
6171 "used_memory:%zu; "
6172 "changes_since_last_save:%lld; "
6173 "bgsave_in_progress:%d; "
6174 "last_save_time:%d; "
6175 "total_connections_received:%lld; "
6176 "total_commands_processed:%lld; "
6177 "role:%s;"
6178 ,REDIS_VERSION,
6179 uptime,
6180 listLength(server.clients)-listLength(server.slaves),
6181 listLength(server.slaves),
6182 server.usedmemory,
6183 server.dirty,
6184 server.bgsavechildpid != -1,
6185 server.lastsave,
6186 server.stat_numconnections,
6187 server.stat_numcommands,
6188 server.masterhost == NULL ? "master" : "slave"
6189 ));
6190
6191 trace_size = backtrace(trace, 100);
6192 /* overwrite sigaction with caller's address */
6193 if (getMcontextEip(uc) != NULL) {
6194 trace[1] = getMcontextEip(uc);
6195 }
6196 messages = backtrace_symbols(trace, trace_size);
6197
6198 for (i=1; i<trace_size; ++i) {
6199 char *fn = findFuncName(trace[i], &offset), *p;
6200
6201 p = strchr(messages[i],'+');
6202 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6203 redisLog(REDIS_WARNING,"%s", messages[i]);
6204 } else {
6205 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6206 }
6207 }
6208 free(messages);
6209 exit(0);
6210 }
6211
6212 static void setupSigSegvAction(void) {
6213 struct sigaction act;
6214
6215 sigemptyset (&act.sa_mask);
6216 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6217 * is used. Otherwise, sa_handler is used */
6218 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6219 act.sa_sigaction = segvHandler;
6220 sigaction (SIGSEGV, &act, NULL);
6221 sigaction (SIGBUS, &act, NULL);
6222 sigaction (SIGFPE, &act, NULL);
6223 sigaction (SIGILL, &act, NULL);
6224 sigaction (SIGBUS, &act, NULL);
6225 return;
6226 }
6227
6228 #include "staticsymbols.h"
6229 /* This function try to convert a pointer into a function name. It's used in
6230 * oreder to provide a backtrace under segmentation fault that's able to
6231 * display functions declared as static (otherwise the backtrace is useless). */
6232 static char *findFuncName(void *pointer, unsigned long *offset){
6233 int i, ret = -1;
6234 unsigned long off, minoff = 0;
6235
6236 /* Try to match against the Symbol with the smallest offset */
6237 for (i=0; symsTable[i].pointer; i++) {
6238 unsigned long lp = (unsigned long) pointer;
6239
6240 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6241 off=lp-symsTable[i].pointer;
6242 if (ret < 0 || off < minoff) {
6243 minoff=off;
6244 ret=i;
6245 }
6246 }
6247 }
6248 if (ret == -1) return NULL;
6249 *offset = minoff;
6250 return symsTable[ret].name;
6251 }
6252 #else /* HAVE_BACKTRACE */
6253 static void setupSigSegvAction(void) {
6254 }
6255 #endif /* HAVE_BACKTRACE */
6256
6257
6258
6259 /* The End */
6260
6261
6262