Commit 509b0571 authored by Andreas Schmidt's avatar Andreas Schmidt

Propagate app_limited information.

parent acce59a7
Pipeline #2052 failed with stages
in 21 seconds
......@@ -146,6 +146,7 @@ cdef extern from "proto/socket.h":
uint32_t PrrtSocket_get_rtt(PrrtSocket *socket)
float PrrtSocket_get_plr(PrrtSocket *socket)
uint32_t PrrtSocket_get_delivery_rate(PrrtSocket *socket)
bint PrrtSocket_get_app_limited(PrrtSocket *socket)
bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket)
cdef extern from "proto/stores/packetDeliveryStore.h":
......
......@@ -47,6 +47,11 @@ void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformatio
csi->deliveryRate = rate;
pthread_mutex_unlock(&csi->lock);
}
void PrrtChannelStateInformation_update_app_limited(PrrtChannelStateInformation *csi, bool appLimited) {
pthread_mutex_lock(&csi->lock);
csi->appLimited = appLimited;
pthread_mutex_unlock(&csi->lock);
}
prrtTimedelta_t PrrtChannelStateInformation_get_rtprop(PrrtChannelStateInformation *csi)
{
......@@ -71,4 +76,8 @@ prrtPacketLossRate_t PrrtChannelStateInformation_get_plr(PrrtChannelStateInforma
prrtDeliveryRate_t PrrtChannelStateInformation_get_delivery_rate(PrrtChannelStateInformation *csi) {
return csi->deliveryRate;
}
bool PrrtChannelStateInformation_get_app_limited(PrrtChannelStateInformation *csi) {
return csi->appLimited;
}
\ No newline at end of file
......@@ -12,16 +12,20 @@ typedef struct prrtChannelStateInformation {
bool rtprop_expired;
prrtPacketLossRate_t plr;
prrtDeliveryRate_t deliveryRate;
bool appLimited;
} PrrtChannelStateInformation;
PrrtChannelStateInformation* PrrtChannelStateInformation_create(void);
void PrrtChannelStateInformation_update_rtprop(PrrtChannelStateInformation *csi, prrtTimedelta_t rtprop);
prrtTimedelta_t PrrtChannelStateInformation_get_rtprop(PrrtChannelStateInformation *csi);
prrtDeliveryRate_t PrrtChannelStateInformation_get_delivery_rate(PrrtChannelStateInformation *csi);
bool PrrtChannelStateInformation_get_app_limited(PrrtChannelStateInformation *csi);
prrtPacketLossRate_t PrrtChannelStateInformation_get_plr(PrrtChannelStateInformation* csi);
void PrrtChannelStateInformation_update_plr(PrrtChannelStateInformation *csi, prrtSequenceNumber_t erasures,
prrtSequenceNumber_t packets);
void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformation *csi, prrtDeliveryRate_t rate);
void PrrtChannelStateInformation_update_app_limited(PrrtChannelStateInformation *csi, bool appLimited);
bool PrrtChannelStateInformation_destroy(PrrtChannelStateInformation* csi);
void PrrtChannelStateInformation_print(PrrtChannelStateInformation *csi);
......
......@@ -43,6 +43,7 @@ static void handle_feedback(PrrtSocket *prrtSocket, const size_t length)
PrrtReceiver_updateAndGenerateRateSample(prrtSocket->receiver, feedbackPayload->ackSequenceNumber, feedbackPayload->ackPacketType, receiveTime);
PrrtChannelStateInformation_update_delivery_rate(prrtSocket->receiver->csi, prrtSocket->receiver->rateSample->delivery_rate);
PrrtChannelStateInformation_update_app_limited(prrtSocket->receiver->csi, prrtSocket->receiver->rateSample->is_app_limited);
PrrtChannelStateInformation_update_rtprop(prrtSocket->receiver->csi,
(prrtTimedelta_t) (receiveTime - forwardTripTimestamp));
......
......@@ -107,6 +107,21 @@ void PrrtReceiver_updateRateSample(PrrtRateSample *rateSample, PrrtPacket *packe
packet->delivered_time = 0;
}
void PrrtReceiver_on_application_write(PrrtReceiver* receiver) {
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
PrrtPacketTracking *tracking = receiver->packetTracking;
if(PrrtInFlightPacketStore_get_queue_size(receiver->dataPacketStates) +
PrrtInFlightPacketStore_get_queue_size(receiver->redundancyPacketStates) == 0) {
tracking->app_limited = (tracking->delivered + tracking->pipe) ? : 1;
}
check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
return;
error:
PERROR("Mutex error.%s", "");
}
void PrrtReceiver_generateRateSample(PrrtRateSample *rateSample, PrrtPacketTracking *packetTracking) {
/* Clear app-limited field */
if (packetTracking->app_limited && packetTracking->delivered > packetTracking->app_limited)
......@@ -159,6 +174,7 @@ void PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNu
}
PrrtReceiver_updateRateSample(recv->rateSample, packet, receiveTime, recv->packetTracking);
PrrtReceiver_generateRateSample(recv->rateSample, recv->packetTracking);
recv->packetTracking->pipe -= packet->payloadLength;
PrrtInFlightPacketStore_remove_outstanding_packet(packetStore, seqnum);
check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed.");
check(pthread_cond_broadcast(&recv->pipeNotFullCv) == 0, "Signal failed.");
......@@ -178,18 +194,18 @@ void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *p
} else return;
//printf("Adding Packet #%u to %u\n", packet->sequenceNumber, PrrtPacket_type(packet));
check(pthread_mutex_lock(&recv->lock) == 0, "Lock failed.");
recv->packetTracking->app_limited = packet->is_app_limited;
if (PrrtInFlightPacketStore_get_queue_size(recv->dataPacketStates) +
PrrtInFlightPacketStore_get_queue_size(recv->redundancyPacketStates) == 0) {
recv->packetTracking->first_sent_time = sentTime;
recv->packetTracking->delivered_time = sentTime;
}
recv->packetTracking->pipe += packet->payloadLength;
packet->first_sent_time = recv->packetTracking->first_sent_time;
packet->sent_time = sentTime;
packet->delivered_time = recv->packetTracking->delivered_time;
packet->delivered = recv->packetTracking->delivered;
//packet->is_app_limited = packetStore->app_limited;
packet->is_app_limited = (recv->packetTracking->app_limited != 0);
PrrtInFlightPacketStore_add_outstanding_packet(packetStore, packet);
check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed.");
......
......@@ -23,10 +23,11 @@ typedef struct prrtRateSample {
} PrrtRateSample;
typedef struct packetTracking {
prrtByteCount_t pipe;
prrtByteCount_t delivered;
prrtTimestamp_t delivered_time;
prrtTimestamp_t first_sent_time;
bool app_limited;
prrtSequenceNumber_t app_limited;
} PrrtPacketTracking;
typedef struct prrtReceiver {
......@@ -52,6 +53,8 @@ void PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNu
void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *packet, prrtTimestamp_t sentTime);
void PrrtReceiver_on_application_write(PrrtReceiver* receiver);
void PrrtReceiver_interrupt(PrrtReceiver *recv);
bool PrrtReceiver_destroy(PrrtReceiver *receiver);
......
......@@ -258,6 +258,7 @@ int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSubmitPackage);
Pipe_push(s->sendDataQueue, &packet->asListNode);
PrrtReceiver_on_application_write(s->receiver);
XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
......@@ -593,3 +594,7 @@ prrtPacketLossRate_t PrrtSocket_get_plr(PrrtSocket *s) {
prrtDeliveryRate_t PrrtSocket_get_delivery_rate(PrrtSocket *s) {
return PrrtChannelStateInformation_get_delivery_rate(s->receiver->csi);
}
bool PrrtSocket_get_app_limited(PrrtSocket *s) {
return PrrtChannelStateInformation_get_app_limited(s->receiver->csi);
}
......@@ -131,5 +131,6 @@ uint32_t PrrtSocket_get_rtprop(PrrtSocket *s);
prrtPacketLossRate_t PrrtSocket_get_plr(PrrtSocket *s);
prrtDeliveryRate_t PrrtSocket_get_delivery_rate(PrrtSocket *s);
bool PrrtSocket_get_app_limited(PrrtSocket *s);
#endif // PRRT_SOCKET_H
......@@ -101,6 +101,9 @@ class PrrtCodingConfiguration:
def __repr__(self):
return "PrrtCodingConfiguration(n={},k={},n_cycle={})".format(self.n, self.k, self.n_cycle)
def __repr__(self):
return "({},{},{})".format(self.n, self.k, self.n_cycle)
#property n_cycle:
# def __get__(self):
# return list(<uint8_t[:self._c_config.c]> self._c_config.n_cycle)
......@@ -160,7 +163,9 @@ cdef class PrrtSocket:
if not self.isSender:
raise Exception("Not a sender.")
cdef cprrt.PrrtCodingParams *params = cprrt.PrrtSocket_get_coding_parameters(self._c_socket)
return PrrtCodingConfiguration(params.n, params.k, list(<uint8_t[:params.c]> params.n_cycle))
result = PrrtCodingConfiguration(params.n, params.k, list(<uint8_t[:params.c]> params.n_cycle))
cprrt.PrrtCoding
return result
def __set__(self, params: PrrtCodingConfiguration):
cdef uint8_t* n_cycle
......@@ -176,6 +181,12 @@ cdef class PrrtSocket:
raise Exception("Not a sender.")
return cprrt.PrrtSocket_get_delivery_rate(self._c_socket)
property app_limited:
def __get__(self):
if not self.isSender:
raise Exception("Not a sender.")
return cprrt.PrrtSocket_get_app_limited(self._c_socket)
def recv(self):
cdef char buffer[65536]
cdef int32_t len
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment