Commit 4f4bfbd4 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Fix memory leak due to coder, feedback packets, blocks and coding params.

parent 11912f7f
......@@ -8,26 +8,26 @@
void gather_redundancy_packets(const PrrtBlock *block_ptr, gf *const *fec, int *idx_p) {
int i;
int m = List_count(block_ptr->data_blocks);
int m = List_count(block_ptr->dataPackets);
uint32_t redundancyBlocks = List_count(block_ptr->redundancy_blocks);
uint32_t redundancyBlocks = List_count(block_ptr->redundancyPackets);
for (i = 0; i < redundancyBlocks; i++) {
PrrtPacket* packet = List_shift(block_ptr->redundancy_blocks);
PrrtPacket* packet = List_shift(block_ptr->redundancyPackets);
PrrtPacket_copy_payload_to_buffer(fec[m+i], packet, PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
idx_p[m+i] = packet->index;
// TODO: PrrtPacket_destroy(packet);
PrrtPacket_destroy(packet);
}
}
void gather_data_packets(PrrtBlock *block_ptr, gf *const *fec, int *idx_p) {
int m = 0;
uint32_t dataBlocks = List_count(block_ptr->data_blocks);
uint32_t dataBlocks = List_count(block_ptr->dataPackets);
for (m = 0; m < dataBlocks; m++) {
PrrtPacket* packet = List_shift(block_ptr->data_blocks);
PrrtPacket* packet = List_shift(block_ptr->dataPackets);
PrrtPacket_copy_payload_to_buffer(fec[m], packet, PRRT_PACKET_DATA_HEADER_SIZE);
idx_p[m] = packet->index;
// TODO: PrrtPacket_destroy(packet);
PrrtPacket_destroy(packet);
}
}
......@@ -38,70 +38,65 @@ void clear_list(gf *const *src, uint8_t k) {
}
}
void PrrtBlock_free(PrrtBlock **mblock) {
while(List_count((*mblock)->data_blocks) > 0){
PrrtPacket* pkt = List_shift((*mblock)->data_blocks);
void PrrtBlock_free(PrrtBlock *mblock) {
while(List_count(mblock->dataPackets) > 0){
PrrtPacket* pkt = List_shift(mblock->dataPackets);
PrrtPacket_destroy(pkt);
}
while(List_count((*mblock)->redundancy_blocks) > 0){
PrrtPacket* pkt = List_shift((*mblock)->redundancy_blocks);
while(List_count(mblock->redundancyPackets) > 0){
PrrtPacket* pkt = List_shift(mblock->redundancyPackets);
PrrtPacket_destroy(pkt);
}
List_destroy((*mblock)->data_blocks);
List_destroy((*mblock)->redundancy_blocks);
free(*mblock);
List_destroy(mblock->dataPackets);
List_destroy(mblock->redundancyPackets);
free(mblock);
}
int PrrtBlock_alloc(PrrtBlock **mblock, const PrrtCodingParams *cpar, uint16_t base_seqno) {
*mblock = calloc(1, sizeof(PrrtBlock));
check_mem(mblock);
(*mblock)->coding_params = *cpar;
(*mblock)->data_blocks =List_create();
(*mblock)->redundancy_blocks =List_create();
(*mblock)->base_seqno = base_seqno;
(*mblock)->largest_data_length = 0;
int PrrtBlock_alloc(PrrtBlock *mblock, const PrrtCodingParams *cpar, uint16_t base_seqno) {
mblock->codingParams = *cpar;
mblock->dataPackets =List_create();
mblock->redundancyPackets =List_create();
mblock->baseSequenceNumber = base_seqno;
mblock->largestDataLength = 0;
return 0;
error:
PNOTIMPLEMENTED("TODO");
}
int PrrtBlock_insert_data_packet(PrrtBlock *block_ptr, const PrrtPacket *packet_ptr) {
check(List_count(block_ptr->data_blocks) < block_ptr->coding_params.k, "Inserting an unnecessary item.");
List_push(block_ptr->data_blocks, packet_ptr);
block_ptr->largest_data_length = MAX(block_ptr->largest_data_length, packet_ptr->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
check(List_count(block_ptr->dataPackets) < block_ptr->codingParams.k, "Inserting an unnecessary item.");
List_push(block_ptr->dataPackets, packet_ptr);
block_ptr->largestDataLength = MAX(block_ptr->largestDataLength, packet_ptr->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
return 0;
error:
PNOTIMPLEMENTED("HANDLING MISSING");
}
int PrrtBlock_insert_redundancy_packet(PrrtBlock *block_ptr, const PrrtPacket *ptr) {
List_push(block_ptr->redundancy_blocks, ptr);
block_ptr->largest_data_length = (uint32_t) MAX(block_ptr->largest_data_length, ptr->payload_len - PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
List_push(block_ptr->redundancyPackets, ptr);
block_ptr->largestDataLength = (uint32_t) MAX(block_ptr->largestDataLength, ptr->payload_len - PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
return 0;
}
int PrrtBlock_encode_ready(const PrrtBlock *block_ptr) {
return (List_count(block_ptr->data_blocks) == block_ptr->coding_params.k) ? TRUE : FALSE;
return (List_count(block_ptr->dataPackets) == block_ptr->codingParams.k) ? TRUE : FALSE;
}
int PrrtBlock_decode_ready(const PrrtBlock *block_ptr) {
return (List_count(block_ptr->data_blocks) + List_count(block_ptr->redundancy_blocks) == block_ptr->coding_params.k) ? TRUE : FALSE;
return (List_count(block_ptr->dataPackets) + List_count(block_ptr->redundancyPackets) == block_ptr->codingParams.k) ? TRUE : FALSE;
}
PrrtPacket *PrrtBlock_get_first_data(PrrtBlock *block_ptr) {
return List_shift(block_ptr->data_blocks);
return List_shift(block_ptr->dataPackets);
}
void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno) {
int j = 0;
uint8_t k = block_ptr->coding_params.k;
uint8_t n = block_ptr->coding_params.n;
uint8_t r = block_ptr->coding_params.r;
uint32_t length = block_ptr->largest_data_length;
uint8_t k = block_ptr->codingParams.k;
uint8_t n = block_ptr->codingParams.n;
uint8_t r = block_ptr->codingParams.r;
uint32_t length = block_ptr->largestDataLength;
PrrtCoder* coder = NULL;
......@@ -109,7 +104,7 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno) {
gf** src = calloc(k, sizeof(gf*));
LIST_FOREACH(block_ptr->data_blocks, first, next, cur) {
LIST_FOREACH(block_ptr->dataPackets, first, next, cur) {
src[j] = calloc(length, sizeof(gf));
PrrtPacket* pkt = cur->value;
pkt->index = (uint8_t) j;
......@@ -117,7 +112,7 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno) {
j++;
}
PrrtPacket *first_packet = (PrrtPacket*) block_ptr->data_blocks->first->value;
PrrtPacket *first_packet = (PrrtPacket*) block_ptr->dataPackets->first->value;
uint16_t base_seqno = first_packet->seqno;
gf** fec = calloc(1, sizeof(gf*) * r);
......@@ -125,12 +120,12 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno) {
fec[j] = calloc(length, sizeof(gf));
PrrtCoder_encode(coder, src, fec[j], j+k, length); // gf **src, gf *fec, int index, int sz
PrrtPacket* red_packet_ptr = PrrtPacket_create_redundancy_packet(0, (void*) fec[j], length, *seqno,
(uint8_t) (k + j), base_seqno, block_ptr->coding_params);
(uint8_t) (k + j), base_seqno, block_ptr->codingParams);
*seqno = (uint16_t) (*seqno + 1 % SEQNO_SPACE);
PrrtBlock_insert_redundancy_packet(block_ptr, red_packet_ptr);
}
block_ptr->is_coded = TRUE;
block_ptr->isCoded = TRUE;
PrrtCoder_destroy(coder);
......@@ -142,11 +137,11 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno) {
}
void PrrtBlock_decode(PrrtBlock *block_ptr) {
int i, j;
uint8_t n = block_ptr->coding_params.n;
uint8_t k = block_ptr->coding_params.k;
int i, j = 0;
uint8_t n = block_ptr->codingParams.n;
uint8_t k = block_ptr->codingParams.k;
uint32_t length = block_ptr->largest_data_length;
uint32_t length = block_ptr->largestDataLength;
PrrtCoder* coder = NULL;
......@@ -154,7 +149,7 @@ void PrrtBlock_decode(PrrtBlock *block_ptr) {
gf** fec = calloc(n, sizeof(gf*));
for (i = 0; i < n; i++) {
fec[i] = calloc(block_ptr->largest_data_length, sizeof(gf));
fec[i] = calloc(block_ptr->largestDataLength, sizeof(gf));
}
int *idx_p = calloc(k,sizeof(int));
......@@ -166,15 +161,17 @@ void PrrtBlock_decode(PrrtBlock *block_ptr) {
for(j = 0; j < k; j++) {
PrrtPacket *packet = calloc(1, sizeof(PrrtPacket));
PrrtPacket_create_data_packet(packet, 0, fec[j], length, block_ptr->base_seqno + j);
PrrtPacket_create_data_packet(packet, 0, fec[j], length, block_ptr->baseSequenceNumber + j);
PrrtBlock_insert_data_packet(block_ptr, packet);
}
PrrtCoder_destroy(coder);
clear_list(fec, n);
free(fec);
free(idx_p);
}
PrrtPacket *PrrtBlock_get_first_red_data(PrrtBlock *block_ptr) {
return List_shift(block_ptr->redundancy_blocks);
return List_shift(block_ptr->redundancyPackets);
}
\ No newline at end of file
......@@ -6,24 +6,24 @@
#include <src/util/list.h>
typedef struct {
PrrtCodingParams coding_params;
uint32_t largest_data_length;
uint16_t base_seqno;
List* data_blocks;
List* redundancy_blocks;
uint8_t is_coded;
PrrtCodingParams codingParams;
uint32_t largestDataLength;
uint16_t baseSequenceNumber;
List*dataPackets;
List*redundancyPackets;
uint8_t isCoded;
} PrrtBlock;
/**
* Allocate space for a block.
*/
int PrrtBlock_alloc(PrrtBlock **mblock, const PrrtCodingParams *cpar, uint16_t base_seqno);
int PrrtBlock_alloc(PrrtBlock *mblock, const PrrtCodingParams *cpar, uint16_t base_seqno);
/**
* Frees the PrrtBlock data structure.
*/
void PrrtBlock_free(PrrtBlock **mblock);
void PrrtBlock_free(PrrtBlock *mblock);
int PrrtBlock_insert_data_packet(PrrtBlock *block_ptr, const PrrtPacket *packet_ptr);
int PrrtBlock_insert_redundancy_packet(PrrtBlock *block_ptr, const PrrtPacket *packet_ptr);
......
......@@ -16,25 +16,33 @@ void retrieve_data_blocks(PrrtSocket *sock_ptr, uint16_t base_seqno, uint8_t k,
PrrtBlock_insert_data_packet((PrrtBlock *) block, packetPtr);
sock_ptr->dataStore = BPTree_delete(sock_ptr->dataStore, packetPtr->seqno);
}
List_destroy(res);
}
void decode_block(PrrtSocket *sock_ptr, uint16_t base_seqno) {
PrrtBlock *block = BPTree_get(sock_ptr->blockStore, base_seqno);
if (block != NULL && PrrtBlock_decode_ready(block)) {
retrieve_data_blocks(sock_ptr, base_seqno, block->coding_params.k, block);
retrieve_data_blocks(sock_ptr, base_seqno, block->codingParams.k, block);
PrrtBlock_decode(block);
LIST_FOREACH(block->data_blocks, first, next, cur) {
PrrtPacket *pkt = cur->value;
while(List_count(block->dataPackets) > 0) {
PrrtPacket *pkt = List_shift(block->dataPackets);
if (PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, pkt->seqno)) {
pthread_mutex_lock(&sock_ptr->inQueueFilledMutex);
List_push(sock_ptr->inQueue, pkt);
pthread_cond_signal(&sock_ptr->inQueueFilledMutexCv);
pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
} else {
PrrtPacket_destroy(pkt);
}
}
PrrtBlock_free(block);
sock_ptr->blockStore = BPTree_delete(sock_ptr->blockStore, base_seqno);
}
}
......@@ -53,6 +61,7 @@ int send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote) {
PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 7, 3, 50, 4, 6, 8, 9, 5, 1);
uint8_t buf[MAX_PAYLOAD_LENGTH];
memset(buf, 0, MAX_PAYLOAD_LENGTH);
uint32_t length = PrrtPacket_size(feedback_pkt_ptr);
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr) == EXIT_SUCCESS,
......@@ -60,6 +69,8 @@ int send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote) {
check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
length, "Sending feedback failed.");
PrrtPacket_destroy(feedback_pkt_ptr);
return EXIT_SUCCESS;
error:
......@@ -79,7 +90,6 @@ void * receive_data_loop(void *ptr) {
while (1) {
memset(buffer, 0, MAX_PAYLOAD_LENGTH);
n = recvfrom(sock_ptr->dataSocketFd, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen);
printf("RECEIVED: %d\n", n);
check(send_feedback(sock_ptr, remote) == EXIT_SUCCESS, "Sending feedback failed.");
packet = calloc(1, sizeof(PrrtPacket));
......@@ -92,6 +102,7 @@ void * receive_data_loop(void *ptr) {
if (PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, packet->seqno) ==
FALSE) {
PrrtPacket_destroy(packet);
break;
}
......@@ -128,7 +139,11 @@ void * receive_data_loop(void *ptr) {
PrrtCodingParams_init(cpar);
cpar->k = redundancyPayload->k;
cpar->n = redundancyPayload->n;
PrrtBlock_alloc(&block, cpar, redundancyPayload->base_seqno);
block = calloc(1, sizeof(PrrtBlock));
check_mem(block);
PrrtBlock_alloc(block, cpar, redundancyPayload->base_seqno);
free(cpar);
sock_ptr->blockStore = BPTree_insert(sock_ptr->blockStore, redundancyPayload->base_seqno, block);
}
......@@ -136,7 +151,8 @@ void * receive_data_loop(void *ptr) {
decode_block(sock_ptr, redundancyPayload->base_seqno);
break;
default:
//PrrtPacket_print(packet);
PrrtPacket_print(packet);
PrrtPacket_destroy(packet);
break;
}
}
......
......@@ -57,7 +57,7 @@ void * send_data_loop(void *ptr) {
pthread_mutex_lock(&sock_ptr->closingMutex);
if(sock_ptr->closing) {
free(cpar);
PrrtBlock_free(&block);
PrrtBlock_free(block);
block = NULL;
pthread_mutex_unlock(&sock_ptr->closingMutex);
......@@ -69,7 +69,10 @@ void * send_data_loop(void *ptr) {
}
if(block == NULL) {
PrrtBlock_alloc(&block, cpar, sock_ptr->sequenceNumberRedundancy);
block = calloc(1, sizeof(PrrtBlock));
check_mem(block);
PrrtBlock_alloc(block, cpar, sock_ptr->sequenceNumberRedundancy);
}
PrrtPacket *packet = List_shift(sock_ptr->outQueue);
......@@ -80,23 +83,27 @@ void * send_data_loop(void *ptr) {
int j = 0;
PrrtBlock_encode(block, &sock_ptr->sequenceNumberRedundancy);
uint32_t redundancyBlocks = List_count(block->redundancy_blocks);
uint32_t redundancyBlocks = List_count(block->redundancyPackets);
for (j = 0; j < redundancyBlocks; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
send_packet(sock_ptr, red_pkt);
}
uint32_t dataBlocks = List_count(block->data_blocks);
uint32_t dataBlocks = List_count(block->dataPackets);
for (j = 0; j < dataBlocks; j++) {
PrrtPacket *data_pkt = PrrtBlock_get_first_data(block);
send_packet(sock_ptr, data_pkt);
}
PrrtBlock_free(&block);
PrrtBlock_free(block);
block = NULL;
}
pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
usleep(1);
}
error:
PERROR("FAIL");
return NULL;
}
......@@ -12,6 +12,7 @@
#include <src/prrt/processes/data_receiver.h>
#include <src/util/dbg.h>
#include "socket.h"
#include "block.h"
int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t is_sender) {
......@@ -108,7 +109,6 @@ int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
pthread_mutex_lock(&sock_ptr->closingMutex);
if(sock_ptr->closing) {
// TODO: proper close
printf("CLOSE PROCESS");
pthread_mutex_unlock(&sock_ptr->closingMutex);
pthread_mutex_unlock(&t);
return -1;
......@@ -171,10 +171,28 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) {
pthread_mutex_unlock(&sock_ptr->closingMutex);
if(sock_ptr->dataStore != NULL) {
List* dataList = List_create();
BPTree_get_range(sock_ptr->dataStore, dataList, 0, SEQNO_SPACE-1);
while(List_count(dataList) > 0) {
PrrtPacket* packet = List_shift(dataList);
PrrtPacket_destroy(packet);
}
List_destroy(dataList);
sock_ptr->dataStore = BPTree_destroy(sock_ptr->dataStore);
}
if(sock_ptr->blockStore != NULL) {
List* blockList = List_create();
BPTree_get_range(sock_ptr->blockStore, blockList, 0, SEQNO_SPACE-1);
while(List_count(blockList) > 0) {
PrrtBlock* block = List_shift(blockList);
PrrtBlock_free(block);
}
List_destroy(blockList);
sock_ptr->blockStore = BPTree_destroy(sock_ptr->blockStore);
}
......
......@@ -9,12 +9,12 @@ extern "C" {
class PrrtBlockTest : public ::testing::Test {
protected:
virtual void SetUp() {
encBlock = NULL;
decBlock = NULL;
encBlock = (PrrtBlock *) calloc(1, sizeof(PrrtBlock));
decBlock = (PrrtBlock *) calloc(1, sizeof(PrrtBlock));
PrrtCodingParams *cpar = (PrrtCodingParams *) calloc(1, sizeof(PrrtCodingParams));
PrrtCodingParams_init(cpar);
PrrtBlock_alloc(&encBlock, cpar, 0);
PrrtBlock_alloc(&decBlock, cpar, 0);
PrrtBlock_alloc(encBlock, cpar, 0);
PrrtBlock_alloc(decBlock, cpar, 0);
}
PrrtBlock *encBlock;
PrrtBlock *decBlock;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment