Commit 1939c4ab authored by Andreas Schmidt's avatar Andreas Schmidt

Merge branch 'timestamping' into develop

parents 633a0120 130b31a5
Pipeline #943 failed with stages
in 3 minutes and 43 seconds
......@@ -26,21 +26,23 @@ build_prrt:
- CC=gcc-5 CXX=g++-5 cmake .
- make
build_container:
stage: build
tags:
- docker
script:
- export DOCKER_TAG=$(echo "$CI_BUILD_REF_NAME" | sed 's#/#_#' | sed 's#^master$#latest#')
- docker build -t $CI_REGISTRY_IMAGE:$DOCKER_TAG --build-arg http_proxy=http://www-proxy.uni-saarland.de:3128 .
- docker login -u gitlab-ci-token -p $CI_BUILD_TOKEN $CI_REGISTRY
- docker push $CI_REGISTRY_IMAGE:$DOCKER_TAG
- docker rmi $CI_REGISTRY_IMAGE:$DOCKER_TAG
test_prrt_mem:
stage: test
tags:
- valgrind
script:
- which valgrind
- export prrtResult=0
- valgrind --tool=memcheck --track-origins=yes --leak-check=full --error-exitcode=1 ./bin/receiver 5000 & export prrtReceiverProcessID=$!
- valgrind --tool=memcheck --track-origins=yes --leak-check=full --error-exitcode=1 ./bin/sender 100 || prrtResult=$?; kill -INT $prrtReceiverProcessID; wait $prrtReceiverProcessID || prrtResult=$?
- exit $prrtResult
#performanceEvaluation:
# script:
# - python3 setup.py build_ext --inplace
# - make perftest
- bash memtest.sh
test_prrt_functional:
stage: test
......@@ -65,3 +67,4 @@ clean_pypirc:
when: always
script:
- rm -vf ~/.pypirc
\ No newline at end of file
......@@ -5,7 +5,8 @@ set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/build)
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/build)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)
set(CMAKE_CXX_FLAGS "-fstack-protector -fstack-protector-all -Wall -pedantic " )
set(CMAKE_C_FLAGS "-O2 -Wall -std=gnu11 -D_GNU_SOURCE" )
set(CMAKE_CXX_FLAGS "-fstack-protector -fstack-protector-all -Wall -std=gnu++11 -D_GNU_SOURCE" )
set(CMAKE_CXX_FLAGS_DEBUG "-O2 -Wall -ggdb" )
set(CMAKE_CXX_FLAGS_RELEASE "-Os -Wall" )
......@@ -21,5 +22,6 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR})
add_subdirectory(prrt)
add_subdirectory(tests)
add_custom_target(funtest ./bin/prrtTests)
add_custom_target(perftest python3 eval.py)
add_custom_target(funtest COMMAND ./bin/prrtTests)
add_custom_target(perftest COMMAND python3 eval.py)
add_custom_target(memtest COMMAND bash ./memtest.sh DEPENDS sender receiver)
FROM gcc:5
MAINTAINER Andreas Schmidt <schmidt@nt.uni-saarland.de>
RUN apt-get update \
&& apt-get upgrade -y \
&& apt-get install -y cmake
COPY CMakeLists.txt /prrt/
COPY prrt /prrt/prrt
COPY tests /prrt/tests
WORKDIR /prrt
RUN cmake . \
&& make
ENV PATH /prrt/bin:$PATH
WORKDIR /prrt/bin
VOLUME /output
# Predictable Reliable Realtime Transport
## Docker
sudo docker run --rm --name=prrt_recv --net=host prrt receiver 5000 127
sudo docker run --rm --name=prrt_send --net=host prrt sender 127.0.0.1 5000 127
#!/usr/bin/env bash
rm prrt/prrt.c
python3 setup.py build_ext --inplace
cp prrt*.so ./build/
\ No newline at end of file
cp prrt*.so ./build/
#!/bin/sh
which valgrind
check() {
#echo "CHECK $1 $2"
if [ "$1" -ne 0 ]; then
echo >&2 "$2"
exit $1
fi
}
to="timeout 60"
valgrind="valgrind --tool=memcheck --track-origins=yes --leak-check=full --error-exitcode=1"
$to $valgrind ./bin/receiver 5000 127 receiver.csv &
$to $valgrind ./bin/sender 127.0.0.1 5000 127 sender.csv
check "$?" "sender failed"
wait
check "$?" "receiver failed"
......@@ -2,9 +2,7 @@ add_subdirectory(proto)
add_subdirectory(util)
add_executable(sender sender.c)
set_property(TARGET sender PROPERTY C_STANDARD 99)
add_executable(receiver receiver.c)
set_property(TARGET receiver PROPERTY C_STANDARD 99)
target_link_libraries(sender LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(receiver LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
......@@ -6,13 +6,17 @@ add_library(PRRT ../defines.h
packet.c packet.h
receiver.c receiver.h
socket.c socket.h
timestamp.c timestamp.h
applicationConstraints.c applicationConstraints.h
vdmcode/block_code.c vdmcode/block_code.h
stores/forwardPacketTable.c stores/forwardPacketTable.h
stores/lossGatherer.c stores/lossGatherer.h
processes/dataReceiver.c processes/dataReceiver.h
processes/feedbackReceiver.c processes/feedbackReceiver.h
processes/dataTransmitter.c processes/dataTransmitter.h processes/cleaner.c processes/cleaner.h stores/repairBlockStore.c stores/repairBlockStore.h stores/packetTimeoutTable.c stores/packetTimeoutTable.h stores/dataPacketStore.c stores/dataPacketStore.h)
processes/dataTransmitter.c processes/dataTransmitter.h
processes/cleaner.c processes/cleaner.h
stores/repairBlockStore.c stores/repairBlockStore.h
stores/packetTimeoutTable.c stores/packetTimeoutTable.h
stores/dataPacketStore.c stores/dataPacketStore.h)
set_property(TARGET PRRT PROPERTY C_STANDARD 99)
target_link_libraries(PRRT rt)
\ No newline at end of file
target_link_libraries(PRRT rt)
......@@ -7,7 +7,7 @@
#include "block.h"
#include "codingParams.h"
void gather_redundancy_packets(const PrrtBlock *block_ptr, gf *const *fec, int *idx_p)
static void gather_redundancy_packets(const PrrtBlock *block_ptr, gf *const *fec, int *idx_p)
{
uint32_t i;
uint32_t m = 0;
......@@ -27,7 +27,7 @@ void gather_redundancy_packets(const PrrtBlock *block_ptr, gf *const *fec, int *
}
}
void gather_data_packets(PrrtBlock *block_ptr, gf *const *fec, int *idx_p)
static void gather_data_packets(PrrtBlock *block_ptr, gf *const *fec, int *idx_p)
{
LIST_FOREACH(block_ptr->dataPackets, first, next, current) {
PrrtPacket *packet = current->value;
......@@ -36,7 +36,7 @@ void gather_data_packets(PrrtBlock *block_ptr, gf *const *fec, int *idx_p)
}
}
void clear_list(gf *const *src, uint8_t k)
static void clear_list(gf *const *src, uint8_t k)
{
int j = 0;
for(j = 0; j < k; j++) {
......@@ -224,7 +224,7 @@ bool PrrtBlock_decode(PrrtBlock *block_ptr)
PrrtPacket *packet = PrrtPacket_create_data_packet(0, fec[j], length,
(prrtSequenceNumber_t) (baseSequenceNumber + j), 0);
if(PrrtBlock_insert_data_packet(block_ptr, packet) == false) {
debug("Tried to insert unnecessary packet.");
debug(DEBUG_BLOCK, "Tried to insert unnecessary packet.");
PrrtPacket_destroy(packet);
}
}
......
......@@ -69,15 +69,20 @@ bool PrrtClock_update(PrrtClock *clock, uint32_t referenceTime, uint32_t rtt)
return true;
}
PrrtClock *PrrtClock_create()
void PrrtClock_init(PrrtClock *clock)
{
PrrtClock *clock = (PrrtClock *) calloc(1, sizeof(PrrtClock));
check_mem(clock);
clock->meanDeviation = 0;
clock->virtualTime = 0;
clock->lastMeasurement = 0;
clock->skew = 0;
}
PrrtClock *PrrtClock_create()
{
PrrtClock *clock = (PrrtClock *) calloc(1, sizeof(PrrtClock));
check_mem(clock);
PrrtClock_init(clock);
return clock;
......@@ -90,4 +95,4 @@ bool PrrtClock_destroy(PrrtClock *clock)
{
free(clock);
return true;
}
\ No newline at end of file
}
......@@ -17,6 +17,7 @@ typedef struct prrtClock {
PrrtClock* PrrtClock_create(void);
void PrrtClock_init(PrrtClock *clock);
bool PrrtClock_destroy(PrrtClock* clock);
prrtTimestamp_t PrrtClock_get_current_time_us(void);
......
......@@ -10,19 +10,19 @@
#include "packet.h"
#include "clock.h"
void *encode_general_header(void *buf_ptr, const PrrtPacket *packet);
static void *encode_general_header(void *buf_ptr, const PrrtPacket *packet);
void *encode_data_header(void *buf_ptr, const void *payload);
static void *encode_data_header(void *buf_ptr, const void *payload);
void *encode_redundancy_header(void *buf_ptr, const void *payload);
static void *encode_redundancy_header(void *buf_ptr, const void *payload);
void *encode_feedback_header(void *buf_ptr, const void *payload);
static void *encode_feedback_header(void *buf_ptr, const void *payload);
void *decode_data_header(void *dstBuffer, const void *srcBuffer);
static void *decode_data_header(void *dstBuffer, const void *srcBuffer);
void *decode_redundancy_header(void *dstBuffer, const void *srcBuffer);
static void *decode_redundancy_header(void *dstBuffer, const void *srcBuffer);
void *decode_feedback_header(void *dstBuffer, const void *srcBuffer);
static void *decode_feedback_header(void *dstBuffer, const void *srcBuffer);
prrtPacketType_t PrrtPacket_type(PrrtPacket *packet_ptr)
{
......@@ -131,7 +131,7 @@ PrrtPacket *PrrtPacket_copy(PrrtPacket *original)
return NULL;
}
PrrtPacket *create_header(uint8_t priority, prrtSequenceNumber_t seqno, prrtPacketLength_t size, uint8_t type, uint8_t index)
static PrrtPacket *create_header(uint8_t priority, prrtSequenceNumber_t seqno, prrtPacketLength_t size, uint8_t type, uint8_t index)
{
PrrtPacket *packet = calloc(1, sizeof(PrrtPacket));
check_mem(packet);
......@@ -437,6 +437,7 @@ PrrtPacket *PrrtPacket_create_data_packet(uint8_t priority, const void *payloadP
dataPayload->timestamp = PrrtClock_get_current_time_us();
dataPayload->packetTimeout_us = dataPayload->timestamp + targetDelay;
debug(DEBUG_PACKET, "timeout = %lu + %lu", (unsigned long) dataPayload->timestamp, (unsigned long) targetDelay);
dataPayload->groupRTT_us = 0;
dataPayload->decodingTimeout_us = 150; // TODO: payload->decodingTimeout_us
dataPayload->feedbackTimer_us = 170; // TODO: payload->feedback_timer
......@@ -504,4 +505,4 @@ PrrtPacket *PrrtPacket_create_feedback_packet(uint8_t priority, uint8_t index, p
error:
return NULL;
}
\ No newline at end of file
}
#include <unistd.h>
#include <stdatomic.h>
#include "../socket.h"
#include"../../util/dbg.h"
#include "cleaner.h"
......@@ -13,7 +14,7 @@ void *cleanup(void *ptr)
if(socket->packetTimeoutTable != NULL) {
List *expired_packets = PrrtPacketTimeoutTable_expire_packets(socket->packetTimeoutTable,
PrrtClock_get_prrt_time_us(
socket->clock));
&socket->clock));
if(List_count(expired_packets) > 0) {
PrrtPacket *first = List_first(expired_packets);
prrtSequenceNumber_t firstSequenceNumberBase = first->sequenceNumber - first->index - SEQNO_SPACE/4;
......@@ -38,6 +39,8 @@ void *cleanup(void *ptr)
}
}
if (!atomic_load_explicit(&socket->closing, memory_order_acquire))
break;
usleep(1000000);
}
......
......@@ -9,7 +9,7 @@
#include "../socket.h"
#include "dataReceiver.h"
void retrieve_data_blocks(PrrtSocket *sock_ptr, prrtSequenceNumber_t base_seqno, uint8_t k, const PrrtBlock *block)
static void retrieve_data_blocks(PrrtSocket *sock_ptr, prrtSequenceNumber_t base_seqno, uint8_t k, const PrrtBlock *block)
{
List *res = List_create();
......@@ -27,7 +27,7 @@ void retrieve_data_blocks(PrrtSocket *sock_ptr, prrtSequenceNumber_t base_seqno,
List_destroy(res);
}
void decode_block(PrrtSocket *socket, PrrtBlock *block)
static void decode_block(PrrtSocket *socket, PrrtBlock *block)
{
if(block != NULL && PrrtBlock_decode_ready(block)) {
check(PrrtBlock_decode(block), "Decoding failed");
......@@ -37,7 +37,7 @@ void decode_block(PrrtSocket *socket, PrrtBlock *block)
if(PrrtForwardPacketTable_test_set_is_number_relevant(socket->forwardPacketTable, pkt->sequenceNumber)) {
check(pthread_mutex_lock(&socket->inQueueFilledMutex) == 0, "Lock failed.");
List_push(socket->inQueue, pkt);
check(pthread_cond_signal(&socket->inQueueFilledCv) == 0, "Signal failed.");
check(pthread_cond_broadcast(&socket->inQueueFilledCv) == 0, "Signal failed.");
check(pthread_mutex_unlock(&socket->inQueueFilledMutex) == 0, "Unlock failed.");
} else {
PrrtPacket_destroy(pkt);
......@@ -54,7 +54,7 @@ void decode_block(PrrtSocket *socket, PrrtBlock *block)
PERROR("Decoding failed.%s", "")
}
bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote)
static bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote)
{
uint16_t remote_port = ntohs(remote.sin_port);
char *remote_host = inet_ntoa(remote.sin_addr);
......@@ -94,24 +94,34 @@ bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote)
return false;
}
void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet, struct sockaddr_in remote)
static bool is_timeout(prrtTimestamp_t now, prrtTimestamp_t to)
{
PrrtPacketDataPayload *payload = packet->payload;
/* TODO: implement */
return false;
}
static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet, struct sockaddr_in remote)
{
PrrtPacketDataPayload *payload = packet->payload;
prrtTimestamp_t dataTimestamp = payload->timestamp;
sock_ptr->lastSentTimestamp = dataTimestamp;
PrrtClock_update(sock_ptr->clock, dataTimestamp, payload->groupRTT_us);
PrrtClock_update(&sock_ptr->clock, dataTimestamp, payload->groupRTT_us);
PrrtPacket *copiedPacket = PrrtPacket_copy(packet);
check(PrrtPacketTimeoutTable_insert(sock_ptr->packetTimeoutTable, copiedPacket), "Could not insert data packet.");
prrtSequenceNumber_t seqno = packet->sequenceNumber;
PrrtTimeStampCycle(sock_ptr, ts_data_packet, seqno, SendFeedbackStart);
check(send_feedback(sock_ptr, remote), "Sending feedback failed.");
PrrtTimeStampCycle(sock_ptr, ts_data_packet, seqno, SendFeedbackEnd);
prrtTimestamp_t now = PrrtClock_get_prrt_time_us(sock_ptr->clock);
if(now > payload->packetTimeout_us) {
prrtTimestamp_t now = PrrtClock_get_prrt_time_us(&sock_ptr->clock);
if(is_timeout(now, payload->packetTimeout_us)) {
PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, packet->sequenceNumber);
// TODO: note this as loss
PrrtPacket_destroy(packet);
debug(DEBUG_RECEIVER, "timeout data packet %u (%lu > %lu)", seqno, (unsigned long) now, (unsigned long) payload->packetTimeout_us);
}
else if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable,
packet->sequenceNumber) ==
......@@ -133,7 +143,9 @@ void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet, struct sockadd
}
// forward to application layer
debug(DEBUG_DATARECEIVER, "forward %u", seqno);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, seqno, PrrtReturnPackage);
check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
List_push(sock_ptr->inQueue, packet);
check(pthread_cond_signal(&sock_ptr->inQueueFilledCv) == 0, "Cond signal failed.");
......@@ -146,7 +158,7 @@ void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet, struct sockadd
return;
}
void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet)
static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet)
{
PrrtPacketRedundancyPayload *redundancyPayload = packet->payload;
......@@ -187,19 +199,46 @@ void *receive_data_loop(void *ptr)
PrrtSocket *sock_ptr = ptr;
while(1) {
PrrtTimestampPlaceholder tsph1;
PrrtTimestampPlaceholder tsph2;
PrrtTimestampPlaceholder tsph3;
PrrtTimestampPlaceholderInitialize(&tsph1);
PrrtTimestampPlaceholderInitialize(&tsph2);
PrrtTimestampPlaceholderInitialize(&tsph3);
memset(buffer, 0, MAX_PAYLOAD_LENGTH);
n = recvfrom(sock_ptr->dataSocketFd, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen);
sock_ptr->lastReceivedTimestamp = PrrtClock_get_current_time_us();
PrrtTimeStampClock(&tsph1, ts_any_packet, 0, LinkReceive);
PrrtTimeStampCycle(&tsph1, ts_any_packet, 0, LinkReceive);
PrrtPacket *packet = (PrrtPacket *) calloc(1, sizeof(PrrtPacket));
check_mem(packet);
PrrtTimeStampCycle(&tsph2, ts_any_packet, 0, DecodeStart);
check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed.");
PrrtTimeStampCycle(&tsph3, ts_any_packet, 0, DecodeEnd);
prrtSequenceNumber_t seqno = packet->sequenceNumber;
prrtPacketType_t packetType = PrrtPacket_type(packet);
debug(DEBUG_DATARECEIVER, "received packet %d:%u", (int) packetType, seqno);
if(packetType == PACKET_TYPE_DATA) {
PrrtTimestampPlaceholderUse(sock_ptr, ts_data_packet, seqno, &tsph1);
PrrtTimestampPlaceholderUse(sock_ptr, ts_data_packet, seqno, &tsph2);
PrrtTimestampPlaceholderUse(sock_ptr, ts_data_packet, seqno, &tsph3);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, seqno, HandlePacketStart);
handle_data_packet(sock_ptr, packet, remote);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, seqno, HandlePacketEnd);
} else if(packetType == PACKET_TYPE_REDUNDANCY) {
PrrtTimestampPlaceholderUse(sock_ptr, ts_redundancy_packet, seqno, &tsph1);
PrrtTimestampPlaceholderUse(sock_ptr, ts_redundancy_packet, seqno, &tsph2);
PrrtTimestampPlaceholderUse(sock_ptr, ts_redundancy_packet, seqno, &tsph3);
PrrtTimeStampCycle(sock_ptr, ts_redundancy_packet, seqno, HandlePacketStart);
handle_redundancy_packet(sock_ptr, packet);
PrrtTimeStampCycle(sock_ptr, ts_redundancy_packet, seqno, HandlePacketEnd);
} else {
PrrtPacket_print(packet);
PrrtPacket_destroy(packet);
......
......@@ -11,7 +11,7 @@
#include "dataTransmitter.h"
bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet)
static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet)
{
uint8_t buf[MAX_PAYLOAD_LENGTH];
memset(buf, 0, sizeof(buf));
......@@ -19,26 +19,43 @@ bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet)
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small.");
switch (PrrtPacket_type(packet)) {
case PACKET_TYPE_DATA:
PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitStart);
break;
case PACKET_TYPE_REDUNDANCY:
PrrtTimeStampCycle(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitStart);
break;
case PACKET_TYPE_REPETITION:
case PACKET_TYPE_FEEDBACK:
case PACKET_TYPE_PRESENT_REDUNDANCY:
case PACKET_TYPE_CHANNEL_FEEDBACK:
default:;
}
// SENDING TO ALL RECEIVERS
LIST_FOREACH(sock_ptr->receivers, first, next, cur) {
PrrtReceiver *recv = cur->value;
struct hostent *hp;
struct sockaddr_in targetaddr;
memset((char *) &targetaddr, 0, sizeof(targetaddr));
targetaddr.sin_family = AF_INET;
targetaddr.sin_port = htons(recv->port);
hp = gethostbyname(recv->host_name);
check(hp != NULL, "Could not resolve host '%s'.", recv->host_name)
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
// TODO: [LATENCY] By knowing the time encoding etc. that happens upfront, one could make an adjustment here.
check(sendto(sock_ptr->dataSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
check(sendto(sock_ptr->dataSocketFd, buf, length, 0, recv->ai->ai_addr, recv->ai->ai_addrlen) ==
length, "Sendto failed.");
usleep(1);
}
switch (PrrtPacket_type(packet)) {
case PACKET_TYPE_DATA:
PrrtTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd);
break;
case PACKET_TYPE_REDUNDANCY:
PrrtTimeStampClock(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitEnd);
PrrtTimeStampCycle(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitEnd);
break;
case PACKET_TYPE_REPETITION:
case PACKET_TYPE_FEEDBACK:
case PACKET_TYPE_PRESENT_REDUNDANCY:
case PACKET_TYPE_CHANNEL_FEEDBACK:
default:;
}
PrrtPacket_destroy(packet);
......@@ -57,21 +74,19 @@ void *send_data_loop(void *ptr)
while(1) {
check(pthread_mutex_lock(&sock_ptr->outQueueFilledMutex) == 0, "Lock failed.");
while(List_count(sock_ptr->outQueue) == 0) {
check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
if(sock_ptr->closing) {
if (atomic_load_explicit(&sock_ptr->closing, memory_order_acquire)) {
if(block != NULL) {
PrrtBlock_destroy(block);
}
check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed.");
return NULL;
}
check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
check(pthread_cond_wait(&sock_ptr->outQueueFilledCv, &sock_ptr->outQueueFilledMutex) == 0,
"Cond wait failed.");
}
PrrtPacket *packet = List_shift(sock_ptr->outQueue);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
if(block == NULL) {
block = PrrtBlock_create(sock_ptr->codingParameters, packet->sequenceNumber);
}
......@@ -83,16 +98,20 @@ void *send_data_loop(void *ptr)
PrrtPacket *packetToSend = PrrtPacket_copy(packet);
send_packet(sock_ptr, packetToSend);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
PrrtBlock_insert_data_packet(block, packet);
// TODO: redundancy should only be sent when necessary
if(PrrtBlock_encode_ready(block)) {
uint32_t j = 0;
unsigned int redundancy_seqno = sock_ptr->sequenceNumberRedundancy;
PrrtTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeStart);
PrrtBlock_encode(block, &sock_ptr->sequenceNumberRedundancy);
PrrtTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeEnd);
uint32_t redundancyBlocks = List_count(block->redundancyPackets);
for(j = 0; j < redundancyBlocks; j++) {
uint32_t redundancyPackets = List_count(block->redundancyPackets);
for(j = 0; j < redundancyPackets; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
send_packet(sock_ptr, red_pkt);
}
......
......@@ -10,7 +10,7 @@
#include "../socket.h"
#include "feedbackReceiver.h"
void handle_feedback(PrrtSocket *prrtSocket, const size_t length)
static void handle_feedback(PrrtSocket *prrtSocket, const size_t length)
{
char bufin[MAX_PAYLOAD_LENGTH];
PrrtPacket *prrtPacket = NULL;
......@@ -50,18 +50,14 @@ void *receive_feedback_loop(void *ptr)
{
PrrtSocket *sock_ptr = ptr;
check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
while(sock_ptr->closing == false) {
check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
while (!atomic_load_explicit(&sock_ptr->closing, memory_order_acquire)) {
handle_feedback(sock_ptr, MAX_PAYLOAD_LENGTH);
usleep(1);
check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
}
check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
return NULL;
error:
PERROR("Feedback reception failed.%s","");
return NULL;
}
\ No newline at end of file
// error:
// PERROR("Feedback reception failed.%s","");
// return NULL;
}
......@@ -8,18 +8,34 @@ PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port)
PrrtReceiver *recv = calloc(1, sizeof(PrrtReceiver));
check_mem(recv);
recv->host_name = strdup(host);
check_mem(recv->host_name);
recv->port = port;
struct addrinfo *info;
struct addrinfo hints;
char portstr[sizeof(port)*8+1];
snprintf(portstr, sizeof(portstr), "%u", (unsigned int) port);
memset(&hints, 0x0, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_DGRAM;
check(0 == getaddrinfo(host, portstr, &hints, &info), "getaddrinfo");
recv->ai = info;
return recv;
error:
if(recv != NULL) { free(recv); }
PERROR("Memory issue.%s","");
if (recv != NULL) {
free((void *) recv->host_name);
free(recv);
}
//PERROR("Memory issue.%s","");
return NULL;
}
bool PrrtReceiver_destroy(PrrtReceiver *receiver)
{
freeaddrinfo(receiver->ai);
free((void *) receiver->host_name);
free(receiver);
return true;
......
......@@ -3,10 +3,14 @@
#include <stdbool.h>
#include <stdint.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
typedef struct prrtReceiver {
const char* host_name;
uint16_t port;
struct addrinfo *ai;
} PrrtReceiver;