alpaka
Abstraction Library for Parallel Kernel Acceleration
Loading...
Searching...
No Matches
CallbackThread.hpp
Go to the documentation of this file.
1/* Copyright 2022 Antonio Di Pilato
2 * SPDX-License-Identifier: MPL-2.0
3 */
4
5#pragma once
6
8
9#include <cassert>
10#include <condition_variable>
11#include <functional>
12#include <future>
13#include <iostream>
14#include <mutex>
15#include <queue>
16#include <thread>
17
18namespace alpaka::core
19{
21 {
22#if BOOST_COMP_CLANG
23# pragma clang diagnostic push
24# pragma clang diagnostic ignored "-Wweak-vtables"
25#endif
26 // A custom class is used because std::function<F> requires F to be copyable, and std::packaged_task provides a
27 // std::future which will keep the task alive and we cannot control the moment the future is set.
28 //! \todo with C++23 std::move_only_function should be used
29 struct Task
32#endif
33 {
34 virtual ~Task() = default;
35 virtual void run() = 0;
36 };
37
38 template<typename Function>
39 struct FunctionHolder : Task
40 {
41 Function m_func;
42
43 template<typename FunctionFwd>
44 explicit FunctionHolder(FunctionFwd&& func) : m_func{std::forward<FunctionFwd>(func)}
45 {
46 }
47
48 void run() override
49 {
50 // if m_func throws, let it propagate
51 m_func();
52 }
53 };
54
55 using TaskPackage = std::pair<std::unique_ptr<Task>, std::promise<void>>;
56
57 public:
59 {
60 {
61 std::unique_lock<std::mutex> lock{m_mutex};
62 m_stop = true;
63 m_cond.notify_one();
64 }
65
66 if(m_thread.joinable())
67 {
68 if(std::this_thread::get_id() == m_thread.get_id())
69 {
70 std::cerr << "ERROR in ~CallbackThread: thread joins itself" << std::endl;
71 std::abort();
72 }
73 m_thread.join();
74 }
75 }
76
77 //! It is guaranteed that the task is fully destroyed before the future's result is set.
78 //! @{
79 template<typename NullaryFunction>
80 auto submit(NullaryFunction&& nf) -> std::future<void>
81 {
82 using DecayedFunction = std::decay_t<NullaryFunction>;
83 static_assert(
84 std::is_void_v<std::invoke_result_t<DecayedFunction>>,
85 "Submitted function must not have any arguments and return void.");
86
87 // FunctionHolder stores a copy of the user's task, but may be constructed from an expiring value to avoid
88 // the copy. We do NOT store a reference to the users task, which could dangle if the user isn't careful.
89 auto tp = std::pair(
90 std::unique_ptr<Task>(new FunctionHolder<DecayedFunction>{std::forward<NullaryFunction>(nf)}),
91 std::promise<void>{});
92 auto f = tp.second.get_future();
93 {
94 std::unique_lock<std::mutex> lock{m_mutex};
95 m_tasks.emplace(std::move(tp));
96 if(!m_thread.joinable())
97 startWorkerThread();
98 m_cond.notify_one();
99 }
100
101 return f;
102 }
103
104 //! @}
105
106 //! @return True if queue is empty and no task is executed else false.
107 //! If only one tasks is enqueued and the task is executed the task will see the queue as not empty.
108 //! During the destruction of this single enqueued task the queue will already be accounted as empty.
109 [[nodiscard]] auto empty()
110 {
111 std::unique_lock<std::mutex> lock{m_mutex};
112 return m_tasks.empty();
113 }
114
115 private:
116 std::thread m_thread;
117 std::condition_variable m_cond;
118 std::mutex m_mutex;
119 bool m_stop{false};
120 std::queue<TaskPackage> m_tasks;
121
122 auto startWorkerThread() -> void
123 {
124 m_thread = std::thread(
125 [this]
126 {
127 while(true)
128 {
129 std::promise<void> taskPromise;
130 std::exception_ptr eptr;
131 {
132 // Task is destroyed before promise is updated but after the queue state is up to date.
133 std::unique_ptr<Task> task = nullptr;
134 {
135 std::unique_lock<std::mutex> lock{m_mutex};
136 m_cond.wait(lock, [this] { return m_stop || !m_tasks.empty(); });
137
138 if(m_stop && m_tasks.empty())
139 break;
140
141 task = std::move(m_tasks.front().first);
142 taskPromise = std::move(m_tasks.front().second);
143 }
144 assert(task);
145 try
146 {
147 task->run();
148 }
149 catch(...)
150 {
151 eptr = std::current_exception();
152 }
153 {
154 std::unique_lock<std::mutex> lock{m_mutex};
155 // Pop empty data from the queue, task and promise will be destroyed later in a
156 // well-defined order.
157 m_tasks.pop();
158 }
159 // Task will be destroyed here, the queue status is already updated.
160 }
161 // In case the executed tasks is the last task in the queue the waiting threads will see the
162 // queue as empty.
163 if(eptr)
164 taskPromise.set_exception(std::move(eptr));
165 else
166 taskPromise.set_value();
167 }
168 });
169 }
170 };
171} // namespace alpaka::core
auto submit(NullaryFunction &&nf) -> std::future< void >
It is guaranteed that the task is fully destroyed before the future's result is set.
auto clipCast(V const &val) -> T
Definition ClipCast.hpp:16