/* Log levels */
#define REDIS_DEBUG 0
-#define REDIS_NOTICE 1
-#define REDIS_WARNING 2
+#define REDIS_VERBOSE 1
+#define REDIS_NOTICE 2
+#define REDIS_WARNING 3
/* Anti-warning macro... */
#define REDIS_NOTUSED(V) ((void) V)
off_t vm_near_pages; /* Number of pages allocated sequentially */
unsigned char *vm_bitmap; /* Bitmap of free/used pages */
time_t unixtime; /* Unix time sampled every second. */
+ /* Virtual memory stats */
+ unsigned long long vm_stats_used_pages;
+ unsigned long long vm_stats_swapped_objects;
+ unsigned long long vm_stats_swapouts;
+ unsigned long long vm_stats_swapins;
};
typedef void redisCommandProc(redisClient *c);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static int deleteIfVolatile(redisDb *db, robj *key);
+static int deleteIfSwapped(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);
static void vmInit(void);
static void vmMarkPagesFree(off_t page, off_t count);
static robj *vmLoadObject(robj *key);
+static robj *vmPreviewObject(robj *key);
static int vmSwapOneObject(void);
+static int vmCanSwapOut(void);
+static void freeOneObjectFromFreelist(void);
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
!(c->flags & REDIS_MASTER) && /* no timeout for masters */
(now - c->lastinteraction > server.maxidletime))
{
- redisLog(REDIS_DEBUG,"Closing idle client");
+ redisLog(REDIS_VERBOSE,"Closing idle client");
freeClient(c);
} else if (c->flags & REDIS_BLOCKED) {
if (c->blockingto != 0 && c->blockingto < now) {
for (j = 0; j < server.dbnum; j++) {
if (htNeedsResize(server.db[j].dict)) {
- redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
+ redisLog(REDIS_VERBOSE,"The hash table %d is too sparse, resize it...",j);
dictResize(server.db[j].dict);
- redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
+ redisLog(REDIS_VERBOSE,"Hash table %d resized.",j);
}
if (htNeedsResize(server.db[j].expires))
dictResize(server.db[j].expires);
used = dictSize(server.db[j].dict);
vkeys = dictSize(server.db[j].expires);
if (!(loops % 5) && (used || vkeys)) {
- redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
+ redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
/* dictPrintStats(server.dict); */
}
}
/* Show information about connected clients */
if (!(loops % 5)) {
- redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
+ redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
server.usedmemory,
}
/* Swap a few keys on disk if we are over the memory limit and VM
- * is enbled. */
- while (server.vm_enabled && zmalloc_used_memory() > server.vm_max_memory) {
- if (vmSwapOneObject() == REDIS_ERR) {
- redisLog(REDIS_WARNING,"WARNING: vm-max-memory limit reached but unable to swap more objects out!");
- break;
+ * is enabled. Try to free objects from the free list first. */
+ if (vmCanSwapOut()) {
+ while (server.vm_enabled && zmalloc_used_memory() >
+ server.vm_max_memory)
+ {
+ if (listLength(server.objfreelist)) {
+ freeOneObjectFromFreelist();
+ } else if (vmSwapOneObject() == REDIS_ERR) {
+ if ((loops % 30) == 0 && zmalloc_used_memory() >
+ (server.vm_max_memory+server.vm_max_memory/10)) {
+ redisLog(REDIS_WARNING,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!");
+ }
+ break;
+ }
}
}
static void initServerConfig() {
server.dbnum = REDIS_DEFAULT_DBNUM;
server.port = REDIS_SERVERPORT;
- server.verbosity = REDIS_DEBUG;
+ server.verbosity = REDIS_VERBOSE;
server.maxidletime = REDIS_MAXIDLETIME;
server.saveparams = NULL;
server.logfile = NULL; /* NULL = log on standard output */
}
} else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
+ else if (!strcasecmp(argv[1],"verbose")) server.verbosity = REDIS_VERBOSE;
else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
else {
if (errno == EAGAIN) {
nwritten = 0;
} else {
- redisLog(REDIS_DEBUG,
+ redisLog(REDIS_VERBOSE,
"Error writing to client: %s", strerror(errno));
freeClient(c);
return;
/* write all collected blocks at once */
if((nwritten = writev(fd, iov, ion)) < 0) {
if (errno != EAGAIN) {
- redisLog(REDIS_DEBUG,
+ redisLog(REDIS_VERBOSE,
"Error writing to client: %s", strerror(errno));
freeClient(c);
return;
}
return;
} else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
- redisLog(REDIS_DEBUG, "Client protocol error");
+ redisLog(REDIS_VERBOSE, "Client protocol error");
freeClient(c);
return;
}
if (errno == EAGAIN) {
nread = 0;
} else {
- redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
+ redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
freeClient(c);
return;
}
} else if (nread == 0) {
- redisLog(REDIS_DEBUG, "Client closed connection");
+ redisLog(REDIS_VERBOSE, "Client closed connection");
freeClient(c);
return;
}
cfd = anetAccept(server.neterr, fd, cip, &cport);
if (cfd == AE_ERR) {
- redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
+ redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
return;
}
- redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
+ redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
if ((c = createClient(cfd)) == NULL) {
redisLog(REDIS_WARNING,"Error allocating resoures for the client");
close(cfd); /* May be already closed, just ingore errors */
}
static void incrRefCount(robj *o) {
- assert(!server.vm_enabled || o->storage == REDIS_VM_MEMORY);
+ redisAssert(!server.vm_enabled || o->storage == REDIS_VM_MEMORY);
o->refcount++;
}
/* REDIS_VM_SWAPPED */
if (server.vm_enabled && o->storage == REDIS_VM_SWAPPED) {
- assert(o->refcount == 1);
- assert(o->type == REDIS_STRING);
+ redisAssert(o->refcount == 1);
+ redisAssert(o->type == REDIS_STRING);
freeStringObject(o);
vmMarkPagesFree(o->vm.page,o->vm.usedpages);
if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
!listAddNodeHead(server.objfreelist,o))
zfree(o);
+ server.vm_stats_swapped_objects--;
return;
}
/* REDIS_VM_MEMORY */
key->vm.atime = server.unixtime;
} else {
/* Our value was swapped on disk. Bring it at home. */
- assert(val == NULL);
+ redisAssert(val == NULL);
val = vmLoadObject(key);
dictGetEntryVal(de) = val;
}
if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
if (rdbSaveTime(fp,expiretime) == -1) goto werr;
}
- /* Save the key and associated value */
- if (rdbSaveType(fp,o->type) == -1) goto werr;
- if (rdbSaveStringObject(fp,key) == -1) goto werr;
- /* Save the actual value */
- if (rdbSaveObject(fp,o) == -1) goto werr;
+ /* Save the key and associated value. This requires special
+ * handling if the value is swapped out. */
+ if (!server.vm_enabled || key->storage == REDIS_VM_MEMORY) {
+ /* Save type, key, value */
+ if (rdbSaveType(fp,o->type) == -1) goto werr;
+ if (rdbSaveStringObject(fp,key) == -1) goto werr;
+ if (rdbSaveObject(fp,o) == -1) goto werr;
+ } else {
+ robj *po, *newkey;
+ /* Get a preview of the object in memory */
+ po = vmPreviewObject(key);
+ /* Also duplicate the key object, to pass around a standard
+ * string object. */
+ newkey = dupStringObject(key);
+ /* Save type, key, value */
+ if (rdbSaveType(fp,key->vtype) == -1) goto werr;
+ if (rdbSaveStringObject(fp,newkey) == -1) goto werr;
+ if (rdbSaveObject(fp,po) == -1) goto werr;
+ /* Remove the loaded object from memory */
+ decrRefCount(po);
+ decrRefCount(newkey);
+ }
}
dictReleaseIterator(di);
}
redisDb *db = server.db+0;
char buf[1024];
time_t expiretime = -1, now = time(NULL);
+ long long loadedkeys = 0;
fp = fopen(filename,"r");
if (!fp) return REDIS_ERR;
expiretime = -1;
}
keyobj = o = NULL;
+ /* Handle swapping while loading big datasets when VM is on */
+ loadedkeys++;
+ if (server.vm_enabled && (loadedkeys % 5000) == 0) {
+ while (zmalloc_used_memory() > server.vm_max_memory) {
+ if (vmSwapOneObject() == REDIS_ERR) break;
+ }
+ }
}
fclose(fp);
return REDIS_OK;
retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
if (retval == DICT_ERR) {
if (!nx) {
+ /* If the key is about a swapped value, we want a new key object
+ * to overwrite the old. So we delete the old key in the database.
+ * This will also make sure that swap pages about the old object
+ * will be marked as free. */
+ if (deleteIfSwapped(c->db,c->argv[1]))
+ incrRefCount(c->argv[1]);
dictReplace(c->db->dict,c->argv[1],c->argv[2]);
incrRefCount(c->argv[2]);
} else {
"redis_version:%s\r\n"
"arch_bits:%s\r\n"
"multiplexing_api:%s\r\n"
+ "process_id:%ld\r\n"
"uptime_in_seconds:%ld\r\n"
"uptime_in_days:%ld\r\n"
"connected_clients:%d\r\n"
"bgrewriteaof_in_progress:%d\r\n"
"total_connections_received:%lld\r\n"
"total_commands_processed:%lld\r\n"
+ "vm_enabled:%d\r\n"
"role:%s\r\n"
,REDIS_VERSION,
(sizeof(long) == 8) ? "64" : "32",
aeGetApiName(),
+ (long) getpid(),
uptime,
uptime/(3600*24),
listLength(server.clients)-listLength(server.slaves),
server.bgrewritechildpid != -1,
server.stat_numconnections,
server.stat_numcommands,
+ server.vm_enabled != 0,
server.masterhost == NULL ? "master" : "slave"
);
if (server.masterhost) {
server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
);
}
+ if (server.vm_enabled) {
+ info = sdscatprintf(info,
+ "vm_conf_max_memory:%llu\r\n"
+ "vm_conf_page_size:%llu\r\n"
+ "vm_conf_pages:%llu\r\n"
+ "vm_stats_used_pages:%llu\r\n"
+ "vm_stats_swapped_objects:%llu\r\n"
+ "vm_stats_swappin_count:%llu\r\n"
+ "vm_stats_swappout_count:%llu\r\n"
+ ,(unsigned long long) server.vm_max_memory,
+ (unsigned long long) server.vm_page_size,
+ (unsigned long long) server.vm_pages,
+ (unsigned long long) server.vm_stats_used_pages,
+ (unsigned long long) server.vm_stats_swapped_objects,
+ (unsigned long long) server.vm_stats_swapins,
+ (unsigned long long) server.vm_stats_swapouts
+ );
+ }
for (j = 0; j < server.dbnum; j++) {
long long keys, vkeys;
return;
}
if ((nwritten = write(fd,buf,buflen)) == -1) {
- redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
+ redisLog(REDIS_VERBOSE,"Write error sending DB to slave: %s",
strerror(errno));
freeClient(slave);
return;
/* ============================ Maxmemory directive ======================== */
+/* Free one object from the pre-allocated objects free list. This is useful
+ * under low memory conditions, as by default we keep 1 million free objects
+ * allocated.
+ *
+ * The head node of server.objfreelist is unlinked from the list and the
+ * object it carries is released with zfree(). If the list has no head node
+ * (empty free list) the function does nothing, so a caller that forgets to
+ * test listLength(server.objfreelist) first cannot trigger a NULL
+ * dereference. */
+static void freeOneObjectFromFreelist(void) {
+    listNode *head = listFirst(server.objfreelist);
+    robj *o;
+
+    /* Nothing to release when the free list is empty. */
+    if (head == NULL) return;
+    o = listNodeValue(head);
+    listDelNode(server.objfreelist,head);
+    zfree(o);
+}
+
/* This function gets called when 'maxmemory' is set on the config file to limit
* the max memory used by the server, and we are out of memory.
* This function will try to, in order:
static void freeMemoryIfNeeded(void) {
while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
if (listLength(server.objfreelist)) {
- robj *o;
-
- listNode *head = listFirst(server.objfreelist);
- o = listNodeValue(head);
- listDelNode(server.objfreelist,head);
- zfree(o);
+ freeOneObjectFromFreelist();
} else {
int j, k, freed = 0;
struct redisClient *fakeClient;
FILE *fp = fopen(filename,"r");
struct redis_stat sb;
+ unsigned long long loadedkeys = 0;
if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
return REDIS_ERR;
/* Clean up, ready for the next command */
for (j = 0; j < argc; j++) decrRefCount(argv[j]);
zfree(argv);
+ /* Handle swapping while loading big datasets when VM is on */
+ loadedkeys++;
+ if (server.vm_enabled && (loadedkeys % 5000) == 0) {
+ while (zmalloc_used_memory() > server.vm_max_memory) {
+ if (vmSwapOneObject() == REDIS_ERR) break;
+ }
+ }
}
fclose(fp);
freeFakeClient(fakeClient);
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
- robj *key = dictGetEntryKey(de);
- robj *o = dictGetEntryVal(de);
- time_t expiretime = getExpire(db,key);
+ robj *key, *o;
+ time_t expiretime;
+ int swapped;
+
+ key = dictGetEntryKey(de);
+ if (!server.vm_enabled || key->storage == REDIS_VM_MEMORY) {
+ o = dictGetEntryVal(de);
+ swapped = 0;
+ } else {
+ o = vmPreviewObject(key);
+ key = dupStringObject(key);
+ swapped = 1;
+ }
+ expiretime = getExpire(db,key);
/* Save the key and associated value */
if (o->type == REDIS_STRING) {
if (fwriteBulk(fp,key) == 0) goto werr;
if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
}
+ /* We created a few temp objects if the key->value pair
+ * was about a swapped out object. Free both. */
+ if (swapped) {
+ decrRefCount(key);
+ decrRefCount(o);
+ }
}
dictReleaseIterator(di);
}
server.vm_fd = fileno(server.vm_fp);
server.vm_next_page = 0;
server.vm_near_pages = 0;
+ server.vm_stats_used_pages = 0;
+ server.vm_stats_swapped_objects = 0;
+ server.vm_stats_swapouts = 0;
+ server.vm_stats_swapins = 0;
totsize = server.vm_pages*server.vm_page_size;
redisLog(REDIS_NOTICE,"Allocating %lld bytes of swap file",totsize);
if (ftruncate(server.vm_fd,totsize) == -1) {
redisLog(REDIS_NOTICE,"Swap file allocated with success");
}
server.vm_bitmap = zmalloc((server.vm_pages+7)/8);
- redisLog(REDIS_DEBUG,"Allocated %lld bytes page table for %lld pages",
+ redisLog(REDIS_VERBOSE,"Allocated %lld bytes page table for %lld pages",
(long long) (server.vm_pages+7)/8, server.vm_pages);
memset(server.vm_bitmap,0,(server.vm_pages+7)/8);
/* Try to remove the swap file, so the OS will really delete it from the
off_t byte = page/8;
int bit = page&7;
server.vm_bitmap[byte] |= 1<<bit;
- printf("Mark used: %lld (byte:%lld bit:%d)\n", (long long)page,
- (long long)byte, bit);
+ redisLog(REDIS_DEBUG,"Mark used: %lld (byte:%lld bit:%d)\n",
+ (long long)page, (long long)byte, bit);
}
/* Mark N contiguous pages as used, with 'page' being the first. */
for (j = 0; j < count; j++)
vmMarkPageUsed(page+j);
+ server.vm_stats_used_pages += count;
}
/* Mark the page as free */
for (j = 0; j < count; j++)
vmMarkPageFree(page+j);
+ server.vm_stats_used_pages -= count;
}
/* Test if the page is free */
while(offset < server.vm_pages) {
off_t this = base+offset;
- printf("THIS: %lld (%c)\n", (long long) this, vmFreePage(this) ? 'F' : 'X');
+ redisLog(REDIS_DEBUG, "THIS: %lld (%c)\n", (long long) this, vmFreePage(this) ? 'F' : 'X');
/* If we overflow, restart from page zero */
if (this >= server.vm_pages) {
this -= server.vm_pages;
redisLog(REDIS_DEBUG,"VM: object %s swapped out at %lld (%lld pages)",
(unsigned char*) key->ptr,
(unsigned long long) page, (unsigned long long) pages);
+ server.vm_stats_swapped_objects++;
+ server.vm_stats_swapouts++;
return REDIS_OK;
}
/* Load the value object relative to the 'key' object from swap to memory.
- * The newly allocated object is returned. */
-static robj *vmLoadObject(robj *key) {
+ * The newly allocated object is returned.
+ *
+ * If preview is true the unserialized object is returned to the caller but
+ * no changes are made to the key object, nor the pages are marked as freed */
+static robj *vmGenericLoadObject(robj *key, int preview) {
robj *val;
- assert(key->storage == REDIS_VM_SWAPPED);
+ redisAssert(key->storage == REDIS_VM_SWAPPED);
if (fseeko(server.vm_fp,key->vm.page*server.vm_page_size,SEEK_SET) == -1) {
redisLog(REDIS_WARNING,
"Unrecoverable VM problem in vmLoadObject(): can't seek: %s",
redisLog(REDIS_WARNING, "Unrecoverable VM problem in vmLoadObject(): can't load object from swap file: %s", strerror(errno));
exit(1);
}
- key->storage = REDIS_VM_MEMORY;
- key->vm.atime = server.unixtime;
- vmMarkPagesFree(key->vm.page,key->vm.usedpages);
- redisLog(REDIS_DEBUG, "VM: object %s loaded from disk",
- (unsigned char*) key->ptr);
+ if (!preview) {
+ key->storage = REDIS_VM_MEMORY;
+ key->vm.atime = server.unixtime;
+ vmMarkPagesFree(key->vm.page,key->vm.usedpages);
+ redisLog(REDIS_DEBUG, "VM: object %s loaded from disk",
+ (unsigned char*) key->ptr);
+ server.vm_stats_swapped_objects--;
+ } else {
+ redisLog(REDIS_DEBUG, "VM: object %s previewed from disk",
+ (unsigned char*) key->ptr);
+ }
+ server.vm_stats_swapins++;
return val;
}
+/* Plain loading of the value associated with 'key', from swap to memory.
+ * This is vmGenericLoadObject() called with the preview flag off. */
+static robj *vmLoadObject(robj *key) {
+    robj *val = vmGenericLoadObject(key,0);
+
+    return val;
+}
+
+/* Load the value stored on disk for 'key' without modifying the key.
+ * Handy when some operation must be performed on the value without really
+ * bringing it from swap to memory, for instance while saving the dataset
+ * or rewriting the append only log.
+ * This is vmGenericLoadObject() called with the preview flag on. */
+static robj *vmPreviewObject(robj *key) {
+    robj *preview = vmGenericLoadObject(key,1);
+
+    return preview;
+}
+
/* How a good candidate is this object for swapping?
* The better candidate it is, the greater the returned value.
*
}
}
+/* Tell whether it is safe to swap objects out at this moment.
+ * Basically we don't want to swap objects out while a BGSAVE or a
+ * BGREWRITEAOF is running in background.
+ *
+ * Returns 1 when both server.bgsavechildpid and server.bgrewritechildpid
+ * are -1, otherwise 0. */
+static int vmCanSwapOut(void) {
+    if (server.bgsavechildpid != -1) return 0;
+    if (server.bgrewritechildpid != -1) return 0;
+    return 1;
+}
+
+/* Remove 'key' from 'db' when the key object stored in the dictionary is
+ * not flagged as REDIS_VM_MEMORY (that is, its value is swapped).
+ *
+ * Returns 1 if the key was found, was swapped, and deleteKey() was called
+ * on it. Returns 0 if the key is missing or its storage is REDIS_VM_MEMORY. */
+static int deleteIfSwapped(redisDb *db, robj *key) {
+    dictEntry *entry = dictFind(db->dict,key);
+    robj *stored;
+
+    if (entry == NULL) return 0;
+    /* Look at the key object held by the dictionary, not at the caller's
+     * one: the storage flag is read from the stored key. */
+    stored = dictGetEntryKey(entry);
+    if (stored->storage != REDIS_VM_MEMORY) {
+        deleteKey(db,key);
+        return 1;
+    }
+    return 0;
+}
+
/* ================================= Debugging ============================== */
static void debugCommand(redisClient *c) {