1#include <vanetza/common/annotation.hpp>
2#include <vanetza/rpc/asio_stream.hpp>
4#include <boost/asio/read.hpp>
5#include <boost/asio/write.hpp>
// Constructor: takes the TCP socket by value and moves it into the member
// socket_, so this object holds the socket handed in by the caller.
// NOTE(review): the constructor's body braces are not visible in this view.
13AsioStream::AsioStream(boost::asio::ip::tcp::socket socket) :
14 socket_(std::move(socket))
// Shuts down the sending direction of the socket (shutdown_send); the
// receiving direction is not touched by this call.
// NOTE(review): the header of the function enclosing this statement is not
// visible in this view. It presumably is a separate member function (the
// write-shutdown hook of the stream interface) rather than part of the
// constructor -- confirm against the complete file.
20 socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_send);
23kj::Promise<
void>
AsioStream::write(
const void* buffer, size_t size)
25 auto paf = kj::newPromiseAndFulfiller<
void>();
26 boost::asio::const_buffer buf(buffer, size);
27 boost::asio::async_write(socket_, buf, [fulfiller = std::move(paf.fulfiller)](
const boost::system::error_code& ec, std::size_t bytes_transferred)
mutable {
28 mark_unused(bytes_transferred);
30 fulfiller->reject(KJ_EXCEPTION(FAILED,
"write", ec.message()));
36 return kj::mv(paf.promise);
39kj::Promise<
void>
AsioStream::write(kj::ArrayPtr<
const kj::ArrayPtr<
const kj::byte>> pieces)
41 auto paf = kj::newPromiseAndFulfiller<
void>();
42 std::vector<boost::asio::const_buffer> buffers;
43 buffers.reserve(pieces.size());
44 for (
const auto& piece : pieces) {
45 buffers.push_back(boost::asio::buffer(piece.begin(), piece.size()));
47 boost::asio::async_write(socket_, buffers, [fulfiller = std::move(paf.fulfiller)](
const boost::system::error_code& ec, std::size_t bytes_transferred)
mutable {
48 mark_unused(bytes_transferred);
50 fulfiller->reject(KJ_EXCEPTION(FAILED,
"write", ec.message()));
55 return kj::mv(paf.promise);
58kj::Promise<
void>
AsioStream::whenWriteDisconnected()
60 return kj::NEVER_DONE;
63kj::Promise<size_t>
AsioStream::tryRead(
void* buffer, size_t minBytes, size_t maxBytes)
65 auto paf = kj::newPromiseAndFulfiller<size_t>();
66 boost::asio::async_read(socket_,
67 boost::asio::buffer(buffer, maxBytes),
68 boost::asio::transfer_at_least(minBytes),
69 [fulfiller = std::move(paf.fulfiller)](
const boost::system::error_code& ec, std::size_t bytes_transferred)
mutable {
71 fulfiller->reject(KJ_EXCEPTION(FAILED,
"read", ec.message()));
73 fulfiller->fulfill(kj::mv(bytes_transferred));
76 return kj::mv(paf.promise);