dataTransmitter.c 4.32 KB
Newer Older
Andreas Schmidt's avatar
Andreas Schmidt committed
1
2
#include <stdio.h>
#include <netdb.h>
3
#include <unistd.h>
4
#include <string.h>
5
6
#include "../../defines.h"
#include "../block.h"
7
#include "../receiver.h"
8
#include "../socket.h"
9
10
#include "../../util/dbg.h"
#include "../../util/common.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
11
#include "dataTransmitter.h"
12
13


14
/**
 * Encode a packet and transmit it to every receiver registered on the socket.
 *
 * The packet is serialised into a stack buffer of MAX_PAYLOAD_LENGTH bytes and
 * then sent with sendto() on sock_ptr->dataSocketFd, once per entry in
 * sock_ptr->receivers. For data and redundancy packets, LinkTransmitStart and
 * LinkTransmitEnd timestamps are recorded around the transmission; all other
 * packet types are sent without timestamping.
 *
 * Ownership: this function consumes the packet. It is released with
 * PrrtPacket_destroy() on success AND on failure, so the caller must not
 * touch the packet after the call.
 *
 * @param sock_ptr socket providing the data file descriptor and receiver list
 * @param packet   packet to send; destroyed before returning
 * @return true if the packet was encoded and sent to all receivers,
 *         false if encoding or any sendto() call failed
 */
static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet)
{
    uint8_t buf[MAX_PAYLOAD_LENGTH];
    memset(buf, 0, sizeof(buf));
    prrtPacketLength_t length = PrrtPacket_size(packet);

    check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small.");

    // Only data and redundancy packets are timestamped.
    switch (PrrtPacket_type(packet)) {
    case PACKET_TYPE_DATA:
        PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitStart);
        break;
    case PACKET_TYPE_REDUNDANCY:
        PrrtTimeStampCycle(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitStart);
        break;
    case PACKET_TYPE_REPETITION:
    case PACKET_TYPE_FEEDBACK:
    case PACKET_TYPE_PRESENT_REDUNDANCY:
    case PACKET_TYPE_CHANNEL_FEEDBACK:
    default:
        break;
    }

    // SENDING TO ALL RECEIVERS
    LIST_FOREACH(sock_ptr->receivers, first, next, cur) {
        PrrtReceiver *recv = cur->value;

        // TODO: [LATENCY] By knowing the time encoding etc. that happens upfront, one could make an adjustment here.
        // A short write counts as a failure: the whole datagram must go out in one call.
        check(sendto(sock_ptr->dataSocketFd, buf, length, 0, recv->ai->ai_addr, recv->ai->ai_addrlen) ==
              length, "Sendto failed.");
        usleep(1);
    }

    switch (PrrtPacket_type(packet)) {
    case PACKET_TYPE_DATA:
        PrrtTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd);
        PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, LinkTransmitEnd);
        break;
    case PACKET_TYPE_REDUNDANCY:
        PrrtTimeStampClock(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitEnd);
        PrrtTimeStampCycle(sock_ptr, ts_redundancy_packet, packet->sequenceNumber, LinkTransmitEnd);
        break;
    case PACKET_TYPE_REPETITION:
    case PACKET_TYPE_FEEDBACK:
    case PACKET_TYPE_PRESENT_REDUNDANCY:
    case PACKET_TYPE_CHANNEL_FEEDBACK:
    default:
        break;
    }

    PrrtPacket_destroy(packet);
    return true;

    error:
    // Report first so the diagnostic is emitted before any cleanup call runs.
    PERROR("Sending packet failed.%s", "");
    // The callers hand over ownership and never free the packet themselves,
    // so it has to be released on the failure path as well.
    PrrtPacket_destroy(packet);
    return false;
}

69
70
void *send_data_loop(void *ptr)
{
71
72
73
    PrrtSocket *sock_ptr = ptr;
    PrrtBlock *block = NULL;

74
    while(1) {
75
        check(pthread_mutex_lock(&sock_ptr->outQueueFilledMutex) == 0, "Lock failed.");
76
        while(List_count(sock_ptr->outQueue) == 0) {
77
            if (atomic_load_explicit(&sock_ptr->closing, memory_order_acquire)) {
Andreas Schmidt's avatar
Andreas Schmidt committed
78
79
80
                if(block != NULL) {
                    PrrtBlock_destroy(block);
                }
81
                check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed.");
82
83
                return NULL;
            }
84
85
            check(pthread_cond_wait(&sock_ptr->outQueueFilledCv, &sock_ptr->outQueueFilledMutex) == 0,
                  "Cond wait failed.");
86
        }
87

88
        PrrtPacket *packet = List_shift(sock_ptr->outQueue);
89
        if(block == NULL) {
90
            block = PrrtBlock_create(sock_ptr->codingParameters, packet->sequenceNumber);
91
92
        }

93
        packet->index = (uint8_t) (packet->sequenceNumber - block->baseSequenceNumber);
94

95
        PrrtPacketDataPayload *payload = packet->payload;
96
        payload->groupRTT_us = PrrtChannelStateInformation_get_rtt(sock_ptr->csi);
Andreas Schmidt's avatar
Andreas Schmidt committed
97

98
        PrrtPacket *packetToSend = PrrtPacket_copy(packet);
99
        send_packet(sock_ptr, packetToSend);
100

101
        PrrtBlock_insert_data_packet(block, packet);
102

103
        // TODO: redundancy should only be sent when necessary
104
        if(PrrtBlock_encode_ready(block)) {
Andreas Schmidt's avatar
Andreas Schmidt committed
105
            uint32_t j = 0;
106
			PrrtTimeStampCycle(sock_ptr, ts_data_packet, packetToSend->sequenceNumber, PrrtEncodeStart);
107
            PrrtBlock_encode(block, &sock_ptr->sequenceNumberRedundancy);
108
			PrrtTimeStampCycle(sock_ptr, ts_data_packet, packetToSend->sequenceNumber, PrrtEncodeEnd);
109

Andreas Schmidt's avatar
Andreas Schmidt committed
110
111
            uint32_t redundancyPackets = List_count(block->redundancyPackets);
            for(j = 0; j < redundancyPackets; j++) {
112
                PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
113
                send_packet(sock_ptr, red_pkt);
114
            }
115

Andreas Schmidt's avatar
Andreas Schmidt committed
116
            PrrtBlock_destroy(block);
117
            block = NULL;
118
        }
119

120
        check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed.");
121
    }
122
123

    error:
124
125
    PERROR("Failed to send packet %s", "");
    return NULL;
126
}