Commit e314534f authored by Andreas Schmidt's avatar Andreas Schmidt

Block is now threadsafe.

parent f17b8b5f
Pipeline #964 failed with stages
in 55 seconds
......@@ -5,7 +5,6 @@
#include "../util/common.h"
#include "packet.h"
#include "block.h"
#include "codingParams.h"
static void gather_redundancy_packets(const PrrtBlock *block_ptr, gf *const *fec, int *idx_p)
{
......@@ -44,45 +43,59 @@ static void clear_list(gf *const *src, uint8_t k)
}
}
void PrrtBlock_destroy(PrrtBlock *mblock)
bool PrrtBlock_destroy(PrrtBlock *block_ptr)
{
while(List_count(mblock->dataPackets) > 0) {
PrrtPacket *pkt = List_shift(mblock->dataPackets);
check(pthread_mutex_destroy(&block_ptr->lock) == 0, "Mutex init failed.");
while(List_count(block_ptr->dataPackets) > 0) {
PrrtPacket *pkt = List_shift(block_ptr->dataPackets);
PrrtPacket_destroy(pkt);
}
while(List_count(mblock->redundancyPackets) > 0) {
PrrtPacket *pkt = List_shift(mblock->redundancyPackets);
while(List_count(block_ptr->redundancyPackets) > 0) {
PrrtPacket *pkt = List_shift(block_ptr->redundancyPackets);
PrrtPacket_destroy(pkt);
}
List_destroy(mblock->dataPackets);
List_destroy(mblock->redundancyPackets);
free(mblock);
List_destroy(block_ptr->dataPackets);
List_destroy(block_ptr->redundancyPackets);
free(block_ptr);
return true;
error:
PERROR("FAILED");
return false;
}
PrrtBlock * PrrtBlock_create(const PrrtCodingParams *cpar, prrtSequenceNumber_t baseSequenceNumber)
{
PrrtBlock *block = calloc(1, sizeof(PrrtBlock));
check_mem(block);
PrrtBlock *block_ptr = calloc(1, sizeof(PrrtBlock));
check_mem(block_ptr);
block->codingParams = *cpar;
block->dataPackets = List_create();
block->redundancyPackets = List_create();
block->baseSequenceNumber = baseSequenceNumber;
block->largestDataLength = 0;
block_ptr->codingParams = *cpar;
block_ptr->dataPackets = List_create();
block_ptr->redundancyPackets = List_create();
block_ptr->baseSequenceNumber = baseSequenceNumber;
block_ptr->largestDataLength = 0;
return block;
pthread_mutexattr_t attr;
check(pthread_mutexattr_init(&attr) == EXIT_SUCCESS, "Mutex attr init failed.");
check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == EXIT_SUCCESS, "Setting type failed.");
check(pthread_mutex_init(&block_ptr->lock, &attr) == 0, "Mutex init failed.");
return block_ptr;
error:
PERROR("Memory issue.%s","");
return NULL;
}
bool PrrtBlock_insert_data_packet(PrrtBlock *prrtBlock, const PrrtPacket *prrtPacket)
bool PrrtBlock_insert_data_packet(PrrtBlock *block_ptr, const PrrtPacket *prrtPacket)
{
check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");
bool found = false;
LIST_FOREACH(prrtBlock->dataPackets, first, next, cur) {
LIST_FOREACH(block_ptr->dataPackets, first, next, cur) {
PrrtPacket *pkt = cur->value;
if(pkt->sequenceNumber == prrtPacket->sequenceNumber) {
found = true;
......@@ -90,52 +103,80 @@ bool PrrtBlock_insert_data_packet(PrrtBlock *prrtBlock, const PrrtPacket *prrtPa
}
if(found == false) {
List_push(prrtBlock->dataPackets, prrtPacket);
prrtBlock->largestDataLength = (prrtPacketLength_t) MAX(prrtBlock->largestDataLength,
List_push(block_ptr->dataPackets, prrtPacket);
block_ptr->largestDataLength = (prrtPacketLength_t) MAX(block_ptr->largestDataLength,
prrtPacket->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE);
return true;
} else {
return false;
}
check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");
return found == false;
error:
PERROR("Insert data failed.")
return false;
}
bool PrrtBlock_insert_redundancy_packet(PrrtBlock *block_ptr, const PrrtPacket *ptr)
{
bool found = false;
check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");
LIST_FOREACH(block_ptr->redundancyPackets, first, next, cur) {
PrrtPacket *pkt = cur->value;
if(pkt->sequenceNumber == ptr->sequenceNumber) {
found = true;
}
}
if(found == false) {
List_push(block_ptr->redundancyPackets, ptr);
block_ptr->largestDataLength = (prrtPacketLength_t) MAX(block_ptr->largestDataLength,
ptr->payloadLength - PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
return true;
} else {
return false;
ptr->payloadLength - PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
}
check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");
return found == false;
error:
PERROR("Insert data failed.")
return false;
}
bool PrrtBlock_encode_ready(const PrrtBlock *block_ptr)
bool PrrtBlock_encode_ready(PrrtBlock *block_ptr)
{
return (List_count(block_ptr->dataPackets) == block_ptr->codingParams.k);
check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");
bool res = List_count(block_ptr->dataPackets) == block_ptr->codingParams.k;
check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");
return res;
error:
PERROR("Encode ready failed.")
return false;
}
bool PrrtBlock_decode_ready(const PrrtBlock *block_ptr)
bool PrrtBlock_decode_ready(PrrtBlock *block_ptr)
{
return (List_count(block_ptr->dataPackets) + List_count(block_ptr->redundancyPackets) == block_ptr->codingParams.k);
check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");
bool res = (List_count(block_ptr->dataPackets) + List_count(block_ptr->redundancyPackets) == block_ptr->codingParams.k);
check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");
return res;
error:
PERROR("Decode ready failed.")
return false;
}
PrrtPacket *PrrtBlock_get_first_data(PrrtBlock *block_ptr)
{
return List_shift(block_ptr->dataPackets);
check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");
PrrtPacket *res = List_shift(block_ptr->dataPackets);
check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");
return res;
error:
PERROR("Get first data failed.")
return NULL;
}
void PrrtBlock_encode(PrrtBlock *block_ptr, prrtSequenceNumber_t *seqno)
{
check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");
int j = 0;
uint8_t k = block_ptr->codingParams.k;
uint8_t n = block_ptr->codingParams.n;
......@@ -181,6 +222,7 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, prrtSequenceNumber_t *seqno)
clear_list(src, k);
free(src);
check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");
return;
......@@ -193,8 +235,10 @@ bool PrrtBlock_decode(PrrtBlock *block_ptr)
int i, j = 0;
gf **fec = NULL;
int *idx_p = NULL;
uint8_t n = block_ptr->codingParams.n;
uint8_t k = block_ptr->codingParams.k;
uint8_t n = 0, k = 0;
check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");
n = block_ptr->codingParams.n;
k = block_ptr->codingParams.k;
prrtSequenceNumber_t baseSequenceNumber = block_ptr->baseSequenceNumber;
prrtPacketLength_t length = block_ptr->largestDataLength;
......@@ -233,6 +277,8 @@ bool PrrtBlock_decode(PrrtBlock *block_ptr)
clear_list(fec, k);
free(fec);
free(idx_p);
check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");
return true;
error:
......@@ -250,5 +296,12 @@ bool PrrtBlock_decode(PrrtBlock *block_ptr)
PrrtPacket *PrrtBlock_get_first_red_data(PrrtBlock *block_ptr)
{
return List_shift(block_ptr->redundancyPackets);
check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");
PrrtPacket *res = (PrrtPacket *) List_shift(block_ptr->redundancyPackets);
check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");
return res;
error:
PERROR("Get first data failed.")
return NULL;
}
......@@ -12,6 +12,7 @@ typedef struct prrtBlock {
List*dataPackets;
List*redundancyPackets;
bool isCoded;
pthread_mutex_t lock;
} PrrtBlock;
......@@ -23,13 +24,13 @@ PrrtBlock * PrrtBlock_create(const PrrtCodingParams *cpar, prrtSequenceNumber_t
/**
* Frees the PrrtBlock data structure.
*/
void PrrtBlock_destroy(PrrtBlock *mblock);
bool PrrtBlock_destroy(PrrtBlock *block_ptr);
bool PrrtBlock_insert_data_packet(PrrtBlock *prrtBlock, const PrrtPacket *prrtPacket);
bool PrrtBlock_insert_data_packet(PrrtBlock *block_ptr, const PrrtPacket *prrtPacket);
bool PrrtBlock_insert_redundancy_packet(PrrtBlock *prrtBlock, const PrrtPacket *prrtPacket);
bool PrrtBlock_encode_ready(const PrrtBlock *block_ptr);
bool PrrtBlock_decode_ready(const PrrtBlock *block_ptr);
bool PrrtBlock_encode_ready(PrrtBlock *block_ptr);
bool PrrtBlock_decode_ready(PrrtBlock *block_ptr);
PrrtPacket *PrrtBlock_get_first_data(PrrtBlock *block_ptr);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment