dataReceiver.c 9.52 KB
Newer Older
1
#include <netdb.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
2
#include <stdio.h>
3
#include <arpa/inet.h>
4
5
6
7
#include "../../defines.h"
#include "../../util/dbg.h"
#include "../../util/common.h"
#include "../block.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
8
9
#include "../clock.h"
#include "../socket.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
10
#include "dataReceiver.h"
11

Andreas Schmidt's avatar
Andreas Schmidt committed
12
13
/**
 * Pull the data packets of one coding block out of the socket's data packet
 * store and insert them into the given block.
 *
 * The store is asked to remove the sequence-number range
 * [base_seqno, base_seqno + k - 1] into a temporary list; each list entry is
 * then inserted into the block. The temporary list is destroyed on every path.
 *
 * @param sock_ptr   socket whose dataPacketStore is drained
 * @param base_seqno first sequence number of the block
 * @param k          number of data packets covered by the block
 * @param block      destination block (declared const in the signature, but
 *                   packets are inserted into it)
 */
static void
retrieve_data_blocks(PrrtSocket *sock_ptr, prrtSequenceNumber_t base_seqno, uint8_t k, const PrrtBlock *block) {
    List *removed = List_create();
    prrtSequenceNumber_t last_seqno = (prrtSequenceNumber_t) (base_seqno + k - 1);
    /* The insert API takes a mutable block, so the const qualifier is dropped here. */
    PrrtBlock *target = (PrrtBlock *) block;

    PrrtDataPacketStore_remove_range(sock_ptr->dataPacketStore, removed, base_seqno, last_seqno);

    LIST_FOREACH(removed, first, next, cur) {
        PrrtPacket *pkt = cur->value;
        check(PrrtBlock_insert_data_packet(target, pkt), "Insert failed!")
    }
    goto cleanup;

    error:
    PERROR("Insert failed.");

    cleanup:
    List_destroy(removed);
}

Andreas Schmidt's avatar
Andreas Schmidt committed
30
31
/**
 * Decode a block once it reports itself ready, hand its data packets on, and
 * dispose of the block.
 *
 * Does nothing when block is NULL or not yet ready to decode. After a
 * successful decode every packet in block->dataPackets is shifted out: packets
 * the forward packet table reports as relevant are pushed onto
 * sock_ptr->inQueue, all others are destroyed. Finally the block is deleted
 * from the repair block store and destroyed.
 *
 * If decoding fails an error is reported and the block is left untouched.
 *
 * @param sock_ptr socket providing the forward table, inQueue and repair store
 * @param block    block to decode; may be NULL
 */
static void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block) {
    if (block == NULL || !PrrtBlock_decode_ready(block)) {
        return;
    }

    check(PrrtBlock_decode(block), "Decoding failed");

    while (List_count(block->dataPackets) > 0) {
        PrrtPacket *decoded = List_shift(block->dataPackets);

        if (!PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable,
                                                                decoded->sequenceNumber)) {
            PrrtPacket_destroy(decoded);
            continue;
        }
        Pipe_push(sock_ptr->inQueue, &decoded->asListNode);
    }

    PrrtRepairBlockStore_delete(sock_ptr->repairBlockStore, block->baseSequenceNumber);
    PrrtBlock_destroy(block);
    return;

    error:
    PERROR("Decoding failed.%s", "")
}
51

Andreas Schmidt's avatar
Andreas Schmidt committed
52
static bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote) {
53
54
55
    uint16_t remote_port = ntohs(remote.sin_port);
    char *remote_host = inet_ntoa(remote.sin_addr);

56
57
58
59
60
61
62
63
64
    struct sockaddr_in targetaddr;
    memset((char *) &targetaddr, 0, sizeof(targetaddr));
    targetaddr.sin_family = AF_INET;
    targetaddr.sin_port = htons((uint16_t) (remote_port + 1));

    struct hostent *hp;
    hp = gethostbyname(remote_host);
    memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);

65
66
67
68
69
70
    prrtTimestamp_t forwardTripTime = (uint32_t) (((int64_t) PrrtClock_get_current_time_us()) +
                                                  (sock_ptr->lastSentTimestamp - sock_ptr->lastReceivedTimestamp));

    PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 0, 4, 6, 8, 9, 5,
                                                                     sock_ptr->address->sin_addr.s_addr,
                                                                     forwardTripTime);
71
    prrtPacketLength_t length = PrrtPacket_size(feedback_pkt_ptr);
72
    void *buf = calloc(1, length);
73
    check_mem(buf);
74

75
    check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");
76

77
78
    check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
          length, "Sending feedback failed.");
79
    free(buf);
80

81
82
    PrrtPacket_destroy(feedback_pkt_ptr);

83
    return true;
84
85

    error:
Andreas Schmidt's avatar
Andreas Schmidt committed
86
87
    if (buf != NULL) { free(buf); }
    if (feedback_pkt_ptr != NULL) { PrrtPacket_destroy(feedback_pkt_ptr); }
88
    return false;
89
90
}

Andreas Schmidt's avatar
Andreas Schmidt committed
91
92
93
/**
 * Decide whether a packet's timeout has passed.
 *
 * Currently a stub: both arguments are ignored and the result is always false.
 *
 * @param now current time
 * @param to  timeout to compare against
 * @return always false until the check is implemented
 */
static bool is_timeout(prrtTimestamp_t now, prrtTimestamp_t to) {
    /* TODO: implement */
    (void) now;
    (void) to;
    return false;
}
95

Andreas Schmidt's avatar
Andreas Schmidt committed
96
/**
 * Process one received data packet.
 *
 * Steps, in order:
 *  1. record the packet's timestamp on the socket and update the clock,
 *  2. insert a copy of the packet into the packet timeout table,
 *  3. send feedback to the sender,
 *  4. then either
 *     - drop the packet if it has timed out (is_timeout), or
 *     - drop it if the forward packet table reports it as not relevant, or
 *     - store a second copy in the matching repair block (and try to decode
 *       it) or, when no such block exists, in the data packet store, and push
 *       the original packet onto sock_ptr->inQueue.
 *
 * This function takes over `packet`: on every path it is either pushed onto
 * the inQueue or destroyed, including the error path.
 *
 * @param sock_ptr receiving socket
 * @param packet   decoded data packet
 * @param remote   address the packet was received from (feedback target)
 */
static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet, struct sockaddr_in remote) {
    /* Tracked at function scope so the error path can release whatever is still owned here. */
    PrrtPacket *copiedPacket = NULL;
    PrrtPacket *reference = NULL;

    PrrtPacketDataPayload *payload = packet->payload;
    prrtSequenceNumber_t seqno = packet->sequenceNumber;
    prrtTimestamp_t dataTimestamp = payload->timestamp;

    sock_ptr->lastSentTimestamp = dataTimestamp;
    PrrtClock_update(&sock_ptr->clock, dataTimestamp, payload->groupRTT_us);

    copiedPacket = PrrtPacket_copy(packet);
    check_mem(copiedPacket);
    check(PrrtPacketTimeoutTable_insert(sock_ptr->packetTimeoutTable, copiedPacket), "Could not insert data packet.");
    copiedPacket = NULL; /* handed over to the timeout table */

    XlapTimeStampCycle(sock_ptr, ts_data_packet, seqno, SendFeedbackStart);
    check(send_feedback(sock_ptr, remote), "Sending feedback failed.");
    XlapTimeStampCycle(sock_ptr, ts_data_packet, seqno, SendFeedbackEnd);

    prrtTimestamp_t now = PrrtClock_get_prrt_time_us(&sock_ptr->clock);
    if (is_timeout(now, payload->packetTimeout_us)) {
        PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, packet->sequenceNumber);
        // TODO: note this as loss
        /* Log before destroying: payload belongs to packet and must not be read afterwards. */
        debug(DEBUG_RECEIVER, "timeout data packet %u (%lu > %lu)", seqno, (unsigned long) now,
              (unsigned long) payload->packetTimeout_us);
        PrrtPacket_destroy(packet);
    } else if (PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable,
                                                                  packet->sequenceNumber) ==
               false) {
        PrrtPacket_destroy(packet);
    } else {
        prrtSequenceNumber_t baseSequenceNumber = packet->sequenceNumber - packet->index;

        reference = PrrtPacket_copy(packet);
        check_mem(reference);

        PrrtBlock *block = PrrtRepairBlockStore_get_block(sock_ptr->repairBlockStore, baseSequenceNumber);
        if (block != NULL) {
            check(PrrtBlock_insert_data_packet(block, reference), "Inserting failed");
            reference = NULL; /* handed over to the block */
            decode_block(sock_ptr, block);
        } else {
            if (PrrtDataStore_insert(sock_ptr->dataPacketStore, reference) == false) {
                PrrtPacket_destroy(reference);
            }
            reference = NULL; /* stored or destroyed; either way no longer ours */
        }

        // forward to application layer
        debug(DEBUG_DATARECEIVER, "forward %u", seqno);

        XlapTimeStampCycle(sock_ptr, ts_data_packet, seqno, PrrtReturnPackage);
        Pipe_push(sock_ptr->inQueue, &packet->asListNode);
    }
    return;

    error:
    PERROR("Handling data packet failed%s.", "");
    /*
     * Release everything that has not been handed off. This assumes a failed
     * insert leaves ownership with the caller, which is how the other insert
     * call sites in this file behave (they destroy the packet on failure).
     */
    if (copiedPacket != NULL) { PrrtPacket_destroy(copiedPacket); }
    if (reference != NULL) { PrrtPacket_destroy(reference); }
    PrrtPacket_destroy(packet);
    return;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
150
/**
 * Process one received redundancy packet.
 *
 * The packet is destroyed immediately when the forward packet table reports
 * its block (baseSequenceNumber, n) as not relevant. Otherwise the matching
 * block is looked up in the repair block store; if none exists, the socket's
 * coding parameters are updated with the packet's k and n, a new block is
 * created from them and inserted into the store. Data packets for the block
 * are then moved over from the data packet store, the redundancy packet is
 * inserted, and a decode is attempted. If the insert is rejected the packet is
 * destroyed instead.
 *
 * @param socket receiving socket
 * @param packet decoded redundancy packet; consumed by this function
 */
static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) {
    PrrtPacketRedundancyPayload *redundancyPayload = packet->payload;

    if (!PrrtForwardPacketTable_test_is_block_relevant(socket->forwardPacketTable,
                                                       redundancyPayload->baseSequenceNumber,
                                                       redundancyPayload->n)) {
        PrrtPacket_destroy(packet);
        return;
    }

    PrrtBlock *block = PrrtRepairBlockStore_get_block(socket->repairBlockStore,
                                                      redundancyPayload->baseSequenceNumber);
    if (block == NULL) {
        /* First redundancy packet of this block: create the block and register it. */
        PrrtCodingParams_update(socket->codingParameters, redundancyPayload->k, redundancyPayload->n);
        block = PrrtBlock_create(socket->codingParameters, redundancyPayload->baseSequenceNumber);
        PrrtRepairBlockStore_insert(socket->repairBlockStore, block);
    }

    retrieve_data_blocks(socket, redundancyPayload->baseSequenceNumber, block->codingParams->k, block);

    if (!PrrtBlock_insert_redundancy_packet(block, packet)) {
        PrrtPacket_destroy(packet);
        return;
    }

    decode_block(socket, block);
}

Andreas Schmidt's avatar
Andreas Schmidt committed
179
void *receive_data_loop(void *ptr) {
180
181
182
183
184
185
    ssize_t n;
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);
    unsigned char buffer[MAX_PAYLOAD_LENGTH];
    PrrtSocket *sock_ptr = ptr;

Andreas Schmidt's avatar
Andreas Schmidt committed
186
187
188
189
190
191
192
    while (1) {
        XlapTimestampPlaceholder tsph1;
        XlapTimestampPlaceholder tsph2;
        XlapTimestampPlaceholder tsph3;
        XlapTimestampPlaceholderInitialize(&tsph1);
        XlapTimestampPlaceholderInitialize(&tsph2);
        XlapTimestampPlaceholderInitialize(&tsph3);
Stefan Reif's avatar
Stefan Reif committed
193

194
        n = recvfrom(sock_ptr->dataSocketFd, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen);
195
        sock_ptr->lastReceivedTimestamp = PrrtClock_get_current_time_us();
Andreas Schmidt's avatar
Andreas Schmidt committed
196
197
        XlapTimeStampClock(&tsph1, ts_any_packet, 0, LinkReceive);
        XlapTimeStampCycle(&tsph1, ts_any_packet, 0, LinkReceive);
198

199
        PrrtPacket *packet = (PrrtPacket *) calloc(1, sizeof(PrrtPacket));
200
        check_mem(packet);
Stefan Reif's avatar
Stefan Reif committed
201

Andreas Schmidt's avatar
Andreas Schmidt committed
202
        XlapTimeStampCycle(&tsph2, ts_any_packet, 0, DecodeStart);
203
        check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed.");
Andreas Schmidt's avatar
Andreas Schmidt committed
204
        XlapTimeStampCycle(&tsph3, ts_any_packet, 0, DecodeEnd);
Stefan Reif's avatar
Stefan Reif committed
205

Andreas Schmidt's avatar
Andreas Schmidt committed
206
        prrtSequenceNumber_t seqno = packet->sequenceNumber;
207

208
        prrtPacketType_t packetType = PrrtPacket_type(packet);
Andreas Schmidt's avatar
Andreas Schmidt committed
209
210
211
212
213
        debug(DEBUG_DATARECEIVER, "received packet %d:%u", (int) packetType, seqno);
        if (packetType == PACKET_TYPE_DATA) {
            XlapTimestampPlaceholderUse(sock_ptr, ts_data_packet, seqno, &tsph1);
            XlapTimestampPlaceholderUse(sock_ptr, ts_data_packet, seqno, &tsph2);
            XlapTimestampPlaceholderUse(sock_ptr, ts_data_packet, seqno, &tsph3);
Stefan Reif's avatar
Stefan Reif committed
214

Andreas Schmidt's avatar
Andreas Schmidt committed
215
            XlapTimeStampCycle(sock_ptr, ts_data_packet, seqno, HandlePacketStart);
216
            handle_data_packet(sock_ptr, packet, remote);
Andreas Schmidt's avatar
Andreas Schmidt committed
217
218
219
220
221
            XlapTimeStampCycle(sock_ptr, ts_data_packet, seqno, HandlePacketEnd);
        } else if (packetType == PACKET_TYPE_REDUNDANCY) {
            XlapTimestampPlaceholderUse(sock_ptr, ts_redundancy_packet, seqno, &tsph1);
            XlapTimestampPlaceholderUse(sock_ptr, ts_redundancy_packet, seqno, &tsph2);
            XlapTimestampPlaceholderUse(sock_ptr, ts_redundancy_packet, seqno, &tsph3);
Stefan Reif's avatar
Stefan Reif committed
222

Andreas Schmidt's avatar
Andreas Schmidt committed
223
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, seqno, HandlePacketStart);
224
            handle_redundancy_packet(sock_ptr, packet);
Andreas Schmidt's avatar
Andreas Schmidt committed
225
            XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, seqno, HandlePacketEnd);
Andreas Schmidt's avatar
Andreas Schmidt committed
226
227
228
        } else {
            PrrtPacket_print(packet);
            PrrtPacket_destroy(packet);
229
        }
230
231

        PrrtSocket_cleanup(sock_ptr);
232
233
234
    }

    error:
Andreas Schmidt's avatar
Andreas Schmidt committed
235
PNOTIMPLEMENTED("SHOULD IMPLEMENT ERROR HANDLER HERE");
Andreas Schmidt's avatar
Andreas Schmidt committed
236
}