X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/4ee9488d7ebdbb0c4e3fff6e93ce5523f4534076..f6c0bba8bc1b11fd789b0139dd86c852194ebc4c:/redis.c diff --git a/redis.c b/redis.c index 651d4d45..76784deb 100644 --- a/redis.c +++ b/redis.c @@ -164,11 +164,11 @@ #define REDIS_VM_MAX_NEAR_PAGES 65536 #define REDIS_VM_MAX_RANDOM_JUMP 4096 #define REDIS_VM_MAX_THREADS 32 -/* The following is the number of completed I/O jobs to process when the - * handelr is called. 1 is the minimum, and also the default, as it allows - * to block as little as possible other accessing clients. While Virtual - * Memory I/O operations are performed by threads, this operations must - * be processed by the main thread when completed to take effect. */ +#define REDIS_THREAD_STACK_SIZE (1024*1024*4) +/* The following is the *percentage* of completed I/O jobs to process when the + * handelr is called. While Virtual Memory I/O operations are performed by + * threads, this operations must be processed by the main thread when completed + * in order to take effect. */ #define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1 /* Client flags */ @@ -382,6 +382,7 @@ struct redisServer { int sort_bypattern; /* Virtual memory configuration */ int vm_enabled; + char *vm_swap_file; off_t vm_page_size; off_t vm_pages; unsigned long long vm_max_memory; @@ -403,6 +404,7 @@ struct redisServer { pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */ pthread_mutex_t obj_freelist_mutex; /* safe redis objects creation/free */ pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */ + pthread_attr_t io_threads_attr; /* attributes for threads creation */ int io_active_threads; /* Number of running I/O threads */ int vm_max_threads; /* Max number of I/O threads running at the same time */ /* Our main thread is blocked on the event loop, locking for sockets ready @@ -561,7 +563,9 @@ static void freeIOJob(iojob *j); static void queueIOJob(iojob *j); static int vmWriteObjectOnSwap(robj *o, off_t page); static robj *vmReadObjectFromSwap(off_t page, int type); -static void waitZeroActiveThreads(void); +static void waitEmptyIOJobsQueue(void); +static void vmReopenSwapFile(void); +static int vmFreePage(off_t page); static void authCommand(redisClient *c); static void pingCommand(redisClient *c); @@ -867,7 +871,7 @@ static void redisLog(int level, const char *fmt, ...) { now = time(NULL); strftime(buf,64,"%d %b %H:%M:%S",localtime(&now)); - fprintf(fp,"%s %c ",buf,c[level]); + fprintf(fp,"[%d] %s %c ",(int)getpid(),buf,c[level]); vfprintf(fp, fmt, ap); fprintf(fp,"\n"); fflush(fp); @@ -1370,6 +1374,7 @@ static void initServerConfig() { server.blockedclients = 0; server.maxmemory = 0; server.vm_enabled = 0; + server.vm_swap_file = zstrdup("/tmp/redis-%p.vm"); server.vm_page_size = 256; /* 256 bytes per page */ server.vm_pages = 1024*1024*100; /* 104 millions of pages */ server.vm_max_memory = 1024LL*1024*1024*1; /* 1 GB of RAM */ @@ -1613,15 +1618,18 @@ static void loadServerConfig(char *filename) { goto loaderr; } } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) { - server.requirepass = zstrdup(argv[1]); + server.requirepass = zstrdup(argv[1]); } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) { - server.pidfile = zstrdup(argv[1]); + server.pidfile = zstrdup(argv[1]); } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) { - server.dbfilename = zstrdup(argv[1]); + server.dbfilename = zstrdup(argv[1]); } else if (!strcasecmp(argv[0],"vm-enabled") && argc == 2) { if ((server.vm_enabled = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcasecmp(argv[0],"vm-swap-file") && argc == 2) { + zfree(server.vm_swap_file); + server.vm_swap_file = zstrdup(argv[1]); } else if (!strcasecmp(argv[0],"vm-max-memory") && argc == 2) { server.vm_max_memory = strtoll(argv[1], NULL, 10); } else if (!strcasecmp(argv[0],"vm-page-size") && argc == 2) { @@ -2417,6 +2425,10 @@ static robj *createObject(int type, void *ptr) { o->ptr = ptr; o->refcount = 1; if (server.vm_enabled) { + /* Note that this code may run in the context of an I/O thread + * and accessing to server.unixtime in theory is an error + * (no locks). But in practice this is safe, and even if we read + * garbage Redis will not fail, as it's just a statistical info */ o->vm.atime = server.unixtime; o->storage = REDIS_VM_MEMORY; } @@ -2486,7 +2498,8 @@ static void incrRefCount(robj *o) { static void decrRefCount(void *obj) { robj *o = obj; - /* Object is swapped out, or in the process of being loaded. */ + /* Object is a key of a swapped out value, or in the process of being + * loaded. */ if (server.vm_enabled && (o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING)) { @@ -2817,8 +2830,6 @@ static int rdbSaveLzfStringObject(FILE *fp, robj *obj) { outlen = sdslen(obj->ptr)-4; if (outlen <= 0) return 0; if ((out = zmalloc(outlen+1)) == NULL) return 0; - printf("Calling LZF with ptr: %p\n", (void*)obj->ptr); - fflush(stdout); comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen); if (comprlen == 0) { zfree(out); @@ -2997,6 +3008,12 @@ static int rdbSave(char *filename) { int j; time_t now = time(NULL); + /* Wait for I/O therads to terminate, just in case this is a + * foreground-saving, to avoid seeking the swap file descriptor at the + * same time. */ + if (server.vm_enabled) + waitEmptyIOJobsQueue(); + snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); fp = fopen(tmpfile,"w"); if (!fp) { @@ -3086,9 +3103,10 @@ static int rdbSaveBackground(char *filename) { pid_t childpid; if (server.bgsavechildpid != -1) return REDIS_ERR; - if (server.vm_enabled) waitZeroActiveThreads(); + if (server.vm_enabled) waitEmptyIOJobsQueue(); if ((childpid = fork()) == 0) { /* Child */ + if (server.vm_enabled) vmReopenSwapFile(); close(server.fd); if (rdbSave(filename) == REDIS_OK) { exit(0); @@ -3759,6 +3777,7 @@ static void shutdownCommand(redisClient *c) { if (server.appendonly) { /* Append only file: fsync() the AOF and exit */ fsync(server.appendfd); + if (server.vm_enabled) unlink(server.vm_swap_file); exit(0); } else { /* Snapshotting. Perform a SYNC SAVE and exit */ @@ -3767,6 +3786,7 @@ static void shutdownCommand(redisClient *c) { unlink(server.pidfile); redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory()); redisLog(REDIS_WARNING,"Server exit now, bye bye..."); + if (server.vm_enabled) unlink(server.vm_swap_file); exit(0); } else { /* Ooops.. error saving! The best we can do is to continue operating. @@ -5399,8 +5419,6 @@ static void sortCommand(redisClient *c) { } dictReleaseIterator(di); } - printf("**************************** %d == %d\n", - j, vectorlen); redisAssert(j == vectorlen); /* Now it's time to load the right scores in the sorting vector */ @@ -5631,6 +5649,7 @@ static sds genRedisInfoString(void) { ); } if (server.vm_enabled) { + lockThreadedIO(); info = sdscatprintf(info, "vm_conf_max_memory:%llu\r\n" "vm_conf_page_size:%llu\r\n" @@ -5657,6 +5676,7 @@ static sds genRedisInfoString(void) { (unsigned long) listLength(server.io_clients), (unsigned long) server.io_active_threads ); + unlockThreadedIO(); } for (j = 0; j < server.dbnum; j++) { long long keys, vkeys; @@ -6908,12 +6928,13 @@ static int rewriteAppendOnlyFileBackground(void) { pid_t childpid; if (server.bgrewritechildpid != -1) return REDIS_ERR; - if (server.vm_enabled) waitZeroActiveThreads(); + if (server.vm_enabled) waitEmptyIOJobsQueue(); if ((childpid = fork()) == 0) { /* Child */ char tmpfile[256]; - close(server.fd); + if (server.vm_enabled) vmReopenSwapFile(); + close(server.fd); snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) { exit(0); @@ -6983,13 +7004,40 @@ static void aofRemoveTempFile(pid_t childpid) { */ /* =================== Virtual Memory - Blocking Side ====================== */ + +/* substitute the first occurrence of '%p' with the process pid in the + * swap file name. */ +static void expandVmSwapFilename(void) { + char *p = strstr(server.vm_swap_file,"%p"); + sds new; + + if (!p) return; + new = sdsempty(); + *p = '\0'; + new = sdscat(new,server.vm_swap_file); + new = sdscatprintf(new,"%ld",(long) getpid()); + new = sdscat(new,p+2); + zfree(server.vm_swap_file); + server.vm_swap_file = new; +} + static void vmInit(void) { off_t totsize; int pipefds[2]; + size_t stacksize; + + if (server.vm_max_threads != 0) + zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */ - server.vm_fp = fopen("/tmp/redisvm","w+b"); + expandVmSwapFilename(); + redisLog(REDIS_NOTICE,"Using '%s' as swap file",server.vm_swap_file); + if ((server.vm_fp = fopen(server.vm_swap_file,"r+b")) == NULL) { + server.vm_fp = fopen(server.vm_swap_file,"w+b"); + } if (server.vm_fp == NULL) { - redisLog(REDIS_WARNING,"Impossible to open the swap file. Exiting."); + redisLog(REDIS_WARNING, + "Impossible to open the swap file: %s. Exiting.", + strerror(errno)); exit(1); } server.vm_fd = fileno(server.vm_fp); @@ -7012,9 +7060,6 @@ static void vmInit(void) { redisLog(REDIS_VERBOSE,"Allocated %lld bytes page table for %lld pages", (long long) (server.vm_pages+7)/8, server.vm_pages); memset(server.vm_bitmap,0,(server.vm_pages+7)/8); - /* Try to remove the swap file, so the OS will really delete it from the - * file system when Redis exists. */ - unlink("/tmp/redisvm"); /* Initialize threaded I/O (used by Virtual Memory) */ server.io_newjobs = listCreate(); @@ -7033,6 +7078,11 @@ static void vmInit(void) { server.io_ready_pipe_read = pipefds[0]; server.io_ready_pipe_write = pipefds[1]; redisAssert(anetNonBlock(NULL,server.io_ready_pipe_read) != ANET_ERR); + /* LZF requires a lot of stack */ + pthread_attr_init(&server.io_threads_attr); + pthread_attr_getstacksize(&server.io_threads_attr, &stacksize); + while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; + pthread_attr_setstacksize(&server.io_threads_attr, stacksize); /* Listen for events in the threaded I/O pipe */ if (aeCreateFileEvent(server.el, server.io_ready_pipe_read, AE_READABLE, vmThreadedIOCompletedJob, NULL) == AE_ERR) @@ -7043,6 +7093,7 @@ static void vmInit(void) { static void vmMarkPageUsed(off_t page) { off_t byte = page/8; int bit = page&7; + redisAssert(vmFreePage(page) == 1); server.vm_bitmap[byte] |= 1< 100000000) { + *((char*)-1) = 'x'; + } } /* Test if the page is free */ @@ -7112,7 +7169,6 @@ static int vmFindContiguousPages(off_t *first, off_t n) { while(offset < server.vm_pages) { off_t this = base+offset; - redisLog(REDIS_DEBUG, "THIS: %lld (%c)\n", (long long) this, vmFreePage(this) ? 'F' : 'X'); /* If we overflow, restart from page zero */ if (this >= server.vm_pages) { this -= server.vm_pages; @@ -7122,6 +7178,7 @@ static int vmFindContiguousPages(off_t *first, off_t n) { numfree = 0; } } + redisLog(REDIS_DEBUG, "THIS: %lld (%c)\n", (long long) this, vmFreePage(this) ? 'F' : 'X'); if (vmFreePage(this)) { /* This is a free page */ numfree++; @@ -7349,7 +7406,14 @@ static int vmSwapOneObject(int usethreads) { de = dictGetRandomKey(db->dict); key = dictGetEntryKey(de); val = dictGetEntryVal(de); - if (key->storage != REDIS_VM_MEMORY) { + /* Only swap objects that are currently in memory. + * + * Also don't swap shared objects if threaded VM is on, as we + * try to ensure that the main thread does not touch the + * object while the I/O thread is using it, but we can't + * control other keys without adding additional mutex. */ + if (key->storage != REDIS_VM_MEMORY || + (server.vm_max_threads != 0 && val->refcount != 1)) { if (maxtries) i--; /* don't count this try */ continue; } @@ -7438,6 +7502,7 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, char buf[1]; int retval; int processed = 0; + int toprocess = -1; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); REDIS_NOTUSED(privdata); @@ -7451,10 +7516,14 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, struct dictEntry *de; redisLog(REDIS_DEBUG,"Processing I/O completed job"); - assert(listLength(server.io_processed) != 0); /* Get the processed element (the oldest one) */ lockThreadedIO(); + assert(listLength(server.io_processed) != 0); + if (toprocess == -1) { + toprocess = (listLength(server.io_processed)*REDIS_MAX_COMPLETED_JOBS_PROCESSED)/100; + if (toprocess <= 0) toprocess = 1; + } ln = listFirst(server.io_processed); j = ln->value; listDelNode(server.io_processed,ln); @@ -7484,9 +7553,13 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, /* Now we know the amount of pages required to swap this object. * Let's find some space for it, and queue this task again * rebranded as REDIS_IOJOB_DO_SWAP. */ - if (vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR) { - /* Ooops... no space! */ + if (!vmCanSwapOut() || + vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR) + { + /* Ooops... no space or we can't swap as there is + * a fork()ed Redis trying to save stuff on disk. */ freeIOJob(j); + key->storage = REDIS_VM_MEMORY; /* undo operation */ } else { /* Note that we need to mark this pages as used now, * if the job will be canceled, we'll mark them as freed @@ -7526,7 +7599,7 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, freeIOJob(j); /* Put a few more swap requests in queue if we are still * out of memory */ - if (zmalloc_used_memory() > server.vm_max_memory) { + if (vmCanSwapOut() && zmalloc_used_memory() > server.vm_max_memory){ int more = 1; while(more) { lockThreadedIO(); @@ -7539,7 +7612,7 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, } } processed++; - if (processed == REDIS_MAX_COMPLETED_JOBS_PROCESSED) return; + if (processed == toprocess) return; } if (retval < 0 && errno != EAGAIN) { redisLog(REDIS_WARNING, @@ -7567,6 +7640,7 @@ static void vmCancelThreadedIOJob(robj *o) { int i; assert(o->storage == REDIS_VM_LOADING || o->storage == REDIS_VM_SWAPPING); +again: lockThreadedIO(); /* Search for a matching key in one of the queues */ for (i = 0; i < 3; i++) { @@ -7579,8 +7653,14 @@ static void vmCancelThreadedIOJob(robj *o) { if (job->canceled) continue; /* Skip this, already canceled. */ if (compareStringObjects(job->key,o) == 0) { - redisLog(REDIS_DEBUG,"*** CANCELED %p (%s)\n", - (void*)job, (char*)o->ptr); + redisLog(REDIS_DEBUG,"*** CANCELED %p (%s) (type %d) (LIST ID %d)\n", + (void*)job, (char*)o->ptr, job->type, i); + /* Mark the pages as free since the swap didn't happened + * or happened but is now discarded. */ + if (i != 1 && job->type == REDIS_IOJOB_DO_SWAP) + vmMarkPagesFree(job->page,job->pages); + /* Cancel the job. It depends on the list the job is + * living in. */ switch(i) { case 0: /* io_newjobs */ /* If the job was yet not processed the best thing to do @@ -7589,14 +7669,30 @@ static void vmCancelThreadedIOJob(robj *o) { listDelNode(lists[i],ln); break; case 1: /* io_processing */ + /* Oh Shi- the thread is messing with the Job, and + * probably with the object if this is a + * PREPARE_SWAP or DO_SWAP job. Better to wait for the + * job to move into the next queue... */ + if (job->type != REDIS_IOJOB_LOAD) { + /* Yes, we try again and again until the job + * is completed. */ + unlockThreadedIO(); + /* But let's wait some time for the I/O thread + * to finish with this job. After all this condition + * should be very rare. */ + usleep(1); + goto again; + } else { + job->canceled = 1; + break; + } case 2: /* io_processed */ + /* The job was already processed, that's easy... + * just mark it as canceled so that we'll ignore it + * when processing completed jobs. */ job->canceled = 1; break; } - /* Mark the pages as free since the swap didn't happened - * or happened but is not discarded. */ - if (job->type == REDIS_IOJOB_DO_SWAP) - vmMarkPagesFree(job->page,job->pages); /* Finally we have to adjust the storage type of the object * in order to "UNDO" the operaiton. */ if (o->storage == REDIS_VM_LOADING) @@ -7668,24 +7764,50 @@ static void *IOThreadEntryPoint(void *arg) { static void spawnIOThread(void) { pthread_t thread; - pthread_create(&thread,NULL,IOThreadEntryPoint,NULL); + pthread_create(&thread,&server.io_threads_attr,IOThreadEntryPoint,NULL); server.io_active_threads++; } /* We need to wait for the last thread to exit before we are able to * fork() in order to BGSAVE or BGREWRITEAOF. */ -static void waitZeroActiveThreads(void) { +static void waitEmptyIOJobsQueue(void) { while(1) { + int io_processed_len; + lockThreadedIO(); - if (server.io_active_threads == 0) { + if (listLength(server.io_newjobs) == 0 && + listLength(server.io_processing) == 0 && + server.io_active_threads == 0) + { unlockThreadedIO(); return; } + /* While waiting for empty jobs queue condition we post-process some + * finshed job, as I/O threads may be hanging trying to write against + * the io_ready_pipe_write FD but there are so much pending jobs that + * it's blocking. */ + io_processed_len = listLength(server.io_processed); unlockThreadedIO(); - usleep(10000); /* 10 milliseconds */ + if (io_processed_len) { + vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read,NULL,0); + usleep(1000); /* 1 millisecond */ + } else { + usleep(10000); /* 10 milliseconds */ + } } } +static void vmReopenSwapFile(void) { + fclose(server.vm_fp); + server.vm_fp = fopen(server.vm_swap_file,"r+b"); + if (server.vm_fp == NULL) { + redisLog(REDIS_WARNING,"Can't re-open the VM swap file: %s. Exiting.", + server.vm_swap_file); + exit(1); + } + server.vm_fd = fileno(server.vm_fp); +} + /* This function must be called while with threaded IO locked */ static void queueIOJob(iojob *j) { redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n", @@ -7758,7 +7880,7 @@ static void debugCommand(redisClient *c) { "+Key at:%p refcount:%d, value at:%p refcount:%d " "encoding:%d serializedlength:%lld\r\n", (void*)key, key->refcount, (void*)val, val->refcount, - val->encoding, rdbSavedObjectLen(val,NULL))); + val->encoding, (long long) rdbSavedObjectLen(val,NULL))); } else { addReplySds(c,sdscatprintf(sdsempty(), "+Key at:%p refcount:%d, value swapped at: page %llu "