dataReceiver.c 8.1 KB
Newer Older
1
#include <netdb.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
2
#include <stdio.h>
3
#include <arpa/inet.h>
4
5
6
7
#include "../../defines.h"
#include "../../util/dbg.h"
#include "../../util/common.h"
#include "../block.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
8
9
#include "../clock.h"
#include "../socket.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
10
#include "dataReceiver.h"
11

12
void retrieve_data_blocks(PrrtSocket *sock_ptr, prrtSequenceNumber_t base_seqno, uint8_t k, const PrrtBlock *block)
13
{
14
15
    List *res = List_create();
    BPTree_get_range(sock_ptr->dataStore, res, base_seqno, base_seqno + k - 1);
16

17
18
    LIST_FOREACH(res, first, next, cur) {
        PrrtPacket *packetPtr = cur->value;
19
        check(PrrtBlock_insert_data_packet((PrrtBlock *) block, packetPtr), "Insert failed!")
20
        sock_ptr->dataStore = BPTree_delete(sock_ptr->dataStore, packetPtr->sequenceNumber);
21
    }
22
    error:
23
    List_destroy(res);
24
25
}

26
void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block, prrtSequenceNumber_t base_seqno)
27
28
{
    if(block != NULL && PrrtBlock_decode_ready(block)) {
29
        check(PrrtBlock_decode(block), "Decoding failed");
30

31
        while(List_count(block->dataPackets) > 0) {
32
            PrrtPacket *pkt = List_shift(block->dataPackets);
33
            if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, pkt->sequenceNumber)) {
34
                check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
35
                List_push(sock_ptr->inQueue, pkt);
36
37
                check(pthread_cond_signal(&sock_ptr->inQueueFilledCv) == 0, "Signal failed.");
                check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
38
39
            } else {
                PrrtPacket_destroy(pkt);
40
41
            }
        }
42

Andreas Schmidt's avatar
Andreas Schmidt committed
43
        PrrtBlock_destroy(block);
44
45

        sock_ptr->blockStore = BPTree_delete(sock_ptr->blockStore, base_seqno);
46
    }
Andreas Schmidt's avatar
Andreas Schmidt committed
47
48

    return;
49
50
    error:
    PERROR("Decoding failed.%s", "")
51
}
52

53
bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote)
54
{
55
56
57
    uint16_t remote_port = ntohs(remote.sin_port);
    char *remote_host = inet_ntoa(remote.sin_addr);

58
59
60
61
62
63
64
65
66
    struct sockaddr_in targetaddr;
    memset((char *) &targetaddr, 0, sizeof(targetaddr));
    targetaddr.sin_family = AF_INET;
    targetaddr.sin_port = htons((uint16_t) (remote_port + 1));

    struct hostent *hp;
    hp = gethostbyname(remote_host);
    memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);

67
68
69
70
71
72
    prrtTimestamp_t forwardTripTime = (uint32_t) (((int64_t) PrrtClock_get_current_time_us()) +
                                                  (sock_ptr->lastSentTimestamp - sock_ptr->lastReceivedTimestamp));

    PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 0, 4, 6, 8, 9, 5,
                                                                     sock_ptr->address->sin_addr.s_addr,
                                                                     forwardTripTime);
73
    prrtPacketLength_t length = PrrtPacket_size(feedback_pkt_ptr);
74
    void *buf = calloc(1, length);
75
    check_mem(buf);
76

77
    check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");
78

79
80
    check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
          length, "Sending feedback failed.");
81
    free(buf);
82

83
84
    PrrtPacket_destroy(feedback_pkt_ptr);

85
    return true;
86
87

    error:
88
89
    if(buf != NULL) { free(buf); }
    if(feedback_pkt_ptr != NULL) { PrrtPacket_destroy(feedback_pkt_ptr); }
90
    return false;
91
92
}

93
94
void *receive_data_loop(void *ptr)
{
95
96
97
98
99
100
    ssize_t n;
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);
    unsigned char buffer[MAX_PAYLOAD_LENGTH];
    PrrtSocket *sock_ptr = ptr;

101
    while(1) {
102
103
        memset(buffer, 0, MAX_PAYLOAD_LENGTH);
        n = recvfrom(sock_ptr->dataSocketFd, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen);
104
        sock_ptr->lastReceivedTimestamp = PrrtClock_get_current_time_us();
105

106
        PrrtPacket *packet = (PrrtPacket *) calloc(1, sizeof(PrrtPacket));
107
        check_mem(packet);
108
        check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed.");
109

110
111
112
        prrtPacketType_t packetType = PrrtPacket_type(packet);
        if(packetType == PACKET_TYPE_DATA) {
            PrrtPacketDataPayload *payload = packet->payload;
Andreas Schmidt's avatar
Andreas Schmidt committed
113

114
115
116
            prrtTimestamp_t dataTimestamp = payload->timestamp;
            sock_ptr->lastSentTimestamp = dataTimestamp;
            PrrtClock_update(sock_ptr->clock, dataTimestamp, payload->groupRTT_us);
117

118
            check(send_feedback(sock_ptr, remote), "Sending feedback failed.");
119

120
121
122
123
124
125
126
            prrtTimestamp_t now = PrrtClock_get_prrt_time_us(sock_ptr->clock);
            if(now > payload->packetTimeout_us) {
                debug("LOSS due to %u > %u", now, payload->packetTimeout_us);
                // TODO: note this as loss
                PrrtPacket_destroy(packet);
            }
            else if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable,
127
                                                                  packet->sequenceNumber) ==
Andreas Schmidt's avatar
Andreas Schmidt committed
128
129
130
               false) {
                PrrtPacket_destroy(packet);
            } else {
131
                prrtSequenceNumber_t baseSequenceNumber = packet->sequenceNumber - packet->index;
132

Andreas Schmidt's avatar
Andreas Schmidt committed
133
                PrrtPacket *reference = PrrtPacket_copy(packet);
134

Andreas Schmidt's avatar
Andreas Schmidt committed
135
136
137
138
139
140
                PrrtBlock *block = BPTree_get(sock_ptr->blockStore, baseSequenceNumber);
                if(block != NULL) {
                    check(PrrtBlock_insert_data_packet(block, reference), "Inserting failed");
                    decode_block(sock_ptr, block, baseSequenceNumber);
                } else {
                    // Check for duplicate data packet.
141
142
                    if(BPTree_get(sock_ptr->dataStore, packet->sequenceNumber) == NULL) {
                        sock_ptr->dataStore = BPTree_insert(sock_ptr->dataStore, packet->sequenceNumber, reference);
143
                    } else {
Andreas Schmidt's avatar
Andreas Schmidt committed
144
                        PrrtPacket_destroy(reference);
145
                    }
Andreas Schmidt's avatar
Andreas Schmidt committed
146
                }
147

Andreas Schmidt's avatar
Andreas Schmidt committed
148
                // forward to application layer
149

150
                check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
Andreas Schmidt's avatar
Andreas Schmidt committed
151
                List_push(sock_ptr->inQueue, packet);
152
153
                check(pthread_cond_signal(&sock_ptr->inQueueFilledCv) == 0, "Cond signal failed.");
                check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
Andreas Schmidt's avatar
Andreas Schmidt committed
154
            }
155
156
        } else if(packetType == PACKET_TYPE_REDUNDANCY) {
            PrrtPacketRedundancyPayload *redundancyPayload = packet->payload;
157

Andreas Schmidt's avatar
Andreas Schmidt committed
158
            if(!PrrtForwardPacketTable_test_is_block_relevant(sock_ptr->forwardPacketTable,
159
                                                              redundancyPayload->baseSequenceNumber,
Andreas Schmidt's avatar
Andreas Schmidt committed
160
161
162
                                                              redundancyPayload->n)) {
                PrrtPacket_destroy(packet);
            } else {
163
                PrrtBlock *block = BPTree_get(sock_ptr->blockStore, redundancyPayload->baseSequenceNumber);
Andreas Schmidt's avatar
Andreas Schmidt committed
164
                if(block == NULL) {
165
166
                    // TODO: PROPER CREATION
                    PrrtCodingParams *cpar = PrrtCodingParams_create();
Andreas Schmidt's avatar
Andreas Schmidt committed
167
168
169
                    cpar->k = redundancyPayload->k;
                    cpar->n = redundancyPayload->n;

170
                    block = PrrtBlock_create(cpar, redundancyPayload->baseSequenceNumber);
171
172
                    PrrtCodingParams_destroy(cpar);

173
                    sock_ptr->blockStore = BPTree_insert(sock_ptr->blockStore, redundancyPayload->baseSequenceNumber,
Andreas Schmidt's avatar
Andreas Schmidt committed
174
175
                                                         block);
                }
176

177
                retrieve_data_blocks(sock_ptr, redundancyPayload->baseSequenceNumber, block->codingParams.k, block);
178

Andreas Schmidt's avatar
Andreas Schmidt committed
179
                if(PrrtBlock_insert_redundancy_packet(block, packet)) {
180
                    decode_block(sock_ptr, block, redundancyPayload->baseSequenceNumber);
Andreas Schmidt's avatar
Andreas Schmidt committed
181
182
                } else {
                    PrrtPacket_destroy(packet);
183
                }
Andreas Schmidt's avatar
Andreas Schmidt committed
184
185
186
187
            }
        } else {
            PrrtPacket_print(packet);
            PrrtPacket_destroy(packet);
188
189
190
        }
    }

191
192
    // TODO: occasionally clean up dataStore and blockStore !!!

193
    error:
194
    PNOTIMPLEMENTED("SHOULD IMPLEMENT ERROR HANDLER HERE");
Andreas Schmidt's avatar
Andreas Schmidt committed
195
}