socket.c 20 KB
Newer Older
1
#include <arpa/inet.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
2
#include <math.h>
3
4
5
6
7
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
8
9
#include <net/if.h>
#include <linux/net_tstamp.h>
10
#include <assert.h>
11
#include <sys/ioctl.h>
12
13
14
#include "../defines.h"
#include "../util/dbg.h"
#include "../util/common.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
15
16
#include "processes/dataTransmitter.h"
#include "processes/dataReceiver.h"
17
#include "stores/deliveredPacketTable.h"
18
#include "types/packetTimeout.h"
19
#include "socket.h"
20

21
static inline prrtPacketLength_t deliver_packet(const PrrtSocket *s, void *buffer, PrrtPacket *packet, struct sockaddr* addr) {
22
    prrtPacketLength_t len = 0;
23
24
25
    if(packet != NULL) {
        XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
        XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
26

27
28
29
30
        len = (prrtPacketLength_t) (packet->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE);
        XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, CopyOutputStart);
        PrrtPacket_copy_payload_to_buffer(buffer, packet, PRRT_PACKET_DATA_HEADER_SIZE);
        XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, CopyOutputEnd);
31

32
33
        memcpy(addr, &(packet->sender_addr), sizeof(struct sockaddr_in));

34
35
36
37
        XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
        XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
        PrrtPacket_destroy(packet);
    }
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
    return len;
}

struct timespec abstime_from_now(prrtTimedelta_t wait_time) {
    struct timespec now;
    clock_gettime(CLOCK_REALTIME, &now);

    struct timespec deadline;
    prrtTimedelta_t diff_s = wait_time / 1000000;
    prrtTimedelta_t diff_ns = (wait_time % 1000000) * 1000;
    deadline.tv_sec = diff_s + now.tv_sec;
    deadline.tv_nsec = diff_ns + now.tv_nsec;
    return deadline;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
53
PrrtSocket *PrrtSocket_create(prrtTimedelta_t target_delay_us) {
54
    assert(sizeof(float) == 4);
55
56
    PrrtSocket *s = (PrrtSocket *) calloc(1, sizeof(PrrtSocket));
    check_mem(s);
57

58

59
    s->isHardwareTimestamping = false;
60
    s->interfaceName = NULL;
61

62
63
    s->isThreadPinning = false;

64
    PrrtClock_init(&s->clock);
65

66
    s->isBound = false;
67
    s->receiver = NULL;
68

69
70
71
    uint8_t n_cycle[1] = { N_START - K_START };
    s->codingParameters = PrrtCodingConfiguration_create(K_START, N_START, 1, n_cycle);
    s->coder = PrrtCoder_create(s->codingParameters);
72

73
74
    s->sequenceNumberSource = 1;
    s->sequenceNumberRedundancy = 1;
75
    s->sequenceNumberFeedback = 1;
76

77

78
    check(target_delay_us < HALF_TIMESTAMP, "Specify target delay between 0 and %i", HALF_TIMESTAMP-1)
79
    s->applicationConstraints = PrrtApplicationConstraints_create(target_delay_us);
80

81
    s->packetTimeoutTable = PrrtPacketTimeoutTable_create();
82

83
    s->dataPacketStore = PrrtDataPacketStore_create();
84

85
86
    int enabled = 1;

Andreas Schmidt's avatar
Andreas Schmidt committed
87
88
    check(s->socketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.");
    check(setsockopt(s->socketFd, SOL_SOCKET, SO_BROADCAST, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
89
          "Socket option set failed.");
Andreas Schmidt's avatar
Andreas Schmidt committed
90
    check(setsockopt(s->socketFd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
91
          "Socket option set failed.");
Andreas Schmidt's avatar
Andreas Schmidt committed
92
    check(setsockopt(s->socketFd, SOL_SOCKET, SO_REUSEPORT, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
93
94
          "Socket option set failed.");

Andreas Schmidt's avatar
Andreas Schmidt committed
95
96
97
    s->sendDataQueue = Pipe_create();
    s->deliveredPacketTable = PrrtDeliveredPacketTable_create();
    s->repairBlockStore = PrrtRepairBlockStore_create();
98

Andreas Schmidt's avatar
Andreas Schmidt committed
99
    s->packetDeliveryStore = PrrtPacketDeliveryStore_create();
Andreas Schmidt's avatar
Andreas Schmidt committed
100

Andreas Schmidt's avatar
Andreas Schmidt committed
101
102
    s->dataReceptionTable = PrrtReceptionTable_create();
    s->redundancyReceptionTable = PrrtReceptionTable_create();
103

104
    return s;
105
106

    error:
107
    PrrtSocket_close(s);
108
    return NULL;
109
110
}

111
112
113
114
115
116
117
118
119
120
bool PrrtSocket_enable_thread_pinning(PrrtSocket *s) {
    check(s->isBound == false, "Thread pinning can only be enabled before calling PrrtSocket_bind()");
    s->isThreadPinning = true;
    return true;

    error:
    PERROR("PrrtSocket_enable_thread_pinning() failed.%s", "");
    return false;
}

121
122
123
124
125
126
127
128
129
130
131
132
bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interface_name) {
    check(s->isBound == false, "Hardware timestamping can only be enabled before calling PrrtSocket_bind()");
    s->isHardwareTimestamping = true;
    s->interfaceName = strdup(interface_name);
    return true;

    error:
    PERROR("PrrtSocket_enable_hardware_timestamping() failed.%s", "");
    return false;
}

bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port) {
Andreas Schmidt's avatar
Andreas Schmidt committed
133
134
135
    check(port <= 65534, "Port %d cannot be bound to.", port);

    size_t size = sizeof(struct sockaddr_in);
136
    struct sockaddr_in *address = calloc(1, size);
Andreas Schmidt's avatar
Andreas Schmidt committed
137
138
139
140
141
    check_mem(address);

    address->sin_family = AF_INET;
    address->sin_addr.s_addr = inet_addr(ipAddress);
    address->sin_port = htons((uint16_t) (port));
Andreas Schmidt's avatar
Andreas Schmidt committed
142
    s->address = address;
Andreas Schmidt's avatar
Andreas Schmidt committed
143

144
145
146
147
148
149
150
151
152
153
154
155
156
157
    if(s->isHardwareTimestamping) {
        struct ifreq hwtstamp;
        struct hwtstamp_config hwconfig, hwconfig_requested;

        char *interface = s->interfaceName;
        // Configure HW Timestamping
        memset(&hwtstamp, 0, sizeof(hwtstamp));
        strncpy(hwtstamp.ifr_name, interface, sizeof(hwtstamp.ifr_name));
        hwtstamp.ifr_data = (void *) &hwconfig;
        memset(&hwconfig, 0, sizeof(hwconfig));
        hwconfig.flags = 0;
        hwconfig.tx_type = HWTSTAMP_TX_ON;
        hwconfig.rx_filter = HWTSTAMP_FILTER_ALL;
        hwconfig_requested = hwconfig;
Andreas Schmidt's avatar
Andreas Schmidt committed
158
        if(ioctl(s->socketFd, SIOCSHWTSTAMP, &hwtstamp) < 0) {
159
160
161
162
163
164
165
166
167
168
169
170
            check(errno != EPERM, "Insufficient permissions. Run in privileged mode.")

            if((errno == EINVAL || errno == ENOTSUP) && hwconfig_requested.tx_type == HWTSTAMP_TX_OFF &&
               hwconfig_requested.rx_filter == HWTSTAMP_FILTER_NONE) {
                debug(DEBUG_ALL, "Hardware timestamping not possible.");
                goto error; // TODO: Provide nice handling.
            } else {
                debug(DEBUG_ALL, "Failed SIOCSHWTSTAMP: %d.", errno);
                goto error; // TODO: Provide nice handling.
            }
        }
    }
Andreas Schmidt's avatar
Andreas Schmidt committed
171

Andreas Schmidt's avatar
Andreas Schmidt committed
172
    check(bind(s->socketFd, (struct sockaddr *) address, size) == EXIT_SUCCESS,
173
          "Cannot bind data socket.");
Stefan Reif's avatar
Stefan Reif committed
174

175
    if(s->isHardwareTimestamping) {
Andreas Schmidt's avatar
Andreas Schmidt committed
176
177
178
179
180
181
182
183
        int enabled = 1;
        check(setsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPNS, &enabled, sizeof(enabled)) >= 0, "Could not set SO_TIMESTAMPNS");
        socklen_t val, len;
        len = sizeof(val);
        check(getsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPNS, &val, &len) >= 0, "%s: %s\n", "Getsockopt SO_TIMESTAMPNS", strerror(errno));

        int enabled2 = SOF_TIMESTAMPING_TX_HARDWARE | SOF_TIMESTAMPING_SYS_HARDWARE | SOF_TIMESTAMPING_RAW_HARDWARE;
        check(setsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPING, &enabled2, sizeof(enabled2)) >= 0, "Could not set SO_TIMESTAMPING");
184
    }
Stefan Reif's avatar
Stefan Reif committed
185

Andreas Schmidt's avatar
Andreas Schmidt committed
186
187
188
189
    s->sendDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
    pthread_attr_init(s->sendDataThreadAttr);
    s->receiveDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
    pthread_attr_init(s->receiveDataThreadAttr);
190

Andreas Schmidt's avatar
Andreas Schmidt committed
191
192
193
194

    if(s->isThreadPinning) {
        pin_thread_to_core(s->receiveDataThreadAttr, 2);
        pin_thread_to_core(s->sendDataThreadAttr, 1);
Andreas Schmidt's avatar
Andreas Schmidt committed
195
196
    }

Andreas Schmidt's avatar
Andreas Schmidt committed
197
198
199
200
201
202
    check(pthread_create(&s->sendDataThread, s->sendDataThreadAttr, send_data_loop,
                         (void *) s) == EXIT_SUCCESS, "Cannot create send thread.");

    check(pthread_create(&s->receiveDataThread, s->receiveDataThreadAttr, receive_data_loop,
                         (void *) s) == EXIT_SUCCESS, "Cannot create data receiving thread.");

203
204
    s->isBound = true;

Andreas Schmidt's avatar
Andreas Schmidt committed
205
206
    return true;
    error:
207
    PrrtSocket_close(s);
Andreas Schmidt's avatar
Andreas Schmidt committed
208
209
210
    return false;
}

211
int PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port) {
212
213
214
215
    if(s->receiver != NULL) {
        PrrtReceiver_destroy(s->receiver);
    }
    s->receiver = PrrtReceiver_create(host, port);
216
217
218
    return 0;
}

219
int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
220
221
222
223
    XlapTimestampPlaceholder tsph;
    XlapTimestampPlaceholderInitialize(&tsph);
    XlapTimeStampClock(&tsph, ts_any_packet, 0, PrrtSendStart);
    XlapTimeStampCycle(&tsph, ts_any_packet, 0, PrrtSendStart);
224
    prrtSequenceNumber_t sequenceNumber = s->sequenceNumberSource++;
225
226
    PrrtPacket *packet = PrrtPacket_create_data_packet(5, data, (prrtPacketLength_t) data_len, sequenceNumber,
                                                       PrrtApplicationConstraints_get_target_delay(
227
228
                                                               s->applicationConstraints));
    XlapTimestampPlaceholderUse(s, ts_data_packet, packet->sequenceNumber, &tsph);
229

230
    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSubmitPackage);
231
    Pipe_push(s->sendDataQueue, &packet->asListNode);
232
    PrrtReceiver_on_application_write(s->receiver);
233

234
235
    XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
236
    return 0;
237
238
}

239
240
bool PrrtSocket_closing(PrrtSocket *s) {
    return atomic_load_explicit(&s->closing, memory_order_acquire);
Stefan Reif's avatar
Stefan Reif committed
241
242
}

243
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
244
    PrrtPacket *packet;
245
    packet = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
246
    return deliver_packet(s, buf_ptr, packet, addr);
247
248
}

249
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
250
251
    PrrtPacket *packet;
    do {
252
        packet = PrrtPacketDeliveryStore_get_packet_wait(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
253
254
255
256
257
        if (PrrtSocket_closing(s)) {
            return -1;
        }
    } while (!packet);

258
    return deliver_packet(s, buf_ptr, packet, addr);
259
260
}

261
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, struct timespec* deadline) {
262
    PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, 0, MAX_TIMESTAMP, deadline);
Andreas Schmidt's avatar
Andreas Schmidt committed
263
264
265
    if (packet == NULL && errno == ETIMEDOUT) {
        return -1 * ETIMEDOUT;
    }
266

267
    return deliver_packet(s, buf_ptr, packet, addr);
Andreas Schmidt's avatar
Andreas Schmidt committed
268
269
}

270
271
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
    return PrrtSocket_receive_asap_wait(s, buf_ptr, addr);
272
273
}

274
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us) {
275
    prrtTimestamp_t now = PrrtClock_get_current_time_us();
276
277
    PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, now - time_window_us,
                                                            now + time_window_us);
278
    return deliver_packet(s, buf_ptr, packet, addr);
279
280
}

281
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us) {
282
283
284
    PrrtPacket *packet;
    do {
        prrtTimestamp_t now = PrrtClock_get_current_time_us();
Andreas Schmidt's avatar
Andreas Schmidt committed
285
286
        packet = PrrtPacketDeliveryStore_get_packet_wait(s->packetDeliveryStore, now - time_window_us,
                                                              now + time_window_us);
287
288
289
290
291
        if (PrrtSocket_closing(s)) {
            return -1;
        }
    } while (!packet);

292
    return deliver_packet(s, buf_ptr, packet, addr);
293
294
}

295
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us, struct timespec* deadline) {
296
297
    prrtTimestamp_t now = PrrtClock_get_current_time_us();

298
299
    PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now - time_window_us,
                                                                      now + time_window_us, deadline);
Andreas Schmidt's avatar
Andreas Schmidt committed
300
301
302
303
    if (packet == NULL && errno == ETIMEDOUT) {
        return -1 * ETIMEDOUT;
    }

304
    return deliver_packet(s, buf_ptr, packet, addr);
305
306
}

307

308
309
int PrrtSocket_interrupt(PrrtSocket *s) {
    atomic_store_explicit(&s->closing, true, memory_order_release);
310

311
312
313
    if (s->packetDeliveryStore) {
        PrrtPacketDeliveryStore_interrupt(s->packetDeliveryStore);
    }
Stefan Reif's avatar
Stefan Reif committed
314

315
316
317
318
    if(s->sendDataQueue != NULL) {
      Pipe_wake(s->sendDataQueue);
    }

319
    void **res = NULL;
320
321
322
323
    if (s->sendDataThread != 0) {
        check(pthread_join(s->sendDataThread, res) == 0, "Join failed.");
        pthread_attr_destroy(s->sendDataThreadAttr);
        s->sendDataThread = 0;
324
325
    }

326
327
328
329
330
    if (s->receiveDataThread != 0) {
        check(pthread_cancel(s->receiveDataThread) == 0, "Cancel failed.");
        check(pthread_join(s->receiveDataThread, res) == 0, "Join failed.");
        pthread_attr_destroy(s->receiveDataThreadAttr);
        s->receiveDataThread = 0;
331
332
    }

Andreas Schmidt's avatar
Andreas Schmidt committed
333
    return EXIT_SUCCESS;
334
335

    error:
336
    return EXIT_FAILURE;
337
338
}

339
int PrrtSocket_close(PrrtSocket *s) {
Stefan Reif's avatar
Stefan Reif committed
340
    debug(DEBUG_SOCKET, "Closing socket.");
341
342
    if (!atomic_load_explicit(&s->closing, memory_order_acquire)) {
        check(PrrtSocket_interrupt(s) == EXIT_SUCCESS, "Interrupt failed.");
343
344
    }

345
346
347
    if (s->dataPacketStore != NULL) {
        check(PrrtDataPacketStore_destroy(s->dataPacketStore), "Destroy failed.");
        s->dataPacketStore = NULL;
348
349
    }

350
351
352
    if (s->repairBlockStore != NULL) {
        PrrtRepairBlockStore_destroy(s->repairBlockStore);
        s->repairBlockStore = NULL;
353
354
    }

355
356
    if (s->receiver != NULL) {
        PrrtReceiver_destroy(s->receiver);
357
358
    }

359
    if (s->sendDataQueue != NULL) {
360
        Pipe_destroy(s->sendDataQueue);
361
        s->sendDataQueue = NULL;
362
363
    }

Andreas Schmidt's avatar
Andreas Schmidt committed
364
365
366
367
368
369
370
371
372
373
    if (s->dataReceptionTable != NULL) {
        PrrtReceptionTable_destroy(s->dataReceptionTable);
        s->dataReceptionTable = NULL;
    }

    if (s->redundancyReceptionTable != NULL) {
        PrrtReceptionTable_destroy(s->redundancyReceptionTable);
        s->redundancyReceptionTable = NULL;
    }

374
375
376
    if (s->packetDeliveryStore != NULL) {
        PrrtPacketDeliveryStore_destroy(s->packetDeliveryStore);
        s->packetDeliveryStore = NULL;
377
378
    }

379
380
381
    if (s->deliveredPacketTable != NULL) {
        check(PrrtDeliveredPacketTable_destroy(s->deliveredPacketTable), "Destroy failed.");
        s->deliveredPacketTable = NULL;
382
    }
383

384
385
386
    if (s->packetTimeoutTable != NULL) {
        check(PrrtPacketTimeoutTable_destroy(s->packetTimeoutTable), "Destroy failed.");
        s->packetTimeoutTable = NULL;
387
388
    }

389
390
    if (s->address != NULL) {
        free(s->address);
Andreas Schmidt's avatar
Andreas Schmidt committed
391
392
    }

393
394
    if (s->applicationConstraints != NULL) {
        check(PrrtApplicationConstraints_destroy(s->applicationConstraints),
395
              "Could not destroy application constraints.")
396
397
    }

398
    if (s->codingParameters) {
399
400
401
402
403
        check(PrrtCodingConfiguration_destroy(s->codingParameters), "Could not destroy coding parameters.")
    }

    if(s->coder) {
        check(PrrtCoder_destroy(s->coder), "Could not destroy coder.")
404
405
    }

406
407
    if (s->sendDataThreadAttr != NULL) {
        free(s->sendDataThreadAttr);
408
409
    }

410
411
    if (s->receiveDataThreadAttr != NULL) {
        free(s->receiveDataThreadAttr);
412
413
    }

Andreas Schmidt's avatar
Andreas Schmidt committed
414
    close(s->socketFd);
Stefan Reif's avatar
Stefan Reif committed
415
    debug(DEBUG_SOCKET, "Socket closed.");
416
    return 0;
417
418

    error:
419
420
    PERROR("Closing socket failed.%s", "");
    return -1;
421
422
}

423
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *s, const char *name) {
Andreas Schmidt's avatar
Andreas Schmidt committed
424
    if (strcmp(name, "targetdelay") == 0) {
425
        return PrrtApplicationConstraints_get_target_delay(s->applicationConstraints);
426
427
428
429
430
431
    } else {
        PERROR("Unknwon property %s", name);
        return 0;
    }
}

432
bool PrrtSocket_set_sock_opt(PrrtSocket *s, const char *name, const uint32_t value) {
433
    if (strcmp(name, "app_queue_size")) {
434
        PrrtApplicationConstraints_set_app_queue_size(s->applicationConstraints, value);
Andreas Schmidt's avatar
Andreas Schmidt committed
435
        // TODO: MPSC_Queue does not provide a size.
436
437
438
439
440
441
    } else {
        return false;
    }

    return true;
}
442

rna's avatar
rna committed
443
bool PrrtSocket_set_coding_parameters(PrrtSocket *s, uint8_t k, uint8_t n, uint8_t c, uint8_t *n_cycle) {
444
445
446
447
448
    if(s->codingParameters != NULL) {
        PrrtCodingConfiguration_destroy(s->codingParameters);
    }
    s->codingParameters = PrrtCodingConfiguration_create(k, n, c, n_cycle);
    s->coder = PrrtCoder_create(s->codingParameters);
449
450
    return true;
}
451

452
453
PrrtCodingConfiguration *PrrtSocket_get_coding_parameters(PrrtSocket *s) {
    return s->codingParameters;
454
455
}

456

457
bool PrrtSocket_cleanup(PrrtSocket *s) {
Andreas Schmidt's avatar
Andreas Schmidt committed
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
    debug(DEBUG_CLEANUP, "PrrtSocket_cleanup");
    if (s->packetTimeoutTable != NULL) {
        List *expired_packets = PrrtPacketTimeoutTable_expire_packets(s->packetTimeoutTable,
                                                                      PrrtClock_get_prrt_time_us(
                                                                              &s->clock));
        uint32_t expired_count = List_count(expired_packets);
        debug(DEBUG_CLEANUP, "EXPIRED");
        if (expired_count > 0) {
            PrrtPacketTimeout *first = List_first(expired_packets);
            prrtSequenceNumber_t firstSequenceNumberBase = (prrtSequenceNumber_t) (first->sequenceNumber -
                                                                                   first->index - SEQNO_SPACE / 2);

            PrrtPacketTimeout *last = List_last(expired_packets);
            prrtSequenceNumber_t lastSequenceNumberBase = (prrtSequenceNumber_t) (last->sequenceNumber -
                                                                                  last->index - 1);

            PrrtDeliveredPacketTable_forward_start(s->deliveredPacketTable, lastSequenceNumberBase);

            PrrtRepairBlockStore_expire_block_range(s->repairBlockStore, firstSequenceNumberBase,
                                                    lastSequenceNumberBase);

            List *list = List_create();
            PrrtDataPacketStore_remove_range(s->dataPacketStore, list, firstSequenceNumberBase,
                                             last->sequenceNumber);

            while (List_count(list) > 0) {
                PrrtPacket *packet = (PrrtPacket *) List_shift(list);
                PrrtPacket_destroy(packet);
            }
            List_destroy(list);
488

Andreas Schmidt's avatar
Andreas Schmidt committed
489
490
491
            while (List_count(expired_packets) > 0) {
                PrrtPacketTimeout *packetTimeout = (PrrtPacketTimeout *) List_shift(expired_packets);
                PrrtPacketTimeout_destroy(packetTimeout);
492
            }
493
        }
Andreas Schmidt's avatar
Andreas Schmidt committed
494
495
496
497
498
499
500
        List_destroy(expired_packets);
    }
    debug(DEBUG_CLEANUP, "Expire block range.");
    if(s->deliveredPacketTable != NULL) {
        prrtSequenceNumber_t current_start = s->deliveredPacketTable->start;
        PrrtRepairBlockStore_expire_block_range(s->repairBlockStore, (prrtSequenceNumber_t) (current_start - SEQNO_SPACE/2),
                                                (prrtSequenceNumber_t) (current_start - 1));
501
    }
Andreas Schmidt's avatar
Andreas Schmidt committed
502
503
504
505

    debug(DEBUG_CLEANUP, "Loss stats.");
    s->lossStatistics = PrrtLossStatistics_add(PrrtReceptionTable_calculate_statistics(s->dataReceptionTable), PrrtReceptionTable_calculate_statistics(s->redundancyReceptionTable));
    debug(DEBUG_CLEANUP, "PrrtSocket_cleanup done");
506
507
    return true;
}
508

509
bool PrrtSocket_uses_thread_pinning(PrrtSocket *s) {
510
    return atomic_load_explicit(&s->isThreadPinning, memory_order_acquire);
511
}
512

Andreas Schmidt's avatar
Andreas Schmidt committed
513
uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *s) {
514
    return PrrtChannelStateInformation_get_rtprop(s->receiver->csi);
515
}
Andreas Schmidt's avatar
Andreas Schmidt committed
516

Andreas Schmidt's avatar
Andreas Schmidt committed
517
prrtPacketLossRate_t PrrtSocket_get_plr_fwd(PrrtSocket *s) {
518
519
520
    return PrrtChannelStateInformation_get_plr(s->receiver->csi);
}

Andreas Schmidt's avatar
Andreas Schmidt committed
521
prrtDeliveryRate_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *s) {
522
    return PrrtChannelStateInformation_get_delivery_rate(s->receiver->csi);
Andreas Schmidt's avatar
Andreas Schmidt committed
523
}
524

Andreas Schmidt's avatar
Andreas Schmidt committed
525
prrtDeliveryRate_t PrrtSocket_get_btlbw_fwd(PrrtSocket *s) {
Andreas Schmidt's avatar
Andreas Schmidt committed
526
527
528
    return PrrtChannelStateInformation_get_btlbw(s->receiver->csi);
}

Andreas Schmidt's avatar
Andreas Schmidt committed
529
530
531
532
533
prrtDeliveryRate_t PrrtSocket_get_btlbw_back(PrrtSocket *s) {
    return 0;
}


534
535
536
bool PrrtSocket_get_app_limited(PrrtSocket *s) {
    return PrrtChannelStateInformation_get_app_limited(s->receiver->csi);
}
537
538
539
540
541
542
543
544
545
546

PrrtCoder *PrrtSocket_get_matching_coder(PrrtSocket *s, PrrtCodingConfiguration *codingParams) {
    if (s->coder == NULL || PrrtCoder_get_k(s->coder) != codingParams->k ||
        PrrtCoder_get_n(s->coder) != codingParams->n) {
        if (s->coder != NULL) {
            PrrtCoder_destroy(s->coder);
        }
        s->coder = PrrtCoder_create(codingParams);
    }
    return PrrtCoder_copy(s->coder);
547
548
549
550
551
552
553
554
555
};

char *PrrtSocket_inet_ntoa(struct in_addr* in) {
    return inet_ntoa(*in);
}

uint16_t PrrtSocket_ntohs(uint16_t v) {
    return ntohs(v);
}