Commit df890213 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Merge branch 'feature/rttMeasurement' into develop

parents b8c06bb9 82fa5a5e
Pipeline #76 passed with stage
...@@ -13,7 +13,7 @@ if __name__ == "__main__": ...@@ -13,7 +13,7 @@ if __name__ == "__main__":
# TODO: support multiple tests via proper socket termination # TODO: support multiple tests via proper socket termination
setups = [ setups = [
perf.TestSetup(packets=2**10,delay=0,loss=0.5,reorder=0,duplicate=0) perf.TestSetup(packets=2**10,delay=0,loss=0,reorder=0,duplicate=0)
] ]
for setup in setups: for setup in setups:
......
...@@ -14,6 +14,8 @@ ...@@ -14,6 +14,8 @@
#define N_START 7 #define N_START 7
#define N_P_START 1 #define N_P_START 1
#define RRT_ALPHA 0.125
// Uncomment the line below if you are compiling on Windows. // Uncomment the line below if you are compiling on Windows.
// #define WINDOWS // #define WINDOWS
#include <stdio.h> #include <stdio.h>
......
add_library(PRRT ../defines.h socket.c block.c block.h packet.c packet.h receiver.c receiver.h processes/feedback_receiver.c processes/feedback_receiver.h processes/data_transmitter.c processes/data_transmitter.h coding_params.c coding_params.h vdmcode/block_code.c vdmcode/block_code.h coding_params.c coding_params.h stores/forward_packet_table.c stores/forward_packet_table.h processes/data_receiver.c processes/data_receiver.h) add_library(PRRT ../defines.h socket.c block.c block.h packet.c packet.h receiver.c receiver.h clock.c clock.h channelStateInformation.c channelStateInformation.h processes/feedback_receiver.c processes/feedback_receiver.h processes/data_transmitter.c processes/data_transmitter.h coding_params.c coding_params.h vdmcode/block_code.c vdmcode/block_code.h coding_params.c coding_params.h stores/forward_packet_table.c stores/forward_packet_table.h processes/data_receiver.c processes/data_receiver.h)
\ No newline at end of file \ No newline at end of file
#include <stdlib.h>
#include "../defines.h"
#include "../util/common.h"
#include "../util/dbg.h"
#include "channelStateInformation.h"
void PrrtChannelStateInformation_init(PrrtChannelStateInformation *csi)
{
check(csi != NULL, "Input should not be NULL.");
csi->rttMean = 0;
csi->rttDev = 0;
return;
error:
PERROR("Should not happen.%s","");
}
void PrrtChannelStateInformation_update_rtt(PrrtChannelStateInformation *csi, uint32_t rtt)
{
int32_t delta = rtt - csi->rttMean;
// TODO: ensure that there are no arithemtic problems via rounding etc.
csi->rttMean = (uint16_t) (csi->rttMean + RRT_ALPHA * delta);
csi->rttDev = (uint16_t) (csi->rttDev + RRT_ALPHA * (labs(delta) - csi->rttDev));
}
void PrrtChannelStateInformation_print(PrrtChannelStateInformation *csi)
{
check(csi != NULL, "Input should not be NULL.");
printf("RTT [us]: %d +- %d\n", csi->rttMean, csi->rttDev);
return;
error:
PERROR("Should not happen.%s","");
}
\ No newline at end of file
#ifndef PRRT_CHANNELSTATEINFORMATION_H
#define PRRT_CHANNELSTATEINFORMATION_H
#include <stdint.h>
typedef struct prrtChannelStateInformation {
uint16_t rttMean;
uint16_t rttDev;
} PrrtChannelStateInformation;
void PrrtChannelStateInformation_init(PrrtChannelStateInformation* csi);
void PrrtChannelStateInformation_update_rtt(PrrtChannelStateInformation* csi, uint32_t rtt);
void PrrtChannelStateInformation_print(PrrtChannelStateInformation *csi);
#endif //PRRT_CHANNELSTATEINFORMATION_H
#include <sys/time.h>
#include <stddef.h>
#include "clock.h"
uint32_t PrrtClock_get_current_time(void) {
struct timeval tv;
gettimeofday(&tv, NULL);
uint32_t time_in_micros = (uint32_t) (1000000 * tv.tv_sec + tv.tv_usec);
return time_in_micros;
}
#ifndef PRRT_CLOCK_H
#define PRRT_CLOCK_H
#include <stdint.h>
uint32_t PrrtClock_get_current_time(void);
#endif //PRRT_CLOCK_H
...@@ -6,7 +6,9 @@ ...@@ -6,7 +6,9 @@
#include "../util/common.h" #include "../util/common.h"
#include "../util/dbg.h" #include "../util/dbg.h"
#include <stdbool.h> #include <stdbool.h>
#include <arpa/inet.h>
#include "packet.h" #include "packet.h"
#include "clock.h"
void *encode_general_header(void *buf_ptr, const PrrtPacket *packet); void *encode_general_header(void *buf_ptr, const PrrtPacket *packet);
...@@ -64,9 +66,16 @@ int PrrtPacket_print(PrrtPacket *packet_ptr) ...@@ -64,9 +66,16 @@ int PrrtPacket_print(PrrtPacket *packet_ptr)
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n"); printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
} else if(type == PACKET_TYPE_FEEDBACK) { } else if(type == PACKET_TYPE_FEEDBACK) {
PrrtPacketFeedbackPayload *payload = packet_ptr->payload; PrrtPacketFeedbackPayload *payload = packet_ptr->payload;
struct sockaddr_in receiver;
receiver.sin_addr.s_addr = payload->receiver_addr;
char* address = inet_ntoa(receiver.sin_addr);
printf("| %61s |\n", address);
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
printf("| %61u |\n", payload->group_round_trip_time); printf("| %61u |\n", payload->group_round_trip_time);
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n"); printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
printf("| %61u |\n", payload->forward_trip_time); printf("| %61u |\n", payload->forward_trip_timestamp);
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n"); printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
printf("| %61u |\n", payload->packet_loss_rate); printf("| %61u |\n", payload->packet_loss_rate);
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n"); printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
...@@ -142,8 +151,7 @@ bool PrrtPacket_encode(void *buf_ptr, uint16_t buf_size, PrrtPacket *packet_ptr) ...@@ -142,8 +151,7 @@ bool PrrtPacket_encode(void *buf_ptr, uint16_t buf_size, PrrtPacket *packet_ptr)
buf_ptr = encode_redundancy_header(buf_ptr, payload); buf_ptr = encode_redundancy_header(buf_ptr, payload);
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet_ptr, PRRT_PACKET_REDUNDANCY_HEADER_SIZE); PrrtPacket_copy_payload_to_buffer(buf_ptr, packet_ptr, PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
} else if(type == PACKET_TYPE_FEEDBACK) { } else if(type == PACKET_TYPE_FEEDBACK) {
buf_ptr = encode_feedback_header(buf_ptr, payload); encode_feedback_header(buf_ptr, payload);
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet_ptr, PRRT_PACKET_FEEDBACK_HEADER_SIZE);
} else { } else {
perror("NOT IMPLEMENTED"); perror("NOT IMPLEMENTED");
return false; return false;
...@@ -178,7 +186,7 @@ void *encode_feedback_header(void *buf_ptr, const void *payload) ...@@ -178,7 +186,7 @@ void *encode_feedback_header(void *buf_ptr, const void *payload)
const PrrtPacketFeedbackPayload *feedbackPayload = payload; const PrrtPacketFeedbackPayload *feedbackPayload = payload;
uint32_t *receiver_ip = (uint32_t *) buf_ptr; uint32_t *receiver_ip = (uint32_t *) buf_ptr;
*receiver_ip = htonl(feedbackPayload->receiver_addr); *receiver_ip = feedbackPayload->receiver_addr;
buf_ptr += 4; buf_ptr += 4;
uint32_t *group_round_trip_time = (uint32_t *) buf_ptr; uint32_t *group_round_trip_time = (uint32_t *) buf_ptr;
...@@ -186,7 +194,7 @@ void *encode_feedback_header(void *buf_ptr, const void *payload) ...@@ -186,7 +194,7 @@ void *encode_feedback_header(void *buf_ptr, const void *payload)
buf_ptr += 4; buf_ptr += 4;
uint32_t *forward_trip_time = (uint32_t *) buf_ptr; uint32_t *forward_trip_time = (uint32_t *) buf_ptr;
*forward_trip_time = htonl(feedbackPayload->forward_trip_time); *forward_trip_time = htonl(feedbackPayload->forward_trip_timestamp);
buf_ptr += 4; buf_ptr += 4;
uint32_t *packet_loss_rate = (uint32_t *) buf_ptr; uint32_t *packet_loss_rate = (uint32_t *) buf_ptr;
...@@ -281,15 +289,15 @@ bool PrrtPacket_decode(void *srcBuffer, uint16_t srcBufferSize, PrrtPacket *targ ...@@ -281,15 +289,15 @@ bool PrrtPacket_decode(void *srcBuffer, uint16_t srcBufferSize, PrrtPacket *targ
targetPacket->payload = payload_buffer; targetPacket->payload = payload_buffer;
targetPacket->payload_len = payload_len; targetPacket->payload_len = payload_len;
if(PrrtPacket_type(targetPacket) == PACKET_TYPE_DATA) { uint8_t packetType = PrrtPacket_type(targetPacket);
if(packetType == PACKET_TYPE_DATA) {
srcBuffer = decode_data_header(srcBuffer, payload_buffer); srcBuffer = decode_data_header(srcBuffer, payload_buffer);
PrrtPacket_copy_buffer_to_payload(targetPacket, srcBuffer, PRRT_PACKET_DATA_HEADER_SIZE); PrrtPacket_copy_buffer_to_payload(targetPacket, srcBuffer, PRRT_PACKET_DATA_HEADER_SIZE);
} else if(PrrtPacket_type(targetPacket)) { } else if(packetType == PACKET_TYPE_REDUNDANCY) {
srcBuffer = decode_redundancy_header(srcBuffer, payload_buffer); srcBuffer = decode_redundancy_header(srcBuffer, payload_buffer);
PrrtPacket_copy_buffer_to_payload(targetPacket, srcBuffer, PRRT_PACKET_REDUNDANCY_HEADER_SIZE); PrrtPacket_copy_buffer_to_payload(targetPacket, srcBuffer, PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
} else if(PrrtPacket_type(targetPacket) == PACKET_TYPE_FEEDBACK) { } else if(packetType == PACKET_TYPE_FEEDBACK) {
srcBuffer = decode_feedback_header(srcBuffer, payload_buffer); decode_feedback_header(srcBuffer, payload_buffer);
PrrtPacket_copy_buffer_to_payload(targetPacket, srcBuffer, PRRT_PACKET_FEEDBACK_HEADER_SIZE);
} else { } else {
printf("NOT IMPLEMENTED\n"); printf("NOT IMPLEMENTED\n");
} }
...@@ -323,7 +331,7 @@ void *decode_feedback_header(void *dstBuffer, const void *srcBuffer) ...@@ -323,7 +331,7 @@ void *decode_feedback_header(void *dstBuffer, const void *srcBuffer)
PrrtPacketFeedbackPayload *feedback_payload = (PrrtPacketFeedbackPayload *) srcBuffer; PrrtPacketFeedbackPayload *feedback_payload = (PrrtPacketFeedbackPayload *) srcBuffer;
uint32_t *receiverAddr = (uint32_t *) dstBuffer; uint32_t *receiverAddr = (uint32_t *) dstBuffer;
feedback_payload->receiver_addr = ntohl(*receiverAddr); feedback_payload->receiver_addr = *receiverAddr;
dstBuffer += 4; dstBuffer += 4;
uint32_t *groupRoundTripTime = (uint32_t *) dstBuffer; uint32_t *groupRoundTripTime = (uint32_t *) dstBuffer;
...@@ -331,7 +339,7 @@ void *decode_feedback_header(void *dstBuffer, const void *srcBuffer) ...@@ -331,7 +339,7 @@ void *decode_feedback_header(void *dstBuffer, const void *srcBuffer)
dstBuffer += 4; dstBuffer += 4;
uint32_t *forwardTripTime = (uint32_t *) dstBuffer; uint32_t *forwardTripTime = (uint32_t *) dstBuffer;
feedback_payload->forward_trip_time = ntohl(*forwardTripTime); feedback_payload->forward_trip_timestamp = ntohl(*forwardTripTime);
dstBuffer += 4; dstBuffer += 4;
uint32_t *packetLossRate = (uint32_t *) dstBuffer; uint32_t *packetLossRate = (uint32_t *) dstBuffer;
...@@ -410,11 +418,7 @@ PrrtPacket *PrrtPacket_create_data_packet(uint8_t priority, const void *payloadP ...@@ -410,11 +418,7 @@ PrrtPacket *PrrtPacket_create_data_packet(uint8_t priority, const void *payloadP
check_mem(dataPayload); check_mem(dataPayload);
packet->payload = dataPayload; packet->payload = dataPayload;
struct timeval tv; dataPayload->timestamp = PrrtClock_get_current_time();
gettimeofday(&tv, NULL);
unsigned long time_in_micros = (unsigned long) (1000000 * tv.tv_sec + tv.tv_usec);
dataPayload->timestamp = (uint32_t) time_in_micros;
dataPayload->group_round_trip_time = 95; // TODO: payload->rtt = CURRENT ESTIMATE dataPayload->group_round_trip_time = 95; // TODO: payload->rtt = CURRENT ESTIMATE
dataPayload->packet_timeout = 110; // TODO: payload->packet_timeout = NOW + maximum delay dataPayload->packet_timeout = 110; // TODO: payload->packet_timeout = NOW + maximum delay
dataPayload->decoding_timeout = 150; // TODO: payload->decoding_timeout dataPayload->decoding_timeout = 150; // TODO: payload->decoding_timeout
...@@ -452,9 +456,9 @@ PrrtPacket *PrrtPacket_create_redundancy_packet(uint8_t priority, void *payloadP ...@@ -452,9 +456,9 @@ PrrtPacket *PrrtPacket_create_redundancy_packet(uint8_t priority, void *payloadP
} }
PrrtPacket *PrrtPacket_create_feedback_packet(uint8_t priority, uint8_t index, uint16_t sequenceNumber, PrrtPacket *PrrtPacket_create_feedback_packet(uint8_t priority, uint8_t index, uint16_t sequenceNumber,
uint32_t roundTripTime, uint32_t forwardTripTime, uint32_t packetLossRate, uint32_t roundTripTime, uint32_t packetLossRate, uint16_t gap,
uint16_t gap, uint16_t ngap, uint16_t burst, uint16_t nburst, uint16_t ngap, uint16_t burst, uint16_t nburst, uint32_t bandwidth,
uint32_t bandwidth, uint32_t bufferFeedback) uint32_t bufferFeedback)
{ {
PrrtPacket *packet = create_header(priority, sequenceNumber, PRRT_PACKET_FEEDBACK_HEADER_SIZE, PACKET_TYPE_FEEDBACK, PrrtPacket *packet = create_header(priority, sequenceNumber, PRRT_PACKET_FEEDBACK_HEADER_SIZE, PACKET_TYPE_FEEDBACK,
index); index);
...@@ -463,8 +467,9 @@ PrrtPacket *PrrtPacket_create_feedback_packet(uint8_t priority, uint8_t index, u ...@@ -463,8 +467,9 @@ PrrtPacket *PrrtPacket_create_feedback_packet(uint8_t priority, uint8_t index, u
check_mem(payload); check_mem(payload);
packet->payload = payload; packet->payload = payload;
payload->receiver_addr = inet_addr("10.0.0.1");
payload->group_round_trip_time = roundTripTime; payload->group_round_trip_time = roundTripTime;
payload->forward_trip_time = forwardTripTime; payload->forward_trip_timestamp = 0;
payload->packet_loss_rate = packetLossRate; payload->packet_loss_rate = packetLossRate;
payload->gap = gap; payload->gap = gap;
payload->ngap = ngap; payload->ngap = ngap;
......
...@@ -25,6 +25,7 @@ typedef struct prrtPacket { ...@@ -25,6 +25,7 @@ typedef struct prrtPacket {
uint32_t payload_len; uint32_t payload_len;
} PrrtPacket; } PrrtPacket;
#define PRRT_PACKET_GENERAL_HEADER_SIZE 8 #define PRRT_PACKET_GENERAL_HEADER_SIZE 8
#define PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH 4
#define SEQNO_SPACE 65536 // 2**16 as seqno is uint16_t #define SEQNO_SPACE 65536 // 2**16 as seqno is uint16_t
...@@ -47,7 +48,7 @@ typedef struct prrtPacketRedundancyPayload { ...@@ -47,7 +48,7 @@ typedef struct prrtPacketRedundancyPayload {
typedef struct prrtPacketFeedbackPayload { typedef struct prrtPacketFeedbackPayload {
uint32_t receiver_addr; uint32_t receiver_addr;
uint32_t group_round_trip_time; uint32_t group_round_trip_time;
uint32_t forward_trip_time; uint32_t forward_trip_timestamp;
uint32_t packet_loss_rate; uint32_t packet_loss_rate;
uint16_t gap; uint16_t gap;
uint16_t ngap; uint16_t ngap;
...@@ -66,8 +67,8 @@ PrrtPacket *PrrtPacket_copy(PrrtPacket *original); ...@@ -66,8 +67,8 @@ PrrtPacket *PrrtPacket_copy(PrrtPacket *original);
PrrtPacket * PrrtPacket_create_data_packet(uint8_t priority, const void *payloadPointer, uint32_t payloadLength, uint16_t sequenceNumber); PrrtPacket * PrrtPacket_create_data_packet(uint8_t priority, const void *payloadPointer, uint32_t payloadLength, uint16_t sequenceNumber);
PrrtPacket *PrrtPacket_create_feedback_packet(uint8_t priority, uint8_t index, uint16_t sequenceNumber, uint32_t roundTripTime, PrrtPacket *PrrtPacket_create_feedback_packet(uint8_t priority, uint8_t index, uint16_t sequenceNumber,
uint32_t forwardTripTime, uint32_t packetLossRate, uint16_t gap, uint32_t roundTripTime, uint32_t packetLossRate, uint16_t gap,
uint16_t ngap, uint16_t burst, uint16_t nburst, uint32_t bandwidth, uint16_t ngap, uint16_t burst, uint16_t nburst, uint32_t bandwidth,
uint32_t bufferFeedback); uint32_t bufferFeedback);
...@@ -81,4 +82,6 @@ int PrrtPacket_destroy(PrrtPacket *packet); ...@@ -81,4 +82,6 @@ int PrrtPacket_destroy(PrrtPacket *packet);
#define PrrtPacket_copy_payload_to_buffer(dst, packet, header_size) memcpy(dst, packet->payload + header_size, packet->payload_len - header_size); #define PrrtPacket_copy_payload_to_buffer(dst, packet, header_size) memcpy(dst, packet->payload + header_size, packet->payload_len - header_size);
#define PrrtPacket_copy_buffer_to_payload(packet, src, header_size) memcpy(packet->payload + header_size, src, packet->payload_len - header_size); #define PrrtPacket_copy_buffer_to_payload(packet, src, header_size) memcpy(packet->payload + header_size, src, packet->payload_len - header_size);
#define PrrtPacket_get_data_timestamp(packet) ((PrrtPacketDataPayload*) packet->payload)->timestamp;
#endif //PRRT_FRAME_H #endif //PRRT_FRAME_H
#include <netdb.h> #include <netdb.h>
#include <stdio.h> #include <stdio.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <src/prrt/clock.h>
#include "../../defines.h" #include "../../defines.h"
#include "../../util/dbg.h" #include "../../util/dbg.h"
#include "../../util/common.h" #include "../../util/common.h"
...@@ -61,12 +62,16 @@ bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote) ...@@ -61,12 +62,16 @@ bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote)
hp = gethostbyname(remote_host); hp = gethostbyname(remote_host);
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length); memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 7, 3, 50, 4, 6, 8, 9, 5, 1); PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 3, 50, 4, 6, 8, 9, 5, 1);
uint32_t length = PrrtPacket_size(feedback_pkt_ptr); uint32_t length = PrrtPacket_size(feedback_pkt_ptr);
void *buf = calloc(1, length); void *buf = calloc(1, length);
check_mem(buf); check_mem(buf);
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small"); check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");
uint32_t forwardTripTime = htonl(PrrtClock_get_current_time() + sock_ptr->lastSentTimestamp - sock_ptr->lastReceivedTimestamp);
((PrrtPacketFeedbackPayload*) (buf + PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH))->forward_trip_timestamp = forwardTripTime;
// TODO: [LATENCY] By knowing the time for htonl and writing bytes, one could compensate the timestamp.
check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) == check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
length, "Sending feedback failed."); length, "Sending feedback failed.");
free(buf); free(buf);
...@@ -92,12 +97,20 @@ void *receive_data_loop(void *ptr) ...@@ -92,12 +97,20 @@ void *receive_data_loop(void *ptr)
while(1) { while(1) {
memset(buffer, 0, MAX_PAYLOAD_LENGTH); memset(buffer, 0, MAX_PAYLOAD_LENGTH);
n = recvfrom(sock_ptr->dataSocketFd, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen); n = recvfrom(sock_ptr->dataSocketFd, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen);
check(send_feedback(sock_ptr, remote), "Sending feedback failed."); sock_ptr->lastReceivedTimestamp = PrrtClock_get_current_time();
PrrtPacket *packet = (PrrtPacket *) calloc(1, sizeof(PrrtPacket)); PrrtPacket *packet = (PrrtPacket *) calloc(1, sizeof(PrrtPacket));
check_mem(packet); check_mem(packet);
check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed."); check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed.");
// TODO: make something useful with RTT approximation
PrrtPacketDataPayload* payload = packet->payload;
debug("RTT: %d", payload->group_round_trip_time);
sock_ptr->lastSentTimestamp = PrrtPacket_get_data_timestamp(packet);
check(send_feedback(sock_ptr, remote), "Sending feedback failed.");
uint8_t packetType = PrrtPacket_type(packet); uint8_t packetType = PrrtPacket_type(packet);
if(packetType == PACKET_TYPE_DATA) { if(packetType == PACKET_TYPE_DATA) {
// TODO: packet.timestamp + packet.timeout < now: break // TODO: packet.timestamp + packet.timeout < now: break
......
...@@ -3,9 +3,10 @@ ...@@ -3,9 +3,10 @@
#include <unistd.h> #include <unistd.h>
#include <string.h> #include <string.h>
#include "../../defines.h" #include "../../defines.h"
#include "../socket.h"
#include "../block.h" #include "../block.h"
#include "../clock.h"
#include "../receiver.h" #include "../receiver.h"
#include "../socket.h"
#include "../../util/dbg.h" #include "../../util/dbg.h"
#include "../../util/common.h" #include "../../util/common.h"
#include "data_transmitter.h" #include "data_transmitter.h"
...@@ -15,6 +16,7 @@ bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { ...@@ -15,6 +16,7 @@ bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
uint8_t buf[MAX_PAYLOAD_LENGTH]; uint8_t buf[MAX_PAYLOAD_LENGTH];
memset(buf, 0, sizeof(buf)); memset(buf, 0, sizeof(buf));
uint32_t length = PrrtPacket_size(packet); uint32_t length = PrrtPacket_size(packet);
uint8_t type = PrrtPacket_type(packet);
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small."); check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small.");
...@@ -33,6 +35,8 @@ bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { ...@@ -33,6 +35,8 @@ bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
check(hp != NULL, "Could not resolve host '%s'.", recv->host_name) check(hp != NULL, "Could not resolve host '%s'.", recv->host_name)
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length); memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
if(type == PACKET_TYPE_DATA) { ((PrrtPacketDataPayload*) buf + PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH)->timestamp = htonl(PrrtClock_get_current_time()); }
// TODO: [LATENCY] By knowing the time for htonl and writing bytes, one could compensate the timestamp.
check(sendto(sock_ptr->dataSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) == length, "Sendto failed."); check(sendto(sock_ptr->dataSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) == length, "Sendto failed.");
usleep(1); usleep(1);
} }
...@@ -76,6 +80,10 @@ void * send_data_loop(void *ptr) { ...@@ -76,6 +80,10 @@ void * send_data_loop(void *ptr) {
packet->seqno = sock_ptr->sequenceNumberSource++; packet->seqno = sock_ptr->sequenceNumberSource++;
packet->index = (uint8_t) (packet->seqno - block->baseSequenceNumber); packet->index = (uint8_t) (packet->seqno - block->baseSequenceNumber);
PrrtPacketDataPayload* payload = packet->payload;
// TODO: should lock here !
payload->group_round_trip_time = (uint16_t) (sock_ptr->csi.rttMean / 1000);
PrrtPacket* packetToSend = PrrtPacket_copy(packet); PrrtPacket* packetToSend = PrrtPacket_copy(packet);
send_packet(sock_ptr, packetToSend); send_packet(sock_ptr, packetToSend);
......
...@@ -3,20 +3,18 @@ ...@@ -3,20 +3,18 @@
#include "../../defines.h" #include "../../defines.h"
#include "../../util/common.h" #include "../../util/common.h"
#include "../../util/dbg.h" #include "../../util/dbg.h"
#include "../clock.h"
#include "../packet.h" #include "../packet.h"
#include "../socket.h" #include "../socket.h"
#include "feedback_receiver.h" #include "feedback_receiver.h"
void * receive_feedback_loop(void *ptr) { void * receive_feedback_loop(void *ptr) {
char bufin[MAX_PAYLOAD_LENGTH];
PrrtSocket *sock_ptr = ptr; PrrtSocket *sock_ptr = ptr;
check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed."); check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
while (sock_ptr->closing == false) { while (sock_ptr->closing == false) {
check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed."); check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
memset(bufin, 0, MAX_PAYLOAD_LENGTH); PrrtPacket *t = PrrtSocket_recv_feedback(sock_ptr, MAX_PAYLOAD_LENGTH);
PrrtPacket *t = PrrtSocket_recv_feedback(sock_ptr, bufin, MAX_PAYLOAD_LENGTH);
if (t != NULL) { if (t != NULL) {
PrrtPacket_destroy(t); PrrtPacket_destroy(t);
} }
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "socket.h" #include "socket.h"
#include "block.h" #include "block.h"
#include "receiver.h" #include "receiver.h"
#include "clock.h"
PrrtSocket* PrrtSocket_create(const uint16_t port, const uint8_t is_sender) { PrrtSocket* PrrtSocket_create(const uint16_t port, const uint8_t is_sender) {
PrrtSocket *sock_ptr = (PrrtSocket*) calloc(1, sizeof(PrrtSocket)); PrrtSocket *sock_ptr = (PrrtSocket*) calloc(1, sizeof(PrrtSocket));
...@@ -25,6 +26,8 @@ PrrtSocket* PrrtSocket_create(const uint16_t port, const uint8_t is_sender) { ...@@ -25,6 +26,8 @@ PrrtSocket* PrrtSocket_create(const uint16_t port, const uint8_t is_sender) {
sock_ptr->sequenceNumberSource = 1; sock_ptr->sequenceNumberSource = 1;
sock_ptr->sequenceNumberRedundancy = 1; sock_ptr->sequenceNumberRedundancy = 1;
PrrtChannelStateInformation_init(&sock_ptr->csi);
sock_ptr->dataStore = NULL; sock_ptr->dataStore = NULL;
check(sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.") check(sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.")
...@@ -252,14 +255,16 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) { ...@@ -252,14 +255,16 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) {
return -1; return -1;
} }
PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length) { PrrtPacket *PrrtSocket_recv_feedback(PrrtSocket *prrtSocket, const size_t length)
{
char bufin[MAX_PAYLOAD_LENGTH];
ssize_t n; ssize_t n;
struct sockaddr_in remote; struct sockaddr_in remote;
socklen_t addrlen = sizeof(remote); socklen_t addrlen = sizeof(remote);
struct pollfd fds; struct pollfd fds;
int timeout_msecs = 1000; int timeout_msecs = 1000;
fds.fd = sock_ptr->feedbackSocketFd; fds.fd = prrtSocket->feedbackSocketFd;
fds.events = POLLIN; fds.events = POLLIN;
n = poll(&fds, 1, timeout_msecs); n = poll(&fds, 1, timeout_msecs);
...@@ -267,18 +272,19 @@ PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, co ...@@ -267,18 +272,19 @@ PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, co
if (n == 0) { if (n == 0) {
return NULL; return NULL;
} }
uint32_t receiveTime = PrrtClock_get_current_time();
n = recvfrom(sock_ptr->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen); n = recvfrom(prrtSocket->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
check(n >= 0, "Receiving feedback failed."); check(n >= 0, "Receiving feedback failed.");
//uint16_t remote_port = ntohs(remote.sin_port); PrrtPacket *prrtPacket = calloc(1, sizeof(PrrtPacket));
//char *remote_host = inet_ntoa(remote.sin_addr); check_mem(prrtPacket);
//debug("Received feedback %s:%d", remote_host, remote_port); PrrtPacket_decode(bufin, (uint16_t) n, prrtPacket);
uint32_t forwardTripTimestamp = ((PrrtPacketFeedbackPayload*) prrtPacket->payload)->forward_trip_timestamp;
PrrtChannelStateInformation_update_rtt(&prrtSocket->csi, receiveTime - forwardTripTimestamp);
PrrtPacket *packet_ptr = calloc(1, sizeof(PrrtPacket)); return prrtPacket;
check_mem(packet_ptr);
PrrtPacket_decode(bufin, (uint16_t) n, packet_ptr);
return packet_ptr;
error: error:
return NULL; return NULL;
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include "stores/forward_packet_table.h" #include