pid_t bgsavechildpid;
pid_t bgrewritechildpid;
sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
+ sds aofbuf; /* AOF buffer, written before entering the event loop */
struct saveparam *saveparams;
int saveparamslen;
char *logfile;
static robj *dupStringObject(robj *o);
static void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
static void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc);
+static void flushAppendOnlyFile(void);
static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectEncoding(robj *o);
static int compareStringObjects(robj *a, robj *b);
static void usage();
static int rewriteAppendOnlyFileBackground(void);
+static int vmSwapObjectBlocking(robj *key, robj *val);
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void beforeSleep(struct aeEventLoop *eventLoop) {
REDIS_NOTUSED(eventLoop);
+ /* Awake clients that got all the swapped keys they requested */
if (server.vm_enabled && listLength(server.io_ready_clients)) {
listIter li;
listNode *ln;
processInputBuffer(c);
}
}
+ /* Write the AOF buffer on disk */
+ flushAppendOnlyFile();
}
static void createSharedObjects(void) {
server.bgsavechildpid = -1;
server.bgrewritechildpid = -1;
server.bgrewritebuf = sdsempty();
+ server.aofbuf = sdsempty();
server.lastsave = time(NULL);
server.dirty = 0;
server.stat_numcommands = 0;
static int rdbLoad(char *filename) {
FILE *fp;
- robj *keyobj = NULL;
uint32_t dbid;
int type, retval, rdbver;
+ int swap_all_values = 0;
dict *d = server.db[0].dict;
redisDb *db = server.db+0;
char buf[1024];
- time_t expiretime = -1, now = time(NULL);
+ time_t expiretime, now = time(NULL);
long long loadedkeys = 0;
fp = fopen(filename,"r");
return REDIS_ERR;
}
while(1) {
- robj *o;
+ robj *key, *val;
+ expiretime = -1;
/* Read type. */
if ((type = rdbLoadType(fp)) == -1) goto eoferr;
if (type == REDIS_EXPIRETIME) {
continue;
}
/* Read key */
- if ((keyobj = rdbLoadStringObject(fp)) == NULL) goto eoferr;
+ if ((key = rdbLoadStringObject(fp)) == NULL) goto eoferr;
/* Read value */
- if ((o = rdbLoadObject(type,fp)) == NULL) goto eoferr;
+ if ((val = rdbLoadObject(type,fp)) == NULL) goto eoferr;
/* Add the new object in the hash table */
- retval = dictAdd(d,keyobj,o);
+ retval = dictAdd(d,key,val);
if (retval == DICT_ERR) {
- redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
+ redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", key->ptr);
exit(1);
}
+ loadedkeys++;
/* Set the expire time if needed */
if (expiretime != -1) {
- setExpire(db,keyobj,expiretime);
+ setExpire(db,key,expiretime);
/* Delete this key if already expired */
- if (expiretime < now) deleteKey(db,keyobj);
- expiretime = -1;
+ if (expiretime < now) {
+ deleteKey(db,key);
+ continue; /* don't try to swap this out */
+ }
}
- keyobj = o = NULL;
+
/* Handle swapping while loading big datasets when VM is on */
- loadedkeys++;
- if (server.vm_enabled && (loadedkeys % 5000) == 0) {
+
+ /* If we detected we are hopeless about fitting something in memory
+ * we just swap every new key directly on disk.
+ * Note that's important to check for this condition before resorting
+ * to random sampling, otherwise we may try to swap already
+ * swapped keys. */
+ if (swap_all_values) {
+ dictEntry *de = dictFind(d,key);
+
+ /* Expired keys were already deleted and skipped above, so de is
+ * expected to be found here; the NULL check is defensive. */
+ if (de) {
+ key = dictGetEntryKey(de);
+ val = dictGetEntryVal(de);
+
+ if (vmSwapObjectBlocking(key,val) == REDIS_OK) {
+ dictGetEntryVal(de) = NULL;
+ }
+ }
+ continue;
+ }
+
+ /* If we have still some hope of having some value fitting memory
+ * then we try random sampling. */
+ if (!swap_all_values && server.vm_enabled && (loadedkeys % 5000) == 0) {
while (zmalloc_used_memory() > server.vm_max_memory) {
if (vmSwapOneObjectBlocking() == REDIS_ERR) break;
}
+ if (zmalloc_used_memory() > server.vm_max_memory)
+ swap_all_values = 1; /* We are already using too much mem */
}
}
fclose(fp);
return REDIS_OK;
eoferr: /* unexpected end of file is handled here with a fatal exit */
- if (keyobj) decrRefCount(keyobj);
redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
exit(1);
return REDIS_ERR; /* Just to avoid warning */
unlink(server.pidfile);
redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
redisLog(REDIS_WARNING,"Server exit now, bye bye...");
- if (server.vm_enabled) unlink(server.vm_swap_file);
exit(0);
} else {
/* Ooops.. error saving! The best we can do is to continue
/* ============================== Append Only file ========================== */
+/* Write the append only file buffer on disk.
+ *
+ * Since we are required to write the AOF before replying to the client,
+ * and the only way the client socket can get a write is by entering the
+ * event loop, we accumulate all the AOF writes in a memory buffer
+ * (server.aofbuf) and write it on disk using this function just before
+ * entering the event loop again.
+ *
+ * Does nothing when the buffer is empty. On a failed or short write the
+ * problem is logged and the process calls exit(1), rather than giving the
+ * illusion that the data was persisted. On success the buffer is replaced
+ * with an empty sds and the file is fsync'ed according to
+ * server.appendfsync: always for APPENDFSYNC_ALWAYS, or for
+ * APPENDFSYNC_EVERYSEC only when time(NULL) - server.lastfsync exceeds 1. */
+static void flushAppendOnlyFile(void) {
+    time_t now;
+    ssize_t nwritten;
+    size_t len = sdslen(server.aofbuf);
+
+    if (len == 0) return;
+
+    /* We want to perform a single write. This should be guaranteed atomic
+     * at least if the filesystem we are writing is a real physical one.
+     * While this will save us against the server being killed I don't think
+     * there is much to do about the whole server stopping for power problems
+     * or alike */
+    nwritten = write(server.appendfd,server.aofbuf,len);
+    /* Compare in ssize_t: casting the length to int would narrow it and
+     * misreport a complete write of a buffer larger than INT_MAX bytes. */
+    if (nwritten != (ssize_t)len) {
+        /* Ooops, we are in troubles. The best thing to do for now is
+         * aborting instead of giving the illusion that everything is
+         * working as expected. */
+        if (nwritten == -1) {
+            redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
+        } else {
+            /* write() does not set errno on a short write, so strerror(errno)
+             * would print a stale, unrelated message: log the counts. */
+            redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: wrote %lld of %llu bytes",
+                (long long)nwritten, (unsigned long long)len);
+        }
+        exit(1);
+    }
+    sdsfree(server.aofbuf);
+    server.aofbuf = sdsempty();
+
+    /* Fsync if needed */
+    now = time(NULL);
+    if (server.appendfsync == APPENDFSYNC_ALWAYS ||
+        (server.appendfsync == APPENDFSYNC_EVERYSEC &&
+         now-server.lastfsync > 1))
+    {
+        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
+         * flushing metadata. */
+        aof_fsync(server.appendfd); /* Let's try to get this data on the disk */
+        server.lastfsync = now;
+    }
+}
+
static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
sds buf = sdsempty();
int j;
- ssize_t nwritten;
- time_t now;
robj *tmpargv[3];
/* The DB this command was targetting is not the same as the last command
decrRefCount(argv[j]);
}
- /* We want to perform a single write. This should be guaranteed atomic
- * at least if the filesystem we are writing is a real physical one.
- * While this will save us against the server being killed I don't think
- * there is much to do about the whole server stopping for power problems
- * or alike */
- nwritten = write(server.appendfd,buf,sdslen(buf));
- if (nwritten != (signed)sdslen(buf)) {
- /* Ooops, we are in troubles. The best thing to do for now is
- * to simply exit instead to give the illusion that everything is
- * working as expected. */
- if (nwritten == -1) {
- redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
- } else {
- redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
- }
- exit(1);
- }
+ /* Append to the AOF buffer. This will be flushed on disk just before
+ * re-entering the event loop, so before the client gets a
+ * positive reply about the operation performed. */
+ server.aofbuf = sdscatlen(server.aofbuf,buf,sdslen(buf));
+
/* If a background append only file rewriting is in progress we want to
* accumulate the differences between the child DB and the current one
* in a buffer, so that when the child process will do its work we
server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
sdsfree(buf);
- now = time(NULL);
- if (server.appendfsync == APPENDFSYNC_ALWAYS ||
- (server.appendfsync == APPENDFSYNC_EVERYSEC &&
- now-server.lastfsync > 1))
- {
- /* aof_fsync is defined as fdatasync() for Linux in order to avoid
- * flushing metadata. */
- aof_fsync(server.appendfd); /* Let's try to get this data on the disk */
- server.lastfsync = now;
- }
}
/* In Redis commands are always executed in the context of a client, so in
/* =================== Virtual Memory - Blocking Side ====================== */
-/* substitute the first occurrence of '%p' with the process pid in the
- * swap file name. */
-static void expandVmSwapFilename(void) {
- char *p = strstr(server.vm_swap_file,"%p");
- sds new;
-
- if (!p) return;
- new = sdsempty();
- *p = '\0';
- new = sdscat(new,server.vm_swap_file);
- new = sdscatprintf(new,"%ld",(long) getpid());
- new = sdscat(new,p+2);
- zfree(server.vm_swap_file);
- server.vm_swap_file = new;
-}
-
static void vmInit(void) {
off_t totsize;
int pipefds[2];
size_t stacksize;
+ struct flock fl;
if (server.vm_max_threads != 0)
zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */
- expandVmSwapFilename();
redisLog(REDIS_NOTICE,"Using '%s' as swap file",server.vm_swap_file);
+ /* Try to open the old swap file, otherwise create it */
if ((server.vm_fp = fopen(server.vm_swap_file,"r+b")) == NULL) {
server.vm_fp = fopen(server.vm_swap_file,"w+b");
}
if (server.vm_fp == NULL) {
redisLog(REDIS_WARNING,
- "Impossible to open the swap file: %s. Exiting.",
+ "Can't open the swap file: %s. Exiting.",
strerror(errno));
exit(1);
}
server.vm_fd = fileno(server.vm_fp);
+ /* Lock the swap file for writing, this is useful in order to avoid
+ * another instance to use the same swap file for a config error. */
+ fl.l_type = F_WRLCK;
+ fl.l_whence = SEEK_SET;
+ fl.l_start = fl.l_len = 0;
+ if (fcntl(server.vm_fd,F_SETLK,&fl) == -1) {
+ redisLog(REDIS_WARNING,
+ "Can't lock the swap file at '%s': %s. Make sure it is not used by another Redis instance.", server.vm_swap_file, strerror(errno));
+ exit(1);
+ }
+ /* Initialize */
server.vm_next_page = 0;
server.vm_near_pages = 0;
server.vm_stats_used_pages = 0;
} else {
addReply(c,shared.err);
}
+    } else if (!strcasecmp(c->argv[1]->ptr,"populate") && c->argc == 3) {
+        /* DEBUG POPULATE <count>: add keys "key:<j>" with values
+         * "value:<j>" for j in [0, count) to the client's current DB,
+         * skipping any key that lookupKeyRead() already finds. A count
+         * <= 0 adds nothing and still replies OK. */
+        long keys, j;
+        robj *key, *val;
+        char buf[128];
+
+        if (getLongFromObjectOrReply(c, c->argv[2], &keys, NULL) != REDIS_OK)
+            return;
+        for (j = 0; j < keys; j++) {
+            /* j is a signed long, so %ld is the matching conversion
+             * (%lu expects unsigned long). */
+            snprintf(buf,sizeof(buf),"key:%ld",j);
+            key = createStringObject(buf,strlen(buf));
+            if (lookupKeyRead(c->db,key) != NULL) {
+                decrRefCount(key);
+                continue;
+            }
+            snprintf(buf,sizeof(buf),"value:%ld",j);
+            val = createStringObject(buf,strlen(buf));
+            dictAdd(c->db->dict,key,val);
+        }
+        addReply(c,shared.ok);
} else {
addReplySds(c,sdsnew(
"-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|SWAPIN <key>|SWAPOUT <key>|RELOAD]\r\n"));
static struct redisFunctionSym symsTable[] = {
{"IOThreadEntryPoint",(unsigned long)IOThreadEntryPoint},
{"_redisAssert",(unsigned long)_redisAssert},
+{"_redisPanic",(unsigned long)_redisPanic},
{"acceptHandler",(unsigned long)acceptHandler},
{"addReply",(unsigned long)addReply},
{"addReplyBulk",(unsigned long)addReplyBulk},
{"createSharedObjects",(unsigned long)createSharedObjects},
{"createSortOperation",(unsigned long)createSortOperation},
{"createStringObject",(unsigned long)createStringObject},
+{"createStringObjectFromLongLong",(unsigned long)createStringObjectFromLongLong},
{"createZsetObject",(unsigned long)createZsetObject},
{"daemonize",(unsigned long)daemonize},
{"dbsizeCommand",(unsigned long)dbsizeCommand},
{"dupStringObject",(unsigned long)dupStringObject},
{"echoCommand",(unsigned long)echoCommand},
{"execCommand",(unsigned long)execCommand},
+{"execCommandReplicateMulti",(unsigned long)execCommandReplicateMulti},
{"existsCommand",(unsigned long)existsCommand},
-{"expandVmSwapFilename",(unsigned long)expandVmSwapFilename},
{"expireCommand",(unsigned long)expireCommand},
{"expireGenericCommand",(unsigned long)expireGenericCommand},
{"expireIfNeeded",(unsigned long)expireIfNeeded},
{"expireatCommand",(unsigned long)expireatCommand},
{"feedAppendOnlyFile",(unsigned long)feedAppendOnlyFile},
{"findFuncName",(unsigned long)findFuncName},
+{"flushAppendOnlyFile",(unsigned long)flushAppendOnlyFile},
{"flushallCommand",(unsigned long)flushallCommand},
{"flushdbCommand",(unsigned long)flushdbCommand},
{"freeClient",(unsigned long)freeClient},
{"genericZrangebyscoreCommand",(unsigned long)genericZrangebyscoreCommand},
{"getCommand",(unsigned long)getCommand},
{"getDecodedObject",(unsigned long)getDecodedObject},
+{"getDoubleFromObject",(unsigned long)getDoubleFromObject},
+{"getDoubleFromObjectOrReply",(unsigned long)getDoubleFromObjectOrReply},
{"getExpire",(unsigned long)getExpire},
{"getGenericCommand",(unsigned long)getGenericCommand},
+{"getLongFromObjectOrReply",(unsigned long)getLongFromObjectOrReply},
+{"getLongLongFromObject",(unsigned long)getLongLongFromObject},
+{"getLongLongFromObjectOrReply",(unsigned long)getLongLongFromObjectOrReply},
{"getMcontextEip",(unsigned long)getMcontextEip},
{"getsetCommand",(unsigned long)getsetCommand},
{"glueReplyBuffersIfNeeded",(unsigned long)glueReplyBuffersIfNeeded},
{"handleClientsBlockedOnSwappedKey",(unsigned long)handleClientsBlockedOnSwappedKey},
{"handleClientsWaitingListPush",(unsigned long)handleClientsWaitingListPush},
+{"hashCurrent",(unsigned long)hashCurrent},
+{"hashDelete",(unsigned long)hashDelete},
+{"hashExists",(unsigned long)hashExists},
+{"hashGet",(unsigned long)hashGet},
+{"hashInitIterator",(unsigned long)hashInitIterator},
+{"hashLookupWriteOrCreate",(unsigned long)hashLookupWriteOrCreate},
+{"hashNext",(unsigned long)hashNext},
+{"hashReleaseIterator",(unsigned long)hashReleaseIterator},
+{"hashSet",(unsigned long)hashSet},
+{"hashTryConversion",(unsigned long)hashTryConversion},
+{"hashTryObjectEncoding",(unsigned long)hashTryObjectEncoding},
{"hdelCommand",(unsigned long)hdelCommand},
{"hexistsCommand",(unsigned long)hexistsCommand},
{"hgetCommand",(unsigned long)hgetCommand},
{"hincrbyCommand",(unsigned long)hincrbyCommand},
{"hkeysCommand",(unsigned long)hkeysCommand},
{"hlenCommand",(unsigned long)hlenCommand},
+{"hmgetCommand",(unsigned long)hmgetCommand},
+{"hmsetCommand",(unsigned long)hmsetCommand},
{"hsetCommand",(unsigned long)hsetCommand},
+{"hsetnxCommand",(unsigned long)hsetnxCommand},
{"htNeedsResize",(unsigned long)htNeedsResize},
{"hvalsCommand",(unsigned long)hvalsCommand},
{"incrCommand",(unsigned long)incrCommand},
{"incrDecrCommand",(unsigned long)incrDecrCommand},
{"incrRefCount",(unsigned long)incrRefCount},
{"incrbyCommand",(unsigned long)incrbyCommand},
+{"incrementallyRehash",(unsigned long)incrementallyRehash},
{"infoCommand",(unsigned long)infoCommand},
{"initClientMultiState",(unsigned long)initClientMultiState},
{"initServer",(unsigned long)initServer},
{"renameCommand",(unsigned long)renameCommand},
{"renameGenericCommand",(unsigned long)renameGenericCommand},
{"renamenxCommand",(unsigned long)renamenxCommand},
+{"replicationFeedMonitors",(unsigned long)replicationFeedMonitors},
{"replicationFeedSlaves",(unsigned long)replicationFeedSlaves},
{"resetClient",(unsigned long)resetClient},
{"resetServerSaveParams",(unsigned long)resetServerSaveParams},
{"sdiffCommand",(unsigned long)sdiffCommand},
{"sdiffstoreCommand",(unsigned long)sdiffstoreCommand},
{"sdsDictKeyCompare",(unsigned long)sdsDictKeyCompare},
+{"sdscatrepr",(unsigned long)sdscatrepr},
{"segvHandler",(unsigned long)segvHandler},
{"selectCommand",(unsigned long)selectCommand},
{"selectDb",(unsigned long)selectDb},
{"setCommand",(unsigned long)setCommand},
{"setExpire",(unsigned long)setExpire},
{"setGenericCommand",(unsigned long)setGenericCommand},
+{"setexCommand",(unsigned long)setexCommand},
{"setnxCommand",(unsigned long)setnxCommand},
{"setupSigSegvAction",(unsigned long)setupSigSegvAction},
{"shutdownCommand",(unsigned long)shutdownCommand},