Commit b637fe3c authored by Andreas Schmidt's avatar Andreas Schmidt

Hardware timestamping support.

parent a45e3753
Pipeline #1547 passed with stages
in 1 minute and 22 seconds
# Predictable Reliable Realtime Transport
## Docker
sudo docker run --rm --name=prrt_recv --net=host prrt receiver 5000 127
sudo docker run --rm --name=prrt_send --net=host prrt sender 127.0.0.1 5000 127
bridge=docker network create --subnet="10.5.1.0/24" prrt | cut -c1-12
ovs-vsctl add-port of-switch br-$bridge
docker run --rm --name=prrt_recv --network="prrt" -v=/opt/prrt:/output --ip=10.5.1.52 --cap-add NET_ADMIN git.nt.uni-saarland.de:4567/larn/prrt:develop receiver 5000 127
docker run --rm --name=prrt_send --network="prrt" -v=/opt/prrt:/output --ip=10.5.1.51 --cap-add NET_ADMIN git.nt.uni-saarland.de:4567/larn/prrt:develop sender 10.50.1.52 5000 127 rate 1mbit
# Predictably Reliable Real-Time (PRRT)
## Hardware Timestamping
* Hardware timestamping can be enabled per socket by calling `PrrtSocket_enable_hardware_timestamping()` and specifying the interface.
* Requirements are a kernel that supports physical timestamping. Instructions can be found
### Compiling a hardware tmestamping enabled kernel on Ubuntu
* A detailed tutorial on kernel compilation can be found [here](https://help.ubuntu.com/community/Kernel/Compile).
* Create and enter a new folder, e.g. in your home directory that will store the newly compiled kernel.
* Run the following commands to download all resources required:
```bash
sudo apt install libncurses5 libncurses5-dev
sudo apt build-dep linux-image-`uname -r`
apt source linux-image-`uname -r`
cd linux-$(uname -r | awk '{split($1,a, "-"); print a[1]}')
```
* Copy the current kernel configuration to the current folder:
```bash
cp -vi /boot/config-`uname -r` .config
```
* Edit the `.config` file and ensure the following line is present (and the option is set to `y`):
```bash
...
CONFIG_NETWORK_PHY_TIMESTAMPING=y
...
```
* Call `make deb-pkg` (ideally with `-j5` or some other number to speed up the compilation by using multiple cores).
* Install the `linux-headers-*.deb` and `linux-image-*.deb` package via `dpkg -i` (where * stands for the respective kernel version).
* Reboot the system.
## Docker
The following shows how to run the PRRT sender and receiver on a host with OpenvSwitch.
```bash
bridge=$(docker network create --subnet="10.5.1.0/24" prrt | cut -c1-12)
ovs-vsctl add-port of-switch br-$bridge
docker run --rm --name=prrt_recv --network="prrt" -v=/opt/prrt:/output --ip=10.5.1.52 --cap-add NET_ADMIN docker.nt.uni-saarland.de/larn/prrt:develop receiver 5000 127
docker run --rm --name=prrt_send --network="prrt" -v=/opt/prrt:/output --ip=10.5.1.51 --cap-add NET_ADMIN docker.nt.uni-saarland.de/larn/prrt:develop sender 10.50.1.52 5000 127 rate 1mbit
```
\ No newline at end of file
#include <sys/time.h>
#include <stddef.h>
#include <stdlib.h>
#include "../util/common.h"
#include "../util/dbg.h"
#include "clock.h"
#include "packet.h"
prrtTimestamp_t PrrtClock_get_current_time_us()
{
......
......@@ -27,8 +27,8 @@ prrtTimestamp_t PrrtClock_get_prrt_time_us(PrrtClock *clock);
bool PrrtClock_update(PrrtClock *clock, prrtTimestamp_t referenceTime, prrtTimedelta_t rtt);
#define diff_abs_ts(timeA, timeB) (prrtTimeDifference_t) abs(((prrtTimeDifference_t) timeA) - ((prrtTimeDifference_t)timeB))
#define PrrtClock_TimespecToPrrtTimestamp(timespec) ((prrtTimestamp_t) (1000000 * timespec.tv_sec + timespec.tv_nsec / 1000))
#endif //PRRT_CLOCK_H
......@@ -24,23 +24,19 @@ static void *decode_redundancy_header(void *dstBuffer, const void *srcBuffer);
static void *decode_feedback_header(void *dstBuffer, const void *srcBuffer);
prrtPacketType_t PrrtPacket_type(PrrtPacket *packet_ptr)
{
prrtPacketType_t PrrtPacket_type(PrrtPacket *packet_ptr) {
return (prrtPacketType_t) ((packet_ptr->type_priority >> 4) & 0x0F);
}
uint8_t PrrtPacket_priority(PrrtPacket *packet_ptr)
{
uint8_t PrrtPacket_priority(PrrtPacket *packet_ptr) {
return (uint8_t) (packet_ptr->type_priority & 0x0F);
}
prrtPacketLength_t PrrtPacket_size(PrrtPacket *packet_ptr)
{
prrtPacketLength_t PrrtPacket_size(PrrtPacket *packet_ptr) {
return (prrtPacketLength_t) (packet_ptr->payloadLength + PRRT_PACKET_GENERAL_HEADER_SIZE);
}
int PrrtPacket_print(PrrtPacket *packet_ptr)
{
int PrrtPacket_print(PrrtPacket *packet_ptr) {
printf(" 0 1 2 3\n"
" 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1\n"
"+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
......@@ -50,7 +46,7 @@ int PrrtPacket_print(PrrtPacket *packet_ptr)
packet_ptr->sequenceNumber);
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
if(type == PACKET_TYPE_DATA) {
if (type == PACKET_TYPE_DATA) {
PrrtPacketDataPayload *payload = packet_ptr->payload;
printf("| %61u |\n", payload->timestamp);
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
......@@ -64,11 +60,11 @@ int PrrtPacket_print(PrrtPacket *packet_ptr)
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
printf("| %61s |\n", (char *) (packet_ptr->payload + PRRT_PACKET_DATA_HEADER_SIZE));
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
} else if(type == PACKET_TYPE_REDUNDANCY) {
} else if (type == PACKET_TYPE_REDUNDANCY) {
PrrtPacketRedundancyPayload *payload = packet_ptr->payload;
printf("| %29u | %13u | %13u |\n", payload->baseSequenceNumber, payload->n, payload->k);
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
} else if(type == PACKET_TYPE_FEEDBACK) {
} else if (type == PACKET_TYPE_FEEDBACK) {
prrtPacketLength_t i = 0;
PrrtPacketFeedbackPayload *payload = packet_ptr->payload;
......@@ -96,7 +92,7 @@ int PrrtPacket_print(PrrtPacket *packet_ptr)
PRRT_PACKET_FEEDBACK_HEADER_SIZE));
prrtPacketLength_t blockCount = (prrtPacketLength_t) (remainingSpace / sizeof(PrrtIncompleteBlock));
for(i = 0; i < blockCount; ++i) {
for (i = 0; i < blockCount; ++i) {
PrrtIncompleteBlock block = payload->incompleteBlocks[i];
printf("| %29u | %29u |\n", block.sequenceNumberBase, block.repairCycleIndex);
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
......@@ -109,8 +105,7 @@ int PrrtPacket_print(PrrtPacket *packet_ptr)
return 0;
}
PrrtPacket *PrrtPacket_copy(PrrtPacket *original)
{
PrrtPacket *PrrtPacket_copy(PrrtPacket *original) {
PrrtPacket *newPacket = calloc(1, sizeof(PrrtPacket));
check_mem(newPacket);
void *payload = calloc(1, original->payloadLength);
......@@ -131,8 +126,8 @@ PrrtPacket *PrrtPacket_copy(PrrtPacket *original)
return NULL;
}
static PrrtPacket *create_header(uint8_t priority, prrtSequenceNumber_t seqno, prrtPacketLength_t size, uint8_t type, uint8_t index)
{
static PrrtPacket *
create_header(uint8_t priority, prrtSequenceNumber_t seqno, prrtPacketLength_t size, uint8_t type, uint8_t index) {
PrrtPacket *packet = calloc(1, sizeof(PrrtPacket));
check_mem(packet);
......@@ -149,8 +144,7 @@ static PrrtPacket *create_header(uint8_t priority, prrtSequenceNumber_t seqno, p
return NULL;
}
bool PrrtPacket_encode(void *buf_ptr, uint16_t buf_size, PrrtPacket *packet_ptr)
{
bool PrrtPacket_encode(void *buf_ptr, uint16_t buf_size, PrrtPacket *packet_ptr) {
void *payload = packet_ptr->payload;
check(packet_ptr->payloadLength + PRRT_PACKET_GENERAL_HEADER_SIZE <= buf_size, "Buffer too small.");
......@@ -158,13 +152,13 @@ bool PrrtPacket_encode(void *buf_ptr, uint16_t buf_size, PrrtPacket *packet_ptr)
buf_ptr = encode_general_header(buf_ptr, packet_ptr);
prrtPacketType_t type = PrrtPacket_type(packet_ptr);
if(type == PACKET_TYPE_DATA) {
if (type == PACKET_TYPE_DATA) {
buf_ptr = encode_data_header(buf_ptr, payload);
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet_ptr, PRRT_PACKET_DATA_HEADER_SIZE);
} else if(type == PACKET_TYPE_REDUNDANCY) {
} else if (type == PACKET_TYPE_REDUNDANCY) {
buf_ptr = encode_redundancy_header(buf_ptr, payload);
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet_ptr, PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
} else if(type == PACKET_TYPE_FEEDBACK) {
} else if (type == PACKET_TYPE_FEEDBACK) {
encode_feedback_header(buf_ptr, payload);
} else {
perror("NOT IMPLEMENTED");
......@@ -176,8 +170,7 @@ bool PrrtPacket_encode(void *buf_ptr, uint16_t buf_size, PrrtPacket *packet_ptr)
return false;
}
void *encode_redundancy_header(void *buf_ptr, const void *payload)
{
void *encode_redundancy_header(void *buf_ptr, const void *payload) {
const PrrtPacketRedundancyPayload *redundancyPayload = payload;
prrtSequenceNumber_t *baseSeqNo = (prrtSequenceNumber_t *) buf_ptr;
......@@ -195,8 +188,7 @@ void *encode_redundancy_header(void *buf_ptr, const void *payload)
return buf_ptr;
}
void *encode_feedback_header(void *buf_ptr, const void *payload)
{
void *encode_feedback_header(void *buf_ptr, const void *payload) {
const PrrtPacketFeedbackPayload *feedbackPayload = payload;
uint32_t *receiverAddress = (uint32_t *) buf_ptr;
......@@ -243,11 +235,10 @@ void *encode_feedback_header(void *buf_ptr, const void *payload)
return buf_ptr;
}
void *encode_data_header(void *buf_ptr, const void *payload)
{
void *encode_data_header(void *buf_ptr, const void *payload) {
const PrrtPacketDataPayload *data_payload = payload;
prrtPacketLength_t *packetLength = (prrtPacketLength_t*) buf_ptr;
prrtPacketLength_t *packetLength = (prrtPacketLength_t *) buf_ptr;
*packetLength = htonl(data_payload->dataLength);
buf_ptr += sizeof(prrtPacketLength_t);
......@@ -273,8 +264,7 @@ void *encode_data_header(void *buf_ptr, const void *payload)
return buf_ptr;
}
void *encode_general_header(void *buf_ptr, const PrrtPacket *packet)
{
void *encode_general_header(void *buf_ptr, const PrrtPacket *packet) {
uint8_t *type_priority = (uint8_t *) buf_ptr;
*type_priority = packet->type_priority;
buf_ptr += sizeof(uint8_t);
......@@ -290,8 +280,7 @@ void *encode_general_header(void *buf_ptr, const PrrtPacket *packet)
return buf_ptr;
}
bool PrrtPacket_decode(void *srcBuffer, uint16_t srcBufferSize, PrrtPacket *targetPacket)
{
bool PrrtPacket_decode(void *srcBuffer, uint16_t srcBufferSize, PrrtPacket *targetPacket) {
prrtPacketLength_t payload_len = (prrtPacketLength_t) (srcBufferSize - PRRT_PACKET_GENERAL_HEADER_SIZE);
targetPacket->type_priority = *(uint8_t *) srcBuffer;
srcBuffer += 1;
......@@ -309,13 +298,13 @@ bool PrrtPacket_decode(void *srcBuffer, uint16_t srcBufferSize, PrrtPacket *targ
targetPacket->payloadLength = payload_len;
prrtPacketType_t packetType = PrrtPacket_type(targetPacket);
if(packetType == PACKET_TYPE_DATA) {
if (packetType == PACKET_TYPE_DATA) {
srcBuffer = decode_data_header(srcBuffer, payload_buffer);
PrrtPacket_copy_buffer_to_payload(targetPacket, srcBuffer, PRRT_PACKET_DATA_HEADER_SIZE);
} else if(packetType == PACKET_TYPE_REDUNDANCY) {
} else if (packetType == PACKET_TYPE_REDUNDANCY) {
srcBuffer = decode_redundancy_header(srcBuffer, payload_buffer);
PrrtPacket_copy_buffer_to_payload(targetPacket, srcBuffer, PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
} else if(packetType == PACKET_TYPE_FEEDBACK) {
} else if (packetType == PACKET_TYPE_FEEDBACK) {
decode_feedback_header(srcBuffer, payload_buffer);
} else {
printf("NOT IMPLEMENTED\n");
......@@ -326,8 +315,7 @@ bool PrrtPacket_decode(void *srcBuffer, uint16_t srcBufferSize, PrrtPacket *targ
return false;
}
void *decode_redundancy_header(void *dstBuffer, const void *srcBuffer)
{
void *decode_redundancy_header(void *dstBuffer, const void *srcBuffer) {
PrrtPacketRedundancyPayload *redundancyPayload = (PrrtPacketRedundancyPayload *) srcBuffer;
prrtSequenceNumber_t *baseSeqNo = (prrtSequenceNumber_t *) dstBuffer;
......@@ -345,8 +333,7 @@ void *decode_redundancy_header(void *dstBuffer, const void *srcBuffer)
return dstBuffer;
}
void *decode_feedback_header(void *dstBuffer, const void *srcBuffer)
{
void *decode_feedback_header(void *dstBuffer, const void *srcBuffer) {
PrrtPacketFeedbackPayload *feedback_payload = (PrrtPacketFeedbackPayload *) srcBuffer;
uint32_t *receiverAddr = (uint32_t *) dstBuffer;
......@@ -393,8 +380,7 @@ void *decode_feedback_header(void *dstBuffer, const void *srcBuffer)
return dstBuffer;
}
void *decode_data_header(void *dstBuffer, const void *srcBuffer)
{
void *decode_data_header(void *dstBuffer, const void *srcBuffer) {
PrrtPacketDataPayload *data_payload = (PrrtPacketDataPayload *) srcBuffer;
prrtPacketLength_t *dataLength = (prrtPacketLength_t *) dstBuffer;
......@@ -424,9 +410,8 @@ void *decode_data_header(void *dstBuffer, const void *srcBuffer)
}
int PrrtPacket_destroy(PrrtPacket *packet)
{
if(packet->payload != NULL) {
int PrrtPacket_destroy(PrrtPacket *packet) {
if (packet->payload != NULL) {
free(packet->payload);
}
free(packet);
......@@ -435,10 +420,10 @@ int PrrtPacket_destroy(PrrtPacket *packet)
PrrtPacket *PrrtPacket_create_data_packet(uint8_t priority, const void *payloadPointer,
prrtPacketLength_t dataLength, prrtSequenceNumber_t sequenceNumber,
prrtTimedelta_t targetDelay)
{
prrtTimedelta_t targetDelay) {
PrrtPacket *packet = create_header(priority, sequenceNumber,
(prrtPacketLength_t) (dataLength + PRRT_PACKET_DATA_HEADER_SIZE), PACKET_TYPE_DATA, 0);
(prrtPacketLength_t) (dataLength + PRRT_PACKET_DATA_HEADER_SIZE),
PACKET_TYPE_DATA, 0);
PrrtPacketDataPayload *dataPayload = calloc(1, packet->payloadLength);
check_mem(dataPayload);
......@@ -447,7 +432,6 @@ PrrtPacket *PrrtPacket_create_data_packet(uint8_t priority, const void *payloadP
dataPayload->dataLength = dataLength;
dataPayload->timestamp = PrrtClock_get_current_time_us();
dataPayload->packetTimeout_us = dataPayload->timestamp + targetDelay;
debug(DEBUG_PACKET, "timeout = %lu + %lu", (unsigned long) dataPayload->timestamp, (unsigned long) targetDelay);
dataPayload->groupRTT_us = 0;
dataPayload->decodingTimeout_us = 150; // TODO: payload->decodingTimeout_us
dataPayload->feedbackTimer_us = 170; // TODO: payload->feedback_timer
......@@ -460,11 +444,13 @@ PrrtPacket *PrrtPacket_create_data_packet(uint8_t priority, const void *payloadP
return NULL;
}
PrrtPacket *PrrtPacket_reconstruct_data_packet(PrrtPacketDataPayload *payload, prrtIndex_t index, prrtSequenceNumber_t sequenceNumber) {
PrrtPacket *PrrtPacket_reconstruct_data_packet(PrrtPacketDataPayload *payload, prrtIndex_t index,
prrtSequenceNumber_t sequenceNumber) {
prrtPacketLength_t dataLength = payload->dataLength;
PrrtPacket *packet = create_header(0, sequenceNumber,
(prrtPacketLength_t) (dataLength + PRRT_PACKET_DATA_HEADER_SIZE), PACKET_TYPE_DATA, index);
(prrtPacketLength_t) (dataLength + PRRT_PACKET_DATA_HEADER_SIZE),
PACKET_TYPE_DATA, index);
PrrtPacketDataPayload *dataPayload = calloc(1, packet->payloadLength);
check_mem(dataPayload);
......@@ -482,8 +468,8 @@ PrrtPacket *PrrtPacket_reconstruct_data_packet(PrrtPacketDataPayload *payload, p
PrrtPacket *PrrtPacket_create_redundancy_packet(uint8_t priority, void *payloadPointer,
prrtPacketLength_t payloadLength,
prrtSequenceNumber_t sequenceNumber, uint8_t index,
prrtSequenceNumber_t baseSequenceNumber, PrrtCodingParams* codingParams)
{
prrtSequenceNumber_t baseSequenceNumber,
PrrtCodingParams *codingParams) {
PrrtPacket *packet = create_header(priority, sequenceNumber,
(prrtPacketLength_t) (payloadLength + PRRT_PACKET_REDUNDANCY_HEADER_SIZE),
PACKET_TYPE_REDUNDANCY, index);
......@@ -500,7 +486,7 @@ PrrtPacket *PrrtPacket_create_redundancy_packet(uint8_t priority, void *payloadP
return packet;
error:
PERROR("Out of memory%s.","");
PERROR("Out of memory%s.", "");
return NULL;
}
......@@ -508,8 +494,7 @@ PrrtPacket *PrrtPacket_create_feedback_packet(uint8_t priority, uint8_t index, p
prrtTimedelta_t groupRTT, prrtSequenceNumber_t gapLength,
prrtSequenceNumber_t gapCount, prrtSequenceNumber_t burstLength,
prrtSequenceNumber_t burstCount, uint32_t bandwidth,
uint32_t receiverAddr, prrtTimestamp_t forwardTripTime)
{
uint32_t receiverAddr, prrtTimestamp_t forwardTripTime) {
PrrtPacket *packet = create_header(priority, sequenceNumber, PRRT_PACKET_FEEDBACK_HEADER_SIZE, PACKET_TYPE_FEEDBACK,
index);
......
#include <netdb.h>
#include <stdio.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include "../../defines.h"
#include "../../util/dbg.h"
#include "../../util/common.h"
......@@ -99,7 +100,7 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet, struct
sock_ptr->lastSentTimestamp = dataTimestamp;
PrrtClock_update(&sock_ptr->clock, dataTimestamp, payload->groupRTT_us);
PrrtPacketTimeout* packetTimeout = PrrtPacketTimeout_create(packet);
PrrtPacketTimeout *packetTimeout = PrrtPacketTimeout_create(packet);
check(PrrtPacketTimeoutTable_insert(sock_ptr->packetTimeoutTable, packetTimeout), "Could not insert data packet.");
prrtSequenceNumber_t seqno = packet->sequenceNumber;
......@@ -126,7 +127,8 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet, struct
PrrtBlock *block = PrrtRepairBlockStore_get_block(sock_ptr->repairBlockStore, baseSequenceNumber);
if (block != NULL) {
check(PrrtBlock_insert_data_packet(block, reference), "Inserting failed: %d, %d", baseSequenceNumber, seqno);
check(PrrtBlock_insert_data_packet(block, reference), "Inserting failed: %d, %d", baseSequenceNumber,
seqno);
decode_block(sock_ptr, block);
} else {
if (PrrtDataStore_insert(sock_ptr->dataPacketStore, reference) == false) {
......@@ -177,6 +179,48 @@ static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) {
return;
}
void receive_from_socket(const PrrtSocket *socket_ptr, unsigned char *buffer_ptr, ssize_t *received_size,
struct sockaddr_in *remote_ptr, socklen_t *remote_len_ptr, struct timespec *packet_stamp_ptr) {
if(socket_ptr->isHardwareTimestamping) {
struct cmsghdr *cmsg;
struct msghdr msg;
struct iovec entry;
struct {
struct cmsghdr cm;
char control[512];
} control;
memset(&msg, 0, sizeof(msg));
msg.msg_iov = &entry;
msg.msg_iovlen = 1;
entry.iov_base = buffer_ptr;
entry.iov_len = MAX_PAYLOAD_LENGTH;
msg.msg_name = (caddr_t) remote_ptr;
msg.msg_namelen = *remote_len_ptr;
msg.msg_control = &control;
msg.msg_controllen = sizeof(control);
*received_size = recvmsg(socket_ptr->dataSocketFd, &msg, 0);
for(cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
switch(cmsg->cmsg_type) {
case SO_TIMESTAMPNS: {
memcpy(packet_stamp_ptr, (struct timespec*) CMSG_DATA(cmsg), sizeof(struct timespec));
break;
}
default:
PERROR("Unknown Control Msg Type: %d", cmsg->cmsg_type);
break;
}
}
} else {
*received_size = recvfrom(socket_ptr->dataSocketFd, buffer_ptr, MAX_PAYLOAD_LENGTH, 0,
(struct sockaddr *) remote_ptr, remote_len_ptr);
clock_gettime(CLOCK_REALTIME, packet_stamp_ptr);
}
}
void *receive_data_loop(void *ptr) {
ssize_t n;
struct sockaddr_in remote;
......@@ -192,8 +236,12 @@ void *receive_data_loop(void *ptr) {
XlapTimestampPlaceholderInitialize(&tsph2);
XlapTimestampPlaceholderInitialize(&tsph3);
n = recvfrom(sock_ptr->dataSocketFd, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen);
sock_ptr->lastReceivedTimestamp = PrrtClock_get_current_time_us();
struct timespec packet_recv_stamp;
receive_from_socket(sock_ptr, buffer, &n, &remote, &addrlen, &packet_recv_stamp);
debug(DEBUG_HARDSTAMPING, "Packet TS:\t%ld.%09ld; Who? %s", (long) packet_recv_stamp.tv_sec,
packet_recv_stamp.tv_nsec, inet_ntoa(remote.sin_addr));
sock_ptr->lastReceivedTimestamp = PrrtClock_TimespecToPrrtTimestamp(packet_recv_stamp);
XlapTimeStampClock(&tsph1, ts_any_packet, 0, LinkReceive);
XlapTimeStampCycle(&tsph1, ts_any_packet, 0, LinkReceive);
......@@ -209,6 +257,8 @@ void *receive_data_loop(void *ptr) {
prrtPacketType_t packetType = PrrtPacket_type(packet);
debug(DEBUG_DATARECEIVER, "received packet %d:%u", (int) packetType, seqno);
if (packetType == PACKET_TYPE_DATA) {
XlapTimeStampValue(sock_ptr, ts_data_packet, seqno, ChannelReceive, packet_recv_stamp);
XlapTimestampPlaceholderUse(sock_ptr, ts_data_packet, seqno, &tsph1);
XlapTimestampPlaceholderUse(sock_ptr, ts_data_packet, seqno, &tsph2);
XlapTimestampPlaceholderUse(sock_ptr, ts_data_packet, seqno, &tsph3);
......@@ -217,6 +267,8 @@ void *receive_data_loop(void *ptr) {
handle_data_packet(sock_ptr, packet, remote);
XlapTimeStampCycle(sock_ptr, ts_data_packet, seqno, HandlePacketEnd);
} else if (packetType == PACKET_TYPE_REDUNDANCY) {
XlapTimeStampValue(sock_ptr, ts_redundancy_packet, seqno, ChannelReceive, packet_recv_stamp);
XlapTimestampPlaceholderUse(sock_ptr, ts_redundancy_packet, seqno, &tsph1);
XlapTimestampPlaceholderUse(sock_ptr, ts_redundancy_packet, seqno, &tsph2);
XlapTimestampPlaceholderUse(sock_ptr, ts_redundancy_packet, seqno, &tsph3);
......
#include <stdio.h>
#include <netdb.h>
#include <unistd.h>
#include <string.h>
#include "../../defines.h"
#include "../block.h"
......@@ -10,6 +9,66 @@
#include "../../util/common.h"
#include "dataTransmitter.h"
bool send_to_socket(PrrtSocket* sock_ptr, PrrtReceiver *recv, uint8_t* buf, prrtPacketLength_t length, struct timespec *packet_stamp_ptr) {
if(sock_ptr->isHardwareTimestamping) {
struct msghdr msg;
struct iovec iov;
char control[1024];
ssize_t got;
int check = 0;
const int check_max = 9999;
iov.iov_base = buf;
iov.iov_len = length;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_name = recv->ai->ai_addr;
msg.msg_namelen = recv->ai->ai_addrlen;
msg.msg_control = control;
msg.msg_controllen = 0;
sendmsg(sock_ptr->dataSocketFd, &msg, 0);
msg.msg_control = control;
iov.iov_len = MAX_PAYLOAD_LENGTH;
do {
msg.msg_controllen = 1024;
got = recvmsg(sock_ptr->dataSocketFd, &msg, MSG_ERRQUEUE);
} while(got < 0 && errno == EAGAIN && check++ < check_max);
check(!(got < 0 && errno == EAGAIN), "Failed to get stamp. Gave up.");
struct cmsghdr* cmsg;
for(cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (cmsg->cmsg_level != SOL_SOCKET) {
continue;
}
switch(cmsg->cmsg_type) {
case SO_TIMESTAMPING: {
// Note: The raw stamp [2] is used, because the others are 0.
struct timespec *hardstamp = &(((struct timespec*) CMSG_DATA(cmsg))[2]); //TODO: This is ugly.
memcpy(packet_stamp_ptr, hardstamp, sizeof(struct timespec));
debug(DEBUG_HARDSTAMPING, "Hardware TS:\t%ld.%09ld", (long) packet_stamp_ptr->tv_sec, (long) packet_stamp_ptr->tv_nsec);
break;
}
default:
debug(DEBUG_DATARECEIVER, "Msgtype: %d", cmsg->cmsg_type);
break;
}
}
} else {
// TODO: [LATENCY] By knowing the time encoding etc. that happens upfront, one could make an adjustment here.
check(sendto(sock_ptr->dataSocketFd, buf, length, 0, recv->ai->ai_addr, recv->ai->ai_addrlen) ==
length, "Sendto failed.");
clock_gettime(CLOCK_REALTIME, packet_stamp_ptr);
debug(DEBUG_HARDSTAMPING, "Software TS:\t%ld.%09ld", (long) packet_stamp_ptr->tv_sec, (long) packet_stamp_ptr->tv_nsec);
}
return true;
error:
PERROR("Sending packet failed.%s", "")
return false;
}
static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
uint8_t buf[MAX_PAYLOAD_LENGTH];
......@@ -31,15 +90,27 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
case PACKET_TYPE_CHANNEL_FEEDBACK:
default:;
}
uint32_t receiver_count = List_count(sock_ptr->receivers);
struct timespec **timestamps = calloc(receiver_count, sizeof(struct timespec *));
int i = 0;
// SENDING TO ALL RECEIVERS
LIST_FOREACH(sock_ptr->receivers, first, next, cur) {
PrrtReceiver *recv = cur->value;
// TODO: [LATENCY] By knowing the time encoding etc. that happens upfront, one could make an adjustment here.
check(sendto(sock_ptr->dataSocketFd, buf, length, 0, recv->ai->ai_addr, recv->ai->ai_addrlen) ==
length, "Sendto failed.");
timestamps[i] = calloc(sizeof(struct timespec), 1);
send_to_socket(sock_ptr, recv, buf, length, timestamps[i]);
i++;
usleep(1);
}
XlapTimeStampValue(sock_ptr, ts_data_packet, packet->sequenceNumber, ChannelTransmit, *timestamps[i-1]);
// TODO: Make less ugly.
for(int j = 0; j < receiver_count; j++) {
free(timestamps[j]);
}
free(timestamps);
switch (PrrtPacket_type(packet)) {
case PACKET_TYPE_DATA:
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd);
......
This diff is collapsed.
......@@ -17,6 +17,11 @@
#include "clock.h"
#include "../xlap/xlap.h"
#ifndef SIOCSHWTSTAMP
# define SIOCSHWTSTAMP 0x89b0
# define SIOCGHWTSTAMP 0x89b1
#endif
typedef struct prrtSocket {
int dataSocketFd;
......@@ -66,39 +71,44 @@ typedef struct prrtSocket {
pthread_attr_t *receiveFeedbackThreadAttr;
pthread_attr_t *sendDataThreadAttr;
pthread_attr_t *receiveDataThreadAttr;
bool isHardwareTimestamping;
char *interfaceName;
} PrrtSocket;
PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay_us);
PrrtSocket *PrrtSocket_create(bool is_sender, prrtTimedelta_t target_delay_us);
bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interface_name);
bool PrrtSocket_bind(PrrtSocket *sock_ptr, const char *ipAddress, const uint16_t port);
bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port);
bool PrrtSocket_set_sock_opt(PrrtSocket *sock_ptr, const char *name, const uint32_t value);
bool PrrtSocket_set_sock_opt(PrrtSocket *s, const char *name, const uint32_t value);
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *sock_ptr, const char *name);
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *s, const char *name);
bool PrrtSocket_set_coding_parameters(PrrtSocket *sock_ptr, uint8_t k, uint8_t n);
bool PrrtSocket_set_coding_parameters(PrrtSocket *s, uint8_t k, uint8_t n);
int PrrtSocket_interrupt(PrrtSocket *sock_ptr);
int PrrtSocket_interrupt(PrrtSocket *s);
int PrrtSocket_close(PrrtSocket *sock_ptr);
int PrrtSocket_close(PrrtSocket *s);
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port);
int PrrtSocket_connect(PrrtSocket *s, const char *host, uint16_t port);
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len);
int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, size_t data_len);
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr);
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr);