]> git.saurik.com Git - redis.git/blob - redis.c
e6b4455e963dab404ab611f29f5dcd91c4598bc4
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.3.2"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
/* Error codes returned by most internal functions */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
#define REDIS_WRITEV_THRESHOLD 3
/* Max number of iovecs used for each writev call */
#define REDIS_WRITEV_IOVEC_COUNT 256

/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */

/* Command flags */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4

/* Object types */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys would require a lot of space, so we check the two most significant
 * bits of the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte, most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
154
/* Virtual memory object->storage field values. */
#define REDIS_VM_MEMORY 0 /* The object is on memory */
#define REDIS_VM_SWAPPED 1 /* The object is on disk */
#define REDIS_VM_SWAPPING 2 /* Redis is swapping this object on disk */
#define REDIS_VM_LOADING 3 /* Redis is loading this object from disk */

/* Virtual memory static configuration stuff.
 * Check vmFindContiguousPages() to know more about these magic numbers. */
#define REDIS_VM_MAX_NEAR_PAGES 65536
#define REDIS_VM_MAX_RANDOM_JUMP 4096

/* Client flags (bit mask stored in redisClient->flags) */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
#define REDIS_MULTI 16 /* This client is in a MULTI context */
#define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_ASC 1
#define REDIS_SORT_DESC 2
#define REDIS_SORTKEY_MAX 1024

/* Log levels. redisLog() uses the level as an index into its marker string,
 * so the values must stay small and contiguous starting from zero. */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro... */
#define REDIS_NOTUSED(V) ((void) V)

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */

/* Append only defines */
#define APPENDFSYNC_NO 0
#define APPENDFSYNC_ALWAYS 1
#define APPENDFSYNC_EVERYSEC 2
213
/* We can print the stacktrace, so our assert is defined this way:
 * when the expression '_e' is false, _redisAssert() is called with the
 * stringified expression and then the process terminates via exit(1).
 * When '_e' is true the macro evaluates to a void no-op. */
#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
static void _redisAssert(char *estr);
217
218 /*================================= Data types ============================== */
219
220 /* A redis object, that is a type able to hold a string / list / set */
221
/* The VM object structure: on-disk location and access time of an object.
 * It is embedded as the last member of struct redisObject.
 *
 * NOTE(review): the trailing 'vm' declarator below also defines a file-scope
 * variable named 'vm' of this type; it looks unintentional - confirm it is
 * unused before removing it. */
struct redisObjectVM {
    off_t page; /* the page at which the object is stored on disk */
    off_t usedpages; /* number of pages used on disk */
    time_t atime; /* Last access time */
} vm;
228
/* The actual Redis Object */
typedef struct redisObject {
    void *ptr; /* Payload; its meaning depends on 'type' and 'encoding' */
    unsigned char type; /* One of REDIS_STRING, REDIS_LIST, ... */
    unsigned char encoding; /* One of the REDIS_ENCODING_* values */
    unsigned char storage; /* where? REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
    unsigned char notused;
    int refcount; /* Reference count, see incrRefCount()/decrRefCount() */
    /* VM fields, these are only allocated if VM is active, otherwise the
     * object allocation function will just allocate
     * sizeof(redisObject) minus sizeof(redisObjectVM), so using
     * Redis without VM active will not have any overhead.
     * For this reason 'vm' must remain the last member. */
    struct redisObjectVM vm;
} robj;
243
/* Macro used to initialize a Redis string object allocated on the stack.
 * Note that this macro is kept near the structure definition to make sure
 * we'll update it when the structure is changed, to avoid bugs like
 * bug #85 introduced exactly in this way.
 *
 * _var is the robj variable (not a pointer) to initialize, _ptr is the
 * value stored in its 'ptr' field. The 'storage' field is only set when
 * server.vm_enabled is true.
 *
 * The expansion is a do { ... } while(0) statement WITHOUT a trailing
 * semicolon, so the invocation must be terminated by the caller's own ';'.
 * This keeps "if (x) initStaticStringObject(o,p); else ..." well formed.
 * Both arguments are parenthesized so that arbitrary expressions are safe. */
#define initStaticStringObject(_var,_ptr) do { \
    (_var).refcount = 1; \
    (_var).type = REDIS_STRING; \
    (_var).encoding = REDIS_ENCODING_RAW; \
    (_var).ptr = (_ptr); \
    if (server.vm_enabled) (_var).storage = REDIS_VM_MEMORY; \
} while(0)
255
/* A single Redis database: the keyspace plus its per-key side tables. */
typedef struct redisDb {
    dict *dict; /* The keyspace for this DB */
    dict *expires; /* Timeout of keys with a timeout set */
    dict *blockingkeys; /* Keys with clients waiting for data (BLPOP) */
    int id; /* Database number (presumably the index used by SELECT) */
} redisDb;
262
/* Client MULTI/EXEC state: one queued command with its argument vector. */
typedef struct multiCmd {
    robj **argv; /* Arguments of the queued command */
    int argc; /* Number of elements in argv */
    struct redisCommand *cmd; /* Command table entry to execute */
} multiCmd;
269
/* The list of commands queued by a client while in a MULTI context. */
typedef struct multiState {
    multiCmd *commands; /* Array of MULTI commands */
    int count; /* Total number of MULTI commands */
} multiState;
274
/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list (see server.clients). */
typedef struct redisClient {
    int fd; /* Client socket file descriptor */
    redisDb *db; /* Currently selected database */
    int dictid;
    sds querybuf; /* Buffer accumulating the not yet parsed client input */
    robj **argv, **mbargv;
    int argc, mbargc;
    int bulklen; /* bulk read len. -1 if not in bulk read mode */
    int multibulk; /* multi bulk command format active */
    list *reply; /* List of objects to send to the client */
    int sentlen;
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
    /* REDIS_MULTI */
    int slaveseldb; /* slave selected db, if this client is a slave */
    int authenticated; /* when requirepass is non-NULL */
    int replstate; /* replication state if this is a slave */
    int repldbfd; /* replication DB file descriptor */
    long repldboff; /* replication DB file offset */
    off_t repldbsize; /* replication DB file size */
    multiState mstate; /* MULTI/EXEC state */
    robj **blockingkeys; /* The keys we are waiting on to terminate a blocking
                          * operation such as BLPOP. Otherwise NULL. */
    int blockingkeysnum; /* Number of blocking keys */
    time_t blockingto; /* Blocking operation timeout. If UNIX current time
                        * is >= blockingto then the operation timed out. */
} redisClient;
304
/* One save point, stored in server.saveparams.
 * NOTE(review): presumably "save after 'seconds' seconds if at least
 * 'changes' changes happened" - confirm against the code using it. */
struct saveparam {
    time_t seconds;
    int changes;
};
309
/* Global server state structure. A single instance exists: the file-scope
 * variable 'server'. */
struct redisServer {
    int port;
    int fd;
    redisDb *db;
    dict *sharingpool; /* Pool used for object sharing */
    unsigned int sharingpoolsize;
    long long dirty; /* changes to DB from the last save */
    list *clients;
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];
    aeEventLoop *el;
    int cronloops; /* number of times the cron function run */
    list *objfreelist; /* A list of freed objects to avoid malloc() */
    time_t lastsave; /* Unix time of last successful save */
    size_t usedmemory; /* Used memory in megabytes */
    /* Fields used only for stats */
    time_t stat_starttime; /* server start time */
    long long stat_numcommands; /* number of processed commands */
    long long stat_numconnections; /* number of connections received */
    /* Configuration */
    int verbosity; /* Messages with a level below this are not logged */
    int glueoutputbuf;
    int maxidletime;
    int dbnum;
    int daemonize;
    int appendonly;
    int appendfsync;
    time_t lastfsync;
    int appendfd;
    int appendseldb;
    char *pidfile;
    pid_t bgsavechildpid;
    pid_t bgrewritechildpid;
    sds bgrewritebuf; /* buffer taken by parent during append only rewrite */
    struct saveparam *saveparams;
    int saveparamslen;
    char *logfile; /* Log file path; when NULL redisLog() writes to stdout */
    char *bindaddr;
    char *dbfilename;
    char *appendfilename;
    char *requirepass;
    int shareobjects;
    int rdbcompression;
    /* Replication related */
    int isslave;
    char *masterauth;
    char *masterhost;
    int masterport;
    redisClient *master; /* client that is master for this slave */
    int replstate;
    unsigned int maxclients;
    unsigned long maxmemory;
    unsigned int blockedclients;
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to keep this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
    /* Virtual memory configuration */
    int vm_enabled;
    off_t vm_page_size;
    off_t vm_pages;
    long vm_max_memory;
    /* Virtual memory state */
    FILE *vm_fp;
    int vm_fd;
    off_t vm_next_page; /* Next probably empty page */
    off_t vm_near_pages; /* Number of pages allocated sequentially */
    unsigned char *vm_bitmap; /* Bitmap of free/used pages */
    time_t unixtime; /* Unix time sampled every second. */
};
382
/* Signature shared by every command implementation. */
typedef void redisCommandProc(redisClient *c);

/* One entry of the command table (see cmdTable). */
struct redisCommand {
    char *name; /* Command name as typed by the client */
    redisCommandProc *proc; /* Function implementing the command */
    int arity; /* Argument count; the table also uses negative values.
                * NOTE(review): presumably -N means "at least N" - confirm
                * against the arity check in processCommand(). */
    int flags; /* Bitwise OR of REDIS_CMD_* flags */
};
390
/* A (name, address) pair.
 * NOTE(review): presumably used to resolve function names when printing
 * stack traces - confirm against the code that fills this table. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};
395
/* An element being sorted, together with the value it is compared by:
 * either a numeric score or another object, sharing the same storage. */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;
        robj *cmpobj;
    } u;
} redisSortObject;
403
/* An operation attached to a SORT invocation.
 * NOTE(review): 'type' presumably holds a REDIS_SORT_* value - confirm. */
typedef struct _redisSortOperation {
    int type;
    robj *pattern;
} redisSortOperation;
408
/* ZSETs use a specialized version of Skiplists */

/* A skiplist node: an object with its score, an array of forward pointers
 * and a single backward pointer. */
typedef struct zskiplistNode {
    struct zskiplistNode **forward;
    struct zskiplistNode *backward;
    double score;
    robj *obj;
} zskiplistNode;
417
/* The skiplist itself: head and tail nodes plus element count and level. */
typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;
423
/* A sorted set is represented by a hash table paired with a skiplist. */
typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;
428
/* Our shared "common" objects: preallocated robj pointers, all reachable
 * through the global variable 'shared'. */

struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk, *queued,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;
439
/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */

static double R_Zero, R_PosInf, R_NegInf, R_Nan;
445
/*================================ Prototypes =============================== */

/* Forward declarations of internal helpers, needed because they are used
 * before their definition later in this file. */
static void freeStringObject(robj *o);
static void freeListObject(robj *o);
static void freeSetObject(robj *o);
static void decrRefCount(void *o);
static robj *createObject(int type, void *ptr);
static void freeClient(redisClient *c);
static int rdbLoad(char *filename);
static void addReply(redisClient *c, robj *obj);
static void addReplySds(redisClient *c, sds s);
static void incrRefCount(robj *o);
static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);
static int tryObjectEncoding(robj *o);
static robj *getDecodedObject(robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static int deleteIfVolatile(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);
static void updateSlavesWaitingBgsave(int bgsaveerr);
static void freeMemoryIfNeeded(void);
static int processCommand(redisClient *c);
static void setupSigSegvAction(void);
static void rdbRemoveTempFile(pid_t childpid);
static void aofRemoveTempFile(pid_t childpid);
static size_t stringObjectLen(robj *o);
static void processInputBuffer(redisClient *c);
static zskiplist *zslCreate(void);
static void zslFree(zskiplist *zsl);
static void zslInsert(zskiplist *zsl, double score, robj *obj);
static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
static void initClientMultiState(redisClient *c);
static void freeClientMultiState(redisClient *c);
static void queueMultiCommand(redisClient *c, struct redisCommand *cmd);
static void unblockClient(redisClient *c);
static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele);
static void vmInit(void);
490
/* Command implementations. Each one matches the redisCommandProc signature
 * and is referenced from cmdTable.
 * NOTE(review): 'rpoplpushcommand' is the only one not following the
 * xxxCommand capitalization; renaming it requires touching the definition
 * and the command table as well. */
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
static void setCommand(redisClient *c);
static void setnxCommand(redisClient *c);
static void getCommand(redisClient *c);
static void delCommand(redisClient *c);
static void existsCommand(redisClient *c);
static void incrCommand(redisClient *c);
static void decrCommand(redisClient *c);
static void incrbyCommand(redisClient *c);
static void decrbyCommand(redisClient *c);
static void selectCommand(redisClient *c);
static void randomkeyCommand(redisClient *c);
static void keysCommand(redisClient *c);
static void dbsizeCommand(redisClient *c);
static void lastsaveCommand(redisClient *c);
static void saveCommand(redisClient *c);
static void bgsaveCommand(redisClient *c);
static void bgrewriteaofCommand(redisClient *c);
static void shutdownCommand(redisClient *c);
static void moveCommand(redisClient *c);
static void renameCommand(redisClient *c);
static void renamenxCommand(redisClient *c);
static void lpushCommand(redisClient *c);
static void rpushCommand(redisClient *c);
static void lpopCommand(redisClient *c);
static void rpopCommand(redisClient *c);
static void llenCommand(redisClient *c);
static void lindexCommand(redisClient *c);
static void lrangeCommand(redisClient *c);
static void ltrimCommand(redisClient *c);
static void typeCommand(redisClient *c);
static void lsetCommand(redisClient *c);
static void saddCommand(redisClient *c);
static void sremCommand(redisClient *c);
static void smoveCommand(redisClient *c);
static void sismemberCommand(redisClient *c);
static void scardCommand(redisClient *c);
static void spopCommand(redisClient *c);
static void srandmemberCommand(redisClient *c);
static void sinterCommand(redisClient *c);
static void sinterstoreCommand(redisClient *c);
static void sunionCommand(redisClient *c);
static void sunionstoreCommand(redisClient *c);
static void sdiffCommand(redisClient *c);
static void sdiffstoreCommand(redisClient *c);
static void syncCommand(redisClient *c);
static void flushdbCommand(redisClient *c);
static void flushallCommand(redisClient *c);
static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c);
static void rpoplpushcommand(redisClient *c);
static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
static void expireCommand(redisClient *c);
static void expireatCommand(redisClient *c);
static void getsetCommand(redisClient *c);
static void ttlCommand(redisClient *c);
static void slaveofCommand(redisClient *c);
static void debugCommand(redisClient *c);
static void msetCommand(redisClient *c);
static void msetnxCommand(redisClient *c);
static void zaddCommand(redisClient *c);
static void zincrbyCommand(redisClient *c);
static void zrangeCommand(redisClient *c);
static void zrangebyscoreCommand(redisClient *c);
static void zrevrangeCommand(redisClient *c);
static void zcardCommand(redisClient *c);
static void zremCommand(redisClient *c);
static void zscoreCommand(redisClient *c);
static void zremrangebyscoreCommand(redisClient *c);
static void multiCommand(redisClient *c);
static void execCommand(redisClient *c);
static void blpopCommand(redisClient *c);
static void brpopCommand(redisClient *c);
568
/*================================= Globals ================================= */

/* Global vars */
static struct redisServer server; /* server global state */

/* The command table. Every entry is {name, implementation, arity, flags}
 * (see struct redisCommand); the table is terminated by an all-NULL/zero
 * entry. Note that "smembers" is served by sinterCommand.
 * NOTE(review): "zscore" carries REDIS_CMD_DENYOOM although its name suggests
 * a read-only command - confirm whether that flag is intended. */
static struct redisCommand cmdTable[] = {
    {"get",getCommand,2,REDIS_CMD_INLINE},
    {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"del",delCommand,-2,REDIS_CMD_INLINE},
    {"exists",existsCommand,2,REDIS_CMD_INLINE},
    {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
    {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
    {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
    {"brpop",brpopCommand,-3,REDIS_CMD_INLINE},
    {"blpop",blpopCommand,-3,REDIS_CMD_INLINE},
    {"llen",llenCommand,2,REDIS_CMD_INLINE},
    {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
    {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
    {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
    {"lrem",lremCommand,4,REDIS_CMD_BULK},
    {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"srem",sremCommand,3,REDIS_CMD_BULK},
    {"smove",smoveCommand,4,REDIS_CMD_BULK},
    {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
    {"scard",scardCommand,2,REDIS_CMD_INLINE},
    {"spop",spopCommand,2,REDIS_CMD_INLINE},
    {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
    {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
    {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"zrem",zremCommand,3,REDIS_CMD_BULK},
    {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
    {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE},
    {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
    {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE},
    {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
    {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
    {"select",selectCommand,2,REDIS_CMD_INLINE},
    {"move",moveCommand,3,REDIS_CMD_INLINE},
    {"rename",renameCommand,3,REDIS_CMD_INLINE},
    {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
    {"expire",expireCommand,3,REDIS_CMD_INLINE},
    {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
    {"keys",keysCommand,2,REDIS_CMD_INLINE},
    {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
    {"auth",authCommand,2,REDIS_CMD_INLINE},
    {"ping",pingCommand,1,REDIS_CMD_INLINE},
    {"echo",echoCommand,2,REDIS_CMD_BULK},
    {"save",saveCommand,1,REDIS_CMD_INLINE},
    {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
    {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
    {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
    {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
    {"type",typeCommand,2,REDIS_CMD_INLINE},
    {"multi",multiCommand,1,REDIS_CMD_INLINE},
    {"exec",execCommand,1,REDIS_CMD_INLINE},
    {"sync",syncCommand,1,REDIS_CMD_INLINE},
    {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
    {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
    {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"info",infoCommand,1,REDIS_CMD_INLINE},
    {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
    {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
    {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
    {"debug",debugCommand,-2,REDIS_CMD_INLINE},
    {NULL,NULL,0,0}
};
654
655 /*============================ Utility functions ============================ */
656
/* Lowercase 'ch' for case-insensitive matching. Values outside the
 * unsigned char range (i.e. a negative plain char) are returned unchanged,
 * because passing them to tolower() is undefined behavior. */
static int stringmatchFoldCase(int ch) {
    return (ch >= 0 && ch <= UCHAR_MAX) ? tolower(ch) : ch;
}

/* Glob-style pattern matching.
 *
 * Matches the first 'stringLen' bytes of 'string' against the first
 * 'patternLen' bytes of 'pattern'. Neither buffer needs to be NUL terminated:
 * no byte at or past the given lengths is ever read.
 *
 * Supported syntax:
 *   *        any sequence of bytes, including the empty one
 *   ?        exactly one byte
 *   [abc]    one byte among the listed ones
 *   [^abc]   one byte NOT among the listed ones
 *   [a-z]    one byte in the inclusive range (bounds are swapped if reversed)
 *   \x       the byte x taken literally (also inside brackets, where the
 *            comparison is always case sensitive)
 *
 * An unterminated '[' class is treated as if it were closed at the end of
 * the pattern. When 'nocase' is non zero, literals and ranges are compared
 * case-insensitively.
 *
 * Returns 1 if the whole string matches the whole pattern, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* Collapse runs of '*' without looking past the pattern end. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            /* Try to match the rest of the pattern at every position. */
            while (stringLen > 0) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen <= 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int negate, match;

            /* A class always consumes one byte of the string. */
            if (stringLen <= 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            negate = patternLen > 0 && pattern[0] == '^';
            if (negate) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so that the common
                     * pattern advance below ends exactly at the end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];

                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = stringmatchFoldCase(start);
                        end = stringmatchFoldCase(end);
                        c = stringmatchFoldCase(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (stringmatchFoldCase(pattern[0]) ==
                            stringmatchFoldCase(string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (negate)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            /* Skip the escape so the next byte is compared literally. A
             * trailing backslash is itself compared as a literal. */
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen <= 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (stringmatchFoldCase(pattern[0]) !=
                    stringmatchFoldCase(string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* String consumed: trailing '*' can still match the empty
             * string, anything else left in the pattern cannot. */
            while (patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
779
780 static void redisLog(int level, const char *fmt, ...) {
781 va_list ap;
782 FILE *fp;
783
784 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
785 if (!fp) return;
786
787 va_start(ap, fmt);
788 if (level >= server.verbosity) {
789 char *c = ".-*";
790 char buf[64];
791 time_t now;
792
793 now = time(NULL);
794 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
795 fprintf(fp,"%s %c ",buf,c[level]);
796 vfprintf(fp, fmt, ap);
797 fprintf(fp,"\n");
798 fflush(fp);
799 }
800 va_end(ap);
801
802 if (server.logfile) fclose(fp);
803 }
804
805 /*====================== Hash table type implementation ==================== */
806
/* This is a hash table type that uses strings from the SDS dynamic strings
 * library as keys and redis objects as values (objects can hold SDS strings,
 * lists, sets). */
810
/* Dict destructor callback for plain heap blocks: releases 'val' with
 * zfree(). The private data pointer is ignored. */
static void dictVanillaFree(void *privdata, void *val) {
    void *block = val;

    DICT_NOTUSED(privdata);
    zfree(block);
}
816
817 static void dictListDestructor(void *privdata, void *val)
818 {
819 DICT_NOTUSED(privdata);
820 listRelease((list*)val);
821 }
822
823 static int sdsDictKeyCompare(void *privdata, const void *key1,
824 const void *key2)
825 {
826 int l1,l2;
827 DICT_NOTUSED(privdata);
828
829 l1 = sdslen((sds)key1);
830 l2 = sdslen((sds)key2);
831 if (l1 != l2) return 0;
832 return memcmp(key1, key2, l1) == 0;
833 }
834
/* Dict destructor callback for entries holding a redis object: drops one
 * reference with decrRefCount(). The private data pointer is ignored. */
static void dictRedisObjectDestructor(void *privdata, void *val) {
    void *obj = val;

    DICT_NOTUSED(privdata);
    decrRefCount(obj);
}
841
842 static int dictObjKeyCompare(void *privdata, const void *key1,
843 const void *key2)
844 {
845 const robj *o1 = key1, *o2 = key2;
846 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
847 }
848
849 static unsigned int dictObjHash(const void *key) {
850 const robj *o = key;
851 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
852 }
853
854 static int dictEncObjKeyCompare(void *privdata, const void *key1,
855 const void *key2)
856 {
857 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
858 int cmp;
859
860 o1 = getDecodedObject(o1);
861 o2 = getDecodedObject(o2);
862 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
863 decrRefCount(o1);
864 decrRefCount(o2);
865 return cmp;
866 }
867
868 static unsigned int dictEncObjHash(const void *key) {
869 robj *o = (robj*) key;
870
871 o = getDecodedObject(o);
872 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
873 decrRefCount(o);
874 return hash;
875 }
876
/* Dict type with possibly encoded redis objects as keys and no managed
 * value (presumably used for Set objects, given the name). */
static dictType setDictType = {
    dictEncObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictEncObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL /* val destructor */
};
885
886 static dictType zsetDictType = {
887 dictEncObjHash, /* hash function */
888 NULL, /* key dup */
889 NULL, /* val dup */
890 dictEncObjKeyCompare, /* key compare */
891 dictRedisObjectDestructor, /* key destructor */
892 dictVanillaFree /* val destructor of malloc(sizeof(double)) */
893 };
894
895 static dictType hashDictType = {
896 dictObjHash, /* hash function */
897 NULL, /* key dup */
898 NULL, /* val dup */
899 dictObjKeyCompare, /* key compare */
900 dictRedisObjectDestructor, /* key destructor */
901 dictRedisObjectDestructor /* val destructor */
902 };
903
904 /* Keylist hash table type has unencoded redis objects as keys and
905 * lists as values. It's used for blocking operations (BLPOP) */
906 static dictType keylistDictType = {
907 dictObjHash, /* hash function */
908 NULL, /* key dup */
909 NULL, /* val dup */
910 dictObjKeyCompare, /* key compare */
911 dictRedisObjectDestructor, /* key destructor */
912 dictListDestructor /* val destructor */
913 };
914
915 /* ========================= Random utility functions ======================= */
916
917 /* Redis generally does not try to recover from out of memory conditions
918 * when allocating objects or strings, it is not clear if it will be possible
919 * to report this condition to the client since the networking layer itself
920 * is based on heap allocation for send buffers, so we simply abort.
921 * At least the code will be simpler to read... */
922 static void oom(const char *msg) {
923 redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
924 sleep(1);
925 abort();
926 }
927
928 /* ====================== Redis server networking stuff ===================== */
929 static void closeTimedoutClients(void) {
930 redisClient *c;
931 listNode *ln;
932 time_t now = time(NULL);
933
934 listRewind(server.clients);
935 while ((ln = listYield(server.clients)) != NULL) {
936 c = listNodeValue(ln);
937 if (server.maxidletime &&
938 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
939 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
940 (now - c->lastinteraction > server.maxidletime))
941 {
942 redisLog(REDIS_DEBUG,"Closing idle client");
943 freeClient(c);
944 } else if (c->flags & REDIS_BLOCKED) {
945 if (c->blockingto != 0 && c->blockingto < now) {
946 addReply(c,shared.nullmultibulk);
947 unblockClient(c);
948 }
949 }
950 }
951 }
952
953 static int htNeedsResize(dict *dict) {
954 long long size, used;
955
956 size = dictSlots(dict);
957 used = dictSize(dict);
958 return (size && used && size > DICT_HT_INITIAL_SIZE &&
959 (used*100/size < REDIS_HT_MINFILL));
960 }
961
962 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
963 * we resize the hash table to save memory */
964 static void tryResizeHashTables(void) {
965 int j;
966
967 for (j = 0; j < server.dbnum; j++) {
968 if (htNeedsResize(server.db[j].dict)) {
969 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
970 dictResize(server.db[j].dict);
971 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
972 }
973 if (htNeedsResize(server.db[j].expires))
974 dictResize(server.db[j].expires);
975 }
976 }
977
978 /* A background saving child (BGSAVE) terminated its work. Handle this. */
979 void backgroundSaveDoneHandler(int statloc) {
980 int exitcode = WEXITSTATUS(statloc);
981 int bysignal = WIFSIGNALED(statloc);
982
983 if (!bysignal && exitcode == 0) {
984 redisLog(REDIS_NOTICE,
985 "Background saving terminated with success");
986 server.dirty = 0;
987 server.lastsave = time(NULL);
988 } else if (!bysignal && exitcode != 0) {
989 redisLog(REDIS_WARNING, "Background saving error");
990 } else {
991 redisLog(REDIS_WARNING,
992 "Background saving terminated by signal");
993 rdbRemoveTempFile(server.bgsavechildpid);
994 }
995 server.bgsavechildpid = -1;
996 /* Possibly there are slaves waiting for a BGSAVE in order to be served
997 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
998 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
999 }
1000
1001 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
1002 * Handle this. */
1003 void backgroundRewriteDoneHandler(int statloc) {
1004 int exitcode = WEXITSTATUS(statloc);
1005 int bysignal = WIFSIGNALED(statloc);
1006
1007 if (!bysignal && exitcode == 0) {
1008 int fd;
1009 char tmpfile[256];
1010
1011 redisLog(REDIS_NOTICE,
1012 "Background append only file rewriting terminated with success");
1013 /* Now it's time to flush the differences accumulated by the parent */
1014 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
1015 fd = open(tmpfile,O_WRONLY|O_APPEND);
1016 if (fd == -1) {
1017 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
1018 goto cleanup;
1019 }
1020 /* Flush our data... */
1021 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
1022 (signed) sdslen(server.bgrewritebuf)) {
1023 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
1024 close(fd);
1025 goto cleanup;
1026 }
1027 redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
1028 /* Now our work is to rename the temp file into the stable file. And
1029 * switch the file descriptor used by the server for append only. */
1030 if (rename(tmpfile,server.appendfilename) == -1) {
1031 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
1032 close(fd);
1033 goto cleanup;
1034 }
1035 /* Mission completed... almost */
1036 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
1037 if (server.appendfd != -1) {
1038 /* If append only is actually enabled... */
1039 close(server.appendfd);
1040 server.appendfd = fd;
1041 fsync(fd);
1042 server.appendseldb = -1; /* Make sure it will issue SELECT */
1043 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
1044 } else {
1045 /* If append only is disabled we just generate a dump in this
1046 * format. Why not? */
1047 close(fd);
1048 }
1049 } else if (!bysignal && exitcode != 0) {
1050 redisLog(REDIS_WARNING, "Background append only file rewriting error");
1051 } else {
1052 redisLog(REDIS_WARNING,
1053 "Background append only file rewriting terminated by signal");
1054 }
1055 cleanup:
1056 sdsfree(server.bgrewritebuf);
1057 server.bgrewritebuf = sdsempty();
1058 aofRemoveTempFile(server.bgrewritechildpid);
1059 server.bgrewritechildpid = -1;
1060 }
1061
1062 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
1063 int j, loops = server.cronloops++;
1064 REDIS_NOTUSED(eventLoop);
1065 REDIS_NOTUSED(id);
1066 REDIS_NOTUSED(clientData);
1067
1068 /* We take a cached value of the unix time in the global state because
1069 * with virtual memory and aging there is to store the current time
1070 * in objects at every object access, and accuracy is not needed.
1071 * To access a global var is faster than calling time(NULL) */
1072 server.unixtime = time(NULL);
1073
1074 /* Update the global state with the amount of used memory */
1075 server.usedmemory = zmalloc_used_memory();
1076
1077 /* Show some info about non-empty databases */
1078 for (j = 0; j < server.dbnum; j++) {
1079 long long size, used, vkeys;
1080
1081 size = dictSlots(server.db[j].dict);
1082 used = dictSize(server.db[j].dict);
1083 vkeys = dictSize(server.db[j].expires);
1084 if (!(loops % 5) && (used || vkeys)) {
1085 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
1086 /* dictPrintStats(server.dict); */
1087 }
1088 }
1089
1090 /* We don't want to resize the hash tables while a bacground saving
1091 * is in progress: the saving child is created using fork() that is
1092 * implemented with a copy-on-write semantic in most modern systems, so
1093 * if we resize the HT while there is the saving child at work actually
1094 * a lot of memory movements in the parent will cause a lot of pages
1095 * copied. */
1096 if (server.bgsavechildpid == -1) tryResizeHashTables();
1097
1098 /* Show information about connected clients */
1099 if (!(loops % 5)) {
1100 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1101 listLength(server.clients)-listLength(server.slaves),
1102 listLength(server.slaves),
1103 server.usedmemory,
1104 dictSize(server.sharingpool));
1105 }
1106
1107 /* Close connections of timedout clients */
1108 if ((server.maxidletime && !(loops % 10)) || server.blockedclients)
1109 closeTimedoutClients();
1110
1111 /* Check if a background saving or AOF rewrite in progress terminated */
1112 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
1113 int statloc;
1114 pid_t pid;
1115
1116 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1117 if (pid == server.bgsavechildpid) {
1118 backgroundSaveDoneHandler(statloc);
1119 } else {
1120 backgroundRewriteDoneHandler(statloc);
1121 }
1122 }
1123 } else {
1124 /* If there is not a background saving in progress check if
1125 * we have to save now */
1126 time_t now = time(NULL);
1127 for (j = 0; j < server.saveparamslen; j++) {
1128 struct saveparam *sp = server.saveparams+j;
1129
1130 if (server.dirty >= sp->changes &&
1131 now-server.lastsave > sp->seconds) {
1132 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1133 sp->changes, sp->seconds);
1134 rdbSaveBackground(server.dbfilename);
1135 break;
1136 }
1137 }
1138 }
1139
1140 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1141 * will use few CPU cycles if there are few expiring keys, otherwise
1142 * it will get more aggressive to avoid that too much memory is used by
1143 * keys that can be removed from the keyspace. */
1144 for (j = 0; j < server.dbnum; j++) {
1145 int expired;
1146 redisDb *db = server.db+j;
1147
1148 /* Continue to expire if at the end of the cycle more than 25%
1149 * of the keys were expired. */
1150 do {
1151 int num = dictSize(db->expires);
1152 time_t now = time(NULL);
1153
1154 expired = 0;
1155 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1156 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1157 while (num--) {
1158 dictEntry *de;
1159 time_t t;
1160
1161 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1162 t = (time_t) dictGetEntryVal(de);
1163 if (now > t) {
1164 deleteKey(db,dictGetEntryKey(de));
1165 expired++;
1166 }
1167 }
1168 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
1169 }
1170
1171 /* Check if we should connect to a MASTER */
1172 if (server.replstate == REDIS_REPL_CONNECT) {
1173 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1174 if (syncWithMaster() == REDIS_OK) {
1175 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1176 }
1177 }
1178 return 1000;
1179 }
1180
1181 static void createSharedObjects(void) {
1182 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1183 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1184 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
1185 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1186 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1187 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1188 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1189 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1190 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1191 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1192 shared.queued = createObject(REDIS_STRING,sdsnew("+QUEUED\r\n"));
1193 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1194 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1195 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1196 "-ERR no such key\r\n"));
1197 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1198 "-ERR syntax error\r\n"));
1199 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1200 "-ERR source and destination objects are the same\r\n"));
1201 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1202 "-ERR index out of range\r\n"));
1203 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1204 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1205 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1206 shared.select0 = createStringObject("select 0\r\n",10);
1207 shared.select1 = createStringObject("select 1\r\n",10);
1208 shared.select2 = createStringObject("select 2\r\n",10);
1209 shared.select3 = createStringObject("select 3\r\n",10);
1210 shared.select4 = createStringObject("select 4\r\n",10);
1211 shared.select5 = createStringObject("select 5\r\n",10);
1212 shared.select6 = createStringObject("select 6\r\n",10);
1213 shared.select7 = createStringObject("select 7\r\n",10);
1214 shared.select8 = createStringObject("select 8\r\n",10);
1215 shared.select9 = createStringObject("select 9\r\n",10);
1216 }
1217
1218 static void appendServerSaveParams(time_t seconds, int changes) {
1219 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1220 server.saveparams[server.saveparamslen].seconds = seconds;
1221 server.saveparams[server.saveparamslen].changes = changes;
1222 server.saveparamslen++;
1223 }
1224
1225 static void resetServerSaveParams() {
1226 zfree(server.saveparams);
1227 server.saveparams = NULL;
1228 server.saveparamslen = 0;
1229 }
1230
1231 static void initServerConfig() {
1232 server.dbnum = REDIS_DEFAULT_DBNUM;
1233 server.port = REDIS_SERVERPORT;
1234 server.verbosity = REDIS_DEBUG;
1235 server.maxidletime = REDIS_MAXIDLETIME;
1236 server.saveparams = NULL;
1237 server.logfile = NULL; /* NULL = log on standard output */
1238 server.bindaddr = NULL;
1239 server.glueoutputbuf = 1;
1240 server.daemonize = 0;
1241 server.appendonly = 0;
1242 server.appendfsync = APPENDFSYNC_ALWAYS;
1243 server.lastfsync = time(NULL);
1244 server.appendfd = -1;
1245 server.appendseldb = -1; /* Make sure the first time will not match */
1246 server.pidfile = "/var/run/redis.pid";
1247 server.dbfilename = "dump.rdb";
1248 server.appendfilename = "appendonly.aof";
1249 server.requirepass = NULL;
1250 server.shareobjects = 0;
1251 server.rdbcompression = 1;
1252 server.sharingpoolsize = 1024;
1253 server.maxclients = 0;
1254 server.blockedclients = 0;
1255 server.maxmemory = 0;
1256 server.vm_enabled = 0;
1257 server.vm_page_size = 256; /* 256 bytes per page */
1258 server.vm_pages = 1024*1024*100; /* 104 millions of pages */
1259 server.vm_max_memory = 1024LL*1024*1024*1; /* 1 GB of RAM */
1260
1261 resetServerSaveParams();
1262
1263 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1264 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1265 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1266 /* Replication related */
1267 server.isslave = 0;
1268 server.masterauth = NULL;
1269 server.masterhost = NULL;
1270 server.masterport = 6379;
1271 server.master = NULL;
1272 server.replstate = REDIS_REPL_NONE;
1273
1274 /* Double constants initialization */
1275 R_Zero = 0.0;
1276 R_PosInf = 1.0/R_Zero;
1277 R_NegInf = -1.0/R_Zero;
1278 R_Nan = R_Zero/R_Zero;
1279 }
1280
1281 static void initServer() {
1282 int j;
1283
1284 signal(SIGHUP, SIG_IGN);
1285 signal(SIGPIPE, SIG_IGN);
1286 setupSigSegvAction();
1287
1288 server.clients = listCreate();
1289 server.slaves = listCreate();
1290 server.monitors = listCreate();
1291 server.objfreelist = listCreate();
1292 createSharedObjects();
1293 server.el = aeCreateEventLoop();
1294 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1295 server.sharingpool = dictCreate(&setDictType,NULL);
1296 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1297 if (server.fd == -1) {
1298 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1299 exit(1);
1300 }
1301 for (j = 0; j < server.dbnum; j++) {
1302 server.db[j].dict = dictCreate(&hashDictType,NULL);
1303 server.db[j].expires = dictCreate(&setDictType,NULL);
1304 server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL);
1305 server.db[j].id = j;
1306 }
1307 server.cronloops = 0;
1308 server.bgsavechildpid = -1;
1309 server.bgrewritechildpid = -1;
1310 server.bgrewritebuf = sdsempty();
1311 server.lastsave = time(NULL);
1312 server.dirty = 0;
1313 server.usedmemory = 0;
1314 server.stat_numcommands = 0;
1315 server.stat_numconnections = 0;
1316 server.stat_starttime = time(NULL);
1317 server.unixtime = time(NULL);
1318 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1319
1320 if (server.appendonly) {
1321 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1322 if (server.appendfd == -1) {
1323 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1324 strerror(errno));
1325 exit(1);
1326 }
1327 }
1328
1329 if (server.vm_enabled) vmInit();
1330 }
1331
1332 /* Empty the whole database */
1333 static long long emptyDb() {
1334 int j;
1335 long long removed = 0;
1336
1337 for (j = 0; j < server.dbnum; j++) {
1338 removed += dictSize(server.db[j].dict);
1339 dictEmpty(server.db[j].dict);
1340 dictEmpty(server.db[j].expires);
1341 }
1342 return removed;
1343 }
1344
/* Convert a "yes" / "no" string (case insensitive) into 1 / 0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1350
1351 /* I agree, this is a very rudimental way to load a configuration...
1352 will improve later if the config gets more complex */
1353 static void loadServerConfig(char *filename) {
1354 FILE *fp;
1355 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1356 int linenum = 0;
1357 sds line = NULL;
1358
1359 if (filename[0] == '-' && filename[1] == '\0')
1360 fp = stdin;
1361 else {
1362 if ((fp = fopen(filename,"r")) == NULL) {
1363 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1364 exit(1);
1365 }
1366 }
1367
1368 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1369 sds *argv;
1370 int argc, j;
1371
1372 linenum++;
1373 line = sdsnew(buf);
1374 line = sdstrim(line," \t\r\n");
1375
1376 /* Skip comments and blank lines*/
1377 if (line[0] == '#' || line[0] == '\0') {
1378 sdsfree(line);
1379 continue;
1380 }
1381
1382 /* Split into arguments */
1383 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1384 sdstolower(argv[0]);
1385
1386 /* Execute config directives */
1387 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1388 server.maxidletime = atoi(argv[1]);
1389 if (server.maxidletime < 0) {
1390 err = "Invalid timeout value"; goto loaderr;
1391 }
1392 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1393 server.port = atoi(argv[1]);
1394 if (server.port < 1 || server.port > 65535) {
1395 err = "Invalid port"; goto loaderr;
1396 }
1397 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1398 server.bindaddr = zstrdup(argv[1]);
1399 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1400 int seconds = atoi(argv[1]);
1401 int changes = atoi(argv[2]);
1402 if (seconds < 1 || changes < 0) {
1403 err = "Invalid save parameters"; goto loaderr;
1404 }
1405 appendServerSaveParams(seconds,changes);
1406 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1407 if (chdir(argv[1]) == -1) {
1408 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1409 argv[1], strerror(errno));
1410 exit(1);
1411 }
1412 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1413 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1414 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1415 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1416 else {
1417 err = "Invalid log level. Must be one of debug, notice, warning";
1418 goto loaderr;
1419 }
1420 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1421 FILE *logfp;
1422
1423 server.logfile = zstrdup(argv[1]);
1424 if (!strcasecmp(server.logfile,"stdout")) {
1425 zfree(server.logfile);
1426 server.logfile = NULL;
1427 }
1428 if (server.logfile) {
1429 /* Test if we are able to open the file. The server will not
1430 * be able to abort just for this problem later... */
1431 logfp = fopen(server.logfile,"a");
1432 if (logfp == NULL) {
1433 err = sdscatprintf(sdsempty(),
1434 "Can't open the log file: %s", strerror(errno));
1435 goto loaderr;
1436 }
1437 fclose(logfp);
1438 }
1439 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1440 server.dbnum = atoi(argv[1]);
1441 if (server.dbnum < 1) {
1442 err = "Invalid number of databases"; goto loaderr;
1443 }
1444 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1445 server.maxclients = atoi(argv[1]);
1446 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1447 server.maxmemory = strtoll(argv[1], NULL, 10);
1448 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1449 server.masterhost = sdsnew(argv[1]);
1450 server.masterport = atoi(argv[2]);
1451 server.replstate = REDIS_REPL_CONNECT;
1452 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1453 server.masterauth = zstrdup(argv[1]);
1454 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1455 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1456 err = "argument must be 'yes' or 'no'"; goto loaderr;
1457 }
1458 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1459 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1460 err = "argument must be 'yes' or 'no'"; goto loaderr;
1461 }
1462 } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) {
1463 if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
1464 err = "argument must be 'yes' or 'no'"; goto loaderr;
1465 }
1466 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1467 server.sharingpoolsize = atoi(argv[1]);
1468 if (server.sharingpoolsize < 1) {
1469 err = "invalid object sharing pool size"; goto loaderr;
1470 }
1471 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1472 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1473 err = "argument must be 'yes' or 'no'"; goto loaderr;
1474 }
1475 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1476 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1477 err = "argument must be 'yes' or 'no'"; goto loaderr;
1478 }
1479 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1480 if (!strcasecmp(argv[1],"no")) {
1481 server.appendfsync = APPENDFSYNC_NO;
1482 } else if (!strcasecmp(argv[1],"always")) {
1483 server.appendfsync = APPENDFSYNC_ALWAYS;
1484 } else if (!strcasecmp(argv[1],"everysec")) {
1485 server.appendfsync = APPENDFSYNC_EVERYSEC;
1486 } else {
1487 err = "argument must be 'no', 'always' or 'everysec'";
1488 goto loaderr;
1489 }
1490 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1491 server.requirepass = zstrdup(argv[1]);
1492 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1493 server.pidfile = zstrdup(argv[1]);
1494 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1495 server.dbfilename = zstrdup(argv[1]);
1496 } else if (!strcasecmp(argv[0],"vm-enabled") && argc == 2) {
1497 if ((server.vm_enabled = yesnotoi(argv[1])) == -1) {
1498 err = "argument must be 'yes' or 'no'"; goto loaderr;
1499 }
1500 } else {
1501 err = "Bad directive or wrong number of arguments"; goto loaderr;
1502 }
1503 for (j = 0; j < argc; j++)
1504 sdsfree(argv[j]);
1505 zfree(argv);
1506 sdsfree(line);
1507 }
1508 if (fp != stdin) fclose(fp);
1509 return;
1510
1511 loaderr:
1512 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1513 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1514 fprintf(stderr, ">>> '%s'\n", line);
1515 fprintf(stderr, "%s\n", err);
1516 exit(1);
1517 }
1518
1519 static void freeClientArgv(redisClient *c) {
1520 int j;
1521
1522 for (j = 0; j < c->argc; j++)
1523 decrRefCount(c->argv[j]);
1524 for (j = 0; j < c->mbargc; j++)
1525 decrRefCount(c->mbargv[j]);
1526 c->argc = 0;
1527 c->mbargc = 0;
1528 }
1529
1530 static void freeClient(redisClient *c) {
1531 listNode *ln;
1532
1533 /* Note that if the client we are freeing is blocked into a blocking
1534 * call, we have to set querybuf to NULL *before* to call unblockClient()
1535 * to avoid processInputBuffer() will get called. Also it is important
1536 * to remove the file events after this, because this call adds
1537 * the READABLE event. */
1538 sdsfree(c->querybuf);
1539 c->querybuf = NULL;
1540 if (c->flags & REDIS_BLOCKED)
1541 unblockClient(c);
1542
1543 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1544 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1545 listRelease(c->reply);
1546 freeClientArgv(c);
1547 close(c->fd);
1548 ln = listSearchKey(server.clients,c);
1549 redisAssert(ln != NULL);
1550 listDelNode(server.clients,ln);
1551 if (c->flags & REDIS_SLAVE) {
1552 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1553 close(c->repldbfd);
1554 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1555 ln = listSearchKey(l,c);
1556 redisAssert(ln != NULL);
1557 listDelNode(l,ln);
1558 }
1559 if (c->flags & REDIS_MASTER) {
1560 server.master = NULL;
1561 server.replstate = REDIS_REPL_CONNECT;
1562 }
1563 zfree(c->argv);
1564 zfree(c->mbargv);
1565 freeClientMultiState(c);
1566 zfree(c);
1567 }
1568
1569 #define GLUEREPLY_UP_TO (1024)
1570 static void glueReplyBuffersIfNeeded(redisClient *c) {
1571 int copylen = 0;
1572 char buf[GLUEREPLY_UP_TO];
1573 listNode *ln;
1574 robj *o;
1575
1576 listRewind(c->reply);
1577 while((ln = listYield(c->reply))) {
1578 int objlen;
1579
1580 o = ln->value;
1581 objlen = sdslen(o->ptr);
1582 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1583 memcpy(buf+copylen,o->ptr,objlen);
1584 copylen += objlen;
1585 listDelNode(c->reply,ln);
1586 } else {
1587 if (copylen == 0) return;
1588 break;
1589 }
1590 }
1591 /* Now the output buffer is empty, add the new single element */
1592 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1593 listAddNodeHead(c->reply,o);
1594 }
1595
1596 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1597 redisClient *c = privdata;
1598 int nwritten = 0, totwritten = 0, objlen;
1599 robj *o;
1600 REDIS_NOTUSED(el);
1601 REDIS_NOTUSED(mask);
1602
1603 /* Use writev() if we have enough buffers to send */
1604 if (!server.glueoutputbuf &&
1605 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1606 !(c->flags & REDIS_MASTER))
1607 {
1608 sendReplyToClientWritev(el, fd, privdata, mask);
1609 return;
1610 }
1611
1612 while(listLength(c->reply)) {
1613 if (server.glueoutputbuf && listLength(c->reply) > 1)
1614 glueReplyBuffersIfNeeded(c);
1615
1616 o = listNodeValue(listFirst(c->reply));
1617 objlen = sdslen(o->ptr);
1618
1619 if (objlen == 0) {
1620 listDelNode(c->reply,listFirst(c->reply));
1621 continue;
1622 }
1623
1624 if (c->flags & REDIS_MASTER) {
1625 /* Don't reply to a master */
1626 nwritten = objlen - c->sentlen;
1627 } else {
1628 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1629 if (nwritten <= 0) break;
1630 }
1631 c->sentlen += nwritten;
1632 totwritten += nwritten;
1633 /* If we fully sent the object on head go to the next one */
1634 if (c->sentlen == objlen) {
1635 listDelNode(c->reply,listFirst(c->reply));
1636 c->sentlen = 0;
1637 }
1638 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1639 * bytes, in a single threaded server it's a good idea to serve
1640 * other clients as well, even if a very large request comes from
1641 * super fast link that is always able to accept data (in real world
1642 * scenario think about 'KEYS *' against the loopback interfae) */
1643 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1644 }
1645 if (nwritten == -1) {
1646 if (errno == EAGAIN) {
1647 nwritten = 0;
1648 } else {
1649 redisLog(REDIS_DEBUG,
1650 "Error writing to client: %s", strerror(errno));
1651 freeClient(c);
1652 return;
1653 }
1654 }
1655 if (totwritten > 0) c->lastinteraction = time(NULL);
1656 if (listLength(c->reply) == 0) {
1657 c->sentlen = 0;
1658 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1659 }
1660 }
1661
1662 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1663 {
1664 redisClient *c = privdata;
1665 int nwritten = 0, totwritten = 0, objlen, willwrite;
1666 robj *o;
1667 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1668 int offset, ion = 0;
1669 REDIS_NOTUSED(el);
1670 REDIS_NOTUSED(mask);
1671
1672 listNode *node;
1673 while (listLength(c->reply)) {
1674 offset = c->sentlen;
1675 ion = 0;
1676 willwrite = 0;
1677
1678 /* fill-in the iov[] array */
1679 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1680 o = listNodeValue(node);
1681 objlen = sdslen(o->ptr);
1682
1683 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1684 break;
1685
1686 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1687 break; /* no more iovecs */
1688
1689 iov[ion].iov_base = ((char*)o->ptr) + offset;
1690 iov[ion].iov_len = objlen - offset;
1691 willwrite += objlen - offset;
1692 offset = 0; /* just for the first item */
1693 ion++;
1694 }
1695
1696 if(willwrite == 0)
1697 break;
1698
1699 /* write all collected blocks at once */
1700 if((nwritten = writev(fd, iov, ion)) < 0) {
1701 if (errno != EAGAIN) {
1702 redisLog(REDIS_DEBUG,
1703 "Error writing to client: %s", strerror(errno));
1704 freeClient(c);
1705 return;
1706 }
1707 break;
1708 }
1709
1710 totwritten += nwritten;
1711 offset = c->sentlen;
1712
1713 /* remove written robjs from c->reply */
1714 while (nwritten && listLength(c->reply)) {
1715 o = listNodeValue(listFirst(c->reply));
1716 objlen = sdslen(o->ptr);
1717
1718 if(nwritten >= objlen - offset) {
1719 listDelNode(c->reply, listFirst(c->reply));
1720 nwritten -= objlen - offset;
1721 c->sentlen = 0;
1722 } else {
1723 /* partial write */
1724 c->sentlen += nwritten;
1725 break;
1726 }
1727 offset = 0;
1728 }
1729 }
1730
1731 if (totwritten > 0)
1732 c->lastinteraction = time(NULL);
1733
1734 if (listLength(c->reply) == 0) {
1735 c->sentlen = 0;
1736 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1737 }
1738 }
1739
1740 static struct redisCommand *lookupCommand(char *name) {
1741 int j = 0;
1742 while(cmdTable[j].name != NULL) {
1743 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1744 j++;
1745 }
1746 return NULL;
1747 }
1748
1749 /* resetClient prepare the client to process the next command */
1750 static void resetClient(redisClient *c) {
1751 freeClientArgv(c);
1752 c->bulklen = -1;
1753 c->multibulk = 0;
1754 }
1755
1756 /* Call() is the core of Redis execution of a command */
1757 static void call(redisClient *c, struct redisCommand *cmd) {
1758 long long dirty;
1759
1760 dirty = server.dirty;
1761 cmd->proc(c);
1762 if (server.appendonly && server.dirty-dirty)
1763 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1764 if (server.dirty-dirty && listLength(server.slaves))
1765 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1766 if (listLength(server.monitors))
1767 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1768 server.stat_numcommands++;
1769 }
1770
/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() executes the command or prepares the
 * server for a bulk read from the client.
 *
 * If 1 is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if 0 is returned the client was destroyed (i.e. after QUIT). */
1779 static int processCommand(redisClient *c) {
1780 struct redisCommand *cmd;
1781
1782 /* Free some memory if needed (maxmemory setting) */
1783 if (server.maxmemory) freeMemoryIfNeeded();
1784
1785 /* Handle the multi bulk command type. This is an alternative protocol
1786 * supported by Redis in order to receive commands that are composed of
1787 * multiple binary-safe "bulk" arguments. The latency of processing is
1788 * a bit higher but this allows things like multi-sets, so if this
1789 * protocol is used only for MSET and similar commands this is a big win. */
1790 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1791 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1792 if (c->multibulk <= 0) {
1793 resetClient(c);
1794 return 1;
1795 } else {
1796 decrRefCount(c->argv[c->argc-1]);
1797 c->argc--;
1798 return 1;
1799 }
1800 } else if (c->multibulk) {
1801 if (c->bulklen == -1) {
1802 if (((char*)c->argv[0]->ptr)[0] != '$') {
1803 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1804 resetClient(c);
1805 return 1;
1806 } else {
1807 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1808 decrRefCount(c->argv[0]);
1809 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1810 c->argc--;
1811 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1812 resetClient(c);
1813 return 1;
1814 }
1815 c->argc--;
1816 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1817 return 1;
1818 }
1819 } else {
1820 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1821 c->mbargv[c->mbargc] = c->argv[0];
1822 c->mbargc++;
1823 c->argc--;
1824 c->multibulk--;
1825 if (c->multibulk == 0) {
1826 robj **auxargv;
1827 int auxargc;
1828
1829 /* Here we need to swap the multi-bulk argc/argv with the
1830 * normal argc/argv of the client structure. */
1831 auxargv = c->argv;
1832 c->argv = c->mbargv;
1833 c->mbargv = auxargv;
1834
1835 auxargc = c->argc;
1836 c->argc = c->mbargc;
1837 c->mbargc = auxargc;
1838
1839 /* We need to set bulklen to something different than -1
1840 * in order for the code below to process the command without
1841 * to try to read the last argument of a bulk command as
1842 * a special argument. */
1843 c->bulklen = 0;
1844 /* continue below and process the command */
1845 } else {
1846 c->bulklen = -1;
1847 return 1;
1848 }
1849 }
1850 }
1851 /* -- end of multi bulk commands processing -- */
1852
1853 /* The QUIT command is handled as a special case. Normal command
1854 * procs are unable to close the client connection safely */
1855 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1856 freeClient(c);
1857 return 0;
1858 }
1859 cmd = lookupCommand(c->argv[0]->ptr);
1860 if (!cmd) {
1861 addReplySds(c,
1862 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1863 (char*)c->argv[0]->ptr));
1864 resetClient(c);
1865 return 1;
1866 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1867 (c->argc < -cmd->arity)) {
1868 addReplySds(c,
1869 sdscatprintf(sdsempty(),
1870 "-ERR wrong number of arguments for '%s' command\r\n",
1871 cmd->name));
1872 resetClient(c);
1873 return 1;
1874 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1875 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1876 resetClient(c);
1877 return 1;
1878 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1879 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1880
1881 decrRefCount(c->argv[c->argc-1]);
1882 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1883 c->argc--;
1884 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1885 resetClient(c);
1886 return 1;
1887 }
1888 c->argc--;
1889 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1890 /* It is possible that the bulk read is already in the
1891 * buffer. Check this condition and handle it accordingly.
1892 * This is just a fast path, alternative to call processInputBuffer().
1893 * It's a good idea since the code is small and this condition
1894 * happens most of the times. */
1895 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1896 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1897 c->argc++;
1898 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1899 } else {
1900 return 1;
1901 }
1902 }
1903 /* Let's try to share objects on the command arguments vector */
1904 if (server.shareobjects) {
1905 int j;
1906 for(j = 1; j < c->argc; j++)
1907 c->argv[j] = tryObjectSharing(c->argv[j]);
1908 }
1909 /* Let's try to encode the bulk object to save space. */
1910 if (cmd->flags & REDIS_CMD_BULK)
1911 tryObjectEncoding(c->argv[c->argc-1]);
1912
1913 /* Check if the user is authenticated */
1914 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1915 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1916 resetClient(c);
1917 return 1;
1918 }
1919
1920 /* Exec the command */
1921 if (c->flags & REDIS_MULTI && cmd->proc != execCommand) {
1922 queueMultiCommand(c,cmd);
1923 addReply(c,shared.queued);
1924 } else {
1925 call(c,cmd);
1926 }
1927
1928 /* Prepare the client for the next command */
1929 if (c->flags & REDIS_CLOSE) {
1930 freeClient(c);
1931 return 0;
1932 }
1933 resetClient(c);
1934 return 1;
1935 }
1936
1937 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1938 listNode *ln;
1939 int outc = 0, j;
1940 robj **outv;
1941 /* (args*2)+1 is enough room for args, spaces, newlines */
1942 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1943
1944 if (argc <= REDIS_STATIC_ARGS) {
1945 outv = static_outv;
1946 } else {
1947 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1948 }
1949
1950 for (j = 0; j < argc; j++) {
1951 if (j != 0) outv[outc++] = shared.space;
1952 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1953 robj *lenobj;
1954
1955 lenobj = createObject(REDIS_STRING,
1956 sdscatprintf(sdsempty(),"%lu\r\n",
1957 (unsigned long) stringObjectLen(argv[j])));
1958 lenobj->refcount = 0;
1959 outv[outc++] = lenobj;
1960 }
1961 outv[outc++] = argv[j];
1962 }
1963 outv[outc++] = shared.crlf;
1964
1965 /* Increment all the refcounts at start and decrement at end in order to
1966 * be sure to free objects if there is no slave in a replication state
1967 * able to be feed with commands */
1968 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1969 listRewind(slaves);
1970 while((ln = listYield(slaves))) {
1971 redisClient *slave = ln->value;
1972
1973 /* Don't feed slaves that are still waiting for BGSAVE to start */
1974 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1975
1976 /* Feed all the other slaves, MONITORs and so on */
1977 if (slave->slaveseldb != dictid) {
1978 robj *selectcmd;
1979
1980 switch(dictid) {
1981 case 0: selectcmd = shared.select0; break;
1982 case 1: selectcmd = shared.select1; break;
1983 case 2: selectcmd = shared.select2; break;
1984 case 3: selectcmd = shared.select3; break;
1985 case 4: selectcmd = shared.select4; break;
1986 case 5: selectcmd = shared.select5; break;
1987 case 6: selectcmd = shared.select6; break;
1988 case 7: selectcmd = shared.select7; break;
1989 case 8: selectcmd = shared.select8; break;
1990 case 9: selectcmd = shared.select9; break;
1991 default:
1992 selectcmd = createObject(REDIS_STRING,
1993 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1994 selectcmd->refcount = 0;
1995 break;
1996 }
1997 addReply(slave,selectcmd);
1998 slave->slaveseldb = dictid;
1999 }
2000 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
2001 }
2002 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
2003 if (outv != static_outv) zfree(outv);
2004 }
2005
2006 static void processInputBuffer(redisClient *c) {
2007 again:
2008 /* Before to process the input buffer, make sure the client is not
2009 * waitig for a blocking operation such as BLPOP. Note that the first
2010 * iteration the client is never blocked, otherwise the processInputBuffer
2011 * would not be called at all, but after the execution of the first commands
2012 * in the input buffer the client may be blocked, and the "goto again"
2013 * will try to reiterate. The following line will make it return asap. */
2014 if (c->flags & REDIS_BLOCKED) return;
2015 if (c->bulklen == -1) {
2016 /* Read the first line of the query */
2017 char *p = strchr(c->querybuf,'\n');
2018 size_t querylen;
2019
2020 if (p) {
2021 sds query, *argv;
2022 int argc, j;
2023
2024 query = c->querybuf;
2025 c->querybuf = sdsempty();
2026 querylen = 1+(p-(query));
2027 if (sdslen(query) > querylen) {
2028 /* leave data after the first line of the query in the buffer */
2029 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
2030 }
2031 *p = '\0'; /* remove "\n" */
2032 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
2033 sdsupdatelen(query);
2034
2035 /* Now we can split the query in arguments */
2036 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
2037 sdsfree(query);
2038
2039 if (c->argv) zfree(c->argv);
2040 c->argv = zmalloc(sizeof(robj*)*argc);
2041
2042 for (j = 0; j < argc; j++) {
2043 if (sdslen(argv[j])) {
2044 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
2045 c->argc++;
2046 } else {
2047 sdsfree(argv[j]);
2048 }
2049 }
2050 zfree(argv);
2051 if (c->argc) {
2052 /* Execute the command. If the client is still valid
2053 * after processCommand() return and there is something
2054 * on the query buffer try to process the next command. */
2055 if (processCommand(c) && sdslen(c->querybuf)) goto again;
2056 } else {
2057 /* Nothing to process, argc == 0. Just process the query
2058 * buffer if it's not empty or return to the caller */
2059 if (sdslen(c->querybuf)) goto again;
2060 }
2061 return;
2062 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
2063 redisLog(REDIS_DEBUG, "Client protocol error");
2064 freeClient(c);
2065 return;
2066 }
2067 } else {
2068 /* Bulk read handling. Note that if we are at this point
2069 the client already sent a command terminated with a newline,
2070 we are reading the bulk data that is actually the last
2071 argument of the command. */
2072 int qbl = sdslen(c->querybuf);
2073
2074 if (c->bulklen <= qbl) {
2075 /* Copy everything but the final CRLF as final argument */
2076 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
2077 c->argc++;
2078 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
2079 /* Process the command. If the client is still valid after
2080 * the processing and there is more data in the buffer
2081 * try to parse it. */
2082 if (processCommand(c) && sdslen(c->querybuf)) goto again;
2083 return;
2084 }
2085 }
2086 }
2087
2088 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
2089 redisClient *c = (redisClient*) privdata;
2090 char buf[REDIS_IOBUF_LEN];
2091 int nread;
2092 REDIS_NOTUSED(el);
2093 REDIS_NOTUSED(mask);
2094
2095 nread = read(fd, buf, REDIS_IOBUF_LEN);
2096 if (nread == -1) {
2097 if (errno == EAGAIN) {
2098 nread = 0;
2099 } else {
2100 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
2101 freeClient(c);
2102 return;
2103 }
2104 } else if (nread == 0) {
2105 redisLog(REDIS_DEBUG, "Client closed connection");
2106 freeClient(c);
2107 return;
2108 }
2109 if (nread) {
2110 c->querybuf = sdscatlen(c->querybuf, buf, nread);
2111 c->lastinteraction = time(NULL);
2112 } else {
2113 return;
2114 }
2115 processInputBuffer(c);
2116 }
2117
2118 static int selectDb(redisClient *c, int id) {
2119 if (id < 0 || id >= server.dbnum)
2120 return REDIS_ERR;
2121 c->db = &server.db[id];
2122 return REDIS_OK;
2123 }
2124
2125 static void *dupClientReplyValue(void *o) {
2126 incrRefCount((robj*)o);
2127 return 0;
2128 }
2129
2130 static redisClient *createClient(int fd) {
2131 redisClient *c = zmalloc(sizeof(*c));
2132
2133 anetNonBlock(NULL,fd);
2134 anetTcpNoDelay(NULL,fd);
2135 if (!c) return NULL;
2136 selectDb(c,0);
2137 c->fd = fd;
2138 c->querybuf = sdsempty();
2139 c->argc = 0;
2140 c->argv = NULL;
2141 c->bulklen = -1;
2142 c->multibulk = 0;
2143 c->mbargc = 0;
2144 c->mbargv = NULL;
2145 c->sentlen = 0;
2146 c->flags = 0;
2147 c->lastinteraction = time(NULL);
2148 c->authenticated = 0;
2149 c->replstate = REDIS_REPL_NONE;
2150 c->reply = listCreate();
2151 c->blockingkeys = NULL;
2152 c->blockingkeysnum = 0;
2153 listSetFreeMethod(c->reply,decrRefCount);
2154 listSetDupMethod(c->reply,dupClientReplyValue);
2155 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
2156 readQueryFromClient, c) == AE_ERR) {
2157 freeClient(c);
2158 return NULL;
2159 }
2160 listAddNodeTail(server.clients,c);
2161 initClientMultiState(c);
2162 return c;
2163 }
2164
2165 static void addReply(redisClient *c, robj *obj) {
2166 if (listLength(c->reply) == 0 &&
2167 (c->replstate == REDIS_REPL_NONE ||
2168 c->replstate == REDIS_REPL_ONLINE) &&
2169 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
2170 sendReplyToClient, c) == AE_ERR) return;
2171 listAddNodeTail(c->reply,getDecodedObject(obj));
2172 }
2173
2174 static void addReplySds(redisClient *c, sds s) {
2175 robj *o = createObject(REDIS_STRING,s);
2176 addReply(c,o);
2177 decrRefCount(o);
2178 }
2179
2180 static void addReplyDouble(redisClient *c, double d) {
2181 char buf[128];
2182
2183 snprintf(buf,sizeof(buf),"%.17g",d);
2184 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2185 (unsigned long) strlen(buf),buf));
2186 }
2187
2188 static void addReplyBulkLen(redisClient *c, robj *obj) {
2189 size_t len;
2190
2191 if (obj->encoding == REDIS_ENCODING_RAW) {
2192 len = sdslen(obj->ptr);
2193 } else {
2194 long n = (long)obj->ptr;
2195
2196 /* Compute how many bytes will take this integer as a radix 10 string */
2197 len = 1;
2198 if (n < 0) {
2199 len++;
2200 n = -n;
2201 }
2202 while((n = n/10) != 0) {
2203 len++;
2204 }
2205 }
2206 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len));
2207 }
2208
2209 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2210 int cport, cfd;
2211 char cip[128];
2212 redisClient *c;
2213 REDIS_NOTUSED(el);
2214 REDIS_NOTUSED(mask);
2215 REDIS_NOTUSED(privdata);
2216
2217 cfd = anetAccept(server.neterr, fd, cip, &cport);
2218 if (cfd == AE_ERR) {
2219 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2220 return;
2221 }
2222 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
2223 if ((c = createClient(cfd)) == NULL) {
2224 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2225 close(cfd); /* May be already closed, just ingore errors */
2226 return;
2227 }
2228 /* If maxclient directive is set and this is one client more... close the
2229 * connection. Note that we create the client instead to check before
2230 * for this condition, since now the socket is already set in nonblocking
2231 * mode and we can send an error for free using the Kernel I/O */
2232 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2233 char *err = "-ERR max number of clients reached\r\n";
2234
2235 /* That's a best effort error message, don't check write errors */
2236 if (write(c->fd,err,strlen(err)) == -1) {
2237 /* Nothing to do, Just to avoid the warning... */
2238 }
2239 freeClient(c);
2240 return;
2241 }
2242 server.stat_numconnections++;
2243 }
2244
2245 /* ======================= Redis objects implementation ===================== */
2246
2247 static robj *createObject(int type, void *ptr) {
2248 robj *o;
2249
2250 if (listLength(server.objfreelist)) {
2251 listNode *head = listFirst(server.objfreelist);
2252 o = listNodeValue(head);
2253 listDelNode(server.objfreelist,head);
2254 } else {
2255 if (server.vm_enabled) {
2256 o = zmalloc(sizeof(*o));
2257 } else {
2258 o = zmalloc(sizeof(*o)-sizeof(struct redisObjectVM));
2259 }
2260 }
2261 o->type = type;
2262 o->encoding = REDIS_ENCODING_RAW;
2263 o->ptr = ptr;
2264 o->refcount = 1;
2265 if (server.vm_enabled) {
2266 o->vm.atime = server.unixtime;
2267 o->storage = REDIS_VM_MEMORY;
2268 }
2269 return o;
2270 }
2271
2272 static robj *createStringObject(char *ptr, size_t len) {
2273 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2274 }
2275
2276 static robj *createListObject(void) {
2277 list *l = listCreate();
2278
2279 listSetFreeMethod(l,decrRefCount);
2280 return createObject(REDIS_LIST,l);
2281 }
2282
2283 static robj *createSetObject(void) {
2284 dict *d = dictCreate(&setDictType,NULL);
2285 return createObject(REDIS_SET,d);
2286 }
2287
2288 static robj *createZsetObject(void) {
2289 zset *zs = zmalloc(sizeof(*zs));
2290
2291 zs->dict = dictCreate(&zsetDictType,NULL);
2292 zs->zsl = zslCreate();
2293 return createObject(REDIS_ZSET,zs);
2294 }
2295
2296 static void freeStringObject(robj *o) {
2297 if (o->encoding == REDIS_ENCODING_RAW) {
2298 sdsfree(o->ptr);
2299 }
2300 }
2301
2302 static void freeListObject(robj *o) {
2303 listRelease((list*) o->ptr);
2304 }
2305
2306 static void freeSetObject(robj *o) {
2307 dictRelease((dict*) o->ptr);
2308 }
2309
2310 static void freeZsetObject(robj *o) {
2311 zset *zs = o->ptr;
2312
2313 dictRelease(zs->dict);
2314 zslFree(zs->zsl);
2315 zfree(zs);
2316 }
2317
2318 static void freeHashObject(robj *o) {
2319 dictRelease((dict*) o->ptr);
2320 }
2321
2322 static void incrRefCount(robj *o) {
2323 o->refcount++;
2324 #ifdef DEBUG_REFCOUNT
2325 if (o->type == REDIS_STRING)
2326 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2327 #endif
2328 }
2329
2330 static void decrRefCount(void *obj) {
2331 robj *o = obj;
2332
2333 #ifdef DEBUG_REFCOUNT
2334 if (o->type == REDIS_STRING)
2335 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2336 #endif
2337 if (--(o->refcount) == 0) {
2338 switch(o->type) {
2339 case REDIS_STRING: freeStringObject(o); break;
2340 case REDIS_LIST: freeListObject(o); break;
2341 case REDIS_SET: freeSetObject(o); break;
2342 case REDIS_ZSET: freeZsetObject(o); break;
2343 case REDIS_HASH: freeHashObject(o); break;
2344 default: redisAssert(0 != 0); break;
2345 }
2346 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2347 !listAddNodeHead(server.objfreelist,o))
2348 zfree(o);
2349 }
2350 }
2351
2352 static robj *lookupKey(redisDb *db, robj *key) {
2353 dictEntry *de = dictFind(db->dict,key);
2354 if (de) {
2355 robj *o = dictGetEntryVal(de);
2356
2357 /* Update the access time of the key for the aging algorithm. */
2358 if (server.vm_enabled) o->vm.atime = server.unixtime;
2359 return o;
2360 } else {
2361 return NULL;
2362 }
2363 }
2364
2365 static robj *lookupKeyRead(redisDb *db, robj *key) {
2366 expireIfNeeded(db,key);
2367 return lookupKey(db,key);
2368 }
2369
2370 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2371 deleteIfVolatile(db,key);
2372 return lookupKey(db,key);
2373 }
2374
2375 static int deleteKey(redisDb *db, robj *key) {
2376 int retval;
2377
2378 /* We need to protect key from destruction: after the first dictDelete()
2379 * it may happen that 'key' is no longer valid if we don't increment
2380 * it's count. This may happen when we get the object reference directly
2381 * from the hash table with dictRandomKey() or dict iterators */
2382 incrRefCount(key);
2383 if (dictSize(db->expires)) dictDelete(db->expires,key);
2384 retval = dictDelete(db->dict,key);
2385 decrRefCount(key);
2386
2387 return retval == DICT_OK;
2388 }
2389
2390 /* Try to share an object against the shared objects pool */
2391 static robj *tryObjectSharing(robj *o) {
2392 struct dictEntry *de;
2393 unsigned long c;
2394
2395 if (o == NULL || server.shareobjects == 0) return o;
2396
2397 redisAssert(o->type == REDIS_STRING);
2398 de = dictFind(server.sharingpool,o);
2399 if (de) {
2400 robj *shared = dictGetEntryKey(de);
2401
2402 c = ((unsigned long) dictGetEntryVal(de))+1;
2403 dictGetEntryVal(de) = (void*) c;
2404 incrRefCount(shared);
2405 decrRefCount(o);
2406 return shared;
2407 } else {
2408 /* Here we are using a stream algorihtm: Every time an object is
2409 * shared we increment its count, everytime there is a miss we
2410 * recrement the counter of a random object. If this object reaches
2411 * zero we remove the object and put the current object instead. */
2412 if (dictSize(server.sharingpool) >=
2413 server.sharingpoolsize) {
2414 de = dictGetRandomKey(server.sharingpool);
2415 redisAssert(de != NULL);
2416 c = ((unsigned long) dictGetEntryVal(de))-1;
2417 dictGetEntryVal(de) = (void*) c;
2418 if (c == 0) {
2419 dictDelete(server.sharingpool,de->key);
2420 }
2421 } else {
2422 c = 0; /* If the pool is empty we want to add this object */
2423 }
2424 if (c == 0) {
2425 int retval;
2426
2427 retval = dictAdd(server.sharingpool,o,(void*)1);
2428 redisAssert(retval == DICT_OK);
2429 incrRefCount(o);
2430 }
2431 return o;
2432 }
2433 }
2434
2435 /* Check if the nul-terminated string 's' can be represented by a long
2436 * (that is, is a number that fits into long without any other space or
2437 * character before or after the digits).
2438 *
2439 * If so, the function returns REDIS_OK and *longval is set to the value
2440 * of the number. Otherwise REDIS_ERR is returned */
2441 static int isStringRepresentableAsLong(sds s, long *longval) {
2442 char buf[32], *endptr;
2443 long value;
2444 int slen;
2445
2446 value = strtol(s, &endptr, 10);
2447 if (endptr[0] != '\0') return REDIS_ERR;
2448 slen = snprintf(buf,32,"%ld",value);
2449
2450 /* If the number converted back into a string is not identical
2451 * then it's not possible to encode the string as integer */
2452 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2453 if (longval) *longval = value;
2454 return REDIS_OK;
2455 }
2456
2457 /* Try to encode a string object in order to save space */
2458 static int tryObjectEncoding(robj *o) {
2459 long value;
2460 sds s = o->ptr;
2461
2462 if (o->encoding != REDIS_ENCODING_RAW)
2463 return REDIS_ERR; /* Already encoded */
2464
2465 /* It's not save to encode shared objects: shared objects can be shared
2466 * everywhere in the "object space" of Redis. Encoded objects can only
2467 * appear as "values" (and not, for instance, as keys) */
2468 if (o->refcount > 1) return REDIS_ERR;
2469
2470 /* Currently we try to encode only strings */
2471 redisAssert(o->type == REDIS_STRING);
2472
2473 /* Check if we can represent this string as a long integer */
2474 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2475
2476 /* Ok, this object can be encoded */
2477 o->encoding = REDIS_ENCODING_INT;
2478 sdsfree(o->ptr);
2479 o->ptr = (void*) value;
2480 return REDIS_OK;
2481 }
2482
2483 /* Get a decoded version of an encoded object (returned as a new object).
2484 * If the object is already raw-encoded just increment the ref count. */
2485 static robj *getDecodedObject(robj *o) {
2486 robj *dec;
2487
2488 if (o->encoding == REDIS_ENCODING_RAW) {
2489 incrRefCount(o);
2490 return o;
2491 }
2492 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2493 char buf[32];
2494
2495 snprintf(buf,32,"%ld",(long)o->ptr);
2496 dec = createStringObject(buf,strlen(buf));
2497 return dec;
2498 } else {
2499 redisAssert(1 != 1);
2500 }
2501 }
2502
2503 /* Compare two string objects via strcmp() or alike.
2504 * Note that the objects may be integer-encoded. In such a case we
2505 * use snprintf() to get a string representation of the numbers on the stack
2506 * and compare the strings, it's much faster than calling getDecodedObject().
2507 *
2508 * Important note: if objects are not integer encoded, but binary-safe strings,
2509 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2510 * binary safe. */
2511 static int compareStringObjects(robj *a, robj *b) {
2512 redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2513 char bufa[128], bufb[128], *astr, *bstr;
2514 int bothsds = 1;
2515
2516 if (a == b) return 0;
2517 if (a->encoding != REDIS_ENCODING_RAW) {
2518 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2519 astr = bufa;
2520 bothsds = 0;
2521 } else {
2522 astr = a->ptr;
2523 }
2524 if (b->encoding != REDIS_ENCODING_RAW) {
2525 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2526 bstr = bufb;
2527 bothsds = 0;
2528 } else {
2529 bstr = b->ptr;
2530 }
2531 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2532 }
2533
2534 static size_t stringObjectLen(robj *o) {
2535 redisAssert(o->type == REDIS_STRING);
2536 if (o->encoding == REDIS_ENCODING_RAW) {
2537 return sdslen(o->ptr);
2538 } else {
2539 char buf[32];
2540
2541 return snprintf(buf,32,"%ld",(long)o->ptr);
2542 }
2543 }
2544
2545 /*============================ RDB saving/loading =========================== */
2546
/* Write the one byte 'type' to 'fp'.
 * Returns 0 on success, -1 if nothing could be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
2551
/* Write the time 't' to 'fp' as a 32 bit signed integer, in the byte order
 * of the host. Returns 0 on success, -1 if nothing could be written. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;
    size_t written = fwrite(&t32,4,1,fp);

    return (written == 0) ? -1 : 0;
}
2557
2558 /* check rdbLoadLen() comments for more info */
2559 static int rdbSaveLen(FILE *fp, uint32_t len) {
2560 unsigned char buf[2];
2561
2562 if (len < (1<<6)) {
2563 /* Save a 6 bit len */
2564 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2565 if (fwrite(buf,1,1,fp) == 0) return -1;
2566 } else if (len < (1<<14)) {
2567 /* Save a 14 bit len */
2568 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2569 buf[1] = len&0xFF;
2570 if (fwrite(buf,2,1,fp) == 0) return -1;
2571 } else {
2572 /* Save a 32 bit len */
2573 buf[0] = (REDIS_RDB_32BITLEN<<6);
2574 if (fwrite(buf,1,1,fp) == 0) return -1;
2575 len = htonl(len);
2576 if (fwrite(&len,4,1,fp) == 0) return -1;
2577 }
2578 return 0;
2579 }
2580
2581 /* String objects in the form "2391" "-100" without any space and with a
2582 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2583 * encoded as integers to save space */
2584 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2585 long long value;
2586 char *endptr, buf[32];
2587
2588 /* Check if it's possible to encode this value as a number */
2589 value = strtoll(s, &endptr, 10);
2590 if (endptr[0] != '\0') return 0;
2591 snprintf(buf,32,"%lld",value);
2592
2593 /* If the number converted back into a string is not identical
2594 * then it's not possible to encode the string as integer */
2595 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2596
2597 /* Finally check if it fits in our ranges */
2598 if (value >= -(1<<7) && value <= (1<<7)-1) {
2599 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2600 enc[1] = value&0xFF;
2601 return 2;
2602 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2603 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2604 enc[1] = value&0xFF;
2605 enc[2] = (value>>8)&0xFF;
2606 return 3;
2607 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2608 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2609 enc[1] = value&0xFF;
2610 enc[2] = (value>>8)&0xFF;
2611 enc[3] = (value>>16)&0xFF;
2612 enc[4] = (value>>24)&0xFF;
2613 return 5;
2614 } else {
2615 return 0;
2616 }
2617 }
2618
2619 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2620 unsigned int comprlen, outlen;
2621 unsigned char byte;
2622 void *out;
2623
2624 /* We require at least four bytes compression for this to be worth it */
2625 outlen = sdslen(obj->ptr)-4;
2626 if (outlen <= 0) return 0;
2627 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2628 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2629 if (comprlen == 0) {
2630 zfree(out);
2631 return 0;
2632 }
2633 /* Data compressed! Let's save it on disk */
2634 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2635 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2636 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2637 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2638 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2639 zfree(out);
2640 return comprlen;
2641
2642 writeerr:
2643 zfree(out);
2644 return -1;
2645 }
2646
2647 /* Save a string objet as [len][data] on disk. If the object is a string
2648 * representation of an integer value we try to safe it in a special form */
2649 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2650 size_t len;
2651 int enclen;
2652
2653 len = sdslen(obj->ptr);
2654
2655 /* Try integer encoding */
2656 if (len <= 11) {
2657 unsigned char buf[5];
2658 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2659 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2660 return 0;
2661 }
2662 }
2663
2664 /* Try LZF compression - under 20 bytes it's unable to compress even
2665 * aaaaaaaaaaaaaaaaaa so skip it */
2666 if (server.rdbcompression && len > 20) {
2667 int retval;
2668
2669 retval = rdbSaveLzfStringObject(fp,obj);
2670 if (retval == -1) return -1;
2671 if (retval > 0) return 0;
2672 /* retval == 0 means data can't be compressed, save the old way */
2673 }
2674
2675 /* Store verbatim */
2676 if (rdbSaveLen(fp,len) == -1) return -1;
2677 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2678 return 0;
2679 }
2680
2681 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2682 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2683 int retval;
2684
2685 obj = getDecodedObject(obj);
2686 retval = rdbSaveStringObjectRaw(fp,obj);
2687 decrRefCount(obj);
2688 return retval;
2689 }
2690
/* Save a double value. Doubles are saved as strings prefixed by an unsigned
 * 8 bit integer specifying the length of the representation.
 * This 8 bit integer has special values in order to specify the following
 * conditions:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 *
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len = 1; /* special values take the prefix byte only */

    if (isnan(val)) {
        buf[0] = 253;
    } else if (!isfinite(val)) {
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        char *repr = (char*)buf+1;

        snprintf(repr,sizeof(buf)-1,"%.17g",val);
        buf[0] = strlen(repr);
        len = buf[0]+1;
    }
    return (fwrite(buf,len,1,fp) == 0) ? -1 : 0;
}
2717
2718 /* Save a Redis object. */
2719 static int rdbSaveObject(FILE *fp, robj *o) {
2720 if (o->type == REDIS_STRING) {
2721 /* Save a string value */
2722 if (rdbSaveStringObject(fp,o) == -1) return -1;
2723 } else if (o->type == REDIS_LIST) {
2724 /* Save a list value */
2725 list *list = o->ptr;
2726 listNode *ln;
2727
2728 listRewind(list);
2729 if (rdbSaveLen(fp,listLength(list)) == -1) return -1;
2730 while((ln = listYield(list))) {
2731 robj *eleobj = listNodeValue(ln);
2732
2733 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2734 }
2735 } else if (o->type == REDIS_SET) {
2736 /* Save a set value */
2737 dict *set = o->ptr;
2738 dictIterator *di = dictGetIterator(set);
2739 dictEntry *de;
2740
2741 if (rdbSaveLen(fp,dictSize(set)) == -1) return -1;
2742 while((de = dictNext(di)) != NULL) {
2743 robj *eleobj = dictGetEntryKey(de);
2744
2745 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2746 }
2747 dictReleaseIterator(di);
2748 } else if (o->type == REDIS_ZSET) {
2749 /* Save a set value */
2750 zset *zs = o->ptr;
2751 dictIterator *di = dictGetIterator(zs->dict);
2752 dictEntry *de;
2753
2754 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) return -1;
2755 while((de = dictNext(di)) != NULL) {
2756 robj *eleobj = dictGetEntryKey(de);
2757 double *score = dictGetEntryVal(de);
2758
2759 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2760 if (rdbSaveDoubleValue(fp,*score) == -1) return -1;
2761 }
2762 dictReleaseIterator(di);
2763 } else {
2764 redisAssert(0 != 0);
2765 }
2766 return 0;
2767 }
2768
2769 /* Return the length the object will have on disk if saved with
2770 * the rdbSaveObject() function. Currently we use a trick to get
2771 * this length with very little changes to the code. In the future
2772 * we could switch to a faster solution. */
2773 static off_t rdbSavedObjectLen(robj *o) {
2774 static FILE *fp = NULL;
2775
2776 if (fp == NULL) fp = fopen("/dev/null","w");
2777 assert(fp != NULL);
2778
2779 rewind(fp);
2780 assert(rdbSaveObject(fp,o) != 1);
2781 return ftello(fp);
2782 }
2783
2784 /* Return the number of pages required to save this object in the swap file */
2785 static off_t rdbSavedObjectPages(robj *o) {
2786 off_t bytes = rdbSavedObjectLen(o);
2787
2788 return (bytes+(server.vm_page_size-1))/server.vm_page_size;
2789 }
2790
2791 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2792 static int rdbSave(char *filename) {
2793 dictIterator *di = NULL;
2794 dictEntry *de;
2795 FILE *fp;
2796 char tmpfile[256];
2797 int j;
2798 time_t now = time(NULL);
2799
2800 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2801 fp = fopen(tmpfile,"w");
2802 if (!fp) {
2803 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2804 return REDIS_ERR;
2805 }
2806 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2807 for (j = 0; j < server.dbnum; j++) {
2808 redisDb *db = server.db+j;
2809 dict *d = db->dict;
2810 if (dictSize(d) == 0) continue;
2811 di = dictGetIterator(d);
2812 if (!di) {
2813 fclose(fp);
2814 return REDIS_ERR;
2815 }
2816
2817 /* Write the SELECT DB opcode */
2818 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2819 if (rdbSaveLen(fp,j) == -1) goto werr;
2820
2821 /* Iterate this DB writing every entry */
2822 while((de = dictNext(di)) != NULL) {
2823 robj *key = dictGetEntryKey(de);
2824 robj *o = dictGetEntryVal(de);
2825 time_t expiretime = getExpire(db,key);
2826
2827 /* Save the expire time */
2828 if (expiretime != -1) {
2829 /* If this key is already expired skip it */
2830 if (expiretime < now) continue;
2831 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2832 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2833 }
2834 /* Save the key and associated value */
2835 if (rdbSaveType(fp,o->type) == -1) goto werr;
2836 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2837 /* Save the actual value */
2838 if (rdbSaveObject(fp,o) == -1) goto werr;
2839 }
2840 dictReleaseIterator(di);
2841 }
2842 /* EOF opcode */
2843 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2844
2845 /* Make sure data will not remain on the OS's output buffers */
2846 fflush(fp);
2847 fsync(fileno(fp));
2848 fclose(fp);
2849
2850 /* Use RENAME to make sure the DB file is changed atomically only
2851 * if the generate DB file is ok. */
2852 if (rename(tmpfile,filename) == -1) {
2853 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2854 unlink(tmpfile);
2855 return REDIS_ERR;
2856 }
2857 redisLog(REDIS_NOTICE,"DB saved on disk");
2858 server.dirty = 0;
2859 server.lastsave = time(NULL);
2860 return REDIS_OK;
2861
2862 werr:
2863 fclose(fp);
2864 unlink(tmpfile);
2865 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2866 if (di) dictReleaseIterator(di);
2867 return REDIS_ERR;
2868 }
2869
2870 static int rdbSaveBackground(char *filename) {
2871 pid_t childpid;
2872
2873 if (server.bgsavechildpid != -1) return REDIS_ERR;
2874 if ((childpid = fork()) == 0) {
2875 /* Child */
2876 close(server.fd);
2877 if (rdbSave(filename) == REDIS_OK) {
2878 exit(0);
2879 } else {
2880 exit(1);
2881 }
2882 } else {
2883 /* Parent */
2884 if (childpid == -1) {
2885 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2886 strerror(errno));
2887 return REDIS_ERR;
2888 }
2889 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2890 server.bgsavechildpid = childpid;
2891 return REDIS_OK;
2892 }
2893 return REDIS_OK; /* unreached */
2894 }
2895
/* Remove the temporary dump file "temp-<childpid>.rdb", i.e. the file name
 * rdbSave() builds from the pid of the saving process. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb", (int) childpid);
    unlink(path);
}
2902
/* Read a single byte type code from 'fp'. Returns the byte as a
 * non-negative int, or -1 if the byte could not be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return (fread(&byte,1,1,fp) == 0) ? -1 : byte;
}
2908
/* Read a 4 byte time value from 'fp', interpreted as an int32_t in the
 * host byte order, and return it as a time_t. Returns -1 if the four
 * bytes could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t raw;

    if (fread(&raw,sizeof(raw),1,fp) != 1) return -1;
    return (time_t) raw;
}
2914
2915 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2916 * of this file for a description of how this are stored on disk.
2917 *
2918 * isencoded is set to 1 if the readed length is not actually a length but
2919 * an "encoding type", check the above comments for more info */
2920 static uint32_t rdbLoadLen(FILE *fp, int *isencoded) {
2921 unsigned char buf[2];
2922 uint32_t len;
2923 int type;
2924
2925 if (isencoded) *isencoded = 0;
2926 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2927 type = (buf[0]&0xC0)>>6;
2928 if (type == REDIS_RDB_6BITLEN) {
2929 /* Read a 6 bit len */
2930 return buf[0]&0x3F;
2931 } else if (type == REDIS_RDB_ENCVAL) {
2932 /* Read a 6 bit len encoding type */
2933 if (isencoded) *isencoded = 1;
2934 return buf[0]&0x3F;
2935 } else if (type == REDIS_RDB_14BITLEN) {
2936 /* Read a 14 bit len */
2937 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2938 return ((buf[0]&0x3F)<<8)|buf[1];
2939 } else {
2940 /* Read a 32 bit len */
2941 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2942 return ntohl(len);
2943 }
2944 }
2945
2946 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2947 unsigned char enc[4];
2948 long long val;
2949
2950 if (enctype == REDIS_RDB_ENC_INT8) {
2951 if (fread(enc,1,1,fp) == 0) return NULL;
2952 val = (signed char)enc[0];
2953 } else if (enctype == REDIS_RDB_ENC_INT16) {
2954 uint16_t v;
2955 if (fread(enc,2,1,fp) == 0) return NULL;
2956 v = enc[0]|(enc[1]<<8);
2957 val = (int16_t)v;
2958 } else if (enctype == REDIS_RDB_ENC_INT32) {
2959 uint32_t v;
2960 if (fread(enc,4,1,fp) == 0) return NULL;
2961 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2962 val = (int32_t)v;
2963 } else {
2964 val = 0; /* anti-warning */
2965 redisAssert(0!=0);
2966 }
2967 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2968 }
2969
2970 static robj *rdbLoadLzfStringObject(FILE*fp) {
2971 unsigned int len, clen;
2972 unsigned char *c = NULL;
2973 sds val = NULL;
2974
2975 if ((clen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
2976 if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
2977 if ((c = zmalloc(clen)) == NULL) goto err;
2978 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2979 if (fread(c,clen,1,fp) == 0) goto err;
2980 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2981 zfree(c);
2982 return createObject(REDIS_STRING,val);
2983 err:
2984 zfree(c);
2985 sdsfree(val);
2986 return NULL;
2987 }
2988
2989 static robj *rdbLoadStringObject(FILE*fp) {
2990 int isencoded;
2991 uint32_t len;
2992 sds val;
2993
2994 len = rdbLoadLen(fp,&isencoded);
2995 if (isencoded) {
2996 switch(len) {
2997 case REDIS_RDB_ENC_INT8:
2998 case REDIS_RDB_ENC_INT16:
2999 case REDIS_RDB_ENC_INT32:
3000 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
3001 case REDIS_RDB_ENC_LZF:
3002 return tryObjectSharing(rdbLoadLzfStringObject(fp));
3003 default:
3004 redisAssert(0!=0);
3005 }
3006 }
3007
3008 if (len == REDIS_RDB_LENERR) return NULL;
3009 val = sdsnewlen(NULL,len);
3010 if (len && fread(val,len,1,fp) == 0) {
3011 sdsfree(val);
3012 return NULL;
3013 }
3014 return tryObjectSharing(createObject(REDIS_STRING,val));
3015 }
3016
3017 /* For information about double serialization check rdbSaveDoubleValue() */
3018 static int rdbLoadDoubleValue(FILE *fp, double *val) {
3019 char buf[128];
3020 unsigned char len;
3021
3022 if (fread(&len,1,1,fp) == 0) return -1;
3023 switch(len) {
3024 case 255: *val = R_NegInf; return 0;
3025 case 254: *val = R_PosInf; return 0;
3026 case 253: *val = R_Nan; return 0;
3027 default:
3028 if (fread(buf,len,1,fp) == 0) return -1;
3029 buf[len] = '\0';
3030 sscanf(buf, "%lg", val);
3031 return 0;
3032 }
3033 }
3034
3035 /* Load a Redis object of the specified type from the specified file.
3036 * On success a newly allocated object is returned, otherwise NULL. */
3037 static robj *rdbLoadObject(int type, FILE *fp) {
3038 robj *o;
3039
3040 if (type == REDIS_STRING) {
3041 /* Read string value */
3042 if ((o = rdbLoadStringObject(fp)) == NULL) return NULL;
3043 tryObjectEncoding(o);
3044 } else if (type == REDIS_LIST || type == REDIS_SET) {
3045 /* Read list/set value */
3046 uint32_t listlen;
3047
3048 if ((listlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
3049 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
3050 /* Load every single element of the list/set */
3051 while(listlen--) {
3052 robj *ele;
3053
3054 if ((ele = rdbLoadStringObject(fp)) == NULL) return NULL;
3055 tryObjectEncoding(ele);
3056 if (type == REDIS_LIST) {
3057 listAddNodeTail((list*)o->ptr,ele);
3058 } else {
3059 dictAdd((dict*)o->ptr,ele,NULL);
3060 }
3061 }
3062 } else if (type == REDIS_ZSET) {
3063 /* Read list/set value */
3064 uint32_t zsetlen;
3065 zset *zs;
3066
3067 if ((zsetlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
3068 o = createZsetObject();
3069 zs = o->ptr;
3070 /* Load every single element of the list/set */
3071 while(zsetlen--) {
3072 robj *ele;
3073 double *score = zmalloc(sizeof(double));
3074
3075 if ((ele = rdbLoadStringObject(fp)) == NULL) return NULL;
3076 tryObjectEncoding(ele);
3077 if (rdbLoadDoubleValue(fp,score) == -1) return NULL;
3078 dictAdd(zs->dict,ele,score);
3079 zslInsert(zs->zsl,*score,ele);
3080 incrRefCount(ele); /* added to skiplist */
3081 }
3082 } else {
3083 redisAssert(0 != 0);
3084 }
3085 return o;
3086 }
3087
3088 static int rdbLoad(char *filename) {
3089 FILE *fp;
3090 robj *keyobj = NULL;
3091 uint32_t dbid;
3092 int type, retval, rdbver;
3093 dict *d = server.db[0].dict;
3094 redisDb *db = server.db+0;
3095 char buf[1024];
3096 time_t expiretime = -1, now = time(NULL);
3097
3098 fp = fopen(filename,"r");
3099 if (!fp) return REDIS_ERR;
3100 if (fread(buf,9,1,fp) == 0) goto eoferr;
3101 buf[9] = '\0';
3102 if (memcmp(buf,"REDIS",5) != 0) {
3103 fclose(fp);
3104 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
3105 return REDIS_ERR;
3106 }
3107 rdbver = atoi(buf+5);
3108 if (rdbver != 1) {
3109 fclose(fp);
3110 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
3111 return REDIS_ERR;
3112 }
3113 while(1) {
3114 robj *o;
3115
3116 /* Read type. */
3117 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
3118 if (type == REDIS_EXPIRETIME) {
3119 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
3120 /* We read the time so we need to read the object type again */
3121 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
3122 }
3123 if (type == REDIS_EOF) break;
3124 /* Handle SELECT DB opcode as a special case */
3125 if (type == REDIS_SELECTDB) {
3126 if ((dbid = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR)
3127 goto eoferr;
3128 if (dbid >= (unsigned)server.dbnum) {
3129 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
3130 exit(1);
3131 }
3132 db = server.db+dbid;
3133 d = db->dict;
3134 continue;
3135 }
3136 /* Read key */
3137 if ((keyobj = rdbLoadStringObject(fp)) == NULL) goto eoferr;
3138 /* Read value */
3139 if ((o = rdbLoadObject(type,fp)) == NULL) goto eoferr;
3140 /* Add the new object in the hash table */
3141 retval = dictAdd(d,keyobj,o);
3142 if (retval == DICT_ERR) {
3143 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
3144 exit(1);
3145 }
3146 /* Set the expire time if needed */
3147 if (expiretime != -1) {
3148 setExpire(db,keyobj,expiretime);
3149 /* Delete this key if already expired */
3150 if (expiretime < now) deleteKey(db,keyobj);
3151 expiretime = -1;
3152 }
3153 keyobj = o = NULL;
3154 }
3155 fclose(fp);
3156 return REDIS_OK;
3157
3158 eoferr: /* unexpected end of file is handled here with a fatal exit */
3159 if (keyobj) decrRefCount(keyobj);
3160 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
3161 exit(1);
3162 return REDIS_ERR; /* Just to avoid warning */
3163 }
3164
3165 /*================================== Commands =============================== */
3166
3167 static void authCommand(redisClient *c) {
3168 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
3169 c->authenticated = 1;
3170 addReply(c,shared.ok);
3171 } else {
3172 c->authenticated = 0;
3173 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
3174 }
3175 }
3176
3177 static void pingCommand(redisClient *c) {
3178 addReply(c,shared.pong);
3179 }
3180
3181 static void echoCommand(redisClient *c) {
3182 addReplyBulkLen(c,c->argv[1]);
3183 addReply(c,c->argv[1]);
3184 addReply(c,shared.crlf);
3185 }
3186
3187 /*=================================== Strings =============================== */
3188
3189 static void setGenericCommand(redisClient *c, int nx) {
3190 int retval;
3191
3192 if (nx) deleteIfVolatile(c->db,c->argv[1]);
3193 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
3194 if (retval == DICT_ERR) {
3195 if (!nx) {
3196 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3197 incrRefCount(c->argv[2]);
3198 } else {
3199 addReply(c,shared.czero);
3200 return;
3201 }
3202 } else {
3203 incrRefCount(c->argv[1]);
3204 incrRefCount(c->argv[2]);
3205 }
3206 server.dirty++;
3207 removeExpire(c->db,c->argv[1]);
3208 addReply(c, nx ? shared.cone : shared.ok);
3209 }
3210
3211 static void setCommand(redisClient *c) {
3212 setGenericCommand(c,0);
3213 }
3214
3215 static void setnxCommand(redisClient *c) {
3216 setGenericCommand(c,1);
3217 }
3218
3219 static int getGenericCommand(redisClient *c) {
3220 robj *o = lookupKeyRead(c->db,c->argv[1]);
3221
3222 if (o == NULL) {
3223 addReply(c,shared.nullbulk);
3224 return REDIS_OK;
3225 } else {
3226 if (o->type != REDIS_STRING) {
3227 addReply(c,shared.wrongtypeerr);
3228 return REDIS_ERR;
3229 } else {
3230 addReplyBulkLen(c,o);
3231 addReply(c,o);
3232 addReply(c,shared.crlf);
3233 return REDIS_OK;
3234 }
3235 }
3236 }
3237
3238 static void getCommand(redisClient *c) {
3239 getGenericCommand(c);
3240 }
3241
3242 static void getsetCommand(redisClient *c) {
3243 if (getGenericCommand(c) == REDIS_ERR) return;
3244 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3245 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3246 } else {
3247 incrRefCount(c->argv[1]);
3248 }
3249 incrRefCount(c->argv[2]);
3250 server.dirty++;
3251 removeExpire(c->db,c->argv[1]);
3252 }
3253
3254 static void mgetCommand(redisClient *c) {
3255 int j;
3256
3257 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
3258 for (j = 1; j < c->argc; j++) {
3259 robj *o = lookupKeyRead(c->db,c->argv[j]);
3260 if (o == NULL) {
3261 addReply(c,shared.nullbulk);
3262 } else {
3263 if (o->type != REDIS_STRING) {
3264 addReply(c,shared.nullbulk);
3265 } else {
3266 addReplyBulkLen(c,o);
3267 addReply(c,o);
3268 addReply(c,shared.crlf);
3269 }
3270 }
3271 }
3272 }
3273
3274 static void msetGenericCommand(redisClient *c, int nx) {
3275 int j, busykeys = 0;
3276
3277 if ((c->argc % 2) == 0) {
3278 addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3279 return;
3280 }
3281 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3282 * set nothing at all if at least one already key exists. */
3283 if (nx) {
3284 for (j = 1; j < c->argc; j += 2) {
3285 if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
3286 busykeys++;
3287 }
3288 }
3289 }
3290 if (busykeys) {
3291 addReply(c, shared.czero);
3292 return;
3293 }
3294
3295 for (j = 1; j < c->argc; j += 2) {
3296 int retval;
3297
3298 tryObjectEncoding(c->argv[j+1]);
3299 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3300 if (retval == DICT_ERR) {
3301 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3302 incrRefCount(c->argv[j+1]);
3303 } else {
3304 incrRefCount(c->argv[j]);
3305 incrRefCount(c->argv[j+1]);
3306 }
3307 removeExpire(c->db,c->argv[j]);
3308 }
3309 server.dirty += (c->argc-1)/2;
3310 addReply(c, nx ? shared.cone : shared.ok);
3311 }
3312
3313 static void msetCommand(redisClient *c) {
3314 msetGenericCommand(c,0);
3315 }
3316
3317 static void msetnxCommand(redisClient *c) {
3318 msetGenericCommand(c,1);
3319 }
3320
3321 static void incrDecrCommand(redisClient *c, long long incr) {
3322 long long value;
3323 int retval;
3324 robj *o;
3325
3326 o = lookupKeyWrite(c->db,c->argv[1]);
3327 if (o == NULL) {
3328 value = 0;
3329 } else {
3330 if (o->type != REDIS_STRING) {
3331 value = 0;
3332 } else {
3333 char *eptr;
3334
3335 if (o->encoding == REDIS_ENCODING_RAW)
3336 value = strtoll(o->ptr, &eptr, 10);
3337 else if (o->encoding == REDIS_ENCODING_INT)
3338 value = (long)o->ptr;
3339 else
3340 redisAssert(1 != 1);
3341 }
3342 }
3343
3344 value += incr;
3345 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
3346 tryObjectEncoding(o);
3347 retval = dictAdd(c->db->dict,c->argv[1],o);
3348 if (retval == DICT_ERR) {
3349 dictReplace(c->db->dict,c->argv[1],o);
3350 removeExpire(c->db,c->argv[1]);
3351 } else {
3352 incrRefCount(c->argv[1]);
3353 }
3354 server.dirty++;
3355 addReply(c,shared.colon);
3356 addReply(c,o);
3357 addReply(c,shared.crlf);
3358 }
3359
3360 static void incrCommand(redisClient *c) {
3361 incrDecrCommand(c,1);
3362 }
3363
3364 static void decrCommand(redisClient *c) {
3365 incrDecrCommand(c,-1);
3366 }
3367
3368 static void incrbyCommand(redisClient *c) {
3369 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3370 incrDecrCommand(c,incr);
3371 }
3372
3373 static void decrbyCommand(redisClient *c) {
3374 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3375 incrDecrCommand(c,-incr);
3376 }
3377
3378 /* ========================= Type agnostic commands ========================= */
3379
3380 static void delCommand(redisClient *c) {
3381 int deleted = 0, j;
3382
3383 for (j = 1; j < c->argc; j++) {
3384 if (deleteKey(c->db,c->argv[j])) {
3385 server.dirty++;
3386 deleted++;
3387 }
3388 }
3389 switch(deleted) {
3390 case 0:
3391 addReply(c,shared.czero);
3392 break;
3393 case 1:
3394 addReply(c,shared.cone);
3395 break;
3396 default:
3397 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3398 break;
3399 }
3400 }
3401
3402 static void existsCommand(redisClient *c) {
3403 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3404 }
3405
3406 static void selectCommand(redisClient *c) {
3407 int id = atoi(c->argv[1]->ptr);
3408
3409 if (selectDb(c,id) == REDIS_ERR) {
3410 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3411 } else {
3412 addReply(c,shared.ok);
3413 }
3414 }
3415
3416 static void randomkeyCommand(redisClient *c) {
3417 dictEntry *de;
3418
3419 while(1) {
3420 de = dictGetRandomKey(c->db->dict);
3421 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3422 }
3423 if (de == NULL) {
3424 addReply(c,shared.plus);
3425 addReply(c,shared.crlf);
3426 } else {
3427 addReply(c,shared.plus);
3428 addReply(c,dictGetEntryKey(de));
3429 addReply(c,shared.crlf);
3430 }
3431 }
3432
3433 static void keysCommand(redisClient *c) {
3434 dictIterator *di;
3435 dictEntry *de;
3436 sds pattern = c->argv[1]->ptr;
3437 int plen = sdslen(pattern);
3438 unsigned long numkeys = 0, keyslen = 0;
3439 robj *lenobj = createObject(REDIS_STRING,NULL);
3440
3441 di = dictGetIterator(c->db->dict);
3442 addReply(c,lenobj);
3443 decrRefCount(lenobj);
3444 while((de = dictNext(di)) != NULL) {
3445 robj *keyobj = dictGetEntryKey(de);
3446
3447 sds key = keyobj->ptr;
3448 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3449 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3450 if (expireIfNeeded(c->db,keyobj) == 0) {
3451 if (numkeys != 0)
3452 addReply(c,shared.space);
3453 addReply(c,keyobj);
3454 numkeys++;
3455 keyslen += sdslen(key);
3456 }
3457 }
3458 }
3459 dictReleaseIterator(di);
3460 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3461 addReply(c,shared.crlf);
3462 }
3463
3464 static void dbsizeCommand(redisClient *c) {
3465 addReplySds(c,
3466 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3467 }
3468
3469 static void lastsaveCommand(redisClient *c) {
3470 addReplySds(c,
3471 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3472 }
3473
3474 static void typeCommand(redisClient *c) {
3475 robj *o;
3476 char *type;
3477
3478 o = lookupKeyRead(c->db,c->argv[1]);
3479 if (o == NULL) {
3480 type = "+none";
3481 } else {
3482 switch(o->type) {
3483 case REDIS_STRING: type = "+string"; break;
3484 case REDIS_LIST: type = "+list"; break;
3485 case REDIS_SET: type = "+set"; break;
3486 case REDIS_ZSET: type = "+zset"; break;
3487 default: type = "unknown"; break;
3488 }
3489 }
3490 addReplySds(c,sdsnew(type));
3491 addReply(c,shared.crlf);
3492 }
3493
3494 static void saveCommand(redisClient *c) {
3495 if (server.bgsavechildpid != -1) {
3496 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3497 return;
3498 }
3499 if (rdbSave(server.dbfilename) == REDIS_OK) {
3500 addReply(c,shared.ok);
3501 } else {
3502 addReply(c,shared.err);
3503 }
3504 }
3505
3506 static void bgsaveCommand(redisClient *c) {
3507 if (server.bgsavechildpid != -1) {
3508 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3509 return;
3510 }
3511 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3512 char *status = "+Background saving started\r\n";
3513 addReplySds(c,sdsnew(status));
3514 } else {
3515 addReply(c,shared.err);
3516 }
3517 }
3518
3519 static void shutdownCommand(redisClient *c) {
3520 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3521 /* Kill the saving child if there is a background saving in progress.
3522 We want to avoid race conditions, for instance our saving child may
3523 overwrite the synchronous saving did by SHUTDOWN. */
3524 if (server.bgsavechildpid != -1) {
3525 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3526 kill(server.bgsavechildpid,SIGKILL);
3527 rdbRemoveTempFile(server.bgsavechildpid);
3528 }
3529 if (server.appendonly) {
3530 /* Append only file: fsync() the AOF and exit */
3531 fsync(server.appendfd);
3532 exit(0);
3533 } else {
3534 /* Snapshotting. Perform a SYNC SAVE and exit */
3535 if (rdbSave(server.dbfilename) == REDIS_OK) {
3536 if (server.daemonize)
3537 unlink(server.pidfile);
3538 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3539 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3540 exit(0);
3541 } else {
3542 /* Ooops.. error saving! The best we can do is to continue operating.
3543 * Note that if there was a background saving process, in the next
3544 * cron() Redis will be notified that the background saving aborted,
3545 * handling special stuff like slaves pending for synchronization... */
3546 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3547 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3548 }
3549 }
3550 }
3551
3552 static void renameGenericCommand(redisClient *c, int nx) {
3553 robj *o;
3554
3555 /* To use the same key as src and dst is probably an error */
3556 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3557 addReply(c,shared.sameobjecterr);
3558 return;
3559 }
3560
3561 o = lookupKeyWrite(c->db,c->argv[1]);
3562 if (o == NULL) {
3563 addReply(c,shared.nokeyerr);
3564 return;
3565 }
3566 incrRefCount(o);
3567 deleteIfVolatile(c->db,c->argv[2]);
3568 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3569 if (nx) {
3570 decrRefCount(o);
3571 addReply(c,shared.czero);
3572 return;
3573 }
3574 dictReplace(c->db->dict,c->argv[2],o);
3575 } else {
3576 incrRefCount(c->argv[2]);
3577 }
3578 deleteKey(c->db,c->argv[1]);
3579 server.dirty++;
3580 addReply(c,nx ? shared.cone : shared.ok);
3581 }
3582
3583 static void renameCommand(redisClient *c) {
3584 renameGenericCommand(c,0);
3585 }
3586
3587 static void renamenxCommand(redisClient *c) {
3588 renameGenericCommand(c,1);
3589 }
3590
3591 static void moveCommand(redisClient *c) {
3592 robj *o;
3593 redisDb *src, *dst;
3594 int srcid;
3595
3596 /* Obtain source and target DB pointers */
3597 src = c->db;
3598 srcid = c->db->id;
3599 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3600 addReply(c,shared.outofrangeerr);
3601 return;
3602 }
3603 dst = c->db;
3604 selectDb(c,srcid); /* Back to the source DB */
3605
3606 /* If the user is moving using as target the same
3607 * DB as the source DB it is probably an error. */
3608 if (src == dst) {
3609 addReply(c,shared.sameobjecterr);
3610 return;
3611 }
3612
3613 /* Check if the element exists and get a reference */
3614 o = lookupKeyWrite(c->db,c->argv[1]);
3615 if (!o) {
3616 addReply(c,shared.czero);
3617 return;
3618 }
3619
3620 /* Try to add the element to the target DB */
3621 deleteIfVolatile(dst,c->argv[1]);
3622 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3623 addReply(c,shared.czero);
3624 return;
3625 }
3626 incrRefCount(c->argv[1]);
3627 incrRefCount(o);
3628
3629 /* OK! key moved, free the entry in the source DB */
3630 deleteKey(src,c->argv[1]);
3631 server.dirty++;
3632 addReply(c,shared.cone);
3633 }
3634
3635 /* =================================== Lists ================================ */
3636 static void pushGenericCommand(redisClient *c, int where) {
3637 robj *lobj;
3638 list *list;
3639
3640 lobj = lookupKeyWrite(c->db,c->argv[1]);
3641 if (lobj == NULL) {
3642 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3643 addReply(c,shared.ok);
3644 return;
3645 }
3646 lobj = createListObject();
3647 list = lobj->ptr;
3648 if (where == REDIS_HEAD) {
3649 listAddNodeHead(list,c->argv[2]);
3650 } else {
3651 listAddNodeTail(list,c->argv[2]);
3652 }
3653 dictAdd(c->db->dict,c->argv[1],lobj);
3654 incrRefCount(c->argv[1]);
3655 incrRefCount(c->argv[2]);
3656 } else {
3657 if (lobj->type != REDIS_LIST) {
3658 addReply(c,shared.wrongtypeerr);
3659 return;
3660 }
3661 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3662 addReply(c,shared.ok);
3663 return;
3664 }
3665 list = lobj->ptr;
3666 if (where == REDIS_HEAD) {
3667 listAddNodeHead(list,c->argv[2]);
3668 } else {
3669 listAddNodeTail(list,c->argv[2]);
3670 }
3671 incrRefCount(c->argv[2]);
3672 }
3673 server.dirty++;
3674 addReply(c,shared.ok);
3675 }
3676
3677 static void lpushCommand(redisClient *c) {
3678 pushGenericCommand(c,REDIS_HEAD);
3679 }
3680
3681 static void rpushCommand(redisClient *c) {
3682 pushGenericCommand(c,REDIS_TAIL);
3683 }
3684
3685 static void llenCommand(redisClient *c) {
3686 robj *o;
3687 list *l;
3688
3689 o = lookupKeyRead(c->db,c->argv[1]);
3690 if (o == NULL) {
3691 addReply(c,shared.czero);
3692 return;
3693 } else {
3694 if (o->type != REDIS_LIST) {
3695 addReply(c,shared.wrongtypeerr);
3696 } else {
3697 l = o->ptr;
3698 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3699 }
3700 }
3701 }
3702
3703 static void lindexCommand(redisClient *c) {
3704 robj *o;
3705 int index = atoi(c->argv[2]->ptr);
3706
3707 o = lookupKeyRead(c->db,c->argv[1]);
3708 if (o == NULL) {
3709 addReply(c,shared.nullbulk);
3710 } else {
3711 if (o->type != REDIS_LIST) {
3712 addReply(c,shared.wrongtypeerr);
3713 } else {
3714 list *list = o->ptr;
3715 listNode *ln;
3716
3717 ln = listIndex(list, index);
3718 if (ln == NULL) {
3719 addReply(c,shared.nullbulk);
3720 } else {
3721 robj *ele = listNodeValue(ln);
3722 addReplyBulkLen(c,ele);
3723 addReply(c,ele);
3724 addReply(c,shared.crlf);
3725 }
3726 }
3727 }
3728 }
3729
3730 static void lsetCommand(redisClient *c) {
3731 robj *o;
3732 int index = atoi(c->argv[2]->ptr);
3733
3734 o = lookupKeyWrite(c->db,c->argv[1]);
3735 if (o == NULL) {
3736 addReply(c,shared.nokeyerr);
3737 } else {
3738 if (o->type != REDIS_LIST) {
3739 addReply(c,shared.wrongtypeerr);
3740 } else {
3741 list *list = o->ptr;
3742 listNode *ln;
3743
3744 ln = listIndex(list, index);
3745 if (ln == NULL) {
3746 addReply(c,shared.outofrangeerr);
3747 } else {
3748 robj *ele = listNodeValue(ln);
3749
3750 decrRefCount(ele);
3751 listNodeValue(ln) = c->argv[3];
3752 incrRefCount(c->argv[3]);
3753 addReply(c,shared.ok);
3754 server.dirty++;
3755 }
3756 }
3757 }
3758 }
3759
3760 static void popGenericCommand(redisClient *c, int where) {
3761 robj *o;
3762
3763 o = lookupKeyWrite(c->db,c->argv[1]);
3764 if (o == NULL) {
3765 addReply(c,shared.nullbulk);
3766 } else {
3767 if (o->type != REDIS_LIST) {
3768 addReply(c,shared.wrongtypeerr);
3769 } else {
3770 list *list = o->ptr;
3771 listNode *ln;
3772
3773 if (where == REDIS_HEAD)
3774 ln = listFirst(list);
3775 else
3776 ln = listLast(list);
3777
3778 if (ln == NULL) {
3779 addReply(c,shared.nullbulk);
3780 } else {
3781 robj *ele = listNodeValue(ln);
3782 addReplyBulkLen(c,ele);
3783 addReply(c,ele);
3784 addReply(c,shared.crlf);
3785 listDelNode(list,ln);
3786 server.dirty++;
3787 }
3788 }
3789 }
3790 }
3791
3792 static void lpopCommand(redisClient *c) {
3793 popGenericCommand(c,REDIS_HEAD);
3794 }
3795
3796 static void rpopCommand(redisClient *c) {
3797 popGenericCommand(c,REDIS_TAIL);
3798 }
3799
3800 static void lrangeCommand(redisClient *c) {
3801 robj *o;
3802 int start = atoi(c->argv[2]->ptr);
3803 int end = atoi(c->argv[3]->ptr);
3804
3805 o = lookupKeyRead(c->db,c->argv[1]);
3806 if (o == NULL) {
3807 addReply(c,shared.nullmultibulk);
3808 } else {
3809 if (o->type != REDIS_LIST) {
3810 addReply(c,shared.wrongtypeerr);
3811 } else {
3812 list *list = o->ptr;
3813 listNode *ln;
3814 int llen = listLength(list);
3815 int rangelen, j;
3816 robj *ele;
3817
3818 /* convert negative indexes */
3819 if (start < 0) start = llen+start;
3820 if (end < 0) end = llen+end;
3821 if (start < 0) start = 0;
3822 if (end < 0) end = 0;
3823
3824 /* indexes sanity checks */
3825 if (start > end || start >= llen) {
3826 /* Out of range start or start > end result in empty list */
3827 addReply(c,shared.emptymultibulk);
3828 return;
3829 }
3830 if (end >= llen) end = llen-1;
3831 rangelen = (end-start)+1;
3832
3833 /* Return the result in form of a multi-bulk reply */
3834 ln = listIndex(list, start);
3835 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3836 for (j = 0; j < rangelen; j++) {
3837 ele = listNodeValue(ln);
3838 addReplyBulkLen(c,ele);
3839 addReply(c,ele);
3840 addReply(c,shared.crlf);
3841 ln = ln->next;
3842 }
3843 }
3844 }
3845 }
3846
3847 static void ltrimCommand(redisClient *c) {
3848 robj *o;
3849 int start = atoi(c->argv[2]->ptr);
3850 int end = atoi(c->argv[3]->ptr);
3851
3852 o = lookupKeyWrite(c->db,c->argv[1]);
3853 if (o == NULL) {
3854 addReply(c,shared.ok);
3855 } else {
3856 if (o->type != REDIS_LIST) {
3857 addReply(c,shared.wrongtypeerr);
3858 } else {
3859 list *list = o->ptr;
3860 listNode *ln;
3861 int llen = listLength(list);
3862 int j, ltrim, rtrim;
3863
3864 /* convert negative indexes */
3865 if (start < 0) start = llen+start;
3866 if (end < 0) end = llen+end;
3867 if (start < 0) start = 0;
3868 if (end < 0) end = 0;
3869
3870 /* indexes sanity checks */
3871 if (start > end || start >= llen) {
3872 /* Out of range start or start > end result in empty list */
3873 ltrim = llen;
3874 rtrim = 0;
3875 } else {
3876 if (end >= llen) end = llen-1;
3877 ltrim = start;
3878 rtrim = llen-end-1;
3879 }
3880
3881 /* Remove list elements to perform the trim */
3882 for (j = 0; j < ltrim; j++) {
3883 ln = listFirst(list);
3884 listDelNode(list,ln);
3885 }
3886 for (j = 0; j < rtrim; j++) {
3887 ln = listLast(list);
3888 listDelNode(list,ln);
3889 }
3890 server.dirty++;
3891 addReply(c,shared.ok);
3892 }
3893 }
3894 }
3895
3896 static void lremCommand(redisClient *c) {
3897 robj *o;
3898
3899 o = lookupKeyWrite(c->db,c->argv[1]);
3900 if (o == NULL) {
3901 addReply(c,shared.czero);
3902 } else {
3903 if (o->type != REDIS_LIST) {
3904 addReply(c,shared.wrongtypeerr);
3905 } else {
3906 list *list = o->ptr;
3907 listNode *ln, *next;
3908 int toremove = atoi(c->argv[2]->ptr);
3909 int removed = 0;
3910 int fromtail = 0;
3911
3912 if (toremove < 0) {
3913 toremove = -toremove;
3914 fromtail = 1;
3915 }
3916 ln = fromtail ? list->tail : list->head;
3917 while (ln) {
3918 robj *ele = listNodeValue(ln);
3919
3920 next = fromtail ? ln->prev : ln->next;
3921 if (compareStringObjects(ele,c->argv[3]) == 0) {
3922 listDelNode(list,ln);
3923 server.dirty++;
3924 removed++;
3925 if (toremove && removed == toremove) break;
3926 }
3927 ln = next;
3928 }
3929 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3930 }
3931 }
3932 }
3933
3934 /* This is the semantic of this command:
3935 * RPOPLPUSH srclist dstlist:
3936 * IF LLEN(srclist) > 0
3937 * element = RPOP srclist
3938 * LPUSH dstlist element
3939 * RETURN element
3940 * ELSE
3941 * RETURN nil
3942 * END
3943 * END
3944 *
3945 * The idea is to be able to get an element from a list in a reliable way
3946 * since the element is not just returned but pushed against another list
3947 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3948 */
3949 static void rpoplpushcommand(redisClient *c) {
3950 robj *sobj;
3951
3952 sobj = lookupKeyWrite(c->db,c->argv[1]);
3953 if (sobj == NULL) {
3954 addReply(c,shared.nullbulk);
3955 } else {
3956 if (sobj->type != REDIS_LIST) {
3957 addReply(c,shared.wrongtypeerr);
3958 } else {
3959 list *srclist = sobj->ptr;
3960 listNode *ln = listLast(srclist);
3961
3962 if (ln == NULL) {
3963 addReply(c,shared.nullbulk);
3964 } else {
3965 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3966 robj *ele = listNodeValue(ln);
3967 list *dstlist;
3968
3969 if (dobj && dobj->type != REDIS_LIST) {
3970 addReply(c,shared.wrongtypeerr);
3971 return;
3972 }
3973
3974 /* Add the element to the target list (unless it's directly
3975 * passed to some BLPOP-ing client */
3976 if (!handleClientsWaitingListPush(c,c->argv[2],ele)) {
3977 if (dobj == NULL) {
3978 /* Create the list if the key does not exist */
3979 dobj = createListObject();
3980 dictAdd(c->db->dict,c->argv[2],dobj);
3981 incrRefCount(c->argv[2]);
3982 }
3983 dstlist = dobj->ptr;
3984 listAddNodeHead(dstlist,ele);
3985 incrRefCount(ele);
3986 }
3987
3988 /* Send the element to the client as reply as well */
3989 addReplyBulkLen(c,ele);
3990 addReply(c,ele);
3991 addReply(c,shared.crlf);
3992
3993 /* Finally remove the element from the source list */
3994 listDelNode(srclist,ln);
3995 server.dirty++;
3996 }
3997 }
3998 }
3999 }
4000
4001
4002 /* ==================================== Sets ================================ */
4003
4004 static void saddCommand(redisClient *c) {
4005 robj *set;
4006
4007 set = lookupKeyWrite(c->db,c->argv[1]);
4008 if (set == NULL) {
4009 set = createSetObject();
4010 dictAdd(c->db->dict,c->argv[1],set);
4011 incrRefCount(c->argv[1]);
4012 } else {
4013 if (set->type != REDIS_SET) {
4014 addReply(c,shared.wrongtypeerr);
4015 return;
4016 }
4017 }
4018 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
4019 incrRefCount(c->argv[2]);
4020 server.dirty++;
4021 addReply(c,shared.cone);
4022 } else {
4023 addReply(c,shared.czero);
4024 }
4025 }
4026
4027 static void sremCommand(redisClient *c) {
4028 robj *set;
4029
4030 set = lookupKeyWrite(c->db,c->argv[1]);
4031 if (set == NULL) {
4032 addReply(c,shared.czero);
4033 } else {
4034 if (set->type != REDIS_SET) {
4035 addReply(c,shared.wrongtypeerr);
4036 return;
4037 }
4038 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
4039 server.dirty++;
4040 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
4041 addReply(c,shared.cone);
4042 } else {
4043 addReply(c,shared.czero);
4044 }
4045 }
4046 }
4047
4048 static void smoveCommand(redisClient *c) {
4049 robj *srcset, *dstset;
4050
4051 srcset = lookupKeyWrite(c->db,c->argv[1]);
4052 dstset = lookupKeyWrite(c->db,c->argv[2]);
4053
4054 /* If the source key does not exist return 0, if it's of the wrong type
4055 * raise an error */
4056 if (srcset == NULL || srcset->type != REDIS_SET) {
4057 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
4058 return;
4059 }
4060 /* Error if the destination key is not a set as well */
4061 if (dstset && dstset->type != REDIS_SET) {
4062 addReply(c,shared.wrongtypeerr);
4063 return;
4064 }
4065 /* Remove the element from the source set */
4066 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
4067 /* Key not found in the src set! return zero */
4068 addReply(c,shared.czero);
4069 return;
4070 }
4071 server.dirty++;
4072 /* Add the element to the destination set */
4073 if (!dstset) {
4074 dstset = createSetObject();
4075 dictAdd(c->db->dict,c->argv[2],dstset);
4076 incrRefCount(c->argv[2]);
4077 }
4078 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
4079 incrRefCount(c->argv[3]);
4080 addReply(c,shared.cone);
4081 }
4082
4083 static void sismemberCommand(redisClient *c) {
4084 robj *set;
4085
4086 set = lookupKeyRead(c->db,c->argv[1]);
4087 if (set == NULL) {
4088 addReply(c,shared.czero);
4089 } else {
4090 if (set->type != REDIS_SET) {
4091 addReply(c,shared.wrongtypeerr);
4092 return;
4093 }
4094 if (dictFind(set->ptr,c->argv[2]))
4095 addReply(c,shared.cone);
4096 else
4097 addReply(c,shared.czero);
4098 }
4099 }
4100
4101 static void scardCommand(redisClient *c) {
4102 robj *o;
4103 dict *s;
4104
4105 o = lookupKeyRead(c->db,c->argv[1]);
4106 if (o == NULL) {
4107 addReply(c,shared.czero);
4108 return;
4109 } else {
4110 if (o->type != REDIS_SET) {
4111 addReply(c,shared.wrongtypeerr);
4112 } else {
4113 s = o->ptr;
4114 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4115 dictSize(s)));
4116 }
4117 }
4118 }
4119
4120 static void spopCommand(redisClient *c) {
4121 robj *set;
4122 dictEntry *de;
4123
4124 set = lookupKeyWrite(c->db,c->argv[1]);
4125 if (set == NULL) {
4126 addReply(c,shared.nullbulk);
4127 } else {
4128 if (set->type != REDIS_SET) {
4129 addReply(c,shared.wrongtypeerr);
4130 return;
4131 }
4132 de = dictGetRandomKey(set->ptr);
4133 if (de == NULL) {
4134 addReply(c,shared.nullbulk);
4135 } else {
4136 robj *ele = dictGetEntryKey(de);
4137
4138 addReplyBulkLen(c,ele);
4139 addReply(c,ele);
4140 addReply(c,shared.crlf);
4141 dictDelete(set->ptr,ele);
4142 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
4143 server.dirty++;
4144 }
4145 }
4146 }
4147
4148 static void srandmemberCommand(redisClient *c) {
4149 robj *set;
4150 dictEntry *de;
4151
4152 set = lookupKeyRead(c->db,c->argv[1]);
4153 if (set == NULL) {
4154 addReply(c,shared.nullbulk);
4155 } else {
4156 if (set->type != REDIS_SET) {
4157 addReply(c,shared.wrongtypeerr);
4158 return;
4159 }
4160 de = dictGetRandomKey(set->ptr);
4161 if (de == NULL) {
4162 addReply(c,shared.nullbulk);
4163 } else {
4164 robj *ele = dictGetEntryKey(de);
4165
4166 addReplyBulkLen(c,ele);
4167 addReply(c,ele);
4168 addReply(c,shared.crlf);
4169 }
4170 }
4171 }
4172
4173 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
4174 dict **d1 = (void*) s1, **d2 = (void*) s2;
4175
4176 return dictSize(*d1)-dictSize(*d2);
4177 }
4178
4179 static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
4180 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4181 dictIterator *di;
4182 dictEntry *de;
4183 robj *lenobj = NULL, *dstset = NULL;
4184 unsigned long j, cardinality = 0;
4185
4186 for (j = 0; j < setsnum; j++) {
4187 robj *setobj;
4188
4189 setobj = dstkey ?
4190 lookupKeyWrite(c->db,setskeys[j]) :
4191 lookupKeyRead(c->db,setskeys[j]);
4192 if (!setobj) {
4193 zfree(dv);
4194 if (dstkey) {
4195 if (deleteKey(c->db,dstkey))
4196 server.dirty++;
4197 addReply(c,shared.czero);
4198 } else {
4199 addReply(c,shared.nullmultibulk);
4200 }
4201 return;
4202 }
4203 if (setobj->type != REDIS_SET) {
4204 zfree(dv);
4205 addReply(c,shared.wrongtypeerr);
4206 return;
4207 }
4208 dv[j] = setobj->ptr;
4209 }
4210 /* Sort sets from the smallest to largest, this will improve our
4211 * algorithm's performace */
4212 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
4213
4214 /* The first thing we should output is the total number of elements...
4215 * since this is a multi-bulk write, but at this stage we don't know
4216 * the intersection set size, so we use a trick, append an empty object
4217 * to the output list and save the pointer to later modify it with the
4218 * right length */
4219 if (!dstkey) {
4220 lenobj = createObject(REDIS_STRING,NULL);
4221 addReply(c,lenobj);
4222 decrRefCount(lenobj);
4223 } else {
4224 /* If we have a target key where to store the resulting set
4225 * create this key with an empty set inside */
4226 dstset = createSetObject();
4227 }
4228
4229 /* Iterate all the elements of the first (smallest) set, and test
4230 * the element against all the other sets, if at least one set does
4231 * not include the element it is discarded */
4232 di = dictGetIterator(dv[0]);
4233
4234 while((de = dictNext(di)) != NULL) {
4235 robj *ele;
4236
4237 for (j = 1; j < setsnum; j++)
4238 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
4239 if (j != setsnum)
4240 continue; /* at least one set does not contain the member */
4241 ele = dictGetEntryKey(de);
4242 if (!dstkey) {
4243 addReplyBulkLen(c,ele);
4244 addReply(c,ele);
4245 addReply(c,shared.crlf);
4246 cardinality++;
4247 } else {
4248 dictAdd(dstset->ptr,ele,NULL);
4249 incrRefCount(ele);
4250 }
4251 }
4252 dictReleaseIterator(di);
4253
4254 if (dstkey) {
4255 /* Store the resulting set into the target */
4256 deleteKey(c->db,dstkey);
4257 dictAdd(c->db->dict,dstkey,dstset);
4258 incrRefCount(dstkey);
4259 }
4260
4261 if (!dstkey) {
4262 lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
4263 } else {
4264 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4265 dictSize((dict*)dstset->ptr)));
4266 server.dirty++;
4267 }
4268 zfree(dv);
4269 }
4270
4271 static void sinterCommand(redisClient *c) {
4272 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4273 }
4274
4275 static void sinterstoreCommand(redisClient *c) {
4276 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4277 }
4278
4279 #define REDIS_OP_UNION 0
4280 #define REDIS_OP_DIFF 1
4281
4282 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
4283 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4284 dictIterator *di;
4285 dictEntry *de;
4286 robj *dstset = NULL;
4287 int j, cardinality = 0;
4288
4289 for (j = 0; j < setsnum; j++) {
4290 robj *setobj;
4291
4292 setobj = dstkey ?
4293 lookupKeyWrite(c->db,setskeys[j]) :
4294 lookupKeyRead(c->db,setskeys[j]);
4295 if (!setobj) {
4296 dv[j] = NULL;
4297 continue;
4298 }
4299 if (setobj->type != REDIS_SET) {
4300 zfree(dv);
4301 addReply(c,shared.wrongtypeerr);
4302 return;
4303 }
4304 dv[j] = setobj->ptr;
4305 }
4306
4307 /* We need a temp set object to store our union. If the dstkey
4308 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4309 * this set object will be the resulting object to set into the target key*/
4310 dstset = createSetObject();
4311
4312 /* Iterate all the elements of all the sets, add every element a single
4313 * time to the result set */
4314 for (j = 0; j < setsnum; j++) {
4315 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
4316 if (!dv[j]) continue; /* non existing keys are like empty sets */
4317
4318 di = dictGetIterator(dv[j]);
4319
4320 while((de = dictNext(di)) != NULL) {
4321 robj *ele;
4322
4323 /* dictAdd will not add the same element multiple times */
4324 ele = dictGetEntryKey(de);
4325 if (op == REDIS_OP_UNION || j == 0) {
4326 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4327 incrRefCount(ele);
4328 cardinality++;
4329 }
4330 } else if (op == REDIS_OP_DIFF) {
4331 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4332 cardinality--;
4333 }
4334 }
4335 }
4336 dictReleaseIterator(di);
4337
4338 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
4339 }
4340
4341 /* Output the content of the resulting set, if not in STORE mode */
4342 if (!dstkey) {
4343 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4344 di = dictGetIterator(dstset->ptr);
4345 while((de = dictNext(di)) != NULL) {
4346 robj *ele;
4347
4348 ele = dictGetEntryKey(de);
4349 addReplyBulkLen(c,ele);
4350 addReply(c,ele);
4351 addReply(c,shared.crlf);
4352 }
4353 dictReleaseIterator(di);
4354 } else {
4355 /* If we have a target key where to store the resulting set
4356 * create this key with the result set inside */
4357 deleteKey(c->db,dstkey);
4358 dictAdd(c->db->dict,dstkey,dstset);
4359 incrRefCount(dstkey);
4360 }
4361
4362 /* Cleanup */
4363 if (!dstkey) {
4364 decrRefCount(dstset);
4365 } else {
4366 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4367 dictSize((dict*)dstset->ptr)));
4368 server.dirty++;
4369 }
4370 zfree(dv);
4371 }
4372
4373 static void sunionCommand(redisClient *c) {
4374 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4375 }
4376
4377 static void sunionstoreCommand(redisClient *c) {
4378 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4379 }
4380
4381 static void sdiffCommand(redisClient *c) {
4382 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4383 }
4384
4385 static void sdiffstoreCommand(redisClient *c) {
4386 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4387 }
4388
4389 /* ==================================== ZSets =============================== */
4390
4391 /* ZSETs are ordered sets using two data structures to hold the same elements
4392 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4393 * data structure.
4394 *
4395 * The elements are added to a hash table mapping Redis objects to scores.
4396 * At the same time the elements are added to a skip list mapping scores
4397 * to Redis objects (so objects are sorted by scores in this "view"). */
4398
4399 /* This skiplist implementation is almost a C translation of the original
4400 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4401 * Alternative to Balanced Trees", modified in three ways:
4402 * a) this implementation allows for repeated values.
4403 * b) the comparison is not just by key (our 'score') but by satellite data.
4404 * c) there is a back pointer, so it's a doubly linked list with the back
4405 * pointers being only at "level 1". This allows to traverse the list
4406 * from tail to head, useful for ZREVRANGE. */
4407
4408 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4409 zskiplistNode *zn = zmalloc(sizeof(*zn));
4410
4411 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4412 zn->score = score;
4413 zn->obj = obj;
4414 return zn;
4415 }
4416
4417 static zskiplist *zslCreate(void) {
4418 int j;
4419 zskiplist *zsl;
4420
4421 zsl = zmalloc(sizeof(*zsl));
4422 zsl->level = 1;
4423 zsl->length = 0;
4424 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4425 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4426 zsl->header->forward[j] = NULL;
4427 zsl->header->backward = NULL;
4428 zsl->tail = NULL;
4429 return zsl;
4430 }
4431
4432 static void zslFreeNode(zskiplistNode *node) {
4433 decrRefCount(node->obj);
4434 zfree(node->forward);
4435 zfree(node);
4436 }
4437
4438 static void zslFree(zskiplist *zsl) {
4439 zskiplistNode *node = zsl->header->forward[0], *next;
4440
4441 zfree(zsl->header->forward);
4442 zfree(zsl->header);
4443 while(node) {
4444 next = node->forward[0];
4445 zslFreeNode(node);
4446 node = next;
4447 }
4448 zfree(zsl);
4449 }
4450
4451 static int zslRandomLevel(void) {
4452 int level = 1;
4453 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4454 level += 1;
4455 return level;
4456 }
4457
4458 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4459 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4460 int i, level;
4461
4462 x = zsl->header;
4463 for (i = zsl->level-1; i >= 0; i--) {
4464 while (x->forward[i] &&
4465 (x->forward[i]->score < score ||
4466 (x->forward[i]->score == score &&
4467 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4468 x = x->forward[i];
4469 update[i] = x;
4470 }
4471 /* we assume the key is not already inside, since we allow duplicated
4472 * scores, and the re-insertion of score and redis object should never
4473 * happpen since the caller of zslInsert() should test in the hash table
4474 * if the element is already inside or not. */
4475 level = zslRandomLevel();
4476 if (level > zsl->level) {
4477 for (i = zsl->level; i < level; i++)
4478 update[i] = zsl->header;
4479 zsl->level = level;
4480 }
4481 x = zslCreateNode(level,score,obj);
4482 for (i = 0; i < level; i++) {
4483 x->forward[i] = update[i]->forward[i];
4484 update[i]->forward[i] = x;
4485 }
4486 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4487 if (x->forward[0])
4488 x->forward[0]->backward = x;
4489 else
4490 zsl->tail = x;
4491 zsl->length++;
4492 }
4493
4494 /* Delete an element with matching score/object from the skiplist. */
4495 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4496 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4497 int i;
4498
4499 x = zsl->header;
4500 for (i = zsl->level-1; i >= 0; i--) {
4501 while (x->forward[i] &&
4502 (x->forward[i]->score < score ||
4503 (x->forward[i]->score == score &&
4504 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4505 x = x->forward[i];
4506 update[i] = x;
4507 }
4508 /* We may have multiple elements with the same score, what we need
4509 * is to find the element with both the right score and object. */
4510 x = x->forward[0];
4511 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4512 for (i = 0; i < zsl->level; i++) {
4513 if (update[i]->forward[i] != x) break;
4514 update[i]->forward[i] = x->forward[i];
4515 }
4516 if (x->forward[0]) {
4517 x->forward[0]->backward = (x->backward == zsl->header) ?
4518 NULL : x->backward;
4519 } else {
4520 zsl->tail = x->backward;
4521 }
4522 zslFreeNode(x);
4523 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4524 zsl->level--;
4525 zsl->length--;
4526 return 1;
4527 } else {
4528 return 0; /* not found */
4529 }
4530 return 0; /* not found */
4531 }
4532
4533 /* Delete all the elements with score between min and max from the skiplist.
4534 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4535 * Note that this function takes the reference to the hash table view of the
4536 * sorted set, in order to remove the elements from the hash table too. */
4537 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4538 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4539 unsigned long removed = 0;
4540 int i;
4541
4542 x = zsl->header;
4543 for (i = zsl->level-1; i >= 0; i--) {
4544 while (x->forward[i] && x->forward[i]->score < min)
4545 x = x->forward[i];
4546 update[i] = x;
4547 }
4548 /* We may have multiple elements with the same score, what we need
4549 * is to find the element with both the right score and object. */
4550 x = x->forward[0];
4551 while (x && x->score <= max) {
4552 zskiplistNode *next;
4553
4554 for (i = 0; i < zsl->level; i++) {
4555 if (update[i]->forward[i] != x) break;
4556 update[i]->forward[i] = x->forward[i];
4557 }
4558 if (x->forward[0]) {
4559 x->forward[0]->backward = (x->backward == zsl->header) ?
4560 NULL : x->backward;
4561 } else {
4562 zsl->tail = x->backward;
4563 }
4564 next = x->forward[0];
4565 dictDelete(dict,x->obj);
4566 zslFreeNode(x);
4567 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4568 zsl->level--;
4569 zsl->length--;
4570 removed++;
4571 x = next;
4572 }
4573 return removed; /* not found */
4574 }
4575
4576 /* Find the first node having a score equal or greater than the specified one.
4577 * Returns NULL if there is no match. */
4578 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4579 zskiplistNode *x;
4580 int i;
4581
4582 x = zsl->header;
4583 for (i = zsl->level-1; i >= 0; i--) {
4584 while (x->forward[i] && x->forward[i]->score < score)
4585 x = x->forward[i];
4586 }
4587 /* We may have multiple elements with the same score, what we need
4588 * is to find the element with both the right score and object. */
4589 return x->forward[0];
4590 }
4591
4592 /* The actual Z-commands implementations */
4593
4594 /* This generic command implements both ZADD and ZINCRBY.
4595 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4596 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4597 static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
4598 robj *zsetobj;
4599 zset *zs;
4600 double *score;
4601
4602 zsetobj = lookupKeyWrite(c->db,key);
4603 if (zsetobj == NULL) {
4604 zsetobj = createZsetObject();
4605 dictAdd(c->db->dict,key,zsetobj);
4606 incrRefCount(key);
4607 } else {
4608 if (zsetobj->type != REDIS_ZSET) {
4609 addReply(c,shared.wrongtypeerr);
4610 return;
4611 }
4612 }
4613 zs = zsetobj->ptr;
4614
4615 /* Ok now since we implement both ZADD and ZINCRBY here the code
4616 * needs to handle the two different conditions. It's all about setting
4617 * '*score', that is, the new score to set, to the right value. */
4618 score = zmalloc(sizeof(double));
4619 if (doincrement) {
4620 dictEntry *de;
4621
4622 /* Read the old score. If the element was not present starts from 0 */
4623 de = dictFind(zs->dict,ele);
4624 if (de) {
4625 double *oldscore = dictGetEntryVal(de);
4626 *score = *oldscore + scoreval;
4627 } else {
4628 *score = scoreval;
4629 }
4630 } else {
4631 *score = scoreval;
4632 }
4633
4634 /* What follows is a simple remove and re-insert operation that is common
4635 * to both ZADD and ZINCRBY... */
4636 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
4637 /* case 1: New element */
4638 incrRefCount(ele); /* added to hash */
4639 zslInsert(zs->zsl,*score,ele);
4640 incrRefCount(ele); /* added to skiplist */
4641 server.dirty++;
4642 if (doincrement)
4643 addReplyDouble(c,*score);
4644 else
4645 addReply(c,shared.cone);
4646 } else {
4647 dictEntry *de;
4648 double *oldscore;
4649
4650 /* case 2: Score update operation */
4651 de = dictFind(zs->dict,ele);
4652 redisAssert(de != NULL);
4653 oldscore = dictGetEntryVal(de);
4654 if (*score != *oldscore) {
4655 int deleted;
4656
4657 /* Remove and insert the element in the skip list with new score */
4658 deleted = zslDelete(zs->zsl,*oldscore,ele);
4659 redisAssert(deleted != 0);
4660 zslInsert(zs->zsl,*score,ele);
4661 incrRefCount(ele);
4662 /* Update the score in the hash table */
4663 dictReplace(zs->dict,ele,score);
4664 server.dirty++;
4665 } else {
4666 zfree(score);
4667 }
4668 if (doincrement)
4669 addReplyDouble(c,*score);
4670 else
4671 addReply(c,shared.czero);
4672 }
4673 }
4674
4675 static void zaddCommand(redisClient *c) {
4676 double scoreval;
4677
4678 scoreval = strtod(c->argv[2]->ptr,NULL);
4679 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4680 }
4681
4682 static void zincrbyCommand(redisClient *c) {
4683 double scoreval;
4684
4685 scoreval = strtod(c->argv[2]->ptr,NULL);
4686 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4687 }
4688
4689 static void zremCommand(redisClient *c) {
4690 robj *zsetobj;
4691 zset *zs;
4692
4693 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4694 if (zsetobj == NULL) {
4695 addReply(c,shared.czero);
4696 } else {
4697 dictEntry *de;
4698 double *oldscore;
4699 int deleted;
4700
4701 if (zsetobj->type != REDIS_ZSET) {
4702 addReply(c,shared.wrongtypeerr);
4703 return;
4704 }
4705 zs = zsetobj->ptr;
4706 de = dictFind(zs->dict,c->argv[2]);
4707 if (de == NULL) {
4708 addReply(c,shared.czero);
4709 return;
4710 }
4711 /* Delete from the skiplist */
4712 oldscore = dictGetEntryVal(de);
4713 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4714 redisAssert(deleted != 0);
4715
4716 /* Delete from the hash table */
4717 dictDelete(zs->dict,c->argv[2]);
4718 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4719 server.dirty++;
4720 addReply(c,shared.cone);
4721 }
4722 }
4723
4724 static void zremrangebyscoreCommand(redisClient *c) {
4725 double min = strtod(c->argv[2]->ptr,NULL);
4726 double max = strtod(c->argv[3]->ptr,NULL);
4727 robj *zsetobj;
4728 zset *zs;
4729
4730 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4731 if (zsetobj == NULL) {
4732 addReply(c,shared.czero);
4733 } else {
4734 long deleted;
4735
4736 if (zsetobj->type != REDIS_ZSET) {
4737 addReply(c,shared.wrongtypeerr);
4738 return;
4739 }
4740 zs = zsetobj->ptr;
4741 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4742 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4743 server.dirty += deleted;
4744 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4745 }
4746 }
4747
4748 static void zrangeGenericCommand(redisClient *c, int reverse) {
4749 robj *o;
4750 int start = atoi(c->argv[2]->ptr);
4751 int end = atoi(c->argv[3]->ptr);
4752 int withscores = 0;
4753
4754 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
4755 withscores = 1;
4756 } else if (c->argc >= 5) {
4757 addReply(c,shared.syntaxerr);
4758 return;
4759 }
4760
4761 o = lookupKeyRead(c->db,c->argv[1]);
4762 if (o == NULL) {
4763 addReply(c,shared.nullmultibulk);
4764 } else {
4765 if (o->type != REDIS_ZSET) {
4766 addReply(c,shared.wrongtypeerr);
4767 } else {
4768 zset *zsetobj = o->ptr;
4769 zskiplist *zsl = zsetobj->zsl;
4770 zskiplistNode *ln;
4771
4772 int llen = zsl->length;
4773 int rangelen, j;
4774 robj *ele;
4775
4776 /* convert negative indexes */
4777 if (start < 0) start = llen+start;
4778 if (end < 0) end = llen+end;
4779 if (start < 0) start = 0;
4780 if (end < 0) end = 0;
4781
4782 /* indexes sanity checks */
4783 if (start > end || start >= llen) {
4784 /* Out of range start or start > end result in empty list */
4785 addReply(c,shared.emptymultibulk);
4786 return;
4787 }
4788 if (end >= llen) end = llen-1;
4789 rangelen = (end-start)+1;
4790
4791 /* Return the result in form of a multi-bulk reply */
4792 if (reverse) {
4793 ln = zsl->tail;
4794 while (start--)
4795 ln = ln->backward;
4796 } else {
4797 ln = zsl->header->forward[0];
4798 while (start--)
4799 ln = ln->forward[0];
4800 }
4801
4802 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",
4803 withscores ? (rangelen*2) : rangelen));
4804 for (j = 0; j < rangelen; j++) {
4805 ele = ln->obj;
4806 addReplyBulkLen(c,ele);
4807 addReply(c,ele);
4808 addReply(c,shared.crlf);
4809 if (withscores)
4810 addReplyDouble(c,ln->score);
4811 ln = reverse ? ln->backward : ln->forward[0];
4812 }
4813 }
4814 }
4815 }
4816
4817 static void zrangeCommand(redisClient *c) {
4818 zrangeGenericCommand(c,0);
4819 }
4820
4821 static void zrevrangeCommand(redisClient *c) {
4822 zrangeGenericCommand(c,1);
4823 }
4824
4825 static void zrangebyscoreCommand(redisClient *c) {
4826 robj *o;
4827 double min = strtod(c->argv[2]->ptr,NULL);
4828 double max = strtod(c->argv[3]->ptr,NULL);
4829 int offset = 0, limit = -1;
4830
4831 if (c->argc != 4 && c->argc != 7) {
4832 addReplySds(c,
4833 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4834 return;
4835 } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
4836 addReply(c,shared.syntaxerr);
4837 return;
4838 } else if (c->argc == 7) {
4839 offset = atoi(c->argv[5]->ptr);
4840 limit = atoi(c->argv[6]->ptr);
4841 if (offset < 0) offset = 0;
4842 }
4843
4844 o = lookupKeyRead(c->db,c->argv[1]);
4845 if (o == NULL) {
4846 addReply(c,shared.nullmultibulk);
4847 } else {
4848 if (o->type != REDIS_ZSET) {
4849 addReply(c,shared.wrongtypeerr);
4850 } else {
4851 zset *zsetobj = o->ptr;
4852 zskiplist *zsl = zsetobj->zsl;
4853 zskiplistNode *ln;
4854 robj *ele, *lenobj;
4855 unsigned int rangelen = 0;
4856
4857 /* Get the first node with the score >= min */
4858 ln = zslFirstWithScore(zsl,min);
4859 if (ln == NULL) {
4860 /* No element matching the speciifed interval */
4861 addReply(c,shared.emptymultibulk);
4862 return;
4863 }
4864
4865 /* We don't know in advance how many matching elements there
4866 * are in the list, so we push this object that will represent
4867 * the multi-bulk length in the output buffer, and will "fix"
4868 * it later */
4869 lenobj = createObject(REDIS_STRING,NULL);
4870 addReply(c,lenobj);
4871 decrRefCount(lenobj);
4872
4873 while(ln && ln->score <= max) {
4874 if (offset) {
4875 offset--;
4876 ln = ln->forward[0];
4877 continue;
4878 }
4879 if (limit == 0) break;
4880 ele = ln->obj;
4881 addReplyBulkLen(c,ele);
4882 addReply(c,ele);
4883 addReply(c,shared.crlf);
4884 ln = ln->forward[0];
4885 rangelen++;
4886 if (limit > 0) limit--;
4887 }
4888 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4889 }
4890 }
4891 }
4892
4893 static void zcardCommand(redisClient *c) {
4894 robj *o;
4895 zset *zs;
4896
4897 o = lookupKeyRead(c->db,c->argv[1]);
4898 if (o == NULL) {
4899 addReply(c,shared.czero);
4900 return;
4901 } else {
4902 if (o->type != REDIS_ZSET) {
4903 addReply(c,shared.wrongtypeerr);
4904 } else {
4905 zs = o->ptr;
4906 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length));
4907 }
4908 }
4909 }
4910
4911 static void zscoreCommand(redisClient *c) {
4912 robj *o;
4913 zset *zs;
4914
4915 o = lookupKeyRead(c->db,c->argv[1]);
4916 if (o == NULL) {
4917 addReply(c,shared.nullbulk);
4918 return;
4919 } else {
4920 if (o->type != REDIS_ZSET) {
4921 addReply(c,shared.wrongtypeerr);
4922 } else {
4923 dictEntry *de;
4924
4925 zs = o->ptr;
4926 de = dictFind(zs->dict,c->argv[2]);
4927 if (!de) {
4928 addReply(c,shared.nullbulk);
4929 } else {
4930 double *score = dictGetEntryVal(de);
4931
4932 addReplyDouble(c,*score);
4933 }
4934 }
4935 }
4936 }
4937
4938 /* ========================= Non type-specific commands ==================== */
4939
4940 static void flushdbCommand(redisClient *c) {
4941 server.dirty += dictSize(c->db->dict);
4942 dictEmpty(c->db->dict);
4943 dictEmpty(c->db->expires);
4944 addReply(c,shared.ok);
4945 }
4946
4947 static void flushallCommand(redisClient *c) {
4948 server.dirty += emptyDb();
4949 addReply(c,shared.ok);
4950 rdbSave(server.dbfilename);
4951 server.dirty++;
4952 }
4953
4954 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4955 redisSortOperation *so = zmalloc(sizeof(*so));
4956 so->type = type;
4957 so->pattern = pattern;
4958 return so;
4959 }
4960
4961 /* Return the value associated to the key with a name obtained
4962 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4963 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4964 char *p;
4965 sds spat, ssub;
4966 robj keyobj;
4967 int prefixlen, sublen, postfixlen;
4968 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4969 struct {
4970 long len;
4971 long free;
4972 char buf[REDIS_SORTKEY_MAX+1];
4973 } keyname;
4974
4975 /* If the pattern is "#" return the substitution object itself in order
4976 * to implement the "SORT ... GET #" feature. */
4977 spat = pattern->ptr;
4978 if (spat[0] == '#' && spat[1] == '\0') {
4979 return subst;
4980 }
4981
4982 /* The substitution object may be specially encoded. If so we create
4983 * a decoded object on the fly. Otherwise getDecodedObject will just
4984 * increment the ref count, that we'll decrement later. */
4985 subst = getDecodedObject(subst);
4986
4987 ssub = subst->ptr;
4988 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4989 p = strchr(spat,'*');
4990 if (!p) {
4991 decrRefCount(subst);
4992 return NULL;
4993 }
4994
4995 prefixlen = p-spat;
4996 sublen = sdslen(ssub);
4997 postfixlen = sdslen(spat)-(prefixlen+1);
4998 memcpy(keyname.buf,spat,prefixlen);
4999 memcpy(keyname.buf+prefixlen,ssub,sublen);
5000 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
5001 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
5002 keyname.len = prefixlen+sublen+postfixlen;
5003
5004 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2))
5005 decrRefCount(subst);
5006
5007 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
5008 return lookupKeyRead(db,&keyobj);
5009 }
5010
5011 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
5012 * the additional parameter is not standard but a BSD-specific we have to
5013 * pass sorting parameters via the global 'server' structure */
5014 static int sortCompare(const void *s1, const void *s2) {
5015 const redisSortObject *so1 = s1, *so2 = s2;
5016 int cmp;
5017
5018 if (!server.sort_alpha) {
5019 /* Numeric sorting. Here it's trivial as we precomputed scores */
5020 if (so1->u.score > so2->u.score) {
5021 cmp = 1;
5022 } else if (so1->u.score < so2->u.score) {
5023 cmp = -1;
5024 } else {
5025 cmp = 0;
5026 }
5027 } else {
5028 /* Alphanumeric sorting */
5029 if (server.sort_bypattern) {
5030 if (!so1->u.cmpobj || !so2->u.cmpobj) {
5031 /* At least one compare object is NULL */
5032 if (so1->u.cmpobj == so2->u.cmpobj)
5033 cmp = 0;
5034 else if (so1->u.cmpobj == NULL)
5035 cmp = -1;
5036 else
5037 cmp = 1;
5038 } else {
5039 /* We have both the objects, use strcoll */
5040 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
5041 }
5042 } else {
5043 /* Compare elements directly */
5044 robj *dec1, *dec2;
5045
5046 dec1 = getDecodedObject(so1->obj);
5047 dec2 = getDecodedObject(so2->obj);
5048 cmp = strcoll(dec1->ptr,dec2->ptr);
5049 decrRefCount(dec1);
5050 decrRefCount(dec2);
5051 }
5052 }
5053 return server.sort_desc ? -cmp : cmp;
5054 }
5055
5056 /* The SORT command is the most complex command in Redis. Warning: this code
5057 * is optimized for speed and a bit less for readability */
5058 static void sortCommand(redisClient *c) {
5059 list *operations;
5060 int outputlen = 0;
5061 int desc = 0, alpha = 0;
5062 int limit_start = 0, limit_count = -1, start, end;
5063 int j, dontsort = 0, vectorlen;
5064 int getop = 0; /* GET operation counter */
5065 robj *sortval, *sortby = NULL, *storekey = NULL;
5066 redisSortObject *vector; /* Resulting vector to sort */
5067
5068 /* Lookup the key to sort. It must be of the right types */
5069 sortval = lookupKeyRead(c->db,c->argv[1]);
5070 if (sortval == NULL) {
5071 addReply(c,shared.nullmultibulk);
5072 return;
5073 }
5074 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
5075 sortval->type != REDIS_ZSET)
5076 {
5077 addReply(c,shared.wrongtypeerr);
5078 return;
5079 }
5080
5081 /* Create a list of operations to perform for every sorted element.
5082 * Operations can be GET/DEL/INCR/DECR */
5083 operations = listCreate();
5084 listSetFreeMethod(operations,zfree);
5085 j = 2;
5086
5087 /* Now we need to protect sortval incrementing its count, in the future
5088 * SORT may have options able to overwrite/delete keys during the sorting
5089 * and the sorted key itself may get destroied */
5090 incrRefCount(sortval);
5091
5092 /* The SORT command has an SQL-alike syntax, parse it */
5093 while(j < c->argc) {
5094 int leftargs = c->argc-j-1;
5095 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
5096 desc = 0;
5097 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
5098 desc = 1;
5099 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
5100 alpha = 1;
5101 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
5102 limit_start = atoi(c->argv[j+1]->ptr);
5103 limit_count = atoi(c->argv[j+2]->ptr);
5104 j+=2;
5105 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
5106 storekey = c->argv[j+1];
5107 j++;
5108 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
5109 sortby = c->argv[j+1];
5110 /* If the BY pattern does not contain '*', i.e. it is constant,
5111 * we don't need to sort nor to lookup the weight keys. */
5112 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
5113 j++;
5114 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
5115 listAddNodeTail(operations,createSortOperation(
5116 REDIS_SORT_GET,c->argv[j+1]));
5117 getop++;
5118 j++;
5119 } else {
5120 decrRefCount(sortval);
5121 listRelease(operations);
5122 addReply(c,shared.syntaxerr);
5123 return;
5124 }
5125 j++;
5126 }
5127
5128 /* Load the sorting vector with all the objects to sort */
5129 switch(sortval->type) {
5130 case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
5131 case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break;
5132 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
5133 default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */
5134 }
5135 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
5136 j = 0;
5137
5138 if (sortval->type == REDIS_LIST) {
5139 list *list = sortval->ptr;
5140 listNode *ln;
5141
5142 listRewind(list);
5143 while((ln = listYield(list))) {
5144 robj *ele = ln->value;
5145 vector[j].obj = ele;
5146 vector[j].u.score = 0;
5147 vector[j].u.cmpobj = NULL;
5148 j++;
5149 }
5150 } else {
5151 dict *set;
5152 dictIterator *di;
5153 dictEntry *setele;
5154
5155 if (sortval->type == REDIS_SET) {
5156 set = sortval->ptr;
5157 } else {
5158 zset *zs = sortval->ptr;
5159 set = zs->dict;
5160 }
5161
5162 di = dictGetIterator(set);
5163 while((setele = dictNext(di)) != NULL) {
5164 vector[j].obj = dictGetEntryKey(setele);
5165 vector[j].u.score = 0;
5166 vector[j].u.cmpobj = NULL;
5167 j++;
5168 }
5169 dictReleaseIterator(di);
5170 }
5171 redisAssert(j == vectorlen);
5172
5173 /* Now it's time to load the right scores in the sorting vector */
5174 if (dontsort == 0) {
5175 for (j = 0; j < vectorlen; j++) {
5176 if (sortby) {
5177 robj *byval;
5178
5179 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
5180 if (!byval || byval->type != REDIS_STRING) continue;
5181 if (alpha) {
5182 vector[j].u.cmpobj = getDecodedObject(byval);
5183 } else {
5184 if (byval->encoding == REDIS_ENCODING_RAW) {
5185 vector[j].u.score = strtod(byval->ptr,NULL);
5186 } else {
5187 /* Don't need to decode the object if it's
5188 * integer-encoded (the only encoding supported) so
5189 * far. We can just cast it */
5190 if (byval->encoding == REDIS_ENCODING_INT) {
5191 vector[j].u.score = (long)byval->ptr;
5192 } else
5193 redisAssert(1 != 1);
5194 }
5195 }
5196 } else {
5197 if (!alpha) {
5198 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
5199 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
5200 else {
5201 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
5202 vector[j].u.score = (long) vector[j].obj->ptr;
5203 else
5204 redisAssert(1 != 1);
5205 }
5206 }
5207 }
5208 }
5209 }
5210
5211 /* We are ready to sort the vector... perform a bit of sanity check
5212 * on the LIMIT option too. We'll use a partial version of quicksort. */
5213 start = (limit_start < 0) ? 0 : limit_start;
5214 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
5215 if (start >= vectorlen) {
5216 start = vectorlen-1;
5217 end = vectorlen-2;
5218 }
5219 if (end >= vectorlen) end = vectorlen-1;
5220
5221 if (dontsort == 0) {
5222 server.sort_desc = desc;
5223 server.sort_alpha = alpha;
5224 server.sort_bypattern = sortby ? 1 : 0;
5225 if (sortby && (start != 0 || end != vectorlen-1))
5226 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
5227 else
5228 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
5229 }
5230
5231 /* Send command output to the output buffer, performing the specified
5232 * GET/DEL/INCR/DECR operations if any. */
5233 outputlen = getop ? getop*(end-start+1) : end-start+1;
5234 if (storekey == NULL) {
5235 /* STORE option not specified, sent the sorting result to client */
5236 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
5237 for (j = start; j <= end; j++) {
5238 listNode *ln;
5239 if (!getop) {
5240 addReplyBulkLen(c,vector[j].obj);
5241 addReply(c,vector[j].obj);
5242 addReply(c,shared.crlf);
5243 }
5244 listRewind(operations);
5245 while((ln = listYield(operations))) {
5246 redisSortOperation *sop = ln->value;
5247 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5248 vector[j].obj);
5249
5250 if (sop->type == REDIS_SORT_GET) {
5251 if (!val || val->type != REDIS_STRING) {
5252 addReply(c,shared.nullbulk);
5253 } else {
5254 addReplyBulkLen(c,val);
5255 addReply(c,val);
5256 addReply(c,shared.crlf);
5257 }
5258 } else {
5259 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5260 }
5261 }
5262 }
5263 } else {
5264 robj *listObject = createListObject();
5265 list *listPtr = (list*) listObject->ptr;
5266
5267 /* STORE option specified, set the sorting result as a List object */
5268 for (j = start; j <= end; j++) {
5269 listNode *ln;
5270 if (!getop) {
5271 listAddNodeTail(listPtr,vector[j].obj);
5272 incrRefCount(vector[j].obj);
5273 }
5274 listRewind(operations);
5275 while((ln = listYield(operations))) {
5276 redisSortOperation *sop = ln->value;
5277 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5278 vector[j].obj);
5279
5280 if (sop->type == REDIS_SORT_GET) {
5281 if (!val || val->type != REDIS_STRING) {
5282 listAddNodeTail(listPtr,createStringObject("",0));
5283 } else {
5284 listAddNodeTail(listPtr,val);
5285 incrRefCount(val);
5286 }
5287 } else {
5288 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5289 }
5290 }
5291 }
5292 if (dictReplace(c->db->dict,storekey,listObject)) {
5293 incrRefCount(storekey);
5294 }
5295 /* Note: we add 1 because the DB is dirty anyway since even if the
5296 * SORT result is empty a new key is set and maybe the old content
5297 * replaced. */
5298 server.dirty += 1+outputlen;
5299 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
5300 }
5301
5302 /* Cleanup */
5303 decrRefCount(sortval);
5304 listRelease(operations);
5305 for (j = 0; j < vectorlen; j++) {
5306 if (sortby && alpha && vector[j].u.cmpobj)
5307 decrRefCount(vector[j].u.cmpobj);
5308 }
5309 zfree(vector);
5310 }
5311
5312 /* Create the string returned by the INFO command. This is decoupled
5313 * by the INFO command itself as we need to report the same information
5314 * on memory corruption problems. */
5315 static sds genRedisInfoString(void) {
5316 sds info;
5317 time_t uptime = time(NULL)-server.stat_starttime;
5318 int j;
5319
5320 info = sdscatprintf(sdsempty(),
5321 "redis_version:%s\r\n"
5322 "arch_bits:%s\r\n"
5323 "multiplexing_api:%s\r\n"
5324 "uptime_in_seconds:%ld\r\n"
5325 "uptime_in_days:%ld\r\n"
5326 "connected_clients:%d\r\n"
5327 "connected_slaves:%d\r\n"
5328 "blocked_clients:%d\r\n"
5329 "used_memory:%zu\r\n"
5330 "changes_since_last_save:%lld\r\n"
5331 "bgsave_in_progress:%d\r\n"
5332 "last_save_time:%ld\r\n"
5333 "bgrewriteaof_in_progress:%d\r\n"
5334 "total_connections_received:%lld\r\n"
5335 "total_commands_processed:%lld\r\n"
5336 "role:%s\r\n"
5337 ,REDIS_VERSION,
5338 (sizeof(long) == 8) ? "64" : "32",
5339 aeGetApiName(),
5340 uptime,
5341 uptime/(3600*24),
5342 listLength(server.clients)-listLength(server.slaves),
5343 listLength(server.slaves),
5344 server.blockedclients,
5345 server.usedmemory,
5346 server.dirty,
5347 server.bgsavechildpid != -1,
5348 server.lastsave,
5349 server.bgrewritechildpid != -1,
5350 server.stat_numconnections,
5351 server.stat_numcommands,
5352 server.masterhost == NULL ? "master" : "slave"
5353 );
5354 if (server.masterhost) {
5355 info = sdscatprintf(info,
5356 "master_host:%s\r\n"
5357 "master_port:%d\r\n"
5358 "master_link_status:%s\r\n"
5359 "master_last_io_seconds_ago:%d\r\n"
5360 ,server.masterhost,
5361 server.masterport,
5362 (server.replstate == REDIS_REPL_CONNECTED) ?
5363 "up" : "down",
5364 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
5365 );
5366 }
5367 for (j = 0; j < server.dbnum; j++) {
5368 long long keys, vkeys;
5369
5370 keys = dictSize(server.db[j].dict);
5371 vkeys = dictSize(server.db[j].expires);
5372 if (keys || vkeys) {
5373 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
5374 j, keys, vkeys);
5375 }
5376 }
5377 return info;
5378 }
5379
5380 static void infoCommand(redisClient *c) {
5381 sds info = genRedisInfoString();
5382 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
5383 (unsigned long)sdslen(info)));
5384 addReplySds(c,info);
5385 addReply(c,shared.crlf);
5386 }
5387
5388 static void monitorCommand(redisClient *c) {
5389 /* ignore MONITOR if aleady slave or in monitor mode */
5390 if (c->flags & REDIS_SLAVE) return;
5391
5392 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5393 c->slaveseldb = 0;
5394 listAddNodeTail(server.monitors,c);
5395 addReply(c,shared.ok);
5396 }
5397
5398 /* ================================= Expire ================================= */
5399 static int removeExpire(redisDb *db, robj *key) {
5400 if (dictDelete(db->expires,key) == DICT_OK) {
5401 return 1;
5402 } else {
5403 return 0;
5404 }
5405 }
5406
5407 static int setExpire(redisDb *db, robj *key, time_t when) {
5408 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5409 return 0;
5410 } else {
5411 incrRefCount(key);
5412 return 1;
5413 }
5414 }
5415
5416 /* Return the expire time of the specified key, or -1 if no expire
5417 * is associated with this key (i.e. the key is non volatile) */
5418 static time_t getExpire(redisDb *db, robj *key) {
5419 dictEntry *de;
5420
5421 /* No expire? return ASAP */
5422 if (dictSize(db->expires) == 0 ||
5423 (de = dictFind(db->expires,key)) == NULL) return -1;
5424
5425 return (time_t) dictGetEntryVal(de);
5426 }
5427
5428 static int expireIfNeeded(redisDb *db, robj *key) {
5429 time_t when;
5430 dictEntry *de;
5431
5432 /* No expire? return ASAP */
5433 if (dictSize(db->expires) == 0 ||
5434 (de = dictFind(db->expires,key)) == NULL) return 0;
5435
5436 /* Lookup the expire */
5437 when = (time_t) dictGetEntryVal(de);
5438 if (time(NULL) <= when) return 0;
5439
5440 /* Delete the key */
5441 dictDelete(db->expires,key);
5442 return dictDelete(db->dict,key) == DICT_OK;
5443 }
5444
5445 static int deleteIfVolatile(redisDb *db, robj *key) {
5446 dictEntry *de;
5447
5448 /* No expire? return ASAP */
5449 if (dictSize(db->expires) == 0 ||
5450 (de = dictFind(db->expires,key)) == NULL) return 0;
5451
5452 /* Delete the key */
5453 server.dirty++;
5454 dictDelete(db->expires,key);
5455 return dictDelete(db->dict,key) == DICT_OK;
5456 }
5457
5458 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
5459 dictEntry *de;
5460
5461 de = dictFind(c->db->dict,key);
5462 if (de == NULL) {
5463 addReply(c,shared.czero);
5464 return;
5465 }
5466 if (seconds < 0) {
5467 if (deleteKey(c->db,key)) server.dirty++;
5468 addReply(c, shared.cone);
5469 return;
5470 } else {
5471 time_t when = time(NULL)+seconds;
5472 if (setExpire(c->db,key,when)) {
5473 addReply(c,shared.cone);
5474 server.dirty++;
5475 } else {
5476 addReply(c,shared.czero);
5477 }
5478 return;
5479 }
5480 }
5481
5482 static void expireCommand(redisClient *c) {
5483 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5484 }
5485
5486 static void expireatCommand(redisClient *c) {
5487 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5488 }
5489
5490 static void ttlCommand(redisClient *c) {
5491 time_t expire;
5492 int ttl = -1;
5493
5494 expire = getExpire(c->db,c->argv[1]);
5495 if (expire != -1) {
5496 ttl = (int) (expire-time(NULL));
5497 if (ttl < 0) ttl = -1;
5498 }
5499 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5500 }
5501
5502 /* ================================ MULTI/EXEC ============================== */
5503
5504 /* Client state initialization for MULTI/EXEC */
5505 static void initClientMultiState(redisClient *c) {
5506 c->mstate.commands = NULL;
5507 c->mstate.count = 0;
5508 }
5509
5510 /* Release all the resources associated with MULTI/EXEC state */
5511 static void freeClientMultiState(redisClient *c) {
5512 int j;
5513
5514 for (j = 0; j < c->mstate.count; j++) {
5515 int i;
5516 multiCmd *mc = c->mstate.commands+j;
5517
5518 for (i = 0; i < mc->argc; i++)
5519 decrRefCount(mc->argv[i]);
5520 zfree(mc->argv);
5521 }
5522 zfree(c->mstate.commands);
5523 }
5524
5525 /* Add a new command into the MULTI commands queue */
5526 static void queueMultiCommand(redisClient *c, struct redisCommand *cmd) {
5527 multiCmd *mc;
5528 int j;
5529
5530 c->mstate.commands = zrealloc(c->mstate.commands,
5531 sizeof(multiCmd)*(c->mstate.count+1));
5532 mc = c->mstate.commands+c->mstate.count;
5533 mc->cmd = cmd;
5534 mc->argc = c->argc;
5535 mc->argv = zmalloc(sizeof(robj*)*c->argc);
5536 memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
5537 for (j = 0; j < c->argc; j++)
5538 incrRefCount(mc->argv[j]);
5539 c->mstate.count++;
5540 }
5541
5542 static void multiCommand(redisClient *c) {
5543 c->flags |= REDIS_MULTI;
5544 addReply(c,shared.ok);
5545 }
5546
5547 static void execCommand(redisClient *c) {
5548 int j;
5549 robj **orig_argv;
5550 int orig_argc;
5551
5552 if (!(c->flags & REDIS_MULTI)) {
5553 addReplySds(c,sdsnew("-ERR EXEC without MULTI\r\n"));
5554 return;
5555 }
5556
5557 orig_argv = c->argv;
5558 orig_argc = c->argc;
5559 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count));
5560 for (j = 0; j < c->mstate.count; j++) {
5561 c->argc = c->mstate.commands[j].argc;
5562 c->argv = c->mstate.commands[j].argv;
5563 call(c,c->mstate.commands[j].cmd);
5564 }
5565 c->argv = orig_argv;
5566 c->argc = orig_argc;
5567 freeClientMultiState(c);
5568 initClientMultiState(c);
5569 c->flags &= (~REDIS_MULTI);
5570 }
5571
5572 /* =========================== Blocking Operations ========================= */
5573
/* Currently Redis blocking operations support is limited to list POP ops,
 * so the current implementation is not fully generic, but it is also not
 * completely specific, so it will not require a rewrite to support new
 * kinds of blocking operations in the future.
 *
 * Still it's important to note that list blocking operations can already be
 * used as a notification mechanism in order to implement other blocking
 * operations at application level, so there must be very strong evidence
 * of usefulness and generality before new blocking operations are implemented.
 *
 * This is how the current blocking POP works, we use BLPOP as an example:
 * - If the user calls BLPOP and the key exists and contains a non empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   if there is no need to block.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blockingkeys) mapping keys to a list of clients
 *   blocking for these keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we serve the first in the list: basically instead of pushing
 *   the new element inside the list we return it to the (first / oldest)
 *   blocking client, unblock the client, and remove it from the list.
 *
 * The above comment and the source code should be enough in order to understand
 * the implementation and modify / fix it later.
 */
5602
5603 /* Set a client in blocking mode for the specified key, with the specified
5604 * timeout */
5605 static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
5606 dictEntry *de;
5607 list *l;
5608 int j;
5609
5610 c->blockingkeys = zmalloc(sizeof(robj*)*numkeys);
5611 c->blockingkeysnum = numkeys;
5612 c->blockingto = timeout;
5613 for (j = 0; j < numkeys; j++) {
5614 /* Add the key in the client structure, to map clients -> keys */
5615 c->blockingkeys[j] = keys[j];
5616 incrRefCount(keys[j]);
5617
5618 /* And in the other "side", to map keys -> clients */
5619 de = dictFind(c->db->blockingkeys,keys[j]);
5620 if (de == NULL) {
5621 int retval;
5622
5623 /* For every key we take a list of clients blocked for it */
5624 l = listCreate();
5625 retval = dictAdd(c->db->blockingkeys,keys[j],l);
5626 incrRefCount(keys[j]);
5627 assert(retval == DICT_OK);
5628 } else {
5629 l = dictGetEntryVal(de);
5630 }
5631 listAddNodeTail(l,c);
5632 }
5633 /* Mark the client as a blocked client */
5634 c->flags |= REDIS_BLOCKED;
5635 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
5636 server.blockedclients++;
5637 }
5638
5639 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
5640 static void unblockClient(redisClient *c) {
5641 dictEntry *de;
5642 list *l;
5643 int j;
5644
5645 assert(c->blockingkeys != NULL);
5646 /* The client may wait for multiple keys, so unblock it for every key. */
5647 for (j = 0; j < c->blockingkeysnum; j++) {
5648 /* Remove this client from the list of clients waiting for this key. */
5649 de = dictFind(c->db->blockingkeys,c->blockingkeys[j]);
5650 assert(de != NULL);
5651 l = dictGetEntryVal(de);
5652 listDelNode(l,listSearchKey(l,c));
5653 /* If the list is empty we need to remove it to avoid wasting memory */
5654 if (listLength(l) == 0)
5655 dictDelete(c->db->blockingkeys,c->blockingkeys[j]);
5656 decrRefCount(c->blockingkeys[j]);
5657 }
5658 /* Cleanup the client structure */
5659 zfree(c->blockingkeys);
5660 c->blockingkeys = NULL;
5661 c->flags &= (~REDIS_BLOCKED);
5662 server.blockedclients--;
5663 /* Ok now we are ready to get read events from socket, note that we
5664 * can't trap errors here as it's possible that unblockClients() is
5665 * called from freeClient() itself, and the only thing we can do
5666 * if we failed to register the READABLE event is to kill the client.
5667 * Still the following function should never fail in the real world as
5668 * we are sure the file descriptor is sane, and we exit on out of mem. */
5669 aeCreateFileEvent(server.el, c->fd, AE_READABLE, readQueryFromClient, c);
5670 /* As a final step we want to process data if there is some command waiting
5671 * in the input buffer. Note that this is safe even if unblockClient()
5672 * gets called from freeClient() because freeClient() will be smart
5673 * enough to call this function *after* c->querybuf was set to NULL. */
5674 if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c);
5675 }
5676
5677 /* This should be called from any function PUSHing into lists.
5678 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5679 * 'ele' is the element pushed.
5680 *
5681 * If the function returns 0 there was no client waiting for a list push
5682 * against this key.
5683 *
5684 * If the function returns 1 there was a client waiting for a list push
5685 * against this key, the element was passed to this client thus it's not
5686 * needed to actually add it to the list and the caller should return asap. */
5687 static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
5688 struct dictEntry *de;
5689 redisClient *receiver;
5690 list *l;
5691 listNode *ln;
5692
5693 de = dictFind(c->db->blockingkeys,key);
5694 if (de == NULL) return 0;
5695 l = dictGetEntryVal(de);
5696 ln = listFirst(l);
5697 assert(ln != NULL);
5698 receiver = ln->value;
5699
5700 addReplySds(receiver,sdsnew("*2\r\n"));
5701 addReplyBulkLen(receiver,key);
5702 addReply(receiver,key);
5703 addReply(receiver,shared.crlf);
5704 addReplyBulkLen(receiver,ele);
5705 addReply(receiver,ele);
5706 addReply(receiver,shared.crlf);
5707 unblockClient(receiver);
5708 return 1;
5709 }
5710
5711 /* Blocking RPOP/LPOP */
5712 static void blockingPopGenericCommand(redisClient *c, int where) {
5713 robj *o;
5714 time_t timeout;
5715 int j;
5716
5717 for (j = 1; j < c->argc-1; j++) {
5718 o = lookupKeyWrite(c->db,c->argv[j]);
5719 if (o != NULL) {
5720 if (o->type != REDIS_LIST) {
5721 addReply(c,shared.wrongtypeerr);
5722 return;
5723 } else {
5724 list *list = o->ptr;
5725 if (listLength(list) != 0) {
5726 /* If the list contains elements fall back to the usual
5727 * non-blocking POP operation */
5728 robj *argv[2], **orig_argv;
5729 int orig_argc;
5730
5731 /* We need to alter the command arguments before to call
5732 * popGenericCommand() as the command takes a single key. */
5733 orig_argv = c->argv;
5734 orig_argc = c->argc;
5735 argv[1] = c->argv[j];
5736 c->argv = argv;
5737 c->argc = 2;
5738
5739 /* Also the return value is different, we need to output
5740 * the multi bulk reply header and the key name. The
5741 * "real" command will add the last element (the value)
5742 * for us. If this souds like an hack to you it's just
5743 * because it is... */
5744 addReplySds(c,sdsnew("*2\r\n"));
5745 addReplyBulkLen(c,argv[1]);
5746 addReply(c,argv[1]);
5747 addReply(c,shared.crlf);
5748 popGenericCommand(c,where);
5749
5750 /* Fix the client structure with the original stuff */
5751 c->argv = orig_argv;
5752 c->argc = orig_argc;
5753 return;
5754 }
5755 }
5756 }
5757 }
5758 /* If the list is empty or the key does not exists we must block */
5759 timeout = strtol(c->argv[c->argc-1]->ptr,NULL,10);
5760 if (timeout > 0) timeout += time(NULL);
5761 blockForKeys(c,c->argv+1,c->argc-2,timeout);
5762 }
5763
5764 static void blpopCommand(redisClient *c) {
5765 blockingPopGenericCommand(c,REDIS_HEAD);
5766 }
5767
5768 static void brpopCommand(redisClient *c) {
5769 blockingPopGenericCommand(c,REDIS_TAIL);
5770 }
5771
5772 /* =============================== Replication ============================= */
5773
5774 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5775 ssize_t nwritten, ret = size;
5776 time_t start = time(NULL);
5777
5778 timeout++;
5779 while(size) {
5780 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5781 nwritten = write(fd,ptr,size);
5782 if (nwritten == -1) return -1;
5783 ptr += nwritten;
5784 size -= nwritten;
5785 }
5786 if ((time(NULL)-start) > timeout) {
5787 errno = ETIMEDOUT;
5788 return -1;
5789 }
5790 }
5791 return ret;
5792 }
5793
5794 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5795 ssize_t nread, totread = 0;
5796 time_t start = time(NULL);
5797
5798 timeout++;
5799 while(size) {
5800 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5801 nread = read(fd,ptr,size);
5802 if (nread == -1) return -1;
5803 ptr += nread;
5804 size -= nread;
5805 totread += nread;
5806 }
5807 if ((time(NULL)-start) > timeout) {
5808 errno = ETIMEDOUT;
5809 return -1;
5810 }
5811 }
5812 return totread;
5813 }
5814
/* Read a line from 'fd', one byte at a time, into the buffer 'ptr' of
 * 'size' bytes. 'timeout' is passed to syncRead() for every single byte.
 *
 * Reading stops at the first '\n', which is not stored; a '\r' right
 * before it is replaced by the string terminator. At most size-1 bytes are
 * stored, so that the result is always null terminated and never written
 * past the end of the buffer: longer lines are truncated.
 *
 * Returns the number of bytes read before the newline (a stripped '\r' is
 * still counted), or -1 if syncRead() fails. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    size--; /* Reserve room for the null term */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* One more byte of the buffer is used: without this the loop
         * would keep writing past the end of 'ptr' on long lines. */
        size--;
    }
    return nread;
}
5835
5836 static void syncCommand(redisClient *c) {
5837 /* ignore SYNC if aleady slave or in monitor mode */
5838 if (c->flags & REDIS_SLAVE) return;
5839
5840 /* SYNC can't be issued when the server has pending data to send to
5841 * the client about already issued commands. We need a fresh reply
5842 * buffer registering the differences between the BGSAVE and the current
5843 * dataset, so that we can copy to other slaves if needed. */
5844 if (listLength(c->reply) != 0) {
5845 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5846 return;
5847 }
5848
5849 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5850 /* Here we need to check if there is a background saving operation
5851 * in progress, or if it is required to start one */
5852 if (server.bgsavechildpid != -1) {
5853 /* Ok a background save is in progress. Let's check if it is a good
5854 * one for replication, i.e. if there is another slave that is
5855 * registering differences since the server forked to save */
5856 redisClient *slave;
5857 listNode *ln;
5858
5859 listRewind(server.slaves);
5860 while((ln = listYield(server.slaves))) {
5861 slave = ln->value;
5862 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5863 }
5864 if (ln) {
5865 /* Perfect, the server is already registering differences for
5866 * another slave. Set the right state, and copy the buffer. */
5867 listRelease(c->reply);
5868 c->reply = listDup(slave->reply);
5869 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5870 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5871 } else {
5872 /* No way, we need to wait for the next BGSAVE in order to
5873 * register differences */
5874 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5875 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5876 }
5877 } else {
5878 /* Ok we don't have a BGSAVE in progress, let's start one */
5879 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5880 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5881 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5882 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5883 return;
5884 }
5885 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5886 }
5887 c->repldbfd = -1;
5888 c->flags |= REDIS_SLAVE;
5889 c->slaveseldb = 0;
5890 listAddNodeTail(server.slaves,c);
5891 return;
5892 }
5893
5894 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5895 redisClient *slave = privdata;
5896 REDIS_NOTUSED(el);
5897 REDIS_NOTUSED(mask);
5898 char buf[REDIS_IOBUF_LEN];
5899 ssize_t nwritten, buflen;
5900
5901 if (slave->repldboff == 0) {
5902 /* Write the bulk write count before to transfer the DB. In theory here
5903 * we don't know how much room there is in the output buffer of the
5904 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5905 * operations) will never be smaller than the few bytes we need. */
5906 sds bulkcount;
5907
5908 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5909 slave->repldbsize);
5910 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5911 {
5912 sdsfree(bulkcount);
5913 freeClient(slave);
5914 return;
5915 }
5916 sdsfree(bulkcount);
5917 }
5918 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5919 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5920 if (buflen <= 0) {
5921 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5922 (buflen == 0) ? "premature EOF" : strerror(errno));
5923 freeClient(slave);
5924 return;
5925 }
5926 if ((nwritten = write(fd,buf,buflen)) == -1) {
5927 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5928 strerror(errno));
5929 freeClient(slave);
5930 return;
5931 }
5932 slave->repldboff += nwritten;
5933 if (slave->repldboff == slave->repldbsize) {
5934 close(slave->repldbfd);
5935 slave->repldbfd = -1;
5936 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5937 slave->replstate = REDIS_REPL_ONLINE;
5938 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5939 sendReplyToClient, slave) == AE_ERR) {
5940 freeClient(slave);
5941 return;
5942 }
5943 addReplySds(slave,sdsempty());
5944 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5945 }
5946 }
5947
5948 /* This function is called at the end of every backgrond saving.
5949 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5950 * otherwise REDIS_ERR is passed to the function.
5951 *
5952 * The goal of this function is to handle slaves waiting for a successful
5953 * background saving in order to perform non-blocking synchronization. */
5954 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5955 listNode *ln;
5956 int startbgsave = 0;
5957
5958 listRewind(server.slaves);
5959 while((ln = listYield(server.slaves))) {
5960 redisClient *slave = ln->value;
5961
5962 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5963 startbgsave = 1;
5964 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5965 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5966 struct redis_stat buf;
5967
5968 if (bgsaveerr != REDIS_OK) {
5969 freeClient(slave);
5970 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5971 continue;
5972 }
5973 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5974 redis_fstat(slave->repldbfd,&buf) == -1) {
5975 freeClient(slave);
5976 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5977 continue;
5978 }
5979 slave->repldboff = 0;
5980 slave->repldbsize = buf.st_size;
5981 slave->replstate = REDIS_REPL_SEND_BULK;
5982 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5983 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
5984 freeClient(slave);
5985 continue;
5986 }
5987 }
5988 }
5989 if (startbgsave) {
5990 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5991 listRewind(server.slaves);
5992 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5993 while((ln = listYield(server.slaves))) {
5994 redisClient *slave = ln->value;
5995
5996 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5997 freeClient(slave);
5998 }
5999 }
6000 }
6001 }
6002
6003 static int syncWithMaster(void) {
6004 char buf[1024], tmpfile[256], authcmd[1024];
6005 int dumpsize;
6006 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
6007 int dfd;
6008
6009 if (fd == -1) {
6010 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
6011 strerror(errno));
6012 return REDIS_ERR;
6013 }
6014
6015 /* AUTH with the master if required. */
6016 if(server.masterauth) {
6017 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
6018 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
6019 close(fd);
6020 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
6021 strerror(errno));
6022 return REDIS_ERR;
6023 }
6024 /* Read the AUTH result. */
6025 if (syncReadLine(fd,buf,1024,3600) == -1) {
6026 close(fd);
6027 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
6028 strerror(errno));
6029 return REDIS_ERR;
6030 }
6031 if (buf[0] != '+') {
6032 close(fd);
6033 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
6034 return REDIS_ERR;
6035 }
6036 }
6037
6038 /* Issue the SYNC command */
6039 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
6040 close(fd);
6041 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
6042 strerror(errno));
6043 return REDIS_ERR;
6044 }
6045 /* Read the bulk write count */
6046 if (syncReadLine(fd,buf,1024,3600) == -1) {
6047 close(fd);
6048 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
6049 strerror(errno));
6050 return REDIS_ERR;
6051 }
6052 if (buf[0] != '$') {
6053 close(fd);
6054 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
6055 return REDIS_ERR;
6056 }
6057 dumpsize = atoi(buf+1);
6058 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
6059 /* Read the bulk write data on a temp file */
6060 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
6061 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
6062 if (dfd == -1) {
6063 close(fd);
6064 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
6065 return REDIS_ERR;
6066 }
6067 while(dumpsize) {
6068 int nread, nwritten;
6069
6070 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
6071 if (nread == -1) {
6072 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
6073 strerror(errno));
6074 close(fd);
6075 close(dfd);
6076 return REDIS_ERR;
6077 }
6078 nwritten = write(dfd,buf,nread);
6079 if (nwritten == -1) {
6080 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
6081 close(fd);
6082 close(dfd);
6083 return REDIS_ERR;
6084 }
6085 dumpsize -= nread;
6086 }
6087 close(dfd);
6088 if (rename(tmpfile,server.dbfilename) == -1) {
6089 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
6090 unlink(tmpfile);
6091 close(fd);
6092 return REDIS_ERR;
6093 }
6094 emptyDb();
6095 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6096 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
6097 close(fd);
6098 return REDIS_ERR;
6099 }
6100 server.master = createClient(fd);
6101 server.master->flags |= REDIS_MASTER;
6102 server.master->authenticated = 1;
6103 server.replstate = REDIS_REPL_CONNECTED;
6104 return REDIS_OK;
6105 }
6106
6107 static void slaveofCommand(redisClient *c) {
6108 if (!strcasecmp(c->argv[1]->ptr,"no") &&
6109 !strcasecmp(c->argv[2]->ptr,"one")) {
6110 if (server.masterhost) {
6111 sdsfree(server.masterhost);
6112 server.masterhost = NULL;
6113 if (server.master) freeClient(server.master);
6114 server.replstate = REDIS_REPL_NONE;
6115 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
6116 }
6117 } else {
6118 sdsfree(server.masterhost);
6119 server.masterhost = sdsdup(c->argv[1]->ptr);
6120 server.masterport = atoi(c->argv[2]->ptr);
6121 if (server.master) freeClient(server.master);
6122 server.replstate = REDIS_REPL_CONNECT;
6123 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
6124 server.masterhost, server.masterport);
6125 }
6126 addReply(c,shared.ok);
6127 }
6128
6129 /* ============================ Maxmemory directive ======================== */
6130
6131 /* This function gets called when 'maxmemory' is set on the config file to limit
6132 * the max memory used by the server, and we are out of memory.
6133 * This function will try to, in order:
6134 *
6135 * - Free objects from the free list
6136 * - Try to remove keys with an EXPIRE set
6137 *
6138 * It is not possible to free enough memory to reach used-memory < maxmemory
6139 * the server will start refusing commands that will enlarge even more the
6140 * memory usage.
6141 */
6142 static void freeMemoryIfNeeded(void) {
6143 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
6144 if (listLength(server.objfreelist)) {
6145 robj *o;
6146
6147 listNode *head = listFirst(server.objfreelist);
6148 o = listNodeValue(head);
6149 listDelNode(server.objfreelist,head);
6150 zfree(o);
6151 } else {
6152 int j, k, freed = 0;
6153
6154 for (j = 0; j < server.dbnum; j++) {
6155 int minttl = -1;
6156 robj *minkey = NULL;
6157 struct dictEntry *de;
6158
6159 if (dictSize(server.db[j].expires)) {
6160 freed = 1;
6161 /* From a sample of three keys drop the one nearest to
6162 * the natural expire */
6163 for (k = 0; k < 3; k++) {
6164 time_t t;
6165
6166 de = dictGetRandomKey(server.db[j].expires);
6167 t = (time_t) dictGetEntryVal(de);
6168 if (minttl == -1 || t < minttl) {
6169 minkey = dictGetEntryKey(de);
6170 minttl = t;
6171 }
6172 }
6173 deleteKey(server.db+j,minkey);
6174 }
6175 }
6176 if (!freed) return; /* nothing to free... */
6177 }
6178 }
6179 }
6180
6181 /* ============================== Append Only file ========================== */
6182
6183 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
6184 sds buf = sdsempty();
6185 int j;
6186 ssize_t nwritten;
6187 time_t now;
6188 robj *tmpargv[3];
6189
6190 /* The DB this command was targetting is not the same as the last command
6191 * we appendend. To issue a SELECT command is needed. */
6192 if (dictid != server.appendseldb) {
6193 char seldb[64];
6194
6195 snprintf(seldb,sizeof(seldb),"%d",dictid);
6196 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
6197 (unsigned long)strlen(seldb),seldb);
6198 server.appendseldb = dictid;
6199 }
6200
6201 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6202 * EXPIREs into EXPIREATs calls */
6203 if (cmd->proc == expireCommand) {
6204 long when;
6205
6206 tmpargv[0] = createStringObject("EXPIREAT",8);
6207 tmpargv[1] = argv[1];
6208 incrRefCount(argv[1]);
6209 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
6210 tmpargv[2] = createObject(REDIS_STRING,
6211 sdscatprintf(sdsempty(),"%ld",when));
6212 argv = tmpargv;
6213 }
6214
6215 /* Append the actual command */
6216 buf = sdscatprintf(buf,"*%d\r\n",argc);
6217 for (j = 0; j < argc; j++) {
6218 robj *o = argv[j];
6219
6220 o = getDecodedObject(o);
6221 buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
6222 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
6223 buf = sdscatlen(buf,"\r\n",2);
6224 decrRefCount(o);
6225 }
6226
6227 /* Free the objects from the modified argv for EXPIREAT */
6228 if (cmd->proc == expireCommand) {
6229 for (j = 0; j < 3; j++)
6230 decrRefCount(argv[j]);
6231 }
6232
6233 /* We want to perform a single write. This should be guaranteed atomic
6234 * at least if the filesystem we are writing is a real physical one.
6235 * While this will save us against the server being killed I don't think
6236 * there is much to do about the whole server stopping for power problems
6237 * or alike */
6238 nwritten = write(server.appendfd,buf,sdslen(buf));
6239 if (nwritten != (signed)sdslen(buf)) {
6240 /* Ooops, we are in troubles. The best thing to do for now is
6241 * to simply exit instead to give the illusion that everything is
6242 * working as expected. */
6243 if (nwritten == -1) {
6244 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
6245 } else {
6246 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
6247 }
6248 exit(1);
6249 }
6250 /* If a background append only file rewriting is in progress we want to
6251 * accumulate the differences between the child DB and the current one
6252 * in a buffer, so that when the child process will do its work we
6253 * can append the differences to the new append only file. */
6254 if (server.bgrewritechildpid != -1)
6255 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
6256
6257 sdsfree(buf);
6258 now = time(NULL);
6259 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
6260 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
6261 now-server.lastfsync > 1))
6262 {
6263 fsync(server.appendfd); /* Let's try to get this data on the disk */
6264 server.lastfsync = now;
6265 }
6266 }
6267
6268 /* In Redis commands are always executed in the context of a client, so in
6269 * order to load the append only file we need to create a fake client. */
6270 static struct redisClient *createFakeClient(void) {
6271 struct redisClient *c = zmalloc(sizeof(*c));
6272
6273 selectDb(c,0);
6274 c->fd = -1;
6275 c->querybuf = sdsempty();
6276 c->argc = 0;
6277 c->argv = NULL;
6278 c->flags = 0;
6279 /* We set the fake client as a slave waiting for the synchronization
6280 * so that Redis will not try to send replies to this client. */
6281 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
6282 c->reply = listCreate();
6283 listSetFreeMethod(c->reply,decrRefCount);
6284 listSetDupMethod(c->reply,dupClientReplyValue);
6285 return c;
6286 }
6287
6288 static void freeFakeClient(struct redisClient *c) {
6289 sdsfree(c->querybuf);
6290 listRelease(c->reply);
6291 zfree(c);
6292 }
6293
6294 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
6295 * error (the append only file is zero-length) REDIS_ERR is returned. On
6296 * fatal error an error message is logged and the program exists. */
6297 int loadAppendOnlyFile(char *filename) {
6298 struct redisClient *fakeClient;
6299 FILE *fp = fopen(filename,"r");
6300 struct redis_stat sb;
6301
6302 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
6303 return REDIS_ERR;
6304
6305 if (fp == NULL) {
6306 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
6307 exit(1);
6308 }
6309
6310 fakeClient = createFakeClient();
6311 while(1) {
6312 int argc, j;
6313 unsigned long len;
6314 robj **argv;
6315 char buf[128];
6316 sds argsds;
6317 struct redisCommand *cmd;
6318
6319 if (fgets(buf,sizeof(buf),fp) == NULL) {
6320 if (feof(fp))
6321 break;
6322 else
6323 goto readerr;
6324 }
6325 if (buf[0] != '*') goto fmterr;
6326 argc = atoi(buf+1);
6327 argv = zmalloc(sizeof(robj*)*argc);
6328 for (j = 0; j < argc; j++) {
6329 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
6330 if (buf[0] != '$') goto fmterr;
6331 len = strtol(buf+1,NULL,10);
6332 argsds = sdsnewlen(NULL,len);
6333 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
6334 argv[j] = createObject(REDIS_STRING,argsds);
6335 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
6336 }
6337
6338 /* Command lookup */
6339 cmd = lookupCommand(argv[0]->ptr);
6340 if (!cmd) {
6341 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
6342 exit(1);
6343 }
6344 /* Try object sharing and encoding */
6345 if (server.shareobjects) {
6346 int j;
6347 for(j = 1; j < argc; j++)
6348 argv[j] = tryObjectSharing(argv[j]);
6349 }
6350 if (cmd->flags & REDIS_CMD_BULK)
6351 tryObjectEncoding(argv[argc-1]);
6352 /* Run the command in the context of a fake client */
6353 fakeClient->argc = argc;
6354 fakeClient->argv = argv;
6355 cmd->proc(fakeClient);
6356 /* Discard the reply objects list from the fake client */
6357 while(listLength(fakeClient->reply))
6358 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
6359 /* Clean up, ready for the next command */
6360 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
6361 zfree(argv);
6362 }
6363 fclose(fp);
6364 freeFakeClient(fakeClient);
6365 return REDIS_OK;
6366
6367 readerr:
6368 if (feof(fp)) {
6369 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
6370 } else {
6371 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
6372 }
6373 exit(1);
6374 fmterr:
6375 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
6376 exit(1);
6377 }
6378
6379 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6380 static int fwriteBulk(FILE *fp, robj *obj) {
6381 char buf[128];
6382 obj = getDecodedObject(obj);
6383 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
6384 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
6385 if (sdslen(obj->ptr) && fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0)
6386 goto err;
6387 if (fwrite("\r\n",2,1,fp) == 0) goto err;
6388 decrRefCount(obj);
6389 return 1;
6390 err:
6391 decrRefCount(obj);
6392 return 0;
6393 }
6394
/* Write a double value in bulk format $<count>\r\n<payload>\r\n, the
 * payload being the "%.17g" representation of the number.
 *
 * Returns 1 on success, 0 if one of the writes failed. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char payload[128], header[128];
    size_t plen;

    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    plen = strlen(payload);
    /* The count must not include the trailing CRLF. */
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)plen-2);
    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,plen,1,fp) != 0;
}
6405
/* Write a long value in bulk format $<count>\r\n<payload>\r\n, the payload
 * being the decimal representation of the number.
 *
 * Returns 1 on success, 0 if one of the writes failed. */
static int fwriteBulkLong(FILE *fp, long l) {
    char payload[128], header[128];
    size_t plen;

    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    plen = strlen(payload);
    /* The count must not include the trailing CRLF. */
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)plen-2);
    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,plen,1,fp) != 0;
}
6416
6417 /* Write a sequence of commands able to fully rebuild the dataset into
6418 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6419 static int rewriteAppendOnlyFile(char *filename) {
6420 dictIterator *di = NULL;
6421 dictEntry *de;
6422 FILE *fp;
6423 char tmpfile[256];
6424 int j;
6425 time_t now = time(NULL);
6426
6427 /* Note that we have to use a different temp name here compared to the
6428 * one used by rewriteAppendOnlyFileBackground() function. */
6429 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
6430 fp = fopen(tmpfile,"w");
6431 if (!fp) {
6432 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
6433 return REDIS_ERR;
6434 }
6435 for (j = 0; j < server.dbnum; j++) {
6436 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
6437 redisDb *db = server.db+j;
6438 dict *d = db->dict;
6439 if (dictSize(d) == 0) continue;
6440 di = dictGetIterator(d);
6441 if (!di) {
6442 fclose(fp);
6443 return REDIS_ERR;
6444 }
6445
6446 /* SELECT the new DB */
6447 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
6448 if (fwriteBulkLong(fp,j) == 0) goto werr;
6449
6450 /* Iterate this DB writing every entry */
6451 while((de = dictNext(di)) != NULL) {
6452 robj *key = dictGetEntryKey(de);
6453 robj *o = dictGetEntryVal(de);
6454 time_t expiretime = getExpire(db,key);
6455
6456 /* Save the key and associated value */
6457 if (o->type == REDIS_STRING) {
6458 /* Emit a SET command */
6459 char cmd[]="*3\r\n$3\r\nSET\r\n";
6460 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6461 /* Key and value */
6462 if (fwriteBulk(fp,key) == 0) goto werr;
6463 if (fwriteBulk(fp,o) == 0) goto werr;
6464 } else if (o->type == REDIS_LIST) {
6465 /* Emit the RPUSHes needed to rebuild the list */
6466 list *list = o->ptr;
6467 listNode *ln;
6468
6469 listRewind(list);
6470 while((ln = listYield(list))) {
6471 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
6472 robj *eleobj = listNodeValue(ln);
6473
6474 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6475 if (fwriteBulk(fp,key) == 0) goto werr;
6476 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6477 }
6478 } else if (o->type == REDIS_SET) {
6479 /* Emit the SADDs needed to rebuild the set */
6480 dict *set = o->ptr;
6481 dictIterator *di = dictGetIterator(set);
6482 dictEntry *de;
6483
6484 while((de = dictNext(di)) != NULL) {
6485 char cmd[]="*3\r\n$4\r\nSADD\r\n";
6486 robj *eleobj = dictGetEntryKey(de);
6487
6488 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6489 if (fwriteBulk(fp,key) == 0) goto werr;
6490 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6491 }
6492 dictReleaseIterator(di);
6493 } else if (o->type == REDIS_ZSET) {
6494 /* Emit the ZADDs needed to rebuild the sorted set */
6495 zset *zs = o->ptr;
6496 dictIterator *di = dictGetIterator(zs->dict);
6497 dictEntry *de;
6498
6499 while((de = dictNext(di)) != NULL) {
6500 char cmd[]="*4\r\n$4\r\nZADD\r\n";
6501 robj *eleobj = dictGetEntryKey(de);
6502 double *score = dictGetEntryVal(de);
6503
6504 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6505 if (fwriteBulk(fp,key) == 0) goto werr;
6506 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
6507 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6508 }
6509 dictReleaseIterator(di);
6510 } else {
6511 redisAssert(0 != 0);
6512 }
6513 /* Save the expire time */
6514 if (expiretime != -1) {
6515 char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
6516 /* If this key is already expired skip it */
6517 if (expiretime < now) continue;
6518 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6519 if (fwriteBulk(fp,key) == 0) goto werr;
6520 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
6521 }
6522 }
6523 dictReleaseIterator(di);
6524 }
6525
6526 /* Make sure data will not remain on the OS's output buffers */
6527 fflush(fp);
6528 fsync(fileno(fp));
6529 fclose(fp);
6530
6531 /* Use RENAME to make sure the DB file is changed atomically only
6532 * if the generate DB file is ok. */
6533 if (rename(tmpfile,filename) == -1) {
6534 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
6535 unlink(tmpfile);
6536 return REDIS_ERR;
6537 }
6538 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
6539 return REDIS_OK;
6540
6541 werr:
6542 fclose(fp);
6543 unlink(tmpfile);
6544 redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
6545 if (di) dictReleaseIterator(di);
6546 return REDIS_ERR;
6547 }
6548
6549 /* This is how rewriting of the append only file in background works:
6550 *
6551 * 1) The user calls BGREWRITEAOF
6552 * 2) Redis calls this function, that forks():
6553 * 2a) the child rewrite the append only file in a temp file.
6554 * 2b) the parent accumulates differences in server.bgrewritebuf.
6555 * 3) When the child finished '2a' exists.
6556 * 4) The parent will trap the exit code, if it's OK, will append the
6557 * data accumulated into server.bgrewritebuf into the temp file, and
6558 * finally will rename(2) the temp file in the actual file name.
6559 * The the new file is reopened as the new append only file. Profit!
6560 */
6561 static int rewriteAppendOnlyFileBackground(void) {
6562 pid_t childpid;
6563
6564 if (server.bgrewritechildpid != -1) return REDIS_ERR;
6565 if ((childpid = fork()) == 0) {
6566 /* Child */
6567 char tmpfile[256];
6568 close(server.fd);
6569
6570 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6571 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
6572 exit(0);
6573 } else {
6574 exit(1);
6575 }
6576 } else {
6577 /* Parent */
6578 if (childpid == -1) {
6579 redisLog(REDIS_WARNING,
6580 "Can't rewrite append only file in background: fork: %s",
6581 strerror(errno));
6582 return REDIS_ERR;
6583 }
6584 redisLog(REDIS_NOTICE,
6585 "Background append only file rewriting started by pid %d",childpid);
6586 server.bgrewritechildpid = childpid;
6587 /* We set appendseldb to -1 in order to force the next call to the
6588 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6589 * accumulated by the parent into server.bgrewritebuf will start
6590 * with a SELECT statement and it will be safe to merge. */
6591 server.appendseldb = -1;
6592 return REDIS_OK;
6593 }
6594 return REDIS_OK; /* unreached */
6595 }
6596
6597 static void bgrewriteaofCommand(redisClient *c) {
6598 if (server.bgrewritechildpid != -1) {
6599 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6600 return;
6601 }
6602 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6603 char *status = "+Background append only file rewriting started\r\n";
6604 addReplySds(c,sdsnew(status));
6605 } else {
6606 addReply(c,shared.err);
6607 }
6608 }
6609
/* Remove the temp file named after 'childpid' with the pattern used by
 * the background append only file rewrite. A failure of unlink() is
 * ignored. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof", (int) childpid);
    unlink(path);
}
6616
6617 /* =============================== Virtual Memory =========================== */
6618 static void vmInit(void) {
6619 off_t totsize;
6620
6621 server.vm_fp = fopen("/tmp/redisvm","w+b");
6622 if (server.vm_fp == NULL) {
6623 redisLog(REDIS_WARNING,"Impossible to open the swap file. Exiting.");
6624 exit(1);
6625 }
6626 server.vm_fd = fileno(server.vm_fp);
6627 server.vm_next_page = 0;
6628 server.vm_near_pages = 0;
6629 totsize = server.vm_pages*server.vm_page_size;
6630 redisLog(REDIS_NOTICE,"Allocating %lld bytes of swap file",totsize);
6631 if (ftruncate(server.vm_fd,totsize) == -1) {
6632 redisLog(REDIS_WARNING,"Can't ftruncate swap file: %s. Exiting.",
6633 strerror(errno));
6634 exit(1);
6635 } else {
6636 redisLog(REDIS_NOTICE,"Swap file allocated with success");
6637 }
6638 server.vm_bitmap = zmalloc((server.vm_near_pages+7)/8);
6639 memset(server.vm_bitmap,0,(server.vm_near_pages+7)/8);
6640 /* Try to remove the swap file, so the OS will really delete it from the
6641 * file system when Redis exists. */
6642 unlink("/tmp/redisvm");
6643 }
6644
6645 /* Mark the page as used */
6646 static void vmMarkPageUsed(off_t page) {
6647 off_t byte = page/8;
6648 int bit = page&7;
6649 server.vm_bitmap[byte] |= 1<<bit;
6650 }
6651
/* Mark N contiguous pages as used, with 'page' being the first.
 * The pages marked are page, page+1, ..., page+count-1. */
static void vmMarkPagesUsed(off_t page, off_t count) {
    off_t j;

    for (j = 0; j < count; j++)
        vmMarkPageUsed(page+j);
}
6659
6660 /* Mark the page as free */
6661 static void vmMarkPageFree(off_t page) {
6662 off_t byte = page/8;
6663 int bit = page&7;
6664 server.vm_bitmap[byte] &= ~(1<<bit);
6665 }
6666
/* Mark N contiguous pages as free, with 'page' being the first.
 * The pages marked are page, page+1, ..., page+count-1. */
static void vmMarkPagesFree(off_t page, off_t count) {
    off_t j;

    for (j = 0; j < count; j++)
        vmMarkPageFree(page+j);
}
6674
6675 /* Test if the page is free */
6676 static int vmFreePage(off_t page) {
6677 off_t byte = page/8;
6678 int bit = page&7;
6679 return server.vm_bitmap[byte] & bit;
6680 }
6681
6682 /* Find N contiguous free pages storing the first page of the cluster in *first.
6683 * Returns REDIS_OK if it was able to find N contiguous pages, otherwise
6684 * REDIS_ERR is returned.
6685 *
6686 * This function uses a simple algorithm: we try to allocate
6687 * REDIS_VM_MAX_NEAR_PAGES sequentially, when we reach this limit we start
6688 * again from the start of the swap file searching for free spaces.
6689 *
6690 * If it looks pretty clear that there are no free pages near our offset
6691 * we try to find less populated places doing a forward jump of
6692 * REDIS_VM_MAX_RANDOM_JUMP, then we start scanning again a few pages
6693 * without hurry, and then we jump again and so forth...
6694 *
6695 * This function can be improved using a free list to avoid to guess
6696 * too much, since we could collect data about freed pages.
6697 *
6698 * note: I implemented this function just after watching an episode of
6699 * Battlestar Galactica, where the hybrid was continuing to say "JUMP!"
6700 */
6701 static int vmFindContiguousPages(off_t *first, int n) {
6702 off_t base, offset = 0, since_jump = 0, numfree = 0;
6703
6704 if (server.vm_near_pages == REDIS_VM_MAX_NEAR_PAGES) {
6705 server.vm_near_pages = 0;
6706 server.vm_next_page = 0;
6707 }
6708 server.vm_near_pages++; /* Yet another try for pages near to the old ones */
6709 base = server.vm_next_page;
6710
6711 while(offset < server.vm_pages) {
6712 off_t this = base+offset;
6713
6714 /* If we overflow, restart from page zero */
6715 if (this >= server.vm_pages) {
6716 this -= server.vm_pages;
6717 if (this == 0) {
6718 /* Just overflowed, what we found on tail is no longer
6719 * interesting, as it's no longer contiguous. */
6720 numfree = 0;
6721 }
6722 }
6723 if (vmFreePage(this)) {
6724 /* This is a free page */
6725 numfree++;
6726 /* Already got N free pages? Return to the caller, with success */
6727 if (numfree == n) {
6728 *first = this;
6729 return REDIS_OK;
6730 }
6731 } else {
6732 /* The current one is not a free page */
6733 numfree = 0;
6734 }
6735
6736 /* Fast-forward if the current page is not free and we already
6737 * searched enough near this place. */
6738 since_jump++;
6739 if (!numfree && since_jump >= REDIS_VM_MAX_RANDOM_JUMP/4) {
6740 offset += random() % REDIS_VM_MAX_RANDOM_JUMP;
6741 since_jump = 0;
6742 /* Note that even if we rewind after the jump, we are don't need
6743 * to make sure numfree is set to zero as we only jump *if* it
6744 * is set to zero. */
6745 } else {
6746 /* Otherwise just check the next page */
6747 offset++;
6748 }
6749 }
6750 return REDIS_ERR;
6751 }
6752
6753 /* Swap the 'val' object relative to 'key' into disk. Store all the information
6754 * needed to later retrieve the object into the key object.
6755 * If we can't find enough contiguous empty pages to swap the object on disk
6756 * REDIS_ERR is returned. */
6757 static int vmSwapObject(robj *key, robj *val) {
6758 off_t pages = rdbSavedObjectPages(val);
6759 off_t page;
6760
6761 assert(key->storage == REDIS_VM_MEMORY);
6762 if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return REDIS_ERR;
6763 if (fseeko(server.vm_fp,page*server.vm_page_size,SEEK_SET) == -1) {
6764 redisLog(REDIS_WARNING,
6765 "Critical VM problem in vmSwapObject(): can't seek: %s",
6766 strerror(errno));
6767 return REDIS_ERR;
6768 }
6769 rdbSaveObject(server.vm_fp,val);
6770 key->vm.page = page;
6771 key->vm.usedpages = pages;
6772 key->storage = REDIS_VM_SWAPPED;
6773 decrRefCount(val); /* Deallocate the object from memory. */
6774 vmMarkPagesUsed(page,pages);
6775 return REDIS_OK;
6776 }
6777
6778 /* Load the value object relative to the 'key' object from swap to memory.
6779 * The newly allocated object is returned. */
6780 static robj *vmLoadObject(robj *key) {
6781 robj *val;
6782
6783 assert(key->storage == REDIS_VM_SWAPPED);
6784 if (fseeko(server.vm_fp,key->vm.page*server.vm_page_size,SEEK_SET) == -1) {
6785 redisLog(REDIS_WARNING,
6786 "Unrecoverable VM problem in vmLoadObject(): can't seek: %s",
6787 strerror(errno));
6788 exit(1);
6789 }
6790 val = rdbLoadObject(key->type,server.vm_fp);
6791 if (val == NULL) {
6792 redisLog(REDIS_WARNING, "Unrecoverable VM problem in vmLoadObject(): can't load object from swap file: %s", strerror(errno));
6793 exit(1);
6794 }
6795 key->storage = REDIS_VM_MEMORY;
6796 key->vm.atime = server.unixtime;
6797 vmMarkPagesFree(key->vm.page,key->vm.usedpages);
6798 return val;
6799 }
6800
6801 /* ================================= Debugging ============================== */
6802
6803 static void debugCommand(redisClient *c) {
6804 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6805 *((char*)-1) = 'x';
6806 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6807 if (rdbSave(server.dbfilename) != REDIS_OK) {
6808 addReply(c,shared.err);
6809 return;
6810 }
6811 emptyDb();
6812 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6813 addReply(c,shared.err);
6814 return;
6815 }
6816 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6817 addReply(c,shared.ok);
6818 } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
6819 emptyDb();
6820 if (loadAppendOnlyFile(server.appendfilename) != REDIS_OK) {
6821 addReply(c,shared.err);
6822 return;
6823 }
6824 redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF");
6825 addReply(c,shared.ok);
6826 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6827 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6828 robj *key, *val;
6829
6830 if (!de) {
6831 addReply(c,shared.nokeyerr);
6832 return;
6833 }
6834 key = dictGetEntryKey(de);
6835 val = dictGetEntryVal(de);
6836 addReplySds(c,sdscatprintf(sdsempty(),
6837 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d serializedlength:%lld\r\n",
6838 (void*)key, key->refcount, (void*)val, val->refcount,
6839 val->encoding, rdbSavedObjectLen(val)));
6840 } else {
6841 addReplySds(c,sdsnew(
6842 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6843 }
6844 }
6845
6846 static void _redisAssert(char *estr) {
6847 redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
6848 redisLog(REDIS_WARNING,"==> %s\n",estr);
6849 #ifdef HAVE_BACKTRACE
6850 redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
6851 *((char*)-1) = 'x';
6852 #endif
6853 }
6854
6855 /* =================================== Main! ================================ */
6856
6857 #ifdef __linux__
/* Return the integer found in the first line of
 * /proc/sys/vm/overcommit_memory, or -1 if the file can't be opened or
 * read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp == NULL) return -1;
    if (fgets(line,sizeof(line),fp) != NULL)
        value = atoi(line);
    fclose(fp);
    return value;
}
6871
6872 void linuxOvercommitMemoryWarning(void) {
6873 if (linuxOvercommitMemoryValue() == 0) {
6874 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6875 }
6876 }
6877 #endif /* __linux__ */
6878
6879 static void daemonize(void) {
6880 int fd;
6881 FILE *fp;
6882
6883 if (fork() != 0) exit(0); /* parent exits */
6884 printf("New pid: %d\n", getpid());
6885 setsid(); /* create a new session */
6886
6887 /* Every output goes to /dev/null. If Redis is daemonized but
6888 * the 'logfile' is set to 'stdout' in the configuration file
6889 * it will not log at all. */
6890 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6891 dup2(fd, STDIN_FILENO);
6892 dup2(fd, STDOUT_FILENO);
6893 dup2(fd, STDERR_FILENO);
6894 if (fd > STDERR_FILENO) close(fd);
6895 }
6896 /* Try to write the pid file */
6897 fp = fopen(server.pidfile,"w");
6898 if (fp) {
6899 fprintf(fp,"%d\n",getpid());
6900 fclose(fp);
6901 }
6902 }
6903
6904 int main(int argc, char **argv) {
6905 initServerConfig();
6906 if (argc == 2) {
6907 resetServerSaveParams();
6908 loadServerConfig(argv[1]);
6909 } else if (argc > 2) {
6910 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6911 exit(1);
6912 } else {
6913 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6914 }
6915 if (server.daemonize) daemonize();
6916 initServer();
6917 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6918 #ifdef __linux__
6919 linuxOvercommitMemoryWarning();
6920 #endif
6921 if (server.appendonly) {
6922 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6923 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6924 } else {
6925 if (rdbLoad(server.dbfilename) == REDIS_OK)
6926 redisLog(REDIS_NOTICE,"DB loaded from disk");
6927 }
6928 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
6929 acceptHandler, NULL) == AE_ERR) oom("creating file event");
6930 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6931 aeMain(server.el);
6932 aeDeleteEventLoop(server.el);
6933 return 0;
6934 }
6935
6936 /* ============================= Backtrace support ========================= */
6937
6938 #ifdef HAVE_BACKTRACE
6939 static char *findFuncName(void *pointer, unsigned long *offset);
6940
6941 static void *getMcontextEip(ucontext_t *uc) {
6942 #if defined(__FreeBSD__)
6943 return (void*) uc->uc_mcontext.mc_eip;
6944 #elif defined(__dietlibc__)
6945 return (void*) uc->uc_mcontext.eip;
6946 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6947 #if __x86_64__
6948 return (void*) uc->uc_mcontext->__ss.__rip;
6949 #else
6950 return (void*) uc->uc_mcontext->__ss.__eip;
6951 #endif
6952 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6953 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6954 return (void*) uc->uc_mcontext->__ss.__rip;
6955 #else
6956 return (void*) uc->uc_mcontext->__ss.__eip;
6957 #endif
6958 #elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
6959 return (void*) uc->uc_mcontext.gregs[REG_EIP]; /* Linux 32/64 bit */
6960 #elif defined(__ia64__) /* Linux IA64 */
6961 return (void*) uc->uc_mcontext.sc_ip;
6962 #else
6963 return NULL;
6964 #endif
6965 }
6966
6967 static void segvHandler(int sig, siginfo_t *info, void *secret) {
6968 void *trace[100];
6969 char **messages = NULL;
6970 int i, trace_size = 0;
6971 unsigned long offset=0;
6972 ucontext_t *uc = (ucontext_t*) secret;
6973 sds infostring;
6974 REDIS_NOTUSED(info);
6975
6976 redisLog(REDIS_WARNING,
6977 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
6978 infostring = genRedisInfoString();
6979 redisLog(REDIS_WARNING, "%s",infostring);
6980 /* It's not safe to sdsfree() the returned string under memory
6981 * corruption conditions. Let it leak as we are going to abort */
6982
6983 trace_size = backtrace(trace, 100);
6984 /* overwrite sigaction with caller's address */
6985 if (getMcontextEip(uc) != NULL) {
6986 trace[1] = getMcontextEip(uc);
6987 }
6988 messages = backtrace_symbols(trace, trace_size);
6989
6990 for (i=1; i<trace_size; ++i) {
6991 char *fn = findFuncName(trace[i], &offset), *p;
6992
6993 p = strchr(messages[i],'+');
6994 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6995 redisLog(REDIS_WARNING,"%s", messages[i]);
6996 } else {
6997 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6998 }
6999 }
7000 /* free(messages); Don't call free() with possibly corrupted memory. */
7001 exit(0);
7002 }
7003
7004 static void setupSigSegvAction(void) {
7005 struct sigaction act;
7006
7007 sigemptyset (&act.sa_mask);
7008 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
7009 * is used. Otherwise, sa_handler is used */
7010 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
7011 act.sa_sigaction = segvHandler;
7012 sigaction (SIGSEGV, &act, NULL);
7013 sigaction (SIGBUS, &act, NULL);
7014 sigaction (SIGFPE, &act, NULL);
7015 sigaction (SIGILL, &act, NULL);
7016 sigaction (SIGBUS, &act, NULL);
7017 return;
7018 }
7019
7020 #include "staticsymbols.h"
7021 /* This function try to convert a pointer into a function name. It's used in
7022 * oreder to provide a backtrace under segmentation fault that's able to
7023 * display functions declared as static (otherwise the backtrace is useless). */
7024 static char *findFuncName(void *pointer, unsigned long *offset){
7025 int i, ret = -1;
7026 unsigned long off, minoff = 0;
7027
7028 /* Try to match against the Symbol with the smallest offset */
7029 for (i=0; symsTable[i].pointer; i++) {
7030 unsigned long lp = (unsigned long) pointer;
7031
7032 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
7033 off=lp-symsTable[i].pointer;
7034 if (ret < 0 || off < minoff) {
7035 minoff=off;
7036 ret=i;
7037 }
7038 }
7039 }
7040 if (ret == -1) return NULL;
7041 *offset = minoff;
7042 return symsTable[ret].name;
7043 }
7044 #else /* HAVE_BACKTRACE */
/* Variant compiled when HAVE_BACKTRACE is not defined: no signal handler
 * is installed. */
static void setupSigSegvAction(void) {
    /* Intentionally empty. */
}
7047 #endif /* HAVE_BACKTRACE */
7048
7049
7050
7051 /* The End */
7052
7053
7054