Commit 00f430d8 authored by Andreas Schmidt's avatar Andreas Schmidt

Multiplex packets by type and not socket.

* Single thread for reception of data and feedback.
parent 973501b7
Pipeline #2202 failed with stages
in 17 seconds
......@@ -94,7 +94,6 @@ cdef extern from "proto/receiver.h":
cdef extern from "proto/socket.h":
cdef struct prrtSocket:
int socketFd
pthread_t receiveFeedbackThread
pthread_t sendDataThread
pthread_mutex_t outQueueFilledMutex
......
......@@ -8,7 +8,6 @@ add_library(PRRT ../defines.h
../xlap/xlap.c ../xlap/xlap.h
applicationConstraints.c applicationConstraints.h
processes/dataReceiver.c processes/dataReceiver.h
processes/feedbackReceiver.c processes/feedbackReceiver.h
processes/dataTransmitter.c processes/dataTransmitter.h
stores/dataPacketStore.c stores/dataPacketStore.h
stores/deliveredPacketTable.c stores/deliveredPacketTable.h
......
......@@ -203,6 +203,32 @@ static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) {
return;
}
void handle_feedback_packet(const PrrtSocket *prrtSocket, const PrrtPacket *prrtPacket, prrtTimestamp_t receiveTime) {
check(prrtPacket != NULL, "Cannot be null");
debug(DEBUG_DATARECEIVER, "handle_feedback_packet");
PrrtPacketFeedbackPayload *feedbackPayload = (PrrtPacketFeedbackPayload *) prrtPacket->payload;
prrtTimestamp_t forwardTripTimestamp = feedbackPayload->forwardTripTimestamp_us;
bool valid_sample = PrrtReceiver_updateAndGenerateRateSample(prrtSocket->receiver, feedbackPayload->ackSequenceNumber, feedbackPayload->ackPacketType, receiveTime);
debug(DEBUG_DATARECEIVER, "PrrtReceiver_updateAndGenerateRateSample ");
if(valid_sample) {
PrrtChannelStateInformation_update_delivery_rate(prrtSocket->receiver->csi, prrtPacket, prrtSocket->receiver->rateSample);
}
PrrtChannelStateInformation_update_app_limited(prrtSocket->receiver->csi, prrtSocket->receiver->rateSample->is_app_limited);
debug(DEBUG_DATARECEIVER, "PrrtChannelStateInformation_update_app_limited ");
PrrtChannelStateInformation_update_rtprop(prrtSocket->receiver->csi,
(prrtTimedelta_t) (receiveTime - forwardTripTimestamp));
debug(DEBUG_DATARECEIVER, "PrrtChannelStateInformation_update_rtprop ");
PrrtChannelStateInformation_update_plr(prrtSocket->receiver->csi, feedbackPayload->erasureCount, feedbackPayload->packetCount);
debug(DEBUG_DATARECEIVER, "PrrtChannelStateInformation_update_plr ");
return;
error:
PERROR("handle_feedback_packet failed.");
}
void receive_from_socket(PrrtSocket *socket_ptr, unsigned char buffer_ptr[65528], ssize_t *received_size,
struct sockaddr_in *remote_ptr, socklen_t *remote_len_ptr, struct timespec *packet_timestamp_ptr,
uint64_t *packet_cyclestamp_ptr) {
......@@ -255,6 +281,7 @@ void *receive_data_loop(void *ptr) {
PrrtSocket *sock_ptr = ptr;
while (1) {
debug(DEBUG_DATARECEIVER, "About to receive.");
XlapTimestampPlaceholder tsph1;
XlapTimestampPlaceholder tsph2;
XlapTimestampPlaceholder tsph3;
......@@ -267,20 +294,16 @@ void *receive_data_loop(void *ptr) {
receive_from_socket(sock_ptr, buffer, &n, &remote, &addrlen, &packet_recv_timestamp, &packet_recv_cyclestamp);
debug(DEBUG_HARDSTAMPING, "Packet TS:\t%ld.%09ld; Who? %s", (long) packet_recv_timestamp.tv_sec,
packet_recv_timestamp.tv_nsec, inet_ntoa(remote.sin_addr));
prrtTimestamp_t prrt_recv_timestamp = PrrtClock_TimespecToPrrtTimestamp(packet_recv_timestamp);
sock_ptr->lastReceivedTimestamp = prrt_recv_timestamp;
XlapTimeStampClock(&tsph1, ts_any_packet, 0, LinkReceive);
XlapTimeStampCycle(&tsph1, ts_any_packet, 0, LinkReceive);
PrrtPacket *packet = (PrrtPacket *) calloc(1, sizeof(PrrtPacket));
check_mem(packet);
XlapTimeStampCycle(&tsph2, ts_any_packet, 0, DecodeStart);
check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed.");
XlapTimeStampCycle(&tsph3, ts_any_packet, 0, DecodeEnd);
prrtSequenceNumber_t seqno = packet->sequenceNumber;
prrtTimestamp_t prrt_recv_timestamp = PrrtClock_TimespecToPrrtTimestamp(packet_recv_timestamp);
prrtPacketType_t packetType = PrrtPacket_type(packet);
debug(DEBUG_DATARECEIVER, "received packet %d:%u", (int) packetType, seqno);
......@@ -293,8 +316,12 @@ void *receive_data_loop(void *ptr) {
} else if (packetType == PACKET_TYPE_REDUNDANCY) {
kind = ts_redundancy_packet;
sentTimestamp = PrrtPacket_get_redundancy_timestamp(packet);
} else if (packetType == PACKET_TYPE_FEEDBACK) {
kind = ts_feedback_packet;
}
if (packetType == PACKET_TYPE_DATA || packetType == PACKET_TYPE_REDUNDANCY) {
sock_ptr->lastReceivedTimestamp = prrt_recv_timestamp;
XlapCycleStampValue(sock_ptr, kind, seqno, ChannelReceive, packet_recv_cyclestamp);
XlapTimeStampValue(sock_ptr, kind, seqno, ChannelReceive, packet_recv_timestamp);
......@@ -313,14 +340,19 @@ void *receive_data_loop(void *ptr) {
}
send_feedback(sock_ptr, remote, seqno, prrt_recv_timestamp, sentTimestamp, packetType);
XlapTimeStampCycle(sock_ptr, kind, seqno, HandlePacketEnd);
} else if (packetType == PACKET_TYPE_FEEDBACK) {
handle_feedback_packet(sock_ptr, packet, prrt_recv_timestamp);
PrrtPacket_destroy(packet);
} else {
PrrtPacket_print(packet);
PrrtPacket_destroy(packet);
}
debug(DEBUG_DATARECEIVER, "Cleanup");
PrrtSocket_cleanup(sock_ptr);
debug(DEBUG_DATARECEIVER, "Cleaned");
}
error:
PNOTIMPLEMENTED("SHOULD IMPLEMENT ERROR HANDLER HERE");
PNOTIMPLEMENTED("SHOULD IMPLEMENT ERROR HANDLER HERE");
}
......@@ -152,6 +152,7 @@ void *send_data_loop(void *ptr) {
payload->groupRTprop_us = PrrtSocket_get_rtprop_fwd(sock_ptr);
PrrtPacket *packetToSend = PrrtPacket_copy(packet);
debug(DEBUG_DATATRANSMITTER, "Send: %i", packet->sequenceNumber);
send_packet(sock_ptr, packetToSend);
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
......@@ -175,6 +176,5 @@ void *send_data_loop(void *ptr) {
PrrtBlock_destroy(block);
block = NULL;
}
PrrtSocket_cleanup(sock_ptr);
}
}
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/poll.h>
#include "../../defines.h"
#include "../../util/common.h"
#include "../../util/dbg.h"
#include "../clock.h"
#include "../socket.h"
#include "feedbackReceiver.h"
static void handle_feedback(PrrtSocket *prrtSocket, const size_t length)
{
char bufin[MAX_PAYLOAD_LENGTH];
PrrtPacket *prrtPacket = NULL;
ssize_t n;
struct sockaddr_in remote;
socklen_t addrlen = sizeof(remote);
struct pollfd fds;
int timeout_msecs = 1000;
fds.fd = prrtSocket->socketFd;
fds.events = POLLIN;
n = poll(&fds, 1, timeout_msecs);
check(n >= 0, "Select failed.");
if(n == 0) {
return;
}
prrtTimestamp_t receiveTime = PrrtClock_get_current_time_us();
n = recvfrom(prrtSocket->socketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
check(n >= 0, "Receiving feedback failed.");
prrtPacket = calloc(1, sizeof(PrrtPacket));
check_mem(prrtPacket);
PrrtPacket_decode(bufin, (uint16_t) n, prrtPacket);
PrrtPacketFeedbackPayload *feedbackPayload = (PrrtPacketFeedbackPayload *) prrtPacket->payload;
prrtTimestamp_t forwardTripTimestamp = feedbackPayload->forwardTripTimestamp_us;
bool valid_sample = PrrtReceiver_updateAndGenerateRateSample(prrtSocket->receiver, feedbackPayload->ackSequenceNumber, feedbackPayload->ackPacketType, receiveTime);
if(valid_sample) {
PrrtChannelStateInformation_update_delivery_rate(prrtSocket->receiver->csi, prrtPacket, prrtSocket->receiver->rateSample);
}
PrrtChannelStateInformation_update_app_limited(prrtSocket->receiver->csi, prrtSocket->receiver->rateSample->is_app_limited);
PrrtChannelStateInformation_update_rtprop(prrtSocket->receiver->csi,
(prrtTimedelta_t) (receiveTime - forwardTripTimestamp));
PrrtChannelStateInformation_update_plr(prrtSocket->receiver->csi, feedbackPayload->erasureCount, feedbackPayload->packetCount);
error:
if(prrtPacket != NULL) { PrrtPacket_destroy(prrtPacket); }
}
void *receive_feedback_loop(void *ptr)
{
PrrtSocket *sock_ptr = ptr;
while (!atomic_load_explicit(&sock_ptr->closing, memory_order_acquire)) {
handle_feedback(sock_ptr, MAX_PAYLOAD_LENGTH);
}
return NULL;
// error:
// PERROR("Feedback reception failed.%s","");
// return NULL;
}
#ifndef PRRT_FEEDBACK_RECEIVER_H
#define PRRT_FEEDBACK_RECEIVER_H
void * receive_feedback_loop(void *ptr);
#endif //PRRT_FEEDBACK_RECEIVER_H
......@@ -39,9 +39,11 @@ PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port) {
recv->ai = info;
check(pthread_mutex_init(&recv->lock, NULL) == 0, "lock init failed.");
check(pthread_cond_init(&recv->pipeNotFullCv, NULL) == 0, "pipeNotFullCv init failed.");
check(pthread_cond_init(&recv->recordNotFoundCv, NULL) == 0, "recordNotFound init failed.");
pthread_mutexattr_t attr;
check(pthread_mutexattr_init(&attr) == EXIT_SUCCESS, "Mutex attr init failed.");
check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == EXIT_SUCCESS, "Setting type failed.");
check(pthread_mutex_init(&recv->lock, &attr) == 0, "lock init failed.");
return recv;
......@@ -79,8 +81,6 @@ bool PrrtReceiver_destroy(PrrtReceiver *receiver) {
freeaddrinfo(receiver->ai);
free((void *) receiver->host_name);
check(pthread_mutex_destroy(&receiver->lock) == 0, "lock destroy failed.");
check(pthread_cond_destroy(&receiver->pipeNotFullCv) == 0, "pipeNotFullCv destroy failed.");
check(pthread_cond_destroy(&receiver->recordNotFoundCv) == 0, "recordNotFoundCv destroy failed.");
free(receiver);
return true;
......@@ -161,19 +161,17 @@ bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNu
} else if (packetType == PACKET_TYPE_REDUNDANCY) {
packetStore = recv->redundancyPacketStates;
} else return false;
bool result = false;
check(pthread_mutex_lock(&recv->lock) == 0, "Lock failed.");
PrrtPacket *packet = PrrtInFlightPacketStore_get_packet(packetStore, seqnum);
while (packet == NULL) {
pthread_cond_wait(&recv->recordNotFoundCv, &recv->lock);
packet = PrrtInFlightPacketStore_get_packet(packetStore, seqnum);
if (packet != NULL) {
PrrtReceiver_updateRateSample(recv->rateSample, packet, receiveTime, recv->packetTracking);
PrrtInFlightPacketStore_remove_outstanding_packet(packetStore, seqnum);
result = PrrtReceiver_generateRateSample(recv->rateSample, recv->packetTracking);
recv->packetTracking->pipe -= packet->payloadLength;
}
PrrtReceiver_updateRateSample(recv->rateSample, packet, receiveTime, recv->packetTracking);
bool result = PrrtReceiver_generateRateSample(recv->rateSample, recv->packetTracking);
recv->packetTracking->pipe -= packet->payloadLength;
PrrtInFlightPacketStore_remove_outstanding_packet(packetStore, seqnum);
check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed.");
check(pthread_cond_broadcast(&recv->pipeNotFullCv) == 0, "Signal failed.");
return result;
......@@ -204,15 +202,9 @@ void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *p
PrrtInFlightPacketStore_add_outstanding_packet(packetStore, packet);
check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed.");
check(pthread_cond_broadcast(&recv->recordNotFoundCv) == 0, "Signal failed.");
return;
error:
PERROR("Lock error.%s", "");
}
void PrrtReceiver_interrupt(PrrtReceiver *recv) {
pthread_cond_broadcast(&recv->recordNotFoundCv);
pthread_cond_broadcast(&recv->pipeNotFullCv);
}
......@@ -35,8 +35,6 @@ typedef struct prrtReceiver {
uint16_t port;
struct addrinfo *ai;
PrrtChannelStateInformation *csi;
pthread_cond_t pipeNotFullCv;
pthread_cond_t recordNotFoundCv;
pthread_mutex_t lock;
PrrtInFlightPacketStore *dataPacketStates;
......@@ -55,8 +53,6 @@ void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *p
void PrrtReceiver_on_application_write(PrrtReceiver* receiver);
void PrrtReceiver_interrupt(PrrtReceiver *recv);
bool PrrtReceiver_destroy(PrrtReceiver *receiver);
#endif //PRRT_RECEIVER_H
......@@ -14,7 +14,6 @@
#include "../util/common.h"
#include "processes/dataTransmitter.h"
#include "processes/dataReceiver.h"
#include "processes/feedbackReceiver.h"
#include "stores/deliveredPacketTable.h"
#include "types/packetTimeout.h"
#include "socket.h"
......@@ -182,33 +181,22 @@ bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port)
check(setsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPING, &enabled2, sizeof(enabled2)) >= 0, "Could not set SO_TIMESTAMPING");
}
s->receiveFeedbackThreadAttr = calloc(1, sizeof(pthread_attr_t));
pthread_attr_init(s->receiveFeedbackThreadAttr);
s->sendDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
pthread_attr_init(s->sendDataThreadAttr);
s->receiveDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
pthread_attr_init(s->receiveDataThreadAttr);
if(s->isThreadPinning) {
pin_thread_to_core(s->receiveFeedbackThreadAttr, 1);
pin_thread_to_core(s->sendDataThreadAttr, 2);
pin_thread_to_core(s->receiveDataThreadAttr, 2);
pin_thread_to_core(s->sendDataThreadAttr, 1);
}
check(pthread_create(&s->receiveFeedbackThread, s->receiveFeedbackThreadAttr,
receive_feedback_loop,
(void *) s) ==
EXIT_SUCCESS, "Cannot create receive feedback thread.");
check(pthread_create(&s->sendDataThread, s->sendDataThreadAttr, send_data_loop,
(void *) s) ==
EXIT_SUCCESS,
"Cannot create send thread.");
(void *) s) == EXIT_SUCCESS, "Cannot create send thread.");
s->receiveDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
pthread_attr_init(s->receiveDataThreadAttr);
if(s->isThreadPinning) {
pin_thread_to_core(s->receiveDataThreadAttr, 3);
}
check(pthread_create(&s->receiveDataThread, s->receiveDataThreadAttr, receive_data_loop,
(void *) s) ==
EXIT_SUCCESS,
"Cannot create data receiving thread.");
(void *) s) == EXIT_SUCCESS, "Cannot create data receiving thread.");
s->isBound = true;
......@@ -322,10 +310,6 @@ int PrrtSocket_interrupt(PrrtSocket *s) {
PrrtPacketDeliveryStore_interrupt(s->packetDeliveryStore);
}
if(s->receiver != NULL) {
PrrtReceiver_interrupt(s->receiver);
}
if(s->sendDataQueue != NULL) {
Pipe_wake(s->sendDataQueue);
}
......@@ -344,13 +328,6 @@ int PrrtSocket_interrupt(PrrtSocket *s) {
s->receiveDataThread = 0;
}
if (s->receiveFeedbackThread != 0) {
check(pthread_join(s->receiveFeedbackThread, res) == 0, "Join failed.");
pthread_attr_destroy(s->receiveFeedbackThreadAttr);
s->receiveFeedbackThread = 0;
}
return EXIT_SUCCESS;
error:
......@@ -432,11 +409,6 @@ int PrrtSocket_close(PrrtSocket *s) {
free(s->receiveDataThreadAttr);
}
if (s->receiveFeedbackThreadAttr != NULL) {
free(s->receiveFeedbackThreadAttr);
}
close(s->socketFd);
debug(DEBUG_SOCKET, "Socket closed.");
return 0;
......@@ -481,11 +453,13 @@ PrrtCodingConfiguration *PrrtSocket_get_coding_parameters(PrrtSocket *s) {
bool PrrtSocket_cleanup(PrrtSocket *s) {
debug(DEBUG_CLEANUP, "PrrtSocket_cleanup");
if (s->packetTimeoutTable != NULL) {
List *expired_packets = PrrtPacketTimeoutTable_expire_packets(s->packetTimeoutTable,
PrrtClock_get_prrt_time_us(
&s->clock));
uint32_t expired_count = List_count(expired_packets);
debug(DEBUG_CLEANUP, "EXPIRED");
if (expired_count > 0) {
PrrtPacketTimeout *first = List_first(expired_packets);
prrtSequenceNumber_t firstSequenceNumberBase = (prrtSequenceNumber_t) (first->sequenceNumber -
......@@ -517,13 +491,16 @@ bool PrrtSocket_cleanup(PrrtSocket *s) {
}
List_destroy(expired_packets);
}
debug(DEBUG_CLEANUP, "Expire block range.");
if(s->deliveredPacketTable != NULL) {
prrtSequenceNumber_t current_start = s->deliveredPacketTable->start;
PrrtRepairBlockStore_expire_block_range(s->repairBlockStore, (prrtSequenceNumber_t) (current_start - SEQNO_SPACE/2),
(prrtSequenceNumber_t) (current_start - 1));
}
debug(DEBUG_CLEANUP, "Loss stats.");
s->lossStatistics = PrrtLossStatistics_add(PrrtReceptionTable_calculate_statistics(s->dataReceptionTable), PrrtReceptionTable_calculate_statistics(s->redundancyReceptionTable));
debug(DEBUG_CLEANUP, "PrrtSocket_cleanup done");
return true;
}
......
......@@ -28,7 +28,6 @@
typedef struct prrtSocket {
int socketFd;
pthread_t receiveFeedbackThread;
struct sockaddr_in *address;
bool isBound;
......@@ -41,7 +40,6 @@ typedef struct prrtSocket {
pthread_t receiveDataThread;
PrrtPacketDeliveryStore* packetDeliveryStore;
PrrtPacketTimeoutTable *packetTimeoutTable;
PrrtDataPacketStore *dataPacketStore;
PrrtRepairBlockStore *repairBlockStore;
......@@ -73,7 +71,6 @@ typedef struct prrtSocket {
_Atomic (XlapTimestampTable *) tstable[2];
pthread_attr_t *receiveFeedbackThreadAttr;
pthread_attr_t *sendDataThreadAttr;
pthread_attr_t *receiveDataThreadAttr;
......
......@@ -38,9 +38,6 @@ cdef extern from "proto/types/packetTimeout.c":
cdef extern from "proto/types/lossStatistics.c":
pass
cdef extern from "proto/processes/feedbackReceiver.c":
pass
cdef extern from "proto/processes/dataReceiver.c":
pass
......
......@@ -15,6 +15,7 @@
#define DEBUG_SENDER 0
#define DEBUG_SOCKET 0
#define DEBUG_DATARECEIVER 0
#define DEBUG_CLEANUP 1
#define DEBUG_DATATRANSMITTER 0
#define DEBUG_HARDSTAMPING 0
#define DEBUG_FEEDBACK 1
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment