]> git.saurik.com Git - redis.git/blob - redis.c
952f1f731e69bfa6a2930b3ef54c7b3b0bfa2588
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.3.2"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
77 /* Error codes */
78 #define REDIS_OK 0
79 #define REDIS_ERR -1
80
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
94
/* If more than REDIS_WRITEV_THRESHOLD write packets are pending, use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
99
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
102
103 /* Command flags */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short, these commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
111
112 /* Object types */
113 #define REDIS_STRING 0
114 #define REDIS_LIST 1
115 #define REDIS_SET 2
116 #define REDIS_ZSET 3
117 #define REDIS_HASH 4
118
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
122
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
127
/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys would waste a lot of space, so we check the most significant 2 bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte, most DB keys, and many
 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
146
/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
154
155 /* Virtual memory object->where field. */
156 #define REDIS_VM_MEMORY 0 /* The object is on memory */
157 #define REDIS_VM_SWAPPED 1 /* The object is on disk */
158 #define REDIS_VM_SWAPPING 2 /* Redis is swapping this object on disk */
159 #define REDIS_VM_LOADING 3 /* Redis is loading this object from disk */
160
/* Virtual memory static configuration stuff.
 * Check vmFindContiguousPages() to know more about these magic numbers. */
163 #define REDIS_VM_MAX_NEAR_PAGES 65536
164 #define REDIS_VM_MAX_RANDOM_JUMP 4096
165
166 /* Client flags */
167 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
168 #define REDIS_SLAVE 2 /* This client is a slave server */
169 #define REDIS_MASTER 4 /* This client is a master server */
170 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
171 #define REDIS_MULTI 16 /* This client is in a MULTI context */
172 #define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */
173
174 /* Slave replication state - slave side */
175 #define REDIS_REPL_NONE 0 /* No active replication */
176 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
177 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
178
179 /* Slave replication state - from the point of view of master
180 * Note that in SEND_BULK and ONLINE state the slave receives new updates
181 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
182 * to start the next background saving in order to send updates to it. */
183 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
184 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
185 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
186 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
187
188 /* List related stuff */
189 #define REDIS_HEAD 0
190 #define REDIS_TAIL 1
191
192 /* Sort operations */
193 #define REDIS_SORT_GET 0
194 #define REDIS_SORT_ASC 1
195 #define REDIS_SORT_DESC 2
196 #define REDIS_SORTKEY_MAX 1024
197
198 /* Log levels */
199 #define REDIS_DEBUG 0
200 #define REDIS_NOTICE 1
201 #define REDIS_WARNING 2
202
203 /* Anti-warning macro... */
204 #define REDIS_NOTUSED(V) ((void) V)
205
206 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
207 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
208
209 /* Append only defines */
210 #define APPENDFSYNC_NO 0
211 #define APPENDFSYNC_ALWAYS 1
212 #define APPENDFSYNC_EVERYSEC 2
213
214 /* We can print the stacktrace, so our assert is defined this way: */
215 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
216 static void _redisAssert(char *estr);
217
218 /*================================= Data types ============================== */
219
220 /* A redis object, that is a type able to hold a string / list / set */
221
/* The VM object structure.
 *
 * Virtual-memory bookkeeping embedded as the last member ('vm') of every
 * redisObject. Only meaningful when VM is active; the object allocator may
 * omit it entirely when VM is disabled.
 *
 * Declared as a plain type only: each robj carries its own copy, so no
 * file-scope instance of this struct is needed. */
struct redisObjectVM {
    off_t page;         /* the page at which the object is stored on disk */
    off_t usedpages;    /* number of pages used on disk */
    time_t atime;       /* Last access time */
};
228
229 /* The actual Redis Object */
230 typedef struct redisObject {
231 void *ptr;
232 unsigned char type;
233 unsigned char encoding;
234 unsigned char storage; /* where? REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
235 unsigned char notused;
236 int refcount;
    /* VM fields: these are only allocated if VM is active, otherwise the
     * object allocation function will just allocate
     * sizeof(redisObject) minus sizeof(redisObjectVM), so using
     * Redis without VM active will not have any overhead. */
241 struct redisObjectVM vm;
242 } robj;
243
/* Macro used to initialize a Redis object allocated on the stack.
 * Note that this macro is kept near the structure definition to make sure
 * we'll update it when the structure is changed, to avoid bugs like
 * bug #85 introduced exactly in this way.
 *
 * Sets refcount to 1, type to REDIS_STRING, encoding to REDIS_ENCODING_RAW
 * and ptr to _ptr. The storage field is written only when VM is enabled.
 *
 * Usage: initStaticStringObject(obj, ptr);
 * The caller supplies the terminating semicolon, so the expansion acts as a
 * single statement and is safe inside an unbraced if/else. */
#define initStaticStringObject(_var,_ptr) do { \
    _var.refcount = 1; \
    _var.type = REDIS_STRING; \
    _var.encoding = REDIS_ENCODING_RAW; \
    _var.ptr = (_ptr); \
    if (server.vm_enabled) _var.storage = REDIS_VM_MEMORY; \
} while(0)
255
256 typedef struct redisDb {
257 dict *dict; /* The keyspace for this DB */
258 dict *expires; /* Timeout of keys with a timeout set */
259 dict *blockingkeys; /* Keys with clients waiting for data (BLPOP) */
260 int id;
261 } redisDb;
262
263 /* Client MULTI/EXEC state */
264 typedef struct multiCmd {
265 robj **argv;
266 int argc;
267 struct redisCommand *cmd;
268 } multiCmd;
269
270 typedef struct multiState {
271 multiCmd *commands; /* Array of MULTI commands */
272 int count; /* Total number of MULTI commands */
273 } multiState;
274
/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list. */
277 typedef struct redisClient {
278 int fd;
279 redisDb *db;
280 int dictid;
281 sds querybuf;
282 robj **argv, **mbargv;
283 int argc, mbargc;
284 int bulklen; /* bulk read len. -1 if not in bulk read mode */
285 int multibulk; /* multi bulk command format active */
286 list *reply;
287 int sentlen;
288 time_t lastinteraction; /* time of the last interaction, used for timeout */
289 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
290 /* REDIS_MULTI */
291 int slaveseldb; /* slave selected db, if this client is a slave */
292 int authenticated; /* when requirepass is non-NULL */
293 int replstate; /* replication state if this is a slave */
294 int repldbfd; /* replication DB file descriptor */
295 long repldboff; /* replication DB file offset */
296 off_t repldbsize; /* replication DB file size */
297 multiState mstate; /* MULTI/EXEC state */
298 robj **blockingkeys; /* The key we waiting to terminate a blocking
299 * operation such as BLPOP. Otherwise NULL. */
300 int blockingkeysnum; /* Number of blocking keys */
301 time_t blockingto; /* Blocking operation timeout. If UNIX current time
302 * is >= blockingto then the operation timed out. */
303 } redisClient;
304
305 struct saveparam {
306 time_t seconds;
307 int changes;
308 };
309
310 /* Global server state structure */
311 struct redisServer {
312 int port;
313 int fd;
314 redisDb *db;
315 dict *sharingpool; /* Poll used for object sharing */
316 unsigned int sharingpoolsize;
317 long long dirty; /* changes to DB from the last save */
318 list *clients;
319 list *slaves, *monitors;
320 char neterr[ANET_ERR_LEN];
321 aeEventLoop *el;
322 int cronloops; /* number of times the cron function run */
323 list *objfreelist; /* A list of freed objects to avoid malloc() */
324 time_t lastsave; /* Unix time of last save succeeede */
325 size_t usedmemory; /* Used memory in megabytes */
326 /* Fields used only for stats */
327 time_t stat_starttime; /* server start time */
328 long long stat_numcommands; /* number of processed commands */
329 long long stat_numconnections; /* number of connections received */
330 /* Configuration */
331 int verbosity;
332 int glueoutputbuf;
333 int maxidletime;
334 int dbnum;
335 int daemonize;
336 int appendonly;
337 int appendfsync;
338 time_t lastfsync;
339 int appendfd;
340 int appendseldb;
341 char *pidfile;
342 pid_t bgsavechildpid;
343 pid_t bgrewritechildpid;
344 sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
345 struct saveparam *saveparams;
346 int saveparamslen;
347 char *logfile;
348 char *bindaddr;
349 char *dbfilename;
350 char *appendfilename;
351 char *requirepass;
352 int shareobjects;
353 int rdbcompression;
354 /* Replication related */
355 int isslave;
356 char *masterauth;
357 char *masterhost;
358 int masterport;
359 redisClient *master; /* client that is master for this slave */
360 int replstate;
361 unsigned int maxclients;
362 unsigned long maxmemory;
363 unsigned int blockedclients;
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to keep this state global, in order to pass it to sortCompare() */
366 int sort_desc;
367 int sort_alpha;
368 int sort_bypattern;
369 /* Virtual memory configuration */
370 int vm_enabled;
371 off_t vm_page_size;
372 off_t vm_pages;
373 long vm_max_memory;
374 /* Virtual memory state */
375 FILE *vm_fp;
376 int vm_fd;
377 off_t vm_next_page; /* Next probably empty page */
378 off_t vm_near_pages; /* Number of pages allocated sequentially */
379 unsigned char *vm_bitmap; /* Bitmap of free/used pages */
380 time_t unixtime; /* Unix time sampled every second. */
381 };
382
383 typedef void redisCommandProc(redisClient *c);
384 struct redisCommand {
385 char *name;
386 redisCommandProc *proc;
387 int arity;
388 int flags;
389 };
390
391 struct redisFunctionSym {
392 char *name;
393 unsigned long pointer;
394 };
395
396 typedef struct _redisSortObject {
397 robj *obj;
398 union {
399 double score;
400 robj *cmpobj;
401 } u;
402 } redisSortObject;
403
404 typedef struct _redisSortOperation {
405 int type;
406 robj *pattern;
407 } redisSortOperation;
408
409 /* ZSETs use a specialized version of Skiplists */
410
411 typedef struct zskiplistNode {
412 struct zskiplistNode **forward;
413 struct zskiplistNode *backward;
414 double score;
415 robj *obj;
416 } zskiplistNode;
417
418 typedef struct zskiplist {
419 struct zskiplistNode *header, *tail;
420 unsigned long length;
421 int level;
422 } zskiplist;
423
424 typedef struct zset {
425 dict *dict;
426 zskiplist *zsl;
427 } zset;
428
429 /* Our shared "common" objects */
430
431 struct sharedObjectsStruct {
432 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
433 *colon, *nullbulk, *nullmultibulk, *queued,
434 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
435 *outofrangeerr, *plus,
436 *select0, *select1, *select2, *select3, *select4,
437 *select5, *select6, *select7, *select8, *select9;
438 } shared;
439
/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */
443
444 static double R_Zero, R_PosInf, R_NegInf, R_Nan;
445
446 /*================================ Prototypes =============================== */
447
448 static void freeStringObject(robj *o);
449 static void freeListObject(robj *o);
450 static void freeSetObject(robj *o);
451 static void decrRefCount(void *o);
452 static robj *createObject(int type, void *ptr);
453 static void freeClient(redisClient *c);
454 static int rdbLoad(char *filename);
455 static void addReply(redisClient *c, robj *obj);
456 static void addReplySds(redisClient *c, sds s);
457 static void incrRefCount(robj *o);
458 static int rdbSaveBackground(char *filename);
459 static robj *createStringObject(char *ptr, size_t len);
460 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
461 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
462 static int syncWithMaster(void);
463 static robj *tryObjectSharing(robj *o);
464 static int tryObjectEncoding(robj *o);
465 static robj *getDecodedObject(robj *o);
466 static int removeExpire(redisDb *db, robj *key);
467 static int expireIfNeeded(redisDb *db, robj *key);
468 static int deleteIfVolatile(redisDb *db, robj *key);
469 static int deleteKey(redisDb *db, robj *key);
470 static time_t getExpire(redisDb *db, robj *key);
471 static int setExpire(redisDb *db, robj *key, time_t when);
472 static void updateSlavesWaitingBgsave(int bgsaveerr);
473 static void freeMemoryIfNeeded(void);
474 static int processCommand(redisClient *c);
475 static void setupSigSegvAction(void);
476 static void rdbRemoveTempFile(pid_t childpid);
477 static void aofRemoveTempFile(pid_t childpid);
478 static size_t stringObjectLen(robj *o);
479 static void processInputBuffer(redisClient *c);
480 static zskiplist *zslCreate(void);
481 static void zslFree(zskiplist *zsl);
482 static void zslInsert(zskiplist *zsl, double score, robj *obj);
483 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
484 static void initClientMultiState(redisClient *c);
485 static void freeClientMultiState(redisClient *c);
486 static void queueMultiCommand(redisClient *c, struct redisCommand *cmd);
487 static void unblockClient(redisClient *c);
488 static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele);
489 static void vmInit(void);
490 static void vmMarkPagesFree(off_t page, off_t count);
491 static robj *vmLoadObject(robj *key);
492
493 static void authCommand(redisClient *c);
494 static void pingCommand(redisClient *c);
495 static void echoCommand(redisClient *c);
496 static void setCommand(redisClient *c);
497 static void setnxCommand(redisClient *c);
498 static void getCommand(redisClient *c);
499 static void delCommand(redisClient *c);
500 static void existsCommand(redisClient *c);
501 static void incrCommand(redisClient *c);
502 static void decrCommand(redisClient *c);
503 static void incrbyCommand(redisClient *c);
504 static void decrbyCommand(redisClient *c);
505 static void selectCommand(redisClient *c);
506 static void randomkeyCommand(redisClient *c);
507 static void keysCommand(redisClient *c);
508 static void dbsizeCommand(redisClient *c);
509 static void lastsaveCommand(redisClient *c);
510 static void saveCommand(redisClient *c);
511 static void bgsaveCommand(redisClient *c);
512 static void bgrewriteaofCommand(redisClient *c);
513 static void shutdownCommand(redisClient *c);
514 static void moveCommand(redisClient *c);
515 static void renameCommand(redisClient *c);
516 static void renamenxCommand(redisClient *c);
517 static void lpushCommand(redisClient *c);
518 static void rpushCommand(redisClient *c);
519 static void lpopCommand(redisClient *c);
520 static void rpopCommand(redisClient *c);
521 static void llenCommand(redisClient *c);
522 static void lindexCommand(redisClient *c);
523 static void lrangeCommand(redisClient *c);
524 static void ltrimCommand(redisClient *c);
525 static void typeCommand(redisClient *c);
526 static void lsetCommand(redisClient *c);
527 static void saddCommand(redisClient *c);
528 static void sremCommand(redisClient *c);
529 static void smoveCommand(redisClient *c);
530 static void sismemberCommand(redisClient *c);
531 static void scardCommand(redisClient *c);
532 static void spopCommand(redisClient *c);
533 static void srandmemberCommand(redisClient *c);
534 static void sinterCommand(redisClient *c);
535 static void sinterstoreCommand(redisClient *c);
536 static void sunionCommand(redisClient *c);
537 static void sunionstoreCommand(redisClient *c);
538 static void sdiffCommand(redisClient *c);
539 static void sdiffstoreCommand(redisClient *c);
540 static void syncCommand(redisClient *c);
541 static void flushdbCommand(redisClient *c);
542 static void flushallCommand(redisClient *c);
543 static void sortCommand(redisClient *c);
544 static void lremCommand(redisClient *c);
545 static void rpoplpushcommand(redisClient *c);
546 static void infoCommand(redisClient *c);
547 static void mgetCommand(redisClient *c);
548 static void monitorCommand(redisClient *c);
549 static void expireCommand(redisClient *c);
550 static void expireatCommand(redisClient *c);
551 static void getsetCommand(redisClient *c);
552 static void ttlCommand(redisClient *c);
553 static void slaveofCommand(redisClient *c);
554 static void debugCommand(redisClient *c);
555 static void msetCommand(redisClient *c);
556 static void msetnxCommand(redisClient *c);
557 static void zaddCommand(redisClient *c);
558 static void zincrbyCommand(redisClient *c);
559 static void zrangeCommand(redisClient *c);
560 static void zrangebyscoreCommand(redisClient *c);
561 static void zrevrangeCommand(redisClient *c);
562 static void zcardCommand(redisClient *c);
563 static void zremCommand(redisClient *c);
564 static void zscoreCommand(redisClient *c);
565 static void zremrangebyscoreCommand(redisClient *c);
566 static void multiCommand(redisClient *c);
567 static void execCommand(redisClient *c);
568 static void blpopCommand(redisClient *c);
569 static void brpopCommand(redisClient *c);
570
571 /*================================= Globals ================================= */
572
573 /* Global vars */
574 static struct redisServer server; /* server global state */
575 static struct redisCommand cmdTable[] = {
576 {"get",getCommand,2,REDIS_CMD_INLINE},
577 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
578 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
579 {"del",delCommand,-2,REDIS_CMD_INLINE},
580 {"exists",existsCommand,2,REDIS_CMD_INLINE},
581 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
582 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
583 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
584 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
585 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
586 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
587 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
588 {"brpop",brpopCommand,-3,REDIS_CMD_INLINE},
589 {"blpop",blpopCommand,-3,REDIS_CMD_INLINE},
590 {"llen",llenCommand,2,REDIS_CMD_INLINE},
591 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
592 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
593 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
594 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
595 {"lrem",lremCommand,4,REDIS_CMD_BULK},
596 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
597 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
598 {"srem",sremCommand,3,REDIS_CMD_BULK},
599 {"smove",smoveCommand,4,REDIS_CMD_BULK},
600 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
601 {"scard",scardCommand,2,REDIS_CMD_INLINE},
602 {"spop",spopCommand,2,REDIS_CMD_INLINE},
603 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
604 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
605 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
606 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
607 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
608 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
609 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
610 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
611 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
612 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
613 {"zrem",zremCommand,3,REDIS_CMD_BULK},
614 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
615 {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE},
616 {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
617 {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE},
618 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
619 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
620 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
621 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
622 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
623 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
624 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
625 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
626 {"select",selectCommand,2,REDIS_CMD_INLINE},
627 {"move",moveCommand,3,REDIS_CMD_INLINE},
628 {"rename",renameCommand,3,REDIS_CMD_INLINE},
629 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
630 {"expire",expireCommand,3,REDIS_CMD_INLINE},
631 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
632 {"keys",keysCommand,2,REDIS_CMD_INLINE},
633 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
634 {"auth",authCommand,2,REDIS_CMD_INLINE},
635 {"ping",pingCommand,1,REDIS_CMD_INLINE},
636 {"echo",echoCommand,2,REDIS_CMD_BULK},
637 {"save",saveCommand,1,REDIS_CMD_INLINE},
638 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
639 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
640 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
641 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
642 {"type",typeCommand,2,REDIS_CMD_INLINE},
643 {"multi",multiCommand,1,REDIS_CMD_INLINE},
644 {"exec",execCommand,1,REDIS_CMD_INLINE},
645 {"sync",syncCommand,1,REDIS_CMD_INLINE},
646 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
647 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
648 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
649 {"info",infoCommand,1,REDIS_CMD_INLINE},
650 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
651 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
652 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
653 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
654 {NULL,NULL,0,0}
655 };
656
657 /*============================ Utility functions ============================ */
658
/* Glob-style pattern matching.
 *
 * Returns 1 if the first stringLen bytes of 'string' match the first
 * patternLen bytes of 'pattern', 0 otherwise. Only the given lengths are
 * read, so neither buffer has to be NUL-terminated. Supported syntax:
 *   *       any sequence of characters (including none)
 *   ?       exactly one character
 *   [abc]   one character from the set; [^abc] negates, [a-z] is a range
 *   \x      the literal character x
 * When 'nocase' is non-zero, literals and ranges are compared
 * case-insensitively (escaped characters inside [...] are not). */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* Collapse runs of consecutive '*' into a single one. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* trailing '*' matches whatever is left */
            /* Try to match the rest of the pattern at every suffix. */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A class consumes exactly one character. Without this check an
             * empty string is read past its end and stringLen goes negative,
             * which makes a following '*' scan memory indefinitely. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back one so the common
                     * pattern++ after the switch lands exactly at the end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match: nothing left to compare against */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* String exhausted: only trailing '*'s may remain. Bounded by
             * patternLen so a non-terminated pattern is never over-read. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
781
782 static void redisLog(int level, const char *fmt, ...) {
783 va_list ap;
784 FILE *fp;
785
786 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
787 if (!fp) return;
788
789 va_start(ap, fmt);
790 if (level >= server.verbosity) {
791 char *c = ".-*";
792 char buf[64];
793 time_t now;
794
795 now = time(NULL);
796 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
797 fprintf(fp,"%s %c ",buf,c[level]);
798 vfprintf(fp, fmt, ap);
799 fprintf(fp,"\n");
800 fflush(fp);
801 }
802 va_end(ap);
803
804 if (server.logfile) fclose(fp);
805 }
806
807 /*====================== Hash table type implementation ==================== */
808
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and redis objects as values (objects can hold SDS strings,
 * lists, sets). */
812
/* dict destructor callback: release a value that was allocated with the
 * zmalloc family. The dict's private data pointer is ignored. */
static void dictVanillaFree(void *privdata, void *val)
{
    void *block = val;

    DICT_NOTUSED(privdata);
    zfree(block);
}
818
819 static void dictListDestructor(void *privdata, void *val)
820 {
821 DICT_NOTUSED(privdata);
822 listRelease((list*)val);
823 }
824
825 static int sdsDictKeyCompare(void *privdata, const void *key1,
826 const void *key2)
827 {
828 int l1,l2;
829 DICT_NOTUSED(privdata);
830
831 l1 = sdslen((sds)key1);
832 l2 = sdslen((sds)key2);
833 if (l1 != l2) return 0;
834 return memcmp(key1, key2, l1) == 0;
835 }
836
837 static void dictRedisObjectDestructor(void *privdata, void *val)
838 {
839 DICT_NOTUSED(privdata);
840
841 if (val == NULL) return; /* Values of swapped out keys as set to NULL */
842 decrRefCount(val);
843 }
844
845 static int dictObjKeyCompare(void *privdata, const void *key1,
846 const void *key2)
847 {
848 const robj *o1 = key1, *o2 = key2;
849 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
850 }
851
852 static unsigned int dictObjHash(const void *key) {
853 const robj *o = key;
854 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
855 }
856
857 static int dictEncObjKeyCompare(void *privdata, const void *key1,
858 const void *key2)
859 {
860 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
861 int cmp;
862
863 o1 = getDecodedObject(o1);
864 o2 = getDecodedObject(o2);
865 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
866 decrRefCount(o1);
867 decrRefCount(o2);
868 return cmp;
869 }
870
871 static unsigned int dictEncObjHash(const void *key) {
872 robj *o = (robj*) key;
873
874 o = getDecodedObject(o);
875 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
876 decrRefCount(o);
877 return hash;
878 }
879
880 static dictType setDictType = {
881 dictEncObjHash, /* hash function */
882 NULL, /* key dup */
883 NULL, /* val dup */
884 dictEncObjKeyCompare, /* key compare */
885 dictRedisObjectDestructor, /* key destructor */
886 NULL /* val destructor */
887 };
888
889 static dictType zsetDictType = {
890 dictEncObjHash, /* hash function */
891 NULL, /* key dup */
892 NULL, /* val dup */
893 dictEncObjKeyCompare, /* key compare */
894 dictRedisObjectDestructor, /* key destructor */
895 dictVanillaFree /* val destructor of malloc(sizeof(double)) */
896 };
897
898 static dictType hashDictType = {
899 dictObjHash, /* hash function */
900 NULL, /* key dup */
901 NULL, /* val dup */
902 dictObjKeyCompare, /* key compare */
903 dictRedisObjectDestructor, /* key destructor */
904 dictRedisObjectDestructor /* val destructor */
905 };
906
907 /* Keylist hash table type has unencoded redis objects as keys and
908 * lists as values. It's used for blocking operations (BLPOP) */
909 static dictType keylistDictType = {
910 dictObjHash, /* hash function */
911 NULL, /* key dup */
912 NULL, /* val dup */
913 dictObjKeyCompare, /* key compare */
914 dictRedisObjectDestructor, /* key destructor */
915 dictListDestructor /* val destructor */
916 };
917
918 /* ========================= Random utility functions ======================= */
919
920 /* Redis generally does not try to recover from out of memory conditions
921 * when allocating objects or strings, it is not clear if it will be possible
922 * to report this condition to the client since the networking layer itself
923 * is based on heap allocation for send buffers, so we simply abort.
924 * At least the code will be simpler to read... */
925 static void oom(const char *msg) {
926 redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
927 sleep(1);
928 abort();
929 }
930
931 /* ====================== Redis server networking stuff ===================== */
932 static void closeTimedoutClients(void) {
933 redisClient *c;
934 listNode *ln;
935 time_t now = time(NULL);
936
937 listRewind(server.clients);
938 while ((ln = listYield(server.clients)) != NULL) {
939 c = listNodeValue(ln);
940 if (server.maxidletime &&
941 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
942 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
943 (now - c->lastinteraction > server.maxidletime))
944 {
945 redisLog(REDIS_DEBUG,"Closing idle client");
946 freeClient(c);
947 } else if (c->flags & REDIS_BLOCKED) {
948 if (c->blockingto != 0 && c->blockingto < now) {
949 addReply(c,shared.nullmultibulk);
950 unblockClient(c);
951 }
952 }
953 }
954 }
955
956 static int htNeedsResize(dict *dict) {
957 long long size, used;
958
959 size = dictSlots(dict);
960 used = dictSize(dict);
961 return (size && used && size > DICT_HT_INITIAL_SIZE &&
962 (used*100/size < REDIS_HT_MINFILL));
963 }
964
965 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
966 * we resize the hash table to save memory */
967 static void tryResizeHashTables(void) {
968 int j;
969
970 for (j = 0; j < server.dbnum; j++) {
971 if (htNeedsResize(server.db[j].dict)) {
972 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
973 dictResize(server.db[j].dict);
974 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
975 }
976 if (htNeedsResize(server.db[j].expires))
977 dictResize(server.db[j].expires);
978 }
979 }
980
981 /* A background saving child (BGSAVE) terminated its work. Handle this. */
982 void backgroundSaveDoneHandler(int statloc) {
983 int exitcode = WEXITSTATUS(statloc);
984 int bysignal = WIFSIGNALED(statloc);
985
986 if (!bysignal && exitcode == 0) {
987 redisLog(REDIS_NOTICE,
988 "Background saving terminated with success");
989 server.dirty = 0;
990 server.lastsave = time(NULL);
991 } else if (!bysignal && exitcode != 0) {
992 redisLog(REDIS_WARNING, "Background saving error");
993 } else {
994 redisLog(REDIS_WARNING,
995 "Background saving terminated by signal");
996 rdbRemoveTempFile(server.bgsavechildpid);
997 }
998 server.bgsavechildpid = -1;
999 /* Possibly there are slaves waiting for a BGSAVE in order to be served
1000 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
1001 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
1002 }
1003
1004 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
1005 * Handle this. */
1006 void backgroundRewriteDoneHandler(int statloc) {
1007 int exitcode = WEXITSTATUS(statloc);
1008 int bysignal = WIFSIGNALED(statloc);
1009
1010 if (!bysignal && exitcode == 0) {
1011 int fd;
1012 char tmpfile[256];
1013
1014 redisLog(REDIS_NOTICE,
1015 "Background append only file rewriting terminated with success");
1016 /* Now it's time to flush the differences accumulated by the parent */
1017 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
1018 fd = open(tmpfile,O_WRONLY|O_APPEND);
1019 if (fd == -1) {
1020 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
1021 goto cleanup;
1022 }
1023 /* Flush our data... */
1024 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
1025 (signed) sdslen(server.bgrewritebuf)) {
1026 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
1027 close(fd);
1028 goto cleanup;
1029 }
1030 redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
1031 /* Now our work is to rename the temp file into the stable file. And
1032 * switch the file descriptor used by the server for append only. */
1033 if (rename(tmpfile,server.appendfilename) == -1) {
1034 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
1035 close(fd);
1036 goto cleanup;
1037 }
1038 /* Mission completed... almost */
1039 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
1040 if (server.appendfd != -1) {
1041 /* If append only is actually enabled... */
1042 close(server.appendfd);
1043 server.appendfd = fd;
1044 fsync(fd);
1045 server.appendseldb = -1; /* Make sure it will issue SELECT */
1046 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
1047 } else {
1048 /* If append only is disabled we just generate a dump in this
1049 * format. Why not? */
1050 close(fd);
1051 }
1052 } else if (!bysignal && exitcode != 0) {
1053 redisLog(REDIS_WARNING, "Background append only file rewriting error");
1054 } else {
1055 redisLog(REDIS_WARNING,
1056 "Background append only file rewriting terminated by signal");
1057 }
1058 cleanup:
1059 sdsfree(server.bgrewritebuf);
1060 server.bgrewritebuf = sdsempty();
1061 aofRemoveTempFile(server.bgrewritechildpid);
1062 server.bgrewritechildpid = -1;
1063 }
1064
1065 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
1066 int j, loops = server.cronloops++;
1067 REDIS_NOTUSED(eventLoop);
1068 REDIS_NOTUSED(id);
1069 REDIS_NOTUSED(clientData);
1070
1071 /* We take a cached value of the unix time in the global state because
1072 * with virtual memory and aging there is to store the current time
1073 * in objects at every object access, and accuracy is not needed.
1074 * To access a global var is faster than calling time(NULL) */
1075 server.unixtime = time(NULL);
1076
1077 /* Update the global state with the amount of used memory */
1078 server.usedmemory = zmalloc_used_memory();
1079
1080 /* Show some info about non-empty databases */
1081 for (j = 0; j < server.dbnum; j++) {
1082 long long size, used, vkeys;
1083
1084 size = dictSlots(server.db[j].dict);
1085 used = dictSize(server.db[j].dict);
1086 vkeys = dictSize(server.db[j].expires);
1087 if (!(loops % 5) && (used || vkeys)) {
1088 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
1089 /* dictPrintStats(server.dict); */
1090 }
1091 }
1092
1093 /* We don't want to resize the hash tables while a bacground saving
1094 * is in progress: the saving child is created using fork() that is
1095 * implemented with a copy-on-write semantic in most modern systems, so
1096 * if we resize the HT while there is the saving child at work actually
1097 * a lot of memory movements in the parent will cause a lot of pages
1098 * copied. */
1099 if (server.bgsavechildpid == -1) tryResizeHashTables();
1100
1101 /* Show information about connected clients */
1102 if (!(loops % 5)) {
1103 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1104 listLength(server.clients)-listLength(server.slaves),
1105 listLength(server.slaves),
1106 server.usedmemory,
1107 dictSize(server.sharingpool));
1108 }
1109
1110 /* Close connections of timedout clients */
1111 if ((server.maxidletime && !(loops % 10)) || server.blockedclients)
1112 closeTimedoutClients();
1113
1114 /* Check if a background saving or AOF rewrite in progress terminated */
1115 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
1116 int statloc;
1117 pid_t pid;
1118
1119 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1120 if (pid == server.bgsavechildpid) {
1121 backgroundSaveDoneHandler(statloc);
1122 } else {
1123 backgroundRewriteDoneHandler(statloc);
1124 }
1125 }
1126 } else {
1127 /* If there is not a background saving in progress check if
1128 * we have to save now */
1129 time_t now = time(NULL);
1130 for (j = 0; j < server.saveparamslen; j++) {
1131 struct saveparam *sp = server.saveparams+j;
1132
1133 if (server.dirty >= sp->changes &&
1134 now-server.lastsave > sp->seconds) {
1135 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1136 sp->changes, sp->seconds);
1137 rdbSaveBackground(server.dbfilename);
1138 break;
1139 }
1140 }
1141 }
1142
1143 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1144 * will use few CPU cycles if there are few expiring keys, otherwise
1145 * it will get more aggressive to avoid that too much memory is used by
1146 * keys that can be removed from the keyspace. */
1147 for (j = 0; j < server.dbnum; j++) {
1148 int expired;
1149 redisDb *db = server.db+j;
1150
1151 /* Continue to expire if at the end of the cycle more than 25%
1152 * of the keys were expired. */
1153 do {
1154 int num = dictSize(db->expires);
1155 time_t now = time(NULL);
1156
1157 expired = 0;
1158 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1159 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1160 while (num--) {
1161 dictEntry *de;
1162 time_t t;
1163
1164 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1165 t = (time_t) dictGetEntryVal(de);
1166 if (now > t) {
1167 deleteKey(db,dictGetEntryKey(de));
1168 expired++;
1169 }
1170 }
1171 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
1172 }
1173
1174 /* Check if we should connect to a MASTER */
1175 if (server.replstate == REDIS_REPL_CONNECT) {
1176 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1177 if (syncWithMaster() == REDIS_OK) {
1178 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1179 }
1180 }
1181 return 1000;
1182 }
1183
/* Build the global 'shared' set of preallocated string objects:
 * - common protocol fragments and replies (CRLF, +OK, -ERR, empty/null
 *   bulk and multi-bulk replies, :0/:1, +PONG, +QUEUED);
 * - standard error messages;
 * - "select N\r\n" commands for databases 0..9.
 * Callers presumably reuse these instead of allocating a new object per
 * reply (e.g. addReply(c,shared.nullmultibulk) in closeTimedoutClients). */
static void createSharedObjects(void) {
    /* Protocol fragments and fixed replies */
    shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
    shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
    shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
    shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
    shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
    shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
    shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
    shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
    shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
    shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
    shared.queued = createObject(REDIS_STRING,sdsnew("+QUEUED\r\n"));
    /* Error replies */
    shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
        "-ERR Operation against a key holding the wrong kind of value\r\n"));
    shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
        "-ERR no such key\r\n"));
    shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
        "-ERR syntax error\r\n"));
    shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
        "-ERR source and destination objects are the same\r\n"));
    shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
        "-ERR index out of range\r\n"));
    /* Single-character separators */
    shared.space = createObject(REDIS_STRING,sdsnew(" "));
    shared.colon = createObject(REDIS_STRING,sdsnew(":"));
    shared.plus = createObject(REDIS_STRING,sdsnew("+"));
    /* Preformatted SELECT commands for DBs 0..9 (10 bytes each) */
    shared.select0 = createStringObject("select 0\r\n",10);
    shared.select1 = createStringObject("select 1\r\n",10);
    shared.select2 = createStringObject("select 2\r\n",10);
    shared.select3 = createStringObject("select 3\r\n",10);
    shared.select4 = createStringObject("select 4\r\n",10);
    shared.select5 = createStringObject("select 5\r\n",10);
    shared.select6 = createStringObject("select 6\r\n",10);
    shared.select7 = createStringObject("select 7\r\n",10);
    shared.select8 = createStringObject("select 8\r\n",10);
    shared.select9 = createStringObject("select 9\r\n",10);
}
1220
1221 static void appendServerSaveParams(time_t seconds, int changes) {
1222 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1223 server.saveparams[server.saveparamslen].seconds = seconds;
1224 server.saveparams[server.saveparamslen].changes = changes;
1225 server.saveparamslen++;
1226 }
1227
1228 static void resetServerSaveParams() {
1229 zfree(server.saveparams);
1230 server.saveparams = NULL;
1231 server.saveparamslen = 0;
1232 }
1233
/* Fill the global 'server' structure with default configuration values:
 * networking, logging, persistence (RDB and append only file), object
 * sharing, limits, virtual memory, default save rules, replication, and
 * the floating point special constants. */
static void initServerConfig() {
    server.dbnum = REDIS_DEFAULT_DBNUM;
    server.port = REDIS_SERVERPORT;
    server.verbosity = REDIS_DEBUG;
    server.maxidletime = REDIS_MAXIDLETIME;
    server.saveparams = NULL;
    server.logfile = NULL; /* NULL = log on standard output */
    server.bindaddr = NULL;
    server.glueoutputbuf = 1;
    server.daemonize = 0;
    server.appendonly = 0;
    server.appendfsync = APPENDFSYNC_ALWAYS;
    server.lastfsync = time(NULL);
    server.appendfd = -1;
    server.appendseldb = -1; /* Make sure the first time will not match */
    server.pidfile = "/var/run/redis.pid";
    server.dbfilename = "dump.rdb";
    server.appendfilename = "appendonly.aof";
    server.requirepass = NULL;
    server.shareobjects = 0;
    server.rdbcompression = 1;
    server.sharingpoolsize = 1024;
    server.maxclients = 0;
    server.blockedclients = 0;
    server.maxmemory = 0;
    server.vm_enabled = 0;
    server.vm_page_size = 256; /* 256 bytes per page */
    server.vm_pages = 1024*1024*100; /* ~104.8 millions of pages */
    server.vm_max_memory = 1024LL*1024*1024*1; /* 1 GB of RAM */

    /* Start from an empty save rule list, then install the defaults. */
    resetServerSaveParams();

    appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
    appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
    appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
    /* Replication related */
    server.isslave = 0;
    server.masterauth = NULL;
    server.masterhost = NULL;
    server.masterport = 6379;
    server.master = NULL;
    server.replstate = REDIS_REPL_NONE;

    /* Double constants initialization. The divisions go through the
     * R_Zero variable at runtime (presumably to avoid compile-time division
     * by zero diagnostics) to obtain +inf, -inf and NaN. */
    R_Zero = 0.0;
    R_PosInf = 1.0/R_Zero;
    R_NegInf = -1.0/R_Zero;
    R_Nan = R_Zero/R_Zero;
}
1283
/* Create the runtime server state from the configuration:
 * - signal handling;
 * - client/slave/monitor lists and shared objects;
 * - the event loop and the listening TCP socket (exits on failure);
 * - per-DB dictionaries and statistics;
 * - the serverCron timer;
 * - the append only file (exits if it can't be opened when enabled);
 * - virtual memory, when enabled. */
static void initServer() {
    int j;

    signal(SIGHUP, SIG_IGN);
    /* Ignore SIGPIPE so writes to closed sockets fail with an error
     * instead of killing the process. */
    signal(SIGPIPE, SIG_IGN);
    setupSigSegvAction();

    server.clients = listCreate();
    server.slaves = listCreate();
    server.monitors = listCreate();
    server.objfreelist = listCreate();
    createSharedObjects();
    server.el = aeCreateEventLoop();
    server.db = zmalloc(sizeof(redisDb)*server.dbnum);
    server.sharingpool = dictCreate(&setDictType,NULL);
    server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
    if (server.fd == -1) {
        redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
        exit(1);
    }
    /* Each DB has a keyspace, an expires table and a table of keys that
     * clients are blocked on. */
    for (j = 0; j < server.dbnum; j++) {
        server.db[j].dict = dictCreate(&hashDictType,NULL);
        server.db[j].expires = dictCreate(&setDictType,NULL);
        server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL);
        server.db[j].id = j;
    }
    server.cronloops = 0;
    server.bgsavechildpid = -1;
    server.bgrewritechildpid = -1;
    server.bgrewritebuf = sdsempty();
    server.lastsave = time(NULL);
    server.dirty = 0;
    server.usedmemory = 0;
    server.stat_numcommands = 0;
    server.stat_numconnections = 0;
    server.stat_starttime = time(NULL);
    server.unixtime = time(NULL);
    aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);

    if (server.appendonly) {
        server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
        if (server.appendfd == -1) {
            redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
                strerror(errno));
            exit(1);
        }
    }

    if (server.vm_enabled) vmInit();
}
1334
1335 /* Empty the whole database */
1336 static long long emptyDb() {
1337 int j;
1338 long long removed = 0;
1339
1340 for (j = 0; j < server.dbnum; j++) {
1341 removed += dictSize(server.db[j].dict);
1342 dictEmpty(server.db[j].dict);
1343 dictEmpty(server.db[j].expires);
1344 }
1345 return removed;
1346 }
1347
/* Convert a "yes"/"no" configuration argument (case insensitive) to 1/0.
 * Any other string yields -1, which callers treat as a config error. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1353
/* Load the server configuration from 'filename' ("-" reads stdin).
 *
 * The file is processed line by line. Leading/trailing blanks are trimmed,
 * and blank lines and lines starting with '#' are skipped. Every other line
 * is split on spaces into a directive (matched case insensitively) plus its
 * arguments, and the matching 'server' field is updated.
 *
 * An unknown directive, wrong argument count or invalid value jumps to
 * 'loaderr', which prints the offending line number and text to stderr and
 * exits. A config file that can't be opened, or a failing "dir", is logged
 * and also exits.
 *
 * I agree, this is a very rudimentary way to load a configuration...
 * it will be improved later if the config gets more complex. */
static void loadServerConfig(char *filename) {
    FILE *fp;
    char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
    int linenum = 0;
    sds line = NULL;

    if (filename[0] == '-' && filename[1] == '\0')
        fp = stdin;
    else {
        if ((fp = fopen(filename,"r")) == NULL) {
            redisLog(REDIS_WARNING,"Fatal error, can't open config file");
            exit(1);
        }
    }

    while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
        sds *argv;
        int argc, j;

        linenum++;
        line = sdsnew(buf);
        line = sdstrim(line," \t\r\n");

        /* Skip comments and blank lines */
        if (line[0] == '#' || line[0] == '\0') {
            sdsfree(line);
            continue;
        }

        /* Split into arguments */
        argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
        sdstolower(argv[0]);

        /* Execute config directives */
        if (!strcasecmp(argv[0],"timeout") && argc == 2) {
            server.maxidletime = atoi(argv[1]);
            if (server.maxidletime < 0) {
                err = "Invalid timeout value"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"port") && argc == 2) {
            server.port = atoi(argv[1]);
            if (server.port < 1 || server.port > 65535) {
                err = "Invalid port"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
            server.bindaddr = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"save") && argc == 3) {
            int seconds = atoi(argv[1]);
            int changes = atoi(argv[2]);
            if (seconds < 1 || changes < 0) {
                err = "Invalid save parameters"; goto loaderr;
            }
            appendServerSaveParams(seconds,changes);
        } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
            if (chdir(argv[1]) == -1) {
                redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
                    argv[1], strerror(errno));
                exit(1);
            }
        } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
            if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
            else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
            else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
            else {
                err = "Invalid log level. Must be one of debug, notice, warning";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
            FILE *logfp;

            server.logfile = zstrdup(argv[1]);
            /* "stdout" means: no log file (NULL = log on standard output) */
            if (!strcasecmp(server.logfile,"stdout")) {
                zfree(server.logfile);
                server.logfile = NULL;
            }
            if (server.logfile) {
                /* Test if we are able to open the file. The server will not
                 * be able to abort just for this problem later... */
                logfp = fopen(server.logfile,"a");
                if (logfp == NULL) {
                    err = sdscatprintf(sdsempty(),
                        "Can't open the log file: %s", strerror(errno));
                    goto loaderr;
                }
                fclose(logfp);
            }
        } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
            server.dbnum = atoi(argv[1]);
            if (server.dbnum < 1) {
                err = "Invalid number of databases"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
            server.maxclients = atoi(argv[1]);
        } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
            server.maxmemory = strtoll(argv[1], NULL, 10);
        } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
            server.masterhost = sdsnew(argv[1]);
            server.masterport = atoi(argv[2]);
            server.replstate = REDIS_REPL_CONNECT;
        } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
            server.masterauth = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
            if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
            if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) {
            if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
            server.sharingpoolsize = atoi(argv[1]);
            if (server.sharingpoolsize < 1) {
                err = "invalid object sharing pool size"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
            if ((server.daemonize = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
            if ((server.appendonly = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
            if (!strcasecmp(argv[1],"no")) {
                server.appendfsync = APPENDFSYNC_NO;
            } else if (!strcasecmp(argv[1],"always")) {
                server.appendfsync = APPENDFSYNC_ALWAYS;
            } else if (!strcasecmp(argv[1],"everysec")) {
                server.appendfsync = APPENDFSYNC_EVERYSEC;
            } else {
                err = "argument must be 'no', 'always' or 'everysec'";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
            server.requirepass = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
            server.pidfile = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
            server.dbfilename = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"vm-enabled") && argc == 2) {
            if ((server.vm_enabled = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else {
            err = "Bad directive or wrong number of arguments"; goto loaderr;
        }
        /* Release the per-line argument vector and line buffer */
        for (j = 0; j < argc; j++)
            sdsfree(argv[j]);
        zfree(argv);
        sdsfree(line);
    }
    if (fp != stdin) fclose(fp);
    return;

loaderr:
    /* Fatal: report where the problem is and stop the server. */
    fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
    fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
    fprintf(stderr, ">>> '%s'\n", line);
    fprintf(stderr, "%s\n", err);
    exit(1);
}
1521
1522 static void freeClientArgv(redisClient *c) {
1523 int j;
1524
1525 for (j = 0; j < c->argc; j++)
1526 decrRefCount(c->argv[j]);
1527 for (j = 0; j < c->mbargc; j++)
1528 decrRefCount(c->mbargv[j]);
1529 c->argc = 0;
1530 c->mbargc = 0;
1531 }
1532
/* Destroy a client:
 * - unblock it if needed and drop its file events;
 * - release its reply list and arguments, and close its socket;
 * - remove it from server.clients and, for slaves/monitors, from the
 *   matching list;
 * - if it was our master, schedule a reconnection;
 * - free the structure itself. */
static void freeClient(redisClient *c) {
    listNode *ln;

    /* Note that if the client we are freeing is blocked into a blocking
     * call, we have to set querybuf to NULL *before* calling unblockClient()
     * to avoid processInputBuffer() getting called. Also it is important
     * to remove the file events after this, because this call adds
     * the READABLE event. */
    sdsfree(c->querybuf);
    c->querybuf = NULL;
    if (c->flags & REDIS_BLOCKED)
        unblockClient(c);

    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    listRelease(c->reply);
    freeClientArgv(c);
    close(c->fd);
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);
    if (c->flags & REDIS_SLAVE) {
        /* A slave still receiving the initial bulk dump owns an open fd */
        if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
            close(c->repldbfd);
        /* Monitors are flagged as slaves too but live in server.monitors */
        list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        redisAssert(ln != NULL);
        listDelNode(l,ln);
    }
    if (c->flags & REDIS_MASTER) {
        /* Lost the master: serverCron() will try to connect again */
        server.master = NULL;
        server.replstate = REDIS_REPL_CONNECT;
    }
    zfree(c->argv);
    zfree(c->mbargv);
    freeClientMultiState(c);
    zfree(c);
}
1571
1572 #define GLUEREPLY_UP_TO (1024)
1573 static void glueReplyBuffersIfNeeded(redisClient *c) {
1574 int copylen = 0;
1575 char buf[GLUEREPLY_UP_TO];
1576 listNode *ln;
1577 robj *o;
1578
1579 listRewind(c->reply);
1580 while((ln = listYield(c->reply))) {
1581 int objlen;
1582
1583 o = ln->value;
1584 objlen = sdslen(o->ptr);
1585 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1586 memcpy(buf+copylen,o->ptr,objlen);
1587 copylen += objlen;
1588 listDelNode(c->reply,ln);
1589 } else {
1590 if (copylen == 0) return;
1591 break;
1592 }
1593 }
1594 /* Now the output buffer is empty, add the new single element */
1595 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1596 listAddNodeHead(c->reply,o);
1597 }
1598
/* Writable-event handler: send the client's queued replies.
 *
 * When output glueing is disabled, the queue is longer than
 * REDIS_WRITEV_THRESHOLD, and the client is not our master, the work is
 * delegated to sendReplyToClientWritev(). Otherwise replies are written one
 * object at a time with write(), tracking partial progress in c->sentlen.
 * Replies to a master are discarded (counted as sent, never written).
 * A write error other than EAGAIN frees the client. Once the queue is
 * empty the writable event is removed. */
static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen;
    robj *o;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    /* Use writev() if we have enough buffers to send */
    if (!server.glueoutputbuf &&
        listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
        !(c->flags & REDIS_MASTER))
    {
        sendReplyToClientWritev(el, fd, privdata, mask);
        return;
    }

    while(listLength(c->reply)) {
        if (server.glueoutputbuf && listLength(c->reply) > 1)
            glueReplyBuffersIfNeeded(c);

        o = listNodeValue(listFirst(c->reply));
        objlen = sdslen(o->ptr);

        /* Empty objects have nothing to send: just drop them */
        if (objlen == 0) {
            listDelNode(c->reply,listFirst(c->reply));
            continue;
        }

        if (c->flags & REDIS_MASTER) {
            /* Don't reply to a master */
            nwritten = objlen - c->sentlen;
        } else {
            nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0) break;
        }
        c->sentlen += nwritten;
        totwritten += nwritten;
        /* If we fully sent the object on head go to the next one */
        if (c->sentlen == objlen) {
            listDelNode(c->reply,listFirst(c->reply));
            c->sentlen = 0;
        }
        /* Note that we avoid sending more than REDIS_MAX_WRITE_PER_EVENT
         * bytes: in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * a super fast link that is always able to accept data (in a real
         * world scenario think about 'KEYS *' against the loopback interface) */
        if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
    }
    if (nwritten == -1) {
        /* EAGAIN just means the socket buffer is full: retry later */
        if (errno == EAGAIN) {
            nwritten = 0;
        } else {
            redisLog(REDIS_DEBUG,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }
    if (totwritten > 0) c->lastinteraction = time(NULL);
    if (listLength(c->reply) == 0) {
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    }
}
1664
1665 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1666 {
1667 redisClient *c = privdata;
1668 int nwritten = 0, totwritten = 0, objlen, willwrite;
1669 robj *o;
1670 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1671 int offset, ion = 0;
1672 REDIS_NOTUSED(el);
1673 REDIS_NOTUSED(mask);
1674
1675 listNode *node;
1676 while (listLength(c->reply)) {
1677 offset = c->sentlen;
1678 ion = 0;
1679 willwrite = 0;
1680
1681 /* fill-in the iov[] array */
1682 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1683 o = listNodeValue(node);
1684 objlen = sdslen(o->ptr);
1685
1686 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1687 break;
1688
1689 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1690 break; /* no more iovecs */
1691
1692 iov[ion].iov_base = ((char*)o->ptr) + offset;
1693 iov[ion].iov_len = objlen - offset;
1694 willwrite += objlen - offset;
1695 offset = 0; /* just for the first item */
1696 ion++;
1697 }
1698
1699 if(willwrite == 0)
1700 break;
1701
1702 /* write all collected blocks at once */
1703 if((nwritten = writev(fd, iov, ion)) < 0) {
1704 if (errno != EAGAIN) {
1705 redisLog(REDIS_DEBUG,
1706 "Error writing to client: %s", strerror(errno));
1707 freeClient(c);
1708 return;
1709 }
1710 break;
1711 }
1712
1713 totwritten += nwritten;
1714 offset = c->sentlen;
1715
1716 /* remove written robjs from c->reply */
1717 while (nwritten && listLength(c->reply)) {
1718 o = listNodeValue(listFirst(c->reply));
1719 objlen = sdslen(o->ptr);
1720
1721 if(nwritten >= objlen - offset) {
1722 listDelNode(c->reply, listFirst(c->reply));
1723 nwritten -= objlen - offset;
1724 c->sentlen = 0;
1725 } else {
1726 /* partial write */
1727 c->sentlen += nwritten;
1728 break;
1729 }
1730 offset = 0;
1731 }
1732 }
1733
1734 if (totwritten > 0)
1735 c->lastinteraction = time(NULL);
1736
1737 if (listLength(c->reply) == 0) {
1738 c->sentlen = 0;
1739 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1740 }
1741 }
1742
1743 static struct redisCommand *lookupCommand(char *name) {
1744 int j = 0;
1745 while(cmdTable[j].name != NULL) {
1746 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1747 j++;
1748 }
1749 return NULL;
1750 }
1751
1752 /* resetClient prepare the client to process the next command */
1753 static void resetClient(redisClient *c) {
1754 freeClientArgv(c);
1755 c->bulklen = -1;
1756 c->multibulk = 0;
1757 }
1758
1759 /* Call() is the core of Redis execution of a command */
1760 static void call(redisClient *c, struct redisCommand *cmd) {
1761 long long dirty;
1762
1763 dirty = server.dirty;
1764 cmd->proc(c);
1765 if (server.appendonly && server.dirty-dirty)
1766 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1767 if (server.dirty-dirty && listLength(server.slaves))
1768 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1769 if (listLength(server.monitors))
1770 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1771 server.stat_numcommands++;
1772 }
1773
/* If this function gets called we already read a whole
 * command, and the arguments are in the client argv/argc fields.
 * processCommand() executes the command or prepares the
 * server for a bulk read from the client.
 *
 * If 1 is returned the client is still alive and valid,
 * and other operations can be performed by the caller. Otherwise
 * if 0 is returned the client was destroyed (i.e. after QUIT). */
1782 static int processCommand(redisClient *c) {
1783 struct redisCommand *cmd;
1784
1785 /* Free some memory if needed (maxmemory setting) */
1786 if (server.maxmemory) freeMemoryIfNeeded();
1787
1788 /* Handle the multi bulk command type. This is an alternative protocol
1789 * supported by Redis in order to receive commands that are composed of
1790 * multiple binary-safe "bulk" arguments. The latency of processing is
1791 * a bit higher but this allows things like multi-sets, so if this
1792 * protocol is used only for MSET and similar commands this is a big win. */
1793 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1794 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1795 if (c->multibulk <= 0) {
1796 resetClient(c);
1797 return 1;
1798 } else {
1799 decrRefCount(c->argv[c->argc-1]);
1800 c->argc--;
1801 return 1;
1802 }
1803 } else if (c->multibulk) {
1804 if (c->bulklen == -1) {
1805 if (((char*)c->argv[0]->ptr)[0] != '$') {
1806 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1807 resetClient(c);
1808 return 1;
1809 } else {
1810 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1811 decrRefCount(c->argv[0]);
1812 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1813 c->argc--;
1814 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1815 resetClient(c);
1816 return 1;
1817 }
1818 c->argc--;
1819 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1820 return 1;
1821 }
1822 } else {
1823 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1824 c->mbargv[c->mbargc] = c->argv[0];
1825 c->mbargc++;
1826 c->argc--;
1827 c->multibulk--;
1828 if (c->multibulk == 0) {
1829 robj **auxargv;
1830 int auxargc;
1831
1832 /* Here we need to swap the multi-bulk argc/argv with the
1833 * normal argc/argv of the client structure. */
1834 auxargv = c->argv;
1835 c->argv = c->mbargv;
1836 c->mbargv = auxargv;
1837
1838 auxargc = c->argc;
1839 c->argc = c->mbargc;
1840 c->mbargc = auxargc;
1841
1842 /* We need to set bulklen to something different than -1
1843 * in order for the code below to process the command without
1844 * to try to read the last argument of a bulk command as
1845 * a special argument. */
1846 c->bulklen = 0;
1847 /* continue below and process the command */
1848 } else {
1849 c->bulklen = -1;
1850 return 1;
1851 }
1852 }
1853 }
1854 /* -- end of multi bulk commands processing -- */
1855
1856 /* The QUIT command is handled as a special case. Normal command
1857 * procs are unable to close the client connection safely */
1858 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1859 freeClient(c);
1860 return 0;
1861 }
1862 cmd = lookupCommand(c->argv[0]->ptr);
1863 if (!cmd) {
1864 addReplySds(c,
1865 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1866 (char*)c->argv[0]->ptr));
1867 resetClient(c);
1868 return 1;
1869 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1870 (c->argc < -cmd->arity)) {
1871 addReplySds(c,
1872 sdscatprintf(sdsempty(),
1873 "-ERR wrong number of arguments for '%s' command\r\n",
1874 cmd->name));
1875 resetClient(c);
1876 return 1;
1877 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1878 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1879 resetClient(c);
1880 return 1;
1881 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1882 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1883
1884 decrRefCount(c->argv[c->argc-1]);
1885 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1886 c->argc--;
1887 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1888 resetClient(c);
1889 return 1;
1890 }
1891 c->argc--;
1892 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1893 /* It is possible that the bulk read is already in the
1894 * buffer. Check this condition and handle it accordingly.
1895 * This is just a fast path, alternative to call processInputBuffer().
1896 * It's a good idea since the code is small and this condition
1897 * happens most of the times. */
1898 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1899 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1900 c->argc++;
1901 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1902 } else {
1903 return 1;
1904 }
1905 }
1906 /* Let's try to share objects on the command arguments vector */
1907 if (server.shareobjects) {
1908 int j;
1909 for(j = 1; j < c->argc; j++)
1910 c->argv[j] = tryObjectSharing(c->argv[j]);
1911 }
1912 /* Let's try to encode the bulk object to save space. */
1913 if (cmd->flags & REDIS_CMD_BULK)
1914 tryObjectEncoding(c->argv[c->argc-1]);
1915
1916 /* Check if the user is authenticated */
1917 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1918 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1919 resetClient(c);
1920 return 1;
1921 }
1922
1923 /* Exec the command */
1924 if (c->flags & REDIS_MULTI && cmd->proc != execCommand) {
1925 queueMultiCommand(c,cmd);
1926 addReply(c,shared.queued);
1927 } else {
1928 call(c,cmd);
1929 }
1930
1931 /* Prepare the client for the next command */
1932 if (c->flags & REDIS_CLOSE) {
1933 freeClient(c);
1934 return 0;
1935 }
1936 resetClient(c);
1937 return 1;
1938 }
1939
1940 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1941 listNode *ln;
1942 int outc = 0, j;
1943 robj **outv;
1944 /* (args*2)+1 is enough room for args, spaces, newlines */
1945 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1946
1947 if (argc <= REDIS_STATIC_ARGS) {
1948 outv = static_outv;
1949 } else {
1950 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1951 }
1952
1953 for (j = 0; j < argc; j++) {
1954 if (j != 0) outv[outc++] = shared.space;
1955 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1956 robj *lenobj;
1957
1958 lenobj = createObject(REDIS_STRING,
1959 sdscatprintf(sdsempty(),"%lu\r\n",
1960 (unsigned long) stringObjectLen(argv[j])));
1961 lenobj->refcount = 0;
1962 outv[outc++] = lenobj;
1963 }
1964 outv[outc++] = argv[j];
1965 }
1966 outv[outc++] = shared.crlf;
1967
1968 /* Increment all the refcounts at start and decrement at end in order to
1969 * be sure to free objects if there is no slave in a replication state
1970 * able to be feed with commands */
1971 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1972 listRewind(slaves);
1973 while((ln = listYield(slaves))) {
1974 redisClient *slave = ln->value;
1975
1976 /* Don't feed slaves that are still waiting for BGSAVE to start */
1977 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1978
1979 /* Feed all the other slaves, MONITORs and so on */
1980 if (slave->slaveseldb != dictid) {
1981 robj *selectcmd;
1982
1983 switch(dictid) {
1984 case 0: selectcmd = shared.select0; break;
1985 case 1: selectcmd = shared.select1; break;
1986 case 2: selectcmd = shared.select2; break;
1987 case 3: selectcmd = shared.select3; break;
1988 case 4: selectcmd = shared.select4; break;
1989 case 5: selectcmd = shared.select5; break;
1990 case 6: selectcmd = shared.select6; break;
1991 case 7: selectcmd = shared.select7; break;
1992 case 8: selectcmd = shared.select8; break;
1993 case 9: selectcmd = shared.select9; break;
1994 default:
1995 selectcmd = createObject(REDIS_STRING,
1996 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1997 selectcmd->refcount = 0;
1998 break;
1999 }
2000 addReply(slave,selectcmd);
2001 slave->slaveseldb = dictid;
2002 }
2003 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
2004 }
2005 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
2006 if (outv != static_outv) zfree(outv);
2007 }
2008
2009 static void processInputBuffer(redisClient *c) {
2010 again:
2011 /* Before to process the input buffer, make sure the client is not
2012 * waitig for a blocking operation such as BLPOP. Note that the first
2013 * iteration the client is never blocked, otherwise the processInputBuffer
2014 * would not be called at all, but after the execution of the first commands
2015 * in the input buffer the client may be blocked, and the "goto again"
2016 * will try to reiterate. The following line will make it return asap. */
2017 if (c->flags & REDIS_BLOCKED) return;
2018 if (c->bulklen == -1) {
2019 /* Read the first line of the query */
2020 char *p = strchr(c->querybuf,'\n');
2021 size_t querylen;
2022
2023 if (p) {
2024 sds query, *argv;
2025 int argc, j;
2026
2027 query = c->querybuf;
2028 c->querybuf = sdsempty();
2029 querylen = 1+(p-(query));
2030 if (sdslen(query) > querylen) {
2031 /* leave data after the first line of the query in the buffer */
2032 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
2033 }
2034 *p = '\0'; /* remove "\n" */
2035 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
2036 sdsupdatelen(query);
2037
2038 /* Now we can split the query in arguments */
2039 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
2040 sdsfree(query);
2041
2042 if (c->argv) zfree(c->argv);
2043 c->argv = zmalloc(sizeof(robj*)*argc);
2044
2045 for (j = 0; j < argc; j++) {
2046 if (sdslen(argv[j])) {
2047 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
2048 c->argc++;
2049 } else {
2050 sdsfree(argv[j]);
2051 }
2052 }
2053 zfree(argv);
2054 if (c->argc) {
2055 /* Execute the command. If the client is still valid
2056 * after processCommand() return and there is something
2057 * on the query buffer try to process the next command. */
2058 if (processCommand(c) && sdslen(c->querybuf)) goto again;
2059 } else {
2060 /* Nothing to process, argc == 0. Just process the query
2061 * buffer if it's not empty or return to the caller */
2062 if (sdslen(c->querybuf)) goto again;
2063 }
2064 return;
2065 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
2066 redisLog(REDIS_DEBUG, "Client protocol error");
2067 freeClient(c);
2068 return;
2069 }
2070 } else {
2071 /* Bulk read handling. Note that if we are at this point
2072 the client already sent a command terminated with a newline,
2073 we are reading the bulk data that is actually the last
2074 argument of the command. */
2075 int qbl = sdslen(c->querybuf);
2076
2077 if (c->bulklen <= qbl) {
2078 /* Copy everything but the final CRLF as final argument */
2079 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
2080 c->argc++;
2081 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
2082 /* Process the command. If the client is still valid after
2083 * the processing and there is more data in the buffer
2084 * try to parse it. */
2085 if (processCommand(c) && sdslen(c->querybuf)) goto again;
2086 return;
2087 }
2088 }
2089 }
2090
2091 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
2092 redisClient *c = (redisClient*) privdata;
2093 char buf[REDIS_IOBUF_LEN];
2094 int nread;
2095 REDIS_NOTUSED(el);
2096 REDIS_NOTUSED(mask);
2097
2098 nread = read(fd, buf, REDIS_IOBUF_LEN);
2099 if (nread == -1) {
2100 if (errno == EAGAIN) {
2101 nread = 0;
2102 } else {
2103 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
2104 freeClient(c);
2105 return;
2106 }
2107 } else if (nread == 0) {
2108 redisLog(REDIS_DEBUG, "Client closed connection");
2109 freeClient(c);
2110 return;
2111 }
2112 if (nread) {
2113 c->querybuf = sdscatlen(c->querybuf, buf, nread);
2114 c->lastinteraction = time(NULL);
2115 } else {
2116 return;
2117 }
2118 processInputBuffer(c);
2119 }
2120
2121 static int selectDb(redisClient *c, int id) {
2122 if (id < 0 || id >= server.dbnum)
2123 return REDIS_ERR;
2124 c->db = &server.db[id];
2125 return REDIS_OK;
2126 }
2127
2128 static void *dupClientReplyValue(void *o) {
2129 incrRefCount((robj*)o);
2130 return 0;
2131 }
2132
2133 static redisClient *createClient(int fd) {
2134 redisClient *c = zmalloc(sizeof(*c));
2135
2136 anetNonBlock(NULL,fd);
2137 anetTcpNoDelay(NULL,fd);
2138 if (!c) return NULL;
2139 selectDb(c,0);
2140 c->fd = fd;
2141 c->querybuf = sdsempty();
2142 c->argc = 0;
2143 c->argv = NULL;
2144 c->bulklen = -1;
2145 c->multibulk = 0;
2146 c->mbargc = 0;
2147 c->mbargv = NULL;
2148 c->sentlen = 0;
2149 c->flags = 0;
2150 c->lastinteraction = time(NULL);
2151 c->authenticated = 0;
2152 c->replstate = REDIS_REPL_NONE;
2153 c->reply = listCreate();
2154 c->blockingkeys = NULL;
2155 c->blockingkeysnum = 0;
2156 listSetFreeMethod(c->reply,decrRefCount);
2157 listSetDupMethod(c->reply,dupClientReplyValue);
2158 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
2159 readQueryFromClient, c) == AE_ERR) {
2160 freeClient(c);
2161 return NULL;
2162 }
2163 listAddNodeTail(server.clients,c);
2164 initClientMultiState(c);
2165 return c;
2166 }
2167
2168 static void addReply(redisClient *c, robj *obj) {
2169 if (listLength(c->reply) == 0 &&
2170 (c->replstate == REDIS_REPL_NONE ||
2171 c->replstate == REDIS_REPL_ONLINE) &&
2172 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
2173 sendReplyToClient, c) == AE_ERR) return;
2174 listAddNodeTail(c->reply,getDecodedObject(obj));
2175 }
2176
2177 static void addReplySds(redisClient *c, sds s) {
2178 robj *o = createObject(REDIS_STRING,s);
2179 addReply(c,o);
2180 decrRefCount(o);
2181 }
2182
2183 static void addReplyDouble(redisClient *c, double d) {
2184 char buf[128];
2185
2186 snprintf(buf,sizeof(buf),"%.17g",d);
2187 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2188 (unsigned long) strlen(buf),buf));
2189 }
2190
2191 static void addReplyBulkLen(redisClient *c, robj *obj) {
2192 size_t len;
2193
2194 if (obj->encoding == REDIS_ENCODING_RAW) {
2195 len = sdslen(obj->ptr);
2196 } else {
2197 long n = (long)obj->ptr;
2198
2199 /* Compute how many bytes will take this integer as a radix 10 string */
2200 len = 1;
2201 if (n < 0) {
2202 len++;
2203 n = -n;
2204 }
2205 while((n = n/10) != 0) {
2206 len++;
2207 }
2208 }
2209 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len));
2210 }
2211
2212 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2213 int cport, cfd;
2214 char cip[128];
2215 redisClient *c;
2216 REDIS_NOTUSED(el);
2217 REDIS_NOTUSED(mask);
2218 REDIS_NOTUSED(privdata);
2219
2220 cfd = anetAccept(server.neterr, fd, cip, &cport);
2221 if (cfd == AE_ERR) {
2222 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2223 return;
2224 }
2225 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
2226 if ((c = createClient(cfd)) == NULL) {
2227 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2228 close(cfd); /* May be already closed, just ingore errors */
2229 return;
2230 }
2231 /* If maxclient directive is set and this is one client more... close the
2232 * connection. Note that we create the client instead to check before
2233 * for this condition, since now the socket is already set in nonblocking
2234 * mode and we can send an error for free using the Kernel I/O */
2235 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2236 char *err = "-ERR max number of clients reached\r\n";
2237
2238 /* That's a best effort error message, don't check write errors */
2239 if (write(c->fd,err,strlen(err)) == -1) {
2240 /* Nothing to do, Just to avoid the warning... */
2241 }
2242 freeClient(c);
2243 return;
2244 }
2245 server.stat_numconnections++;
2246 }
2247
2248 /* ======================= Redis objects implementation ===================== */
2249
2250 static robj *createObject(int type, void *ptr) {
2251 robj *o;
2252
2253 if (listLength(server.objfreelist)) {
2254 listNode *head = listFirst(server.objfreelist);
2255 o = listNodeValue(head);
2256 listDelNode(server.objfreelist,head);
2257 } else {
2258 if (server.vm_enabled) {
2259 o = zmalloc(sizeof(*o));
2260 } else {
2261 o = zmalloc(sizeof(*o)-sizeof(struct redisObjectVM));
2262 }
2263 }
2264 o->type = type;
2265 o->encoding = REDIS_ENCODING_RAW;
2266 o->ptr = ptr;
2267 o->refcount = 1;
2268 if (server.vm_enabled) {
2269 o->vm.atime = server.unixtime;
2270 o->storage = REDIS_VM_MEMORY;
2271 }
2272 return o;
2273 }
2274
2275 static robj *createStringObject(char *ptr, size_t len) {
2276 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2277 }
2278
2279 static robj *createListObject(void) {
2280 list *l = listCreate();
2281
2282 listSetFreeMethod(l,decrRefCount);
2283 return createObject(REDIS_LIST,l);
2284 }
2285
2286 static robj *createSetObject(void) {
2287 dict *d = dictCreate(&setDictType,NULL);
2288 return createObject(REDIS_SET,d);
2289 }
2290
2291 static robj *createZsetObject(void) {
2292 zset *zs = zmalloc(sizeof(*zs));
2293
2294 zs->dict = dictCreate(&zsetDictType,NULL);
2295 zs->zsl = zslCreate();
2296 return createObject(REDIS_ZSET,zs);
2297 }
2298
2299 static void freeStringObject(robj *o) {
2300 if (o->encoding == REDIS_ENCODING_RAW) {
2301 sdsfree(o->ptr);
2302 }
2303 }
2304
2305 static void freeListObject(robj *o) {
2306 listRelease((list*) o->ptr);
2307 }
2308
2309 static void freeSetObject(robj *o) {
2310 dictRelease((dict*) o->ptr);
2311 }
2312
2313 static void freeZsetObject(robj *o) {
2314 zset *zs = o->ptr;
2315
2316 dictRelease(zs->dict);
2317 zslFree(zs->zsl);
2318 zfree(zs);
2319 }
2320
2321 static void freeHashObject(robj *o) {
2322 dictRelease((dict*) o->ptr);
2323 }
2324
2325 static void incrRefCount(robj *o) {
2326 assert(!server.vm_enabled || o->storage == REDIS_VM_MEMORY);
2327 o->refcount++;
2328 }
2329
2330 static void decrRefCount(void *obj) {
2331 robj *o = obj;
2332
2333 /* REDIS_VM_SWAPPED */
2334 if (server.vm_enabled && o->storage == REDIS_VM_SWAPPED) {
2335 assert(o->refcount == 1);
2336 assert(o->type == REDIS_STRING);
2337 freeStringObject(o);
2338 vmMarkPagesFree(o->vm.page,o->vm.usedpages);
2339 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2340 !listAddNodeHead(server.objfreelist,o))
2341 zfree(o);
2342 return;
2343 }
2344 /* REDIS_VM_MEMORY */
2345 if (--(o->refcount) == 0) {
2346 switch(o->type) {
2347 case REDIS_STRING: freeStringObject(o); break;
2348 case REDIS_LIST: freeListObject(o); break;
2349 case REDIS_SET: freeSetObject(o); break;
2350 case REDIS_ZSET: freeZsetObject(o); break;
2351 case REDIS_HASH: freeHashObject(o); break;
2352 default: redisAssert(0 != 0); break;
2353 }
2354 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2355 !listAddNodeHead(server.objfreelist,o))
2356 zfree(o);
2357 }
2358 }
2359
2360 static robj *lookupKey(redisDb *db, robj *key) {
2361 dictEntry *de = dictFind(db->dict,key);
2362 if (de) {
2363 robj *key = dictGetEntryKey(de);
2364 robj *val = dictGetEntryVal(de);
2365
2366 if (server.vm_enabled) {
2367 if (key->storage == REDIS_VM_MEMORY) {
2368 /* Update the access time of the key for the aging algorithm. */
2369 key->vm.atime = server.unixtime;
2370 } else {
2371 /* Our value was swapped on disk. Bring it at home. */
2372 assert(val == NULL);
2373 val = vmLoadObject(key);
2374 dictGetEntryVal(de) = val;
2375 }
2376 }
2377 return val;
2378 } else {
2379 return NULL;
2380 }
2381 }
2382
2383 static robj *lookupKeyRead(redisDb *db, robj *key) {
2384 expireIfNeeded(db,key);
2385 return lookupKey(db,key);
2386 }
2387
2388 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2389 deleteIfVolatile(db,key);
2390 return lookupKey(db,key);
2391 }
2392
2393 static int deleteKey(redisDb *db, robj *key) {
2394 int retval;
2395
2396 /* We need to protect key from destruction: after the first dictDelete()
2397 * it may happen that 'key' is no longer valid if we don't increment
2398 * it's count. This may happen when we get the object reference directly
2399 * from the hash table with dictRandomKey() or dict iterators */
2400 incrRefCount(key);
2401 if (dictSize(db->expires)) dictDelete(db->expires,key);
2402 retval = dictDelete(db->dict,key);
2403 decrRefCount(key);
2404
2405 return retval == DICT_OK;
2406 }
2407
2408 /* Try to share an object against the shared objects pool */
2409 static robj *tryObjectSharing(robj *o) {
2410 struct dictEntry *de;
2411 unsigned long c;
2412
2413 if (o == NULL || server.shareobjects == 0) return o;
2414
2415 redisAssert(o->type == REDIS_STRING);
2416 de = dictFind(server.sharingpool,o);
2417 if (de) {
2418 robj *shared = dictGetEntryKey(de);
2419
2420 c = ((unsigned long) dictGetEntryVal(de))+1;
2421 dictGetEntryVal(de) = (void*) c;
2422 incrRefCount(shared);
2423 decrRefCount(o);
2424 return shared;
2425 } else {
2426 /* Here we are using a stream algorihtm: Every time an object is
2427 * shared we increment its count, everytime there is a miss we
2428 * recrement the counter of a random object. If this object reaches
2429 * zero we remove the object and put the current object instead. */
2430 if (dictSize(server.sharingpool) >=
2431 server.sharingpoolsize) {
2432 de = dictGetRandomKey(server.sharingpool);
2433 redisAssert(de != NULL);
2434 c = ((unsigned long) dictGetEntryVal(de))-1;
2435 dictGetEntryVal(de) = (void*) c;
2436 if (c == 0) {
2437 dictDelete(server.sharingpool,de->key);
2438 }
2439 } else {
2440 c = 0; /* If the pool is empty we want to add this object */
2441 }
2442 if (c == 0) {
2443 int retval;
2444
2445 retval = dictAdd(server.sharingpool,o,(void*)1);
2446 redisAssert(retval == DICT_OK);
2447 incrRefCount(o);
2448 }
2449 return o;
2450 }
2451 }
2452
2453 /* Check if the nul-terminated string 's' can be represented by a long
2454 * (that is, is a number that fits into long without any other space or
2455 * character before or after the digits).
2456 *
2457 * If so, the function returns REDIS_OK and *longval is set to the value
2458 * of the number. Otherwise REDIS_ERR is returned */
2459 static int isStringRepresentableAsLong(sds s, long *longval) {
2460 char buf[32], *endptr;
2461 long value;
2462 int slen;
2463
2464 value = strtol(s, &endptr, 10);
2465 if (endptr[0] != '\0') return REDIS_ERR;
2466 slen = snprintf(buf,32,"%ld",value);
2467
2468 /* If the number converted back into a string is not identical
2469 * then it's not possible to encode the string as integer */
2470 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2471 if (longval) *longval = value;
2472 return REDIS_OK;
2473 }
2474
2475 /* Try to encode a string object in order to save space */
2476 static int tryObjectEncoding(robj *o) {
2477 long value;
2478 sds s = o->ptr;
2479
2480 if (o->encoding != REDIS_ENCODING_RAW)
2481 return REDIS_ERR; /* Already encoded */
2482
2483 /* It's not save to encode shared objects: shared objects can be shared
2484 * everywhere in the "object space" of Redis. Encoded objects can only
2485 * appear as "values" (and not, for instance, as keys) */
2486 if (o->refcount > 1) return REDIS_ERR;
2487
2488 /* Currently we try to encode only strings */
2489 redisAssert(o->type == REDIS_STRING);
2490
2491 /* Check if we can represent this string as a long integer */
2492 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2493
2494 /* Ok, this object can be encoded */
2495 o->encoding = REDIS_ENCODING_INT;
2496 sdsfree(o->ptr);
2497 o->ptr = (void*) value;
2498 return REDIS_OK;
2499 }
2500
2501 /* Get a decoded version of an encoded object (returned as a new object).
2502 * If the object is already raw-encoded just increment the ref count. */
2503 static robj *getDecodedObject(robj *o) {
2504 robj *dec;
2505
2506 if (o->encoding == REDIS_ENCODING_RAW) {
2507 incrRefCount(o);
2508 return o;
2509 }
2510 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2511 char buf[32];
2512
2513 snprintf(buf,32,"%ld",(long)o->ptr);
2514 dec = createStringObject(buf,strlen(buf));
2515 return dec;
2516 } else {
2517 redisAssert(1 != 1);
2518 }
2519 }
2520
2521 /* Compare two string objects via strcmp() or alike.
2522 * Note that the objects may be integer-encoded. In such a case we
2523 * use snprintf() to get a string representation of the numbers on the stack
2524 * and compare the strings, it's much faster than calling getDecodedObject().
2525 *
2526 * Important note: if objects are not integer encoded, but binary-safe strings,
2527 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2528 * binary safe. */
2529 static int compareStringObjects(robj *a, robj *b) {
2530 redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2531 char bufa[128], bufb[128], *astr, *bstr;
2532 int bothsds = 1;
2533
2534 if (a == b) return 0;
2535 if (a->encoding != REDIS_ENCODING_RAW) {
2536 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2537 astr = bufa;
2538 bothsds = 0;
2539 } else {
2540 astr = a->ptr;
2541 }
2542 if (b->encoding != REDIS_ENCODING_RAW) {
2543 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2544 bstr = bufb;
2545 bothsds = 0;
2546 } else {
2547 bstr = b->ptr;
2548 }
2549 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2550 }
2551
2552 static size_t stringObjectLen(robj *o) {
2553 redisAssert(o->type == REDIS_STRING);
2554 if (o->encoding == REDIS_ENCODING_RAW) {
2555 return sdslen(o->ptr);
2556 } else {
2557 char buf[32];
2558
2559 return snprintf(buf,32,"%ld",(long)o->ptr);
2560 }
2561 }
2562
2563 /*============================ RDB saving/loading =========================== */
2564
/* Write the single byte type / opcode 'type'. Returns 0 on success, -1 on error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
2569
/* Write 't' truncated to a 4 byte int32_t, in host byte order.
 * Returns 0 on success, -1 on error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    return (fwrite(&t32,sizeof(t32),1,fp) == 0) ? -1 : 0;
}
2575
2576 /* check rdbLoadLen() comments for more info */
2577 static int rdbSaveLen(FILE *fp, uint32_t len) {
2578 unsigned char buf[2];
2579
2580 if (len < (1<<6)) {
2581 /* Save a 6 bit len */
2582 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2583 if (fwrite(buf,1,1,fp) == 0) return -1;
2584 } else if (len < (1<<14)) {
2585 /* Save a 14 bit len */
2586 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2587 buf[1] = len&0xFF;
2588 if (fwrite(buf,2,1,fp) == 0) return -1;
2589 } else {
2590 /* Save a 32 bit len */
2591 buf[0] = (REDIS_RDB_32BITLEN<<6);
2592 if (fwrite(buf,1,1,fp) == 0) return -1;
2593 len = htonl(len);
2594 if (fwrite(&len,4,1,fp) == 0) return -1;
2595 }
2596 return 0;
2597 }
2598
2599 /* String objects in the form "2391" "-100" without any space and with a
2600 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2601 * encoded as integers to save space */
2602 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2603 long long value;
2604 char *endptr, buf[32];
2605
2606 /* Check if it's possible to encode this value as a number */
2607 value = strtoll(s, &endptr, 10);
2608 if (endptr[0] != '\0') return 0;
2609 snprintf(buf,32,"%lld",value);
2610
2611 /* If the number converted back into a string is not identical
2612 * then it's not possible to encode the string as integer */
2613 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2614
2615 /* Finally check if it fits in our ranges */
2616 if (value >= -(1<<7) && value <= (1<<7)-1) {
2617 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2618 enc[1] = value&0xFF;
2619 return 2;
2620 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2621 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2622 enc[1] = value&0xFF;
2623 enc[2] = (value>>8)&0xFF;
2624 return 3;
2625 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2626 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2627 enc[1] = value&0xFF;
2628 enc[2] = (value>>8)&0xFF;
2629 enc[3] = (value>>16)&0xFF;
2630 enc[4] = (value>>24)&0xFF;
2631 return 5;
2632 } else {
2633 return 0;
2634 }
2635 }
2636
2637 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2638 unsigned int comprlen, outlen;
2639 unsigned char byte;
2640 void *out;
2641
2642 /* We require at least four bytes compression for this to be worth it */
2643 outlen = sdslen(obj->ptr)-4;
2644 if (outlen <= 0) return 0;
2645 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2646 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2647 if (comprlen == 0) {
2648 zfree(out);
2649 return 0;
2650 }
2651 /* Data compressed! Let's save it on disk */
2652 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2653 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2654 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2655 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2656 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2657 zfree(out);
2658 return comprlen;
2659
2660 writeerr:
2661 zfree(out);
2662 return -1;
2663 }
2664
2665 /* Save a string objet as [len][data] on disk. If the object is a string
2666 * representation of an integer value we try to safe it in a special form */
2667 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2668 size_t len;
2669 int enclen;
2670
2671 len = sdslen(obj->ptr);
2672
2673 /* Try integer encoding */
2674 if (len <= 11) {
2675 unsigned char buf[5];
2676 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2677 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2678 return 0;
2679 }
2680 }
2681
2682 /* Try LZF compression - under 20 bytes it's unable to compress even
2683 * aaaaaaaaaaaaaaaaaa so skip it */
2684 if (server.rdbcompression && len > 20) {
2685 int retval;
2686
2687 retval = rdbSaveLzfStringObject(fp,obj);
2688 if (retval == -1) return -1;
2689 if (retval > 0) return 0;
2690 /* retval == 0 means data can't be compressed, save the old way */
2691 }
2692
2693 /* Store verbatim */
2694 if (rdbSaveLen(fp,len) == -1) return -1;
2695 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2696 return 0;
2697 }
2698
2699 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2700 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2701 int retval;
2702
2703 obj = getDecodedObject(obj);
2704 retval = rdbSaveStringObjectRaw(fp,obj);
2705 decrRefCount(obj);
2706 return retval;
2707 }
2708
/* Save a double as a string prefixed by an unsigned 8 bit length.
 * Three length values are reserved for special values, with no string
 * following:
 *   253: not a number
 *   254: +inf
 *   255: -inf
 * Finite values are written with "%.17g".
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    char *repr = (char*)buf+1;
    int len = 1;

    if (isnan(val)) {
        buf[0] = 253;
    } else if (!isfinite(val)) {
        buf[0] = (val > 0) ? 254 : 255;
    } else {
        snprintf(repr,sizeof(buf)-1,"%.17g",val);
        buf[0] = strlen(repr);
        len += buf[0];
    }
    return (fwrite(buf,len,1,fp) == 0) ? -1 : 0;
}
2735
2736 /* Save a Redis object. */
2737 static int rdbSaveObject(FILE *fp, robj *o) {
2738 if (o->type == REDIS_STRING) {
2739 /* Save a string value */
2740 if (rdbSaveStringObject(fp,o) == -1) return -1;
2741 } else if (o->type == REDIS_LIST) {
2742 /* Save a list value */
2743 list *list = o->ptr;
2744 listNode *ln;
2745
2746 listRewind(list);
2747 if (rdbSaveLen(fp,listLength(list)) == -1) return -1;
2748 while((ln = listYield(list))) {
2749 robj *eleobj = listNodeValue(ln);
2750
2751 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2752 }
2753 } else if (o->type == REDIS_SET) {
2754 /* Save a set value */
2755 dict *set = o->ptr;
2756 dictIterator *di = dictGetIterator(set);
2757 dictEntry *de;
2758
2759 if (rdbSaveLen(fp,dictSize(set)) == -1) return -1;
2760 while((de = dictNext(di)) != NULL) {
2761 robj *eleobj = dictGetEntryKey(de);
2762
2763 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2764 }
2765 dictReleaseIterator(di);
2766 } else if (o->type == REDIS_ZSET) {
2767 /* Save a set value */
2768 zset *zs = o->ptr;
2769 dictIterator *di = dictGetIterator(zs->dict);
2770 dictEntry *de;
2771
2772 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) return -1;
2773 while((de = dictNext(di)) != NULL) {
2774 robj *eleobj = dictGetEntryKey(de);
2775 double *score = dictGetEntryVal(de);
2776
2777 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2778 if (rdbSaveDoubleValue(fp,*score) == -1) return -1;
2779 }
2780 dictReleaseIterator(di);
2781 } else {
2782 redisAssert(0 != 0);
2783 }
2784 return 0;
2785 }
2786
2787 /* Return the length the object will have on disk if saved with
2788 * the rdbSaveObject() function. Currently we use a trick to get
2789 * this length with very little changes to the code. In the future
2790 * we could switch to a faster solution. */
2791 static off_t rdbSavedObjectLen(robj *o) {
2792 static FILE *fp = NULL;
2793
2794 if (fp == NULL) fp = fopen("/dev/null","w");
2795 assert(fp != NULL);
2796
2797 rewind(fp);
2798 assert(rdbSaveObject(fp,o) != 1);
2799 return ftello(fp);
2800 }
2801
2802 /* Return the number of pages required to save this object in the swap file */
2803 static off_t rdbSavedObjectPages(robj *o) {
2804 off_t bytes = rdbSavedObjectLen(o);
2805
2806 return (bytes+(server.vm_page_size-1))/server.vm_page_size;
2807 }
2808
2809 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2810 static int rdbSave(char *filename) {
2811 dictIterator *di = NULL;
2812 dictEntry *de;
2813 FILE *fp;
2814 char tmpfile[256];
2815 int j;
2816 time_t now = time(NULL);
2817
2818 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2819 fp = fopen(tmpfile,"w");
2820 if (!fp) {
2821 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2822 return REDIS_ERR;
2823 }
2824 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2825 for (j = 0; j < server.dbnum; j++) {
2826 redisDb *db = server.db+j;
2827 dict *d = db->dict;
2828 if (dictSize(d) == 0) continue;
2829 di = dictGetIterator(d);
2830 if (!di) {
2831 fclose(fp);
2832 return REDIS_ERR;
2833 }
2834
2835 /* Write the SELECT DB opcode */
2836 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2837 if (rdbSaveLen(fp,j) == -1) goto werr;
2838
2839 /* Iterate this DB writing every entry */
2840 while((de = dictNext(di)) != NULL) {
2841 robj *key = dictGetEntryKey(de);
2842 robj *o = dictGetEntryVal(de);
2843 time_t expiretime = getExpire(db,key);
2844
2845 /* Save the expire time */
2846 if (expiretime != -1) {
2847 /* If this key is already expired skip it */
2848 if (expiretime < now) continue;
2849 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2850 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2851 }
2852 /* Save the key and associated value */
2853 if (rdbSaveType(fp,o->type) == -1) goto werr;
2854 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2855 /* Save the actual value */
2856 if (rdbSaveObject(fp,o) == -1) goto werr;
2857 }
2858 dictReleaseIterator(di);
2859 }
2860 /* EOF opcode */
2861 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2862
2863 /* Make sure data will not remain on the OS's output buffers */
2864 fflush(fp);
2865 fsync(fileno(fp));
2866 fclose(fp);
2867
2868 /* Use RENAME to make sure the DB file is changed atomically only
2869 * if the generate DB file is ok. */
2870 if (rename(tmpfile,filename) == -1) {
2871 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2872 unlink(tmpfile);
2873 return REDIS_ERR;
2874 }
2875 redisLog(REDIS_NOTICE,"DB saved on disk");
2876 server.dirty = 0;
2877 server.lastsave = time(NULL);
2878 return REDIS_OK;
2879
2880 werr:
2881 fclose(fp);
2882 unlink(tmpfile);
2883 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2884 if (di) dictReleaseIterator(di);
2885 return REDIS_ERR;
2886 }
2887
2888 static int rdbSaveBackground(char *filename) {
2889 pid_t childpid;
2890
2891 if (server.bgsavechildpid != -1) return REDIS_ERR;
2892 if ((childpid = fork()) == 0) {
2893 /* Child */
2894 close(server.fd);
2895 if (rdbSave(filename) == REDIS_OK) {
2896 exit(0);
2897 } else {
2898 exit(1);
2899 }
2900 } else {
2901 /* Parent */
2902 if (childpid == -1) {
2903 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2904 strerror(errno));
2905 return REDIS_ERR;
2906 }
2907 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2908 server.bgsavechildpid = childpid;
2909 return REDIS_OK;
2910 }
2911 return REDIS_OK; /* unreached */
2912 }
2913
/* Delete the "temp-<pid>.rdb" file that an rdbSave() running in process
 * 'childpid' writes to. A failing unlink() is silently ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int)childpid);
    unlink(path);
}
2920
/* Read a single type/opcode byte from 'fp'.
 * Returns its value (0-255), or -1 when no byte could be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return (fread(&byte,1,1,fp) == 1) ? (int)byte : -1;
}
2926
/* Read a 4-byte signed timestamp, stored in host byte order, from 'fp'.
 * Returns it as a time_t, or -1 on short read. A stored value of -1
 * therefore cannot be told apart from an error. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,sizeof(stamp),1,fp) != 1) return -1;
    return (time_t)stamp;
}
2932
2933 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2934 * of this file for a description of how this are stored on disk.
2935 *
2936 * isencoded is set to 1 if the readed length is not actually a length but
2937 * an "encoding type", check the above comments for more info */
2938 static uint32_t rdbLoadLen(FILE *fp, int *isencoded) {
2939 unsigned char buf[2];
2940 uint32_t len;
2941 int type;
2942
2943 if (isencoded) *isencoded = 0;
2944 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2945 type = (buf[0]&0xC0)>>6;
2946 if (type == REDIS_RDB_6BITLEN) {
2947 /* Read a 6 bit len */
2948 return buf[0]&0x3F;
2949 } else if (type == REDIS_RDB_ENCVAL) {
2950 /* Read a 6 bit len encoding type */
2951 if (isencoded) *isencoded = 1;
2952 return buf[0]&0x3F;
2953 } else if (type == REDIS_RDB_14BITLEN) {
2954 /* Read a 14 bit len */
2955 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2956 return ((buf[0]&0x3F)<<8)|buf[1];
2957 } else {
2958 /* Read a 32 bit len */
2959 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2960 return ntohl(len);
2961 }
2962 }
2963
2964 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2965 unsigned char enc[4];
2966 long long val;
2967
2968 if (enctype == REDIS_RDB_ENC_INT8) {
2969 if (fread(enc,1,1,fp) == 0) return NULL;
2970 val = (signed char)enc[0];
2971 } else if (enctype == REDIS_RDB_ENC_INT16) {
2972 uint16_t v;
2973 if (fread(enc,2,1,fp) == 0) return NULL;
2974 v = enc[0]|(enc[1]<<8);
2975 val = (int16_t)v;
2976 } else if (enctype == REDIS_RDB_ENC_INT32) {
2977 uint32_t v;
2978 if (fread(enc,4,1,fp) == 0) return NULL;
2979 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2980 val = (int32_t)v;
2981 } else {
2982 val = 0; /* anti-warning */
2983 redisAssert(0!=0);
2984 }
2985 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2986 }
2987
2988 static robj *rdbLoadLzfStringObject(FILE*fp) {
2989 unsigned int len, clen;
2990 unsigned char *c = NULL;
2991 sds val = NULL;
2992
2993 if ((clen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
2994 if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
2995 if ((c = zmalloc(clen)) == NULL) goto err;
2996 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2997 if (fread(c,clen,1,fp) == 0) goto err;
2998 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2999 zfree(c);
3000 return createObject(REDIS_STRING,val);
3001 err:
3002 zfree(c);
3003 sdsfree(val);
3004 return NULL;
3005 }
3006
3007 static robj *rdbLoadStringObject(FILE*fp) {
3008 int isencoded;
3009 uint32_t len;
3010 sds val;
3011
3012 len = rdbLoadLen(fp,&isencoded);
3013 if (isencoded) {
3014 switch(len) {
3015 case REDIS_RDB_ENC_INT8:
3016 case REDIS_RDB_ENC_INT16:
3017 case REDIS_RDB_ENC_INT32:
3018 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
3019 case REDIS_RDB_ENC_LZF:
3020 return tryObjectSharing(rdbLoadLzfStringObject(fp));
3021 default:
3022 redisAssert(0!=0);
3023 }
3024 }
3025
3026 if (len == REDIS_RDB_LENERR) return NULL;
3027 val = sdsnewlen(NULL,len);
3028 if (len && fread(val,len,1,fp) == 0) {
3029 sdsfree(val);
3030 return NULL;
3031 }
3032 return tryObjectSharing(createObject(REDIS_STRING,val));
3033 }
3034
3035 /* For information about double serialization check rdbSaveDoubleValue() */
3036 static int rdbLoadDoubleValue(FILE *fp, double *val) {
3037 char buf[128];
3038 unsigned char len;
3039
3040 if (fread(&len,1,1,fp) == 0) return -1;
3041 switch(len) {
3042 case 255: *val = R_NegInf; return 0;
3043 case 254: *val = R_PosInf; return 0;
3044 case 253: *val = R_Nan; return 0;
3045 default:
3046 if (fread(buf,len,1,fp) == 0) return -1;
3047 buf[len] = '\0';
3048 sscanf(buf, "%lg", val);
3049 return 0;
3050 }
3051 }
3052
3053 /* Load a Redis object of the specified type from the specified file.
3054 * On success a newly allocated object is returned, otherwise NULL. */
3055 static robj *rdbLoadObject(int type, FILE *fp) {
3056 robj *o;
3057
3058 if (type == REDIS_STRING) {
3059 /* Read string value */
3060 if ((o = rdbLoadStringObject(fp)) == NULL) return NULL;
3061 tryObjectEncoding(o);
3062 } else if (type == REDIS_LIST || type == REDIS_SET) {
3063 /* Read list/set value */
3064 uint32_t listlen;
3065
3066 if ((listlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
3067 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
3068 /* Load every single element of the list/set */
3069 while(listlen--) {
3070 robj *ele;
3071
3072 if ((ele = rdbLoadStringObject(fp)) == NULL) return NULL;
3073 tryObjectEncoding(ele);
3074 if (type == REDIS_LIST) {
3075 listAddNodeTail((list*)o->ptr,ele);
3076 } else {
3077 dictAdd((dict*)o->ptr,ele,NULL);
3078 }
3079 }
3080 } else if (type == REDIS_ZSET) {
3081 /* Read list/set value */
3082 uint32_t zsetlen;
3083 zset *zs;
3084
3085 if ((zsetlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
3086 o = createZsetObject();
3087 zs = o->ptr;
3088 /* Load every single element of the list/set */
3089 while(zsetlen--) {
3090 robj *ele;
3091 double *score = zmalloc(sizeof(double));
3092
3093 if ((ele = rdbLoadStringObject(fp)) == NULL) return NULL;
3094 tryObjectEncoding(ele);
3095 if (rdbLoadDoubleValue(fp,score) == -1) return NULL;
3096 dictAdd(zs->dict,ele,score);
3097 zslInsert(zs->zsl,*score,ele);
3098 incrRefCount(ele); /* added to skiplist */
3099 }
3100 } else {
3101 redisAssert(0 != 0);
3102 }
3103 return o;
3104 }
3105
3106 static int rdbLoad(char *filename) {
3107 FILE *fp;
3108 robj *keyobj = NULL;
3109 uint32_t dbid;
3110 int type, retval, rdbver;
3111 dict *d = server.db[0].dict;
3112 redisDb *db = server.db+0;
3113 char buf[1024];
3114 time_t expiretime = -1, now = time(NULL);
3115
3116 fp = fopen(filename,"r");
3117 if (!fp) return REDIS_ERR;
3118 if (fread(buf,9,1,fp) == 0) goto eoferr;
3119 buf[9] = '\0';
3120 if (memcmp(buf,"REDIS",5) != 0) {
3121 fclose(fp);
3122 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
3123 return REDIS_ERR;
3124 }
3125 rdbver = atoi(buf+5);
3126 if (rdbver != 1) {
3127 fclose(fp);
3128 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
3129 return REDIS_ERR;
3130 }
3131 while(1) {
3132 robj *o;
3133
3134 /* Read type. */
3135 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
3136 if (type == REDIS_EXPIRETIME) {
3137 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
3138 /* We read the time so we need to read the object type again */
3139 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
3140 }
3141 if (type == REDIS_EOF) break;
3142 /* Handle SELECT DB opcode as a special case */
3143 if (type == REDIS_SELECTDB) {
3144 if ((dbid = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR)
3145 goto eoferr;
3146 if (dbid >= (unsigned)server.dbnum) {
3147 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
3148 exit(1);
3149 }
3150 db = server.db+dbid;
3151 d = db->dict;
3152 continue;
3153 }
3154 /* Read key */
3155 if ((keyobj = rdbLoadStringObject(fp)) == NULL) goto eoferr;
3156 /* Read value */
3157 if ((o = rdbLoadObject(type,fp)) == NULL) goto eoferr;
3158 /* Add the new object in the hash table */
3159 retval = dictAdd(d,keyobj,o);
3160 if (retval == DICT_ERR) {
3161 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
3162 exit(1);
3163 }
3164 /* Set the expire time if needed */
3165 if (expiretime != -1) {
3166 setExpire(db,keyobj,expiretime);
3167 /* Delete this key if already expired */
3168 if (expiretime < now) deleteKey(db,keyobj);
3169 expiretime = -1;
3170 }
3171 keyobj = o = NULL;
3172 }
3173 fclose(fp);
3174 return REDIS_OK;
3175
3176 eoferr: /* unexpected end of file is handled here with a fatal exit */
3177 if (keyobj) decrRefCount(keyobj);
3178 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
3179 exit(1);
3180 return REDIS_ERR; /* Just to avoid warning */
3181 }
3182
3183 /*================================== Commands =============================== */
3184
3185 static void authCommand(redisClient *c) {
3186 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
3187 c->authenticated = 1;
3188 addReply(c,shared.ok);
3189 } else {
3190 c->authenticated = 0;
3191 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
3192 }
3193 }
3194
3195 static void pingCommand(redisClient *c) {
3196 addReply(c,shared.pong);
3197 }
3198
3199 static void echoCommand(redisClient *c) {
3200 addReplyBulkLen(c,c->argv[1]);
3201 addReply(c,c->argv[1]);
3202 addReply(c,shared.crlf);
3203 }
3204
3205 /*=================================== Strings =============================== */
3206
3207 static void setGenericCommand(redisClient *c, int nx) {
3208 int retval;
3209
3210 if (nx) deleteIfVolatile(c->db,c->argv[1]);
3211 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
3212 if (retval == DICT_ERR) {
3213 if (!nx) {
3214 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3215 incrRefCount(c->argv[2]);
3216 } else {
3217 addReply(c,shared.czero);
3218 return;
3219 }
3220 } else {
3221 incrRefCount(c->argv[1]);
3222 incrRefCount(c->argv[2]);
3223 }
3224 server.dirty++;
3225 removeExpire(c->db,c->argv[1]);
3226 addReply(c, nx ? shared.cone : shared.ok);
3227 }
3228
3229 static void setCommand(redisClient *c) {
3230 setGenericCommand(c,0);
3231 }
3232
3233 static void setnxCommand(redisClient *c) {
3234 setGenericCommand(c,1);
3235 }
3236
3237 static int getGenericCommand(redisClient *c) {
3238 robj *o = lookupKeyRead(c->db,c->argv[1]);
3239
3240 if (o == NULL) {
3241 addReply(c,shared.nullbulk);
3242 return REDIS_OK;
3243 } else {
3244 if (o->type != REDIS_STRING) {
3245 addReply(c,shared.wrongtypeerr);
3246 return REDIS_ERR;
3247 } else {
3248 addReplyBulkLen(c,o);
3249 addReply(c,o);
3250 addReply(c,shared.crlf);
3251 return REDIS_OK;
3252 }
3253 }
3254 }
3255
3256 static void getCommand(redisClient *c) {
3257 getGenericCommand(c);
3258 }
3259
3260 static void getsetCommand(redisClient *c) {
3261 if (getGenericCommand(c) == REDIS_ERR) return;
3262 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3263 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3264 } else {
3265 incrRefCount(c->argv[1]);
3266 }
3267 incrRefCount(c->argv[2]);
3268 server.dirty++;
3269 removeExpire(c->db,c->argv[1]);
3270 }
3271
3272 static void mgetCommand(redisClient *c) {
3273 int j;
3274
3275 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
3276 for (j = 1; j < c->argc; j++) {
3277 robj *o = lookupKeyRead(c->db,c->argv[j]);
3278 if (o == NULL) {
3279 addReply(c,shared.nullbulk);
3280 } else {
3281 if (o->type != REDIS_STRING) {
3282 addReply(c,shared.nullbulk);
3283 } else {
3284 addReplyBulkLen(c,o);
3285 addReply(c,o);
3286 addReply(c,shared.crlf);
3287 }
3288 }
3289 }
3290 }
3291
3292 static void msetGenericCommand(redisClient *c, int nx) {
3293 int j, busykeys = 0;
3294
3295 if ((c->argc % 2) == 0) {
3296 addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3297 return;
3298 }
3299 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3300 * set nothing at all if at least one already key exists. */
3301 if (nx) {
3302 for (j = 1; j < c->argc; j += 2) {
3303 if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
3304 busykeys++;
3305 }
3306 }
3307 }
3308 if (busykeys) {
3309 addReply(c, shared.czero);
3310 return;
3311 }
3312
3313 for (j = 1; j < c->argc; j += 2) {
3314 int retval;
3315
3316 tryObjectEncoding(c->argv[j+1]);
3317 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3318 if (retval == DICT_ERR) {
3319 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3320 incrRefCount(c->argv[j+1]);
3321 } else {
3322 incrRefCount(c->argv[j]);
3323 incrRefCount(c->argv[j+1]);
3324 }
3325 removeExpire(c->db,c->argv[j]);
3326 }
3327 server.dirty += (c->argc-1)/2;
3328 addReply(c, nx ? shared.cone : shared.ok);
3329 }
3330
3331 static void msetCommand(redisClient *c) {
3332 msetGenericCommand(c,0);
3333 }
3334
3335 static void msetnxCommand(redisClient *c) {
3336 msetGenericCommand(c,1);
3337 }
3338
3339 static void incrDecrCommand(redisClient *c, long long incr) {
3340 long long value;
3341 int retval;
3342 robj *o;
3343
3344 o = lookupKeyWrite(c->db,c->argv[1]);
3345 if (o == NULL) {
3346 value = 0;
3347 } else {
3348 if (o->type != REDIS_STRING) {
3349 value = 0;
3350 } else {
3351 char *eptr;
3352
3353 if (o->encoding == REDIS_ENCODING_RAW)
3354 value = strtoll(o->ptr, &eptr, 10);
3355 else if (o->encoding == REDIS_ENCODING_INT)
3356 value = (long)o->ptr;
3357 else
3358 redisAssert(1 != 1);
3359 }
3360 }
3361
3362 value += incr;
3363 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
3364 tryObjectEncoding(o);
3365 retval = dictAdd(c->db->dict,c->argv[1],o);
3366 if (retval == DICT_ERR) {
3367 dictReplace(c->db->dict,c->argv[1],o);
3368 removeExpire(c->db,c->argv[1]);
3369 } else {
3370 incrRefCount(c->argv[1]);
3371 }
3372 server.dirty++;
3373 addReply(c,shared.colon);
3374 addReply(c,o);
3375 addReply(c,shared.crlf);
3376 }
3377
3378 static void incrCommand(redisClient *c) {
3379 incrDecrCommand(c,1);
3380 }
3381
3382 static void decrCommand(redisClient *c) {
3383 incrDecrCommand(c,-1);
3384 }
3385
3386 static void incrbyCommand(redisClient *c) {
3387 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3388 incrDecrCommand(c,incr);
3389 }
3390
3391 static void decrbyCommand(redisClient *c) {
3392 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3393 incrDecrCommand(c,-incr);
3394 }
3395
3396 /* ========================= Type agnostic commands ========================= */
3397
3398 static void delCommand(redisClient *c) {
3399 int deleted = 0, j;
3400
3401 for (j = 1; j < c->argc; j++) {
3402 if (deleteKey(c->db,c->argv[j])) {
3403 server.dirty++;
3404 deleted++;
3405 }
3406 }
3407 switch(deleted) {
3408 case 0:
3409 addReply(c,shared.czero);
3410 break;
3411 case 1:
3412 addReply(c,shared.cone);
3413 break;
3414 default:
3415 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3416 break;
3417 }
3418 }
3419
3420 static void existsCommand(redisClient *c) {
3421 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3422 }
3423
3424 static void selectCommand(redisClient *c) {
3425 int id = atoi(c->argv[1]->ptr);
3426
3427 if (selectDb(c,id) == REDIS_ERR) {
3428 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3429 } else {
3430 addReply(c,shared.ok);
3431 }
3432 }
3433
3434 static void randomkeyCommand(redisClient *c) {
3435 dictEntry *de;
3436
3437 while(1) {
3438 de = dictGetRandomKey(c->db->dict);
3439 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3440 }
3441 if (de == NULL) {
3442 addReply(c,shared.plus);
3443 addReply(c,shared.crlf);
3444 } else {
3445 addReply(c,shared.plus);
3446 addReply(c,dictGetEntryKey(de));
3447 addReply(c,shared.crlf);
3448 }
3449 }
3450
3451 static void keysCommand(redisClient *c) {
3452 dictIterator *di;
3453 dictEntry *de;
3454 sds pattern = c->argv[1]->ptr;
3455 int plen = sdslen(pattern);
3456 unsigned long numkeys = 0, keyslen = 0;
3457 robj *lenobj = createObject(REDIS_STRING,NULL);
3458
3459 di = dictGetIterator(c->db->dict);
3460 addReply(c,lenobj);
3461 decrRefCount(lenobj);
3462 while((de = dictNext(di)) != NULL) {
3463 robj *keyobj = dictGetEntryKey(de);
3464
3465 sds key = keyobj->ptr;
3466 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3467 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3468 if (expireIfNeeded(c->db,keyobj) == 0) {
3469 if (numkeys != 0)
3470 addReply(c,shared.space);
3471 addReply(c,keyobj);
3472 numkeys++;
3473 keyslen += sdslen(key);
3474 }
3475 }
3476 }
3477 dictReleaseIterator(di);
3478 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3479 addReply(c,shared.crlf);
3480 }
3481
3482 static void dbsizeCommand(redisClient *c) {
3483 addReplySds(c,
3484 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3485 }
3486
3487 static void lastsaveCommand(redisClient *c) {
3488 addReplySds(c,
3489 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3490 }
3491
3492 static void typeCommand(redisClient *c) {
3493 robj *o;
3494 char *type;
3495
3496 o = lookupKeyRead(c->db,c->argv[1]);
3497 if (o == NULL) {
3498 type = "+none";
3499 } else {
3500 switch(o->type) {
3501 case REDIS_STRING: type = "+string"; break;
3502 case REDIS_LIST: type = "+list"; break;
3503 case REDIS_SET: type = "+set"; break;
3504 case REDIS_ZSET: type = "+zset"; break;
3505 default: type = "unknown"; break;
3506 }
3507 }
3508 addReplySds(c,sdsnew(type));
3509 addReply(c,shared.crlf);
3510 }
3511
3512 static void saveCommand(redisClient *c) {
3513 if (server.bgsavechildpid != -1) {
3514 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3515 return;
3516 }
3517 if (rdbSave(server.dbfilename) == REDIS_OK) {
3518 addReply(c,shared.ok);
3519 } else {
3520 addReply(c,shared.err);
3521 }
3522 }
3523
3524 static void bgsaveCommand(redisClient *c) {
3525 if (server.bgsavechildpid != -1) {
3526 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3527 return;
3528 }
3529 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3530 char *status = "+Background saving started\r\n";
3531 addReplySds(c,sdsnew(status));
3532 } else {
3533 addReply(c,shared.err);
3534 }
3535 }
3536
3537 static void shutdownCommand(redisClient *c) {
3538 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3539 /* Kill the saving child if there is a background saving in progress.
3540 We want to avoid race conditions, for instance our saving child may
3541 overwrite the synchronous saving did by SHUTDOWN. */
3542 if (server.bgsavechildpid != -1) {
3543 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3544 kill(server.bgsavechildpid,SIGKILL);
3545 rdbRemoveTempFile(server.bgsavechildpid);
3546 }
3547 if (server.appendonly) {
3548 /* Append only file: fsync() the AOF and exit */
3549 fsync(server.appendfd);
3550 exit(0);
3551 } else {
3552 /* Snapshotting. Perform a SYNC SAVE and exit */
3553 if (rdbSave(server.dbfilename) == REDIS_OK) {
3554 if (server.daemonize)
3555 unlink(server.pidfile);
3556 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3557 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3558 exit(0);
3559 } else {
3560 /* Ooops.. error saving! The best we can do is to continue operating.
3561 * Note that if there was a background saving process, in the next
3562 * cron() Redis will be notified that the background saving aborted,
3563 * handling special stuff like slaves pending for synchronization... */
3564 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3565 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3566 }
3567 }
3568 }
3569
3570 static void renameGenericCommand(redisClient *c, int nx) {
3571 robj *o;
3572
3573 /* To use the same key as src and dst is probably an error */
3574 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3575 addReply(c,shared.sameobjecterr);
3576 return;
3577 }
3578
3579 o = lookupKeyWrite(c->db,c->argv[1]);
3580 if (o == NULL) {
3581 addReply(c,shared.nokeyerr);
3582 return;
3583 }
3584 incrRefCount(o);
3585 deleteIfVolatile(c->db,c->argv[2]);
3586 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3587 if (nx) {
3588 decrRefCount(o);
3589 addReply(c,shared.czero);
3590 return;
3591 }
3592 dictReplace(c->db->dict,c->argv[2],o);
3593 } else {
3594 incrRefCount(c->argv[2]);
3595 }
3596 deleteKey(c->db,c->argv[1]);
3597 server.dirty++;
3598 addReply(c,nx ? shared.cone : shared.ok);
3599 }
3600
3601 static void renameCommand(redisClient *c) {
3602 renameGenericCommand(c,0);
3603 }
3604
3605 static void renamenxCommand(redisClient *c) {
3606 renameGenericCommand(c,1);
3607 }
3608
3609 static void moveCommand(redisClient *c) {
3610 robj *o;
3611 redisDb *src, *dst;
3612 int srcid;
3613
3614 /* Obtain source and target DB pointers */
3615 src = c->db;
3616 srcid = c->db->id;
3617 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3618 addReply(c,shared.outofrangeerr);
3619 return;
3620 }
3621 dst = c->db;
3622 selectDb(c,srcid); /* Back to the source DB */
3623
3624 /* If the user is moving using as target the same
3625 * DB as the source DB it is probably an error. */
3626 if (src == dst) {
3627 addReply(c,shared.sameobjecterr);
3628 return;
3629 }
3630
3631 /* Check if the element exists and get a reference */
3632 o = lookupKeyWrite(c->db,c->argv[1]);
3633 if (!o) {
3634 addReply(c,shared.czero);
3635 return;
3636 }
3637
3638 /* Try to add the element to the target DB */
3639 deleteIfVolatile(dst,c->argv[1]);
3640 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3641 addReply(c,shared.czero);
3642 return;
3643 }
3644 incrRefCount(c->argv[1]);
3645 incrRefCount(o);
3646
3647 /* OK! key moved, free the entry in the source DB */
3648 deleteKey(src,c->argv[1]);
3649 server.dirty++;
3650 addReply(c,shared.cone);
3651 }
3652
3653 /* =================================== Lists ================================ */
3654 static void pushGenericCommand(redisClient *c, int where) {
3655 robj *lobj;
3656 list *list;
3657
3658 lobj = lookupKeyWrite(c->db,c->argv[1]);
3659 if (lobj == NULL) {
3660 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3661 addReply(c,shared.ok);
3662 return;
3663 }
3664 lobj = createListObject();
3665 list = lobj->ptr;
3666 if (where == REDIS_HEAD) {
3667 listAddNodeHead(list,c->argv[2]);
3668 } else {
3669 listAddNodeTail(list,c->argv[2]);
3670 }
3671 dictAdd(c->db->dict,c->argv[1],lobj);
3672 incrRefCount(c->argv[1]);
3673 incrRefCount(c->argv[2]);
3674 } else {
3675 if (lobj->type != REDIS_LIST) {
3676 addReply(c,shared.wrongtypeerr);
3677 return;
3678 }
3679 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3680 addReply(c,shared.ok);
3681 return;
3682 }
3683 list = lobj->ptr;
3684 if (where == REDIS_HEAD) {
3685 listAddNodeHead(list,c->argv[2]);
3686 } else {
3687 listAddNodeTail(list,c->argv[2]);
3688 }
3689 incrRefCount(c->argv[2]);
3690 }
3691 server.dirty++;
3692 addReply(c,shared.ok);
3693 }
3694
3695 static void lpushCommand(redisClient *c) {
3696 pushGenericCommand(c,REDIS_HEAD);
3697 }
3698
3699 static void rpushCommand(redisClient *c) {
3700 pushGenericCommand(c,REDIS_TAIL);
3701 }
3702
3703 static void llenCommand(redisClient *c) {
3704 robj *o;
3705 list *l;
3706
3707 o = lookupKeyRead(c->db,c->argv[1]);
3708 if (o == NULL) {
3709 addReply(c,shared.czero);
3710 return;
3711 } else {
3712 if (o->type != REDIS_LIST) {
3713 addReply(c,shared.wrongtypeerr);
3714 } else {
3715 l = o->ptr;
3716 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3717 }
3718 }
3719 }
3720
3721 static void lindexCommand(redisClient *c) {
3722 robj *o;
3723 int index = atoi(c->argv[2]->ptr);
3724
3725 o = lookupKeyRead(c->db,c->argv[1]);
3726 if (o == NULL) {
3727 addReply(c,shared.nullbulk);
3728 } else {
3729 if (o->type != REDIS_LIST) {
3730 addReply(c,shared.wrongtypeerr);
3731 } else {
3732 list *list = o->ptr;
3733 listNode *ln;
3734
3735 ln = listIndex(list, index);
3736 if (ln == NULL) {
3737 addReply(c,shared.nullbulk);
3738 } else {
3739 robj *ele = listNodeValue(ln);
3740 addReplyBulkLen(c,ele);
3741 addReply(c,ele);
3742 addReply(c,shared.crlf);
3743 }
3744 }
3745 }
3746 }
3747
3748 static void lsetCommand(redisClient *c) {
3749 robj *o;
3750 int index = atoi(c->argv[2]->ptr);
3751
3752 o = lookupKeyWrite(c->db,c->argv[1]);
3753 if (o == NULL) {
3754 addReply(c,shared.nokeyerr);
3755 } else {
3756 if (o->type != REDIS_LIST) {
3757 addReply(c,shared.wrongtypeerr);
3758 } else {
3759 list *list = o->ptr;
3760 listNode *ln;
3761
3762 ln = listIndex(list, index);
3763 if (ln == NULL) {
3764 addReply(c,shared.outofrangeerr);
3765 } else {
3766 robj *ele = listNodeValue(ln);
3767
3768 decrRefCount(ele);
3769 listNodeValue(ln) = c->argv[3];
3770 incrRefCount(c->argv[3]);
3771 addReply(c,shared.ok);
3772 server.dirty++;
3773 }
3774 }
3775 }
3776 }
3777
3778 static void popGenericCommand(redisClient *c, int where) {
3779 robj *o;
3780
3781 o = lookupKeyWrite(c->db,c->argv[1]);
3782 if (o == NULL) {
3783 addReply(c,shared.nullbulk);
3784 } else {
3785 if (o->type != REDIS_LIST) {
3786 addReply(c,shared.wrongtypeerr);
3787 } else {
3788 list *list = o->ptr;
3789 listNode *ln;
3790
3791 if (where == REDIS_HEAD)
3792 ln = listFirst(list);
3793 else
3794 ln = listLast(list);
3795
3796 if (ln == NULL) {
3797 addReply(c,shared.nullbulk);
3798 } else {
3799 robj *ele = listNodeValue(ln);
3800 addReplyBulkLen(c,ele);
3801 addReply(c,ele);
3802 addReply(c,shared.crlf);
3803 listDelNode(list,ln);
3804 server.dirty++;
3805 }
3806 }
3807 }
3808 }
3809
3810 static void lpopCommand(redisClient *c) {
3811 popGenericCommand(c,REDIS_HEAD);
3812 }
3813
3814 static void rpopCommand(redisClient *c) {
3815 popGenericCommand(c,REDIS_TAIL);
3816 }
3817
3818 static void lrangeCommand(redisClient *c) {
3819 robj *o;
3820 int start = atoi(c->argv[2]->ptr);
3821 int end = atoi(c->argv[3]->ptr);
3822
3823 o = lookupKeyRead(c->db,c->argv[1]);
3824 if (o == NULL) {
3825 addReply(c,shared.nullmultibulk);
3826 } else {
3827 if (o->type != REDIS_LIST) {
3828 addReply(c,shared.wrongtypeerr);
3829 } else {
3830 list *list = o->ptr;
3831 listNode *ln;
3832 int llen = listLength(list);
3833 int rangelen, j;
3834 robj *ele;
3835
3836 /* convert negative indexes */
3837 if (start < 0) start = llen+start;
3838 if (end < 0) end = llen+end;
3839 if (start < 0) start = 0;
3840 if (end < 0) end = 0;
3841
3842 /* indexes sanity checks */
3843 if (start > end || start >= llen) {
3844 /* Out of range start or start > end result in empty list */
3845 addReply(c,shared.emptymultibulk);
3846 return;
3847 }
3848 if (end >= llen) end = llen-1;
3849 rangelen = (end-start)+1;
3850
3851 /* Return the result in form of a multi-bulk reply */
3852 ln = listIndex(list, start);
3853 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3854 for (j = 0; j < rangelen; j++) {
3855 ele = listNodeValue(ln);
3856 addReplyBulkLen(c,ele);
3857 addReply(c,ele);
3858 addReply(c,shared.crlf);
3859 ln = ln->next;
3860 }
3861 }
3862 }
3863 }
3864
3865 static void ltrimCommand(redisClient *c) {
3866 robj *o;
3867 int start = atoi(c->argv[2]->ptr);
3868 int end = atoi(c->argv[3]->ptr);
3869
3870 o = lookupKeyWrite(c->db,c->argv[1]);
3871 if (o == NULL) {
3872 addReply(c,shared.ok);
3873 } else {
3874 if (o->type != REDIS_LIST) {
3875 addReply(c,shared.wrongtypeerr);
3876 } else {
3877 list *list = o->ptr;
3878 listNode *ln;
3879 int llen = listLength(list);
3880 int j, ltrim, rtrim;
3881
3882 /* convert negative indexes */
3883 if (start < 0) start = llen+start;
3884 if (end < 0) end = llen+end;
3885 if (start < 0) start = 0;
3886 if (end < 0) end = 0;
3887
3888 /* indexes sanity checks */
3889 if (start > end || start >= llen) {
3890 /* Out of range start or start > end result in empty list */
3891 ltrim = llen;
3892 rtrim = 0;
3893 } else {
3894 if (end >= llen) end = llen-1;
3895 ltrim = start;
3896 rtrim = llen-end-1;
3897 }
3898
3899 /* Remove list elements to perform the trim */
3900 for (j = 0; j < ltrim; j++) {
3901 ln = listFirst(list);
3902 listDelNode(list,ln);
3903 }
3904 for (j = 0; j < rtrim; j++) {
3905 ln = listLast(list);
3906 listDelNode(list,ln);
3907 }
3908 server.dirty++;
3909 addReply(c,shared.ok);
3910 }
3911 }
3912 }
3913
3914 static void lremCommand(redisClient *c) {
3915 robj *o;
3916
3917 o = lookupKeyWrite(c->db,c->argv[1]);
3918 if (o == NULL) {
3919 addReply(c,shared.czero);
3920 } else {
3921 if (o->type != REDIS_LIST) {
3922 addReply(c,shared.wrongtypeerr);
3923 } else {
3924 list *list = o->ptr;
3925 listNode *ln, *next;
3926 int toremove = atoi(c->argv[2]->ptr);
3927 int removed = 0;
3928 int fromtail = 0;
3929
3930 if (toremove < 0) {
3931 toremove = -toremove;
3932 fromtail = 1;
3933 }
3934 ln = fromtail ? list->tail : list->head;
3935 while (ln) {
3936 robj *ele = listNodeValue(ln);
3937
3938 next = fromtail ? ln->prev : ln->next;
3939 if (compareStringObjects(ele,c->argv[3]) == 0) {
3940 listDelNode(list,ln);
3941 server.dirty++;
3942 removed++;
3943 if (toremove && removed == toremove) break;
3944 }
3945 ln = next;
3946 }
3947 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3948 }
3949 }
3950 }
3951
3952 /* This is the semantic of this command:
3953 * RPOPLPUSH srclist dstlist:
3954 * IF LLEN(srclist) > 0
3955 * element = RPOP srclist
3956 * LPUSH dstlist element
3957 * RETURN element
3958 * ELSE
3959 * RETURN nil
3960 * END
3961 * END
3962 *
3963 * The idea is to be able to get an element from a list in a reliable way
3964 * since the element is not just returned but pushed against another list
3965 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3966 */
3967 static void rpoplpushcommand(redisClient *c) {
3968 robj *sobj;
3969
3970 sobj = lookupKeyWrite(c->db,c->argv[1]);
3971 if (sobj == NULL) {
3972 addReply(c,shared.nullbulk);
3973 } else {
3974 if (sobj->type != REDIS_LIST) {
3975 addReply(c,shared.wrongtypeerr);
3976 } else {
3977 list *srclist = sobj->ptr;
3978 listNode *ln = listLast(srclist);
3979
3980 if (ln == NULL) {
3981 addReply(c,shared.nullbulk);
3982 } else {
3983 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3984 robj *ele = listNodeValue(ln);
3985 list *dstlist;
3986
3987 if (dobj && dobj->type != REDIS_LIST) {
3988 addReply(c,shared.wrongtypeerr);
3989 return;
3990 }
3991
3992 /* Add the element to the target list (unless it's directly
3993 * passed to some BLPOP-ing client */
3994 if (!handleClientsWaitingListPush(c,c->argv[2],ele)) {
3995 if (dobj == NULL) {
3996 /* Create the list if the key does not exist */
3997 dobj = createListObject();
3998 dictAdd(c->db->dict,c->argv[2],dobj);
3999 incrRefCount(c->argv[2]);
4000 }
4001 dstlist = dobj->ptr;
4002 listAddNodeHead(dstlist,ele);
4003 incrRefCount(ele);
4004 }
4005
4006 /* Send the element to the client as reply as well */
4007 addReplyBulkLen(c,ele);
4008 addReply(c,ele);
4009 addReply(c,shared.crlf);
4010
4011 /* Finally remove the element from the source list */
4012 listDelNode(srclist,ln);
4013 server.dirty++;
4014 }
4015 }
4016 }
4017 }
4018
4019
4020 /* ==================================== Sets ================================ */
4021
4022 static void saddCommand(redisClient *c) {
4023 robj *set;
4024
4025 set = lookupKeyWrite(c->db,c->argv[1]);
4026 if (set == NULL) {
4027 set = createSetObject();
4028 dictAdd(c->db->dict,c->argv[1],set);
4029 incrRefCount(c->argv[1]);
4030 } else {
4031 if (set->type != REDIS_SET) {
4032 addReply(c,shared.wrongtypeerr);
4033 return;
4034 }
4035 }
4036 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
4037 incrRefCount(c->argv[2]);
4038 server.dirty++;
4039 addReply(c,shared.cone);
4040 } else {
4041 addReply(c,shared.czero);
4042 }
4043 }
4044
4045 static void sremCommand(redisClient *c) {
4046 robj *set;
4047
4048 set = lookupKeyWrite(c->db,c->argv[1]);
4049 if (set == NULL) {
4050 addReply(c,shared.czero);
4051 } else {
4052 if (set->type != REDIS_SET) {
4053 addReply(c,shared.wrongtypeerr);
4054 return;
4055 }
4056 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
4057 server.dirty++;
4058 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
4059 addReply(c,shared.cone);
4060 } else {
4061 addReply(c,shared.czero);
4062 }
4063 }
4064 }
4065
4066 static void smoveCommand(redisClient *c) {
4067 robj *srcset, *dstset;
4068
4069 srcset = lookupKeyWrite(c->db,c->argv[1]);
4070 dstset = lookupKeyWrite(c->db,c->argv[2]);
4071
4072 /* If the source key does not exist return 0, if it's of the wrong type
4073 * raise an error */
4074 if (srcset == NULL || srcset->type != REDIS_SET) {
4075 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
4076 return;
4077 }
4078 /* Error if the destination key is not a set as well */
4079 if (dstset && dstset->type != REDIS_SET) {
4080 addReply(c,shared.wrongtypeerr);
4081 return;
4082 }
4083 /* Remove the element from the source set */
4084 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
4085 /* Key not found in the src set! return zero */
4086 addReply(c,shared.czero);
4087 return;
4088 }
4089 server.dirty++;
4090 /* Add the element to the destination set */
4091 if (!dstset) {
4092 dstset = createSetObject();
4093 dictAdd(c->db->dict,c->argv[2],dstset);
4094 incrRefCount(c->argv[2]);
4095 }
4096 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
4097 incrRefCount(c->argv[3]);
4098 addReply(c,shared.cone);
4099 }
4100
4101 static void sismemberCommand(redisClient *c) {
4102 robj *set;
4103
4104 set = lookupKeyRead(c->db,c->argv[1]);
4105 if (set == NULL) {
4106 addReply(c,shared.czero);
4107 } else {
4108 if (set->type != REDIS_SET) {
4109 addReply(c,shared.wrongtypeerr);
4110 return;
4111 }
4112 if (dictFind(set->ptr,c->argv[2]))
4113 addReply(c,shared.cone);
4114 else
4115 addReply(c,shared.czero);
4116 }
4117 }
4118
4119 static void scardCommand(redisClient *c) {
4120 robj *o;
4121 dict *s;
4122
4123 o = lookupKeyRead(c->db,c->argv[1]);
4124 if (o == NULL) {
4125 addReply(c,shared.czero);
4126 return;
4127 } else {
4128 if (o->type != REDIS_SET) {
4129 addReply(c,shared.wrongtypeerr);
4130 } else {
4131 s = o->ptr;
4132 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4133 dictSize(s)));
4134 }
4135 }
4136 }
4137
4138 static void spopCommand(redisClient *c) {
4139 robj *set;
4140 dictEntry *de;
4141
4142 set = lookupKeyWrite(c->db,c->argv[1]);
4143 if (set == NULL) {
4144 addReply(c,shared.nullbulk);
4145 } else {
4146 if (set->type != REDIS_SET) {
4147 addReply(c,shared.wrongtypeerr);
4148 return;
4149 }
4150 de = dictGetRandomKey(set->ptr);
4151 if (de == NULL) {
4152 addReply(c,shared.nullbulk);
4153 } else {
4154 robj *ele = dictGetEntryKey(de);
4155
4156 addReplyBulkLen(c,ele);
4157 addReply(c,ele);
4158 addReply(c,shared.crlf);
4159 dictDelete(set->ptr,ele);
4160 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
4161 server.dirty++;
4162 }
4163 }
4164 }
4165
4166 static void srandmemberCommand(redisClient *c) {
4167 robj *set;
4168 dictEntry *de;
4169
4170 set = lookupKeyRead(c->db,c->argv[1]);
4171 if (set == NULL) {
4172 addReply(c,shared.nullbulk);
4173 } else {
4174 if (set->type != REDIS_SET) {
4175 addReply(c,shared.wrongtypeerr);
4176 return;
4177 }
4178 de = dictGetRandomKey(set->ptr);
4179 if (de == NULL) {
4180 addReply(c,shared.nullbulk);
4181 } else {
4182 robj *ele = dictGetEntryKey(de);
4183
4184 addReplyBulkLen(c,ele);
4185 addReply(c,ele);
4186 addReply(c,shared.crlf);
4187 }
4188 }
4189 }
4190
4191 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
4192 dict **d1 = (void*) s1, **d2 = (void*) s2;
4193
4194 return dictSize(*d1)-dictSize(*d2);
4195 }
4196
4197 static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
4198 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4199 dictIterator *di;
4200 dictEntry *de;
4201 robj *lenobj = NULL, *dstset = NULL;
4202 unsigned long j, cardinality = 0;
4203
4204 for (j = 0; j < setsnum; j++) {
4205 robj *setobj;
4206
4207 setobj = dstkey ?
4208 lookupKeyWrite(c->db,setskeys[j]) :
4209 lookupKeyRead(c->db,setskeys[j]);
4210 if (!setobj) {
4211 zfree(dv);
4212 if (dstkey) {
4213 if (deleteKey(c->db,dstkey))
4214 server.dirty++;
4215 addReply(c,shared.czero);
4216 } else {
4217 addReply(c,shared.nullmultibulk);
4218 }
4219 return;
4220 }
4221 if (setobj->type != REDIS_SET) {
4222 zfree(dv);
4223 addReply(c,shared.wrongtypeerr);
4224 return;
4225 }
4226 dv[j] = setobj->ptr;
4227 }
4228 /* Sort sets from the smallest to largest, this will improve our
4229 * algorithm's performace */
4230 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
4231
4232 /* The first thing we should output is the total number of elements...
4233 * since this is a multi-bulk write, but at this stage we don't know
4234 * the intersection set size, so we use a trick, append an empty object
4235 * to the output list and save the pointer to later modify it with the
4236 * right length */
4237 if (!dstkey) {
4238 lenobj = createObject(REDIS_STRING,NULL);
4239 addReply(c,lenobj);
4240 decrRefCount(lenobj);
4241 } else {
4242 /* If we have a target key where to store the resulting set
4243 * create this key with an empty set inside */
4244 dstset = createSetObject();
4245 }
4246
4247 /* Iterate all the elements of the first (smallest) set, and test
4248 * the element against all the other sets, if at least one set does
4249 * not include the element it is discarded */
4250 di = dictGetIterator(dv[0]);
4251
4252 while((de = dictNext(di)) != NULL) {
4253 robj *ele;
4254
4255 for (j = 1; j < setsnum; j++)
4256 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
4257 if (j != setsnum)
4258 continue; /* at least one set does not contain the member */
4259 ele = dictGetEntryKey(de);
4260 if (!dstkey) {
4261 addReplyBulkLen(c,ele);
4262 addReply(c,ele);
4263 addReply(c,shared.crlf);
4264 cardinality++;
4265 } else {
4266 dictAdd(dstset->ptr,ele,NULL);
4267 incrRefCount(ele);
4268 }
4269 }
4270 dictReleaseIterator(di);
4271
4272 if (dstkey) {
4273 /* Store the resulting set into the target */
4274 deleteKey(c->db,dstkey);
4275 dictAdd(c->db->dict,dstkey,dstset);
4276 incrRefCount(dstkey);
4277 }
4278
4279 if (!dstkey) {
4280 lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
4281 } else {
4282 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4283 dictSize((dict*)dstset->ptr)));
4284 server.dirty++;
4285 }
4286 zfree(dv);
4287 }
4288
4289 static void sinterCommand(redisClient *c) {
4290 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4291 }
4292
4293 static void sinterstoreCommand(redisClient *c) {
4294 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4295 }
4296
4297 #define REDIS_OP_UNION 0
4298 #define REDIS_OP_DIFF 1
4299
4300 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
4301 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4302 dictIterator *di;
4303 dictEntry *de;
4304 robj *dstset = NULL;
4305 int j, cardinality = 0;
4306
4307 for (j = 0; j < setsnum; j++) {
4308 robj *setobj;
4309
4310 setobj = dstkey ?
4311 lookupKeyWrite(c->db,setskeys[j]) :
4312 lookupKeyRead(c->db,setskeys[j]);
4313 if (!setobj) {
4314 dv[j] = NULL;
4315 continue;
4316 }
4317 if (setobj->type != REDIS_SET) {
4318 zfree(dv);
4319 addReply(c,shared.wrongtypeerr);
4320 return;
4321 }
4322 dv[j] = setobj->ptr;
4323 }
4324
4325 /* We need a temp set object to store our union. If the dstkey
4326 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4327 * this set object will be the resulting object to set into the target key*/
4328 dstset = createSetObject();
4329
4330 /* Iterate all the elements of all the sets, add every element a single
4331 * time to the result set */
4332 for (j = 0; j < setsnum; j++) {
4333 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
4334 if (!dv[j]) continue; /* non existing keys are like empty sets */
4335
4336 di = dictGetIterator(dv[j]);
4337
4338 while((de = dictNext(di)) != NULL) {
4339 robj *ele;
4340
4341 /* dictAdd will not add the same element multiple times */
4342 ele = dictGetEntryKey(de);
4343 if (op == REDIS_OP_UNION || j == 0) {
4344 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4345 incrRefCount(ele);
4346 cardinality++;
4347 }
4348 } else if (op == REDIS_OP_DIFF) {
4349 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4350 cardinality--;
4351 }
4352 }
4353 }
4354 dictReleaseIterator(di);
4355
4356 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
4357 }
4358
4359 /* Output the content of the resulting set, if not in STORE mode */
4360 if (!dstkey) {
4361 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4362 di = dictGetIterator(dstset->ptr);
4363 while((de = dictNext(di)) != NULL) {
4364 robj *ele;
4365
4366 ele = dictGetEntryKey(de);
4367 addReplyBulkLen(c,ele);
4368 addReply(c,ele);
4369 addReply(c,shared.crlf);
4370 }
4371 dictReleaseIterator(di);
4372 } else {
4373 /* If we have a target key where to store the resulting set
4374 * create this key with the result set inside */
4375 deleteKey(c->db,dstkey);
4376 dictAdd(c->db->dict,dstkey,dstset);
4377 incrRefCount(dstkey);
4378 }
4379
4380 /* Cleanup */
4381 if (!dstkey) {
4382 decrRefCount(dstset);
4383 } else {
4384 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4385 dictSize((dict*)dstset->ptr)));
4386 server.dirty++;
4387 }
4388 zfree(dv);
4389 }
4390
4391 static void sunionCommand(redisClient *c) {
4392 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4393 }
4394
4395 static void sunionstoreCommand(redisClient *c) {
4396 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4397 }
4398
4399 static void sdiffCommand(redisClient *c) {
4400 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4401 }
4402
4403 static void sdiffstoreCommand(redisClient *c) {
4404 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4405 }
4406
4407 /* ==================================== ZSets =============================== */
4408
4409 /* ZSETs are ordered sets using two data structures to hold the same elements
4410 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4411 * data structure.
4412 *
4413 * The elements are added to an hash table mapping Redis objects to scores.
4414 * At the same time the elements are added to a skip list mapping scores
4415 * to Redis objects (so objects are sorted by scores in this "view"). */
4416
4417 /* This skiplist implementation is almost a C translation of the original
4418 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4419 * Alternative to Balanced Trees", modified in three ways:
4420 * a) this implementation allows for repeated values.
4421 * b) the comparison is not just by key (our 'score') but by satellite data.
4422 * c) there is a back pointer, so it's a doubly linked list with the back
4423 * pointers being only at "level 1". This allows to traverse the list
4424 * from tail to head, useful for ZREVRANGE. */
4425
4426 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4427 zskiplistNode *zn = zmalloc(sizeof(*zn));
4428
4429 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4430 zn->score = score;
4431 zn->obj = obj;
4432 return zn;
4433 }
4434
4435 static zskiplist *zslCreate(void) {
4436 int j;
4437 zskiplist *zsl;
4438
4439 zsl = zmalloc(sizeof(*zsl));
4440 zsl->level = 1;
4441 zsl->length = 0;
4442 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4443 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4444 zsl->header->forward[j] = NULL;
4445 zsl->header->backward = NULL;
4446 zsl->tail = NULL;
4447 return zsl;
4448 }
4449
4450 static void zslFreeNode(zskiplistNode *node) {
4451 decrRefCount(node->obj);
4452 zfree(node->forward);
4453 zfree(node);
4454 }
4455
4456 static void zslFree(zskiplist *zsl) {
4457 zskiplistNode *node = zsl->header->forward[0], *next;
4458
4459 zfree(zsl->header->forward);
4460 zfree(zsl->header);
4461 while(node) {
4462 next = node->forward[0];
4463 zslFreeNode(node);
4464 node = next;
4465 }
4466 zfree(zsl);
4467 }
4468
4469 static int zslRandomLevel(void) {
4470 int level = 1;
4471 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4472 level += 1;
4473 return level;
4474 }
4475
4476 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4477 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4478 int i, level;
4479
4480 x = zsl->header;
4481 for (i = zsl->level-1; i >= 0; i--) {
4482 while (x->forward[i] &&
4483 (x->forward[i]->score < score ||
4484 (x->forward[i]->score == score &&
4485 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4486 x = x->forward[i];
4487 update[i] = x;
4488 }
4489 /* we assume the key is not already inside, since we allow duplicated
4490 * scores, and the re-insertion of score and redis object should never
4491 * happpen since the caller of zslInsert() should test in the hash table
4492 * if the element is already inside or not. */
4493 level = zslRandomLevel();
4494 if (level > zsl->level) {
4495 for (i = zsl->level; i < level; i++)
4496 update[i] = zsl->header;
4497 zsl->level = level;
4498 }
4499 x = zslCreateNode(level,score,obj);
4500 for (i = 0; i < level; i++) {
4501 x->forward[i] = update[i]->forward[i];
4502 update[i]->forward[i] = x;
4503 }
4504 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4505 if (x->forward[0])
4506 x->forward[0]->backward = x;
4507 else
4508 zsl->tail = x;
4509 zsl->length++;
4510 }
4511
4512 /* Delete an element with matching score/object from the skiplist. */
4513 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4514 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4515 int i;
4516
4517 x = zsl->header;
4518 for (i = zsl->level-1; i >= 0; i--) {
4519 while (x->forward[i] &&
4520 (x->forward[i]->score < score ||
4521 (x->forward[i]->score == score &&
4522 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4523 x = x->forward[i];
4524 update[i] = x;
4525 }
4526 /* We may have multiple elements with the same score, what we need
4527 * is to find the element with both the right score and object. */
4528 x = x->forward[0];
4529 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4530 for (i = 0; i < zsl->level; i++) {
4531 if (update[i]->forward[i] != x) break;
4532 update[i]->forward[i] = x->forward[i];
4533 }
4534 if (x->forward[0]) {
4535 x->forward[0]->backward = (x->backward == zsl->header) ?
4536 NULL : x->backward;
4537 } else {
4538 zsl->tail = x->backward;
4539 }
4540 zslFreeNode(x);
4541 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4542 zsl->level--;
4543 zsl->length--;
4544 return 1;
4545 } else {
4546 return 0; /* not found */
4547 }
4548 return 0; /* not found */
4549 }
4550
4551 /* Delete all the elements with score between min and max from the skiplist.
4552 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4553 * Note that this function takes the reference to the hash table view of the
4554 * sorted set, in order to remove the elements from the hash table too. */
4555 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4556 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4557 unsigned long removed = 0;
4558 int i;
4559
4560 x = zsl->header;
4561 for (i = zsl->level-1; i >= 0; i--) {
4562 while (x->forward[i] && x->forward[i]->score < min)
4563 x = x->forward[i];
4564 update[i] = x;
4565 }
4566 /* We may have multiple elements with the same score, what we need
4567 * is to find the element with both the right score and object. */
4568 x = x->forward[0];
4569 while (x && x->score <= max) {
4570 zskiplistNode *next;
4571
4572 for (i = 0; i < zsl->level; i++) {
4573 if (update[i]->forward[i] != x) break;
4574 update[i]->forward[i] = x->forward[i];
4575 }
4576 if (x->forward[0]) {
4577 x->forward[0]->backward = (x->backward == zsl->header) ?
4578 NULL : x->backward;
4579 } else {
4580 zsl->tail = x->backward;
4581 }
4582 next = x->forward[0];
4583 dictDelete(dict,x->obj);
4584 zslFreeNode(x);
4585 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4586 zsl->level--;
4587 zsl->length--;
4588 removed++;
4589 x = next;
4590 }
4591 return removed; /* not found */
4592 }
4593
4594 /* Find the first node having a score equal or greater than the specified one.
4595 * Returns NULL if there is no match. */
4596 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4597 zskiplistNode *x;
4598 int i;
4599
4600 x = zsl->header;
4601 for (i = zsl->level-1; i >= 0; i--) {
4602 while (x->forward[i] && x->forward[i]->score < score)
4603 x = x->forward[i];
4604 }
4605 /* We may have multiple elements with the same score, what we need
4606 * is to find the element with both the right score and object. */
4607 return x->forward[0];
4608 }
4609
4610 /* The actual Z-commands implementations */
4611
4612 /* This generic command implements both ZADD and ZINCRBY.
4613 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4614 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4615 static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
4616 robj *zsetobj;
4617 zset *zs;
4618 double *score;
4619
4620 zsetobj = lookupKeyWrite(c->db,key);
4621 if (zsetobj == NULL) {
4622 zsetobj = createZsetObject();
4623 dictAdd(c->db->dict,key,zsetobj);
4624 incrRefCount(key);
4625 } else {
4626 if (zsetobj->type != REDIS_ZSET) {
4627 addReply(c,shared.wrongtypeerr);
4628 return;
4629 }
4630 }
4631 zs = zsetobj->ptr;
4632
4633 /* Ok now since we implement both ZADD and ZINCRBY here the code
4634 * needs to handle the two different conditions. It's all about setting
4635 * '*score', that is, the new score to set, to the right value. */
4636 score = zmalloc(sizeof(double));
4637 if (doincrement) {
4638 dictEntry *de;
4639
4640 /* Read the old score. If the element was not present starts from 0 */
4641 de = dictFind(zs->dict,ele);
4642 if (de) {
4643 double *oldscore = dictGetEntryVal(de);
4644 *score = *oldscore + scoreval;
4645 } else {
4646 *score = scoreval;
4647 }
4648 } else {
4649 *score = scoreval;
4650 }
4651
4652 /* What follows is a simple remove and re-insert operation that is common
4653 * to both ZADD and ZINCRBY... */
4654 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
4655 /* case 1: New element */
4656 incrRefCount(ele); /* added to hash */
4657 zslInsert(zs->zsl,*score,ele);
4658 incrRefCount(ele); /* added to skiplist */
4659 server.dirty++;
4660 if (doincrement)
4661 addReplyDouble(c,*score);
4662 else
4663 addReply(c,shared.cone);
4664 } else {
4665 dictEntry *de;
4666 double *oldscore;
4667
4668 /* case 2: Score update operation */
4669 de = dictFind(zs->dict,ele);
4670 redisAssert(de != NULL);
4671 oldscore = dictGetEntryVal(de);
4672 if (*score != *oldscore) {
4673 int deleted;
4674
4675 /* Remove and insert the element in the skip list with new score */
4676 deleted = zslDelete(zs->zsl,*oldscore,ele);
4677 redisAssert(deleted != 0);
4678 zslInsert(zs->zsl,*score,ele);
4679 incrRefCount(ele);
4680 /* Update the score in the hash table */
4681 dictReplace(zs->dict,ele,score);
4682 server.dirty++;
4683 } else {
4684 zfree(score);
4685 }
4686 if (doincrement)
4687 addReplyDouble(c,*score);
4688 else
4689 addReply(c,shared.czero);
4690 }
4691 }
4692
4693 static void zaddCommand(redisClient *c) {
4694 double scoreval;
4695
4696 scoreval = strtod(c->argv[2]->ptr,NULL);
4697 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4698 }
4699
4700 static void zincrbyCommand(redisClient *c) {
4701 double scoreval;
4702
4703 scoreval = strtod(c->argv[2]->ptr,NULL);
4704 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4705 }
4706
4707 static void zremCommand(redisClient *c) {
4708 robj *zsetobj;
4709 zset *zs;
4710
4711 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4712 if (zsetobj == NULL) {
4713 addReply(c,shared.czero);
4714 } else {
4715 dictEntry *de;
4716 double *oldscore;
4717 int deleted;
4718
4719 if (zsetobj->type != REDIS_ZSET) {
4720 addReply(c,shared.wrongtypeerr);
4721 return;
4722 }
4723 zs = zsetobj->ptr;
4724 de = dictFind(zs->dict,c->argv[2]);
4725 if (de == NULL) {
4726 addReply(c,shared.czero);
4727 return;
4728 }
4729 /* Delete from the skiplist */
4730 oldscore = dictGetEntryVal(de);
4731 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4732 redisAssert(deleted != 0);
4733
4734 /* Delete from the hash table */
4735 dictDelete(zs->dict,c->argv[2]);
4736 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4737 server.dirty++;
4738 addReply(c,shared.cone);
4739 }
4740 }
4741
4742 static void zremrangebyscoreCommand(redisClient *c) {
4743 double min = strtod(c->argv[2]->ptr,NULL);
4744 double max = strtod(c->argv[3]->ptr,NULL);
4745 robj *zsetobj;
4746 zset *zs;
4747
4748 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4749 if (zsetobj == NULL) {
4750 addReply(c,shared.czero);
4751 } else {
4752 long deleted;
4753
4754 if (zsetobj->type != REDIS_ZSET) {
4755 addReply(c,shared.wrongtypeerr);
4756 return;
4757 }
4758 zs = zsetobj->ptr;
4759 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4760 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4761 server.dirty += deleted;
4762 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4763 }
4764 }
4765
4766 static void zrangeGenericCommand(redisClient *c, int reverse) {
4767 robj *o;
4768 int start = atoi(c->argv[2]->ptr);
4769 int end = atoi(c->argv[3]->ptr);
4770 int withscores = 0;
4771
4772 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
4773 withscores = 1;
4774 } else if (c->argc >= 5) {
4775 addReply(c,shared.syntaxerr);
4776 return;
4777 }
4778
4779 o = lookupKeyRead(c->db,c->argv[1]);
4780 if (o == NULL) {
4781 addReply(c,shared.nullmultibulk);
4782 } else {
4783 if (o->type != REDIS_ZSET) {
4784 addReply(c,shared.wrongtypeerr);
4785 } else {
4786 zset *zsetobj = o->ptr;
4787 zskiplist *zsl = zsetobj->zsl;
4788 zskiplistNode *ln;
4789
4790 int llen = zsl->length;
4791 int rangelen, j;
4792 robj *ele;
4793
4794 /* convert negative indexes */
4795 if (start < 0) start = llen+start;
4796 if (end < 0) end = llen+end;
4797 if (start < 0) start = 0;
4798 if (end < 0) end = 0;
4799
4800 /* indexes sanity checks */
4801 if (start > end || start >= llen) {
4802 /* Out of range start or start > end result in empty list */
4803 addReply(c,shared.emptymultibulk);
4804 return;
4805 }
4806 if (end >= llen) end = llen-1;
4807 rangelen = (end-start)+1;
4808
4809 /* Return the result in form of a multi-bulk reply */
4810 if (reverse) {
4811 ln = zsl->tail;
4812 while (start--)
4813 ln = ln->backward;
4814 } else {
4815 ln = zsl->header->forward[0];
4816 while (start--)
4817 ln = ln->forward[0];
4818 }
4819
4820 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",
4821 withscores ? (rangelen*2) : rangelen));
4822 for (j = 0; j < rangelen; j++) {
4823 ele = ln->obj;
4824 addReplyBulkLen(c,ele);
4825 addReply(c,ele);
4826 addReply(c,shared.crlf);
4827 if (withscores)
4828 addReplyDouble(c,ln->score);
4829 ln = reverse ? ln->backward : ln->forward[0];
4830 }
4831 }
4832 }
4833 }
4834
4835 static void zrangeCommand(redisClient *c) {
4836 zrangeGenericCommand(c,0);
4837 }
4838
4839 static void zrevrangeCommand(redisClient *c) {
4840 zrangeGenericCommand(c,1);
4841 }
4842
4843 static void zrangebyscoreCommand(redisClient *c) {
4844 robj *o;
4845 double min = strtod(c->argv[2]->ptr,NULL);
4846 double max = strtod(c->argv[3]->ptr,NULL);
4847 int offset = 0, limit = -1;
4848
4849 if (c->argc != 4 && c->argc != 7) {
4850 addReplySds(c,
4851 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4852 return;
4853 } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
4854 addReply(c,shared.syntaxerr);
4855 return;
4856 } else if (c->argc == 7) {
4857 offset = atoi(c->argv[5]->ptr);
4858 limit = atoi(c->argv[6]->ptr);
4859 if (offset < 0) offset = 0;
4860 }
4861
4862 o = lookupKeyRead(c->db,c->argv[1]);
4863 if (o == NULL) {
4864 addReply(c,shared.nullmultibulk);
4865 } else {
4866 if (o->type != REDIS_ZSET) {
4867 addReply(c,shared.wrongtypeerr);
4868 } else {
4869 zset *zsetobj = o->ptr;
4870 zskiplist *zsl = zsetobj->zsl;
4871 zskiplistNode *ln;
4872 robj *ele, *lenobj;
4873 unsigned int rangelen = 0;
4874
4875 /* Get the first node with the score >= min */
4876 ln = zslFirstWithScore(zsl,min);
4877 if (ln == NULL) {
4878 /* No element matching the speciifed interval */
4879 addReply(c,shared.emptymultibulk);
4880 return;
4881 }
4882
4883 /* We don't know in advance how many matching elements there
4884 * are in the list, so we push this object that will represent
4885 * the multi-bulk length in the output buffer, and will "fix"
4886 * it later */
4887 lenobj = createObject(REDIS_STRING,NULL);
4888 addReply(c,lenobj);
4889 decrRefCount(lenobj);
4890
4891 while(ln && ln->score <= max) {
4892 if (offset) {
4893 offset--;
4894 ln = ln->forward[0];
4895 continue;
4896 }
4897 if (limit == 0) break;
4898 ele = ln->obj;
4899 addReplyBulkLen(c,ele);
4900 addReply(c,ele);
4901 addReply(c,shared.crlf);
4902 ln = ln->forward[0];
4903 rangelen++;
4904 if (limit > 0) limit--;
4905 }
4906 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4907 }
4908 }
4909 }
4910
4911 static void zcardCommand(redisClient *c) {
4912 robj *o;
4913 zset *zs;
4914
4915 o = lookupKeyRead(c->db,c->argv[1]);
4916 if (o == NULL) {
4917 addReply(c,shared.czero);
4918 return;
4919 } else {
4920 if (o->type != REDIS_ZSET) {
4921 addReply(c,shared.wrongtypeerr);
4922 } else {
4923 zs = o->ptr;
4924 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length));
4925 }
4926 }
4927 }
4928
4929 static void zscoreCommand(redisClient *c) {
4930 robj *o;
4931 zset *zs;
4932
4933 o = lookupKeyRead(c->db,c->argv[1]);
4934 if (o == NULL) {
4935 addReply(c,shared.nullbulk);
4936 return;
4937 } else {
4938 if (o->type != REDIS_ZSET) {
4939 addReply(c,shared.wrongtypeerr);
4940 } else {
4941 dictEntry *de;
4942
4943 zs = o->ptr;
4944 de = dictFind(zs->dict,c->argv[2]);
4945 if (!de) {
4946 addReply(c,shared.nullbulk);
4947 } else {
4948 double *score = dictGetEntryVal(de);
4949
4950 addReplyDouble(c,*score);
4951 }
4952 }
4953 }
4954 }
4955
4956 /* ========================= Non type-specific commands ==================== */
4957
4958 static void flushdbCommand(redisClient *c) {
4959 server.dirty += dictSize(c->db->dict);
4960 dictEmpty(c->db->dict);
4961 dictEmpty(c->db->expires);
4962 addReply(c,shared.ok);
4963 }
4964
4965 static void flushallCommand(redisClient *c) {
4966 server.dirty += emptyDb();
4967 addReply(c,shared.ok);
4968 rdbSave(server.dbfilename);
4969 server.dirty++;
4970 }
4971
4972 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4973 redisSortOperation *so = zmalloc(sizeof(*so));
4974 so->type = type;
4975 so->pattern = pattern;
4976 return so;
4977 }
4978
4979 /* Return the value associated to the key with a name obtained
4980 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4981 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4982 char *p;
4983 sds spat, ssub;
4984 robj keyobj;
4985 int prefixlen, sublen, postfixlen;
4986 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4987 struct {
4988 long len;
4989 long free;
4990 char buf[REDIS_SORTKEY_MAX+1];
4991 } keyname;
4992
4993 /* If the pattern is "#" return the substitution object itself in order
4994 * to implement the "SORT ... GET #" feature. */
4995 spat = pattern->ptr;
4996 if (spat[0] == '#' && spat[1] == '\0') {
4997 return subst;
4998 }
4999
5000 /* The substitution object may be specially encoded. If so we create
5001 * a decoded object on the fly. Otherwise getDecodedObject will just
5002 * increment the ref count, that we'll decrement later. */
5003 subst = getDecodedObject(subst);
5004
5005 ssub = subst->ptr;
5006 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
5007 p = strchr(spat,'*');
5008 if (!p) {
5009 decrRefCount(subst);
5010 return NULL;
5011 }
5012
5013 prefixlen = p-spat;
5014 sublen = sdslen(ssub);
5015 postfixlen = sdslen(spat)-(prefixlen+1);
5016 memcpy(keyname.buf,spat,prefixlen);
5017 memcpy(keyname.buf+prefixlen,ssub,sublen);
5018 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
5019 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
5020 keyname.len = prefixlen+sublen+postfixlen;
5021
5022 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2))
5023 decrRefCount(subst);
5024
5025 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
5026 return lookupKeyRead(db,&keyobj);
5027 }
5028
5029 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
5030 * the additional parameter is not standard but a BSD-specific we have to
5031 * pass sorting parameters via the global 'server' structure */
5032 static int sortCompare(const void *s1, const void *s2) {
5033 const redisSortObject *so1 = s1, *so2 = s2;
5034 int cmp;
5035
5036 if (!server.sort_alpha) {
5037 /* Numeric sorting. Here it's trivial as we precomputed scores */
5038 if (so1->u.score > so2->u.score) {
5039 cmp = 1;
5040 } else if (so1->u.score < so2->u.score) {
5041 cmp = -1;
5042 } else {
5043 cmp = 0;
5044 }
5045 } else {
5046 /* Alphanumeric sorting */
5047 if (server.sort_bypattern) {
5048 if (!so1->u.cmpobj || !so2->u.cmpobj) {
5049 /* At least one compare object is NULL */
5050 if (so1->u.cmpobj == so2->u.cmpobj)
5051 cmp = 0;
5052 else if (so1->u.cmpobj == NULL)
5053 cmp = -1;
5054 else
5055 cmp = 1;
5056 } else {
5057 /* We have both the objects, use strcoll */
5058 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
5059 }
5060 } else {
5061 /* Compare elements directly */
5062 robj *dec1, *dec2;
5063
5064 dec1 = getDecodedObject(so1->obj);
5065 dec2 = getDecodedObject(so2->obj);
5066 cmp = strcoll(dec1->ptr,dec2->ptr);
5067 decrRefCount(dec1);
5068 decrRefCount(dec2);
5069 }
5070 }
5071 return server.sort_desc ? -cmp : cmp;
5072 }
5073
5074 /* The SORT command is the most complex command in Redis. Warning: this code
5075 * is optimized for speed and a bit less for readability */
5076 static void sortCommand(redisClient *c) {
5077 list *operations;
5078 int outputlen = 0;
5079 int desc = 0, alpha = 0;
5080 int limit_start = 0, limit_count = -1, start, end;
5081 int j, dontsort = 0, vectorlen;
5082 int getop = 0; /* GET operation counter */
5083 robj *sortval, *sortby = NULL, *storekey = NULL;
5084 redisSortObject *vector; /* Resulting vector to sort */
5085
5086 /* Lookup the key to sort. It must be of the right types */
5087 sortval = lookupKeyRead(c->db,c->argv[1]);
5088 if (sortval == NULL) {
5089 addReply(c,shared.nullmultibulk);
5090 return;
5091 }
5092 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
5093 sortval->type != REDIS_ZSET)
5094 {
5095 addReply(c,shared.wrongtypeerr);
5096 return;
5097 }
5098
5099 /* Create a list of operations to perform for every sorted element.
5100 * Operations can be GET/DEL/INCR/DECR */
5101 operations = listCreate();
5102 listSetFreeMethod(operations,zfree);
5103 j = 2;
5104
5105 /* Now we need to protect sortval incrementing its count, in the future
5106 * SORT may have options able to overwrite/delete keys during the sorting
5107 * and the sorted key itself may get destroied */
5108 incrRefCount(sortval);
5109
5110 /* The SORT command has an SQL-alike syntax, parse it */
5111 while(j < c->argc) {
5112 int leftargs = c->argc-j-1;
5113 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
5114 desc = 0;
5115 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
5116 desc = 1;
5117 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
5118 alpha = 1;
5119 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
5120 limit_start = atoi(c->argv[j+1]->ptr);
5121 limit_count = atoi(c->argv[j+2]->ptr);
5122 j+=2;
5123 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
5124 storekey = c->argv[j+1];
5125 j++;
5126 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
5127 sortby = c->argv[j+1];
5128 /* If the BY pattern does not contain '*', i.e. it is constant,
5129 * we don't need to sort nor to lookup the weight keys. */
5130 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
5131 j++;
5132 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
5133 listAddNodeTail(operations,createSortOperation(
5134 REDIS_SORT_GET,c->argv[j+1]));
5135 getop++;
5136 j++;
5137 } else {
5138 decrRefCount(sortval);
5139 listRelease(operations);
5140 addReply(c,shared.syntaxerr);
5141 return;
5142 }
5143 j++;
5144 }
5145
5146 /* Load the sorting vector with all the objects to sort */
5147 switch(sortval->type) {
5148 case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
5149 case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break;
5150 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
5151 default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */
5152 }
5153 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
5154 j = 0;
5155
5156 if (sortval->type == REDIS_LIST) {
5157 list *list = sortval->ptr;
5158 listNode *ln;
5159
5160 listRewind(list);
5161 while((ln = listYield(list))) {
5162 robj *ele = ln->value;
5163 vector[j].obj = ele;
5164 vector[j].u.score = 0;
5165 vector[j].u.cmpobj = NULL;
5166 j++;
5167 }
5168 } else {
5169 dict *set;
5170 dictIterator *di;
5171 dictEntry *setele;
5172
5173 if (sortval->type == REDIS_SET) {
5174 set = sortval->ptr;
5175 } else {
5176 zset *zs = sortval->ptr;
5177 set = zs->dict;
5178 }
5179
5180 di = dictGetIterator(set);
5181 while((setele = dictNext(di)) != NULL) {
5182 vector[j].obj = dictGetEntryKey(setele);
5183 vector[j].u.score = 0;
5184 vector[j].u.cmpobj = NULL;
5185 j++;
5186 }
5187 dictReleaseIterator(di);
5188 }
5189 redisAssert(j == vectorlen);
5190
5191 /* Now it's time to load the right scores in the sorting vector */
5192 if (dontsort == 0) {
5193 for (j = 0; j < vectorlen; j++) {
5194 if (sortby) {
5195 robj *byval;
5196
5197 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
5198 if (!byval || byval->type != REDIS_STRING) continue;
5199 if (alpha) {
5200 vector[j].u.cmpobj = getDecodedObject(byval);
5201 } else {
5202 if (byval->encoding == REDIS_ENCODING_RAW) {
5203 vector[j].u.score = strtod(byval->ptr,NULL);
5204 } else {
5205 /* Don't need to decode the object if it's
5206 * integer-encoded (the only encoding supported) so
5207 * far. We can just cast it */
5208 if (byval->encoding == REDIS_ENCODING_INT) {
5209 vector[j].u.score = (long)byval->ptr;
5210 } else
5211 redisAssert(1 != 1);
5212 }
5213 }
5214 } else {
5215 if (!alpha) {
5216 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
5217 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
5218 else {
5219 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
5220 vector[j].u.score = (long) vector[j].obj->ptr;
5221 else
5222 redisAssert(1 != 1);
5223 }
5224 }
5225 }
5226 }
5227 }
5228
5229 /* We are ready to sort the vector... perform a bit of sanity check
5230 * on the LIMIT option too. We'll use a partial version of quicksort. */
5231 start = (limit_start < 0) ? 0 : limit_start;
5232 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
5233 if (start >= vectorlen) {
5234 start = vectorlen-1;
5235 end = vectorlen-2;
5236 }
5237 if (end >= vectorlen) end = vectorlen-1;
5238
5239 if (dontsort == 0) {
5240 server.sort_desc = desc;
5241 server.sort_alpha = alpha;
5242 server.sort_bypattern = sortby ? 1 : 0;
5243 if (sortby && (start != 0 || end != vectorlen-1))
5244 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
5245 else
5246 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
5247 }
5248
5249 /* Send command output to the output buffer, performing the specified
5250 * GET/DEL/INCR/DECR operations if any. */
5251 outputlen = getop ? getop*(end-start+1) : end-start+1;
5252 if (storekey == NULL) {
5253 /* STORE option not specified, sent the sorting result to client */
5254 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
5255 for (j = start; j <= end; j++) {
5256 listNode *ln;
5257 if (!getop) {
5258 addReplyBulkLen(c,vector[j].obj);
5259 addReply(c,vector[j].obj);
5260 addReply(c,shared.crlf);
5261 }
5262 listRewind(operations);
5263 while((ln = listYield(operations))) {
5264 redisSortOperation *sop = ln->value;
5265 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5266 vector[j].obj);
5267
5268 if (sop->type == REDIS_SORT_GET) {
5269 if (!val || val->type != REDIS_STRING) {
5270 addReply(c,shared.nullbulk);
5271 } else {
5272 addReplyBulkLen(c,val);
5273 addReply(c,val);
5274 addReply(c,shared.crlf);
5275 }
5276 } else {
5277 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5278 }
5279 }
5280 }
5281 } else {
5282 robj *listObject = createListObject();
5283 list *listPtr = (list*) listObject->ptr;
5284
5285 /* STORE option specified, set the sorting result as a List object */
5286 for (j = start; j <= end; j++) {
5287 listNode *ln;
5288 if (!getop) {
5289 listAddNodeTail(listPtr,vector[j].obj);
5290 incrRefCount(vector[j].obj);
5291 }
5292 listRewind(operations);
5293 while((ln = listYield(operations))) {
5294 redisSortOperation *sop = ln->value;
5295 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5296 vector[j].obj);
5297
5298 if (sop->type == REDIS_SORT_GET) {
5299 if (!val || val->type != REDIS_STRING) {
5300 listAddNodeTail(listPtr,createStringObject("",0));
5301 } else {
5302 listAddNodeTail(listPtr,val);
5303 incrRefCount(val);
5304 }
5305 } else {
5306 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5307 }
5308 }
5309 }
5310 if (dictReplace(c->db->dict,storekey,listObject)) {
5311 incrRefCount(storekey);
5312 }
5313 /* Note: we add 1 because the DB is dirty anyway since even if the
5314 * SORT result is empty a new key is set and maybe the old content
5315 * replaced. */
5316 server.dirty += 1+outputlen;
5317 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
5318 }
5319
5320 /* Cleanup */
5321 decrRefCount(sortval);
5322 listRelease(operations);
5323 for (j = 0; j < vectorlen; j++) {
5324 if (sortby && alpha && vector[j].u.cmpobj)
5325 decrRefCount(vector[j].u.cmpobj);
5326 }
5327 zfree(vector);
5328 }
5329
5330 /* Create the string returned by the INFO command. This is decoupled
5331 * by the INFO command itself as we need to report the same information
5332 * on memory corruption problems. */
5333 static sds genRedisInfoString(void) {
5334 sds info;
5335 time_t uptime = time(NULL)-server.stat_starttime;
5336 int j;
5337
5338 info = sdscatprintf(sdsempty(),
5339 "redis_version:%s\r\n"
5340 "arch_bits:%s\r\n"
5341 "multiplexing_api:%s\r\n"
5342 "uptime_in_seconds:%ld\r\n"
5343 "uptime_in_days:%ld\r\n"
5344 "connected_clients:%d\r\n"
5345 "connected_slaves:%d\r\n"
5346 "blocked_clients:%d\r\n"
5347 "used_memory:%zu\r\n"
5348 "changes_since_last_save:%lld\r\n"
5349 "bgsave_in_progress:%d\r\n"
5350 "last_save_time:%ld\r\n"
5351 "bgrewriteaof_in_progress:%d\r\n"
5352 "total_connections_received:%lld\r\n"
5353 "total_commands_processed:%lld\r\n"
5354 "role:%s\r\n"
5355 ,REDIS_VERSION,
5356 (sizeof(long) == 8) ? "64" : "32",
5357 aeGetApiName(),
5358 uptime,
5359 uptime/(3600*24),
5360 listLength(server.clients)-listLength(server.slaves),
5361 listLength(server.slaves),
5362 server.blockedclients,
5363 server.usedmemory,
5364 server.dirty,
5365 server.bgsavechildpid != -1,
5366 server.lastsave,
5367 server.bgrewritechildpid != -1,
5368 server.stat_numconnections,
5369 server.stat_numcommands,
5370 server.masterhost == NULL ? "master" : "slave"
5371 );
5372 if (server.masterhost) {
5373 info = sdscatprintf(info,
5374 "master_host:%s\r\n"
5375 "master_port:%d\r\n"
5376 "master_link_status:%s\r\n"
5377 "master_last_io_seconds_ago:%d\r\n"
5378 ,server.masterhost,
5379 server.masterport,
5380 (server.replstate == REDIS_REPL_CONNECTED) ?
5381 "up" : "down",
5382 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
5383 );
5384 }
5385 for (j = 0; j < server.dbnum; j++) {
5386 long long keys, vkeys;
5387
5388 keys = dictSize(server.db[j].dict);
5389 vkeys = dictSize(server.db[j].expires);
5390 if (keys || vkeys) {
5391 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
5392 j, keys, vkeys);
5393 }
5394 }
5395 return info;
5396 }
5397
5398 static void infoCommand(redisClient *c) {
5399 sds info = genRedisInfoString();
5400 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
5401 (unsigned long)sdslen(info)));
5402 addReplySds(c,info);
5403 addReply(c,shared.crlf);
5404 }
5405
5406 static void monitorCommand(redisClient *c) {
5407 /* ignore MONITOR if aleady slave or in monitor mode */
5408 if (c->flags & REDIS_SLAVE) return;
5409
5410 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5411 c->slaveseldb = 0;
5412 listAddNodeTail(server.monitors,c);
5413 addReply(c,shared.ok);
5414 }
5415
5416 /* ================================= Expire ================================= */
5417 static int removeExpire(redisDb *db, robj *key) {
5418 if (dictDelete(db->expires,key) == DICT_OK) {
5419 return 1;
5420 } else {
5421 return 0;
5422 }
5423 }
5424
5425 static int setExpire(redisDb *db, robj *key, time_t when) {
5426 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5427 return 0;
5428 } else {
5429 incrRefCount(key);
5430 return 1;
5431 }
5432 }
5433
5434 /* Return the expire time of the specified key, or -1 if no expire
5435 * is associated with this key (i.e. the key is non volatile) */
5436 static time_t getExpire(redisDb *db, robj *key) {
5437 dictEntry *de;
5438
5439 /* No expire? return ASAP */
5440 if (dictSize(db->expires) == 0 ||
5441 (de = dictFind(db->expires,key)) == NULL) return -1;
5442
5443 return (time_t) dictGetEntryVal(de);
5444 }
5445
5446 static int expireIfNeeded(redisDb *db, robj *key) {
5447 time_t when;
5448 dictEntry *de;
5449
5450 /* No expire? return ASAP */
5451 if (dictSize(db->expires) == 0 ||
5452 (de = dictFind(db->expires,key)) == NULL) return 0;
5453
5454 /* Lookup the expire */
5455 when = (time_t) dictGetEntryVal(de);
5456 if (time(NULL) <= when) return 0;
5457
5458 /* Delete the key */
5459 dictDelete(db->expires,key);
5460 return dictDelete(db->dict,key) == DICT_OK;
5461 }
5462
5463 static int deleteIfVolatile(redisDb *db, robj *key) {
5464 dictEntry *de;
5465
5466 /* No expire? return ASAP */
5467 if (dictSize(db->expires) == 0 ||
5468 (de = dictFind(db->expires,key)) == NULL) return 0;
5469
5470 /* Delete the key */
5471 server.dirty++;
5472 dictDelete(db->expires,key);
5473 return dictDelete(db->dict,key) == DICT_OK;
5474 }
5475
5476 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
5477 dictEntry *de;
5478
5479 de = dictFind(c->db->dict,key);
5480 if (de == NULL) {
5481 addReply(c,shared.czero);
5482 return;
5483 }
5484 if (seconds < 0) {
5485 if (deleteKey(c->db,key)) server.dirty++;
5486 addReply(c, shared.cone);
5487 return;
5488 } else {
5489 time_t when = time(NULL)+seconds;
5490 if (setExpire(c->db,key,when)) {
5491 addReply(c,shared.cone);
5492 server.dirty++;
5493 } else {
5494 addReply(c,shared.czero);
5495 }
5496 return;
5497 }
5498 }
5499
5500 static void expireCommand(redisClient *c) {
5501 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5502 }
5503
5504 static void expireatCommand(redisClient *c) {
5505 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5506 }
5507
5508 static void ttlCommand(redisClient *c) {
5509 time_t expire;
5510 int ttl = -1;
5511
5512 expire = getExpire(c->db,c->argv[1]);
5513 if (expire != -1) {
5514 ttl = (int) (expire-time(NULL));
5515 if (ttl < 0) ttl = -1;
5516 }
5517 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5518 }
5519
5520 /* ================================ MULTI/EXEC ============================== */
5521
5522 /* Client state initialization for MULTI/EXEC */
5523 static void initClientMultiState(redisClient *c) {
5524 c->mstate.commands = NULL;
5525 c->mstate.count = 0;
5526 }
5527
5528 /* Release all the resources associated with MULTI/EXEC state */
5529 static void freeClientMultiState(redisClient *c) {
5530 int j;
5531
5532 for (j = 0; j < c->mstate.count; j++) {
5533 int i;
5534 multiCmd *mc = c->mstate.commands+j;
5535
5536 for (i = 0; i < mc->argc; i++)
5537 decrRefCount(mc->argv[i]);
5538 zfree(mc->argv);
5539 }
5540 zfree(c->mstate.commands);
5541 }
5542
5543 /* Add a new command into the MULTI commands queue */
5544 static void queueMultiCommand(redisClient *c, struct redisCommand *cmd) {
5545 multiCmd *mc;
5546 int j;
5547
5548 c->mstate.commands = zrealloc(c->mstate.commands,
5549 sizeof(multiCmd)*(c->mstate.count+1));
5550 mc = c->mstate.commands+c->mstate.count;
5551 mc->cmd = cmd;
5552 mc->argc = c->argc;
5553 mc->argv = zmalloc(sizeof(robj*)*c->argc);
5554 memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
5555 for (j = 0; j < c->argc; j++)
5556 incrRefCount(mc->argv[j]);
5557 c->mstate.count++;
5558 }
5559
5560 static void multiCommand(redisClient *c) {
5561 c->flags |= REDIS_MULTI;
5562 addReply(c,shared.ok);
5563 }
5564
5565 static void execCommand(redisClient *c) {
5566 int j;
5567 robj **orig_argv;
5568 int orig_argc;
5569
5570 if (!(c->flags & REDIS_MULTI)) {
5571 addReplySds(c,sdsnew("-ERR EXEC without MULTI\r\n"));
5572 return;
5573 }
5574
5575 orig_argv = c->argv;
5576 orig_argc = c->argc;
5577 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count));
5578 for (j = 0; j < c->mstate.count; j++) {
5579 c->argc = c->mstate.commands[j].argc;
5580 c->argv = c->mstate.commands[j].argv;
5581 call(c,c->mstate.commands[j].cmd);
5582 }
5583 c->argv = orig_argv;
5584 c->argc = orig_argc;
5585 freeClientMultiState(c);
5586 initClientMultiState(c);
5587 c->flags &= (~REDIS_MULTI);
5588 }
5589
5590 /* =========================== Blocking Operations ========================= */
5591
5592 /* Currently Redis blocking operations support is limited to list POP ops,
5593 * so the current implementation is not fully generic, but it is also not
5594 * completely specific so it will not require a rewrite to support new
5595 * kind of blocking operations in the future.
5596 *
5597 * Still it's important to note that list blocking operations can be already
5598 * used as a notification mechanism in order to implement other blocking
5599 * operations at application level, so there must be a very strong evidence
5600 * of usefulness and generality before new blocking operations are implemented.
5601 *
5602 * This is how the current blocking POP works, we use BLPOP as example:
 * - If the user calls BLPOP and the key exists and contains a non empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   when there is no need to block.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blockingkeys) mapping keys to a list of clients
 *   blocking for these keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we serve the first in the list: basically instead of pushing
 *   the new element inside the list we return it to the (first / oldest)
 *   blocking client, unblock the client, and remove it from the list.
5616 *
5617 * The above comment and the source code should be enough in order to understand
5618 * the implementation and modify / fix it later.
5619 */
5620
5621 /* Set a client in blocking mode for the specified key, with the specified
5622 * timeout */
5623 static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
5624 dictEntry *de;
5625 list *l;
5626 int j;
5627
5628 c->blockingkeys = zmalloc(sizeof(robj*)*numkeys);
5629 c->blockingkeysnum = numkeys;
5630 c->blockingto = timeout;
5631 for (j = 0; j < numkeys; j++) {
5632 /* Add the key in the client structure, to map clients -> keys */
5633 c->blockingkeys[j] = keys[j];
5634 incrRefCount(keys[j]);
5635
5636 /* And in the other "side", to map keys -> clients */
5637 de = dictFind(c->db->blockingkeys,keys[j]);
5638 if (de == NULL) {
5639 int retval;
5640
5641 /* For every key we take a list of clients blocked for it */
5642 l = listCreate();
5643 retval = dictAdd(c->db->blockingkeys,keys[j],l);
5644 incrRefCount(keys[j]);
5645 assert(retval == DICT_OK);
5646 } else {
5647 l = dictGetEntryVal(de);
5648 }
5649 listAddNodeTail(l,c);
5650 }
5651 /* Mark the client as a blocked client */
5652 c->flags |= REDIS_BLOCKED;
5653 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
5654 server.blockedclients++;
5655 }
5656
5657 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
5658 static void unblockClient(redisClient *c) {
5659 dictEntry *de;
5660 list *l;
5661 int j;
5662
5663 assert(c->blockingkeys != NULL);
5664 /* The client may wait for multiple keys, so unblock it for every key. */
5665 for (j = 0; j < c->blockingkeysnum; j++) {
5666 /* Remove this client from the list of clients waiting for this key. */
5667 de = dictFind(c->db->blockingkeys,c->blockingkeys[j]);
5668 assert(de != NULL);
5669 l = dictGetEntryVal(de);
5670 listDelNode(l,listSearchKey(l,c));
5671 /* If the list is empty we need to remove it to avoid wasting memory */
5672 if (listLength(l) == 0)
5673 dictDelete(c->db->blockingkeys,c->blockingkeys[j]);
5674 decrRefCount(c->blockingkeys[j]);
5675 }
5676 /* Cleanup the client structure */
5677 zfree(c->blockingkeys);
5678 c->blockingkeys = NULL;
5679 c->flags &= (~REDIS_BLOCKED);
5680 server.blockedclients--;
5681 /* Ok now we are ready to get read events from socket, note that we
5682 * can't trap errors here as it's possible that unblockClients() is
5683 * called from freeClient() itself, and the only thing we can do
5684 * if we failed to register the READABLE event is to kill the client.
5685 * Still the following function should never fail in the real world as
5686 * we are sure the file descriptor is sane, and we exit on out of mem. */
5687 aeCreateFileEvent(server.el, c->fd, AE_READABLE, readQueryFromClient, c);
5688 /* As a final step we want to process data if there is some command waiting
5689 * in the input buffer. Note that this is safe even if unblockClient()
5690 * gets called from freeClient() because freeClient() will be smart
5691 * enough to call this function *after* c->querybuf was set to NULL. */
5692 if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c);
5693 }
5694
5695 /* This should be called from any function PUSHing into lists.
5696 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5697 * 'ele' is the element pushed.
5698 *
5699 * If the function returns 0 there was no client waiting for a list push
5700 * against this key.
5701 *
5702 * If the function returns 1 there was a client waiting for a list push
5703 * against this key, the element was passed to this client thus it's not
5704 * needed to actually add it to the list and the caller should return asap. */
5705 static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
5706 struct dictEntry *de;
5707 redisClient *receiver;
5708 list *l;
5709 listNode *ln;
5710
5711 de = dictFind(c->db->blockingkeys,key);
5712 if (de == NULL) return 0;
5713 l = dictGetEntryVal(de);
5714 ln = listFirst(l);
5715 assert(ln != NULL);
5716 receiver = ln->value;
5717
5718 addReplySds(receiver,sdsnew("*2\r\n"));
5719 addReplyBulkLen(receiver,key);
5720 addReply(receiver,key);
5721 addReply(receiver,shared.crlf);
5722 addReplyBulkLen(receiver,ele);
5723 addReply(receiver,ele);
5724 addReply(receiver,shared.crlf);
5725 unblockClient(receiver);
5726 return 1;
5727 }
5728
5729 /* Blocking RPOP/LPOP */
5730 static void blockingPopGenericCommand(redisClient *c, int where) {
5731 robj *o;
5732 time_t timeout;
5733 int j;
5734
5735 for (j = 1; j < c->argc-1; j++) {
5736 o = lookupKeyWrite(c->db,c->argv[j]);
5737 if (o != NULL) {
5738 if (o->type != REDIS_LIST) {
5739 addReply(c,shared.wrongtypeerr);
5740 return;
5741 } else {
5742 list *list = o->ptr;
5743 if (listLength(list) != 0) {
5744 /* If the list contains elements fall back to the usual
5745 * non-blocking POP operation */
5746 robj *argv[2], **orig_argv;
5747 int orig_argc;
5748
5749 /* We need to alter the command arguments before to call
5750 * popGenericCommand() as the command takes a single key. */
5751 orig_argv = c->argv;
5752 orig_argc = c->argc;
5753 argv[1] = c->argv[j];
5754 c->argv = argv;
5755 c->argc = 2;
5756
5757 /* Also the return value is different, we need to output
5758 * the multi bulk reply header and the key name. The
5759 * "real" command will add the last element (the value)
5760 * for us. If this souds like an hack to you it's just
5761 * because it is... */
5762 addReplySds(c,sdsnew("*2\r\n"));
5763 addReplyBulkLen(c,argv[1]);
5764 addReply(c,argv[1]);
5765 addReply(c,shared.crlf);
5766 popGenericCommand(c,where);
5767
5768 /* Fix the client structure with the original stuff */
5769 c->argv = orig_argv;
5770 c->argc = orig_argc;
5771 return;
5772 }
5773 }
5774 }
5775 }
5776 /* If the list is empty or the key does not exists we must block */
5777 timeout = strtol(c->argv[c->argc-1]->ptr,NULL,10);
5778 if (timeout > 0) timeout += time(NULL);
5779 blockForKeys(c,c->argv+1,c->argc-2,timeout);
5780 }
5781
5782 static void blpopCommand(redisClient *c) {
5783 blockingPopGenericCommand(c,REDIS_HEAD);
5784 }
5785
5786 static void brpopCommand(redisClient *c) {
5787 blockingPopGenericCommand(c,REDIS_TAIL);
5788 }
5789
5790 /* =============================== Replication ============================= */
5791
5792 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5793 ssize_t nwritten, ret = size;
5794 time_t start = time(NULL);
5795
5796 timeout++;
5797 while(size) {
5798 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5799 nwritten = write(fd,ptr,size);
5800 if (nwritten == -1) return -1;
5801 ptr += nwritten;
5802 size -= nwritten;
5803 }
5804 if ((time(NULL)-start) > timeout) {
5805 errno = ETIMEDOUT;
5806 return -1;
5807 }
5808 }
5809 return ret;
5810 }
5811
5812 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5813 ssize_t nread, totread = 0;
5814 time_t start = time(NULL);
5815
5816 timeout++;
5817 while(size) {
5818 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5819 nread = read(fd,ptr,size);
5820 if (nread == -1) return -1;
5821 ptr += nread;
5822 size -= nread;
5823 totread += nread;
5824 }
5825 if ((time(NULL)-start) > timeout) {
5826 errno = ETIMEDOUT;
5827 return -1;
5828 }
5829 }
5830 return totread;
5831 }
5832
/* Read one '\n'-terminated line from 'fd' into 'ptr', a buffer of 'size'
 * bytes, one byte at a time through syncRead(). The '\n' is not stored,
 * and a trailing '\r' is stripped. The result is always nul-terminated.
 *
 * At most size-1 bytes are stored: a longer line is truncated and the
 * remainder is left unread on the socket.
 *
 * Returns the number of bytes stored (counting a stripped '\r'), or -1 on
 * read error/timeout or when 'size' leaves no room for the terminator. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return -1;
    *ptr = '\0';
    size--; /* reserve room for the nul terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        }
        *ptr++ = c;
        *ptr = '\0';
        nread++;
        /* Without this decrement the loop never stopped at the buffer end,
         * letting a long line from the peer overflow 'ptr'. */
        size--;
    }
    return nread;
}
5853
5854 static void syncCommand(redisClient *c) {
5855 /* ignore SYNC if aleady slave or in monitor mode */
5856 if (c->flags & REDIS_SLAVE) return;
5857
5858 /* SYNC can't be issued when the server has pending data to send to
5859 * the client about already issued commands. We need a fresh reply
5860 * buffer registering the differences between the BGSAVE and the current
5861 * dataset, so that we can copy to other slaves if needed. */
5862 if (listLength(c->reply) != 0) {
5863 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5864 return;
5865 }
5866
5867 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5868 /* Here we need to check if there is a background saving operation
5869 * in progress, or if it is required to start one */
5870 if (server.bgsavechildpid != -1) {
5871 /* Ok a background save is in progress. Let's check if it is a good
5872 * one for replication, i.e. if there is another slave that is
5873 * registering differences since the server forked to save */
5874 redisClient *slave;
5875 listNode *ln;
5876
5877 listRewind(server.slaves);
5878 while((ln = listYield(server.slaves))) {
5879 slave = ln->value;
5880 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5881 }
5882 if (ln) {
5883 /* Perfect, the server is already registering differences for
5884 * another slave. Set the right state, and copy the buffer. */
5885 listRelease(c->reply);
5886 c->reply = listDup(slave->reply);
5887 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5888 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5889 } else {
5890 /* No way, we need to wait for the next BGSAVE in order to
5891 * register differences */
5892 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5893 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5894 }
5895 } else {
5896 /* Ok we don't have a BGSAVE in progress, let's start one */
5897 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5898 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5899 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5900 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5901 return;
5902 }
5903 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5904 }
5905 c->repldbfd = -1;
5906 c->flags |= REDIS_SLAVE;
5907 c->slaveseldb = 0;
5908 listAddNodeTail(server.slaves,c);
5909 return;
5910 }
5911
5912 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5913 redisClient *slave = privdata;
5914 REDIS_NOTUSED(el);
5915 REDIS_NOTUSED(mask);
5916 char buf[REDIS_IOBUF_LEN];
5917 ssize_t nwritten, buflen;
5918
5919 if (slave->repldboff == 0) {
5920 /* Write the bulk write count before to transfer the DB. In theory here
5921 * we don't know how much room there is in the output buffer of the
5922 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5923 * operations) will never be smaller than the few bytes we need. */
5924 sds bulkcount;
5925
5926 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5927 slave->repldbsize);
5928 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5929 {
5930 sdsfree(bulkcount);
5931 freeClient(slave);
5932 return;
5933 }
5934 sdsfree(bulkcount);
5935 }
5936 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5937 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5938 if (buflen <= 0) {
5939 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5940 (buflen == 0) ? "premature EOF" : strerror(errno));
5941 freeClient(slave);
5942 return;
5943 }
5944 if ((nwritten = write(fd,buf,buflen)) == -1) {
5945 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5946 strerror(errno));
5947 freeClient(slave);
5948 return;
5949 }
5950 slave->repldboff += nwritten;
5951 if (slave->repldboff == slave->repldbsize) {
5952 close(slave->repldbfd);
5953 slave->repldbfd = -1;
5954 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5955 slave->replstate = REDIS_REPL_ONLINE;
5956 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5957 sendReplyToClient, slave) == AE_ERR) {
5958 freeClient(slave);
5959 return;
5960 }
5961 addReplySds(slave,sdsempty());
5962 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5963 }
5964 }
5965
5966 /* This function is called at the end of every backgrond saving.
5967 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5968 * otherwise REDIS_ERR is passed to the function.
5969 *
5970 * The goal of this function is to handle slaves waiting for a successful
5971 * background saving in order to perform non-blocking synchronization. */
5972 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5973 listNode *ln;
5974 int startbgsave = 0;
5975
5976 listRewind(server.slaves);
5977 while((ln = listYield(server.slaves))) {
5978 redisClient *slave = ln->value;
5979
5980 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5981 startbgsave = 1;
5982 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5983 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5984 struct redis_stat buf;
5985
5986 if (bgsaveerr != REDIS_OK) {
5987 freeClient(slave);
5988 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5989 continue;
5990 }
5991 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5992 redis_fstat(slave->repldbfd,&buf) == -1) {
5993 freeClient(slave);
5994 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5995 continue;
5996 }
5997 slave->repldboff = 0;
5998 slave->repldbsize = buf.st_size;
5999 slave->replstate = REDIS_REPL_SEND_BULK;
6000 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
6001 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
6002 freeClient(slave);
6003 continue;
6004 }
6005 }
6006 }
6007 if (startbgsave) {
6008 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
6009 listRewind(server.slaves);
6010 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
6011 while((ln = listYield(server.slaves))) {
6012 redisClient *slave = ln->value;
6013
6014 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
6015 freeClient(slave);
6016 }
6017 }
6018 }
6019 }
6020
6021 static int syncWithMaster(void) {
6022 char buf[1024], tmpfile[256], authcmd[1024];
6023 int dumpsize;
6024 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
6025 int dfd;
6026
6027 if (fd == -1) {
6028 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
6029 strerror(errno));
6030 return REDIS_ERR;
6031 }
6032
6033 /* AUTH with the master if required. */
6034 if(server.masterauth) {
6035 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
6036 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
6037 close(fd);
6038 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
6039 strerror(errno));
6040 return REDIS_ERR;
6041 }
6042 /* Read the AUTH result. */
6043 if (syncReadLine(fd,buf,1024,3600) == -1) {
6044 close(fd);
6045 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
6046 strerror(errno));
6047 return REDIS_ERR;
6048 }
6049 if (buf[0] != '+') {
6050 close(fd);
6051 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
6052 return REDIS_ERR;
6053 }
6054 }
6055
6056 /* Issue the SYNC command */
6057 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
6058 close(fd);
6059 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
6060 strerror(errno));
6061 return REDIS_ERR;
6062 }
6063 /* Read the bulk write count */
6064 if (syncReadLine(fd,buf,1024,3600) == -1) {
6065 close(fd);
6066 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
6067 strerror(errno));
6068 return REDIS_ERR;
6069 }
6070 if (buf[0] != '$') {
6071 close(fd);
6072 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
6073 return REDIS_ERR;
6074 }
6075 dumpsize = atoi(buf+1);
6076 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
6077 /* Read the bulk write data on a temp file */
6078 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
6079 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
6080 if (dfd == -1) {
6081 close(fd);
6082 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
6083 return REDIS_ERR;
6084 }
6085 while(dumpsize) {
6086 int nread, nwritten;
6087
6088 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
6089 if (nread == -1) {
6090 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
6091 strerror(errno));
6092 close(fd);
6093 close(dfd);
6094 return REDIS_ERR;
6095 }
6096 nwritten = write(dfd,buf,nread);
6097 if (nwritten == -1) {
6098 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
6099 close(fd);
6100 close(dfd);
6101 return REDIS_ERR;
6102 }
6103 dumpsize -= nread;
6104 }
6105 close(dfd);
6106 if (rename(tmpfile,server.dbfilename) == -1) {
6107 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
6108 unlink(tmpfile);
6109 close(fd);
6110 return REDIS_ERR;
6111 }
6112 emptyDb();
6113 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6114 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
6115 close(fd);
6116 return REDIS_ERR;
6117 }
6118 server.master = createClient(fd);
6119 server.master->flags |= REDIS_MASTER;
6120 server.master->authenticated = 1;
6121 server.replstate = REDIS_REPL_CONNECTED;
6122 return REDIS_OK;
6123 }
6124
6125 static void slaveofCommand(redisClient *c) {
6126 if (!strcasecmp(c->argv[1]->ptr,"no") &&
6127 !strcasecmp(c->argv[2]->ptr,"one")) {
6128 if (server.masterhost) {
6129 sdsfree(server.masterhost);
6130 server.masterhost = NULL;
6131 if (server.master) freeClient(server.master);
6132 server.replstate = REDIS_REPL_NONE;
6133 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
6134 }
6135 } else {
6136 sdsfree(server.masterhost);
6137 server.masterhost = sdsdup(c->argv[1]->ptr);
6138 server.masterport = atoi(c->argv[2]->ptr);
6139 if (server.master) freeClient(server.master);
6140 server.replstate = REDIS_REPL_CONNECT;
6141 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
6142 server.masterhost, server.masterport);
6143 }
6144 addReply(c,shared.ok);
6145 }
6146
6147 /* ============================ Maxmemory directive ======================== */
6148
6149 /* This function gets called when 'maxmemory' is set on the config file to limit
6150 * the max memory used by the server, and we are out of memory.
6151 * This function will try to, in order:
6152 *
6153 * - Free objects from the free list
6154 * - Try to remove keys with an EXPIRE set
6155 *
6156 * It is not possible to free enough memory to reach used-memory < maxmemory
6157 * the server will start refusing commands that will enlarge even more the
6158 * memory usage.
6159 */
6160 static void freeMemoryIfNeeded(void) {
6161 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
6162 if (listLength(server.objfreelist)) {
6163 robj *o;
6164
6165 listNode *head = listFirst(server.objfreelist);
6166 o = listNodeValue(head);
6167 listDelNode(server.objfreelist,head);
6168 zfree(o);
6169 } else {
6170 int j, k, freed = 0;
6171
6172 for (j = 0; j < server.dbnum; j++) {
6173 int minttl = -1;
6174 robj *minkey = NULL;
6175 struct dictEntry *de;
6176
6177 if (dictSize(server.db[j].expires)) {
6178 freed = 1;
6179 /* From a sample of three keys drop the one nearest to
6180 * the natural expire */
6181 for (k = 0; k < 3; k++) {
6182 time_t t;
6183
6184 de = dictGetRandomKey(server.db[j].expires);
6185 t = (time_t) dictGetEntryVal(de);
6186 if (minttl == -1 || t < minttl) {
6187 minkey = dictGetEntryKey(de);
6188 minttl = t;
6189 }
6190 }
6191 deleteKey(server.db+j,minkey);
6192 }
6193 }
6194 if (!freed) return; /* nothing to free... */
6195 }
6196 }
6197 }
6198
6199 /* ============================== Append Only file ========================== */
6200
6201 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
6202 sds buf = sdsempty();
6203 int j;
6204 ssize_t nwritten;
6205 time_t now;
6206 robj *tmpargv[3];
6207
6208 /* The DB this command was targetting is not the same as the last command
6209 * we appendend. To issue a SELECT command is needed. */
6210 if (dictid != server.appendseldb) {
6211 char seldb[64];
6212
6213 snprintf(seldb,sizeof(seldb),"%d",dictid);
6214 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
6215 (unsigned long)strlen(seldb),seldb);
6216 server.appendseldb = dictid;
6217 }
6218
6219 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6220 * EXPIREs into EXPIREATs calls */
6221 if (cmd->proc == expireCommand) {
6222 long when;
6223
6224 tmpargv[0] = createStringObject("EXPIREAT",8);
6225 tmpargv[1] = argv[1];
6226 incrRefCount(argv[1]);
6227 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
6228 tmpargv[2] = createObject(REDIS_STRING,
6229 sdscatprintf(sdsempty(),"%ld",when));
6230 argv = tmpargv;
6231 }
6232
6233 /* Append the actual command */
6234 buf = sdscatprintf(buf,"*%d\r\n",argc);
6235 for (j = 0; j < argc; j++) {
6236 robj *o = argv[j];
6237
6238 o = getDecodedObject(o);
6239 buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
6240 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
6241 buf = sdscatlen(buf,"\r\n",2);
6242 decrRefCount(o);
6243 }
6244
6245 /* Free the objects from the modified argv for EXPIREAT */
6246 if (cmd->proc == expireCommand) {
6247 for (j = 0; j < 3; j++)
6248 decrRefCount(argv[j]);
6249 }
6250
6251 /* We want to perform a single write. This should be guaranteed atomic
6252 * at least if the filesystem we are writing is a real physical one.
6253 * While this will save us against the server being killed I don't think
6254 * there is much to do about the whole server stopping for power problems
6255 * or alike */
6256 nwritten = write(server.appendfd,buf,sdslen(buf));
6257 if (nwritten != (signed)sdslen(buf)) {
6258 /* Ooops, we are in troubles. The best thing to do for now is
6259 * to simply exit instead to give the illusion that everything is
6260 * working as expected. */
6261 if (nwritten == -1) {
6262 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
6263 } else {
6264 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
6265 }
6266 exit(1);
6267 }
6268 /* If a background append only file rewriting is in progress we want to
6269 * accumulate the differences between the child DB and the current one
6270 * in a buffer, so that when the child process will do its work we
6271 * can append the differences to the new append only file. */
6272 if (server.bgrewritechildpid != -1)
6273 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
6274
6275 sdsfree(buf);
6276 now = time(NULL);
6277 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
6278 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
6279 now-server.lastfsync > 1))
6280 {
6281 fsync(server.appendfd); /* Let's try to get this data on the disk */
6282 server.lastfsync = now;
6283 }
6284 }
6285
6286 /* In Redis commands are always executed in the context of a client, so in
6287 * order to load the append only file we need to create a fake client. */
6288 static struct redisClient *createFakeClient(void) {
6289 struct redisClient *c = zmalloc(sizeof(*c));
6290
6291 selectDb(c,0);
6292 c->fd = -1;
6293 c->querybuf = sdsempty();
6294 c->argc = 0;
6295 c->argv = NULL;
6296 c->flags = 0;
6297 /* We set the fake client as a slave waiting for the synchronization
6298 * so that Redis will not try to send replies to this client. */
6299 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
6300 c->reply = listCreate();
6301 listSetFreeMethod(c->reply,decrRefCount);
6302 listSetDupMethod(c->reply,dupClientReplyValue);
6303 return c;
6304 }
6305
6306 static void freeFakeClient(struct redisClient *c) {
6307 sdsfree(c->querybuf);
6308 listRelease(c->reply);
6309 zfree(c);
6310 }
6311
6312 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
6313 * error (the append only file is zero-length) REDIS_ERR is returned. On
6314 * fatal error an error message is logged and the program exists. */
6315 int loadAppendOnlyFile(char *filename) {
6316 struct redisClient *fakeClient;
6317 FILE *fp = fopen(filename,"r");
6318 struct redis_stat sb;
6319
6320 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
6321 return REDIS_ERR;
6322
6323 if (fp == NULL) {
6324 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
6325 exit(1);
6326 }
6327
6328 fakeClient = createFakeClient();
6329 while(1) {
6330 int argc, j;
6331 unsigned long len;
6332 robj **argv;
6333 char buf[128];
6334 sds argsds;
6335 struct redisCommand *cmd;
6336
6337 if (fgets(buf,sizeof(buf),fp) == NULL) {
6338 if (feof(fp))
6339 break;
6340 else
6341 goto readerr;
6342 }
6343 if (buf[0] != '*') goto fmterr;
6344 argc = atoi(buf+1);
6345 argv = zmalloc(sizeof(robj*)*argc);
6346 for (j = 0; j < argc; j++) {
6347 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
6348 if (buf[0] != '$') goto fmterr;
6349 len = strtol(buf+1,NULL,10);
6350 argsds = sdsnewlen(NULL,len);
6351 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
6352 argv[j] = createObject(REDIS_STRING,argsds);
6353 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
6354 }
6355
6356 /* Command lookup */
6357 cmd = lookupCommand(argv[0]->ptr);
6358 if (!cmd) {
6359 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
6360 exit(1);
6361 }
6362 /* Try object sharing and encoding */
6363 if (server.shareobjects) {
6364 int j;
6365 for(j = 1; j < argc; j++)
6366 argv[j] = tryObjectSharing(argv[j]);
6367 }
6368 if (cmd->flags & REDIS_CMD_BULK)
6369 tryObjectEncoding(argv[argc-1]);
6370 /* Run the command in the context of a fake client */
6371 fakeClient->argc = argc;
6372 fakeClient->argv = argv;
6373 cmd->proc(fakeClient);
6374 /* Discard the reply objects list from the fake client */
6375 while(listLength(fakeClient->reply))
6376 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
6377 /* Clean up, ready for the next command */
6378 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
6379 zfree(argv);
6380 }
6381 fclose(fp);
6382 freeFakeClient(fakeClient);
6383 return REDIS_OK;
6384
6385 readerr:
6386 if (feof(fp)) {
6387 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
6388 } else {
6389 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
6390 }
6391 exit(1);
6392 fmterr:
6393 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
6394 exit(1);
6395 }
6396
6397 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6398 static int fwriteBulk(FILE *fp, robj *obj) {
6399 char buf[128];
6400 obj = getDecodedObject(obj);
6401 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
6402 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
6403 if (sdslen(obj->ptr) && fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0)
6404 goto err;
6405 if (fwrite("\r\n",2,1,fp) == 0) goto err;
6406 decrRefCount(obj);
6407 return 1;
6408 err:
6409 decrRefCount(obj);
6410 return 0;
6411 }
6412
/* Write a double value in bulk format $<count>\r\n<payload>\r\n, formatting
 * it with "%.17g" (17 significant digits). Returns 1 on success, 0 if a
 * write fails. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char payload[128], header[128];
    size_t payloadlen;

    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    payloadlen = strlen(payload);
    /* The bulk count excludes the CRLF already appended to the payload. */
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)payloadlen-2);
    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,payloadlen,1,fp) != 0;
}
6423
/* Write a long value in bulk format $<count>\r\n<payload>\r\n.
 * Returns 1 on success, 0 if a write fails. */
static int fwriteBulkLong(FILE *fp, long l) {
    char payload[128], header[128];
    size_t payloadlen;

    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    payloadlen = strlen(payload);
    /* The bulk count excludes the CRLF already appended to the payload. */
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)payloadlen-2);
    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,payloadlen,1,fp) != 0;
}
6434
6435 /* Write a sequence of commands able to fully rebuild the dataset into
6436 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6437 static int rewriteAppendOnlyFile(char *filename) {
6438 dictIterator *di = NULL;
6439 dictEntry *de;
6440 FILE *fp;
6441 char tmpfile[256];
6442 int j;
6443 time_t now = time(NULL);
6444
6445 /* Note that we have to use a different temp name here compared to the
6446 * one used by rewriteAppendOnlyFileBackground() function. */
6447 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
6448 fp = fopen(tmpfile,"w");
6449 if (!fp) {
6450 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
6451 return REDIS_ERR;
6452 }
6453 for (j = 0; j < server.dbnum; j++) {
6454 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
6455 redisDb *db = server.db+j;
6456 dict *d = db->dict;
6457 if (dictSize(d) == 0) continue;
6458 di = dictGetIterator(d);
6459 if (!di) {
6460 fclose(fp);
6461 return REDIS_ERR;
6462 }
6463
6464 /* SELECT the new DB */
6465 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
6466 if (fwriteBulkLong(fp,j) == 0) goto werr;
6467
6468 /* Iterate this DB writing every entry */
6469 while((de = dictNext(di)) != NULL) {
6470 robj *key = dictGetEntryKey(de);
6471 robj *o = dictGetEntryVal(de);
6472 time_t expiretime = getExpire(db,key);
6473
6474 /* Save the key and associated value */
6475 if (o->type == REDIS_STRING) {
6476 /* Emit a SET command */
6477 char cmd[]="*3\r\n$3\r\nSET\r\n";
6478 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6479 /* Key and value */
6480 if (fwriteBulk(fp,key) == 0) goto werr;
6481 if (fwriteBulk(fp,o) == 0) goto werr;
6482 } else if (o->type == REDIS_LIST) {
6483 /* Emit the RPUSHes needed to rebuild the list */
6484 list *list = o->ptr;
6485 listNode *ln;
6486
6487 listRewind(list);
6488 while((ln = listYield(list))) {
6489 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
6490 robj *eleobj = listNodeValue(ln);
6491
6492 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6493 if (fwriteBulk(fp,key) == 0) goto werr;
6494 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6495 }
6496 } else if (o->type == REDIS_SET) {
6497 /* Emit the SADDs needed to rebuild the set */
6498 dict *set = o->ptr;
6499 dictIterator *di = dictGetIterator(set);
6500 dictEntry *de;
6501
6502 while((de = dictNext(di)) != NULL) {
6503 char cmd[]="*3\r\n$4\r\nSADD\r\n";
6504 robj *eleobj = dictGetEntryKey(de);
6505
6506 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6507 if (fwriteBulk(fp,key) == 0) goto werr;
6508 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6509 }
6510 dictReleaseIterator(di);
6511 } else if (o->type == REDIS_ZSET) {
6512 /* Emit the ZADDs needed to rebuild the sorted set */
6513 zset *zs = o->ptr;
6514 dictIterator *di = dictGetIterator(zs->dict);
6515 dictEntry *de;
6516
6517 while((de = dictNext(di)) != NULL) {
6518 char cmd[]="*4\r\n$4\r\nZADD\r\n";
6519 robj *eleobj = dictGetEntryKey(de);
6520 double *score = dictGetEntryVal(de);
6521
6522 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6523 if (fwriteBulk(fp,key) == 0) goto werr;
6524 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
6525 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6526 }
6527 dictReleaseIterator(di);
6528 } else {
6529 redisAssert(0 != 0);
6530 }
6531 /* Save the expire time */
6532 if (expiretime != -1) {
6533 char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
6534 /* If this key is already expired skip it */
6535 if (expiretime < now) continue;
6536 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6537 if (fwriteBulk(fp,key) == 0) goto werr;
6538 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
6539 }
6540 }
6541 dictReleaseIterator(di);
6542 }
6543
6544 /* Make sure data will not remain on the OS's output buffers */
6545 fflush(fp);
6546 fsync(fileno(fp));
6547 fclose(fp);
6548
6549 /* Use RENAME to make sure the DB file is changed atomically only
6550 * if the generate DB file is ok. */
6551 if (rename(tmpfile,filename) == -1) {
6552 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
6553 unlink(tmpfile);
6554 return REDIS_ERR;
6555 }
6556 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
6557 return REDIS_OK;
6558
6559 werr:
6560 fclose(fp);
6561 unlink(tmpfile);
6562 redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
6563 if (di) dictReleaseIterator(di);
6564 return REDIS_ERR;
6565 }
6566
6567 /* This is how rewriting of the append only file in background works:
6568 *
6569 * 1) The user calls BGREWRITEAOF
6570 * 2) Redis calls this function, that forks():
6571 * 2a) the child rewrite the append only file in a temp file.
6572 * 2b) the parent accumulates differences in server.bgrewritebuf.
6573 * 3) When the child finished '2a' exists.
6574 * 4) The parent will trap the exit code, if it's OK, will append the
6575 * data accumulated into server.bgrewritebuf into the temp file, and
6576 * finally will rename(2) the temp file in the actual file name.
6577 * The the new file is reopened as the new append only file. Profit!
6578 */
6579 static int rewriteAppendOnlyFileBackground(void) {
6580 pid_t childpid;
6581
6582 if (server.bgrewritechildpid != -1) return REDIS_ERR;
6583 if ((childpid = fork()) == 0) {
6584 /* Child */
6585 char tmpfile[256];
6586 close(server.fd);
6587
6588 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6589 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
6590 exit(0);
6591 } else {
6592 exit(1);
6593 }
6594 } else {
6595 /* Parent */
6596 if (childpid == -1) {
6597 redisLog(REDIS_WARNING,
6598 "Can't rewrite append only file in background: fork: %s",
6599 strerror(errno));
6600 return REDIS_ERR;
6601 }
6602 redisLog(REDIS_NOTICE,
6603 "Background append only file rewriting started by pid %d",childpid);
6604 server.bgrewritechildpid = childpid;
6605 /* We set appendseldb to -1 in order to force the next call to the
6606 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6607 * accumulated by the parent into server.bgrewritebuf will start
6608 * with a SELECT statement and it will be safe to merge. */
6609 server.appendseldb = -1;
6610 return REDIS_OK;
6611 }
6612 return REDIS_OK; /* unreached */
6613 }
6614
6615 static void bgrewriteaofCommand(redisClient *c) {
6616 if (server.bgrewritechildpid != -1) {
6617 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6618 return;
6619 }
6620 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6621 char *status = "+Background append only file rewriting started\r\n";
6622 addReplySds(c,sdsnew(status));
6623 } else {
6624 addReply(c,shared.err);
6625 }
6626 }
6627
/* Delete the temp file written by the BGREWRITEAOF child whose pid is
 * 'childpid' ("temp-rewriteaof-bg-<pid>.aof"). unlink() errors, such as the
 * file not existing, are ignored. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof",(int)childpid);
    (void) unlink(path);
}
6634
6635 /* =============================== Virtual Memory =========================== */
6636 static void vmInit(void) {
6637 off_t totsize;
6638
6639 server.vm_fp = fopen("/tmp/redisvm","w+b");
6640 if (server.vm_fp == NULL) {
6641 redisLog(REDIS_WARNING,"Impossible to open the swap file. Exiting.");
6642 exit(1);
6643 }
6644 server.vm_fd = fileno(server.vm_fp);
6645 server.vm_next_page = 0;
6646 server.vm_near_pages = 0;
6647 totsize = server.vm_pages*server.vm_page_size;
6648 redisLog(REDIS_NOTICE,"Allocating %lld bytes of swap file",totsize);
6649 if (ftruncate(server.vm_fd,totsize) == -1) {
6650 redisLog(REDIS_WARNING,"Can't ftruncate swap file: %s. Exiting.",
6651 strerror(errno));
6652 exit(1);
6653 } else {
6654 redisLog(REDIS_NOTICE,"Swap file allocated with success");
6655 }
6656 server.vm_bitmap = zmalloc((server.vm_pages+7)/8);
6657 memset(server.vm_bitmap,0,(server.vm_pages+7)/8);
6658 /* Try to remove the swap file, so the OS will really delete it from the
6659 * file system when Redis exists. */
6660 unlink("/tmp/redisvm");
6661 }
6662
6663 /* Mark the page as used */
6664 static void vmMarkPageUsed(off_t page) {
6665 off_t byte = page/8;
6666 int bit = page&7;
6667 server.vm_bitmap[byte] |= 1<<bit;
6668 printf("Mark used: %lld (byte:%d bit:%d)\n", (long long)page, byte, bit);
6669 }
6670
/* Mark 'count' contiguous pages as used, starting at 'page'.
 * A count <= 0 marks nothing. */
static void vmMarkPagesUsed(off_t page, off_t count) {
    off_t p;

    for (p = page; p < page+count; p++)
        vmMarkPageUsed(p);
}
6678
6679 /* Mark the page as free */
6680 static void vmMarkPageFree(off_t page) {
6681 off_t byte = page/8;
6682 int bit = page&7;
6683 server.vm_bitmap[byte] &= ~(1<<bit);
6684 }
6685
/* Mark 'count' contiguous pages as free, starting at 'page'.
 * A count <= 0 frees nothing. */
static void vmMarkPagesFree(off_t page, off_t count) {
    off_t p;

    for (p = page; p < page+count; p++)
        vmMarkPageFree(p);
}
6693
6694 /* Test if the page is free */
6695 static int vmFreePage(off_t page) {
6696 off_t byte = page/8;
6697 int bit = page&7;
6698 return (server.vm_bitmap[byte] & (1<<bit)) == 0;
6699 }
6700
6701 /* Find N contiguous free pages storing the first page of the cluster in *first.
6702 * Returns REDIS_OK if it was able to find N contiguous pages, otherwise
6703 * REDIS_ERR is returned.
6704 *
6705 * This function uses a simple algorithm: we try to allocate
6706 * REDIS_VM_MAX_NEAR_PAGES sequentially, when we reach this limit we start
6707 * again from the start of the swap file searching for free spaces.
6708 *
6709 * If it looks pretty clear that there are no free pages near our offset
6710 * we try to find less populated places doing a forward jump of
6711 * REDIS_VM_MAX_RANDOM_JUMP, then we start scanning again a few pages
6712 * without hurry, and then we jump again and so forth...
6713 *
6714 * This function can be improved using a free list to avoid to guess
6715 * too much, since we could collect data about freed pages.
6716 *
6717 * note: I implemented this function just after watching an episode of
6718 * Battlestar Galactica, where the hybrid was continuing to say "JUMP!"
6719 */
6720 static int vmFindContiguousPages(off_t *first, int n) {
6721 off_t base, offset = 0, since_jump = 0, numfree = 0;
6722
6723 if (server.vm_near_pages == REDIS_VM_MAX_NEAR_PAGES) {
6724 server.vm_near_pages = 0;
6725 server.vm_next_page = 0;
6726 }
6727 server.vm_near_pages++; /* Yet another try for pages near to the old ones */
6728 base = server.vm_next_page;
6729
6730 while(offset < server.vm_pages) {
6731 off_t this = base+offset;
6732
6733 printf("THIS: %lld (%c)\n", (long long) this, vmFreePage(this) ? 'F' : 'X');
6734 /* If we overflow, restart from page zero */
6735 if (this >= server.vm_pages) {
6736 this -= server.vm_pages;
6737 if (this == 0) {
6738 /* Just overflowed, what we found on tail is no longer
6739 * interesting, as it's no longer contiguous. */
6740 numfree = 0;
6741 }
6742 }
6743 if (vmFreePage(this)) {
6744 /* This is a free page */
6745 numfree++;
6746 /* Already got N free pages? Return to the caller, with success */
6747 if (numfree == n) {
6748 *first = this-(n-1);
6749 server.vm_next_page = this+1;
6750 return REDIS_OK;
6751 }
6752 } else {
6753 /* The current one is not a free page */
6754 numfree = 0;
6755 }
6756
6757 /* Fast-forward if the current page is not free and we already
6758 * searched enough near this place. */
6759 since_jump++;
6760 if (!numfree && since_jump >= REDIS_VM_MAX_RANDOM_JUMP/4) {
6761 offset += random() % REDIS_VM_MAX_RANDOM_JUMP;
6762 since_jump = 0;
6763 /* Note that even if we rewind after the jump, we are don't need
6764 * to make sure numfree is set to zero as we only jump *if* it
6765 * is set to zero. */
6766 } else {
6767 /* Otherwise just check the next page */
6768 offset++;
6769 }
6770 }
6771 return REDIS_ERR;
6772 }
6773
6774 /* Swap the 'val' object relative to 'key' into disk. Store all the information
6775 * needed to later retrieve the object into the key object.
6776 * If we can't find enough contiguous empty pages to swap the object on disk
6777 * REDIS_ERR is returned. */
6778 static int vmSwapObject(robj *key, robj *val) {
6779 off_t pages = rdbSavedObjectPages(val);
6780 off_t page;
6781
6782 assert(key->storage == REDIS_VM_MEMORY);
6783 if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return REDIS_ERR;
6784 if (fseeko(server.vm_fp,page*server.vm_page_size,SEEK_SET) == -1) {
6785 redisLog(REDIS_WARNING,
6786 "Critical VM problem in vmSwapObject(): can't seek: %s",
6787 strerror(errno));
6788 return REDIS_ERR;
6789 }
6790 rdbSaveObject(server.vm_fp,val);
6791 key->vm.page = page;
6792 key->vm.usedpages = pages;
6793 key->storage = REDIS_VM_SWAPPED;
6794 decrRefCount(val); /* Deallocate the object from memory. */
6795 vmMarkPagesUsed(page,pages);
6796 redisLog(REDIS_DEBUG,"VM: object %s swapped out at %lld (%lld pages)",
6797 (unsigned char*) key->ptr,
6798 (unsigned long long) page, (unsigned long long) pages);
6799 return REDIS_OK;
6800 }
6801
6802 /* Load the value object relative to the 'key' object from swap to memory.
6803 * The newly allocated object is returned. */
6804 static robj *vmLoadObject(robj *key) {
6805 robj *val;
6806
6807 assert(key->storage == REDIS_VM_SWAPPED);
6808 if (fseeko(server.vm_fp,key->vm.page*server.vm_page_size,SEEK_SET) == -1) {
6809 redisLog(REDIS_WARNING,
6810 "Unrecoverable VM problem in vmLoadObject(): can't seek: %s",
6811 strerror(errno));
6812 exit(1);
6813 }
6814 val = rdbLoadObject(key->type,server.vm_fp);
6815 if (val == NULL) {
6816 redisLog(REDIS_WARNING, "Unrecoverable VM problem in vmLoadObject(): can't load object from swap file: %s", strerror(errno));
6817 exit(1);
6818 }
6819 key->storage = REDIS_VM_MEMORY;
6820 key->vm.atime = server.unixtime;
6821 vmMarkPagesFree(key->vm.page,key->vm.usedpages);
6822 redisLog(REDIS_DEBUG, "VM: object %s loaded from disk",
6823 (unsigned char*) key->ptr);
6824 return val;
6825 }
6826
6827 /* ================================= Debugging ============================== */
6828
6829 static void debugCommand(redisClient *c) {
6830 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6831 *((char*)-1) = 'x';
6832 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6833 if (rdbSave(server.dbfilename) != REDIS_OK) {
6834 addReply(c,shared.err);
6835 return;
6836 }
6837 emptyDb();
6838 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6839 addReply(c,shared.err);
6840 return;
6841 }
6842 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6843 addReply(c,shared.ok);
6844 } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
6845 emptyDb();
6846 if (loadAppendOnlyFile(server.appendfilename) != REDIS_OK) {
6847 addReply(c,shared.err);
6848 return;
6849 }
6850 redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF");
6851 addReply(c,shared.ok);
6852 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6853 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6854 robj *key, *val;
6855
6856 if (!de) {
6857 addReply(c,shared.nokeyerr);
6858 return;
6859 }
6860 key = dictGetEntryKey(de);
6861 val = dictGetEntryVal(de);
6862 addReplySds(c,sdscatprintf(sdsempty(),
6863 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d serializedlength:%lld\r\n",
6864 (void*)key, key->refcount, (void*)val, val->refcount,
6865 val->encoding, rdbSavedObjectLen(val)));
6866 } else if (!strcasecmp(c->argv[1]->ptr,"swapout") && c->argc == 3) {
6867 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6868 robj *key, *val;
6869
6870 if (!server.vm_enabled) {
6871 addReplySds(c,sdsnew("-ERR Virtual Memory is disabled\r\n"));
6872 return;
6873 }
6874 if (!de) {
6875 addReply(c,shared.nokeyerr);
6876 return;
6877 }
6878 key = dictGetEntryKey(de);
6879 val = dictGetEntryVal(de);
6880 if (key->storage != REDIS_VM_MEMORY) {
6881 addReplySds(c,sdsnew("-ERR This key is not in memory\r\n"));
6882 } else if (vmSwapObject(key,val) == REDIS_OK) {
6883 dictGetEntryVal(de) = NULL;
6884 addReply(c,shared.ok);
6885 } else {
6886 addReply(c,shared.err);
6887 }
6888 } else {
6889 addReplySds(c,sdsnew(
6890 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|SWAPOUT <key>|RELOAD]\r\n"));
6891 }
6892 }
6893
6894 static void _redisAssert(char *estr) {
6895 redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
6896 redisLog(REDIS_WARNING,"==> %s\n",estr);
6897 #ifdef HAVE_BACKTRACE
6898 redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
6899 *((char*)-1) = 'x';
6900 #endif
6901 }
6902
6903 /* =================================== Main! ================================ */
6904
6905 #ifdef __linux__
/* Return the numeric value of /proc/sys/vm/overcommit_memory, or -1 when the
 * file can't be opened or its first line can't be read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    char *got;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp == NULL) return -1;
    got = fgets(line,sizeof(line),fp);
    fclose(fp);
    return (got != NULL) ? atoi(line) : -1;
}
6919
6920 void linuxOvercommitMemoryWarning(void) {
6921 if (linuxOvercommitMemoryValue() == 0) {
6922 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6923 }
6924 }
6925 #endif /* __linux__ */
6926
6927 static void daemonize(void) {
6928 int fd;
6929 FILE *fp;
6930
6931 if (fork() != 0) exit(0); /* parent exits */
6932 printf("New pid: %d\n", getpid());
6933 setsid(); /* create a new session */
6934
6935 /* Every output goes to /dev/null. If Redis is daemonized but
6936 * the 'logfile' is set to 'stdout' in the configuration file
6937 * it will not log at all. */
6938 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6939 dup2(fd, STDIN_FILENO);
6940 dup2(fd, STDOUT_FILENO);
6941 dup2(fd, STDERR_FILENO);
6942 if (fd > STDERR_FILENO) close(fd);
6943 }
6944 /* Try to write the pid file */
6945 fp = fopen(server.pidfile,"w");
6946 if (fp) {
6947 fprintf(fp,"%d\n",getpid());
6948 fclose(fp);
6949 }
6950 }
6951
6952 int main(int argc, char **argv) {
6953 initServerConfig();
6954 if (argc == 2) {
6955 resetServerSaveParams();
6956 loadServerConfig(argv[1]);
6957 } else if (argc > 2) {
6958 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6959 exit(1);
6960 } else {
6961 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6962 }
6963 if (server.daemonize) daemonize();
6964 initServer();
6965 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6966 #ifdef __linux__
6967 linuxOvercommitMemoryWarning();
6968 #endif
6969 if (server.appendonly) {
6970 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6971 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6972 } else {
6973 if (rdbLoad(server.dbfilename) == REDIS_OK)
6974 redisLog(REDIS_NOTICE,"DB loaded from disk");
6975 }
6976 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
6977 acceptHandler, NULL) == AE_ERR) oom("creating file event");
6978 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6979 aeMain(server.el);
6980 aeDeleteEventLoop(server.el);
6981 return 0;
6982 }
6983
6984 /* ============================= Backtrace support ========================= */
6985
6986 #ifdef HAVE_BACKTRACE
6987 static char *findFuncName(void *pointer, unsigned long *offset);
6988
6989 static void *getMcontextEip(ucontext_t *uc) {
6990 #if defined(__FreeBSD__)
6991 return (void*) uc->uc_mcontext.mc_eip;
6992 #elif defined(__dietlibc__)
6993 return (void*) uc->uc_mcontext.eip;
6994 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6995 #if __x86_64__
6996 return (void*) uc->uc_mcontext->__ss.__rip;
6997 #else
6998 return (void*) uc->uc_mcontext->__ss.__eip;
6999 #endif
7000 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
7001 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
7002 return (void*) uc->uc_mcontext->__ss.__rip;
7003 #else
7004 return (void*) uc->uc_mcontext->__ss.__eip;
7005 #endif
7006 #elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
7007 return (void*) uc->uc_mcontext.gregs[REG_EIP]; /* Linux 32/64 bit */
7008 #elif defined(__ia64__) /* Linux IA64 */
7009 return (void*) uc->uc_mcontext.sc_ip;
7010 #else
7011 return NULL;
7012 #endif
7013 }
7014
7015 static void segvHandler(int sig, siginfo_t *info, void *secret) {
7016 void *trace[100];
7017 char **messages = NULL;
7018 int i, trace_size = 0;
7019 unsigned long offset=0;
7020 ucontext_t *uc = (ucontext_t*) secret;
7021 sds infostring;
7022 REDIS_NOTUSED(info);
7023
7024 redisLog(REDIS_WARNING,
7025 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
7026 infostring = genRedisInfoString();
7027 redisLog(REDIS_WARNING, "%s",infostring);
7028 /* It's not safe to sdsfree() the returned string under memory
7029 * corruption conditions. Let it leak as we are going to abort */
7030
7031 trace_size = backtrace(trace, 100);
7032 /* overwrite sigaction with caller's address */
7033 if (getMcontextEip(uc) != NULL) {
7034 trace[1] = getMcontextEip(uc);
7035 }
7036 messages = backtrace_symbols(trace, trace_size);
7037
7038 for (i=1; i<trace_size; ++i) {
7039 char *fn = findFuncName(trace[i], &offset), *p;
7040
7041 p = strchr(messages[i],'+');
7042 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
7043 redisLog(REDIS_WARNING,"%s", messages[i]);
7044 } else {
7045 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
7046 }
7047 }
7048 /* free(messages); Don't call free() with possibly corrupted memory. */
7049 exit(0);
7050 }
7051
7052 static void setupSigSegvAction(void) {
7053 struct sigaction act;
7054
7055 sigemptyset (&act.sa_mask);
7056 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
7057 * is used. Otherwise, sa_handler is used */
7058 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
7059 act.sa_sigaction = segvHandler;
7060 sigaction (SIGSEGV, &act, NULL);
7061 sigaction (SIGBUS, &act, NULL);
7062 sigaction (SIGFPE, &act, NULL);
7063 sigaction (SIGILL, &act, NULL);
7064 sigaction (SIGBUS, &act, NULL);
7065 return;
7066 }
7067
7068 #include "staticsymbols.h"
7069 /* This function try to convert a pointer into a function name. It's used in
7070 * oreder to provide a backtrace under segmentation fault that's able to
7071 * display functions declared as static (otherwise the backtrace is useless). */
7072 static char *findFuncName(void *pointer, unsigned long *offset){
7073 int i, ret = -1;
7074 unsigned long off, minoff = 0;
7075
7076 /* Try to match against the Symbol with the smallest offset */
7077 for (i=0; symsTable[i].pointer; i++) {
7078 unsigned long lp = (unsigned long) pointer;
7079
7080 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
7081 off=lp-symsTable[i].pointer;
7082 if (ret < 0 || off < minoff) {
7083 minoff=off;
7084 ret=i;
7085 }
7086 }
7087 }
7088 if (ret == -1) return NULL;
7089 *offset = minoff;
7090 return symsTable[ret].name;
7091 }
7092 #else /* HAVE_BACKTRACE */
/* Without backtrace support there is no crash handler to install:
 * this is intentionally a no-op. */
static void setupSigSegvAction(void) {
    return;
}
7095 #endif /* HAVE_BACKTRACE */
7096
7097
7098
7099 /* The End */
7100
7101
7102