Commit 6b093d47 authored by Andreas Schmidt's avatar Andreas Schmidt

WIP: block refactoring for efficiency

parent 2a3604ad
Pipeline #3133 passed with stages
in 2 minutes and 25 seconds
......@@ -22,6 +22,9 @@ build:prrt:
- which cmake
- which gcc
- which g++
- cmake --version
- gcc --version
- g++ --version
- pip3 list | grep Cython
- pip3 list | grep numpy
- CC=gcc-5 CXX=g++-5 cmake . -DPRRT_TESTS=1
......
......@@ -10,7 +10,7 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR})
set(CMAKE_C_FLAGS "-O2 -Wall -std=gnu11 -D_GNU_SOURCE -fPIC" )
set(CMAKE_C_FLAGS_DEBUG "-O0 -fsanitize=undefined -fsanitize=address -g3" )
set(CMAKE_CXX_FLAGS "-fstack-protector -fstack-protector-all -Wall -std=gnu++11 -D_GNU_SOURCE" )
set(CMAKE_CXX_FLAGS "-fstack-protector -fstack-protector-all -Wall -std=gnu++11 -D_GNU_SOURCE -fsanitize=undefined -fsanitize=address" )
set(CMAKE_CXX_FLAGS_DEBUG "-O0 -Wall -ggdb -fsanitize=undefined -fsanitize=address -g3" )
set(CMAKE_CXX_FLAGS_RELEASE "-Os -Wall" )
......
......@@ -41,8 +41,9 @@ static void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block) {
if (data_relevant) {
check(PrrtBlock_decode(block), "Decoding failed");
while (List_count(block->dataPackets) > 0) {
PrrtPacket *pkt = List_shift(block->dataPackets);
// TODO add a method for this (PrrtBlock_has_next_data)
while (block->nextDataPacket < block->dataPacketCount) {
PrrtPacket *pkt = PrrtBlock_get_next_data(block);
if (PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable,
pkt->sequenceNumber)) {
PrrtPacketDeliveryStore_insert(sock_ptr->packetDeliveryStore, pkt);
......
......@@ -187,7 +187,7 @@ void retransmission_round_handler(void *arg) {
uint32_t redundancyPackets = block->codingParams->n_cycle[block->inRound];
for (j = 0; j < redundancyPackets; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
PrrtPacket *red_pkt = PrrtBlock_get_next_red_data(block);
bool sendResult = send_packet(socket, red_pkt);
if(!sendResult) {
debug(DEBUG_DATATRANSMITTER, "Sending redundancy data failed.");
......
......@@ -6,13 +6,16 @@
static void gather_redundancy_packets(const PrrtBlock *block_ptr, gf *const *fec, int16_t *idx_p)
{
uint32_t i;
uint32_t m = 0;
uint32_t redundancyBlocks = List_count(block_ptr->redundancyPackets);
int k = PrrtCoder_get_k(block_ptr->coder);
int n = PrrtCoder_get_n(block_ptr->coder);
for(i = 0; i < redundancyBlocks; i++) {
PrrtPacket *packet = List_shift(block_ptr->redundancyPackets);
for(int i = k; i < n; ++i) {
PrrtPacket *packet = block_ptr->packets[i];
if (packet == NULL) {
continue;
}
while(idx_p[m] != -1) {
m++;
......@@ -20,14 +23,19 @@ static void gather_redundancy_packets(const PrrtBlock *block_ptr, gf *const *fec
PrrtPacket_copy_payload_to_buffer(fec[m], packet, PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
idx_p[m] = packet->index;
PrrtPacket_destroy(packet);
}
}
static void gather_data_packets(PrrtBlock *block_ptr, gf *const *fec, int16_t *idx_p)
{
LIST_FOREACH(block_ptr->dataPackets, first, next, current) {
PrrtPacket *packet = current->value;
int k = PrrtCoder_get_k(block_ptr->coder);
for (int i = 0; i < k; ++i) {
PrrtPacket *packet = block_ptr->packets[i];
if (packet == NULL) {
continue;
}
PrrtPacket_copy_payload_to_buffer(fec[packet->index], packet, 0);
idx_p[packet->index] = packet->index;
}
......@@ -47,19 +55,21 @@ void PrrtBlock_print(PrrtBlock *block) {
"+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
printf("| %29u | %13u | %13u |\n", block->baseSequenceNumber, block->codingParams->n, block->codingParams->k);
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
printf("| %13d | %13d |\n", List_count(block->dataPackets), List_count(block->redundancyPackets));
printf("| %13d | %13d |\n", block->dataPacketCount, block->redPacketCount);
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
}
bool PrrtBlock_destroy(PrrtBlock *block)
{
check(pthread_mutex_lock(&block->lock) == EXIT_SUCCESS, "Lock failed.");
while(List_count(block->dataPackets) > 0) {
PrrtPacket *pkt = List_shift(block->dataPackets);
PrrtPacket_destroy(pkt);
}
while(List_count(block->redundancyPackets) > 0) {
PrrtPacket *pkt = List_shift(block->redundancyPackets);
int n = PrrtCoder_get_n(block->coder);
for (int i = 0; i < n; ++i) {
PrrtPacket *pkt = block->packets[i];
if (pkt == NULL) {
continue;
}
PrrtPacket_destroy(pkt);
}
......@@ -71,8 +81,7 @@ bool PrrtBlock_destroy(PrrtBlock *block)
PrrtCodingConfiguration_destroy(block->codingParams);
}
List_destroy(block->dataPackets);
List_destroy(block->redundancyPackets);
free(block->packets);
check(pthread_mutex_unlock(&block->lock) == EXIT_SUCCESS, "Unlock failed.");
check(pthread_mutex_destroy(&block->lock) == EXIT_SUCCESS, "Mutex init failed.");
......@@ -92,13 +101,16 @@ PrrtBlock * PrrtBlock_create(PrrtCodingConfiguration *cpar, PrrtCoder* coder, pr
block_ptr->coder = coder;
block_ptr->codingParams = cpar;
block_ptr->dataPackets = List_create();
block_ptr->redundancyPackets = List_create();
block_ptr->baseSequenceNumber = baseSequenceNumber;
block_ptr->largestPayloadLength = 0;
block_ptr->nextRedundancyPacket = 0;
block_ptr->nextDataPacket = 0;
block_ptr->senderBlock = false;
block_ptr->packets = calloc(cpar->n, sizeof(PrrtPacket*));
block_ptr->dataPacketCount = 0;
block_ptr->redPacketCount = 0;
pthread_mutexattr_t attr;
check(pthread_mutexattr_init(&attr) == EXIT_SUCCESS, "Mutex attr init failed.");
check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == EXIT_SUCCESS, "Setting type failed.");
......@@ -114,15 +126,16 @@ PrrtBlock * PrrtBlock_create(PrrtCodingConfiguration *cpar, PrrtCoder* coder, pr
bool insert_data_packet(PrrtBlock * block, const PrrtPacket *packet) {
bool found = false;
debug(DEBUG_BLOCK, "S: %d, Insert [D]: %d", block->senderBlock, packet->sequenceNumber);
LIST_FOREACH(block->dataPackets, first, next, cur) {
PrrtPacket *pkt = cur->value;
if(pkt->sequenceNumber == packet->sequenceNumber) {
found = true;
}
int index = packet->index;
if (block->packets[index] != NULL) {
found = true;
}
if(found == false) {
List_push(block->dataPackets, packet);
block->packets[index] = packet;
block->dataPacketCount += 1;
}
return found;
}
......@@ -147,14 +160,15 @@ bool PrrtBlock_insert_redundancy_packet(PrrtBlock *block, const PrrtPacket *pack
bool found = false;
check(pthread_mutex_lock(&block->lock) == EXIT_SUCCESS, "Lock failed.");
debug(DEBUG_BLOCK, "S: %d, Insert [R]: %d", block->senderBlock, packet->sequenceNumber);
LIST_FOREACH(block->redundancyPackets, first, next, cur) {
PrrtPacket *pkt = cur->value;
if(pkt->sequenceNumber == packet->sequenceNumber) {
found = true;
}
int index = packet->index;
if (block->packets[index] != NULL) {
found = true;
}
if(found == false) {
List_push(block->redundancyPackets, packet);
block->packets[index] = packet;
block->redPacketCount += 1;
block->largestPayloadLength = (prrtPacketLength_t) MAX(block->largestPayloadLength,
packet->payloadLength - PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
}
......@@ -169,7 +183,7 @@ bool PrrtBlock_insert_redundancy_packet(PrrtBlock *block, const PrrtPacket *pack
bool PrrtBlock_encode_ready(PrrtBlock *block_ptr)
{
check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");
bool res = List_count(block_ptr->dataPackets) == block_ptr->codingParams->k;
bool res = block_ptr->dataPacketCount == block_ptr->codingParams->k;
check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");
return res;
......@@ -181,7 +195,7 @@ bool PrrtBlock_encode_ready(PrrtBlock *block_ptr)
bool PrrtBlock_decode_ready(PrrtBlock *block_ptr)
{
check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");
bool res = (List_count(block_ptr->dataPackets) + List_count(block_ptr->redundancyPackets) == block_ptr->codingParams->k);
bool res = (block_ptr->dataPacketCount + block_ptr->redPacketCount == block_ptr->codingParams->k);
check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");
return res;
......@@ -190,18 +204,6 @@ bool PrrtBlock_decode_ready(PrrtBlock *block_ptr)
return false;
}
PrrtPacket *PrrtBlock_get_first_data(PrrtBlock *block_ptr)
{
check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");
PrrtPacket *res = List_shift(block_ptr->dataPackets);
check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");
return res;
error:
PERROR("Get first data failed.")
return NULL;
}
void PrrtBlock_encode(PrrtBlock *block_ptr, prrtSequenceNumber_t *seqno)
{
check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");
......@@ -214,10 +216,10 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, prrtSequenceNumber_t *seqno)
gf **src = calloc(k, sizeof(gf *));
check_mem(src);
LIST_FOREACH(block_ptr->dataPackets, first, next, cur) {
for (int i = 0; i < block_ptr->codingParams->k; ++i) {
src[j] = calloc(length, sizeof(gf));
check_mem(src[j]);
PrrtPacket *pkt = cur->value;
PrrtPacket *pkt = block_ptr->packets[i];
pkt->index = (uint8_t) ((pkt->sequenceNumber - baseSequenceNumber) % SEQNO_SPACE);
PrrtPacket_copy_payload_to_buffer(src[j], pkt, 0);
j++;
......@@ -235,6 +237,8 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, prrtSequenceNumber_t *seqno)
*seqno = (prrtSequenceNumber_t) ((*seqno + 1) % SEQNO_SPACE);
PrrtBlock_insert_redundancy_packet(block_ptr, red_packet_ptr);
}
block_ptr->nextDataPacket = 0;
block_ptr->nextRedundancyPacket = k;
block_ptr->isCoded = true;
......@@ -314,24 +318,39 @@ bool PrrtBlock_decode(PrrtBlock *block_ptr)
return false;
}
PrrtPacket *PrrtBlock_get_first_red_data(PrrtBlock *block_ptr)
PrrtPacket *PrrtBlock_get_next_red_data(PrrtBlock *block_ptr)
{
check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");
// TODO: This copy should be avoided.
PrrtPacket *res = PrrtPacket_copy(PrrtBlock_get_red_data(block_ptr, block_ptr->nextRedundancyPacket));
PrrtPacket *res = PrrtBlock_get_packet(block_ptr, block_ptr->nextRedundancyPacket);
block_ptr->nextRedundancyPacket++;
check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");
return res;
error:
PERROR("Get first redundancy data failed.")
return NULL;
}
PrrtPacket *PrrtBlock_get_next_data(PrrtBlock *block_ptr)
{
check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");
PrrtPacket *res = PrrtBlock_get_packet(block_ptr, block_ptr->nextDataPacket);
block_ptr->nextDataPacket++;
check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");
return res;
error:
PERROR("Get first data failed.")
return NULL;
}
PrrtPacket *PrrtBlock_get_red_data(PrrtBlock *block_ptr, uint8_t index)
PrrtPacket *PrrtBlock_get_packet(PrrtBlock *block_ptr, int index)
{
check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");
PrrtPacket *res = (PrrtPacket *) List_get(block_ptr->redundancyPackets, index);
PrrtPacket *res = block_ptr->packets[index];
block_ptr->packets[index] = NULL;
check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");
return res;
......
......@@ -11,14 +11,18 @@ typedef struct prrtBlock {
PrrtCodingConfiguration* codingParams;
prrtPacketLength_t largestPayloadLength;
prrtSequenceNumber_t baseSequenceNumber;
List*dataPackets;
List*redundancyPackets;
PrrtPacket **packets;
int dataPacketCount;
int redPacketCount;
bool isCoded;
bool isDecoded;
pthread_mutex_t lock;
PrrtCoder *coder;
uint16_t inRound;
uint8_t nextRedundancyPacket;
uint8_t nextDataPacket;
bool senderBlock;
} PrrtBlock;
......@@ -39,13 +43,14 @@ bool PrrtBlock_insert_redundancy_packet(PrrtBlock *prrtBlock, const PrrtPacket *
bool PrrtBlock_encode_ready(PrrtBlock *block_ptr);
bool PrrtBlock_decode_ready(PrrtBlock *block_ptr);
PrrtPacket *PrrtBlock_get_first_data(PrrtBlock *block_ptr);
void PrrtBlock_print(PrrtBlock *block);
void PrrtBlock_encode(PrrtBlock *block_ptr, prrtSequenceNumber_t *seqno);
bool PrrtBlock_decode(PrrtBlock *block_ptr);
PrrtPacket *PrrtBlock_get_first_red_data(PrrtBlock *block_ptr);
PrrtPacket *PrrtBlock_get_red_data(PrrtBlock *block_ptr, uint8_t index);
PrrtPacket *PrrtBlock_get_next_data(PrrtBlock *block_ptr);
PrrtPacket *PrrtBlock_get_next_red_data(PrrtBlock *block_ptr);
PrrtPacket *PrrtBlock_get_packet(PrrtBlock *block_ptr, int index);
#endif //PRRT_BLOCK_H
......@@ -128,15 +128,15 @@ TEST_F(PrrtBlockTest, EncodeDecode)
PrrtPacket *redPackets[4];
for(uint32_t i = 0; i < 4; i++) {
char data[3];
char data[10];
sprintf(data, "%d", i);
if(i == 3) {
// Last packet has a different size.
sprintf(data, "D:%d", i);
}
packets[i] = PrrtPacket_create_data_packet(0, data, (prrtPacketLength_t) strlen(data), i + 1, 0);
packets[i] = PrrtPacket_create_data_packet(0, data, (prrtPacketLength_t) strlen(data) + 1, i + 1, 0);
packets[i]->index = (uint8_t) i;
refPackets[i] = PrrtPacket_create_data_packet(0, data, (prrtPacketLength_t) strlen(data), i + 1, 0);
refPackets[i] = PrrtPacket_create_data_packet(0, data, (prrtPacketLength_t) strlen(data) + 1, i + 1, 0);
ASSERT_TRUE(PrrtBlock_insert_data_packet(encBlock, packets[i]));
}
......@@ -145,14 +145,14 @@ TEST_F(PrrtBlockTest, EncodeDecode)
uint32_t j = 0;
PrrtBlock_encode(encBlock, &base);
uint32_t pkt_count = List_count(encBlock->dataPackets);
uint32_t pkt_count = encBlock->dataPacketCount;
for(j = 0; j < pkt_count; j++) {
PrrtPacket *data_pkt = PrrtBlock_get_first_data(encBlock);
PrrtPacket *data_pkt = PrrtBlock_get_next_data(encBlock);
ASSERT_EQ(refPackets[j]->sequenceNumber, data_pkt->sequenceNumber);
}
uint32_t red_count = List_count(encBlock->redundancyPackets);
uint32_t red_count = encBlock->redPacketCount;
for(j = 0; j < red_count; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(encBlock);
PrrtPacket *red_pkt = PrrtBlock_get_next_red_data(encBlock);
redPackets[j] = red_pkt;
}
......@@ -169,7 +169,7 @@ TEST_F(PrrtBlockTest, EncodeDecode)
PrrtBlock_decode(decBlock);
for(int k = 0; k < 4; ++k) {
PrrtPacket *ptr = PrrtBlock_get_first_data(decBlock);
PrrtPacket *ptr = PrrtBlock_get_next_data(decBlock);
prrtIndex_t idx = static_cast<prrtIndex_t>(ptr->sequenceNumber - 1);
const char *s1 = (const char *) ((char *) refPackets[idx]->payload + PRRT_PACKET_DATA_HEADER_SIZE);
......@@ -223,12 +223,12 @@ TEST_F(PrrtBlockTest, NoCodingConfigured)
uint32_t j = 0;
PrrtBlock_encode(encBlock, &base);
uint32_t pkt_count = List_count(encBlock->dataPackets);
uint32_t pkt_count = encBlock->dataPacketCount;
for(j = 0; j < pkt_count; j++) {
PrrtPacket *data_pkt = PrrtBlock_get_first_data(encBlock);
PrrtPacket *data_pkt = PrrtBlock_get_next_data(encBlock);
ASSERT_EQ(refPackets[j]->sequenceNumber, data_pkt->sequenceNumber);
}
uint32_t red_count = List_count(encBlock->redundancyPackets);
uint32_t red_count = encBlock->redPacketCount;
ASSERT_EQ(red_count, 0);
for (int i = 0; i < 1; ++i) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment