Commit 00c47278 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Fix BBR.

* Add locking.
* RTO prototype.
* Proper pacing.
parent 6db56833
Loading
Loading
Loading
Loading
Loading
+129 −18
Original line number Diff line number Diff line
@@ -237,38 +237,72 @@ void BBR_SetSendQuantum(BBR* bbr) {

void BBR_OnACK(BBR* bbr, PrrtChannelStateInformation* csi, PrrtRateSample* rs, PrrtPacketTracking* packetTracking, prrtTimedelta_t rtt)
{
    check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
    BBR_UpdateModelAndState(bbr, csi, rs, packetTracking, rtt);

    BBR_SetPacingRate(bbr);
    BBR_SetSendQuantum(bbr);
    BBR_SetCwnd(bbr, packetTracking);
    check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
    return;
    error:
    PERROR("BBR_OnACK failed.")
}

void BBR_OnLoss(BBR *bbr, PrrtPacketTracking *tracking) {
void BBR_OnSpuriousLoss(BBR *bbr, PrrtPacketTracking *tracking) {
    check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
    if(!bbr->is_loss_recovery) {
        bbr->is_loss_recovery = true;
        bbr->loss_recovery_stamp = PrrtClock_get_current_time_us();
        bbr->prior_cwnd = BBR_SaveCwnd(bbr);
        bbr->cwnd = SMSS;
        //bbr->cwnd = tracking->pipe + MAX(tracking->delivered, 1);
        //bbr->packet_conservation = true;
        bbr->cwnd = tracking->pipe + MAX(tracking->delivered, 1);
        bbr->packet_conservation = true;
    } else if (PrrtClock_get_current_time_us() > bbr->loss_recovery_stamp + bbr->rtprop){
        bbr->packet_conservation = false;
    }
    check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
    return;
    error:
    PERROR("BBR_OnACK failed.")
}

void BBR_OnRTOLoss(BBR *bbr) {
    check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
    if(!bbr->is_loss_recovery) {
        bbr->is_loss_recovery = true;
        bbr->prior_cwnd = BBR_SaveCwnd(bbr);
        bbr->cwnd = SMSS;
    }
    check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
    return;
    error:
    PERROR("BBR_OnACK failed.")
}

void BBR_OnLossExit(BBR *bbr) {
    check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
    if(bbr->is_loss_recovery) {
        bbr->is_loss_recovery = false;
        bbr->packet_conservation = false;
        BBR_RestoreCwnd(bbr);
    }
    check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
    return;
    error:
    PERROR("BBR_OnACK failed.")
}

BBR* BBR_Init(void)
{
    BBR* bbr = calloc(1, sizeof(BBR));
    check_mem(bbr);

    pthread_mutexattr_t attr;
    check(pthread_mutexattr_init(&attr) == EXIT_SUCCESS, "Mutex attr init failed.");
    check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == EXIT_SUCCESS, "Setting type failed.");
    check(pthread_mutex_init(&bbr->lock, &attr) == 0, "lock init failed.");


    bbr->has_seen_rtt = false;
    bbr->btlBwFilter = WindowedFilter_Init(true);
    bbr->rtprop = Inf;
@@ -277,6 +311,7 @@ BBR* BBR_Init(void)
    bbr->probe_rtt_round_done = false;
    bbr->packet_conservation = false;
    bbr->prior_cwnd = 0;
    bbr->cwnd = InitialCwnd;
    bbr->idle_restart = false;
    bbr->is_loss_recovery = false;

@@ -294,8 +329,8 @@ BBR* BBR_Init(void)
    double nominal_bandwidth = InitialCwnd / (bbr->has_seen_rtt ? bbr->rtprop : 1000);
    bbr->pacing_rate = bbr->pacing_gain * nominal_bandwidth;


    BBR_EnterStartup(bbr);

    return bbr;

    error:
@@ -306,57 +341,133 @@ BBR* BBR_Init(void)
void BBR_destroy(BBR* bbr)
{
    WindowedFilter_destroy(bbr->btlBwFilter);
    check(pthread_mutex_destroy(&bbr->lock) == 0, "lock destroy failed.");
    free(bbr);
    return;
    error:
    PERROR("BBR_destroy failed%s.", "");
}

double BBR_getPacingRate(BBR* bbr)
{
    return bbr->pacing_rate;
    check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
    double res = bbr->pacing_rate;
    check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
    return res;
    error:
    PERROR("BBR_getPacingRate failed.")
    return 0;
}

prrtByteCount_t BBR_getCwnd(BBR* bbr)
{
    return bbr->cwnd;
    check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
    prrtByteCount_t res = bbr->cwnd;
    check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
    return res;
    error:
    PERROR("BBR_getCwnd failed.")
    return 0;
}

prrtDeliveryRate_t BBR_getBtlBw(BBR* bbr)
{
    return bbr->bw;
    check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
    prrtDeliveryRate_t res = bbr->bw;
    check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
    return res;
    error:
    PERROR("BBR_getBtlBw failed.")
    return 0;
}

uint32_t BBR_getRTProp(BBR* bbr)
{
    return bbr->rtprop;
    check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
    uint32_t res = bbr->rtprop;
    check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
    return res;
    error:
    PERROR("BBR_getRTProp failed.")
    return 0;
}

uint32_t BBR_getState(BBR* bbr) {
    return bbr->state;
    check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
    uint32_t res = bbr->state;
    check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
    return res;
    error:
    PERROR("BBR_getState failed.")
    return 0;
}

uint32_t BBR_getCycleIndex(BBR* bbr) {
    return bbr->cycle_index;
    check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
    uint32_t res = bbr->cycle_index;
    check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
    return res;
    error:
    PERROR("BBR_getCycleIndex failed.")
    return 0;
}

double BBR_getPacingGain(BBR* bbr) {
    return bbr->pacing_gain;
    check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
    double res = bbr->pacing_gain;
    check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
    return res;
    error:
    PERROR("BBR_getPacingGain failed.")
    return 0;
}

bool BBR_getFilledPipe(BBR* bbr) {
    return bbr->filled_pipe;
    check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
    bool res = bbr->filled_pipe;
    check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
    return res;
    error:
    PERROR("BBR_getFilledPipe failed.")
    return 0;
}

bool BBR_getRoundStart(BBR* bbr) {
    return bbr->round_start;
    check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
    bool res = bbr->round_start;
    check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
    return res;
    error:
    PERROR("BBR_getRoundStart failed.")
    return 0;
}

uint64_t BBR_getFullBw(BBR* bbr) {
    return bbr->full_bw;
prrtByteCount_t BBR_getFullBw(BBR* bbr) {
    check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
    prrtByteCount_t res = bbr->full_bw;
    check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
    return res;
    error:
    PERROR("BBR_getInflight failed.")
    return 0;
}

prrtByteCount_t BBR_getInflight(BBR* bbr) {
    return BBR_Inflight(bbr, bbr->pacing_gain);
    check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
    prrtByteCount_t res = BBR_Inflight(bbr, bbr->pacing_gain);
    check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
    return res;
    error:
    PERROR("BBR_getInflight failed.")
    return 0;
}

prrtByteCount_t BBR_getSendQuantum(BBR* bbr) {
    return bbr->send_quantum;
    check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
    prrtByteCount_t res = bbr->send_quantum;
    check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
    return res;
    error:
    PERROR("BBR_getSendQuantum failed.")
    return 0;
}
 No newline at end of file
+6 −3
Original line number Diff line number Diff line
@@ -27,6 +27,8 @@ enum bbr_state {
};

typedef struct bbr {
    pthread_mutex_t lock;

    prrtTimedelta_t rtprop;
    prrtTimestamp_t rtprop_stamp;
    prrtTimestamp_t probe_rtt_done_stamp;
@@ -43,7 +45,7 @@ typedef struct bbr {
    float cwnd_gain;

    bool filled_pipe;
    uint64_t full_bw;
    prrtByteCount_t full_bw;
    uint32_t full_bw_count;

    double pacing_rate;
@@ -73,7 +75,8 @@ typedef struct bbr {

BBR* BBR_Init(void);
void BBR_OnACK(BBR* bbr, PrrtChannelStateInformation* csi, PrrtRateSample* rs, PrrtPacketTracking* packetTracking, prrtTimedelta_t rtt);
void BBR_OnLoss(BBR *bbr, PrrtPacketTracking *tracking);
void BBR_OnSpuriousLoss(BBR *bbr, PrrtPacketTracking *tracking);
void BBR_OnRTOLoss(BBR *bbr);
void BBR_OnLossExit(BBR *bbr);
void BBR_destroy(BBR* bbr);

@@ -81,7 +84,7 @@ double BBR_getPacingRate(BBR* bbr);
prrtByteCount_t BBR_getCwnd(BBR* bbr);
prrtDeliveryRate_t BBR_getBtlBw(BBR* bbr);
uint32_t BBR_getState(BBR* bbr);
uint64_t BBR_getFullBw(BBR* bbr);
prrtByteCount_t BBR_getFullBw(BBR* bbr);
double BBR_getPacingGain(BBR* bbr);
uint32_t BBR_getCycleIndex(BBR* bbr);
bool BBR_getFilledPipe(BBR* bbr);
+24 −13
Original line number Diff line number Diff line
@@ -94,18 +94,29 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
        default:;
    }

    double payloadLength = (double)packet->payloadLength;
    if (sock_ptr->pacingEnabled) {
        prrtTimeDifference_t diff;
        prrtByteCount_t space;
    prrtPacketLength_t payloadLength = packet->payloadLength;
    if (sock_ptr->pacingEnabled && sock_ptr->nextSendTime != 0) {
        debug(DEBUG_DATATRANSMITTER, "About to check for pacing.");
        prrtTimeDifference_t diff = 0;
        int64_t space;
        do {
            
            diff = sock_ptr->nextSendTime - PrrtClock_get_current_time_us();
            space = MAX(BBR_getCwnd(sock_ptr->receiver->bbr) - PrrtReceiver_get_pipe(sock_ptr->receiver), 0);
            if (diff > 0 && payloadLength <= space) {
                usleep_nano(diff / 2);
            }
        } while (diff > 0 && payloadLength <= space);
            prrtTimeDifference_t now = (prrtTimeDifference_t) PrrtClock_get_current_time_us();
            diff = ((prrtTimeDifference_t) sock_ptr->nextSendTime) - now;
            prrtByteCount_t cwnd = BBR_getCwnd(sock_ptr->receiver->bbr);
            prrtByteCount_t pipe = PrrtReceiver_get_pipe(sock_ptr->receiver);
            space = cwnd - pipe;
            debug(DEBUG_DATATRANSMITTER, "C: %u, P: %u, S: %d", cwnd, pipe, space);
            if(space < 0) {
                space = 0;
                diff = 10;
            }
            if (diff > 0 || payloadLength > space) {
                debug(DEBUG_DATATRANSMITTER, "S: %u, Pacing for %d (%u).", packet->sequenceNumber, diff, now);
                debug(DEBUG_DATATRANSMITTER, "P: %u, S: %d, N: %d, %u", payloadLength, space, diff, sock_ptr->nextSendTime);
                usleep_nano((uint32_t) diff);
            }
            //PrrtReceiver_check_rto(sock_ptr->receiver, packet->sequenceNumber, PrrtPacket_type(packet));
        } while (diff > 0 || payloadLength > space);
    } else {
        usleep_nano(1);
    }
@@ -114,9 +125,9 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
    if (sock_ptr->pacingEnabled) {
        double pacing_rate = BBR_getPacingRate(sock_ptr->receiver->bbr);
        if(pacing_rate != 0) {
            debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %u, Pacing Time: %u", packet->payloadLength, pacing_rate, packet->payloadLength /
                                                                                                                             pacing_rate);
            sock_ptr->nextSendTime = now + ((prrtTimedelta_t) round(((1000 * 1000 * payloadLength) / pacing_rate)));
            prrtTimedelta_t pacingTime = (prrtTimedelta_t) round(((1000 * 1000 * ( (double)payloadLength)) / pacing_rate));
            debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %f, Pacing Time: %u", packet->payloadLength, pacing_rate, pacingTime);
            sock_ptr->nextSendTime = now + pacingTime;
        }
    }

+46 −7
Original line number Diff line number Diff line
@@ -100,6 +100,50 @@ bool PrrtReceiver_destroy(PrrtReceiver *receiver) {
}


prrtByteCount_t PrrtReceiver_removePacketAndLostData(PrrtReceiver *recv, PrrtPacket *packet, PrrtInFlightPacketStore *inflightPacketStore,
                                                     prrtSequenceNumber_t seqnum) {
    recv->packetTracking->pipe -= packet->payloadLength;
    prrtByteCount_t lostBytes = PrrtInFlightPacketStore_remove_outstanding_packet(inflightPacketStore, seqnum);
    recv->packetTracking->pipe -= lostBytes;
    return lostBytes;
}

void PrrtReceiver_check_rto(PrrtReceiver *recv, prrtSequenceNumber_t seqNum, prrtPacketType_t packetType) {
    prrtByteCount_t lostBytes = 0;
    PrrtInFlightPacketStore *inflightPacketStore = NULL;
    if (packetType == PACKET_TYPE_DATA) {
        inflightPacketStore = recv->dataInflightPacketStore;
    } else if (packetType == PACKET_TYPE_REDUNDANCY) {
        inflightPacketStore = recv->redundancyInflightPacketStore;
    } else return;

    check(pthread_mutex_lock(&recv->lock) == 0, "Lock failed.");

    PrrtPacket* packet = PrrtInFlightPacketStore_get_first_packet_before(inflightPacketStore, seqNum);
    if(packet != NULL) {
        prrtTimeDifference_t diff = ((prrtTimeDifference_t) PrrtClock_get_current_time_us()) - (packet->sent_time + 3 * BBR_getRTProp(recv->bbr));
        if(diff > 0) {
            lostBytes += packet->payloadLength;
            lostBytes += PrrtReceiver_removePacketAndLostData(recv, packet, inflightPacketStore, packet->sequenceNumber);
            recv->packetTracking->bytes_lost = lostBytes;

            if (lostBytes > 0) {
                BBR_OnRTOLoss(recv->bbr);
            }
            if (lostBytes == 0) {
                BBR_OnLossExit(recv->bbr);
            }
        }
    }
    check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed.");
    return;

    error:
    PERROR("PrrtReceiver_check_rto failed.")
    return;
}


prrtByteCount_t PrrtReceiver_get_pipe(PrrtReceiver *recv) {
    prrtByteCount_t res = 0;
    check(pthread_mutex_lock(&recv->lock) == 0, "Lock failed.");
@@ -207,21 +251,16 @@ bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNu
        pthread_cond_wait(&recv->recordNotFoundCv, &recv->lock);
        packet = PrrtInFlightPacketStore_get_packet(inflightPacketStore, seqnum);
    }
    recv->packetTracking->pipe -= packet->payloadLength;
    prrtByteCount_t lostBytes = PrrtInFlightPacketStore_remove_outstanding_packet(inflightPacketStore, seqnum);
    recv->packetTracking->pipe -= lostBytes;
    prrtByteCount_t lostBytes = PrrtReceiver_removePacketAndLostData(recv, packet, inflightPacketStore, seqnum);
    recv->packetTracking->bytes_lost = lostBytes;

    // TODO: Experimental fast recovery.
    if (lostBytes > 0) {
        BBR_OnLoss(recv->bbr, recv->packetTracking);
        BBR_OnSpuriousLoss(recv->bbr, recv->packetTracking);
    }
    if (lostBytes == 0) {
        BBR_OnLossExit(recv->bbr);
    }



    PrrtReceiver_updateRateSample(recv->rateSample, packet, receiveTime, recv->packetTracking);
    bool result = PrrtReceiver_generateRateSample(recv->rateSample, recv->packetTracking);
    recv->packetTracking->prior_inflight = recv->packetTracking->pipe;
+1 −0
Original line number Diff line number Diff line
@@ -36,6 +36,7 @@ bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNu

void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *packet, prrtTimestamp_t sentTime);
prrtByteCount_t PrrtReceiver_get_pipe(PrrtReceiver *recv);
void PrrtReceiver_check_rto(PrrtReceiver *recv, prrtSequenceNumber_t i, prrtPacketType_t param);

void PrrtReceiver_on_application_write(PrrtReceiver *receiver, uint32_t i);
void PrrtReceiver_on_ack(PrrtReceiver *receiver, PrrtPacketFeedbackPayload *  feedbackPayload, prrtTimestamp_t receiveTime, prrtTimedelta_t rtt);
Loading