alpaka
Abstraction Library for Parallel Kernel Acceleration
Loading...
Searching...
No Matches
EventGenericThreads.hpp
Go to the documentation of this file.
1/* Copyright 2023 Axel Hübl, Benjamin Worpitz, Matthias Werner, René Widera, Jan Stephan, Bernhard Manfred Gruber
2 * SPDX-License-Identifier: MPL-2.0
3 */
4
5#pragma once
6
14
15#include <condition_variable>
16#include <future>
17#include <mutex>
18#include <utility>
19#if ALPAKA_DEBUG >= ALPAKA_DEBUG_MINIMAL
20# include <iostream>
21#endif
22
23namespace alpaka
24{
25 namespace generic::detail
26 {
27 //! The CPU device event implementation.
28 template<typename TDev>
30 : public interface::Implements<ConceptCurrentThreadWaitFor, EventGenericThreadsImpl<TDev>>
31 {
32 public:
//! Constructs the event implementation and binds it to a device.
//! \param dev The device this event is bound to. Taken by value and moved into m_dev.
33 EventGenericThreadsImpl(TDev dev) noexcept : m_dev(std::move(dev))
34 {
35 }
36
39
40 auto isReady() noexcept -> bool
41 {
43 }
44
//! Blocks the calling thread until the enqueue identified by \p enqueueCount has been marked ready,
//! i.e. until m_LastReadyEnqueueCount is no longer smaller than \p enqueueCount.
//! \param enqueueCount The enqueue count to wait for. Asserted not to exceed m_enqueueCount.
//! \param lk A lock that is released while blocking on the future and re-acquired afterwards.
//!           NOTE(review): presumably this lock must own m_mutex and be locked on entry, as the
//!           caller visible in this file does - confirm for other callers.
45 auto wait(std::size_t const& enqueueCount, std::unique_lock<std::mutex>& lk) const noexcept -> void
46 {
47 ALPAKA_ASSERT(enqueueCount <= m_enqueueCount);
48
49 while(enqueueCount > m_LastReadyEnqueueCount)
50 {
// Copy the shared future before unlocking so that get() can block without holding the lock.
51 auto future = m_future;
52 lk.unlock();
53 future.get();
54 lk.lock();
// The loop condition is re-evaluated with the lock held again.
55 }
56 }
57
58 TDev const m_dev; //!< The device this event is bound to.
59
60 std::mutex mutable m_mutex; //!< The mutex used to synchronize access to the event.
61 std::shared_future<void> m_future; //!< The future signaling the event completion.
62 std::size_t m_enqueueCount = 0u; //!< The number of times this event has been enqueued.
63 std::size_t m_LastReadyEnqueueCount = 0u; //!< The enqueue count at which this event was most recently ready.
64 //!< Ready means that the event was not waiting within a queue
65 //!< (not enqueued or already completed). If m_enqueueCount ==
66 //!< m_LastReadyEnqueueCount, the event is currently not enqueued.
67 };
68 } // namespace generic::detail
69
70 //! The CPU device event.
71 template<typename TDev>
73 : public interface::Implements<ConceptCurrentThreadWaitFor, EventGenericThreads<TDev>>
74 , public interface::Implements<ConceptGetDev, EventGenericThreads<TDev>>
75 {
76 public:
//! Creates an event whose shared implementation object is bound to \p dev.
//! \param dev The device that is passed to the newly created EventGenericThreadsImpl.
77 //! \param bBusyWaiting Unused. EventGenericThreads never does busy waiting.
78 EventGenericThreads(TDev const& dev, [[maybe_unused]] bool bBusyWaiting = true)
79 : m_spEventImpl(std::make_shared<generic::detail::EventGenericThreadsImpl<TDev>>(dev))
80 {
81 }
82
//! Compares two events for identity.
//! \param rhs The event to compare with.
//! \return true if both events refer to the same implementation object (the m_spEventImpl pointers are equal).
83 auto operator==(EventGenericThreads<TDev> const& rhs) const -> bool
84 {
85 return (m_spEventImpl == rhs.m_spEventImpl);
86 }
87
//! Negation of operator==.
//! \param rhs The event to compare with.
//! \return true if the two events do not compare equal.
88 auto operator!=(EventGenericThreads<TDev> const& rhs) const -> bool
89 {
90 return !((*this) == rhs);
91 }
92
93 public:
94 std::shared_ptr<generic::detail::EventGenericThreadsImpl<TDev>> m_spEventImpl;
95 };
96
97 namespace trait
98 {
99 //! The CPU device event device type trait specialization.
100 template<typename TDev>
102 {
103 using type = TDev;
104 };
105
106 //! The CPU device event device get trait specialization.
107 template<typename TDev>
109 {
//! \param event The event whose device is queried.
//! \return A copy of the device stored in the event implementation (m_dev).
110 ALPAKA_FN_HOST static auto getDev(EventGenericThreads<TDev> const& event) -> TDev
111 {
112 return event.m_spEventImpl->m_dev;
113 }
114 };
115
116 //! The CPU device event test trait specialization.
117 template<typename TDev>
119 {
//! \param event The event to test.
120 //! \return If the event is not waiting within a queue (not enqueued or already handled).
121 ALPAKA_FN_HOST static auto isComplete(EventGenericThreads<TDev> const& event) -> bool
122 {
// Hold the event mutex while the ready state is queried.
123 std::lock_guard<std::mutex> lk(event.m_spEventImpl->m_mutex);
124
125 return event.m_spEventImpl->isReady();
126 }
127 };
128
129 //! The CPU non-blocking device queue enqueue trait specialization.
130 template<typename TDev>
132 {
135 EventGenericThreads<TDev>& event) -> void
136 {
138
139 // Copy the shared pointer of the event implementation.
140 // This is forwarded to the lambda that is enqueued into the queue to ensure that the event
141 // implementation is alive as long as it is enqueued.
142 auto spEventImpl = event.m_spEventImpl;
143
144 // Setting the event state and enqueuing it has to be atomic.
145 std::lock_guard<std::mutex> lk(spEventImpl->m_mutex);
146
147 ++spEventImpl->m_enqueueCount;
148
149 auto const enqueueCount = spEventImpl->m_enqueueCount;
150
151 // Enqueue a task that marks the event as ready once it is executed, unless the event has been re-enqueued in the meantime.
152 spEventImpl->m_future = queueImpl.m_workerThread.submit(
153 [spEventImpl, enqueueCount]() mutable
154 {
155 std::unique_lock<std::mutex> lk2(spEventImpl->m_mutex);
156
157 // Nothing to do if it has been re-enqueued to a later position in the queue.
158 if(enqueueCount == spEventImpl->m_enqueueCount)
159 {
160 spEventImpl->m_LastReadyEnqueueCount
161 = std::max(enqueueCount, spEventImpl->m_LastReadyEnqueueCount);
162 }
163 });
164 }
165 };
166
167 //! The CPU non-blocking device queue enqueue trait specialization.
168 template<typename TDev>
170 {
173 EventGenericThreads<TDev>& event) -> void
174 {
176
177 alpaka::enqueue(*queue.m_spQueueImpl, event);
178 }
179 };
180
181 //! The CPU blocking device queue enqueue trait specialization.
182 template<typename TDev>
184 {
187 EventGenericThreads<TDev>& event) -> void
188 {
190
191 std::promise<void> promise;
192 {
193 std::lock_guard<std::mutex> lk(queueImpl.m_mutex);
194
195 queueImpl.m_bCurrentlyExecutingTask = true;
196
197 auto& eventImpl(*event.m_spEventImpl);
198
199 {
200 // Setting the event state and enqueuing it has to be atomic.
201 std::lock_guard<std::mutex> evLk(eventImpl.m_mutex);
202
203 ++eventImpl.m_enqueueCount;
204 // NOTE: Difference to non-blocking version: directly set the event state instead of enqueuing.
205 eventImpl.m_LastReadyEnqueueCount = eventImpl.m_enqueueCount;
206
207 eventImpl.m_future = promise.get_future();
208 }
209
210 queueImpl.m_bCurrentlyExecutingTask = false;
211 }
212 promise.set_value();
213 }
214 };
215
216 //! The CPU blocking device queue enqueue trait specialization.
217 template<typename TDev>
219 {
222 EventGenericThreads<TDev>& event) -> void
223 {
225
226 alpaka::enqueue(*queue.m_spQueueImpl, event);
227 }
228 };
229 } // namespace trait
230
231 namespace trait
232 {
233 namespace generic
234 {
//! Blocks the calling thread until one newly created event per queue of the device has completed.
//! \param dev The device whose queues, as returned by getAllQueues() at the time of the call, are waited for.
235 template<typename TDev>
236 ALPAKA_FN_HOST auto currentThreadWaitForDevice(TDev const& dev) -> void
237 {
238 // Get all the queues on the device at the time of invocation.
239 // All queues added afterwards are ignored.
240 auto vQueues = dev.getAllQueues();
241 // Furthermore there should not even be a chance to enqueue something between getting the queues and
242 // adding our wait events!
243 std::vector<EventGenericThreads<TDev>> vEvents;
244 for(auto&& spQueue : vQueues)
245 {
// Create one event per queue and enqueue it into that queue.
246 vEvents.emplace_back(dev);
247 spQueue->enqueue(vEvents.back());
248 }
249
250 // Now wait for all the events.
251 for(auto&& event : vEvents)
252 {
253 wait(event);
254 }
255 }
256 } // namespace generic
257
258 //! The CPU device event thread wait trait specialization.
259 //!
260 //! Waits until the event itself and therefore all tasks preceding it in the queue it is enqueued to have been
261 //! completed. If the event is not enqueued to a queue the method returns immediately.
262 template<typename TDev>
264 {
266 {
267 wait(*event.m_spEventImpl);
268 }
269 };
270
271 //! The CPU device event implementation thread wait trait specialization.
272 //!
273 //! Waits until the event itself and therefore all tasks preceding it in the queue it is enqueued to have been
274 //! completed. If the event is not enqueued to a queue the method returns immediately.
275 //!
276 //! NOTE: This method is for internal usage only.
277 template<typename TDev>
279 {
282 {
283 std::unique_lock<std::mutex> lk(eventImpl.m_mutex);
284
285 auto const enqueueCount = eventImpl.m_enqueueCount;
286 eventImpl.wait(enqueueCount, lk);
287 }
288 };
289
290 //! The CPU non-blocking device queue event wait trait specialization.
291 template<typename TDev>
295 {
298 EventGenericThreads<TDev> const& event) -> void
299 {
300 // Copy the shared pointer of the event implementation.
301 // This is forwarded to the lambda that is enqueued into the queue to ensure that the event
302 // implementation is alive as long as it is enqueued.
303 auto spEventImpl = event.m_spEventImpl;
304
305 std::lock_guard<std::mutex> lk(spEventImpl->m_mutex);
306
307 if(!spEventImpl->isReady())
308 {
309 auto oldFuture = spEventImpl->m_future;
310
311 // Enqueue a task that waits for the given future of the event.
312 queueImpl.m_workerThread.submit([oldFuture]() { oldFuture.get(); });
313 }
314 }
315 };
316
317 //! The CPU non-blocking device queue event wait trait specialization.
318 template<typename TDev>
320 {
323 EventGenericThreads<TDev> const& event) -> void
324 {
325 wait(*queue.m_spQueueImpl, event);
326 }
327 };
328
329 //! The CPU blocking device queue event wait trait specialization.
330 template<typename TDev>
332 {
335 EventGenericThreads<TDev> const& event) -> void
336 {
337 // NOTE: Difference to non-blocking version: directly wait for event.
338 wait(*event.m_spEventImpl);
339 }
340 };
341
342 //! The CPU blocking device queue event wait trait specialization.
343 template<typename TDev>
345 {
348 EventGenericThreads<TDev> const& event) -> void
349 {
350 wait(*queue.m_spQueueImpl, event);
351 }
352 };
353
354 //! The CPU non-blocking device event wait trait specialization.
355 //!
356 //! Any future work submitted in any queue of this device will wait for event to complete before beginning
357 //! execution.
358 template<typename TDev>
360 {
//! Lets every queue of the device wait for the given event.
//! \param dev The device whose queues, as returned by getAllQueues(), are made to wait.
//! \param event The event that is passed to wait() of each queue.
361 ALPAKA_FN_HOST static auto waiterWaitFor(TDev& dev, EventGenericThreads<TDev> const& event) -> void
362 {
363 // Get all the queues on the device at the time of invocation.
364 // All queues added afterwards are ignored.
365 auto vspQueues(dev.getAllQueues());
366
367 // Let all the queues wait for this event.
368 // Furthermore there should not even be a chance to enqueue something between getting the queues and
369 // adding our wait events!
370 for(auto&& spQueue : vspQueues)
371 {
372 spQueue->wait(event);
373 }
374 }
375 };
376
377 //! The CPU non-blocking device queue thread wait trait specialization.
378 //!
379 //! Blocks execution of the calling thread until the queue has finished processing all previously requested
380 //! tasks (kernels, data copies, ...)
381 template<typename TDev>
383 {
385 {
386 // Enqueuing a dummy task into the worker thread of the queue provides a future we can wait for.
387 // Previously we enqueued an event into the queue but this will not guarantee that queue is empty
388 // after the event is finished because the event handling can be finished before the event task is
389 // fully removed from the queue.
390 auto f = queue.m_spQueueImpl->m_workerThread.submit([]() noexcept {});
391 f.wait();
392 }
393 };
394 } // namespace trait
395} // namespace alpaka
#define ALPAKA_ASSERT(...)
The assert can be explicitly disabled by defining NDEBUG.
Definition Assert.hpp:13
#define ALPAKA_DEBUG_MINIMAL_LOG_SCOPE
Definition Debug.hpp:55
EventGenericThreads(TDev const &dev, bool bBusyWaiting=true)
std::shared_ptr< generic::detail::EventGenericThreadsImpl< TDev > > m_spEventImpl
auto operator==(EventGenericThreads< TDev > const &rhs) const -> bool
auto operator!=(EventGenericThreads< TDev > const &rhs) const -> bool
std::size_t m_LastReadyEnqueueCount
The time this event has been ready the last time. Ready means that the event was not waiting within a...
std::shared_future< void > m_future
The future signaling the event completion.
EventGenericThreadsImpl(EventGenericThreadsImpl< TDev > const &)=delete
std::mutex m_mutex
The mutex used to synchronize access to the event.
std::size_t m_enqueueCount
The number of times this event has been enqueued.
TDev const m_dev
The device this event is bound to.
auto wait(std::size_t const &enqueueCount, std::unique_lock< std::mutex > &lk) const noexcept -> void
auto operator=(EventGenericThreadsImpl< TDev > const &) -> EventGenericThreadsImpl< TDev > &=delete
#define ALPAKA_FN_HOST
Definition Common.hpp:40
ALPAKA_FN_HOST auto currentThreadWaitForDevice(TDev const &dev) -> void
The alpaka accelerator library.
ALPAKA_FN_HOST auto enqueue(TQueue &queue, TTask &&task) -> void
Queues the given task in the given queue.
Definition Traits.hpp:47
ALPAKA_FN_HOST auto wait(TAwaited const &awaited) -> void
Waits the thread for the completion of the given awaited action to complete.
Definition Traits.hpp:34
STL namespace.
Tag used in class inheritance hierarchies that describes that a specific interface (TInterface) is im...
Definition Interface.hpp:15
static ALPAKA_FN_HOST auto currentThreadWaitFor(EventGenericThreads< TDev > const &event) -> void
static ALPAKA_FN_HOST auto currentThreadWaitFor(QueueGenericThreadsNonBlocking< TDev > const &queue) -> void
static ALPAKA_FN_HOST auto currentThreadWaitFor(alpaka::generic::detail::EventGenericThreadsImpl< TDev > const &eventImpl) -> void
The thread wait trait.
Definition Traits.hpp:21
The device type trait.
Definition Traits.hpp:23
static ALPAKA_FN_HOST auto enqueue(QueueGenericThreadsBlocking< TDev > &queue, EventGenericThreads< TDev > &event) -> void
static ALPAKA_FN_HOST auto enqueue(QueueGenericThreadsNonBlocking< TDev > &queue, EventGenericThreads< TDev > &event) -> void
static ALPAKA_FN_HOST auto enqueue(alpaka::generic::detail::QueueGenericThreadsBlockingImpl< TDev > &queueImpl, EventGenericThreads< TDev > &event) -> void
static ALPAKA_FN_HOST auto enqueue(alpaka::generic::detail::QueueGenericThreadsNonBlockingImpl< TDev > &queueImpl, EventGenericThreads< TDev > &event) -> void
The queue enqueue trait.
Definition Traits.hpp:27
static ALPAKA_FN_HOST auto getDev(EventGenericThreads< TDev > const &event) -> TDev
The device get trait.
Definition Traits.hpp:27
static ALPAKA_FN_HOST auto isComplete(EventGenericThreads< TDev > const &event) -> bool
The event tester trait.
Definition Traits.hpp:21
static ALPAKA_FN_HOST auto waiterWaitFor(QueueGenericThreadsBlocking< TDev > &queue, EventGenericThreads< TDev > const &event) -> void
static ALPAKA_FN_HOST auto waiterWaitFor(QueueGenericThreadsNonBlocking< TDev > &queue, EventGenericThreads< TDev > const &event) -> void
static ALPAKA_FN_HOST auto waiterWaitFor(TDev &dev, EventGenericThreads< TDev > const &event) -> void
static ALPAKA_FN_HOST auto waiterWaitFor(alpaka::generic::detail::QueueGenericThreadsBlockingImpl< TDev > &, EventGenericThreads< TDev > const &event) -> void
static ALPAKA_FN_HOST auto waiterWaitFor(alpaka::generic::detail::QueueGenericThreadsNonBlockingImpl< TDev > &queueImpl, EventGenericThreads< TDev > const &event) -> void
The waiter wait trait.
Definition Traits.hpp:25