Commit ce9f2b74 authored by Andreas Schmidt's avatar Andreas Schmidt

Improve BBR.

* Fix float vs. uint for pacing_rate.
* Add loss detection.
* Remove RTprop filter from CSI.
parent 253f8338
Pipeline #2118 failed with stages
in 22 seconds
......@@ -5,11 +5,11 @@
#include <math.h>
prrtByteCount_t BBR_Inflight(BBR* bbr, float gain)
prrtByteCount_t BBR_Inflight(BBR* bbr, double gain)
{
if (bbr->rtprop == Inf)
return InitialCwnd; /* no valid RTT samples yet */
uint32_t quanta = 3 * bbr->send_quantum;
uint32_t quanta = 0;
uint32_t estimated_bdp = (uint32_t) round(((double)bbr->bw * bbr->rtprop) / (1000 * 1000));
return (uint32_t)(gain * estimated_bdp + quanta);
}
......@@ -106,15 +106,14 @@ void BBR_ExitProbeRTT(BBR* bbr)
uint32_t BBR_SaveCwnd(BBR* bbr)
{
//if (!InLossRecovery() && bbr->state != PROBE_RTT)
if (bbr->state != PROBE_RTT)
if (!bbr->is_loss_recovery && bbr->state != PROBE_RTT)
return bbr->cwnd;
return MAX(bbr->prior_cwnd ,bbr->cwnd);
}
void BBR_RestoreCwnd(BBR* bbr)
{
bbr->cwnd = bbr->cwnd > bbr->prior_cwnd ? bbr->cwnd : bbr->prior_cwnd;
bbr->cwnd = MAX(bbr->cwnd, bbr->prior_cwnd);
}
void BBR_UpdateRTprop(BBR* bbr, prrtTimedelta_t rtt)
......@@ -167,13 +166,13 @@ void BBR_CheckProbeRTT(BBR *bbr, PrrtPacketTracking *tracking) {
}
void BBR_UpdateModelAndState(BBR *bbr, PrrtChannelStateInformation *csi, PrrtRateSample *rs,
PrrtPacketTracking *packetTracking)
PrrtPacketTracking *packetTracking, prrtTimedelta_t rtt)
{
BBR_UpdateBtlBw(bbr, rs, packetTracking);
BBR_CheckCyclePhase(bbr, packetTracking->bytes_lost, packetTracking->prior_inflight);
BBR_CheckFullPipe(bbr, rs);
BBR_CheckDrain(bbr, packetTracking->pipe);
BBR_UpdateRTprop(bbr, PrrtChannelStateInformation_get_rtprop(csi));
BBR_UpdateRTprop(bbr, rtt);
BBR_CheckProbeRTT(bbr, packetTracking);
}
......@@ -190,7 +189,6 @@ void BBR_ModulateCwndForProbeRTT(BBR* bbr)
void BBR_ModulateCwndForRecovery(BBR* bbr, prrtByteCount_t bytes_lost, prrtByteCount_t pipe, prrtByteCount_t delivered)
{
// TODO: [cwnd] = Bytes not packets
if (bytes_lost > 0)
bbr->cwnd = MAX(bbr->cwnd - bytes_lost, SMSS);
if (bbr->packet_conservation)
......@@ -213,9 +211,9 @@ void BBR_SetCwnd(BBR* bbr, PrrtPacketTracking* packetTracking)
debug(DEBUG_BBR, "New cwnd: %u, State: %u", bbr->cwnd, bbr->state);
}
void BBR_SetPacingRateWithGain(BBR* bbr, float pacing_gain)
void BBR_SetPacingRateWithGain(BBR* bbr, double pacing_gain)
{
uint32_t rate = (uint32_t) (pacing_gain * ((float)bbr->bw));
double rate = (pacing_gain * ((double)bbr->bw));
debug(DEBUG_BBR, "Current rate: %u, Pacing gain: %f, BtlBw: %u, Calc Rate: %u, Filled pipe: %u", bbr->pacing_rate,
pacing_gain, bbr->bw, rate, bbr->filled_pipe);
if (rate != 0 && (bbr->filled_pipe || rate > bbr->pacing_rate))
......@@ -233,23 +231,45 @@ void BBR_SetSendQuantum(BBR* bbr) {
} else if (bbr->pacing_rate < 3000000) { // 24 Mbps = 20 * 1.2Mbps = 3000000
bbr->send_quantum = 2 * SMSS;
} else {
bbr->send_quantum = MIN((prrtByteCount_t) round(((double)bbr->pacing_rate) / 1000), 64000);
bbr->send_quantum = MIN((prrtByteCount_t) round((double) bbr->pacing_rate / 1000), 64000);
}
}
void BBR_OnACK(BBR* bbr, PrrtChannelStateInformation* csi, PrrtRateSample* rs, PrrtPacketTracking* packetTracking)
void BBR_OnACK(BBR* bbr, PrrtChannelStateInformation* csi, PrrtRateSample* rs, PrrtPacketTracking* packetTracking, prrtTimedelta_t rtt)
{
BBR_UpdateModelAndState(bbr, csi, rs, packetTracking);
BBR_UpdateModelAndState(bbr, csi, rs, packetTracking, rtt);
BBR_SetPacingRate(bbr);
BBR_SetSendQuantum(bbr);
BBR_SetCwnd(bbr, packetTracking);
}
void BBR_OnLoss(BBR *bbr, PrrtPacketTracking *tracking) {
if(!bbr->is_loss_recovery) {
bbr->is_loss_recovery = true;
bbr->loss_recovery_stamp = PrrtClock_get_current_time_us();
bbr->prior_cwnd = BBR_SaveCwnd(bbr);
bbr->cwnd = SMSS;
//bbr->cwnd = tracking->pipe + MAX(tracking->delivered, 1);
//bbr->packet_conservation = true;
} else if (PrrtClock_get_current_time_us() > bbr->loss_recovery_stamp + bbr->rtprop){
bbr->packet_conservation = false;
}
}
void BBR_OnLossExit(BBR *bbr) {
if(bbr->is_loss_recovery) {
bbr->is_loss_recovery = false;
bbr->packet_conservation = false;
BBR_RestoreCwnd(bbr);
}
}
BBR* BBR_Init(void)
{
BBR* bbr = calloc(1, sizeof(BBR));
check_mem(bbr);
bbr->has_seen_rtt = false;
bbr->btlBwFilter = WindowedFilter_Init(true);
bbr->rtprop = Inf;
bbr->rtprop_stamp = PrrtClock_get_current_time_us();
......@@ -258,6 +278,7 @@ BBR* BBR_Init(void)
bbr->packet_conservation = false;
bbr->prior_cwnd = 0;
bbr->idle_restart = false;
bbr->is_loss_recovery = false;
//Init round counting
bbr->next_round_delivered = 0;
......@@ -270,8 +291,10 @@ BBR* BBR_Init(void)
bbr->full_bw_count = 0;
//Init pacing rate
float nominal_bandwidth = InitialCwnd / (bbr->has_seen_rtt ? bbr->rtt : 1);
bbr->pacing_rate = (uint32_t)(bbr->pacing_gain * nominal_bandwidth);
double nominal_bandwidth = InitialCwnd / (bbr->has_seen_rtt ? bbr->rtprop : 1000);
bbr->pacing_rate = bbr->pacing_gain * nominal_bandwidth;
BBR_EnterStartup(bbr);
return bbr;
......@@ -286,7 +309,7 @@ void BBR_destroy(BBR* bbr)
free(bbr);
}
uint32_t BBR_getPacingRate(BBR* bbr)
double BBR_getPacingRate(BBR* bbr)
{
return bbr->pacing_rate;
}
......@@ -314,7 +337,7 @@ uint32_t BBR_getCycleIndex(BBR* bbr) {
return bbr->cycle_index;
}
float BBR_getPacingGain(BBR* bbr) {
double BBR_getPacingGain(BBR* bbr) {
return bbr->pacing_gain;
}
......
......@@ -10,12 +10,12 @@
#include "types/rateSample.h"
#include "../util/windowedFilter.h"
#define SMSS 1200
#define SMSS 1500
#define RTpropFilterLen 10000000 //10s
#define BBRHighGain ((2885 / 1000) + 1)
#define BBRHighGain ((((float)2885) / 1000) + 1)
#define InitialCwnd (10 * SMSS)
#define BBRGainCycleLen 8
#define BBRMinPipeCwnd (2 * SMSS)
#define BBRMinPipeCwnd (4 * SMSS)
#define ProbeRTTDuration 200000 //200ms
#define Inf UINT32_MAX
......@@ -39,16 +39,15 @@ typedef struct bbr {
enum bbr_state state;
float pacing_gain;
double pacing_gain;
float cwnd_gain;
bool filled_pipe;
uint64_t full_bw;
uint32_t full_bw_count;
uint32_t pacing_rate;
double pacing_rate;
bool has_seen_rtt;
uint32_t rtt;
uint32_t next_round_delivered;
bool round_start;
......@@ -58,6 +57,9 @@ typedef struct bbr {
uint32_t rtt_count;
bool rtprop_expired;
bool is_loss_recovery;
prrtTimestamp_t loss_recovery_stamp;
prrtTimestamp_t cycle_stamp;
uint8_t cycle_index;
float* pacing_gain_cycle;
......@@ -70,15 +72,17 @@ typedef struct bbr {
} BBR;
BBR* BBR_Init(void);
void BBR_OnACK(BBR* bbr, PrrtChannelStateInformation* csi, PrrtRateSample* rs, PrrtPacketTracking* recv_stats);
void BBR_OnACK(BBR* bbr, PrrtChannelStateInformation* csi, PrrtRateSample* rs, PrrtPacketTracking* packetTracking, prrtTimedelta_t rtt);
void BBR_OnLoss(BBR *bbr, PrrtPacketTracking *tracking);
void BBR_OnLossExit(BBR *bbr);
void BBR_destroy(BBR* bbr);
uint32_t BBR_getPacingRate(BBR* bbr);
double BBR_getPacingRate(BBR* bbr);
prrtByteCount_t BBR_getCwnd(BBR* bbr);
prrtDeliveryRate_t BBR_getBtlBw(BBR* bbr);
uint32_t BBR_getState(BBR* bbr);
uint64_t BBR_getFullBw(BBR* bbr);
float BBR_getPacingGain(BBR* bbr);
double BBR_getPacingGain(BBR* bbr);
uint32_t BBR_getCycleIndex(BBR* bbr);
bool BBR_getFilledPipe(BBR* bbr);
uint32_t BBR_getRTProp(BBR* bbr);
......
......@@ -16,8 +16,6 @@ PrrtChannelStateInformation * PrrtChannelStateInformation_create()
check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == EXIT_SUCCESS, "Setting type failed.");
check(pthread_mutex_init(&csi->lock, &attr) == 0, "Mutex init failed.");
csi->rtprop = TIMESTAMP_SPACE;
csi->rtprop_filter_length_us = 2 * 1000 * 1000; // 2 seconds
csi->deliveryRate = 0;
csi->appLimited = 0;
......@@ -29,18 +27,6 @@ PrrtChannelStateInformation * PrrtChannelStateInformation_create()
return NULL;
}
void PrrtChannelStateInformation_update_rtprop(PrrtChannelStateInformation *csi, prrtTimedelta_t rtprop)
{
pthread_mutex_lock(&csi->lock);
prrtTimestamp_t now = PrrtClock_get_current_time_us();
csi->rtprop_expired = now > (csi->rtprop_stamp + csi->rtprop_filter_length_us);
if (rtprop >= 0 && (rtprop <= csi->rtprop || csi->rtprop_expired)) {
csi->rtprop = rtprop;
csi->rtprop_stamp = now;
}
pthread_mutex_unlock(&csi->lock);
}
void PrrtChannelStateInformation_update_plr(PrrtChannelStateInformation *csi, prrtSequenceNumber_t erasures,
prrtSequenceNumber_t packets) {
pthread_mutex_lock(&csi->lock);
......@@ -49,10 +35,9 @@ void PrrtChannelStateInformation_update_plr(PrrtChannelStateInformation *csi, pr
}
void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformation *csi, PrrtPacket* packet, PrrtRateSample *rateSample) {
void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformation *csi, PrrtRateSample *rateSample) {
pthread_mutex_lock(&csi->lock);
prrtDeliveryRate_t rate = rateSample->delivery_rate;
csi->deliveryRate = rate;
csi->deliveryRate = rateSample->delivery_rate;
pthread_mutex_unlock(&csi->lock);
}
void PrrtChannelStateInformation_update_app_limited(PrrtChannelStateInformation *csi, bool appLimited) {
......@@ -61,14 +46,6 @@ void PrrtChannelStateInformation_update_app_limited(PrrtChannelStateInformation
pthread_mutex_unlock(&csi->lock);
}
prrtTimedelta_t PrrtChannelStateInformation_get_rtprop(PrrtChannelStateInformation *csi)
{
pthread_mutex_lock(&csi->lock);
prrtTimedelta_t res = csi->rtprop;
pthread_mutex_unlock(&csi->lock);
return res;
}
bool PrrtChannelStateInformation_destroy(PrrtChannelStateInformation *csi)
{
check(pthread_mutex_destroy(&csi->lock) == EXIT_SUCCESS, "Destroy mutex failed.");
......
......@@ -8,11 +8,6 @@
typedef struct prrtChannelStateInformation {
pthread_mutex_t lock;
prrtTimedelta_t rtprop;
prrtTimestamp_t rtprop_stamp;
prrtTimedelta_t rtprop_filter_length_us;
bool rtprop_expired;
prrtPacketLossRate_t plr;
prrtDeliveryRate_t deliveryRate;
......@@ -21,15 +16,13 @@ typedef struct prrtChannelStateInformation {
} PrrtChannelStateInformation;
PrrtChannelStateInformation* PrrtChannelStateInformation_create(void);
void PrrtChannelStateInformation_update_rtprop(PrrtChannelStateInformation *csi, prrtTimedelta_t rtprop);
prrtTimedelta_t PrrtChannelStateInformation_get_rtprop(PrrtChannelStateInformation *csi);
prrtDeliveryRate_t PrrtChannelStateInformation_get_delivery_rate(PrrtChannelStateInformation *csi);
bool PrrtChannelStateInformation_get_app_limited(PrrtChannelStateInformation *csi);
prrtPacketLossRate_t PrrtChannelStateInformation_get_plr(PrrtChannelStateInformation* csi);
void PrrtChannelStateInformation_update_plr(PrrtChannelStateInformation *csi, prrtSequenceNumber_t erasures,
prrtSequenceNumber_t packets);
void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformation *csi, PrrtPacket* packet, PrrtRateSample *rate);
void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformation *csi, PrrtRateSample *rate);
void PrrtChannelStateInformation_update_app_limited(PrrtChannelStateInformation *csi, bool appLimited);
bool PrrtChannelStateInformation_destroy(PrrtChannelStateInformation* csi);
......
......@@ -108,11 +108,11 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
if (sock_ptr->pacingEnabled) {
uint32_t pacing_rate = BBR_getPacingRate(sock_ptr->receiver->bbr);
double pacing_rate = BBR_getPacingRate(sock_ptr->receiver->bbr);
if(pacing_rate != 0) {
debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %u, Pacing Time: %u", packet->payloadLength, pacing_rate, packet->payloadLength /
pacing_rate);
sock_ptr->nextSendTime = now + ((prrtTimedelta_t) round((((double) 1000 * 1000 * packet->payloadLength) / pacing_rate)));
sock_ptr->nextSendTime = now + ((prrtTimedelta_t) round(((1000 * 1000 * ((double)packet->payloadLength)) / pacing_rate)));
}
}
......
......@@ -39,17 +39,9 @@ static void handle_feedback(PrrtSocket *prrtSocket, const size_t length)
PrrtPacketFeedbackPayload *feedbackPayload = (PrrtPacketFeedbackPayload *) prrtPacket->payload;
prrtTimestamp_t forwardTripTimestamp = feedbackPayload->forwardTripTimestamp_us;
PrrtChannelStateInformation_update_rtprop(prrtSocket->receiver->csi,
(prrtTimedelta_t) (receiveTime - forwardTripTimestamp));
PrrtChannelStateInformation_update_plr(prrtSocket->receiver->csi, feedbackPayload->erasureCount, feedbackPayload->packetCount);
bool valid_sample = PrrtReceiver_updateAndGenerateRateSample(prrtSocket->receiver, feedbackPayload->ackSequenceNumber, feedbackPayload->ackPacketType, receiveTime);
if(valid_sample) {
PrrtChannelStateInformation_update_delivery_rate(prrtSocket->receiver->csi, prrtPacket, prrtSocket->receiver->rateSample);
}
PrrtChannelStateInformation_update_app_limited(prrtSocket->receiver->csi, prrtSocket->receiver->rateSample->is_app_limited);
prrtTimedelta_t rtt = (prrtTimedelta_t) (receiveTime - forwardTripTimestamp);
PrrtReceiver_on_ack(prrtSocket->receiver, feedbackPayload, receiveTime, rtt);
error:
if(prrtPacket != NULL) { PrrtPacket_destroy(prrtPacket); }
......
......@@ -40,7 +40,12 @@ PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port) {
recv->ai = info;
recv->bbr = BBR_Init();
check(pthread_mutex_init(&recv->lock, NULL) == 0, "lock init failed.");
pthread_mutexattr_t attr;
check(pthread_mutexattr_init(&attr) == EXIT_SUCCESS, "Mutex attr init failed.");
check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == EXIT_SUCCESS, "Setting type failed.");
check(pthread_mutex_init(&recv->lock, &attr) == 0, "lock init failed.");
check(pthread_cond_init(&recv->pipeNotFullCv, NULL) == 0, "pipeNotFullCv init failed.");
check(pthread_cond_init(&recv->recordNotFoundCv, NULL) == 0, "recordNotFound init failed.");
......@@ -126,6 +131,24 @@ void PrrtReceiver_on_application_write(PrrtReceiver *receiver, uint32_t send_que
PERROR("Mutex error.%s", "");
}
void PrrtReceiver_on_ack(PrrtReceiver *receiver, PrrtPacketFeedbackPayload * feedbackPayload, prrtTimestamp_t receiveTime, prrtTimedelta_t rtt) {
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
PrrtChannelStateInformation_update_plr(receiver->csi, feedbackPayload->erasureCount, feedbackPayload->packetCount);
bool valid_sample = PrrtReceiver_updateAndGenerateRateSample(receiver, feedbackPayload->ackSequenceNumber, feedbackPayload->ackPacketType, receiveTime, rtt);
if(valid_sample) {
PrrtChannelStateInformation_update_delivery_rate(receiver->csi, receiver->rateSample);
}
PrrtChannelStateInformation_update_app_limited(receiver->csi, receiver->rateSample->is_app_limited);
check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
return;
error:
PERROR("Mutex error.%s", "");
}
bool PrrtReceiver_generateRateSample(PrrtRateSample *rateSample, PrrtPacketTracking *packetTracking) {
/* Clear app-limited field */
......@@ -158,7 +181,7 @@ bool PrrtReceiver_generateRateSample(PrrtRateSample *rateSample, PrrtPacketTrack
}
bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNumber_t seqnum, uint8_t packetType,
prrtTimestamp_t receiveTime) {
prrtTimestamp_t receiveTime, prrtTimedelta_t rtt) {
PrrtInFlightPacketStore *inflightPacketStore = NULL;
if (packetType == PACKET_TYPE_DATA) {
inflightPacketStore = recv->dataInflightPacketStore;
......@@ -177,13 +200,22 @@ bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNu
recv->packetTracking->pipe -= lostBytes;
recv->packetTracking->bytes_lost = lostBytes;
// TODO: Experimental fast recovery.
if (lostBytes > 0) {
BBR_OnLoss(recv->bbr, recv->packetTracking);
}
if (lostBytes == 0) {
BBR_OnLossExit(recv->bbr);
}
PrrtReceiver_updateRateSample(recv->rateSample, packet, receiveTime, recv->packetTracking);
bool result = PrrtReceiver_generateRateSample(recv->rateSample, recv->packetTracking);
recv->packetTracking->prior_inflight = recv->packetTracking->pipe;
if(recv->rateSample != NULL) {
BBR_OnACK(recv->bbr, recv->csi, recv->rateSample, recv->packetTracking);
BBR_OnACK(recv->bbr, recv->csi, recv->rateSample, recv->packetTracking, rtt);
}
check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed.");
......
......@@ -32,11 +32,12 @@ typedef struct prrtReceiver {
PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port);
bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNumber_t seqnum, uint8_t packetType,
prrtTimestamp_t receiveTime);
prrtTimestamp_t receiveTime, prrtTimedelta_t rtt);
void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *packet, prrtTimestamp_t sentTime);
void PrrtReceiver_on_application_write(PrrtReceiver *receiver, uint32_t i);
void PrrtReceiver_on_ack(PrrtReceiver *receiver, PrrtPacketFeedbackPayload * feedbackPayload, prrtTimestamp_t receiveTime, prrtTimedelta_t rtt);
void PrrtReceiver_interrupt(PrrtReceiver *recv);
......
......@@ -40,7 +40,7 @@ prrtByteCount_t PrrtInFlightPacketStore_remove_outstanding_packet(PrrtInFlightPa
// TODO: Cleanup and loss marking. Make this more approriate with a "timeout".
List *packetStateList = List_create();
if(seqNum > SEQNO_SPACE/2) {
if(seqNum >= SEQNO_SPACE/2) {
BPTree_get_range(packetStore->outstandingPackets, packetStateList, seqNum - (SEQNO_SPACE / 2), seqNum - 1);
} else {
BPTree_get_range(packetStore->outstandingPackets, packetStateList, 0, seqNum - 1);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment