dataReceiver.c 7.8 KB
Newer Older
1
#include <netdb.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
2
#include <stdio.h>
3
#include <arpa/inet.h>
4
5
6
7
#include "../../defines.h"
#include "../../util/dbg.h"
#include "../../util/common.h"
#include "../block.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
8
9
#include "../clock.h"
#include "../socket.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
10
#include "dataReceiver.h"
11

12
void retrieve_data_blocks(PrrtSocket *sock_ptr, prrtSequenceNumber_t base_seqno, uint8_t k, const PrrtBlock *block)
13
{
14
    List *res = List_create();
15
16

    PrrtDataPacketStore_remove_range(sock_ptr->dataPacketStore, res, base_seqno, (prrtSequenceNumber_t) (base_seqno + k - 1));
17

18
19
    LIST_FOREACH(res, first, next, cur) {
        PrrtPacket *packetPtr = cur->value;
20
        check(PrrtBlock_insert_data_packet((PrrtBlock *) block, packetPtr), "Insert failed!")
21
    }
22

23
    error:
24
    List_destroy(res);
25
26
}

27
void decode_block(PrrtSocket *socket, PrrtBlock *block)
28
29
{
    if(block != NULL && PrrtBlock_decode_ready(block)) {
30
        check(PrrtBlock_decode(block), "Decoding failed");
31

32
        while(List_count(block->dataPackets) > 0) {
33
            PrrtPacket *pkt = List_shift(block->dataPackets);
34
35
36
37
38
            if(PrrtForwardPacketTable_test_set_is_number_relevant(socket->forwardPacketTable, pkt->sequenceNumber)) {
                check(pthread_mutex_lock(&socket->inQueueFilledMutex) == 0, "Lock failed.");
                List_push(socket->inQueue, pkt);
                check(pthread_cond_signal(&socket->inQueueFilledCv) == 0, "Signal failed.");
                check(pthread_mutex_unlock(&socket->inQueueFilledMutex) == 0, "Unlock failed.");
39
40
            } else {
                PrrtPacket_destroy(pkt);
41
42
            }
        }
43

Andreas Schmidt's avatar
Andreas Schmidt committed
44
        PrrtBlock_destroy(block);
45

46
        PrrtRepairBlockStore_delete(socket->repairBlockStore, block->baseSequenceNumber);
47
    }
Andreas Schmidt's avatar
Andreas Schmidt committed
48
49

    return;
50
51
    error:
    PERROR("Decoding failed.%s", "")
52
}
53

54
bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote)
55
{
56
57
58
    uint16_t remote_port = ntohs(remote.sin_port);
    char *remote_host = inet_ntoa(remote.sin_addr);

59
60
61
62
63
64
65
66
67
    struct sockaddr_in targetaddr;
    memset((char *) &targetaddr, 0, sizeof(targetaddr));
    targetaddr.sin_family = AF_INET;
    targetaddr.sin_port = htons((uint16_t) (remote_port + 1));

    struct hostent *hp;
    hp = gethostbyname(remote_host);
    memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);

68
69
70
71
72
73
    prrtTimestamp_t forwardTripTime = (uint32_t) (((int64_t) PrrtClock_get_current_time_us()) +
                                                  (sock_ptr->lastSentTimestamp - sock_ptr->lastReceivedTimestamp));

    PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 0, 4, 6, 8, 9, 5,
                                                                     sock_ptr->address->sin_addr.s_addr,
                                                                     forwardTripTime);
74
    prrtPacketLength_t length = PrrtPacket_size(feedback_pkt_ptr);
75
    void *buf = calloc(1, length);
76
    check_mem(buf);
77

78
    check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");
79

80
81
    check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
          length, "Sending feedback failed.");
82
    free(buf);
83

84
85
    PrrtPacket_destroy(feedback_pkt_ptr);

86
    return true;
87
88

    error:
89
90
    if(buf != NULL) { free(buf); }
    if(feedback_pkt_ptr != NULL) { PrrtPacket_destroy(feedback_pkt_ptr); }
91
    return false;
92
93
}

94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet, struct sockaddr_in remote)
{
    PrrtPacketDataPayload *payload = packet->payload;

    prrtTimestamp_t dataTimestamp = payload->timestamp;
    sock_ptr->lastSentTimestamp = dataTimestamp;
    PrrtClock_update(sock_ptr->clock, dataTimestamp, payload->groupRTT_us);

    PrrtPacket *copiedPacket = PrrtPacket_copy(packet);
    PrrtPacketTimeoutTable_insert(sock_ptr->packetTimeoutTable, copiedPacket);

    check(send_feedback(sock_ptr, remote), "Sending feedback failed.");

    prrtTimestamp_t now = PrrtClock_get_prrt_time_us(sock_ptr->clock);
    if(now > payload->packetTimeout_us) {
        // TODO: note this as loss
        PrrtPacket_destroy(packet);
    }
    else if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable,
                                                               packet->sequenceNumber) ==
            false) {
        PrrtPacket_destroy(packet);
    } else {
        prrtSequenceNumber_t baseSequenceNumber = packet->sequenceNumber - packet->index;

        PrrtPacket *reference = PrrtPacket_copy(packet);

        PrrtBlock *block = PrrtRepairBlockStore_get_block(sock_ptr->repairBlockStore, baseSequenceNumber);
        if(block != NULL) {
            check(PrrtBlock_insert_data_packet(block, reference), "Inserting failed");
            decode_block(sock_ptr, block);
        } else {
126
            if(PrrtDataStore_insert(sock_ptr->dataPacketStore, reference) == false) {
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
                PrrtPacket_destroy(reference);
            }
        }

        // forward to application layer

        check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
        List_push(sock_ptr->inQueue, packet);
        check(pthread_cond_signal(&sock_ptr->inQueueFilledCv) == 0, "Cond signal failed.");
        check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
    }
    return;

    error:
    PERROR("Handling data packet failed%s.", "");
    return;
}

void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet)
{
    PrrtPacketRedundancyPayload *redundancyPayload = packet->payload;

    if(!PrrtForwardPacketTable_test_is_block_relevant(socket->forwardPacketTable,
                                                      redundancyPayload->baseSequenceNumber,
                                                      redundancyPayload->n)) {
        PrrtPacket_destroy(packet);
    } else {
        PrrtBlock *block = PrrtRepairBlockStore_get_block(socket->repairBlockStore,
                                                          redundancyPayload->baseSequenceNumber);
        if(block == NULL) {
            // TODO: PROPER CREATION
            PrrtCodingParams *cpar = PrrtCodingParams_create();
            cpar->k = redundancyPayload->k;
            cpar->n = redundancyPayload->n;

            block = PrrtBlock_create(cpar, redundancyPayload->baseSequenceNumber);
            PrrtCodingParams_destroy(cpar);

            PrrtRepairBlockStore_insert(socket->repairBlockStore, redundancyPayload->baseSequenceNumber,
                                        block);
        }

        retrieve_data_blocks(socket, redundancyPayload->baseSequenceNumber, block->codingParams.k, block);

        if(PrrtBlock_insert_redundancy_packet(block, packet)) {
            decode_block(socket, block);
        } else {
            PrrtPacket_destroy(packet);
        }
    }
    return;
}

180
181
void *receive_data_loop(void *ptr)
{
182
183
184
185
186
187
    ssize_t n;
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);
    unsigned char buffer[MAX_PAYLOAD_LENGTH];
    PrrtSocket *sock_ptr = ptr;

188
    while(1) {
189
190
        memset(buffer, 0, MAX_PAYLOAD_LENGTH);
        n = recvfrom(sock_ptr->dataSocketFd, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen);
191
        sock_ptr->lastReceivedTimestamp = PrrtClock_get_current_time_us();
192

193
        PrrtPacket *packet = (PrrtPacket *) calloc(1, sizeof(PrrtPacket));
194
        check_mem(packet);
195
        check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed.");
196

197
198
        prrtPacketType_t packetType = PrrtPacket_type(packet);
        if(packetType == PACKET_TYPE_DATA) {
199
            handle_data_packet(sock_ptr, packet, remote);
200
        } else if(packetType == PACKET_TYPE_REDUNDANCY) {
201
            handle_redundancy_packet(sock_ptr, packet);
Andreas Schmidt's avatar
Andreas Schmidt committed
202
203
204
        } else {
            PrrtPacket_print(packet);
            PrrtPacket_destroy(packet);
205
206
207
208
        }
    }

    error:
209
    PNOTIMPLEMENTED("SHOULD IMPLEMENT ERROR HANDLER HERE");
Andreas Schmidt's avatar
Andreas Schmidt committed
210
}