Commit 275ffc5d authored by Andreas Schmidt's avatar Andreas Schmidt

Created macros for packet payload get and set.

parent 0c35bba7
#include <string.h>
#include <src/defines.h>
#include <src/prrt/vdmcode/block_code.h>
#include <stdlib.h>
#include <src/util/list.h>
#include <src/util/dbg.h>
#include <src/util/common.h>
#include "block.h"
#include "coding_params.h"
#include "packet.h"
void gather_redundancy_packets(const PrrtBlock *block_ptr, gf *const *fec, int *idx_p) {
int m = block_ptr->data_count;
LIST_FOREACH(block_ptr->redundancy_blocks, first, next, cur) {
PrrtPacket* packet = cur->value;
memcpy(fec[m], packet->payload + PRRT_PACKET_REDUNDANCY_HEADER_SIZE, packet->payload_len - PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
PrrtPacket_copy_payload_to_buffer(fec[m], packet, PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
idx_p[m] = packet->index;
m++;
}
......@@ -24,7 +21,7 @@ void gather_data_packets(PrrtBlock *block_ptr, gf *const *fec, int *idx_p) {
uint32_t count = block_ptr->data_count;
for (m = 0; m < count; ++m) {
PrrtPacket* packet = List_shift(block_ptr->data_blocks);
memcpy(fec[m], packet->payload + PRRT_PACKET_DATA_HEADER_SIZE, packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
PrrtPacket_copy_payload_to_buffer(fec[m], packet, PRRT_PACKET_DATA_HEADER_SIZE);
idx_p[m] = packet->index;
m++;
}
......@@ -107,7 +104,7 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno) {
memset(src[j], 0, sizeof(gf) * length);
PrrtPacket* pkt = cur->value;
pkt->index = (uint8_t) j;
memcpy(src[j], pkt->payload + PRRT_PACKET_DATA_HEADER_SIZE, pkt->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
PrrtPacket_copy_payload_to_buffer(src[j], pkt, PRRT_PACKET_DATA_HEADER_SIZE);
j++;
}
......
......@@ -90,15 +90,13 @@ int PrrtPacket_encode(void *buf_ptr, uint16_t buf_size, PrrtPacket *packet_ptr)
uint8_t type = PrrtPacket_type(packet_ptr);
if (type == PACKET_TYPE_DATA) {
buf_ptr = encode_data_header(buf_ptr, payload);
memcpy(buf_ptr, payload + PRRT_PACKET_DATA_HEADER_SIZE, packet_ptr->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet_ptr, PRRT_PACKET_DATA_HEADER_SIZE);
} else if (type == PACKET_TYPE_REDUNDANCY) {
buf_ptr = encode_redundancy_header(buf_ptr, payload);
memcpy(buf_ptr, payload + PRRT_PACKET_REDUNDANCY_HEADER_SIZE,
packet_ptr->payload_len - PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet_ptr, PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
} else if (type == PACKET_TYPE_FEEDBACK) {
buf_ptr = encode_feedback_header(buf_ptr, payload);
memcpy(buf_ptr, payload + PRRT_PACKET_FEEDBACK_HEADER_SIZE,
packet_ptr->payload_len - PRRT_PACKET_FEEDBACK_HEADER_SIZE);
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet_ptr, PRRT_PACKET_FEEDBACK_HEADER_SIZE);
} else {
perror("NOT IMPLEMENTED");
return -1;
......@@ -233,15 +231,13 @@ int PrrtPacket_decode(void *buf_ptr, uint16_t buf_size, PrrtPacket *packet_ptr)
if (PrrtPacket_type(packet_ptr) == PACKET_TYPE_DATA) {
buf_ptr = decode_data_header(buf_ptr, payload_buffer);
memcpy(payload_buffer + PRRT_PACKET_DATA_HEADER_SIZE, buf_ptr, payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
PrrtPacket_copy_buffer_to_payload(packet_ptr, buf_ptr, PRRT_PACKET_DATA_HEADER_SIZE);
} else if (PrrtPacket_type(packet_ptr)) {
buf_ptr = decode_redundancy_header(buf_ptr, payload_buffer);
memcpy(payload_buffer + PRRT_PACKET_REDUNDANCY_HEADER_SIZE, buf_ptr,
payload_len - PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
PrrtPacket_copy_buffer_to_payload(packet_ptr, buf_ptr, PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
} else if (PrrtPacket_type(packet_ptr) == PACKET_TYPE_FEEDBACK) {
buf_ptr = decode_feedback_header(buf_ptr, payload_buffer);
memcpy(payload_buffer + PRRT_PACKET_FEEDBACK_HEADER_SIZE, buf_ptr,
payload_len - PRRT_PACKET_FEEDBACK_HEADER_SIZE);
PrrtPacket_copy_buffer_to_payload(packet_ptr, buf_ptr, PRRT_PACKET_FEEDBACK_HEADER_SIZE);
} else {
printf("NOT IMPLEMENTED\n");
}
......@@ -350,7 +346,7 @@ int PrrtPacket_destroy(PrrtPacket *packet_ptr) {
int PrrtPacket_create_data_packet(PrrtPacket *packet_ptr, uint8_t priority, const void *data_ptr,
uint32_t data_len, uint16_t seqno) {
uint32_t payload_length = (uint32_t) (data_len + sizeof(PrrtPacketDataPayload));
uint32_t payload_length = (uint32_t) (data_len + PRRT_PACKET_DATA_HEADER_SIZE);
packet_ptr->type_priority = PACKET_TYPE_DATA << 4;
packet_ptr->type_priority |= priority & 0x0F;
......@@ -373,8 +369,8 @@ int PrrtPacket_create_data_packet(PrrtPacket *packet_ptr, uint8_t priority, cons
source_payload->feedback_timeout = 170; // TODO: payload->feedback_timer
packet_ptr->payload = content_buf;
memcpy(content_buf + sizeof(PrrtPacketDataPayload), data_ptr, data_len);
packet_ptr->payload_len = payload_length;
PrrtPacket_copy_buffer_to_payload(packet_ptr, data_ptr, PRRT_PACKET_DATA_HEADER_SIZE)
error:
return -1;
......@@ -403,8 +399,8 @@ PrrtPacket *PrrtPacket_create_redundancy_packet(uint8_t priority, void *data_ptr
redundancy_payload->n = codingParams.n;
packet_ptr->payload = content_buf;
memcpy(content_buf + sizeof(PrrtPacketRedundancyPayload), data_ptr, data_len);
packet_ptr->payload_len = payload_length;
PrrtPacket_copy_buffer_to_payload(packet_ptr, data_ptr, PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
return packet_ptr;
......
......@@ -78,4 +78,7 @@ int PrrtPacket_decode(void *buf_ptr, uint16_t buf_size, PrrtPacket *packet_ptr);
int PrrtPacket_encode(void *buf_ptr, uint16_t buf_size, PrrtPacket *packet_ptr);
int PrrtPacket_destroy(PrrtPacket *packet_ptr);
#define PrrtPacket_copy_payload_to_buffer(dst, packet, header_size) memcpy(dst, packet->payload + header_size, packet->payload_len - header_size);
#define PrrtPacket_copy_buffer_to_payload(packet, src, header_size) memcpy(packet->payload + header_size, src, packet->payload_len - header_size);
#endif //PRRT_FRAME_H
......@@ -5,12 +5,39 @@
#include <src/util/dbg.h>
#include <src/prrt/socket.h>
#include <src/prrt/block.h>
#include <src/prrt/packet.h>
#include "data_receiver.h"
void decode_block(const PrrtSocket *sock_ptr, uint16_t base_seqno);
void retrieve_data_blocks(PrrtSocket *sock_ptr, uint16_t base_seqno, uint8_t k, const PrrtBlock *block) {
List *res = List_create();
BPTree_get_range(sock_ptr->dataStore, res, base_seqno, base_seqno + k - 1);
void retrieve_data_blocks(PrrtSocket *sock_ptr, uint16_t base_seqno, uint8_t k, const PrrtBlock *block);
LIST_FOREACH(res, first, next, cur) {
PrrtPacket *packetPtr = cur->value;
PrrtBlock_insert_data_packet(block, packetPtr);
sock_ptr->blockStore = BPTree_delete(sock_ptr->blockStore, packetPtr->seqno);
}
}
void decode_block(const PrrtSocket *sock_ptr, uint16_t base_seqno) {
PrrtBlock *block = BPTree_get(sock_ptr->blockStore, base_seqno);
if (block != NULL && PrrtBlock_decode_ready(block)) {
retrieve_data_blocks(sock_ptr, base_seqno, block->coding_params.k, block);
PrrtBlock_decode(block);
LIST_FOREACH(block->data_blocks, first, next, cur) {
PrrtPacket *pkt = cur->value;
if (PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, pkt->seqno)) {
printf("RECOVERED %d via coding.\n", pkt->seqno);
pthread_mutex_lock(&sock_ptr->inQueueFilledMutex);
List_push(sock_ptr->inQueue, pkt);
pthread_cond_signal(&sock_ptr->inQueueFilledMutexCv);
pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
}
}
}
}
int send_feedback(const PrrtSocket *sock_ptr, const char *remote_host, const int remote_port) {
struct sockaddr_in targetaddr;
......@@ -37,7 +64,7 @@ int send_feedback(const PrrtSocket *sock_ptr, const char *remote_host, const int
return EXIT_FAILURE;
}
void *receive_data_loop(void *ptr) {
void receive_data_loop(void *ptr) {
ssize_t n;
uint16_t remote_port;
char *remote_host;
......@@ -114,42 +141,8 @@ void *receive_data_loop(void *ptr) {
//PrrtPacket_print(packet);
break;
}
}
error:
free(packet);
}
void decode_block(const PrrtSocket *sock_ptr, uint16_t base_seqno) {
PrrtBlock *block = BPTree_get(sock_ptr->blockStore, base_seqno);
if (block != NULL && PrrtBlock_decode_ready(block)) {
retrieve_data_blocks(sock_ptr, base_seqno, block->coding_params.k, block);
PrrtBlock_decode(block);
LIST_FOREACH(block->data_blocks, first, next, cur) {
PrrtPacket *pkt = cur->value;
if (PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, pkt->seqno)) {
printf("RECOVERED %d via coding.\n", pkt->seqno);
pthread_mutex_lock(&sock_ptr->inQueueFilledMutex);
List_push(sock_ptr->inQueue, pkt);
pthread_cond_signal(&sock_ptr->inQueueFilledMutexCv);
pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
}
}
}
}
void retrieve_data_blocks(PrrtSocket *sock_ptr, uint16_t base_seqno, uint8_t k, const PrrtBlock *block) {
List *res = List_create();
BPTree_get_range(sock_ptr->dataStore, res, base_seqno, base_seqno + k - 1);
LIST_FOREACH(res, first, next, cur) {
PrrtPacket *packetPtr = cur->value;
PrrtBlock_insert_data_packet(block, packetPtr);
sock_ptr->blockStore = BPTree_delete(sock_ptr->blockStore, packetPtr->seqno);
}
}
\ No newline at end of file
#ifndef PRRT_DATA_RECEIVER_H
#define PRRT_DATA_RECEIVER_H
void *receive_data_loop(void *ptr);
void receive_data_loop(void *ptr);
#endif //PRRT_DATA_RECEIVER_H
......@@ -108,7 +108,7 @@ uint32_t PrrtSocket_recv(const PrrtSocket *sock_ptr, void *buf_ptr) {
PrrtPacket *packet = List_shift(sock_ptr->inQueue);
uint32_t len = packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE;
memcpy(buf_ptr, packet->payload + PRRT_PACKET_DATA_HEADER_SIZE, len);
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
pthread_mutex_unlock(&t);
return len;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment