Commit c2bffb43 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Fix problem with wrong RTT measurements.

parent 86c46a16
Pipeline #92 failed with stage
...@@ -23,7 +23,7 @@ PrrtChannelStateInformation * PrrtChannelStateInformation_create() ...@@ -23,7 +23,7 @@ PrrtChannelStateInformation * PrrtChannelStateInformation_create()
void PrrtChannelStateInformation_update_rtt(PrrtChannelStateInformation *csi, prrtTimedelta_t rtt) void PrrtChannelStateInformation_update_rtt(PrrtChannelStateInformation *csi, prrtTimedelta_t rtt)
{ {
pthread_mutex_lock(&csi->lock); pthread_mutex_lock(&csi->lock);
int32_t delta = rtt - csi->rttMean; int32_t delta = (int32_t) rtt - (int32_t) csi->rttMean;
// TODO: ensure that there are no arithemtic problems via rounding etc. // TODO: ensure that there are no arithemtic problems via rounding etc.
csi->rttMean = (prrtTimedelta_t) (csi->rttMean + RRT_ALPHA * delta); csi->rttMean = (prrtTimedelta_t) (csi->rttMean + RRT_ALPHA * delta);
csi->rttDev = (prrtTimedelta_t) (csi->rttDev + RRT_ALPHA * (labs(delta) - csi->rttDev)); csi->rttDev = (prrtTimedelta_t) (csi->rttDev + RRT_ALPHA * (labs(delta) - csi->rttDev));
...@@ -34,7 +34,6 @@ void PrrtChannelStateInformation_print(PrrtChannelStateInformation *csi) ...@@ -34,7 +34,6 @@ void PrrtChannelStateInformation_print(PrrtChannelStateInformation *csi)
{ {
pthread_mutex_lock(&csi->lock); pthread_mutex_lock(&csi->lock);
check(csi != NULL, "Input should not be NULL."); check(csi != NULL, "Input should not be NULL.");
printf("RTT [us]: %d +- %d\n", csi->rttMean, csi->rttDev);
pthread_mutex_unlock(&csi->lock); pthread_mutex_unlock(&csi->lock);
return; return;
......
...@@ -6,9 +6,7 @@ ...@@ -6,9 +6,7 @@
#include "clock.h" #include "clock.h"
#include "packet.h" #include "packet.h"
#define diff_ts(timeA, timeB) abs(((int32_t) timeA) - ((int32_t)timeB)) prrtTimestamp_t PrrtClock_get_current_time_us()
prrtTimestamp_t PrrtClock_get_current_time_us()
{ {
struct timeval tv; struct timeval tv;
gettimeofday(&tv, NULL); gettimeofday(&tv, NULL);
...@@ -22,7 +20,8 @@ uint32_t PrrtClock_get_current_time_ms() ...@@ -22,7 +20,8 @@ uint32_t PrrtClock_get_current_time_ms()
return (uint32_t) (1000 * tv.tv_sec + tv.tv_usec / 1000); return (uint32_t) (1000 * tv.tv_sec + tv.tv_usec / 1000);
} }
uint32_t PrrtClock_get_prrt_time_us(PrrtClock *clock) { uint32_t PrrtClock_get_prrt_time_us(PrrtClock *clock)
{
uint32_t currentTime = PrrtClock_get_current_time_us(); uint32_t currentTime = PrrtClock_get_current_time_us();
uint32_t virtualTime = clock->virtualTime; uint32_t virtualTime = clock->virtualTime;
...@@ -37,7 +36,8 @@ uint32_t PrrtClock_get_prrt_time_us(PrrtClock *clock) { ...@@ -37,7 +36,8 @@ uint32_t PrrtClock_get_prrt_time_us(PrrtClock *clock) {
} }
uint32_t PrrtClock_update(PrrtClock *clock, uint32_t referenceTime, uint32_t rtt){ uint32_t PrrtClock_update(PrrtClock *clock, uint32_t referenceTime, uint32_t rtt)
{
uint32_t currentTime = PrrtClock_get_current_time_us(); uint32_t currentTime = PrrtClock_get_current_time_us();
uint32_t virtualTime = clock->virtualTime; uint32_t virtualTime = clock->virtualTime;
int32_t clockSkew = clock->skew; int32_t clockSkew = clock->skew;
...@@ -60,7 +60,7 @@ uint32_t PrrtClock_update(PrrtClock *clock, uint32_t referenceTime, uint32_t rtt ...@@ -60,7 +60,7 @@ uint32_t PrrtClock_update(PrrtClock *clock, uint32_t referenceTime, uint32_t rtt
clockSkew = clockSkew / 16; clockSkew = clockSkew / 16;
} }
virtualTime = virtualTime + meanDeviation + period*clockSkew/400; virtualTime = virtualTime + meanDeviation + period * clockSkew / 400;
clock->meanDeviation = meanDeviation; clock->meanDeviation = meanDeviation;
clock->skew = clockSkew; clock->skew = clockSkew;
...@@ -70,8 +70,9 @@ uint32_t PrrtClock_update(PrrtClock *clock, uint32_t referenceTime, uint32_t rtt ...@@ -70,8 +70,9 @@ uint32_t PrrtClock_update(PrrtClock *clock, uint32_t referenceTime, uint32_t rtt
debug("Current Time: %d; Virtual Time %d; Skew: %d", currentTime, virtualTime, clockSkew); debug("Current Time: %d; Virtual Time %d; Skew: %d", currentTime, virtualTime, clockSkew);
} }
PrrtClock* PrrtClock_create() { PrrtClock *PrrtClock_create()
PrrtClock* clock = (PrrtClock*) calloc(1, sizeof(PrrtClock)); {
PrrtClock *clock = (PrrtClock *) calloc(1, sizeof(PrrtClock));
check_mem(clock); check_mem(clock);
clock->meanDeviation = 0; clock->meanDeviation = 0;
...@@ -82,11 +83,12 @@ PrrtClock* PrrtClock_create() { ...@@ -82,11 +83,12 @@ PrrtClock* PrrtClock_create() {
return clock; return clock;
error: error:
PERROR("Could not create clock.%s",""); PERROR("Could not create clock.%s", "");
return NULL; return NULL;
} }
bool PrrtClock_destroy(PrrtClock* clock) { bool PrrtClock_destroy(PrrtClock *clock)
{
free(clock); free(clock);
return true; return true;
} }
\ No newline at end of file
...@@ -22,4 +22,6 @@ prrtTimestamp_t PrrtClock_get_prrt_time_us(PrrtClock *clock); ...@@ -22,4 +22,6 @@ prrtTimestamp_t PrrtClock_get_prrt_time_us(PrrtClock *clock);
prrtTimestamp_t PrrtClock_update(PrrtClock *clock, prrtTimestamp_t referenceTime, prrtTimedelta_t rtt); prrtTimestamp_t PrrtClock_update(PrrtClock *clock, prrtTimestamp_t referenceTime, prrtTimedelta_t rtt);
#define diff_ts(timeA, timeB) abs(((int32_t) timeA) - ((int32_t)timeB))
#endif //PRRT_CLOCK_H #endif //PRRT_CLOCK_H
...@@ -474,10 +474,10 @@ PrrtPacket *PrrtPacket_create_redundancy_packet(uint8_t priority, void *payloadP ...@@ -474,10 +474,10 @@ PrrtPacket *PrrtPacket_create_redundancy_packet(uint8_t priority, void *payloadP
} }
PrrtPacket *PrrtPacket_create_feedback_packet(uint8_t priority, uint8_t index, prrtSequenceNumber_t sequenceNumber, PrrtPacket *PrrtPacket_create_feedback_packet(uint8_t priority, uint8_t index, prrtSequenceNumber_t sequenceNumber,
prrtTimedelta_t roundTripTime, prrtSequenceNumber_t gapLength, prrtTimedelta_t groupRTT, prrtSequenceNumber_t gapLength,
prrtSequenceNumber_t gapCount, prrtSequenceNumber_t burstLength, prrtSequenceNumber_t gapCount, prrtSequenceNumber_t burstLength,
prrtSequenceNumber_t burstCount, uint32_t bandwidth, prrtSequenceNumber_t burstCount, uint32_t bandwidth,
uint32_t receiverAddr) uint32_t receiverAddr, prrtTimestamp_t forwardTripTime)
{ {
PrrtPacket *packet = create_header(priority, sequenceNumber, PRRT_PACKET_FEEDBACK_HEADER_SIZE, PACKET_TYPE_FEEDBACK, PrrtPacket *packet = create_header(priority, sequenceNumber, PRRT_PACKET_FEEDBACK_HEADER_SIZE, PACKET_TYPE_FEEDBACK,
index); index);
...@@ -487,8 +487,8 @@ PrrtPacket *PrrtPacket_create_feedback_packet(uint8_t priority, uint8_t index, p ...@@ -487,8 +487,8 @@ PrrtPacket *PrrtPacket_create_feedback_packet(uint8_t priority, uint8_t index, p
packet->payload = payload; packet->payload = payload;
payload->receiverAddress = receiverAddr; payload->receiverAddress = receiverAddr;
payload->groupRTT_us = roundTripTime; payload->groupRTT_us = groupRTT;
payload->forwardTripTimestamp_us = 0; payload->forwardTripTimestamp_us = forwardTripTime;
payload->erasureCount = 0; payload->erasureCount = 0;
payload->packetCount = 0; payload->packetCount = 0;
payload->gapLength = gapLength; payload->gapLength = gapLength;
......
...@@ -82,10 +82,10 @@ PrrtPacket *PrrtPacket_create_data_packet(uint8_t priority, const void *payloadP ...@@ -82,10 +82,10 @@ PrrtPacket *PrrtPacket_create_data_packet(uint8_t priority, const void *payloadP
prrtPacketLength_t payloadLength, prrtSequenceNumber_t sequenceNumber); prrtPacketLength_t payloadLength, prrtSequenceNumber_t sequenceNumber);
PrrtPacket *PrrtPacket_create_feedback_packet(uint8_t priority, uint8_t index, prrtSequenceNumber_t sequenceNumber, PrrtPacket *PrrtPacket_create_feedback_packet(uint8_t priority, uint8_t index, prrtSequenceNumber_t sequenceNumber,
prrtTimedelta_t roundTripTime, prrtSequenceNumber_t gapLength, prrtTimedelta_t groupRTT, prrtSequenceNumber_t gapLength,
prrtSequenceNumber_t gapCount, prrtSequenceNumber_t burstLength, prrtSequenceNumber_t gapCount, prrtSequenceNumber_t burstLength,
prrtSequenceNumber_t burstCount, uint32_t bandwidth, prrtSequenceNumber_t burstCount, uint32_t bandwidth,
uint32_t receiverAddr); uint32_t receiverAddr, prrtTimestamp_t forwardTripTime);
PrrtPacket *PrrtPacket_create_redundancy_packet(uint8_t priority, void *payloadPointer, PrrtPacket *PrrtPacket_create_redundancy_packet(uint8_t priority, void *payloadPointer,
prrtPacketLength_t payloadLength, prrtPacketLength_t payloadLength,
......
...@@ -64,17 +64,18 @@ bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote) ...@@ -64,17 +64,18 @@ bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote)
hp = gethostbyname(remote_host); hp = gethostbyname(remote_host);
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length); memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 3, 4, 6, 8, 9, 5, prrtTimestamp_t forwardTripTime = (uint32_t) (((int64_t) PrrtClock_get_current_time_us()) +
sock_ptr->address->sin_addr.s_addr); (sock_ptr->lastSentTimestamp - sock_ptr->lastReceivedTimestamp));
PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 0, 4, 6, 8, 9, 5,
sock_ptr->address->sin_addr.s_addr,
forwardTripTime);
prrtPacketLength_t length = PrrtPacket_size(feedback_pkt_ptr); prrtPacketLength_t length = PrrtPacket_size(feedback_pkt_ptr);
void *buf = calloc(1, length); void *buf = calloc(1, length);
check_mem(buf); check_mem(buf);
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small"); check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");
prrtTimestamp_t forwardTripTime = htonl(PrrtClock_get_current_time_us() + sock_ptr->lastSentTimestamp - sock_ptr->lastReceivedTimestamp);
((PrrtPacketFeedbackPayload*) (buf + PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH))->forwardTripTimestamp_us = forwardTripTime;
// TODO: [LATENCY] By knowing the time for htonl and writing bytes, one could compensate the timestamp.
check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) == check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
length, "Sending feedback failed."); length, "Sending feedback failed.");
free(buf); free(buf);
...@@ -106,21 +107,20 @@ void *receive_data_loop(void *ptr) ...@@ -106,21 +107,20 @@ void *receive_data_loop(void *ptr)
check_mem(packet); check_mem(packet);
check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed."); check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed.");
PrrtPacket_print(packet); prrtPacketType_t packetType = PrrtPacket_type(packet);
if(packetType == PACKET_TYPE_DATA) {
// TODO: make something useful with RTT approximation // TODO: make something useful with RTT approximation
PrrtPacketDataPayload* payload = packet->payload; PrrtPacketDataPayload *payload = packet->payload;
debug("RTT: %d, Packet Timeout: %d", payload->groupRTT_us, payload->packetTimeout_us); debug("RTT: %u, Packet Timeout: %u", payload->groupRTT_us, payload->packetTimeout_us);
sock_ptr->lastSentTimestamp = PrrtPacket_get_data_timestamp(packet); sock_ptr->lastSentTimestamp = PrrtPacket_get_data_timestamp(packet);
check(send_feedback(sock_ptr, remote), "Sending feedback failed."); check(send_feedback(sock_ptr, remote), "Sending feedback failed.");
prrtPacketType_t packetType = PrrtPacket_type(packet);
if(packetType == PACKET_TYPE_DATA) {
// TODO: packet.timestamp + packet.timeout < now: break // TODO: packet.timestamp + packet.timeout < now: break
if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, packet->sequenceNumber) == if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable,
packet->sequenceNumber) ==
false) { false) {
PrrtPacket_destroy(packet); PrrtPacket_destroy(packet);
} else { } else {
......
...@@ -19,6 +19,15 @@ bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) ...@@ -19,6 +19,15 @@ bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet)
prrtPacketLength_t length = PrrtPacket_size(packet); prrtPacketLength_t length = PrrtPacket_size(packet);
prrtPacketType_t type = PrrtPacket_type(packet); prrtPacketType_t type = PrrtPacket_type(packet);
if(type == PACKET_TYPE_DATA) {
PrrtPacketDataPayload *ptr = (PrrtPacketDataPayload *) packet->payload;
prrtTimedelta_t targetDelay = PrrtApplicationConstraints_get_target_delay(sock_ptr->applicationConstraints);
prrtTimestamp_t currentTime = PrrtClock_get_current_time_us();
ptr->timestamp = currentTime;
ptr->packetTimeout_us = currentTime + targetDelay;
}
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small."); check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small.");
// SENDING TO ALL RECEIVERS // SENDING TO ALL RECEIVERS
...@@ -36,14 +45,6 @@ bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) ...@@ -36,14 +45,6 @@ bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet)
check(hp != NULL, "Could not resolve host '%s'.", recv->host_name) check(hp != NULL, "Could not resolve host '%s'.", recv->host_name)
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length); memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
if(type == PACKET_TYPE_DATA) {
PrrtPacketDataPayload *ptr = (PrrtPacketDataPayload *) buf + PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH;
prrtTimedelta_t targetDelay = PrrtApplicationConstraints_get_target_delay(sock_ptr->applicationConstraints);
prrtTimestamp_t currentTime = PrrtClock_get_current_time();
ptr->timestamp = htonl(currentTime);
//ptr->packetTimeout_us = htonl(currentTime + targetDelay);
//debug("TIMEOUT: %d %d", currentTime + targetDelay, ptr->packetTimeout_us);
}
// TODO: [LATENCY] By knowing the time for htonl and writing bytes, one could compensate the timestamp. // TODO: [LATENCY] By knowing the time for htonl and writing bytes, one could compensate the timestamp.
check(sendto(sock_ptr->dataSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) == check(sendto(sock_ptr->dataSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
length, "Sendto failed."); length, "Sendto failed.");
......
...@@ -316,11 +316,8 @@ PrrtPacket *PrrtSocket_recv_feedback(PrrtSocket *prrtSocket, const size_t length ...@@ -316,11 +316,8 @@ PrrtPacket *PrrtSocket_recv_feedback(PrrtSocket *prrtSocket, const size_t length
printf("%s\n", inet_ntoa(a)); printf("%s\n", inet_ntoa(a));
prrtTimestamp_t forwardTripTimestamp = ((PrrtPacketFeedbackPayload*) prrtPacket->payload)->forwardTripTimestamp_us; prrtTimestamp_t forwardTripTimestamp = ((PrrtPacketFeedbackPayload*) prrtPacket->payload)->forwardTripTimestamp_us;
debug("%d %d", receiveTime, forwardTripTimestamp);
double diff = difftime(receiveTime, forwardTripTimestamp); PrrtChannelStateInformation_update_rtt(prrtSocket->csi, (prrtTimedelta_t) (receiveTime - forwardTripTimestamp));
printf("%f\n",diff);
PrrtChannelStateInformation_update_rtt(prrtSocket->csi, (prrtTimedelta_t) diff);
PrrtChannelStateInformation_print(prrtSocket->csi); PrrtChannelStateInformation_print(prrtSocket->csi);
return prrtPacket; return prrtPacket;
...@@ -332,7 +329,7 @@ PrrtPacket *PrrtSocket_recv_feedback(PrrtSocket *prrtSocket, const size_t length ...@@ -332,7 +329,7 @@ PrrtPacket *PrrtSocket_recv_feedback(PrrtSocket *prrtSocket, const size_t length
bool PrrtSocket_set_sock_opt(PrrtSocket *sock_ptr, const char *name, const uint32_t value) bool PrrtSocket_set_sock_opt(PrrtSocket *sock_ptr, const char *name, const uint32_t value)
{ {
if(strcmp(name, "targetdelay") == 0) { if(strcmp(name, "targetdelay") == 0) {
sock_ptr->applicationConstraints->targetDelay = value; PrrtApplicationConstraints_set_target_delay(sock_ptr->applicationConstraints, value);
} else { } else {
return false; return false;
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment