dataTransmitter.c 6.7 KB
Newer Older
Andreas Schmidt's avatar
Andreas Schmidt committed
1
2
#include <stdio.h>
#include <netdb.h>
3
#include <string.h>
4
5
#include "../../defines.h"
#include "../block.h"
6
#include "../receiver.h"
7
#include "../socket.h"
8
9
#include "../../util/dbg.h"
#include "../../util/common.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
10
#include "dataTransmitter.h"
11

12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
 * Send one encoded packet to a receiver and report a transmit timestamp.
 *
 * If the socket has hardware timestamping enabled, the datagram is sent with
 * sendmsg() and the socket's error queue is then polled (non-blocking, bounded
 * number of attempts) for a SO_TIMESTAMPING control message. Otherwise the
 * datagram is sent with sendto() and CLOCK_REALTIME is sampled right after.
 *
 * @param sock_ptr          socket; dataSocketFd is used for sending/polling
 * @param recv              receiver; recv->ai provides the destination address
 * @param buf               encoded packet bytes (only read by this function)
 * @param length            number of bytes of buf to send
 * @param packet_stamp_ptr  out: transmit timestamp. Zeroed on entry; stays
 *                          0.0 if sending fails or no SO_TIMESTAMPING message
 *                          is found on the error queue.
 * @return true on success, false if sending or fetching the stamp failed.
 */
bool send_to_socket(PrrtSocket* sock_ptr, PrrtReceiver *recv, uint8_t* buf, prrtPacketLength_t length,  struct timespec *packet_stamp_ptr) {
    // Give the caller a defined value on every path, including failures.
    memset(packet_stamp_ptr, 0, sizeof(struct timespec));

    if(sock_ptr->isHardwareTimestamping) {
        const int max_polls = 9999;
        struct msghdr msg;
        struct iovec iov;
        // Union guarantees the alignment required for struct cmsghdr access.
        union {
            char buf[1024];
            struct cmsghdr align;
        } control;
        // The error queue hands back a copy of the sent datagram; receive it
        // into scratch space so neither the caller's buffer (whose capacity is
        // unknown here) nor the receiver's address gets overwritten.
        uint8_t loopback[MAX_PAYLOAD_LENGTH];
        ssize_t got;
        int polls = 0;

        memset(&msg, 0, sizeof(msg));
        iov.iov_base = buf;
        iov.iov_len = length;
        msg.msg_iov = &iov;
        msg.msg_iovlen = 1;
        msg.msg_name = recv->ai->ai_addr;
        msg.msg_namelen = recv->ai->ai_addrlen;

        check(sendmsg(sock_ptr->dataSocketFd, &msg, 0) == (ssize_t) length, "Sendmsg failed.");

        iov.iov_base = loopback;
        iov.iov_len = sizeof(loopback);
        msg.msg_name = NULL;
        msg.msg_namelen = 0;
        msg.msg_control = control.buf;
        do {
            // recvmsg() overwrites msg_controllen, so reset it on each attempt.
            msg.msg_controllen = sizeof(control.buf);
            got = recvmsg(sock_ptr->dataSocketFd, &msg, MSG_ERRQUEUE);
        } while(got < 0 && errno == EAGAIN && polls++ < max_polls);
        // Any remaining failure (retries exhausted or a different errno) means
        // the control data is not valid and must not be parsed.
        check(got >= 0, "Failed to get stamp. Gave up.");

        struct cmsghdr* cmsg;
        for(cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
            if (cmsg->cmsg_level != SOL_SOCKET) {
                continue;
            }
            switch(cmsg->cmsg_type) {
                case SO_TIMESTAMPING: {
                    // The payload is an array of three timespecs.
                    // Note: The raw stamp [2] is used, because the others are 0.
                    if (cmsg->cmsg_len < CMSG_LEN(3 * sizeof(struct timespec))) {
                        break;
                    }
                    memcpy(packet_stamp_ptr, CMSG_DATA(cmsg) + 2 * sizeof(struct timespec), sizeof(struct timespec));
                    debug(DEBUG_HARDSTAMPING, "Hardware TS:\t%ld.%09ld", (long) packet_stamp_ptr->tv_sec, (long) packet_stamp_ptr->tv_nsec);
                    break;
                }
                default:
                    debug(DEBUG_DATARECEIVER, "Msgtype: %d", cmsg->cmsg_type);
                    break;
            }
        }
    } else {
        // TODO: [LATENCY] By knowing the time encoding etc. that happens upfront, one could make an adjustment here.
        check(sendto(sock_ptr->dataSocketFd, buf, length, 0, recv->ai->ai_addr, recv->ai->ai_addrlen) ==
              length, "Sendto failed.");
        clock_gettime(CLOCK_REALTIME, packet_stamp_ptr);
        debug(DEBUG_HARDSTAMPING, "Software TS:\t%ld.%09ld", (long) packet_stamp_ptr->tv_sec, (long) packet_stamp_ptr->tv_nsec);
    }
    return true;

    error:
    PERROR("Sending packet failed.%s", "")
    return false;

}
72

Andreas Schmidt's avatar
Andreas Schmidt committed
73
/**
 * Encode a packet, transmit it to the socket's receiver and release it.
 *
 * Xlap stamps are recorded around the transmission for data and redundancy
 * packets; other packet types are sent without Link* stamps.
 *
 * Ownership: the packet is destroyed before returning, on success and on
 * failure alike, so the caller must not touch it afterwards.
 *
 * @param sock_ptr  socket to send on (its receiver is the destination)
 * @param packet    packet to encode and send; consumed by this call
 * @return true if the packet was encoded and sent, false otherwise.
 */
static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
    uint8_t buf[MAX_PAYLOAD_LENGTH];
    // Initialised so it is never read indeterminate if sending bails out.
    struct timespec timestamp = {0, 0};
    memset(buf, 0, sizeof(buf));
    prrtPacketLength_t length = PrrtPacket_size(packet);

    check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small.");

    switch (PrrtPacket_type(packet)) {
        case PACKET_TYPE_DATA:
            XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitStart);
            break;
        case PACKET_TYPE_REDUNDANCY:
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitStart);
            break;
        case PACKET_TYPE_REPETITION:
        case PACKET_TYPE_FEEDBACK:
        case PACKET_TYPE_PRESENT_REDUNDANCY:
        case PACKET_TYPE_CHANNEL_FEEDBACK:
        default:;
    }

    // A failed send yields no meaningful timestamp, so skip the stamps below.
    check(send_to_socket(sock_ptr, sock_ptr->receiver, buf, length, &timestamp), "Sending to socket failed.");
    // NOTE(review): ChannelTransmit is recorded under ts_data_packet for every
    // packet type, including redundancy packets - confirm this is intended.
    XlapTimeStampValue(sock_ptr, ts_data_packet, packet->sequenceNumber, ChannelTransmit, timestamp);

    switch (PrrtPacket_type(packet)) {
        case PACKET_TYPE_DATA:
            XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd);
            XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd);
            break;
        case PACKET_TYPE_REDUNDANCY:
            XlapTimeStampClock(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitEnd);
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitEnd);
            break;
        case PACKET_TYPE_REPETITION:
        case PACKET_TYPE_FEEDBACK:
        case PACKET_TYPE_PRESENT_REDUNDANCY:
        case PACKET_TYPE_CHANNEL_FEEDBACK:
        default:;
    }

    PrrtPacket_destroy(packet);

    return true;

    error:
    // Report first, then release the packet; callers hand over ownership and
    // ignore the result, so not destroying it here would leak it.
    PERROR("Sending packet failed.%s", "")
    PrrtPacket_destroy(packet);
    return false;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
123
void *send_data_loop(void *ptr) {
124
125
126
    PrrtSocket *sock_ptr = ptr;
    PrrtBlock *block = NULL;

Andreas Schmidt's avatar
Andreas Schmidt committed
127
128
129
    while (1) {
        ListNode *job;
        do {
Andreas Schmidt's avatar
Andreas Schmidt committed
130
            job = MPSCQueue_pop(sock_ptr->sendDataQueue);
Andreas Schmidt's avatar
Andreas Schmidt committed
131
            if (PrrtSocket_closing(sock_ptr)) {
Andreas Schmidt's avatar
Andreas Schmidt committed
132
133
134
135
136
                if (block != NULL) {
                    PrrtBlock_destroy(block);
                }
                return NULL;
            }
Andreas Schmidt's avatar
Andreas Schmidt committed
137
        } while (!job);
Stefan Reif's avatar
Stefan Reif committed
138
        PrrtPacket *packet = PrrtPacket_byListNode(job);
Andreas Schmidt's avatar
Andreas Schmidt committed
139
        XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
Andreas Schmidt's avatar
Andreas Schmidt committed
140
141
        XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
        if (block == NULL) {
142
            block = PrrtBlock_create(sock_ptr->codingParameters, packet->sequenceNumber);
143
144
        }

145
        packet->index = (uint8_t) (packet->sequenceNumber - block->baseSequenceNumber);
146

147
        PrrtPacketDataPayload *payload = packet->payload;
148
        payload->groupRTprop_us = PrrtSocket_get_rtprop(sock_ptr);
Andreas Schmidt's avatar
Andreas Schmidt committed
149

150
        PrrtPacket *packetToSend = PrrtPacket_copy(packet);
151
        send_packet(sock_ptr, packetToSend);
Andreas Schmidt's avatar
Andreas Schmidt committed
152
        XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
Andreas Schmidt's avatar
Andreas Schmidt committed
153
        XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
154

155
        PrrtBlock_insert_data_packet(block, packet);
156

157
        // TODO: redundancy should only be sent when necessary
Andreas Schmidt's avatar
Andreas Schmidt committed
158
        if (PrrtBlock_encode_ready(block)) {
Andreas Schmidt's avatar
Andreas Schmidt committed
159
            uint32_t j = 0;
Andreas Schmidt's avatar
Andreas Schmidt committed
160
161
            unsigned int redundancy_seqno = sock_ptr->sequenceNumberRedundancy;
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeStart);
162
            PrrtBlock_encode(block, &sock_ptr->sequenceNumberRedundancy);
Andreas Schmidt's avatar
Andreas Schmidt committed
163
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeEnd);
164

Andreas Schmidt's avatar
Andreas Schmidt committed
165
            uint32_t redundancyPackets = List_count(block->redundancyPackets);
Andreas Schmidt's avatar
Andreas Schmidt committed
166
            for (j = 0; j < redundancyPackets; j++) {
167
                PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
168
                send_packet(sock_ptr, red_pkt);
169
            }
170

Andreas Schmidt's avatar
Andreas Schmidt committed
171
            PrrtBlock_destroy(block);
172
            block = NULL;
173
        }
174
        PrrtSocket_cleanup(sock_ptr);
175
176
    }
}