Commit 89e337a9 authored by Andreas Schmidt's avatar Andreas Schmidt

Add support for delivery rate estimation.

parent 21857f60
Pipeline #1761 failed with stages
in 1 minute and 8 seconds
...@@ -71,7 +71,7 @@ cdef extern from "proto/block.h": ...@@ -71,7 +71,7 @@ cdef extern from "proto/block.h":
ctypedef prrtBlock PrrtBlock ctypedef prrtBlock PrrtBlock
cdef extern from "proto/packet.h": cdef extern from "proto/types/packet.h":
cdef struct prrtPacket: cdef struct prrtPacket:
uint8_t type_priority; uint8_t type_priority;
uint8_t index; uint8_t index;
...@@ -145,6 +145,7 @@ cdef extern from "proto/socket.h": ...@@ -145,6 +145,7 @@ cdef extern from "proto/socket.h":
bint PrrtSocket_uses_thread_pinning(PrrtSocket *socket) bint PrrtSocket_uses_thread_pinning(PrrtSocket *socket)
uint32_t PrrtSocket_get_rtt(PrrtSocket *socket) uint32_t PrrtSocket_get_rtt(PrrtSocket *socket)
float PrrtSocket_get_plr(PrrtSocket *socket) float PrrtSocket_get_plr(PrrtSocket *socket)
uint32_t PrrtSocket_get_delivery_rate(PrrtSocket *socket)
bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket) bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket)
cdef extern from "proto/stores/packetDeliveryStore.h": cdef extern from "proto/stores/packetDeliveryStore.h":
......
...@@ -3,22 +3,23 @@ add_library(PRRT ../defines.h ...@@ -3,22 +3,23 @@ add_library(PRRT ../defines.h
channelStateInformation.c channelStateInformation.h channelStateInformation.c channelStateInformation.h
clock.c clock.h clock.c clock.h
codingParams.c codingParams.h codingParams.c codingParams.h
packet.c packet.h
receiver.c receiver.h receiver.c receiver.h
socket.c socket.h socket.c socket.h
../xlap/xlap.c ../xlap/xlap.h ../xlap/xlap.c ../xlap/xlap.h
applicationConstraints.c applicationConstraints.h applicationConstraints.c applicationConstraints.h
vdmcode/block_code.c vdmcode/block_code.h
stores/deliveredPacketTable.c stores/deliveredPacketTable.h
types/lossStatistics.c types/lossStatistics.h
processes/dataReceiver.c processes/dataReceiver.h processes/dataReceiver.c processes/dataReceiver.h
processes/feedbackReceiver.c processes/feedbackReceiver.h processes/feedbackReceiver.c processes/feedbackReceiver.h
processes/dataTransmitter.c processes/dataTransmitter.h processes/dataTransmitter.c processes/dataTransmitter.h
stores/repairBlockStore.c stores/repairBlockStore.h
stores/packetTimeoutTable.c stores/packetTimeoutTable.h
stores/dataPacketStore.c stores/dataPacketStore.h stores/dataPacketStore.c stores/dataPacketStore.h
types/packetTimeout.c types/packetTimeout.h stores/deliveredPacketTable.c stores/deliveredPacketTable.h
stores/inFlightPacketStore.c stores/inFlightPacketStore.h
stores/packetTimeoutTable.c stores/packetTimeoutTable.h
stores/packetDeliveryStore.c stores/packetDeliveryStore.h stores/packetDeliveryStore.c stores/packetDeliveryStore.h
stores/receptionTable.c stores/receptionTable.h) stores/receptionTable.c stores/receptionTable.h
stores/repairBlockStore.c stores/repairBlockStore.h
types/packetTimeout.c types/packetTimeout.h
types/lossStatistics.c types/lossStatistics.h
types/packet.c types/packet.h
vdmcode/block_code.c vdmcode/block_code.h)
target_link_libraries(PRRT rt) target_link_libraries(PRRT rt)
#ifndef PRRT_NETWORKCONTRAINTS_H #ifndef PRRT_NETWORKCONTRAINTS_H
#define PRRT_NETWORKCONTRAINTS_H #define PRRT_NETWORKCONTRAINTS_H
#include "packet.h" #include "types/packet.h"
typedef struct applicationConstraints { typedef struct applicationConstraints {
prrtTimedelta_t targetDelay_us; prrtTimedelta_t targetDelay_us;
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
#include "../util/list.h" #include "../util/list.h"
#include "../util/dbg.h" #include "../util/dbg.h"
#include "../util/common.h" #include "../util/common.h"
#include "packet.h" #include "types/packet.h"
#include "block.h" #include "block.h"
static void gather_redundancy_packets(const PrrtBlock *block_ptr, gf *const *fec, int16_t *idx_p) static void gather_redundancy_packets(const PrrtBlock *block_ptr, gf *const *fec, int16_t *idx_p)
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
#include <stdbool.h> #include <stdbool.h>
#include "codingParams.h" #include "codingParams.h"
#include "packet.h" #include "types/packet.h"
#include "../util/list.h" #include "../util/list.h"
#include "vdmcode/block_code.h" #include "vdmcode/block_code.h"
......
...@@ -14,6 +14,7 @@ PrrtChannelStateInformation * PrrtChannelStateInformation_create() ...@@ -14,6 +14,7 @@ PrrtChannelStateInformation * PrrtChannelStateInformation_create()
csi->rtprop = TIMESTAMP_SPACE; csi->rtprop = TIMESTAMP_SPACE;
csi->rtprop_filter_length_us = 2 * 1000 * 1000; // 2 seconds csi->rtprop_filter_length_us = 2 * 1000 * 1000; // 2 seconds
csi->deliveryRate = 0;
return csi; return csi;
error: error:
...@@ -41,6 +42,12 @@ void PrrtChannelStateInformation_update_plr(PrrtChannelStateInformation *csi, pr ...@@ -41,6 +42,12 @@ void PrrtChannelStateInformation_update_plr(PrrtChannelStateInformation *csi, pr
} }
void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformation *csi, prrtDeliveryRate_t rate) {
pthread_mutex_lock(&csi->lock);
csi->deliveryRate = rate;
pthread_mutex_unlock(&csi->lock);
}
prrtTimedelta_t PrrtChannelStateInformation_get_rtprop(PrrtChannelStateInformation *csi) prrtTimedelta_t PrrtChannelStateInformation_get_rtprop(PrrtChannelStateInformation *csi)
{ {
pthread_mutex_lock(&csi->lock); pthread_mutex_lock(&csi->lock);
...@@ -61,3 +68,7 @@ bool PrrtChannelStateInformation_destroy(PrrtChannelStateInformation *csi) ...@@ -61,3 +68,7 @@ bool PrrtChannelStateInformation_destroy(PrrtChannelStateInformation *csi)
prrtPacketLossRate_t PrrtChannelStateInformation_get_plr(PrrtChannelStateInformation *csi) { prrtPacketLossRate_t PrrtChannelStateInformation_get_plr(PrrtChannelStateInformation *csi) {
return csi->plr; return csi->plr;
} }
prrtDeliveryRate_t PrrtChannelStateInformation_get_delivery_rate(PrrtChannelStateInformation *csi) {
return csi->deliveryRate;
}
\ No newline at end of file
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
#define PRRT_CHANNELSTATEINFORMATION_H #define PRRT_CHANNELSTATEINFORMATION_H
#include <stdbool.h> #include <stdbool.h>
#include "packet.h" #include "types/packet.h"
typedef struct prrtChannelStateInformation { typedef struct prrtChannelStateInformation {
pthread_mutex_t lock; pthread_mutex_t lock;
...@@ -11,14 +11,17 @@ typedef struct prrtChannelStateInformation { ...@@ -11,14 +11,17 @@ typedef struct prrtChannelStateInformation {
prrtTimedelta_t rtprop_filter_length_us; prrtTimedelta_t rtprop_filter_length_us;
bool rtprop_expired; bool rtprop_expired;
prrtPacketLossRate_t plr; prrtPacketLossRate_t plr;
prrtDeliveryRate_t deliveryRate;
} PrrtChannelStateInformation; } PrrtChannelStateInformation;
PrrtChannelStateInformation* PrrtChannelStateInformation_create(void); PrrtChannelStateInformation* PrrtChannelStateInformation_create(void);
void PrrtChannelStateInformation_update_rtprop(PrrtChannelStateInformation *csi, prrtTimedelta_t rtprop); void PrrtChannelStateInformation_update_rtprop(PrrtChannelStateInformation *csi, prrtTimedelta_t rtprop);
prrtTimedelta_t PrrtChannelStateInformation_get_rtprop(PrrtChannelStateInformation *csi); prrtTimedelta_t PrrtChannelStateInformation_get_rtprop(PrrtChannelStateInformation *csi);
prrtDeliveryRate_t PrrtChannelStateInformation_get_delivery_rate(PrrtChannelStateInformation *csi);
prrtPacketLossRate_t PrrtChannelStateInformation_get_plr(PrrtChannelStateInformation* csi); prrtPacketLossRate_t PrrtChannelStateInformation_get_plr(PrrtChannelStateInformation* csi);
void PrrtChannelStateInformation_update_plr(PrrtChannelStateInformation *csi, prrtSequenceNumber_t erasures, void PrrtChannelStateInformation_update_plr(PrrtChannelStateInformation *csi, prrtSequenceNumber_t erasures,
prrtSequenceNumber_t packets); prrtSequenceNumber_t packets);
void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformation *csi, prrtDeliveryRate_t rate);
bool PrrtChannelStateInformation_destroy(PrrtChannelStateInformation* csi); bool PrrtChannelStateInformation_destroy(PrrtChannelStateInformation* csi);
void PrrtChannelStateInformation_print(PrrtChannelStateInformation *csi); void PrrtChannelStateInformation_print(PrrtChannelStateInformation *csi);
......
#ifndef PRRT_CLOCK_H #ifndef PRRT_CLOCK_H
#define PRRT_CLOCK_H #define PRRT_CLOCK_H
#include "packet.h" #include "types/packet.h"
#include <stdint.h> #include <stdint.h>
#include <stdbool.h> #include <stdbool.h>
......
...@@ -50,7 +50,25 @@ static void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block) { ...@@ -50,7 +50,25 @@ static void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block) {
PERROR("Decoding failed.%s", "") PERROR("Decoding failed.%s", "")
} }
static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote) { static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote, prrtSequenceNumber_t seqno, prrtTimestamp_t receiveStamp,
prrtTimestamp_t sentTimestamp, prrtPacketType_t type) {
enum XlapTimestampPacketKind kind = ts_data_packet;
if (type == PACKET_TYPE_DATA) {
kind = ts_data_packet;
} else if (type == PACKET_TYPE_REDUNDANCY) {
kind = ts_redundancy_packet;
}
debug(DEBUG_FEEDBACK, "Send feedback %d %d", type, seqno);
XlapTimeStampCycle(sock_ptr, kind, seqno, SendFeedbackStart);
prrtFeedback_t feedback = {
.seqNo = seqno,
.type = type,
.receivedTime = receiveStamp,
.sentTime = sentTimestamp
};
uint16_t remote_port = ntohs(remote.sin_port); uint16_t remote_port = ntohs(remote.sin_port);
char *remote_host = inet_ntoa(remote.sin_addr); char *remote_host = inet_ntoa(remote.sin_addr);
...@@ -72,7 +90,8 @@ static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote) { ...@@ -72,7 +90,8 @@ static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote) {
PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, sock_ptr->sequenceNumberFeedback++, group_RTT, PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, sock_ptr->sequenceNumberFeedback++, group_RTT,
stats.gapLength, stats.gapCount, stats.burstLength, stats.gapLength, stats.gapCount, stats.burstLength,
stats.burstCount, forwardTripTime, stats.burstCount, forwardTripTime,
stats.erasureCount, stats.packetCount); stats.erasureCount, stats.packetCount, feedback.seqNo,
feedback.type);
prrtPacketLength_t length = PrrtPacket_size(feedback_pkt_ptr); prrtPacketLength_t length = PrrtPacket_size(feedback_pkt_ptr);
void *buf = calloc(1, length); void *buf = calloc(1, length);
check_mem(buf); check_mem(buf);
...@@ -84,6 +103,7 @@ static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote) { ...@@ -84,6 +103,7 @@ static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote) {
free(buf); free(buf);
PrrtPacket_destroy(feedback_pkt_ptr); PrrtPacket_destroy(feedback_pkt_ptr);
XlapTimeStampCycle(sock_ptr, kind, seqno, SendFeedbackEnd);
return true; return true;
...@@ -98,11 +118,12 @@ static bool is_timeout(prrtTimestamp_t now, prrtTimestamp_t to) { ...@@ -98,11 +118,12 @@ static bool is_timeout(prrtTimestamp_t now, prrtTimestamp_t to) {
return false; return false;
} }
static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet, struct sockaddr_in remote) { static void
handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
PrrtPacketDataPayload *payload = packet->payload; PrrtPacketDataPayload *payload = packet->payload;
prrtTimestamp_t dataTimestamp = payload->timestamp; prrtTimestamp_t sentTimestamp = payload->timestamp;
sock_ptr->lastSentTimestamp = dataTimestamp; sock_ptr->lastSentTimestamp = sentTimestamp;
PrrtClock_update(&sock_ptr->clock, dataTimestamp, payload->groupRTprop_us); PrrtClock_update(&sock_ptr->clock, sentTimestamp, payload->groupRTprop_us);
PrrtPacketTimeout *packetTimeout = PrrtPacketTimeout_create(packet); PrrtPacketTimeout *packetTimeout = PrrtPacketTimeout_create(packet);
check(PrrtPacketTimeoutTable_insert(sock_ptr->packetTimeoutTable, packetTimeout), "Could not insert data packet."); check(PrrtPacketTimeoutTable_insert(sock_ptr->packetTimeoutTable, packetTimeout), "Could not insert data packet.");
...@@ -110,14 +131,9 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet, struct ...@@ -110,14 +131,9 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet, struct
prrtSequenceNumber_t seqno = packet->sequenceNumber; prrtSequenceNumber_t seqno = packet->sequenceNumber;
PrrtReceptionTable_mark_received(sock_ptr->dataReceptionTable, seqno); PrrtReceptionTable_mark_received(sock_ptr->dataReceptionTable, seqno);
XlapTimeStampCycle(sock_ptr, ts_data_packet, seqno, SendFeedbackStart);
check(send_feedback(sock_ptr, remote), "Sending feedback failed.");
XlapTimeStampCycle(sock_ptr, ts_data_packet, seqno, SendFeedbackEnd);
prrtTimestamp_t now = PrrtClock_get_prrt_time_us(&sock_ptr->clock); prrtTimestamp_t now = PrrtClock_get_prrt_time_us(&sock_ptr->clock);
if (is_timeout(now, payload->packetTimeout_us)) { if (is_timeout(now, payload->packetTimeout_us)) {
PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable, packet->sequenceNumber); PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable, packet->sequenceNumber);
// TODO: note this as loss
PrrtPacket_destroy(packet); PrrtPacket_destroy(packet);
debug(DEBUG_RECEIVER, "timeout data packet %u (%lu > %lu)", seqno, (unsigned long) now, debug(DEBUG_RECEIVER, "timeout data packet %u (%lu > %lu)", seqno, (unsigned long) now,
(unsigned long) payload->packetTimeout_us); (unsigned long) payload->packetTimeout_us);
...@@ -247,7 +263,8 @@ void *receive_data_loop(void *ptr) { ...@@ -247,7 +263,8 @@ void *receive_data_loop(void *ptr) {
receive_from_socket(sock_ptr, buffer, &n, &remote, &addrlen, &packet_recv_stamp); receive_from_socket(sock_ptr, buffer, &n, &remote, &addrlen, &packet_recv_stamp);
debug(DEBUG_HARDSTAMPING, "Packet TS:\t%ld.%09ld; Who? %s", (long) packet_recv_stamp.tv_sec, debug(DEBUG_HARDSTAMPING, "Packet TS:\t%ld.%09ld; Who? %s", (long) packet_recv_stamp.tv_sec,
packet_recv_stamp.tv_nsec, inet_ntoa(remote.sin_addr)); packet_recv_stamp.tv_nsec, inet_ntoa(remote.sin_addr));
sock_ptr->lastReceivedTimestamp = PrrtClock_TimespecToPrrtTimestamp(packet_recv_stamp); prrtTimestamp_t prrt_recv_timestamp = PrrtClock_TimespecToPrrtTimestamp(packet_recv_stamp);
sock_ptr->lastReceivedTimestamp = prrt_recv_timestamp;
XlapTimeStampClock(&tsph1, ts_any_packet, 0, LinkReceive); XlapTimeStampClock(&tsph1, ts_any_packet, 0, LinkReceive);
XlapTimeStampCycle(&tsph1, ts_any_packet, 0, LinkReceive); XlapTimeStampCycle(&tsph1, ts_any_packet, 0, LinkReceive);
...@@ -263,11 +280,15 @@ void *receive_data_loop(void *ptr) { ...@@ -263,11 +280,15 @@ void *receive_data_loop(void *ptr) {
prrtPacketType_t packetType = PrrtPacket_type(packet); prrtPacketType_t packetType = PrrtPacket_type(packet);
debug(DEBUG_DATARECEIVER, "received packet %d:%u", (int) packetType, seqno); debug(DEBUG_DATARECEIVER, "received packet %d:%u", (int) packetType, seqno);
enum XlapTimestampPacketKind kind = ts_any_packet; enum XlapTimestampPacketKind kind = ts_any_packet;
prrtTimestamp_t sentTimestamp;
if (packetType == PACKET_TYPE_DATA) { if (packetType == PACKET_TYPE_DATA) {
kind = ts_data_packet; kind = ts_data_packet;
sentTimestamp = PrrtPacket_get_data_timestamp(packet);
} else if (packetType == PACKET_TYPE_REDUNDANCY) { } else if (packetType == PACKET_TYPE_REDUNDANCY) {
kind = ts_redundancy_packet; kind = ts_redundancy_packet;
sentTimestamp = PrrtPacket_get_redundancy_timestamp(packet);
} }
if (packetType == PACKET_TYPE_DATA || packetType == PACKET_TYPE_REDUNDANCY) { if (packetType == PACKET_TYPE_DATA || packetType == PACKET_TYPE_REDUNDANCY) {
XlapTimeStampValue(sock_ptr, kind, seqno, ChannelReceive, packet_recv_stamp); XlapTimeStampValue(sock_ptr, kind, seqno, ChannelReceive, packet_recv_stamp);
...@@ -277,8 +298,11 @@ void *receive_data_loop(void *ptr) { ...@@ -277,8 +298,11 @@ void *receive_data_loop(void *ptr) {
XlapTimestampPlaceholderUse(sock_ptr, kind, seqno, &tsph3); XlapTimestampPlaceholderUse(sock_ptr, kind, seqno, &tsph3);
XlapTimeStampCycle(sock_ptr, kind, seqno, HandlePacketStart); XlapTimeStampCycle(sock_ptr, kind, seqno, HandlePacketStart);
send_feedback(sock_ptr, remote, seqno, prrt_recv_timestamp, sentTimestamp, packetType);
if (packetType == PACKET_TYPE_DATA) { if (packetType == PACKET_TYPE_DATA) {
handle_data_packet(sock_ptr, packet, remote); handle_data_packet(sock_ptr, packet);
} else if (packetType == PACKET_TYPE_REDUNDANCY) { } else if (packetType == PACKET_TYPE_REDUNDANCY) {
handle_redundancy_packet(sock_ptr, packet); handle_redundancy_packet(sock_ptr, packet);
} else { } else {
......
#ifndef PRRT_DATA_RECEIVER_H #ifndef PRRT_DATA_RECEIVER_H
#define PRRT_DATA_RECEIVER_H #define PRRT_DATA_RECEIVER_H
#include "../types/packet.h"
typedef struct prrtFeedback {
prrtSequenceNumber_t seqNo;
prrtPacketType_t type;
prrtTimestamp_t sentTime;
prrtTimestamp_t receivedTime;
} prrtFeedback_t;
void * receive_data_loop(void *ptr); void * receive_data_loop(void *ptr);
#endif //PRRT_DATA_RECEIVER_H #endif //PRRT_DATA_RECEIVER_H
...@@ -95,6 +95,8 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { ...@@ -95,6 +95,8 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
send_to_socket(sock_ptr, sock_ptr->receiver, buf, length, &timestamp); send_to_socket(sock_ptr, sock_ptr->receiver, buf, length, &timestamp);
XlapTimeStampValue(sock_ptr, ts_data_packet, packet->sequenceNumber, ChannelTransmit, timestamp); XlapTimeStampValue(sock_ptr, ts_data_packet, packet->sequenceNumber, ChannelTransmit, timestamp);
PrrtReceiver_add_outstanding_packet_state(sock_ptr->receiver, packet, PrrtClock_TimespecToPrrtTimestamp(timestamp));
switch (PrrtPacket_type(packet)) { switch (PrrtPacket_type(packet)) {
case PACKET_TYPE_DATA: case PACKET_TYPE_DATA:
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd); XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd);
...@@ -111,8 +113,6 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { ...@@ -111,8 +113,6 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
default:; default:;
} }
PrrtPacket_destroy(packet);
return true; return true;
error: error:
......
...@@ -40,6 +40,10 @@ static void handle_feedback(PrrtSocket *prrtSocket, const size_t length) ...@@ -40,6 +40,10 @@ static void handle_feedback(PrrtSocket *prrtSocket, const size_t length)
PrrtPacketFeedbackPayload *feedbackPayload = (PrrtPacketFeedbackPayload *) prrtPacket->payload; PrrtPacketFeedbackPayload *feedbackPayload = (PrrtPacketFeedbackPayload *) prrtPacket->payload;
prrtTimestamp_t forwardTripTimestamp = feedbackPayload->forwardTripTimestamp_us; prrtTimestamp_t forwardTripTimestamp = feedbackPayload->forwardTripTimestamp_us;
PrrtReceiver_updateAndGenerateRateSample(prrtSocket->receiver, feedbackPayload->ackSequenceNumber, feedbackPayload->ackPacketType, receiveTime);
PrrtChannelStateInformation_update_delivery_rate(prrtSocket->receiver->csi, prrtSocket->receiver->rateSample->delivery_rate);
PrrtChannelStateInformation_update_rtprop(prrtSocket->receiver->csi, PrrtChannelStateInformation_update_rtprop(prrtSocket->receiver->csi,
(prrtTimedelta_t) (receiveTime - forwardTripTimestamp)); (prrtTimedelta_t) (receiveTime - forwardTripTimestamp));
PrrtChannelStateInformation_update_plr(prrtSocket->receiver->csi, feedbackPayload->erasureCount, feedbackPayload->packetCount); PrrtChannelStateInformation_update_plr(prrtSocket->receiver->csi, feedbackPayload->erasureCount, feedbackPayload->packetCount);
......
#include <malloc.h> #include <malloc.h>
#include "../xlap/xlap.h"
#include "../util/common.h" #include "../util/common.h"
#include "../util/dbg.h" #include "../util/dbg.h"
#include "stores/inFlightPacketStore.h"
#include "receiver.h" #include "receiver.h"
PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port) PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port) {
{
PrrtReceiver *recv = calloc(1, sizeof(PrrtReceiver)); PrrtReceiver *recv = calloc(1, sizeof(PrrtReceiver));
check_mem(recv); check_mem(recv);
recv->host_name = strdup(host); recv->host_name = strdup(host);
check_mem(recv->host_name); check_mem(recv->host_name);
recv->port = port; recv->port = port;
struct addrinfo *info; PrrtRateSample *rateSample = calloc(1, sizeof(PrrtRateSample));
struct addrinfo hints; check_mem(rateSample);
char portstr[sizeof(port)*8+1]; recv->rateSample = rateSample;
snprintf(portstr, sizeof(portstr), "%u", (unsigned int) port);
memset(&hints, 0x0, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_DGRAM;
check(0 == getaddrinfo(host, portstr, &hints, &info), "getaddrinfo");
recv->ai = info;
recv->csi = PrrtChannelStateInformation_create(); PrrtPacketTracking *packetTracking = calloc(1, sizeof(PrrtPacketTracking));
check_mem(packetTracking);
recv->packetTracking = packetTracking;
recv->csi = PrrtChannelStateInformation_create();
recv->dataPacketStates = PrrtInFlightPacketStore_create();
recv->redundancyPacketStates = PrrtInFlightPacketStore_create();
struct addrinfo *info;
struct addrinfo hints;
char portstr[sizeof(port) * 8 + 1];
snprintf(portstr, sizeof(portstr), "%u", (unsigned int) port);
memset(&hints, 0x0, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_DGRAM;
check(0 == getaddrinfo(host, portstr, &hints, &info), "getaddrinfo");
recv->ai = info;
check(pthread_mutex_init(&recv->lock, NULL) == 0, "lock init failed.");
check(pthread_cond_init(&recv->pipeNotFullCv, NULL) == 0, "pipeNotFullCv init failed.");
check(pthread_cond_init(&recv->recordNotFoundCv, NULL) == 0, "recordNotFound init failed.");
return recv; return recv;
error: error:
if (recv != NULL) { if (recv != NULL) {
free((void *) recv->host_name); free((void *) recv->host_name);
free(recv); free(recv);
} }
//PERROR("Memory issue.%s",""); //PERROR("Memory issue.%s","");
return NULL; return NULL;
} }
bool PrrtReceiver_destroy(PrrtReceiver *receiver) { bool PrrtReceiver_destroy(PrrtReceiver *receiver) {
if (receiver->csi != NULL) { if (receiver->csi != NULL) {
PrrtChannelStateInformation_destroy(receiver->csi); PrrtChannelStateInformation_destroy(receiver->csi);
} }
freeaddrinfo(receiver->ai); if (receiver->dataPacketStates != NULL) {
free((void *) receiver->host_name); PrrtInFlightPacketStore_destroy(receiver->dataPacketStates);
free(receiver); }
return true;
if (receiver->redundancyPacketStates != NULL) {
PrrtInFlightPacketStore_destroy(receiver->redundancyPacketStates);
}
if (receiver->packetTracking != NULL) {
free(receiver->packetTracking);
}
if (receiver->rateSample != NULL) {
free(receiver->rateSample);
}
freeaddrinfo(receiver->ai);
free((void *) receiver->host_name);
check(pthread_mutex_destroy(&receiver->lock) == 0, "lock destroy failed.");
check(pthread_cond_destroy(&receiver->pipeNotFullCv) == 0, "pipeNotFullCv destroy failed.");
check(pthread_cond_destroy(&receiver->recordNotFoundCv) == 0, "recordNotFoundCv destroy failed.");
free(receiver);
return true;
error:
PERROR("PrrtReceiver_destroy failed.")
return false;
}
void PrrtReceiver_updateRateSample(PrrtRateSample *rateSample, PrrtPacket *packet, prrtTimestamp_t receiveTime,
PrrtPacketTracking *packetTracking) {
if (packet->delivered_time == 0)
return;
packetTracking->delivered += packet->payloadLength;
packetTracking->delivered_time = receiveTime;
if (packet->delivered > rateSample->prior_delivered) {
rateSample->prior_delivered = packet->delivered;
rateSample->prior_time = packet->delivered_time;
rateSample->is_app_limited = packet->is_app_limited;
rateSample->send_elapsed = packet->sent_time - packet->first_sent_time;
rateSample->ack_elapsed = packetTracking->delivered_time - packet->delivered_time;
packetTracking->first_sent_time = packet->sent_time;
}
packet->delivered_time = 0;
}
void PrrtReceiver_generateRateSample(PrrtRateSample *rateSample, PrrtPacketTracking *packetTracking) {
/* Clear app-limited field */
if (packetTracking->app_limited && packetTracking->delivered > packetTracking->app_limited)
packetTracking->app_limited = 0;
if (rateSample->prior_time == 0) {
//printf("Prior Time is 0, Cancelling Rate sample calculation\n");
return;
}
rateSample->interval = MAX(rateSample->send_elapsed, rateSample->ack_elapsed);
rateSample->delivered = packetTracking->delivered - rateSample->prior_delivered;
if (rateSample->interval != 0) {
// delivered: bytes; interval: us; convert to bps
rateSample->delivery_rate =
rateSample->delivered * 1000 * 1000 * 8 / rateSample->interval;
}
debug(DEBUG_FEEDBACK, "RS interval: %u, RS delivered: %u, RS delivery_rate: %u", rateSample->interval,
rateSample->delivered, rateSample->delivery_rate);
/* Normally we expect interval >= MinRTT.
* Note that rate may still be over-estimated when a spuriously
* retransmitted skb was first (s)acked because "interval"
* is under-estimated (up to an RTT). However, continuously
* measuring the delivery rate during loss recovery is crucial
* for connections suffer heavy or prolonged losses.
if (packetStore->rateSample->interval < MinRTT(tp))
packetStore->rateSample->interval = -1;
return false // no reliable sample
*/
}
void PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNumber_t seqnum, uint8_t packetType,
prrtTimestamp_t receiveTime) {
PrrtInFlightPacketStore *packetStore = NULL;
if (packetType == PACKET_TYPE_DATA) {
packetStore = recv->dataPacketStates;
} else if (packetType == PACKET_TYPE_REDUNDANCY) {
packetStore = recv->redundancyPacketStates;
} else return;
check(pthread_mutex_lock(&recv->lock) == 0, "Lock failed.");
PrrtPacket *packet = PrrtInFlightPacketStore_get_packet(packetStore, seqnum);
while (packet == NULL) {
pthread_cond_wait(&recv->recordNotFoundCv, &recv->lock);
packet = PrrtInFlightPacketStore_get_packet(packetStore, seqnum);
}
PrrtReceiver_updateRateSample(recv->rateSample, packet, receiveTime, recv->packetTracking);
PrrtReceiver_generateRateSample(recv->rateSample, recv->packetTracking);
PrrtInFlightPacketStore_remove_outstanding_packet(packetStore, seqnum);
check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed.");
check(pthread_cond_broadcast(&recv->pipeNotFullCv) == 0, "Signal failed.");
return;
error:
PERROR("Mutex error.%s", "");
}
void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *packet, prrtTimestamp_t sentTime) {
PrrtInFlightPacketStore *packetStore = NULL;
if (PrrtPacket_type(packet) == PACKET_TYPE_DATA) {
packetStore = recv->dataPacketStates;
} else if (PrrtPacket_type(packet) == PACKET_TYPE_REDUNDANCY) {
packetStore = recv->redundancyPacketStates;
} else return;
//printf("Adding Packet #%u to %u\n", packet->sequenceNumber, PrrtPacket_type(packet));
check(pthread_mutex_lock(&recv->lock) == 0, "Lock failed.");
recv->packetTracking->app_limited = packet->is_app_limited;
if (PrrtInFlightPacketStore_get_queue_size(recv->dataPacketStates) +
PrrtInFlightPacketStore_get_queue_size(recv->redundancyPacketStates) == 0) {
recv->packetTracking->first_sent_time = sentTime;
recv->packetTracking->delivered_time = sentTime;
}
packet->first_sent_time = recv->packetTracking->first_sent_time;
packet->sent_time = sentTime;
packet->delivered_time = recv->packetTracking->delivered_time;
packet->delivered = recv->packetTracking->delivered;
//packet->is_app_limited = packetStore->app_limited;
PrrtInFlightPacketStore_add_outstanding_packet(packetStore, packet);