Commit 86c46a16 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Application constraints are locked.

parent ec5e890e
......@@ -7,6 +7,9 @@ PrrtApplicationConstraints *PrrtApplicationConstraints_create()
{
PrrtApplicationConstraints *constraints = calloc(1, sizeof(PrrtApplicationConstraints));
check_mem(constraints);
pthread_mutex_init(&constraints->lock, NULL);
return constraints;
error:
......@@ -16,11 +19,23 @@ PrrtApplicationConstraints *PrrtApplicationConstraints_create()
bool PrrtApplicationConstraints_destroy(PrrtApplicationConstraints *applicationConstraints)
{
pthread_mutex_destroy(&applicationConstraints->lock);
free(applicationConstraints);
return true;
}
prrtTimedelta_t PrrtApplicationConstraints_get_target_delay(PrrtApplicationConstraints *applicationConstraints)
{
return applicationConstraints->targetDelay;
pthread_mutex_lock(&applicationConstraints->lock);
prrtTimedelta_t targetDelay = applicationConstraints->targetDelay;
pthread_mutex_unlock(&applicationConstraints->lock);
return targetDelay;
}
bool PrrtApplicationConstraints_set_target_delay(PrrtApplicationConstraints *applicationConstraints, prrtTimedelta_t targetDelay)
{
pthread_mutex_lock(&applicationConstraints->lock);
applicationConstraints->targetDelay = targetDelay;
pthread_mutex_unlock(&applicationConstraints->lock);
return true;
}
......@@ -5,11 +5,13 @@
typedef struct applicationConstraints {
prrtTimedelta_t targetDelay;
pthread_mutex_t lock;
} PrrtApplicationConstraints;
PrrtApplicationConstraints *PrrtApplicationConstraints_create(void);
bool PrrtApplicationConstraints_destroy(PrrtApplicationConstraints *applicationConstraints);
prrtTimedelta_t PrrtApplicationConstraints_get_target_delay(PrrtApplicationConstraints *applicationConstraints);
bool PrrtApplicationConstraints_set_target_delay(PrrtApplicationConstraints *applicationConstraints, prrtTimedelta_t targetDelay);
#endif //PRRT_NETWORKCONTRAINTS_H
......@@ -106,9 +106,11 @@ void *receive_data_loop(void *ptr)
check_mem(packet);
check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed.");
PrrtPacket_print(packet);
// TODO: make something useful with RTT approximation
PrrtPacketDataPayload* payload = packet->payload;
debug("RTT: %d", payload->groupRTT_us);
debug("RTT: %d, Packet Timeout: %d", payload->groupRTT_us, payload->packetTimeout_us);
sock_ptr->lastSentTimestamp = PrrtPacket_get_data_timestamp(packet);
......
......@@ -36,7 +36,14 @@ bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet)
check(hp != NULL, "Could not resolve host '%s'.", recv->host_name)
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
if(type == PACKET_TYPE_DATA) { ((PrrtPacketDataPayload*) buf + PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH)->timestamp = htonl(PrrtClock_get_current_time_us()); }
if(type == PACKET_TYPE_DATA) {
PrrtPacketDataPayload *ptr = (PrrtPacketDataPayload *) buf + PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH;
prrtTimedelta_t targetDelay = PrrtApplicationConstraints_get_target_delay(sock_ptr->applicationConstraints);
prrtTimestamp_t currentTime = PrrtClock_get_current_time();
ptr->timestamp = htonl(currentTime);
//ptr->packetTimeout_us = htonl(currentTime + targetDelay);
//debug("TIMEOUT: %d %d", currentTime + targetDelay, ptr->packetTimeout_us);
}
// TODO: [LATENCY] By knowing the time for htonl and writing bytes, one could compensate the timestamp.
check(sendto(sock_ptr->dataSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
length, "Sendto failed.");
......
#include <netdb.h>
#include <stdio.h>
#include <arpa/inet.h>
#include "../../defines.h"
#include "../../util/dbg.h"
#include "../../util/common.h"
#include "../block.h"
#include "../clock.h"
#include "../socket.h"
#include "data_receiver.h"
void retrieve_data_blocks(PrrtSocket *sock_ptr, uint16_t base_seqno, uint8_t k, const PrrtBlock *block)
{
List *res = List_create();
BPTree_get_range(sock_ptr->dataStore, res, base_seqno, base_seqno + k - 1);
LIST_FOREACH(res, first, next, cur) {
PrrtPacket *packetPtr = cur->value;
check(PrrtBlock_insert_data_packet((PrrtBlock *) block, packetPtr), "Insert failed!")
sock_ptr->dataStore = BPTree_delete(sock_ptr->dataStore, packetPtr->seqno);
}
error:
List_destroy(res);
}
void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block, uint16_t base_seqno)
{
if(block != NULL && PrrtBlock_decode_ready(block)) {
check(PrrtBlock_decode(block), "Decoding failed");
while(List_count(block->dataPackets) > 0) {
PrrtPacket *pkt = List_shift(block->dataPackets);
if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, pkt->seqno)) {
check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
List_push(sock_ptr->inQueue, pkt);
check(pthread_cond_signal(&sock_ptr->inQueueFilledCv) == 0, "Signal failed.");
check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
} else {
PrrtPacket_destroy(pkt);
}
}
PrrtBlock_destroy(block);
sock_ptr->blockStore = BPTree_delete(sock_ptr->blockStore, base_seqno);
}
error:
PERROR("Decoding failed.%s", "")
}
bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote)
{
uint16_t remote_port = ntohs(remote.sin_port);
char *remote_host = inet_ntoa(remote.sin_addr);
struct sockaddr_in targetaddr;
memset((char *) &targetaddr, 0, sizeof(targetaddr));
targetaddr.sin_family = AF_INET;
targetaddr.sin_port = htons((uint16_t) (remote_port + 1));
struct hostent *hp;
hp = gethostbyname(remote_host);
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 3, 50, 4, 6, 8, 9, 5, 1, sock_ptr->address->sin_addr.s_addr);
uint32_t length = PrrtPacket_size(feedback_pkt_ptr);
void *buf = calloc(1, length);
check_mem(buf);
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");
uint32_t forwardTripTime = htonl(
PrrtClock_get_prrt_time_us(sock_ptr->clock) + sock_ptr->lastSentTimestamp - sock_ptr->lastReceivedTimestamp);
((PrrtPacketFeedbackPayload*) (buf + PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH))->forward_trip_timestamp = forwardTripTime;
// TODO: [LATENCY] By knowing the time for htonl and writing bytes, one could compensate the timestamp.
check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
length, "Sending feedback failed.");
free(buf);
PrrtPacket_destroy(feedback_pkt_ptr);
return true;
error:
if(buf != NULL) { free(buf); }
if(feedback_pkt_ptr != NULL) { PrrtPacket_destroy(feedback_pkt_ptr); }
return false;
}
void *receive_data_loop(void *ptr)
{
ssize_t n;
struct sockaddr_in remote;
socklen_t addrlen = sizeof(remote);
unsigned char buffer[MAX_PAYLOAD_LENGTH];
PrrtSocket *sock_ptr = ptr;
while(1) {
memset(buffer, 0, MAX_PAYLOAD_LENGTH);
n = recvfrom(sock_ptr->dataSocketFd, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen);
sock_ptr->lastReceivedTimestamp = PrrtClock_get_prrt_time_us(sock_ptr->clock);
PrrtPacket *packet = (PrrtPacket *) calloc(1, sizeof(PrrtPacket));
check_mem(packet);
check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed.");
PrrtPacketDataPayload* payload = packet->payload;
PrrtClock_update(sock_ptr->clock, payload->timestamp, payload->group_round_trip_time);
sock_ptr->lastSentTimestamp = PrrtPacket_get_data_timestamp(packet);
check(send_feedback(sock_ptr, remote), "Sending feedback failed.");
uint8_t packetType = PrrtPacket_type(packet);
if(packetType == PACKET_TYPE_DATA) {
// TODO: packet.timestamp + packet.timeout < now: break
if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, packet->seqno) ==
false) {
PrrtPacket_destroy(packet);
} else {
uint16_t baseSequenceNumber = packet->seqno - packet->index;
PrrtPacket *reference = PrrtPacket_copy(packet);
PrrtBlock *block = BPTree_get(sock_ptr->blockStore, baseSequenceNumber);
if(block != NULL) {
check(PrrtBlock_insert_data_packet(block, reference), "Inserting failed");
decode_block(sock_ptr, block, baseSequenceNumber);
} else {
// Check for duplicate data packet.
if(BPTree_get(sock_ptr->dataStore, packet->seqno) == NULL) {
sock_ptr->dataStore = BPTree_insert(sock_ptr->dataStore, packet->seqno, reference);
} else {
PrrtPacket_destroy(reference);
}
}
// forward to application layer
check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
List_push(sock_ptr->inQueue, packet);
check(pthread_cond_signal(&sock_ptr->inQueueFilledCv) == 0, "Cond signal failed.");
check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
}
} else if(packetType == PACKET_TYPE_REDUNDANCY) {
PrrtPacketRedundancyPayload *redundancyPayload = packet->payload;
if(!PrrtForwardPacketTable_test_is_block_relevant(sock_ptr->forwardPacketTable,
redundancyPayload->base_seqno,
redundancyPayload->n)) {
PrrtPacket_destroy(packet);
} else {
PrrtBlock *block = BPTree_get(sock_ptr->blockStore, redundancyPayload->base_seqno);
if(block == NULL) {
// TODO: PROPER CREATION
PrrtCodingParams *cpar = PrrtCodingParams_create();
cpar->k = redundancyPayload->k;
cpar->n = redundancyPayload->n;
block = PrrtBlock_create(cpar, redundancyPayload->base_seqno);
PrrtCodingParams_destroy(cpar);
sock_ptr->blockStore = BPTree_insert(sock_ptr->blockStore, redundancyPayload->base_seqno,
block);
}
retrieve_data_blocks(sock_ptr, redundancyPayload->base_seqno, block->codingParams.k, block);
if(PrrtBlock_insert_redundancy_packet(block, packet)) {
decode_block(sock_ptr, block, redundancyPayload->base_seqno);
} else {
PrrtPacket_destroy(packet);
}
}
} else {
PrrtPacket_print(packet);
PrrtPacket_destroy(packet);
}
}
// TODO: occasionally clean up dataStore and blockStore !!!
error:
PNOTIMPLEMENTED("SHOULD IMPLEMENT ERROR HANDLER HERE");
}
#include <stdio.h>
#include <netdb.h>
#include <unistd.h>
#include <string.h>
#include "../../defines.h"
#include "../block.h"
#include "../clock.h"
#include "../receiver.h"
#include "../socket.h"
#include "../../util/dbg.h"
#include "../../util/common.h"
#include "data_transmitter.h"
bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
uint8_t buf[MAX_PAYLOAD_LENGTH];
memset(buf, 0, sizeof(buf));
uint32_t length = PrrtPacket_size(packet);
uint8_t type = PrrtPacket_type(packet);
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small.");
// SENDING TO ALL RECEIVERS
LIST_FOREACH(sock_ptr->receivers, first, next, cur) {
PrrtReceiver* recv = cur->value;
struct hostent *hp;
struct sockaddr_in targetaddr;
memset((char *) &targetaddr, 0, sizeof(targetaddr));
targetaddr.sin_family = AF_INET;
targetaddr.sin_port = htons(recv->port);
hp = gethostbyname(recv->host_name);
check(hp != NULL, "Could not resolve host '%s'.", recv->host_name)
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
if(type == PACKET_TYPE_DATA) { ((PrrtPacketDataPayload*) buf + PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH)->timestamp = htonl(
PrrtClock_get_current_time_us()); }
// TODO: [LATENCY] By knowing the time for htonl and writing bytes, one could compensate the timestamp.
check(sendto(sock_ptr->dataSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) == length, "Sendto failed.");
usleep(1);
}
PrrtPacket_destroy(packet);
return true;
error:
PERROR("Sending packet failed.%s", "")
return false;
}
void * send_data_loop(void *ptr) {
PrrtSocket *sock_ptr = ptr;
PrrtBlock *block = NULL;
PrrtCodingParams *cpar = PrrtCodingParams_create();
while (1) {
check(pthread_mutex_lock(&sock_ptr->outQueueFilledMutex) == 0, "Lock failed.");
while (List_count(sock_ptr->outQueue) == 0) {
check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
if(sock_ptr->closing) {
PrrtCodingParams_destroy(cpar);
if(block != NULL) {
PrrtBlock_destroy(block);
}
check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed.");
return NULL;
}
check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
check(pthread_cond_wait(&sock_ptr->outQueueFilledCv, &sock_ptr->outQueueFilledMutex) == 0, "Cond wait failed.");
}
if(block == NULL) {
block = PrrtBlock_create(cpar, sock_ptr->sequenceNumberSource);
}
PrrtPacket *packet = List_shift(sock_ptr->outQueue);
packet->seqno = sock_ptr->sequenceNumberSource++;
packet->index = (uint8_t) (packet->seqno - block->baseSequenceNumber);
PrrtPacketDataPayload* payload = packet->payload;
// TODO: should lock here !
payload->group_round_trip_time = (uint16_t) (sock_ptr->csi.rttMean / 1000);
PrrtPacket* packetToSend = PrrtPacket_copy(packet);
send_packet(sock_ptr, packetToSend);
PrrtBlock_insert_data_packet(block, packet);
// TODO: redundancy should only be sent when necessary
if (PrrtBlock_encode_ready(block) && false) {
uint32_t j = 0;
PrrtBlock_encode(block, &sock_ptr->sequenceNumberRedundancy);
uint32_t redundancyBlocks = List_count(block->redundancyPackets);
for (j = 0; j < redundancyBlocks; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
send_packet(sock_ptr, red_pkt);
}
PrrtBlock_destroy(block);
block = NULL;
}
check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed.");
}
error:
PERROR("Failed to send packet %s", "");
return NULL;
}
......@@ -316,7 +316,12 @@ PrrtPacket *PrrtSocket_recv_feedback(PrrtSocket *prrtSocket, const size_t length
printf("%s\n", inet_ntoa(a));
prrtTimestamp_t forwardTripTimestamp = ((PrrtPacketFeedbackPayload*) prrtPacket->payload)->forwardTripTimestamp_us;
PrrtChannelStateInformation_update_rtt(prrtSocket->csi, receiveTime - forwardTripTimestamp);
debug("%d %d", receiveTime, forwardTripTimestamp);
double diff = difftime(receiveTime, forwardTripTimestamp);
printf("%f\n",diff);
PrrtChannelStateInformation_update_rtt(prrtSocket->csi, (prrtTimedelta_t) diff);
PrrtChannelStateInformation_print(prrtSocket->csi);
return prrtPacket;
......
......@@ -37,7 +37,7 @@ int main(int argc, char* const argv[]) {
continue;
}
buffer[n] = '\0';
printf("[B (n: %3d, i: %3d)] %s\n", n, i, buffer);
//printf("[B (n: %3d, i: %3d)] %s\n", n, i, buffer);
i++;
usleep(1);
}
......
......@@ -15,13 +15,14 @@ int main(int argc, char *const argv[]) {
uint16_t local_port = 6000;
PrrtSocket* socket = PrrtSocket_create(true);
PrrtSocket_set_sock_opt(socket, "targetdelay", 100*1000);
printf("PRRT - SENDER\n");
PrrtSocket_bind(socket, "127.0.1.0", local_port);
check(socket != NULL, "Socket creation failed.");
char *remote_host = "127.0.0.1";
uint16_t remote_port = 5000;
uint16_t remote_port = 5003;
PrrtSocket_connect(socket, remote_host, remote_port);
char *remote_host2 = "127.0.0.1";
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment