Commit 7c14c771 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Using lists to send outgoing packets in a separate thread.

parent b5840714
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -9,7 +9,7 @@ find_package (Threads)

add_subdirectory(prrt)
add_library(PRRT prrt/socket.c prrt/block.c prrt/block.h prrt/packet.c prrt/packet.h prrt/feedback_receiver.c prrt/feedback_receiver.h prrt/data_transmitter.c prrt/data_transmitter.h)
add_library(UTIL util/common.c util/common.h)
add_library(UTIL util/common.c util/common.h util/list.c util/list.h)

add_executable(sender sender.c)
add_executable(receiver receiver.c)
+38 −8
Original line number Diff line number Diff line
#include <unistd.h>
#include <netinet/in.h>
#include <string.h>
#include <netdb.h>
#include "data_transmitter.h"
#include "socket.h"

void * send_data_loop(void *ptr) {
    prrt_socket *sock_ptr = ptr;
    while(1) {
        pthread_mutex_lock(&sock_ptr->is_data_available);
        while(sock_ptr->packets_count == 0) {
            pthread_cond_wait(&sock_ptr->is_data_available_cv, &sock_ptr->is_data_available);
        pthread_mutex_lock(&sock_ptr->out_queue_filled_mutex);
        while(List_count(sock_ptr->out_queue) == 0) {
            pthread_cond_wait(&sock_ptr->out_queue_filled_cv, &sock_ptr->out_queue_filled_mutex);
        }
        // TODO: take a packet from the list and send it

        sock_ptr->packets_count--;
        printf("TAKING OUT (NOW: %d)\n", sock_ptr->packets_count);
        prrt_packet* packet = List_shift(sock_ptr->out_queue);

        uint8_t buf[MAX_PAYLOAD_LENGTH];
        uint32_t length = packet_size(packet);
        if(encode_packet(buf, MAX_PAYLOAD_LENGTH, packet) < 0) {
            perror("BUF too small.");
            exit(0);
        }

        // SENDING TO ALL RECEIVERS
        int i;
        for(i = 0; i < sock_ptr->receiver_len; i++) {
            prrt_receiver recv = sock_ptr->receivers[i];

            struct hostent *hp;

            struct sockaddr_in targetaddr;
            memset((char*) &targetaddr, 0, sizeof(targetaddr));
            targetaddr.sin_family = AF_INET;
            targetaddr.sin_port = htons(recv.port);

            hp = gethostbyname(recv.host_name);
            memcpy((void *)&targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);

            if((sendto(sock_ptr->fd_data, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) < 0)) {
                perror("sendto failed");
                exit(1);
                break;
            }
        }
        delete_packet(packet);

        pthread_mutex_unlock(&sock_ptr->is_data_available);
        usleep(1000);
        pthread_mutex_unlock(&sock_ptr->out_queue_filled_mutex);
        usleep(1);
    }
}
+13 −45
Original line number Diff line number Diff line
@@ -47,8 +47,10 @@ int prrt_create_socket(prrt_socket *sock_ptr, uint16_t port, uint8_t is_sender)
    }

    if(is_sender) {
        pthread_mutex_init(&sock_ptr->is_data_available, NULL);
        pthread_cond_init(&sock_ptr->is_data_available_cv, NULL);
        pthread_mutex_init(&sock_ptr->out_queue_filled_mutex, NULL);
        pthread_cond_init(&sock_ptr->out_queue_filled_cv, NULL);

        sock_ptr->out_queue = List_create();

        int rc = pthread_create(&sock_ptr->receive_thread, NULL, receive_feedback_loop, (void *) sock_ptr);
        if(rc) {
@@ -82,51 +84,16 @@ int prrt_connect(prrt_socket *sock_ptr, char *host, uint16_t port) {
}

int prrt_send(prrt_socket *sock_ptr, const void *data, size_t data_len) {
    pthread_mutex_lock(&sock_ptr->out_queue_filled_mutex);

    pthread_mutex_lock(&sock_ptr->is_data_available);

    sock_ptr->packets_count++;
    printf("ADDING (NOW: %d)\n", sock_ptr->packets_count);
    pthread_cond_signal(&sock_ptr->is_data_available_cv);

    // TODO: add a packet to the list

    pthread_mutex_unlock(&sock_ptr->is_data_available);

    int res = 0;
    prrt_packet* packet = malloc(sizeof(prrt_packet));
    create_packet_data(packet, 5, data, data_len);

    uint8_t buf[MAX_PAYLOAD_LENGTH];
    uint32_t length = packet_size(packet);
    if(encode_packet(buf, MAX_PAYLOAD_LENGTH, packet) < 0) {
        perror("BUF too small.");
        return -1;
    }

    // SENDING TO ALL RECEIVERS
    int i;
    for(i = 0; i < sock_ptr->receiver_len; i++) {
        prrt_receiver recv = sock_ptr->receivers[i];

        struct hostent *hp;

        struct sockaddr_in targetaddr;
        memset((char*) &targetaddr, 0, sizeof(targetaddr));
        targetaddr.sin_family = AF_INET;
        targetaddr.sin_port = htons(recv.port);
    List_push(sock_ptr->out_queue, packet);
    pthread_cond_signal(&sock_ptr->out_queue_filled_cv);
    pthread_mutex_unlock(&sock_ptr->out_queue_filled_mutex);

        hp = gethostbyname(recv.host_name);
        memcpy((void *)&targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);

        if((sendto(sock_ptr->fd_data, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) < 0)) {
            perror("sendto failed");
            res = -1;
            break;
        }
    }
    delete_packet(packet);
    return res;
    return 0;
}

prrt_packet *prrt_recv(prrt_socket *sock_ptr) {
@@ -175,8 +142,9 @@ int prrt_close_socket(prrt_socket *sock_ptr) {

    // TODO: clean up all receivers

    pthread_mutex_destroy(&sock_ptr->is_data_available);
    pthread_cond_destroy(&sock_ptr->is_data_available_cv);
    pthread_mutex_destroy(&sock_ptr->out_queue_filled_mutex);
    pthread_cond_destroy(&sock_ptr->out_queue_filled_cv);
    List_destroy(sock_ptr->out_queue);

    close(sock_ptr->fd_data);
    close(sock_ptr->fd_feedback);
+6 −2
Original line number Diff line number Diff line
@@ -6,6 +6,7 @@
#include <stdio.h>
#include "../defines.h"
#include "packet.h"
#include "../util/list.h"
#include <pthread.h>

typedef struct {
@@ -17,9 +18,12 @@ typedef struct {
    int fd_data;
    int fd_feedback;
    pthread_t receive_thread;
    pthread_mutex_t is_data_available;
    pthread_cond_t is_data_available_cv;

    pthread_t send_thread;
    pthread_mutex_t out_queue_filled_mutex;
    pthread_cond_t out_queue_filled_cv;
    List *out_queue;

    prrt_receiver receivers[PRRT_MAX_RECEIVER_COUNT];
    int receiver_len;
    uint16_t packets_count;
+1 −1
Original line number Diff line number Diff line
@@ -33,7 +33,7 @@ int main(int argc, char* const argv[]) {
            buffer[pkt->payload_len - PRRT_PACKET_DATA_HEADER_SIZE] = '\0';
            printf("%s\n", buffer);
        }
        usleep(1000*1000);
        usleep(1);
    }

    prrt_close_socket(&sock);
Loading