]> git.saurik.com Git - redis.git/blob - redis.c
b31bb43d8be9a106645b6f0c7fa4af0317917561
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string reported by this server build. */
#define REDIS_VERSION "1.3.2"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
/* ------------------------------------------------------------------------
 * Return codes
 * ------------------------------------------------------------------------ */
#define REDIS_OK                    0
#define REDIS_ERR                   -1

/* ------------------------------------------------------------------------
 * Static server configuration
 * ------------------------------------------------------------------------ */
#define REDIS_SERVERPORT            6379            /* default TCP port */
#define REDIS_MAXIDLETIME           (60*5)          /* default client timeout */
#define REDIS_IOBUF_LEN             1024
#define REDIS_LOADBUF_LEN           1024
#define REDIS_STATIC_ARGS           4
#define REDIS_DEFAULT_DBNUM         16
#define REDIS_CONFIGLINE_MAX        1024
#define REDIS_OBJFREELIST_MAX       1000000         /* cap on cached free objects */
#define REDIS_MAX_SYNC_TIME         60              /* max time a slave may take to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100            /* expire attempts per cron run
                                                     * (original note: ~100 keys/second) */
#define REDIS_MAX_WRITE_PER_EVENT   (1024*64)
#define REDIS_REQUEST_MAX_SIZE      (1024*1024*256) /* max bytes in an inline command */

/* Reply output: switch to writev() once more than REDIS_WRITEV_THRESHOLD
 * packets are pending, using at most REDIS_WRITEV_IOVEC_COUNT iovecs per
 * call. */
#define REDIS_WRITEV_THRESHOLD      3
#define REDIS_WRITEV_IOVEC_COUNT    256

/* Hash tables: minimal fill percentage (10%). */
#define REDIS_HT_MINFILL            10

/* ------------------------------------------------------------------------
 * Command flags (bitmask)
 * ------------------------------------------------------------------------ */
#define REDIS_CMD_BULK              1   /* bulk write command */
#define REDIS_CMD_INLINE            2   /* inline command */
/* Commands carrying REDIS_CMD_DENYOOM are refused with an error when the
 * 'maxmemory' option is configured and the server currently uses more than
 * that many bytes, i.e. they are denied in low-memory conditions. */
#define REDIS_CMD_DENYOOM           4

/* ------------------------------------------------------------------------
 * Object types and encodings
 * ------------------------------------------------------------------------ */
#define REDIS_STRING                0
#define REDIS_LIST                  1
#define REDIS_SET                   2
#define REDIS_ZSET                  3
#define REDIS_HASH                  4

#define REDIS_ENCODING_RAW          0   /* raw representation */
#define REDIS_ENCODING_INT          1   /* stored as an integer */

/* Opcodes that appear only in the on-disk dump, never as object types. */
#define REDIS_EXPIRETIME            253
#define REDIS_SELECTDB              254
#define REDIS_EOF                   255
127
/* Length encoding used by the dump file format. Storing a full 32 bit length
 * for every short key would waste a lot of space, so the two most significant
 * bits of the first byte select how the length is stored:
 *
 * 00|000000 => the length is the remaining 6 bits of this byte
 * 01|000000 00000000 => the length is 14 bits: 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => a full 32 bit length follows
 * 11|000000 => a specially encoded object follows; the remaining six bits
 *              tell which kind (see the REDIS_RDB_ENC_* defines below).
 *
 * Lengths up to 63 fit in a single byte, which covers most DB keys and many
 * values. */
#define REDIS_RDB_6BITLEN 0   /* 00 */
#define REDIS_RDB_14BITLEN 1  /* 01 */
#define REDIS_RDB_32BITLEN 2  /* 10 */
#define REDIS_RDB_ENCVAL 3    /* 11 */
/* Sentinel length, presumably returned by the length loader on error. */
#define REDIS_RDB_LENERR UINT_MAX

/* When the length byte of a string object has both MSBs set
 * (REDIS_RDB_ENCVAL), the remaining six bits select one of these special
 * encodings: */
#define REDIS_RDB_ENC_INT8 0        /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1       /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2       /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3         /* string compressed with LZF */
154
/* Virtual memory: where a key's value currently lives (robj 'storage'). */
#define REDIS_VM_MEMORY 0       /* The object is in memory */
#define REDIS_VM_SWAPPED 1      /* The object is on disk */
#define REDIS_VM_SWAPPING 2     /* Redis is swapping this object to disk */
#define REDIS_VM_LOADING 3      /* Redis is loading this object from disk */

/* Virtual memory static configuration.
 * See vmFindContiguousPages() for the meaning of these magic numbers. */
#define REDIS_VM_MAX_NEAR_PAGES 65536
#define REDIS_VM_MAX_RANDOM_JUMP 4096

/* Client flags (bitmask) */
#define REDIS_CLOSE 1       /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2       /* This client is a slave server */
#define REDIS_MASTER 4      /* This client is a master server */
#define REDIS_MONITOR 8     /* This client is a slave monitor, see MONITOR */
#define REDIS_MULTI 16      /* This client is in a MULTI context */
#define REDIS_BLOCKED 32    /* The client is waiting in a blocking operation */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0       /* No active replication */
#define REDIS_REPL_CONNECT 1    /* Must connect to master */
#define REDIS_REPL_CONNECTED 2  /* Connected to master */

/* Slave replication state - from the point of view of the master.
 * In the SEND_BULK and ONLINE states the slave receives new updates in its
 * output queue. In the WAIT_BGSAVE states the server is instead waiting for
 * the next background save in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_ASC 1
#define REDIS_SORT_DESC 2
#define REDIS_SORTKEY_MAX 1024

/* Log levels */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Silences "unused variable/parameter" warnings. The argument is
 * parenthesized so that any expression (e.g. 'a + b') is discarded whole. */
#define REDIS_NOTUSED(V) ((void) (V))

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25      /* Skiplist P = 1/4 */

/* Append only fsync policies */
#define APPENDFSYNC_NO 0
#define APPENDFSYNC_ALWAYS 1
#define APPENDFSYNC_EVERYSEC 2

/* We can print the stacktrace, so our assert is defined this way: */
#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
static void _redisAssert(char *estr);
217
218 /*================================= Data types ============================== */
219
220 /* A redis object, that is a type able to hold a string / list / set */
221
/* Virtual memory bookkeeping embedded at the end of each robj.
 * Only the type is declared here: the original also defined an unused
 * global object named 'vm' of this type ("} vm;"), which has been dropped. */
struct redisObjectVM {
    off_t page;         /* the page at which the object is stored on disk */
    off_t usedpages;    /* number of pages used on disk */
    time_t atime;       /* Last access time */
};
228
229 /* The actual Redis Object */
230 typedef struct redisObject {
231 void *ptr;
232 unsigned char type;
233 unsigned char encoding;
234 unsigned char storage; /* If this object is a key, where is the value?
235 * REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
236 unsigned char vtype; /* If this object is a key, and value is swapped out,
237 * this is the type of the swapped out object. */
238 int refcount;
239 /* VM fields, this are only allocated if VM is active, otherwise the
240 * object allocation function will just allocate
241 * sizeof(redisObjct) minus sizeof(redisObjectVM), so using
242 * Redis without VM active will not have any overhead. */
243 struct redisObjectVM vm;
244 } robj;
245
/* Initialize a Redis string object that lives on the stack rather than the
 * heap. The macro is kept right next to the structure definition so it gets
 * updated whenever the structure changes, avoiding bugs like bug #85, which
 * was introduced exactly this way.
 *
 * _var is an robj lvalue (not a pointer); _ptr becomes its payload. The
 * 'storage' field is only set when VM is enabled.
 *
 * The expansion is a single statement, do { ... } while(0), with NO trailing
 * semicolon. That way "initStaticStringObject(o,p);" stays safe inside an
 * unbraced if/else; the caller supplies the ';'. */
#define initStaticStringObject(_var,_ptr) do { \
    (_var).refcount = 1; \
    (_var).type = REDIS_STRING; \
    (_var).encoding = REDIS_ENCODING_RAW; \
    (_var).ptr = (_ptr); \
    if (server.vm_enabled) (_var).storage = REDIS_VM_MEMORY; \
} while(0)
257
258 typedef struct redisDb {
259 dict *dict; /* The keyspace for this DB */
260 dict *expires; /* Timeout of keys with a timeout set */
261 dict *blockingkeys; /* Keys with clients waiting for data (BLPOP) */
262 int id;
263 } redisDb;
264
265 /* Client MULTI/EXEC state */
266 typedef struct multiCmd {
267 robj **argv;
268 int argc;
269 struct redisCommand *cmd;
270 } multiCmd;
271
272 typedef struct multiState {
273 multiCmd *commands; /* Array of MULTI commands */
274 int count; /* Total number of MULTI commands */
275 } multiState;
276
277 /* With multiplexing we need to take per-clinet state.
278 * Clients are taken in a liked list. */
279 typedef struct redisClient {
280 int fd;
281 redisDb *db;
282 int dictid;
283 sds querybuf;
284 robj **argv, **mbargv;
285 int argc, mbargc;
286 int bulklen; /* bulk read len. -1 if not in bulk read mode */
287 int multibulk; /* multi bulk command format active */
288 list *reply;
289 int sentlen;
290 time_t lastinteraction; /* time of the last interaction, used for timeout */
291 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
292 /* REDIS_MULTI */
293 int slaveseldb; /* slave selected db, if this client is a slave */
294 int authenticated; /* when requirepass is non-NULL */
295 int replstate; /* replication state if this is a slave */
296 int repldbfd; /* replication DB file descriptor */
297 long repldboff; /* replication DB file offset */
298 off_t repldbsize; /* replication DB file size */
299 multiState mstate; /* MULTI/EXEC state */
300 robj **blockingkeys; /* The key we waiting to terminate a blocking
301 * operation such as BLPOP. Otherwise NULL. */
302 int blockingkeysnum; /* Number of blocking keys */
303 time_t blockingto; /* Blocking operation timeout. If UNIX current time
304 * is >= blockingto then the operation timed out. */
305 } redisClient;
306
/* An automatic-save rule, presumably from a "save <seconds> <changes>"
 * configuration line. */
struct saveparam {
    time_t seconds;   /* elapsed-time threshold */
    int changes;      /* number-of-changes threshold */
};
311
312 /* Global server state structure */
313 struct redisServer {
314 int port;
315 int fd;
316 redisDb *db;
317 dict *sharingpool; /* Poll used for object sharing */
318 unsigned int sharingpoolsize;
319 long long dirty; /* changes to DB from the last save */
320 list *clients;
321 list *slaves, *monitors;
322 char neterr[ANET_ERR_LEN];
323 aeEventLoop *el;
324 int cronloops; /* number of times the cron function run */
325 list *objfreelist; /* A list of freed objects to avoid malloc() */
326 time_t lastsave; /* Unix time of last save succeeede */
327 size_t usedmemory; /* Used memory in megabytes */
328 /* Fields used only for stats */
329 time_t stat_starttime; /* server start time */
330 long long stat_numcommands; /* number of processed commands */
331 long long stat_numconnections; /* number of connections received */
332 /* Configuration */
333 int verbosity;
334 int glueoutputbuf;
335 int maxidletime;
336 int dbnum;
337 int daemonize;
338 int appendonly;
339 int appendfsync;
340 time_t lastfsync;
341 int appendfd;
342 int appendseldb;
343 char *pidfile;
344 pid_t bgsavechildpid;
345 pid_t bgrewritechildpid;
346 sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
347 struct saveparam *saveparams;
348 int saveparamslen;
349 char *logfile;
350 char *bindaddr;
351 char *dbfilename;
352 char *appendfilename;
353 char *requirepass;
354 int shareobjects;
355 int rdbcompression;
356 /* Replication related */
357 int isslave;
358 char *masterauth;
359 char *masterhost;
360 int masterport;
361 redisClient *master; /* client that is master for this slave */
362 int replstate;
363 unsigned int maxclients;
364 unsigned long maxmemory;
365 unsigned int blockedclients;
366 /* Sort parameters - qsort_r() is only available under BSD so we
367 * have to take this state global, in order to pass it to sortCompare() */
368 int sort_desc;
369 int sort_alpha;
370 int sort_bypattern;
371 /* Virtual memory configuration */
372 int vm_enabled;
373 off_t vm_page_size;
374 off_t vm_pages;
375 long vm_max_memory;
376 /* Virtual memory state */
377 FILE *vm_fp;
378 int vm_fd;
379 off_t vm_next_page; /* Next probably empty page */
380 off_t vm_near_pages; /* Number of pages allocated sequentially */
381 unsigned char *vm_bitmap; /* Bitmap of free/used pages */
382 time_t unixtime; /* Unix time sampled every second. */
383 };
384
385 typedef void redisCommandProc(redisClient *c);
386 struct redisCommand {
387 char *name;
388 redisCommandProc *proc;
389 int arity;
390 int flags;
391 };
392
/* A function name paired with its address stored as an integer. */
struct redisFunctionSym {
    char *name;               /* symbol name */
    unsigned long pointer;    /* symbol address */
};
397
398 typedef struct _redisSortObject {
399 robj *obj;
400 union {
401 double score;
402 robj *cmpobj;
403 } u;
404 } redisSortObject;
405
406 typedef struct _redisSortOperation {
407 int type;
408 robj *pattern;
409 } redisSortOperation;
410
411 /* ZSETs use a specialized version of Skiplists */
412
413 typedef struct zskiplistNode {
414 struct zskiplistNode **forward;
415 struct zskiplistNode *backward;
416 double score;
417 robj *obj;
418 } zskiplistNode;
419
420 typedef struct zskiplist {
421 struct zskiplistNode *header, *tail;
422 unsigned long length;
423 int level;
424 } zskiplist;
425
426 typedef struct zset {
427 dict *dict;
428 zskiplist *zsl;
429 } zset;
430
431 /* Our shared "common" objects */
432
433 struct sharedObjectsStruct {
434 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
435 *colon, *nullbulk, *nullmultibulk, *queued,
436 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
437 *outofrangeerr, *plus,
438 *select0, *select1, *select2, *select3, *select4,
439 *select5, *select6, *select7, *select8, *select9;
440 } shared;
441
/* Globals used as constants by the on-disk serialization of doubles.
 * They are assigned at runtime (not written as literals) to avoid strange
 * compiler optimizations. */
static double R_Zero;
static double R_PosInf;
static double R_NegInf;
static double R_Nan;
447
448 /*================================ Prototypes =============================== */
449
450 static void freeStringObject(robj *o);
451 static void freeListObject(robj *o);
452 static void freeSetObject(robj *o);
453 static void decrRefCount(void *o);
454 static robj *createObject(int type, void *ptr);
455 static void freeClient(redisClient *c);
456 static int rdbLoad(char *filename);
457 static void addReply(redisClient *c, robj *obj);
458 static void addReplySds(redisClient *c, sds s);
459 static void incrRefCount(robj *o);
460 static int rdbSaveBackground(char *filename);
461 static robj *createStringObject(char *ptr, size_t len);
462 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
463 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
464 static int syncWithMaster(void);
465 static robj *tryObjectSharing(robj *o);
466 static int tryObjectEncoding(robj *o);
467 static robj *getDecodedObject(robj *o);
468 static int removeExpire(redisDb *db, robj *key);
469 static int expireIfNeeded(redisDb *db, robj *key);
470 static int deleteIfVolatile(redisDb *db, robj *key);
471 static int deleteKey(redisDb *db, robj *key);
472 static time_t getExpire(redisDb *db, robj *key);
473 static int setExpire(redisDb *db, robj *key, time_t when);
474 static void updateSlavesWaitingBgsave(int bgsaveerr);
475 static void freeMemoryIfNeeded(void);
476 static int processCommand(redisClient *c);
477 static void setupSigSegvAction(void);
478 static void rdbRemoveTempFile(pid_t childpid);
479 static void aofRemoveTempFile(pid_t childpid);
480 static size_t stringObjectLen(robj *o);
481 static void processInputBuffer(redisClient *c);
482 static zskiplist *zslCreate(void);
483 static void zslFree(zskiplist *zsl);
484 static void zslInsert(zskiplist *zsl, double score, robj *obj);
485 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
486 static void initClientMultiState(redisClient *c);
487 static void freeClientMultiState(redisClient *c);
488 static void queueMultiCommand(redisClient *c, struct redisCommand *cmd);
489 static void unblockClient(redisClient *c);
490 static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele);
491 static void vmInit(void);
492 static void vmMarkPagesFree(off_t page, off_t count);
493 static robj *vmLoadObject(robj *key);
494
495 static void authCommand(redisClient *c);
496 static void pingCommand(redisClient *c);
497 static void echoCommand(redisClient *c);
498 static void setCommand(redisClient *c);
499 static void setnxCommand(redisClient *c);
500 static void getCommand(redisClient *c);
501 static void delCommand(redisClient *c);
502 static void existsCommand(redisClient *c);
503 static void incrCommand(redisClient *c);
504 static void decrCommand(redisClient *c);
505 static void incrbyCommand(redisClient *c);
506 static void decrbyCommand(redisClient *c);
507 static void selectCommand(redisClient *c);
508 static void randomkeyCommand(redisClient *c);
509 static void keysCommand(redisClient *c);
510 static void dbsizeCommand(redisClient *c);
511 static void lastsaveCommand(redisClient *c);
512 static void saveCommand(redisClient *c);
513 static void bgsaveCommand(redisClient *c);
514 static void bgrewriteaofCommand(redisClient *c);
515 static void shutdownCommand(redisClient *c);
516 static void moveCommand(redisClient *c);
517 static void renameCommand(redisClient *c);
518 static void renamenxCommand(redisClient *c);
519 static void lpushCommand(redisClient *c);
520 static void rpushCommand(redisClient *c);
521 static void lpopCommand(redisClient *c);
522 static void rpopCommand(redisClient *c);
523 static void llenCommand(redisClient *c);
524 static void lindexCommand(redisClient *c);
525 static void lrangeCommand(redisClient *c);
526 static void ltrimCommand(redisClient *c);
527 static void typeCommand(redisClient *c);
528 static void lsetCommand(redisClient *c);
529 static void saddCommand(redisClient *c);
530 static void sremCommand(redisClient *c);
531 static void smoveCommand(redisClient *c);
532 static void sismemberCommand(redisClient *c);
533 static void scardCommand(redisClient *c);
534 static void spopCommand(redisClient *c);
535 static void srandmemberCommand(redisClient *c);
536 static void sinterCommand(redisClient *c);
537 static void sinterstoreCommand(redisClient *c);
538 static void sunionCommand(redisClient *c);
539 static void sunionstoreCommand(redisClient *c);
540 static void sdiffCommand(redisClient *c);
541 static void sdiffstoreCommand(redisClient *c);
542 static void syncCommand(redisClient *c);
543 static void flushdbCommand(redisClient *c);
544 static void flushallCommand(redisClient *c);
545 static void sortCommand(redisClient *c);
546 static void lremCommand(redisClient *c);
547 static void rpoplpushcommand(redisClient *c);
548 static void infoCommand(redisClient *c);
549 static void mgetCommand(redisClient *c);
550 static void monitorCommand(redisClient *c);
551 static void expireCommand(redisClient *c);
552 static void expireatCommand(redisClient *c);
553 static void getsetCommand(redisClient *c);
554 static void ttlCommand(redisClient *c);
555 static void slaveofCommand(redisClient *c);
556 static void debugCommand(redisClient *c);
557 static void msetCommand(redisClient *c);
558 static void msetnxCommand(redisClient *c);
559 static void zaddCommand(redisClient *c);
560 static void zincrbyCommand(redisClient *c);
561 static void zrangeCommand(redisClient *c);
562 static void zrangebyscoreCommand(redisClient *c);
563 static void zrevrangeCommand(redisClient *c);
564 static void zcardCommand(redisClient *c);
565 static void zremCommand(redisClient *c);
566 static void zscoreCommand(redisClient *c);
567 static void zremrangebyscoreCommand(redisClient *c);
568 static void multiCommand(redisClient *c);
569 static void execCommand(redisClient *c);
570 static void blpopCommand(redisClient *c);
571 static void brpopCommand(redisClient *c);
572
573 /*================================= Globals ================================= */
574
575 /* Global vars */
576 static struct redisServer server; /* server global state */
577 static struct redisCommand cmdTable[] = {
578 {"get",getCommand,2,REDIS_CMD_INLINE},
579 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
580 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
581 {"del",delCommand,-2,REDIS_CMD_INLINE},
582 {"exists",existsCommand,2,REDIS_CMD_INLINE},
583 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
584 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
585 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
586 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
587 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
588 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
589 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
590 {"brpop",brpopCommand,-3,REDIS_CMD_INLINE},
591 {"blpop",blpopCommand,-3,REDIS_CMD_INLINE},
592 {"llen",llenCommand,2,REDIS_CMD_INLINE},
593 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
594 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
595 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
596 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
597 {"lrem",lremCommand,4,REDIS_CMD_BULK},
598 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
599 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
600 {"srem",sremCommand,3,REDIS_CMD_BULK},
601 {"smove",smoveCommand,4,REDIS_CMD_BULK},
602 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
603 {"scard",scardCommand,2,REDIS_CMD_INLINE},
604 {"spop",spopCommand,2,REDIS_CMD_INLINE},
605 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
606 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
607 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
608 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
609 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
610 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
611 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
612 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
613 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
614 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
615 {"zrem",zremCommand,3,REDIS_CMD_BULK},
616 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
617 {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE},
618 {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
619 {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE},
620 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
621 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
622 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
623 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
624 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
625 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
626 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
627 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
628 {"select",selectCommand,2,REDIS_CMD_INLINE},
629 {"move",moveCommand,3,REDIS_CMD_INLINE},
630 {"rename",renameCommand,3,REDIS_CMD_INLINE},
631 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
632 {"expire",expireCommand,3,REDIS_CMD_INLINE},
633 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
634 {"keys",keysCommand,2,REDIS_CMD_INLINE},
635 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
636 {"auth",authCommand,2,REDIS_CMD_INLINE},
637 {"ping",pingCommand,1,REDIS_CMD_INLINE},
638 {"echo",echoCommand,2,REDIS_CMD_BULK},
639 {"save",saveCommand,1,REDIS_CMD_INLINE},
640 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
641 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
642 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
643 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
644 {"type",typeCommand,2,REDIS_CMD_INLINE},
645 {"multi",multiCommand,1,REDIS_CMD_INLINE},
646 {"exec",execCommand,1,REDIS_CMD_INLINE},
647 {"sync",syncCommand,1,REDIS_CMD_INLINE},
648 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
649 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
650 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
651 {"info",infoCommand,1,REDIS_CMD_INLINE},
652 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
653 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
654 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
655 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
656 {NULL,NULL,0,0}
657 };
658
659 /*============================ Utility functions ============================ */
660
/* Glob-style pattern matching.
 *
 * Returns 1 if the first 'stringLen' bytes of 'string' match the first
 * 'patternLen' bytes of 'pattern', 0 otherwise. Only the given lengths are
 * read, so neither buffer needs to be NUL terminated. When 'nocase' is
 * non-zero, characters are compared case-insensitively.
 *
 * Syntax:
 *   *        any sequence of characters, including the empty one
 *   ?        exactly one character
 *   [abc]    one character from the set; [^abc] negates the set; [a-z] is a
 *            range (the bounds may be given in either order)
 *   \x       the character x taken literally (also inside [...])
 *
 * Every read is bounded by the remaining lengths. In particular, an empty
 * string never satisfies a '[...]' or literal item, and a trailing '\' inside
 * a class is treated as a literal backslash. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen) {
        switch (pattern[0]) {
        case '*':
            /* A run of stars matches the same as a single star. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* a trailing star matches whatever is left */
            /* Try the rest of the pattern at every remaining offset. */
            while (stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A class must consume one character; none is left. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back onto the last pattern
                     * byte so the increment after the switch lands exactly
                     * on the end of the pattern. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else if (!nocase) {
                    if (pattern[0] == string[0])
                        match = 1;
                } else {
                    if (tolower((unsigned char)pattern[0]) ==
                        tolower((unsigned char)string[0]))
                        match = 1;
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* String exhausted: only trailing stars may still match. */
            while (patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
783
784 static void redisLog(int level, const char *fmt, ...) {
785 va_list ap;
786 FILE *fp;
787
788 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
789 if (!fp) return;
790
791 va_start(ap, fmt);
792 if (level >= server.verbosity) {
793 char *c = ".-*";
794 char buf[64];
795 time_t now;
796
797 now = time(NULL);
798 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
799 fprintf(fp,"%s %c ",buf,c[level]);
800 vfprintf(fp, fmt, ap);
801 fprintf(fp,"\n");
802 fflush(fp);
803 }
804 va_end(ap);
805
806 if (server.logfile) fclose(fp);
807 }
808
809 /*====================== Hash table type implementation ==================== */
810
/* These are hash table types that use the SDS dynamic strings library as
 * keys and Redis objects as values (objects can hold SDS strings,
 * lists, sets). */
814
/* Dict destructor for plain heap values: releases 'val' with zfree().
 * 'privdata' is not used. */
static void dictVanillaFree(void *privdata, void *val)
{
    (void) privdata;
    zfree(val);
}
820
821 static void dictListDestructor(void *privdata, void *val)
822 {
823 DICT_NOTUSED(privdata);
824 listRelease((list*)val);
825 }
826
827 static int sdsDictKeyCompare(void *privdata, const void *key1,
828 const void *key2)
829 {
830 int l1,l2;
831 DICT_NOTUSED(privdata);
832
833 l1 = sdslen((sds)key1);
834 l2 = sdslen((sds)key2);
835 if (l1 != l2) return 0;
836 return memcmp(key1, key2, l1) == 0;
837 }
838
839 static void dictRedisObjectDestructor(void *privdata, void *val)
840 {
841 DICT_NOTUSED(privdata);
842
843 if (val == NULL) return; /* Values of swapped out keys as set to NULL */
844 decrRefCount(val);
845 }
846
847 static int dictObjKeyCompare(void *privdata, const void *key1,
848 const void *key2)
849 {
850 const robj *o1 = key1, *o2 = key2;
851 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
852 }
853
854 static unsigned int dictObjHash(const void *key) {
855 const robj *o = key;
856 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
857 }
858
859 static int dictEncObjKeyCompare(void *privdata, const void *key1,
860 const void *key2)
861 {
862 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
863 int cmp;
864
865 o1 = getDecodedObject(o1);
866 o2 = getDecodedObject(o2);
867 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
868 decrRefCount(o1);
869 decrRefCount(o2);
870 return cmp;
871 }
872
873 static unsigned int dictEncObjHash(const void *key) {
874 robj *o = (robj*) key;
875
876 o = getDecodedObject(o);
877 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
878 decrRefCount(o);
879 return hash;
880 }
881
/* Dict type with redis object keys in any encoding: keys are hashed and
 * compared on their decoded string form, and released with decrRefCount()
 * when removed. Values are not duplicated nor freed by the dict.
 * In this file it backs the object sharing pool and the per-DB expires
 * tables (see initServer()). */
static dictType setDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL                       /* val destructor */
};
890
/* Dict type for sorted sets: keys are redis objects in any encoding (hashed
 * and compared decoded, released with decrRefCount()); values are
 * heap-allocated doubles (malloc(sizeof(double))) released with zfree().
 * NOTE(review): the doubles are presumably the member scores — confirm
 * against the zset implementation. */
static dictType zsetDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictVanillaFree            /* val destructor of malloc(sizeof(double)) */
};
899
/* Dict type for the main keyspace of every DB (see initServer()).
 * Keys are redis objects hashed and compared through their ptr as a plain
 * sds string (dictObjHash / dictObjKeyCompare do no decoding), so keys are
 * presumably always unencoded — TODO confirm. Keys and values are released
 * with dictRedisObjectDestructor(), which ignores NULL values (used for
 * swapped out keys). */
static dictType hashDictType = {
    dictObjHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictObjKeyCompare,          /* key compare */
    dictRedisObjectDestructor,  /* key destructor */
    dictRedisObjectDestructor   /* val destructor */
};
908
/* Keylist hash table type has unencoded redis objects as keys and
 * lists as values. It's used for blocking operations (BLPOP): each DB's
 * blockingkeys table is created with it (see initServer()). Keys are
 * released with decrRefCount(), values with listRelease(). */
static dictType keylistDictType = {
    dictObjHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictObjKeyCompare,          /* key compare */
    dictRedisObjectDestructor,  /* key destructor */
    dictListDestructor          /* val destructor */
};
919
920 /* ========================= Random utility functions ======================= */
921
922 /* Redis generally does not try to recover from out of memory conditions
923 * when allocating objects or strings, it is not clear if it will be possible
924 * to report this condition to the client since the networking layer itself
925 * is based on heap allocation for send buffers, so we simply abort.
926 * At least the code will be simpler to read... */
927 static void oom(const char *msg) {
928 redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
929 sleep(1);
930 abort();
931 }
932
933 /* ====================== Redis server networking stuff ===================== */
934 static void closeTimedoutClients(void) {
935 redisClient *c;
936 listNode *ln;
937 time_t now = time(NULL);
938
939 listRewind(server.clients);
940 while ((ln = listYield(server.clients)) != NULL) {
941 c = listNodeValue(ln);
942 if (server.maxidletime &&
943 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
944 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
945 (now - c->lastinteraction > server.maxidletime))
946 {
947 redisLog(REDIS_DEBUG,"Closing idle client");
948 freeClient(c);
949 } else if (c->flags & REDIS_BLOCKED) {
950 if (c->blockingto != 0 && c->blockingto < now) {
951 addReply(c,shared.nullmultibulk);
952 unblockClient(c);
953 }
954 }
955 }
956 }
957
958 static int htNeedsResize(dict *dict) {
959 long long size, used;
960
961 size = dictSlots(dict);
962 used = dictSize(dict);
963 return (size && used && size > DICT_HT_INITIAL_SIZE &&
964 (used*100/size < REDIS_HT_MINFILL));
965 }
966
967 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
968 * we resize the hash table to save memory */
969 static void tryResizeHashTables(void) {
970 int j;
971
972 for (j = 0; j < server.dbnum; j++) {
973 if (htNeedsResize(server.db[j].dict)) {
974 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
975 dictResize(server.db[j].dict);
976 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
977 }
978 if (htNeedsResize(server.db[j].expires))
979 dictResize(server.db[j].expires);
980 }
981 }
982
983 /* A background saving child (BGSAVE) terminated its work. Handle this. */
984 void backgroundSaveDoneHandler(int statloc) {
985 int exitcode = WEXITSTATUS(statloc);
986 int bysignal = WIFSIGNALED(statloc);
987
988 if (!bysignal && exitcode == 0) {
989 redisLog(REDIS_NOTICE,
990 "Background saving terminated with success");
991 server.dirty = 0;
992 server.lastsave = time(NULL);
993 } else if (!bysignal && exitcode != 0) {
994 redisLog(REDIS_WARNING, "Background saving error");
995 } else {
996 redisLog(REDIS_WARNING,
997 "Background saving terminated by signal");
998 rdbRemoveTempFile(server.bgsavechildpid);
999 }
1000 server.bgsavechildpid = -1;
1001 /* Possibly there are slaves waiting for a BGSAVE in order to be served
1002 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
1003 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
1004 }
1005
1006 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
1007 * Handle this. */
1008 void backgroundRewriteDoneHandler(int statloc) {
1009 int exitcode = WEXITSTATUS(statloc);
1010 int bysignal = WIFSIGNALED(statloc);
1011
1012 if (!bysignal && exitcode == 0) {
1013 int fd;
1014 char tmpfile[256];
1015
1016 redisLog(REDIS_NOTICE,
1017 "Background append only file rewriting terminated with success");
1018 /* Now it's time to flush the differences accumulated by the parent */
1019 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
1020 fd = open(tmpfile,O_WRONLY|O_APPEND);
1021 if (fd == -1) {
1022 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
1023 goto cleanup;
1024 }
1025 /* Flush our data... */
1026 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
1027 (signed) sdslen(server.bgrewritebuf)) {
1028 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
1029 close(fd);
1030 goto cleanup;
1031 }
1032 redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
1033 /* Now our work is to rename the temp file into the stable file. And
1034 * switch the file descriptor used by the server for append only. */
1035 if (rename(tmpfile,server.appendfilename) == -1) {
1036 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
1037 close(fd);
1038 goto cleanup;
1039 }
1040 /* Mission completed... almost */
1041 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
1042 if (server.appendfd != -1) {
1043 /* If append only is actually enabled... */
1044 close(server.appendfd);
1045 server.appendfd = fd;
1046 fsync(fd);
1047 server.appendseldb = -1; /* Make sure it will issue SELECT */
1048 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
1049 } else {
1050 /* If append only is disabled we just generate a dump in this
1051 * format. Why not? */
1052 close(fd);
1053 }
1054 } else if (!bysignal && exitcode != 0) {
1055 redisLog(REDIS_WARNING, "Background append only file rewriting error");
1056 } else {
1057 redisLog(REDIS_WARNING,
1058 "Background append only file rewriting terminated by signal");
1059 }
1060 cleanup:
1061 sdsfree(server.bgrewritebuf);
1062 server.bgrewritebuf = sdsempty();
1063 aofRemoveTempFile(server.bgrewritechildpid);
1064 server.bgrewritechildpid = -1;
1065 }
1066
1067 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
1068 int j, loops = server.cronloops++;
1069 REDIS_NOTUSED(eventLoop);
1070 REDIS_NOTUSED(id);
1071 REDIS_NOTUSED(clientData);
1072
1073 /* We take a cached value of the unix time in the global state because
1074 * with virtual memory and aging there is to store the current time
1075 * in objects at every object access, and accuracy is not needed.
1076 * To access a global var is faster than calling time(NULL) */
1077 server.unixtime = time(NULL);
1078
1079 /* Update the global state with the amount of used memory */
1080 server.usedmemory = zmalloc_used_memory();
1081
1082 /* Show some info about non-empty databases */
1083 for (j = 0; j < server.dbnum; j++) {
1084 long long size, used, vkeys;
1085
1086 size = dictSlots(server.db[j].dict);
1087 used = dictSize(server.db[j].dict);
1088 vkeys = dictSize(server.db[j].expires);
1089 if (!(loops % 5) && (used || vkeys)) {
1090 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
1091 /* dictPrintStats(server.dict); */
1092 }
1093 }
1094
1095 /* We don't want to resize the hash tables while a bacground saving
1096 * is in progress: the saving child is created using fork() that is
1097 * implemented with a copy-on-write semantic in most modern systems, so
1098 * if we resize the HT while there is the saving child at work actually
1099 * a lot of memory movements in the parent will cause a lot of pages
1100 * copied. */
1101 if (server.bgsavechildpid == -1) tryResizeHashTables();
1102
1103 /* Show information about connected clients */
1104 if (!(loops % 5)) {
1105 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1106 listLength(server.clients)-listLength(server.slaves),
1107 listLength(server.slaves),
1108 server.usedmemory,
1109 dictSize(server.sharingpool));
1110 }
1111
1112 /* Close connections of timedout clients */
1113 if ((server.maxidletime && !(loops % 10)) || server.blockedclients)
1114 closeTimedoutClients();
1115
1116 /* Check if a background saving or AOF rewrite in progress terminated */
1117 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
1118 int statloc;
1119 pid_t pid;
1120
1121 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1122 if (pid == server.bgsavechildpid) {
1123 backgroundSaveDoneHandler(statloc);
1124 } else {
1125 backgroundRewriteDoneHandler(statloc);
1126 }
1127 }
1128 } else {
1129 /* If there is not a background saving in progress check if
1130 * we have to save now */
1131 time_t now = time(NULL);
1132 for (j = 0; j < server.saveparamslen; j++) {
1133 struct saveparam *sp = server.saveparams+j;
1134
1135 if (server.dirty >= sp->changes &&
1136 now-server.lastsave > sp->seconds) {
1137 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1138 sp->changes, sp->seconds);
1139 rdbSaveBackground(server.dbfilename);
1140 break;
1141 }
1142 }
1143 }
1144
1145 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1146 * will use few CPU cycles if there are few expiring keys, otherwise
1147 * it will get more aggressive to avoid that too much memory is used by
1148 * keys that can be removed from the keyspace. */
1149 for (j = 0; j < server.dbnum; j++) {
1150 int expired;
1151 redisDb *db = server.db+j;
1152
1153 /* Continue to expire if at the end of the cycle more than 25%
1154 * of the keys were expired. */
1155 do {
1156 int num = dictSize(db->expires);
1157 time_t now = time(NULL);
1158
1159 expired = 0;
1160 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1161 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1162 while (num--) {
1163 dictEntry *de;
1164 time_t t;
1165
1166 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1167 t = (time_t) dictGetEntryVal(de);
1168 if (now > t) {
1169 deleteKey(db,dictGetEntryKey(de));
1170 expired++;
1171 }
1172 }
1173 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
1174 }
1175
1176 /* Check if we should connect to a MASTER */
1177 if (server.replstate == REDIS_REPL_CONNECT) {
1178 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1179 if (syncWithMaster() == REDIS_OK) {
1180 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1181 }
1182 }
1183 return 1000;
1184 }
1185
1186 static void createSharedObjects(void) {
1187 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1188 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1189 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
1190 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1191 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1192 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1193 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1194 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1195 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1196 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1197 shared.queued = createObject(REDIS_STRING,sdsnew("+QUEUED\r\n"));
1198 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1199 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1200 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1201 "-ERR no such key\r\n"));
1202 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1203 "-ERR syntax error\r\n"));
1204 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1205 "-ERR source and destination objects are the same\r\n"));
1206 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1207 "-ERR index out of range\r\n"));
1208 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1209 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1210 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1211 shared.select0 = createStringObject("select 0\r\n",10);
1212 shared.select1 = createStringObject("select 1\r\n",10);
1213 shared.select2 = createStringObject("select 2\r\n",10);
1214 shared.select3 = createStringObject("select 3\r\n",10);
1215 shared.select4 = createStringObject("select 4\r\n",10);
1216 shared.select5 = createStringObject("select 5\r\n",10);
1217 shared.select6 = createStringObject("select 6\r\n",10);
1218 shared.select7 = createStringObject("select 7\r\n",10);
1219 shared.select8 = createStringObject("select 8\r\n",10);
1220 shared.select9 = createStringObject("select 9\r\n",10);
1221 }
1222
1223 static void appendServerSaveParams(time_t seconds, int changes) {
1224 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1225 server.saveparams[server.saveparamslen].seconds = seconds;
1226 server.saveparams[server.saveparamslen].changes = changes;
1227 server.saveparamslen++;
1228 }
1229
1230 static void resetServerSaveParams() {
1231 zfree(server.saveparams);
1232 server.saveparams = NULL;
1233 server.saveparamslen = 0;
1234 }
1235
1236 static void initServerConfig() {
1237 server.dbnum = REDIS_DEFAULT_DBNUM;
1238 server.port = REDIS_SERVERPORT;
1239 server.verbosity = REDIS_DEBUG;
1240 server.maxidletime = REDIS_MAXIDLETIME;
1241 server.saveparams = NULL;
1242 server.logfile = NULL; /* NULL = log on standard output */
1243 server.bindaddr = NULL;
1244 server.glueoutputbuf = 1;
1245 server.daemonize = 0;
1246 server.appendonly = 0;
1247 server.appendfsync = APPENDFSYNC_ALWAYS;
1248 server.lastfsync = time(NULL);
1249 server.appendfd = -1;
1250 server.appendseldb = -1; /* Make sure the first time will not match */
1251 server.pidfile = "/var/run/redis.pid";
1252 server.dbfilename = "dump.rdb";
1253 server.appendfilename = "appendonly.aof";
1254 server.requirepass = NULL;
1255 server.shareobjects = 0;
1256 server.rdbcompression = 1;
1257 server.sharingpoolsize = 1024;
1258 server.maxclients = 0;
1259 server.blockedclients = 0;
1260 server.maxmemory = 0;
1261 server.vm_enabled = 0;
1262 server.vm_page_size = 256; /* 256 bytes per page */
1263 server.vm_pages = 1024*1024*100; /* 104 millions of pages */
1264 server.vm_max_memory = 1024LL*1024*1024*1; /* 1 GB of RAM */
1265
1266 resetServerSaveParams();
1267
1268 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1269 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1270 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1271 /* Replication related */
1272 server.isslave = 0;
1273 server.masterauth = NULL;
1274 server.masterhost = NULL;
1275 server.masterport = 6379;
1276 server.master = NULL;
1277 server.replstate = REDIS_REPL_NONE;
1278
1279 /* Double constants initialization */
1280 R_Zero = 0.0;
1281 R_PosInf = 1.0/R_Zero;
1282 R_NegInf = -1.0/R_Zero;
1283 R_Nan = R_Zero/R_Zero;
1284 }
1285
/* Create the runtime server state from the configuration in 'server'.
 * In order it: ignores SIGHUP and SIGPIPE and calls setupSigSegvAction();
 * creates the client, slave, monitor and object free lists, the shared
 * objects, the event loop, the DB array and the object sharing pool; opens
 * the listening TCP socket (exiting with status 1 on failure); creates the
 * per-DB tables; resets counters and timestamps; registers serverCron() as
 * a timer event; opens the append only file when append only mode is on
 * (exiting with status 1 on failure); and calls vmInit() when virtual
 * memory is enabled. */
static void initServer() {
    int j;

    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
    setupSigSegvAction();

    server.clients = listCreate();
    server.slaves = listCreate();
    server.monitors = listCreate();
    server.objfreelist = listCreate();
    createSharedObjects();
    server.el = aeCreateEventLoop();
    server.db = zmalloc(sizeof(redisDb)*server.dbnum);
    server.sharingpool = dictCreate(&setDictType,NULL);
    server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
    if (server.fd == -1) {
        redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
        exit(1);
    }
    /* Per DB tables: the keyspace, the keys' expire times (serverCron()
     * reads the entries' values as time_t), and the keys that clients are
     * blocked on (BLPOP). */
    for (j = 0; j < server.dbnum; j++) {
        server.db[j].dict = dictCreate(&hashDictType,NULL);
        server.db[j].expires = dictCreate(&setDictType,NULL);
        server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL);
        server.db[j].id = j;
    }
    server.cronloops = 0;
    server.bgsavechildpid = -1;
    server.bgrewritechildpid = -1;
    server.bgrewritebuf = sdsempty();
    server.lastsave = time(NULL);
    server.dirty = 0;
    server.usedmemory = 0;
    server.stat_numcommands = 0;
    server.stat_numconnections = 0;
    server.stat_starttime = time(NULL);
    server.unixtime = time(NULL);
    /* First serverCron() run is scheduled 1 (presumably millisecond) after
     * the event loop starts. */
    aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);

    if (server.appendonly) {
        server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
        if (server.appendfd == -1) {
            redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
                strerror(errno));
            exit(1);
        }
    }

    if (server.vm_enabled) vmInit();
}
1336
1337 /* Empty the whole database */
1338 static long long emptyDb() {
1339 int j;
1340 long long removed = 0;
1341
1342 for (j = 0; j < server.dbnum; j++) {
1343 removed += dictSize(server.db[j].dict);
1344 dictEmpty(server.db[j].dict);
1345 dictEmpty(server.db[j].expires);
1346 }
1347 return removed;
1348 }
1349
/* Parse a boolean configuration argument, ignoring case.
 * Returns 1 for "yes", 0 for "no" and -1 for anything else. */
static int yesnotoi(char *s) {
    static const char *const answers[] = { "no", "yes" }; /* index = value */
    int value;

    for (value = 0; value < 2; value++) {
        if (strcasecmp(s, answers[value]) == 0) return value;
    }
    return -1;
}
1355
/* Load the configuration file 'filename' ("-" means read it from stdin)
 * into the global server state.
 *
 * I agree, this is a very rudimental way to load a configuration...
 * will improve later if the config gets more complex.
 *
 * Every line is trimmed of spaces, tabs, CR and LF; blank lines and lines
 * starting with '#' are skipped. The rest is split on spaces into a
 * directive and its arguments. Directives are matched case-insensitively
 * and must have exactly the argument count tested below. An unknown
 * directive, a wrong argument count or an invalid value prints the line
 * number, the offending line and an error message to stderr, then exits
 * the process with status 1. Failing to open the file, the log file test,
 * or chdir() for "dir" are fatal too.
 * NOTE(review): repeating a directive that stores a zstrdup()ed string
 * (bind, logfile, requirepass, ...) leaks the previous value. */
static void loadServerConfig(char *filename) {
    FILE *fp;
    char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
    int linenum = 0;
    sds line = NULL;

    if (filename[0] == '-' && filename[1] == '\0')
        fp = stdin;
    else {
        if ((fp = fopen(filename,"r")) == NULL) {
            redisLog(REDIS_WARNING,"Fatal error, can't open config file");
            exit(1);
        }
    }

    /* fgets() stops after REDIS_CONFIGLINE_MAX bytes, so a longer line is
     * read, counted and parsed as several separate pieces. */
    while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
        sds *argv;
        int argc, j;

        linenum++;
        line = sdsnew(buf);
        line = sdstrim(line," \t\r\n");

        /* Skip comments and blank lines*/
        if (line[0] == '#' || line[0] == '\0') {
            sdsfree(line);
            continue;
        }

        /* Split into arguments */
        argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
        sdstolower(argv[0]);

        /* Execute config directives */
        if (!strcasecmp(argv[0],"timeout") && argc == 2) {
            server.maxidletime = atoi(argv[1]);
            if (server.maxidletime < 0) {
                err = "Invalid timeout value"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"port") && argc == 2) {
            server.port = atoi(argv[1]);
            if (server.port < 1 || server.port > 65535) {
                err = "Invalid port"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
            server.bindaddr = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"save") && argc == 3) {
            /* Each "save" line adds one more save point. */
            int seconds = atoi(argv[1]);
            int changes = atoi(argv[2]);
            if (seconds < 1 || changes < 0) {
                err = "Invalid save parameters"; goto loaderr;
            }
            appendServerSaveParams(seconds,changes);
        } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
            /* The working directory is changed right away, while parsing. */
            if (chdir(argv[1]) == -1) {
                redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
                    argv[1], strerror(errno));
                exit(1);
            }
        } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
            if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
            else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
            else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
            else {
                err = "Invalid log level. Must be one of debug, notice, warning";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
            FILE *logfp;

            /* "stdout" selects logging on standard output (logfile NULL). */
            server.logfile = zstrdup(argv[1]);
            if (!strcasecmp(server.logfile,"stdout")) {
                zfree(server.logfile);
                server.logfile = NULL;
            }
            if (server.logfile) {
                /* Test if we are able to open the file. The server will not
                 * be able to abort just for this problem later... */
                logfp = fopen(server.logfile,"a");
                if (logfp == NULL) {
                    err = sdscatprintf(sdsempty(),
                        "Can't open the log file: %s", strerror(errno));
                    goto loaderr;
                }
                fclose(logfp);
            }
        } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
            server.dbnum = atoi(argv[1]);
            if (server.dbnum < 1) {
                err = "Invalid number of databases"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
            server.maxclients = atoi(argv[1]);
        } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
            server.maxmemory = strtoll(argv[1], NULL, 10);
        } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
            /* serverCron() connects to the master while replstate is
             * REDIS_REPL_CONNECT. */
            server.masterhost = sdsnew(argv[1]);
            server.masterport = atoi(argv[2]);
            server.replstate = REDIS_REPL_CONNECT;
        } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
            server.masterauth = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
            if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
            if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) {
            if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
            server.sharingpoolsize = atoi(argv[1]);
            if (server.sharingpoolsize < 1) {
                err = "invalid object sharing pool size"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
            if ((server.daemonize = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
            if ((server.appendonly = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
            if (!strcasecmp(argv[1],"no")) {
                server.appendfsync = APPENDFSYNC_NO;
            } else if (!strcasecmp(argv[1],"always")) {
                server.appendfsync = APPENDFSYNC_ALWAYS;
            } else if (!strcasecmp(argv[1],"everysec")) {
                server.appendfsync = APPENDFSYNC_EVERYSEC;
            } else {
                err = "argument must be 'no', 'always' or 'everysec'";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
            server.requirepass = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
            server.pidfile = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
            server.dbfilename = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"vm-enabled") && argc == 2) {
            if ((server.vm_enabled = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else {
            err = "Bad directive or wrong number of arguments"; goto loaderr;
        }
        for (j = 0; j < argc; j++)
            sdsfree(argv[j]);
        zfree(argv);
        sdsfree(line);
    }
    if (fp != stdin) fclose(fp);
    return;

loaderr:
    /* Fatal: 'line', argv and the file are not released because the
     * process exits right away. */
    fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
    fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
    fprintf(stderr, ">>> '%s'\n", line);
    fprintf(stderr, "%s\n", err);
    exit(1);
}
1523
1524 static void freeClientArgv(redisClient *c) {
1525 int j;
1526
1527 for (j = 0; j < c->argc; j++)
1528 decrRefCount(c->argv[j]);
1529 for (j = 0; j < c->mbargc; j++)
1530 decrRefCount(c->mbargv[j]);
1531 c->argc = 0;
1532 c->mbargc = 0;
1533 }
1534
/* Tear down a client connection and free the client structure.
 * Unblocks it first if it is blocked, removes its file events, releases its
 * reply list and arguments, closes its socket and removes it from
 * server.clients and, when flagged REDIS_SLAVE, from server.slaves or
 * server.monitors. If the client was our master, replication goes back to
 * REDIS_REPL_CONNECT so that serverCron() connects again. The statement
 * order below matters (see the first comment). */
static void freeClient(redisClient *c) {
    listNode *ln;

    /* Note that if the client we are freeing is blocked into a blocking
     * call, we have to set querybuf to NULL *before* to call unblockClient()
     * to avoid processInputBuffer() will get called. Also it is important
     * to remove the file events after this, because this call adds
     * the READABLE event. */
    sdsfree(c->querybuf);
    c->querybuf = NULL;
    if (c->flags & REDIS_BLOCKED)
        unblockClient(c);

    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    listRelease(c->reply);
    freeClientArgv(c);
    close(c->fd);
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);
    if (c->flags & REDIS_SLAVE) {
        /* A slave still receiving the bulk dump holds the dump file open. */
        if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
            close(c->repldbfd);
        /* Monitors carry REDIS_SLAVE too; REDIS_MONITOR tells which list
         * the client lives in. */
        list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        redisAssert(ln != NULL);
        listDelNode(l,ln);
    }
    if (c->flags & REDIS_MASTER) {
        server.master = NULL;
        server.replstate = REDIS_REPL_CONNECT;
    }
    zfree(c->argv);
    zfree(c->mbargv);
    freeClientMultiState(c);
    zfree(c);
}
1573
1574 #define GLUEREPLY_UP_TO (1024)
1575 static void glueReplyBuffersIfNeeded(redisClient *c) {
1576 int copylen = 0;
1577 char buf[GLUEREPLY_UP_TO];
1578 listNode *ln;
1579 robj *o;
1580
1581 listRewind(c->reply);
1582 while((ln = listYield(c->reply))) {
1583 int objlen;
1584
1585 o = ln->value;
1586 objlen = sdslen(o->ptr);
1587 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1588 memcpy(buf+copylen,o->ptr,objlen);
1589 copylen += objlen;
1590 listDelNode(c->reply,ln);
1591 } else {
1592 if (copylen == 0) return;
1593 break;
1594 }
1595 }
1596 /* Now the output buffer is empty, add the new single element */
1597 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1598 listAddNodeHead(c->reply,o);
1599 }
1600
/* Writable event handler: send the client's queued reply objects.
 *
 * When output gluing is disabled, more than REDIS_WRITEV_THRESHOLD objects
 * are queued and the client is not our master, the work is delegated to
 * sendReplyToClientWritev(). Otherwise objects are sent one at a time from
 * the head of c->reply (glued together first when glueoutputbuf is on);
 * c->sentlen counts how many bytes of the head object were already sent.
 * Empty objects are dropped, and replies to our master are discarded
 * instead of being written. Writing stops once more than
 * REDIS_MAX_WRITE_PER_EVENT bytes were sent or write() returns <= 0. A
 * write error other than EAGAIN frees the client. The writable event is
 * removed once the reply list is empty. */
static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen;
    robj *o;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    /* Use writev() if we have enough buffers to send */
    if (!server.glueoutputbuf &&
        listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
        !(c->flags & REDIS_MASTER))
    {
        sendReplyToClientWritev(el, fd, privdata, mask);
        return;
    }

    while(listLength(c->reply)) {
        if (server.glueoutputbuf && listLength(c->reply) > 1)
            glueReplyBuffersIfNeeded(c);

        o = listNodeValue(listFirst(c->reply));
        objlen = sdslen(o->ptr);

        if (objlen == 0) {
            listDelNode(c->reply,listFirst(c->reply));
            continue;
        }

        if (c->flags & REDIS_MASTER) {
            /* Don't reply to a master */
            nwritten = objlen - c->sentlen;
        } else {
            nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0) break;
        }
        c->sentlen += nwritten;
        totwritten += nwritten;
        /* If we fully sent the object on head go to the next one */
        if (c->sentlen == objlen) {
            listDelNode(c->reply,listFirst(c->reply));
            c->sentlen = 0;
        }
        /* Note that we avoid sending more than REDIS_MAX_WRITE_PER_EVENT
         * bytes: in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * a super fast link that is always able to accept data (in a real
         * world scenario think about 'KEYS *' against the loopback
         * interface) */
        if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
    }
    /* write() failing with EAGAIN just means the socket buffer is full:
     * the handler stays registered and will be called again. */
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            nwritten = 0;
        } else {
            redisLog(REDIS_DEBUG,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }
    if (totwritten > 0) c->lastinteraction = time(NULL);
    if (listLength(c->reply) == 0) {
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    }
}
1666
1667 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1668 {
1669 redisClient *c = privdata;
1670 int nwritten = 0, totwritten = 0, objlen, willwrite;
1671 robj *o;
1672 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1673 int offset, ion = 0;
1674 REDIS_NOTUSED(el);
1675 REDIS_NOTUSED(mask);
1676
1677 listNode *node;
1678 while (listLength(c->reply)) {
1679 offset = c->sentlen;
1680 ion = 0;
1681 willwrite = 0;
1682
1683 /* fill-in the iov[] array */
1684 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1685 o = listNodeValue(node);
1686 objlen = sdslen(o->ptr);
1687
1688 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1689 break;
1690
1691 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1692 break; /* no more iovecs */
1693
1694 iov[ion].iov_base = ((char*)o->ptr) + offset;
1695 iov[ion].iov_len = objlen - offset;
1696 willwrite += objlen - offset;
1697 offset = 0; /* just for the first item */
1698 ion++;
1699 }
1700
1701 if(willwrite == 0)
1702 break;
1703
1704 /* write all collected blocks at once */
1705 if((nwritten = writev(fd, iov, ion)) < 0) {
1706 if (errno != EAGAIN) {
1707 redisLog(REDIS_DEBUG,
1708 "Error writing to client: %s", strerror(errno));
1709 freeClient(c);
1710 return;
1711 }
1712 break;
1713 }
1714
1715 totwritten += nwritten;
1716 offset = c->sentlen;
1717
1718 /* remove written robjs from c->reply */
1719 while (nwritten && listLength(c->reply)) {
1720 o = listNodeValue(listFirst(c->reply));
1721 objlen = sdslen(o->ptr);
1722
1723 if(nwritten >= objlen - offset) {
1724 listDelNode(c->reply, listFirst(c->reply));
1725 nwritten -= objlen - offset;
1726 c->sentlen = 0;
1727 } else {
1728 /* partial write */
1729 c->sentlen += nwritten;
1730 break;
1731 }
1732 offset = 0;
1733 }
1734 }
1735
1736 if (totwritten > 0)
1737 c->lastinteraction = time(NULL);
1738
1739 if (listLength(c->reply) == 0) {
1740 c->sentlen = 0;
1741 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1742 }
1743 }
1744
1745 static struct redisCommand *lookupCommand(char *name) {
1746 int j = 0;
1747 while(cmdTable[j].name != NULL) {
1748 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1749 j++;
1750 }
1751 return NULL;
1752 }
1753
1754 /* resetClient prepare the client to process the next command */
1755 static void resetClient(redisClient *c) {
1756 freeClientArgv(c);
1757 c->bulklen = -1;
1758 c->multibulk = 0;
1759 }
1760
1761 /* Call() is the core of Redis execution of a command */
1762 static void call(redisClient *c, struct redisCommand *cmd) {
1763 long long dirty;
1764
1765 dirty = server.dirty;
1766 cmd->proc(c);
1767 if (server.appendonly && server.dirty-dirty)
1768 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1769 if (server.dirty-dirty && listLength(server.slaves))
1770 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1771 if (listLength(server.monitors))
1772 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1773 server.stat_numcommands++;
1774 }
1775
/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() executes the command or prepares the
 * server for a bulk read from the client.
 *
 * If 1 is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if 0 is returned the client was destroyed (i.e. after QUIT). */
1784 static int processCommand(redisClient *c) {
1785 struct redisCommand *cmd;
1786
1787 /* Free some memory if needed (maxmemory setting) */
1788 if (server.maxmemory) freeMemoryIfNeeded();
1789
1790 /* Handle the multi bulk command type. This is an alternative protocol
1791 * supported by Redis in order to receive commands that are composed of
1792 * multiple binary-safe "bulk" arguments. The latency of processing is
1793 * a bit higher but this allows things like multi-sets, so if this
1794 * protocol is used only for MSET and similar commands this is a big win. */
1795 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1796 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1797 if (c->multibulk <= 0) {
1798 resetClient(c);
1799 return 1;
1800 } else {
1801 decrRefCount(c->argv[c->argc-1]);
1802 c->argc--;
1803 return 1;
1804 }
1805 } else if (c->multibulk) {
1806 if (c->bulklen == -1) {
1807 if (((char*)c->argv[0]->ptr)[0] != '$') {
1808 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1809 resetClient(c);
1810 return 1;
1811 } else {
1812 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1813 decrRefCount(c->argv[0]);
1814 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1815 c->argc--;
1816 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1817 resetClient(c);
1818 return 1;
1819 }
1820 c->argc--;
1821 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1822 return 1;
1823 }
1824 } else {
1825 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1826 c->mbargv[c->mbargc] = c->argv[0];
1827 c->mbargc++;
1828 c->argc--;
1829 c->multibulk--;
1830 if (c->multibulk == 0) {
1831 robj **auxargv;
1832 int auxargc;
1833
1834 /* Here we need to swap the multi-bulk argc/argv with the
1835 * normal argc/argv of the client structure. */
1836 auxargv = c->argv;
1837 c->argv = c->mbargv;
1838 c->mbargv = auxargv;
1839
1840 auxargc = c->argc;
1841 c->argc = c->mbargc;
1842 c->mbargc = auxargc;
1843
1844 /* We need to set bulklen to something different than -1
1845 * in order for the code below to process the command without
1846 * to try to read the last argument of a bulk command as
1847 * a special argument. */
1848 c->bulklen = 0;
1849 /* continue below and process the command */
1850 } else {
1851 c->bulklen = -1;
1852 return 1;
1853 }
1854 }
1855 }
1856 /* -- end of multi bulk commands processing -- */
1857
1858 /* The QUIT command is handled as a special case. Normal command
1859 * procs are unable to close the client connection safely */
1860 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1861 freeClient(c);
1862 return 0;
1863 }
1864 cmd = lookupCommand(c->argv[0]->ptr);
1865 if (!cmd) {
1866 addReplySds(c,
1867 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1868 (char*)c->argv[0]->ptr));
1869 resetClient(c);
1870 return 1;
1871 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1872 (c->argc < -cmd->arity)) {
1873 addReplySds(c,
1874 sdscatprintf(sdsempty(),
1875 "-ERR wrong number of arguments for '%s' command\r\n",
1876 cmd->name));
1877 resetClient(c);
1878 return 1;
1879 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1880 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1881 resetClient(c);
1882 return 1;
1883 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1884 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1885
1886 decrRefCount(c->argv[c->argc-1]);
1887 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1888 c->argc--;
1889 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1890 resetClient(c);
1891 return 1;
1892 }
1893 c->argc--;
1894 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1895 /* It is possible that the bulk read is already in the
1896 * buffer. Check this condition and handle it accordingly.
1897 * This is just a fast path, alternative to call processInputBuffer().
1898 * It's a good idea since the code is small and this condition
1899 * happens most of the times. */
1900 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1901 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1902 c->argc++;
1903 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1904 } else {
1905 return 1;
1906 }
1907 }
1908 /* Let's try to share objects on the command arguments vector */
1909 if (server.shareobjects) {
1910 int j;
1911 for(j = 1; j < c->argc; j++)
1912 c->argv[j] = tryObjectSharing(c->argv[j]);
1913 }
1914 /* Let's try to encode the bulk object to save space. */
1915 if (cmd->flags & REDIS_CMD_BULK)
1916 tryObjectEncoding(c->argv[c->argc-1]);
1917
1918 /* Check if the user is authenticated */
1919 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1920 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1921 resetClient(c);
1922 return 1;
1923 }
1924
1925 /* Exec the command */
1926 if (c->flags & REDIS_MULTI && cmd->proc != execCommand) {
1927 queueMultiCommand(c,cmd);
1928 addReply(c,shared.queued);
1929 } else {
1930 call(c,cmd);
1931 }
1932
1933 /* Prepare the client for the next command */
1934 if (c->flags & REDIS_CLOSE) {
1935 freeClient(c);
1936 return 0;
1937 }
1938 resetClient(c);
1939 return 1;
1940 }
1941
1942 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1943 listNode *ln;
1944 int outc = 0, j;
1945 robj **outv;
1946 /* (args*2)+1 is enough room for args, spaces, newlines */
1947 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1948
1949 if (argc <= REDIS_STATIC_ARGS) {
1950 outv = static_outv;
1951 } else {
1952 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1953 }
1954
1955 for (j = 0; j < argc; j++) {
1956 if (j != 0) outv[outc++] = shared.space;
1957 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1958 robj *lenobj;
1959
1960 lenobj = createObject(REDIS_STRING,
1961 sdscatprintf(sdsempty(),"%lu\r\n",
1962 (unsigned long) stringObjectLen(argv[j])));
1963 lenobj->refcount = 0;
1964 outv[outc++] = lenobj;
1965 }
1966 outv[outc++] = argv[j];
1967 }
1968 outv[outc++] = shared.crlf;
1969
1970 /* Increment all the refcounts at start and decrement at end in order to
1971 * be sure to free objects if there is no slave in a replication state
1972 * able to be feed with commands */
1973 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1974 listRewind(slaves);
1975 while((ln = listYield(slaves))) {
1976 redisClient *slave = ln->value;
1977
1978 /* Don't feed slaves that are still waiting for BGSAVE to start */
1979 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1980
1981 /* Feed all the other slaves, MONITORs and so on */
1982 if (slave->slaveseldb != dictid) {
1983 robj *selectcmd;
1984
1985 switch(dictid) {
1986 case 0: selectcmd = shared.select0; break;
1987 case 1: selectcmd = shared.select1; break;
1988 case 2: selectcmd = shared.select2; break;
1989 case 3: selectcmd = shared.select3; break;
1990 case 4: selectcmd = shared.select4; break;
1991 case 5: selectcmd = shared.select5; break;
1992 case 6: selectcmd = shared.select6; break;
1993 case 7: selectcmd = shared.select7; break;
1994 case 8: selectcmd = shared.select8; break;
1995 case 9: selectcmd = shared.select9; break;
1996 default:
1997 selectcmd = createObject(REDIS_STRING,
1998 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1999 selectcmd->refcount = 0;
2000 break;
2001 }
2002 addReply(slave,selectcmd);
2003 slave->slaveseldb = dictid;
2004 }
2005 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
2006 }
2007 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
2008 if (outv != static_outv) zfree(outv);
2009 }
2010
2011 static void processInputBuffer(redisClient *c) {
2012 again:
2013 /* Before to process the input buffer, make sure the client is not
2014 * waitig for a blocking operation such as BLPOP. Note that the first
2015 * iteration the client is never blocked, otherwise the processInputBuffer
2016 * would not be called at all, but after the execution of the first commands
2017 * in the input buffer the client may be blocked, and the "goto again"
2018 * will try to reiterate. The following line will make it return asap. */
2019 if (c->flags & REDIS_BLOCKED) return;
2020 if (c->bulklen == -1) {
2021 /* Read the first line of the query */
2022 char *p = strchr(c->querybuf,'\n');
2023 size_t querylen;
2024
2025 if (p) {
2026 sds query, *argv;
2027 int argc, j;
2028
2029 query = c->querybuf;
2030 c->querybuf = sdsempty();
2031 querylen = 1+(p-(query));
2032 if (sdslen(query) > querylen) {
2033 /* leave data after the first line of the query in the buffer */
2034 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
2035 }
2036 *p = '\0'; /* remove "\n" */
2037 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
2038 sdsupdatelen(query);
2039
2040 /* Now we can split the query in arguments */
2041 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
2042 sdsfree(query);
2043
2044 if (c->argv) zfree(c->argv);
2045 c->argv = zmalloc(sizeof(robj*)*argc);
2046
2047 for (j = 0; j < argc; j++) {
2048 if (sdslen(argv[j])) {
2049 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
2050 c->argc++;
2051 } else {
2052 sdsfree(argv[j]);
2053 }
2054 }
2055 zfree(argv);
2056 if (c->argc) {
2057 /* Execute the command. If the client is still valid
2058 * after processCommand() return and there is something
2059 * on the query buffer try to process the next command. */
2060 if (processCommand(c) && sdslen(c->querybuf)) goto again;
2061 } else {
2062 /* Nothing to process, argc == 0. Just process the query
2063 * buffer if it's not empty or return to the caller */
2064 if (sdslen(c->querybuf)) goto again;
2065 }
2066 return;
2067 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
2068 redisLog(REDIS_DEBUG, "Client protocol error");
2069 freeClient(c);
2070 return;
2071 }
2072 } else {
2073 /* Bulk read handling. Note that if we are at this point
2074 the client already sent a command terminated with a newline,
2075 we are reading the bulk data that is actually the last
2076 argument of the command. */
2077 int qbl = sdslen(c->querybuf);
2078
2079 if (c->bulklen <= qbl) {
2080 /* Copy everything but the final CRLF as final argument */
2081 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
2082 c->argc++;
2083 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
2084 /* Process the command. If the client is still valid after
2085 * the processing and there is more data in the buffer
2086 * try to parse it. */
2087 if (processCommand(c) && sdslen(c->querybuf)) goto again;
2088 return;
2089 }
2090 }
2091 }
2092
2093 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
2094 redisClient *c = (redisClient*) privdata;
2095 char buf[REDIS_IOBUF_LEN];
2096 int nread;
2097 REDIS_NOTUSED(el);
2098 REDIS_NOTUSED(mask);
2099
2100 nread = read(fd, buf, REDIS_IOBUF_LEN);
2101 if (nread == -1) {
2102 if (errno == EAGAIN) {
2103 nread = 0;
2104 } else {
2105 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
2106 freeClient(c);
2107 return;
2108 }
2109 } else if (nread == 0) {
2110 redisLog(REDIS_DEBUG, "Client closed connection");
2111 freeClient(c);
2112 return;
2113 }
2114 if (nread) {
2115 c->querybuf = sdscatlen(c->querybuf, buf, nread);
2116 c->lastinteraction = time(NULL);
2117 } else {
2118 return;
2119 }
2120 processInputBuffer(c);
2121 }
2122
2123 static int selectDb(redisClient *c, int id) {
2124 if (id < 0 || id >= server.dbnum)
2125 return REDIS_ERR;
2126 c->db = &server.db[id];
2127 return REDIS_OK;
2128 }
2129
2130 static void *dupClientReplyValue(void *o) {
2131 incrRefCount((robj*)o);
2132 return 0;
2133 }
2134
2135 static redisClient *createClient(int fd) {
2136 redisClient *c = zmalloc(sizeof(*c));
2137
2138 anetNonBlock(NULL,fd);
2139 anetTcpNoDelay(NULL,fd);
2140 if (!c) return NULL;
2141 selectDb(c,0);
2142 c->fd = fd;
2143 c->querybuf = sdsempty();
2144 c->argc = 0;
2145 c->argv = NULL;
2146 c->bulklen = -1;
2147 c->multibulk = 0;
2148 c->mbargc = 0;
2149 c->mbargv = NULL;
2150 c->sentlen = 0;
2151 c->flags = 0;
2152 c->lastinteraction = time(NULL);
2153 c->authenticated = 0;
2154 c->replstate = REDIS_REPL_NONE;
2155 c->reply = listCreate();
2156 c->blockingkeys = NULL;
2157 c->blockingkeysnum = 0;
2158 listSetFreeMethod(c->reply,decrRefCount);
2159 listSetDupMethod(c->reply,dupClientReplyValue);
2160 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
2161 readQueryFromClient, c) == AE_ERR) {
2162 freeClient(c);
2163 return NULL;
2164 }
2165 listAddNodeTail(server.clients,c);
2166 initClientMultiState(c);
2167 return c;
2168 }
2169
2170 static void addReply(redisClient *c, robj *obj) {
2171 if (listLength(c->reply) == 0 &&
2172 (c->replstate == REDIS_REPL_NONE ||
2173 c->replstate == REDIS_REPL_ONLINE) &&
2174 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
2175 sendReplyToClient, c) == AE_ERR) return;
2176 listAddNodeTail(c->reply,getDecodedObject(obj));
2177 }
2178
2179 static void addReplySds(redisClient *c, sds s) {
2180 robj *o = createObject(REDIS_STRING,s);
2181 addReply(c,o);
2182 decrRefCount(o);
2183 }
2184
2185 static void addReplyDouble(redisClient *c, double d) {
2186 char buf[128];
2187
2188 snprintf(buf,sizeof(buf),"%.17g",d);
2189 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2190 (unsigned long) strlen(buf),buf));
2191 }
2192
2193 static void addReplyBulkLen(redisClient *c, robj *obj) {
2194 size_t len;
2195
2196 if (obj->encoding == REDIS_ENCODING_RAW) {
2197 len = sdslen(obj->ptr);
2198 } else {
2199 long n = (long)obj->ptr;
2200
2201 /* Compute how many bytes will take this integer as a radix 10 string */
2202 len = 1;
2203 if (n < 0) {
2204 len++;
2205 n = -n;
2206 }
2207 while((n = n/10) != 0) {
2208 len++;
2209 }
2210 }
2211 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len));
2212 }
2213
2214 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2215 int cport, cfd;
2216 char cip[128];
2217 redisClient *c;
2218 REDIS_NOTUSED(el);
2219 REDIS_NOTUSED(mask);
2220 REDIS_NOTUSED(privdata);
2221
2222 cfd = anetAccept(server.neterr, fd, cip, &cport);
2223 if (cfd == AE_ERR) {
2224 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2225 return;
2226 }
2227 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
2228 if ((c = createClient(cfd)) == NULL) {
2229 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2230 close(cfd); /* May be already closed, just ingore errors */
2231 return;
2232 }
2233 /* If maxclient directive is set and this is one client more... close the
2234 * connection. Note that we create the client instead to check before
2235 * for this condition, since now the socket is already set in nonblocking
2236 * mode and we can send an error for free using the Kernel I/O */
2237 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2238 char *err = "-ERR max number of clients reached\r\n";
2239
2240 /* That's a best effort error message, don't check write errors */
2241 if (write(c->fd,err,strlen(err)) == -1) {
2242 /* Nothing to do, Just to avoid the warning... */
2243 }
2244 freeClient(c);
2245 return;
2246 }
2247 server.stat_numconnections++;
2248 }
2249
2250 /* ======================= Redis objects implementation ===================== */
2251
2252 static robj *createObject(int type, void *ptr) {
2253 robj *o;
2254
2255 if (listLength(server.objfreelist)) {
2256 listNode *head = listFirst(server.objfreelist);
2257 o = listNodeValue(head);
2258 listDelNode(server.objfreelist,head);
2259 } else {
2260 if (server.vm_enabled) {
2261 o = zmalloc(sizeof(*o));
2262 } else {
2263 o = zmalloc(sizeof(*o)-sizeof(struct redisObjectVM));
2264 }
2265 }
2266 o->type = type;
2267 o->encoding = REDIS_ENCODING_RAW;
2268 o->ptr = ptr;
2269 o->refcount = 1;
2270 if (server.vm_enabled) {
2271 o->vm.atime = server.unixtime;
2272 o->storage = REDIS_VM_MEMORY;
2273 }
2274 return o;
2275 }
2276
2277 static robj *createStringObject(char *ptr, size_t len) {
2278 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2279 }
2280
2281 static robj *createListObject(void) {
2282 list *l = listCreate();
2283
2284 listSetFreeMethod(l,decrRefCount);
2285 return createObject(REDIS_LIST,l);
2286 }
2287
2288 static robj *createSetObject(void) {
2289 dict *d = dictCreate(&setDictType,NULL);
2290 return createObject(REDIS_SET,d);
2291 }
2292
2293 static robj *createZsetObject(void) {
2294 zset *zs = zmalloc(sizeof(*zs));
2295
2296 zs->dict = dictCreate(&zsetDictType,NULL);
2297 zs->zsl = zslCreate();
2298 return createObject(REDIS_ZSET,zs);
2299 }
2300
2301 static void freeStringObject(robj *o) {
2302 if (o->encoding == REDIS_ENCODING_RAW) {
2303 sdsfree(o->ptr);
2304 }
2305 }
2306
2307 static void freeListObject(robj *o) {
2308 listRelease((list*) o->ptr);
2309 }
2310
2311 static void freeSetObject(robj *o) {
2312 dictRelease((dict*) o->ptr);
2313 }
2314
2315 static void freeZsetObject(robj *o) {
2316 zset *zs = o->ptr;
2317
2318 dictRelease(zs->dict);
2319 zslFree(zs->zsl);
2320 zfree(zs);
2321 }
2322
2323 static void freeHashObject(robj *o) {
2324 dictRelease((dict*) o->ptr);
2325 }
2326
2327 static void incrRefCount(robj *o) {
2328 assert(!server.vm_enabled || o->storage == REDIS_VM_MEMORY);
2329 o->refcount++;
2330 }
2331
2332 static void decrRefCount(void *obj) {
2333 robj *o = obj;
2334
2335 /* REDIS_VM_SWAPPED */
2336 if (server.vm_enabled && o->storage == REDIS_VM_SWAPPED) {
2337 assert(o->refcount == 1);
2338 assert(o->type == REDIS_STRING);
2339 freeStringObject(o);
2340 vmMarkPagesFree(o->vm.page,o->vm.usedpages);
2341 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2342 !listAddNodeHead(server.objfreelist,o))
2343 zfree(o);
2344 return;
2345 }
2346 /* REDIS_VM_MEMORY */
2347 if (--(o->refcount) == 0) {
2348 switch(o->type) {
2349 case REDIS_STRING: freeStringObject(o); break;
2350 case REDIS_LIST: freeListObject(o); break;
2351 case REDIS_SET: freeSetObject(o); break;
2352 case REDIS_ZSET: freeZsetObject(o); break;
2353 case REDIS_HASH: freeHashObject(o); break;
2354 default: redisAssert(0 != 0); break;
2355 }
2356 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2357 !listAddNodeHead(server.objfreelist,o))
2358 zfree(o);
2359 }
2360 }
2361
2362 static robj *lookupKey(redisDb *db, robj *key) {
2363 dictEntry *de = dictFind(db->dict,key);
2364 if (de) {
2365 robj *key = dictGetEntryKey(de);
2366 robj *val = dictGetEntryVal(de);
2367
2368 if (server.vm_enabled) {
2369 if (key->storage == REDIS_VM_MEMORY) {
2370 /* Update the access time of the key for the aging algorithm. */
2371 key->vm.atime = server.unixtime;
2372 } else {
2373 /* Our value was swapped on disk. Bring it at home. */
2374 assert(val == NULL);
2375 val = vmLoadObject(key);
2376 dictGetEntryVal(de) = val;
2377 }
2378 }
2379 return val;
2380 } else {
2381 return NULL;
2382 }
2383 }
2384
2385 static robj *lookupKeyRead(redisDb *db, robj *key) {
2386 expireIfNeeded(db,key);
2387 return lookupKey(db,key);
2388 }
2389
2390 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2391 deleteIfVolatile(db,key);
2392 return lookupKey(db,key);
2393 }
2394
2395 static int deleteKey(redisDb *db, robj *key) {
2396 int retval;
2397
2398 /* We need to protect key from destruction: after the first dictDelete()
2399 * it may happen that 'key' is no longer valid if we don't increment
2400 * it's count. This may happen when we get the object reference directly
2401 * from the hash table with dictRandomKey() or dict iterators */
2402 incrRefCount(key);
2403 if (dictSize(db->expires)) dictDelete(db->expires,key);
2404 retval = dictDelete(db->dict,key);
2405 decrRefCount(key);
2406
2407 return retval == DICT_OK;
2408 }
2409
2410 /* Try to share an object against the shared objects pool */
2411 static robj *tryObjectSharing(robj *o) {
2412 struct dictEntry *de;
2413 unsigned long c;
2414
2415 if (o == NULL || server.shareobjects == 0) return o;
2416
2417 redisAssert(o->type == REDIS_STRING);
2418 de = dictFind(server.sharingpool,o);
2419 if (de) {
2420 robj *shared = dictGetEntryKey(de);
2421
2422 c = ((unsigned long) dictGetEntryVal(de))+1;
2423 dictGetEntryVal(de) = (void*) c;
2424 incrRefCount(shared);
2425 decrRefCount(o);
2426 return shared;
2427 } else {
2428 /* Here we are using a stream algorihtm: Every time an object is
2429 * shared we increment its count, everytime there is a miss we
2430 * recrement the counter of a random object. If this object reaches
2431 * zero we remove the object and put the current object instead. */
2432 if (dictSize(server.sharingpool) >=
2433 server.sharingpoolsize) {
2434 de = dictGetRandomKey(server.sharingpool);
2435 redisAssert(de != NULL);
2436 c = ((unsigned long) dictGetEntryVal(de))-1;
2437 dictGetEntryVal(de) = (void*) c;
2438 if (c == 0) {
2439 dictDelete(server.sharingpool,de->key);
2440 }
2441 } else {
2442 c = 0; /* If the pool is empty we want to add this object */
2443 }
2444 if (c == 0) {
2445 int retval;
2446
2447 retval = dictAdd(server.sharingpool,o,(void*)1);
2448 redisAssert(retval == DICT_OK);
2449 incrRefCount(o);
2450 }
2451 return o;
2452 }
2453 }
2454
2455 /* Check if the nul-terminated string 's' can be represented by a long
2456 * (that is, is a number that fits into long without any other space or
2457 * character before or after the digits).
2458 *
2459 * If so, the function returns REDIS_OK and *longval is set to the value
2460 * of the number. Otherwise REDIS_ERR is returned */
2461 static int isStringRepresentableAsLong(sds s, long *longval) {
2462 char buf[32], *endptr;
2463 long value;
2464 int slen;
2465
2466 value = strtol(s, &endptr, 10);
2467 if (endptr[0] != '\0') return REDIS_ERR;
2468 slen = snprintf(buf,32,"%ld",value);
2469
2470 /* If the number converted back into a string is not identical
2471 * then it's not possible to encode the string as integer */
2472 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2473 if (longval) *longval = value;
2474 return REDIS_OK;
2475 }
2476
2477 /* Try to encode a string object in order to save space */
2478 static int tryObjectEncoding(robj *o) {
2479 long value;
2480 sds s = o->ptr;
2481
2482 if (o->encoding != REDIS_ENCODING_RAW)
2483 return REDIS_ERR; /* Already encoded */
2484
2485 /* It's not save to encode shared objects: shared objects can be shared
2486 * everywhere in the "object space" of Redis. Encoded objects can only
2487 * appear as "values" (and not, for instance, as keys) */
2488 if (o->refcount > 1) return REDIS_ERR;
2489
2490 /* Currently we try to encode only strings */
2491 redisAssert(o->type == REDIS_STRING);
2492
2493 /* Check if we can represent this string as a long integer */
2494 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2495
2496 /* Ok, this object can be encoded */
2497 o->encoding = REDIS_ENCODING_INT;
2498 sdsfree(o->ptr);
2499 o->ptr = (void*) value;
2500 return REDIS_OK;
2501 }
2502
2503 /* Get a decoded version of an encoded object (returned as a new object).
2504 * If the object is already raw-encoded just increment the ref count. */
2505 static robj *getDecodedObject(robj *o) {
2506 robj *dec;
2507
2508 if (o->encoding == REDIS_ENCODING_RAW) {
2509 incrRefCount(o);
2510 return o;
2511 }
2512 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2513 char buf[32];
2514
2515 snprintf(buf,32,"%ld",(long)o->ptr);
2516 dec = createStringObject(buf,strlen(buf));
2517 return dec;
2518 } else {
2519 redisAssert(1 != 1);
2520 }
2521 }
2522
2523 /* Compare two string objects via strcmp() or alike.
2524 * Note that the objects may be integer-encoded. In such a case we
2525 * use snprintf() to get a string representation of the numbers on the stack
2526 * and compare the strings, it's much faster than calling getDecodedObject().
2527 *
2528 * Important note: if objects are not integer encoded, but binary-safe strings,
2529 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2530 * binary safe. */
2531 static int compareStringObjects(robj *a, robj *b) {
2532 redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2533 char bufa[128], bufb[128], *astr, *bstr;
2534 int bothsds = 1;
2535
2536 if (a == b) return 0;
2537 if (a->encoding != REDIS_ENCODING_RAW) {
2538 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2539 astr = bufa;
2540 bothsds = 0;
2541 } else {
2542 astr = a->ptr;
2543 }
2544 if (b->encoding != REDIS_ENCODING_RAW) {
2545 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2546 bstr = bufb;
2547 bothsds = 0;
2548 } else {
2549 bstr = b->ptr;
2550 }
2551 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2552 }
2553
2554 static size_t stringObjectLen(robj *o) {
2555 redisAssert(o->type == REDIS_STRING);
2556 if (o->encoding == REDIS_ENCODING_RAW) {
2557 return sdslen(o->ptr);
2558 } else {
2559 char buf[32];
2560
2561 return snprintf(buf,32,"%ld",(long)o->ptr);
2562 }
2563 }
2564
2565 /*============================ RDB saving/loading =========================== */
2566
/* Write the single type/opcode byte 'type'. Returns 0 on success, -1 when
 * the write fails. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
2571
/* Write 't' truncated to a 32-bit signed integer, in host byte order
 * (4 bytes). Returns 0 on success, -1 when the write fails. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;
    size_t written = fwrite(&t32,sizeof(t32),1,fp);

    return (written == 0) ? -1 : 0;
}
2577
2578 /* check rdbLoadLen() comments for more info */
2579 static int rdbSaveLen(FILE *fp, uint32_t len) {
2580 unsigned char buf[2];
2581
2582 if (len < (1<<6)) {
2583 /* Save a 6 bit len */
2584 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2585 if (fwrite(buf,1,1,fp) == 0) return -1;
2586 } else if (len < (1<<14)) {
2587 /* Save a 14 bit len */
2588 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2589 buf[1] = len&0xFF;
2590 if (fwrite(buf,2,1,fp) == 0) return -1;
2591 } else {
2592 /* Save a 32 bit len */
2593 buf[0] = (REDIS_RDB_32BITLEN<<6);
2594 if (fwrite(buf,1,1,fp) == 0) return -1;
2595 len = htonl(len);
2596 if (fwrite(&len,4,1,fp) == 0) return -1;
2597 }
2598 return 0;
2599 }
2600
2601 /* String objects in the form "2391" "-100" without any space and with a
2602 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2603 * encoded as integers to save space */
2604 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2605 long long value;
2606 char *endptr, buf[32];
2607
2608 /* Check if it's possible to encode this value as a number */
2609 value = strtoll(s, &endptr, 10);
2610 if (endptr[0] != '\0') return 0;
2611 snprintf(buf,32,"%lld",value);
2612
2613 /* If the number converted back into a string is not identical
2614 * then it's not possible to encode the string as integer */
2615 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2616
2617 /* Finally check if it fits in our ranges */
2618 if (value >= -(1<<7) && value <= (1<<7)-1) {
2619 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2620 enc[1] = value&0xFF;
2621 return 2;
2622 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2623 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2624 enc[1] = value&0xFF;
2625 enc[2] = (value>>8)&0xFF;
2626 return 3;
2627 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2628 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2629 enc[1] = value&0xFF;
2630 enc[2] = (value>>8)&0xFF;
2631 enc[3] = (value>>16)&0xFF;
2632 enc[4] = (value>>24)&0xFF;
2633 return 5;
2634 } else {
2635 return 0;
2636 }
2637 }
2638
2639 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2640 unsigned int comprlen, outlen;
2641 unsigned char byte;
2642 void *out;
2643
2644 /* We require at least four bytes compression for this to be worth it */
2645 outlen = sdslen(obj->ptr)-4;
2646 if (outlen <= 0) return 0;
2647 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2648 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2649 if (comprlen == 0) {
2650 zfree(out);
2651 return 0;
2652 }
2653 /* Data compressed! Let's save it on disk */
2654 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2655 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2656 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2657 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2658 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2659 zfree(out);
2660 return comprlen;
2661
2662 writeerr:
2663 zfree(out);
2664 return -1;
2665 }
2666
2667 /* Save a string objet as [len][data] on disk. If the object is a string
2668 * representation of an integer value we try to safe it in a special form */
2669 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2670 size_t len;
2671 int enclen;
2672
2673 len = sdslen(obj->ptr);
2674
2675 /* Try integer encoding */
2676 if (len <= 11) {
2677 unsigned char buf[5];
2678 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2679 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2680 return 0;
2681 }
2682 }
2683
2684 /* Try LZF compression - under 20 bytes it's unable to compress even
2685 * aaaaaaaaaaaaaaaaaa so skip it */
2686 if (server.rdbcompression && len > 20) {
2687 int retval;
2688
2689 retval = rdbSaveLzfStringObject(fp,obj);
2690 if (retval == -1) return -1;
2691 if (retval > 0) return 0;
2692 /* retval == 0 means data can't be compressed, save the old way */
2693 }
2694
2695 /* Store verbatim */
2696 if (rdbSaveLen(fp,len) == -1) return -1;
2697 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2698 return 0;
2699 }
2700
2701 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2702 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2703 int retval;
2704
2705 obj = getDecodedObject(obj);
2706 retval = rdbSaveStringObjectRaw(fp,obj);
2707 decrRefCount(obj);
2708 return retval;
2709 }
2710
/* Write a double as a length-prefixed "%.17g" string. The unsigned 8-bit
 * prefix is the text length. Three prefix values are reserved and are
 * written without any text:
 *   253: not a number
 *   254: +inf
 *   255: -inf
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len = 1;

    if (isnan(val)) {
        buf[0] = 253;
    } else if (!isfinite(val)) {
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
        buf[0] = strlen((char*)buf+1);
        len += buf[0];
    }
    return (fwrite(buf,len,1,fp) == 0) ? -1 : 0;
}
2737
2738 /* Save a Redis object. */
2739 static int rdbSaveObject(FILE *fp, robj *o) {
2740 if (o->type == REDIS_STRING) {
2741 /* Save a string value */
2742 if (rdbSaveStringObject(fp,o) == -1) return -1;
2743 } else if (o->type == REDIS_LIST) {
2744 /* Save a list value */
2745 list *list = o->ptr;
2746 listNode *ln;
2747
2748 listRewind(list);
2749 if (rdbSaveLen(fp,listLength(list)) == -1) return -1;
2750 while((ln = listYield(list))) {
2751 robj *eleobj = listNodeValue(ln);
2752
2753 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2754 }
2755 } else if (o->type == REDIS_SET) {
2756 /* Save a set value */
2757 dict *set = o->ptr;
2758 dictIterator *di = dictGetIterator(set);
2759 dictEntry *de;
2760
2761 if (rdbSaveLen(fp,dictSize(set)) == -1) return -1;
2762 while((de = dictNext(di)) != NULL) {
2763 robj *eleobj = dictGetEntryKey(de);
2764
2765 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2766 }
2767 dictReleaseIterator(di);
2768 } else if (o->type == REDIS_ZSET) {
2769 /* Save a set value */
2770 zset *zs = o->ptr;
2771 dictIterator *di = dictGetIterator(zs->dict);
2772 dictEntry *de;
2773
2774 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) return -1;
2775 while((de = dictNext(di)) != NULL) {
2776 robj *eleobj = dictGetEntryKey(de);
2777 double *score = dictGetEntryVal(de);
2778
2779 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2780 if (rdbSaveDoubleValue(fp,*score) == -1) return -1;
2781 }
2782 dictReleaseIterator(di);
2783 } else {
2784 redisAssert(0 != 0);
2785 }
2786 return 0;
2787 }
2788
2789 /* Return the length the object will have on disk if saved with
2790 * the rdbSaveObject() function. Currently we use a trick to get
2791 * this length with very little changes to the code. In the future
2792 * we could switch to a faster solution. */
2793 static off_t rdbSavedObjectLen(robj *o) {
2794 static FILE *fp = NULL;
2795
2796 if (fp == NULL) fp = fopen("/dev/null","w");
2797 assert(fp != NULL);
2798
2799 rewind(fp);
2800 assert(rdbSaveObject(fp,o) != 1);
2801 return ftello(fp);
2802 }
2803
2804 /* Return the number of pages required to save this object in the swap file */
2805 static off_t rdbSavedObjectPages(robj *o) {
2806 off_t bytes = rdbSavedObjectLen(o);
2807
2808 return (bytes+(server.vm_page_size-1))/server.vm_page_size;
2809 }
2810
2811 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2812 static int rdbSave(char *filename) {
2813 dictIterator *di = NULL;
2814 dictEntry *de;
2815 FILE *fp;
2816 char tmpfile[256];
2817 int j;
2818 time_t now = time(NULL);
2819
2820 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2821 fp = fopen(tmpfile,"w");
2822 if (!fp) {
2823 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2824 return REDIS_ERR;
2825 }
2826 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2827 for (j = 0; j < server.dbnum; j++) {
2828 redisDb *db = server.db+j;
2829 dict *d = db->dict;
2830 if (dictSize(d) == 0) continue;
2831 di = dictGetIterator(d);
2832 if (!di) {
2833 fclose(fp);
2834 return REDIS_ERR;
2835 }
2836
2837 /* Write the SELECT DB opcode */
2838 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2839 if (rdbSaveLen(fp,j) == -1) goto werr;
2840
2841 /* Iterate this DB writing every entry */
2842 while((de = dictNext(di)) != NULL) {
2843 robj *key = dictGetEntryKey(de);
2844 robj *o = dictGetEntryVal(de);
2845 time_t expiretime = getExpire(db,key);
2846
2847 /* Save the expire time */
2848 if (expiretime != -1) {
2849 /* If this key is already expired skip it */
2850 if (expiretime < now) continue;
2851 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2852 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2853 }
2854 /* Save the key and associated value */
2855 if (rdbSaveType(fp,o->type) == -1) goto werr;
2856 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2857 /* Save the actual value */
2858 if (rdbSaveObject(fp,o) == -1) goto werr;
2859 }
2860 dictReleaseIterator(di);
2861 }
2862 /* EOF opcode */
2863 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2864
2865 /* Make sure data will not remain on the OS's output buffers */
2866 fflush(fp);
2867 fsync(fileno(fp));
2868 fclose(fp);
2869
2870 /* Use RENAME to make sure the DB file is changed atomically only
2871 * if the generate DB file is ok. */
2872 if (rename(tmpfile,filename) == -1) {
2873 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2874 unlink(tmpfile);
2875 return REDIS_ERR;
2876 }
2877 redisLog(REDIS_NOTICE,"DB saved on disk");
2878 server.dirty = 0;
2879 server.lastsave = time(NULL);
2880 return REDIS_OK;
2881
2882 werr:
2883 fclose(fp);
2884 unlink(tmpfile);
2885 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2886 if (di) dictReleaseIterator(di);
2887 return REDIS_ERR;
2888 }
2889
2890 static int rdbSaveBackground(char *filename) {
2891 pid_t childpid;
2892
2893 if (server.bgsavechildpid != -1) return REDIS_ERR;
2894 if ((childpid = fork()) == 0) {
2895 /* Child */
2896 close(server.fd);
2897 if (rdbSave(filename) == REDIS_OK) {
2898 exit(0);
2899 } else {
2900 exit(1);
2901 }
2902 } else {
2903 /* Parent */
2904 if (childpid == -1) {
2905 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2906 strerror(errno));
2907 return REDIS_ERR;
2908 }
2909 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2910 server.bgsavechildpid = childpid;
2911 return REDIS_OK;
2912 }
2913 return REDIS_OK; /* unreached */
2914 }
2915
/* Delete the "temp-<pid>.rdb" file that rdbSave() running in process
 * 'childpid' writes to. Errors from unlink() are ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int) childpid);
    unlink(path);
}
2922
/* Read one opcode/type byte. Returns it as 0..255, or -1 on short read. */
static int rdbLoadType(FILE *fp) {
    unsigned char opcode;

    return (fread(&opcode,1,1,fp) == 0) ? -1 : (int) opcode;
}
2928
/* Read a 32 bit signed time value (host byte order) and widen it to
 * time_t. Returns -1 on short read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t seconds;

    if (fread(&seconds,sizeof(seconds),1,fp) != 1) return -1;
    return (time_t) seconds;
}
2934
2935 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2936 * of this file for a description of how this are stored on disk.
2937 *
2938 * isencoded is set to 1 if the readed length is not actually a length but
2939 * an "encoding type", check the above comments for more info */
2940 static uint32_t rdbLoadLen(FILE *fp, int *isencoded) {
2941 unsigned char buf[2];
2942 uint32_t len;
2943 int type;
2944
2945 if (isencoded) *isencoded = 0;
2946 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2947 type = (buf[0]&0xC0)>>6;
2948 if (type == REDIS_RDB_6BITLEN) {
2949 /* Read a 6 bit len */
2950 return buf[0]&0x3F;
2951 } else if (type == REDIS_RDB_ENCVAL) {
2952 /* Read a 6 bit len encoding type */
2953 if (isencoded) *isencoded = 1;
2954 return buf[0]&0x3F;
2955 } else if (type == REDIS_RDB_14BITLEN) {
2956 /* Read a 14 bit len */
2957 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2958 return ((buf[0]&0x3F)<<8)|buf[1];
2959 } else {
2960 /* Read a 32 bit len */
2961 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2962 return ntohl(len);
2963 }
2964 }
2965
2966 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2967 unsigned char enc[4];
2968 long long val;
2969
2970 if (enctype == REDIS_RDB_ENC_INT8) {
2971 if (fread(enc,1,1,fp) == 0) return NULL;
2972 val = (signed char)enc[0];
2973 } else if (enctype == REDIS_RDB_ENC_INT16) {
2974 uint16_t v;
2975 if (fread(enc,2,1,fp) == 0) return NULL;
2976 v = enc[0]|(enc[1]<<8);
2977 val = (int16_t)v;
2978 } else if (enctype == REDIS_RDB_ENC_INT32) {
2979 uint32_t v;
2980 if (fread(enc,4,1,fp) == 0) return NULL;
2981 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2982 val = (int32_t)v;
2983 } else {
2984 val = 0; /* anti-warning */
2985 redisAssert(0!=0);
2986 }
2987 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2988 }
2989
2990 static robj *rdbLoadLzfStringObject(FILE*fp) {
2991 unsigned int len, clen;
2992 unsigned char *c = NULL;
2993 sds val = NULL;
2994
2995 if ((clen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
2996 if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
2997 if ((c = zmalloc(clen)) == NULL) goto err;
2998 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2999 if (fread(c,clen,1,fp) == 0) goto err;
3000 if (lzf_decompress(c,clen,val,len) == 0) goto err;
3001 zfree(c);
3002 return createObject(REDIS_STRING,val);
3003 err:
3004 zfree(c);
3005 sdsfree(val);
3006 return NULL;
3007 }
3008
3009 static robj *rdbLoadStringObject(FILE*fp) {
3010 int isencoded;
3011 uint32_t len;
3012 sds val;
3013
3014 len = rdbLoadLen(fp,&isencoded);
3015 if (isencoded) {
3016 switch(len) {
3017 case REDIS_RDB_ENC_INT8:
3018 case REDIS_RDB_ENC_INT16:
3019 case REDIS_RDB_ENC_INT32:
3020 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
3021 case REDIS_RDB_ENC_LZF:
3022 return tryObjectSharing(rdbLoadLzfStringObject(fp));
3023 default:
3024 redisAssert(0!=0);
3025 }
3026 }
3027
3028 if (len == REDIS_RDB_LENERR) return NULL;
3029 val = sdsnewlen(NULL,len);
3030 if (len && fread(val,len,1,fp) == 0) {
3031 sdsfree(val);
3032 return NULL;
3033 }
3034 return tryObjectSharing(createObject(REDIS_STRING,val));
3035 }
3036
3037 /* For information about double serialization check rdbSaveDoubleValue() */
3038 static int rdbLoadDoubleValue(FILE *fp, double *val) {
3039 char buf[128];
3040 unsigned char len;
3041
3042 if (fread(&len,1,1,fp) == 0) return -1;
3043 switch(len) {
3044 case 255: *val = R_NegInf; return 0;
3045 case 254: *val = R_PosInf; return 0;
3046 case 253: *val = R_Nan; return 0;
3047 default:
3048 if (fread(buf,len,1,fp) == 0) return -1;
3049 buf[len] = '\0';
3050 sscanf(buf, "%lg", val);
3051 return 0;
3052 }
3053 }
3054
3055 /* Load a Redis object of the specified type from the specified file.
3056 * On success a newly allocated object is returned, otherwise NULL. */
3057 static robj *rdbLoadObject(int type, FILE *fp) {
3058 robj *o;
3059
3060 if (type == REDIS_STRING) {
3061 /* Read string value */
3062 if ((o = rdbLoadStringObject(fp)) == NULL) return NULL;
3063 tryObjectEncoding(o);
3064 } else if (type == REDIS_LIST || type == REDIS_SET) {
3065 /* Read list/set value */
3066 uint32_t listlen;
3067
3068 if ((listlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
3069 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
3070 /* Load every single element of the list/set */
3071 while(listlen--) {
3072 robj *ele;
3073
3074 if ((ele = rdbLoadStringObject(fp)) == NULL) return NULL;
3075 tryObjectEncoding(ele);
3076 if (type == REDIS_LIST) {
3077 listAddNodeTail((list*)o->ptr,ele);
3078 } else {
3079 dictAdd((dict*)o->ptr,ele,NULL);
3080 }
3081 }
3082 } else if (type == REDIS_ZSET) {
3083 /* Read list/set value */
3084 uint32_t zsetlen;
3085 zset *zs;
3086
3087 if ((zsetlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
3088 o = createZsetObject();
3089 zs = o->ptr;
3090 /* Load every single element of the list/set */
3091 while(zsetlen--) {
3092 robj *ele;
3093 double *score = zmalloc(sizeof(double));
3094
3095 if ((ele = rdbLoadStringObject(fp)) == NULL) return NULL;
3096 tryObjectEncoding(ele);
3097 if (rdbLoadDoubleValue(fp,score) == -1) return NULL;
3098 dictAdd(zs->dict,ele,score);
3099 zslInsert(zs->zsl,*score,ele);
3100 incrRefCount(ele); /* added to skiplist */
3101 }
3102 } else {
3103 redisAssert(0 != 0);
3104 }
3105 return o;
3106 }
3107
/* Load the dataset stored in the RDB file 'filename' into server.db.
 *
 * Returns REDIS_ERR when the file can't be opened, lacks the "REDIS"
 * signature, or has a format version other than 1. Returns REDIS_OK once
 * the REDIS_EOF opcode is reached. Anything else that goes wrong (short
 * read, a value that fails to load, a DB index >= server.dbnum, a
 * duplicated key) is treated as unrecoverable: the process exits with
 * status 1. */
static int rdbLoad(char *filename) {
    FILE *fp;
    robj *keyobj = NULL;
    uint32_t dbid;
    int type, retval, rdbver;
    /* Keys go to DB 0 until a SELECTDB opcode says otherwise */
    dict *d = server.db[0].dict;
    redisDb *db = server.db+0;
    char buf[1024];
    time_t expiretime = -1, now = time(NULL);

    fp = fopen(filename,"r");
    if (!fp) return REDIS_ERR;
    /* 9 byte header: "REDIS" followed by the format version digits */
    if (fread(buf,9,1,fp) == 0) goto eoferr;
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
        return REDIS_ERR;
    }
    rdbver = atoi(buf+5);
    if (rdbver != 1) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
        return REDIS_ERR;
    }
    while(1) {
        robj *o;

        /* Read type. */
        if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        if (type == REDIS_EXPIRETIME) {
            /* rdbLoadTime() uses -1 as its error value, so a stored expire
             * time of -1 is indistinguishable from a short read here. */
            if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again */
            if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        }
        if (type == REDIS_EOF) break;
        /* Handle SELECT DB opcode as a special case */
        if (type == REDIS_SELECTDB) {
            if ((dbid = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            db = server.db+dbid;
            d = db->dict;
            continue;
        }
        /* Read key */
        if ((keyobj = rdbLoadStringObject(fp)) == NULL) goto eoferr;
        /* Read value */
        if ((o = rdbLoadObject(type,fp)) == NULL) goto eoferr;
        /* Add the new object in the hash table */
        retval = dictAdd(d,keyobj,o);
        if (retval == DICT_ERR) {
            redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
            exit(1);
        }
        /* Set the expire time if needed */
        if (expiretime != -1) {
            setExpire(db,keyobj,expiretime);
            /* Delete this key if already expired */
            if (expiretime < now) deleteKey(db,keyobj);
            /* The expire opcode applies to this key only */
            expiretime = -1;
        }
        /* The dict now references the key: don't release it at eoferr */
        keyobj = o = NULL;
    }
    fclose(fp);
    return REDIS_OK;

eoferr: /* unexpected end of file is handled here with a fatal exit */
    if (keyobj) decrRefCount(keyobj);
    redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
    exit(1);
    return REDIS_ERR; /* Just to avoid warning */
}
3184
3185 /*================================== Commands =============================== */
3186
3187 static void authCommand(redisClient *c) {
3188 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
3189 c->authenticated = 1;
3190 addReply(c,shared.ok);
3191 } else {
3192 c->authenticated = 0;
3193 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
3194 }
3195 }
3196
3197 static void pingCommand(redisClient *c) {
3198 addReply(c,shared.pong);
3199 }
3200
3201 static void echoCommand(redisClient *c) {
3202 addReplyBulkLen(c,c->argv[1]);
3203 addReply(c,c->argv[1]);
3204 addReply(c,shared.crlf);
3205 }
3206
3207 /*=================================== Strings =============================== */
3208
3209 static void setGenericCommand(redisClient *c, int nx) {
3210 int retval;
3211
3212 if (nx) deleteIfVolatile(c->db,c->argv[1]);
3213 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
3214 if (retval == DICT_ERR) {
3215 if (!nx) {
3216 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3217 incrRefCount(c->argv[2]);
3218 } else {
3219 addReply(c,shared.czero);
3220 return;
3221 }
3222 } else {
3223 incrRefCount(c->argv[1]);
3224 incrRefCount(c->argv[2]);
3225 }
3226 server.dirty++;
3227 removeExpire(c->db,c->argv[1]);
3228 addReply(c, nx ? shared.cone : shared.ok);
3229 }
3230
3231 static void setCommand(redisClient *c) {
3232 setGenericCommand(c,0);
3233 }
3234
3235 static void setnxCommand(redisClient *c) {
3236 setGenericCommand(c,1);
3237 }
3238
3239 static int getGenericCommand(redisClient *c) {
3240 robj *o = lookupKeyRead(c->db,c->argv[1]);
3241
3242 if (o == NULL) {
3243 addReply(c,shared.nullbulk);
3244 return REDIS_OK;
3245 } else {
3246 if (o->type != REDIS_STRING) {
3247 addReply(c,shared.wrongtypeerr);
3248 return REDIS_ERR;
3249 } else {
3250 addReplyBulkLen(c,o);
3251 addReply(c,o);
3252 addReply(c,shared.crlf);
3253 return REDIS_OK;
3254 }
3255 }
3256 }
3257
3258 static void getCommand(redisClient *c) {
3259 getGenericCommand(c);
3260 }
3261
3262 static void getsetCommand(redisClient *c) {
3263 if (getGenericCommand(c) == REDIS_ERR) return;
3264 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3265 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3266 } else {
3267 incrRefCount(c->argv[1]);
3268 }
3269 incrRefCount(c->argv[2]);
3270 server.dirty++;
3271 removeExpire(c->db,c->argv[1]);
3272 }
3273
3274 static void mgetCommand(redisClient *c) {
3275 int j;
3276
3277 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
3278 for (j = 1; j < c->argc; j++) {
3279 robj *o = lookupKeyRead(c->db,c->argv[j]);
3280 if (o == NULL) {
3281 addReply(c,shared.nullbulk);
3282 } else {
3283 if (o->type != REDIS_STRING) {
3284 addReply(c,shared.nullbulk);
3285 } else {
3286 addReplyBulkLen(c,o);
3287 addReply(c,o);
3288 addReply(c,shared.crlf);
3289 }
3290 }
3291 }
3292 }
3293
3294 static void msetGenericCommand(redisClient *c, int nx) {
3295 int j, busykeys = 0;
3296
3297 if ((c->argc % 2) == 0) {
3298 addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3299 return;
3300 }
3301 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3302 * set nothing at all if at least one already key exists. */
3303 if (nx) {
3304 for (j = 1; j < c->argc; j += 2) {
3305 if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
3306 busykeys++;
3307 }
3308 }
3309 }
3310 if (busykeys) {
3311 addReply(c, shared.czero);
3312 return;
3313 }
3314
3315 for (j = 1; j < c->argc; j += 2) {
3316 int retval;
3317
3318 tryObjectEncoding(c->argv[j+1]);
3319 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3320 if (retval == DICT_ERR) {
3321 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3322 incrRefCount(c->argv[j+1]);
3323 } else {
3324 incrRefCount(c->argv[j]);
3325 incrRefCount(c->argv[j+1]);
3326 }
3327 removeExpire(c->db,c->argv[j]);
3328 }
3329 server.dirty += (c->argc-1)/2;
3330 addReply(c, nx ? shared.cone : shared.ok);
3331 }
3332
3333 static void msetCommand(redisClient *c) {
3334 msetGenericCommand(c,0);
3335 }
3336
3337 static void msetnxCommand(redisClient *c) {
3338 msetGenericCommand(c,1);
3339 }
3340
3341 static void incrDecrCommand(redisClient *c, long long incr) {
3342 long long value;
3343 int retval;
3344 robj *o;
3345
3346 o = lookupKeyWrite(c->db,c->argv[1]);
3347 if (o == NULL) {
3348 value = 0;
3349 } else {
3350 if (o->type != REDIS_STRING) {
3351 value = 0;
3352 } else {
3353 char *eptr;
3354
3355 if (o->encoding == REDIS_ENCODING_RAW)
3356 value = strtoll(o->ptr, &eptr, 10);
3357 else if (o->encoding == REDIS_ENCODING_INT)
3358 value = (long)o->ptr;
3359 else
3360 redisAssert(1 != 1);
3361 }
3362 }
3363
3364 value += incr;
3365 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
3366 tryObjectEncoding(o);
3367 retval = dictAdd(c->db->dict,c->argv[1],o);
3368 if (retval == DICT_ERR) {
3369 dictReplace(c->db->dict,c->argv[1],o);
3370 removeExpire(c->db,c->argv[1]);
3371 } else {
3372 incrRefCount(c->argv[1]);
3373 }
3374 server.dirty++;
3375 addReply(c,shared.colon);
3376 addReply(c,o);
3377 addReply(c,shared.crlf);
3378 }
3379
3380 static void incrCommand(redisClient *c) {
3381 incrDecrCommand(c,1);
3382 }
3383
3384 static void decrCommand(redisClient *c) {
3385 incrDecrCommand(c,-1);
3386 }
3387
3388 static void incrbyCommand(redisClient *c) {
3389 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3390 incrDecrCommand(c,incr);
3391 }
3392
3393 static void decrbyCommand(redisClient *c) {
3394 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3395 incrDecrCommand(c,-incr);
3396 }
3397
3398 /* ========================= Type agnostic commands ========================= */
3399
3400 static void delCommand(redisClient *c) {
3401 int deleted = 0, j;
3402
3403 for (j = 1; j < c->argc; j++) {
3404 if (deleteKey(c->db,c->argv[j])) {
3405 server.dirty++;
3406 deleted++;
3407 }
3408 }
3409 switch(deleted) {
3410 case 0:
3411 addReply(c,shared.czero);
3412 break;
3413 case 1:
3414 addReply(c,shared.cone);
3415 break;
3416 default:
3417 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3418 break;
3419 }
3420 }
3421
3422 static void existsCommand(redisClient *c) {
3423 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3424 }
3425
3426 static void selectCommand(redisClient *c) {
3427 int id = atoi(c->argv[1]->ptr);
3428
3429 if (selectDb(c,id) == REDIS_ERR) {
3430 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3431 } else {
3432 addReply(c,shared.ok);
3433 }
3434 }
3435
3436 static void randomkeyCommand(redisClient *c) {
3437 dictEntry *de;
3438
3439 while(1) {
3440 de = dictGetRandomKey(c->db->dict);
3441 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3442 }
3443 if (de == NULL) {
3444 addReply(c,shared.plus);
3445 addReply(c,shared.crlf);
3446 } else {
3447 addReply(c,shared.plus);
3448 addReply(c,dictGetEntryKey(de));
3449 addReply(c,shared.crlf);
3450 }
3451 }
3452
3453 static void keysCommand(redisClient *c) {
3454 dictIterator *di;
3455 dictEntry *de;
3456 sds pattern = c->argv[1]->ptr;
3457 int plen = sdslen(pattern);
3458 unsigned long numkeys = 0, keyslen = 0;
3459 robj *lenobj = createObject(REDIS_STRING,NULL);
3460
3461 di = dictGetIterator(c->db->dict);
3462 addReply(c,lenobj);
3463 decrRefCount(lenobj);
3464 while((de = dictNext(di)) != NULL) {
3465 robj *keyobj = dictGetEntryKey(de);
3466
3467 sds key = keyobj->ptr;
3468 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3469 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3470 if (expireIfNeeded(c->db,keyobj) == 0) {
3471 if (numkeys != 0)
3472 addReply(c,shared.space);
3473 addReply(c,keyobj);
3474 numkeys++;
3475 keyslen += sdslen(key);
3476 }
3477 }
3478 }
3479 dictReleaseIterator(di);
3480 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3481 addReply(c,shared.crlf);
3482 }
3483
3484 static void dbsizeCommand(redisClient *c) {
3485 addReplySds(c,
3486 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3487 }
3488
3489 static void lastsaveCommand(redisClient *c) {
3490 addReplySds(c,
3491 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3492 }
3493
3494 static void typeCommand(redisClient *c) {
3495 robj *o;
3496 char *type;
3497
3498 o = lookupKeyRead(c->db,c->argv[1]);
3499 if (o == NULL) {
3500 type = "+none";
3501 } else {
3502 switch(o->type) {
3503 case REDIS_STRING: type = "+string"; break;
3504 case REDIS_LIST: type = "+list"; break;
3505 case REDIS_SET: type = "+set"; break;
3506 case REDIS_ZSET: type = "+zset"; break;
3507 default: type = "unknown"; break;
3508 }
3509 }
3510 addReplySds(c,sdsnew(type));
3511 addReply(c,shared.crlf);
3512 }
3513
3514 static void saveCommand(redisClient *c) {
3515 if (server.bgsavechildpid != -1) {
3516 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3517 return;
3518 }
3519 if (rdbSave(server.dbfilename) == REDIS_OK) {
3520 addReply(c,shared.ok);
3521 } else {
3522 addReply(c,shared.err);
3523 }
3524 }
3525
3526 static void bgsaveCommand(redisClient *c) {
3527 if (server.bgsavechildpid != -1) {
3528 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3529 return;
3530 }
3531 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3532 char *status = "+Background saving started\r\n";
3533 addReplySds(c,sdsnew(status));
3534 } else {
3535 addReply(c,shared.err);
3536 }
3537 }
3538
3539 static void shutdownCommand(redisClient *c) {
3540 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3541 /* Kill the saving child if there is a background saving in progress.
3542 We want to avoid race conditions, for instance our saving child may
3543 overwrite the synchronous saving did by SHUTDOWN. */
3544 if (server.bgsavechildpid != -1) {
3545 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3546 kill(server.bgsavechildpid,SIGKILL);
3547 rdbRemoveTempFile(server.bgsavechildpid);
3548 }
3549 if (server.appendonly) {
3550 /* Append only file: fsync() the AOF and exit */
3551 fsync(server.appendfd);
3552 exit(0);
3553 } else {
3554 /* Snapshotting. Perform a SYNC SAVE and exit */
3555 if (rdbSave(server.dbfilename) == REDIS_OK) {
3556 if (server.daemonize)
3557 unlink(server.pidfile);
3558 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3559 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3560 exit(0);
3561 } else {
3562 /* Ooops.. error saving! The best we can do is to continue operating.
3563 * Note that if there was a background saving process, in the next
3564 * cron() Redis will be notified that the background saving aborted,
3565 * handling special stuff like slaves pending for synchronization... */
3566 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3567 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3568 }
3569 }
3570 }
3571
3572 static void renameGenericCommand(redisClient *c, int nx) {
3573 robj *o;
3574
3575 /* To use the same key as src and dst is probably an error */
3576 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3577 addReply(c,shared.sameobjecterr);
3578 return;
3579 }
3580
3581 o = lookupKeyWrite(c->db,c->argv[1]);
3582 if (o == NULL) {
3583 addReply(c,shared.nokeyerr);
3584 return;
3585 }
3586 incrRefCount(o);
3587 deleteIfVolatile(c->db,c->argv[2]);
3588 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3589 if (nx) {
3590 decrRefCount(o);
3591 addReply(c,shared.czero);
3592 return;
3593 }
3594 dictReplace(c->db->dict,c->argv[2],o);
3595 } else {
3596 incrRefCount(c->argv[2]);
3597 }
3598 deleteKey(c->db,c->argv[1]);
3599 server.dirty++;
3600 addReply(c,nx ? shared.cone : shared.ok);
3601 }
3602
3603 static void renameCommand(redisClient *c) {
3604 renameGenericCommand(c,0);
3605 }
3606
3607 static void renamenxCommand(redisClient *c) {
3608 renameGenericCommand(c,1);
3609 }
3610
3611 static void moveCommand(redisClient *c) {
3612 robj *o;
3613 redisDb *src, *dst;
3614 int srcid;
3615
3616 /* Obtain source and target DB pointers */
3617 src = c->db;
3618 srcid = c->db->id;
3619 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3620 addReply(c,shared.outofrangeerr);
3621 return;
3622 }
3623 dst = c->db;
3624 selectDb(c,srcid); /* Back to the source DB */
3625
3626 /* If the user is moving using as target the same
3627 * DB as the source DB it is probably an error. */
3628 if (src == dst) {
3629 addReply(c,shared.sameobjecterr);
3630 return;
3631 }
3632
3633 /* Check if the element exists and get a reference */
3634 o = lookupKeyWrite(c->db,c->argv[1]);
3635 if (!o) {
3636 addReply(c,shared.czero);
3637 return;
3638 }
3639
3640 /* Try to add the element to the target DB */
3641 deleteIfVolatile(dst,c->argv[1]);
3642 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3643 addReply(c,shared.czero);
3644 return;
3645 }
3646 incrRefCount(c->argv[1]);
3647 incrRefCount(o);
3648
3649 /* OK! key moved, free the entry in the source DB */
3650 deleteKey(src,c->argv[1]);
3651 server.dirty++;
3652 addReply(c,shared.cone);
3653 }
3654
3655 /* =================================== Lists ================================ */
3656 static void pushGenericCommand(redisClient *c, int where) {
3657 robj *lobj;
3658 list *list;
3659
3660 lobj = lookupKeyWrite(c->db,c->argv[1]);
3661 if (lobj == NULL) {
3662 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3663 addReply(c,shared.ok);
3664 return;
3665 }
3666 lobj = createListObject();
3667 list = lobj->ptr;
3668 if (where == REDIS_HEAD) {
3669 listAddNodeHead(list,c->argv[2]);
3670 } else {
3671 listAddNodeTail(list,c->argv[2]);
3672 }
3673 dictAdd(c->db->dict,c->argv[1],lobj);
3674 incrRefCount(c->argv[1]);
3675 incrRefCount(c->argv[2]);
3676 } else {
3677 if (lobj->type != REDIS_LIST) {
3678 addReply(c,shared.wrongtypeerr);
3679 return;
3680 }
3681 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3682 addReply(c,shared.ok);
3683 return;
3684 }
3685 list = lobj->ptr;
3686 if (where == REDIS_HEAD) {
3687 listAddNodeHead(list,c->argv[2]);
3688 } else {
3689 listAddNodeTail(list,c->argv[2]);
3690 }
3691 incrRefCount(c->argv[2]);
3692 }
3693 server.dirty++;
3694 addReply(c,shared.ok);
3695 }
3696
3697 static void lpushCommand(redisClient *c) {
3698 pushGenericCommand(c,REDIS_HEAD);
3699 }
3700
3701 static void rpushCommand(redisClient *c) {
3702 pushGenericCommand(c,REDIS_TAIL);
3703 }
3704
3705 static void llenCommand(redisClient *c) {
3706 robj *o;
3707 list *l;
3708
3709 o = lookupKeyRead(c->db,c->argv[1]);
3710 if (o == NULL) {
3711 addReply(c,shared.czero);
3712 return;
3713 } else {
3714 if (o->type != REDIS_LIST) {
3715 addReply(c,shared.wrongtypeerr);
3716 } else {
3717 l = o->ptr;
3718 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3719 }
3720 }
3721 }
3722
3723 static void lindexCommand(redisClient *c) {
3724 robj *o;
3725 int index = atoi(c->argv[2]->ptr);
3726
3727 o = lookupKeyRead(c->db,c->argv[1]);
3728 if (o == NULL) {
3729 addReply(c,shared.nullbulk);
3730 } else {
3731 if (o->type != REDIS_LIST) {
3732 addReply(c,shared.wrongtypeerr);
3733 } else {
3734 list *list = o->ptr;
3735 listNode *ln;
3736
3737 ln = listIndex(list, index);
3738 if (ln == NULL) {
3739 addReply(c,shared.nullbulk);
3740 } else {
3741 robj *ele = listNodeValue(ln);
3742 addReplyBulkLen(c,ele);
3743 addReply(c,ele);
3744 addReply(c,shared.crlf);
3745 }
3746 }
3747 }
3748 }
3749
3750 static void lsetCommand(redisClient *c) {
3751 robj *o;
3752 int index = atoi(c->argv[2]->ptr);
3753
3754 o = lookupKeyWrite(c->db,c->argv[1]);
3755 if (o == NULL) {
3756 addReply(c,shared.nokeyerr);
3757 } else {
3758 if (o->type != REDIS_LIST) {
3759 addReply(c,shared.wrongtypeerr);
3760 } else {
3761 list *list = o->ptr;
3762 listNode *ln;
3763
3764 ln = listIndex(list, index);
3765 if (ln == NULL) {
3766 addReply(c,shared.outofrangeerr);
3767 } else {
3768 robj *ele = listNodeValue(ln);
3769
3770 decrRefCount(ele);
3771 listNodeValue(ln) = c->argv[3];
3772 incrRefCount(c->argv[3]);
3773 addReply(c,shared.ok);
3774 server.dirty++;
3775 }
3776 }
3777 }
3778 }
3779
3780 static void popGenericCommand(redisClient *c, int where) {
3781 robj *o;
3782
3783 o = lookupKeyWrite(c->db,c->argv[1]);
3784 if (o == NULL) {
3785 addReply(c,shared.nullbulk);
3786 } else {
3787 if (o->type != REDIS_LIST) {
3788 addReply(c,shared.wrongtypeerr);
3789 } else {
3790 list *list = o->ptr;
3791 listNode *ln;
3792
3793 if (where == REDIS_HEAD)
3794 ln = listFirst(list);
3795 else
3796 ln = listLast(list);
3797
3798 if (ln == NULL) {
3799 addReply(c,shared.nullbulk);
3800 } else {
3801 robj *ele = listNodeValue(ln);
3802 addReplyBulkLen(c,ele);
3803 addReply(c,ele);
3804 addReply(c,shared.crlf);
3805 listDelNode(list,ln);
3806 server.dirty++;
3807 }
3808 }
3809 }
3810 }
3811
3812 static void lpopCommand(redisClient *c) {
3813 popGenericCommand(c,REDIS_HEAD);
3814 }
3815
3816 static void rpopCommand(redisClient *c) {
3817 popGenericCommand(c,REDIS_TAIL);
3818 }
3819
3820 static void lrangeCommand(redisClient *c) {
3821 robj *o;
3822 int start = atoi(c->argv[2]->ptr);
3823 int end = atoi(c->argv[3]->ptr);
3824
3825 o = lookupKeyRead(c->db,c->argv[1]);
3826 if (o == NULL) {
3827 addReply(c,shared.nullmultibulk);
3828 } else {
3829 if (o->type != REDIS_LIST) {
3830 addReply(c,shared.wrongtypeerr);
3831 } else {
3832 list *list = o->ptr;
3833 listNode *ln;
3834 int llen = listLength(list);
3835 int rangelen, j;
3836 robj *ele;
3837
3838 /* convert negative indexes */
3839 if (start < 0) start = llen+start;
3840 if (end < 0) end = llen+end;
3841 if (start < 0) start = 0;
3842 if (end < 0) end = 0;
3843
3844 /* indexes sanity checks */
3845 if (start > end || start >= llen) {
3846 /* Out of range start or start > end result in empty list */
3847 addReply(c,shared.emptymultibulk);
3848 return;
3849 }
3850 if (end >= llen) end = llen-1;
3851 rangelen = (end-start)+1;
3852
3853 /* Return the result in form of a multi-bulk reply */
3854 ln = listIndex(list, start);
3855 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3856 for (j = 0; j < rangelen; j++) {
3857 ele = listNodeValue(ln);
3858 addReplyBulkLen(c,ele);
3859 addReply(c,ele);
3860 addReply(c,shared.crlf);
3861 ln = ln->next;
3862 }
3863 }
3864 }
3865 }
3866
3867 static void ltrimCommand(redisClient *c) {
3868 robj *o;
3869 int start = atoi(c->argv[2]->ptr);
3870 int end = atoi(c->argv[3]->ptr);
3871
3872 o = lookupKeyWrite(c->db,c->argv[1]);
3873 if (o == NULL) {
3874 addReply(c,shared.ok);
3875 } else {
3876 if (o->type != REDIS_LIST) {
3877 addReply(c,shared.wrongtypeerr);
3878 } else {
3879 list *list = o->ptr;
3880 listNode *ln;
3881 int llen = listLength(list);
3882 int j, ltrim, rtrim;
3883
3884 /* convert negative indexes */
3885 if (start < 0) start = llen+start;
3886 if (end < 0) end = llen+end;
3887 if (start < 0) start = 0;
3888 if (end < 0) end = 0;
3889
3890 /* indexes sanity checks */
3891 if (start > end || start >= llen) {
3892 /* Out of range start or start > end result in empty list */
3893 ltrim = llen;
3894 rtrim = 0;
3895 } else {
3896 if (end >= llen) end = llen-1;
3897 ltrim = start;
3898 rtrim = llen-end-1;
3899 }
3900
3901 /* Remove list elements to perform the trim */
3902 for (j = 0; j < ltrim; j++) {
3903 ln = listFirst(list);
3904 listDelNode(list,ln);
3905 }
3906 for (j = 0; j < rtrim; j++) {
3907 ln = listLast(list);
3908 listDelNode(list,ln);
3909 }
3910 server.dirty++;
3911 addReply(c,shared.ok);
3912 }
3913 }
3914 }
3915
3916 static void lremCommand(redisClient *c) {
3917 robj *o;
3918
3919 o = lookupKeyWrite(c->db,c->argv[1]);
3920 if (o == NULL) {
3921 addReply(c,shared.czero);
3922 } else {
3923 if (o->type != REDIS_LIST) {
3924 addReply(c,shared.wrongtypeerr);
3925 } else {
3926 list *list = o->ptr;
3927 listNode *ln, *next;
3928 int toremove = atoi(c->argv[2]->ptr);
3929 int removed = 0;
3930 int fromtail = 0;
3931
3932 if (toremove < 0) {
3933 toremove = -toremove;
3934 fromtail = 1;
3935 }
3936 ln = fromtail ? list->tail : list->head;
3937 while (ln) {
3938 robj *ele = listNodeValue(ln);
3939
3940 next = fromtail ? ln->prev : ln->next;
3941 if (compareStringObjects(ele,c->argv[3]) == 0) {
3942 listDelNode(list,ln);
3943 server.dirty++;
3944 removed++;
3945 if (toremove && removed == toremove) break;
3946 }
3947 ln = next;
3948 }
3949 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3950 }
3951 }
3952 }
3953
/* This is the semantic of this command:
 *  RPOPLPUSH srclist dstlist:
 *   IF LLEN(srclist) > 0
 *     element = RPOP srclist
 *     LPUSH dstlist element
 *     RETURN element
 *   ELSE
 *     RETURN nil
 *   END
 *
 * The idea is to be able to get an element from a list in a reliable way
 * since the element is not just returned but pushed against another list
 * as well. This command was originally proposed by Ezra Zygmuntowicz.
 *
 * Replies: nil if srclist is missing or empty; a type error if srclist or
 * dstlist holds a non-list value (checked before anything is modified);
 * otherwise the moved element as a bulk reply. dstlist is created if it
 * does not exist.
 * NOTE(review): when handleClientsWaitingListPush() returns non-zero the
 * element is not pushed to dstlist; presumably it was delivered to a client
 * blocked on dstlist — confirm against that function. */
static void rpoplpushcommand(redisClient *c) {
    robj *sobj;

    sobj = lookupKeyWrite(c->db,c->argv[1]);
    if (sobj == NULL) {
        addReply(c,shared.nullbulk);
    } else {
        if (sobj->type != REDIS_LIST) {
            addReply(c,shared.wrongtypeerr);
        } else {
            list *srclist = sobj->ptr;
            listNode *ln = listLast(srclist);

            if (ln == NULL) {
                /* Empty source list */
                addReply(c,shared.nullbulk);
            } else {
                robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
                robj *ele = listNodeValue(ln);
                list *dstlist;

                /* Validate the destination type before touching either list */
                if (dobj && dobj->type != REDIS_LIST) {
                    addReply(c,shared.wrongtypeerr);
                    return;
                }

                /* Add the element to the target list (unless it's directly
                 * passed to some BLPOP-ing client) */
                if (!handleClientsWaitingListPush(c,c->argv[2],ele)) {
                    if (dobj == NULL) {
                        /* Create the list if the key does not exist */
                        dobj = createListObject();
                        dictAdd(c->db->dict,c->argv[2],dobj);
                        incrRefCount(c->argv[2]);
                    }
                    dstlist = dobj->ptr;
                    listAddNodeHead(dstlist,ele);
                    /* The destination list now holds its own reference */
                    incrRefCount(ele);
                }

                /* Send the element to the client as reply as well */
                addReplyBulkLen(c,ele);
                addReply(c,ele);
                addReply(c,shared.crlf);

                /* Finally remove the element from the source list. This is
                 * done last, after ele has been used for the push and the
                 * reply above. */
                listDelNode(srclist,ln);
                server.dirty++;
            }
        }
    }
}
4020
4021
4022 /* ==================================== Sets ================================ */
4023
4024 static void saddCommand(redisClient *c) {
4025 robj *set;
4026
4027 set = lookupKeyWrite(c->db,c->argv[1]);
4028 if (set == NULL) {
4029 set = createSetObject();
4030 dictAdd(c->db->dict,c->argv[1],set);
4031 incrRefCount(c->argv[1]);
4032 } else {
4033 if (set->type != REDIS_SET) {
4034 addReply(c,shared.wrongtypeerr);
4035 return;
4036 }
4037 }
4038 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
4039 incrRefCount(c->argv[2]);
4040 server.dirty++;
4041 addReply(c,shared.cone);
4042 } else {
4043 addReply(c,shared.czero);
4044 }
4045 }
4046
4047 static void sremCommand(redisClient *c) {
4048 robj *set;
4049
4050 set = lookupKeyWrite(c->db,c->argv[1]);
4051 if (set == NULL) {
4052 addReply(c,shared.czero);
4053 } else {
4054 if (set->type != REDIS_SET) {
4055 addReply(c,shared.wrongtypeerr);
4056 return;
4057 }
4058 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
4059 server.dirty++;
4060 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
4061 addReply(c,shared.cone);
4062 } else {
4063 addReply(c,shared.czero);
4064 }
4065 }
4066 }
4067
4068 static void smoveCommand(redisClient *c) {
4069 robj *srcset, *dstset;
4070
4071 srcset = lookupKeyWrite(c->db,c->argv[1]);
4072 dstset = lookupKeyWrite(c->db,c->argv[2]);
4073
4074 /* If the source key does not exist return 0, if it's of the wrong type
4075 * raise an error */
4076 if (srcset == NULL || srcset->type != REDIS_SET) {
4077 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
4078 return;
4079 }
4080 /* Error if the destination key is not a set as well */
4081 if (dstset && dstset->type != REDIS_SET) {
4082 addReply(c,shared.wrongtypeerr);
4083 return;
4084 }
4085 /* Remove the element from the source set */
4086 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
4087 /* Key not found in the src set! return zero */
4088 addReply(c,shared.czero);
4089 return;
4090 }
4091 server.dirty++;
4092 /* Add the element to the destination set */
4093 if (!dstset) {
4094 dstset = createSetObject();
4095 dictAdd(c->db->dict,c->argv[2],dstset);
4096 incrRefCount(c->argv[2]);
4097 }
4098 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
4099 incrRefCount(c->argv[3]);
4100 addReply(c,shared.cone);
4101 }
4102
4103 static void sismemberCommand(redisClient *c) {
4104 robj *set;
4105
4106 set = lookupKeyRead(c->db,c->argv[1]);
4107 if (set == NULL) {
4108 addReply(c,shared.czero);
4109 } else {
4110 if (set->type != REDIS_SET) {
4111 addReply(c,shared.wrongtypeerr);
4112 return;
4113 }
4114 if (dictFind(set->ptr,c->argv[2]))
4115 addReply(c,shared.cone);
4116 else
4117 addReply(c,shared.czero);
4118 }
4119 }
4120
4121 static void scardCommand(redisClient *c) {
4122 robj *o;
4123 dict *s;
4124
4125 o = lookupKeyRead(c->db,c->argv[1]);
4126 if (o == NULL) {
4127 addReply(c,shared.czero);
4128 return;
4129 } else {
4130 if (o->type != REDIS_SET) {
4131 addReply(c,shared.wrongtypeerr);
4132 } else {
4133 s = o->ptr;
4134 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4135 dictSize(s)));
4136 }
4137 }
4138 }
4139
4140 static void spopCommand(redisClient *c) {
4141 robj *set;
4142 dictEntry *de;
4143
4144 set = lookupKeyWrite(c->db,c->argv[1]);
4145 if (set == NULL) {
4146 addReply(c,shared.nullbulk);
4147 } else {
4148 if (set->type != REDIS_SET) {
4149 addReply(c,shared.wrongtypeerr);
4150 return;
4151 }
4152 de = dictGetRandomKey(set->ptr);
4153 if (de == NULL) {
4154 addReply(c,shared.nullbulk);
4155 } else {
4156 robj *ele = dictGetEntryKey(de);
4157
4158 addReplyBulkLen(c,ele);
4159 addReply(c,ele);
4160 addReply(c,shared.crlf);
4161 dictDelete(set->ptr,ele);
4162 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
4163 server.dirty++;
4164 }
4165 }
4166 }
4167
4168 static void srandmemberCommand(redisClient *c) {
4169 robj *set;
4170 dictEntry *de;
4171
4172 set = lookupKeyRead(c->db,c->argv[1]);
4173 if (set == NULL) {
4174 addReply(c,shared.nullbulk);
4175 } else {
4176 if (set->type != REDIS_SET) {
4177 addReply(c,shared.wrongtypeerr);
4178 return;
4179 }
4180 de = dictGetRandomKey(set->ptr);
4181 if (de == NULL) {
4182 addReply(c,shared.nullbulk);
4183 } else {
4184 robj *ele = dictGetEntryKey(de);
4185
4186 addReplyBulkLen(c,ele);
4187 addReply(c,ele);
4188 addReply(c,shared.crlf);
4189 }
4190 }
4191 }
4192
4193 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
4194 dict **d1 = (void*) s1, **d2 = (void*) s2;
4195
4196 return dictSize(*d1)-dictSize(*d2);
4197 }
4198
/* Shared implementation of SINTER and SINTERSTORE.
 *
 * setskeys/setsnum: the keys of the sets to intersect.
 * dstkey: NULL for SINTER (the members are sent as a multi-bulk reply);
 *         otherwise the key that receives the resulting set (SINTERSTORE),
 *         and the reply is the cardinality of the stored set.
 *
 * If any input key is missing the intersection is empty: SINTER replies with
 * a null multi-bulk, SINTERSTORE deletes dstkey and replies 0. If any input
 * key holds a non-set value a type error is returned. */
static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
    dict **dv = zmalloc(sizeof(dict*)*setsnum);
    dictIterator *di;
    dictEntry *de;
    robj *lenobj = NULL, *dstset = NULL;
    unsigned long j, cardinality = 0;

    /* Collect the dicts of all input sets, bailing out early on a missing
     * key (empty intersection) or a wrong type. */
    for (j = 0; j < setsnum; j++) {
        robj *setobj;

        setobj = dstkey ?
                    lookupKeyWrite(c->db,setskeys[j]) :
                    lookupKeyRead(c->db,setskeys[j]);
        if (!setobj) {
            zfree(dv);
            if (dstkey) {
                if (deleteKey(c->db,dstkey))
                    server.dirty++;
                addReply(c,shared.czero);
            } else {
                addReply(c,shared.nullmultibulk);
            }
            return;
        }
        if (setobj->type != REDIS_SET) {
            zfree(dv);
            addReply(c,shared.wrongtypeerr);
            return;
        }
        dv[j] = setobj->ptr;
    }
    /* Sort sets from the smallest to largest, this will improve our
     * algorithm's performance */
    qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);

    /* The first thing we should output is the total number of elements...
     * since this is a multi-bulk write, but at this stage we don't know
     * the intersection set size, so we use a trick, append an empty object
     * to the output list and save the pointer to later modify it with the
     * right length.
     * NOTE(review): lenobj is decrRefCount()ed right after addReply() yet
     * written to later; this relies on addReply() keeping its own reference
     * to the object — confirm in addReply(). */
    if (!dstkey) {
        lenobj = createObject(REDIS_STRING,NULL);
        addReply(c,lenobj);
        decrRefCount(lenobj);
    } else {
        /* If we have a target key where to store the resulting set
         * create this key with an empty set inside */
        dstset = createSetObject();
    }

    /* Iterate all the elements of the first (smallest) set, and test
     * the element against all the other sets, if at least one set does
     * not include the element it is discarded */
    di = dictGetIterator(dv[0]);

    while((de = dictNext(di)) != NULL) {
        robj *ele;

        for (j = 1; j < setsnum; j++)
            if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
        if (j != setsnum)
            continue; /* at least one set does not contain the member */
        ele = dictGetEntryKey(de);
        if (!dstkey) {
            addReplyBulkLen(c,ele);
            addReply(c,ele);
            addReply(c,shared.crlf);
            cardinality++;
        } else {
            /* The result set shares the member object: take a reference */
            dictAdd(dstset->ptr,ele,NULL);
            incrRefCount(ele);
        }
    }
    dictReleaseIterator(di);

    if (dstkey) {
        /* Store the resulting set into the target, replacing any old value.
         * This happens after the iteration, so deleting dstkey is safe even
         * when it was also one of the inputs. */
        deleteKey(c->db,dstkey);
        dictAdd(c->db->dict,dstkey,dstset);
        incrRefCount(dstkey);
    }

    if (!dstkey) {
        /* Fill in the multi-bulk length placeholder queued above */
        lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
    } else {
        addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
            dictSize((dict*)dstset->ptr)));
        server.dirty++;
    }
    zfree(dv);
}
4290
4291 static void sinterCommand(redisClient *c) {
4292 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4293 }
4294
4295 static void sinterstoreCommand(redisClient *c) {
4296 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4297 }
4298
4299 #define REDIS_OP_UNION 0
4300 #define REDIS_OP_DIFF 1
4301
/* Shared implementation of SUNION, SUNIONSTORE, SDIFF and SDIFFSTORE.
 *
 * op: REDIS_OP_UNION collects the members of every set; REDIS_OP_DIFF starts
 *     from the members of the first set and removes the members of every
 *     following set.
 * dstkey: NULL to reply with the resulting members as a multi-bulk reply;
 *         otherwise the key that receives the result, and the reply is its
 *         cardinality.
 *
 * Missing keys behave like empty sets; a non-set value yields a type error
 * before anything is computed. */
static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
    dict **dv = zmalloc(sizeof(dict*)*setsnum);
    dictIterator *di;
    dictEntry *de;
    robj *dstset = NULL;
    int j, cardinality = 0;

    /* Collect the input dicts; a NULL slot stands for a missing key */
    for (j = 0; j < setsnum; j++) {
        robj *setobj;

        setobj = dstkey ?
                    lookupKeyWrite(c->db,setskeys[j]) :
                    lookupKeyRead(c->db,setskeys[j]);
        if (!setobj) {
            dv[j] = NULL;
            continue;
        }
        if (setobj->type != REDIS_SET) {
            zfree(dv);
            addReply(c,shared.wrongtypeerr);
            return;
        }
        dv[j] = setobj->ptr;
    }

    /* We need a temp set object to store our union. If the dstkey
     * is not NULL (that is, we are inside an SUNIONSTORE operation) then
     * this set object will be the resulting object to set into the target key*/
    dstset = createSetObject();

    /* Iterate all the elements of all the sets, add every element a single
     * time to the result set */
    for (j = 0; j < setsnum; j++) {
        if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
        if (!dv[j]) continue; /* non existing keys are like empty sets */

        di = dictGetIterator(dv[j]);

        while((de = dictNext(di)) != NULL) {
            robj *ele;

            /* dictAdd will not add the same element multiple times */
            ele = dictGetEntryKey(de);
            if (op == REDIS_OP_UNION || j == 0) {
                if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
                    incrRefCount(ele);
                    cardinality++;
                }
            } else if (op == REDIS_OP_DIFF) {
                if (dictDelete(dstset->ptr,ele) == DICT_OK) {
                    cardinality--;
                }
            }
        }
        dictReleaseIterator(di);

        /* Once a DIFF result is empty, later sets cannot change it */
        if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
    }

    /* Output the content of the resulting set, if not in STORE mode */
    if (!dstkey) {
        addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
        di = dictGetIterator(dstset->ptr);
        while((de = dictNext(di)) != NULL) {
            robj *ele;

            ele = dictGetEntryKey(de);
            addReplyBulkLen(c,ele);
            addReply(c,ele);
            addReply(c,shared.crlf);
        }
        dictReleaseIterator(di);
    } else {
        /* If we have a target key where to store the resulting set
         * create this key with the result set inside */
        deleteKey(c->db,dstkey);
        dictAdd(c->db->dict,dstkey,dstset);
        incrRefCount(dstkey);
    }

    /* Cleanup: in reply mode the temporary set is released; in STORE mode
     * it now belongs to dstkey and only its size is reported */
    if (!dstkey) {
        decrRefCount(dstset);
    } else {
        addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
            dictSize((dict*)dstset->ptr)));
        server.dirty++;
    }
    zfree(dv);
}
4392
4393 static void sunionCommand(redisClient *c) {
4394 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4395 }
4396
4397 static void sunionstoreCommand(redisClient *c) {
4398 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4399 }
4400
4401 static void sdiffCommand(redisClient *c) {
4402 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4403 }
4404
4405 static void sdiffstoreCommand(redisClient *c) {
4406 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4407 }
4408
4409 /* ==================================== ZSets =============================== */
4410
/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations on a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
4418
4419 /* This skiplist implementation is almost a C translation of the original
4420 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4421 * Alternative to Balanced Trees", modified in three ways:
4422 * a) this implementation allows for repeated values.
4423 * b) the comparison is not just by key (our 'score') but by satellite data.
4424 * c) there is a back pointer, so it's a doubly linked list with the back
4425 * pointers being only at "level 1". This allows to traverse the list
4426 * from tail to head, useful for ZREVRANGE. */
4427
4428 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4429 zskiplistNode *zn = zmalloc(sizeof(*zn));
4430
4431 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4432 zn->score = score;
4433 zn->obj = obj;
4434 return zn;
4435 }
4436
4437 static zskiplist *zslCreate(void) {
4438 int j;
4439 zskiplist *zsl;
4440
4441 zsl = zmalloc(sizeof(*zsl));
4442 zsl->level = 1;
4443 zsl->length = 0;
4444 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4445 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4446 zsl->header->forward[j] = NULL;
4447 zsl->header->backward = NULL;
4448 zsl->tail = NULL;
4449 return zsl;
4450 }
4451
4452 static void zslFreeNode(zskiplistNode *node) {
4453 decrRefCount(node->obj);
4454 zfree(node->forward);
4455 zfree(node);
4456 }
4457
4458 static void zslFree(zskiplist *zsl) {
4459 zskiplistNode *node = zsl->header->forward[0], *next;
4460
4461 zfree(zsl->header->forward);
4462 zfree(zsl->header);
4463 while(node) {
4464 next = node->forward[0];
4465 zslFreeNode(node);
4466 node = next;
4467 }
4468 zfree(zsl);
4469 }
4470
4471 static int zslRandomLevel(void) {
4472 int level = 1;
4473 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4474 level += 1;
4475 return level;
4476 }
4477
/* Insert a new node holding (score,obj) into the skiplist. Nodes are kept
 * ordered by score, and nodes with equal scores by compareStringObjects()
 * on their objects. The caller must make sure (score,obj) is not already
 * present. zslInsert() stores the obj pointer without touching its
 * reference count; callers in this file incrRefCount() it afterwards. */
static void zslInsert(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i, level;

    /* For every level, find the last node that sorts before (score,obj).
     * update[i] is the node whose forward[i] must point to the new node. */
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->forward[i] &&
            (x->forward[i]->score < score ||
                (x->forward[i]->score == score &&
                compareStringObjects(x->forward[i]->obj,obj) < 0)))
            x = x->forward[i];
        update[i] = x;
    }
    /* we assume the key is not already inside, since we allow duplicated
     * scores, and the re-insertion of score and redis object should never
     * happen since the caller of zslInsert() should test in the hash table
     * if the element is already inside or not. */
    level = zslRandomLevel();
    /* A node taller than the current list has the header as predecessor on
     * the new levels. NOTE(review): update[] has ZSKIPLIST_MAXLEVEL slots,
     * so zslRandomLevel() must never return more than that. */
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++)
            update[i] = zsl->header;
        zsl->level = level;
    }
    x = zslCreateNode(level,score,obj);
    /* Splice the node in on each of its levels */
    for (i = 0; i < level; i++) {
        x->forward[i] = update[i]->forward[i];
        update[i]->forward[i] = x;
    }
    /* Level-0 back pointers: the first node gets NULL rather than the
     * header; a node with no successor becomes the new tail. */
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->forward[0])
        x->forward[0]->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
}
4513
4514 /* Delete an element with matching score/object from the skiplist. */
4515 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4516 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4517 int i;
4518
4519 x = zsl->header;
4520 for (i = zsl->level-1; i >= 0; i--) {
4521 while (x->forward[i] &&
4522 (x->forward[i]->score < score ||
4523 (x->forward[i]->score == score &&
4524 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4525 x = x->forward[i];
4526 update[i] = x;
4527 }
4528 /* We may have multiple elements with the same score, what we need
4529 * is to find the element with both the right score and object. */
4530 x = x->forward[0];
4531 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4532 for (i = 0; i < zsl->level; i++) {
4533 if (update[i]->forward[i] != x) break;
4534 update[i]->forward[i] = x->forward[i];
4535 }
4536 if (x->forward[0]) {
4537 x->forward[0]->backward = (x->backward == zsl->header) ?
4538 NULL : x->backward;
4539 } else {
4540 zsl->tail = x->backward;
4541 }
4542 zslFreeNode(x);
4543 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4544 zsl->level--;
4545 zsl->length--;
4546 return 1;
4547 } else {
4548 return 0; /* not found */
4549 }
4550 return 0; /* not found */
4551 }
4552
/* Delete all the elements with score between min and max from the skiplist.
 * Min and max are inclusive, so an element with score >= min && score <= max
 * is deleted.
 * Note that this function takes the reference to the hash table view of the
 * sorted set, in order to remove the elements from the hash table too.
 * Returns the number of elements removed. */
static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned long removed = 0;
    int i;

    /* For every level, find the last node with score < min */
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->forward[i] && x->forward[i]->score < min)
            x = x->forward[i];
        update[i] = x;
    }
    /* x is now the first node with score >= min (or NULL). The update[]
     * predecessors stay valid while consecutive nodes are removed, since
     * every removed node comes after them. */
    x = x->forward[0];
    while (x && x->score <= max) {
        zskiplistNode *next;

        for (i = 0; i < zsl->level; i++) {
            if (update[i]->forward[i] != x) break;
            update[i]->forward[i] = x->forward[i];
        }
        if (x->forward[0]) {
            x->forward[0]->backward = (x->backward == zsl->header) ?
                NULL : x->backward;
        } else {
            zsl->tail = x->backward;
        }
        next = x->forward[0];
        /* Remove from the hash table view first, then free the node */
        dictDelete(dict,x->obj);
        zslFreeNode(x);
        while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
            zsl->level--;
        zsl->length--;
        removed++;
        x = next;
    }
    return removed; /* number of deleted elements */
}
4595
4596 /* Find the first node having a score equal or greater than the specified one.
4597 * Returns NULL if there is no match. */
4598 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4599 zskiplistNode *x;
4600 int i;
4601
4602 x = zsl->header;
4603 for (i = zsl->level-1; i >= 0; i--) {
4604 while (x->forward[i] && x->forward[i]->score < score)
4605 x = x->forward[i];
4606 }
4607 /* We may have multiple elements with the same score, what we need
4608 * is to find the element with both the right score and object. */
4609 return x->forward[0];
4610 }
4611
4612 /* The actual Z-commands implementations */
4613
4614 /* This generic command implements both ZADD and ZINCRBY.
4615 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4616 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4617 static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
4618 robj *zsetobj;
4619 zset *zs;
4620 double *score;
4621
4622 zsetobj = lookupKeyWrite(c->db,key);
4623 if (zsetobj == NULL) {
4624 zsetobj = createZsetObject();
4625 dictAdd(c->db->dict,key,zsetobj);
4626 incrRefCount(key);
4627 } else {
4628 if (zsetobj->type != REDIS_ZSET) {
4629 addReply(c,shared.wrongtypeerr);
4630 return;
4631 }
4632 }
4633 zs = zsetobj->ptr;
4634
4635 /* Ok now since we implement both ZADD and ZINCRBY here the code
4636 * needs to handle the two different conditions. It's all about setting
4637 * '*score', that is, the new score to set, to the right value. */
4638 score = zmalloc(sizeof(double));
4639 if (doincrement) {
4640 dictEntry *de;
4641
4642 /* Read the old score. If the element was not present starts from 0 */
4643 de = dictFind(zs->dict,ele);
4644 if (de) {
4645 double *oldscore = dictGetEntryVal(de);
4646 *score = *oldscore + scoreval;
4647 } else {
4648 *score = scoreval;
4649 }
4650 } else {
4651 *score = scoreval;
4652 }
4653
4654 /* What follows is a simple remove and re-insert operation that is common
4655 * to both ZADD and ZINCRBY... */
4656 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
4657 /* case 1: New element */
4658 incrRefCount(ele); /* added to hash */
4659 zslInsert(zs->zsl,*score,ele);
4660 incrRefCount(ele); /* added to skiplist */
4661 server.dirty++;
4662 if (doincrement)
4663 addReplyDouble(c,*score);
4664 else
4665 addReply(c,shared.cone);
4666 } else {
4667 dictEntry *de;
4668 double *oldscore;
4669
4670 /* case 2: Score update operation */
4671 de = dictFind(zs->dict,ele);
4672 redisAssert(de != NULL);
4673 oldscore = dictGetEntryVal(de);
4674 if (*score != *oldscore) {
4675 int deleted;
4676
4677 /* Remove and insert the element in the skip list with new score */
4678 deleted = zslDelete(zs->zsl,*oldscore,ele);
4679 redisAssert(deleted != 0);
4680 zslInsert(zs->zsl,*score,ele);
4681 incrRefCount(ele);
4682 /* Update the score in the hash table */
4683 dictReplace(zs->dict,ele,score);
4684 server.dirty++;
4685 } else {
4686 zfree(score);
4687 }
4688 if (doincrement)
4689 addReplyDouble(c,*score);
4690 else
4691 addReply(c,shared.czero);
4692 }
4693 }
4694
4695 static void zaddCommand(redisClient *c) {
4696 double scoreval;
4697
4698 scoreval = strtod(c->argv[2]->ptr,NULL);
4699 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4700 }
4701
4702 static void zincrbyCommand(redisClient *c) {
4703 double scoreval;
4704
4705 scoreval = strtod(c->argv[2]->ptr,NULL);
4706 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4707 }
4708
4709 static void zremCommand(redisClient *c) {
4710 robj *zsetobj;
4711 zset *zs;
4712
4713 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4714 if (zsetobj == NULL) {
4715 addReply(c,shared.czero);
4716 } else {
4717 dictEntry *de;
4718 double *oldscore;
4719 int deleted;
4720
4721 if (zsetobj->type != REDIS_ZSET) {
4722 addReply(c,shared.wrongtypeerr);
4723 return;
4724 }
4725 zs = zsetobj->ptr;
4726 de = dictFind(zs->dict,c->argv[2]);
4727 if (de == NULL) {
4728 addReply(c,shared.czero);
4729 return;
4730 }
4731 /* Delete from the skiplist */
4732 oldscore = dictGetEntryVal(de);
4733 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4734 redisAssert(deleted != 0);
4735
4736 /* Delete from the hash table */
4737 dictDelete(zs->dict,c->argv[2]);
4738 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4739 server.dirty++;
4740 addReply(c,shared.cone);
4741 }
4742 }
4743
4744 static void zremrangebyscoreCommand(redisClient *c) {
4745 double min = strtod(c->argv[2]->ptr,NULL);
4746 double max = strtod(c->argv[3]->ptr,NULL);
4747 robj *zsetobj;
4748 zset *zs;
4749
4750 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4751 if (zsetobj == NULL) {
4752 addReply(c,shared.czero);
4753 } else {
4754 long deleted;
4755
4756 if (zsetobj->type != REDIS_ZSET) {
4757 addReply(c,shared.wrongtypeerr);
4758 return;
4759 }
4760 zs = zsetobj->ptr;
4761 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4762 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4763 server.dirty += deleted;
4764 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4765 }
4766 }
4767
/* Shared implementation of ZRANGE (reverse == 0) and ZREVRANGE (reverse == 1).
 * Syntax: Z[REV]RANGE key start end [WITHSCORES]
 * start/end are inclusive ranks; negative values count from the end. With
 * WITHSCORES every member is followed by its score, so the multi-bulk length
 * doubles. A missing key replies with a null multi-bulk, an empty range with
 * an empty multi-bulk, any other extra argument with a syntax error. */
static void zrangeGenericCommand(redisClient *c, int reverse) {
    robj *o;
    int start = atoi(c->argv[2]->ptr);
    int end = atoi(c->argv[3]->ptr);
    int withscores = 0;

    if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
        withscores = 1;
    } else if (c->argc >= 5) {
        addReply(c,shared.syntaxerr);
        return;
    }

    o = lookupKeyRead(c->db,c->argv[1]);
    if (o == NULL) {
        addReply(c,shared.nullmultibulk);
    } else {
        if (o->type != REDIS_ZSET) {
            addReply(c,shared.wrongtypeerr);
        } else {
            zset *zsetobj = o->ptr;
            zskiplist *zsl = zsetobj->zsl;
            zskiplistNode *ln;

            int llen = zsl->length;
            int rangelen, j;
            robj *ele;

            /* convert negative indexes */
            if (start < 0) start = llen+start;
            if (end < 0) end = llen+end;
            if (start < 0) start = 0;
            if (end < 0) end = 0;

            /* indexes sanity checks */
            if (start > end || start >= llen) {
                /* Out of range start or start > end result in empty list */
                addReply(c,shared.emptymultibulk);
                return;
            }
            if (end >= llen) end = llen-1;
            rangelen = (end-start)+1;

            /* Return the result in form of a multi-bulk reply.
             * First position ln on the start-th node with a linear walk:
             * backwards from the tail for ZREVRANGE, forward from the first
             * node for ZRANGE. */
            if (reverse) {
                ln = zsl->tail;
                while (start--)
                    ln = ln->backward;
            } else {
                ln = zsl->header->forward[0];
                while (start--)
                    ln = ln->forward[0];
            }

            addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",
                withscores ? (rangelen*2) : rangelen));
            for (j = 0; j < rangelen; j++) {
                ele = ln->obj;
                addReplyBulkLen(c,ele);
                addReply(c,ele);
                addReply(c,shared.crlf);
                if (withscores)
                    addReplyDouble(c,ln->score);
                ln = reverse ? ln->backward : ln->forward[0];
            }
        }
    }
}
4836
4837 static void zrangeCommand(redisClient *c) {
4838 zrangeGenericCommand(c,0);
4839 }
4840
4841 static void zrevrangeCommand(redisClient *c) {
4842 zrangeGenericCommand(c,1);
4843 }
4844
4845 static void zrangebyscoreCommand(redisClient *c) {
4846 robj *o;
4847 double min = strtod(c->argv[2]->ptr,NULL);
4848 double max = strtod(c->argv[3]->ptr,NULL);
4849 int offset = 0, limit = -1;
4850
4851 if (c->argc != 4 && c->argc != 7) {
4852 addReplySds(c,
4853 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4854 return;
4855 } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
4856 addReply(c,shared.syntaxerr);
4857 return;
4858 } else if (c->argc == 7) {
4859 offset = atoi(c->argv[5]->ptr);
4860 limit = atoi(c->argv[6]->ptr);
4861 if (offset < 0) offset = 0;
4862 }
4863
4864 o = lookupKeyRead(c->db,c->argv[1]);
4865 if (o == NULL) {
4866 addReply(c,shared.nullmultibulk);
4867 } else {
4868 if (o->type != REDIS_ZSET) {
4869 addReply(c,shared.wrongtypeerr);
4870 } else {
4871 zset *zsetobj = o->ptr;
4872 zskiplist *zsl = zsetobj->zsl;
4873 zskiplistNode *ln;
4874 robj *ele, *lenobj;
4875 unsigned int rangelen = 0;
4876
4877 /* Get the first node with the score >= min */
4878 ln = zslFirstWithScore(zsl,min);
4879 if (ln == NULL) {
4880 /* No element matching the speciifed interval */
4881 addReply(c,shared.emptymultibulk);
4882 return;
4883 }
4884
4885 /* We don't know in advance how many matching elements there
4886 * are in the list, so we push this object that will represent
4887 * the multi-bulk length in the output buffer, and will "fix"
4888 * it later */
4889 lenobj = createObject(REDIS_STRING,NULL);
4890 addReply(c,lenobj);
4891 decrRefCount(lenobj);
4892
4893 while(ln && ln->score <= max) {
4894 if (offset) {
4895 offset--;
4896 ln = ln->forward[0];
4897 continue;
4898 }
4899 if (limit == 0) break;
4900 ele = ln->obj;
4901 addReplyBulkLen(c,ele);
4902 addReply(c,ele);
4903 addReply(c,shared.crlf);
4904 ln = ln->forward[0];
4905 rangelen++;
4906 if (limit > 0) limit--;
4907 }
4908 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4909 }
4910 }
4911 }
4912
4913 static void zcardCommand(redisClient *c) {
4914 robj *o;
4915 zset *zs;
4916
4917 o = lookupKeyRead(c->db,c->argv[1]);
4918 if (o == NULL) {
4919 addReply(c,shared.czero);
4920 return;
4921 } else {
4922 if (o->type != REDIS_ZSET) {
4923 addReply(c,shared.wrongtypeerr);
4924 } else {
4925 zs = o->ptr;
4926 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length));
4927 }
4928 }
4929 }
4930
4931 static void zscoreCommand(redisClient *c) {
4932 robj *o;
4933 zset *zs;
4934
4935 o = lookupKeyRead(c->db,c->argv[1]);
4936 if (o == NULL) {
4937 addReply(c,shared.nullbulk);
4938 return;
4939 } else {
4940 if (o->type != REDIS_ZSET) {
4941 addReply(c,shared.wrongtypeerr);
4942 } else {
4943 dictEntry *de;
4944
4945 zs = o->ptr;
4946 de = dictFind(zs->dict,c->argv[2]);
4947 if (!de) {
4948 addReply(c,shared.nullbulk);
4949 } else {
4950 double *score = dictGetEntryVal(de);
4951
4952 addReplyDouble(c,*score);
4953 }
4954 }
4955 }
4956 }
4957
4958 /* ========================= Non type-specific commands ==================== */
4959
4960 static void flushdbCommand(redisClient *c) {
4961 server.dirty += dictSize(c->db->dict);
4962 dictEmpty(c->db->dict);
4963 dictEmpty(c->db->expires);
4964 addReply(c,shared.ok);
4965 }
4966
/* FLUSHALL: empty every database via emptyDb(), reply +OK, then save the
 * (now empty) dataset synchronously with rdbSave().
 * The value returned by emptyDb() is added to server.dirty (presumably the
 * number of keys removed — confirm in emptyDb()).
 * NOTE(review): server.dirty++ runs after rdbSave(); presumably rdbSave()
 * resets the dirty counter on success and this keeps the flush counted as a
 * pending change — confirm against rdbSave(). Its return value is ignored. */
static void flushallCommand(redisClient *c) {
    server.dirty += emptyDb();
    addReply(c,shared.ok);
    rdbSave(server.dbfilename);
    server.dirty++;
}
4973
4974 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4975 redisSortOperation *so = zmalloc(sizeof(*so));
4976 so->type = type;
4977 so->pattern = pattern;
4978 return so;
4979 }
4980
4981 /* Return the value associated to the key with a name obtained
4982 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4983 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4984 char *p;
4985 sds spat, ssub;
4986 robj keyobj;
4987 int prefixlen, sublen, postfixlen;
4988 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4989 struct {
4990 long len;
4991 long free;
4992 char buf[REDIS_SORTKEY_MAX+1];
4993 } keyname;
4994
4995 /* If the pattern is "#" return the substitution object itself in order
4996 * to implement the "SORT ... GET #" feature. */
4997 spat = pattern->ptr;
4998 if (spat[0] == '#' && spat[1] == '\0') {
4999 return subst;
5000 }
5001
5002 /* The substitution object may be specially encoded. If so we create
5003 * a decoded object on the fly. Otherwise getDecodedObject will just
5004 * increment the ref count, that we'll decrement later. */
5005 subst = getDecodedObject(subst);
5006
5007 ssub = subst->ptr;
5008 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
5009 p = strchr(spat,'*');
5010 if (!p) {
5011 decrRefCount(subst);
5012 return NULL;
5013 }
5014
5015 prefixlen = p-spat;
5016 sublen = sdslen(ssub);
5017 postfixlen = sdslen(spat)-(prefixlen+1);
5018 memcpy(keyname.buf,spat,prefixlen);
5019 memcpy(keyname.buf+prefixlen,ssub,sublen);
5020 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
5021 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
5022 keyname.len = prefixlen+sublen+postfixlen;
5023
5024 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2))
5025 decrRefCount(subst);
5026
5027 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
5028 return lookupKeyRead(db,&keyobj);
5029 }
5030
5031 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
5032 * the additional parameter is not standard but a BSD-specific we have to
5033 * pass sorting parameters via the global 'server' structure */
5034 static int sortCompare(const void *s1, const void *s2) {
5035 const redisSortObject *so1 = s1, *so2 = s2;
5036 int cmp;
5037
5038 if (!server.sort_alpha) {
5039 /* Numeric sorting. Here it's trivial as we precomputed scores */
5040 if (so1->u.score > so2->u.score) {
5041 cmp = 1;
5042 } else if (so1->u.score < so2->u.score) {
5043 cmp = -1;
5044 } else {
5045 cmp = 0;
5046 }
5047 } else {
5048 /* Alphanumeric sorting */
5049 if (server.sort_bypattern) {
5050 if (!so1->u.cmpobj || !so2->u.cmpobj) {
5051 /* At least one compare object is NULL */
5052 if (so1->u.cmpobj == so2->u.cmpobj)
5053 cmp = 0;
5054 else if (so1->u.cmpobj == NULL)
5055 cmp = -1;
5056 else
5057 cmp = 1;
5058 } else {
5059 /* We have both the objects, use strcoll */
5060 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
5061 }
5062 } else {
5063 /* Compare elements directly */
5064 robj *dec1, *dec2;
5065
5066 dec1 = getDecodedObject(so1->obj);
5067 dec2 = getDecodedObject(so2->obj);
5068 cmp = strcoll(dec1->ptr,dec2->ptr);
5069 decrRefCount(dec1);
5070 decrRefCount(dec2);
5071 }
5072 }
5073 return server.sort_desc ? -cmp : cmp;
5074 }
5075
5076 /* The SORT command is the most complex command in Redis. Warning: this code
5077 * is optimized for speed and a bit less for readability */
5078 static void sortCommand(redisClient *c) {
5079 list *operations;
5080 int outputlen = 0;
5081 int desc = 0, alpha = 0;
5082 int limit_start = 0, limit_count = -1, start, end;
5083 int j, dontsort = 0, vectorlen;
5084 int getop = 0; /* GET operation counter */
5085 robj *sortval, *sortby = NULL, *storekey = NULL;
5086 redisSortObject *vector; /* Resulting vector to sort */
5087
5088 /* Lookup the key to sort. It must be of the right types */
5089 sortval = lookupKeyRead(c->db,c->argv[1]);
5090 if (sortval == NULL) {
5091 addReply(c,shared.nullmultibulk);
5092 return;
5093 }
5094 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
5095 sortval->type != REDIS_ZSET)
5096 {
5097 addReply(c,shared.wrongtypeerr);
5098 return;
5099 }
5100
5101 /* Create a list of operations to perform for every sorted element.
5102 * Operations can be GET/DEL/INCR/DECR */
5103 operations = listCreate();
5104 listSetFreeMethod(operations,zfree);
5105 j = 2;
5106
5107 /* Now we need to protect sortval incrementing its count, in the future
5108 * SORT may have options able to overwrite/delete keys during the sorting
5109 * and the sorted key itself may get destroied */
5110 incrRefCount(sortval);
5111
5112 /* The SORT command has an SQL-alike syntax, parse it */
5113 while(j < c->argc) {
5114 int leftargs = c->argc-j-1;
5115 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
5116 desc = 0;
5117 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
5118 desc = 1;
5119 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
5120 alpha = 1;
5121 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
5122 limit_start = atoi(c->argv[j+1]->ptr);
5123 limit_count = atoi(c->argv[j+2]->ptr);
5124 j+=2;
5125 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
5126 storekey = c->argv[j+1];
5127 j++;
5128 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
5129 sortby = c->argv[j+1];
5130 /* If the BY pattern does not contain '*', i.e. it is constant,
5131 * we don't need to sort nor to lookup the weight keys. */
5132 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
5133 j++;
5134 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
5135 listAddNodeTail(operations,createSortOperation(
5136 REDIS_SORT_GET,c->argv[j+1]));
5137 getop++;
5138 j++;
5139 } else {
5140 decrRefCount(sortval);
5141 listRelease(operations);
5142 addReply(c,shared.syntaxerr);
5143 return;
5144 }
5145 j++;
5146 }
5147
5148 /* Load the sorting vector with all the objects to sort */
5149 switch(sortval->type) {
5150 case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
5151 case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break;
5152 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
5153 default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */
5154 }
5155 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
5156 j = 0;
5157
5158 if (sortval->type == REDIS_LIST) {
5159 list *list = sortval->ptr;
5160 listNode *ln;
5161
5162 listRewind(list);
5163 while((ln = listYield(list))) {
5164 robj *ele = ln->value;
5165 vector[j].obj = ele;
5166 vector[j].u.score = 0;
5167 vector[j].u.cmpobj = NULL;
5168 j++;
5169 }
5170 } else {
5171 dict *set;
5172 dictIterator *di;
5173 dictEntry *setele;
5174
5175 if (sortval->type == REDIS_SET) {
5176 set = sortval->ptr;
5177 } else {
5178 zset *zs = sortval->ptr;
5179 set = zs->dict;
5180 }
5181
5182 di = dictGetIterator(set);
5183 while((setele = dictNext(di)) != NULL) {
5184 vector[j].obj = dictGetEntryKey(setele);
5185 vector[j].u.score = 0;
5186 vector[j].u.cmpobj = NULL;
5187 j++;
5188 }
5189 dictReleaseIterator(di);
5190 }
5191 redisAssert(j == vectorlen);
5192
5193 /* Now it's time to load the right scores in the sorting vector */
5194 if (dontsort == 0) {
5195 for (j = 0; j < vectorlen; j++) {
5196 if (sortby) {
5197 robj *byval;
5198
5199 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
5200 if (!byval || byval->type != REDIS_STRING) continue;
5201 if (alpha) {
5202 vector[j].u.cmpobj = getDecodedObject(byval);
5203 } else {
5204 if (byval->encoding == REDIS_ENCODING_RAW) {
5205 vector[j].u.score = strtod(byval->ptr,NULL);
5206 } else {
5207 /* Don't need to decode the object if it's
5208 * integer-encoded (the only encoding supported) so
5209 * far. We can just cast it */
5210 if (byval->encoding == REDIS_ENCODING_INT) {
5211 vector[j].u.score = (long)byval->ptr;
5212 } else
5213 redisAssert(1 != 1);
5214 }
5215 }
5216 } else {
5217 if (!alpha) {
5218 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
5219 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
5220 else {
5221 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
5222 vector[j].u.score = (long) vector[j].obj->ptr;
5223 else
5224 redisAssert(1 != 1);
5225 }
5226 }
5227 }
5228 }
5229 }
5230
5231 /* We are ready to sort the vector... perform a bit of sanity check
5232 * on the LIMIT option too. We'll use a partial version of quicksort. */
5233 start = (limit_start < 0) ? 0 : limit_start;
5234 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
5235 if (start >= vectorlen) {
5236 start = vectorlen-1;
5237 end = vectorlen-2;
5238 }
5239 if (end >= vectorlen) end = vectorlen-1;
5240
5241 if (dontsort == 0) {
5242 server.sort_desc = desc;
5243 server.sort_alpha = alpha;
5244 server.sort_bypattern = sortby ? 1 : 0;
5245 if (sortby && (start != 0 || end != vectorlen-1))
5246 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
5247 else
5248 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
5249 }
5250
5251 /* Send command output to the output buffer, performing the specified
5252 * GET/DEL/INCR/DECR operations if any. */
5253 outputlen = getop ? getop*(end-start+1) : end-start+1;
5254 if (storekey == NULL) {
5255 /* STORE option not specified, sent the sorting result to client */
5256 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
5257 for (j = start; j <= end; j++) {
5258 listNode *ln;
5259 if (!getop) {
5260 addReplyBulkLen(c,vector[j].obj);
5261 addReply(c,vector[j].obj);
5262 addReply(c,shared.crlf);
5263 }
5264 listRewind(operations);
5265 while((ln = listYield(operations))) {
5266 redisSortOperation *sop = ln->value;
5267 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5268 vector[j].obj);
5269
5270 if (sop->type == REDIS_SORT_GET) {
5271 if (!val || val->type != REDIS_STRING) {
5272 addReply(c,shared.nullbulk);
5273 } else {
5274 addReplyBulkLen(c,val);
5275 addReply(c,val);
5276 addReply(c,shared.crlf);
5277 }
5278 } else {
5279 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5280 }
5281 }
5282 }
5283 } else {
5284 robj *listObject = createListObject();
5285 list *listPtr = (list*) listObject->ptr;
5286
5287 /* STORE option specified, set the sorting result as a List object */
5288 for (j = start; j <= end; j++) {
5289 listNode *ln;
5290 if (!getop) {
5291 listAddNodeTail(listPtr,vector[j].obj);
5292 incrRefCount(vector[j].obj);
5293 }
5294 listRewind(operations);
5295 while((ln = listYield(operations))) {
5296 redisSortOperation *sop = ln->value;
5297 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5298 vector[j].obj);
5299
5300 if (sop->type == REDIS_SORT_GET) {
5301 if (!val || val->type != REDIS_STRING) {
5302 listAddNodeTail(listPtr,createStringObject("",0));
5303 } else {
5304 listAddNodeTail(listPtr,val);
5305 incrRefCount(val);
5306 }
5307 } else {
5308 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5309 }
5310 }
5311 }
5312 if (dictReplace(c->db->dict,storekey,listObject)) {
5313 incrRefCount(storekey);
5314 }
5315 /* Note: we add 1 because the DB is dirty anyway since even if the
5316 * SORT result is empty a new key is set and maybe the old content
5317 * replaced. */
5318 server.dirty += 1+outputlen;
5319 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
5320 }
5321
5322 /* Cleanup */
5323 decrRefCount(sortval);
5324 listRelease(operations);
5325 for (j = 0; j < vectorlen; j++) {
5326 if (sortby && alpha && vector[j].u.cmpobj)
5327 decrRefCount(vector[j].u.cmpobj);
5328 }
5329 zfree(vector);
5330 }
5331
5332 /* Create the string returned by the INFO command. This is decoupled
5333 * by the INFO command itself as we need to report the same information
5334 * on memory corruption problems. */
5335 static sds genRedisInfoString(void) {
5336 sds info;
5337 time_t uptime = time(NULL)-server.stat_starttime;
5338 int j;
5339
5340 info = sdscatprintf(sdsempty(),
5341 "redis_version:%s\r\n"
5342 "arch_bits:%s\r\n"
5343 "multiplexing_api:%s\r\n"
5344 "uptime_in_seconds:%ld\r\n"
5345 "uptime_in_days:%ld\r\n"
5346 "connected_clients:%d\r\n"
5347 "connected_slaves:%d\r\n"
5348 "blocked_clients:%d\r\n"
5349 "used_memory:%zu\r\n"
5350 "changes_since_last_save:%lld\r\n"
5351 "bgsave_in_progress:%d\r\n"
5352 "last_save_time:%ld\r\n"
5353 "bgrewriteaof_in_progress:%d\r\n"
5354 "total_connections_received:%lld\r\n"
5355 "total_commands_processed:%lld\r\n"
5356 "role:%s\r\n"
5357 ,REDIS_VERSION,
5358 (sizeof(long) == 8) ? "64" : "32",
5359 aeGetApiName(),
5360 uptime,
5361 uptime/(3600*24),
5362 listLength(server.clients)-listLength(server.slaves),
5363 listLength(server.slaves),
5364 server.blockedclients,
5365 server.usedmemory,
5366 server.dirty,
5367 server.bgsavechildpid != -1,
5368 server.lastsave,
5369 server.bgrewritechildpid != -1,
5370 server.stat_numconnections,
5371 server.stat_numcommands,
5372 server.masterhost == NULL ? "master" : "slave"
5373 );
5374 if (server.masterhost) {
5375 info = sdscatprintf(info,
5376 "master_host:%s\r\n"
5377 "master_port:%d\r\n"
5378 "master_link_status:%s\r\n"
5379 "master_last_io_seconds_ago:%d\r\n"
5380 ,server.masterhost,
5381 server.masterport,
5382 (server.replstate == REDIS_REPL_CONNECTED) ?
5383 "up" : "down",
5384 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
5385 );
5386 }
5387 for (j = 0; j < server.dbnum; j++) {
5388 long long keys, vkeys;
5389
5390 keys = dictSize(server.db[j].dict);
5391 vkeys = dictSize(server.db[j].expires);
5392 if (keys || vkeys) {
5393 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
5394 j, keys, vkeys);
5395 }
5396 }
5397 return info;
5398 }
5399
5400 static void infoCommand(redisClient *c) {
5401 sds info = genRedisInfoString();
5402 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
5403 (unsigned long)sdslen(info)));
5404 addReplySds(c,info);
5405 addReply(c,shared.crlf);
5406 }
5407
5408 static void monitorCommand(redisClient *c) {
5409 /* ignore MONITOR if aleady slave or in monitor mode */
5410 if (c->flags & REDIS_SLAVE) return;
5411
5412 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5413 c->slaveseldb = 0;
5414 listAddNodeTail(server.monitors,c);
5415 addReply(c,shared.ok);
5416 }
5417
5418 /* ================================= Expire ================================= */
5419 static int removeExpire(redisDb *db, robj *key) {
5420 if (dictDelete(db->expires,key) == DICT_OK) {
5421 return 1;
5422 } else {
5423 return 0;
5424 }
5425 }
5426
5427 static int setExpire(redisDb *db, robj *key, time_t when) {
5428 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5429 return 0;
5430 } else {
5431 incrRefCount(key);
5432 return 1;
5433 }
5434 }
5435
5436 /* Return the expire time of the specified key, or -1 if no expire
5437 * is associated with this key (i.e. the key is non volatile) */
5438 static time_t getExpire(redisDb *db, robj *key) {
5439 dictEntry *de;
5440
5441 /* No expire? return ASAP */
5442 if (dictSize(db->expires) == 0 ||
5443 (de = dictFind(db->expires,key)) == NULL) return -1;
5444
5445 return (time_t) dictGetEntryVal(de);
5446 }
5447
5448 static int expireIfNeeded(redisDb *db, robj *key) {
5449 time_t when;
5450 dictEntry *de;
5451
5452 /* No expire? return ASAP */
5453 if (dictSize(db->expires) == 0 ||
5454 (de = dictFind(db->expires,key)) == NULL) return 0;
5455
5456 /* Lookup the expire */
5457 when = (time_t) dictGetEntryVal(de);
5458 if (time(NULL) <= when) return 0;
5459
5460 /* Delete the key */
5461 dictDelete(db->expires,key);
5462 return dictDelete(db->dict,key) == DICT_OK;
5463 }
5464
5465 static int deleteIfVolatile(redisDb *db, robj *key) {
5466 dictEntry *de;
5467
5468 /* No expire? return ASAP */
5469 if (dictSize(db->expires) == 0 ||
5470 (de = dictFind(db->expires,key)) == NULL) return 0;
5471
5472 /* Delete the key */
5473 server.dirty++;
5474 dictDelete(db->expires,key);
5475 return dictDelete(db->dict,key) == DICT_OK;
5476 }
5477
5478 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
5479 dictEntry *de;
5480
5481 de = dictFind(c->db->dict,key);
5482 if (de == NULL) {
5483 addReply(c,shared.czero);
5484 return;
5485 }
5486 if (seconds < 0) {
5487 if (deleteKey(c->db,key)) server.dirty++;
5488 addReply(c, shared.cone);
5489 return;
5490 } else {
5491 time_t when = time(NULL)+seconds;
5492 if (setExpire(c->db,key,when)) {
5493 addReply(c,shared.cone);
5494 server.dirty++;
5495 } else {
5496 addReply(c,shared.czero);
5497 }
5498 return;
5499 }
5500 }
5501
5502 static void expireCommand(redisClient *c) {
5503 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5504 }
5505
5506 static void expireatCommand(redisClient *c) {
5507 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5508 }
5509
5510 static void ttlCommand(redisClient *c) {
5511 time_t expire;
5512 int ttl = -1;
5513
5514 expire = getExpire(c->db,c->argv[1]);
5515 if (expire != -1) {
5516 ttl = (int) (expire-time(NULL));
5517 if (ttl < 0) ttl = -1;
5518 }
5519 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5520 }
5521
5522 /* ================================ MULTI/EXEC ============================== */
5523
5524 /* Client state initialization for MULTI/EXEC */
5525 static void initClientMultiState(redisClient *c) {
5526 c->mstate.commands = NULL;
5527 c->mstate.count = 0;
5528 }
5529
5530 /* Release all the resources associated with MULTI/EXEC state */
5531 static void freeClientMultiState(redisClient *c) {
5532 int j;
5533
5534 for (j = 0; j < c->mstate.count; j++) {
5535 int i;
5536 multiCmd *mc = c->mstate.commands+j;
5537
5538 for (i = 0; i < mc->argc; i++)
5539 decrRefCount(mc->argv[i]);
5540 zfree(mc->argv);
5541 }
5542 zfree(c->mstate.commands);
5543 }
5544
5545 /* Add a new command into the MULTI commands queue */
5546 static void queueMultiCommand(redisClient *c, struct redisCommand *cmd) {
5547 multiCmd *mc;
5548 int j;
5549
5550 c->mstate.commands = zrealloc(c->mstate.commands,
5551 sizeof(multiCmd)*(c->mstate.count+1));
5552 mc = c->mstate.commands+c->mstate.count;
5553 mc->cmd = cmd;
5554 mc->argc = c->argc;
5555 mc->argv = zmalloc(sizeof(robj*)*c->argc);
5556 memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
5557 for (j = 0; j < c->argc; j++)
5558 incrRefCount(mc->argv[j]);
5559 c->mstate.count++;
5560 }
5561
5562 static void multiCommand(redisClient *c) {
5563 c->flags |= REDIS_MULTI;
5564 addReply(c,shared.ok);
5565 }
5566
5567 static void execCommand(redisClient *c) {
5568 int j;
5569 robj **orig_argv;
5570 int orig_argc;
5571
5572 if (!(c->flags & REDIS_MULTI)) {
5573 addReplySds(c,sdsnew("-ERR EXEC without MULTI\r\n"));
5574 return;
5575 }
5576
5577 orig_argv = c->argv;
5578 orig_argc = c->argc;
5579 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count));
5580 for (j = 0; j < c->mstate.count; j++) {
5581 c->argc = c->mstate.commands[j].argc;
5582 c->argv = c->mstate.commands[j].argv;
5583 call(c,c->mstate.commands[j].cmd);
5584 }
5585 c->argv = orig_argv;
5586 c->argc = orig_argc;
5587 freeClientMultiState(c);
5588 initClientMultiState(c);
5589 c->flags &= (~REDIS_MULTI);
5590 }
5591
5592 /* =========================== Blocking Operations ========================= */
5593
/* Currently Redis blocking operations support is limited to list POP ops,
 * so the current implementation is not fully generic, but it is also not
 * completely specific, so it will not require a rewrite to support new
 * kinds of blocking operations in the future.
 *
 * Still, it's important to note that list blocking operations can already
 * be used as a notification mechanism in order to implement other blocking
 * operations at the application level, so there must be very strong evidence
 * of usefulness and generality before new blocking operations are implemented.
 *
 * This is how the current blocking POP works, using BLPOP as an example:
 * - If the user calls BLPOP and the key exists and contains a non-empty list,
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   when there is no need to block.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty, we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests until the blocking request is served). We also put the client
 *   in a dictionary (db->blockingkeys) mapping keys to the list of clients
 *   blocking for those keys.
 * - If a PUSH operation is performed against a key with blocked clients
 *   waiting, we serve the first client in the list: instead of pushing
 *   the new element into the list we return it to the (first / oldest)
 *   blocking client, unblock the client, and remove it from the list.
 *
 * The above comment and the source code should be enough in order to understand
 * the implementation and modify / fix it later.
 */
5622
5623 /* Set a client in blocking mode for the specified key, with the specified
5624 * timeout */
5625 static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
5626 dictEntry *de;
5627 list *l;
5628 int j;
5629
5630 c->blockingkeys = zmalloc(sizeof(robj*)*numkeys);
5631 c->blockingkeysnum = numkeys;
5632 c->blockingto = timeout;
5633 for (j = 0; j < numkeys; j++) {
5634 /* Add the key in the client structure, to map clients -> keys */
5635 c->blockingkeys[j] = keys[j];
5636 incrRefCount(keys[j]);
5637
5638 /* And in the other "side", to map keys -> clients */
5639 de = dictFind(c->db->blockingkeys,keys[j]);
5640 if (de == NULL) {
5641 int retval;
5642
5643 /* For every key we take a list of clients blocked for it */
5644 l = listCreate();
5645 retval = dictAdd(c->db->blockingkeys,keys[j],l);
5646 incrRefCount(keys[j]);
5647 assert(retval == DICT_OK);
5648 } else {
5649 l = dictGetEntryVal(de);
5650 }
5651 listAddNodeTail(l,c);
5652 }
5653 /* Mark the client as a blocked client */
5654 c->flags |= REDIS_BLOCKED;
5655 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
5656 server.blockedclients++;
5657 }
5658
/* Unblock a client that's waiting in a blocking operation such as BLPOP.
 *
 * Undoes blockForKeys(): the client is removed from the waiting list of
 * every key it blocked on, and a waiting list left empty has its
 * db->blockingkeys entry deleted. The key references taken for
 * c->blockingkeys are released, the client is no longer flagged
 * REDIS_BLOCKED, server.blockedclients is decremented and read events are
 * re-enabled. Any command already sitting in the query buffer is then
 * processed. Requires c->blockingkeys != NULL (asserted). */
static void unblockClient(redisClient *c) {
    dictEntry *de;
    list *l;
    int j;

    assert(c->blockingkeys != NULL);
    /* The client may wait for multiple keys, so unblock it for every key. */
    for (j = 0; j < c->blockingkeysnum; j++) {
        /* Remove this client from the list of clients waiting for this key. */
        de = dictFind(c->db->blockingkeys,c->blockingkeys[j]);
        assert(de != NULL);
        l = dictGetEntryVal(de);
        listDelNode(l,listSearchKey(l,c));
        /* If the list is empty we need to remove it to avoid wasting memory.
         * NOTE(review): presumably the dict type's destructors release the
         * list and the key reference taken by blockForKeys() — confirm. */
        if (listLength(l) == 0)
            dictDelete(c->db->blockingkeys,c->blockingkeys[j]);
        /* Release the reference taken when the key was stored in
         * c->blockingkeys by blockForKeys(). */
        decrRefCount(c->blockingkeys[j]);
    }
    /* Cleanup the client structure */
    zfree(c->blockingkeys);
    c->blockingkeys = NULL;
    c->flags &= (~REDIS_BLOCKED);
    server.blockedclients--;
    /* Ok now we are ready to get read events from socket, note that we
     * can't trap errors here as it's possible that unblockClient() is
     * called from freeClient() itself, and the only thing we can do
     * if we failed to register the READABLE event is to kill the client.
     * Still the following function should never fail in the real world as
     * we are sure the file descriptor is sane, and we exit on out of mem. */
    aeCreateFileEvent(server.el, c->fd, AE_READABLE, readQueryFromClient, c);
    /* As a final step we want to process data if there is some command waiting
     * in the input buffer. Note that this is safe even if unblockClient()
     * gets called from freeClient() because freeClient() will be smart
     * enough to call this function *after* c->querybuf was set to NULL. */
    if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c);
}
5696
5697 /* This should be called from any function PUSHing into lists.
5698 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5699 * 'ele' is the element pushed.
5700 *
5701 * If the function returns 0 there was no client waiting for a list push
5702 * against this key.
5703 *
5704 * If the function returns 1 there was a client waiting for a list push
5705 * against this key, the element was passed to this client thus it's not
5706 * needed to actually add it to the list and the caller should return asap. */
5707 static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
5708 struct dictEntry *de;
5709 redisClient *receiver;
5710 list *l;
5711 listNode *ln;
5712
5713 de = dictFind(c->db->blockingkeys,key);
5714 if (de == NULL) return 0;
5715 l = dictGetEntryVal(de);
5716 ln = listFirst(l);
5717 assert(ln != NULL);
5718 receiver = ln->value;
5719
5720 addReplySds(receiver,sdsnew("*2\r\n"));
5721 addReplyBulkLen(receiver,key);
5722 addReply(receiver,key);
5723 addReply(receiver,shared.crlf);
5724 addReplyBulkLen(receiver,ele);
5725 addReply(receiver,ele);
5726 addReply(receiver,shared.crlf);
5727 unblockClient(receiver);
5728 return 1;
5729 }
5730
5731 /* Blocking RPOP/LPOP */
5732 static void blockingPopGenericCommand(redisClient *c, int where) {
5733 robj *o;
5734 time_t timeout;
5735 int j;
5736
5737 for (j = 1; j < c->argc-1; j++) {
5738 o = lookupKeyWrite(c->db,c->argv[j]);
5739 if (o != NULL) {
5740 if (o->type != REDIS_LIST) {
5741 addReply(c,shared.wrongtypeerr);
5742 return;
5743 } else {
5744 list *list = o->ptr;
5745 if (listLength(list) != 0) {
5746 /* If the list contains elements fall back to the usual
5747 * non-blocking POP operation */
5748 robj *argv[2], **orig_argv;
5749 int orig_argc;
5750
5751 /* We need to alter the command arguments before to call
5752 * popGenericCommand() as the command takes a single key. */
5753 orig_argv = c->argv;
5754 orig_argc = c->argc;
5755 argv[1] = c->argv[j];
5756 c->argv = argv;
5757 c->argc = 2;
5758
5759 /* Also the return value is different, we need to output
5760 * the multi bulk reply header and the key name. The
5761 * "real" command will add the last element (the value)
5762 * for us. If this souds like an hack to you it's just
5763 * because it is... */
5764 addReplySds(c,sdsnew("*2\r\n"));
5765 addReplyBulkLen(c,argv[1]);
5766 addReply(c,argv[1]);
5767 addReply(c,shared.crlf);
5768 popGenericCommand(c,where);
5769
5770 /* Fix the client structure with the original stuff */
5771 c->argv = orig_argv;
5772 c->argc = orig_argc;
5773 return;
5774 }
5775 }
5776 }
5777 }
5778 /* If the list is empty or the key does not exists we must block */
5779 timeout = strtol(c->argv[c->argc-1]->ptr,NULL,10);
5780 if (timeout > 0) timeout += time(NULL);
5781 blockForKeys(c,c->argv+1,c->argc-2,timeout);
5782 }
5783
5784 static void blpopCommand(redisClient *c) {
5785 blockingPopGenericCommand(c,REDIS_HEAD);
5786 }
5787
5788 static void brpopCommand(redisClient *c) {
5789 blockingPopGenericCommand(c,REDIS_TAIL);
5790 }
5791
5792 /* =============================== Replication ============================= */
5793
5794 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5795 ssize_t nwritten, ret = size;
5796 time_t start = time(NULL);
5797
5798 timeout++;
5799 while(size) {
5800 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5801 nwritten = write(fd,ptr,size);
5802 if (nwritten == -1) return -1;
5803 ptr += nwritten;
5804 size -= nwritten;
5805 }
5806 if ((time(NULL)-start) > timeout) {
5807 errno = ETIMEDOUT;
5808 return -1;
5809 }
5810 }
5811 return ret;
5812 }
5813
5814 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5815 ssize_t nread, totread = 0;
5816 time_t start = time(NULL);
5817
5818 timeout++;
5819 while(size) {
5820 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5821 nread = read(fd,ptr,size);
5822 if (nread == -1) return -1;
5823 ptr += nread;
5824 size -= nread;
5825 totread += nread;
5826 }
5827 if ((time(NULL)-start) > timeout) {
5828 errno = ETIMEDOUT;
5829 return -1;
5830 }
5831 }
5832 return totread;
5833 }
5834
/* Read one line terminated by '\n' from 'fd' into 'ptr', a buffer of 'size'
 * bytes. Bytes are read one at a time with syncRead(), and each read is
 * subject to 'timeout'. The '\n' and a preceding '\r' are stripped, and the
 * buffer is always NUL-terminated when size > 0. If the buffer fills up
 * before a newline arrives, the truncated line is returned.
 * Returns the number of bytes stored before the newline (a stripped '\r' is
 * still counted), 0 when size <= 0, or -1 on read error/timeout. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    /* Without room for at least the terminator there is nothing to do; the
     * 'size--' below would otherwise underflow and write out of bounds. */
    if (size <= 0) return 0;
    /* Terminate up front so that size == 1 (room for the terminator only)
     * still yields a valid empty string instead of an unterminated buffer. */
    *ptr = '\0';

    size--;
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
    }
    return nread;
}
5855
5856 static void syncCommand(redisClient *c) {
5857 /* ignore SYNC if aleady slave or in monitor mode */
5858 if (c->flags & REDIS_SLAVE) return;
5859
5860 /* SYNC can't be issued when the server has pending data to send to
5861 * the client about already issued commands. We need a fresh reply
5862 * buffer registering the differences between the BGSAVE and the current
5863 * dataset, so that we can copy to other slaves if needed. */
5864 if (listLength(c->reply) != 0) {
5865 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5866 return;
5867 }
5868
5869 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5870 /* Here we need to check if there is a background saving operation
5871 * in progress, or if it is required to start one */
5872 if (server.bgsavechildpid != -1) {
5873 /* Ok a background save is in progress. Let's check if it is a good
5874 * one for replication, i.e. if there is another slave that is
5875 * registering differences since the server forked to save */
5876 redisClient *slave;
5877 listNode *ln;
5878
5879 listRewind(server.slaves);
5880 while((ln = listYield(server.slaves))) {
5881 slave = ln->value;
5882 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5883 }
5884 if (ln) {
5885 /* Perfect, the server is already registering differences for
5886 * another slave. Set the right state, and copy the buffer. */
5887 listRelease(c->reply);
5888 c->reply = listDup(slave->reply);
5889 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5890 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5891 } else {
5892 /* No way, we need to wait for the next BGSAVE in order to
5893 * register differences */
5894 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5895 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5896 }
5897 } else {
5898 /* Ok we don't have a BGSAVE in progress, let's start one */
5899 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5900 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5901 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5902 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5903 return;
5904 }
5905 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5906 }
5907 c->repldbfd = -1;
5908 c->flags |= REDIS_SLAVE;
5909 c->slaveseldb = 0;
5910 listAddNodeTail(server.slaves,c);
5911 return;
5912 }
5913
5914 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5915 redisClient *slave = privdata;
5916 REDIS_NOTUSED(el);
5917 REDIS_NOTUSED(mask);
5918 char buf[REDIS_IOBUF_LEN];
5919 ssize_t nwritten, buflen;
5920
5921 if (slave->repldboff == 0) {
5922 /* Write the bulk write count before to transfer the DB. In theory here
5923 * we don't know how much room there is in the output buffer of the
5924 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5925 * operations) will never be smaller than the few bytes we need. */
5926 sds bulkcount;
5927
5928 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5929 slave->repldbsize);
5930 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5931 {
5932 sdsfree(bulkcount);
5933 freeClient(slave);
5934 return;
5935 }
5936 sdsfree(bulkcount);
5937 }
5938 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5939 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5940 if (buflen <= 0) {
5941 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5942 (buflen == 0) ? "premature EOF" : strerror(errno));
5943 freeClient(slave);
5944 return;
5945 }
5946 if ((nwritten = write(fd,buf,buflen)) == -1) {
5947 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5948 strerror(errno));
5949 freeClient(slave);
5950 return;
5951 }
5952 slave->repldboff += nwritten;
5953 if (slave->repldboff == slave->repldbsize) {
5954 close(slave->repldbfd);
5955 slave->repldbfd = -1;
5956 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5957 slave->replstate = REDIS_REPL_ONLINE;
5958 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5959 sendReplyToClient, slave) == AE_ERR) {
5960 freeClient(slave);
5961 return;
5962 }
5963 addReplySds(slave,sdsempty());
5964 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5965 }
5966 }
5967
5968 /* This function is called at the end of every backgrond saving.
5969 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5970 * otherwise REDIS_ERR is passed to the function.
5971 *
5972 * The goal of this function is to handle slaves waiting for a successful
5973 * background saving in order to perform non-blocking synchronization. */
5974 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5975 listNode *ln;
5976 int startbgsave = 0;
5977
5978 listRewind(server.slaves);
5979 while((ln = listYield(server.slaves))) {
5980 redisClient *slave = ln->value;
5981
5982 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5983 startbgsave = 1;
5984 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5985 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5986 struct redis_stat buf;
5987
5988 if (bgsaveerr != REDIS_OK) {
5989 freeClient(slave);
5990 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5991 continue;
5992 }
5993 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5994 redis_fstat(slave->repldbfd,&buf) == -1) {
5995 freeClient(slave);
5996 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5997 continue;
5998 }
5999 slave->repldboff = 0;
6000 slave->repldbsize = buf.st_size;
6001 slave->replstate = REDIS_REPL_SEND_BULK;
6002 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
6003 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
6004 freeClient(slave);
6005 continue;
6006 }
6007 }
6008 }
6009 if (startbgsave) {
6010 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
6011 listRewind(server.slaves);
6012 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
6013 while((ln = listYield(server.slaves))) {
6014 redisClient *slave = ln->value;
6015
6016 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
6017 freeClient(slave);
6018 }
6019 }
6020 }
6021 }
6022
6023 static int syncWithMaster(void) {
6024 char buf[1024], tmpfile[256], authcmd[1024];
6025 int dumpsize;
6026 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
6027 int dfd;
6028
6029 if (fd == -1) {
6030 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
6031 strerror(errno));
6032 return REDIS_ERR;
6033 }
6034
6035 /* AUTH with the master if required. */
6036 if(server.masterauth) {
6037 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
6038 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
6039 close(fd);
6040 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
6041 strerror(errno));
6042 return REDIS_ERR;
6043 }
6044 /* Read the AUTH result. */
6045 if (syncReadLine(fd,buf,1024,3600) == -1) {
6046 close(fd);
6047 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
6048 strerror(errno));
6049 return REDIS_ERR;
6050 }
6051 if (buf[0] != '+') {
6052 close(fd);
6053 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
6054 return REDIS_ERR;
6055 }
6056 }
6057
6058 /* Issue the SYNC command */
6059 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
6060 close(fd);
6061 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
6062 strerror(errno));
6063 return REDIS_ERR;
6064 }
6065 /* Read the bulk write count */
6066 if (syncReadLine(fd,buf,1024,3600) == -1) {
6067 close(fd);
6068 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
6069 strerror(errno));
6070 return REDIS_ERR;
6071 }
6072 if (buf[0] != '$') {
6073 close(fd);
6074 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
6075 return REDIS_ERR;
6076 }
6077 dumpsize = atoi(buf+1);
6078 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
6079 /* Read the bulk write data on a temp file */
6080 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
6081 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
6082 if (dfd == -1) {
6083 close(fd);
6084 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
6085 return REDIS_ERR;
6086 }
6087 while(dumpsize) {
6088 int nread, nwritten;
6089
6090 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
6091 if (nread == -1) {
6092 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
6093 strerror(errno));
6094 close(fd);
6095 close(dfd);
6096 return REDIS_ERR;
6097 }
6098 nwritten = write(dfd,buf,nread);
6099 if (nwritten == -1) {
6100 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
6101 close(fd);
6102 close(dfd);
6103 return REDIS_ERR;
6104 }
6105 dumpsize -= nread;
6106 }
6107 close(dfd);
6108 if (rename(tmpfile,server.dbfilename) == -1) {
6109 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
6110 unlink(tmpfile);
6111 close(fd);
6112 return REDIS_ERR;
6113 }
6114 emptyDb();
6115 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6116 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
6117 close(fd);
6118 return REDIS_ERR;
6119 }
6120 server.master = createClient(fd);
6121 server.master->flags |= REDIS_MASTER;
6122 server.master->authenticated = 1;
6123 server.replstate = REDIS_REPL_CONNECTED;
6124 return REDIS_OK;
6125 }
6126
6127 static void slaveofCommand(redisClient *c) {
6128 if (!strcasecmp(c->argv[1]->ptr,"no") &&
6129 !strcasecmp(c->argv[2]->ptr,"one")) {
6130 if (server.masterhost) {
6131 sdsfree(server.masterhost);
6132 server.masterhost = NULL;
6133 if (server.master) freeClient(server.master);
6134 server.replstate = REDIS_REPL_NONE;
6135 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
6136 }
6137 } else {
6138 sdsfree(server.masterhost);
6139 server.masterhost = sdsdup(c->argv[1]->ptr);
6140 server.masterport = atoi(c->argv[2]->ptr);
6141 if (server.master) freeClient(server.master);
6142 server.replstate = REDIS_REPL_CONNECT;
6143 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
6144 server.masterhost, server.masterport);
6145 }
6146 addReply(c,shared.ok);
6147 }
6148
6149 /* ============================ Maxmemory directive ======================== */
6150
6151 /* This function gets called when 'maxmemory' is set on the config file to limit
6152 * the max memory used by the server, and we are out of memory.
6153 * This function will try to, in order:
6154 *
6155 * - Free objects from the free list
6156 * - Try to remove keys with an EXPIRE set
6157 *
6158 * It is not possible to free enough memory to reach used-memory < maxmemory
6159 * the server will start refusing commands that will enlarge even more the
6160 * memory usage.
6161 */
6162 static void freeMemoryIfNeeded(void) {
6163 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
6164 if (listLength(server.objfreelist)) {
6165 robj *o;
6166
6167 listNode *head = listFirst(server.objfreelist);
6168 o = listNodeValue(head);
6169 listDelNode(server.objfreelist,head);
6170 zfree(o);
6171 } else {
6172 int j, k, freed = 0;
6173
6174 for (j = 0; j < server.dbnum; j++) {
6175 int minttl = -1;
6176 robj *minkey = NULL;
6177 struct dictEntry *de;
6178
6179 if (dictSize(server.db[j].expires)) {
6180 freed = 1;
6181 /* From a sample of three keys drop the one nearest to
6182 * the natural expire */
6183 for (k = 0; k < 3; k++) {
6184 time_t t;
6185
6186 de = dictGetRandomKey(server.db[j].expires);
6187 t = (time_t) dictGetEntryVal(de);
6188 if (minttl == -1 || t < minttl) {
6189 minkey = dictGetEntryKey(de);
6190 minttl = t;
6191 }
6192 }
6193 deleteKey(server.db+j,minkey);
6194 }
6195 }
6196 if (!freed) return; /* nothing to free... */
6197 }
6198 }
6199 }
6200
6201 /* ============================== Append Only file ========================== */
6202
6203 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
6204 sds buf = sdsempty();
6205 int j;
6206 ssize_t nwritten;
6207 time_t now;
6208 robj *tmpargv[3];
6209
6210 /* The DB this command was targetting is not the same as the last command
6211 * we appendend. To issue a SELECT command is needed. */
6212 if (dictid != server.appendseldb) {
6213 char seldb[64];
6214
6215 snprintf(seldb,sizeof(seldb),"%d",dictid);
6216 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
6217 (unsigned long)strlen(seldb),seldb);
6218 server.appendseldb = dictid;
6219 }
6220
6221 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6222 * EXPIREs into EXPIREATs calls */
6223 if (cmd->proc == expireCommand) {
6224 long when;
6225
6226 tmpargv[0] = createStringObject("EXPIREAT",8);
6227 tmpargv[1] = argv[1];
6228 incrRefCount(argv[1]);
6229 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
6230 tmpargv[2] = createObject(REDIS_STRING,
6231 sdscatprintf(sdsempty(),"%ld",when));
6232 argv = tmpargv;
6233 }
6234
6235 /* Append the actual command */
6236 buf = sdscatprintf(buf,"*%d\r\n",argc);
6237 for (j = 0; j < argc; j++) {
6238 robj *o = argv[j];
6239
6240 o = getDecodedObject(o);
6241 buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
6242 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
6243 buf = sdscatlen(buf,"\r\n",2);
6244 decrRefCount(o);
6245 }
6246
6247 /* Free the objects from the modified argv for EXPIREAT */
6248 if (cmd->proc == expireCommand) {
6249 for (j = 0; j < 3; j++)
6250 decrRefCount(argv[j]);
6251 }
6252
6253 /* We want to perform a single write. This should be guaranteed atomic
6254 * at least if the filesystem we are writing is a real physical one.
6255 * While this will save us against the server being killed I don't think
6256 * there is much to do about the whole server stopping for power problems
6257 * or alike */
6258 nwritten = write(server.appendfd,buf,sdslen(buf));
6259 if (nwritten != (signed)sdslen(buf)) {
6260 /* Ooops, we are in troubles. The best thing to do for now is
6261 * to simply exit instead to give the illusion that everything is
6262 * working as expected. */
6263 if (nwritten == -1) {
6264 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
6265 } else {
6266 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
6267 }
6268 exit(1);
6269 }
6270 /* If a background append only file rewriting is in progress we want to
6271 * accumulate the differences between the child DB and the current one
6272 * in a buffer, so that when the child process will do its work we
6273 * can append the differences to the new append only file. */
6274 if (server.bgrewritechildpid != -1)
6275 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
6276
6277 sdsfree(buf);
6278 now = time(NULL);
6279 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
6280 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
6281 now-server.lastfsync > 1))
6282 {
6283 fsync(server.appendfd); /* Let's try to get this data on the disk */
6284 server.lastfsync = now;
6285 }
6286 }
6287
6288 /* In Redis commands are always executed in the context of a client, so in
6289 * order to load the append only file we need to create a fake client. */
6290 static struct redisClient *createFakeClient(void) {
6291 struct redisClient *c = zmalloc(sizeof(*c));
6292
6293 selectDb(c,0);
6294 c->fd = -1;
6295 c->querybuf = sdsempty();
6296 c->argc = 0;
6297 c->argv = NULL;
6298 c->flags = 0;
6299 /* We set the fake client as a slave waiting for the synchronization
6300 * so that Redis will not try to send replies to this client. */
6301 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
6302 c->reply = listCreate();
6303 listSetFreeMethod(c->reply,decrRefCount);
6304 listSetDupMethod(c->reply,dupClientReplyValue);
6305 return c;
6306 }
6307
6308 static void freeFakeClient(struct redisClient *c) {
6309 sdsfree(c->querybuf);
6310 listRelease(c->reply);
6311 zfree(c);
6312 }
6313
6314 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
6315 * error (the append only file is zero-length) REDIS_ERR is returned. On
6316 * fatal error an error message is logged and the program exists. */
6317 int loadAppendOnlyFile(char *filename) {
6318 struct redisClient *fakeClient;
6319 FILE *fp = fopen(filename,"r");
6320 struct redis_stat sb;
6321
6322 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
6323 return REDIS_ERR;
6324
6325 if (fp == NULL) {
6326 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
6327 exit(1);
6328 }
6329
6330 fakeClient = createFakeClient();
6331 while(1) {
6332 int argc, j;
6333 unsigned long len;
6334 robj **argv;
6335 char buf[128];
6336 sds argsds;
6337 struct redisCommand *cmd;
6338
6339 if (fgets(buf,sizeof(buf),fp) == NULL) {
6340 if (feof(fp))
6341 break;
6342 else
6343 goto readerr;
6344 }
6345 if (buf[0] != '*') goto fmterr;
6346 argc = atoi(buf+1);
6347 argv = zmalloc(sizeof(robj*)*argc);
6348 for (j = 0; j < argc; j++) {
6349 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
6350 if (buf[0] != '$') goto fmterr;
6351 len = strtol(buf+1,NULL,10);
6352 argsds = sdsnewlen(NULL,len);
6353 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
6354 argv[j] = createObject(REDIS_STRING,argsds);
6355 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
6356 }
6357
6358 /* Command lookup */
6359 cmd = lookupCommand(argv[0]->ptr);
6360 if (!cmd) {
6361 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
6362 exit(1);
6363 }
6364 /* Try object sharing and encoding */
6365 if (server.shareobjects) {
6366 int j;
6367 for(j = 1; j < argc; j++)
6368 argv[j] = tryObjectSharing(argv[j]);
6369 }
6370 if (cmd->flags & REDIS_CMD_BULK)
6371 tryObjectEncoding(argv[argc-1]);
6372 /* Run the command in the context of a fake client */
6373 fakeClient->argc = argc;
6374 fakeClient->argv = argv;
6375 cmd->proc(fakeClient);
6376 /* Discard the reply objects list from the fake client */
6377 while(listLength(fakeClient->reply))
6378 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
6379 /* Clean up, ready for the next command */
6380 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
6381 zfree(argv);
6382 }
6383 fclose(fp);
6384 freeFakeClient(fakeClient);
6385 return REDIS_OK;
6386
6387 readerr:
6388 if (feof(fp)) {
6389 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
6390 } else {
6391 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
6392 }
6393 exit(1);
6394 fmterr:
6395 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
6396 exit(1);
6397 }
6398
6399 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6400 static int fwriteBulk(FILE *fp, robj *obj) {
6401 char buf[128];
6402 obj = getDecodedObject(obj);
6403 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
6404 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
6405 if (sdslen(obj->ptr) && fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0)
6406 goto err;
6407 if (fwrite("\r\n",2,1,fp) == 0) goto err;
6408 decrRefCount(obj);
6409 return 1;
6410 err:
6411 decrRefCount(obj);
6412 return 0;
6413 }
6414
/* Write 'd' to 'fp' as a bulk: "$<len>\r\n<digits>\r\n", formatting the
 * number with "%.17g" (up to 17 significant digits).
 * Returns 1 on success, 0 if a fwrite() fails. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char header[128], payload[128];
    size_t payloadlen;

    /* Format the value first: its length minus the trailing CRLF is the
     * bulk count announced in the header. */
    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    payloadlen = strlen(payload);
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)payloadlen-2);
    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,payloadlen,1,fp) != 0;
}
6425
/* Write 'l' to 'fp' as a bulk: "$<len>\r\n<decimal>\r\n".
 * Returns 1 on success, 0 if a fwrite() fails. */
static int fwriteBulkLong(FILE *fp, long l) {
    char header[128], payload[128];
    size_t payloadlen;

    /* Format the value first: its length minus the trailing CRLF is the
     * bulk count announced in the header. */
    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    payloadlen = strlen(payload);
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)payloadlen-2);
    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,payloadlen,1,fp) != 0;
}
6436
6437 /* Write a sequence of commands able to fully rebuild the dataset into
6438 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6439 static int rewriteAppendOnlyFile(char *filename) {
6440 dictIterator *di = NULL;
6441 dictEntry *de;
6442 FILE *fp;
6443 char tmpfile[256];
6444 int j;
6445 time_t now = time(NULL);
6446
6447 /* Note that we have to use a different temp name here compared to the
6448 * one used by rewriteAppendOnlyFileBackground() function. */
6449 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
6450 fp = fopen(tmpfile,"w");
6451 if (!fp) {
6452 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
6453 return REDIS_ERR;
6454 }
6455 for (j = 0; j < server.dbnum; j++) {
6456 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
6457 redisDb *db = server.db+j;
6458 dict *d = db->dict;
6459 if (dictSize(d) == 0) continue;
6460 di = dictGetIterator(d);
6461 if (!di) {
6462 fclose(fp);
6463 return REDIS_ERR;
6464 }
6465
6466 /* SELECT the new DB */
6467 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
6468 if (fwriteBulkLong(fp,j) == 0) goto werr;
6469
6470 /* Iterate this DB writing every entry */
6471 while((de = dictNext(di)) != NULL) {
6472 robj *key = dictGetEntryKey(de);
6473 robj *o = dictGetEntryVal(de);
6474 time_t expiretime = getExpire(db,key);
6475
6476 /* Save the key and associated value */
6477 if (o->type == REDIS_STRING) {
6478 /* Emit a SET command */
6479 char cmd[]="*3\r\n$3\r\nSET\r\n";
6480 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6481 /* Key and value */
6482 if (fwriteBulk(fp,key) == 0) goto werr;
6483 if (fwriteBulk(fp,o) == 0) goto werr;
6484 } else if (o->type == REDIS_LIST) {
6485 /* Emit the RPUSHes needed to rebuild the list */
6486 list *list = o->ptr;
6487 listNode *ln;
6488
6489 listRewind(list);
6490 while((ln = listYield(list))) {
6491 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
6492 robj *eleobj = listNodeValue(ln);
6493
6494 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6495 if (fwriteBulk(fp,key) == 0) goto werr;
6496 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6497 }
6498 } else if (o->type == REDIS_SET) {
6499 /* Emit the SADDs needed to rebuild the set */
6500 dict *set = o->ptr;
6501 dictIterator *di = dictGetIterator(set);
6502 dictEntry *de;
6503
6504 while((de = dictNext(di)) != NULL) {
6505 char cmd[]="*3\r\n$4\r\nSADD\r\n";
6506 robj *eleobj = dictGetEntryKey(de);
6507
6508 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6509 if (fwriteBulk(fp,key) == 0) goto werr;
6510 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6511 }
6512 dictReleaseIterator(di);
6513 } else if (o->type == REDIS_ZSET) {
6514 /* Emit the ZADDs needed to rebuild the sorted set */
6515 zset *zs = o->ptr;
6516 dictIterator *di = dictGetIterator(zs->dict);
6517 dictEntry *de;
6518
6519 while((de = dictNext(di)) != NULL) {
6520 char cmd[]="*4\r\n$4\r\nZADD\r\n";
6521 robj *eleobj = dictGetEntryKey(de);
6522 double *score = dictGetEntryVal(de);
6523
6524 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6525 if (fwriteBulk(fp,key) == 0) goto werr;
6526 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
6527 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6528 }
6529 dictReleaseIterator(di);
6530 } else {
6531 redisAssert(0 != 0);
6532 }
6533 /* Save the expire time */
6534 if (expiretime != -1) {
6535 char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
6536 /* If this key is already expired skip it */
6537 if (expiretime < now) continue;
6538 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6539 if (fwriteBulk(fp,key) == 0) goto werr;
6540 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
6541 }
6542 }
6543 dictReleaseIterator(di);
6544 }
6545
6546 /* Make sure data will not remain on the OS's output buffers */
6547 fflush(fp);
6548 fsync(fileno(fp));
6549 fclose(fp);
6550
6551 /* Use RENAME to make sure the DB file is changed atomically only
6552 * if the generate DB file is ok. */
6553 if (rename(tmpfile,filename) == -1) {
6554 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
6555 unlink(tmpfile);
6556 return REDIS_ERR;
6557 }
6558 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
6559 return REDIS_OK;
6560
6561 werr:
6562 fclose(fp);
6563 unlink(tmpfile);
6564 redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
6565 if (di) dictReleaseIterator(di);
6566 return REDIS_ERR;
6567 }
6568
6569 /* This is how rewriting of the append only file in background works:
6570 *
6571 * 1) The user calls BGREWRITEAOF
6572 * 2) Redis calls this function, that forks():
6573 * 2a) the child rewrite the append only file in a temp file.
6574 * 2b) the parent accumulates differences in server.bgrewritebuf.
6575 * 3) When the child finished '2a' exists.
6576 * 4) The parent will trap the exit code, if it's OK, will append the
6577 * data accumulated into server.bgrewritebuf into the temp file, and
6578 * finally will rename(2) the temp file in the actual file name.
6579 * The the new file is reopened as the new append only file. Profit!
6580 */
6581 static int rewriteAppendOnlyFileBackground(void) {
6582 pid_t childpid;
6583
6584 if (server.bgrewritechildpid != -1) return REDIS_ERR;
6585 if ((childpid = fork()) == 0) {
6586 /* Child */
6587 char tmpfile[256];
6588 close(server.fd);
6589
6590 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6591 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
6592 exit(0);
6593 } else {
6594 exit(1);
6595 }
6596 } else {
6597 /* Parent */
6598 if (childpid == -1) {
6599 redisLog(REDIS_WARNING,
6600 "Can't rewrite append only file in background: fork: %s",
6601 strerror(errno));
6602 return REDIS_ERR;
6603 }
6604 redisLog(REDIS_NOTICE,
6605 "Background append only file rewriting started by pid %d",childpid);
6606 server.bgrewritechildpid = childpid;
6607 /* We set appendseldb to -1 in order to force the next call to the
6608 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6609 * accumulated by the parent into server.bgrewritebuf will start
6610 * with a SELECT statement and it will be safe to merge. */
6611 server.appendseldb = -1;
6612 return REDIS_OK;
6613 }
6614 return REDIS_OK; /* unreached */
6615 }
6616
6617 static void bgrewriteaofCommand(redisClient *c) {
6618 if (server.bgrewritechildpid != -1) {
6619 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6620 return;
6621 }
6622 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6623 char *status = "+Background append only file rewriting started\r\n";
6624 addReplySds(c,sdsnew(status));
6625 } else {
6626 addReply(c,shared.err);
6627 }
6628 }
6629
/* Delete "temp-rewriteaof-bg-<childpid>.aof", the temp file written by a
 * background rewrite child. unlink() failures (e.g. the file does not
 * exist) are ignored. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof",(int)childpid);
    unlink(path);
}
6636
6637 /* =============================== Virtual Memory =========================== */
6638 static void vmInit(void) {
6639 off_t totsize;
6640
6641 server.vm_fp = fopen("/tmp/redisvm","w+b");
6642 if (server.vm_fp == NULL) {
6643 redisLog(REDIS_WARNING,"Impossible to open the swap file. Exiting.");
6644 exit(1);
6645 }
6646 server.vm_fd = fileno(server.vm_fp);
6647 server.vm_next_page = 0;
6648 server.vm_near_pages = 0;
6649 totsize = server.vm_pages*server.vm_page_size;
6650 redisLog(REDIS_NOTICE,"Allocating %lld bytes of swap file",totsize);
6651 if (ftruncate(server.vm_fd,totsize) == -1) {
6652 redisLog(REDIS_WARNING,"Can't ftruncate swap file: %s. Exiting.",
6653 strerror(errno));
6654 exit(1);
6655 } else {
6656 redisLog(REDIS_NOTICE,"Swap file allocated with success");
6657 }
6658 server.vm_bitmap = zmalloc((server.vm_pages+7)/8);
6659 memset(server.vm_bitmap,0,(server.vm_pages+7)/8);
6660 /* Try to remove the swap file, so the OS will really delete it from the
6661 * file system when Redis exists. */
6662 unlink("/tmp/redisvm");
6663 }
6664
6665 /* Mark the page as used */
6666 static void vmMarkPageUsed(off_t page) {
6667 off_t byte = page/8;
6668 int bit = page&7;
6669 server.vm_bitmap[byte] |= 1<<bit;
6670 printf("Mark used: %lld (byte:%d bit:%d)\n", (long long)page, byte, bit);
6671 }
6672
/* Mark 'count' consecutive pages as used, starting at 'page' and going up.
 * A non-positive count marks nothing. */
static void vmMarkPagesUsed(off_t page, off_t count) {
    off_t end = page + count;

    for (; page < end; page++)
        vmMarkPageUsed(page);
}
6680
6681 /* Mark the page as free */
6682 static void vmMarkPageFree(off_t page) {
6683 off_t byte = page/8;
6684 int bit = page&7;
6685 server.vm_bitmap[byte] &= ~(1<<bit);
6686 }
6687
/* Mark 'count' consecutive pages as free, starting at 'page' and going up.
 * A non-positive count frees nothing. */
static void vmMarkPagesFree(off_t page, off_t count) {
    off_t end = page + count;

    while (page < end)
        vmMarkPageFree(page++);
}
6695
6696 /* Test if the page is free */
6697 static int vmFreePage(off_t page) {
6698 off_t byte = page/8;
6699 int bit = page&7;
6700 return (server.vm_bitmap[byte] & (1<<bit)) == 0;
6701 }
6702
6703 /* Find N contiguous free pages storing the first page of the cluster in *first.
6704 * Returns REDIS_OK if it was able to find N contiguous pages, otherwise
6705 * REDIS_ERR is returned.
6706 *
6707 * This function uses a simple algorithm: we try to allocate
6708 * REDIS_VM_MAX_NEAR_PAGES sequentially, when we reach this limit we start
6709 * again from the start of the swap file searching for free spaces.
6710 *
6711 * If it looks pretty clear that there are no free pages near our offset
6712 * we try to find less populated places doing a forward jump of
6713 * REDIS_VM_MAX_RANDOM_JUMP, then we start scanning again a few pages
6714 * without hurry, and then we jump again and so forth...
6715 *
6716 * This function can be improved using a free list to avoid to guess
6717 * too much, since we could collect data about freed pages.
6718 *
6719 * note: I implemented this function just after watching an episode of
6720 * Battlestar Galactica, where the hybrid was continuing to say "JUMP!"
6721 */
6722 static int vmFindContiguousPages(off_t *first, int n) {
6723 off_t base, offset = 0, since_jump = 0, numfree = 0;
6724
6725 if (server.vm_near_pages == REDIS_VM_MAX_NEAR_PAGES) {
6726 server.vm_near_pages = 0;
6727 server.vm_next_page = 0;
6728 }
6729 server.vm_near_pages++; /* Yet another try for pages near to the old ones */
6730 base = server.vm_next_page;
6731
6732 while(offset < server.vm_pages) {
6733 off_t this = base+offset;
6734
6735 printf("THIS: %lld (%c)\n", (long long) this, vmFreePage(this) ? 'F' : 'X');
6736 /* If we overflow, restart from page zero */
6737 if (this >= server.vm_pages) {
6738 this -= server.vm_pages;
6739 if (this == 0) {
6740 /* Just overflowed, what we found on tail is no longer
6741 * interesting, as it's no longer contiguous. */
6742 numfree = 0;
6743 }
6744 }
6745 if (vmFreePage(this)) {
6746 /* This is a free page */
6747 numfree++;
6748 /* Already got N free pages? Return to the caller, with success */
6749 if (numfree == n) {
6750 *first = this-(n-1);
6751 server.vm_next_page = this+1;
6752 return REDIS_OK;
6753 }
6754 } else {
6755 /* The current one is not a free page */
6756 numfree = 0;
6757 }
6758
6759 /* Fast-forward if the current page is not free and we already
6760 * searched enough near this place. */
6761 since_jump++;
6762 if (!numfree && since_jump >= REDIS_VM_MAX_RANDOM_JUMP/4) {
6763 offset += random() % REDIS_VM_MAX_RANDOM_JUMP;
6764 since_jump = 0;
6765 /* Note that even if we rewind after the jump, we are don't need
6766 * to make sure numfree is set to zero as we only jump *if* it
6767 * is set to zero. */
6768 } else {
6769 /* Otherwise just check the next page */
6770 offset++;
6771 }
6772 }
6773 return REDIS_ERR;
6774 }
6775
6776 /* Swap the 'val' object relative to 'key' into disk. Store all the information
6777 * needed to later retrieve the object into the key object.
6778 * If we can't find enough contiguous empty pages to swap the object on disk
6779 * REDIS_ERR is returned. */
6780 static int vmSwapObject(robj *key, robj *val) {
6781 off_t pages = rdbSavedObjectPages(val);
6782 off_t page;
6783
6784 assert(key->storage == REDIS_VM_MEMORY);
6785 if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return REDIS_ERR;
6786 if (fseeko(server.vm_fp,page*server.vm_page_size,SEEK_SET) == -1) {
6787 redisLog(REDIS_WARNING,
6788 "Critical VM problem in vmSwapObject(): can't seek: %s",
6789 strerror(errno));
6790 return REDIS_ERR;
6791 }
6792 rdbSaveObject(server.vm_fp,val);
6793 key->vm.page = page;
6794 key->vm.usedpages = pages;
6795 key->storage = REDIS_VM_SWAPPED;
6796 key->vtype = val->type;
6797 decrRefCount(val); /* Deallocate the object from memory. */
6798 vmMarkPagesUsed(page,pages);
6799 redisLog(REDIS_DEBUG,"VM: object %s swapped out at %lld (%lld pages)",
6800 (unsigned char*) key->ptr,
6801 (unsigned long long) page, (unsigned long long) pages);
6802 return REDIS_OK;
6803 }
6804
6805 /* Load the value object relative to the 'key' object from swap to memory.
6806 * The newly allocated object is returned. */
6807 static robj *vmLoadObject(robj *key) {
6808 robj *val;
6809
6810 assert(key->storage == REDIS_VM_SWAPPED);
6811 if (fseeko(server.vm_fp,key->vm.page*server.vm_page_size,SEEK_SET) == -1) {
6812 redisLog(REDIS_WARNING,
6813 "Unrecoverable VM problem in vmLoadObject(): can't seek: %s",
6814 strerror(errno));
6815 exit(1);
6816 }
6817 val = rdbLoadObject(key->vtype,server.vm_fp);
6818 if (val == NULL) {
6819 redisLog(REDIS_WARNING, "Unrecoverable VM problem in vmLoadObject(): can't load object from swap file: %s", strerror(errno));
6820 exit(1);
6821 }
6822 key->storage = REDIS_VM_MEMORY;
6823 key->vm.atime = server.unixtime;
6824 vmMarkPagesFree(key->vm.page,key->vm.usedpages);
6825 redisLog(REDIS_DEBUG, "VM: object %s loaded from disk",
6826 (unsigned char*) key->ptr);
6827 return val;
6828 }
6829
6830 /* ================================= Debugging ============================== */
6831
6832 static void debugCommand(redisClient *c) {
6833 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6834 *((char*)-1) = 'x';
6835 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6836 if (rdbSave(server.dbfilename) != REDIS_OK) {
6837 addReply(c,shared.err);
6838 return;
6839 }
6840 emptyDb();
6841 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6842 addReply(c,shared.err);
6843 return;
6844 }
6845 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6846 addReply(c,shared.ok);
6847 } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
6848 emptyDb();
6849 if (loadAppendOnlyFile(server.appendfilename) != REDIS_OK) {
6850 addReply(c,shared.err);
6851 return;
6852 }
6853 redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF");
6854 addReply(c,shared.ok);
6855 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6856 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6857 robj *key, *val;
6858
6859 if (!de) {
6860 addReply(c,shared.nokeyerr);
6861 return;
6862 }
6863 key = dictGetEntryKey(de);
6864 val = dictGetEntryVal(de);
6865 addReplySds(c,sdscatprintf(sdsempty(),
6866 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d serializedlength:%lld\r\n",
6867 (void*)key, key->refcount, (void*)val, val->refcount,
6868 val->encoding, rdbSavedObjectLen(val)));
6869 } else if (!strcasecmp(c->argv[1]->ptr,"swapout") && c->argc == 3) {
6870 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6871 robj *key, *val;
6872
6873 if (!server.vm_enabled) {
6874 addReplySds(c,sdsnew("-ERR Virtual Memory is disabled\r\n"));
6875 return;
6876 }
6877 if (!de) {
6878 addReply(c,shared.nokeyerr);
6879 return;
6880 }
6881 key = dictGetEntryKey(de);
6882 val = dictGetEntryVal(de);
6883 if (key->storage != REDIS_VM_MEMORY) {
6884 addReplySds(c,sdsnew("-ERR This key is not in memory\r\n"));
6885 } else if (vmSwapObject(key,val) == REDIS_OK) {
6886 dictGetEntryVal(de) = NULL;
6887 addReply(c,shared.ok);
6888 } else {
6889 addReply(c,shared.err);
6890 }
6891 } else {
6892 addReplySds(c,sdsnew(
6893 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|SWAPOUT <key>|RELOAD]\r\n"));
6894 }
6895 }
6896
6897 static void _redisAssert(char *estr) {
6898 redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
6899 redisLog(REDIS_WARNING,"==> %s\n",estr);
6900 #ifdef HAVE_BACKTRACE
6901 redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
6902 *((char*)-1) = 'x';
6903 #endif
6904 }
6905
6906 /* =================================== Main! ================================ */
6907
6908 #ifdef __linux__
/* Read the kernel's vm.overcommit_memory setting.
 *
 * Returns the integer at the start of /proc/sys/vm/overcommit_memory, or -1
 * when the file cannot be opened, is empty, or does not begin with a
 * number. A read problem is therefore never reported as the value 0. */
int linuxOvercommitMemoryValue(void) {
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
    char buf[64];
    char *end;
    long value;

    if (!fp) return -1;
    if (fgets(buf,sizeof(buf),fp) == NULL) {
        fclose(fp);
        return -1;
    }
    fclose(fp);

    /* atoi() would map unparsable content to 0, and the caller warns about
     * 0. strtol() lets us detect "no digits" and report -1 instead. */
    value = strtol(buf,&end,10);
    if (end == buf) return -1;
    return (int)value;
}
6922
6923 void linuxOvercommitMemoryWarning(void) {
6924 if (linuxOvercommitMemoryValue() == 0) {
6925 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6926 }
6927 }
6928 #endif /* __linux__ */
6929
6930 static void daemonize(void) {
6931 int fd;
6932 FILE *fp;
6933
6934 if (fork() != 0) exit(0); /* parent exits */
6935 printf("New pid: %d\n", getpid());
6936 setsid(); /* create a new session */
6937
6938 /* Every output goes to /dev/null. If Redis is daemonized but
6939 * the 'logfile' is set to 'stdout' in the configuration file
6940 * it will not log at all. */
6941 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6942 dup2(fd, STDIN_FILENO);
6943 dup2(fd, STDOUT_FILENO);
6944 dup2(fd, STDERR_FILENO);
6945 if (fd > STDERR_FILENO) close(fd);
6946 }
6947 /* Try to write the pid file */
6948 fp = fopen(server.pidfile,"w");
6949 if (fp) {
6950 fprintf(fp,"%d\n",getpid());
6951 fclose(fp);
6952 }
6953 }
6954
6955 int main(int argc, char **argv) {
6956 initServerConfig();
6957 if (argc == 2) {
6958 resetServerSaveParams();
6959 loadServerConfig(argv[1]);
6960 } else if (argc > 2) {
6961 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6962 exit(1);
6963 } else {
6964 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6965 }
6966 if (server.daemonize) daemonize();
6967 initServer();
6968 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6969 #ifdef __linux__
6970 linuxOvercommitMemoryWarning();
6971 #endif
6972 if (server.appendonly) {
6973 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6974 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6975 } else {
6976 if (rdbLoad(server.dbfilename) == REDIS_OK)
6977 redisLog(REDIS_NOTICE,"DB loaded from disk");
6978 }
6979 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
6980 acceptHandler, NULL) == AE_ERR) oom("creating file event");
6981 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6982 aeMain(server.el);
6983 aeDeleteEventLoop(server.el);
6984 return 0;
6985 }
6986
6987 /* ============================= Backtrace support ========================= */
6988
6989 #ifdef HAVE_BACKTRACE
6990 static char *findFuncName(void *pointer, unsigned long *offset);
6991
6992 static void *getMcontextEip(ucontext_t *uc) {
6993 #if defined(__FreeBSD__)
6994 return (void*) uc->uc_mcontext.mc_eip;
6995 #elif defined(__dietlibc__)
6996 return (void*) uc->uc_mcontext.eip;
6997 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6998 #if __x86_64__
6999 return (void*) uc->uc_mcontext->__ss.__rip;
7000 #else
7001 return (void*) uc->uc_mcontext->__ss.__eip;
7002 #endif
7003 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
7004 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
7005 return (void*) uc->uc_mcontext->__ss.__rip;
7006 #else
7007 return (void*) uc->uc_mcontext->__ss.__eip;
7008 #endif
7009 #elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
7010 return (void*) uc->uc_mcontext.gregs[REG_EIP]; /* Linux 32/64 bit */
7011 #elif defined(__ia64__) /* Linux IA64 */
7012 return (void*) uc->uc_mcontext.sc_ip;
7013 #else
7014 return NULL;
7015 #endif
7016 }
7017
7018 static void segvHandler(int sig, siginfo_t *info, void *secret) {
7019 void *trace[100];
7020 char **messages = NULL;
7021 int i, trace_size = 0;
7022 unsigned long offset=0;
7023 ucontext_t *uc = (ucontext_t*) secret;
7024 sds infostring;
7025 REDIS_NOTUSED(info);
7026
7027 redisLog(REDIS_WARNING,
7028 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
7029 infostring = genRedisInfoString();
7030 redisLog(REDIS_WARNING, "%s",infostring);
7031 /* It's not safe to sdsfree() the returned string under memory
7032 * corruption conditions. Let it leak as we are going to abort */
7033
7034 trace_size = backtrace(trace, 100);
7035 /* overwrite sigaction with caller's address */
7036 if (getMcontextEip(uc) != NULL) {
7037 trace[1] = getMcontextEip(uc);
7038 }
7039 messages = backtrace_symbols(trace, trace_size);
7040
7041 for (i=1; i<trace_size; ++i) {
7042 char *fn = findFuncName(trace[i], &offset), *p;
7043
7044 p = strchr(messages[i],'+');
7045 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
7046 redisLog(REDIS_WARNING,"%s", messages[i]);
7047 } else {
7048 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
7049 }
7050 }
7051 /* free(messages); Don't call free() with possibly corrupted memory. */
7052 exit(0);
7053 }
7054
7055 static void setupSigSegvAction(void) {
7056 struct sigaction act;
7057
7058 sigemptyset (&act.sa_mask);
7059 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
7060 * is used. Otherwise, sa_handler is used */
7061 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
7062 act.sa_sigaction = segvHandler;
7063 sigaction (SIGSEGV, &act, NULL);
7064 sigaction (SIGBUS, &act, NULL);
7065 sigaction (SIGFPE, &act, NULL);
7066 sigaction (SIGILL, &act, NULL);
7067 sigaction (SIGBUS, &act, NULL);
7068 return;
7069 }
7070
7071 #include "staticsymbols.h"
7072 /* This function try to convert a pointer into a function name. It's used in
7073 * oreder to provide a backtrace under segmentation fault that's able to
7074 * display functions declared as static (otherwise the backtrace is useless). */
7075 static char *findFuncName(void *pointer, unsigned long *offset){
7076 int i, ret = -1;
7077 unsigned long off, minoff = 0;
7078
7079 /* Try to match against the Symbol with the smallest offset */
7080 for (i=0; symsTable[i].pointer; i++) {
7081 unsigned long lp = (unsigned long) pointer;
7082
7083 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
7084 off=lp-symsTable[i].pointer;
7085 if (ret < 0 || off < minoff) {
7086 minoff=off;
7087 ret=i;
7088 }
7089 }
7090 }
7091 if (ret == -1) return NULL;
7092 *offset = minoff;
7093 return symsTable[ret].name;
7094 }
7095 #else /* HAVE_BACKTRACE */
/* Backtrace support is not compiled in: there is no crash handler to
 * install, so this is a deliberate no-op. */
static void setupSigSegvAction(void) { /* intentionally empty */ }
7098 #endif /* HAVE_BACKTRACE */
7099
7100
7101
7102 /* The End */
7103
7104
7105