Commit e55699a3 authored by Andreas Schmidt's avatar Andreas Schmidt

Fix block code. Indizes were wrong, decoding matrix creation was erroneous.

parent 251267f9
......@@ -13,8 +13,8 @@
#define GF_BITS 8
#define K_START 2
#define N_START 4
#define K_START 4
#define N_START 7
#define N_P_START 1
// Uncomment the line below if you are compiling on Windows.
......
......@@ -10,28 +10,30 @@
void gather_redundancy_packets(const PrrtBlock *block_ptr, gf *const *fec, int *idx_p)
{
int i;
uint32_t m = List_count(block_ptr->dataPackets);
uint32_t m = 0;
PrrtPacket *packet = NULL;
uint32_t redundancyBlocks = MIN(List_count(block_ptr->redundancyPackets), block_ptr->codingParams.k - m);
uint32_t redundancyBlocks = List_count(block_ptr->redundancyPackets);
for(i = 0; i < redundancyBlocks; i++) {
packet = List_shift(block_ptr->redundancyPackets);
PrrtPacket_copy_payload_to_buffer(fec[m + i], packet, PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
idx_p[m + i] = (int) packet->index;
while(idx_p[m] != -1) {
m++;
}
PrrtPacket_copy_payload_to_buffer(fec[m], packet, PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
idx_p[m] = packet->index;
PrrtPacket_destroy(packet);
}
}
void gather_data_packets(PrrtBlock *block_ptr, gf *const *fec, int *idx_p)
{
int m = 0;
uint32_t dataBlocks = List_count(block_ptr->dataPackets);
for(m = 0; m < dataBlocks; m++) {
PrrtPacket *packet = List_shift(block_ptr->dataPackets);
PrrtPacket_copy_payload_to_buffer(fec[m], packet, PRRT_PACKET_DATA_HEADER_SIZE);
idx_p[m] = packet->index;
PrrtPacket_destroy(packet);
LIST_FOREACH(block_ptr->dataPackets, first, next, current) {
PrrtPacket *packet = current->value;
PrrtPacket_copy_payload_to_buffer(fec[packet->index], packet, PRRT_PACKET_DATA_HEADER_SIZE);
idx_p[packet->index] = packet->index;
}
}
......@@ -73,27 +75,42 @@ int PrrtBlock_create(PrrtBlock *mblock, const PrrtCodingParams *cpar, uint16_t b
int PrrtBlock_insert_data_packet(PrrtBlock *prrtBlock, const PrrtPacket *prrtPacket)
{
check(List_count(prrtBlock->dataPackets) < prrtBlock->codingParams.k, "Inserting an unnecessary item.");
List_push(prrtBlock->dataPackets, prrtPacket);
check(prrtPacket->payload_len >= PRRT_PACKET_DATA_HEADER_SIZE, "Inserted packet too small.")
prrtBlock->largestDataLength = (uint32_t) MAX(prrtBlock->largestDataLength,
prrtPacket->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
return 0;
int found = FALSE;
LIST_FOREACH(prrtBlock->dataPackets, first, next, cur) {
PrrtPacket *pkt = cur->value;
if(pkt->seqno == prrtPacket->seqno) {
found = TRUE;
}
}
error:
PNOTIMPLEMENTED("HANDLING MISSING");
if(found == FALSE) {
List_push(prrtBlock->dataPackets, prrtPacket);
prrtBlock->largestDataLength = (uint32_t) MAX(prrtBlock->largestDataLength,
prrtPacket->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
return EXIT_SUCCESS;
} else {
return EXIT_FAILURE;
}
}
int PrrtBlock_insert_redundancy_packet(PrrtBlock *block_ptr, const PrrtPacket *ptr)
{
List_push(block_ptr->redundancyPackets, ptr);
check(ptr->payload_len >= PRRT_PACKET_REDUNDANCY_HEADER_SIZE, "Inserted packet too small.")
block_ptr->largestDataLength = (uint32_t) MAX(block_ptr->largestDataLength,
ptr->payload_len - PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
return 0;
int found = FALSE;
LIST_FOREACH(block_ptr->redundancyPackets, first, next, cur) {
PrrtPacket *pkt = cur->value;
if(pkt->seqno == ptr->seqno) {
found = TRUE;
}
}
error:
PNOTIMPLEMENTED("HANDLING MISSING");
if(found == FALSE) {
List_push(block_ptr->redundancyPackets, ptr);
block_ptr->largestDataLength = (uint32_t) MAX(block_ptr->largestDataLength,
ptr->payload_len - PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
return EXIT_SUCCESS;
} else {
return EXIT_FAILURE;
}
}
int PrrtBlock_encode_ready(const PrrtBlock *block_ptr)
......@@ -118,6 +135,7 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno)
uint8_t k = block_ptr->codingParams.k;
uint8_t n = block_ptr->codingParams.n;
uint8_t r = block_ptr->codingParams.r;
uint16_t baseSequenceNumber = block_ptr->baseSequenceNumber;
uint32_t length = block_ptr->largestDataLength;
PrrtCoder *coder = NULL;
......@@ -129,7 +147,7 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno)
LIST_FOREACH(block_ptr->dataPackets, first, next, cur) {
src[j] = calloc(length, sizeof(gf));
PrrtPacket *pkt = cur->value;
pkt->index = (uint8_t) j;
pkt->index = (uint8_t) ((pkt->seqno - baseSequenceNumber) % SEQNO_SPACE);
PrrtPacket_copy_payload_to_buffer(src[j], pkt, PRRT_PACKET_DATA_HEADER_SIZE);
j++;
}
......@@ -144,7 +162,7 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno)
PrrtPacket *red_packet_ptr = PrrtPacket_create_redundancy_packet(0, (void *) fec[j], length, *seqno,
(uint8_t) (k + j), base_seqno,
block_ptr->codingParams);
*seqno = (uint16_t) (*seqno + 1 % SEQNO_SPACE);
*seqno = (uint16_t) ((*seqno + 1) % SEQNO_SPACE);
PrrtBlock_insert_redundancy_packet(block_ptr, red_packet_ptr);
}
......@@ -166,7 +184,7 @@ void PrrtBlock_decode(PrrtBlock *block_ptr)
int *idx_p = NULL;
uint8_t n = block_ptr->codingParams.n;
uint8_t k = block_ptr->codingParams.k;
uint16_t baseSequenceNumber = block_ptr->baseSequenceNumber;
uint32_t length = block_ptr->largestDataLength;
PrrtCoder *coder = NULL;
......@@ -181,15 +199,25 @@ void PrrtBlock_decode(PrrtBlock *block_ptr)
idx_p = calloc(k, sizeof(int));
check_mem(idx_p);
for(i = 0; i < k; i++) {
idx_p[i] = -1;
}
gather_redundancy_packets(block_ptr, fec, idx_p);
gather_data_packets(block_ptr, fec, idx_p);
gather_redundancy_packets(block_ptr, fec, idx_p);
PrrtCoder_decode(coder, fec, idx_p, length);
int decodingRes = PrrtCoder_decode(coder, fec, idx_p, length);
check(decodingRes == EXIT_SUCCESS, "Decoding failed.");
for(j = 0; j < k; j++) {
PrrtPacket *packet = PrrtPacket_create_data_packet(0, fec[j], length, (uint16_t) (block_ptr->baseSequenceNumber + j));
PrrtBlock_insert_data_packet(block_ptr, packet);
if(idx_p[j] >= k) {
PrrtPacket *packet = PrrtPacket_create_data_packet(0, fec[j], length, (uint16_t) (baseSequenceNumber + j));
int insertRes = PrrtBlock_insert_data_packet(block_ptr, packet);
if(insertRes == EXIT_FAILURE) {
debug("Tried to insert unnecessary packet.");
PrrtPacket_destroy(packet);
}
}
}
error:
......
......@@ -15,20 +15,17 @@ void retrieve_data_blocks(PrrtSocket *sock_ptr, uint16_t base_seqno, uint8_t k,
LIST_FOREACH(res, first, next, cur) {
PrrtPacket *packetPtr = cur->value;
PrrtBlock_insert_data_packet((PrrtBlock *) block, packetPtr);
int insertRes = PrrtBlock_insert_data_packet((PrrtBlock *) block, packetPtr);
check(insertRes == EXIT_SUCCESS, "Insert failed!")
sock_ptr->dataStore = BPTree_delete(sock_ptr->dataStore, packetPtr->seqno);
}
List_destroy(res);
error:
List_destroy(res);
}
void decode_block(PrrtSocket *sock_ptr, uint16_t base_seqno)
void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block, uint16_t base_seqno)
{
PrrtBlock *block = BPTree_get(sock_ptr->blockStore, base_seqno);
if(block != NULL && PrrtBlock_decode_ready(block)) {
retrieve_data_blocks(sock_ptr, base_seqno, block->codingParams.k, block);
PrrtBlock_decode(block);
while(List_count(block->dataPackets) > 0) {
......@@ -102,18 +99,29 @@ void *receive_data_loop(void *ptr)
switch(PrrtPacket_type(packet)) {
case PACKET_TYPE_DATA:
// packet.timestamp + packet.timeout < now: break
// TODO: packet.timestamp + packet.timeout < now: break
if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, packet->seqno) ==
FALSE) {
PrrtPacket_destroy(packet);
} else {
// check incomplete_prrt_blocks for this seqno: insert if found
// else: insert in data_packet_store
uint16_t baseSequenceNumber = packet->seqno - packet->index;
PrrtPacket* reference = PrrtPacket_copy(packet);
sock_ptr->dataStore = BPTree_insert(sock_ptr->dataStore, packet->seqno, reference);
PrrtBlock *block = BPTree_get(sock_ptr->blockStore, baseSequenceNumber);
if(block != NULL) {
int res = PrrtBlock_insert_data_packet(block, reference);
check(res == EXIT_SUCCESS, "Inserting failed")
decode_block(sock_ptr, block, baseSequenceNumber);
} else {
// Check for duplicate data packet.
if(BPTree_get(sock_ptr->dataStore, packet->seqno) == NULL) {
sock_ptr->dataStore = BPTree_insert(sock_ptr->dataStore, packet->seqno, reference);
} else {
PrrtPacket_destroy(reference);
}
}
// forward to application layer
......@@ -121,8 +129,6 @@ void *receive_data_loop(void *ptr)
List_push(sock_ptr->inQueue, packet);
pthread_cond_signal(&sock_ptr->inQueueFilledCv);
pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
decode_block(sock_ptr, packet->seqno - packet->index);
}
break;
case PACKET_TYPE_REDUNDANCY:
......@@ -148,8 +154,14 @@ void *receive_data_loop(void *ptr)
block);
}
PrrtBlock_insert_redundancy_packet(block, packet);
decode_block(sock_ptr, redundancyPayload->base_seqno);
retrieve_data_blocks(sock_ptr, redundancyPayload->base_seqno, block->codingParams.k, block);
int res = PrrtBlock_insert_redundancy_packet(block, packet);
if(res == EXIT_SUCCESS) {
decode_block(sock_ptr, block, redundancyPayload->base_seqno);
} else {
PrrtPacket_destroy(packet);
}
}
break;
default:
......
......@@ -60,8 +60,6 @@ void * send_data_loop(void *ptr) {
if(block != NULL) {
PrrtBlock_destroy(block);
}
block = NULL;
pthread_mutex_unlock(&sock_ptr->closingMutex);
pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
return NULL;
......@@ -74,10 +72,11 @@ void * send_data_loop(void *ptr) {
block = calloc(1, sizeof(PrrtBlock));
check_mem(block);
PrrtBlock_create(block, cpar, sock_ptr->sequenceNumberRedundancy);
PrrtBlock_create(block, cpar, sock_ptr->sequenceNumberSource);
}
PrrtPacket *packet = List_shift(sock_ptr->outQueue);
packet->seqno = sock_ptr->sequenceNumberSource++;
PrrtBlock_insert_data_packet(block, packet);
......
......@@ -95,7 +95,7 @@ int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t po
int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len) {
pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
PrrtPacket *packet = PrrtPacket_create_data_packet(5, data, (uint32_t) data_len, sock_ptr->sequenceNumberSource++);
PrrtPacket *packet = PrrtPacket_create_data_packet(5, data, (uint32_t) data_len, 0);
List_push(sock_ptr->outQueue, packet);
pthread_cond_signal(&sock_ptr->outQueueFilledCv);
......@@ -123,7 +123,6 @@ int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
uint32_t len = packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE;
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
PrrtPacket_destroy(packet);
pthread_mutex_unlock(&t);
......
......@@ -5,7 +5,7 @@
#include "forward_packet_table.h"
int check_position(const PrrtForwardPacketTable *fpt_ptr, uint16_t seqno) {
int is_position_relevant(const PrrtForwardPacketTable *fpt_ptr, uint16_t seqno) {
uint16_t stop = (uint16_t) (SEQNO_SPACE / 2 + fpt_ptr->start);
if (fpt_ptr->start < stop && !(fpt_ptr->start <= seqno && seqno <= stop)) {
return FALSE;
......@@ -42,7 +42,7 @@ int PrrtForwardPacketTable_create(PrrtForwardPacketTable *fpt_prt) {
}
int PrrtForwardPacketTable_test_set_is_number_relevant(PrrtForwardPacketTable *fpt_ptr, uint16_t seqno) {
int res = check_position(fpt_ptr, seqno);
int res = is_position_relevant(fpt_ptr, seqno);
if(res) {
uint16_t which_byte = (uint16_t) (seqno / 32);
......@@ -63,7 +63,7 @@ int PrrtForwardPacketTable_test_is_block_relevant(PrrtForwardPacketTable * forwa
int i;
for (i = 0; i < length; i++) {
res = res || check_position(forwardPacketTable, (uint16_t) (start + i));
res = res || is_position_relevant(forwardPacketTable, (uint16_t) (start + i));
}
return res;
......
......@@ -37,7 +37,7 @@ int main(int argc, char* const argv[]) {
continue;
}
buffer[n] = '\0';
printf("[B (n: %d, i: %3d)] %s", n, i, buffer);
printf("[B (n: %3d, i: %3d)] %s", n, i, buffer);
i++;
usleep(1);
}
......
......@@ -18,9 +18,9 @@ int main(int argc, char *const argv) {
int res = PrrtSocket_create(sock, local_port, TRUE);
check(res == EXIT_SUCCESS, "Socket creation failed.");
char *remote_host = "127.0.0.1";
uint16_t remote_port = 5000;
PrrtSocket_connect(sock, remote_host, remote_port);
//char *remote_host = "127.0.0.1";
//uint16_t remote_port = 5000;
//PrrtSocket_connect(sock, remote_host, remote_port);
char *remote_host2 = "127.0.0.1";
uint16_t remote_port2 = 5004;
......@@ -32,17 +32,24 @@ int main(int argc, char *const argv) {
char *line = NULL;
size_t len = 0;
fp = fopen("/opt/in.txt", "r");
if (fp == NULL)
exit(EXIT_FAILURE);
int j = 0;
int rounds = 1;
while ((getline(&line, &len, fp)) != -1) {
char buf[MAX_PAYLOAD_LENGTH];
sprintf(buf, "%s", line);
PrrtSocket_send(sock, buf, strlen(buf));
while(j < rounds) {
fp = fopen("/opt/in.txt", "r");
if (fp == NULL)
exit(EXIT_FAILURE);
while ((getline(&line, &len, fp)) != -1) {
char buf[MAX_PAYLOAD_LENGTH];
sprintf(buf, "%s", line);
PrrtSocket_send(sock, buf, strlen(buf));
}
fclose(fp);
j++;
}
fclose(fp);
if (line)
free(line);
......
#include <gtest/gtest.h>
extern "C" {
#include "src/prrt/block.h"
#include "src/prrt/packet.h"
#include "src/prrt/coding_params.h"
#include "src/util/dbg.h"
#include "src/prrt/vdmcode/block_code.h"
#include "src/util/common.h"
}
class PrrtBlockTest : public ::testing::Test {
......@@ -14,30 +18,97 @@ protected:
decBlock = (PrrtBlock *) calloc(1, sizeof(PrrtBlock));
PrrtCodingParams *cpar = (PrrtCodingParams *) calloc(1, sizeof(PrrtCodingParams));
PrrtCodingParams_init(cpar);
PrrtBlock_create(encBlock, cpar, 0);
PrrtBlock_create(decBlock, cpar, 0);
PrrtBlock_create(encBlock, cpar, 1);
PrrtBlock_create(decBlock, cpar, 1);
}
PrrtBlock *encBlock;
PrrtBlock *decBlock;
};
TEST_F(PrrtBlockTest, VDMCode)
{
int i, j;
uint8_t k = 4;
uint8_t n = 7;
uint8_t r = n - k;
uint8_t length = 3;
PrrtCoder *coder = NULL;
PrrtCoder_get_coder(&coder, n, k);
gf **src = (gf **) calloc(k, sizeof(gf *));
src[0] = (gf *) calloc(length, sizeof(gf));
src[0][0] = 1; src[0][2] = 3;
src[1] = (gf *) calloc(length, sizeof(gf));
src[1][0] = 2; src[1][2] = 7;
src[2] = (gf *) calloc(length, sizeof(gf));
src[2][0] = 3; src[2][2] = 6;
src[3] = (gf *) calloc(length, sizeof(gf));
src[3][0] = 4; src[3][1]= 9; src[3][2] = 5;
printf("----\n");
for(i = 0; i < k; ++i) {
print_gf(src[i], length);
}
printf("----\n");
gf **encFec = (gf **) calloc(1, sizeof(gf *) * r);
for(j = 0; j < r; j++) {
encFec[j] = (gf *) calloc(length, sizeof(gf));
PrrtCoder_encode(coder, src, encFec[j], j + k, length);
}
for(i = 0; i < r; ++i) {
print_gf(encFec[i], length);
}
printf("----\n");
gf **decFec = (gf **) calloc(1, sizeof(gf *) * n);
int *idx_p = (int *) calloc(k, sizeof(int));
for(i = 0; i < n; i++) {
decFec[i] = (gf *) calloc(length, sizeof(gf));
}
memcpy(decFec[0], src[0], length);
idx_p[0] = 0;
memcpy(decFec[1], src[1], length);
idx_p[1] = 1;
memcpy(decFec[2], encFec[0], length);
idx_p[2] = 4;
memcpy(decFec[3], src[3], length);
idx_p[3] = 3;
PrrtCoder_decode(coder, decFec, idx_p, length);
for(i = 0; i < n; ++i) {
print_gf(decFec[i], length);
}
printf("----\n");
}
TEST_F(PrrtBlockTest, Encode)
TEST_F(PrrtBlockTest, EncodeDecode)
{
uint16_t base = 0;
uint16_t base = 1;
PrrtPacket *packets[2];
PrrtPacket *refPackets[2];
PrrtPacket *packets[4];
PrrtPacket *refPackets[4];
PrrtPacket *redPackets = (PrrtPacket *) calloc(2, sizeof(PrrtPacket));
PrrtPacket *redPackets = (PrrtPacket *) calloc(4, sizeof(PrrtPacket));
for(int i = 1; i < 3; i++) {
for(int i = 0; i < 4; i++) {
char data[3];
sprintf(data, "%d", i);
packets[i - 1] = PrrtPacket_create_data_packet(0, data, (uint32_t) strlen(data), i);
refPackets[i - 1] = PrrtPacket_create_data_packet(0, data, (uint32_t) strlen(data), i);
PrrtBlock_insert_data_packet(encBlock, packets[i-1]);
packets[i] = PrrtPacket_create_data_packet(0, data, (uint32_t) strlen(data), i+1);
packets[i]->index = (uint8_t) i;
refPackets[i] = PrrtPacket_create_data_packet(0, data, (uint32_t) strlen(data), i+1);
ASSERT_TRUE(PrrtBlock_insert_data_packet(encBlock, packets[i]) == EXIT_SUCCESS);
}
// ENCODING
......@@ -59,14 +130,16 @@ TEST_F(PrrtBlockTest, Encode)
printf("------------------------\n");
// DECODING
PrrtBlock_insert_redundancy_packet(decBlock, redPackets);
PrrtBlock_insert_data_packet(decBlock, packets[0]);
PrrtBlock_insert_redundancy_packet(decBlock, &redPackets[0]);
PrrtBlock_insert_data_packet(decBlock, packets[1]);
PrrtBlock_insert_data_packet(decBlock, packets[2]);
ASSERT_TRUE(PrrtBlock_decode_ready(decBlock));
PrrtBlock_decode(decBlock);
for(int k = 0; k < 2; ++k) {
for(int k = 0; k < 4; ++k) {
PrrtPacket *ptr = PrrtBlock_get_first_data(decBlock);
const char *s1 = (const char *) ((char *) refPackets[k]->payload + PRRT_PACKET_DATA_HEADER_SIZE);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment