Commit ce45efc6 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

RTT is now RTprop and uses a min-filter. Receiver has CSI.

parent ebc1f03b
Pipeline #1707 passed with stages
in 1 minute and 34 seconds
...@@ -140,7 +140,7 @@ cdef extern from "proto/socket.h": ...@@ -140,7 +140,7 @@ cdef extern from "proto/socket.h":
bint PrrtSocket_set_coding_parameters(PrrtSocket *s, uint8_t k, uint8_t n) bint PrrtSocket_set_coding_parameters(PrrtSocket *s, uint8_t k, uint8_t n)
PrrtCodingParams *PrrtSocket_get_coding_parameters(PrrtSocket *s) PrrtCodingParams *PrrtSocket_get_coding_parameters(PrrtSocket *s)
bint PrrtSocket_uses_thread_pinning(PrrtSocket *socket) bint PrrtSocket_uses_thread_pinning(PrrtSocket *socket)
uint32_t PrrtSocket_get_rtt(PrrtSocket *socket) uint32_t PrrtSocket_get_rtprop(PrrtSocket *socket)
bint PrrtSocket_uses_thread_pinning(PrrtSocket *socket) bint PrrtSocket_uses_thread_pinning(PrrtSocket *socket)
bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket) bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket)
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#include "../util/common.h" #include "../util/common.h"
#include "../util/dbg.h" #include "../util/dbg.h"
#include "channelStateInformation.h" #include "channelStateInformation.h"
#include "clock.h"
PrrtChannelStateInformation * PrrtChannelStateInformation_create() PrrtChannelStateInformation * PrrtChannelStateInformation_create()
{ {
...@@ -11,8 +12,8 @@ PrrtChannelStateInformation * PrrtChannelStateInformation_create() ...@@ -11,8 +12,8 @@ PrrtChannelStateInformation * PrrtChannelStateInformation_create()
check(pthread_mutex_init(&csi->lock, NULL) == 0, "Mutex init failed."); check(pthread_mutex_init(&csi->lock, NULL) == 0, "Mutex init failed.");
csi->rttMean = 0; csi->rtprop = TIMESTAMP_SPACE;
csi->rttDev = 0; csi->rtprop_filter_length_us = 2 * 1000 * 1000; // 2 seconds
return csi; return csi;
error: error:
...@@ -20,31 +21,23 @@ PrrtChannelStateInformation * PrrtChannelStateInformation_create() ...@@ -20,31 +21,23 @@ PrrtChannelStateInformation * PrrtChannelStateInformation_create()
return NULL; return NULL;
} }
void PrrtChannelStateInformation_update_rtt(PrrtChannelStateInformation *csi, prrtTimedelta_t rtt) void PrrtChannelStateInformation_update_rtprop(PrrtChannelStateInformation *csi, prrtTimedelta_t rtprop)
{ {
pthread_mutex_lock(&csi->lock); pthread_mutex_lock(&csi->lock);
int32_t delta = (int32_t) rtt - (int32_t) csi->rttMean; prrtTimestamp_t now = PrrtClock_get_current_time_us();
// TODO: ensure that there are no arithemtic problems via rounding etc. csi->rtprop_expired = now > (csi->rtprop_stamp + csi->rtprop_filter_length_us);
csi->rttMean = (prrtTimedelta_t) (csi->rttMean + RRT_ALPHA * delta); if (rtprop >= 0 && (rtprop <= csi->rtprop || csi->rtprop_expired)) {
csi->rttDev = (prrtTimedelta_t) (csi->rttDev + RRT_ALPHA * (labs(delta) - csi->rttDev)); csi->rtprop = rtprop;
csi->rtprop_stamp = now;
}
pthread_mutex_unlock(&csi->lock); pthread_mutex_unlock(&csi->lock);
} }
void PrrtChannelStateInformation_print(PrrtChannelStateInformation *csi)
{
pthread_mutex_lock(&csi->lock);
check(csi != NULL, "Input should not be NULL.");
pthread_mutex_unlock(&csi->lock);
return;
error:
PERROR("Should not happen.%s","");
}
prrtTimedelta_t PrrtChannelStateInformation_get_rtt(PrrtChannelStateInformation *csi) prrtTimedelta_t PrrtChannelStateInformation_get_rtprop(PrrtChannelStateInformation *csi)
{ {
pthread_mutex_lock(&csi->lock); pthread_mutex_lock(&csi->lock);
prrtTimedelta_t res = csi->rttMean; prrtTimedelta_t res = csi->rtprop;
pthread_mutex_unlock(&csi->lock); pthread_mutex_unlock(&csi->lock);
return res; return res;
} }
......
...@@ -6,13 +6,15 @@ ...@@ -6,13 +6,15 @@
typedef struct prrtChannelStateInformation { typedef struct prrtChannelStateInformation {
pthread_mutex_t lock; pthread_mutex_t lock;
prrtTimedelta_t rttMean; prrtTimedelta_t rtprop;
prrtTimedelta_t rttDev; prrtTimestamp_t rtprop_stamp;
prrtTimedelta_t rtprop_filter_length_us;
bool rtprop_expired;
} PrrtChannelStateInformation; } PrrtChannelStateInformation;
PrrtChannelStateInformation* PrrtChannelStateInformation_create(void); PrrtChannelStateInformation* PrrtChannelStateInformation_create(void);
void PrrtChannelStateInformation_update_rtt(PrrtChannelStateInformation* csi, prrtTimedelta_t rtt); void PrrtChannelStateInformation_update_rtprop(PrrtChannelStateInformation *csi, prrtTimedelta_t rtprop);
prrtTimedelta_t PrrtChannelStateInformation_get_rtt(PrrtChannelStateInformation* csi); prrtTimedelta_t PrrtChannelStateInformation_get_rtprop(PrrtChannelStateInformation *csi);
bool PrrtChannelStateInformation_destroy(PrrtChannelStateInformation* csi); bool PrrtChannelStateInformation_destroy(PrrtChannelStateInformation* csi);
void PrrtChannelStateInformation_print(PrrtChannelStateInformation *csi); void PrrtChannelStateInformation_print(PrrtChannelStateInformation *csi);
......
...@@ -50,7 +50,7 @@ int PrrtPacket_print(PrrtPacket *packet_ptr) { ...@@ -50,7 +50,7 @@ int PrrtPacket_print(PrrtPacket *packet_ptr) {
PrrtPacketDataPayload *payload = packet_ptr->payload; PrrtPacketDataPayload *payload = packet_ptr->payload;
printf("| %61u |\n", payload->timestamp); printf("| %61u |\n", payload->timestamp);
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n"); printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
printf("| %61u |\n", payload->groupRTT_us); printf("| %61u |\n", payload->groupRTprop_us);
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n"); printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
printf("| %61u |\n", payload->packetTimeout_us); printf("| %61u |\n", payload->packetTimeout_us);
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n"); printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
...@@ -247,7 +247,7 @@ void *encode_data_header(void *buf_ptr, const void *payload) { ...@@ -247,7 +247,7 @@ void *encode_data_header(void *buf_ptr, const void *payload) {
buf_ptr += sizeof(prrtTimestamp_t); buf_ptr += sizeof(prrtTimestamp_t);
prrtTimedelta_t *group_round_trip_time = (prrtTimedelta_t *) buf_ptr; prrtTimedelta_t *group_round_trip_time = (prrtTimedelta_t *) buf_ptr;
*group_round_trip_time = htonl(data_payload->groupRTT_us); *group_round_trip_time = htonl(data_payload->groupRTprop_us);
buf_ptr += sizeof(prrtTimedelta_t); buf_ptr += sizeof(prrtTimedelta_t);
prrtTimedelta_t *packet_timeout = (prrtTimedelta_t *) buf_ptr; prrtTimedelta_t *packet_timeout = (prrtTimedelta_t *) buf_ptr;
...@@ -392,7 +392,7 @@ void *decode_data_header(void *dstBuffer, const void *srcBuffer) { ...@@ -392,7 +392,7 @@ void *decode_data_header(void *dstBuffer, const void *srcBuffer) {
dstBuffer += sizeof(prrtTimestamp_t); dstBuffer += sizeof(prrtTimestamp_t);
prrtTimedelta_t *group_round_trip_time = (prrtTimedelta_t *) dstBuffer; prrtTimedelta_t *group_round_trip_time = (prrtTimedelta_t *) dstBuffer;
data_payload->groupRTT_us = ntohl(*group_round_trip_time); data_payload->groupRTprop_us = ntohl(*group_round_trip_time);
dstBuffer += sizeof(prrtTimedelta_t); dstBuffer += sizeof(prrtTimedelta_t);
prrtTimedelta_t *packet_timeout = (prrtTimedelta_t *) dstBuffer; prrtTimedelta_t *packet_timeout = (prrtTimedelta_t *) dstBuffer;
...@@ -432,7 +432,7 @@ PrrtPacket *PrrtPacket_create_data_packet(uint8_t priority, const void *payloadP ...@@ -432,7 +432,7 @@ PrrtPacket *PrrtPacket_create_data_packet(uint8_t priority, const void *payloadP
dataPayload->dataLength = dataLength; dataPayload->dataLength = dataLength;
dataPayload->timestamp = PrrtClock_get_current_time_us(); dataPayload->timestamp = PrrtClock_get_current_time_us();
dataPayload->packetTimeout_us = dataPayload->timestamp + targetDelay; dataPayload->packetTimeout_us = dataPayload->timestamp + targetDelay;
dataPayload->groupRTT_us = 0; dataPayload->groupRTprop_us = 0;
dataPayload->decodingTimeout_us = 150; // TODO: payload->decodingTimeout_us dataPayload->decodingTimeout_us = 150; // TODO: payload->decodingTimeout_us
dataPayload->feedbackTimer_us = 170; // TODO: payload->feedback_timer dataPayload->feedbackTimer_us = 170; // TODO: payload->feedback_timer
PrrtPacket_copy_buffer_to_payload(packet, payloadPointer, PRRT_PACKET_DATA_HEADER_SIZE) PrrtPacket_copy_buffer_to_payload(packet, payloadPointer, PRRT_PACKET_DATA_HEADER_SIZE)
......
...@@ -42,7 +42,7 @@ typedef struct prrtPacket { ...@@ -42,7 +42,7 @@ typedef struct prrtPacket {
typedef struct prrtPacketDataPayload { typedef struct prrtPacketDataPayload {
prrtPacketLength_t dataLength; prrtPacketLength_t dataLength;
prrtTimestamp_t timestamp; prrtTimestamp_t timestamp;
prrtTimedelta_t groupRTT_us; prrtTimedelta_t groupRTprop_us;
prrtTimestamp_t packetTimeout_us; prrtTimestamp_t packetTimeout_us;
prrtTimestamp_t decodingTimeout_us; prrtTimestamp_t decodingTimeout_us;
prrtTimedelta_t feedbackTimer_us; prrtTimedelta_t feedbackTimer_us;
......
...@@ -98,7 +98,7 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet, struct ...@@ -98,7 +98,7 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet, struct
PrrtPacketDataPayload *payload = packet->payload; PrrtPacketDataPayload *payload = packet->payload;
prrtTimestamp_t dataTimestamp = payload->timestamp; prrtTimestamp_t dataTimestamp = payload->timestamp;
sock_ptr->lastSentTimestamp = dataTimestamp; sock_ptr->lastSentTimestamp = dataTimestamp;
PrrtClock_update(&sock_ptr->clock, dataTimestamp, payload->groupRTT_us); PrrtClock_update(&sock_ptr->clock, dataTimestamp, payload->groupRTprop_us);
PrrtPacketTimeout *packetTimeout = PrrtPacketTimeout_create(packet); PrrtPacketTimeout *packetTimeout = PrrtPacketTimeout_create(packet);
check(PrrtPacketTimeoutTable_insert(sock_ptr->packetTimeoutTable, packetTimeout), "Could not insert data packet."); check(PrrtPacketTimeoutTable_insert(sock_ptr->packetTimeoutTable, packetTimeout), "Could not insert data packet.");
......
...@@ -145,7 +145,7 @@ void *send_data_loop(void *ptr) { ...@@ -145,7 +145,7 @@ void *send_data_loop(void *ptr) {
packet->index = (uint8_t) (packet->sequenceNumber - block->baseSequenceNumber); packet->index = (uint8_t) (packet->sequenceNumber - block->baseSequenceNumber);
PrrtPacketDataPayload *payload = packet->payload; PrrtPacketDataPayload *payload = packet->payload;
payload->groupRTT_us = PrrtChannelStateInformation_get_rtt(sock_ptr->csi); payload->groupRTprop_us = PrrtSocket_get_rtprop(sock_ptr);
PrrtPacket *packetToSend = PrrtPacket_copy(packet); PrrtPacket *packetToSend = PrrtPacket_copy(packet);
send_packet(sock_ptr, packetToSend); send_packet(sock_ptr, packetToSend);
......
...@@ -39,8 +39,8 @@ static void handle_feedback(PrrtSocket *prrtSocket, const size_t length) ...@@ -39,8 +39,8 @@ static void handle_feedback(PrrtSocket *prrtSocket, const size_t length)
prrtTimestamp_t forwardTripTimestamp = ((PrrtPacketFeedbackPayload *) prrtPacket->payload)->forwardTripTimestamp_us; prrtTimestamp_t forwardTripTimestamp = ((PrrtPacketFeedbackPayload *) prrtPacket->payload)->forwardTripTimestamp_us;
PrrtChannelStateInformation_update_rtt(prrtSocket->csi, (prrtTimedelta_t) (receiveTime - forwardTripTimestamp)); PrrtChannelStateInformation_update_rtprop(prrtSocket->receiver->csi,
PrrtChannelStateInformation_print(prrtSocket->csi); (prrtTimedelta_t) (receiveTime - forwardTripTimestamp));
error: error:
if(prrtPacket != NULL) { PrrtPacket_destroy(prrtPacket); } if(prrtPacket != NULL) { PrrtPacket_destroy(prrtPacket); }
......
...@@ -22,6 +22,8 @@ PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port) ...@@ -22,6 +22,8 @@ PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port)
recv->ai = info; recv->ai = info;
recv->csi = PrrtChannelStateInformation_create();
return recv; return recv;
error: error:
...@@ -33,8 +35,11 @@ PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port) ...@@ -33,8 +35,11 @@ PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port)
return NULL; return NULL;
} }
bool PrrtReceiver_destroy(PrrtReceiver *receiver) bool PrrtReceiver_destroy(PrrtReceiver *receiver) {
{ if (receiver->csi != NULL) {
PrrtChannelStateInformation_destroy(receiver->csi);
}
freeaddrinfo(receiver->ai); freeaddrinfo(receiver->ai);
free((void *) receiver->host_name); free((void *) receiver->host_name);
free(receiver); free(receiver);
......
...@@ -6,11 +6,13 @@ ...@@ -6,11 +6,13 @@
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <netdb.h> #include <netdb.h>
#include "channelStateInformation.h"
typedef struct prrtReceiver { typedef struct prrtReceiver {
const char* host_name; const char* host_name;
uint16_t port; uint16_t port;
struct addrinfo *ai; struct addrinfo *ai;
PrrtChannelStateInformation *csi;
} PrrtReceiver; } PrrtReceiver;
PrrtReceiver* PrrtReceiver_create(const char *host, uint16_t port); PrrtReceiver* PrrtReceiver_create(const char *host, uint16_t port);
......
...@@ -72,7 +72,7 @@ PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay ...@@ -72,7 +72,7 @@ PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay
s->sequenceNumberSource = 1; s->sequenceNumberSource = 1;
s->sequenceNumberRedundancy = 1; s->sequenceNumberRedundancy = 1;
s->csi = PrrtChannelStateInformation_create();
check(target_delay_us < HALF_TIMESTAMP, "Specify target delay between 0 and %i", HALF_TIMESTAMP-1) check(target_delay_us < HALF_TIMESTAMP, "Specify target delay between 0 and %i", HALF_TIMESTAMP-1)
s->applicationConstraints = PrrtApplicationConstraints_create(target_delay_us); s->applicationConstraints = PrrtApplicationConstraints_create(target_delay_us);
...@@ -439,10 +439,6 @@ int PrrtSocket_close(PrrtSocket *s) { ...@@ -439,10 +439,6 @@ int PrrtSocket_close(PrrtSocket *s) {
free(s->address); free(s->address);
} }
if (s->csi != NULL) {
check(PrrtChannelStateInformation_destroy(s->csi), "Could not destroy channel state information.")
}
if (s->applicationConstraints != NULL) { if (s->applicationConstraints != NULL) {
check(PrrtApplicationConstraints_destroy(s->applicationConstraints), check(PrrtApplicationConstraints_destroy(s->applicationConstraints),
"Could not destroy application constraints.") "Could not destroy application constraints.")
...@@ -558,6 +554,6 @@ bool PrrtSocket_uses_thread_pinning(PrrtSocket *s) { ...@@ -558,6 +554,6 @@ bool PrrtSocket_uses_thread_pinning(PrrtSocket *s) {
return atomic_load_explicit(&s->isThreadPinning, memory_order_acquire); return atomic_load_explicit(&s->isThreadPinning, memory_order_acquire);
} }
uint32_t PrrtSocket_get_rtt(PrrtSocket *s) { uint32_t PrrtSocket_get_rtprop(PrrtSocket *s) {
return PrrtChannelStateInformation_get_rtt(s->csi); return PrrtChannelStateInformation_get_rtprop(s->receiver->csi);
} }
...@@ -62,8 +62,6 @@ typedef struct prrtSocket { ...@@ -62,8 +62,6 @@ typedef struct prrtSocket {
prrtTimestamp_t lastSentTimestamp; prrtTimestamp_t lastSentTimestamp;
prrtTimestamp_t lastReceivedTimestamp; prrtTimestamp_t lastReceivedTimestamp;
PrrtChannelStateInformation *csi;
PrrtApplicationConstraints *applicationConstraints; PrrtApplicationConstraints *applicationConstraints;
PrrtCodingParams *codingParameters; PrrtCodingParams *codingParameters;
...@@ -122,6 +120,6 @@ bool PrrtSocket_closing(PrrtSocket *s); ...@@ -122,6 +120,6 @@ bool PrrtSocket_closing(PrrtSocket *s);
bool PrrtSocket_uses_thread_pinning(PrrtSocket *s); bool PrrtSocket_uses_thread_pinning(PrrtSocket *s);
uint32_t PrrtSocket_get_rtt(PrrtSocket *s); uint32_t PrrtSocket_get_rtprop(PrrtSocket *s);
#endif // PRRT_SOCKET_H #endif // PRRT_SOCKET_H
...@@ -148,7 +148,7 @@ cdef class PrrtSocket: ...@@ -148,7 +148,7 @@ cdef class PrrtSocket:
def __get__(self): def __get__(self):
if not self.isSender: if not self.isSender:
raise Exception("Not a sender.") raise Exception("Not a sender.")
return cprrt.PrrtSocket_get_rtt(self._c_socket) * 0.000001 return cprrt.PrrtSocket_get_rtprop(self._c_socket) * 0.000001
property coding_configuration: property coding_configuration:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment