8 /* dscache.c - Disk store cache for disk store backend. 
  10  * When Redis is configured for using disk as backend instead of memory, the 
  11  * memory is used as a cache, so that recently accessed keys are taken in 
  12  * memory for fast read and write operations. 
  14  * Modified keys are marked to be flushed on disk, and will be flushed 
  15  * as soon as the maximum configured flush time has elapsed.
  17  * This file implements the whole caching subsystem and contains further 
  22  * - The WATCH helper will be used to signal the cache system 
  23  *   we need to flush a given key/dbid into disk, adding this key/dbid 
  24  *   pair into a server.ds_cache_dirty linked list AND hash table (so that we 
  25  *   don't add the same thing multiple times). 
  27  * - cron() checks if there are elements on this list. When there are things 
  28  *   to flush, we create an IO Job for the I/O thread. 
  29  *   NOTE: We disable object sharing when server.ds_enabled == 1 so objects
  30  *   that are referenced by an IO job for flushing on disk are marked as
  31  *   o->storage == REDIS_DS_SAVING.
  33  * - This is what we do on key lookup: 
  34  *   1) The key already exists in memory. object->storage == REDIS_DS_MEMORY 
  35  *      or it is object->storage == REDIS_DS_DIRTY: 
  36  *      We don't do anything special: lookup, return value object pointer.
  37  *   2) The key is in memory but object->storage == REDIS_DS_SAVING. 
  38  *      When this happens we block waiting for the I/O thread to process 
  39  *      this object. Then continue. 
  40  *   3) The key is not in memory. We block to load the key from disk. 
  41  *      Of course the key may not be present at all on the disk store as well, 
  42  *      in such case we just detect this condition and continue, returning 
  45  * - Preloading of needed keys: 
  46  *   1) As it was done with VM, also with this new system we try preloading 
  47  *      keys a client is going to use. We block the client, load keys 
  48  *      using the I/O thread, unblock the client. Same code as VM more or less. 
  50  * - Reclaiming memory. 
  51  *   In cron() we detect our memory limit was reached. What we 
  52  *   do is deleting keys that are REDIS_DS_MEMORY, using LRU. 
  54  *   If this is not enough to return again under the memory limits we also 
  55  *   start to flush keys that need to be synched on disk synchronously, 
  56  *   removing it from the memory. We do this blocking as memory limit is a 
  57  *   much "harder" barrier in the new design.
  59  * - IO thread operations are no longer stopped for sync loading/saving of 
  60  *   things. When a key is found to be in the process of being saved 
  61  *   we simply wait for the IO thread to end its work. 
  63  *   Otherwise if there is to load a key without any IO thread operation 
  64  *   just started it is blocking-loaded in the lookup function. 
  66  * - What happens when an object is destroyed? 
  68  *   If o->storage == REDIS_DS_MEMORY then we simply destroy the object.
  69  *   If o->storage == REDIS_DS_DIRTY we can still remove the object. It had 
  70  *                    changes not flushed on disk, but is being removed so 
  72  *   if o->storage == REDIS_DS_SAVING then the object is being saved so 
  73  *                    it is impossible that its refcount == 1, must be at 
  74  *                    least two. When the object is saved the storage will 
  75  *                    be set back to DS_MEMORY. 
  77  * - What happens when keys are deleted? 
  79  *   We simply schedule a key flush operation as usually, but when the 
  80  *   IO thread will be created the object pointer will be set to NULL 
  81  *   so the IO thread will know that the work to do is to delete the key 
  82  *   from the disk store. 
  84  * - What happens with MULTI/EXEC? 
  89 /* Virtual Memory is composed mainly of two subsystems: 
  90  * - Blocking Virtual Memory
  91  * - Threaded Virtual Memory I/O 
  92  * The two parts are not fully decoupled, but functions are split among two 
  93  * different sections of the source code (delimited by comments) in order to 
  94  * make more clear what functionality is about the blocking VM and what about 
  95  * the threaded (not blocking) VM. 
  99  * Redis VM is a blocking VM (one that blocks reading swapped values from 
 100  * disk into memory when a value swapped out is needed in memory) that is made 
 101  * unblocking by trying to examine the command argument vector in order to 
 102  * load in background values that will likely be needed in order to exec 
 103  * the command. The command is executed only once all the relevant keys 
 104  * are loaded into memory. 
 106  * This basically is almost as simple as a blocking VM, but almost as parallel
 107  * as a fully non-blocking VM. 
 110 /* =================== Virtual Memory - Blocking Side  ====================== */ 
 118     zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */ 
 120     redisLog(REDIS_NOTICE
,"Initializing Disk Store at %s", server
.ds_path
); 
 121     /* Open Disk Store */ 
 122     if (dsOpen() != REDIS_OK
) { 
 123         redisLog(REDIS_WARNING
,"Fatal error opening disk store. Exiting."); 
 127     /* Initialize threaded I/O for Object Cache */ 
 128     server
.io_newjobs 
= listCreate(); 
 129     server
.io_processing 
= listCreate(); 
 130     server
.io_processed 
= listCreate(); 
 131     server
.io_ready_clients 
= listCreate(); 
 132     pthread_mutex_init(&server
.io_mutex
,NULL
); 
 133     server
.io_active_threads 
= 0; 
 134     if (pipe(pipefds
) == -1) { 
 135         redisLog(REDIS_WARNING
,"Unable to intialized DS: pipe(2): %s. Exiting." 
 139     server
.io_ready_pipe_read 
= pipefds
[0]; 
 140     server
.io_ready_pipe_write 
= pipefds
[1]; 
 141     redisAssert(anetNonBlock(NULL
,server
.io_ready_pipe_read
) != ANET_ERR
); 
 142     /* LZF requires a lot of stack */ 
 143     pthread_attr_init(&server
.io_threads_attr
); 
 144     pthread_attr_getstacksize(&server
.io_threads_attr
, &stacksize
); 
 146     /* Solaris may report a stacksize of 0, let's set it to 1 otherwise 
 147      * multiplying it by 2 in the while loop later will not really help ;) */ 
 148     if (!stacksize
) stacksize 
= 1; 
 150     while (stacksize 
< REDIS_THREAD_STACK_SIZE
) stacksize 
*= 2; 
 151     pthread_attr_setstacksize(&server
.io_threads_attr
, stacksize
); 
 152     /* Listen for events in the threaded I/O pipe */ 
 153     if (aeCreateFileEvent(server
.el
, server
.io_ready_pipe_read
, AE_READABLE
, 
 154         vmThreadedIOCompletedJob
, NULL
) == AE_ERR
) 
 155         oom("creating file event"); 
 157     /* Spawn our I/O thread */ 
 161 /* Compute how good candidate the specified object is for eviction. 
 162  * An higher number means a better candidate. */ 
 163 double computeObjectSwappability(robj 
*o
) { 
 164     /* actual age can be >= minage, but not < minage. As we use wrapping 
 165      * 21 bit clocks with minutes resolution for the LRU. */ 
 166     return (double) estimateObjectIdleTime(o
); 
 169 /* Try to free one entry from the diskstore object cache */ 
 170 int cacheFreeOneEntry(void) { 
 172     struct dictEntry 
*best 
= NULL
; 
 173     double best_swappability 
= 0; 
 174     redisDb 
*best_db 
= NULL
; 
 178     for (j 
= 0; j 
< server
.dbnum
; j
++) { 
 179         redisDb 
*db 
= server
.db
+j
; 
 180         /* Why maxtries is set to 100? 
 181          * Because this way (usually) we'll find 1 object even if just 1% - 2% 
 182          * are swappable objects */ 
 185         if (dictSize(db
->dict
) == 0) continue; 
 186         for (i 
= 0; i 
< 5; i
++) { 
 190             if (maxtries
) maxtries
--; 
 191             de 
= dictGetRandomKey(db
->dict
); 
 192             val 
= dictGetEntryVal(de
); 
 193             /* Only swap objects that are currently in memory. 
 195              * Also don't swap shared objects: not a good idea in general and 
 196              * we need to ensure that the main thread does not touch the 
 197              * object while the I/O thread is using it, but we can't 
 198              * control other keys without adding additional mutex. */ 
 199             if (val
->storage 
!= REDIS_DS_MEMORY
) { 
 200                 if (maxtries
) i
--; /* don't count this try */ 
 203             swappability 
= computeObjectSwappability(val
); 
 204             if (!best 
|| swappability 
> best_swappability
) { 
 206                 best_swappability 
= swappability
; 
 212         /* FIXME: If there are objects marked as DS_DIRTY or DS_SAVING
 213          * let's wait for these objects to be cleared and retry...
 215          * Object cache vm limit is considered a hard limit. */
 218     key 
= dictGetEntryKey(best
); 
 219     val 
= dictGetEntryVal(best
); 
 221     redisLog(REDIS_DEBUG
,"Key selected for cache eviction: %s swappability:%f", 
 222         key
, best_swappability
); 
 224     /* Delete this key from memory */ 
 226         robj 
*kobj 
= createStringObject(key
,sdslen(key
)); 
 227         dbDelete(best_db
,kobj
); 
 232 /* Return true if it's safe to swap out objects in a given moment. 
 233  * Basically we don't want to swap objects out while there is a BGSAVE 
 234  * or a BGAEOREWRITE running in backgroud. */ 
 235 int dsCanTouchDiskStore(void) { 
 236     return (server
.bgsavechildpid 
== -1 && server
.bgrewritechildpid 
== -1); 
 239 /* =================== Virtual Memory - Threaded I/O  ======================= */ 
 241 void freeIOJob(iojob 
*j
) { 
 242     if ((j
->type 
== REDIS_IOJOB_PREPARE_SWAP 
|| 
 243         j
->type 
== REDIS_IOJOB_DO_SWAP 
|| 
 244         j
->type 
== REDIS_IOJOB_LOAD
) && j
->val 
!= NULL
) 
 246          /* we fix the storage type, otherwise decrRefCount() will try to
 247           * kill the I/O thread Job (that no longer exists). */
 248         if (j
->val
->storage 
== REDIS_VM_SWAPPING
) 
 249             j
->val
->storage 
= REDIS_VM_MEMORY
; 
 250         decrRefCount(j
->val
); 
 252     decrRefCount(j
->key
); 
 256 /* Every time a thread finished a Job, it writes a byte into the write side 
 257  * of an unix pipe in order to "awake" the main thread, and this function 
 260  * Note that this is called both by the event loop, when an I/O thread
 261  * sends a byte in the notification pipe, and is also directly called from 
 262  * waitEmptyIOJobsQueue(). 
 264  * In the latter case we don't want to swap more, so we use the 
 265  * "privdata" argument setting it to a not NULL value to signal this 
 267 void vmThreadedIOCompletedJob(aeEventLoop 
*el
, int fd
, void *privdata
, 
 271     int retval
, processed 
= 0, toprocess 
= -1, trytoswap 
= 1; 
 274     REDIS_NOTUSED(privdata
); 
 276     if (privdata 
!= NULL
) trytoswap 
= 0; /* check the comments above... */ 
 278     /* For every byte we read in the read side of the pipe, there is one 
 279      * I/O job completed to process. */ 
 280     while((retval 
= read(fd
,buf
,1)) == 1) { 
 283         struct dictEntry 
*de
; 
 285         redisLog(REDIS_DEBUG
,"Processing I/O completed job"); 
 287         /* Get the processed element (the oldest one) */ 
 289         redisAssert(listLength(server
.io_processed
) != 0); 
 290         if (toprocess 
== -1) { 
 291             toprocess 
= (listLength(server
.io_processed
)*REDIS_MAX_COMPLETED_JOBS_PROCESSED
)/100; 
 292             if (toprocess 
<= 0) toprocess 
= 1; 
 294         ln 
= listFirst(server
.io_processed
); 
 296         listDelNode(server
.io_processed
,ln
); 
 298         /* If this job is marked as canceled, just ignore it */ 
 303         /* Post process it in the main thread, as there are things we 
 304          * can do just here to avoid race conditions and/or invasive locks */ 
 305         redisLog(REDIS_DEBUG
,"COMPLETED Job type: %d, ID %p, key: %s", j
->type
, (void*)j
->id
, (unsigned char*)j
->key
->ptr
); 
 306         de 
= dictFind(j
->db
->dict
,j
->key
->ptr
); 
 307         redisAssert(de 
!= NULL
); 
 308         if (j
->type 
== REDIS_IOJOB_LOAD
) { 
 310             vmpointer 
*vp 
= dictGetEntryVal(de
); 
 312             /* Key loaded, bring it at home */ 
 313             vmMarkPagesFree(vp
->page
,vp
->usedpages
); 
 314             redisLog(REDIS_DEBUG
, "VM: object %s loaded from disk (threaded)", 
 315                 (unsigned char*) j
->key
->ptr
); 
 316             server
.vm_stats_swapped_objects
--; 
 317             server
.vm_stats_swapins
++; 
 318             dictGetEntryVal(de
) = j
->val
; 
 319             incrRefCount(j
->val
); 
 321             /* Handle clients waiting for this key to be loaded. */ 
 322             handleClientsBlockedOnSwappedKey(db
,j
->key
); 
 325         } else if (j
->type 
== REDIS_IOJOB_PREPARE_SWAP
) { 
 326             /* Now we know the amount of pages required to swap this object. 
 327              * Let's find some space for it, and queue this task again 
 328              * rebranded as REDIS_IOJOB_DO_SWAP. */ 
 329             if (!vmCanSwapOut() || 
 330                 vmFindContiguousPages(&j
->page
,j
->pages
) == REDIS_ERR
) 
 332                 /* Ooops... no space or we can't swap as there is 
 333                  * a fork()ed Redis trying to save stuff on disk. */ 
 334                 j
->val
->storage 
= REDIS_VM_MEMORY
; /* undo operation */ 
 337                 /* Note that we need to mark this pages as used now, 
 338                  * if the job will be canceled, we'll mark them as freed 
 340                 vmMarkPagesUsed(j
->page
,j
->pages
); 
 341                 j
->type 
= REDIS_IOJOB_DO_SWAP
; 
 346         } else if (j
->type 
== REDIS_IOJOB_DO_SWAP
) { 
 349             /* Key swapped. We can finally free some memory. */ 
 350             if (j
->val
->storage 
!= REDIS_VM_SWAPPING
) { 
 351                 vmpointer 
*vp 
= (vmpointer
*) j
->id
; 
 352                 printf("storage: %d\n",vp
->storage
); 
 353                 printf("key->name: %s\n",(char*)j
->key
->ptr
); 
 354                 printf("val: %p\n",(void*)j
->val
); 
 355                 printf("val->type: %d\n",j
->val
->type
); 
 356                 printf("val->ptr: %s\n",(char*)j
->val
->ptr
); 
 358             redisAssert(j
->val
->storage 
== REDIS_VM_SWAPPING
); 
 359             vp 
= createVmPointer(j
->val
->type
); 
 361             vp
->usedpages 
= j
->pages
; 
 362             dictGetEntryVal(de
) = vp
; 
 363             /* Fix the storage otherwise decrRefCount will attempt to 
 364              * remove the associated I/O job */ 
 365             j
->val
->storage 
= REDIS_VM_MEMORY
; 
 366             decrRefCount(j
->val
); 
 367             redisLog(REDIS_DEBUG
, 
 368                 "VM: object %s swapped out at %lld (%lld pages) (threaded)", 
 369                 (unsigned char*) j
->key
->ptr
, 
 370                 (unsigned long long) j
->page
, (unsigned long long) j
->pages
); 
 371             server
.vm_stats_swapped_objects
++; 
 372             server
.vm_stats_swapouts
++; 
 374             /* Put a few more swap requests in queue if we are still 
 376             if (trytoswap 
&& vmCanSwapOut() && 
 377                 zmalloc_used_memory() > server
.vm_max_memory
) 
 382                     more 
= listLength(server
.io_newjobs
) < 
 383                             (unsigned) server
.vm_max_threads
; 
 385                     /* Don't waste CPU time if swappable objects are rare. */ 
 386                     if (vmSwapOneObjectThreaded() == REDIS_ERR
) { 
 394         if (processed 
== toprocess
) return; 
 396     if (retval 
< 0 && errno 
!= EAGAIN
) { 
 397         redisLog(REDIS_WARNING
, 
 398             "WARNING: read(2) error in vmThreadedIOCompletedJob() %s", 
 403 void lockThreadedIO(void) { 
 404     pthread_mutex_lock(&server
.io_mutex
); 
 407 void unlockThreadedIO(void) { 
 408     pthread_mutex_unlock(&server
.io_mutex
); 
 411 void *IOThreadEntryPoint(void *arg
) { 
 416     pthread_detach(pthread_self()); 
 418         /* Get a new job to process */ 
 420         if (listLength(server
.io_newjobs
) == 0) { 
 421             /* No new jobs in queue, exit. */ 
 422             redisLog(REDIS_DEBUG
,"Thread %ld exiting, nothing to do", 
 423                 (long) pthread_self()); 
 424             server
.io_active_threads
--; 
 428         ln 
= listFirst(server
.io_newjobs
); 
 430         listDelNode(server
.io_newjobs
,ln
); 
 431         /* Add the job in the processing queue */ 
 432         j
->thread 
= pthread_self(); 
 433         listAddNodeTail(server
.io_processing
,j
); 
 434         ln 
= listLast(server
.io_processing
); /* We use ln later to remove it */ 
 436         redisLog(REDIS_DEBUG
,"Thread %ld got a new job (type %d): %p about key '%s'", 
 437             (long) pthread_self(), j
->type
, (void*)j
, (char*)j
->key
->ptr
); 
 439         /* Process the Job */ 
 440         if (j
->type 
== REDIS_IOJOB_LOAD
) { 
 441             vmpointer 
*vp 
= (vmpointer
*)j
->id
; 
 442             j
->val 
= vmReadObjectFromSwap(j
->page
,vp
->vtype
); 
 443         } else if (j
->type 
== REDIS_IOJOB_PREPARE_SWAP
) { 
 444             j
->pages 
= rdbSavedObjectPages(j
->val
); 
 445         } else if (j
->type 
== REDIS_IOJOB_DO_SWAP
) { 
 446             if (vmWriteObjectOnSwap(j
->val
,j
->page
) == REDIS_ERR
) 
 450         /* Done: insert the job into the processed queue */ 
 451         redisLog(REDIS_DEBUG
,"Thread %ld completed the job: %p (key %s)", 
 452             (long) pthread_self(), (void*)j
, (char*)j
->key
->ptr
); 
 454         listDelNode(server
.io_processing
,ln
); 
 455         listAddNodeTail(server
.io_processed
,j
); 
 458         /* Signal the main thread there is new stuff to process */ 
 459         redisAssert(write(server
.io_ready_pipe_write
,"x",1) == 1); 
 461     return NULL
; /* never reached */ 
 464 void spawnIOThread(void) { 
 466     sigset_t mask
, omask
; 
 470     sigaddset(&mask
,SIGCHLD
); 
 471     sigaddset(&mask
,SIGHUP
); 
 472     sigaddset(&mask
,SIGPIPE
); 
 473     pthread_sigmask(SIG_SETMASK
, &mask
, &omask
); 
 474     while ((err 
= pthread_create(&thread
,&server
.io_threads_attr
,IOThreadEntryPoint
,NULL
)) != 0) { 
 475         redisLog(REDIS_WARNING
,"Unable to spawn an I/O thread: %s", 
 479     pthread_sigmask(SIG_SETMASK
, &omask
, NULL
); 
 480     server
.io_active_threads
++; 
 483 /* We need to wait for the last thread to exit before we are able to 
 484  * fork() in order to BGSAVE or BGREWRITEAOF. */ 
 485 void waitEmptyIOJobsQueue(void) { 
 487         int io_processed_len
; 
 490         if (listLength(server
.io_newjobs
) == 0 && 
 491             listLength(server
.io_processing
) == 0 && 
 492             server
.io_active_threads 
== 0) 
 497         /* While waiting for empty jobs queue condition we post-process some
 498          * finished jobs, as I/O threads may be hanging trying to write against
 499          * the io_ready_pipe_write FD but there are so many pending jobs that
 501         io_processed_len 
= listLength(server
.io_processed
); 
 503         if (io_processed_len
) { 
 504             vmThreadedIOCompletedJob(NULL
,server
.io_ready_pipe_read
, 
 505                                                         (void*)0xdeadbeef,0); 
 506             usleep(1000); /* 1 millisecond */ 
 508             usleep(10000); /* 10 milliseconds */ 
 513 /* This function must be called with threaded IO locked */
 514 void queueIOJob(iojob 
*j
) { 
 515     redisLog(REDIS_DEBUG
,"Queued IO Job %p type %d about key '%s'\n", 
 516         (void*)j
, j
->type
, (char*)j
->key
->ptr
); 
 517     listAddNodeTail(server
.io_newjobs
,j
); 
 518     if (server
.io_active_threads 
< server
.vm_max_threads
) 
 522 int vmSwapObjectThreaded(robj 
*key
, robj 
*val
, redisDb 
*db
) { 
 525     j 
= zmalloc(sizeof(*j
)); 
 526     j
->type 
= REDIS_IOJOB_PREPARE_SWAP
; 
 530     j
->id 
= j
->val 
= val
; 
 533     j
->thread 
= (pthread_t
) -1; 
 534     val
->storage 
= REDIS_VM_SWAPPING
; 
 542 /* ============ Virtual Memory - Blocking clients on missing keys =========== */ 
 544 /* This function makes the client 'c' wait for the key 'key' to be loaded.
 545  * If there is not already a job loading the key, it is created.
 546  * The key is added to the io_keys list in the client structure, and also 
 547  * in the hash table mapping swapped keys to waiting clients, that is, 
 548  * server.io_waited_keys. */ 
 549 int waitForSwappedKey(redisClient 
*c
, robj 
*key
) { 
 550     struct dictEntry 
*de
; 
 554     /* If the key does not exist or is already in RAM we don't need to 
 555      * block the client at all. */ 
 556     de 
= dictFind(c
->db
->dict
,key
->ptr
); 
 557     if (de 
== NULL
) return 0; 
 558     o 
= dictGetEntryVal(de
); 
 559     if (o
->storage 
== REDIS_VM_MEMORY
) { 
 561     } else if (o
->storage 
== REDIS_VM_SWAPPING
) { 
 562         /* We were swapping the key, undo it! */ 
 563         vmCancelThreadedIOJob(o
); 
 567     /* OK: the key is either swapped, or being loaded just now. */ 
 569     /* Add the key to the list of keys this client is waiting for. 
 570      * This maps clients to keys they are waiting for. */ 
 571     listAddNodeTail(c
->io_keys
,key
); 
 574     /* Add the client to the swapped keys => clients waiting map. */ 
 575     de 
= dictFind(c
->db
->io_keys
,key
); 
 579         /* For every key we take a list of clients blocked for it */ 
 581         retval 
= dictAdd(c
->db
->io_keys
,key
,l
); 
 583         redisAssert(retval 
== DICT_OK
); 
 585         l 
= dictGetEntryVal(de
); 
 587     listAddNodeTail(l
,c
); 
 589     /* Are we already loading the key from disk? If not create a job */ 
 590     if (o
->storage 
== REDIS_VM_SWAPPED
) { 
 592         vmpointer 
*vp 
= (vmpointer
*)o
; 
 594         o
->storage 
= REDIS_VM_LOADING
; 
 595         j 
= zmalloc(sizeof(*j
)); 
 596         j
->type 
= REDIS_IOJOB_LOAD
; 
 604         j
->thread 
= (pthread_t
) -1; 
 612 /* Preload keys for any command with first, last and step values for 
 613  * the command keys prototype, as defined in the command table. */ 
 614 void waitForMultipleSwappedKeys(redisClient 
*c
, struct redisCommand 
*cmd
, int argc
, robj 
**argv
) { 
 616     if (cmd
->vm_firstkey 
== 0) return; 
 617     last 
= cmd
->vm_lastkey
; 
 618     if (last 
< 0) last 
= argc
+last
; 
 619     for (j 
= cmd
->vm_firstkey
; j 
<= last
; j 
+= cmd
->vm_keystep
) { 
 620         redisAssert(j 
< argc
); 
 621         waitForSwappedKey(c
,argv
[j
]); 
 625 /* Preload keys needed for the ZUNIONSTORE and ZINTERSTORE commands. 
 626  * Note that the number of keys to preload is user-defined, so we need to 
 627  * apply a sanity check against argc. */ 
 628 void zunionInterBlockClientOnSwappedKeys(redisClient 
*c
, struct redisCommand 
*cmd
, int argc
, robj 
**argv
) { 
 632     num 
= atoi(argv
[2]->ptr
); 
 633     if (num 
> (argc
-3)) return; 
 634     for (i 
= 0; i 
< num
; i
++) { 
 635         waitForSwappedKey(c
,argv
[3+i
]); 
 639 /* Preload keys needed to execute the entire MULTI/EXEC block. 
 641  * This function is called by blockClientOnSwappedKeys when EXEC is issued, 
 642  * and will block the client when any command requires a swapped out value. */ 
 643 void execBlockClientOnSwappedKeys(redisClient 
*c
, struct redisCommand 
*cmd
, int argc
, robj 
**argv
) { 
 645     struct redisCommand 
*mcmd
; 
 651     if (!(c
->flags 
& REDIS_MULTI
)) return; 
 652     for (i 
= 0; i 
< c
->mstate
.count
; i
++) { 
 653         mcmd 
= c
->mstate
.commands
[i
].cmd
; 
 654         margc 
= c
->mstate
.commands
[i
].argc
; 
 655         margv 
= c
->mstate
.commands
[i
].argv
; 
 657         if (mcmd
->vm_preload_proc 
!= NULL
) { 
 658             mcmd
->vm_preload_proc(c
,mcmd
,margc
,margv
); 
 660             waitForMultipleSwappedKeys(c
,mcmd
,margc
,margv
); 
 665 /* Is this client attempting to run a command against swapped keys? 
 666  * If so, block it ASAP, load the keys in background, then resume it. 
 668  * The important idea about this function is that it can fail! If keys will 
 669  * still be swapped when the client is resumed, this key lookups will 
 670  * just block loading keys from disk. In practical terms this should only 
 671  * happen with SORT BY command or if there is a bug in this function. 
 673  * Return 1 if the client is marked as blocked, 0 if the client can 
 674  * continue as the keys it is going to access appear to be in memory. */ 
 675 int blockClientOnSwappedKeys(redisClient 
*c
, struct redisCommand 
*cmd
) { 
 676     if (cmd
->vm_preload_proc 
!= NULL
) { 
 677         cmd
->vm_preload_proc(c
,cmd
,c
->argc
,c
->argv
); 
 679         waitForMultipleSwappedKeys(c
,cmd
,c
->argc
,c
->argv
); 
 682     /* If the client was blocked for at least one key, mark it as blocked. */ 
 683     if (listLength(c
->io_keys
)) { 
 684         c
->flags 
|= REDIS_IO_WAIT
; 
 685         aeDeleteFileEvent(server
.el
,c
->fd
,AE_READABLE
); 
 686         server
.vm_blocked_clients
++; 
 693 /* Remove the 'key' from the list of blocked keys for a given client. 
 695  * The function returns 1 when there are no longer blocking keys after 
 696  * the current one was removed (and the client can be unblocked). */ 
 697 int dontWaitForSwappedKey(redisClient 
*c
, robj 
*key
) { 
 701     struct dictEntry 
*de
; 
 703     /* The key object might be destroyed when deleted from the c->io_keys 
 704      * list (and the "key" argument is physically the same object as the 
 705      * object inside the list), so we need to protect it. */ 
 708     /* Remove the key from the list of keys this client is waiting for. */ 
 709     listRewind(c
->io_keys
,&li
); 
 710     while ((ln 
= listNext(&li
)) != NULL
) { 
 711         if (equalStringObjects(ln
->value
,key
)) { 
 712             listDelNode(c
->io_keys
,ln
); 
 716     redisAssert(ln 
!= NULL
); 
 718     /* Remove the client from the key => waiting clients map. */
 719     de 
= dictFind(c
->db
->io_keys
,key
); 
 720     redisAssert(de 
!= NULL
); 
 721     l 
= dictGetEntryVal(de
); 
 722     ln 
= listSearchKey(l
,c
); 
 723     redisAssert(ln 
!= NULL
); 
 725     if (listLength(l
) == 0) 
 726         dictDelete(c
->db
->io_keys
,key
); 
 729     return listLength(c
->io_keys
) == 0; 
 732 /* Every time we know a key was loaded back in memory, we handle clients
 733  * waiting for this key if any. */ 
 734 void handleClientsBlockedOnSwappedKey(redisDb 
*db
, robj 
*key
) { 
 735     struct dictEntry 
*de
; 
 740     de 
= dictFind(db
->io_keys
,key
); 
 743     l 
= dictGetEntryVal(de
); 
 745     /* Note: we can't use something like while(listLength(l)) as the list 
 746      * can be freed by the calling function when we remove the last element. */ 
 749         redisClient 
*c 
= ln
->value
; 
 751         if (dontWaitForSwappedKey(c
,key
)) { 
 752             /* Put the client in the list of clients ready to go as we 
 753              * loaded all the keys about it. */ 
 754             listAddNodeTail(server
.io_ready_clients
,c
);