]> git.saurik.com Git - redis.git/blob - redis.c
8b771c029c6c33df534b3767a987112c8065dfa2
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string of this server build. */
#define REDIS_VERSION "1.3.0"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
/* Error codes returned by the internal functions of this file. */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout (300; presumably
                                    seconds, like server.maxidletime) */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
#define REDIS_WRITEV_THRESHOLD 3
/* Max number of iovecs used for each writev call */
#define REDIS_WRITEV_IOVEC_COUNT 256

/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10%: htNeedsResize()
                               flags tables whose used*100/slots is below it */

/* Command flags */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short these commands are denied on low memory conditions, so only
   commands that can grow memory usage should carry it. */
#define REDIS_CMD_DENYOOM 4

/* Object types (values of robj.type) */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding (values of robj.encoding) */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. Storing a 32 bit length for short
 * keys would waste a lot of space, so we check the two most significant bits
 * of the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
/* Sentinel length; NOTE(review): presumably returned by the length loader
 * to signal a read error -- confirm against rdbLoad. */
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0        /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1       /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2       /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3         /* string compressed with LZF */

/* Client flags (bit mask stored in redisClient.flags) */
#define REDIS_CLOSE 1       /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2       /* This client is a slave server */
#define REDIS_MASTER 4      /* This client is a master server */
#define REDIS_MONITOR 8      /* This client is a slave monitor, see MONITOR */
#define REDIS_MULTI 16      /* This client is in a MULTI context */
#define REDIS_BLOCKED 32    /* The client is waiting in a blocking operation */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0   /* No active replication */
#define REDIS_REPL_CONNECT 1    /* Must connect to master */
#define REDIS_REPL_CONNECTED 2  /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_ASC 1
#define REDIS_SORT_DESC 2
#define REDIS_SORTKEY_MAX 1024

/* Log levels; redisLog() marks them with '.', '-' and '*' respectively */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro... */
#define REDIS_NOTUSED(V) ((void) V)

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25      /* Skiplist P = 1/4 */

/* Append only defines (values of server.appendfsync) */
#define APPENDFSYNC_NO 0
#define APPENDFSYNC_ALWAYS 1
#define APPENDFSYNC_EVERYSEC 2

/* We can print the stacktrace, so our assert is defined this way: when _e is
 * false, _redisAssert() is called with the stringified expression and the
 * process then exits with status 1. */
#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
static void _redisAssert(char *estr);
206
207 /*================================= Data types ============================== */
208
/* A redis object, that is a type able to hold a string / list / set.
 * Objects are reference counted: incrRefCount()/decrRefCount() manage
 * refcount. */
typedef struct redisObject {
    void *ptr;                /* type-specific payload; for raw strings it is
                                 read as an sds elsewhere (e.g. dictObjHash) */
    unsigned char type;       /* REDIS_STRING, REDIS_LIST, REDIS_SET, ... */
    unsigned char encoding;   /* REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
    unsigned char notused[2]; /* unused; presumably padding before refcount */
    int refcount;
} robj;
217
/* Macro used to initialize a Redis object allocated on the stack.
 * Note that this macro is placed near the structure definition to make sure
 * we'll update it when the structure is changed, to avoid bugs like
 * bug #85 introduced exactly in this way.
 *
 * Sets refcount to 1, type to REDIS_STRING, encoding to REDIS_ENCODING_RAW
 * and ptr to _ptr; notused[] is left untouched.
 *
 * NOTE(review): the expansion ends in "while(0);" -- the semicolon is part of
 * the macro, so "initStaticStringObject(x,p);" yields an extra empty
 * statement and the macro cannot be the body of an unbraced if/else.
 * Callers outside this view may rely on the built-in semicolon; confirm
 * before removing it. */
#define initStaticStringObject(_var,_ptr) do { \
    _var.refcount = 1; \
    _var.type = REDIS_STRING; \
    _var.encoding = REDIS_ENCODING_RAW; \
    _var.ptr = _ptr; \
} while(0);
228
/* A database: the keyspace plus its auxiliary dictionaries. */
typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blockingkeys;         /* Keys with clients waiting for data (BLPOP) */
    int id;                     /* presumably the DB index -- confirm */
} redisDb;

/* Client MULTI/EXEC state */

/* One command queued between MULTI and EXEC, with its arguments. */
typedef struct multiCmd {
    robj **argv;
    int argc;
    struct redisCommand *cmd;
} multiCmd;

typedef struct multiState {
    multiCmd *commands;     /* Array of MULTI commands */
    int count;              /* Total number of MULTI commands */
} multiState;

/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list (server.clients). */
typedef struct redisClient {
    int fd;
    redisDb *db;
    int dictid;
    sds querybuf;
    robj **argv, **mbargv;
    int argc, mbargc;
    int bulklen;            /* bulk read len. -1 if not in bulk read mode */
    int multibulk;          /* multi bulk command format active */
    list *reply;
    int sentlen;
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    int flags;              /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
                            /* REDIS_MULTI | REDIS_MASTER | REDIS_BLOCKED */
    int slaveseldb;         /* slave selected db, if this client is a slave */
    int authenticated;      /* when requirepass is non-NULL */
    int replstate;          /* replication state if this is a slave */
    int repldbfd;           /* replication DB file descriptor */
    long repldboff;         /* replication DB file offset */
    off_t repldbsize;       /* replication DB file size */
    multiState mstate;      /* MULTI/EXEC state */
    robj *blockingkey;      /* The key we are waiting on to terminate a
                             * blocking operation such as BLPOP. Otherwise
                             * NULL. */
    time_t blockingto;      /* Blocking operation timeout; 0 means no timeout.
                             * closeTimedoutClients() treats the operation as
                             * timed out once the current UNIX time is
                             * strictly greater than blockingto. */
} redisClient;
276
/* A save point: NOTE(review) presumably "save if at least 'changes' changes
 * happened within 'seconds' seconds" -- confirm against the cron code. */
struct saveparam {
    time_t seconds;
    int changes;
};

/* Global server state structure */
struct redisServer {
    int port;
    int fd;
    redisDb *db;
    dict *sharingpool;          /* Pool used for object sharing */
    unsigned int sharingpoolsize;
    long long dirty;            /* changes to DB from the last save */
    list *clients;
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];
    aeEventLoop *el;
    int cronloops;              /* number of times the cron function run */
    list *objfreelist;          /* A list of freed objects to avoid malloc() */
    time_t lastsave;            /* Unix time of last successful save */
    size_t usedmemory;          /* Used memory (the original comment says
                                   megabytes; NOTE(review): unit unverified) */
    /* Fields used only for stats */
    time_t stat_starttime;         /* server start time */
    long long stat_numcommands;    /* number of processed commands */
    long long stat_numconnections; /* number of connections received */
    /* Configuration */
    int verbosity;              /* minimum level printed by redisLog() */
    int glueoutputbuf;
    int maxidletime;            /* idle client timeout; 0 disables it */
    int dbnum;
    int daemonize;
    int appendonly;
    int appendfsync;            /* one of APPENDFSYNC_* */
    time_t lastfsync;
    int appendfd;
    int appendseldb;
    char *pidfile;
    pid_t bgsavechildpid;
    pid_t bgrewritechildpid;
    sds bgrewritebuf;           /* buffer kept by parent during append only rewrite */
    struct saveparam *saveparams;
    int saveparamslen;
    char *logfile;              /* NULL means redisLog() writes to stdout */
    char *bindaddr;
    char *dbfilename;
    char *appendfilename;
    char *requirepass;
    int shareobjects;
    int rdbcompression;
    /* Replication related */
    int isslave;
    char *masterauth;
    char *masterhost;
    int masterport;
    redisClient *master;    /* client that is master for this slave */
    int replstate;
    unsigned int maxclients;
    unsigned long maxmemory;
    unsigned int blockedclients;
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to keep this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
};
342
/* Signature shared by every command implementation listed in cmdTable. */
typedef void redisCommandProc(redisClient *c);

/* One command table entry. Field order matters: cmdTable initializes these
 * positionally as {name, proc, arity, flags}. */
struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;  /* argc including the command name; NOTE(review): negative
                   values appear to mean "at least -arity" (e.g. "del" is -2) */
    int flags;  /* REDIS_CMD_BULK / REDIS_CMD_INLINE / REDIS_CMD_DENYOOM */
};

/* Name/address pair. NOTE(review): presumably used to symbolize stack traces
 * when HAVE_BACKTRACE is set -- confirm. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};

/* An element being sorted by SORT together with its sort key: either a
 * numeric score or an object to compare (presumably used when
 * server.sort_alpha is set). */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;
        robj *cmpobj;
    } u;
} redisSortObject;

/* A SORT sub-operation; type presumably one of REDIS_SORT_*. */
typedef struct _redisSortOperation {
    int type;
    robj *pattern;
} redisSortOperation;
368
/* ZSETs use a specialized version of Skiplists */

typedef struct zskiplistNode {
    struct zskiplistNode **forward;  /* next-node pointers, one per level */
    struct zskiplistNode *backward;  /* previous node (presumably level 0) */
    double score;
    robj *obj;
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;   /* presumably the number of elements */
    int level;              /* presumably the current top level,
                               bounded by ZSKIPLIST_MAXLEVEL */
} zskiplist;

/* A sorted set: a dict (zsetDictType, whose values are heap-allocated
 * doubles) plus a skiplist holding the same elements ordered by score. */
typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;
388
/* Our shared "common" objects: preallocated replies and tokens that are
 * passed to addReply() instead of building new objects each time (e.g.
 * shared.nullbulk is sent by closeTimedoutClients()). */

struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk, *queued,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;

/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */

static double R_Zero, R_PosInf, R_NegInf, R_Nan;
405
406 /*================================ Prototypes =============================== */
407
/* Forward declarations of static helpers; being static, they are all defined
 * in this translation unit. */
static void freeStringObject(robj *o);
static void freeListObject(robj *o);
static void freeSetObject(robj *o);
static void decrRefCount(void *o);
static robj *createObject(int type, void *ptr);
static void freeClient(redisClient *c);
static int rdbLoad(char *filename);
static void addReply(redisClient *c, robj *obj);
static void addReplySds(redisClient *c, sds s);
static void incrRefCount(robj *o);
static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);
static int tryObjectEncoding(robj *o);
static robj *getDecodedObject(robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static int deleteIfVolatile(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);
static void updateSlavesWaitingBgsave(int bgsaveerr);
static void freeMemoryIfNeeded(void);
static int processCommand(redisClient *c);
static void setupSigSegvAction(void);
static void rdbRemoveTempFile(pid_t childpid);
static void aofRemoveTempFile(pid_t childpid);
static size_t stringObjectLen(robj *o);
static void processInputBuffer(redisClient *c);
static zskiplist *zslCreate(void);
static void zslFree(zskiplist *zsl);
static void zslInsert(zskiplist *zsl, double score, robj *obj);
static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
static void initClientMultiState(redisClient *c);
static void freeClientMultiState(redisClient *c);
static void queueMultiCommand(redisClient *c, struct redisCommand *cmd);
static void unblockClient(redisClient *c);
static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele);

/* Command implementations referenced by cmdTable.
 * NOTE(review): rpoplpushcommand breaks the xxxCommand naming convention. */
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
static void setCommand(redisClient *c);
static void setnxCommand(redisClient *c);
static void getCommand(redisClient *c);
static void delCommand(redisClient *c);
static void existsCommand(redisClient *c);
static void incrCommand(redisClient *c);
static void decrCommand(redisClient *c);
static void incrbyCommand(redisClient *c);
static void decrbyCommand(redisClient *c);
static void selectCommand(redisClient *c);
static void randomkeyCommand(redisClient *c);
static void keysCommand(redisClient *c);
static void dbsizeCommand(redisClient *c);
static void lastsaveCommand(redisClient *c);
static void saveCommand(redisClient *c);
static void bgsaveCommand(redisClient *c);
static void bgrewriteaofCommand(redisClient *c);
static void shutdownCommand(redisClient *c);
static void moveCommand(redisClient *c);
static void renameCommand(redisClient *c);
static void renamenxCommand(redisClient *c);
static void lpushCommand(redisClient *c);
static void rpushCommand(redisClient *c);
static void lpopCommand(redisClient *c);
static void rpopCommand(redisClient *c);
static void llenCommand(redisClient *c);
static void lindexCommand(redisClient *c);
static void lrangeCommand(redisClient *c);
static void ltrimCommand(redisClient *c);
static void typeCommand(redisClient *c);
static void lsetCommand(redisClient *c);
static void saddCommand(redisClient *c);
static void sremCommand(redisClient *c);
static void smoveCommand(redisClient *c);
static void sismemberCommand(redisClient *c);
static void scardCommand(redisClient *c);
static void spopCommand(redisClient *c);
static void srandmemberCommand(redisClient *c);
static void sinterCommand(redisClient *c);
static void sinterstoreCommand(redisClient *c);
static void sunionCommand(redisClient *c);
static void sunionstoreCommand(redisClient *c);
static void sdiffCommand(redisClient *c);
static void sdiffstoreCommand(redisClient *c);
static void syncCommand(redisClient *c);
static void flushdbCommand(redisClient *c);
static void flushallCommand(redisClient *c);
static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c);
static void rpoplpushcommand(redisClient *c);
static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
static void expireCommand(redisClient *c);
static void expireatCommand(redisClient *c);
static void getsetCommand(redisClient *c);
static void ttlCommand(redisClient *c);
static void slaveofCommand(redisClient *c);
static void debugCommand(redisClient *c);
static void msetCommand(redisClient *c);
static void msetnxCommand(redisClient *c);
static void zaddCommand(redisClient *c);
static void zincrbyCommand(redisClient *c);
static void zrangeCommand(redisClient *c);
static void zrangebyscoreCommand(redisClient *c);
static void zrevrangeCommand(redisClient *c);
static void zcardCommand(redisClient *c);
static void zremCommand(redisClient *c);
static void zscoreCommand(redisClient *c);
static void zremrangebyscoreCommand(redisClient *c);
static void multiCommand(redisClient *c);
static void execCommand(redisClient *c);
static void blpopCommand(redisClient *c);
static void brpopCommand(redisClient *c);
527
528 /*================================= Globals ================================= */
529
530 /* Global vars */
531 static struct redisServer server; /* server global state */
532 static struct redisCommand cmdTable[] = {
533 {"get",getCommand,2,REDIS_CMD_INLINE},
534 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
535 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
536 {"del",delCommand,-2,REDIS_CMD_INLINE},
537 {"exists",existsCommand,2,REDIS_CMD_INLINE},
538 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
539 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
540 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
541 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
542 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
543 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
544 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
545 {"brpop",brpopCommand,3,REDIS_CMD_INLINE},
546 {"blpop",blpopCommand,3,REDIS_CMD_INLINE},
547 {"llen",llenCommand,2,REDIS_CMD_INLINE},
548 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
549 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
550 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
551 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
552 {"lrem",lremCommand,4,REDIS_CMD_BULK},
553 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
554 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
555 {"srem",sremCommand,3,REDIS_CMD_BULK},
556 {"smove",smoveCommand,4,REDIS_CMD_BULK},
557 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
558 {"scard",scardCommand,2,REDIS_CMD_INLINE},
559 {"spop",spopCommand,2,REDIS_CMD_INLINE},
560 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
561 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
562 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
563 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
564 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
565 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
566 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
567 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
568 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
569 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
570 {"zrem",zremCommand,3,REDIS_CMD_BULK},
571 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
572 {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE},
573 {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
574 {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE},
575 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
576 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
577 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
578 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
579 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
580 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
581 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
582 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
583 {"select",selectCommand,2,REDIS_CMD_INLINE},
584 {"move",moveCommand,3,REDIS_CMD_INLINE},
585 {"rename",renameCommand,3,REDIS_CMD_INLINE},
586 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
587 {"expire",expireCommand,3,REDIS_CMD_INLINE},
588 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
589 {"keys",keysCommand,2,REDIS_CMD_INLINE},
590 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
591 {"auth",authCommand,2,REDIS_CMD_INLINE},
592 {"ping",pingCommand,1,REDIS_CMD_INLINE},
593 {"echo",echoCommand,2,REDIS_CMD_BULK},
594 {"save",saveCommand,1,REDIS_CMD_INLINE},
595 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
596 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
597 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
598 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
599 {"type",typeCommand,2,REDIS_CMD_INLINE},
600 {"multi",multiCommand,1,REDIS_CMD_INLINE},
601 {"exec",execCommand,1,REDIS_CMD_INLINE},
602 {"sync",syncCommand,1,REDIS_CMD_INLINE},
603 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
604 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
605 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
606 {"info",infoCommand,1,REDIS_CMD_INLINE},
607 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
608 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
609 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
610 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
611 {NULL,NULL,0,0}
612 };
613
614 /*============================ Utility functions ============================ */
615
/* Glob-style pattern matching.
 *
 * Returns 1 if the first stringLen bytes of 'string' match the first
 * patternLen bytes of 'pattern', 0 otherwise. Supported syntax:
 *   *        any sequence of characters, including the empty one
 *   ?        exactly one character
 *   [abc]    one character of the set; [^abc] negates it, [a-z] is a range
 *   \x       the literal character x (also valid inside [...])
 * When nocase is non-zero, literals and ranges are compared
 * case-insensitively (escaped characters inside [...] are not).
 *
 * Every access is bounded by the explicit lengths, so neither buffer needs
 * to be NUL-terminated. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* Collapse runs of consecutive stars into one. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match: a trailing star eats everything */
            /* Try to match the rest of the pattern at every suffix. */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int negate, match;

            /* A class always consumes one character. Without this check an
             * empty string made "[^a]" succeed and drove stringLen negative
             * (so "[^a]*" matched ""). */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            negate = patternLen > 0 && pattern[0] == '^';
            if (negate) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back one byte so the shared
                     * "pattern++" after the switch stops exactly at the end
                     * of the pattern. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    /* Escaped character inside the class. */
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (negate)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            /* A literal needs a character to compare against. */
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* String exhausted: only trailing stars may remain. */
            while(patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
738
739 static void redisLog(int level, const char *fmt, ...) {
740 va_list ap;
741 FILE *fp;
742
743 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
744 if (!fp) return;
745
746 va_start(ap, fmt);
747 if (level >= server.verbosity) {
748 char *c = ".-*";
749 char buf[64];
750 time_t now;
751
752 now = time(NULL);
753 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
754 fprintf(fp,"%s %c ",buf,c[level]);
755 vfprintf(fp, fmt, ap);
756 fprintf(fp,"\n");
757 fflush(fp);
758 }
759 va_end(ap);
760
761 if (server.logfile) fclose(fp);
762 }
763
764 /*====================== Hash table type implementation ==================== */
765
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and redis objects as values (objects can hold SDS strings,
 * lists, sets). */
769
/* dictType value destructor for plain zmalloc()'d buffers: hands the value
 * back to zfree(). The dictionary's private data is not needed. */
static void dictVanillaFree(void *privdata, void *val)
{
    (void) privdata;
    zfree(val);
}
775
776 static void dictListDestructor(void *privdata, void *val)
777 {
778 DICT_NOTUSED(privdata);
779 listRelease((list*)val);
780 }
781
782 static int sdsDictKeyCompare(void *privdata, const void *key1,
783 const void *key2)
784 {
785 int l1,l2;
786 DICT_NOTUSED(privdata);
787
788 l1 = sdslen((sds)key1);
789 l2 = sdslen((sds)key2);
790 if (l1 != l2) return 0;
791 return memcmp(key1, key2, l1) == 0;
792 }
793
/* dictType destructor for keys/values that are redis objects: drops the
 * dictionary's reference with decrRefCount(). privdata is ignored. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    (void) privdata;
    decrRefCount(val);
}
800
801 static int dictObjKeyCompare(void *privdata, const void *key1,
802 const void *key2)
803 {
804 const robj *o1 = key1, *o2 = key2;
805 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
806 }
807
808 static unsigned int dictObjHash(const void *key) {
809 const robj *o = key;
810 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
811 }
812
813 static int dictEncObjKeyCompare(void *privdata, const void *key1,
814 const void *key2)
815 {
816 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
817 int cmp;
818
819 o1 = getDecodedObject(o1);
820 o2 = getDecodedObject(o2);
821 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
822 decrRefCount(o1);
823 decrRefCount(o2);
824 return cmp;
825 }
826
827 static unsigned int dictEncObjHash(const void *key) {
828 robj *o = (robj*) key;
829
830 o = getDecodedObject(o);
831 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
832 decrRefCount(o);
833 return hash;
834 }
835
/* Sets: keys are redis objects that may be encoded (hashed/compared after
 * decoding) and are released with decrRefCount(); values are not freed by
 * the dictionary. */
static dictType setDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL                       /* val destructor */
};

/* Sorted sets (zset.dict): keys are possibly encoded redis objects, values
 * are heap-allocated doubles released with zfree() via dictVanillaFree. */
static dictType zsetDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictVanillaFree            /* val destructor of malloc(sizeof(double)) */
};

/* Keys must be raw, sds-backed redis objects (dictObjHash reads ptr as an sds
 * without decoding); keys and values are both released with decrRefCount().
 * NOTE(review): presumably the type of redisDb.dict -- confirm. */
static dictType hashDictType = {
    dictObjHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictObjKeyCompare,          /* key compare */
    dictRedisObjectDestructor,  /* key destructor */
    dictRedisObjectDestructor   /* val destructor */
};

/* Keylist hash table type has unencoded redis objects as keys and
 * lists as values. It's used for blocking operations (BLPOP) */
static dictType keylistDictType = {
    dictObjHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictObjKeyCompare,          /* key compare */
    dictRedisObjectDestructor,  /* key destructor */
    dictListDestructor          /* val destructor */
};
873
874 /* ========================= Random utility functions ======================= */
875
876 /* Redis generally does not try to recover from out of memory conditions
877 * when allocating objects or strings, it is not clear if it will be possible
878 * to report this condition to the client since the networking layer itself
879 * is based on heap allocation for send buffers, so we simply abort.
880 * At least the code will be simpler to read... */
881 static void oom(const char *msg) {
882 redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
883 sleep(1);
884 abort();
885 }
886
887 /* ====================== Redis server networking stuff ===================== */
888 static void closeTimedoutClients(void) {
889 redisClient *c;
890 listNode *ln;
891 time_t now = time(NULL);
892
893 listRewind(server.clients);
894 while ((ln = listYield(server.clients)) != NULL) {
895 c = listNodeValue(ln);
896 if (server.maxidletime &&
897 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
898 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
899 (now - c->lastinteraction > server.maxidletime))
900 {
901 redisLog(REDIS_DEBUG,"Closing idle client");
902 freeClient(c);
903 } else if (c->flags & REDIS_BLOCKED) {
904 if (c->blockingto != 0 && c->blockingto < now) {
905 addReply(c,shared.nullbulk);
906 unblockClient(c);
907 }
908 }
909 }
910 }
911
912 static int htNeedsResize(dict *dict) {
913 long long size, used;
914
915 size = dictSlots(dict);
916 used = dictSize(dict);
917 return (size && used && size > DICT_HT_INITIAL_SIZE &&
918 (used*100/size < REDIS_HT_MINFILL));
919 }
920
921 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
922 * we resize the hash table to save memory */
923 static void tryResizeHashTables(void) {
924 int j;
925
926 for (j = 0; j < server.dbnum; j++) {
927 if (htNeedsResize(server.db[j].dict)) {
928 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
929 dictResize(server.db[j].dict);
930 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
931 }
932 if (htNeedsResize(server.db[j].expires))
933 dictResize(server.db[j].expires);
934 }
935 }
936
937 /* A background saving child (BGSAVE) terminated its work. Handle this. */
938 void backgroundSaveDoneHandler(int statloc) {
939 int exitcode = WEXITSTATUS(statloc);
940 int bysignal = WIFSIGNALED(statloc);
941
942 if (!bysignal && exitcode == 0) {
943 redisLog(REDIS_NOTICE,
944 "Background saving terminated with success");
945 server.dirty = 0;
946 server.lastsave = time(NULL);
947 } else if (!bysignal && exitcode != 0) {
948 redisLog(REDIS_WARNING, "Background saving error");
949 } else {
950 redisLog(REDIS_WARNING,
951 "Background saving terminated by signal");
952 rdbRemoveTempFile(server.bgsavechildpid);
953 }
954 server.bgsavechildpid = -1;
955 /* Possibly there are slaves waiting for a BGSAVE in order to be served
956 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
957 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
958 }
959
960 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
961 * Handle this. */
962 void backgroundRewriteDoneHandler(int statloc) {
963 int exitcode = WEXITSTATUS(statloc);
964 int bysignal = WIFSIGNALED(statloc);
965
966 if (!bysignal && exitcode == 0) {
967 int fd;
968 char tmpfile[256];
969
970 redisLog(REDIS_NOTICE,
971 "Background append only file rewriting terminated with success");
972 /* Now it's time to flush the differences accumulated by the parent */
973 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
974 fd = open(tmpfile,O_WRONLY|O_APPEND);
975 if (fd == -1) {
976 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
977 goto cleanup;
978 }
979 /* Flush our data... */
980 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
981 (signed) sdslen(server.bgrewritebuf)) {
982 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
983 close(fd);
984 goto cleanup;
985 }
986 redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
987 /* Now our work is to rename the temp file into the stable file. And
988 * switch the file descriptor used by the server for append only. */
989 if (rename(tmpfile,server.appendfilename) == -1) {
990 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
991 close(fd);
992 goto cleanup;
993 }
994 /* Mission completed... almost */
995 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
996 if (server.appendfd != -1) {
997 /* If append only is actually enabled... */
998 close(server.appendfd);
999 server.appendfd = fd;
1000 fsync(fd);
1001 server.appendseldb = -1; /* Make sure it will issue SELECT */
1002 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
1003 } else {
1004 /* If append only is disabled we just generate a dump in this
1005 * format. Why not? */
1006 close(fd);
1007 }
1008 } else if (!bysignal && exitcode != 0) {
1009 redisLog(REDIS_WARNING, "Background append only file rewriting error");
1010 } else {
1011 redisLog(REDIS_WARNING,
1012 "Background append only file rewriting terminated by signal");
1013 }
1014 cleanup:
1015 sdsfree(server.bgrewritebuf);
1016 server.bgrewritebuf = sdsempty();
1017 aofRemoveTempFile(server.bgrewritechildpid);
1018 server.bgrewritechildpid = -1;
1019 }
1020
1021 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
1022 int j, loops = server.cronloops++;
1023 REDIS_NOTUSED(eventLoop);
1024 REDIS_NOTUSED(id);
1025 REDIS_NOTUSED(clientData);
1026
1027 /* Update the global state with the amount of used memory */
1028 server.usedmemory = zmalloc_used_memory();
1029
1030 /* Show some info about non-empty databases */
1031 for (j = 0; j < server.dbnum; j++) {
1032 long long size, used, vkeys;
1033
1034 size = dictSlots(server.db[j].dict);
1035 used = dictSize(server.db[j].dict);
1036 vkeys = dictSize(server.db[j].expires);
1037 if (!(loops % 5) && (used || vkeys)) {
1038 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
1039 /* dictPrintStats(server.dict); */
1040 }
1041 }
1042
1043 /* We don't want to resize the hash tables while a bacground saving
1044 * is in progress: the saving child is created using fork() that is
1045 * implemented with a copy-on-write semantic in most modern systems, so
1046 * if we resize the HT while there is the saving child at work actually
1047 * a lot of memory movements in the parent will cause a lot of pages
1048 * copied. */
1049 if (server.bgsavechildpid == -1) tryResizeHashTables();
1050
1051 /* Show information about connected clients */
1052 if (!(loops % 5)) {
1053 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1054 listLength(server.clients)-listLength(server.slaves),
1055 listLength(server.slaves),
1056 server.usedmemory,
1057 dictSize(server.sharingpool));
1058 }
1059
1060 /* Close connections of timedout clients */
1061 if ((server.maxidletime && !(loops % 10)) || server.blockedclients)
1062 closeTimedoutClients();
1063
1064 /* Check if a background saving or AOF rewrite in progress terminated */
1065 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
1066 int statloc;
1067 pid_t pid;
1068
1069 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1070 if (pid == server.bgsavechildpid) {
1071 backgroundSaveDoneHandler(statloc);
1072 } else {
1073 backgroundRewriteDoneHandler(statloc);
1074 }
1075 }
1076 } else {
1077 /* If there is not a background saving in progress check if
1078 * we have to save now */
1079 time_t now = time(NULL);
1080 for (j = 0; j < server.saveparamslen; j++) {
1081 struct saveparam *sp = server.saveparams+j;
1082
1083 if (server.dirty >= sp->changes &&
1084 now-server.lastsave > sp->seconds) {
1085 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1086 sp->changes, sp->seconds);
1087 rdbSaveBackground(server.dbfilename);
1088 break;
1089 }
1090 }
1091 }
1092
1093 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1094 * will use few CPU cycles if there are few expiring keys, otherwise
1095 * it will get more aggressive to avoid that too much memory is used by
1096 * keys that can be removed from the keyspace. */
1097 for (j = 0; j < server.dbnum; j++) {
1098 int expired;
1099 redisDb *db = server.db+j;
1100
1101 /* Continue to expire if at the end of the cycle more than 25%
1102 * of the keys were expired. */
1103 do {
1104 int num = dictSize(db->expires);
1105 time_t now = time(NULL);
1106
1107 expired = 0;
1108 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1109 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1110 while (num--) {
1111 dictEntry *de;
1112 time_t t;
1113
1114 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1115 t = (time_t) dictGetEntryVal(de);
1116 if (now > t) {
1117 deleteKey(db,dictGetEntryKey(de));
1118 expired++;
1119 }
1120 }
1121 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
1122 }
1123
1124 /* Check if we should connect to a MASTER */
1125 if (server.replstate == REDIS_REPL_CONNECT) {
1126 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1127 if (syncWithMaster() == REDIS_OK) {
1128 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1129 }
1130 }
1131 return 1000;
1132 }
1133
1134 static void createSharedObjects(void) {
1135 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1136 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1137 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
1138 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1139 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1140 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1141 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1142 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1143 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1144 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1145 shared.queued = createObject(REDIS_STRING,sdsnew("+QUEUED\r\n"));
1146 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1147 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1148 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1149 "-ERR no such key\r\n"));
1150 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1151 "-ERR syntax error\r\n"));
1152 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1153 "-ERR source and destination objects are the same\r\n"));
1154 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1155 "-ERR index out of range\r\n"));
1156 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1157 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1158 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1159 shared.select0 = createStringObject("select 0\r\n",10);
1160 shared.select1 = createStringObject("select 1\r\n",10);
1161 shared.select2 = createStringObject("select 2\r\n",10);
1162 shared.select3 = createStringObject("select 3\r\n",10);
1163 shared.select4 = createStringObject("select 4\r\n",10);
1164 shared.select5 = createStringObject("select 5\r\n",10);
1165 shared.select6 = createStringObject("select 6\r\n",10);
1166 shared.select7 = createStringObject("select 7\r\n",10);
1167 shared.select8 = createStringObject("select 8\r\n",10);
1168 shared.select9 = createStringObject("select 9\r\n",10);
1169 }
1170
1171 static void appendServerSaveParams(time_t seconds, int changes) {
1172 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1173 server.saveparams[server.saveparamslen].seconds = seconds;
1174 server.saveparams[server.saveparamslen].changes = changes;
1175 server.saveparamslen++;
1176 }
1177
1178 static void resetServerSaveParams() {
1179 zfree(server.saveparams);
1180 server.saveparams = NULL;
1181 server.saveparamslen = 0;
1182 }
1183
1184 static void initServerConfig() {
1185 server.dbnum = REDIS_DEFAULT_DBNUM;
1186 server.port = REDIS_SERVERPORT;
1187 server.verbosity = REDIS_DEBUG;
1188 server.maxidletime = REDIS_MAXIDLETIME;
1189 server.saveparams = NULL;
1190 server.logfile = NULL; /* NULL = log on standard output */
1191 server.bindaddr = NULL;
1192 server.glueoutputbuf = 1;
1193 server.daemonize = 0;
1194 server.appendonly = 0;
1195 server.appendfsync = APPENDFSYNC_ALWAYS;
1196 server.lastfsync = time(NULL);
1197 server.appendfd = -1;
1198 server.appendseldb = -1; /* Make sure the first time will not match */
1199 server.pidfile = "/var/run/redis.pid";
1200 server.dbfilename = "dump.rdb";
1201 server.appendfilename = "appendonly.aof";
1202 server.requirepass = NULL;
1203 server.shareobjects = 0;
1204 server.rdbcompression = 1;
1205 server.sharingpoolsize = 1024;
1206 server.maxclients = 0;
1207 server.blockedclients = 0;
1208 server.maxmemory = 0;
1209 resetServerSaveParams();
1210
1211 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1212 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1213 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1214 /* Replication related */
1215 server.isslave = 0;
1216 server.masterauth = NULL;
1217 server.masterhost = NULL;
1218 server.masterport = 6379;
1219 server.master = NULL;
1220 server.replstate = REDIS_REPL_NONE;
1221
1222 /* Double constants initialization */
1223 R_Zero = 0.0;
1224 R_PosInf = 1.0/R_Zero;
1225 R_NegInf = -1.0/R_Zero;
1226 R_Nan = R_Zero/R_Zero;
1227 }
1228
1229 static void initServer() {
1230 int j;
1231
1232 signal(SIGHUP, SIG_IGN);
1233 signal(SIGPIPE, SIG_IGN);
1234 setupSigSegvAction();
1235
1236 server.clients = listCreate();
1237 server.slaves = listCreate();
1238 server.monitors = listCreate();
1239 server.objfreelist = listCreate();
1240 createSharedObjects();
1241 server.el = aeCreateEventLoop();
1242 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1243 server.sharingpool = dictCreate(&setDictType,NULL);
1244 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1245 if (server.fd == -1) {
1246 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1247 exit(1);
1248 }
1249 for (j = 0; j < server.dbnum; j++) {
1250 server.db[j].dict = dictCreate(&hashDictType,NULL);
1251 server.db[j].expires = dictCreate(&setDictType,NULL);
1252 server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL);
1253 server.db[j].id = j;
1254 }
1255 server.cronloops = 0;
1256 server.bgsavechildpid = -1;
1257 server.bgrewritechildpid = -1;
1258 server.bgrewritebuf = sdsempty();
1259 server.lastsave = time(NULL);
1260 server.dirty = 0;
1261 server.usedmemory = 0;
1262 server.stat_numcommands = 0;
1263 server.stat_numconnections = 0;
1264 server.stat_starttime = time(NULL);
1265 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1266
1267 if (server.appendonly) {
1268 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1269 if (server.appendfd == -1) {
1270 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1271 strerror(errno));
1272 exit(1);
1273 }
1274 }
1275 }
1276
1277 /* Empty the whole database */
1278 static long long emptyDb() {
1279 int j;
1280 long long removed = 0;
1281
1282 for (j = 0; j < server.dbnum; j++) {
1283 removed += dictSize(server.db[j].dict);
1284 dictEmpty(server.db[j].dict);
1285 dictEmpty(server.db[j].expires);
1286 }
1287 return removed;
1288 }
1289
/* Parse a boolean config argument, case-insensitively.
 * Returns 1 for "yes", 0 for "no", and -1 for anything else. */
static int yesnotoi(char *s) {
    int result = -1;

    if (strcasecmp(s,"yes") == 0)
        result = 1;
    else if (strcasecmp(s,"no") == 0)
        result = 0;
    return result;
}
1295
1296 /* I agree, this is a very rudimental way to load a configuration...
1297 will improve later if the config gets more complex */
1298 static void loadServerConfig(char *filename) {
1299 FILE *fp;
1300 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1301 int linenum = 0;
1302 sds line = NULL;
1303
1304 if (filename[0] == '-' && filename[1] == '\0')
1305 fp = stdin;
1306 else {
1307 if ((fp = fopen(filename,"r")) == NULL) {
1308 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1309 exit(1);
1310 }
1311 }
1312
1313 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1314 sds *argv;
1315 int argc, j;
1316
1317 linenum++;
1318 line = sdsnew(buf);
1319 line = sdstrim(line," \t\r\n");
1320
1321 /* Skip comments and blank lines*/
1322 if (line[0] == '#' || line[0] == '\0') {
1323 sdsfree(line);
1324 continue;
1325 }
1326
1327 /* Split into arguments */
1328 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1329 sdstolower(argv[0]);
1330
1331 /* Execute config directives */
1332 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1333 server.maxidletime = atoi(argv[1]);
1334 if (server.maxidletime < 0) {
1335 err = "Invalid timeout value"; goto loaderr;
1336 }
1337 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1338 server.port = atoi(argv[1]);
1339 if (server.port < 1 || server.port > 65535) {
1340 err = "Invalid port"; goto loaderr;
1341 }
1342 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1343 server.bindaddr = zstrdup(argv[1]);
1344 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1345 int seconds = atoi(argv[1]);
1346 int changes = atoi(argv[2]);
1347 if (seconds < 1 || changes < 0) {
1348 err = "Invalid save parameters"; goto loaderr;
1349 }
1350 appendServerSaveParams(seconds,changes);
1351 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1352 if (chdir(argv[1]) == -1) {
1353 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1354 argv[1], strerror(errno));
1355 exit(1);
1356 }
1357 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1358 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1359 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1360 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1361 else {
1362 err = "Invalid log level. Must be one of debug, notice, warning";
1363 goto loaderr;
1364 }
1365 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1366 FILE *logfp;
1367
1368 server.logfile = zstrdup(argv[1]);
1369 if (!strcasecmp(server.logfile,"stdout")) {
1370 zfree(server.logfile);
1371 server.logfile = NULL;
1372 }
1373 if (server.logfile) {
1374 /* Test if we are able to open the file. The server will not
1375 * be able to abort just for this problem later... */
1376 logfp = fopen(server.logfile,"a");
1377 if (logfp == NULL) {
1378 err = sdscatprintf(sdsempty(),
1379 "Can't open the log file: %s", strerror(errno));
1380 goto loaderr;
1381 }
1382 fclose(logfp);
1383 }
1384 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1385 server.dbnum = atoi(argv[1]);
1386 if (server.dbnum < 1) {
1387 err = "Invalid number of databases"; goto loaderr;
1388 }
1389 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1390 server.maxclients = atoi(argv[1]);
1391 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1392 server.maxmemory = strtoll(argv[1], NULL, 10);
1393 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1394 server.masterhost = sdsnew(argv[1]);
1395 server.masterport = atoi(argv[2]);
1396 server.replstate = REDIS_REPL_CONNECT;
1397 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1398 server.masterauth = zstrdup(argv[1]);
1399 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1400 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1401 err = "argument must be 'yes' or 'no'"; goto loaderr;
1402 }
1403 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1404 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1405 err = "argument must be 'yes' or 'no'"; goto loaderr;
1406 }
1407 } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) {
1408 if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
1409 err = "argument must be 'yes' or 'no'"; goto loaderr;
1410 }
1411 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1412 server.sharingpoolsize = atoi(argv[1]);
1413 if (server.sharingpoolsize < 1) {
1414 err = "invalid object sharing pool size"; goto loaderr;
1415 }
1416 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1417 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1418 err = "argument must be 'yes' or 'no'"; goto loaderr;
1419 }
1420 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1421 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1422 err = "argument must be 'yes' or 'no'"; goto loaderr;
1423 }
1424 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1425 if (!strcasecmp(argv[1],"no")) {
1426 server.appendfsync = APPENDFSYNC_NO;
1427 } else if (!strcasecmp(argv[1],"always")) {
1428 server.appendfsync = APPENDFSYNC_ALWAYS;
1429 } else if (!strcasecmp(argv[1],"everysec")) {
1430 server.appendfsync = APPENDFSYNC_EVERYSEC;
1431 } else {
1432 err = "argument must be 'no', 'always' or 'everysec'";
1433 goto loaderr;
1434 }
1435 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1436 server.requirepass = zstrdup(argv[1]);
1437 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1438 server.pidfile = zstrdup(argv[1]);
1439 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1440 server.dbfilename = zstrdup(argv[1]);
1441 } else {
1442 err = "Bad directive or wrong number of arguments"; goto loaderr;
1443 }
1444 for (j = 0; j < argc; j++)
1445 sdsfree(argv[j]);
1446 zfree(argv);
1447 sdsfree(line);
1448 }
1449 if (fp != stdin) fclose(fp);
1450 return;
1451
1452 loaderr:
1453 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1454 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1455 fprintf(stderr, ">>> '%s'\n", line);
1456 fprintf(stderr, "%s\n", err);
1457 exit(1);
1458 }
1459
1460 static void freeClientArgv(redisClient *c) {
1461 int j;
1462
1463 for (j = 0; j < c->argc; j++)
1464 decrRefCount(c->argv[j]);
1465 for (j = 0; j < c->mbargc; j++)
1466 decrRefCount(c->mbargv[j]);
1467 c->argc = 0;
1468 c->mbargc = 0;
1469 }
1470
1471 static void freeClient(redisClient *c) {
1472 listNode *ln;
1473
1474 /* Note that if the client we are freeing is blocked into a blocking
1475 * call, we have to set querybuf to NULL *before* to call unblockClient()
1476 * to avoid processInputBuffer() will get called. Also it is important
1477 * to remove the file events after this, because this call adds
1478 * the READABLE event. */
1479 sdsfree(c->querybuf);
1480 c->querybuf = NULL;
1481 if (c->flags & REDIS_BLOCKED)
1482 unblockClient(c);
1483
1484 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1485 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1486 listRelease(c->reply);
1487 freeClientArgv(c);
1488 close(c->fd);
1489 ln = listSearchKey(server.clients,c);
1490 redisAssert(ln != NULL);
1491 listDelNode(server.clients,ln);
1492 if (c->flags & REDIS_SLAVE) {
1493 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1494 close(c->repldbfd);
1495 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1496 ln = listSearchKey(l,c);
1497 redisAssert(ln != NULL);
1498 listDelNode(l,ln);
1499 }
1500 if (c->flags & REDIS_MASTER) {
1501 server.master = NULL;
1502 server.replstate = REDIS_REPL_CONNECT;
1503 }
1504 zfree(c->argv);
1505 zfree(c->mbargv);
1506 freeClientMultiState(c);
1507 zfree(c);
1508 }
1509
1510 #define GLUEREPLY_UP_TO (1024)
1511 static void glueReplyBuffersIfNeeded(redisClient *c) {
1512 int copylen = 0;
1513 char buf[GLUEREPLY_UP_TO];
1514 listNode *ln;
1515 robj *o;
1516
1517 listRewind(c->reply);
1518 while((ln = listYield(c->reply))) {
1519 int objlen;
1520
1521 o = ln->value;
1522 objlen = sdslen(o->ptr);
1523 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1524 memcpy(buf+copylen,o->ptr,objlen);
1525 copylen += objlen;
1526 listDelNode(c->reply,ln);
1527 } else {
1528 if (copylen == 0) return;
1529 break;
1530 }
1531 }
1532 /* Now the output buffer is empty, add the new single element */
1533 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1534 listAddNodeHead(c->reply,o);
1535 }
1536
1537 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1538 redisClient *c = privdata;
1539 int nwritten = 0, totwritten = 0, objlen;
1540 robj *o;
1541 REDIS_NOTUSED(el);
1542 REDIS_NOTUSED(mask);
1543
1544 /* Use writev() if we have enough buffers to send */
1545 if (!server.glueoutputbuf &&
1546 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1547 !(c->flags & REDIS_MASTER))
1548 {
1549 sendReplyToClientWritev(el, fd, privdata, mask);
1550 return;
1551 }
1552
1553 while(listLength(c->reply)) {
1554 if (server.glueoutputbuf && listLength(c->reply) > 1)
1555 glueReplyBuffersIfNeeded(c);
1556
1557 o = listNodeValue(listFirst(c->reply));
1558 objlen = sdslen(o->ptr);
1559
1560 if (objlen == 0) {
1561 listDelNode(c->reply,listFirst(c->reply));
1562 continue;
1563 }
1564
1565 if (c->flags & REDIS_MASTER) {
1566 /* Don't reply to a master */
1567 nwritten = objlen - c->sentlen;
1568 } else {
1569 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1570 if (nwritten <= 0) break;
1571 }
1572 c->sentlen += nwritten;
1573 totwritten += nwritten;
1574 /* If we fully sent the object on head go to the next one */
1575 if (c->sentlen == objlen) {
1576 listDelNode(c->reply,listFirst(c->reply));
1577 c->sentlen = 0;
1578 }
1579 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1580 * bytes, in a single threaded server it's a good idea to serve
1581 * other clients as well, even if a very large request comes from
1582 * super fast link that is always able to accept data (in real world
1583 * scenario think about 'KEYS *' against the loopback interfae) */
1584 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1585 }
1586 if (nwritten == -1) {
1587 if (errno == EAGAIN) {
1588 nwritten = 0;
1589 } else {
1590 redisLog(REDIS_DEBUG,
1591 "Error writing to client: %s", strerror(errno));
1592 freeClient(c);
1593 return;
1594 }
1595 }
1596 if (totwritten > 0) c->lastinteraction = time(NULL);
1597 if (listLength(c->reply) == 0) {
1598 c->sentlen = 0;
1599 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1600 }
1601 }
1602
1603 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1604 {
1605 redisClient *c = privdata;
1606 int nwritten = 0, totwritten = 0, objlen, willwrite;
1607 robj *o;
1608 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1609 int offset, ion = 0;
1610 REDIS_NOTUSED(el);
1611 REDIS_NOTUSED(mask);
1612
1613 listNode *node;
1614 while (listLength(c->reply)) {
1615 offset = c->sentlen;
1616 ion = 0;
1617 willwrite = 0;
1618
1619 /* fill-in the iov[] array */
1620 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1621 o = listNodeValue(node);
1622 objlen = sdslen(o->ptr);
1623
1624 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1625 break;
1626
1627 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1628 break; /* no more iovecs */
1629
1630 iov[ion].iov_base = ((char*)o->ptr) + offset;
1631 iov[ion].iov_len = objlen - offset;
1632 willwrite += objlen - offset;
1633 offset = 0; /* just for the first item */
1634 ion++;
1635 }
1636
1637 if(willwrite == 0)
1638 break;
1639
1640 /* write all collected blocks at once */
1641 if((nwritten = writev(fd, iov, ion)) < 0) {
1642 if (errno != EAGAIN) {
1643 redisLog(REDIS_DEBUG,
1644 "Error writing to client: %s", strerror(errno));
1645 freeClient(c);
1646 return;
1647 }
1648 break;
1649 }
1650
1651 totwritten += nwritten;
1652 offset = c->sentlen;
1653
1654 /* remove written robjs from c->reply */
1655 while (nwritten && listLength(c->reply)) {
1656 o = listNodeValue(listFirst(c->reply));
1657 objlen = sdslen(o->ptr);
1658
1659 if(nwritten >= objlen - offset) {
1660 listDelNode(c->reply, listFirst(c->reply));
1661 nwritten -= objlen - offset;
1662 c->sentlen = 0;
1663 } else {
1664 /* partial write */
1665 c->sentlen += nwritten;
1666 break;
1667 }
1668 offset = 0;
1669 }
1670 }
1671
1672 if (totwritten > 0)
1673 c->lastinteraction = time(NULL);
1674
1675 if (listLength(c->reply) == 0) {
1676 c->sentlen = 0;
1677 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1678 }
1679 }
1680
1681 static struct redisCommand *lookupCommand(char *name) {
1682 int j = 0;
1683 while(cmdTable[j].name != NULL) {
1684 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1685 j++;
1686 }
1687 return NULL;
1688 }
1689
1690 /* resetClient prepare the client to process the next command */
1691 static void resetClient(redisClient *c) {
1692 freeClientArgv(c);
1693 c->bulklen = -1;
1694 c->multibulk = 0;
1695 }
1696
1697 /* Call() is the core of Redis execution of a command */
1698 static void call(redisClient *c, struct redisCommand *cmd) {
1699 long long dirty;
1700
1701 dirty = server.dirty;
1702 cmd->proc(c);
1703 if (server.appendonly && server.dirty-dirty)
1704 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1705 if (server.dirty-dirty && listLength(server.slaves))
1706 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1707 if (listLength(server.monitors))
1708 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1709 server.stat_numcommands++;
1710 }
1711
1712 /* If this function gets called we already read a whole
1713 * command, argments are in the client argv/argc fields.
1714 * processCommand() execute the command or prepare the
1715 * server for a bulk read from the client.
1716 *
1717 * If 1 is returned the client is still alive and valid and
1718 * and other operations can be performed by the caller. Otherwise
1719 * if 0 is returned the client was destroied (i.e. after QUIT). */
1720 static int processCommand(redisClient *c) {
1721 struct redisCommand *cmd;
1722
1723 /* Free some memory if needed (maxmemory setting) */
1724 if (server.maxmemory) freeMemoryIfNeeded();
1725
1726 /* Handle the multi bulk command type. This is an alternative protocol
1727 * supported by Redis in order to receive commands that are composed of
1728 * multiple binary-safe "bulk" arguments. The latency of processing is
1729 * a bit higher but this allows things like multi-sets, so if this
1730 * protocol is used only for MSET and similar commands this is a big win. */
1731 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1732 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1733 if (c->multibulk <= 0) {
1734 resetClient(c);
1735 return 1;
1736 } else {
1737 decrRefCount(c->argv[c->argc-1]);
1738 c->argc--;
1739 return 1;
1740 }
1741 } else if (c->multibulk) {
1742 if (c->bulklen == -1) {
1743 if (((char*)c->argv[0]->ptr)[0] != '$') {
1744 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1745 resetClient(c);
1746 return 1;
1747 } else {
1748 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1749 decrRefCount(c->argv[0]);
1750 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1751 c->argc--;
1752 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1753 resetClient(c);
1754 return 1;
1755 }
1756 c->argc--;
1757 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1758 return 1;
1759 }
1760 } else {
1761 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1762 c->mbargv[c->mbargc] = c->argv[0];
1763 c->mbargc++;
1764 c->argc--;
1765 c->multibulk--;
1766 if (c->multibulk == 0) {
1767 robj **auxargv;
1768 int auxargc;
1769
1770 /* Here we need to swap the multi-bulk argc/argv with the
1771 * normal argc/argv of the client structure. */
1772 auxargv = c->argv;
1773 c->argv = c->mbargv;
1774 c->mbargv = auxargv;
1775
1776 auxargc = c->argc;
1777 c->argc = c->mbargc;
1778 c->mbargc = auxargc;
1779
1780 /* We need to set bulklen to something different than -1
1781 * in order for the code below to process the command without
1782 * to try to read the last argument of a bulk command as
1783 * a special argument. */
1784 c->bulklen = 0;
1785 /* continue below and process the command */
1786 } else {
1787 c->bulklen = -1;
1788 return 1;
1789 }
1790 }
1791 }
1792 /* -- end of multi bulk commands processing -- */
1793
1794 /* The QUIT command is handled as a special case. Normal command
1795 * procs are unable to close the client connection safely */
1796 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1797 freeClient(c);
1798 return 0;
1799 }
1800 cmd = lookupCommand(c->argv[0]->ptr);
1801 if (!cmd) {
1802 addReplySds(c,
1803 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1804 (char*)c->argv[0]->ptr));
1805 resetClient(c);
1806 return 1;
1807 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1808 (c->argc < -cmd->arity)) {
1809 addReplySds(c,
1810 sdscatprintf(sdsempty(),
1811 "-ERR wrong number of arguments for '%s' command\r\n",
1812 cmd->name));
1813 resetClient(c);
1814 return 1;
1815 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1816 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1817 resetClient(c);
1818 return 1;
1819 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1820 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1821
1822 decrRefCount(c->argv[c->argc-1]);
1823 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1824 c->argc--;
1825 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1826 resetClient(c);
1827 return 1;
1828 }
1829 c->argc--;
1830 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1831 /* It is possible that the bulk read is already in the
1832 * buffer. Check this condition and handle it accordingly.
1833 * This is just a fast path, alternative to call processInputBuffer().
1834 * It's a good idea since the code is small and this condition
1835 * happens most of the times. */
1836 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1837 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1838 c->argc++;
1839 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1840 } else {
1841 return 1;
1842 }
1843 }
1844 /* Let's try to share objects on the command arguments vector */
1845 if (server.shareobjects) {
1846 int j;
1847 for(j = 1; j < c->argc; j++)
1848 c->argv[j] = tryObjectSharing(c->argv[j]);
1849 }
1850 /* Let's try to encode the bulk object to save space. */
1851 if (cmd->flags & REDIS_CMD_BULK)
1852 tryObjectEncoding(c->argv[c->argc-1]);
1853
1854 /* Check if the user is authenticated */
1855 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1856 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1857 resetClient(c);
1858 return 1;
1859 }
1860
1861 /* Exec the command */
1862 if (c->flags & REDIS_MULTI && cmd->proc != execCommand) {
1863 queueMultiCommand(c,cmd);
1864 addReply(c,shared.queued);
1865 } else {
1866 call(c,cmd);
1867 }
1868
1869 /* Prepare the client for the next command */
1870 if (c->flags & REDIS_CLOSE) {
1871 freeClient(c);
1872 return 0;
1873 }
1874 resetClient(c);
1875 return 1;
1876 }
1877
1878 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1879 listNode *ln;
1880 int outc = 0, j;
1881 robj **outv;
1882 /* (args*2)+1 is enough room for args, spaces, newlines */
1883 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1884
1885 if (argc <= REDIS_STATIC_ARGS) {
1886 outv = static_outv;
1887 } else {
1888 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1889 }
1890
1891 for (j = 0; j < argc; j++) {
1892 if (j != 0) outv[outc++] = shared.space;
1893 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1894 robj *lenobj;
1895
1896 lenobj = createObject(REDIS_STRING,
1897 sdscatprintf(sdsempty(),"%lu\r\n",
1898 (unsigned long) stringObjectLen(argv[j])));
1899 lenobj->refcount = 0;
1900 outv[outc++] = lenobj;
1901 }
1902 outv[outc++] = argv[j];
1903 }
1904 outv[outc++] = shared.crlf;
1905
1906 /* Increment all the refcounts at start and decrement at end in order to
1907 * be sure to free objects if there is no slave in a replication state
1908 * able to be feed with commands */
1909 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1910 listRewind(slaves);
1911 while((ln = listYield(slaves))) {
1912 redisClient *slave = ln->value;
1913
1914 /* Don't feed slaves that are still waiting for BGSAVE to start */
1915 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1916
1917 /* Feed all the other slaves, MONITORs and so on */
1918 if (slave->slaveseldb != dictid) {
1919 robj *selectcmd;
1920
1921 switch(dictid) {
1922 case 0: selectcmd = shared.select0; break;
1923 case 1: selectcmd = shared.select1; break;
1924 case 2: selectcmd = shared.select2; break;
1925 case 3: selectcmd = shared.select3; break;
1926 case 4: selectcmd = shared.select4; break;
1927 case 5: selectcmd = shared.select5; break;
1928 case 6: selectcmd = shared.select6; break;
1929 case 7: selectcmd = shared.select7; break;
1930 case 8: selectcmd = shared.select8; break;
1931 case 9: selectcmd = shared.select9; break;
1932 default:
1933 selectcmd = createObject(REDIS_STRING,
1934 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1935 selectcmd->refcount = 0;
1936 break;
1937 }
1938 addReply(slave,selectcmd);
1939 slave->slaveseldb = dictid;
1940 }
1941 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1942 }
1943 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1944 if (outv != static_outv) zfree(outv);
1945 }
1946
1947 static void processInputBuffer(redisClient *c) {
1948 again:
1949 /* Before to process the input buffer, make sure the client is not
1950 * waitig for a blocking operation such as BLPOP. Note that the first
1951 * iteration the client is never blocked, otherwise the processInputBuffer
1952 * would not be called at all, but after the execution of the first commands
1953 * in the input buffer the client may be blocked, and the "goto again"
1954 * will try to reiterate. The following line will make it return asap. */
1955 if (c->flags & REDIS_BLOCKED) return;
1956 if (c->bulklen == -1) {
1957 /* Read the first line of the query */
1958 char *p = strchr(c->querybuf,'\n');
1959 size_t querylen;
1960
1961 if (p) {
1962 sds query, *argv;
1963 int argc, j;
1964
1965 query = c->querybuf;
1966 c->querybuf = sdsempty();
1967 querylen = 1+(p-(query));
1968 if (sdslen(query) > querylen) {
1969 /* leave data after the first line of the query in the buffer */
1970 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1971 }
1972 *p = '\0'; /* remove "\n" */
1973 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1974 sdsupdatelen(query);
1975
1976 /* Now we can split the query in arguments */
1977 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1978 sdsfree(query);
1979
1980 if (c->argv) zfree(c->argv);
1981 c->argv = zmalloc(sizeof(robj*)*argc);
1982
1983 for (j = 0; j < argc; j++) {
1984 if (sdslen(argv[j])) {
1985 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1986 c->argc++;
1987 } else {
1988 sdsfree(argv[j]);
1989 }
1990 }
1991 zfree(argv);
1992 if (c->argc) {
1993 /* Execute the command. If the client is still valid
1994 * after processCommand() return and there is something
1995 * on the query buffer try to process the next command. */
1996 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1997 } else {
1998 /* Nothing to process, argc == 0. Just process the query
1999 * buffer if it's not empty or return to the caller */
2000 if (sdslen(c->querybuf)) goto again;
2001 }
2002 return;
2003 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
2004 redisLog(REDIS_DEBUG, "Client protocol error");
2005 freeClient(c);
2006 return;
2007 }
2008 } else {
2009 /* Bulk read handling. Note that if we are at this point
2010 the client already sent a command terminated with a newline,
2011 we are reading the bulk data that is actually the last
2012 argument of the command. */
2013 int qbl = sdslen(c->querybuf);
2014
2015 if (c->bulklen <= qbl) {
2016 /* Copy everything but the final CRLF as final argument */
2017 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
2018 c->argc++;
2019 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
2020 /* Process the command. If the client is still valid after
2021 * the processing and there is more data in the buffer
2022 * try to parse it. */
2023 if (processCommand(c) && sdslen(c->querybuf)) goto again;
2024 return;
2025 }
2026 }
2027 }
2028
2029 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
2030 redisClient *c = (redisClient*) privdata;
2031 char buf[REDIS_IOBUF_LEN];
2032 int nread;
2033 REDIS_NOTUSED(el);
2034 REDIS_NOTUSED(mask);
2035
2036 nread = read(fd, buf, REDIS_IOBUF_LEN);
2037 if (nread == -1) {
2038 if (errno == EAGAIN) {
2039 nread = 0;
2040 } else {
2041 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
2042 freeClient(c);
2043 return;
2044 }
2045 } else if (nread == 0) {
2046 redisLog(REDIS_DEBUG, "Client closed connection");
2047 freeClient(c);
2048 return;
2049 }
2050 if (nread) {
2051 c->querybuf = sdscatlen(c->querybuf, buf, nread);
2052 c->lastinteraction = time(NULL);
2053 } else {
2054 return;
2055 }
2056 processInputBuffer(c);
2057 }
2058
2059 static int selectDb(redisClient *c, int id) {
2060 if (id < 0 || id >= server.dbnum)
2061 return REDIS_ERR;
2062 c->db = &server.db[id];
2063 return REDIS_OK;
2064 }
2065
2066 static void *dupClientReplyValue(void *o) {
2067 incrRefCount((robj*)o);
2068 return 0;
2069 }
2070
2071 static redisClient *createClient(int fd) {
2072 redisClient *c = zmalloc(sizeof(*c));
2073
2074 anetNonBlock(NULL,fd);
2075 anetTcpNoDelay(NULL,fd);
2076 if (!c) return NULL;
2077 selectDb(c,0);
2078 c->fd = fd;
2079 c->querybuf = sdsempty();
2080 c->argc = 0;
2081 c->argv = NULL;
2082 c->bulklen = -1;
2083 c->multibulk = 0;
2084 c->mbargc = 0;
2085 c->mbargv = NULL;
2086 c->sentlen = 0;
2087 c->flags = 0;
2088 c->lastinteraction = time(NULL);
2089 c->authenticated = 0;
2090 c->replstate = REDIS_REPL_NONE;
2091 c->reply = listCreate();
2092 c->blockingkey = NULL;
2093 listSetFreeMethod(c->reply,decrRefCount);
2094 listSetDupMethod(c->reply,dupClientReplyValue);
2095 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
2096 readQueryFromClient, c) == AE_ERR) {
2097 freeClient(c);
2098 return NULL;
2099 }
2100 listAddNodeTail(server.clients,c);
2101 initClientMultiState(c);
2102 return c;
2103 }
2104
2105 static void addReply(redisClient *c, robj *obj) {
2106 if (listLength(c->reply) == 0 &&
2107 (c->replstate == REDIS_REPL_NONE ||
2108 c->replstate == REDIS_REPL_ONLINE) &&
2109 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
2110 sendReplyToClient, c) == AE_ERR) return;
2111 listAddNodeTail(c->reply,getDecodedObject(obj));
2112 }
2113
2114 static void addReplySds(redisClient *c, sds s) {
2115 robj *o = createObject(REDIS_STRING,s);
2116 addReply(c,o);
2117 decrRefCount(o);
2118 }
2119
2120 static void addReplyDouble(redisClient *c, double d) {
2121 char buf[128];
2122
2123 snprintf(buf,sizeof(buf),"%.17g",d);
2124 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2125 (unsigned long) strlen(buf),buf));
2126 }
2127
2128 static void addReplyBulkLen(redisClient *c, robj *obj) {
2129 size_t len;
2130
2131 if (obj->encoding == REDIS_ENCODING_RAW) {
2132 len = sdslen(obj->ptr);
2133 } else {
2134 long n = (long)obj->ptr;
2135
2136 /* Compute how many bytes will take this integer as a radix 10 string */
2137 len = 1;
2138 if (n < 0) {
2139 len++;
2140 n = -n;
2141 }
2142 while((n = n/10) != 0) {
2143 len++;
2144 }
2145 }
2146 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len));
2147 }
2148
2149 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2150 int cport, cfd;
2151 char cip[128];
2152 redisClient *c;
2153 REDIS_NOTUSED(el);
2154 REDIS_NOTUSED(mask);
2155 REDIS_NOTUSED(privdata);
2156
2157 cfd = anetAccept(server.neterr, fd, cip, &cport);
2158 if (cfd == AE_ERR) {
2159 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2160 return;
2161 }
2162 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
2163 if ((c = createClient(cfd)) == NULL) {
2164 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2165 close(cfd); /* May be already closed, just ingore errors */
2166 return;
2167 }
2168 /* If maxclient directive is set and this is one client more... close the
2169 * connection. Note that we create the client instead to check before
2170 * for this condition, since now the socket is already set in nonblocking
2171 * mode and we can send an error for free using the Kernel I/O */
2172 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2173 char *err = "-ERR max number of clients reached\r\n";
2174
2175 /* That's a best effort error message, don't check write errors */
2176 if (write(c->fd,err,strlen(err)) == -1) {
2177 /* Nothing to do, Just to avoid the warning... */
2178 }
2179 freeClient(c);
2180 return;
2181 }
2182 server.stat_numconnections++;
2183 }
2184
2185 /* ======================= Redis objects implementation ===================== */
2186
2187 static robj *createObject(int type, void *ptr) {
2188 robj *o;
2189
2190 if (listLength(server.objfreelist)) {
2191 listNode *head = listFirst(server.objfreelist);
2192 o = listNodeValue(head);
2193 listDelNode(server.objfreelist,head);
2194 } else {
2195 o = zmalloc(sizeof(*o));
2196 }
2197 o->type = type;
2198 o->encoding = REDIS_ENCODING_RAW;
2199 o->ptr = ptr;
2200 o->refcount = 1;
2201 return o;
2202 }
2203
2204 static robj *createStringObject(char *ptr, size_t len) {
2205 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2206 }
2207
2208 static robj *createListObject(void) {
2209 list *l = listCreate();
2210
2211 listSetFreeMethod(l,decrRefCount);
2212 return createObject(REDIS_LIST,l);
2213 }
2214
2215 static robj *createSetObject(void) {
2216 dict *d = dictCreate(&setDictType,NULL);
2217 return createObject(REDIS_SET,d);
2218 }
2219
2220 static robj *createZsetObject(void) {
2221 zset *zs = zmalloc(sizeof(*zs));
2222
2223 zs->dict = dictCreate(&zsetDictType,NULL);
2224 zs->zsl = zslCreate();
2225 return createObject(REDIS_ZSET,zs);
2226 }
2227
2228 static void freeStringObject(robj *o) {
2229 if (o->encoding == REDIS_ENCODING_RAW) {
2230 sdsfree(o->ptr);
2231 }
2232 }
2233
2234 static void freeListObject(robj *o) {
2235 listRelease((list*) o->ptr);
2236 }
2237
2238 static void freeSetObject(robj *o) {
2239 dictRelease((dict*) o->ptr);
2240 }
2241
2242 static void freeZsetObject(robj *o) {
2243 zset *zs = o->ptr;
2244
2245 dictRelease(zs->dict);
2246 zslFree(zs->zsl);
2247 zfree(zs);
2248 }
2249
2250 static void freeHashObject(robj *o) {
2251 dictRelease((dict*) o->ptr);
2252 }
2253
2254 static void incrRefCount(robj *o) {
2255 o->refcount++;
2256 #ifdef DEBUG_REFCOUNT
2257 if (o->type == REDIS_STRING)
2258 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2259 #endif
2260 }
2261
2262 static void decrRefCount(void *obj) {
2263 robj *o = obj;
2264
2265 #ifdef DEBUG_REFCOUNT
2266 if (o->type == REDIS_STRING)
2267 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2268 #endif
2269 if (--(o->refcount) == 0) {
2270 switch(o->type) {
2271 case REDIS_STRING: freeStringObject(o); break;
2272 case REDIS_LIST: freeListObject(o); break;
2273 case REDIS_SET: freeSetObject(o); break;
2274 case REDIS_ZSET: freeZsetObject(o); break;
2275 case REDIS_HASH: freeHashObject(o); break;
2276 default: redisAssert(0 != 0); break;
2277 }
2278 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2279 !listAddNodeHead(server.objfreelist,o))
2280 zfree(o);
2281 }
2282 }
2283
2284 static robj *lookupKey(redisDb *db, robj *key) {
2285 dictEntry *de = dictFind(db->dict,key);
2286 return de ? dictGetEntryVal(de) : NULL;
2287 }
2288
2289 static robj *lookupKeyRead(redisDb *db, robj *key) {
2290 expireIfNeeded(db,key);
2291 return lookupKey(db,key);
2292 }
2293
2294 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2295 deleteIfVolatile(db,key);
2296 return lookupKey(db,key);
2297 }
2298
2299 static int deleteKey(redisDb *db, robj *key) {
2300 int retval;
2301
2302 /* We need to protect key from destruction: after the first dictDelete()
2303 * it may happen that 'key' is no longer valid if we don't increment
2304 * it's count. This may happen when we get the object reference directly
2305 * from the hash table with dictRandomKey() or dict iterators */
2306 incrRefCount(key);
2307 if (dictSize(db->expires)) dictDelete(db->expires,key);
2308 retval = dictDelete(db->dict,key);
2309 decrRefCount(key);
2310
2311 return retval == DICT_OK;
2312 }
2313
2314 /* Try to share an object against the shared objects pool */
2315 static robj *tryObjectSharing(robj *o) {
2316 struct dictEntry *de;
2317 unsigned long c;
2318
2319 if (o == NULL || server.shareobjects == 0) return o;
2320
2321 redisAssert(o->type == REDIS_STRING);
2322 de = dictFind(server.sharingpool,o);
2323 if (de) {
2324 robj *shared = dictGetEntryKey(de);
2325
2326 c = ((unsigned long) dictGetEntryVal(de))+1;
2327 dictGetEntryVal(de) = (void*) c;
2328 incrRefCount(shared);
2329 decrRefCount(o);
2330 return shared;
2331 } else {
2332 /* Here we are using a stream algorihtm: Every time an object is
2333 * shared we increment its count, everytime there is a miss we
2334 * recrement the counter of a random object. If this object reaches
2335 * zero we remove the object and put the current object instead. */
2336 if (dictSize(server.sharingpool) >=
2337 server.sharingpoolsize) {
2338 de = dictGetRandomKey(server.sharingpool);
2339 redisAssert(de != NULL);
2340 c = ((unsigned long) dictGetEntryVal(de))-1;
2341 dictGetEntryVal(de) = (void*) c;
2342 if (c == 0) {
2343 dictDelete(server.sharingpool,de->key);
2344 }
2345 } else {
2346 c = 0; /* If the pool is empty we want to add this object */
2347 }
2348 if (c == 0) {
2349 int retval;
2350
2351 retval = dictAdd(server.sharingpool,o,(void*)1);
2352 redisAssert(retval == DICT_OK);
2353 incrRefCount(o);
2354 }
2355 return o;
2356 }
2357 }
2358
2359 /* Check if the nul-terminated string 's' can be represented by a long
2360 * (that is, is a number that fits into long without any other space or
2361 * character before or after the digits).
2362 *
2363 * If so, the function returns REDIS_OK and *longval is set to the value
2364 * of the number. Otherwise REDIS_ERR is returned */
2365 static int isStringRepresentableAsLong(sds s, long *longval) {
2366 char buf[32], *endptr;
2367 long value;
2368 int slen;
2369
2370 value = strtol(s, &endptr, 10);
2371 if (endptr[0] != '\0') return REDIS_ERR;
2372 slen = snprintf(buf,32,"%ld",value);
2373
2374 /* If the number converted back into a string is not identical
2375 * then it's not possible to encode the string as integer */
2376 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2377 if (longval) *longval = value;
2378 return REDIS_OK;
2379 }
2380
2381 /* Try to encode a string object in order to save space */
2382 static int tryObjectEncoding(robj *o) {
2383 long value;
2384 sds s = o->ptr;
2385
2386 if (o->encoding != REDIS_ENCODING_RAW)
2387 return REDIS_ERR; /* Already encoded */
2388
2389 /* It's not save to encode shared objects: shared objects can be shared
2390 * everywhere in the "object space" of Redis. Encoded objects can only
2391 * appear as "values" (and not, for instance, as keys) */
2392 if (o->refcount > 1) return REDIS_ERR;
2393
2394 /* Currently we try to encode only strings */
2395 redisAssert(o->type == REDIS_STRING);
2396
2397 /* Check if we can represent this string as a long integer */
2398 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2399
2400 /* Ok, this object can be encoded */
2401 o->encoding = REDIS_ENCODING_INT;
2402 sdsfree(o->ptr);
2403 o->ptr = (void*) value;
2404 return REDIS_OK;
2405 }
2406
2407 /* Get a decoded version of an encoded object (returned as a new object).
2408 * If the object is already raw-encoded just increment the ref count. */
2409 static robj *getDecodedObject(robj *o) {
2410 robj *dec;
2411
2412 if (o->encoding == REDIS_ENCODING_RAW) {
2413 incrRefCount(o);
2414 return o;
2415 }
2416 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2417 char buf[32];
2418
2419 snprintf(buf,32,"%ld",(long)o->ptr);
2420 dec = createStringObject(buf,strlen(buf));
2421 return dec;
2422 } else {
2423 redisAssert(1 != 1);
2424 }
2425 }
2426
2427 /* Compare two string objects via strcmp() or alike.
2428 * Note that the objects may be integer-encoded. In such a case we
2429 * use snprintf() to get a string representation of the numbers on the stack
2430 * and compare the strings, it's much faster than calling getDecodedObject().
2431 *
2432 * Important note: if objects are not integer encoded, but binary-safe strings,
2433 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2434 * binary safe. */
2435 static int compareStringObjects(robj *a, robj *b) {
2436 redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2437 char bufa[128], bufb[128], *astr, *bstr;
2438 int bothsds = 1;
2439
2440 if (a == b) return 0;
2441 if (a->encoding != REDIS_ENCODING_RAW) {
2442 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2443 astr = bufa;
2444 bothsds = 0;
2445 } else {
2446 astr = a->ptr;
2447 }
2448 if (b->encoding != REDIS_ENCODING_RAW) {
2449 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2450 bstr = bufb;
2451 bothsds = 0;
2452 } else {
2453 bstr = b->ptr;
2454 }
2455 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2456 }
2457
2458 static size_t stringObjectLen(robj *o) {
2459 redisAssert(o->type == REDIS_STRING);
2460 if (o->encoding == REDIS_ENCODING_RAW) {
2461 return sdslen(o->ptr);
2462 } else {
2463 char buf[32];
2464
2465 return snprintf(buf,32,"%ld",(long)o->ptr);
2466 }
2467 }
2468
2469 /*============================ DB saving/loading ============================ */
2470
/* Write a single type/opcode byte. Returns 0 on success, -1 on error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
2475
/* Write 't' truncated to a 32 bit signed integer, in host byte order.
 * Returns 0 on success, -1 on error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    if (fwrite(&t32,sizeof(t32),1,fp) != 1) return -1;
    return 0;
}
2481
2482 /* check rdbLoadLen() comments for more info */
2483 static int rdbSaveLen(FILE *fp, uint32_t len) {
2484 unsigned char buf[2];
2485
2486 if (len < (1<<6)) {
2487 /* Save a 6 bit len */
2488 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2489 if (fwrite(buf,1,1,fp) == 0) return -1;
2490 } else if (len < (1<<14)) {
2491 /* Save a 14 bit len */
2492 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2493 buf[1] = len&0xFF;
2494 if (fwrite(buf,2,1,fp) == 0) return -1;
2495 } else {
2496 /* Save a 32 bit len */
2497 buf[0] = (REDIS_RDB_32BITLEN<<6);
2498 if (fwrite(buf,1,1,fp) == 0) return -1;
2499 len = htonl(len);
2500 if (fwrite(&len,4,1,fp) == 0) return -1;
2501 }
2502 return 0;
2503 }
2504
2505 /* String objects in the form "2391" "-100" without any space and with a
2506 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2507 * encoded as integers to save space */
2508 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2509 long long value;
2510 char *endptr, buf[32];
2511
2512 /* Check if it's possible to encode this value as a number */
2513 value = strtoll(s, &endptr, 10);
2514 if (endptr[0] != '\0') return 0;
2515 snprintf(buf,32,"%lld",value);
2516
2517 /* If the number converted back into a string is not identical
2518 * then it's not possible to encode the string as integer */
2519 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2520
2521 /* Finally check if it fits in our ranges */
2522 if (value >= -(1<<7) && value <= (1<<7)-1) {
2523 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2524 enc[1] = value&0xFF;
2525 return 2;
2526 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2527 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2528 enc[1] = value&0xFF;
2529 enc[2] = (value>>8)&0xFF;
2530 return 3;
2531 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2532 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2533 enc[1] = value&0xFF;
2534 enc[2] = (value>>8)&0xFF;
2535 enc[3] = (value>>16)&0xFF;
2536 enc[4] = (value>>24)&0xFF;
2537 return 5;
2538 } else {
2539 return 0;
2540 }
2541 }
2542
2543 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2544 unsigned int comprlen, outlen;
2545 unsigned char byte;
2546 void *out;
2547
2548 /* We require at least four bytes compression for this to be worth it */
2549 outlen = sdslen(obj->ptr)-4;
2550 if (outlen <= 0) return 0;
2551 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2552 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2553 if (comprlen == 0) {
2554 zfree(out);
2555 return 0;
2556 }
2557 /* Data compressed! Let's save it on disk */
2558 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2559 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2560 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2561 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2562 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2563 zfree(out);
2564 return comprlen;
2565
2566 writeerr:
2567 zfree(out);
2568 return -1;
2569 }
2570
2571 /* Save a string objet as [len][data] on disk. If the object is a string
2572 * representation of an integer value we try to safe it in a special form */
2573 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2574 size_t len;
2575 int enclen;
2576
2577 len = sdslen(obj->ptr);
2578
2579 /* Try integer encoding */
2580 if (len <= 11) {
2581 unsigned char buf[5];
2582 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2583 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2584 return 0;
2585 }
2586 }
2587
2588 /* Try LZF compression - under 20 bytes it's unable to compress even
2589 * aaaaaaaaaaaaaaaaaa so skip it */
2590 if (server.rdbcompression && len > 20) {
2591 int retval;
2592
2593 retval = rdbSaveLzfStringObject(fp,obj);
2594 if (retval == -1) return -1;
2595 if (retval > 0) return 0;
2596 /* retval == 0 means data can't be compressed, save the old way */
2597 }
2598
2599 /* Store verbatim */
2600 if (rdbSaveLen(fp,len) == -1) return -1;
2601 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2602 return 0;
2603 }
2604
2605 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2606 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2607 int retval;
2608
2609 obj = getDecodedObject(obj);
2610 retval = rdbSaveStringObjectRaw(fp,obj);
2611 decrRefCount(obj);
2612 return retval;
2613 }
2614
/* Save a double as an unsigned 8 bit length followed by its "%.17g" text.
 * Three length values are reserved for special cases and are written with
 * no text after them:
 *   253: not a number
 *   254: +inf
 *   255: -inf
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len = 1;

    if (isnan(val)) {
        buf[0] = 253;
    } else if (!isfinite(val)) {
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
        buf[0] = strlen((char*)buf+1);
        len += buf[0];
    }
    return (fwrite(buf,len,1,fp) == 0) ? -1 : 0;
}
2641
2642 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2643 static int rdbSave(char *filename) {
2644 dictIterator *di = NULL;
2645 dictEntry *de;
2646 FILE *fp;
2647 char tmpfile[256];
2648 int j;
2649 time_t now = time(NULL);
2650
2651 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2652 fp = fopen(tmpfile,"w");
2653 if (!fp) {
2654 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2655 return REDIS_ERR;
2656 }
2657 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2658 for (j = 0; j < server.dbnum; j++) {
2659 redisDb *db = server.db+j;
2660 dict *d = db->dict;
2661 if (dictSize(d) == 0) continue;
2662 di = dictGetIterator(d);
2663 if (!di) {
2664 fclose(fp);
2665 return REDIS_ERR;
2666 }
2667
2668 /* Write the SELECT DB opcode */
2669 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2670 if (rdbSaveLen(fp,j) == -1) goto werr;
2671
2672 /* Iterate this DB writing every entry */
2673 while((de = dictNext(di)) != NULL) {
2674 robj *key = dictGetEntryKey(de);
2675 robj *o = dictGetEntryVal(de);
2676 time_t expiretime = getExpire(db,key);
2677
2678 /* Save the expire time */
2679 if (expiretime != -1) {
2680 /* If this key is already expired skip it */
2681 if (expiretime < now) continue;
2682 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2683 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2684 }
2685 /* Save the key and associated value */
2686 if (rdbSaveType(fp,o->type) == -1) goto werr;
2687 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2688 if (o->type == REDIS_STRING) {
2689 /* Save a string value */
2690 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2691 } else if (o->type == REDIS_LIST) {
2692 /* Save a list value */
2693 list *list = o->ptr;
2694 listNode *ln;
2695
2696 listRewind(list);
2697 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2698 while((ln = listYield(list))) {
2699 robj *eleobj = listNodeValue(ln);
2700
2701 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2702 }
2703 } else if (o->type == REDIS_SET) {
2704 /* Save a set value */
2705 dict *set = o->ptr;
2706 dictIterator *di = dictGetIterator(set);
2707 dictEntry *de;
2708
2709 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2710 while((de = dictNext(di)) != NULL) {
2711 robj *eleobj = dictGetEntryKey(de);
2712
2713 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2714 }
2715 dictReleaseIterator(di);
2716 } else if (o->type == REDIS_ZSET) {
2717 /* Save a set value */
2718 zset *zs = o->ptr;
2719 dictIterator *di = dictGetIterator(zs->dict);
2720 dictEntry *de;
2721
2722 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2723 while((de = dictNext(di)) != NULL) {
2724 robj *eleobj = dictGetEntryKey(de);
2725 double *score = dictGetEntryVal(de);
2726
2727 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2728 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2729 }
2730 dictReleaseIterator(di);
2731 } else {
2732 redisAssert(0 != 0);
2733 }
2734 }
2735 dictReleaseIterator(di);
2736 }
2737 /* EOF opcode */
2738 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2739
2740 /* Make sure data will not remain on the OS's output buffers */
2741 fflush(fp);
2742 fsync(fileno(fp));
2743 fclose(fp);
2744
2745 /* Use RENAME to make sure the DB file is changed atomically only
2746 * if the generate DB file is ok. */
2747 if (rename(tmpfile,filename) == -1) {
2748 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2749 unlink(tmpfile);
2750 return REDIS_ERR;
2751 }
2752 redisLog(REDIS_NOTICE,"DB saved on disk");
2753 server.dirty = 0;
2754 server.lastsave = time(NULL);
2755 return REDIS_OK;
2756
2757 werr:
2758 fclose(fp);
2759 unlink(tmpfile);
2760 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2761 if (di) dictReleaseIterator(di);
2762 return REDIS_ERR;
2763 }
2764
2765 static int rdbSaveBackground(char *filename) {
2766 pid_t childpid;
2767
2768 if (server.bgsavechildpid != -1) return REDIS_ERR;
2769 if ((childpid = fork()) == 0) {
2770 /* Child */
2771 close(server.fd);
2772 if (rdbSave(filename) == REDIS_OK) {
2773 exit(0);
2774 } else {
2775 exit(1);
2776 }
2777 } else {
2778 /* Parent */
2779 if (childpid == -1) {
2780 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2781 strerror(errno));
2782 return REDIS_ERR;
2783 }
2784 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2785 server.bgsavechildpid = childpid;
2786 return REDIS_OK;
2787 }
2788 return REDIS_OK; /* unreached */
2789 }
2790
/* Delete the "temp-<pid>.rdb" file that rdbSave() creates when it runs in
 * the process 'childpid'. unlink() errors are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int) childpid);
    unlink(path);
}
2797
/* Read one type/opcode byte. Returns its value (0-255) or -1 on EOF/error. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    if (fread(&byte,1,1,fp) != 1) return -1;
    return byte;
}
2803
/* Read a 32 bit signed time value in host byte order, as written by
 * rdbSaveTime(). Returns -1 on EOF/error. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stored;

    if (fread(&stored,sizeof(stored),1,fp) != 1) return -1;
    return (time_t) stored;
}
2809
2810 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2811 * of this file for a description of how this are stored on disk.
2812 *
2813 * isencoded is set to 1 if the readed length is not actually a length but
2814 * an "encoding type", check the above comments for more info */
2815 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2816 unsigned char buf[2];
2817 uint32_t len;
2818
2819 if (isencoded) *isencoded = 0;
2820 if (rdbver == 0) {
2821 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2822 return ntohl(len);
2823 } else {
2824 int type;
2825
2826 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2827 type = (buf[0]&0xC0)>>6;
2828 if (type == REDIS_RDB_6BITLEN) {
2829 /* Read a 6 bit len */
2830 return buf[0]&0x3F;
2831 } else if (type == REDIS_RDB_ENCVAL) {
2832 /* Read a 6 bit len encoding type */
2833 if (isencoded) *isencoded = 1;
2834 return buf[0]&0x3F;
2835 } else if (type == REDIS_RDB_14BITLEN) {
2836 /* Read a 14 bit len */
2837 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2838 return ((buf[0]&0x3F)<<8)|buf[1];
2839 } else {
2840 /* Read a 32 bit len */
2841 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2842 return ntohl(len);
2843 }
2844 }
2845 }
2846
2847 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2848 unsigned char enc[4];
2849 long long val;
2850
2851 if (enctype == REDIS_RDB_ENC_INT8) {
2852 if (fread(enc,1,1,fp) == 0) return NULL;
2853 val = (signed char)enc[0];
2854 } else if (enctype == REDIS_RDB_ENC_INT16) {
2855 uint16_t v;
2856 if (fread(enc,2,1,fp) == 0) return NULL;
2857 v = enc[0]|(enc[1]<<8);
2858 val = (int16_t)v;
2859 } else if (enctype == REDIS_RDB_ENC_INT32) {
2860 uint32_t v;
2861 if (fread(enc,4,1,fp) == 0) return NULL;
2862 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2863 val = (int32_t)v;
2864 } else {
2865 val = 0; /* anti-warning */
2866 redisAssert(0!=0);
2867 }
2868 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2869 }
2870
2871 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2872 unsigned int len, clen;
2873 unsigned char *c = NULL;
2874 sds val = NULL;
2875
2876 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2877 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2878 if ((c = zmalloc(clen)) == NULL) goto err;
2879 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2880 if (fread(c,clen,1,fp) == 0) goto err;
2881 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2882 zfree(c);
2883 return createObject(REDIS_STRING,val);
2884 err:
2885 zfree(c);
2886 sdsfree(val);
2887 return NULL;
2888 }
2889
2890 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2891 int isencoded;
2892 uint32_t len;
2893 sds val;
2894
2895 len = rdbLoadLen(fp,rdbver,&isencoded);
2896 if (isencoded) {
2897 switch(len) {
2898 case REDIS_RDB_ENC_INT8:
2899 case REDIS_RDB_ENC_INT16:
2900 case REDIS_RDB_ENC_INT32:
2901 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2902 case REDIS_RDB_ENC_LZF:
2903 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2904 default:
2905 redisAssert(0!=0);
2906 }
2907 }
2908
2909 if (len == REDIS_RDB_LENERR) return NULL;
2910 val = sdsnewlen(NULL,len);
2911 if (len && fread(val,len,1,fp) == 0) {
2912 sdsfree(val);
2913 return NULL;
2914 }
2915 return tryObjectSharing(createObject(REDIS_STRING,val));
2916 }
2917
2918 /* For information about double serialization check rdbSaveDoubleValue() */
2919 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2920 char buf[128];
2921 unsigned char len;
2922
2923 if (fread(&len,1,1,fp) == 0) return -1;
2924 switch(len) {
2925 case 255: *val = R_NegInf; return 0;
2926 case 254: *val = R_PosInf; return 0;
2927 case 253: *val = R_Nan; return 0;
2928 default:
2929 if (fread(buf,len,1,fp) == 0) return -1;
2930 buf[len] = '\0';
2931 sscanf(buf, "%lg", val);
2932 return 0;
2933 }
2934 }
2935
/* Load the dataset stored in the RDB file 'filename' into server.db.
 *
 * Returns REDIS_ERR when the file cannot be opened, when the "REDIS"
 * signature is missing, or when the format version is greater than 1.
 * Returns REDIS_OK once the REDIS_EOF opcode has been read.
 *
 * Every other problem terminates the process with exit(1): a short read or
 * failed element load (handled at the eoferr label, which also covers a
 * truncated header), a SELECTDB index >= server.dbnum, or a key that
 * appears twice in the same DB. */
static int rdbLoad(char *filename) {
    FILE *fp;
    robj *keyobj = NULL;
    uint32_t dbid;
    int type, retval, rdbver;
    dict *d = server.db[0].dict;   /* target dict, switched by SELECTDB */
    redisDb *db = server.db+0;
    char buf[1024];
    /* expiretime == -1 means "no expire pending for the next key". 'now' is
     * sampled once, before loading starts. */
    time_t expiretime = -1, now = time(NULL);

    fp = fopen(filename,"r");
    if (!fp) return REDIS_ERR;
    /* 9 byte header: "REDIS" followed by a 4 character version number */
    if (fread(buf,9,1,fp) == 0) goto eoferr;
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
        return REDIS_ERR;
    }
    rdbver = atoi(buf+5);
    if (rdbver > 1) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
        return REDIS_ERR;
    }
    while(1) {
        robj *o;

        /* Read type. */
        if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        if (type == REDIS_EXPIRETIME) {
            if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again */
            if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        }
        if (type == REDIS_EOF) break;
        /* Handle SELECT DB opcode as a special case */
        if (type == REDIS_SELECTDB) {
            if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            db = server.db+dbid;
            d = db->dict;
            continue;
        }
        /* Read key */
        if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;

        if (type == REDIS_STRING) {
            /* Read string value */
            if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
            tryObjectEncoding(o);
        } else if (type == REDIS_LIST || type == REDIS_SET) {
            /* Read list/set value: an element count followed by that many
             * string elements. */
            uint32_t listlen;

            if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            o = (type == REDIS_LIST) ? createListObject() : createSetObject();
            /* Load every single element of the list/set */
            while(listlen--) {
                robj *ele;

                if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
                tryObjectEncoding(ele);
                if (type == REDIS_LIST) {
                    listAddNodeTail((list*)o->ptr,ele);
                } else {
                    /* NOTE(review): the dictAdd() result is ignored, so a
                     * duplicated member in a corrupted file is silently
                     * dropped without releasing 'ele'. */
                    dictAdd((dict*)o->ptr,ele,NULL);
                }
            }
        } else if (type == REDIS_ZSET) {
            /* Read sorted set value: an element count followed by
             * (member string, double score) pairs. */
            uint32_t zsetlen;
            zset *zs;

            if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            o = createZsetObject();
            zs = o->ptr;
            /* Load every single element of the sorted set. Each member is
             * stored both in zs->dict (member -> heap-allocated score) and
             * in the skiplist zs->zsl. */
            while(zsetlen--) {
                robj *ele;
                double *score = zmalloc(sizeof(double));

                if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
                tryObjectEncoding(ele);
                if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
                /* NOTE(review): dictAdd() result is ignored while
                 * zslInsert() runs unconditionally; a duplicated member in
                 * a corrupted file would leave dict and skiplist out of
                 * sync. */
                dictAdd(zs->dict,ele,score);
                zslInsert(zs->zsl,*score,ele);
                incrRefCount(ele); /* added to skiplist */
            }
        } else {
            redisAssert(0 != 0);
        }
        /* Add the new object in the hash table */
        retval = dictAdd(d,keyobj,o);
        if (retval == DICT_ERR) {
            redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
            exit(1);
        }
        /* Set the expire time if needed */
        if (expiretime != -1) {
            setExpire(db,keyobj,expiretime);
            /* Delete this key if already expired */
            if (expiretime < now) deleteKey(db,keyobj);
            expiretime = -1;
        }
        /* Ownership moved to the DB; reset so eoferr does not release it. */
        keyobj = o = NULL;
    }
    fclose(fp);
    return REDIS_OK;

eoferr: /* unexpected end of file is handled here with a fatal exit */
    if (keyobj) decrRefCount(keyobj);
    redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
    exit(1);
    return REDIS_ERR; /* Just to avoid warning */
}
3058
3059 /*================================== Commands =============================== */
3060
3061 static void authCommand(redisClient *c) {
3062 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
3063 c->authenticated = 1;
3064 addReply(c,shared.ok);
3065 } else {
3066 c->authenticated = 0;
3067 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
3068 }
3069 }
3070
3071 static void pingCommand(redisClient *c) {
3072 addReply(c,shared.pong);
3073 }
3074
3075 static void echoCommand(redisClient *c) {
3076 addReplyBulkLen(c,c->argv[1]);
3077 addReply(c,c->argv[1]);
3078 addReply(c,shared.crlf);
3079 }
3080
3081 /*=================================== Strings =============================== */
3082
3083 static void setGenericCommand(redisClient *c, int nx) {
3084 int retval;
3085
3086 if (nx) deleteIfVolatile(c->db,c->argv[1]);
3087 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
3088 if (retval == DICT_ERR) {
3089 if (!nx) {
3090 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3091 incrRefCount(c->argv[2]);
3092 } else {
3093 addReply(c,shared.czero);
3094 return;
3095 }
3096 } else {
3097 incrRefCount(c->argv[1]);
3098 incrRefCount(c->argv[2]);
3099 }
3100 server.dirty++;
3101 removeExpire(c->db,c->argv[1]);
3102 addReply(c, nx ? shared.cone : shared.ok);
3103 }
3104
3105 static void setCommand(redisClient *c) {
3106 setGenericCommand(c,0);
3107 }
3108
3109 static void setnxCommand(redisClient *c) {
3110 setGenericCommand(c,1);
3111 }
3112
3113 static int getGenericCommand(redisClient *c) {
3114 robj *o = lookupKeyRead(c->db,c->argv[1]);
3115
3116 if (o == NULL) {
3117 addReply(c,shared.nullbulk);
3118 return REDIS_OK;
3119 } else {
3120 if (o->type != REDIS_STRING) {
3121 addReply(c,shared.wrongtypeerr);
3122 return REDIS_ERR;
3123 } else {
3124 addReplyBulkLen(c,o);
3125 addReply(c,o);
3126 addReply(c,shared.crlf);
3127 return REDIS_OK;
3128 }
3129 }
3130 }
3131
3132 static void getCommand(redisClient *c) {
3133 getGenericCommand(c);
3134 }
3135
3136 static void getsetCommand(redisClient *c) {
3137 if (getGenericCommand(c) == REDIS_ERR) return;
3138 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3139 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3140 } else {
3141 incrRefCount(c->argv[1]);
3142 }
3143 incrRefCount(c->argv[2]);
3144 server.dirty++;
3145 removeExpire(c->db,c->argv[1]);
3146 }
3147
3148 static void mgetCommand(redisClient *c) {
3149 int j;
3150
3151 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
3152 for (j = 1; j < c->argc; j++) {
3153 robj *o = lookupKeyRead(c->db,c->argv[j]);
3154 if (o == NULL) {
3155 addReply(c,shared.nullbulk);
3156 } else {
3157 if (o->type != REDIS_STRING) {
3158 addReply(c,shared.nullbulk);
3159 } else {
3160 addReplyBulkLen(c,o);
3161 addReply(c,o);
3162 addReply(c,shared.crlf);
3163 }
3164 }
3165 }
3166 }
3167
3168 static void msetGenericCommand(redisClient *c, int nx) {
3169 int j, busykeys = 0;
3170
3171 if ((c->argc % 2) == 0) {
3172 addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3173 return;
3174 }
3175 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3176 * set nothing at all if at least one already key exists. */
3177 if (nx) {
3178 for (j = 1; j < c->argc; j += 2) {
3179 if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
3180 busykeys++;
3181 }
3182 }
3183 }
3184 if (busykeys) {
3185 addReply(c, shared.czero);
3186 return;
3187 }
3188
3189 for (j = 1; j < c->argc; j += 2) {
3190 int retval;
3191
3192 tryObjectEncoding(c->argv[j+1]);
3193 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3194 if (retval == DICT_ERR) {
3195 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3196 incrRefCount(c->argv[j+1]);
3197 } else {
3198 incrRefCount(c->argv[j]);
3199 incrRefCount(c->argv[j+1]);
3200 }
3201 removeExpire(c->db,c->argv[j]);
3202 }
3203 server.dirty += (c->argc-1)/2;
3204 addReply(c, nx ? shared.cone : shared.ok);
3205 }
3206
3207 static void msetCommand(redisClient *c) {
3208 msetGenericCommand(c,0);
3209 }
3210
3211 static void msetnxCommand(redisClient *c) {
3212 msetGenericCommand(c,1);
3213 }
3214
3215 static void incrDecrCommand(redisClient *c, long long incr) {
3216 long long value;
3217 int retval;
3218 robj *o;
3219
3220 o = lookupKeyWrite(c->db,c->argv[1]);
3221 if (o == NULL) {
3222 value = 0;
3223 } else {
3224 if (o->type != REDIS_STRING) {
3225 value = 0;
3226 } else {
3227 char *eptr;
3228
3229 if (o->encoding == REDIS_ENCODING_RAW)
3230 value = strtoll(o->ptr, &eptr, 10);
3231 else if (o->encoding == REDIS_ENCODING_INT)
3232 value = (long)o->ptr;
3233 else
3234 redisAssert(1 != 1);
3235 }
3236 }
3237
3238 value += incr;
3239 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
3240 tryObjectEncoding(o);
3241 retval = dictAdd(c->db->dict,c->argv[1],o);
3242 if (retval == DICT_ERR) {
3243 dictReplace(c->db->dict,c->argv[1],o);
3244 removeExpire(c->db,c->argv[1]);
3245 } else {
3246 incrRefCount(c->argv[1]);
3247 }
3248 server.dirty++;
3249 addReply(c,shared.colon);
3250 addReply(c,o);
3251 addReply(c,shared.crlf);
3252 }
3253
3254 static void incrCommand(redisClient *c) {
3255 incrDecrCommand(c,1);
3256 }
3257
3258 static void decrCommand(redisClient *c) {
3259 incrDecrCommand(c,-1);
3260 }
3261
3262 static void incrbyCommand(redisClient *c) {
3263 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3264 incrDecrCommand(c,incr);
3265 }
3266
3267 static void decrbyCommand(redisClient *c) {
3268 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3269 incrDecrCommand(c,-incr);
3270 }
3271
3272 /* ========================= Type agnostic commands ========================= */
3273
3274 static void delCommand(redisClient *c) {
3275 int deleted = 0, j;
3276
3277 for (j = 1; j < c->argc; j++) {
3278 if (deleteKey(c->db,c->argv[j])) {
3279 server.dirty++;
3280 deleted++;
3281 }
3282 }
3283 switch(deleted) {
3284 case 0:
3285 addReply(c,shared.czero);
3286 break;
3287 case 1:
3288 addReply(c,shared.cone);
3289 break;
3290 default:
3291 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3292 break;
3293 }
3294 }
3295
3296 static void existsCommand(redisClient *c) {
3297 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3298 }
3299
3300 static void selectCommand(redisClient *c) {
3301 int id = atoi(c->argv[1]->ptr);
3302
3303 if (selectDb(c,id) == REDIS_ERR) {
3304 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3305 } else {
3306 addReply(c,shared.ok);
3307 }
3308 }
3309
3310 static void randomkeyCommand(redisClient *c) {
3311 dictEntry *de;
3312
3313 while(1) {
3314 de = dictGetRandomKey(c->db->dict);
3315 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3316 }
3317 if (de == NULL) {
3318 addReply(c,shared.plus);
3319 addReply(c,shared.crlf);
3320 } else {
3321 addReply(c,shared.plus);
3322 addReply(c,dictGetEntryKey(de));
3323 addReply(c,shared.crlf);
3324 }
3325 }
3326
/* KEYS <pattern>: reply with every non-expired key matching 'pattern'
 * (via stringmatchlen(); the pattern "*" matches everything without
 * calling it) as a single bulk reply, with keys separated by one space.
 *
 * The bulk length is only known after all keys have been visited, so an
 * empty placeholder object is queued as the first reply item, and its ptr
 * is filled in with the "$<len>\r\n" header once the loop is done. */
static void keysCommand(redisClient *c) {
    dictIterator *di;
    dictEntry *de;
    sds pattern = c->argv[1]->ptr;
    int plen = sdslen(pattern);
    unsigned long numkeys = 0, keyslen = 0;
    robj *lenobj = createObject(REDIS_STRING,NULL);

    di = dictGetIterator(c->db->dict);
    /* Queue the placeholder, then drop our own reference. This assumes
     * addReply() keeps its own reference to lenobj — NOTE(review): confirm. */
    addReply(c,lenobj);
    decrRefCount(lenobj);
    while((de = dictNext(di)) != NULL) {
        robj *keyobj = dictGetEntryKey(de);

        sds key = keyobj->ptr;
        if ((pattern[0] == '*' && pattern[1] == '\0') ||
            stringmatchlen(pattern,plen,key,sdslen(key),0)) {
            /* NOTE(review): expireIfNeeded() may delete the entry the
             * iterator just returned; this relies on dictNext() tolerating
             * that — confirm against dict.c. */
            if (expireIfNeeded(c->db,keyobj) == 0) {
                if (numkeys != 0)
                    addReply(c,shared.space);
                addReply(c,keyobj);
                numkeys++;
                keyslen += sdslen(key);
            }
        }
    }
    dictReleaseIterator(di);
    /* Total length = key bytes + (numkeys-1) separating spaces. */
    lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
    addReply(c,shared.crlf);
}
3357
3358 static void dbsizeCommand(redisClient *c) {
3359 addReplySds(c,
3360 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3361 }
3362
3363 static void lastsaveCommand(redisClient *c) {
3364 addReplySds(c,
3365 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3366 }
3367
3368 static void typeCommand(redisClient *c) {
3369 robj *o;
3370 char *type;
3371
3372 o = lookupKeyRead(c->db,c->argv[1]);
3373 if (o == NULL) {
3374 type = "+none";
3375 } else {
3376 switch(o->type) {
3377 case REDIS_STRING: type = "+string"; break;
3378 case REDIS_LIST: type = "+list"; break;
3379 case REDIS_SET: type = "+set"; break;
3380 case REDIS_ZSET: type = "+zset"; break;
3381 default: type = "unknown"; break;
3382 }
3383 }
3384 addReplySds(c,sdsnew(type));
3385 addReply(c,shared.crlf);
3386 }
3387
3388 static void saveCommand(redisClient *c) {
3389 if (server.bgsavechildpid != -1) {
3390 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3391 return;
3392 }
3393 if (rdbSave(server.dbfilename) == REDIS_OK) {
3394 addReply(c,shared.ok);
3395 } else {
3396 addReply(c,shared.err);
3397 }
3398 }
3399
3400 static void bgsaveCommand(redisClient *c) {
3401 if (server.bgsavechildpid != -1) {
3402 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3403 return;
3404 }
3405 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3406 char *status = "+Background saving started\r\n";
3407 addReplySds(c,sdsnew(status));
3408 } else {
3409 addReply(c,shared.err);
3410 }
3411 }
3412
3413 static void shutdownCommand(redisClient *c) {
3414 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3415 /* Kill the saving child if there is a background saving in progress.
3416 We want to avoid race conditions, for instance our saving child may
3417 overwrite the synchronous saving did by SHUTDOWN. */
3418 if (server.bgsavechildpid != -1) {
3419 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3420 kill(server.bgsavechildpid,SIGKILL);
3421 rdbRemoveTempFile(server.bgsavechildpid);
3422 }
3423 if (server.appendonly) {
3424 /* Append only file: fsync() the AOF and exit */
3425 fsync(server.appendfd);
3426 exit(0);
3427 } else {
3428 /* Snapshotting. Perform a SYNC SAVE and exit */
3429 if (rdbSave(server.dbfilename) == REDIS_OK) {
3430 if (server.daemonize)
3431 unlink(server.pidfile);
3432 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3433 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3434 exit(0);
3435 } else {
3436 /* Ooops.. error saving! The best we can do is to continue operating.
3437 * Note that if there was a background saving process, in the next
3438 * cron() Redis will be notified that the background saving aborted,
3439 * handling special stuff like slaves pending for synchronization... */
3440 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3441 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3442 }
3443 }
3444 }
3445
3446 static void renameGenericCommand(redisClient *c, int nx) {
3447 robj *o;
3448
3449 /* To use the same key as src and dst is probably an error */
3450 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3451 addReply(c,shared.sameobjecterr);
3452 return;
3453 }
3454
3455 o = lookupKeyWrite(c->db,c->argv[1]);
3456 if (o == NULL) {
3457 addReply(c,shared.nokeyerr);
3458 return;
3459 }
3460 incrRefCount(o);
3461 deleteIfVolatile(c->db,c->argv[2]);
3462 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3463 if (nx) {
3464 decrRefCount(o);
3465 addReply(c,shared.czero);
3466 return;
3467 }
3468 dictReplace(c->db->dict,c->argv[2],o);
3469 } else {
3470 incrRefCount(c->argv[2]);
3471 }
3472 deleteKey(c->db,c->argv[1]);
3473 server.dirty++;
3474 addReply(c,nx ? shared.cone : shared.ok);
3475 }
3476
3477 static void renameCommand(redisClient *c) {
3478 renameGenericCommand(c,0);
3479 }
3480
3481 static void renamenxCommand(redisClient *c) {
3482 renameGenericCommand(c,1);
3483 }
3484
3485 static void moveCommand(redisClient *c) {
3486 robj *o;
3487 redisDb *src, *dst;
3488 int srcid;
3489
3490 /* Obtain source and target DB pointers */
3491 src = c->db;
3492 srcid = c->db->id;
3493 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3494 addReply(c,shared.outofrangeerr);
3495 return;
3496 }
3497 dst = c->db;
3498 selectDb(c,srcid); /* Back to the source DB */
3499
3500 /* If the user is moving using as target the same
3501 * DB as the source DB it is probably an error. */
3502 if (src == dst) {
3503 addReply(c,shared.sameobjecterr);
3504 return;
3505 }
3506
3507 /* Check if the element exists and get a reference */
3508 o = lookupKeyWrite(c->db,c->argv[1]);
3509 if (!o) {
3510 addReply(c,shared.czero);
3511 return;
3512 }
3513
3514 /* Try to add the element to the target DB */
3515 deleteIfVolatile(dst,c->argv[1]);
3516 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3517 addReply(c,shared.czero);
3518 return;
3519 }
3520 incrRefCount(c->argv[1]);
3521 incrRefCount(o);
3522
3523 /* OK! key moved, free the entry in the source DB */
3524 deleteKey(src,c->argv[1]);
3525 server.dirty++;
3526 addReply(c,shared.cone);
3527 }
3528
3529 /* =================================== Lists ================================ */
3530 static void pushGenericCommand(redisClient *c, int where) {
3531 robj *lobj;
3532 list *list;
3533
3534 lobj = lookupKeyWrite(c->db,c->argv[1]);
3535 if (lobj == NULL) {
3536 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3537 addReply(c,shared.ok);
3538 return;
3539 }
3540 lobj = createListObject();
3541 list = lobj->ptr;
3542 if (where == REDIS_HEAD) {
3543 listAddNodeHead(list,c->argv[2]);
3544 } else {
3545 listAddNodeTail(list,c->argv[2]);
3546 }
3547 dictAdd(c->db->dict,c->argv[1],lobj);
3548 incrRefCount(c->argv[1]);
3549 incrRefCount(c->argv[2]);
3550 } else {
3551 if (lobj->type != REDIS_LIST) {
3552 addReply(c,shared.wrongtypeerr);
3553 return;
3554 }
3555 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3556 addReply(c,shared.ok);
3557 return;
3558 }
3559 list = lobj->ptr;
3560 if (where == REDIS_HEAD) {
3561 listAddNodeHead(list,c->argv[2]);
3562 } else {
3563 listAddNodeTail(list,c->argv[2]);
3564 }
3565 incrRefCount(c->argv[2]);
3566 }
3567 server.dirty++;
3568 addReply(c,shared.ok);
3569 }
3570
3571 static void lpushCommand(redisClient *c) {
3572 pushGenericCommand(c,REDIS_HEAD);
3573 }
3574
3575 static void rpushCommand(redisClient *c) {
3576 pushGenericCommand(c,REDIS_TAIL);
3577 }
3578
3579 static void llenCommand(redisClient *c) {
3580 robj *o;
3581 list *l;
3582
3583 o = lookupKeyRead(c->db,c->argv[1]);
3584 if (o == NULL) {
3585 addReply(c,shared.czero);
3586 return;
3587 } else {
3588 if (o->type != REDIS_LIST) {
3589 addReply(c,shared.wrongtypeerr);
3590 } else {
3591 l = o->ptr;
3592 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3593 }
3594 }
3595 }
3596
3597 static void lindexCommand(redisClient *c) {
3598 robj *o;
3599 int index = atoi(c->argv[2]->ptr);
3600
3601 o = lookupKeyRead(c->db,c->argv[1]);
3602 if (o == NULL) {
3603 addReply(c,shared.nullbulk);
3604 } else {
3605 if (o->type != REDIS_LIST) {
3606 addReply(c,shared.wrongtypeerr);
3607 } else {
3608 list *list = o->ptr;
3609 listNode *ln;
3610
3611 ln = listIndex(list, index);
3612 if (ln == NULL) {
3613 addReply(c,shared.nullbulk);
3614 } else {
3615 robj *ele = listNodeValue(ln);
3616 addReplyBulkLen(c,ele);
3617 addReply(c,ele);
3618 addReply(c,shared.crlf);
3619 }
3620 }
3621 }
3622 }
3623
3624 static void lsetCommand(redisClient *c) {
3625 robj *o;
3626 int index = atoi(c->argv[2]->ptr);
3627
3628 o = lookupKeyWrite(c->db,c->argv[1]);
3629 if (o == NULL) {
3630 addReply(c,shared.nokeyerr);
3631 } else {
3632 if (o->type != REDIS_LIST) {
3633 addReply(c,shared.wrongtypeerr);
3634 } else {
3635 list *list = o->ptr;
3636 listNode *ln;
3637
3638 ln = listIndex(list, index);
3639 if (ln == NULL) {
3640 addReply(c,shared.outofrangeerr);
3641 } else {
3642 robj *ele = listNodeValue(ln);
3643
3644 decrRefCount(ele);
3645 listNodeValue(ln) = c->argv[3];
3646 incrRefCount(c->argv[3]);
3647 addReply(c,shared.ok);
3648 server.dirty++;
3649 }
3650 }
3651 }
3652 }
3653
3654 static void popGenericCommand(redisClient *c, int where) {
3655 robj *o;
3656
3657 o = lookupKeyWrite(c->db,c->argv[1]);
3658 if (o == NULL) {
3659 addReply(c,shared.nullbulk);
3660 } else {
3661 if (o->type != REDIS_LIST) {
3662 addReply(c,shared.wrongtypeerr);
3663 } else {
3664 list *list = o->ptr;
3665 listNode *ln;
3666
3667 if (where == REDIS_HEAD)
3668 ln = listFirst(list);
3669 else
3670 ln = listLast(list);
3671
3672 if (ln == NULL) {
3673 addReply(c,shared.nullbulk);
3674 } else {
3675 robj *ele = listNodeValue(ln);
3676 addReplyBulkLen(c,ele);
3677 addReply(c,ele);
3678 addReply(c,shared.crlf);
3679 listDelNode(list,ln);
3680 server.dirty++;
3681 }
3682 }
3683 }
3684 }
3685
3686 static void lpopCommand(redisClient *c) {
3687 popGenericCommand(c,REDIS_HEAD);
3688 }
3689
3690 static void rpopCommand(redisClient *c) {
3691 popGenericCommand(c,REDIS_TAIL);
3692 }
3693
3694 static void lrangeCommand(redisClient *c) {
3695 robj *o;
3696 int start = atoi(c->argv[2]->ptr);
3697 int end = atoi(c->argv[3]->ptr);
3698
3699 o = lookupKeyRead(c->db,c->argv[1]);
3700 if (o == NULL) {
3701 addReply(c,shared.nullmultibulk);
3702 } else {
3703 if (o->type != REDIS_LIST) {
3704 addReply(c,shared.wrongtypeerr);
3705 } else {
3706 list *list = o->ptr;
3707 listNode *ln;
3708 int llen = listLength(list);
3709 int rangelen, j;
3710 robj *ele;
3711
3712 /* convert negative indexes */
3713 if (start < 0) start = llen+start;
3714 if (end < 0) end = llen+end;
3715 if (start < 0) start = 0;
3716 if (end < 0) end = 0;
3717
3718 /* indexes sanity checks */
3719 if (start > end || start >= llen) {
3720 /* Out of range start or start > end result in empty list */
3721 addReply(c,shared.emptymultibulk);
3722 return;
3723 }
3724 if (end >= llen) end = llen-1;
3725 rangelen = (end-start)+1;
3726
3727 /* Return the result in form of a multi-bulk reply */
3728 ln = listIndex(list, start);
3729 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3730 for (j = 0; j < rangelen; j++) {
3731 ele = listNodeValue(ln);
3732 addReplyBulkLen(c,ele);
3733 addReply(c,ele);
3734 addReply(c,shared.crlf);
3735 ln = ln->next;
3736 }
3737 }
3738 }
3739 }
3740
3741 static void ltrimCommand(redisClient *c) {
3742 robj *o;
3743 int start = atoi(c->argv[2]->ptr);
3744 int end = atoi(c->argv[3]->ptr);
3745
3746 o = lookupKeyWrite(c->db,c->argv[1]);
3747 if (o == NULL) {
3748 addReply(c,shared.ok);
3749 } else {
3750 if (o->type != REDIS_LIST) {
3751 addReply(c,shared.wrongtypeerr);
3752 } else {
3753 list *list = o->ptr;
3754 listNode *ln;
3755 int llen = listLength(list);
3756 int j, ltrim, rtrim;
3757
3758 /* convert negative indexes */
3759 if (start < 0) start = llen+start;
3760 if (end < 0) end = llen+end;
3761 if (start < 0) start = 0;
3762 if (end < 0) end = 0;
3763
3764 /* indexes sanity checks */
3765 if (start > end || start >= llen) {
3766 /* Out of range start or start > end result in empty list */
3767 ltrim = llen;
3768 rtrim = 0;
3769 } else {
3770 if (end >= llen) end = llen-1;
3771 ltrim = start;
3772 rtrim = llen-end-1;
3773 }
3774
3775 /* Remove list elements to perform the trim */
3776 for (j = 0; j < ltrim; j++) {
3777 ln = listFirst(list);
3778 listDelNode(list,ln);
3779 }
3780 for (j = 0; j < rtrim; j++) {
3781 ln = listLast(list);
3782 listDelNode(list,ln);
3783 }
3784 server.dirty++;
3785 addReply(c,shared.ok);
3786 }
3787 }
3788 }
3789
3790 static void lremCommand(redisClient *c) {
3791 robj *o;
3792
3793 o = lookupKeyWrite(c->db,c->argv[1]);
3794 if (o == NULL) {
3795 addReply(c,shared.czero);
3796 } else {
3797 if (o->type != REDIS_LIST) {
3798 addReply(c,shared.wrongtypeerr);
3799 } else {
3800 list *list = o->ptr;
3801 listNode *ln, *next;
3802 int toremove = atoi(c->argv[2]->ptr);
3803 int removed = 0;
3804 int fromtail = 0;
3805
3806 if (toremove < 0) {
3807 toremove = -toremove;
3808 fromtail = 1;
3809 }
3810 ln = fromtail ? list->tail : list->head;
3811 while (ln) {
3812 robj *ele = listNodeValue(ln);
3813
3814 next = fromtail ? ln->prev : ln->next;
3815 if (compareStringObjects(ele,c->argv[3]) == 0) {
3816 listDelNode(list,ln);
3817 server.dirty++;
3818 removed++;
3819 if (toremove && removed == toremove) break;
3820 }
3821 ln = next;
3822 }
3823 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3824 }
3825 }
3826 }
3827
3828 /* This is the semantic of this command:
3829 * RPOPLPUSH srclist dstlist:
3830 * IF LLEN(srclist) > 0
3831 * element = RPOP srclist
3832 * LPUSH dstlist element
3833 * RETURN element
3834 * ELSE
3835 * RETURN nil
3836 * END
3837 * END
3838 *
3839 * The idea is to be able to get an element from a list in a reliable way
3840 * since the element is not just returned but pushed against another list
3841 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3842 */
3843 static void rpoplpushcommand(redisClient *c) {
3844 robj *sobj;
3845
3846 sobj = lookupKeyWrite(c->db,c->argv[1]);
3847 if (sobj == NULL) {
3848 addReply(c,shared.nullbulk);
3849 } else {
3850 if (sobj->type != REDIS_LIST) {
3851 addReply(c,shared.wrongtypeerr);
3852 } else {
3853 list *srclist = sobj->ptr;
3854 listNode *ln = listLast(srclist);
3855
3856 if (ln == NULL) {
3857 addReply(c,shared.nullbulk);
3858 } else {
3859 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3860 robj *ele = listNodeValue(ln);
3861 list *dstlist;
3862
3863 if (dobj == NULL) {
3864
3865 /* Create the list if the key does not exist */
3866 dobj = createListObject();
3867 dictAdd(c->db->dict,c->argv[2],dobj);
3868 incrRefCount(c->argv[2]);
3869 } else if (dobj->type != REDIS_LIST) {
3870 addReply(c,shared.wrongtypeerr);
3871 return;
3872 }
3873 /* Add the element to the target list */
3874 dstlist = dobj->ptr;
3875 listAddNodeHead(dstlist,ele);
3876 incrRefCount(ele);
3877
3878 /* Send the element to the client as reply as well */
3879 addReplyBulkLen(c,ele);
3880 addReply(c,ele);
3881 addReply(c,shared.crlf);
3882
3883 /* Finally remove the element from the source list */
3884 listDelNode(srclist,ln);
3885 server.dirty++;
3886 }
3887 }
3888 }
3889 }
3890
3891
3892 /* ==================================== Sets ================================ */
3893
3894 static void saddCommand(redisClient *c) {
3895 robj *set;
3896
3897 set = lookupKeyWrite(c->db,c->argv[1]);
3898 if (set == NULL) {
3899 set = createSetObject();
3900 dictAdd(c->db->dict,c->argv[1],set);
3901 incrRefCount(c->argv[1]);
3902 } else {
3903 if (set->type != REDIS_SET) {
3904 addReply(c,shared.wrongtypeerr);
3905 return;
3906 }
3907 }
3908 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3909 incrRefCount(c->argv[2]);
3910 server.dirty++;
3911 addReply(c,shared.cone);
3912 } else {
3913 addReply(c,shared.czero);
3914 }
3915 }
3916
3917 static void sremCommand(redisClient *c) {
3918 robj *set;
3919
3920 set = lookupKeyWrite(c->db,c->argv[1]);
3921 if (set == NULL) {
3922 addReply(c,shared.czero);
3923 } else {
3924 if (set->type != REDIS_SET) {
3925 addReply(c,shared.wrongtypeerr);
3926 return;
3927 }
3928 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3929 server.dirty++;
3930 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3931 addReply(c,shared.cone);
3932 } else {
3933 addReply(c,shared.czero);
3934 }
3935 }
3936 }
3937
3938 static void smoveCommand(redisClient *c) {
3939 robj *srcset, *dstset;
3940
3941 srcset = lookupKeyWrite(c->db,c->argv[1]);
3942 dstset = lookupKeyWrite(c->db,c->argv[2]);
3943
3944 /* If the source key does not exist return 0, if it's of the wrong type
3945 * raise an error */
3946 if (srcset == NULL || srcset->type != REDIS_SET) {
3947 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3948 return;
3949 }
3950 /* Error if the destination key is not a set as well */
3951 if (dstset && dstset->type != REDIS_SET) {
3952 addReply(c,shared.wrongtypeerr);
3953 return;
3954 }
3955 /* Remove the element from the source set */
3956 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3957 /* Key not found in the src set! return zero */
3958 addReply(c,shared.czero);
3959 return;
3960 }
3961 server.dirty++;
3962 /* Add the element to the destination set */
3963 if (!dstset) {
3964 dstset = createSetObject();
3965 dictAdd(c->db->dict,c->argv[2],dstset);
3966 incrRefCount(c->argv[2]);
3967 }
3968 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3969 incrRefCount(c->argv[3]);
3970 addReply(c,shared.cone);
3971 }
3972
3973 static void sismemberCommand(redisClient *c) {
3974 robj *set;
3975
3976 set = lookupKeyRead(c->db,c->argv[1]);
3977 if (set == NULL) {
3978 addReply(c,shared.czero);
3979 } else {
3980 if (set->type != REDIS_SET) {
3981 addReply(c,shared.wrongtypeerr);
3982 return;
3983 }
3984 if (dictFind(set->ptr,c->argv[2]))
3985 addReply(c,shared.cone);
3986 else
3987 addReply(c,shared.czero);
3988 }
3989 }
3990
3991 static void scardCommand(redisClient *c) {
3992 robj *o;
3993 dict *s;
3994
3995 o = lookupKeyRead(c->db,c->argv[1]);
3996 if (o == NULL) {
3997 addReply(c,shared.czero);
3998 return;
3999 } else {
4000 if (o->type != REDIS_SET) {
4001 addReply(c,shared.wrongtypeerr);
4002 } else {
4003 s = o->ptr;
4004 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4005 dictSize(s)));
4006 }
4007 }
4008 }
4009
4010 static void spopCommand(redisClient *c) {
4011 robj *set;
4012 dictEntry *de;
4013
4014 set = lookupKeyWrite(c->db,c->argv[1]);
4015 if (set == NULL) {
4016 addReply(c,shared.nullbulk);
4017 } else {
4018 if (set->type != REDIS_SET) {
4019 addReply(c,shared.wrongtypeerr);
4020 return;
4021 }
4022 de = dictGetRandomKey(set->ptr);
4023 if (de == NULL) {
4024 addReply(c,shared.nullbulk);
4025 } else {
4026 robj *ele = dictGetEntryKey(de);
4027
4028 addReplyBulkLen(c,ele);
4029 addReply(c,ele);
4030 addReply(c,shared.crlf);
4031 dictDelete(set->ptr,ele);
4032 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
4033 server.dirty++;
4034 }
4035 }
4036 }
4037
4038 static void srandmemberCommand(redisClient *c) {
4039 robj *set;
4040 dictEntry *de;
4041
4042 set = lookupKeyRead(c->db,c->argv[1]);
4043 if (set == NULL) {
4044 addReply(c,shared.nullbulk);
4045 } else {
4046 if (set->type != REDIS_SET) {
4047 addReply(c,shared.wrongtypeerr);
4048 return;
4049 }
4050 de = dictGetRandomKey(set->ptr);
4051 if (de == NULL) {
4052 addReply(c,shared.nullbulk);
4053 } else {
4054 robj *ele = dictGetEntryKey(de);
4055
4056 addReplyBulkLen(c,ele);
4057 addReply(c,ele);
4058 addReply(c,shared.crlf);
4059 }
4060 }
4061 }
4062
4063 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
4064 dict **d1 = (void*) s1, **d2 = (void*) s2;
4065
4066 return dictSize(*d1)-dictSize(*d2);
4067 }
4068
/* Shared implementation of SINTER and SINTERSTORE.
 *
 * setskeys/setsnum: the keys of the sets to intersect.
 * dstkey: NULL for SINTER (members are streamed back as a multi-bulk reply);
 *         otherwise the key that receives the result (SINTERSTORE), in which
 *         case the reply is the cardinality of the stored set.
 *
 * If any source key is missing the intersection is empty: SINTER replies
 * with a null multi-bulk, SINTERSTORE deletes dstkey and replies :0.
 * A source key holding a non-set value produces a wrong-type error. */
static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
    dict **dv = zmalloc(sizeof(dict*)*setsnum);
    dictIterator *di;
    dictEntry *de;
    robj *lenobj = NULL, *dstset = NULL;
    unsigned long j, cardinality = 0;

    /* Resolve every key to its dict. Write lookups are used in STORE mode. */
    for (j = 0; j < setsnum; j++) {
        robj *setobj;

        setobj = dstkey ?
                    lookupKeyWrite(c->db,setskeys[j]) :
                    lookupKeyRead(c->db,setskeys[j]);
        if (!setobj) {
            zfree(dv);
            if (dstkey) {
                if (deleteKey(c->db,dstkey))
                    server.dirty++;
                addReply(c,shared.czero);
            } else {
                addReply(c,shared.nullmultibulk);
            }
            return;
        }
        if (setobj->type != REDIS_SET) {
            zfree(dv);
            addReply(c,shared.wrongtypeerr);
            return;
        }
        dv[j] = setobj->ptr;
    }
    /* Sort sets from the smallest to largest, this will improve our
     * algorithm's performance */
    qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);

    /* The first thing we should output is the total number of elements...
     * since this is a multi-bulk write, but at this stage we don't know
     * the intersection set size, so we use a trick, append an empty object
     * to the output list and save the pointer to later modify it with the
     * right length */
    if (!dstkey) {
        lenobj = createObject(REDIS_STRING,NULL);
        addReply(c,lenobj);
        /* NOTE(review): lenobj->ptr is still written below after this
         * decrRefCount(), which presumably relies on addReply() holding its
         * own reference — confirm against addReply(). */
        decrRefCount(lenobj);
    } else {
        /* If we have a target key where to store the resulting set
         * create this key with an empty set inside */
        dstset = createSetObject();
    }

    /* Iterate all the elements of the first (smallest) set, and test
     * the element against all the other sets, if at least one set does
     * not include the element it is discarded */
    di = dictGetIterator(dv[0]);

    while((de = dictNext(di)) != NULL) {
        robj *ele;

        for (j = 1; j < setsnum; j++)
            if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
        if (j != setsnum)
            continue; /* at least one set does not contain the member */
        ele = dictGetEntryKey(de);
        if (!dstkey) {
            addReplyBulkLen(c,ele);
            addReply(c,ele);
            addReply(c,shared.crlf);
            cardinality++;
        } else {
            /* The result set takes its own reference to the member. */
            dictAdd(dstset->ptr,ele,NULL);
            incrRefCount(ele);
        }
    }
    dictReleaseIterator(di);

    if (dstkey) {
        /* Store the resulting set into the target. Any old value is removed
         * only now, after the result was computed, so dstkey may also be one
         * of the source keys. */
        deleteKey(c->db,dstkey);
        dictAdd(c->db->dict,dstkey,dstset);
        incrRefCount(dstkey);
    }

    if (!dstkey) {
        /* Fill in the multi-bulk length placeholder queued above. */
        lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
    } else {
        addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
            dictSize((dict*)dstset->ptr)));
        server.dirty++;
    }
    zfree(dv);
}
4160
4161 static void sinterCommand(redisClient *c) {
4162 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4163 }
4164
4165 static void sinterstoreCommand(redisClient *c) {
4166 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4167 }
4168
/* Operation selectors for sunionDiffGenericCommand(): set union (SUNION,
 * SUNIONSTORE) or set difference (SDIFF, SDIFFSTORE). */
#define REDIS_OP_UNION 0
#define REDIS_OP_DIFF 1
4171
/* Shared implementation of SUNION/SUNIONSTORE (op == REDIS_OP_UNION) and
 * SDIFF/SDIFFSTORE (op == REDIS_OP_DIFF).
 *
 * setskeys/setsnum: the source keys. Missing keys are treated as empty sets;
 * a key holding a non-set value produces a wrong-type error.
 * dstkey: NULL to reply with the resulting members as a multi-bulk;
 *         otherwise the result set is stored at dstkey (replacing any
 *         previous value) and the reply is its cardinality.
 *
 * For DIFF the result is the members of the first set that appear in none
 * of the following sets. */
static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
    dict **dv = zmalloc(sizeof(dict*)*setsnum);
    dictIterator *di;
    dictEntry *de;
    robj *dstset = NULL;
    int j, cardinality = 0;

    /* Resolve keys to dicts; a NULL slot marks a missing key. */
    for (j = 0; j < setsnum; j++) {
        robj *setobj;

        setobj = dstkey ?
                    lookupKeyWrite(c->db,setskeys[j]) :
                    lookupKeyRead(c->db,setskeys[j]);
        if (!setobj) {
            dv[j] = NULL;
            continue;
        }
        if (setobj->type != REDIS_SET) {
            zfree(dv);
            addReply(c,shared.wrongtypeerr);
            return;
        }
        dv[j] = setobj->ptr;
    }

    /* We need a temp set object to store our union. If the dstkey
     * is not NULL (that is, we are inside an SUNIONSTORE operation) then
     * this set object will be the resulting object to set into the target key*/
    dstset = createSetObject();

    /* Iterate all the elements of all the sets, add every element a single
     * time to the result set */
    for (j = 0; j < setsnum; j++) {
        if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
        if (!dv[j]) continue; /* non existing keys are like empty sets */

        di = dictGetIterator(dv[j]);

        while((de = dictNext(di)) != NULL) {
            robj *ele;

            /* dictAdd will not add the same element multiple times */
            ele = dictGetEntryKey(de);
            /* UNION adds everything; DIFF seeds the result from the first
             * set and removes the members of every following set. */
            if (op == REDIS_OP_UNION || j == 0) {
                if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
                    incrRefCount(ele);
                    cardinality++;
                }
            } else if (op == REDIS_OP_DIFF) {
                if (dictDelete(dstset->ptr,ele) == DICT_OK) {
                    cardinality--;
                }
            }
        }
        dictReleaseIterator(di);

        if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
    }

    /* Output the content of the resulting set, if not in STORE mode */
    if (!dstkey) {
        addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
        di = dictGetIterator(dstset->ptr);
        while((de = dictNext(di)) != NULL) {
            robj *ele;

            ele = dictGetEntryKey(de);
            addReplyBulkLen(c,ele);
            addReply(c,ele);
            addReply(c,shared.crlf);
        }
        dictReleaseIterator(di);
    } else {
        /* If we have a target key where to store the resulting set
         * create this key with the result set inside */
        deleteKey(c->db,dstkey);
        dictAdd(c->db->dict,dstkey,dstset);
        incrRefCount(dstkey);
    }

    /* Cleanup */
    if (!dstkey) {
        /* The temporary result set is no longer needed. */
        decrRefCount(dstset);
    } else {
        addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
            dictSize((dict*)dstset->ptr)));
        server.dirty++;
    }
    zfree(dv);
}
4262
4263 static void sunionCommand(redisClient *c) {
4264 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4265 }
4266
4267 static void sunionstoreCommand(redisClient *c) {
4268 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4269 }
4270
4271 static void sdiffCommand(redisClient *c) {
4272 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4273 }
4274
4275 static void sdiffstoreCommand(redisClient *c) {
4276 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4277 }
4278
4279 /* ==================================== ZSets =============================== */
4280
/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
4288
/* This skiplist implementation is almost a C translation of the original
 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
 * Alternative to Balanced Trees", modified in three ways:
 * a) this implementation allows for repeated scores.
 * b) the comparison is not just by key (our 'score') but also by satellite data.
 * c) there is a back pointer, so it's a doubly linked list with the back
 * pointers being only at "level 1". This allows traversing the list
 * from tail to head, useful for ZREVRANGE. */
4297
4298 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4299 zskiplistNode *zn = zmalloc(sizeof(*zn));
4300
4301 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4302 zn->score = score;
4303 zn->obj = obj;
4304 return zn;
4305 }
4306
4307 static zskiplist *zslCreate(void) {
4308 int j;
4309 zskiplist *zsl;
4310
4311 zsl = zmalloc(sizeof(*zsl));
4312 zsl->level = 1;
4313 zsl->length = 0;
4314 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4315 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4316 zsl->header->forward[j] = NULL;
4317 zsl->header->backward = NULL;
4318 zsl->tail = NULL;
4319 return zsl;
4320 }
4321
4322 static void zslFreeNode(zskiplistNode *node) {
4323 decrRefCount(node->obj);
4324 zfree(node->forward);
4325 zfree(node);
4326 }
4327
4328 static void zslFree(zskiplist *zsl) {
4329 zskiplistNode *node = zsl->header->forward[0], *next;
4330
4331 zfree(zsl->header->forward);
4332 zfree(zsl->header);
4333 while(node) {
4334 next = node->forward[0];
4335 zslFreeNode(node);
4336 node = next;
4337 }
4338 zfree(zsl);
4339 }
4340
4341 static int zslRandomLevel(void) {
4342 int level = 1;
4343 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4344 level += 1;
4345 return level;
4346 }
4347
/* Insert a new node holding (score, obj) into the skiplist. Nodes are kept
 * ordered by score and, for equal scores, by compareStringObjects() on obj.
 * The obj pointer is stored as-is (no incrRefCount here); the callers in
 * this file take the skiplist's reference themselves right after calling.
 * Relies on zslRandomLevel() never returning more than ZSKIPLIST_MAXLEVEL,
 * the size of update[]. */
static void zslInsert(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i, level;

    /* Walk down from the highest level, recording in update[i] the last
     * node at level i that sorts before the new element. */
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->forward[i] &&
            (x->forward[i]->score < score ||
                (x->forward[i]->score == score &&
                compareStringObjects(x->forward[i]->obj,obj) < 0)))
            x = x->forward[i];
        update[i] = x;
    }
    /* we assume the key is not already inside, since we allow duplicated
     * scores, and the re-insertion of score and redis object should never
     * happen since the caller of zslInsert() should test in the hash table
     * if the element is already inside or not. */
    level = zslRandomLevel();
    if (level > zsl->level) {
        /* Levels above the current list height start from the header. */
        for (i = zsl->level; i < level; i++)
            update[i] = zsl->header;
        zsl->level = level;
    }
    x = zslCreateNode(level,score,obj);
    /* Splice the new node in after update[i] on each of its levels. */
    for (i = 0; i < level; i++) {
        x->forward[i] = update[i]->forward[i];
        update[i]->forward[i] = x;
    }
    /* The first node gets a NULL backward pointer rather than the header. */
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->forward[0])
        x->forward[0]->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
}
4383
4384 /* Delete an element with matching score/object from the skiplist. */
4385 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4386 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4387 int i;
4388
4389 x = zsl->header;
4390 for (i = zsl->level-1; i >= 0; i--) {
4391 while (x->forward[i] &&
4392 (x->forward[i]->score < score ||
4393 (x->forward[i]->score == score &&
4394 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4395 x = x->forward[i];
4396 update[i] = x;
4397 }
4398 /* We may have multiple elements with the same score, what we need
4399 * is to find the element with both the right score and object. */
4400 x = x->forward[0];
4401 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4402 for (i = 0; i < zsl->level; i++) {
4403 if (update[i]->forward[i] != x) break;
4404 update[i]->forward[i] = x->forward[i];
4405 }
4406 if (x->forward[0]) {
4407 x->forward[0]->backward = (x->backward == zsl->header) ?
4408 NULL : x->backward;
4409 } else {
4410 zsl->tail = x->backward;
4411 }
4412 zslFreeNode(x);
4413 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4414 zsl->level--;
4415 zsl->length--;
4416 return 1;
4417 } else {
4418 return 0; /* not found */
4419 }
4420 return 0; /* not found */
4421 }
4422
/* Delete all the elements with score between min and max from the skiplist.
 * Min and max are inclusive, so every element with score >= min && score <= max
 * is deleted.
 * Note that this function takes the reference to the hash table view of the
 * sorted set, in order to remove the elements from the hash table too.
 * Returns the number of elements removed. */
static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned long removed = 0;
    int i;

    /* Record, for every level, the last node whose score is < min. */
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->forward[i] && x->forward[i]->score < min)
            x = x->forward[i];
        update[i] = x;
    }
    /* x is now the first node with score >= min (or NULL). Keep unlinking
     * nodes while their score is still <= max; update[] remains a valid
     * set of predecessors for each following node. */
    x = x->forward[0];
    while (x && x->score <= max) {
        zskiplistNode *next;

        for (i = 0; i < zsl->level; i++) {
            if (update[i]->forward[i] != x) break;
            update[i]->forward[i] = x->forward[i];
        }
        if (x->forward[0]) {
            x->forward[0]->backward = (x->backward == zsl->header) ?
                                        NULL : x->backward;
        } else {
            zsl->tail = x->backward;
        }
        next = x->forward[0];
        /* Remove from the hash table view too. x->obj is still usable here:
         * the skiplist node holds its own reference, released by
         * zslFreeNode() just below. */
        dictDelete(dict,x->obj);
        zslFreeNode(x);
        while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
            zsl->level--;
        zsl->length--;
        removed++;
        x = next;
    }
    return removed; /* number of elements deleted */
}
4465
4466 /* Find the first node having a score equal or greater than the specified one.
4467 * Returns NULL if there is no match. */
4468 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4469 zskiplistNode *x;
4470 int i;
4471
4472 x = zsl->header;
4473 for (i = zsl->level-1; i >= 0; i--) {
4474 while (x->forward[i] && x->forward[i]->score < score)
4475 x = x->forward[i];
4476 }
4477 /* We may have multiple elements with the same score, what we need
4478 * is to find the element with both the right score and object. */
4479 return x->forward[0];
4480 }
4481
4482 /* The actual Z-commands implementations */
4483
4484 /* This generic command implements both ZADD and ZINCRBY.
4485 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4486 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4487 static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
4488 robj *zsetobj;
4489 zset *zs;
4490 double *score;
4491
4492 zsetobj = lookupKeyWrite(c->db,key);
4493 if (zsetobj == NULL) {
4494 zsetobj = createZsetObject();
4495 dictAdd(c->db->dict,key,zsetobj);
4496 incrRefCount(key);
4497 } else {
4498 if (zsetobj->type != REDIS_ZSET) {
4499 addReply(c,shared.wrongtypeerr);
4500 return;
4501 }
4502 }
4503 zs = zsetobj->ptr;
4504
4505 /* Ok now since we implement both ZADD and ZINCRBY here the code
4506 * needs to handle the two different conditions. It's all about setting
4507 * '*score', that is, the new score to set, to the right value. */
4508 score = zmalloc(sizeof(double));
4509 if (doincrement) {
4510 dictEntry *de;
4511
4512 /* Read the old score. If the element was not present starts from 0 */
4513 de = dictFind(zs->dict,ele);
4514 if (de) {
4515 double *oldscore = dictGetEntryVal(de);
4516 *score = *oldscore + scoreval;
4517 } else {
4518 *score = scoreval;
4519 }
4520 } else {
4521 *score = scoreval;
4522 }
4523
4524 /* What follows is a simple remove and re-insert operation that is common
4525 * to both ZADD and ZINCRBY... */
4526 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
4527 /* case 1: New element */
4528 incrRefCount(ele); /* added to hash */
4529 zslInsert(zs->zsl,*score,ele);
4530 incrRefCount(ele); /* added to skiplist */
4531 server.dirty++;
4532 if (doincrement)
4533 addReplyDouble(c,*score);
4534 else
4535 addReply(c,shared.cone);
4536 } else {
4537 dictEntry *de;
4538 double *oldscore;
4539
4540 /* case 2: Score update operation */
4541 de = dictFind(zs->dict,ele);
4542 redisAssert(de != NULL);
4543 oldscore = dictGetEntryVal(de);
4544 if (*score != *oldscore) {
4545 int deleted;
4546
4547 /* Remove and insert the element in the skip list with new score */
4548 deleted = zslDelete(zs->zsl,*oldscore,ele);
4549 redisAssert(deleted != 0);
4550 zslInsert(zs->zsl,*score,ele);
4551 incrRefCount(ele);
4552 /* Update the score in the hash table */
4553 dictReplace(zs->dict,ele,score);
4554 server.dirty++;
4555 } else {
4556 zfree(score);
4557 }
4558 if (doincrement)
4559 addReplyDouble(c,*score);
4560 else
4561 addReply(c,shared.czero);
4562 }
4563 }
4564
4565 static void zaddCommand(redisClient *c) {
4566 double scoreval;
4567
4568 scoreval = strtod(c->argv[2]->ptr,NULL);
4569 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4570 }
4571
4572 static void zincrbyCommand(redisClient *c) {
4573 double scoreval;
4574
4575 scoreval = strtod(c->argv[2]->ptr,NULL);
4576 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4577 }
4578
4579 static void zremCommand(redisClient *c) {
4580 robj *zsetobj;
4581 zset *zs;
4582
4583 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4584 if (zsetobj == NULL) {
4585 addReply(c,shared.czero);
4586 } else {
4587 dictEntry *de;
4588 double *oldscore;
4589 int deleted;
4590
4591 if (zsetobj->type != REDIS_ZSET) {
4592 addReply(c,shared.wrongtypeerr);
4593 return;
4594 }
4595 zs = zsetobj->ptr;
4596 de = dictFind(zs->dict,c->argv[2]);
4597 if (de == NULL) {
4598 addReply(c,shared.czero);
4599 return;
4600 }
4601 /* Delete from the skiplist */
4602 oldscore = dictGetEntryVal(de);
4603 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4604 redisAssert(deleted != 0);
4605
4606 /* Delete from the hash table */
4607 dictDelete(zs->dict,c->argv[2]);
4608 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4609 server.dirty++;
4610 addReply(c,shared.cone);
4611 }
4612 }
4613
4614 static void zremrangebyscoreCommand(redisClient *c) {
4615 double min = strtod(c->argv[2]->ptr,NULL);
4616 double max = strtod(c->argv[3]->ptr,NULL);
4617 robj *zsetobj;
4618 zset *zs;
4619
4620 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4621 if (zsetobj == NULL) {
4622 addReply(c,shared.czero);
4623 } else {
4624 long deleted;
4625
4626 if (zsetobj->type != REDIS_ZSET) {
4627 addReply(c,shared.wrongtypeerr);
4628 return;
4629 }
4630 zs = zsetobj->ptr;
4631 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4632 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4633 server.dirty += deleted;
4634 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4635 }
4636 }
4637
/* Shared implementation of ZRANGE (reverse == 0) and ZREVRANGE (reverse != 0).
 *
 * Syntax: Z[REV]RANGE key start end [WITHSCORES]
 * start/end are zero-based, inclusive ranks; negative values count from the
 * end (-1 is the last element). With reverse set, ranks are counted from the
 * tail, i.e. from the highest score. WITHSCORES emits each member followed by
 * its score. A missing key replies with a null multi-bulk. */
static void zrangeGenericCommand(redisClient *c, int reverse) {
    robj *o;
    int start = atoi(c->argv[2]->ptr);
    int end = atoi(c->argv[3]->ptr);
    int withscores = 0;

    /* Only a single optional trailing WITHSCORES argument is accepted. */
    if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
        withscores = 1;
    } else if (c->argc >= 5) {
        addReply(c,shared.syntaxerr);
        return;
    }

    o = lookupKeyRead(c->db,c->argv[1]);
    if (o == NULL) {
        addReply(c,shared.nullmultibulk);
    } else {
        if (o->type != REDIS_ZSET) {
            addReply(c,shared.wrongtypeerr);
        } else {
            zset *zsetobj = o->ptr;
            zskiplist *zsl = zsetobj->zsl;
            zskiplistNode *ln;

            int llen = zsl->length;
            int rangelen, j;
            robj *ele;

            /* convert negative indexes */
            if (start < 0) start = llen+start;
            if (end < 0) end = llen+end;
            if (start < 0) start = 0;
            if (end < 0) end = 0;

            /* indexes sanity checks */
            if (start > end || start >= llen) {
                /* Out of range start or start > end result in empty list */
                addReply(c,shared.emptymultibulk);
                return;
            }
            if (end >= llen) end = llen-1;
            rangelen = (end-start)+1;

            /* Return the result in form of a multi-bulk reply */
            /* Seek to rank 'start' from the head, or from the tail when
             * reverse is set (linear walk along level 0). */
            if (reverse) {
                ln = zsl->tail;
                while (start--)
                    ln = ln->backward;
            } else {
                ln = zsl->header->forward[0];
                while (start--)
                    ln = ln->forward[0];
            }

            addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",
                withscores ? (rangelen*2) : rangelen));
            for (j = 0; j < rangelen; j++) {
                ele = ln->obj;
                addReplyBulkLen(c,ele);
                addReply(c,ele);
                addReply(c,shared.crlf);
                if (withscores)
                    addReplyDouble(c,ln->score);
                ln = reverse ? ln->backward : ln->forward[0];
            }
        }
    }
}
4706
4707 static void zrangeCommand(redisClient *c) {
4708 zrangeGenericCommand(c,0);
4709 }
4710
4711 static void zrevrangeCommand(redisClient *c) {
4712 zrangeGenericCommand(c,1);
4713 }
4714
4715 static void zrangebyscoreCommand(redisClient *c) {
4716 robj *o;
4717 double min = strtod(c->argv[2]->ptr,NULL);
4718 double max = strtod(c->argv[3]->ptr,NULL);
4719 int offset = 0, limit = -1;
4720
4721 if (c->argc != 4 && c->argc != 7) {
4722 addReplySds(c,
4723 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4724 return;
4725 } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
4726 addReply(c,shared.syntaxerr);
4727 return;
4728 } else if (c->argc == 7) {
4729 offset = atoi(c->argv[5]->ptr);
4730 limit = atoi(c->argv[6]->ptr);
4731 if (offset < 0) offset = 0;
4732 }
4733
4734 o = lookupKeyRead(c->db,c->argv[1]);
4735 if (o == NULL) {
4736 addReply(c,shared.nullmultibulk);
4737 } else {
4738 if (o->type != REDIS_ZSET) {
4739 addReply(c,shared.wrongtypeerr);
4740 } else {
4741 zset *zsetobj = o->ptr;
4742 zskiplist *zsl = zsetobj->zsl;
4743 zskiplistNode *ln;
4744 robj *ele, *lenobj;
4745 unsigned int rangelen = 0;
4746
4747 /* Get the first node with the score >= min */
4748 ln = zslFirstWithScore(zsl,min);
4749 if (ln == NULL) {
4750 /* No element matching the speciifed interval */
4751 addReply(c,shared.emptymultibulk);
4752 return;
4753 }
4754
4755 /* We don't know in advance how many matching elements there
4756 * are in the list, so we push this object that will represent
4757 * the multi-bulk length in the output buffer, and will "fix"
4758 * it later */
4759 lenobj = createObject(REDIS_STRING,NULL);
4760 addReply(c,lenobj);
4761 decrRefCount(lenobj);
4762
4763 while(ln && ln->score <= max) {
4764 if (offset) {
4765 offset--;
4766 ln = ln->forward[0];
4767 continue;
4768 }
4769 if (limit == 0) break;
4770 ele = ln->obj;
4771 addReplyBulkLen(c,ele);
4772 addReply(c,ele);
4773 addReply(c,shared.crlf);
4774 ln = ln->forward[0];
4775 rangelen++;
4776 if (limit > 0) limit--;
4777 }
4778 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4779 }
4780 }
4781 }
4782
4783 static void zcardCommand(redisClient *c) {
4784 robj *o;
4785 zset *zs;
4786
4787 o = lookupKeyRead(c->db,c->argv[1]);
4788 if (o == NULL) {
4789 addReply(c,shared.czero);
4790 return;
4791 } else {
4792 if (o->type != REDIS_ZSET) {
4793 addReply(c,shared.wrongtypeerr);
4794 } else {
4795 zs = o->ptr;
4796 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length));
4797 }
4798 }
4799 }
4800
4801 static void zscoreCommand(redisClient *c) {
4802 robj *o;
4803 zset *zs;
4804
4805 o = lookupKeyRead(c->db,c->argv[1]);
4806 if (o == NULL) {
4807 addReply(c,shared.nullbulk);
4808 return;
4809 } else {
4810 if (o->type != REDIS_ZSET) {
4811 addReply(c,shared.wrongtypeerr);
4812 } else {
4813 dictEntry *de;
4814
4815 zs = o->ptr;
4816 de = dictFind(zs->dict,c->argv[2]);
4817 if (!de) {
4818 addReply(c,shared.nullbulk);
4819 } else {
4820 double *score = dictGetEntryVal(de);
4821
4822 addReplyDouble(c,*score);
4823 }
4824 }
4825 }
4826 }
4827
4828 /* ========================= Non type-specific commands ==================== */
4829
4830 static void flushdbCommand(redisClient *c) {
4831 server.dirty += dictSize(c->db->dict);
4832 dictEmpty(c->db->dict);
4833 dictEmpty(c->db->expires);
4834 addReply(c,shared.ok);
4835 }
4836
4837 static void flushallCommand(redisClient *c) {
4838 server.dirty += emptyDb();
4839 addReply(c,shared.ok);
4840 rdbSave(server.dbfilename);
4841 server.dirty++;
4842 }
4843
4844 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4845 redisSortOperation *so = zmalloc(sizeof(*so));
4846 so->type = type;
4847 so->pattern = pattern;
4848 return so;
4849 }
4850
4851 /* Return the value associated to the key with a name obtained
4852 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4853 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4854 char *p;
4855 sds spat, ssub;
4856 robj keyobj;
4857 int prefixlen, sublen, postfixlen;
4858 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4859 struct {
4860 long len;
4861 long free;
4862 char buf[REDIS_SORTKEY_MAX+1];
4863 } keyname;
4864
4865 /* If the pattern is "#" return the substitution object itself in order
4866 * to implement the "SORT ... GET #" feature. */
4867 spat = pattern->ptr;
4868 if (spat[0] == '#' && spat[1] == '\0') {
4869 return subst;
4870 }
4871
4872 /* The substitution object may be specially encoded. If so we create
4873 * a decoded object on the fly. Otherwise getDecodedObject will just
4874 * increment the ref count, that we'll decrement later. */
4875 subst = getDecodedObject(subst);
4876
4877 ssub = subst->ptr;
4878 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4879 p = strchr(spat,'*');
4880 if (!p) {
4881 decrRefCount(subst);
4882 return NULL;
4883 }
4884
4885 prefixlen = p-spat;
4886 sublen = sdslen(ssub);
4887 postfixlen = sdslen(spat)-(prefixlen+1);
4888 memcpy(keyname.buf,spat,prefixlen);
4889 memcpy(keyname.buf+prefixlen,ssub,sublen);
4890 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4891 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4892 keyname.len = prefixlen+sublen+postfixlen;
4893
4894 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2))
4895 decrRefCount(subst);
4896
4897 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4898 return lookupKeyRead(db,&keyobj);
4899 }
4900
4901 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4902 * the additional parameter is not standard but a BSD-specific we have to
4903 * pass sorting parameters via the global 'server' structure */
4904 static int sortCompare(const void *s1, const void *s2) {
4905 const redisSortObject *so1 = s1, *so2 = s2;
4906 int cmp;
4907
4908 if (!server.sort_alpha) {
4909 /* Numeric sorting. Here it's trivial as we precomputed scores */
4910 if (so1->u.score > so2->u.score) {
4911 cmp = 1;
4912 } else if (so1->u.score < so2->u.score) {
4913 cmp = -1;
4914 } else {
4915 cmp = 0;
4916 }
4917 } else {
4918 /* Alphanumeric sorting */
4919 if (server.sort_bypattern) {
4920 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4921 /* At least one compare object is NULL */
4922 if (so1->u.cmpobj == so2->u.cmpobj)
4923 cmp = 0;
4924 else if (so1->u.cmpobj == NULL)
4925 cmp = -1;
4926 else
4927 cmp = 1;
4928 } else {
4929 /* We have both the objects, use strcoll */
4930 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4931 }
4932 } else {
4933 /* Compare elements directly */
4934 robj *dec1, *dec2;
4935
4936 dec1 = getDecodedObject(so1->obj);
4937 dec2 = getDecodedObject(so2->obj);
4938 cmp = strcoll(dec1->ptr,dec2->ptr);
4939 decrRefCount(dec1);
4940 decrRefCount(dec2);
4941 }
4942 }
4943 return server.sort_desc ? -cmp : cmp;
4944 }
4945
4946 /* The SORT command is the most complex command in Redis. Warning: this code
4947 * is optimized for speed and a bit less for readability */
4948 static void sortCommand(redisClient *c) {
4949 list *operations;
4950 int outputlen = 0;
4951 int desc = 0, alpha = 0;
4952 int limit_start = 0, limit_count = -1, start, end;
4953 int j, dontsort = 0, vectorlen;
4954 int getop = 0; /* GET operation counter */
4955 robj *sortval, *sortby = NULL, *storekey = NULL;
4956 redisSortObject *vector; /* Resulting vector to sort */
4957
4958 /* Lookup the key to sort. It must be of the right types */
4959 sortval = lookupKeyRead(c->db,c->argv[1]);
4960 if (sortval == NULL) {
4961 addReply(c,shared.nullmultibulk);
4962 return;
4963 }
4964 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
4965 sortval->type != REDIS_ZSET)
4966 {
4967 addReply(c,shared.wrongtypeerr);
4968 return;
4969 }
4970
4971 /* Create a list of operations to perform for every sorted element.
4972 * Operations can be GET/DEL/INCR/DECR */
4973 operations = listCreate();
4974 listSetFreeMethod(operations,zfree);
4975 j = 2;
4976
4977 /* Now we need to protect sortval incrementing its count, in the future
4978 * SORT may have options able to overwrite/delete keys during the sorting
4979 * and the sorted key itself may get destroied */
4980 incrRefCount(sortval);
4981
4982 /* The SORT command has an SQL-alike syntax, parse it */
4983 while(j < c->argc) {
4984 int leftargs = c->argc-j-1;
4985 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4986 desc = 0;
4987 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4988 desc = 1;
4989 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4990 alpha = 1;
4991 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4992 limit_start = atoi(c->argv[j+1]->ptr);
4993 limit_count = atoi(c->argv[j+2]->ptr);
4994 j+=2;
4995 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
4996 storekey = c->argv[j+1];
4997 j++;
4998 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4999 sortby = c->argv[j+1];
5000 /* If the BY pattern does not contain '*', i.e. it is constant,
5001 * we don't need to sort nor to lookup the weight keys. */
5002 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
5003 j++;
5004 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
5005 listAddNodeTail(operations,createSortOperation(
5006 REDIS_SORT_GET,c->argv[j+1]));
5007 getop++;
5008 j++;
5009 } else {
5010 decrRefCount(sortval);
5011 listRelease(operations);
5012 addReply(c,shared.syntaxerr);
5013 return;
5014 }
5015 j++;
5016 }
5017
5018 /* Load the sorting vector with all the objects to sort */
5019 switch(sortval->type) {
5020 case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
5021 case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break;
5022 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
5023 default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */
5024 }
5025 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
5026 j = 0;
5027
5028 if (sortval->type == REDIS_LIST) {
5029 list *list = sortval->ptr;
5030 listNode *ln;
5031
5032 listRewind(list);
5033 while((ln = listYield(list))) {
5034 robj *ele = ln->value;
5035 vector[j].obj = ele;
5036 vector[j].u.score = 0;
5037 vector[j].u.cmpobj = NULL;
5038 j++;
5039 }
5040 } else {
5041 dict *set;
5042 dictIterator *di;
5043 dictEntry *setele;
5044
5045 if (sortval->type == REDIS_SET) {
5046 set = sortval->ptr;
5047 } else {
5048 zset *zs = sortval->ptr;
5049 set = zs->dict;
5050 }
5051
5052 di = dictGetIterator(set);
5053 while((setele = dictNext(di)) != NULL) {
5054 vector[j].obj = dictGetEntryKey(setele);
5055 vector[j].u.score = 0;
5056 vector[j].u.cmpobj = NULL;
5057 j++;
5058 }
5059 dictReleaseIterator(di);
5060 }
5061 redisAssert(j == vectorlen);
5062
5063 /* Now it's time to load the right scores in the sorting vector */
5064 if (dontsort == 0) {
5065 for (j = 0; j < vectorlen; j++) {
5066 if (sortby) {
5067 robj *byval;
5068
5069 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
5070 if (!byval || byval->type != REDIS_STRING) continue;
5071 if (alpha) {
5072 vector[j].u.cmpobj = getDecodedObject(byval);
5073 } else {
5074 if (byval->encoding == REDIS_ENCODING_RAW) {
5075 vector[j].u.score = strtod(byval->ptr,NULL);
5076 } else {
5077 /* Don't need to decode the object if it's
5078 * integer-encoded (the only encoding supported) so
5079 * far. We can just cast it */
5080 if (byval->encoding == REDIS_ENCODING_INT) {
5081 vector[j].u.score = (long)byval->ptr;
5082 } else
5083 redisAssert(1 != 1);
5084 }
5085 }
5086 } else {
5087 if (!alpha) {
5088 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
5089 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
5090 else {
5091 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
5092 vector[j].u.score = (long) vector[j].obj->ptr;
5093 else
5094 redisAssert(1 != 1);
5095 }
5096 }
5097 }
5098 }
5099 }
5100
5101 /* We are ready to sort the vector... perform a bit of sanity check
5102 * on the LIMIT option too. We'll use a partial version of quicksort. */
5103 start = (limit_start < 0) ? 0 : limit_start;
5104 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
5105 if (start >= vectorlen) {
5106 start = vectorlen-1;
5107 end = vectorlen-2;
5108 }
5109 if (end >= vectorlen) end = vectorlen-1;
5110
5111 if (dontsort == 0) {
5112 server.sort_desc = desc;
5113 server.sort_alpha = alpha;
5114 server.sort_bypattern = sortby ? 1 : 0;
5115 if (sortby && (start != 0 || end != vectorlen-1))
5116 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
5117 else
5118 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
5119 }
5120
5121 /* Send command output to the output buffer, performing the specified
5122 * GET/DEL/INCR/DECR operations if any. */
5123 outputlen = getop ? getop*(end-start+1) : end-start+1;
5124 if (storekey == NULL) {
5125 /* STORE option not specified, sent the sorting result to client */
5126 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
5127 for (j = start; j <= end; j++) {
5128 listNode *ln;
5129 if (!getop) {
5130 addReplyBulkLen(c,vector[j].obj);
5131 addReply(c,vector[j].obj);
5132 addReply(c,shared.crlf);
5133 }
5134 listRewind(operations);
5135 while((ln = listYield(operations))) {
5136 redisSortOperation *sop = ln->value;
5137 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5138 vector[j].obj);
5139
5140 if (sop->type == REDIS_SORT_GET) {
5141 if (!val || val->type != REDIS_STRING) {
5142 addReply(c,shared.nullbulk);
5143 } else {
5144 addReplyBulkLen(c,val);
5145 addReply(c,val);
5146 addReply(c,shared.crlf);
5147 }
5148 } else {
5149 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5150 }
5151 }
5152 }
5153 } else {
5154 robj *listObject = createListObject();
5155 list *listPtr = (list*) listObject->ptr;
5156
5157 /* STORE option specified, set the sorting result as a List object */
5158 for (j = start; j <= end; j++) {
5159 listNode *ln;
5160 if (!getop) {
5161 listAddNodeTail(listPtr,vector[j].obj);
5162 incrRefCount(vector[j].obj);
5163 }
5164 listRewind(operations);
5165 while((ln = listYield(operations))) {
5166 redisSortOperation *sop = ln->value;
5167 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5168 vector[j].obj);
5169
5170 if (sop->type == REDIS_SORT_GET) {
5171 if (!val || val->type != REDIS_STRING) {
5172 listAddNodeTail(listPtr,createStringObject("",0));
5173 } else {
5174 listAddNodeTail(listPtr,val);
5175 incrRefCount(val);
5176 }
5177 } else {
5178 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5179 }
5180 }
5181 }
5182 if (dictReplace(c->db->dict,storekey,listObject)) {
5183 incrRefCount(storekey);
5184 }
5185 /* Note: we add 1 because the DB is dirty anyway since even if the
5186 * SORT result is empty a new key is set and maybe the old content
5187 * replaced. */
5188 server.dirty += 1+outputlen;
5189 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
5190 }
5191
5192 /* Cleanup */
5193 decrRefCount(sortval);
5194 listRelease(operations);
5195 for (j = 0; j < vectorlen; j++) {
5196 if (sortby && alpha && vector[j].u.cmpobj)
5197 decrRefCount(vector[j].u.cmpobj);
5198 }
5199 zfree(vector);
5200 }
5201
5202 /* Create the string returned by the INFO command. This is decoupled
5203 * by the INFO command itself as we need to report the same information
5204 * on memory corruption problems. */
5205 static sds genRedisInfoString(void) {
5206 sds info;
5207 time_t uptime = time(NULL)-server.stat_starttime;
5208 int j;
5209
5210 info = sdscatprintf(sdsempty(),
5211 "redis_version:%s\r\n"
5212 "arch_bits:%s\r\n"
5213 "multiplexing_api:%s\r\n"
5214 "uptime_in_seconds:%ld\r\n"
5215 "uptime_in_days:%ld\r\n"
5216 "connected_clients:%d\r\n"
5217 "connected_slaves:%d\r\n"
5218 "blocked_clients:%d\r\n"
5219 "used_memory:%zu\r\n"
5220 "changes_since_last_save:%lld\r\n"
5221 "bgsave_in_progress:%d\r\n"
5222 "last_save_time:%ld\r\n"
5223 "bgrewriteaof_in_progress:%d\r\n"
5224 "total_connections_received:%lld\r\n"
5225 "total_commands_processed:%lld\r\n"
5226 "role:%s\r\n"
5227 ,REDIS_VERSION,
5228 (sizeof(long) == 8) ? "64" : "32",
5229 aeGetApiName(),
5230 uptime,
5231 uptime/(3600*24),
5232 listLength(server.clients)-listLength(server.slaves),
5233 listLength(server.slaves),
5234 server.blockedclients,
5235 server.usedmemory,
5236 server.dirty,
5237 server.bgsavechildpid != -1,
5238 server.lastsave,
5239 server.bgrewritechildpid != -1,
5240 server.stat_numconnections,
5241 server.stat_numcommands,
5242 server.masterhost == NULL ? "master" : "slave"
5243 );
5244 if (server.masterhost) {
5245 info = sdscatprintf(info,
5246 "master_host:%s\r\n"
5247 "master_port:%d\r\n"
5248 "master_link_status:%s\r\n"
5249 "master_last_io_seconds_ago:%d\r\n"
5250 ,server.masterhost,
5251 server.masterport,
5252 (server.replstate == REDIS_REPL_CONNECTED) ?
5253 "up" : "down",
5254 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
5255 );
5256 }
5257 for (j = 0; j < server.dbnum; j++) {
5258 long long keys, vkeys;
5259
5260 keys = dictSize(server.db[j].dict);
5261 vkeys = dictSize(server.db[j].expires);
5262 if (keys || vkeys) {
5263 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
5264 j, keys, vkeys);
5265 }
5266 }
5267 return info;
5268 }
5269
5270 static void infoCommand(redisClient *c) {
5271 sds info = genRedisInfoString();
5272 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
5273 (unsigned long)sdslen(info)));
5274 addReplySds(c,info);
5275 addReply(c,shared.crlf);
5276 }
5277
5278 static void monitorCommand(redisClient *c) {
5279 /* ignore MONITOR if aleady slave or in monitor mode */
5280 if (c->flags & REDIS_SLAVE) return;
5281
5282 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5283 c->slaveseldb = 0;
5284 listAddNodeTail(server.monitors,c);
5285 addReply(c,shared.ok);
5286 }
5287
5288 /* ================================= Expire ================================= */
5289 static int removeExpire(redisDb *db, robj *key) {
5290 if (dictDelete(db->expires,key) == DICT_OK) {
5291 return 1;
5292 } else {
5293 return 0;
5294 }
5295 }
5296
5297 static int setExpire(redisDb *db, robj *key, time_t when) {
5298 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5299 return 0;
5300 } else {
5301 incrRefCount(key);
5302 return 1;
5303 }
5304 }
5305
5306 /* Return the expire time of the specified key, or -1 if no expire
5307 * is associated with this key (i.e. the key is non volatile) */
5308 static time_t getExpire(redisDb *db, robj *key) {
5309 dictEntry *de;
5310
5311 /* No expire? return ASAP */
5312 if (dictSize(db->expires) == 0 ||
5313 (de = dictFind(db->expires,key)) == NULL) return -1;
5314
5315 return (time_t) dictGetEntryVal(de);
5316 }
5317
5318 static int expireIfNeeded(redisDb *db, robj *key) {
5319 time_t when;
5320 dictEntry *de;
5321
5322 /* No expire? return ASAP */
5323 if (dictSize(db->expires) == 0 ||
5324 (de = dictFind(db->expires,key)) == NULL) return 0;
5325
5326 /* Lookup the expire */
5327 when = (time_t) dictGetEntryVal(de);
5328 if (time(NULL) <= when) return 0;
5329
5330 /* Delete the key */
5331 dictDelete(db->expires,key);
5332 return dictDelete(db->dict,key) == DICT_OK;
5333 }
5334
5335 static int deleteIfVolatile(redisDb *db, robj *key) {
5336 dictEntry *de;
5337
5338 /* No expire? return ASAP */
5339 if (dictSize(db->expires) == 0 ||
5340 (de = dictFind(db->expires,key)) == NULL) return 0;
5341
5342 /* Delete the key */
5343 server.dirty++;
5344 dictDelete(db->expires,key);
5345 return dictDelete(db->dict,key) == DICT_OK;
5346 }
5347
5348 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
5349 dictEntry *de;
5350
5351 de = dictFind(c->db->dict,key);
5352 if (de == NULL) {
5353 addReply(c,shared.czero);
5354 return;
5355 }
5356 if (seconds < 0) {
5357 if (deleteKey(c->db,key)) server.dirty++;
5358 addReply(c, shared.cone);
5359 return;
5360 } else {
5361 time_t when = time(NULL)+seconds;
5362 if (setExpire(c->db,key,when)) {
5363 addReply(c,shared.cone);
5364 server.dirty++;
5365 } else {
5366 addReply(c,shared.czero);
5367 }
5368 return;
5369 }
5370 }
5371
5372 static void expireCommand(redisClient *c) {
5373 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5374 }
5375
5376 static void expireatCommand(redisClient *c) {
5377 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5378 }
5379
5380 static void ttlCommand(redisClient *c) {
5381 time_t expire;
5382 int ttl = -1;
5383
5384 expire = getExpire(c->db,c->argv[1]);
5385 if (expire != -1) {
5386 ttl = (int) (expire-time(NULL));
5387 if (ttl < 0) ttl = -1;
5388 }
5389 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5390 }
5391
5392 /* ================================ MULTI/EXEC ============================== */
5393
5394 /* Client state initialization for MULTI/EXEC */
5395 static void initClientMultiState(redisClient *c) {
5396 c->mstate.commands = NULL;
5397 c->mstate.count = 0;
5398 }
5399
5400 /* Release all the resources associated with MULTI/EXEC state */
5401 static void freeClientMultiState(redisClient *c) {
5402 int j;
5403
5404 for (j = 0; j < c->mstate.count; j++) {
5405 int i;
5406 multiCmd *mc = c->mstate.commands+j;
5407
5408 for (i = 0; i < mc->argc; i++)
5409 decrRefCount(mc->argv[i]);
5410 zfree(mc->argv);
5411 }
5412 zfree(c->mstate.commands);
5413 }
5414
5415 /* Add a new command into the MULTI commands queue */
5416 static void queueMultiCommand(redisClient *c, struct redisCommand *cmd) {
5417 multiCmd *mc;
5418 int j;
5419
5420 c->mstate.commands = zrealloc(c->mstate.commands,
5421 sizeof(multiCmd)*(c->mstate.count+1));
5422 mc = c->mstate.commands+c->mstate.count;
5423 mc->cmd = cmd;
5424 mc->argc = c->argc;
5425 mc->argv = zmalloc(sizeof(robj*)*c->argc);
5426 memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
5427 for (j = 0; j < c->argc; j++)
5428 incrRefCount(mc->argv[j]);
5429 c->mstate.count++;
5430 }
5431
5432 static void multiCommand(redisClient *c) {
5433 c->flags |= REDIS_MULTI;
5434 addReply(c,shared.ok);
5435 }
5436
5437 static void execCommand(redisClient *c) {
5438 int j;
5439 robj **orig_argv;
5440 int orig_argc;
5441
5442 if (!(c->flags & REDIS_MULTI)) {
5443 addReplySds(c,sdsnew("-ERR EXEC without MULTI\r\n"));
5444 return;
5445 }
5446
5447 orig_argv = c->argv;
5448 orig_argc = c->argc;
5449 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count));
5450 for (j = 0; j < c->mstate.count; j++) {
5451 c->argc = c->mstate.commands[j].argc;
5452 c->argv = c->mstate.commands[j].argv;
5453 call(c,c->mstate.commands[j].cmd);
5454 }
5455 c->argv = orig_argv;
5456 c->argc = orig_argc;
5457 freeClientMultiState(c);
5458 initClientMultiState(c);
5459 c->flags &= (~REDIS_MULTI);
5460 }
5461
5462 /* =========================== Blocking Operations ========================= */
5463
5464 /* Currently Redis blocking operations support is limited to list POP ops,
5465 * so the current implementation is not fully generic, but it is also not
5466 * completely specific so it will not require a rewrite to support new
5467 * kind of blocking operations in the future.
5468 *
5469 * Still it's important to note that list blocking operations can be already
5470 * used as a notification mechanism in order to implement other blocking
5471 * operations at application level, so there must be a very strong evidence
5472 * of usefulness and generality before new blocking operations are implemented.
5473 *
5474 * This is how the current blocking POP works, we use BLPOP as example:
 * - If the user calls BLPOP and the key exists and contains a non-empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   when there is no need to block.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests while the blocking request is not served). We also put the
 *   client in a dictionary (db->blockingkeys) mapping each key to the list of
 *   clients blocked on that key.
 * - If a PUSH operation is performed against a key with blocked clients
 *   waiting, we serve the first client in the list: instead of pushing the
 *   new element into the list we return it to the first (oldest) blocked
 *   client, unblock that client, and remove it from the list.
5488 *
5489 * The above comment and the source code should be enough in order to understand
5490 * the implementation and modify / fix it later.
5491 */
5492
5493 /* Set a client in blocking mode for the specified key, with the specified
5494 * timeout */
5495 static void blockForKey(redisClient *c, robj *key, time_t timeout) {
5496 dictEntry *de;
5497 list *l;
5498
5499 c->blockingkey = key;
5500 incrRefCount(key);
5501 c->blockingto = timeout;
5502 de = dictFind(c->db->blockingkeys,key);
5503 if (de == NULL) {
5504 int retval;
5505
5506 /* We take a list of clients blocked for a given key */
5507 l = listCreate();
5508 retval = dictAdd(c->db->blockingkeys,key,l);
5509 incrRefCount(key);
5510 assert(retval == DICT_OK);
5511 } else {
5512 l = dictGetEntryVal(de);
5513 }
5514 /* Add this client to the list, and mark it as blocked */
5515 listAddNodeTail(l,c);
5516 c->flags |= REDIS_BLOCKED;
5517 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
5518 server.blockedclients++;
5519 }
5520
5521 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
5522 static void unblockClient(redisClient *c) {
5523 dictEntry *de;
5524 list *l;
5525
5526 /* Remove this client from the list of clients waiting for this key. */
5527 assert(c->blockingkey != NULL);
5528 de = dictFind(c->db->blockingkeys,c->blockingkey);
5529 assert(de != NULL);
5530 l = dictGetEntryVal(de);
5531 listDelNode(l,listSearchKey(l,c));
5532 /* If the list is empty we need to remove it to avoid wasting memory */
5533 if (listLength(l) == 0)
5534 dictDelete(c->db->blockingkeys,c->blockingkey);
5535 /* Finally set the right flags in the client structure */
5536 decrRefCount(c->blockingkey);
5537 c->blockingkey = NULL;
5538 c->flags &= (~REDIS_BLOCKED);
5539 server.blockedclients--;
5540 /* Ok now we are ready to get read events from socket, note that we
5541 * can't trap errors here as it's possible that unblockClients() is
5542 * called from freeClient() itself, and the only thing we can do
5543 * if we failed to register the READABLE event is to kill the client.
5544 * Still the following function should never fail in the real world as
5545 * we are sure the file descriptor is sane, and we exit on out of mem. */
5546 aeCreateFileEvent(server.el, c->fd, AE_READABLE, readQueryFromClient, c);
5547 /* As a final step we want to process data if there is some command waiting
5548 * in the input buffer. Note that this is safe even if unblockClient()
5549 * gets called from freeClient() because freeClient() will be smart
5550 * enough to call this function *after* c->querybuf was set to NULL. */
5551 if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c);
5552 }
5553
5554 /* This should be called from any function PUSHing into lists.
5555 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5556 * 'ele' is the element pushed.
5557 *
5558 * If the function returns 0 there was no client waiting for a list push
5559 * against this key.
5560 *
5561 * If the function returns 1 there was a client waiting for a list push
5562 * against this key, the element was passed to this client thus it's not
5563 * needed to actually add it to the list and the caller should return asap. */
5564 static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
5565 struct dictEntry *de;
5566 redisClient *receiver;
5567 list *l;
5568 listNode *ln;
5569
5570 de = dictFind(c->db->blockingkeys,key);
5571 if (de == NULL) return 0;
5572 l = dictGetEntryVal(de);
5573 ln = listFirst(l);
5574 assert(ln != NULL);
5575 receiver = ln->value;
5576
5577 addReplyBulkLen(receiver,ele);
5578 addReply(receiver,ele);
5579 addReply(receiver,shared.crlf);
5580 unblockClient(receiver);
5581 return 1;
5582 }
5583
5584 /* Blocking RPOP/LPOP */
5585 static void blockingPopGenericCommand(redisClient *c, int where) {
5586 robj *o;
5587 time_t timeout;
5588
5589 o = lookupKeyWrite(c->db,c->argv[1]);
5590 if (o != NULL) {
5591 if (o->type != REDIS_LIST) {
5592 popGenericCommand(c,where);
5593 return;
5594 } else {
5595 list *list = o->ptr;
5596 if (listLength(list) != 0) {
5597 /* If the list contains elements fall back to the usual
5598 * non-blocking POP operation */
5599 popGenericCommand(c,where);
5600 return;
5601 }
5602 }
5603 }
5604 /* If the list is empty or the key does not exists we must block */
5605 timeout = strtol(c->argv[2]->ptr,NULL,10);
5606 if (timeout > 0) timeout += time(NULL);
5607 blockForKey(c,c->argv[1],timeout);
5608 }
5609
5610 static void blpopCommand(redisClient *c) {
5611 blockingPopGenericCommand(c,REDIS_HEAD);
5612 }
5613
5614 static void brpopCommand(redisClient *c) {
5615 blockingPopGenericCommand(c,REDIS_TAIL);
5616 }
5617
5618 /* =============================== Replication ============================= */
5619
5620 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5621 ssize_t nwritten, ret = size;
5622 time_t start = time(NULL);
5623
5624 timeout++;
5625 while(size) {
5626 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5627 nwritten = write(fd,ptr,size);
5628 if (nwritten == -1) return -1;
5629 ptr += nwritten;
5630 size -= nwritten;
5631 }
5632 if ((time(NULL)-start) > timeout) {
5633 errno = ETIMEDOUT;
5634 return -1;
5635 }
5636 }
5637 return ret;
5638 }
5639
5640 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5641 ssize_t nread, totread = 0;
5642 time_t start = time(NULL);
5643
5644 timeout++;
5645 while(size) {
5646 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5647 nread = read(fd,ptr,size);
5648 if (nread == -1) return -1;
5649 ptr += nread;
5650 size -= nread;
5651 totread += nread;
5652 }
5653 if ((time(NULL)-start) > timeout) {
5654 errno = ETIMEDOUT;
5655 return -1;
5656 }
5657 }
5658 return totread;
5659 }
5660
/* Read one line from 'fd' into the 'size'-byte buffer 'ptr', one byte at a
 * time through syncRead().
 *
 * The terminating '\n', and a '\r' right before it, are removed, and the
 * buffer is always NUL-terminated. If the buffer fills up (size-1
 * characters) before a newline arrives, the truncated line is returned and
 * the rest of the line is left unread on the socket.
 *
 * Returns the number of characters stored, which still counts a stripped
 * '\r'. Returns -1 on I/O error or timeout (see syncRead()), or with
 * errno = EINVAL when size < 1. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size < 1) {
        /* No room even for the terminator. */
        errno = EINVAL;
        return -1;
    }
    *ptr = '\0'; /* valid (empty) string even if no character fits */
    size--;      /* reserve room for the terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* Count the consumed slot. Without this the loop never ended on a
         * full buffer, and a long line overflowed 'ptr'. */
        size--;
    }
    return nread;
}
5681
5682 static void syncCommand(redisClient *c) {
5683 /* ignore SYNC if aleady slave or in monitor mode */
5684 if (c->flags & REDIS_SLAVE) return;
5685
5686 /* SYNC can't be issued when the server has pending data to send to
5687 * the client about already issued commands. We need a fresh reply
5688 * buffer registering the differences between the BGSAVE and the current
5689 * dataset, so that we can copy to other slaves if needed. */
5690 if (listLength(c->reply) != 0) {
5691 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5692 return;
5693 }
5694
5695 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5696 /* Here we need to check if there is a background saving operation
5697 * in progress, or if it is required to start one */
5698 if (server.bgsavechildpid != -1) {
5699 /* Ok a background save is in progress. Let's check if it is a good
5700 * one for replication, i.e. if there is another slave that is
5701 * registering differences since the server forked to save */
5702 redisClient *slave;
5703 listNode *ln;
5704
5705 listRewind(server.slaves);
5706 while((ln = listYield(server.slaves))) {
5707 slave = ln->value;
5708 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5709 }
5710 if (ln) {
5711 /* Perfect, the server is already registering differences for
5712 * another slave. Set the right state, and copy the buffer. */
5713 listRelease(c->reply);
5714 c->reply = listDup(slave->reply);
5715 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5716 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5717 } else {
5718 /* No way, we need to wait for the next BGSAVE in order to
5719 * register differences */
5720 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5721 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5722 }
5723 } else {
5724 /* Ok we don't have a BGSAVE in progress, let's start one */
5725 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5726 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5727 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5728 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5729 return;
5730 }
5731 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5732 }
5733 c->repldbfd = -1;
5734 c->flags |= REDIS_SLAVE;
5735 c->slaveseldb = 0;
5736 listAddNodeTail(server.slaves,c);
5737 return;
5738 }
5739
5740 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5741 redisClient *slave = privdata;
5742 REDIS_NOTUSED(el);
5743 REDIS_NOTUSED(mask);
5744 char buf[REDIS_IOBUF_LEN];
5745 ssize_t nwritten, buflen;
5746
5747 if (slave->repldboff == 0) {
5748 /* Write the bulk write count before to transfer the DB. In theory here
5749 * we don't know how much room there is in the output buffer of the
5750 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5751 * operations) will never be smaller than the few bytes we need. */
5752 sds bulkcount;
5753
5754 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5755 slave->repldbsize);
5756 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5757 {
5758 sdsfree(bulkcount);
5759 freeClient(slave);
5760 return;
5761 }
5762 sdsfree(bulkcount);
5763 }
5764 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5765 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5766 if (buflen <= 0) {
5767 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5768 (buflen == 0) ? "premature EOF" : strerror(errno));
5769 freeClient(slave);
5770 return;
5771 }
5772 if ((nwritten = write(fd,buf,buflen)) == -1) {
5773 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5774 strerror(errno));
5775 freeClient(slave);
5776 return;
5777 }
5778 slave->repldboff += nwritten;
5779 if (slave->repldboff == slave->repldbsize) {
5780 close(slave->repldbfd);
5781 slave->repldbfd = -1;
5782 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5783 slave->replstate = REDIS_REPL_ONLINE;
5784 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5785 sendReplyToClient, slave) == AE_ERR) {
5786 freeClient(slave);
5787 return;
5788 }
5789 addReplySds(slave,sdsempty());
5790 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5791 }
5792 }
5793
5794 /* This function is called at the end of every backgrond saving.
5795 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5796 * otherwise REDIS_ERR is passed to the function.
5797 *
5798 * The goal of this function is to handle slaves waiting for a successful
5799 * background saving in order to perform non-blocking synchronization. */
5800 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5801 listNode *ln;
5802 int startbgsave = 0;
5803
5804 listRewind(server.slaves);
5805 while((ln = listYield(server.slaves))) {
5806 redisClient *slave = ln->value;
5807
5808 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5809 startbgsave = 1;
5810 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5811 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5812 struct redis_stat buf;
5813
5814 if (bgsaveerr != REDIS_OK) {
5815 freeClient(slave);
5816 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5817 continue;
5818 }
5819 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5820 redis_fstat(slave->repldbfd,&buf) == -1) {
5821 freeClient(slave);
5822 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5823 continue;
5824 }
5825 slave->repldboff = 0;
5826 slave->repldbsize = buf.st_size;
5827 slave->replstate = REDIS_REPL_SEND_BULK;
5828 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5829 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
5830 freeClient(slave);
5831 continue;
5832 }
5833 }
5834 }
5835 if (startbgsave) {
5836 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5837 listRewind(server.slaves);
5838 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5839 while((ln = listYield(server.slaves))) {
5840 redisClient *slave = ln->value;
5841
5842 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5843 freeClient(slave);
5844 }
5845 }
5846 }
5847 }
5848
5849 static int syncWithMaster(void) {
5850 char buf[1024], tmpfile[256], authcmd[1024];
5851 int dumpsize;
5852 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5853 int dfd;
5854
5855 if (fd == -1) {
5856 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5857 strerror(errno));
5858 return REDIS_ERR;
5859 }
5860
5861 /* AUTH with the master if required. */
5862 if(server.masterauth) {
5863 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5864 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5865 close(fd);
5866 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5867 strerror(errno));
5868 return REDIS_ERR;
5869 }
5870 /* Read the AUTH result. */
5871 if (syncReadLine(fd,buf,1024,3600) == -1) {
5872 close(fd);
5873 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5874 strerror(errno));
5875 return REDIS_ERR;
5876 }
5877 if (buf[0] != '+') {
5878 close(fd);
5879 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5880 return REDIS_ERR;
5881 }
5882 }
5883
5884 /* Issue the SYNC command */
5885 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5886 close(fd);
5887 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5888 strerror(errno));
5889 return REDIS_ERR;
5890 }
5891 /* Read the bulk write count */
5892 if (syncReadLine(fd,buf,1024,3600) == -1) {
5893 close(fd);
5894 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5895 strerror(errno));
5896 return REDIS_ERR;
5897 }
5898 if (buf[0] != '$') {
5899 close(fd);
5900 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5901 return REDIS_ERR;
5902 }
5903 dumpsize = atoi(buf+1);
5904 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5905 /* Read the bulk write data on a temp file */
5906 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5907 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5908 if (dfd == -1) {
5909 close(fd);
5910 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5911 return REDIS_ERR;
5912 }
5913 while(dumpsize) {
5914 int nread, nwritten;
5915
5916 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5917 if (nread == -1) {
5918 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5919 strerror(errno));
5920 close(fd);
5921 close(dfd);
5922 return REDIS_ERR;
5923 }
5924 nwritten = write(dfd,buf,nread);
5925 if (nwritten == -1) {
5926 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5927 close(fd);
5928 close(dfd);
5929 return REDIS_ERR;
5930 }
5931 dumpsize -= nread;
5932 }
5933 close(dfd);
5934 if (rename(tmpfile,server.dbfilename) == -1) {
5935 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5936 unlink(tmpfile);
5937 close(fd);
5938 return REDIS_ERR;
5939 }
5940 emptyDb();
5941 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5942 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5943 close(fd);
5944 return REDIS_ERR;
5945 }
5946 server.master = createClient(fd);
5947 server.master->flags |= REDIS_MASTER;
5948 server.master->authenticated = 1;
5949 server.replstate = REDIS_REPL_CONNECTED;
5950 return REDIS_OK;
5951 }
5952
5953 static void slaveofCommand(redisClient *c) {
5954 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5955 !strcasecmp(c->argv[2]->ptr,"one")) {
5956 if (server.masterhost) {
5957 sdsfree(server.masterhost);
5958 server.masterhost = NULL;
5959 if (server.master) freeClient(server.master);
5960 server.replstate = REDIS_REPL_NONE;
5961 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5962 }
5963 } else {
5964 sdsfree(server.masterhost);
5965 server.masterhost = sdsdup(c->argv[1]->ptr);
5966 server.masterport = atoi(c->argv[2]->ptr);
5967 if (server.master) freeClient(server.master);
5968 server.replstate = REDIS_REPL_CONNECT;
5969 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5970 server.masterhost, server.masterport);
5971 }
5972 addReply(c,shared.ok);
5973 }
5974
5975 /* ============================ Maxmemory directive ======================== */
5976
5977 /* This function gets called when 'maxmemory' is set on the config file to limit
5978 * the max memory used by the server, and we are out of memory.
5979 * This function will try to, in order:
5980 *
5981 * - Free objects from the free list
5982 * - Try to remove keys with an EXPIRE set
5983 *
5984 * It is not possible to free enough memory to reach used-memory < maxmemory
5985 * the server will start refusing commands that will enlarge even more the
5986 * memory usage.
5987 */
5988 static void freeMemoryIfNeeded(void) {
5989 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5990 if (listLength(server.objfreelist)) {
5991 robj *o;
5992
5993 listNode *head = listFirst(server.objfreelist);
5994 o = listNodeValue(head);
5995 listDelNode(server.objfreelist,head);
5996 zfree(o);
5997 } else {
5998 int j, k, freed = 0;
5999
6000 for (j = 0; j < server.dbnum; j++) {
6001 int minttl = -1;
6002 robj *minkey = NULL;
6003 struct dictEntry *de;
6004
6005 if (dictSize(server.db[j].expires)) {
6006 freed = 1;
6007 /* From a sample of three keys drop the one nearest to
6008 * the natural expire */
6009 for (k = 0; k < 3; k++) {
6010 time_t t;
6011
6012 de = dictGetRandomKey(server.db[j].expires);
6013 t = (time_t) dictGetEntryVal(de);
6014 if (minttl == -1 || t < minttl) {
6015 minkey = dictGetEntryKey(de);
6016 minttl = t;
6017 }
6018 }
6019 deleteKey(server.db+j,minkey);
6020 }
6021 }
6022 if (!freed) return; /* nothing to free... */
6023 }
6024 }
6025 }
6026
6027 /* ============================== Append Only file ========================== */
6028
6029 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
6030 sds buf = sdsempty();
6031 int j;
6032 ssize_t nwritten;
6033 time_t now;
6034 robj *tmpargv[3];
6035
6036 /* The DB this command was targetting is not the same as the last command
6037 * we appendend. To issue a SELECT command is needed. */
6038 if (dictid != server.appendseldb) {
6039 char seldb[64];
6040
6041 snprintf(seldb,sizeof(seldb),"%d",dictid);
6042 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
6043 (unsigned long)strlen(seldb),seldb);
6044 server.appendseldb = dictid;
6045 }
6046
6047 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6048 * EXPIREs into EXPIREATs calls */
6049 if (cmd->proc == expireCommand) {
6050 long when;
6051
6052 tmpargv[0] = createStringObject("EXPIREAT",8);
6053 tmpargv[1] = argv[1];
6054 incrRefCount(argv[1]);
6055 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
6056 tmpargv[2] = createObject(REDIS_STRING,
6057 sdscatprintf(sdsempty(),"%ld",when));
6058 argv = tmpargv;
6059 }
6060
6061 /* Append the actual command */
6062 buf = sdscatprintf(buf,"*%d\r\n",argc);
6063 for (j = 0; j < argc; j++) {
6064 robj *o = argv[j];
6065
6066 o = getDecodedObject(o);
6067 buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
6068 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
6069 buf = sdscatlen(buf,"\r\n",2);
6070 decrRefCount(o);
6071 }
6072
6073 /* Free the objects from the modified argv for EXPIREAT */
6074 if (cmd->proc == expireCommand) {
6075 for (j = 0; j < 3; j++)
6076 decrRefCount(argv[j]);
6077 }
6078
6079 /* We want to perform a single write. This should be guaranteed atomic
6080 * at least if the filesystem we are writing is a real physical one.
6081 * While this will save us against the server being killed I don't think
6082 * there is much to do about the whole server stopping for power problems
6083 * or alike */
6084 nwritten = write(server.appendfd,buf,sdslen(buf));
6085 if (nwritten != (signed)sdslen(buf)) {
6086 /* Ooops, we are in troubles. The best thing to do for now is
6087 * to simply exit instead to give the illusion that everything is
6088 * working as expected. */
6089 if (nwritten == -1) {
6090 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
6091 } else {
6092 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
6093 }
6094 exit(1);
6095 }
6096 /* If a background append only file rewriting is in progress we want to
6097 * accumulate the differences between the child DB and the current one
6098 * in a buffer, so that when the child process will do its work we
6099 * can append the differences to the new append only file. */
6100 if (server.bgrewritechildpid != -1)
6101 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
6102
6103 sdsfree(buf);
6104 now = time(NULL);
6105 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
6106 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
6107 now-server.lastfsync > 1))
6108 {
6109 fsync(server.appendfd); /* Let's try to get this data on the disk */
6110 server.lastfsync = now;
6111 }
6112 }
6113
6114 /* In Redis commands are always executed in the context of a client, so in
6115 * order to load the append only file we need to create a fake client. */
6116 static struct redisClient *createFakeClient(void) {
6117 struct redisClient *c = zmalloc(sizeof(*c));
6118
6119 selectDb(c,0);
6120 c->fd = -1;
6121 c->querybuf = sdsempty();
6122 c->argc = 0;
6123 c->argv = NULL;
6124 c->flags = 0;
6125 /* We set the fake client as a slave waiting for the synchronization
6126 * so that Redis will not try to send replies to this client. */
6127 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
6128 c->reply = listCreate();
6129 listSetFreeMethod(c->reply,decrRefCount);
6130 listSetDupMethod(c->reply,dupClientReplyValue);
6131 return c;
6132 }
6133
6134 static void freeFakeClient(struct redisClient *c) {
6135 sdsfree(c->querybuf);
6136 listRelease(c->reply);
6137 zfree(c);
6138 }
6139
6140 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
6141 * error (the append only file is zero-length) REDIS_ERR is returned. On
6142 * fatal error an error message is logged and the program exists. */
6143 int loadAppendOnlyFile(char *filename) {
6144 struct redisClient *fakeClient;
6145 FILE *fp = fopen(filename,"r");
6146 struct redis_stat sb;
6147
6148 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
6149 return REDIS_ERR;
6150
6151 if (fp == NULL) {
6152 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
6153 exit(1);
6154 }
6155
6156 fakeClient = createFakeClient();
6157 while(1) {
6158 int argc, j;
6159 unsigned long len;
6160 robj **argv;
6161 char buf[128];
6162 sds argsds;
6163 struct redisCommand *cmd;
6164
6165 if (fgets(buf,sizeof(buf),fp) == NULL) {
6166 if (feof(fp))
6167 break;
6168 else
6169 goto readerr;
6170 }
6171 if (buf[0] != '*') goto fmterr;
6172 argc = atoi(buf+1);
6173 argv = zmalloc(sizeof(robj*)*argc);
6174 for (j = 0; j < argc; j++) {
6175 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
6176 if (buf[0] != '$') goto fmterr;
6177 len = strtol(buf+1,NULL,10);
6178 argsds = sdsnewlen(NULL,len);
6179 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
6180 argv[j] = createObject(REDIS_STRING,argsds);
6181 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
6182 }
6183
6184 /* Command lookup */
6185 cmd = lookupCommand(argv[0]->ptr);
6186 if (!cmd) {
6187 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
6188 exit(1);
6189 }
6190 /* Try object sharing and encoding */
6191 if (server.shareobjects) {
6192 int j;
6193 for(j = 1; j < argc; j++)
6194 argv[j] = tryObjectSharing(argv[j]);
6195 }
6196 if (cmd->flags & REDIS_CMD_BULK)
6197 tryObjectEncoding(argv[argc-1]);
6198 /* Run the command in the context of a fake client */
6199 fakeClient->argc = argc;
6200 fakeClient->argv = argv;
6201 cmd->proc(fakeClient);
6202 /* Discard the reply objects list from the fake client */
6203 while(listLength(fakeClient->reply))
6204 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
6205 /* Clean up, ready for the next command */
6206 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
6207 zfree(argv);
6208 }
6209 fclose(fp);
6210 freeFakeClient(fakeClient);
6211 return REDIS_OK;
6212
6213 readerr:
6214 if (feof(fp)) {
6215 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
6216 } else {
6217 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
6218 }
6219 exit(1);
6220 fmterr:
6221 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
6222 exit(1);
6223 }
6224
6225 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6226 static int fwriteBulk(FILE *fp, robj *obj) {
6227 char buf[128];
6228 obj = getDecodedObject(obj);
6229 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
6230 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
6231 if (sdslen(obj->ptr) && fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0)
6232 goto err;
6233 if (fwrite("\r\n",2,1,fp) == 0) goto err;
6234 decrRefCount(obj);
6235 return 1;
6236 err:
6237 decrRefCount(obj);
6238 return 0;
6239 }
6240
/* Serialize the double 'd' into 'fp' as a bulk: $<count>\r\n<payload>\r\n.
 * The payload uses "%.17g" (17 significant digits, enough to round-trip an
 * IEEE 754 double). Returns 1 on success, 0 if a write fails. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char payload[128], header[128];
    size_t payloadlen;

    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    payloadlen = strlen(payload);
    /* The advertised length does not count the trailing CRLF */
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)payloadlen-2);
    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,payloadlen,1,fp) != 0;
}
6251
/* Serialize the long 'l' into 'fp' as a decimal bulk:
 * $<count>\r\n<digits>\r\n. Returns 1 on success, 0 if a write fails. */
static int fwriteBulkLong(FILE *fp, long l) {
    char digits[128], header[128];
    size_t digitslen;

    snprintf(digits,sizeof(digits),"%ld\r\n",l);
    digitslen = strlen(digits);
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)digitslen-2);
    if (!fwrite(header,strlen(header),1,fp)) return 0;
    return fwrite(digits,digitslen,1,fp) ? 1 : 0;
}
6262
6263 /* Write a sequence of commands able to fully rebuild the dataset into
6264 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6265 static int rewriteAppendOnlyFile(char *filename) {
6266 dictIterator *di = NULL;
6267 dictEntry *de;
6268 FILE *fp;
6269 char tmpfile[256];
6270 int j;
6271 time_t now = time(NULL);
6272
6273 /* Note that we have to use a different temp name here compared to the
6274 * one used by rewriteAppendOnlyFileBackground() function. */
6275 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
6276 fp = fopen(tmpfile,"w");
6277 if (!fp) {
6278 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
6279 return REDIS_ERR;
6280 }
6281 for (j = 0; j < server.dbnum; j++) {
6282 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
6283 redisDb *db = server.db+j;
6284 dict *d = db->dict;
6285 if (dictSize(d) == 0) continue;
6286 di = dictGetIterator(d);
6287 if (!di) {
6288 fclose(fp);
6289 return REDIS_ERR;
6290 }
6291
6292 /* SELECT the new DB */
6293 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
6294 if (fwriteBulkLong(fp,j) == 0) goto werr;
6295
6296 /* Iterate this DB writing every entry */
6297 while((de = dictNext(di)) != NULL) {
6298 robj *key = dictGetEntryKey(de);
6299 robj *o = dictGetEntryVal(de);
6300 time_t expiretime = getExpire(db,key);
6301
6302 /* Save the key and associated value */
6303 if (o->type == REDIS_STRING) {
6304 /* Emit a SET command */
6305 char cmd[]="*3\r\n$3\r\nSET\r\n";
6306 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6307 /* Key and value */
6308 if (fwriteBulk(fp,key) == 0) goto werr;
6309 if (fwriteBulk(fp,o) == 0) goto werr;
6310 } else if (o->type == REDIS_LIST) {
6311 /* Emit the RPUSHes needed to rebuild the list */
6312 list *list = o->ptr;
6313 listNode *ln;
6314
6315 listRewind(list);
6316 while((ln = listYield(list))) {
6317 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
6318 robj *eleobj = listNodeValue(ln);
6319
6320 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6321 if (fwriteBulk(fp,key) == 0) goto werr;
6322 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6323 }
6324 } else if (o->type == REDIS_SET) {
6325 /* Emit the SADDs needed to rebuild the set */
6326 dict *set = o->ptr;
6327 dictIterator *di = dictGetIterator(set);
6328 dictEntry *de;
6329
6330 while((de = dictNext(di)) != NULL) {
6331 char cmd[]="*3\r\n$4\r\nSADD\r\n";
6332 robj *eleobj = dictGetEntryKey(de);
6333
6334 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6335 if (fwriteBulk(fp,key) == 0) goto werr;
6336 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6337 }
6338 dictReleaseIterator(di);
6339 } else if (o->type == REDIS_ZSET) {
6340 /* Emit the ZADDs needed to rebuild the sorted set */
6341 zset *zs = o->ptr;
6342 dictIterator *di = dictGetIterator(zs->dict);
6343 dictEntry *de;
6344
6345 while((de = dictNext(di)) != NULL) {
6346 char cmd[]="*4\r\n$4\r\nZADD\r\n";
6347 robj *eleobj = dictGetEntryKey(de);
6348 double *score = dictGetEntryVal(de);
6349
6350 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6351 if (fwriteBulk(fp,key) == 0) goto werr;
6352 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
6353 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6354 }
6355 dictReleaseIterator(di);
6356 } else {
6357 redisAssert(0 != 0);
6358 }
6359 /* Save the expire time */
6360 if (expiretime != -1) {
6361 char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
6362 /* If this key is already expired skip it */
6363 if (expiretime < now) continue;
6364 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6365 if (fwriteBulk(fp,key) == 0) goto werr;
6366 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
6367 }
6368 }
6369 dictReleaseIterator(di);
6370 }
6371
6372 /* Make sure data will not remain on the OS's output buffers */
6373 fflush(fp);
6374 fsync(fileno(fp));
6375 fclose(fp);
6376
6377 /* Use RENAME to make sure the DB file is changed atomically only
6378 * if the generate DB file is ok. */
6379 if (rename(tmpfile,filename) == -1) {
6380 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
6381 unlink(tmpfile);
6382 return REDIS_ERR;
6383 }
6384 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
6385 return REDIS_OK;
6386
6387 werr:
6388 fclose(fp);
6389 unlink(tmpfile);
6390 redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
6391 if (di) dictReleaseIterator(di);
6392 return REDIS_ERR;
6393 }
6394
6395 /* This is how rewriting of the append only file in background works:
6396 *
6397 * 1) The user calls BGREWRITEAOF
6398 * 2) Redis calls this function, that forks():
6399 * 2a) the child rewrite the append only file in a temp file.
6400 * 2b) the parent accumulates differences in server.bgrewritebuf.
6401 * 3) When the child finished '2a' exists.
6402 * 4) The parent will trap the exit code, if it's OK, will append the
6403 * data accumulated into server.bgrewritebuf into the temp file, and
6404 * finally will rename(2) the temp file in the actual file name.
6405 * The the new file is reopened as the new append only file. Profit!
6406 */
6407 static int rewriteAppendOnlyFileBackground(void) {
6408 pid_t childpid;
6409
6410 if (server.bgrewritechildpid != -1) return REDIS_ERR;
6411 if ((childpid = fork()) == 0) {
6412 /* Child */
6413 char tmpfile[256];
6414 close(server.fd);
6415
6416 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6417 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
6418 exit(0);
6419 } else {
6420 exit(1);
6421 }
6422 } else {
6423 /* Parent */
6424 if (childpid == -1) {
6425 redisLog(REDIS_WARNING,
6426 "Can't rewrite append only file in background: fork: %s",
6427 strerror(errno));
6428 return REDIS_ERR;
6429 }
6430 redisLog(REDIS_NOTICE,
6431 "Background append only file rewriting started by pid %d",childpid);
6432 server.bgrewritechildpid = childpid;
6433 /* We set appendseldb to -1 in order to force the next call to the
6434 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6435 * accumulated by the parent into server.bgrewritebuf will start
6436 * with a SELECT statement and it will be safe to merge. */
6437 server.appendseldb = -1;
6438 return REDIS_OK;
6439 }
6440 return REDIS_OK; /* unreached */
6441 }
6442
6443 static void bgrewriteaofCommand(redisClient *c) {
6444 if (server.bgrewritechildpid != -1) {
6445 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6446 return;
6447 }
6448 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6449 char *status = "+Background append only file rewriting started\r\n";
6450 addReplySds(c,sdsnew(status));
6451 } else {
6452 addReply(c,shared.err);
6453 }
6454 }
6455
/* Delete the temp file written by the background rewrite child 'childpid',
 * named "temp-rewriteaof-bg-<pid>.aof". The unlink() result is ignored. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof",(int)childpid);
    unlink(path);
}
6462
6463 /* ================================= Debugging ============================== */
6464
6465 static void debugCommand(redisClient *c) {
6466 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6467 *((char*)-1) = 'x';
6468 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6469 if (rdbSave(server.dbfilename) != REDIS_OK) {
6470 addReply(c,shared.err);
6471 return;
6472 }
6473 emptyDb();
6474 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6475 addReply(c,shared.err);
6476 return;
6477 }
6478 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6479 addReply(c,shared.ok);
6480 } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
6481 emptyDb();
6482 if (loadAppendOnlyFile(server.appendfilename) != REDIS_OK) {
6483 addReply(c,shared.err);
6484 return;
6485 }
6486 redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF");
6487 addReply(c,shared.ok);
6488 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6489 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6490 robj *key, *val;
6491
6492 if (!de) {
6493 addReply(c,shared.nokeyerr);
6494 return;
6495 }
6496 key = dictGetEntryKey(de);
6497 val = dictGetEntryVal(de);
6498 addReplySds(c,sdscatprintf(sdsempty(),
6499 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6500 (void*)key, key->refcount, (void*)val, val->refcount,
6501 val->encoding));
6502 } else {
6503 addReplySds(c,sdsnew(
6504 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6505 }
6506 }
6507
6508 static void _redisAssert(char *estr) {
6509 redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
6510 redisLog(REDIS_WARNING,"==> %s\n",estr);
6511 #ifdef HAVE_BACKTRACE
6512 redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
6513 *((char*)-1) = 'x';
6514 #endif
6515 }
6516
6517 /* =================================== Main! ================================ */
6518
6519 #ifdef __linux__
/* Read the kernel's vm.overcommit_memory setting from
 * /proc/sys/vm/overcommit_memory and return it as parsed by atoi(), or -1
 * when the file cannot be opened or its first line cannot be read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp != NULL) {
        if (fgets(line,sizeof(line),fp) != NULL) value = atoi(line);
        fclose(fp);
    }
    return value;
}
6533
6534 void linuxOvercommitMemoryWarning(void) {
6535 if (linuxOvercommitMemoryValue() == 0) {
6536 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6537 }
6538 }
6539 #endif /* __linux__ */
6540
6541 static void daemonize(void) {
6542 int fd;
6543 FILE *fp;
6544
6545 if (fork() != 0) exit(0); /* parent exits */
6546 printf("New pid: %d\n", getpid());
6547 setsid(); /* create a new session */
6548
6549 /* Every output goes to /dev/null. If Redis is daemonized but
6550 * the 'logfile' is set to 'stdout' in the configuration file
6551 * it will not log at all. */
6552 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6553 dup2(fd, STDIN_FILENO);
6554 dup2(fd, STDOUT_FILENO);
6555 dup2(fd, STDERR_FILENO);
6556 if (fd > STDERR_FILENO) close(fd);
6557 }
6558 /* Try to write the pid file */
6559 fp = fopen(server.pidfile,"w");
6560 if (fp) {
6561 fprintf(fp,"%d\n",getpid());
6562 fclose(fp);
6563 }
6564 }
6565
6566 int main(int argc, char **argv) {
6567 initServerConfig();
6568 if (argc == 2) {
6569 resetServerSaveParams();
6570 loadServerConfig(argv[1]);
6571 } else if (argc > 2) {
6572 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6573 exit(1);
6574 } else {
6575 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6576 }
6577 if (server.daemonize) daemonize();
6578 initServer();
6579 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6580 #ifdef __linux__
6581 linuxOvercommitMemoryWarning();
6582 #endif
6583 if (server.appendonly) {
6584 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6585 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6586 } else {
6587 if (rdbLoad(server.dbfilename) == REDIS_OK)
6588 redisLog(REDIS_NOTICE,"DB loaded from disk");
6589 }
6590 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
6591 acceptHandler, NULL) == AE_ERR) oom("creating file event");
6592 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6593 aeMain(server.el);
6594 aeDeleteEventLoop(server.el);
6595 return 0;
6596 }
6597
6598 /* ============================= Backtrace support ========================= */
6599
6600 #ifdef HAVE_BACKTRACE
6601 static char *findFuncName(void *pointer, unsigned long *offset);
6602
6603 static void *getMcontextEip(ucontext_t *uc) {
6604 #if defined(__FreeBSD__)
6605 return (void*) uc->uc_mcontext.mc_eip;
6606 #elif defined(__dietlibc__)
6607 return (void*) uc->uc_mcontext.eip;
6608 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6609 #if __x86_64__
6610 return (void*) uc->uc_mcontext->__ss.__rip;
6611 #else
6612 return (void*) uc->uc_mcontext->__ss.__eip;
6613 #endif
6614 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6615 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6616 return (void*) uc->uc_mcontext->__ss.__rip;
6617 #else
6618 return (void*) uc->uc_mcontext->__ss.__eip;
6619 #endif
6620 #elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
6621 return (void*) uc->uc_mcontext.gregs[REG_EIP]; /* Linux 32/64 bit */
6622 #elif defined(__ia64__) /* Linux IA64 */
6623 return (void*) uc->uc_mcontext.sc_ip;
6624 #else
6625 return NULL;
6626 #endif
6627 }
6628
6629 static void segvHandler(int sig, siginfo_t *info, void *secret) {
6630 void *trace[100];
6631 char **messages = NULL;
6632 int i, trace_size = 0;
6633 unsigned long offset=0;
6634 ucontext_t *uc = (ucontext_t*) secret;
6635 sds infostring;
6636 REDIS_NOTUSED(info);
6637
6638 redisLog(REDIS_WARNING,
6639 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
6640 infostring = genRedisInfoString();
6641 redisLog(REDIS_WARNING, "%s",infostring);
6642 /* It's not safe to sdsfree() the returned string under memory
6643 * corruption conditions. Let it leak as we are going to abort */
6644
6645 trace_size = backtrace(trace, 100);
6646 /* overwrite sigaction with caller's address */
6647 if (getMcontextEip(uc) != NULL) {
6648 trace[1] = getMcontextEip(uc);
6649 }
6650 messages = backtrace_symbols(trace, trace_size);
6651
6652 for (i=1; i<trace_size; ++i) {
6653 char *fn = findFuncName(trace[i], &offset), *p;
6654
6655 p = strchr(messages[i],'+');
6656 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6657 redisLog(REDIS_WARNING,"%s", messages[i]);
6658 } else {
6659 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6660 }
6661 }
6662 // free(messages); Don't call free() with possibly corrupted memory.
6663 exit(0);
6664 }
6665
6666 static void setupSigSegvAction(void) {
6667 struct sigaction act;
6668
6669 sigemptyset (&act.sa_mask);
6670 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6671 * is used. Otherwise, sa_handler is used */
6672 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6673 act.sa_sigaction = segvHandler;
6674 sigaction (SIGSEGV, &act, NULL);
6675 sigaction (SIGBUS, &act, NULL);
6676 sigaction (SIGFPE, &act, NULL);
6677 sigaction (SIGILL, &act, NULL);
6678 sigaction (SIGBUS, &act, NULL);
6679 return;
6680 }
6681
6682 #include "staticsymbols.h"
6683 /* This function try to convert a pointer into a function name. It's used in
6684 * oreder to provide a backtrace under segmentation fault that's able to
6685 * display functions declared as static (otherwise the backtrace is useless). */
6686 static char *findFuncName(void *pointer, unsigned long *offset){
6687 int i, ret = -1;
6688 unsigned long off, minoff = 0;
6689
6690 /* Try to match against the Symbol with the smallest offset */
6691 for (i=0; symsTable[i].pointer; i++) {
6692 unsigned long lp = (unsigned long) pointer;
6693
6694 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6695 off=lp-symsTable[i].pointer;
6696 if (ret < 0 || off < minoff) {
6697 minoff=off;
6698 ret=i;
6699 }
6700 }
6701 }
6702 if (ret == -1) return NULL;
6703 *offset = minoff;
6704 return symsTable[ret].name;
6705 }
6706 #else /* HAVE_BACKTRACE */
static void setupSigSegvAction(void) {
    /* Built without HAVE_BACKTRACE: no fatal-signal handlers are installed,
     * so faults keep their default behavior. */
}
6709 #endif /* HAVE_BACKTRACE */
6710
6711
6712
6713 /* The End */
6714
6715
6716