Commit ec5e890e authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Merge branch 'feature/clockSync'

Conflicts:
	src/prrt/clock.c
	src/prrt/clock.h
	src/prrt/packet.c
	src/prrt/processes/data_receiver.c
	src/prrt/processes/data_transmitter.c
	src/prrt/socket.c
	src/prrt/socket.h
parents 4ae53964 d6126aaf
#include <sys/time.h>
#include <stddef.h>
#include <stdlib.h>
#include "../util/common.h"
#include "../util/dbg.h"
#include "clock.h"
#include "packet.h"
prrtTimestamp_t PrrtClock_get_current_time(void) {
#define diff_ts(timeA, timeB) abs(((int32_t) timeA) - ((int32_t)timeB))
prrtTimestamp_t PrrtClock_get_current_time_us()
{
struct timeval tv;
gettimeofday(&tv, NULL);
return (prrtTimestamp_t) (1000000 * tv.tv_sec + tv.tv_usec);
}
uint32_t PrrtClock_get_current_time_ms()
{
struct timeval tv;
gettimeofday(&tv, NULL);
return (uint32_t) (1000 * tv.tv_sec + tv.tv_usec / 1000);
}
uint32_t PrrtClock_get_prrt_time_us(PrrtClock *clock) {
uint32_t currentTime = PrrtClock_get_current_time_us();
uint32_t virtualTime = clock->virtualTime;
if(virtualTime) {
uint32_t lastMeasurement = clock->lastMeasurement;
int32_t diff = diff_ts(currentTime, lastMeasurement);
int32_t skew = (diff * clock->skew) / 400;
return virtualTime + clock->meanDeviation + skew;
} else {
return currentTime;
}
}
uint32_t PrrtClock_update(PrrtClock *clock, uint32_t referenceTime, uint32_t rtt){
uint32_t currentTime = PrrtClock_get_current_time_us();
uint32_t virtualTime = clock->virtualTime;
int32_t clockSkew = clock->skew;
uint32_t delay = rtt >> 1; // half the rtt
int32_t phaseError = referenceTime - virtualTime + delay;
if(abs(phaseError) > 10000) {
clock->meanDeviation = 0;
clock->skew = 0;
clock->lastMeasurement = currentTime;
clock->virtualTime = currentTime;
}
int32_t deviationSum = phaseError + 3 * clock->meanDeviation; // TODO: why 3?
int32_t meanDeviation = deviationSum / 4; // TODO: find out why???
int32_t period = diff_ts(currentTime, clock->lastMeasurement);
if(period > 0) {
clockSkew = (meanDeviation * 1000 / period) + 15 * clock->skew;
clockSkew = clockSkew / 16;
}
virtualTime = virtualTime + meanDeviation + period*clockSkew/400;
clock->meanDeviation = meanDeviation;
clock->skew = clockSkew;
clock->lastMeasurement = currentTime;
clock->virtualTime = virtualTime;
debug("Current Time: %d; Virtual Time %d; Skew: %d", currentTime, virtualTime, clockSkew);
}
PrrtClock* PrrtClock_create() {
PrrtClock* clock = (PrrtClock*) calloc(1, sizeof(PrrtClock));
check_mem(clock);
clock->meanDeviation = 0;
clock->virtualTime = 0;
clock->lastMeasurement = 0;
clock->skew;
return clock;
error:
PERROR("Could not create clock.%s","");
return NULL;
}
bool PrrtClock_destroy(PrrtClock* clock) {
free(clock);
return true;
}
\ No newline at end of file
......@@ -2,7 +2,24 @@
#define PRRT_CLOCK_H
#include "packet.h"
#include <stdint.h>
#include <stdbool.h>
prrtTimestamp_t PrrtClock_get_current_time(void);
typedef struct prrtClock {
uint32_t lastMeasurement;
uint32_t meanDeviation;
int32_t skew;
uint32_t virtualTime;
} PrrtClock;
PrrtClock* PrrtClock_create();
bool PrrtClock_destroy(PrrtClock* clock);
prrtTimestamp_t PrrtClock_get_current_time_us();
prrtTimestamp_t PrrtClock_get_current_time_ms();
prrtTimestamp_t PrrtClock_get_prrt_time_us(PrrtClock *clock);
prrtTimestamp_t PrrtClock_update(PrrtClock *clock, prrtTimestamp_t referenceTime, prrtTimedelta_t rtt);
#endif //PRRT_CLOCK_H
......@@ -434,7 +434,7 @@ PrrtPacket *PrrtPacket_create_data_packet(uint8_t priority, const void *payloadP
check_mem(dataPayload);
packet->payload = dataPayload;
dataPayload->timestamp = PrrtClock_get_current_time();
dataPayload->timestamp = PrrtClock_get_current_time_us();
dataPayload->groupRTT_us = 0;
dataPayload->packetTimeout_us = 110; // TODO: payload->packetTimeout_us = NOW + maximum delay
dataPayload->decodingTimeout_us = 150; // TODO: payload->decodingTimeout_us
......
......@@ -72,7 +72,7 @@ bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote)
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");
prrtTimestamp_t forwardTripTime = htonl(PrrtClock_get_current_time() + sock_ptr->lastSentTimestamp - sock_ptr->lastReceivedTimestamp);
prrtTimestamp_t forwardTripTime = htonl(PrrtClock_get_current_time_us() + sock_ptr->lastSentTimestamp - sock_ptr->lastReceivedTimestamp);
((PrrtPacketFeedbackPayload*) (buf + PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH))->forwardTripTimestamp_us = forwardTripTime;
// TODO: [LATENCY] By knowing the time for htonl and writing bytes, one could compensate the timestamp.
check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
......@@ -100,7 +100,7 @@ void *receive_data_loop(void *ptr)
while(1) {
memset(buffer, 0, MAX_PAYLOAD_LENGTH);
n = recvfrom(sock_ptr->dataSocketFd, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen);
sock_ptr->lastReceivedTimestamp = PrrtClock_get_current_time();
sock_ptr->lastReceivedTimestamp = PrrtClock_get_current_time_us();
PrrtPacket *packet = (PrrtPacket *) calloc(1, sizeof(PrrtPacket));
check_mem(packet);
......
......@@ -36,7 +36,7 @@ bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet)
check(hp != NULL, "Could not resolve host '%s'.", recv->host_name)
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
if(type == PACKET_TYPE_DATA) { ((PrrtPacketDataPayload*) buf + PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH)->timestamp = htonl(PrrtClock_get_current_time()); }
if(type == PACKET_TYPE_DATA) { ((PrrtPacketDataPayload*) buf + PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH)->timestamp = htonl(PrrtClock_get_current_time_us()); }
// TODO: [LATENCY] By knowing the time for htonl and writing bytes, one could compensate the timestamp.
check(sendto(sock_ptr->dataSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
length, "Sendto failed.");
......
#include <netdb.h>
#include <stdio.h>
#include <arpa/inet.h>
#include "../../defines.h"
#include "../../util/dbg.h"
#include "../../util/common.h"
#include "../block.h"
#include "../clock.h"
#include "../socket.h"
#include "data_receiver.h"
void retrieve_data_blocks(PrrtSocket *sock_ptr, uint16_t base_seqno, uint8_t k, const PrrtBlock *block)
{
List *res = List_create();
BPTree_get_range(sock_ptr->dataStore, res, base_seqno, base_seqno + k - 1);
LIST_FOREACH(res, first, next, cur) {
PrrtPacket *packetPtr = cur->value;
check(PrrtBlock_insert_data_packet((PrrtBlock *) block, packetPtr), "Insert failed!")
sock_ptr->dataStore = BPTree_delete(sock_ptr->dataStore, packetPtr->seqno);
}
error:
List_destroy(res);
}
void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block, uint16_t base_seqno)
{
if(block != NULL && PrrtBlock_decode_ready(block)) {
check(PrrtBlock_decode(block), "Decoding failed");
while(List_count(block->dataPackets) > 0) {
PrrtPacket *pkt = List_shift(block->dataPackets);
if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, pkt->seqno)) {
check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
List_push(sock_ptr->inQueue, pkt);
check(pthread_cond_signal(&sock_ptr->inQueueFilledCv) == 0, "Signal failed.");
check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
} else {
PrrtPacket_destroy(pkt);
}
}
PrrtBlock_destroy(block);
sock_ptr->blockStore = BPTree_delete(sock_ptr->blockStore, base_seqno);
}
error:
PERROR("Decoding failed.%s", "")
}
bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote)
{
uint16_t remote_port = ntohs(remote.sin_port);
char *remote_host = inet_ntoa(remote.sin_addr);
struct sockaddr_in targetaddr;
memset((char *) &targetaddr, 0, sizeof(targetaddr));
targetaddr.sin_family = AF_INET;
targetaddr.sin_port = htons((uint16_t) (remote_port + 1));
struct hostent *hp;
hp = gethostbyname(remote_host);
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 3, 50, 4, 6, 8, 9, 5, 1, sock_ptr->address->sin_addr.s_addr);
uint32_t length = PrrtPacket_size(feedback_pkt_ptr);
void *buf = calloc(1, length);
check_mem(buf);
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");
uint32_t forwardTripTime = htonl(
PrrtClock_get_prrt_time_us(sock_ptr->clock) + sock_ptr->lastSentTimestamp - sock_ptr->lastReceivedTimestamp);
((PrrtPacketFeedbackPayload*) (buf + PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH))->forward_trip_timestamp = forwardTripTime;
// TODO: [LATENCY] By knowing the time for htonl and writing bytes, one could compensate the timestamp.
check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
length, "Sending feedback failed.");
free(buf);
PrrtPacket_destroy(feedback_pkt_ptr);
return true;
error:
if(buf != NULL) { free(buf); }
if(feedback_pkt_ptr != NULL) { PrrtPacket_destroy(feedback_pkt_ptr); }
return false;
}
void *receive_data_loop(void *ptr)
{
ssize_t n;
struct sockaddr_in remote;
socklen_t addrlen = sizeof(remote);
unsigned char buffer[MAX_PAYLOAD_LENGTH];
PrrtSocket *sock_ptr = ptr;
while(1) {
memset(buffer, 0, MAX_PAYLOAD_LENGTH);
n = recvfrom(sock_ptr->dataSocketFd, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen);
sock_ptr->lastReceivedTimestamp = PrrtClock_get_prrt_time_us(sock_ptr->clock);
PrrtPacket *packet = (PrrtPacket *) calloc(1, sizeof(PrrtPacket));
check_mem(packet);
check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed.");
PrrtPacketDataPayload* payload = packet->payload;
PrrtClock_update(sock_ptr->clock, payload->timestamp, payload->group_round_trip_time);
sock_ptr->lastSentTimestamp = PrrtPacket_get_data_timestamp(packet);
check(send_feedback(sock_ptr, remote), "Sending feedback failed.");
uint8_t packetType = PrrtPacket_type(packet);
if(packetType == PACKET_TYPE_DATA) {
// TODO: packet.timestamp + packet.timeout < now: break
if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, packet->seqno) ==
false) {
PrrtPacket_destroy(packet);
} else {
uint16_t baseSequenceNumber = packet->seqno - packet->index;
PrrtPacket *reference = PrrtPacket_copy(packet);
PrrtBlock *block = BPTree_get(sock_ptr->blockStore, baseSequenceNumber);
if(block != NULL) {
check(PrrtBlock_insert_data_packet(block, reference), "Inserting failed");
decode_block(sock_ptr, block, baseSequenceNumber);
} else {
// Check for duplicate data packet.
if(BPTree_get(sock_ptr->dataStore, packet->seqno) == NULL) {
sock_ptr->dataStore = BPTree_insert(sock_ptr->dataStore, packet->seqno, reference);
} else {
PrrtPacket_destroy(reference);
}
}
// forward to application layer
check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
List_push(sock_ptr->inQueue, packet);
check(pthread_cond_signal(&sock_ptr->inQueueFilledCv) == 0, "Cond signal failed.");
check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
}
} else if(packetType == PACKET_TYPE_REDUNDANCY) {
PrrtPacketRedundancyPayload *redundancyPayload = packet->payload;
if(!PrrtForwardPacketTable_test_is_block_relevant(sock_ptr->forwardPacketTable,
redundancyPayload->base_seqno,
redundancyPayload->n)) {
PrrtPacket_destroy(packet);
} else {
PrrtBlock *block = BPTree_get(sock_ptr->blockStore, redundancyPayload->base_seqno);
if(block == NULL) {
// TODO: PROPER CREATION
PrrtCodingParams *cpar = PrrtCodingParams_create();
cpar->k = redundancyPayload->k;
cpar->n = redundancyPayload->n;
block = PrrtBlock_create(cpar, redundancyPayload->base_seqno);
PrrtCodingParams_destroy(cpar);
sock_ptr->blockStore = BPTree_insert(sock_ptr->blockStore, redundancyPayload->base_seqno,
block);
}
retrieve_data_blocks(sock_ptr, redundancyPayload->base_seqno, block->codingParams.k, block);
if(PrrtBlock_insert_redundancy_packet(block, packet)) {
decode_block(sock_ptr, block, redundancyPayload->base_seqno);
} else {
PrrtPacket_destroy(packet);
}
}
} else {
PrrtPacket_print(packet);
PrrtPacket_destroy(packet);
}
}
// TODO: occasionally clean up dataStore and blockStore !!!
error:
PNOTIMPLEMENTED("SHOULD IMPLEMENT ERROR HANDLER HERE");
}
#include <stdio.h>
#include <netdb.h>
#include <unistd.h>
#include <string.h>
#include "../../defines.h"
#include "../block.h"
#include "../clock.h"
#include "../receiver.h"
#include "../socket.h"
#include "../../util/dbg.h"
#include "../../util/common.h"
#include "data_transmitter.h"
bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
uint8_t buf[MAX_PAYLOAD_LENGTH];
memset(buf, 0, sizeof(buf));
uint32_t length = PrrtPacket_size(packet);
uint8_t type = PrrtPacket_type(packet);
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small.");
// SENDING TO ALL RECEIVERS
LIST_FOREACH(sock_ptr->receivers, first, next, cur) {
PrrtReceiver* recv = cur->value;
struct hostent *hp;
struct sockaddr_in targetaddr;
memset((char *) &targetaddr, 0, sizeof(targetaddr));
targetaddr.sin_family = AF_INET;
targetaddr.sin_port = htons(recv->port);
hp = gethostbyname(recv->host_name);
check(hp != NULL, "Could not resolve host '%s'.", recv->host_name)
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
if(type == PACKET_TYPE_DATA) { ((PrrtPacketDataPayload*) buf + PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH)->timestamp = htonl(
PrrtClock_get_current_time_us()); }
// TODO: [LATENCY] By knowing the time for htonl and writing bytes, one could compensate the timestamp.
check(sendto(sock_ptr->dataSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) == length, "Sendto failed.");
usleep(1);
}
PrrtPacket_destroy(packet);
return true;
error:
PERROR("Sending packet failed.%s", "")
return false;
}
void * send_data_loop(void *ptr) {
PrrtSocket *sock_ptr = ptr;
PrrtBlock *block = NULL;
PrrtCodingParams *cpar = PrrtCodingParams_create();
while (1) {
check(pthread_mutex_lock(&sock_ptr->outQueueFilledMutex) == 0, "Lock failed.");
while (List_count(sock_ptr->outQueue) == 0) {
check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
if(sock_ptr->closing) {
PrrtCodingParams_destroy(cpar);
if(block != NULL) {
PrrtBlock_destroy(block);
}
check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed.");
return NULL;
}
check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
check(pthread_cond_wait(&sock_ptr->outQueueFilledCv, &sock_ptr->outQueueFilledMutex) == 0, "Cond wait failed.");
}
if(block == NULL) {
block = PrrtBlock_create(cpar, sock_ptr->sequenceNumberSource);
}
PrrtPacket *packet = List_shift(sock_ptr->outQueue);
packet->seqno = sock_ptr->sequenceNumberSource++;
packet->index = (uint8_t) (packet->seqno - block->baseSequenceNumber);
PrrtPacketDataPayload* payload = packet->payload;
// TODO: should lock here !
payload->group_round_trip_time = (uint16_t) (sock_ptr->csi.rttMean / 1000);
PrrtPacket* packetToSend = PrrtPacket_copy(packet);
send_packet(sock_ptr, packetToSend);
PrrtBlock_insert_data_packet(block, packet);
// TODO: redundancy should only be sent when necessary
if (PrrtBlock_encode_ready(block) && false) {
uint32_t j = 0;
PrrtBlock_encode(block, &sock_ptr->sequenceNumberRedundancy);
uint32_t redundancyBlocks = List_count(block->redundancyPackets);
for (j = 0; j < redundancyBlocks; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
send_packet(sock_ptr, red_pkt);
}
PrrtBlock_destroy(block);
block = NULL;
}
check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed.");
}
error:
PERROR("Failed to send packet %s", "");
return NULL;
}
......@@ -15,14 +15,17 @@
#include "socket.h"
#include "block.h"
#include "receiver.h"
#include "clock.h"
PrrtSocket *PrrtSocket_create(const bool is_sender)
{
PrrtSocket *sock_ptr = (PrrtSocket*) calloc(1, sizeof(PrrtSocket));
check_mem(sock_ptr);
sock_ptr->isSender = is_sender;
sock_ptr->clock = PrrtClock_create();
sock_ptr->isBound = false;
sock_ptr->sequenceNumberSource = 1;
......@@ -98,7 +101,6 @@ bool PrrtSocket_bind(PrrtSocket *sock_ptr, const char* ipAddress, const uint16_t
error:
PrrtSocket_close(sock_ptr);
return false;
}
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) {
......@@ -260,8 +262,17 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) {
free(sock_ptr->address);
}
check(PrrtChannelStateInformation_destroy(sock_ptr->csi), "Could not destroy channel state information.")
check(PrrtApplicationConstraints_destroy(sock_ptr->applicationConstraints), "Could not destroy application constraints.")
if(sock_ptr->csi != NULL) {
check(PrrtChannelStateInformation_destroy(sock_ptr->csi), "Could not destroy channel state information.")
}
if(sock_ptr->applicationConstraints) {
check(PrrtApplicationConstraints_destroy(sock_ptr->applicationConstraints), "Could not destroy application constraints.")
}
if(sock_ptr->clock != NULL) {
check(PrrtClock_destroy(sock_ptr->clock), "Destroy clock failed");
}
close(sock_ptr->dataSocketFd);
close(sock_ptr->feedbackSocketFd);
......@@ -290,7 +301,7 @@ PrrtPacket *PrrtSocket_recv_feedback(PrrtSocket *prrtSocket, const size_t length
if (n == 0) {
return NULL;
}
prrtTimestamp_t receiveTime = PrrtClock_get_current_time();
prrtTimestamp_t receiveTime = PrrtClock_get_current_time_us();
n = recvfrom(prrtSocket->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
check(n >= 0, "Receiving feedback failed.");
......
......@@ -8,6 +8,7 @@
#include "../util/bptree.h"
#include "channelStateInformation.h"
#include "applicationContraints.h"
#include "clock.h"
typedef struct prrtSocket {
......@@ -18,6 +19,8 @@ typedef struct prrtSocket {
struct sockaddr_in* address;
bool isBound;
PrrtClock* clock;
pthread_t sendThread;
pthread_mutex_t outQueueFilledMutex;
pthread_cond_t outQueueFilledCv;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment