Vanetza
Loading...
Searching...
No Matches
asio_stream.cpp
1#include <vanetza/common/annotation.hpp>
2#include <vanetza/rpc/asio_stream.hpp>
3
4#include <boost/asio/read.hpp>
5#include <boost/asio/write.hpp>
6#include <kj/debug.h>
7
8namespace vanetza
9{
10namespace rpc
11{
12
13AsioStream::AsioStream(boost::asio::ip::tcp::socket socket) :
14 socket_(std::move(socket))
15{
16}
17
18void AsioStream::shutdownWrite()
19{
20 socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_send);
21}
22
23kj::Promise<void> AsioStream::write(const void* buffer, size_t size)
24{
25 auto paf = kj::newPromiseAndFulfiller<void>();
26 boost::asio::const_buffer buf(buffer, size);
27 boost::asio::async_write(socket_, buf, [fulfiller = std::move(paf.fulfiller)](const boost::system::error_code& ec, std::size_t bytes_transferred) mutable {
28 mark_unused(bytes_transferred);
29 if (ec) {
30 fulfiller->reject(KJ_EXCEPTION(FAILED, "write", ec.message()));
31 } else {
32 fulfiller->fulfill();
33 }
34 });
35
36 return kj::mv(paf.promise);
37}
38
39kj::Promise<void> AsioStream::write(kj::ArrayPtr<const kj::ArrayPtr<const kj::byte>> pieces)
40{
41 auto paf = kj::newPromiseAndFulfiller<void>();
42 std::vector<boost::asio::const_buffer> buffers;
43 buffers.reserve(pieces.size());
44 for (const auto& piece : pieces) {
45 buffers.push_back(boost::asio::buffer(piece.begin(), piece.size()));
46 }
47 boost::asio::async_write(socket_, buffers, [fulfiller = std::move(paf.fulfiller)](const boost::system::error_code& ec, std::size_t bytes_transferred) mutable {
48 mark_unused(bytes_transferred);
49 if (ec) {
50 fulfiller->reject(KJ_EXCEPTION(FAILED, "write", ec.message()));
51 } else {
52 fulfiller->fulfill();
53 }
54 });
55 return kj::mv(paf.promise);
56}
57
58kj::Promise<void> AsioStream::whenWriteDisconnected()
59{
60 return kj::NEVER_DONE;
61}
62
63kj::Promise<size_t> AsioStream::tryRead(void* buffer, size_t minBytes, size_t maxBytes)
64{
65 auto paf = kj::newPromiseAndFulfiller<size_t>();
66 boost::asio::async_read(socket_,
67 boost::asio::buffer(buffer, maxBytes),
68 boost::asio::transfer_at_least(minBytes),
69 [fulfiller = std::move(paf.fulfiller)](const boost::system::error_code& ec, std::size_t bytes_transferred) mutable {
70 if (ec) {
71 fulfiller->reject(KJ_EXCEPTION(FAILED, "read", ec.message()));
72 } else {
73 fulfiller->fulfill(kj::mv(bytes_transferred));
74 }
75 });
76 return kj::mv(paf.promise);
77}
78
79
80} // namespace rpc
81} // namespace vanetza