alpaka
Abstraction Library for Parallel Kernel Acceleration
ThreadPool.hpp
Go to the documentation of this file.
1 /* Copyright 2023 Benjamin Worpitz, René Widera, Jan Stephan, Bernhard Manfred Gruber, Jeffrey Kelling
2  * SPDX-License-Identifier: MPL-2.0
3  */
4 
5 #pragma once
6 
7 #include "alpaka/core/Common.hpp"
8 
#include <atomic>
#include <cstddef>
#include <cstdlib>
#include <future>
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>
15 
namespace alpaka::core::detail
{
    //! A thread pool yielding when there is not enough work to be done.
    //!
    //! A fixed number of worker threads, started in the constructor, pop tasks from a shared queue in First In First
    //! Out (FIFO) order. Idle workers call std::this_thread::yield() instead of blocking. The pool is neither
    //! copyable nor movable, because every worker holds a pointer to it.
    struct ThreadPool final
    {
        using Task = std::packaged_task<void()>;

        //! Creates a thread pool with a given thread count.
        //!
        //! \param threadCount Number of worker threads to start. Must be at least one.
        //! \throws std::invalid_argument if threadCount is zero.
        //! \throws std::system_error if a worker thread cannot be started. In that case all workers started so far
        //! are stopped and joined before the exception propagates.
        explicit ThreadPool(std::size_t threadCount)
        {
            if(threadCount < 1)
                throw std::invalid_argument("The argument 'threadCount' has to be greater than or equal to one!");
            m_threads.reserve(threadCount);
            try
            {
                for(std::size_t i = 0; i < threadCount; ++i)
                    m_threads.emplace_back([this] { threadFunc(); });
            }
            catch(...)
            {
                // The destructor does not run for a partially constructed object, and destroying a joinable
                // std::thread calls std::terminate, so the workers started so far must be joined here.
                stopAndJoin();
                throw;
            }
        }

        ThreadPool(ThreadPool const&) = delete;
        ThreadPool(ThreadPool&&) = delete;
        auto operator=(ThreadPool const&) -> ThreadPool& = delete;
        auto operator=(ThreadPool&&) -> ThreadPool& = delete;

        //! Destroys the thread pool, blocking until all enqueued work is done.
        //!
        //! Every task enqueued before the destructor is entered is executed before the workers exit. Enqueueing
        //! concurrently with destruction is not supported. Aborts the program if invoked from one of the pool's own
        //! worker threads, since a thread cannot join itself.
        ~ThreadPool()
        {
            stopAndJoin();
        }

        //! Runs the given function on one of the pool's threads in First In First Out (FIFO) order.
        //!
        //! \param task Function object to be called on the pool. Takes an arbitrary number of arguments. Must return
        //! void.
        //! \param args Arguments for task. They are copied into the task and passed to it as const lvalues, so
        //! move-only arguments are not supported. To pass such arguments, capture them by move in a lambda and
        //! enqueue that lambda instead.
        //! \return A future that becomes ready once the task has run. Any exception thrown by the task is rethrown
        //! by get().
        template<typename TFnObj, typename... TArgs>
        auto enqueueTask(TFnObj&& task, TArgs&&... args) -> std::future<void>
        {
#if BOOST_COMP_MSVC
// MSVC 14.39.33519 is throwing an error because the noexcept type deduction is not defined in original C++17
// error C2065: 'task': undeclared identifier
// see: https://stackoverflow.com/a/72467726
#    define ALPAKA_NOEXCEPT(...)
#else
#    define ALPAKA_NOEXCEPT(...) noexcept(__VA_ARGS__)
#endif
            auto ptask
                = Task{[=, t = std::forward<TFnObj>(task)]() ALPAKA_NOEXCEPT(noexcept(task(args...))) { t(args...); }};
#undef ALPAKA_NOEXCEPT

            auto future = ptask.get_future();
            {
                std::lock_guard<std::mutex> lock{m_mutex};
                m_tasks.push(std::move(ptask));
            }
            return future;
        }

    private:
        //! Requests the workers to stop once the queue is drained, then joins every started worker.
        void stopAndJoin()
        {
            m_stop = true; // Workers finish the remaining queued tasks, then exit.
            for(auto& t : m_threads)
            {
                if(std::this_thread::get_id() == t.get_id())
                {
                    std::cerr << "ERROR in ThreadPool joins itself" << std::endl;
                    std::abort();
                }
                if(t.joinable())
                    t.join();
            }
        }

        //! Worker loop. Pops and runs tasks in FIFO order and yields when the queue is empty. Returns only after a
        //! stop was requested and the queue was subsequently observed to be empty.
        void threadFunc()
        {
            while(true)
            {
                // Sample the stop flag *before* inspecting the queue. If the flag was already set and the queue is
                // found empty afterwards, no task enqueued before the stop request can still be pending.
                bool const stopRequested = m_stop.load(std::memory_order_relaxed);
                std::optional<Task> task;
                {
                    std::lock_guard<std::mutex> lock{m_mutex};
                    if(!m_tasks.empty())
                    {
                        task = std::move(m_tasks.front());
                        m_tasks.pop();
                    }
                }
                if(task)
                    (*task)(); // packaged_task stores any exception thrown by the callable in its future
                else if(stopRequested)
                    return;
                else
                    std::this_thread::yield();
            }
        }

        std::vector<std::thread> m_threads;
        std::queue<Task> m_tasks; // TODO(bgruber): we could consider a lock-free queue here
        std::mutex m_mutex;
        std::atomic<bool> m_stop = false;
    };
} // namespace alpaka::core::detail
#define ALPAKA_NOEXCEPT(...)
Defines implementation details that should not be used directly by the user.
Definition: Align.hpp:21
A thread pool yielding when there is not enough work to be done.
Definition: ThreadPool.hpp:20
std::packaged_task< void()> Task
Definition: ThreadPool.hpp:21
ThreadPool(std::size_t threadCount)
Creates a thread pool with a given thread count.
Definition: ThreadPool.hpp:24
~ThreadPool()
Destroys the thread pool, blocking until all enqueued work is done.
Definition: ThreadPool.hpp:34
auto enqueueTask(TFnObj &&task, TArgs &&... args) -> std::future< void >
Runs the given function on one of the pool's threads in First In First Out (FIFO) order.
Definition: ThreadPool.hpp:56