Commit 0c31b3cc authored by Andreas Schmidt's avatar Andreas Schmidt

Extend clock with traditional sync.

parent 09366a58
Pipeline #80 passed with stage
#include <sys/time.h>
#include <stddef.h>
#include <stdlib.h>
#include "../util/common.h"
#include "../util/dbg.h"
#include "clock.h"
uint32_t PrrtClock_get_current_time(void) {
#define diff_ts(timeA, timeB) abs(((int32_t) timeA) - ((int32_t)timeB))
uint32_t PrrtClock_get_current_time_us()
{
struct timeval tv;
gettimeofday(&tv, NULL);
return (uint32_t) (1000000 * tv.tv_sec + tv.tv_usec);
}
uint32_t PrrtClock_get_current_time_ms()
{
struct timeval tv;
gettimeofday(&tv, NULL);
uint32_t time_in_micros = (uint32_t) (1000000 * tv.tv_sec + tv.tv_usec);
return time_in_micros;
return (uint32_t) (1000 * tv.tv_sec + tv.tv_usec / 1000);
}
uint32_t PrrtClock_get_prrt_time_us(PrrtClock *clock) {
uint32_t currentTime = PrrtClock_get_current_time_us();
uint32_t virtualTime = clock->virtualTime;
if(virtualTime) {
uint32_t lastMeasurement = clock->lastMeasurement;
int32_t diff = diff_ts(currentTime, lastMeasurement);
int32_t skew = (diff * clock->skew) / 400;
return virtualTime + clock->meanDeviation + skew;
} else {
return currentTime;
}
}
uint32_t PrrtClock_update(PrrtClock *clock, uint32_t referenceTime, uint32_t rtt){
uint32_t currentTime = PrrtClock_get_current_time_us();
uint32_t virtualTime = clock->virtualTime;
int32_t clockSkew = clock->skew;
uint32_t delay = rtt >> 1; // half the rtt
int32_t phaseError = referenceTime - virtualTime + delay;
if(abs(phaseError) > 10000) {
clock->meanDeviation = 0;
clock->skew = 0;
clock->lastMeasurement = currentTime;
clock->virtualTime = currentTime;
}
int32_t deviationSum = phaseError + 3 * clock->meanDeviation; // TODO: why 3?
int32_t meanDeviation = deviationSum / 4; // TODO: find out why???
int32_t period = diff_ts(currentTime, clock->lastMeasurement);
if(period > 0) {
clockSkew = (meanDeviation * 1000 / period) + 15 * clock->skew;
clockSkew = clockSkew / 16;
}
virtualTime = virtualTime + meanDeviation + period*clockSkew/400;
clock->meanDeviation = meanDeviation;
clock->skew = clockSkew;
clock->lastMeasurement = currentTime;
clock->virtualTime = virtualTime;
debug("Current Time: %lu; Virtual Time %lu; Skew: %d", currentTime, virtualTime, clockSkew);
}
PrrtClock* PrrtClock_create() {
PrrtClock* clock = (PrrtClock*) calloc(1, sizeof(PrrtClock));
check_mem(clock);
clock->meanDeviation = 0;
clock->virtualTime = 0;
clock->lastMeasurement = 0;
clock->skew;
return clock;
error:
PERROR("Could not create clock.%s","");
return NULL;
}
bool PrrtClock_destroy(PrrtClock* clock) {
free(clock);
return true;
}
\ No newline at end of file
......@@ -2,7 +2,23 @@
#define PRRT_CLOCK_H
#include <stdint.h>
#include <stdbool.h>
uint32_t PrrtClock_get_current_time(void);
typedef struct prrtClock {
uint32_t lastMeasurement;
uint32_t meanDeviation;
int32_t skew;
uint32_t virtualTime;
} PrrtClock;
PrrtClock* PrrtClock_create();
bool PrrtClock_destroy(PrrtClock* clock);
uint32_t PrrtClock_get_current_time_us();
uint32_t PrrtClock_get_current_time_ms();
uint32_t PrrtClock_get_prrt_time_us(PrrtClock *clock);
uint32_t PrrtClock_update(PrrtClock *clock, uint32_t referenceTime, uint32_t rtt);
#endif //PRRT_CLOCK_H
......@@ -418,7 +418,7 @@ PrrtPacket *PrrtPacket_create_data_packet(uint8_t priority, const void *payloadP
check_mem(dataPayload);
packet->payload = dataPayload;
dataPayload->timestamp = PrrtClock_get_current_time();
dataPayload->timestamp = PrrtClock_get_current_time_us();
dataPayload->group_round_trip_time = 95; // TODO: payload->rtt = CURRENT ESTIMATE
dataPayload->packet_timeout = 110; // TODO: payload->packet_timeout = NOW + maximum delay
dataPayload->decoding_timeout = 150; // TODO: payload->decoding_timeout
......
......@@ -69,7 +69,8 @@ bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote)
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");
uint32_t forwardTripTime = htonl(PrrtClock_get_current_time() + sock_ptr->lastSentTimestamp - sock_ptr->lastReceivedTimestamp);
uint32_t forwardTripTime = htonl(
PrrtClock_get_prrt_time_us(sock_ptr->clock) + sock_ptr->lastSentTimestamp - sock_ptr->lastReceivedTimestamp);
((PrrtPacketFeedbackPayload*) (buf + PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH))->forward_trip_timestamp = forwardTripTime;
// TODO: [LATENCY] By knowing the time for htonl and writing bytes, one could compensate the timestamp.
check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
......@@ -97,15 +98,14 @@ void *receive_data_loop(void *ptr)
while(1) {
memset(buffer, 0, MAX_PAYLOAD_LENGTH);
n = recvfrom(sock_ptr->dataSocketFd, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen);
sock_ptr->lastReceivedTimestamp = PrrtClock_get_current_time();
sock_ptr->lastReceivedTimestamp = PrrtClock_get_prrt_time_us(sock_ptr->clock);
PrrtPacket *packet = (PrrtPacket *) calloc(1, sizeof(PrrtPacket));
check_mem(packet);
check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed.");
// TODO: make something useful with RTT approximation
PrrtPacketDataPayload* payload = packet->payload;
debug("RTT: %d", payload->group_round_trip_time);
PrrtClock_update(sock_ptr->clock, payload->timestamp, payload->group_round_trip_time);
sock_ptr->lastSentTimestamp = PrrtPacket_get_data_timestamp(packet);
......
......@@ -35,7 +35,8 @@ bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
check(hp != NULL, "Could not resolve host '%s'.", recv->host_name)
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
if(type == PACKET_TYPE_DATA) { ((PrrtPacketDataPayload*) buf + PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH)->timestamp = htonl(PrrtClock_get_current_time()); }
if(type == PACKET_TYPE_DATA) { ((PrrtPacketDataPayload*) buf + PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH)->timestamp = htonl(
PrrtClock_get_current_time_us()); }
// TODO: [LATENCY] By knowing the time for htonl and writing bytes, one could compensate the timestamp.
check(sendto(sock_ptr->dataSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) == length, "Sendto failed.");
usleep(1);
......
......@@ -15,13 +15,14 @@
#include "socket.h"
#include "block.h"
#include "receiver.h"
#include "clock.h"
PrrtSocket *PrrtSocket_create(const uint8_t is_sender)
{
PrrtSocket *sock_ptr = (PrrtSocket*) calloc(1, sizeof(PrrtSocket));
check_mem(sock_ptr);
sock_ptr->clock = PrrtClock_create();
sock_ptr->is_sender = is_sender;
sock_ptr->isBound = false;
......@@ -97,7 +98,6 @@ bool PrrtSocket_bind(PrrtSocket *sock_ptr, const char* ipAddress, const uint16_t
error:
PrrtSocket_close(sock_ptr);
return false;
}
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) {
......@@ -259,6 +259,10 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) {
free(sock_ptr->address);
}
if(sock_ptr->clock != NULL) {
check(PrrtClock_destroy(sock_ptr->clock), "Destroy clock failed");
}
close(sock_ptr->dataSocketFd);
close(sock_ptr->feedbackSocketFd);
debug("Socket closed.");
......@@ -286,7 +290,7 @@ PrrtPacket *PrrtSocket_recv_feedback(PrrtSocket *prrtSocket, const size_t length
if (n == 0) {
return NULL;
}
uint32_t receiveTime = PrrtClock_get_current_time();
uint32_t receiveTime = PrrtClock_get_current_time_us();
n = recvfrom(prrtSocket->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
check(n >= 0, "Receiving feedback failed.");
......
......@@ -7,6 +7,7 @@
#include "../util/list.h"
#include "../util/bptree.h"
#include "channelStateInformation.h"
#include "clock.h"
typedef struct prrtSocket {
......@@ -17,6 +18,8 @@ typedef struct prrtSocket {
struct sockaddr_in* address;
bool isBound;
PrrtClock* clock;
pthread_t sendThread;
pthread_mutex_t outQueueFilledMutex;
pthread_cond_t outQueueFilledCv;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment