#include "lzf.h" /* LZF compression library */
#include "pqsort.h" /* Partial qsort for SORT+LIMIT */
+/* #define REDIS_HELGRIND_FRIENDLY */
+#if defined(__GNUC__) && defined(REDIS_HELGRIND_FRIENDLY)
+#warning "Remember to undef REDIS_HELGRIND_FRIENDLY before to commit"
+#endif
+
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR -1
o->ptr = ptr;
o->refcount = 1;
if (server.vm_enabled) {
+ /* Note that this code may run in the context of an I/O thread
+ * and accessing to server.unixtime in theory is an error
+ * (no locks). But in practice this is safe, and even if we read
+ * garbage Redis will not fail, as it's just a statistical info */
o->vm.atime = server.unixtime;
o->storage = REDIS_VM_MEMORY;
}
);
}
if (server.vm_enabled) {
+ lockThreadedIO();
info = sdscatprintf(info,
"vm_conf_max_memory:%llu\r\n"
"vm_conf_page_size:%llu\r\n"
(unsigned long) listLength(server.io_clients),
(unsigned long) server.io_active_threads
);
+ unlockThreadedIO();
}
for (j = 0; j < server.dbnum; j++) {
long long keys, vkeys;
de = dictGetRandomKey(db->dict);
key = dictGetEntryKey(de);
val = dictGetEntryVal(de);
- if (key->storage != REDIS_VM_MEMORY) {
+ /* Only swap objects that are currently in memory.
+ *
+ * Also don't swap shared objects if threaded VM is on, as we
+ * try to ensure that the main thread does not touch the
+ * object while the I/O thread is using it, but we can't
+ * control other keys without adding additional mutex. */
+ if (key->storage != REDIS_VM_MEMORY ||
+ (server.vm_max_threads != 0 && val->refcount != 1)) {
if (maxtries) i--; /* don't count this try */
continue;
}
struct dictEntry *de;
redisLog(REDIS_DEBUG,"Processing I/O completed job");
- assert(listLength(server.io_processed) != 0);
/* Get the processed element (the oldest one) */
lockThreadedIO();
+ assert(listLength(server.io_processed) != 0);
ln = listFirst(server.io_processed);
j = ln->value;
listDelNode(server.io_processed,ln);
int i;
assert(o->storage == REDIS_VM_LOADING || o->storage == REDIS_VM_SWAPPING);
+again:
lockThreadedIO();
/* Search for a matching key in one of the queues */
for (i = 0; i < 3; i++) {
if (job->canceled) continue; /* Skip this, already canceled. */
if (compareStringObjects(job->key,o) == 0) {
- redisLog(REDIS_DEBUG,"*** CANCELED %p (%s)\n",
- (void*)job, (char*)o->ptr);
+ redisLog(REDIS_DEBUG,"*** CANCELED %p (%s) (LIST ID %d)\n",
+ (void*)job, (char*)o->ptr, i);
/* Mark the pages as free since the swap didn't happened
* or happened but is now discarded. */
if (job->type == REDIS_IOJOB_DO_SWAP)
listDelNode(lists[i],ln);
break;
case 1: /* io_processing */
+ /* Oh Shi- the thread is messing with the Job, and
+ * probably with the object if this is a
+ * PREPARE_SWAP or DO_SWAP job. Better to wait for the
+ * job to move into the next queue... */
+ if (job->type != REDIS_IOJOB_LOAD) {
+ /* Yes, we try again and again until the job
+ * is completed. */
+ unlockThreadedIO();
+ /* But let's wait some time for the I/O thread
+ * to finish with this job. After all this condition
+ * should be very rare. */
+ usleep(1);
+ goto again;
+ } else {
+ job->canceled = 1;
+ break;
+ }
case 2: /* io_processed */
+ /* The job was already processed, that's easy...
+ * just mark it as canceled so that we'll ignore it
+ * when processing completed jobs. */
job->canceled = 1;
break;
}
/* Get a new job to process */
lockThreadedIO();
if (listLength(server.io_newjobs) == 0) {
+#ifdef REDIS_HELGRIND_FRIENDLY
+ /* No new jobs? Wait and retry, because to be Helgrind
+ * (valgrind --tool=helgrind) what's needed is to take
+ * the same threads running instead to create/destroy threads
+ * as needed (otherwise valgrind will fail) */
+ unlockThreadedIO();
+ usleep(1); /* Give some time for the I/O thread to work. */
+ continue;
+#endif
/* No new jobs in queue, exit. */
redisLog(REDIS_DEBUG,"Thread %lld exiting, nothing to do",
(long long) pthread_self());