]>
Commit | Line | Data |
---|---|---|
e2641e09 | 1 | #include "redis.h" |
2 | ||
3 | #include <fcntl.h> | |
4 | #include <pthread.h> | |
5 | #include <math.h> | |
6 | #include <signal.h> | |
7 | ||
33388d43 | 8 | /* dscache.c - Disk store cache for disk store backend. |
9 | * | |
10 | * When Redis is configured for using disk as backend instead of memory, the | |
11 | * memory is used as a cache, so that recently accessed keys are taken in | |
12 | * memory for fast read and write operations. | |
13 | * | |
14 | * Modified keys are marked to be flushed on disk, and will be flushed | |
15 | * as long as the maximum configured flush time elapsed. | |
16 | * | |
17 | * This file implements the whole caching subsystem and contains further | |
18 | * documentation. */ | |
19 | ||
20 | /* TODO: | |
21 | * | |
22 | * - The WATCH helper will be used to signal the cache system | |
23 | * we need to flush a given key/dbid into disk, adding this key/dbid | |
24 | * pair into a server.ds_cache_dirty linked list AND hash table (so that we | |
25 | * don't add the same thing multiple times). | |
26 | * | |
27 | * - cron() checks if there are elements on this list. When there are things | |
28 | * to flush, we create an IO Job for the I/O thread. | |
16d77878 | 29 | * NOTE: We disable object sharing when server.ds_enabled == 1 so objects |
30 | * that are referenced by an IO job for flushing on disk are marked as | |
31 | * o->storage == REDIS_DS_SAVING. | |
33388d43 | 32 | * |
33 | * - This is what we do on key lookup: | |
16d77878 | 34 | * 1) The key already exists in memory. object->storage == REDIS_DS_MEMORY |
35 | * or it is object->storage == REDIS_DS_DIRTY: | |
33388d43 | 36 | * We don't do nothing special, lookup, return value object pointer. |
37 | * 2) The key is in memory but object->storage == REDIS_DS_SAVING. | |
16d77878 | 38 | * When this happens we block waiting for the I/O thread to process |
39 | * this object. Then continue. | |
33388d43 | 40 | * 3) The key is not in memory. We block to load the key from disk. |
41 | * Of course the key may not be present at all on the disk store as well, | |
42 | * in such case we just detect this condition and continue, returning | |
43 | * NULL from lookup. | |
44 | * | |
45 | * - Preloading of needed keys: | |
46 | * 1) As it was done with VM, also with this new system we try preloading | |
47 | * keys a client is going to use. We block the client, load keys | |
48 | * using the I/O thread, unblock the client. Same code as VM more or less. | |
49 | * | |
16d77878 | 50 | * - Reclaiming memory. |
51 | * In cron() we detect our memory limit was reached. What we | |
52 | * do is deleting keys that are REDIS_DS_MEMORY, using LRU. | |
53 | * | |
33388d43 | 54 | * If this is not enough to return again under the memory limits we also |
55 | * start to flush keys that need to be synched on disk synchronously, | |
16d77878 | 56 | * removing it from the memory. We do this blocking as memory limit is a |
57 | * much "harder" barrier in the new design. | |
33388d43 | 58 | * |
59 | * - IO thread operations are no longer stopped for sync loading/saving of | |
16d77878 | 60 | * things. When a key is found to be in the process of being saved |
61 | * we simply wait for the IO thread to end its work. | |
33388d43 | 62 | * |
63 | * Otherwise if there is to load a key without any IO thread operation | |
64 | * just started it is blocking-loaded in the lookup function. | |
16d77878 | 65 | * |
66 | * - What happens when an object is destroyed? | |
67 | * | |
68 | * If o->storage == REDIS_DS_MEMORY then we simply destroy the object. | |
69 | * If o->storage == REDIS_DS_DIRTY we can still remove the object. It had | |
70 | * changes not flushed on disk, but is being removed so | |
71 | * who cares. | |
72 | * if o->storage == REDIS_DS_SAVING then the object is being saved so | |
73 | * it is impossible that its refcount == 1, must be at | |
74 | * least two. When the object is saved the storage will | |
75 | * be set back to DS_MEMORY. | |
76 | * | |
77 | * - What happens when keys are deleted? | |
78 | * | |
79 | * We simply schedule a key flush operation as usually, but when the | |
80 | * IO thread will be created the object pointer will be set to NULL | |
81 | * so the IO thread will know that the work to do is to delete the key | |
82 | * from the disk store. | |
83 | * | |
84 | * - What happens with MULTI/EXEC? | |
85 | * | |
86 | * Good question. | |
33388d43 | 87 | */ |
88 | ||
e2641e09 | 89 | /* Virtual Memory is composed mainly of two subsystems: |
90 | * - Blocking Virtual Memory | |
91 | * - Threaded Virtual Memory I/O | |
92 | * The two parts are not fully decoupled, but functions are split among two | |
93 | * different sections of the source code (delimited by comments) in order to | |
94 | * make more clear what functionality is about the blocking VM and what about | |
95 | * the threaded (not blocking) VM. | |
96 | * | |
97 | * Redis VM design: | |
98 | * | |
99 | * Redis VM is a blocking VM (one that blocks reading swapped values from | |
100 | * disk into memory when a value swapped out is needed in memory) that is made | |
101 | * unblocking by trying to examine the command argument vector in order to | |
102 | * load in background values that will likely be needed in order to exec | |
103 | * the command. The command is executed only once all the relevant keys | |
104 | * are loaded into memory. | |
105 | * | |
106 | * This basically is almost as simple as a blocking VM, but almost as parallel | |
107 | * as a fully non-blocking VM. | |
108 | */ | |
109 | ||
110 | /* =================== Virtual Memory - Blocking Side ====================== */ | |
111 | ||
112 | /* Create a VM pointer object. This kind of objects are used in place of | |
113 | * values in the key -> value hash table, for swapped out objects. */ | |
114 | vmpointer *createVmPointer(int vtype) { | |
115 | vmpointer *vp = zmalloc(sizeof(vmpointer)); | |
116 | ||
117 | vp->type = REDIS_VMPOINTER; | |
118 | vp->storage = REDIS_VM_SWAPPED; | |
119 | vp->vtype = vtype; | |
120 | return vp; | |
121 | } | |
122 | ||
123 | void vmInit(void) { | |
124 | off_t totsize; | |
125 | int pipefds[2]; | |
126 | size_t stacksize; | |
127 | struct flock fl; | |
128 | ||
129 | if (server.vm_max_threads != 0) | |
130 | zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */ | |
131 | ||
132 | redisLog(REDIS_NOTICE,"Using '%s' as swap file",server.vm_swap_file); | |
133 | /* Try to open the old swap file, otherwise create it */ | |
134 | if ((server.vm_fp = fopen(server.vm_swap_file,"r+b")) == NULL) { | |
135 | server.vm_fp = fopen(server.vm_swap_file,"w+b"); | |
136 | } | |
137 | if (server.vm_fp == NULL) { | |
138 | redisLog(REDIS_WARNING, | |
139 | "Can't open the swap file: %s. Exiting.", | |
140 | strerror(errno)); | |
141 | exit(1); | |
142 | } | |
143 | server.vm_fd = fileno(server.vm_fp); | |
144 | /* Lock the swap file for writing, this is useful in order to avoid | |
145 | * another instance to use the same swap file for a config error. */ | |
146 | fl.l_type = F_WRLCK; | |
147 | fl.l_whence = SEEK_SET; | |
148 | fl.l_start = fl.l_len = 0; | |
149 | if (fcntl(server.vm_fd,F_SETLK,&fl) == -1) { | |
150 | redisLog(REDIS_WARNING, | |
151 | "Can't lock the swap file at '%s': %s. Make sure it is not used by another Redis instance.", server.vm_swap_file, strerror(errno)); | |
152 | exit(1); | |
153 | } | |
154 | /* Initialize */ | |
155 | server.vm_next_page = 0; | |
156 | server.vm_near_pages = 0; | |
157 | server.vm_stats_used_pages = 0; | |
158 | server.vm_stats_swapped_objects = 0; | |
159 | server.vm_stats_swapouts = 0; | |
160 | server.vm_stats_swapins = 0; | |
161 | totsize = server.vm_pages*server.vm_page_size; | |
162 | redisLog(REDIS_NOTICE,"Allocating %lld bytes of swap file",totsize); | |
163 | if (ftruncate(server.vm_fd,totsize) == -1) { | |
164 | redisLog(REDIS_WARNING,"Can't ftruncate swap file: %s. Exiting.", | |
165 | strerror(errno)); | |
166 | exit(1); | |
167 | } else { | |
168 | redisLog(REDIS_NOTICE,"Swap file allocated with success"); | |
169 | } | |
399f2f40 | 170 | server.vm_bitmap = zcalloc((server.vm_pages+7)/8); |
e2641e09 | 171 | redisLog(REDIS_VERBOSE,"Allocated %lld bytes page table for %lld pages", |
172 | (long long) (server.vm_pages+7)/8, server.vm_pages); | |
e2641e09 | 173 | |
174 | /* Initialize threaded I/O (used by Virtual Memory) */ | |
175 | server.io_newjobs = listCreate(); | |
176 | server.io_processing = listCreate(); | |
177 | server.io_processed = listCreate(); | |
178 | server.io_ready_clients = listCreate(); | |
179 | pthread_mutex_init(&server.io_mutex,NULL); | |
e2641e09 | 180 | pthread_mutex_init(&server.io_swapfile_mutex,NULL); |
181 | server.io_active_threads = 0; | |
182 | if (pipe(pipefds) == -1) { | |
183 | redisLog(REDIS_WARNING,"Unable to intialized VM: pipe(2): %s. Exiting." | |
184 | ,strerror(errno)); | |
185 | exit(1); | |
186 | } | |
187 | server.io_ready_pipe_read = pipefds[0]; | |
188 | server.io_ready_pipe_write = pipefds[1]; | |
189 | redisAssert(anetNonBlock(NULL,server.io_ready_pipe_read) != ANET_ERR); | |
190 | /* LZF requires a lot of stack */ | |
191 | pthread_attr_init(&server.io_threads_attr); | |
192 | pthread_attr_getstacksize(&server.io_threads_attr, &stacksize); | |
556bdfba | 193 | |
194 | /* Solaris may report a stacksize of 0, let's set it to 1 otherwise | |
195 | * multiplying it by 2 in the while loop later will not really help ;) */ | |
196 | if (!stacksize) stacksize = 1; | |
197 | ||
e2641e09 | 198 | while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; |
199 | pthread_attr_setstacksize(&server.io_threads_attr, stacksize); | |
200 | /* Listen for events in the threaded I/O pipe */ | |
201 | if (aeCreateFileEvent(server.el, server.io_ready_pipe_read, AE_READABLE, | |
202 | vmThreadedIOCompletedJob, NULL) == AE_ERR) | |
203 | oom("creating file event"); | |
204 | } | |
205 | ||
206 | /* Mark the page as used */ | |
207 | void vmMarkPageUsed(off_t page) { | |
208 | off_t byte = page/8; | |
209 | int bit = page&7; | |
210 | redisAssert(vmFreePage(page) == 1); | |
211 | server.vm_bitmap[byte] |= 1<<bit; | |
212 | } | |
213 | ||
214 | /* Mark N contiguous pages as used, with 'page' being the first. */ | |
215 | void vmMarkPagesUsed(off_t page, off_t count) { | |
216 | off_t j; | |
217 | ||
218 | for (j = 0; j < count; j++) | |
219 | vmMarkPageUsed(page+j); | |
220 | server.vm_stats_used_pages += count; | |
221 | redisLog(REDIS_DEBUG,"Mark USED pages: %lld pages at %lld\n", | |
222 | (long long)count, (long long)page); | |
223 | } | |
224 | ||
225 | /* Mark the page as free */ | |
226 | void vmMarkPageFree(off_t page) { | |
227 | off_t byte = page/8; | |
228 | int bit = page&7; | |
229 | redisAssert(vmFreePage(page) == 0); | |
230 | server.vm_bitmap[byte] &= ~(1<<bit); | |
231 | } | |
232 | ||
233 | /* Mark N contiguous pages as free, with 'page' being the first. */ | |
234 | void vmMarkPagesFree(off_t page, off_t count) { | |
235 | off_t j; | |
236 | ||
237 | for (j = 0; j < count; j++) | |
238 | vmMarkPageFree(page+j); | |
239 | server.vm_stats_used_pages -= count; | |
240 | redisLog(REDIS_DEBUG,"Mark FREE pages: %lld pages at %lld\n", | |
241 | (long long)count, (long long)page); | |
242 | } | |
243 | ||
244 | /* Test if the page is free */ | |
245 | int vmFreePage(off_t page) { | |
246 | off_t byte = page/8; | |
247 | int bit = page&7; | |
248 | return (server.vm_bitmap[byte] & (1<<bit)) == 0; | |
249 | } | |
250 | ||
251 | /* Find N contiguous free pages storing the first page of the cluster in *first. | |
252 | * Returns REDIS_OK if it was able to find N contiguous pages, otherwise | |
253 | * REDIS_ERR is returned. | |
254 | * | |
255 | * This function uses a simple algorithm: we try to allocate | |
256 | * REDIS_VM_MAX_NEAR_PAGES sequentially, when we reach this limit we start | |
257 | * again from the start of the swap file searching for free spaces. | |
258 | * | |
259 | * If it looks pretty clear that there are no free pages near our offset | |
260 | * we try to find less populated places doing a forward jump of | |
261 | * REDIS_VM_MAX_RANDOM_JUMP, then we start scanning again a few pages | |
262 | * without hurry, and then we jump again and so forth... | |
263 | * | |
264 | * This function can be improved using a free list to avoid to guess | |
265 | * too much, since we could collect data about freed pages. | |
266 | * | |
267 | * note: I implemented this function just after watching an episode of | |
268 | * Battlestar Galactica, where the hybrid was continuing to say "JUMP!" | |
269 | */ | |
270 | int vmFindContiguousPages(off_t *first, off_t n) { | |
271 | off_t base, offset = 0, since_jump = 0, numfree = 0; | |
272 | ||
273 | if (server.vm_near_pages == REDIS_VM_MAX_NEAR_PAGES) { | |
274 | server.vm_near_pages = 0; | |
275 | server.vm_next_page = 0; | |
276 | } | |
277 | server.vm_near_pages++; /* Yet another try for pages near to the old ones */ | |
278 | base = server.vm_next_page; | |
279 | ||
280 | while(offset < server.vm_pages) { | |
281 | off_t this = base+offset; | |
282 | ||
283 | /* If we overflow, restart from page zero */ | |
284 | if (this >= server.vm_pages) { | |
285 | this -= server.vm_pages; | |
286 | if (this == 0) { | |
287 | /* Just overflowed, what we found on tail is no longer | |
288 | * interesting, as it's no longer contiguous. */ | |
289 | numfree = 0; | |
290 | } | |
291 | } | |
292 | if (vmFreePage(this)) { | |
293 | /* This is a free page */ | |
294 | numfree++; | |
295 | /* Already got N free pages? Return to the caller, with success */ | |
296 | if (numfree == n) { | |
297 | *first = this-(n-1); | |
298 | server.vm_next_page = this+1; | |
299 | redisLog(REDIS_DEBUG, "FOUND CONTIGUOUS PAGES: %lld pages at %lld\n", (long long) n, (long long) *first); | |
300 | return REDIS_OK; | |
301 | } | |
302 | } else { | |
303 | /* The current one is not a free page */ | |
304 | numfree = 0; | |
305 | } | |
306 | ||
307 | /* Fast-forward if the current page is not free and we already | |
308 | * searched enough near this place. */ | |
309 | since_jump++; | |
310 | if (!numfree && since_jump >= REDIS_VM_MAX_RANDOM_JUMP/4) { | |
311 | offset += random() % REDIS_VM_MAX_RANDOM_JUMP; | |
312 | since_jump = 0; | |
313 | /* Note that even if we rewind after the jump, we are don't need | |
314 | * to make sure numfree is set to zero as we only jump *if* it | |
315 | * is set to zero. */ | |
316 | } else { | |
317 | /* Otherwise just check the next page */ | |
318 | offset++; | |
319 | } | |
320 | } | |
321 | return REDIS_ERR; | |
322 | } | |
323 | ||
324 | /* Write the specified object at the specified page of the swap file */ | |
325 | int vmWriteObjectOnSwap(robj *o, off_t page) { | |
326 | if (server.vm_enabled) pthread_mutex_lock(&server.io_swapfile_mutex); | |
327 | if (fseeko(server.vm_fp,page*server.vm_page_size,SEEK_SET) == -1) { | |
328 | if (server.vm_enabled) pthread_mutex_unlock(&server.io_swapfile_mutex); | |
329 | redisLog(REDIS_WARNING, | |
330 | "Critical VM problem in vmWriteObjectOnSwap(): can't seek: %s", | |
331 | strerror(errno)); | |
332 | return REDIS_ERR; | |
333 | } | |
334 | rdbSaveObject(server.vm_fp,o); | |
335 | fflush(server.vm_fp); | |
336 | if (server.vm_enabled) pthread_mutex_unlock(&server.io_swapfile_mutex); | |
337 | return REDIS_OK; | |
338 | } | |
339 | ||
340 | /* Transfers the 'val' object to disk. Store all the information | |
341 | * a 'vmpointer' object containing all the information needed to load the | |
342 | * object back later is returned. | |
343 | * | |
344 | * If we can't find enough contiguous empty pages to swap the object on disk | |
345 | * NULL is returned. */ | |
346 | vmpointer *vmSwapObjectBlocking(robj *val) { | |
bd70a5f5 | 347 | off_t pages = rdbSavedObjectPages(val); |
e2641e09 | 348 | off_t page; |
349 | vmpointer *vp; | |
350 | ||
351 | redisAssert(val->storage == REDIS_VM_MEMORY); | |
352 | redisAssert(val->refcount == 1); | |
353 | if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return NULL; | |
354 | if (vmWriteObjectOnSwap(val,page) == REDIS_ERR) return NULL; | |
355 | ||
356 | vp = createVmPointer(val->type); | |
357 | vp->page = page; | |
358 | vp->usedpages = pages; | |
359 | decrRefCount(val); /* Deallocate the object from memory. */ | |
360 | vmMarkPagesUsed(page,pages); | |
361 | redisLog(REDIS_DEBUG,"VM: object %p swapped out at %lld (%lld pages)", | |
362 | (void*) val, | |
363 | (unsigned long long) page, (unsigned long long) pages); | |
364 | server.vm_stats_swapped_objects++; | |
365 | server.vm_stats_swapouts++; | |
366 | return vp; | |
367 | } | |
368 | ||
369 | robj *vmReadObjectFromSwap(off_t page, int type) { | |
370 | robj *o; | |
371 | ||
372 | if (server.vm_enabled) pthread_mutex_lock(&server.io_swapfile_mutex); | |
373 | if (fseeko(server.vm_fp,page*server.vm_page_size,SEEK_SET) == -1) { | |
374 | redisLog(REDIS_WARNING, | |
375 | "Unrecoverable VM problem in vmReadObjectFromSwap(): can't seek: %s", | |
376 | strerror(errno)); | |
377 | _exit(1); | |
378 | } | |
379 | o = rdbLoadObject(type,server.vm_fp); | |
380 | if (o == NULL) { | |
381 | redisLog(REDIS_WARNING, "Unrecoverable VM problem in vmReadObjectFromSwap(): can't load object from swap file: %s", strerror(errno)); | |
382 | _exit(1); | |
383 | } | |
384 | if (server.vm_enabled) pthread_mutex_unlock(&server.io_swapfile_mutex); | |
385 | return o; | |
386 | } | |
387 | ||
388 | /* Load the specified object from swap to memory. | |
389 | * The newly allocated object is returned. | |
390 | * | |
391 | * If preview is true the unserialized object is returned to the caller but | |
392 | * the pages are not marked as freed, nor the vp object is freed. */ | |
393 | robj *vmGenericLoadObject(vmpointer *vp, int preview) { | |
394 | robj *val; | |
395 | ||
396 | redisAssert(vp->type == REDIS_VMPOINTER && | |
397 | (vp->storage == REDIS_VM_SWAPPED || vp->storage == REDIS_VM_LOADING)); | |
398 | val = vmReadObjectFromSwap(vp->page,vp->vtype); | |
399 | if (!preview) { | |
400 | redisLog(REDIS_DEBUG, "VM: object %p loaded from disk", (void*)vp); | |
401 | vmMarkPagesFree(vp->page,vp->usedpages); | |
402 | zfree(vp); | |
403 | server.vm_stats_swapped_objects--; | |
404 | } else { | |
405 | redisLog(REDIS_DEBUG, "VM: object %p previewed from disk", (void*)vp); | |
406 | } | |
407 | server.vm_stats_swapins++; | |
408 | return val; | |
409 | } | |
410 | ||
411 | /* Plain object loading, from swap to memory. | |
412 | * | |
413 | * 'o' is actually a redisVmPointer structure that will be freed by the call. | |
414 | * The return value is the loaded object. */ | |
415 | robj *vmLoadObject(robj *o) { | |
416 | /* If we are loading the object in background, stop it, we | |
417 | * need to load this object synchronously ASAP. */ | |
418 | if (o->storage == REDIS_VM_LOADING) | |
419 | vmCancelThreadedIOJob(o); | |
420 | return vmGenericLoadObject((vmpointer*)o,0); | |
421 | } | |
422 | ||
423 | /* Just load the value on disk, without to modify the key. | |
424 | * This is useful when we want to perform some operation on the value | |
425 | * without to really bring it from swap to memory, like while saving the | |
426 | * dataset or rewriting the append only log. */ | |
427 | robj *vmPreviewObject(robj *o) { | |
428 | return vmGenericLoadObject((vmpointer*)o,1); | |
429 | } | |
430 | ||
431 | /* How a good candidate is this object for swapping? | |
432 | * The better candidate it is, the greater the returned value. | |
433 | * | |
434 | * Currently we try to perform a fast estimation of the object size in | |
435 | * memory, and combine it with aging informations. | |
436 | * | |
437 | * Basically swappability = idle-time * log(estimated size) | |
438 | * | |
439 | * Bigger objects are preferred over smaller objects, but not | |
440 | * proportionally, this is why we use the logarithm. This algorithm is | |
441 | * just a first try and will probably be tuned later. */ | |
442 | double computeObjectSwappability(robj *o) { | |
443 | /* actual age can be >= minage, but not < minage. As we use wrapping | |
444 | * 21 bit clocks with minutes resolution for the LRU. */ | |
ef59a8bc | 445 | time_t minage = estimateObjectIdleTime(o); |
e2641e09 | 446 | long asize = 0, elesize; |
447 | robj *ele; | |
448 | list *l; | |
449 | listNode *ln; | |
450 | dict *d; | |
451 | struct dictEntry *de; | |
452 | int z; | |
453 | ||
454 | if (minage <= 0) return 0; | |
455 | switch(o->type) { | |
456 | case REDIS_STRING: | |
457 | if (o->encoding != REDIS_ENCODING_RAW) { | |
458 | asize = sizeof(*o); | |
459 | } else { | |
460 | asize = sdslen(o->ptr)+sizeof(*o)+sizeof(long)*2; | |
461 | } | |
462 | break; | |
463 | case REDIS_LIST: | |
464 | if (o->encoding == REDIS_ENCODING_ZIPLIST) { | |
465 | asize = sizeof(*o)+ziplistSize(o->ptr); | |
466 | } else { | |
467 | l = o->ptr; | |
468 | ln = listFirst(l); | |
469 | asize = sizeof(list); | |
470 | if (ln) { | |
471 | ele = ln->value; | |
472 | elesize = (ele->encoding == REDIS_ENCODING_RAW) ? | |
473 | (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o); | |
474 | asize += (sizeof(listNode)+elesize)*listLength(l); | |
475 | } | |
476 | } | |
477 | break; | |
478 | case REDIS_SET: | |
479 | case REDIS_ZSET: | |
480 | z = (o->type == REDIS_ZSET); | |
481 | d = z ? ((zset*)o->ptr)->dict : o->ptr; | |
482 | ||
5f19e8a4 | 483 | if (!z && o->encoding == REDIS_ENCODING_INTSET) { |
484 | intset *is = o->ptr; | |
485 | asize = sizeof(*is)+is->encoding*is->length; | |
486 | } else { | |
487 | asize = sizeof(dict)+(sizeof(struct dictEntry*)*dictSlots(d)); | |
488 | if (z) asize += sizeof(zset)-sizeof(dict); | |
489 | if (dictSize(d)) { | |
490 | de = dictGetRandomKey(d); | |
491 | ele = dictGetEntryKey(de); | |
492 | elesize = (ele->encoding == REDIS_ENCODING_RAW) ? | |
493 | (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o); | |
494 | asize += (sizeof(struct dictEntry)+elesize)*dictSize(d); | |
495 | if (z) asize += sizeof(zskiplistNode)*dictSize(d); | |
496 | } | |
e2641e09 | 497 | } |
498 | break; | |
499 | case REDIS_HASH: | |
500 | if (o->encoding == REDIS_ENCODING_ZIPMAP) { | |
501 | unsigned char *p = zipmapRewind((unsigned char*)o->ptr); | |
502 | unsigned int len = zipmapLen((unsigned char*)o->ptr); | |
503 | unsigned int klen, vlen; | |
504 | unsigned char *key, *val; | |
505 | ||
506 | if ((p = zipmapNext(p,&key,&klen,&val,&vlen)) == NULL) { | |
507 | klen = 0; | |
508 | vlen = 0; | |
509 | } | |
510 | asize = len*(klen+vlen+3); | |
511 | } else if (o->encoding == REDIS_ENCODING_HT) { | |
512 | d = o->ptr; | |
513 | asize = sizeof(dict)+(sizeof(struct dictEntry*)*dictSlots(d)); | |
514 | if (dictSize(d)) { | |
515 | de = dictGetRandomKey(d); | |
516 | ele = dictGetEntryKey(de); | |
517 | elesize = (ele->encoding == REDIS_ENCODING_RAW) ? | |
518 | (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o); | |
519 | ele = dictGetEntryVal(de); | |
520 | elesize = (ele->encoding == REDIS_ENCODING_RAW) ? | |
521 | (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o); | |
522 | asize += (sizeof(struct dictEntry)+elesize)*dictSize(d); | |
523 | } | |
524 | } | |
525 | break; | |
526 | } | |
527 | return (double)minage*log(1+asize); | |
528 | } | |
529 | ||
530 | /* Try to swap an object that's a good candidate for swapping. | |
531 | * Returns REDIS_OK if the object was swapped, REDIS_ERR if it's not possible | |
532 | * to swap any object at all. | |
533 | * | |
534 | * If 'usethreaded' is true, Redis will try to swap the object in background | |
535 | * using I/O threads. */ | |
536 | int vmSwapOneObject(int usethreads) { | |
537 | int j, i; | |
538 | struct dictEntry *best = NULL; | |
539 | double best_swappability = 0; | |
540 | redisDb *best_db = NULL; | |
541 | robj *val; | |
542 | sds key; | |
543 | ||
544 | for (j = 0; j < server.dbnum; j++) { | |
545 | redisDb *db = server.db+j; | |
546 | /* Why maxtries is set to 100? | |
547 | * Because this way (usually) we'll find 1 object even if just 1% - 2% | |
548 | * are swappable objects */ | |
549 | int maxtries = 100; | |
550 | ||
551 | if (dictSize(db->dict) == 0) continue; | |
552 | for (i = 0; i < 5; i++) { | |
553 | dictEntry *de; | |
554 | double swappability; | |
555 | ||
556 | if (maxtries) maxtries--; | |
557 | de = dictGetRandomKey(db->dict); | |
558 | val = dictGetEntryVal(de); | |
559 | /* Only swap objects that are currently in memory. | |
560 | * | |
561 | * Also don't swap shared objects: not a good idea in general and | |
562 | * we need to ensure that the main thread does not touch the | |
563 | * object while the I/O thread is using it, but we can't | |
564 | * control other keys without adding additional mutex. */ | |
565 | if (val->storage != REDIS_VM_MEMORY || val->refcount != 1) { | |
566 | if (maxtries) i--; /* don't count this try */ | |
567 | continue; | |
568 | } | |
569 | swappability = computeObjectSwappability(val); | |
570 | if (!best || swappability > best_swappability) { | |
571 | best = de; | |
572 | best_swappability = swappability; | |
573 | best_db = db; | |
574 | } | |
575 | } | |
576 | } | |
577 | if (best == NULL) return REDIS_ERR; | |
578 | key = dictGetEntryKey(best); | |
579 | val = dictGetEntryVal(best); | |
580 | ||
581 | redisLog(REDIS_DEBUG,"Key with best swappability: %s, %f", | |
582 | key, best_swappability); | |
583 | ||
584 | /* Swap it */ | |
585 | if (usethreads) { | |
586 | robj *keyobj = createStringObject(key,sdslen(key)); | |
587 | vmSwapObjectThreaded(keyobj,val,best_db); | |
588 | decrRefCount(keyobj); | |
589 | return REDIS_OK; | |
590 | } else { | |
591 | vmpointer *vp; | |
592 | ||
593 | if ((vp = vmSwapObjectBlocking(val)) != NULL) { | |
594 | dictGetEntryVal(best) = vp; | |
595 | return REDIS_OK; | |
596 | } else { | |
597 | return REDIS_ERR; | |
598 | } | |
599 | } | |
600 | } | |
601 | ||
/* Swap one object out synchronously.
 * Same return value as vmSwapOneObject(). */
int vmSwapOneObjectBlocking() {
    int usethreads = 0;

    return vmSwapOneObject(usethreads);
}
605 | ||
/* Swap one object out using the I/O threads.
 * Same return value as vmSwapOneObject(). */
int vmSwapOneObjectThreaded() {
    int usethreads = 1;

    return vmSwapOneObject(usethreads);
}
609 | ||
610 | /* Return true if it's safe to swap out objects in a given moment. | |
611 | * Basically we don't want to swap objects out while there is a BGSAVE | |
612 | * or a BGAEOREWRITE running in backgroud. */ | |
613 | int vmCanSwapOut(void) { | |
614 | return (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1); | |
615 | } | |
616 | ||
617 | /* =================== Virtual Memory - Threaded I/O ======================= */ | |
618 | ||
619 | void freeIOJob(iojob *j) { | |
620 | if ((j->type == REDIS_IOJOB_PREPARE_SWAP || | |
621 | j->type == REDIS_IOJOB_DO_SWAP || | |
622 | j->type == REDIS_IOJOB_LOAD) && j->val != NULL) | |
623 | { | |
624 | /* we fix the storage type, otherwise decrRefCount() will try to | |
625 | * kill the I/O thread Job (that does no longer exists). */ | |
626 | if (j->val->storage == REDIS_VM_SWAPPING) | |
627 | j->val->storage = REDIS_VM_MEMORY; | |
628 | decrRefCount(j->val); | |
629 | } | |
630 | decrRefCount(j->key); | |
631 | zfree(j); | |
632 | } | |
633 | ||
634 | /* Every time a thread finished a Job, it writes a byte into the write side | |
635 | * of an unix pipe in order to "awake" the main thread, and this function | |
c1ae36ae | 636 | * is called. |
637 | * | |
638 | * Note that this is called both by the event loop, when a I/O thread | |
639 | * sends a byte in the notification pipe, and is also directly called from | |
640 | * waitEmptyIOJobsQueue(). | |
641 | * | |
642 | * In the latter case we don't want to swap more, so we use the | |
643 | * "privdata" argument setting it to a not NULL value to signal this | |
644 | * condition. */ | |
e2641e09 | 645 | void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, |
646 | int mask) | |
647 | { | |
648 | char buf[1]; | |
649 | int retval, processed = 0, toprocess = -1, trytoswap = 1; | |
650 | REDIS_NOTUSED(el); | |
651 | REDIS_NOTUSED(mask); | |
652 | REDIS_NOTUSED(privdata); | |
653 | ||
e5f257c2 | 654 | if (privdata != NULL) trytoswap = 0; /* check the comments above... */ |
c1ae36ae | 655 | |
e2641e09 | 656 | /* For every byte we read in the read side of the pipe, there is one |
657 | * I/O job completed to process. */ | |
658 | while((retval = read(fd,buf,1)) == 1) { | |
659 | iojob *j; | |
660 | listNode *ln; | |
661 | struct dictEntry *de; | |
662 | ||
663 | redisLog(REDIS_DEBUG,"Processing I/O completed job"); | |
664 | ||
665 | /* Get the processed element (the oldest one) */ | |
666 | lockThreadedIO(); | |
667 | redisAssert(listLength(server.io_processed) != 0); | |
668 | if (toprocess == -1) { | |
669 | toprocess = (listLength(server.io_processed)*REDIS_MAX_COMPLETED_JOBS_PROCESSED)/100; | |
670 | if (toprocess <= 0) toprocess = 1; | |
671 | } | |
672 | ln = listFirst(server.io_processed); | |
673 | j = ln->value; | |
674 | listDelNode(server.io_processed,ln); | |
675 | unlockThreadedIO(); | |
676 | /* If this job is marked as canceled, just ignore it */ | |
677 | if (j->canceled) { | |
678 | freeIOJob(j); | |
679 | continue; | |
680 | } | |
681 | /* Post process it in the main thread, as there are things we | |
682 | * can do just here to avoid race conditions and/or invasive locks */ | |
683 | redisLog(REDIS_DEBUG,"COMPLETED Job type: %d, ID %p, key: %s", j->type, (void*)j->id, (unsigned char*)j->key->ptr); | |
684 | de = dictFind(j->db->dict,j->key->ptr); | |
685 | redisAssert(de != NULL); | |
686 | if (j->type == REDIS_IOJOB_LOAD) { | |
687 | redisDb *db; | |
688 | vmpointer *vp = dictGetEntryVal(de); | |
689 | ||
690 | /* Key loaded, bring it at home */ | |
691 | vmMarkPagesFree(vp->page,vp->usedpages); | |
692 | redisLog(REDIS_DEBUG, "VM: object %s loaded from disk (threaded)", | |
693 | (unsigned char*) j->key->ptr); | |
694 | server.vm_stats_swapped_objects--; | |
695 | server.vm_stats_swapins++; | |
696 | dictGetEntryVal(de) = j->val; | |
697 | incrRefCount(j->val); | |
698 | db = j->db; | |
699 | /* Handle clients waiting for this key to be loaded. */ | |
700 | handleClientsBlockedOnSwappedKey(db,j->key); | |
701 | freeIOJob(j); | |
702 | zfree(vp); | |
703 | } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) { | |
704 | /* Now we know the amount of pages required to swap this object. | |
705 | * Let's find some space for it, and queue this task again | |
706 | * rebranded as REDIS_IOJOB_DO_SWAP. */ | |
707 | if (!vmCanSwapOut() || | |
708 | vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR) | |
709 | { | |
710 | /* Ooops... no space or we can't swap as there is | |
711 | * a fork()ed Redis trying to save stuff on disk. */ | |
712 | j->val->storage = REDIS_VM_MEMORY; /* undo operation */ | |
713 | freeIOJob(j); | |
714 | } else { | |
715 | /* Note that we need to mark this pages as used now, | |
716 | * if the job will be canceled, we'll mark them as freed | |
717 | * again. */ | |
718 | vmMarkPagesUsed(j->page,j->pages); | |
719 | j->type = REDIS_IOJOB_DO_SWAP; | |
720 | lockThreadedIO(); | |
721 | queueIOJob(j); | |
722 | unlockThreadedIO(); | |
723 | } | |
724 | } else if (j->type == REDIS_IOJOB_DO_SWAP) { | |
725 | vmpointer *vp; | |
726 | ||
727 | /* Key swapped. We can finally free some memory. */ | |
728 | if (j->val->storage != REDIS_VM_SWAPPING) { | |
729 | vmpointer *vp = (vmpointer*) j->id; | |
730 | printf("storage: %d\n",vp->storage); | |
731 | printf("key->name: %s\n",(char*)j->key->ptr); | |
732 | printf("val: %p\n",(void*)j->val); | |
733 | printf("val->type: %d\n",j->val->type); | |
734 | printf("val->ptr: %s\n",(char*)j->val->ptr); | |
735 | } | |
736 | redisAssert(j->val->storage == REDIS_VM_SWAPPING); | |
737 | vp = createVmPointer(j->val->type); | |
738 | vp->page = j->page; | |
739 | vp->usedpages = j->pages; | |
740 | dictGetEntryVal(de) = vp; | |
741 | /* Fix the storage otherwise decrRefCount will attempt to | |
742 | * remove the associated I/O job */ | |
743 | j->val->storage = REDIS_VM_MEMORY; | |
744 | decrRefCount(j->val); | |
745 | redisLog(REDIS_DEBUG, | |
746 | "VM: object %s swapped out at %lld (%lld pages) (threaded)", | |
747 | (unsigned char*) j->key->ptr, | |
748 | (unsigned long long) j->page, (unsigned long long) j->pages); | |
749 | server.vm_stats_swapped_objects++; | |
750 | server.vm_stats_swapouts++; | |
751 | freeIOJob(j); | |
752 | /* Put a few more swap requests in queue if we are still | |
753 | * out of memory */ | |
754 | if (trytoswap && vmCanSwapOut() && | |
755 | zmalloc_used_memory() > server.vm_max_memory) | |
756 | { | |
757 | int more = 1; | |
758 | while(more) { | |
759 | lockThreadedIO(); | |
760 | more = listLength(server.io_newjobs) < | |
761 | (unsigned) server.vm_max_threads; | |
762 | unlockThreadedIO(); | |
763 | /* Don't waste CPU time if swappable objects are rare. */ | |
764 | if (vmSwapOneObjectThreaded() == REDIS_ERR) { | |
765 | trytoswap = 0; | |
766 | break; | |
767 | } | |
768 | } | |
769 | } | |
770 | } | |
771 | processed++; | |
772 | if (processed == toprocess) return; | |
773 | } | |
774 | if (retval < 0 && errno != EAGAIN) { | |
775 | redisLog(REDIS_WARNING, | |
776 | "WARNING: read(2) error in vmThreadedIOCompletedJob() %s", | |
777 | strerror(errno)); | |
778 | } | |
779 | } | |
780 | ||
781 | void lockThreadedIO(void) { | |
782 | pthread_mutex_lock(&server.io_mutex); | |
783 | } | |
784 | ||
785 | void unlockThreadedIO(void) { | |
786 | pthread_mutex_unlock(&server.io_mutex); | |
787 | } | |
788 | ||
789 | /* Remove the specified object from the threaded I/O queue if still not | |
790 | * processed, otherwise make sure to flag it as canceled. */ | |
791 | void vmCancelThreadedIOJob(robj *o) { | |
792 | list *lists[3] = { | |
793 | server.io_newjobs, /* 0 */ | |
794 | server.io_processing, /* 1 */ | |
795 | server.io_processed /* 2 */ | |
796 | }; | |
797 | int i; | |
798 | ||
799 | redisAssert(o->storage == REDIS_VM_LOADING || o->storage == REDIS_VM_SWAPPING); | |
800 | again: | |
801 | lockThreadedIO(); | |
802 | /* Search for a matching object in one of the queues */ | |
803 | for (i = 0; i < 3; i++) { | |
804 | listNode *ln; | |
805 | listIter li; | |
806 | ||
807 | listRewind(lists[i],&li); | |
808 | while ((ln = listNext(&li)) != NULL) { | |
809 | iojob *job = ln->value; | |
810 | ||
811 | if (job->canceled) continue; /* Skip this, already canceled. */ | |
812 | if (job->id == o) { | |
813 | redisLog(REDIS_DEBUG,"*** CANCELED %p (key %s) (type %d) (LIST ID %d)\n", | |
814 | (void*)job, (char*)job->key->ptr, job->type, i); | |
815 | /* Mark the pages as free since the swap didn't happened | |
816 | * or happened but is now discarded. */ | |
817 | if (i != 1 && job->type == REDIS_IOJOB_DO_SWAP) | |
818 | vmMarkPagesFree(job->page,job->pages); | |
819 | /* Cancel the job. It depends on the list the job is | |
820 | * living in. */ | |
821 | switch(i) { | |
822 | case 0: /* io_newjobs */ | |
823 | /* If the job was yet not processed the best thing to do | |
824 | * is to remove it from the queue at all */ | |
825 | freeIOJob(job); | |
826 | listDelNode(lists[i],ln); | |
827 | break; | |
828 | case 1: /* io_processing */ | |
829 | /* Oh Shi- the thread is messing with the Job: | |
830 | * | |
831 | * Probably it's accessing the object if this is a | |
832 | * PREPARE_SWAP or DO_SWAP job. | |
833 | * If it's a LOAD job it may be reading from disk and | |
834 | * if we don't wait for the job to terminate before to | |
835 | * cancel it, maybe in a few microseconds data can be | |
836 | * corrupted in this pages. So the short story is: | |
837 | * | |
838 | * Better to wait for the job to move into the | |
839 | * next queue (processed)... */ | |
840 | ||
841 | /* We try again and again until the job is completed. */ | |
842 | unlockThreadedIO(); | |
843 | /* But let's wait some time for the I/O thread | |
844 | * to finish with this job. After all this condition | |
845 | * should be very rare. */ | |
846 | usleep(1); | |
847 | goto again; | |
848 | case 2: /* io_processed */ | |
849 | /* The job was already processed, that's easy... | |
850 | * just mark it as canceled so that we'll ignore it | |
851 | * when processing completed jobs. */ | |
852 | job->canceled = 1; | |
853 | break; | |
854 | } | |
855 | /* Finally we have to adjust the storage type of the object | |
856 | * in order to "UNDO" the operaiton. */ | |
857 | if (o->storage == REDIS_VM_LOADING) | |
858 | o->storage = REDIS_VM_SWAPPED; | |
859 | else if (o->storage == REDIS_VM_SWAPPING) | |
860 | o->storage = REDIS_VM_MEMORY; | |
861 | unlockThreadedIO(); | |
862 | redisLog(REDIS_DEBUG,"*** DONE"); | |
863 | return; | |
864 | } | |
865 | } | |
866 | } | |
867 | unlockThreadedIO(); | |
868 | printf("Not found: %p\n", (void*)o); | |
869 | redisAssert(1 != 1); /* We should never reach this */ | |
870 | } | |
871 | ||
872 | void *IOThreadEntryPoint(void *arg) { | |
873 | iojob *j; | |
874 | listNode *ln; | |
875 | REDIS_NOTUSED(arg); | |
876 | ||
877 | pthread_detach(pthread_self()); | |
878 | while(1) { | |
879 | /* Get a new job to process */ | |
880 | lockThreadedIO(); | |
881 | if (listLength(server.io_newjobs) == 0) { | |
882 | /* No new jobs in queue, exit. */ | |
883 | redisLog(REDIS_DEBUG,"Thread %ld exiting, nothing to do", | |
884 | (long) pthread_self()); | |
885 | server.io_active_threads--; | |
886 | unlockThreadedIO(); | |
887 | return NULL; | |
888 | } | |
889 | ln = listFirst(server.io_newjobs); | |
890 | j = ln->value; | |
891 | listDelNode(server.io_newjobs,ln); | |
892 | /* Add the job in the processing queue */ | |
893 | j->thread = pthread_self(); | |
894 | listAddNodeTail(server.io_processing,j); | |
895 | ln = listLast(server.io_processing); /* We use ln later to remove it */ | |
896 | unlockThreadedIO(); | |
897 | redisLog(REDIS_DEBUG,"Thread %ld got a new job (type %d): %p about key '%s'", | |
898 | (long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr); | |
899 | ||
900 | /* Process the Job */ | |
901 | if (j->type == REDIS_IOJOB_LOAD) { | |
902 | vmpointer *vp = (vmpointer*)j->id; | |
903 | j->val = vmReadObjectFromSwap(j->page,vp->vtype); | |
904 | } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) { | |
bd70a5f5 | 905 | j->pages = rdbSavedObjectPages(j->val); |
e2641e09 | 906 | } else if (j->type == REDIS_IOJOB_DO_SWAP) { |
907 | if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR) | |
908 | j->canceled = 1; | |
909 | } | |
910 | ||
911 | /* Done: insert the job into the processed queue */ | |
912 | redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)", | |
913 | (long) pthread_self(), (void*)j, (char*)j->key->ptr); | |
914 | lockThreadedIO(); | |
915 | listDelNode(server.io_processing,ln); | |
916 | listAddNodeTail(server.io_processed,j); | |
917 | unlockThreadedIO(); | |
918 | ||
919 | /* Signal the main thread there is new stuff to process */ | |
920 | redisAssert(write(server.io_ready_pipe_write,"x",1) == 1); | |
921 | } | |
922 | return NULL; /* never reached */ | |
923 | } | |
924 | ||
925 | void spawnIOThread(void) { | |
926 | pthread_t thread; | |
927 | sigset_t mask, omask; | |
928 | int err; | |
929 | ||
930 | sigemptyset(&mask); | |
931 | sigaddset(&mask,SIGCHLD); | |
932 | sigaddset(&mask,SIGHUP); | |
933 | sigaddset(&mask,SIGPIPE); | |
934 | pthread_sigmask(SIG_SETMASK, &mask, &omask); | |
935 | while ((err = pthread_create(&thread,&server.io_threads_attr,IOThreadEntryPoint,NULL)) != 0) { | |
936 | redisLog(REDIS_WARNING,"Unable to spawn an I/O thread: %s", | |
937 | strerror(err)); | |
938 | usleep(1000000); | |
939 | } | |
940 | pthread_sigmask(SIG_SETMASK, &omask, NULL); | |
941 | server.io_active_threads++; | |
942 | } | |
943 | ||
944 | /* We need to wait for the last thread to exit before we are able to | |
945 | * fork() in order to BGSAVE or BGREWRITEAOF. */ | |
946 | void waitEmptyIOJobsQueue(void) { | |
947 | while(1) { | |
948 | int io_processed_len; | |
949 | ||
950 | lockThreadedIO(); | |
951 | if (listLength(server.io_newjobs) == 0 && | |
952 | listLength(server.io_processing) == 0 && | |
953 | server.io_active_threads == 0) | |
954 | { | |
955 | unlockThreadedIO(); | |
956 | return; | |
957 | } | |
958 | /* While waiting for empty jobs queue condition we post-process some | |
959 | * finshed job, as I/O threads may be hanging trying to write against | |
960 | * the io_ready_pipe_write FD but there are so much pending jobs that | |
961 | * it's blocking. */ | |
962 | io_processed_len = listLength(server.io_processed); | |
963 | unlockThreadedIO(); | |
964 | if (io_processed_len) { | |
c1ae36ae | 965 | vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read, |
966 | (void*)0xdeadbeef,0); | |
e2641e09 | 967 | usleep(1000); /* 1 millisecond */ |
968 | } else { | |
969 | usleep(10000); /* 10 milliseconds */ | |
970 | } | |
971 | } | |
972 | } | |
973 | ||
974 | void vmReopenSwapFile(void) { | |
975 | /* Note: we don't close the old one as we are in the child process | |
976 | * and don't want to mess at all with the original file object. */ | |
977 | server.vm_fp = fopen(server.vm_swap_file,"r+b"); | |
978 | if (server.vm_fp == NULL) { | |
979 | redisLog(REDIS_WARNING,"Can't re-open the VM swap file: %s. Exiting.", | |
980 | server.vm_swap_file); | |
981 | _exit(1); | |
982 | } | |
983 | server.vm_fd = fileno(server.vm_fp); | |
984 | } | |
985 | ||
986 | /* This function must be called while with threaded IO locked */ | |
987 | void queueIOJob(iojob *j) { | |
988 | redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n", | |
989 | (void*)j, j->type, (char*)j->key->ptr); | |
990 | listAddNodeTail(server.io_newjobs,j); | |
991 | if (server.io_active_threads < server.vm_max_threads) | |
992 | spawnIOThread(); | |
993 | } | |
994 | ||
995 | int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) { | |
996 | iojob *j; | |
997 | ||
998 | j = zmalloc(sizeof(*j)); | |
999 | j->type = REDIS_IOJOB_PREPARE_SWAP; | |
1000 | j->db = db; | |
1001 | j->key = key; | |
1002 | incrRefCount(key); | |
1003 | j->id = j->val = val; | |
1004 | incrRefCount(val); | |
1005 | j->canceled = 0; | |
1006 | j->thread = (pthread_t) -1; | |
1007 | val->storage = REDIS_VM_SWAPPING; | |
1008 | ||
1009 | lockThreadedIO(); | |
1010 | queueIOJob(j); | |
1011 | unlockThreadedIO(); | |
1012 | return REDIS_OK; | |
1013 | } | |
1014 | ||
1015 | /* ============ Virtual Memory - Blocking clients on missing keys =========== */ | |
1016 | ||
1017 | /* This function makes the clinet 'c' waiting for the key 'key' to be loaded. | |
1018 | * If there is not already a job loading the key, it is craeted. | |
1019 | * The key is added to the io_keys list in the client structure, and also | |
1020 | * in the hash table mapping swapped keys to waiting clients, that is, | |
1021 | * server.io_waited_keys. */ | |
1022 | int waitForSwappedKey(redisClient *c, robj *key) { | |
1023 | struct dictEntry *de; | |
1024 | robj *o; | |
1025 | list *l; | |
1026 | ||
1027 | /* If the key does not exist or is already in RAM we don't need to | |
1028 | * block the client at all. */ | |
1029 | de = dictFind(c->db->dict,key->ptr); | |
1030 | if (de == NULL) return 0; | |
1031 | o = dictGetEntryVal(de); | |
1032 | if (o->storage == REDIS_VM_MEMORY) { | |
1033 | return 0; | |
1034 | } else if (o->storage == REDIS_VM_SWAPPING) { | |
1035 | /* We were swapping the key, undo it! */ | |
1036 | vmCancelThreadedIOJob(o); | |
1037 | return 0; | |
1038 | } | |
1039 | ||
1040 | /* OK: the key is either swapped, or being loaded just now. */ | |
1041 | ||
1042 | /* Add the key to the list of keys this client is waiting for. | |
1043 | * This maps clients to keys they are waiting for. */ | |
1044 | listAddNodeTail(c->io_keys,key); | |
1045 | incrRefCount(key); | |
1046 | ||
1047 | /* Add the client to the swapped keys => clients waiting map. */ | |
1048 | de = dictFind(c->db->io_keys,key); | |
1049 | if (de == NULL) { | |
1050 | int retval; | |
1051 | ||
1052 | /* For every key we take a list of clients blocked for it */ | |
1053 | l = listCreate(); | |
1054 | retval = dictAdd(c->db->io_keys,key,l); | |
1055 | incrRefCount(key); | |
1056 | redisAssert(retval == DICT_OK); | |
1057 | } else { | |
1058 | l = dictGetEntryVal(de); | |
1059 | } | |
1060 | listAddNodeTail(l,c); | |
1061 | ||
1062 | /* Are we already loading the key from disk? If not create a job */ | |
1063 | if (o->storage == REDIS_VM_SWAPPED) { | |
1064 | iojob *j; | |
1065 | vmpointer *vp = (vmpointer*)o; | |
1066 | ||
1067 | o->storage = REDIS_VM_LOADING; | |
1068 | j = zmalloc(sizeof(*j)); | |
1069 | j->type = REDIS_IOJOB_LOAD; | |
1070 | j->db = c->db; | |
1071 | j->id = (robj*)vp; | |
1072 | j->key = key; | |
1073 | incrRefCount(key); | |
1074 | j->page = vp->page; | |
1075 | j->val = NULL; | |
1076 | j->canceled = 0; | |
1077 | j->thread = (pthread_t) -1; | |
1078 | lockThreadedIO(); | |
1079 | queueIOJob(j); | |
1080 | unlockThreadedIO(); | |
1081 | } | |
1082 | return 1; | |
1083 | } | |
1084 | ||
1085 | /* Preload keys for any command with first, last and step values for | |
1086 | * the command keys prototype, as defined in the command table. */ | |
1087 | void waitForMultipleSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) { | |
1088 | int j, last; | |
1089 | if (cmd->vm_firstkey == 0) return; | |
1090 | last = cmd->vm_lastkey; | |
1091 | if (last < 0) last = argc+last; | |
1092 | for (j = cmd->vm_firstkey; j <= last; j += cmd->vm_keystep) { | |
1093 | redisAssert(j < argc); | |
1094 | waitForSwappedKey(c,argv[j]); | |
1095 | } | |
1096 | } | |
1097 | ||
1098 | /* Preload keys needed for the ZUNIONSTORE and ZINTERSTORE commands. | |
1099 | * Note that the number of keys to preload is user-defined, so we need to | |
1100 | * apply a sanity check against argc. */ | |
1101 | void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) { | |
1102 | int i, num; | |
1103 | REDIS_NOTUSED(cmd); | |
1104 | ||
1105 | num = atoi(argv[2]->ptr); | |
1106 | if (num > (argc-3)) return; | |
1107 | for (i = 0; i < num; i++) { | |
1108 | waitForSwappedKey(c,argv[3+i]); | |
1109 | } | |
1110 | } | |
1111 | ||
1112 | /* Preload keys needed to execute the entire MULTI/EXEC block. | |
1113 | * | |
1114 | * This function is called by blockClientOnSwappedKeys when EXEC is issued, | |
1115 | * and will block the client when any command requires a swapped out value. */ | |
1116 | void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) { | |
1117 | int i, margc; | |
1118 | struct redisCommand *mcmd; | |
1119 | robj **margv; | |
1120 | REDIS_NOTUSED(cmd); | |
1121 | REDIS_NOTUSED(argc); | |
1122 | REDIS_NOTUSED(argv); | |
1123 | ||
1124 | if (!(c->flags & REDIS_MULTI)) return; | |
1125 | for (i = 0; i < c->mstate.count; i++) { | |
1126 | mcmd = c->mstate.commands[i].cmd; | |
1127 | margc = c->mstate.commands[i].argc; | |
1128 | margv = c->mstate.commands[i].argv; | |
1129 | ||
1130 | if (mcmd->vm_preload_proc != NULL) { | |
1131 | mcmd->vm_preload_proc(c,mcmd,margc,margv); | |
1132 | } else { | |
1133 | waitForMultipleSwappedKeys(c,mcmd,margc,margv); | |
1134 | } | |
1135 | } | |
1136 | } | |
1137 | ||
1138 | /* Is this client attempting to run a command against swapped keys? | |
1139 | * If so, block it ASAP, load the keys in background, then resume it. | |
1140 | * | |
1141 | * The important idea about this function is that it can fail! If keys will | |
1142 | * still be swapped when the client is resumed, this key lookups will | |
1143 | * just block loading keys from disk. In practical terms this should only | |
1144 | * happen with SORT BY command or if there is a bug in this function. | |
1145 | * | |
1146 | * Return 1 if the client is marked as blocked, 0 if the client can | |
1147 | * continue as the keys it is going to access appear to be in memory. */ | |
1148 | int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd) { | |
1149 | if (cmd->vm_preload_proc != NULL) { | |
1150 | cmd->vm_preload_proc(c,cmd,c->argc,c->argv); | |
1151 | } else { | |
1152 | waitForMultipleSwappedKeys(c,cmd,c->argc,c->argv); | |
1153 | } | |
1154 | ||
1155 | /* If the client was blocked for at least one key, mark it as blocked. */ | |
1156 | if (listLength(c->io_keys)) { | |
1157 | c->flags |= REDIS_IO_WAIT; | |
1158 | aeDeleteFileEvent(server.el,c->fd,AE_READABLE); | |
1159 | server.vm_blocked_clients++; | |
1160 | return 1; | |
1161 | } else { | |
1162 | return 0; | |
1163 | } | |
1164 | } | |
1165 | ||
1166 | /* Remove the 'key' from the list of blocked keys for a given client. | |
1167 | * | |
1168 | * The function returns 1 when there are no longer blocking keys after | |
1169 | * the current one was removed (and the client can be unblocked). */ | |
1170 | int dontWaitForSwappedKey(redisClient *c, robj *key) { | |
1171 | list *l; | |
1172 | listNode *ln; | |
1173 | listIter li; | |
1174 | struct dictEntry *de; | |
1175 | ||
c8a10631 PN |
1176 | /* The key object might be destroyed when deleted from the c->io_keys |
1177 | * list (and the "key" argument is physically the same object as the | |
1178 | * object inside the list), so we need to protect it. */ | |
1179 | incrRefCount(key); | |
1180 | ||
e2641e09 | 1181 | /* Remove the key from the list of keys this client is waiting for. */ |
1182 | listRewind(c->io_keys,&li); | |
1183 | while ((ln = listNext(&li)) != NULL) { | |
1184 | if (equalStringObjects(ln->value,key)) { | |
1185 | listDelNode(c->io_keys,ln); | |
1186 | break; | |
1187 | } | |
1188 | } | |
1189 | redisAssert(ln != NULL); | |
1190 | ||
1191 | /* Remove the client form the key => waiting clients map. */ | |
1192 | de = dictFind(c->db->io_keys,key); | |
1193 | redisAssert(de != NULL); | |
1194 | l = dictGetEntryVal(de); | |
1195 | ln = listSearchKey(l,c); | |
1196 | redisAssert(ln != NULL); | |
1197 | listDelNode(l,ln); | |
1198 | if (listLength(l) == 0) | |
1199 | dictDelete(c->db->io_keys,key); | |
1200 | ||
c8a10631 | 1201 | decrRefCount(key); |
e2641e09 | 1202 | return listLength(c->io_keys) == 0; |
1203 | } | |
1204 | ||
1205 | /* Every time we now a key was loaded back in memory, we handle clients | |
1206 | * waiting for this key if any. */ | |
1207 | void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key) { | |
1208 | struct dictEntry *de; | |
1209 | list *l; | |
1210 | listNode *ln; | |
1211 | int len; | |
1212 | ||
1213 | de = dictFind(db->io_keys,key); | |
1214 | if (!de) return; | |
1215 | ||
1216 | l = dictGetEntryVal(de); | |
1217 | len = listLength(l); | |
1218 | /* Note: we can't use something like while(listLength(l)) as the list | |
1219 | * can be freed by the calling function when we remove the last element. */ | |
1220 | while (len--) { | |
1221 | ln = listFirst(l); | |
1222 | redisClient *c = ln->value; | |
1223 | ||
1224 | if (dontWaitForSwappedKey(c,key)) { | |
1225 | /* Put the client in the list of clients ready to go as we | |
1226 | * loaded all the keys about it. */ | |
1227 | listAddNodeTail(server.io_ready_clients,c); | |
1228 | } | |
1229 | } | |
1230 | } |