5#include <vanetza/access/data_request.hpp>
6#include <vanetza/access/interface.hpp>
7#include <vanetza/common/runtime.hpp>
16 m_runtime(runtime), m_trc(trc), m_access(ifc), m_queue_length(0)
30 if (transmit_immediately(transmission)) {
32 transmit(request, std::move(packet));
34 enqueue(request, std::move(packet));
41 auto transmission = dequeue();
44 transmit(transmission->request, std::move(transmission->packet));
49 schedule_trigger(*next);
55 auto callback_delay = m_trc
.delay(tx);
56 m_runtime.schedule(callback_delay, std::bind(&
FlowControl::trigger,
this),
this);
61 const bool first_packet = empty();
62 const auto ac = map_profile_onto_ac(request.dcc_profile);
63 auto expiry = m_runtime
.now() + request.lifetime;
64 while (m_queue_length > 0 && m_queues[ac].size() >= m_queue_length) {
65 m_queues[ac].pop_front();
66 m_packet_drop_hook(ac, packet.get());
68 m_queues[ac].emplace_back(expiry, request, std::move(packet));
71 schedule_trigger(m_queues[ac].back());
78 Queue* queue = next_queue();
80 transmission = std::move(queue->front());
89 const auto ac = map_profile_onto_ac(transmission.profile());
92 bool contention =
false;
93 for (
auto it = m_queues.cbegin(); it != m_queues.end(); ++it) {
94 if (it->first >= ac && !it->second.empty()) {
100 return !contention && m_trc.delay(transmission) == Clock::duration::zero();
105 return std::all_of(m_queues.cbegin(), m_queues.cend(),
106 [](
const std::pair<access::AccessCategory,
const Queue&>& kv) {
107 return kv.second.empty();
113 Queue* next =
nullptr;
114 Clock::duration min_delay = Clock::duration::max();
116 for (
auto& kv : m_queues) {
117 Queue& queue = kv.second;
118 if (!queue.empty()) {
119 const auto delay = m_trc.delay(queue.front());
120 if (delay < min_delay) {
131 Queue* queue = next_queue();
132 return queue ? &queue->front() :
nullptr;
137 for (
auto& kv : m_queues) {
138 access::AccessCategory ac = kv.first;
139 Queue& queue = kv.second;
140 queue.remove_if([
this, ac](
const PendingTransmission& transmission) {
141 bool drop = transmission.expiry < m_runtime.now();
143 m_packet_drop_hook(ac, transmission.packet.get());
153 access_request.source_addr = request.source;
154 access_request.destination_addr = request.destination;
155 access_request.ether_type = request.ether_type;
156 access_request.access_category = map_profile_onto_ac(request.dcc_profile);
158 m_packet_transmit_hook(access_request.access_category, packet.get());
159 m_access.request(access_request, std::move(packet));
164 m_packet_drop_hook = std::move(cb);
169 m_packet_transmit_hook = std::move(cb);
174 m_queue_length = length;
182 schedule_trigger(*next);
virtual void cancel(const void *scope)=0
virtual Clock::time_point now() const =0
FlowControl(Runtime &, TransmitRateControl &, access::Interface &)
void set_packet_drop_hook(PacketDropHook::callback_type &&)
void queue_length(std::size_t length)
void request(const DataRequest &, std::unique_ptr< ChunkPacket >) override
void set_packet_transmit_hook(PacketTransmitHook::callback_type &&)
virtual void notify(const Transmission &tx)=0
virtual Clock::duration delay(const Transmission &tx)=0