Commit 06862cde authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Fix memory leaks. Proper return types.

parent 5b8516a6
...@@ -16,6 +16,7 @@ void gather_redundancy_packets(const PrrtBlock *block_ptr, gf *const *fec, int * ...@@ -16,6 +16,7 @@ void gather_redundancy_packets(const PrrtBlock *block_ptr, gf *const *fec, int *
PrrtPacket* packet = List_shift(block_ptr->redundancy_blocks); PrrtPacket* packet = List_shift(block_ptr->redundancy_blocks);
PrrtPacket_copy_payload_to_buffer(fec[m+i], packet, PRRT_PACKET_REDUNDANCY_HEADER_SIZE); PrrtPacket_copy_payload_to_buffer(fec[m+i], packet, PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
idx_p[m+i] = packet->index; idx_p[m+i] = packet->index;
PrrtPacket_destroy(packet);
} }
} }
...@@ -26,6 +27,7 @@ void gather_data_packets(PrrtBlock *block_ptr, gf *const *fec, int *idx_p) { ...@@ -26,6 +27,7 @@ void gather_data_packets(PrrtBlock *block_ptr, gf *const *fec, int *idx_p) {
PrrtPacket* packet = List_shift(block_ptr->data_blocks); PrrtPacket* packet = List_shift(block_ptr->data_blocks);
PrrtPacket_copy_payload_to_buffer(fec[m], packet, PRRT_PACKET_DATA_HEADER_SIZE); PrrtPacket_copy_payload_to_buffer(fec[m], packet, PRRT_PACKET_DATA_HEADER_SIZE);
idx_p[m] = packet->index; idx_p[m] = packet->index;
PrrtPacket_destroy(packet);
} }
} }
...@@ -37,7 +39,19 @@ void clear_list(gf *const *src, uint8_t k) { ...@@ -37,7 +39,19 @@ void clear_list(gf *const *src, uint8_t k) {
} }
void PrrtBlock_free(PrrtBlock **mblock) { void PrrtBlock_free(PrrtBlock **mblock) {
while(List_count((*mblock)->data_blocks) > 0){
PrrtPacket* pkt = List_shift((*mblock)->data_blocks);
PrrtPacket_destroy(pkt);
}
while(List_count((*mblock)->redundancy_blocks) > 0){
PrrtPacket* pkt = List_shift((*mblock)->redundancy_blocks);
PrrtPacket_destroy(pkt);
}
List_destroy((*mblock)->data_blocks);
List_destroy((*mblock)->redundancy_blocks);
free(*mblock);
} }
int PrrtBlock_alloc(PrrtBlock **mblock, const PrrtCodingParams *cpar, uint16_t base_seqno) { int PrrtBlock_alloc(PrrtBlock **mblock, const PrrtCodingParams *cpar, uint16_t base_seqno) {
...@@ -83,7 +97,7 @@ PrrtPacket *PrrtBlock_get_first_data(PrrtBlock *block_ptr) { ...@@ -83,7 +97,7 @@ PrrtPacket *PrrtBlock_get_first_data(PrrtBlock *block_ptr) {
} }
void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno) { void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno) {
int j = 0, m = 0; int j = 0;
uint8_t k = block_ptr->coding_params.k; uint8_t k = block_ptr->coding_params.k;
uint8_t n = block_ptr->coding_params.n; uint8_t n = block_ptr->coding_params.n;
uint8_t r = block_ptr->coding_params.r; uint8_t r = block_ptr->coding_params.r;
...@@ -93,11 +107,10 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno) { ...@@ -93,11 +107,10 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno) {
PrrtCoder_get_coder(&coder, n, k); PrrtCoder_get_coder(&coder, n, k);
gf** src = calloc(1, sizeof(gf*) * k); gf** src = calloc(k, sizeof(gf*));
LIST_FOREACH(block_ptr->data_blocks, first, next, cur) { LIST_FOREACH(block_ptr->data_blocks, first, next, cur) {
src[j] = calloc(1, sizeof(gf) * length); src[j] = calloc(length, sizeof(gf));
memset(src[j], 0, sizeof(gf) * length);
PrrtPacket* pkt = cur->value; PrrtPacket* pkt = cur->value;
pkt->index = (uint8_t) j; pkt->index = (uint8_t) j;
PrrtPacket_copy_payload_to_buffer(src[j], pkt, PRRT_PACKET_DATA_HEADER_SIZE); PrrtPacket_copy_payload_to_buffer(src[j], pkt, PRRT_PACKET_DATA_HEADER_SIZE);
...@@ -109,8 +122,7 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno) { ...@@ -109,8 +122,7 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno) {
gf** fec = calloc(1, sizeof(gf*) * r); gf** fec = calloc(1, sizeof(gf*) * r);
for(j = 0; j < r; j++) { for(j = 0; j < r; j++) {
fec[j] = calloc(1, sizeof(gf) * length); fec[j] = calloc(length, sizeof(gf));
memset(fec[j], 0, sizeof(gf) * length);
PrrtCoder_encode(coder, src, fec[j], j+k, length); // gf **src, gf *fec, int index, int sz PrrtCoder_encode(coder, src, fec[j], j+k, length); // gf **src, gf *fec, int index, int sz
PrrtPacket* red_packet_ptr = PrrtPacket_create_redundancy_packet(0, (void*) fec[j], length, *seqno, PrrtPacket* red_packet_ptr = PrrtPacket_create_redundancy_packet(0, (void*) fec[j], length, *seqno,
(uint8_t) (k + j), base_seqno, block_ptr->coding_params); (uint8_t) (k + j), base_seqno, block_ptr->coding_params);
...@@ -120,6 +132,8 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno) { ...@@ -120,6 +132,8 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno) {
block_ptr->is_coded = TRUE; block_ptr->is_coded = TRUE;
PrrtCoder_destroy(coder);
clear_list(fec, r); clear_list(fec, r);
free(fec); free(fec);
......
...@@ -5,17 +5,18 @@ ...@@ -5,17 +5,18 @@
#include <src/defines.h> #include <src/defines.h>
#include <src/prrt/socket.h> #include <src/prrt/socket.h>
#include <src/prrt/block.h> #include <src/prrt/block.h>
#include <src/util/dbg.h>
#include <src/util/common.h>
#include "data_transmitter.h" #include "data_transmitter.h"
int send_packet(PrrtSocket *sock_ptr, PrrtPacket *data_pkt) { int send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
int err = 0;
uint8_t buf[MAX_PAYLOAD_LENGTH]; uint8_t buf[MAX_PAYLOAD_LENGTH];
uint32_t length = PrrtPacket_size(data_pkt); memset(buf, 0, sizeof(buf));
if (PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, data_pkt) < 0) { uint32_t length = PrrtPacket_size(packet);
perror("BUF too small.");
exit(0); int encodeResult = PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet);
} check(encodeResult >= 0, "Buffer too small.");
// SENDING TO ALL RECEIVERS // SENDING TO ALL RECEIVERS
int i; int i;
...@@ -32,15 +33,17 @@ int send_packet(PrrtSocket *sock_ptr, PrrtPacket *data_pkt) { ...@@ -32,15 +33,17 @@ int send_packet(PrrtSocket *sock_ptr, PrrtPacket *data_pkt) {
hp = gethostbyname(recv.host_name); hp = gethostbyname(recv.host_name);
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length); memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
if ((sendto(sock_ptr->dataSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) < 0)) { ssize_t sendtoRes = sendto(sock_ptr->dataSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr));
perror("sendto failed"); check(sendtoRes >= 0, "Sendto failed.")
exit(1);
}
} }
PrrtPacket_destroy(data_pkt); PrrtPacket_destroy(packet);
return EXIT_SUCCESS;
return err; error:
PERROR("Something is wrong.", "")
return EXIT_FAILURE;
} }
void *send_data_loop(void *ptr) { void *send_data_loop(void *ptr) {
...@@ -49,7 +52,6 @@ void *send_data_loop(void *ptr) { ...@@ -49,7 +52,6 @@ void *send_data_loop(void *ptr) {
PrrtBlock *block = NULL; PrrtBlock *block = NULL;
PrrtCodingParams *cpar = calloc(1, sizeof(PrrtCodingParams)); PrrtCodingParams *cpar = calloc(1, sizeof(PrrtCodingParams));
PrrtCodingParams_init(cpar); PrrtCodingParams_init(cpar);
PrrtBlock_alloc(&block, cpar, sock_ptr->sequenceNumberRedundancy);
while (1) { while (1) {
pthread_mutex_lock(&sock_ptr->outQueueFilledMutex); pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
...@@ -57,6 +59,10 @@ void *send_data_loop(void *ptr) { ...@@ -57,6 +59,10 @@ void *send_data_loop(void *ptr) {
pthread_cond_wait(&sock_ptr->outQueueFilledCv, &sock_ptr->outQueueFilledMutex); pthread_cond_wait(&sock_ptr->outQueueFilledCv, &sock_ptr->outQueueFilledMutex);
} }
if(block == NULL) {
PrrtBlock_alloc(&block, cpar, sock_ptr->sequenceNumberRedundancy);
}
PrrtPacket *packet = List_shift(sock_ptr->outQueue); PrrtPacket *packet = List_shift(sock_ptr->outQueue);
PrrtBlock_insert_data_packet(block, packet); PrrtBlock_insert_data_packet(block, packet);
...@@ -65,15 +71,20 @@ void *send_data_loop(void *ptr) { ...@@ -65,15 +71,20 @@ void *send_data_loop(void *ptr) {
int j = 0; int j = 0;
PrrtBlock_encode(block, &sock_ptr->sequenceNumberRedundancy); PrrtBlock_encode(block, &sock_ptr->sequenceNumberRedundancy);
for (j = 0; j < List_count(block->redundancy_blocks); j++) { uint32_t redundancyBlocks = List_count(block->redundancy_blocks);
for (j = 0; j < redundancyBlocks; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block); PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
send_packet(sock_ptr, red_pkt); send_packet(sock_ptr, red_pkt);
} }
for (j = 0; j < List_count(block->data_blocks); j++) { uint32_t dataBlocks = List_count(block->data_blocks);
for (j = 0; j < dataBlocks; j++) {
PrrtPacket *data_pkt = PrrtBlock_get_first_data(block); PrrtPacket *data_pkt = PrrtBlock_get_first_data(block);
send_packet(sock_ptr, data_pkt); send_packet(sock_ptr, data_pkt);
} }
PrrtBlock_free(&block);
block = NULL;
} }
pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex); pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
......
...@@ -110,23 +110,40 @@ uint32_t PrrtSocket_recv(const PrrtSocket *sock_ptr, void *buf_ptr) { ...@@ -110,23 +110,40 @@ uint32_t PrrtSocket_recv(const PrrtSocket *sock_ptr, void *buf_ptr) {
uint32_t len = packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE; uint32_t len = packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE;
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE); PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
PrrtPacket_destroy(packet);
pthread_mutex_unlock(&t); pthread_mutex_unlock(&t);
return len; return len;
} }
} }
int PrrtSocket_close(const PrrtSocket *sock_ptr) { int PrrtSocket_close(PrrtSocket *sock_ptr) {
if(sock_ptr->dataStore != NULL) { if(sock_ptr->dataStore != NULL) {
BPTree_destroy(sock_ptr->dataStore); sock_ptr->dataStore = BPTree_destroy(sock_ptr->dataStore);
}
if(sock_ptr->blockStore != NULL) {
sock_ptr->blockStore = BPTree_destroy(sock_ptr->blockStore);
} }
void **res = NULL;
pthread_join(sock_ptr->sendThread, res);
// TODO: shut down threads; // TODO: shut down threads;
// TODO: clean up all receivers // TODO: clean up all receivers
pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex); if(sock_ptr->outQueue != NULL) {
pthread_cond_destroy(&sock_ptr->outQueueFilledCv); pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex);
List_destroy(sock_ptr->outQueue); pthread_cond_destroy(&sock_ptr->outQueueFilledCv);
List_destroy(sock_ptr->outQueue);
}
if(sock_ptr->inQueue != NULL) {
pthread_mutex_destroy(&sock_ptr->inQueueFilledMutex);
pthread_cond_destroy(&sock_ptr->inQueueFilledMutexCv);
List_destroy(sock_ptr->inQueue);
}
close(sock_ptr->dataSocketFd); close(sock_ptr->dataSocketFd);
close(sock_ptr->feedbackSocketFd); close(sock_ptr->feedbackSocketFd);
......
...@@ -44,7 +44,7 @@ typedef struct { ...@@ -44,7 +44,7 @@ typedef struct {
int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t is_sender); int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t is_sender);
int PrrtSocket_close(const PrrtSocket *sock_ptr); int PrrtSocket_close(PrrtSocket *sock_ptr);
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port); int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port);
int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len); int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len);
uint32_t PrrtSocket_recv(const PrrtSocket *sock_ptr, void *buf_ptr); uint32_t PrrtSocket_recv(const PrrtSocket *sock_ptr, void *buf_ptr);
......
...@@ -48,7 +48,7 @@ int main(int argc, char* const argv) { ...@@ -48,7 +48,7 @@ int main(int argc, char* const argv) {
free(line); free(line);
usleep(1000*1000); usleep(1000*1000*4);
PrrtSocket_close(&sock); PrrtSocket_close(&sock);
printf("COMPLETELY CLOSED\n"); printf("COMPLETELY CLOSED\n");
return 0; return 0;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment