len = dsKeyToPath(db,buf,key);
memcpy(buf2,buf,len);
- snprintf(buf2+len,sizeof(buf2)-len,"%ld.%ld",(long)time(NULL),(long)val);
+ snprintf(buf2+len,sizeof(buf2)-len,"_%ld_%ld",(long)time(NULL),(long)val);
fp = fopen(buf2,"w");
if ((retval = rdbSaveKeyValuePair(fp,db,key,val,time(NULL))) == -1)
return REDIS_ERR;
*/
void spawnIOThread(void);
+int cacheScheduleIOPushJobs(int onlyloads);
/* =================== Virtual Memory - Blocking Side ====================== */
}
}
if (best == NULL) {
- /* FIXME: If there are objects that are in the write queue
- * so we can't delete them we should block here, at the cost of
- * slowness as the object cache memory limit is considered
- * n hard limit. */
+ /* Was not able to free a single object... we should check if our
+ * IO queues have stuff in queue, and try to consume the queue,
+ * otherwise we'll use an infinite amount of memory if changes to
+ * the dataset are faster than I/O. */
+ if (listLength(server.cache_io_queue) > 0) {
+ cacheScheduleIOPushJobs(0);
+ waitEmptyIOJobsQueue();
+ processAllPendingIOJobs();
+ return REDIS_OK;
+ }
+ /* Nothing to free at all... */
return REDIS_ERR;
}
key = dictGetEntryKey(best);
#define REDIS_IO_LOADINPROG 4
#define REDIS_IO_SAVEINPROG 8
-void cacheScheduleIOPushJobs(int onlyloads);
-
void cacheScheduleIOAddFlag(redisDb *db, robj *key, long flag) {
struct dictEntry *de = dictFind(db->io_queued,key);
* (not protected by lookupKey() that will block on keys in IO_SAVEINPROG
* state. */
#define MAX_IO_JOBS_QUEUE 100
-void cacheScheduleIOPushJobs(int onlyloads) {
+int cacheScheduleIOPushJobs(int onlyloads) {
time_t now = time(NULL);
listNode *ln;
- int jobs, topush = 0;
+ int jobs, topush = 0, pushed = 0;
/* Sync stuff on disk, but only if we have less
* than MAX_IO_JOBS_QUEUE IO jobs. */
listDelNode(server.cache_io_queue,ln);
decrRefCount(op->key);
zfree(op);
+ pushed++;
} else {
break; /* too early */
}
}
+ return pushed;
}
void cacheCron(void) {