Commit 17d15612 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Channel state information uses proper atomics and locking.

parent 62cbf1dc
Loading
Loading
Loading
Loading
+57 −39
Original line number Diff line number Diff line
@@ -32,66 +32,84 @@ PrrtChannelStateInformation * PrrtChannelStateInformation_create()
    return NULL;
}

void PrrtChannelStateInformation_update_rtprop(PrrtChannelStateInformation *csi, prrtTimedelta_t rtprop)
bool PrrtChannelStateInformation_destroy(PrrtChannelStateInformation *csi)
{
    pthread_mutex_lock(&csi->lock);
    prrtTimestamp_t now = PrrtClock_get_current_time_us();
    csi->rtprop_expired = now > (csi->rtprop_stamp + csi->rtprop_filter_length_us);
    if (rtprop >= 0 && (rtprop <= csi->rtprop || csi->rtprop_expired)) {
        csi->rtprop = rtprop;
        csi->rtprop_stamp = now;
    }
    pthread_mutex_unlock(&csi->lock);
    check(pthread_mutex_destroy(&csi->lock) == EXIT_SUCCESS, "Destroy mutex failed.");
    free(csi);
    return true;
    error:
    return false;
}

// PLR
void PrrtChannelStateInformation_update_plr(PrrtChannelStateInformation *csi, prrtSequenceNumber_t erasures,
                                            prrtSequenceNumber_t packets) {
    pthread_mutex_lock(&csi->lock);
    check(pthread_mutex_lock(&csi->lock) == EXIT_SUCCESS, "Lock failed.");
    csi->plr = ((float) erasures) / packets;
    pthread_mutex_unlock(&csi->lock);
    check(pthread_mutex_unlock(&csi->lock) == EXIT_SUCCESS, "Unlock failed.");
    return;
    error:
    PERROR("PrrtChannelStateInformation_update_plr() failed.");
}

prrtPacketLossRate_t PrrtChannelStateInformation_get_plr(PrrtChannelStateInformation *csi) {
    check(pthread_mutex_lock(&csi->lock) == EXIT_SUCCESS, "Lock failed.");
    prrtPacketLossRate_t res = csi->plr;
    check(pthread_mutex_unlock(&csi->lock) == EXIT_SUCCESS, "Unlock failed.");
    return res;
    error:
    PERROR("PrrtChannelStateInformation_get_plr() failed.");
    return 0;
}

void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformation *csi, PrrtPacket* packet, prrtDeliveryRate_t rate) {
    pthread_mutex_lock(&csi->lock);
    csi->deliveryRate = rate;
    pthread_mutex_unlock(&csi->lock);
// RTprop
void PrrtChannelStateInformation_update_rtprop(PrrtChannelStateInformation *csi, prrtTimedelta_t rtprop)
{
    check(pthread_mutex_lock(&csi->lock) == EXIT_SUCCESS, "Lock failed.");
    prrtTimestamp_t now = PrrtClock_get_current_time_us();
    csi->rtprop_expired = now > (csi->rtprop_stamp + csi->rtprop_filter_length_us);
    if (rtprop >= 0 && (rtprop <= csi->rtprop || csi->rtprop_expired)) {
        csi->rtprop = rtprop;
        csi->rtprop_stamp = now;
    }
void PrrtChannelStateInformation_update_app_limited(PrrtChannelStateInformation *csi, bool appLimited) {
    pthread_mutex_lock(&csi->lock);
    csi->appLimited = appLimited;
    pthread_mutex_unlock(&csi->lock);
    check(pthread_mutex_unlock(&csi->lock) == EXIT_SUCCESS, "Unlock failed.");
    return;
    error:
    PERROR("PrrtChannelStateInformation_update_rtprop() failed.");
}

prrtTimedelta_t PrrtChannelStateInformation_get_rtprop(PrrtChannelStateInformation *csi)
{
    pthread_mutex_lock(&csi->lock);
    prrtTimedelta_t res = csi->rtprop;
    check(pthread_mutex_lock(&csi->lock) == EXIT_SUCCESS, "Lock failed.");
    prrtAtomicTimedelta_t res = csi->rtprop;
    pthread_mutex_unlock(&csi->lock);
    return res;
}

bool PrrtChannelStateInformation_destroy(PrrtChannelStateInformation *csi)
{
    check(pthread_mutex_destroy(&csi->lock) == EXIT_SUCCESS, "Destroy mutex failed.");
    free(csi);
    return true;
    check(pthread_mutex_unlock(&csi->lock) == EXIT_SUCCESS, "Unlock failed.");
    return (prrtTimedelta_t) res;
    error:
        return false;
    PERROR("PrrtChannelStateInformation_get_rtprop() failed.");
    return 0;
}

prrtPacketLossRate_t PrrtChannelStateInformation_get_plr(PrrtChannelStateInformation *csi) {
    return csi->plr;
// Delivery Rate
void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformation *csi, prrtDeliveryRate_t rate) {
    atomic_store_explicit(&csi->deliveryRate, rate, memory_order_release);
}

prrtDeliveryRate_t PrrtChannelStateInformation_get_delivery_rate(PrrtChannelStateInformation *csi) {
    return csi->deliveryRate;
    return (prrtDeliveryRate_t) atomic_load_explicit(&csi->deliveryRate, memory_order_acquire);
}

prrtDeliveryRate_t PrrtChannelStateInformation_get_btlbw(PrrtChannelStateInformation *csi) {
    return csi->btlbw;
// App Limited
void PrrtChannelStateInformation_update_app_limited(PrrtChannelStateInformation *csi, bool appLimited) {
    atomic_store_explicit(&csi->appLimited, appLimited, memory_order_release);
}

bool PrrtChannelStateInformation_get_app_limited(PrrtChannelStateInformation *csi) {
    return csi->appLimited;
    return atomic_load_explicit(&csi->appLimited, memory_order_acquire);
}

// BtlBw
prrtDeliveryRate_t PrrtChannelStateInformation_get_btlbw(PrrtChannelStateInformation *csi) {
    return (prrtDeliveryRate_t) atomic_load_explicit(&csi->btlbw, memory_order_acquire);
}
+5 −5
Original line number Diff line number Diff line
@@ -7,22 +7,22 @@
typedef struct prrtChannelStateInformation {
    pthread_mutex_t lock;

    prrtTimedelta_t rtprop;
    prrtAtomicTimedelta_t rtprop;
    prrtTimestamp_t rtprop_stamp;
    prrtTimedelta_t rtprop_filter_length_us;
    bool rtprop_expired;

    prrtPacketLossRate_t plr;

    prrtDeliveryRate_t deliveryRate;
    prrtAtomicDeliveryRate_t deliveryRate;

    prrtDeliveryRate_t btlbw;
    prrtAtomicDeliveryRate_t btlbw;
    prrtByteCount_t btlbw_next_round_delivered;
    bool btlbw_round_start;
    uint32_t btlbw_round_count;
    uint8_t btlbw_filter_length;

    bool appLimited;
    atomic_bool appLimited;
} PrrtChannelStateInformation;

PrrtChannelStateInformation* PrrtChannelStateInformation_create(void);
@@ -35,7 +35,7 @@ bool PrrtChannelStateInformation_get_app_limited(PrrtChannelStateInformation *cs
prrtPacketLossRate_t PrrtChannelStateInformation_get_plr(PrrtChannelStateInformation* csi);
void PrrtChannelStateInformation_update_plr(PrrtChannelStateInformation *csi, prrtSequenceNumber_t erasures,
                                                   prrtSequenceNumber_t packets);
void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformation *csi, PrrtPacket* packet, prrtDeliveryRate_t rate);
void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformation *csi, prrtDeliveryRate_t rate);
void PrrtChannelStateInformation_update_app_limited(PrrtChannelStateInformation *csi, bool appLimited);
bool PrrtChannelStateInformation_destroy(PrrtChannelStateInformation* csi);

+2 −1
Original line number Diff line number Diff line
@@ -213,7 +213,8 @@ void handle_feedback_packet(PrrtSocket *prrtSocket, PrrtPacket *prrtPacket, prrt
    debug(DEBUG_DATARECEIVER, "PrrtReceiver_updateAndGenerateRateSample ");

    if(valid_sample) {
        PrrtChannelStateInformation_update_delivery_rate(prrtSocket->receiver->csi, prrtPacket, prrtSocket->receiver->rateSample->delivery_rate);
        PrrtChannelStateInformation_update_delivery_rate(prrtSocket->receiver->csi,
                                                         prrtSocket->receiver->rateSample->delivery_rate);
    }
    PrrtChannelStateInformation_update_app_limited(prrtSocket->receiver->csi, prrtSocket->receiver->rateSample->is_app_limited);
    debug(DEBUG_DATARECEIVER, "PrrtChannelStateInformation_update_app_limited ");
+8 −0
Original line number Diff line number Diff line
@@ -19,12 +19,20 @@ typedef enum {

typedef uint16_t prrtSequenceNumber_t;
typedef uint8_t prrtIndex_t;

typedef uint32_t prrtTimestamp_t; // microsecond
typedef atomic_uint_fast32_t prrtAtomicTimestamp_t;

typedef uint32_t prrtTimedelta_t; // microsecond
typedef atomic_uint_fast32_t prrtAtomicTimedelta_t;

typedef int32_t prrtTimeDifference_t; // microsecond
typedef uint32_t prrtPacketLength_t; // bytes
typedef float prrtPacketLossRate_t;

typedef uint32_t prrtDeliveryRate_t; // bits per second
typedef atomic_uint_fast32_t prrtAtomicDeliveryRate_t;

typedef uint32_t prrtByteCount_t;
typedef uint8_t prrtPacketType_t;