dataTransmitter.c 12 KB
Newer Older
Andreas Schmidt's avatar
Andreas Schmidt committed
1 2
#include <stdio.h>
#include <netdb.h>
3
#include <string.h>
4
#include "../../defines.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
5
#include "../timer.h"
6
#include "../receiver.h"
7
#include "../socket.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
8
#include "../types/block.h"
9 10
#include "../../util/dbg.h"
#include "../../util/common.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
11
#include "../../util/time.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
12
#include "dataTransmitter.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
13
#include <math.h>
14

Andreas Schmidt's avatar
Andreas Schmidt committed
15 16
bool send_to_socket(PrrtSocket* sock_ptr, uint8_t* buf, prrtPacketLength_t length,  struct timespec *packet_timestamp, uint64_t* packet_clockstamp) {
    PrrtReceiver *recv = sock_ptr->receiver;
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
    if(sock_ptr->isHardwareTimestamping) {
        struct msghdr msg;
        struct iovec iov;
        char control[1024];
        ssize_t got;
        int check = 0;
        const int check_max = 9999;

        iov.iov_base = buf;
        iov.iov_len = length;
        msg.msg_iov = &iov;
        msg.msg_iovlen = 1;
        msg.msg_name = recv->ai->ai_addr;
        msg.msg_namelen = recv->ai->ai_addrlen;
        msg.msg_control = control;
        msg.msg_controllen = 0;

34
        sendmsg(sock_ptr->socketFd, &msg, 0);
35
        *packet_clockstamp = __builtin_ia32_rdtsc();
36 37 38 39 40

        msg.msg_control = control;
        iov.iov_len = MAX_PAYLOAD_LENGTH;
        do {
            msg.msg_controllen = 1024;
41
            got = recvmsg(sock_ptr->socketFd, &msg, MSG_ERRQUEUE);
42 43 44 45 46 47 48 49 50 51 52 53
        } while(got < 0  && errno == EAGAIN && check++ < check_max);
        check(!(got < 0 && errno == EAGAIN), "Failed to get stamp. Gave up.");

        struct cmsghdr* cmsg;
        for(cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
            if (cmsg->cmsg_level != SOL_SOCKET) {
                continue;
            }
            switch(cmsg->cmsg_type) {
                case SO_TIMESTAMPING: {
                    // Note: The raw stamp [2] is used, because the others are 0.
                    struct timespec *hardstamp = &(((struct timespec*) CMSG_DATA(cmsg))[2]); //TODO: This is ugly.
54 55
                    memcpy(packet_timestamp, hardstamp, sizeof(struct timespec));
                    debug(DEBUG_HARDSTAMPING, "Hardware TS:\t%ld.%09ld", (long) packet_timestamp->tv_sec, (long) packet_timestamp->tv_nsec);
56 57 58 59 60 61 62 63 64
                    break;
                }
                default:
                    debug(DEBUG_DATARECEIVER, "Msgtype: %d", cmsg->cmsg_type);
                    break;
            }
        }
    } else {
        // TODO: [LATENCY] By knowing the time encoding etc. that happens upfront, one could make an adjustment here.
65
        check(sendto(sock_ptr->socketFd, buf, length, 0, recv->ai->ai_addr, recv->ai->ai_addrlen) ==
66
              length, "Sendto failed.");
67
        clock_gettime(CLOCK_REALTIME, packet_timestamp);
68 69
        *packet_clockstamp = __builtin_ia32_rdtsc();
        debug(DEBUG_HARDSTAMPING, "Software TS:\t%ld.%09ld", (long) packet_timestamp->tv_sec, (long) packet_timestamp->tv_nsec);
70 71 72 73 74 75 76 77
    }
    return true;

    error:
    PERROR("Sending packet failed.%s", "")
    return false;

}
78

Andreas Schmidt's avatar
Andreas Schmidt committed
79
static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
80
    uint8_t buf[MAX_PAYLOAD_LENGTH];
81
    memset(buf, 0, sizeof(buf));
82
    PrrtPace_track_pause(sock_ptr->prrtTransmitPace);
83
    bool paceSuccessful = PrrtSocket_pace(sock_ptr, true);
84
    PrrtPace_track_resume(sock_ptr->prrtTransmitPace);
85
    if (!paceSuccessful) {
Andreas Schmidt's avatar
Andreas Schmidt committed
86
        debug(DEBUG_DATATRANSMITTER, "Pacing interrupted.");
87 88 89
        PrrtPacket_destroy(packet);
        return false;
    }
Andreas Schmidt's avatar
Andreas Schmidt committed
90
    debug(DEBUG_DATATRANSMITTER, "Pacing interval passed.");
Andreas Schmidt's avatar
Andreas Schmidt committed
91

Andreas Schmidt's avatar
Andreas Schmidt committed
92
    PrrtPace_track_pause(sock_ptr->prrtTransmitPace);
Andreas Schmidt's avatar
Andreas Schmidt committed
93 94
    bool waitSuccessful = PrrtReceiver_wait_for_space(sock_ptr->receiver, sock_ptr->maximum_payload_size,
                                                      sock_ptr->applicationConstraints);
Andreas Schmidt's avatar
Andreas Schmidt committed
95
    PrrtPace_track_resume(sock_ptr->prrtTransmitPace);
Andreas Schmidt's avatar
Andreas Schmidt committed
96 97 98 99
    if(!waitSuccessful) {
        debug(DEBUG_DATATRANSMITTER, "Wait for space interrupted.");
        PrrtPacket_destroy(packet);
        return false;
rna's avatar
rna committed
100
    }
Andreas Schmidt's avatar
Andreas Schmidt committed
101
    debug(DEBUG_DATATRANSMITTER, "Space available.");
rna's avatar
rna committed
102

Andreas Schmidt's avatar
Andreas Schmidt committed
103
    prrtTimestamp_t now = PrrtClock_get_current_time_us();
Andreas Schmidt's avatar
Pacing.  
Andreas Schmidt committed
104
    if (sock_ptr->pacingEnabled) {
Andreas Schmidt's avatar
Andreas Schmidt committed
105 106 107
        prrtTimedelta_t peerPacingTime = 0;
        prrtTimedelta_t channelPacingTime = 0;
        double pacing_rate = PrrtReceiver_get_BBR_pacingRate(sock_ptr->receiver);
108
        double pacing_gain = PrrtReceiver_get_BBR_pacingGain(sock_ptr->receiver) * 0.9;
Andreas Schmidt's avatar
Andreas Schmidt committed
109
        uint32_t state = PrrtReceiver_get_BBR_state(sock_ptr->receiver);
Andreas Schmidt's avatar
Pacing.  
Andreas Schmidt committed
110
        if(pacing_rate != 0) {
Andreas Schmidt's avatar
Andreas Schmidt committed
111
            channelPacingTime = (prrtTimedelta_t) round(((1000 * 1000 * ((double) packet->payloadLength)) / pacing_rate));
Andreas Schmidt's avatar
Pacing.  
Andreas Schmidt committed
112
        }
113 114
        // Cross-Pace iff PROBE_DR and unity gain
        if(sock_ptr->recv_peer_btl_pace != 0 && state == PROBE_DR) {
115
            double pt = round(((double) sock_ptr->recv_peer_btl_pace) / pacing_gain);
116 117 118 119
            if (pt > (TIMESTAMP_SPACE-1)) {
                peerPacingTime = TIMESTAMP_SPACE-1;
            } else {
                peerPacingTime = (prrtTimedelta_t) pt;
120
            }
Andreas Schmidt's avatar
Andreas Schmidt committed
121 122
        }
        
123
        prrtTimedelta_t pacingTime = MAX(channelPacingTime, peerPacingTime);
Andreas Schmidt's avatar
Andreas Schmidt committed
124 125
        debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %f, Pacing Time: %u", packet->payloadLength, pacing_rate, pacingTime);
        sock_ptr->nextSendTime = now + pacingTime;
Andreas Schmidt's avatar
Andreas Schmidt committed
126
    }
127
    // Update timestamp
128
    prrtTimedelta_t btl_pace = MAX(MAX(PrrtPace_get_effective(sock_ptr->prrtTransmitPace), PrrtPace_get_effective(sock_ptr->appSendPace)), PrrtSocket_get_sock_opt(sock_ptr, "nw_pace"));
129 130
    if(PrrtPacket_type(packet) == PACKET_TYPE_DATA) {
        ((PrrtPacketDataPayload*) (packet->payload))->timestamp = PrrtClock_get_current_time_us();
131
        ((PrrtPacketDataPayload*) (packet->payload))->btl_datarate = PrrtReceiver_get_BBR_btlDr(sock_ptr->receiver);
Andreas Schmidt's avatar
Andreas Schmidt committed
132
        ((PrrtPacketDataPayload*) (packet->payload))->btl_pace = btl_pace;
133 134
    } else if(PrrtPacket_type(packet) == PACKET_TYPE_REDUNDANCY) {
        ((PrrtPacketRedundancyPayload*) (packet->payload))->timestamp = PrrtClock_get_current_time_us();
Andreas Schmidt's avatar
Andreas Schmidt committed
135
        ((PrrtPacketRedundancyPayload*) (packet->payload))->btl_pace = btl_pace;
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
    }

    check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small.");

    switch (PrrtPacket_type(packet)) {
        case PACKET_TYPE_DATA:
            XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitStart);
            break;
        case PACKET_TYPE_REDUNDANCY:
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitStart);
            break;
        case PACKET_TYPE_REPETITION:
        case PACKET_TYPE_FEEDBACK:
        case PACKET_TYPE_PRESENT_REDUNDANCY:
        case PACKET_TYPE_CHANNEL_FEEDBACK:
        default:;
    }
Andreas Schmidt's avatar
Andreas Schmidt committed
153

154
    struct timespec timestamp;
155
    uint64_t cyclestamp;
156
    send_to_socket(sock_ptr, buf, PrrtPacket_size(packet), &timestamp, &cyclestamp);
157
    XlapTimeStampValue(sock_ptr, ts_data_packet, packet->sequenceNumber, ChannelTransmit, timestamp);
158
    XlapCycleStampValue(sock_ptr, ts_data_packet, packet->sequenceNumber, ChannelTransmit, cyclestamp);
159

160 161
    PrrtReceiver_add_outstanding_packet_state(sock_ptr->receiver, packet, PrrtClock_TimespecToPrrtTimestamp(timestamp));

162
    switch (PrrtPacket_type(packet)) {
Andreas Schmidt's avatar
Andreas Schmidt committed
163 164 165 166 167 168 169 170 171 172 173 174 175 176
        case PACKET_TYPE_DATA:
            XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd);
            XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd);
            break;
        case PACKET_TYPE_REDUNDANCY:
            XlapTimeStampClock(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitEnd);
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitEnd);
            break;
        case PACKET_TYPE_REPETITION:
        case PACKET_TYPE_FEEDBACK:
        case PACKET_TYPE_PRESENT_REDUNDANCY:
        case PACKET_TYPE_CHANNEL_FEEDBACK:
        default:;
    }
177
    return true;
178

179
    error:
180 181
    PERROR("Sending packet failed.%s", "")
    return false;
182 183
}

Andreas Schmidt's avatar
Andreas Schmidt committed
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230
typedef struct timer_arg {
    PrrtSocket* socket;
    PrrtBlock* block;
} RetransmissionTimerArgs;

void retransmission_round_handler(void *arg) {
    uint8_t j;
    RetransmissionTimerArgs* args = (RetransmissionTimerArgs*) arg;

    PrrtBlock *block = args->block;
    PrrtSocket *socket = args->socket;

    if(block->inRound > 0) {
        PrrtReceiver_rto_check(socket->receiver, socket->applicationConstraints);
    }

    if (PrrtSocket_closing(socket) || block->inRound >= block->codingParams->c) {
        PrrtBlock_destroy(block);
        free(arg);
        return;
    }

    uint32_t redundancyPackets = block->codingParams->n_cycle[block->inRound];
    for (j = 0; j < redundancyPackets; j++) {
        PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
        bool sendResult = send_packet(socket, red_pkt);
        if(!sendResult) {
            debug(DEBUG_DATATRANSMITTER, "Sending redundancy data failed.");
            PrrtBlock_destroy(block);
            free(arg);
            return;
        }
    }

    block->inRound++;
    PrrtTimerTask task = {
            .arg = arg,
            .fun = retransmission_round_handler
    };

    uint32_t waittime_us = PrrtReceiver_get_retransmission_delay(socket->receiver, socket->applicationConstraints);
    prrtTimerDate deadline = abstime_from_now(waittime_us);
    debug(DEBUG_DATATRANSMITTER, "Set timer to expire in: %dus", waittime_us);
    PrrtTimer_submit(socket->retransmissionTimer, &deadline, &task);
}


Andreas Schmidt's avatar
Andreas Schmidt committed
231 232 233 234 235 236
void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet) {
    PrrtPace_track_start(sock_ptr->prrtTransmitPace);
    XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
    XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
    if (sock_ptr->receiveBlock == NULL) {
        sock_ptr->receiveBlock = PrrtBlock_create(PrrtCodingConfiguration_copy(sock_ptr->codingParameters), PrrtCoder_copy(sock_ptr->coder), packet->sequenceNumber);
Andreas Schmidt's avatar
Andreas Schmidt committed
237
        sock_ptr->receiveBlock->senderBlock = true;
Andreas Schmidt's avatar
Andreas Schmidt committed
238 239 240 241 242 243 244 245 246
    }

    packet->index = (uint8_t) (packet->sequenceNumber - sock_ptr->receiveBlock->baseSequenceNumber);

    PrrtPacketDataPayload *payload = packet->payload;
    payload->groupRTprop_us = PrrtSocket_get_rtprop_fwd(sock_ptr);

    PrrtPacket *packetToSend = PrrtPacket_copy(packet);
    debug(DEBUG_DATATRANSMITTER, "Send: %i", packet->sequenceNumber);
Andreas Schmidt's avatar
Andreas Schmidt committed
247 248 249 250 251 252 253
    int sendResult = send_packet(sock_ptr, packetToSend);
    if (sendResult) {
        XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
        XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);

        if(PrrtBlock_insert_data_packet(sock_ptr->receiveBlock, packet) == false) {
            PERROR("Failed to insert packet: %d", packet->sequenceNumber);
Andreas Schmidt's avatar
Andreas Schmidt committed
254 255
        }

Andreas Schmidt's avatar
Andreas Schmidt committed
256 257 258 259 260 261 262 263 264 265 266 267 268 269
        if (PrrtBlock_encode_ready(sock_ptr->receiveBlock)) {
            unsigned int redundancy_seqno = sock_ptr->sequenceNumberRedundancy;
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeStart);
            PrrtBlock_encode(sock_ptr->receiveBlock, &sock_ptr->sequenceNumberRedundancy);
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeEnd);

            RetransmissionTimerArgs* args = (RetransmissionTimerArgs*) calloc(1, sizeof(RetransmissionTimerArgs));
            args->block = sock_ptr->receiveBlock;
            sock_ptr->receiveBlock = NULL;
            args->socket = sock_ptr;
            retransmission_round_handler(args);
        }
    } else {
        PrrtPacket_destroy(packet);
Andreas Schmidt's avatar
Andreas Schmidt committed
270 271 272 273 274
    }
    PrrtPace_track_end(sock_ptr->prrtTransmitPace);
}

void *PrrtDataTransmitter_send_data_loop(void *ptr) {
Andreas Schmidt's avatar
Andreas Schmidt committed
275
    PrrtSocket *s = ptr;
276

Andreas Schmidt's avatar
Andreas Schmidt committed
277 278 279
    while (1) {
        ListNode *job;
        do {
Andreas Schmidt's avatar
Andreas Schmidt committed
280 281 282 283 284
            job = Pipe_pull(s->sendDataQueue);
            if (PrrtSocket_closing(s)) {
                if (s->receiveBlock != NULL) {
                    PrrtBlock_destroy(s->receiveBlock);
                    s->receiveBlock = NULL;
Andreas Schmidt's avatar
Andreas Schmidt committed
285 286 287
                }
                return NULL;
            }
Andreas Schmidt's avatar
Andreas Schmidt committed
288
        } while (!job);
Stefan Reif's avatar
Stefan Reif committed
289
        PrrtPacket *packet = PrrtPacket_byListNode(job);
Andreas Schmidt's avatar
Andreas Schmidt committed
290
        PrrtDataTransmitter_transmit(s, packet);
291 292
    }
}