dataTransmitter.c 8.83 KB
Newer Older
Andreas Schmidt's avatar
Andreas Schmidt committed
1 2
#include <stdio.h>
#include <netdb.h>
3
#include <string.h>
4 5
#include "../../defines.h"
#include "../block.h"
6
#include "../receiver.h"
7
#include "../socket.h"
8 9
#include "../../util/dbg.h"
#include "../../util/common.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
10
#include "dataTransmitter.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
11
#include <math.h>
12

13
/**
 * Send one already-encoded packet to a receiver and record when it was sent.
 *
 * With hardware timestamping enabled on the socket, the packet is sent via
 * sendmsg(), the cycle counter is sampled, and the socket's error queue is
 * then polled (retrying while recvmsg() reports EAGAIN, up to a fixed bound)
 * for an SO_TIMESTAMPING control message whose third timespec is copied into
 * *packet_timestamp. Otherwise the packet is sent via sendto() and
 * CLOCK_REALTIME plus the cycle counter are sampled right after.
 *
 * NOTE: in the hardware path `buf` is reused as the receive buffer for the
 * error-queue message with a length of MAX_PAYLOAD_LENGTH, so `buf` must be
 * at least that large and its contents are overwritten.
 *
 * NOTE(review): if the error-queue message carries no SO_TIMESTAMPING entry,
 * *packet_timestamp is left untouched although true is returned - confirm
 * whether callers can tolerate that.
 *
 * @param sock_ptr          socket providing the file descriptor and timestamping mode
 * @param recv              receiver whose address (recv->ai) is the destination
 * @param buf               encoded packet bytes
 * @param length            number of bytes of buf to send
 * @param packet_timestamp  out: time at which the packet was sent
 * @param packet_clockstamp out: cycle counter value sampled after sending
 * @return true on success, false if sending or fetching the stamp failed
 */
bool send_to_socket(PrrtSocket* sock_ptr, PrrtReceiver *recv, uint8_t* buf, prrtPacketLength_t length, struct timespec *packet_timestamp, uint64_t* packet_clockstamp) {
    if(sock_ptr->isHardwareTimestamping) {
        struct msghdr msg;
        struct iovec iov;
        char control[1024];
        ssize_t got;
        // Named "attempts" so the counter cannot be confused with the check() macro.
        int attempts = 0;
        const int attempts_max = 9999;

        // Zero every field (including msg_flags) before handing the header to the kernel.
        memset(&msg, 0, sizeof(msg));
        iov.iov_base = buf;
        iov.iov_len = length;
        msg.msg_iov = &iov;
        msg.msg_iovlen = 1;
        msg.msg_name = recv->ai->ai_addr;
        msg.msg_namelen = recv->ai->ai_addrlen;
        msg.msg_control = control;
        msg.msg_controllen = 0;

        // A failed or short send must not be reported as success.
        check(sendmsg(sock_ptr->socketFd, &msg, 0) == (ssize_t) length, "Sendmsg failed.");
        *packet_clockstamp = __builtin_ia32_rdtsc();

        // The peer address of the error-queue message is not needed; do not let
        // recvmsg() write into the receiver's own address storage.
        msg.msg_name = NULL;
        msg.msg_namelen = 0;
        msg.msg_control = control;
        iov.iov_len = MAX_PAYLOAD_LENGTH;
        do {
            // recvmsg() overwrites msg_controllen, so it is reset on every attempt.
            msg.msg_controllen = sizeof(control);
            got = recvmsg(sock_ptr->socketFd, &msg, MSG_ERRQUEUE);
        } while(got < 0 && errno == EAGAIN && attempts++ < attempts_max);
        // Any failure (not only EAGAIN) leaves the control buffer unusable.
        check(got >= 0, "Failed to get stamp. Gave up.");

        struct cmsghdr* cmsg;
        for(cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
            if (cmsg->cmsg_level != SOL_SOCKET) {
                continue;
            }
            switch(cmsg->cmsg_type) {
                case SO_TIMESTAMPING: {
                    // Note: The raw stamp [2] is used, because the others are 0.
                    // Copied by byte offset so the payload is never accessed through a cast pointer.
                    memcpy(packet_timestamp, CMSG_DATA(cmsg) + 2 * sizeof(struct timespec), sizeof(struct timespec));
                    debug(DEBUG_HARDSTAMPING, "Hardware TS:\t%ld.%09ld", (long) packet_timestamp->tv_sec, (long) packet_timestamp->tv_nsec);
                    break;
                }
                default:
                    debug(DEBUG_DATARECEIVER, "Msgtype: %d", cmsg->cmsg_type);
                    break;
            }
        }
    } else {
        // TODO: [LATENCY] By knowing the time encoding etc. that happens upfront, one could make an adjustment here.
        check(sendto(sock_ptr->socketFd, buf, length, 0, recv->ai->ai_addr, recv->ai->ai_addrlen) ==
              length, "Sendto failed.");
        clock_gettime(CLOCK_REALTIME, packet_timestamp);
        *packet_clockstamp = __builtin_ia32_rdtsc();
        debug(DEBUG_HARDSTAMPING, "Software TS:\t%ld.%09ld", (long) packet_timestamp->tv_sec, (long) packet_timestamp->tv_nsec);
    }
    return true;

    error:
    PERROR("Sending packet failed.%s", "")
    return false;

}
75

Andreas Schmidt's avatar
Andreas Schmidt committed
76
/**
 * Encode a packet and put it on the wire.
 *
 * Steps, in order: apply socket pacing, wait until the receiver reports at
 * least maximum_payload_size of space, schedule the next send time from the
 * BBR pacing rate (when pacing is enabled and the rate is non-zero), refresh
 * the timestamp fields of data/redundancy payloads, encode into a local
 * buffer, send it, and register the packet as outstanding with the receiver
 * using the send timestamp. Xlap trace points are recorded around the send.
 *
 * NOTE(review): this function does not destroy `packet` on any path;
 * ownership after PrrtReceiver_add_outstanding_packet_state() (and on the
 * error path) is not visible here - confirm against that function.
 *
 * @param sock_ptr socket to send on
 * @param packet   packet to encode and transmit
 * @return true if the packet was sent and registered, false on encode or send failure
 */
static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
    uint8_t buf[MAX_PAYLOAD_LENGTH];
    memset(buf, 0, sizeof(buf));
    prrtPacketLength_t length = PrrtPacket_size(packet);
    prrtPacketLength_t payloadLength = packet->payloadLength;
    PrrtSocket_pace(sock_ptr);

    // Block until the receiver has room for a maximum-sized payload.
    int64_t space = PrrtReceiver_get_space(sock_ptr->receiver);
    while (space < sock_ptr->maximum_payload_size) {
        //PrrtReceiver_check_rto(sock_ptr->receiver, packet->sequenceNumber, PrrtPacket_type(packet));
        PrrtReceiver_wait_for_space(sock_ptr->receiver);
        space = PrrtReceiver_get_space(sock_ptr->receiver);
    }

    prrtTimestamp_t now = PrrtClock_get_current_time_us();
    if (sock_ptr->pacingEnabled) {
        double pacing_rate = PrrtReceiver_getBBRPacingRate(sock_ptr->receiver);
        // A zero rate would divide by zero; in that case nextSendTime is left as is.
        if(pacing_rate != 0) {
            prrtTimedelta_t pacingTime = (prrtTimedelta_t) round(((1000 * 1000 * ( (double)payloadLength)) / pacing_rate));
            debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %f, Pacing Time: %u", packet->payloadLength, pacing_rate, pacingTime);
            sock_ptr->nextSendTime = now + pacingTime;
        }
    }
    // Update timestamp
    if(PrrtPacket_type(packet) == PACKET_TYPE_DATA) {
        ((PrrtPacketDataPayload*) (packet->payload))->timestamp = PrrtClock_get_current_time_us();
        ((PrrtPacketDataPayload*) (packet->payload))->btlbw = PrrtReceiver_get_BBR_btlDr(sock_ptr->receiver);
    } else if(PrrtPacket_type(packet) == PACKET_TYPE_REDUNDANCY) {
        ((PrrtPacketRedundancyPayload*) (packet->payload))->timestamp = PrrtClock_get_current_time_us();
    }

    check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small.");

    switch (PrrtPacket_type(packet)) {
        case PACKET_TYPE_DATA:
            XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitStart);
            break;
        case PACKET_TYPE_REDUNDANCY:
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitStart);
            break;
        case PACKET_TYPE_REPETITION:
        case PACKET_TYPE_FEEDBACK:
        case PACKET_TYPE_PRESENT_REDUNDANCY:
        case PACKET_TYPE_CHANNEL_FEEDBACK:
        default:;
    }

    struct timespec timestamp;
    uint64_t cyclestamp;
    // On failure the stamps are not filled in, so bail out before anything reads them.
    check(send_to_socket(sock_ptr, sock_ptr->receiver, buf, length, &timestamp, &cyclestamp), "Sending to socket failed.");
    XlapTimeStampValue(sock_ptr, ts_data_packet, packet->sequenceNumber, ChannelTransmit, timestamp);
    XlapCycleStampValue(sock_ptr, ts_data_packet, packet->sequenceNumber, ChannelTransmit, cyclestamp);

    PrrtReceiver_add_outstanding_packet_state(sock_ptr->receiver, packet, PrrtClock_TimespecToPrrtTimestamp(timestamp));

    switch (PrrtPacket_type(packet)) {
        case PACKET_TYPE_DATA:
            XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd);
            XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd);
            break;
        case PACKET_TYPE_REDUNDANCY:
            XlapTimeStampClock(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitEnd);
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitEnd);
            break;
        case PACKET_TYPE_REPETITION:
        case PACKET_TYPE_FEEDBACK:
        case PACKET_TYPE_PRESENT_REDUNDANCY:
        case PACKET_TYPE_CHANNEL_FEEDBACK:
        default:;
    }

    return true;

    error:
    PERROR("Sending packet failed.%s", "")
    return false;
}

154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
/**
 * Transmit one data packet and, when the current coding block becomes ready
 * for encoding, the redundancy packets generated for that block.
 *
 * A copy of the packet is handed to send_packet(); the original is inserted
 * into the socket's current block (sock_ptr->receiveBlock), which is created
 * on demand with the packet's sequence number as base. After redundancy has
 * been sent, the block is destroyed and the pointer reset to NULL.
 *
 * @param sock_ptr socket holding coding state and the current block
 * @param packet   data packet to transmit
 */
void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet) {
    XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
    XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);

    // Open a fresh block on demand; its base is this packet's sequence number.
    if (!sock_ptr->receiveBlock) {
        sock_ptr->receiveBlock = PrrtBlock_create(PrrtCodingConfiguration_copy(sock_ptr->codingParameters), PrrtCoder_copy(sock_ptr->coder), packet->sequenceNumber);
    }

    // Position of the packet relative to the block's base sequence number.
    packet->index = (uint8_t) (packet->sequenceNumber - sock_ptr->receiveBlock->baseSequenceNumber);

    ((PrrtPacketDataPayload *) packet->payload)->groupRTprop_us = PrrtSocket_get_rtprop_fwd(sock_ptr);

    // The copy goes on the wire; the original is kept for the block below.
    PrrtPacket *outgoing = PrrtPacket_copy(packet);
    debug(DEBUG_DATATRANSMITTER, "Send: %i", packet->sequenceNumber);
    send_packet(sock_ptr, outgoing);
    XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
    XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);

    PrrtBlock_insert_data_packet(sock_ptr->receiveBlock, packet);

    // TODO: redundancy should only be sent when necessary
    if (!PrrtBlock_encode_ready(sock_ptr->receiveBlock)) {
        return;
    }

    // Remember the sequence number before encoding, which receives a pointer to the counter.
    unsigned int firstRedundancySeqno = sock_ptr->sequenceNumberRedundancy;
    XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, firstRedundancySeqno, PrrtEncodeStart);
    PrrtBlock_encode(sock_ptr->receiveBlock, &sock_ptr->sequenceNumberRedundancy);
    XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, firstRedundancySeqno, PrrtEncodeEnd);

    // The count is taken once up front, before any packet is fetched from the block.
    for (uint32_t remaining = List_count(sock_ptr->receiveBlock->redundancyPackets); remaining > 0; remaining--) {
        PrrtPacket *redundancy = PrrtBlock_get_first_red_data(sock_ptr->receiveBlock);
        send_packet(sock_ptr, redundancy);
    }

    PrrtBlock_destroy(sock_ptr->receiveBlock);
    sock_ptr->receiveBlock = NULL;
}

void *PrrtDataTransmitter_send_data_loop(void *ptr) {
194 195
    PrrtSocket *sock_ptr = ptr;

Andreas Schmidt's avatar
Andreas Schmidt committed
196 197 198
    while (1) {
        ListNode *job;
        do {
199
            job = Pipe_pull(sock_ptr->sendDataQueue);
Andreas Schmidt's avatar
Andreas Schmidt committed
200
            if (PrrtSocket_closing(sock_ptr)) {
201 202 203
                if (sock_ptr->receiveBlock != NULL) {
                    PrrtBlock_destroy(sock_ptr->receiveBlock);
                    sock_ptr->receiveBlock = NULL;
Andreas Schmidt's avatar
Andreas Schmidt committed
204 205 206
                }
                return NULL;
            }
Andreas Schmidt's avatar
Andreas Schmidt committed
207
        } while (!job);
Stefan Reif's avatar
Stefan Reif committed
208
        PrrtPacket *packet = PrrtPacket_byListNode(job);
209
        PrrtDataTransmitter_transmit(sock_ptr, packet);
210 211
    }
}