/*
 * Copyright (c) 2008-2009 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"
// We'd use __attribute__((aligned(x))), but it does not actually increase the
// alignment of stack variables. All we really need is for the local thread's
// stack usage to be sufficiently far away to avoid cache-line contention with
// the busy 'da_index' variable.
//
// NOTE: 'char' arrays cause GCC to insert buffer overflow detection logic
struct dispatch_apply_s {
	long		_da_pad0[DISPATCH_CACHELINE_SIZE / sizeof(long)];
	void		(*da_func)(void *, size_t);
	void		*da_ctxt;
	size_t		da_iterations;
	size_t		da_index;
	uint32_t	da_thr_cnt;
	dispatch_semaphore_t da_sema;
	long		_da_pad1[DISPATCH_CACHELINE_SIZE / sizeof(long)];
};
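// Layout note (assuming the typical DISPATCH_CACHELINE_SIZE of 64 and an
// 8-byte 'long'): each pad array is 8 longs, i.e. one full 64-byte cache
// line on either side of the hot fields, so a worker hammering 'da_index'
// cannot false-share with the caller's surrounding stack frame.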
static void
_dispatch_apply2(void *_ctxt)
{
	struct dispatch_apply_s *da = _ctxt;
	size_t const iter = da->da_iterations;
	typeof(da->da_func) const func = da->da_func;
	void *const ctxt = da->da_ctxt;
	size_t idx;

	_dispatch_workitem_dec(); // this unit executes many items

	// Striding is the responsibility of the caller.
	// Each worker atomically claims the next unclaimed index until none
	// remain; dispatch_atomic_inc() returns the new value, so subtract
	// one to get the index this worker just claimed.
	while (fastpath((idx = dispatch_atomic_inc(&da->da_index) - 1) < iter)) {
		func(ctxt, idx);
		_dispatch_workitem_inc();
	}

	// The last worker to finish wakes the thread waiting in dispatch_apply_f().
	if (dispatch_atomic_dec(&da->da_thr_cnt) == 0) {
		dispatch_semaphore_signal(da->da_sema);
	}
}
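// The loop above is the classic fetch-and-increment work-claiming pattern.
// A portable sketch of the same idea with C11 <stdatomic.h> (illustrative
// only, not used by this file; 'iter', 'func', and 'ctxt' as above):
//
//	_Atomic size_t next_index = 0;
//	size_t idx;
//	// atomic_fetch_add() returns the prior value, so unlike
//	// dispatch_atomic_inc() above, no '- 1' adjustment is needed.
//	while ((idx = atomic_fetch_add(&next_index, 1)) < iter) {
//		func(ctxt, idx);
//	}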
// Serial fallback, used when the target queue cannot actually run
// iterations concurrently or only one worker is warranted.
static void
_dispatch_apply_serial(void *context)
{
	struct dispatch_apply_s *da = context;
	size_t idx = 0;

	_dispatch_workitem_dec(); // this unit executes many items
	do {
		da->da_func(da->da_ctxt, idx);
		_dispatch_workitem_inc();
	} while (++idx < da->da_iterations);
}
#if DISPATCH_COCOA_COMPAT
DISPATCH_NOINLINE
static void
_dispatch_apply_slow(size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
{
	struct Block_basic *bb = (void *)_dispatch_Block_copy((void *)work);
	dispatch_apply_f(iterations, dq, bb, (void *)bb->Block_invoke);
	Block_release(bb); // balance the copy above
}
#endif
void
dispatch_apply(size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
{
#if DISPATCH_COCOA_COMPAT
	// Under GC, blocks transferred to other threads must be Block_copy()ed
	// rdar://problem/7455071
	if (dispatch_begin_thread_4GC) {
		return _dispatch_apply_slow(iterations, dq, work);
	}
#endif
	// Without GC, the block can be invoked through its function pointer
	// directly, with the block itself as the context argument.
	struct Block_basic *bb = (void *)work;
	dispatch_apply_f(iterations, dq, bb, (void *)bb->Block_invoke);
}
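// Usage sketch (illustrative only; 'results' and the squaring work are
// hypothetical, not part of this file). Iterations may run concurrently,
// so each one should touch only its own slice of shared state:
//
//	int *results = calloc(count, sizeof(int));
//	dispatch_apply(count, dispatch_get_global_queue(
//			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^(size_t idx) {
//		results[idx] = (int)(idx * idx); // iteration 'idx' owns slot 'idx'
//	});
//	// dispatch_apply() returns only after all iterations have completed.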
// 256 threads should be good enough for the short to mid term
#define DISPATCH_APPLY_MAX_CPUS 256
DISPATCH_NOINLINE
void
dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt,
		void (*func)(void *, size_t))
{
	struct dispatch_apply_dc_s {
		DISPATCH_CONTINUATION_HEADER(dispatch_apply_dc_s);
	} da_dc[DISPATCH_APPLY_MAX_CPUS];
	struct dispatch_apply_s da;
	size_t i;

	da.da_func = func;
	da.da_ctxt = ctxt;
	da.da_iterations = iterations;
	da.da_index = 0;
	da.da_thr_cnt = _dispatch_hw_config.cc_max_active;

	if (da.da_thr_cnt > DISPATCH_APPLY_MAX_CPUS) {
		da.da_thr_cnt = DISPATCH_APPLY_MAX_CPUS;
	}
	if (slowpath(iterations == 0)) {
		return;
	}
	if (iterations < da.da_thr_cnt) {
		da.da_thr_cnt = (uint32_t)iterations;
	}
	// Narrow queues cannot run iterations in parallel; fall back to a
	// plain serial loop on the target queue.
	if (slowpath(dq->dq_width <= 2 || da.da_thr_cnt <= 1)) {
		return dispatch_sync_f(dq, &da, _dispatch_apply_serial);
	}

	// Build one continuation per worker; each runs _dispatch_apply2()
	// against the shared dispatch_apply_s.
	for (i = 0; i < da.da_thr_cnt; i++) {
		da_dc[i].do_vtable = NULL;
		da_dc[i].do_next = &da_dc[i + 1];
		da_dc[i].dc_func = _dispatch_apply2;
		da_dc[i].dc_ctxt = &da;
	}

	da.da_sema = _dispatch_get_thread_semaphore();

	// some queues are easy to borrow and some are not
	if (slowpath(dq->do_targetq)) {
		_dispatch_queue_push_list(dq, (void *)&da_dc[0],
				(void *)&da_dc[da.da_thr_cnt - 1]);
	} else {
		dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
		// root queues are always concurrent and safe to borrow
		_dispatch_queue_push_list(dq, (void *)&da_dc[1],
				(void *)&da_dc[da.da_thr_cnt - 1]);
		_dispatch_thread_setspecific(dispatch_queue_key, dq);
		// The first da_dc[] element was explicitly not pushed on to the queue.
		// We need to either call it like so:
		//     da_dc[0].dc_func(da_dc[0].dc_ctxt);
		// Or, given that we know the 'func' and 'ctxt', we can call it directly:
		_dispatch_apply2(&da);
		_dispatch_workitem_inc();
		_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
	}

	dispatch_semaphore_wait(da.da_sema, DISPATCH_TIME_FOREVER);
	_dispatch_put_thread_semaphore(da.da_sema);
}
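// Usage sketch for the function-pointer variant (illustrative only;
// 'scale_row', 'matrix', 'nrows', and 'NCOLS' are hypothetical):
//
//	static void scale_row(void *ctxt, size_t row) {
//		double *m = ctxt;
//		for (size_t col = 0; col < NCOLS; col++) {
//			m[row * NCOLS + col] *= 2.0;
//		}
//	}
//	...
//	dispatch_apply_f(nrows, dispatch_get_global_queue(
//			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), matrix, scale_row);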
void
dispatch_stride(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void (^work)(size_t))
{
	struct Block_basic *bb = (void *)work;
	dispatch_stride_f(offset, stride, iterations, dq, bb,
			(void *)bb->Block_invoke);
}
DISPATCH_NOINLINE
void
dispatch_stride_f(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void *ctxt, void (*func)(void *, size_t))
{
	if (stride == 0) {
		stride = 1; // guard the division below
	}

	// Run the iterations in whole-stride chunks; each dispatch_apply()
	// iteration handles one contiguous run of 'stride' indices.
	dispatch_apply(iterations / stride, dq, ^(size_t idx) {
		size_t i = idx * stride + offset;
		size_t stop = i + stride;
		do {
			func(ctxt, i++);
		} while (i < stop);
	});

	// Pick up the remainder that did not fill a whole stride.
	dispatch_sync(dq, ^{
		size_t i;
		for (i = iterations - (iterations % stride); i < iterations; i++) {
			func(ctxt, i + offset);
		}
	});
}
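// Worked example (hypothetical numbers): offset=100, stride=4, iterations=10.
// dispatch_apply() runs 10 / 4 = 2 chunks covering indices 100..103 and
// 104..107; the trailing dispatch_sync() then handles the remainder,
// i = 10 - (10 % 4) = 8 and 9, i.e. indices 108 and 109.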