dataReceiver.c 7.84 KB
Newer Older
1
#include <netdb.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
2
#include <stdio.h>
3
#include <arpa/inet.h>
4
5
6
7
#include "../../defines.h"
#include "../../util/dbg.h"
#include "../../util/common.h"
#include "../block.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
8
9
#include "../clock.h"
#include "../socket.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
10
#include "dataReceiver.h"
11

12
/**
 * Feed already-buffered data packets into a coding block.
 *
 * Queries sock_ptr->dataStore via BPTree_get_range() with the bounds
 * base_seqno and base_seqno + k - 1, inserts every returned packet into
 * `block` and removes the packet's sequence number from the data store.
 * Processing stops at the first packet that cannot be inserted.
 *
 * @param sock_ptr   Socket whose dataStore is searched and updated.
 * @param base_seqno First sequence number handed to the range query.
 * @param k          Number of sequence numbers the range spans.
 * @param block      Block receiving the packets. Declared const in the
 *                   interface, but it is modified through a cast.
 */
void retrieve_data_blocks(PrrtSocket *sock_ptr, prrtSequenceNumber_t base_seqno, uint8_t k, const PrrtBlock *block)
{
    /* The signature promises const, yet inserting packets mutates the block. */
    PrrtBlock *target = (PrrtBlock *) block;

    List *matches = List_create();
    BPTree_get_range(sock_ptr->dataStore, matches, base_seqno, base_seqno + k - 1);

    LIST_FOREACH(matches, first, next, node) {
        PrrtPacket *stored = node->value;

        check(PrrtBlock_insert_data_packet(target, stored), "Insert failed!");

        /* The packet now lives in the block, so drop its key from the store. */
        sock_ptr->dataStore = BPTree_delete(sock_ptr->dataStore, stored->sequenceNumber);
    }

    /* Reached on success (fall-through) as well as on a failed insert. */
    error:
    List_destroy(matches);
}

26
/**
 * Decode a block once it is ready and hand its data packets to the receiver queue.
 *
 * Does nothing when `block` is NULL or PrrtBlock_decode_ready() reports false.
 * Otherwise the block is decoded and each packet in block->dataPackets is either
 * pushed onto sock_ptr->inQueue (when the forward packet table reports its
 * sequence number as relevant) or destroyed. Afterwards the block is destroyed
 * and base_seqno is removed from sock_ptr->blockStore.
 *
 * @param sock_ptr   Socket owning inQueue, forwardPacketTable and blockStore.
 * @param block      Block to decode; may be NULL.
 * @param base_seqno Key under which the block is stored in blockStore.
 */
void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block, prrtSequenceNumber_t base_seqno)
{
    if(block == NULL || !PrrtBlock_decode_ready(block)) {
        return;
    }

    check(PrrtBlock_decode(block), "Decoding failed");

    while(List_count(block->dataPackets) > 0) {
        PrrtPacket *pkt = List_shift(block->dataPackets);

        if(!PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, pkt->sequenceNumber)) {
            PrrtPacket_destroy(pkt);
            continue;
        }

        int lockResult = pthread_mutex_lock(&sock_ptr->inQueueFilledMutex);
        if(lockResult != 0) {
            /* The packet left the block but never reached the queue: release it here. */
            PrrtPacket_destroy(pkt);
        }
        check(lockResult == 0, "Lock failed.");

        List_push(sock_ptr->inQueue, pkt);

        /* Unlock unconditionally before evaluating the results, so a failed
         * signal cannot leave the mutex held on the error path. */
        int signalResult = pthread_cond_signal(&sock_ptr->inQueueFilledCv);
        int unlockResult = pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
        check(signalResult == 0, "Signal failed.");
        check(unlockResult == 0, "Unlock failed.");
    }

    PrrtBlock_destroy(block);

    sock_ptr->blockStore = BPTree_delete(sock_ptr->blockStore, base_seqno);

    return;

    error:
    PERROR("Decoding failed.%s", "");
}
52

53
bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote)
54
{
55
56
57
    uint16_t remote_port = ntohs(remote.sin_port);
    char *remote_host = inet_ntoa(remote.sin_addr);

58
59
60
61
62
63
64
65
66
    struct sockaddr_in targetaddr;
    memset((char *) &targetaddr, 0, sizeof(targetaddr));
    targetaddr.sin_family = AF_INET;
    targetaddr.sin_port = htons((uint16_t) (remote_port + 1));

    struct hostent *hp;
    hp = gethostbyname(remote_host);
    memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);

67
68
69
    PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 3, 4, 6, 8, 9, 5,
                                                                     sock_ptr->address->sin_addr.s_addr);
    prrtPacketLength_t length = PrrtPacket_size(feedback_pkt_ptr);
70
    void *buf = calloc(1, length);
71
    check_mem(buf);
72

73
    check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");
74

75
76
    prrtTimestamp_t forwardTripTime = htonl(PrrtClock_get_current_time() + sock_ptr->lastSentTimestamp - sock_ptr->lastReceivedTimestamp);
    ((PrrtPacketFeedbackPayload*) (buf + PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH))->forwardTripTimestamp_us = forwardTripTime;
77
    // TODO: [LATENCY] By knowing the time for htonl and writing bytes, one could compensate the timestamp.
78
79
    check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
          length, "Sending feedback failed.");
80
    free(buf);
81

82
83
    PrrtPacket_destroy(feedback_pkt_ptr);

84
    return true;
85
86

    error:
87
88
    if(buf != NULL) { free(buf); }
    if(feedback_pkt_ptr != NULL) { PrrtPacket_destroy(feedback_pkt_ptr); }
89
    return false;
90
91
}

92
93
void *receive_data_loop(void *ptr)
{
94
95
96
97
98
99
    ssize_t n;
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);
    unsigned char buffer[MAX_PAYLOAD_LENGTH];
    PrrtSocket *sock_ptr = ptr;

100
    while(1) {
101
102
        memset(buffer, 0, MAX_PAYLOAD_LENGTH);
        n = recvfrom(sock_ptr->dataSocketFd, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen);
103
        sock_ptr->lastReceivedTimestamp = PrrtClock_get_current_time();
104

105
        PrrtPacket *packet = (PrrtPacket *) calloc(1, sizeof(PrrtPacket));
106
        check_mem(packet);
107
        check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed.");
108

Andreas Schmidt's avatar
Andreas Schmidt committed
109
110
        // TODO: make something useful with RTT approximation
        PrrtPacketDataPayload* payload = packet->payload;
111
        debug("RTT: %d", payload->groupRTT_us);
Andreas Schmidt's avatar
Andreas Schmidt committed
112

113
114
115
116
        sock_ptr->lastSentTimestamp = PrrtPacket_get_data_timestamp(packet);

        check(send_feedback(sock_ptr, remote), "Sending feedback failed.");

117
        prrtPacketType_t packetType = PrrtPacket_type(packet);
Andreas Schmidt's avatar
Andreas Schmidt committed
118
119
        if(packetType == PACKET_TYPE_DATA) {
            // TODO: packet.timestamp + packet.timeout < now: break
120

121
            if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, packet->sequenceNumber) ==
Andreas Schmidt's avatar
Andreas Schmidt committed
122
123
124
               false) {
                PrrtPacket_destroy(packet);
            } else {
125
                prrtSequenceNumber_t baseSequenceNumber = packet->sequenceNumber - packet->index;
126

Andreas Schmidt's avatar
Andreas Schmidt committed
127
                PrrtPacket *reference = PrrtPacket_copy(packet);
128

Andreas Schmidt's avatar
Andreas Schmidt committed
129
130
131
132
133
134
                PrrtBlock *block = BPTree_get(sock_ptr->blockStore, baseSequenceNumber);
                if(block != NULL) {
                    check(PrrtBlock_insert_data_packet(block, reference), "Inserting failed");
                    decode_block(sock_ptr, block, baseSequenceNumber);
                } else {
                    // Check for duplicate data packet.
135
136
                    if(BPTree_get(sock_ptr->dataStore, packet->sequenceNumber) == NULL) {
                        sock_ptr->dataStore = BPTree_insert(sock_ptr->dataStore, packet->sequenceNumber, reference);
137
                    } else {
Andreas Schmidt's avatar
Andreas Schmidt committed
138
                        PrrtPacket_destroy(reference);
139
                    }
Andreas Schmidt's avatar
Andreas Schmidt committed
140
                }
141

Andreas Schmidt's avatar
Andreas Schmidt committed
142
                // forward to application layer
143

144
                check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
Andreas Schmidt's avatar
Andreas Schmidt committed
145
                List_push(sock_ptr->inQueue, packet);
146
147
                check(pthread_cond_signal(&sock_ptr->inQueueFilledCv) == 0, "Cond signal failed.");
                check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
Andreas Schmidt's avatar
Andreas Schmidt committed
148
            }
149
150
        } else if(packetType == PACKET_TYPE_REDUNDANCY) {
            PrrtPacketRedundancyPayload *redundancyPayload = packet->payload;
151

Andreas Schmidt's avatar
Andreas Schmidt committed
152
            if(!PrrtForwardPacketTable_test_is_block_relevant(sock_ptr->forwardPacketTable,
153
                                                              redundancyPayload->baseSequenceNumber,
Andreas Schmidt's avatar
Andreas Schmidt committed
154
155
156
                                                              redundancyPayload->n)) {
                PrrtPacket_destroy(packet);
            } else {
157
                PrrtBlock *block = BPTree_get(sock_ptr->blockStore, redundancyPayload->baseSequenceNumber);
Andreas Schmidt's avatar
Andreas Schmidt committed
158
                if(block == NULL) {
159
160
                    // TODO: PROPER CREATION
                    PrrtCodingParams *cpar = PrrtCodingParams_create();
Andreas Schmidt's avatar
Andreas Schmidt committed
161
162
163
                    cpar->k = redundancyPayload->k;
                    cpar->n = redundancyPayload->n;

164
                    block = PrrtBlock_create(cpar, redundancyPayload->baseSequenceNumber);
165
166
                    PrrtCodingParams_destroy(cpar);

167
                    sock_ptr->blockStore = BPTree_insert(sock_ptr->blockStore, redundancyPayload->baseSequenceNumber,
Andreas Schmidt's avatar
Andreas Schmidt committed
168
169
                                                         block);
                }
170

171
                retrieve_data_blocks(sock_ptr, redundancyPayload->baseSequenceNumber, block->codingParams.k, block);
172

Andreas Schmidt's avatar
Andreas Schmidt committed
173
                if(PrrtBlock_insert_redundancy_packet(block, packet)) {
174
                    decode_block(sock_ptr, block, redundancyPayload->baseSequenceNumber);
Andreas Schmidt's avatar
Andreas Schmidt committed
175
176
                } else {
                    PrrtPacket_destroy(packet);
177
                }
Andreas Schmidt's avatar
Andreas Schmidt committed
178
179
180
181
            }
        } else {
            PrrtPacket_print(packet);
            PrrtPacket_destroy(packet);
182
183
184
        }
    }

185
186
    // TODO: occasionally clean up dataStore and blockStore !!!

187
    error:
188
    PNOTIMPLEMENTED("SHOULD IMPLEMENT ERROR HANDLER HERE");
Andreas Schmidt's avatar
Andreas Schmidt committed
189
}