Commit 6e175dd1 authored by Andreas Schmidt's avatar Andreas Schmidt

Fix payload issue. Start with threads.

parent 8af874a3
......@@ -8,4 +8,7 @@
#define MAX_PAYLOAD_LENGTH 65528 // maximum UDP packet length (2^16 - 8)
#define TRUE 1
#define FALSE 0
#endif //PRRT_DEFINES_H
......@@ -36,11 +36,17 @@ int print_packet(prrt_packet *packet_ptr) {
else if(type == PACKET_TYPE_FEEDBACK) {
prrt_packet_feedback_payload * payload = packet_ptr->payload;
printf("| %61u |\n", payload->group_round_trip_time);
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
printf("| %61u |\n", payload->forward_trip_time);
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
printf("| %61u |\n", payload->packet_loss_rate);
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
printf("| %29u | %29u |\n", payload->gap, payload->ngap);
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
printf("| %29u | %29u |\n", payload->burst, payload->nburst);
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
printf("| %61u |\n", payload->bandwidth);
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
printf("| %61u |\n", payload->buffer_feedback);
printf("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
} else {
......@@ -71,10 +77,10 @@ int encode_packet(void *buf_ptr, uint16_t buf_size, prrt_packet *packet) {
uint8_t type = packet_type(packet);
if(type == PACKET_TYPE_DATA) {
memcpy(buf_ptr, payload, PRRT_PACKET_DATA_HEADER_SIZE);
buf_ptr += PRRT_PACKET_DATA_HEADER_SIZE;
memcpy(buf_ptr, payload + PRRT_PACKET_DATA_HEADER_SIZE, packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
// TODO: use network order
memcpy(buf_ptr, payload, packet->payload_len);
} else if (type == PACKET_TYPE_FEEDBACK) {
// TODO: use network order
memcpy(buf_ptr, payload, PRRT_PACKET_FEEDBACK_SIZE);
} else {
perror("NOT IMPLEMENTED");
......@@ -91,14 +97,10 @@ int decode_packet(void *buf_ptr, uint16_t buf_size, prrt_packet *packet_ptr) {
uint16_t* seqno_prt = (uint16_t*) (buf_ptr+2);
packet_ptr->seqno = ntohs(*seqno_prt);
// TODO: packet specific transformation
unsigned char* payload_buffer = malloc(payload_len);
memcpy(payload_buffer, buf_ptr + PRRT_PACKET_GENERAL_HEADER_SIZE, payload_len);
printf("%s\n", payload_buffer + PRRT_PACKET_DATA_HEADER_SIZE);
packet_ptr->payload = payload_buffer;
packet_ptr->payload_len = payload_len;
......@@ -115,35 +117,39 @@ int delete_packet(prrt_packet *packet_ptr) {
// PACKET SPECIFIC CODE
int create_packet_data(prrt_packet *packet_ptr, uint8_t priority, const void *data_ptr, unsigned long data_len) {
uint32_t payload_length = (uint32_t) (data_len + sizeof(prrt_packet_source_payload));
packet_ptr->type_priority = PACKET_TYPE_DATA << 4;
packet_ptr->type_priority |= priority & 0x0F;
packet_ptr->index = 17; // TODO: replace with sensible value
packet_ptr->seqno = 4711; // TODO: replace with sensible value
prrt_packet_source_payload * payload = malloc(sizeof(prrt_packet_source_payload));
void *content_buf = malloc(sizeof(prrt_packet_source_payload) + data_len);
prrt_packet_source_payload * source_payload = content_buf;
struct timeval tv;
gettimeofday(&tv,NULL);
unsigned long time_in_micros = (unsigned long) (1000000 * tv.tv_sec + tv.tv_usec);
payload->timestamp = (uint32_t) time_in_micros;
source_payload->timestamp = (uint32_t) time_in_micros;
// TODO: payload->rtt = CURRENT ESTIMATE
// TODO: payload->packet_timeout = NOW + maximum delay
// TODO: payload->decoding_timeout
// TODO: payload->feedback_timer
packet_ptr->payload = (void*) payload;
memcpy(payload + sizeof(prrt_packet_source_payload), data_ptr, data_len);
packet_ptr->payload_len = (uint32_t) (data_len + sizeof(prrt_packet_source_payload));
packet_ptr->payload = content_buf;
memcpy(content_buf + sizeof(prrt_packet_source_payload), data_ptr, data_len);
packet_ptr->payload_len = payload_length;
return 0;
}
prrt_packet *create_packet_feedback(uint8_t priority, uint8_t index, uint16_t seqno, uint32_t receiver_addr,
uint32_t round_trip_time, uint32_t forward_trip_time, uint32_t packet_loss_rate,
uint16_t gap, uint16_t ngap, uint16_t burst, uint16_t nburst, uint32_t bandwidth,
uint32_t buffer_feedback) {
prrt_packet *create_packet_feedback(uint8_t priority, uint8_t index, uint16_t seqno, uint32_t round_trip_time,
uint32_t forward_trip_time, uint32_t packet_loss_rate, uint16_t gap, uint16_t ngap,
uint16_t burst, uint16_t nburst, uint32_t bandwidth, uint32_t buffer_feedback) {
prrt_packet *packet_ptr = malloc(sizeof(prrt_packet));
packet_ptr->type_priority = PACKET_TYPE_FEEDBACK << 4;
packet_ptr->type_priority |= priority & 0x0F;
......@@ -152,7 +158,6 @@ prrt_packet *create_packet_feedback(uint8_t priority, uint8_t index, uint16_t se
prrt_packet_feedback_payload * payload = malloc(sizeof(prrt_packet_feedback_payload));
payload->receiver_addr = receiver_addr;
payload->group_round_trip_time = round_trip_time;
payload->forward_trip_time = forward_trip_time;
payload->packet_loss_rate = packet_loss_rate;
......
......@@ -61,10 +61,9 @@ int create_packet_repetition(prrt_packet packet, uint8_t priority);
int create_packet_parity(prrt_packet packet, uint8_t priority);
prrt_packet *create_packet_feedback(uint8_t priority, uint8_t index, uint16_t seqno, uint32_t receiver_addr,
uint32_t round_trip_time, uint32_t forward_trip_time, uint32_t packet_loss_rate,
uint16_t gap, uint16_t ngap, uint16_t burst, uint16_t nburst, uint32_t bandwidth,
uint32_t buffer_feedback);
prrt_packet *create_packet_feedback(uint8_t priority, uint8_t index, uint16_t seqno, uint32_t round_trip_time,
uint32_t forward_trip_time, uint32_t packet_loss_rate, uint16_t gap, uint16_t ngap,
uint16_t burst, uint16_t nburst, uint32_t bandwidth, uint32_t buffer_feedback);
int encode_packet_feedback(void *buf_ptr, uint16_t buf_size, prrt_packet *pkt_ptr);
int decode_packet_feedback(void* buf_ptr, uint16_t buf_size, prrt_packet *packet_ptr);
......
......@@ -4,11 +4,29 @@
#include <unistd.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <pthread.h>
#include "../defines.h"
#include "socket.h"
#include "../util/common.h"
int prrt_create_socket(prrt_socket *sock_ptr, uint16_t port) {
void *receive_loop(void *ptr) {
char bufin[MAX_PAYLOAD_LENGTH];
prrt_socket *sock_ptr = ptr;
printf("RECEIVING:\n");
while(1) {
memset(bufin, 0, MAX_PAYLOAD_LENGTH);
prrt_packet *t = prrt_recv_feedback(sock_ptr, bufin, MAX_PAYLOAD_LENGTH);
if(t != NULL) {
print_packet(t);
delete_packet(t);
}
usleep(1000);
}
}
int prrt_create_socket(prrt_socket *sock_ptr, uint16_t port, uint8_t is_sender) {
// Create Data Socket
if((sock_ptr->fd_data = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
perror("cannot create socket");
......@@ -44,6 +62,17 @@ int prrt_create_socket(prrt_socket *sock_ptr, uint16_t port) {
return -1;
}
if(is_sender) {
int rc = pthread_create(&sock_ptr->receive_thread, NULL, receive_loop, (void*) sock_ptr);
if(rc) {
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
} else {
}
return 0;
}
......@@ -120,13 +149,10 @@ prrt_packet *prrt_recv(prrt_socket *sock_ptr) {
targetaddr.sin_port = htons((uint16_t) (remote_port + 1));
struct hostent *hp;
hp = gethostbyname(remote_host);
memcpy((void *)&targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
uint32_t addr = get_ip_from_socket(sock_ptr->fd_data);
prrt_packet* feedback_pkt_ptr = create_packet_feedback(0, 19, 4715, addr, 5, 0, 50, 0, 0, 0, 0, 5, 1);
prrt_packet* feedback_pkt_ptr = create_packet_feedback(0, 19, 4715, 5, 0, 50, 0, 0, 0, 0, 5, 1);
uint8_t buf[MAX_PAYLOAD_LENGTH];
uint32_t length = packet_size(feedback_pkt_ptr);
......@@ -143,6 +169,8 @@ prrt_packet *prrt_recv(prrt_socket *sock_ptr) {
}
int prrt_close_socket(prrt_socket *sock_ptr) {
// TODO: shut down threads;
// TODO: clean up all receivers
close(sock_ptr->fd_data);
......@@ -156,6 +184,11 @@ prrt_packet * prrt_recv_feedback(prrt_socket *sock_ptr, void *bufin, size_t leng
socklen_t addrlen = sizeof(remote);
n = recvfrom(sock_ptr->fd_feedback, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
printf("SIZE: %d\n", n);
if(n < 0) {
perror("Test");
return NULL;
}
uint16_t remote_port = ntohs(remote.sin_port);
char *remote_host = inet_ntoa(remote.sin_addr);
printf("RECV FEEDBACK: %s:%d\n", remote_host, remote_port);
......
......@@ -6,6 +6,7 @@
#include <stdio.h>
#include "../defines.h"
#include "packet.h"
#include <pthread.h>
typedef struct {
char* host_name;
......@@ -15,6 +16,8 @@ typedef struct {
typedef struct {
int fd_data;
int fd_feedback;
pthread_t receive_thread;
pthread_t send_thread;
prrt_receiver receivers[PRRT_MAX_RECEIVER_COUNT];
int receiver_len;
uint16_t seqno_source;
......@@ -24,7 +27,7 @@ typedef struct {
} prrt_socket;
int prrt_create_socket(prrt_socket *sock_ptr, uint16_t port);
int prrt_create_socket(prrt_socket *sock_ptr, uint16_t port, uint8_t is_sender);
int prrt_close_socket(prrt_socket *sock_ptr);
int prrt_connect(prrt_socket *sock_ptr, char *host, uint16_t port);
int prrt_send(prrt_socket *sock_ptr, const void *data, size_t data_len);
......
......@@ -16,7 +16,7 @@ int main(int argc, char* const argv[]) {
prrt_packet *pkt;
printf("PRRT - RECEIVER\n");
if(prrt_create_socket(&sock, port) < 0) {
if(prrt_create_socket(&sock, port, 0) < 0) {
perror("could not create socket");
return 0;
}
......
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <unistd.h>
#include "prrt/socket.h"
#define MAXBUF 1024 * 1024
int main(int argc, char* const argv) {
if(argc != 1) {
printf("Too few arguments.\n");
......@@ -14,7 +13,7 @@ int main(int argc, char* const argv) {
prrt_socket sock = {};
printf("PRRT - SENDER\n");
if(prrt_create_socket(&sock, local_port) < 0) {
if(prrt_create_socket(&sock, local_port, TRUE) < 0) {
perror("socket failed");
return 0;
}
......@@ -31,15 +30,8 @@ int main(int argc, char* const argv) {
char *message = "this is a message";
prrt_send(&sock, message, strlen(message));
char bufin[MAXBUF];
prrt_packet *t = prrt_recv_feedback(&sock, bufin, MAXBUF);
//print_packet(t);
delete_packet(t);
t = prrt_recv_feedback(&sock, bufin, MAXBUF);
//print_packet(t);
delete_packet(t);
usleep(5000*1000);
prrt_close_socket(&sock);
return 0;
}
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
#include "prrt/packet.h"
#include "defines.h"
#include "util/common.h"
int main() {
char *message = "this is a message";
prrt_packet * packet = malloc(sizeof(prrt_packet));
create_packet_data(packet, 0, message, strlen(message));
#define NUM_THREADS 10
print_packet(packet);
void *PrintHello(void *threadid) {
long tid;
tid = (long) threadid;
printf("Hello World! It's me, thread #%ld!\n", tid);
pthread_exit(NULL);
}
char buf[MAX_PAYLOAD_LENGTH];
uint16_t length = packet_size(packet);
if(encode_packet(buf, MAX_PAYLOAD_LENGTH, packet) < 0) {
perror("BUF too small.");
return -1;
int main() {
pthread_t threads[NUM_THREADS];
int rc;
long t;
for(t=0; t<NUM_THREADS; t++) {
printf("In main: creating thread: %ld\n", t);
rc = pthread_create(&threads[t], NULL, PrintHello, (void*) t);
if(rc) {
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
print_buffer(buf, length);
pthread_exit(NULL);
return 0;
}
\ No newline at end of file
......@@ -11,11 +11,4 @@ int print_buffer(char* buf, int length) {
printf("%2x", buf[i]);
}
printf("\n");
}
uint32_t get_ip_from_socket(int fd) {
struct sockaddr_in addr;
socklen_t addr_size = sizeof(struct sockaddr_in);
int res = getpeername(fd, (struct sockaddr *)&addr, &addr_size);
return addr.sin_addr.s_addr;
}
\ No newline at end of file
......@@ -2,6 +2,5 @@
#define PRRT_COMMON_H
int print_buffer(char* buf, int length);
uint32_t get_ip_from_socket(int fd);
#endif //PRRT_COMMON_H
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment