dataTransmitter.c 7.06 KB
Newer Older
Andreas Schmidt's avatar
Andreas Schmidt committed
1
2
#include <stdio.h>
#include <netdb.h>
3
#include <string.h>
4
5
#include "../../defines.h"
#include "../block.h"
6
#include "../receiver.h"
7
#include "../socket.h"
8
9
#include "../../util/dbg.h"
#include "../../util/common.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
10
#include "dataTransmitter.h"
11

12
/**
 * Send one already-encoded packet to a receiver and report when it left.
 *
 * With hardware timestamping enabled on the socket, the packet is sent via
 * sendmsg() and the transmit timestamp is then read back from the socket's
 * error queue. Otherwise the packet is sent via sendto() and the timestamp is
 * taken from CLOCK_REALTIME right after the call.
 *
 * @param sock_ptr          Socket whose dataSocketFd is used for sending.
 * @param recv              Receiver; recv->ai provides the destination address.
 * @param buf               Encoded packet. In the hardware path it is also used
 *                          as receive buffer for the error-queue message with a
 *                          capacity of MAX_PAYLOAD_LENGTH bytes, so the caller
 *                          must provide at least that much space.
 * @param length            Number of bytes of buf to send.
 * @param packet_timestamp  Out: wall-clock (or hardware) transmit timestamp.
 * @param packet_clockstamp Out: TSC value taken directly after the send call.
 * @return true on success; false if sending failed or (hardware path) no
 *         timestamp could be obtained. The out parameters must not be relied
 *         upon when false is returned.
 */
bool send_to_socket(PrrtSocket* sock_ptr, PrrtReceiver *recv, uint8_t* buf, prrtPacketLength_t length,  struct timespec *packet_timestamp, uint64_t* packet_clockstamp) {
    if(sock_ptr->isHardwareTimestamping) {
        struct msghdr msg;
        struct iovec iov;
        char control[1024];
        ssize_t sent;
        ssize_t got;
        int attempts = 0;
        const int max_attempts = 9999;
        bool stamp_found = false;

        // Zero everything first so no field (e.g. msg_flags) is left indeterminate.
        memset(&msg, 0, sizeof(msg));
        iov.iov_base = buf;
        iov.iov_len = length;
        msg.msg_iov = &iov;
        msg.msg_iovlen = 1;
        msg.msg_name = recv->ai->ai_addr;
        msg.msg_namelen = recv->ai->ai_addrlen;
        msg.msg_control = control;
        msg.msg_controllen = 0;

        // Take the cycle stamp directly after the send call, before any checking.
        sent = sendmsg(sock_ptr->dataSocketFd, &msg, 0);
        *packet_clockstamp = __builtin_ia32_rdtsc();
        check(sent >= 0 && (size_t) sent == length, "Sendmsg failed.");

        // The address is not needed when reading the error queue; do not let the
        // kernel write into the receiver's own address storage.
        msg.msg_name = NULL;
        msg.msg_namelen = 0;
        msg.msg_control = control;
        iov.iov_len = MAX_PAYLOAD_LENGTH;
        do {
            // recvmsg() updates msg_controllen, so it is reset on every attempt.
            msg.msg_controllen = sizeof(control);
            got = recvmsg(sock_ptr->dataSocketFd, &msg, MSG_ERRQUEUE);
        } while(got < 0  && errno == EAGAIN && attempts++ < max_attempts);
        // Any failure (retries exhausted or a different errno) means the control
        // data below would not be valid.
        check(got >= 0, "Failed to get stamp. Gave up.");

        struct cmsghdr* cmsg;
        for(cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
            if (cmsg->cmsg_level != SOL_SOCKET) {
                continue;
            }
            switch(cmsg->cmsg_type) {
                case SO_TIMESTAMPING: {
                    // Note: The raw stamp [2] is used, because the others are 0.
                    // The payload is an array of struct timespec; copy entry [2]
                    // by byte offset instead of dereferencing a cast pointer.
                    memcpy(packet_timestamp, CMSG_DATA(cmsg) + 2 * sizeof(struct timespec), sizeof(struct timespec));
                    stamp_found = true;
                    debug(DEBUG_HARDSTAMPING, "Hardware TS:\t%ld.%09ld", (long) packet_timestamp->tv_sec, (long) packet_timestamp->tv_nsec);
                    break;
                }
                default:
                    debug(DEBUG_DATARECEIVER, "Msgtype: %d", cmsg->cmsg_type);
                    break;
            }
        }
        // Without a timestamping control message *packet_timestamp was never written.
        check(stamp_found, "No hardware timestamp found in error queue message.");
    } else {
        // TODO: [LATENCY] By knowing the time encoding etc. that happens upfront, one could make an adjustment here.
        check(sendto(sock_ptr->dataSocketFd, buf, length, 0, recv->ai->ai_addr, recv->ai->ai_addrlen) ==
              length, "Sendto failed.");
        clock_gettime(CLOCK_REALTIME, packet_timestamp);
        *packet_clockstamp = __builtin_ia32_rdtsc();
        debug(DEBUG_HARDSTAMPING, "Software TS:\t%ld.%09ld", (long) packet_timestamp->tv_sec, (long) packet_timestamp->tv_nsec);
    }
    return true;

    error:
    PERROR("Sending packet failed.%s", "")
    return false;

}
74

Andreas Schmidt's avatar
Andreas Schmidt committed
75
/**
 * Encode a packet, transmit it to the socket's receiver and record its
 * transmit stamps.
 *
 * Steps, in order: encode into a local buffer, stamp LinkTransmitStart for
 * data/redundancy packets, send via send_to_socket(), store the returned
 * time/cycle stamps as ChannelTransmit, register the packet as outstanding
 * with the receiver, and finally stamp LinkTransmitEnd for data/redundancy
 * packets.
 *
 * @param sock_ptr Socket to send on; sock_ptr->receiver is the destination.
 * @param packet   Packet to encode and send.
 * @return true if the packet was encoded and sent; false if encoding or
 *         sending failed. On failure the packet is not registered as
 *         outstanding, because no valid transmit timestamp exists for it.
 */
static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
    uint8_t buf[MAX_PAYLOAD_LENGTH];
    // Defined values even if send_to_socket() fails before writing them.
    struct timespec timestamp = {0};
    uint64_t cyclestamp = 0;

    memset(buf, 0, sizeof(buf));
    prrtPacketLength_t length = PrrtPacket_size(packet);

    check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small.");

    switch (PrrtPacket_type(packet)) {
        case PACKET_TYPE_DATA:
            XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitStart);
            break;
        case PACKET_TYPE_REDUNDANCY:
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitStart);
            break;
        case PACKET_TYPE_REPETITION:
        case PACKET_TYPE_FEEDBACK:
        case PACKET_TYPE_PRESENT_REDUNDANCY:
        case PACKET_TYPE_CHANNEL_FEEDBACK:
        default:;
    }

    // A failed send leaves no meaningful stamps, so stop here instead of
    // recording them and registering the packet as outstanding.
    check(send_to_socket(sock_ptr, sock_ptr->receiver, buf, length, &timestamp, &cyclestamp),
          "Could not hand packet to socket.");
    XlapTimeStampValue(sock_ptr, ts_data_packet, packet->sequenceNumber, ChannelTransmit, timestamp);
    XlapCycleStampValue(sock_ptr, ts_data_packet, packet->sequenceNumber, ChannelTransmit, cyclestamp);

    PrrtReceiver_add_outstanding_packet_state(sock_ptr->receiver, packet, PrrtClock_TimespecToPrrtTimestamp(timestamp));

    switch (PrrtPacket_type(packet)) {
        case PACKET_TYPE_DATA:
            XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd);
            XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd);
            break;
        case PACKET_TYPE_REDUNDANCY:
            XlapTimeStampClock(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitEnd);
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitEnd);
            break;
        case PACKET_TYPE_REPETITION:
        case PACKET_TYPE_FEEDBACK:
        case PACKET_TYPE_PRESENT_REDUNDANCY:
        case PACKET_TYPE_CHANNEL_FEEDBACK:
        default:;
    }

    return true;

    error:
    PERROR("Sending packet failed.%s", "")
    return false;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
127
void *send_data_loop(void *ptr) {
128
129
130
    PrrtSocket *sock_ptr = ptr;
    PrrtBlock *block = NULL;

Andreas Schmidt's avatar
Andreas Schmidt committed
131
132
133
    while (1) {
        ListNode *job;
        do {
Andreas Schmidt's avatar
Andreas Schmidt committed
134
            job = MPSCQueue_pop(sock_ptr->sendDataQueue);
Andreas Schmidt's avatar
Andreas Schmidt committed
135
            if (PrrtSocket_closing(sock_ptr)) {
Andreas Schmidt's avatar
Andreas Schmidt committed
136
137
138
139
140
                if (block != NULL) {
                    PrrtBlock_destroy(block);
                }
                return NULL;
            }
Andreas Schmidt's avatar
Andreas Schmidt committed
141
        } while (!job);
Stefan Reif's avatar
Stefan Reif committed
142
        PrrtPacket *packet = PrrtPacket_byListNode(job);
Andreas Schmidt's avatar
Andreas Schmidt committed
143
        XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
Andreas Schmidt's avatar
Andreas Schmidt committed
144
145
        XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
        if (block == NULL) {
146
            block = PrrtBlock_create(sock_ptr->codingParameters, packet->sequenceNumber);
147
148
        }

149
        packet->index = (uint8_t) (packet->sequenceNumber - block->baseSequenceNumber);
150

151
        PrrtPacketDataPayload *payload = packet->payload;
152
        payload->groupRTprop_us = PrrtSocket_get_rtprop(sock_ptr);
Andreas Schmidt's avatar
Andreas Schmidt committed
153

154
        PrrtPacket *packetToSend = PrrtPacket_copy(packet);
155
        send_packet(sock_ptr, packetToSend);
Andreas Schmidt's avatar
Andreas Schmidt committed
156
        XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
Andreas Schmidt's avatar
Andreas Schmidt committed
157
        XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
158

159
        PrrtBlock_insert_data_packet(block, packet);
160

161
        // TODO: redundancy should only be sent when necessary
Andreas Schmidt's avatar
Andreas Schmidt committed
162
        if (PrrtBlock_encode_ready(block)) {
Andreas Schmidt's avatar
Andreas Schmidt committed
163
            uint32_t j = 0;
Andreas Schmidt's avatar
Andreas Schmidt committed
164
165
            unsigned int redundancy_seqno = sock_ptr->sequenceNumberRedundancy;
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeStart);
166
            PrrtBlock_encode(block, &sock_ptr->sequenceNumberRedundancy);
Andreas Schmidt's avatar
Andreas Schmidt committed
167
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeEnd);
168

Andreas Schmidt's avatar
Andreas Schmidt committed
169
            uint32_t redundancyPackets = List_count(block->redundancyPackets);
Andreas Schmidt's avatar
Andreas Schmidt committed
170
            for (j = 0; j < redundancyPackets; j++) {
171
                PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
172
                send_packet(sock_ptr, red_pkt);
173
            }
174

Andreas Schmidt's avatar
Andreas Schmidt committed
175
            PrrtBlock_destroy(block);
176
            block = NULL;
177
        }
178
        PrrtSocket_cleanup(sock_ptr);
179
180
    }
}