Commit 0c35bba7 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Recovering first packets that are lost on the channel.

parent 90004e7a
......@@ -43,7 +43,7 @@ void PrrtBlock_free(PrrtBlock **mblock) {
}
int PrrtBlock_alloc(PrrtBlock **mblock, const PrrtCodingParams *cpar) {
int PrrtBlock_alloc(PrrtBlock **mblock, const PrrtCodingParams *cpar, uint16_t base_seqno) {
*mblock = calloc(1, sizeof(PrrtBlock));
check_mem(mblock);
......@@ -52,7 +52,7 @@ int PrrtBlock_alloc(PrrtBlock **mblock, const PrrtCodingParams *cpar) {
(*mblock)->data_blocks =List_create();
(*mblock)->redundancy_count = 0;
(*mblock)->redundancy_blocks =List_create();
(*mblock)->base_seqno = base_seqno;
return 0;
error:
......@@ -159,7 +159,7 @@ void PrrtBlock_decode(PrrtBlock *block_ptr) {
for(j = 0; j < k; j++) {
PrrtPacket *packet = calloc(1, sizeof(PrrtPacket));
PrrtPacket_create_data_packet(packet, 0, fec[j], length, 0);
PrrtPacket_create_data_packet(packet, 0, fec[j], length, block_ptr->base_seqno + j);
PrrtBlock_insert_data_packet(block_ptr, packet);
}
......
......@@ -10,6 +10,7 @@ typedef struct {
uint32_t redundancy_count;
PrrtCodingParams coding_params;
uint32_t largest_data_length;
uint16_t base_seqno;
List* data_blocks;
List* redundancy_blocks;
uint8_t is_coded;
......@@ -19,7 +20,7 @@ typedef struct {
/**
* Allocate space for a block.
*/
int PrrtBlock_alloc(PrrtBlock **mblock, const PrrtCodingParams *cpar);
int PrrtBlock_alloc(PrrtBlock **mblock, const PrrtCodingParams *cpar, uint16_t base_seqno);
/**
* Frees the PrrtBlock data structure.
......
......@@ -5,8 +5,13 @@
#include <src/util/dbg.h>
#include <src/prrt/socket.h>
#include <src/prrt/block.h>
#include <src/prrt/packet.h>
#include "data_receiver.h"
void decode_block(const PrrtSocket *sock_ptr, uint16_t base_seqno);
void retrieve_data_blocks(PrrtSocket *sock_ptr, uint16_t base_seqno, uint8_t k, const PrrtBlock *block);
int send_feedback(const PrrtSocket *sock_ptr, const char *remote_host, const int remote_port) {
struct sockaddr_in targetaddr;
memset((char *) &targetaddr, 0, sizeof(targetaddr));
......@@ -41,8 +46,8 @@ void *receive_data_loop(void *ptr) {
unsigned char buffer[MAX_PAYLOAD_LENGTH];
PrrtSocket *sock_ptr = ptr;
PrrtPacket *packet;
PrrtPacketRedundancyPayload*redundancyPayload;
PrrtBlock* block;
PrrtPacketRedundancyPayload *redundancyPayload;
PrrtBlock *block;
while (1) {
memset(buffer, 0, MAX_PAYLOAD_LENGTH);
......@@ -65,7 +70,6 @@ void *receive_data_loop(void *ptr) {
break;
}
// check incomplete_prrt_blocks for this seqno: insert if found
// else: insert in data_packet_store
......@@ -77,11 +81,15 @@ void *receive_data_loop(void *ptr) {
List_push(sock_ptr->inQueue, packet);
pthread_cond_signal(&sock_ptr->inQueueFilledMutexCv);
pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
decode_block(sock_ptr, packet->seqno - packet->index);
break;
case PACKET_TYPE_REDUNDANCY:
redundancyPayload = packet->payload;
if(!PrrtForwardPacketTable_test_is_block_relevant(sock_ptr->forwardPacketTable, redundancyPayload->base_seqno, redundancyPayload->n)) {
if (!PrrtForwardPacketTable_test_is_block_relevant(sock_ptr->forwardPacketTable,
redundancyPayload->base_seqno,
redundancyPayload->n)) {
//printf("-------- IRRELEVANT -----------------------\n");
//PrrtPacket_print(packet);
//printf("-------- IRRELEVANT -----------------------\n");
......@@ -90,38 +98,58 @@ void *receive_data_loop(void *ptr) {
}
block = BPTree_get(sock_ptr->blockStore, redundancyPayload->base_seqno);
if(block == NULL) {
if (block == NULL) {
PrrtCodingParams *cpar = calloc(1, sizeof(PrrtCodingParams));
PrrtCodingParams_init(cpar);
cpar->k = redundancyPayload->k;
cpar->n = redundancyPayload->n;
PrrtBlock_alloc(&block, cpar);
// TODO: range selection on dataStore to insert remaining packets -> insert stuff
BPTree_insert(sock_ptr->blockStore, redundancyPayload->base_seqno, block);
PrrtBlock_alloc(&block, cpar, redundancyPayload->base_seqno);
sock_ptr->blockStore = BPTree_insert(sock_ptr->blockStore, redundancyPayload->base_seqno, block);
}
PrrtBlock_insert_redundancy_packet(block, packet);
List* res = List_create();
BPTree_get_range(sock_ptr->blockStore, res, redundancyPayload->base_seqno, redundancyPayload->base_seqno + redundancyPayload->k - 1);
LIST_FOREACH(res, first, next, cur) {
PrrtPacket *packetPtr = cur->value;
PrrtBlock_insert_data_packet(block, packetPtr);
BPTree_delete(sock_ptr->blockStore, packetPtr->seqno);
}
if(PrrtBlock_decode_ready(block)) {
PrrtBlock_decode(block);
}
decode_block(sock_ptr, redundancyPayload->base_seqno);
break;
default:
//PrrtPacket_print(packet);
break;
}
}
error:
free(packet);
}
void decode_block(const PrrtSocket *sock_ptr, uint16_t base_seqno) {
PrrtBlock *block = BPTree_get(sock_ptr->blockStore, base_seqno);
if (block != NULL && PrrtBlock_decode_ready(block)) {
retrieve_data_blocks(sock_ptr, base_seqno, block->coding_params.k, block);
PrrtBlock_decode(block);
LIST_FOREACH(block->data_blocks, first, next, cur) {
PrrtPacket *pkt = cur->value;
if (PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, pkt->seqno)) {
printf("RECOVERED %d via coding.\n", pkt->seqno);
pthread_mutex_lock(&sock_ptr->inQueueFilledMutex);
List_push(sock_ptr->inQueue, pkt);
pthread_cond_signal(&sock_ptr->inQueueFilledMutexCv);
pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
}
}
}
}
void retrieve_data_blocks(PrrtSocket *sock_ptr, uint16_t base_seqno, uint8_t k, const PrrtBlock *block) {
List *res = List_create();
BPTree_get_range(sock_ptr->dataStore, res, base_seqno, base_seqno + k - 1);
LIST_FOREACH(res, first, next, cur) {
PrrtPacket *packetPtr = cur->value;
PrrtBlock_insert_data_packet(block, packetPtr);
sock_ptr->blockStore = BPTree_delete(sock_ptr->blockStore, packetPtr->seqno);
}
}
\ No newline at end of file
......@@ -49,7 +49,7 @@ void *send_data_loop(void *ptr) {
PrrtBlock *block = NULL;
PrrtCodingParams *cpar = calloc(1, sizeof(PrrtCodingParams));
PrrtCodingParams_init(cpar);
PrrtBlock_alloc(&block, cpar);
PrrtBlock_alloc(&block, cpar, sock_ptr->sequenceNumberRedundancy);
while (1) {
pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
......@@ -65,16 +65,17 @@ void *send_data_loop(void *ptr) {
int j = 0;
PrrtBlock_encode(block, &sock_ptr->sequenceNumberRedundancy);
uint32_t pkt_count = (block)->data_count;
for (j = 0; j < pkt_count; j++) {
PrrtPacket *data_pkt = PrrtBlock_get_first_data(block);
send_packet(sock_ptr, data_pkt);
}
uint32_t red_count = (block)->redundancy_count;
for (j = 0; j < red_count; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
send_packet(sock_ptr, red_pkt);
}
uint32_t pkt_count = (block)->data_count;
for (j = 0; j < pkt_count; j++) {
PrrtPacket *data_pkt = PrrtBlock_get_first_data(block);
send_packet(sock_ptr, data_pkt);
}
}
pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
......
......@@ -13,8 +13,8 @@ protected:
decBlock = NULL;
PrrtCodingParams *cpar = (PrrtCodingParams *) calloc(1, sizeof(PrrtCodingParams));
PrrtCodingParams_init(cpar);
PrrtBlock_alloc(&encBlock, cpar);
PrrtBlock_alloc(&decBlock, cpar);
PrrtBlock_alloc(&encBlock, cpar, 0);
PrrtBlock_alloc(&decBlock, cpar, 0);
}
PrrtBlock *encBlock;
PrrtBlock *decBlock;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment