1#include <vanetza/access/data_request.hpp>
2#include <vanetza/access/pppp.hpp>
3#include <vanetza/net/packet_variant.hpp>
4#include <vanetza/rpc/link_layer_client.hpp>
5#include <vanetza/rpc/logger.hpp>
7#include <capnp/rpc-twoparty.h>
11#include "vanetza.capnp.h"
23LinkLayerClient::ErrorCode map_error_code(vanetza::rpc::LinkLayer::ErrorCode in)
27 case LinkLayer::ErrorCode::OK:
28 return LinkLayerClient::ErrorCode::Ok;
29 case LinkLayer::ErrorCode::INVALID_ARGUMENT:
30 return LinkLayerClient::ErrorCode::InvalidArgument;
31 case LinkLayer::ErrorCode::UNSUPPORTED:
32 return LinkLayerClient::ErrorCode::Unsupported;
33 case LinkLayer::ErrorCode::INTERNAL_ERROR:
35 return LinkLayerClient::ErrorCode::InternalError;
39class DataListener :
public vanetza::rpc::LinkLayer::DataListener::Server
42 DataListener(std::function<
void(LinkLayerClient::Indication)> callback) :
47 kj::Promise<void> onDataIndication(OnDataIndicationContext context)
override
49 auto frame = context.getParams().getFrame();
50 vanetza::ByteBuffer payload { frame.getPayload().begin(), frame.getPayload().end() };
51 LinkLayerClient::Indication indication { std::move(payload) };
52 assign(indication.source, frame.getSourceAddress());
53 assign(indication.destination, frame.getDestinationAddress());
54 if (context.getParams().hasRxParams()) {
55 if (context.getParams().getRxParams().isWlan()) {
56 indication.technology = LinkLayerClient::Technology::ITS_G5;
57 }
else if (context.getParams().getRxParams().isCv2x()) {
58 indication.technology = LinkLayerClient::Technology::LTE_V2X;
61 callback_(std::move(indication));
65 void assign(MacAddress& into,
const capnp::Data::Reader& from)
67 if (from.size() == MacAddress::length_bytes)
69 std::copy(from.begin(), from.end(), into.octets.begin());
71 else if (from.size() < MacAddress::length_bytes)
73 auto it = std::next(into.octets.begin(), MacAddress::length_bytes - from.size());
74 std::fill(into.octets.begin(), it, 0);
75 std::copy(from.begin(), from.end(), it);
79 auto it = std::next(from.begin(), from.size() - MacAddress::length_bytes);
80 std::copy(it, from.end(), into.octets.begin());
85 std::function<void(LinkLayerClient::Indication)> callback_;
88class CbrListener :
public vanetza::rpc::LinkLayer::CbrListener::Server
91 CbrListener(std::function<
void(dcc::ChannelLoad)> callback) :
96 kj::Promise<void> onCbrReport(OnCbrReportContext context)
override
98 auto cbr = context.getParams().getCbr();
99 dcc::ChannelLoad channel_load;
100 if (cbr.getSamples() > 0 && cbr.getBusy() > 0) {
101 if (cbr.getSamples() >= cbr.getBusy()) {
102 channel_load = dcc::ChannelLoad(cbr.getBusy(), cbr.getSamples());
104 channel_load = dcc::ChannelLoad(cbr.getSamples(), cbr.getSamples());
107 callback_(channel_load);
108 return kj::READY_NOW;
112 std::function<void(dcc::ChannelLoad)> callback_;
117LinkLayerClient::Indication::Indication(vanetza::ByteBuffer buffer) :
118 packet(std::move(buffer), OsiLayer::Network)
122class LinkLayerClient::Context :
public kj::TaskSet::ErrorHandler
125 Context(kj::Timer& timer, kj::AsyncIoStream& connection, Logger* logger) :
130 link_layer_(client_.bootstrap().castAs<vanetza::rpc::LinkLayer>())
134 void taskFailed(kj::Exception&& exception)
override
136 VANETZA_RPC_LOG_ERROR(logger_,
"LinkLayerClient/task", exception.getDescription().cStr());
139 void addTask(kj::Promise<void>&& promise, kj::Duration timeout)
141 task_set_.add(timer_.timeoutAfter(timeout, kj::mv(promise)));
144 Logger* logger_ =
nullptr;
146 kj::TaskSet task_set_;
147 capnp::TwoPartyClient client_;
148 vanetza::rpc::LinkLayer::Client link_layer_;
151LinkLayerClient::LinkLayerClient(kj::Timer& timer, kj::AsyncIoStream& connection, Logger* logger) :
152 context_(
std::make_unique<Context>(timer, connection, logger))
154 auto rx_data = context_->link_layer_.subscribeDataRequest();
155 rx_data.setListener(kj::heap<DataListener>(std::bind(&LinkLayerClient::do_indicate,
this, std::placeholders::_1)));
156 context_->addTask(rx_data.send().ignoreResult(), 1 * kj::SECONDS);
158 auto cbr = context_->link_layer_.subscribeCbrRequest();
159 cbr.setListener(kj::heap<CbrListener>(std::bind(&LinkLayerClient::do_report,
this, std::placeholders::_1)));
160 context_->addTask(cbr.send().ignoreResult(), 1 * kj::SECONDS);
163LinkLayerClient::~LinkLayerClient()
167void LinkLayerClient::configure(Technology technology)
169 VANETZA_RPC_LOG_DEBUG(context_->logger_,
"LinkLayerClient/configure", stringify(technology));
170 technology_ = technology;
173void LinkLayerClient::add_task(kj::Promise<void>&& promise)
175 VANETZA_RPC_LOG_DEBUG(context_->logger_,
"LinkLayerClient/task",
"add");
176 context_->task_set_.add(kj::mv(promise));
179kj::Promise<LinkLayerClient::Identity> LinkLayerClient::identify()
181 auto ident_request = context_->link_layer_.identifyRequest();
182 auto promise = ident_request.send().then(
183 [](capnp::Response<rpc::LinkLayer::IdentifyResults>&& results)
mutable -> kj::Promise<Identity> {
185 identity.id = results.getId();
186 identity.version = results.getVersion();
187 if (results.hasInfo()) {
188 identity.info = results.getInfo().cStr();
195void LinkLayerClient::request(
const access::DataRequest& request, std::unique_ptr<ChunkPacket> packet)
197 auto tx_data = context_->link_layer_.transmitDataRequest();
199 auto frame = tx_data.initFrame();
200 frame.setSourceAddress(kj::ArrayPtr<const kj::byte> { request.source_addr.octets.data(), request.source_addr.octets.size() });
201 frame.setDestinationAddress(kj::ArrayPtr<const kj::byte> { request.destination_addr.octets.data(), request.destination_addr.octets.size() });
202 auto payload_view = create_byte_view(*packet, OsiLayer::Network, OsiLayer::Application);
203 vanetza::ByteBuffer payload { payload_view.begin(), payload_view.end() };
204 frame.setPayload(kj::ArrayPtr<const kj::byte> { payload.data(), payload.size() });
206 auto tx_params = tx_data.initTxParams();
207 if (technology_ == Technology::ITS_G5) {
208 auto wlan_tx_params = tx_params.initWlan();
209 wlan_tx_params.setPriority(access::user_priority(request.access_category));
210 }
else if (technology_ == Technology::LTE_V2X) {
211 auto cv2x_tx_params = tx_params.initCv2x();
212 cv2x_tx_params.setPriority(access::pppp_from_ac(request.access_category));
215 auto promise = tx_data.send().then([
this](capnp::Response<vanetza::rpc::LinkLayer::TransmitDataResults>&& results) -> kj::Promise<void> {
216 if (results.getError() != vanetza::rpc::LinkLayer::ErrorCode::OK) {
217 VANETZA_RPC_LOG_ERROR(context_->logger_,
"LinkLayerClient/request", stringify(map_error_code(results.getError())));
219 VANETZA_RPC_LOG_DEBUG(context_->logger_,
"LinkLayerClient/request",
"ok");
221 return kj::READY_NOW;
223 context_->addTask(kj::mv(promise), 100 * kj::MILLISECONDS);
226void LinkLayerClient::do_indicate(Indication indication)
228 VANETZA_RPC_LOG_DEBUG(context_->logger_,
"LinkLayerClient/indicate", stringify(indication.technology))
229 std::lock_guard<
std::mutex> lock(callback_mutex_);
230 if (indication_callback_) {
231 indication_callback_(indication);
235void LinkLayerClient::do_report(dcc::ChannelLoad cl)
237 std::lock_guard<std::mutex> lock(callback_mutex_);
243void LinkLayerClient::indicate(IndicationCallback callback)
245 std::lock_guard<std::mutex> lock(callback_mutex_);
246 indication_callback_ = callback;
249void LinkLayerClient::report_channel_load(ChannelLoadReportCallback callback)
251 std::lock_guard<std::mutex> lock(callback_mutex_);
252 cbr_callback_ = callback;
255kj::Promise<LinkLayerClient::ErrorCode> LinkLayerClient::set_source_address(
const MacAddress& addr)
257 auto request = context_->link_layer_.setSourceAddressRequest();
258 auto msg_addr = request.initAddress(MacAddress::length_bytes);
259 std::copy(addr.octets.begin(), addr.octets.end(), msg_addr.begin());
261 using Response = capnp::Response<vanetza::rpc::LinkLayer::SetSourceAddressResults>;
262 kj::ForkedPromise<ErrorCode> forked = request.send().then([
this](Response&& response) -> kj::Promise<ErrorCode> {
263 auto result = map_error_code(response.getError());
264 VANETZA_RPC_LOG_DEBUG(context_->logger_,
"LinkLayerClient/SetSourceAddress", stringify(result));
267 context_->addTask(forked.addBranch().ignoreResult(), 500 * kj::MILLISECONDS);
268 return forked.addBranch();
// Map a LinkLayerClient::ErrorCode to a label for log output.
// The labels are string literals kept in a fixed table of four entries.
271const char* stringify(LinkLayerClient::ErrorCode ec)
273 using ErrorCode = LinkLayerClient::ErrorCode;
// Label table; entry 0 is "ok".
274 static const std::array<const char*, 4> strings = {
"ok",
"invalid argument",
"unsupported",
"internal error" };
// Compile-time guard: ErrorCode::Ok must have numeric value 0 to line up with the first table entry.
275 static_assert(
static_cast<std::size_t
>(ErrorCode::Ok) == 0,
"ErrorCode 'ok' is at index 0");
// Numeric value of the error code, compared against the table size below.
276 const auto idx =
static_cast<std::size_t
>(ec);
// NOTE(review): idx is a std::size_t, so `idx < 0` can never be true; only the upper bound check is effective.
277 if (idx < 0 || idx >= strings.size()) {
284const char* stringify(LinkLayerClient::Technology tech)
286 using Tech = LinkLayerClient::Technology;
287 if (tech == Tech::ITS_G5) {
289 }
else if (tech == Tech::LTE_V2X) {
291 }
else if (tech == Tech::Unspecified) {
292 return "unspecified";