Commit dda891c4 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Add error handling. Proper create/destroy.

* data_receiver.c
* block.c
* coding_params.c
* data_transmitter.c
* feedback_receiver.c
parent b6eae69c
Pipeline #65 failed with stage
......@@ -11,12 +11,11 @@ void gather_redundancy_packets(const PrrtBlock *block_ptr, gf *const *fec, int *
{
uint32_t i;
uint32_t m = 0;
PrrtPacket *packet = NULL;
uint32_t redundancyBlocks = List_count(block_ptr->redundancyPackets);
for(i = 0; i < redundancyBlocks; i++) {
packet = List_shift(block_ptr->redundancyPackets);
PrrtPacket *packet = List_shift(block_ptr->redundancyPackets);
while(idx_p[m] != -1) {
m++;
......@@ -62,15 +61,22 @@ void PrrtBlock_destroy(PrrtBlock *mblock)
free(mblock);
}
int PrrtBlock_create(PrrtBlock *mblock, const PrrtCodingParams *cpar, uint16_t base_seqno)
PrrtBlock * PrrtBlock_create(const PrrtCodingParams *cpar, uint16_t base_seqno)
{
mblock->codingParams = *cpar;
mblock->dataPackets = List_create();
mblock->redundancyPackets = List_create();
mblock->baseSequenceNumber = base_seqno;
mblock->largestDataLength = 0;
PrrtBlock *block = calloc(1, sizeof(PrrtBlock));
check_mem(block);
return 0;
block->codingParams = *cpar;
block->dataPackets = List_create();
block->redundancyPackets = List_create();
block->baseSequenceNumber = base_seqno;
block->largestDataLength = 0;
return block;
error:
PERROR("Memory issue.%s","");
return NULL;
}
bool PrrtBlock_insert_data_packet(PrrtBlock *prrtBlock, const PrrtPacket *prrtPacket)
......@@ -139,27 +145,28 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno)
PrrtCoder *coder = NULL;
PrrtCoder_get_coder(&coder, n, k);
check(PrrtCoder_get_coder(&coder, n, k) == EXIT_SUCCESS, "Getting coding failed.");
gf **src = calloc(k, sizeof(gf *));
check_mem(src);
LIST_FOREACH(block_ptr->dataPackets, first, next, cur) {
src[j] = calloc(length, sizeof(gf));
check_mem(src[j]);
PrrtPacket *pkt = cur->value;
pkt->index = (uint8_t) ((pkt->seqno - baseSequenceNumber) % SEQNO_SPACE);
PrrtPacket_copy_payload_to_buffer(src[j], pkt, PRRT_PACKET_DATA_HEADER_SIZE);
j++;
}
PrrtPacket *first_packet = (PrrtPacket *) block_ptr->dataPackets->first->value;
uint16_t base_seqno = first_packet->seqno;
gf **fec = calloc(1, sizeof(gf *) * r);
check_mem(fec);
for(j = 0; j < r; j++) {
fec[j] = calloc(length, sizeof(gf));
check_mem(fec[j]);
PrrtCoder_encode(coder, src, fec[j], j + k, length);
PrrtPacket *red_packet_ptr = PrrtPacket_create_redundancy_packet(0, (void *) fec[j], length, *seqno,
(uint8_t) (k + j), base_seqno,
(uint8_t) (k + j), block_ptr->baseSequenceNumber,
block_ptr->codingParams);
*seqno = (uint16_t) ((*seqno + 1) % SEQNO_SPACE);
PrrtBlock_insert_redundancy_packet(block_ptr, red_packet_ptr);
......@@ -174,13 +181,14 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno)
clear_list(src, k);
free(src);
error:
PERROR("Encoding failed%s.", "");
}
void PrrtBlock_decode(PrrtBlock *block_ptr)
bool PrrtBlock_decode(PrrtBlock *block_ptr)
{
int i, j = 0;
gf **fec = NULL;
int *idx_p = NULL;
uint8_t n = block_ptr->codingParams.n;
uint8_t k = block_ptr->codingParams.k;
uint16_t baseSequenceNumber = block_ptr->baseSequenceNumber;
......@@ -190,13 +198,13 @@ void PrrtBlock_decode(PrrtBlock *block_ptr)
PrrtCoder_get_coder(&coder, n, k);
fec = calloc(k, sizeof(gf *));
gf **fec = calloc(k, sizeof(gf *));
check_mem(fec);
for(i = 0; i < k; i++) {
fec[i] = calloc(block_ptr->largestDataLength, sizeof(gf));
}
idx_p = calloc(k, sizeof(int));
int *idx_p = calloc(k, sizeof(int));
check_mem(idx_p);
for(i = 0; i < k; i++) {
idx_p[i] = -1;
......@@ -205,20 +213,23 @@ void PrrtBlock_decode(PrrtBlock *block_ptr)
gather_data_packets(block_ptr, fec, idx_p);
gather_redundancy_packets(block_ptr, fec, idx_p);
int decodingRes = PrrtCoder_decode(coder, fec, idx_p, length);
check(decodingRes == EXIT_SUCCESS, "Decoding failed.");
check(PrrtCoder_decode(coder, fec, idx_p, length) == EXIT_SUCCESS, "Decoding failed.");
for(j = 0; j < k; j++) {
if(idx_p[j] >= k) {
PrrtPacket *packet = PrrtPacket_create_data_packet(0, fec[j], length, (uint16_t) (baseSequenceNumber + j));
bool insertRes = PrrtBlock_insert_data_packet(block_ptr, packet);
if(insertRes == false) {
if(PrrtBlock_insert_data_packet(block_ptr, packet) == false) {
debug("Tried to insert unnecessary packet.");
PrrtPacket_destroy(packet);
}
}
}
clear_list(fec, k);
free(fec);
free(idx_p);
return true;
error:
PrrtCoder_destroy(coder);
......@@ -229,6 +240,7 @@ void PrrtBlock_decode(PrrtBlock *block_ptr)
if(idx_p != NULL) {
free(idx_p);
}
return false;
}
PrrtPacket *PrrtBlock_get_first_red_data(PrrtBlock *block_ptr)
......
......@@ -18,7 +18,7 @@ typedef struct prrtBlock {
/**
* Allocate space for a block.
*/
int PrrtBlock_create(PrrtBlock *mblock, const PrrtCodingParams *cpar, uint16_t base_seqno);
PrrtBlock * PrrtBlock_create(const PrrtCodingParams *cpar, uint16_t base_seqno);
/**
* Frees the PrrtBlock data structure.
......@@ -34,7 +34,7 @@ bool PrrtBlock_decode_ready(const PrrtBlock *block_ptr);
PrrtPacket *PrrtBlock_get_first_data(PrrtBlock *block_ptr);
void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno);
void PrrtBlock_decode(PrrtBlock *block_ptr);
bool PrrtBlock_decode(PrrtBlock *block_ptr);
PrrtPacket *PrrtBlock_get_first_red_data(PrrtBlock *block_ptr);
......
#include "../util/common.h"
#include "../util/dbg.h"
#include "../defines.h"
#include "coding_params.h"
void PrrtCodingParams_init(PrrtCodingParams *cpar) {
PrrtCodingParams *PrrtCodingParams_create()
{
PrrtCodingParams *cpar = calloc(1, sizeof(PrrtCodingParams));
check_mem(cpar);
pthread_mutex_init(&cpar->lock, NULL);
cpar->k = K_START;
cpar->n = N_START;
cpar->r = cpar->n - cpar->k;
cpar->n_p = N_P_START;
return cpar;
error:
PERROR("Memory issue.%s","");
return NULL;
}
bool PrrtCodingParams_destroy(PrrtCodingParams * cpar) {
pthread_mutex_destroy(&cpar->lock);
free(cpar);
}
\ No newline at end of file
......@@ -13,7 +13,8 @@ typedef struct prrtCodingParams {
uint8_t n_p;
} PrrtCodingParams;
void PrrtCodingParams_init(PrrtCodingParams *cpar);
PrrtCodingParams * PrrtCodingParams_create();
bool PrrtCodingParams_destroy(PrrtCodingParams * cpar);
#endif //PRRT_CODING_PARAMS_H
......@@ -25,15 +25,15 @@ void retrieve_data_blocks(PrrtSocket *sock_ptr, uint16_t base_seqno, uint8_t k,
void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block, uint16_t base_seqno)
{
if(block != NULL && PrrtBlock_decode_ready(block)) {
PrrtBlock_decode(block);
check(PrrtBlock_decode(block), "Decoding failed");
while(List_count(block->dataPackets) > 0) {
PrrtPacket *pkt = List_shift(block->dataPackets);
if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, pkt->seqno)) {
pthread_mutex_lock(&sock_ptr->inQueueFilledMutex);
check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
List_push(sock_ptr->inQueue, pkt);
pthread_cond_signal(&sock_ptr->inQueueFilledCv);
pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
check(pthread_cond_signal(&sock_ptr->inQueueFilledCv) == 0, "Signal failed.");
check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
} else {
PrrtPacket_destroy(pkt);
}
......@@ -43,6 +43,8 @@ void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block, uint16_t base_seqno)
sock_ptr->blockStore = BPTree_delete(sock_ptr->blockStore, base_seqno);
}
error:
PERROR("Decoding failed.%s", "")
}
bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote)
......@@ -62,6 +64,7 @@ bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote)
PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 7, 3, 50, 4, 6, 8, 9, 5, 1);
uint32_t length = PrrtPacket_size(feedback_pkt_ptr);
void *buf = calloc(1, length);
check_mem(buf);
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");
check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
......@@ -73,6 +76,8 @@ bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote)
return true;
error:
if(buf != NULL) { free(buf); }
if(feedback_pkt_ptr != NULL) { PrrtPacket_destroy(feedback_pkt_ptr); }
return false;
}
......@@ -83,7 +88,6 @@ void *receive_data_loop(void *ptr)
socklen_t addrlen = sizeof(remote);
unsigned char buffer[MAX_PAYLOAD_LENGTH];
PrrtSocket *sock_ptr = ptr;
PrrtPacketRedundancyPayload *redundancyPayload;
while(1) {
memset(buffer, 0, MAX_PAYLOAD_LENGTH);
......@@ -92,7 +96,7 @@ void *receive_data_loop(void *ptr)
PrrtPacket *packet = (PrrtPacket *) calloc(1, sizeof(PrrtPacket));
check_mem(packet);
PrrtPacket_decode(buffer, (uint16_t) n, packet);
check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed.");
uint8_t packetType = PrrtPacket_type(packet);
if(packetType == PACKET_TYPE_DATA) {
......@@ -121,13 +125,13 @@ void *receive_data_loop(void *ptr)
// forward to application layer
pthread_mutex_lock(&sock_ptr->inQueueFilledMutex);
check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
List_push(sock_ptr->inQueue, packet);
pthread_cond_signal(&sock_ptr->inQueueFilledCv);
pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
check(pthread_cond_signal(&sock_ptr->inQueueFilledCv) == 0, "Cond signal failed.");
check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
}
} else if (packetType == PACKET_TYPE_REDUNDANCY) {
redundancyPayload = packet->payload;
} else if(packetType == PACKET_TYPE_REDUNDANCY) {
PrrtPacketRedundancyPayload *redundancyPayload = packet->payload;
if(!PrrtForwardPacketTable_test_is_block_relevant(sock_ptr->forwardPacketTable,
redundancyPayload->base_seqno,
......@@ -136,15 +140,14 @@ void *receive_data_loop(void *ptr)
} else {
PrrtBlock *block = BPTree_get(sock_ptr->blockStore, redundancyPayload->base_seqno);
if(block == NULL) {
PrrtCodingParams *cpar = calloc(1, sizeof(PrrtCodingParams));
PrrtCodingParams_init(cpar);
// TODO: PROPER CREATION
PrrtCodingParams *cpar = PrrtCodingParams_create();
cpar->k = redundancyPayload->k;
cpar->n = redundancyPayload->n;
block = calloc(1, sizeof(PrrtBlock));
check_mem(block);
PrrtBlock_create(block, cpar, redundancyPayload->base_seqno);
free(cpar);
block = PrrtBlock_create(cpar, redundancyPayload->base_seqno);
PrrtCodingParams_destroy(cpar);
sock_ptr->blockStore = BPTree_insert(sock_ptr->blockStore, redundancyPayload->base_seqno,
block);
}
......
......@@ -33,9 +33,8 @@ bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
check(hp != NULL, "Could not resolve host '%s'.", recv->host_name)
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
ssize_t sendtoRes = sendto(sock_ptr->dataSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr));
check(sendto(sock_ptr->dataSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) == length, "Sendto failed.");
usleep(1);
check(sendtoRes >= 0, "Sendto failed.")
}
PrrtPacket_destroy(packet);
......@@ -43,39 +42,34 @@ bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
return true;
error:
PERROR("Something is wrong%s.", "")
PERROR("Sending packet failed.%s", "")
return false;
}
void * send_data_loop(void *ptr) {
PrrtSocket *sock_ptr = ptr;
PrrtBlock *block = NULL;
PrrtCodingParams *cpar = calloc(1, sizeof(PrrtCodingParams));
PrrtCodingParams_init(cpar);
PrrtCodingParams *cpar = PrrtCodingParams_create();
while (1) {
pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
check(pthread_mutex_lock(&sock_ptr->outQueueFilledMutex) == 0, "Lock failed.");
while (List_count(sock_ptr->outQueue) == 0) {
pthread_mutex_lock(&sock_ptr->closingMutex);
check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
if(sock_ptr->closing) {
free(cpar);
PrrtCodingParams_destroy(cpar);
if(block != NULL) {
PrrtBlock_destroy(block);
}
pthread_mutex_unlock(&sock_ptr->closingMutex);
pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed.");
return NULL;
}
pthread_mutex_unlock(&sock_ptr->closingMutex);
pthread_cond_wait(&sock_ptr->outQueueFilledCv, &sock_ptr->outQueueFilledMutex);
check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
check(pthread_cond_wait(&sock_ptr->outQueueFilledCv, &sock_ptr->outQueueFilledMutex) == 0, "Cond wait failed.");
}
if(block == NULL) {
block = calloc(1, sizeof(PrrtBlock));
check_mem(block);
PrrtBlock_create(block, cpar, sock_ptr->sequenceNumberSource);
block = PrrtBlock_create(cpar, sock_ptr->sequenceNumberSource);
}
PrrtPacket *packet = List_shift(sock_ptr->outQueue);
......@@ -102,7 +96,7 @@ void * send_data_loop(void *ptr) {
block = NULL;
}
pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed.");
}
error:
......
#include <string.h>
#include <unistd.h>
#include "../../defines.h"
#include "../../util/common.h"
#include "../../util/dbg.h"
#include "../packet.h"
#include "../socket.h"
#include "feedback_receiver.h"
......@@ -10,18 +12,22 @@ void * receive_feedback_loop(void *ptr) {
PrrtSocket *sock_ptr = ptr;
pthread_mutex_lock(&sock_ptr->closingMutex);
check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
while (sock_ptr->closing == false) {
pthread_mutex_unlock(&sock_ptr->closingMutex);
check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
memset(bufin, 0, MAX_PAYLOAD_LENGTH);
PrrtPacket *t = PrrtSocket_recv_feedback(sock_ptr, bufin, MAX_PAYLOAD_LENGTH);
if (t != NULL) {
PrrtPacket_destroy(t);
}
usleep(1);
pthread_mutex_lock(&sock_ptr->closingMutex);
check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
}
pthread_mutex_unlock(&sock_ptr->closingMutex);
check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
return NULL;
error:
PERROR("Feedback reception failed.");
return NULL;
}
\ No newline at end of file
#include <malloc.h>
#include "../util/common.h"
#include "../util/dbg.h"
#include "receiver.h"
......@@ -13,7 +14,7 @@ PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port)
error:
if(recv != NULL) { free(recv); }
return NULL;
PERROR("Memory issue.%s","");
}
bool PrrtReceiver_destroy(PrrtReceiver *receiver)
......
......@@ -236,7 +236,7 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) {
}
if (sock_ptr->forwardPacketTable != NULL) {
free(sock_ptr->forwardPacketTable);
check(PrrtForwardPacketTable_destroy(sock_ptr->forwardPacketTable), "Destroy failed.");
sock_ptr->forwardPacketTable = NULL;
}
......
......@@ -42,9 +42,9 @@ int PrrtForwardPacketTable_create(PrrtForwardPacketTable *fpt_prt) {
return EXIT_SUCCESS;
}
int PrrtForwardPacketTable_destroy(PrrtForwardPacketTable* fpt_prt) {
bool PrrtForwardPacketTable_destroy(PrrtForwardPacketTable* fpt_prt) {
free(fpt_prt);
return EXIT_SUCCESS;
return true;
}
int PrrtForwardPacketTable_test_set_is_number_relevant(PrrtForwardPacketTable *fpt_ptr, uint16_t seqno) {
......
......@@ -13,6 +13,7 @@ typedef struct {
} PrrtForwardPacketTable;
int PrrtForwardPacketTable_create(PrrtForwardPacketTable* fpt_prt);
bool PrrtForwardPacketTable_destroy(PrrtForwardPacketTable* fpt_prt);
int PrrtForwardPacketTable_test_set_is_number_relevant(PrrtForwardPacketTable *fpt_ptr, uint16_t seqno);
......
......@@ -513,7 +513,7 @@ static void init_fec(PrrtCoder *cod) {
int prrt_coder_create(PrrtCoder **cod, uint8_t k, uint8_t n);
int PrrtCoder_get_coder(PrrtCoder **cod, uint8_t n, uint8_t k) {
int err = 0;
int err = EXIT_SUCCESS;
if (*cod == 0 ||
(*cod)->params.n != n ||
......
......@@ -14,13 +14,10 @@ class PrrtBlockTest : public ::testing::Test {
protected:
virtual void SetUp()
{
encBlock = (PrrtBlock *) calloc(1, sizeof(PrrtBlock));
decBlock = (PrrtBlock *) calloc(1, sizeof(PrrtBlock));
PrrtCodingParams *cpar = (PrrtCodingParams *) calloc(1, sizeof(PrrtCodingParams));
PrrtCodingParams_init(cpar);
PrrtBlock_create(encBlock, cpar, 1);
PrrtBlock_create(decBlock, cpar, 1);
free(cpar);
PrrtCodingParams *cpar = PrrtCodingParams_create();
encBlock = PrrtBlock_create(cpar, 1);
decBlock = PrrtBlock_create(cpar, 1);
PrrtCodingParams_destroy(cpar);
}
virtual void TearDown() {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment