alpaka
Abstraction Library for Parallel Kernel Acceleration
EventGenericThreads.hpp
Go to the documentation of this file.
1 /* Copyright 2023 Axel Hübl, Benjamin Worpitz, Matthias Werner, René Widera, Jan Stephan, Bernhard Manfred Gruber
2  * SPDX-License-Identifier: MPL-2.0
3  */
4 
5 #pragma once
6 
7 #include "alpaka/core/Assert.hpp"
9 #include "alpaka/dev/Traits.hpp"
10 #include "alpaka/event/Traits.hpp"
13 #include "alpaka/wait/Traits.hpp"
14 
15 #include <condition_variable>
16 #include <future>
17 #include <mutex>
18 #include <utility>
19 #if ALPAKA_DEBUG >= ALPAKA_DEBUG_MINIMAL
20 # include <iostream>
21 #endif
22 
23 namespace alpaka
24 {
25  namespace generic::detail
26  {
27  //! The CPU device event implementation.
28  template<typename TDev>
30  : public interface::Implements<ConceptCurrentThreadWaitFor, EventGenericThreadsImpl<TDev>>
31  {
32  public:
33  EventGenericThreadsImpl(TDev dev) noexcept : m_dev(std::move(dev))
34  {
35  }
36 
39 
40  auto isReady() noexcept -> bool
41  {
43  }
44 
45  auto wait(std::size_t const& enqueueCount, std::unique_lock<std::mutex>& lk) const noexcept -> void
46  {
47  ALPAKA_ASSERT(enqueueCount <= m_enqueueCount);
48 
49  while(enqueueCount > m_LastReadyEnqueueCount)
50  {
51  auto future = m_future;
52  lk.unlock();
53  future.get();
54  lk.lock();
55  }
56  }
57 
58  TDev const m_dev; //!< The device this event is bound to.
59 
60  std::mutex mutable m_mutex; //!< The mutex used to synchronize access to the event.
61  std::shared_future<void> m_future; //!< The future signaling the event completion.
62  std::size_t m_enqueueCount = 0u; //!< The number of times this event has been enqueued.
63  std::size_t m_LastReadyEnqueueCount = 0u; //!< The time this event has been ready the last time.
64  //!< Ready means that the event was not waiting within a queue
65  //!< (not enqueued or already completed). If m_enqueueCount ==
66  //!< m_LastReadyEnqueueCount, the event is currently not enqueued
67  };
68  } // namespace generic::detail
69 
70  //! The CPU device event.
71  template<typename TDev>
72  class EventGenericThreads final
73  : public interface::Implements<ConceptCurrentThreadWaitFor, EventGenericThreads<TDev>>
74  , public interface::Implements<ConceptGetDev, EventGenericThreads<TDev>>
75  {
76  public:
77  //! \param bBusyWaiting Unused. EventGenericThreads never does busy waiting.
78  EventGenericThreads(TDev const& dev, [[maybe_unused]] bool bBusyWaiting = true)
79  : m_spEventImpl(std::make_shared<generic::detail::EventGenericThreadsImpl<TDev>>(dev))
80  {
81  }
82 
83  auto operator==(EventGenericThreads<TDev> const& rhs) const -> bool
84  {
85  return (m_spEventImpl == rhs.m_spEventImpl);
86  }
87 
88  auto operator!=(EventGenericThreads<TDev> const& rhs) const -> bool
89  {
90  return !((*this) == rhs);
91  }
92 
93  public:
94  std::shared_ptr<generic::detail::EventGenericThreadsImpl<TDev>> m_spEventImpl;
95  };
96 
97  namespace trait
98  {
99  //! The CPU device event device type trait specialization.
100  template<typename TDev>
102  {
103  using type = TDev;
104  };
105 
106  //! The CPU device event device get trait specialization.
107  template<typename TDev>
109  {
110  ALPAKA_FN_HOST static auto getDev(EventGenericThreads<TDev> const& event) -> TDev
111  {
112  return event.m_spEventImpl->m_dev;
113  }
114  };
115 
116  //! The CPU device event test trait specialization.
117  template<typename TDev>
119  {
120  //! \return If the event is not waiting within a queue (not enqueued or already handled).
121  ALPAKA_FN_HOST static auto isComplete(EventGenericThreads<TDev> const& event) -> bool
122  {
123  std::lock_guard<std::mutex> lk(event.m_spEventImpl->m_mutex);
124 
125  return event.m_spEventImpl->isReady();
126  }
127  };
128 
129  //! The CPU non-blocking device queue enqueue trait specialization.
130  template<typename TDev>
132  {
133  ALPAKA_FN_HOST static auto enqueue(
135  EventGenericThreads<TDev>& event) -> void
136  {
138 
139  // Copy the shared pointer of the event implementation.
140  // This is forwarded to the lambda that is enqueued into the queue to ensure that the event
141  // implementation is alive as long as it is enqueued.
142  auto spEventImpl = event.m_spEventImpl;
143 
144  // Setting the event state and enqueuing it has to be atomic.
145  std::lock_guard<std::mutex> lk(spEventImpl->m_mutex);
146 
147  ++spEventImpl->m_enqueueCount;
148 
149  auto const enqueueCount = spEventImpl->m_enqueueCount;
150 
151  // Enqueue a task that only resets the events flag if it is completed.
152  spEventImpl->m_future = queueImpl.m_workerThread.submit(
153  [spEventImpl, enqueueCount]() mutable
154  {
155  std::unique_lock<std::mutex> lk2(spEventImpl->m_mutex);
156 
157  // Nothing to do if it has been re-enqueued to a later position in the queue.
158  if(enqueueCount == spEventImpl->m_enqueueCount)
159  {
160  spEventImpl->m_LastReadyEnqueueCount
161  = std::max(enqueueCount, spEventImpl->m_LastReadyEnqueueCount);
162  }
163  });
164  }
165  };
166 
167  //! The CPU non-blocking device queue enqueue trait specialization.
168  template<typename TDev>
170  {
171  ALPAKA_FN_HOST static auto enqueue(
173  EventGenericThreads<TDev>& event) -> void
174  {
176 
177  alpaka::enqueue(*queue.m_spQueueImpl, event);
178  }
179  };
180 
181  //! The CPU blocking device queue enqueue trait specialization.
182  template<typename TDev>
184  {
185  ALPAKA_FN_HOST static auto enqueue(
187  EventGenericThreads<TDev>& event) -> void
188  {
190 
191  std::promise<void> promise;
192  {
193  std::lock_guard<std::mutex> lk(queueImpl.m_mutex);
194 
195  queueImpl.m_bCurrentlyExecutingTask = true;
196 
197  auto& eventImpl(*event.m_spEventImpl);
198 
199  {
200  // Setting the event state and enqueuing it has to be atomic.
201  std::lock_guard<std::mutex> evLk(eventImpl.m_mutex);
202 
203  ++eventImpl.m_enqueueCount;
204  // NOTE: Difference to non-blocking version: directly set the event state instead of enqueuing.
205  eventImpl.m_LastReadyEnqueueCount = eventImpl.m_enqueueCount;
206 
207  eventImpl.m_future = promise.get_future();
208  }
209 
210  queueImpl.m_bCurrentlyExecutingTask = false;
211  }
212  promise.set_value();
213  }
214  };
215 
216  //! The CPU blocking device queue enqueue trait specialization.
217  template<typename TDev>
219  {
220  ALPAKA_FN_HOST static auto enqueue(
222  EventGenericThreads<TDev>& event) -> void
223  {
225 
226  alpaka::enqueue(*queue.m_spQueueImpl, event);
227  }
228  };
229  } // namespace trait
230 
231  namespace trait
232  {
233  namespace generic
234  {
235  template<typename TDev>
236  ALPAKA_FN_HOST auto currentThreadWaitForDevice(TDev const& dev) -> void
237  {
238  // Get all the queues on the device at the time of invocation.
239  // All queues added afterwards are ignored.
240  auto vQueues = dev.getAllQueues();
241  // Furthermore there should not even be a chance to enqueue something between getting the queues and
242  // adding our wait events!
243  std::vector<EventGenericThreads<TDev>> vEvents;
244  for(auto&& spQueue : vQueues)
245  {
246  vEvents.emplace_back(dev);
247  spQueue->enqueue(vEvents.back());
248  }
249 
250  // Now wait for all the events.
251  for(auto&& event : vEvents)
252  {
253  wait(event);
254  }
255  }
256  } // namespace generic
257 
258  //! The CPU device event thread wait trait specialization.
259  //!
260  //! Waits until the event itself and therefore all tasks preceding it in the queue it is enqueued to have been
261  //! completed. If the event is not enqueued to a queue the method returns immediately.
262  template<typename TDev>
264  {
266  {
267  wait(*event.m_spEventImpl);
268  }
269  };
270 
271  //! The CPU device event implementation thread wait trait specialization.
272  //!
273  //! Waits until the event itself and therefore all tasks preceding it in the queue it is enqueued to have been
274  //! completed. If the event is not enqueued to a queue the method returns immediately.
275  //!
276  //! NOTE: This method is for internal usage only.
277  template<typename TDev>
279  {
282  {
283  std::unique_lock<std::mutex> lk(eventImpl.m_mutex);
284 
285  auto const enqueueCount = eventImpl.m_enqueueCount;
286  eventImpl.wait(enqueueCount, lk);
287  }
288  };
289 
290  //! The CPU non-blocking device queue event wait trait specialization.
291  template<typename TDev>
294  EventGenericThreads<TDev>>
295  {
298  EventGenericThreads<TDev> const& event) -> void
299  {
300  // Copy the shared pointer of the event implementation.
301  // This is forwarded to the lambda that is enqueued into the queue to ensure that the event
302  // implementation is alive as long as it is enqueued.
303  auto spEventImpl = event.m_spEventImpl;
304 
305  std::lock_guard<std::mutex> lk(spEventImpl->m_mutex);
306 
307  if(!spEventImpl->isReady())
308  {
309  auto oldFuture = spEventImpl->m_future;
310 
311  // Enqueue a task that waits for the given future of the event.
312  queueImpl.m_workerThread.submit([oldFuture]() { oldFuture.get(); });
313  }
314  }
315  };
316 
317  //! The CPU non-blocking device queue event wait trait specialization.
318  template<typename TDev>
320  {
323  EventGenericThreads<TDev> const& event) -> void
324  {
325  wait(*queue.m_spQueueImpl, event);
326  }
327  };
328 
329  //! The CPU blocking device queue event wait trait specialization.
330  template<typename TDev>
332  {
335  EventGenericThreads<TDev> const& event) -> void
336  {
337  // NOTE: Difference to non-blocking version: directly wait for event.
338  wait(*event.m_spEventImpl);
339  }
340  };
341 
342  //! The CPU blocking device queue event wait trait specialization.
343  template<typename TDev>
345  {
348  EventGenericThreads<TDev> const& event) -> void
349  {
350  wait(*queue.m_spQueueImpl, event);
351  }
352  };
353 
354  //! The CPU non-blocking device event wait trait specialization.
355  //!
356  //! Any future work submitted in any queue of this device will wait for event to complete before beginning
357  //! execution.
358  template<typename TDev>
359  struct WaiterWaitFor<TDev, EventGenericThreads<TDev>>
360  {
361  ALPAKA_FN_HOST static auto waiterWaitFor(TDev& dev, EventGenericThreads<TDev> const& event) -> void
362  {
363  // Get all the queues on the device at the time of invocation.
364  // All queues added afterwards are ignored.
365  auto vspQueues(dev.getAllQueues());
366 
367  // Let all the queues wait for this event.
368  // Furthermore there should not even be a chance to enqueue something between getting the queues and
369  // adding our wait events!
370  for(auto&& spQueue : vspQueues)
371  {
372  spQueue->wait(event);
373  }
374  }
375  };
376 
377  //! The CPU non-blocking device queue thread wait trait specialization.
378  //!
379  //! Blocks execution of the calling thread until the queue has finished processing all previously requested
380  //! tasks (kernels, data copies, ...)
381  template<typename TDev>
383  {
385  {
386  // Enqueuing a dummy task into the worker thread of the queue provides a future we can wait for.
387  // Previously we enqueued an event into the queue, but this does not guarantee that the queue is empty
388  // after the event is finished, because the event handling can be finished before the event task is
389  // fully removed from the queue.
390  auto f = queue.m_spQueueImpl->m_workerThread.submit([]() noexcept {});
391  f.wait();
392  }
393  };
394  } // namespace trait
395 } // namespace alpaka
#define ALPAKA_ASSERT(...)
The assert can be explicitly disabled by defining NDEBUG.
Definition: Assert.hpp:13
#define ALPAKA_DEBUG_MINIMAL_LOG_SCOPE
Definition: Debug.hpp:55
EventGenericThreads(TDev const &dev, [[maybe_unused]] bool bBusyWaiting=true)
std::shared_ptr< generic::detail::EventGenericThreadsImpl< TDev > > m_spEventImpl
auto operator==(EventGenericThreads< TDev > const &rhs) const -> bool
auto operator!=(EventGenericThreads< TDev > const &rhs) const -> bool
std::size_t m_LastReadyEnqueueCount
The time this event has been ready the last time. Ready means that the event was not waiting within a...
std::shared_future< void > m_future
The future signaling the event completion.
EventGenericThreadsImpl(EventGenericThreadsImpl< TDev > const &)=delete
std::mutex m_mutex
The mutex used to synchronize access to the event.
std::size_t m_enqueueCount
The number of times this event has been enqueued.
TDev const m_dev
The device this event is bound to.
auto wait(std::size_t const &enqueueCount, std::unique_lock< std::mutex > &lk) const noexcept -> void
auto operator=(EventGenericThreadsImpl< TDev > const &) -> EventGenericThreadsImpl< TDev > &=delete
#define ALPAKA_FN_HOST
Definition: Common.hpp:40
ALPAKA_NO_HOST_ACC_WARNING ALPAKA_FN_HOST_ACC auto max(T const &max_ctx, Tx const &x, Ty const &y)
Returns the larger of two arguments. NaNs are treated as missing data (between a NaN and a numeric va...
Definition: Traits.hpp:1263
ALPAKA_FN_HOST auto currentThreadWaitForDevice(TDev const &dev) -> void
The alpaka accelerator library.
ALPAKA_FN_HOST auto enqueue(TQueue &queue, TTask &&task) -> void
Queues the given task in the given queue.
Definition: Traits.hpp:47
ALPAKA_FN_HOST auto wait(TAwaited const &awaited) -> void
Blocks the calling thread until the given awaited action has completed.
Definition: Traits.hpp:34
Tag used in class inheritance hierarchies that describes that a specific interface (TInterface) is im...
Definition: Interface.hpp:15
static ALPAKA_FN_HOST auto currentThreadWaitFor(EventGenericThreads< TDev > const &event) -> void
static ALPAKA_FN_HOST auto currentThreadWaitFor(QueueGenericThreadsNonBlocking< TDev > const &queue) -> void
static ALPAKA_FN_HOST auto currentThreadWaitFor(alpaka::generic::detail::EventGenericThreadsImpl< TDev > const &eventImpl) -> void
The thread wait trait.
Definition: Traits.hpp:21
The device type trait.
Definition: Traits.hpp:23
static ALPAKA_FN_HOST auto enqueue(QueueGenericThreadsBlocking< TDev > &queue, EventGenericThreads< TDev > &event) -> void
static ALPAKA_FN_HOST auto enqueue(QueueGenericThreadsNonBlocking< TDev > &queue, EventGenericThreads< TDev > &event) -> void
static ALPAKA_FN_HOST auto enqueue(alpaka::generic::detail::QueueGenericThreadsBlockingImpl< TDev > &queueImpl, EventGenericThreads< TDev > &event) -> void
static ALPAKA_FN_HOST auto enqueue([[maybe_unused]] alpaka::generic::detail::QueueGenericThreadsNonBlockingImpl< TDev > &queueImpl, EventGenericThreads< TDev > &event) -> void
The queue enqueue trait.
Definition: Traits.hpp:27
static ALPAKA_FN_HOST auto getDev(EventGenericThreads< TDev > const &event) -> TDev
The device get trait.
Definition: Traits.hpp:27
static ALPAKA_FN_HOST auto isComplete(EventGenericThreads< TDev > const &event) -> bool
The event tester trait.
Definition: Traits.hpp:21
static ALPAKA_FN_HOST auto waiterWaitFor(QueueGenericThreadsBlocking< TDev > &queue, EventGenericThreads< TDev > const &event) -> void
static ALPAKA_FN_HOST auto waiterWaitFor(QueueGenericThreadsNonBlocking< TDev > &queue, EventGenericThreads< TDev > const &event) -> void
static ALPAKA_FN_HOST auto waiterWaitFor(TDev &dev, EventGenericThreads< TDev > const &event) -> void
static ALPAKA_FN_HOST auto waiterWaitFor(alpaka::generic::detail::QueueGenericThreadsBlockingImpl< TDev > &, EventGenericThreads< TDev > const &event) -> void
static ALPAKA_FN_HOST auto waiterWaitFor(alpaka::generic::detail::QueueGenericThreadsNonBlockingImpl< TDev > &queueImpl, EventGenericThreads< TDev > const &event) -> void
The waiter wait trait.
Definition: Traits.hpp:25