36 #ifndef VIGRA_THREADPOOL_HXX
37 #define VIGRA_THREADPOOL_HXX
43 #include "mathutil.hxx"
44 #include "counting_iterator.hxx"
45 #include "threading.hxx"
74 : numThreads_(actualNumThreads(
Auto))
96 return std::max(1,numThreads_);
113 numThreads_ = actualNumThreads(n);
120 static size_t actualNumThreads(
const int userNThreads)
122 #ifdef VIGRA_SINGLE_THREADED
125 return userNThreads >= 0
127 : userNThreads ==
Nice
128 ? threading::thread::hardware_concurrency() / 2
129 : threading::thread::hardware_concurrency();
197 threading::future<void>
enqueue(F&& f) ;
204 threading::unique_lock<threading::mutex> lock(queue_mutex);
205 finish_condition.wait(lock, [
this](){
return tasks.empty() && (busy == 0); });
213 return workers.size();
222 std::vector<threading::thread> workers;
225 std::queue<std::function<void(int)> > tasks;
228 threading::mutex queue_mutex;
229 threading::condition_variable worker_condition;
230 threading::condition_variable finish_condition;
232 threading::atomic_long busy, processed;
235 inline void ThreadPool::init(
const ParallelOptions & options)
240 const size_t actualNThreads = options.getNumThreads();
241 for(
size_t ti = 0; ti<actualNThreads; ++ti)
243 workers.emplace_back(
248 std::function<void(int)> task;
250 threading::unique_lock<threading::mutex> lock(this->queue_mutex);
257 this->worker_condition.wait(lock, [
this]{
return this->stop || !this->tasks.empty(); });
258 if(!this->tasks.empty())
261 task = std::move(this->tasks.front());
267 finish_condition.notify_one();
283 threading::unique_lock<threading::mutex> lock(queue_mutex);
286 worker_condition.notify_all();
287 for(threading::thread &worker: workers)
295 typedef decltype(f(0)) result_type;
296 typedef threading::packaged_task<result_type(int)> PackageType;
298 auto task = std::make_shared<PackageType>(f);
299 auto res = task->get_future();
301 if(workers.size()>0){
303 threading::unique_lock<threading::mutex> lock(queue_mutex);
307 throw std::runtime_error(
"enqueue on stopped ThreadPool");
312 (*task)(std::move(tid));
316 worker_condition.notify_one();
326 inline threading::future<void>
329 #if defined(USE_BOOST_THREAD) && \
330 !defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
337 typedef threading::packaged_task<void()> PackageType;
338 auto task = std::make_shared<PackageType>(std::bind(f, 0));
340 typedef threading::packaged_task<void(int)> PackageType;
341 auto task = std::make_shared<PackageType>(f);
344 auto res = task->get_future();
345 if(workers.size()>0){
347 threading::unique_lock<threading::mutex> lock(queue_mutex);
351 throw std::runtime_error(
"enqueue on stopped ThreadPool");
356 #if defined(USE_BOOST_THREAD) && \
357 !defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
360 (*task)(std::move(tid));
365 worker_condition.notify_one();
368 #if defined(USE_BOOST_THREAD) && \
369 !defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
388 template<
class ITER,
class F>
389 inline void parallel_foreach_impl(
391 const std::ptrdiff_t nItems,
395 std::random_access_iterator_tag
397 std::ptrdiff_t workload = std::distance(iter, end);
398 vigra_precondition(workload == nItems || nItems == 0,
"parallel_foreach(): Mismatch between num items and begin/end.");
399 const float workPerThread = float(workload)/pool.
nThreads();
400 const std::ptrdiff_t chunkedWorkPerThread = std::max<std::ptrdiff_t>(
roundi(workPerThread/3.0), 1);
402 std::vector<threading::future<void> > futures;
403 for( ;iter<end; iter+=chunkedWorkPerThread)
405 const size_t lc = std::min(workload, chunkedWorkPerThread);
407 futures.emplace_back(
412 for(
size_t i=0; i<lc; ++i)
418 for (
auto & fut : futures)
427 template<
class ITER,
class F>
428 inline void parallel_foreach_impl(
430 const std::ptrdiff_t nItems,
434 std::forward_iterator_tag
437 nItems = std::distance(iter, end);
439 std::ptrdiff_t workload = nItems;
440 const float workPerThread = float(workload)/pool.nThreads();
441 const std::ptrdiff_t chunkedWorkPerThread = std::max<std::ptrdiff_t>(
roundi(workPerThread/3.0), 1);
443 std::vector<threading::future<void> > futures;
446 const size_t lc = std::min(chunkedWorkPerThread, workload);
448 futures.emplace_back(
453 auto iterCopy = iter;
454 for(
size_t i=0; i<lc; ++i){
461 for (
size_t i = 0; i < lc; ++i)
466 vigra_postcondition(workload == 0,
"parallel_foreach(): Mismatch between num items and begin/end.");
473 for (
auto & fut : futures)
480 template<
class ITER,
class F>
481 inline void parallel_foreach_impl(
483 const std::ptrdiff_t nItems,
487 std::input_iterator_tag
489 std::ptrdiff_t num_items = 0;
490 std::vector<threading::future<void> > futures;
491 for (; iter != end; ++iter)
494 futures.emplace_back(
503 vigra_postcondition(num_items == nItems || nItems == 0,
"parallel_foreach(): Mismatch between num items and begin/end.");
504 for (
auto & fut : futures)
510 template<
class ITER,
class F>
511 inline void parallel_foreach_single_thread(
515 const std::ptrdiff_t nItems = 0
517 std::ptrdiff_t n = 0;
518 for (; begin != end; ++begin)
523 vigra_postcondition(n == nItems || nItems == 0,
"parallel_foreach(): Mismatch between num items and begin/end.");
625 doxygen_overloaded_function(template <...>
void parallel_foreach)
627 template<
class ITER,
class F>
633 const std::ptrdiff_t nItems = 0)
635 if(pool.nThreads()>1)
637 parallel_foreach_impl(pool,nItems, begin, end, f,
638 typename std::iterator_traits<ITER>::iterator_category());
642 parallel_foreach_single_thread(begin, end, f, nItems);
646 template<
class ITER,
class F>
652 const std::ptrdiff_t nItems = 0)
655 ThreadPool pool(nThreads);
662 std::ptrdiff_t nItems,
665 auto iter = range(nItems);
672 ThreadPool & threadpool,
673 std::ptrdiff_t nItems,
676 auto iter = range(nItems);
684 #endif // VIGRA_THREADPOOL_HXX
Int32 roundi(FixedPoint16< IntBits, OverflowHandling > v)
Round to the nearest integer.
Definition: fixedpoint.hxx:1775
int getNumThreads() const
Get desired number of threads.
Definition: threadpool.hxx:85
ParallelOptions & numThreads(const int n)
Set the number of threads or one of the constants Auto, Nice and NoThreads.
Definition: threadpool.hxx:111
Determine number of threads automatically (from threading::thread::hardware_concurrency()) ...
Definition: threadpool.hxx:68
~ThreadPool()
Definition: threadpool.hxx:280
Thread pool class to manage a set of parallel workers.
Definition: threadpool.hxx:147
auto enqueueReturning(F &&f) -> threading::future< decltype(f(0))>
Definition: threadpool.hxx:293
size_t nThreads() const
Definition: threadpool.hxx:211
void parallel_foreach(...)
Apply a functor to all items in a range in parallel.
Option base class for parallel algorithms.
Definition: threadpool.hxx:61
void waitFinished()
Definition: threadpool.hxx:202
threading::future< void > enqueue(F &&f)
Definition: threadpool.hxx:327
Use half as many threads as Auto would.
Definition: threadpool.hxx:69
Switch off multi-threading (i.e. execute tasks sequentially)
Definition: threadpool.hxx:70
ThreadPool(const int n)
Definition: threadpool.hxx:172
int getActualNumThreads() const
Get the actual number of threads; unlike getNumThreads(), this always returns a value of at least 1.
Definition: threadpool.hxx:94
ThreadPool(const ParallelOptions &options)
Definition: threadpool.hxx:156