Commit b725a254 authored by Andreas Schmidt's avatar Andreas Schmidt

Merge branch 'develop' into timer

parents 0c850fd9 fb9d8722
Pipeline #2417 passed with stages
in 1 minute and 46 seconds
......@@ -8,6 +8,7 @@
* Clock synchronization between sending stack and receiving stack
* Applications can specify packet-level expiration times
* Different receive modes for ASAP and time-synchronized operation
* Passive measurement of propagation delay, bottleneck data rate and packet loss rate
* Packet-level timing analysis using [X-Lap](http://xlap.larn.systems)
* [Hardware timestamping support](https://git.nt.uni-saarland.de/LARN/PRRT/wikis/hardware-timestamping)
......@@ -25,7 +26,7 @@ import prrt
port = int(sys.argv[1])
s = prrt.PrrtSocket(port=port, isSender=False)
s = prrt.PrrtSocket(port=port)
while True:
d = s.recv()
......@@ -44,7 +45,7 @@ import prrt
host = sys.argv[1]
port = int(sys.argv[2])
s = prrt.PrrtSocket(port=port, isSender=True)
s = prrt.PrrtSocket(port=port)
s.connect(host, port)
for i in range(10):
......
......@@ -3,11 +3,12 @@ import prrt
port = int(sys.argv[1])
s = prrt.PrrtSocket(port=port, isSender=False)
s = prrt.PrrtSocket(("127.0.0.1", port))
while True:
d = s.recv()
d, addr = s.recv()
d = d.decode("utf8")
if d != "Close":
print d
print(d, addr)
else:
break
......@@ -3,10 +3,11 @@ import prrt
host = sys.argv[1]
port = int(sys.argv[2])
localport = int(sys.argv[3])
s = prrt.PrrtSocket(port=port, isSender=True)
s.connect(host, port)
s = prrt.PrrtSocket(("127.0.1.1", localport), mtu=150)
s.connect((host, port))
for i in range(10):
s.send("Packet {}".format(i))
s.send("Close")
s.send("Packet {}".format(i).encode("utf8"))
s.send("Close".encode("utf8"))
......@@ -15,7 +15,7 @@ add_subdirectory(proto)
add_subdirectory(util)
add_executable(sender sender.c)
add_executable(receiver receiver.c)
add_executable(receiver receiver.c ../tests/common.h)
target_link_libraries(sender LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(receiver LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
include "posix/time.pxd"
include "sockets.pxd"
from libc.stdint cimport uint32_t, uint16_t, uint8_t, int32_t
from libc.string cimport const_char
cdef extern from "pthread.h" nogil:
ctypedef struct pthread_t:
pass
......@@ -26,18 +28,17 @@ cdef extern from "proto/channelStateInformation.h":
ctypedef prrtChannelStateInformation PrrtChannelStateInformation
cdef extern from "proto/codingParams.h":
ctypedef struct prrtCodingParams:
ctypedef struct prrtCodingConfiguration:
uint8_t k
uint8_t r
uint8_t n
uint8_t c
uint8_t *n_cycle
ctypedef prrtCodingParams PrrtCodingParams
PrrtCodingParams *PrrtCodingParams_create()
PrrtCodingParams *PrrtCodingParams_copy(PrrtCodingParams *cpar)
bint PrrtCodingParams_update(PrrtCodingParams *cpar, uint8_t k, uint8_t n, uint8_t c, uint8_t *n_cycle)
bint PrrtCodingParams_destroy(PrrtCodingParams *cpar)
ctypedef prrtCodingConfiguration PrrtCodingConfiguration
PrrtCodingConfiguration *PrrtCodingConfiguration_create()
PrrtCodingConfiguration *PrrtCodingConfiguration_copy(PrrtCodingConfiguration *cpar)
bint PrrtCodingConfiguration_destroy(PrrtCodingConfiguration *cpar)
cdef extern from "util/list.h":
cdef struct list:
......@@ -62,7 +63,7 @@ cdef extern from "proto/block.h":
cdef struct prrtBlock:
uint32_t data_count
uint32_t redundancy_count
PrrtCodingParams coding_params
PrrtCodingConfiguration coding_params
uint32_t largest_data_length
uint16_t baseSequenceNumber
List* data_blocks
......@@ -94,9 +95,7 @@ cdef extern from "proto/receiver.h":
cdef extern from "proto/socket.h":
cdef struct prrtSocket:
int dataSocketFd
int feedbackSocketFd
pthread_t receiveFeedbackThread
int socketFd
pthread_t sendDataThread
pthread_mutex_t outQueueFilledMutex
......@@ -120,34 +119,41 @@ cdef extern from "proto/socket.h":
ctypedef prrtSocket PrrtSocket
cdef PrrtSocket* PrrtSocket_create(bint isSender, const uint32_t target_delay)
cdef PrrtSocket* PrrtSocket_create(const uint32_t mtu, const uint32_t target_delay)
bint PrrtSocket_bind(PrrtSocket *sock_ptr, const_char *ipAddress, const uint16_t port)
int PrrtSocket_close(const PrrtSocket *sock_ptr)
int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) nogil
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr) nogil
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr) nogil
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, timespec* deadline) nogil
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr, sockaddr* addr) nogil
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, sockaddr* addr) nogil
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr, sockaddr* addr) nogil
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, sockaddr* addr, timespec* deadline) nogil
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) nogil
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) nogil
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us, timespec* deadline) nogil
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, sockaddr* addr, prrtTimedelta_t time_window_us) nogil
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, sockaddr* addr, prrtTimedelta_t time_window_us) nogil
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, sockaddr* addr, prrtTimedelta_t time_window_us, timespec* deadline) nogil
bint PrrtSocket_set_sock_opt(PrrtSocket *sock_ptr, const_char *name, const uint32_t value)
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *sock_ptr, const_char *name)
bint PrrtSocket_set_coding_parameters(PrrtSocket *s, uint8_t k, uint8_t n, uint8_t c, uint8_t *n_cycle)
PrrtCodingParams *PrrtSocket_get_coding_parameters(PrrtSocket *s)
PrrtCodingConfiguration *PrrtSocket_get_coding_parameters(PrrtSocket *s)
bint PrrtSocket_uses_thread_pinning(PrrtSocket *socket)
uint32_t PrrtSocket_get_rtprop(PrrtSocket *socket)
bint PrrtSocket_uses_thread_pinning(PrrtSocket *socket)
uint32_t PrrtSocket_get_rtt(PrrtSocket *socket)
float PrrtSocket_get_plr(PrrtSocket *socket)
uint32_t PrrtSocket_get_delivery_rate(PrrtSocket *socket)
uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *socket)
float PrrtSocket_get_plr_fwd(PrrtSocket *socket)
uint32_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *socket)
uint32_t PrrtSocket_get_btlbw_fwd(PrrtSocket *s);
uint32_t PrrtSocket_get_btlbw_back(PrrtSocket *s);
bint PrrtSocket_get_app_limited(PrrtSocket *socket)
bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket)
char *PrrtSocket_inet_ntoa(in_addr*)
uint16_t PrrtSocket_ntohs(uint16_t v)
cdef extern from "proto/stores/packetDeliveryStore.h":
ctypedef struct PrrtPacketDeliveryStore:
pass
......@@ -170,3 +176,4 @@ cdef extern from "util/pipe.h":
cdef extern from "util/mpsc_queue.h":
ctypedef struct MPSCQueue:
pass
......@@ -5,6 +5,13 @@
# define __builtin_ia32_rdtsc() (0)
#endif
#ifndef __cplusplus
# include <stdatomic.h>
#else
# include <atomic>
# define _Atomic(X) std::atomic< X >
#endif
#ifndef MAX
#define MAX(x, y) (((x) > (y)) ? (x) : (y))
#endif
......@@ -29,6 +36,8 @@
#define RRT_ALPHA 0.125
#define MIN_RTT 300
// Uncomment the line below if you are compiling on Windows.
// #define WINDOWS
#include <stdio.h>
......
add_library(PRRT ../defines.h
set (PRRT_SOURCES ../defines.h
block.c block.h
channelStateInformation.c channelStateInformation.h
clock.c clock.h
......@@ -9,7 +9,6 @@ add_library(PRRT ../defines.h
../xlap/xlap.c ../xlap/xlap.h
applicationConstraints.c applicationConstraints.h
processes/dataReceiver.c processes/dataReceiver.h
processes/feedbackReceiver.c processes/feedbackReceiver.h
processes/dataTransmitter.c processes/dataTransmitter.h
stores/dataPacketStore.c stores/dataPacketStore.h
stores/deliveredPacketTable.c stores/deliveredPacketTable.h
......@@ -23,4 +22,10 @@ add_library(PRRT ../defines.h
types/packet.c types/packet.h
vdmcode/block_code.c vdmcode/block_code.h)
if (XLAP)
set(PRRT_SOURCES ${PRRT_SOURCES} ../xlap/xlap.c ../xlap/xlap.h)
endif()
add_library(PRRT ${PRRT_SOURCES})
target_link_libraries(PRRT rt)
......@@ -57,6 +57,14 @@ bool PrrtBlock_destroy(PrrtBlock *block_ptr)
PrrtPacket_destroy(pkt);
}
if(block_ptr->coder != NULL) {
PrrtCoder_destroy(block_ptr->coder);
}
if(block_ptr->codingParams != NULL) {
PrrtCodingConfiguration_destroy(block_ptr->codingParams);
}
List_destroy(block_ptr->dataPackets);
List_destroy(block_ptr->redundancyPackets);
free(block_ptr);
......@@ -68,17 +76,17 @@ bool PrrtBlock_destroy(PrrtBlock *block_ptr)
return false;
}
PrrtBlock * PrrtBlock_create(PrrtCodingParams *cpar, prrtSequenceNumber_t baseSequenceNumber)
PrrtBlock * PrrtBlock_create(PrrtCodingConfiguration *cpar, PrrtCoder* coder, prrtSequenceNumber_t baseSequenceNumber)
{
PrrtBlock *block_ptr = calloc(1, sizeof(PrrtBlock));
check_mem(block_ptr);
block_ptr->coder = coder;
block_ptr->codingParams = cpar;
block_ptr->dataPackets = List_create();
block_ptr->redundancyPackets = List_create();
block_ptr->baseSequenceNumber = baseSequenceNumber;
block_ptr->largestPayloadLength = 0;
block_ptr->coder = NULL;
pthread_mutexattr_t attr;
check(pthread_mutexattr_init(&attr) == EXIT_SUCCESS, "Mutex attr init failed.");
......@@ -184,8 +192,6 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, prrtSequenceNumber_t *seqno)
prrtSequenceNumber_t baseSequenceNumber = block_ptr->baseSequenceNumber;
prrtPacketLength_t length = block_ptr->largestPayloadLength;
PrrtCoder *coder = block_ptr->codingParams->coder;
gf **src = calloc(k, sizeof(gf *));
check_mem(src);
......@@ -203,7 +209,7 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, prrtSequenceNumber_t *seqno)
for(j = 0; j < r; j++) {
fec[j] = calloc(length, sizeof(gf));
check_mem(fec[j]);
PrrtCoder_encode(coder, src, fec[j], j + k, length);
PrrtCoder_encode(block_ptr->coder, src, fec[j], j + k, length);
PrrtPacket *red_packet_ptr = PrrtPacket_create_redundancy_packet(0, (void *) fec[j], length, *seqno,
(uint8_t) (k + j), block_ptr->baseSequenceNumber,
block_ptr->codingParams);
......@@ -237,8 +243,6 @@ bool PrrtBlock_decode(PrrtBlock *block_ptr)
prrtSequenceNumber_t baseSequenceNumber = block_ptr->baseSequenceNumber;
prrtPacketLength_t length = block_ptr->largestPayloadLength;
PrrtCoder *coder = block_ptr->codingParams->coder;
fec = calloc(k, sizeof(gf *));
check_mem(fec);
for(i = 0; i < k; i++) {
......@@ -254,7 +258,7 @@ bool PrrtBlock_decode(PrrtBlock *block_ptr)
gather_data_packets(block_ptr, fec, idx_p);
gather_redundancy_packets(block_ptr, fec, idx_p);
check(PrrtCoder_decode(coder, fec, idx_p, length) == EXIT_SUCCESS, "Could not decode current block.");
check(PrrtCoder_decode(block_ptr->coder, fec, idx_p, length) == EXIT_SUCCESS, "Could not decode current block.");
for(j = 0; j < k; j++) {
if(idx_p[j] >= k) {
......
......@@ -8,21 +8,21 @@
#include "vdmcode/block_code.h"
typedef struct prrtBlock {
PrrtCodingParams* codingParams;
PrrtCodingConfiguration* codingParams;
prrtPacketLength_t largestPayloadLength;
prrtSequenceNumber_t baseSequenceNumber;
List*dataPackets;
List*redundancyPackets;
bool isCoded;
PrrtCoder *coder;
pthread_mutex_t lock;
PrrtCoder *coder;
} PrrtBlock;
/**
* Allocate space for a block.
*/
PrrtBlock * PrrtBlock_create(PrrtCodingParams *cpar, prrtSequenceNumber_t baseSequenceNumber);
PrrtBlock * PrrtBlock_create(PrrtCodingConfiguration *cpar, PrrtCoder* coder, prrtSequenceNumber_t baseSequenceNumber);
/**
* Frees the PrrtBlock data structure.
......
......@@ -4,6 +4,7 @@
#include "../util/dbg.h"
#include "channelStateInformation.h"
#include "clock.h"
#include "receiver.h"
PrrtChannelStateInformation * PrrtChannelStateInformation_create()
{
......@@ -15,6 +16,15 @@ PrrtChannelStateInformation * PrrtChannelStateInformation_create()
csi->rtprop = TIMESTAMP_SPACE;
csi->rtprop_filter_length_us = 2 * 1000 * 1000; // 2 seconds
csi->deliveryRate = 0;
csi->btlbw = 0;
csi->btlbw_next_round_delivered = 0;
csi->btlbw_round_start = false;
csi->btlbw_round_count = 0;
csi->btlbw_filter_length = 10;
csi->appLimited = 0;
csi->plr = 0.0;
return csi;
error:
......@@ -42,11 +52,16 @@ void PrrtChannelStateInformation_update_plr(PrrtChannelStateInformation *csi, pr
}
void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformation *csi, prrtDeliveryRate_t rate) {
void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformation *csi, PrrtPacket* packet, prrtDeliveryRate_t rate) {
pthread_mutex_lock(&csi->lock);
csi->deliveryRate = rate;
pthread_mutex_unlock(&csi->lock);
}
void PrrtChannelStateInformation_update_app_limited(PrrtChannelStateInformation *csi, bool appLimited) {
pthread_mutex_lock(&csi->lock);
csi->appLimited = appLimited;
pthread_mutex_unlock(&csi->lock);
}
prrtTimedelta_t PrrtChannelStateInformation_get_rtprop(PrrtChannelStateInformation *csi)
{
......@@ -71,4 +86,12 @@ prrtPacketLossRate_t PrrtChannelStateInformation_get_plr(PrrtChannelStateInforma
prrtDeliveryRate_t PrrtChannelStateInformation_get_delivery_rate(PrrtChannelStateInformation *csi) {
return csi->deliveryRate;
}
prrtDeliveryRate_t PrrtChannelStateInformation_get_btlbw(PrrtChannelStateInformation *csi) {
return csi->btlbw;
}
bool PrrtChannelStateInformation_get_app_limited(PrrtChannelStateInformation *csi) {
return csi->appLimited;
}
\ No newline at end of file
......@@ -6,22 +6,37 @@
typedef struct prrtChannelStateInformation {
pthread_mutex_t lock;
prrtTimedelta_t rtprop;
prrtTimestamp_t rtprop_stamp;
prrtTimedelta_t rtprop_filter_length_us;
bool rtprop_expired;
prrtPacketLossRate_t plr;
prrtDeliveryRate_t deliveryRate;
prrtDeliveryRate_t btlbw;
prrtByteCount_t btlbw_next_round_delivered;
bool btlbw_round_start;
uint32_t btlbw_round_count;
uint8_t btlbw_filter_length;
bool appLimited;
} PrrtChannelStateInformation;
PrrtChannelStateInformation* PrrtChannelStateInformation_create(void);
void PrrtChannelStateInformation_update_rtprop(PrrtChannelStateInformation *csi, prrtTimedelta_t rtprop);
prrtTimedelta_t PrrtChannelStateInformation_get_rtprop(PrrtChannelStateInformation *csi);
prrtDeliveryRate_t PrrtChannelStateInformation_get_delivery_rate(PrrtChannelStateInformation *csi);
prrtDeliveryRate_t PrrtChannelStateInformation_get_btlbw(PrrtChannelStateInformation *csi);
bool PrrtChannelStateInformation_get_app_limited(PrrtChannelStateInformation *csi);
prrtPacketLossRate_t PrrtChannelStateInformation_get_plr(PrrtChannelStateInformation* csi);
void PrrtChannelStateInformation_update_plr(PrrtChannelStateInformation *csi, prrtSequenceNumber_t erasures,
prrtSequenceNumber_t packets);
void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformation *csi, prrtDeliveryRate_t rate);
void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformation *csi, PrrtPacket* packet, prrtDeliveryRate_t rate);
void PrrtChannelStateInformation_update_app_limited(PrrtChannelStateInformation *csi, bool appLimited);
bool PrrtChannelStateInformation_destroy(PrrtChannelStateInformation* csi);
void PrrtChannelStateInformation_print(PrrtChannelStateInformation *csi);
......
......@@ -2,70 +2,37 @@
#include "../util/common.h"
#include "../util/dbg.h"
#include "../defines.h"
#include "codingParams.h"
PrrtCodingParams *PrrtCodingParams_create(void)
{
PrrtCodingParams *cpar = calloc(1, sizeof(PrrtCodingParams));
check_mem(cpar);
pthread_mutex_init(&cpar->lock, NULL);
cpar->coder = NULL;
uint8_t* n_cycle = calloc(1, sizeof(uint8_t));
n_cycle[0] = N_START - K_START;
PrrtCodingParams_update(cpar, K_START, N_START, 1, n_cycle);
PrrtCodingConfiguration *PrrtCodingConfiguration_create(uint8_t k, uint8_t n, uint8_t c, uint8_t *n_cycle)
{
PrrtCodingConfiguration *cc = calloc(1, sizeof(PrrtCodingConfiguration));
check_mem(cc);
cc->k = k;
cc->n = n;
cc->r = cc->n - cc->k;
cc->c = c;
if(n_cycle != NULL) {
cc->n_cycle = (uint8_t*) realloc(cc->n_cycle, cc->c * sizeof(int8_t));
memcpy(cc->n_cycle, n_cycle, cc->c * sizeof(int8_t));
}
return cpar;
return cc;
error:
PERROR("Memory issue.%s","");
return NULL;
}
bool PrrtCodingParams_update(PrrtCodingParams *cpar, uint8_t k, uint8_t n, uint8_t c, uint8_t *n_cycle) {
cpar->k = k;
cpar->n = n;
cpar->r = cpar->n - cpar->k;
cpar->c = c;
if(cpar->n_cycle != NULL) {
free(cpar->n_cycle);
}
cpar->n_cycle = n_cycle;
PrrtCoder_get_coder(&cpar->coder, n, k);
return true;
PrrtCodingConfiguration *PrrtCodingConfiguration_copy(PrrtCodingConfiguration *codingConfiguration) {
return PrrtCodingConfiguration_create(codingConfiguration->k, codingConfiguration->n, codingConfiguration->c, codingConfiguration->n_cycle);
}
PrrtCodingParams* PrrtCodingParams_copy(PrrtCodingParams *cpar) {
PrrtCodingParams *result = PrrtCodingParams_create();
check(pthread_mutex_lock(&cpar->lock) == EXIT_SUCCESS, "Lock failed.");
result->k = cpar->k;
result->n = cpar->n;
result->r = cpar->r;
result->c = cpar->c;
result->n_cycle = (uint8_t*) realloc(result->n_cycle, result->c * sizeof(int8_t));
memcpy(result->n_cycle, cpar->n_cycle, result->c * sizeof(int8_t));
// PrrtCoder_get_coder(&result->coder, result->n, result->k); // TODO
check(pthread_mutex_unlock(&cpar->lock) == EXIT_SUCCESS, "Unlock failed");
return result;
error:
PERROR("Could not copy%s", "");
return NULL;
}
bool PrrtCodingParams_destroy(PrrtCodingParams * cpar) {
pthread_mutex_destroy(&cpar->lock);
PrrtCoder_destroy(cpar->coder);
if(cpar->n_cycle != NULL) {
free(cpar->n_cycle);
bool PrrtCodingConfiguration_destroy(PrrtCodingConfiguration *codingConfiguration) {
if(codingConfiguration->n_cycle != NULL) {
free(codingConfiguration->n_cycle);
}
free(cpar);
free(codingConfiguration);
return true;
}
......@@ -4,23 +4,17 @@
#include <pthread.h>
#include <stdint.h>
#include <stdbool.h>
#include "vdmcode/block_code.h"
typedef struct prrtCodingParams {
pthread_mutex_t lock;
typedef struct prrtCodingConfiguration {
uint8_t k;
uint8_t r;
uint8_t n;
uint8_t c;
uint8_t* n_cycle;
} PrrtCodingConfiguration;
PrrtCoder* coder;
} PrrtCodingParams;
PrrtCodingParams * PrrtCodingParams_create(void);
PrrtCodingParams * PrrtCodingParams_copy(PrrtCodingParams *cpar);
bool PrrtCodingParams_update(PrrtCodingParams * cpar, uint8_t k, uint8_t n, uint8_t c, uint8_t *n_cycle);
bool PrrtCodingParams_destroy(PrrtCodingParams * cpar);
PrrtCodingConfiguration *PrrtCodingConfiguration_create(uint8_t k, uint8_t n, uint8_t c, uint8_t *n_cycle);
PrrtCodingConfiguration *PrrtCodingConfiguration_copy(PrrtCodingConfiguration *codingConfiguration);
bool PrrtCodingConfiguration_destroy(PrrtCodingConfiguration *codingConfiguration);
#endif //PRRT_CODING_PARAMS_H
This diff is collapsed.
......@@ -75,7 +75,7 @@ static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote, prrtS
struct sockaddr_in targetaddr;
memset((char *) &targetaddr, 0, sizeof(targetaddr));
targetaddr.sin_family = AF_INET;
targetaddr.sin_port = htons((uint16_t) (remote_port + 1));
targetaddr.sin_port = htons((uint16_t) (remote_port));
struct hostent *hp;
hp = gethostbyname(remote_host);
......@@ -98,7 +98,7 @@ static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote, prrtS
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");
check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
check(sendto(sock_ptr->socketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
length, "Sending feedback failed.");
free(buf);
......@@ -184,9 +184,10 @@ static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) {
PrrtBlock *block = PrrtRepairBlockStore_get_block(socket->repairBlockStore,
redundancyPayload->baseSequenceNumber);
if (block == NULL) {
PrrtCodingParams_update(socket->codingParameters, redundancyPayload->k, redundancyPayload->n, 0, NULL);
PrrtCodingConfiguration* codingParams = PrrtCodingConfiguration_create(redundancyPayload->k,
redundancyPayload->n, 0, NULL);
block = PrrtBlock_create(socket->codingParameters, redundancyPayload->baseSequenceNumber);
block = PrrtBlock_create(codingParams, PrrtSocket_get_matching_coder(socket, codingParams), redundancyPayload->baseSequenceNumber);
PrrtRepairBlockStore_insert(socket->repairBlockStore, block);
}
......@@ -202,6 +203,32 @@ static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) {
return;
}
void handle_feedback_packet(PrrtSocket *prrtSocket, PrrtPacket *prrtPacket, prrtTimestamp_t receiveTime) {
check(prrtPacket != NULL, "Cannot be null");
debug(DEBUG_DATARECEIVER, "handle_feedback_packet");
PrrtPacketFeedbackPayload *feedbackPayload = (PrrtPacketFeedbackPayload *) prrtPacket->payload;
prrtTimestamp_t forwardTripTimestamp = feedbackPayload->forwardTripTimestamp_us;
bool valid_sample = PrrtReceiver_updateAndGenerateRateSample(prrtSocket->receiver, feedbackPayload->ackSequenceNumber, feedbackPayload->ackPacketType, receiveTime);
debug(DEBUG_DATARECEIVER, "PrrtReceiver_updateAndGenerateRateSample ");
if(valid_sample) {
PrrtChannelStateInformation_update_delivery_rate(prrtSocket->receiver->csi, prrtPacket, prrtSocket->receiver->rateSample->delivery_rate);
}
PrrtChannelStateInformation_update_app_limited(prrtSocket->receiver->csi, prrtSocket->receiver->rateSample->is_app_limited);
debug(DEBUG_DATARECEIVER, "PrrtChannelStateInformation_update_app_limited ");
PrrtChannelStateInformation_update_rtprop(prrtSocket->receiver->csi,
(prrtTimedelta_t) (receiveTime - forwardTripTimestamp));
debug(DEBUG_DATARECEIVER, "PrrtChannelStateInformation_update_rtprop ");
PrrtChannelStateInformation_update_plr(prrtSocket->receiver->csi, feedbackPayload->erasureCount, feedbackPayload->packetCount);
debug(DEBUG_DATARECEIVER, "PrrtChannelStateInformation_update_plr ");
return;
error:
PERROR("handle_feedback_packet failed.");
}
void receive_from_socket(PrrtSocket *socket_ptr, unsigned char buffer_ptr[65528], ssize_t *received_size,
struct sockaddr_in *remote_ptr, socklen_t *remote_len_ptr, struct timespec *packet_timestamp_ptr,
uint64_t *packet_cyclestamp_ptr) {
......@@ -225,7 +252,7 @@ void receive_from_socket(PrrtSocket *socket_ptr, unsigned char buffer_ptr[65528]
msg.msg_control = &control;
msg.msg_controllen = sizeof(control);
*received_size = recvmsg(socket_ptr->dataSocketFd, &msg, 0);
*received_size = recvmsg(socket_ptr->socketFd, &msg, 0);
for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
switch (cmsg->cmsg_type) {
......@@ -239,9 +266,9 @@ void receive_from_socket(PrrtSocket *socket_ptr, unsigned char buffer_ptr[65528]
}
}
} else {
*received_size = recvfrom(socket_ptr->dataSocketFd, buffer_ptr, MAX_PAYLOAD_LENGTH, 0,
*received_size = recvfrom(socket_ptr->socketFd, buffer_ptr, MAX_PAYLOAD_LENGTH, 0,
(struct sockaddr *) remote_ptr, remote_len_ptr);
clock_gettime(CLOCK_MONOTONIC, packet_timestamp_ptr);
clock_gettime(CLOCK_REALTIME, packet_timestamp_ptr);
}
*packet_cyclestamp_ptr = __builtin_ia32_rdtsc();
}
......@@ -266,20 +293,17 @@ void *receive_data_loop(void *ptr) {
receive_from_socket(sock_ptr, buffer, &n, &remote, &addrlen, &packet_recv_timestamp, &packet_recv_cyclestamp);
debug(DEBUG_HARDSTAMPING, "Packet TS:\t%ld.%09ld; Who? %s", (long) packet_recv_timestamp.tv_sec,
packet_recv_timestamp.tv_nsec, inet_ntoa(remote.sin_addr));
prrtTimestamp_t prrt_recv_timestamp = PrrtClock_TimespecToPrrtTimestamp(packet_recv_timestamp);
sock_ptr->lastReceivedTimestamp = prrt_recv_timestamp;
XlapTimeStampClock(&tsph1, ts_any_packet, 0, LinkReceive);
XlapTimeStampCycle(&tsph1, ts_any_packet, 0, LinkReceive);
PrrtPacket *packet = (PrrtPacket *) calloc(1, sizeof(PrrtPacket));
check_mem(packet);
XlapTimeStampCycle(&tsph2, ts_any_packet, 0, DecodeStart);
check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed.");
XlapTimeStampCycle(&tsph3, ts_any_packet, 0, DecodeEnd);
prrtSequenceNumber_t seqno = packet->sequenceNumber;
prrtTimestamp_t prrt_recv_timestamp = PrrtClock_TimespecToPrrtTimestamp(packet_recv_timestamp);
memcpy(&packet->sender_addr, &remote, addrlen);
prrtPacketType_t packetType = PrrtPacket_type(packet);
debug(DEBUG_DATARECEIVER, "received packet %d:%u", (int) packetType, seqno);
......@@ -292,8 +316,12 @@ void *receive_data_loop(void *ptr) {
} else if (packetType == PACKET_TYPE_REDUNDANCY) {
kind = ts_redundancy_packet;
sentTimestamp = PrrtPacket_get_redundancy_timestamp(packet);
} else if (packetType == PACKET_TYPE_FEEDBACK) {
kind = ts_feedback_packet;
}
if (packetType == PACKET_TYPE_DATA || packetType == PACKET_TYPE_REDUNDANCY) {
sock_ptr->lastReceivedTimestamp = prrt_recv_timestamp;
XlapCycleStampValue(sock_ptr, kind, seqno, ChannelReceive, packet_recv_cyclestamp);
XlapTimeStampValue(sock_ptr, kind, seqno, ChannelReceive, packet_recv_timestamp);
......@@ -303,8 +331,6 @@ void *receive_data_loop(void *ptr) {
XlapTimeStampCycle(sock_ptr, kind, seqno, HandlePacketStart);
send_feedback(sock_ptr, remote, seqno, prrt_recv_timestamp, sentTimestamp, packetType);
if (packetType == PACKET_TYPE_DATA) {
handle_data_packet(sock_ptr, packet);
} else if (packetType == PACKET_TYPE_REDUNDANCY) {
......@@ -312,15 +338,21 @@ void *receive_data_loop(void *ptr) {
} else {
goto error;
}
send_feedback(sock_ptr, remote, seqno, prrt_recv_timestamp, sentTimestamp, packetType);
XlapTimeStampCycle(sock_ptr, kind, seqno, HandlePacketEnd);
} else if (packetType == PACKET_TYPE_FEEDBACK) {
handle_feedback_packet(sock_ptr, packet, prrt_recv_timestamp);
PrrtPacket_destroy(packet);
} else {
PrrtPacket_print(packet);
PrrtPacket_destroy(packet);
}
debug(DEBUG_DATARECEIVER, "Cleanup");
PrrtSocket_cleanup(sock_ptr);
debug(DEBUG_DATARECEIVER, "Cleaned");
}
error:
PNOTIMPLEMENTED("SHOULD IMPLEMENT ERROR HANDLER HERE");
PNOTIMPLEMENTED("SHOULD IMPLEMENT ERROR HANDLER HERE");
}
......@@ -27,14 +27,14 @@ bool send_to_socket(PrrtSocket* sock_ptr, PrrtReceiver *recv, uint8_t* buf, prrt
msg.msg_control = control;
msg.msg_controllen = 0;
sendmsg(sock_ptr->dataSocketFd, &msg, 0);
sendmsg(sock_ptr->socketFd, &msg, 0);
*packet_clockstamp = __builtin_ia32_rdtsc();
msg.msg_control = control;
iov.iov_len = MAX_PAYLOAD_LENGTH;
do {
msg.msg_controllen = 1024;
got = recvmsg(sock_ptr->dataSocketFd, &msg, MSG_ERRQUEUE);
got = recvmsg(sock_ptr->socketFd, &msg, MSG_ERRQUEUE);
} while(got < 0 && errno == EAGAIN && check++ < check_max);
check(!(got < 0 && errno == EAGAIN), "Failed to get stamp. Gave up.");
......@@ -58,9 +58,9 @@ bool send_to_socket(PrrtSocket* sock_ptr, PrrtReceiver *recv, uint8_t* buf, prrt
}
} else {
// TODO: [LATENCY] By knowing the time encoding etc. that happens upfront, one could make an adjustment here.