dataTransmitter.c 10.7 KB
Newer Older
Andreas Schmidt's avatar
Andreas Schmidt committed
1 2
#include <stdio.h>
#include <netdb.h>
3
#include <string.h>
4
#include "../../defines.h"
5
#include "../timer.h"
6
#include "../receiver.h"
7
#include "../socket.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
8
#include "../types/block.h"
9 10
#include "../../util/dbg.h"
#include "../../util/common.h"
11
#include "../../util/time.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
12
#include "dataTransmitter.h"
13
#include <math.h>
14

Andreas Schmidt's avatar
Andreas Schmidt committed
15 16
bool send_to_socket(PrrtSocket* sock_ptr, uint8_t* buf, prrtPacketLength_t length,  struct timespec *packet_timestamp, uint64_t* packet_clockstamp) {
    PrrtReceiver *recv = sock_ptr->receiver;
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
    if(sock_ptr->isHardwareTimestamping) {
        struct msghdr msg;
        struct iovec iov;
        char control[1024];
        ssize_t got;
        int check = 0;
        const int check_max = 9999;

        iov.iov_base = buf;
        iov.iov_len = length;
        msg.msg_iov = &iov;
        msg.msg_iovlen = 1;
        msg.msg_name = recv->ai->ai_addr;
        msg.msg_namelen = recv->ai->ai_addrlen;
        msg.msg_control = control;
        msg.msg_controllen = 0;

Andreas Schmidt's avatar
Andreas Schmidt committed
34
        sendmsg(sock_ptr->socketFd, &msg, 0);
35
        *packet_clockstamp = __builtin_ia32_rdtsc();
36 37 38 39 40

        msg.msg_control = control;
        iov.iov_len = MAX_PAYLOAD_LENGTH;
        do {
            msg.msg_controllen = 1024;
Andreas Schmidt's avatar
Andreas Schmidt committed
41
            got = recvmsg(sock_ptr->socketFd, &msg, MSG_ERRQUEUE);
42 43 44 45 46 47 48 49 50 51 52 53
        } while(got < 0  && errno == EAGAIN && check++ < check_max);
        check(!(got < 0 && errno == EAGAIN), "Failed to get stamp. Gave up.");

        struct cmsghdr* cmsg;
        for(cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
            if (cmsg->cmsg_level != SOL_SOCKET) {
                continue;
            }
            switch(cmsg->cmsg_type) {
                case SO_TIMESTAMPING: {
                    // Note: The raw stamp [2] is used, because the others are 0.
                    struct timespec *hardstamp = &(((struct timespec*) CMSG_DATA(cmsg))[2]); //TODO: This is ugly.
54 55
                    memcpy(packet_timestamp, hardstamp, sizeof(struct timespec));
                    debug(DEBUG_HARDSTAMPING, "Hardware TS:\t%ld.%09ld", (long) packet_timestamp->tv_sec, (long) packet_timestamp->tv_nsec);
56 57 58 59 60 61 62 63 64
                    break;
                }
                default:
                    debug(DEBUG_DATARECEIVER, "Msgtype: %d", cmsg->cmsg_type);
                    break;
            }
        }
    } else {
        // TODO: [LATENCY] By knowing the time encoding etc. that happens upfront, one could make an adjustment here.
Andreas Schmidt's avatar
Andreas Schmidt committed
65
        check(sendto(sock_ptr->socketFd, buf, length, 0, recv->ai->ai_addr, recv->ai->ai_addrlen) ==
66
              length, "Sendto failed.");
67
        clock_gettime(CLOCK_REALTIME, packet_timestamp);
68 69
        *packet_clockstamp = __builtin_ia32_rdtsc();
        debug(DEBUG_HARDSTAMPING, "Software TS:\t%ld.%09ld", (long) packet_timestamp->tv_sec, (long) packet_timestamp->tv_nsec);
70 71 72 73 74 75 76 77
    }
    return true;

    error:
    PERROR("Sending packet failed.%s", "")
    return false;

}
78

Andreas Schmidt's avatar
Andreas Schmidt committed
79
static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
80
    uint8_t buf[MAX_PAYLOAD_LENGTH];
81
    memset(buf, 0, sizeof(buf));
82
    prrtPacketLength_t length = PrrtPacket_size(packet);
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
    prrtPacketLength_t payloadLength = packet->payloadLength;

    bool paceSuccessful = PrrtSocket_pace(sock_ptr);
    if (!paceSuccessful) {
        debug(DEBUG_DATATRANSMITTER, "Pacing interrupted.");
        PrrtPacket_destroy(packet);
        return false;
    }
    debug(DEBUG_DATATRANSMITTER, "Pacing interval passed.");

    bool waitSuccessful = PrrtReceiver_wait_for_space(sock_ptr->receiver, sock_ptr->maximum_payload_size,
                                                      sock_ptr->applicationConstraints);
    if(!waitSuccessful) {
        debug(DEBUG_DATATRANSMITTER, "Wait for space interrupted.");
        PrrtPacket_destroy(packet);
        return false;
    }
    debug(DEBUG_DATATRANSMITTER, "Space available.");

    prrtTimestamp_t now = PrrtClock_get_current_time_us();
    if (sock_ptr->pacingEnabled) {
        double pacing_rate = PrrtReceiver_getBBRPacingRate(sock_ptr->receiver);
        if(pacing_rate != 0) {
            prrtTimedelta_t pacingTime = (prrtTimedelta_t) round(((1000 * 1000 * ( (double)payloadLength)) / pacing_rate));
            debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %f, Pacing Time: %u", packet->payloadLength, pacing_rate, pacingTime);
            sock_ptr->nextSendTime = now + pacingTime;
        }
    }
    // Update timestamp
    if(PrrtPacket_type(packet) == PACKET_TYPE_DATA) {
        ((PrrtPacketDataPayload*) (packet->payload))->timestamp = PrrtClock_get_current_time_us();
        ((PrrtPacketDataPayload*) (packet->payload))->btlbw = PrrtReceiver_get_BBR_btlDr(sock_ptr->receiver);
    } else if(PrrtPacket_type(packet) == PACKET_TYPE_REDUNDANCY) {
        ((PrrtPacketRedundancyPayload*) (packet->payload))->timestamp = PrrtClock_get_current_time_us();
    }
118

119
    check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small.");
120

Andreas Schmidt's avatar
Andreas Schmidt committed
121 122 123 124 125 126 127 128 129 130 131 132 133
    switch (PrrtPacket_type(packet)) {
        case PACKET_TYPE_DATA:
            XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitStart);
            break;
        case PACKET_TYPE_REDUNDANCY:
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitStart);
            break;
        case PACKET_TYPE_REPETITION:
        case PACKET_TYPE_FEEDBACK:
        case PACKET_TYPE_PRESENT_REDUNDANCY:
        case PACKET_TYPE_CHANNEL_FEEDBACK:
        default:;
    }
134

135
    struct timespec timestamp;
136
    uint64_t cyclestamp;
Andreas Schmidt's avatar
Andreas Schmidt committed
137
    send_to_socket(sock_ptr, buf, length, &timestamp, &cyclestamp);
138
    XlapTimeStampValue(sock_ptr, ts_data_packet, packet->sequenceNumber, ChannelTransmit, timestamp);
139
    XlapCycleStampValue(sock_ptr, ts_data_packet, packet->sequenceNumber, ChannelTransmit, cyclestamp);
140

141 142
    PrrtReceiver_add_outstanding_packet_state(sock_ptr->receiver, packet, PrrtClock_TimespecToPrrtTimestamp(timestamp));

143
    switch (PrrtPacket_type(packet)) {
Andreas Schmidt's avatar
Andreas Schmidt committed
144 145 146 147 148 149 150 151 152 153 154 155 156 157
        case PACKET_TYPE_DATA:
            XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd);
            XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd);
            break;
        case PACKET_TYPE_REDUNDANCY:
            XlapTimeStampClock(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitEnd);
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitEnd);
            break;
        case PACKET_TYPE_REPETITION:
        case PACKET_TYPE_FEEDBACK:
        case PACKET_TYPE_PRESENT_REDUNDANCY:
        case PACKET_TYPE_CHANNEL_FEEDBACK:
        default:;
    }
158

159
    return true;
160

161
    error:
162 163
    PERROR("Sending packet failed.%s", "")
    return false;
164 165
}

166 167 168 169
typedef struct timer_arg {
    PrrtSocket* socket;
    PrrtBlock* block;
} RetransmissionTimerArgs;
170

171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
void retransmission_round_handler(void *arg) {
    uint8_t j;
    RetransmissionTimerArgs* args = (RetransmissionTimerArgs*) arg;

    PrrtBlock *block = args->block;
    PrrtSocket *socket = args->socket;

    if(block->inRound > 0) {
        PrrtReceiver_rto_check(socket->receiver, socket->applicationConstraints);
    }

    if (PrrtSocket_closing(socket) || block->inRound >= block->codingParams->c) {
        PrrtBlock_destroy(block);
        free(arg);
        return;
    }

    uint32_t redundancyPackets = block->codingParams->n_cycle[block->inRound];
    for (j = 0; j < redundancyPackets; j++) {
        PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
        bool sendResult = send_packet(socket, red_pkt);
        if(!sendResult) {
            debug(DEBUG_DATATRANSMITTER, "Sending redundancy data failed.");
            PrrtBlock_destroy(block);
            free(arg);
            return;
197
        }
198
    }
199

200 201 202 203 204
    block->inRound++;
    PrrtTimerTask task = {
            .arg = arg,
            .fun = retransmission_round_handler
    };
205

206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
    uint32_t waittime_us = PrrtReceiver_get_retransmission_delay(socket->receiver, socket->applicationConstraints);
    prrtTimerDate deadline = abstime_from_now(waittime_us);
    debug(DEBUG_DATATRANSMITTER, "Set timer to expire in: %dus", waittime_us);
    PrrtTimer_submit(socket->retransmissionTimer, &deadline, &task);
}


void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet) {
    XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
    XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
    if (sock_ptr->receiveBlock == NULL) {
        sock_ptr->receiveBlock = PrrtBlock_create(PrrtCodingConfiguration_copy(sock_ptr->codingParameters), PrrtCoder_copy(sock_ptr->coder), packet->sequenceNumber);
        sock_ptr->receiveBlock->senderBlock = true;
    }

    packet->index = (uint8_t) (packet->sequenceNumber - sock_ptr->receiveBlock->baseSequenceNumber);

    PrrtPacketDataPayload *payload = packet->payload;
    payload->groupRTprop_us = PrrtSocket_get_rtprop_fwd(sock_ptr);
Andreas Schmidt's avatar
Andreas Schmidt committed
225

226 227 228 229
    PrrtPacket *packetToSend = PrrtPacket_copy(packet);
    debug(DEBUG_DATATRANSMITTER, "Send: %i", packet->sequenceNumber);
    int sendResult = send_packet(sock_ptr, packetToSend);
    if (sendResult) {
Andreas Schmidt's avatar
Andreas Schmidt committed
230
        XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
Andreas Schmidt's avatar
Andreas Schmidt committed
231
        XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
232

233 234 235
        if(PrrtBlock_insert_data_packet(sock_ptr->receiveBlock, packet) == false) {
            PERROR("Failed to insert packet: %d", packet->sequenceNumber);
        }
236

237
        if (PrrtBlock_encode_ready(sock_ptr->receiveBlock)) {
Andreas Schmidt's avatar
Andreas Schmidt committed
238 239
            unsigned int redundancy_seqno = sock_ptr->sequenceNumberRedundancy;
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeStart);
240
            PrrtBlock_encode(sock_ptr->receiveBlock, &sock_ptr->sequenceNumberRedundancy);
Andreas Schmidt's avatar
Andreas Schmidt committed
241
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeEnd);
242

243 244 245 246 247
            RetransmissionTimerArgs* args = (RetransmissionTimerArgs*) calloc(1, sizeof(RetransmissionTimerArgs));
            args->block = sock_ptr->receiveBlock;
            sock_ptr->receiveBlock = NULL;
            args->socket = sock_ptr;
            retransmission_round_handler(args);
248
        }
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270
    } else {
        PrrtPacket_destroy(packet);
    }
}

void *PrrtDataTransmitter_send_data_loop(void *ptr) {
    PrrtSocket *s = ptr;

    while (1) {
        ListNode *job;
        do {
            job = Pipe_pull(s->sendDataQueue);
            if (PrrtSocket_closing(s)) {
                if (s->receiveBlock != NULL) {
                    PrrtBlock_destroy(s->receiveBlock);
                    s->receiveBlock = NULL;
                }
                return NULL;
            }
        } while (!job);
        PrrtPacket *packet = PrrtPacket_byListNode(job);
        PrrtDataTransmitter_transmit(s, packet);
271 272
    }
}