]> git.saurik.com Git - redis.git/blame - redis.c
Nasty bug of the new DB format fixed, objects sharing implemented
[redis.git] / redis.c
CommitLineData
ed9b544e 1/*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
5a6948fb 30#define REDIS_VERSION "0.08"
ed9b544e 31
32#include <stdio.h>
33#include <stdlib.h>
34#include <string.h>
35#include <time.h>
36#include <unistd.h>
37#include <signal.h>
38#include <sys/wait.h>
39#include <errno.h>
40#include <assert.h>
41#include <ctype.h>
42#include <stdarg.h>
43#include <inttypes.h>
44#include <arpa/inet.h>
45#include <sys/stat.h>
46#include <fcntl.h>
47#include <sys/time.h>
48#include <sys/resource.h>
f78fd11b 49#include <limits.h>
ed9b544e 50
51#include "ae.h" /* Event driven programming library */
52#include "sds.h" /* Dynamic safe strings */
53#include "anet.h" /* Networking the easy way */
54#include "dict.h" /* Hash tables */
55#include "adlist.h" /* Linked lists */
56#include "zmalloc.h" /* total memory usage aware version of malloc/free */
57
58/* Error codes */
59#define REDIS_OK 0
60#define REDIS_ERR -1
61
62/* Static server configuration */
63#define REDIS_SERVERPORT 6379 /* TCP port */
64#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
65#define REDIS_QUERYBUF_LEN 1024
66#define REDIS_LOADBUF_LEN 1024
67#define REDIS_MAX_ARGS 16
68#define REDIS_DEFAULT_DBNUM 16
69#define REDIS_CONFIGLINE_MAX 1024
70#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
71#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
72
73/* Hash table parameters */
74#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
75#define REDIS_HT_MINSLOTS 16384 /* Never resize the HT under this */
76
77/* Command flags */
78#define REDIS_CMD_BULK 1
79#define REDIS_CMD_INLINE 2
80
81/* Object types */
82#define REDIS_STRING 0
83#define REDIS_LIST 1
84#define REDIS_SET 2
85#define REDIS_HASH 3
f78fd11b 86
87/* Object types only used for dumping to disk */
ed9b544e 88#define REDIS_SELECTDB 254
89#define REDIS_EOF 255
90
/* Defines related to the dump file format. Storing a full 32 bit length for
 * short keys wastes a lot of space, so we check the two most significant
 * bits of the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 reserved for future uses
 *
 * Lengths up to 63 are stored using a single byte, most DB keys, and many
 * values, will fit inside. */
f78fd11b 102#define REDIS_RDB_6BITLEN 0
103#define REDIS_RDB_14BITLEN 1
104#define REDIS_RDB_32BITLEN 2
105#define REDIS_RDB_64BITLEN 3
106#define REDIS_RDB_LENERR UINT_MAX
107
ed9b544e 108/* Client flags */
109#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
110#define REDIS_SLAVE 2 /* This client is a slave server */
111#define REDIS_MASTER 4 /* This client is a master server */
87eca727 112#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
ed9b544e 113
114/* Server replication state */
115#define REDIS_REPL_NONE 0 /* No active replication */
116#define REDIS_REPL_CONNECT 1 /* Must connect to master */
117#define REDIS_REPL_CONNECTED 2 /* Connected to master */
118
119/* List related stuff */
120#define REDIS_HEAD 0
121#define REDIS_TAIL 1
122
123/* Sort operations */
124#define REDIS_SORT_GET 0
125#define REDIS_SORT_DEL 1
126#define REDIS_SORT_INCR 2
127#define REDIS_SORT_DECR 3
128#define REDIS_SORT_ASC 4
129#define REDIS_SORT_DESC 5
130#define REDIS_SORTKEY_MAX 1024
131
132/* Log levels */
133#define REDIS_DEBUG 0
134#define REDIS_NOTICE 1
135#define REDIS_WARNING 2
136
137/* Anti-warning macro... */
138#define REDIS_NOTUSED(V) ((void) V)
139
140/*================================= Data types ============================== */
141
/* A redis object: a reference counted container able to hold a
 * string / list / set. */
typedef struct redisObject {
    int type;       /* kind of value stored, see the "Object types" defines */
    void *ptr;      /* the value itself, interpreted according to 'type' */
    int refcount;   /* reference counter */
} robj;
148
149/* With multiplexing we need to take per-clinet state.
150 * Clients are taken in a liked list. */
151typedef struct redisClient {
152 int fd;
153 dict *dict;
154 int dictid;
155 sds querybuf;
156 robj *argv[REDIS_MAX_ARGS];
157 int argc;
158 int bulklen; /* bulk read len. -1 if not in bulk read mode */
159 list *reply;
160 int sentlen;
161 time_t lastinteraction; /* time of the last interaction, used for timeout */
87eca727 162 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
ed9b544e 163 int slaveseldb; /* slave selected db, if this client is a slave */
164} redisClient;
165
/* One automatic save rule: serverCron() starts a background save when at
 * least 'changes' changes were performed and more than 'seconds' seconds
 * elapsed since the last save. */
struct saveparam {
    time_t seconds;
    int changes;
};
170
171/* Global server state structure */
172struct redisServer {
173 int port;
174 int fd;
175 dict **dict;
10c43610 176 dict *sharingpool;
177 unsigned int sharingpoolsize;
ed9b544e 178 long long dirty; /* changes to DB from the last save */
179 list *clients;
87eca727 180 list *slaves, *monitors;
ed9b544e 181 char neterr[ANET_ERR_LEN];
182 aeEventLoop *el;
183 int cronloops; /* number of times the cron function run */
184 list *objfreelist; /* A list of freed objects to avoid malloc() */
185 time_t lastsave; /* Unix time of last save succeeede */
186 int usedmemory; /* Used memory in megabytes */
187 /* Fields used only for stats */
188 time_t stat_starttime; /* server start time */
189 long long stat_numcommands; /* number of processed commands */
190 long long stat_numconnections; /* number of connections received */
191 /* Configuration */
192 int verbosity;
193 int glueoutputbuf;
194 int maxidletime;
195 int dbnum;
196 int daemonize;
ed329fcf 197 char *pidfile;
ed9b544e 198 int bgsaveinprogress;
199 struct saveparam *saveparams;
200 int saveparamslen;
201 char *logfile;
202 char *bindaddr;
203 char *dbfilename;
10c43610 204 int shareobjects;
ed9b544e 205 /* Replication related */
206 int isslave;
207 char *masterhost;
208 int masterport;
209 redisClient *master;
210 int replstate;
211 /* Sort parameters - qsort_r() is only available under BSD so we
212 * have to take this state global, in order to pass it to sortCompare() */
213 int sort_desc;
214 int sort_alpha;
215 int sort_bypattern;
216};
217
218typedef void redisCommandProc(redisClient *c);
219struct redisCommand {
220 char *name;
221 redisCommandProc *proc;
222 int arity;
223 int flags;
224};
225
226typedef struct _redisSortObject {
227 robj *obj;
228 union {
229 double score;
230 robj *cmpobj;
231 } u;
232} redisSortObject;
233
234typedef struct _redisSortOperation {
235 int type;
236 robj *pattern;
237} redisSortOperation;
238
239struct sharedObjectsStruct {
c937aa89 240 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
7b45bfb2 241 *colon, *nullbulk, *nullmultibulk,
c937aa89 242 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
243 *outofrangeerr, *plus,
ed9b544e 244 *select0, *select1, *select2, *select3, *select4,
245 *select5, *select6, *select7, *select8, *select9;
246} shared;
247
248/*================================ Prototypes =============================== */
249
250static void freeStringObject(robj *o);
251static void freeListObject(robj *o);
252static void freeSetObject(robj *o);
253static void decrRefCount(void *o);
254static robj *createObject(int type, void *ptr);
255static void freeClient(redisClient *c);
f78fd11b 256static int rdbLoad(char *filename);
ed9b544e 257static void addReply(redisClient *c, robj *obj);
258static void addReplySds(redisClient *c, sds s);
259static void incrRefCount(robj *o);
f78fd11b 260static int rdbSaveBackground(char *filename);
ed9b544e 261static robj *createStringObject(char *ptr, size_t len);
87eca727 262static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
ed9b544e 263static int syncWithMaster(void);
10c43610 264static robj *tryObjectSharing(robj *o);
ed9b544e 265
266static void pingCommand(redisClient *c);
267static void echoCommand(redisClient *c);
268static void setCommand(redisClient *c);
269static void setnxCommand(redisClient *c);
270static void getCommand(redisClient *c);
271static void delCommand(redisClient *c);
272static void existsCommand(redisClient *c);
273static void incrCommand(redisClient *c);
274static void decrCommand(redisClient *c);
275static void incrbyCommand(redisClient *c);
276static void decrbyCommand(redisClient *c);
277static void selectCommand(redisClient *c);
278static void randomkeyCommand(redisClient *c);
279static void keysCommand(redisClient *c);
280static void dbsizeCommand(redisClient *c);
281static void lastsaveCommand(redisClient *c);
282static void saveCommand(redisClient *c);
283static void bgsaveCommand(redisClient *c);
284static void shutdownCommand(redisClient *c);
285static void moveCommand(redisClient *c);
286static void renameCommand(redisClient *c);
287static void renamenxCommand(redisClient *c);
288static void lpushCommand(redisClient *c);
289static void rpushCommand(redisClient *c);
290static void lpopCommand(redisClient *c);
291static void rpopCommand(redisClient *c);
292static void llenCommand(redisClient *c);
293static void lindexCommand(redisClient *c);
294static void lrangeCommand(redisClient *c);
295static void ltrimCommand(redisClient *c);
296static void typeCommand(redisClient *c);
297static void lsetCommand(redisClient *c);
298static void saddCommand(redisClient *c);
299static void sremCommand(redisClient *c);
300static void sismemberCommand(redisClient *c);
301static void scardCommand(redisClient *c);
302static void sinterCommand(redisClient *c);
303static void sinterstoreCommand(redisClient *c);
304static void syncCommand(redisClient *c);
305static void flushdbCommand(redisClient *c);
306static void flushallCommand(redisClient *c);
307static void sortCommand(redisClient *c);
308static void lremCommand(redisClient *c);
309static void infoCommand(redisClient *c);
70003d28 310static void mgetCommand(redisClient *c);
87eca727 311static void monitorCommand(redisClient *c);
ed9b544e 312
313/*================================= Globals ================================= */
314
315/* Global vars */
316static struct redisServer server; /* server global state */
317static struct redisCommand cmdTable[] = {
318 {"get",getCommand,2,REDIS_CMD_INLINE},
319 {"set",setCommand,3,REDIS_CMD_BULK},
320 {"setnx",setnxCommand,3,REDIS_CMD_BULK},
321 {"del",delCommand,2,REDIS_CMD_INLINE},
322 {"exists",existsCommand,2,REDIS_CMD_INLINE},
323 {"incr",incrCommand,2,REDIS_CMD_INLINE},
324 {"decr",decrCommand,2,REDIS_CMD_INLINE},
70003d28 325 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
ed9b544e 326 {"rpush",rpushCommand,3,REDIS_CMD_BULK},
327 {"lpush",lpushCommand,3,REDIS_CMD_BULK},
328 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
329 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
330 {"llen",llenCommand,2,REDIS_CMD_INLINE},
331 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
332 {"lset",lsetCommand,4,REDIS_CMD_BULK},
333 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
334 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
335 {"lrem",lremCommand,4,REDIS_CMD_BULK},
336 {"sadd",saddCommand,3,REDIS_CMD_BULK},
337 {"srem",sremCommand,3,REDIS_CMD_BULK},
338 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
339 {"scard",scardCommand,2,REDIS_CMD_INLINE},
340 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE},
341 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE},
342 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
343 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE},
344 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE},
345 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
346 {"select",selectCommand,2,REDIS_CMD_INLINE},
347 {"move",moveCommand,3,REDIS_CMD_INLINE},
348 {"rename",renameCommand,3,REDIS_CMD_INLINE},
349 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
350 {"keys",keysCommand,2,REDIS_CMD_INLINE},
351 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
352 {"ping",pingCommand,1,REDIS_CMD_INLINE},
353 {"echo",echoCommand,2,REDIS_CMD_BULK},
354 {"save",saveCommand,1,REDIS_CMD_INLINE},
355 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
356 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
357 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
358 {"type",typeCommand,2,REDIS_CMD_INLINE},
359 {"sync",syncCommand,1,REDIS_CMD_INLINE},
360 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
361 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
362 {"sort",sortCommand,-2,REDIS_CMD_INLINE},
363 {"info",infoCommand,1,REDIS_CMD_INLINE},
87eca727 364 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
ed9b544e 365 {NULL,NULL,0,0}
366};
367
368/*============================ Utility functions ============================ */
369
/* Lowercase helper for stringmatchlen(): tolower() is only defined for
 * values representable as unsigned char (or EOF), so negative values coming
 * from a signed 'char' are returned unchanged instead of being passed on. */
static int stringmatchLower(int c) {
    return (c < 0) ? c : tolower(c);
}

/* Glob-style pattern matching.
 *
 * Matches the first 'stringLen' bytes of 'string' against the first
 * 'patternLen' bytes of 'pattern'. Neither buffer needs to be nul
 * terminated: no byte outside the given lengths is ever read.
 *
 * Supported syntax:
 *   *        any sequence of bytes, including the empty one
 *   ?        exactly one byte
 *   [abc]    one byte among the listed ones
 *   [^abc]   one byte not among the listed ones
 *   [a-z]    one byte in the range (bounds given in any order)
 *   \x       the byte x taken literally (also inside a class)
 *
 * If 'nocase' is non zero literal bytes and ranges are compared case
 * insensitively (escaped bytes inside a class are always compared as is).
 *
 * Returns 1 if the whole string matches the whole pattern, 0 otherwise. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* Collapse runs of '*' without looking past the pattern end. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            while (stringLen > 0) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A class always consumes one byte of the string, so it can
             * never match when the string is exhausted. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so that the common
                     * pattern++ after the switch leaves patternLen at 0. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (patternLen >= 2 && pattern[0] == '\\') {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = stringmatchLower(start);
                        end = stringmatchLower(end);
                        c = stringmatchLower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (stringmatchLower(pattern[0]) ==
                            stringmatchLower(string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (stringmatchLower(pattern[0]) !=
                    stringmatchLower(string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* String consumed: only trailing '*' may remain in the pattern. */
            while (patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
492
493void redisLog(int level, const char *fmt, ...)
494{
495 va_list ap;
496 FILE *fp;
497
498 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
499 if (!fp) return;
500
501 va_start(ap, fmt);
502 if (level >= server.verbosity) {
503 char *c = ".-*";
504 fprintf(fp,"%c ",c[level]);
505 vfprintf(fp, fmt, ap);
506 fprintf(fp,"\n");
507 fflush(fp);
508 }
509 va_end(ap);
510
511 if (server.logfile) fclose(fp);
512}
513
514/*====================== Hash table type implementation ==================== */
515
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and redis objects as values (objects can hold SDS strings,
 * lists, sets). */
519
520static int sdsDictKeyCompare(void *privdata, const void *key1,
521 const void *key2)
522{
523 int l1,l2;
524 DICT_NOTUSED(privdata);
525
526 l1 = sdslen((sds)key1);
527 l2 = sdslen((sds)key2);
528 if (l1 != l2) return 0;
529 return memcmp(key1, key2, l1) == 0;
530}
531
/* Hash table destructor for redis objects: drops the reference the table
 * was holding on 'val'. 'privdata' is not used. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    decrRefCount(val);
}
538
539static int dictSdsKeyCompare(void *privdata, const void *key1,
540 const void *key2)
541{
542 const robj *o1 = key1, *o2 = key2;
543 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
544}
545
546static unsigned int dictSdsHash(const void *key) {
547 const robj *o = key;
548 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
549}
550
551static dictType setDictType = {
552 dictSdsHash, /* hash function */
553 NULL, /* key dup */
554 NULL, /* val dup */
555 dictSdsKeyCompare, /* key compare */
556 dictRedisObjectDestructor, /* key destructor */
557 NULL /* val destructor */
558};
559
560static dictType hashDictType = {
561 dictSdsHash, /* hash function */
562 NULL, /* key dup */
563 NULL, /* val dup */
564 dictSdsKeyCompare, /* key compare */
565 dictRedisObjectDestructor, /* key destructor */
566 dictRedisObjectDestructor /* val destructor */
567};
568
569/* ========================= Random utility functions ======================= */
570
/* Out of memory handler.
 *
 * Redis generally does not try to recover from out of memory conditions
 * when allocating objects or strings: it is not clear whether the condition
 * could even be reported to the client, since the networking layer itself
 * uses heap allocated send buffers. So we print 'msg' followed by
 * ": Out of memory" on standard error, wait one second and abort. */
static void oom(const char *msg) {
    fputs(msg, stderr);
    fputs(": Out of memory\n", stderr);
    fflush(stderr);
    sleep(1);
    abort();
}
582
583/* ====================== Redis server networking stuff ===================== */
584void closeTimedoutClients(void) {
585 redisClient *c;
586 listIter *li;
587 listNode *ln;
588 time_t now = time(NULL);
589
590 li = listGetIterator(server.clients,AL_START_HEAD);
591 if (!li) return;
592 while ((ln = listNextElement(li)) != NULL) {
593 c = listNodeValue(ln);
594 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
595 (now - c->lastinteraction > server.maxidletime)) {
596 redisLog(REDIS_DEBUG,"Closing idle client");
597 freeClient(c);
598 }
599 }
600 listReleaseIterator(li);
601}
602
603int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
604 int j, size, used, loops = server.cronloops++;
605 REDIS_NOTUSED(eventLoop);
606 REDIS_NOTUSED(id);
607 REDIS_NOTUSED(clientData);
608
609 /* Update the global state with the amount of used memory */
610 server.usedmemory = zmalloc_used_memory();
611
612 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
613 * we resize the hash table to save memory */
614 for (j = 0; j < server.dbnum; j++) {
615 size = dictGetHashTableSize(server.dict[j]);
616 used = dictGetHashTableUsed(server.dict[j]);
617 if (!(loops % 5) && used > 0) {
618 redisLog(REDIS_DEBUG,"DB %d: %d keys in %d slots HT.",j,used,size);
619 // dictPrintStats(server.dict);
620 }
621 if (size && used && size > REDIS_HT_MINSLOTS &&
622 (used*100/size < REDIS_HT_MINFILL)) {
623 redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);
624 dictResize(server.dict[j]);
625 redisLog(REDIS_NOTICE,"Hash table %d resized.",j);
626 }
627 }
628
629 /* Show information about connected clients */
630 if (!(loops % 5)) {
631 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %d bytes in use",
632 listLength(server.clients)-listLength(server.slaves),
633 listLength(server.slaves),
10c43610 634 server.usedmemory,
635 dictGetHashTableUsed(server.sharingpool));
ed9b544e 636 }
637
638 /* Close connections of timedout clients */
639 if (!(loops % 10))
640 closeTimedoutClients();
641
642 /* Check if a background saving in progress terminated */
643 if (server.bgsaveinprogress) {
644 int statloc;
645 if (wait4(-1,&statloc,WNOHANG,NULL)) {
646 int exitcode = WEXITSTATUS(statloc);
647 if (exitcode == 0) {
648 redisLog(REDIS_NOTICE,
649 "Background saving terminated with success");
650 server.dirty = 0;
651 server.lastsave = time(NULL);
652 } else {
653 redisLog(REDIS_WARNING,
654 "Background saving error");
655 }
656 server.bgsaveinprogress = 0;
657 }
658 } else {
659 /* If there is not a background saving in progress check if
660 * we have to save now */
661 time_t now = time(NULL);
662 for (j = 0; j < server.saveparamslen; j++) {
663 struct saveparam *sp = server.saveparams+j;
664
665 if (server.dirty >= sp->changes &&
666 now-server.lastsave > sp->seconds) {
667 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
668 sp->changes, sp->seconds);
f78fd11b 669 rdbSaveBackground(server.dbfilename);
ed9b544e 670 break;
671 }
672 }
673 }
674 /* Check if we should connect to a MASTER */
675 if (server.replstate == REDIS_REPL_CONNECT) {
676 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
677 if (syncWithMaster() == REDIS_OK) {
678 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
679 }
680 }
681 return 1000;
682}
683
684static void createSharedObjects(void) {
685 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
686 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
687 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
c937aa89 688 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
689 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
690 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
691 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
692 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
693 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
ed9b544e 694 /* no such key */
ed9b544e 695 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
696 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
697 "-ERR Operation against a key holding the wrong kind of value\r\n"));
ed9b544e 698 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
699 "-ERR no such key\r\n"));
ed9b544e 700 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
701 "-ERR syntax error\r\n"));
c937aa89 702 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
703 "-ERR source and destination objects are the same\r\n"));
704 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
705 "-ERR index out of range\r\n"));
ed9b544e 706 shared.space = createObject(REDIS_STRING,sdsnew(" "));
c937aa89 707 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
708 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
ed9b544e 709 shared.select0 = createStringObject("select 0\r\n",10);
710 shared.select1 = createStringObject("select 1\r\n",10);
711 shared.select2 = createStringObject("select 2\r\n",10);
712 shared.select3 = createStringObject("select 3\r\n",10);
713 shared.select4 = createStringObject("select 4\r\n",10);
714 shared.select5 = createStringObject("select 5\r\n",10);
715 shared.select6 = createStringObject("select 6\r\n",10);
716 shared.select7 = createStringObject("select 7\r\n",10);
717 shared.select8 = createStringObject("select 8\r\n",10);
718 shared.select9 = createStringObject("select 9\r\n",10);
719}
720
721static void appendServerSaveParams(time_t seconds, int changes) {
722 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
723 if (server.saveparams == NULL) oom("appendServerSaveParams");
724 server.saveparams[server.saveparamslen].seconds = seconds;
725 server.saveparams[server.saveparamslen].changes = changes;
726 server.saveparamslen++;
727}
728
729static void ResetServerSaveParams() {
730 zfree(server.saveparams);
731 server.saveparams = NULL;
732 server.saveparamslen = 0;
733}
734
735static void initServerConfig() {
736 server.dbnum = REDIS_DEFAULT_DBNUM;
737 server.port = REDIS_SERVERPORT;
738 server.verbosity = REDIS_DEBUG;
739 server.maxidletime = REDIS_MAXIDLETIME;
740 server.saveparams = NULL;
741 server.logfile = NULL; /* NULL = log on standard output */
742 server.bindaddr = NULL;
743 server.glueoutputbuf = 1;
744 server.daemonize = 0;
ed329fcf 745 server.pidfile = "/var/run/redis.pid";
ed9b544e 746 server.dbfilename = "dump.rdb";
10c43610 747 server.shareobjects = 0;
ed9b544e 748 ResetServerSaveParams();
749
750 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
751 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
752 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
753 /* Replication related */
754 server.isslave = 0;
755 server.masterhost = NULL;
756 server.masterport = 6379;
757 server.master = NULL;
758 server.replstate = REDIS_REPL_NONE;
759}
760
761static void initServer() {
762 int j;
763
764 signal(SIGHUP, SIG_IGN);
765 signal(SIGPIPE, SIG_IGN);
766
767 server.clients = listCreate();
768 server.slaves = listCreate();
87eca727 769 server.monitors = listCreate();
ed9b544e 770 server.objfreelist = listCreate();
771 createSharedObjects();
772 server.el = aeCreateEventLoop();
773 server.dict = zmalloc(sizeof(dict*)*server.dbnum);
10c43610 774 server.sharingpool = dictCreate(&setDictType,NULL);
775 server.sharingpoolsize = 1024;
87eca727 776 if (!server.dict || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
ed9b544e 777 oom("server initialization"); /* Fatal OOM */
778 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
779 if (server.fd == -1) {
780 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
781 exit(1);
782 }
10c43610 783 for (j = 0; j < server.dbnum; j++)
ed9b544e 784 server.dict[j] = dictCreate(&hashDictType,NULL);
ed9b544e 785 server.cronloops = 0;
786 server.bgsaveinprogress = 0;
787 server.lastsave = time(NULL);
788 server.dirty = 0;
789 server.usedmemory = 0;
790 server.stat_numcommands = 0;
791 server.stat_numconnections = 0;
792 server.stat_starttime = time(NULL);
793 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
794}
795
796/* Empty the whole database */
797static void emptyDb() {
798 int j;
799
800 for (j = 0; j < server.dbnum; j++)
801 dictEmpty(server.dict[j]);
802}
803
804/* I agree, this is a very rudimental way to load a configuration...
805 will improve later if the config gets more complex */
806static void loadServerConfig(char *filename) {
807 FILE *fp = fopen(filename,"r");
808 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
809 int linenum = 0;
810 sds line = NULL;
811
812 if (!fp) {
813 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
814 exit(1);
815 }
816 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
817 sds *argv;
818 int argc, j;
819
820 linenum++;
821 line = sdsnew(buf);
822 line = sdstrim(line," \t\r\n");
823
824 /* Skip comments and blank lines*/
825 if (line[0] == '#' || line[0] == '\0') {
826 sdsfree(line);
827 continue;
828 }
829
830 /* Split into arguments */
831 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
832 sdstolower(argv[0]);
833
834 /* Execute config directives */
835 if (!strcmp(argv[0],"timeout") && argc == 2) {
836 server.maxidletime = atoi(argv[1]);
837 if (server.maxidletime < 1) {
838 err = "Invalid timeout value"; goto loaderr;
839 }
840 } else if (!strcmp(argv[0],"port") && argc == 2) {
841 server.port = atoi(argv[1]);
842 if (server.port < 1 || server.port > 65535) {
843 err = "Invalid port"; goto loaderr;
844 }
845 } else if (!strcmp(argv[0],"bind") && argc == 2) {
846 server.bindaddr = zstrdup(argv[1]);
847 } else if (!strcmp(argv[0],"save") && argc == 3) {
848 int seconds = atoi(argv[1]);
849 int changes = atoi(argv[2]);
850 if (seconds < 1 || changes < 0) {
851 err = "Invalid save parameters"; goto loaderr;
852 }
853 appendServerSaveParams(seconds,changes);
854 } else if (!strcmp(argv[0],"dir") && argc == 2) {
855 if (chdir(argv[1]) == -1) {
856 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
857 argv[1], strerror(errno));
858 exit(1);
859 }
860 } else if (!strcmp(argv[0],"loglevel") && argc == 2) {
861 if (!strcmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
862 else if (!strcmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
863 else if (!strcmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
864 else {
865 err = "Invalid log level. Must be one of debug, notice, warning";
866 goto loaderr;
867 }
868 } else if (!strcmp(argv[0],"logfile") && argc == 2) {
869 FILE *fp;
870
871 server.logfile = zstrdup(argv[1]);
872 if (!strcmp(server.logfile,"stdout")) {
873 zfree(server.logfile);
874 server.logfile = NULL;
875 }
876 if (server.logfile) {
877 /* Test if we are able to open the file. The server will not
878 * be able to abort just for this problem later... */
879 fp = fopen(server.logfile,"a");
880 if (fp == NULL) {
881 err = sdscatprintf(sdsempty(),
882 "Can't open the log file: %s", strerror(errno));
883 goto loaderr;
884 }
885 fclose(fp);
886 }
887 } else if (!strcmp(argv[0],"databases") && argc == 2) {
888 server.dbnum = atoi(argv[1]);
889 if (server.dbnum < 1) {
890 err = "Invalid number of databases"; goto loaderr;
891 }
892 } else if (!strcmp(argv[0],"slaveof") && argc == 3) {
893 server.masterhost = sdsnew(argv[1]);
894 server.masterport = atoi(argv[2]);
895 server.replstate = REDIS_REPL_CONNECT;
896 } else if (!strcmp(argv[0],"glueoutputbuf") && argc == 2) {
897 sdstolower(argv[1]);
898 if (!strcmp(argv[1],"yes")) server.glueoutputbuf = 1;
899 else if (!strcmp(argv[1],"no")) server.glueoutputbuf = 0;
900 else {
901 err = "argument must be 'yes' or 'no'"; goto loaderr;
902 }
10c43610 903 } else if (!strcmp(argv[0],"shareobjects") && argc == 2) {
904 sdstolower(argv[1]);
905 if (!strcmp(argv[1],"yes")) server.shareobjects = 1;
906 else if (!strcmp(argv[1],"no")) server.shareobjects = 0;
907 else {
908 err = "argument must be 'yes' or 'no'"; goto loaderr;
909 }
ed9b544e 910 } else if (!strcmp(argv[0],"daemonize") && argc == 2) {
911 sdstolower(argv[1]);
912 if (!strcmp(argv[1],"yes")) server.daemonize = 1;
913 else if (!strcmp(argv[1],"no")) server.daemonize = 0;
914 else {
915 err = "argument must be 'yes' or 'no'"; goto loaderr;
916 }
ed329fcf
LH
917 } else if (!strcmp(argv[0],"pidfile") && argc == 2) {
918 server.pidfile = zstrdup(argv[1]);
ed9b544e 919 } else {
920 err = "Bad directive or wrong number of arguments"; goto loaderr;
921 }
922 for (j = 0; j < argc; j++)
923 sdsfree(argv[j]);
924 zfree(argv);
925 sdsfree(line);
926 }
927 fclose(fp);
928 return;
929
930loaderr:
931 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
932 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
933 fprintf(stderr, ">>> '%s'\n", line);
934 fprintf(stderr, "%s\n", err);
935 exit(1);
936}
937
938static void freeClientArgv(redisClient *c) {
939 int j;
940
941 for (j = 0; j < c->argc; j++)
942 decrRefCount(c->argv[j]);
943 c->argc = 0;
944}
945
946static void freeClient(redisClient *c) {
947 listNode *ln;
948
949 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
950 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
951 sdsfree(c->querybuf);
952 listRelease(c->reply);
953 freeClientArgv(c);
954 close(c->fd);
955 ln = listSearchKey(server.clients,c);
956 assert(ln != NULL);
957 listDelNode(server.clients,ln);
958 if (c->flags & REDIS_SLAVE) {
87eca727 959 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
960 ln = listSearchKey(l,c);
ed9b544e 961 assert(ln != NULL);
87eca727 962 listDelNode(l,ln);
ed9b544e 963 }
964 if (c->flags & REDIS_MASTER) {
965 server.master = NULL;
966 server.replstate = REDIS_REPL_CONNECT;
967 }
968 zfree(c);
969}
970
971static void glueReplyBuffersIfNeeded(redisClient *c) {
972 int totlen = 0;
973 listNode *ln = c->reply->head, *next;
974 robj *o;
975
976 while(ln) {
977 o = ln->value;
978 totlen += sdslen(o->ptr);
979 ln = ln->next;
980 /* This optimization makes more sense if we don't have to copy
981 * too much data */
982 if (totlen > 1024) return;
983 }
984 if (totlen > 0) {
985 char buf[1024];
986 int copylen = 0;
987
988 ln = c->reply->head;
989 while(ln) {
990 next = ln->next;
991 o = ln->value;
992 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
993 copylen += sdslen(o->ptr);
994 listDelNode(c->reply,ln);
995 ln = next;
996 }
997 /* Now the output buffer is empty, add the new single element */
998 addReplySds(c,sdsnewlen(buf,totlen));
999 }
1000}
1001
1002static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1003 redisClient *c = privdata;
1004 int nwritten = 0, totwritten = 0, objlen;
1005 robj *o;
1006 REDIS_NOTUSED(el);
1007 REDIS_NOTUSED(mask);
1008
1009 if (server.glueoutputbuf && listLength(c->reply) > 1)
1010 glueReplyBuffersIfNeeded(c);
1011 while(listLength(c->reply)) {
1012 o = listNodeValue(listFirst(c->reply));
1013 objlen = sdslen(o->ptr);
1014
1015 if (objlen == 0) {
1016 listDelNode(c->reply,listFirst(c->reply));
1017 continue;
1018 }
1019
1020 if (c->flags & REDIS_MASTER) {
1021 nwritten = objlen - c->sentlen;
1022 } else {
1023 nwritten = write(fd, o->ptr+c->sentlen, objlen - c->sentlen);
1024 if (nwritten <= 0) break;
1025 }
1026 c->sentlen += nwritten;
1027 totwritten += nwritten;
1028 /* If we fully sent the object on head go to the next one */
1029 if (c->sentlen == objlen) {
1030 listDelNode(c->reply,listFirst(c->reply));
1031 c->sentlen = 0;
1032 }
1033 }
1034 if (nwritten == -1) {
1035 if (errno == EAGAIN) {
1036 nwritten = 0;
1037 } else {
1038 redisLog(REDIS_DEBUG,
1039 "Error writing to client: %s", strerror(errno));
1040 freeClient(c);
1041 return;
1042 }
1043 }
1044 if (totwritten > 0) c->lastinteraction = time(NULL);
1045 if (listLength(c->reply) == 0) {
1046 c->sentlen = 0;
1047 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1048 }
1049}
1050
1051static struct redisCommand *lookupCommand(char *name) {
1052 int j = 0;
1053 while(cmdTable[j].name != NULL) {
1054 if (!strcmp(name,cmdTable[j].name)) return &cmdTable[j];
1055 j++;
1056 }
1057 return NULL;
1058}
1059
1060/* resetClient prepare the client to process the next command */
1061static void resetClient(redisClient *c) {
1062 freeClientArgv(c);
1063 c->bulklen = -1;
1064}
1065
1066/* If this function gets called we already read a whole
1067 * command, argments are in the client argv/argc fields.
1068 * processCommand() execute the command or prepare the
1069 * server for a bulk read from the client.
1070 *
1071 * If 1 is returned the client is still alive and valid and
1072 * and other operations can be performed by the caller. Otherwise
1073 * if 0 is returned the client was destroied (i.e. after QUIT). */
1074static int processCommand(redisClient *c) {
1075 struct redisCommand *cmd;
1076 long long dirty;
1077
1078 sdstolower(c->argv[0]->ptr);
1079 /* The QUIT command is handled as a special case. Normal command
1080 * procs are unable to close the client connection safely */
1081 if (!strcmp(c->argv[0]->ptr,"quit")) {
1082 freeClient(c);
1083 return 0;
1084 }
1085 cmd = lookupCommand(c->argv[0]->ptr);
1086 if (!cmd) {
1087 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1088 resetClient(c);
1089 return 1;
1090 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1091 (c->argc < -cmd->arity)) {
1092 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1093 resetClient(c);
1094 return 1;
1095 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1096 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1097
1098 decrRefCount(c->argv[c->argc-1]);
1099 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1100 c->argc--;
1101 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1102 resetClient(c);
1103 return 1;
1104 }
1105 c->argc--;
1106 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1107 /* It is possible that the bulk read is already in the
1108 * buffer. Check this condition and handle it accordingly */
1109 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1110 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1111 c->argc++;
1112 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1113 } else {
1114 return 1;
1115 }
1116 }
10c43610 1117 /* Let's try to share objects on the command arguments vector */
1118 if (server.shareobjects) {
1119 int j;
1120 for(j = 1; j < c->argc; j++)
1121 c->argv[j] = tryObjectSharing(c->argv[j]);
1122 }
ed9b544e 1123 /* Exec the command */
1124 dirty = server.dirty;
1125 cmd->proc(c);
1126 if (server.dirty-dirty != 0 && listLength(server.slaves))
87eca727 1127 replicationFeedSlaves(server.slaves,cmd,c->dictid,c->argv,c->argc);
1128 if (listLength(server.monitors))
1129 replicationFeedSlaves(server.monitors,cmd,c->dictid,c->argv,c->argc);
ed9b544e 1130 server.stat_numcommands++;
1131
1132 /* Prepare the client for the next command */
1133 if (c->flags & REDIS_CLOSE) {
1134 freeClient(c);
1135 return 0;
1136 }
1137 resetClient(c);
1138 return 1;
1139}
1140
87eca727 1141static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1142 listNode *ln = slaves->head;
ed9b544e 1143 robj *outv[REDIS_MAX_ARGS*4]; /* enough room for args, spaces, newlines */
1144 int outc = 0, j;
1145
1146 for (j = 0; j < argc; j++) {
1147 if (j != 0) outv[outc++] = shared.space;
1148 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1149 robj *lenobj;
1150
1151 lenobj = createObject(REDIS_STRING,
1152 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv[j]->ptr)));
1153 lenobj->refcount = 0;
1154 outv[outc++] = lenobj;
1155 }
1156 outv[outc++] = argv[j];
1157 }
1158 outv[outc++] = shared.crlf;
1159
1160 while(ln) {
1161 redisClient *slave = ln->value;
1162 if (slave->slaveseldb != dictid) {
1163 robj *selectcmd;
1164
1165 switch(dictid) {
1166 case 0: selectcmd = shared.select0; break;
1167 case 1: selectcmd = shared.select1; break;
1168 case 2: selectcmd = shared.select2; break;
1169 case 3: selectcmd = shared.select3; break;
1170 case 4: selectcmd = shared.select4; break;
1171 case 5: selectcmd = shared.select5; break;
1172 case 6: selectcmd = shared.select6; break;
1173 case 7: selectcmd = shared.select7; break;
1174 case 8: selectcmd = shared.select8; break;
1175 case 9: selectcmd = shared.select9; break;
1176 default:
1177 selectcmd = createObject(REDIS_STRING,
1178 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1179 selectcmd->refcount = 0;
1180 break;
1181 }
1182 addReply(slave,selectcmd);
1183 slave->slaveseldb = dictid;
1184 }
1185 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1186 ln = ln->next;
1187 }
1188}
1189
1190static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1191 redisClient *c = (redisClient*) privdata;
1192 char buf[REDIS_QUERYBUF_LEN];
1193 int nread;
1194 REDIS_NOTUSED(el);
1195 REDIS_NOTUSED(mask);
1196
1197 nread = read(fd, buf, REDIS_QUERYBUF_LEN);
1198 if (nread == -1) {
1199 if (errno == EAGAIN) {
1200 nread = 0;
1201 } else {
1202 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1203 freeClient(c);
1204 return;
1205 }
1206 } else if (nread == 0) {
1207 redisLog(REDIS_DEBUG, "Client closed connection");
1208 freeClient(c);
1209 return;
1210 }
1211 if (nread) {
1212 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1213 c->lastinteraction = time(NULL);
1214 } else {
1215 return;
1216 }
1217
1218again:
1219 if (c->bulklen == -1) {
1220 /* Read the first line of the query */
1221 char *p = strchr(c->querybuf,'\n');
1222 size_t querylen;
1223 if (p) {
1224 sds query, *argv;
1225 int argc, j;
1226
1227 query = c->querybuf;
1228 c->querybuf = sdsempty();
1229 querylen = 1+(p-(query));
1230 if (sdslen(query) > querylen) {
1231 /* leave data after the first line of the query in the buffer */
1232 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1233 }
1234 *p = '\0'; /* remove "\n" */
1235 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1236 sdsupdatelen(query);
1237
1238 /* Now we can split the query in arguments */
1239 if (sdslen(query) == 0) {
1240 /* Ignore empty query */
1241 sdsfree(query);
1242 return;
1243 }
1244 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1245 sdsfree(query);
1246 if (argv == NULL) oom("sdssplitlen");
1247 for (j = 0; j < argc && j < REDIS_MAX_ARGS; j++) {
1248 if (sdslen(argv[j])) {
1249 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1250 c->argc++;
1251 } else {
1252 sdsfree(argv[j]);
1253 }
1254 }
1255 zfree(argv);
1256 /* Execute the command. If the client is still valid
1257 * after processCommand() return and there is something
1258 * on the query buffer try to process the next command. */
1259 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1260 return;
1261 } else if (sdslen(c->querybuf) >= 1024) {
1262 redisLog(REDIS_DEBUG, "Client protocol error");
1263 freeClient(c);
1264 return;
1265 }
1266 } else {
1267 /* Bulk read handling. Note that if we are at this point
1268 the client already sent a command terminated with a newline,
1269 we are reading the bulk data that is actually the last
1270 argument of the command. */
1271 int qbl = sdslen(c->querybuf);
1272
1273 if (c->bulklen <= qbl) {
1274 /* Copy everything but the final CRLF as final argument */
1275 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1276 c->argc++;
1277 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1278 processCommand(c);
1279 return;
1280 }
1281 }
1282}
1283
1284static int selectDb(redisClient *c, int id) {
1285 if (id < 0 || id >= server.dbnum)
1286 return REDIS_ERR;
1287 c->dict = server.dict[id];
1288 c->dictid = id;
1289 return REDIS_OK;
1290}
1291
1292static redisClient *createClient(int fd) {
1293 redisClient *c = zmalloc(sizeof(*c));
1294
1295 anetNonBlock(NULL,fd);
1296 anetTcpNoDelay(NULL,fd);
1297 if (!c) return NULL;
1298 selectDb(c,0);
1299 c->fd = fd;
1300 c->querybuf = sdsempty();
1301 c->argc = 0;
1302 c->bulklen = -1;
1303 c->sentlen = 0;
1304 c->flags = 0;
1305 c->lastinteraction = time(NULL);
1306 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1307 listSetFreeMethod(c->reply,decrRefCount);
1308 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1309 readQueryFromClient, c, NULL) == AE_ERR) {
1310 freeClient(c);
1311 return NULL;
1312 }
1313 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1314 return c;
1315}
1316
1317static void addReply(redisClient *c, robj *obj) {
1318 if (listLength(c->reply) == 0 &&
1319 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1320 sendReplyToClient, c, NULL) == AE_ERR) return;
1321 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1322 incrRefCount(obj);
1323}
1324
1325static void addReplySds(redisClient *c, sds s) {
1326 robj *o = createObject(REDIS_STRING,s);
1327 addReply(c,o);
1328 decrRefCount(o);
1329}
1330
1331static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1332 int cport, cfd;
1333 char cip[128];
1334 REDIS_NOTUSED(el);
1335 REDIS_NOTUSED(mask);
1336 REDIS_NOTUSED(privdata);
1337
1338 cfd = anetAccept(server.neterr, fd, cip, &cport);
1339 if (cfd == AE_ERR) {
1340 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1341 return;
1342 }
1343 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1344 if (createClient(cfd) == NULL) {
1345 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1346 close(cfd); /* May be already closed, just ingore errors */
1347 return;
1348 }
1349 server.stat_numconnections++;
1350}
1351
1352/* ======================= Redis objects implementation ===================== */
1353
1354static robj *createObject(int type, void *ptr) {
1355 robj *o;
1356
1357 if (listLength(server.objfreelist)) {
1358 listNode *head = listFirst(server.objfreelist);
1359 o = listNodeValue(head);
1360 listDelNode(server.objfreelist,head);
1361 } else {
1362 o = zmalloc(sizeof(*o));
1363 }
1364 if (!o) oom("createObject");
1365 o->type = type;
1366 o->ptr = ptr;
1367 o->refcount = 1;
1368 return o;
1369}
1370
1371static robj *createStringObject(char *ptr, size_t len) {
1372 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1373}
1374
1375static robj *createListObject(void) {
1376 list *l = listCreate();
1377
1378 if (!l) oom("listCreate");
1379 listSetFreeMethod(l,decrRefCount);
1380 return createObject(REDIS_LIST,l);
1381}
1382
1383static robj *createSetObject(void) {
1384 dict *d = dictCreate(&setDictType,NULL);
1385 if (!d) oom("dictCreate");
1386 return createObject(REDIS_SET,d);
1387}
1388
#if 0
/* Create an empty hash object, backed by a dictionary of type
 * hashDictType. Currently compiled out. */
static robj *createHashObject(void) {
    dict *d = dictCreate(&hashDictType,NULL);
    if (!d) oom("dictCreate");
    /* Tag the object as a hash so that decrRefCount() releases it via
     * freeHashObject(). */
    return createObject(REDIS_HASH,d);
}
#endif
1396
1397static void freeStringObject(robj *o) {
1398 sdsfree(o->ptr);
1399}
1400
1401static void freeListObject(robj *o) {
1402 listRelease((list*) o->ptr);
1403}
1404
1405static void freeSetObject(robj *o) {
1406 dictRelease((dict*) o->ptr);
1407}
1408
1409static void freeHashObject(robj *o) {
1410 dictRelease((dict*) o->ptr);
1411}
1412
1413static void incrRefCount(robj *o) {
1414 o->refcount++;
1415}
1416
1417static void decrRefCount(void *obj) {
1418 robj *o = obj;
1419 if (--(o->refcount) == 0) {
1420 switch(o->type) {
1421 case REDIS_STRING: freeStringObject(o); break;
1422 case REDIS_LIST: freeListObject(o); break;
1423 case REDIS_SET: freeSetObject(o); break;
1424 case REDIS_HASH: freeHashObject(o); break;
1425 default: assert(0 != 0); break;
1426 }
1427 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1428 !listAddNodeHead(server.objfreelist,o))
1429 zfree(o);
1430 }
1431}
1432
10c43610 1433/* Try to share an object against the shared objects pool */
1434static robj *tryObjectSharing(robj *o) {
1435 struct dictEntry *de;
1436 unsigned long c;
1437
1438 if (server.shareobjects == 0) return o;
1439
1440 assert(o->type == REDIS_STRING);
1441 de = dictFind(server.sharingpool,o);
1442 if (de) {
1443 robj *shared = dictGetEntryKey(de);
1444
1445 c = ((unsigned long) dictGetEntryVal(de))+1;
1446 dictGetEntryVal(de) = (void*) c;
1447 incrRefCount(shared);
1448 decrRefCount(o);
1449 return shared;
1450 } else {
1451 /* Here we are using a stream algorihtm: Every time an object is
1452 * shared we increment its count, everytime there is a miss we
1453 * recrement the counter of a random object. If this object reaches
1454 * zero we remove the object and put the current object instead. */
1455 if (dictGetHashTableUsed(server.sharingpool) >=
1456 server.sharingpoolsize) {
1457 de = dictGetRandomKey(server.sharingpool);
1458 assert(de != NULL);
1459 c = ((unsigned long) dictGetEntryVal(de))-1;
1460 dictGetEntryVal(de) = (void*) c;
1461 if (c == 0) {
1462 dictDelete(server.sharingpool,de->key);
1463 }
1464 } else {
1465 c = 0; /* If the pool is empty we want to add this object */
1466 }
1467 if (c == 0) {
1468 int retval;
1469
1470 retval = dictAdd(server.sharingpool,o,(void*)1);
1471 assert(retval == DICT_OK);
1472 incrRefCount(o);
1473 }
1474 return o;
1475 }
1476}
1477
ed9b544e 1478/*============================ DB saving/loading ============================ */
1479
/* Write the one-byte type/opcode to 'fp'. Returns 0 on success, -1 if
 * the byte could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
1484
1485static int rdbSaveLen(FILE *fp, uint32_t len) {
1486 unsigned char buf[2];
1487
1488 if (len < (1<<6)) {
1489 /* Save a 6 bit len */
10c43610 1490 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
f78fd11b 1491 if (fwrite(buf,1,1,fp) == 0) return -1;
1492 } else if (len < (1<<14)) {
1493 /* Save a 14 bit len */
10c43610 1494 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
f78fd11b 1495 buf[1] = len&0xFF;
1496 if (fwrite(buf,4,1,fp) == 0) return -1;
1497 } else {
1498 /* Save a 32 bit len */
10c43610 1499 buf[0] = (REDIS_RDB_32BITLEN<<6);
f78fd11b 1500 if (fwrite(buf,1,1,fp) == 0) return -1;
1501 len = htonl(len);
1502 if (fwrite(&len,4,1,fp) == 0) return -1;
1503 }
1504 return 0;
1505}
1506
10c43610 1507static int rdbSaveStringObject(FILE *fp, robj *obj) {
1508 size_t len = sdslen(obj->ptr);
1509
1510 if (rdbSaveLen(fp,len) == -1) return -1;
1511 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
1512 return 0;
1513}
1514
ed9b544e 1515/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
f78fd11b 1516static int rdbSave(char *filename) {
ed9b544e 1517 dictIterator *di = NULL;
1518 dictEntry *de;
ed9b544e 1519 FILE *fp;
1520 char tmpfile[256];
1521 int j;
1522
1523 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
1524 fp = fopen(tmpfile,"w");
1525 if (!fp) {
1526 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
1527 return REDIS_ERR;
1528 }
f78fd11b 1529 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
ed9b544e 1530 for (j = 0; j < server.dbnum; j++) {
1531 dict *d = server.dict[j];
1532 if (dictGetHashTableUsed(d) == 0) continue;
1533 di = dictGetIterator(d);
1534 if (!di) {
1535 fclose(fp);
1536 return REDIS_ERR;
1537 }
1538
1539 /* Write the SELECT DB opcode */
f78fd11b 1540 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
1541 if (rdbSaveLen(fp,j) == -1) goto werr;
ed9b544e 1542
1543 /* Iterate this DB writing every entry */
1544 while((de = dictNext(di)) != NULL) {
1545 robj *key = dictGetEntryKey(de);
1546 robj *o = dictGetEntryVal(de);
1547
f78fd11b 1548 if (rdbSaveType(fp,o->type) == -1) goto werr;
10c43610 1549 if (rdbSaveStringObject(fp,key) == -1) goto werr;
f78fd11b 1550 if (o->type == REDIS_STRING) {
ed9b544e 1551 /* Save a string value */
10c43610 1552 if (rdbSaveStringObject(fp,o) == -1) goto werr;
f78fd11b 1553 } else if (o->type == REDIS_LIST) {
ed9b544e 1554 /* Save a list value */
1555 list *list = o->ptr;
1556 listNode *ln = list->head;
1557
f78fd11b 1558 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
ed9b544e 1559 while(ln) {
1560 robj *eleobj = listNodeValue(ln);
f78fd11b 1561
10c43610 1562 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
ed9b544e 1563 ln = ln->next;
1564 }
f78fd11b 1565 } else if (o->type == REDIS_SET) {
ed9b544e 1566 /* Save a set value */
1567 dict *set = o->ptr;
1568 dictIterator *di = dictGetIterator(set);
1569 dictEntry *de;
1570
1571 if (!set) oom("dictGetIteraotr");
f78fd11b 1572 if (rdbSaveLen(fp,dictGetHashTableUsed(set)) == -1) goto werr;
ed9b544e 1573 while((de = dictNext(di)) != NULL) {
10c43610 1574 robj *eleobj = dictGetEntryKey(de);
ed9b544e 1575
10c43610 1576 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
ed9b544e 1577 }
1578 dictReleaseIterator(di);
1579 } else {
1580 assert(0 != 0);
1581 }
1582 }
1583 dictReleaseIterator(di);
1584 }
1585 /* EOF opcode */
f78fd11b 1586 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
1587
1588 /* Make sure data will not remain on the OS's output buffers */
ed9b544e 1589 fflush(fp);
1590 fsync(fileno(fp));
1591 fclose(fp);
1592
1593 /* Use RENAME to make sure the DB file is changed atomically only
1594 * if the generate DB file is ok. */
1595 if (rename(tmpfile,filename) == -1) {
1596 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
1597 unlink(tmpfile);
1598 return REDIS_ERR;
1599 }
1600 redisLog(REDIS_NOTICE,"DB saved on disk");
1601 server.dirty = 0;
1602 server.lastsave = time(NULL);
1603 return REDIS_OK;
1604
1605werr:
1606 fclose(fp);
1607 unlink(tmpfile);
1608 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
1609 if (di) dictReleaseIterator(di);
1610 return REDIS_ERR;
1611}
1612
f78fd11b 1613static int rdbSaveBackground(char *filename) {
ed9b544e 1614 pid_t childpid;
1615
1616 if (server.bgsaveinprogress) return REDIS_ERR;
1617 if ((childpid = fork()) == 0) {
1618 /* Child */
1619 close(server.fd);
f78fd11b 1620 if (rdbSave(filename) == REDIS_OK) {
ed9b544e 1621 exit(0);
1622 } else {
1623 exit(1);
1624 }
1625 } else {
1626 /* Parent */
1627 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
1628 server.bgsaveinprogress = 1;
1629 return REDIS_OK;
1630 }
1631 return REDIS_OK; /* unreached */
1632}
1633
/* Read the one-byte type/opcode from 'fp'. Returns the byte value
 * (0..255), or -1 if nothing could be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;
    size_t got = fread(&type,1,1,fp);

    if (got == 0) return -1;
    return type;
}
1639
f78fd11b 1640static uint32_t rdbLoadLen(FILE *fp, int rdbver) {
1641 unsigned char buf[2];
1642 uint32_t len;
1643
1644 if (rdbver == 0) {
1645 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
1646 return ntohl(len);
1647 } else {
1648 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
1649 if ((buf[0]&0xC0) == REDIS_RDB_6BITLEN) {
1650 /* Read a 6 bit len */
1651 return buf[0];
1652 } else if ((buf[0]&0xC0) == REDIS_RDB_14BITLEN) {
1653 /* Read a 14 bit len */
1654 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
1655 return ((buf[0]&0x3F)<<8)|buf[1];
1656 } else {
1657 /* Read a 32 bit len */
1658 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
1659 return ntohl(len);
1660 }
1661 }
f78fd11b 1662}
1663
1664static robj *rdbLoadStringObject(FILE*fp,int rdbver) {
1665 uint32_t len = rdbLoadLen(fp,rdbver);
1666 sds val;
1667
1668 if (len == REDIS_RDB_LENERR) return NULL;
1669 val = sdsnewlen(NULL,len);
1670 if (len && fread(val,len,1,fp) == 0) {
1671 sdsfree(val);
1672 return NULL;
1673 }
10c43610 1674 return tryObjectSharing(createObject(REDIS_STRING,val));
f78fd11b 1675}
1676
1677static int rdbLoad(char *filename) {
ed9b544e 1678 FILE *fp;
f78fd11b 1679 robj *keyobj = NULL;
1680 uint32_t dbid;
7b45bfb2 1681 int type;
ed9b544e 1682 int retval;
1683 dict *d = server.dict[0];
f78fd11b 1684 char buf[1024];
1685 int rdbver;
ed9b544e 1686 fp = fopen(filename,"r");
1687 if (!fp) return REDIS_ERR;
1688 if (fread(buf,9,1,fp) == 0) goto eoferr;
f78fd11b 1689 buf[9] = '\0';
1690 if (memcmp(buf,"REDIS",5) != 0) {
ed9b544e 1691 fclose(fp);
1692 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
1693 return REDIS_ERR;
1694 }
f78fd11b 1695 rdbver = atoi(buf+5);
1696 if (rdbver > 1) {
1697 fclose(fp);
1698 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
1699 return REDIS_ERR;
1700 }
ed9b544e 1701 while(1) {
1702 robj *o;
1703
1704 /* Read type. */
f78fd11b 1705 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
ed9b544e 1706 if (type == REDIS_EOF) break;
1707 /* Handle SELECT DB opcode as a special case */
1708 if (type == REDIS_SELECTDB) {
f78fd11b 1709 if ((dbid = rdbLoadLen(fp,rdbver)) == REDIS_RDB_LENERR) goto eoferr;
ed9b544e 1710 if (dbid >= (unsigned)server.dbnum) {
f78fd11b 1711 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
ed9b544e 1712 exit(1);
1713 }
1714 d = server.dict[dbid];
1715 continue;
1716 }
1717 /* Read key */
f78fd11b 1718 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
ed9b544e 1719
1720 if (type == REDIS_STRING) {
1721 /* Read string value */
f78fd11b 1722 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
ed9b544e 1723 } else if (type == REDIS_LIST || type == REDIS_SET) {
1724 /* Read list/set value */
1725 uint32_t listlen;
f78fd11b 1726
1727 if ((listlen = rdbLoadLen(fp,rdbver)) == REDIS_RDB_LENERR)
1728 goto eoferr;
ed9b544e 1729 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
1730 /* Load every single element of the list/set */
1731 while(listlen--) {
1732 robj *ele;
1733
f78fd11b 1734 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
ed9b544e 1735 if (type == REDIS_LIST) {
1736 if (!listAddNodeTail((list*)o->ptr,ele))
1737 oom("listAddNodeTail");
1738 } else {
1739 if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
1740 oom("dictAdd");
1741 }
ed9b544e 1742 }
1743 } else {
1744 assert(0 != 0);
1745 }
1746 /* Add the new object in the hash table */
f78fd11b 1747 retval = dictAdd(d,keyobj,o);
ed9b544e 1748 if (retval == DICT_ERR) {
f78fd11b 1749 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
ed9b544e 1750 exit(1);
1751 }
f78fd11b 1752 keyobj = o = NULL;
ed9b544e 1753 }
1754 fclose(fp);
1755 return REDIS_OK;
1756
1757eoferr: /* unexpected end of file is handled here with a fatal exit */
f78fd11b 1758 decrRefCount(keyobj);
ed9b544e 1759 redisLog(REDIS_WARNING,"Short read loading DB. Unrecoverable error, exiting now.");
1760 exit(1);
1761 return REDIS_ERR; /* Just to avoid warning */
1762}
1763
1764/*================================== Commands =============================== */
1765
1766static void pingCommand(redisClient *c) {
1767 addReply(c,shared.pong);
1768}
1769
1770static void echoCommand(redisClient *c) {
c937aa89 1771 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
ed9b544e 1772 (int)sdslen(c->argv[1]->ptr)));
1773 addReply(c,c->argv[1]);
1774 addReply(c,shared.crlf);
1775}
1776
1777/*=================================== Strings =============================== */
1778
1779static void setGenericCommand(redisClient *c, int nx) {
1780 int retval;
1781
1782 retval = dictAdd(c->dict,c->argv[1],c->argv[2]);
1783 if (retval == DICT_ERR) {
1784 if (!nx) {
1785 dictReplace(c->dict,c->argv[1],c->argv[2]);
1786 incrRefCount(c->argv[2]);
1787 } else {
c937aa89 1788 addReply(c,shared.czero);
ed9b544e 1789 return;
1790 }
1791 } else {
1792 incrRefCount(c->argv[1]);
1793 incrRefCount(c->argv[2]);
1794 }
1795 server.dirty++;
c937aa89 1796 addReply(c, nx ? shared.cone : shared.ok);
ed9b544e 1797}
1798
1799static void setCommand(redisClient *c) {
1800 return setGenericCommand(c,0);
1801}
1802
1803static void setnxCommand(redisClient *c) {
1804 return setGenericCommand(c,1);
1805}
1806
1807static void getCommand(redisClient *c) {
1808 dictEntry *de;
1809
1810 de = dictFind(c->dict,c->argv[1]);
1811 if (de == NULL) {
c937aa89 1812 addReply(c,shared.nullbulk);
ed9b544e 1813 } else {
1814 robj *o = dictGetEntryVal(de);
1815
1816 if (o->type != REDIS_STRING) {
c937aa89 1817 addReply(c,shared.wrongtypeerr);
ed9b544e 1818 } else {
c937aa89 1819 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
ed9b544e 1820 addReply(c,o);
1821 addReply(c,shared.crlf);
1822 }
1823 }
1824}
1825
70003d28 1826static void mgetCommand(redisClient *c) {
1827 dictEntry *de;
1828 int j;
1829
c937aa89 1830 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
70003d28 1831 for (j = 1; j < c->argc; j++) {
1832 de = dictFind(c->dict,c->argv[j]);
1833 if (de == NULL) {
c937aa89 1834 addReply(c,shared.nullbulk);
70003d28 1835 } else {
1836 robj *o = dictGetEntryVal(de);
1837
1838 if (o->type != REDIS_STRING) {
c937aa89 1839 addReply(c,shared.nullbulk);
70003d28 1840 } else {
c937aa89 1841 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
70003d28 1842 addReply(c,o);
1843 addReply(c,shared.crlf);
1844 }
1845 }
1846 }
1847}
1848
ed9b544e 1849static void incrDecrCommand(redisClient *c, int incr) {
1850 dictEntry *de;
1851 long long value;
1852 int retval;
1853 robj *o;
1854
1855 de = dictFind(c->dict,c->argv[1]);
1856 if (de == NULL) {
1857 value = 0;
1858 } else {
1859 robj *o = dictGetEntryVal(de);
1860
1861 if (o->type != REDIS_STRING) {
1862 value = 0;
1863 } else {
1864 char *eptr;
1865
1866 value = strtoll(o->ptr, &eptr, 10);
1867 }
1868 }
1869
1870 value += incr;
1871 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
1872 retval = dictAdd(c->dict,c->argv[1],o);
1873 if (retval == DICT_ERR) {
1874 dictReplace(c->dict,c->argv[1],o);
1875 } else {
1876 incrRefCount(c->argv[1]);
1877 }
1878 server.dirty++;
c937aa89 1879 addReply(c,shared.colon);
ed9b544e 1880 addReply(c,o);
1881 addReply(c,shared.crlf);
1882}
1883
1884static void incrCommand(redisClient *c) {
1885 return incrDecrCommand(c,1);
1886}
1887
1888static void decrCommand(redisClient *c) {
1889 return incrDecrCommand(c,-1);
1890}
1891
1892static void incrbyCommand(redisClient *c) {
1893 int incr = atoi(c->argv[2]->ptr);
1894 return incrDecrCommand(c,incr);
1895}
1896
1897static void decrbyCommand(redisClient *c) {
1898 int incr = atoi(c->argv[2]->ptr);
1899 return incrDecrCommand(c,-incr);
1900}
1901
1902/* ========================= Type agnostic commands ========================= */
1903
1904static void delCommand(redisClient *c) {
1905 if (dictDelete(c->dict,c->argv[1]) == DICT_OK) {
1906 server.dirty++;
c937aa89 1907 addReply(c,shared.cone);
ed9b544e 1908 } else {
c937aa89 1909 addReply(c,shared.czero);
ed9b544e 1910 }
1911}
1912
1913static void existsCommand(redisClient *c) {
1914 dictEntry *de;
1915
1916 de = dictFind(c->dict,c->argv[1]);
1917 if (de == NULL)
c937aa89 1918 addReply(c,shared.czero);
ed9b544e 1919 else
c937aa89 1920 addReply(c,shared.cone);
ed9b544e 1921}
1922
1923static void selectCommand(redisClient *c) {
1924 int id = atoi(c->argv[1]->ptr);
1925
1926 if (selectDb(c,id) == REDIS_ERR) {
1927 addReplySds(c,"-ERR invalid DB index\r\n");
1928 } else {
1929 addReply(c,shared.ok);
1930 }
1931}
1932
1933static void randomkeyCommand(redisClient *c) {
1934 dictEntry *de;
1935
1936 de = dictGetRandomKey(c->dict);
1937 if (de == NULL) {
1938 addReply(c,shared.crlf);
1939 } else {
c937aa89 1940 addReply(c,shared.plus);
ed9b544e 1941 addReply(c,dictGetEntryKey(de));
1942 addReply(c,shared.crlf);
1943 }
1944}
1945
1946static void keysCommand(redisClient *c) {
1947 dictIterator *di;
1948 dictEntry *de;
1949 sds pattern = c->argv[1]->ptr;
1950 int plen = sdslen(pattern);
1951 int numkeys = 0, keyslen = 0;
1952 robj *lenobj = createObject(REDIS_STRING,NULL);
1953
1954 di = dictGetIterator(c->dict);
1955 if (!di) oom("dictGetIterator");
1956 addReply(c,lenobj);
1957 decrRefCount(lenobj);
1958 while((de = dictNext(di)) != NULL) {
1959 robj *keyobj = dictGetEntryKey(de);
1960 sds key = keyobj->ptr;
1961 if ((pattern[0] == '*' && pattern[1] == '\0') ||
1962 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
1963 if (numkeys != 0)
1964 addReply(c,shared.space);
1965 addReply(c,keyobj);
1966 numkeys++;
1967 keyslen += sdslen(key);
1968 }
1969 }
1970 dictReleaseIterator(di);
c937aa89 1971 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
ed9b544e 1972 addReply(c,shared.crlf);
1973}
1974
1975static void dbsizeCommand(redisClient *c) {
1976 addReplySds(c,
c937aa89 1977 sdscatprintf(sdsempty(),":%lu\r\n",dictGetHashTableUsed(c->dict)));
ed9b544e 1978}
1979
1980static void lastsaveCommand(redisClient *c) {
1981 addReplySds(c,
c937aa89 1982 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
ed9b544e 1983}
1984
1985static void typeCommand(redisClient *c) {
1986 dictEntry *de;
1987 char *type;
1988
1989 de = dictFind(c->dict,c->argv[1]);
1990 if (de == NULL) {
c937aa89 1991 type = "+none";
ed9b544e 1992 } else {
1993 robj *o = dictGetEntryVal(de);
1994
1995 switch(o->type) {
c937aa89 1996 case REDIS_STRING: type = "+string"; break;
1997 case REDIS_LIST: type = "+list"; break;
1998 case REDIS_SET: type = "+set"; break;
ed9b544e 1999 default: type = "unknown"; break;
2000 }
2001 }
2002 addReplySds(c,sdsnew(type));
2003 addReply(c,shared.crlf);
2004}
2005
2006static void saveCommand(redisClient *c) {
f78fd11b 2007 if (rdbSave(server.dbfilename) == REDIS_OK) {
ed9b544e 2008 addReply(c,shared.ok);
2009 } else {
2010 addReply(c,shared.err);
2011 }
2012}
2013
2014static void bgsaveCommand(redisClient *c) {
2015 if (server.bgsaveinprogress) {
2016 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2017 return;
2018 }
f78fd11b 2019 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
ed9b544e 2020 addReply(c,shared.ok);
2021 } else {
2022 addReply(c,shared.err);
2023 }
2024}
2025
2026static void shutdownCommand(redisClient *c) {
2027 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
f78fd11b 2028 if (rdbSave(server.dbfilename) == REDIS_OK) {
ed329fcf
LH
2029 if (server.daemonize) {
2030 unlink(server.pidfile);
2031 }
ed9b544e 2032 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2033 exit(1);
2034 } else {
2035 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2036 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2037 }
2038}
2039
2040static void renameGenericCommand(redisClient *c, int nx) {
2041 dictEntry *de;
2042 robj *o;
2043
2044 /* To use the same key as src and dst is probably an error */
2045 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
c937aa89 2046 addReply(c,shared.sameobjecterr);
ed9b544e 2047 return;
2048 }
2049
2050 de = dictFind(c->dict,c->argv[1]);
2051 if (de == NULL) {
c937aa89 2052 addReply(c,shared.nokeyerr);
ed9b544e 2053 return;
2054 }
2055 o = dictGetEntryVal(de);
2056 incrRefCount(o);
2057 if (dictAdd(c->dict,c->argv[2],o) == DICT_ERR) {
2058 if (nx) {
2059 decrRefCount(o);
c937aa89 2060 addReply(c,shared.czero);
ed9b544e 2061 return;
2062 }
2063 dictReplace(c->dict,c->argv[2],o);
2064 } else {
2065 incrRefCount(c->argv[2]);
2066 }
2067 dictDelete(c->dict,c->argv[1]);
2068 server.dirty++;
c937aa89 2069 addReply(c,nx ? shared.cone : shared.ok);
ed9b544e 2070}
2071
2072static void renameCommand(redisClient *c) {
2073 renameGenericCommand(c,0);
2074}
2075
2076static void renamenxCommand(redisClient *c) {
2077 renameGenericCommand(c,1);
2078}
2079
2080static void moveCommand(redisClient *c) {
2081 dictEntry *de;
2082 robj *o, *key;
2083 dict *src, *dst;
2084 int srcid;
2085
2086 /* Obtain source and target DB pointers */
2087 src = c->dict;
2088 srcid = c->dictid;
2089 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
c937aa89 2090 addReply(c,shared.outofrangeerr);
ed9b544e 2091 return;
2092 }
2093 dst = c->dict;
2094 c->dict = src;
2095 c->dictid = srcid;
2096
2097 /* If the user is moving using as target the same
2098 * DB as the source DB it is probably an error. */
2099 if (src == dst) {
c937aa89 2100 addReply(c,shared.sameobjecterr);
ed9b544e 2101 return;
2102 }
2103
2104 /* Check if the element exists and get a reference */
2105 de = dictFind(c->dict,c->argv[1]);
2106 if (!de) {
c937aa89 2107 addReply(c,shared.czero);
ed9b544e 2108 return;
2109 }
2110
2111 /* Try to add the element to the target DB */
2112 key = dictGetEntryKey(de);
2113 o = dictGetEntryVal(de);
2114 if (dictAdd(dst,key,o) == DICT_ERR) {
c937aa89 2115 addReply(c,shared.czero);
ed9b544e 2116 return;
2117 }
2118 incrRefCount(key);
2119 incrRefCount(o);
2120
2121 /* OK! key moved, free the entry in the source DB */
2122 dictDelete(src,c->argv[1]);
2123 server.dirty++;
c937aa89 2124 addReply(c,shared.cone);
ed9b544e 2125}
2126
2127/* =================================== Lists ================================ */
2128static void pushGenericCommand(redisClient *c, int where) {
2129 robj *lobj;
2130 dictEntry *de;
2131 list *list;
2132
2133 de = dictFind(c->dict,c->argv[1]);
2134 if (de == NULL) {
2135 lobj = createListObject();
2136 list = lobj->ptr;
2137 if (where == REDIS_HEAD) {
2138 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2139 } else {
2140 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2141 }
2142 dictAdd(c->dict,c->argv[1],lobj);
2143 incrRefCount(c->argv[1]);
2144 incrRefCount(c->argv[2]);
2145 } else {
2146 lobj = dictGetEntryVal(de);
2147 if (lobj->type != REDIS_LIST) {
2148 addReply(c,shared.wrongtypeerr);
2149 return;
2150 }
2151 list = lobj->ptr;
2152 if (where == REDIS_HEAD) {
2153 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2154 } else {
2155 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2156 }
2157 incrRefCount(c->argv[2]);
2158 }
2159 server.dirty++;
2160 addReply(c,shared.ok);
2161}
2162
2163static void lpushCommand(redisClient *c) {
2164 pushGenericCommand(c,REDIS_HEAD);
2165}
2166
2167static void rpushCommand(redisClient *c) {
2168 pushGenericCommand(c,REDIS_TAIL);
2169}
2170
2171static void llenCommand(redisClient *c) {
2172 dictEntry *de;
2173 list *l;
2174
2175 de = dictFind(c->dict,c->argv[1]);
2176 if (de == NULL) {
c937aa89 2177 addReply(c,shared.czero);
ed9b544e 2178 return;
2179 } else {
2180 robj *o = dictGetEntryVal(de);
2181 if (o->type != REDIS_LIST) {
c937aa89 2182 addReply(c,shared.wrongtypeerr);
ed9b544e 2183 } else {
2184 l = o->ptr;
c937aa89 2185 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
ed9b544e 2186 }
2187 }
2188}
2189
2190static void lindexCommand(redisClient *c) {
2191 dictEntry *de;
2192 int index = atoi(c->argv[2]->ptr);
2193
2194 de = dictFind(c->dict,c->argv[1]);
2195 if (de == NULL) {
c937aa89 2196 addReply(c,shared.nullbulk);
ed9b544e 2197 } else {
2198 robj *o = dictGetEntryVal(de);
2199
2200 if (o->type != REDIS_LIST) {
c937aa89 2201 addReply(c,shared.wrongtypeerr);
ed9b544e 2202 } else {
2203 list *list = o->ptr;
2204 listNode *ln;
2205
2206 ln = listIndex(list, index);
2207 if (ln == NULL) {
c937aa89 2208 addReply(c,shared.nullbulk);
ed9b544e 2209 } else {
2210 robj *ele = listNodeValue(ln);
c937aa89 2211 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
ed9b544e 2212 addReply(c,ele);
2213 addReply(c,shared.crlf);
2214 }
2215 }
2216 }
2217}
2218
2219static void lsetCommand(redisClient *c) {
2220 dictEntry *de;
2221 int index = atoi(c->argv[2]->ptr);
2222
2223 de = dictFind(c->dict,c->argv[1]);
2224 if (de == NULL) {
2225 addReply(c,shared.nokeyerr);
2226 } else {
2227 robj *o = dictGetEntryVal(de);
2228
2229 if (o->type != REDIS_LIST) {
2230 addReply(c,shared.wrongtypeerr);
2231 } else {
2232 list *list = o->ptr;
2233 listNode *ln;
2234
2235 ln = listIndex(list, index);
2236 if (ln == NULL) {
c937aa89 2237 addReply(c,shared.outofrangeerr);
ed9b544e 2238 } else {
2239 robj *ele = listNodeValue(ln);
2240
2241 decrRefCount(ele);
2242 listNodeValue(ln) = c->argv[3];
2243 incrRefCount(c->argv[3]);
2244 addReply(c,shared.ok);
2245 server.dirty++;
2246 }
2247 }
2248 }
2249}
2250
2251static void popGenericCommand(redisClient *c, int where) {
2252 dictEntry *de;
2253
2254 de = dictFind(c->dict,c->argv[1]);
2255 if (de == NULL) {
c937aa89 2256 addReply(c,shared.nullbulk);
ed9b544e 2257 } else {
2258 robj *o = dictGetEntryVal(de);
2259
2260 if (o->type != REDIS_LIST) {
c937aa89 2261 addReply(c,shared.wrongtypeerr);
ed9b544e 2262 } else {
2263 list *list = o->ptr;
2264 listNode *ln;
2265
2266 if (where == REDIS_HEAD)
2267 ln = listFirst(list);
2268 else
2269 ln = listLast(list);
2270
2271 if (ln == NULL) {
c937aa89 2272 addReply(c,shared.nullbulk);
ed9b544e 2273 } else {
2274 robj *ele = listNodeValue(ln);
c937aa89 2275 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
ed9b544e 2276 addReply(c,ele);
2277 addReply(c,shared.crlf);
2278 listDelNode(list,ln);
2279 server.dirty++;
2280 }
2281 }
2282 }
2283}
2284
2285static void lpopCommand(redisClient *c) {
2286 popGenericCommand(c,REDIS_HEAD);
2287}
2288
2289static void rpopCommand(redisClient *c) {
2290 popGenericCommand(c,REDIS_TAIL);
2291}
2292
2293static void lrangeCommand(redisClient *c) {
2294 dictEntry *de;
2295 int start = atoi(c->argv[2]->ptr);
2296 int end = atoi(c->argv[3]->ptr);
2297
2298 de = dictFind(c->dict,c->argv[1]);
2299 if (de == NULL) {
c937aa89 2300 addReply(c,shared.nullmultibulk);
ed9b544e 2301 } else {
2302 robj *o = dictGetEntryVal(de);
2303
2304 if (o->type != REDIS_LIST) {
c937aa89 2305 addReply(c,shared.wrongtypeerr);
ed9b544e 2306 } else {
2307 list *list = o->ptr;
2308 listNode *ln;
2309 int llen = listLength(list);
2310 int rangelen, j;
2311 robj *ele;
2312
2313 /* convert negative indexes */
2314 if (start < 0) start = llen+start;
2315 if (end < 0) end = llen+end;
2316 if (start < 0) start = 0;
2317 if (end < 0) end = 0;
2318
2319 /* indexes sanity checks */
2320 if (start > end || start >= llen) {
2321 /* Out of range start or start > end result in empty list */
c937aa89 2322 addReply(c,shared.emptymultibulk);
ed9b544e 2323 return;
2324 }
2325 if (end >= llen) end = llen-1;
2326 rangelen = (end-start)+1;
2327
2328 /* Return the result in form of a multi-bulk reply */
2329 ln = listIndex(list, start);
c937aa89 2330 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
ed9b544e 2331 for (j = 0; j < rangelen; j++) {
2332 ele = listNodeValue(ln);
c937aa89 2333 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
ed9b544e 2334 addReply(c,ele);
2335 addReply(c,shared.crlf);
2336 ln = ln->next;
2337 }
2338 }
2339 }
2340}
2341
2342static void ltrimCommand(redisClient *c) {
2343 dictEntry *de;
2344 int start = atoi(c->argv[2]->ptr);
2345 int end = atoi(c->argv[3]->ptr);
2346
2347 de = dictFind(c->dict,c->argv[1]);
2348 if (de == NULL) {
2349 addReply(c,shared.nokeyerr);
2350 } else {
2351 robj *o = dictGetEntryVal(de);
2352
2353 if (o->type != REDIS_LIST) {
2354 addReply(c,shared.wrongtypeerr);
2355 } else {
2356 list *list = o->ptr;
2357 listNode *ln;
2358 int llen = listLength(list);
2359 int j, ltrim, rtrim;
2360
2361 /* convert negative indexes */
2362 if (start < 0) start = llen+start;
2363 if (end < 0) end = llen+end;
2364 if (start < 0) start = 0;
2365 if (end < 0) end = 0;
2366
2367 /* indexes sanity checks */
2368 if (start > end || start >= llen) {
2369 /* Out of range start or start > end result in empty list */
2370 ltrim = llen;
2371 rtrim = 0;
2372 } else {
2373 if (end >= llen) end = llen-1;
2374 ltrim = start;
2375 rtrim = llen-end-1;
2376 }
2377
2378 /* Remove list elements to perform the trim */
2379 for (j = 0; j < ltrim; j++) {
2380 ln = listFirst(list);
2381 listDelNode(list,ln);
2382 }
2383 for (j = 0; j < rtrim; j++) {
2384 ln = listLast(list);
2385 listDelNode(list,ln);
2386 }
2387 addReply(c,shared.ok);
2388 server.dirty++;
2389 }
2390 }
2391}
2392
2393static void lremCommand(redisClient *c) {
2394 dictEntry *de;
2395
2396 de = dictFind(c->dict,c->argv[1]);
2397 if (de == NULL) {
7b45bfb2 2398 addReply(c,shared.nokeyerr);
ed9b544e 2399 } else {
2400 robj *o = dictGetEntryVal(de);
2401
2402 if (o->type != REDIS_LIST) {
c937aa89 2403 addReply(c,shared.wrongtypeerr);
ed9b544e 2404 } else {
2405 list *list = o->ptr;
2406 listNode *ln, *next;
2407 int toremove = atoi(c->argv[2]->ptr);
2408 int removed = 0;
2409 int fromtail = 0;
2410
2411 if (toremove < 0) {
2412 toremove = -toremove;
2413 fromtail = 1;
2414 }
2415 ln = fromtail ? list->tail : list->head;
2416 while (ln) {
2417 next = fromtail ? ln->prev : ln->next;
2418 robj *ele = listNodeValue(ln);
2419 if (sdscmp(ele->ptr,c->argv[3]->ptr) == 0) {
2420 listDelNode(list,ln);
2421 server.dirty++;
2422 removed++;
2423 if (toremove && removed == toremove) break;
2424 }
2425 ln = next;
2426 }
c937aa89 2427 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
ed9b544e 2428 }
2429 }
2430}
2431
2432/* ==================================== Sets ================================ */
2433
2434static void saddCommand(redisClient *c) {
2435 dictEntry *de;
2436 robj *set;
2437
2438 de = dictFind(c->dict,c->argv[1]);
2439 if (de == NULL) {
2440 set = createSetObject();
2441 dictAdd(c->dict,c->argv[1],set);
2442 incrRefCount(c->argv[1]);
2443 } else {
2444 set = dictGetEntryVal(de);
2445 if (set->type != REDIS_SET) {
c937aa89 2446 addReply(c,shared.wrongtypeerr);
ed9b544e 2447 return;
2448 }
2449 }
2450 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
2451 incrRefCount(c->argv[2]);
2452 server.dirty++;
c937aa89 2453 addReply(c,shared.cone);
ed9b544e 2454 } else {
c937aa89 2455 addReply(c,shared.czero);
ed9b544e 2456 }
2457}
2458
2459static void sremCommand(redisClient *c) {
2460 dictEntry *de;
2461
2462 de = dictFind(c->dict,c->argv[1]);
2463 if (de == NULL) {
c937aa89 2464 addReply(c,shared.czero);
ed9b544e 2465 } else {
2466 robj *set;
2467
2468 set = dictGetEntryVal(de);
2469 if (set->type != REDIS_SET) {
c937aa89 2470 addReply(c,shared.wrongtypeerr);
ed9b544e 2471 return;
2472 }
2473 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
2474 server.dirty++;
c937aa89 2475 addReply(c,shared.cone);
ed9b544e 2476 } else {
c937aa89 2477 addReply(c,shared.czero);
ed9b544e 2478 }
2479 }
2480}
2481
2482static void sismemberCommand(redisClient *c) {
2483 dictEntry *de;
2484
2485 de = dictFind(c->dict,c->argv[1]);
2486 if (de == NULL) {
c937aa89 2487 addReply(c,shared.czero);
ed9b544e 2488 } else {
2489 robj *set;
2490
2491 set = dictGetEntryVal(de);
2492 if (set->type != REDIS_SET) {
c937aa89 2493 addReply(c,shared.wrongtypeerr);
ed9b544e 2494 return;
2495 }
2496 if (dictFind(set->ptr,c->argv[2]))
c937aa89 2497 addReply(c,shared.cone);
ed9b544e 2498 else
c937aa89 2499 addReply(c,shared.czero);
ed9b544e 2500 }
2501}
2502
2503static void scardCommand(redisClient *c) {
2504 dictEntry *de;
2505 dict *s;
2506
2507 de = dictFind(c->dict,c->argv[1]);
2508 if (de == NULL) {
c937aa89 2509 addReply(c,shared.czero);
ed9b544e 2510 return;
2511 } else {
2512 robj *o = dictGetEntryVal(de);
2513 if (o->type != REDIS_SET) {
c937aa89 2514 addReply(c,shared.wrongtypeerr);
ed9b544e 2515 } else {
2516 s = o->ptr;
c937aa89 2517 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
ed9b544e 2518 dictGetHashTableUsed(s)));
2519 }
2520 }
2521}
2522
2523static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
2524 dict **d1 = (void*) s1, **d2 = (void*) s2;
2525
2526 return dictGetHashTableUsed(*d1)-dictGetHashTableUsed(*d2);
2527}
2528
2529static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
2530 dict **dv = zmalloc(sizeof(dict*)*setsnum);
2531 dictIterator *di;
2532 dictEntry *de;
2533 robj *lenobj = NULL, *dstset = NULL;
2534 int j, cardinality = 0;
2535
2536 if (!dv) oom("sinterCommand");
2537 for (j = 0; j < setsnum; j++) {
2538 robj *setobj;
2539 dictEntry *de;
2540
2541 de = dictFind(c->dict,setskeys[j]);
2542 if (!de) {
2543 zfree(dv);
c937aa89 2544 addReply(c,shared.nokeyerr);
ed9b544e 2545 return;
2546 }
2547 setobj = dictGetEntryVal(de);
2548 if (setobj->type != REDIS_SET) {
2549 zfree(dv);
c937aa89 2550 addReply(c,shared.wrongtypeerr);
ed9b544e 2551 return;
2552 }
2553 dv[j] = setobj->ptr;
2554 }
2555 /* Sort sets from the smallest to largest, this will improve our
2556 * algorithm's performace */
2557 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
2558
2559 /* The first thing we should output is the total number of elements...
2560 * since this is a multi-bulk write, but at this stage we don't know
2561 * the intersection set size, so we use a trick, append an empty object
2562 * to the output list and save the pointer to later modify it with the
2563 * right length */
2564 if (!dstkey) {
2565 lenobj = createObject(REDIS_STRING,NULL);
2566 addReply(c,lenobj);
2567 decrRefCount(lenobj);
2568 } else {
2569 /* If we have a target key where to store the resulting set
2570 * create this key with an empty set inside */
2571 dstset = createSetObject();
2572 dictDelete(c->dict,dstkey);
2573 dictAdd(c->dict,dstkey,dstset);
2574 incrRefCount(dstkey);
2575 }
2576
2577 /* Iterate all the elements of the first (smallest) set, and test
2578 * the element against all the other sets, if at least one set does
2579 * not include the element it is discarded */
2580 di = dictGetIterator(dv[0]);
2581 if (!di) oom("dictGetIterator");
2582
2583 while((de = dictNext(di)) != NULL) {
2584 robj *ele;
2585
2586 for (j = 1; j < setsnum; j++)
2587 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
2588 if (j != setsnum)
2589 continue; /* at least one set does not contain the member */
2590 ele = dictGetEntryKey(de);
2591 if (!dstkey) {
c937aa89 2592 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr)));
ed9b544e 2593 addReply(c,ele);
2594 addReply(c,shared.crlf);
2595 cardinality++;
2596 } else {
2597 dictAdd(dstset->ptr,ele,NULL);
2598 incrRefCount(ele);
2599 }
2600 }
2601 dictReleaseIterator(di);
2602
2603 if (!dstkey)
c937aa89 2604 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
ed9b544e 2605 else
2606 addReply(c,shared.ok);
2607 zfree(dv);
2608}
2609
2610static void sinterCommand(redisClient *c) {
2611 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
2612}
2613
2614static void sinterstoreCommand(redisClient *c) {
2615 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
2616}
2617
2618static void flushdbCommand(redisClient *c) {
2619 dictEmpty(c->dict);
2620 addReply(c,shared.ok);
f78fd11b 2621 rdbSave(server.dbfilename);
ed9b544e 2622}
2623
2624static void flushallCommand(redisClient *c) {
2625 emptyDb();
2626 addReply(c,shared.ok);
f78fd11b 2627 rdbSave(server.dbfilename);
ed9b544e 2628}
2629
2630redisSortOperation *createSortOperation(int type, robj *pattern) {
2631 redisSortOperation *so = zmalloc(sizeof(*so));
2632 if (!so) oom("createSortOperation");
2633 so->type = type;
2634 so->pattern = pattern;
2635 return so;
2636}
2637
2638/* Return the value associated to the key with a name obtained
2639 * substituting the first occurence of '*' in 'pattern' with 'subst' */
2640robj *lookupKeyByPattern(dict *dict, robj *pattern, robj *subst) {
2641 char *p;
2642 sds spat, ssub;
2643 robj keyobj;
2644 int prefixlen, sublen, postfixlen;
2645 dictEntry *de;
2646 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
2647 struct {
2648 long len;
2649 long free;
2650 char buf[REDIS_SORTKEY_MAX+1];
2651 } keyname;
2652
2653
2654 spat = pattern->ptr;
2655 ssub = subst->ptr;
2656 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
2657 p = strchr(spat,'*');
2658 if (!p) return NULL;
2659
2660 prefixlen = p-spat;
2661 sublen = sdslen(ssub);
2662 postfixlen = sdslen(spat)-(prefixlen+1);
2663 memcpy(keyname.buf,spat,prefixlen);
2664 memcpy(keyname.buf+prefixlen,ssub,sublen);
2665 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
2666 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
2667 keyname.len = prefixlen+sublen+postfixlen;
2668
2669 keyobj.refcount = 1;
2670 keyobj.type = REDIS_STRING;
2671 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
2672
2673 de = dictFind(dict,&keyobj);
2674 // printf("lookup '%s' => %p\n", keyname.buf,de);
2675 if (!de) return NULL;
2676 return dictGetEntryVal(de);
2677}
2678
2679/* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
2680 * the additional parameter is not standard but a BSD-specific we have to
2681 * pass sorting parameters via the global 'server' structure */
2682static int sortCompare(const void *s1, const void *s2) {
2683 const redisSortObject *so1 = s1, *so2 = s2;
2684 int cmp;
2685
2686 if (!server.sort_alpha) {
2687 /* Numeric sorting. Here it's trivial as we precomputed scores */
2688 if (so1->u.score > so2->u.score) {
2689 cmp = 1;
2690 } else if (so1->u.score < so2->u.score) {
2691 cmp = -1;
2692 } else {
2693 cmp = 0;
2694 }
2695 } else {
2696 /* Alphanumeric sorting */
2697 if (server.sort_bypattern) {
2698 if (!so1->u.cmpobj || !so2->u.cmpobj) {
2699 /* At least one compare object is NULL */
2700 if (so1->u.cmpobj == so2->u.cmpobj)
2701 cmp = 0;
2702 else if (so1->u.cmpobj == NULL)
2703 cmp = -1;
2704 else
2705 cmp = 1;
2706 } else {
2707 /* We have both the objects, use strcoll */
2708 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
2709 }
2710 } else {
2711 /* Compare elements directly */
2712 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
2713 }
2714 }
2715 return server.sort_desc ? -cmp : cmp;
2716}
2717
2718/* The SORT command is the most complex command in Redis. Warning: this code
2719 * is optimized for speed and a bit less for readability */
2720static void sortCommand(redisClient *c) {
2721 dictEntry *de;
2722 list *operations;
2723 int outputlen = 0;
2724 int desc = 0, alpha = 0;
2725 int limit_start = 0, limit_count = -1, start, end;
2726 int j, dontsort = 0, vectorlen;
2727 int getop = 0; /* GET operation counter */
2728 robj *sortval, *sortby = NULL;
2729 redisSortObject *vector; /* Resulting vector to sort */
2730
2731 /* Lookup the key to sort. It must be of the right types */
2732 de = dictFind(c->dict,c->argv[1]);
2733 if (de == NULL) {
c937aa89 2734 addReply(c,shared.nokeyerr);
ed9b544e 2735 return;
2736 }
2737 sortval = dictGetEntryVal(de);
2738 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
c937aa89 2739 addReply(c,shared.wrongtypeerr);
ed9b544e 2740 return;
2741 }
2742
2743 /* Create a list of operations to perform for every sorted element.
2744 * Operations can be GET/DEL/INCR/DECR */
2745 operations = listCreate();
092dac2a 2746 listSetFreeMethod(operations,zfree);
ed9b544e 2747 j = 2;
2748
2749 /* Now we need to protect sortval incrementing its count, in the future
2750 * SORT may have options able to overwrite/delete keys during the sorting
2751 * and the sorted key itself may get destroied */
2752 incrRefCount(sortval);
2753
2754 /* The SORT command has an SQL-alike syntax, parse it */
2755 while(j < c->argc) {
2756 int leftargs = c->argc-j-1;
2757 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
2758 desc = 0;
2759 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
2760 desc = 1;
2761 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
2762 alpha = 1;
2763 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
2764 limit_start = atoi(c->argv[j+1]->ptr);
2765 limit_count = atoi(c->argv[j+2]->ptr);
2766 j+=2;
2767 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
2768 sortby = c->argv[j+1];
2769 /* If the BY pattern does not contain '*', i.e. it is constant,
2770 * we don't need to sort nor to lookup the weight keys. */
2771 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
2772 j++;
2773 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
2774 listAddNodeTail(operations,createSortOperation(
2775 REDIS_SORT_GET,c->argv[j+1]));
2776 getop++;
2777 j++;
2778 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
2779 listAddNodeTail(operations,createSortOperation(
2780 REDIS_SORT_DEL,c->argv[j+1]));
2781 j++;
2782 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
2783 listAddNodeTail(operations,createSortOperation(
2784 REDIS_SORT_INCR,c->argv[j+1]));
2785 j++;
2786 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
2787 listAddNodeTail(operations,createSortOperation(
2788 REDIS_SORT_DECR,c->argv[j+1]));
2789 j++;
2790 } else {
2791 decrRefCount(sortval);
2792 listRelease(operations);
c937aa89 2793 addReply(c,shared.syntaxerr);
ed9b544e 2794 return;
2795 }
2796 j++;
2797 }
2798
2799 /* Load the sorting vector with all the objects to sort */
2800 vectorlen = (sortval->type == REDIS_LIST) ?
2801 listLength((list*)sortval->ptr) :
2802 dictGetHashTableUsed((dict*)sortval->ptr);
2803 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
2804 if (!vector) oom("allocating objects vector for SORT");
2805 j = 0;
2806 if (sortval->type == REDIS_LIST) {
2807 list *list = sortval->ptr;
2808 listNode *ln = list->head;
2809 while(ln) {
2810 robj *ele = ln->value;
2811 vector[j].obj = ele;
2812 vector[j].u.score = 0;
2813 vector[j].u.cmpobj = NULL;
2814 ln = ln->next;
2815 j++;
2816 }
2817 } else {
2818 dict *set = sortval->ptr;
2819 dictIterator *di;
2820 dictEntry *setele;
2821
2822 di = dictGetIterator(set);
2823 if (!di) oom("dictGetIterator");
2824 while((setele = dictNext(di)) != NULL) {
2825 vector[j].obj = dictGetEntryKey(setele);
2826 vector[j].u.score = 0;
2827 vector[j].u.cmpobj = NULL;
2828 j++;
2829 }
2830 dictReleaseIterator(di);
2831 }
2832 assert(j == vectorlen);
2833
2834 /* Now it's time to load the right scores in the sorting vector */
2835 if (dontsort == 0) {
2836 for (j = 0; j < vectorlen; j++) {
2837 if (sortby) {
2838 robj *byval;
2839
2840 byval = lookupKeyByPattern(c->dict,sortby,vector[j].obj);
2841 if (!byval || byval->type != REDIS_STRING) continue;
2842 if (alpha) {
2843 vector[j].u.cmpobj = byval;
2844 incrRefCount(byval);
2845 } else {
2846 vector[j].u.score = strtod(byval->ptr,NULL);
2847 }
2848 } else {
2849 if (!alpha) vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
2850 }
2851 }
2852 }
2853
2854 /* We are ready to sort the vector... perform a bit of sanity check
2855 * on the LIMIT option too. We'll use a partial version of quicksort. */
2856 start = (limit_start < 0) ? 0 : limit_start;
2857 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
2858 if (start >= vectorlen) {
2859 start = vectorlen-1;
2860 end = vectorlen-2;
2861 }
2862 if (end >= vectorlen) end = vectorlen-1;
2863
2864 if (dontsort == 0) {
2865 server.sort_desc = desc;
2866 server.sort_alpha = alpha;
2867 server.sort_bypattern = sortby ? 1 : 0;
2868 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
2869 }
2870
2871 /* Send command output to the output buffer, performing the specified
2872 * GET/DEL/INCR/DECR operations if any. */
2873 outputlen = getop ? getop*(end-start+1) : end-start+1;
c937aa89 2874 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
ed9b544e 2875 for (j = start; j <= end; j++) {
2876 listNode *ln = operations->head;
2877 if (!getop) {
c937aa89 2878 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
ed9b544e 2879 sdslen(vector[j].obj->ptr)));
2880 addReply(c,vector[j].obj);
2881 addReply(c,shared.crlf);
2882 }
2883 while(ln) {
2884 redisSortOperation *sop = ln->value;
2885 robj *val = lookupKeyByPattern(c->dict,sop->pattern,
2886 vector[j].obj);
2887
2888 if (sop->type == REDIS_SORT_GET) {
2889 if (!val || val->type != REDIS_STRING) {
9eb00f21 2890 addReply(c,shared.nullbulk);
ed9b544e 2891 } else {
c937aa89 2892 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
ed9b544e 2893 sdslen(val->ptr)));
2894 addReply(c,val);
2895 addReply(c,shared.crlf);
2896 }
2897 } else if (sop->type == REDIS_SORT_DEL) {
2898 /* TODO */
2899 }
2900 ln = ln->next;
2901 }
2902 }
2903
2904 /* Cleanup */
2905 decrRefCount(sortval);
2906 listRelease(operations);
2907 for (j = 0; j < vectorlen; j++) {
2908 if (sortby && alpha && vector[j].u.cmpobj)
2909 decrRefCount(vector[j].u.cmpobj);
2910 }
2911 zfree(vector);
2912}
2913
2914static void infoCommand(redisClient *c) {
2915 sds info;
2916 time_t uptime = time(NULL)-server.stat_starttime;
2917
2918 info = sdscatprintf(sdsempty(),
2919 "redis_version:%s\r\n"
2920 "connected_clients:%d\r\n"
2921 "connected_slaves:%d\r\n"
2922 "used_memory:%d\r\n"
2923 "changes_since_last_save:%lld\r\n"
2924 "last_save_time:%d\r\n"
2925 "total_connections_received:%lld\r\n"
2926 "total_commands_processed:%lld\r\n"
2927 "uptime_in_seconds:%d\r\n"
2928 "uptime_in_days:%d\r\n"
2929 ,REDIS_VERSION,
2930 listLength(server.clients)-listLength(server.slaves),
2931 listLength(server.slaves),
2932 server.usedmemory,
2933 server.dirty,
2934 server.lastsave,
2935 server.stat_numconnections,
2936 server.stat_numcommands,
2937 uptime,
2938 uptime/(3600*24)
2939 );
c937aa89 2940 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
ed9b544e 2941 addReplySds(c,info);
70003d28 2942 addReply(c,shared.crlf);
ed9b544e 2943}
2944
2945/* =============================== Replication ============================= */
2946
2947/* Send the whole output buffer syncronously to the slave. This a general operation in theory, but it is actually useful only for replication. */
2948static int flushClientOutput(redisClient *c) {
2949 int retval;
2950 time_t start = time(NULL);
2951
2952 while(listLength(c->reply)) {
2953 if (time(NULL)-start > 5) return REDIS_ERR; /* 5 seconds timeout */
2954 retval = aeWait(c->fd,AE_WRITABLE,1000);
2955 if (retval == -1) {
2956 return REDIS_ERR;
2957 } else if (retval & AE_WRITABLE) {
2958 sendReplyToClient(NULL, c->fd, c, AE_WRITABLE);
2959 }
2960 }
2961 return REDIS_OK;
2962}
2963
2964static int syncWrite(int fd, void *ptr, ssize_t size, int timeout) {
2965 ssize_t nwritten, ret = size;
2966 time_t start = time(NULL);
2967
2968 timeout++;
2969 while(size) {
2970 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
2971 nwritten = write(fd,ptr,size);
2972 if (nwritten == -1) return -1;
2973 ptr += nwritten;
2974 size -= nwritten;
2975 }
2976 if ((time(NULL)-start) > timeout) {
2977 errno = ETIMEDOUT;
2978 return -1;
2979 }
2980 }
2981 return ret;
2982}
2983
2984static int syncRead(int fd, void *ptr, ssize_t size, int timeout) {
2985 ssize_t nread, totread = 0;
2986 time_t start = time(NULL);
2987
2988 timeout++;
2989 while(size) {
2990 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
2991 nread = read(fd,ptr,size);
2992 if (nread == -1) return -1;
2993 ptr += nread;
2994 size -= nread;
2995 totread += nread;
2996 }
2997 if ((time(NULL)-start) > timeout) {
2998 errno = ETIMEDOUT;
2999 return -1;
3000 }
3001 }
3002 return totread;
3003}
3004
/* Read a line from 'fd', one byte at a time through syncRead(), storing
 * at most 'size'-1 bytes into 'ptr' and always null terminating the
 * buffer. The trailing "\n" is not stored and a "\r" right before it is
 * replaced by the terminator.
 *
 * Reading stops at the newline or when the buffer is full, whichever
 * comes first: the remaining room is tracked on every stored byte, so a
 * line longer than the buffer can not be written past its end.
 *
 * Returns the number of bytes stored before the newline (a stripped "\r"
 * is included in the count), or -1 on read error/timeout or when 'size'
 * leaves no room even for the terminator (errno set to EINVAL). */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) {
        errno = EINVAL;
        return -1;
    }
    *ptr = '\0';
    size--; /* Reserve room for the terminator. */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        }
        *ptr++ = c;
        *ptr = '\0';
        nread++;
        size--;
    }
    return nread;
}
3025
3026static void syncCommand(redisClient *c) {
3027 struct stat sb;
3028 int fd = -1, len;
3029 time_t start = time(NULL);
3030 char sizebuf[32];
3031
cf3f0c01 3032 /* ignore SYNC if aleady slave or in monitor mode */
3033 if (c->flags & REDIS_SLAVE) return;
3034
ed9b544e 3035 redisLog(REDIS_NOTICE,"Slave ask for syncronization");
f78fd11b 3036 if (flushClientOutput(c) == REDIS_ERR ||
3037 rdbSave(server.dbfilename) != REDIS_OK)
ed9b544e 3038 goto closeconn;
3039
3040 fd = open(server.dbfilename, O_RDONLY);
3041 if (fd == -1 || fstat(fd,&sb) == -1) goto closeconn;
3042 len = sb.st_size;
3043
c937aa89 3044 snprintf(sizebuf,32,"$%d\r\n",len);
ed9b544e 3045 if (syncWrite(c->fd,sizebuf,strlen(sizebuf),5) == -1) goto closeconn;
3046 while(len) {
3047 char buf[1024];
3048 int nread;
3049
3050 if (time(NULL)-start > REDIS_MAX_SYNC_TIME) goto closeconn;
3051 nread = read(fd,buf,1024);
3052 if (nread == -1) goto closeconn;
3053 len -= nread;
3054 if (syncWrite(c->fd,buf,nread,5) == -1) goto closeconn;
3055 }
3056 if (syncWrite(c->fd,"\r\n",2,5) == -1) goto closeconn;
3057 close(fd);
3058 c->flags |= REDIS_SLAVE;
3059 c->slaveseldb = 0;
3060 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
3061 redisLog(REDIS_NOTICE,"Syncronization with slave succeeded");
3062 return;
3063
3064closeconn:
3065 if (fd != -1) close(fd);
3066 c->flags |= REDIS_CLOSE;
3067 redisLog(REDIS_WARNING,"Syncronization with slave failed");
3068 return;
3069}
3070
3071static int syncWithMaster(void) {
3072 char buf[1024], tmpfile[256];
3073 int dumpsize;
3074 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
3075 int dfd;
3076
3077 if (fd == -1) {
3078 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
3079 strerror(errno));
3080 return REDIS_ERR;
3081 }
3082 /* Issue the SYNC command */
3083 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
3084 close(fd);
3085 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
3086 strerror(errno));
3087 return REDIS_ERR;
3088 }
3089 /* Read the bulk write count */
3090 if (syncReadLine(fd,buf,1024,5) == -1) {
3091 close(fd);
3092 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
3093 strerror(errno));
3094 return REDIS_ERR;
3095 }
c937aa89 3096 dumpsize = atoi(buf+1);
ed9b544e 3097 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
3098 /* Read the bulk write data on a temp file */
3099 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
3100 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
3101 if (dfd == -1) {
3102 close(fd);
3103 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
3104 return REDIS_ERR;
3105 }
3106 while(dumpsize) {
3107 int nread, nwritten;
3108
3109 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
3110 if (nread == -1) {
3111 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
3112 strerror(errno));
3113 close(fd);
3114 close(dfd);
3115 return REDIS_ERR;
3116 }
3117 nwritten = write(dfd,buf,nread);
3118 if (nwritten == -1) {
3119 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
3120 close(fd);
3121 close(dfd);
3122 return REDIS_ERR;
3123 }
3124 dumpsize -= nread;
3125 }
3126 close(dfd);
3127 if (rename(tmpfile,server.dbfilename) == -1) {
3128 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
3129 unlink(tmpfile);
3130 close(fd);
3131 return REDIS_ERR;
3132 }
3133 emptyDb();
f78fd11b 3134 if (rdbLoad(server.dbfilename) != REDIS_OK) {
ed9b544e 3135 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
3136 close(fd);
3137 return REDIS_ERR;
3138 }
3139 server.master = createClient(fd);
3140 server.master->flags |= REDIS_MASTER;
3141 server.replstate = REDIS_REPL_CONNECTED;
3142 return REDIS_OK;
3143}
3144
87eca727 3145static void monitorCommand(redisClient *c) {
cf3f0c01 3146 /* ignore MONITOR if aleady slave or in monitor mode */
3147 if (c->flags & REDIS_SLAVE) return;
3148
87eca727 3149 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
3150 c->slaveseldb = 0;
3151 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
3152 addReply(c,shared.ok);
3153}
3154
ed9b544e 3155/* =================================== Main! ================================ */
3156
3157static void daemonize(void) {
3158 int fd;
3159 FILE *fp;
3160
3161 if (fork() != 0) exit(0); /* parent exits */
3162 setsid(); /* create a new session */
3163
3164 /* Every output goes to /dev/null. If Redis is daemonized but
3165 * the 'logfile' is set to 'stdout' in the configuration file
3166 * it will not log at all. */
3167 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
3168 dup2(fd, STDIN_FILENO);
3169 dup2(fd, STDOUT_FILENO);
3170 dup2(fd, STDERR_FILENO);
3171 if (fd > STDERR_FILENO) close(fd);
3172 }
3173 /* Try to write the pid file */
ed329fcf 3174 fp = fopen(server.pidfile,"w");
ed9b544e 3175 if (fp) {
3176 fprintf(fp,"%d\n",getpid());
3177 fclose(fp);
3178 }
3179}
3180
3181int main(int argc, char **argv) {
3182 initServerConfig();
3183 if (argc == 2) {
3184 ResetServerSaveParams();
3185 loadServerConfig(argv[1]);
3186 } else if (argc > 2) {
3187 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
3188 exit(1);
3189 }
3190 initServer();
3191 if (server.daemonize) daemonize();
3192 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
f78fd11b 3193 if (rdbLoad(server.dbfilename) == REDIS_OK)
ed9b544e 3194 redisLog(REDIS_NOTICE,"DB loaded from disk");
3195 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
3196 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
46713f83 3197 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
ed9b544e 3198 aeMain(server.el);
3199 aeDeleteEventLoop(server.el);
3200 return 0;
3201}