dataTransmitter.c 3.77 KB
Newer Older
Andreas Schmidt's avatar
Andreas Schmidt committed
1
2
#include <stdio.h>
#include <netdb.h>
3
#include <unistd.h>
4
#include <string.h>
5
6
#include "../../defines.h"
#include "../block.h"
7
#include "../receiver.h"
8
#include "../socket.h"
9
10
#include "../../util/dbg.h"
#include "../../util/common.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
11
#include "dataTransmitter.h"
12
13


14
15
bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet)
{
16
    uint8_t buf[MAX_PAYLOAD_LENGTH];
17
    memset(buf, 0, sizeof(buf));
18
    prrtPacketLength_t length = PrrtPacket_size(packet);
19

20
    check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small.");
21
22

    // SENDING TO ALL RECEIVERS
23
    LIST_FOREACH(sock_ptr->receivers, first, next, cur) {
24
        PrrtReceiver *recv = cur->value;
Andreas Schmidt's avatar
Andreas Schmidt committed
25

26
27
28
29
30
        struct hostent *hp;

        struct sockaddr_in targetaddr;
        memset((char *) &targetaddr, 0, sizeof(targetaddr));
        targetaddr.sin_family = AF_INET;
31
        targetaddr.sin_port = htons(recv->port);
32

33
        hp = gethostbyname(recv->host_name);
Andreas Schmidt's avatar
Andreas Schmidt committed
34
        check(hp != NULL, "Could not resolve host '%s'.", recv->host_name)
35
36
        memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);

37
        // TODO: [LATENCY] By knowing the time encoding etc. that happens upfront, one could make an adjustment here.
38
39
        check(sendto(sock_ptr->dataSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
              length, "Sendto failed.");
40
        usleep(1);
41
    }
42

43
44
    PrrtPacket_destroy(packet);

45
    return true;
46

47
    error:
48
49
    PERROR("Sending packet failed.%s", "")
    return false;
50
51
}

52
53
void *send_data_loop(void *ptr)
{
54
55
56
    PrrtSocket *sock_ptr = ptr;
    PrrtBlock *block = NULL;

57
    while(1) {
58
        check(pthread_mutex_lock(&sock_ptr->outQueueFilledMutex) == 0, "Lock failed.");
59
        while(List_count(sock_ptr->outQueue) == 0) {
60
            check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
61
            if(sock_ptr->closing) {
Andreas Schmidt's avatar
Andreas Schmidt committed
62
63
64
                if(block != NULL) {
                    PrrtBlock_destroy(block);
                }
65
66
                check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
                check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed.");
67
68
                return NULL;
            }
69
            check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
70
71
            check(pthread_cond_wait(&sock_ptr->outQueueFilledCv, &sock_ptr->outQueueFilledMutex) == 0,
                  "Cond wait failed.");
72
        }
73

74
        if(block == NULL) {
75
            block = PrrtBlock_create(sock_ptr->codingParams, sock_ptr->sequenceNumberSource);
76
77
        }

78
        PrrtPacket *packet = List_shift(sock_ptr->outQueue);
79
80
        packet->sequenceNumber = sock_ptr->sequenceNumberSource++;
        packet->index = (uint8_t) (packet->sequenceNumber - block->baseSequenceNumber);
81

82
        PrrtPacketDataPayload *payload = packet->payload;
83
        payload->groupRTT_us = PrrtChannelStateInformation_get_rtt(sock_ptr->csi);
Andreas Schmidt's avatar
Andreas Schmidt committed
84

85
        PrrtPacket *packetToSend = PrrtPacket_copy(packet);
86
        send_packet(sock_ptr, packetToSend);
87

88
        PrrtBlock_insert_data_packet(block, packet);
89

90
        // TODO: redundancy should only be sent when necessary
91
        if(PrrtBlock_encode_ready(block)) {
Andreas Schmidt's avatar
Andreas Schmidt committed
92
            uint32_t j = 0;
93
            PrrtBlock_encode(block, &sock_ptr->sequenceNumberRedundancy);
94

95
            uint32_t redundancyBlocks = List_count(block->redundancyPackets);
96
            for(j = 0; j < redundancyBlocks; j++) {
97
                PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
98
                send_packet(sock_ptr, red_pkt);
99
            }
100

Andreas Schmidt's avatar
Andreas Schmidt committed
101
            PrrtBlock_destroy(block);
102
            block = NULL;
103
        }
104

105
        check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed.");
106
    }
107
108

    error:
109
110
    PERROR("Failed to send packet %s", "");
    return NULL;
111
}