#include <stdio.h>
#include <netdb.h>
#include <unistd.h>
#include <string.h>
#include <src/defines.h>
#include <src/prrt/socket.h>
#include <src/prrt/block.h>
#include "data_transmitter.h"


/**
 * Encode a packet once and send the encoded bytes via sendto() to every
 * receiver listed on the socket, then destroy the packet.
 *
 * @param sock_ptr socket providing the target list (receivers[0..receiverLength))
 *                 and the file descriptor (dataSocketFd) used for sending.
 * @param data_pkt packet to transmit; PrrtPacket_destroy() is called on it
 *                 before this function returns, so the caller must not use it
 *                 afterwards.
 * @return always 0. Every failure (encoding, host lookup, sendto) prints a
 *         diagnostic and terminates the process with a non-zero exit status.
 */
int send_packet(PrrtSocket *sock_ptr, PrrtPacket *data_pkt) {
    int err = 0;
    uint8_t buf[MAX_PAYLOAD_LENGTH];
    uint32_t length = PrrtPacket_size(data_pkt);

    // `length` bytes of buf are handed to sendto() below, so it must never
    // exceed the buffer even if the encoder itself reports success.
    if (length > sizeof(buf) || PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, data_pkt) < 0) {
        perror("BUF too small.");
        exit(1);
    }

    // SENDING TO ALL RECEIVERS
    int i;
    for (i = 0; i < sock_ptr->receiverLength; i++) {
        // Named `receiver` rather than `recv` to avoid shadowing recv(2).
        PrrtReceiver receiver = sock_ptr->receivers[i];

        struct sockaddr_in targetaddr;
        memset((char *) &targetaddr, 0, sizeof(targetaddr));
        targetaddr.sin_family = AF_INET;
        targetaddr.sin_port = htons(receiver.port);

        // gethostbyname() returns NULL on lookup failure and reports the
        // reason through h_errno (not errno), hence no perror() here.
        struct hostent *hp = gethostbyname(receiver.host_name);
        if (hp == NULL || hp->h_addrtype != AF_INET || hp->h_addr_list[0] == NULL ||
            (size_t) hp->h_length > sizeof(targetaddr.sin_addr)) {
            fprintf(stderr, "gethostbyname failed for %s\n", receiver.host_name);
            exit(1);
        }
        memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);

        if ((sendto(sock_ptr->dataSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) < 0)) {
            perror("sendto failed");
            exit(1);
        }
    }

    PrrtPacket_destroy(data_pkt);

    return err;
}

void *send_data_loop(void *ptr) {
    PrrtSocket *sock_ptr = ptr;

    PrrtBlock *block = NULL;
    PrrtCodingParams *cpar = malloc(sizeof(PrrtCodingParams));
    PrrtCodingParams_init(cpar);
    PrrtBlock_alloc(&block, cpar);

    while (1) {
55
56
57
        pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
        while (List_count(sock_ptr->outQueue) == 0) {
            pthread_cond_wait(&sock_ptr->outQueueFilledCv, &sock_ptr->outQueueFilledMutex);
58
        }
59

60
        PrrtPacket *packet = List_shift(sock_ptr->outQueue);
61

62
        PrrtBlock_insert_data_packet(block, packet);
63

64
65
        if (PrrtBlock_ready(block)) {
            int j = 0;
66
            PrrtBlock_code(block, &sock_ptr->sequenceNumberRedundancy);
67

68
69
            uint32_t pkt_count = (block)->data_count;
            for (j = 0; j < pkt_count; j++) {
70
71
72
                PrrtPacket *data_pkt = PrrtBlock_get_first_data(block);
                send_packet(sock_ptr, data_pkt);
            }
73
74
75
            uint32_t red_count = (block)->redundancy_count;
            for (j = 0; j < red_count; j++) {
                PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
76
                send_packet(sock_ptr, red_pkt);
77
78
            }
        }
79

80
        pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
81
        usleep(1);
82
83
    }
}