1#include <vanetza/rpc/asio_stream.hpp>
3#include <boost/asio/read.hpp>
4#include <boost/asio/write.hpp>
12AsioStream::AsioStream(boost::asio::ip::tcp::socket socket) :
13 socket_(std::move(socket))
17void AsioStream::shutdownWrite()
19 socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_send);
22kj::Promise<void> AsioStream::write(
const void* buffer,
size_t size)
24 auto paf = kj::newPromiseAndFulfiller<void>();
25 boost::asio::const_buffer buf(buffer, size);
26 boost::asio::async_write(socket_, buf, [fulfiller = std::move(paf.fulfiller)](
const boost::system::error_code& ec, std::size_t bytes_transferred)
mutable {
28 fulfiller->reject(KJ_EXCEPTION(FAILED,
"write", ec.message()));
34 return kj::mv(paf.promise);
37kj::Promise<void> AsioStream::write(kj::ArrayPtr<
const kj::ArrayPtr<const kj::byte>> pieces)
39 auto paf = kj::newPromiseAndFulfiller<void>();
40 std::vector<boost::asio::const_buffer> buffers;
41 buffers.reserve(pieces.size());
42 for (
const auto& piece : pieces) {
43 buffers.push_back(boost::asio::buffer(piece.begin(), piece.size()));
45 boost::asio::async_write(socket_, buffers, [fulfiller = std::move(paf.fulfiller)](
const boost::system::error_code& ec, std::size_t bytes_transferred)
mutable {
47 fulfiller->reject(KJ_EXCEPTION(FAILED,
"write", ec.message()));
52 return kj::mv(paf.promise);
55kj::Promise<void> AsioStream::whenWriteDisconnected()
57 return kj::NEVER_DONE;
60kj::Promise<size_t> AsioStream::tryRead(
void* buffer,
size_t minBytes,
size_t maxBytes)
62 auto paf = kj::newPromiseAndFulfiller<size_t>();
63 boost::asio::async_read(socket_,
64 boost::asio::buffer(buffer, maxBytes),
65 boost::asio::transfer_at_least(minBytes),
66 [fulfiller = std::move(paf.fulfiller)](
const boost::system::error_code& ec, std::size_t bytes_transferred)
mutable {
68 fulfiller->reject(KJ_EXCEPTION(FAILED,
"read", ec.message()));
70 fulfiller->fulfill(kj::mv(bytes_transferred));
73 return kj::mv(paf.promise);