Commit 44b41686 authored by Andreas Schmidt's avatar Andreas Schmidt

Add first BBR pieces.

parent 43734ead
Pipeline #2059 failed with stages
in 22 seconds
add_library(PRRT ../defines.h
block.c block.h
bbr.c bbr.h
channelStateInformation.c channelStateInformation.h
clock.c clock.h
codingParams.c codingParams.h
......@@ -20,6 +21,6 @@ add_library(PRRT ../defines.h
types/packetTimeout.c types/packetTimeout.h
types/lossStatistics.c types/lossStatistics.h
types/packet.c types/packet.h
vdmcode/block_code.c vdmcode/block_code.h)
vdmcode/block_code.c vdmcode/block_code.h types/packetTracking.c types/packetTracking.h types/rateSample.c types/rateSample.h)
target_link_libraries(PRRT rt)
#include "bbr.h"
#include "../util/dbg.h"
#include "../util/common.h"
#include "receiver.h"
uint32_t BBR_Inflight(BBR* bbr, float gain)
{
if (bbr->rtprop == Inf)
return InitialCwnd; /* no valid RTT samples yet */
uint32_t quanta = 3 * bbr->send_quantum;
uint32_t estimated_bdp = bbr->bw * bbr->rtprop;
return (uint32_t)(gain * estimated_bdp + quanta);
}
void BBR_EnterStartup(BBR* bbr)
{
bbr->state = STARTUP;
bbr->pacing_gain = BBRHighGain;
bbr->cwnd_gain = BBRHighGain;
}
void BBR_UpdateBtlBw(BBR* bbr, PrrtRateSample* rs, prrtByteCount_t packet_delivered, prrtByteCount_t delivered)
{
if (packet_delivered >= bbr->next_round_delivered) {
bbr->next_round_delivered = delivered;
bbr->round_count++;
bbr->round_start = true;
} else {
bbr->round_start = false;
}
if ((rs->delivery_rate >= bbr->bw || !rs->is_app_limited) && rs->delivery_rate != 0) {
bbr->bw = (uint32_t)WindowedFilter_push(bbr->btlBwFilter, (uint32_t)rs->delivery_rate);
debug(DEBUG_BBR, "Current BtlBw: %u, RS delivery rate: %u", bbr->bw, (uint32_t)rs->delivery_rate);
}
}
void BBR_CheckFullPipe(BBR* bbr, PrrtRateSample* rs)
{
if (bbr->filled_pipe || !bbr->round_start || rs->is_app_limited)
return; // no need to check for a full pipe now
if (bbr->bw >= bbr->full_bw * 1.25) { // BBR.BtlBw still growing?
bbr->full_bw = bbr->bw; // record new baseline level
bbr->full_bw_count = 0;
return;
}
bbr->full_bw_count++; // another round w/o much growth
if (bbr->full_bw_count >= 3)
bbr->filled_pipe = true;
}
bool BBR_IsNextCyclePhase(BBR* bbr, prrtSequenceNumber_t packets_lost, prrtSequenceNumber_t prior_inflight)
{
bool is_full_length = (PrrtClock_get_current_time_us() - bbr->cycle_stamp) > bbr->rtprop;
if (bbr->pacing_gain == 1)
return is_full_length;
if (bbr->pacing_gain > 1)
return is_full_length && (packets_lost > 0 || prior_inflight >= BBR_Inflight(bbr, bbr->pacing_gain));
return is_full_length || (prior_inflight <= BBR_Inflight(bbr, 1.0));
}
void BBR_AdvanceCyclePhase(BBR* bbr)
{
bbr->cycle_stamp = PrrtClock_get_current_time_us();
bbr->cycle_index = (uint8_t )((bbr->cycle_index + 1) % BBRGainCycleLen);
float pacing_gain_cycle[BBRGainCycleLen] = {1.25, 0.75, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0};
bbr->pacing_gain = pacing_gain_cycle[bbr->cycle_index];
debug(DEBUG_BBR, "Advanced cycle with gain: %f", bbr->pacing_gain);
}
void BBR_CheckCyclePhase(BBR* bbr, prrtSequenceNumber_t packets_lost, prrtSequenceNumber_t prior_inflight) {
if (bbr->state == PROBE_BW && BBR_IsNextCyclePhase(bbr, packets_lost, prior_inflight))
BBR_AdvanceCyclePhase(bbr);
}
void BBR_EnterProbeBW(BBR* bbr)
{
bbr->state = PROBE_BW;
bbr->pacing_gain = 1;
bbr->cwnd_gain = 2;
bbr->cycle_index = (uint8_t)(BBRGainCycleLen - 1 - (random() % 7));
BBR_AdvanceCyclePhase(bbr);
}
void BBR_CheckDrain(BBR* bbr, prrtSequenceNumber_t packets_in_flight)
{
if (bbr->state == STARTUP && bbr->filled_pipe) {
//Drain
bbr->state = DRAIN;
bbr->pacing_gain = 1 / BBRHighGain; // pace slowly
bbr->cwnd_gain = BBRHighGain; // maintain cwnd
}
if (bbr->state == DRAIN && packets_in_flight <= BBR_Inflight(bbr, 1.0))
BBR_EnterProbeBW(bbr); // we estimate queue is drained
}
void BBR_ExitProbeRTT(BBR* bbr)
{
if (bbr->filled_pipe)
BBR_EnterProbeBW(bbr);
else
BBR_EnterStartup(bbr);
}
uint32_t BBR_SaveCwnd(BBR* bbr)
{
//if (!InLossRecovery() && bbr->state != PROBE_RTT)
if (bbr->state != PROBE_RTT)
return bbr->cwnd;
return MAX(bbr->prior_cwnd ,bbr->cwnd);
}
void BBR_RestoreCwnd(BBR* bbr)
{
bbr->cwnd = bbr->cwnd > bbr->prior_cwnd ? bbr->cwnd : bbr->prior_cwnd;
}
void BBR_UpdateRTprop(BBR* bbr, prrtTimedelta_t rtt, prrtByteCount_t delivered, prrtSequenceNumber_t packets_in_flight)
{
bbr->rtprop_expired = PrrtClock_get_current_time_us() > (bbr->rtprop_stamp + RTpropFilterLen);
if (rtt >= 0 && (rtt <= bbr->rtprop || bbr->rtprop_expired)) {
bbr->rtprop = rtt;
bbr->rtprop_stamp = PrrtClock_get_current_time_us();
}
if (bbr->state != PROBE_RTT && bbr->rtprop_expired && !bbr->idle_restart) {
bbr->state = PROBE_RTT;
bbr->pacing_gain = 1;
bbr->cwnd_gain = 1;
BBR_SaveCwnd(bbr);
bbr->probe_rtt_done_stamp = 0;
}
if (bbr->state == PROBE_RTT) {
/* Ignore low rate samples during ProbeRTT: */
prrtTimestamp_t now = PrrtClock_get_current_time_us();
if (bbr->probe_rtt_done_stamp == 0 && packets_in_flight <= BBRMinPipeCwnd) {
bbr->probe_rtt_done_stamp = now + ProbeRTTDuration;
bbr->probe_rtt_round_done = false;
bbr->next_round_delivered = delivered;
} else if (bbr->probe_rtt_done_stamp != 0) {
if (bbr->round_start)
bbr->probe_rtt_round_done = true;
if (bbr->probe_rtt_round_done && (now > bbr->probe_rtt_done_stamp)) {
bbr->rtprop_stamp = now;
BBR_RestoreCwnd(bbr);
BBR_ExitProbeRTT(bbr);
}
}
}
bbr->idle_restart = false;
}
void BBR_UpdateModel(BBR* bbr, PrrtChannelStateInformation* csi, PrrtRateSample* rs, PrrtPacketTracking* packetTracking)
{
BBR_UpdateBtlBw(bbr, rs, packetTracking->bytes_delivered, packetTracking->delivered);
BBR_CheckCyclePhase(bbr, packetTracking->packets_lost, packetTracking->prior_inflight);
BBR_CheckFullPipe(bbr, rs);
BBR_CheckDrain(bbr, packetTracking->packets_lost);
BBR_UpdateRTprop(bbr, PrrtChannelStateInformation_get_rtprop(csi), packetTracking->delivered, packetTracking->packets_in_flight);
}
void BBR_UpdateTargetCwnd(BBR* bbr)
{
bbr->target_cwnd = BBR_Inflight(bbr, bbr->cwnd_gain);
}
void BBR_ModulateCwndForProbeRTT(BBR* bbr)
{
if (bbr->state == PROBE_RTT)
bbr->cwnd = MIN(bbr->cwnd, BBRMinPipeCwnd);
}
void BBR_ModulateCwndForRecovery(BBR* bbr, prrtSequenceNumber_t packets_lost, prrtSequenceNumber_t packets_in_flight, prrtSequenceNumber_t packets_delivered)
{
if (packets_lost > 0)
bbr->cwnd = MAX(bbr->cwnd - packets_lost, 1);
if (bbr->packet_conservation)
bbr->cwnd = MAX(bbr->cwnd, packets_in_flight + packets_delivered);
}
void BBR_SetCwnd(BBR* bbr, PrrtPacketTracking* packetTracking)
{
BBR_UpdateTargetCwnd(bbr);
BBR_ModulateCwndForRecovery(bbr, packetTracking->packets_lost, packetTracking->packets_in_flight, packetTracking->packets_delivered);
if (!bbr->packet_conservation) {
if (bbr->filled_pipe)
bbr->cwnd = MIN(bbr->cwnd + packetTracking->packets_delivered, bbr->target_cwnd);
else if (bbr->cwnd < bbr->target_cwnd || packetTracking->delivered < InitialCwnd)
bbr->cwnd = bbr->cwnd + packetTracking->packets_delivered;
bbr->cwnd = MAX(bbr->cwnd, BBRMinPipeCwnd);
}
BBR_ModulateCwndForProbeRTT(bbr);
debug(DEBUG_BBR, "New cwnd: %u, State: %u", bbr->cwnd, bbr->state);
}
void BBR_SetPacingRateWithGain(BBR* bbr, float pacing_gain)
{
uint32_t rate = (uint32_t) (pacing_gain * (float)bbr->bw);
debug(DEBUG_BBR, "Current rate: %u, Pacing gain: %f, BtlBw: %u, Calc Rate: %u, Filled pipe: %u", bbr->pacing_rate,
pacing_gain, bbr->bw, rate, bbr->filled_pipe);
if (rate != 0 && (bbr->filled_pipe || rate > bbr->pacing_rate))
bbr->pacing_rate = rate;
}
void BBR_SetPacingRate(BBR* bbr)
{
BBR_SetPacingRateWithGain(bbr, bbr->pacing_gain);
}
void BBR_OnACK(BBR* bbr, PrrtChannelStateInformation* csi, PrrtRateSample* rs, PrrtPacketTracking* packetTracking)
{
BBR_UpdateModel(bbr, csi, rs, packetTracking);
BBR_SetPacingRate(bbr);
BBR_SetCwnd(bbr, packetTracking);
}
BBR* BBR_Init()
{
BBR* bbr = calloc(1, sizeof(BBR));
check_mem(bbr);
bbr->btlBwFilter = WindowedFilter_Init(true);
//bbr->rtprop = SRTT ? SRTT : Inf;
bbr->rtprop = UINT32_MAX;
bbr->rtprop_stamp = PrrtClock_get_current_time_us();
bbr->probe_rtt_done_stamp = 0;
bbr->probe_rtt_round_done = false;
bbr->packet_conservation = false;
bbr->prior_cwnd = 0;
bbr->idle_restart = false;
//Init round counting
bbr->next_round_delivered = 0;
bbr->round_start = false;
bbr->round_count = 0;
//Init full pipe
bbr->filled_pipe = false;
bbr->full_bw = 0;
bbr->full_bw_count = 0;
//Init pacing rate
float nominal_bandwidth = InitialCwnd / (bbr->has_seen_rtt ? bbr->rtt : 1);
bbr->pacing_rate = (uint32_t)(bbr->pacing_gain * nominal_bandwidth);
BBR_EnterStartup(bbr);
return bbr;
error:
PERROR("Failed to init BBR%s.", "");
return NULL;
}
void BBR_destroy(BBR* bbr)
{
WindowedFilter_destroy(bbr->btlBwFilter);
free(bbr);
}
uint32_t BBR_getPacingRate(BBR* bbr)
{
return bbr->pacing_rate;
}
uint32_t BBR_getCwnd(BBR* bbr)
{
return bbr->cwnd;
}
uint32_t BBR_getBtlBw(BBR* bbr)
{
return bbr->bw;
}
uint32_t BBR_getRTProp(BBR* bbr)
{
return bbr->rtprop;
}
\ No newline at end of file
#ifndef PRRT_BBR_H
#define PRRT_BBR_H
#include <stdint.h>
#include "stdbool.h"
#include "types/packet.h"
#include "clock.h"
#include "channelStateInformation.h"
#include "types/packetTracking.h"
#include "types/rateSample.h"
#include "../util/windowedFilter.h"
#define BtlBwFilterLen 10
#define RTpropFilterLen 10000000 //10s
#define BBRHighGain ((2885 / 1000) + 1)
#define ProbeRTTInterval 10
#define InitialCwnd 10
#define BBRGainCycleLen 8
#define BBRMinPipeCwnd 4
//#define BBRMinPipeCwnd 200
#define ProbeRTTDuration 200000 //200ms
#define Inf 0 //TODO: What is Inf
enum bbr_state {
STARTUP,
DRAIN,
PROBE_BW,
PROBE_RTT
};
typedef struct bbr {
uint32_t rtprop;
prrtTimestamp_t rtprop_stamp;
uint32_t probe_rtt_done_stamp;
bool probe_rtt_round_done;
bool packet_conservation;
uint32_t prior_cwnd;
uint32_t cwnd;
uint32_t target_cwnd;
bool idle_restart;
enum bbr_state state;
float pacing_gain;
float cwnd_gain;
bool filled_pipe;
uint64_t full_bw;
uint32_t full_bw_count;
uint32_t pacing_rate;
bool has_seen_rtt;
uint32_t rtt;
uint32_t next_round_delivered;
bool round_start;
uint32_t round_count;
uint32_t next_rtt_delivered;
uint32_t rtt_count;
bool rtprop_expired;
prrtTimestamp_t cycle_stamp;
uint8_t cycle_index;
float* pacing_gain_cycle;
uint32_t send_quantum;
uint32_t bw;
WindowedFilter* btlBwFilter;
} BBR;
BBR* BBR_Init();
void BBR_OnACK(BBR* bbr, PrrtChannelStateInformation* csi, PrrtRateSample* rs, PrrtPacketTracking* recv_stats);
void BBR_destroy(BBR* bbr);
uint32_t BBR_getPacingRate(BBR* bbr);
uint32_t BBR_getCwnd(BBR* bbr);
uint32_t BBR_getBtlBw(BBR* bbr);
uint32_t BBR_getRTProp(BBR* bbr);
#endif //PRRT_BBR_H
......@@ -56,16 +56,6 @@ void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformatio
pthread_mutex_lock(&csi->lock);
prrtDeliveryRate_t rate = rateSample->delivery_rate;
csi->deliveryRate = rate;
if(csi->btlbw_rounds > csi->btlbw_filter_length_rtts) {
csi->btlbw = 0;
}
csi->btlbw_rounds += 1;
if(csi->btlbw < rate) {
csi->btlbw = rate;
csi->btlbw_rounds = 0;
}
pthread_mutex_unlock(&csi->lock);
}
void PrrtChannelStateInformation_update_app_limited(PrrtChannelStateInformation *csi, bool appLimited) {
......
......@@ -2,6 +2,7 @@
#define PRRT_CHANNELSTATEINFORMATION_H
#include <stdbool.h>
#include "types/rateSample.h"
#include "types/packet.h"
typedef struct prrtChannelStateInformation {
......
......@@ -59,7 +59,6 @@ static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote, prrtS
kind = ts_redundancy_packet;
}
debug(DEBUG_FEEDBACK, "Send feedback %d %d", type, seqno);
XlapTimeStampCycle(sock_ptr, kind, seqno, SendFeedbackStart);
prrtFeedback_t feedback = {
......
......@@ -93,6 +93,20 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
default:;
}
prrtTimeDifference_t diff;
do {
diff = sock_ptr->nextSendTime - PrrtClock_get_current_time_us();
if(diff > 0) {
usleep_nano(diff / 2);
}
}
while (diff > 0);
prrtTimestamp_t now = PrrtClock_get_current_time_us();
if (BBR_getPacingRate(sock_ptr->receiver->bbr) != 0) {
debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %u, Pacing Time: %u", packet->payloadLength, BBR_getPacingRate(sock_ptr->receiver->bbr), packet->payloadLength / BBR_getPacingRate(sock_ptr->receiver->bbr));
sock_ptr->nextSendTime = now + (packet->payloadLength / BBR_getPacingRate(sock_ptr->receiver->bbr));
}
struct timespec timestamp;
uint64_t cyclestamp;
send_to_socket(sock_ptr, sock_ptr->receiver, buf, length, &timestamp, &cyclestamp);
......
......@@ -13,7 +13,7 @@ PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port) {
check_mem(recv->host_name);
recv->port = port;
PrrtRateSample *rateSample = calloc(1, sizeof(PrrtRateSample));
PrrtRateSample* rateSample = calloc(1, sizeof(PrrtRateSample));
check_mem(rateSample);
recv->rateSample = rateSample;
......@@ -24,8 +24,8 @@ PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port) {
recv->csi = PrrtChannelStateInformation_create();
recv->dataPacketStates = PrrtInFlightPacketStore_create();
recv->redundancyPacketStates = PrrtInFlightPacketStore_create();
recv->dataInflightPacketStore = PrrtInFlightPacketStore_create();
recv->redundancyInflightPacketStore = PrrtInFlightPacketStore_create();
struct addrinfo *info;
......@@ -38,6 +38,7 @@ PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port) {
check(0 == getaddrinfo(host, portstr, &hints, &info), "getaddrinfo");
recv->ai = info;
recv->bbr = BBR_Init();
check(pthread_mutex_init(&recv->lock, NULL) == 0, "lock init failed.");
check(pthread_cond_init(&recv->pipeNotFullCv, NULL) == 0, "pipeNotFullCv init failed.");
......@@ -55,16 +56,20 @@ PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port) {
}
bool PrrtReceiver_destroy(PrrtReceiver *receiver) {
if(receiver->bbr) {
BBR_destroy(receiver->bbr);
}
if (receiver->csi != NULL) {
PrrtChannelStateInformation_destroy(receiver->csi);
}
if (receiver->dataPacketStates != NULL) {
PrrtInFlightPacketStore_destroy(receiver->dataPacketStates);
if (receiver->dataInflightPacketStore != NULL) {
PrrtInFlightPacketStore_destroy(receiver->dataInflightPacketStore);
}
if (receiver->redundancyPacketStates != NULL) {
PrrtInFlightPacketStore_destroy(receiver->redundancyPacketStates);
if (receiver->redundancyInflightPacketStore != NULL) {
PrrtInFlightPacketStore_destroy(receiver->redundancyInflightPacketStore);
}
if (receiver->packetTracking != NULL) {
......@@ -111,8 +116,8 @@ void PrrtReceiver_updateRateSample(PrrtRateSample *rateSample, PrrtPacket *packe
void PrrtReceiver_on_application_write(PrrtReceiver* receiver) {
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
PrrtPacketTracking *tracking = receiver->packetTracking;
if(PrrtInFlightPacketStore_get_queue_size(receiver->dataPacketStates) +
PrrtInFlightPacketStore_get_queue_size(receiver->redundancyPacketStates) == 0) {
if(PrrtInFlightPacketStore_get_queue_size(receiver->dataInflightPacketStore) +
PrrtInFlightPacketStore_get_queue_size(receiver->redundancyInflightPacketStore) == 0) {
tracking->app_limited = (tracking->delivered + tracking->pipe) ? : 1;
}
check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
......@@ -155,26 +160,34 @@ bool PrrtReceiver_generateRateSample(PrrtRateSample *rateSample, PrrtPacketTrack
bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNumber_t seqnum, uint8_t packetType,
prrtTimestamp_t receiveTime) {
PrrtInFlightPacketStore *packetStore = NULL;
PrrtInFlightPacketStore *inflightPacketStore = NULL;
if (packetType == PACKET_TYPE_DATA) {
packetStore = recv->dataPacketStates;
inflightPacketStore = recv->dataInflightPacketStore;
} else if (packetType == PACKET_TYPE_REDUNDANCY) {
packetStore = recv->redundancyPacketStates;
inflightPacketStore = recv->redundancyInflightPacketStore;
} else return false;
check(pthread_mutex_lock(&recv->lock) == 0, "Lock failed.");
PrrtPacket *packet = PrrtInFlightPacketStore_get_packet(packetStore, seqnum);
PrrtPacket *packet = PrrtInFlightPacketStore_get_packet(inflightPacketStore, seqnum);
while (packet == NULL) {
pthread_cond_wait(&recv->recordNotFoundCv, &recv->lock);
packet = PrrtInFlightPacketStore_get_packet(packetStore, seqnum);
packet = PrrtInFlightPacketStore_get_packet(inflightPacketStore, seqnum);
}
PrrtReceiver_updateRateSample(recv->rateSample, packet, receiveTime, recv->packetTracking);
bool result = PrrtReceiver_generateRateSample(recv->rateSample, recv->packetTracking);
recv->packetTracking->pipe -= packet->payloadLength;
PrrtInFlightPacketStore_remove_outstanding_packet(packetStore, seqnum);
PrrtInFlightPacketStore_remove_outstanding_packet(inflightPacketStore, seqnum);
check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed.");
check(pthread_cond_broadcast(&recv->pipeNotFullCv) == 0, "Signal failed.");
recv->packetTracking->packets_delivered++;
recv->packetTracking->packets_in_flight = (prrtSequenceNumber_t) (PrrtInFlightPacketStore_get_queue_size(recv->dataInflightPacketStore) + PrrtInFlightPacketStore_get_queue_size(recv->redundancyInflightPacketStore));
recv->packetTracking->prior_inflight = recv->packetTracking->packets_in_flight;
if(recv->rateSample != NULL) {
BBR_OnACK(recv->bbr, recv->csi, recv->rateSample, recv->packetTracking);
}
return result;
error:
......@@ -184,9 +197,9 @@ bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNu
void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *packet, prrtTimestamp_t sentTime) {
PrrtInFlightPacketStore *packetStore = NULL;
if (PrrtPacket_type(packet) == PACKET_TYPE_DATA) {
packetStore = recv->dataPacketStates;
packetStore = recv->dataInflightPacketStore;
} else if (PrrtPacket_type(packet) == PACKET_TYPE_REDUNDANCY) {
packetStore = recv->redundancyPacketStates;
packetStore = recv->redundancyInflightPacketStore;
} else return;
//printf("Adding Packet #%u to %u\n", packet->sequenceNumber, PrrtPacket_type(packet));
check(pthread_mutex_lock(&recv->lock) == 0, "Lock failed.");
......
......@@ -8,27 +8,8 @@
#include <netdb.h>
#include "channelStateInformation.h"
#include "stores/inFlightPacketStore.h"
#include "bbr.h"
typedef struct prrtRateSample {
prrtByteCount_t prior_delivered;
prrtTimestamp_t prior_time;
prrtTimedelta_t send_elapsed;
prrtTimedelta_t ack_elapsed;
prrtTimedelta_t interval;
prrtByteCount_t delivered;
bool is_app_limited;
prrtDeliveryRate_t delivery_rate; // Bps
} PrrtRateSample;
typedef struct packetTracking {
prrtByteCount_t pipe;
prrtByteCount_t delivered;
prrtTimestamp_t delivered_time;
prrtTimestamp_t first_sent_time;
prrtByteCount_t app_limited;
} PrrtPacketTracking;
typedef struct prrtReceiver {
const char *host_name;
......@@ -39,8 +20,10 @@ typedef struct prrtReceiver {
pthread_cond_t recordNotFoundCv;
pthread_mutex_t lock;
PrrtInFlightPacketStore *dataPacketStates;
PrrtInFlightPacketStore *redundancyPacketStates;
BBR* bbr;
PrrtInFlightPacketStore *dataInflightPacketStore;
PrrtInFlightPacketStore *redundancyInflightPacketStore;
PrrtRateSample *rateSample;
PrrtPacketTracking *packetTracking;
......
......@@ -62,6 +62,7 @@ PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay
s->isThreadPinning = false;
PrrtClock_init(&s->clock);
s->nextSendTime = PrrtClock_get_current_time_us();
s->isBound = false;
s->receiver = NULL;
......
......@@ -54,6 +54,8 @@ typedef struct prrtSocket {
atomic_bool closing;
prrtTimestamp_t nextSendTime;
prrtSequenceNumber_t packetsCount;
prrtSequenceNumber_t sequenceNumberSource;
prrtSequenceNumber_t sequenceNumberRepetition;
......
#include "packetTracking.h"
#ifndef PRRT_PACKETTRACKING_H
#define PRRT_PACKETTRACKING_H
#include "packet.h"
typedef struct packetTracking {
prrtByteCount_t pipe;
prrtByteCount_t delivered;
prrtTimestamp_t delivered_time;
prrtTimestamp_t first_sent_time;
prrtByteCount_t app_limited;
prrtByteCount_t bytes_delivered;
prrtSequenceNumber_t packets_lost;
prrtSequenceNumber_t prior_inflight;
prrtSequenceNumber_t packets_in_flight;
prrtSequenceNumber_t packets_delivered;
} PrrtPacketTracking;
#endif //PRRT_PACKETTRACKING_H
#include "rateSample.h"
#ifndef PRRT_RATESAMPLE_H
#define PRRT_RATESAMPLE_H
#include "packet.h"
typedef struct prrtRateSample {
prrtByteCount_t prior_delivered;
prrtTimestamp_t prior_time;
prrtTimedelta_t send_elapsed;
prrtTimedelta_t ack_elapsed;
prrtTimedelta_t interval;
prrtByteCount_t delivered;
bool is_app_limited;
prrtDeliveryRate_t delivery_rate; // Bps
} PrrtRateSample;
#endif //PRRT_RATESAMPLE_H
add_library(UTIL ../defines.h common.c common.h list.c list.h pipe.c pipe.h dbg.h bptree.c bptree.h bitmap.c bitmap.h mpsc_queue.c mpsc_queue.h)
add_library(UTIL ../defines.h
common.c common.h
list.c list.h
pipe.c pipe.h
dbg.h
bptree.c bptree.h
bitmap.c bitmap.h
mpsc_queue.c mpsc_queue.h
windowedFilter.c windowedFilter.h)
set_property(TARGET UTIL PROPERTY C_STANDARD 99)
target_link_libraries(UTIL ${M_LIB})
......@@ -9,13 +9,14 @@
#include <string.h>
#define DEBUG_ALL 0
#define DEBUG_BBR 1
#define DEBUG_BLOCK 0
#define DEBUG_PACKET 0
#define DEBUG_RECEIVER 0
#define DEBUG_SENDER 0
#define DEBUG_SOCKET 0
#define DEBUG_DATARECEIVER 0
#define DEBUG_DATATRANSMITTER 0
#define DEBUG_DATATRANSMITTER 1
#define DEBUG_HARDSTAMPING 0
#define DEBUG_FEEDBACK 1
......
#include "windowedFilter.h"
#include "dbg.h"
#include "common.h"
WindowedFilter* WindowedFilter_Init(bool isMaxFilter)
{
WindowedFilter* filter = calloc(1, sizeof(WindowedFilter));
check_mem(filter);
filter->isMaxFilter = isMaxFilter;
filter->last = 9;
return filter;
error:
PERROR("Creating WindowedFilter failed%s.", "");
return NULL;
}
void WindowedFilter_destroy(WindowedFilter* filter)
{
free(filter);
}
uint8_t WindowedFilter_findNewExtremum(WindowedFilter* filter)
{
uint32_t extremum;
uint8_t pos_extremum = 0;
if (filter->isMaxFilter) {
extremum = 0;
for (uint8_t i = 0; i < DefaultFilterLen; i++) {
if (extremum < filter->list[i]) {
extremum = filter->list[i];
pos_extremum = i;
}
}
} else {
extremum = UINT32_MAX;
for (uint8_t i = 0; i < DefaultFilterLen; i++) {
if (extremum > filter->list[i]) {
extremum = filter->list[i];