/* data_transmitter.c */
#include <stdio.h>
#include <netdb.h>
3
#include <unistd.h>
4
#include <string.h>
5
6
7
8
9
#include "../../defines.h"
#include "../socket.h"
#include "../block.h"
#include "../../util/dbg.h"
#include "../../util/common.h"
10
11
12
#include "data_transmitter.h"


13
/**
 * Encode a packet once and transmit it to every receiver of the socket.
 *
 * The packet is serialised into a stack buffer of MAX_PAYLOAD_LENGTH bytes.
 * For each entry in sock_ptr->receivers the receiver's host name is resolved
 * and the encoded bytes are sent with sendto() on sock_ptr->dataSocketFd.
 * Processing stops at the first failure; receivers later in the list are
 * not served in that case.
 *
 * Ownership: on success the packet is released with PrrtPacket_destroy().
 * On failure the packet is NOT destroyed and stays with the caller.
 *
 * @param sock_ptr socket providing the receiver list and the data socket fd.
 * @param packet   packet to encode and send.
 * @return true if the packet was sent to all receivers, false if the packet
 *         does not fit the buffer, a host could not be resolved, or
 *         sendto() failed.
 */
bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
    uint8_t buf[MAX_PAYLOAD_LENGTH];
    memset(buf, 0, sizeof(buf));

    uint32_t length = PrrtPacket_size(packet);

    // `length` bytes of `buf` are handed to sendto() below, so it must never
    // exceed the buffer regardless of what the encoder reports.
    check(length <= sizeof(buf), "Buffer too small.");
    check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small.");

    // SENDING TO ALL RECEIVERS
    LIST_FOREACH(sock_ptr->receivers, first, next, cur) {
        // Named `receiver` rather than `recv` to avoid shadowing recv(2).
        PrrtReceiver *receiver = cur->value;

        struct sockaddr_in targetaddr;
        memset((char *) &targetaddr, 0, sizeof(targetaddr));
        targetaddr.sin_family = AF_INET;
        targetaddr.sin_port = htons(receiver->port);

        // NOTE(review): gethostbyname() returns a pointer to static storage
        // and is not thread-safe; this function is called from the sender
        // thread. Consider getaddrinfo() and caching the resolved address
        // per receiver instead of resolving on every packet.
        struct hostent *hp = gethostbyname(receiver->host_name);
        check(hp != NULL, "Could not resolve host '%s'.", receiver->host_name);
        memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);

        ssize_t sendtoRes = sendto(sock_ptr->dataSocketFd, buf, length, 0,
                                   (struct sockaddr *) &targetaddr, sizeof(targetaddr));
        check(sendtoRes >= 0, "Sendto failed.");
    }

    PrrtPacket_destroy(packet);

    return true;

    error:
        PERROR("Something is wrong%s.", "");
        return false;
}

48
void * send_data_loop(void *ptr) {
49
50
51
    PrrtSocket *sock_ptr = ptr;

    PrrtBlock *block = NULL;
Andreas Schmidt's avatar
Andreas Schmidt committed
52
    PrrtCodingParams *cpar = calloc(1, sizeof(PrrtCodingParams));
53
54
55
    PrrtCodingParams_init(cpar);

    while (1) {
56
57
        pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
        while (List_count(sock_ptr->outQueue) == 0) {
58
59
60
            pthread_mutex_lock(&sock_ptr->closingMutex);
            if(sock_ptr->closing) {
                free(cpar);
Andreas Schmidt's avatar
Andreas Schmidt committed
61
62
63
                if(block != NULL) {
                    PrrtBlock_destroy(block);
                }
64
65
66
67
68
                pthread_mutex_unlock(&sock_ptr->closingMutex);
                pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
                return NULL;
            }
            pthread_mutex_unlock(&sock_ptr->closingMutex);
69
            pthread_cond_wait(&sock_ptr->outQueueFilledCv, &sock_ptr->outQueueFilledMutex);
70
        }
71

72
        if(block == NULL) {
73
74
75
            block = calloc(1, sizeof(PrrtBlock));
            check_mem(block);

76
            PrrtBlock_create(block, cpar, sock_ptr->sequenceNumberSource);
77
78
        }

79
        PrrtPacket *packet = List_shift(sock_ptr->outQueue);
80
        packet->seqno = sock_ptr->sequenceNumberSource++;
81

82
        PrrtBlock_insert_data_packet(block, packet);
83

84
        if (PrrtBlock_encode_ready(block)) {
Andreas Schmidt's avatar
Andreas Schmidt committed
85
            uint32_t j = 0;
86
            PrrtBlock_encode(block, &sock_ptr->sequenceNumberRedundancy);
87

88
            uint32_t redundancyBlocks = List_count(block->redundancyPackets);
89
            for (j = 0; j < redundancyBlocks; j++) {
90
                PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
91
                send_packet(sock_ptr, red_pkt);
92
            }
93

94
            uint32_t dataBlocks = List_count(block->dataPackets);
95
            for (j = 0; j < dataBlocks; j++) {
96
97
98
                PrrtPacket *data_pkt = PrrtBlock_get_first_data(block);
                send_packet(sock_ptr, data_pkt);
            }
99

Andreas Schmidt's avatar
Andreas Schmidt committed
100
            PrrtBlock_destroy(block);
101
            block = NULL;
102
        }
103

104
        pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
105
        usleep(1);
106
    }
107
108

    error:
Andreas Schmidt's avatar
Andreas Schmidt committed
109
        PERROR("Failed to send packet %s", "");
110
        return NULL;
111
}