Vanetza
Loading...
Searching...
No Matches
link_layer_client.cpp
1#include <vanetza/access/data_request.hpp>
2#include <vanetza/access/pppp.hpp>
3#include <vanetza/net/packet_variant.hpp>
4#include <vanetza/rpc/link_layer_client.hpp>
5#include <vanetza/rpc/logger.hpp>
6
7#include <capnp/rpc-twoparty.h>
8#include <capnp/rpc.h>
9#include <kj/async.h>
10#include <kj/time.h>
11#include "vanetza.capnp.h"
12
13#include <array>
14
15namespace vanetza
16{
17namespace rpc
18{
19
20namespace
21{
22
23LinkLayerClient::ErrorCode map_error_code(vanetza::rpc::LinkLayer::ErrorCode in)
24{
25 switch (in)
26 {
27 case LinkLayer::ErrorCode::OK:
28 return LinkLayerClient::ErrorCode::Ok;
29 case LinkLayer::ErrorCode::INVALID_ARGUMENT:
30 return LinkLayerClient::ErrorCode::InvalidArgument;
31 case LinkLayer::ErrorCode::UNSUPPORTED:
32 return LinkLayerClient::ErrorCode::Unsupported;
33 case LinkLayer::ErrorCode::INTERNAL_ERROR:
34 default:
35 return LinkLayerClient::ErrorCode::InternalError;
36 };
37}
38
39class DataListener : public vanetza::rpc::LinkLayer::DataListener::Server
40{
41public:
42 DataListener(std::function<void(LinkLayerClient::Indication)> callback) :
43 callback_(callback)
44 {
45 }
46
47 kj::Promise<void> onDataIndication(OnDataIndicationContext context) override
48 {
49 auto frame = context.getParams().getFrame();
50 vanetza::ByteBuffer payload { frame.getPayload().begin(), frame.getPayload().end() };
51 LinkLayerClient::Indication indication { std::move(payload) };
52 assign(indication.source, frame.getSourceAddress());
53 assign(indication.destination, frame.getDestinationAddress());
54 if (context.getParams().hasRxParams()) {
55 if (context.getParams().getRxParams().isWlan()) {
56 indication.technology = LinkLayerClient::Technology::ITS_G5;
57 } else if (context.getParams().getRxParams().isCv2x()) {
58 indication.technology = LinkLayerClient::Technology::LTE_V2X;
59 }
60 }
61 callback_(std::move(indication));
62 return kj::READY_NOW;
63 }
64
65 void assign(MacAddress& into, const capnp::Data::Reader& from)
66 {
67 if (from.size() == MacAddress::length_bytes)
68 {
69 std::copy(from.begin(), from.end(), into.octets.begin());
70 }
71 else if (from.size() < MacAddress::length_bytes)
72 {
73 auto it = std::next(into.octets.begin(), MacAddress::length_bytes - from.size());
74 std::fill(into.octets.begin(), it, 0);
75 std::copy(from.begin(), from.end(), it);
76 }
77 else
78 {
79 auto it = std::next(from.begin(), from.size() - MacAddress::length_bytes);
80 std::copy(it, from.end(), into.octets.begin());
81 }
82 }
83
84private:
85 std::function<void(LinkLayerClient::Indication)> callback_;
86};
87
88class CbrListener : public vanetza::rpc::LinkLayer::CbrListener::Server
89{
90public:
91 CbrListener(std::function<void(dcc::ChannelLoad)> callback) :
92 callback_(callback)
93 {
94 }
95
96 kj::Promise<void> onCbrReport(OnCbrReportContext context) override
97 {
98 auto cbr = context.getParams().getCbr();
99 dcc::ChannelLoad channel_load;
100 if (cbr.getSamples() > 0 && cbr.getBusy() > 0) {
101 if (cbr.getSamples() >= cbr.getBusy()) {
102 channel_load = dcc::ChannelLoad(cbr.getBusy(), cbr.getSamples());
103 } else {
104 channel_load = dcc::ChannelLoad(cbr.getSamples(), cbr.getSamples());
105 }
106 };
107 callback_(channel_load);
108 return kj::READY_NOW;
109 }
110
111private:
112 std::function<void(dcc::ChannelLoad)> callback_;
113};
114
115} // namespace
116
117LinkLayerClient::Indication::Indication(vanetza::ByteBuffer buffer) :
118 packet(std::move(buffer), OsiLayer::Network)
119{
120}
121
122class LinkLayerClient::Context : public kj::TaskSet::ErrorHandler
123{
124public:
125 Context(kj::Timer& timer, kj::AsyncIoStream& connection, Logger* logger) :
126 logger_(logger),
127 timer_(timer),
128 task_set_(*this),
129 client_(connection),
130 link_layer_(client_.bootstrap().castAs<vanetza::rpc::LinkLayer>())
131 {
132 }
133
134 void taskFailed(kj::Exception&& exception) override
135 {
136 VANETZA_RPC_LOG_ERROR(logger_, "LinkLayerClient/task", exception.getDescription().cStr());
137 }
138
139 void addTask(kj::Promise<void>&& promise, kj::Duration timeout)
140 {
141 task_set_.add(timer_.timeoutAfter(timeout, kj::mv(promise)));
142 }
143
144 Logger* logger_ = nullptr;
145 kj::Timer& timer_;
146 kj::TaskSet task_set_;
147 capnp::TwoPartyClient client_;
148 vanetza::rpc::LinkLayer::Client link_layer_;
149};
150
151LinkLayerClient::LinkLayerClient(kj::Timer& timer, kj::AsyncIoStream& connection, Logger* logger) :
152 context_(std::make_unique<Context>(timer, connection, logger))
153{
154 auto rx_data = context_->link_layer_.subscribeDataRequest();
155 rx_data.setListener(kj::heap<DataListener>(std::bind(&LinkLayerClient::do_indicate, this, std::placeholders::_1)));
156 context_->addTask(rx_data.send().ignoreResult(), 1 * kj::SECONDS);
157
158 auto cbr = context_->link_layer_.subscribeCbrRequest();
159 cbr.setListener(kj::heap<CbrListener>(std::bind(&LinkLayerClient::do_report, this, std::placeholders::_1)));
160 context_->addTask(cbr.send().ignoreResult(), 1 * kj::SECONDS);
161}
162
163LinkLayerClient::~LinkLayerClient()
164{
165}
166
167void LinkLayerClient::configure(Technology technology)
168{
169 VANETZA_RPC_LOG_DEBUG(context_->logger_, "LinkLayerClient/configure", stringify(technology));
170 technology_ = technology;
171}
172
173void LinkLayerClient::add_task(kj::Promise<void>&& promise)
174{
175 VANETZA_RPC_LOG_DEBUG(context_->logger_, "LinkLayerClient/task", "add");
176 context_->task_set_.add(kj::mv(promise));
177}
178
179kj::Promise<LinkLayerClient::Identity> LinkLayerClient::identify()
180{
181 auto ident_request = context_->link_layer_.identifyRequest();
182 auto promise = ident_request.send().then(
183 [](capnp::Response<rpc::LinkLayer::IdentifyResults>&& results) mutable -> kj::Promise<Identity> {
184 Identity identity;
185 identity.id = results.getId();
186 identity.version = results.getVersion();
187 if (results.hasInfo()) {
188 identity.info = results.getInfo().cStr();
189 }
190 return identity;
191 });
192 return promise;
193}
194
195void LinkLayerClient::request(const access::DataRequest& request, std::unique_ptr<ChunkPacket> packet)
196{
197 auto tx_data = context_->link_layer_.transmitDataRequest();
198
199 auto frame = tx_data.initFrame();
200 frame.setSourceAddress(kj::ArrayPtr<const kj::byte> { request.source_addr.octets.data(), request.source_addr.octets.size() });
201 frame.setDestinationAddress(kj::ArrayPtr<const kj::byte> { request.destination_addr.octets.data(), request.destination_addr.octets.size() });
202 auto payload_view = create_byte_view(*packet, OsiLayer::Network, OsiLayer::Application);
203 vanetza::ByteBuffer payload { payload_view.begin(), payload_view.end() };
204 frame.setPayload(kj::ArrayPtr<const kj::byte> { payload.data(), payload.size() });
205
206 auto tx_params = tx_data.initTxParams();
207 if (technology_ == Technology::ITS_G5) {
208 auto wlan_tx_params = tx_params.initWlan();
209 wlan_tx_params.setPriority(access::user_priority(request.access_category));
210 } else if (technology_ == Technology::LTE_V2X) {
211 auto cv2x_tx_params = tx_params.initCv2x();
212 cv2x_tx_params.setPriority(access::pppp_from_ac(request.access_category));
213 }
214
215 auto promise = tx_data.send().then([this](capnp::Response<vanetza::rpc::LinkLayer::TransmitDataResults>&& results) -> kj::Promise<void> {
216 if (results.getError() != vanetza::rpc::LinkLayer::ErrorCode::OK) {
217 VANETZA_RPC_LOG_ERROR(context_->logger_, "LinkLayerClient/request", stringify(map_error_code(results.getError())));
218 } else {
219 VANETZA_RPC_LOG_DEBUG(context_->logger_, "LinkLayerClient/request", "ok");
220 }
221 return kj::READY_NOW;
222 });
223 context_->addTask(kj::mv(promise), 100 * kj::MILLISECONDS);
224}
225
226void LinkLayerClient::do_indicate(Indication indication)
227{
228 VANETZA_RPC_LOG_DEBUG(context_->logger_, "LinkLayerClient/indicate", stringify(indication.technology))
229 std::lock_guard<std::mutex> lock(callback_mutex_);
230 if (indication_callback_) {
231 indication_callback_(indication);
232 }
233}
234
235void LinkLayerClient::do_report(dcc::ChannelLoad cl)
236{
237 std::lock_guard<std::mutex> lock(callback_mutex_);
238 if (cbr_callback_) {
239 cbr_callback_(cl);
240 }
241}
242
243void LinkLayerClient::indicate(IndicationCallback callback)
244{
245 std::lock_guard<std::mutex> lock(callback_mutex_);
246 indication_callback_ = callback;
247}
248
249void LinkLayerClient::report_channel_load(ChannelLoadReportCallback callback)
250{
251 std::lock_guard<std::mutex> lock(callback_mutex_);
252 cbr_callback_ = callback;
253}
254
255kj::Promise<LinkLayerClient::ErrorCode> LinkLayerClient::set_source_address(const MacAddress& addr)
256{
257 auto request = context_->link_layer_.setSourceAddressRequest();
258 auto msg_addr = request.initAddress(MacAddress::length_bytes);
259 std::copy(addr.octets.begin(), addr.octets.end(), msg_addr.begin());
260
261 using Response = capnp::Response<vanetza::rpc::LinkLayer::SetSourceAddressResults>;
262 kj::ForkedPromise<ErrorCode> forked = request.send().then([this](Response&& response) -> kj::Promise<ErrorCode> {
263 auto result = map_error_code(response.getError());
264 VANETZA_RPC_LOG_DEBUG(context_->logger_, "LinkLayerClient/SetSourceAddress", stringify(result));
265 return result;
266 }).fork();
267 context_->addTask(forked.addBranch().ignoreResult(), 500 * kj::MILLISECONDS);
268 return forked.addBranch();
269}
270
271const char* stringify(LinkLayerClient::ErrorCode ec)
272{
273 using ErrorCode = LinkLayerClient::ErrorCode;
274 static const std::array<const char*, 4> strings = { "ok", "invalid argument", "unsupported", "internal error" };
275 static_assert(static_cast<std::size_t>(ErrorCode::Ok) == 0, "ErrorCode 'ok' is at index 0");
276 const auto idx = static_cast<std::size_t>(ec);
277 if (idx < 0 || idx >= strings.size()) {
278 return "unknown";
279 } else {
280 return strings[idx];
281 }
282}
283
284const char* stringify(LinkLayerClient::Technology tech)
285{
286 using Tech = LinkLayerClient::Technology;
287 if (tech == Tech::ITS_G5) {
288 return "ITS-G5";
289 } else if (tech == Tech::LTE_V2X) {
290 return "LTE-V2X";
291 } else if (tech == Tech::Unspecified) {
292 return "unspecified";
293 } else {
294 return "unknown";
295 }
296}
297
298} // namespace rpc
299} // namespace vanetza
STL namespace.