dataTransmitter.c 4.48 KB
Newer Older
Andreas Schmidt's avatar
Andreas Schmidt committed
1
2
#include <stdio.h>
#include <netdb.h>
3
#include <unistd.h>
4
#include <string.h>
5
6
#include "../../defines.h"
#include "../block.h"
7
#include "../receiver.h"
8
#include "../socket.h"
9
10
#include "../../util/dbg.h"
#include "../../util/common.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
11
#include "dataTransmitter.h"
12
13


Andreas Schmidt's avatar
Andreas Schmidt committed
14
/**
 * Encode a packet and transmit it to every receiver registered on the socket.
 *
 * Ownership: this function consumes `packet`. It is destroyed before the
 * function returns, on the success path and on the failure path alike, so the
 * caller must not touch (or free) the packet afterwards.
 *
 * @param sock_ptr socket supplying the data socket descriptor and the list of
 *                 receivers
 * @param packet   packet to encode and send; consumed by this call
 * @return true if the packet was encoded and sendto() reported the full
 *         length for every receiver; false as soon as encoding or one
 *         sendto() fails (remaining receivers are then skipped)
 */
static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
    uint8_t buf[MAX_PAYLOAD_LENGTH];
    memset(buf, 0, sizeof(buf));
    prrtPacketLength_t length = PrrtPacket_size(packet);

    // check() is expected to log and jump to the `error` label on failure.
    check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small.");

    // Only data and redundancy packets are timestamped; all other packet
    // types deliberately fall through without recording anything.
    switch (PrrtPacket_type(packet)) {
        case PACKET_TYPE_DATA:
            XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitStart);
            break;
        case PACKET_TYPE_REDUNDANCY:
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitStart);
            break;
        case PACKET_TYPE_REPETITION:
        case PACKET_TYPE_FEEDBACK:
        case PACKET_TYPE_PRESENT_REDUNDANCY:
        case PACKET_TYPE_CHANNEL_FEEDBACK:
        default:;
    }

    // SENDING TO ALL RECEIVERS
    LIST_FOREACH(sock_ptr->receivers, first, next, cur) {
        PrrtReceiver *recv = cur->value;

        // TODO: [LATENCY] By knowing the time encoding etc. that happens upfront, one could make an adjustment here.
        // A short write counts as a failure: the datagram must go out in full.
        check(sendto(sock_ptr->dataSocketFd, buf, length, 0, recv->ai->ai_addr, recv->ai->ai_addrlen) ==
              length, "Sendto failed.");
        usleep(1);
    }

    switch (PrrtPacket_type(packet)) {
        case PACKET_TYPE_DATA:
            XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd);
            XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd);
            break;
        case PACKET_TYPE_REDUNDANCY:
            XlapTimeStampClock(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitEnd);
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitEnd);
            break;
        case PACKET_TYPE_REPETITION:
        case PACKET_TYPE_FEEDBACK:
        case PACKET_TYPE_PRESENT_REDUNDANCY:
        case PACKET_TYPE_CHANNEL_FEEDBACK:
        default:;
    }

    PrrtPacket_destroy(packet);
    return true;

    error:
    PERROR("Sending packet failed.%s", "")
    // The callers hand the packet over and never free it themselves, so it
    // has to be released here as well; otherwise every failed send leaks it.
    PrrtPacket_destroy(packet);
    return false;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
68
void *send_data_loop(void *ptr) {
69
70
71
    PrrtSocket *sock_ptr = ptr;
    PrrtBlock *block = NULL;

Andreas Schmidt's avatar
Andreas Schmidt committed
72
73
74
    while (1) {
        ListNode *job;
        do {
Andreas Schmidt's avatar
Andreas Schmidt committed
75
            job = Pipe_pull(sock_ptr->sendDataQueue);
Andreas Schmidt's avatar
Andreas Schmidt committed
76
            if (PrrtSocket_closing(sock_ptr)) {
Andreas Schmidt's avatar
Andreas Schmidt committed
77
78
79
80
81
                if (block != NULL) {
                    PrrtBlock_destroy(block);
                }
                return NULL;
            }
Andreas Schmidt's avatar
Andreas Schmidt committed
82
        } while (!job);
Stefan Reif's avatar
Stefan Reif committed
83
        PrrtPacket *packet = PrrtPacket_byListNode(job);
Andreas Schmidt's avatar
Andreas Schmidt committed
84
        XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
Andreas Schmidt's avatar
Andreas Schmidt committed
85
86
        XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
        if (block == NULL) {
87
            block = PrrtBlock_create(sock_ptr->codingParameters, packet->sequenceNumber);
88
89
        }

90
        packet->index = (uint8_t) (packet->sequenceNumber - block->baseSequenceNumber);
91

92
        PrrtPacketDataPayload *payload = packet->payload;
93
        payload->groupRTT_us = PrrtChannelStateInformation_get_rtt(sock_ptr->csi);
Andreas Schmidt's avatar
Andreas Schmidt committed
94

95
        PrrtPacket *packetToSend = PrrtPacket_copy(packet);
96
        send_packet(sock_ptr, packetToSend);
Andreas Schmidt's avatar
Andreas Schmidt committed
97
        XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
Andreas Schmidt's avatar
Andreas Schmidt committed
98
        XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
99

100
        PrrtBlock_insert_data_packet(block, packet);
101

102
        // TODO: redundancy should only be sent when necessary
Andreas Schmidt's avatar
Andreas Schmidt committed
103
        if (PrrtBlock_encode_ready(block)) {
Andreas Schmidt's avatar
Andreas Schmidt committed
104
            uint32_t j = 0;
Andreas Schmidt's avatar
Andreas Schmidt committed
105
106
            unsigned int redundancy_seqno = sock_ptr->sequenceNumberRedundancy;
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeStart);
107
            PrrtBlock_encode(block, &sock_ptr->sequenceNumberRedundancy);
Andreas Schmidt's avatar
Andreas Schmidt committed
108
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeEnd);
109

Andreas Schmidt's avatar
Andreas Schmidt committed
110
            uint32_t redundancyPackets = List_count(block->redundancyPackets);
Andreas Schmidt's avatar
Andreas Schmidt committed
111
            for (j = 0; j < redundancyPackets; j++) {
112
                PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
113
                send_packet(sock_ptr, red_pkt);
114
            }
115

Andreas Schmidt's avatar
Andreas Schmidt committed
116
            PrrtBlock_destroy(block);
117
            block = NULL;
118
        }
119
120
    }
}