Vanetza
Loading...
Searching...
No Matches
asio_stream.cpp
1#include <vanetza/rpc/asio_stream.hpp>
2
3#include <boost/asio/read.hpp>
4#include <boost/asio/write.hpp>
5#include <kj/debug.h>
6
7namespace vanetza
8{
9namespace rpc
10{
11
12AsioStream::AsioStream(boost::asio::ip::tcp::socket socket) :
13 socket_(std::move(socket))
14{
15}
16
17void AsioStream::shutdownWrite()
18{
19 socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_send);
20}
21
22kj::Promise<void> AsioStream::write(const void* buffer, size_t size)
23{
24 auto paf = kj::newPromiseAndFulfiller<void>();
25 boost::asio::const_buffer buf(buffer, size);
26 boost::asio::async_write(socket_, buf, [fulfiller = std::move(paf.fulfiller)](const boost::system::error_code& ec, std::size_t bytes_transferred) mutable {
27 if (ec) {
28 fulfiller->reject(KJ_EXCEPTION(FAILED, "write", ec.message()));
29 } else {
30 fulfiller->fulfill();
31 }
32 });
33
34 return kj::mv(paf.promise);
35}
36
37kj::Promise<void> AsioStream::write(kj::ArrayPtr<const kj::ArrayPtr<const kj::byte>> pieces)
38{
39 auto paf = kj::newPromiseAndFulfiller<void>();
40 std::vector<boost::asio::const_buffer> buffers;
41 buffers.reserve(pieces.size());
42 for (const auto& piece : pieces) {
43 buffers.push_back(boost::asio::buffer(piece.begin(), piece.size()));
44 }
45 boost::asio::async_write(socket_, buffers, [fulfiller = std::move(paf.fulfiller)](const boost::system::error_code& ec, std::size_t bytes_transferred) mutable {
46 if (ec) {
47 fulfiller->reject(KJ_EXCEPTION(FAILED, "write", ec.message()));
48 } else {
49 fulfiller->fulfill();
50 }
51 });
52 return kj::mv(paf.promise);
53}
54
55kj::Promise<void> AsioStream::whenWriteDisconnected()
56{
57 return kj::NEVER_DONE;
58}
59
60kj::Promise<size_t> AsioStream::tryRead(void* buffer, size_t minBytes, size_t maxBytes)
61{
62 auto paf = kj::newPromiseAndFulfiller<size_t>();
63 boost::asio::async_read(socket_,
64 boost::asio::buffer(buffer, maxBytes),
65 boost::asio::transfer_at_least(minBytes),
66 [fulfiller = std::move(paf.fulfiller)](const boost::system::error_code& ec, std::size_t bytes_transferred) mutable {
67 if (ec) {
68 fulfiller->reject(KJ_EXCEPTION(FAILED, "read", ec.message()));
69 } else {
70 fulfiller->fulfill(kj::mv(bytes_transferred));
71 }
72 });
73 return kj::mv(paf.promise);
74}
75
76
77} // namespace rpc
78} // namespace vanteza