+ if (!freed) return; /* nothing to free... */
+ }
+}
+
+/* ============================== Append Only file ========================== */
+
+/* Append one executed command to the append only file.
+ *
+ * cmd    - command table entry of the executed command; only cmd->proc is
+ *          inspected, in order to detect EXPIRE.
+ * dictid - numeric id of the DB the command was executed against.
+ * argv   - argument vector of the command (argv[0] is the command name).
+ * argc   - number of elements in argv.
+ *
+ * The command is serialized as "*<argc>\r\n" followed by one
+ * "$<len>\r\n<payload>\r\n" item per argument, preceded by a SELECT when
+ * dictid differs from server.appendseldb. The whole buffer is handed to a
+ * single write(2) call: on a failed or short write the problem is logged
+ * and the process exits. While a background rewrite is in progress
+ * (server.bgrewritechildpid != -1) the same bytes are also accumulated
+ * into server.bgrewritebuf. Finally the file is fsync()ed according to
+ * server.appendfsync. */
+static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
+    sds buf = sdsempty();
+    int j;
+    ssize_t nwritten;
+    time_t now;
+    robj *tmpargv[3];
+
+    /* The DB this command was targeting is not the same as the one of the
+     * last command we appended. Issuing a SELECT command is needed. */
+    if (dictid != server.appendseldb) {
+        char seldb[64];
+
+        snprintf(seldb,sizeof(seldb),"%d",dictid);
+        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
+            (unsigned long)strlen(seldb),seldb);
+        server.appendseldb = dictid;
+    }
+
+    /* "Fix" the argv vector if the command is EXPIRE. We want to translate
+     * EXPIREs into EXPIREAT calls, so that the logged timeout is absolute. */
+    if (cmd->proc == expireCommand) {
+        long when;
+
+        tmpargv[0] = createStringObject("EXPIREAT",8);
+        tmpargv[1] = argv[1];
+        /* tmpargv[] is released with decrRefCount() below, so the shared
+         * key object needs one more reference. */
+        incrRefCount(argv[1]);
+        when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
+        tmpargv[2] = createObject(REDIS_STRING,
+            sdscatprintf(sdsempty(),"%ld",when));
+        argv = tmpargv;
+    }
+
+    /* Append the actual command */
+    buf = sdscatprintf(buf,"*%d\r\n",argc);
+    for (j = 0; j < argc; j++) {
+        robj *o = argv[j];
+
+        o = getDecodedObject(o);
+        buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
+        buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
+        buf = sdscatlen(buf,"\r\n",2);
+        decrRefCount(o);
+    }
+
+    /* Free the objects from the modified argv for EXPIREAT */
+    if (cmd->proc == expireCommand) {
+        for (j = 0; j < 3; j++)
+            decrRefCount(argv[j]);
+    }
+
+    /* We want to perform a single write. This should be guaranteed atomic
+     * at least if the filesystem we are writing is a real physical one.
+     * While this will save us against the server being killed I don't think
+     * there is much to do about the whole server stopping for power problems
+     * or alike */
+    nwritten = write(server.appendfd,buf,sdslen(buf));
+    if (nwritten != (signed)sdslen(buf)) {
+        /* Ooops, we are in troubles. The best thing to do for now is
+         * to simply exit instead of giving the illusion that everything is
+         * working as expected. */
+        if (nwritten == -1) {
+            redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
+        } else {
+            /* A short write does not set errno: report the byte counts
+             * instead of a stale, unrelated error string. */
+            redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %ld bytes written out of %lu",(long)nwritten,(unsigned long)sdslen(buf));
+        }
+        exit(1);
+    }
+    /* If a background append only file rewriting is in progress we want to
+     * accumulate the differences between the child DB and the current one
+     * in a buffer, so that when the child process will do its work we
+     * can append the differences to the new append only file. */
+    if (server.bgrewritechildpid != -1)
+        server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
+
+    sdsfree(buf);
+    now = time(NULL);
+    if (server.appendfsync == APPENDFSYNC_ALWAYS ||
+        (server.appendfsync == APPENDFSYNC_EVERYSEC &&
+         now-server.lastfsync > 1))
+    {
+        fsync(server.appendfd); /* Let's try to get this data on the disk */
+        server.lastfsync = now;
+    }
+}
+
+/* Commands are always executed in the context of a client, so replaying
+ * the append only file needs a stand-in client that is not bound to any
+ * socket (fd is -1). The returned client has DB 0 selected, an empty query
+ * buffer, no arguments and an empty reply list. */
+static struct redisClient *createFakeClient(void) {
+    struct redisClient *fake = zmalloc(sizeof(*fake));
+
+    selectDb(fake,0);
+    fake->fd = -1;
+    fake->flags = 0;
+    fake->argc = 0;
+    fake->argv = NULL;
+    fake->querybuf = sdsempty();
+    /* The fake client is flagged as a slave waiting for the synchronization
+     * so that Redis will not try to send replies to it. */
+    fake->replstate = REDIS_REPL_WAIT_BGSAVE_START;
+    fake->reply = listCreate();
+    listSetDupMethod(fake->reply,dupClientReplyValue);
+    listSetFreeMethod(fake->reply,decrRefCount);
+    return fake;
+}
+
+/* Release a client created by createFakeClient(): its reply list, its
+ * query buffer and finally the client structure itself. */
+static void freeFakeClient(struct redisClient *c) {
+    listRelease(c->reply);
+    sdsfree(c->querybuf);
+    zfree(c);
+}
+
+/* Replay the append log file 'filename', executing every command it
+ * contains in the context of a fake client.
+ *
+ * On success REDIS_OK is returned. On non fatal error (the append only
+ * file is zero-length) REDIS_ERR is returned. On fatal error (the file
+ * can't be opened, a read error, a malformed entry, an unknown command)
+ * an error message is logged and the program exits. */
+int loadAppendOnlyFile(char *filename) {
+    struct redisClient *fakeClient;
+    FILE *fp = fopen(filename,"r");
+    struct redis_stat sb;
+    unsigned long long loadedkeys = 0;
+
+    /* The stream must be validated before it is handed to fileno():
+     * passing a NULL stream there is not allowed. */
+    if (fp == NULL) {
+        redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
+        exit(1);
+    }
+
+    if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
+        fclose(fp); /* Don't leak the stream on the non fatal path */
+        return REDIS_ERR;
+    }
+
+    fakeClient = createFakeClient();
+    while(1) {
+        int argc, j;
+        unsigned long len;
+        robj **argv;
+        char buf[128];
+        sds argsds;
+        struct redisCommand *cmd;
+
+        /* Every entry starts with a "*<argc>" line. A clean EOF here means
+         * that the whole file was processed. */
+        if (fgets(buf,sizeof(buf),fp) == NULL) {
+            if (feof(fp))
+                break;
+            else
+                goto readerr;
+        }
+        if (buf[0] != '*') goto fmterr;
+        argc = atoi(buf+1);
+        /* argv[0] is dereferenced below for the command lookup, so an
+         * entry without at least the command name is malformed. */
+        if (argc < 1) goto fmterr;
+        argv = zmalloc(sizeof(robj*)*argc);
+        for (j = 0; j < argc; j++) {
+            /* Every argument is "$<len>\r\n<payload>\r\n" */
+            if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
+            if (buf[0] != '$') goto fmterr;
+            len = strtol(buf+1,NULL,10);
+            argsds = sdsnewlen(NULL,len);
+            if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
+            argv[j] = createObject(REDIS_STRING,argsds);
+            if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
+        }
+
+        /* Command lookup */
+        cmd = lookupCommand(argv[0]->ptr);
+        if (!cmd) {
+            redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
+            exit(1);
+        }
+        /* Try object encoding */
+        if (cmd->flags & REDIS_CMD_BULK)
+            argv[argc-1] = tryObjectEncoding(argv[argc-1]);
+        /* Run the command in the context of a fake client */
+        fakeClient->argc = argc;
+        fakeClient->argv = argv;
+        cmd->proc(fakeClient);
+        /* Discard the reply objects list from the fake client */
+        while(listLength(fakeClient->reply))
+            listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
+        /* Clean up, ready for the next command */
+        for (j = 0; j < argc; j++) decrRefCount(argv[j]);
+        zfree(argv);
+        /* Handle swapping while loading big datasets when VM is on */
+        loadedkeys++;
+        if (server.vm_enabled && (loadedkeys % 5000) == 0) {
+            while (zmalloc_used_memory() > server.vm_max_memory) {
+                if (vmSwapOneObjectBlocking() == REDIS_ERR) break;
+            }
+        }
+    }
+    fclose(fp);
+    freeFakeClient(fakeClient);
+    return REDIS_OK;
+
+readerr:
+    if (feof(fp)) {
+        redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
+    } else {
+        redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
+    }
+    exit(1);
+fmterr:
+    redisLog(REDIS_WARNING,"Bad file format reading the append only file");
+    exit(1);
+}
+
+/* Write the string payload of 'obj' to 'fp' in the bulk format
+ * $<count>\r\n<payload>\r\n
+ *
+ * Returns 1 on success, 0 if any of the writes failed. */
+static int fwriteBulkObject(FILE *fp, robj *obj) {
+    char header[128];
+    /* Only objects that are not raw-encoded go through getDecodedObject():
+     * skipping the incr/decr ref count business for raw ones helps
+     * copy-on-write (we are often in a child process when this function
+     * is called) and makes sure that key objects don't get
+     * incrRefCount-ed when VM is enabled. */
+    int decoded = (obj->encoding != REDIS_ENCODING_RAW);
+    int ok = 0;
+    size_t len;
+
+    if (decoded) obj = getDecodedObject(obj);
+    len = sdslen(obj->ptr);
+    snprintf(header,sizeof(header),"$%ld\r\n",(long)len);
+    if (fwrite(header,strlen(header),1,fp) != 0 &&
+        (len == 0 || fwrite(obj->ptr,len,1,fp) != 0) &&
+        fwrite("\r\n",2,1,fp) != 0)
+    {
+        ok = 1;
+    }
+    /* Drop the reference obtained from getDecodedObject(), if any */
+    if (decoded) decrRefCount(obj);
+    return ok;
+}
+
+/* Write the binary-safe string 's' of 'len' bytes into 'fp' in the bulk
+ * format $<count>\r\n<payload>\r\n
+ *
+ * Returns 1 on success, 0 if any of the writes failed. */
+static int fwriteBulkString(FILE *fp, char *s, unsigned long len) {
+    char buf[128];
+
+    /* len is unsigned long, so the matching conversion is %lu */
+    snprintf(buf,sizeof(buf),"$%lu\r\n",len);
+    if (fwrite(buf,strlen(buf),1,fp) == 0) return 0;
+    /* A zero-length fwrite() returns 0 even on success: skip it */
+    if (len && fwrite(s,len,1,fp) == 0) return 0;
+    if (fwrite("\r\n",2,1,fp) == 0) return 0;
+    return 1;
+}
+
+/* Write the double 'd', formatted with "%.17g", into 'fp' in the bulk
+ * format $<count>\r\n<payload>\r\n
+ *
+ * Returns 1 on success, 0 if any of the writes failed. */
+static int fwriteBulkDouble(FILE *fp, double d) {
+    char payload[128], header[128];
+    size_t plen;
+
+    /* The payload is rendered together with its trailing CRLF, which is
+     * not part of the announced count: hence the -2 below. */
+    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
+    plen = strlen(payload);
+    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)plen-2);
+    if (fwrite(header,strlen(header),1,fp) == 0 ||
+        fwrite(payload,plen,1,fp) == 0)
+    {
+        return 0;
+    }
+    return 1;
+}
+
+/* Write the long 'l', formatted in base 10, into 'fp' in the bulk format
+ * $<count>\r\n<payload>\r\n
+ *
+ * Returns 1 on success, 0 if any of the writes failed. */
+static int fwriteBulkLong(FILE *fp, long l) {
+    char payload[128], header[128];
+    size_t plen;
+
+    /* The payload is rendered together with its trailing CRLF, which is
+     * not part of the announced count: hence the -2 below. */
+    snprintf(payload,sizeof(payload),"%ld\r\n",l);
+    plen = strlen(payload);
+    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)plen-2);
+    if (fwrite(header,strlen(header),1,fp) == 0 ||
+        fwrite(payload,plen,1,fp) == 0)
+    {
+        return 0;
+    }
+    return 1;
+}
+
+/* Write a sequence of commands able to fully rebuild the dataset into
+ * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
+ *
+ * The commands are first written into a temp file named after the pid of
+ * the calling process, that is flushed, fsync()ed and closed, and only
+ * then renamed to 'filename'. For every non empty DB a SELECT is emitted,
+ * followed by the SET / RPUSH / SADD / ZADD / HSET commands needed to
+ * rebuild every key, plus an EXPIREAT for keys with a timeout. Keys whose
+ * timeout is already in the past are not written at all.
+ *
+ * Returns REDIS_OK on success. On error REDIS_ERR is returned and the
+ * temp file is removed. */
+static int rewriteAppendOnlyFile(char *filename) {
+    dictIterator *di = NULL;    /* iterator over the keys of the current DB */
+    dictIterator *subdi = NULL; /* iterator over the elements of a value */
+    dictEntry *de;
+    FILE *fp;
+    char tmpfile[256];
+    int j;
+    int saved_errno;
+    time_t now = time(NULL);
+
+    /* Note that we have to use a different temp name here compared to the
+     * one used by rewriteAppendOnlyFileBackground() function. */
+    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
+    fp = fopen(tmpfile,"w");
+    if (!fp) {
+        redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
+        return REDIS_ERR;
+    }
+    for (j = 0; j < server.dbnum; j++) {
+        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
+        redisDb *db = server.db+j;
+        dict *d = db->dict;
+        if (dictSize(d) == 0) continue;
+        di = dictGetIterator(d);
+        if (!di) {
+            fclose(fp);
+            unlink(tmpfile); /* don't leave the partial temp file around */
+            return REDIS_ERR;
+        }
+
+        /* SELECT the new DB */
+        if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
+        if (fwriteBulkLong(fp,j) == 0) goto werr;
+
+        /* Iterate this DB writing every entry */
+        while((de = dictNext(di)) != NULL) {
+            robj *key, *o;
+            time_t expiretime;
+            int swapped;
+
+            key = dictGetEntryKey(de);
+            expiretime = getExpire(db,key);
+            /* If this key is already expired skip it. The test is done
+             * before anything is emitted (and before a swapped value is
+             * previewed) so that the key is not written without its
+             * timeout and no preview object is leaked. */
+            if (expiretime != -1 && expiretime < now) continue;
+
+            /* If the value for this key is swapped, load a preview in memory.
+             * We use a "swapped" flag to remember if we need to free the
+             * value object instead to just increment the ref count anyway
+             * in order to avoid copy-on-write of pages if we are forked() */
+            if (!server.vm_enabled || key->storage == REDIS_VM_MEMORY ||
+                key->storage == REDIS_VM_SWAPPING) {
+                o = dictGetEntryVal(de);
+                swapped = 0;
+            } else {
+                o = vmPreviewObject(key);
+                swapped = 1;
+            }
+
+            /* Save the key and associated value */
+            if (o->type == REDIS_STRING) {
+                /* Emit a SET command */
+                char cmd[]="*3\r\n$3\r\nSET\r\n";
+                if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                /* Key and value */
+                if (fwriteBulkObject(fp,key) == 0) goto werr;
+                if (fwriteBulkObject(fp,o) == 0) goto werr;
+            } else if (o->type == REDIS_LIST) {
+                /* Emit the RPUSHes needed to rebuild the list */
+                list *list = o->ptr;
+                listNode *ln;
+                listIter li;
+
+                listRewind(list,&li);
+                while((ln = listNext(&li))) {
+                    char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
+                    robj *eleobj = listNodeValue(ln);
+
+                    if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                    if (fwriteBulkObject(fp,key) == 0) goto werr;
+                    if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+                }
+            } else if (o->type == REDIS_SET) {
+                /* Emit the SADDs needed to rebuild the set */
+                dict *set = o->ptr;
+                dictEntry *sde;
+
+                subdi = dictGetIterator(set);
+                while((sde = dictNext(subdi)) != NULL) {
+                    char cmd[]="*3\r\n$4\r\nSADD\r\n";
+                    robj *eleobj = dictGetEntryKey(sde);
+
+                    if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                    if (fwriteBulkObject(fp,key) == 0) goto werr;
+                    if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+                }
+                dictReleaseIterator(subdi);
+                subdi = NULL;
+            } else if (o->type == REDIS_ZSET) {
+                /* Emit the ZADDs needed to rebuild the sorted set */
+                zset *zs = o->ptr;
+                dictEntry *sde;
+
+                subdi = dictGetIterator(zs->dict);
+                while((sde = dictNext(subdi)) != NULL) {
+                    char cmd[]="*4\r\n$4\r\nZADD\r\n";
+                    robj *eleobj = dictGetEntryKey(sde);
+                    double *score = dictGetEntryVal(sde);
+
+                    if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                    if (fwriteBulkObject(fp,key) == 0) goto werr;
+                    if (fwriteBulkDouble(fp,*score) == 0) goto werr;
+                    if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+                }
+                dictReleaseIterator(subdi);
+                subdi = NULL;
+            } else if (o->type == REDIS_HASH) {
+                char cmd[]="*4\r\n$4\r\nHSET\r\n";
+
+                /* Emit the HSETs needed to rebuild the hash.
+                 * The fwriteBulk*() helpers return 0 (never -1) on error,
+                 * and every failure must go through werr so that the temp
+                 * file is closed and removed. */
+                if (o->encoding == REDIS_ENCODING_ZIPMAP) {
+                    unsigned char *p = zipmapRewind(o->ptr);
+                    unsigned char *field, *val;
+                    unsigned int flen, vlen;
+
+                    while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) {
+                        if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                        if (fwriteBulkObject(fp,key) == 0) goto werr;
+                        if (fwriteBulkString(fp,(char*)field,flen) == 0)
+                            goto werr;
+                        if (fwriteBulkString(fp,(char*)val,vlen) == 0)
+                            goto werr;
+                    }
+                } else {
+                    dictEntry *sde;
+
+                    subdi = dictGetIterator(o->ptr);
+                    while((sde = dictNext(subdi)) != NULL) {
+                        robj *field = dictGetEntryKey(sde);
+                        robj *val = dictGetEntryVal(sde);
+
+                        if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                        if (fwriteBulkObject(fp,key) == 0) goto werr;
+                        if (fwriteBulkObject(fp,field) == 0) goto werr;
+                        if (fwriteBulkObject(fp,val) == 0) goto werr;
+                    }
+                    dictReleaseIterator(subdi);
+                    subdi = NULL;
+                }
+            } else {
+                redisPanic("Unknown object type");
+            }
+            /* Save the expire time */
+            if (expiretime != -1) {
+                char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
+                if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                if (fwriteBulkObject(fp,key) == 0) goto werr;
+                if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
+            }
+            if (swapped) decrRefCount(o);
+        }
+        dictReleaseIterator(di);
+        di = NULL; /* werr must not release it a second time */
+    }
+
+    /* Make sure data will not remain on the OS's output buffers. The file
+     * replaces the real one only if all of this succeeded. */
+    if (fflush(fp) == EOF) goto werr;
+    if (fsync(fileno(fp)) == -1) goto werr;
+    if (fclose(fp) == EOF) {
+        fp = NULL; /* the stream can't be used (or closed) again */
+        goto werr;
+    }
+
+    /* Use RENAME to make sure the DB file is changed atomically only
+     * if the generated DB file is ok. */
+    if (rename(tmpfile,filename) == -1) {
+        redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
+        unlink(tmpfile);
+        return REDIS_ERR;
+    }
+    redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
+    return REDIS_OK;
+
+werr:
+    /* fclose() and unlink() may overwrite errno: save it for the log */
+    saved_errno = errno;
+    if (fp) fclose(fp);
+    unlink(tmpfile);
+    redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(saved_errno));
+    if (subdi) dictReleaseIterator(subdi);
+    if (di) dictReleaseIterator(di);
+    return REDIS_ERR;
+}
+
+/* Start the rewrite of the append only file in a child process.
+ *
+ * This is how the background rewrite works:
+ *
+ * 1) The user calls BGREWRITEAOF
+ * 2) Redis calls this function, that fork()s:
+ *    2a) the child rewrites the append only file into a temp file.
+ *    2b) the parent accumulates differences in server.bgrewritebuf.
+ * 3) When the child has finished '2a' it exits.
+ * 4) The parent will trap the exit code, if it's OK, will append the
+ *    data accumulated into server.bgrewritebuf into the temp file, and
+ *    finally will rename(2) the temp file to the actual file name.
+ *    Then the new file is reopened as the new append only file. Profit!
+ *
+ * Returns REDIS_OK in the parent if the child was started, REDIS_ERR if a
+ * rewrite is already in progress or fork() failed. The child never returns
+ * from this function. */
+static int rewriteAppendOnlyFileBackground(void) {
+    pid_t childpid;
+
+    if (server.bgrewritechildpid != -1) return REDIS_ERR;
+    if (server.vm_enabled) waitEmptyIOJobsQueue();
+
+    childpid = fork();
+    if (childpid == 0) {
+        /* Child: the exit status tells the parent if the rewrite is ok */
+        char tmpfile[256];
+
+        if (server.vm_enabled) vmReopenSwapFile();
+        close(server.fd);
+        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
+        _exit(rewriteAppendOnlyFile(tmpfile) == REDIS_OK ? 0 : 1);
+    }
+
+    /* Parent */
+    if (childpid == -1) {
+        redisLog(REDIS_WARNING,
+            "Can't rewrite append only file in background: fork: %s",
+            strerror(errno));
+        return REDIS_ERR;
+    }
+    redisLog(REDIS_NOTICE,
+        "Background append only file rewriting started by pid %d",childpid);
+    server.bgrewritechildpid = childpid;
+    updateDictResizePolicy();
+    /* Setting appendseldb to -1 forces the next call to
+     * feedAppendOnlyFile() to issue a SELECT command, so the differences
+     * accumulated by the parent into server.bgrewritebuf will start
+     * with a SELECT statement and it will be safe to merge. */
+    server.appendseldb = -1;
+    return REDIS_OK;
+}
+
+/* BGREWRITEAOF command implementation: replies with an error if a
+ * background rewrite is already running or can't be started, otherwise
+ * with a status line telling that the rewrite started. */
+static void bgrewriteaofCommand(redisClient *c) {
+    if (server.bgrewritechildpid != -1) {
+        addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
+        return;
+    }
+    if (rewriteAppendOnlyFileBackground() != REDIS_OK) {
+        addReply(c,shared.err);
+        return;
+    }
+    addReplySds(c,sdsnew("+Background append only file rewriting started\r\n"));
+}
+
+/* Remove the temp file used by the background rewrite child with pid
+ * 'childpid'. The name is built with the same pattern used by
+ * rewriteAppendOnlyFileBackground(). The result of unlink() is ignored. */
+static void aofRemoveTempFile(pid_t childpid) {
+    char path[256];
+
+    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof", (int) childpid);
+    unlink(path);
+}
+
+/* Virtual Memory is composed mainly of two subsystems:
+ * - Blocking Virtual Memory
+ * - Threaded Virtual Memory I/O
+ * The two parts are not fully decoupled, but functions are split among two
+ * different sections of the source code (delimited by comments) in order to
+ * make more clear what functionality is about the blocking VM and what about
+ * the threaded (not blocking) VM.
+ *
+ * Redis VM design:
+ *
+ * Redis VM is a blocking VM (one that blocks reading swapped values from
+ * disk into memory when a value swapped out is needed in memory) that is made
+ * unblocking by trying to examine the command argument vector in order to
+ * load in background values that will likely be needed in order to exec
+ * the command. The command is executed only once all the relevant keys
+ * are loaded into memory.
+ *
+ * This is basically almost as simple as a blocking VM, but almost as parallel
+ * as a fully non-blocking VM.
+ */
+
+/* =================== Virtual Memory - Blocking Side ====================== */
+
+/* Replace the first occurrence of '%p' in server.vm_swap_file with the pid
+ * of the process. Nothing is done if the name contains no '%p'. */
+static void expandVmSwapFilename(void) {
+    char *marker = strstr(server.vm_swap_file,"%p");
+    sds expanded;
+
+    if (marker == NULL) return;
+    /* Cut the old name at the marker so that what precedes it can be
+     * copied as a plain C string; what follows starts two bytes later. */
+    *marker = '\0';
+    expanded = sdscat(sdsempty(),server.vm_swap_file);
+    expanded = sdscatprintf(expanded,"%ld",(long) getpid());
+    expanded = sdscat(expanded,marker+2);
+    zfree(server.vm_swap_file);
+    server.vm_swap_file = expanded;
+}
+
+/* Initialize the Virtual Memory subsystem: open (or create) the swap
+ * file and size it to vm_pages*vm_page_size bytes, allocate the zeroed
+ * page usage bitmap (one bit per page), reset the VM statistics, and set
+ * up the lists, mutexes, thread attributes and notification pipe used by
+ * the threaded I/O. Every failure is logged and terminates the process. */
+static void vmInit(void) {
+    off_t totsize;
+    int pipefds[2];
+    size_t stacksize;
+
+    if (server.vm_max_threads != 0)
+        zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */
+
+    expandVmSwapFilename();
+    redisLog(REDIS_NOTICE,"Using '%s' as swap file",server.vm_swap_file);
+    /* Try to open an existing swap file first, create it otherwise */
+    if ((server.vm_fp = fopen(server.vm_swap_file,"r+b")) == NULL) {
+        server.vm_fp = fopen(server.vm_swap_file,"w+b");
+    }
+    if (server.vm_fp == NULL) {
+        redisLog(REDIS_WARNING,
+            "Impossible to open the swap file: %s. Exiting.",
+            strerror(errno));
+        exit(1);
+    }
+    server.vm_fd = fileno(server.vm_fp);
+    server.vm_next_page = 0;
+    server.vm_near_pages = 0;
+    server.vm_stats_used_pages = 0;
+    server.vm_stats_swapped_objects = 0;
+    server.vm_stats_swapouts = 0;
+    server.vm_stats_swapins = 0;
+    totsize = server.vm_pages*server.vm_page_size;
+    /* The values printed with %lld are explicitly converted to long long:
+     * the width of off_t is not the same on every platform. */
+    redisLog(REDIS_NOTICE,"Allocating %lld bytes of swap file",
+        (long long) totsize);
+    if (ftruncate(server.vm_fd,totsize) == -1) {
+        redisLog(REDIS_WARNING,"Can't ftruncate swap file: %s. Exiting.",
+            strerror(errno));
+        exit(1);
+    } else {
+        redisLog(REDIS_NOTICE,"Swap file allocated with success");
+    }
+    /* One bit per page, rounded up to a whole number of bytes */
+    server.vm_bitmap = zmalloc((server.vm_pages+7)/8);
+    redisLog(REDIS_VERBOSE,"Allocated %lld bytes page table for %lld pages",
+        (long long) (server.vm_pages+7)/8, (long long) server.vm_pages);
+    memset(server.vm_bitmap,0,(server.vm_pages+7)/8);
+
+    /* Initialize threaded I/O (used by Virtual Memory) */
+    server.io_newjobs = listCreate();
+    server.io_processing = listCreate();
+    server.io_processed = listCreate();
+    server.io_ready_clients = listCreate();
+    pthread_mutex_init(&server.io_mutex,NULL);
+    pthread_mutex_init(&server.obj_freelist_mutex,NULL);
+    pthread_mutex_init(&server.io_swapfile_mutex,NULL);
+    server.io_active_threads = 0;
+    if (pipe(pipefds) == -1) {
+        redisLog(REDIS_WARNING,"Unable to intialized VM: pipe(2): %s. Exiting."
+            ,strerror(errno));
+        exit(1);
+    }
+    server.io_ready_pipe_read = pipefds[0];
+    server.io_ready_pipe_write = pipefds[1];
+    redisAssert(anetNonBlock(NULL,server.io_ready_pipe_read) != ANET_ERR);
+    /* LZF requires a lot of stack */
+    pthread_attr_init(&server.io_threads_attr);
+    pthread_attr_getstacksize(&server.io_threads_attr, &stacksize);
+    /* If the default stack size is reported as zero the doubling loop
+     * below would never terminate: start from a non zero value. */
+    if (!stacksize) stacksize = 1;
+    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
+    pthread_attr_setstacksize(&server.io_threads_attr, stacksize);
+    /* Listen for events in the threaded I/O pipe */
+    if (aeCreateFileEvent(server.el, server.io_ready_pipe_read, AE_READABLE,
+        vmThreadedIOCompletedJob, NULL) == AE_ERR)
+        oom("creating file event");
+}
+
+/* Set the bit of 'page' in the swap file bitmap, flagging the page as
+ * used. The page is asserted to be free when the function is called. */
+static void vmMarkPageUsed(off_t page) {
+    int mask = 1<<(int)(page&7); /* position of the page inside its byte */
+
+    redisAssert(vmFreePage(page) == 1);
+    server.vm_bitmap[page/8] |= mask;
+}
+
+/* Flag as used 'count' contiguous pages starting at 'page', and add them
+ * to the used pages counter. */
+static void vmMarkPagesUsed(off_t page, off_t count) {
+    off_t done = 0;
+
+    while (done < count) {
+        vmMarkPageUsed(page+done);
+        done++;
+    }
+    server.vm_stats_used_pages += count;
+    redisLog(REDIS_DEBUG,"Mark USED pages: %lld pages at %lld\n",
+        (long long)count, (long long)page);
+}
+
+/* Clear the bit of 'page' in the swap file bitmap, flagging the page as
+ * free. The page is asserted to be in use when the function is called. */
+static void vmMarkPageFree(off_t page) {
+    int mask = 1<<(int)(page&7); /* position of the page inside its byte */
+
+    redisAssert(vmFreePage(page) == 0);
+    server.vm_bitmap[page/8] &= ~mask;
+}
+
+/* Flag as free 'count' contiguous pages starting at 'page', and subtract
+ * them from the used pages counter. */
+static void vmMarkPagesFree(off_t page, off_t count) {
+    off_t done = 0;
+
+    while (done < count) {
+        vmMarkPageFree(page+done);
+        done++;
+    }
+    server.vm_stats_used_pages -= count;
+    redisLog(REDIS_DEBUG,"Mark FREE pages: %lld pages at %lld\n",
+        (long long)count, (long long)page);
+}
+
+/* Return 1 if the bit of 'page' in the swap file bitmap is clear (the
+ * page is free), 0 if it is set. */
+static int vmFreePage(off_t page) {
+    int mask = 1<<(int)(page&7); /* position of the page inside its byte */
+
+    return !(server.vm_bitmap[page/8] & mask);
+}
+
+/* Find N contiguous free pages storing the first page of the cluster in *first.
+ * Returns REDIS_OK if it was able to find N contiguous pages, otherwise
+ * REDIS_ERR is returned.
+ *
+ * On success server.vm_next_page is set to the page following the cluster,
+ * so that the next search starts from there. The pages found are NOT
+ * marked as used by this function: the bitmap is only read here, via
+ * vmFreePage().
+ *
+ * This function uses a simple algorithm: we try to allocate
+ * REDIS_VM_MAX_NEAR_PAGES sequentially, when we reach this limit we start
+ * again from the start of the swap file searching for free spaces.
+ *
+ * If it looks pretty clear that there are no free pages near our offset
+ * we try to find less populated places doing a forward jump of
+ * REDIS_VM_MAX_RANDOM_JUMP, then we start scanning again a few pages
+ * without hurry, and then we jump again and so forth...
+ * (In the code: the jump is taken when the current page is not free and
+ * at least REDIS_VM_MAX_RANDOM_JUMP/4 pages were tested since the last
+ * jump, and its length is random() % REDIS_VM_MAX_RANDOM_JUMP.)
+ *
+ * The search gives up, returning REDIS_ERR, once the offset from the
+ * starting page reaches server.vm_pages.
+ *
+ * This function can be improved using a free list to avoid guessing
+ * too much, since we could collect data about freed pages.
+ *
+ * note: I implemented this function just after watching an episode of
+ * Battlestar Galactica, where the hybrid was continuing to say "JUMP!"
+ */
+static int vmFindContiguousPages(off_t *first, off_t n) {
+ off_t base, offset = 0, since_jump = 0, numfree = 0;
+
+ if (server.vm_near_pages == REDIS_VM_MAX_NEAR_PAGES) {
+ server.vm_near_pages = 0;
+ server.vm_next_page = 0;
+ }
+ server.vm_near_pages++; /* Yet another try for pages near to the old ones */
+ base = server.vm_next_page;
+
+ while(offset < server.vm_pages) {
+ off_t this = base+offset; /* page under test, wrapped just below */
+
+ /* If we overflow, restart from page zero */
+ if (this >= server.vm_pages) {
+ this -= server.vm_pages;
+ if (this == 0) {
+ /* Just overflowed, what we found on tail is no longer
+ * interesting, as it's no longer contiguous. */
+ numfree = 0;
+ }
+ }
+ if (vmFreePage(this)) {
+ /* This is a free page */
+ numfree++;
+ /* Already got N free pages? Return to the caller, with success */
+ if (numfree == n) {
+ *first = this-(n-1); /* 'this' is the last page of the cluster */
+ server.vm_next_page = this+1;
+ redisLog(REDIS_DEBUG, "FOUND CONTIGUOUS PAGES: %lld pages at %lld\n", (long long) n, (long long) *first);
+ return REDIS_OK;
+ }
+ } else {
+ /* The current one is not a free page */
+ numfree = 0;
+ }
+
+ /* Fast-forward if the current page is not free and we already
+ * searched enough near this place. */
+ since_jump++;
+ if (!numfree && since_jump >= REDIS_VM_MAX_RANDOM_JUMP/4) {
+ offset += random() % REDIS_VM_MAX_RANDOM_JUMP;
+ since_jump = 0;
+ /* Note that even if we rewind after the jump, we don't need
+ * to make sure numfree is set to zero as we only jump *if* it
+ * is set to zero. */
+ } else {
+ /* Otherwise just check the next page */
+ offset++;
+ }
+ }
+ return REDIS_ERR;
+}
+
+/* Serialize the object 'o' into the swap file, starting at page 'page'.
+ * When VM is enabled the swap file mutex is held while seeking and
+ * writing. Returns REDIS_OK, or REDIS_ERR if the seek failed (the failure
+ * is logged). */
+static int vmWriteObjectOnSwap(robj *o, off_t page) {
+    int retval = REDIS_OK;
+
+    if (server.vm_enabled) pthread_mutex_lock(&server.io_swapfile_mutex);
+    if (fseeko(server.vm_fp,page*server.vm_page_size,SEEK_SET) != -1) {
+        rdbSaveObject(server.vm_fp,o);
+        fflush(server.vm_fp);
+    } else {
+        retval = REDIS_ERR;
+    }
+    if (server.vm_enabled) pthread_mutex_unlock(&server.io_swapfile_mutex);
+    if (retval == REDIS_ERR) {
+        redisLog(REDIS_WARNING,
+            "Critical VM problem in vmWriteObjectOnSwap(): can't seek: %s",
+            strerror(errno));
+    }
+    return retval;
+}
+
+/* Move the 'val' object associated to 'key' from memory to the swap file,
+ * recording inside the key object what is needed to load it back later
+ * (first page, number of pages, type of the value).
+ *
+ * REDIS_ERR is returned, and nothing is changed, if enough contiguous
+ * free pages can't be found or the object can't be written. Otherwise the
+ * reference to 'val' is released, the pages are marked as used and
+ * REDIS_OK is returned. */
+static int vmSwapObjectBlocking(robj *key, robj *val) {
+    off_t npages = rdbSavedObjectPages(val,NULL);
+    off_t start;
+
+    assert(key->storage == REDIS_VM_MEMORY);
+    assert(key->refcount == 1);
+    if (vmFindContiguousPages(&start,npages) == REDIS_ERR ||
+        vmWriteObjectOnSwap(val,start) == REDIS_ERR)
+    {
+        return REDIS_ERR;
+    }
+    /* The type must be saved before the value is released */
+    key->vtype = val->type;
+    key->storage = REDIS_VM_SWAPPED;
+    key->vm.page = start;
+    key->vm.usedpages = npages;
+    decrRefCount(val); /* Deallocate the object from memory. */
+    vmMarkPagesUsed(start,npages);
+    redisLog(REDIS_DEBUG,"VM: object %s swapped out at %lld (%lld pages)",
+        (unsigned char*) key->ptr,
+        (unsigned long long) start, (unsigned long long) npages);
+    server.vm_stats_swapped_objects++;
+    server.vm_stats_swapouts++;
+    return REDIS_OK;
+}
+
+/* Load from the swap file the object of type 'type' stored starting at
+ * page 'page'. When VM is enabled the swap file mutex is held while
+ * seeking and reading. A failure in seeking or loading is not recoverable:
+ * it is logged and the process is terminated. */
+static robj *vmReadObjectFromSwap(off_t page, int type) {
+    robj *loaded;
+
+    if (server.vm_enabled) pthread_mutex_lock(&server.io_swapfile_mutex);
+    if (fseeko(server.vm_fp,page*server.vm_page_size,SEEK_SET) == -1) {
+        redisLog(REDIS_WARNING,
+            "Unrecoverable VM problem in vmReadObjectFromSwap(): can't seek: %s",
+            strerror(errno));
+        _exit(1);
+    }
+    loaded = rdbLoadObject(type,server.vm_fp);
+    if (loaded == NULL) {
+        redisLog(REDIS_WARNING, "Unrecoverable VM problem in vmReadObjectFromSwap(): can't load object from swap file: %s", strerror(errno));
+        _exit(1);
+    }
+    if (server.vm_enabled) pthread_mutex_unlock(&server.io_swapfile_mutex);
+    return loaded;
+}
+
+/* Read from the swap file the value associated to 'key' and return it as
+ * a newly allocated object. The key must be swapped or being loaded.
+ *
+ * With 'preview' true only the unserialized object is returned: the key
+ * object is left untouched and its pages stay marked as used. Otherwise
+ * the key is flagged as in memory, its access time is refreshed and its
+ * pages are marked as free. */
+static robj *vmGenericLoadObject(robj *key, int preview) {
+    robj *val;
+
+    redisAssert(key->storage == REDIS_VM_SWAPPED || key->storage == REDIS_VM_LOADING);
+    val = vmReadObjectFromSwap(key->vm.page,key->vtype);
+    if (preview) {
+        redisLog(REDIS_DEBUG, "VM: object %s previewed from disk",
+            (unsigned char*) key->ptr);
+    } else {
+        key->storage = REDIS_VM_MEMORY;
+        key->vm.atime = server.unixtime;
+        vmMarkPagesFree(key->vm.page,key->vm.usedpages);
+        redisLog(REDIS_DEBUG, "VM: object %s loaded from disk",
+            (unsigned char*) key->ptr);
+        server.vm_stats_swapped_objects--;
+    }
+    /* Previews are counted as swap-ins too */
+    server.vm_stats_swapins++;
+    return val;
+}
+
+/* Bring the value of 'key' from swap to memory and return it. */
+static robj *vmLoadObject(robj *key) {
+    robj *val;
+
+    /* The value is needed synchronously and ASAP: if a background load
+     * of this very object is in progress, cancel it first. */
+    if (key->storage == REDIS_VM_LOADING) vmCancelThreadedIOJob(key);
+    val = vmGenericLoadObject(key,0);
+    return val;
+}
+
+/* Return the value of 'key' as read from the swap file, without modifying
+ * the key. This is useful when an operation must be performed on the
+ * value without really bringing it from swap to memory, like while saving
+ * the dataset or rewriting the append only log. */
+static robj *vmPreviewObject(robj *key) {
+    robj *preview = vmGenericLoadObject(key,1);
+
+    return preview;
+}
+
+/* How good a candidate is this object for swapping?
+ * The better candidate it is, the greater the returned value.
+ *
+ * Currently we try to perform a fast estimation of the object size in
+ * memory, and combine it with aging information.
+ *
+ * Basically swappability = idle-time * log(1 + estimated size)
+ *
+ * where idle-time is server.unixtime minus the access time of the object:
+ * 0 is returned if that is not positive. For aggregate types the size is
+ * estimated from a single element (the first one for lists, a random one
+ * for sets, sorted sets and hash tables) multiplied by the number of
+ * elements.
+ *
+ * Bigger objects are preferred over smaller objects, but not
+ * proportionally, this is why we use the logarithm. This algorithm is
+ * just a first try and will probably be tuned later. */
+static double computeObjectSwappability(robj *o) {
+    time_t age = server.unixtime - o->vm.atime;
+    long asize = 0;
+    list *l;
+    dict *d;
+    struct dictEntry *de;
+    int z;
+
+    if (age <= 0) return 0;
+    switch(o->type) {
+    case REDIS_STRING:
+        if (o->encoding != REDIS_ENCODING_RAW) {
+            asize = sizeof(*o);
+        } else {
+            asize = sdslen(o->ptr)+sizeof(*o)+sizeof(long)*2;
+        }
+        break;
+    case REDIS_LIST: {
+        listNode *ln;
+
+        l = o->ptr;
+        ln = listFirst(l);
+        asize = sizeof(list);
+        if (ln) {
+            robj *ele = ln->value;
+            long elesize;
+
+            elesize = (ele->encoding == REDIS_ENCODING_RAW) ?
+                            (sizeof(*o)+sdslen(ele->ptr)) :
+                            sizeof(*o);
+            asize += (sizeof(listNode)+elesize)*listLength(l);
+        }
+        break;
+    }
+    case REDIS_SET:
+    case REDIS_ZSET:
+        z = (o->type == REDIS_ZSET);
+        d = z ? ((zset*)o->ptr)->dict : o->ptr;
+
+        asize = sizeof(dict)+(sizeof(struct dictEntry*)*dictSlots(d));
+        if (z) asize += sizeof(zset)-sizeof(dict);
+        if (dictSize(d)) {
+            long elesize;
+            robj *ele;
+
+            de = dictGetRandomKey(d);
+            ele = dictGetEntryKey(de);
+            elesize = (ele->encoding == REDIS_ENCODING_RAW) ?
+                            (sizeof(*o)+sdslen(ele->ptr)) :
+                            sizeof(*o);
+            asize += (sizeof(struct dictEntry)+elesize)*dictSize(d);
+            if (z) asize += sizeof(zskiplistNode)*dictSize(d);
+        }
+        break;
+    case REDIS_HASH:
+        if (o->encoding == REDIS_ENCODING_ZIPMAP) {
+            unsigned char *p = zipmapRewind((unsigned char*)o->ptr);
+            unsigned int len = zipmapLen((unsigned char*)o->ptr);
+            unsigned int klen, vlen;
+            unsigned char *key, *val;
+
+            /* Use the first field/value pair as the sample; an empty
+             * zipmap contributes zero length pairs. */
+            if ((p = zipmapNext(p,&key,&klen,&val,&vlen)) == NULL) {
+                klen = 0;
+                vlen = 0;
+            }
+            asize = len*(klen+vlen+3);
+        } else if (o->encoding == REDIS_ENCODING_HT) {
+            d = o->ptr;
+            asize = sizeof(dict)+(sizeof(struct dictEntry*)*dictSlots(d));
+            if (dictSize(d)) {
+                long elesize;
+                robj *ele;
+
+                de = dictGetRandomKey(d);
+                ele = dictGetEntryKey(de);
+                elesize = (ele->encoding == REDIS_ENCODING_RAW) ?
+                                (sizeof(*o)+sdslen(ele->ptr)) :
+                                sizeof(*o);
+                /* An entry holds both a field and a value: the size of
+                 * the value is added to the one of the field instead of
+                 * replacing it. */
+                ele = dictGetEntryVal(de);
+                elesize += (ele->encoding == REDIS_ENCODING_RAW) ?
+                                (sizeof(*o)+sdslen(ele->ptr)) :
+                                sizeof(*o);
+                asize += (sizeof(struct dictEntry)+elesize)*dictSize(d);
+            }
+        }
+        break;
+    }
+    return (double)age*log(1+asize);
+}
+
+/* Try to swap an object that's a good candidate for swapping.
+ * Returns REDIS_OK if the object was swapped, REDIS_ERR if it's not possible
+ * to swap any object at all.
+ *
+ * The candidate is the entry with the highest computeObjectSwappability()
+ * among random samples taken from every non empty DB (five accepted
+ * samples per DB at most).
+ *
+ * If 'usethreads' is true, Redis will try to swap the object in background
+ * using I/O threads: in this case REDIS_OK is returned as soon as
+ * vmSwapObjectThreaded() is called, whatever its outcome. Otherwise the
+ * object is swapped with vmSwapObjectBlocking() and, on success, the value
+ * of the dict entry is set to NULL. */
+static int vmSwapOneObject(int usethreads) {
+ int j, i;
+ struct dictEntry *best = NULL;
+ double best_swappability = 0;
+ redisDb *best_db = NULL;
+ robj *key, *val;
+
+ for (j = 0; j < server.dbnum; j++) {
+ redisDb *db = server.db+j;
+ /* Why maxtries is set to 100?
+ * Because this way (usually) we'll find 1 object even if just 1% - 2%
+ * are swappable objects.
+ * While maxtries is not zero a rejected sample is not counted as one
+ * of the five tries (see the i-- below); once it drops to zero every
+ * sample counts, so the loop always terminates. */
+ int maxtries = 100;
+
+ if (dictSize(db->dict) == 0) continue;
+ for (i = 0; i < 5; i++) {
+ dictEntry *de;
+ double swappability;
+
+ if (maxtries) maxtries--;
+ de = dictGetRandomKey(db->dict);
+ key = dictGetEntryKey(de);
+ val = dictGetEntryVal(de);
+ /* Only swap objects that are currently in memory.
+ *
+ * Also don't swap shared objects if threaded VM is on, as we
+ * try to ensure that the main thread does not touch the
+ * object while the I/O thread is using it, but we can't
+ * control other keys without adding additional mutex. */
+ if (key->storage != REDIS_VM_MEMORY ||
+ (server.vm_max_threads != 0 && val->refcount != 1)) {
+ if (maxtries) i--; /* don't count this try */
+ continue;
+ }
+ swappability = computeObjectSwappability(val);
+ if (!best || swappability > best_swappability) {
+ best = de;
+ best_swappability = swappability;
+ best_db = db;
+ }
+ }
+ }
+ if (best == NULL) return REDIS_ERR;
+ key = dictGetEntryKey(best);
+ val = dictGetEntryVal(best);
+
+ redisLog(REDIS_DEBUG,"Key with best swappability: %s, %f",
+ key->ptr, best_swappability);
+
+ /* Unshare the key if needed: the dict entry gets a private copy of the
+ * key object and the reference to the shared one is dropped. */
+ if (key->refcount > 1) {
+ robj *newkey = dupStringObject(key);
+ decrRefCount(key);
+ key = dictGetEntryKey(best) = newkey;
+ }
+ /* Swap it */
+ if (usethreads) {
+ vmSwapObjectThreaded(key,val,best_db);
+ return REDIS_OK;
+ } else {
+ if (vmSwapObjectBlocking(key,val) == REDIS_OK) {
+ dictGetEntryVal(best) = NULL;
+ return REDIS_OK;
+ } else {
+ return REDIS_ERR;
+ }
+ }
+}
+
+/* Swap out one object in the calling thread.
+ * Thin wrapper around vmSwapOneObject() with 'usethreads' set to 0; returns
+ * whatever vmSwapOneObject() returns (REDIS_OK or REDIS_ERR). */
+static int vmSwapOneObjectBlocking(void) {
+    return vmSwapOneObject(0);
+}
+
+/* Queue one object for swapping using the I/O threads.
+ * Thin wrapper around vmSwapOneObject() with 'usethreads' set to 1; returns
+ * whatever vmSwapOneObject() returns (REDIS_OK or REDIS_ERR). */
+static int vmSwapOneObjectThreaded(void) {
+    return vmSwapOneObject(1);
+}
+
+/* Return true (1) if it's safe to swap objects out right now, 0 otherwise.
+ * Swapping out is not allowed while a BGSAVE or a BGREWRITEAOF child is
+ * running in background, that is, while either child pid is not -1. */
+static int vmCanSwapOut(void) {
+    if (server.bgsavechildpid != -1) return 0;
+    if (server.bgrewritechildpid != -1) return 0;
+    return 1;
+}
+
+/* Delete 'key' from 'db' only if it is not fully in memory.
+ *
+ * Returns 1 if the key was found, its storage state was different from
+ * REDIS_VM_MEMORY, and deleteKey() was called on it. Returns 0 if the key
+ * does not exist or is in memory. */
+static int deleteIfSwapped(redisDb *db, robj *key) {
+    dictEntry *entry = dictFind(db->dict,key);
+    robj *stored;
+
+    if (entry == NULL) return 0;
+    stored = dictGetEntryKey(entry);
+    if (stored->storage != REDIS_VM_MEMORY) {
+        deleteKey(db,key);
+        return 1;
+    }
+    return 0;
+}
+
+/* =================== Virtual Memory - Threaded I/O ======================= */
+
+/* Release an I/O job.
+ *
+ * The reference the job holds on its value (if any) is dropped for the
+ * PREPARE_SWAP, DO_SWAP and LOAD job types, then the job itself is freed.
+ *
+ * j->key is NOT released: no reference was taken on it when the job was
+ * created, as the key is only used as an identifier, and if a key is
+ * removed the job should never be touched again. */
+static void freeIOJob(iojob *j) {
+    int holds_val = (j->type == REDIS_IOJOB_PREPARE_SWAP ||
+                     j->type == REDIS_IOJOB_DO_SWAP ||
+                     j->type == REDIS_IOJOB_LOAD);
+
+    if (holds_val && j->val != NULL) decrRefCount(j->val);
+    zfree(j);
+}
+
+/* Every time a thread finished a Job, it writes a byte into the write side
+ * of an unix pipe in order to "awake" the main thread, and this function
+ * is called.
+ *
+ * 'fd' is the read side of that pipe; 'el', 'privdata' and 'mask' are
+ * unused. For each byte read, the oldest job in server.io_processed is
+ * removed and post-processed according to its type (LOAD, PREPARE_SWAP or
+ * DO_SWAP). The function returns when read(2) no longer returns one byte
+ * or when 'toprocess' non canceled jobs were handled in this call. */
+static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
+ int mask)
+{
+ char buf[1];
+ int retval, processed = 0, toprocess = -1, trytoswap = 1;
+ REDIS_NOTUSED(el);
+ REDIS_NOTUSED(mask);
+ REDIS_NOTUSED(privdata);
+
+ /* For every byte we read in the read side of the pipe, there is one
+ * I/O job completed to process. */
+ while((retval = read(fd,buf,1)) == 1) {
+ iojob *j;
+ listNode *ln;
+ robj *key;
+ struct dictEntry *de;
+
+ redisLog(REDIS_DEBUG,"Processing I/O completed job");
+
+ /* Get the processed element (the oldest one).
+ * On the first iteration we also compute 'toprocess', the max number
+ * of jobs handled by this call: REDIS_MAX_COMPLETED_JOBS_PROCESSED
+ * percent of the current length of the processed queue, but never
+ * less than one. */
+ lockThreadedIO();
+ assert(listLength(server.io_processed) != 0);
+ if (toprocess == -1) {
+ toprocess = (listLength(server.io_processed)*REDIS_MAX_COMPLETED_JOBS_PROCESSED)/100;
+ if (toprocess <= 0) toprocess = 1;
+ }
+ ln = listFirst(server.io_processed);
+ j = ln->value;
+ listDelNode(server.io_processed,ln);
+ unlockThreadedIO();
+ /* If this job is marked as canceled, just ignore it. Canceled jobs
+ * are freed without being counted in 'processed'.
+ * NOTE(review): IOThreadEntryPoint() also sets 'canceled' when
+ * vmWriteObjectOnSwap() fails; on this path key->storage and the
+ * pages reserved for the job are left untouched -- confirm this is
+ * intended. */
+ if (j->canceled) {
+ freeIOJob(j);
+ continue;
+ }
+ /* Post process it in the main thread, as there are things we
+ * can do just here to avoid race conditions and/or invasive locks */
+ redisLog(REDIS_DEBUG,"Job %p type: %d, key at %p (%s) refcount: %d\n", (void*) j, j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount);
+ de = dictFind(j->db->dict,j->key);
+ assert(de != NULL);
+ key = dictGetEntryKey(de);
+ if (j->type == REDIS_IOJOB_LOAD) {
+ redisDb *db;
+
+ /* Key loaded, bring it at home: the swap pages are released, the
+ * loaded value is stored in the dictionary entry (taking a new
+ * reference, since freeIOJob() drops the one owned by the job). */
+ key->storage = REDIS_VM_MEMORY;
+ key->vm.atime = server.unixtime;
+ vmMarkPagesFree(key->vm.page,key->vm.usedpages);
+ redisLog(REDIS_DEBUG, "VM: object %s loaded from disk (threaded)",
+ (unsigned char*) key->ptr);
+ server.vm_stats_swapped_objects--;
+ server.vm_stats_swapins++;
+ dictGetEntryVal(de) = j->val;
+ incrRefCount(j->val);
+ db = j->db;
+ freeIOJob(j);
+ /* Handle clients waiting for this key to be loaded. */
+ handleClientsBlockedOnSwappedKey(db,key);
+ } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
+ /* Now we know the amount of pages required to swap this object.
+ * Let's find some space for it, and queue this task again
+ * rebranded as REDIS_IOJOB_DO_SWAP. */
+ if (!vmCanSwapOut() ||
+ vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR)
+ {
+ /* Ooops... no space or we can't swap as there is
+ * a fork()ed Redis trying to save stuff on disk. */
+ freeIOJob(j);
+ key->storage = REDIS_VM_MEMORY; /* undo operation */
+ } else {
+ /* Note that we need to mark this pages as used now,
+ * if the job will be canceled, we'll mark them as freed
+ * again. */
+ vmMarkPagesUsed(j->page,j->pages);
+ j->type = REDIS_IOJOB_DO_SWAP;
+ lockThreadedIO();
+ queueIOJob(j);
+ unlockThreadedIO();
+ }
+ } else if (j->type == REDIS_IOJOB_DO_SWAP) {
+ robj *val;
+
+ /* Key swapped. We can finally free some memory.
+ * Before asserting that the key is in the expected state, dump
+ * some debugging info on standard output if it is not. */
+ if (key->storage != REDIS_VM_SWAPPING) {
+ printf("key->storage: %d\n",key->storage);
+ printf("key->name: %s\n",(char*)key->ptr);
+ printf("key->refcount: %d\n",key->refcount);
+ printf("val: %p\n",(void*)j->val);
+ printf("val->type: %d\n",j->val->type);
+ printf("val->ptr: %s\n",(char*)j->val->ptr);
+ }
+ redisAssert(key->storage == REDIS_VM_SWAPPING);
+ val = dictGetEntryVal(de);
+ key->vm.page = j->page;
+ key->vm.usedpages = j->pages;
+ key->storage = REDIS_VM_SWAPPED;
+ key->vtype = j->val->type;
+ decrRefCount(val); /* Deallocate the object from memory. */
+ dictGetEntryVal(de) = NULL;
+ redisLog(REDIS_DEBUG,
+ "VM: object %s swapped out at %lld (%lld pages) (threaded)",
+ (unsigned char*) key->ptr,
+ (unsigned long long) j->page, (unsigned long long) j->pages);
+ server.vm_stats_swapped_objects++;
+ server.vm_stats_swapouts++;
+ freeIOJob(j);
+ /* Put a few more swap requests in queue if we are still
+ * out of memory. We stop when the queue of new jobs is at least
+ * as long as server.vm_max_threads, or for the rest of this call
+ * ('trytoswap' is cleared) as soon as no object can be swapped. */
+ if (trytoswap && vmCanSwapOut() &&
+ zmalloc_used_memory() > server.vm_max_memory)
+ {
+ int more = 1;
+ while(more) {
+ lockThreadedIO();
+ more = listLength(server.io_newjobs) <
+ (unsigned) server.vm_max_threads;
+ unlockThreadedIO();
+ /* Don't waste CPU time if swappable objects are rare. */
+ if (vmSwapOneObjectThreaded() == REDIS_ERR) {
+ trytoswap = 0;
+ break;
+ }
+ }
+ }
+ }
+ processed++;
+ if (processed == toprocess) return;
+ }
+ if (retval < 0 && errno != EAGAIN) {
+ redisLog(REDIS_WARNING,
+ "WARNING: read(2) error in vmThreadedIOCompletedJob() %s",
+ strerror(errno));
+ }
+}
+
+/* Acquire server.io_mutex. In this file the mutex is held while accessing
+ * the io_newjobs / io_processing / io_processed job queues and the
+ * io_active_threads counter. The return value of pthread_mutex_lock() is
+ * not checked. */
+static void lockThreadedIO(void) {
+    pthread_mutex_t *io_lock = &server.io_mutex;
+
+    pthread_mutex_lock(io_lock);
+}
+
+/* Release server.io_mutex, previously acquired with lockThreadedIO().
+ * The return value of pthread_mutex_unlock() is not checked. */
+static void unlockThreadedIO(void) {
+    pthread_mutex_t *io_lock = &server.io_mutex;
+
+    pthread_mutex_unlock(io_lock);
+}
+
+/* Remove the specified object from the threaded I/O queue if still not
+ * processed, otherwise make sure to flag it as canceled.
+ *
+ * 'o' is the key object of the job (jobs are matched comparing job->key
+ * with 'o' by pointer) and must be in the REDIS_VM_LOADING or
+ * REDIS_VM_SWAPPING state. On return the storage state of 'o' is rolled
+ * back to REDIS_VM_SWAPPED or REDIS_VM_MEMORY respectively. If the job is
+ * currently being processed by an I/O thread this function sleeps and
+ * retries until the job leaves the io_processing queue. */
+static void vmCancelThreadedIOJob(robj *o) {
+ list *lists[3] = {
+ server.io_newjobs, /* 0 */
+ server.io_processing, /* 1 */
+ server.io_processed /* 2 */
+ };
+ int i;
+
+ assert(o->storage == REDIS_VM_LOADING || o->storage == REDIS_VM_SWAPPING);
+again:
+ lockThreadedIO();
+ /* Search for a matching key in one of the queues */
+ for (i = 0; i < 3; i++) {
+ listNode *ln;
+ listIter li;
+
+ listRewind(lists[i],&li);
+ while ((ln = listNext(&li)) != NULL) {
+ iojob *job = ln->value;
+
+ if (job->canceled) continue; /* Skip this, already canceled. */
+ if (job->key == o) {
+ redisLog(REDIS_DEBUG,"*** CANCELED %p (%s) (type %d) (LIST ID %d)\n",
+ (void*)job, (char*)o->ptr, job->type, i);
+ /* Mark the pages as free since the swap didn't happen
+ * or happened but is now discarded. This is skipped for jobs
+ * in the processing queue (i == 1), as in that case we retry
+ * from scratch later. */
+ if (i != 1 && job->type == REDIS_IOJOB_DO_SWAP)
+ vmMarkPagesFree(job->page,job->pages);
+ /* Cancel the job. It depends on the list the job is
+ * living in. */
+ switch(i) {
+ case 0: /* io_newjobs */
+ /* If the job was not yet processed the best thing to do
+ * is to remove it from the queue at all */
+ freeIOJob(job);
+ listDelNode(lists[i],ln);
+ break;
+ case 1: /* io_processing */
+ /* Oh Shi- the thread is messing with the Job:
+ *
+ * Probably it's accessing the object if this is a
+ * PREPARE_SWAP or DO_SWAP job.
+ * If it's a LOAD job it may be reading from disk and
+ * if we don't wait for the job to terminate before to
+ * cancel it, maybe in a few microseconds data can be
+ * corrupted in this pages. So the short story is:
+ *
+ * Better to wait for the job to move into the
+ * next queue (processed)... */
+
+ /* We try again and again until the job is completed. */
+ unlockThreadedIO();
+ /* But let's wait some time for the I/O thread
+ * to finish with this job. After all this condition
+ * should be very rare. */
+ usleep(1);
+ goto again;
+ case 2: /* io_processed */
+ /* The job was already processed, that's easy...
+ * just mark it as canceled so that we'll ignore it
+ * when processing completed jobs. */
+ job->canceled = 1;
+ break;
+ }
+ /* Finally we have to adjust the storage type of the object
+ * in order to "UNDO" the operation. */
+ if (o->storage == REDIS_VM_LOADING)
+ o->storage = REDIS_VM_SWAPPED;
+ else if (o->storage == REDIS_VM_SWAPPING)
+ o->storage = REDIS_VM_MEMORY;
+ unlockThreadedIO();
+ return;
+ }
+ }
+ }
+ unlockThreadedIO();
+ assert(1 != 1); /* We should never reach this: no job matched 'o'. */
+}
+
+/* Entry point of every I/O thread ('arg' is unused, NULL is always returned).
+ *
+ * The thread detaches itself, then loops taking the oldest job from
+ * server.io_newjobs, moving it into server.io_processing while working on
+ * it, and finally into server.io_processed. After every completed job one
+ * byte is written to server.io_ready_pipe_write in order to wake up the
+ * main thread. When there are no new jobs the thread decrements
+ * server.io_active_threads and exits. */
+static void *IOThreadEntryPoint(void *arg) {
+    iojob *j;
+    listNode *ln;
+    REDIS_NOTUSED(arg);
+
+    pthread_detach(pthread_self());
+    while(1) {
+        ssize_t nwritten;
+
+        /* Get a new job to process */
+        lockThreadedIO();
+        if (listLength(server.io_newjobs) == 0) {
+            /* No new jobs in queue, exit. */
+            redisLog(REDIS_DEBUG,"Thread %ld exiting, nothing to do",
+                (long) pthread_self());
+            server.io_active_threads--;
+            unlockThreadedIO();
+            return NULL;
+        }
+        ln = listFirst(server.io_newjobs);
+        j = ln->value;
+        listDelNode(server.io_newjobs,ln);
+        /* Add the job in the processing queue */
+        j->thread = pthread_self();
+        listAddNodeTail(server.io_processing,j);
+        ln = listLast(server.io_processing); /* We use ln later to remove it */
+        unlockThreadedIO();
+        redisLog(REDIS_DEBUG,"Thread %ld got a new job (type %d): %p about key '%s'",
+            (long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr);
+
+        /* Process the Job */
+        if (j->type == REDIS_IOJOB_LOAD) {
+            j->val = vmReadObjectFromSwap(j->page,j->key->vtype);
+        } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
+            /* The object is serialized against /dev/null just to learn
+             * how many pages it needs.
+             * NOTE(review): the fopen() result is not checked; what
+             * rdbSavedObjectPages() does with a NULL stream is not visible
+             * from here -- confirm and handle the failure accordingly. */
+            FILE *fp = fopen("/dev/null","w+");
+            j->pages = rdbSavedObjectPages(j->val,fp);
+            fclose(fp);
+        } else if (j->type == REDIS_IOJOB_DO_SWAP) {
+            if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
+                j->canceled = 1;
+        }
+
+        /* Done: insert the job into the processed queue */
+        redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)",
+            (long) pthread_self(), (void*)j, (char*)j->key->ptr);
+        lockThreadedIO();
+        listDelNode(server.io_processing,ln);
+        listAddNodeTail(server.io_processed,j);
+        unlockThreadedIO();
+
+        /* Signal the main thread there is new stuff to process.
+         * The write(2) is performed outside of assert(): an expression
+         * passed to assert() is not evaluated at all when NDEBUG is
+         * defined, and the main thread would never be woken up. The call
+         * is also retried if interrupted by a signal. */
+        do {
+            nwritten = write(server.io_ready_pipe_write,"x",1);
+        } while (nwritten == -1 && errno == EINTR);
+        if (nwritten != 1) {
+            redisLog(REDIS_WARNING,
+                "I/O thread unable to notify the main thread: %s",
+                nwritten == -1 ? strerror(errno) : "short write");
+        }
+        assert(nwritten == 1);
+    }
+    return NULL; /* never reached */
+}
+
+/* Create a new I/O thread running IOThreadEntryPoint() and account for it
+ * in server.io_active_threads.
+ *
+ * While the thread is being created the signal mask of the caller is
+ * replaced with one containing just SIGCHLD, SIGHUP and SIGPIPE; the
+ * previous mask is restored afterwards. If pthread_create() fails a
+ * warning is logged and the creation is retried after one second, for as
+ * long as it takes. */
+static void spawnIOThread(void) {
+    sigset_t blocked, saved;
+    pthread_t tid;
+    int rc;
+
+    sigemptyset(&blocked);
+    sigaddset(&blocked,SIGCHLD);
+    sigaddset(&blocked,SIGHUP);
+    sigaddset(&blocked,SIGPIPE);
+    pthread_sigmask(SIG_SETMASK, &blocked, &saved);
+    for (;;) {
+        rc = pthread_create(&tid,&server.io_threads_attr,IOThreadEntryPoint,NULL);
+        if (rc == 0) break;
+        redisLog(REDIS_WARNING,"Unable to spawn an I/O thread: %s",
+            strerror(rc));
+        usleep(1000000);
+    }
+    pthread_sigmask(SIG_SETMASK, &saved, NULL);
+    server.io_active_threads++;
+}
+
+/* Block until there are no queued jobs, no jobs being processed, and no
+ * active I/O threads. We need to wait for the last thread to exit before
+ * we are able to fork() in order to BGSAVE or BGREWRITEAOF.
+ *
+ * While waiting, already finished jobs are post-processed, as I/O threads
+ * may be hanging trying to write against the io_ready_pipe_write FD when
+ * there are so many pending jobs that the write is blocking. */
+static void waitEmptyIOJobsQueue(void) {
+    for (;;) {
+        int idle, finished_jobs;
+
+        lockThreadedIO();
+        idle = listLength(server.io_newjobs) == 0 &&
+               listLength(server.io_processing) == 0 &&
+               server.io_active_threads == 0;
+        finished_jobs = listLength(server.io_processed);
+        unlockThreadedIO();
+        if (idle) return;
+
+        if (finished_jobs) {
+            vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read,NULL,0);
+            usleep(1000); /* 1 millisecond */
+        } else {
+            usleep(10000); /* 10 milliseconds */
+        }
+    }
+}
+
+/* Open server.vm_swap_file again in "r+b" mode, storing the new stream in
+ * server.vm_fp and its descriptor in server.vm_fd. On failure a warning is
+ * logged and the process terminates with _exit(1).
+ *
+ * Note: the old stream is not closed, as we are in the child process and
+ * don't want to mess at all with the original file object. */
+static void vmReopenSwapFile(void) {
+    FILE *fp = fopen(server.vm_swap_file,"r+b");
+
+    server.vm_fp = fp;
+    if (fp == NULL) {
+        redisLog(REDIS_WARNING,"Can't re-open the VM swap file: %s. Exiting.",
+            server.vm_swap_file);
+        _exit(1);
+    }
+    server.vm_fd = fileno(fp);
+}
+
+/* Append the job 'j' to the queue of new jobs, spawning one more I/O
+ * thread if fewer than server.vm_max_threads are currently active.
+ *
+ * This function must be called with the threaded I/O mutex locked. */
+static void queueIOJob(iojob *j) {
+    int need_thread;
+
+    redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
+        (void*)j, j->type, (char*)j->key->ptr);
+    listAddNodeTail(server.io_newjobs,j);
+    need_thread = server.io_active_threads < server.vm_max_threads;
+    if (need_thread) spawnIOThread();
+}
+
+/* Start swapping out 'val', stored at 'key' in 'db', using the I/O threads.
+ *
+ * The key must be in memory and not shared (both are asserted). A
+ * REDIS_IOJOB_PREPARE_SWAP job holding a new reference to 'val' is queued
+ * and the key is flagged as REDIS_VM_SWAPPING. Always returns REDIS_OK. */
+static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) {
+    iojob *job;
+
+    assert(key->storage == REDIS_VM_MEMORY);
+    assert(key->refcount == 1);
+
+    job = zmalloc(sizeof(*job));
+    job->canceled = 0;
+    job->thread = (pthread_t) -1;
+    job->type = REDIS_IOJOB_PREPARE_SWAP;
+    job->db = db;
+    job->key = key;
+    incrRefCount(val); /* this reference is owned by the job */
+    job->val = val;
+    key->storage = REDIS_VM_SWAPPING;
+
+    lockThreadedIO();
+    queueIOJob(job);
+    unlockThreadedIO();
+    return REDIS_OK;
+}
+
+/* ============ Virtual Memory - Blocking clients on missing keys =========== */
+
+/* This function makes the client 'c' wait for the key 'key' to be loaded.
+ * If there is not already a job loading the key, it is created.
+ * The key is added to the io_keys list in the client structure, and also
+ * in the hash table mapping swapped keys to waiting clients, that is,
+ * the io_keys dictionary of the client's database.
+ *
+ * Returns 1 if the client now waits for the key, 0 if there is nothing to
+ * wait for: the key does not exist, is in memory, or was being swapped out
+ * (in which case the swap is canceled). */
+static int waitForSwappedKey(redisClient *c, robj *key) {
+    struct dictEntry *entry;
+    robj *stored;
+    list *waiters;
+    iojob *job;
+
+    /* If the key does not exist or is already in RAM we don't need to
+     * block the client at all. */
+    entry = dictFind(c->db->dict,key);
+    if (entry == NULL) return 0;
+    stored = dictGetEntryKey(entry);
+    if (stored->storage == REDIS_VM_MEMORY) return 0;
+    if (stored->storage == REDIS_VM_SWAPPING) {
+        /* We were swapping the key, undo it! */
+        vmCancelThreadedIOJob(stored);
+        return 0;
+    }
+
+    /* OK: the key is either swapped, or being loaded just now. */
+
+    /* Client => keys it is waiting for. */
+    listAddNodeTail(c->io_keys,key);
+    incrRefCount(key);
+
+    /* Swapped key => clients waiting for it. For every key we take a list
+     * of blocked clients, created the first time a client waits for it. */
+    entry = dictFind(c->db->io_keys,key);
+    if (entry != NULL) {
+        waiters = dictGetEntryVal(entry);
+    } else {
+        int retval;
+
+        waiters = listCreate();
+        retval = dictAdd(c->db->io_keys,key,waiters);
+        incrRefCount(key);
+        assert(retval == DICT_OK);
+    }
+    listAddNodeTail(waiters,c);
+
+    /* Are we already loading the key from disk? Then we are done. */
+    if (stored->storage != REDIS_VM_SWAPPED) return 1;
+
+    /* Otherwise create the job loading it. */
+    stored->storage = REDIS_VM_LOADING;
+    job = zmalloc(sizeof(*job));
+    job->type = REDIS_IOJOB_LOAD;
+    job->db = c->db;
+    job->key = stored;
+    job->key->vtype = stored->vtype; /* job->key is 'stored': value unchanged */
+    job->page = stored->vm.page;
+    job->val = NULL;
+    job->canceled = 0;
+    job->thread = (pthread_t) -1;
+    lockThreadedIO();
+    queueIOJob(job);
+    unlockThreadedIO();
+    return 1;
+}
+
+/* Preload keys needed for the ZUNION and ZINTER commands.
+ *
+ * The number of keys is read from the third argument and the keys are the
+ * arguments that follow it. Since the count is supplied by the client it
+ * is validated against the number of arguments actually received: if it
+ * claims more keys than there are arguments nothing is preloaded, instead
+ * of reading past the end of c->argv. */
+static void zunionInterBlockClientOnSwappedKeys(redisClient *c) {
+    long num;
+    int i;
+
+    if (c->argc < 3) return;
+    num = strtol(c->argv[2]->ptr,NULL,10);
+    if (num > c->argc-3) return;
+    for (i = 0; i < num; i++) {
+        waitForSwappedKey(c,c->argv[3+i]);
+    }
+}
+
+/* Is this client attempting to run a command against swapped keys?
+ * If so, block it ASAP, load the keys in background, then resume it.
+ *
+ * The keys are found either calling the preload procedure of the command,
+ * if there is one, or scanning the arguments from vm_firstkey to
+ * vm_lastkey in steps of vm_keystep (a negative vm_lastkey is relative to
+ * the number of arguments).
+ *
+ * The important idea about this function is that it can fail! If keys will
+ * still be swapped when the client is resumed, the key lookups will
+ * just block loading keys from disk. In practical terms this should only
+ * happen with SORT BY command or if there is a bug in this function.
+ *
+ * Return 1 if the client is marked as blocked, 0 if the client can
+ * continue as the keys it is going to access appear to be in memory. */
+static int blockClientOnSwappedKeys(struct redisCommand *cmd, redisClient *c) {
+    if (cmd->vm_preload_proc != NULL) {
+        cmd->vm_preload_proc(c);
+    } else {
+        int argidx, lastidx;
+
+        if (cmd->vm_firstkey == 0) return 0;
+        lastidx = cmd->vm_lastkey;
+        if (lastidx < 0) lastidx += c->argc;
+        argidx = cmd->vm_firstkey;
+        while (argidx <= lastidx) {
+            waitForSwappedKey(c,c->argv[argidx]);
+            argidx += cmd->vm_keystep;
+        }
+    }
+
+    /* Nothing to wait for: the client can go on. */
+    if (listLength(c->io_keys) == 0) return 0;
+
+    /* The client waits for at least one key: mark it as blocked and stop
+     * reading from its socket. */
+    c->flags |= REDIS_IO_WAIT;
+    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
+    server.vm_blocked_clients++;
+    return 1;
+}
+
+/* Remove 'key' from the list of keys the client 'c' is blocked on, and the
+ * client from the list of clients waiting for that key. When the latter
+ * becomes empty the key is removed from the io_keys dictionary of the
+ * client's database as well.
+ *
+ * The function returns 1 when there are no longer blocking keys after
+ * the current one was removed (and the client can be unblocked). */
+static int dontWaitForSwappedKey(redisClient *c, robj *key) {
+    struct dictEntry *entry;
+    list *waiters;
+    listNode *node;
+    listIter iter;
+
+    /* Client side: drop the first entry of c->io_keys matching the key. */
+    listRewind(c->io_keys,&iter);
+    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
+        if (compareStringObjects(node->value,key) != 0) continue;
+        listDelNode(c->io_keys,node);
+        break;
+    }
+    assert(node != NULL);
+
+    /* Key side: drop the client from the key => waiting clients map. */
+    entry = dictFind(c->db->io_keys,key);
+    assert(entry != NULL);
+    waiters = dictGetEntryVal(entry);
+    node = listSearchKey(waiters,c);
+    assert(node != NULL);
+    listDelNode(waiters,node);
+    if (listLength(waiters) == 0)
+        dictDelete(c->db->io_keys,key);
+
+    return listLength(c->io_keys) == 0;
+}
+
+/* Called when 'key' of 'db' is available again: every client that was
+ * waiting for it stops waiting for this key, and the clients that have no
+ * other key to wait for are appended to server.io_ready_clients.
+ * Nothing is done if no client is waiting for the key. */
+static void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key) {
+    struct dictEntry *entry = dictFind(db->io_keys,key);
+    list *waiters;
+    int remaining;
+
+    if (entry == NULL) return;
+    waiters = dictGetEntryVal(entry);
+
+    /* Note: the number of clients is computed once, in advance. We can't
+     * use something like while(listLength(waiters)) as the list can be
+     * freed by the called function when we remove the last element. */
+    for (remaining = listLength(waiters); remaining > 0; remaining--) {
+        redisClient *c = listFirst(waiters)->value;
+
+        /* All the keys of this client are loaded? Then it is ready to go. */
+        if (dontWaitForSwappedKey(c,key))
+            listAddNodeTail(server.io_ready_clients,c);
+    }
+}
+
+/* =========================== Remote Configuration ========================= */
+
+/* CONFIG SET <parameter> <value>
+ *
+ * The parameter name (c->argv[2]) is matched case insensitively against
+ * dbfilename, requirepass, masterauth and maxmemory. The first three are
+ * replaced with a copy of the value, maxmemory is parsed as a base 10
+ * integer. The reply is +OK, or an error if the parameter is not one of
+ * the above. */
+static void configSetCommand(redisClient *c) {
+    robj *value = getDecodedObject(c->argv[3]);
+    char *name = c->argv[2]->ptr;
+    int supported = 1;
+
+    if (strcasecmp(name,"dbfilename") == 0) {
+        zfree(server.dbfilename);
+        server.dbfilename = zstrdup(value->ptr);
+    } else if (strcasecmp(name,"requirepass") == 0) {
+        zfree(server.requirepass);
+        server.requirepass = zstrdup(value->ptr);
+    } else if (strcasecmp(name,"masterauth") == 0) {
+        zfree(server.masterauth);
+        server.masterauth = zstrdup(value->ptr);
+    } else if (strcasecmp(name,"maxmemory") == 0) {
+        server.maxmemory = strtoll(value->ptr, NULL, 10);
+    } else {
+        supported = 0;
+    }
+
+    if (!supported) {
+        addReplySds(c,sdscatprintf(sdsempty(),
+            "-ERR not supported CONFIG parameter %s\r\n",
+            name));
+        decrRefCount(value);
+        return;
+    }
+    decrRefCount(value);
+    addReply(c,shared.ok);
+}
+
+/* CONFIG GET <pattern>
+ *
+ * Replies with a multi bulk made of name / value pairs, one pair for every
+ * supported parameter (dbfilename, requirepass, masterauth, maxmemory)
+ * whose name matches the glob-style pattern in c->argv[2].
+ *
+ * The number of elements is not known in advance, so an empty placeholder
+ * object ('lenobj') is queued as first element of the reply, and filled
+ * with the "*<count>\r\n" header once all the matches were emitted. */
+static void configGetCommand(redisClient *c) {
+    robj *o = getDecodedObject(c->argv[2]);
+    robj *lenobj = createObject(REDIS_STRING,NULL);
+    char *pattern = o->ptr;
+    int matches = 0;
+
+    addReply(c,lenobj);
+
+    if (stringmatch(pattern,"dbfilename",0)) {
+        addReplyBulkCString(c,"dbfilename");
+        addReplyBulkCString(c,server.dbfilename);
+        matches++;
+    }
+    if (stringmatch(pattern,"requirepass",0)) {
+        addReplyBulkCString(c,"requirepass");
+        addReplyBulkCString(c,server.requirepass);
+        matches++;
+    }
+    if (stringmatch(pattern,"masterauth",0)) {
+        addReplyBulkCString(c,"masterauth");
+        addReplyBulkCString(c,server.masterauth);
+        matches++;
+    }
+    if (stringmatch(pattern,"maxmemory",0)) {
+        char buf[128];
+
+        /* The value is just the number: no trailing newline, as it would
+         * end up inside the bulk payload sent to the client. */
+        snprintf(buf,sizeof(buf),"%llu",
+            (unsigned long long)server.maxmemory);
+        addReplyBulkCString(c,"maxmemory");
+        addReplyBulkCString(c,buf);
+        matches++;
+    }
+    decrRefCount(o);
+    lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",matches*2);
+    /* Our reference is released only now that the header is in place:
+     * dropping it before writing lenobj->ptr relies on addReply() having
+     * retained the object, and is a use-after-free whenever it did not. */
+    decrRefCount(lenobj);
+}
+
+static void configCommand(redisClient *c) {
+ if (!strcasecmp(c->argv[1]->ptr,"set")) {
+ if (c->argc != 4) goto badarity;
+ configSetCommand(c);
+ } else if (!strcasecmp(c->argv[1]->ptr,"get")) {
+ if (c->argc != 3) goto badarity;
+ configGetCommand(c);
+ } else if (!strcasecmp(c->argv[1]->ptr,"resetstat")) {
+ if (c->argc != 2) goto badarity;
+ server.stat_numcommands = 0;
+ server.stat_numconnections = 0;
+ server.stat_expiredkeys = 0;
+ server.stat_starttime = time(NULL);
+ addReply(c,shared.ok);
+ } else {
+ addReplySds(c,sdscatprintf(sdsempty(),
+ "-ERR CONFIG subcommand must be one of GET, SET, RESETSTAT\r\n"));