Commit 293d07ce authored by Andreas Schmidt's avatar Andreas Schmidt

Introduce custom typedefs for prrt types.

parent c3b549f4
......@@ -63,7 +63,7 @@ cdef extern from "prrt/block.h":
uint32_t redundancy_count
PrrtCodingParams coding_params
uint32_t largest_data_length
uint16_t base_seqno
uint16_t baseSequenceNumber
List* data_blocks
List* redundancy_blocks
uint8_t is_coded
......@@ -74,9 +74,9 @@ cdef extern from "prrt/packet.h":
cdef struct prrtPacket:
uint8_t type_priority;
uint8_t index;
uint16_t seqno;
uint16_t sequenceNumber;
void* payload;
uint32_t payload_len;
uint32_t payloadLength;
ctypedef prrtPacket PrrtPacket
......@@ -121,7 +121,7 @@ cdef extern from "prrt/socket.h":
ctypedef prrtSocket PrrtSocket
cdef PrrtSocket* PrrtSocket_create(bint is_sender)
cdef PrrtSocket* PrrtSocket_create(bint isSender)
bint PrrtSocket_bind(PrrtSocket *sock_ptr, const_char *ipAddress, const uint16_t port)
int PrrtSocket_close(const PrrtSocket *sock_ptr)
int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
......
......@@ -3,7 +3,7 @@ cimport cprrt
cdef extern from "prrt/stores/forwardPacketTable.c":
int PrrtForwardPacketTable_create(cprrt.PrrtForwardPacketTable* fpt_prt);
int PrrtForwardPacketTable_test_set_is_number_relevant(cprrt.PrrtForwardPacketTable *fpt_ptr, uint16_t seqno);
int PrrtForwardPacketTable_test_set_is_number_relevant(cprrt.PrrtForwardPacketTable *fpt_ptr, uint16_t sequenceNumber);
int PrrtForwardPacketTable_test_is_block_relevant(cprrt.PrrtForwardPacketTable * forwardPacketTable, uint16_t start, uint16_t length);
int PrrtForwardPacketTable_destroy(cprrt.PrrtForwardPacketTable* fpt_prt);
......@@ -42,7 +42,7 @@ cdef extern from "prrt/packet.c":
int PrrtPacket_print(cprrt.PrrtPacket *packet_ptr)
cdef extern from "prrt/socket.c":
cprrt.PrrtSocket* PrrtSocket_create(uint16_t port, uint8_t is_sender)
cprrt.PrrtSocket* PrrtSocket_create(uint16_t port, uint8_t isSender)
cdef extern from "util/bptree.c":
cprrt.BPTreeNode *BPTree_insert(cprrt.BPTreeNode *root, int key, void *value)
......
......@@ -61,7 +61,7 @@ void PrrtBlock_destroy(PrrtBlock *mblock)
free(mblock);
}
PrrtBlock * PrrtBlock_create(const PrrtCodingParams *cpar, uint16_t base_seqno)
PrrtBlock * PrrtBlock_create(const PrrtCodingParams *cpar, prrtSequenceNumber_t baseSequenceNumber)
{
PrrtBlock *block = calloc(1, sizeof(PrrtBlock));
check_mem(block);
......@@ -69,7 +69,7 @@ PrrtBlock * PrrtBlock_create(const PrrtCodingParams *cpar, uint16_t base_seqno)
block->codingParams = *cpar;
block->dataPackets = List_create();
block->redundancyPackets = List_create();
block->baseSequenceNumber = base_seqno;
block->baseSequenceNumber = baseSequenceNumber;
block->largestDataLength = 0;
return block;
......@@ -84,15 +84,15 @@ bool PrrtBlock_insert_data_packet(PrrtBlock *prrtBlock, const PrrtPacket *prrtPa
bool found = false;
LIST_FOREACH(prrtBlock->dataPackets, first, next, cur) {
PrrtPacket *pkt = cur->value;
if(pkt->seqno == prrtPacket->seqno) {
if(pkt->sequenceNumber == prrtPacket->sequenceNumber) {
found = true;
}
}
if(found == false) {
List_push(prrtBlock->dataPackets, prrtPacket);
prrtBlock->largestDataLength = (uint32_t) MAX(prrtBlock->largestDataLength,
prrtPacket->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
prrtBlock->largestDataLength = (prrtPacketLength_t) MAX(prrtBlock->largestDataLength,
prrtPacket->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE);
return true;
} else {
return false;
......@@ -104,15 +104,15 @@ bool PrrtBlock_insert_redundancy_packet(PrrtBlock *block_ptr, const PrrtPacket *
bool found = false;
LIST_FOREACH(block_ptr->redundancyPackets, first, next, cur) {
PrrtPacket *pkt = cur->value;
if(pkt->seqno == ptr->seqno) {
if(pkt->sequenceNumber == ptr->sequenceNumber) {
found = true;
}
}
if(found == false) {
List_push(block_ptr->redundancyPackets, ptr);
block_ptr->largestDataLength = (uint32_t) MAX(block_ptr->largestDataLength,
ptr->payload_len - PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
block_ptr->largestDataLength = (prrtPacketLength_t) MAX(block_ptr->largestDataLength,
ptr->payloadLength - PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
return true;
} else {
return false;
......@@ -134,14 +134,14 @@ PrrtPacket *PrrtBlock_get_first_data(PrrtBlock *block_ptr)
return List_shift(block_ptr->dataPackets);
}
void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno)
void PrrtBlock_encode(PrrtBlock *block_ptr, prrtSequenceNumber_t *seqno)
{
int j = 0;
uint8_t k = block_ptr->codingParams.k;
uint8_t n = block_ptr->codingParams.n;
uint8_t r = block_ptr->codingParams.r;
uint16_t baseSequenceNumber = block_ptr->baseSequenceNumber;
uint32_t length = block_ptr->largestDataLength;
prrtSequenceNumber_t baseSequenceNumber = block_ptr->baseSequenceNumber;
prrtPacketLength_t length = block_ptr->largestDataLength;
PrrtCoder *coder = NULL;
......@@ -154,7 +154,7 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno)
src[j] = calloc(length, sizeof(gf));
check_mem(src[j]);
PrrtPacket *pkt = cur->value;
pkt->index = (uint8_t) ((pkt->seqno - baseSequenceNumber) % SEQNO_SPACE);
pkt->index = (uint8_t) ((pkt->sequenceNumber - baseSequenceNumber) % SEQNO_SPACE);
PrrtPacket_copy_payload_to_buffer(src[j], pkt, PRRT_PACKET_DATA_HEADER_SIZE);
j++;
}
......@@ -168,7 +168,7 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno)
PrrtPacket *red_packet_ptr = PrrtPacket_create_redundancy_packet(0, (void *) fec[j], length, *seqno,
(uint8_t) (k + j), block_ptr->baseSequenceNumber,
block_ptr->codingParams);
*seqno = (uint16_t) ((*seqno + 1) % SEQNO_SPACE);
*seqno = (prrtSequenceNumber_t) ((*seqno + 1) % SEQNO_SPACE);
PrrtBlock_insert_redundancy_packet(block_ptr, red_packet_ptr);
}
......@@ -193,8 +193,8 @@ bool PrrtBlock_decode(PrrtBlock *block_ptr)
int *idx_p = NULL;
uint8_t n = block_ptr->codingParams.n;
uint8_t k = block_ptr->codingParams.k;
uint16_t baseSequenceNumber = block_ptr->baseSequenceNumber;
uint32_t length = block_ptr->largestDataLength;
prrtSequenceNumber_t baseSequenceNumber = block_ptr->baseSequenceNumber;
prrtPacketLength_t length = block_ptr->largestDataLength;
PrrtCoder *coder = NULL;
......@@ -219,7 +219,7 @@ bool PrrtBlock_decode(PrrtBlock *block_ptr)
for(j = 0; j < k; j++) {
if(idx_p[j] >= k) {
PrrtPacket *packet = PrrtPacket_create_data_packet(0, fec[j], length, (uint16_t) (baseSequenceNumber + j));
PrrtPacket *packet = PrrtPacket_create_data_packet(0, fec[j], length, (prrtSequenceNumber_t) (baseSequenceNumber + j));
if(PrrtBlock_insert_data_packet(block_ptr, packet) == false) {
debug("Tried to insert unnecessary packet.");
PrrtPacket_destroy(packet);
......
......@@ -7,8 +7,8 @@
typedef struct prrtBlock {
PrrtCodingParams codingParams;
uint32_t largestDataLength;
uint16_t baseSequenceNumber;
prrtPacketLength_t largestDataLength;
prrtSequenceNumber_t baseSequenceNumber;
List*dataPackets;
List*redundancyPackets;
bool isCoded;
......@@ -18,7 +18,7 @@ typedef struct prrtBlock {
/**
* Allocate space for a block.
*/
PrrtBlock * PrrtBlock_create(const PrrtCodingParams *cpar, uint16_t base_seqno);
PrrtBlock * PrrtBlock_create(const PrrtCodingParams *cpar, prrtSequenceNumber_t baseSequenceNumber);
/**
* Frees the PrrtBlock data structure.
......@@ -33,7 +33,7 @@ bool PrrtBlock_decode_ready(const PrrtBlock *block_ptr);
PrrtPacket *PrrtBlock_get_first_data(PrrtBlock *block_ptr);
void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno);
void PrrtBlock_encode(PrrtBlock *block_ptr, prrtSequenceNumber_t *seqno);
bool PrrtBlock_decode(PrrtBlock *block_ptr);
PrrtPacket *PrrtBlock_get_first_red_data(PrrtBlock *block_ptr);
......
......@@ -16,12 +16,12 @@ void PrrtChannelStateInformation_init(PrrtChannelStateInformation *csi)
PERROR("Should not happen.%s","");
}
void PrrtChannelStateInformation_update_rtt(PrrtChannelStateInformation *csi, uint32_t rtt)
void PrrtChannelStateInformation_update_rtt(PrrtChannelStateInformation *csi, prrtTimedelta_t rtt)
{
int32_t delta = rtt - csi->rttMean;
// TODO: ensure that there are no arithemtic problems via rounding etc.
csi->rttMean = (uint16_t) (csi->rttMean + RRT_ALPHA * delta);
csi->rttDev = (uint16_t) (csi->rttDev + RRT_ALPHA * (labs(delta) - csi->rttDev));
csi->rttMean = (prrtTimedelta_t) (csi->rttMean + RRT_ALPHA * delta);
csi->rttDev = (prrtTimedelta_t) (csi->rttDev + RRT_ALPHA * (labs(delta) - csi->rttDev));
}
void PrrtChannelStateInformation_print(PrrtChannelStateInformation *csi)
......
#ifndef PRRT_CHANNELSTATEINFORMATION_H
#define PRRT_CHANNELSTATEINFORMATION_H
#include <stdint.h>
#include "packet.h"
typedef struct prrtChannelStateInformation {
uint16_t rttMean;
uint16_t rttDev;
prrtTimedelta_t rttMean;
prrtTimedelta_t rttDev;
} PrrtChannelStateInformation;
void PrrtChannelStateInformation_init(PrrtChannelStateInformation* csi);
void PrrtChannelStateInformation_update_rtt(PrrtChannelStateInformation* csi, uint32_t rtt);
void PrrtChannelStateInformation_update_rtt(PrrtChannelStateInformation* csi, prrtTimedelta_t rtt);
void PrrtChannelStateInformation_print(PrrtChannelStateInformation *csi);
......
#include <sys/time.h>
#include <stddef.h>
#include "clock.h"
#include "packet.h"
uint32_t PrrtClock_get_current_time(void) {
prrtTimestamp_t PrrtClock_get_current_time(void) {
struct timeval tv;
gettimeofday(&tv, NULL);
uint32_t time_in_micros = (uint32_t) (1000000 * tv.tv_sec + tv.tv_usec);
return time_in_micros;
return (prrtTimestamp_t) (1000000 * tv.tv_sec + tv.tv_usec);
}
#ifndef PRRT_CLOCK_H
#define PRRT_CLOCK_H
#include <stdint.h>
#include "packet.h"
uint32_t PrrtClock_get_current_time(void);
prrtTimestamp_t PrrtClock_get_current_time(void);
#endif //PRRT_CLOCK_H
......@@ -3,6 +3,7 @@
#include <pthread.h>
#include <stdint.h>
#include <stdbool.h>
typedef struct prrtCodingParams {
pthread_mutex_t lock;
......
This diff is collapsed.
#ifndef PRRT_FRAME_H
#define PRRT_FRAME_H
// CREATE: PRRT_PACKET
// ENCODE: PRRT_PACKET -> BYTES
// DECODE: BYTES -> PRRT_PACKET
// DESTROY:
#include <stdint.h>
#include "codingParams.h"
#include <stdbool.h>
#define PACKET_TYPE_DATA 0
#define PACKET_TYPE_REPETITION 1
#define PACKET_TYPE_REDUNDANCY 2
#define PACKET_TYPE_FEEDBACK 3
#define PACKET_TYPE_PRE_REDUNDANCY 4
#define PACKET_TYPE_CHANNEL_FEEDBACK 5
typedef enum {
PACKET_TYPE_DATA = 0,
PACKET_TYPE_REPETITION = 1,
PACKET_TYPE_REDUNDANCY = 2,
PACKET_TYPE_FEEDBACK = 3,
PACKET_TYPE_PRESENT_REDUNDANCY = 4,
PACKET_TYPE_CHANNEL_FEEDBACK = 5
} prrtPacketType_t;
typedef uint16_t prrtSequenceNumber_t;
typedef uint8_t prrtIndex_t;
typedef uint32_t prrtTimestamp_t;
typedef uint32_t prrtTimedelta_t;
typedef uint32_t prrtPacketLength_t;
typedef struct prrtIncompleteBlock {
prrtSequenceNumber_t sequenceNumberBase;
uint16_t repairCycleIndex;
} PrrtIncompleteBlock;
typedef struct prrtPacket {
uint8_t type_priority;
uint8_t index;
uint16_t seqno;
void* payload;
uint32_t payload_len;
prrtIndex_t index;
prrtSequenceNumber_t sequenceNumber;
void *payload;
prrtPacketLength_t payloadLength;
} PrrtPacket;
#define PRRT_PACKET_GENERAL_HEADER_SIZE 8
#define PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH 4
......@@ -30,57 +38,68 @@ typedef struct prrtPacket {
#define SEQNO_SPACE UINT16_MAX // 2**16 as seqno is uint16_t
typedef struct prrtPacketDataPayload {
uint32_t timestamp;
uint16_t group_round_trip_time;
uint16_t packet_timeout;
uint16_t decoding_timeout;
uint16_t feedback_timeout;
prrtTimestamp_t timestamp;
prrtTimedelta_t groupRTT_us;
prrtTimedelta_t packetTimeout_us;
prrtTimedelta_t decodingTimeout_us;
prrtTimedelta_t feedbackTimeout_us;
} PrrtPacketDataPayload;
#define PRRT_PACKET_DATA_HEADER_SIZE sizeof(PrrtPacketDataPayload)
typedef struct prrtPacketRedundancyPayload {
uint16_t base_seqno;
prrtSequenceNumber_t baseSequenceNumber;
uint8_t n;
uint8_t k;
} PrrtPacketRedundancyPayload;
#define PRRT_PACKET_REDUNDANCY_HEADER_SIZE sizeof(PrrtPacketRedundancyPayload)
typedef struct prrtPacketFeedbackPayload {
uint32_t receiver_addr;
uint32_t group_round_trip_time;
uint32_t forward_trip_timestamp;
uint32_t packet_loss_rate;
uint16_t gap;
uint16_t ngap;
uint16_t burst;
uint16_t nburst;
uint32_t bandwidth_estimate;
uint32_t buffer_feedback;
uint32_t receiverAddress;
prrtTimedelta_t groupRTT_us;
prrtTimestamp_t forwardTripTimestamp_us;
prrtSequenceNumber_t erasureCount;
prrtSequenceNumber_t packetCount;
prrtSequenceNumber_t gapLength;
prrtSequenceNumber_t gapCount;
prrtSequenceNumber_t burstLength;
prrtSequenceNumber_t burstCount;
uint32_t bandwidthEstimate;
PrrtIncompleteBlock *incompleteBlocks;
} PrrtPacketFeedbackPayload;
#define PRRT_PACKET_FEEDBACK_HEADER_SIZE sizeof(PrrtPacketFeedbackPayload)
#define PRRT_PACKET_FEEDBACK_HEADER_SIZE sizeof(PrrtPacketFeedbackPayload) - sizeof(PrrtIncompleteBlock*)
prrtPacketType_t PrrtPacket_type(PrrtPacket *packet_ptr);
uint8_t PrrtPacket_type(PrrtPacket *packet_ptr);
uint8_t PrrtPacket_priority(PrrtPacket *packet_ptr);
uint16_t PrrtPacket_size(PrrtPacket *packet_ptr);
prrtPacketLength_t PrrtPacket_size(PrrtPacket *packet_ptr);
int PrrtPacket_print(PrrtPacket *packet_ptr);
PrrtPacket *PrrtPacket_copy(PrrtPacket *original);
PrrtPacket * PrrtPacket_create_data_packet(uint8_t priority, const void *payloadPointer, uint32_t payloadLength, uint16_t sequenceNumber);
PrrtPacket *PrrtPacket_create_data_packet(uint8_t priority, const void *payloadPointer,
prrtPacketLength_t payloadLength, prrtSequenceNumber_t sequenceNumber);
PrrtPacket *PrrtPacket_create_feedback_packet(uint8_t priority, uint8_t index, uint16_t sequenceNumber,
uint32_t roundTripTime, uint32_t packetLossRate, uint16_t gap,
uint16_t ngap, uint16_t burst, uint16_t nburst, uint32_t bandwidth,
uint32_t bufferFeedback, uint32_t receiverAddr);
PrrtPacket *PrrtPacket_create_feedback_packet(uint8_t priority, uint8_t index, prrtSequenceNumber_t sequenceNumber,
prrtTimedelta_t roundTripTime, prrtSequenceNumber_t gapLength,
prrtSequenceNumber_t gapCount, prrtSequenceNumber_t burstLength,
prrtSequenceNumber_t burstCount, uint32_t bandwidth,
uint32_t receiverAddr);
PrrtPacket *PrrtPacket_create_redundancy_packet(uint8_t priority, void *payloadPointer, uint32_t payloadLength,
uint16_t sequenceNumber, uint8_t index, uint16_t baseSequenceNumber, PrrtCodingParams codingParams);
PrrtPacket *PrrtPacket_create_redundancy_packet(uint8_t priority, void *payloadPointer,
prrtPacketLength_t payloadLength,
prrtSequenceNumber_t sequenceNumber, uint8_t index,
prrtSequenceNumber_t baseSequenceNumber, PrrtCodingParams codingParams);
bool PrrtPacket_decode(void *srcBuffer, uint16_t srcBufferSize, PrrtPacket *targetPacket);
bool PrrtPacket_encode(void *buf_ptr, uint16_t buf_size, PrrtPacket *packet_ptr);
int PrrtPacket_destroy(PrrtPacket *packet);
#define PrrtPacket_copy_payload_to_buffer(dst, packet, header_size) memcpy(dst, packet->payload + header_size, packet->payload_len - header_size);
#define PrrtPacket_copy_buffer_to_payload(packet, src, header_size) memcpy(packet->payload + header_size, src, packet->payload_len - header_size);
#define PrrtPacket_copy_payload_to_buffer(dst, packet, header_size) memcpy(dst, packet->payload + header_size, packet->payloadLength - header_size);
#define PrrtPacket_copy_buffer_to_payload(packet, src, header_size) memcpy(packet->payload + header_size, src, packet->payloadLength - header_size);
#define PrrtPacket_get_data_timestamp(packet) ((PrrtPacketDataPayload*) packet->payload)->timestamp;
......
......@@ -9,7 +9,7 @@
#include "../socket.h"
#include "dataReceiver.h"
void retrieve_data_blocks(PrrtSocket *sock_ptr, uint16_t base_seqno, uint8_t k, const PrrtBlock *block)
void retrieve_data_blocks(PrrtSocket *sock_ptr, prrtSequenceNumber_t base_seqno, uint8_t k, const PrrtBlock *block)
{
List *res = List_create();
BPTree_get_range(sock_ptr->dataStore, res, base_seqno, base_seqno + k - 1);
......@@ -17,20 +17,20 @@ void retrieve_data_blocks(PrrtSocket *sock_ptr, uint16_t base_seqno, uint8_t k,
LIST_FOREACH(res, first, next, cur) {
PrrtPacket *packetPtr = cur->value;
check(PrrtBlock_insert_data_packet((PrrtBlock *) block, packetPtr), "Insert failed!")
sock_ptr->dataStore = BPTree_delete(sock_ptr->dataStore, packetPtr->seqno);
sock_ptr->dataStore = BPTree_delete(sock_ptr->dataStore, packetPtr->sequenceNumber);
}
error:
List_destroy(res);
}
void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block, uint16_t base_seqno)
void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block, prrtSequenceNumber_t base_seqno)
{
if(block != NULL && PrrtBlock_decode_ready(block)) {
check(PrrtBlock_decode(block), "Decoding failed");
while(List_count(block->dataPackets) > 0) {
PrrtPacket *pkt = List_shift(block->dataPackets);
if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, pkt->seqno)) {
if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, pkt->sequenceNumber)) {
check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
List_push(sock_ptr->inQueue, pkt);
check(pthread_cond_signal(&sock_ptr->inQueueFilledCv) == 0, "Signal failed.");
......@@ -62,15 +62,16 @@ bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote)
hp = gethostbyname(remote_host);
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 3, 50, 4, 6, 8, 9, 5, 1, sock_ptr->address->sin_addr.s_addr);
uint32_t length = PrrtPacket_size(feedback_pkt_ptr);
PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 3, 4, 6, 8, 9, 5,
sock_ptr->address->sin_addr.s_addr);
prrtPacketLength_t length = PrrtPacket_size(feedback_pkt_ptr);
void *buf = calloc(1, length);
check_mem(buf);
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");
uint32_t forwardTripTime = htonl(PrrtClock_get_current_time() + sock_ptr->lastSentTimestamp - sock_ptr->lastReceivedTimestamp);
((PrrtPacketFeedbackPayload*) (buf + PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH))->forward_trip_timestamp = forwardTripTime;
prrtTimestamp_t forwardTripTime = htonl(PrrtClock_get_current_time() + sock_ptr->lastSentTimestamp - sock_ptr->lastReceivedTimestamp);
((PrrtPacketFeedbackPayload*) (buf + PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH))->forwardTripTimestamp_us = forwardTripTime;
// TODO: [LATENCY] By knowing the time for htonl and writing bytes, one could compensate the timestamp.
check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
length, "Sending feedback failed.");
......@@ -105,21 +106,21 @@ void *receive_data_loop(void *ptr)
// TODO: make something useful with RTT approximation
PrrtPacketDataPayload* payload = packet->payload;
debug("RTT: %d", payload->group_round_trip_time);
debug("RTT: %d", payload->groupRTT_us);
sock_ptr->lastSentTimestamp = PrrtPacket_get_data_timestamp(packet);
check(send_feedback(sock_ptr, remote), "Sending feedback failed.");
uint8_t packetType = PrrtPacket_type(packet);
prrtPacketType_t packetType = PrrtPacket_type(packet);
if(packetType == PACKET_TYPE_DATA) {
// TODO: packet.timestamp + packet.timeout < now: break
if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, packet->seqno) ==
if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, packet->sequenceNumber) ==
false) {
PrrtPacket_destroy(packet);
} else {
uint16_t baseSequenceNumber = packet->seqno - packet->index;
prrtSequenceNumber_t baseSequenceNumber = packet->sequenceNumber - packet->index;
PrrtPacket *reference = PrrtPacket_copy(packet);
......@@ -129,8 +130,8 @@ void *receive_data_loop(void *ptr)
decode_block(sock_ptr, block, baseSequenceNumber);
} else {
// Check for duplicate data packet.
if(BPTree_get(sock_ptr->dataStore, packet->seqno) == NULL) {
sock_ptr->dataStore = BPTree_insert(sock_ptr->dataStore, packet->seqno, reference);
if(BPTree_get(sock_ptr->dataStore, packet->sequenceNumber) == NULL) {
sock_ptr->dataStore = BPTree_insert(sock_ptr->dataStore, packet->sequenceNumber, reference);
} else {
PrrtPacket_destroy(reference);
}
......@@ -147,28 +148,28 @@ void *receive_data_loop(void *ptr)
PrrtPacketRedundancyPayload *redundancyPayload = packet->payload;
if(!PrrtForwardPacketTable_test_is_block_relevant(sock_ptr->forwardPacketTable,
redundancyPayload->base_seqno,
redundancyPayload->baseSequenceNumber,
redundancyPayload->n)) {
PrrtPacket_destroy(packet);
} else {
PrrtBlock *block = BPTree_get(sock_ptr->blockStore, redundancyPayload->base_seqno);
PrrtBlock *block = BPTree_get(sock_ptr->blockStore, redundancyPayload->baseSequenceNumber);
if(block == NULL) {
// TODO: PROPER CREATION
PrrtCodingParams *cpar = PrrtCodingParams_create();
cpar->k = redundancyPayload->k;
cpar->n = redundancyPayload->n;
block = PrrtBlock_create(cpar, redundancyPayload->base_seqno);
block = PrrtBlock_create(cpar, redundancyPayload->baseSequenceNumber);
PrrtCodingParams_destroy(cpar);
sock_ptr->blockStore = BPTree_insert(sock_ptr->blockStore, redundancyPayload->base_seqno,
sock_ptr->blockStore = BPTree_insert(sock_ptr->blockStore, redundancyPayload->baseSequenceNumber,
block);
}
retrieve_data_blocks(sock_ptr, redundancyPayload->base_seqno, block->codingParams.k, block);
retrieve_data_blocks(sock_ptr, redundancyPayload->baseSequenceNumber, block->codingParams.k, block);
if(PrrtBlock_insert_redundancy_packet(block, packet)) {
decode_block(sock_ptr, block, redundancyPayload->base_seqno);
decode_block(sock_ptr, block, redundancyPayload->baseSequenceNumber);
} else {
PrrtPacket_destroy(packet);
}
......
......@@ -15,8 +15,8 @@
bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
uint8_t buf[MAX_PAYLOAD_LENGTH];
memset(buf, 0, sizeof(buf));
uint32_t length = PrrtPacket_size(packet);
uint8_t type = PrrtPacket_type(packet);
prrtPacketLength_t length = PrrtPacket_size(packet);
prrtPacketType_t type = PrrtPacket_type(packet);
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small.");
......@@ -77,12 +77,12 @@ void * send_data_loop(void *ptr) {
}
PrrtPacket *packet = List_shift(sock_ptr->outQueue);
packet->seqno = sock_ptr->sequenceNumberSource++;
packet->index = (uint8_t) (packet->seqno - block->baseSequenceNumber);
packet->sequenceNumber = sock_ptr->sequenceNumberSource++;
packet->index = (uint8_t) (packet->sequenceNumber - block->baseSequenceNumber);
PrrtPacketDataPayload* payload = packet->payload;
// TODO: should lock here !
payload->group_round_trip_time = (uint16_t) (sock_ptr->csi.rttMean / 1000);
payload->groupRTT_us = (prrtTimedelta_t) (sock_ptr->csi.rttMean / 1000);
PrrtPacket* packetToSend = PrrtPacket_copy(packet);
send_packet(sock_ptr, packetToSend);
......
......@@ -17,12 +17,12 @@
#include "receiver.h"
#include "clock.h"
PrrtSocket *PrrtSocket_create(const uint8_t is_sender)
PrrtSocket *PrrtSocket_create(const bool is_sender)
{
PrrtSocket *sock_ptr = (PrrtSocket*) calloc(1, sizeof(PrrtSocket));
check_mem(sock_ptr);
sock_ptr->is_sender = is_sender;
sock_ptr->isSender = is_sender;
sock_ptr->isBound = false;
sock_ptr->sequenceNumberSource = 1;
......@@ -83,7 +83,7 @@ bool PrrtSocket_bind(PrrtSocket *sock_ptr, const char* ipAddress, const uint16_t
check(bind(sock_ptr->dataSocketFd, (struct sockaddr *) address, size) == EXIT_SUCCESS,
"Cannot bind data socket.");
if(sock_ptr->is_sender) {
if(sock_ptr->isSender) {
check(pthread_create(&sock_ptr->receiveFeedbackThread, NULL, receive_feedback_loop, (void *) sock_ptr) ==
EXIT_SUCCESS, "Cannot create receive feedback thread.");
check(pthread_create(&sock_ptr->sendThread, NULL, send_data_loop, (void *) sock_ptr) == EXIT_SUCCESS,
......@@ -107,10 +107,10 @@ int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t po
}
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len) {
check(sock_ptr->is_sender, "Cannot send on receiver socket.")
check(sock_ptr->isSender, "Cannot send on receiver socket.")
check(pthread_mutex_lock(&sock_ptr->outQueueFilledMutex) == 0, "Lock failed.");
PrrtPacket *packet = PrrtPacket_create_data_packet(5, data, (uint32_t) data_len, 0);
PrrtPacket *packet = PrrtPacket_create_data_packet(5, data, (prrtPacketLength_t) data_len, 0);
List_push(sock_ptr->outQueue, packet);
check(pthread_cond_signal(&sock_ptr->outQueueFilledCv) == 0, "Signal failed.");
......@@ -123,7 +123,7 @@ int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data
}
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
check(sock_ptr->is_sender == false, "Cannot receive on sender socket.")
check(sock_ptr->isSender == false, "Cannot receive on sender socket.")
while (1) {
check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
while (List_count(sock_ptr->inQueue) == 0) {
......@@ -138,7 +138,7 @@ int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
}
PrrtPacket *packet = List_shift(sock_ptr->inQueue);
uint32_t len = (uint32_t) (packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
prrtPacketLength_t len = (prrtPacketLength_t) (packet->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE);
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
PrrtPacket_destroy(packet);
......@@ -286,7 +286,7 @@ PrrtPacket *PrrtSocket_recv_feedback(PrrtSocket *prrtSocket, const size_t length
if (n == 0) {
return NULL;
}
uint32_t receiveTime = PrrtClock_get_current_time();
prrtTimestamp_t receiveTime = PrrtClock_get_current_time();
n = recvfrom(prrtSocket->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
check(n >= 0, "Receiving feedback failed.");
......@@ -297,10 +297,10 @@ PrrtPacket *PrrtSocket_recv_feedback(PrrtSocket *prrtSocket, const size_t length
PrrtPacketFeedbackPayload* payload = prrtPacket->payload;
struct in_addr a;
a.s_addr = payload->receiver_addr;
a.s_addr = payload->receiverAddress;
printf("%s\n", inet_ntoa(a));
uint32_t forwardTripTimestamp = ((PrrtPacketFeedbackPayload*) prrtPacket->payload)->forward_trip_timestamp;
prrtTimestamp_t forwardTripTimestamp = ((PrrtPacketFeedbackPayload*) prrtPacket->payload)->forwardTripTimestamp_us;