Commit 43734ead authored by Andreas Schmidt's avatar Andreas Schmidt

Add bindings for btlbw.

parent b3c8b737
Pipeline #2147 failed with stages
in 11 seconds
......@@ -146,6 +146,7 @@ cdef extern from "proto/socket.h":
uint32_t PrrtSocket_get_rtt(PrrtSocket *socket)
float PrrtSocket_get_plr(PrrtSocket *socket)
uint32_t PrrtSocket_get_delivery_rate(PrrtSocket *socket)
uint32_t PrrtSocket_get_btlbw(PrrtSocket *socket)
bint PrrtSocket_get_app_limited(PrrtSocket *socket)
bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket)
......
......@@ -4,6 +4,7 @@
#include "../util/dbg.h"
#include "channelStateInformation.h"
#include "clock.h"
#include "receiver.h"
PrrtChannelStateInformation * PrrtChannelStateInformation_create()
{
......@@ -15,6 +16,15 @@ PrrtChannelStateInformation * PrrtChannelStateInformation_create()
csi->rtprop = TIMESTAMP_SPACE;
csi->rtprop_filter_length_us = 2 * 1000 * 1000; // 2 seconds
csi->deliveryRate = 0;
csi->btlbw = 0;
csi->btlbw_next_round_delivered = 0;
csi->btlbw_round_start = false;
csi->btlbw_round_count = 0;
csi->btlbw_filter_length = 10;
csi->appLimited = 0;
csi->plr = 0.0;
return csi;
error:
......@@ -42,9 +52,20 @@ void PrrtChannelStateInformation_update_plr(PrrtChannelStateInformation *csi, pr
}
void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformation *csi, prrtDeliveryRate_t rate) {
void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformation *csi, PrrtPacket* packet, PrrtRateSample *rateSample) {
pthread_mutex_lock(&csi->lock);
prrtDeliveryRate_t rate = rateSample->delivery_rate;
csi->deliveryRate = rate;
if(csi->btlbw_rounds > csi->btlbw_filter_length_rtts) {
csi->btlbw = 0;
}
csi->btlbw_rounds += 1;
if(csi->btlbw < rate) {
csi->btlbw = rate;
csi->btlbw_rounds = 0;
}
pthread_mutex_unlock(&csi->lock);
}
void PrrtChannelStateInformation_update_app_limited(PrrtChannelStateInformation *csi, bool appLimited) {
......@@ -78,6 +99,10 @@ prrtDeliveryRate_t PrrtChannelStateInformation_get_delivery_rate(PrrtChannelStat
return csi->deliveryRate;
}
prrtDeliveryRate_t PrrtChannelStateInformation_get_btlbw(PrrtChannelStateInformation *csi) {
return csi->btlbw;
}
bool PrrtChannelStateInformation_get_app_limited(PrrtChannelStateInformation *csi) {
return csi->appLimited;
}
\ No newline at end of file
......@@ -6,12 +6,22 @@
typedef struct prrtChannelStateInformation {
pthread_mutex_t lock;
prrtTimedelta_t rtprop;
prrtTimestamp_t rtprop_stamp;
prrtTimedelta_t rtprop_filter_length_us;
bool rtprop_expired;
prrtPacketLossRate_t plr;
prrtDeliveryRate_t deliveryRate;
prrtDeliveryRate_t btlbw;
prrtByteCount_t btlbw_next_round_delivered;
bool btlbw_round_start;
uint32_t btlbw_round_count;
uint8_t btlbw_filter_length;
bool appLimited;
} PrrtChannelStateInformation;
......@@ -19,12 +29,13 @@ PrrtChannelStateInformation* PrrtChannelStateInformation_create(void);
void PrrtChannelStateInformation_update_rtprop(PrrtChannelStateInformation *csi, prrtTimedelta_t rtprop);
prrtTimedelta_t PrrtChannelStateInformation_get_rtprop(PrrtChannelStateInformation *csi);
prrtDeliveryRate_t PrrtChannelStateInformation_get_delivery_rate(PrrtChannelStateInformation *csi);
prrtDeliveryRate_t PrrtChannelStateInformation_get_btlbw(PrrtChannelStateInformation *csi);
bool PrrtChannelStateInformation_get_app_limited(PrrtChannelStateInformation *csi);
prrtPacketLossRate_t PrrtChannelStateInformation_get_plr(PrrtChannelStateInformation* csi);
void PrrtChannelStateInformation_update_plr(PrrtChannelStateInformation *csi, prrtSequenceNumber_t erasures,
prrtSequenceNumber_t packets);
void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformation *csi, prrtDeliveryRate_t rate);
void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformation *csi, PrrtPacket* packet, PrrtRateSample *rate);
void PrrtChannelStateInformation_update_app_limited(PrrtChannelStateInformation *csi, bool appLimited);
bool PrrtChannelStateInformation_destroy(PrrtChannelStateInformation* csi);
......
......@@ -40,9 +40,11 @@ static void handle_feedback(PrrtSocket *prrtSocket, const size_t length)
PrrtPacketFeedbackPayload *feedbackPayload = (PrrtPacketFeedbackPayload *) prrtPacket->payload;
prrtTimestamp_t forwardTripTimestamp = feedbackPayload->forwardTripTimestamp_us;
PrrtReceiver_updateAndGenerateRateSample(prrtSocket->receiver, feedbackPayload->ackSequenceNumber, feedbackPayload->ackPacketType, receiveTime);
bool valid_sample = PrrtReceiver_updateAndGenerateRateSample(prrtSocket->receiver, feedbackPayload->ackSequenceNumber, feedbackPayload->ackPacketType, receiveTime);
PrrtChannelStateInformation_update_delivery_rate(prrtSocket->receiver->csi, prrtSocket->receiver->rateSample->delivery_rate);
if(valid_sample) {
PrrtChannelStateInformation_update_delivery_rate(prrtSocket->receiver->csi, prrtPacket, prrtSocket->receiver->rateSample);
}
PrrtChannelStateInformation_update_app_limited(prrtSocket->receiver->csi, prrtSocket->receiver->rateSample->is_app_limited);
PrrtChannelStateInformation_update_rtprop(prrtSocket->receiver->csi,
......
......@@ -153,14 +153,14 @@ bool PrrtReceiver_generateRateSample(PrrtRateSample *rateSample, PrrtPacketTrack
return true;
}
void PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNumber_t seqnum, uint8_t packetType,
bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNumber_t seqnum, uint8_t packetType,
prrtTimestamp_t receiveTime) {
PrrtInFlightPacketStore *packetStore = NULL;
if (packetType == PACKET_TYPE_DATA) {
packetStore = recv->dataPacketStates;
} else if (packetType == PACKET_TYPE_REDUNDANCY) {
packetStore = recv->redundancyPacketStates;
} else return;
} else return false;
check(pthread_mutex_lock(&recv->lock) == 0, "Lock failed.");
PrrtPacket *packet = PrrtInFlightPacketStore_get_packet(packetStore, seqnum);
......@@ -169,13 +169,13 @@ void PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNu
packet = PrrtInFlightPacketStore_get_packet(packetStore, seqnum);
}
PrrtReceiver_updateRateSample(recv->rateSample, packet, receiveTime, recv->packetTracking);
PrrtReceiver_generateRateSample(recv->rateSample, recv->packetTracking);
bool result = PrrtReceiver_generateRateSample(recv->rateSample, recv->packetTracking);
recv->packetTracking->pipe -= packet->payloadLength;
PrrtInFlightPacketStore_remove_outstanding_packet(packetStore, seqnum);
check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed.");
check(pthread_cond_broadcast(&recv->pipeNotFullCv) == 0, "Signal failed.");
return;
return result;
error:
PERROR("Mutex error.%s", "");
......
......@@ -48,7 +48,7 @@ typedef struct prrtReceiver {
PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port);
void PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNumber_t seqnum, uint8_t packetType,
bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNumber_t seqnum, uint8_t packetType,
prrtTimestamp_t receiveTime);
void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *packet, prrtTimestamp_t sentTime);
......
......@@ -595,6 +595,10 @@ prrtDeliveryRate_t PrrtSocket_get_delivery_rate(PrrtSocket *s) {
return PrrtChannelStateInformation_get_delivery_rate(s->receiver->csi);
}
prrtDeliveryRate_t PrrtSocket_get_btlbw(PrrtSocket *s) {
return PrrtChannelStateInformation_get_btlbw(s->receiver->csi);
}
bool PrrtSocket_get_app_limited(PrrtSocket *s) {
return PrrtChannelStateInformation_get_app_limited(s->receiver->csi);
}
......@@ -131,6 +131,7 @@ uint32_t PrrtSocket_get_rtprop(PrrtSocket *s);
prrtPacketLossRate_t PrrtSocket_get_plr(PrrtSocket *s);
prrtDeliveryRate_t PrrtSocket_get_delivery_rate(PrrtSocket *s);
prrtDeliveryRate_t PrrtSocket_get_btlbw(PrrtSocket *s);
bool PrrtSocket_get_app_limited(PrrtSocket *s);
#endif // PRRT_SOCKET_H
......@@ -164,7 +164,6 @@ cdef class PrrtSocket:
raise Exception("Not a sender.")
cdef cprrt.PrrtCodingParams *params = cprrt.PrrtSocket_get_coding_parameters(self._c_socket)
result = PrrtCodingConfiguration(params.n, params.k, list(<uint8_t[:params.c]> params.n_cycle))
cprrt.PrrtCoding
return result
def __set__(self, params: PrrtCodingConfiguration):
......@@ -181,6 +180,12 @@ cdef class PrrtSocket:
raise Exception("Not a sender.")
return cprrt.PrrtSocket_get_delivery_rate(self._c_socket)
property btlbw:
def __get__(self):
if not self.isSender:
raise Exception("Not a sender.")
return cprrt.PrrtSocket_get_btlbw(self._c_socket)
property app_limited:
def __get__(self):
if not self.isSender:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment