Commit e988ccd2 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Rename encoding functions of PrrtBlock. Add decoding.

parent fdafe0ba
...@@ -3,7 +3,10 @@ ...@@ -3,7 +3,10 @@
#include <src/prrt/vdmcode/block_code.h> #include <src/prrt/vdmcode/block_code.h>
#include <stdlib.h> #include <stdlib.h>
#include <src/util/list.h> #include <src/util/list.h>
#include <src/util/dbg.h>
#include <src/util/common.h>
#include "block.h" #include "block.h"
#include "coding_params.h"
void clear_list(gf *const *src, uint8_t k) { void clear_list(gf *const *src, uint8_t k) {
int j = 0; int j = 0;
...@@ -17,20 +20,27 @@ void PrrtBlock_free(PrrtBlock **mblock) { ...@@ -17,20 +20,27 @@ void PrrtBlock_free(PrrtBlock **mblock) {
} }
int PrrtBlock_alloc(PrrtBlock **mblock, const PrrtCodingParams *cpar) { int PrrtBlock_alloc(PrrtBlock **mblock, const PrrtCodingParams *cpar) {
*mblock = malloc(sizeof(PrrtBlock)); *mblock = calloc(1, sizeof(PrrtBlock));
check_mem(mblock);
(*mblock)->coding_params = *cpar; (*mblock)->coding_params = *cpar;
(*mblock)->data_blocks =List_create(); (*mblock)->data_blocks =List_create();
(*mblock)->redundancy_blocks =List_create(); (*mblock)->redundancy_blocks =List_create();
return 0; return 0;
error:
PNOTIMPLEMENTED("TODO");
} }
int PrrtBlock_insert_data_packet(PrrtBlock *block_ptr, const PrrtPacket *packet_ptr) { int PrrtBlock_insert_data_packet(PrrtBlock *block_ptr, const PrrtPacket *packet_ptr) {
check(block_ptr->data_count < block_ptr->coding_params.k, "Inserting an unnecessary item.");
List_push(block_ptr->data_blocks, packet_ptr); List_push(block_ptr->data_blocks, packet_ptr);
block_ptr->data_count++; block_ptr->data_count++;
block_ptr->largest_data_length = MAX(block_ptr->largest_data_length, packet_ptr->payload_len - PRRT_PACKET_DATA_HEADER_SIZE); block_ptr->largest_data_length = MAX(block_ptr->largest_data_length, packet_ptr->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
return 0; return 0;
error:
PNOTIMPLEMENTED("HANDLING MISSING");
} }
int PrrtBlock_insert_redundancy_packet(PrrtBlock *block_ptr, const PrrtPacket *ptr) { int PrrtBlock_insert_redundancy_packet(PrrtBlock *block_ptr, const PrrtPacket *ptr) {
List_push(block_ptr->redundancy_blocks, ptr); List_push(block_ptr->redundancy_blocks, ptr);
...@@ -38,10 +48,14 @@ int PrrtBlock_insert_redundancy_packet(PrrtBlock *block_ptr, const PrrtPacket *p ...@@ -38,10 +48,14 @@ int PrrtBlock_insert_redundancy_packet(PrrtBlock *block_ptr, const PrrtPacket *p
return 0; return 0;
} }
int PrrtBlock_ready(const PrrtBlock *block_ptr) { int PrrtBlock_encode_ready(const PrrtBlock *block_ptr) {
return (block_ptr->data_count == block_ptr->coding_params.k) ? TRUE : FALSE; return (block_ptr->data_count == block_ptr->coding_params.k) ? TRUE : FALSE;
} }
int PrrtBlock_decode_ready(const PrrtBlock *block_ptr) {
return (block_ptr->data_count + block_ptr->redundancy_count == block_ptr->coding_params.k) ? TRUE : FALSE;
}
PrrtPacket *PrrtBlock_get_first_data(PrrtBlock *block_ptr) { PrrtPacket *PrrtBlock_get_first_data(PrrtBlock *block_ptr) {
block_ptr->data_count--; block_ptr->data_count--;
return List_shift(block_ptr->data_blocks); return List_shift(block_ptr->data_blocks);
...@@ -79,7 +93,6 @@ void PrrtBlock_code(PrrtBlock *block_ptr, uint16_t *seqno) { ...@@ -79,7 +93,6 @@ void PrrtBlock_code(PrrtBlock *block_ptr, uint16_t *seqno) {
PrrtCoder_encode(coder, src, fec[j], j+k, length); // gf **src, gf *fec, int index, int sz PrrtCoder_encode(coder, src, fec[j], j+k, length); // gf **src, gf *fec, int index, int sz
PrrtPacket* red_packet_ptr = PrrtPacket_create_redundancy_packet(0, (void*) fec[j], length, *seqno, index, base_seqno, block_ptr->coding_params); PrrtPacket* red_packet_ptr = PrrtPacket_create_redundancy_packet(0, (void*) fec[j], length, *seqno, index, base_seqno, block_ptr->coding_params);
*seqno = *seqno+1 % SEQNO_SPACE; *seqno = *seqno+1 % SEQNO_SPACE;
PrrtPacket_print(red_packet_ptr);
PrrtBlock_insert_redundancy_packet(block_ptr, red_packet_ptr); PrrtBlock_insert_redundancy_packet(block_ptr, red_packet_ptr);
} }
......
...@@ -28,7 +28,8 @@ void PrrtBlock_free(PrrtBlock **mblock); ...@@ -28,7 +28,8 @@ void PrrtBlock_free(PrrtBlock **mblock);
int PrrtBlock_insert_data_packet(PrrtBlock *block_ptr, const PrrtPacket *packet_ptr); int PrrtBlock_insert_data_packet(PrrtBlock *block_ptr, const PrrtPacket *packet_ptr);
int PrrtBlock_ready(const PrrtBlock *block_ptr); int PrrtBlock_encode_ready(const PrrtBlock *block_ptr);
int PrrtBlock_decode_ready(const PrrtBlock *block_ptr);
PrrtPacket *PrrtBlock_get_first_data(PrrtBlock *block_ptr); PrrtPacket *PrrtBlock_get_first_data(PrrtBlock *block_ptr);
......
...@@ -345,7 +345,6 @@ int PrrtPacket_destroy(PrrtPacket *packet_ptr) { ...@@ -345,7 +345,6 @@ int PrrtPacket_destroy(PrrtPacket *packet_ptr) {
return 0; return 0;
} }
// PACKET SPECIFIC CODE
int PrrtPacket_create_data_packet(PrrtPacket *packet_ptr, uint8_t priority, const void *data_ptr, int PrrtPacket_create_data_packet(PrrtPacket *packet_ptr, uint8_t priority, const void *data_ptr,
uint32_t data_len, int seqno) { uint32_t data_len, int seqno) {
uint32_t payload_length = (uint32_t) (data_len + sizeof(PrrtPacketDataPayload)); uint32_t payload_length = (uint32_t) (data_len + sizeof(PrrtPacketDataPayload));
......
#ifndef PRRT_FRAME_H #ifndef PRRT_FRAME_H
#define PRRT_FRAME_H #define PRRT_FRAME_H
// CREATE: PRRT_PACKET
// ENCODE: PRRT_PACKET -> BYTES
// DECODE: BYTES -> PRRT_PACKET
// DESTROY:
#include <stdint.h> #include <stdint.h>
#include <src/prrt/coding_params.h> #include <src/prrt/coding_params.h>
...@@ -57,11 +62,6 @@ uint8_t PrrtPacket_priority(PrrtPacket *packet_ptr); ...@@ -57,11 +62,6 @@ uint8_t PrrtPacket_priority(PrrtPacket *packet_ptr);
uint16_t PrrtPacket_size(PrrtPacket *packet_ptr); uint16_t PrrtPacket_size(PrrtPacket *packet_ptr);
int PrrtPacket_print(PrrtPacket *packet_ptr); int PrrtPacket_print(PrrtPacket *packet_ptr);
// CREATE: PRRT_PACKET
// ENCODE: PRRT_PACKET -> BYTES
// DECODE: BYTES -> PRRT_PACKET
// DESTROY:
int PrrtPacket_create_data_packet(PrrtPacket *packet_ptr, uint8_t priority, const void *data_ptr, int PrrtPacket_create_data_packet(PrrtPacket *packet_ptr, uint8_t priority, const void *data_ptr,
uint32_t data_len, int seqno); uint32_t data_len, int seqno);
......
...@@ -4,6 +4,9 @@ ...@@ -4,6 +4,9 @@
#include <src/defines.h> #include <src/defines.h>
#include <src/util/dbg.h> #include <src/util/dbg.h>
#include <src/prrt/socket.h> #include <src/prrt/socket.h>
#include <src/prrt/packet.h>
#include <src/prrt/block.h>
#include <src/prrt/coding_params.h>
#include "data_receiver.h" #include "data_receiver.h"
int send_feedback(const PrrtSocket *sock_ptr, const char *remote_host, const int remote_port) { int send_feedback(const PrrtSocket *sock_ptr, const char *remote_host, const int remote_port) {
...@@ -40,6 +43,8 @@ void *receive_data_loop(void *ptr) { ...@@ -40,6 +43,8 @@ void *receive_data_loop(void *ptr) {
unsigned char buffer[MAX_PAYLOAD_LENGTH]; unsigned char buffer[MAX_PAYLOAD_LENGTH];
PrrtSocket *sock_ptr = ptr; PrrtSocket *sock_ptr = ptr;
PrrtPacket *packet; PrrtPacket *packet;
PrrtPacketRedundancyPayload*redundancyPayload;
PrrtBlock* block;
while (1) { while (1) {
memset(buffer, 0, MAX_PAYLOAD_LENGTH); memset(buffer, 0, MAX_PAYLOAD_LENGTH);
...@@ -62,9 +67,12 @@ void *receive_data_loop(void *ptr) { ...@@ -62,9 +67,12 @@ void *receive_data_loop(void *ptr) {
break; break;
} }
// check incomplete_prrt_blocks for this seqno: insert if found // check incomplete_prrt_blocks for this seqno: insert if found
// else: insert in data_packet_store // else: insert in data_packet_store
sock_ptr->dataStore = BPTree_insert(sock_ptr->dataStore, packet->seqno, packet);
// forward to application layer // forward to application layer
pthread_mutex_lock(&sock_ptr->inQueueFilledMutex); pthread_mutex_lock(&sock_ptr->inQueueFilledMutex);
...@@ -72,6 +80,37 @@ void *receive_data_loop(void *ptr) { ...@@ -72,6 +80,37 @@ void *receive_data_loop(void *ptr) {
pthread_cond_signal(&sock_ptr->inQueueFilledMutexCv); pthread_cond_signal(&sock_ptr->inQueueFilledMutexCv);
pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex); pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
break; break;
case PACKET_TYPE_REDUNDANCY:
redundancyPayload = packet->payload;
if(!PrrtForwardPacketTable_test_is_block_relevant(sock_ptr->forwardPacketTable, redundancyPayload->base_seqno, redundancyPayload->n)) {
printf("-------- IRRELEVANT -----------------------\n");
PrrtPacket_print(packet);
printf("-------- IRRELEVANT -----------------------\n");
PrrtPacket_destroy(packet);
break;
}
block = BPTree_get(sock_ptr->blockStore, redundancyPayload->base_seqno);
if(block == NULL) {
PrrtCodingParams *cpar = malloc(sizeof(PrrtCodingParams));
PrrtCodingParams_init(cpar);
cpar->k = redundancyPayload->k;
cpar->n = redundancyPayload->n;
PrrtBlock_alloc(&block, cpar);
// TODO: range selection on dataStore to insert remaining packets -> insert stuff
BPTree_insert(sock_ptr->blockStore, redundancyPayload->base_seqno, block);
}
PrrtBlock_insert_redundancy_packet(block, packet);
if(PrrtBlock_decode_ready(block)) {
PrrtBlock_decode(block, &sock_ptr->sequenceNumberRedundancy);
}
printf("RELEVANT\n");
break;
default: default:
//PrrtPacket_print(packet); //PrrtPacket_print(packet);
break; break;
......
...@@ -61,7 +61,7 @@ void *send_data_loop(void *ptr) { ...@@ -61,7 +61,7 @@ void *send_data_loop(void *ptr) {
PrrtBlock_insert_data_packet(block, packet); PrrtBlock_insert_data_packet(block, packet);
if (PrrtBlock_ready(block)) { if (PrrtBlock_encode_ready(block)) {
int j = 0; int j = 0;
PrrtBlock_code(block, &sock_ptr->sequenceNumberRedundancy); PrrtBlock_code(block, &sock_ptr->sequenceNumberRedundancy);
......
...@@ -11,12 +11,15 @@ ...@@ -11,12 +11,15 @@
#include <src/prrt/processes/data_transmitter.h> #include <src/prrt/processes/data_transmitter.h>
#include <src/prrt/processes/data_receiver.h> #include <src/prrt/processes/data_receiver.h>
#include <src/util/dbg.h> #include <src/util/dbg.h>
#include "socket.h"
int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t is_sender) { int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t is_sender) {
sock_ptr->sequenceNumberSource = 1; sock_ptr->sequenceNumberSource = 1;
sock_ptr->sequenceNumberRedundancy = 1; sock_ptr->sequenceNumberRedundancy = 1;
sock_ptr->dataStore = NULL;
check(sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.") check(sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.")
check(sock_ptr->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create feedback socket."); check(sock_ptr->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create feedback socket.");
...@@ -113,6 +116,10 @@ uint32_t PrrtSocket_recv(const PrrtSocket *sock_ptr, void *buf_ptr) { ...@@ -113,6 +116,10 @@ uint32_t PrrtSocket_recv(const PrrtSocket *sock_ptr, void *buf_ptr) {
} }
int PrrtSocket_close(const PrrtSocket *sock_ptr) { int PrrtSocket_close(const PrrtSocket *sock_ptr) {
if(sock_ptr->dataStore != NULL) {
BPTree_destroy(sock_ptr->dataStore);
}
// TODO: shut down threads; // TODO: shut down threads;
// TODO: clean up all receivers // TODO: clean up all receivers
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include <src/util/list.h> #include <src/util/list.h>
#include <src/prrt/packet.h> #include <src/prrt/packet.h>
#include <src/prrt/stores/forward_packet_table.h> #include <src/prrt/stores/forward_packet_table.h>
#include <src/util/bptree.h>
typedef struct { typedef struct {
const char* host_name; const char* host_name;
...@@ -26,6 +27,10 @@ typedef struct { ...@@ -26,6 +27,10 @@ typedef struct {
pthread_cond_t inQueueFilledMutexCv; pthread_cond_t inQueueFilledMutexCv;
List *inQueue; List *inQueue;
BPTreeNode* dataStore;
BPTreeNode* blockStore;
PrrtForwardPacketTable* forwardPacketTable; PrrtForwardPacketTable* forwardPacketTable;
PrrtReceiver receivers[PRRT_MAX_RECEIVER_COUNT]; PrrtReceiver receivers[PRRT_MAX_RECEIVER_COUNT];
......
add_subdirectory(lib/gtest-1.7.0) add_subdirectory(lib/gtest-1.7.0)
include_directories(SYSTEM ${gtest_SOURCE_DIR}/include ${gtest_SOURCE_DIR}) include_directories(SYSTEM ${gtest_SOURCE_DIR}/include ${gtest_SOURCE_DIR})
add_executable(prrtTests forward_packet_table_tests.cpp bptree_tests.cpp) add_executable(prrtTests forward_packet_table_tests.cpp bptree_tests.cpp PrrtBlock_tests.cpp)
target_link_libraries(prrtTests LINK_PUBLIC PRRT UTIL gtest gtest_main) target_link_libraries(prrtTests LINK_PUBLIC PRRT UTIL gtest gtest_main)
\ No newline at end of file
#include <gtest/gtest.h>
extern "C" {
#include "src/prrt/block.h"
#include "src/prrt/packet.h"
#include "src/prrt/coding_params.h"
}
class PrrtBlockTest : public ::testing::Test {
protected:
virtual void SetUp() {
encBlock = NULL;
decBlock = NULL;
PrrtCodingParams *cpar = (PrrtCodingParams *) malloc(sizeof(PrrtCodingParams));
PrrtCodingParams_init(cpar);
PrrtBlock_alloc(&encBlock, cpar);
PrrtBlock_alloc(&decBlock, cpar);
}
PrrtBlock *encBlock;
PrrtBlock *decBlock;
};
TEST_F(PrrtBlockTest, Encode) {
uint16_t base = 0;
PrrtPacket packets[2];
PrrtPacket redPackets[2];
for (int i = 1; i < 3; ++i) {
char data[3];
sprintf(data, "%d", i);
PrrtPacket *ptr = &packets[i - 1];
PrrtPacket_create_data_packet(ptr, 0, data, (uint32_t) strlen(data), i);
PrrtBlock_insert_data_packet(encBlock, ptr);
}
// ENCODING
int ready = PrrtBlock_encode_ready(encBlock);
ASSERT_TRUE(ready);
if (ready) {
int j = 0;
PrrtBlock_code(encBlock, &base);
uint32_t pkt_count = (encBlock)->data_count;
for (j = 0; j < pkt_count; j++) {
PrrtPacket *data_pkt = PrrtBlock_get_first_data(encBlock);
ASSERT_EQ(data_pkt->seqno, packets[j].seqno);
}
uint32_t red_count = (encBlock)->redundancy_count;
for (j = 0; j < red_count; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(encBlock);
redPackets[j] = *red_pkt;
}
}
// DECODING
PrrtBlock_insert_data_packet(decBlock, &packets[0]);
PrrtBlock_insert_redundancy_packet(decBlock, &redPackets[0]);
int readyDec = PrrtBlock_decode_ready(decBlock);
ASSERT_TRUE(readyDec);
}
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment