Commit b5840714 authored by Andreas Schmidt's avatar Andreas Schmidt

Add a simple condition variable for fill level of outgoing queue.

parent 4eb78040
......@@ -8,7 +8,7 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)
find_package (Threads)
add_subdirectory(prrt)
add_library(PRRT prrt/socket.c prrt/block.c prrt/block.h prrt/packet.c prrt/packet.h)
add_library(PRRT prrt/socket.c prrt/block.c prrt/block.h prrt/packet.c prrt/packet.h prrt/feedback_receiver.c prrt/feedback_receiver.h prrt/data_transmitter.c prrt/data_transmitter.h)
add_library(UTIL util/common.c util/common.h)
add_executable(sender sender.c)
......
#include <unistd.h>
#include "data_transmitter.h"
#include "socket.h"
void * send_data_loop(void *ptr) {
prrt_socket *sock_ptr = ptr;
while(1) {
pthread_mutex_lock(&sock_ptr->is_data_available);
while(sock_ptr->packets_count == 0) {
pthread_cond_wait(&sock_ptr->is_data_available_cv, &sock_ptr->is_data_available);
}
// TODO: take a packet from the list and send it
sock_ptr->packets_count--;
printf("TAKING OUT (NOW: %d)\n", sock_ptr->packets_count);
pthread_mutex_unlock(&sock_ptr->is_data_available);
usleep(1000);
}
}
#ifndef PRRT_DATA_TRANSMITTER_H
#define PRRT_DATA_TRANSMITTER_H
void * send_data_loop(void *ptr);
#endif //PRRT_DATA_TRANSMITTER_H
#include <string.h>
#include <unistd.h>
#include "feedback_receiver.h"
#include "../defines.h"
#include "socket.h"
void *receive_feedback_loop(void *ptr) {
char bufin[MAX_PAYLOAD_LENGTH];
prrt_socket *sock_ptr = ptr;
while(1) {
memset(bufin, 0, MAX_PAYLOAD_LENGTH);
prrt_packet *t = prrt_recv_feedback(sock_ptr, bufin, MAX_PAYLOAD_LENGTH);
if(t != NULL) {
delete_packet(t);
}
usleep(1000);
}
}
\ No newline at end of file
#ifndef PRRT_FEEDBACK_RECEIVER_H
#define PRRT_FEEDBACK_RECEIVER_H
void *receive_feedback_loop(void *ptr);
#endif //PRRT_FEEDBACK_RECEIVER_H
......@@ -7,21 +7,8 @@
#include <pthread.h>
#include "../defines.h"
#include "socket.h"
void *receive_loop(void *ptr) {
char bufin[MAX_PAYLOAD_LENGTH];
prrt_socket *sock_ptr = ptr;
while(1) {
memset(bufin, 0, MAX_PAYLOAD_LENGTH);
prrt_packet *t = prrt_recv_feedback(sock_ptr, bufin, MAX_PAYLOAD_LENGTH);
if(t != NULL) {
delete_packet(t);
}
usleep(1000);
}
}
#include "feedback_receiver.h"
#include "data_transmitter.h"
int prrt_create_socket(prrt_socket *sock_ptr, uint16_t port, uint8_t is_sender) {
// Create Data Socket
......@@ -60,12 +47,20 @@ int prrt_create_socket(prrt_socket *sock_ptr, uint16_t port, uint8_t is_sender)
}
if(is_sender) {
int rc = pthread_create(&sock_ptr->receive_thread, NULL, receive_loop, (void*) sock_ptr);
pthread_mutex_init(&sock_ptr->is_data_available, NULL);
pthread_cond_init(&sock_ptr->is_data_available_cv, NULL);
int rc = pthread_create(&sock_ptr->receive_thread, NULL, receive_feedback_loop, (void *) sock_ptr);
if(rc) {
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
rc = pthread_create(&sock_ptr->send_thread, NULL, send_data_loop, (void *) sock_ptr);
if(rc) {
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
} else {
}
......@@ -87,6 +82,17 @@ int prrt_connect(prrt_socket *sock_ptr, char *host, uint16_t port) {
}
int prrt_send(prrt_socket *sock_ptr, const void *data, size_t data_len) {
pthread_mutex_lock(&sock_ptr->is_data_available);
sock_ptr->packets_count++;
printf("ADDING (NOW: %d)\n", sock_ptr->packets_count);
pthread_cond_signal(&sock_ptr->is_data_available_cv);
// TODO: add a packet to the list
pthread_mutex_unlock(&sock_ptr->is_data_available);
int res = 0;
prrt_packet* packet = malloc(sizeof(prrt_packet));
create_packet_data(packet, 5, data, data_len);
......@@ -169,6 +175,9 @@ int prrt_close_socket(prrt_socket *sock_ptr) {
// TODO: clean up all receivers
pthread_mutex_destroy(&sock_ptr->is_data_available);
pthread_cond_destroy(&sock_ptr->is_data_available_cv);
close(sock_ptr->fd_data);
close(sock_ptr->fd_feedback);
return 0;
......
......@@ -17,9 +17,12 @@ typedef struct {
int fd_data;
int fd_feedback;
pthread_t receive_thread;
pthread_mutex_t is_data_available;
pthread_cond_t is_data_available_cv;
pthread_t send_thread;
prrt_receiver receivers[PRRT_MAX_RECEIVER_COUNT];
int receiver_len;
uint16_t packets_count;
uint16_t seqno_source;
uint16_t seqno_repetition;
uint16_t seqno_parity;
......
......@@ -38,5 +38,7 @@ int main(int argc, char* const argv[]) {
prrt_close_socket(&sock);
pthread_exit(NULL);
return 0;
}
......@@ -36,6 +36,7 @@ int main(int argc, char* const argv) {
usleep(5000*1000);
prrt_close_socket(&sock);
pthread_exit(NULL);
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment