Commit 63374a82 authored by Andreas Schmidt's avatar Andreas Schmidt

Decouple data receiving via queue. Add macros for debugging and error handling.

parent 05b315f8
add_library(PRRT ../defines.h socket.c block.c block.h packet.c packet.h processes/feedback_receiver.c processes/feedback_receiver.h processes/data_transmitter.c processes/data_transmitter.h coding_params.c coding_params.h vdmcode/block_code.c vdmcode/block_code.h coding_params.c coding_params.h collections/in_buffer.c collections/in_buffer.h stores/forward_packet_table.c stores/forward_packet_table.h)
\ No newline at end of file
add_library(PRRT ../defines.h socket.c block.c block.h packet.c packet.h processes/feedback_receiver.c processes/feedback_receiver.h processes/data_transmitter.c processes/data_transmitter.h coding_params.c coding_params.h vdmcode/block_code.c vdmcode/block_code.h coding_params.c coding_params.h collections/in_buffer.c collections/in_buffer.h stores/forward_packet_table.c stores/forward_packet_table.h processes/data_receiver.c processes/data_receiver.h)
\ No newline at end of file
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <prrt/socket.h>
#include "data_receiver.h"
int send_feedback(const PrrtSocket *sock_ptr, const char *remote_host, const int remote_port) {
struct sockaddr_in targetaddr;
memset((char *) &targetaddr, 0, sizeof(targetaddr));
targetaddr.sin_family = AF_INET;
targetaddr.sin_port = htons((uint16_t) (remote_port + 1));
struct hostent *hp;
hp = gethostbyname(remote_host);
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 7, 3, 50, 4, 6, 8, 9, 5, 1);
uint8_t buf[MAX_PAYLOAD_LENGTH];
uint32_t length = PrrtPacket_size(feedback_pkt_ptr);
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr) == EXIT_SUCCESS,
"Buffer for encoding feedback is too small");
check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) == length, "Sending feedback failed.");
return EXIT_SUCCESS;
error:
return EXIT_FAILURE;
}
void *receive_data_loop(void *ptr) {
ssize_t n;
uint16_t remote_port;
char *remote_host;
struct sockaddr_in remote;
socklen_t addrlen = sizeof(remote);
unsigned char buffer[MAX_PAYLOAD_LENGTH];
PrrtSocket *sock_ptr = ptr;
PrrtPacket *packet;
while (1) {
memset(buffer, 0, MAX_PAYLOAD_LENGTH);
n = recvfrom(sock_ptr->dataSocketFd, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen);
remote_port = ntohs(remote.sin_port);
remote_host = inet_ntoa(remote.sin_addr);
packet = malloc(sizeof(PrrtPacket));
check_mem(packet);
PrrtPacket_decode(buffer, (uint16_t) n, packet);
check(send_feedback(sock_ptr, remote_host, remote_port) == EXIT_SUCCESS, "Sending feedback failed.");
switch (PrrtPacket_type(packet)) {
case PACKET_TYPE_DATA:
// packet.timestamp + packet.timeout < now: break
if (PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, packet->seqno) ==
FALSE) {
break;
}
// check incomplete_prrt_blocks for this seqno: insert if found
// else: insert in data_packet_store
// forward to application layer
pthread_mutex_lock(&sock_ptr->inQueueFilledMutex);
List_push(sock_ptr->inQueue, packet);
pthread_cond_signal(&sock_ptr->inQueueFilledMutexCv);
pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
break;
default:
//PrrtPacket_print(packet);
break;
}
}
error:
free(packet);
}
\ No newline at end of file
#ifndef PRRT_DATA_RECEIVER_H
#define PRRT_DATA_RECEIVER_H
#include "../../util/dbg.h"
void *receive_data_loop(void *ptr);
#endif //PRRT_DATA_RECEIVER_H
......@@ -18,7 +18,7 @@ int send_packet(PrrtSocket *sock_ptr, PrrtPacket *data_pkt) {
// SENDING TO ALL RECEIVERS
int i;
for (i = 0; i < sock_ptr->receiver_len; i++) {
for (i = 0; i < sock_ptr->receiverLength; i++) {
PrrtReceiver recv = sock_ptr->receivers[i];
struct hostent *hp;
......@@ -31,7 +31,7 @@ int send_packet(PrrtSocket *sock_ptr, PrrtPacket *data_pkt) {
hp = gethostbyname(recv.host_name);
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
if ((sendto(sock_ptr->fd_data, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) < 0)) {
if ((sendto(sock_ptr->dataSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) < 0)) {
perror("sendto failed");
exit(1);
}
......@@ -51,18 +51,18 @@ void *send_data_loop(void *ptr) {
PrrtBlock_alloc(&block, cpar);
while (1) {
pthread_mutex_lock(&sock_ptr->out_queue_filled_mutex);
while (List_count(sock_ptr->out_queue) == 0) {
pthread_cond_wait(&sock_ptr->out_queue_filled_cv, &sock_ptr->out_queue_filled_mutex);
pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
while (List_count(sock_ptr->outQueue) == 0) {
pthread_cond_wait(&sock_ptr->outQueueFilledCv, &sock_ptr->outQueueFilledMutex);
}
PrrtPacket *packet = List_shift(sock_ptr->out_queue);
PrrtPacket *packet = List_shift(sock_ptr->outQueue);
PrrtBlock_insert_data_packet(block, packet);
if (PrrtBlock_ready(block)) {
int j = 0;
PrrtBlock_code(block, &sock_ptr->seqnoRedundancy);
PrrtBlock_code(block, &sock_ptr->sequenceNumberRedundancy);
uint32_t pkt_count = (block)->data_count;
for (j = 0; j < pkt_count; j++) {
......@@ -76,7 +76,7 @@ void *send_data_loop(void *ptr) {
}
}
pthread_mutex_unlock(&sock_ptr->out_queue_filled_mutex);
pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
usleep(1);
}
}
#include <prrt/processes/data_receiver.h>
#include "socket.h"
int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t is_sender) {
sock_ptr->seqno_source = 1;
sock_ptr->seqnoRedundancy = 1;
sock_ptr->sequenceNumberSource = 1;
sock_ptr->sequenceNumberRedundancy = 1;
// Create Data Socket
if ((sock_ptr->fd_data = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
perror("cannot create socket");
return -1;
}
// Create Feedback Socket
if ((sock_ptr->fd_feedback = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
perror("cannot create socket");
return -1;
}
check(sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.")
check(sock_ptr->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create feedback socket.");
if (is_sender) {
// Bind Feedback Socket
......@@ -25,27 +17,14 @@ int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t i
address.sin_addr.s_addr = htonl(INADDR_ANY);
address.sin_port = htons((uint16_t) (port + 1)); // TODO: fail if port is 65535
if (bind(sock_ptr->fd_feedback, (struct sockaddr *) &address, sizeof(address)) < 0) {
perror("cannot bind socket");
return -1;
}
check(bind(sock_ptr->feedbackSocketFd, (struct sockaddr *) &address, sizeof(address)) == EXIT_SUCCESS, "Cannot bind feedback socket.");
pthread_mutex_init(&sock_ptr->out_queue_filled_mutex, NULL);
pthread_cond_init(&sock_ptr->out_queue_filled_cv, NULL);
pthread_mutex_init(&sock_ptr->outQueueFilledMutex, NULL);
pthread_cond_init(&sock_ptr->outQueueFilledCv, NULL);
sock_ptr->outQueue = List_create();
sock_ptr->out_queue = List_create();
int rc = pthread_create(&sock_ptr->receive_thread, NULL, receive_feedback_loop, (void *) sock_ptr);
if (rc) {
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
rc = pthread_create(&sock_ptr->send_thread, NULL, send_data_loop, (void *) sock_ptr);
if (rc) {
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
check(pthread_create(&sock_ptr->receiveFeedbackThread, NULL, receive_feedback_loop, (void *) sock_ptr) == EXIT_SUCCESS, "Cannot create receive feedback thread.");
check(pthread_create(&sock_ptr->sendThread, NULL, send_data_loop, (void *) sock_ptr) == EXIT_SUCCESS, "Cannot create send thread.");
} else {
// Bind Data Socket
struct sockaddr_in address;
......@@ -54,24 +33,32 @@ int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t i
address.sin_addr.s_addr = htonl(INADDR_ANY);
address.sin_port = htons(port);
if (bind(sock_ptr->fd_data, (struct sockaddr *) &address, sizeof(address)) < 0) {
perror("cannot bind socket");
return -1;
}
check(bind(sock_ptr->dataSocketFd, (struct sockaddr *) &address, sizeof(address)) == EXIT_SUCCESS, "Cannot bind data socket.");
sock_ptr->forwardPacketTable = malloc(sizeof(PrrtForwardPacketTable));
PrrtForwardPacketTable_create(sock_ptr->forwardPacketTable);
pthread_mutex_init(&sock_ptr->inQueueFilledMutex, NULL);
pthread_cond_init(&sock_ptr->inQueueFilledMutexCv, NULL);
sock_ptr->inQueue = List_create();
check(pthread_create(&sock_ptr->receiveDataThread, NULL, receive_data_loop, (void *) sock_ptr) == EXIT_SUCCESS, "Cannot create data receiving thread.");
}
return 0;
return EXIT_SUCCESS;
error:
// TODO: cancel threads
close(sock_ptr->dataSocketFd);
close(sock_ptr->feedbackSocketFd);
return EXIT_FAILURE;
}
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) {
PrrtReceiver recv = {host, port};
if (sock_ptr->receiver_len < PRRT_MAX_RECEIVER_COUNT) {
sock_ptr->receivers[sock_ptr->receiver_len] = recv;
sock_ptr->receiver_len++;
if (sock_ptr->receiverLength < PRRT_MAX_RECEIVER_COUNT) {
sock_ptr->receivers[sock_ptr->receiverLength] = recv;
sock_ptr->receiverLength++;
} else {
return -1;
}
......@@ -80,85 +67,34 @@ int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t po
}
int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len) {
pthread_mutex_lock(&sock_ptr->out_queue_filled_mutex);
pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
PrrtPacket *packet = malloc(sizeof(PrrtPacket));
assert(packet != NULL);
PrrtPacket_create_data_packet(packet, 5, data, data_len, sock_ptr->seqno_source++);
PrrtPacket_create_data_packet(packet, 5, data, data_len, sock_ptr->sequenceNumberSource++);
List_push(sock_ptr->out_queue, packet);
pthread_cond_signal(&sock_ptr->out_queue_filled_cv);
pthread_mutex_unlock(&sock_ptr->out_queue_filled_mutex);
List_push(sock_ptr->outQueue, packet);
pthread_cond_signal(&sock_ptr->outQueueFilledCv);
pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
return 0;
}
uint32_t PrrtSocket_recv(const PrrtSocket *sock_ptr, void *buf_ptr) {
unsigned char buffer[MAX_PAYLOAD_LENGTH];
while (1) {
// RECEIVE DATA
ssize_t n;
struct sockaddr_in remote;
socklen_t addrlen = sizeof(remote);
n = recvfrom(sock_ptr->fd_data, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen);
uint16_t remote_port = ntohs(remote.sin_port);
char *remote_host = inet_ntoa(remote.sin_addr);
PrrtPacket *packet = malloc(sizeof(PrrtPacket));
assert(packet != NULL);
PrrtPacket_decode(buffer, (uint16_t) n, packet);
// REPLY
struct sockaddr_in targetaddr;
memset((char *) &targetaddr, 0, sizeof(targetaddr));
targetaddr.sin_family = AF_INET;
targetaddr.sin_port = htons((uint16_t) (remote_port + 1));
struct hostent *hp;
hp = gethostbyname(remote_host);
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 7, 3, 50, 4, 6, 8, 9, 5, 1);
uint8_t buf[MAX_PAYLOAD_LENGTH];
uint32_t length = PrrtPacket_size(feedback_pkt_ptr);
if (PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr) < 0) {
perror("BUF too small.");
} else {
if ((sendto(sock_ptr->fd_feedback, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) <
0)) {
perror("sendto failed");
return NULL;
}
pthread_mutex_lock(&sock_ptr->inQueueFilledMutex);
while (List_count(sock_ptr->inQueue) == 0) {
pthread_cond_wait(&sock_ptr->inQueueFilledMutexCv, &sock_ptr->inQueueFilledMutex);
}
switch (PrrtPacket_type(packet)) {
case PACKET_TYPE_DATA:
// packet.timestamp + packet.timeout < now: break
PrrtPacket *packet = List_shift(sock_ptr->inQueue);
if (PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, packet->seqno) ==
FALSE) {
break;
}
uint32_t len = packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE;
memcpy(buf_ptr, packet->payload + PRRT_PACKET_DATA_HEADER_SIZE, len);
// check incomplete_prrt_blocks for this seqno: insert if found
// else: insert in data_packet_store
// forward to application layer
memcpy(buf_ptr, packet->payload + PRRT_PACKET_DATA_HEADER_SIZE,
packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
return packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE;
default:
//PrrtPacket_print(packet);
break;
}
// TODO: enough packets received?
pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
return len;
}
return 0;
}
int PrrtSocket_close(const PrrtSocket *sock_ptr) {
......@@ -166,12 +102,12 @@ int PrrtSocket_close(const PrrtSocket *sock_ptr) {
// TODO: clean up all receivers
pthread_mutex_destroy(&sock_ptr->out_queue_filled_mutex);
pthread_cond_destroy(&sock_ptr->out_queue_filled_cv);
List_destroy(sock_ptr->out_queue);
pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex);
pthread_cond_destroy(&sock_ptr->outQueueFilledCv);
List_destroy(sock_ptr->outQueue);
close(sock_ptr->fd_data);
close(sock_ptr->fd_feedback);
close(sock_ptr->dataSocketFd);
close(sock_ptr->feedbackSocketFd);
return 0;
}
......@@ -180,7 +116,7 @@ PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, co
struct sockaddr_in remote;
socklen_t addrlen = sizeof(remote);
n = recvfrom(sock_ptr->fd_feedback, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
n = recvfrom(sock_ptr->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
if (n < 0) {
perror("RECVFROM FAIL");
return NULL;
......
......@@ -25,24 +25,29 @@ typedef struct {
} PrrtReceiver;
typedef struct {
int fd_data;
int fd_feedback;
pthread_t receive_thread;
int dataSocketFd;
int feedbackSocketFd;
pthread_t receiveFeedbackThread;
pthread_t send_thread;
pthread_mutex_t out_queue_filled_mutex;
pthread_cond_t out_queue_filled_cv;
List *out_queue;
pthread_t sendThread;
pthread_mutex_t outQueueFilledMutex;
pthread_cond_t outQueueFilledCv;
List *outQueue;
pthread_t receiveDataThread;
pthread_mutex_t inQueueFilledMutex;
pthread_cond_t inQueueFilledMutexCv;
List *inQueue;
PrrtForwardPacketTable* forwardPacketTable;
PrrtReceiver receivers[PRRT_MAX_RECEIVER_COUNT];
int receiver_len;
uint16_t packets_count;
uint16_t seqno_source;
uint16_t seqno_repetition;
uint16_t seqnoRedundancy;
uint16_t seqno_feedback;
int receiverLength;
uint16_t packetsCount;
uint16_t sequenceNumberSource;
uint16_t sequenceNumberRepetition;
uint16_t sequenceNumberRedundancy;
uint16_t sequenceNumberFeedback;
} PrrtSocket;
......
add_library(UTIL ../defines.h common.c common.h list.c list.h)
\ No newline at end of file
add_library(UTIL ../defines.h common.c common.h list.c list.h dbg.h)
\ No newline at end of file
// From: http://c.learncodethehardway.org/book/ex20.html
#ifndef PRRT_DBG_H
#define PRRT_DBG_H
#include <stdio.h>
#include <errno.h>
#include <string.h>
#ifdef NDEBUG
#define debug(M, ...)
#else
#define debug(M, ...) fprintf(stderr, "DEBUG %s:%d: " M "\n", __FILE__, __LINE__, ##__VA_ARGS__)
#endif
#define clean_errno() (errno == 0 ? "None" : strerror(errno))
#define log_err(M, ...) fprintf(stderr, "[ERROR] (%s:%d: errno: %s) " M "\n", __FILE__, __LINE__, clean_errno(), ##__VA_ARGS__)
#define log_warn(M, ...) fprintf(stderr, "[WARN] (%s:%d: errno: %s) " M "\n", __FILE__, __LINE__, clean_errno(), ##__VA_ARGS__)
#define log_info(M, ...) fprintf(stderr, "[INFO] (%s:%d) " M "\n", __FILE__, __LINE__, ##__VA_ARGS__)
#define check(A, M, ...) if(!(A)) { log_err(M, ##__VA_ARGS__); errno=0; goto error; }
#define sentinel(M, ...) { log_err(M, ##__VA_ARGS__); errno=0; goto error; }
#define check_mem(A) check((A), "Out of memory.")
#define check_debug(A, M, ...) if(!(A)) { debug(M, ##__VA_ARGS__); errno=0; goto error; }
#endif //PRRT_DBG_H
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment