alpaka
Abstraction Library for Parallel Kernel Acceleration
Loading...
Searching...
No Matches
ThreadPool.hpp
Go to the documentation of this file.
1/* Copyright 2023 Benjamin Worpitz, René Widera, Jan Stephan, Bernhard Manfred Gruber, Jeffrey Kelling
2 * SPDX-License-Identifier: MPL-2.0
3 */
4
5#pragma once
6
8
9#include <atomic>
10#include <future>
11#include <mutex>
12#include <optional>
13#include <queue>
14#include <vector>
15
17{
18 //! A thread pool yielding when there is not enough work to be done.
20 {
21 using Task = std::packaged_task<void()>;
22
23 //! Creates a thread pool with a given thread count and immediately starts that many worker threads, each running threadFunc(). Throws std::invalid_argument if threadCount is zero.
24 explicit ThreadPool(std::size_t threadCount)
25 {
26 if(threadCount < 1) // a pool without workers could never execute an enqueued task
27 throw std::invalid_argument("The argument 'threadCount' has to be greater than or equal to one!");
28 m_threads.reserve(threadCount); // one allocation up front, no reallocation while threads are being started
29 for(std::size_t i = 0; i < threadCount; ++i)
30 m_threads.emplace_back([this] { threadFunc(); }); // each worker captures this pool and enters the polling loop
31 }
32
33 //! Destroys the thread pool: raises the stop flag, then joins every worker thread. NOTE(review): workers leave their loop as soon as they observe the flag, so tasks still queued at that point look like they are never run -- the original "blocking until all enqueued work is done" wording presumably relies on callers waiting on their futures first; confirm.
35 {
36 m_stop = true; // Signal that concurrent executors should not perform any new work
37 for(auto& t : m_threads)
38 {
39 if(std::this_thread::get_id() == t.get_id()) // the destructor is running on one of the pool's own worker threads
40 {
41 std::cerr << "ERROR in ThreadPool joins itself" << std::endl;
42 std::abort(); // a self-join cannot succeed, so report and terminate instead of calling join()
43 }
44 t.join();
45 }
46 }
47
48 //! Runs the given function on one of the pool's threads in First In First Out (FIFO) order.
49 //!
50 //! \param task Function object to be called on the pool. Takes an arbitrary number of arguments. Must return
51 //! void.
52 //! \param args Arguments for task. They are captured by value into the queued closure, so they cannot be moved. If
53 //! move-only parameters must be used, use a lambda and capture via move then move the lambda.
54 //! \return A future to the created task; it is obtained from the packaged task before the task is queued.
55 template<typename TFnObj, typename... TArgs>
56 auto enqueueTask(TFnObj&& task, TArgs&&... args) -> std::future<void>
57 {
58#if BOOST_COMP_MSVC
59// MSVC 14.39.33519 is throwing an error because the noexcept type deduction is not defined in original C++17
60// error C2065: 'task': undeclared identifier
61// see: https://stackoverflow.com/a/72467726
62# define ALPAKA_NOEXCEPT(...)
63#else
64# define ALPAKA_NOEXCEPT(...) noexcept(__VA_ARGS__)
65#endif
66 auto ptask
67 = Task{[=, t = std::forward<TFnObj>(task)]() ALPAKA_NOEXCEPT(noexcept(task(args...))) { t(args...); }}; // task is forwarded into t, args are copied by [=]
68#undef ALPAKA_NOEXCEPT
69
70 auto future = ptask.get_future(); // must be fetched before ptask is moved into the queue
71 { // hold the queue lock only for the push itself
72 std::lock_guard<std::mutex> lock{m_mutex};
73 m_tasks.push(std::move(ptask));
74 }
75 return future;
76 }
77
78 private:
79 void threadFunc() // worker loop run by every pool thread: repeatedly dequeue one task and execute it
80 {
81 while(!m_stop.load(std::memory_order_relaxed)) // keep polling until the stop flag has been raised
82 {
83 std::optional<Task> task; // stays empty when the queue had nothing to hand out
84 { // only the dequeue happens under the lock; the task itself runs after the lock is released
85 std::lock_guard<std::mutex> lock{m_mutex};
86 if(!m_tasks.empty())
87 {
88 task = std::move(m_tasks.front());
89 m_tasks.pop();
90 }
91 }
92 if(task)
93 (*task)();
94 else
95 std::this_thread::yield(); // queue was empty: give up the time slice, then poll again
96 }
97 }
98
99 std::vector<std::thread> m_threads; // worker threads; started in the constructor, joined in the destructor
100 std::queue<Task> m_tasks; // FIFO of pending tasks. TODO(bgruber): we could consider a lock-free queue here
101 std::mutex m_mutex; // guards every access to m_tasks
102 std::atomic<bool> m_stop = false; // set to true by the destructor; polled by threadFunc to end the worker loop
103 };
104} // namespace alpaka::core::detail
#define ALPAKA_NOEXCEPT(...)
Defines implementation details that should not be used directly by the user.
Definition Align.hpp:21
auto clipCast(V const &val) -> T
Definition ClipCast.hpp:16
A thread pool yielding when there is not enough work to be done.
std::packaged_task< void()> Task
ThreadPool(std::size_t threadCount)
Creates a thread pool with a given thread count.
~ThreadPool()
Destroys the thread pool, blocking until all enqueued work is done.
auto enqueueTask(TFnObj &&task, TArgs &&... args) -> std::future< void >
Runs the given function on one of the pool's threads in First In First Out (FIFO) order.