-
Notifications
You must be signed in to change notification settings - Fork 39
/
Copy pathpool_handler.hpp
123 lines (102 loc) · 4.76 KB
/
pool_handler.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/
#pragma once
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include "injector/inject.hpp"
namespace kagome {
inline bool runningInThisThread(
std::shared_ptr<boost::asio::io_context> ioc) {
return ioc->get_executor().running_in_this_thread();
}
class PoolHandler {
public:
PoolHandler(PoolHandler &&) = delete;
PoolHandler(const PoolHandler &) = delete;
PoolHandler &operator=(PoolHandler &&) = delete;
PoolHandler &operator=(const PoolHandler &) = delete;
DONT_INJECT(PoolHandler);
explicit PoolHandler(std::shared_ptr<boost::asio::io_context> io_context)
: is_active_{false}, ioc_{std::move(io_context)} {}
~PoolHandler() = default;
void start() {
started_ = true;
is_active_.store(true);
}
void stop() {
is_active_.store(false);
}
template <typename F>
void execute(F &&func) {
if (is_active_.load(std::memory_order_acquire)) {
post(*ioc_, std::forward<F>(func));
} else if (not started_) {
throw std::logic_error{"PoolHandler lost callback before start()"};
}
}
friend void post(PoolHandler &self, auto f) {
return self.execute(std::move(f));
}
bool isInCurrentThread() const {
return runningInThisThread(ioc_);
}
friend bool runningInThisThread(const PoolHandler &self) {
return self.isInCurrentThread();
}
private:
std::atomic_bool is_active_;
std::atomic_bool started_ = false;
std::shared_ptr<boost::asio::io_context> ioc_;
};
auto wrap(PoolHandler &handler, auto f) {
return [&handler, f{std::move(f)}](auto &&...a) mutable {
handler.execute(
[f{std::move(f)}, ... a{std::forward<decltype(a)>(a)}]() mutable {
f(std::forward<decltype(a)>(a)...);
});
};
}
} // namespace kagome
#define REINVOKE(ctx, func, ...) \
({ \
if (not runningInThisThread(ctx)) { \
return post(ctx, \
[weak{weak_from_this()}, \
args = std::make_tuple(__VA_ARGS__)]() mutable { \
if (auto self = weak.lock()) { \
std::apply( \
[&](auto &&...args) mutable { \
self->func(std::forward<decltype(args)>(args)...); \
}, \
std::move(args)); \
} \
}); \
} \
})
#define EXPECT_THREAD(ctx) \
if (not runningInThisThread(ctx)) throw std::logic_error { \
"expected to execute on other thread" \
}
/// Reinvokes function once depending on `template <bool kReinvoke>` argument.
/// If `true` reinvoke takes place, otherwise direct call. After reinvoke called
/// function has `false` in kReinvoke.
#define REINVOKE_ONCE(ctx, func, ...) \
({ \
if constexpr (kReinvoke) { \
return post( \
ctx, \
[weak{weak_from_this()}, \
args = std::make_tuple(__VA_ARGS__)]() mutable { \
if (auto self = weak.lock()) { \
std::apply( \
[&](auto &&...args) mutable { \
self->func<false>(std::forward<decltype(args)>(args)...); \
}, \
std::move(args)); \
} \
}); \
} \
})