Commit 59fe6d48 authored by Andreas Schmidt's avatar Andreas Schmidt

Add retransmissions.

parent 21f9500b
Pipeline #2694 passed with stages
in 1 minute and 37 seconds
......@@ -89,9 +89,16 @@ local ex_type = Field.new("prrt.type")
local function getType() return ex_type()() end
local function getTypeName() return prrtPacketTypeNames[getType()] end
local ex_index = Field.new("prrt.index")
local function getIndex() return ex_index()() end
local ex_data_length = Field.new("prrt.data.length")
local function getDataLength() return ex_data_length()() end
local ex_red_baseseqno = Field.new("prrt.redundancy.baseSequenceNumber")
local function getRedBaseSeqNo() return ex_red_baseseqno()() end
local ex_red_n = Field.new("prrt.redundancy.n")
local function getRedN() return ex_red_n()() end
......@@ -109,7 +116,7 @@ local function dissect_data(buffer, pinfo, root)
tree:add(pf_data_groupRTprop, buffer:range(8,4))
tree:add(pf_data_packettimeout, buffer:range(12,4))
local label = "DATA Len=" .. getDataLength()
local label = "[D] Idx=" .. getIndex() .. " Len=" .. getDataLength()
tree:set_text(label)
pinfo.cols.info:set(label)
end
......@@ -121,7 +128,7 @@ local function dissect_redundancy(buffer, pinfo, root)
tree:add(pf_red_n, buffer:range(6,1))
tree:add(pf_red_k, buffer:range(7,1))
local label = "REDUNDANCY n=" .. getRedN() .. " k=" .. getRedK()
local label = "[R] Idx=" .. getIndex() .. " b=" .. getRedBaseSeqNo() .. " n=" .. getRedN() .. " k=" .. getRedK()
tree:set_text(label)
pinfo.cols.info:set(label)
end
......@@ -139,7 +146,7 @@ local function dissect_feedback(buffer, pinfo, root)
tree:add(pf_fb_acktype, buffer:range(20,1))
tree:add(pf_fb_ackSeqN, buffer:range(21, 2))
local label = "FEEDBACK"
local label = "[F]"
tree:set_text(label)
pinfo.cols.info:set(label)
end
......
......@@ -3,10 +3,7 @@ set (PRRT_SOURCES ../defines.h
clock.c clock.h
receiver.c receiver.h
socket.c socket.h
types/applicationConstraints.c types/applicationConstraints.h
types/block.c types/block.h
types/codingParams.c types/codingParams.h
types/channelStateInformation.c types/channelStateInformation.h
timer.c timer.h
processes/dataReceiver.c processes/dataReceiver.h
processes/dataTransmitter.c processes/dataTransmitter.h
stores/dataPacketStore.c stores/dataPacketStore.h
......@@ -16,9 +13,13 @@ set (PRRT_SOURCES ../defines.h
stores/packetDeliveryStore.c stores/packetDeliveryStore.h
stores/receptionTable.c stores/receptionTable.h
stores/repairBlockStore.c stores/repairBlockStore.h
types/packetTimeout.c types/packetTimeout.h
types/applicationConstraints.c types/applicationConstraints.h
types/block.c types/block.h
types/codingParams.c types/codingParams.h
types/channelStateInformation.c types/channelStateInformation.h
types/lossStatistics.c types/lossStatistics.h
types/packet.c types/packet.h
types/packetTimeout.c types/packetTimeout.h
vdmcode/block_code.c vdmcode/block_code.h types/packetTracking.c types/packetTracking.h types/rateSample.c types/rateSample.h)
if (XLAP)
......
......@@ -6,9 +6,9 @@
prrtByteCount_t BBR_Inflight(BBR* bbr, double gain)
{
if (bbr->rtprop == Inf)
if (bbr->rtprop == RTprop_Inf)
return bbr->initial_cwnd; /* no valid RTT samples yet */
uint32_t quanta = 0;
uint32_t quanta = bbr->mps;
uint32_t estimated_bdp = (uint32_t) round((((double)bbr->bw) * bbr->rtprop) / (1000 * 1000));
return (uint32_t)(gain * estimated_bdp + quanta);
}
......@@ -188,13 +188,18 @@ void BBR_ModulateCwndForProbeRTT(BBR* bbr)
void BBR_ModulateCwndForRecovery(BBR* bbr, prrtByteCount_t bytes_lost, prrtByteCount_t pipe, prrtByteCount_t delivered)
{
if (bytes_lost > 0)
bbr->cwnd = MAX(bbr->cwnd - bytes_lost, bbr->mps);
if (bytes_lost > 0) {
if (bbr->cwnd > bytes_lost) {
bbr->cwnd = MAX(bbr->cwnd - bytes_lost, bbr->mps);
} else {
bbr->cwnd = bbr->mps;
}
}
if (bbr->packet_conservation)
bbr->cwnd = MAX(bbr->cwnd, pipe + delivered);
}
void BBR_SetCwnd(BBR* bbr, PrrtPacketTracking* packetTracking)
{
BBR_UpdateTargetCwnd(bbr);
......@@ -213,7 +218,7 @@ void BBR_SetCwnd(BBR* bbr, PrrtPacketTracking* packetTracking)
void BBR_SetPacingRateWithGain(BBR* bbr, double pacing_gain)
{
double rate = (pacing_gain * ((double)bbr->bw));
debug(DEBUG_BBR, "Current rate: %u, Pacing gain: %f, BtlBw: %u, Calc Rate: %u, Filled pipe: %u", bbr->pacing_rate,
debug(DEBUG_BBR, "Current rate: %f, Pacing gain: %f, BtlBw: %u, Calc Rate: %f, Filled pipe: %u", bbr->pacing_rate,
pacing_gain, bbr->bw, rate, bbr->filled_pipe);
if (rate != 0 && (bbr->filled_pipe || rate > bbr->pacing_rate))
bbr->pacing_rate = rate;
......@@ -306,7 +311,7 @@ BBR* BBR_Init(prrtByteCount_t maximum_payload_size)
bbr->initial_cwnd = 4 * maximum_payload_size;
bbr->has_seen_rtt = false;
bbr->btlBwFilter = WindowedFilter_create(true, 10);
bbr->rtprop = Inf;
bbr->rtprop = RTprop_Inf;
bbr->rtprop_stamp = PrrtClock_get_current_time_us();
bbr->probe_rtt_done_stamp = 0;
bbr->probe_rtt_round_done = false;
......
......@@ -16,7 +16,7 @@
#define BBRHighGain ((((float)2885) / 1000) + 1)
#define BBRGainCycleLen 8
#define ProbeRTTDuration 200000 //200ms
#define Inf UINT32_MAX
#define RTprop_Inf UINT32_MAX
enum bbr_state {
STARTUP,
......
......@@ -4,24 +4,27 @@
#include "../../defines.h"
#include "../../util/dbg.h"
#include "../../util/common.h"
#include "../../util/time.h"
#include "../types/lossStatistics.h"
#include "../types/block.h"
#include "../clock.h"
#include "../socket.h"
#include "dataReceiver.h"
static void retrieve_data_blocks(PrrtSocket *sock_ptr,
prrtSequenceNumber_t base_seqno,
uint8_t k,
const PrrtBlock *block) {
static void retrieve_data_packets_for_block(PrrtSocket *sock_ptr,
prrtSequenceNumber_t base_seqno,
uint8_t k,
const PrrtBlock *block) {
List *res = List_create();
prrtSequenceNumber_t last_seqno = (prrtSequenceNumber_t) (base_seqno + k - 1);
debug(DEBUG_BLOCK, "Size: %d", PrrtDataPacketStore_size(sock_ptr->dataPacketStore));
PrrtDataPacketStore_remove_range(sock_ptr->dataPacketStore, res, base_seqno,
(prrtSequenceNumber_t) (base_seqno + k - 1));
last_seqno);
debug(DEBUG_BLOCK, "Retrieve %d packets in range: %u-%u.", List_count(res), base_seqno, last_seqno);
LIST_FOREACH(res, first, next, cur) {
PrrtPacket *packetPtr = cur->value;
check(PrrtBlock_insert_data_packet((PrrtBlock *) block, packetPtr), "Insert failed!")
PrrtPacket *packet = cur->value;
check(PrrtBlock_insert_data_packet((PrrtBlock *) block, packet), "Insert failed!")
}
List_destroy(res);
return;
......@@ -32,18 +35,22 @@ static void retrieve_data_blocks(PrrtSocket *sock_ptr,
static void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block) {
if (block != NULL && PrrtBlock_decode_ready(block)) {
check(PrrtBlock_decode(block), "Decoding failed");
while (List_count(block->dataPackets) > 0) {
PrrtPacket *pkt = List_shift(block->dataPackets);
if (PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable,
pkt->sequenceNumber)) {
PrrtPacketDeliveryStore_insert(sock_ptr->packetDeliveryStore, pkt);
} else {
PrrtPacket_destroy(pkt);
bool data_relevant = PrrtDeliveredPacketTable_test_is_block_relevant(sock_ptr->deliveredPacketTable,
block->baseSequenceNumber,
block->codingParams->n);
if (data_relevant) {
check(PrrtBlock_decode(block), "Decoding failed");
while (List_count(block->dataPackets) > 0) {
PrrtPacket *pkt = List_shift(block->dataPackets);
if (PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable,
pkt->sequenceNumber)) {
PrrtPacketDeliveryStore_insert(sock_ptr->packetDeliveryStore, pkt);
} else {
PrrtPacket_destroy(pkt);
}
}
}
PrrtRepairBlockStore_delete(sock_ptr->repairBlockStore, block->baseSequenceNumber);
PrrtBlock_destroy(block);
}
......@@ -106,8 +113,10 @@ static bool send_feedback(PrrtSocket *sock_ptr,
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
check(sendto(sock_ptr->socketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
length, "Sending feedback failed.");
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
free(buf);
PrrtPacket_destroy(feedback_pkt_ptr);
......@@ -121,16 +130,12 @@ static bool send_feedback(PrrtSocket *sock_ptr,
return false;
}
static bool is_timeout(prrtTimestamp_t now, prrtTimestamp_t to) {
/* TODO: implement */
return false;
}
static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
PrrtPacketDataPayload *payload = packet->payload;
prrtTimestamp_t sentTimestamp = payload->timestamp;
sock_ptr->lastSentTimestamp = sentTimestamp;
PrrtClock_update(&sock_ptr->clock, sentTimestamp, payload->groupRTprop_us);
debug(DEBUG_DATARECEIVER, "Timeout: %lu", payload->packetTimeout_us);
PrrtPacketTimeout *packetTimeout = PrrtPacketTimeout_create(packet);
check(PrrtPacketTimeoutTable_insert(sock_ptr->packetTimeoutTable, packetTimeout), "Could not insert data packet.");
......@@ -138,38 +143,45 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
prrtSequenceNumber_t seqno = packet->sequenceNumber;
PrrtReceptionTable_mark_received(sock_ptr->dataReceptionTable, seqno);
prrtTimestamp_t now = PrrtClock_get_prrt_time_us(&sock_ptr->clock);
if (is_timeout(now, payload->packetTimeout_us)) {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
if (PrrtTimestamp_cmp(now, payload->packetTimeout_us) > 0) {
debug(DEBUG_DATARECEIVER, "Timeout data packet %u (%lu > %lu)", seqno, (unsigned long) now,
(unsigned long) payload->packetTimeout_us);
PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable, packet->sequenceNumber);
PrrtPacket_destroy(packet);
debug(DEBUG_DATARECEIVER, "timeout data packet %u (%lu > %lu)", seqno, (unsigned long) now,
(unsigned long) payload->packetTimeout_us);
} else if (!PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable,
packet->sequenceNumber)) {
debug(DEBUG_DATARECEIVER, "Not relevant: %u", seqno);
PrrtPacket_destroy(packet);
} else {
PrrtChannelStateInformation_update_delivery_rate(sock_ptr->senderChannelStateInformation, payload->btlbw);
prrtSequenceNumber_t baseSequenceNumber = packet->sequenceNumber - packet->index;
// forward to application layer
debug(DEBUG_DATARECEIVER, "Forward: %u", seqno);
PrrtPacketDeliveryStore_insert(sock_ptr->packetDeliveryStore, packet);
PrrtPacket *reference = PrrtPacket_copy(packet);
PrrtBlock *block = PrrtRepairBlockStore_get_block(sock_ptr->repairBlockStore, baseSequenceNumber);
if (block != NULL) {
check(PrrtBlock_insert_data_packet(block, reference), "Inserting failed: %d, %d", baseSequenceNumber,
seqno);
decode_block(sock_ptr, block);
if(PrrtBlock_insert_data_packet(block, reference)) {
decode_block(sock_ptr, block);
} else {
PrrtPacket_destroy(reference);
}
} else {
debug(DEBUG_DATARECEIVER, "Inserting data packet %d for later.", reference->sequenceNumber);
if (PrrtDataStore_insert(sock_ptr->dataPacketStore, reference) == false) {
debug(DEBUG_DATARECEIVER, "Failed to insert %d.", reference->sequenceNumber);
PrrtPacket_destroy(reference);
} else {
debug(DEBUG_DATARECEIVER, "Inserted %d.", reference->sequenceNumber);
}
}
// forward to application layer
debug(DEBUG_DATARECEIVER, "forward %u", seqno);
XlapTimeStampClock(sock_ptr, ts_data_packet, seqno, PrrtReturnPackage);
XlapTimeStampCycle(sock_ptr, ts_data_packet, seqno, PrrtReturnPackage);
PrrtPacketDeliveryStore_insert(sock_ptr->packetDeliveryStore, packet);
}
return;
......@@ -201,9 +213,9 @@ static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) {
redundancyPayload->baseSequenceNumber);
PrrtRepairBlockStore_insert(socket->repairBlockStore, block);
}
retrieve_data_blocks(socket, redundancyPayload->baseSequenceNumber, block->codingParams->k, block);
retrieve_data_packets_for_block(socket, redundancyPayload->baseSequenceNumber, block->codingParams->k, block);
}
if (PrrtBlock_insert_redundancy_packet(block, packet)) {
decode_block(socket, block);
......@@ -213,15 +225,15 @@ static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) {
}
}
void handle_feedback_packet(PrrtSocket *prrtSocket, PrrtPacket *prrtPacket, prrtTimestamp_t receiveTime) {
check(prrtPacket != NULL, "Cannot be null");
void handle_feedback_packet(PrrtSocket *socket, PrrtPacket *packet, prrtTimestamp_t receiveTime) {
check(packet != NULL, "Cannot be null");
debug(DEBUG_DATARECEIVER, "handle_feedback_packet");
PrrtPacketFeedbackPayload *feedbackPayload = (PrrtPacketFeedbackPayload *) prrtPacket->payload;
PrrtPacketFeedbackPayload *feedbackPayload = (PrrtPacketFeedbackPayload *) packet->payload;
prrtTimestamp_t forwardTripTimestamp = feedbackPayload->forwardTripTimestamp_us;
prrtTimedelta_t rtt = (prrtTimedelta_t) (receiveTime - forwardTripTimestamp);
PrrtReceiver_on_ack(prrtSocket->receiver, feedbackPayload, receiveTime, rtt);
PrrtReceiver_on_ack(socket->receiver, feedbackPayload, receiveTime, rtt, socket->applicationConstraints);
return;
error:
......@@ -292,9 +304,7 @@ void *receive_data_loop(void *ptr) {
struct timespec packet_recv_timestamp;
uint64_t packet_recv_cyclestamp = 0;
receive_from_socket(s, buffer, &n, &remote, &addrlen, &packet_recv_timestamp, &packet_recv_cyclestamp);
if (PrrtSocket_closing(s)) {
break;
}
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
debug(DEBUG_HARDSTAMPING, "Packet TS:\t%ld.%09ld; Who? %s", (long) packet_recv_timestamp.tv_sec,
packet_recv_timestamp.tv_nsec, inet_ntoa(remote.sin_addr));
......@@ -355,9 +365,8 @@ void *receive_data_loop(void *ptr) {
debug(DEBUG_DATARECEIVER, "Cleanup");
PrrtSocket_cleanup(s);
debug(DEBUG_DATARECEIVER, "Cleaned");
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
}
PrrtSocket_cleanup(s);
return NULL;
error:
PERROR("receive_data_loop() ended unexpectedly.");
......
......@@ -2,11 +2,13 @@
#include <netdb.h>
#include <string.h>
#include "../../defines.h"
#include "../timer.h"
#include "../receiver.h"
#include "../socket.h"
#include "../types/block.h"
#include "../../util/dbg.h"
#include "../../util/common.h"
#include "../../util/time.h"
#include "dataTransmitter.h"
#include <math.h>
......@@ -79,22 +81,23 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
memset(buf, 0, sizeof(buf));
prrtPacketLength_t length = PrrtPacket_size(packet);
prrtPacketLength_t payloadLength = packet->payloadLength;
bool paceSuccessful = PrrtSocket_pace(sock_ptr);
if (!paceSuccessful) {
debug(DEBUG_DATATRANSMITTER, "Pacing interrupted.");
PrrtPacket_destroy(packet);
return false;
}
debug(DEBUG_DATATRANSMITTER, "Pacing interval passed.");
int64_t space = PrrtReceiver_get_space(sock_ptr->receiver);
while (space < sock_ptr->maximum_payload_size) {
//PrrtReceiver_check_rto(sock_ptr->receiver, packet->sequenceNumber, PrrtPacket_type(packet));
PrrtReceiver_wait_for_space(sock_ptr->receiver);
if(PrrtSocket_closing(sock_ptr)) {
PrrtPacket_destroy(packet);
return false;
}
space = PrrtReceiver_get_space(sock_ptr->receiver);
bool waitSuccessful = PrrtReceiver_wait_for_space(sock_ptr->receiver, sock_ptr->maximum_payload_size,
sock_ptr->applicationConstraints);
if(!waitSuccessful) {
debug(DEBUG_DATATRANSMITTER, "Wait for space interrupted.");
PrrtPacket_destroy(packet);
return false;
}
debug(DEBUG_DATATRANSMITTER, "Space available.");
prrtTimestamp_t now = PrrtClock_get_current_time_us();
if (sock_ptr->pacingEnabled) {
......@@ -160,11 +163,59 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
return false;
}
typedef struct timer_arg {
PrrtSocket* socket;
PrrtBlock* block;
} RetransmissionTimerArgs;
void retransmission_round_handler(void *arg) {
uint8_t j;
RetransmissionTimerArgs* args = (RetransmissionTimerArgs*) arg;
PrrtBlock *block = args->block;
PrrtSocket *socket = args->socket;
if(block->inRound > 0) {
PrrtReceiver_rto_check(socket->receiver, socket->applicationConstraints);
}
if (PrrtSocket_closing(socket) || block->inRound >= block->codingParams->c) {
PrrtBlock_destroy(block);
free(arg);
return;
}
uint32_t redundancyPackets = block->codingParams->n_cycle[block->inRound];
for (j = 0; j < redundancyPackets; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
bool sendResult = send_packet(socket, red_pkt);
if(!sendResult) {
debug(DEBUG_DATATRANSMITTER, "Sending redundancy data failed.");
PrrtBlock_destroy(block);
free(arg);
return;
}
}
block->inRound++;
PrrtTimerTask task = {
.arg = arg,
.fun = retransmission_round_handler
};
uint32_t waittime_us = PrrtReceiver_get_retransmission_delay(socket->receiver, socket->applicationConstraints);
prrtTimerDate deadline = abstime_from_now(waittime_us);
debug(DEBUG_DATATRANSMITTER, "Set timer to expire in: %dus", waittime_us);
PrrtTimer_submit(socket->retransmissionTimer, &deadline, &task);
}
void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet) {
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
if (sock_ptr->receiveBlock == NULL) {
sock_ptr->receiveBlock = PrrtBlock_create(PrrtCodingConfiguration_copy(sock_ptr->codingParameters), PrrtCoder_copy(sock_ptr->coder), packet->sequenceNumber);
sock_ptr->receiveBlock->senderBlock = true;
}
packet->index = (uint8_t) (packet->sequenceNumber - sock_ptr->receiveBlock->baseSequenceNumber);
......@@ -174,47 +225,48 @@ void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet) {
PrrtPacket *packetToSend = PrrtPacket_copy(packet);
debug(DEBUG_DATATRANSMITTER, "Send: %i", packet->sequenceNumber);
send_packet(sock_ptr, packetToSend);
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
PrrtBlock_insert_data_packet(sock_ptr->receiveBlock, packet);
// TODO: redundancy should only be sent when necessary
if (PrrtBlock_encode_ready(sock_ptr->receiveBlock)) {
uint32_t j = 0;
unsigned int redundancy_seqno = sock_ptr->sequenceNumberRedundancy;
XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeStart);
PrrtBlock_encode(sock_ptr->receiveBlock, &sock_ptr->sequenceNumberRedundancy);
XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeEnd);
uint32_t redundancyPackets = List_count(sock_ptr->receiveBlock->redundancyPackets);
for (j = 0; j < redundancyPackets; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(sock_ptr->receiveBlock);
send_packet(sock_ptr, red_pkt);
int sendResult = send_packet(sock_ptr, packetToSend);
if (sendResult) {
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
if(PrrtBlock_insert_data_packet(sock_ptr->receiveBlock, packet) == false) {
PERROR("Failed to insert packet: %d", packet->sequenceNumber);
}
PrrtBlock_destroy(sock_ptr->receiveBlock);
sock_ptr->receiveBlock = NULL;
if (PrrtBlock_encode_ready(sock_ptr->receiveBlock)) {
unsigned int redundancy_seqno = sock_ptr->sequenceNumberRedundancy;
XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeStart);
PrrtBlock_encode(sock_ptr->receiveBlock, &sock_ptr->sequenceNumberRedundancy);
XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeEnd);
RetransmissionTimerArgs* args = (RetransmissionTimerArgs*) calloc(1, sizeof(RetransmissionTimerArgs));
args->block = sock_ptr->receiveBlock;
sock_ptr->receiveBlock = NULL;
args->socket = sock_ptr;
retransmission_round_handler(args);
}
} else {
PrrtPacket_destroy(packet);
}
}
void *PrrtDataTransmitter_send_data_loop(void *ptr) {
PrrtSocket *sock_ptr = ptr;
PrrtSocket *s = ptr;
while (1) {
ListNode *job;
do {
job = Pipe_pull(sock_ptr->sendDataQueue);
if (PrrtSocket_closing(sock_ptr)) {
if (sock_ptr->receiveBlock != NULL) {
PrrtBlock_destroy(sock_ptr->receiveBlock);
sock_ptr->receiveBlock = NULL;
job = Pipe_pull(s->sendDataQueue);
if (PrrtSocket_closing(s)) {
if (s->receiveBlock != NULL) {
PrrtBlock_destroy(s->receiveBlock);
s->receiveBlock = NULL;
}
return NULL;
}
} while (!job);
PrrtPacket *packet = PrrtPacket_byListNode(job);
PrrtDataTransmitter_transmit(sock_ptr, packet);
PrrtDataTransmitter_transmit(s, packet);
}
}
......@@ -7,12 +7,17 @@
#include "stores/inFlightPacketStore.h"
#include "receiver.h"
prrtByteCount_t remove_packet_and_lost_data(PrrtReceiver *recv, PrrtPacket *packet,
PrrtInFlightPacketStore *inflightPacketStore,
prrtSequenceNumber_t seqnum) {
recv->packetTracking->pipe -= packet->payloadLength;
prrtByteCount_t lostBytes = PrrtInFlightPacketStore_remove_outstanding_packet(inflightPacketStore, seqnum);
prrtByteCount_t clean_rto(PrrtReceiver *recv, PrrtApplicationConstraints *constraints) {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
uint32_t retransmission_delay = PrrtReceiver_get_retransmission_delay(recv, constraints);
prrtTimestamp_t deadline = now - retransmission_delay;
prrtByteCount_t lostBytes = 0;
lostBytes += PrrtInFlightPacketStore_clear_before(recv->dataInflightPacketStore, deadline);
lostBytes += PrrtInFlightPacketStore_clear_before(recv->redundancyInflightPacketStore, deadline);
recv->packetTracking->pipe -= lostBytes;
recv->packetTracking->bytes_lost = lostBytes;
return lostBytes;
}
......@@ -66,7 +71,8 @@ bool gnerate_rate_sample(PrrtRateSample *rateSample, PrrtPacketTracking *packetT
}
bool update_and_generate_rate_sample(PrrtReceiver *recv, prrtSequenceNumber_t seqnum, uint8_t packetType,
prrtTimestamp_t receiveTime, prrtTimedelta_t rtt) {
prrtTimestamp_t receiveTime, prrtTimedelta_t rtt,
PrrtApplicationConstraints *constraints) {
PrrtInFlightPacketStore *inflightPacketStore = NULL;
if (packetType == PACKET_TYPE_DATA) {
inflightPacketStore = recv->dataInflightPacketStore;
......@@ -75,11 +81,13 @@ bool update_and_generate_rate_sample(PrrtReceiver *recv, prrtSequenceNumber_t se
} else return false;
bool result = false;
PrrtPacket *packet = PrrtInFlightPacketStore_get_packet(inflightPacketStore, seqnum);
PrrtPacket *packet = PrrtInFlightPacketStore_get_packet_by_seqno(inflightPacketStore, seqnum);
if(packet != NULL) {
update_rate_sample(recv->rateSample, packet, receiveTime, recv->packetTracking);
prrtByteCount_t lostBytes = remove_packet_and_lost_data(recv, packet, inflightPacketStore, seqnum);
recv->packetTracking->bytes_lost = lostBytes;
recv->packetTracking->pipe -= packet->payloadLength;
PrrtInFlightPacketStore_remove_outstanding_packet_by_seqno(inflightPacketStore, seqnum);
prrtByteCount_t lostBytes = clean_rto(recv, constraints);
if (lostBytes > 0) {
BBR_OnSpuriousLoss(recv->bbr, recv->packetTracking);
......@@ -132,6 +140,8 @@ PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port, prrtByteCount
recv->ai = info;
recv->bbr = BBR_Init(maximum_payload_size);
recv->closing = false;
pthread_mutexattr_t attr;
check(pthread_mutexattr_init(&attr) == EXIT_SUCCESS, "Mutex attr init failed.");
check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == EXIT_SUCCESS, "Setting type failed.");
......@@ -188,45 +198,25 @@ bool PrrtReceiver_destroy(PrrtReceiver *receiver) {
return false;
}
void PrrtReceiver_check_rto(PrrtReceiver *recv, prrtSequenceNumber_t seqNum, prrtPacketType_t packetType) {
prrtByteCount_t lostBytes = 0;
PrrtInFlightPacketStore *inflightPacketStore = NULL;
if (packetType == PACKET_TYPE_DATA) {
inflightPacketStore = recv->dataInflightPacketStore;
} else if (packetType == PACKET_TYPE_REDUNDANCY) {
inflightPacketStore = recv->redundancyInflightPacketStore;
} else return;
void PrrtReceiver_rto_check(PrrtReceiver *recv, PrrtApplicationConstraints *constraints) {
check(pthread_mutex_lock(&recv->lock) == 0, "Lock failed.");
PrrtPacket* packet = PrrtInFlightPacketStore_get_first_packet_before(inflightPacketStore, seqNum);
if(packet != NULL) {
prrtTimeDifference_t diff = ((prrtTimeDifference_t) PrrtClock_get_current_time_us()) - (packet->sent_time + 3 * BBR_getRTProp(recv->bbr));
if(diff > 0) {
lostBytes += packet->payloadLength;
lostBytes += remove_packet_and_lost_data(recv, packet, inflightPacketStore, packet->sequenceNumber);
recv->packetTracking->bytes_lost = lostBytes;
if (lostBytes > 0) {
BBR_OnRTOLoss(recv->bbr);
}
if (lostBytes == 0) {
BBR_OnLossExit(recv->bbr);
}
}
prrtByteCount_t lostBytes = clean_rto(recv, constraints);
if (lostBytes > 0) {
BBR_OnRTOLoss(recv->bbr);
}
if (lostBytes == 0) {
BBR_OnLossExit(recv->bbr);
}
pthread_cond_broadcast(&recv->wait_for_space);
check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed.");
return;