]> git.saurik.com Git - redis.git/blob - redis.c
1e93a1e4cfdf8f678cfd40d999cbc1c6faa09406
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string of this server build. */
#define REDIS_VERSION "1.3.2"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
/* Error codes returned by internal functions */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout (presumably seconds: 5 minutes) */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
#define REDIS_WRITEV_THRESHOLD 3
/* Max number of iovecs used for each writev call */
#define REDIS_WRITEV_IOVEC_COUNT 256

/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */

/* Command flags: bits of the 'flags' field of struct redisCommand */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short these commands are denied on low memory conditions, so only
   commands that can grow memory usage should carry it. */
#define REDIS_CMD_DENYOOM 4
111
/* Object types (the robj 'type' field) */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding (the robj 'encoding' field) */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk. They are kept at the top of
 * the byte range, presumably so they can never collide with the object
 * type ids above. */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255

/* Defines related to the dump file format. To store 32 bits lengths for short
 * keys requires a lot of space, so we check the most significant 2 bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte, most DB keys, and many
 * values, will fit inside.
 *
 * The four defines below are the values of those two MSB (00, 01, 10, 11). */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
/* Error sentinel for length decoding; presumably returned by the length
 * loader on read failure, which is why no valid length may equal it. */
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * accordingly to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF (see lzf.h) */
154
/* Virtual memory: values of the robj 'storage' field. */
#define REDIS_VM_MEMORY 0 /* The object is on memory */
#define REDIS_VM_SWAPPED 1 /* The object is on disk */
#define REDIS_VM_SWAPPING 2 /* Redis is swapping this object on disk */
#define REDIS_VM_LOADING 3 /* Redis is loading this object from disk */

/* Virtual memory static configuration stuff.
 * Check vmFindContiguousPages() to know more about these magic numbers. */
#define REDIS_VM_MAX_NEAR_PAGES 65536
#define REDIS_VM_MAX_RANDOM_JUMP 4096
165
/* Client flags: bits of redisClient.flags (they can be OR-ed together) */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
#define REDIS_MULTI 16 /* This client is in a MULTI context */
#define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */

/* Slave replication state - slave side (presumably server.replstate) */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * (presumably stored in the slave's redisClient.replstate).
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it.
 * The values start at 3 so they never overlap the slave-side states above. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
187
/* List related stuff: which end of a list an operation works on */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_ASC 1
#define REDIS_SORT_DESC 2
#define REDIS_SORTKEY_MAX 1024

/* Log levels, in increasing severity. redisLog() prints a message only when
 * its level is >= server.verbosity, and uses the level as an index into its
 * ".-*" marker string, so these values must stay 0, 1 and 2. */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro: silences "unused parameter/variable" warnings. */
#define REDIS_NOTUSED(V) ((void) V)

/* With P = 1/4 each extra level covers ~4x more elements, so 32 levels are
 * enough for about 4^32 = 2^64 elements. */
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */

/* Append only defines (presumably the values of server.appendfsync) */
#define APPENDFSYNC_NO 0
#define APPENDFSYNC_ALWAYS 1
#define APPENDFSYNC_EVERYSEC 2

/* We can print the stacktrace, so our assert is defined this way: when _e is
 * false the stringified expression is reported via _redisAssert() and the
 * process exits with status 1; otherwise it evaluates to nothing. */
#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
static void _redisAssert(char *estr);
217
218 /*================================= Data types ============================== */
219
220 /* A redis object, that is a type able to hold a string / list / set */
221
/* The VM object structure: where a swapped out object lives on disk.
 * robj embeds one of these as its 'vm' member. Only the type is declared
 * here; no object of this type is defined at file scope. */
struct redisObjectVM {
    off_t offset;   /* the page at which the object is stored on disk */
    int pages;      /* number of pages used on disk */
};
227
/* The actual Redis Object */
typedef struct redisObject {
    void *ptr;              /* payload; an sds string for raw string objects */
    unsigned char type;     /* REDIS_STRING, REDIS_LIST, ... */
    unsigned char encoding; /* REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
    unsigned char storage;  /* where? REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
    unsigned char notused;
    int refcount;
    /* VM fields, these are only allocated if VM is active, otherwise the
     * object allocation function will just allocate
     * sizeof(redisObject) minus sizeof(redisObjectVM), so using
     * Redis without VM active will not have any overhead.
     * NOTE(review): that trick requires 'vm' to remain the LAST member. */
    struct redisObjectVM vm;
} robj;
242
/* Macro used to initialize a Redis object allocated on the stack.
 * Note that this macro is taken near the structure definition to make sure
 * we'll update it when the structure is changed, to avoid bugs like
 * bug #85 introduced exactly in this way.
 *
 * _var: an robj lvalue (not a pointer); _ptr: the sds string it will wrap.
 * 'storage' must be set too: a stack object is always in memory, and leaving
 * the field uninitialized would let any code that checks it read an
 * indeterminate value. The 'vm' member is only meaningful for swapped
 * objects and is left untouched.
 * The expansion keeps its historical trailing ';' so that every existing
 * call site, with or without its own semicolon, still compiles. */
#define initStaticStringObject(_var,_ptr) do { \
    (_var).refcount = 1; \
    (_var).type = REDIS_STRING; \
    (_var).encoding = REDIS_ENCODING_RAW; \
    (_var).storage = REDIS_VM_MEMORY; \
    (_var).ptr = (_ptr); \
} while(0);
253
/* State of one logical database (presumably server.db is an array of
 * server.dbnum of these, selected with SELECT). */
typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blockingkeys;         /* Keys with clients waiting for data (BLPOP) */
    int id;                     /* Database index */
} redisDb;
260
/* Client MULTI/EXEC state */

/* One command queued between MULTI and EXEC: its argument vector and the
 * command table entry that will execute it. */
typedef struct multiCmd {
    robj **argv;
    int argc;
    struct redisCommand *cmd;
} multiCmd;

/* The queue of commands a client accumulated inside MULTI. */
typedef struct multiState {
    multiCmd *commands;     /* Array of MULTI commands */
    int count;              /* Total number of MULTI commands */
} multiState;
272
/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list (server.clients). */
typedef struct redisClient {
    int fd;
    redisDb *db;            /* presumably the currently SELECTed database */
    int dictid;
    sds querybuf;           /* bytes read from the socket, not yet parsed */
    robj **argv, **mbargv;  /* command arguments; mbargv presumably collects a
                             * multi-bulk request while it is being read */
    int argc, mbargc;
    int bulklen;            /* bulk read len. -1 if not in bulk read mode */
    int multibulk;          /* multi bulk command format active */
    list *reply;            /* pending reply objects */
    int sentlen;
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    int flags;              /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR |
                             * REDIS_MULTI | ... (see the client flags) */
    int slaveseldb;         /* slave selected db, if this client is a slave */
    int authenticated;      /* when requirepass is non-NULL */
    int replstate;          /* replication state if this is a slave */
    int repldbfd;           /* replication DB file descriptor */
    long repldboff;         /* replication DB file offset */
    off_t repldbsize;       /* replication DB file size */
    multiState mstate;      /* MULTI/EXEC state */
    robj **blockingkeys;    /* The keys we are waiting on to terminate a
                             * blocking operation such as BLPOP. Otherwise NULL. */
    int blockingkeysnum;    /* Number of blocking keys */
    time_t blockingto;      /* Blocking operation timeout. If UNIX current time
                             * is >= blockingto then the operation timed out. */
} redisClient;
302
/* One save point. Presumably it means "save if at least 'changes' writes
 * happened within 'seconds'" (compare server.dirty / server.lastsave).
 * NOTE(review): confirm against the config parser and the cron job. */
struct saveparam {
    time_t seconds;
    int changes;
};
307
/* Global server state structure */
struct redisServer {
    int port;
    int fd;                         /* listening socket */
    redisDb *db;
    dict *sharingpool;              /* Pool used for object sharing */
    unsigned int sharingpoolsize;
    long long dirty;                /* changes to DB from the last save */
    list *clients;
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];
    aeEventLoop *el;
    int cronloops;                  /* number of times the cron function run */
    list *objfreelist;              /* A list of freed objects to avoid malloc() */
    time_t lastsave;                /* Unix time of last successful save */
    size_t usedmemory;              /* Used memory in megabytes.
                                     * NOTE(review): unit taken from the original
                                     * comment; confirm where this is updated. */
    /* Fields used only for stats */
    time_t stat_starttime;          /* server start time */
    long long stat_numcommands;     /* number of processed commands */
    long long stat_numconnections;  /* number of connections received */
    /* Configuration */
    int verbosity;                  /* minimum level logged by redisLog() */
    int glueoutputbuf;
    int maxidletime;                /* client timeout; 0 disables it */
    int dbnum;
    int daemonize;
    int appendonly;
    int appendfsync;
    time_t lastfsync;
    int appendfd;
    int appendseldb;
    char *pidfile;
    pid_t bgsavechildpid;
    pid_t bgrewritechildpid;
    sds bgrewritebuf;               /* buffer taken by parent during append only rewrite */
    struct saveparam *saveparams;
    int saveparamslen;
    char *logfile;                  /* NULL means log to stdout */
    char *bindaddr;
    char *dbfilename;
    char *appendfilename;
    char *requirepass;
    int shareobjects;
    int rdbcompression;
    /* Replication related */
    int isslave;
    char *masterauth;
    char *masterhost;
    int masterport;
    redisClient *master;            /* client that is master for this slave */
    int replstate;
    unsigned int maxclients;
    unsigned long maxmemory;
    unsigned int blockedclients;
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to keep this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
    /* Virtual memory configuration */
    int vm_enabled;
    off_t vm_page_size;
    off_t vm_pages;
    long vm_max_memory;
    /* Virtual memory state */
    FILE *vm_fp;
    int vm_fd;
    off_t vm_next_page;             /* Next probably empty page */
    off_t vm_near_pages;            /* Number of pages allocated sequentially */
    unsigned char *vm_bitmap;       /* Bitmap of free/used pages */
};
379
/* Signature shared by every command implementation listed in cmdTable. */
typedef void redisCommandProc(redisClient *c);
/* One entry of the command table. */
struct redisCommand {
    char *name;             /* command name */
    redisCommandProc *proc; /* implementation */
    int arity;              /* expected argc including the command name; negative
                             * values appear to mean "at least -arity" (e.g. DEL
                             * is -2). NOTE(review): confirm in processCommand() */
    int flags;              /* REDIS_CMD_* bits */
};
387
/* A (name, address) pair; the address of a function is stored as an
 * integer. NOTE(review): presumably used to map addresses to names in stack
 * traces (HAVE_BACKTRACE); confirm where it is used. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};
392
/* Element wrapper used while sorting (SORT). The union holds the sort key:
 * 'score' for numeric comparison or 'cmpobj' for object comparison.
 * Presumably which member is valid depends on server.sort_alpha /
 * server.sort_bypattern; NOTE(review): confirm in sortCommand(). */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;
        robj *cmpobj;
    } u;
} redisSortObject;

/* One operation requested in SORT (type is a REDIS_SORT_* value, e.g.
 * REDIS_SORT_GET) together with its key pattern. */
typedef struct _redisSortOperation {
    int type;
    robj *pattern;
} redisSortOperation;
405
/* ZSETs use a specialized version of Skiplists */

/* Skiplist node: an array of forward pointers (one per level the node
 * participates in), a backward pointer for reverse traversal, and the
 * (score, member) pair the list is ordered by. */
typedef struct zskiplistNode {
    struct zskiplistNode **forward;
    struct zskiplistNode *backward;
    double score;
    robj *obj;
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;   /* number of elements */
    int level;              /* presumably the highest level currently in use */
} zskiplist;

/* A sorted set keeps two indexes over the same members: a dict (see
 * zsetDictType, whose values are heap-allocated doubles, presumably the
 * member's score) and a skiplist ordered by score. */
typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;
425
/* Our shared "common" objects: protocol fragments and canned replies
 * (crlf, ok, err, ...). Presumably created once at startup and passed to
 * addReply() instead of allocating a new object per reply.
 * Note this both declares the struct type and defines the global 'shared'. */

struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk, *queued,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;
436
/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. By their names they
 * presumably hold 0.0, +infinity, -infinity and NaN. */

static double R_Zero, R_PosInf, R_NegInf, R_Nan;
442
443 /*================================ Prototypes =============================== */
444
/* Forward declarations of static functions defined later in this file.
 * Each prototype must match its definition exactly. */
static void freeStringObject(robj *o);
static void freeListObject(robj *o);
static void freeSetObject(robj *o);
/* Takes void* so it can also be used where a generic free callback is
 * expected (dictRedisObjectDestructor passes a void* value straight in). */
static void decrRefCount(void *o);
static robj *createObject(int type, void *ptr);
static void freeClient(redisClient *c);
static int rdbLoad(char *filename);
static void addReply(redisClient *c, robj *obj);
static void addReplySds(redisClient *c, sds s);
static void incrRefCount(robj *o);
static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);
static int tryObjectEncoding(robj *o);
static robj *getDecodedObject(robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static int deleteIfVolatile(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);
static void updateSlavesWaitingBgsave(int bgsaveerr);
static void freeMemoryIfNeeded(void);
static int processCommand(redisClient *c);
static void setupSigSegvAction(void);
static void rdbRemoveTempFile(pid_t childpid);
static void aofRemoveTempFile(pid_t childpid);
static size_t stringObjectLen(robj *o);
static void processInputBuffer(redisClient *c);
static zskiplist *zslCreate(void);
static void zslFree(zskiplist *zsl);
static void zslInsert(zskiplist *zsl, double score, robj *obj);
static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
static void initClientMultiState(redisClient *c);
static void freeClientMultiState(redisClient *c);
static void queueMultiCommand(redisClient *c, struct redisCommand *cmd);
static void unblockClient(redisClient *c);
static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele);
static void vmInit(void);

/* Command implementations, referenced from cmdTable. */
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
static void setCommand(redisClient *c);
static void setnxCommand(redisClient *c);
static void getCommand(redisClient *c);
static void delCommand(redisClient *c);
static void existsCommand(redisClient *c);
static void incrCommand(redisClient *c);
static void decrCommand(redisClient *c);
static void incrbyCommand(redisClient *c);
static void decrbyCommand(redisClient *c);
static void selectCommand(redisClient *c);
static void randomkeyCommand(redisClient *c);
static void keysCommand(redisClient *c);
static void dbsizeCommand(redisClient *c);
static void lastsaveCommand(redisClient *c);
static void saveCommand(redisClient *c);
static void bgsaveCommand(redisClient *c);
static void bgrewriteaofCommand(redisClient *c);
static void shutdownCommand(redisClient *c);
static void moveCommand(redisClient *c);
static void renameCommand(redisClient *c);
static void renamenxCommand(redisClient *c);
static void lpushCommand(redisClient *c);
static void rpushCommand(redisClient *c);
static void lpopCommand(redisClient *c);
static void rpopCommand(redisClient *c);
static void llenCommand(redisClient *c);
static void lindexCommand(redisClient *c);
static void lrangeCommand(redisClient *c);
static void ltrimCommand(redisClient *c);
static void typeCommand(redisClient *c);
static void lsetCommand(redisClient *c);
static void saddCommand(redisClient *c);
static void sremCommand(redisClient *c);
static void smoveCommand(redisClient *c);
static void sismemberCommand(redisClient *c);
static void scardCommand(redisClient *c);
static void spopCommand(redisClient *c);
static void srandmemberCommand(redisClient *c);
static void sinterCommand(redisClient *c);
static void sinterstoreCommand(redisClient *c);
static void sunionCommand(redisClient *c);
static void sunionstoreCommand(redisClient *c);
static void sdiffCommand(redisClient *c);
static void sdiffstoreCommand(redisClient *c);
static void syncCommand(redisClient *c);
static void flushdbCommand(redisClient *c);
static void flushallCommand(redisClient *c);
static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c);
/* Note the lowercase "command" suffix; it matches the cmdTable entry. */
static void rpoplpushcommand(redisClient *c);
static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
static void expireCommand(redisClient *c);
static void expireatCommand(redisClient *c);
static void getsetCommand(redisClient *c);
static void ttlCommand(redisClient *c);
static void slaveofCommand(redisClient *c);
static void debugCommand(redisClient *c);
static void msetCommand(redisClient *c);
static void msetnxCommand(redisClient *c);
static void zaddCommand(redisClient *c);
static void zincrbyCommand(redisClient *c);
static void zrangeCommand(redisClient *c);
static void zrangebyscoreCommand(redisClient *c);
static void zrevrangeCommand(redisClient *c);
static void zcardCommand(redisClient *c);
static void zremCommand(redisClient *c);
static void zscoreCommand(redisClient *c);
static void zremrangebyscoreCommand(redisClient *c);
static void multiCommand(redisClient *c);
static void execCommand(redisClient *c);
static void blpopCommand(redisClient *c);
static void brpopCommand(redisClient *c);
565
566 /*================================= Globals ================================= */
567
568 /* Global vars */
569 static struct redisServer server; /* server global state */
570 static struct redisCommand cmdTable[] = {
571 {"get",getCommand,2,REDIS_CMD_INLINE},
572 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
573 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
574 {"del",delCommand,-2,REDIS_CMD_INLINE},
575 {"exists",existsCommand,2,REDIS_CMD_INLINE},
576 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
577 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
578 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
579 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
580 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
581 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
582 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
583 {"brpop",brpopCommand,-3,REDIS_CMD_INLINE},
584 {"blpop",blpopCommand,-3,REDIS_CMD_INLINE},
585 {"llen",llenCommand,2,REDIS_CMD_INLINE},
586 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
587 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
588 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
589 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
590 {"lrem",lremCommand,4,REDIS_CMD_BULK},
591 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
592 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
593 {"srem",sremCommand,3,REDIS_CMD_BULK},
594 {"smove",smoveCommand,4,REDIS_CMD_BULK},
595 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
596 {"scard",scardCommand,2,REDIS_CMD_INLINE},
597 {"spop",spopCommand,2,REDIS_CMD_INLINE},
598 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
599 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
600 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
601 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
602 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
603 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
604 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
605 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
606 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
607 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
608 {"zrem",zremCommand,3,REDIS_CMD_BULK},
609 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
610 {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE},
611 {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
612 {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE},
613 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
614 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
615 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
616 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
617 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
618 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
619 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
620 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
621 {"select",selectCommand,2,REDIS_CMD_INLINE},
622 {"move",moveCommand,3,REDIS_CMD_INLINE},
623 {"rename",renameCommand,3,REDIS_CMD_INLINE},
624 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
625 {"expire",expireCommand,3,REDIS_CMD_INLINE},
626 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
627 {"keys",keysCommand,2,REDIS_CMD_INLINE},
628 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
629 {"auth",authCommand,2,REDIS_CMD_INLINE},
630 {"ping",pingCommand,1,REDIS_CMD_INLINE},
631 {"echo",echoCommand,2,REDIS_CMD_BULK},
632 {"save",saveCommand,1,REDIS_CMD_INLINE},
633 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
634 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
635 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
636 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
637 {"type",typeCommand,2,REDIS_CMD_INLINE},
638 {"multi",multiCommand,1,REDIS_CMD_INLINE},
639 {"exec",execCommand,1,REDIS_CMD_INLINE},
640 {"sync",syncCommand,1,REDIS_CMD_INLINE},
641 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
642 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
643 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
644 {"info",infoCommand,1,REDIS_CMD_INLINE},
645 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
646 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
647 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
648 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
649 {NULL,NULL,0,0}
650 };
651
652 /*============================ Utility functions ============================ */
653
/* Lower-case one byte for case-insensitive matching.
 * Passing a negative value (other than EOF) to tolower() is undefined
 * behavior, and plain 'char' may be signed, so negative values (bytes >= 0x80
 * on signed-char platforms) are returned unchanged. In the C locale this is
 * what tolower() yields for them anyway. */
static int stringmatchLower(int c) {
    return (c < 0) ? c : tolower(c);
}

/* Glob-style pattern matching.
 *
 * Matches 'string' (stringLen bytes) against 'pattern' (patternLen bytes).
 * Only the given lengths are read; neither buffer needs a NUL terminator.
 * Syntax:
 *   *      any sequence of bytes, including the empty one
 *   ?      exactly one byte
 *   [...]  one byte from a set; [^...] negates it, a-z is a range and a
 *          backslash escapes the next byte; an unterminated set runs to
 *          the end of the pattern
 *   \x     the literal byte x
 * When 'nocase' is non-zero letters are compared case-insensitively.
 * Returns 1 on match, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* A run of '*' matches the same as a single one. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            /* Try the rest of the pattern against every suffix. */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            if (stringLen == 0)
                return 0; /* a set must consume one byte */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back onto the last pattern byte
                     * so the advance below ends exactly at the pattern end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = stringmatchLower(start);
                        end = stringmatchLower(end);
                        c = stringmatchLower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (stringmatchLower(pattern[0]) ==
                            stringmatchLower(string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (stringmatchLower(pattern[0]) !=
                    stringmatchLower(string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* String exhausted: only trailing '*'s may remain. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
776
777 static void redisLog(int level, const char *fmt, ...) {
778 va_list ap;
779 FILE *fp;
780
781 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
782 if (!fp) return;
783
784 va_start(ap, fmt);
785 if (level >= server.verbosity) {
786 char *c = ".-*";
787 char buf[64];
788 time_t now;
789
790 now = time(NULL);
791 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
792 fprintf(fp,"%s %c ",buf,c[level]);
793 vfprintf(fp, fmt, ap);
794 fprintf(fp,"\n");
795 fflush(fp);
796 }
797 va_end(ap);
798
799 if (server.logfile) fclose(fp);
800 }
801
802 /*====================== Hash table type implementation ==================== */
803
/* The hash table types below use the SDS dynamic strings library for
 * keys and Redis objects for values (objects can hold SDS strings,
 * lists, sets). */
807
/* Dict destructor for values that are plain zmalloc()'d blocks, such as the
 * heap-allocated doubles stored as zsetDictType values. */
static void dictVanillaFree(void *privdata, void *val)
{
    REDIS_NOTUSED(privdata); /* required by the callback signature only */
    zfree(val);
}
813
814 static void dictListDestructor(void *privdata, void *val)
815 {
816 DICT_NOTUSED(privdata);
817 listRelease((list*)val);
818 }
819
820 static int sdsDictKeyCompare(void *privdata, const void *key1,
821 const void *key2)
822 {
823 int l1,l2;
824 DICT_NOTUSED(privdata);
825
826 l1 = sdslen((sds)key1);
827 l2 = sdslen((sds)key2);
828 if (l1 != l2) return 0;
829 return memcmp(key1, key2, l1) == 0;
830 }
831
832 static void dictRedisObjectDestructor(void *privdata, void *val)
833 {
834 DICT_NOTUSED(privdata);
835
836 decrRefCount(val);
837 }
838
839 static int dictObjKeyCompare(void *privdata, const void *key1,
840 const void *key2)
841 {
842 const robj *o1 = key1, *o2 = key2;
843 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
844 }
845
846 static unsigned int dictObjHash(const void *key) {
847 const robj *o = key;
848 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
849 }
850
851 static int dictEncObjKeyCompare(void *privdata, const void *key1,
852 const void *key2)
853 {
854 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
855 int cmp;
856
857 o1 = getDecodedObject(o1);
858 o2 = getDecodedObject(o2);
859 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
860 decrRefCount(o1);
861 decrRefCount(o2);
862 return cmp;
863 }
864
865 static unsigned int dictEncObjHash(const void *key) {
866 robj *o = (robj*) key;
867
868 o = getDecodedObject(o);
869 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
870 decrRefCount(o);
871 return hash;
872 }
873
/* Sets: keys are possibly-encoded Redis objects (hashed and compared in
 * decoded form); the key destructor drops the set's reference. No value
 * destructor. */
static dictType setDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL                       /* val destructor */
};

/* Sorted sets: same key handling as sets; values are malloc'd doubles
 * (presumably each member's score) released with zfree(). */
static dictType zsetDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictVanillaFree            /* val destructor of malloc(sizeof(double)) */
};

/* Keys and values are Redis objects; keys are hashed/compared on their raw
 * sds payload (no decoding) and both keys and values are released with
 * decrRefCount(). */
static dictType hashDictType = {
    dictObjHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictObjKeyCompare,          /* key compare */
    dictRedisObjectDestructor,  /* key destructor */
    dictRedisObjectDestructor   /* val destructor */
};

/* Keylist hash table type has unencoded redis objects as keys and
 * lists as values. It's used for blocking operations (BLPOP) */
static dictType keylistDictType = {
    dictObjHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictObjKeyCompare,          /* key compare */
    dictRedisObjectDestructor,  /* key destructor */
    dictListDestructor          /* val destructor */
};
911
912 /* ========================= Random utility functions ======================= */
913
914 /* Redis generally does not try to recover from out of memory conditions
915 * when allocating objects or strings, it is not clear if it will be possible
916 * to report this condition to the client since the networking layer itself
917 * is based on heap allocation for send buffers, so we simply abort.
918 * At least the code will be simpler to read... */
919 static void oom(const char *msg) {
920 redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
921 sleep(1);
922 abort();
923 }
924
925 /* ====================== Redis server networking stuff ===================== */
926 static void closeTimedoutClients(void) {
927 redisClient *c;
928 listNode *ln;
929 time_t now = time(NULL);
930
931 listRewind(server.clients);
932 while ((ln = listYield(server.clients)) != NULL) {
933 c = listNodeValue(ln);
934 if (server.maxidletime &&
935 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
936 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
937 (now - c->lastinteraction > server.maxidletime))
938 {
939 redisLog(REDIS_DEBUG,"Closing idle client");
940 freeClient(c);
941 } else if (c->flags & REDIS_BLOCKED) {
942 if (c->blockingto != 0 && c->blockingto < now) {
943 addReply(c,shared.nullmultibulk);
944 unblockClient(c);
945 }
946 }
947 }
948 }
949
950 static int htNeedsResize(dict *dict) {
951 long long size, used;
952
953 size = dictSlots(dict);
954 used = dictSize(dict);
955 return (size && used && size > DICT_HT_INITIAL_SIZE &&
956 (used*100/size < REDIS_HT_MINFILL));
957 }
958
959 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
960 * we resize the hash table to save memory */
961 static void tryResizeHashTables(void) {
962 int j;
963
964 for (j = 0; j < server.dbnum; j++) {
965 if (htNeedsResize(server.db[j].dict)) {
966 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
967 dictResize(server.db[j].dict);
968 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
969 }
970 if (htNeedsResize(server.db[j].expires))
971 dictResize(server.db[j].expires);
972 }
973 }
974
975 /* A background saving child (BGSAVE) terminated its work. Handle this. */
976 void backgroundSaveDoneHandler(int statloc) {
977 int exitcode = WEXITSTATUS(statloc);
978 int bysignal = WIFSIGNALED(statloc);
979
980 if (!bysignal && exitcode == 0) {
981 redisLog(REDIS_NOTICE,
982 "Background saving terminated with success");
983 server.dirty = 0;
984 server.lastsave = time(NULL);
985 } else if (!bysignal && exitcode != 0) {
986 redisLog(REDIS_WARNING, "Background saving error");
987 } else {
988 redisLog(REDIS_WARNING,
989 "Background saving terminated by signal");
990 rdbRemoveTempFile(server.bgsavechildpid);
991 }
992 server.bgsavechildpid = -1;
993 /* Possibly there are slaves waiting for a BGSAVE in order to be served
994 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
995 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
996 }
997
998 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
999 * Handle this. */
1000 void backgroundRewriteDoneHandler(int statloc) {
1001 int exitcode = WEXITSTATUS(statloc);
1002 int bysignal = WIFSIGNALED(statloc);
1003
1004 if (!bysignal && exitcode == 0) {
1005 int fd;
1006 char tmpfile[256];
1007
1008 redisLog(REDIS_NOTICE,
1009 "Background append only file rewriting terminated with success");
1010 /* Now it's time to flush the differences accumulated by the parent */
1011 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
1012 fd = open(tmpfile,O_WRONLY|O_APPEND);
1013 if (fd == -1) {
1014 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
1015 goto cleanup;
1016 }
1017 /* Flush our data... */
1018 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
1019 (signed) sdslen(server.bgrewritebuf)) {
1020 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
1021 close(fd);
1022 goto cleanup;
1023 }
1024 redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
1025 /* Now our work is to rename the temp file into the stable file. And
1026 * switch the file descriptor used by the server for append only. */
1027 if (rename(tmpfile,server.appendfilename) == -1) {
1028 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
1029 close(fd);
1030 goto cleanup;
1031 }
1032 /* Mission completed... almost */
1033 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
1034 if (server.appendfd != -1) {
1035 /* If append only is actually enabled... */
1036 close(server.appendfd);
1037 server.appendfd = fd;
1038 fsync(fd);
1039 server.appendseldb = -1; /* Make sure it will issue SELECT */
1040 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
1041 } else {
1042 /* If append only is disabled we just generate a dump in this
1043 * format. Why not? */
1044 close(fd);
1045 }
1046 } else if (!bysignal && exitcode != 0) {
1047 redisLog(REDIS_WARNING, "Background append only file rewriting error");
1048 } else {
1049 redisLog(REDIS_WARNING,
1050 "Background append only file rewriting terminated by signal");
1051 }
1052 cleanup:
1053 sdsfree(server.bgrewritebuf);
1054 server.bgrewritebuf = sdsempty();
1055 aofRemoveTempFile(server.bgrewritechildpid);
1056 server.bgrewritechildpid = -1;
1057 }
1058
1059 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
1060 int j, loops = server.cronloops++;
1061 REDIS_NOTUSED(eventLoop);
1062 REDIS_NOTUSED(id);
1063 REDIS_NOTUSED(clientData);
1064
1065 /* Update the global state with the amount of used memory */
1066 server.usedmemory = zmalloc_used_memory();
1067
1068 /* Show some info about non-empty databases */
1069 for (j = 0; j < server.dbnum; j++) {
1070 long long size, used, vkeys;
1071
1072 size = dictSlots(server.db[j].dict);
1073 used = dictSize(server.db[j].dict);
1074 vkeys = dictSize(server.db[j].expires);
1075 if (!(loops % 5) && (used || vkeys)) {
1076 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
1077 /* dictPrintStats(server.dict); */
1078 }
1079 }
1080
1081 /* We don't want to resize the hash tables while a bacground saving
1082 * is in progress: the saving child is created using fork() that is
1083 * implemented with a copy-on-write semantic in most modern systems, so
1084 * if we resize the HT while there is the saving child at work actually
1085 * a lot of memory movements in the parent will cause a lot of pages
1086 * copied. */
1087 if (server.bgsavechildpid == -1) tryResizeHashTables();
1088
1089 /* Show information about connected clients */
1090 if (!(loops % 5)) {
1091 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1092 listLength(server.clients)-listLength(server.slaves),
1093 listLength(server.slaves),
1094 server.usedmemory,
1095 dictSize(server.sharingpool));
1096 }
1097
1098 /* Close connections of timedout clients */
1099 if ((server.maxidletime && !(loops % 10)) || server.blockedclients)
1100 closeTimedoutClients();
1101
1102 /* Check if a background saving or AOF rewrite in progress terminated */
1103 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
1104 int statloc;
1105 pid_t pid;
1106
1107 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1108 if (pid == server.bgsavechildpid) {
1109 backgroundSaveDoneHandler(statloc);
1110 } else {
1111 backgroundRewriteDoneHandler(statloc);
1112 }
1113 }
1114 } else {
1115 /* If there is not a background saving in progress check if
1116 * we have to save now */
1117 time_t now = time(NULL);
1118 for (j = 0; j < server.saveparamslen; j++) {
1119 struct saveparam *sp = server.saveparams+j;
1120
1121 if (server.dirty >= sp->changes &&
1122 now-server.lastsave > sp->seconds) {
1123 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1124 sp->changes, sp->seconds);
1125 rdbSaveBackground(server.dbfilename);
1126 break;
1127 }
1128 }
1129 }
1130
1131 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1132 * will use few CPU cycles if there are few expiring keys, otherwise
1133 * it will get more aggressive to avoid that too much memory is used by
1134 * keys that can be removed from the keyspace. */
1135 for (j = 0; j < server.dbnum; j++) {
1136 int expired;
1137 redisDb *db = server.db+j;
1138
1139 /* Continue to expire if at the end of the cycle more than 25%
1140 * of the keys were expired. */
1141 do {
1142 int num = dictSize(db->expires);
1143 time_t now = time(NULL);
1144
1145 expired = 0;
1146 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1147 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1148 while (num--) {
1149 dictEntry *de;
1150 time_t t;
1151
1152 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1153 t = (time_t) dictGetEntryVal(de);
1154 if (now > t) {
1155 deleteKey(db,dictGetEntryKey(de));
1156 expired++;
1157 }
1158 }
1159 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
1160 }
1161
1162 /* Check if we should connect to a MASTER */
1163 if (server.replstate == REDIS_REPL_CONNECT) {
1164 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1165 if (syncWithMaster() == REDIS_OK) {
1166 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1167 }
1168 }
1169 return 1000;
1170 }
1171
1172 static void createSharedObjects(void) {
1173 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1174 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1175 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
1176 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1177 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1178 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1179 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1180 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1181 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1182 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1183 shared.queued = createObject(REDIS_STRING,sdsnew("+QUEUED\r\n"));
1184 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1185 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1186 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1187 "-ERR no such key\r\n"));
1188 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1189 "-ERR syntax error\r\n"));
1190 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1191 "-ERR source and destination objects are the same\r\n"));
1192 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1193 "-ERR index out of range\r\n"));
1194 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1195 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1196 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1197 shared.select0 = createStringObject("select 0\r\n",10);
1198 shared.select1 = createStringObject("select 1\r\n",10);
1199 shared.select2 = createStringObject("select 2\r\n",10);
1200 shared.select3 = createStringObject("select 3\r\n",10);
1201 shared.select4 = createStringObject("select 4\r\n",10);
1202 shared.select5 = createStringObject("select 5\r\n",10);
1203 shared.select6 = createStringObject("select 6\r\n",10);
1204 shared.select7 = createStringObject("select 7\r\n",10);
1205 shared.select8 = createStringObject("select 8\r\n",10);
1206 shared.select9 = createStringObject("select 9\r\n",10);
1207 }
1208
1209 static void appendServerSaveParams(time_t seconds, int changes) {
1210 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1211 server.saveparams[server.saveparamslen].seconds = seconds;
1212 server.saveparams[server.saveparamslen].changes = changes;
1213 server.saveparamslen++;
1214 }
1215
1216 static void resetServerSaveParams() {
1217 zfree(server.saveparams);
1218 server.saveparams = NULL;
1219 server.saveparamslen = 0;
1220 }
1221
/* Fill the global `server` struct with built-in configuration defaults,
 * install the default save points and initialize the R_* double constants.
 * Presumably called before loadServerConfig() so that config file
 * directives override these values (confirm in main()). */
static void initServerConfig() {
    server.dbnum = REDIS_DEFAULT_DBNUM;
    server.port = REDIS_SERVERPORT;
    server.verbosity = REDIS_DEBUG;
    server.maxidletime = REDIS_MAXIDLETIME;
    server.saveparams = NULL;
    server.logfile = NULL; /* NULL = log on standard output */
    server.bindaddr = NULL;
    server.glueoutputbuf = 1;
    server.daemonize = 0;
    server.appendonly = 0;
    server.appendfsync = APPENDFSYNC_ALWAYS;
    server.lastfsync = time(NULL);
    server.appendfd = -1; /* -1 = no append only file open */
    server.appendseldb = -1; /* Make sure the first time will not match */
    server.pidfile = "/var/run/redis.pid";
    server.dbfilename = "dump.rdb";
    server.appendfilename = "appendonly.aof";
    server.requirepass = NULL;
    server.shareobjects = 0;
    server.rdbcompression = 1;
    server.sharingpoolsize = 1024;
    server.maxclients = 0;
    server.blockedclients = 0;
    server.maxmemory = 0; /* 0 = processCommand() skips freeMemoryIfNeeded() */
    server.vm_enabled = 0;
    server.vm_page_size = 256; /* 256 bytes per page */
    server.vm_pages = 1024*1024*100; /* 104 millions of pages */
    server.vm_max_memory = 1024LL*1024*1024*1; /* 1 GB of RAM */

    /* saveparams was just set to NULL, so this only resets the length */
    resetServerSaveParams();

    appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
    appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
    appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
    /* Replication related */
    server.isslave = 0;
    server.masterauth = NULL;
    server.masterhost = NULL;
    server.masterport = 6379;
    server.master = NULL;
    server.replstate = REDIS_REPL_NONE;

    /* Double constants initialization. They are computed at run time by
     * dividing by the R_Zero variable, presumably to avoid constant-folded
     * division by zero at compile time. */
    R_Zero = 0.0;
    R_PosInf = 1.0/R_Zero;
    R_NegInf = -1.0/R_Zero;
    R_Nan = R_Zero/R_Zero;
}
1271
/* Build the runtime server state from the loaded configuration:
 *  - signal setup;
 *  - client/slave/monitor lists and shared objects;
 *  - the event loop and the listening TCP socket (exits on failure);
 *  - per-DB dicts (keyspace, expires, blocking keys);
 *  - the serverCron timer;
 *  - the append only file when enabled (exits if it can't be opened);
 *  - the VM subsystem when enabled. */
static void initServer() {
    int j;

    /* Ignore SIGPIPE so that writing to a closed socket returns an error
     * (handled by the reply writers) instead of killing the process. */
    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
    setupSigSegvAction();

    server.clients = listCreate();
    server.slaves = listCreate();
    server.monitors = listCreate();
    server.objfreelist = listCreate();
    createSharedObjects();
    server.el = aeCreateEventLoop();
    server.db = zmalloc(sizeof(redisDb)*server.dbnum);
    server.sharingpool = dictCreate(&setDictType,NULL);
    server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
    if (server.fd == -1) {
        redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
        exit(1);
    }
    for (j = 0; j < server.dbnum; j++) {
        server.db[j].dict = dictCreate(&hashDictType,NULL);
        server.db[j].expires = dictCreate(&setDictType,NULL);
        server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL);
        server.db[j].id = j;
    }
    server.cronloops = 0;
    server.bgsavechildpid = -1;     /* -1 = no BGSAVE child running */
    server.bgrewritechildpid = -1;  /* -1 = no BGREWRITEAOF child running */
    server.bgrewritebuf = sdsempty();
    server.lastsave = time(NULL);
    server.dirty = 0;
    server.usedmemory = 0;
    server.stat_numcommands = 0;
    server.stat_numconnections = 0;
    server.stat_starttime = time(NULL);
    aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);

    if (server.appendonly) {
        server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
        if (server.appendfd == -1) {
            redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
                strerror(errno));
            exit(1);
        }
    }

    if (server.vm_enabled) vmInit();
}
1321
1322 /* Empty the whole database */
1323 static long long emptyDb() {
1324 int j;
1325 long long removed = 0;
1326
1327 for (j = 0; j < server.dbnum; j++) {
1328 removed += dictSize(server.db[j].dict);
1329 dictEmpty(server.db[j].dict);
1330 dictEmpty(server.db[j].expires);
1331 }
1332 return removed;
1333 }
1334
/* Parse a boolean config argument, ignoring case: "yes" -> 1, "no" -> 0,
 * any other string -> -1 (callers treat -1 as a config error). */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1340
/* I agree, this is a very rudimental way to load a configuration...
   will improve later if the config gets more complex */

/* Parse the configuration file `filename` ("-" means read from stdin) and
 * apply each directive to the global `server` struct.
 *
 * How each line is handled:
 *  - It is trimmed of spaces, tabs, CR and LF.
 *  - Blank lines and lines starting with '#' are skipped.
 *  - The rest is split on " " with sdssplitlen(). Repeated spaces
 *    presumably yield empty arguments and thus a different argc (TODO
 *    confirm).
 *  - The directive name (argv[0]) is lowercased and matched together with
 *    its expected argument count.
 *
 * Fatal errors (all exit(1)):
 *  - an invalid value, unknown directive or wrong argument count jumps to
 *    `loaderr`, which prints the line number, the line and the error
 *    message before exiting;
 *  - an unopenable file or a failing `dir` chdir() exits directly.
 *
 * Lines longer than REDIS_CONFIGLINE_MAX are returned by fgets() in several
 * pieces, each one parsed as a separate line. */
static void loadServerConfig(char *filename) {
    FILE *fp;
    char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
    int linenum = 0;
    sds line = NULL;

    if (filename[0] == '-' && filename[1] == '\0')
        fp = stdin;
    else {
        if ((fp = fopen(filename,"r")) == NULL) {
            redisLog(REDIS_WARNING,"Fatal error, can't open config file");
            exit(1);
        }
    }

    while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
        sds *argv;
        int argc, j;

        linenum++;
        line = sdsnew(buf);
        line = sdstrim(line," \t\r\n");

        /* Skip comments and blank lines*/
        if (line[0] == '#' || line[0] == '\0') {
            sdsfree(line);
            continue;
        }

        /* Split into arguments */
        argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
        sdstolower(argv[0]);

        /* Execute config directives */
        if (!strcasecmp(argv[0],"timeout") && argc == 2) {
            server.maxidletime = atoi(argv[1]);
            if (server.maxidletime < 0) {
                err = "Invalid timeout value"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"port") && argc == 2) {
            server.port = atoi(argv[1]);
            if (server.port < 1 || server.port > 65535) {
                err = "Invalid port"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
            server.bindaddr = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"save") && argc == 3) {
            int seconds = atoi(argv[1]);
            int changes = atoi(argv[2]);
            if (seconds < 1 || changes < 0) {
                err = "Invalid save parameters"; goto loaderr;
            }
            /* Adds to (does not replace) the save points installed as
             * defaults by initServerConfig(). */
            appendServerSaveParams(seconds,changes);
        } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
            if (chdir(argv[1]) == -1) {
                redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
                    argv[1], strerror(errno));
                exit(1);
            }
        } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
            if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
            else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
            else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
            else {
                err = "Invalid log level. Must be one of debug, notice, warning";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
            FILE *logfp;

            /* "stdout" means no log file (logfile == NULL) */
            server.logfile = zstrdup(argv[1]);
            if (!strcasecmp(server.logfile,"stdout")) {
                zfree(server.logfile);
                server.logfile = NULL;
            }
            if (server.logfile) {
                /* Test if we are able to open the file. The server will not
                 * be able to abort just for this problem later... */
                logfp = fopen(server.logfile,"a");
                if (logfp == NULL) {
                    err = sdscatprintf(sdsempty(),
                        "Can't open the log file: %s", strerror(errno));
                    goto loaderr;
                }
                fclose(logfp);
            }
        } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
            server.dbnum = atoi(argv[1]);
            if (server.dbnum < 1) {
                err = "Invalid number of databases"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
            server.maxclients = atoi(argv[1]);
        } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
            server.maxmemory = strtoll(argv[1], NULL, 10);
        } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
            server.masterhost = sdsnew(argv[1]);
            server.masterport = atoi(argv[2]);
            server.replstate = REDIS_REPL_CONNECT;
        } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
            server.masterauth = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
            if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
            if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) {
            if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
            server.sharingpoolsize = atoi(argv[1]);
            if (server.sharingpoolsize < 1) {
                err = "invalid object sharing pool size"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
            if ((server.daemonize = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
            if ((server.appendonly = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
            if (!strcasecmp(argv[1],"no")) {
                server.appendfsync = APPENDFSYNC_NO;
            } else if (!strcasecmp(argv[1],"always")) {
                server.appendfsync = APPENDFSYNC_ALWAYS;
            } else if (!strcasecmp(argv[1],"everysec")) {
                server.appendfsync = APPENDFSYNC_EVERYSEC;
            } else {
                err = "argument must be 'no', 'always' or 'everysec'";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
            server.requirepass = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
            server.pidfile = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
            server.dbfilename = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"vm-enabled") && argc == 2) {
            if ((server.vm_enabled = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else {
            err = "Bad directive or wrong number of arguments"; goto loaderr;
        }
        for (j = 0; j < argc; j++)
            sdsfree(argv[j]);
        zfree(argv);
        sdsfree(line);
    }
    if (fp != stdin) fclose(fp);
    return;

    /* Fatal path: argv, line and err are intentionally not freed because
     * the process exits right away. */
loaderr:
    fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
    fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
    fprintf(stderr, ">>> '%s'\n", line);
    fprintf(stderr, "%s\n", err);
    exit(1);
}
1508
1509 static void freeClientArgv(redisClient *c) {
1510 int j;
1511
1512 for (j = 0; j < c->argc; j++)
1513 decrRefCount(c->argv[j]);
1514 for (j = 0; j < c->mbargc; j++)
1515 decrRefCount(c->mbargv[j]);
1516 c->argc = 0;
1517 c->mbargc = 0;
1518 }
1519
/* Destroy a client:
 *  - unblock it if needed and remove its file events;
 *  - free its reply list and arguments, and close its socket;
 *  - remove it from server.clients and, for slaves/monitors, from the
 *    matching list;
 *  - if it was our master, schedule a reconnection;
 *  - free the remaining client memory.
 * The statement order below matters (see the comment at the top). */
static void freeClient(redisClient *c) {
    listNode *ln;

    /* Note that if the client we are freeing is blocked into a blocking
     * call, we have to set querybuf to NULL *before* to call unblockClient()
     * to avoid processInputBuffer() will get called. Also it is important
     * to remove the file events after this, because this call adds
     * the READABLE event. */
    sdsfree(c->querybuf);
    c->querybuf = NULL;
    if (c->flags & REDIS_BLOCKED)
        unblockClient(c);

    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    listRelease(c->reply);
    freeClientArgv(c);
    close(c->fd);
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);
    /* MONITOR clients also carry REDIS_SLAVE; they live in server.monitors
     * instead of server.slaves. */
    if (c->flags & REDIS_SLAVE) {
        /* A slave in the bulk transfer stage still has the dump file open */
        if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
            close(c->repldbfd);
        list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        redisAssert(ln != NULL);
        listDelNode(l,ln);
    }
    /* Losing the master connection: serverCron will reconnect because of
     * the REDIS_REPL_CONNECT state. */
    if (c->flags & REDIS_MASTER) {
        server.master = NULL;
        server.replstate = REDIS_REPL_CONNECT;
    }
    zfree(c->argv);
    zfree(c->mbargv);
    freeClientMultiState(c);
    zfree(c);
}
1558
/* Maximum number of bytes glueReplyBuffersIfNeeded() merges into one
 * reply object. */
#define GLUEREPLY_UP_TO (1024)

/* Merge the leading objects of the client's reply list into a single
 * object of at most GLUEREPLY_UP_TO bytes, so they can be sent with one
 * write().
 *
 * Objects are copied and removed from the head of c->reply for as long as
 * they fit in the buffer. If even the first object does not fit, the list
 * is left untouched. Because the merged buffer starts with the bytes of
 * the former head object, c->sentlen still points at the right offset. */
static void glueReplyBuffersIfNeeded(redisClient *c) {
    int copylen = 0;
    char buf[GLUEREPLY_UP_TO];
    listNode *ln;
    robj *o;

    listRewind(c->reply);
    while((ln = listYield(c->reply))) {
        int objlen;

        o = ln->value;
        objlen = sdslen(o->ptr);
        if (copylen + objlen <= GLUEREPLY_UP_TO) {
            memcpy(buf+copylen,o->ptr,objlen);
            copylen += objlen;
            listDelNode(c->reply,ln);
        } else {
            if (copylen == 0) return;
            break;
        }
    }
    /* The copied objects were removed from the list (other objects may
     * still follow): push the merged data back as the new head element. */
    o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
    listAddNodeHead(c->reply,o);
}
1585
/* Writable-event handler: send as much of the client's pending reply list
 * as possible.
 *
 * When output gluing is disabled and more than REDIS_WRITEV_THRESHOLD
 * replies are queued (and the client is not our master), the work is
 * delegated to sendReplyToClientWritev(). Otherwise:
 *  - objects are written one at a time from the head of c->reply, using
 *    c->sentlen for partial writes;
 *  - at most about REDIS_MAX_WRITE_PER_EVENT bytes are written per call;
 *  - replies to a master are discarded instead of written.
 *
 * EAGAIN is not an error; any other write error frees the client. Once the
 * list is empty the writable event is removed. */
static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen;
    robj *o;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    /* Use writev() if we have enough buffers to send */
    if (!server.glueoutputbuf &&
        listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
        !(c->flags & REDIS_MASTER))
    {
        sendReplyToClientWritev(el, fd, privdata, mask);
        return;
    }

    while(listLength(c->reply)) {
        if (server.glueoutputbuf && listLength(c->reply) > 1)
            glueReplyBuffersIfNeeded(c);

        o = listNodeValue(listFirst(c->reply));
        objlen = sdslen(o->ptr);

        /* Drop empty objects without issuing a write */
        if (objlen == 0) {
            listDelNode(c->reply,listFirst(c->reply));
            continue;
        }

        if (c->flags & REDIS_MASTER) {
            /* Don't reply to a master */
            nwritten = objlen - c->sentlen;
        } else {
            nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0) break;
        }
        c->sentlen += nwritten;
        totwritten += nwritten;
        /* If we fully sent the object on head go to the next one */
        if (c->sentlen == objlen) {
            listDelNode(c->reply,listFirst(c->reply));
            c->sentlen = 0;
        }
        /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT
         * bytes, in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * super fast link that is always able to accept data (in real world
         * scenario think about 'KEYS *' against the loopback interface) */
        if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
    }
    if (nwritten == -1) {
        /* Socket buffer full: retry on the next writable event */
        if (errno == EAGAIN) {
            nwritten = 0;
        } else {
            redisLog(REDIS_DEBUG,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }
    if (totwritten > 0) c->lastinteraction = time(NULL);
    if (listLength(c->reply) == 0) {
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    }
}
1651
1652 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1653 {
1654 redisClient *c = privdata;
1655 int nwritten = 0, totwritten = 0, objlen, willwrite;
1656 robj *o;
1657 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1658 int offset, ion = 0;
1659 REDIS_NOTUSED(el);
1660 REDIS_NOTUSED(mask);
1661
1662 listNode *node;
1663 while (listLength(c->reply)) {
1664 offset = c->sentlen;
1665 ion = 0;
1666 willwrite = 0;
1667
1668 /* fill-in the iov[] array */
1669 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1670 o = listNodeValue(node);
1671 objlen = sdslen(o->ptr);
1672
1673 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1674 break;
1675
1676 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1677 break; /* no more iovecs */
1678
1679 iov[ion].iov_base = ((char*)o->ptr) + offset;
1680 iov[ion].iov_len = objlen - offset;
1681 willwrite += objlen - offset;
1682 offset = 0; /* just for the first item */
1683 ion++;
1684 }
1685
1686 if(willwrite == 0)
1687 break;
1688
1689 /* write all collected blocks at once */
1690 if((nwritten = writev(fd, iov, ion)) < 0) {
1691 if (errno != EAGAIN) {
1692 redisLog(REDIS_DEBUG,
1693 "Error writing to client: %s", strerror(errno));
1694 freeClient(c);
1695 return;
1696 }
1697 break;
1698 }
1699
1700 totwritten += nwritten;
1701 offset = c->sentlen;
1702
1703 /* remove written robjs from c->reply */
1704 while (nwritten && listLength(c->reply)) {
1705 o = listNodeValue(listFirst(c->reply));
1706 objlen = sdslen(o->ptr);
1707
1708 if(nwritten >= objlen - offset) {
1709 listDelNode(c->reply, listFirst(c->reply));
1710 nwritten -= objlen - offset;
1711 c->sentlen = 0;
1712 } else {
1713 /* partial write */
1714 c->sentlen += nwritten;
1715 break;
1716 }
1717 offset = 0;
1718 }
1719 }
1720
1721 if (totwritten > 0)
1722 c->lastinteraction = time(NULL);
1723
1724 if (listLength(c->reply) == 0) {
1725 c->sentlen = 0;
1726 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1727 }
1728 }
1729
1730 static struct redisCommand *lookupCommand(char *name) {
1731 int j = 0;
1732 while(cmdTable[j].name != NULL) {
1733 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1734 j++;
1735 }
1736 return NULL;
1737 }
1738
1739 /* resetClient prepare the client to process the next command */
1740 static void resetClient(redisClient *c) {
1741 freeClientArgv(c);
1742 c->bulklen = -1;
1743 c->multibulk = 0;
1744 }
1745
1746 /* Call() is the core of Redis execution of a command */
1747 static void call(redisClient *c, struct redisCommand *cmd) {
1748 long long dirty;
1749
1750 dirty = server.dirty;
1751 cmd->proc(c);
1752 if (server.appendonly && server.dirty-dirty)
1753 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1754 if (server.dirty-dirty && listLength(server.slaves))
1755 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1756 if (listLength(server.monitors))
1757 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1758 server.stat_numcommands++;
1759 }
1760
1761 /* If this function gets called we already read a whole
1762 * command, argments are in the client argv/argc fields.
1763 * processCommand() execute the command or prepare the
1764 * server for a bulk read from the client.
1765 *
1766 * If 1 is returned the client is still alive and valid and
1767 * and other operations can be performed by the caller. Otherwise
1768 * if 0 is returned the client was destroied (i.e. after QUIT). */
1769 static int processCommand(redisClient *c) {
1770 struct redisCommand *cmd;
1771
1772 /* Free some memory if needed (maxmemory setting) */
1773 if (server.maxmemory) freeMemoryIfNeeded();
1774
1775 /* Handle the multi bulk command type. This is an alternative protocol
1776 * supported by Redis in order to receive commands that are composed of
1777 * multiple binary-safe "bulk" arguments. The latency of processing is
1778 * a bit higher but this allows things like multi-sets, so if this
1779 * protocol is used only for MSET and similar commands this is a big win. */
1780 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1781 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1782 if (c->multibulk <= 0) {
1783 resetClient(c);
1784 return 1;
1785 } else {
1786 decrRefCount(c->argv[c->argc-1]);
1787 c->argc--;
1788 return 1;
1789 }
1790 } else if (c->multibulk) {
1791 if (c->bulklen == -1) {
1792 if (((char*)c->argv[0]->ptr)[0] != '$') {
1793 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1794 resetClient(c);
1795 return 1;
1796 } else {
1797 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1798 decrRefCount(c->argv[0]);
1799 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1800 c->argc--;
1801 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1802 resetClient(c);
1803 return 1;
1804 }
1805 c->argc--;
1806 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1807 return 1;
1808 }
1809 } else {
1810 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1811 c->mbargv[c->mbargc] = c->argv[0];
1812 c->mbargc++;
1813 c->argc--;
1814 c->multibulk--;
1815 if (c->multibulk == 0) {
1816 robj **auxargv;
1817 int auxargc;
1818
1819 /* Here we need to swap the multi-bulk argc/argv with the
1820 * normal argc/argv of the client structure. */
1821 auxargv = c->argv;
1822 c->argv = c->mbargv;
1823 c->mbargv = auxargv;
1824
1825 auxargc = c->argc;
1826 c->argc = c->mbargc;
1827 c->mbargc = auxargc;
1828
1829 /* We need to set bulklen to something different than -1
1830 * in order for the code below to process the command without
1831 * to try to read the last argument of a bulk command as
1832 * a special argument. */
1833 c->bulklen = 0;
1834 /* continue below and process the command */
1835 } else {
1836 c->bulklen = -1;
1837 return 1;
1838 }
1839 }
1840 }
1841 /* -- end of multi bulk commands processing -- */
1842
1843 /* The QUIT command is handled as a special case. Normal command
1844 * procs are unable to close the client connection safely */
1845 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1846 freeClient(c);
1847 return 0;
1848 }
1849 cmd = lookupCommand(c->argv[0]->ptr);
1850 if (!cmd) {
1851 addReplySds(c,
1852 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1853 (char*)c->argv[0]->ptr));
1854 resetClient(c);
1855 return 1;
1856 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1857 (c->argc < -cmd->arity)) {
1858 addReplySds(c,
1859 sdscatprintf(sdsempty(),
1860 "-ERR wrong number of arguments for '%s' command\r\n",
1861 cmd->name));
1862 resetClient(c);
1863 return 1;
1864 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1865 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1866 resetClient(c);
1867 return 1;
1868 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1869 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1870
1871 decrRefCount(c->argv[c->argc-1]);
1872 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1873 c->argc--;
1874 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1875 resetClient(c);
1876 return 1;
1877 }
1878 c->argc--;
1879 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1880 /* It is possible that the bulk read is already in the
1881 * buffer. Check this condition and handle it accordingly.
1882 * This is just a fast path, alternative to call processInputBuffer().
1883 * It's a good idea since the code is small and this condition
1884 * happens most of the times. */
1885 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1886 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1887 c->argc++;
1888 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1889 } else {
1890 return 1;
1891 }
1892 }
1893 /* Let's try to share objects on the command arguments vector */
1894 if (server.shareobjects) {
1895 int j;
1896 for(j = 1; j < c->argc; j++)
1897 c->argv[j] = tryObjectSharing(c->argv[j]);
1898 }
1899 /* Let's try to encode the bulk object to save space. */
1900 if (cmd->flags & REDIS_CMD_BULK)
1901 tryObjectEncoding(c->argv[c->argc-1]);
1902
1903 /* Check if the user is authenticated */
1904 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1905 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1906 resetClient(c);
1907 return 1;
1908 }
1909
1910 /* Exec the command */
1911 if (c->flags & REDIS_MULTI && cmd->proc != execCommand) {
1912 queueMultiCommand(c,cmd);
1913 addReply(c,shared.queued);
1914 } else {
1915 call(c,cmd);
1916 }
1917
1918 /* Prepare the client for the next command */
1919 if (c->flags & REDIS_CLOSE) {
1920 freeClient(c);
1921 return 0;
1922 }
1923 resetClient(c);
1924 return 1;
1925 }
1926
1927 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1928 listNode *ln;
1929 int outc = 0, j;
1930 robj **outv;
1931 /* (args*2)+1 is enough room for args, spaces, newlines */
1932 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1933
1934 if (argc <= REDIS_STATIC_ARGS) {
1935 outv = static_outv;
1936 } else {
1937 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1938 }
1939
1940 for (j = 0; j < argc; j++) {
1941 if (j != 0) outv[outc++] = shared.space;
1942 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1943 robj *lenobj;
1944
1945 lenobj = createObject(REDIS_STRING,
1946 sdscatprintf(sdsempty(),"%lu\r\n",
1947 (unsigned long) stringObjectLen(argv[j])));
1948 lenobj->refcount = 0;
1949 outv[outc++] = lenobj;
1950 }
1951 outv[outc++] = argv[j];
1952 }
1953 outv[outc++] = shared.crlf;
1954
1955 /* Increment all the refcounts at start and decrement at end in order to
1956 * be sure to free objects if there is no slave in a replication state
1957 * able to be feed with commands */
1958 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1959 listRewind(slaves);
1960 while((ln = listYield(slaves))) {
1961 redisClient *slave = ln->value;
1962
1963 /* Don't feed slaves that are still waiting for BGSAVE to start */
1964 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1965
1966 /* Feed all the other slaves, MONITORs and so on */
1967 if (slave->slaveseldb != dictid) {
1968 robj *selectcmd;
1969
1970 switch(dictid) {
1971 case 0: selectcmd = shared.select0; break;
1972 case 1: selectcmd = shared.select1; break;
1973 case 2: selectcmd = shared.select2; break;
1974 case 3: selectcmd = shared.select3; break;
1975 case 4: selectcmd = shared.select4; break;
1976 case 5: selectcmd = shared.select5; break;
1977 case 6: selectcmd = shared.select6; break;
1978 case 7: selectcmd = shared.select7; break;
1979 case 8: selectcmd = shared.select8; break;
1980 case 9: selectcmd = shared.select9; break;
1981 default:
1982 selectcmd = createObject(REDIS_STRING,
1983 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1984 selectcmd->refcount = 0;
1985 break;
1986 }
1987 addReply(slave,selectcmd);
1988 slave->slaveseldb = dictid;
1989 }
1990 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1991 }
1992 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1993 if (outv != static_outv) zfree(outv);
1994 }
1995
1996 static void processInputBuffer(redisClient *c) {
1997 again:
1998 /* Before to process the input buffer, make sure the client is not
1999 * waitig for a blocking operation such as BLPOP. Note that the first
2000 * iteration the client is never blocked, otherwise the processInputBuffer
2001 * would not be called at all, but after the execution of the first commands
2002 * in the input buffer the client may be blocked, and the "goto again"
2003 * will try to reiterate. The following line will make it return asap. */
2004 if (c->flags & REDIS_BLOCKED) return;
2005 if (c->bulklen == -1) {
2006 /* Read the first line of the query */
2007 char *p = strchr(c->querybuf,'\n');
2008 size_t querylen;
2009
2010 if (p) {
2011 sds query, *argv;
2012 int argc, j;
2013
2014 query = c->querybuf;
2015 c->querybuf = sdsempty();
2016 querylen = 1+(p-(query));
2017 if (sdslen(query) > querylen) {
2018 /* leave data after the first line of the query in the buffer */
2019 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
2020 }
2021 *p = '\0'; /* remove "\n" */
2022 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
2023 sdsupdatelen(query);
2024
2025 /* Now we can split the query in arguments */
2026 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
2027 sdsfree(query);
2028
2029 if (c->argv) zfree(c->argv);
2030 c->argv = zmalloc(sizeof(robj*)*argc);
2031
2032 for (j = 0; j < argc; j++) {
2033 if (sdslen(argv[j])) {
2034 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
2035 c->argc++;
2036 } else {
2037 sdsfree(argv[j]);
2038 }
2039 }
2040 zfree(argv);
2041 if (c->argc) {
2042 /* Execute the command. If the client is still valid
2043 * after processCommand() return and there is something
2044 * on the query buffer try to process the next command. */
2045 if (processCommand(c) && sdslen(c->querybuf)) goto again;
2046 } else {
2047 /* Nothing to process, argc == 0. Just process the query
2048 * buffer if it's not empty or return to the caller */
2049 if (sdslen(c->querybuf)) goto again;
2050 }
2051 return;
2052 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
2053 redisLog(REDIS_DEBUG, "Client protocol error");
2054 freeClient(c);
2055 return;
2056 }
2057 } else {
2058 /* Bulk read handling. Note that if we are at this point
2059 the client already sent a command terminated with a newline,
2060 we are reading the bulk data that is actually the last
2061 argument of the command. */
2062 int qbl = sdslen(c->querybuf);
2063
2064 if (c->bulklen <= qbl) {
2065 /* Copy everything but the final CRLF as final argument */
2066 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
2067 c->argc++;
2068 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
2069 /* Process the command. If the client is still valid after
2070 * the processing and there is more data in the buffer
2071 * try to parse it. */
2072 if (processCommand(c) && sdslen(c->querybuf)) goto again;
2073 return;
2074 }
2075 }
2076 }
2077
2078 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
2079 redisClient *c = (redisClient*) privdata;
2080 char buf[REDIS_IOBUF_LEN];
2081 int nread;
2082 REDIS_NOTUSED(el);
2083 REDIS_NOTUSED(mask);
2084
2085 nread = read(fd, buf, REDIS_IOBUF_LEN);
2086 if (nread == -1) {
2087 if (errno == EAGAIN) {
2088 nread = 0;
2089 } else {
2090 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
2091 freeClient(c);
2092 return;
2093 }
2094 } else if (nread == 0) {
2095 redisLog(REDIS_DEBUG, "Client closed connection");
2096 freeClient(c);
2097 return;
2098 }
2099 if (nread) {
2100 c->querybuf = sdscatlen(c->querybuf, buf, nread);
2101 c->lastinteraction = time(NULL);
2102 } else {
2103 return;
2104 }
2105 processInputBuffer(c);
2106 }
2107
2108 static int selectDb(redisClient *c, int id) {
2109 if (id < 0 || id >= server.dbnum)
2110 return REDIS_ERR;
2111 c->db = &server.db[id];
2112 return REDIS_OK;
2113 }
2114
2115 static void *dupClientReplyValue(void *o) {
2116 incrRefCount((robj*)o);
2117 return 0;
2118 }
2119
2120 static redisClient *createClient(int fd) {
2121 redisClient *c = zmalloc(sizeof(*c));
2122
2123 anetNonBlock(NULL,fd);
2124 anetTcpNoDelay(NULL,fd);
2125 if (!c) return NULL;
2126 selectDb(c,0);
2127 c->fd = fd;
2128 c->querybuf = sdsempty();
2129 c->argc = 0;
2130 c->argv = NULL;
2131 c->bulklen = -1;
2132 c->multibulk = 0;
2133 c->mbargc = 0;
2134 c->mbargv = NULL;
2135 c->sentlen = 0;
2136 c->flags = 0;
2137 c->lastinteraction = time(NULL);
2138 c->authenticated = 0;
2139 c->replstate = REDIS_REPL_NONE;
2140 c->reply = listCreate();
2141 c->blockingkeys = NULL;
2142 c->blockingkeysnum = 0;
2143 listSetFreeMethod(c->reply,decrRefCount);
2144 listSetDupMethod(c->reply,dupClientReplyValue);
2145 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
2146 readQueryFromClient, c) == AE_ERR) {
2147 freeClient(c);
2148 return NULL;
2149 }
2150 listAddNodeTail(server.clients,c);
2151 initClientMultiState(c);
2152 return c;
2153 }
2154
2155 static void addReply(redisClient *c, robj *obj) {
2156 if (listLength(c->reply) == 0 &&
2157 (c->replstate == REDIS_REPL_NONE ||
2158 c->replstate == REDIS_REPL_ONLINE) &&
2159 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
2160 sendReplyToClient, c) == AE_ERR) return;
2161 listAddNodeTail(c->reply,getDecodedObject(obj));
2162 }
2163
2164 static void addReplySds(redisClient *c, sds s) {
2165 robj *o = createObject(REDIS_STRING,s);
2166 addReply(c,o);
2167 decrRefCount(o);
2168 }
2169
2170 static void addReplyDouble(redisClient *c, double d) {
2171 char buf[128];
2172
2173 snprintf(buf,sizeof(buf),"%.17g",d);
2174 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2175 (unsigned long) strlen(buf),buf));
2176 }
2177
2178 static void addReplyBulkLen(redisClient *c, robj *obj) {
2179 size_t len;
2180
2181 if (obj->encoding == REDIS_ENCODING_RAW) {
2182 len = sdslen(obj->ptr);
2183 } else {
2184 long n = (long)obj->ptr;
2185
2186 /* Compute how many bytes will take this integer as a radix 10 string */
2187 len = 1;
2188 if (n < 0) {
2189 len++;
2190 n = -n;
2191 }
2192 while((n = n/10) != 0) {
2193 len++;
2194 }
2195 }
2196 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len));
2197 }
2198
2199 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2200 int cport, cfd;
2201 char cip[128];
2202 redisClient *c;
2203 REDIS_NOTUSED(el);
2204 REDIS_NOTUSED(mask);
2205 REDIS_NOTUSED(privdata);
2206
2207 cfd = anetAccept(server.neterr, fd, cip, &cport);
2208 if (cfd == AE_ERR) {
2209 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2210 return;
2211 }
2212 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
2213 if ((c = createClient(cfd)) == NULL) {
2214 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2215 close(cfd); /* May be already closed, just ingore errors */
2216 return;
2217 }
2218 /* If maxclient directive is set and this is one client more... close the
2219 * connection. Note that we create the client instead to check before
2220 * for this condition, since now the socket is already set in nonblocking
2221 * mode and we can send an error for free using the Kernel I/O */
2222 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2223 char *err = "-ERR max number of clients reached\r\n";
2224
2225 /* That's a best effort error message, don't check write errors */
2226 if (write(c->fd,err,strlen(err)) == -1) {
2227 /* Nothing to do, Just to avoid the warning... */
2228 }
2229 freeClient(c);
2230 return;
2231 }
2232 server.stat_numconnections++;
2233 }
2234
2235 /* ======================= Redis objects implementation ===================== */
2236
2237 static robj *createObject(int type, void *ptr) {
2238 robj *o;
2239
2240 if (listLength(server.objfreelist)) {
2241 listNode *head = listFirst(server.objfreelist);
2242 o = listNodeValue(head);
2243 listDelNode(server.objfreelist,head);
2244 } else {
2245 if (server.vm_enabled) {
2246 o = zmalloc(sizeof(*o));
2247 } else {
2248 o = zmalloc(sizeof(*o)-sizeof(struct redisObjectVM));
2249 }
2250 }
2251 o->type = type;
2252 o->encoding = REDIS_ENCODING_RAW;
2253 o->ptr = ptr;
2254 o->refcount = 1;
2255 return o;
2256 }
2257
2258 static robj *createStringObject(char *ptr, size_t len) {
2259 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2260 }
2261
2262 static robj *createListObject(void) {
2263 list *l = listCreate();
2264
2265 listSetFreeMethod(l,decrRefCount);
2266 return createObject(REDIS_LIST,l);
2267 }
2268
2269 static robj *createSetObject(void) {
2270 dict *d = dictCreate(&setDictType,NULL);
2271 return createObject(REDIS_SET,d);
2272 }
2273
2274 static robj *createZsetObject(void) {
2275 zset *zs = zmalloc(sizeof(*zs));
2276
2277 zs->dict = dictCreate(&zsetDictType,NULL);
2278 zs->zsl = zslCreate();
2279 return createObject(REDIS_ZSET,zs);
2280 }
2281
2282 static void freeStringObject(robj *o) {
2283 if (o->encoding == REDIS_ENCODING_RAW) {
2284 sdsfree(o->ptr);
2285 }
2286 }
2287
2288 static void freeListObject(robj *o) {
2289 listRelease((list*) o->ptr);
2290 }
2291
2292 static void freeSetObject(robj *o) {
2293 dictRelease((dict*) o->ptr);
2294 }
2295
2296 static void freeZsetObject(robj *o) {
2297 zset *zs = o->ptr;
2298
2299 dictRelease(zs->dict);
2300 zslFree(zs->zsl);
2301 zfree(zs);
2302 }
2303
2304 static void freeHashObject(robj *o) {
2305 dictRelease((dict*) o->ptr);
2306 }
2307
2308 static void incrRefCount(robj *o) {
2309 o->refcount++;
2310 #ifdef DEBUG_REFCOUNT
2311 if (o->type == REDIS_STRING)
2312 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2313 #endif
2314 }
2315
2316 static void decrRefCount(void *obj) {
2317 robj *o = obj;
2318
2319 #ifdef DEBUG_REFCOUNT
2320 if (o->type == REDIS_STRING)
2321 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2322 #endif
2323 if (--(o->refcount) == 0) {
2324 switch(o->type) {
2325 case REDIS_STRING: freeStringObject(o); break;
2326 case REDIS_LIST: freeListObject(o); break;
2327 case REDIS_SET: freeSetObject(o); break;
2328 case REDIS_ZSET: freeZsetObject(o); break;
2329 case REDIS_HASH: freeHashObject(o); break;
2330 default: redisAssert(0 != 0); break;
2331 }
2332 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2333 !listAddNodeHead(server.objfreelist,o))
2334 zfree(o);
2335 }
2336 }
2337
2338 static robj *lookupKey(redisDb *db, robj *key) {
2339 dictEntry *de = dictFind(db->dict,key);
2340 return de ? dictGetEntryVal(de) : NULL;
2341 }
2342
2343 static robj *lookupKeyRead(redisDb *db, robj *key) {
2344 expireIfNeeded(db,key);
2345 return lookupKey(db,key);
2346 }
2347
2348 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2349 deleteIfVolatile(db,key);
2350 return lookupKey(db,key);
2351 }
2352
2353 static int deleteKey(redisDb *db, robj *key) {
2354 int retval;
2355
2356 /* We need to protect key from destruction: after the first dictDelete()
2357 * it may happen that 'key' is no longer valid if we don't increment
2358 * it's count. This may happen when we get the object reference directly
2359 * from the hash table with dictRandomKey() or dict iterators */
2360 incrRefCount(key);
2361 if (dictSize(db->expires)) dictDelete(db->expires,key);
2362 retval = dictDelete(db->dict,key);
2363 decrRefCount(key);
2364
2365 return retval == DICT_OK;
2366 }
2367
2368 /* Try to share an object against the shared objects pool */
2369 static robj *tryObjectSharing(robj *o) {
2370 struct dictEntry *de;
2371 unsigned long c;
2372
2373 if (o == NULL || server.shareobjects == 0) return o;
2374
2375 redisAssert(o->type == REDIS_STRING);
2376 de = dictFind(server.sharingpool,o);
2377 if (de) {
2378 robj *shared = dictGetEntryKey(de);
2379
2380 c = ((unsigned long) dictGetEntryVal(de))+1;
2381 dictGetEntryVal(de) = (void*) c;
2382 incrRefCount(shared);
2383 decrRefCount(o);
2384 return shared;
2385 } else {
2386 /* Here we are using a stream algorihtm: Every time an object is
2387 * shared we increment its count, everytime there is a miss we
2388 * recrement the counter of a random object. If this object reaches
2389 * zero we remove the object and put the current object instead. */
2390 if (dictSize(server.sharingpool) >=
2391 server.sharingpoolsize) {
2392 de = dictGetRandomKey(server.sharingpool);
2393 redisAssert(de != NULL);
2394 c = ((unsigned long) dictGetEntryVal(de))-1;
2395 dictGetEntryVal(de) = (void*) c;
2396 if (c == 0) {
2397 dictDelete(server.sharingpool,de->key);
2398 }
2399 } else {
2400 c = 0; /* If the pool is empty we want to add this object */
2401 }
2402 if (c == 0) {
2403 int retval;
2404
2405 retval = dictAdd(server.sharingpool,o,(void*)1);
2406 redisAssert(retval == DICT_OK);
2407 incrRefCount(o);
2408 }
2409 return o;
2410 }
2411 }
2412
2413 /* Check if the nul-terminated string 's' can be represented by a long
2414 * (that is, is a number that fits into long without any other space or
2415 * character before or after the digits).
2416 *
2417 * If so, the function returns REDIS_OK and *longval is set to the value
2418 * of the number. Otherwise REDIS_ERR is returned */
2419 static int isStringRepresentableAsLong(sds s, long *longval) {
2420 char buf[32], *endptr;
2421 long value;
2422 int slen;
2423
2424 value = strtol(s, &endptr, 10);
2425 if (endptr[0] != '\0') return REDIS_ERR;
2426 slen = snprintf(buf,32,"%ld",value);
2427
2428 /* If the number converted back into a string is not identical
2429 * then it's not possible to encode the string as integer */
2430 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2431 if (longval) *longval = value;
2432 return REDIS_OK;
2433 }
2434
2435 /* Try to encode a string object in order to save space */
2436 static int tryObjectEncoding(robj *o) {
2437 long value;
2438 sds s = o->ptr;
2439
2440 if (o->encoding != REDIS_ENCODING_RAW)
2441 return REDIS_ERR; /* Already encoded */
2442
2443 /* It's not save to encode shared objects: shared objects can be shared
2444 * everywhere in the "object space" of Redis. Encoded objects can only
2445 * appear as "values" (and not, for instance, as keys) */
2446 if (o->refcount > 1) return REDIS_ERR;
2447
2448 /* Currently we try to encode only strings */
2449 redisAssert(o->type == REDIS_STRING);
2450
2451 /* Check if we can represent this string as a long integer */
2452 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2453
2454 /* Ok, this object can be encoded */
2455 o->encoding = REDIS_ENCODING_INT;
2456 sdsfree(o->ptr);
2457 o->ptr = (void*) value;
2458 return REDIS_OK;
2459 }
2460
2461 /* Get a decoded version of an encoded object (returned as a new object).
2462 * If the object is already raw-encoded just increment the ref count. */
2463 static robj *getDecodedObject(robj *o) {
2464 robj *dec;
2465
2466 if (o->encoding == REDIS_ENCODING_RAW) {
2467 incrRefCount(o);
2468 return o;
2469 }
2470 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2471 char buf[32];
2472
2473 snprintf(buf,32,"%ld",(long)o->ptr);
2474 dec = createStringObject(buf,strlen(buf));
2475 return dec;
2476 } else {
2477 redisAssert(1 != 1);
2478 }
2479 }
2480
2481 /* Compare two string objects via strcmp() or alike.
2482 * Note that the objects may be integer-encoded. In such a case we
2483 * use snprintf() to get a string representation of the numbers on the stack
2484 * and compare the strings, it's much faster than calling getDecodedObject().
2485 *
2486 * Important note: if objects are not integer encoded, but binary-safe strings,
2487 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2488 * binary safe. */
2489 static int compareStringObjects(robj *a, robj *b) {
2490 redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2491 char bufa[128], bufb[128], *astr, *bstr;
2492 int bothsds = 1;
2493
2494 if (a == b) return 0;
2495 if (a->encoding != REDIS_ENCODING_RAW) {
2496 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2497 astr = bufa;
2498 bothsds = 0;
2499 } else {
2500 astr = a->ptr;
2501 }
2502 if (b->encoding != REDIS_ENCODING_RAW) {
2503 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2504 bstr = bufb;
2505 bothsds = 0;
2506 } else {
2507 bstr = b->ptr;
2508 }
2509 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2510 }
2511
2512 static size_t stringObjectLen(robj *o) {
2513 redisAssert(o->type == REDIS_STRING);
2514 if (o->encoding == REDIS_ENCODING_RAW) {
2515 return sdslen(o->ptr);
2516 } else {
2517 char buf[32];
2518
2519 return snprintf(buf,32,"%ld",(long)o->ptr);
2520 }
2521 }
2522
2523 /*============================ RDB saving/loading =========================== */
2524
/* Write the one byte type/opcode 'type'. Returns 0 on success, -1 on error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return fwrite(&type,1,1,fp) == 0 ? -1 : 0;
}
2529
/* Write 't' truncated to a 32 bit signed integer, in host byte order.
 * Returns 0 on success, -1 on error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t secs = (int32_t) t;

    if (fwrite(&secs,sizeof(secs),1,fp) != 1) return -1;
    return 0;
}
2535
2536 /* check rdbLoadLen() comments for more info */
2537 static int rdbSaveLen(FILE *fp, uint32_t len) {
2538 unsigned char buf[2];
2539
2540 if (len < (1<<6)) {
2541 /* Save a 6 bit len */
2542 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2543 if (fwrite(buf,1,1,fp) == 0) return -1;
2544 } else if (len < (1<<14)) {
2545 /* Save a 14 bit len */
2546 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2547 buf[1] = len&0xFF;
2548 if (fwrite(buf,2,1,fp) == 0) return -1;
2549 } else {
2550 /* Save a 32 bit len */
2551 buf[0] = (REDIS_RDB_32BITLEN<<6);
2552 if (fwrite(buf,1,1,fp) == 0) return -1;
2553 len = htonl(len);
2554 if (fwrite(&len,4,1,fp) == 0) return -1;
2555 }
2556 return 0;
2557 }
2558
2559 /* String objects in the form "2391" "-100" without any space and with a
2560 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2561 * encoded as integers to save space */
2562 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2563 long long value;
2564 char *endptr, buf[32];
2565
2566 /* Check if it's possible to encode this value as a number */
2567 value = strtoll(s, &endptr, 10);
2568 if (endptr[0] != '\0') return 0;
2569 snprintf(buf,32,"%lld",value);
2570
2571 /* If the number converted back into a string is not identical
2572 * then it's not possible to encode the string as integer */
2573 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2574
2575 /* Finally check if it fits in our ranges */
2576 if (value >= -(1<<7) && value <= (1<<7)-1) {
2577 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2578 enc[1] = value&0xFF;
2579 return 2;
2580 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2581 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2582 enc[1] = value&0xFF;
2583 enc[2] = (value>>8)&0xFF;
2584 return 3;
2585 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2586 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2587 enc[1] = value&0xFF;
2588 enc[2] = (value>>8)&0xFF;
2589 enc[3] = (value>>16)&0xFF;
2590 enc[4] = (value>>24)&0xFF;
2591 return 5;
2592 } else {
2593 return 0;
2594 }
2595 }
2596
2597 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2598 unsigned int comprlen, outlen;
2599 unsigned char byte;
2600 void *out;
2601
2602 /* We require at least four bytes compression for this to be worth it */
2603 outlen = sdslen(obj->ptr)-4;
2604 if (outlen <= 0) return 0;
2605 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2606 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2607 if (comprlen == 0) {
2608 zfree(out);
2609 return 0;
2610 }
2611 /* Data compressed! Let's save it on disk */
2612 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2613 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2614 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2615 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2616 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2617 zfree(out);
2618 return comprlen;
2619
2620 writeerr:
2621 zfree(out);
2622 return -1;
2623 }
2624
2625 /* Save a string objet as [len][data] on disk. If the object is a string
2626 * representation of an integer value we try to safe it in a special form */
2627 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2628 size_t len;
2629 int enclen;
2630
2631 len = sdslen(obj->ptr);
2632
2633 /* Try integer encoding */
2634 if (len <= 11) {
2635 unsigned char buf[5];
2636 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2637 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2638 return 0;
2639 }
2640 }
2641
2642 /* Try LZF compression - under 20 bytes it's unable to compress even
2643 * aaaaaaaaaaaaaaaaaa so skip it */
2644 if (server.rdbcompression && len > 20) {
2645 int retval;
2646
2647 retval = rdbSaveLzfStringObject(fp,obj);
2648 if (retval == -1) return -1;
2649 if (retval > 0) return 0;
2650 /* retval == 0 means data can't be compressed, save the old way */
2651 }
2652
2653 /* Store verbatim */
2654 if (rdbSaveLen(fp,len) == -1) return -1;
2655 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2656 return 0;
2657 }
2658
2659 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2660 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2661 int retval;
2662
2663 obj = getDecodedObject(obj);
2664 retval = rdbSaveStringObjectRaw(fp,obj);
2665 decrRefCount(obj);
2666 return retval;
2667 }
2668
/* Save a double as a string prefixed by an unsigned 8 bit length.
 * Special length values encode non finite numbers (no string follows):
 *   253: not a number
 *   254: + inf
 *   255: - inf
 * Finite values are printed with "%.17g" so they round-trip exactly.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len = 1;

    if (isnan(val)) {
        buf[0] = 253;
    } else if (isinf(val)) {
        buf[0] = (val > 0) ? 254 : 255;
    } else {
        int digits = snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);

        buf[0] = digits;
        len += digits;
    }
    return fwrite(buf,len,1,fp) == 0 ? -1 : 0;
}
2695
2696 /* Save a Redis object. */
2697 static int rdbSaveObject(FILE *fp, robj *o) {
2698 if (o->type == REDIS_STRING) {
2699 /* Save a string value */
2700 if (rdbSaveStringObject(fp,o) == -1) return -1;
2701 } else if (o->type == REDIS_LIST) {
2702 /* Save a list value */
2703 list *list = o->ptr;
2704 listNode *ln;
2705
2706 listRewind(list);
2707 if (rdbSaveLen(fp,listLength(list)) == -1) return -1;
2708 while((ln = listYield(list))) {
2709 robj *eleobj = listNodeValue(ln);
2710
2711 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2712 }
2713 } else if (o->type == REDIS_SET) {
2714 /* Save a set value */
2715 dict *set = o->ptr;
2716 dictIterator *di = dictGetIterator(set);
2717 dictEntry *de;
2718
2719 if (rdbSaveLen(fp,dictSize(set)) == -1) return -1;
2720 while((de = dictNext(di)) != NULL) {
2721 robj *eleobj = dictGetEntryKey(de);
2722
2723 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2724 }
2725 dictReleaseIterator(di);
2726 } else if (o->type == REDIS_ZSET) {
2727 /* Save a set value */
2728 zset *zs = o->ptr;
2729 dictIterator *di = dictGetIterator(zs->dict);
2730 dictEntry *de;
2731
2732 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) return -1;
2733 while((de = dictNext(di)) != NULL) {
2734 robj *eleobj = dictGetEntryKey(de);
2735 double *score = dictGetEntryVal(de);
2736
2737 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2738 if (rdbSaveDoubleValue(fp,*score) == -1) return -1;
2739 }
2740 dictReleaseIterator(di);
2741 } else {
2742 redisAssert(0 != 0);
2743 }
2744 return 0;
2745 }
2746
2747 /* Return the length the object will have on disk if saved with
2748 * the rdbSaveObject() function. Currently we use a trick to get
2749 * this length with very little changes to the code. In the future
2750 * we could switch to a faster solution. */
2751 static off_t rdbSavedObjectLen(robj *o) {
2752 static FILE *fp = NULL;
2753
2754 if (fp == NULL) fp = fopen("/dev/null","w");
2755 assert(fp != NULL);
2756
2757 rewind(fp);
2758 assert(rdbSaveObject(fp,o) != 1);
2759 return ftello(fp);
2760 }
2761
2762 /* Return the number of pages required to save this object in the swap file */
2763 static off_t rdbSavedObjectPages(robj *o) {
2764 off_t bytes = rdbSavedObjectLen(o);
2765
2766 return (bytes+(server.vm_page_size-1))/server.vm_page_size;
2767 }
2768
2769 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2770 static int rdbSave(char *filename) {
2771 dictIterator *di = NULL;
2772 dictEntry *de;
2773 FILE *fp;
2774 char tmpfile[256];
2775 int j;
2776 time_t now = time(NULL);
2777
2778 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2779 fp = fopen(tmpfile,"w");
2780 if (!fp) {
2781 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2782 return REDIS_ERR;
2783 }
2784 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2785 for (j = 0; j < server.dbnum; j++) {
2786 redisDb *db = server.db+j;
2787 dict *d = db->dict;
2788 if (dictSize(d) == 0) continue;
2789 di = dictGetIterator(d);
2790 if (!di) {
2791 fclose(fp);
2792 return REDIS_ERR;
2793 }
2794
2795 /* Write the SELECT DB opcode */
2796 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2797 if (rdbSaveLen(fp,j) == -1) goto werr;
2798
2799 /* Iterate this DB writing every entry */
2800 while((de = dictNext(di)) != NULL) {
2801 robj *key = dictGetEntryKey(de);
2802 robj *o = dictGetEntryVal(de);
2803 time_t expiretime = getExpire(db,key);
2804
2805 /* Save the expire time */
2806 if (expiretime != -1) {
2807 /* If this key is already expired skip it */
2808 if (expiretime < now) continue;
2809 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2810 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2811 }
2812 /* Save the key and associated value */
2813 if (rdbSaveType(fp,o->type) == -1) goto werr;
2814 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2815 /* Save the actual value */
2816 if (rdbSaveObject(fp,o) == -1) goto werr;
2817 }
2818 dictReleaseIterator(di);
2819 }
2820 /* EOF opcode */
2821 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2822
2823 /* Make sure data will not remain on the OS's output buffers */
2824 fflush(fp);
2825 fsync(fileno(fp));
2826 fclose(fp);
2827
2828 /* Use RENAME to make sure the DB file is changed atomically only
2829 * if the generate DB file is ok. */
2830 if (rename(tmpfile,filename) == -1) {
2831 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2832 unlink(tmpfile);
2833 return REDIS_ERR;
2834 }
2835 redisLog(REDIS_NOTICE,"DB saved on disk");
2836 server.dirty = 0;
2837 server.lastsave = time(NULL);
2838 return REDIS_OK;
2839
2840 werr:
2841 fclose(fp);
2842 unlink(tmpfile);
2843 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2844 if (di) dictReleaseIterator(di);
2845 return REDIS_ERR;
2846 }
2847
2848 static int rdbSaveBackground(char *filename) {
2849 pid_t childpid;
2850
2851 if (server.bgsavechildpid != -1) return REDIS_ERR;
2852 if ((childpid = fork()) == 0) {
2853 /* Child */
2854 close(server.fd);
2855 if (rdbSave(filename) == REDIS_OK) {
2856 exit(0);
2857 } else {
2858 exit(1);
2859 }
2860 } else {
2861 /* Parent */
2862 if (childpid == -1) {
2863 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2864 strerror(errno));
2865 return REDIS_ERR;
2866 }
2867 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2868 server.bgsavechildpid = childpid;
2869 return REDIS_OK;
2870 }
2871 return REDIS_OK; /* unreached */
2872 }
2873
2874 static void rdbRemoveTempFile(pid_t childpid) {
2875 char tmpfile[256];
2876
2877 snprintf(tmpfile,256,"temp-%d.rdb", (int) childpid);
2878 unlink(tmpfile);
2879 }
2880
2881 static int rdbLoadType(FILE *fp) {
2882 unsigned char type;
2883 if (fread(&type,1,1,fp) == 0) return -1;
2884 return type;
2885 }
2886
2887 static time_t rdbLoadTime(FILE *fp) {
2888 int32_t t32;
2889 if (fread(&t32,4,1,fp) == 0) return -1;
2890 return (time_t) t32;
2891 }
2892
2893 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2894 * of this file for a description of how this are stored on disk.
2895 *
2896 * isencoded is set to 1 if the readed length is not actually a length but
2897 * an "encoding type", check the above comments for more info */
2898 static uint32_t rdbLoadLen(FILE *fp, int *isencoded) {
2899 unsigned char buf[2];
2900 uint32_t len;
2901 int type;
2902
2903 if (isencoded) *isencoded = 0;
2904 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2905 type = (buf[0]&0xC0)>>6;
2906 if (type == REDIS_RDB_6BITLEN) {
2907 /* Read a 6 bit len */
2908 return buf[0]&0x3F;
2909 } else if (type == REDIS_RDB_ENCVAL) {
2910 /* Read a 6 bit len encoding type */
2911 if (isencoded) *isencoded = 1;
2912 return buf[0]&0x3F;
2913 } else if (type == REDIS_RDB_14BITLEN) {
2914 /* Read a 14 bit len */
2915 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2916 return ((buf[0]&0x3F)<<8)|buf[1];
2917 } else {
2918 /* Read a 32 bit len */
2919 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2920 return ntohl(len);
2921 }
2922 }
2923
2924 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2925 unsigned char enc[4];
2926 long long val;
2927
2928 if (enctype == REDIS_RDB_ENC_INT8) {
2929 if (fread(enc,1,1,fp) == 0) return NULL;
2930 val = (signed char)enc[0];
2931 } else if (enctype == REDIS_RDB_ENC_INT16) {
2932 uint16_t v;
2933 if (fread(enc,2,1,fp) == 0) return NULL;
2934 v = enc[0]|(enc[1]<<8);
2935 val = (int16_t)v;
2936 } else if (enctype == REDIS_RDB_ENC_INT32) {
2937 uint32_t v;
2938 if (fread(enc,4,1,fp) == 0) return NULL;
2939 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2940 val = (int32_t)v;
2941 } else {
2942 val = 0; /* anti-warning */
2943 redisAssert(0!=0);
2944 }
2945 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2946 }
2947
2948 static robj *rdbLoadLzfStringObject(FILE*fp) {
2949 unsigned int len, clen;
2950 unsigned char *c = NULL;
2951 sds val = NULL;
2952
2953 if ((clen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
2954 if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
2955 if ((c = zmalloc(clen)) == NULL) goto err;
2956 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2957 if (fread(c,clen,1,fp) == 0) goto err;
2958 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2959 zfree(c);
2960 return createObject(REDIS_STRING,val);
2961 err:
2962 zfree(c);
2963 sdsfree(val);
2964 return NULL;
2965 }
2966
2967 static robj *rdbLoadStringObject(FILE*fp) {
2968 int isencoded;
2969 uint32_t len;
2970 sds val;
2971
2972 len = rdbLoadLen(fp,&isencoded);
2973 if (isencoded) {
2974 switch(len) {
2975 case REDIS_RDB_ENC_INT8:
2976 case REDIS_RDB_ENC_INT16:
2977 case REDIS_RDB_ENC_INT32:
2978 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2979 case REDIS_RDB_ENC_LZF:
2980 return tryObjectSharing(rdbLoadLzfStringObject(fp));
2981 default:
2982 redisAssert(0!=0);
2983 }
2984 }
2985
2986 if (len == REDIS_RDB_LENERR) return NULL;
2987 val = sdsnewlen(NULL,len);
2988 if (len && fread(val,len,1,fp) == 0) {
2989 sdsfree(val);
2990 return NULL;
2991 }
2992 return tryObjectSharing(createObject(REDIS_STRING,val));
2993 }
2994
2995 /* For information about double serialization check rdbSaveDoubleValue() */
2996 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2997 char buf[128];
2998 unsigned char len;
2999
3000 if (fread(&len,1,1,fp) == 0) return -1;
3001 switch(len) {
3002 case 255: *val = R_NegInf; return 0;
3003 case 254: *val = R_PosInf; return 0;
3004 case 253: *val = R_Nan; return 0;
3005 default:
3006 if (fread(buf,len,1,fp) == 0) return -1;
3007 buf[len] = '\0';
3008 sscanf(buf, "%lg", val);
3009 return 0;
3010 }
3011 }
3012
3013 /* Load a Redis object of the specified type from the specified file.
3014 * On success a newly allocated object is returned, otherwise NULL. */
3015 static robj *rdbLoadObject(int type, FILE *fp) {
3016 robj *o;
3017
3018 if (type == REDIS_STRING) {
3019 /* Read string value */
3020 if ((o = rdbLoadStringObject(fp)) == NULL) return NULL;
3021 tryObjectEncoding(o);
3022 } else if (type == REDIS_LIST || type == REDIS_SET) {
3023 /* Read list/set value */
3024 uint32_t listlen;
3025
3026 if ((listlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
3027 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
3028 /* Load every single element of the list/set */
3029 while(listlen--) {
3030 robj *ele;
3031
3032 if ((ele = rdbLoadStringObject(fp)) == NULL) return NULL;
3033 tryObjectEncoding(ele);
3034 if (type == REDIS_LIST) {
3035 listAddNodeTail((list*)o->ptr,ele);
3036 } else {
3037 dictAdd((dict*)o->ptr,ele,NULL);
3038 }
3039 }
3040 } else if (type == REDIS_ZSET) {
3041 /* Read list/set value */
3042 uint32_t zsetlen;
3043 zset *zs;
3044
3045 if ((zsetlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
3046 o = createZsetObject();
3047 zs = o->ptr;
3048 /* Load every single element of the list/set */
3049 while(zsetlen--) {
3050 robj *ele;
3051 double *score = zmalloc(sizeof(double));
3052
3053 if ((ele = rdbLoadStringObject(fp)) == NULL) return NULL;
3054 tryObjectEncoding(ele);
3055 if (rdbLoadDoubleValue(fp,score) == -1) return NULL;
3056 dictAdd(zs->dict,ele,score);
3057 zslInsert(zs->zsl,*score,ele);
3058 incrRefCount(ele); /* added to skiplist */
3059 }
3060 } else {
3061 redisAssert(0 != 0);
3062 }
3063 return o;
3064 }
3065
3066 static int rdbLoad(char *filename) {
3067 FILE *fp;
3068 robj *keyobj = NULL;
3069 uint32_t dbid;
3070 int type, retval, rdbver;
3071 dict *d = server.db[0].dict;
3072 redisDb *db = server.db+0;
3073 char buf[1024];
3074 time_t expiretime = -1, now = time(NULL);
3075
3076 fp = fopen(filename,"r");
3077 if (!fp) return REDIS_ERR;
3078 if (fread(buf,9,1,fp) == 0) goto eoferr;
3079 buf[9] = '\0';
3080 if (memcmp(buf,"REDIS",5) != 0) {
3081 fclose(fp);
3082 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
3083 return REDIS_ERR;
3084 }
3085 rdbver = atoi(buf+5);
3086 if (rdbver != 1) {
3087 fclose(fp);
3088 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
3089 return REDIS_ERR;
3090 }
3091 while(1) {
3092 robj *o;
3093
3094 /* Read type. */
3095 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
3096 if (type == REDIS_EXPIRETIME) {
3097 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
3098 /* We read the time so we need to read the object type again */
3099 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
3100 }
3101 if (type == REDIS_EOF) break;
3102 /* Handle SELECT DB opcode as a special case */
3103 if (type == REDIS_SELECTDB) {
3104 if ((dbid = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR)
3105 goto eoferr;
3106 if (dbid >= (unsigned)server.dbnum) {
3107 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
3108 exit(1);
3109 }
3110 db = server.db+dbid;
3111 d = db->dict;
3112 continue;
3113 }
3114 /* Read key */
3115 if ((keyobj = rdbLoadStringObject(fp)) == NULL) goto eoferr;
3116 /* Read value */
3117 if ((o = rdbLoadObject(type,fp)) == NULL) goto eoferr;
3118 /* Add the new object in the hash table */
3119 retval = dictAdd(d,keyobj,o);
3120 if (retval == DICT_ERR) {
3121 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
3122 exit(1);
3123 }
3124 /* Set the expire time if needed */
3125 if (expiretime != -1) {
3126 setExpire(db,keyobj,expiretime);
3127 /* Delete this key if already expired */
3128 if (expiretime < now) deleteKey(db,keyobj);
3129 expiretime = -1;
3130 }
3131 keyobj = o = NULL;
3132 }
3133 fclose(fp);
3134 return REDIS_OK;
3135
3136 eoferr: /* unexpected end of file is handled here with a fatal exit */
3137 if (keyobj) decrRefCount(keyobj);
3138 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
3139 exit(1);
3140 return REDIS_ERR; /* Just to avoid warning */
3141 }
3142
3143 /*================================== Commands =============================== */
3144
3145 static void authCommand(redisClient *c) {
3146 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
3147 c->authenticated = 1;
3148 addReply(c,shared.ok);
3149 } else {
3150 c->authenticated = 0;
3151 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
3152 }
3153 }
3154
3155 static void pingCommand(redisClient *c) {
3156 addReply(c,shared.pong);
3157 }
3158
3159 static void echoCommand(redisClient *c) {
3160 addReplyBulkLen(c,c->argv[1]);
3161 addReply(c,c->argv[1]);
3162 addReply(c,shared.crlf);
3163 }
3164
3165 /*=================================== Strings =============================== */
3166
3167 static void setGenericCommand(redisClient *c, int nx) {
3168 int retval;
3169
3170 if (nx) deleteIfVolatile(c->db,c->argv[1]);
3171 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
3172 if (retval == DICT_ERR) {
3173 if (!nx) {
3174 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3175 incrRefCount(c->argv[2]);
3176 } else {
3177 addReply(c,shared.czero);
3178 return;
3179 }
3180 } else {
3181 incrRefCount(c->argv[1]);
3182 incrRefCount(c->argv[2]);
3183 }
3184 server.dirty++;
3185 removeExpire(c->db,c->argv[1]);
3186 addReply(c, nx ? shared.cone : shared.ok);
3187 }
3188
3189 static void setCommand(redisClient *c) {
3190 setGenericCommand(c,0);
3191 }
3192
3193 static void setnxCommand(redisClient *c) {
3194 setGenericCommand(c,1);
3195 }
3196
3197 static int getGenericCommand(redisClient *c) {
3198 robj *o = lookupKeyRead(c->db,c->argv[1]);
3199
3200 if (o == NULL) {
3201 addReply(c,shared.nullbulk);
3202 return REDIS_OK;
3203 } else {
3204 if (o->type != REDIS_STRING) {
3205 addReply(c,shared.wrongtypeerr);
3206 return REDIS_ERR;
3207 } else {
3208 addReplyBulkLen(c,o);
3209 addReply(c,o);
3210 addReply(c,shared.crlf);
3211 return REDIS_OK;
3212 }
3213 }
3214 }
3215
3216 static void getCommand(redisClient *c) {
3217 getGenericCommand(c);
3218 }
3219
3220 static void getsetCommand(redisClient *c) {
3221 if (getGenericCommand(c) == REDIS_ERR) return;
3222 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3223 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3224 } else {
3225 incrRefCount(c->argv[1]);
3226 }
3227 incrRefCount(c->argv[2]);
3228 server.dirty++;
3229 removeExpire(c->db,c->argv[1]);
3230 }
3231
3232 static void mgetCommand(redisClient *c) {
3233 int j;
3234
3235 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
3236 for (j = 1; j < c->argc; j++) {
3237 robj *o = lookupKeyRead(c->db,c->argv[j]);
3238 if (o == NULL) {
3239 addReply(c,shared.nullbulk);
3240 } else {
3241 if (o->type != REDIS_STRING) {
3242 addReply(c,shared.nullbulk);
3243 } else {
3244 addReplyBulkLen(c,o);
3245 addReply(c,o);
3246 addReply(c,shared.crlf);
3247 }
3248 }
3249 }
3250 }
3251
3252 static void msetGenericCommand(redisClient *c, int nx) {
3253 int j, busykeys = 0;
3254
3255 if ((c->argc % 2) == 0) {
3256 addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3257 return;
3258 }
3259 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3260 * set nothing at all if at least one already key exists. */
3261 if (nx) {
3262 for (j = 1; j < c->argc; j += 2) {
3263 if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
3264 busykeys++;
3265 }
3266 }
3267 }
3268 if (busykeys) {
3269 addReply(c, shared.czero);
3270 return;
3271 }
3272
3273 for (j = 1; j < c->argc; j += 2) {
3274 int retval;
3275
3276 tryObjectEncoding(c->argv[j+1]);
3277 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3278 if (retval == DICT_ERR) {
3279 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3280 incrRefCount(c->argv[j+1]);
3281 } else {
3282 incrRefCount(c->argv[j]);
3283 incrRefCount(c->argv[j+1]);
3284 }
3285 removeExpire(c->db,c->argv[j]);
3286 }
3287 server.dirty += (c->argc-1)/2;
3288 addReply(c, nx ? shared.cone : shared.ok);
3289 }
3290
3291 static void msetCommand(redisClient *c) {
3292 msetGenericCommand(c,0);
3293 }
3294
3295 static void msetnxCommand(redisClient *c) {
3296 msetGenericCommand(c,1);
3297 }
3298
3299 static void incrDecrCommand(redisClient *c, long long incr) {
3300 long long value;
3301 int retval;
3302 robj *o;
3303
3304 o = lookupKeyWrite(c->db,c->argv[1]);
3305 if (o == NULL) {
3306 value = 0;
3307 } else {
3308 if (o->type != REDIS_STRING) {
3309 value = 0;
3310 } else {
3311 char *eptr;
3312
3313 if (o->encoding == REDIS_ENCODING_RAW)
3314 value = strtoll(o->ptr, &eptr, 10);
3315 else if (o->encoding == REDIS_ENCODING_INT)
3316 value = (long)o->ptr;
3317 else
3318 redisAssert(1 != 1);
3319 }
3320 }
3321
3322 value += incr;
3323 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
3324 tryObjectEncoding(o);
3325 retval = dictAdd(c->db->dict,c->argv[1],o);
3326 if (retval == DICT_ERR) {
3327 dictReplace(c->db->dict,c->argv[1],o);
3328 removeExpire(c->db,c->argv[1]);
3329 } else {
3330 incrRefCount(c->argv[1]);
3331 }
3332 server.dirty++;
3333 addReply(c,shared.colon);
3334 addReply(c,o);
3335 addReply(c,shared.crlf);
3336 }
3337
3338 static void incrCommand(redisClient *c) {
3339 incrDecrCommand(c,1);
3340 }
3341
3342 static void decrCommand(redisClient *c) {
3343 incrDecrCommand(c,-1);
3344 }
3345
3346 static void incrbyCommand(redisClient *c) {
3347 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3348 incrDecrCommand(c,incr);
3349 }
3350
3351 static void decrbyCommand(redisClient *c) {
3352 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3353 incrDecrCommand(c,-incr);
3354 }
3355
3356 /* ========================= Type agnostic commands ========================= */
3357
3358 static void delCommand(redisClient *c) {
3359 int deleted = 0, j;
3360
3361 for (j = 1; j < c->argc; j++) {
3362 if (deleteKey(c->db,c->argv[j])) {
3363 server.dirty++;
3364 deleted++;
3365 }
3366 }
3367 switch(deleted) {
3368 case 0:
3369 addReply(c,shared.czero);
3370 break;
3371 case 1:
3372 addReply(c,shared.cone);
3373 break;
3374 default:
3375 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3376 break;
3377 }
3378 }
3379
3380 static void existsCommand(redisClient *c) {
3381 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3382 }
3383
3384 static void selectCommand(redisClient *c) {
3385 int id = atoi(c->argv[1]->ptr);
3386
3387 if (selectDb(c,id) == REDIS_ERR) {
3388 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3389 } else {
3390 addReply(c,shared.ok);
3391 }
3392 }
3393
3394 static void randomkeyCommand(redisClient *c) {
3395 dictEntry *de;
3396
3397 while(1) {
3398 de = dictGetRandomKey(c->db->dict);
3399 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3400 }
3401 if (de == NULL) {
3402 addReply(c,shared.plus);
3403 addReply(c,shared.crlf);
3404 } else {
3405 addReply(c,shared.plus);
3406 addReply(c,dictGetEntryKey(de));
3407 addReply(c,shared.crlf);
3408 }
3409 }
3410
3411 static void keysCommand(redisClient *c) {
3412 dictIterator *di;
3413 dictEntry *de;
3414 sds pattern = c->argv[1]->ptr;
3415 int plen = sdslen(pattern);
3416 unsigned long numkeys = 0, keyslen = 0;
3417 robj *lenobj = createObject(REDIS_STRING,NULL);
3418
3419 di = dictGetIterator(c->db->dict);
3420 addReply(c,lenobj);
3421 decrRefCount(lenobj);
3422 while((de = dictNext(di)) != NULL) {
3423 robj *keyobj = dictGetEntryKey(de);
3424
3425 sds key = keyobj->ptr;
3426 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3427 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3428 if (expireIfNeeded(c->db,keyobj) == 0) {
3429 if (numkeys != 0)
3430 addReply(c,shared.space);
3431 addReply(c,keyobj);
3432 numkeys++;
3433 keyslen += sdslen(key);
3434 }
3435 }
3436 }
3437 dictReleaseIterator(di);
3438 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3439 addReply(c,shared.crlf);
3440 }
3441
3442 static void dbsizeCommand(redisClient *c) {
3443 addReplySds(c,
3444 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3445 }
3446
3447 static void lastsaveCommand(redisClient *c) {
3448 addReplySds(c,
3449 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3450 }
3451
3452 static void typeCommand(redisClient *c) {
3453 robj *o;
3454 char *type;
3455
3456 o = lookupKeyRead(c->db,c->argv[1]);
3457 if (o == NULL) {
3458 type = "+none";
3459 } else {
3460 switch(o->type) {
3461 case REDIS_STRING: type = "+string"; break;
3462 case REDIS_LIST: type = "+list"; break;
3463 case REDIS_SET: type = "+set"; break;
3464 case REDIS_ZSET: type = "+zset"; break;
3465 default: type = "unknown"; break;
3466 }
3467 }
3468 addReplySds(c,sdsnew(type));
3469 addReply(c,shared.crlf);
3470 }
3471
3472 static void saveCommand(redisClient *c) {
3473 if (server.bgsavechildpid != -1) {
3474 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3475 return;
3476 }
3477 if (rdbSave(server.dbfilename) == REDIS_OK) {
3478 addReply(c,shared.ok);
3479 } else {
3480 addReply(c,shared.err);
3481 }
3482 }
3483
3484 static void bgsaveCommand(redisClient *c) {
3485 if (server.bgsavechildpid != -1) {
3486 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3487 return;
3488 }
3489 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3490 char *status = "+Background saving started\r\n";
3491 addReplySds(c,sdsnew(status));
3492 } else {
3493 addReply(c,shared.err);
3494 }
3495 }
3496
3497 static void shutdownCommand(redisClient *c) {
3498 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3499 /* Kill the saving child if there is a background saving in progress.
3500 We want to avoid race conditions, for instance our saving child may
3501 overwrite the synchronous saving did by SHUTDOWN. */
3502 if (server.bgsavechildpid != -1) {
3503 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3504 kill(server.bgsavechildpid,SIGKILL);
3505 rdbRemoveTempFile(server.bgsavechildpid);
3506 }
3507 if (server.appendonly) {
3508 /* Append only file: fsync() the AOF and exit */
3509 fsync(server.appendfd);
3510 exit(0);
3511 } else {
3512 /* Snapshotting. Perform a SYNC SAVE and exit */
3513 if (rdbSave(server.dbfilename) == REDIS_OK) {
3514 if (server.daemonize)
3515 unlink(server.pidfile);
3516 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3517 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3518 exit(0);
3519 } else {
3520 /* Ooops.. error saving! The best we can do is to continue operating.
3521 * Note that if there was a background saving process, in the next
3522 * cron() Redis will be notified that the background saving aborted,
3523 * handling special stuff like slaves pending for synchronization... */
3524 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3525 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3526 }
3527 }
3528 }
3529
3530 static void renameGenericCommand(redisClient *c, int nx) {
3531 robj *o;
3532
3533 /* To use the same key as src and dst is probably an error */
3534 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3535 addReply(c,shared.sameobjecterr);
3536 return;
3537 }
3538
3539 o = lookupKeyWrite(c->db,c->argv[1]);
3540 if (o == NULL) {
3541 addReply(c,shared.nokeyerr);
3542 return;
3543 }
3544 incrRefCount(o);
3545 deleteIfVolatile(c->db,c->argv[2]);
3546 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3547 if (nx) {
3548 decrRefCount(o);
3549 addReply(c,shared.czero);
3550 return;
3551 }
3552 dictReplace(c->db->dict,c->argv[2],o);
3553 } else {
3554 incrRefCount(c->argv[2]);
3555 }
3556 deleteKey(c->db,c->argv[1]);
3557 server.dirty++;
3558 addReply(c,nx ? shared.cone : shared.ok);
3559 }
3560
3561 static void renameCommand(redisClient *c) {
3562 renameGenericCommand(c,0);
3563 }
3564
3565 static void renamenxCommand(redisClient *c) {
3566 renameGenericCommand(c,1);
3567 }
3568
3569 static void moveCommand(redisClient *c) {
3570 robj *o;
3571 redisDb *src, *dst;
3572 int srcid;
3573
3574 /* Obtain source and target DB pointers */
3575 src = c->db;
3576 srcid = c->db->id;
3577 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3578 addReply(c,shared.outofrangeerr);
3579 return;
3580 }
3581 dst = c->db;
3582 selectDb(c,srcid); /* Back to the source DB */
3583
3584 /* If the user is moving using as target the same
3585 * DB as the source DB it is probably an error. */
3586 if (src == dst) {
3587 addReply(c,shared.sameobjecterr);
3588 return;
3589 }
3590
3591 /* Check if the element exists and get a reference */
3592 o = lookupKeyWrite(c->db,c->argv[1]);
3593 if (!o) {
3594 addReply(c,shared.czero);
3595 return;
3596 }
3597
3598 /* Try to add the element to the target DB */
3599 deleteIfVolatile(dst,c->argv[1]);
3600 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3601 addReply(c,shared.czero);
3602 return;
3603 }
3604 incrRefCount(c->argv[1]);
3605 incrRefCount(o);
3606
3607 /* OK! key moved, free the entry in the source DB */
3608 deleteKey(src,c->argv[1]);
3609 server.dirty++;
3610 addReply(c,shared.cone);
3611 }
3612
3613 /* =================================== Lists ================================ */
3614 static void pushGenericCommand(redisClient *c, int where) {
3615 robj *lobj;
3616 list *list;
3617
3618 lobj = lookupKeyWrite(c->db,c->argv[1]);
3619 if (lobj == NULL) {
3620 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3621 addReply(c,shared.ok);
3622 return;
3623 }
3624 lobj = createListObject();
3625 list = lobj->ptr;
3626 if (where == REDIS_HEAD) {
3627 listAddNodeHead(list,c->argv[2]);
3628 } else {
3629 listAddNodeTail(list,c->argv[2]);
3630 }
3631 dictAdd(c->db->dict,c->argv[1],lobj);
3632 incrRefCount(c->argv[1]);
3633 incrRefCount(c->argv[2]);
3634 } else {
3635 if (lobj->type != REDIS_LIST) {
3636 addReply(c,shared.wrongtypeerr);
3637 return;
3638 }
3639 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3640 addReply(c,shared.ok);
3641 return;
3642 }
3643 list = lobj->ptr;
3644 if (where == REDIS_HEAD) {
3645 listAddNodeHead(list,c->argv[2]);
3646 } else {
3647 listAddNodeTail(list,c->argv[2]);
3648 }
3649 incrRefCount(c->argv[2]);
3650 }
3651 server.dirty++;
3652 addReply(c,shared.ok);
3653 }
3654
3655 static void lpushCommand(redisClient *c) {
3656 pushGenericCommand(c,REDIS_HEAD);
3657 }
3658
3659 static void rpushCommand(redisClient *c) {
3660 pushGenericCommand(c,REDIS_TAIL);
3661 }
3662
3663 static void llenCommand(redisClient *c) {
3664 robj *o;
3665 list *l;
3666
3667 o = lookupKeyRead(c->db,c->argv[1]);
3668 if (o == NULL) {
3669 addReply(c,shared.czero);
3670 return;
3671 } else {
3672 if (o->type != REDIS_LIST) {
3673 addReply(c,shared.wrongtypeerr);
3674 } else {
3675 l = o->ptr;
3676 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3677 }
3678 }
3679 }
3680
3681 static void lindexCommand(redisClient *c) {
3682 robj *o;
3683 int index = atoi(c->argv[2]->ptr);
3684
3685 o = lookupKeyRead(c->db,c->argv[1]);
3686 if (o == NULL) {
3687 addReply(c,shared.nullbulk);
3688 } else {
3689 if (o->type != REDIS_LIST) {
3690 addReply(c,shared.wrongtypeerr);
3691 } else {
3692 list *list = o->ptr;
3693 listNode *ln;
3694
3695 ln = listIndex(list, index);
3696 if (ln == NULL) {
3697 addReply(c,shared.nullbulk);
3698 } else {
3699 robj *ele = listNodeValue(ln);
3700 addReplyBulkLen(c,ele);
3701 addReply(c,ele);
3702 addReply(c,shared.crlf);
3703 }
3704 }
3705 }
3706 }
3707
3708 static void lsetCommand(redisClient *c) {
3709 robj *o;
3710 int index = atoi(c->argv[2]->ptr);
3711
3712 o = lookupKeyWrite(c->db,c->argv[1]);
3713 if (o == NULL) {
3714 addReply(c,shared.nokeyerr);
3715 } else {
3716 if (o->type != REDIS_LIST) {
3717 addReply(c,shared.wrongtypeerr);
3718 } else {
3719 list *list = o->ptr;
3720 listNode *ln;
3721
3722 ln = listIndex(list, index);
3723 if (ln == NULL) {
3724 addReply(c,shared.outofrangeerr);
3725 } else {
3726 robj *ele = listNodeValue(ln);
3727
3728 decrRefCount(ele);
3729 listNodeValue(ln) = c->argv[3];
3730 incrRefCount(c->argv[3]);
3731 addReply(c,shared.ok);
3732 server.dirty++;
3733 }
3734 }
3735 }
3736 }
3737
3738 static void popGenericCommand(redisClient *c, int where) {
3739 robj *o;
3740
3741 o = lookupKeyWrite(c->db,c->argv[1]);
3742 if (o == NULL) {
3743 addReply(c,shared.nullbulk);
3744 } else {
3745 if (o->type != REDIS_LIST) {
3746 addReply(c,shared.wrongtypeerr);
3747 } else {
3748 list *list = o->ptr;
3749 listNode *ln;
3750
3751 if (where == REDIS_HEAD)
3752 ln = listFirst(list);
3753 else
3754 ln = listLast(list);
3755
3756 if (ln == NULL) {
3757 addReply(c,shared.nullbulk);
3758 } else {
3759 robj *ele = listNodeValue(ln);
3760 addReplyBulkLen(c,ele);
3761 addReply(c,ele);
3762 addReply(c,shared.crlf);
3763 listDelNode(list,ln);
3764 server.dirty++;
3765 }
3766 }
3767 }
3768 }
3769
3770 static void lpopCommand(redisClient *c) {
3771 popGenericCommand(c,REDIS_HEAD);
3772 }
3773
3774 static void rpopCommand(redisClient *c) {
3775 popGenericCommand(c,REDIS_TAIL);
3776 }
3777
3778 static void lrangeCommand(redisClient *c) {
3779 robj *o;
3780 int start = atoi(c->argv[2]->ptr);
3781 int end = atoi(c->argv[3]->ptr);
3782
3783 o = lookupKeyRead(c->db,c->argv[1]);
3784 if (o == NULL) {
3785 addReply(c,shared.nullmultibulk);
3786 } else {
3787 if (o->type != REDIS_LIST) {
3788 addReply(c,shared.wrongtypeerr);
3789 } else {
3790 list *list = o->ptr;
3791 listNode *ln;
3792 int llen = listLength(list);
3793 int rangelen, j;
3794 robj *ele;
3795
3796 /* convert negative indexes */
3797 if (start < 0) start = llen+start;
3798 if (end < 0) end = llen+end;
3799 if (start < 0) start = 0;
3800 if (end < 0) end = 0;
3801
3802 /* indexes sanity checks */
3803 if (start > end || start >= llen) {
3804 /* Out of range start or start > end result in empty list */
3805 addReply(c,shared.emptymultibulk);
3806 return;
3807 }
3808 if (end >= llen) end = llen-1;
3809 rangelen = (end-start)+1;
3810
3811 /* Return the result in form of a multi-bulk reply */
3812 ln = listIndex(list, start);
3813 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3814 for (j = 0; j < rangelen; j++) {
3815 ele = listNodeValue(ln);
3816 addReplyBulkLen(c,ele);
3817 addReply(c,ele);
3818 addReply(c,shared.crlf);
3819 ln = ln->next;
3820 }
3821 }
3822 }
3823 }
3824
3825 static void ltrimCommand(redisClient *c) {
3826 robj *o;
3827 int start = atoi(c->argv[2]->ptr);
3828 int end = atoi(c->argv[3]->ptr);
3829
3830 o = lookupKeyWrite(c->db,c->argv[1]);
3831 if (o == NULL) {
3832 addReply(c,shared.ok);
3833 } else {
3834 if (o->type != REDIS_LIST) {
3835 addReply(c,shared.wrongtypeerr);
3836 } else {
3837 list *list = o->ptr;
3838 listNode *ln;
3839 int llen = listLength(list);
3840 int j, ltrim, rtrim;
3841
3842 /* convert negative indexes */
3843 if (start < 0) start = llen+start;
3844 if (end < 0) end = llen+end;
3845 if (start < 0) start = 0;
3846 if (end < 0) end = 0;
3847
3848 /* indexes sanity checks */
3849 if (start > end || start >= llen) {
3850 /* Out of range start or start > end result in empty list */
3851 ltrim = llen;
3852 rtrim = 0;
3853 } else {
3854 if (end >= llen) end = llen-1;
3855 ltrim = start;
3856 rtrim = llen-end-1;
3857 }
3858
3859 /* Remove list elements to perform the trim */
3860 for (j = 0; j < ltrim; j++) {
3861 ln = listFirst(list);
3862 listDelNode(list,ln);
3863 }
3864 for (j = 0; j < rtrim; j++) {
3865 ln = listLast(list);
3866 listDelNode(list,ln);
3867 }
3868 server.dirty++;
3869 addReply(c,shared.ok);
3870 }
3871 }
3872 }
3873
3874 static void lremCommand(redisClient *c) {
3875 robj *o;
3876
3877 o = lookupKeyWrite(c->db,c->argv[1]);
3878 if (o == NULL) {
3879 addReply(c,shared.czero);
3880 } else {
3881 if (o->type != REDIS_LIST) {
3882 addReply(c,shared.wrongtypeerr);
3883 } else {
3884 list *list = o->ptr;
3885 listNode *ln, *next;
3886 int toremove = atoi(c->argv[2]->ptr);
3887 int removed = 0;
3888 int fromtail = 0;
3889
3890 if (toremove < 0) {
3891 toremove = -toremove;
3892 fromtail = 1;
3893 }
3894 ln = fromtail ? list->tail : list->head;
3895 while (ln) {
3896 robj *ele = listNodeValue(ln);
3897
3898 next = fromtail ? ln->prev : ln->next;
3899 if (compareStringObjects(ele,c->argv[3]) == 0) {
3900 listDelNode(list,ln);
3901 server.dirty++;
3902 removed++;
3903 if (toremove && removed == toremove) break;
3904 }
3905 ln = next;
3906 }
3907 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3908 }
3909 }
3910 }
3911
3912 /* This is the semantic of this command:
3913 * RPOPLPUSH srclist dstlist:
3914 * IF LLEN(srclist) > 0
3915 * element = RPOP srclist
3916 * LPUSH dstlist element
3917 * RETURN element
3918 * ELSE
3919 * RETURN nil
3920 * END
3921 * END
3922 *
3923 * The idea is to be able to get an element from a list in a reliable way
3924 * since the element is not just returned but pushed against another list
3925 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3926 */
3927 static void rpoplpushcommand(redisClient *c) {
3928 robj *sobj;
3929
3930 sobj = lookupKeyWrite(c->db,c->argv[1]);
3931 if (sobj == NULL) {
3932 addReply(c,shared.nullbulk);
3933 } else {
3934 if (sobj->type != REDIS_LIST) {
3935 addReply(c,shared.wrongtypeerr);
3936 } else {
3937 list *srclist = sobj->ptr;
3938 listNode *ln = listLast(srclist);
3939
3940 if (ln == NULL) {
3941 addReply(c,shared.nullbulk);
3942 } else {
3943 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3944 robj *ele = listNodeValue(ln);
3945 list *dstlist;
3946
3947 if (dobj && dobj->type != REDIS_LIST) {
3948 addReply(c,shared.wrongtypeerr);
3949 return;
3950 }
3951
3952 /* Add the element to the target list (unless it's directly
3953 * passed to some BLPOP-ing client */
3954 if (!handleClientsWaitingListPush(c,c->argv[2],ele)) {
3955 if (dobj == NULL) {
3956 /* Create the list if the key does not exist */
3957 dobj = createListObject();
3958 dictAdd(c->db->dict,c->argv[2],dobj);
3959 incrRefCount(c->argv[2]);
3960 }
3961 dstlist = dobj->ptr;
3962 listAddNodeHead(dstlist,ele);
3963 incrRefCount(ele);
3964 }
3965
3966 /* Send the element to the client as reply as well */
3967 addReplyBulkLen(c,ele);
3968 addReply(c,ele);
3969 addReply(c,shared.crlf);
3970
3971 /* Finally remove the element from the source list */
3972 listDelNode(srclist,ln);
3973 server.dirty++;
3974 }
3975 }
3976 }
3977 }
3978
3979
3980 /* ==================================== Sets ================================ */
3981
3982 static void saddCommand(redisClient *c) {
3983 robj *set;
3984
3985 set = lookupKeyWrite(c->db,c->argv[1]);
3986 if (set == NULL) {
3987 set = createSetObject();
3988 dictAdd(c->db->dict,c->argv[1],set);
3989 incrRefCount(c->argv[1]);
3990 } else {
3991 if (set->type != REDIS_SET) {
3992 addReply(c,shared.wrongtypeerr);
3993 return;
3994 }
3995 }
3996 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3997 incrRefCount(c->argv[2]);
3998 server.dirty++;
3999 addReply(c,shared.cone);
4000 } else {
4001 addReply(c,shared.czero);
4002 }
4003 }
4004
4005 static void sremCommand(redisClient *c) {
4006 robj *set;
4007
4008 set = lookupKeyWrite(c->db,c->argv[1]);
4009 if (set == NULL) {
4010 addReply(c,shared.czero);
4011 } else {
4012 if (set->type != REDIS_SET) {
4013 addReply(c,shared.wrongtypeerr);
4014 return;
4015 }
4016 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
4017 server.dirty++;
4018 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
4019 addReply(c,shared.cone);
4020 } else {
4021 addReply(c,shared.czero);
4022 }
4023 }
4024 }
4025
4026 static void smoveCommand(redisClient *c) {
4027 robj *srcset, *dstset;
4028
4029 srcset = lookupKeyWrite(c->db,c->argv[1]);
4030 dstset = lookupKeyWrite(c->db,c->argv[2]);
4031
4032 /* If the source key does not exist return 0, if it's of the wrong type
4033 * raise an error */
4034 if (srcset == NULL || srcset->type != REDIS_SET) {
4035 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
4036 return;
4037 }
4038 /* Error if the destination key is not a set as well */
4039 if (dstset && dstset->type != REDIS_SET) {
4040 addReply(c,shared.wrongtypeerr);
4041 return;
4042 }
4043 /* Remove the element from the source set */
4044 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
4045 /* Key not found in the src set! return zero */
4046 addReply(c,shared.czero);
4047 return;
4048 }
4049 server.dirty++;
4050 /* Add the element to the destination set */
4051 if (!dstset) {
4052 dstset = createSetObject();
4053 dictAdd(c->db->dict,c->argv[2],dstset);
4054 incrRefCount(c->argv[2]);
4055 }
4056 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
4057 incrRefCount(c->argv[3]);
4058 addReply(c,shared.cone);
4059 }
4060
4061 static void sismemberCommand(redisClient *c) {
4062 robj *set;
4063
4064 set = lookupKeyRead(c->db,c->argv[1]);
4065 if (set == NULL) {
4066 addReply(c,shared.czero);
4067 } else {
4068 if (set->type != REDIS_SET) {
4069 addReply(c,shared.wrongtypeerr);
4070 return;
4071 }
4072 if (dictFind(set->ptr,c->argv[2]))
4073 addReply(c,shared.cone);
4074 else
4075 addReply(c,shared.czero);
4076 }
4077 }
4078
4079 static void scardCommand(redisClient *c) {
4080 robj *o;
4081 dict *s;
4082
4083 o = lookupKeyRead(c->db,c->argv[1]);
4084 if (o == NULL) {
4085 addReply(c,shared.czero);
4086 return;
4087 } else {
4088 if (o->type != REDIS_SET) {
4089 addReply(c,shared.wrongtypeerr);
4090 } else {
4091 s = o->ptr;
4092 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4093 dictSize(s)));
4094 }
4095 }
4096 }
4097
4098 static void spopCommand(redisClient *c) {
4099 robj *set;
4100 dictEntry *de;
4101
4102 set = lookupKeyWrite(c->db,c->argv[1]);
4103 if (set == NULL) {
4104 addReply(c,shared.nullbulk);
4105 } else {
4106 if (set->type != REDIS_SET) {
4107 addReply(c,shared.wrongtypeerr);
4108 return;
4109 }
4110 de = dictGetRandomKey(set->ptr);
4111 if (de == NULL) {
4112 addReply(c,shared.nullbulk);
4113 } else {
4114 robj *ele = dictGetEntryKey(de);
4115
4116 addReplyBulkLen(c,ele);
4117 addReply(c,ele);
4118 addReply(c,shared.crlf);
4119 dictDelete(set->ptr,ele);
4120 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
4121 server.dirty++;
4122 }
4123 }
4124 }
4125
4126 static void srandmemberCommand(redisClient *c) {
4127 robj *set;
4128 dictEntry *de;
4129
4130 set = lookupKeyRead(c->db,c->argv[1]);
4131 if (set == NULL) {
4132 addReply(c,shared.nullbulk);
4133 } else {
4134 if (set->type != REDIS_SET) {
4135 addReply(c,shared.wrongtypeerr);
4136 return;
4137 }
4138 de = dictGetRandomKey(set->ptr);
4139 if (de == NULL) {
4140 addReply(c,shared.nullbulk);
4141 } else {
4142 robj *ele = dictGetEntryKey(de);
4143
4144 addReplyBulkLen(c,ele);
4145 addReply(c,ele);
4146 addReply(c,shared.crlf);
4147 }
4148 }
4149 }
4150
4151 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
4152 dict **d1 = (void*) s1, **d2 = (void*) s2;
4153
4154 return dictSize(*d1)-dictSize(*d2);
4155 }
4156
4157 static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
4158 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4159 dictIterator *di;
4160 dictEntry *de;
4161 robj *lenobj = NULL, *dstset = NULL;
4162 unsigned long j, cardinality = 0;
4163
4164 for (j = 0; j < setsnum; j++) {
4165 robj *setobj;
4166
4167 setobj = dstkey ?
4168 lookupKeyWrite(c->db,setskeys[j]) :
4169 lookupKeyRead(c->db,setskeys[j]);
4170 if (!setobj) {
4171 zfree(dv);
4172 if (dstkey) {
4173 if (deleteKey(c->db,dstkey))
4174 server.dirty++;
4175 addReply(c,shared.czero);
4176 } else {
4177 addReply(c,shared.nullmultibulk);
4178 }
4179 return;
4180 }
4181 if (setobj->type != REDIS_SET) {
4182 zfree(dv);
4183 addReply(c,shared.wrongtypeerr);
4184 return;
4185 }
4186 dv[j] = setobj->ptr;
4187 }
4188 /* Sort sets from the smallest to largest, this will improve our
4189 * algorithm's performace */
4190 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
4191
4192 /* The first thing we should output is the total number of elements...
4193 * since this is a multi-bulk write, but at this stage we don't know
4194 * the intersection set size, so we use a trick, append an empty object
4195 * to the output list and save the pointer to later modify it with the
4196 * right length */
4197 if (!dstkey) {
4198 lenobj = createObject(REDIS_STRING,NULL);
4199 addReply(c,lenobj);
4200 decrRefCount(lenobj);
4201 } else {
4202 /* If we have a target key where to store the resulting set
4203 * create this key with an empty set inside */
4204 dstset = createSetObject();
4205 }
4206
4207 /* Iterate all the elements of the first (smallest) set, and test
4208 * the element against all the other sets, if at least one set does
4209 * not include the element it is discarded */
4210 di = dictGetIterator(dv[0]);
4211
4212 while((de = dictNext(di)) != NULL) {
4213 robj *ele;
4214
4215 for (j = 1; j < setsnum; j++)
4216 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
4217 if (j != setsnum)
4218 continue; /* at least one set does not contain the member */
4219 ele = dictGetEntryKey(de);
4220 if (!dstkey) {
4221 addReplyBulkLen(c,ele);
4222 addReply(c,ele);
4223 addReply(c,shared.crlf);
4224 cardinality++;
4225 } else {
4226 dictAdd(dstset->ptr,ele,NULL);
4227 incrRefCount(ele);
4228 }
4229 }
4230 dictReleaseIterator(di);
4231
4232 if (dstkey) {
4233 /* Store the resulting set into the target */
4234 deleteKey(c->db,dstkey);
4235 dictAdd(c->db->dict,dstkey,dstset);
4236 incrRefCount(dstkey);
4237 }
4238
4239 if (!dstkey) {
4240 lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
4241 } else {
4242 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4243 dictSize((dict*)dstset->ptr)));
4244 server.dirty++;
4245 }
4246 zfree(dv);
4247 }
4248
4249 static void sinterCommand(redisClient *c) {
4250 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4251 }
4252
4253 static void sinterstoreCommand(redisClient *c) {
4254 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4255 }
4256
4257 #define REDIS_OP_UNION 0
4258 #define REDIS_OP_DIFF 1
4259
4260 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
4261 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4262 dictIterator *di;
4263 dictEntry *de;
4264 robj *dstset = NULL;
4265 int j, cardinality = 0;
4266
4267 for (j = 0; j < setsnum; j++) {
4268 robj *setobj;
4269
4270 setobj = dstkey ?
4271 lookupKeyWrite(c->db,setskeys[j]) :
4272 lookupKeyRead(c->db,setskeys[j]);
4273 if (!setobj) {
4274 dv[j] = NULL;
4275 continue;
4276 }
4277 if (setobj->type != REDIS_SET) {
4278 zfree(dv);
4279 addReply(c,shared.wrongtypeerr);
4280 return;
4281 }
4282 dv[j] = setobj->ptr;
4283 }
4284
4285 /* We need a temp set object to store our union. If the dstkey
4286 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4287 * this set object will be the resulting object to set into the target key*/
4288 dstset = createSetObject();
4289
4290 /* Iterate all the elements of all the sets, add every element a single
4291 * time to the result set */
4292 for (j = 0; j < setsnum; j++) {
4293 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
4294 if (!dv[j]) continue; /* non existing keys are like empty sets */
4295
4296 di = dictGetIterator(dv[j]);
4297
4298 while((de = dictNext(di)) != NULL) {
4299 robj *ele;
4300
4301 /* dictAdd will not add the same element multiple times */
4302 ele = dictGetEntryKey(de);
4303 if (op == REDIS_OP_UNION || j == 0) {
4304 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4305 incrRefCount(ele);
4306 cardinality++;
4307 }
4308 } else if (op == REDIS_OP_DIFF) {
4309 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4310 cardinality--;
4311 }
4312 }
4313 }
4314 dictReleaseIterator(di);
4315
4316 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
4317 }
4318
4319 /* Output the content of the resulting set, if not in STORE mode */
4320 if (!dstkey) {
4321 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4322 di = dictGetIterator(dstset->ptr);
4323 while((de = dictNext(di)) != NULL) {
4324 robj *ele;
4325
4326 ele = dictGetEntryKey(de);
4327 addReplyBulkLen(c,ele);
4328 addReply(c,ele);
4329 addReply(c,shared.crlf);
4330 }
4331 dictReleaseIterator(di);
4332 } else {
4333 /* If we have a target key where to store the resulting set
4334 * create this key with the result set inside */
4335 deleteKey(c->db,dstkey);
4336 dictAdd(c->db->dict,dstkey,dstset);
4337 incrRefCount(dstkey);
4338 }
4339
4340 /* Cleanup */
4341 if (!dstkey) {
4342 decrRefCount(dstset);
4343 } else {
4344 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4345 dictSize((dict*)dstset->ptr)));
4346 server.dirty++;
4347 }
4348 zfree(dv);
4349 }
4350
4351 static void sunionCommand(redisClient *c) {
4352 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4353 }
4354
4355 static void sunionstoreCommand(redisClient *c) {
4356 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4357 }
4358
4359 static void sdiffCommand(redisClient *c) {
4360 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4361 }
4362
4363 static void sdiffstoreCommand(redisClient *c) {
4364 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4365 }
4366
4367 /* ==================================== ZSets =============================== */
4368
4369 /* ZSETs are ordered sets using two data structures to hold the same elements
4370 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4371 * data structure.
4372 *
4373 * The elements are added to an hash table mapping Redis objects to scores.
4374 * At the same time the elements are added to a skip list mapping scores
4375 * to Redis objects (so objects are sorted by scores in this "view"). */
4376
4377 /* This skiplist implementation is almost a C translation of the original
4378 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4379 * Alternative to Balanced Trees", modified in three ways:
4380 * a) this implementation allows for repeated values.
4381 * b) the comparison is not just by key (our 'score') but by satellite data.
4382 * c) there is a back pointer, so it's a doubly linked list with the back
4383 * pointers being only at "level 1". This allows to traverse the list
4384 * from tail to head, useful for ZREVRANGE. */
4385
4386 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4387 zskiplistNode *zn = zmalloc(sizeof(*zn));
4388
4389 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4390 zn->score = score;
4391 zn->obj = obj;
4392 return zn;
4393 }
4394
4395 static zskiplist *zslCreate(void) {
4396 int j;
4397 zskiplist *zsl;
4398
4399 zsl = zmalloc(sizeof(*zsl));
4400 zsl->level = 1;
4401 zsl->length = 0;
4402 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4403 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4404 zsl->header->forward[j] = NULL;
4405 zsl->header->backward = NULL;
4406 zsl->tail = NULL;
4407 return zsl;
4408 }
4409
4410 static void zslFreeNode(zskiplistNode *node) {
4411 decrRefCount(node->obj);
4412 zfree(node->forward);
4413 zfree(node);
4414 }
4415
4416 static void zslFree(zskiplist *zsl) {
4417 zskiplistNode *node = zsl->header->forward[0], *next;
4418
4419 zfree(zsl->header->forward);
4420 zfree(zsl->header);
4421 while(node) {
4422 next = node->forward[0];
4423 zslFreeNode(node);
4424 node = next;
4425 }
4426 zfree(zsl);
4427 }
4428
4429 static int zslRandomLevel(void) {
4430 int level = 1;
4431 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4432 level += 1;
4433 return level;
4434 }
4435
4436 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4437 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4438 int i, level;
4439
4440 x = zsl->header;
4441 for (i = zsl->level-1; i >= 0; i--) {
4442 while (x->forward[i] &&
4443 (x->forward[i]->score < score ||
4444 (x->forward[i]->score == score &&
4445 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4446 x = x->forward[i];
4447 update[i] = x;
4448 }
4449 /* we assume the key is not already inside, since we allow duplicated
4450 * scores, and the re-insertion of score and redis object should never
4451 * happpen since the caller of zslInsert() should test in the hash table
4452 * if the element is already inside or not. */
4453 level = zslRandomLevel();
4454 if (level > zsl->level) {
4455 for (i = zsl->level; i < level; i++)
4456 update[i] = zsl->header;
4457 zsl->level = level;
4458 }
4459 x = zslCreateNode(level,score,obj);
4460 for (i = 0; i < level; i++) {
4461 x->forward[i] = update[i]->forward[i];
4462 update[i]->forward[i] = x;
4463 }
4464 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4465 if (x->forward[0])
4466 x->forward[0]->backward = x;
4467 else
4468 zsl->tail = x;
4469 zsl->length++;
4470 }
4471
4472 /* Delete an element with matching score/object from the skiplist. */
4473 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4474 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4475 int i;
4476
4477 x = zsl->header;
4478 for (i = zsl->level-1; i >= 0; i--) {
4479 while (x->forward[i] &&
4480 (x->forward[i]->score < score ||
4481 (x->forward[i]->score == score &&
4482 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4483 x = x->forward[i];
4484 update[i] = x;
4485 }
4486 /* We may have multiple elements with the same score, what we need
4487 * is to find the element with both the right score and object. */
4488 x = x->forward[0];
4489 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4490 for (i = 0; i < zsl->level; i++) {
4491 if (update[i]->forward[i] != x) break;
4492 update[i]->forward[i] = x->forward[i];
4493 }
4494 if (x->forward[0]) {
4495 x->forward[0]->backward = (x->backward == zsl->header) ?
4496 NULL : x->backward;
4497 } else {
4498 zsl->tail = x->backward;
4499 }
4500 zslFreeNode(x);
4501 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4502 zsl->level--;
4503 zsl->length--;
4504 return 1;
4505 } else {
4506 return 0; /* not found */
4507 }
4508 return 0; /* not found */
4509 }
4510
4511 /* Delete all the elements with score between min and max from the skiplist.
4512 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4513 * Note that this function takes the reference to the hash table view of the
4514 * sorted set, in order to remove the elements from the hash table too. */
4515 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4516 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4517 unsigned long removed = 0;
4518 int i;
4519
4520 x = zsl->header;
4521 for (i = zsl->level-1; i >= 0; i--) {
4522 while (x->forward[i] && x->forward[i]->score < min)
4523 x = x->forward[i];
4524 update[i] = x;
4525 }
4526 /* We may have multiple elements with the same score, what we need
4527 * is to find the element with both the right score and object. */
4528 x = x->forward[0];
4529 while (x && x->score <= max) {
4530 zskiplistNode *next;
4531
4532 for (i = 0; i < zsl->level; i++) {
4533 if (update[i]->forward[i] != x) break;
4534 update[i]->forward[i] = x->forward[i];
4535 }
4536 if (x->forward[0]) {
4537 x->forward[0]->backward = (x->backward == zsl->header) ?
4538 NULL : x->backward;
4539 } else {
4540 zsl->tail = x->backward;
4541 }
4542 next = x->forward[0];
4543 dictDelete(dict,x->obj);
4544 zslFreeNode(x);
4545 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4546 zsl->level--;
4547 zsl->length--;
4548 removed++;
4549 x = next;
4550 }
4551 return removed; /* not found */
4552 }
4553
4554 /* Find the first node having a score equal or greater than the specified one.
4555 * Returns NULL if there is no match. */
4556 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4557 zskiplistNode *x;
4558 int i;
4559
4560 x = zsl->header;
4561 for (i = zsl->level-1; i >= 0; i--) {
4562 while (x->forward[i] && x->forward[i]->score < score)
4563 x = x->forward[i];
4564 }
4565 /* We may have multiple elements with the same score, what we need
4566 * is to find the element with both the right score and object. */
4567 return x->forward[0];
4568 }
4569
4570 /* The actual Z-commands implementations */
4571
4572 /* This generic command implements both ZADD and ZINCRBY.
4573 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4574 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4575 static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
4576 robj *zsetobj;
4577 zset *zs;
4578 double *score;
4579
4580 zsetobj = lookupKeyWrite(c->db,key);
4581 if (zsetobj == NULL) {
4582 zsetobj = createZsetObject();
4583 dictAdd(c->db->dict,key,zsetobj);
4584 incrRefCount(key);
4585 } else {
4586 if (zsetobj->type != REDIS_ZSET) {
4587 addReply(c,shared.wrongtypeerr);
4588 return;
4589 }
4590 }
4591 zs = zsetobj->ptr;
4592
4593 /* Ok now since we implement both ZADD and ZINCRBY here the code
4594 * needs to handle the two different conditions. It's all about setting
4595 * '*score', that is, the new score to set, to the right value. */
4596 score = zmalloc(sizeof(double));
4597 if (doincrement) {
4598 dictEntry *de;
4599
4600 /* Read the old score. If the element was not present starts from 0 */
4601 de = dictFind(zs->dict,ele);
4602 if (de) {
4603 double *oldscore = dictGetEntryVal(de);
4604 *score = *oldscore + scoreval;
4605 } else {
4606 *score = scoreval;
4607 }
4608 } else {
4609 *score = scoreval;
4610 }
4611
4612 /* What follows is a simple remove and re-insert operation that is common
4613 * to both ZADD and ZINCRBY... */
4614 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
4615 /* case 1: New element */
4616 incrRefCount(ele); /* added to hash */
4617 zslInsert(zs->zsl,*score,ele);
4618 incrRefCount(ele); /* added to skiplist */
4619 server.dirty++;
4620 if (doincrement)
4621 addReplyDouble(c,*score);
4622 else
4623 addReply(c,shared.cone);
4624 } else {
4625 dictEntry *de;
4626 double *oldscore;
4627
4628 /* case 2: Score update operation */
4629 de = dictFind(zs->dict,ele);
4630 redisAssert(de != NULL);
4631 oldscore = dictGetEntryVal(de);
4632 if (*score != *oldscore) {
4633 int deleted;
4634
4635 /* Remove and insert the element in the skip list with new score */
4636 deleted = zslDelete(zs->zsl,*oldscore,ele);
4637 redisAssert(deleted != 0);
4638 zslInsert(zs->zsl,*score,ele);
4639 incrRefCount(ele);
4640 /* Update the score in the hash table */
4641 dictReplace(zs->dict,ele,score);
4642 server.dirty++;
4643 } else {
4644 zfree(score);
4645 }
4646 if (doincrement)
4647 addReplyDouble(c,*score);
4648 else
4649 addReply(c,shared.czero);
4650 }
4651 }
4652
4653 static void zaddCommand(redisClient *c) {
4654 double scoreval;
4655
4656 scoreval = strtod(c->argv[2]->ptr,NULL);
4657 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4658 }
4659
4660 static void zincrbyCommand(redisClient *c) {
4661 double scoreval;
4662
4663 scoreval = strtod(c->argv[2]->ptr,NULL);
4664 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4665 }
4666
4667 static void zremCommand(redisClient *c) {
4668 robj *zsetobj;
4669 zset *zs;
4670
4671 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4672 if (zsetobj == NULL) {
4673 addReply(c,shared.czero);
4674 } else {
4675 dictEntry *de;
4676 double *oldscore;
4677 int deleted;
4678
4679 if (zsetobj->type != REDIS_ZSET) {
4680 addReply(c,shared.wrongtypeerr);
4681 return;
4682 }
4683 zs = zsetobj->ptr;
4684 de = dictFind(zs->dict,c->argv[2]);
4685 if (de == NULL) {
4686 addReply(c,shared.czero);
4687 return;
4688 }
4689 /* Delete from the skiplist */
4690 oldscore = dictGetEntryVal(de);
4691 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4692 redisAssert(deleted != 0);
4693
4694 /* Delete from the hash table */
4695 dictDelete(zs->dict,c->argv[2]);
4696 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4697 server.dirty++;
4698 addReply(c,shared.cone);
4699 }
4700 }
4701
4702 static void zremrangebyscoreCommand(redisClient *c) {
4703 double min = strtod(c->argv[2]->ptr,NULL);
4704 double max = strtod(c->argv[3]->ptr,NULL);
4705 robj *zsetobj;
4706 zset *zs;
4707
4708 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4709 if (zsetobj == NULL) {
4710 addReply(c,shared.czero);
4711 } else {
4712 long deleted;
4713
4714 if (zsetobj->type != REDIS_ZSET) {
4715 addReply(c,shared.wrongtypeerr);
4716 return;
4717 }
4718 zs = zsetobj->ptr;
4719 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4720 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4721 server.dirty += deleted;
4722 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4723 }
4724 }
4725
4726 static void zrangeGenericCommand(redisClient *c, int reverse) {
4727 robj *o;
4728 int start = atoi(c->argv[2]->ptr);
4729 int end = atoi(c->argv[3]->ptr);
4730 int withscores = 0;
4731
4732 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
4733 withscores = 1;
4734 } else if (c->argc >= 5) {
4735 addReply(c,shared.syntaxerr);
4736 return;
4737 }
4738
4739 o = lookupKeyRead(c->db,c->argv[1]);
4740 if (o == NULL) {
4741 addReply(c,shared.nullmultibulk);
4742 } else {
4743 if (o->type != REDIS_ZSET) {
4744 addReply(c,shared.wrongtypeerr);
4745 } else {
4746 zset *zsetobj = o->ptr;
4747 zskiplist *zsl = zsetobj->zsl;
4748 zskiplistNode *ln;
4749
4750 int llen = zsl->length;
4751 int rangelen, j;
4752 robj *ele;
4753
4754 /* convert negative indexes */
4755 if (start < 0) start = llen+start;
4756 if (end < 0) end = llen+end;
4757 if (start < 0) start = 0;
4758 if (end < 0) end = 0;
4759
4760 /* indexes sanity checks */
4761 if (start > end || start >= llen) {
4762 /* Out of range start or start > end result in empty list */
4763 addReply(c,shared.emptymultibulk);
4764 return;
4765 }
4766 if (end >= llen) end = llen-1;
4767 rangelen = (end-start)+1;
4768
4769 /* Return the result in form of a multi-bulk reply */
4770 if (reverse) {
4771 ln = zsl->tail;
4772 while (start--)
4773 ln = ln->backward;
4774 } else {
4775 ln = zsl->header->forward[0];
4776 while (start--)
4777 ln = ln->forward[0];
4778 }
4779
4780 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",
4781 withscores ? (rangelen*2) : rangelen));
4782 for (j = 0; j < rangelen; j++) {
4783 ele = ln->obj;
4784 addReplyBulkLen(c,ele);
4785 addReply(c,ele);
4786 addReply(c,shared.crlf);
4787 if (withscores)
4788 addReplyDouble(c,ln->score);
4789 ln = reverse ? ln->backward : ln->forward[0];
4790 }
4791 }
4792 }
4793 }
4794
4795 static void zrangeCommand(redisClient *c) {
4796 zrangeGenericCommand(c,0);
4797 }
4798
4799 static void zrevrangeCommand(redisClient *c) {
4800 zrangeGenericCommand(c,1);
4801 }
4802
4803 static void zrangebyscoreCommand(redisClient *c) {
4804 robj *o;
4805 double min = strtod(c->argv[2]->ptr,NULL);
4806 double max = strtod(c->argv[3]->ptr,NULL);
4807 int offset = 0, limit = -1;
4808
4809 if (c->argc != 4 && c->argc != 7) {
4810 addReplySds(c,
4811 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4812 return;
4813 } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
4814 addReply(c,shared.syntaxerr);
4815 return;
4816 } else if (c->argc == 7) {
4817 offset = atoi(c->argv[5]->ptr);
4818 limit = atoi(c->argv[6]->ptr);
4819 if (offset < 0) offset = 0;
4820 }
4821
4822 o = lookupKeyRead(c->db,c->argv[1]);
4823 if (o == NULL) {
4824 addReply(c,shared.nullmultibulk);
4825 } else {
4826 if (o->type != REDIS_ZSET) {
4827 addReply(c,shared.wrongtypeerr);
4828 } else {
4829 zset *zsetobj = o->ptr;
4830 zskiplist *zsl = zsetobj->zsl;
4831 zskiplistNode *ln;
4832 robj *ele, *lenobj;
4833 unsigned int rangelen = 0;
4834
4835 /* Get the first node with the score >= min */
4836 ln = zslFirstWithScore(zsl,min);
4837 if (ln == NULL) {
4838 /* No element matching the speciifed interval */
4839 addReply(c,shared.emptymultibulk);
4840 return;
4841 }
4842
4843 /* We don't know in advance how many matching elements there
4844 * are in the list, so we push this object that will represent
4845 * the multi-bulk length in the output buffer, and will "fix"
4846 * it later */
4847 lenobj = createObject(REDIS_STRING,NULL);
4848 addReply(c,lenobj);
4849 decrRefCount(lenobj);
4850
4851 while(ln && ln->score <= max) {
4852 if (offset) {
4853 offset--;
4854 ln = ln->forward[0];
4855 continue;
4856 }
4857 if (limit == 0) break;
4858 ele = ln->obj;
4859 addReplyBulkLen(c,ele);
4860 addReply(c,ele);
4861 addReply(c,shared.crlf);
4862 ln = ln->forward[0];
4863 rangelen++;
4864 if (limit > 0) limit--;
4865 }
4866 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4867 }
4868 }
4869 }
4870
4871 static void zcardCommand(redisClient *c) {
4872 robj *o;
4873 zset *zs;
4874
4875 o = lookupKeyRead(c->db,c->argv[1]);
4876 if (o == NULL) {
4877 addReply(c,shared.czero);
4878 return;
4879 } else {
4880 if (o->type != REDIS_ZSET) {
4881 addReply(c,shared.wrongtypeerr);
4882 } else {
4883 zs = o->ptr;
4884 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length));
4885 }
4886 }
4887 }
4888
4889 static void zscoreCommand(redisClient *c) {
4890 robj *o;
4891 zset *zs;
4892
4893 o = lookupKeyRead(c->db,c->argv[1]);
4894 if (o == NULL) {
4895 addReply(c,shared.nullbulk);
4896 return;
4897 } else {
4898 if (o->type != REDIS_ZSET) {
4899 addReply(c,shared.wrongtypeerr);
4900 } else {
4901 dictEntry *de;
4902
4903 zs = o->ptr;
4904 de = dictFind(zs->dict,c->argv[2]);
4905 if (!de) {
4906 addReply(c,shared.nullbulk);
4907 } else {
4908 double *score = dictGetEntryVal(de);
4909
4910 addReplyDouble(c,*score);
4911 }
4912 }
4913 }
4914 }
4915
4916 /* ========================= Non type-specific commands ==================== */
4917
4918 static void flushdbCommand(redisClient *c) {
4919 server.dirty += dictSize(c->db->dict);
4920 dictEmpty(c->db->dict);
4921 dictEmpty(c->db->expires);
4922 addReply(c,shared.ok);
4923 }
4924
4925 static void flushallCommand(redisClient *c) {
4926 server.dirty += emptyDb();
4927 addReply(c,shared.ok);
4928 rdbSave(server.dbfilename);
4929 server.dirty++;
4930 }
4931
4932 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4933 redisSortOperation *so = zmalloc(sizeof(*so));
4934 so->type = type;
4935 so->pattern = pattern;
4936 return so;
4937 }
4938
4939 /* Return the value associated to the key with a name obtained
4940 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4941 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4942 char *p;
4943 sds spat, ssub;
4944 robj keyobj;
4945 int prefixlen, sublen, postfixlen;
4946 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4947 struct {
4948 long len;
4949 long free;
4950 char buf[REDIS_SORTKEY_MAX+1];
4951 } keyname;
4952
4953 /* If the pattern is "#" return the substitution object itself in order
4954 * to implement the "SORT ... GET #" feature. */
4955 spat = pattern->ptr;
4956 if (spat[0] == '#' && spat[1] == '\0') {
4957 return subst;
4958 }
4959
4960 /* The substitution object may be specially encoded. If so we create
4961 * a decoded object on the fly. Otherwise getDecodedObject will just
4962 * increment the ref count, that we'll decrement later. */
4963 subst = getDecodedObject(subst);
4964
4965 ssub = subst->ptr;
4966 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4967 p = strchr(spat,'*');
4968 if (!p) {
4969 decrRefCount(subst);
4970 return NULL;
4971 }
4972
4973 prefixlen = p-spat;
4974 sublen = sdslen(ssub);
4975 postfixlen = sdslen(spat)-(prefixlen+1);
4976 memcpy(keyname.buf,spat,prefixlen);
4977 memcpy(keyname.buf+prefixlen,ssub,sublen);
4978 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4979 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4980 keyname.len = prefixlen+sublen+postfixlen;
4981
4982 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2))
4983 decrRefCount(subst);
4984
4985 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4986 return lookupKeyRead(db,&keyobj);
4987 }
4988
4989 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4990 * the additional parameter is not standard but a BSD-specific we have to
4991 * pass sorting parameters via the global 'server' structure */
4992 static int sortCompare(const void *s1, const void *s2) {
4993 const redisSortObject *so1 = s1, *so2 = s2;
4994 int cmp;
4995
4996 if (!server.sort_alpha) {
4997 /* Numeric sorting. Here it's trivial as we precomputed scores */
4998 if (so1->u.score > so2->u.score) {
4999 cmp = 1;
5000 } else if (so1->u.score < so2->u.score) {
5001 cmp = -1;
5002 } else {
5003 cmp = 0;
5004 }
5005 } else {
5006 /* Alphanumeric sorting */
5007 if (server.sort_bypattern) {
5008 if (!so1->u.cmpobj || !so2->u.cmpobj) {
5009 /* At least one compare object is NULL */
5010 if (so1->u.cmpobj == so2->u.cmpobj)
5011 cmp = 0;
5012 else if (so1->u.cmpobj == NULL)
5013 cmp = -1;
5014 else
5015 cmp = 1;
5016 } else {
5017 /* We have both the objects, use strcoll */
5018 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
5019 }
5020 } else {
5021 /* Compare elements directly */
5022 robj *dec1, *dec2;
5023
5024 dec1 = getDecodedObject(so1->obj);
5025 dec2 = getDecodedObject(so2->obj);
5026 cmp = strcoll(dec1->ptr,dec2->ptr);
5027 decrRefCount(dec1);
5028 decrRefCount(dec2);
5029 }
5030 }
5031 return server.sort_desc ? -cmp : cmp;
5032 }
5033
5034 /* The SORT command is the most complex command in Redis. Warning: this code
5035 * is optimized for speed and a bit less for readability */
5036 static void sortCommand(redisClient *c) {
5037 list *operations;
5038 int outputlen = 0;
5039 int desc = 0, alpha = 0;
5040 int limit_start = 0, limit_count = -1, start, end;
5041 int j, dontsort = 0, vectorlen;
5042 int getop = 0; /* GET operation counter */
5043 robj *sortval, *sortby = NULL, *storekey = NULL;
5044 redisSortObject *vector; /* Resulting vector to sort */
5045
5046 /* Lookup the key to sort. It must be of the right types */
5047 sortval = lookupKeyRead(c->db,c->argv[1]);
5048 if (sortval == NULL) {
5049 addReply(c,shared.nullmultibulk);
5050 return;
5051 }
5052 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
5053 sortval->type != REDIS_ZSET)
5054 {
5055 addReply(c,shared.wrongtypeerr);
5056 return;
5057 }
5058
5059 /* Create a list of operations to perform for every sorted element.
5060 * Operations can be GET/DEL/INCR/DECR */
5061 operations = listCreate();
5062 listSetFreeMethod(operations,zfree);
5063 j = 2;
5064
5065 /* Now we need to protect sortval incrementing its count, in the future
5066 * SORT may have options able to overwrite/delete keys during the sorting
5067 * and the sorted key itself may get destroied */
5068 incrRefCount(sortval);
5069
5070 /* The SORT command has an SQL-alike syntax, parse it */
5071 while(j < c->argc) {
5072 int leftargs = c->argc-j-1;
5073 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
5074 desc = 0;
5075 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
5076 desc = 1;
5077 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
5078 alpha = 1;
5079 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
5080 limit_start = atoi(c->argv[j+1]->ptr);
5081 limit_count = atoi(c->argv[j+2]->ptr);
5082 j+=2;
5083 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
5084 storekey = c->argv[j+1];
5085 j++;
5086 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
5087 sortby = c->argv[j+1];
5088 /* If the BY pattern does not contain '*', i.e. it is constant,
5089 * we don't need to sort nor to lookup the weight keys. */
5090 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
5091 j++;
5092 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
5093 listAddNodeTail(operations,createSortOperation(
5094 REDIS_SORT_GET,c->argv[j+1]));
5095 getop++;
5096 j++;
5097 } else {
5098 decrRefCount(sortval);
5099 listRelease(operations);
5100 addReply(c,shared.syntaxerr);
5101 return;
5102 }
5103 j++;
5104 }
5105
5106 /* Load the sorting vector with all the objects to sort */
5107 switch(sortval->type) {
5108 case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
5109 case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break;
5110 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
5111 default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */
5112 }
5113 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
5114 j = 0;
5115
5116 if (sortval->type == REDIS_LIST) {
5117 list *list = sortval->ptr;
5118 listNode *ln;
5119
5120 listRewind(list);
5121 while((ln = listYield(list))) {
5122 robj *ele = ln->value;
5123 vector[j].obj = ele;
5124 vector[j].u.score = 0;
5125 vector[j].u.cmpobj = NULL;
5126 j++;
5127 }
5128 } else {
5129 dict *set;
5130 dictIterator *di;
5131 dictEntry *setele;
5132
5133 if (sortval->type == REDIS_SET) {
5134 set = sortval->ptr;
5135 } else {
5136 zset *zs = sortval->ptr;
5137 set = zs->dict;
5138 }
5139
5140 di = dictGetIterator(set);
5141 while((setele = dictNext(di)) != NULL) {
5142 vector[j].obj = dictGetEntryKey(setele);
5143 vector[j].u.score = 0;
5144 vector[j].u.cmpobj = NULL;
5145 j++;
5146 }
5147 dictReleaseIterator(di);
5148 }
5149 redisAssert(j == vectorlen);
5150
5151 /* Now it's time to load the right scores in the sorting vector */
5152 if (dontsort == 0) {
5153 for (j = 0; j < vectorlen; j++) {
5154 if (sortby) {
5155 robj *byval;
5156
5157 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
5158 if (!byval || byval->type != REDIS_STRING) continue;
5159 if (alpha) {
5160 vector[j].u.cmpobj = getDecodedObject(byval);
5161 } else {
5162 if (byval->encoding == REDIS_ENCODING_RAW) {
5163 vector[j].u.score = strtod(byval->ptr,NULL);
5164 } else {
5165 /* Don't need to decode the object if it's
5166 * integer-encoded (the only encoding supported) so
5167 * far. We can just cast it */
5168 if (byval->encoding == REDIS_ENCODING_INT) {
5169 vector[j].u.score = (long)byval->ptr;
5170 } else
5171 redisAssert(1 != 1);
5172 }
5173 }
5174 } else {
5175 if (!alpha) {
5176 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
5177 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
5178 else {
5179 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
5180 vector[j].u.score = (long) vector[j].obj->ptr;
5181 else
5182 redisAssert(1 != 1);
5183 }
5184 }
5185 }
5186 }
5187 }
5188
5189 /* We are ready to sort the vector... perform a bit of sanity check
5190 * on the LIMIT option too. We'll use a partial version of quicksort. */
5191 start = (limit_start < 0) ? 0 : limit_start;
5192 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
5193 if (start >= vectorlen) {
5194 start = vectorlen-1;
5195 end = vectorlen-2;
5196 }
5197 if (end >= vectorlen) end = vectorlen-1;
5198
5199 if (dontsort == 0) {
5200 server.sort_desc = desc;
5201 server.sort_alpha = alpha;
5202 server.sort_bypattern = sortby ? 1 : 0;
5203 if (sortby && (start != 0 || end != vectorlen-1))
5204 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
5205 else
5206 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
5207 }
5208
5209 /* Send command output to the output buffer, performing the specified
5210 * GET/DEL/INCR/DECR operations if any. */
5211 outputlen = getop ? getop*(end-start+1) : end-start+1;
5212 if (storekey == NULL) {
5213 /* STORE option not specified, sent the sorting result to client */
5214 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
5215 for (j = start; j <= end; j++) {
5216 listNode *ln;
5217 if (!getop) {
5218 addReplyBulkLen(c,vector[j].obj);
5219 addReply(c,vector[j].obj);
5220 addReply(c,shared.crlf);
5221 }
5222 listRewind(operations);
5223 while((ln = listYield(operations))) {
5224 redisSortOperation *sop = ln->value;
5225 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5226 vector[j].obj);
5227
5228 if (sop->type == REDIS_SORT_GET) {
5229 if (!val || val->type != REDIS_STRING) {
5230 addReply(c,shared.nullbulk);
5231 } else {
5232 addReplyBulkLen(c,val);
5233 addReply(c,val);
5234 addReply(c,shared.crlf);
5235 }
5236 } else {
5237 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5238 }
5239 }
5240 }
5241 } else {
5242 robj *listObject = createListObject();
5243 list *listPtr = (list*) listObject->ptr;
5244
5245 /* STORE option specified, set the sorting result as a List object */
5246 for (j = start; j <= end; j++) {
5247 listNode *ln;
5248 if (!getop) {
5249 listAddNodeTail(listPtr,vector[j].obj);
5250 incrRefCount(vector[j].obj);
5251 }
5252 listRewind(operations);
5253 while((ln = listYield(operations))) {
5254 redisSortOperation *sop = ln->value;
5255 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5256 vector[j].obj);
5257
5258 if (sop->type == REDIS_SORT_GET) {
5259 if (!val || val->type != REDIS_STRING) {
5260 listAddNodeTail(listPtr,createStringObject("",0));
5261 } else {
5262 listAddNodeTail(listPtr,val);
5263 incrRefCount(val);
5264 }
5265 } else {
5266 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5267 }
5268 }
5269 }
5270 if (dictReplace(c->db->dict,storekey,listObject)) {
5271 incrRefCount(storekey);
5272 }
5273 /* Note: we add 1 because the DB is dirty anyway since even if the
5274 * SORT result is empty a new key is set and maybe the old content
5275 * replaced. */
5276 server.dirty += 1+outputlen;
5277 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
5278 }
5279
5280 /* Cleanup */
5281 decrRefCount(sortval);
5282 listRelease(operations);
5283 for (j = 0; j < vectorlen; j++) {
5284 if (sortby && alpha && vector[j].u.cmpobj)
5285 decrRefCount(vector[j].u.cmpobj);
5286 }
5287 zfree(vector);
5288 }
5289
5290 /* Create the string returned by the INFO command. This is decoupled
5291 * by the INFO command itself as we need to report the same information
5292 * on memory corruption problems. */
5293 static sds genRedisInfoString(void) {
5294 sds info;
5295 time_t uptime = time(NULL)-server.stat_starttime;
5296 int j;
5297
5298 info = sdscatprintf(sdsempty(),
5299 "redis_version:%s\r\n"
5300 "arch_bits:%s\r\n"
5301 "multiplexing_api:%s\r\n"
5302 "uptime_in_seconds:%ld\r\n"
5303 "uptime_in_days:%ld\r\n"
5304 "connected_clients:%d\r\n"
5305 "connected_slaves:%d\r\n"
5306 "blocked_clients:%d\r\n"
5307 "used_memory:%zu\r\n"
5308 "changes_since_last_save:%lld\r\n"
5309 "bgsave_in_progress:%d\r\n"
5310 "last_save_time:%ld\r\n"
5311 "bgrewriteaof_in_progress:%d\r\n"
5312 "total_connections_received:%lld\r\n"
5313 "total_commands_processed:%lld\r\n"
5314 "role:%s\r\n"
5315 ,REDIS_VERSION,
5316 (sizeof(long) == 8) ? "64" : "32",
5317 aeGetApiName(),
5318 uptime,
5319 uptime/(3600*24),
5320 listLength(server.clients)-listLength(server.slaves),
5321 listLength(server.slaves),
5322 server.blockedclients,
5323 server.usedmemory,
5324 server.dirty,
5325 server.bgsavechildpid != -1,
5326 server.lastsave,
5327 server.bgrewritechildpid != -1,
5328 server.stat_numconnections,
5329 server.stat_numcommands,
5330 server.masterhost == NULL ? "master" : "slave"
5331 );
5332 if (server.masterhost) {
5333 info = sdscatprintf(info,
5334 "master_host:%s\r\n"
5335 "master_port:%d\r\n"
5336 "master_link_status:%s\r\n"
5337 "master_last_io_seconds_ago:%d\r\n"
5338 ,server.masterhost,
5339 server.masterport,
5340 (server.replstate == REDIS_REPL_CONNECTED) ?
5341 "up" : "down",
5342 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
5343 );
5344 }
5345 for (j = 0; j < server.dbnum; j++) {
5346 long long keys, vkeys;
5347
5348 keys = dictSize(server.db[j].dict);
5349 vkeys = dictSize(server.db[j].expires);
5350 if (keys || vkeys) {
5351 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
5352 j, keys, vkeys);
5353 }
5354 }
5355 return info;
5356 }
5357
5358 static void infoCommand(redisClient *c) {
5359 sds info = genRedisInfoString();
5360 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
5361 (unsigned long)sdslen(info)));
5362 addReplySds(c,info);
5363 addReply(c,shared.crlf);
5364 }
5365
5366 static void monitorCommand(redisClient *c) {
5367 /* ignore MONITOR if aleady slave or in monitor mode */
5368 if (c->flags & REDIS_SLAVE) return;
5369
5370 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5371 c->slaveseldb = 0;
5372 listAddNodeTail(server.monitors,c);
5373 addReply(c,shared.ok);
5374 }
5375
5376 /* ================================= Expire ================================= */
5377 static int removeExpire(redisDb *db, robj *key) {
5378 if (dictDelete(db->expires,key) == DICT_OK) {
5379 return 1;
5380 } else {
5381 return 0;
5382 }
5383 }
5384
5385 static int setExpire(redisDb *db, robj *key, time_t when) {
5386 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5387 return 0;
5388 } else {
5389 incrRefCount(key);
5390 return 1;
5391 }
5392 }
5393
5394 /* Return the expire time of the specified key, or -1 if no expire
5395 * is associated with this key (i.e. the key is non volatile) */
5396 static time_t getExpire(redisDb *db, robj *key) {
5397 dictEntry *de;
5398
5399 /* No expire? return ASAP */
5400 if (dictSize(db->expires) == 0 ||
5401 (de = dictFind(db->expires,key)) == NULL) return -1;
5402
5403 return (time_t) dictGetEntryVal(de);
5404 }
5405
5406 static int expireIfNeeded(redisDb *db, robj *key) {
5407 time_t when;
5408 dictEntry *de;
5409
5410 /* No expire? return ASAP */
5411 if (dictSize(db->expires) == 0 ||
5412 (de = dictFind(db->expires,key)) == NULL) return 0;
5413
5414 /* Lookup the expire */
5415 when = (time_t) dictGetEntryVal(de);
5416 if (time(NULL) <= when) return 0;
5417
5418 /* Delete the key */
5419 dictDelete(db->expires,key);
5420 return dictDelete(db->dict,key) == DICT_OK;
5421 }
5422
5423 static int deleteIfVolatile(redisDb *db, robj *key) {
5424 dictEntry *de;
5425
5426 /* No expire? return ASAP */
5427 if (dictSize(db->expires) == 0 ||
5428 (de = dictFind(db->expires,key)) == NULL) return 0;
5429
5430 /* Delete the key */
5431 server.dirty++;
5432 dictDelete(db->expires,key);
5433 return dictDelete(db->dict,key) == DICT_OK;
5434 }
5435
5436 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
5437 dictEntry *de;
5438
5439 de = dictFind(c->db->dict,key);
5440 if (de == NULL) {
5441 addReply(c,shared.czero);
5442 return;
5443 }
5444 if (seconds < 0) {
5445 if (deleteKey(c->db,key)) server.dirty++;
5446 addReply(c, shared.cone);
5447 return;
5448 } else {
5449 time_t when = time(NULL)+seconds;
5450 if (setExpire(c->db,key,when)) {
5451 addReply(c,shared.cone);
5452 server.dirty++;
5453 } else {
5454 addReply(c,shared.czero);
5455 }
5456 return;
5457 }
5458 }
5459
5460 static void expireCommand(redisClient *c) {
5461 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5462 }
5463
5464 static void expireatCommand(redisClient *c) {
5465 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5466 }
5467
5468 static void ttlCommand(redisClient *c) {
5469 time_t expire;
5470 int ttl = -1;
5471
5472 expire = getExpire(c->db,c->argv[1]);
5473 if (expire != -1) {
5474 ttl = (int) (expire-time(NULL));
5475 if (ttl < 0) ttl = -1;
5476 }
5477 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5478 }
5479
5480 /* ================================ MULTI/EXEC ============================== */
5481
5482 /* Client state initialization for MULTI/EXEC */
5483 static void initClientMultiState(redisClient *c) {
5484 c->mstate.commands = NULL;
5485 c->mstate.count = 0;
5486 }
5487
5488 /* Release all the resources associated with MULTI/EXEC state */
5489 static void freeClientMultiState(redisClient *c) {
5490 int j;
5491
5492 for (j = 0; j < c->mstate.count; j++) {
5493 int i;
5494 multiCmd *mc = c->mstate.commands+j;
5495
5496 for (i = 0; i < mc->argc; i++)
5497 decrRefCount(mc->argv[i]);
5498 zfree(mc->argv);
5499 }
5500 zfree(c->mstate.commands);
5501 }
5502
5503 /* Add a new command into the MULTI commands queue */
5504 static void queueMultiCommand(redisClient *c, struct redisCommand *cmd) {
5505 multiCmd *mc;
5506 int j;
5507
5508 c->mstate.commands = zrealloc(c->mstate.commands,
5509 sizeof(multiCmd)*(c->mstate.count+1));
5510 mc = c->mstate.commands+c->mstate.count;
5511 mc->cmd = cmd;
5512 mc->argc = c->argc;
5513 mc->argv = zmalloc(sizeof(robj*)*c->argc);
5514 memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
5515 for (j = 0; j < c->argc; j++)
5516 incrRefCount(mc->argv[j]);
5517 c->mstate.count++;
5518 }
5519
5520 static void multiCommand(redisClient *c) {
5521 c->flags |= REDIS_MULTI;
5522 addReply(c,shared.ok);
5523 }
5524
5525 static void execCommand(redisClient *c) {
5526 int j;
5527 robj **orig_argv;
5528 int orig_argc;
5529
5530 if (!(c->flags & REDIS_MULTI)) {
5531 addReplySds(c,sdsnew("-ERR EXEC without MULTI\r\n"));
5532 return;
5533 }
5534
5535 orig_argv = c->argv;
5536 orig_argc = c->argc;
5537 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count));
5538 for (j = 0; j < c->mstate.count; j++) {
5539 c->argc = c->mstate.commands[j].argc;
5540 c->argv = c->mstate.commands[j].argv;
5541 call(c,c->mstate.commands[j].cmd);
5542 }
5543 c->argv = orig_argv;
5544 c->argc = orig_argc;
5545 freeClientMultiState(c);
5546 initClientMultiState(c);
5547 c->flags &= (~REDIS_MULTI);
5548 }
5549
5550 /* =========================== Blocking Operations ========================= */
5551
5552 /* Currently Redis blocking operations support is limited to list POP ops,
5553 * so the current implementation is not fully generic, but it is also not
5554 * completely specific so it will not require a rewrite to support new
5555 * kind of blocking operations in the future.
5556 *
5557 * Still it's important to note that list blocking operations can be already
5558 * used as a notification mechanism in order to implement other blocking
5559 * operations at application level, so there must be a very strong evidence
5560 * of usefulness and generality before new blocking operations are implemented.
5561 *
5562 * This is how the current blocking POP works, we use BLPOP as example:
 * - If the user calls BLPOP and the key exists and contains a non empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   when there is no need to block.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blockingkeys) mapping keys to a list of clients
 *   blocking for these keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we serve the first in the list: basically instead of pushing
 *   the new element inside the list we return it to the (first / oldest)
 *   blocking client, unblock the client, and remove it from the list.
5576 *
5577 * The above comment and the source code should be enough in order to understand
5578 * the implementation and modify / fix it later.
5579 */
5580
5581 /* Set a client in blocking mode for the specified key, with the specified
5582 * timeout */
5583 static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
5584 dictEntry *de;
5585 list *l;
5586 int j;
5587
5588 c->blockingkeys = zmalloc(sizeof(robj*)*numkeys);
5589 c->blockingkeysnum = numkeys;
5590 c->blockingto = timeout;
5591 for (j = 0; j < numkeys; j++) {
5592 /* Add the key in the client structure, to map clients -> keys */
5593 c->blockingkeys[j] = keys[j];
5594 incrRefCount(keys[j]);
5595
5596 /* And in the other "side", to map keys -> clients */
5597 de = dictFind(c->db->blockingkeys,keys[j]);
5598 if (de == NULL) {
5599 int retval;
5600
5601 /* For every key we take a list of clients blocked for it */
5602 l = listCreate();
5603 retval = dictAdd(c->db->blockingkeys,keys[j],l);
5604 incrRefCount(keys[j]);
5605 assert(retval == DICT_OK);
5606 } else {
5607 l = dictGetEntryVal(de);
5608 }
5609 listAddNodeTail(l,c);
5610 }
5611 /* Mark the client as a blocked client */
5612 c->flags |= REDIS_BLOCKED;
5613 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
5614 server.blockedclients++;
5615 }
5616
5617 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
5618 static void unblockClient(redisClient *c) {
5619 dictEntry *de;
5620 list *l;
5621 int j;
5622
5623 assert(c->blockingkeys != NULL);
5624 /* The client may wait for multiple keys, so unblock it for every key. */
5625 for (j = 0; j < c->blockingkeysnum; j++) {
5626 /* Remove this client from the list of clients waiting for this key. */
5627 de = dictFind(c->db->blockingkeys,c->blockingkeys[j]);
5628 assert(de != NULL);
5629 l = dictGetEntryVal(de);
5630 listDelNode(l,listSearchKey(l,c));
5631 /* If the list is empty we need to remove it to avoid wasting memory */
5632 if (listLength(l) == 0)
5633 dictDelete(c->db->blockingkeys,c->blockingkeys[j]);
5634 decrRefCount(c->blockingkeys[j]);
5635 }
5636 /* Cleanup the client structure */
5637 zfree(c->blockingkeys);
5638 c->blockingkeys = NULL;
5639 c->flags &= (~REDIS_BLOCKED);
5640 server.blockedclients--;
5641 /* Ok now we are ready to get read events from socket, note that we
5642 * can't trap errors here as it's possible that unblockClients() is
5643 * called from freeClient() itself, and the only thing we can do
5644 * if we failed to register the READABLE event is to kill the client.
5645 * Still the following function should never fail in the real world as
5646 * we are sure the file descriptor is sane, and we exit on out of mem. */
5647 aeCreateFileEvent(server.el, c->fd, AE_READABLE, readQueryFromClient, c);
5648 /* As a final step we want to process data if there is some command waiting
5649 * in the input buffer. Note that this is safe even if unblockClient()
5650 * gets called from freeClient() because freeClient() will be smart
5651 * enough to call this function *after* c->querybuf was set to NULL. */
5652 if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c);
5653 }
5654
5655 /* This should be called from any function PUSHing into lists.
5656 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5657 * 'ele' is the element pushed.
5658 *
5659 * If the function returns 0 there was no client waiting for a list push
5660 * against this key.
5661 *
5662 * If the function returns 1 there was a client waiting for a list push
5663 * against this key, the element was passed to this client thus it's not
5664 * needed to actually add it to the list and the caller should return asap. */
5665 static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
5666 struct dictEntry *de;
5667 redisClient *receiver;
5668 list *l;
5669 listNode *ln;
5670
5671 de = dictFind(c->db->blockingkeys,key);
5672 if (de == NULL) return 0;
5673 l = dictGetEntryVal(de);
5674 ln = listFirst(l);
5675 assert(ln != NULL);
5676 receiver = ln->value;
5677
5678 addReplySds(receiver,sdsnew("*2\r\n"));
5679 addReplyBulkLen(receiver,key);
5680 addReply(receiver,key);
5681 addReply(receiver,shared.crlf);
5682 addReplyBulkLen(receiver,ele);
5683 addReply(receiver,ele);
5684 addReply(receiver,shared.crlf);
5685 unblockClient(receiver);
5686 return 1;
5687 }
5688
5689 /* Blocking RPOP/LPOP */
5690 static void blockingPopGenericCommand(redisClient *c, int where) {
5691 robj *o;
5692 time_t timeout;
5693 int j;
5694
5695 for (j = 1; j < c->argc-1; j++) {
5696 o = lookupKeyWrite(c->db,c->argv[j]);
5697 if (o != NULL) {
5698 if (o->type != REDIS_LIST) {
5699 addReply(c,shared.wrongtypeerr);
5700 return;
5701 } else {
5702 list *list = o->ptr;
5703 if (listLength(list) != 0) {
5704 /* If the list contains elements fall back to the usual
5705 * non-blocking POP operation */
5706 robj *argv[2], **orig_argv;
5707 int orig_argc;
5708
5709 /* We need to alter the command arguments before to call
5710 * popGenericCommand() as the command takes a single key. */
5711 orig_argv = c->argv;
5712 orig_argc = c->argc;
5713 argv[1] = c->argv[j];
5714 c->argv = argv;
5715 c->argc = 2;
5716
5717 /* Also the return value is different, we need to output
5718 * the multi bulk reply header and the key name. The
5719 * "real" command will add the last element (the value)
5720 * for us. If this souds like an hack to you it's just
5721 * because it is... */
5722 addReplySds(c,sdsnew("*2\r\n"));
5723 addReplyBulkLen(c,argv[1]);
5724 addReply(c,argv[1]);
5725 addReply(c,shared.crlf);
5726 popGenericCommand(c,where);
5727
5728 /* Fix the client structure with the original stuff */
5729 c->argv = orig_argv;
5730 c->argc = orig_argc;
5731 return;
5732 }
5733 }
5734 }
5735 }
5736 /* If the list is empty or the key does not exists we must block */
5737 timeout = strtol(c->argv[c->argc-1]->ptr,NULL,10);
5738 if (timeout > 0) timeout += time(NULL);
5739 blockForKeys(c,c->argv+1,c->argc-2,timeout);
5740 }
5741
5742 static void blpopCommand(redisClient *c) {
5743 blockingPopGenericCommand(c,REDIS_HEAD);
5744 }
5745
5746 static void brpopCommand(redisClient *c) {
5747 blockingPopGenericCommand(c,REDIS_TAIL);
5748 }
5749
5750 /* =============================== Replication ============================= */
5751
5752 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5753 ssize_t nwritten, ret = size;
5754 time_t start = time(NULL);
5755
5756 timeout++;
5757 while(size) {
5758 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5759 nwritten = write(fd,ptr,size);
5760 if (nwritten == -1) return -1;
5761 ptr += nwritten;
5762 size -= nwritten;
5763 }
5764 if ((time(NULL)-start) > timeout) {
5765 errno = ETIMEDOUT;
5766 return -1;
5767 }
5768 }
5769 return ret;
5770 }
5771
5772 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5773 ssize_t nread, totread = 0;
5774 time_t start = time(NULL);
5775
5776 timeout++;
5777 while(size) {
5778 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5779 nread = read(fd,ptr,size);
5780 if (nread == -1) return -1;
5781 ptr += nread;
5782 size -= nread;
5783 totread += nread;
5784 }
5785 if ((time(NULL)-start) > timeout) {
5786 errno = ETIMEDOUT;
5787 return -1;
5788 }
5789 }
5790 return totread;
5791 }
5792
/* Read one line from 'fd' into 'ptr', a buffer of 'size' bytes, one byte at
 * a time through syncRead() (each byte gets the given 'timeout').
 *
 * Reading stops at '\n', which is not stored; a '\r' right before it is
 * overwritten with '\0'. The buffer is always NUL-terminated when size > 0.
 * If the buffer fills up before a '\n' arrives, the truncated line is
 * returned and the rest of the line is left unread on the socket.
 *
 * Returns the number of bytes stored before the terminator (a stripped '\r'
 * is still counted), 0 if 'size' <= 0, or -1 if syncRead() fails. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return 0; /* no room even for the terminator */
    size--;                  /* reserve one byte for the '\0' */
    *ptr = '\0';
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* Charge the stored byte against the buffer; without this a line
         * longer than the buffer would be written past its end. */
        size--;
    }
    return nread;
}
5813
5814 static void syncCommand(redisClient *c) {
5815 /* ignore SYNC if aleady slave or in monitor mode */
5816 if (c->flags & REDIS_SLAVE) return;
5817
5818 /* SYNC can't be issued when the server has pending data to send to
5819 * the client about already issued commands. We need a fresh reply
5820 * buffer registering the differences between the BGSAVE and the current
5821 * dataset, so that we can copy to other slaves if needed. */
5822 if (listLength(c->reply) != 0) {
5823 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5824 return;
5825 }
5826
5827 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5828 /* Here we need to check if there is a background saving operation
5829 * in progress, or if it is required to start one */
5830 if (server.bgsavechildpid != -1) {
5831 /* Ok a background save is in progress. Let's check if it is a good
5832 * one for replication, i.e. if there is another slave that is
5833 * registering differences since the server forked to save */
5834 redisClient *slave;
5835 listNode *ln;
5836
5837 listRewind(server.slaves);
5838 while((ln = listYield(server.slaves))) {
5839 slave = ln->value;
5840 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5841 }
5842 if (ln) {
5843 /* Perfect, the server is already registering differences for
5844 * another slave. Set the right state, and copy the buffer. */
5845 listRelease(c->reply);
5846 c->reply = listDup(slave->reply);
5847 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5848 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5849 } else {
5850 /* No way, we need to wait for the next BGSAVE in order to
5851 * register differences */
5852 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5853 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5854 }
5855 } else {
5856 /* Ok we don't have a BGSAVE in progress, let's start one */
5857 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5858 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5859 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5860 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5861 return;
5862 }
5863 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5864 }
5865 c->repldbfd = -1;
5866 c->flags |= REDIS_SLAVE;
5867 c->slaveseldb = 0;
5868 listAddNodeTail(server.slaves,c);
5869 return;
5870 }
5871
5872 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5873 redisClient *slave = privdata;
5874 REDIS_NOTUSED(el);
5875 REDIS_NOTUSED(mask);
5876 char buf[REDIS_IOBUF_LEN];
5877 ssize_t nwritten, buflen;
5878
5879 if (slave->repldboff == 0) {
5880 /* Write the bulk write count before to transfer the DB. In theory here
5881 * we don't know how much room there is in the output buffer of the
5882 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5883 * operations) will never be smaller than the few bytes we need. */
5884 sds bulkcount;
5885
5886 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5887 slave->repldbsize);
5888 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5889 {
5890 sdsfree(bulkcount);
5891 freeClient(slave);
5892 return;
5893 }
5894 sdsfree(bulkcount);
5895 }
5896 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5897 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5898 if (buflen <= 0) {
5899 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5900 (buflen == 0) ? "premature EOF" : strerror(errno));
5901 freeClient(slave);
5902 return;
5903 }
5904 if ((nwritten = write(fd,buf,buflen)) == -1) {
5905 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5906 strerror(errno));
5907 freeClient(slave);
5908 return;
5909 }
5910 slave->repldboff += nwritten;
5911 if (slave->repldboff == slave->repldbsize) {
5912 close(slave->repldbfd);
5913 slave->repldbfd = -1;
5914 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5915 slave->replstate = REDIS_REPL_ONLINE;
5916 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5917 sendReplyToClient, slave) == AE_ERR) {
5918 freeClient(slave);
5919 return;
5920 }
5921 addReplySds(slave,sdsempty());
5922 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5923 }
5924 }
5925
5926 /* This function is called at the end of every backgrond saving.
5927 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5928 * otherwise REDIS_ERR is passed to the function.
5929 *
5930 * The goal of this function is to handle slaves waiting for a successful
5931 * background saving in order to perform non-blocking synchronization. */
5932 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5933 listNode *ln;
5934 int startbgsave = 0;
5935
5936 listRewind(server.slaves);
5937 while((ln = listYield(server.slaves))) {
5938 redisClient *slave = ln->value;
5939
5940 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5941 startbgsave = 1;
5942 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5943 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5944 struct redis_stat buf;
5945
5946 if (bgsaveerr != REDIS_OK) {
5947 freeClient(slave);
5948 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5949 continue;
5950 }
5951 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5952 redis_fstat(slave->repldbfd,&buf) == -1) {
5953 freeClient(slave);
5954 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5955 continue;
5956 }
5957 slave->repldboff = 0;
5958 slave->repldbsize = buf.st_size;
5959 slave->replstate = REDIS_REPL_SEND_BULK;
5960 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5961 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
5962 freeClient(slave);
5963 continue;
5964 }
5965 }
5966 }
5967 if (startbgsave) {
5968 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5969 listRewind(server.slaves);
5970 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5971 while((ln = listYield(server.slaves))) {
5972 redisClient *slave = ln->value;
5973
5974 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5975 freeClient(slave);
5976 }
5977 }
5978 }
5979 }
5980
5981 static int syncWithMaster(void) {
5982 char buf[1024], tmpfile[256], authcmd[1024];
5983 int dumpsize;
5984 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5985 int dfd;
5986
5987 if (fd == -1) {
5988 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5989 strerror(errno));
5990 return REDIS_ERR;
5991 }
5992
5993 /* AUTH with the master if required. */
5994 if(server.masterauth) {
5995 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5996 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5997 close(fd);
5998 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5999 strerror(errno));
6000 return REDIS_ERR;
6001 }
6002 /* Read the AUTH result. */
6003 if (syncReadLine(fd,buf,1024,3600) == -1) {
6004 close(fd);
6005 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
6006 strerror(errno));
6007 return REDIS_ERR;
6008 }
6009 if (buf[0] != '+') {
6010 close(fd);
6011 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
6012 return REDIS_ERR;
6013 }
6014 }
6015
6016 /* Issue the SYNC command */
6017 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
6018 close(fd);
6019 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
6020 strerror(errno));
6021 return REDIS_ERR;
6022 }
6023 /* Read the bulk write count */
6024 if (syncReadLine(fd,buf,1024,3600) == -1) {
6025 close(fd);
6026 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
6027 strerror(errno));
6028 return REDIS_ERR;
6029 }
6030 if (buf[0] != '$') {
6031 close(fd);
6032 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
6033 return REDIS_ERR;
6034 }
6035 dumpsize = atoi(buf+1);
6036 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
6037 /* Read the bulk write data on a temp file */
6038 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
6039 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
6040 if (dfd == -1) {
6041 close(fd);
6042 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
6043 return REDIS_ERR;
6044 }
6045 while(dumpsize) {
6046 int nread, nwritten;
6047
6048 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
6049 if (nread == -1) {
6050 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
6051 strerror(errno));
6052 close(fd);
6053 close(dfd);
6054 return REDIS_ERR;
6055 }
6056 nwritten = write(dfd,buf,nread);
6057 if (nwritten == -1) {
6058 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
6059 close(fd);
6060 close(dfd);
6061 return REDIS_ERR;
6062 }
6063 dumpsize -= nread;
6064 }
6065 close(dfd);
6066 if (rename(tmpfile,server.dbfilename) == -1) {
6067 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
6068 unlink(tmpfile);
6069 close(fd);
6070 return REDIS_ERR;
6071 }
6072 emptyDb();
6073 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6074 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
6075 close(fd);
6076 return REDIS_ERR;
6077 }
6078 server.master = createClient(fd);
6079 server.master->flags |= REDIS_MASTER;
6080 server.master->authenticated = 1;
6081 server.replstate = REDIS_REPL_CONNECTED;
6082 return REDIS_OK;
6083 }
6084
6085 static void slaveofCommand(redisClient *c) {
6086 if (!strcasecmp(c->argv[1]->ptr,"no") &&
6087 !strcasecmp(c->argv[2]->ptr,"one")) {
6088 if (server.masterhost) {
6089 sdsfree(server.masterhost);
6090 server.masterhost = NULL;
6091 if (server.master) freeClient(server.master);
6092 server.replstate = REDIS_REPL_NONE;
6093 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
6094 }
6095 } else {
6096 sdsfree(server.masterhost);
6097 server.masterhost = sdsdup(c->argv[1]->ptr);
6098 server.masterport = atoi(c->argv[2]->ptr);
6099 if (server.master) freeClient(server.master);
6100 server.replstate = REDIS_REPL_CONNECT;
6101 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
6102 server.masterhost, server.masterport);
6103 }
6104 addReply(c,shared.ok);
6105 }
6106
6107 /* ============================ Maxmemory directive ======================== */
6108
6109 /* This function gets called when 'maxmemory' is set on the config file to limit
6110 * the max memory used by the server, and we are out of memory.
6111 * This function will try to, in order:
6112 *
6113 * - Free objects from the free list
6114 * - Try to remove keys with an EXPIRE set
6115 *
6116 * It is not possible to free enough memory to reach used-memory < maxmemory
6117 * the server will start refusing commands that will enlarge even more the
6118 * memory usage.
6119 */
6120 static void freeMemoryIfNeeded(void) {
6121 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
6122 if (listLength(server.objfreelist)) {
6123 robj *o;
6124
6125 listNode *head = listFirst(server.objfreelist);
6126 o = listNodeValue(head);
6127 listDelNode(server.objfreelist,head);
6128 zfree(o);
6129 } else {
6130 int j, k, freed = 0;
6131
6132 for (j = 0; j < server.dbnum; j++) {
6133 int minttl = -1;
6134 robj *minkey = NULL;
6135 struct dictEntry *de;
6136
6137 if (dictSize(server.db[j].expires)) {
6138 freed = 1;
6139 /* From a sample of three keys drop the one nearest to
6140 * the natural expire */
6141 for (k = 0; k < 3; k++) {
6142 time_t t;
6143
6144 de = dictGetRandomKey(server.db[j].expires);
6145 t = (time_t) dictGetEntryVal(de);
6146 if (minttl == -1 || t < minttl) {
6147 minkey = dictGetEntryKey(de);
6148 minttl = t;
6149 }
6150 }
6151 deleteKey(server.db+j,minkey);
6152 }
6153 }
6154 if (!freed) return; /* nothing to free... */
6155 }
6156 }
6157 }
6158
6159 /* ============================== Append Only file ========================== */
6160
6161 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
6162 sds buf = sdsempty();
6163 int j;
6164 ssize_t nwritten;
6165 time_t now;
6166 robj *tmpargv[3];
6167
6168 /* The DB this command was targetting is not the same as the last command
6169 * we appendend. To issue a SELECT command is needed. */
6170 if (dictid != server.appendseldb) {
6171 char seldb[64];
6172
6173 snprintf(seldb,sizeof(seldb),"%d",dictid);
6174 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
6175 (unsigned long)strlen(seldb),seldb);
6176 server.appendseldb = dictid;
6177 }
6178
6179 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6180 * EXPIREs into EXPIREATs calls */
6181 if (cmd->proc == expireCommand) {
6182 long when;
6183
6184 tmpargv[0] = createStringObject("EXPIREAT",8);
6185 tmpargv[1] = argv[1];
6186 incrRefCount(argv[1]);
6187 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
6188 tmpargv[2] = createObject(REDIS_STRING,
6189 sdscatprintf(sdsempty(),"%ld",when));
6190 argv = tmpargv;
6191 }
6192
6193 /* Append the actual command */
6194 buf = sdscatprintf(buf,"*%d\r\n",argc);
6195 for (j = 0; j < argc; j++) {
6196 robj *o = argv[j];
6197
6198 o = getDecodedObject(o);
6199 buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
6200 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
6201 buf = sdscatlen(buf,"\r\n",2);
6202 decrRefCount(o);
6203 }
6204
6205 /* Free the objects from the modified argv for EXPIREAT */
6206 if (cmd->proc == expireCommand) {
6207 for (j = 0; j < 3; j++)
6208 decrRefCount(argv[j]);
6209 }
6210
6211 /* We want to perform a single write. This should be guaranteed atomic
6212 * at least if the filesystem we are writing is a real physical one.
6213 * While this will save us against the server being killed I don't think
6214 * there is much to do about the whole server stopping for power problems
6215 * or alike */
6216 nwritten = write(server.appendfd,buf,sdslen(buf));
6217 if (nwritten != (signed)sdslen(buf)) {
6218 /* Ooops, we are in troubles. The best thing to do for now is
6219 * to simply exit instead to give the illusion that everything is
6220 * working as expected. */
6221 if (nwritten == -1) {
6222 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
6223 } else {
6224 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
6225 }
6226 exit(1);
6227 }
6228 /* If a background append only file rewriting is in progress we want to
6229 * accumulate the differences between the child DB and the current one
6230 * in a buffer, so that when the child process will do its work we
6231 * can append the differences to the new append only file. */
6232 if (server.bgrewritechildpid != -1)
6233 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
6234
6235 sdsfree(buf);
6236 now = time(NULL);
6237 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
6238 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
6239 now-server.lastfsync > 1))
6240 {
6241 fsync(server.appendfd); /* Let's try to get this data on the disk */
6242 server.lastfsync = now;
6243 }
6244 }
6245
6246 /* In Redis commands are always executed in the context of a client, so in
6247 * order to load the append only file we need to create a fake client. */
6248 static struct redisClient *createFakeClient(void) {
6249 struct redisClient *c = zmalloc(sizeof(*c));
6250
6251 selectDb(c,0);
6252 c->fd = -1;
6253 c->querybuf = sdsempty();
6254 c->argc = 0;
6255 c->argv = NULL;
6256 c->flags = 0;
6257 /* We set the fake client as a slave waiting for the synchronization
6258 * so that Redis will not try to send replies to this client. */
6259 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
6260 c->reply = listCreate();
6261 listSetFreeMethod(c->reply,decrRefCount);
6262 listSetDupMethod(c->reply,dupClientReplyValue);
6263 return c;
6264 }
6265
6266 static void freeFakeClient(struct redisClient *c) {
6267 sdsfree(c->querybuf);
6268 listRelease(c->reply);
6269 zfree(c);
6270 }
6271
6272 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
6273 * error (the append only file is zero-length) REDIS_ERR is returned. On
6274 * fatal error an error message is logged and the program exists. */
6275 int loadAppendOnlyFile(char *filename) {
6276 struct redisClient *fakeClient;
6277 FILE *fp = fopen(filename,"r");
6278 struct redis_stat sb;
6279
6280 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
6281 return REDIS_ERR;
6282
6283 if (fp == NULL) {
6284 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
6285 exit(1);
6286 }
6287
6288 fakeClient = createFakeClient();
6289 while(1) {
6290 int argc, j;
6291 unsigned long len;
6292 robj **argv;
6293 char buf[128];
6294 sds argsds;
6295 struct redisCommand *cmd;
6296
6297 if (fgets(buf,sizeof(buf),fp) == NULL) {
6298 if (feof(fp))
6299 break;
6300 else
6301 goto readerr;
6302 }
6303 if (buf[0] != '*') goto fmterr;
6304 argc = atoi(buf+1);
6305 argv = zmalloc(sizeof(robj*)*argc);
6306 for (j = 0; j < argc; j++) {
6307 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
6308 if (buf[0] != '$') goto fmterr;
6309 len = strtol(buf+1,NULL,10);
6310 argsds = sdsnewlen(NULL,len);
6311 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
6312 argv[j] = createObject(REDIS_STRING,argsds);
6313 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
6314 }
6315
6316 /* Command lookup */
6317 cmd = lookupCommand(argv[0]->ptr);
6318 if (!cmd) {
6319 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
6320 exit(1);
6321 }
6322 /* Try object sharing and encoding */
6323 if (server.shareobjects) {
6324 int j;
6325 for(j = 1; j < argc; j++)
6326 argv[j] = tryObjectSharing(argv[j]);
6327 }
6328 if (cmd->flags & REDIS_CMD_BULK)
6329 tryObjectEncoding(argv[argc-1]);
6330 /* Run the command in the context of a fake client */
6331 fakeClient->argc = argc;
6332 fakeClient->argv = argv;
6333 cmd->proc(fakeClient);
6334 /* Discard the reply objects list from the fake client */
6335 while(listLength(fakeClient->reply))
6336 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
6337 /* Clean up, ready for the next command */
6338 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
6339 zfree(argv);
6340 }
6341 fclose(fp);
6342 freeFakeClient(fakeClient);
6343 return REDIS_OK;
6344
6345 readerr:
6346 if (feof(fp)) {
6347 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
6348 } else {
6349 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
6350 }
6351 exit(1);
6352 fmterr:
6353 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
6354 exit(1);
6355 }
6356
6357 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6358 static int fwriteBulk(FILE *fp, robj *obj) {
6359 char buf[128];
6360 obj = getDecodedObject(obj);
6361 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
6362 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
6363 if (sdslen(obj->ptr) && fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0)
6364 goto err;
6365 if (fwrite("\r\n",2,1,fp) == 0) goto err;
6366 decrRefCount(obj);
6367 return 1;
6368 err:
6369 decrRefCount(obj);
6370 return 0;
6371 }
6372
/* Emit 'd' to 'fp' as a bulk item "$<len>\r\n<digits>\r\n", formatted with
 * 17 significant digits. Returns 1 on success, 0 if an fwrite() fails. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char header[128], payload[128];
    size_t payloadlen;

    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    payloadlen = strlen(payload);
    /* The advertised length excludes the trailing CRLF. */
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)payloadlen-2);
    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,payloadlen,1,fp) != 0;
}
6383
/* Emit 'l' to 'fp' as a bulk item "$<len>\r\n<decimal>\r\n". Returns 1 on
 * success, 0 if an fwrite() fails. */
static int fwriteBulkLong(FILE *fp, long l) {
    char header[128], payload[128];
    size_t payloadlen;

    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    payloadlen = strlen(payload);
    /* The advertised length excludes the trailing CRLF. */
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)payloadlen-2);
    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,payloadlen,1,fp) != 0;
}
6394
6395 /* Write a sequence of commands able to fully rebuild the dataset into
6396 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6397 static int rewriteAppendOnlyFile(char *filename) {
6398 dictIterator *di = NULL;
6399 dictEntry *de;
6400 FILE *fp;
6401 char tmpfile[256];
6402 int j;
6403 time_t now = time(NULL);
6404
6405 /* Note that we have to use a different temp name here compared to the
6406 * one used by rewriteAppendOnlyFileBackground() function. */
6407 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
6408 fp = fopen(tmpfile,"w");
6409 if (!fp) {
6410 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
6411 return REDIS_ERR;
6412 }
6413 for (j = 0; j < server.dbnum; j++) {
6414 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
6415 redisDb *db = server.db+j;
6416 dict *d = db->dict;
6417 if (dictSize(d) == 0) continue;
6418 di = dictGetIterator(d);
6419 if (!di) {
6420 fclose(fp);
6421 return REDIS_ERR;
6422 }
6423
6424 /* SELECT the new DB */
6425 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
6426 if (fwriteBulkLong(fp,j) == 0) goto werr;
6427
6428 /* Iterate this DB writing every entry */
6429 while((de = dictNext(di)) != NULL) {
6430 robj *key = dictGetEntryKey(de);
6431 robj *o = dictGetEntryVal(de);
6432 time_t expiretime = getExpire(db,key);
6433
6434 /* Save the key and associated value */
6435 if (o->type == REDIS_STRING) {
6436 /* Emit a SET command */
6437 char cmd[]="*3\r\n$3\r\nSET\r\n";
6438 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6439 /* Key and value */
6440 if (fwriteBulk(fp,key) == 0) goto werr;
6441 if (fwriteBulk(fp,o) == 0) goto werr;
6442 } else if (o->type == REDIS_LIST) {
6443 /* Emit the RPUSHes needed to rebuild the list */
6444 list *list = o->ptr;
6445 listNode *ln;
6446
6447 listRewind(list);
6448 while((ln = listYield(list))) {
6449 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
6450 robj *eleobj = listNodeValue(ln);
6451
6452 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6453 if (fwriteBulk(fp,key) == 0) goto werr;
6454 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6455 }
6456 } else if (o->type == REDIS_SET) {
6457 /* Emit the SADDs needed to rebuild the set */
6458 dict *set = o->ptr;
6459 dictIterator *di = dictGetIterator(set);
6460 dictEntry *de;
6461
6462 while((de = dictNext(di)) != NULL) {
6463 char cmd[]="*3\r\n$4\r\nSADD\r\n";
6464 robj *eleobj = dictGetEntryKey(de);
6465
6466 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6467 if (fwriteBulk(fp,key) == 0) goto werr;
6468 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6469 }
6470 dictReleaseIterator(di);
6471 } else if (o->type == REDIS_ZSET) {
6472 /* Emit the ZADDs needed to rebuild the sorted set */
6473 zset *zs = o->ptr;
6474 dictIterator *di = dictGetIterator(zs->dict);
6475 dictEntry *de;
6476
6477 while((de = dictNext(di)) != NULL) {
6478 char cmd[]="*4\r\n$4\r\nZADD\r\n";
6479 robj *eleobj = dictGetEntryKey(de);
6480 double *score = dictGetEntryVal(de);
6481
6482 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6483 if (fwriteBulk(fp,key) == 0) goto werr;
6484 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
6485 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6486 }
6487 dictReleaseIterator(di);
6488 } else {
6489 redisAssert(0 != 0);
6490 }
6491 /* Save the expire time */
6492 if (expiretime != -1) {
6493 char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
6494 /* If this key is already expired skip it */
6495 if (expiretime < now) continue;
6496 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6497 if (fwriteBulk(fp,key) == 0) goto werr;
6498 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
6499 }
6500 }
6501 dictReleaseIterator(di);
6502 }
6503
6504 /* Make sure data will not remain on the OS's output buffers */
6505 fflush(fp);
6506 fsync(fileno(fp));
6507 fclose(fp);
6508
6509 /* Use RENAME to make sure the DB file is changed atomically only
6510 * if the generate DB file is ok. */
6511 if (rename(tmpfile,filename) == -1) {
6512 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
6513 unlink(tmpfile);
6514 return REDIS_ERR;
6515 }
6516 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
6517 return REDIS_OK;
6518
6519 werr:
6520 fclose(fp);
6521 unlink(tmpfile);
6522 redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
6523 if (di) dictReleaseIterator(di);
6524 return REDIS_ERR;
6525 }
6526
6527 /* This is how rewriting of the append only file in background works:
6528 *
6529 * 1) The user calls BGREWRITEAOF
6530 * 2) Redis calls this function, that forks():
6531 * 2a) the child rewrite the append only file in a temp file.
6532 * 2b) the parent accumulates differences in server.bgrewritebuf.
6533 * 3) When the child finished '2a' exists.
6534 * 4) The parent will trap the exit code, if it's OK, will append the
6535 * data accumulated into server.bgrewritebuf into the temp file, and
6536 * finally will rename(2) the temp file in the actual file name.
6537 * The the new file is reopened as the new append only file. Profit!
6538 */
6539 static int rewriteAppendOnlyFileBackground(void) {
6540 pid_t childpid;
6541
6542 if (server.bgrewritechildpid != -1) return REDIS_ERR;
6543 if ((childpid = fork()) == 0) {
6544 /* Child */
6545 char tmpfile[256];
6546 close(server.fd);
6547
6548 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6549 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
6550 exit(0);
6551 } else {
6552 exit(1);
6553 }
6554 } else {
6555 /* Parent */
6556 if (childpid == -1) {
6557 redisLog(REDIS_WARNING,
6558 "Can't rewrite append only file in background: fork: %s",
6559 strerror(errno));
6560 return REDIS_ERR;
6561 }
6562 redisLog(REDIS_NOTICE,
6563 "Background append only file rewriting started by pid %d",childpid);
6564 server.bgrewritechildpid = childpid;
6565 /* We set appendseldb to -1 in order to force the next call to the
6566 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6567 * accumulated by the parent into server.bgrewritebuf will start
6568 * with a SELECT statement and it will be safe to merge. */
6569 server.appendseldb = -1;
6570 return REDIS_OK;
6571 }
6572 return REDIS_OK; /* unreached */
6573 }
6574
6575 static void bgrewriteaofCommand(redisClient *c) {
6576 if (server.bgrewritechildpid != -1) {
6577 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6578 return;
6579 }
6580 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6581 char *status = "+Background append only file rewriting started\r\n";
6582 addReplySds(c,sdsnew(status));
6583 } else {
6584 addReply(c,shared.err);
6585 }
6586 }
6587
/* Delete the temp file a background AOF rewrite child with pid 'childpid'
 * writes to (temp-rewriteaof-bg-<pid>.aof). Errors are ignored. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof",(int)childpid);
    unlink(path);
}
6594
6595 /* =============================== Virtual Memory =========================== */
6596 static void vmInit(void) {
6597 off_t totsize;
6598
6599 server.vm_fp = fopen("/tmp/redisvm","w+b");
6600 if (server.vm_fp == NULL) {
6601 redisLog(REDIS_WARNING,"Impossible to open the swap file. Exiting.");
6602 exit(1);
6603 }
6604 server.vm_fd = fileno(server.vm_fp);
6605 server.vm_next_page = 0;
6606 server.vm_near_pages = 0;
6607 totsize = server.vm_pages*server.vm_page_size;
6608 redisLog(REDIS_NOTICE,"Allocating %lld bytes of swap file",totsize);
6609 if (ftruncate(server.vm_fd,totsize) == -1) {
6610 redisLog(REDIS_WARNING,"Can't ftruncate swap file: %s. Exiting.",
6611 strerror(errno));
6612 exit(1);
6613 } else {
6614 redisLog(REDIS_NOTICE,"Swap file allocated with success");
6615 }
6616 server.vm_bitmap = zmalloc((server.vm_near_pages+7)/8);
6617 memset(server.vm_bitmap,0,(server.vm_near_pages+7)/8);
6618 /* Try to remove the swap file, so the OS will really delete it from the
6619 * file system when Redis exists. */
6620 unlink("/tmp/redisvm");
6621 }
6622
6623 /* Mark the page as used */
6624 static void vmMarkPageUsed(off_t page) {
6625 off_t byte = page/8;
6626 int bit = page&7;
6627 server.vm_bitmap[byte] |= 1<<bit;
6628 }
6629
/* Mark N contiguous pages as used, with 'page' being the first. */
static void vmMarkPagesUsed(off_t page, off_t count) {
    off_t j;

    /* page+j walks the run; page+count would mark the single page just past
     * the run, 'count' times. */
    for (j = 0; j < count; j++)
        vmMarkPageUsed(page+j);
}
6637
6638 /* Mark the page as free */
6639 static void vmMarkPageFree(off_t page) {
6640 off_t byte = page/8;
6641 int bit = page&7;
6642 server.vm_bitmap[byte] &= ~(1<<bit);
6643 }
6644
/* Mark N contiguous pages as free, with 'page' being the first. */
static void vmMarkPagesFree(off_t page, off_t count) {
    off_t j;

    /* page+j walks the run; page+count would free the single page just past
     * the run, 'count' times. */
    for (j = 0; j < count; j++)
        vmMarkPageFree(page+j);
}
6652
6653 /* Test if the page is free */
6654 static int vmFreePage(off_t page) {
6655 off_t byte = page/8;
6656 int bit = page&7;
6657 return server.vm_bitmap[byte] & bit;
6658 }
6659
6660 /* Find N contiguous free pages storing the first page of the cluster in *first.
6661 * Returns 1 if it was able to find N contiguous pages, otherwise 0 is
6662 * returned.
6663 *
6664 * This function uses a simple algorithm: we try to allocate
6665 * REDIS_VM_MAX_NEAR_PAGES sequentially, when we reach this limit we start
6666 * again from the start of the swap file searching for free spaces.
6667 *
6668 * If it looks pretty clear that there are no free pages near our offset
6669 * we try to find less populated places doing a forward jump of
6670 * REDIS_VM_MAX_RANDOM_JUMP, then we start scanning again a few pages
6671 * without hurry, and then we jump again and so forth...
6672 *
6673 * This function can be improved using a free list to avoid to guess
6674 * too much, since we could collect data about freed pages.
6675 *
6676 * note: I implemented this function just after watching an episode of
6677 * Battlestar Galactica, where the hybrid was continuing to say "JUMP!"
6678 */
6679 static int vmFindContiguousPages(off_t *first, int n) {
6680 off_t base, offset = 0, since_jump = 0, numfree = 0;
6681
6682 if (server.vm_near_pages == REDIS_VM_MAX_NEAR_PAGES) {
6683 server.vm_near_pages = 0;
6684 server.vm_next_page = 0;
6685 }
6686 server.vm_near_pages++; /* Yet another try for pages near to the old ones */
6687 base = server.vm_next_page;
6688
6689 while(offset < server.vm_pages) {
6690 off_t this = base+offset;
6691
6692 /* If we overflow, restart from page zero */
6693 if (this >= server.vm_pages) {
6694 this -= server.vm_pages;
6695 if (this == 0) {
6696 /* Just overflowed, what we found on tail is no longer
6697 * interesting, as it's no longer contiguous. */
6698 numfree = 0;
6699 }
6700 }
6701 if (vmFreePage(this)) {
6702 /* This is a free page */
6703 numfree++;
6704 /* Already got N free pages? Return to the caller, with success */
6705 if (numfree == n) {
6706 *first = this;
6707 return 1;
6708 }
6709 } else {
6710 /* The current one is not a free page */
6711 numfree = 0;
6712 }
6713
6714 /* Fast-forward if the current page is not free and we already
6715 * searched enough near this place. */
6716 since_jump++;
6717 if (!numfree && since_jump >= REDIS_VM_MAX_RANDOM_JUMP/4) {
6718 offset += random() % REDIS_VM_MAX_RANDOM_JUMP;
6719 since_jump = 0;
6720 /* Note that even if we rewind after the jump, we are don't need
6721 * to make sure numfree is set to zero as we only jump *if* it
6722 * is set to zero. */
6723 } else {
6724 /* Otherwise just check the next page */
6725 offset++;
6726 }
6727 }
6728 return 0;
6729 }
6730
6731 /* ================================= Debugging ============================== */
6732
6733 static void debugCommand(redisClient *c) {
6734 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6735 *((char*)-1) = 'x';
6736 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6737 if (rdbSave(server.dbfilename) != REDIS_OK) {
6738 addReply(c,shared.err);
6739 return;
6740 }
6741 emptyDb();
6742 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6743 addReply(c,shared.err);
6744 return;
6745 }
6746 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6747 addReply(c,shared.ok);
6748 } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
6749 emptyDb();
6750 if (loadAppendOnlyFile(server.appendfilename) != REDIS_OK) {
6751 addReply(c,shared.err);
6752 return;
6753 }
6754 redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF");
6755 addReply(c,shared.ok);
6756 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6757 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6758 robj *key, *val;
6759
6760 if (!de) {
6761 addReply(c,shared.nokeyerr);
6762 return;
6763 }
6764 key = dictGetEntryKey(de);
6765 val = dictGetEntryVal(de);
6766 addReplySds(c,sdscatprintf(sdsempty(),
6767 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d serializedlength:%lld\r\n",
6768 (void*)key, key->refcount, (void*)val, val->refcount,
6769 val->encoding, rdbSavedObjectLen(val)));
6770 } else {
6771 addReplySds(c,sdsnew(
6772 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6773 }
6774 }
6775
6776 static void _redisAssert(char *estr) {
6777 redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
6778 redisLog(REDIS_WARNING,"==> %s\n",estr);
6779 #ifdef HAVE_BACKTRACE
6780 redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
6781 *((char*)-1) = 'x';
6782 #endif
6783 }
6784
6785 /* =================================== Main! ================================ */
6786
6787 #ifdef __linux__
/* Return the integer in /proc/sys/vm/overcommit_memory, or -1 if the file
 * can't be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
    char *got;

    if (fp == NULL) return -1;
    got = fgets(line,sizeof(line),fp);
    fclose(fp);
    return (got != NULL) ? atoi(line) : -1;
}
6801
6802 void linuxOvercommitMemoryWarning(void) {
6803 if (linuxOvercommitMemoryValue() == 0) {
6804 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6805 }
6806 }
6807 #endif /* __linux__ */
6808
6809 static void daemonize(void) {
6810 int fd;
6811 FILE *fp;
6812
6813 if (fork() != 0) exit(0); /* parent exits */
6814 printf("New pid: %d\n", getpid());
6815 setsid(); /* create a new session */
6816
6817 /* Every output goes to /dev/null. If Redis is daemonized but
6818 * the 'logfile' is set to 'stdout' in the configuration file
6819 * it will not log at all. */
6820 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6821 dup2(fd, STDIN_FILENO);
6822 dup2(fd, STDOUT_FILENO);
6823 dup2(fd, STDERR_FILENO);
6824 if (fd > STDERR_FILENO) close(fd);
6825 }
6826 /* Try to write the pid file */
6827 fp = fopen(server.pidfile,"w");
6828 if (fp) {
6829 fprintf(fp,"%d\n",getpid());
6830 fclose(fp);
6831 }
6832 }
6833
6834 int main(int argc, char **argv) {
6835 initServerConfig();
6836 if (argc == 2) {
6837 resetServerSaveParams();
6838 loadServerConfig(argv[1]);
6839 } else if (argc > 2) {
6840 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6841 exit(1);
6842 } else {
6843 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6844 }
6845 if (server.daemonize) daemonize();
6846 initServer();
6847 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6848 #ifdef __linux__
6849 linuxOvercommitMemoryWarning();
6850 #endif
6851 if (server.appendonly) {
6852 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6853 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6854 } else {
6855 if (rdbLoad(server.dbfilename) == REDIS_OK)
6856 redisLog(REDIS_NOTICE,"DB loaded from disk");
6857 }
6858 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
6859 acceptHandler, NULL) == AE_ERR) oom("creating file event");
6860 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6861 aeMain(server.el);
6862 aeDeleteEventLoop(server.el);
6863 return 0;
6864 }
6865
6866 /* ============================= Backtrace support ========================= */
6867
6868 #ifdef HAVE_BACKTRACE
6869 static char *findFuncName(void *pointer, unsigned long *offset);
6870
6871 static void *getMcontextEip(ucontext_t *uc) {
6872 #if defined(__FreeBSD__)
6873 return (void*) uc->uc_mcontext.mc_eip;
6874 #elif defined(__dietlibc__)
6875 return (void*) uc->uc_mcontext.eip;
6876 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6877 #if __x86_64__
6878 return (void*) uc->uc_mcontext->__ss.__rip;
6879 #else
6880 return (void*) uc->uc_mcontext->__ss.__eip;
6881 #endif
6882 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6883 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6884 return (void*) uc->uc_mcontext->__ss.__rip;
6885 #else
6886 return (void*) uc->uc_mcontext->__ss.__eip;
6887 #endif
6888 #elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
6889 return (void*) uc->uc_mcontext.gregs[REG_EIP]; /* Linux 32/64 bit */
6890 #elif defined(__ia64__) /* Linux IA64 */
6891 return (void*) uc->uc_mcontext.sc_ip;
6892 #else
6893 return NULL;
6894 #endif
6895 }
6896
6897 static void segvHandler(int sig, siginfo_t *info, void *secret) {
6898 void *trace[100];
6899 char **messages = NULL;
6900 int i, trace_size = 0;
6901 unsigned long offset=0;
6902 ucontext_t *uc = (ucontext_t*) secret;
6903 sds infostring;
6904 REDIS_NOTUSED(info);
6905
6906 redisLog(REDIS_WARNING,
6907 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
6908 infostring = genRedisInfoString();
6909 redisLog(REDIS_WARNING, "%s",infostring);
6910 /* It's not safe to sdsfree() the returned string under memory
6911 * corruption conditions. Let it leak as we are going to abort */
6912
6913 trace_size = backtrace(trace, 100);
6914 /* overwrite sigaction with caller's address */
6915 if (getMcontextEip(uc) != NULL) {
6916 trace[1] = getMcontextEip(uc);
6917 }
6918 messages = backtrace_symbols(trace, trace_size);
6919
6920 for (i=1; i<trace_size; ++i) {
6921 char *fn = findFuncName(trace[i], &offset), *p;
6922
6923 p = strchr(messages[i],'+');
6924 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6925 redisLog(REDIS_WARNING,"%s", messages[i]);
6926 } else {
6927 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6928 }
6929 }
6930 /* free(messages); Don't call free() with possibly corrupted memory. */
6931 exit(0);
6932 }
6933
6934 static void setupSigSegvAction(void) {
6935 struct sigaction act;
6936
6937 sigemptyset (&act.sa_mask);
6938 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6939 * is used. Otherwise, sa_handler is used */
6940 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6941 act.sa_sigaction = segvHandler;
6942 sigaction (SIGSEGV, &act, NULL);
6943 sigaction (SIGBUS, &act, NULL);
6944 sigaction (SIGFPE, &act, NULL);
6945 sigaction (SIGILL, &act, NULL);
6946 sigaction (SIGBUS, &act, NULL);
6947 return;
6948 }
6949
6950 #include "staticsymbols.h"
6951 /* This function try to convert a pointer into a function name. It's used in
6952 * oreder to provide a backtrace under segmentation fault that's able to
6953 * display functions declared as static (otherwise the backtrace is useless). */
6954 static char *findFuncName(void *pointer, unsigned long *offset){
6955 int i, ret = -1;
6956 unsigned long off, minoff = 0;
6957
6958 /* Try to match against the Symbol with the smallest offset */
6959 for (i=0; symsTable[i].pointer; i++) {
6960 unsigned long lp = (unsigned long) pointer;
6961
6962 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6963 off=lp-symsTable[i].pointer;
6964 if (ret < 0 || off < minoff) {
6965 minoff=off;
6966 ret=i;
6967 }
6968 }
6969 }
6970 if (ret == -1) return NULL;
6971 *offset = minoff;
6972 return symsTable[ret].name;
6973 }
6974 #else /* HAVE_BACKTRACE */
static void setupSigSegvAction(void) {
    /* No backtrace support: keep the default signal dispositions. */
}
6977 #endif /* HAVE_BACKTRACE */
6978
6979
6980
6981 /* The End */
6982
6983
6984