Commit 898492fb authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Accumulating and coding packets before sending.

parent 068fb4ac
...@@ -11,4 +11,10 @@ ...@@ -11,4 +11,10 @@
#define TRUE 1 #define TRUE 1
#define FALSE 0 #define FALSE 0
#define GF_BITS 8
#define K_START 2
#define N_START 4
#define N_P_START 1
#endif //PRRT_DEFINES_H #endif //PRRT_DEFINES_H
#include <string.h>
#include <stdio.h>
#include "block.h" #include "block.h"
#include "../defines.h"
#include "coding_params.h"
#include "vdmcode/block_code.h"
#include "packet.h"
#include "../util/common.h"
void PrrtBlock_free(PrrtBlock **mblock) { void PrrtBlock_free(PrrtBlock **mblock) {
...@@ -8,5 +15,69 @@ void PrrtBlock_free(PrrtBlock **mblock) { ...@@ -8,5 +15,69 @@ void PrrtBlock_free(PrrtBlock **mblock) {
int PrrtBlock_alloc(PrrtBlock **mblock, PrrtCodingParams *cpar) { int PrrtBlock_alloc(PrrtBlock **mblock, PrrtCodingParams *cpar) {
*mblock = malloc(sizeof(PrrtBlock)); *mblock = malloc(sizeof(PrrtBlock));
(*mblock)->coding_params = *cpar;
(*mblock)->data_blocks =List_create();
(*mblock)->redundancy_blocks =List_create();
return 0; return 0;
} }
int PrrtBlock_insert_data_packet(PrrtBlock *block_ptr, PrrtPacket *packet_ptr) {
List_push(block_ptr->data_blocks, packet_ptr);
block_ptr->data_count++;
block_ptr->largest_data_length = MAX(block_ptr->largest_data_length, packet_ptr->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
return 0;
}
int PrrtBlock_ready(PrrtBlock *block_ptr) {
return (block_ptr->data_count == block_ptr->coding_params.k) ? TRUE : FALSE;
}
PrrtPacket *PrrtBlock_get_first_data(PrrtBlock *block_ptr) {
block_ptr->data_count--;
return List_shift(block_ptr->data_blocks);
}
void PrrtBlock_code(PrrtBlock *block_ptr) {
printf("CODING\n");
int j = 0, m = 0;
int k = block_ptr->coding_params.k;
int n = block_ptr->coding_params.n;
int length = block_ptr->largest_data_length;
PrrtCoder* coder = NULL;
PrrtCoder_get_coder(&coder, n, k);
gf** src = malloc(sizeof(gf*) * k);
LIST_FOREACH(block_ptr->data_blocks, first, next, cur) {
src[j] = malloc(sizeof(gf) * length);
memset(src[j], 0, sizeof(gf) * length);
PrrtPacket* pkt = cur->value;
PrrtPacket_print(pkt);
PrrtPacketDataPayload *payload = pkt->payload;
memcpy(src[j], payload + PRRT_PACKET_DATA_HEADER_SIZE, pkt->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
j++;
}
for(j = 0; j < k; j++) {
print_gf(src[j], length);
}
return;
gf** fec = malloc(sizeof(gf*) * n);
for(j = 0; j < n; j++) {
fec[j] = malloc(sizeof(gf) * length);
memset(fec[j], 0, sizeof(gf) * length);
PrrtCoder_encode(coder, src, fec[j], j, length); // gf **src, gf *fec, int index, int sz
print_gf(fec[j], length);
}
}
PrrtPacket *PrrtBlock_get_red_data(PrrtBlock *block_ptr) {
return NULL;
}
\ No newline at end of file
...@@ -3,14 +3,16 @@ ...@@ -3,14 +3,16 @@
#include "../util/list.h" #include "../util/list.h"
#include "coding_params.h" #include "coding_params.h"
#include "packet.h"
typedef struct { typedef struct {
int data_count; uint32_t data_count;
int redundancy_count; uint32_t redundancy_count;
PrrtCodingParams coding_params; PrrtCodingParams coding_params;
uint32_t largest_data_length;
List* data_blocks; List* data_blocks;
List* redundancy_blocks; List* redundancy_blocks;
short is_coded; uint8_t is_coded;
} PrrtBlock; } PrrtBlock;
...@@ -24,5 +26,14 @@ int PrrtBlock_alloc(PrrtBlock **mblock, PrrtCodingParams *cpar); ...@@ -24,5 +26,14 @@ int PrrtBlock_alloc(PrrtBlock **mblock, PrrtCodingParams *cpar);
*/ */
void PrrtBlock_free(PrrtBlock **mblock); void PrrtBlock_free(PrrtBlock **mblock);
int PrrtBlock_insert_data_packet(PrrtBlock *block_ptr, PrrtPacket *packet_ptr);
int PrrtBlock_ready(PrrtBlock *block_ptr);
PrrtPacket *PrrtBlock_get_first_data(PrrtBlock *block_ptr);
void PrrtBlock_code(PrrtBlock *block_ptr);
PrrtPacket *PrrtBlock_get_red_data(PrrtBlock *block_ptr);
#endif //PRRT_BLOCK_H #endif //PRRT_BLOCK_H
//
// Created by andreas on 09.02.16.
//
#include "coding_params.h" #include "coding_params.h"
#include "../defines.h"
void PrrtCodingParams_init(PrrtCodingParams *cpar) {
pthread_mutex_init(&cpar->lock, NULL);
cpar->k = K_START;
cpar->n = N_START;
cpar->r = cpar->n - cpar->k;
cpar->n_p = N_P_START;
}
...@@ -5,8 +5,17 @@ ...@@ -5,8 +5,17 @@
#ifndef PRRT_CODING_PARAMS_H #ifndef PRRT_CODING_PARAMS_H
#define PRRT_CODING_PARAMS_H #define PRRT_CODING_PARAMS_H
#include <pthread.h>
typedef struct { typedef struct {
pthread_mutex_t lock;
int k;
int r;
int n;
int n_p;
} PrrtCodingParams; } PrrtCodingParams;
#endif //PRRT_CODING_PARAMS_H #endif //PRRT_CODING_PARAMS_H
void PrrtCodingParams_init(PrrtCodingParams *cpar);
...@@ -4,46 +4,75 @@ ...@@ -4,46 +4,75 @@
#include <netdb.h> #include <netdb.h>
#include "data_transmitter.h" #include "data_transmitter.h"
#include "socket.h" #include "socket.h"
#include "block.h"
void * send_data_loop(void *ptr) {
PrrtSocket *sock_ptr = ptr;
while(1) {
pthread_mutex_lock(&sock_ptr->out_queue_filled_mutex);
while(List_count(sock_ptr->out_queue) == 0) {
pthread_cond_wait(&sock_ptr->out_queue_filled_cv, &sock_ptr->out_queue_filled_mutex);
}
PrrtPacket * packet = List_shift(sock_ptr->out_queue); int send_packet(PrrtSocket *sock_ptr, PrrtPacket *data_pkt) {
int err = 0;
uint8_t buf[MAX_PAYLOAD_LENGTH];
uint32_t length = PrrtPacket_size(data_pkt);
if (PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, data_pkt) < 0) {
perror("BUF too small.");
exit(0);
}
// SENDING TO ALL RECEIVERS
int i;
for (i = 0; i < sock_ptr->receiver_len; i++) {
PrrtReceiver recv = sock_ptr->receivers[i];
struct hostent *hp;
struct sockaddr_in targetaddr;
memset((char *) &targetaddr, 0, sizeof(targetaddr));
targetaddr.sin_family = AF_INET;
targetaddr.sin_port = htons(recv.port);
uint8_t buf[MAX_PAYLOAD_LENGTH]; hp = gethostbyname(recv.host_name);
uint32_t length = PrrtPacket_size(packet); memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
if(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet) < 0) {
perror("BUF too small."); if ((sendto(sock_ptr->fd_data, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) < 0)) {
exit(0); perror("sendto failed");
exit(1);
} }
}
// SENDING TO ALL RECEIVERS PrrtPacket_destroy(data_pkt);
int i;
for(i = 0; i < sock_ptr->receiver_len; i++) {
PrrtReceiver recv = sock_ptr->receivers[i];
struct hostent *hp; return err;
}
void *send_data_loop(void *ptr) {
PrrtSocket *sock_ptr = ptr;
PrrtBlock *block = NULL;
PrrtCodingParams *cpar = malloc(sizeof(PrrtCodingParams));
PrrtCodingParams_init(cpar);
PrrtBlock_alloc(&block, cpar);
while (1) {
pthread_mutex_lock(&sock_ptr->out_queue_filled_mutex);
while (List_count(sock_ptr->out_queue) == 0) {
pthread_cond_wait(&sock_ptr->out_queue_filled_cv, &sock_ptr->out_queue_filled_mutex);
}
struct sockaddr_in targetaddr; PrrtPacket *packet = List_shift(sock_ptr->out_queue);
memset((char*) &targetaddr, 0, sizeof(targetaddr));
targetaddr.sin_family = AF_INET;
targetaddr.sin_port = htons(recv.port);
hp = gethostbyname(recv.host_name); PrrtBlock_insert_data_packet(block, packet);
memcpy((void *)&targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
if((sendto(sock_ptr->fd_data, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) < 0)) { if (PrrtBlock_ready(block)) {
perror("sendto failed"); int j = 0;
exit(1); PrrtBlock_code(block);
break;
for (j = 0; j < (block)->data_count; j++) {
PrrtPacket *data_pkt = PrrtBlock_get_first_data(block);
send_packet(sock_ptr, data_pkt);
}
for (j = 0; j < (block)->redundancy_count; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_red_data(block);
send_packet(sock_ptr, red_pkt);
} }
} }
PrrtPacket_destroy(packet);
pthread_mutex_unlock(&sock_ptr->out_queue_filled_mutex); pthread_mutex_unlock(&sock_ptr->out_queue_filled_mutex);
usleep(1); usleep(1);
......
...@@ -288,13 +288,13 @@ int PrrtPacket_destroy(PrrtPacket *packet_ptr) { ...@@ -288,13 +288,13 @@ int PrrtPacket_destroy(PrrtPacket *packet_ptr) {
// PACKET SPECIFIC CODE // PACKET SPECIFIC CODE
int PrrtPacket_create_data_packet(PrrtPacket *packet_ptr, uint8_t priority, const void *data_ptr, int PrrtPacket_create_data_packet(PrrtPacket *packet_ptr, uint8_t priority, const void *data_ptr,
unsigned long data_len) { unsigned long data_len, int seqno) {
uint32_t payload_length = (uint32_t) (data_len + sizeof(PrrtPacketDataPayload)); uint32_t payload_length = (uint32_t) (data_len + sizeof(PrrtPacketDataPayload));
packet_ptr->type_priority = PACKET_TYPE_DATA << 4; packet_ptr->type_priority = PACKET_TYPE_DATA << 4;
packet_ptr->type_priority |= priority & 0x0F; packet_ptr->type_priority |= priority & 0x0F;
packet_ptr->index = 17; // TODO: replace with sensible value packet_ptr->index = 17; // TODO: replace with sensible value
packet_ptr->seqno = 4711; // TODO: replace with sensible value packet_ptr->seqno = seqno; // TODO: replace with sensible value
void *content_buf = malloc(sizeof(PrrtPacketDataPayload) + data_len); void *content_buf = malloc(sizeof(PrrtPacketDataPayload) + data_len);
assert(content_buf != NULL); assert(content_buf != NULL);
......
...@@ -54,7 +54,7 @@ int PrrtPacket_print(PrrtPacket *packet_ptr); ...@@ -54,7 +54,7 @@ int PrrtPacket_print(PrrtPacket *packet_ptr);
int PrrtPacket_create_data_packet(PrrtPacket *packet_ptr, uint8_t priority, const void *data_ptr, int PrrtPacket_create_data_packet(PrrtPacket *packet_ptr, uint8_t priority, const void *data_ptr,
unsigned long data_len); unsigned long data_len, int seqno);
PrrtPacket *PrrtPacket_create_feedback_packet(uint8_t priority, uint8_t index, uint16_t seqno, uint32_t round_trip_time, PrrtPacket *PrrtPacket_create_feedback_packet(uint8_t priority, uint8_t index, uint16_t seqno, uint32_t round_trip_time,
uint32_t forward_trip_time, uint32_t packet_loss_rate, uint16_t gap, uint32_t forward_trip_time, uint32_t packet_loss_rate, uint16_t gap,
......
...@@ -12,6 +12,8 @@ ...@@ -12,6 +12,8 @@
#include "data_transmitter.h" #include "data_transmitter.h"
int PrrtSocket_create(PrrtSocket *sock_ptr, uint16_t port, uint8_t is_sender) { int PrrtSocket_create(PrrtSocket *sock_ptr, uint16_t port, uint8_t is_sender) {
sock_ptr->seqno_source = 1;
// Create Data Socket // Create Data Socket
if((sock_ptr->fd_data = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { if((sock_ptr->fd_data = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
perror("cannot create socket"); perror("cannot create socket");
...@@ -89,7 +91,7 @@ int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, size_t data_len) { ...@@ -89,7 +91,7 @@ int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, size_t data_len) {
PrrtPacket * packet = malloc(sizeof(PrrtPacket)); PrrtPacket * packet = malloc(sizeof(PrrtPacket));
assert(packet != NULL); assert(packet != NULL);
PrrtPacket_create_data_packet(packet, 5, data, data_len); PrrtPacket_create_data_packet(packet, 5, data, data_len, sock_ptr->seqno_source++);
List_push(sock_ptr->out_queue, packet); List_push(sock_ptr->out_queue, packet);
pthread_cond_signal(&sock_ptr->out_queue_filled_cv); pthread_cond_signal(&sock_ptr->out_queue_filled_cv);
......
...@@ -56,9 +56,9 @@ ...@@ -56,9 +56,9 @@
#error "GF_BITS must be 2 .. 16" #error "GF_BITS must be 2 .. 16"
#endif #endif
#if (GF_BITS <= 8) #if (GF_BITS <= 8)
typedef unsigned char gf; typedef uint8_t gf;
#else #else
typedef unsigned short gf; typedef uint16_t gf;
#endif #endif
#define GF_SIZE ((1 << GF_BITS) - 1) /* powers of \alpha */ #define GF_SIZE ((1 << GF_BITS) - 1) /* powers of \alpha */
......
...@@ -29,7 +29,7 @@ int main(int argc, char* const argv) { ...@@ -29,7 +29,7 @@ int main(int argc, char* const argv) {
printf("SENDING\n"); printf("SENDING\n");
for(i= 0; i < 5; i++) { for(i= 0; i < 4; i++) {
char buf[30]; char buf[30];
sprintf(buf, "this is a message %d", i); sprintf(buf, "this is a message %d", i);
PrrtSocket_send(&sock, buf, strlen(buf)); PrrtSocket_send(&sock, buf, strlen(buf));
......
...@@ -7,19 +7,6 @@ ...@@ -7,19 +7,6 @@
#include "util/common.h" #include "util/common.h"
#include "prrt/vdmcode/block_code.h" #include "prrt/vdmcode/block_code.h"
void print_gf(gf* start, int len) {
printf("GF: ");
int i;
for (i = 0; i < len; i++) {
gf s = *(start + i);
printf("%d", s);
if(i != len-1) {
printf(",");
}
}
printf("\n");
}
int main() { int main() {
char* msg = "This is a test message."; char* msg = "This is a test message.";
......
...@@ -11,4 +11,17 @@ int print_buffer(char* buf, int length) { ...@@ -11,4 +11,17 @@ int print_buffer(char* buf, int length) {
printf("%2x", buf[i]); printf("%2x", buf[i]);
} }
printf("\n"); printf("\n");
}
void print_gf(gf* start, int len) {
printf("GF: ");
int i;
for (i = 0; i < len; i++) {
gf s = *(start + i);
printf("%d", s);
if(i != len-1) {
printf(",");
}
}
printf("\n");
} }
\ No newline at end of file
#ifndef PRRT_COMMON_H #ifndef PRRT_COMMON_H
#define PRRT_COMMON_H #define PRRT_COMMON_H
#include "../prrt/vdmcode/block_code.h"
int print_buffer(char* buf, int length); int print_buffer(char* buf, int length);
void print_gf(gf* start, int len);
#define PERROR(fmt, args...) \ #define PERROR(fmt, args...) \
printf("PRRT ERROR: " fmt, ## args); printf("PRRT ERROR: " fmt, ## args);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment