dataTransmitter.c 12.1 KB
Newer Older
Andreas Schmidt's avatar
Andreas Schmidt committed
1
2
#include <stdio.h>
#include <netdb.h>
3
#include <string.h>
4
#include "../../defines.h"
5
#include "../timer.h"
6
#include "../receiver.h"
7
#include "../socket.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
8
#include "../types/block.h"
9
10
#include "../../util/dbg.h"
#include "../../util/common.h"
11
#include "../../util/time.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
12
#include "dataTransmitter.h"
13
#include <math.h>
14

Andreas Schmidt's avatar
Andreas Schmidt committed
15
16
/**
 * Send an already encoded packet to the socket's receiver and report when it left.
 *
 * With hardware timestamping enabled the packet is sent via sendmsg() and the
 * transmit timestamp is then read back from the socket's error queue; the queue
 * is polled up to 10000 times while it reports EAGAIN. Otherwise the packet is
 * sent via sendto() and stamped with CLOCK_REALTIME right after the call.
 *
 * @param sock_ptr          socket providing the file descriptor, receiver address and timestamping mode
 * @param buf               encoded packet; in hardware mode it is also used as receive buffer for the
 *                          error-queue read with a length of MAX_PAYLOAD_LENGTH, so it must be at least that large
 * @param length            number of bytes of buf to send
 * @param packet_timestamp  out: transmit time; left untouched in hardware mode when the error-queue
 *                          message carries no SO_TIMESTAMPING control message
 * @param packet_clockstamp out: TSC value read directly after the send call
 * @return true on success, false if sending or fetching the hardware stamp failed
 */
bool send_to_socket(PrrtSocket* sock_ptr, uint8_t* buf, prrtPacketLength_t length,  struct timespec *packet_timestamp, uint64_t* packet_clockstamp) {
    PrrtReceiver *receiver = sock_ptr->receiver;

    if(sock_ptr->isHardwareTimestamping) {
        struct msghdr msg;
        struct iovec iov;
        char control[1024];
        ssize_t got;
        int attempts = 0;
        const int max_attempts = 9999;

        // Zero all fields first so that none (e.g. msg_flags) is passed to the kernel uninitialized.
        memset(&msg, 0, sizeof(msg));
        iov.iov_base = buf;
        iov.iov_len = length;
        msg.msg_iov = &iov;
        msg.msg_iovlen = 1;
        msg.msg_name = receiver->ai->ai_addr;
        msg.msg_namelen = receiver->ai->ai_addrlen;
        msg.msg_control = control;
        msg.msg_controllen = 0;

        check(sendmsg(sock_ptr->socketFd, &msg, 0) == length, "Sendmsg failed.");
        *packet_clockstamp = __builtin_ia32_rdtsc();

        // The error-queue read reuses buf as data buffer.
        // NOTE(review): msg_name still points at the receiver address while receiving - confirm this is intended.
        iov.iov_len = MAX_PAYLOAD_LENGTH;
        do {
            msg.msg_controllen = sizeof(control);
            got = recvmsg(sock_ptr->socketFd, &msg, MSG_ERRQUEUE);
        } while(got < 0 && errno == EAGAIN && attempts++ < max_attempts);
        check(!(got < 0 && errno == EAGAIN), "Failed to get stamp. Gave up.");
        // Any other receive error leaves the control buffer unfilled, so it must not be parsed.
        check(got >= 0, "Receiving stamp failed.");

        struct cmsghdr* cmsg;
        for(cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
            if (cmsg->cmsg_level != SOL_SOCKET) {
                continue;
            }
            switch(cmsg->cmsg_type) {
                case SO_TIMESTAMPING: {
                    // Note: The raw stamp [2] is used, because the others are 0.
                    struct timespec *hardstamp = &(((struct timespec*) CMSG_DATA(cmsg))[2]); //TODO: This is ugly.
                    memcpy(packet_timestamp, hardstamp, sizeof(struct timespec));
                    debug(DEBUG_HARDSTAMPING, "Hardware TS:\t%ld.%09ld", (long) packet_timestamp->tv_sec, (long) packet_timestamp->tv_nsec);
                    break;
                }
                default:
                    debug(DEBUG_DATARECEIVER, "Msgtype: %d", cmsg->cmsg_type);
                    break;
            }
        }
    } else {
        // TODO: [LATENCY] By knowing the time encoding etc. that happens upfront, one could make an adjustment here.
        check(sendto(sock_ptr->socketFd, buf, length, 0, receiver->ai->ai_addr, receiver->ai->ai_addrlen) ==
              length, "Sendto failed.");
        clock_gettime(CLOCK_REALTIME, packet_timestamp);
        *packet_clockstamp = __builtin_ia32_rdtsc();
        debug(DEBUG_HARDSTAMPING, "Software TS:\t%ld.%09ld", (long) packet_timestamp->tv_sec, (long) packet_timestamp->tv_nsec);
    }
    return true;

    error:
    PERROR("Sending packet failed.%s", "")
    return false;

}
78

Andreas Schmidt's avatar
Andreas Schmidt committed
79
/**
 * Pace, encode and transmit a single packet.
 *
 * Steps: wait for the pacing interval, wait for space at the receiver, compute
 * the next send time (if pacing is enabled), stamp the payload with the current
 * time and bottleneck information, encode it and hand it to send_to_socket().
 * On success the packet is registered via PrrtReceiver_add_outstanding_packet_state().
 *
 * The packet is consumed in every case: on each failure path it is destroyed
 * here, so callers must not touch it after the call.
 *
 * @param sock_ptr socket to send on
 * @param packet   packet to send
 * @return true if the packet was sent, false if pacing/waiting was interrupted,
 *         encoding failed or the socket send failed
 */
static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
    uint8_t buf[MAX_PAYLOAD_LENGTH];
    // Declared and zeroed up front so that no goto jumps over them and they never hold indeterminate values.
    struct timespec timestamp = {0};
    uint64_t cyclestamp = 0;
    memset(buf, 0, sizeof(buf));

    // Time spent pacing is excluded from the transmit pace tracking.
    PrrtPace_track_pause(sock_ptr->prrtTransmitPace);
    bool paceSuccessful = PrrtSocket_pace(sock_ptr, true);
    PrrtPace_track_resume(sock_ptr->prrtTransmitPace);
    if (!paceSuccessful) {
        debug(DEBUG_DATATRANSMITTER, "Pacing interrupted.");
        PrrtPacket_destroy(packet);
        return false;
    }
    debug(DEBUG_DATATRANSMITTER, "Pacing interval passed.");

    // Likewise for the time spent waiting for space at the receiver.
    PrrtPace_track_pause(sock_ptr->prrtTransmitPace);
    bool waitSuccessful = PrrtReceiver_wait_for_space(sock_ptr->receiver, sock_ptr->maximum_payload_size,
                                                      sock_ptr->applicationConstraints);
    PrrtPace_track_resume(sock_ptr->prrtTransmitPace);
    if(!waitSuccessful) {
        debug(DEBUG_DATATRANSMITTER, "Wait for space interrupted.");
        PrrtPacket_destroy(packet);
        return false;
    }
    debug(DEBUG_DATATRANSMITTER, "Space available.");

    prrtTimestamp_t now = PrrtClock_get_current_time_us();
    if (sock_ptr->pacingEnabled) {
        prrtTimedelta_t peerPacingTime = 0;
        prrtTimedelta_t channelPacingTime = 0;
        double pacing_rate = PrrtReceiver_get_BBR_pacingRate(sock_ptr->receiver);
        double pacing_gain = PrrtReceiver_get_BBR_pacingGain(sock_ptr->receiver) * 0.9;
        uint32_t state = PrrtReceiver_get_BBR_state(sock_ptr->receiver);
        if(pacing_rate != 0) {
            channelPacingTime = (prrtTimedelta_t) round(((1000 * 1000 * ((double) packet->payloadLength)) / pacing_rate));
        }
        // Cross-Pace iff PROBE_DR and unity gain
        if(sock_ptr->recv_peer_btl_pace != 0 && state == PROBE_DR) {
            double pt = round(((double) sock_ptr->recv_peer_btl_pace) / pacing_gain);
            // Clamp so the value fits into the timestamp space.
            if (pt > (TIMESTAMP_SPACE-1)) {
                peerPacingTime = TIMESTAMP_SPACE-1;
            } else {
                peerPacingTime = (prrtTimedelta_t) pt;
            }
        }

        // The slower of channel and peer determines when the next packet may be sent.
        prrtTimedelta_t pacingTime = MAX(channelPacingTime, peerPacingTime);
        debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %f, Pacing Time: %u", packet->payloadLength, pacing_rate, pacingTime);
        sock_ptr->nextSendTime = now + pacingTime;
    }

    // Update timestamp
    prrtTimedelta_t btl_pace = MAX(MAX(PrrtPace_get_effective(sock_ptr->prrtTransmitPace), PrrtPace_get_effective(sock_ptr->appSendPace)), PrrtSocket_get_sock_opt(sock_ptr, "nw_pace"));
    if(PrrtPacket_type(packet) == PACKET_TYPE_DATA) {
        PrrtPacketDataPayload *dataPayload = (PrrtPacketDataPayload*) (packet->payload);
        dataPayload->timestamp = PrrtClock_get_current_time_us();
        dataPayload->btl_datarate = PrrtReceiver_get_BBR_btlDr(sock_ptr->receiver);
        dataPayload->btl_pace = btl_pace;
    } else if(PrrtPacket_type(packet) == PACKET_TYPE_REDUNDANCY) {
        PrrtPacketRedundancyPayload *redundancyPayload = (PrrtPacketRedundancyPayload*) (packet->payload);
        redundancyPayload->timestamp = PrrtClock_get_current_time_us();
        redundancyPayload->btl_pace = btl_pace;
    }

    check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small.");

    switch (PrrtPacket_type(packet)) {
        case PACKET_TYPE_DATA:
            XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitStart);
            break;
        case PACKET_TYPE_REDUNDANCY:
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitStart);
            break;
        case PACKET_TYPE_REPETITION:
        case PACKET_TYPE_FEEDBACK:
        case PACKET_TYPE_PRESENT_REDUNDANCY:
        case PACKET_TYPE_CHANNEL_FEEDBACK:
        default:;
    }

    // A failed send must not be recorded as an outstanding packet; send_to_socket() already reported the cause.
    if (!send_to_socket(sock_ptr, buf, PrrtPacket_size(packet), &timestamp, &cyclestamp)) {
        goto error;
    }
    // NOTE(review): these two stamps are recorded under ts_data_packet regardless of the packet type - confirm intended.
    XlapTimeStampValue(sock_ptr, ts_data_packet, packet->sequenceNumber, ChannelTransmit, timestamp);
    XlapCycleStampValue(sock_ptr, ts_data_packet, packet->sequenceNumber, ChannelTransmit, cyclestamp);
    debug(DEBUG_PACKETFLOW, "%lu: Packet %d:%u sent", (unsigned long) PrrtClock_get_current_time_us(), PrrtPacket_type(packet), packet->sequenceNumber);

    PrrtReceiver_add_outstanding_packet_state(sock_ptr->receiver, packet, PrrtClock_TimespecToPrrtTimestamp(timestamp));

    switch (PrrtPacket_type(packet)) {
        case PACKET_TYPE_DATA:
            XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd);
            XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd);
            break;
        case PACKET_TYPE_REDUNDANCY:
            XlapTimeStampClock(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitEnd);
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitEnd);
            break;
        case PACKET_TYPE_REPETITION:
        case PACKET_TYPE_FEEDBACK:
        case PACKET_TYPE_PRESENT_REDUNDANCY:
        case PACKET_TYPE_CHANNEL_FEEDBACK:
        default:;
    }
    return true;

    error:
    PERROR("Sending packet failed.%s", "")
    // Destroy the packet here as the other failure paths do, so it is not leaked.
    PrrtPacket_destroy(packet);
    return false;
}

185
186
// Forward declaration only: no definition or caller of block_timeout appears in the visible part of this file.
// NOTE(review): presumably a timer callback taking an opaque argument - confirm it is still needed.
void block_timeout(void* arg);

187
188
189
190
// Argument bundle passed to retransmission_round_handler() through its void* parameter.
// It is heap-allocated by PrrtDataTransmitter_transmit() and freed by the handler when it stops rescheduling.
typedef struct timer_arg {
    PrrtSocket* socket; // socket the block's redundancy packets are sent on
    PrrtBlock* block;   // block whose redundancy rounds are being sent; destroyed by the handler when done
} RetransmissionTimerArgs;
191

192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
/**
 * Send the redundancy packets of the block's current round and schedule the next round.
 *
 * Called directly for the first round and afterwards as a timer task. The handler
 * owns arg and the block it refers to: once the socket is closing, all rounds have
 * been sent or sending fails, it destroys the block and frees arg. Otherwise it
 * increments the round counter and resubmits itself to the socket's retransmission timer.
 *
 * @param arg pointer to a heap-allocated RetransmissionTimerArgs
 */
void retransmission_round_handler(void *arg) {
    RetransmissionTimerArgs* args = (RetransmissionTimerArgs*) arg;

    PrrtBlock *block = args->block;
    PrrtSocket *sock = args->socket;

    // From the second round on, run the receiver's RTO check before sending more redundancy.
    if(block->inRound > 0) {
        PrrtReceiver_rto_check(sock->receiver, sock->applicationConstraints);
    }

    if (PrrtSocket_closing(sock) || block->inRound >= block->codingParams->c) {
        PrrtBlock_destroy(block);
        free(arg);
        return;
    }

    uint32_t redundancyPackets = block->codingParams->n_cycle[block->inRound];
    // The counter has the same width as the bound; a narrower counter would wrap and never reach bounds above 255.
    for (uint32_t j = 0; j < redundancyPackets; j++) {
        PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
        bool sendResult = send_packet(sock, red_pkt);
        if(!sendResult) {
            debug(DEBUG_DATATRANSMITTER, "Sending redundancy data failed.");
            PrrtBlock_destroy(block);
            free(arg);
            return;
        }
    }

    block->inRound++;
    PrrtTimerTask task = {
            .arg = arg,
            .fun = retransmission_round_handler
    };

    uint32_t waittime_us = PrrtReceiver_get_retransmission_delay(sock->receiver, sock->applicationConstraints);
    prrtTimerDate deadline = abstime_from_now(waittime_us);
    debug(DEBUG_DATATRANSMITTER, "Set timer to expire in: %uus", waittime_us);
    PrrtTimer_submit(sock->retransmissionTimer, &deadline, &task);
}


/**
 * Transmit one data packet and, once its block is complete, start sending redundancy.
 *
 * A sender block is created on demand. A copy of the packet is sent via send_packet();
 * on success the original is inserted into the current block, on failure the original
 * is destroyed. When the block is ready for encoding, redundancy is generated and the
 * first retransmission round is started; the block is then detached from the socket
 * so that the next packet opens a new one.
 *
 * @param sock_ptr socket to transmit on
 * @param packet   data packet to transmit
 */
void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet) {
    PrrtPace_track_start(sock_ptr->prrtTransmitPace);
    XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
    XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
    if (sock_ptr->receiveBlock == NULL) {
        sock_ptr->receiveBlock = PrrtBlock_create(PrrtCodingConfiguration_copy(sock_ptr->codingParameters), PrrtCoder_copy(sock_ptr->coder), packet->sequenceNumber);
        sock_ptr->receiveBlock->senderBlock = true;
    }

    // Position of the packet inside its block, relative to the block's first sequence number.
    packet->index = (uint8_t) (packet->sequenceNumber - sock_ptr->receiveBlock->baseSequenceNumber);

    PrrtPacketDataPayload *payload = packet->payload;
    payload->RTprop_us = PrrtSocket_get_rtprop_fwd(sock_ptr);

    // A copy is sent; the original is kept for insertion into the block below.
    PrrtPacket *packetToSend = PrrtPacket_copy(packet);
    debug(DEBUG_DATATRANSMITTER, "Send: %i", packet->sequenceNumber);
    int sendResult = send_packet(sock_ptr, packetToSend);
    if (sendResult) {
        XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
        XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);

        if(PrrtBlock_insert_data_packet(sock_ptr->receiveBlock, packet) == false) {
            PERROR("Failed to insert packet: %d", packet->sequenceNumber);
        }

        if (PrrtBlock_encode_ready(sock_ptr->receiveBlock)) {
            // Remember the sequence number before encoding, since PrrtBlock_encode() receives a pointer to it.
            unsigned int redundancy_seqno = sock_ptr->sequenceNumberRedundancy;
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeStart);
            PrrtBlock_encode(sock_ptr->receiveBlock, &sock_ptr->sequenceNumberRedundancy);
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeEnd);

            RetransmissionTimerArgs *args = calloc(1, sizeof *args);
            if (args != NULL) {
                args->block = sock_ptr->receiveBlock;
                args->socket = sock_ptr;
                // The handler takes over args and the block from here on.
                retransmission_round_handler(args);
            } else {
                // Without the argument bundle no round can be scheduled; drop the block instead of dereferencing NULL.
                PERROR("Could not allocate retransmission timer arguments.%s", "");
                PrrtBlock_destroy(sock_ptr->receiveBlock);
            }

            sock_ptr->receiveBlock = NULL;
        }
    } else {
        PrrtPacket_destroy(packet);
    }
    PrrtPace_track_end(sock_ptr->prrtTransmitPace);
}

/**
 * Thread entry point: take packets from the socket's send queue and transmit them.
 *
 * Runs until the socket is closing. The closing state is checked after every pull
 * from the queue; when it is set, a still pending block is destroyed and the loop ends.
 *
 * @param ptr the PrrtSocket to serve
 * @return always NULL
 */
void *PrrtDataTransmitter_send_data_loop(void *ptr) {
    PrrtSocket *sock = ptr;

    for (;;) {
        ListNode *node = NULL;

        // Pull until a job arrives; leave as soon as the socket is closing.
        while (node == NULL) {
            node = Pipe_pull(sock->sendDataQueue);
            if (!PrrtSocket_closing(sock)) {
                continue;
            }
            if (sock->receiveBlock != NULL) {
                PrrtBlock_destroy(sock->receiveBlock);
                sock->receiveBlock = NULL;
            }
            return NULL;
        }

        PrrtPacket *next = PrrtPacket_byListNode(node);
        PrrtDataTransmitter_transmit(sock, next);
    }
}