alpaka
Abstraction Library for Parallel Kernel Acceleration
ThreadPool.hpp
Go to the documentation of this file.
1 /* Copyright 2023 Benjamin Worpitz, René Widera, Jan Stephan, Bernhard Manfred Gruber, Jeffrey Kelling
2  * SPDX-License-Identifier: MPL-2.0
3  */
4 
5 #pragma once
6 
#include "alpaka/core/Common.hpp"

#include <atomic>
#include <cstddef>
#include <cstdlib>
#include <future>
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>
15 
16 namespace alpaka::core::detail
17 {
//! A thread pool yielding when there is not enough work to be done.
//!
//! The worker threads are started by the constructor and poll a mutex-protected FIFO queue. A worker that finds
//! the queue empty calls std::this_thread::yield() instead of blocking.
struct ThreadPool final
{
    using Task = std::packaged_task<void()>;

    //! Creates a thread pool with a given thread count
    //!
    //! \param threadCount Number of worker threads to start. Has to be at least one.
    //! \throws std::invalid_argument if threadCount is zero.
    explicit ThreadPool(std::size_t threadCount)
    {
        if(threadCount < 1)
            throw std::invalid_argument("The argument 'threadCount' has to be greater or equal to one!");
        m_threads.reserve(threadCount);
        for(std::size_t i = 0; i < threadCount; ++i)
            m_threads.emplace_back([this] { threadFunc(); });
    }

    // The workers capture `this`, so the pool must never be copied.
    ThreadPool(ThreadPool const&) = delete;
    auto operator=(ThreadPool const&) -> ThreadPool& = delete;

    //! Destroys the thread pool, blocking until all enqueued work is done.
    //!
    //! Must not be invoked from one of the pool's own worker threads: a thread cannot join itself, so this case
    //! is reported on std::cerr and the process is aborted.
    ~ThreadPool()
    {
        // Request shutdown. The workers keep draining the queue and only exit once they find it empty after
        // having observed this flag, so every task enqueued before destruction is still executed.
        m_stop = true;
        for(auto& t : m_threads)
        {
            if(std::this_thread::get_id() == t.get_id())
            {
                std::cerr << "ERROR in ThreadPool joins itself" << std::endl;
                std::abort();
            }
            t.join();
        }
    }

    //! Runs the given function on one of the pool's threads in First In First Out (FIFO) order.
    //!
    //! \param task Function object to be called on the pool. Takes an arbitrary number of arguments. Must return
    //! void.
    //! \param args Arguments for task, cannot be moved. If such parameters must be used, use a lambda and capture
    //! via move then move the lambda.
    //! \return A future to the created task.
    template<typename TFnObj, typename... TArgs>
    auto enqueueTask(TFnObj&& task, TArgs&&... args) -> std::future<void>
    {
        // The arguments are copied into the closure ([=]); the callable itself is forwarded into it.
        auto ptask = Task{[=, t = std::forward<TFnObj>(task)]() noexcept(noexcept(task(args...))) { t(args...); }};
        auto future = ptask.get_future();
        {
            // Keep the critical section as small as possible: only the push happens under the lock.
            std::lock_guard<std::mutex> lock{m_mutex};
            m_tasks.push(std::move(ptask));
        }
        return future;
    }

private:
    //! Main loop of each worker: pops and runs tasks until shutdown was requested and the queue is empty.
    void threadFunc()
    {
        for(;;)
        {
            // Read the stop flag *before* looking at the queue. If the flag is already set, every task pushed
            // before the destructor started is visible to the queue check below, so an empty queue then really
            // means that no work is left.
            bool const stopRequested = m_stop.load(std::memory_order_acquire);

            std::optional<Task> task;
            {
                std::lock_guard<std::mutex> lock{m_mutex};
                if(!m_tasks.empty())
                {
                    task = std::move(m_tasks.front());
                    m_tasks.pop();
                }
            }

            if(task)
                (*task)(); // run outside of the lock so other workers can fetch tasks concurrently
            else if(stopRequested)
                return; // shutdown requested and nothing left to do
            else
                std::this_thread::yield();
        }
    }

    std::vector<std::thread> m_threads;
    std::queue<Task> m_tasks; // TODO(bgruber): we could consider a lock-free queue here
    std::mutex m_mutex; // guards m_tasks
    std::atomic<bool> m_stop{false}; // set by the destructor to request worker shutdown
};
93 } // namespace alpaka::core::detail
Defines implementation details that should not be used directly by the user.
Definition: Align.hpp:21
A thread pool yielding when there is not enough work to be done.
Definition: ThreadPool.hpp:20
std::packaged_task< void()> Task
Definition: ThreadPool.hpp:21
ThreadPool(std::size_t threadCount)
Creates a thread pool with a given thread count.
Definition: ThreadPool.hpp:24
~ThreadPool()
Destroys the thread pool, blocking until all enqueued work is done.
Definition: ThreadPool.hpp:34
auto enqueueTask(TFnObj &&task, TArgs &&... args) -> std::future< void >
Runs the given function on one of the pool's threads in First In First Out (FIFO) order.
Definition: ThreadPool.hpp:56