dataReceiver.c 8.06 KB
Newer Older
1
#include <netdb.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
2
#include <stdio.h>
3
#include <arpa/inet.h>
4
5
6
7
#include "../../defines.h"
#include "../../util/dbg.h"
#include "../../util/common.h"
#include "../block.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
8
9
#include "../clock.h"
#include "../socket.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
10
#include "dataReceiver.h"
11

12
/**
 * Move buffered data packets from the socket's dataStore into a block.
 *
 * Queries sock_ptr->dataStore for the key range base_seqno .. base_seqno + k - 1
 * (range bounds are interpreted by BPTree_get_range), inserts every returned
 * packet into `block` and removes it from the dataStore. Stops at the first
 * failed insertion; packets not yet processed stay in the dataStore.
 *
 * @param sock_ptr   socket whose dataStore is searched and updated
 * @param base_seqno first sequence number of the block
 * @param k          number of sequence numbers covered by the query
 * @param block      destination block (modified although declared const)
 */
void retrieve_data_blocks(PrrtSocket *sock_ptr, prrtSequenceNumber_t base_seqno, uint8_t k, const PrrtBlock *block)
{
    /* The interface takes a const pointer, but inserting mutates the block. */
    PrrtBlock *target = (PrrtBlock *) block;
    List *matches = List_create();

    BPTree_get_range(sock_ptr->dataStore, matches, base_seqno, base_seqno + k - 1);

    LIST_FOREACH(matches, first, next, node) {
        PrrtPacket *stored = node->value;

        check(PrrtBlock_insert_data_packet(target, stored), "Insert failed!");

        /* BPTree_delete returns the tree to keep using, so store it back. */
        sock_ptr->dataStore = BPTree_delete(sock_ptr->dataStore, stored->sequenceNumber);
    }

    /* Success and failure share the same cleanup: only the result list is released. */
    error:
    List_destroy(matches);
}

26
/**
 * Decode a block if it is ready and deliver its data packets.
 *
 * Does nothing when `block` is NULL or PrrtBlock_decode_ready() reports false.
 * Otherwise the block is decoded and its dataPackets list is drained: packets
 * whose sequence number the forward packet table reports as relevant are
 * pushed onto socket->inQueue (signalling inQueueFilledCv under
 * inQueueFilledMutex), all others are destroyed. Finally the block is removed
 * from the repair block store and destroyed.
 *
 * On a decoding or locking error the function logs and returns; in that case
 * the block is neither removed from the store nor destroyed.
 *
 * @param socket socket owning the forward packet table, inQueue and repair block store
 * @param block  block to decode; may be NULL
 */
void decode_block(PrrtSocket *socket, PrrtBlock *block)
{
    if(block != NULL && PrrtBlock_decode_ready(block)) {
        check(PrrtBlock_decode(block), "Decoding failed");

        while(List_count(block->dataPackets) > 0) {
            PrrtPacket *pkt = List_shift(block->dataPackets);

            if(PrrtForwardPacketTable_test_set_is_number_relevant(socket->forwardPacketTable, pkt->sequenceNumber)) {
                int lockResult = pthread_mutex_lock(&socket->inQueueFilledMutex);
                if(lockResult != 0) {
                    /* The packet was already taken off the block's list and
                     * cannot be queued, so release it before bailing out. */
                    PrrtPacket_destroy(pkt);
                }
                check(lockResult == 0, "Lock failed.");

                List_push(socket->inQueue, pkt);

                /* Always attempt the unlock, even if signalling failed, so the
                 * mutex is not left locked when we jump to the error label. */
                int signalResult = pthread_cond_signal(&socket->inQueueFilledCv);
                int unlockResult = pthread_mutex_unlock(&socket->inQueueFilledMutex);
                check(signalResult == 0, "Signal failed.");
                check(unlockResult == 0, "Unlock failed.");
            } else {
                PrrtPacket_destroy(pkt);
            }
        }

        /* Remove the store entry while the block is still alive: the key is
         * read from the block, so destroying it first is a use-after-free. */
        PrrtRepairBlockStore_delete(socket->repairBlockStore, block->baseSequenceNumber);
        PrrtBlock_destroy(block);
    }

    return;

    error:
    PERROR("Decoding failed.%s", "");
}
52

53
bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote)
54
{
55
56
57
    uint16_t remote_port = ntohs(remote.sin_port);
    char *remote_host = inet_ntoa(remote.sin_addr);

58
59
60
61
62
63
64
65
66
    struct sockaddr_in targetaddr;
    memset((char *) &targetaddr, 0, sizeof(targetaddr));
    targetaddr.sin_family = AF_INET;
    targetaddr.sin_port = htons((uint16_t) (remote_port + 1));

    struct hostent *hp;
    hp = gethostbyname(remote_host);
    memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);

67
68
69
70
71
72
    prrtTimestamp_t forwardTripTime = (uint32_t) (((int64_t) PrrtClock_get_current_time_us()) +
                                                  (sock_ptr->lastSentTimestamp - sock_ptr->lastReceivedTimestamp));

    PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 0, 4, 6, 8, 9, 5,
                                                                     sock_ptr->address->sin_addr.s_addr,
                                                                     forwardTripTime);
73
    prrtPacketLength_t length = PrrtPacket_size(feedback_pkt_ptr);
74
    void *buf = calloc(1, length);
75
    check_mem(buf);
76

77
    check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");
78

79
80
    check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
          length, "Sending feedback failed.");
81
    free(buf);
82

83
84
    PrrtPacket_destroy(feedback_pkt_ptr);

85
    return true;
86
87

    error:
88
89
    if(buf != NULL) { free(buf); }
    if(feedback_pkt_ptr != NULL) { PrrtPacket_destroy(feedback_pkt_ptr); }
90
    return false;
91
92
}

93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
/**
 * Process a received data packet.
 *
 * Updates the socket's lastSentTimestamp and clock from the payload, stores a
 * copy of the packet in the packet timeout table and sends feedback to
 * `remote`. The packet is then dropped if its timeout has already passed or
 * the forward packet table reports its sequence number as not relevant.
 * Otherwise a second copy is inserted into the matching repair block (which is
 * then offered to decode_block()) or, if no block exists yet, into the
 * dataStore, and the original packet is pushed onto the socket's inQueue.
 *
 * This function takes ownership of `packet`: on every path it is either
 * destroyed or handed to the inQueue.
 *
 * @param sock_ptr receiving socket
 * @param packet   decoded data packet; must not be used by the caller afterwards
 * @param remote   address the packet was received from
 */
void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet, struct sockaddr_in remote)
{
    /* Copy destined for the repair block / dataStore. Reset to NULL once
     * ownership has been handed over so the error path never double-frees. */
    PrrtPacket *reference = NULL;

    PrrtPacketDataPayload *payload = packet->payload;

    prrtTimestamp_t dataTimestamp = payload->timestamp;
    sock_ptr->lastSentTimestamp = dataTimestamp;
    PrrtClock_update(sock_ptr->clock, dataTimestamp, payload->groupRTT_us);

    PrrtPacket *copiedPacket = PrrtPacket_copy(packet);
    check_mem(copiedPacket);
    PrrtPacketTimeoutTable_insert(sock_ptr->packetTimeoutTable, copiedPacket);

    check(send_feedback(sock_ptr, remote), "Sending feedback failed.");

    prrtTimestamp_t now = PrrtClock_get_prrt_time_us(sock_ptr->clock);
    if(now > payload->packetTimeout_us) {
        // TODO: note this as loss
        PrrtPacket_destroy(packet);
    }
    else if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable,
                                                               packet->sequenceNumber) ==
            false) {
        PrrtPacket_destroy(packet);
    } else {
        prrtSequenceNumber_t baseSequenceNumber = packet->sequenceNumber - packet->index;

        reference = PrrtPacket_copy(packet);
        check_mem(reference);

        PrrtBlock *block = PrrtRepairBlockStore_get_block(sock_ptr->repairBlockStore, baseSequenceNumber);
        if(block != NULL) {
            check(PrrtBlock_insert_data_packet(block, reference), "Inserting failed");
            reference = NULL; /* now held by the block */
            decode_block(sock_ptr, block);
        } else {
            // Check for duplicate data packet.
            if(BPTree_get(sock_ptr->dataStore, packet->sequenceNumber) == NULL) {
                sock_ptr->dataStore = BPTree_insert(sock_ptr->dataStore, packet->sequenceNumber, reference);
            } else {
                PrrtPacket_destroy(reference);
            }
            reference = NULL; /* stored or destroyed above */
        }

        // forward to application layer

        check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
        List_push(sock_ptr->inQueue, packet);
        packet = NULL; /* now held by the inQueue */

        /* Always attempt the unlock, even if signalling failed, so the mutex
         * is not left locked when we jump to the error label. */
        int signalResult = pthread_cond_signal(&sock_ptr->inQueueFilledCv);
        int unlockResult = pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
        check(signalResult == 0, "Cond signal failed.");
        check(unlockResult == 0, "Unlock failed.");
    }
    return;

    error:
    PERROR("Handling data packet failed%s.", "");
    /* Release whatever this function still owns. */
    if(reference != NULL) { PrrtPacket_destroy(reference); }
    if(packet != NULL) { PrrtPacket_destroy(packet); }
    return;
}

/**
 * Process a received redundancy packet.
 *
 * The packet is destroyed immediately if the forward packet table reports its
 * block (baseSequenceNumber, n) as not relevant. Otherwise the repair block
 * for its base sequence number is looked up — and created and stored if it
 * does not exist yet — buffered data packets are moved into it via
 * retrieve_data_blocks(), and the redundancy packet is inserted. After a
 * successful insertion the block is offered to decode_block(); if insertion
 * fails the packet is destroyed.
 *
 * @param socket receiving socket
 * @param packet decoded redundancy packet; consumed by this function
 */
void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet)
{
    PrrtPacketRedundancyPayload *redundancy = packet->payload;

    if(!PrrtForwardPacketTable_test_is_block_relevant(socket->forwardPacketTable,
                                                      redundancy->baseSequenceNumber,
                                                      redundancy->n)) {
        PrrtPacket_destroy(packet);
        return;
    }

    PrrtBlock *block = PrrtRepairBlockStore_get_block(socket->repairBlockStore,
                                                      redundancy->baseSequenceNumber);
    if(block == NULL) {
        // TODO: PROPER CREATION
        PrrtCodingParams *params = PrrtCodingParams_create();
        params->k = redundancy->k;
        params->n = redundancy->n;

        block = PrrtBlock_create(params, redundancy->baseSequenceNumber);
        PrrtCodingParams_destroy(params);

        PrrtRepairBlockStore_insert(socket->repairBlockStore, redundancy->baseSequenceNumber, block);
    }

    /* Pull data packets that arrived before this redundancy packet into the block. */
    retrieve_data_blocks(socket, redundancy->baseSequenceNumber, block->codingParams.k, block);

    if(!PrrtBlock_insert_redundancy_packet(block, packet)) {
        PrrtPacket_destroy(packet);
        return;
    }

    decode_block(socket, block);
}

182
183
void *receive_data_loop(void *ptr)
{
184
185
186
187
188
189
    ssize_t n;
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);
    unsigned char buffer[MAX_PAYLOAD_LENGTH];
    PrrtSocket *sock_ptr = ptr;

190
    while(1) {
191
192
        memset(buffer, 0, MAX_PAYLOAD_LENGTH);
        n = recvfrom(sock_ptr->dataSocketFd, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen);
193
        sock_ptr->lastReceivedTimestamp = PrrtClock_get_current_time_us();
194

195
        PrrtPacket *packet = (PrrtPacket *) calloc(1, sizeof(PrrtPacket));
196
        check_mem(packet);
197
        check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed.");
198

199
200
        prrtPacketType_t packetType = PrrtPacket_type(packet);
        if(packetType == PACKET_TYPE_DATA) {
201
            handle_data_packet(sock_ptr, packet, remote);
202
        } else if(packetType == PACKET_TYPE_REDUNDANCY) {
203
            handle_redundancy_packet(sock_ptr, packet);
Andreas Schmidt's avatar
Andreas Schmidt committed
204
205
206
        } else {
            PrrtPacket_print(packet);
            PrrtPacket_destroy(packet);
207
208
209
        }
    }

210
    // TODO: occasionally clean up dataStore !!!
211

212
    error:
213
    PNOTIMPLEMENTED("SHOULD IMPLEMENT ERROR HANDLER HERE");
Andreas Schmidt's avatar
Andreas Schmidt committed
214
}