socket.c 21.2 KB
Newer Older
1
#include <arpa/inet.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
2
#include <math.h>
3
4
5
6
7
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
8
9
#include <net/if.h>
#include <linux/net_tstamp.h>
10
#include <assert.h>
11
#include <sys/ioctl.h>
12
13
14
15
#include "../defines.h"
#include "packet.h"
#include "../util/dbg.h"
#include "../util/common.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
16
17
#include "processes/dataTransmitter.h"
#include "processes/dataReceiver.h"
18
#include "processes/feedbackReceiver.h"
19
#include "stores/deliveredPacketTable.h"
20
#include "types/packetTimeout.h"
21
#include "socket.h"
22

23
static inline prrtPacketLength_t deliver_packet(const PrrtSocket *s, void *buffer, PrrtPacket *packet) {
24
    prrtPacketLength_t len = 0;
25
26
27
    if(packet != NULL) {
        XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
        XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
28

29
30
31
32
        len = (prrtPacketLength_t) (packet->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE);
        XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, CopyOutputStart);
        PrrtPacket_copy_payload_to_buffer(buffer, packet, PRRT_PACKET_DATA_HEADER_SIZE);
        XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, CopyOutputEnd);
33

34
35
36
37
        XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
        XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
        PrrtPacket_destroy(packet);
    }
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
    return len;
}

struct timespec abstime_from_now(prrtTimedelta_t wait_time) {
    struct timespec now;
    clock_gettime(CLOCK_REALTIME, &now);

    struct timespec deadline;
    prrtTimedelta_t diff_s = wait_time / 1000000;
    prrtTimedelta_t diff_ns = (wait_time % 1000000) * 1000;
    deadline.tv_sec = diff_s + now.tv_sec;
    deadline.tv_nsec = diff_ns + now.tv_nsec;
    return deadline;
}

53
PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay_us) {
54
    assert(sizeof(float) == 4);
55
56
    PrrtSocket *s = (PrrtSocket *) calloc(1, sizeof(PrrtSocket));
    check_mem(s);
57

58

59
60
    s->isSender = is_sender;
    s->isHardwareTimestamping = false;
61
    s->interfaceName = NULL;
62

63
64
    s->isThreadPinning = false;

65
    PrrtClock_init(&s->clock);
66

67
    s->isBound = false;
68
    s->receiver = NULL;
69

70
    s->codingParameters = PrrtCodingParams_create();
71

72
73
    s->sequenceNumberSource = 1;
    s->sequenceNumberRedundancy = 1;
74
    s->sequenceNumberFeedback = 1;
75

76

77
    check(target_delay_us < HALF_TIMESTAMP, "Specify target delay between 0 and %i", HALF_TIMESTAMP-1)
78
    s->applicationConstraints = PrrtApplicationConstraints_create(target_delay_us);
79

80
    s->packetTimeoutTable = PrrtPacketTimeoutTable_create();
81

82
    s->dataPacketStore = PrrtDataPacketStore_create();
83

84
85
    int enabled = 1;

86
87
    check(s->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.");
    check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_BROADCAST, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
88
          "Socket option set failed.");
89
    check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
90
          "Socket option set failed.");
91
    check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_REUSEPORT, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
92
          "Socket option set failed.");
93
94
    check(s->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create feedback socket.");
    check(setsockopt(s->feedbackSocketFd, SOL_SOCKET, SO_BROADCAST, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
95
          "Socket option set failed.");
96
    check(setsockopt(s->feedbackSocketFd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
97
          "Socket option set failed.");
98
    check(setsockopt(s->feedbackSocketFd, SOL_SOCKET, SO_REUSEPORT, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
99
100
          "Socket option set failed.");

Andreas Schmidt's avatar
Andreas Schmidt committed
101
    if (is_sender) {
Andreas Schmidt's avatar
Andreas Schmidt committed
102
        s->sendDataQueue = MPSCQueue_create();
103
    } else {
104
        s->deliveredPacketTable = PrrtDeliveredPacketTable_create();
105
        s->repairBlockStore = PrrtRepairBlockStore_create();
106

107
        s->packetDeliveryStore = PrrtPacketDeliveryStore_create();
Andreas Schmidt's avatar
Andreas Schmidt committed
108
109
110

        s->dataReceptionTable = PrrtReceptionTable_create();
        s->redundancyReceptionTable = PrrtReceptionTable_create();
111
112
    }

113
    return s;
114
115

    error:
116
    PrrtSocket_close(s);
117
    return NULL;
118
119
}

120
121
122
123
124
125
126
127
128
129
bool PrrtSocket_enable_thread_pinning(PrrtSocket *s) {
    check(s->isBound == false, "Thread pinning can only be enabled before calling PrrtSocket_bind()");
    s->isThreadPinning = true;
    return true;

    error:
    PERROR("PrrtSocket_enable_thread_pinning() failed.%s", "");
    return false;
}

130
131
132
133
134
135
136
137
138
139
140
141
bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interface_name) {
    check(s->isBound == false, "Hardware timestamping can only be enabled before calling PrrtSocket_bind()");
    s->isHardwareTimestamping = true;
    s->interfaceName = strdup(interface_name);
    return true;

    error:
    PERROR("PrrtSocket_enable_hardware_timestamping() failed.%s", "");
    return false;
}

bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port) {
Andreas Schmidt's avatar
Andreas Schmidt committed
142
143
144
    check(port <= 65534, "Port %d cannot be bound to.", port);

    size_t size = sizeof(struct sockaddr_in);
145
    struct sockaddr_in *address = calloc(1, size);
Andreas Schmidt's avatar
Andreas Schmidt committed
146
147
148
149
150
    check_mem(address);

    address->sin_family = AF_INET;
    address->sin_addr.s_addr = inet_addr(ipAddress);
    address->sin_port = htons((uint16_t) (port + 1));
151
    s->address = address;
Andreas Schmidt's avatar
Andreas Schmidt committed
152

153
    check(bind(s->feedbackSocketFd, (struct sockaddr *) address, size) == EXIT_SUCCESS,
Andreas Schmidt's avatar
Andreas Schmidt committed
154
155
156
157
          "Cannot bind feedback socket.");

    address->sin_port = htons((uint16_t) (port));

158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
    if(s->isHardwareTimestamping) {
        struct ifreq hwtstamp;
        struct hwtstamp_config hwconfig, hwconfig_requested;

        char *interface = s->interfaceName;
        // Configure HW Timestamping
        memset(&hwtstamp, 0, sizeof(hwtstamp));
        strncpy(hwtstamp.ifr_name, interface, sizeof(hwtstamp.ifr_name));
        hwtstamp.ifr_data = (void *) &hwconfig;
        memset(&hwconfig, 0, sizeof(hwconfig));
        hwconfig.flags = 0;
        hwconfig.tx_type = HWTSTAMP_TX_ON;
        hwconfig.rx_filter = HWTSTAMP_FILTER_ALL;
        hwconfig_requested = hwconfig;
        if(ioctl(s->dataSocketFd, SIOCSHWTSTAMP, &hwtstamp) < 0) {
            check(errno != EPERM, "Insufficient permissions. Run in privileged mode.")

            if((errno == EINVAL || errno == ENOTSUP) && hwconfig_requested.tx_type == HWTSTAMP_TX_OFF &&
               hwconfig_requested.rx_filter == HWTSTAMP_FILTER_NONE) {
                debug(DEBUG_ALL, "Hardware timestamping not possible.");
                goto error; // TODO: Provide nice handling.
            } else {
                debug(DEBUG_ALL, "Failed SIOCSHWTSTAMP: %d.", errno);
                goto error; // TODO: Provide nice handling.
            }
        }
    }
Andreas Schmidt's avatar
Andreas Schmidt committed
185

186
187
    check(bind(s->dataSocketFd, (struct sockaddr *) address, size) == EXIT_SUCCESS,
          "Cannot bind data socket.");
Stefan Reif's avatar
Stefan Reif committed
188

189
190
191
192
193
194
195
196
197
198
199
200
    if(s->isHardwareTimestamping) {
        if(!s->isSender) {
            int enabled = 1;
            check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_TIMESTAMPNS, &enabled, sizeof(enabled)) >= 0, "Could not set SO_TIMESTAMPNS");
            socklen_t val, len;
            len = sizeof(val);
            check(getsockopt(s->dataSocketFd, SOL_SOCKET, SO_TIMESTAMPNS, &val, &len) >= 0, "%s: %s\n", "Getsockopt SO_TIMESTAMPNS", strerror(errno));
        } else {
            int enabled = SOF_TIMESTAMPING_TX_HARDWARE | SOF_TIMESTAMPING_SYS_HARDWARE | SOF_TIMESTAMPING_RAW_HARDWARE;
            check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_TIMESTAMPING, &enabled, sizeof(enabled)) >= 0, "Could not set SO_TIMESTAMPING");
        }
    }
Stefan Reif's avatar
Stefan Reif committed
201

202
203
204
205
206
    if (s->isSender) {
        s->receiveFeedbackThreadAttr = calloc(1, sizeof(pthread_attr_t));
        pthread_attr_init(s->receiveFeedbackThreadAttr);
        s->sendDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
        pthread_attr_init(s->sendDataThreadAttr);
207
208
209
210
        if(s->isThreadPinning) {
            pin_thread_to_core(s->receiveFeedbackThreadAttr, 1);
            pin_thread_to_core(s->sendDataThreadAttr, 2);
        }
211

212
        check(pthread_create(&s->receiveFeedbackThread, s->receiveFeedbackThreadAttr,
Andreas Schmidt's avatar
Andreas Schmidt committed
213
                             receive_feedback_loop,
214
                             (void *) s) ==
Andreas Schmidt's avatar
Andreas Schmidt committed
215
              EXIT_SUCCESS, "Cannot create receive feedback thread.");
216
217
        check(pthread_create(&s->sendDataThread, s->sendDataThreadAttr, send_data_loop,
                             (void *) s) ==
218
              EXIT_SUCCESS,
Andreas Schmidt's avatar
Andreas Schmidt committed
219
220
              "Cannot create send thread.");
    } else {
221
222
        s->receiveDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
        pthread_attr_init(s->receiveDataThreadAttr);
223
224
225
        if(s->isThreadPinning) {
            pin_thread_to_core(s->receiveDataThreadAttr, 3);
        }
226
227
        check(pthread_create(&s->receiveDataThread, s->receiveDataThreadAttr, receive_data_loop,
                             (void *) s) ==
228
              EXIT_SUCCESS,
Andreas Schmidt's avatar
Andreas Schmidt committed
229
230
231
              "Cannot create data receiving thread.");
    }

232
233
    s->isBound = true;

Andreas Schmidt's avatar
Andreas Schmidt committed
234
235
    return true;
    error:
236
    PrrtSocket_close(s);
Andreas Schmidt's avatar
Andreas Schmidt committed
237
238
239
    return false;
}

240
int PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port) {
241
242
243
244
    if(s->receiver != NULL) {
        PrrtReceiver_destroy(s->receiver);
    }
    s->receiver = PrrtReceiver_create(host, port);
245
246
247
    return 0;
}

248
249
int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
    check(s->isSender, "Cannot send on receiver socket.")
250
251
252
253
    XlapTimestampPlaceholder tsph;
    XlapTimestampPlaceholderInitialize(&tsph);
    XlapTimeStampClock(&tsph, ts_any_packet, 0, PrrtSendStart);
    XlapTimeStampCycle(&tsph, ts_any_packet, 0, PrrtSendStart);
254
    prrtSequenceNumber_t sequenceNumber = s->sequenceNumberSource++;
255
256
    PrrtPacket *packet = PrrtPacket_create_data_packet(5, data, (prrtPacketLength_t) data_len, sequenceNumber,
                                                       PrrtApplicationConstraints_get_target_delay(
257
258
                                                               s->applicationConstraints));
    XlapTimestampPlaceholderUse(s, ts_data_packet, packet->sequenceNumber, &tsph);
259

260
    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSubmitPackage);
Andreas Schmidt's avatar
Andreas Schmidt committed
261
    MPSCQueue_push(s->sendDataQueue, &packet->asListNode);
262

263
264
    XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
265
    return 0;
266
    error:
267
268
    PERROR("There was a failure while sending from socket.%s", "");
    return -1;
269
270
}

271
272
bool PrrtSocket_closing(PrrtSocket *s) {
    return atomic_load_explicit(&s->closing, memory_order_acquire);
Stefan Reif's avatar
Stefan Reif committed
273
274
}

275
276
277
278
279
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr) {
    prrtPacketLength_t len = 0;
    check(s->isSender == false, "Cannot receive on sender socket.")

    PrrtPacket *packet;
280
    packet = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
281
    return deliver_packet(s, buf_ptr, packet);
282
283
284
285
286
287
288
289
290
291
292

    error:
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}

int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr) {
    check(s->isSender == false, "Cannot receive on sender socket.")

    PrrtPacket *packet;
    do {
293
        packet = PrrtPacketDeliveryStore_get_packet_wait(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
294
295
296
297
298
        if (PrrtSocket_closing(s)) {
            return -1;
        }
    } while (!packet);

299
    return deliver_packet(s, buf_ptr, packet);
300
301
302
303
304
305
306

    error:
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}

int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct timespec* deadline) {
307
    check(s->isSender == false, "Cannot receive on sender socket.")
308

309
    PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, 0, MAX_TIMESTAMP, deadline);
310

311
    return deliver_packet(s, buf_ptr, packet);
Andreas Schmidt's avatar
Andreas Schmidt committed
312
313
314
315
316
    error:
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}

317
318
319
320
321
322
323
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr) {
    return PrrtSocket_receive_asap_wait(s, buf_ptr);
}

int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) {
    prrtPacketLength_t len = 0;

324
    check(s->isSender == false, "Cannot receive on sender socket.")
325
326

    prrtTimestamp_t now = PrrtClock_get_current_time_us();
327
328
    PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, now - time_window_us,
                                                            now + time_window_us);
329
    return deliver_packet(s, buf_ptr, packet);
330
331
332
333
334
    error:
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}

335
336
337
338
339
340
341
342
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) {
    check(s->isSender == false, "Cannot receive on sender socket.")

    PrrtPacket *packet;
    do {
        prrtTimestamp_t now = PrrtClock_get_current_time_us();
        struct timespec deadline = abstime_from_now(time_window_us);

343
344
        packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now - time_window_us,
                                                              now + time_window_us, &deadline);
345
346
347
348
349
        if (PrrtSocket_closing(s)) {
            return -1;
        }
    } while (!packet);

350
    return deliver_packet(s, buf_ptr, packet);
351
352
353

    error:
    PERROR("There was a failure while receiving from socket.%s", "");
354
    return -1;
355
356
357
358
359
360
361
}

int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us, struct timespec* deadline) {
    check(s->isSender == false, "Cannot receive on sender socket.")

    prrtTimestamp_t now = PrrtClock_get_current_time_us();

362
363
    PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now - time_window_us,
                                                                      now + time_window_us, deadline);
364
    return deliver_packet(s, buf_ptr, packet);
365
    error:
366
367
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
368
369
}

370

371
372
int PrrtSocket_interrupt(PrrtSocket *s) {
    atomic_store_explicit(&s->closing, true, memory_order_release);
373

374
375
    if (s->packetDeliveryStore)
        PrrtPacketDeliveryStore_wake(s->packetDeliveryStore);
Stefan Reif's avatar
Stefan Reif committed
376
377


378
    void **res = NULL;
379
380
381
382
    if (s->sendDataThread != 0) {
        check(pthread_join(s->sendDataThread, res) == 0, "Join failed.");
        pthread_attr_destroy(s->sendDataThreadAttr);
        s->sendDataThread = 0;
383
384
    }

385
386
387
388
389
    if (s->receiveDataThread != 0) {
        check(pthread_cancel(s->receiveDataThread) == 0, "Cancel failed.");
        check(pthread_join(s->receiveDataThread, res) == 0, "Join failed.");
        pthread_attr_destroy(s->receiveDataThreadAttr);
        s->receiveDataThread = 0;
390
391
    }

392
393
    if (s->receiveFeedbackThread != 0) {
        check(pthread_join(s->receiveFeedbackThread, res) == 0, "Join failed.");
394

395
396
        pthread_attr_destroy(s->receiveFeedbackThreadAttr);
        s->receiveFeedbackThread = 0;
397
    }
398

Andreas Schmidt's avatar
Andreas Schmidt committed
399
    return EXIT_SUCCESS;
400
401

    error:
402
    return EXIT_FAILURE;
403
404
}

405
int PrrtSocket_close(PrrtSocket *s) {
Stefan Reif's avatar
Stefan Reif committed
406
    debug(DEBUG_SOCKET, "Closing socket.");
407
408
    if (!atomic_load_explicit(&s->closing, memory_order_acquire)) {
        check(PrrtSocket_interrupt(s) == EXIT_SUCCESS, "Interrupt failed.");
409
410
    }

411
412
413
    if (s->dataPacketStore != NULL) {
        check(PrrtDataPacketStore_destroy(s->dataPacketStore), "Destroy failed.");
        s->dataPacketStore = NULL;
414
415
    }

416
417
418
    if (s->repairBlockStore != NULL) {
        PrrtRepairBlockStore_destroy(s->repairBlockStore);
        s->repairBlockStore = NULL;
419
420
    }

421
422
    if (s->receiver != NULL) {
        PrrtReceiver_destroy(s->receiver);
423
424
    }

425
    if (s->sendDataQueue != NULL) {
Andreas Schmidt's avatar
Andreas Schmidt committed
426
        MPSCQueue_destroy(s->sendDataQueue);
427
        s->sendDataQueue = NULL;
428
429
    }

Andreas Schmidt's avatar
Andreas Schmidt committed
430
431
432
433
434
435
436
437
438
439
    if (s->dataReceptionTable != NULL) {
        PrrtReceptionTable_destroy(s->dataReceptionTable);
        s->dataReceptionTable = NULL;
    }

    if (s->redundancyReceptionTable != NULL) {
        PrrtReceptionTable_destroy(s->redundancyReceptionTable);
        s->redundancyReceptionTable = NULL;
    }

440
441
442
    if (s->packetDeliveryStore != NULL) {
        PrrtPacketDeliveryStore_destroy(s->packetDeliveryStore);
        s->packetDeliveryStore = NULL;
443
444
    }

445
446
447
    if (s->deliveredPacketTable != NULL) {
        check(PrrtDeliveredPacketTable_destroy(s->deliveredPacketTable), "Destroy failed.");
        s->deliveredPacketTable = NULL;
448
    }
449

450
451
452
    if (s->packetTimeoutTable != NULL) {
        check(PrrtPacketTimeoutTable_destroy(s->packetTimeoutTable), "Destroy failed.");
        s->packetTimeoutTable = NULL;
453
454
    }

455
456
    if (s->address != NULL) {
        free(s->address);
Andreas Schmidt's avatar
Andreas Schmidt committed
457
458
    }

459
460
    if (s->applicationConstraints != NULL) {
        check(PrrtApplicationConstraints_destroy(s->applicationConstraints),
461
              "Could not destroy application constraints.")
462
463
    }

464
465
    if (s->codingParameters) {
        check(PrrtCodingParams_destroy(s->codingParameters), "Could not destroy coding parameters.")
466
467
    }

468
469
    if (s->sendDataThreadAttr != NULL) {
        free(s->sendDataThreadAttr);
470
471
    }

472
473
    if (s->receiveDataThreadAttr != NULL) {
        free(s->receiveDataThreadAttr);
474
475
    }

476
477
    if (s->receiveFeedbackThreadAttr != NULL) {
        free(s->receiveFeedbackThreadAttr);
478
479
480
    }


481
482
    close(s->dataSocketFd);
    close(s->feedbackSocketFd);
Stefan Reif's avatar
Stefan Reif committed
483
    debug(DEBUG_SOCKET, "Socket closed.");
484
    return 0;
485
486

    error:
487
488
    PERROR("Closing socket failed.%s", "");
    return -1;
489
490
}

491
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *s, const char *name) {
Andreas Schmidt's avatar
Andreas Schmidt committed
492
    if (strcmp(name, "targetdelay") == 0) {
493
        return PrrtApplicationConstraints_get_target_delay(s->applicationConstraints);
494
495
496
497
498
499
    } else {
        PERROR("Unknwon property %s", name);
        return 0;
    }
}

500
bool PrrtSocket_set_sock_opt(PrrtSocket *s, const char *name, const uint32_t value) {
501
    if (strcmp(name, "app_queue_size")) {
502
        PrrtApplicationConstraints_set_app_queue_size(s->applicationConstraints, value);
Andreas Schmidt's avatar
Andreas Schmidt committed
503
        // TODO: MPSC_Queue does not provide a size.
504
505
506
507
508
509
    } else {
        return false;
    }

    return true;
}
510

511
512
bool PrrtSocket_set_coding_parameters(PrrtSocket *s, uint8_t k, uint8_t n) {
    PrrtCodingParams_update(s->codingParameters, k, n);
513
514
    return true;
}
515

516
517
518
519
PrrtCodingParams *PrrtSocket_get_coding_parameters(PrrtSocket *s) {
    return PrrtCodingParams_copy(s->codingParameters);
}

520

521
522
bool PrrtSocket_cleanup(PrrtSocket *s) {
    if (s->isSender) {
523
524

    } else {
525
526
        if (s->packetTimeoutTable != NULL) {
            List *expired_packets = PrrtPacketTimeoutTable_expire_packets(s->packetTimeoutTable,
527
                                                                          PrrtClock_get_prrt_time_us(
528
                                                                                  &s->clock));
529
            uint32_t expired_count = List_count(expired_packets);
Andreas Schmidt's avatar
Andreas Schmidt committed
530
            if (expired_count > 0) {
531
                PrrtPacketTimeout *first = List_first(expired_packets);
532
533
534
                prrtSequenceNumber_t firstSequenceNumberBase = (prrtSequenceNumber_t) (first->sequenceNumber -
                                                                                       first->index - SEQNO_SPACE / 2);

535
                PrrtPacketTimeout *last = List_last(expired_packets);
536
537
                prrtSequenceNumber_t lastSequenceNumberBase = (prrtSequenceNumber_t) (last->sequenceNumber -
                                                                                      last->index - 1);
Andreas Schmidt's avatar
Andreas Schmidt committed
538

539
                PrrtDeliveredPacketTable_forward_start(s->deliveredPacketTable, lastSequenceNumberBase);
540

541
                PrrtRepairBlockStore_expire_block_range(s->repairBlockStore, firstSequenceNumberBase,
Andreas Schmidt's avatar
Andreas Schmidt committed
542
                                                        lastSequenceNumberBase);
543
544

                List *list = List_create();
545
                PrrtDataPacketStore_remove_range(s->dataPacketStore, list, firstSequenceNumberBase,
546
                                                 last->sequenceNumber);
547

Andreas Schmidt's avatar
Andreas Schmidt committed
548
549
                while (List_count(list) > 0) {
                    PrrtPacket *packet = (PrrtPacket *) List_shift(list);
550
551
552
553
                    PrrtPacket_destroy(packet);
                }
                List_destroy(list);

Andreas Schmidt's avatar
Andreas Schmidt committed
554
                while (List_count(expired_packets) > 0) {
555
556
                    PrrtPacketTimeout *packetTimeout = (PrrtPacketTimeout *) List_shift(expired_packets);
                    PrrtPacketTimeout_destroy(packetTimeout);
557
558
559
560
                }
            }
            List_destroy(expired_packets);
        }
561
562
        if(s->deliveredPacketTable != NULL) {
            prrtSequenceNumber_t current_start = s->deliveredPacketTable->start;
563
            PrrtRepairBlockStore_expire_block_range(s->repairBlockStore, (prrtSequenceNumber_t) (current_start - SEQNO_SPACE/2),
564
565
                                                    (prrtSequenceNumber_t) (current_start - 1));
        }
Andreas Schmidt's avatar
Andreas Schmidt committed
566
567

        s->lossStatistics = PrrtLossStatistics_add(PrrtReceptionTable_calculate_statistics(s->dataReceptionTable), PrrtReceptionTable_calculate_statistics(s->redundancyReceptionTable));
568
569
570
    }
    return true;
}
571

572
bool PrrtSocket_uses_thread_pinning(PrrtSocket *s) {
573
    return atomic_load_explicit(&s->isThreadPinning, memory_order_acquire);
574
}
575

576
577
uint32_t PrrtSocket_get_rtprop(PrrtSocket *s) {
    return PrrtChannelStateInformation_get_rtprop(s->receiver->csi);
578
}
Andreas Schmidt's avatar
Andreas Schmidt committed
579
580
581
582

prrtPacketLossRate_t PrrtSocket_get_plr(PrrtSocket *socket) {
    return PrrtChannelStateInformation_get_plr(socket->receiver->csi);
}