dataTransmitter.c 4.19 KB
Newer Older
Andreas Schmidt's avatar
Andreas Schmidt committed
1
2
#include <stdio.h>
#include <netdb.h>
3
#include <unistd.h>
4
#include <string.h>
5
6
#include "../../defines.h"
#include "../block.h"
7
#include "../clock.h"
8
#include "../receiver.h"
9
#include "../socket.h"
10
11
#include "../../util/dbg.h"
#include "../../util/common.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
12
#include "dataTransmitter.h"
13
14


15
bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
16
    uint8_t buf[MAX_PAYLOAD_LENGTH];
17
    memset(buf, 0, sizeof(buf));
18
19
    prrtPacketLength_t length = PrrtPacket_size(packet);
    prrtPacketType_t type = PrrtPacket_type(packet);
20

21
    check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small.");
22
23

    // SENDING TO ALL RECEIVERS
24
25
    LIST_FOREACH(sock_ptr->receivers, first, next, cur) {
        PrrtReceiver* recv = cur->value;
Andreas Schmidt's avatar
Andreas Schmidt committed
26

27
28
29
30
31
        struct hostent *hp;

        struct sockaddr_in targetaddr;
        memset((char *) &targetaddr, 0, sizeof(targetaddr));
        targetaddr.sin_family = AF_INET;
32
        targetaddr.sin_port = htons(recv->port);
33

34
        hp = gethostbyname(recv->host_name);
Andreas Schmidt's avatar
Andreas Schmidt committed
35
        check(hp != NULL, "Could not resolve host '%s'.", recv->host_name)
36
37
        memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);

38
39
        if(type == PACKET_TYPE_DATA) { ((PrrtPacketDataPayload*) buf + PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH)->timestamp = htonl(PrrtClock_get_current_time()); }
        // TODO: [LATENCY] By knowing the time for htonl and writing bytes, one could compensate the timestamp.
40
        check(sendto(sock_ptr->dataSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) == length, "Sendto failed.");
41
        usleep(1);
42
    }
43

44
45
    PrrtPacket_destroy(packet);

46
    return true;
47

48
    error:
49
        PERROR("Sending packet failed.%s", "")
50
        return false;
51
52
}

53
void * send_data_loop(void *ptr) {
54
55
    PrrtSocket *sock_ptr = ptr;
    PrrtBlock *block = NULL;
56
    PrrtCodingParams *cpar = PrrtCodingParams_create();
57
58

    while (1) {
59
        check(pthread_mutex_lock(&sock_ptr->outQueueFilledMutex) == 0, "Lock failed.");
60
        while (List_count(sock_ptr->outQueue) == 0) {
61
            check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
62
            if(sock_ptr->closing) {
63
                PrrtCodingParams_destroy(cpar);
Andreas Schmidt's avatar
Andreas Schmidt committed
64
65
66
                if(block != NULL) {
                    PrrtBlock_destroy(block);
                }
67
68
                check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
                check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed.");
69
70
                return NULL;
            }
71
72
            check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
            check(pthread_cond_wait(&sock_ptr->outQueueFilledCv, &sock_ptr->outQueueFilledMutex) == 0, "Cond wait failed.");
73
        }
74

75
        if(block == NULL) {
76
            block = PrrtBlock_create(cpar, sock_ptr->sequenceNumberSource);
77
78
        }

79
        PrrtPacket *packet = List_shift(sock_ptr->outQueue);
80
81
        packet->sequenceNumber = sock_ptr->sequenceNumberSource++;
        packet->index = (uint8_t) (packet->sequenceNumber - block->baseSequenceNumber);
82

Andreas Schmidt's avatar
Andreas Schmidt committed
83
        PrrtPacketDataPayload* payload = packet->payload;
84
85
        payload->groupRTT_us = PrrtChannelStateInformation_get_rtt(sock_ptr->csi);
        payload->packetTimeout_us = PrrtNetworkConstraints_get_target_delay(sock_ptr->applicationConstraints);
Andreas Schmidt's avatar
Andreas Schmidt committed
86

87
88
        PrrtPacket* packetToSend = PrrtPacket_copy(packet);
        send_packet(sock_ptr, packetToSend);
89

90
        PrrtBlock_insert_data_packet(block, packet);
91

92
93
        // TODO: redundancy should only be sent when necessary
        if (PrrtBlock_encode_ready(block) && false) {
Andreas Schmidt's avatar
Andreas Schmidt committed
94
            uint32_t j = 0;
95
            PrrtBlock_encode(block, &sock_ptr->sequenceNumberRedundancy);
96

97
            uint32_t redundancyBlocks = List_count(block->redundancyPackets);
98
            for (j = 0; j < redundancyBlocks; j++) {
99
                PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
100
                send_packet(sock_ptr, red_pkt);
101
            }
102

Andreas Schmidt's avatar
Andreas Schmidt committed
103
            PrrtBlock_destroy(block);
104
            block = NULL;
105
        }
106

107
        check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed.");
108
    }
109
110

    error:
Andreas Schmidt's avatar
Andreas Schmidt committed
111
        PERROR("Failed to send packet %s", "");
112
        return NULL;
113
}