Commit 770f62df authored by Andreas Schmidt's avatar Andreas Schmidt

Extract xlap timestamping into separate module.

parent 9ad12fa7
Pipeline #976 passed with stages
in 1 minute and 8 seconds
variables: variables:
PYPI_USER: SECURE PYPI_USER: SECURE
PYPI_PASSWORD: SECURE PYPI_PASSWORD: SECURE
GIT_SUBMODULE_STRATEGY: recursive
stages: stages:
- build - build
......
[submodule "prrt/xlap"]
path = prrt/xlap
url = ../../as/X-Lap.git
...@@ -121,16 +121,16 @@ cdef extern from "proto/socket.h": ...@@ -121,16 +121,16 @@ cdef extern from "proto/socket.h":
bint PrrtSocket_set_sock_opt(PrrtSocket *sock_ptr, const_char *name, const uint32_t value) bint PrrtSocket_set_sock_opt(PrrtSocket *sock_ptr, const_char *name, const uint32_t value)
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *sock_ptr, const_char *name) uint32_t PrrtSocket_get_sock_opt(PrrtSocket *sock_ptr, const_char *name)
cdef extern from "proto/timestamp.h": cdef extern from "xlap/xlap.h":
ctypedef struct PrrtTimestampTable: ctypedef struct XlapTimestampTable:
pass pass
cdef enum PrrtTimestampPacketKind: cdef enum XlapTimestampPacketKind:
ts_data_packet = 0, ts_data_packet = 0,
ts_any_packet = 0, ts_any_packet = 0,
ts_redundancy_packet = 1 ts_redundancy_packet = 1
void PrrtTimestampTableInstall(PrrtSocket* sck, PrrtTimestampPacketKind kind, PrrtTimestampTable* tstp); void XlapTimestampTableInstall(PrrtSocket* sck, XlapTimestampPacketKind kind, XlapTimestampTable* tstp);
cdef extern from "util/bptree.h": cdef extern from "util/bptree.h":
ctypedef struct BPTreeNode: ctypedef struct BPTreeNode:
......
...@@ -6,7 +6,7 @@ add_library(PRRT ../defines.h ...@@ -6,7 +6,7 @@ add_library(PRRT ../defines.h
packet.c packet.h packet.c packet.h
receiver.c receiver.h receiver.c receiver.h
socket.c socket.h socket.c socket.h
timestamp.c timestamp.h ../xlap/xlap.c ../xlap/xlap.h
applicationConstraints.c applicationConstraints.h applicationConstraints.c applicationConstraints.h
vdmcode/block_code.c vdmcode/block_code.h vdmcode/block_code.c vdmcode/block_code.h
stores/forwardPacketTable.c stores/forwardPacketTable.h stores/forwardPacketTable.c stores/forwardPacketTable.h
......
...@@ -111,9 +111,9 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet, struct ...@@ -111,9 +111,9 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet, struct
prrtSequenceNumber_t seqno = packet->sequenceNumber; prrtSequenceNumber_t seqno = packet->sequenceNumber;
PrrtTimeStampCycle(sock_ptr, ts_data_packet, seqno, SendFeedbackStart); XlapTimeStampCycle(sock_ptr, ts_data_packet, seqno, SendFeedbackStart);
check(send_feedback(sock_ptr, remote), "Sending feedback failed."); check(send_feedback(sock_ptr, remote), "Sending feedback failed.");
PrrtTimeStampCycle(sock_ptr, ts_data_packet, seqno, SendFeedbackEnd); XlapTimeStampCycle(sock_ptr, ts_data_packet, seqno, SendFeedbackEnd);
prrtTimestamp_t now = PrrtClock_get_prrt_time_us(&sock_ptr->clock); prrtTimestamp_t now = PrrtClock_get_prrt_time_us(&sock_ptr->clock);
if(is_timeout(now, payload->packetTimeout_us)) { if(is_timeout(now, payload->packetTimeout_us)) {
...@@ -144,7 +144,7 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet, struct ...@@ -144,7 +144,7 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet, struct
// forward to application layer // forward to application layer
debug(DEBUG_DATARECEIVER, "forward %u", seqno); debug(DEBUG_DATARECEIVER, "forward %u", seqno);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, seqno, PrrtReturnPackage); XlapTimeStampCycle(sock_ptr, ts_data_packet, seqno, PrrtReturnPackage);
check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed."); check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
List_push(sock_ptr->inQueue, packet); List_push(sock_ptr->inQueue, packet);
check(pthread_cond_signal(&sock_ptr->inQueueFilledCv) == 0, "Cond signal failed."); check(pthread_cond_signal(&sock_ptr->inQueueFilledCv) == 0, "Cond signal failed.");
...@@ -198,46 +198,46 @@ void *receive_data_loop(void *ptr) ...@@ -198,46 +198,46 @@ void *receive_data_loop(void *ptr)
PrrtSocket *sock_ptr = ptr; PrrtSocket *sock_ptr = ptr;
while(1) { while(1) {
PrrtTimestampPlaceholder tsph1; XlapTimestampPlaceholder tsph1;
PrrtTimestampPlaceholder tsph2; XlapTimestampPlaceholder tsph2;
PrrtTimestampPlaceholder tsph3; XlapTimestampPlaceholder tsph3;
PrrtTimestampPlaceholderInitialize(&tsph1); XlapTimestampPlaceholderInitialize(&tsph1);
PrrtTimestampPlaceholderInitialize(&tsph2); XlapTimestampPlaceholderInitialize(&tsph2);
PrrtTimestampPlaceholderInitialize(&tsph3); XlapTimestampPlaceholderInitialize(&tsph3);
memset(buffer, 0, MAX_PAYLOAD_LENGTH); memset(buffer, 0, MAX_PAYLOAD_LENGTH);
n = recvfrom(sock_ptr->dataSocketFd, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen); n = recvfrom(sock_ptr->dataSocketFd, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen);
sock_ptr->lastReceivedTimestamp = PrrtClock_get_current_time_us(); sock_ptr->lastReceivedTimestamp = PrrtClock_get_current_time_us();
PrrtTimeStampClock(&tsph1, ts_any_packet, 0, LinkReceive); XlapTimeStampClock(&tsph1, ts_any_packet, 0, LinkReceive);
PrrtTimeStampCycle(&tsph1, ts_any_packet, 0, LinkReceive); XlapTimeStampCycle(&tsph1, ts_any_packet, 0, LinkReceive);
PrrtPacket *packet = (PrrtPacket *) calloc(1, sizeof(PrrtPacket)); PrrtPacket *packet = (PrrtPacket *) calloc(1, sizeof(PrrtPacket));
check_mem(packet); check_mem(packet);
PrrtTimeStampCycle(&tsph2, ts_any_packet, 0, DecodeStart); XlapTimeStampCycle(&tsph2, ts_any_packet, 0, DecodeStart);
check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed."); check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed.");
PrrtTimeStampCycle(&tsph3, ts_any_packet, 0, DecodeEnd); XlapTimeStampCycle(&tsph3, ts_any_packet, 0, DecodeEnd);
prrtSequenceNumber_t seqno = packet->sequenceNumber; prrtSequenceNumber_t seqno = packet->sequenceNumber;
prrtPacketType_t packetType = PrrtPacket_type(packet); prrtPacketType_t packetType = PrrtPacket_type(packet);
debug(DEBUG_DATARECEIVER, "received packet %d:%u", (int) packetType, seqno); debug(DEBUG_DATARECEIVER, "received packet %d:%u", (int) packetType, seqno);
if(packetType == PACKET_TYPE_DATA) { if(packetType == PACKET_TYPE_DATA) {
PrrtTimestampPlaceholderUse(sock_ptr, ts_data_packet, seqno, &tsph1); XlapTimestampPlaceholderUse(sock_ptr, ts_data_packet, seqno, &tsph1);
PrrtTimestampPlaceholderUse(sock_ptr, ts_data_packet, seqno, &tsph2); XlapTimestampPlaceholderUse(sock_ptr, ts_data_packet, seqno, &tsph2);
PrrtTimestampPlaceholderUse(sock_ptr, ts_data_packet, seqno, &tsph3); XlapTimestampPlaceholderUse(sock_ptr, ts_data_packet, seqno, &tsph3);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, seqno, HandlePacketStart); XlapTimeStampCycle(sock_ptr, ts_data_packet, seqno, HandlePacketStart);
handle_data_packet(sock_ptr, packet, remote); handle_data_packet(sock_ptr, packet, remote);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, seqno, HandlePacketEnd); XlapTimeStampCycle(sock_ptr, ts_data_packet, seqno, HandlePacketEnd);
} else if(packetType == PACKET_TYPE_REDUNDANCY) { } else if(packetType == PACKET_TYPE_REDUNDANCY) {
PrrtTimestampPlaceholderUse(sock_ptr, ts_redundancy_packet, seqno, &tsph1); XlapTimestampPlaceholderUse(sock_ptr, ts_redundancy_packet, seqno, &tsph1);
PrrtTimestampPlaceholderUse(sock_ptr, ts_redundancy_packet, seqno, &tsph2); XlapTimestampPlaceholderUse(sock_ptr, ts_redundancy_packet, seqno, &tsph2);
PrrtTimestampPlaceholderUse(sock_ptr, ts_redundancy_packet, seqno, &tsph3); XlapTimestampPlaceholderUse(sock_ptr, ts_redundancy_packet, seqno, &tsph3);
PrrtTimeStampCycle(sock_ptr, ts_redundancy_packet, seqno, HandlePacketStart); XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, seqno, HandlePacketStart);
handle_redundancy_packet(sock_ptr, packet); handle_redundancy_packet(sock_ptr, packet);
PrrtTimeStampCycle(sock_ptr, ts_redundancy_packet, seqno, HandlePacketEnd); XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, seqno, HandlePacketEnd);
} else { } else {
PrrtPacket_print(packet); PrrtPacket_print(packet);
PrrtPacket_destroy(packet); PrrtPacket_destroy(packet);
......
...@@ -21,10 +21,10 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) ...@@ -21,10 +21,10 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet)
switch (PrrtPacket_type(packet)) { switch (PrrtPacket_type(packet)) {
case PACKET_TYPE_DATA: case PACKET_TYPE_DATA:
PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitStart); XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitStart);
break; break;
case PACKET_TYPE_REDUNDANCY: case PACKET_TYPE_REDUNDANCY:
PrrtTimeStampCycle(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitStart); XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitStart);
break; break;
case PACKET_TYPE_REPETITION: case PACKET_TYPE_REPETITION:
case PACKET_TYPE_FEEDBACK: case PACKET_TYPE_FEEDBACK:
...@@ -43,12 +43,12 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) ...@@ -43,12 +43,12 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet)
} }
switch (PrrtPacket_type(packet)) { switch (PrrtPacket_type(packet)) {
case PACKET_TYPE_DATA: case PACKET_TYPE_DATA:
PrrtTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd); XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd); XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd);
break; break;
case PACKET_TYPE_REDUNDANCY: case PACKET_TYPE_REDUNDANCY:
PrrtTimeStampClock(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitEnd); XlapTimeStampClock(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitEnd);
PrrtTimeStampCycle(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitEnd); XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitEnd);
break; break;
case PACKET_TYPE_REPETITION: case PACKET_TYPE_REPETITION:
case PACKET_TYPE_FEEDBACK: case PACKET_TYPE_FEEDBACK:
...@@ -86,7 +86,7 @@ void *send_data_loop(void *ptr) ...@@ -86,7 +86,7 @@ void *send_data_loop(void *ptr)
} }
PrrtPacket *packet = List_shift(sock_ptr->outQueue); PrrtPacket *packet = List_shift(sock_ptr->outQueue);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart); XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
if(block == NULL) { if(block == NULL) {
block = PrrtBlock_create(sock_ptr->codingParameters, packet->sequenceNumber); block = PrrtBlock_create(sock_ptr->codingParameters, packet->sequenceNumber);
} }
...@@ -98,7 +98,7 @@ void *send_data_loop(void *ptr) ...@@ -98,7 +98,7 @@ void *send_data_loop(void *ptr)
PrrtPacket *packetToSend = PrrtPacket_copy(packet); PrrtPacket *packetToSend = PrrtPacket_copy(packet);
send_packet(sock_ptr, packetToSend); send_packet(sock_ptr, packetToSend);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd); XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
PrrtBlock_insert_data_packet(block, packet); PrrtBlock_insert_data_packet(block, packet);
...@@ -106,9 +106,9 @@ void *send_data_loop(void *ptr) ...@@ -106,9 +106,9 @@ void *send_data_loop(void *ptr)
if(PrrtBlock_encode_ready(block)) { if(PrrtBlock_encode_ready(block)) {
uint32_t j = 0; uint32_t j = 0;
unsigned int redundancy_seqno = sock_ptr->sequenceNumberRedundancy; unsigned int redundancy_seqno = sock_ptr->sequenceNumberRedundancy;
PrrtTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeStart); XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeStart);
PrrtBlock_encode(block, &sock_ptr->sequenceNumberRedundancy); PrrtBlock_encode(block, &sock_ptr->sequenceNumberRedundancy);
PrrtTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeEnd); XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeEnd);
uint32_t redundancyPackets = List_count(block->redundancyPackets); uint32_t redundancyPackets = List_count(block->redundancyPackets);
for(j = 0; j < redundancyPackets; j++) { for(j = 0; j < redundancyPackets; j++) {
......
...@@ -152,24 +152,24 @@ int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t po ...@@ -152,24 +152,24 @@ int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t po
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len) int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
{ {
check(sock_ptr->isSender, "Cannot send on receiver socket.") check(sock_ptr->isSender, "Cannot send on receiver socket.")
PrrtTimestampPlaceholder tsph; XlapTimestampPlaceholder tsph;
PrrtTimestampPlaceholderInitialize(&tsph); XlapTimestampPlaceholderInitialize(&tsph);
PrrtTimeStampClock(&tsph, ts_any_packet, 0, PrrtSendStart); XlapTimeStampClock(&tsph, ts_any_packet, 0, PrrtSendStart);
PrrtTimeStampCycle(&tsph, ts_any_packet, 0, PrrtSendStart); XlapTimeStampCycle(&tsph, ts_any_packet, 0, PrrtSendStart);
prrtSequenceNumber_t sequenceNumber = sock_ptr->sequenceNumberSource++; prrtSequenceNumber_t sequenceNumber = sock_ptr->sequenceNumberSource++;
PrrtPacket *packet = PrrtPacket_create_data_packet(5, data, (prrtPacketLength_t) data_len, sequenceNumber, PrrtPacket *packet = PrrtPacket_create_data_packet(5, data, (prrtPacketLength_t) data_len, sequenceNumber,
PrrtApplicationConstraints_get_target_delay( PrrtApplicationConstraints_get_target_delay(
sock_ptr->applicationConstraints)); sock_ptr->applicationConstraints));
PrrtTimestampPlaceholderUse(sock_ptr, ts_data_packet, packet->sequenceNumber, &tsph); XlapTimestampPlaceholderUse(sock_ptr, ts_data_packet, packet->sequenceNumber, &tsph);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtSubmitPackage); XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtSubmitPackage);
check(pthread_mutex_lock(&sock_ptr->outQueueFilledMutex) == 0, "Lock failed."); check(pthread_mutex_lock(&sock_ptr->outQueueFilledMutex) == 0, "Lock failed.");
List_push(sock_ptr->outQueue, packet); List_push(sock_ptr->outQueue, packet);
check(pthread_cond_signal(&sock_ptr->outQueueFilledCv) == 0, "Signal failed."); check(pthread_cond_signal(&sock_ptr->outQueueFilledCv) == 0, "Signal failed.");
check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed"); check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed");
PrrtTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtSendEnd); XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtSendEnd); XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
return 0; return 0;
error: error:
PERROR("There was a failure while sending from socket.%s", ""); PERROR("There was a failure while sending from socket.%s", "");
...@@ -190,19 +190,19 @@ int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) ...@@ -190,19 +190,19 @@ int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr)
check(pthread_cond_wait(&sock_ptr->inQueueFilledCv, &sock_ptr->inQueueFilledMutex) == 0, "Wait failed."); check(pthread_cond_wait(&sock_ptr->inQueueFilledCv, &sock_ptr->inQueueFilledMutex) == 0, "Wait failed.");
} }
PrrtPacket *packet = List_shift(sock_ptr->inQueue); PrrtPacket *packet = List_shift(sock_ptr->inQueue);
PrrtTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage); XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage); XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
prrtPacketLength_t len = (prrtPacketLength_t) (packet->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE); prrtPacketLength_t len = (prrtPacketLength_t) (packet->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, CopyOutputStart); XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, CopyOutputStart);
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE); PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, CopyOutputEnd); XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, CopyOutputEnd);
debug(DEBUG_SOCKET, "len: %d", (int) len); debug(DEBUG_SOCKET, "len: %d", (int) len);
check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed."); check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
PrrtTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtDeliver); XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtDeliver); XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
PrrtPacket_destroy(packet); PrrtPacket_destroy(packet);
return len; return len;
} }
...@@ -237,19 +237,19 @@ int32_t PrrtSocket_timedrecv(PrrtSocket *sock_ptr, void *buf_ptr, const uint32_t ...@@ -237,19 +237,19 @@ int32_t PrrtSocket_timedrecv(PrrtSocket *sock_ptr, void *buf_ptr, const uint32_t
} }
} }
PrrtPacket *packet = List_shift(sock_ptr->inQueue); PrrtPacket *packet = List_shift(sock_ptr->inQueue);
PrrtTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage); XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage); XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
prrtPacketLength_t len = (prrtPacketLength_t) (packet->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE); prrtPacketLength_t len = (prrtPacketLength_t) (packet->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, CopyOutputStart); XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, CopyOutputStart);
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE); PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, CopyOutputEnd); XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, CopyOutputEnd);
debug(DEBUG_SOCKET, "len: %d", (int) len); debug(DEBUG_SOCKET, "len: %d", (int) len);
check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed."); check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
PrrtTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtDeliver); XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtDeliver); XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
PrrtPacket_destroy(packet); PrrtPacket_destroy(packet);
return len; return len;
} }
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
#include "stores/packetTimeoutTable.h" #include "stores/packetTimeoutTable.h"
#include "stores/repairBlockStore.h" #include "stores/repairBlockStore.h"
#include "clock.h" #include "clock.h"
#include "timestamp.h" #include "../xlap/xlap.h"
typedef struct prrtSocket { typedef struct prrtSocket {
...@@ -63,7 +63,7 @@ typedef struct prrtSocket { ...@@ -63,7 +63,7 @@ typedef struct prrtSocket {
PrrtCodingParams *codingParameters; PrrtCodingParams *codingParameters;
_Atomic(PrrtTimestampTable *) tstable[2]; _Atomic(XlapTimestampTable *) tstable[2];
pthread_attr_t* receiveFeedbackThreadAttr; pthread_attr_t* receiveFeedbackThreadAttr;
pthread_attr_t* sendDataThreadAttr; pthread_attr_t* sendDataThreadAttr;
......
#include "timestamp.h"
#include <stdatomic.h>
void PrrtTimestampTableDumpHeader(FILE *out)
{
fprintf(out, "SeqNo,Kind" );
# define OUT(id) fprintf(out, ",%s_T,%s_C", #id, #id);
PP_foreach(PP_join_space, OUT, TIMESTAMP_ID_LIST)
# undef OUT
fprintf(out, "\n");
}
static inline unsigned long long timestampByTime(struct timespec *ts)
{
// convert timespec to microseconds
unsigned long long x = ts->tv_sec;
x *= 1000000ULL;
x += ts->tv_nsec / 1000;
return x;
}
void PrrtTimestampTableDump(FILE *out, PrrtTimestampPacketKind kind, PrrtTimestampTable *table)
{
# define OUT(id) fprintf(out, ",%llu,%llu", timestampByTime(&table->rows[row].time[ts_##id].actual.t), (unsigned long long) table->rows[row].time[ts_##id].actual.c);
for (unsigned int row = 0; row < TS_ROWS; row++) {
fprintf(out, "%u,%u", row, (unsigned) kind);
PP_foreach(PP_join_space, OUT, TIMESTAMP_ID_LIST)
fprintf(out, "\n");
}
# undef OUT
}
#ifndef PRRT_TIMESTAMP_H
#define PRRT_TIMESTAMP_H
#include <stdio.h>
#include <stdint.h>
#include <time.h>
/*
* Each timestamp contains both a cycle value and a time value, but some values
* might remain zero
*/
typedef union PrrtTimestamp {
struct PrrtActualTimestamp {
uint64_t c;
struct timespec t;
} actual;
char _cacheline[64];
} PrrtTimestamp;
#include "../util/pp.h"
/*
* Comma-separated list of timestamp IDs
*/
#define TIMESTAMP_ID_LIST \
PrrtSendStart, \
PrrtSendEnd, \
PrrtSubmitPackage, \
PrrtEncodeStart, \
PrrtEncodeEnd, \
PrrtTransmitStart, \
PrrtTransmitEnd, \
LinkTransmitStart, \
LinkTransmitEnd, \
LinkReceive, \
DecodeStart, \
DecodeEnd, \
HandlePacketStart, \
HandlePacketEnd, \
CopyOutputStart, \
CopyOutputEnd, \
SendFeedbackStart, \
SendFeedbackEnd, \
PrrtReturnPackage, \
PrrtReceivePackage, \
PrrtDeliver \
#define TIMESSTAMP_ID_TO_NAME(id) ts_##id
/*
* enum containing all timestamp IDs
*/
typedef enum PrrtTimestampId {
PP_foreach(PP_join_comma, TIMESSTAMP_ID_TO_NAME, TIMESTAMP_ID_LIST),
ts_count
} PrrtTimestampId;
/*
* enum to distinguish between data and redundancy packet timstamps
*/
typedef enum PrrtTimestampPacketKind {
ts_data_packet = 0,
ts_any_packet = 0,
ts_redundancy_packet = 1,
} PrrtTimestampPacketKind;
/*
* Table that stores timestamps for each timestamp ID
*/
typedef struct PrrtTimestampTableRow {
PrrtTimestamp time[ts_count];
} PrrtTimestampTableRow;
/*
* by default, store timestamps for 128 packages
*/
#ifndef TS_ROWS
#define TS_ROWS (1u<<12)
#endif
/*
* Table that stores timestamp table rows
*/
typedef struct PrrtTimestampTable {
PrrtTimestampTableRow rows[TS_ROWS];
} PrrtTimestampTable;
/*
* Dummy data structure to store a single timestamp table row.
*/
typedef struct PrrtTimestampPlaceholder {
_Atomic(PrrtTimestampTable *) tstable[1];
PrrtTimestampTableRow rows[1];
} PrrtTimestampPlaceholder;
/*
* update the clock value of a timestamp
*
* This macro will cause a SIGSEGV if the application does not install a
* timestamp table to the prrt socket.
*/
#define PrrtTimeStampClock(sck, kind, seqno, id) do { \
clock_gettime(CLOCK_MONOTONIC, &atomic_load_explicit(&(sck)->tstable[kind], memory_order_acquire)->rows[(seqno) % TS_ROWS].time[ts_##id].actual.t); \
} while (0);
/*
* update the cycle value of a timestamp
*
* This macro will cause a SIGSEGV if the application does not install a
* timestamp table to the prrt socket.
*/
#define PrrtTimeStampCycle(sck, kind, seqno, id) do { \
atomic_load_explicit(&(sck)->tstable[kind], memory_order_acquire)->rows[(seqno) % TS_ROWS].time[ts_##id].actual.c = __builtin_ia32_rdtsc(); \
} while (0);
/*
* install a time stamp table to a socket
*/
#define PrrtTimestampTableInstall(sck, kind, tstp) do { \
PrrtTimestampTable *__tstp = (tstp); \
memset(__tstp, 0, sizeof(PrrtTimestampTable)); \
atomic_store_explicit(&(sck)->tstable[kind], __tstp, memory_order_release); \
} while (0)
/*
* print a timestamp dump header
*/
extern void PrrtTimestampTableDumpHeader(FILE *);
/*
* dump a timestamp table
*/
extern void PrrtTimestampTableDump(FILE *, PrrtTimestampPacketKind, PrrtTimestampTable *);
/*
* use a specific timestamp table
*/
extern void PrrtTimestampTableUse(PrrtTimestampTable *);
/*
* intialize a timestamp table placeholder
*/
#define PrrtTimestampPlaceholderInitialize(ph) do { \
PrrtTimestampPlaceholder *__ph = (ph); \
atomic_store_explicit(&__ph->tstable[0], (PrrtTimestampTable *) &__ph->rows, memory_order_release); \
memset(&__ph->rows, 0x0, sizeof(PrrtTimestampTableRow)); \
} while (0);
/*
* copy a timestamp table placeholder into an actual timestamp table
*
* Since every timestamp is taken at most once, either the timestamptable or
* the placeholder value must be zero (for each timestamp).
*/
#define PrrtTimestampPlaceholderUse(sck, kind, seqno, ph) do { \
PrrtTimestampPlaceholder *__ph = (ph); \
PrrtTimestampTable *__ts = atomic_load_explicit(&(sck)->tstable[kind], memory_order_acquire); \
for (unsigned int __t = 0; __t < ts_count; __t++) { \
__ts->rows[seqno % TS_ROWS].time[__t].actual.t.tv_sec += __ph->rows[0].time[__t].actual.t.tv_sec; \
__ts->rows[seqno % TS_ROWS].time[__t].actual.t.tv_nsec += __ph->rows[0].time[__t].actual.t.tv_nsec; \
__ts->rows[seqno % TS_ROWS].time[__t].actual.c += __ph->rows[0].time[__t].actual.c; \
} \
} while (0)
#endif // PRRT_TIMESTAMP_H
...@@ -6,7 +6,7 @@ cimport cprrt ...@@ -6,7 +6,7 @@ cimport cprrt
cdef extern from "proto/applicationConstraints.c": cdef extern from "proto/applicationConstraints.c":
pass pass
cdef extern from "proto/timestamp.c": cdef extern from "xlap/xlap.c":
pass pass
cdef extern from "proto/stores/dataPacketStore.c": cdef extern from "proto/stores/dataPacketStore.c":
...@@ -69,18 +69,18 @@ cdef extern from "util/list.c": ...@@ -69,18 +69,18 @@ cdef extern from "util/list.c":
cdef class PrrtSocket: cdef class PrrtSocket:
cdef cprrt.PrrtSocket* _c_socket cdef cprrt.PrrtSocket* _c_socket
cdef bint isSender cdef bint isSender
cdef cprrt.PrrtTimestampTable* tstable_data cdef cprrt.XlapTimestampTable* tstable_data
cdef cprrt.PrrtTimestampTable* tstable_redundancy cdef cprrt.XlapTimestampTable* tstable_redundancy