Loading prrt/cprrt.pxd +3 −0 Original line number Diff line number Diff line Loading @@ -150,6 +150,9 @@ cdef extern from "proto/socket.h": uint32_t PrrtSocket_get_bbr_state(PrrtSocket *s) uint64_t PrrtSocket_get_full_bw(PrrtSocket *s) bint PrrtSocket_get_filled_pipe(PrrtSocket *s) uint32_t PrrtSocket_get_cycle_index(PrrtSocket *s) float PrrtSocket_get_pacing_gain(PrrtSocket *s) bint PrrtSocket_get_app_limited(PrrtSocket *socket) bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket) Loading prrt/proto/bbr.c +55 −34 Original line number Diff line number Diff line Loading @@ -4,7 +4,7 @@ #include "receiver.h" uint32_t BBR_Inflight(BBR* bbr, float gain) prrtByteCount_t BBR_Inflight(BBR* bbr, float gain) { if (bbr->rtprop == Inf) return InitialCwnd; /* no valid RTT samples yet */ Loading @@ -20,10 +20,10 @@ void BBR_EnterStartup(BBR* bbr) bbr->cwnd_gain = BBRHighGain; } void BBR_UpdateBtlBw(BBR* bbr, PrrtRateSample* rs, prrtByteCount_t packet_delivered, prrtByteCount_t delivered) void BBR_UpdateBtlBw(BBR* bbr, PrrtRateSample* rs, PrrtPacketTracking* tracking) { if (packet_delivered >= bbr->next_round_delivered) { bbr->next_round_delivered = delivered; if (tracking->bytes_delivered >= bbr->next_round_delivered) { bbr->next_round_delivered = tracking->delivered; bbr->round_count++; bbr->round_start = true; } else { Loading @@ -49,7 +49,7 @@ void BBR_CheckFullPipe(BBR* bbr, PrrtRateSample* rs) bbr->filled_pipe = true; } bool BBR_IsNextCyclePhase(BBR* bbr, prrtSequenceNumber_t packets_lost, prrtSequenceNumber_t prior_inflight) bool BBR_IsNextCyclePhase(BBR* bbr, prrtSequenceNumber_t packets_lost, prrtByteCount_t prior_inflight) { bool is_full_length = (PrrtClock_get_current_time_us() - bbr->cycle_stamp) > bbr->rtprop; if (bbr->pacing_gain == 1) Loading @@ -68,7 +68,7 @@ void BBR_AdvanceCyclePhase(BBR* bbr) debug(DEBUG_BBR, "Advanced cycle with gain: %f", bbr->pacing_gain); } void BBR_CheckCyclePhase(BBR* bbr, prrtSequenceNumber_t packets_lost, prrtSequenceNumber_t prior_inflight) { void BBR_CheckCyclePhase(BBR* bbr, prrtSequenceNumber_t packets_lost, prrtByteCount_t prior_inflight) { if (bbr->state == PROBE_BW && BBR_IsNextCyclePhase(bbr, packets_lost, prior_inflight)) BBR_AdvanceCyclePhase(bbr); } Loading @@ -82,7 +82,7 @@ void BBR_EnterProbeBW(BBR* bbr) BBR_AdvanceCyclePhase(bbr); } void BBR_CheckDrain(BBR* bbr, prrtSequenceNumber_t packets_in_flight) void BBR_CheckDrain(BBR* bbr, prrtByteCount_t bytes_inflight) { if (bbr->state == STARTUP && bbr->filled_pipe) { //Drain Loading @@ -90,7 +90,7 @@ void BBR_CheckDrain(BBR* bbr, prrtSequenceNumber_t packets_in_flight) bbr->pacing_gain = 1 / BBRHighGain; // pace slowly bbr->cwnd_gain = BBRHighGain; // maintain cwnd } if (bbr->state == DRAIN && packets_in_flight <= BBR_Inflight(bbr, 1.0)) if (bbr->state == DRAIN && bytes_inflight <= BBR_Inflight(bbr, 1.0)) BBR_EnterProbeBW(bbr); // we estimate queue is drained } Loading @@ -115,29 +115,29 @@ void BBR_RestoreCwnd(BBR* bbr) bbr->cwnd = bbr->cwnd > bbr->prior_cwnd ? bbr->cwnd : bbr->prior_cwnd; } void BBR_UpdateRTprop(BBR* bbr, prrtTimedelta_t rtt, prrtByteCount_t delivered, prrtSequenceNumber_t packets_in_flight) void BBR_UpdateRTprop(BBR* bbr, prrtTimedelta_t rtt) { bbr->rtprop_expired = PrrtClock_get_current_time_us() > (bbr->rtprop_stamp + RTpropFilterLen); if (rtt >= 0 && (rtt <= bbr->rtprop || bbr->rtprop_expired)) { bbr->rtprop = rtt; bbr->rtprop_stamp = PrrtClock_get_current_time_us(); } } if (bbr->state != PROBE_RTT && bbr->rtprop_expired && !bbr->idle_restart) { void BBR_EnterProbeRTT(BBR *bbr) { bbr->state = PROBE_RTT; bbr->pacing_gain = 1; bbr->cwnd_gain = 1; BBR_SaveCwnd(bbr); bbr->probe_rtt_done_stamp = 0; } if (bbr->state == PROBE_RTT) { void BBR_HandleProbeRTT(BBR *bbr, PrrtPacketTracking *tracking) { tracking->app_limited = (tracking->delivered + tracking->pipe) ? : 1; /* Ignore low rate samples during ProbeRTT: */ prrtTimestamp_t now = PrrtClock_get_current_time_us(); if (bbr->probe_rtt_done_stamp == 0 && packets_in_flight <= BBRMinPipeCwnd) { if (bbr->probe_rtt_done_stamp == 0 && tracking->pipe <= BBRMinPipeCwnd) { bbr->probe_rtt_done_stamp = now + ProbeRTTDuration; bbr->probe_rtt_round_done = false; bbr->next_round_delivered = delivered; bbr->next_round_delivered = tracking->delivered; } else if (bbr->probe_rtt_done_stamp != 0) { if (bbr->round_start) bbr->probe_rtt_round_done = true; Loading @@ -148,16 +148,30 @@ void BBR_UpdateRTprop(BBR* bbr, prrtTimedelta_t rtt, prrtByteCount_t delivered, } } } void BBR_CheckProbeRTT(BBR *bbr, PrrtPacketTracking *tracking) { if (bbr->state != PROBE_RTT && bbr->rtprop_expired && !bbr->idle_restart) { BBR_EnterProbeRTT(bbr); BBR_SaveCwnd(bbr); bbr->probe_rtt_done_stamp = 0; } if (bbr->state == PROBE_RTT) { BBR_HandleProbeRTT(bbr, tracking); } bbr->idle_restart = false; } void BBR_UpdateModel(BBR* bbr, PrrtChannelStateInformation* csi, PrrtRateSample* rs, PrrtPacketTracking* packetTracking) void BBR_UpdateModelAndState(BBR *bbr, PrrtChannelStateInformation *csi, PrrtRateSample *rs, PrrtPacketTracking *packetTracking) { BBR_UpdateBtlBw(bbr, rs, packetTracking->bytes_delivered, packetTracking->delivered); BBR_UpdateBtlBw(bbr, rs, packetTracking); BBR_CheckCyclePhase(bbr, packetTracking->packets_lost, packetTracking->prior_inflight); BBR_CheckFullPipe(bbr, rs); BBR_CheckDrain(bbr, packetTracking->packets_lost); BBR_UpdateRTprop(bbr, PrrtChannelStateInformation_get_rtprop(csi), packetTracking->delivered, packetTracking->packets_in_flight); BBR_CheckDrain(bbr, packetTracking->pipe); BBR_UpdateRTprop(bbr, PrrtChannelStateInformation_get_rtprop(csi)); BBR_CheckProbeRTT(bbr, packetTracking); } void BBR_UpdateTargetCwnd(BBR* bbr) Loading Loading @@ -211,7 +225,7 @@ void BBR_SetPacingRate(BBR* bbr) void BBR_OnACK(BBR* bbr, PrrtChannelStateInformation* csi, PrrtRateSample* rs, PrrtPacketTracking* packetTracking) { BBR_UpdateModel(bbr, csi, rs, packetTracking); BBR_UpdateModelAndState(bbr, csi, rs, packetTracking); BBR_SetPacingRate(bbr); Loading @@ -223,8 +237,7 @@ BBR* BBR_Init(void) BBR* bbr = calloc(1, sizeof(BBR)); check_mem(bbr); bbr->btlBwFilter = WindowedFilter_Init(true); //bbr->rtprop = SRTT ? SRTT : Inf; bbr->rtprop = UINT32_MAX; bbr->rtprop = Inf; bbr->rtprop_stamp = PrrtClock_get_current_time_us(); bbr->probe_rtt_done_stamp = 0; bbr->probe_rtt_round_done = false; Loading Loading @@ -283,6 +296,14 @@ uint32_t BBR_getState(BBR* bbr) { return bbr->state; } uint32_t BBR_getCycleIndex(BBR* bbr) { return bbr->cycle_index; } float BBR_getPacingGain(BBR* bbr) { return bbr->pacing_gain; } bool BBR_getFilledPipe(BBR* bbr) { return bbr->filled_pipe; } Loading prrt/proto/bbr.h +6 −6 Original line number Diff line number Diff line Loading @@ -10,16 +10,14 @@ #include "types/rateSample.h" #include "../util/windowedFilter.h" #define BtlBwFilterLen 10 #define MTU 1500 #define RTpropFilterLen 10000000 //10s #define BBRHighGain ((2885 / 1000) + 1) #define ProbeRTTInterval 10 #define InitialCwnd 10 #define InitialCwnd (10 * MTU) #define BBRGainCycleLen 8 #define BBRMinPipeCwnd 4 //#define BBRMinPipeCwnd 200 #define BBRMinPipeCwnd (4 * MTU) #define ProbeRTTDuration 200000 //200ms #define Inf 0 //TODO: What is Inf #define Inf UINT32_MAX enum bbr_state { STARTUP, Loading Loading @@ -80,6 +78,8 @@ uint32_t BBR_getCwnd(BBR* bbr); prrtDeliveryRate_t BBR_getBtlBw(BBR* bbr); uint32_t BBR_getState(BBR* bbr); uint64_t BBR_getFullBw(BBR* bbr); float BBR_getPacingGain(BBR* bbr); uint32_t BBR_getCycleIndex(BBR* bbr); bool BBR_getFilledPipe(BBR* bbr); uint32_t BBR_getRTProp(BBR* bbr); Loading prrt/proto/receiver.c +1 −1 Original line number Diff line number Diff line Loading @@ -185,7 +185,7 @@ bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNu recv->packetTracking->packets_delivered++; recv->packetTracking->packets_in_flight = (prrtSequenceNumber_t) (PrrtInFlightPacketStore_get_queue_size(recv->dataInflightPacketStore) + PrrtInFlightPacketStore_get_queue_size(recv->redundancyInflightPacketStore)); recv->packetTracking->prior_inflight = recv->packetTracking->packets_in_flight; recv->packetTracking->prior_inflight = recv->packetTracking->pipe; if(recv->rateSample != NULL && result) { BBR_OnACK(recv->bbr, recv->csi, recv->rateSample, recv->packetTracking); } Loading prrt/proto/socket.c +7 −0 Original line number Diff line number Diff line Loading @@ -614,6 +614,13 @@ uint64_t PrrtSocket_get_full_bw(PrrtSocket *s) { return BBR_getFullBw(s->receiver->bbr); } float PrrtSocket_get_pacing_gain(PrrtSocket *s) { return BBR_getPacingGain(s->receiver->bbr); } uint32_t PrrtSocket_get_cycle_index(PrrtSocket *s) { return BBR_getCycleIndex(s->receiver->bbr); } bool PrrtSocket_get_app_limited(PrrtSocket *s) { return PrrtChannelStateInformation_get_app_limited(s->receiver->csi); Loading Loading
prrt/cprrt.pxd +3 −0 Original line number Diff line number Diff line Loading @@ -150,6 +150,9 @@ cdef extern from "proto/socket.h": uint32_t PrrtSocket_get_bbr_state(PrrtSocket *s) uint64_t PrrtSocket_get_full_bw(PrrtSocket *s) bint PrrtSocket_get_filled_pipe(PrrtSocket *s) uint32_t PrrtSocket_get_cycle_index(PrrtSocket *s) float PrrtSocket_get_pacing_gain(PrrtSocket *s) bint PrrtSocket_get_app_limited(PrrtSocket *socket) bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket) Loading
prrt/proto/bbr.c +55 −34 Original line number Diff line number Diff line Loading @@ -4,7 +4,7 @@ #include "receiver.h" uint32_t BBR_Inflight(BBR* bbr, float gain) prrtByteCount_t BBR_Inflight(BBR* bbr, float gain) { if (bbr->rtprop == Inf) return InitialCwnd; /* no valid RTT samples yet */ Loading @@ -20,10 +20,10 @@ void BBR_EnterStartup(BBR* bbr) bbr->cwnd_gain = BBRHighGain; } void BBR_UpdateBtlBw(BBR* bbr, PrrtRateSample* rs, prrtByteCount_t packet_delivered, prrtByteCount_t delivered) void BBR_UpdateBtlBw(BBR* bbr, PrrtRateSample* rs, PrrtPacketTracking* tracking) { if (packet_delivered >= bbr->next_round_delivered) { bbr->next_round_delivered = delivered; if (tracking->bytes_delivered >= bbr->next_round_delivered) { bbr->next_round_delivered = tracking->delivered; bbr->round_count++; bbr->round_start = true; } else { Loading @@ -49,7 +49,7 @@ void BBR_CheckFullPipe(BBR* bbr, PrrtRateSample* rs) bbr->filled_pipe = true; } bool BBR_IsNextCyclePhase(BBR* bbr, prrtSequenceNumber_t packets_lost, prrtSequenceNumber_t prior_inflight) bool BBR_IsNextCyclePhase(BBR* bbr, prrtSequenceNumber_t packets_lost, prrtByteCount_t prior_inflight) { bool is_full_length = (PrrtClock_get_current_time_us() - bbr->cycle_stamp) > bbr->rtprop; if (bbr->pacing_gain == 1) Loading @@ -68,7 +68,7 @@ void BBR_AdvanceCyclePhase(BBR* bbr) debug(DEBUG_BBR, "Advanced cycle with gain: %f", bbr->pacing_gain); } void BBR_CheckCyclePhase(BBR* bbr, prrtSequenceNumber_t packets_lost, prrtSequenceNumber_t prior_inflight) { void BBR_CheckCyclePhase(BBR* bbr, prrtSequenceNumber_t packets_lost, prrtByteCount_t prior_inflight) { if (bbr->state == PROBE_BW && BBR_IsNextCyclePhase(bbr, packets_lost, prior_inflight)) BBR_AdvanceCyclePhase(bbr); } Loading @@ -82,7 +82,7 @@ void BBR_EnterProbeBW(BBR* bbr) BBR_AdvanceCyclePhase(bbr); } void BBR_CheckDrain(BBR* bbr, prrtSequenceNumber_t packets_in_flight) void BBR_CheckDrain(BBR* bbr, prrtByteCount_t bytes_inflight) { if (bbr->state == STARTUP && bbr->filled_pipe) { //Drain Loading @@ -90,7 +90,7 @@ void BBR_CheckDrain(BBR* bbr, prrtSequenceNumber_t packets_in_flight) bbr->pacing_gain = 1 / BBRHighGain; // pace slowly bbr->cwnd_gain = BBRHighGain; // maintain cwnd } if (bbr->state == DRAIN && packets_in_flight <= BBR_Inflight(bbr, 1.0)) if (bbr->state == DRAIN && bytes_inflight <= BBR_Inflight(bbr, 1.0)) BBR_EnterProbeBW(bbr); // we estimate queue is drained } Loading @@ -115,29 +115,29 @@ void BBR_RestoreCwnd(BBR* bbr) bbr->cwnd = bbr->cwnd > bbr->prior_cwnd ? bbr->cwnd : bbr->prior_cwnd; } void BBR_UpdateRTprop(BBR* bbr, prrtTimedelta_t rtt, prrtByteCount_t delivered, prrtSequenceNumber_t packets_in_flight) void BBR_UpdateRTprop(BBR* bbr, prrtTimedelta_t rtt) { bbr->rtprop_expired = PrrtClock_get_current_time_us() > (bbr->rtprop_stamp + RTpropFilterLen); if (rtt >= 0 && (rtt <= bbr->rtprop || bbr->rtprop_expired)) { bbr->rtprop = rtt; bbr->rtprop_stamp = PrrtClock_get_current_time_us(); } } if (bbr->state != PROBE_RTT && bbr->rtprop_expired && !bbr->idle_restart) { void BBR_EnterProbeRTT(BBR *bbr) { bbr->state = PROBE_RTT; bbr->pacing_gain = 1; bbr->cwnd_gain = 1; BBR_SaveCwnd(bbr); bbr->probe_rtt_done_stamp = 0; } if (bbr->state == PROBE_RTT) { void BBR_HandleProbeRTT(BBR *bbr, PrrtPacketTracking *tracking) { tracking->app_limited = (tracking->delivered + tracking->pipe) ? : 1; /* Ignore low rate samples during ProbeRTT: */ prrtTimestamp_t now = PrrtClock_get_current_time_us(); if (bbr->probe_rtt_done_stamp == 0 && packets_in_flight <= BBRMinPipeCwnd) { if (bbr->probe_rtt_done_stamp == 0 && tracking->pipe <= BBRMinPipeCwnd) { bbr->probe_rtt_done_stamp = now + ProbeRTTDuration; bbr->probe_rtt_round_done = false; bbr->next_round_delivered = delivered; bbr->next_round_delivered = tracking->delivered; } else if (bbr->probe_rtt_done_stamp != 0) { if (bbr->round_start) bbr->probe_rtt_round_done = true; Loading @@ -148,16 +148,30 @@ void BBR_UpdateRTprop(BBR* bbr, prrtTimedelta_t rtt, prrtByteCount_t delivered, } } } void BBR_CheckProbeRTT(BBR *bbr, PrrtPacketTracking *tracking) { if (bbr->state != PROBE_RTT && bbr->rtprop_expired && !bbr->idle_restart) { BBR_EnterProbeRTT(bbr); BBR_SaveCwnd(bbr); bbr->probe_rtt_done_stamp = 0; } if (bbr->state == PROBE_RTT) { BBR_HandleProbeRTT(bbr, tracking); } bbr->idle_restart = false; } void BBR_UpdateModel(BBR* bbr, PrrtChannelStateInformation* csi, PrrtRateSample* rs, PrrtPacketTracking* packetTracking) void BBR_UpdateModelAndState(BBR *bbr, PrrtChannelStateInformation *csi, PrrtRateSample *rs, PrrtPacketTracking *packetTracking) { BBR_UpdateBtlBw(bbr, rs, packetTracking->bytes_delivered, packetTracking->delivered); BBR_UpdateBtlBw(bbr, rs, packetTracking); BBR_CheckCyclePhase(bbr, packetTracking->packets_lost, packetTracking->prior_inflight); BBR_CheckFullPipe(bbr, rs); BBR_CheckDrain(bbr, packetTracking->packets_lost); BBR_UpdateRTprop(bbr, PrrtChannelStateInformation_get_rtprop(csi), packetTracking->delivered, packetTracking->packets_in_flight); BBR_CheckDrain(bbr, packetTracking->pipe); BBR_UpdateRTprop(bbr, PrrtChannelStateInformation_get_rtprop(csi)); BBR_CheckProbeRTT(bbr, packetTracking); } void BBR_UpdateTargetCwnd(BBR* bbr) Loading Loading @@ -211,7 +225,7 @@ void BBR_SetPacingRate(BBR* bbr) void BBR_OnACK(BBR* bbr, PrrtChannelStateInformation* csi, PrrtRateSample* rs, PrrtPacketTracking* packetTracking) { BBR_UpdateModel(bbr, csi, rs, packetTracking); BBR_UpdateModelAndState(bbr, csi, rs, packetTracking); BBR_SetPacingRate(bbr); Loading @@ -223,8 +237,7 @@ BBR* BBR_Init(void) BBR* bbr = calloc(1, sizeof(BBR)); check_mem(bbr); bbr->btlBwFilter = WindowedFilter_Init(true); //bbr->rtprop = SRTT ? SRTT : Inf; bbr->rtprop = UINT32_MAX; bbr->rtprop = Inf; bbr->rtprop_stamp = PrrtClock_get_current_time_us(); bbr->probe_rtt_done_stamp = 0; bbr->probe_rtt_round_done = false; Loading Loading @@ -283,6 +296,14 @@ uint32_t BBR_getState(BBR* bbr) { return bbr->state; } uint32_t BBR_getCycleIndex(BBR* bbr) { return bbr->cycle_index; } float BBR_getPacingGain(BBR* bbr) { return bbr->pacing_gain; } bool BBR_getFilledPipe(BBR* bbr) { return bbr->filled_pipe; } Loading
prrt/proto/bbr.h +6 −6 Original line number Diff line number Diff line Loading @@ -10,16 +10,14 @@ #include "types/rateSample.h" #include "../util/windowedFilter.h" #define BtlBwFilterLen 10 #define MTU 1500 #define RTpropFilterLen 10000000 //10s #define BBRHighGain ((2885 / 1000) + 1) #define ProbeRTTInterval 10 #define InitialCwnd 10 #define InitialCwnd (10 * MTU) #define BBRGainCycleLen 8 #define BBRMinPipeCwnd 4 //#define BBRMinPipeCwnd 200 #define BBRMinPipeCwnd (4 * MTU) #define ProbeRTTDuration 200000 //200ms #define Inf 0 //TODO: What is Inf #define Inf UINT32_MAX enum bbr_state { STARTUP, Loading Loading @@ -80,6 +78,8 @@ uint32_t BBR_getCwnd(BBR* bbr); prrtDeliveryRate_t BBR_getBtlBw(BBR* bbr); uint32_t BBR_getState(BBR* bbr); uint64_t BBR_getFullBw(BBR* bbr); float BBR_getPacingGain(BBR* bbr); uint32_t BBR_getCycleIndex(BBR* bbr); bool BBR_getFilledPipe(BBR* bbr); uint32_t BBR_getRTProp(BBR* bbr); Loading
prrt/proto/receiver.c +1 −1 Original line number Diff line number Diff line Loading @@ -185,7 +185,7 @@ bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNu recv->packetTracking->packets_delivered++; recv->packetTracking->packets_in_flight = (prrtSequenceNumber_t) (PrrtInFlightPacketStore_get_queue_size(recv->dataInflightPacketStore) + PrrtInFlightPacketStore_get_queue_size(recv->redundancyInflightPacketStore)); recv->packetTracking->prior_inflight = recv->packetTracking->packets_in_flight; recv->packetTracking->prior_inflight = recv->packetTracking->pipe; if(recv->rateSample != NULL && result) { BBR_OnACK(recv->bbr, recv->csi, recv->rateSample, recv->packetTracking); } Loading
prrt/proto/socket.c +7 −0 Original line number Diff line number Diff line Loading @@ -614,6 +614,13 @@ uint64_t PrrtSocket_get_full_bw(PrrtSocket *s) { return BBR_getFullBw(s->receiver->bbr); } float PrrtSocket_get_pacing_gain(PrrtSocket *s) { return BBR_getPacingGain(s->receiver->bbr); } uint32_t PrrtSocket_get_cycle_index(PrrtSocket *s) { return BBR_getCycleIndex(s->receiver->bbr); } bool PrrtSocket_get_app_limited(PrrtSocket *s) { return PrrtChannelStateInformation_get_app_limited(s->receiver->csi); Loading