socket.c 21.2 KB
Newer Older
1
#include <arpa/inet.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
2
#include <math.h>
3
4
5
6
7
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
8
9
#include <net/if.h>
#include <linux/net_tstamp.h>
10
#include <assert.h>
11
#include <sys/ioctl.h>
12
13
14
15
#include "../defines.h"
#include "packet.h"
#include "../util/dbg.h"
#include "../util/common.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
16
17
#include "processes/dataTransmitter.h"
#include "processes/dataReceiver.h"
18
#include "processes/feedbackReceiver.h"
19
#include "stores/deliveredPacketTable.h"
20
#include "types/packetTimeout.h"
21
#include "socket.h"
22

23
/**
 * Hand a packet taken from the delivery store over to the application.
 *
 * Copies the packet payload (past the data header) into `buffer`, records the
 * XLAP timestamps around the copy, and destroys the packet afterwards.
 *
 * @param s       socket the packet belongs to (used for timestamping)
 * @param buffer  destination for the payload bytes
 * @param packet  packet to deliver; may be NULL
 * @return payload length without the data header, or 0 if `packet` is NULL
 */
static inline prrtPacketLength_t deliver_packet(const PrrtSocket *s, void *buffer, PrrtPacket *packet) {
    if (packet == NULL) {
        return 0;
    }

    XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);

    prrtPacketLength_t payload_size = (prrtPacketLength_t) (packet->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE);

    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, CopyOutputStart);
    PrrtPacket_copy_payload_to_buffer(buffer, packet, PRRT_PACKET_DATA_HEADER_SIZE);
    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, CopyOutputEnd);

    XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtDeliver);

    // The sequence number is read by the timestamp calls above, so the packet
    // must only be destroyed once they are done.
    PrrtPacket_destroy(packet);

    return payload_size;
}

/**
 * Compute an absolute CLOCK_REALTIME deadline that lies `wait_time`
 * microseconds in the future.
 *
 * @param wait_time  relative wait time in microseconds
 * @return normalized timespec (tv_nsec always below one second)
 */
struct timespec abstime_from_now(prrtTimedelta_t wait_time) {
    const long nanoseconds_per_second = 1000000000L;

    struct timespec now;
    clock_gettime(CLOCK_REALTIME, &now);

    struct timespec deadline;
    deadline.tv_sec = now.tv_sec + wait_time / 1000000;
    deadline.tv_nsec = now.tv_nsec + (long) (wait_time % 1000000) * 1000;

    // Both summands are below one second, so at most one carry is required.
    // Without it tv_nsec may exceed 999999999, which timed waits reject.
    if (deadline.tv_nsec >= nanoseconds_per_second) {
        deadline.tv_nsec -= nanoseconds_per_second;
        deadline.tv_sec += 1;
    }
    return deadline;
}

53
/**
 * Allocate and initialize a PRRT socket.
 *
 * Creates the data and feedback UDP sockets, enables SO_BROADCAST,
 * SO_REUSEADDR and SO_REUSEPORT on both, and allocates the stores needed for
 * the chosen role (send queue for senders; delivery/repair/reception tables
 * for receivers).
 *
 * @param is_sender        true to create a sending socket, false for a receiver
 * @param target_delay_us  application target delay; must be below HALF_TIMESTAMP
 * @return the new socket, or NULL on failure (all partial state is released)
 */
PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay_us) {
    assert(sizeof(float) == 4);

    PrrtSocket *s = (PrrtSocket *) calloc(1, sizeof(PrrtSocket));
    check_mem(s);

    // calloc() leaves the descriptors at 0, which is a valid descriptor
    // (stdin). Mark them as "not opened" until socket() succeeds.
    s->dataSocketFd = -1;
    s->feedbackSocketFd = -1;

    s->isSender = is_sender;
    s->isHardwareTimestamping = false;
    s->interfaceName = NULL;

    s->isThreadPinning = false;

    PrrtClock_init(&s->clock);

    s->isBound = false;
    s->receiver = NULL;

    s->codingParameters = PrrtCodingParams_create();

    s->sequenceNumberSource = 1;
    s->sequenceNumberRedundancy = 1;

    check(target_delay_us < HALF_TIMESTAMP, "Specify target delay between 0 and %i", HALF_TIMESTAMP-1);
    s->applicationConstraints = PrrtApplicationConstraints_create(target_delay_us);

    s->packetTimeoutTable = PrrtPacketTimeoutTable_create();

    s->dataPacketStore = PrrtDataPacketStore_create();

    int enabled = 1;

    // socket() reports failure with -1 (which is truthy) and may legitimately
    // return descriptor 0, so the result has to be compared against 0.
    check((s->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0)) >= 0, "Cannot create data socket.");
    check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_BROADCAST, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");
    check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");
    check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_REUSEPORT, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");

    check((s->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0)) >= 0, "Cannot create feedback socket.");
    check(setsockopt(s->feedbackSocketFd, SOL_SOCKET, SO_BROADCAST, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");
    check(setsockopt(s->feedbackSocketFd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");
    check(setsockopt(s->feedbackSocketFd, SOL_SOCKET, SO_REUSEPORT, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");

    if (is_sender) {
        s->sendDataQueue = MPSCQueue_create();
    } else {
        s->deliveredPacketTable = PrrtDeliveredPacketTable_create();
        s->repairBlockStore = PrrtRepairBlockStore_create();

        s->packetDeliveryStore = PrrtPacketDeliveryStore_create();

        s->dataReceptionTable = PrrtReceptionTable_create();
        s->redundancyReceptionTable = PrrtReceptionTable_create();
    }

    return s;

    error:
    // s is NULL when the initial allocation failed; nothing to tear down then.
    if (s != NULL) {
        PrrtSocket_close(s);
        // PrrtSocket_close() releases the members but not the socket itself,
        // and the caller never sees the pointer on this path.
        free(s);
    }
    return NULL;
}

119
120
121
122
123
124
125
126
127
128
/**
 * Request that the worker threads started by PrrtSocket_bind() are pinned to
 * fixed cores.
 *
 * @param s  socket to configure; must not be bound yet
 * @return true when the flag was set, false if the socket is already bound
 */
bool PrrtSocket_enable_thread_pinning(PrrtSocket *s) {
    // The flag is only evaluated while binding, so changing it later is rejected.
    check(!s->isBound, "Thread pinning can only be enabled before calling PrrtSocket_bind()");

    s->isThreadPinning = true;
    return true;

    error:
    PERROR("PrrtSocket_enable_thread_pinning() failed.%s", "");
    return false;
}

129
130
131
132
133
134
135
136
137
138
139
140
/**
 * Enable hardware timestamping on the given network interface.
 *
 * The interface name is copied; the socket owns the copy. Calling this
 * function again replaces (and frees) the previously stored name.
 *
 * @param s               socket to configure; must not be bound yet
 * @param interface_name  name of the network interface, must not be NULL
 * @return true on success; false if the socket is already bound, the name is
 *         NULL, or the name could not be copied (socket state is unchanged)
 */
bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interface_name) {
    char *name_copy = NULL;

    check(s->isBound == false, "Hardware timestamping can only be enabled before calling PrrtSocket_bind()");
    check(interface_name != NULL, "Interface name must not be NULL.");

    name_copy = strdup(interface_name);
    check_mem(name_copy);

    // Release a name stored by an earlier call before taking the new one.
    free(s->interfaceName);
    s->interfaceName = name_copy;
    s->isHardwareTimestamping = true;
    return true;

    error:
    PERROR("PrrtSocket_enable_hardware_timestamping() failed.%s", "");
    return false;
}

/**
 * Bind the socket and start its worker threads.
 *
 * The feedback socket is bound to `port + 1`, the data socket to `port`.
 * If hardware timestamping was enabled, the interface is configured through
 * SIOCSHWTSTAMP and the matching timestamp socket option is set. Senders get
 * a feedback-receiving and a data-sending thread, receivers a data-receiving
 * thread; with thread pinning enabled they are pinned to cores 1, 2 and 3
 * respectively.
 *
 * @param s          socket to bind
 * @param ipAddress  IPv4 address in dotted notation
 * @param port       data port; must be at most 65534 because port + 1 is used too
 * @return true on success; on failure the socket is closed via
 *         PrrtSocket_close() and false is returned
 */
bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port) {
    check(port <= 65534, "Port %d cannot be bound to.", port);

    size_t size = sizeof(struct sockaddr_in);
    struct sockaddr_in *address = calloc(1, size);
    check_mem(address);

    address->sin_family = AF_INET;
    address->sin_addr.s_addr = inet_addr(ipAddress);
    address->sin_port = htons((uint16_t) (port + 1));
    s->address = address;

    check(bind(s->feedbackSocketFd, (struct sockaddr *) address, size) == EXIT_SUCCESS,
          "Cannot bind feedback socket.");

    // Reuse the same address structure for the data socket.
    address->sin_port = htons((uint16_t) (port));

    if (s->isHardwareTimestamping) {
        struct ifreq hwtstamp;
        struct hwtstamp_config hwconfig, hwconfig_requested;

        char *interface = s->interfaceName;
        check(interface != NULL, "No interface name set for hardware timestamping.");

        // Configure HW Timestamping
        memset(&hwtstamp, 0, sizeof(hwtstamp));
        // Copy at most size - 1 bytes: together with the memset above this
        // guarantees that ifr_name stays NUL-terminated for long names.
        strncpy(hwtstamp.ifr_name, interface, sizeof(hwtstamp.ifr_name) - 1);
        hwtstamp.ifr_data = (void *) &hwconfig;
        memset(&hwconfig, 0, sizeof(hwconfig));
        hwconfig.flags = 0;
        hwconfig.tx_type = HWTSTAMP_TX_ON;
        hwconfig.rx_filter = HWTSTAMP_FILTER_ALL;
        hwconfig_requested = hwconfig;
        if (ioctl(s->dataSocketFd, SIOCSHWTSTAMP, &hwtstamp) < 0) {
            check(errno != EPERM, "Insufficient permissions. Run in privileged mode.");

            if ((errno == EINVAL || errno == ENOTSUP) && hwconfig_requested.tx_type == HWTSTAMP_TX_OFF &&
                hwconfig_requested.rx_filter == HWTSTAMP_FILTER_NONE) {
                debug(DEBUG_ALL, "Hardware timestamping not possible.");
                goto error; // TODO: Provide nice handling.
            } else {
                debug(DEBUG_ALL, "Failed SIOCSHWTSTAMP: %d.", errno);
                goto error; // TODO: Provide nice handling.
            }
        }
    }

    check(bind(s->dataSocketFd, (struct sockaddr *) address, size) == EXIT_SUCCESS,
          "Cannot bind data socket.");

    if (s->isHardwareTimestamping) {
        if (!s->isSender) {
            int enabled = 1;
            check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_TIMESTAMPNS, &enabled, sizeof(enabled)) >= 0,
                  "Could not set SO_TIMESTAMPNS");
            // Read the option back to verify it; the option value is an int.
            int val = 0;
            socklen_t len = sizeof(val);
            check(getsockopt(s->dataSocketFd, SOL_SOCKET, SO_TIMESTAMPNS, &val, &len) >= 0, "%s: %s\n",
                  "Getsockopt SO_TIMESTAMPNS", strerror(errno));
        } else {
            int enabled = SOF_TIMESTAMPING_TX_HARDWARE | SOF_TIMESTAMPING_SYS_HARDWARE | SOF_TIMESTAMPING_RAW_HARDWARE;
            check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_TIMESTAMPING, &enabled, sizeof(enabled)) >= 0,
                  "Could not set SO_TIMESTAMPING");
        }
    }

    if (s->isSender) {
        s->receiveFeedbackThreadAttr = calloc(1, sizeof(pthread_attr_t));
        check_mem(s->receiveFeedbackThreadAttr);
        check(pthread_attr_init(s->receiveFeedbackThreadAttr) == 0,
              "Cannot initialize receive feedback thread attributes.");

        s->sendDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
        check_mem(s->sendDataThreadAttr);
        check(pthread_attr_init(s->sendDataThreadAttr) == 0, "Cannot initialize send thread attributes.");

        if (s->isThreadPinning) {
            pin_thread_to_core(s->receiveFeedbackThreadAttr, 1);
            pin_thread_to_core(s->sendDataThreadAttr, 2);
        }

        check(pthread_create(&s->receiveFeedbackThread, s->receiveFeedbackThreadAttr,
                             receive_feedback_loop,
                             (void *) s) ==
              EXIT_SUCCESS, "Cannot create receive feedback thread.");
        check(pthread_create(&s->sendDataThread, s->sendDataThreadAttr, send_data_loop,
                             (void *) s) ==
              EXIT_SUCCESS,
              "Cannot create send thread.");
    } else {
        s->receiveDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
        check_mem(s->receiveDataThreadAttr);
        check(pthread_attr_init(s->receiveDataThreadAttr) == 0,
              "Cannot initialize data receiving thread attributes.");

        if (s->isThreadPinning) {
            pin_thread_to_core(s->receiveDataThreadAttr, 3);
        }

        check(pthread_create(&s->receiveDataThread, s->receiveDataThreadAttr, receive_data_loop,
                             (void *) s) ==
              EXIT_SUCCESS,
              "Cannot create data receiving thread.");
    }

    s->isBound = true;

    return true;

    error:
    PrrtSocket_close(s);
    return false;
}

239
/**
 * Set the remote endpoint of the socket.
 *
 * A receiver object stored by an earlier call is destroyed before the new one
 * is created from `host` and `port`.
 *
 * @param s     socket to configure
 * @param host  remote host, passed on to PrrtReceiver_create()
 * @param port  remote port, passed on to PrrtReceiver_create()
 * @return always 0
 */
int PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port) {
    // Drop the previous endpoint, if there is one.
    if (s->receiver) {
        PrrtReceiver_destroy(s->receiver);
    }

    s->receiver = PrrtReceiver_create(host, port);

    return 0;
}

247
248
/**
 * Queue application data for transmission.
 *
 * Wraps `data` in a data packet carrying the next source sequence number and
 * the current target delay, then pushes it onto the send queue.
 *
 * @param s         sender socket
 * @param data      payload bytes
 * @param data_len  number of payload bytes
 * @return 0 on success, -1 if the socket is not a sender or no packet could
 *         be created
 */
int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
    check(s->isSender, "Cannot send on receiver socket.");

    // The sequence number is not known yet, so timestamps are collected in a
    // placeholder and attached to the packet once it exists.
    XlapTimestampPlaceholder tsph;
    XlapTimestampPlaceholderInitialize(&tsph);
    XlapTimeStampClock(&tsph, ts_any_packet, 0, PrrtSendStart);
    XlapTimeStampCycle(&tsph, ts_any_packet, 0, PrrtSendStart);

    prrtSequenceNumber_t sequenceNumber = s->sequenceNumberSource++;

    // NOTE(review): meaning of the literal first argument (5) is not visible
    // here - confirm against PrrtPacket_create_data_packet().
    PrrtPacket *packet = PrrtPacket_create_data_packet(5, data, (prrtPacketLength_t) data_len, sequenceNumber,
                                                       PrrtApplicationConstraints_get_target_delay(
                                                               s->applicationConstraints));
    // Defensive: never dereference a packet that could not be created.
    check(packet != NULL, "Could not create data packet.");

    XlapTimestampPlaceholderUse(s, ts_data_packet, packet->sequenceNumber, &tsph);

    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSubmitPackage);
    MPSCQueue_push(s->sendDataQueue, &packet->asListNode);

    XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
    return 0;

    error:
    PERROR("There was a failure while sending from socket.%s", "");
    return -1;
}

270
271
/**
 * Report whether the closing flag of the socket has been set.
 *
 * @param s  socket to query
 * @return value of the `closing` flag, read with acquire ordering
 */
bool PrrtSocket_closing(PrrtSocket *s) {
    const bool is_closing = atomic_load_explicit(&s->closing, memory_order_acquire);
    return is_closing;
}

274
275
276
277
278
/**
 * Deliver a packet from the delivery store without waiting.
 *
 * The store is queried with the full timestamp range (0 .. MAX_TIMESTAMP).
 *
 * @param s        receiver socket
 * @param buf_ptr  buffer the payload is copied into
 * @return payload length; 0 if the store returned no packet; -1 if `s` is a
 *         sender socket
 */
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr) {
    check(s->isSender == false, "Cannot receive on sender socket.");

    PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
    return deliver_packet(s, buf_ptr, packet);

    error:
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}

/**
 * Deliver a packet from the delivery store, blocking until one is available
 * or the socket is being closed.
 *
 * @param s        receiver socket
 * @param buf_ptr  buffer the payload is copied into
 * @return payload length, or -1 if `s` is a sender socket or the socket is
 *         closing
 */
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr) {
    check(s->isSender == false, "Cannot receive on sender socket.");

    PrrtPacket *packet;
    do {
        packet = PrrtPacketDeliveryStore_get_packet_wait(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
        if (PrrtSocket_closing(s)) {
            // A packet handed out by the store is owned by this function;
            // release it instead of leaking it when delivery is abandoned.
            if (packet != NULL) {
                PrrtPacket_destroy(packet);
            }
            return -1;
        }
    } while (!packet);

    return deliver_packet(s, buf_ptr, packet);

    error:
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}

/**
 * Deliver a packet from the delivery store, waiting at most until `deadline`.
 *
 * The store is queried with the full timestamp range (0 .. MAX_TIMESTAMP).
 *
 * @param s         receiver socket
 * @param buf_ptr   buffer the payload is copied into
 * @param deadline  deadline passed on to the delivery store
 * @return payload length; 0 if the store returned no packet; -1 if `s` is a
 *         sender socket
 */
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct timespec* deadline) {
    check(s->isSender == false, "Cannot receive on sender socket.")

    PrrtPacket *received;
    received = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, 0, MAX_TIMESTAMP, deadline);
    return deliver_packet(s, buf_ptr, received);

    error:
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}

316
317
318
319
320
321
322
/**
 * Blocking receive; forwards to PrrtSocket_receive_asap_wait().
 *
 * @param s        receiver socket
 * @param buf_ptr  buffer the payload is copied into
 * @return result of PrrtSocket_receive_asap_wait()
 */
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr) {
    const int32_t received = PrrtSocket_receive_asap_wait(s, buf_ptr);
    return received;
}

/**
 * Deliver a packet within a time window around the current time, without
 * waiting.
 *
 * The delivery store is queried with the range
 * [now - time_window_us, now + time_window_us].
 *
 * @param s               receiver socket
 * @param buf_ptr         buffer the payload is copied into
 * @param time_window_us  half-width of the window in microseconds
 * @return payload length; 0 if the store returned no packet; -1 if `s` is a
 *         sender socket
 */
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) {
    check(s->isSender == false, "Cannot receive on sender socket.");

    prrtTimestamp_t now = PrrtClock_get_current_time_us();
    PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, now - time_window_us,
                                                            now + time_window_us);
    return deliver_packet(s, buf_ptr, packet);

    error:
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}

334
335
336
337
338
339
340
341
/**
 * Deliver a packet within a time window, retrying until a packet is available
 * or the socket is being closed.
 *
 * Each attempt recomputes the window [now - time_window_us, now + time_window_us]
 * and waits for at most `time_window_us` microseconds.
 *
 * @param s               receiver socket
 * @param buf_ptr         buffer the payload is copied into
 * @param time_window_us  half-width of the window and per-attempt wait time,
 *                        in microseconds
 * @return payload length, or -1 if `s` is a sender socket or the socket is
 *         closing
 */
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) {
    check(s->isSender == false, "Cannot receive on sender socket.");

    PrrtPacket *packet;
    do {
        prrtTimestamp_t now = PrrtClock_get_current_time_us();
        struct timespec deadline = abstime_from_now(time_window_us);

        packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now - time_window_us,
                                                              now + time_window_us, &deadline);
        if (PrrtSocket_closing(s)) {
            // A packet handed out by the store is owned by this function;
            // release it instead of leaking it when delivery is abandoned.
            if (packet != NULL) {
                PrrtPacket_destroy(packet);
            }
            return -1;
        }
    } while (!packet);

    return deliver_packet(s, buf_ptr, packet);

    error:
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}

/**
 * Deliver a packet within a time window around the current time, waiting at
 * most until `deadline`.
 *
 * @param s               receiver socket
 * @param buf_ptr         buffer the payload is copied into
 * @param time_window_us  half-width of the window in microseconds
 * @param deadline        deadline passed on to the delivery store
 * @return payload length; 0 if the store returned no packet; -1 if `s` is a
 *         sender socket
 */
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us, struct timespec* deadline) {
    check(s->isSender == false, "Cannot receive on sender socket.")

    // Window is centred on the time of the call.
    prrtTimestamp_t now = PrrtClock_get_current_time_us();

    PrrtPacket *received;
    received = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore,
                                                            now - time_window_us,
                                                            now + time_window_us,
                                                            deadline);
    return deliver_packet(s, buf_ptr, received);

    error:
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}

369

370
371
/**
 * Signal shutdown to the socket and stop its worker threads.
 *
 * Sets the closing flag, wakes waiters on the packet delivery store, then
 * joins the send thread, cancels and joins the data-receive thread, and joins
 * the feedback-receive thread. Each stopped thread has its attribute object
 * destroyed and its handle reset to 0.
 *
 * @param s  socket to interrupt
 * @return EXIT_SUCCESS, or EXIT_FAILURE if cancelling or joining a thread failed
 */
int PrrtSocket_interrupt(PrrtSocket *s) {
    atomic_store_explicit(&s->closing, true, memory_order_release);

    if (s->packetDeliveryStore) {
        PrrtPacketDeliveryStore_wake(s->packetDeliveryStore);
    }

    // Thread return values are not of interest, so no result pointer is passed.
    if (s->sendDataThread != 0) {
        check(pthread_join(s->sendDataThread, NULL) == 0, "Join failed.");
        pthread_attr_destroy(s->sendDataThreadAttr);
        s->sendDataThread = 0;
    }

    if (s->receiveDataThread != 0) {
        check(pthread_cancel(s->receiveDataThread) == 0, "Cancel failed.");
        check(pthread_join(s->receiveDataThread, NULL) == 0, "Join failed.");
        pthread_attr_destroy(s->receiveDataThreadAttr);
        s->receiveDataThread = 0;
    }

    if (s->receiveFeedbackThread != 0) {
        check(pthread_join(s->receiveFeedbackThread, NULL) == 0, "Join failed.");
        pthread_attr_destroy(s->receiveFeedbackThreadAttr);
        s->receiveFeedbackThread = 0;
    }

    return EXIT_SUCCESS;

    error:
    return EXIT_FAILURE;
}

404
/**
 * Stop the socket's threads and release everything the socket owns.
 *
 * If the closing flag is not set yet, PrrtSocket_interrupt() is called first.
 * Afterwards all stores, tables, the receiver, the bound address, the
 * interface name, the thread attribute objects and both file descriptors are
 * released. Every released member is reset (NULL / -1), so a repeated call
 * does not free or close anything twice. The PrrtSocket structure itself is
 * not freed.
 *
 * @param s  socket to close
 * @return 0 on success, -1 if interrupting or destroying a member failed
 */
int PrrtSocket_close(PrrtSocket *s) {
    debug(DEBUG_SOCKET, "Closing socket.");
    if (!atomic_load_explicit(&s->closing, memory_order_acquire)) {
        check(PrrtSocket_interrupt(s) == EXIT_SUCCESS, "Interrupt failed.");
    }

    if (s->dataPacketStore != NULL) {
        check(PrrtDataPacketStore_destroy(s->dataPacketStore), "Destroy failed.");
        s->dataPacketStore = NULL;
    }

    if (s->repairBlockStore != NULL) {
        PrrtRepairBlockStore_destroy(s->repairBlockStore);
        s->repairBlockStore = NULL;
    }

    if (s->receiver != NULL) {
        PrrtReceiver_destroy(s->receiver);
        s->receiver = NULL;
    }

    if (s->sendDataQueue != NULL) {
        MPSCQueue_destroy(s->sendDataQueue);
        s->sendDataQueue = NULL;
    }

    if (s->dataReceptionTable != NULL) {
        PrrtReceptionTable_destroy(s->dataReceptionTable);
        s->dataReceptionTable = NULL;
    }

    if (s->redundancyReceptionTable != NULL) {
        PrrtReceptionTable_destroy(s->redundancyReceptionTable);
        s->redundancyReceptionTable = NULL;
    }

    if (s->packetDeliveryStore != NULL) {
        PrrtPacketDeliveryStore_destroy(s->packetDeliveryStore);
        s->packetDeliveryStore = NULL;
    }

    if (s->deliveredPacketTable != NULL) {
        check(PrrtDeliveredPacketTable_destroy(s->deliveredPacketTable), "Destroy failed.");
        s->deliveredPacketTable = NULL;
    }

    if (s->packetTimeoutTable != NULL) {
        check(PrrtPacketTimeoutTable_destroy(s->packetTimeoutTable), "Destroy failed.");
        s->packetTimeoutTable = NULL;
    }

    // free(NULL) is a no-op, so plain heap members need no guard.
    free(s->address);
    s->address = NULL;

    // Copy made by PrrtSocket_enable_hardware_timestamping().
    free(s->interfaceName);
    s->interfaceName = NULL;

    if (s->applicationConstraints != NULL) {
        check(PrrtApplicationConstraints_destroy(s->applicationConstraints),
              "Could not destroy application constraints.");
        s->applicationConstraints = NULL;
    }

    if (s->codingParameters != NULL) {
        check(PrrtCodingParams_destroy(s->codingParameters), "Could not destroy coding parameters.");
        s->codingParameters = NULL;
    }

    free(s->sendDataThreadAttr);
    s->sendDataThreadAttr = NULL;

    free(s->receiveDataThreadAttr);
    s->receiveDataThreadAttr = NULL;

    free(s->receiveFeedbackThreadAttr);
    s->receiveFeedbackThreadAttr = NULL;

    // Only close descriptors that are (still) open, and remember that they
    // are gone so a second close cannot hit an unrelated descriptor.
    if (s->dataSocketFd >= 0) {
        close(s->dataSocketFd);
        s->dataSocketFd = -1;
    }
    if (s->feedbackSocketFd >= 0) {
        close(s->feedbackSocketFd);
        s->feedbackSocketFd = -1;
    }

    debug(DEBUG_SOCKET, "Socket closed.");
    return 0;

    error:
    PERROR("Closing socket failed.%s", "");
    return -1;
}

490
/**
 * Read a socket option by name.
 *
 * Only "targetdelay" is supported; it yields the target delay stored in the
 * application constraints.
 *
 * @param s     socket to query
 * @param name  option name
 * @return the option value, or 0 (after logging an error) for unknown names
 */
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *s, const char *name) {
    if (strcmp(name, "targetdelay") != 0) {
        PERROR("Unknwon property %s", name);
        return 0;
    }

    return PrrtApplicationConstraints_get_target_delay(s->applicationConstraints);
}

499
/**
 * Set a socket option by name.
 *
 * Only "app_queue_size" is supported; it updates the application queue size
 * in the application constraints.
 *
 * @param s      socket to configure
 * @param name   option name
 * @param value  new option value
 * @return true if the option was applied, false for unknown names
 */
bool PrrtSocket_set_sock_opt(PrrtSocket *s, const char *name, const uint32_t value) {
    // strcmp() returns 0 on equality; testing its raw result would apply the
    // value for every name except the intended one.
    if (strcmp(name, "app_queue_size") != 0) {
        return false;
    }

    PrrtApplicationConstraints_set_app_queue_size(s->applicationConstraints, value);
    // TODO: MPSC_Queue does not provide a size.
    return true;
}
509

510
511
/**
 * Update the coding parameters of the socket.
 *
 * @param s  socket to configure
 * @param k  first value handed to PrrtCodingParams_update()
 * @param n  second value handed to PrrtCodingParams_update()
 * @return always true
 */
bool PrrtSocket_set_coding_parameters(PrrtSocket *s, uint8_t k, uint8_t n) {
    PrrtCodingParams_update(s->codingParameters, k, n);

    return true;
}
514

515
516
517
518
/**
 * Obtain the socket's coding parameters.
 *
 * @param s  socket to query
 * @return result of PrrtCodingParams_copy() on the socket's parameters
 */
PrrtCodingParams *PrrtSocket_get_coding_parameters(PrrtSocket *s) {
    PrrtCodingParams *snapshot = PrrtCodingParams_copy(s->codingParameters);
    return snapshot;
}

519

520
521
/**
 * Housekeeping for receiver sockets; a no-op for senders.
 *
 * For receivers this
 *  - expires timed-out packets, forwards the delivered-packet table, expires
 *    the matching repair blocks and removes/destroys the matching data packets,
 *  - expires repair blocks in the half of the sequence-number space preceding
 *    the delivered-packet table's start,
 *  - recomputes the loss statistics from both reception tables.
 *
 * @param s  socket to clean up
 * @return always true
 */
bool PrrtSocket_cleanup(PrrtSocket *s) {
    // Senders have nothing to clean up here.
    if (s->isSender) {
        return true;
    }

    if (s->packetTimeoutTable != NULL) {
        List *expired = PrrtPacketTimeoutTable_expire_packets(s->packetTimeoutTable,
                                                              PrrtClock_get_prrt_time_us(&s->clock));
        uint32_t expired_count = List_count(expired);

        if (expired_count > 0) {
            PrrtPacketTimeout *head = List_first(expired);
            prrtSequenceNumber_t range_begin = (prrtSequenceNumber_t) (head->sequenceNumber -
                                                                       head->index - SEQNO_SPACE / 2);

            PrrtPacketTimeout *tail = List_last(expired);
            prrtSequenceNumber_t range_end = (prrtSequenceNumber_t) (tail->sequenceNumber -
                                                                     tail->index - 1);

            PrrtDeliveredPacketTable_forward_start(s->deliveredPacketTable, range_end);

            PrrtRepairBlockStore_expire_block_range(s->repairBlockStore, range_begin, range_end);

            // Collect the data packets that fall into the expired range and
            // destroy them one by one.
            List *removed = List_create();
            PrrtDataPacketStore_remove_range(s->dataPacketStore, removed, range_begin,
                                             tail->sequenceNumber);
            while (List_count(removed) > 0) {
                PrrtPacket_destroy((PrrtPacket *) List_shift(removed));
            }
            List_destroy(removed);

            // The timeout entries themselves are no longer needed either.
            while (List_count(expired) > 0) {
                PrrtPacketTimeout_destroy((PrrtPacketTimeout *) List_shift(expired));
            }
        }
        List_destroy(expired);
    }

    if (s->deliveredPacketTable != NULL) {
        prrtSequenceNumber_t table_start = s->deliveredPacketTable->start;
        PrrtRepairBlockStore_expire_block_range(s->repairBlockStore,
                                                (prrtSequenceNumber_t) (table_start - SEQNO_SPACE/2),
                                                (prrtSequenceNumber_t) (table_start - 1));
    }

    s->lossStatistics = PrrtLossStatistics_add(PrrtReceptionTable_calculate_statistics(s->dataReceptionTable), PrrtReceptionTable_calculate_statistics(s->redundancyReceptionTable));

    return true;
}
570

571
/**
 * Report whether thread pinning has been enabled for the socket.
 *
 * @param s  socket to query
 * @return value of the `isThreadPinning` flag, read with acquire ordering
 */
bool PrrtSocket_uses_thread_pinning(PrrtSocket *s) {
    const bool pinning_enabled = atomic_load_explicit(&s->isThreadPinning, memory_order_acquire);
    return pinning_enabled;
}
574

575
576
/**
 * Query the "rtprop" value from the receiver's channel state information.
 *
 * Dereferences `s->receiver`, which is set by PrrtSocket_connect().
 *
 * @param s  socket to query
 * @return result of PrrtChannelStateInformation_get_rtprop()
 */
uint32_t PrrtSocket_get_rtprop(PrrtSocket *s) {
    const uint32_t rtprop = PrrtChannelStateInformation_get_rtprop(s->receiver->csi);
    return rtprop;
}
Andreas Schmidt's avatar
Andreas Schmidt committed
578
579
580
581

/**
 * Query the "plr" value from the receiver's channel state information.
 *
 * Dereferences `socket->receiver`, which is set by PrrtSocket_connect().
 *
 * @param socket  socket to query
 * @return result of PrrtChannelStateInformation_get_plr()
 */
prrtPacketLossRate_t PrrtSocket_get_plr(PrrtSocket *socket) {
    prrtPacketLossRate_t plr = PrrtChannelStateInformation_get_plr(socket->receiver->csi);
    return plr;
}