alpaka
Abstraction Library for Parallel Kernel Acceleration
CallbackThread.hpp
Go to the documentation of this file.
1 /* Copyright 2022 Antonio Di Pilato
2  * SPDX-License-Identifier: MPL-2.0
3  */
4 
5 #pragma once
6 
8 
9 #include <cassert>
10 #include <condition_variable>
11 #include <functional>
12 #include <future>
13 #include <iostream>
14 #include <mutex>
15 #include <queue>
16 #include <thread>
17 
18 namespace alpaka::core
19 {
21  {
22 #if BOOST_COMP_CLANG
23 # pragma clang diagnostic push
24 # pragma clang diagnostic ignored "-Wweak-vtables"
25 #endif
26  // A custom class is used because std::function<F> requires F to be copyable, and std::packaged_task provides a
27  // std::future which will keep the task alive and we cannot control the moment the future is set.
28  //! \todo with C++23 std::move_only_function should be used
29  struct Task
30 #if BOOST_COMP_CLANG
31 # pragma clang diagnostic pop
32 #endif
33  {
34  virtual ~Task() = default;
35  virtual void run() = 0;
36  };
37 
38  template<typename Function>
39  struct FunctionHolder : Task
40  {
41  Function m_func;
42 
43  template<typename FunctionFwd>
44  explicit FunctionHolder(FunctionFwd&& func) : m_func{std::forward<FunctionFwd>(func)}
45  {
46  }
47 
48  void run() override
49  {
50  // if m_func throws, let it propagate
51  m_func();
52  }
53  };
54 
55  using TaskPackage = std::pair<std::unique_ptr<Task>, std::promise<void>>;
56 
57  public:
59  {
60  {
61  std::unique_lock<std::mutex> lock{m_mutex};
62  m_stop = true;
63  m_cond.notify_one();
64  }
65 
66  if(m_thread.joinable())
67  {
68  if(std::this_thread::get_id() == m_thread.get_id())
69  {
70  std::cerr << "ERROR in ~CallbackThread: thread joins itself" << std::endl;
71  std::abort();
72  }
73  m_thread.join();
74  }
75  }
76 
77  //! It is guaranteed that the task is fully destroyed before the future's result is set.
78  //! @{
79  template<typename NullaryFunction>
80  auto submit(NullaryFunction&& nf) -> std::future<void>
81  {
82  using DecayedFunction = std::decay_t<NullaryFunction>;
83  static_assert(
84  std::is_void_v<std::invoke_result_t<DecayedFunction>>,
85  "Submitted function must not have any arguments and return void.");
86 
87  // FunctionHolder stores a copy of the user's task, but may be constructed from an expiring value to avoid
88  // the copy. We do NOT store a reference to the users task, which could dangle if the user isn't careful.
89  auto tp = std::pair(
90  std::unique_ptr<Task>(new FunctionHolder<DecayedFunction>{std::forward<NullaryFunction>(nf)}),
91  std::promise<void>{});
92  auto f = tp.second.get_future();
93  {
94  std::unique_lock<std::mutex> lock{m_mutex};
95  m_tasks.emplace(std::move(tp));
96  if(!m_thread.joinable())
97  startWorkerThread();
98  m_cond.notify_one();
99  }
100 
101  return f;
102  }
103 
104  //! @}
105 
106  //! @return True if queue is empty and no task is executed else false.
107  //! If only one tasks is enqueued and the task is executed the task will see the queue as not empty.
108  //! During the destruction of this single enqueued task the queue will already be accounted as empty.
109  [[nodiscard]] auto empty()
110  {
111  std::unique_lock<std::mutex> lock{m_mutex};
112  return m_tasks.empty();
113  }
114 
115  private:
116  std::thread m_thread;
117  std::condition_variable m_cond;
118  std::mutex m_mutex;
119  bool m_stop{false};
120  std::queue<TaskPackage> m_tasks;
121 
122  auto startWorkerThread() -> void
123  {
124  m_thread = std::thread(
125  [this]
126  {
127  while(true)
128  {
129  std::promise<void> taskPromise;
130  std::exception_ptr eptr;
131  {
132  // Task is destroyed before promise is updated but after the queue state is up to date.
133  std::unique_ptr<Task> task = nullptr;
134  {
135  std::unique_lock<std::mutex> lock{m_mutex};
136  m_cond.wait(lock, [this] { return m_stop || !m_tasks.empty(); });
137 
138  if(m_stop && m_tasks.empty())
139  break;
140 
141  task = std::move(m_tasks.front().first);
142  taskPromise = std::move(m_tasks.front().second);
143  }
144  assert(task);
145  try
146  {
147  task->run();
148  }
149  catch(...)
150  {
151  eptr = std::current_exception();
152  }
153  {
154  std::unique_lock<std::mutex> lock{m_mutex};
155  // Pop empty data from the queue, task and promise will be destroyed later in a
156  // well-defined order.
157  m_tasks.pop();
158  }
159  // Task will be destroyed here, the queue status is already updated.
160  }
161  // In case the executed tasks is the last task in the queue the waiting threads will see the
162  // queue as empty.
163  if(eptr)
164  taskPromise.set_exception(std::move(eptr));
165  else
166  taskPromise.set_value();
167  }
168  });
169  }
170  };
171 } // namespace alpaka::core
auto submit(NullaryFunction &&nf) -> std::future< void >
It is guaranteed that the task is fully destroyed before the future's result is set.