[ VIGRA Homepage | Function Index | Class Index | Namespaces | File List | Main Page ]

threadpool.hxx VIGRA

1 /************************************************************************/
2 /* */
3 /* Copyright 2014-2015 by Thorsten Beier, Philip Schill */
4 /* and Ullrich Koethe */
5 /* */
6 /* This file is part of the VIGRA computer vision library. */
7 /* The VIGRA Website is */
8 /* http://hci.iwr.uni-heidelberg.de/vigra/ */
9 /* Please direct questions, bug reports, and contributions to */
10 /* ullrich.koethe@iwr.uni-heidelberg.de or */
11 /* vigra@informatik.uni-hamburg.de */
12 /* */
13 /* Permission is hereby granted, free of charge, to any person */
14 /* obtaining a copy of this software and associated documentation */
15 /* files (the "Software"), to deal in the Software without */
16 /* restriction, including without limitation the rights to use, */
17 /* copy, modify, merge, publish, distribute, sublicense, and/or */
18 /* sell copies of the Software, and to permit persons to whom the */
19 /* Software is furnished to do so, subject to the following */
20 /* conditions: */
21 /* */
22 /* The above copyright notice and this permission notice shall be */
23 /* included in all copies or substantial portions of the */
24 /* Software. */
25 /* */
26 /* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND */
27 /* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES */
28 /* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND */
29 /* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT */
30 /* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, */
31 /* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING */
32 /* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR */
33 /* OTHER DEALINGS IN THE SOFTWARE. */
34 /* */
35 /************************************************************************/
36 #ifndef VIGRA_THREADPOOL_HXX
37 #define VIGRA_THREADPOOL_HXX
38 
39 #include <vector>
40 #include <queue>
41 #include <stdexcept>
42 #include <cmath>
43 #include "mathutil.hxx"
44 #include "counting_iterator.hxx"
45 #include "threading.hxx"
46 
47 
48 namespace vigra
49 {
50 
51 /** \addtogroup ParallelProcessing
52 */
53 
54 //@{
55 
56  /**\brief Option base class for parallel algorithms.
57 
58  <b>\#include</b> <vigra/threadpool.hxx><br>
59  Namespace: vigra
60  */
62 {
63  public:
64 
65  /** Constants for special settings.
66  */
67  enum {
68  Auto = -1, ///< Determine number of threads automatically (from <tt>threading::thread::hardware_concurrency()</tt>)
69  Nice = -2, ///< Use half as many threads as <tt>Auto</tt> would.
70  NoThreads = 0 ///< Switch off multi-threading (i.e. execute tasks sequentially)
71  };
72 
74  : numThreads_(actualNumThreads(Auto))
75  {}
76 
77  /** \brief Get desired number of threads.
78 
79  <b>Note:</b> This function may return 0, which means that multi-threading
80  shall be switched off entirely. If an algorithm receives this value,
81  it should revert to a sequential implementation. In contrast, if
82  <tt>numThread() == 1</tt>, the parallel algorithm version shall be
83  executed with a single thread.
84  */
85  int getNumThreads() const
86  {
87  return numThreads_;
88  }
89 
90  /** \brief Get desired number of threads.
91 
92  In contrast to <tt>numThread()</tt>, this will always return a value <tt>>=1</tt>.
93  */
94  int getActualNumThreads() const
95  {
96  return std::max(1,numThreads_);
97  }
98 
99  /** \brief Set the number of threads or one of the constants <tt>Auto</tt>,
100  <tt>Nice</tt> and <tt>NoThreads</tt>.
101 
102  Default: <tt>ParallelOptions::Auto</tt> (use system default)
103 
104  This setting is ignored if the preprocessor flag <tt>VIGRA_SINGLE_THREADED</tt>
105  is defined. Then, the number of threads is set to 0 and all tasks revert to
106  sequential algorithm implementations. The same can be achieved at runtime
107  by passing <tt>n = 0</tt> to this function. In contrast, passing <tt>n = 1</tt>
108  causes the parallel algorithm versions to be executed with a single thread.
109  Both possibilities are mainly useful for debugging.
110  */
112  {
113  numThreads_ = actualNumThreads(n);
114  return *this;
115  }
116 
117 
118  private:
119  // helper function to compute the actual number of threads
120  static size_t actualNumThreads(const int userNThreads)
121  {
122  #ifdef VIGRA_SINGLE_THREADED
123  return 0;
124  #else
125  return userNThreads >= 0
126  ? userNThreads
127  : userNThreads == Nice
128  ? threading::thread::hardware_concurrency() / 2
129  : threading::thread::hardware_concurrency();
130  #endif
131  }
132 
133  int numThreads_;
134 };
135 
136 /********************************************************/
137 /* */
138 /* ThreadPool */
139 /* */
140 /********************************************************/
141 
142  /**\brief Thread pool class to manage a set of parallel workers.
143 
144  <b>\#include</b> <vigra/threadpool.hxx><br>
145  Namespace: vigra
146  */
148 {
149  public:
150 
151  /** Create a thread pool from ParallelOptions. The constructor just launches
152  the desired number of workers. If the number of threads is zero,
153  no workers are started, and all tasks will be executed in synchronously
154  in the present thread.
155  */
156  ThreadPool(const ParallelOptions & options)
157  : stop(false)
158  {
159  init(options);
160  }
161 
162  /** Create a thread pool with n threads. The constructor just launches
163  the desired number of workers. If \arg n is <tt>ParallelOptions::Auto</tt>,
164  the number of threads is determined by <tt>threading::thread::hardware_concurrency()</tt>.
165  <tt>ParallelOptions::Nice</tt> will create half as many threads.
166  If <tt>n = 0</tt>, no workers are started, and all tasks will be executed
167  synchronously in the present thread. If the preprocessor flag
168  <tt>VIGRA_SINGLE_THREADED</tt> is defined, the number of threads is always set
169  to zero (i.e. synchronous execution), regardless of the value of \arg n. This
170  is useful for debugging.
171  */
172  ThreadPool(const int n)
173  : stop(false)
174  {
175  init(ParallelOptions().numThreads(n));
176  }
177 
178  /**
179  * The destructor joins all threads.
180  */
181  ~ThreadPool();
182 
183  /**
184  * Enqueue a task that will be executed by the thread pool.
185  * The task result can be obtained using the get() function of the returned future.
186  * If the task throws an exception, it will be raised on the call to get().
187  */
188  template<class F>
189  auto enqueueReturning(F&& f) -> threading::future<decltype(f(0))>;
190 
191  /**
192  * Enqueue function for tasks without return value.
193  * This is a special case of the enqueueReturning template function, but
194  * some compilers fail on <tt>std::result_of<F(int)>::type</tt> for void(int) functions.
195  */
196  template<class F>
197  threading::future<void> enqueue(F&& f) ;
198 
199  /**
200  * Block until all tasks are finished.
201  */
203  {
204  threading::unique_lock<threading::mutex> lock(queue_mutex);
205  finish_condition.wait(lock, [this](){ return tasks.empty() && (busy == 0); });
206  }
207 
208  /**
209  * Return the number of worker threads.
210  */
211  size_t nThreads() const
212  {
213  return workers.size();
214  }
215 
216 private:
217 
218  // helper function to init the thread pool
219  void init(const ParallelOptions & options);
220 
221  // need to keep track of threads so we can join them
222  std::vector<threading::thread> workers;
223 
224  // the task queue
225  std::queue<std::function<void(int)> > tasks;
226 
227  // synchronization
228  threading::mutex queue_mutex;
229  threading::condition_variable worker_condition;
230  threading::condition_variable finish_condition;
231  bool stop;
232  threading::atomic_long busy, processed;
233 };
234 
235 inline void ThreadPool::init(const ParallelOptions & options)
236 {
237  busy.store(0);
238  processed.store(0);
239 
240  const size_t actualNThreads = options.getNumThreads();
241  for(size_t ti = 0; ti<actualNThreads; ++ti)
242  {
243  workers.emplace_back(
244  [ti,this]
245  {
246  for(;;)
247  {
248  std::function<void(int)> task;
249  {
250  threading::unique_lock<threading::mutex> lock(this->queue_mutex);
251 
252  // will wait if : stop == false AND queue is empty
253  // if stop == true AND queue is empty thread function will return later
254  //
255  // so the idea of this wait, is : If where are not in the destructor
256  // (which sets stop to true, we wait here for new jobs)
257  this->worker_condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });
258  if(!this->tasks.empty())
259  {
260  ++busy;
261  task = std::move(this->tasks.front());
262  this->tasks.pop();
263  lock.unlock();
264  task(ti);
265  ++processed;
266  --busy;
267  finish_condition.notify_one();
268  }
269  else if(stop)
270  {
271  return;
272  }
273  }
274  }
275  }
276  );
277  }
278 }
279 
281 {
282  {
283  threading::unique_lock<threading::mutex> lock(queue_mutex);
284  stop = true;
285  }
286  worker_condition.notify_all();
287  for(threading::thread &worker: workers)
288  worker.join();
289 }
290 
291 template<class F>
292 inline auto
293 ThreadPool::enqueueReturning(F&& f) -> threading::future<decltype(f(0))>
294 {
295  typedef decltype(f(0)) result_type;
296  typedef threading::packaged_task<result_type(int)> PackageType;
297 
298  auto task = std::make_shared<PackageType>(f);
299  auto res = task->get_future();
300 
301  if(workers.size()>0){
302  {
303  threading::unique_lock<threading::mutex> lock(queue_mutex);
304 
305  // don't allow enqueueing after stopping the pool
306  if(stop)
307  throw std::runtime_error("enqueue on stopped ThreadPool");
308 
309  tasks.emplace(
310  [task](int tid)
311  {
312  (*task)(std::move(tid));
313  }
314  );
315  }
316  worker_condition.notify_one();
317  }
318  else{
319  (*task)(0);
320  }
321 
322  return res;
323 }
324 
325 template<class F>
326 inline threading::future<void>
328 {
329 #if defined(USE_BOOST_THREAD) && \
330  !defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
331  // Without variadic templates, boost:thread::packaged_task only
332  // supports the signature 'R()' (functions with no arguments).
333  // We bind the thread_id parameter to 0, so this parameter
334  // must NOT be used in function f (fortunately, this is the case
335  // for the blockwise versions of convolution, labeling and
336  // watersheds).
337  typedef threading::packaged_task<void()> PackageType;
338  auto task = std::make_shared<PackageType>(std::bind(f, 0));
339 #else
340  typedef threading::packaged_task<void(int)> PackageType;
341  auto task = std::make_shared<PackageType>(f);
342 #endif
343 
344  auto res = task->get_future();
345  if(workers.size()>0){
346  {
347  threading::unique_lock<threading::mutex> lock(queue_mutex);
348 
349  // don't allow enqueueing after stopping the pool
350  if(stop)
351  throw std::runtime_error("enqueue on stopped ThreadPool");
352 
353  tasks.emplace(
354  [task](int tid)
355  {
356 #if defined(USE_BOOST_THREAD) && \
357  !defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
358  (*task)();
359 #else
360  (*task)(std::move(tid));
361 #endif
362  }
363  );
364  }
365  worker_condition.notify_one();
366  }
367  else{
368 #if defined(USE_BOOST_THREAD) && \
369  !defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
370  (*task)();
371 #else
372  (*task)(0);
373 #endif
374  }
375  return res;
376 }
377 
378 /********************************************************/
379 /* */
380 /* parallel_foreach */
381 /* */
382 /********************************************************/
383 
384 // nItems must be either zero or std::distance(iter, end).
385 // NOTE: the redundancy of nItems and iter,end here is due to the fact that, for forward iterators,
386 // computing the distance from iterators is costly, and, for input iterators, we might not know in advance
387 // how many items there are (e.g., stream iterators).
388 template<class ITER, class F>
389 inline void parallel_foreach_impl(
390  ThreadPool & pool,
391  const std::ptrdiff_t nItems,
392  ITER iter,
393  ITER end,
394  F && f,
395  std::random_access_iterator_tag
396 ){
397  std::ptrdiff_t workload = std::distance(iter, end);
398  vigra_precondition(workload == nItems || nItems == 0, "parallel_foreach(): Mismatch between num items and begin/end.");
399  const float workPerThread = float(workload)/pool.nThreads();
400  const std::ptrdiff_t chunkedWorkPerThread = std::max<std::ptrdiff_t>(roundi(workPerThread/3.0), 1);
401 
402  std::vector<threading::future<void> > futures;
403  for( ;iter<end; iter+=chunkedWorkPerThread)
404  {
405  const size_t lc = std::min(workload, chunkedWorkPerThread);
406  workload-=lc;
407  futures.emplace_back(
408  pool.enqueue(
409  [&f, iter, lc]
410  (int id)
411  {
412  for(size_t i=0; i<lc; ++i)
413  f(id, iter[i]);
414  }
415  )
416  );
417  }
418  for (auto & fut : futures)
419  {
420  fut.get();
421  }
422 }
423 
424 
425 
426 // nItems must be either zero or std::distance(iter, end).
427 template<class ITER, class F>
428 inline void parallel_foreach_impl(
429  ThreadPool & pool,
430  const std::ptrdiff_t nItems,
431  ITER iter,
432  ITER end,
433  F && f,
434  std::forward_iterator_tag
435 ){
436  if (nItems == 0)
437  nItems = std::distance(iter, end);
438 
439  std::ptrdiff_t workload = nItems;
440  const float workPerThread = float(workload)/pool.nThreads();
441  const std::ptrdiff_t chunkedWorkPerThread = std::max<std::ptrdiff_t>(roundi(workPerThread/3.0), 1);
442 
443  std::vector<threading::future<void> > futures;
444  for(;;)
445  {
446  const size_t lc = std::min(chunkedWorkPerThread, workload);
447  workload -= lc;
448  futures.emplace_back(
449  pool.enqueue(
450  [&f, iter, lc]
451  (int id)
452  {
453  auto iterCopy = iter;
454  for(size_t i=0; i<lc; ++i){
455  f(id, *iterCopy);
456  ++iterCopy;
457  }
458  }
459  )
460  );
461  for (size_t i = 0; i < lc; ++i)
462  {
463  ++iter;
464  if (iter == end)
465  {
466  vigra_postcondition(workload == 0, "parallel_foreach(): Mismatch between num items and begin/end.");
467  break;
468  }
469  }
470  if(workload==0)
471  break;
472  }
473  for (auto & fut : futures)
474  fut.get();
475 }
476 
477 
478 
479 // nItems must be either zero or std::distance(iter, end).
480 template<class ITER, class F>
481 inline void parallel_foreach_impl(
482  ThreadPool & pool,
483  const std::ptrdiff_t nItems,
484  ITER iter,
485  ITER end,
486  F && f,
487  std::input_iterator_tag
488 ){
489  std::ptrdiff_t num_items = 0;
490  std::vector<threading::future<void> > futures;
491  for (; iter != end; ++iter)
492  {
493  auto item = *iter;
494  futures.emplace_back(
495  pool.enqueue(
496  [&f, &item](int id){
497  f(id, item);
498  }
499  )
500  );
501  ++num_items;
502  }
503  vigra_postcondition(num_items == nItems || nItems == 0, "parallel_foreach(): Mismatch between num items and begin/end.");
504  for (auto & fut : futures)
505  fut.get();
506 }
507 
508 // Runs foreach on a single thread.
509 // Used for API compatibility when the numbe of threads is 0.
510 template<class ITER, class F>
511 inline void parallel_foreach_single_thread(
512  ITER begin,
513  ITER end,
514  F && f,
515  const std::ptrdiff_t nItems = 0
516 ){
517  std::ptrdiff_t n = 0;
518  for (; begin != end; ++begin)
519  {
520  f(0, *begin);
521  ++n;
522  }
523  vigra_postcondition(n == nItems || nItems == 0, "parallel_foreach(): Mismatch between num items and begin/end.");
524 }
525 
526 /** \brief Apply a functor to all items in a range in parallel.
527 
528  <b> Declarations:</b>
529 
530  \code
    namespace vigra {
        // pass the desired number of threads or ParallelOptions::Auto
        // (creates an internal thread pool accordingly)
        template<class ITER, class F>
        void parallel_foreach(int64_t nThreads,
                              ITER begin, ITER end,
                              F && f,
                              const std::ptrdiff_t nItems = 0);

        // use an existing thread pool
        template<class ITER, class F>
        void parallel_foreach(ThreadPool & pool,
                              ITER begin, ITER end,
                              F && f,
                              const std::ptrdiff_t nItems = 0);

        // pass the integers from 0 ... (nItems-1) to the functor f,
        // using the given number of threads or ParallelOptions::Auto
        template<class F>
        void parallel_foreach(int64_t nThreads,
                              std::ptrdiff_t nItems,
                              F && f);

        // likewise with an existing thread pool
        template<class F>
        void parallel_foreach(ThreadPool & threadpool,
                              std::ptrdiff_t nItems,
                              F && f);
    }
560  \endcode
561 
    Create a thread pool (or use an existing one) to apply the functor \arg f
    to all items in the range <tt>[begin, end)</tt> in parallel. \arg f must
    be callable with two arguments: the thread index (an integer starting at 0,
    passed as <tt>int</tt>; parameter types such as <tt>size_t</tt> work as well)
    and a value of type <tt>T</tt>, where <tt>T</tt> is convertible from the
    iterator's <tt>reference_type</tt> (i.e. the result of <tt>*begin</tt>).
567 
    If the iterators are forward iterators (<tt>std::forward_iterator_tag</tt>), you
    can provide the optional argument <tt>nItems</tt> to avoid a
    <tt>std::distance(begin, end)</tt> call to compute the range's length.
571 
572  Parameter <tt>nThreads</tt> controls the number of threads. <tt>parallel_foreach</tt>
573  will split the work into about three times as many parallel tasks.
574  If <tt>nThreads = ParallelOptions::Auto</tt>, the number of threads is set to
575  the machine default (<tt>std::thread::hardware_concurrency()</tt>).
576 
    If <tt>nThreads = 0</tt>, the function will not use threads,
    but will call the functor sequentially. The same happens when the thread pool
    has only a single worker (e.g. <tt>nThreads = 1</tt>). Sequential execution can
    also be enforced by setting the preprocessor flag <tt>VIGRA_SINGLE_THREADED</tt>,
    ignoring the value of <tt>nThreads</tt> (useful for debugging).
581 
582  <b>Usage:</b>
583 
584  \code
585  #include <iostream>
586  #include <algorithm>
587  #include <vector>
588  #include <vigra/threadpool.hxx>
589 
590  using namespace std;
591  using namespace vigra;
592 
593  int main()
594  {
595  size_t const n_threads = 4;
596  size_t const n = 2000;
597  vector<int> input(n);
598 
599  auto iter = input.begin(),
600  end = input.end();
601 
602  // fill input with 0, 1, 2, ...
603  iota(iter, end, 0);
604 
605  // compute the sum of the elements in the input vector.
606  // (each thread computes the partial sum of the items it sees
607  // and stores the sum at the appropriate index of 'results')
608  vector<int> results(n_threads, 0);
609  parallel_foreach(n_threads, iter, end,
610  // the functor to be executed, defined as a lambda function
611  // (first argument: thread ID, second argument: result of *iter)
612  [&results](size_t thread_id, int items)
613  {
614  results[thread_id] += items;
615  }
616  );
617 
618  // collect the partial sums of all threads
619  int sum = accumulate(results.begin(), results.end(), 0);
620 
621  cout << "The sum " << sum << " should be equal to " << (n*(n-1))/2 << endl;
622  }
623  \endcode
624  */
625 doxygen_overloaded_function(template <...> void parallel_foreach)
626 
627 template<class ITER, class F>
628 inline void parallel_foreach(
629  ThreadPool & pool,
630  ITER begin,
631  ITER end,
632  F && f,
633  const std::ptrdiff_t nItems = 0)
634 {
635  if(pool.nThreads()>1)
636  {
637  parallel_foreach_impl(pool,nItems, begin, end, f,
638  typename std::iterator_traits<ITER>::iterator_category());
639  }
640  else
641  {
642  parallel_foreach_single_thread(begin, end, f, nItems);
643  }
644 }
645 
646 template<class ITER, class F>
647 inline void parallel_foreach(
648  int64_t nThreads,
649  ITER begin,
650  ITER end,
651  F && f,
652  const std::ptrdiff_t nItems = 0)
653 {
654 
655  ThreadPool pool(nThreads);
656  parallel_foreach(pool, begin, end, f, nItems);
657 }
658 
659 template<class F>
660 inline void parallel_foreach(
661  int64_t nThreads,
662  std::ptrdiff_t nItems,
663  F && f)
664 {
665  auto iter = range(nItems);
666  parallel_foreach(nThreads, iter, iter.end(), f, nItems);
667 }
668 
669 
670 template<class F>
671 inline void parallel_foreach(
672  ThreadPool & threadpool,
673  std::ptrdiff_t nItems,
674  F && f)
675 {
676  auto iter = range(nItems);
677  parallel_foreach(threadpool, iter, iter.end(), f, nItems);
678 }
679 
680 //@}
681 
682 } // namespace vigra
683 
684 #endif // VIGRA_THREADPOOL_HXX
Int32 roundi(FixedPoint16< IntBits, OverflowHandling > v)
rounding to the nearest integer.
Definition: fixedpoint.hxx:1775
int getNumThreads() const
Get desired number of threads.
Definition: threadpool.hxx:85
ParallelOptions & numThreads(const int n)
Set the number of threads or one of the constants Auto, Nice and NoThreads.
Definition: threadpool.hxx:111
Determine number of threads automatically (from threading::thread::hardware_concurrency()) ...
Definition: threadpool.hxx:68
~ThreadPool()
Definition: threadpool.hxx:280
Thread pool class to manage a set of parallel workers.
Definition: threadpool.hxx:147
auto enqueueReturning(F &&f) -> threading::future< decltype(f(0))>
Definition: threadpool.hxx:293
size_t nThreads() const
Definition: threadpool.hxx:211
void parallel_foreach(...)
Apply a functor to all items in a range in parallel.
Option base class for parallel algorithms.
Definition: threadpool.hxx:61
void waitFinished()
Definition: threadpool.hxx:202
threading::future< void > enqueue(F &&f)
Definition: threadpool.hxx:327
Use half as many threads as Auto would.
Definition: threadpool.hxx:69
Switch off multi-threading (i.e. execute tasks sequentially)
Definition: threadpool.hxx:70
ThreadPool(const int n)
Definition: threadpool.hxx:172
int getActualNumThreads() const
Get the actual number of threads (always >= 1).
Definition: threadpool.hxx:94
ThreadPool(const ParallelOptions &options)
Definition: threadpool.hxx:156

© Ullrich Köthe (ullrich.koethe@iwr.uni-heidelberg.de)
Heidelberg Collaboratory for Image Processing, University of Heidelberg, Germany

html generated using doxygen and Python
vigra 1.11.1 (Fri May 19 2017)