Commit 45ae311f authored by rna's avatar rna

Complete data header is error-coded.

* Data length is part of DataHeader.
* Reconstruct data packet from payload.
* Ensure sizes are kept.
* int -> int16_t for block code.
parent 818b1d64
Pipeline #1463 failed with stages
in 2 minutes and 8 seconds
......@@ -6,7 +6,7 @@
#include "packet.h"
#include "block.h"
static void gather_redundancy_packets(const PrrtBlock *block_ptr, gf *const *fec, int *idx_p)
static void gather_redundancy_packets(const PrrtBlock *block_ptr, gf *const *fec, int16_t *idx_p)
{
uint32_t i;
uint32_t m = 0;
......@@ -26,11 +26,11 @@ static void gather_redundancy_packets(const PrrtBlock *block_ptr, gf *const *fec
}
}
static void gather_data_packets(PrrtBlock *block_ptr, gf *const *fec, int *idx_p)
static void gather_data_packets(PrrtBlock *block_ptr, gf *const *fec, int16_t *idx_p)
{
LIST_FOREACH(block_ptr->dataPackets, first, next, current) {
PrrtPacket *packet = current->value;
PrrtPacket_copy_payload_to_buffer(fec[packet->index], packet, PRRT_PACKET_DATA_HEADER_SIZE);
PrrtPacket_copy_payload_to_buffer(fec[packet->index], packet, 0);
idx_p[packet->index] = packet->index;
}
}
......@@ -77,7 +77,7 @@ PrrtBlock * PrrtBlock_create(PrrtCodingParams *cpar, prrtSequenceNumber_t baseSe
block_ptr->dataPackets = List_create();
block_ptr->redundancyPackets = List_create();
block_ptr->baseSequenceNumber = baseSequenceNumber;
block_ptr->largestDataLength = 0;
block_ptr->largestPayloadLength = 0;
block_ptr->coder = NULL;
pthread_mutexattr_t attr;
......@@ -105,8 +105,8 @@ bool PrrtBlock_insert_data_packet(PrrtBlock *block_ptr, const PrrtPacket *prrtPa
if(found == false) {
List_push(block_ptr->dataPackets, prrtPacket);
block_ptr->largestDataLength = (prrtPacketLength_t) MAX(block_ptr->largestDataLength,
prrtPacket->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE);
block_ptr->largestPayloadLength = (prrtPacketLength_t) MAX(block_ptr->largestPayloadLength,
prrtPacket->payloadLength);
}
check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");
return found == false;
......@@ -128,7 +128,7 @@ bool PrrtBlock_insert_redundancy_packet(PrrtBlock *block_ptr, const PrrtPacket *
}
if(found == false) {
List_push(block_ptr->redundancyPackets, ptr);
block_ptr->largestDataLength = (prrtPacketLength_t) MAX(block_ptr->largestDataLength,
block_ptr->largestPayloadLength = (prrtPacketLength_t) MAX(block_ptr->largestPayloadLength,
ptr->payloadLength - PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
}
check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");
......@@ -182,7 +182,7 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, prrtSequenceNumber_t *seqno)
uint8_t k = block_ptr->codingParams->k;
uint8_t r = block_ptr->codingParams->r;
prrtSequenceNumber_t baseSequenceNumber = block_ptr->baseSequenceNumber;
prrtPacketLength_t length = block_ptr->largestDataLength;
prrtPacketLength_t length = block_ptr->largestPayloadLength;
PrrtCoder *coder = block_ptr->codingParams->coder;
......@@ -194,7 +194,7 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, prrtSequenceNumber_t *seqno)
check_mem(src[j]);
PrrtPacket *pkt = cur->value;
pkt->index = (uint8_t) ((pkt->sequenceNumber - baseSequenceNumber) % SEQNO_SPACE);
PrrtPacket_copy_payload_to_buffer(src[j], pkt, PRRT_PACKET_DATA_HEADER_SIZE);
PrrtPacket_copy_payload_to_buffer(src[j], pkt, 0);
j++;
}
......@@ -228,21 +228,21 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, prrtSequenceNumber_t *seqno)
bool PrrtBlock_decode(PrrtBlock *block_ptr)
{
int i, j = 0;
prrtIndex_t i, j = 0;
gf **fec = NULL;
int *idx_p = NULL;
uint8_t k = 0;
int16_t *idx_p = NULL;
prrtIndex_t k = 0;
check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");
k = block_ptr->codingParams->k;
prrtSequenceNumber_t baseSequenceNumber = block_ptr->baseSequenceNumber;
prrtPacketLength_t length = block_ptr->largestDataLength;
prrtPacketLength_t length = block_ptr->largestPayloadLength;
PrrtCoder *coder = block_ptr->codingParams->coder;
fec = calloc(k, sizeof(gf *));
check_mem(fec);
for(i = 0; i < k; i++) {
fec[i] = calloc(block_ptr->largestDataLength, sizeof(gf));
fec[i] = calloc(block_ptr->largestPayloadLength, sizeof(gf));
}
idx_p = calloc(k, sizeof(int));
......@@ -258,8 +258,12 @@ bool PrrtBlock_decode(PrrtBlock *block_ptr)
for(j = 0; j < k; j++) {
if(idx_p[j] >= k) {
PrrtPacket *packet = PrrtPacket_create_data_packet(0, fec[j], length,
(prrtSequenceNumber_t) (baseSequenceNumber + j), 0);
PrrtPacketDataPayload* packet_and_payload = (PrrtPacketDataPayload *) fec[j];
PrrtPacket *packet = PrrtPacket_reconstruct_data_packet(packet_and_payload, j,
(prrtSequenceNumber_t) (baseSequenceNumber + j));
if(PrrtBlock_insert_data_packet(block_ptr, packet) == false) {
debug(DEBUG_BLOCK, "Tried to insert unnecessary packet.");
PrrtPacket_destroy(packet);
......
......@@ -9,7 +9,7 @@
typedef struct prrtBlock {
PrrtCodingParams* codingParams;
prrtPacketLength_t largestDataLength;
prrtPacketLength_t largestPayloadLength;
prrtSequenceNumber_t baseSequenceNumber;
List*dataPackets;
List*redundancyPackets;
......
......@@ -247,6 +247,10 @@ void *encode_data_header(void *buf_ptr, const void *payload)
{
const PrrtPacketDataPayload *data_payload = payload;
prrtPacketLength_t *packetLength = (prrtPacketLength_t*) buf_ptr;
*packetLength = htonl(data_payload->dataLength);
buf_ptr += sizeof(prrtPacketLength_t);
prrtTimestamp_t *timestamp = (prrtTimestamp_t *) buf_ptr;
*timestamp = htonl(data_payload->timestamp);
buf_ptr += sizeof(prrtTimestamp_t);
......@@ -393,6 +397,10 @@ void *decode_data_header(void *dstBuffer, const void *srcBuffer)
{
PrrtPacketDataPayload *data_payload = (PrrtPacketDataPayload *) srcBuffer;
prrtPacketLength_t *dataLength = (prrtPacketLength_t *) dstBuffer;
data_payload->dataLength = ntohl(*dataLength);
dstBuffer += sizeof(prrtPacketLength_t);
prrtTimestamp_t *timestamp = (prrtTimestamp_t *) dstBuffer;
data_payload->timestamp = ntohl(*timestamp);
dstBuffer += sizeof(prrtTimestamp_t);
......@@ -415,6 +423,7 @@ void *decode_data_header(void *dstBuffer, const void *srcBuffer)
return dstBuffer;
}
int PrrtPacket_destroy(PrrtPacket *packet)
{
if(packet->payload != NULL) {
......@@ -425,16 +434,17 @@ int PrrtPacket_destroy(PrrtPacket *packet)
}
PrrtPacket *PrrtPacket_create_data_packet(uint8_t priority, const void *payloadPointer,
prrtPacketLength_t payloadLength, prrtSequenceNumber_t sequenceNumber,
prrtPacketLength_t dataLength, prrtSequenceNumber_t sequenceNumber,
prrtTimedelta_t targetDelay)
{
PrrtPacket *packet = create_header(priority, sequenceNumber,
(prrtPacketLength_t) (payloadLength + PRRT_PACKET_DATA_HEADER_SIZE), PACKET_TYPE_DATA, 0);
(prrtPacketLength_t) (dataLength + PRRT_PACKET_DATA_HEADER_SIZE), PACKET_TYPE_DATA, 0);
PrrtPacketDataPayload *dataPayload = calloc(1, packet->payloadLength);
check_mem(dataPayload);
packet->payload = dataPayload;
dataPayload->dataLength = dataLength;
dataPayload->timestamp = PrrtClock_get_current_time_us();
dataPayload->packetTimeout_us = dataPayload->timestamp + targetDelay;
debug(DEBUG_PACKET, "timeout = %lu + %lu", (unsigned long) dataPayload->timestamp, (unsigned long) targetDelay);
......@@ -450,6 +460,25 @@ PrrtPacket *PrrtPacket_create_data_packet(uint8_t priority, const void *payloadP
return NULL;
}
PrrtPacket *PrrtPacket_reconstruct_data_packet(PrrtPacketDataPayload *payload, prrtIndex_t index, prrtSequenceNumber_t sequenceNumber) {
prrtPacketLength_t dataLength = payload->dataLength;
PrrtPacket *packet = create_header(0, sequenceNumber,
(prrtPacketLength_t) (dataLength + PRRT_PACKET_DATA_HEADER_SIZE), PACKET_TYPE_DATA, index);
PrrtPacketDataPayload *dataPayload = calloc(1, packet->payloadLength);
check_mem(dataPayload);
packet->payload = dataPayload;
PrrtPacket_copy_buffer_to_payload(packet, payload, 0)
return packet;
error:
PERROR("Could not reconstruct packet.%s", "");
return NULL;
}
PrrtPacket *PrrtPacket_create_redundancy_packet(uint8_t priority, void *payloadPointer,
prrtPacketLength_t payloadLength,
prrtSequenceNumber_t sequenceNumber, uint8_t index,
......
......@@ -40,6 +40,7 @@ typedef struct prrtPacket {
#define PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH 4
typedef struct prrtPacketDataPayload {
prrtPacketLength_t dataLength;
prrtTimestamp_t timestamp;
prrtTimedelta_t groupRTT_us;
prrtTimestamp_t packetTimeout_us;
......@@ -68,7 +69,7 @@ typedef struct prrtPacketFeedbackPayload {
uint32_t bandwidthEstimate;
PrrtIncompleteBlock *incompleteBlocks;
} PrrtPacketFeedbackPayload;
#define PRRT_PACKET_FEEDBACK_HEADER_SIZE sizeof(PrrtPacketFeedbackPayload) - sizeof(PrrtIncompleteBlock*)
#define PRRT_PACKET_FEEDBACK_HEADER_SIZE (sizeof(PrrtPacketFeedbackPayload) - sizeof(PrrtIncompleteBlock*))
prrtPacketType_t PrrtPacket_type(PrrtPacket *packet_ptr);
......@@ -81,8 +82,9 @@ int PrrtPacket_print(PrrtPacket *packet_ptr);
PrrtPacket *PrrtPacket_copy(PrrtPacket *original);
PrrtPacket *PrrtPacket_create_data_packet(uint8_t priority, const void *payloadPointer,
prrtPacketLength_t payloadLength, prrtSequenceNumber_t sequenceNumber,
prrtPacketLength_t dataLength, prrtSequenceNumber_t sequenceNumber,
prrtTimedelta_t targetDelay);
PrrtPacket* PrrtPacket_reconstruct_data_packet(PrrtPacketDataPayload* payload, prrtIndex_t index, prrtSequenceNumber_t sequenceNumber);
PrrtPacket *PrrtPacket_create_feedback_packet(uint8_t priority, uint8_t index, prrtSequenceNumber_t sequenceNumber,
prrtTimedelta_t groupRTT, prrtSequenceNumber_t gapLength,
......@@ -101,13 +103,13 @@ bool PrrtPacket_encode(void *buf_ptr, uint16_t buf_size, PrrtPacket *packet_ptr)
int PrrtPacket_destroy(PrrtPacket *packet);
#define PrrtPacket_copy_payload_to_buffer(dst, packet, header_size) memcpy(dst, packet->payload + header_size, packet->payloadLength - header_size);
#define PrrtPacket_copy_buffer_to_payload(packet, src, header_size) memcpy(packet->payload + header_size, src, packet->payloadLength - header_size);
#define PrrtPacket_copy_payload_to_buffer(dst, packet, header_size) memcpy(dst, (packet)->payload + (header_size), (packet)->payloadLength - (header_size));
#define PrrtPacket_copy_buffer_to_payload(packet, src, header_size) memcpy((packet)->payload + (header_size), src, (packet)->payloadLength - (header_size));
#define PrrtPacket_get_data_timestamp(packet) ((PrrtPacketDataPayload*) packet->payload)->timestamp;
#define PrrtPacket_byListNode(lnptr) ((PrrtPacket *) (lnptr))
#define PrrtDataPacket_packet_timeout(packet) ((PrrtPacketDataPayload*) packet->payload)->packetTimeout_us
#define PrrtDataPacket_packet_timeout(packet) ((PrrtPacketDataPayload*) (packet)->payload)->packetTimeout_us
#endif //PRRT_FRAME_H
......@@ -663,7 +663,7 @@ void PrrtCoder_encode(PrrtCoder *cod, gf **src, gf *fec, int index, int sz) {
* indexes. The matrix must be already allocated as
* a vector of k*k elements, in row-major order
*/
static gf *prrt_coder_build_matrix(PrrtCoder *cod, int *index) {
static gf *prrt_coder_build_matrix(PrrtCoder *cod, int16_t *index) {
int i, k = cod->params.k;
gf *p, *matrix = NEW_GF_MATRIX(k, k);
if (!matrix)
......@@ -694,7 +694,7 @@ static gf *prrt_coder_build_matrix(PrrtCoder *cod, int *index) {
return matrix;
}
int PrrtCoder_decode(PrrtCoder *cod, gf **pkt, int *index, int sz) {
int PrrtCoder_decode(PrrtCoder *cod, gf **pkt, int16_t *index, int sz) {
gf *m_dec = 0;
gf **new_pkt = 0;
int row, col, k = cod->params.k;
......
......@@ -126,6 +126,6 @@ void PrrtCoder_encode(PrrtCoder *cod, gf **src, gf *dst, int index, int sz) ;
* \param index: this is the position of the packets in the block code.
* \param sz: the size of the data of the packets.
*/
int PrrtCoder_decode(PrrtCoder *cod, gf **pkt, int *index, int sz) ;
int PrrtCoder_decode(PrrtCoder *cod, gf **pkt, int16_t *index, int sz) ;
#endif //PRRT_BLOCK_CODE_H
\ No newline at end of file
......@@ -61,7 +61,7 @@ TEST_F(PrrtBlockTest, VDMCode)
}
gf **decFec = (gf **) calloc(1, sizeof(gf *) * k);
int *idx_p = (int *) calloc(k, sizeof(int));
int16_t *idx_p = (int16_t *) calloc(k, sizeof(int16_t));
for(i = 0; i < k; i++) {
decFec[i] = (gf *) calloc(length, sizeof(gf));
......@@ -108,11 +108,15 @@ TEST_F(PrrtBlockTest, EncodeDecode)
PrrtPacket *packets[4];
PrrtPacket *refPackets[4];
PrrtPacket *redPackets = (PrrtPacket *) calloc(4, sizeof(PrrtPacket));
PrrtPacket *redPackets[4];
for(uint32_t i = 0; i < 4; i++) {
char data[3];
sprintf(data, "%d", i);
if(i == 3) {
// Last packet has a different size.
sprintf(data, "D:%d", i);
}
packets[i] = PrrtPacket_create_data_packet(0, data, (prrtPacketLength_t) strlen(data), i + 1, 0);
packets[i]->index = (uint8_t) i;
refPackets[i] = PrrtPacket_create_data_packet(0, data, (prrtPacketLength_t) strlen(data), i + 1, 0);
......@@ -132,16 +136,16 @@ TEST_F(PrrtBlockTest, EncodeDecode)
uint32_t red_count = List_count(encBlock->redundancyPackets);
for(j = 0; j < red_count; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(encBlock);
redPackets[j] = *red_pkt;
redPackets[j] = red_pkt;
}
printf("------------------------\n");
// DECODING
PrrtBlock_insert_redundancy_packet(decBlock, redPackets);
PrrtBlock_insert_data_packet(decBlock, packets[0]);
PrrtBlock_insert_data_packet(decBlock, packets[1]);
PrrtBlock_insert_data_packet(decBlock, packets[2]);
PrrtBlock_insert_redundancy_packet(decBlock, redPackets[0]);
PrrtBlock_insert_redundancy_packet(decBlock, redPackets[1]);
PrrtBlock_insert_redundancy_packet(decBlock, redPackets[2]);
PrrtBlock_insert_data_packet(decBlock, packets[3]);
ASSERT_TRUE(PrrtBlock_decode_ready(decBlock));
......@@ -149,10 +153,16 @@ TEST_F(PrrtBlockTest, EncodeDecode)
for(int k = 0; k < 4; ++k) {
PrrtPacket *ptr = PrrtBlock_get_first_data(decBlock);
prrtIndex_t idx = static_cast<prrtIndex_t>(ptr->sequenceNumber - 1);
const char *s1 = (const char *) ((char *) refPackets[k]->payload + PRRT_PACKET_DATA_HEADER_SIZE);
const char *s1 = (const char *) ((char *) refPackets[idx]->payload + PRRT_PACKET_DATA_HEADER_SIZE);
const char *s2 = (const char *) ((char *) ptr->payload + PRRT_PACKET_DATA_HEADER_SIZE);
prrtPacketLength_t refLength = ((PrrtPacketDataPayload*) refPackets[idx]->payload)->dataLength;
prrtPacketLength_t ptrLength = ((PrrtPacketDataPayload*) ptr->payload)->dataLength;
ASSERT_STREQ(s1, s2);
ASSERT_EQ(refLength, ptrLength);
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment