} else {
redisLog(REDIS_NOTICE,"Swap file allocated with success");
}
- server.vm_bitmap = zmalloc((server.vm_pages+7)/8);
+ server.vm_bitmap = zcalloc((server.vm_pages+7)/8);
redisLog(REDIS_VERBOSE,"Allocated %lld bytes page table for %lld pages",
(long long) (server.vm_pages+7)/8, server.vm_pages);
- memset(server.vm_bitmap,0,(server.vm_pages+7)/8);
/* Initialize threaded I/O (used by Virtual Memory) */
server.io_newjobs = listCreate();
server.io_processed = listCreate();
server.io_ready_clients = listCreate();
pthread_mutex_init(&server.io_mutex,NULL);
- pthread_mutex_init(&server.obj_freelist_mutex,NULL);
pthread_mutex_init(&server.io_swapfile_mutex,NULL);
server.io_active_threads = 0;
if (pipe(pipefds) == -1) {
/* LZF requires a lot of stack */
pthread_attr_init(&server.io_threads_attr);
pthread_attr_getstacksize(&server.io_threads_attr, &stacksize);
+
+ /* Solaris may report a stacksize of 0. Set it to 1 in that case, otherwise
+ * doubling it in the while loop below would never make progress (0*2 == 0). */
+ if (!stacksize) stacksize = 1;
+
while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
pthread_attr_setstacksize(&server.io_threads_attr, stacksize);
/* Listen for events in the threaded I/O pipe */
* If we can't find enough contiguous empty pages to swap the object on disk
* NULL is returned. */
vmpointer *vmSwapObjectBlocking(robj *val) {
- off_t pages = rdbSavedObjectPages(val,NULL);
+ off_t pages = rdbSavedObjectPages(val);
off_t page;
vmpointer *vp;
double computeObjectSwappability(robj *o) {
/* The actual age can be >= minage, but never < minage, because we use
* wrapping 21 bit clocks with minutes resolution for the LRU. */
- time_t minage = abs(server.lruclock - o->lru);
+ time_t minage = estimateObjectIdleTime(o);
long asize = 0, elesize;
robj *ele;
list *l;
z = (o->type == REDIS_ZSET);
d = z ? ((zset*)o->ptr)->dict : o->ptr;
- asize = sizeof(dict)+(sizeof(struct dictEntry*)*dictSlots(d));
- if (z) asize += sizeof(zset)-sizeof(dict);
- if (dictSize(d)) {
- de = dictGetRandomKey(d);
- ele = dictGetEntryKey(de);
- elesize = (ele->encoding == REDIS_ENCODING_RAW) ?
- (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o);
- asize += (sizeof(struct dictEntry)+elesize)*dictSize(d);
- if (z) asize += sizeof(zskiplistNode)*dictSize(d);
+ if (!z && o->encoding == REDIS_ENCODING_INTSET) {
+ intset *is = o->ptr;
+ asize = sizeof(*is)+is->encoding*is->length;
+ } else {
+ asize = sizeof(dict)+(sizeof(struct dictEntry*)*dictSlots(d));
+ if (z) asize += sizeof(zset)-sizeof(dict);
+ if (dictSize(d)) {
+ de = dictGetRandomKey(d);
+ ele = dictGetEntryKey(de);
+ elesize = (ele->encoding == REDIS_ENCODING_RAW) ?
+ (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o);
+ asize += (sizeof(struct dictEntry)+elesize)*dictSize(d);
+ if (z) asize += sizeof(zskiplistNode)*dictSize(d);
+ }
}
break;
case REDIS_HASH:
/* Every time a thread finishes a job, it writes a byte into the write side
* of a Unix pipe in order to "awake" the main thread, and this function
- * is called. */
+ * is called.
+ *
+ * Note that this is called both by the event loop, when an I/O thread
+ * sends a byte in the notification pipe, and is also directly called from
+ * waitEmptyIOJobsQueue().
+ *
+ * In the latter case we don't want to swap more, so we use the
+ * "privdata" argument, setting it to a non-NULL value to signal this
+ * condition. */
void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
int mask)
{
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
+ if (privdata != NULL) trytoswap = 0; /* check the comments above... */
+
/* For every byte we read in the read side of the pipe, there is one
* I/O job completed to process. */
while((retval = read(fd,buf,1)) == 1) {
vmpointer *vp = (vmpointer*)j->id;
j->val = vmReadObjectFromSwap(j->page,vp->vtype);
} else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
- FILE *fp = fopen("/dev/null","w+");
- j->pages = rdbSavedObjectPages(j->val,fp);
- fclose(fp);
+ j->pages = rdbSavedObjectPages(j->val);
} else if (j->type == REDIS_IOJOB_DO_SWAP) {
if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
j->canceled = 1;
io_processed_len = listLength(server.io_processed);
unlockThreadedIO();
if (io_processed_len) {
- vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read,NULL,0);
+ vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read,
+ (void*)0xdeadbeef,0);
usleep(1000); /* 1 millisecond */
} else {
usleep(10000); /* 10 milliseconds */
listIter li;
struct dictEntry *de;
+ /* The key object might be destroyed when deleted from the c->io_keys
+ * list (and the "key" argument is physically the same object as the
+ * object inside the list), so we need to protect it. */
+ incrRefCount(key);
+
/* Remove the key from the list of keys this client is waiting for. */
listRewind(c->io_keys,&li);
while ((ln = listNext(&li)) != NULL) {
if (listLength(l) == 0)
dictDelete(c->db->io_keys,key);
+ decrRefCount(key);
return listLength(c->io_keys) == 0;
}