Commit e6e59503 authored by rna's avatar rna
Browse files

Refactor receiver. Add condition variable.

parent 085ff10d
Loading
Loading
Loading
Loading
+97 −82
Original line number Diff line number Diff line
@@ -6,6 +6,15 @@
#include "stores/inFlightPacketStore.h"
#include "receiver.h"

prrtByteCount_t remove_packet_and_lost_data(PrrtReceiver *recv, PrrtPacket *packet,
                                            PrrtInFlightPacketStore *inflightPacketStore,
                                            prrtSequenceNumber_t seqnum) {
    recv->packetTracking->pipe -= packet->payloadLength;
    prrtByteCount_t lostBytes = PrrtInFlightPacketStore_remove_outstanding_packet(inflightPacketStore, seqnum);
    recv->packetTracking->pipe -= lostBytes;
    return lostBytes;
}

void update_rate_sample(PrrtRateSample *rateSample, PrrtPacket *packet, prrtTimestamp_t receiveTime,
                        PrrtPacketTracking *packetTracking) {
    if (packet->delivered_time == 0)
@@ -25,6 +34,69 @@ void update_rate_sample(PrrtRateSample *rateSample, PrrtPacket *packet, prrtTime
    packet->delivered_time = 0;
}

bool gnerate_rate_sample(PrrtRateSample *rateSample, PrrtPacketTracking *packetTracking) {
    /* Clear app-limited field */
    if (packetTracking->app_limited > 0 && packetTracking->delivered > packetTracking->app_limited)
        packetTracking->app_limited = 0;

    if (rateSample->prior_time == 0) {
        //printf("Prior Time is 0, Cancelling Rate sample calculation\n");
        return false;
    }

    prrtTimedelta_t interval = MAX(rateSample->send_elapsed, rateSample->ack_elapsed);
    if(interval < MIN_RTT) {
        return false;
    }

    rateSample->interval = interval;

    rateSample->delivered = packetTracking->delivered - rateSample->prior_delivered;

    if (rateSample->interval != 0) {
        // delivered: bytes; interval: us; convert to bps
        rateSample->delivery_rate = (uint32_t) round(
                (((double) rateSample->delivered) * 1000.0 * 1000.0) / ((double) rateSample->interval));
    }
    debug(DEBUG_FEEDBACK, "RS interval: %u, RS delivered: %u, RS delivery_rate: %u, App Limited: %u", rateSample->interval,
          rateSample->delivered, rateSample->delivery_rate, rateSample->is_app_limited);

    return true;
}

bool update_and_generate_rate_sample(PrrtReceiver *recv, prrtSequenceNumber_t seqnum, uint8_t packetType,
                                     prrtTimestamp_t receiveTime, prrtTimedelta_t rtt) {
    PrrtInFlightPacketStore *inflightPacketStore = NULL;
    if (packetType == PACKET_TYPE_DATA) {
        inflightPacketStore = recv->dataInflightPacketStore;
    } else if (packetType == PACKET_TYPE_REDUNDANCY) {
        inflightPacketStore = recv->redundancyInflightPacketStore;
    } else return false;
    bool result = false;

    PrrtPacket *packet = PrrtInFlightPacketStore_get_packet(inflightPacketStore, seqnum);
    if(packet != NULL) {
        update_rate_sample(recv->rateSample, packet, receiveTime, recv->packetTracking);
        prrtByteCount_t lostBytes = remove_packet_and_lost_data(recv, packet, inflightPacketStore, seqnum);
        recv->packetTracking->bytes_lost = lostBytes;

        if (lostBytes > 0) {
            BBR_OnSpuriousLoss(recv->bbr, recv->packetTracking);
        }
        if (lostBytes == 0) {
            BBR_OnLossExit(recv->bbr);
        }

        result = gnerate_rate_sample(recv->rateSample, recv->packetTracking);
        recv->packetTracking->prior_inflight = recv->packetTracking->pipe;

        if(recv->rateSample != NULL) {
            BBR_OnACK(recv->bbr, recv->csi, recv->rateSample, recv->packetTracking, rtt);
        }
    }
    return result;
}

PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port, prrtByteCount_t mss) {
    PrrtReceiver *recv = calloc(1, sizeof(PrrtReceiver));
    check_mem(recv);
@@ -63,6 +135,9 @@ PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port, prrtByteCount
    check(pthread_mutexattr_init(&attr) == EXIT_SUCCESS, "Mutex attr init failed.");
    check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == EXIT_SUCCESS, "Setting type failed.");
    check(pthread_mutex_init(&recv->lock, &attr) == 0, "lock init failed.");

    check(pthread_cond_init(&recv->wait_for_space, NULL) == EXIT_SUCCESS, "Condition init failed.");

    return recv;

    error:
@@ -103,6 +178,7 @@ bool PrrtReceiver_destroy(PrrtReceiver *receiver) {
    freeaddrinfo(receiver->ai);
    free((void *) receiver->host_name);
    check(pthread_mutex_destroy(&receiver->lock) == 0, "lock destroy failed.");
    check(pthread_cond_destroy(&receiver->wait_for_space) == 0, "cond destroy failed.");
    free(receiver);
    return true;

@@ -112,13 +188,6 @@ bool PrrtReceiver_destroy(PrrtReceiver *receiver) {
}


prrtByteCount_t PrrtReceiver_removePacketAndLostData(PrrtReceiver *recv, PrrtPacket *packet, PrrtInFlightPacketStore *inflightPacketStore,
                                                     prrtSequenceNumber_t seqnum) {
    recv->packetTracking->pipe -= packet->payloadLength;
    prrtByteCount_t lostBytes = PrrtInFlightPacketStore_remove_outstanding_packet(inflightPacketStore, seqnum);
    recv->packetTracking->pipe -= lostBytes;
    return lostBytes;
}

void PrrtReceiver_check_rto(PrrtReceiver *recv, prrtSequenceNumber_t seqNum, prrtPacketType_t packetType) {
    prrtByteCount_t lostBytes = 0;
@@ -136,7 +205,7 @@ void PrrtReceiver_check_rto(PrrtReceiver *recv, prrtSequenceNumber_t seqNum, prr
        prrtTimeDifference_t diff = ((prrtTimeDifference_t) PrrtClock_get_current_time_us()) - (packet->sent_time + 3 * BBR_getRTProp(recv->bbr));
        if(diff > 0) {
            lostBytes += packet->payloadLength;
            lostBytes += PrrtReceiver_removePacketAndLostData(recv, packet, inflightPacketStore, packet->sequenceNumber);
            lostBytes += remove_packet_and_lost_data(recv, packet, inflightPacketStore, packet->sequenceNumber);
            recv->packetTracking->bytes_lost = lostBytes;

            if (lostBytes > 0) {
@@ -147,6 +216,7 @@ void PrrtReceiver_check_rto(PrrtReceiver *recv, prrtSequenceNumber_t seqNum, prr
            }
        }
    }
    pthread_cond_broadcast(&recv->wait_for_space);
    check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed.");
    return;

@@ -187,12 +257,14 @@ void PrrtReceiver_on_ack(PrrtReceiver *receiver, PrrtPacketFeedbackPayload * fee
    //debug(DEBUG_RECEIVER, "PrrtReceiver_on_ack");
    PrrtChannelStateInformation_update_plr(receiver->csi, feedbackPayload->erasureCount, feedbackPayload->packetCount);

    bool valid_sample = PrrtReceiver_updateAndGenerateRateSample(receiver, feedbackPayload->ackSequenceNumber, feedbackPayload->ackPacketType, receiveTime, rtt);
    bool valid_sample = update_and_generate_rate_sample(receiver, feedbackPayload->ackSequenceNumber,
                                                        feedbackPayload->ackPacketType, receiveTime, rtt);
    if(valid_sample) {
        PrrtChannelStateInformation_update_delivery_rate(receiver->csi, receiver->rateSample->delivery_rate);
    }
    PrrtChannelStateInformation_update_app_limited(receiver->csi, receiver->rateSample->is_app_limited);

    pthread_cond_broadcast(&receiver->wait_for_space);
    check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
    return;

@@ -211,97 +283,38 @@ double PrrtReceiver_getBBRPacingRate(PrrtReceiver *receiver) {
    return 0.0;
}

prrtByteCount_t PrrtReceiver_getBBRCwnd(PrrtReceiver *receiver) {
prrtByteCount_t PrrtReceiver_get_space(PrrtReceiver *receiver) {
    check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
    prrtByteCount_t res = BBR_getCwnd(receiver->bbr);
    prrtByteCount_t cwnd = BBR_getCwnd(receiver->bbr);
    prrtByteCount_t pipe = receiver->packetTracking->pipe;
    check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
    return res;
    return cwnd - pipe;

    error:
    PERROR("Mutex error.%s", "");
    return 0;
}

prrtDeliveryRate_t PrrtReceiver_getBBRBtlBw(PrrtReceiver *receiver) {
void PrrtReceiver_wait_for_space(PrrtReceiver *receiver) {
    check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
    prrtDeliveryRate_t res = BBR_getBtlBw(receiver->bbr);
    pthread_cond_wait(&receiver->wait_for_space, &receiver->lock);
    check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
    return res;
    return;

    error:
    PERROR("Mutex error.%s", "");
    return 0;
}


bool PrrtReceiver_generateRateSample(PrrtRateSample *rateSample, PrrtPacketTracking *packetTracking) {
    /* Clear app-limited field */
    if (packetTracking->app_limited > 0 && packetTracking->delivered > packetTracking->app_limited)
        packetTracking->app_limited = 0;

    if (rateSample->prior_time == 0) {
        //printf("Prior Time is 0, Cancelling Rate sample calculation\n");
        return false;
    }

    prrtTimedelta_t interval = MAX(rateSample->send_elapsed, rateSample->ack_elapsed);
    if(interval < MIN_RTT) {
        return false;
    }

    rateSample->interval = interval;

    rateSample->delivered = packetTracking->delivered - rateSample->prior_delivered;

    if (rateSample->interval != 0) {
        // delivered: bytes; interval: us; convert to bps
        rateSample->delivery_rate = (uint32_t) round(
                (((double) rateSample->delivered) * 1000.0 * 1000.0) / ((double) rateSample->interval));
    }
    debug(DEBUG_FEEDBACK, "RS interval: %u, RS delivered: %u, RS delivery_rate: %u, App Limited: %u", rateSample->interval,
          rateSample->delivered, rateSample->delivery_rate, rateSample->is_app_limited);

    return true;
}

bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNumber_t seqnum, uint8_t packetType,
                                              prrtTimestamp_t receiveTime, prrtTimedelta_t rtt) {
    PrrtInFlightPacketStore *inflightPacketStore = NULL;
    if (packetType == PACKET_TYPE_DATA) {
        inflightPacketStore = recv->dataInflightPacketStore;
    } else if (packetType == PACKET_TYPE_REDUNDANCY) {
        inflightPacketStore = recv->redundancyInflightPacketStore;
    } else return false;
    bool result = false;

    check(pthread_mutex_lock(&recv->lock) == 0, "Lock failed.");
    PrrtPacket *packet = PrrtInFlightPacketStore_get_packet(inflightPacketStore, seqnum);
    if(packet != NULL) {
        update_rate_sample(recv->rateSample, packet, receiveTime, recv->packetTracking);
        prrtByteCount_t lostBytes = PrrtReceiver_removePacketAndLostData(recv, packet, inflightPacketStore, seqnum);
        recv->packetTracking->bytes_lost = lostBytes;

        if (lostBytes > 0) {
            BBR_OnSpuriousLoss(recv->bbr, recv->packetTracking);
        }
        if (lostBytes == 0) {
            BBR_OnLossExit(recv->bbr);
        }

        result = PrrtReceiver_generateRateSample(recv->rateSample, recv->packetTracking);
        recv->packetTracking->prior_inflight = recv->packetTracking->pipe;

        if(recv->rateSample != NULL) {
            BBR_OnACK(recv->bbr, recv->csi, recv->rateSample, recv->packetTracking, rtt);
        }
    return;
}
    check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed.");

    return result;
prrtDeliveryRate_t PrrtReceiver_get_BBR_btlDr(PrrtReceiver *receiver) {
    check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
    prrtDeliveryRate_t res = BBR_getBtlBw(receiver->bbr);
    check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
    return res;

    error:
    PERROR("Mutex error.%s", "");
    return false;
    return 0;
}

void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *packet, prrtTimestamp_t sentTime) {
@@ -326,6 +339,8 @@ void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *p
    packet->is_app_limited = (recv->packetTracking->app_limited != 0);

    PrrtInFlightPacketStore_add_outstanding_packet(packetStore, packet);

    pthread_cond_broadcast(&recv->wait_for_space);
    check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed.");

    return;
+5 −5
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@ typedef struct prrtReceiver {
    struct addrinfo *ai;
    PrrtChannelStateInformation *csi;
    pthread_mutex_t lock;
    pthread_cond_t wait_for_space;

    BBR* bbr;

@@ -29,9 +30,6 @@ typedef struct prrtReceiver {

PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port, prrtByteCount_t mss);

bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNumber_t seqnum, uint8_t packetType,
                                              prrtTimestamp_t receiveTime, prrtTimedelta_t rtt);

void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *packet, prrtTimestamp_t sentTime);
prrtByteCount_t PrrtReceiver_get_pipe(PrrtReceiver *recv);
void PrrtReceiver_check_rto(PrrtReceiver *recv, prrtSequenceNumber_t i, prrtPacketType_t param);
@@ -39,9 +37,11 @@ void PrrtReceiver_check_rto(PrrtReceiver *recv, prrtSequenceNumber_t i, prrtPack
void PrrtReceiver_on_application_write(PrrtReceiver *receiver, uint32_t i, prrtSequenceNumber_t i1);
void PrrtReceiver_on_ack(PrrtReceiver *receiver, PrrtPacketFeedbackPayload *  feedbackPayload, prrtTimestamp_t receiveTime, prrtTimedelta_t rtt);

void PrrtReceiver_wait_for_space(PrrtReceiver *receiver);

prrtByteCount_t PrrtReceiver_get_space(PrrtReceiver *receiver);
double PrrtReceiver_getBBRPacingRate(PrrtReceiver *receiver);
prrtDeliveryRate_t PrrtReceiver_getBBRBtlBw(PrrtReceiver *receiver);
prrtByteCount_t PrrtReceiver_getBBRCwnd(PrrtReceiver *receiver);
prrtDeliveryRate_t PrrtReceiver_get_BBR_btlDr(PrrtReceiver *receiver);

bool PrrtReceiver_destroy(PrrtReceiver *receiver);