]> git.saurik.com Git - redis.git/blame_incremental - redis.c
Non blocking replication (finally!). C-side linked lists API improved.
[redis.git] / redis.c
... / ...
CommitLineData
1/*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30#define REDIS_VERSION "0.091"
31
32#include "fmacros.h"
33
34#include <stdio.h>
35#include <stdlib.h>
36#include <string.h>
37#include <time.h>
38#include <unistd.h>
39#include <signal.h>
40#include <sys/wait.h>
41#include <errno.h>
42#include <assert.h>
43#include <ctype.h>
44#include <stdarg.h>
45#include <inttypes.h>
46#include <arpa/inet.h>
47#include <sys/stat.h>
48#include <fcntl.h>
49#include <sys/time.h>
50#include <sys/resource.h>
51#include <limits.h>
52
53#include "ae.h" /* Event driven programming library */
54#include "sds.h" /* Dynamic safe strings */
55#include "anet.h" /* Networking the easy way */
56#include "dict.h" /* Hash tables */
57#include "adlist.h" /* Linked lists */
58#include "zmalloc.h" /* total memory usage aware version of malloc/free */
59#include "lzf.h"
60
61/* Error codes */
62#define REDIS_OK 0
63#define REDIS_ERR -1
64
65/* Static server configuration */
66#define REDIS_SERVERPORT 6379 /* TCP port */
67#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
68#define REDIS_IOBUF_LEN 1024
69#define REDIS_LOADBUF_LEN 1024
70#define REDIS_MAX_ARGS 16
71#define REDIS_DEFAULT_DBNUM 16
72#define REDIS_CONFIGLINE_MAX 1024
73#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
74#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
75#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
76
77/* Hash table parameters */
78#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
79#define REDIS_HT_MINSLOTS 16384 /* Never resize the HT under this */
80
81/* Command flags */
82#define REDIS_CMD_BULK 1
83#define REDIS_CMD_INLINE 2
84
85/* Object types */
86#define REDIS_STRING 0
87#define REDIS_LIST 1
88#define REDIS_SET 2
89#define REDIS_HASH 3
90
91/* Object types only used for dumping to disk */
92#define REDIS_EXPIRETIME 253
93#define REDIS_SELECTDB 254
94#define REDIS_EOF 255
95
96/* Defines related to the dump file format. Storing 32 bit lengths for short
97 * keys requires a lot of space, so we check the most significant 2 bits of
98 * the first byte to interpret the length:
99 *
100 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
101 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
102 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
103 * 11|000000 this means: specially encoded object will follow. The six bits
104 * number specify the kind of object that follows.
105 * See the REDIS_RDB_ENC_* defines.
106 *
107 * Lengths up to 63 are stored using a single byte, most DB keys, and many
108 * values, will fit inside. */
109#define REDIS_RDB_6BITLEN 0
110#define REDIS_RDB_14BITLEN 1
111#define REDIS_RDB_32BITLEN 2
112#define REDIS_RDB_ENCVAL 3
113#define REDIS_RDB_LENERR UINT_MAX
114
115/* When a length of a string object stored on disk has the first two bits
116 * set, the remaining six bits specify a special encoding for the object
117 * according to the following defines: */
118#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
119#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
120#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
121#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
122
123/* Client flags */
124#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
125#define REDIS_SLAVE 2 /* This client is a slave server */
126#define REDIS_MASTER 4 /* This client is a master server */
127#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
128
129/* Slave replication state - slave side */
130#define REDIS_REPL_NONE 0 /* No active replication */
131#define REDIS_REPL_CONNECT 1 /* Must connect to master */
132#define REDIS_REPL_CONNECTED 2 /* Connected to master */
133
134/* Slave replication state - from the point of view of master
135 * Note that in SEND_BULK and ONLINE state the slave receives new updates
136 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
137 * to start the next background saving in order to send updates to it. */
138#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
139#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
140#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
141#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
142
143/* List related stuff */
144#define REDIS_HEAD 0
145#define REDIS_TAIL 1
146
147/* Sort operations */
148#define REDIS_SORT_GET 0
149#define REDIS_SORT_DEL 1
150#define REDIS_SORT_INCR 2
151#define REDIS_SORT_DECR 3
152#define REDIS_SORT_ASC 4
153#define REDIS_SORT_DESC 5
154#define REDIS_SORTKEY_MAX 1024
155
156/* Log levels */
157#define REDIS_DEBUG 0
158#define REDIS_NOTICE 1
159#define REDIS_WARNING 2
160
161/* Anti-warning macro... */
162#define REDIS_NOTUSED(V) ((void) V)
163
164/*================================= Data types ============================== */
165
/* A Redis object: a reference-counted, tagged container. `type` holds one of
 * the "Object types" constants (REDIS_STRING, REDIS_LIST, REDIS_SET, ...)
 * and `ptr` points to the payload stored for that type. */
typedef struct redisObject {
    void *ptr;      /* type-specific payload */
    int type;       /* object type constant */
    int refcount;   /* reference count, see incrRefCount()/decrRefCount() */
} robj;
172
173typedef struct redisDb {
174 dict *dict;
175 dict *expires;
176 int id;
177} redisDb;
178
179/* With multiplexing we need to take per-clinet state.
180 * Clients are taken in a liked list. */
181typedef struct redisClient {
182 int fd;
183 redisDb *db;
184 int dictid;
185 sds querybuf;
186 robj *argv[REDIS_MAX_ARGS];
187 int argc;
188 int bulklen; /* bulk read len. -1 if not in bulk read mode */
189 list *reply;
190 int sentlen;
191 time_t lastinteraction; /* time of the last interaction, used for timeout */
192 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
193 int slaveseldb; /* slave selected db, if this client is a slave */
194 int authenticated; /* when requirepass is non-NULL */
195 int replstate; /* replication state if this is a slave */
196 int repldbfd; /* replication DB file descriptor */
197 long repldboff; /* replication DB file offset */
198 off_t repldbsize; /* replication DB file size */
199} redisClient;
200
/* One automatic-save rule: serverCron() starts a background save when at
 * least `changes` modifications are pending and more than `seconds` seconds
 * have elapsed since the last save. */
struct saveparam {
    time_t seconds;
    int changes;
};
205
206/* Global server state structure */
207struct redisServer {
208 int port;
209 int fd;
210 redisDb *db;
211 dict *sharingpool;
212 unsigned int sharingpoolsize;
213 long long dirty; /* changes to DB from the last save */
214 list *clients;
215 list *slaves, *monitors;
216 char neterr[ANET_ERR_LEN];
217 aeEventLoop *el;
218 int cronloops; /* number of times the cron function run */
219 list *objfreelist; /* A list of freed objects to avoid malloc() */
220 time_t lastsave; /* Unix time of last save succeeede */
221 size_t usedmemory; /* Used memory in megabytes */
222 /* Fields used only for stats */
223 time_t stat_starttime; /* server start time */
224 long long stat_numcommands; /* number of processed commands */
225 long long stat_numconnections; /* number of connections received */
226 /* Configuration */
227 int verbosity;
228 int glueoutputbuf;
229 int maxidletime;
230 int dbnum;
231 int daemonize;
232 char *pidfile;
233 int bgsaveinprogress;
234 struct saveparam *saveparams;
235 int saveparamslen;
236 char *logfile;
237 char *bindaddr;
238 char *dbfilename;
239 char *requirepass;
240 int shareobjects;
241 /* Replication related */
242 int isslave;
243 char *masterhost;
244 int masterport;
245 redisClient *master; /* client that is master for this slave */
246 int replstate;
247 /* Sort parameters - qsort_r() is only available under BSD so we
248 * have to take this state global, in order to pass it to sortCompare() */
249 int sort_desc;
250 int sort_alpha;
251 int sort_bypattern;
252};
253
254typedef void redisCommandProc(redisClient *c);
255struct redisCommand {
256 char *name;
257 redisCommandProc *proc;
258 int arity;
259 int flags;
260};
261
262typedef struct _redisSortObject {
263 robj *obj;
264 union {
265 double score;
266 robj *cmpobj;
267 } u;
268} redisSortObject;
269
270typedef struct _redisSortOperation {
271 int type;
272 robj *pattern;
273} redisSortOperation;
274
275struct sharedObjectsStruct {
276 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
277 *colon, *nullbulk, *nullmultibulk,
278 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
279 *outofrangeerr, *plus,
280 *select0, *select1, *select2, *select3, *select4,
281 *select5, *select6, *select7, *select8, *select9;
282} shared;
283
284/*================================ Prototypes =============================== */
285
286static void freeStringObject(robj *o);
287static void freeListObject(robj *o);
288static void freeSetObject(robj *o);
289static void decrRefCount(void *o);
290static robj *createObject(int type, void *ptr);
291static void freeClient(redisClient *c);
292static int rdbLoad(char *filename);
293static void addReply(redisClient *c, robj *obj);
294static void addReplySds(redisClient *c, sds s);
295static void incrRefCount(robj *o);
296static int rdbSaveBackground(char *filename);
297static robj *createStringObject(char *ptr, size_t len);
298static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
299static int syncWithMaster(void);
300static robj *tryObjectSharing(robj *o);
301static int removeExpire(redisDb *db, robj *key);
302static int expireIfNeeded(redisDb *db, robj *key);
303static int deleteIfVolatile(redisDb *db, robj *key);
304static int deleteKey(redisDb *db, robj *key);
305static time_t getExpire(redisDb *db, robj *key);
306static int setExpire(redisDb *db, robj *key, time_t when);
307static void updateSalvesWaitingBgsave(int bgsaveerr);
308
309static void authCommand(redisClient *c);
310static void pingCommand(redisClient *c);
311static void echoCommand(redisClient *c);
312static void setCommand(redisClient *c);
313static void setnxCommand(redisClient *c);
314static void getCommand(redisClient *c);
315static void delCommand(redisClient *c);
316static void existsCommand(redisClient *c);
317static void incrCommand(redisClient *c);
318static void decrCommand(redisClient *c);
319static void incrbyCommand(redisClient *c);
320static void decrbyCommand(redisClient *c);
321static void selectCommand(redisClient *c);
322static void randomkeyCommand(redisClient *c);
323static void keysCommand(redisClient *c);
324static void dbsizeCommand(redisClient *c);
325static void lastsaveCommand(redisClient *c);
326static void saveCommand(redisClient *c);
327static void bgsaveCommand(redisClient *c);
328static void shutdownCommand(redisClient *c);
329static void moveCommand(redisClient *c);
330static void renameCommand(redisClient *c);
331static void renamenxCommand(redisClient *c);
332static void lpushCommand(redisClient *c);
333static void rpushCommand(redisClient *c);
334static void lpopCommand(redisClient *c);
335static void rpopCommand(redisClient *c);
336static void llenCommand(redisClient *c);
337static void lindexCommand(redisClient *c);
338static void lrangeCommand(redisClient *c);
339static void ltrimCommand(redisClient *c);
340static void typeCommand(redisClient *c);
341static void lsetCommand(redisClient *c);
342static void saddCommand(redisClient *c);
343static void sremCommand(redisClient *c);
344static void sismemberCommand(redisClient *c);
345static void scardCommand(redisClient *c);
346static void sinterCommand(redisClient *c);
347static void sinterstoreCommand(redisClient *c);
348static void sunionCommand(redisClient *c);
349static void sunionstoreCommand(redisClient *c);
350static void syncCommand(redisClient *c);
351static void flushdbCommand(redisClient *c);
352static void flushallCommand(redisClient *c);
353static void sortCommand(redisClient *c);
354static void lremCommand(redisClient *c);
355static void infoCommand(redisClient *c);
356static void mgetCommand(redisClient *c);
357static void monitorCommand(redisClient *c);
358static void expireCommand(redisClient *c);
359
360/*================================= Globals ================================= */
361
362/* Global vars */
363static struct redisServer server; /* server global state */
364static struct redisCommand cmdTable[] = {
365 {"get",getCommand,2,REDIS_CMD_INLINE},
366 {"set",setCommand,3,REDIS_CMD_BULK},
367 {"setnx",setnxCommand,3,REDIS_CMD_BULK},
368 {"del",delCommand,2,REDIS_CMD_INLINE},
369 {"exists",existsCommand,2,REDIS_CMD_INLINE},
370 {"incr",incrCommand,2,REDIS_CMD_INLINE},
371 {"decr",decrCommand,2,REDIS_CMD_INLINE},
372 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
373 {"rpush",rpushCommand,3,REDIS_CMD_BULK},
374 {"lpush",lpushCommand,3,REDIS_CMD_BULK},
375 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
376 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
377 {"llen",llenCommand,2,REDIS_CMD_INLINE},
378 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
379 {"lset",lsetCommand,4,REDIS_CMD_BULK},
380 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
381 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
382 {"lrem",lremCommand,4,REDIS_CMD_BULK},
383 {"sadd",saddCommand,3,REDIS_CMD_BULK},
384 {"srem",sremCommand,3,REDIS_CMD_BULK},
385 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
386 {"scard",scardCommand,2,REDIS_CMD_INLINE},
387 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE},
388 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE},
389 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE},
390 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE},
391 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
392 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE},
393 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE},
394 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
395 {"select",selectCommand,2,REDIS_CMD_INLINE},
396 {"move",moveCommand,3,REDIS_CMD_INLINE},
397 {"rename",renameCommand,3,REDIS_CMD_INLINE},
398 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
399 {"keys",keysCommand,2,REDIS_CMD_INLINE},
400 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
401 {"auth",authCommand,2,REDIS_CMD_INLINE},
402 {"ping",pingCommand,1,REDIS_CMD_INLINE},
403 {"echo",echoCommand,2,REDIS_CMD_BULK},
404 {"save",saveCommand,1,REDIS_CMD_INLINE},
405 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
406 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
407 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
408 {"type",typeCommand,2,REDIS_CMD_INLINE},
409 {"sync",syncCommand,1,REDIS_CMD_INLINE},
410 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
411 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
412 {"sort",sortCommand,-2,REDIS_CMD_INLINE},
413 {"info",infoCommand,1,REDIS_CMD_INLINE},
414 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
415 {"expire",expireCommand,3,REDIS_CMD_INLINE},
416 {NULL,NULL,0,0}
417};
418
419/*============================ Utility functions ============================ */
420
/* Glob-style pattern matching.
 *
 * Matches the first `stringLen` bytes of `string` against the first
 * `patternLen` bytes of `pattern`. Supported syntax:
 *   *        any sequence of characters (including none)
 *   ?        exactly one character
 *   [abc]    one character out of a set; [^abc] negates, a-z is a range,
 *            and \x inside the set takes x literally
 *   \x       the character x taken literally
 * When `nocase` is non-zero, literals and ranges are compared after
 * tolower().
 *
 * Returns 1 on match, 0 otherwise.
 *
 * Neither buffer needs to be NUL terminated: no byte at or past
 * pattern[patternLen] or string[stringLen] is ever read. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* Collapse runs of '*' without looking past the pattern end. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A character class always consumes one character, so it can
             * never match at the end of the string. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so that the common
                     * pattern++ below leaves patternLen at zero. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((int)pattern[0]) == tolower((int)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            /* A literal needs one character to compare against. */
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((int)pattern[0]) != tolower((int)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Trailing '*' still match the now empty remainder. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
543
544void redisLog(int level, const char *fmt, ...)
545{
546 va_list ap;
547 FILE *fp;
548
549 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
550 if (!fp) return;
551
552 va_start(ap, fmt);
553 if (level >= server.verbosity) {
554 char *c = ".-*";
555 fprintf(fp,"%c ",c[level]);
556 vfprintf(fp, fmt, ap);
557 fprintf(fp,"\n");
558 fflush(fp);
559 }
560 va_end(ap);
561
562 if (server.logfile) fclose(fp);
563}
564
565/*====================== Hash table type implementation ==================== */
566
567/* This is a hash table type that uses the SDS dynamic strings library as
568 * keys and redis objects as values (objects can hold SDS strings,
569 * lists, sets). */
570
571static int sdsDictKeyCompare(void *privdata, const void *key1,
572 const void *key2)
573{
574 int l1,l2;
575 DICT_NOTUSED(privdata);
576
577 l1 = sdslen((sds)key1);
578 l2 = sdslen((sds)key2);
579 if (l1 != l2) return 0;
580 return memcmp(key1, key2, l1) == 0;
581}
582
/* Dict destructor callback: drops one reference to the object stored in the
 * entry by calling decrRefCount() on it. `privdata` is unused. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    decrRefCount(val);
}
589
590static int dictSdsKeyCompare(void *privdata, const void *key1,
591 const void *key2)
592{
593 const robj *o1 = key1, *o2 = key2;
594 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
595}
596
597static unsigned int dictSdsHash(const void *key) {
598 const robj *o = key;
599 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
600}
601
602static dictType setDictType = {
603 dictSdsHash, /* hash function */
604 NULL, /* key dup */
605 NULL, /* val dup */
606 dictSdsKeyCompare, /* key compare */
607 dictRedisObjectDestructor, /* key destructor */
608 NULL /* val destructor */
609};
610
611static dictType hashDictType = {
612 dictSdsHash, /* hash function */
613 NULL, /* key dup */
614 NULL, /* val dup */
615 dictSdsKeyCompare, /* key compare */
616 dictRedisObjectDestructor, /* key destructor */
617 dictRedisObjectDestructor /* val destructor */
618};
619
620/* ========================= Random utility functions ======================= */
621
/* Out of memory handler.
 *
 * Redis does not try to recover from out of memory conditions when
 * allocating objects or strings: reporting the problem to the client would
 * itself need heap memory for the send buffers. So the condition is reported
 * on standard error, prefixed by `msg`, and after a one second pause the
 * process aborts. This function never returns. */
static void oom(const char *msg) {
    fprintf(stderr, "%s: Out of memory\n",msg);
    fflush(stderr);
    sleep(1);
    abort();
}
633
634/* ====================== Redis server networking stuff ===================== */
635void closeTimedoutClients(void) {
636 redisClient *c;
637 listNode *ln;
638 time_t now = time(NULL);
639
640 listRewind(server.clients);
641 while ((ln = listYield(server.clients)) != NULL) {
642 c = listNodeValue(ln);
643 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
644 (now - c->lastinteraction > server.maxidletime)) {
645 redisLog(REDIS_DEBUG,"Closing idle client");
646 freeClient(c);
647 }
648 }
649}
650
651int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
652 int j, loops = server.cronloops++;
653 REDIS_NOTUSED(eventLoop);
654 REDIS_NOTUSED(id);
655 REDIS_NOTUSED(clientData);
656
657 /* Update the global state with the amount of used memory */
658 server.usedmemory = zmalloc_used_memory();
659
660 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
661 * we resize the hash table to save memory */
662 for (j = 0; j < server.dbnum; j++) {
663 int size, used, vkeys;
664
665 size = dictSlots(server.db[j].dict);
666 used = dictSize(server.db[j].dict);
667 vkeys = dictSize(server.db[j].expires);
668 if (!(loops % 5) && used > 0) {
669 redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size);
670 /* dictPrintStats(server.dict); */
671 }
672 if (size && used && size > REDIS_HT_MINSLOTS &&
673 (used*100/size < REDIS_HT_MINFILL)) {
674 redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);
675 dictResize(server.db[j].dict);
676 redisLog(REDIS_NOTICE,"Hash table %d resized.",j);
677 }
678 }
679
680 /* Show information about connected clients */
681 if (!(loops % 5)) {
682 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use",
683 listLength(server.clients)-listLength(server.slaves),
684 listLength(server.slaves),
685 server.usedmemory,
686 dictSize(server.sharingpool));
687 }
688
689 /* Close connections of timedout clients */
690 if (!(loops % 10))
691 closeTimedoutClients();
692
693 /* Check if a background saving in progress terminated */
694 if (server.bgsaveinprogress) {
695 int statloc;
696 /* XXX: TODO handle the case of the saving child killed */
697 if (wait4(-1,&statloc,WNOHANG,NULL)) {
698 int exitcode = WEXITSTATUS(statloc);
699 if (exitcode == 0) {
700 redisLog(REDIS_NOTICE,
701 "Background saving terminated with success");
702 server.dirty = 0;
703 server.lastsave = time(NULL);
704 } else {
705 redisLog(REDIS_WARNING,
706 "Background saving error");
707 }
708 server.bgsaveinprogress = 0;
709 updateSalvesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
710 }
711 } else {
712 /* If there is not a background saving in progress check if
713 * we have to save now */
714 time_t now = time(NULL);
715 for (j = 0; j < server.saveparamslen; j++) {
716 struct saveparam *sp = server.saveparams+j;
717
718 if (server.dirty >= sp->changes &&
719 now-server.lastsave > sp->seconds) {
720 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
721 sp->changes, sp->seconds);
722 rdbSaveBackground(server.dbfilename);
723 break;
724 }
725 }
726 }
727
728 /* Try to expire a few timed out keys */
729 for (j = 0; j < server.dbnum; j++) {
730 redisDb *db = server.db+j;
731 int num = dictSize(db->expires);
732
733 if (num) {
734 time_t now = time(NULL);
735
736 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
737 num = REDIS_EXPIRELOOKUPS_PER_CRON;
738 while (num--) {
739 dictEntry *de;
740 time_t t;
741
742 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
743 t = (time_t) dictGetEntryVal(de);
744 if (now > t) {
745 deleteKey(db,dictGetEntryKey(de));
746 }
747 }
748 }
749 }
750
751 /* Check if we should connect to a MASTER */
752 if (server.replstate == REDIS_REPL_CONNECT) {
753 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
754 if (syncWithMaster() == REDIS_OK) {
755 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
756 }
757 }
758 return 1000;
759}
760
761static void createSharedObjects(void) {
762 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
763 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
764 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
765 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
766 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
767 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
768 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
769 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
770 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
771 /* no such key */
772 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
773 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
774 "-ERR Operation against a key holding the wrong kind of value\r\n"));
775 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
776 "-ERR no such key\r\n"));
777 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
778 "-ERR syntax error\r\n"));
779 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
780 "-ERR source and destination objects are the same\r\n"));
781 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
782 "-ERR index out of range\r\n"));
783 shared.space = createObject(REDIS_STRING,sdsnew(" "));
784 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
785 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
786 shared.select0 = createStringObject("select 0\r\n",10);
787 shared.select1 = createStringObject("select 1\r\n",10);
788 shared.select2 = createStringObject("select 2\r\n",10);
789 shared.select3 = createStringObject("select 3\r\n",10);
790 shared.select4 = createStringObject("select 4\r\n",10);
791 shared.select5 = createStringObject("select 5\r\n",10);
792 shared.select6 = createStringObject("select 6\r\n",10);
793 shared.select7 = createStringObject("select 7\r\n",10);
794 shared.select8 = createStringObject("select 8\r\n",10);
795 shared.select9 = createStringObject("select 9\r\n",10);
796}
797
798static void appendServerSaveParams(time_t seconds, int changes) {
799 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
800 if (server.saveparams == NULL) oom("appendServerSaveParams");
801 server.saveparams[server.saveparamslen].seconds = seconds;
802 server.saveparams[server.saveparamslen].changes = changes;
803 server.saveparamslen++;
804}
805
806static void ResetServerSaveParams() {
807 zfree(server.saveparams);
808 server.saveparams = NULL;
809 server.saveparamslen = 0;
810}
811
812static void initServerConfig() {
813 server.dbnum = REDIS_DEFAULT_DBNUM;
814 server.port = REDIS_SERVERPORT;
815 server.verbosity = REDIS_DEBUG;
816 server.maxidletime = REDIS_MAXIDLETIME;
817 server.saveparams = NULL;
818 server.logfile = NULL; /* NULL = log on standard output */
819 server.bindaddr = NULL;
820 server.glueoutputbuf = 1;
821 server.daemonize = 0;
822 server.pidfile = "/var/run/redis.pid";
823 server.dbfilename = "dump.rdb";
824 server.requirepass = NULL;
825 server.shareobjects = 0;
826 ResetServerSaveParams();
827
828 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
829 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
830 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
831 /* Replication related */
832 server.isslave = 0;
833 server.masterhost = NULL;
834 server.masterport = 6379;
835 server.master = NULL;
836 server.replstate = REDIS_REPL_NONE;
837}
838
839static void initServer() {
840 int j;
841
842 signal(SIGHUP, SIG_IGN);
843 signal(SIGPIPE, SIG_IGN);
844
845 server.clients = listCreate();
846 server.slaves = listCreate();
847 server.monitors = listCreate();
848 server.objfreelist = listCreate();
849 createSharedObjects();
850 server.el = aeCreateEventLoop();
851 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
852 server.sharingpool = dictCreate(&setDictType,NULL);
853 server.sharingpoolsize = 1024;
854 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
855 oom("server initialization"); /* Fatal OOM */
856 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
857 if (server.fd == -1) {
858 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
859 exit(1);
860 }
861 for (j = 0; j < server.dbnum; j++) {
862 server.db[j].dict = dictCreate(&hashDictType,NULL);
863 server.db[j].expires = dictCreate(&setDictType,NULL);
864 server.db[j].id = j;
865 }
866 server.cronloops = 0;
867 server.bgsaveinprogress = 0;
868 server.lastsave = time(NULL);
869 server.dirty = 0;
870 server.usedmemory = 0;
871 server.stat_numcommands = 0;
872 server.stat_numconnections = 0;
873 server.stat_starttime = time(NULL);
874 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
875}
876
877/* Empty the whole database */
878static void emptyDb() {
879 int j;
880
881 for (j = 0; j < server.dbnum; j++) {
882 dictEmpty(server.db[j].dict);
883 dictEmpty(server.db[j].expires);
884 }
885}
886
887/* I agree, this is a very rudimental way to load a configuration...
888 will improve later if the config gets more complex */
889static void loadServerConfig(char *filename) {
890 FILE *fp = fopen(filename,"r");
891 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
892 int linenum = 0;
893 sds line = NULL;
894
895 if (!fp) {
896 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
897 exit(1);
898 }
899 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
900 sds *argv;
901 int argc, j;
902
903 linenum++;
904 line = sdsnew(buf);
905 line = sdstrim(line," \t\r\n");
906
907 /* Skip comments and blank lines*/
908 if (line[0] == '#' || line[0] == '\0') {
909 sdsfree(line);
910 continue;
911 }
912
913 /* Split into arguments */
914 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
915 sdstolower(argv[0]);
916
917 /* Execute config directives */
918 if (!strcmp(argv[0],"timeout") && argc == 2) {
919 server.maxidletime = atoi(argv[1]);
920 if (server.maxidletime < 1) {
921 err = "Invalid timeout value"; goto loaderr;
922 }
923 } else if (!strcmp(argv[0],"port") && argc == 2) {
924 server.port = atoi(argv[1]);
925 if (server.port < 1 || server.port > 65535) {
926 err = "Invalid port"; goto loaderr;
927 }
928 } else if (!strcmp(argv[0],"bind") && argc == 2) {
929 server.bindaddr = zstrdup(argv[1]);
930 } else if (!strcmp(argv[0],"save") && argc == 3) {
931 int seconds = atoi(argv[1]);
932 int changes = atoi(argv[2]);
933 if (seconds < 1 || changes < 0) {
934 err = "Invalid save parameters"; goto loaderr;
935 }
936 appendServerSaveParams(seconds,changes);
937 } else if (!strcmp(argv[0],"dir") && argc == 2) {
938 if (chdir(argv[1]) == -1) {
939 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
940 argv[1], strerror(errno));
941 exit(1);
942 }
943 } else if (!strcmp(argv[0],"loglevel") && argc == 2) {
944 if (!strcmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
945 else if (!strcmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
946 else if (!strcmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
947 else {
948 err = "Invalid log level. Must be one of debug, notice, warning";
949 goto loaderr;
950 }
951 } else if (!strcmp(argv[0],"logfile") && argc == 2) {
952 FILE *fp;
953
954 server.logfile = zstrdup(argv[1]);
955 if (!strcmp(server.logfile,"stdout")) {
956 zfree(server.logfile);
957 server.logfile = NULL;
958 }
959 if (server.logfile) {
960 /* Test if we are able to open the file. The server will not
961 * be able to abort just for this problem later... */
962 fp = fopen(server.logfile,"a");
963 if (fp == NULL) {
964 err = sdscatprintf(sdsempty(),
965 "Can't open the log file: %s", strerror(errno));
966 goto loaderr;
967 }
968 fclose(fp);
969 }
970 } else if (!strcmp(argv[0],"databases") && argc == 2) {
971 server.dbnum = atoi(argv[1]);
972 if (server.dbnum < 1) {
973 err = "Invalid number of databases"; goto loaderr;
974 }
975 } else if (!strcmp(argv[0],"slaveof") && argc == 3) {
976 server.masterhost = sdsnew(argv[1]);
977 server.masterport = atoi(argv[2]);
978 server.replstate = REDIS_REPL_CONNECT;
979 } else if (!strcmp(argv[0],"glueoutputbuf") && argc == 2) {
980 sdstolower(argv[1]);
981 if (!strcmp(argv[1],"yes")) server.glueoutputbuf = 1;
982 else if (!strcmp(argv[1],"no")) server.glueoutputbuf = 0;
983 else {
984 err = "argument must be 'yes' or 'no'"; goto loaderr;
985 }
986 } else if (!strcmp(argv[0],"shareobjects") && argc == 2) {
987 sdstolower(argv[1]);
988 if (!strcmp(argv[1],"yes")) server.shareobjects = 1;
989 else if (!strcmp(argv[1],"no")) server.shareobjects = 0;
990 else {
991 err = "argument must be 'yes' or 'no'"; goto loaderr;
992 }
993 } else if (!strcmp(argv[0],"daemonize") && argc == 2) {
994 sdstolower(argv[1]);
995 if (!strcmp(argv[1],"yes")) server.daemonize = 1;
996 else if (!strcmp(argv[1],"no")) server.daemonize = 0;
997 else {
998 err = "argument must be 'yes' or 'no'"; goto loaderr;
999 }
1000 } else if (!strcmp(argv[0],"requirepass") && argc == 2) {
1001 server.requirepass = zstrdup(argv[1]);
1002 } else if (!strcmp(argv[0],"pidfile") && argc == 2) {
1003 server.pidfile = zstrdup(argv[1]);
1004 } else {
1005 err = "Bad directive or wrong number of arguments"; goto loaderr;
1006 }
1007 for (j = 0; j < argc; j++)
1008 sdsfree(argv[j]);
1009 zfree(argv);
1010 sdsfree(line);
1011 }
1012 fclose(fp);
1013 return;
1014
1015loaderr:
1016 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1017 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1018 fprintf(stderr, ">>> '%s'\n", line);
1019 fprintf(stderr, "%s\n", err);
1020 exit(1);
1021}
1022
1023static void freeClientArgv(redisClient *c) {
1024 int j;
1025
1026 for (j = 0; j < c->argc; j++)
1027 decrRefCount(c->argv[j]);
1028 c->argc = 0;
1029}
1030
1031static void freeClient(redisClient *c) {
1032 listNode *ln;
1033
1034 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1035 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1036 sdsfree(c->querybuf);
1037 listRelease(c->reply);
1038 freeClientArgv(c);
1039 close(c->fd);
1040 ln = listSearchKey(server.clients,c);
1041 assert(ln != NULL);
1042 listDelNode(server.clients,ln);
1043 if (c->flags & REDIS_SLAVE) {
1044 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1045 close(c->repldbfd);
1046 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1047 ln = listSearchKey(l,c);
1048 assert(ln != NULL);
1049 listDelNode(l,ln);
1050 }
1051 if (c->flags & REDIS_MASTER) {
1052 server.master = NULL;
1053 server.replstate = REDIS_REPL_CONNECT;
1054 }
1055 zfree(c);
1056}
1057
1058static void glueReplyBuffersIfNeeded(redisClient *c) {
1059 int totlen = 0;
1060 listNode *ln;
1061 robj *o;
1062
1063 listRewind(c->reply);
1064 while((ln = listYield(c->reply))) {
1065 o = ln->value;
1066 totlen += sdslen(o->ptr);
1067 /* This optimization makes more sense if we don't have to copy
1068 * too much data */
1069 if (totlen > 1024) return;
1070 }
1071 if (totlen > 0) {
1072 char buf[1024];
1073 int copylen = 0;
1074
1075 listRewind(c->reply);
1076 while((ln = listYield(c->reply))) {
1077 o = ln->value;
1078 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1079 copylen += sdslen(o->ptr);
1080 listDelNode(c->reply,ln);
1081 }
1082 /* Now the output buffer is empty, add the new single element */
1083 addReplySds(c,sdsnewlen(buf,totlen));
1084 }
1085}
1086
1087static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1088 redisClient *c = privdata;
1089 int nwritten = 0, totwritten = 0, objlen;
1090 robj *o;
1091 REDIS_NOTUSED(el);
1092 REDIS_NOTUSED(mask);
1093
1094 if (server.glueoutputbuf && listLength(c->reply) > 1)
1095 glueReplyBuffersIfNeeded(c);
1096 while(listLength(c->reply)) {
1097 o = listNodeValue(listFirst(c->reply));
1098 objlen = sdslen(o->ptr);
1099
1100 if (objlen == 0) {
1101 listDelNode(c->reply,listFirst(c->reply));
1102 continue;
1103 }
1104
1105 if (c->flags & REDIS_MASTER) {
1106 nwritten = objlen - c->sentlen;
1107 } else {
1108 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1109 if (nwritten <= 0) break;
1110 }
1111 c->sentlen += nwritten;
1112 totwritten += nwritten;
1113 /* If we fully sent the object on head go to the next one */
1114 if (c->sentlen == objlen) {
1115 listDelNode(c->reply,listFirst(c->reply));
1116 c->sentlen = 0;
1117 }
1118 }
1119 if (nwritten == -1) {
1120 if (errno == EAGAIN) {
1121 nwritten = 0;
1122 } else {
1123 redisLog(REDIS_DEBUG,
1124 "Error writing to client: %s", strerror(errno));
1125 freeClient(c);
1126 return;
1127 }
1128 }
1129 if (totwritten > 0) c->lastinteraction = time(NULL);
1130 if (listLength(c->reply) == 0) {
1131 c->sentlen = 0;
1132 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1133 }
1134}
1135
1136static struct redisCommand *lookupCommand(char *name) {
1137 int j = 0;
1138 while(cmdTable[j].name != NULL) {
1139 if (!strcmp(name,cmdTable[j].name)) return &cmdTable[j];
1140 j++;
1141 }
1142 return NULL;
1143}
1144
1145/* resetClient prepare the client to process the next command */
1146static void resetClient(redisClient *c) {
1147 freeClientArgv(c);
1148 c->bulklen = -1;
1149}
1150
1151/* If this function gets called we already read a whole
1152 * command, argments are in the client argv/argc fields.
1153 * processCommand() execute the command or prepare the
1154 * server for a bulk read from the client.
1155 *
1156 * If 1 is returned the client is still alive and valid and
1157 * and other operations can be performed by the caller. Otherwise
1158 * if 0 is returned the client was destroied (i.e. after QUIT). */
1159static int processCommand(redisClient *c) {
1160 struct redisCommand *cmd;
1161 long long dirty;
1162
1163 sdstolower(c->argv[0]->ptr);
1164 /* The QUIT command is handled as a special case. Normal command
1165 * procs are unable to close the client connection safely */
1166 if (!strcmp(c->argv[0]->ptr,"quit")) {
1167 freeClient(c);
1168 return 0;
1169 }
1170 cmd = lookupCommand(c->argv[0]->ptr);
1171 if (!cmd) {
1172 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1173 resetClient(c);
1174 return 1;
1175 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1176 (c->argc < -cmd->arity)) {
1177 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1178 resetClient(c);
1179 return 1;
1180 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1181 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1182
1183 decrRefCount(c->argv[c->argc-1]);
1184 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1185 c->argc--;
1186 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1187 resetClient(c);
1188 return 1;
1189 }
1190 c->argc--;
1191 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1192 /* It is possible that the bulk read is already in the
1193 * buffer. Check this condition and handle it accordingly */
1194 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1195 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1196 c->argc++;
1197 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1198 } else {
1199 return 1;
1200 }
1201 }
1202 /* Let's try to share objects on the command arguments vector */
1203 if (server.shareobjects) {
1204 int j;
1205 for(j = 1; j < c->argc; j++)
1206 c->argv[j] = tryObjectSharing(c->argv[j]);
1207 }
1208 /* Check if the user is authenticated */
1209 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1210 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1211 resetClient(c);
1212 return 1;
1213 }
1214
1215 /* Exec the command */
1216 dirty = server.dirty;
1217 cmd->proc(c);
1218 if (server.dirty-dirty != 0 && listLength(server.slaves))
1219 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1220 if (listLength(server.monitors))
1221 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1222 server.stat_numcommands++;
1223
1224 /* Prepare the client for the next command */
1225 if (c->flags & REDIS_CLOSE) {
1226 freeClient(c);
1227 return 0;
1228 }
1229 resetClient(c);
1230 return 1;
1231}
1232
1233static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1234 listNode *ln;
1235 robj *outv[REDIS_MAX_ARGS*4]; /* enough room for args, spaces, newlines */
1236 int outc = 0, j;
1237
1238 for (j = 0; j < argc; j++) {
1239 if (j != 0) outv[outc++] = shared.space;
1240 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1241 robj *lenobj;
1242
1243 lenobj = createObject(REDIS_STRING,
1244 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv[j]->ptr)));
1245 lenobj->refcount = 0;
1246 outv[outc++] = lenobj;
1247 }
1248 outv[outc++] = argv[j];
1249 }
1250 outv[outc++] = shared.crlf;
1251
1252 /* Increment all the refcounts at start and decrement at end in order to
1253 * be sure to free objects if there is no slave in a replication state
1254 * able to be feed with commands */
1255 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1256 listRewind(slaves);
1257 while((ln = listYield(slaves))) {
1258 redisClient *slave = ln->value;
1259
1260 /* Don't feed slaves that are still waiting for BGSAVE to start */
1261 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1262
1263 /* Feed all the other slaves, MONITORs and so on */
1264 if (slave->slaveseldb != dictid) {
1265 robj *selectcmd;
1266
1267 switch(dictid) {
1268 case 0: selectcmd = shared.select0; break;
1269 case 1: selectcmd = shared.select1; break;
1270 case 2: selectcmd = shared.select2; break;
1271 case 3: selectcmd = shared.select3; break;
1272 case 4: selectcmd = shared.select4; break;
1273 case 5: selectcmd = shared.select5; break;
1274 case 6: selectcmd = shared.select6; break;
1275 case 7: selectcmd = shared.select7; break;
1276 case 8: selectcmd = shared.select8; break;
1277 case 9: selectcmd = shared.select9; break;
1278 default:
1279 selectcmd = createObject(REDIS_STRING,
1280 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1281 selectcmd->refcount = 0;
1282 break;
1283 }
1284 addReply(slave,selectcmd);
1285 slave->slaveseldb = dictid;
1286 }
1287 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1288 }
1289 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1290}
1291
1292static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1293 redisClient *c = (redisClient*) privdata;
1294 char buf[REDIS_IOBUF_LEN];
1295 int nread;
1296 REDIS_NOTUSED(el);
1297 REDIS_NOTUSED(mask);
1298
1299 nread = read(fd, buf, REDIS_IOBUF_LEN);
1300 if (nread == -1) {
1301 if (errno == EAGAIN) {
1302 nread = 0;
1303 } else {
1304 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1305 freeClient(c);
1306 return;
1307 }
1308 } else if (nread == 0) {
1309 redisLog(REDIS_DEBUG, "Client closed connection");
1310 freeClient(c);
1311 return;
1312 }
1313 if (nread) {
1314 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1315 c->lastinteraction = time(NULL);
1316 } else {
1317 return;
1318 }
1319
1320again:
1321 if (c->bulklen == -1) {
1322 /* Read the first line of the query */
1323 char *p = strchr(c->querybuf,'\n');
1324 size_t querylen;
1325 if (p) {
1326 sds query, *argv;
1327 int argc, j;
1328
1329 query = c->querybuf;
1330 c->querybuf = sdsempty();
1331 querylen = 1+(p-(query));
1332 if (sdslen(query) > querylen) {
1333 /* leave data after the first line of the query in the buffer */
1334 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1335 }
1336 *p = '\0'; /* remove "\n" */
1337 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1338 sdsupdatelen(query);
1339
1340 /* Now we can split the query in arguments */
1341 if (sdslen(query) == 0) {
1342 /* Ignore empty query */
1343 sdsfree(query);
1344 return;
1345 }
1346 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1347 sdsfree(query);
1348 if (argv == NULL) oom("sdssplitlen");
1349 for (j = 0; j < argc && j < REDIS_MAX_ARGS; j++) {
1350 if (sdslen(argv[j])) {
1351 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1352 c->argc++;
1353 } else {
1354 sdsfree(argv[j]);
1355 }
1356 }
1357 zfree(argv);
1358 /* Execute the command. If the client is still valid
1359 * after processCommand() return and there is something
1360 * on the query buffer try to process the next command. */
1361 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1362 return;
1363 } else if (sdslen(c->querybuf) >= 1024) {
1364 redisLog(REDIS_DEBUG, "Client protocol error");
1365 freeClient(c);
1366 return;
1367 }
1368 } else {
1369 /* Bulk read handling. Note that if we are at this point
1370 the client already sent a command terminated with a newline,
1371 we are reading the bulk data that is actually the last
1372 argument of the command. */
1373 int qbl = sdslen(c->querybuf);
1374
1375 if (c->bulklen <= qbl) {
1376 /* Copy everything but the final CRLF as final argument */
1377 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1378 c->argc++;
1379 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1380 processCommand(c);
1381 return;
1382 }
1383 }
1384}
1385
1386static int selectDb(redisClient *c, int id) {
1387 if (id < 0 || id >= server.dbnum)
1388 return REDIS_ERR;
1389 c->db = &server.db[id];
1390 return REDIS_OK;
1391}
1392
1393static void *dupClientReplyValue(void *o) {
1394 incrRefCount((robj*)o);
1395 return 0;
1396}
1397
1398static redisClient *createClient(int fd) {
1399 redisClient *c = zmalloc(sizeof(*c));
1400
1401 anetNonBlock(NULL,fd);
1402 anetTcpNoDelay(NULL,fd);
1403 if (!c) return NULL;
1404 selectDb(c,0);
1405 c->fd = fd;
1406 c->querybuf = sdsempty();
1407 c->argc = 0;
1408 c->bulklen = -1;
1409 c->sentlen = 0;
1410 c->flags = 0;
1411 c->lastinteraction = time(NULL);
1412 c->authenticated = 0;
1413 c->replstate = REDIS_REPL_NONE;
1414 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1415 listSetFreeMethod(c->reply,decrRefCount);
1416 listSetDupMethod(c->reply,dupClientReplyValue);
1417 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1418 readQueryFromClient, c, NULL) == AE_ERR) {
1419 freeClient(c);
1420 return NULL;
1421 }
1422 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1423 return c;
1424}
1425
1426static void addReply(redisClient *c, robj *obj) {
1427 if (listLength(c->reply) == 0 &&
1428 (c->replstate == REDIS_REPL_NONE ||
1429 c->replstate == REDIS_REPL_ONLINE) &&
1430 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1431 sendReplyToClient, c, NULL) == AE_ERR) return;
1432 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1433 incrRefCount(obj);
1434}
1435
1436static void addReplySds(redisClient *c, sds s) {
1437 robj *o = createObject(REDIS_STRING,s);
1438 addReply(c,o);
1439 decrRefCount(o);
1440}
1441
1442static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1443 int cport, cfd;
1444 char cip[128];
1445 REDIS_NOTUSED(el);
1446 REDIS_NOTUSED(mask);
1447 REDIS_NOTUSED(privdata);
1448
1449 cfd = anetAccept(server.neterr, fd, cip, &cport);
1450 if (cfd == AE_ERR) {
1451 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1452 return;
1453 }
1454 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1455 if (createClient(cfd) == NULL) {
1456 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1457 close(cfd); /* May be already closed, just ingore errors */
1458 return;
1459 }
1460 server.stat_numconnections++;
1461}
1462
1463/* ======================= Redis objects implementation ===================== */
1464
1465static robj *createObject(int type, void *ptr) {
1466 robj *o;
1467
1468 if (listLength(server.objfreelist)) {
1469 listNode *head = listFirst(server.objfreelist);
1470 o = listNodeValue(head);
1471 listDelNode(server.objfreelist,head);
1472 } else {
1473 o = zmalloc(sizeof(*o));
1474 }
1475 if (!o) oom("createObject");
1476 o->type = type;
1477 o->ptr = ptr;
1478 o->refcount = 1;
1479 return o;
1480}
1481
1482static robj *createStringObject(char *ptr, size_t len) {
1483 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1484}
1485
1486static robj *createListObject(void) {
1487 list *l = listCreate();
1488
1489 if (!l) oom("listCreate");
1490 listSetFreeMethod(l,decrRefCount);
1491 return createObject(REDIS_LIST,l);
1492}
1493
1494static robj *createSetObject(void) {
1495 dict *d = dictCreate(&setDictType,NULL);
1496 if (!d) oom("dictCreate");
1497 return createObject(REDIS_SET,d);
1498}
1499
1500static void freeStringObject(robj *o) {
1501 sdsfree(o->ptr);
1502}
1503
1504static void freeListObject(robj *o) {
1505 listRelease((list*) o->ptr);
1506}
1507
1508static void freeSetObject(robj *o) {
1509 dictRelease((dict*) o->ptr);
1510}
1511
1512static void freeHashObject(robj *o) {
1513 dictRelease((dict*) o->ptr);
1514}
1515
1516static void incrRefCount(robj *o) {
1517 o->refcount++;
1518#ifdef DEBUG_REFCOUNT
1519 if (o->type == REDIS_STRING)
1520 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1521#endif
1522}
1523
1524static void decrRefCount(void *obj) {
1525 robj *o = obj;
1526
1527#ifdef DEBUG_REFCOUNT
1528 if (o->type == REDIS_STRING)
1529 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1530#endif
1531 if (--(o->refcount) == 0) {
1532 switch(o->type) {
1533 case REDIS_STRING: freeStringObject(o); break;
1534 case REDIS_LIST: freeListObject(o); break;
1535 case REDIS_SET: freeSetObject(o); break;
1536 case REDIS_HASH: freeHashObject(o); break;
1537 default: assert(0 != 0); break;
1538 }
1539 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1540 !listAddNodeHead(server.objfreelist,o))
1541 zfree(o);
1542 }
1543}
1544
1545/* Try to share an object against the shared objects pool */
1546static robj *tryObjectSharing(robj *o) {
1547 struct dictEntry *de;
1548 unsigned long c;
1549
1550 if (o == NULL || server.shareobjects == 0) return o;
1551
1552 assert(o->type == REDIS_STRING);
1553 de = dictFind(server.sharingpool,o);
1554 if (de) {
1555 robj *shared = dictGetEntryKey(de);
1556
1557 c = ((unsigned long) dictGetEntryVal(de))+1;
1558 dictGetEntryVal(de) = (void*) c;
1559 incrRefCount(shared);
1560 decrRefCount(o);
1561 return shared;
1562 } else {
1563 /* Here we are using a stream algorihtm: Every time an object is
1564 * shared we increment its count, everytime there is a miss we
1565 * recrement the counter of a random object. If this object reaches
1566 * zero we remove the object and put the current object instead. */
1567 if (dictSize(server.sharingpool) >=
1568 server.sharingpoolsize) {
1569 de = dictGetRandomKey(server.sharingpool);
1570 assert(de != NULL);
1571 c = ((unsigned long) dictGetEntryVal(de))-1;
1572 dictGetEntryVal(de) = (void*) c;
1573 if (c == 0) {
1574 dictDelete(server.sharingpool,de->key);
1575 }
1576 } else {
1577 c = 0; /* If the pool is empty we want to add this object */
1578 }
1579 if (c == 0) {
1580 int retval;
1581
1582 retval = dictAdd(server.sharingpool,o,(void*)1);
1583 assert(retval == DICT_OK);
1584 incrRefCount(o);
1585 }
1586 return o;
1587 }
1588}
1589
1590static robj *lookupKey(redisDb *db, robj *key) {
1591 dictEntry *de = dictFind(db->dict,key);
1592 return de ? dictGetEntryVal(de) : NULL;
1593}
1594
1595static robj *lookupKeyRead(redisDb *db, robj *key) {
1596 expireIfNeeded(db,key);
1597 return lookupKey(db,key);
1598}
1599
1600static robj *lookupKeyWrite(redisDb *db, robj *key) {
1601 deleteIfVolatile(db,key);
1602 return lookupKey(db,key);
1603}
1604
1605static int deleteKey(redisDb *db, robj *key) {
1606 int retval;
1607
1608 /* We need to protect key from destruction: after the first dictDelete()
1609 * it may happen that 'key' is no longer valid if we don't increment
1610 * it's count. This may happen when we get the object reference directly
1611 * from the hash table with dictRandomKey() or dict iterators */
1612 incrRefCount(key);
1613 if (dictSize(db->expires)) dictDelete(db->expires,key);
1614 retval = dictDelete(db->dict,key);
1615 decrRefCount(key);
1616
1617 return retval == DICT_OK;
1618}
1619
1620/*============================ DB saving/loading ============================ */
1621
/* Write the one byte 'type' to 'fp'. Returns 0 on success, -1 on write
 * error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
1626
/* Write the time 't' to 'fp' as a 32 bit signed integer in host byte
 * order (the value is truncated to 32 bits). Returns 0 on success, -1 on
 * write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t secs = (int32_t) t;

    return (fwrite(&secs,sizeof(secs),1,fp) == 0) ? -1 : 0;
}
1632
1633/* check rdbLoadLen() comments for more info */
1634static int rdbSaveLen(FILE *fp, uint32_t len) {
1635 unsigned char buf[2];
1636
1637 if (len < (1<<6)) {
1638 /* Save a 6 bit len */
1639 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
1640 if (fwrite(buf,1,1,fp) == 0) return -1;
1641 } else if (len < (1<<14)) {
1642 /* Save a 14 bit len */
1643 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
1644 buf[1] = len&0xFF;
1645 if (fwrite(buf,2,1,fp) == 0) return -1;
1646 } else {
1647 /* Save a 32 bit len */
1648 buf[0] = (REDIS_RDB_32BITLEN<<6);
1649 if (fwrite(buf,1,1,fp) == 0) return -1;
1650 len = htonl(len);
1651 if (fwrite(&len,4,1,fp) == 0) return -1;
1652 }
1653 return 0;
1654}
1655
1656/* String objects in the form "2391" "-100" without any space and with a
1657 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1658 * encoded as integers to save space */
1659int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
1660 long long value;
1661 char *endptr, buf[32];
1662
1663 /* Check if it's possible to encode this value as a number */
1664 value = strtoll(s, &endptr, 10);
1665 if (endptr[0] != '\0') return 0;
1666 snprintf(buf,32,"%lld",value);
1667
1668 /* If the number converted back into a string is not identical
1669 * then it's not possible to encode the string as integer */
1670 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
1671
1672 /* Finally check if it fits in our ranges */
1673 if (value >= -(1<<7) && value <= (1<<7)-1) {
1674 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
1675 enc[1] = value&0xFF;
1676 return 2;
1677 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
1678 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
1679 enc[1] = value&0xFF;
1680 enc[2] = (value>>8)&0xFF;
1681 return 3;
1682 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
1683 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
1684 enc[1] = value&0xFF;
1685 enc[2] = (value>>8)&0xFF;
1686 enc[3] = (value>>16)&0xFF;
1687 enc[4] = (value>>24)&0xFF;
1688 return 5;
1689 } else {
1690 return 0;
1691 }
1692}
1693
1694static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
1695 unsigned int comprlen, outlen;
1696 unsigned char byte;
1697 void *out;
1698
1699 /* We require at least four bytes compression for this to be worth it */
1700 outlen = sdslen(obj->ptr)-4;
1701 if (outlen <= 0) return 0;
1702 if ((out = zmalloc(outlen)) == NULL) return 0;
1703 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
1704 if (comprlen == 0) {
1705 zfree(out);
1706 return 0;
1707 }
1708 /* Data compressed! Let's save it on disk */
1709 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
1710 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
1711 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
1712 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
1713 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
1714 zfree(out);
1715 return comprlen;
1716
1717writeerr:
1718 zfree(out);
1719 return -1;
1720}
1721
1722/* Save a string objet as [len][data] on disk. If the object is a string
1723 * representation of an integer value we try to safe it in a special form */
1724static int rdbSaveStringObject(FILE *fp, robj *obj) {
1725 size_t len = sdslen(obj->ptr);
1726 int enclen;
1727
1728 /* Try integer encoding */
1729 if (len <= 11) {
1730 unsigned char buf[5];
1731 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
1732 if (fwrite(buf,enclen,1,fp) == 0) return -1;
1733 return 0;
1734 }
1735 }
1736
1737 /* Try LZF compression - under 20 bytes it's unable to compress even
1738 * aaaaaaaaaaaaaaaaaa so skip it */
1739 if (len > 20) {
1740 int retval;
1741
1742 retval = rdbSaveLzfStringObject(fp,obj);
1743 if (retval == -1) return -1;
1744 if (retval > 0) return 0;
1745 /* retval == 0 means data can't be compressed, save the old way */
1746 }
1747
1748 /* Store verbatim */
1749 if (rdbSaveLen(fp,len) == -1) return -1;
1750 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
1751 return 0;
1752}
1753
1754/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1755static int rdbSave(char *filename) {
1756 dictIterator *di = NULL;
1757 dictEntry *de;
1758 FILE *fp;
1759 char tmpfile[256];
1760 int j;
1761 time_t now = time(NULL);
1762
1763 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
1764 fp = fopen(tmpfile,"w");
1765 if (!fp) {
1766 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
1767 return REDIS_ERR;
1768 }
1769 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
1770 for (j = 0; j < server.dbnum; j++) {
1771 redisDb *db = server.db+j;
1772 dict *d = db->dict;
1773 if (dictSize(d) == 0) continue;
1774 di = dictGetIterator(d);
1775 if (!di) {
1776 fclose(fp);
1777 return REDIS_ERR;
1778 }
1779
1780 /* Write the SELECT DB opcode */
1781 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
1782 if (rdbSaveLen(fp,j) == -1) goto werr;
1783
1784 /* Iterate this DB writing every entry */
1785 while((de = dictNext(di)) != NULL) {
1786 robj *key = dictGetEntryKey(de);
1787 robj *o = dictGetEntryVal(de);
1788 time_t expiretime = getExpire(db,key);
1789
1790 /* Save the expire time */
1791 if (expiretime != -1) {
1792 /* If this key is already expired skip it */
1793 if (expiretime < now) continue;
1794 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
1795 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
1796 }
1797 /* Save the key and associated value */
1798 if (rdbSaveType(fp,o->type) == -1) goto werr;
1799 if (rdbSaveStringObject(fp,key) == -1) goto werr;
1800 if (o->type == REDIS_STRING) {
1801 /* Save a string value */
1802 if (rdbSaveStringObject(fp,o) == -1) goto werr;
1803 } else if (o->type == REDIS_LIST) {
1804 /* Save a list value */
1805 list *list = o->ptr;
1806 listNode *ln;
1807
1808 listRewind(list);
1809 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
1810 while((ln = listYield(list))) {
1811 robj *eleobj = listNodeValue(ln);
1812
1813 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1814 }
1815 } else if (o->type == REDIS_SET) {
1816 /* Save a set value */
1817 dict *set = o->ptr;
1818 dictIterator *di = dictGetIterator(set);
1819 dictEntry *de;
1820
1821 if (!set) oom("dictGetIteraotr");
1822 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
1823 while((de = dictNext(di)) != NULL) {
1824 robj *eleobj = dictGetEntryKey(de);
1825
1826 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1827 }
1828 dictReleaseIterator(di);
1829 } else {
1830 assert(0 != 0);
1831 }
1832 }
1833 dictReleaseIterator(di);
1834 }
1835 /* EOF opcode */
1836 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
1837
1838 /* Make sure data will not remain on the OS's output buffers */
1839 fflush(fp);
1840 fsync(fileno(fp));
1841 fclose(fp);
1842
1843 /* Use RENAME to make sure the DB file is changed atomically only
1844 * if the generate DB file is ok. */
1845 if (rename(tmpfile,filename) == -1) {
1846 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
1847 unlink(tmpfile);
1848 return REDIS_ERR;
1849 }
1850 redisLog(REDIS_NOTICE,"DB saved on disk");
1851 server.dirty = 0;
1852 server.lastsave = time(NULL);
1853 return REDIS_OK;
1854
1855werr:
1856 fclose(fp);
1857 unlink(tmpfile);
1858 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
1859 if (di) dictReleaseIterator(di);
1860 return REDIS_ERR;
1861}
1862
1863static int rdbSaveBackground(char *filename) {
1864 pid_t childpid;
1865
1866 if (server.bgsaveinprogress) return REDIS_ERR;
1867 if ((childpid = fork()) == 0) {
1868 /* Child */
1869 close(server.fd);
1870 if (rdbSave(filename) == REDIS_OK) {
1871 exit(0);
1872 } else {
1873 exit(1);
1874 }
1875 } else {
1876 /* Parent */
1877 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
1878 server.bgsaveinprogress = 1;
1879 return REDIS_OK;
1880 }
1881 return REDIS_OK; /* unreached */
1882}
1883
/* Read a one byte type from 'fp'. Returns the byte as a value in the
 * range 0-255, or -1 if it can't be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return (fread(&byte,1,1,fp) == 0) ? -1 : byte;
}
1889
/* Read a time saved by rdbSaveTime(): a 32 bit signed integer in host
 * byte order. Returns -1 if it can't be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t secs;

    if (fread(&secs,sizeof(secs),1,fp) == 0) return -1;
    return (time_t) secs;
}
1895
1896/* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
1897 * of this file for a description of how this are stored on disk.
1898 *
1899 * isencoded is set to 1 if the readed length is not actually a length but
1900 * an "encoding type", check the above comments for more info */
1901static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
1902 unsigned char buf[2];
1903 uint32_t len;
1904
1905 if (isencoded) *isencoded = 0;
1906 if (rdbver == 0) {
1907 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
1908 return ntohl(len);
1909 } else {
1910 int type;
1911
1912 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
1913 type = (buf[0]&0xC0)>>6;
1914 if (type == REDIS_RDB_6BITLEN) {
1915 /* Read a 6 bit len */
1916 return buf[0]&0x3F;
1917 } else if (type == REDIS_RDB_ENCVAL) {
1918 /* Read a 6 bit len encoding type */
1919 if (isencoded) *isencoded = 1;
1920 return buf[0]&0x3F;
1921 } else if (type == REDIS_RDB_14BITLEN) {
1922 /* Read a 14 bit len */
1923 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
1924 return ((buf[0]&0x3F)<<8)|buf[1];
1925 } else {
1926 /* Read a 32 bit len */
1927 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
1928 return ntohl(len);
1929 }
1930 }
1931}
1932
1933static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
1934 unsigned char enc[4];
1935 long long val;
1936
1937 if (enctype == REDIS_RDB_ENC_INT8) {
1938 if (fread(enc,1,1,fp) == 0) return NULL;
1939 val = (signed char)enc[0];
1940 } else if (enctype == REDIS_RDB_ENC_INT16) {
1941 uint16_t v;
1942 if (fread(enc,2,1,fp) == 0) return NULL;
1943 v = enc[0]|(enc[1]<<8);
1944 val = (int16_t)v;
1945 } else if (enctype == REDIS_RDB_ENC_INT32) {
1946 uint32_t v;
1947 if (fread(enc,4,1,fp) == 0) return NULL;
1948 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
1949 val = (int32_t)v;
1950 } else {
1951 val = 0; /* anti-warning */
1952 assert(0!=0);
1953 }
1954 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
1955}
1956
1957static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
1958 unsigned int len, clen;
1959 unsigned char *c = NULL;
1960 sds val = NULL;
1961
1962 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
1963 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
1964 if ((c = zmalloc(clen)) == NULL) goto err;
1965 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
1966 if (fread(c,clen,1,fp) == 0) goto err;
1967 if (lzf_decompress(c,clen,val,len) == 0) goto err;
1968 return createObject(REDIS_STRING,val);
1969err:
1970 zfree(c);
1971 sdsfree(val);
1972 return NULL;
1973}
1974
1975static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
1976 int isencoded;
1977 uint32_t len;
1978 sds val;
1979
1980 len = rdbLoadLen(fp,rdbver,&isencoded);
1981 if (isencoded) {
1982 switch(len) {
1983 case REDIS_RDB_ENC_INT8:
1984 case REDIS_RDB_ENC_INT16:
1985 case REDIS_RDB_ENC_INT32:
1986 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
1987 case REDIS_RDB_ENC_LZF:
1988 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
1989 default:
1990 assert(0!=0);
1991 }
1992 }
1993
1994 if (len == REDIS_RDB_LENERR) return NULL;
1995 val = sdsnewlen(NULL,len);
1996 if (len && fread(val,len,1,fp) == 0) {
1997 sdsfree(val);
1998 return NULL;
1999 }
2000 return tryObjectSharing(createObject(REDIS_STRING,val));
2001}
2002
2003static int rdbLoad(char *filename) {
2004 FILE *fp;
2005 robj *keyobj = NULL;
2006 uint32_t dbid;
2007 int type, retval, rdbver;
2008 dict *d = server.db[0].dict;
2009 redisDb *db = server.db+0;
2010 char buf[1024];
2011 time_t expiretime = -1, now = time(NULL);
2012
2013 fp = fopen(filename,"r");
2014 if (!fp) return REDIS_ERR;
2015 if (fread(buf,9,1,fp) == 0) goto eoferr;
2016 buf[9] = '\0';
2017 if (memcmp(buf,"REDIS",5) != 0) {
2018 fclose(fp);
2019 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2020 return REDIS_ERR;
2021 }
2022 rdbver = atoi(buf+5);
2023 if (rdbver > 1) {
2024 fclose(fp);
2025 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2026 return REDIS_ERR;
2027 }
2028 while(1) {
2029 robj *o;
2030
2031 /* Read type. */
2032 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2033 if (type == REDIS_EXPIRETIME) {
2034 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2035 /* We read the time so we need to read the object type again */
2036 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2037 }
2038 if (type == REDIS_EOF) break;
2039 /* Handle SELECT DB opcode as a special case */
2040 if (type == REDIS_SELECTDB) {
2041 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2042 goto eoferr;
2043 if (dbid >= (unsigned)server.dbnum) {
2044 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2045 exit(1);
2046 }
2047 db = server.db+dbid;
2048 d = db->dict;
2049 continue;
2050 }
2051 /* Read key */
2052 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2053
2054 if (type == REDIS_STRING) {
2055 /* Read string value */
2056 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2057 } else if (type == REDIS_LIST || type == REDIS_SET) {
2058 /* Read list/set value */
2059 uint32_t listlen;
2060
2061 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2062 goto eoferr;
2063 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2064 /* Load every single element of the list/set */
2065 while(listlen--) {
2066 robj *ele;
2067
2068 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2069 if (type == REDIS_LIST) {
2070 if (!listAddNodeTail((list*)o->ptr,ele))
2071 oom("listAddNodeTail");
2072 } else {
2073 if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
2074 oom("dictAdd");
2075 }
2076 }
2077 } else {
2078 assert(0 != 0);
2079 }
2080 /* Add the new object in the hash table */
2081 retval = dictAdd(d,keyobj,o);
2082 if (retval == DICT_ERR) {
2083 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2084 exit(1);
2085 }
2086 /* Set the expire time if needed */
2087 if (expiretime != -1) {
2088 setExpire(db,keyobj,expiretime);
2089 /* Delete this key if already expired */
2090 if (expiretime < now) deleteKey(db,keyobj);
2091 expiretime = -1;
2092 }
2093 keyobj = o = NULL;
2094 }
2095 fclose(fp);
2096 return REDIS_OK;
2097
2098eoferr: /* unexpected end of file is handled here with a fatal exit */
2099 if (keyobj) decrRefCount(keyobj);
2100 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2101 exit(1);
2102 return REDIS_ERR; /* Just to avoid warning */
2103}
2104
2105/*================================== Commands =============================== */
2106
2107static void authCommand(redisClient *c) {
2108 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2109 c->authenticated = 1;
2110 addReply(c,shared.ok);
2111 } else {
2112 c->authenticated = 0;
2113 addReply(c,shared.err);
2114 }
2115}
2116
2117static void pingCommand(redisClient *c) {
2118 addReply(c,shared.pong);
2119}
2120
2121static void echoCommand(redisClient *c) {
2122 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
2123 (int)sdslen(c->argv[1]->ptr)));
2124 addReply(c,c->argv[1]);
2125 addReply(c,shared.crlf);
2126}
2127
2128/*=================================== Strings =============================== */
2129
2130static void setGenericCommand(redisClient *c, int nx) {
2131 int retval;
2132
2133 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2134 if (retval == DICT_ERR) {
2135 if (!nx) {
2136 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2137 incrRefCount(c->argv[2]);
2138 } else {
2139 addReply(c,shared.czero);
2140 return;
2141 }
2142 } else {
2143 incrRefCount(c->argv[1]);
2144 incrRefCount(c->argv[2]);
2145 }
2146 server.dirty++;
2147 removeExpire(c->db,c->argv[1]);
2148 addReply(c, nx ? shared.cone : shared.ok);
2149}
2150
2151static void setCommand(redisClient *c) {
2152 setGenericCommand(c,0);
2153}
2154
2155static void setnxCommand(redisClient *c) {
2156 setGenericCommand(c,1);
2157}
2158
2159static void getCommand(redisClient *c) {
2160 robj *o = lookupKeyRead(c->db,c->argv[1]);
2161
2162 if (o == NULL) {
2163 addReply(c,shared.nullbulk);
2164 } else {
2165 if (o->type != REDIS_STRING) {
2166 addReply(c,shared.wrongtypeerr);
2167 } else {
2168 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2169 addReply(c,o);
2170 addReply(c,shared.crlf);
2171 }
2172 }
2173}
2174
2175static void mgetCommand(redisClient *c) {
2176 int j;
2177
2178 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2179 for (j = 1; j < c->argc; j++) {
2180 robj *o = lookupKeyRead(c->db,c->argv[j]);
2181 if (o == NULL) {
2182 addReply(c,shared.nullbulk);
2183 } else {
2184 if (o->type != REDIS_STRING) {
2185 addReply(c,shared.nullbulk);
2186 } else {
2187 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2188 addReply(c,o);
2189 addReply(c,shared.crlf);
2190 }
2191 }
2192 }
2193}
2194
2195static void incrDecrCommand(redisClient *c, int incr) {
2196 long long value;
2197 int retval;
2198 robj *o;
2199
2200 o = lookupKeyWrite(c->db,c->argv[1]);
2201 if (o == NULL) {
2202 value = 0;
2203 } else {
2204 if (o->type != REDIS_STRING) {
2205 value = 0;
2206 } else {
2207 char *eptr;
2208
2209 value = strtoll(o->ptr, &eptr, 10);
2210 }
2211 }
2212
2213 value += incr;
2214 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2215 retval = dictAdd(c->db->dict,c->argv[1],o);
2216 if (retval == DICT_ERR) {
2217 dictReplace(c->db->dict,c->argv[1],o);
2218 removeExpire(c->db,c->argv[1]);
2219 } else {
2220 incrRefCount(c->argv[1]);
2221 }
2222 server.dirty++;
2223 addReply(c,shared.colon);
2224 addReply(c,o);
2225 addReply(c,shared.crlf);
2226}
2227
2228static void incrCommand(redisClient *c) {
2229 incrDecrCommand(c,1);
2230}
2231
2232static void decrCommand(redisClient *c) {
2233 incrDecrCommand(c,-1);
2234}
2235
2236static void incrbyCommand(redisClient *c) {
2237 int incr = atoi(c->argv[2]->ptr);
2238 incrDecrCommand(c,incr);
2239}
2240
2241static void decrbyCommand(redisClient *c) {
2242 int incr = atoi(c->argv[2]->ptr);
2243 incrDecrCommand(c,-incr);
2244}
2245
2246/* ========================= Type agnostic commands ========================= */
2247
2248static void delCommand(redisClient *c) {
2249 if (deleteKey(c->db,c->argv[1])) {
2250 server.dirty++;
2251 addReply(c,shared.cone);
2252 } else {
2253 addReply(c,shared.czero);
2254 }
2255}
2256
2257static void existsCommand(redisClient *c) {
2258 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2259}
2260
2261static void selectCommand(redisClient *c) {
2262 int id = atoi(c->argv[1]->ptr);
2263
2264 if (selectDb(c,id) == REDIS_ERR) {
2265 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2266 } else {
2267 addReply(c,shared.ok);
2268 }
2269}
2270
2271static void randomkeyCommand(redisClient *c) {
2272 dictEntry *de;
2273
2274 while(1) {
2275 de = dictGetRandomKey(c->db->dict);
2276 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2277 }
2278 if (de == NULL) {
2279 addReply(c,shared.plus);
2280 addReply(c,shared.crlf);
2281 } else {
2282 addReply(c,shared.plus);
2283 addReply(c,dictGetEntryKey(de));
2284 addReply(c,shared.crlf);
2285 }
2286}
2287
2288static void keysCommand(redisClient *c) {
2289 dictIterator *di;
2290 dictEntry *de;
2291 sds pattern = c->argv[1]->ptr;
2292 int plen = sdslen(pattern);
2293 int numkeys = 0, keyslen = 0;
2294 robj *lenobj = createObject(REDIS_STRING,NULL);
2295
2296 di = dictGetIterator(c->db->dict);
2297 if (!di) oom("dictGetIterator");
2298 addReply(c,lenobj);
2299 decrRefCount(lenobj);
2300 while((de = dictNext(di)) != NULL) {
2301 robj *keyobj = dictGetEntryKey(de);
2302
2303 sds key = keyobj->ptr;
2304 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2305 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2306 if (expireIfNeeded(c->db,keyobj) == 0) {
2307 if (numkeys != 0)
2308 addReply(c,shared.space);
2309 addReply(c,keyobj);
2310 numkeys++;
2311 keyslen += sdslen(key);
2312 }
2313 }
2314 }
2315 dictReleaseIterator(di);
2316 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2317 addReply(c,shared.crlf);
2318}
2319
2320static void dbsizeCommand(redisClient *c) {
2321 addReplySds(c,
2322 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2323}
2324
2325static void lastsaveCommand(redisClient *c) {
2326 addReplySds(c,
2327 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2328}
2329
2330static void typeCommand(redisClient *c) {
2331 robj *o;
2332 char *type;
2333
2334 o = lookupKeyRead(c->db,c->argv[1]);
2335 if (o == NULL) {
2336 type = "+none";
2337 } else {
2338 switch(o->type) {
2339 case REDIS_STRING: type = "+string"; break;
2340 case REDIS_LIST: type = "+list"; break;
2341 case REDIS_SET: type = "+set"; break;
2342 default: type = "unknown"; break;
2343 }
2344 }
2345 addReplySds(c,sdsnew(type));
2346 addReply(c,shared.crlf);
2347}
2348
2349static void saveCommand(redisClient *c) {
2350 if (server.bgsaveinprogress) {
2351 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2352 return;
2353 }
2354 if (rdbSave(server.dbfilename) == REDIS_OK) {
2355 addReply(c,shared.ok);
2356 } else {
2357 addReply(c,shared.err);
2358 }
2359}
2360
2361static void bgsaveCommand(redisClient *c) {
2362 if (server.bgsaveinprogress) {
2363 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2364 return;
2365 }
2366 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2367 addReply(c,shared.ok);
2368 } else {
2369 addReply(c,shared.err);
2370 }
2371}
2372
2373static void shutdownCommand(redisClient *c) {
2374 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2375 /* XXX: TODO kill the child if there is a bgsave in progress */
2376 if (rdbSave(server.dbfilename) == REDIS_OK) {
2377 if (server.daemonize) {
2378 unlink(server.pidfile);
2379 }
2380 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2381 exit(1);
2382 } else {
2383 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2384 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2385 }
2386}
2387
2388static void renameGenericCommand(redisClient *c, int nx) {
2389 robj *o;
2390
2391 /* To use the same key as src and dst is probably an error */
2392 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2393 addReply(c,shared.sameobjecterr);
2394 return;
2395 }
2396
2397 o = lookupKeyWrite(c->db,c->argv[1]);
2398 if (o == NULL) {
2399 addReply(c,shared.nokeyerr);
2400 return;
2401 }
2402 incrRefCount(o);
2403 deleteIfVolatile(c->db,c->argv[2]);
2404 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2405 if (nx) {
2406 decrRefCount(o);
2407 addReply(c,shared.czero);
2408 return;
2409 }
2410 dictReplace(c->db->dict,c->argv[2],o);
2411 } else {
2412 incrRefCount(c->argv[2]);
2413 }
2414 deleteKey(c->db,c->argv[1]);
2415 server.dirty++;
2416 addReply(c,nx ? shared.cone : shared.ok);
2417}
2418
2419static void renameCommand(redisClient *c) {
2420 renameGenericCommand(c,0);
2421}
2422
2423static void renamenxCommand(redisClient *c) {
2424 renameGenericCommand(c,1);
2425}
2426
2427static void moveCommand(redisClient *c) {
2428 robj *o;
2429 redisDb *src, *dst;
2430 int srcid;
2431
2432 /* Obtain source and target DB pointers */
2433 src = c->db;
2434 srcid = c->db->id;
2435 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2436 addReply(c,shared.outofrangeerr);
2437 return;
2438 }
2439 dst = c->db;
2440 selectDb(c,srcid); /* Back to the source DB */
2441
2442 /* If the user is moving using as target the same
2443 * DB as the source DB it is probably an error. */
2444 if (src == dst) {
2445 addReply(c,shared.sameobjecterr);
2446 return;
2447 }
2448
2449 /* Check if the element exists and get a reference */
2450 o = lookupKeyWrite(c->db,c->argv[1]);
2451 if (!o) {
2452 addReply(c,shared.czero);
2453 return;
2454 }
2455
2456 /* Try to add the element to the target DB */
2457 deleteIfVolatile(dst,c->argv[1]);
2458 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2459 addReply(c,shared.czero);
2460 return;
2461 }
2462 incrRefCount(c->argv[1]);
2463 incrRefCount(o);
2464
2465 /* OK! key moved, free the entry in the source DB */
2466 deleteKey(src,c->argv[1]);
2467 server.dirty++;
2468 addReply(c,shared.cone);
2469}
2470
2471/* =================================== Lists ================================ */
2472static void pushGenericCommand(redisClient *c, int where) {
2473 robj *lobj;
2474 list *list;
2475
2476 lobj = lookupKeyWrite(c->db,c->argv[1]);
2477 if (lobj == NULL) {
2478 lobj = createListObject();
2479 list = lobj->ptr;
2480 if (where == REDIS_HEAD) {
2481 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2482 } else {
2483 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2484 }
2485 dictAdd(c->db->dict,c->argv[1],lobj);
2486 incrRefCount(c->argv[1]);
2487 incrRefCount(c->argv[2]);
2488 } else {
2489 if (lobj->type != REDIS_LIST) {
2490 addReply(c,shared.wrongtypeerr);
2491 return;
2492 }
2493 list = lobj->ptr;
2494 if (where == REDIS_HEAD) {
2495 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2496 } else {
2497 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2498 }
2499 incrRefCount(c->argv[2]);
2500 }
2501 server.dirty++;
2502 addReply(c,shared.ok);
2503}
2504
2505static void lpushCommand(redisClient *c) {
2506 pushGenericCommand(c,REDIS_HEAD);
2507}
2508
2509static void rpushCommand(redisClient *c) {
2510 pushGenericCommand(c,REDIS_TAIL);
2511}
2512
2513static void llenCommand(redisClient *c) {
2514 robj *o;
2515 list *l;
2516
2517 o = lookupKeyRead(c->db,c->argv[1]);
2518 if (o == NULL) {
2519 addReply(c,shared.czero);
2520 return;
2521 } else {
2522 if (o->type != REDIS_LIST) {
2523 addReply(c,shared.wrongtypeerr);
2524 } else {
2525 l = o->ptr;
2526 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
2527 }
2528 }
2529}
2530
2531static void lindexCommand(redisClient *c) {
2532 robj *o;
2533 int index = atoi(c->argv[2]->ptr);
2534
2535 o = lookupKeyRead(c->db,c->argv[1]);
2536 if (o == NULL) {
2537 addReply(c,shared.nullbulk);
2538 } else {
2539 if (o->type != REDIS_LIST) {
2540 addReply(c,shared.wrongtypeerr);
2541 } else {
2542 list *list = o->ptr;
2543 listNode *ln;
2544
2545 ln = listIndex(list, index);
2546 if (ln == NULL) {
2547 addReply(c,shared.nullbulk);
2548 } else {
2549 robj *ele = listNodeValue(ln);
2550 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2551 addReply(c,ele);
2552 addReply(c,shared.crlf);
2553 }
2554 }
2555 }
2556}
2557
2558static void lsetCommand(redisClient *c) {
2559 robj *o;
2560 int index = atoi(c->argv[2]->ptr);
2561
2562 o = lookupKeyWrite(c->db,c->argv[1]);
2563 if (o == NULL) {
2564 addReply(c,shared.nokeyerr);
2565 } else {
2566 if (o->type != REDIS_LIST) {
2567 addReply(c,shared.wrongtypeerr);
2568 } else {
2569 list *list = o->ptr;
2570 listNode *ln;
2571
2572 ln = listIndex(list, index);
2573 if (ln == NULL) {
2574 addReply(c,shared.outofrangeerr);
2575 } else {
2576 robj *ele = listNodeValue(ln);
2577
2578 decrRefCount(ele);
2579 listNodeValue(ln) = c->argv[3];
2580 incrRefCount(c->argv[3]);
2581 addReply(c,shared.ok);
2582 server.dirty++;
2583 }
2584 }
2585 }
2586}
2587
2588static void popGenericCommand(redisClient *c, int where) {
2589 robj *o;
2590
2591 o = lookupKeyWrite(c->db,c->argv[1]);
2592 if (o == NULL) {
2593 addReply(c,shared.nullbulk);
2594 } else {
2595 if (o->type != REDIS_LIST) {
2596 addReply(c,shared.wrongtypeerr);
2597 } else {
2598 list *list = o->ptr;
2599 listNode *ln;
2600
2601 if (where == REDIS_HEAD)
2602 ln = listFirst(list);
2603 else
2604 ln = listLast(list);
2605
2606 if (ln == NULL) {
2607 addReply(c,shared.nullbulk);
2608 } else {
2609 robj *ele = listNodeValue(ln);
2610 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2611 addReply(c,ele);
2612 addReply(c,shared.crlf);
2613 listDelNode(list,ln);
2614 server.dirty++;
2615 }
2616 }
2617 }
2618}
2619
2620static void lpopCommand(redisClient *c) {
2621 popGenericCommand(c,REDIS_HEAD);
2622}
2623
2624static void rpopCommand(redisClient *c) {
2625 popGenericCommand(c,REDIS_TAIL);
2626}
2627
2628static void lrangeCommand(redisClient *c) {
2629 robj *o;
2630 int start = atoi(c->argv[2]->ptr);
2631 int end = atoi(c->argv[3]->ptr);
2632
2633 o = lookupKeyRead(c->db,c->argv[1]);
2634 if (o == NULL) {
2635 addReply(c,shared.nullmultibulk);
2636 } else {
2637 if (o->type != REDIS_LIST) {
2638 addReply(c,shared.wrongtypeerr);
2639 } else {
2640 list *list = o->ptr;
2641 listNode *ln;
2642 int llen = listLength(list);
2643 int rangelen, j;
2644 robj *ele;
2645
2646 /* convert negative indexes */
2647 if (start < 0) start = llen+start;
2648 if (end < 0) end = llen+end;
2649 if (start < 0) start = 0;
2650 if (end < 0) end = 0;
2651
2652 /* indexes sanity checks */
2653 if (start > end || start >= llen) {
2654 /* Out of range start or start > end result in empty list */
2655 addReply(c,shared.emptymultibulk);
2656 return;
2657 }
2658 if (end >= llen) end = llen-1;
2659 rangelen = (end-start)+1;
2660
2661 /* Return the result in form of a multi-bulk reply */
2662 ln = listIndex(list, start);
2663 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
2664 for (j = 0; j < rangelen; j++) {
2665 ele = listNodeValue(ln);
2666 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2667 addReply(c,ele);
2668 addReply(c,shared.crlf);
2669 ln = ln->next;
2670 }
2671 }
2672 }
2673}
2674
2675static void ltrimCommand(redisClient *c) {
2676 robj *o;
2677 int start = atoi(c->argv[2]->ptr);
2678 int end = atoi(c->argv[3]->ptr);
2679
2680 o = lookupKeyWrite(c->db,c->argv[1]);
2681 if (o == NULL) {
2682 addReply(c,shared.nokeyerr);
2683 } else {
2684 if (o->type != REDIS_LIST) {
2685 addReply(c,shared.wrongtypeerr);
2686 } else {
2687 list *list = o->ptr;
2688 listNode *ln;
2689 int llen = listLength(list);
2690 int j, ltrim, rtrim;
2691
2692 /* convert negative indexes */
2693 if (start < 0) start = llen+start;
2694 if (end < 0) end = llen+end;
2695 if (start < 0) start = 0;
2696 if (end < 0) end = 0;
2697
2698 /* indexes sanity checks */
2699 if (start > end || start >= llen) {
2700 /* Out of range start or start > end result in empty list */
2701 ltrim = llen;
2702 rtrim = 0;
2703 } else {
2704 if (end >= llen) end = llen-1;
2705 ltrim = start;
2706 rtrim = llen-end-1;
2707 }
2708
2709 /* Remove list elements to perform the trim */
2710 for (j = 0; j < ltrim; j++) {
2711 ln = listFirst(list);
2712 listDelNode(list,ln);
2713 }
2714 for (j = 0; j < rtrim; j++) {
2715 ln = listLast(list);
2716 listDelNode(list,ln);
2717 }
2718 addReply(c,shared.ok);
2719 server.dirty++;
2720 }
2721 }
2722}
2723
2724static void lremCommand(redisClient *c) {
2725 robj *o;
2726
2727 o = lookupKeyWrite(c->db,c->argv[1]);
2728 if (o == NULL) {
2729 addReply(c,shared.nokeyerr);
2730 } else {
2731 if (o->type != REDIS_LIST) {
2732 addReply(c,shared.wrongtypeerr);
2733 } else {
2734 list *list = o->ptr;
2735 listNode *ln, *next;
2736 int toremove = atoi(c->argv[2]->ptr);
2737 int removed = 0;
2738 int fromtail = 0;
2739
2740 if (toremove < 0) {
2741 toremove = -toremove;
2742 fromtail = 1;
2743 }
2744 ln = fromtail ? list->tail : list->head;
2745 while (ln) {
2746 robj *ele = listNodeValue(ln);
2747
2748 next = fromtail ? ln->prev : ln->next;
2749 if (sdscmp(ele->ptr,c->argv[3]->ptr) == 0) {
2750 listDelNode(list,ln);
2751 server.dirty++;
2752 removed++;
2753 if (toremove && removed == toremove) break;
2754 }
2755 ln = next;
2756 }
2757 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
2758 }
2759 }
2760}
2761
2762/* ==================================== Sets ================================ */
2763
2764static void saddCommand(redisClient *c) {
2765 robj *set;
2766
2767 set = lookupKeyWrite(c->db,c->argv[1]);
2768 if (set == NULL) {
2769 set = createSetObject();
2770 dictAdd(c->db->dict,c->argv[1],set);
2771 incrRefCount(c->argv[1]);
2772 } else {
2773 if (set->type != REDIS_SET) {
2774 addReply(c,shared.wrongtypeerr);
2775 return;
2776 }
2777 }
2778 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
2779 incrRefCount(c->argv[2]);
2780 server.dirty++;
2781 addReply(c,shared.cone);
2782 } else {
2783 addReply(c,shared.czero);
2784 }
2785}
2786
2787static void sremCommand(redisClient *c) {
2788 robj *set;
2789
2790 set = lookupKeyWrite(c->db,c->argv[1]);
2791 if (set == NULL) {
2792 addReply(c,shared.czero);
2793 } else {
2794 if (set->type != REDIS_SET) {
2795 addReply(c,shared.wrongtypeerr);
2796 return;
2797 }
2798 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
2799 server.dirty++;
2800 addReply(c,shared.cone);
2801 } else {
2802 addReply(c,shared.czero);
2803 }
2804 }
2805}
2806
2807static void sismemberCommand(redisClient *c) {
2808 robj *set;
2809
2810 set = lookupKeyRead(c->db,c->argv[1]);
2811 if (set == NULL) {
2812 addReply(c,shared.czero);
2813 } else {
2814 if (set->type != REDIS_SET) {
2815 addReply(c,shared.wrongtypeerr);
2816 return;
2817 }
2818 if (dictFind(set->ptr,c->argv[2]))
2819 addReply(c,shared.cone);
2820 else
2821 addReply(c,shared.czero);
2822 }
2823}
2824
2825static void scardCommand(redisClient *c) {
2826 robj *o;
2827 dict *s;
2828
2829 o = lookupKeyRead(c->db,c->argv[1]);
2830 if (o == NULL) {
2831 addReply(c,shared.czero);
2832 return;
2833 } else {
2834 if (o->type != REDIS_SET) {
2835 addReply(c,shared.wrongtypeerr);
2836 } else {
2837 s = o->ptr;
2838 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
2839 dictSize(s)));
2840 }
2841 }
2842}
2843
2844static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
2845 dict **d1 = (void*) s1, **d2 = (void*) s2;
2846
2847 return dictSize(*d1)-dictSize(*d2);
2848}
2849
2850static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
2851 dict **dv = zmalloc(sizeof(dict*)*setsnum);
2852 dictIterator *di;
2853 dictEntry *de;
2854 robj *lenobj = NULL, *dstset = NULL;
2855 int j, cardinality = 0;
2856
2857 if (!dv) oom("sinterCommand");
2858 for (j = 0; j < setsnum; j++) {
2859 robj *setobj;
2860
2861 setobj = dstkey ?
2862 lookupKeyWrite(c->db,setskeys[j]) :
2863 lookupKeyRead(c->db,setskeys[j]);
2864 if (!setobj) {
2865 zfree(dv);
2866 if (dstkey) {
2867 deleteKey(c->db,dstkey);
2868 addReply(c,shared.ok);
2869 } else {
2870 addReply(c,shared.nullmultibulk);
2871 }
2872 return;
2873 }
2874 if (setobj->type != REDIS_SET) {
2875 zfree(dv);
2876 addReply(c,shared.wrongtypeerr);
2877 return;
2878 }
2879 dv[j] = setobj->ptr;
2880 }
2881 /* Sort sets from the smallest to largest, this will improve our
2882 * algorithm's performace */
2883 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
2884
2885 /* The first thing we should output is the total number of elements...
2886 * since this is a multi-bulk write, but at this stage we don't know
2887 * the intersection set size, so we use a trick, append an empty object
2888 * to the output list and save the pointer to later modify it with the
2889 * right length */
2890 if (!dstkey) {
2891 lenobj = createObject(REDIS_STRING,NULL);
2892 addReply(c,lenobj);
2893 decrRefCount(lenobj);
2894 } else {
2895 /* If we have a target key where to store the resulting set
2896 * create this key with an empty set inside */
2897 dstset = createSetObject();
2898 deleteKey(c->db,dstkey);
2899 dictAdd(c->db->dict,dstkey,dstset);
2900 incrRefCount(dstkey);
2901 }
2902
2903 /* Iterate all the elements of the first (smallest) set, and test
2904 * the element against all the other sets, if at least one set does
2905 * not include the element it is discarded */
2906 di = dictGetIterator(dv[0]);
2907 if (!di) oom("dictGetIterator");
2908
2909 while((de = dictNext(di)) != NULL) {
2910 robj *ele;
2911
2912 for (j = 1; j < setsnum; j++)
2913 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
2914 if (j != setsnum)
2915 continue; /* at least one set does not contain the member */
2916 ele = dictGetEntryKey(de);
2917 if (!dstkey) {
2918 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr)));
2919 addReply(c,ele);
2920 addReply(c,shared.crlf);
2921 cardinality++;
2922 } else {
2923 dictAdd(dstset->ptr,ele,NULL);
2924 incrRefCount(ele);
2925 }
2926 }
2927 dictReleaseIterator(di);
2928
2929 if (!dstkey) {
2930 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
2931 } else {
2932 addReply(c,shared.ok);
2933 server.dirty++;
2934 }
2935 zfree(dv);
2936}
2937
2938static void sinterCommand(redisClient *c) {
2939 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
2940}
2941
2942static void sinterstoreCommand(redisClient *c) {
2943 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
2944}
2945
2946static void sunionGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
2947 dict **dv = zmalloc(sizeof(dict*)*setsnum);
2948 dictIterator *di;
2949 dictEntry *de;
2950 robj *lenobj = NULL, *dstset = NULL;
2951 int j, cardinality = 0;
2952
2953 if (!dv) oom("sunionCommand");
2954 for (j = 0; j < setsnum; j++) {
2955 robj *setobj;
2956
2957 setobj = dstkey ?
2958 lookupKeyWrite(c->db,setskeys[j]) :
2959 lookupKeyRead(c->db,setskeys[j]);
2960 if (!setobj) {
2961 dv[j] = NULL;
2962 continue;
2963 }
2964 if (setobj->type != REDIS_SET) {
2965 zfree(dv);
2966 addReply(c,shared.wrongtypeerr);
2967 return;
2968 }
2969 dv[j] = setobj->ptr;
2970 }
2971
2972 /* We need a temp set object to store our union. If the dstkey
2973 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
2974 * this set object will be the resulting object to set into the target key*/
2975 dstset = createSetObject();
2976
2977 /* The first thing we should output is the total number of elements...
2978 * since this is a multi-bulk write, but at this stage we don't know
2979 * the intersection set size, so we use a trick, append an empty object
2980 * to the output list and save the pointer to later modify it with the
2981 * right length */
2982 if (!dstkey) {
2983 lenobj = createObject(REDIS_STRING,NULL);
2984 addReply(c,lenobj);
2985 decrRefCount(lenobj);
2986 } else {
2987 /* If we have a target key where to store the resulting set
2988 * create this key with an empty set inside */
2989 deleteKey(c->db,dstkey);
2990 dictAdd(c->db->dict,dstkey,dstset);
2991 incrRefCount(dstkey);
2992 server.dirty++;
2993 }
2994
2995 /* Iterate all the elements of all the sets, add every element a single
2996 * time to the result set */
2997 for (j = 0; j < setsnum; j++) {
2998 if (!dv[j]) continue; /* non existing keys are like empty sets */
2999
3000 di = dictGetIterator(dv[j]);
3001 if (!di) oom("dictGetIterator");
3002
3003 while((de = dictNext(di)) != NULL) {
3004 robj *ele;
3005
3006 /* dictAdd will not add the same element multiple times */
3007 ele = dictGetEntryKey(de);
3008 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3009 incrRefCount(ele);
3010 if (!dstkey) {
3011 addReplySds(c,sdscatprintf(sdsempty(),
3012 "$%d\r\n",sdslen(ele->ptr)));
3013 addReply(c,ele);
3014 addReply(c,shared.crlf);
3015 cardinality++;
3016 }
3017 }
3018 }
3019 dictReleaseIterator(di);
3020 }
3021
3022 if (!dstkey) {
3023 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3024 decrRefCount(dstset);
3025 } else {
3026 addReply(c,shared.ok);
3027 server.dirty++;
3028 }
3029 zfree(dv);
3030}
3031
3032static void sunionCommand(redisClient *c) {
3033 sunionGenericCommand(c,c->argv+1,c->argc-1,NULL);
3034}
3035
3036static void sunionstoreCommand(redisClient *c) {
3037 sunionGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3038}
3039
3040static void flushdbCommand(redisClient *c) {
3041 dictEmpty(c->db->dict);
3042 dictEmpty(c->db->expires);
3043 server.dirty++;
3044 addReply(c,shared.ok);
3045 rdbSave(server.dbfilename);
3046}
3047
3048static void flushallCommand(redisClient *c) {
3049 emptyDb();
3050 server.dirty++;
3051 addReply(c,shared.ok);
3052 rdbSave(server.dbfilename);
3053}
3054
3055redisSortOperation *createSortOperation(int type, robj *pattern) {
3056 redisSortOperation *so = zmalloc(sizeof(*so));
3057 if (!so) oom("createSortOperation");
3058 so->type = type;
3059 so->pattern = pattern;
3060 return so;
3061}
3062
3063/* Return the value associated to the key with a name obtained
3064 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3065robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3066 char *p;
3067 sds spat, ssub;
3068 robj keyobj;
3069 int prefixlen, sublen, postfixlen;
3070 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3071 struct {
3072 long len;
3073 long free;
3074 char buf[REDIS_SORTKEY_MAX+1];
3075 } keyname;
3076
3077 spat = pattern->ptr;
3078 ssub = subst->ptr;
3079 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3080 p = strchr(spat,'*');
3081 if (!p) return NULL;
3082
3083 prefixlen = p-spat;
3084 sublen = sdslen(ssub);
3085 postfixlen = sdslen(spat)-(prefixlen+1);
3086 memcpy(keyname.buf,spat,prefixlen);
3087 memcpy(keyname.buf+prefixlen,ssub,sublen);
3088 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3089 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3090 keyname.len = prefixlen+sublen+postfixlen;
3091
3092 keyobj.refcount = 1;
3093 keyobj.type = REDIS_STRING;
3094 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3095
3096 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3097 return lookupKeyRead(db,&keyobj);
3098}
3099
3100/* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3101 * the additional parameter is not standard but a BSD-specific we have to
3102 * pass sorting parameters via the global 'server' structure */
3103static int sortCompare(const void *s1, const void *s2) {
3104 const redisSortObject *so1 = s1, *so2 = s2;
3105 int cmp;
3106
3107 if (!server.sort_alpha) {
3108 /* Numeric sorting. Here it's trivial as we precomputed scores */
3109 if (so1->u.score > so2->u.score) {
3110 cmp = 1;
3111 } else if (so1->u.score < so2->u.score) {
3112 cmp = -1;
3113 } else {
3114 cmp = 0;
3115 }
3116 } else {
3117 /* Alphanumeric sorting */
3118 if (server.sort_bypattern) {
3119 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3120 /* At least one compare object is NULL */
3121 if (so1->u.cmpobj == so2->u.cmpobj)
3122 cmp = 0;
3123 else if (so1->u.cmpobj == NULL)
3124 cmp = -1;
3125 else
3126 cmp = 1;
3127 } else {
3128 /* We have both the objects, use strcoll */
3129 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3130 }
3131 } else {
3132 /* Compare elements directly */
3133 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3134 }
3135 }
3136 return server.sort_desc ? -cmp : cmp;
3137}
3138
3139/* The SORT command is the most complex command in Redis. Warning: this code
3140 * is optimized for speed and a bit less for readability */
3141static void sortCommand(redisClient *c) {
3142 list *operations;
3143 int outputlen = 0;
3144 int desc = 0, alpha = 0;
3145 int limit_start = 0, limit_count = -1, start, end;
3146 int j, dontsort = 0, vectorlen;
3147 int getop = 0; /* GET operation counter */
3148 robj *sortval, *sortby = NULL;
3149 redisSortObject *vector; /* Resulting vector to sort */
3150
3151 /* Lookup the key to sort. It must be of the right types */
3152 sortval = lookupKeyRead(c->db,c->argv[1]);
3153 if (sortval == NULL) {
3154 addReply(c,shared.nokeyerr);
3155 return;
3156 }
3157 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3158 addReply(c,shared.wrongtypeerr);
3159 return;
3160 }
3161
3162 /* Create a list of operations to perform for every sorted element.
3163 * Operations can be GET/DEL/INCR/DECR */
3164 operations = listCreate();
3165 listSetFreeMethod(operations,zfree);
3166 j = 2;
3167
3168 /* Now we need to protect sortval incrementing its count, in the future
3169 * SORT may have options able to overwrite/delete keys during the sorting
3170 * and the sorted key itself may get destroied */
3171 incrRefCount(sortval);
3172
3173 /* The SORT command has an SQL-alike syntax, parse it */
3174 while(j < c->argc) {
3175 int leftargs = c->argc-j-1;
3176 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3177 desc = 0;
3178 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3179 desc = 1;
3180 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3181 alpha = 1;
3182 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3183 limit_start = atoi(c->argv[j+1]->ptr);
3184 limit_count = atoi(c->argv[j+2]->ptr);
3185 j+=2;
3186 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3187 sortby = c->argv[j+1];
3188 /* If the BY pattern does not contain '*', i.e. it is constant,
3189 * we don't need to sort nor to lookup the weight keys. */
3190 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3191 j++;
3192 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3193 listAddNodeTail(operations,createSortOperation(
3194 REDIS_SORT_GET,c->argv[j+1]));
3195 getop++;
3196 j++;
3197 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3198 listAddNodeTail(operations,createSortOperation(
3199 REDIS_SORT_DEL,c->argv[j+1]));
3200 j++;
3201 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3202 listAddNodeTail(operations,createSortOperation(
3203 REDIS_SORT_INCR,c->argv[j+1]));
3204 j++;
3205 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3206 listAddNodeTail(operations,createSortOperation(
3207 REDIS_SORT_DECR,c->argv[j+1]));
3208 j++;
3209 } else {
3210 decrRefCount(sortval);
3211 listRelease(operations);
3212 addReply(c,shared.syntaxerr);
3213 return;
3214 }
3215 j++;
3216 }
3217
3218 /* Load the sorting vector with all the objects to sort */
3219 vectorlen = (sortval->type == REDIS_LIST) ?
3220 listLength((list*)sortval->ptr) :
3221 dictSize((dict*)sortval->ptr);
3222 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3223 if (!vector) oom("allocating objects vector for SORT");
3224 j = 0;
3225 if (sortval->type == REDIS_LIST) {
3226 list *list = sortval->ptr;
3227 listNode *ln;
3228
3229 listRewind(list);
3230 while((ln = listYield(list))) {
3231 robj *ele = ln->value;
3232 vector[j].obj = ele;
3233 vector[j].u.score = 0;
3234 vector[j].u.cmpobj = NULL;
3235 j++;
3236 }
3237 } else {
3238 dict *set = sortval->ptr;
3239 dictIterator *di;
3240 dictEntry *setele;
3241
3242 di = dictGetIterator(set);
3243 if (!di) oom("dictGetIterator");
3244 while((setele = dictNext(di)) != NULL) {
3245 vector[j].obj = dictGetEntryKey(setele);
3246 vector[j].u.score = 0;
3247 vector[j].u.cmpobj = NULL;
3248 j++;
3249 }
3250 dictReleaseIterator(di);
3251 }
3252 assert(j == vectorlen);
3253
3254 /* Now it's time to load the right scores in the sorting vector */
3255 if (dontsort == 0) {
3256 for (j = 0; j < vectorlen; j++) {
3257 if (sortby) {
3258 robj *byval;
3259
3260 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3261 if (!byval || byval->type != REDIS_STRING) continue;
3262 if (alpha) {
3263 vector[j].u.cmpobj = byval;
3264 incrRefCount(byval);
3265 } else {
3266 vector[j].u.score = strtod(byval->ptr,NULL);
3267 }
3268 } else {
3269 if (!alpha) vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3270 }
3271 }
3272 }
3273
3274 /* We are ready to sort the vector... perform a bit of sanity check
3275 * on the LIMIT option too. We'll use a partial version of quicksort. */
3276 start = (limit_start < 0) ? 0 : limit_start;
3277 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3278 if (start >= vectorlen) {
3279 start = vectorlen-1;
3280 end = vectorlen-2;
3281 }
3282 if (end >= vectorlen) end = vectorlen-1;
3283
3284 if (dontsort == 0) {
3285 server.sort_desc = desc;
3286 server.sort_alpha = alpha;
3287 server.sort_bypattern = sortby ? 1 : 0;
3288 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3289 }
3290
3291 /* Send command output to the output buffer, performing the specified
3292 * GET/DEL/INCR/DECR operations if any. */
3293 outputlen = getop ? getop*(end-start+1) : end-start+1;
3294 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3295 for (j = start; j <= end; j++) {
3296 listNode *ln;
3297 if (!getop) {
3298 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3299 sdslen(vector[j].obj->ptr)));
3300 addReply(c,vector[j].obj);
3301 addReply(c,shared.crlf);
3302 }
3303 listRewind(operations);
3304 while((ln = listYield(operations))) {
3305 redisSortOperation *sop = ln->value;
3306 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3307 vector[j].obj);
3308
3309 if (sop->type == REDIS_SORT_GET) {
3310 if (!val || val->type != REDIS_STRING) {
3311 addReply(c,shared.nullbulk);
3312 } else {
3313 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3314 sdslen(val->ptr)));
3315 addReply(c,val);
3316 addReply(c,shared.crlf);
3317 }
3318 } else if (sop->type == REDIS_SORT_DEL) {
3319 /* TODO */
3320 }
3321 }
3322 }
3323
3324 /* Cleanup */
3325 decrRefCount(sortval);
3326 listRelease(operations);
3327 for (j = 0; j < vectorlen; j++) {
3328 if (sortby && alpha && vector[j].u.cmpobj)
3329 decrRefCount(vector[j].u.cmpobj);
3330 }
3331 zfree(vector);
3332}
3333
3334static void infoCommand(redisClient *c) {
3335 sds info;
3336 time_t uptime = time(NULL)-server.stat_starttime;
3337
3338 info = sdscatprintf(sdsempty(),
3339 "redis_version:%s\r\n"
3340 "connected_clients:%d\r\n"
3341 "connected_slaves:%d\r\n"
3342 "used_memory:%zu\r\n"
3343 "changes_since_last_save:%lld\r\n"
3344 "last_save_time:%d\r\n"
3345 "total_connections_received:%lld\r\n"
3346 "total_commands_processed:%lld\r\n"
3347 "uptime_in_seconds:%d\r\n"
3348 "uptime_in_days:%d\r\n"
3349 ,REDIS_VERSION,
3350 listLength(server.clients)-listLength(server.slaves),
3351 listLength(server.slaves),
3352 server.usedmemory,
3353 server.dirty,
3354 server.lastsave,
3355 server.stat_numconnections,
3356 server.stat_numcommands,
3357 uptime,
3358 uptime/(3600*24)
3359 );
3360 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
3361 addReplySds(c,info);
3362 addReply(c,shared.crlf);
3363}
3364
3365static void monitorCommand(redisClient *c) {
3366 /* ignore MONITOR if aleady slave or in monitor mode */
3367 if (c->flags & REDIS_SLAVE) return;
3368
3369 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
3370 c->slaveseldb = 0;
3371 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
3372 addReply(c,shared.ok);
3373}
3374
3375/* ================================= Expire ================================= */
3376static int removeExpire(redisDb *db, robj *key) {
3377 if (dictDelete(db->expires,key) == DICT_OK) {
3378 return 1;
3379 } else {
3380 return 0;
3381 }
3382}
3383
3384static int setExpire(redisDb *db, robj *key, time_t when) {
3385 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
3386 return 0;
3387 } else {
3388 incrRefCount(key);
3389 return 1;
3390 }
3391}
3392
3393/* Return the expire time of the specified key, or -1 if no expire
3394 * is associated with this key (i.e. the key is non volatile) */
3395static time_t getExpire(redisDb *db, robj *key) {
3396 dictEntry *de;
3397
3398 /* No expire? return ASAP */
3399 if (dictSize(db->expires) == 0 ||
3400 (de = dictFind(db->expires,key)) == NULL) return -1;
3401
3402 return (time_t) dictGetEntryVal(de);
3403}
3404
3405static int expireIfNeeded(redisDb *db, robj *key) {
3406 time_t when;
3407 dictEntry *de;
3408
3409 /* No expire? return ASAP */
3410 if (dictSize(db->expires) == 0 ||
3411 (de = dictFind(db->expires,key)) == NULL) return 0;
3412
3413 /* Lookup the expire */
3414 when = (time_t) dictGetEntryVal(de);
3415 if (time(NULL) <= when) return 0;
3416
3417 /* Delete the key */
3418 dictDelete(db->expires,key);
3419 return dictDelete(db->dict,key) == DICT_OK;
3420}
3421
3422static int deleteIfVolatile(redisDb *db, robj *key) {
3423 dictEntry *de;
3424
3425 /* No expire? return ASAP */
3426 if (dictSize(db->expires) == 0 ||
3427 (de = dictFind(db->expires,key)) == NULL) return 0;
3428
3429 /* Delete the key */
3430 server.dirty++;
3431 dictDelete(db->expires,key);
3432 return dictDelete(db->dict,key) == DICT_OK;
3433}
3434
3435static void expireCommand(redisClient *c) {
3436 dictEntry *de;
3437 int seconds = atoi(c->argv[2]->ptr);
3438
3439 de = dictFind(c->db->dict,c->argv[1]);
3440 if (de == NULL) {
3441 addReply(c,shared.czero);
3442 return;
3443 }
3444 if (seconds <= 0) {
3445 addReply(c, shared.czero);
3446 return;
3447 } else {
3448 time_t when = time(NULL)+seconds;
3449 if (setExpire(c->db,c->argv[1],when))
3450 addReply(c,shared.cone);
3451 else
3452 addReply(c,shared.czero);
3453 return;
3454 }
3455}
3456
3457/* =============================== Replication ============================= */
3458
3459static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
3460 ssize_t nwritten, ret = size;
3461 time_t start = time(NULL);
3462
3463 timeout++;
3464 while(size) {
3465 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
3466 nwritten = write(fd,ptr,size);
3467 if (nwritten == -1) return -1;
3468 ptr += nwritten;
3469 size -= nwritten;
3470 }
3471 if ((time(NULL)-start) > timeout) {
3472 errno = ETIMEDOUT;
3473 return -1;
3474 }
3475 }
3476 return ret;
3477}
3478
3479static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
3480 ssize_t nread, totread = 0;
3481 time_t start = time(NULL);
3482
3483 timeout++;
3484 while(size) {
3485 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
3486 nread = read(fd,ptr,size);
3487 if (nread == -1) return -1;
3488 ptr += nread;
3489 size -= nread;
3490 totread += nread;
3491 }
3492 if ((time(NULL)-start) > timeout) {
3493 errno = ETIMEDOUT;
3494 return -1;
3495 }
3496 }
3497 return totread;
3498}
3499
/* Read a line from 'fd' one byte at a time using syncRead(), storing it as
 * a nul terminated string in 'ptr', a buffer of 'size' bytes.
 *
 * Reading stops at the first '\n', which is not stored; a '\r' right before
 * it is replaced with the terminator. Reading also stops once 'size'-1
 * bytes were stored, so the buffer is never overrun.
 *
 * Returns the number of bytes stored before the '\n' (a trailing '\r', if
 * any, is included in the count), or -1 if syncRead() fails. Returns 0
 * without touching the buffer when 'size' is not positive. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return 0;
    *ptr = '\0';
    size--; /* Reserve room for the nul terminator */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
            /* One byte of room consumed: without this the loop is
             * unbounded and a long line overflows the buffer. */
            size--;
        }
    }
    return nread;
}
3520
3521static void syncCommand(redisClient *c) {
3522 /* ignore SYNC if aleady slave or in monitor mode */
3523 if (c->flags & REDIS_SLAVE) return;
3524
3525 /* SYNC can't be issued when the server has pending data to send to
3526 * the client about already issued commands. We need a fresh reply
3527 * buffer registering the differences between the BGSAVE and the current
3528 * dataset, so that we can copy to other slaves if needed. */
3529 if (listLength(c->reply) != 0) {
3530 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3531 return;
3532 }
3533
3534 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
3535 /* Here we need to check if there is a background saving operation
3536 * in progress, or if it is required to start one */
3537 if (server.bgsaveinprogress) {
3538 /* Ok a background save is in progress. Let's check if it is a good
3539 * one for replication, i.e. if there is another slave that is
3540 * registering differences since the server forked to save */
3541 redisClient *slave;
3542 listNode *ln;
3543
3544 listRewind(server.slaves);
3545 while((ln = listYield(server.slaves))) {
3546 slave = ln->value;
3547 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
3548 }
3549 if (ln) {
3550 /* Perfect, the server is already registering differences for
3551 * another slave. Set the right state, and copy the buffer. */
3552 listRelease(c->reply);
3553 c->reply = listDup(slave->reply);
3554 if (!c->reply) oom("listDup copying slave reply list");
3555 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3556 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
3557 } else {
3558 /* No way, we need to wait for the next BGSAVE in order to
3559 * register differences */
3560 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
3561 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
3562 }
3563 } else {
3564 /* Ok we don't have a BGSAVE in progress, let's start one */
3565 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
3566 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3567 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
3568 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
3569 return;
3570 }
3571 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3572 }
3573 c->repldbfd = -1;
3574 c->flags |= REDIS_SLAVE;
3575 c->slaveseldb = 0;
3576 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
3577 return;
3578}
3579
3580static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
3581 redisClient *slave = privdata;
3582 REDIS_NOTUSED(el);
3583 REDIS_NOTUSED(mask);
3584 char buf[REDIS_IOBUF_LEN];
3585 ssize_t nwritten, buflen;
3586
3587 if (slave->repldboff == 0) {
3588 /* Write the bulk write count before to transfer the DB. In theory here
3589 * we don't know how much room there is in the output buffer of the
3590 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
3591 * operations) will never be smaller than the few bytes we need. */
3592 sds bulkcount;
3593
3594 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
3595 slave->repldbsize);
3596 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
3597 {
3598 sdsfree(bulkcount);
3599 freeClient(slave);
3600 return;
3601 }
3602 sdsfree(bulkcount);
3603 }
3604 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
3605 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
3606 if (buflen <= 0) {
3607 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
3608 (buflen == 0) ? "premature EOF" : strerror(errno));
3609 freeClient(slave);
3610 return;
3611 }
3612 if ((nwritten = write(fd,buf,buflen)) == -1) {
3613 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
3614 strerror(errno));
3615 freeClient(slave);
3616 return;
3617 }
3618 slave->repldboff += nwritten;
3619 if (slave->repldboff == slave->repldbsize) {
3620 close(slave->repldbfd);
3621 slave->repldbfd = -1;
3622 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3623 slave->replstate = REDIS_REPL_ONLINE;
3624 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
3625 sendReplyToClient, slave, NULL) == AE_ERR) {
3626 freeClient(slave);
3627 return;
3628 }
3629 addReplySds(slave,sdsempty());
3630 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
3631 }
3632}
3633
3634static void updateSalvesWaitingBgsave(int bgsaveerr) {
3635 listNode *ln;
3636 int startbgsave = 0;
3637
3638 listRewind(server.slaves);
3639 while((ln = listYield(server.slaves))) {
3640 redisClient *slave = ln->value;
3641
3642 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
3643 startbgsave = 1;
3644 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3645 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
3646 struct stat buf;
3647
3648 if (bgsaveerr != REDIS_OK) {
3649 freeClient(slave);
3650 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
3651 continue;
3652 }
3653 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
3654 fstat(slave->repldbfd,&buf) == -1) {
3655 freeClient(slave);
3656 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
3657 continue;
3658 }
3659 slave->repldboff = 0;
3660 slave->repldbsize = buf.st_size;
3661 slave->replstate = REDIS_REPL_SEND_BULK;
3662 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3663 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
3664 freeClient(slave);
3665 continue;
3666 }
3667 }
3668 }
3669 if (startbgsave) {
3670 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3671 listRewind(server.slaves);
3672 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
3673 while((ln = listYield(server.slaves))) {
3674 redisClient *slave = ln->value;
3675
3676 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
3677 freeClient(slave);
3678 }
3679 }
3680 }
3681}
3682
3683static int syncWithMaster(void) {
3684 char buf[1024], tmpfile[256];
3685 int dumpsize;
3686 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
3687 int dfd;
3688
3689 if (fd == -1) {
3690 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
3691 strerror(errno));
3692 return REDIS_ERR;
3693 }
3694 /* Issue the SYNC command */
3695 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
3696 close(fd);
3697 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
3698 strerror(errno));
3699 return REDIS_ERR;
3700 }
3701 /* Read the bulk write count */
3702 if (syncReadLine(fd,buf,1024,5) == -1) {
3703 close(fd);
3704 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
3705 strerror(errno));
3706 return REDIS_ERR;
3707 }
3708 dumpsize = atoi(buf+1);
3709 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
3710 /* Read the bulk write data on a temp file */
3711 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
3712 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
3713 if (dfd == -1) {
3714 close(fd);
3715 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
3716 return REDIS_ERR;
3717 }
3718 while(dumpsize) {
3719 int nread, nwritten;
3720
3721 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
3722 if (nread == -1) {
3723 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
3724 strerror(errno));
3725 close(fd);
3726 close(dfd);
3727 return REDIS_ERR;
3728 }
3729 nwritten = write(dfd,buf,nread);
3730 if (nwritten == -1) {
3731 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
3732 close(fd);
3733 close(dfd);
3734 return REDIS_ERR;
3735 }
3736 dumpsize -= nread;
3737 }
3738 close(dfd);
3739 if (rename(tmpfile,server.dbfilename) == -1) {
3740 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
3741 unlink(tmpfile);
3742 close(fd);
3743 return REDIS_ERR;
3744 }
3745 emptyDb();
3746 if (rdbLoad(server.dbfilename) != REDIS_OK) {
3747 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
3748 close(fd);
3749 return REDIS_ERR;
3750 }
3751 server.master = createClient(fd);
3752 server.master->flags |= REDIS_MASTER;
3753 server.replstate = REDIS_REPL_CONNECTED;
3754 return REDIS_OK;
3755}
3756
3757/* =================================== Main! ================================ */
3758
3759static void daemonize(void) {
3760 int fd;
3761 FILE *fp;
3762
3763 if (fork() != 0) exit(0); /* parent exits */
3764 setsid(); /* create a new session */
3765
3766 /* Every output goes to /dev/null. If Redis is daemonized but
3767 * the 'logfile' is set to 'stdout' in the configuration file
3768 * it will not log at all. */
3769 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
3770 dup2(fd, STDIN_FILENO);
3771 dup2(fd, STDOUT_FILENO);
3772 dup2(fd, STDERR_FILENO);
3773 if (fd > STDERR_FILENO) close(fd);
3774 }
3775 /* Try to write the pid file */
3776 fp = fopen(server.pidfile,"w");
3777 if (fp) {
3778 fprintf(fp,"%d\n",getpid());
3779 fclose(fp);
3780 }
3781}
3782
3783int main(int argc, char **argv) {
3784 initServerConfig();
3785 if (argc == 2) {
3786 ResetServerSaveParams();
3787 loadServerConfig(argv[1]);
3788 } else if (argc > 2) {
3789 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
3790 exit(1);
3791 }
3792 initServer();
3793 if (server.daemonize) daemonize();
3794 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
3795 if (rdbLoad(server.dbfilename) == REDIS_OK)
3796 redisLog(REDIS_NOTICE,"DB loaded from disk");
3797 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
3798 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
3799 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
3800 aeMain(server.el);
3801 aeDeleteEventLoop(server.el);
3802 return 0;
3803}