dataReceiver.c 15.2 KB
Newer Older
1
#include <netdb.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
2
#include <stdio.h>
3
#include <arpa/inet.h>
4
5
6
#include "../../defines.h"
#include "../../util/dbg.h"
#include "../../util/common.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
7
#include "../types/lossStatistics.h"
8
#include "../block.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
9
10
#include "../clock.h"
#include "../socket.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
11
#include "dataReceiver.h"
12

Andreas Schmidt's avatar
Andreas Schmidt committed
13
14
static void
retrieve_data_blocks(PrrtSocket *sock_ptr, prrtSequenceNumber_t base_seqno, uint8_t k, const PrrtBlock *block) {
15
    List *res = List_create();
16

17
18
    PrrtDataPacketStore_remove_range(sock_ptr->dataPacketStore, res, base_seqno,
                                     (prrtSequenceNumber_t) (base_seqno + k - 1));
19

20
21
    LIST_FOREACH(res, first, next, cur) {
        PrrtPacket *packetPtr = cur->value;
22
        check(PrrtBlock_insert_data_packet((PrrtBlock *) block, packetPtr), "Insert failed!")
23
    }
24
25
    List_destroy(res);
    return;
26
    error:
27
    PERROR("Insert failed.");
28
    List_destroy(res);
29
30
}

Andreas Schmidt's avatar
Andreas Schmidt committed
31
32
static void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block) {
    if (block != NULL && PrrtBlock_decode_ready(block)) {
33
        check(PrrtBlock_decode(block), "Decoding failed");
34

Andreas Schmidt's avatar
Andreas Schmidt committed
35
        while (List_count(block->dataPackets) > 0) {
36
            PrrtPacket *pkt = List_shift(block->dataPackets);
37
            if (PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable, pkt->sequenceNumber)) {
38
                PrrtPacketDeliveryStore_insert(sock_ptr->packetDeliveryStore, pkt);
39
40
            } else {
                PrrtPacket_destroy(pkt);
41
42
            }
        }
43

44
        PrrtRepairBlockStore_delete(sock_ptr->repairBlockStore, block->baseSequenceNumber);
Andreas Schmidt's avatar
Andreas Schmidt committed
45
        PrrtBlock_destroy(block);
46
    }
Andreas Schmidt's avatar
Andreas Schmidt committed
47
48

    return;
49
50
    error:
    PERROR("Decoding failed.%s", "")
51
}
52

53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote, prrtSequenceNumber_t seqno, prrtTimestamp_t receiveStamp,
                          prrtTimestamp_t sentTimestamp, prrtPacketType_t type) {
    enum XlapTimestampPacketKind kind = ts_data_packet;
    if (type == PACKET_TYPE_DATA) {
        kind = ts_data_packet;
    } else if (type == PACKET_TYPE_REDUNDANCY) {
        kind = ts_redundancy_packet;
    }

    debug(DEBUG_FEEDBACK, "Send feedback %d %d", type, seqno);
    XlapTimeStampCycle(sock_ptr, kind, seqno, SendFeedbackStart);

    prrtFeedback_t feedback = {
            .seqNo = seqno,
            .type = type,
            .receivedTime = receiveStamp,
            .sentTime = sentTimestamp
    };

72
73
74
    uint16_t remote_port = ntohs(remote.sin_port);
    char *remote_host = inet_ntoa(remote.sin_addr);

75
76
77
    struct sockaddr_in targetaddr;
    memset((char *) &targetaddr, 0, sizeof(targetaddr));
    targetaddr.sin_family = AF_INET;
Andreas Schmidt's avatar
Andreas Schmidt committed
78
    targetaddr.sin_port = htons((uint16_t) (remote_port));
79
80
81
82
83

    struct hostent *hp;
    hp = gethostbyname(remote_host);
    memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);

84
85
86
    prrtTimestamp_t forwardTripTime = (uint32_t) (((int64_t) PrrtClock_get_current_time_us()) +
                                                  (sock_ptr->lastSentTimestamp - sock_ptr->lastReceivedTimestamp));

Andreas Schmidt's avatar
Andreas Schmidt committed
87
88
    PrrtLossStatistics stats = sock_ptr->lossStatistics;

89
90
91
92
    int group_RTT = 0; // TODO: To be determined.
    PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, sock_ptr->sequenceNumberFeedback++, group_RTT,
                                                                     stats.gapLength, stats.gapCount, stats.burstLength,
                                                                     stats.burstCount, forwardTripTime,
93
94
                                                                     stats.erasureCount, stats.packetCount, feedback.seqNo,
                                                                     feedback.type);
95
    prrtPacketLength_t length = PrrtPacket_size(feedback_pkt_ptr);
96
    void *buf = calloc(1, length);
97
    check_mem(buf);
98

99
    check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");
100

Andreas Schmidt's avatar
Andreas Schmidt committed
101
    check(sendto(sock_ptr->socketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
102
          length, "Sending feedback failed.");
103
    free(buf);
104

105
    PrrtPacket_destroy(feedback_pkt_ptr);
106
    XlapTimeStampCycle(sock_ptr, kind, seqno, SendFeedbackEnd);
107

108
    return true;
109
110

    error:
Andreas Schmidt's avatar
Andreas Schmidt committed
111
112
    if (buf != NULL) { free(buf); }
    if (feedback_pkt_ptr != NULL) { PrrtPacket_destroy(feedback_pkt_ptr); }
113
    return false;
114
115
}

Andreas Schmidt's avatar
Andreas Schmidt committed
116
117
118
static bool is_timeout(prrtTimestamp_t now, prrtTimestamp_t to) {
    /* TODO: implement */
    return false;
Stefan Reif's avatar
Stefan Reif committed
119
}
120

121
122
static void
handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
123
    PrrtPacketDataPayload *payload = packet->payload;
124
125
126
    prrtTimestamp_t sentTimestamp = payload->timestamp;
    sock_ptr->lastSentTimestamp = sentTimestamp;
    PrrtClock_update(&sock_ptr->clock, sentTimestamp, payload->groupRTprop_us);
127

128
    PrrtPacketTimeout *packetTimeout = PrrtPacketTimeout_create(packet);
129
    check(PrrtPacketTimeoutTable_insert(sock_ptr->packetTimeoutTable, packetTimeout), "Could not insert data packet.");
130

Andreas Schmidt's avatar
Andreas Schmidt committed
131
    prrtSequenceNumber_t seqno = packet->sequenceNumber;
Andreas Schmidt's avatar
Andreas Schmidt committed
132
    PrrtReceptionTable_mark_received(sock_ptr->dataReceptionTable, seqno);
Stefan Reif's avatar
Stefan Reif committed
133

Stefan Reif's avatar
Stefan Reif committed
134
    prrtTimestamp_t now = PrrtClock_get_prrt_time_us(&sock_ptr->clock);
Andreas Schmidt's avatar
Andreas Schmidt committed
135
    if (is_timeout(now, payload->packetTimeout_us)) {
136
        PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable, packet->sequenceNumber);
137
        PrrtPacket_destroy(packet);
Andreas Schmidt's avatar
Andreas Schmidt committed
138
139
        debug(DEBUG_RECEIVER, "timeout data packet %u (%lu > %lu)", seqno, (unsigned long) now,
              (unsigned long) payload->packetTimeout_us);
140
141
    } else if (PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable,
                                                                    packet->sequenceNumber) ==
Andreas Schmidt's avatar
Andreas Schmidt committed
142
               false) {
143
144
145
146
147
148
149
        PrrtPacket_destroy(packet);
    } else {
        prrtSequenceNumber_t baseSequenceNumber = packet->sequenceNumber - packet->index;

        PrrtPacket *reference = PrrtPacket_copy(packet);

        PrrtBlock *block = PrrtRepairBlockStore_get_block(sock_ptr->repairBlockStore, baseSequenceNumber);
Andreas Schmidt's avatar
Andreas Schmidt committed
150
        if (block != NULL) {
151
152
            check(PrrtBlock_insert_data_packet(block, reference), "Inserting failed: %d, %d", baseSequenceNumber,
                  seqno);
153
154
            decode_block(sock_ptr, block);
        } else {
Andreas Schmidt's avatar
Andreas Schmidt committed
155
            if (PrrtDataStore_insert(sock_ptr->dataPacketStore, reference) == false) {
156
157
158
159
160
                PrrtPacket_destroy(reference);
            }
        }

        // forward to application layer
Andreas Schmidt's avatar
Andreas Schmidt committed
161
        debug(DEBUG_DATARECEIVER, "forward %u", seqno);
162

163
        XlapTimeStampClock(sock_ptr, ts_data_packet, seqno, PrrtReturnPackage);
Andreas Schmidt's avatar
Andreas Schmidt committed
164
        XlapTimeStampCycle(sock_ptr, ts_data_packet, seqno, PrrtReturnPackage);
165
        PrrtPacketDeliveryStore_insert(sock_ptr->packetDeliveryStore, packet);
166
167
168
169
170
171
172
173
    }
    return;

    error:
    PERROR("Handling data packet failed%s.", "");
    return;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
174
static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) {
175
176
    PrrtPacketRedundancyPayload *redundancyPayload = packet->payload;

Andreas Schmidt's avatar
Andreas Schmidt committed
177
178
    PrrtReceptionTable_mark_received(socket->redundancyReceptionTable, packet->sequenceNumber);

179
180
181
    if (!PrrtDeliveredPacketTable_test_is_block_relevant(socket->deliveredPacketTable,
                                                         redundancyPayload->baseSequenceNumber,
                                                         redundancyPayload->n)) {
182
183
184
185
        PrrtPacket_destroy(packet);
    } else {
        PrrtBlock *block = PrrtRepairBlockStore_get_block(socket->repairBlockStore,
                                                          redundancyPayload->baseSequenceNumber);
Andreas Schmidt's avatar
Andreas Schmidt committed
186
        if (block == NULL) {
187
188
            PrrtCodingConfiguration* codingParams = PrrtCodingConfiguration_create(redundancyPayload->k,
                                                                                   redundancyPayload->n, 0, NULL);
189

190
            block = PrrtBlock_create(codingParams, PrrtSocket_get_matching_coder(socket, codingParams), redundancyPayload->baseSequenceNumber);
191

192
            PrrtRepairBlockStore_insert(socket->repairBlockStore, block);
193
194
        }

195
        retrieve_data_blocks(socket, redundancyPayload->baseSequenceNumber, block->codingParams->k, block);
196

Andreas Schmidt's avatar
Andreas Schmidt committed
197
        if (PrrtBlock_insert_redundancy_packet(block, packet)) {
198
199
200
201
202
203
204
205
            decode_block(socket, block);
        } else {
            PrrtPacket_destroy(packet);
        }
    }
    return;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
206
void handle_feedback_packet(PrrtSocket *prrtSocket, PrrtPacket *prrtPacket, prrtTimestamp_t receiveTime) {
Andreas Schmidt's avatar
Andreas Schmidt committed
207
208
209
210
211
212
213
214
215
    check(prrtPacket != NULL, "Cannot be null");
    debug(DEBUG_DATARECEIVER, "handle_feedback_packet");
    PrrtPacketFeedbackPayload *feedbackPayload = (PrrtPacketFeedbackPayload *) prrtPacket->payload;
    prrtTimestamp_t forwardTripTimestamp = feedbackPayload->forwardTripTimestamp_us;

    bool valid_sample = PrrtReceiver_updateAndGenerateRateSample(prrtSocket->receiver, feedbackPayload->ackSequenceNumber, feedbackPayload->ackPacketType, receiveTime);
    debug(DEBUG_DATARECEIVER, "PrrtReceiver_updateAndGenerateRateSample ");

    if(valid_sample) {
Andreas Schmidt's avatar
Andreas Schmidt committed
216
        PrrtChannelStateInformation_update_delivery_rate(prrtSocket->receiver->csi, prrtPacket, prrtSocket->receiver->rateSample->delivery_rate);
Andreas Schmidt's avatar
Andreas Schmidt committed
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
    }
    PrrtChannelStateInformation_update_app_limited(prrtSocket->receiver->csi, prrtSocket->receiver->rateSample->is_app_limited);
    debug(DEBUG_DATARECEIVER, "PrrtChannelStateInformation_update_app_limited ");

    PrrtChannelStateInformation_update_rtprop(prrtSocket->receiver->csi,
                                              (prrtTimedelta_t) (receiveTime - forwardTripTimestamp));
    debug(DEBUG_DATARECEIVER, "PrrtChannelStateInformation_update_rtprop ");
    PrrtChannelStateInformation_update_plr(prrtSocket->receiver->csi, feedbackPayload->erasureCount, feedbackPayload->packetCount);
    debug(DEBUG_DATARECEIVER, "PrrtChannelStateInformation_update_plr ");
    return;

    error:
    PERROR("handle_feedback_packet failed.");
}

232
233
234
void receive_from_socket(PrrtSocket *socket_ptr, unsigned char buffer_ptr[65528], ssize_t *received_size,
                         struct sockaddr_in *remote_ptr, socklen_t *remote_len_ptr, struct timespec *packet_timestamp_ptr,
                         uint64_t *packet_cyclestamp_ptr) {
Andreas Schmidt's avatar
Andreas Schmidt committed
235
    if (socket_ptr->isHardwareTimestamping) {
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
        struct cmsghdr *cmsg;
        struct msghdr msg;
        struct iovec entry;

        struct {
            struct cmsghdr cm;
            char control[512];
        } control;

        memset(&msg, 0, sizeof(msg));
        msg.msg_iov = &entry;
        msg.msg_iovlen = 1;
        entry.iov_base = buffer_ptr;
        entry.iov_len = MAX_PAYLOAD_LENGTH;
        msg.msg_name = (caddr_t) remote_ptr;
        msg.msg_namelen = *remote_len_ptr;
        msg.msg_control = &control;
        msg.msg_controllen = sizeof(control);

Andreas Schmidt's avatar
Andreas Schmidt committed
255
        *received_size = recvmsg(socket_ptr->socketFd, &msg, 0);
256

Andreas Schmidt's avatar
Andreas Schmidt committed
257
258
        for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
            switch (cmsg->cmsg_type) {
259
                case SO_TIMESTAMPNS: {
260
                    memcpy(packet_timestamp_ptr, (struct timespec *) CMSG_DATA(cmsg), sizeof(struct timespec));
261
262
263
264
265
266
267
268
                    break;
                }
                default:
                    PERROR("Unknown Control Msg Type: %d", cmsg->cmsg_type);
                    break;
            }
        }
    } else {
Andreas Schmidt's avatar
Andreas Schmidt committed
269
        *received_size = recvfrom(socket_ptr->socketFd, buffer_ptr, MAX_PAYLOAD_LENGTH, 0,
270
                                  (struct sockaddr *) remote_ptr, remote_len_ptr);
271
        clock_gettime(CLOCK_REALTIME, packet_timestamp_ptr);
272
    }
273
    *packet_cyclestamp_ptr = __builtin_ia32_rdtsc();
274
275
}

Andreas Schmidt's avatar
Andreas Schmidt committed
276
void *receive_data_loop(void *ptr) {
277
278
279
280
281
282
    ssize_t n;
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);
    unsigned char buffer[MAX_PAYLOAD_LENGTH];
    PrrtSocket *sock_ptr = ptr;

Andreas Schmidt's avatar
Andreas Schmidt committed
283
284
285
286
287
288
289
    while (1) {
        XlapTimestampPlaceholder tsph1;
        XlapTimestampPlaceholder tsph2;
        XlapTimestampPlaceholder tsph3;
        XlapTimestampPlaceholderInitialize(&tsph1);
        XlapTimestampPlaceholderInitialize(&tsph2);
        XlapTimestampPlaceholderInitialize(&tsph3);
Stefan Reif's avatar
Stefan Reif committed
290

291
292
293
294
295
        struct timespec packet_recv_timestamp;
        uint64_t packet_recv_cyclestamp = 0;
        receive_from_socket(sock_ptr, buffer, &n, &remote, &addrlen, &packet_recv_timestamp, &packet_recv_cyclestamp);
        debug(DEBUG_HARDSTAMPING, "Packet TS:\t%ld.%09ld; Who? %s", (long) packet_recv_timestamp.tv_sec,
              packet_recv_timestamp.tv_nsec, inet_ntoa(remote.sin_addr));
Andreas Schmidt's avatar
Andreas Schmidt committed
296
297
        XlapTimeStampClock(&tsph1, ts_any_packet, 0, LinkReceive);
        XlapTimeStampCycle(&tsph1, ts_any_packet, 0, LinkReceive);
298

299
        PrrtPacket *packet = (PrrtPacket *) calloc(1, sizeof(PrrtPacket));
300
        check_mem(packet);
Andreas Schmidt's avatar
Andreas Schmidt committed
301
        XlapTimeStampCycle(&tsph2, ts_any_packet, 0, DecodeStart);
302
        check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed.");
Andreas Schmidt's avatar
Andreas Schmidt committed
303
304
        XlapTimeStampCycle(&tsph3, ts_any_packet, 0, DecodeEnd);
        prrtSequenceNumber_t seqno = packet->sequenceNumber;
Andreas Schmidt's avatar
Andreas Schmidt committed
305
        prrtTimestamp_t prrt_recv_timestamp = PrrtClock_TimespecToPrrtTimestamp(packet_recv_timestamp);
306
        memcpy(&packet->sender_addr, &remote, addrlen);
307

308
        prrtPacketType_t packetType = PrrtPacket_type(packet);
Andreas Schmidt's avatar
Andreas Schmidt committed
309
        debug(DEBUG_DATARECEIVER, "received packet %d:%u", (int) packetType, seqno);
310

Andreas Schmidt's avatar
Andreas Schmidt committed
311
        enum XlapTimestampPacketKind kind = ts_any_packet;
312
        prrtTimestamp_t sentTimestamp;
Andreas Schmidt's avatar
Andreas Schmidt committed
313
        if (packetType == PACKET_TYPE_DATA) {
Andreas Schmidt's avatar
Andreas Schmidt committed
314
            kind = ts_data_packet;
315
            sentTimestamp = PrrtPacket_get_data_timestamp(packet);
Andreas Schmidt's avatar
Andreas Schmidt committed
316
        } else if (packetType == PACKET_TYPE_REDUNDANCY) {
Andreas Schmidt's avatar
Andreas Schmidt committed
317
            kind = ts_redundancy_packet;
318
            sentTimestamp = PrrtPacket_get_redundancy_timestamp(packet);
Andreas Schmidt's avatar
Andreas Schmidt committed
319
320
        } else if (packetType == PACKET_TYPE_FEEDBACK) {
            kind = ts_feedback_packet;
Andreas Schmidt's avatar
Andreas Schmidt committed
321
322
        }
        if (packetType == PACKET_TYPE_DATA || packetType == PACKET_TYPE_REDUNDANCY) {
Andreas Schmidt's avatar
Andreas Schmidt committed
323
324
            sock_ptr->lastReceivedTimestamp = prrt_recv_timestamp;

325
326
            XlapCycleStampValue(sock_ptr, kind, seqno, ChannelReceive, packet_recv_cyclestamp);
            XlapTimeStampValue(sock_ptr, kind, seqno, ChannelReceive, packet_recv_timestamp);
Andreas Schmidt's avatar
Andreas Schmidt committed
327
328
329
330
331
332

            XlapTimestampPlaceholderUse(sock_ptr, kind, seqno, &tsph1);
            XlapTimestampPlaceholderUse(sock_ptr, kind, seqno, &tsph2);
            XlapTimestampPlaceholderUse(sock_ptr, kind, seqno, &tsph3);

            XlapTimeStampCycle(sock_ptr, kind, seqno, HandlePacketStart);
333

Andreas Schmidt's avatar
Andreas Schmidt committed
334
            if (packetType == PACKET_TYPE_DATA) {
335
                handle_data_packet(sock_ptr, packet);
Andreas Schmidt's avatar
Andreas Schmidt committed
336
337
338
339
340
            } else if (packetType == PACKET_TYPE_REDUNDANCY) {
                handle_redundancy_packet(sock_ptr, packet);
            } else {
                goto error;
            }
341
            send_feedback(sock_ptr, remote, seqno, prrt_recv_timestamp, sentTimestamp, packetType);
Andreas Schmidt's avatar
Andreas Schmidt committed
342
            XlapTimeStampCycle(sock_ptr, kind, seqno, HandlePacketEnd);
Andreas Schmidt's avatar
Andreas Schmidt committed
343
344
345
        } else if (packetType == PACKET_TYPE_FEEDBACK) {
            handle_feedback_packet(sock_ptr, packet, prrt_recv_timestamp);
            PrrtPacket_destroy(packet);
Andreas Schmidt's avatar
Andreas Schmidt committed
346
347
348
        } else {
            PrrtPacket_print(packet);
            PrrtPacket_destroy(packet);
349
        }
350

Andreas Schmidt's avatar
Andreas Schmidt committed
351
        debug(DEBUG_DATARECEIVER, "Cleanup");
352
        PrrtSocket_cleanup(sock_ptr);
Andreas Schmidt's avatar
Andreas Schmidt committed
353
        debug(DEBUG_DATARECEIVER, "Cleaned");
354
355
356
    }

    error:
Andreas Schmidt's avatar
Andreas Schmidt committed
357
    PNOTIMPLEMENTED("SHOULD IMPLEMENT ERROR HANDLER HERE");
Andreas Schmidt's avatar
Andreas Schmidt committed
358
}