]> git.saurik.com Git - redis.git/blob - redis.c
8d1b91d3d4a5cf67350ce86e6ed26630d3936cdf
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.3.2"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
77 /* Error codes */
78 #define REDIS_OK 0
79 #define REDIS_ERR -1
80
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
94
95 /* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
99
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
102
103 /* Command flags */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
107 this flag will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short these commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
111
112 /* Object types */
113 #define REDIS_STRING 0
114 #define REDIS_LIST 1
115 #define REDIS_SET 2
116 #define REDIS_ZSET 3
117 #define REDIS_HASH 4
118
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
122
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
127
128 /* Defines related to the dump file format. To store 32 bits lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpret the length:
131 *
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
138 *
139 * Lengths up to 63 are stored using a single byte, most DB keys, and many
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
146
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining six bits specify a special encoding for the object
149 * according to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
154
155 /* Virtual memory object->where field. */
156 #define REDIS_VM_MEMORY 0 /* The object is on memory */
157 #define REDIS_VM_SWAPPED 1 /* The object is on disk */
158 #define REDIS_VM_SWAPPING 2 /* Redis is swapping this object on disk */
159 #define REDIS_VM_LOADING 3 /* Redis is loading this object from disk */
160
161 /* Virtual memory static configuration stuff.
162 * Check vmFindContiguousPages() to know more about these magic numbers. */
163 #define REDIS_VM_MAX_NEAR_PAGES 65536
164 #define REDIS_VM_MAX_RANDOM_JUMP 4096
165
166 /* Client flags */
167 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
168 #define REDIS_SLAVE 2 /* This client is a slave server */
169 #define REDIS_MASTER 4 /* This client is a master server */
170 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
171 #define REDIS_MULTI 16 /* This client is in a MULTI context */
172 #define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */
173
174 /* Slave replication state - slave side */
175 #define REDIS_REPL_NONE 0 /* No active replication */
176 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
177 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
178
179 /* Slave replication state - from the point of view of master
180 * Note that in SEND_BULK and ONLINE state the slave receives new updates
181 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
182 * to start the next background saving in order to send updates to it. */
183 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
184 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
185 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
186 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
187
188 /* List related stuff */
189 #define REDIS_HEAD 0
190 #define REDIS_TAIL 1
191
192 /* Sort operations */
193 #define REDIS_SORT_GET 0
194 #define REDIS_SORT_ASC 1
195 #define REDIS_SORT_DESC 2
196 #define REDIS_SORTKEY_MAX 1024
197
198 /* Log levels */
199 #define REDIS_DEBUG 0
200 #define REDIS_NOTICE 1
201 #define REDIS_WARNING 2
202
203 /* Anti-warning macro... */
204 #define REDIS_NOTUSED(V) ((void) V)
205
206 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
207 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
208
209 /* Append only defines */
210 #define APPENDFSYNC_NO 0
211 #define APPENDFSYNC_ALWAYS 1
212 #define APPENDFSYNC_EVERYSEC 2
213
214 /* We can print the stacktrace, so our assert is defined this way: */
215 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
216 static void _redisAssert(char *estr);
217
218 /*================================= Data types ============================== */
219
220 /* A redis object, that is a type able to hold a string / list / set */
221
222 /* The VM object structure */
struct redisObjectVM {
    off_t page;         /* Page at which the object is stored on disk. */
    off_t usedpages;    /* Number of pages used on disk. */
    time_t atime;       /* Last access time. */
} vm; /* NOTE(review): this trailing declarator also defines a file-scope
       * variable named `vm`; nothing in the visible code reads it, so
       * confirm whether it is intentional. */
228
229 /* The actual Redis Object */
230 typedef struct redisObject {
231 void *ptr;
232 unsigned char type;
233 unsigned char encoding;
234 unsigned char storage; /* where? REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
235 unsigned char notused;
236 int refcount;
237 /* VM fields, this are only allocated if VM is active, otherwise the
238 * object allocation function will just allocate
239 * sizeof(redisObjct) minus sizeof(redisObjectVM), so using
240 * Redis without VM active will not have any overhead. */
241 struct redisObjectVM vm;
242 } robj;
243
244 /* Macro used to initialize a Redis object allocated on the stack.
245 * Note that this macro is kept near the structure definition to make sure
246 * we'll update it when the structure is changed, to avoid bugs like
247 * bug #85 introduced exactly in this way. */
/* Fills _var (an robj lvalue, not a pointer) as a raw string object that
 * points at _ptr and holds a single reference. The storage field is only
 * written when server.vm_enabled is set.
 * NOTE(review): the expansion already ends with ';' after while(0), so the
 * macro cannot be used as the body of an if that has an else branch. */
#define initStaticStringObject(_var,_ptr) do { \
    _var.ptr = _ptr; \
    _var.type = REDIS_STRING; \
    _var.encoding = REDIS_ENCODING_RAW; \
    _var.refcount = 1; \
    if (server.vm_enabled) _var.storage = REDIS_VM_MEMORY; \
} while(0);
255
256 typedef struct redisDb {
257 dict *dict; /* The keyspace for this DB */
258 dict *expires; /* Timeout of keys with a timeout set */
259 dict *blockingkeys; /* Keys with clients waiting for data (BLPOP) */
260 int id;
261 } redisDb;
262
263 /* Client MULTI/EXEC state */
264 typedef struct multiCmd {
265 robj **argv;
266 int argc;
267 struct redisCommand *cmd;
268 } multiCmd;
269
270 typedef struct multiState {
271 multiCmd *commands; /* Array of MULTI commands */
272 int count; /* Total number of MULTI commands */
273 } multiState;
274
275 /* With multiplexing we need to keep per-client state.
276 * Clients are kept in a linked list. */
277 typedef struct redisClient {
278 int fd;
279 redisDb *db;
280 int dictid;
281 sds querybuf;
282 robj **argv, **mbargv;
283 int argc, mbargc;
284 int bulklen; /* bulk read len. -1 if not in bulk read mode */
285 int multibulk; /* multi bulk command format active */
286 list *reply;
287 int sentlen;
288 time_t lastinteraction; /* time of the last interaction, used for timeout */
289 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
290 /* REDIS_MULTI */
291 int slaveseldb; /* slave selected db, if this client is a slave */
292 int authenticated; /* when requirepass is non-NULL */
293 int replstate; /* replication state if this is a slave */
294 int repldbfd; /* replication DB file descriptor */
295 long repldboff; /* replication DB file offset */
296 off_t repldbsize; /* replication DB file size */
297 multiState mstate; /* MULTI/EXEC state */
298 robj **blockingkeys; /* The key we waiting to terminate a blocking
299 * operation such as BLPOP. Otherwise NULL. */
300 int blockingkeysnum; /* Number of blocking keys */
301 time_t blockingto; /* Blocking operation timeout. If UNIX current time
302 * is >= blockingto then the operation timed out. */
303 } redisClient;
304
305 struct saveparam {
306 time_t seconds;
307 int changes;
308 };
309
310 /* Global server state structure */
311 struct redisServer {
312 int port;
313 int fd;
314 redisDb *db;
315 dict *sharingpool; /* Poll used for object sharing */
316 unsigned int sharingpoolsize;
317 long long dirty; /* changes to DB from the last save */
318 list *clients;
319 list *slaves, *monitors;
320 char neterr[ANET_ERR_LEN];
321 aeEventLoop *el;
322 int cronloops; /* number of times the cron function run */
323 list *objfreelist; /* A list of freed objects to avoid malloc() */
324 time_t lastsave; /* Unix time of last save succeeede */
325 size_t usedmemory; /* Used memory in megabytes */
326 /* Fields used only for stats */
327 time_t stat_starttime; /* server start time */
328 long long stat_numcommands; /* number of processed commands */
329 long long stat_numconnections; /* number of connections received */
330 /* Configuration */
331 int verbosity;
332 int glueoutputbuf;
333 int maxidletime;
334 int dbnum;
335 int daemonize;
336 int appendonly;
337 int appendfsync;
338 time_t lastfsync;
339 int appendfd;
340 int appendseldb;
341 char *pidfile;
342 pid_t bgsavechildpid;
343 pid_t bgrewritechildpid;
344 sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
345 struct saveparam *saveparams;
346 int saveparamslen;
347 char *logfile;
348 char *bindaddr;
349 char *dbfilename;
350 char *appendfilename;
351 char *requirepass;
352 int shareobjects;
353 int rdbcompression;
354 /* Replication related */
355 int isslave;
356 char *masterauth;
357 char *masterhost;
358 int masterport;
359 redisClient *master; /* client that is master for this slave */
360 int replstate;
361 unsigned int maxclients;
362 unsigned long maxmemory;
363 unsigned int blockedclients;
364 /* Sort parameters - qsort_r() is only available under BSD so we
365 * have to take this state global, in order to pass it to sortCompare() */
366 int sort_desc;
367 int sort_alpha;
368 int sort_bypattern;
369 /* Virtual memory configuration */
370 int vm_enabled;
371 off_t vm_page_size;
372 off_t vm_pages;
373 long vm_max_memory;
374 /* Virtual memory state */
375 FILE *vm_fp;
376 int vm_fd;
377 off_t vm_next_page; /* Next probably empty page */
378 off_t vm_near_pages; /* Number of pages allocated sequentially */
379 unsigned char *vm_bitmap; /* Bitmap of free/used pages */
380 time_t unixtime; /* Unix time sampled every second. */
381 };
382
383 typedef void redisCommandProc(redisClient *c);
384 struct redisCommand {
385 char *name;
386 redisCommandProc *proc;
387 int arity;
388 int flags;
389 };
390
391 struct redisFunctionSym {
392 char *name;
393 unsigned long pointer;
394 };
395
396 typedef struct _redisSortObject {
397 robj *obj;
398 union {
399 double score;
400 robj *cmpobj;
401 } u;
402 } redisSortObject;
403
404 typedef struct _redisSortOperation {
405 int type;
406 robj *pattern;
407 } redisSortOperation;
408
409 /* ZSETs use a specialized version of Skiplists */
410
411 typedef struct zskiplistNode {
412 struct zskiplistNode **forward;
413 struct zskiplistNode *backward;
414 double score;
415 robj *obj;
416 } zskiplistNode;
417
418 typedef struct zskiplist {
419 struct zskiplistNode *header, *tail;
420 unsigned long length;
421 int level;
422 } zskiplist;
423
424 typedef struct zset {
425 dict *dict;
426 zskiplist *zsl;
427 } zset;
428
429 /* Our shared "common" objects */
430
431 struct sharedObjectsStruct {
432 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
433 *colon, *nullbulk, *nullmultibulk, *queued,
434 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
435 *outofrangeerr, *plus,
436 *select0, *select1, *select2, *select3, *select4,
437 *select5, *select6, *select7, *select8, *select9;
438 } shared;
439
440 /* Global vars that are actually used as constants. The following double
441 * values are used for double on-disk serialization, and are initialized
442 * at runtime to avoid strange compiler optimizations. */
443
444 static double R_Zero, R_PosInf, R_NegInf, R_Nan;
445
446 /*================================ Prototypes =============================== */
447
448 static void freeStringObject(robj *o);
449 static void freeListObject(robj *o);
450 static void freeSetObject(robj *o);
451 static void decrRefCount(void *o);
452 static robj *createObject(int type, void *ptr);
453 static void freeClient(redisClient *c);
454 static int rdbLoad(char *filename);
455 static void addReply(redisClient *c, robj *obj);
456 static void addReplySds(redisClient *c, sds s);
457 static void incrRefCount(robj *o);
458 static int rdbSaveBackground(char *filename);
459 static robj *createStringObject(char *ptr, size_t len);
460 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
461 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
462 static int syncWithMaster(void);
463 static robj *tryObjectSharing(robj *o);
464 static int tryObjectEncoding(robj *o);
465 static robj *getDecodedObject(robj *o);
466 static int removeExpire(redisDb *db, robj *key);
467 static int expireIfNeeded(redisDb *db, robj *key);
468 static int deleteIfVolatile(redisDb *db, robj *key);
469 static int deleteKey(redisDb *db, robj *key);
470 static time_t getExpire(redisDb *db, robj *key);
471 static int setExpire(redisDb *db, robj *key, time_t when);
472 static void updateSlavesWaitingBgsave(int bgsaveerr);
473 static void freeMemoryIfNeeded(void);
474 static int processCommand(redisClient *c);
475 static void setupSigSegvAction(void);
476 static void rdbRemoveTempFile(pid_t childpid);
477 static void aofRemoveTempFile(pid_t childpid);
478 static size_t stringObjectLen(robj *o);
479 static void processInputBuffer(redisClient *c);
480 static zskiplist *zslCreate(void);
481 static void zslFree(zskiplist *zsl);
482 static void zslInsert(zskiplist *zsl, double score, robj *obj);
483 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
484 static void initClientMultiState(redisClient *c);
485 static void freeClientMultiState(redisClient *c);
486 static void queueMultiCommand(redisClient *c, struct redisCommand *cmd);
487 static void unblockClient(redisClient *c);
488 static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele);
489 static void vmInit(void);
490 static void vmMarkPagesFree(off_t page, off_t count);
491
492 static void authCommand(redisClient *c);
493 static void pingCommand(redisClient *c);
494 static void echoCommand(redisClient *c);
495 static void setCommand(redisClient *c);
496 static void setnxCommand(redisClient *c);
497 static void getCommand(redisClient *c);
498 static void delCommand(redisClient *c);
499 static void existsCommand(redisClient *c);
500 static void incrCommand(redisClient *c);
501 static void decrCommand(redisClient *c);
502 static void incrbyCommand(redisClient *c);
503 static void decrbyCommand(redisClient *c);
504 static void selectCommand(redisClient *c);
505 static void randomkeyCommand(redisClient *c);
506 static void keysCommand(redisClient *c);
507 static void dbsizeCommand(redisClient *c);
508 static void lastsaveCommand(redisClient *c);
509 static void saveCommand(redisClient *c);
510 static void bgsaveCommand(redisClient *c);
511 static void bgrewriteaofCommand(redisClient *c);
512 static void shutdownCommand(redisClient *c);
513 static void moveCommand(redisClient *c);
514 static void renameCommand(redisClient *c);
515 static void renamenxCommand(redisClient *c);
516 static void lpushCommand(redisClient *c);
517 static void rpushCommand(redisClient *c);
518 static void lpopCommand(redisClient *c);
519 static void rpopCommand(redisClient *c);
520 static void llenCommand(redisClient *c);
521 static void lindexCommand(redisClient *c);
522 static void lrangeCommand(redisClient *c);
523 static void ltrimCommand(redisClient *c);
524 static void typeCommand(redisClient *c);
525 static void lsetCommand(redisClient *c);
526 static void saddCommand(redisClient *c);
527 static void sremCommand(redisClient *c);
528 static void smoveCommand(redisClient *c);
529 static void sismemberCommand(redisClient *c);
530 static void scardCommand(redisClient *c);
531 static void spopCommand(redisClient *c);
532 static void srandmemberCommand(redisClient *c);
533 static void sinterCommand(redisClient *c);
534 static void sinterstoreCommand(redisClient *c);
535 static void sunionCommand(redisClient *c);
536 static void sunionstoreCommand(redisClient *c);
537 static void sdiffCommand(redisClient *c);
538 static void sdiffstoreCommand(redisClient *c);
539 static void syncCommand(redisClient *c);
540 static void flushdbCommand(redisClient *c);
541 static void flushallCommand(redisClient *c);
542 static void sortCommand(redisClient *c);
543 static void lremCommand(redisClient *c);
544 static void rpoplpushcommand(redisClient *c);
545 static void infoCommand(redisClient *c);
546 static void mgetCommand(redisClient *c);
547 static void monitorCommand(redisClient *c);
548 static void expireCommand(redisClient *c);
549 static void expireatCommand(redisClient *c);
550 static void getsetCommand(redisClient *c);
551 static void ttlCommand(redisClient *c);
552 static void slaveofCommand(redisClient *c);
553 static void debugCommand(redisClient *c);
554 static void msetCommand(redisClient *c);
555 static void msetnxCommand(redisClient *c);
556 static void zaddCommand(redisClient *c);
557 static void zincrbyCommand(redisClient *c);
558 static void zrangeCommand(redisClient *c);
559 static void zrangebyscoreCommand(redisClient *c);
560 static void zrevrangeCommand(redisClient *c);
561 static void zcardCommand(redisClient *c);
562 static void zremCommand(redisClient *c);
563 static void zscoreCommand(redisClient *c);
564 static void zremrangebyscoreCommand(redisClient *c);
565 static void multiCommand(redisClient *c);
566 static void execCommand(redisClient *c);
567 static void blpopCommand(redisClient *c);
568 static void brpopCommand(redisClient *c);
569
570 /*================================= Globals ================================= */
571
572 /* Global vars */
573 static struct redisServer server; /* server global state */
574 static struct redisCommand cmdTable[] = {
575 {"get",getCommand,2,REDIS_CMD_INLINE},
576 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
577 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
578 {"del",delCommand,-2,REDIS_CMD_INLINE},
579 {"exists",existsCommand,2,REDIS_CMD_INLINE},
580 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
581 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
582 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
583 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
584 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
585 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
586 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
587 {"brpop",brpopCommand,-3,REDIS_CMD_INLINE},
588 {"blpop",blpopCommand,-3,REDIS_CMD_INLINE},
589 {"llen",llenCommand,2,REDIS_CMD_INLINE},
590 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
591 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
592 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
593 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
594 {"lrem",lremCommand,4,REDIS_CMD_BULK},
595 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
596 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
597 {"srem",sremCommand,3,REDIS_CMD_BULK},
598 {"smove",smoveCommand,4,REDIS_CMD_BULK},
599 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
600 {"scard",scardCommand,2,REDIS_CMD_INLINE},
601 {"spop",spopCommand,2,REDIS_CMD_INLINE},
602 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
603 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
604 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
605 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
606 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
607 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
608 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
609 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
610 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
611 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
612 {"zrem",zremCommand,3,REDIS_CMD_BULK},
613 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
614 {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE},
615 {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
616 {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE},
617 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
618 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
619 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
620 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
621 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
622 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
623 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
624 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
625 {"select",selectCommand,2,REDIS_CMD_INLINE},
626 {"move",moveCommand,3,REDIS_CMD_INLINE},
627 {"rename",renameCommand,3,REDIS_CMD_INLINE},
628 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
629 {"expire",expireCommand,3,REDIS_CMD_INLINE},
630 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
631 {"keys",keysCommand,2,REDIS_CMD_INLINE},
632 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
633 {"auth",authCommand,2,REDIS_CMD_INLINE},
634 {"ping",pingCommand,1,REDIS_CMD_INLINE},
635 {"echo",echoCommand,2,REDIS_CMD_BULK},
636 {"save",saveCommand,1,REDIS_CMD_INLINE},
637 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
638 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
639 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
640 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
641 {"type",typeCommand,2,REDIS_CMD_INLINE},
642 {"multi",multiCommand,1,REDIS_CMD_INLINE},
643 {"exec",execCommand,1,REDIS_CMD_INLINE},
644 {"sync",syncCommand,1,REDIS_CMD_INLINE},
645 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
646 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
647 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
648 {"info",infoCommand,1,REDIS_CMD_INLINE},
649 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
650 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
651 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
652 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
653 {NULL,NULL,0,0}
654 };
655
656 /*============================ Utility functions ============================ */
657
/* Glob-style pattern matching.
 *
 * Both inputs are length-delimited: only the first patternLen bytes of
 * 'pattern' and the first stringLen bytes of 'string' are examined, so
 * neither buffer has to be NUL terminated.
 *
 * Supported syntax:
 *   *       any sequence of bytes, including the empty one
 *   ?       exactly one byte
 *   [...]   one byte out of a class; a leading ^ negates the class, a-z is
 *           a range and \x stands for the byte x itself
 *   \x      the byte x taken literally
 *
 * When 'nocase' is non zero, literal bytes, class members and ranges are
 * compared after tolower() (escaped class members are always compared as
 * they are).
 *
 * Returns 1 when the whole string matches the whole pattern, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* Collapse a run of stars, without peeking past the end of
             * the pattern. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            /* Try to match the rest of the pattern at every offset. */
            while (stringLen > 0) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen <= 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A class always consumes one byte: nothing to consume means
             * no match, and string[0] must not be read. */
            if (stringLen <= 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back one byte so that the
                     * shared advance after the switch lands exactly on
                     * the end of the pattern. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((int)pattern[0]) == tolower((int)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen <= 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((int)pattern[0]) != tolower((int)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* The string is exhausted: only trailing stars that are still
             * inside the pattern may remain for a match. */
            while (patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
780
781 static void redisLog(int level, const char *fmt, ...) {
782 va_list ap;
783 FILE *fp;
784
785 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
786 if (!fp) return;
787
788 va_start(ap, fmt);
789 if (level >= server.verbosity) {
790 char *c = ".-*";
791 char buf[64];
792 time_t now;
793
794 now = time(NULL);
795 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
796 fprintf(fp,"%s %c ",buf,c[level]);
797 vfprintf(fp, fmt, ap);
798 fprintf(fp,"\n");
799 fflush(fp);
800 }
801 va_end(ap);
802
803 if (server.logfile) fclose(fp);
804 }
805
806 /*====================== Hash table type implementation ==================== */
807
808 /* This is a hash table type that uses the SDS dynamic strings library as
809 * keys and redis objects as values (objects can hold SDS strings,
810 * lists, sets). */
811
/* Dict destructor callback: releases 'val' with zfree().
 * 'privdata' is not used. */
static void dictVanillaFree(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);

    zfree(val);
}
817
818 static void dictListDestructor(void *privdata, void *val)
819 {
820 DICT_NOTUSED(privdata);
821 listRelease((list*)val);
822 }
823
824 static int sdsDictKeyCompare(void *privdata, const void *key1,
825 const void *key2)
826 {
827 int l1,l2;
828 DICT_NOTUSED(privdata);
829
830 l1 = sdslen((sds)key1);
831 l2 = sdslen((sds)key2);
832 if (l1 != l2) return 0;
833 return memcmp(key1, key2, l1) == 0;
834 }
835
836 static void dictRedisObjectDestructor(void *privdata, void *val)
837 {
838 DICT_NOTUSED(privdata);
839
840 if (val == NULL) return; /* Values of swapped out keys as set to NULL */
841 decrRefCount(val);
842 }
843
844 static int dictObjKeyCompare(void *privdata, const void *key1,
845 const void *key2)
846 {
847 const robj *o1 = key1, *o2 = key2;
848 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
849 }
850
851 static unsigned int dictObjHash(const void *key) {
852 const robj *o = key;
853 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
854 }
855
856 static int dictEncObjKeyCompare(void *privdata, const void *key1,
857 const void *key2)
858 {
859 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
860 int cmp;
861
862 o1 = getDecodedObject(o1);
863 o2 = getDecodedObject(o2);
864 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
865 decrRefCount(o1);
866 decrRefCount(o2);
867 return cmp;
868 }
869
870 static unsigned int dictEncObjHash(const void *key) {
871 robj *o = (robj*) key;
872
873 o = getDecodedObject(o);
874 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
875 decrRefCount(o);
876 return hash;
877 }
878
879 static dictType setDictType = {
880 dictEncObjHash, /* hash function */
881 NULL, /* key dup */
882 NULL, /* val dup */
883 dictEncObjKeyCompare, /* key compare */
884 dictRedisObjectDestructor, /* key destructor */
885 NULL /* val destructor */
886 };
887
888 static dictType zsetDictType = {
889 dictEncObjHash, /* hash function */
890 NULL, /* key dup */
891 NULL, /* val dup */
892 dictEncObjKeyCompare, /* key compare */
893 dictRedisObjectDestructor, /* key destructor */
894 dictVanillaFree /* val destructor of malloc(sizeof(double)) */
895 };
896
897 static dictType hashDictType = {
898 dictObjHash, /* hash function */
899 NULL, /* key dup */
900 NULL, /* val dup */
901 dictObjKeyCompare, /* key compare */
902 dictRedisObjectDestructor, /* key destructor */
903 dictRedisObjectDestructor /* val destructor */
904 };
905
906 /* Keylist hash table type has unencoded redis objects as keys and
907 * lists as values. It's used for blocking operations (BLPOP) */
908 static dictType keylistDictType = {
909 dictObjHash, /* hash function */
910 NULL, /* key dup */
911 NULL, /* val dup */
912 dictObjKeyCompare, /* key compare */
913 dictRedisObjectDestructor, /* key destructor */
914 dictListDestructor /* val destructor */
915 };
916
917 /* ========================= Random utility functions ======================= */
918
919 /* Redis generally does not try to recover from out of memory conditions
920 * when allocating objects or strings, it is not clear if it will be possible
921 * to report this condition to the client since the networking layer itself
922 * is based on heap allocation for send buffers, so we simply abort.
923 * At least the code will be simpler to read... */
924 static void oom(const char *msg) {
925 redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
926 sleep(1);
927 abort();
928 }
929
930 /* ====================== Redis server networking stuff ===================== */
931 static void closeTimedoutClients(void) {
932 redisClient *c;
933 listNode *ln;
934 time_t now = time(NULL);
935
936 listRewind(server.clients);
937 while ((ln = listYield(server.clients)) != NULL) {
938 c = listNodeValue(ln);
939 if (server.maxidletime &&
940 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
941 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
942 (now - c->lastinteraction > server.maxidletime))
943 {
944 redisLog(REDIS_DEBUG,"Closing idle client");
945 freeClient(c);
946 } else if (c->flags & REDIS_BLOCKED) {
947 if (c->blockingto != 0 && c->blockingto < now) {
948 addReply(c,shared.nullmultibulk);
949 unblockClient(c);
950 }
951 }
952 }
953 }
954
955 static int htNeedsResize(dict *dict) {
956 long long size, used;
957
958 size = dictSlots(dict);
959 used = dictSize(dict);
960 return (size && used && size > DICT_HT_INITIAL_SIZE &&
961 (used*100/size < REDIS_HT_MINFILL));
962 }
963
964 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
965 * we resize the hash table to save memory */
966 static void tryResizeHashTables(void) {
967 int j;
968
969 for (j = 0; j < server.dbnum; j++) {
970 if (htNeedsResize(server.db[j].dict)) {
971 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
972 dictResize(server.db[j].dict);
973 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
974 }
975 if (htNeedsResize(server.db[j].expires))
976 dictResize(server.db[j].expires);
977 }
978 }
979
980 /* A background saving child (BGSAVE) terminated its work. Handle this. */
981 void backgroundSaveDoneHandler(int statloc) {
982 int exitcode = WEXITSTATUS(statloc);
983 int bysignal = WIFSIGNALED(statloc);
984
985 if (!bysignal && exitcode == 0) {
986 redisLog(REDIS_NOTICE,
987 "Background saving terminated with success");
988 server.dirty = 0;
989 server.lastsave = time(NULL);
990 } else if (!bysignal && exitcode != 0) {
991 redisLog(REDIS_WARNING, "Background saving error");
992 } else {
993 redisLog(REDIS_WARNING,
994 "Background saving terminated by signal");
995 rdbRemoveTempFile(server.bgsavechildpid);
996 }
997 server.bgsavechildpid = -1;
998 /* Possibly there are slaves waiting for a BGSAVE in order to be served
999 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
1000 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
1001 }
1002
1003 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
1004 * Handle this. */
1005 void backgroundRewriteDoneHandler(int statloc) {
1006 int exitcode = WEXITSTATUS(statloc);
1007 int bysignal = WIFSIGNALED(statloc);
1008
1009 if (!bysignal && exitcode == 0) {
1010 int fd;
1011 char tmpfile[256];
1012
1013 redisLog(REDIS_NOTICE,
1014 "Background append only file rewriting terminated with success");
1015 /* Now it's time to flush the differences accumulated by the parent */
1016 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
1017 fd = open(tmpfile,O_WRONLY|O_APPEND);
1018 if (fd == -1) {
1019 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
1020 goto cleanup;
1021 }
1022 /* Flush our data... */
1023 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
1024 (signed) sdslen(server.bgrewritebuf)) {
1025 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
1026 close(fd);
1027 goto cleanup;
1028 }
1029 redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
1030 /* Now our work is to rename the temp file into the stable file. And
1031 * switch the file descriptor used by the server for append only. */
1032 if (rename(tmpfile,server.appendfilename) == -1) {
1033 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
1034 close(fd);
1035 goto cleanup;
1036 }
1037 /* Mission completed... almost */
1038 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
1039 if (server.appendfd != -1) {
1040 /* If append only is actually enabled... */
1041 close(server.appendfd);
1042 server.appendfd = fd;
1043 fsync(fd);
1044 server.appendseldb = -1; /* Make sure it will issue SELECT */
1045 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
1046 } else {
1047 /* If append only is disabled we just generate a dump in this
1048 * format. Why not? */
1049 close(fd);
1050 }
1051 } else if (!bysignal && exitcode != 0) {
1052 redisLog(REDIS_WARNING, "Background append only file rewriting error");
1053 } else {
1054 redisLog(REDIS_WARNING,
1055 "Background append only file rewriting terminated by signal");
1056 }
1057 cleanup:
1058 sdsfree(server.bgrewritebuf);
1059 server.bgrewritebuf = sdsempty();
1060 aofRemoveTempFile(server.bgrewritechildpid);
1061 server.bgrewritechildpid = -1;
1062 }
1063
1064 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
1065 int j, loops = server.cronloops++;
1066 REDIS_NOTUSED(eventLoop);
1067 REDIS_NOTUSED(id);
1068 REDIS_NOTUSED(clientData);
1069
1070 /* We take a cached value of the unix time in the global state because
1071 * with virtual memory and aging there is to store the current time
1072 * in objects at every object access, and accuracy is not needed.
1073 * To access a global var is faster than calling time(NULL) */
1074 server.unixtime = time(NULL);
1075
1076 /* Update the global state with the amount of used memory */
1077 server.usedmemory = zmalloc_used_memory();
1078
1079 /* Show some info about non-empty databases */
1080 for (j = 0; j < server.dbnum; j++) {
1081 long long size, used, vkeys;
1082
1083 size = dictSlots(server.db[j].dict);
1084 used = dictSize(server.db[j].dict);
1085 vkeys = dictSize(server.db[j].expires);
1086 if (!(loops % 5) && (used || vkeys)) {
1087 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
1088 /* dictPrintStats(server.dict); */
1089 }
1090 }
1091
1092 /* We don't want to resize the hash tables while a bacground saving
1093 * is in progress: the saving child is created using fork() that is
1094 * implemented with a copy-on-write semantic in most modern systems, so
1095 * if we resize the HT while there is the saving child at work actually
1096 * a lot of memory movements in the parent will cause a lot of pages
1097 * copied. */
1098 if (server.bgsavechildpid == -1) tryResizeHashTables();
1099
1100 /* Show information about connected clients */
1101 if (!(loops % 5)) {
1102 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1103 listLength(server.clients)-listLength(server.slaves),
1104 listLength(server.slaves),
1105 server.usedmemory,
1106 dictSize(server.sharingpool));
1107 }
1108
1109 /* Close connections of timedout clients */
1110 if ((server.maxidletime && !(loops % 10)) || server.blockedclients)
1111 closeTimedoutClients();
1112
1113 /* Check if a background saving or AOF rewrite in progress terminated */
1114 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
1115 int statloc;
1116 pid_t pid;
1117
1118 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1119 if (pid == server.bgsavechildpid) {
1120 backgroundSaveDoneHandler(statloc);
1121 } else {
1122 backgroundRewriteDoneHandler(statloc);
1123 }
1124 }
1125 } else {
1126 /* If there is not a background saving in progress check if
1127 * we have to save now */
1128 time_t now = time(NULL);
1129 for (j = 0; j < server.saveparamslen; j++) {
1130 struct saveparam *sp = server.saveparams+j;
1131
1132 if (server.dirty >= sp->changes &&
1133 now-server.lastsave > sp->seconds) {
1134 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1135 sp->changes, sp->seconds);
1136 rdbSaveBackground(server.dbfilename);
1137 break;
1138 }
1139 }
1140 }
1141
1142 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1143 * will use few CPU cycles if there are few expiring keys, otherwise
1144 * it will get more aggressive to avoid that too much memory is used by
1145 * keys that can be removed from the keyspace. */
1146 for (j = 0; j < server.dbnum; j++) {
1147 int expired;
1148 redisDb *db = server.db+j;
1149
1150 /* Continue to expire if at the end of the cycle more than 25%
1151 * of the keys were expired. */
1152 do {
1153 int num = dictSize(db->expires);
1154 time_t now = time(NULL);
1155
1156 expired = 0;
1157 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1158 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1159 while (num--) {
1160 dictEntry *de;
1161 time_t t;
1162
1163 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1164 t = (time_t) dictGetEntryVal(de);
1165 if (now > t) {
1166 deleteKey(db,dictGetEntryKey(de));
1167 expired++;
1168 }
1169 }
1170 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
1171 }
1172
1173 /* Check if we should connect to a MASTER */
1174 if (server.replstate == REDIS_REPL_CONNECT) {
1175 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1176 if (syncWithMaster() == REDIS_OK) {
1177 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1178 }
1179 }
1180 return 1000;
1181 }
1182
1183 static void createSharedObjects(void) {
1184 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1185 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1186 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
1187 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1188 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1189 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1190 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1191 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1192 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1193 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1194 shared.queued = createObject(REDIS_STRING,sdsnew("+QUEUED\r\n"));
1195 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1196 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1197 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1198 "-ERR no such key\r\n"));
1199 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1200 "-ERR syntax error\r\n"));
1201 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1202 "-ERR source and destination objects are the same\r\n"));
1203 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1204 "-ERR index out of range\r\n"));
1205 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1206 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1207 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1208 shared.select0 = createStringObject("select 0\r\n",10);
1209 shared.select1 = createStringObject("select 1\r\n",10);
1210 shared.select2 = createStringObject("select 2\r\n",10);
1211 shared.select3 = createStringObject("select 3\r\n",10);
1212 shared.select4 = createStringObject("select 4\r\n",10);
1213 shared.select5 = createStringObject("select 5\r\n",10);
1214 shared.select6 = createStringObject("select 6\r\n",10);
1215 shared.select7 = createStringObject("select 7\r\n",10);
1216 shared.select8 = createStringObject("select 8\r\n",10);
1217 shared.select9 = createStringObject("select 9\r\n",10);
1218 }
1219
1220 static void appendServerSaveParams(time_t seconds, int changes) {
1221 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1222 server.saveparams[server.saveparamslen].seconds = seconds;
1223 server.saveparams[server.saveparamslen].changes = changes;
1224 server.saveparamslen++;
1225 }
1226
1227 static void resetServerSaveParams() {
1228 zfree(server.saveparams);
1229 server.saveparams = NULL;
1230 server.saveparamslen = 0;
1231 }
1232
1233 static void initServerConfig() {
1234 server.dbnum = REDIS_DEFAULT_DBNUM;
1235 server.port = REDIS_SERVERPORT;
1236 server.verbosity = REDIS_DEBUG;
1237 server.maxidletime = REDIS_MAXIDLETIME;
1238 server.saveparams = NULL;
1239 server.logfile = NULL; /* NULL = log on standard output */
1240 server.bindaddr = NULL;
1241 server.glueoutputbuf = 1;
1242 server.daemonize = 0;
1243 server.appendonly = 0;
1244 server.appendfsync = APPENDFSYNC_ALWAYS;
1245 server.lastfsync = time(NULL);
1246 server.appendfd = -1;
1247 server.appendseldb = -1; /* Make sure the first time will not match */
1248 server.pidfile = "/var/run/redis.pid";
1249 server.dbfilename = "dump.rdb";
1250 server.appendfilename = "appendonly.aof";
1251 server.requirepass = NULL;
1252 server.shareobjects = 0;
1253 server.rdbcompression = 1;
1254 server.sharingpoolsize = 1024;
1255 server.maxclients = 0;
1256 server.blockedclients = 0;
1257 server.maxmemory = 0;
1258 server.vm_enabled = 0;
1259 server.vm_page_size = 256; /* 256 bytes per page */
1260 server.vm_pages = 1024*1024*100; /* 104 millions of pages */
1261 server.vm_max_memory = 1024LL*1024*1024*1; /* 1 GB of RAM */
1262
1263 resetServerSaveParams();
1264
1265 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1266 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1267 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1268 /* Replication related */
1269 server.isslave = 0;
1270 server.masterauth = NULL;
1271 server.masterhost = NULL;
1272 server.masterport = 6379;
1273 server.master = NULL;
1274 server.replstate = REDIS_REPL_NONE;
1275
1276 /* Double constants initialization */
1277 R_Zero = 0.0;
1278 R_PosInf = 1.0/R_Zero;
1279 R_NegInf = -1.0/R_Zero;
1280 R_Nan = R_Zero/R_Zero;
1281 }
1282
1283 static void initServer() {
1284 int j;
1285
1286 signal(SIGHUP, SIG_IGN);
1287 signal(SIGPIPE, SIG_IGN);
1288 setupSigSegvAction();
1289
1290 server.clients = listCreate();
1291 server.slaves = listCreate();
1292 server.monitors = listCreate();
1293 server.objfreelist = listCreate();
1294 createSharedObjects();
1295 server.el = aeCreateEventLoop();
1296 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1297 server.sharingpool = dictCreate(&setDictType,NULL);
1298 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1299 if (server.fd == -1) {
1300 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1301 exit(1);
1302 }
1303 for (j = 0; j < server.dbnum; j++) {
1304 server.db[j].dict = dictCreate(&hashDictType,NULL);
1305 server.db[j].expires = dictCreate(&setDictType,NULL);
1306 server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL);
1307 server.db[j].id = j;
1308 }
1309 server.cronloops = 0;
1310 server.bgsavechildpid = -1;
1311 server.bgrewritechildpid = -1;
1312 server.bgrewritebuf = sdsempty();
1313 server.lastsave = time(NULL);
1314 server.dirty = 0;
1315 server.usedmemory = 0;
1316 server.stat_numcommands = 0;
1317 server.stat_numconnections = 0;
1318 server.stat_starttime = time(NULL);
1319 server.unixtime = time(NULL);
1320 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1321
1322 if (server.appendonly) {
1323 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1324 if (server.appendfd == -1) {
1325 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1326 strerror(errno));
1327 exit(1);
1328 }
1329 }
1330
1331 if (server.vm_enabled) vmInit();
1332 }
1333
1334 /* Empty the whole database */
1335 static long long emptyDb() {
1336 int j;
1337 long long removed = 0;
1338
1339 for (j = 0; j < server.dbnum; j++) {
1340 removed += dictSize(server.db[j].dict);
1341 dictEmpty(server.db[j].dict);
1342 dictEmpty(server.db[j].expires);
1343 }
1344 return removed;
1345 }
1346
/* Convert a "yes" / "no" string (compared case insensitively) into an
 * integer: 1 for "yes", 0 for "no", -1 for anything else. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1352
1353 /* I agree, this is a very rudimental way to load a configuration...
1354 will improve later if the config gets more complex */
1355 static void loadServerConfig(char *filename) {
1356 FILE *fp;
1357 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1358 int linenum = 0;
1359 sds line = NULL;
1360
1361 if (filename[0] == '-' && filename[1] == '\0')
1362 fp = stdin;
1363 else {
1364 if ((fp = fopen(filename,"r")) == NULL) {
1365 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1366 exit(1);
1367 }
1368 }
1369
1370 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1371 sds *argv;
1372 int argc, j;
1373
1374 linenum++;
1375 line = sdsnew(buf);
1376 line = sdstrim(line," \t\r\n");
1377
1378 /* Skip comments and blank lines*/
1379 if (line[0] == '#' || line[0] == '\0') {
1380 sdsfree(line);
1381 continue;
1382 }
1383
1384 /* Split into arguments */
1385 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1386 sdstolower(argv[0]);
1387
1388 /* Execute config directives */
1389 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1390 server.maxidletime = atoi(argv[1]);
1391 if (server.maxidletime < 0) {
1392 err = "Invalid timeout value"; goto loaderr;
1393 }
1394 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1395 server.port = atoi(argv[1]);
1396 if (server.port < 1 || server.port > 65535) {
1397 err = "Invalid port"; goto loaderr;
1398 }
1399 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1400 server.bindaddr = zstrdup(argv[1]);
1401 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1402 int seconds = atoi(argv[1]);
1403 int changes = atoi(argv[2]);
1404 if (seconds < 1 || changes < 0) {
1405 err = "Invalid save parameters"; goto loaderr;
1406 }
1407 appendServerSaveParams(seconds,changes);
1408 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1409 if (chdir(argv[1]) == -1) {
1410 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1411 argv[1], strerror(errno));
1412 exit(1);
1413 }
1414 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1415 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1416 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1417 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1418 else {
1419 err = "Invalid log level. Must be one of debug, notice, warning";
1420 goto loaderr;
1421 }
1422 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1423 FILE *logfp;
1424
1425 server.logfile = zstrdup(argv[1]);
1426 if (!strcasecmp(server.logfile,"stdout")) {
1427 zfree(server.logfile);
1428 server.logfile = NULL;
1429 }
1430 if (server.logfile) {
1431 /* Test if we are able to open the file. The server will not
1432 * be able to abort just for this problem later... */
1433 logfp = fopen(server.logfile,"a");
1434 if (logfp == NULL) {
1435 err = sdscatprintf(sdsempty(),
1436 "Can't open the log file: %s", strerror(errno));
1437 goto loaderr;
1438 }
1439 fclose(logfp);
1440 }
1441 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1442 server.dbnum = atoi(argv[1]);
1443 if (server.dbnum < 1) {
1444 err = "Invalid number of databases"; goto loaderr;
1445 }
1446 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1447 server.maxclients = atoi(argv[1]);
1448 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1449 server.maxmemory = strtoll(argv[1], NULL, 10);
1450 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1451 server.masterhost = sdsnew(argv[1]);
1452 server.masterport = atoi(argv[2]);
1453 server.replstate = REDIS_REPL_CONNECT;
1454 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1455 server.masterauth = zstrdup(argv[1]);
1456 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1457 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1458 err = "argument must be 'yes' or 'no'"; goto loaderr;
1459 }
1460 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1461 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1462 err = "argument must be 'yes' or 'no'"; goto loaderr;
1463 }
1464 } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) {
1465 if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
1466 err = "argument must be 'yes' or 'no'"; goto loaderr;
1467 }
1468 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1469 server.sharingpoolsize = atoi(argv[1]);
1470 if (server.sharingpoolsize < 1) {
1471 err = "invalid object sharing pool size"; goto loaderr;
1472 }
1473 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1474 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1475 err = "argument must be 'yes' or 'no'"; goto loaderr;
1476 }
1477 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1478 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1479 err = "argument must be 'yes' or 'no'"; goto loaderr;
1480 }
1481 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1482 if (!strcasecmp(argv[1],"no")) {
1483 server.appendfsync = APPENDFSYNC_NO;
1484 } else if (!strcasecmp(argv[1],"always")) {
1485 server.appendfsync = APPENDFSYNC_ALWAYS;
1486 } else if (!strcasecmp(argv[1],"everysec")) {
1487 server.appendfsync = APPENDFSYNC_EVERYSEC;
1488 } else {
1489 err = "argument must be 'no', 'always' or 'everysec'";
1490 goto loaderr;
1491 }
1492 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1493 server.requirepass = zstrdup(argv[1]);
1494 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1495 server.pidfile = zstrdup(argv[1]);
1496 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1497 server.dbfilename = zstrdup(argv[1]);
1498 } else if (!strcasecmp(argv[0],"vm-enabled") && argc == 2) {
1499 if ((server.vm_enabled = yesnotoi(argv[1])) == -1) {
1500 err = "argument must be 'yes' or 'no'"; goto loaderr;
1501 }
1502 } else {
1503 err = "Bad directive or wrong number of arguments"; goto loaderr;
1504 }
1505 for (j = 0; j < argc; j++)
1506 sdsfree(argv[j]);
1507 zfree(argv);
1508 sdsfree(line);
1509 }
1510 if (fp != stdin) fclose(fp);
1511 return;
1512
1513 loaderr:
1514 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1515 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1516 fprintf(stderr, ">>> '%s'\n", line);
1517 fprintf(stderr, "%s\n", err);
1518 exit(1);
1519 }
1520
1521 static void freeClientArgv(redisClient *c) {
1522 int j;
1523
1524 for (j = 0; j < c->argc; j++)
1525 decrRefCount(c->argv[j]);
1526 for (j = 0; j < c->mbargc; j++)
1527 decrRefCount(c->mbargv[j]);
1528 c->argc = 0;
1529 c->mbargc = 0;
1530 }
1531
1532 static void freeClient(redisClient *c) {
1533 listNode *ln;
1534
1535 /* Note that if the client we are freeing is blocked into a blocking
1536 * call, we have to set querybuf to NULL *before* to call unblockClient()
1537 * to avoid processInputBuffer() will get called. Also it is important
1538 * to remove the file events after this, because this call adds
1539 * the READABLE event. */
1540 sdsfree(c->querybuf);
1541 c->querybuf = NULL;
1542 if (c->flags & REDIS_BLOCKED)
1543 unblockClient(c);
1544
1545 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1546 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1547 listRelease(c->reply);
1548 freeClientArgv(c);
1549 close(c->fd);
1550 ln = listSearchKey(server.clients,c);
1551 redisAssert(ln != NULL);
1552 listDelNode(server.clients,ln);
1553 if (c->flags & REDIS_SLAVE) {
1554 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1555 close(c->repldbfd);
1556 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1557 ln = listSearchKey(l,c);
1558 redisAssert(ln != NULL);
1559 listDelNode(l,ln);
1560 }
1561 if (c->flags & REDIS_MASTER) {
1562 server.master = NULL;
1563 server.replstate = REDIS_REPL_CONNECT;
1564 }
1565 zfree(c->argv);
1566 zfree(c->mbargv);
1567 freeClientMultiState(c);
1568 zfree(c);
1569 }
1570
1571 #define GLUEREPLY_UP_TO (1024)
1572 static void glueReplyBuffersIfNeeded(redisClient *c) {
1573 int copylen = 0;
1574 char buf[GLUEREPLY_UP_TO];
1575 listNode *ln;
1576 robj *o;
1577
1578 listRewind(c->reply);
1579 while((ln = listYield(c->reply))) {
1580 int objlen;
1581
1582 o = ln->value;
1583 objlen = sdslen(o->ptr);
1584 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1585 memcpy(buf+copylen,o->ptr,objlen);
1586 copylen += objlen;
1587 listDelNode(c->reply,ln);
1588 } else {
1589 if (copylen == 0) return;
1590 break;
1591 }
1592 }
1593 /* Now the output buffer is empty, add the new single element */
1594 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1595 listAddNodeHead(c->reply,o);
1596 }
1597
1598 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1599 redisClient *c = privdata;
1600 int nwritten = 0, totwritten = 0, objlen;
1601 robj *o;
1602 REDIS_NOTUSED(el);
1603 REDIS_NOTUSED(mask);
1604
1605 /* Use writev() if we have enough buffers to send */
1606 if (!server.glueoutputbuf &&
1607 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1608 !(c->flags & REDIS_MASTER))
1609 {
1610 sendReplyToClientWritev(el, fd, privdata, mask);
1611 return;
1612 }
1613
1614 while(listLength(c->reply)) {
1615 if (server.glueoutputbuf && listLength(c->reply) > 1)
1616 glueReplyBuffersIfNeeded(c);
1617
1618 o = listNodeValue(listFirst(c->reply));
1619 objlen = sdslen(o->ptr);
1620
1621 if (objlen == 0) {
1622 listDelNode(c->reply,listFirst(c->reply));
1623 continue;
1624 }
1625
1626 if (c->flags & REDIS_MASTER) {
1627 /* Don't reply to a master */
1628 nwritten = objlen - c->sentlen;
1629 } else {
1630 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1631 if (nwritten <= 0) break;
1632 }
1633 c->sentlen += nwritten;
1634 totwritten += nwritten;
1635 /* If we fully sent the object on head go to the next one */
1636 if (c->sentlen == objlen) {
1637 listDelNode(c->reply,listFirst(c->reply));
1638 c->sentlen = 0;
1639 }
1640 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1641 * bytes, in a single threaded server it's a good idea to serve
1642 * other clients as well, even if a very large request comes from
1643 * super fast link that is always able to accept data (in real world
1644 * scenario think about 'KEYS *' against the loopback interfae) */
1645 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1646 }
1647 if (nwritten == -1) {
1648 if (errno == EAGAIN) {
1649 nwritten = 0;
1650 } else {
1651 redisLog(REDIS_DEBUG,
1652 "Error writing to client: %s", strerror(errno));
1653 freeClient(c);
1654 return;
1655 }
1656 }
1657 if (totwritten > 0) c->lastinteraction = time(NULL);
1658 if (listLength(c->reply) == 0) {
1659 c->sentlen = 0;
1660 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1661 }
1662 }
1663
1664 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1665 {
1666 redisClient *c = privdata;
1667 int nwritten = 0, totwritten = 0, objlen, willwrite;
1668 robj *o;
1669 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1670 int offset, ion = 0;
1671 REDIS_NOTUSED(el);
1672 REDIS_NOTUSED(mask);
1673
1674 listNode *node;
1675 while (listLength(c->reply)) {
1676 offset = c->sentlen;
1677 ion = 0;
1678 willwrite = 0;
1679
1680 /* fill-in the iov[] array */
1681 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1682 o = listNodeValue(node);
1683 objlen = sdslen(o->ptr);
1684
1685 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1686 break;
1687
1688 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1689 break; /* no more iovecs */
1690
1691 iov[ion].iov_base = ((char*)o->ptr) + offset;
1692 iov[ion].iov_len = objlen - offset;
1693 willwrite += objlen - offset;
1694 offset = 0; /* just for the first item */
1695 ion++;
1696 }
1697
1698 if(willwrite == 0)
1699 break;
1700
1701 /* write all collected blocks at once */
1702 if((nwritten = writev(fd, iov, ion)) < 0) {
1703 if (errno != EAGAIN) {
1704 redisLog(REDIS_DEBUG,
1705 "Error writing to client: %s", strerror(errno));
1706 freeClient(c);
1707 return;
1708 }
1709 break;
1710 }
1711
1712 totwritten += nwritten;
1713 offset = c->sentlen;
1714
1715 /* remove written robjs from c->reply */
1716 while (nwritten && listLength(c->reply)) {
1717 o = listNodeValue(listFirst(c->reply));
1718 objlen = sdslen(o->ptr);
1719
1720 if(nwritten >= objlen - offset) {
1721 listDelNode(c->reply, listFirst(c->reply));
1722 nwritten -= objlen - offset;
1723 c->sentlen = 0;
1724 } else {
1725 /* partial write */
1726 c->sentlen += nwritten;
1727 break;
1728 }
1729 offset = 0;
1730 }
1731 }
1732
1733 if (totwritten > 0)
1734 c->lastinteraction = time(NULL);
1735
1736 if (listLength(c->reply) == 0) {
1737 c->sentlen = 0;
1738 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1739 }
1740 }
1741
1742 static struct redisCommand *lookupCommand(char *name) {
1743 int j = 0;
1744 while(cmdTable[j].name != NULL) {
1745 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1746 j++;
1747 }
1748 return NULL;
1749 }
1750
1751 /* resetClient prepare the client to process the next command */
1752 static void resetClient(redisClient *c) {
1753 freeClientArgv(c);
1754 c->bulklen = -1;
1755 c->multibulk = 0;
1756 }
1757
1758 /* Call() is the core of Redis execution of a command */
1759 static void call(redisClient *c, struct redisCommand *cmd) {
1760 long long dirty;
1761
1762 dirty = server.dirty;
1763 cmd->proc(c);
1764 if (server.appendonly && server.dirty-dirty)
1765 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1766 if (server.dirty-dirty && listLength(server.slaves))
1767 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1768 if (listLength(server.monitors))
1769 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1770 server.stat_numcommands++;
1771 }
1772
/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() executes the command or prepares the
 * server for a bulk read from the client.
 *
 * If 1 is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if 0 is returned the client was destroyed (i.e. after QUIT). */
1781 static int processCommand(redisClient *c) {
1782 struct redisCommand *cmd;
1783
1784 /* Free some memory if needed (maxmemory setting) */
1785 if (server.maxmemory) freeMemoryIfNeeded();
1786
1787 /* Handle the multi bulk command type. This is an alternative protocol
1788 * supported by Redis in order to receive commands that are composed of
1789 * multiple binary-safe "bulk" arguments. The latency of processing is
1790 * a bit higher but this allows things like multi-sets, so if this
1791 * protocol is used only for MSET and similar commands this is a big win. */
1792 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1793 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1794 if (c->multibulk <= 0) {
1795 resetClient(c);
1796 return 1;
1797 } else {
1798 decrRefCount(c->argv[c->argc-1]);
1799 c->argc--;
1800 return 1;
1801 }
1802 } else if (c->multibulk) {
1803 if (c->bulklen == -1) {
1804 if (((char*)c->argv[0]->ptr)[0] != '$') {
1805 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1806 resetClient(c);
1807 return 1;
1808 } else {
1809 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1810 decrRefCount(c->argv[0]);
1811 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1812 c->argc--;
1813 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1814 resetClient(c);
1815 return 1;
1816 }
1817 c->argc--;
1818 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1819 return 1;
1820 }
1821 } else {
1822 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1823 c->mbargv[c->mbargc] = c->argv[0];
1824 c->mbargc++;
1825 c->argc--;
1826 c->multibulk--;
1827 if (c->multibulk == 0) {
1828 robj **auxargv;
1829 int auxargc;
1830
1831 /* Here we need to swap the multi-bulk argc/argv with the
1832 * normal argc/argv of the client structure. */
1833 auxargv = c->argv;
1834 c->argv = c->mbargv;
1835 c->mbargv = auxargv;
1836
1837 auxargc = c->argc;
1838 c->argc = c->mbargc;
1839 c->mbargc = auxargc;
1840
1841 /* We need to set bulklen to something different than -1
1842 * in order for the code below to process the command without
1843 * to try to read the last argument of a bulk command as
1844 * a special argument. */
1845 c->bulklen = 0;
1846 /* continue below and process the command */
1847 } else {
1848 c->bulklen = -1;
1849 return 1;
1850 }
1851 }
1852 }
1853 /* -- end of multi bulk commands processing -- */
1854
1855 /* The QUIT command is handled as a special case. Normal command
1856 * procs are unable to close the client connection safely */
1857 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1858 freeClient(c);
1859 return 0;
1860 }
1861 cmd = lookupCommand(c->argv[0]->ptr);
1862 if (!cmd) {
1863 addReplySds(c,
1864 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1865 (char*)c->argv[0]->ptr));
1866 resetClient(c);
1867 return 1;
1868 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1869 (c->argc < -cmd->arity)) {
1870 addReplySds(c,
1871 sdscatprintf(sdsempty(),
1872 "-ERR wrong number of arguments for '%s' command\r\n",
1873 cmd->name));
1874 resetClient(c);
1875 return 1;
1876 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1877 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1878 resetClient(c);
1879 return 1;
1880 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1881 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1882
1883 decrRefCount(c->argv[c->argc-1]);
1884 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1885 c->argc--;
1886 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1887 resetClient(c);
1888 return 1;
1889 }
1890 c->argc--;
1891 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1892 /* It is possible that the bulk read is already in the
1893 * buffer. Check this condition and handle it accordingly.
1894 * This is just a fast path, alternative to call processInputBuffer().
1895 * It's a good idea since the code is small and this condition
1896 * happens most of the times. */
1897 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1898 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1899 c->argc++;
1900 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1901 } else {
1902 return 1;
1903 }
1904 }
1905 /* Let's try to share objects on the command arguments vector */
1906 if (server.shareobjects) {
1907 int j;
1908 for(j = 1; j < c->argc; j++)
1909 c->argv[j] = tryObjectSharing(c->argv[j]);
1910 }
1911 /* Let's try to encode the bulk object to save space. */
1912 if (cmd->flags & REDIS_CMD_BULK)
1913 tryObjectEncoding(c->argv[c->argc-1]);
1914
1915 /* Check if the user is authenticated */
1916 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1917 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1918 resetClient(c);
1919 return 1;
1920 }
1921
1922 /* Exec the command */
1923 if (c->flags & REDIS_MULTI && cmd->proc != execCommand) {
1924 queueMultiCommand(c,cmd);
1925 addReply(c,shared.queued);
1926 } else {
1927 call(c,cmd);
1928 }
1929
1930 /* Prepare the client for the next command */
1931 if (c->flags & REDIS_CLOSE) {
1932 freeClient(c);
1933 return 0;
1934 }
1935 resetClient(c);
1936 return 1;
1937 }
1938
1939 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1940 listNode *ln;
1941 int outc = 0, j;
1942 robj **outv;
1943 /* (args*2)+1 is enough room for args, spaces, newlines */
1944 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1945
1946 if (argc <= REDIS_STATIC_ARGS) {
1947 outv = static_outv;
1948 } else {
1949 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1950 }
1951
1952 for (j = 0; j < argc; j++) {
1953 if (j != 0) outv[outc++] = shared.space;
1954 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1955 robj *lenobj;
1956
1957 lenobj = createObject(REDIS_STRING,
1958 sdscatprintf(sdsempty(),"%lu\r\n",
1959 (unsigned long) stringObjectLen(argv[j])));
1960 lenobj->refcount = 0;
1961 outv[outc++] = lenobj;
1962 }
1963 outv[outc++] = argv[j];
1964 }
1965 outv[outc++] = shared.crlf;
1966
1967 /* Increment all the refcounts at start and decrement at end in order to
1968 * be sure to free objects if there is no slave in a replication state
1969 * able to be feed with commands */
1970 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1971 listRewind(slaves);
1972 while((ln = listYield(slaves))) {
1973 redisClient *slave = ln->value;
1974
1975 /* Don't feed slaves that are still waiting for BGSAVE to start */
1976 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1977
1978 /* Feed all the other slaves, MONITORs and so on */
1979 if (slave->slaveseldb != dictid) {
1980 robj *selectcmd;
1981
1982 switch(dictid) {
1983 case 0: selectcmd = shared.select0; break;
1984 case 1: selectcmd = shared.select1; break;
1985 case 2: selectcmd = shared.select2; break;
1986 case 3: selectcmd = shared.select3; break;
1987 case 4: selectcmd = shared.select4; break;
1988 case 5: selectcmd = shared.select5; break;
1989 case 6: selectcmd = shared.select6; break;
1990 case 7: selectcmd = shared.select7; break;
1991 case 8: selectcmd = shared.select8; break;
1992 case 9: selectcmd = shared.select9; break;
1993 default:
1994 selectcmd = createObject(REDIS_STRING,
1995 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1996 selectcmd->refcount = 0;
1997 break;
1998 }
1999 addReply(slave,selectcmd);
2000 slave->slaveseldb = dictid;
2001 }
2002 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
2003 }
2004 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
2005 if (outv != static_outv) zfree(outv);
2006 }
2007
2008 static void processInputBuffer(redisClient *c) {
2009 again:
2010 /* Before to process the input buffer, make sure the client is not
2011 * waitig for a blocking operation such as BLPOP. Note that the first
2012 * iteration the client is never blocked, otherwise the processInputBuffer
2013 * would not be called at all, but after the execution of the first commands
2014 * in the input buffer the client may be blocked, and the "goto again"
2015 * will try to reiterate. The following line will make it return asap. */
2016 if (c->flags & REDIS_BLOCKED) return;
2017 if (c->bulklen == -1) {
2018 /* Read the first line of the query */
2019 char *p = strchr(c->querybuf,'\n');
2020 size_t querylen;
2021
2022 if (p) {
2023 sds query, *argv;
2024 int argc, j;
2025
2026 query = c->querybuf;
2027 c->querybuf = sdsempty();
2028 querylen = 1+(p-(query));
2029 if (sdslen(query) > querylen) {
2030 /* leave data after the first line of the query in the buffer */
2031 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
2032 }
2033 *p = '\0'; /* remove "\n" */
2034 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
2035 sdsupdatelen(query);
2036
2037 /* Now we can split the query in arguments */
2038 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
2039 sdsfree(query);
2040
2041 if (c->argv) zfree(c->argv);
2042 c->argv = zmalloc(sizeof(robj*)*argc);
2043
2044 for (j = 0; j < argc; j++) {
2045 if (sdslen(argv[j])) {
2046 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
2047 c->argc++;
2048 } else {
2049 sdsfree(argv[j]);
2050 }
2051 }
2052 zfree(argv);
2053 if (c->argc) {
2054 /* Execute the command. If the client is still valid
2055 * after processCommand() return and there is something
2056 * on the query buffer try to process the next command. */
2057 if (processCommand(c) && sdslen(c->querybuf)) goto again;
2058 } else {
2059 /* Nothing to process, argc == 0. Just process the query
2060 * buffer if it's not empty or return to the caller */
2061 if (sdslen(c->querybuf)) goto again;
2062 }
2063 return;
2064 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
2065 redisLog(REDIS_DEBUG, "Client protocol error");
2066 freeClient(c);
2067 return;
2068 }
2069 } else {
2070 /* Bulk read handling. Note that if we are at this point
2071 the client already sent a command terminated with a newline,
2072 we are reading the bulk data that is actually the last
2073 argument of the command. */
2074 int qbl = sdslen(c->querybuf);
2075
2076 if (c->bulklen <= qbl) {
2077 /* Copy everything but the final CRLF as final argument */
2078 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
2079 c->argc++;
2080 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
2081 /* Process the command. If the client is still valid after
2082 * the processing and there is more data in the buffer
2083 * try to parse it. */
2084 if (processCommand(c) && sdslen(c->querybuf)) goto again;
2085 return;
2086 }
2087 }
2088 }
2089
2090 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
2091 redisClient *c = (redisClient*) privdata;
2092 char buf[REDIS_IOBUF_LEN];
2093 int nread;
2094 REDIS_NOTUSED(el);
2095 REDIS_NOTUSED(mask);
2096
2097 nread = read(fd, buf, REDIS_IOBUF_LEN);
2098 if (nread == -1) {
2099 if (errno == EAGAIN) {
2100 nread = 0;
2101 } else {
2102 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
2103 freeClient(c);
2104 return;
2105 }
2106 } else if (nread == 0) {
2107 redisLog(REDIS_DEBUG, "Client closed connection");
2108 freeClient(c);
2109 return;
2110 }
2111 if (nread) {
2112 c->querybuf = sdscatlen(c->querybuf, buf, nread);
2113 c->lastinteraction = time(NULL);
2114 } else {
2115 return;
2116 }
2117 processInputBuffer(c);
2118 }
2119
2120 static int selectDb(redisClient *c, int id) {
2121 if (id < 0 || id >= server.dbnum)
2122 return REDIS_ERR;
2123 c->db = &server.db[id];
2124 return REDIS_OK;
2125 }
2126
2127 static void *dupClientReplyValue(void *o) {
2128 incrRefCount((robj*)o);
2129 return 0;
2130 }
2131
2132 static redisClient *createClient(int fd) {
2133 redisClient *c = zmalloc(sizeof(*c));
2134
2135 anetNonBlock(NULL,fd);
2136 anetTcpNoDelay(NULL,fd);
2137 if (!c) return NULL;
2138 selectDb(c,0);
2139 c->fd = fd;
2140 c->querybuf = sdsempty();
2141 c->argc = 0;
2142 c->argv = NULL;
2143 c->bulklen = -1;
2144 c->multibulk = 0;
2145 c->mbargc = 0;
2146 c->mbargv = NULL;
2147 c->sentlen = 0;
2148 c->flags = 0;
2149 c->lastinteraction = time(NULL);
2150 c->authenticated = 0;
2151 c->replstate = REDIS_REPL_NONE;
2152 c->reply = listCreate();
2153 c->blockingkeys = NULL;
2154 c->blockingkeysnum = 0;
2155 listSetFreeMethod(c->reply,decrRefCount);
2156 listSetDupMethod(c->reply,dupClientReplyValue);
2157 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
2158 readQueryFromClient, c) == AE_ERR) {
2159 freeClient(c);
2160 return NULL;
2161 }
2162 listAddNodeTail(server.clients,c);
2163 initClientMultiState(c);
2164 return c;
2165 }
2166
2167 static void addReply(redisClient *c, robj *obj) {
2168 if (listLength(c->reply) == 0 &&
2169 (c->replstate == REDIS_REPL_NONE ||
2170 c->replstate == REDIS_REPL_ONLINE) &&
2171 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
2172 sendReplyToClient, c) == AE_ERR) return;
2173 listAddNodeTail(c->reply,getDecodedObject(obj));
2174 }
2175
2176 static void addReplySds(redisClient *c, sds s) {
2177 robj *o = createObject(REDIS_STRING,s);
2178 addReply(c,o);
2179 decrRefCount(o);
2180 }
2181
2182 static void addReplyDouble(redisClient *c, double d) {
2183 char buf[128];
2184
2185 snprintf(buf,sizeof(buf),"%.17g",d);
2186 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2187 (unsigned long) strlen(buf),buf));
2188 }
2189
2190 static void addReplyBulkLen(redisClient *c, robj *obj) {
2191 size_t len;
2192
2193 if (obj->encoding == REDIS_ENCODING_RAW) {
2194 len = sdslen(obj->ptr);
2195 } else {
2196 long n = (long)obj->ptr;
2197
2198 /* Compute how many bytes will take this integer as a radix 10 string */
2199 len = 1;
2200 if (n < 0) {
2201 len++;
2202 n = -n;
2203 }
2204 while((n = n/10) != 0) {
2205 len++;
2206 }
2207 }
2208 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len));
2209 }
2210
2211 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2212 int cport, cfd;
2213 char cip[128];
2214 redisClient *c;
2215 REDIS_NOTUSED(el);
2216 REDIS_NOTUSED(mask);
2217 REDIS_NOTUSED(privdata);
2218
2219 cfd = anetAccept(server.neterr, fd, cip, &cport);
2220 if (cfd == AE_ERR) {
2221 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2222 return;
2223 }
2224 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
2225 if ((c = createClient(cfd)) == NULL) {
2226 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2227 close(cfd); /* May be already closed, just ingore errors */
2228 return;
2229 }
2230 /* If maxclient directive is set and this is one client more... close the
2231 * connection. Note that we create the client instead to check before
2232 * for this condition, since now the socket is already set in nonblocking
2233 * mode and we can send an error for free using the Kernel I/O */
2234 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2235 char *err = "-ERR max number of clients reached\r\n";
2236
2237 /* That's a best effort error message, don't check write errors */
2238 if (write(c->fd,err,strlen(err)) == -1) {
2239 /* Nothing to do, Just to avoid the warning... */
2240 }
2241 freeClient(c);
2242 return;
2243 }
2244 server.stat_numconnections++;
2245 }
2246
2247 /* ======================= Redis objects implementation ===================== */
2248
2249 static robj *createObject(int type, void *ptr) {
2250 robj *o;
2251
2252 if (listLength(server.objfreelist)) {
2253 listNode *head = listFirst(server.objfreelist);
2254 o = listNodeValue(head);
2255 listDelNode(server.objfreelist,head);
2256 } else {
2257 if (server.vm_enabled) {
2258 o = zmalloc(sizeof(*o));
2259 } else {
2260 o = zmalloc(sizeof(*o)-sizeof(struct redisObjectVM));
2261 }
2262 }
2263 o->type = type;
2264 o->encoding = REDIS_ENCODING_RAW;
2265 o->ptr = ptr;
2266 o->refcount = 1;
2267 if (server.vm_enabled) {
2268 o->vm.atime = server.unixtime;
2269 o->storage = REDIS_VM_MEMORY;
2270 }
2271 return o;
2272 }
2273
2274 static robj *createStringObject(char *ptr, size_t len) {
2275 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2276 }
2277
2278 static robj *createListObject(void) {
2279 list *l = listCreate();
2280
2281 listSetFreeMethod(l,decrRefCount);
2282 return createObject(REDIS_LIST,l);
2283 }
2284
2285 static robj *createSetObject(void) {
2286 dict *d = dictCreate(&setDictType,NULL);
2287 return createObject(REDIS_SET,d);
2288 }
2289
2290 static robj *createZsetObject(void) {
2291 zset *zs = zmalloc(sizeof(*zs));
2292
2293 zs->dict = dictCreate(&zsetDictType,NULL);
2294 zs->zsl = zslCreate();
2295 return createObject(REDIS_ZSET,zs);
2296 }
2297
2298 static void freeStringObject(robj *o) {
2299 if (o->encoding == REDIS_ENCODING_RAW) {
2300 sdsfree(o->ptr);
2301 }
2302 }
2303
2304 static void freeListObject(robj *o) {
2305 listRelease((list*) o->ptr);
2306 }
2307
2308 static void freeSetObject(robj *o) {
2309 dictRelease((dict*) o->ptr);
2310 }
2311
2312 static void freeZsetObject(robj *o) {
2313 zset *zs = o->ptr;
2314
2315 dictRelease(zs->dict);
2316 zslFree(zs->zsl);
2317 zfree(zs);
2318 }
2319
2320 static void freeHashObject(robj *o) {
2321 dictRelease((dict*) o->ptr);
2322 }
2323
2324 static void incrRefCount(robj *o) {
2325 assert(!server.vm_enabled || o->storage == REDIS_VM_MEMORY);
2326 o->refcount++;
2327 }
2328
2329 static void decrRefCount(void *obj) {
2330 robj *o = obj;
2331
2332 /* REDIS_VM_SWAPPED */
2333 if (server.vm_enabled && o->storage == REDIS_VM_SWAPPED) {
2334 assert(o->refcount == 1);
2335 assert(o->type == REDIS_STRING);
2336 freeStringObject(o);
2337 vmMarkPagesFree(o->vm.page,o->vm.usedpages);
2338 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2339 !listAddNodeHead(server.objfreelist,o))
2340 zfree(o);
2341 return;
2342 }
2343 /* REDIS_VM_MEMORY */
2344 if (--(o->refcount) == 0) {
2345 switch(o->type) {
2346 case REDIS_STRING: freeStringObject(o); break;
2347 case REDIS_LIST: freeListObject(o); break;
2348 case REDIS_SET: freeSetObject(o); break;
2349 case REDIS_ZSET: freeZsetObject(o); break;
2350 case REDIS_HASH: freeHashObject(o); break;
2351 default: redisAssert(0 != 0); break;
2352 }
2353 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2354 !listAddNodeHead(server.objfreelist,o))
2355 zfree(o);
2356 }
2357 }
2358
2359 static robj *lookupKey(redisDb *db, robj *key) {
2360 dictEntry *de = dictFind(db->dict,key);
2361 if (de) {
2362 robj *o = dictGetEntryVal(de);
2363
2364 /* Update the access time of the key for the aging algorithm. */
2365 if (server.vm_enabled) o->vm.atime = server.unixtime;
2366 return o;
2367 } else {
2368 return NULL;
2369 }
2370 }
2371
2372 static robj *lookupKeyRead(redisDb *db, robj *key) {
2373 expireIfNeeded(db,key);
2374 return lookupKey(db,key);
2375 }
2376
2377 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2378 deleteIfVolatile(db,key);
2379 return lookupKey(db,key);
2380 }
2381
2382 static int deleteKey(redisDb *db, robj *key) {
2383 int retval;
2384
2385 /* We need to protect key from destruction: after the first dictDelete()
2386 * it may happen that 'key' is no longer valid if we don't increment
2387 * it's count. This may happen when we get the object reference directly
2388 * from the hash table with dictRandomKey() or dict iterators */
2389 incrRefCount(key);
2390 if (dictSize(db->expires)) dictDelete(db->expires,key);
2391 retval = dictDelete(db->dict,key);
2392 decrRefCount(key);
2393
2394 return retval == DICT_OK;
2395 }
2396
2397 /* Try to share an object against the shared objects pool */
2398 static robj *tryObjectSharing(robj *o) {
2399 struct dictEntry *de;
2400 unsigned long c;
2401
2402 if (o == NULL || server.shareobjects == 0) return o;
2403
2404 redisAssert(o->type == REDIS_STRING);
2405 de = dictFind(server.sharingpool,o);
2406 if (de) {
2407 robj *shared = dictGetEntryKey(de);
2408
2409 c = ((unsigned long) dictGetEntryVal(de))+1;
2410 dictGetEntryVal(de) = (void*) c;
2411 incrRefCount(shared);
2412 decrRefCount(o);
2413 return shared;
2414 } else {
2415 /* Here we are using a stream algorihtm: Every time an object is
2416 * shared we increment its count, everytime there is a miss we
2417 * recrement the counter of a random object. If this object reaches
2418 * zero we remove the object and put the current object instead. */
2419 if (dictSize(server.sharingpool) >=
2420 server.sharingpoolsize) {
2421 de = dictGetRandomKey(server.sharingpool);
2422 redisAssert(de != NULL);
2423 c = ((unsigned long) dictGetEntryVal(de))-1;
2424 dictGetEntryVal(de) = (void*) c;
2425 if (c == 0) {
2426 dictDelete(server.sharingpool,de->key);
2427 }
2428 } else {
2429 c = 0; /* If the pool is empty we want to add this object */
2430 }
2431 if (c == 0) {
2432 int retval;
2433
2434 retval = dictAdd(server.sharingpool,o,(void*)1);
2435 redisAssert(retval == DICT_OK);
2436 incrRefCount(o);
2437 }
2438 return o;
2439 }
2440 }
2441
2442 /* Check if the nul-terminated string 's' can be represented by a long
2443 * (that is, is a number that fits into long without any other space or
2444 * character before or after the digits).
2445 *
2446 * If so, the function returns REDIS_OK and *longval is set to the value
2447 * of the number. Otherwise REDIS_ERR is returned */
2448 static int isStringRepresentableAsLong(sds s, long *longval) {
2449 char buf[32], *endptr;
2450 long value;
2451 int slen;
2452
2453 value = strtol(s, &endptr, 10);
2454 if (endptr[0] != '\0') return REDIS_ERR;
2455 slen = snprintf(buf,32,"%ld",value);
2456
2457 /* If the number converted back into a string is not identical
2458 * then it's not possible to encode the string as integer */
2459 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2460 if (longval) *longval = value;
2461 return REDIS_OK;
2462 }
2463
2464 /* Try to encode a string object in order to save space */
2465 static int tryObjectEncoding(robj *o) {
2466 long value;
2467 sds s = o->ptr;
2468
2469 if (o->encoding != REDIS_ENCODING_RAW)
2470 return REDIS_ERR; /* Already encoded */
2471
2472 /* It's not save to encode shared objects: shared objects can be shared
2473 * everywhere in the "object space" of Redis. Encoded objects can only
2474 * appear as "values" (and not, for instance, as keys) */
2475 if (o->refcount > 1) return REDIS_ERR;
2476
2477 /* Currently we try to encode only strings */
2478 redisAssert(o->type == REDIS_STRING);
2479
2480 /* Check if we can represent this string as a long integer */
2481 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2482
2483 /* Ok, this object can be encoded */
2484 o->encoding = REDIS_ENCODING_INT;
2485 sdsfree(o->ptr);
2486 o->ptr = (void*) value;
2487 return REDIS_OK;
2488 }
2489
2490 /* Get a decoded version of an encoded object (returned as a new object).
2491 * If the object is already raw-encoded just increment the ref count. */
2492 static robj *getDecodedObject(robj *o) {
2493 robj *dec;
2494
2495 if (o->encoding == REDIS_ENCODING_RAW) {
2496 incrRefCount(o);
2497 return o;
2498 }
2499 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2500 char buf[32];
2501
2502 snprintf(buf,32,"%ld",(long)o->ptr);
2503 dec = createStringObject(buf,strlen(buf));
2504 return dec;
2505 } else {
2506 redisAssert(1 != 1);
2507 }
2508 }
2509
2510 /* Compare two string objects via strcmp() or alike.
2511 * Note that the objects may be integer-encoded. In such a case we
2512 * use snprintf() to get a string representation of the numbers on the stack
2513 * and compare the strings, it's much faster than calling getDecodedObject().
2514 *
2515 * Important note: if objects are not integer encoded, but binary-safe strings,
2516 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2517 * binary safe. */
2518 static int compareStringObjects(robj *a, robj *b) {
2519 redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2520 char bufa[128], bufb[128], *astr, *bstr;
2521 int bothsds = 1;
2522
2523 if (a == b) return 0;
2524 if (a->encoding != REDIS_ENCODING_RAW) {
2525 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2526 astr = bufa;
2527 bothsds = 0;
2528 } else {
2529 astr = a->ptr;
2530 }
2531 if (b->encoding != REDIS_ENCODING_RAW) {
2532 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2533 bstr = bufb;
2534 bothsds = 0;
2535 } else {
2536 bstr = b->ptr;
2537 }
2538 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2539 }
2540
2541 static size_t stringObjectLen(robj *o) {
2542 redisAssert(o->type == REDIS_STRING);
2543 if (o->encoding == REDIS_ENCODING_RAW) {
2544 return sdslen(o->ptr);
2545 } else {
2546 char buf[32];
2547
2548 return snprintf(buf,32,"%ld",(long)o->ptr);
2549 }
2550 }
2551
2552 /*============================ RDB saving/loading =========================== */
2553
/* Write the one byte type/opcode 'type' to 'fp'.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
2558
/* Write the time 't' to 'fp' as a 4 byte signed integer in host byte
 * order. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t secs = (int32_t) t;

    return (fwrite(&secs,sizeof(secs),1,fp) == 0) ? -1 : 0;
}
2564
2565 /* check rdbLoadLen() comments for more info */
2566 static int rdbSaveLen(FILE *fp, uint32_t len) {
2567 unsigned char buf[2];
2568
2569 if (len < (1<<6)) {
2570 /* Save a 6 bit len */
2571 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2572 if (fwrite(buf,1,1,fp) == 0) return -1;
2573 } else if (len < (1<<14)) {
2574 /* Save a 14 bit len */
2575 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2576 buf[1] = len&0xFF;
2577 if (fwrite(buf,2,1,fp) == 0) return -1;
2578 } else {
2579 /* Save a 32 bit len */
2580 buf[0] = (REDIS_RDB_32BITLEN<<6);
2581 if (fwrite(buf,1,1,fp) == 0) return -1;
2582 len = htonl(len);
2583 if (fwrite(&len,4,1,fp) == 0) return -1;
2584 }
2585 return 0;
2586 }
2587
2588 /* String objects in the form "2391" "-100" without any space and with a
2589 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2590 * encoded as integers to save space */
2591 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2592 long long value;
2593 char *endptr, buf[32];
2594
2595 /* Check if it's possible to encode this value as a number */
2596 value = strtoll(s, &endptr, 10);
2597 if (endptr[0] != '\0') return 0;
2598 snprintf(buf,32,"%lld",value);
2599
2600 /* If the number converted back into a string is not identical
2601 * then it's not possible to encode the string as integer */
2602 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2603
2604 /* Finally check if it fits in our ranges */
2605 if (value >= -(1<<7) && value <= (1<<7)-1) {
2606 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2607 enc[1] = value&0xFF;
2608 return 2;
2609 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2610 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2611 enc[1] = value&0xFF;
2612 enc[2] = (value>>8)&0xFF;
2613 return 3;
2614 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2615 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2616 enc[1] = value&0xFF;
2617 enc[2] = (value>>8)&0xFF;
2618 enc[3] = (value>>16)&0xFF;
2619 enc[4] = (value>>24)&0xFF;
2620 return 5;
2621 } else {
2622 return 0;
2623 }
2624 }
2625
2626 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2627 unsigned int comprlen, outlen;
2628 unsigned char byte;
2629 void *out;
2630
2631 /* We require at least four bytes compression for this to be worth it */
2632 outlen = sdslen(obj->ptr)-4;
2633 if (outlen <= 0) return 0;
2634 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2635 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2636 if (comprlen == 0) {
2637 zfree(out);
2638 return 0;
2639 }
2640 /* Data compressed! Let's save it on disk */
2641 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2642 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2643 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2644 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2645 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2646 zfree(out);
2647 return comprlen;
2648
2649 writeerr:
2650 zfree(out);
2651 return -1;
2652 }
2653
2654 /* Save a string objet as [len][data] on disk. If the object is a string
2655 * representation of an integer value we try to safe it in a special form */
2656 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2657 size_t len;
2658 int enclen;
2659
2660 len = sdslen(obj->ptr);
2661
2662 /* Try integer encoding */
2663 if (len <= 11) {
2664 unsigned char buf[5];
2665 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2666 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2667 return 0;
2668 }
2669 }
2670
2671 /* Try LZF compression - under 20 bytes it's unable to compress even
2672 * aaaaaaaaaaaaaaaaaa so skip it */
2673 if (server.rdbcompression && len > 20) {
2674 int retval;
2675
2676 retval = rdbSaveLzfStringObject(fp,obj);
2677 if (retval == -1) return -1;
2678 if (retval > 0) return 0;
2679 /* retval == 0 means data can't be compressed, save the old way */
2680 }
2681
2682 /* Store verbatim */
2683 if (rdbSaveLen(fp,len) == -1) return -1;
2684 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2685 return 0;
2686 }
2687
2688 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2689 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2690 int retval;
2691
2692 obj = getDecodedObject(obj);
2693 retval = rdbSaveStringObjectRaw(fp,obj);
2694 decrRefCount(obj);
2695 return retval;
2696 }
2697
/* Save a double value. Doubles are saved as strings prefixed by an
 * unsigned 8 bit integer specifying the length of the representation.
 * This 8 bit integer has special values in order to specify the following
 * conditions, in which case no string follows:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 * Returns 0 on success, -1 on write error.
 */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len = 1; /* the special values take just the prefix byte */

    if (isnan(val)) {
        buf[0] = 253;
    } else if (isfinite(val)) {
        snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
        buf[0] = strlen((char*)buf+1);
        len = buf[0]+1;
    } else {
        buf[0] = (val < 0) ? 255 : 254;
    }
    return (fwrite(buf,len,1,fp) == 0) ? -1 : 0;
}
2724
2725 /* Save a Redis object. */
2726 static int rdbSaveObject(FILE *fp, robj *o) {
2727 if (o->type == REDIS_STRING) {
2728 /* Save a string value */
2729 if (rdbSaveStringObject(fp,o) == -1) return -1;
2730 } else if (o->type == REDIS_LIST) {
2731 /* Save a list value */
2732 list *list = o->ptr;
2733 listNode *ln;
2734
2735 listRewind(list);
2736 if (rdbSaveLen(fp,listLength(list)) == -1) return -1;
2737 while((ln = listYield(list))) {
2738 robj *eleobj = listNodeValue(ln);
2739
2740 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2741 }
2742 } else if (o->type == REDIS_SET) {
2743 /* Save a set value */
2744 dict *set = o->ptr;
2745 dictIterator *di = dictGetIterator(set);
2746 dictEntry *de;
2747
2748 if (rdbSaveLen(fp,dictSize(set)) == -1) return -1;
2749 while((de = dictNext(di)) != NULL) {
2750 robj *eleobj = dictGetEntryKey(de);
2751
2752 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2753 }
2754 dictReleaseIterator(di);
2755 } else if (o->type == REDIS_ZSET) {
2756 /* Save a set value */
2757 zset *zs = o->ptr;
2758 dictIterator *di = dictGetIterator(zs->dict);
2759 dictEntry *de;
2760
2761 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) return -1;
2762 while((de = dictNext(di)) != NULL) {
2763 robj *eleobj = dictGetEntryKey(de);
2764 double *score = dictGetEntryVal(de);
2765
2766 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2767 if (rdbSaveDoubleValue(fp,*score) == -1) return -1;
2768 }
2769 dictReleaseIterator(di);
2770 } else {
2771 redisAssert(0 != 0);
2772 }
2773 return 0;
2774 }
2775
2776 /* Return the length the object will have on disk if saved with
2777 * the rdbSaveObject() function. Currently we use a trick to get
2778 * this length with very little changes to the code. In the future
2779 * we could switch to a faster solution. */
2780 static off_t rdbSavedObjectLen(robj *o) {
2781 static FILE *fp = NULL;
2782
2783 if (fp == NULL) fp = fopen("/dev/null","w");
2784 assert(fp != NULL);
2785
2786 rewind(fp);
2787 assert(rdbSaveObject(fp,o) != 1);
2788 return ftello(fp);
2789 }
2790
2791 /* Return the number of pages required to save this object in the swap file */
2792 static off_t rdbSavedObjectPages(robj *o) {
2793 off_t bytes = rdbSavedObjectLen(o);
2794
2795 return (bytes+(server.vm_page_size-1))/server.vm_page_size;
2796 }
2797
2798 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2799 static int rdbSave(char *filename) {
2800 dictIterator *di = NULL;
2801 dictEntry *de;
2802 FILE *fp;
2803 char tmpfile[256];
2804 int j;
2805 time_t now = time(NULL);
2806
2807 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2808 fp = fopen(tmpfile,"w");
2809 if (!fp) {
2810 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2811 return REDIS_ERR;
2812 }
2813 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2814 for (j = 0; j < server.dbnum; j++) {
2815 redisDb *db = server.db+j;
2816 dict *d = db->dict;
2817 if (dictSize(d) == 0) continue;
2818 di = dictGetIterator(d);
2819 if (!di) {
2820 fclose(fp);
2821 return REDIS_ERR;
2822 }
2823
2824 /* Write the SELECT DB opcode */
2825 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2826 if (rdbSaveLen(fp,j) == -1) goto werr;
2827
2828 /* Iterate this DB writing every entry */
2829 while((de = dictNext(di)) != NULL) {
2830 robj *key = dictGetEntryKey(de);
2831 robj *o = dictGetEntryVal(de);
2832 time_t expiretime = getExpire(db,key);
2833
2834 /* Save the expire time */
2835 if (expiretime != -1) {
2836 /* If this key is already expired skip it */
2837 if (expiretime < now) continue;
2838 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2839 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2840 }
2841 /* Save the key and associated value */
2842 if (rdbSaveType(fp,o->type) == -1) goto werr;
2843 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2844 /* Save the actual value */
2845 if (rdbSaveObject(fp,o) == -1) goto werr;
2846 }
2847 dictReleaseIterator(di);
2848 }
2849 /* EOF opcode */
2850 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2851
2852 /* Make sure data will not remain on the OS's output buffers */
2853 fflush(fp);
2854 fsync(fileno(fp));
2855 fclose(fp);
2856
2857 /* Use RENAME to make sure the DB file is changed atomically only
2858 * if the generate DB file is ok. */
2859 if (rename(tmpfile,filename) == -1) {
2860 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2861 unlink(tmpfile);
2862 return REDIS_ERR;
2863 }
2864 redisLog(REDIS_NOTICE,"DB saved on disk");
2865 server.dirty = 0;
2866 server.lastsave = time(NULL);
2867 return REDIS_OK;
2868
2869 werr:
2870 fclose(fp);
2871 unlink(tmpfile);
2872 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2873 if (di) dictReleaseIterator(di);
2874 return REDIS_ERR;
2875 }
2876
2877 static int rdbSaveBackground(char *filename) {
2878 pid_t childpid;
2879
2880 if (server.bgsavechildpid != -1) return REDIS_ERR;
2881 if ((childpid = fork()) == 0) {
2882 /* Child */
2883 close(server.fd);
2884 if (rdbSave(filename) == REDIS_OK) {
2885 exit(0);
2886 } else {
2887 exit(1);
2888 }
2889 } else {
2890 /* Parent */
2891 if (childpid == -1) {
2892 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2893 strerror(errno));
2894 return REDIS_ERR;
2895 }
2896 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2897 server.bgsavechildpid = childpid;
2898 return REDIS_OK;
2899 }
2900 return REDIS_OK; /* unreached */
2901 }
2902
/* Remove the temp file "temp-<childpid>.rdb" from the current directory.
 * The result of unlink() is ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int)childpid);
    unlink(path);
}
2909
/* Read a single byte from 'fp' and return it as a non negative int.
 * Returns -1 if the byte can't be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return (fread(&byte,1,1,fp) == 0) ? -1 : byte;
}
2915
/* Read a 4 byte signed integer from 'fp', in the byte order of the host,
 * and return it as a time_t. Returns -1 if the read fails. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t raw;

    if (fread(&raw,sizeof(raw),1,fp) != 1) return -1;
    return (time_t) raw;
}
2921
2922 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2923 * of this file for a description of how this are stored on disk.
2924 *
2925 * isencoded is set to 1 if the readed length is not actually a length but
2926 * an "encoding type", check the above comments for more info */
2927 static uint32_t rdbLoadLen(FILE *fp, int *isencoded) {
2928 unsigned char buf[2];
2929 uint32_t len;
2930 int type;
2931
2932 if (isencoded) *isencoded = 0;
2933 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2934 type = (buf[0]&0xC0)>>6;
2935 if (type == REDIS_RDB_6BITLEN) {
2936 /* Read a 6 bit len */
2937 return buf[0]&0x3F;
2938 } else if (type == REDIS_RDB_ENCVAL) {
2939 /* Read a 6 bit len encoding type */
2940 if (isencoded) *isencoded = 1;
2941 return buf[0]&0x3F;
2942 } else if (type == REDIS_RDB_14BITLEN) {
2943 /* Read a 14 bit len */
2944 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2945 return ((buf[0]&0x3F)<<8)|buf[1];
2946 } else {
2947 /* Read a 32 bit len */
2948 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2949 return ntohl(len);
2950 }
2951 }
2952
2953 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2954 unsigned char enc[4];
2955 long long val;
2956
2957 if (enctype == REDIS_RDB_ENC_INT8) {
2958 if (fread(enc,1,1,fp) == 0) return NULL;
2959 val = (signed char)enc[0];
2960 } else if (enctype == REDIS_RDB_ENC_INT16) {
2961 uint16_t v;
2962 if (fread(enc,2,1,fp) == 0) return NULL;
2963 v = enc[0]|(enc[1]<<8);
2964 val = (int16_t)v;
2965 } else if (enctype == REDIS_RDB_ENC_INT32) {
2966 uint32_t v;
2967 if (fread(enc,4,1,fp) == 0) return NULL;
2968 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2969 val = (int32_t)v;
2970 } else {
2971 val = 0; /* anti-warning */
2972 redisAssert(0!=0);
2973 }
2974 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2975 }
2976
2977 static robj *rdbLoadLzfStringObject(FILE*fp) {
2978 unsigned int len, clen;
2979 unsigned char *c = NULL;
2980 sds val = NULL;
2981
2982 if ((clen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
2983 if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
2984 if ((c = zmalloc(clen)) == NULL) goto err;
2985 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2986 if (fread(c,clen,1,fp) == 0) goto err;
2987 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2988 zfree(c);
2989 return createObject(REDIS_STRING,val);
2990 err:
2991 zfree(c);
2992 sdsfree(val);
2993 return NULL;
2994 }
2995
2996 static robj *rdbLoadStringObject(FILE*fp) {
2997 int isencoded;
2998 uint32_t len;
2999 sds val;
3000
3001 len = rdbLoadLen(fp,&isencoded);
3002 if (isencoded) {
3003 switch(len) {
3004 case REDIS_RDB_ENC_INT8:
3005 case REDIS_RDB_ENC_INT16:
3006 case REDIS_RDB_ENC_INT32:
3007 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
3008 case REDIS_RDB_ENC_LZF:
3009 return tryObjectSharing(rdbLoadLzfStringObject(fp));
3010 default:
3011 redisAssert(0!=0);
3012 }
3013 }
3014
3015 if (len == REDIS_RDB_LENERR) return NULL;
3016 val = sdsnewlen(NULL,len);
3017 if (len && fread(val,len,1,fp) == 0) {
3018 sdsfree(val);
3019 return NULL;
3020 }
3021 return tryObjectSharing(createObject(REDIS_STRING,val));
3022 }
3023
3024 /* For information about double serialization check rdbSaveDoubleValue() */
3025 static int rdbLoadDoubleValue(FILE *fp, double *val) {
3026 char buf[128];
3027 unsigned char len;
3028
3029 if (fread(&len,1,1,fp) == 0) return -1;
3030 switch(len) {
3031 case 255: *val = R_NegInf; return 0;
3032 case 254: *val = R_PosInf; return 0;
3033 case 253: *val = R_Nan; return 0;
3034 default:
3035 if (fread(buf,len,1,fp) == 0) return -1;
3036 buf[len] = '\0';
3037 sscanf(buf, "%lg", val);
3038 return 0;
3039 }
3040 }
3041
3042 /* Load a Redis object of the specified type from the specified file.
3043 * On success a newly allocated object is returned, otherwise NULL. */
3044 static robj *rdbLoadObject(int type, FILE *fp) {
3045 robj *o;
3046
3047 if (type == REDIS_STRING) {
3048 /* Read string value */
3049 if ((o = rdbLoadStringObject(fp)) == NULL) return NULL;
3050 tryObjectEncoding(o);
3051 } else if (type == REDIS_LIST || type == REDIS_SET) {
3052 /* Read list/set value */
3053 uint32_t listlen;
3054
3055 if ((listlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
3056 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
3057 /* Load every single element of the list/set */
3058 while(listlen--) {
3059 robj *ele;
3060
3061 if ((ele = rdbLoadStringObject(fp)) == NULL) return NULL;
3062 tryObjectEncoding(ele);
3063 if (type == REDIS_LIST) {
3064 listAddNodeTail((list*)o->ptr,ele);
3065 } else {
3066 dictAdd((dict*)o->ptr,ele,NULL);
3067 }
3068 }
3069 } else if (type == REDIS_ZSET) {
3070 /* Read list/set value */
3071 uint32_t zsetlen;
3072 zset *zs;
3073
3074 if ((zsetlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
3075 o = createZsetObject();
3076 zs = o->ptr;
3077 /* Load every single element of the list/set */
3078 while(zsetlen--) {
3079 robj *ele;
3080 double *score = zmalloc(sizeof(double));
3081
3082 if ((ele = rdbLoadStringObject(fp)) == NULL) return NULL;
3083 tryObjectEncoding(ele);
3084 if (rdbLoadDoubleValue(fp,score) == -1) return NULL;
3085 dictAdd(zs->dict,ele,score);
3086 zslInsert(zs->zsl,*score,ele);
3087 incrRefCount(ele); /* added to skiplist */
3088 }
3089 } else {
3090 redisAssert(0 != 0);
3091 }
3092 return o;
3093 }
3094
3095 static int rdbLoad(char *filename) {
3096 FILE *fp;
3097 robj *keyobj = NULL;
3098 uint32_t dbid;
3099 int type, retval, rdbver;
3100 dict *d = server.db[0].dict;
3101 redisDb *db = server.db+0;
3102 char buf[1024];
3103 time_t expiretime = -1, now = time(NULL);
3104
3105 fp = fopen(filename,"r");
3106 if (!fp) return REDIS_ERR;
3107 if (fread(buf,9,1,fp) == 0) goto eoferr;
3108 buf[9] = '\0';
3109 if (memcmp(buf,"REDIS",5) != 0) {
3110 fclose(fp);
3111 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
3112 return REDIS_ERR;
3113 }
3114 rdbver = atoi(buf+5);
3115 if (rdbver != 1) {
3116 fclose(fp);
3117 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
3118 return REDIS_ERR;
3119 }
3120 while(1) {
3121 robj *o;
3122
3123 /* Read type. */
3124 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
3125 if (type == REDIS_EXPIRETIME) {
3126 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
3127 /* We read the time so we need to read the object type again */
3128 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
3129 }
3130 if (type == REDIS_EOF) break;
3131 /* Handle SELECT DB opcode as a special case */
3132 if (type == REDIS_SELECTDB) {
3133 if ((dbid = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR)
3134 goto eoferr;
3135 if (dbid >= (unsigned)server.dbnum) {
3136 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
3137 exit(1);
3138 }
3139 db = server.db+dbid;
3140 d = db->dict;
3141 continue;
3142 }
3143 /* Read key */
3144 if ((keyobj = rdbLoadStringObject(fp)) == NULL) goto eoferr;
3145 /* Read value */
3146 if ((o = rdbLoadObject(type,fp)) == NULL) goto eoferr;
3147 /* Add the new object in the hash table */
3148 retval = dictAdd(d,keyobj,o);
3149 if (retval == DICT_ERR) {
3150 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
3151 exit(1);
3152 }
3153 /* Set the expire time if needed */
3154 if (expiretime != -1) {
3155 setExpire(db,keyobj,expiretime);
3156 /* Delete this key if already expired */
3157 if (expiretime < now) deleteKey(db,keyobj);
3158 expiretime = -1;
3159 }
3160 keyobj = o = NULL;
3161 }
3162 fclose(fp);
3163 return REDIS_OK;
3164
3165 eoferr: /* unexpected end of file is handled here with a fatal exit */
3166 if (keyobj) decrRefCount(keyobj);
3167 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
3168 exit(1);
3169 return REDIS_ERR; /* Just to avoid warning */
3170 }
3171
3172 /*================================== Commands =============================== */
3173
3174 static void authCommand(redisClient *c) {
3175 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
3176 c->authenticated = 1;
3177 addReply(c,shared.ok);
3178 } else {
3179 c->authenticated = 0;
3180 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
3181 }
3182 }
3183
3184 static void pingCommand(redisClient *c) {
3185 addReply(c,shared.pong);
3186 }
3187
3188 static void echoCommand(redisClient *c) {
3189 addReplyBulkLen(c,c->argv[1]);
3190 addReply(c,c->argv[1]);
3191 addReply(c,shared.crlf);
3192 }
3193
3194 /*=================================== Strings =============================== */
3195
3196 static void setGenericCommand(redisClient *c, int nx) {
3197 int retval;
3198
3199 if (nx) deleteIfVolatile(c->db,c->argv[1]);
3200 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
3201 if (retval == DICT_ERR) {
3202 if (!nx) {
3203 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3204 incrRefCount(c->argv[2]);
3205 } else {
3206 addReply(c,shared.czero);
3207 return;
3208 }
3209 } else {
3210 incrRefCount(c->argv[1]);
3211 incrRefCount(c->argv[2]);
3212 }
3213 server.dirty++;
3214 removeExpire(c->db,c->argv[1]);
3215 addReply(c, nx ? shared.cone : shared.ok);
3216 }
3217
3218 static void setCommand(redisClient *c) {
3219 setGenericCommand(c,0);
3220 }
3221
3222 static void setnxCommand(redisClient *c) {
3223 setGenericCommand(c,1);
3224 }
3225
3226 static int getGenericCommand(redisClient *c) {
3227 robj *o = lookupKeyRead(c->db,c->argv[1]);
3228
3229 if (o == NULL) {
3230 addReply(c,shared.nullbulk);
3231 return REDIS_OK;
3232 } else {
3233 if (o->type != REDIS_STRING) {
3234 addReply(c,shared.wrongtypeerr);
3235 return REDIS_ERR;
3236 } else {
3237 addReplyBulkLen(c,o);
3238 addReply(c,o);
3239 addReply(c,shared.crlf);
3240 return REDIS_OK;
3241 }
3242 }
3243 }
3244
3245 static void getCommand(redisClient *c) {
3246 getGenericCommand(c);
3247 }
3248
3249 static void getsetCommand(redisClient *c) {
3250 if (getGenericCommand(c) == REDIS_ERR) return;
3251 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3252 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3253 } else {
3254 incrRefCount(c->argv[1]);
3255 }
3256 incrRefCount(c->argv[2]);
3257 server.dirty++;
3258 removeExpire(c->db,c->argv[1]);
3259 }
3260
3261 static void mgetCommand(redisClient *c) {
3262 int j;
3263
3264 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
3265 for (j = 1; j < c->argc; j++) {
3266 robj *o = lookupKeyRead(c->db,c->argv[j]);
3267 if (o == NULL) {
3268 addReply(c,shared.nullbulk);
3269 } else {
3270 if (o->type != REDIS_STRING) {
3271 addReply(c,shared.nullbulk);
3272 } else {
3273 addReplyBulkLen(c,o);
3274 addReply(c,o);
3275 addReply(c,shared.crlf);
3276 }
3277 }
3278 }
3279 }
3280
3281 static void msetGenericCommand(redisClient *c, int nx) {
3282 int j, busykeys = 0;
3283
3284 if ((c->argc % 2) == 0) {
3285 addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3286 return;
3287 }
3288 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3289 * set nothing at all if at least one already key exists. */
3290 if (nx) {
3291 for (j = 1; j < c->argc; j += 2) {
3292 if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
3293 busykeys++;
3294 }
3295 }
3296 }
3297 if (busykeys) {
3298 addReply(c, shared.czero);
3299 return;
3300 }
3301
3302 for (j = 1; j < c->argc; j += 2) {
3303 int retval;
3304
3305 tryObjectEncoding(c->argv[j+1]);
3306 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3307 if (retval == DICT_ERR) {
3308 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3309 incrRefCount(c->argv[j+1]);
3310 } else {
3311 incrRefCount(c->argv[j]);
3312 incrRefCount(c->argv[j+1]);
3313 }
3314 removeExpire(c->db,c->argv[j]);
3315 }
3316 server.dirty += (c->argc-1)/2;
3317 addReply(c, nx ? shared.cone : shared.ok);
3318 }
3319
3320 static void msetCommand(redisClient *c) {
3321 msetGenericCommand(c,0);
3322 }
3323
3324 static void msetnxCommand(redisClient *c) {
3325 msetGenericCommand(c,1);
3326 }
3327
3328 static void incrDecrCommand(redisClient *c, long long incr) {
3329 long long value;
3330 int retval;
3331 robj *o;
3332
3333 o = lookupKeyWrite(c->db,c->argv[1]);
3334 if (o == NULL) {
3335 value = 0;
3336 } else {
3337 if (o->type != REDIS_STRING) {
3338 value = 0;
3339 } else {
3340 char *eptr;
3341
3342 if (o->encoding == REDIS_ENCODING_RAW)
3343 value = strtoll(o->ptr, &eptr, 10);
3344 else if (o->encoding == REDIS_ENCODING_INT)
3345 value = (long)o->ptr;
3346 else
3347 redisAssert(1 != 1);
3348 }
3349 }
3350
3351 value += incr;
3352 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
3353 tryObjectEncoding(o);
3354 retval = dictAdd(c->db->dict,c->argv[1],o);
3355 if (retval == DICT_ERR) {
3356 dictReplace(c->db->dict,c->argv[1],o);
3357 removeExpire(c->db,c->argv[1]);
3358 } else {
3359 incrRefCount(c->argv[1]);
3360 }
3361 server.dirty++;
3362 addReply(c,shared.colon);
3363 addReply(c,o);
3364 addReply(c,shared.crlf);
3365 }
3366
3367 static void incrCommand(redisClient *c) {
3368 incrDecrCommand(c,1);
3369 }
3370
3371 static void decrCommand(redisClient *c) {
3372 incrDecrCommand(c,-1);
3373 }
3374
3375 static void incrbyCommand(redisClient *c) {
3376 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3377 incrDecrCommand(c,incr);
3378 }
3379
3380 static void decrbyCommand(redisClient *c) {
3381 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3382 incrDecrCommand(c,-incr);
3383 }
3384
3385 /* ========================= Type agnostic commands ========================= */
3386
3387 static void delCommand(redisClient *c) {
3388 int deleted = 0, j;
3389
3390 for (j = 1; j < c->argc; j++) {
3391 if (deleteKey(c->db,c->argv[j])) {
3392 server.dirty++;
3393 deleted++;
3394 }
3395 }
3396 switch(deleted) {
3397 case 0:
3398 addReply(c,shared.czero);
3399 break;
3400 case 1:
3401 addReply(c,shared.cone);
3402 break;
3403 default:
3404 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3405 break;
3406 }
3407 }
3408
3409 static void existsCommand(redisClient *c) {
3410 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3411 }
3412
3413 static void selectCommand(redisClient *c) {
3414 int id = atoi(c->argv[1]->ptr);
3415
3416 if (selectDb(c,id) == REDIS_ERR) {
3417 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3418 } else {
3419 addReply(c,shared.ok);
3420 }
3421 }
3422
3423 static void randomkeyCommand(redisClient *c) {
3424 dictEntry *de;
3425
3426 while(1) {
3427 de = dictGetRandomKey(c->db->dict);
3428 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3429 }
3430 if (de == NULL) {
3431 addReply(c,shared.plus);
3432 addReply(c,shared.crlf);
3433 } else {
3434 addReply(c,shared.plus);
3435 addReply(c,dictGetEntryKey(de));
3436 addReply(c,shared.crlf);
3437 }
3438 }
3439
3440 static void keysCommand(redisClient *c) {
3441 dictIterator *di;
3442 dictEntry *de;
3443 sds pattern = c->argv[1]->ptr;
3444 int plen = sdslen(pattern);
3445 unsigned long numkeys = 0, keyslen = 0;
3446 robj *lenobj = createObject(REDIS_STRING,NULL);
3447
3448 di = dictGetIterator(c->db->dict);
3449 addReply(c,lenobj);
3450 decrRefCount(lenobj);
3451 while((de = dictNext(di)) != NULL) {
3452 robj *keyobj = dictGetEntryKey(de);
3453
3454 sds key = keyobj->ptr;
3455 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3456 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3457 if (expireIfNeeded(c->db,keyobj) == 0) {
3458 if (numkeys != 0)
3459 addReply(c,shared.space);
3460 addReply(c,keyobj);
3461 numkeys++;
3462 keyslen += sdslen(key);
3463 }
3464 }
3465 }
3466 dictReleaseIterator(di);
3467 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3468 addReply(c,shared.crlf);
3469 }
3470
3471 static void dbsizeCommand(redisClient *c) {
3472 addReplySds(c,
3473 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3474 }
3475
3476 static void lastsaveCommand(redisClient *c) {
3477 addReplySds(c,
3478 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3479 }
3480
3481 static void typeCommand(redisClient *c) {
3482 robj *o;
3483 char *type;
3484
3485 o = lookupKeyRead(c->db,c->argv[1]);
3486 if (o == NULL) {
3487 type = "+none";
3488 } else {
3489 switch(o->type) {
3490 case REDIS_STRING: type = "+string"; break;
3491 case REDIS_LIST: type = "+list"; break;
3492 case REDIS_SET: type = "+set"; break;
3493 case REDIS_ZSET: type = "+zset"; break;
3494 default: type = "unknown"; break;
3495 }
3496 }
3497 addReplySds(c,sdsnew(type));
3498 addReply(c,shared.crlf);
3499 }
3500
3501 static void saveCommand(redisClient *c) {
3502 if (server.bgsavechildpid != -1) {
3503 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3504 return;
3505 }
3506 if (rdbSave(server.dbfilename) == REDIS_OK) {
3507 addReply(c,shared.ok);
3508 } else {
3509 addReply(c,shared.err);
3510 }
3511 }
3512
3513 static void bgsaveCommand(redisClient *c) {
3514 if (server.bgsavechildpid != -1) {
3515 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3516 return;
3517 }
3518 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3519 char *status = "+Background saving started\r\n";
3520 addReplySds(c,sdsnew(status));
3521 } else {
3522 addReply(c,shared.err);
3523 }
3524 }
3525
3526 static void shutdownCommand(redisClient *c) {
3527 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3528 /* Kill the saving child if there is a background saving in progress.
3529 We want to avoid race conditions, for instance our saving child may
3530 overwrite the synchronous saving did by SHUTDOWN. */
3531 if (server.bgsavechildpid != -1) {
3532 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3533 kill(server.bgsavechildpid,SIGKILL);
3534 rdbRemoveTempFile(server.bgsavechildpid);
3535 }
3536 if (server.appendonly) {
3537 /* Append only file: fsync() the AOF and exit */
3538 fsync(server.appendfd);
3539 exit(0);
3540 } else {
3541 /* Snapshotting. Perform a SYNC SAVE and exit */
3542 if (rdbSave(server.dbfilename) == REDIS_OK) {
3543 if (server.daemonize)
3544 unlink(server.pidfile);
3545 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3546 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3547 exit(0);
3548 } else {
3549 /* Ooops.. error saving! The best we can do is to continue operating.
3550 * Note that if there was a background saving process, in the next
3551 * cron() Redis will be notified that the background saving aborted,
3552 * handling special stuff like slaves pending for synchronization... */
3553 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3554 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3555 }
3556 }
3557 }
3558
3559 static void renameGenericCommand(redisClient *c, int nx) {
3560 robj *o;
3561
3562 /* To use the same key as src and dst is probably an error */
3563 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3564 addReply(c,shared.sameobjecterr);
3565 return;
3566 }
3567
3568 o = lookupKeyWrite(c->db,c->argv[1]);
3569 if (o == NULL) {
3570 addReply(c,shared.nokeyerr);
3571 return;
3572 }
3573 incrRefCount(o);
3574 deleteIfVolatile(c->db,c->argv[2]);
3575 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3576 if (nx) {
3577 decrRefCount(o);
3578 addReply(c,shared.czero);
3579 return;
3580 }
3581 dictReplace(c->db->dict,c->argv[2],o);
3582 } else {
3583 incrRefCount(c->argv[2]);
3584 }
3585 deleteKey(c->db,c->argv[1]);
3586 server.dirty++;
3587 addReply(c,nx ? shared.cone : shared.ok);
3588 }
3589
3590 static void renameCommand(redisClient *c) {
3591 renameGenericCommand(c,0);
3592 }
3593
3594 static void renamenxCommand(redisClient *c) {
3595 renameGenericCommand(c,1);
3596 }
3597
3598 static void moveCommand(redisClient *c) {
3599 robj *o;
3600 redisDb *src, *dst;
3601 int srcid;
3602
3603 /* Obtain source and target DB pointers */
3604 src = c->db;
3605 srcid = c->db->id;
3606 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3607 addReply(c,shared.outofrangeerr);
3608 return;
3609 }
3610 dst = c->db;
3611 selectDb(c,srcid); /* Back to the source DB */
3612
3613 /* If the user is moving using as target the same
3614 * DB as the source DB it is probably an error. */
3615 if (src == dst) {
3616 addReply(c,shared.sameobjecterr);
3617 return;
3618 }
3619
3620 /* Check if the element exists and get a reference */
3621 o = lookupKeyWrite(c->db,c->argv[1]);
3622 if (!o) {
3623 addReply(c,shared.czero);
3624 return;
3625 }
3626
3627 /* Try to add the element to the target DB */
3628 deleteIfVolatile(dst,c->argv[1]);
3629 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3630 addReply(c,shared.czero);
3631 return;
3632 }
3633 incrRefCount(c->argv[1]);
3634 incrRefCount(o);
3635
3636 /* OK! key moved, free the entry in the source DB */
3637 deleteKey(src,c->argv[1]);
3638 server.dirty++;
3639 addReply(c,shared.cone);
3640 }
3641
3642 /* =================================== Lists ================================ */
3643 static void pushGenericCommand(redisClient *c, int where) {
3644 robj *lobj;
3645 list *list;
3646
3647 lobj = lookupKeyWrite(c->db,c->argv[1]);
3648 if (lobj == NULL) {
3649 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3650 addReply(c,shared.ok);
3651 return;
3652 }
3653 lobj = createListObject();
3654 list = lobj->ptr;
3655 if (where == REDIS_HEAD) {
3656 listAddNodeHead(list,c->argv[2]);
3657 } else {
3658 listAddNodeTail(list,c->argv[2]);
3659 }
3660 dictAdd(c->db->dict,c->argv[1],lobj);
3661 incrRefCount(c->argv[1]);
3662 incrRefCount(c->argv[2]);
3663 } else {
3664 if (lobj->type != REDIS_LIST) {
3665 addReply(c,shared.wrongtypeerr);
3666 return;
3667 }
3668 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3669 addReply(c,shared.ok);
3670 return;
3671 }
3672 list = lobj->ptr;
3673 if (where == REDIS_HEAD) {
3674 listAddNodeHead(list,c->argv[2]);
3675 } else {
3676 listAddNodeTail(list,c->argv[2]);
3677 }
3678 incrRefCount(c->argv[2]);
3679 }
3680 server.dirty++;
3681 addReply(c,shared.ok);
3682 }
3683
3684 static void lpushCommand(redisClient *c) {
3685 pushGenericCommand(c,REDIS_HEAD);
3686 }
3687
3688 static void rpushCommand(redisClient *c) {
3689 pushGenericCommand(c,REDIS_TAIL);
3690 }
3691
3692 static void llenCommand(redisClient *c) {
3693 robj *o;
3694 list *l;
3695
3696 o = lookupKeyRead(c->db,c->argv[1]);
3697 if (o == NULL) {
3698 addReply(c,shared.czero);
3699 return;
3700 } else {
3701 if (o->type != REDIS_LIST) {
3702 addReply(c,shared.wrongtypeerr);
3703 } else {
3704 l = o->ptr;
3705 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3706 }
3707 }
3708 }
3709
3710 static void lindexCommand(redisClient *c) {
3711 robj *o;
3712 int index = atoi(c->argv[2]->ptr);
3713
3714 o = lookupKeyRead(c->db,c->argv[1]);
3715 if (o == NULL) {
3716 addReply(c,shared.nullbulk);
3717 } else {
3718 if (o->type != REDIS_LIST) {
3719 addReply(c,shared.wrongtypeerr);
3720 } else {
3721 list *list = o->ptr;
3722 listNode *ln;
3723
3724 ln = listIndex(list, index);
3725 if (ln == NULL) {
3726 addReply(c,shared.nullbulk);
3727 } else {
3728 robj *ele = listNodeValue(ln);
3729 addReplyBulkLen(c,ele);
3730 addReply(c,ele);
3731 addReply(c,shared.crlf);
3732 }
3733 }
3734 }
3735 }
3736
3737 static void lsetCommand(redisClient *c) {
3738 robj *o;
3739 int index = atoi(c->argv[2]->ptr);
3740
3741 o = lookupKeyWrite(c->db,c->argv[1]);
3742 if (o == NULL) {
3743 addReply(c,shared.nokeyerr);
3744 } else {
3745 if (o->type != REDIS_LIST) {
3746 addReply(c,shared.wrongtypeerr);
3747 } else {
3748 list *list = o->ptr;
3749 listNode *ln;
3750
3751 ln = listIndex(list, index);
3752 if (ln == NULL) {
3753 addReply(c,shared.outofrangeerr);
3754 } else {
3755 robj *ele = listNodeValue(ln);
3756
3757 decrRefCount(ele);
3758 listNodeValue(ln) = c->argv[3];
3759 incrRefCount(c->argv[3]);
3760 addReply(c,shared.ok);
3761 server.dirty++;
3762 }
3763 }
3764 }
3765 }
3766
3767 static void popGenericCommand(redisClient *c, int where) {
3768 robj *o;
3769
3770 o = lookupKeyWrite(c->db,c->argv[1]);
3771 if (o == NULL) {
3772 addReply(c,shared.nullbulk);
3773 } else {
3774 if (o->type != REDIS_LIST) {
3775 addReply(c,shared.wrongtypeerr);
3776 } else {
3777 list *list = o->ptr;
3778 listNode *ln;
3779
3780 if (where == REDIS_HEAD)
3781 ln = listFirst(list);
3782 else
3783 ln = listLast(list);
3784
3785 if (ln == NULL) {
3786 addReply(c,shared.nullbulk);
3787 } else {
3788 robj *ele = listNodeValue(ln);
3789 addReplyBulkLen(c,ele);
3790 addReply(c,ele);
3791 addReply(c,shared.crlf);
3792 listDelNode(list,ln);
3793 server.dirty++;
3794 }
3795 }
3796 }
3797 }
3798
3799 static void lpopCommand(redisClient *c) {
3800 popGenericCommand(c,REDIS_HEAD);
3801 }
3802
3803 static void rpopCommand(redisClient *c) {
3804 popGenericCommand(c,REDIS_TAIL);
3805 }
3806
3807 static void lrangeCommand(redisClient *c) {
3808 robj *o;
3809 int start = atoi(c->argv[2]->ptr);
3810 int end = atoi(c->argv[3]->ptr);
3811
3812 o = lookupKeyRead(c->db,c->argv[1]);
3813 if (o == NULL) {
3814 addReply(c,shared.nullmultibulk);
3815 } else {
3816 if (o->type != REDIS_LIST) {
3817 addReply(c,shared.wrongtypeerr);
3818 } else {
3819 list *list = o->ptr;
3820 listNode *ln;
3821 int llen = listLength(list);
3822 int rangelen, j;
3823 robj *ele;
3824
3825 /* convert negative indexes */
3826 if (start < 0) start = llen+start;
3827 if (end < 0) end = llen+end;
3828 if (start < 0) start = 0;
3829 if (end < 0) end = 0;
3830
3831 /* indexes sanity checks */
3832 if (start > end || start >= llen) {
3833 /* Out of range start or start > end result in empty list */
3834 addReply(c,shared.emptymultibulk);
3835 return;
3836 }
3837 if (end >= llen) end = llen-1;
3838 rangelen = (end-start)+1;
3839
3840 /* Return the result in form of a multi-bulk reply */
3841 ln = listIndex(list, start);
3842 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3843 for (j = 0; j < rangelen; j++) {
3844 ele = listNodeValue(ln);
3845 addReplyBulkLen(c,ele);
3846 addReply(c,ele);
3847 addReply(c,shared.crlf);
3848 ln = ln->next;
3849 }
3850 }
3851 }
3852 }
3853
3854 static void ltrimCommand(redisClient *c) {
3855 robj *o;
3856 int start = atoi(c->argv[2]->ptr);
3857 int end = atoi(c->argv[3]->ptr);
3858
3859 o = lookupKeyWrite(c->db,c->argv[1]);
3860 if (o == NULL) {
3861 addReply(c,shared.ok);
3862 } else {
3863 if (o->type != REDIS_LIST) {
3864 addReply(c,shared.wrongtypeerr);
3865 } else {
3866 list *list = o->ptr;
3867 listNode *ln;
3868 int llen = listLength(list);
3869 int j, ltrim, rtrim;
3870
3871 /* convert negative indexes */
3872 if (start < 0) start = llen+start;
3873 if (end < 0) end = llen+end;
3874 if (start < 0) start = 0;
3875 if (end < 0) end = 0;
3876
3877 /* indexes sanity checks */
3878 if (start > end || start >= llen) {
3879 /* Out of range start or start > end result in empty list */
3880 ltrim = llen;
3881 rtrim = 0;
3882 } else {
3883 if (end >= llen) end = llen-1;
3884 ltrim = start;
3885 rtrim = llen-end-1;
3886 }
3887
3888 /* Remove list elements to perform the trim */
3889 for (j = 0; j < ltrim; j++) {
3890 ln = listFirst(list);
3891 listDelNode(list,ln);
3892 }
3893 for (j = 0; j < rtrim; j++) {
3894 ln = listLast(list);
3895 listDelNode(list,ln);
3896 }
3897 server.dirty++;
3898 addReply(c,shared.ok);
3899 }
3900 }
3901 }
3902
3903 static void lremCommand(redisClient *c) {
3904 robj *o;
3905
3906 o = lookupKeyWrite(c->db,c->argv[1]);
3907 if (o == NULL) {
3908 addReply(c,shared.czero);
3909 } else {
3910 if (o->type != REDIS_LIST) {
3911 addReply(c,shared.wrongtypeerr);
3912 } else {
3913 list *list = o->ptr;
3914 listNode *ln, *next;
3915 int toremove = atoi(c->argv[2]->ptr);
3916 int removed = 0;
3917 int fromtail = 0;
3918
3919 if (toremove < 0) {
3920 toremove = -toremove;
3921 fromtail = 1;
3922 }
3923 ln = fromtail ? list->tail : list->head;
3924 while (ln) {
3925 robj *ele = listNodeValue(ln);
3926
3927 next = fromtail ? ln->prev : ln->next;
3928 if (compareStringObjects(ele,c->argv[3]) == 0) {
3929 listDelNode(list,ln);
3930 server.dirty++;
3931 removed++;
3932 if (toremove && removed == toremove) break;
3933 }
3934 ln = next;
3935 }
3936 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3937 }
3938 }
3939 }
3940
3941 /* This is the semantic of this command:
3942 * RPOPLPUSH srclist dstlist:
3943 * IF LLEN(srclist) > 0
3944 * element = RPOP srclist
3945 * LPUSH dstlist element
3946 * RETURN element
3947 * ELSE
3948 * RETURN nil
3949 * END
3950 * END
3951 *
3952 * The idea is to be able to get an element from a list in a reliable way
3953 * since the element is not just returned but pushed against another list
3954 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3955 */
3956 static void rpoplpushcommand(redisClient *c) {
3957 robj *sobj;
3958
3959 sobj = lookupKeyWrite(c->db,c->argv[1]);
3960 if (sobj == NULL) {
3961 addReply(c,shared.nullbulk);
3962 } else {
3963 if (sobj->type != REDIS_LIST) {
3964 addReply(c,shared.wrongtypeerr);
3965 } else {
3966 list *srclist = sobj->ptr;
3967 listNode *ln = listLast(srclist);
3968
3969 if (ln == NULL) {
3970 addReply(c,shared.nullbulk);
3971 } else {
3972 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3973 robj *ele = listNodeValue(ln);
3974 list *dstlist;
3975
3976 if (dobj && dobj->type != REDIS_LIST) {
3977 addReply(c,shared.wrongtypeerr);
3978 return;
3979 }
3980
3981 /* Add the element to the target list (unless it's directly
3982 * passed to some BLPOP-ing client */
3983 if (!handleClientsWaitingListPush(c,c->argv[2],ele)) {
3984 if (dobj == NULL) {
3985 /* Create the list if the key does not exist */
3986 dobj = createListObject();
3987 dictAdd(c->db->dict,c->argv[2],dobj);
3988 incrRefCount(c->argv[2]);
3989 }
3990 dstlist = dobj->ptr;
3991 listAddNodeHead(dstlist,ele);
3992 incrRefCount(ele);
3993 }
3994
3995 /* Send the element to the client as reply as well */
3996 addReplyBulkLen(c,ele);
3997 addReply(c,ele);
3998 addReply(c,shared.crlf);
3999
4000 /* Finally remove the element from the source list */
4001 listDelNode(srclist,ln);
4002 server.dirty++;
4003 }
4004 }
4005 }
4006 }
4007
4008
4009 /* ==================================== Sets ================================ */
4010
4011 static void saddCommand(redisClient *c) {
4012 robj *set;
4013
4014 set = lookupKeyWrite(c->db,c->argv[1]);
4015 if (set == NULL) {
4016 set = createSetObject();
4017 dictAdd(c->db->dict,c->argv[1],set);
4018 incrRefCount(c->argv[1]);
4019 } else {
4020 if (set->type != REDIS_SET) {
4021 addReply(c,shared.wrongtypeerr);
4022 return;
4023 }
4024 }
4025 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
4026 incrRefCount(c->argv[2]);
4027 server.dirty++;
4028 addReply(c,shared.cone);
4029 } else {
4030 addReply(c,shared.czero);
4031 }
4032 }
4033
4034 static void sremCommand(redisClient *c) {
4035 robj *set;
4036
4037 set = lookupKeyWrite(c->db,c->argv[1]);
4038 if (set == NULL) {
4039 addReply(c,shared.czero);
4040 } else {
4041 if (set->type != REDIS_SET) {
4042 addReply(c,shared.wrongtypeerr);
4043 return;
4044 }
4045 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
4046 server.dirty++;
4047 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
4048 addReply(c,shared.cone);
4049 } else {
4050 addReply(c,shared.czero);
4051 }
4052 }
4053 }
4054
4055 static void smoveCommand(redisClient *c) {
4056 robj *srcset, *dstset;
4057
4058 srcset = lookupKeyWrite(c->db,c->argv[1]);
4059 dstset = lookupKeyWrite(c->db,c->argv[2]);
4060
4061 /* If the source key does not exist return 0, if it's of the wrong type
4062 * raise an error */
4063 if (srcset == NULL || srcset->type != REDIS_SET) {
4064 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
4065 return;
4066 }
4067 /* Error if the destination key is not a set as well */
4068 if (dstset && dstset->type != REDIS_SET) {
4069 addReply(c,shared.wrongtypeerr);
4070 return;
4071 }
4072 /* Remove the element from the source set */
4073 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
4074 /* Key not found in the src set! return zero */
4075 addReply(c,shared.czero);
4076 return;
4077 }
4078 server.dirty++;
4079 /* Add the element to the destination set */
4080 if (!dstset) {
4081 dstset = createSetObject();
4082 dictAdd(c->db->dict,c->argv[2],dstset);
4083 incrRefCount(c->argv[2]);
4084 }
4085 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
4086 incrRefCount(c->argv[3]);
4087 addReply(c,shared.cone);
4088 }
4089
4090 static void sismemberCommand(redisClient *c) {
4091 robj *set;
4092
4093 set = lookupKeyRead(c->db,c->argv[1]);
4094 if (set == NULL) {
4095 addReply(c,shared.czero);
4096 } else {
4097 if (set->type != REDIS_SET) {
4098 addReply(c,shared.wrongtypeerr);
4099 return;
4100 }
4101 if (dictFind(set->ptr,c->argv[2]))
4102 addReply(c,shared.cone);
4103 else
4104 addReply(c,shared.czero);
4105 }
4106 }
4107
4108 static void scardCommand(redisClient *c) {
4109 robj *o;
4110 dict *s;
4111
4112 o = lookupKeyRead(c->db,c->argv[1]);
4113 if (o == NULL) {
4114 addReply(c,shared.czero);
4115 return;
4116 } else {
4117 if (o->type != REDIS_SET) {
4118 addReply(c,shared.wrongtypeerr);
4119 } else {
4120 s = o->ptr;
4121 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4122 dictSize(s)));
4123 }
4124 }
4125 }
4126
4127 static void spopCommand(redisClient *c) {
4128 robj *set;
4129 dictEntry *de;
4130
4131 set = lookupKeyWrite(c->db,c->argv[1]);
4132 if (set == NULL) {
4133 addReply(c,shared.nullbulk);
4134 } else {
4135 if (set->type != REDIS_SET) {
4136 addReply(c,shared.wrongtypeerr);
4137 return;
4138 }
4139 de = dictGetRandomKey(set->ptr);
4140 if (de == NULL) {
4141 addReply(c,shared.nullbulk);
4142 } else {
4143 robj *ele = dictGetEntryKey(de);
4144
4145 addReplyBulkLen(c,ele);
4146 addReply(c,ele);
4147 addReply(c,shared.crlf);
4148 dictDelete(set->ptr,ele);
4149 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
4150 server.dirty++;
4151 }
4152 }
4153 }
4154
4155 static void srandmemberCommand(redisClient *c) {
4156 robj *set;
4157 dictEntry *de;
4158
4159 set = lookupKeyRead(c->db,c->argv[1]);
4160 if (set == NULL) {
4161 addReply(c,shared.nullbulk);
4162 } else {
4163 if (set->type != REDIS_SET) {
4164 addReply(c,shared.wrongtypeerr);
4165 return;
4166 }
4167 de = dictGetRandomKey(set->ptr);
4168 if (de == NULL) {
4169 addReply(c,shared.nullbulk);
4170 } else {
4171 robj *ele = dictGetEntryKey(de);
4172
4173 addReplyBulkLen(c,ele);
4174 addReply(c,ele);
4175 addReply(c,shared.crlf);
4176 }
4177 }
4178 }
4179
4180 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
4181 dict **d1 = (void*) s1, **d2 = (void*) s2;
4182
4183 return dictSize(*d1)-dictSize(*d2);
4184 }
4185
4186 static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
4187 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4188 dictIterator *di;
4189 dictEntry *de;
4190 robj *lenobj = NULL, *dstset = NULL;
4191 unsigned long j, cardinality = 0;
4192
4193 for (j = 0; j < setsnum; j++) {
4194 robj *setobj;
4195
4196 setobj = dstkey ?
4197 lookupKeyWrite(c->db,setskeys[j]) :
4198 lookupKeyRead(c->db,setskeys[j]);
4199 if (!setobj) {
4200 zfree(dv);
4201 if (dstkey) {
4202 if (deleteKey(c->db,dstkey))
4203 server.dirty++;
4204 addReply(c,shared.czero);
4205 } else {
4206 addReply(c,shared.nullmultibulk);
4207 }
4208 return;
4209 }
4210 if (setobj->type != REDIS_SET) {
4211 zfree(dv);
4212 addReply(c,shared.wrongtypeerr);
4213 return;
4214 }
4215 dv[j] = setobj->ptr;
4216 }
4217 /* Sort sets from the smallest to largest, this will improve our
4218 * algorithm's performace */
4219 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
4220
4221 /* The first thing we should output is the total number of elements...
4222 * since this is a multi-bulk write, but at this stage we don't know
4223 * the intersection set size, so we use a trick, append an empty object
4224 * to the output list and save the pointer to later modify it with the
4225 * right length */
4226 if (!dstkey) {
4227 lenobj = createObject(REDIS_STRING,NULL);
4228 addReply(c,lenobj);
4229 decrRefCount(lenobj);
4230 } else {
4231 /* If we have a target key where to store the resulting set
4232 * create this key with an empty set inside */
4233 dstset = createSetObject();
4234 }
4235
4236 /* Iterate all the elements of the first (smallest) set, and test
4237 * the element against all the other sets, if at least one set does
4238 * not include the element it is discarded */
4239 di = dictGetIterator(dv[0]);
4240
4241 while((de = dictNext(di)) != NULL) {
4242 robj *ele;
4243
4244 for (j = 1; j < setsnum; j++)
4245 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
4246 if (j != setsnum)
4247 continue; /* at least one set does not contain the member */
4248 ele = dictGetEntryKey(de);
4249 if (!dstkey) {
4250 addReplyBulkLen(c,ele);
4251 addReply(c,ele);
4252 addReply(c,shared.crlf);
4253 cardinality++;
4254 } else {
4255 dictAdd(dstset->ptr,ele,NULL);
4256 incrRefCount(ele);
4257 }
4258 }
4259 dictReleaseIterator(di);
4260
4261 if (dstkey) {
4262 /* Store the resulting set into the target */
4263 deleteKey(c->db,dstkey);
4264 dictAdd(c->db->dict,dstkey,dstset);
4265 incrRefCount(dstkey);
4266 }
4267
4268 if (!dstkey) {
4269 lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
4270 } else {
4271 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4272 dictSize((dict*)dstset->ptr)));
4273 server.dirty++;
4274 }
4275 zfree(dv);
4276 }
4277
4278 static void sinterCommand(redisClient *c) {
4279 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4280 }
4281
4282 static void sinterstoreCommand(redisClient *c) {
4283 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4284 }
4285
4286 #define REDIS_OP_UNION 0
4287 #define REDIS_OP_DIFF 1
4288
4289 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
4290 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4291 dictIterator *di;
4292 dictEntry *de;
4293 robj *dstset = NULL;
4294 int j, cardinality = 0;
4295
4296 for (j = 0; j < setsnum; j++) {
4297 robj *setobj;
4298
4299 setobj = dstkey ?
4300 lookupKeyWrite(c->db,setskeys[j]) :
4301 lookupKeyRead(c->db,setskeys[j]);
4302 if (!setobj) {
4303 dv[j] = NULL;
4304 continue;
4305 }
4306 if (setobj->type != REDIS_SET) {
4307 zfree(dv);
4308 addReply(c,shared.wrongtypeerr);
4309 return;
4310 }
4311 dv[j] = setobj->ptr;
4312 }
4313
4314 /* We need a temp set object to store our union. If the dstkey
4315 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4316 * this set object will be the resulting object to set into the target key*/
4317 dstset = createSetObject();
4318
4319 /* Iterate all the elements of all the sets, add every element a single
4320 * time to the result set */
4321 for (j = 0; j < setsnum; j++) {
4322 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
4323 if (!dv[j]) continue; /* non existing keys are like empty sets */
4324
4325 di = dictGetIterator(dv[j]);
4326
4327 while((de = dictNext(di)) != NULL) {
4328 robj *ele;
4329
4330 /* dictAdd will not add the same element multiple times */
4331 ele = dictGetEntryKey(de);
4332 if (op == REDIS_OP_UNION || j == 0) {
4333 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4334 incrRefCount(ele);
4335 cardinality++;
4336 }
4337 } else if (op == REDIS_OP_DIFF) {
4338 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4339 cardinality--;
4340 }
4341 }
4342 }
4343 dictReleaseIterator(di);
4344
4345 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
4346 }
4347
4348 /* Output the content of the resulting set, if not in STORE mode */
4349 if (!dstkey) {
4350 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4351 di = dictGetIterator(dstset->ptr);
4352 while((de = dictNext(di)) != NULL) {
4353 robj *ele;
4354
4355 ele = dictGetEntryKey(de);
4356 addReplyBulkLen(c,ele);
4357 addReply(c,ele);
4358 addReply(c,shared.crlf);
4359 }
4360 dictReleaseIterator(di);
4361 } else {
4362 /* If we have a target key where to store the resulting set
4363 * create this key with the result set inside */
4364 deleteKey(c->db,dstkey);
4365 dictAdd(c->db->dict,dstkey,dstset);
4366 incrRefCount(dstkey);
4367 }
4368
4369 /* Cleanup */
4370 if (!dstkey) {
4371 decrRefCount(dstset);
4372 } else {
4373 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4374 dictSize((dict*)dstset->ptr)));
4375 server.dirty++;
4376 }
4377 zfree(dv);
4378 }
4379
4380 static void sunionCommand(redisClient *c) {
4381 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4382 }
4383
4384 static void sunionstoreCommand(redisClient *c) {
4385 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4386 }
4387
4388 static void sdiffCommand(redisClient *c) {
4389 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4390 }
4391
4392 static void sdiffstoreCommand(redisClient *c) {
4393 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4394 }
4395
4396 /* ==================================== ZSets =============================== */
4397
4398 /* ZSETs are ordered sets using two data structures to hold the same elements
4399 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4400 * data structure.
4401 *
4402 * The elements are added to an hash table mapping Redis objects to scores.
4403 * At the same time the elements are added to a skip list mapping scores
4404 * to Redis objects (so objects are sorted by scores in this "view"). */
4405
4406 /* This skiplist implementation is almost a C translation of the original
4407 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4408 * Alternative to Balanced Trees", modified in three ways:
4409 * a) this implementation allows for repeated values.
4410 * b) the comparison is not just by key (our 'score') but by satellite data.
4411 * c) there is a back pointer, so it's a doubly linked list with the back
4412 * pointers being only at "level 1". This allows to traverse the list
4413 * from tail to head, useful for ZREVRANGE. */
4414
4415 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4416 zskiplistNode *zn = zmalloc(sizeof(*zn));
4417
4418 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4419 zn->score = score;
4420 zn->obj = obj;
4421 return zn;
4422 }
4423
4424 static zskiplist *zslCreate(void) {
4425 int j;
4426 zskiplist *zsl;
4427
4428 zsl = zmalloc(sizeof(*zsl));
4429 zsl->level = 1;
4430 zsl->length = 0;
4431 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4432 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4433 zsl->header->forward[j] = NULL;
4434 zsl->header->backward = NULL;
4435 zsl->tail = NULL;
4436 return zsl;
4437 }
4438
4439 static void zslFreeNode(zskiplistNode *node) {
4440 decrRefCount(node->obj);
4441 zfree(node->forward);
4442 zfree(node);
4443 }
4444
4445 static void zslFree(zskiplist *zsl) {
4446 zskiplistNode *node = zsl->header->forward[0], *next;
4447
4448 zfree(zsl->header->forward);
4449 zfree(zsl->header);
4450 while(node) {
4451 next = node->forward[0];
4452 zslFreeNode(node);
4453 node = next;
4454 }
4455 zfree(zsl);
4456 }
4457
4458 static int zslRandomLevel(void) {
4459 int level = 1;
4460 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4461 level += 1;
4462 return level;
4463 }
4464
4465 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4466 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4467 int i, level;
4468
4469 x = zsl->header;
4470 for (i = zsl->level-1; i >= 0; i--) {
4471 while (x->forward[i] &&
4472 (x->forward[i]->score < score ||
4473 (x->forward[i]->score == score &&
4474 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4475 x = x->forward[i];
4476 update[i] = x;
4477 }
4478 /* we assume the key is not already inside, since we allow duplicated
4479 * scores, and the re-insertion of score and redis object should never
4480 * happpen since the caller of zslInsert() should test in the hash table
4481 * if the element is already inside or not. */
4482 level = zslRandomLevel();
4483 if (level > zsl->level) {
4484 for (i = zsl->level; i < level; i++)
4485 update[i] = zsl->header;
4486 zsl->level = level;
4487 }
4488 x = zslCreateNode(level,score,obj);
4489 for (i = 0; i < level; i++) {
4490 x->forward[i] = update[i]->forward[i];
4491 update[i]->forward[i] = x;
4492 }
4493 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4494 if (x->forward[0])
4495 x->forward[0]->backward = x;
4496 else
4497 zsl->tail = x;
4498 zsl->length++;
4499 }
4500
4501 /* Delete an element with matching score/object from the skiplist. */
4502 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4503 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4504 int i;
4505
4506 x = zsl->header;
4507 for (i = zsl->level-1; i >= 0; i--) {
4508 while (x->forward[i] &&
4509 (x->forward[i]->score < score ||
4510 (x->forward[i]->score == score &&
4511 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4512 x = x->forward[i];
4513 update[i] = x;
4514 }
4515 /* We may have multiple elements with the same score, what we need
4516 * is to find the element with both the right score and object. */
4517 x = x->forward[0];
4518 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4519 for (i = 0; i < zsl->level; i++) {
4520 if (update[i]->forward[i] != x) break;
4521 update[i]->forward[i] = x->forward[i];
4522 }
4523 if (x->forward[0]) {
4524 x->forward[0]->backward = (x->backward == zsl->header) ?
4525 NULL : x->backward;
4526 } else {
4527 zsl->tail = x->backward;
4528 }
4529 zslFreeNode(x);
4530 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4531 zsl->level--;
4532 zsl->length--;
4533 return 1;
4534 } else {
4535 return 0; /* not found */
4536 }
4537 return 0; /* not found */
4538 }
4539
4540 /* Delete all the elements with score between min and max from the skiplist.
4541 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4542 * Note that this function takes the reference to the hash table view of the
4543 * sorted set, in order to remove the elements from the hash table too. */
4544 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4545 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4546 unsigned long removed = 0;
4547 int i;
4548
4549 x = zsl->header;
4550 for (i = zsl->level-1; i >= 0; i--) {
4551 while (x->forward[i] && x->forward[i]->score < min)
4552 x = x->forward[i];
4553 update[i] = x;
4554 }
4555 /* We may have multiple elements with the same score, what we need
4556 * is to find the element with both the right score and object. */
4557 x = x->forward[0];
4558 while (x && x->score <= max) {
4559 zskiplistNode *next;
4560
4561 for (i = 0; i < zsl->level; i++) {
4562 if (update[i]->forward[i] != x) break;
4563 update[i]->forward[i] = x->forward[i];
4564 }
4565 if (x->forward[0]) {
4566 x->forward[0]->backward = (x->backward == zsl->header) ?
4567 NULL : x->backward;
4568 } else {
4569 zsl->tail = x->backward;
4570 }
4571 next = x->forward[0];
4572 dictDelete(dict,x->obj);
4573 zslFreeNode(x);
4574 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4575 zsl->level--;
4576 zsl->length--;
4577 removed++;
4578 x = next;
4579 }
4580 return removed; /* not found */
4581 }
4582
4583 /* Find the first node having a score equal or greater than the specified one.
4584 * Returns NULL if there is no match. */
4585 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4586 zskiplistNode *x;
4587 int i;
4588
4589 x = zsl->header;
4590 for (i = zsl->level-1; i >= 0; i--) {
4591 while (x->forward[i] && x->forward[i]->score < score)
4592 x = x->forward[i];
4593 }
4594 /* We may have multiple elements with the same score, what we need
4595 * is to find the element with both the right score and object. */
4596 return x->forward[0];
4597 }
4598
4599 /* The actual Z-commands implementations */
4600
4601 /* This generic command implements both ZADD and ZINCRBY.
4602 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4603 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4604 static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
4605 robj *zsetobj;
4606 zset *zs;
4607 double *score;
4608
4609 zsetobj = lookupKeyWrite(c->db,key);
4610 if (zsetobj == NULL) {
4611 zsetobj = createZsetObject();
4612 dictAdd(c->db->dict,key,zsetobj);
4613 incrRefCount(key);
4614 } else {
4615 if (zsetobj->type != REDIS_ZSET) {
4616 addReply(c,shared.wrongtypeerr);
4617 return;
4618 }
4619 }
4620 zs = zsetobj->ptr;
4621
4622 /* Ok now since we implement both ZADD and ZINCRBY here the code
4623 * needs to handle the two different conditions. It's all about setting
4624 * '*score', that is, the new score to set, to the right value. */
4625 score = zmalloc(sizeof(double));
4626 if (doincrement) {
4627 dictEntry *de;
4628
4629 /* Read the old score. If the element was not present starts from 0 */
4630 de = dictFind(zs->dict,ele);
4631 if (de) {
4632 double *oldscore = dictGetEntryVal(de);
4633 *score = *oldscore + scoreval;
4634 } else {
4635 *score = scoreval;
4636 }
4637 } else {
4638 *score = scoreval;
4639 }
4640
4641 /* What follows is a simple remove and re-insert operation that is common
4642 * to both ZADD and ZINCRBY... */
4643 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
4644 /* case 1: New element */
4645 incrRefCount(ele); /* added to hash */
4646 zslInsert(zs->zsl,*score,ele);
4647 incrRefCount(ele); /* added to skiplist */
4648 server.dirty++;
4649 if (doincrement)
4650 addReplyDouble(c,*score);
4651 else
4652 addReply(c,shared.cone);
4653 } else {
4654 dictEntry *de;
4655 double *oldscore;
4656
4657 /* case 2: Score update operation */
4658 de = dictFind(zs->dict,ele);
4659 redisAssert(de != NULL);
4660 oldscore = dictGetEntryVal(de);
4661 if (*score != *oldscore) {
4662 int deleted;
4663
4664 /* Remove and insert the element in the skip list with new score */
4665 deleted = zslDelete(zs->zsl,*oldscore,ele);
4666 redisAssert(deleted != 0);
4667 zslInsert(zs->zsl,*score,ele);
4668 incrRefCount(ele);
4669 /* Update the score in the hash table */
4670 dictReplace(zs->dict,ele,score);
4671 server.dirty++;
4672 } else {
4673 zfree(score);
4674 }
4675 if (doincrement)
4676 addReplyDouble(c,*score);
4677 else
4678 addReply(c,shared.czero);
4679 }
4680 }
4681
4682 static void zaddCommand(redisClient *c) {
4683 double scoreval;
4684
4685 scoreval = strtod(c->argv[2]->ptr,NULL);
4686 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4687 }
4688
4689 static void zincrbyCommand(redisClient *c) {
4690 double scoreval;
4691
4692 scoreval = strtod(c->argv[2]->ptr,NULL);
4693 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4694 }
4695
4696 static void zremCommand(redisClient *c) {
4697 robj *zsetobj;
4698 zset *zs;
4699
4700 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4701 if (zsetobj == NULL) {
4702 addReply(c,shared.czero);
4703 } else {
4704 dictEntry *de;
4705 double *oldscore;
4706 int deleted;
4707
4708 if (zsetobj->type != REDIS_ZSET) {
4709 addReply(c,shared.wrongtypeerr);
4710 return;
4711 }
4712 zs = zsetobj->ptr;
4713 de = dictFind(zs->dict,c->argv[2]);
4714 if (de == NULL) {
4715 addReply(c,shared.czero);
4716 return;
4717 }
4718 /* Delete from the skiplist */
4719 oldscore = dictGetEntryVal(de);
4720 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4721 redisAssert(deleted != 0);
4722
4723 /* Delete from the hash table */
4724 dictDelete(zs->dict,c->argv[2]);
4725 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4726 server.dirty++;
4727 addReply(c,shared.cone);
4728 }
4729 }
4730
4731 static void zremrangebyscoreCommand(redisClient *c) {
4732 double min = strtod(c->argv[2]->ptr,NULL);
4733 double max = strtod(c->argv[3]->ptr,NULL);
4734 robj *zsetobj;
4735 zset *zs;
4736
4737 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4738 if (zsetobj == NULL) {
4739 addReply(c,shared.czero);
4740 } else {
4741 long deleted;
4742
4743 if (zsetobj->type != REDIS_ZSET) {
4744 addReply(c,shared.wrongtypeerr);
4745 return;
4746 }
4747 zs = zsetobj->ptr;
4748 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4749 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4750 server.dirty += deleted;
4751 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4752 }
4753 }
4754
4755 static void zrangeGenericCommand(redisClient *c, int reverse) {
4756 robj *o;
4757 int start = atoi(c->argv[2]->ptr);
4758 int end = atoi(c->argv[3]->ptr);
4759 int withscores = 0;
4760
4761 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
4762 withscores = 1;
4763 } else if (c->argc >= 5) {
4764 addReply(c,shared.syntaxerr);
4765 return;
4766 }
4767
4768 o = lookupKeyRead(c->db,c->argv[1]);
4769 if (o == NULL) {
4770 addReply(c,shared.nullmultibulk);
4771 } else {
4772 if (o->type != REDIS_ZSET) {
4773 addReply(c,shared.wrongtypeerr);
4774 } else {
4775 zset *zsetobj = o->ptr;
4776 zskiplist *zsl = zsetobj->zsl;
4777 zskiplistNode *ln;
4778
4779 int llen = zsl->length;
4780 int rangelen, j;
4781 robj *ele;
4782
4783 /* convert negative indexes */
4784 if (start < 0) start = llen+start;
4785 if (end < 0) end = llen+end;
4786 if (start < 0) start = 0;
4787 if (end < 0) end = 0;
4788
4789 /* indexes sanity checks */
4790 if (start > end || start >= llen) {
4791 /* Out of range start or start > end result in empty list */
4792 addReply(c,shared.emptymultibulk);
4793 return;
4794 }
4795 if (end >= llen) end = llen-1;
4796 rangelen = (end-start)+1;
4797
4798 /* Return the result in form of a multi-bulk reply */
4799 if (reverse) {
4800 ln = zsl->tail;
4801 while (start--)
4802 ln = ln->backward;
4803 } else {
4804 ln = zsl->header->forward[0];
4805 while (start--)
4806 ln = ln->forward[0];
4807 }
4808
4809 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",
4810 withscores ? (rangelen*2) : rangelen));
4811 for (j = 0; j < rangelen; j++) {
4812 ele = ln->obj;
4813 addReplyBulkLen(c,ele);
4814 addReply(c,ele);
4815 addReply(c,shared.crlf);
4816 if (withscores)
4817 addReplyDouble(c,ln->score);
4818 ln = reverse ? ln->backward : ln->forward[0];
4819 }
4820 }
4821 }
4822 }
4823
4824 static void zrangeCommand(redisClient *c) {
4825 zrangeGenericCommand(c,0);
4826 }
4827
4828 static void zrevrangeCommand(redisClient *c) {
4829 zrangeGenericCommand(c,1);
4830 }
4831
4832 static void zrangebyscoreCommand(redisClient *c) {
4833 robj *o;
4834 double min = strtod(c->argv[2]->ptr,NULL);
4835 double max = strtod(c->argv[3]->ptr,NULL);
4836 int offset = 0, limit = -1;
4837
4838 if (c->argc != 4 && c->argc != 7) {
4839 addReplySds(c,
4840 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4841 return;
4842 } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
4843 addReply(c,shared.syntaxerr);
4844 return;
4845 } else if (c->argc == 7) {
4846 offset = atoi(c->argv[5]->ptr);
4847 limit = atoi(c->argv[6]->ptr);
4848 if (offset < 0) offset = 0;
4849 }
4850
4851 o = lookupKeyRead(c->db,c->argv[1]);
4852 if (o == NULL) {
4853 addReply(c,shared.nullmultibulk);
4854 } else {
4855 if (o->type != REDIS_ZSET) {
4856 addReply(c,shared.wrongtypeerr);
4857 } else {
4858 zset *zsetobj = o->ptr;
4859 zskiplist *zsl = zsetobj->zsl;
4860 zskiplistNode *ln;
4861 robj *ele, *lenobj;
4862 unsigned int rangelen = 0;
4863
4864 /* Get the first node with the score >= min */
4865 ln = zslFirstWithScore(zsl,min);
4866 if (ln == NULL) {
4867 /* No element matching the speciifed interval */
4868 addReply(c,shared.emptymultibulk);
4869 return;
4870 }
4871
4872 /* We don't know in advance how many matching elements there
4873 * are in the list, so we push this object that will represent
4874 * the multi-bulk length in the output buffer, and will "fix"
4875 * it later */
4876 lenobj = createObject(REDIS_STRING,NULL);
4877 addReply(c,lenobj);
4878 decrRefCount(lenobj);
4879
4880 while(ln && ln->score <= max) {
4881 if (offset) {
4882 offset--;
4883 ln = ln->forward[0];
4884 continue;
4885 }
4886 if (limit == 0) break;
4887 ele = ln->obj;
4888 addReplyBulkLen(c,ele);
4889 addReply(c,ele);
4890 addReply(c,shared.crlf);
4891 ln = ln->forward[0];
4892 rangelen++;
4893 if (limit > 0) limit--;
4894 }
4895 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4896 }
4897 }
4898 }
4899
4900 static void zcardCommand(redisClient *c) {
4901 robj *o;
4902 zset *zs;
4903
4904 o = lookupKeyRead(c->db,c->argv[1]);
4905 if (o == NULL) {
4906 addReply(c,shared.czero);
4907 return;
4908 } else {
4909 if (o->type != REDIS_ZSET) {
4910 addReply(c,shared.wrongtypeerr);
4911 } else {
4912 zs = o->ptr;
4913 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length));
4914 }
4915 }
4916 }
4917
4918 static void zscoreCommand(redisClient *c) {
4919 robj *o;
4920 zset *zs;
4921
4922 o = lookupKeyRead(c->db,c->argv[1]);
4923 if (o == NULL) {
4924 addReply(c,shared.nullbulk);
4925 return;
4926 } else {
4927 if (o->type != REDIS_ZSET) {
4928 addReply(c,shared.wrongtypeerr);
4929 } else {
4930 dictEntry *de;
4931
4932 zs = o->ptr;
4933 de = dictFind(zs->dict,c->argv[2]);
4934 if (!de) {
4935 addReply(c,shared.nullbulk);
4936 } else {
4937 double *score = dictGetEntryVal(de);
4938
4939 addReplyDouble(c,*score);
4940 }
4941 }
4942 }
4943 }
4944
4945 /* ========================= Non type-specific commands ==================== */
4946
4947 static void flushdbCommand(redisClient *c) {
4948 server.dirty += dictSize(c->db->dict);
4949 dictEmpty(c->db->dict);
4950 dictEmpty(c->db->expires);
4951 addReply(c,shared.ok);
4952 }
4953
4954 static void flushallCommand(redisClient *c) {
4955 server.dirty += emptyDb();
4956 addReply(c,shared.ok);
4957 rdbSave(server.dbfilename);
4958 server.dirty++;
4959 }
4960
4961 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4962 redisSortOperation *so = zmalloc(sizeof(*so));
4963 so->type = type;
4964 so->pattern = pattern;
4965 return so;
4966 }
4967
4968 /* Return the value associated to the key with a name obtained
4969 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4970 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4971 char *p;
4972 sds spat, ssub;
4973 robj keyobj;
4974 int prefixlen, sublen, postfixlen;
4975 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4976 struct {
4977 long len;
4978 long free;
4979 char buf[REDIS_SORTKEY_MAX+1];
4980 } keyname;
4981
4982 /* If the pattern is "#" return the substitution object itself in order
4983 * to implement the "SORT ... GET #" feature. */
4984 spat = pattern->ptr;
4985 if (spat[0] == '#' && spat[1] == '\0') {
4986 return subst;
4987 }
4988
4989 /* The substitution object may be specially encoded. If so we create
4990 * a decoded object on the fly. Otherwise getDecodedObject will just
4991 * increment the ref count, that we'll decrement later. */
4992 subst = getDecodedObject(subst);
4993
4994 ssub = subst->ptr;
4995 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4996 p = strchr(spat,'*');
4997 if (!p) {
4998 decrRefCount(subst);
4999 return NULL;
5000 }
5001
5002 prefixlen = p-spat;
5003 sublen = sdslen(ssub);
5004 postfixlen = sdslen(spat)-(prefixlen+1);
5005 memcpy(keyname.buf,spat,prefixlen);
5006 memcpy(keyname.buf+prefixlen,ssub,sublen);
5007 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
5008 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
5009 keyname.len = prefixlen+sublen+postfixlen;
5010
5011 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2))
5012 decrRefCount(subst);
5013
5014 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
5015 return lookupKeyRead(db,&keyobj);
5016 }
5017
5018 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
5019 * the additional parameter is not standard but a BSD-specific we have to
5020 * pass sorting parameters via the global 'server' structure */
5021 static int sortCompare(const void *s1, const void *s2) {
5022 const redisSortObject *so1 = s1, *so2 = s2;
5023 int cmp;
5024
5025 if (!server.sort_alpha) {
5026 /* Numeric sorting. Here it's trivial as we precomputed scores */
5027 if (so1->u.score > so2->u.score) {
5028 cmp = 1;
5029 } else if (so1->u.score < so2->u.score) {
5030 cmp = -1;
5031 } else {
5032 cmp = 0;
5033 }
5034 } else {
5035 /* Alphanumeric sorting */
5036 if (server.sort_bypattern) {
5037 if (!so1->u.cmpobj || !so2->u.cmpobj) {
5038 /* At least one compare object is NULL */
5039 if (so1->u.cmpobj == so2->u.cmpobj)
5040 cmp = 0;
5041 else if (so1->u.cmpobj == NULL)
5042 cmp = -1;
5043 else
5044 cmp = 1;
5045 } else {
5046 /* We have both the objects, use strcoll */
5047 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
5048 }
5049 } else {
5050 /* Compare elements directly */
5051 robj *dec1, *dec2;
5052
5053 dec1 = getDecodedObject(so1->obj);
5054 dec2 = getDecodedObject(so2->obj);
5055 cmp = strcoll(dec1->ptr,dec2->ptr);
5056 decrRefCount(dec1);
5057 decrRefCount(dec2);
5058 }
5059 }
5060 return server.sort_desc ? -cmp : cmp;
5061 }
5062
5063 /* The SORT command is the most complex command in Redis. Warning: this code
5064 * is optimized for speed and a bit less for readability */
5065 static void sortCommand(redisClient *c) {
5066 list *operations;
5067 int outputlen = 0;
5068 int desc = 0, alpha = 0;
5069 int limit_start = 0, limit_count = -1, start, end;
5070 int j, dontsort = 0, vectorlen;
5071 int getop = 0; /* GET operation counter */
5072 robj *sortval, *sortby = NULL, *storekey = NULL;
5073 redisSortObject *vector; /* Resulting vector to sort */
5074
5075 /* Lookup the key to sort. It must be of the right types */
5076 sortval = lookupKeyRead(c->db,c->argv[1]);
5077 if (sortval == NULL) {
5078 addReply(c,shared.nullmultibulk);
5079 return;
5080 }
5081 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
5082 sortval->type != REDIS_ZSET)
5083 {
5084 addReply(c,shared.wrongtypeerr);
5085 return;
5086 }
5087
5088 /* Create a list of operations to perform for every sorted element.
5089 * Operations can be GET/DEL/INCR/DECR */
5090 operations = listCreate();
5091 listSetFreeMethod(operations,zfree);
5092 j = 2;
5093
5094 /* Now we need to protect sortval incrementing its count, in the future
5095 * SORT may have options able to overwrite/delete keys during the sorting
5096 * and the sorted key itself may get destroied */
5097 incrRefCount(sortval);
5098
5099 /* The SORT command has an SQL-alike syntax, parse it */
5100 while(j < c->argc) {
5101 int leftargs = c->argc-j-1;
5102 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
5103 desc = 0;
5104 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
5105 desc = 1;
5106 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
5107 alpha = 1;
5108 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
5109 limit_start = atoi(c->argv[j+1]->ptr);
5110 limit_count = atoi(c->argv[j+2]->ptr);
5111 j+=2;
5112 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
5113 storekey = c->argv[j+1];
5114 j++;
5115 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
5116 sortby = c->argv[j+1];
5117 /* If the BY pattern does not contain '*', i.e. it is constant,
5118 * we don't need to sort nor to lookup the weight keys. */
5119 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
5120 j++;
5121 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
5122 listAddNodeTail(operations,createSortOperation(
5123 REDIS_SORT_GET,c->argv[j+1]));
5124 getop++;
5125 j++;
5126 } else {
5127 decrRefCount(sortval);
5128 listRelease(operations);
5129 addReply(c,shared.syntaxerr);
5130 return;
5131 }
5132 j++;
5133 }
5134
5135 /* Load the sorting vector with all the objects to sort */
5136 switch(sortval->type) {
5137 case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
5138 case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break;
5139 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
5140 default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */
5141 }
5142 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
5143 j = 0;
5144
5145 if (sortval->type == REDIS_LIST) {
5146 list *list = sortval->ptr;
5147 listNode *ln;
5148
5149 listRewind(list);
5150 while((ln = listYield(list))) {
5151 robj *ele = ln->value;
5152 vector[j].obj = ele;
5153 vector[j].u.score = 0;
5154 vector[j].u.cmpobj = NULL;
5155 j++;
5156 }
5157 } else {
5158 dict *set;
5159 dictIterator *di;
5160 dictEntry *setele;
5161
5162 if (sortval->type == REDIS_SET) {
5163 set = sortval->ptr;
5164 } else {
5165 zset *zs = sortval->ptr;
5166 set = zs->dict;
5167 }
5168
5169 di = dictGetIterator(set);
5170 while((setele = dictNext(di)) != NULL) {
5171 vector[j].obj = dictGetEntryKey(setele);
5172 vector[j].u.score = 0;
5173 vector[j].u.cmpobj = NULL;
5174 j++;
5175 }
5176 dictReleaseIterator(di);
5177 }
5178 redisAssert(j == vectorlen);
5179
5180 /* Now it's time to load the right scores in the sorting vector */
5181 if (dontsort == 0) {
5182 for (j = 0; j < vectorlen; j++) {
5183 if (sortby) {
5184 robj *byval;
5185
5186 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
5187 if (!byval || byval->type != REDIS_STRING) continue;
5188 if (alpha) {
5189 vector[j].u.cmpobj = getDecodedObject(byval);
5190 } else {
5191 if (byval->encoding == REDIS_ENCODING_RAW) {
5192 vector[j].u.score = strtod(byval->ptr,NULL);
5193 } else {
5194 /* Don't need to decode the object if it's
5195 * integer-encoded (the only encoding supported) so
5196 * far. We can just cast it */
5197 if (byval->encoding == REDIS_ENCODING_INT) {
5198 vector[j].u.score = (long)byval->ptr;
5199 } else
5200 redisAssert(1 != 1);
5201 }
5202 }
5203 } else {
5204 if (!alpha) {
5205 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
5206 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
5207 else {
5208 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
5209 vector[j].u.score = (long) vector[j].obj->ptr;
5210 else
5211 redisAssert(1 != 1);
5212 }
5213 }
5214 }
5215 }
5216 }
5217
5218 /* We are ready to sort the vector... perform a bit of sanity check
5219 * on the LIMIT option too. We'll use a partial version of quicksort. */
5220 start = (limit_start < 0) ? 0 : limit_start;
5221 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
5222 if (start >= vectorlen) {
5223 start = vectorlen-1;
5224 end = vectorlen-2;
5225 }
5226 if (end >= vectorlen) end = vectorlen-1;
5227
5228 if (dontsort == 0) {
5229 server.sort_desc = desc;
5230 server.sort_alpha = alpha;
5231 server.sort_bypattern = sortby ? 1 : 0;
5232 if (sortby && (start != 0 || end != vectorlen-1))
5233 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
5234 else
5235 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
5236 }
5237
5238 /* Send command output to the output buffer, performing the specified
5239 * GET/DEL/INCR/DECR operations if any. */
5240 outputlen = getop ? getop*(end-start+1) : end-start+1;
5241 if (storekey == NULL) {
5242 /* STORE option not specified, sent the sorting result to client */
5243 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
5244 for (j = start; j <= end; j++) {
5245 listNode *ln;
5246 if (!getop) {
5247 addReplyBulkLen(c,vector[j].obj);
5248 addReply(c,vector[j].obj);
5249 addReply(c,shared.crlf);
5250 }
5251 listRewind(operations);
5252 while((ln = listYield(operations))) {
5253 redisSortOperation *sop = ln->value;
5254 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5255 vector[j].obj);
5256
5257 if (sop->type == REDIS_SORT_GET) {
5258 if (!val || val->type != REDIS_STRING) {
5259 addReply(c,shared.nullbulk);
5260 } else {
5261 addReplyBulkLen(c,val);
5262 addReply(c,val);
5263 addReply(c,shared.crlf);
5264 }
5265 } else {
5266 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5267 }
5268 }
5269 }
5270 } else {
5271 robj *listObject = createListObject();
5272 list *listPtr = (list*) listObject->ptr;
5273
5274 /* STORE option specified, set the sorting result as a List object */
5275 for (j = start; j <= end; j++) {
5276 listNode *ln;
5277 if (!getop) {
5278 listAddNodeTail(listPtr,vector[j].obj);
5279 incrRefCount(vector[j].obj);
5280 }
5281 listRewind(operations);
5282 while((ln = listYield(operations))) {
5283 redisSortOperation *sop = ln->value;
5284 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5285 vector[j].obj);
5286
5287 if (sop->type == REDIS_SORT_GET) {
5288 if (!val || val->type != REDIS_STRING) {
5289 listAddNodeTail(listPtr,createStringObject("",0));
5290 } else {
5291 listAddNodeTail(listPtr,val);
5292 incrRefCount(val);
5293 }
5294 } else {
5295 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5296 }
5297 }
5298 }
5299 if (dictReplace(c->db->dict,storekey,listObject)) {
5300 incrRefCount(storekey);
5301 }
5302 /* Note: we add 1 because the DB is dirty anyway since even if the
5303 * SORT result is empty a new key is set and maybe the old content
5304 * replaced. */
5305 server.dirty += 1+outputlen;
5306 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
5307 }
5308
5309 /* Cleanup */
5310 decrRefCount(sortval);
5311 listRelease(operations);
5312 for (j = 0; j < vectorlen; j++) {
5313 if (sortby && alpha && vector[j].u.cmpobj)
5314 decrRefCount(vector[j].u.cmpobj);
5315 }
5316 zfree(vector);
5317 }
5318
5319 /* Create the string returned by the INFO command. This is decoupled
5320 * by the INFO command itself as we need to report the same information
5321 * on memory corruption problems. */
5322 static sds genRedisInfoString(void) {
5323 sds info;
5324 time_t uptime = time(NULL)-server.stat_starttime;
5325 int j;
5326
5327 info = sdscatprintf(sdsempty(),
5328 "redis_version:%s\r\n"
5329 "arch_bits:%s\r\n"
5330 "multiplexing_api:%s\r\n"
5331 "uptime_in_seconds:%ld\r\n"
5332 "uptime_in_days:%ld\r\n"
5333 "connected_clients:%d\r\n"
5334 "connected_slaves:%d\r\n"
5335 "blocked_clients:%d\r\n"
5336 "used_memory:%zu\r\n"
5337 "changes_since_last_save:%lld\r\n"
5338 "bgsave_in_progress:%d\r\n"
5339 "last_save_time:%ld\r\n"
5340 "bgrewriteaof_in_progress:%d\r\n"
5341 "total_connections_received:%lld\r\n"
5342 "total_commands_processed:%lld\r\n"
5343 "role:%s\r\n"
5344 ,REDIS_VERSION,
5345 (sizeof(long) == 8) ? "64" : "32",
5346 aeGetApiName(),
5347 uptime,
5348 uptime/(3600*24),
5349 listLength(server.clients)-listLength(server.slaves),
5350 listLength(server.slaves),
5351 server.blockedclients,
5352 server.usedmemory,
5353 server.dirty,
5354 server.bgsavechildpid != -1,
5355 server.lastsave,
5356 server.bgrewritechildpid != -1,
5357 server.stat_numconnections,
5358 server.stat_numcommands,
5359 server.masterhost == NULL ? "master" : "slave"
5360 );
5361 if (server.masterhost) {
5362 info = sdscatprintf(info,
5363 "master_host:%s\r\n"
5364 "master_port:%d\r\n"
5365 "master_link_status:%s\r\n"
5366 "master_last_io_seconds_ago:%d\r\n"
5367 ,server.masterhost,
5368 server.masterport,
5369 (server.replstate == REDIS_REPL_CONNECTED) ?
5370 "up" : "down",
5371 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
5372 );
5373 }
5374 for (j = 0; j < server.dbnum; j++) {
5375 long long keys, vkeys;
5376
5377 keys = dictSize(server.db[j].dict);
5378 vkeys = dictSize(server.db[j].expires);
5379 if (keys || vkeys) {
5380 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
5381 j, keys, vkeys);
5382 }
5383 }
5384 return info;
5385 }
5386
5387 static void infoCommand(redisClient *c) {
5388 sds info = genRedisInfoString();
5389 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
5390 (unsigned long)sdslen(info)));
5391 addReplySds(c,info);
5392 addReply(c,shared.crlf);
5393 }
5394
5395 static void monitorCommand(redisClient *c) {
5396 /* ignore MONITOR if aleady slave or in monitor mode */
5397 if (c->flags & REDIS_SLAVE) return;
5398
5399 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5400 c->slaveseldb = 0;
5401 listAddNodeTail(server.monitors,c);
5402 addReply(c,shared.ok);
5403 }
5404
5405 /* ================================= Expire ================================= */
5406 static int removeExpire(redisDb *db, robj *key) {
5407 if (dictDelete(db->expires,key) == DICT_OK) {
5408 return 1;
5409 } else {
5410 return 0;
5411 }
5412 }
5413
5414 static int setExpire(redisDb *db, robj *key, time_t when) {
5415 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5416 return 0;
5417 } else {
5418 incrRefCount(key);
5419 return 1;
5420 }
5421 }
5422
5423 /* Return the expire time of the specified key, or -1 if no expire
5424 * is associated with this key (i.e. the key is non volatile) */
5425 static time_t getExpire(redisDb *db, robj *key) {
5426 dictEntry *de;
5427
5428 /* No expire? return ASAP */
5429 if (dictSize(db->expires) == 0 ||
5430 (de = dictFind(db->expires,key)) == NULL) return -1;
5431
5432 return (time_t) dictGetEntryVal(de);
5433 }
5434
5435 static int expireIfNeeded(redisDb *db, robj *key) {
5436 time_t when;
5437 dictEntry *de;
5438
5439 /* No expire? return ASAP */
5440 if (dictSize(db->expires) == 0 ||
5441 (de = dictFind(db->expires,key)) == NULL) return 0;
5442
5443 /* Lookup the expire */
5444 when = (time_t) dictGetEntryVal(de);
5445 if (time(NULL) <= when) return 0;
5446
5447 /* Delete the key */
5448 dictDelete(db->expires,key);
5449 return dictDelete(db->dict,key) == DICT_OK;
5450 }
5451
5452 static int deleteIfVolatile(redisDb *db, robj *key) {
5453 dictEntry *de;
5454
5455 /* No expire? return ASAP */
5456 if (dictSize(db->expires) == 0 ||
5457 (de = dictFind(db->expires,key)) == NULL) return 0;
5458
5459 /* Delete the key */
5460 server.dirty++;
5461 dictDelete(db->expires,key);
5462 return dictDelete(db->dict,key) == DICT_OK;
5463 }
5464
5465 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
5466 dictEntry *de;
5467
5468 de = dictFind(c->db->dict,key);
5469 if (de == NULL) {
5470 addReply(c,shared.czero);
5471 return;
5472 }
5473 if (seconds < 0) {
5474 if (deleteKey(c->db,key)) server.dirty++;
5475 addReply(c, shared.cone);
5476 return;
5477 } else {
5478 time_t when = time(NULL)+seconds;
5479 if (setExpire(c->db,key,when)) {
5480 addReply(c,shared.cone);
5481 server.dirty++;
5482 } else {
5483 addReply(c,shared.czero);
5484 }
5485 return;
5486 }
5487 }
5488
5489 static void expireCommand(redisClient *c) {
5490 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5491 }
5492
5493 static void expireatCommand(redisClient *c) {
5494 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5495 }
5496
5497 static void ttlCommand(redisClient *c) {
5498 time_t expire;
5499 int ttl = -1;
5500
5501 expire = getExpire(c->db,c->argv[1]);
5502 if (expire != -1) {
5503 ttl = (int) (expire-time(NULL));
5504 if (ttl < 0) ttl = -1;
5505 }
5506 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5507 }
5508
5509 /* ================================ MULTI/EXEC ============================== */
5510
5511 /* Client state initialization for MULTI/EXEC */
5512 static void initClientMultiState(redisClient *c) {
5513 c->mstate.commands = NULL;
5514 c->mstate.count = 0;
5515 }
5516
5517 /* Release all the resources associated with MULTI/EXEC state */
5518 static void freeClientMultiState(redisClient *c) {
5519 int j;
5520
5521 for (j = 0; j < c->mstate.count; j++) {
5522 int i;
5523 multiCmd *mc = c->mstate.commands+j;
5524
5525 for (i = 0; i < mc->argc; i++)
5526 decrRefCount(mc->argv[i]);
5527 zfree(mc->argv);
5528 }
5529 zfree(c->mstate.commands);
5530 }
5531
5532 /* Add a new command into the MULTI commands queue */
5533 static void queueMultiCommand(redisClient *c, struct redisCommand *cmd) {
5534 multiCmd *mc;
5535 int j;
5536
5537 c->mstate.commands = zrealloc(c->mstate.commands,
5538 sizeof(multiCmd)*(c->mstate.count+1));
5539 mc = c->mstate.commands+c->mstate.count;
5540 mc->cmd = cmd;
5541 mc->argc = c->argc;
5542 mc->argv = zmalloc(sizeof(robj*)*c->argc);
5543 memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
5544 for (j = 0; j < c->argc; j++)
5545 incrRefCount(mc->argv[j]);
5546 c->mstate.count++;
5547 }
5548
5549 static void multiCommand(redisClient *c) {
5550 c->flags |= REDIS_MULTI;
5551 addReply(c,shared.ok);
5552 }
5553
5554 static void execCommand(redisClient *c) {
5555 int j;
5556 robj **orig_argv;
5557 int orig_argc;
5558
5559 if (!(c->flags & REDIS_MULTI)) {
5560 addReplySds(c,sdsnew("-ERR EXEC without MULTI\r\n"));
5561 return;
5562 }
5563
5564 orig_argv = c->argv;
5565 orig_argc = c->argc;
5566 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count));
5567 for (j = 0; j < c->mstate.count; j++) {
5568 c->argc = c->mstate.commands[j].argc;
5569 c->argv = c->mstate.commands[j].argv;
5570 call(c,c->mstate.commands[j].cmd);
5571 }
5572 c->argv = orig_argv;
5573 c->argc = orig_argc;
5574 freeClientMultiState(c);
5575 initClientMultiState(c);
5576 c->flags &= (~REDIS_MULTI);
5577 }
5578
5579 /* =========================== Blocking Operations ========================= */
5580
/* Currently Redis blocking operations support is limited to list POP ops,
 * so the current implementation is not fully generic, but it is also not
 * completely specific, so it will not require a rewrite to support new
 * kinds of blocking operations in the future.
 *
 * Still it's important to note that list blocking operations can already be
 * used as a notification mechanism in order to implement other blocking
 * operations at application level, so there must be very strong evidence
 * of usefulness and generality before new blocking operations are implemented.
 *
 * This is how the current blocking POP works, we use BLPOP as an example:
 * - If the user calls BLPOP and the key exists and contains a non-empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   if there is no need to block.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blockingkeys) mapping keys to a list of clients
 *   blocking for these keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we serve the first in the list: basically instead of pushing
 *   the new element inside the list we return it to the (first / oldest)
 *   blocking client, unblock the client, and remove it from the list.
 *
 * The above comment and the source code should be enough in order to understand
 * the implementation and modify / fix it later.
 */
5609
5610 /* Set a client in blocking mode for the specified key, with the specified
5611 * timeout */
5612 static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
5613 dictEntry *de;
5614 list *l;
5615 int j;
5616
5617 c->blockingkeys = zmalloc(sizeof(robj*)*numkeys);
5618 c->blockingkeysnum = numkeys;
5619 c->blockingto = timeout;
5620 for (j = 0; j < numkeys; j++) {
5621 /* Add the key in the client structure, to map clients -> keys */
5622 c->blockingkeys[j] = keys[j];
5623 incrRefCount(keys[j]);
5624
5625 /* And in the other "side", to map keys -> clients */
5626 de = dictFind(c->db->blockingkeys,keys[j]);
5627 if (de == NULL) {
5628 int retval;
5629
5630 /* For every key we take a list of clients blocked for it */
5631 l = listCreate();
5632 retval = dictAdd(c->db->blockingkeys,keys[j],l);
5633 incrRefCount(keys[j]);
5634 assert(retval == DICT_OK);
5635 } else {
5636 l = dictGetEntryVal(de);
5637 }
5638 listAddNodeTail(l,c);
5639 }
5640 /* Mark the client as a blocked client */
5641 c->flags |= REDIS_BLOCKED;
5642 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
5643 server.blockedclients++;
5644 }
5645
5646 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
5647 static void unblockClient(redisClient *c) {
5648 dictEntry *de;
5649 list *l;
5650 int j;
5651
5652 assert(c->blockingkeys != NULL);
5653 /* The client may wait for multiple keys, so unblock it for every key. */
5654 for (j = 0; j < c->blockingkeysnum; j++) {
5655 /* Remove this client from the list of clients waiting for this key. */
5656 de = dictFind(c->db->blockingkeys,c->blockingkeys[j]);
5657 assert(de != NULL);
5658 l = dictGetEntryVal(de);
5659 listDelNode(l,listSearchKey(l,c));
5660 /* If the list is empty we need to remove it to avoid wasting memory */
5661 if (listLength(l) == 0)
5662 dictDelete(c->db->blockingkeys,c->blockingkeys[j]);
5663 decrRefCount(c->blockingkeys[j]);
5664 }
5665 /* Cleanup the client structure */
5666 zfree(c->blockingkeys);
5667 c->blockingkeys = NULL;
5668 c->flags &= (~REDIS_BLOCKED);
5669 server.blockedclients--;
5670 /* Ok now we are ready to get read events from socket, note that we
5671 * can't trap errors here as it's possible that unblockClients() is
5672 * called from freeClient() itself, and the only thing we can do
5673 * if we failed to register the READABLE event is to kill the client.
5674 * Still the following function should never fail in the real world as
5675 * we are sure the file descriptor is sane, and we exit on out of mem. */
5676 aeCreateFileEvent(server.el, c->fd, AE_READABLE, readQueryFromClient, c);
5677 /* As a final step we want to process data if there is some command waiting
5678 * in the input buffer. Note that this is safe even if unblockClient()
5679 * gets called from freeClient() because freeClient() will be smart
5680 * enough to call this function *after* c->querybuf was set to NULL. */
5681 if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c);
5682 }
5683
5684 /* This should be called from any function PUSHing into lists.
5685 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5686 * 'ele' is the element pushed.
5687 *
5688 * If the function returns 0 there was no client waiting for a list push
5689 * against this key.
5690 *
5691 * If the function returns 1 there was a client waiting for a list push
5692 * against this key, the element was passed to this client thus it's not
5693 * needed to actually add it to the list and the caller should return asap. */
5694 static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
5695 struct dictEntry *de;
5696 redisClient *receiver;
5697 list *l;
5698 listNode *ln;
5699
5700 de = dictFind(c->db->blockingkeys,key);
5701 if (de == NULL) return 0;
5702 l = dictGetEntryVal(de);
5703 ln = listFirst(l);
5704 assert(ln != NULL);
5705 receiver = ln->value;
5706
5707 addReplySds(receiver,sdsnew("*2\r\n"));
5708 addReplyBulkLen(receiver,key);
5709 addReply(receiver,key);
5710 addReply(receiver,shared.crlf);
5711 addReplyBulkLen(receiver,ele);
5712 addReply(receiver,ele);
5713 addReply(receiver,shared.crlf);
5714 unblockClient(receiver);
5715 return 1;
5716 }
5717
5718 /* Blocking RPOP/LPOP */
5719 static void blockingPopGenericCommand(redisClient *c, int where) {
5720 robj *o;
5721 time_t timeout;
5722 int j;
5723
5724 for (j = 1; j < c->argc-1; j++) {
5725 o = lookupKeyWrite(c->db,c->argv[j]);
5726 if (o != NULL) {
5727 if (o->type != REDIS_LIST) {
5728 addReply(c,shared.wrongtypeerr);
5729 return;
5730 } else {
5731 list *list = o->ptr;
5732 if (listLength(list) != 0) {
5733 /* If the list contains elements fall back to the usual
5734 * non-blocking POP operation */
5735 robj *argv[2], **orig_argv;
5736 int orig_argc;
5737
5738 /* We need to alter the command arguments before to call
5739 * popGenericCommand() as the command takes a single key. */
5740 orig_argv = c->argv;
5741 orig_argc = c->argc;
5742 argv[1] = c->argv[j];
5743 c->argv = argv;
5744 c->argc = 2;
5745
5746 /* Also the return value is different, we need to output
5747 * the multi bulk reply header and the key name. The
5748 * "real" command will add the last element (the value)
5749 * for us. If this souds like an hack to you it's just
5750 * because it is... */
5751 addReplySds(c,sdsnew("*2\r\n"));
5752 addReplyBulkLen(c,argv[1]);
5753 addReply(c,argv[1]);
5754 addReply(c,shared.crlf);
5755 popGenericCommand(c,where);
5756
5757 /* Fix the client structure with the original stuff */
5758 c->argv = orig_argv;
5759 c->argc = orig_argc;
5760 return;
5761 }
5762 }
5763 }
5764 }
5765 /* If the list is empty or the key does not exists we must block */
5766 timeout = strtol(c->argv[c->argc-1]->ptr,NULL,10);
5767 if (timeout > 0) timeout += time(NULL);
5768 blockForKeys(c,c->argv+1,c->argc-2,timeout);
5769 }
5770
5771 static void blpopCommand(redisClient *c) {
5772 blockingPopGenericCommand(c,REDIS_HEAD);
5773 }
5774
5775 static void brpopCommand(redisClient *c) {
5776 blockingPopGenericCommand(c,REDIS_TAIL);
5777 }
5778
5779 /* =============================== Replication ============================= */
5780
5781 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5782 ssize_t nwritten, ret = size;
5783 time_t start = time(NULL);
5784
5785 timeout++;
5786 while(size) {
5787 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5788 nwritten = write(fd,ptr,size);
5789 if (nwritten == -1) return -1;
5790 ptr += nwritten;
5791 size -= nwritten;
5792 }
5793 if ((time(NULL)-start) > timeout) {
5794 errno = ETIMEDOUT;
5795 return -1;
5796 }
5797 }
5798 return ret;
5799 }
5800
5801 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5802 ssize_t nread, totread = 0;
5803 time_t start = time(NULL);
5804
5805 timeout++;
5806 while(size) {
5807 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5808 nread = read(fd,ptr,size);
5809 if (nread == -1) return -1;
5810 ptr += nread;
5811 size -= nread;
5812 totread += nread;
5813 }
5814 if ((time(NULL)-start) > timeout) {
5815 errno = ETIMEDOUT;
5816 return -1;
5817 }
5818 }
5819 return totread;
5820 }
5821
/* Read a line from 'fd' one byte at a time via syncRead(), storing at most
 * 'size'-1 characters plus a terminating NUL in 'ptr'.
 *
 * Reading stops at the first '\n', which is not stored; a '\r' right before
 * it is stripped as well. If the buffer fills up before a newline is seen
 * the function returns what it has read so far.
 *
 * Returns the number of characters stored before the newline was seen (a
 * stripped '\r' is still counted), or -1 if syncRead() fails. 'timeout' is
 * applied to each single byte read. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    /* Reserve one byte for the NUL terminator */
    size--;
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* Account for the byte just stored, so that a line longer than the
         * buffer can never write past its end. */
        size--;
    }
    return nread;
}
5842
5843 static void syncCommand(redisClient *c) {
5844 /* ignore SYNC if aleady slave or in monitor mode */
5845 if (c->flags & REDIS_SLAVE) return;
5846
5847 /* SYNC can't be issued when the server has pending data to send to
5848 * the client about already issued commands. We need a fresh reply
5849 * buffer registering the differences between the BGSAVE and the current
5850 * dataset, so that we can copy to other slaves if needed. */
5851 if (listLength(c->reply) != 0) {
5852 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5853 return;
5854 }
5855
5856 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5857 /* Here we need to check if there is a background saving operation
5858 * in progress, or if it is required to start one */
5859 if (server.bgsavechildpid != -1) {
5860 /* Ok a background save is in progress. Let's check if it is a good
5861 * one for replication, i.e. if there is another slave that is
5862 * registering differences since the server forked to save */
5863 redisClient *slave;
5864 listNode *ln;
5865
5866 listRewind(server.slaves);
5867 while((ln = listYield(server.slaves))) {
5868 slave = ln->value;
5869 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5870 }
5871 if (ln) {
5872 /* Perfect, the server is already registering differences for
5873 * another slave. Set the right state, and copy the buffer. */
5874 listRelease(c->reply);
5875 c->reply = listDup(slave->reply);
5876 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5877 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5878 } else {
5879 /* No way, we need to wait for the next BGSAVE in order to
5880 * register differences */
5881 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5882 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5883 }
5884 } else {
5885 /* Ok we don't have a BGSAVE in progress, let's start one */
5886 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5887 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5888 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5889 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5890 return;
5891 }
5892 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5893 }
5894 c->repldbfd = -1;
5895 c->flags |= REDIS_SLAVE;
5896 c->slaveseldb = 0;
5897 listAddNodeTail(server.slaves,c);
5898 return;
5899 }
5900
5901 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5902 redisClient *slave = privdata;
5903 REDIS_NOTUSED(el);
5904 REDIS_NOTUSED(mask);
5905 char buf[REDIS_IOBUF_LEN];
5906 ssize_t nwritten, buflen;
5907
5908 if (slave->repldboff == 0) {
5909 /* Write the bulk write count before to transfer the DB. In theory here
5910 * we don't know how much room there is in the output buffer of the
5911 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5912 * operations) will never be smaller than the few bytes we need. */
5913 sds bulkcount;
5914
5915 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5916 slave->repldbsize);
5917 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5918 {
5919 sdsfree(bulkcount);
5920 freeClient(slave);
5921 return;
5922 }
5923 sdsfree(bulkcount);
5924 }
5925 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5926 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5927 if (buflen <= 0) {
5928 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5929 (buflen == 0) ? "premature EOF" : strerror(errno));
5930 freeClient(slave);
5931 return;
5932 }
5933 if ((nwritten = write(fd,buf,buflen)) == -1) {
5934 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5935 strerror(errno));
5936 freeClient(slave);
5937 return;
5938 }
5939 slave->repldboff += nwritten;
5940 if (slave->repldboff == slave->repldbsize) {
5941 close(slave->repldbfd);
5942 slave->repldbfd = -1;
5943 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5944 slave->replstate = REDIS_REPL_ONLINE;
5945 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5946 sendReplyToClient, slave) == AE_ERR) {
5947 freeClient(slave);
5948 return;
5949 }
5950 addReplySds(slave,sdsempty());
5951 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5952 }
5953 }
5954
5955 /* This function is called at the end of every backgrond saving.
5956 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5957 * otherwise REDIS_ERR is passed to the function.
5958 *
5959 * The goal of this function is to handle slaves waiting for a successful
5960 * background saving in order to perform non-blocking synchronization. */
5961 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5962 listNode *ln;
5963 int startbgsave = 0;
5964
5965 listRewind(server.slaves);
5966 while((ln = listYield(server.slaves))) {
5967 redisClient *slave = ln->value;
5968
5969 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5970 startbgsave = 1;
5971 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5972 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5973 struct redis_stat buf;
5974
5975 if (bgsaveerr != REDIS_OK) {
5976 freeClient(slave);
5977 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5978 continue;
5979 }
5980 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5981 redis_fstat(slave->repldbfd,&buf) == -1) {
5982 freeClient(slave);
5983 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5984 continue;
5985 }
5986 slave->repldboff = 0;
5987 slave->repldbsize = buf.st_size;
5988 slave->replstate = REDIS_REPL_SEND_BULK;
5989 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5990 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
5991 freeClient(slave);
5992 continue;
5993 }
5994 }
5995 }
5996 if (startbgsave) {
5997 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5998 listRewind(server.slaves);
5999 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
6000 while((ln = listYield(server.slaves))) {
6001 redisClient *slave = ln->value;
6002
6003 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
6004 freeClient(slave);
6005 }
6006 }
6007 }
6008 }
6009
6010 static int syncWithMaster(void) {
6011 char buf[1024], tmpfile[256], authcmd[1024];
6012 int dumpsize;
6013 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
6014 int dfd;
6015
6016 if (fd == -1) {
6017 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
6018 strerror(errno));
6019 return REDIS_ERR;
6020 }
6021
6022 /* AUTH with the master if required. */
6023 if(server.masterauth) {
6024 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
6025 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
6026 close(fd);
6027 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
6028 strerror(errno));
6029 return REDIS_ERR;
6030 }
6031 /* Read the AUTH result. */
6032 if (syncReadLine(fd,buf,1024,3600) == -1) {
6033 close(fd);
6034 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
6035 strerror(errno));
6036 return REDIS_ERR;
6037 }
6038 if (buf[0] != '+') {
6039 close(fd);
6040 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
6041 return REDIS_ERR;
6042 }
6043 }
6044
6045 /* Issue the SYNC command */
6046 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
6047 close(fd);
6048 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
6049 strerror(errno));
6050 return REDIS_ERR;
6051 }
6052 /* Read the bulk write count */
6053 if (syncReadLine(fd,buf,1024,3600) == -1) {
6054 close(fd);
6055 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
6056 strerror(errno));
6057 return REDIS_ERR;
6058 }
6059 if (buf[0] != '$') {
6060 close(fd);
6061 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
6062 return REDIS_ERR;
6063 }
6064 dumpsize = atoi(buf+1);
6065 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
6066 /* Read the bulk write data on a temp file */
6067 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
6068 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
6069 if (dfd == -1) {
6070 close(fd);
6071 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
6072 return REDIS_ERR;
6073 }
6074 while(dumpsize) {
6075 int nread, nwritten;
6076
6077 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
6078 if (nread == -1) {
6079 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
6080 strerror(errno));
6081 close(fd);
6082 close(dfd);
6083 return REDIS_ERR;
6084 }
6085 nwritten = write(dfd,buf,nread);
6086 if (nwritten == -1) {
6087 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
6088 close(fd);
6089 close(dfd);
6090 return REDIS_ERR;
6091 }
6092 dumpsize -= nread;
6093 }
6094 close(dfd);
6095 if (rename(tmpfile,server.dbfilename) == -1) {
6096 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
6097 unlink(tmpfile);
6098 close(fd);
6099 return REDIS_ERR;
6100 }
6101 emptyDb();
6102 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6103 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
6104 close(fd);
6105 return REDIS_ERR;
6106 }
6107 server.master = createClient(fd);
6108 server.master->flags |= REDIS_MASTER;
6109 server.master->authenticated = 1;
6110 server.replstate = REDIS_REPL_CONNECTED;
6111 return REDIS_OK;
6112 }
6113
6114 static void slaveofCommand(redisClient *c) {
6115 if (!strcasecmp(c->argv[1]->ptr,"no") &&
6116 !strcasecmp(c->argv[2]->ptr,"one")) {
6117 if (server.masterhost) {
6118 sdsfree(server.masterhost);
6119 server.masterhost = NULL;
6120 if (server.master) freeClient(server.master);
6121 server.replstate = REDIS_REPL_NONE;
6122 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
6123 }
6124 } else {
6125 sdsfree(server.masterhost);
6126 server.masterhost = sdsdup(c->argv[1]->ptr);
6127 server.masterport = atoi(c->argv[2]->ptr);
6128 if (server.master) freeClient(server.master);
6129 server.replstate = REDIS_REPL_CONNECT;
6130 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
6131 server.masterhost, server.masterport);
6132 }
6133 addReply(c,shared.ok);
6134 }
6135
6136 /* ============================ Maxmemory directive ======================== */
6137
6138 /* This function gets called when 'maxmemory' is set on the config file to limit
6139 * the max memory used by the server, and we are out of memory.
6140 * This function will try to, in order:
6141 *
6142 * - Free objects from the free list
6143 * - Try to remove keys with an EXPIRE set
6144 *
6145 * It is not possible to free enough memory to reach used-memory < maxmemory
6146 * the server will start refusing commands that will enlarge even more the
6147 * memory usage.
6148 */
6149 static void freeMemoryIfNeeded(void) {
6150 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
6151 if (listLength(server.objfreelist)) {
6152 robj *o;
6153
6154 listNode *head = listFirst(server.objfreelist);
6155 o = listNodeValue(head);
6156 listDelNode(server.objfreelist,head);
6157 zfree(o);
6158 } else {
6159 int j, k, freed = 0;
6160
6161 for (j = 0; j < server.dbnum; j++) {
6162 int minttl = -1;
6163 robj *minkey = NULL;
6164 struct dictEntry *de;
6165
6166 if (dictSize(server.db[j].expires)) {
6167 freed = 1;
6168 /* From a sample of three keys drop the one nearest to
6169 * the natural expire */
6170 for (k = 0; k < 3; k++) {
6171 time_t t;
6172
6173 de = dictGetRandomKey(server.db[j].expires);
6174 t = (time_t) dictGetEntryVal(de);
6175 if (minttl == -1 || t < minttl) {
6176 minkey = dictGetEntryKey(de);
6177 minttl = t;
6178 }
6179 }
6180 deleteKey(server.db+j,minkey);
6181 }
6182 }
6183 if (!freed) return; /* nothing to free... */
6184 }
6185 }
6186 }
6187
6188 /* ============================== Append Only file ========================== */
6189
6190 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
6191 sds buf = sdsempty();
6192 int j;
6193 ssize_t nwritten;
6194 time_t now;
6195 robj *tmpargv[3];
6196
6197 /* The DB this command was targetting is not the same as the last command
6198 * we appendend. To issue a SELECT command is needed. */
6199 if (dictid != server.appendseldb) {
6200 char seldb[64];
6201
6202 snprintf(seldb,sizeof(seldb),"%d",dictid);
6203 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
6204 (unsigned long)strlen(seldb),seldb);
6205 server.appendseldb = dictid;
6206 }
6207
6208 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6209 * EXPIREs into EXPIREATs calls */
6210 if (cmd->proc == expireCommand) {
6211 long when;
6212
6213 tmpargv[0] = createStringObject("EXPIREAT",8);
6214 tmpargv[1] = argv[1];
6215 incrRefCount(argv[1]);
6216 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
6217 tmpargv[2] = createObject(REDIS_STRING,
6218 sdscatprintf(sdsempty(),"%ld",when));
6219 argv = tmpargv;
6220 }
6221
6222 /* Append the actual command */
6223 buf = sdscatprintf(buf,"*%d\r\n",argc);
6224 for (j = 0; j < argc; j++) {
6225 robj *o = argv[j];
6226
6227 o = getDecodedObject(o);
6228 buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
6229 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
6230 buf = sdscatlen(buf,"\r\n",2);
6231 decrRefCount(o);
6232 }
6233
6234 /* Free the objects from the modified argv for EXPIREAT */
6235 if (cmd->proc == expireCommand) {
6236 for (j = 0; j < 3; j++)
6237 decrRefCount(argv[j]);
6238 }
6239
6240 /* We want to perform a single write. This should be guaranteed atomic
6241 * at least if the filesystem we are writing is a real physical one.
6242 * While this will save us against the server being killed I don't think
6243 * there is much to do about the whole server stopping for power problems
6244 * or alike */
6245 nwritten = write(server.appendfd,buf,sdslen(buf));
6246 if (nwritten != (signed)sdslen(buf)) {
6247 /* Ooops, we are in troubles. The best thing to do for now is
6248 * to simply exit instead to give the illusion that everything is
6249 * working as expected. */
6250 if (nwritten == -1) {
6251 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
6252 } else {
6253 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
6254 }
6255 exit(1);
6256 }
6257 /* If a background append only file rewriting is in progress we want to
6258 * accumulate the differences between the child DB and the current one
6259 * in a buffer, so that when the child process will do its work we
6260 * can append the differences to the new append only file. */
6261 if (server.bgrewritechildpid != -1)
6262 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
6263
6264 sdsfree(buf);
6265 now = time(NULL);
6266 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
6267 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
6268 now-server.lastfsync > 1))
6269 {
6270 fsync(server.appendfd); /* Let's try to get this data on the disk */
6271 server.lastfsync = now;
6272 }
6273 }
6274
6275 /* In Redis commands are always executed in the context of a client, so in
6276 * order to load the append only file we need to create a fake client. */
6277 static struct redisClient *createFakeClient(void) {
6278 struct redisClient *c = zmalloc(sizeof(*c));
6279
6280 selectDb(c,0);
6281 c->fd = -1;
6282 c->querybuf = sdsempty();
6283 c->argc = 0;
6284 c->argv = NULL;
6285 c->flags = 0;
6286 /* We set the fake client as a slave waiting for the synchronization
6287 * so that Redis will not try to send replies to this client. */
6288 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
6289 c->reply = listCreate();
6290 listSetFreeMethod(c->reply,decrRefCount);
6291 listSetDupMethod(c->reply,dupClientReplyValue);
6292 return c;
6293 }
6294
6295 static void freeFakeClient(struct redisClient *c) {
6296 sdsfree(c->querybuf);
6297 listRelease(c->reply);
6298 zfree(c);
6299 }
6300
6301 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
6302 * error (the append only file is zero-length) REDIS_ERR is returned. On
6303 * fatal error an error message is logged and the program exists. */
6304 int loadAppendOnlyFile(char *filename) {
6305 struct redisClient *fakeClient;
6306 FILE *fp = fopen(filename,"r");
6307 struct redis_stat sb;
6308
6309 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
6310 return REDIS_ERR;
6311
6312 if (fp == NULL) {
6313 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
6314 exit(1);
6315 }
6316
6317 fakeClient = createFakeClient();
6318 while(1) {
6319 int argc, j;
6320 unsigned long len;
6321 robj **argv;
6322 char buf[128];
6323 sds argsds;
6324 struct redisCommand *cmd;
6325
6326 if (fgets(buf,sizeof(buf),fp) == NULL) {
6327 if (feof(fp))
6328 break;
6329 else
6330 goto readerr;
6331 }
6332 if (buf[0] != '*') goto fmterr;
6333 argc = atoi(buf+1);
6334 argv = zmalloc(sizeof(robj*)*argc);
6335 for (j = 0; j < argc; j++) {
6336 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
6337 if (buf[0] != '$') goto fmterr;
6338 len = strtol(buf+1,NULL,10);
6339 argsds = sdsnewlen(NULL,len);
6340 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
6341 argv[j] = createObject(REDIS_STRING,argsds);
6342 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
6343 }
6344
6345 /* Command lookup */
6346 cmd = lookupCommand(argv[0]->ptr);
6347 if (!cmd) {
6348 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
6349 exit(1);
6350 }
6351 /* Try object sharing and encoding */
6352 if (server.shareobjects) {
6353 int j;
6354 for(j = 1; j < argc; j++)
6355 argv[j] = tryObjectSharing(argv[j]);
6356 }
6357 if (cmd->flags & REDIS_CMD_BULK)
6358 tryObjectEncoding(argv[argc-1]);
6359 /* Run the command in the context of a fake client */
6360 fakeClient->argc = argc;
6361 fakeClient->argv = argv;
6362 cmd->proc(fakeClient);
6363 /* Discard the reply objects list from the fake client */
6364 while(listLength(fakeClient->reply))
6365 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
6366 /* Clean up, ready for the next command */
6367 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
6368 zfree(argv);
6369 }
6370 fclose(fp);
6371 freeFakeClient(fakeClient);
6372 return REDIS_OK;
6373
6374 readerr:
6375 if (feof(fp)) {
6376 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
6377 } else {
6378 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
6379 }
6380 exit(1);
6381 fmterr:
6382 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
6383 exit(1);
6384 }
6385
6386 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6387 static int fwriteBulk(FILE *fp, robj *obj) {
6388 char buf[128];
6389 obj = getDecodedObject(obj);
6390 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
6391 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
6392 if (sdslen(obj->ptr) && fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0)
6393 goto err;
6394 if (fwrite("\r\n",2,1,fp) == 0) goto err;
6395 decrRefCount(obj);
6396 return 1;
6397 err:
6398 decrRefCount(obj);
6399 return 0;
6400 }
6401
/* Write a double value in bulk format $<count>\r\n<payload>\r\n, where the
 * payload is the value formatted with "%.17g". Returns 1 on success and 0
 * if a write fails. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char payload[128], header[128];
    size_t plen;

    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    plen = strlen(payload);
    /* The advertised count does not include the trailing CRLF. */
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)plen-2);
    if (fwrite(header,strlen(header),1,fp) == 0 ||
        fwrite(payload,plen,1,fp) == 0)
    {
        return 0;
    }
    return 1;
}
6412
/* Write a long value in bulk format $<count>\r\n<payload>\r\n, where the
 * payload is the decimal representation of 'l'. Returns 1 on success and 0
 * if a write fails. */
static int fwriteBulkLong(FILE *fp, long l) {
    char payload[128], header[128];
    size_t plen;

    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    plen = strlen(payload);
    /* The advertised count does not include the trailing CRLF. */
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)plen-2);
    if (fwrite(header,strlen(header),1,fp) == 0 ||
        fwrite(payload,plen,1,fp) == 0)
    {
        return 0;
    }
    return 1;
}
6423
6424 /* Write a sequence of commands able to fully rebuild the dataset into
6425 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6426 static int rewriteAppendOnlyFile(char *filename) {
6427 dictIterator *di = NULL;
6428 dictEntry *de;
6429 FILE *fp;
6430 char tmpfile[256];
6431 int j;
6432 time_t now = time(NULL);
6433
6434 /* Note that we have to use a different temp name here compared to the
6435 * one used by rewriteAppendOnlyFileBackground() function. */
6436 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
6437 fp = fopen(tmpfile,"w");
6438 if (!fp) {
6439 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
6440 return REDIS_ERR;
6441 }
6442 for (j = 0; j < server.dbnum; j++) {
6443 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
6444 redisDb *db = server.db+j;
6445 dict *d = db->dict;
6446 if (dictSize(d) == 0) continue;
6447 di = dictGetIterator(d);
6448 if (!di) {
6449 fclose(fp);
6450 return REDIS_ERR;
6451 }
6452
6453 /* SELECT the new DB */
6454 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
6455 if (fwriteBulkLong(fp,j) == 0) goto werr;
6456
6457 /* Iterate this DB writing every entry */
6458 while((de = dictNext(di)) != NULL) {
6459 robj *key = dictGetEntryKey(de);
6460 robj *o = dictGetEntryVal(de);
6461 time_t expiretime = getExpire(db,key);
6462
6463 /* Save the key and associated value */
6464 if (o->type == REDIS_STRING) {
6465 /* Emit a SET command */
6466 char cmd[]="*3\r\n$3\r\nSET\r\n";
6467 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6468 /* Key and value */
6469 if (fwriteBulk(fp,key) == 0) goto werr;
6470 if (fwriteBulk(fp,o) == 0) goto werr;
6471 } else if (o->type == REDIS_LIST) {
6472 /* Emit the RPUSHes needed to rebuild the list */
6473 list *list = o->ptr;
6474 listNode *ln;
6475
6476 listRewind(list);
6477 while((ln = listYield(list))) {
6478 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
6479 robj *eleobj = listNodeValue(ln);
6480
6481 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6482 if (fwriteBulk(fp,key) == 0) goto werr;
6483 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6484 }
6485 } else if (o->type == REDIS_SET) {
6486 /* Emit the SADDs needed to rebuild the set */
6487 dict *set = o->ptr;
6488 dictIterator *di = dictGetIterator(set);
6489 dictEntry *de;
6490
6491 while((de = dictNext(di)) != NULL) {
6492 char cmd[]="*3\r\n$4\r\nSADD\r\n";
6493 robj *eleobj = dictGetEntryKey(de);
6494
6495 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6496 if (fwriteBulk(fp,key) == 0) goto werr;
6497 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6498 }
6499 dictReleaseIterator(di);
6500 } else if (o->type == REDIS_ZSET) {
6501 /* Emit the ZADDs needed to rebuild the sorted set */
6502 zset *zs = o->ptr;
6503 dictIterator *di = dictGetIterator(zs->dict);
6504 dictEntry *de;
6505
6506 while((de = dictNext(di)) != NULL) {
6507 char cmd[]="*4\r\n$4\r\nZADD\r\n";
6508 robj *eleobj = dictGetEntryKey(de);
6509 double *score = dictGetEntryVal(de);
6510
6511 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6512 if (fwriteBulk(fp,key) == 0) goto werr;
6513 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
6514 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6515 }
6516 dictReleaseIterator(di);
6517 } else {
6518 redisAssert(0 != 0);
6519 }
6520 /* Save the expire time */
6521 if (expiretime != -1) {
6522 char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
6523 /* If this key is already expired skip it */
6524 if (expiretime < now) continue;
6525 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6526 if (fwriteBulk(fp,key) == 0) goto werr;
6527 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
6528 }
6529 }
6530 dictReleaseIterator(di);
6531 }
6532
6533 /* Make sure data will not remain on the OS's output buffers */
6534 fflush(fp);
6535 fsync(fileno(fp));
6536 fclose(fp);
6537
6538 /* Use RENAME to make sure the DB file is changed atomically only
6539 * if the generate DB file is ok. */
6540 if (rename(tmpfile,filename) == -1) {
6541 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
6542 unlink(tmpfile);
6543 return REDIS_ERR;
6544 }
6545 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
6546 return REDIS_OK;
6547
6548 werr:
6549 fclose(fp);
6550 unlink(tmpfile);
6551 redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
6552 if (di) dictReleaseIterator(di);
6553 return REDIS_ERR;
6554 }
6555
6556 /* This is how rewriting of the append only file in background works:
6557 *
6558 * 1) The user calls BGREWRITEAOF
6559 * 2) Redis calls this function, that forks():
6560 * 2a) the child rewrite the append only file in a temp file.
6561 * 2b) the parent accumulates differences in server.bgrewritebuf.
6562 * 3) When the child finished '2a' exists.
6563 * 4) The parent will trap the exit code, if it's OK, will append the
6564 * data accumulated into server.bgrewritebuf into the temp file, and
6565 * finally will rename(2) the temp file in the actual file name.
6566 * The the new file is reopened as the new append only file. Profit!
6567 */
6568 static int rewriteAppendOnlyFileBackground(void) {
6569 pid_t childpid;
6570
6571 if (server.bgrewritechildpid != -1) return REDIS_ERR;
6572 if ((childpid = fork()) == 0) {
6573 /* Child */
6574 char tmpfile[256];
6575 close(server.fd);
6576
6577 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6578 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
6579 exit(0);
6580 } else {
6581 exit(1);
6582 }
6583 } else {
6584 /* Parent */
6585 if (childpid == -1) {
6586 redisLog(REDIS_WARNING,
6587 "Can't rewrite append only file in background: fork: %s",
6588 strerror(errno));
6589 return REDIS_ERR;
6590 }
6591 redisLog(REDIS_NOTICE,
6592 "Background append only file rewriting started by pid %d",childpid);
6593 server.bgrewritechildpid = childpid;
6594 /* We set appendseldb to -1 in order to force the next call to the
6595 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6596 * accumulated by the parent into server.bgrewritebuf will start
6597 * with a SELECT statement and it will be safe to merge. */
6598 server.appendseldb = -1;
6599 return REDIS_OK;
6600 }
6601 return REDIS_OK; /* unreached */
6602 }
6603
6604 static void bgrewriteaofCommand(redisClient *c) {
6605 if (server.bgrewritechildpid != -1) {
6606 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6607 return;
6608 }
6609 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6610 char *status = "+Background append only file rewriting started\r\n";
6611 addReplySds(c,sdsnew(status));
6612 } else {
6613 addReply(c,shared.err);
6614 }
6615 }
6616
/* Remove the temp file named "temp-rewriteaof-bg-<childpid>.aof" from the
 * current directory. The result of unlink() is ignored. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof", (int) childpid);
    unlink(path);
}
6623
6624 /* =============================== Virtual Memory =========================== */
6625 static void vmInit(void) {
6626 off_t totsize;
6627
6628 server.vm_fp = fopen("/tmp/redisvm","w+b");
6629 if (server.vm_fp == NULL) {
6630 redisLog(REDIS_WARNING,"Impossible to open the swap file. Exiting.");
6631 exit(1);
6632 }
6633 server.vm_fd = fileno(server.vm_fp);
6634 server.vm_next_page = 0;
6635 server.vm_near_pages = 0;
6636 totsize = server.vm_pages*server.vm_page_size;
6637 redisLog(REDIS_NOTICE,"Allocating %lld bytes of swap file",totsize);
6638 if (ftruncate(server.vm_fd,totsize) == -1) {
6639 redisLog(REDIS_WARNING,"Can't ftruncate swap file: %s. Exiting.",
6640 strerror(errno));
6641 exit(1);
6642 } else {
6643 redisLog(REDIS_NOTICE,"Swap file allocated with success");
6644 }
6645 server.vm_bitmap = zmalloc((server.vm_near_pages+7)/8);
6646 memset(server.vm_bitmap,0,(server.vm_near_pages+7)/8);
6647 /* Try to remove the swap file, so the OS will really delete it from the
6648 * file system when Redis exists. */
6649 unlink("/tmp/redisvm");
6650 }
6651
6652 /* Mark the page as used */
6653 static void vmMarkPageUsed(off_t page) {
6654 off_t byte = page/8;
6655 int bit = page&7;
6656 server.vm_bitmap[byte] |= 1<<bit;
6657 }
6658
/* Mark N contiguous pages as used, with 'page' being the first. */
static void vmMarkPagesUsed(off_t page, off_t count) {
    off_t j;

    /* Mark page, page+1, ..., page+count-1. The offset must be the loop
     * index: using 'count' would mark the same (wrong) page every time. */
    for (j = 0; j < count; j++)
        vmMarkPageUsed(page+j);
}
6666
6667 /* Mark the page as free */
6668 static void vmMarkPageFree(off_t page) {
6669 off_t byte = page/8;
6670 int bit = page&7;
6671 server.vm_bitmap[byte] &= ~(1<<bit);
6672 }
6673
/* Mark N contiguous pages as free, with 'page' being the first. */
static void vmMarkPagesFree(off_t page, off_t count) {
    off_t j;

    /* Free page, page+1, ..., page+count-1. The offset must be the loop
     * index: using 'count' would free the same (wrong) page every time. */
    for (j = 0; j < count; j++)
        vmMarkPageFree(page+j);
}
6681
6682 /* Test if the page is free */
6683 static int vmFreePage(off_t page) {
6684 off_t byte = page/8;
6685 int bit = page&7;
6686 return server.vm_bitmap[byte] & bit;
6687 }
6688
6689 /* Find N contiguous free pages storing the first page of the cluster in *first.
6690 * Returns REDIS_OK if it was able to find N contiguous pages, otherwise
6691 * REDIS_ERR is returned.
6692 *
6693 * This function uses a simple algorithm: we try to allocate
6694 * REDIS_VM_MAX_NEAR_PAGES sequentially, when we reach this limit we start
6695 * again from the start of the swap file searching for free spaces.
6696 *
6697 * If it looks pretty clear that there are no free pages near our offset
6698 * we try to find less populated places doing a forward jump of
6699 * REDIS_VM_MAX_RANDOM_JUMP, then we start scanning again a few pages
6700 * without hurry, and then we jump again and so forth...
6701 *
6702 * This function can be improved using a free list to avoid to guess
6703 * too much, since we could collect data about freed pages.
6704 *
6705 * note: I implemented this function just after watching an episode of
6706 * Battlestar Galactica, where the hybrid was continuing to say "JUMP!"
6707 */
6708 static int vmFindContiguousPages(off_t *first, int n) {
6709 off_t base, offset = 0, since_jump = 0, numfree = 0;
6710
6711 if (server.vm_near_pages == REDIS_VM_MAX_NEAR_PAGES) {
6712 server.vm_near_pages = 0;
6713 server.vm_next_page = 0;
6714 }
6715 server.vm_near_pages++; /* Yet another try for pages near to the old ones */
6716 base = server.vm_next_page;
6717
6718 while(offset < server.vm_pages) {
6719 off_t this = base+offset;
6720
6721 /* If we overflow, restart from page zero */
6722 if (this >= server.vm_pages) {
6723 this -= server.vm_pages;
6724 if (this == 0) {
6725 /* Just overflowed, what we found on tail is no longer
6726 * interesting, as it's no longer contiguous. */
6727 numfree = 0;
6728 }
6729 }
6730 if (vmFreePage(this)) {
6731 /* This is a free page */
6732 numfree++;
6733 /* Already got N free pages? Return to the caller, with success */
6734 if (numfree == n) {
6735 *first = this;
6736 return REDIS_OK;
6737 }
6738 } else {
6739 /* The current one is not a free page */
6740 numfree = 0;
6741 }
6742
6743 /* Fast-forward if the current page is not free and we already
6744 * searched enough near this place. */
6745 since_jump++;
6746 if (!numfree && since_jump >= REDIS_VM_MAX_RANDOM_JUMP/4) {
6747 offset += random() % REDIS_VM_MAX_RANDOM_JUMP;
6748 since_jump = 0;
6749 /* Note that even if we rewind after the jump, we are don't need
6750 * to make sure numfree is set to zero as we only jump *if* it
6751 * is set to zero. */
6752 } else {
6753 /* Otherwise just check the next page */
6754 offset++;
6755 }
6756 }
6757 return REDIS_ERR;
6758 }
6759
6760 /* Swap the 'val' object relative to 'key' into disk. Store all the information
6761 * needed to later retrieve the object into the key object.
6762 * If we can't find enough contiguous empty pages to swap the object on disk
6763 * REDIS_ERR is returned. */
6764 static int vmSwapObject(robj *key, robj *val) {
6765 off_t pages = rdbSavedObjectPages(val);
6766 off_t page;
6767
6768 assert(key->storage == REDIS_VM_MEMORY);
6769 if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return REDIS_ERR;
6770 if (fseeko(server.vm_fp,page*server.vm_page_size,SEEK_SET) == -1) {
6771 redisLog(REDIS_WARNING,
6772 "Critical VM problem in vmSwapObject(): can't seek: %s",
6773 strerror(errno));
6774 return REDIS_ERR;
6775 }
6776 rdbSaveObject(server.vm_fp,val);
6777 key->vm.page = page;
6778 key->vm.usedpages = pages;
6779 key->storage = REDIS_VM_SWAPPED;
6780 decrRefCount(val); /* Deallocate the object from memory. */
6781 vmMarkPagesUsed(page,pages);
6782 return REDIS_OK;
6783 }
6784
6785 /* Load the value object relative to the 'key' object from swap to memory.
6786 * The newly allocated object is returned. */
6787 static robj *vmLoadObject(robj *key) {
6788 robj *val;
6789
6790 assert(key->storage == REDIS_VM_SWAPPED);
6791 if (fseeko(server.vm_fp,key->vm.page*server.vm_page_size,SEEK_SET) == -1) {
6792 redisLog(REDIS_WARNING,
6793 "Unrecoverable VM problem in vmLoadObject(): can't seek: %s",
6794 strerror(errno));
6795 exit(1);
6796 }
6797 val = rdbLoadObject(key->type,server.vm_fp);
6798 if (val == NULL) {
6799 redisLog(REDIS_WARNING, "Unrecoverable VM problem in vmLoadObject(): can't load object from swap file: %s", strerror(errno));
6800 exit(1);
6801 }
6802 key->storage = REDIS_VM_MEMORY;
6803 key->vm.atime = server.unixtime;
6804 vmMarkPagesFree(key->vm.page,key->vm.usedpages);
6805 return val;
6806 }
6807
6808 /* ================================= Debugging ============================== */
6809
6810 static void debugCommand(redisClient *c) {
6811 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6812 *((char*)-1) = 'x';
6813 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6814 if (rdbSave(server.dbfilename) != REDIS_OK) {
6815 addReply(c,shared.err);
6816 return;
6817 }
6818 emptyDb();
6819 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6820 addReply(c,shared.err);
6821 return;
6822 }
6823 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6824 addReply(c,shared.ok);
6825 } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
6826 emptyDb();
6827 if (loadAppendOnlyFile(server.appendfilename) != REDIS_OK) {
6828 addReply(c,shared.err);
6829 return;
6830 }
6831 redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF");
6832 addReply(c,shared.ok);
6833 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6834 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6835 robj *key, *val;
6836
6837 if (!de) {
6838 addReply(c,shared.nokeyerr);
6839 return;
6840 }
6841 key = dictGetEntryKey(de);
6842 val = dictGetEntryVal(de);
6843 addReplySds(c,sdscatprintf(sdsempty(),
6844 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d serializedlength:%lld\r\n",
6845 (void*)key, key->refcount, (void*)val, val->refcount,
6846 val->encoding, rdbSavedObjectLen(val)));
6847 } else {
6848 addReplySds(c,sdsnew(
6849 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6850 }
6851 }
6852
6853 static void _redisAssert(char *estr) {
6854 redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
6855 redisLog(REDIS_WARNING,"==> %s\n",estr);
6856 #ifdef HAVE_BACKTRACE
6857 redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
6858 *((char*)-1) = 'x';
6859 #endif
6860 }
6861
6862 /* =================================== Main! ================================ */
6863
6864 #ifdef __linux__
/* Return the integer found on the first line of
 * /proc/sys/vm/overcommit_memory, or -1 if the file can't be opened or
 * read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp == NULL) return -1;
    if (fgets(line,sizeof(line),fp) != NULL)
        value = atoi(line);
    fclose(fp);
    return value;
}
6878
6879 void linuxOvercommitMemoryWarning(void) {
6880 if (linuxOvercommitMemoryValue() == 0) {
6881 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6882 }
6883 }
6884 #endif /* __linux__ */
6885
6886 static void daemonize(void) {
6887 int fd;
6888 FILE *fp;
6889
6890 if (fork() != 0) exit(0); /* parent exits */
6891 printf("New pid: %d\n", getpid());
6892 setsid(); /* create a new session */
6893
6894 /* Every output goes to /dev/null. If Redis is daemonized but
6895 * the 'logfile' is set to 'stdout' in the configuration file
6896 * it will not log at all. */
6897 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6898 dup2(fd, STDIN_FILENO);
6899 dup2(fd, STDOUT_FILENO);
6900 dup2(fd, STDERR_FILENO);
6901 if (fd > STDERR_FILENO) close(fd);
6902 }
6903 /* Try to write the pid file */
6904 fp = fopen(server.pidfile,"w");
6905 if (fp) {
6906 fprintf(fp,"%d\n",getpid());
6907 fclose(fp);
6908 }
6909 }
6910
6911 int main(int argc, char **argv) {
6912 initServerConfig();
6913 if (argc == 2) {
6914 resetServerSaveParams();
6915 loadServerConfig(argv[1]);
6916 } else if (argc > 2) {
6917 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6918 exit(1);
6919 } else {
6920 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6921 }
6922 if (server.daemonize) daemonize();
6923 initServer();
6924 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6925 #ifdef __linux__
6926 linuxOvercommitMemoryWarning();
6927 #endif
6928 if (server.appendonly) {
6929 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6930 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6931 } else {
6932 if (rdbLoad(server.dbfilename) == REDIS_OK)
6933 redisLog(REDIS_NOTICE,"DB loaded from disk");
6934 }
6935 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
6936 acceptHandler, NULL) == AE_ERR) oom("creating file event");
6937 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6938 aeMain(server.el);
6939 aeDeleteEventLoop(server.el);
6940 return 0;
6941 }
6942
6943 /* ============================= Backtrace support ========================= */
6944
6945 #ifdef HAVE_BACKTRACE
6946 static char *findFuncName(void *pointer, unsigned long *offset);
6947
6948 static void *getMcontextEip(ucontext_t *uc) {
6949 #if defined(__FreeBSD__)
6950 return (void*) uc->uc_mcontext.mc_eip;
6951 #elif defined(__dietlibc__)
6952 return (void*) uc->uc_mcontext.eip;
6953 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6954 #if __x86_64__
6955 return (void*) uc->uc_mcontext->__ss.__rip;
6956 #else
6957 return (void*) uc->uc_mcontext->__ss.__eip;
6958 #endif
6959 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6960 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6961 return (void*) uc->uc_mcontext->__ss.__rip;
6962 #else
6963 return (void*) uc->uc_mcontext->__ss.__eip;
6964 #endif
6965 #elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
6966 return (void*) uc->uc_mcontext.gregs[REG_EIP]; /* Linux 32/64 bit */
6967 #elif defined(__ia64__) /* Linux IA64 */
6968 return (void*) uc->uc_mcontext.sc_ip;
6969 #else
6970 return NULL;
6971 #endif
6972 }
6973
6974 static void segvHandler(int sig, siginfo_t *info, void *secret) {
6975 void *trace[100];
6976 char **messages = NULL;
6977 int i, trace_size = 0;
6978 unsigned long offset=0;
6979 ucontext_t *uc = (ucontext_t*) secret;
6980 sds infostring;
6981 REDIS_NOTUSED(info);
6982
6983 redisLog(REDIS_WARNING,
6984 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
6985 infostring = genRedisInfoString();
6986 redisLog(REDIS_WARNING, "%s",infostring);
6987 /* It's not safe to sdsfree() the returned string under memory
6988 * corruption conditions. Let it leak as we are going to abort */
6989
6990 trace_size = backtrace(trace, 100);
6991 /* overwrite sigaction with caller's address */
6992 if (getMcontextEip(uc) != NULL) {
6993 trace[1] = getMcontextEip(uc);
6994 }
6995 messages = backtrace_symbols(trace, trace_size);
6996
6997 for (i=1; i<trace_size; ++i) {
6998 char *fn = findFuncName(trace[i], &offset), *p;
6999
7000 p = strchr(messages[i],'+');
7001 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
7002 redisLog(REDIS_WARNING,"%s", messages[i]);
7003 } else {
7004 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
7005 }
7006 }
7007 /* free(messages); Don't call free() with possibly corrupted memory. */
7008 exit(0);
7009 }
7010
7011 static void setupSigSegvAction(void) {
7012 struct sigaction act;
7013
7014 sigemptyset (&act.sa_mask);
7015 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
7016 * is used. Otherwise, sa_handler is used */
7017 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
7018 act.sa_sigaction = segvHandler;
7019 sigaction (SIGSEGV, &act, NULL);
7020 sigaction (SIGBUS, &act, NULL);
7021 sigaction (SIGFPE, &act, NULL);
7022 sigaction (SIGILL, &act, NULL);
7023 sigaction (SIGBUS, &act, NULL);
7024 return;
7025 }
7026
7027 #include "staticsymbols.h"
7028 /* This function try to convert a pointer into a function name. It's used in
7029 * oreder to provide a backtrace under segmentation fault that's able to
7030 * display functions declared as static (otherwise the backtrace is useless). */
7031 static char *findFuncName(void *pointer, unsigned long *offset){
7032 int i, ret = -1;
7033 unsigned long off, minoff = 0;
7034
7035 /* Try to match against the Symbol with the smallest offset */
7036 for (i=0; symsTable[i].pointer; i++) {
7037 unsigned long lp = (unsigned long) pointer;
7038
7039 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
7040 off=lp-symsTable[i].pointer;
7041 if (ret < 0 || off < minoff) {
7042 minoff=off;
7043 ret=i;
7044 }
7045 }
7046 }
7047 if (ret == -1) return NULL;
7048 *offset = minoff;
7049 return symsTable[ret].name;
7050 }
7051 #else /* HAVE_BACKTRACE */
/* Fallback used when HAVE_BACKTRACE is not defined: callers can invoke
 * setupSigSegvAction() unconditionally and no handler is installed. */
static void setupSigSegvAction(void) {
    /* Intentionally empty. */
}
7054 #endif /* HAVE_BACKTRACE */
7055
7056
7057
7058 /* The End */
7059
7060
7061