Commit 4288f189 authored by Andreas Schmidt's avatar Andreas Schmidt

Drop CVs from receiver.

parent 1fa687f4
Pipeline #2126 failed with stages
in 21 seconds
......@@ -134,7 +134,7 @@ handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
if (is_timeout(now, payload->packetTimeout_us)) {
PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable, packet->sequenceNumber);
PrrtPacket_destroy(packet);
debug(DEBUG_RECEIVER, "timeout data packet %u (%lu > %lu)", seqno, (unsigned long) now,
debug(DEBUG_DATARECEIVER, "timeout data packet %u (%lu > %lu)", seqno, (unsigned long) now,
(unsigned long) payload->packetTimeout_us);
} else if (PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable,
packet->sequenceNumber) ==
......
......@@ -45,10 +45,6 @@ PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port) {
check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == EXIT_SUCCESS, "Setting type failed.");
check(pthread_mutex_init(&recv->lock, &attr) == 0, "lock init failed.");
check(pthread_cond_init(&recv->pipeNotFullCv, NULL) == 0, "pipeNotFullCv init failed.");
check(pthread_cond_init(&recv->recordNotFoundCv, NULL) == 0, "recordNotFound init failed.");
return recv;
error:
......@@ -89,8 +85,6 @@ bool PrrtReceiver_destroy(PrrtReceiver *receiver) {
freeaddrinfo(receiver->ai);
free((void *) receiver->host_name);
check(pthread_mutex_destroy(&receiver->lock) == 0, "lock destroy failed.");
check(pthread_cond_destroy(&receiver->pipeNotFullCv) == 0, "pipeNotFullCv destroy failed.");
check(pthread_cond_destroy(&receiver->recordNotFoundCv) == 0, "recordNotFoundCv destroy failed.");
free(receiver);
return true;
......@@ -174,13 +168,15 @@ void PrrtReceiver_updateRateSample(PrrtRateSample *rateSample, PrrtPacket *packe
packet->delivered_time = 0;
}
void PrrtReceiver_on_application_write(PrrtReceiver *receiver, uint32_t send_queue_length) {
void PrrtReceiver_on_application_write(PrrtReceiver *receiver, uint32_t send_queue_length, prrtSequenceNumber_t sequenceNumber) {
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
debug(DEBUG_RECEIVER, "OnApplicationWrite: %u, %d", sequenceNumber, send_queue_length);
PrrtPacketTracking *tracking = receiver->packetTracking;
if(send_queue_length == 0 && tracking->pipe < BBR_getCwnd(receiver->bbr)) {
tracking->app_limited = (tracking->delivered + tracking->pipe) ? : 1;
}
check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
debug(DEBUG_RECEIVER, "OnApplicationWrite done.", send_queue_length);
return;
error:
......@@ -189,7 +185,7 @@ void PrrtReceiver_on_application_write(PrrtReceiver *receiver, uint32_t send_que
}
void PrrtReceiver_on_ack(PrrtReceiver *receiver, PrrtPacketFeedbackPayload * feedbackPayload, prrtTimestamp_t receiveTime, prrtTimedelta_t rtt) {
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
//debug(DEBUG_RECEIVER, "PrrtReceiver_on_ack");
PrrtChannelStateInformation_update_plr(receiver->csi, feedbackPayload->erasureCount, feedbackPayload->packetCount);
bool valid_sample = PrrtReceiver_updateAndGenerateRateSample(receiver, feedbackPayload->ackSequenceNumber, feedbackPayload->ackPacketType, receiveTime, rtt);
......@@ -244,33 +240,31 @@ bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNu
} else if (packetType == PACKET_TYPE_REDUNDANCY) {
inflightPacketStore = recv->redundancyInflightPacketStore;
} else return false;
bool result = false;
check(pthread_mutex_lock(&recv->lock) == 0, "Lock failed.");
PrrtPacket *packet = PrrtInFlightPacketStore_get_packet(inflightPacketStore, seqnum);
while (packet == NULL) {
pthread_cond_wait(&recv->recordNotFoundCv, &recv->lock);
packet = PrrtInFlightPacketStore_get_packet(inflightPacketStore, seqnum);
}
prrtByteCount_t lostBytes = PrrtReceiver_removePacketAndLostData(recv, packet, inflightPacketStore, seqnum);
recv->packetTracking->bytes_lost = lostBytes;
if(packet != NULL) {
prrtByteCount_t lostBytes = PrrtReceiver_removePacketAndLostData(recv, packet, inflightPacketStore, seqnum);
recv->packetTracking->bytes_lost = lostBytes;
if (lostBytes > 0) {
BBR_OnSpuriousLoss(recv->bbr, recv->packetTracking);
}
if (lostBytes == 0) {
BBR_OnLossExit(recv->bbr);
}
if (lostBytes > 0) {
BBR_OnSpuriousLoss(recv->bbr, recv->packetTracking);
}
if (lostBytes == 0) {
BBR_OnLossExit(recv->bbr);
}
PrrtReceiver_updateRateSample(recv->rateSample, packet, receiveTime, recv->packetTracking);
bool result = PrrtReceiver_generateRateSample(recv->rateSample, recv->packetTracking);
recv->packetTracking->prior_inflight = recv->packetTracking->pipe;
PrrtReceiver_updateRateSample(recv->rateSample, packet, receiveTime, recv->packetTracking);
result = PrrtReceiver_generateRateSample(recv->rateSample, recv->packetTracking);
recv->packetTracking->prior_inflight = recv->packetTracking->pipe;
if(recv->rateSample != NULL) {
BBR_OnACK(recv->bbr, recv->csi, recv->rateSample, recv->packetTracking, rtt);
if(recv->rateSample != NULL) {
BBR_OnACK(recv->bbr, recv->csi, recv->rateSample, recv->packetTracking, rtt);
}
}
check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed.");
check(pthread_cond_broadcast(&recv->pipeNotFullCv) == 0, "Signal failed.");
return result;
......@@ -302,15 +296,9 @@ void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *p
PrrtInFlightPacketStore_add_outstanding_packet(packetStore, packet);
check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed.");
check(pthread_cond_broadcast(&recv->recordNotFoundCv) == 0, "Signal failed.");
return;
error:
PERROR("Lock error.%s", "");
}
void PrrtReceiver_interrupt(PrrtReceiver *recv) {
pthread_cond_broadcast(&recv->recordNotFoundCv);
pthread_cond_broadcast(&recv->pipeNotFullCv);
}
}
\ No newline at end of file
......@@ -16,8 +16,6 @@ typedef struct prrtReceiver {
uint16_t port;
struct addrinfo *ai;
PrrtChannelStateInformation *csi;
pthread_cond_t pipeNotFullCv;
pthread_cond_t recordNotFoundCv;
pthread_mutex_t lock;
BBR* bbr;
......@@ -38,11 +36,9 @@ void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *p
prrtByteCount_t PrrtReceiver_get_pipe(PrrtReceiver *recv);
void PrrtReceiver_check_rto(PrrtReceiver *recv, prrtSequenceNumber_t i, prrtPacketType_t param);
void PrrtReceiver_on_application_write(PrrtReceiver *receiver, uint32_t i);
void PrrtReceiver_on_application_write(PrrtReceiver *receiver, uint32_t i, prrtSequenceNumber_t i1);
void PrrtReceiver_on_ack(PrrtReceiver *receiver, PrrtPacketFeedbackPayload * feedbackPayload, prrtTimestamp_t receiveTime, prrtTimedelta_t rtt);
void PrrtReceiver_interrupt(PrrtReceiver *recv);
bool PrrtReceiver_destroy(PrrtReceiver *receiver);
#endif //PRRT_RECEIVER_H
......@@ -265,7 +265,7 @@ int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSubmitPackage);
Pipe_push(s->sendDataQueue, &packet->asListNode);
PrrtReceiver_on_application_write(s->receiver, Pipe_get_size(s->sendDataQueue));
PrrtReceiver_on_application_write(s->receiver, Pipe_get_size(s->sendDataQueue), s->sequenceNumberSource);
XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
......@@ -384,10 +384,6 @@ int PrrtSocket_interrupt(PrrtSocket *s) {
PrrtPacketDeliveryStore_interrupt(s->packetDeliveryStore);
}
if(s->receiver != NULL) {
PrrtReceiver_interrupt(s->receiver);
}
if(s->sendDataQueue != NULL) {
Pipe_wake(s->sendDataQueue);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment