socket.c 21.7 KB
Newer Older
1
#include <arpa/inet.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
2
#include <math.h>
3
4
5
6
7
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
8
9
#include <net/if.h>
#include <linux/net_tstamp.h>
10
#include <assert.h>
11
#include <sys/ioctl.h>
12
13
14
#include "../defines.h"
#include "../util/dbg.h"
#include "../util/common.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
15
16
#include "processes/dataTransmitter.h"
#include "processes/dataReceiver.h"
17
#include "processes/feedbackReceiver.h"
18
#include "stores/deliveredPacketTable.h"
19
#include "types/packetTimeout.h"
20
#include "socket.h"
21

22
/**
 * Hand a packet's payload over to the application.
 *
 * Copies the payload of `packet` (skipping PRRT_PACKET_DATA_HEADER_SIZE) into
 * `buffer` and destroys the packet afterwards.
 *
 * @param s      Socket the packet was received on (used for XLAP timestamps).
 * @param buffer Destination buffer for the payload.
 * @param packet Packet to deliver; may be NULL.
 * @return Payload length minus the data header size, or 0 if `packet` is NULL.
 */
static inline prrtPacketLength_t deliver_packet(const PrrtSocket *s, void *buffer, PrrtPacket *packet) {
    if (packet == NULL) {
        // Nothing to deliver.
        return 0;
    }

    XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);

    const prrtPacketLength_t delivered =
            (prrtPacketLength_t) (packet->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE);

    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, CopyOutputStart);
    PrrtPacket_copy_payload_to_buffer(buffer, packet, PRRT_PACKET_DATA_HEADER_SIZE);
    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, CopyOutputEnd);

    XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtDeliver);

    // The packet is released here once its payload has been copied out.
    PrrtPacket_destroy(packet);
    return delivered;
}

/**
 * Compute an absolute CLOCK_REALTIME deadline that lies `wait_time`
 * microseconds in the future.
 *
 * @param wait_time Relative waiting time in microseconds.
 * @return Normalised timespec (0 <= tv_nsec < 1e9) of the deadline.
 */
struct timespec abstime_from_now(prrtTimedelta_t wait_time) {
    struct timespec now;
    clock_gettime(CLOCK_REALTIME, &now);

    struct timespec deadline;
    deadline.tv_sec = now.tv_sec + (time_t) (wait_time / 1000000);
    deadline.tv_nsec = now.tv_nsec + (long) ((wait_time % 1000000) * 1000);

    // Both summands are below one second, so at most one carry is needed.
    // Without it tv_nsec could reach 1e9 or more, which timed waits reject with EINVAL.
    if (deadline.tv_nsec >= 1000000000L) {
        deadline.tv_sec += 1;
        deadline.tv_nsec -= 1000000000L;
    }
    return deadline;
}

52
/**
 * Allocate and initialise a PRRT socket.
 *
 * Creates the data and feedback UDP sockets (with SO_BROADCAST, SO_REUSEADDR
 * and SO_REUSEPORT enabled) and the stores required for the chosen role.
 *
 * @param is_sender       true for a sending socket, false for a receiving one.
 * @param target_delay_us Application target delay; must be below HALF_TIMESTAMP.
 * @return The new socket, or NULL on failure.
 */
PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay_us) {
    assert(sizeof(float) == 4);

    PrrtSocket *s = (PrrtSocket *) calloc(1, sizeof(PrrtSocket));
    check_mem(s);

    // calloc() leaves the descriptors at 0 (stdin). Mark them as "not open"
    // so the error path never closes a descriptor this socket does not own.
    s->dataSocketFd = -1;
    s->feedbackSocketFd = -1;

    s->isSender = is_sender;
    s->isHardwareTimestamping = false;
    s->interfaceName = NULL;

    s->isThreadPinning = false;

    PrrtClock_init(&s->clock);

    s->isBound = false;
    s->receiver = NULL;

    s->codingParameters = PrrtCodingParams_create();

    s->sequenceNumberSource = 1;
    s->sequenceNumberRedundancy = 1;
    s->sequenceNumberFeedback = 1;

    check(target_delay_us < HALF_TIMESTAMP, "Specify target delay between 0 and %i", HALF_TIMESTAMP-1);
    s->applicationConstraints = PrrtApplicationConstraints_create(target_delay_us);

    s->packetTimeoutTable = PrrtPacketTimeoutTable_create();

    s->dataPacketStore = PrrtDataPacketStore_create();

    int enabled = 1;

    // socket() reports failure with -1 and may legitimately return 0, so the
    // result has to be compared against 0 instead of being used as a boolean.
    s->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0);
    check(s->dataSocketFd >= 0, "Cannot create data socket.");
    check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_BROADCAST, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");
    check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");
    check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_REUSEPORT, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");

    s->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0);
    check(s->feedbackSocketFd >= 0, "Cannot create feedback socket.");
    check(setsockopt(s->feedbackSocketFd, SOL_SOCKET, SO_BROADCAST, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");
    check(setsockopt(s->feedbackSocketFd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");
    check(setsockopt(s->feedbackSocketFd, SOL_SOCKET, SO_REUSEPORT, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");

    if (is_sender) {
        s->sendDataQueue = Pipe_create();
    } else {
        s->deliveredPacketTable = PrrtDeliveredPacketTable_create();
        s->repairBlockStore = PrrtRepairBlockStore_create();

        s->packetDeliveryStore = PrrtPacketDeliveryStore_create();

        s->dataReceptionTable = PrrtReceptionTable_create();
        s->redundancyReceptionTable = PrrtReceptionTable_create();
    }

    return s;

    error:
    // s is NULL when the allocation itself failed; nothing to tear down then.
    if (s != NULL) {
        PrrtSocket_close(s);
        // The caller only sees NULL, so the socket memory must be released here.
        free(s);
    }
    return NULL;
}

119
120
121
122
123
124
125
126
127
128
/**
 * Request that the worker threads created by PrrtSocket_bind() are pinned to cores.
 *
 * @param s Socket to configure; must not be bound yet.
 * @return true on success, false if the socket is already bound.
 */
bool PrrtSocket_enable_thread_pinning(PrrtSocket *s) {
    check(!s->isBound, "Thread pinning can only be enabled before calling PrrtSocket_bind()");

    s->isThreadPinning = true;
    return true;

    error:
    PERROR("PrrtSocket_enable_thread_pinning() failed.%s", "");
    return false;
}

129
130
131
132
133
134
135
136
137
138
139
140
/**
 * Enable hardware timestamping on the given network interface.
 *
 * The interface name is copied; the socket keeps its own copy.
 *
 * @param s              Socket to configure; must not be bound yet.
 * @param interface_name Name of the network interface; must not be NULL.
 * @return true on success, false if the socket is already bound, the name is
 *         NULL or the copy could not be allocated.
 */
bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interface_name) {
    check(s->isBound == false, "Hardware timestamping can only be enabled before calling PrrtSocket_bind()");
    check(interface_name != NULL, "Interface name must not be NULL.");

    // Copy first and only commit on success, so a failed allocation leaves
    // the socket in its previous (consistent) configuration.
    char *name_copy = strdup(interface_name);
    check_mem(name_copy);

    // Release the name from an earlier call (free(NULL) is a no-op).
    free(s->interfaceName);
    s->interfaceName = name_copy;
    s->isHardwareTimestamping = true;
    return true;

    error:
    PERROR("PrrtSocket_enable_hardware_timestamping() failed.%s", "");
    return false;
}

/**
 * Bind the socket and start its worker threads.
 *
 * The feedback socket is bound to `port + 1`, the data socket to `port`.
 * If hardware timestamping was enabled, the interface and the data socket are
 * configured for it. A sender starts the feedback-receiving and data-sending
 * threads, a receiver starts the data-receiving thread.
 *
 * @param s         Socket to bind.
 * @param ipAddress IPv4 address in dotted notation.
 * @param port      Data port; at most 65534 because `port + 1` is used as well.
 * @return true on success. On failure the socket is closed via
 *         PrrtSocket_close() and false is returned.
 */
bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port) {
    check(port <= 65534, "Port %d cannot be bound to.", port);

    size_t size = sizeof(struct sockaddr_in);
    struct sockaddr_in *address = calloc(1, size);
    check_mem(address);

    address->sin_family = AF_INET;
    address->sin_addr.s_addr = inet_addr(ipAddress);
    address->sin_port = htons((uint16_t) (port + 1));
    s->address = address;

    check(bind(s->feedbackSocketFd, (struct sockaddr *) address, size) == EXIT_SUCCESS,
          "Cannot bind feedback socket.");

    // The same address structure is reused for the data socket.
    address->sin_port = htons((uint16_t) (port));

    if(s->isHardwareTimestamping) {
        struct ifreq hwtstamp;
        struct hwtstamp_config hwconfig;

        char *interface = s->interfaceName;
        // Configure HW Timestamping
        memset(&hwtstamp, 0, sizeof(hwtstamp));
        // Copy at most size-1 bytes: together with the memset above this
        // guarantees that ifr_name stays NUL-terminated for long names.
        strncpy(hwtstamp.ifr_name, interface, sizeof(hwtstamp.ifr_name) - 1);
        hwtstamp.ifr_data = (void *) &hwconfig;
        memset(&hwconfig, 0, sizeof(hwconfig));
        hwconfig.flags = 0;
        hwconfig.tx_type = HWTSTAMP_TX_ON;
        hwconfig.rx_filter = HWTSTAMP_FILTER_ALL;
        if(ioctl(s->dataSocketFd, SIOCSHWTSTAMP, &hwtstamp) < 0) {
            check(errno != EPERM, "Insufficient permissions. Run in privileged mode.");

            // TX_ON / FILTER_ALL is always requested above, so the former
            // "requested TX_OFF and FILTER_NONE" branch could never be taken.
            debug(DEBUG_ALL, "Failed SIOCSHWTSTAMP: %d.", errno);
            goto error; // TODO: Provide nice handling.
        }
    }

    check(bind(s->dataSocketFd, (struct sockaddr *) address, size) == EXIT_SUCCESS,
          "Cannot bind data socket.");

    if(s->isHardwareTimestamping) {
        if(!s->isSender) {
            int enabled = 1;
            check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_TIMESTAMPNS, &enabled, sizeof(enabled)) >= 0, "Could not set SO_TIMESTAMPNS");
            // The option value is an int; only its length is a socklen_t.
            int val = 0;
            socklen_t len = sizeof(val);
            check(getsockopt(s->dataSocketFd, SOL_SOCKET, SO_TIMESTAMPNS, &val, &len) >= 0, "%s: %s\n", "Getsockopt SO_TIMESTAMPNS", strerror(errno));
        } else {
            int enabled = SOF_TIMESTAMPING_TX_HARDWARE | SOF_TIMESTAMPING_SYS_HARDWARE | SOF_TIMESTAMPING_RAW_HARDWARE;
            check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_TIMESTAMPING, &enabled, sizeof(enabled)) >= 0, "Could not set SO_TIMESTAMPING");
        }
    }

    if (s->isSender) {
        s->receiveFeedbackThreadAttr = calloc(1, sizeof(pthread_attr_t));
        check_mem(s->receiveFeedbackThreadAttr);
        check(pthread_attr_init(s->receiveFeedbackThreadAttr) == 0, "Cannot initialise thread attributes.");
        s->sendDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
        check_mem(s->sendDataThreadAttr);
        check(pthread_attr_init(s->sendDataThreadAttr) == 0, "Cannot initialise thread attributes.");

        if(s->isThreadPinning) {
            pin_thread_to_core(s->receiveFeedbackThreadAttr, 1);
            pin_thread_to_core(s->sendDataThreadAttr, 2);
        }

        check(pthread_create(&s->receiveFeedbackThread, s->receiveFeedbackThreadAttr,
                             receive_feedback_loop,
                             (void *) s) ==
              EXIT_SUCCESS, "Cannot create receive feedback thread.");
        check(pthread_create(&s->sendDataThread, s->sendDataThreadAttr, send_data_loop,
                             (void *) s) ==
              EXIT_SUCCESS,
              "Cannot create send thread.");
    } else {
        s->receiveDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
        check_mem(s->receiveDataThreadAttr);
        check(pthread_attr_init(s->receiveDataThreadAttr) == 0, "Cannot initialise thread attributes.");

        if(s->isThreadPinning) {
            pin_thread_to_core(s->receiveDataThreadAttr, 3);
        }

        check(pthread_create(&s->receiveDataThread, s->receiveDataThreadAttr, receive_data_loop,
                             (void *) s) ==
              EXIT_SUCCESS,
              "Cannot create data receiving thread.");
    }

    s->isBound = true;

    return true;
    error:
    PrrtSocket_close(s);
    return false;
}

239
/**
 * Set the remote peer of the socket.
 *
 * A receiver object from an earlier call is destroyed before the new one is
 * created from `host` and `port`.
 *
 * @return Always 0.
 */
int PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port) {
    // Drop the previous peer, if any, before replacing it.
    if (s->receiver) {
        PrrtReceiver_destroy(s->receiver);
    }

    s->receiver = PrrtReceiver_create(host, port);

    return 0;
}

247
248
/**
 * Queue `data_len` bytes of `data` for transmission.
 *
 * A data packet with the next source sequence number is created and pushed
 * onto the socket's send queue.
 *
 * @param s        Sender socket; PrrtSocket_connect() must have been called.
 * @param data     Payload to send.
 * @param data_len Payload length in bytes.
 * @return 0 on success, -1 if the socket is not a sender, has no receiver set
 *         or the packet could not be created.
 */
int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
    check(s->isSender, "Cannot send on receiver socket.");
    // s->receiver is dereferenced below; it is only set by PrrtSocket_connect().
    check(s->receiver != NULL, "Cannot send before PrrtSocket_connect() was called.");

    XlapTimestampPlaceholder tsph;
    XlapTimestampPlaceholderInitialize(&tsph);
    XlapTimeStampClock(&tsph, ts_any_packet, 0, PrrtSendStart);
    XlapTimeStampCycle(&tsph, ts_any_packet, 0, PrrtSendStart);
    prrtSequenceNumber_t sequenceNumber = s->sequenceNumberSource++;
    PrrtPacket *packet = PrrtPacket_create_data_packet(5, data, (prrtPacketLength_t) data_len, sequenceNumber,
                                                       PrrtApplicationConstraints_get_target_delay(
                                                               s->applicationConstraints));
    check_mem(packet);
    XlapTimestampPlaceholderUse(s, ts_data_packet, packet->sequenceNumber, &tsph);

    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSubmitPackage);

    // Remember the sequence number before handing the packet over: once it is
    // in the queue it is presumably consumed (and possibly destroyed) by the
    // send thread, so the packet must not be touched afterwards.
    sequenceNumber = packet->sequenceNumber;
    Pipe_push(s->sendDataQueue, &packet->asListNode);
    PrrtReceiver_on_application_write(s->receiver);

    XlapTimeStampClock(s, ts_data_packet, sequenceNumber, PrrtSendEnd);
    XlapTimeStampCycle(s, ts_data_packet, sequenceNumber, PrrtSendEnd);
    return 0;

    error:
    PERROR("There was a failure while sending from socket.%s", "");
    return -1;
}

271
272
/**
 * Report whether the socket's `closing` flag is set.
 *
 * The flag is read with acquire ordering.
 */
bool PrrtSocket_closing(PrrtSocket *s) {
    const bool is_closing = atomic_load_explicit(&s->closing, memory_order_acquire);
    return is_closing;
}

275
276
277
278
/**
 * Non-blocking receive without ordering constraints.
 *
 * Fetches a packet from the delivery store using the full timestamp range
 * [0, MAX_TIMESTAMP] and copies its payload into `buf_ptr`.
 *
 * @return Number of payload bytes delivered (0 if no packet was available),
 *         or -1 when called on a sender socket.
 */
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr) {
    check(!s->isSender, "Cannot receive on sender socket.");

    PrrtPacket *next = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
    return deliver_packet(s, buf_ptr, next);

    error:
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}

/**
 * Blocking receive without ordering constraints.
 *
 * Waits on the delivery store until a packet is available, then copies its
 * payload into `buf_ptr`.
 *
 * @return Number of payload bytes delivered, or -1 when called on a sender
 *         socket or when the socket is closing.
 */
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr) {
    check(s->isSender == false, "Cannot receive on sender socket.");

    PrrtPacket *packet;
    do {
        packet = PrrtPacketDeliveryStore_get_packet_wait(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
        if (PrrtSocket_closing(s)) {
            // A packet handed out by the store is owned by us; release it so
            // it is not leaked when the receive is aborted.
            if (packet != NULL) {
                PrrtPacket_destroy(packet);
            }
            return -1;
        }
    } while (!packet);

    return deliver_packet(s, buf_ptr, packet);

    error:
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}

/**
 * Receive without ordering constraints, waiting at most until `deadline`.
 *
 * @param deadline Absolute deadline passed on to the delivery store.
 * @return Number of payload bytes delivered, -ETIMEDOUT if no packet was
 *         returned and errno is ETIMEDOUT, or -1 when called on a sender socket.
 */
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct timespec* deadline) {
    check(!s->isSender, "Cannot receive on sender socket.");

    PrrtPacket *received = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, 0, MAX_TIMESTAMP,
                                                                        deadline);
    // errno is only inspected when no packet was returned.
    const bool timed_out = (received == NULL) && (errno == ETIMEDOUT);
    if (timed_out) {
        return -ETIMEDOUT;
    }

    return deliver_packet(s, buf_ptr, received);

    error:
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}

319
320
321
322
323
/**
 * Convenience alias for PrrtSocket_receive_asap_wait().
 */
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr) {
    const int32_t received = PrrtSocket_receive_asap_wait(s, buf_ptr);
    return received;
}

/**
 * Non-blocking receive restricted to a time window around the current time.
 *
 * Asks the delivery store for a packet in the range
 * [now - time_window_us, now + time_window_us].
 *
 * @return Number of payload bytes delivered (0 if no packet was available),
 *         or -1 when called on a sender socket.
 */
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) {
    check(!s->isSender, "Cannot receive on sender socket.");

    const prrtTimestamp_t current = PrrtClock_get_current_time_us();
    PrrtPacket *next = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore,
                                                          current - time_window_us,
                                                          current + time_window_us);
    return deliver_packet(s, buf_ptr, next);

    error:
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}

335
336
337
338
339
340
/**
 * Blocking receive restricted to a time window around the current time.
 *
 * Repeatedly waits on the delivery store for a packet in the range
 * [now - time_window_us, now + time_window_us]; `now` is re-read on every
 * iteration.
 *
 * @return Number of payload bytes delivered, or -1 when called on a sender
 *         socket or when the socket is closing.
 */
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) {
    check(s->isSender == false, "Cannot receive on sender socket.");

    PrrtPacket *packet;
    do {
        prrtTimestamp_t now = PrrtClock_get_current_time_us();
        packet = PrrtPacketDeliveryStore_get_packet_wait(s->packetDeliveryStore, now - time_window_us,
                                                              now + time_window_us);
        if (PrrtSocket_closing(s)) {
            // A packet handed out by the store is owned by us; release it so
            // it is not leaked when the receive is aborted.
            if (packet != NULL) {
                PrrtPacket_destroy(packet);
            }
            return -1;
        }
    } while (!packet);

    return deliver_packet(s, buf_ptr, packet);

    error:
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}

/**
 * Receive restricted to a time window around the current time, waiting at
 * most until `deadline`.
 *
 * The window [now - time_window_us, now + time_window_us] is computed once,
 * before waiting.
 *
 * @return Number of payload bytes delivered, -ETIMEDOUT if no packet was
 *         returned and errno is ETIMEDOUT, or -1 when called on a sender socket.
 */
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us, struct timespec* deadline) {
    check(!s->isSender, "Cannot receive on sender socket.");

    const prrtTimestamp_t current = PrrtClock_get_current_time_us();
    PrrtPacket *received = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore,
                                                                        current - time_window_us,
                                                                        current + time_window_us,
                                                                        deadline);
    // errno is only inspected when no packet was returned.
    const bool timed_out = (received == NULL) && (errno == ETIMEDOUT);
    if (timed_out) {
        return -ETIMEDOUT;
    }

    return deliver_packet(s, buf_ptr, received);

    error:
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}

372

373
374
/**
 * Signal shutdown to all parts of the socket and wait for its threads.
 *
 * Sets the `closing` flag, interrupts the delivery store and the receiver,
 * wakes the send queue and then joins the worker threads that were started.
 * The data-receiving thread is cancelled before it is joined.
 *
 * @return EXIT_SUCCESS, or EXIT_FAILURE if cancelling or joining a thread failed.
 */
int PrrtSocket_interrupt(PrrtSocket *s) {
    atomic_store_explicit(&s->closing, true, memory_order_release);

    // Wake up everything that might currently be blocked.
    if (s->packetDeliveryStore != NULL) {
        PrrtPacketDeliveryStore_interrupt(s->packetDeliveryStore);
    }
    if (s->receiver != NULL) {
        PrrtReceiver_interrupt(s->receiver);
    }
    if (s->sendDataQueue != NULL) {
        Pipe_wake(s->sendDataQueue);
    }

    // Thread exit values are not of interest, hence NULL for the join result.
    if (s->sendDataThread != 0) {
        check(pthread_join(s->sendDataThread, NULL) == 0, "Join failed.");
        pthread_attr_destroy(s->sendDataThreadAttr);
        s->sendDataThread = 0;
    }

    if (s->receiveDataThread != 0) {
        check(pthread_cancel(s->receiveDataThread) == 0, "Cancel failed.");
        check(pthread_join(s->receiveDataThread, NULL) == 0, "Join failed.");
        pthread_attr_destroy(s->receiveDataThreadAttr);
        s->receiveDataThread = 0;
    }

    if (s->receiveFeedbackThread != 0) {
        check(pthread_join(s->receiveFeedbackThread, NULL) == 0, "Join failed.");
        pthread_attr_destroy(s->receiveFeedbackThreadAttr);
        s->receiveFeedbackThread = 0;
    }

    return EXIT_SUCCESS;

    error:
    return EXIT_FAILURE;
}

415
/**
 * Shut the socket down and release everything it owns.
 *
 * Interrupts the socket first (unless that already happened), then destroys
 * the stores, tables, receiver, queues and thread attributes and closes both
 * UDP sockets. The PrrtSocket structure itself is not freed.
 *
 * Every released member is reset afterwards, so calling this function again
 * on the same socket is harmless. This matters because PrrtSocket_bind()
 * already closes the socket when it fails.
 *
 * @return 0 on success, -1 if interrupting or destroying a component failed.
 */
int PrrtSocket_close(PrrtSocket *s) {
    debug(DEBUG_SOCKET, "Closing socket.");
    if (!atomic_load_explicit(&s->closing, memory_order_acquire)) {
        check(PrrtSocket_interrupt(s) == EXIT_SUCCESS, "Interrupt failed.");
    }

    if (s->dataPacketStore != NULL) {
        check(PrrtDataPacketStore_destroy(s->dataPacketStore), "Destroy failed.");
        s->dataPacketStore = NULL;
    }

    if (s->repairBlockStore != NULL) {
        PrrtRepairBlockStore_destroy(s->repairBlockStore);
        s->repairBlockStore = NULL;
    }

    if (s->receiver != NULL) {
        PrrtReceiver_destroy(s->receiver);
        s->receiver = NULL;
    }

    if (s->sendDataQueue != NULL) {
        Pipe_destroy(s->sendDataQueue);
        s->sendDataQueue = NULL;
    }

    if (s->dataReceptionTable != NULL) {
        PrrtReceptionTable_destroy(s->dataReceptionTable);
        s->dataReceptionTable = NULL;
    }

    if (s->redundancyReceptionTable != NULL) {
        PrrtReceptionTable_destroy(s->redundancyReceptionTable);
        s->redundancyReceptionTable = NULL;
    }

    if (s->packetDeliveryStore != NULL) {
        PrrtPacketDeliveryStore_destroy(s->packetDeliveryStore);
        s->packetDeliveryStore = NULL;
    }

    if (s->deliveredPacketTable != NULL) {
        check(PrrtDeliveredPacketTable_destroy(s->deliveredPacketTable), "Destroy failed.");
        s->deliveredPacketTable = NULL;
    }

    if (s->packetTimeoutTable != NULL) {
        check(PrrtPacketTimeoutTable_destroy(s->packetTimeoutTable), "Destroy failed.");
        s->packetTimeoutTable = NULL;
    }

    // free(NULL) is a no-op, so the plain heap members need no guard.
    free(s->address);
    s->address = NULL;

    if (s->applicationConstraints != NULL) {
        check(PrrtApplicationConstraints_destroy(s->applicationConstraints),
              "Could not destroy application constraints.");
        s->applicationConstraints = NULL;
    }

    if (s->codingParameters) {
        check(PrrtCodingParams_destroy(s->codingParameters), "Could not destroy coding parameters.");
        s->codingParameters = NULL;
    }

    free(s->sendDataThreadAttr);
    s->sendDataThreadAttr = NULL;

    free(s->receiveDataThreadAttr);
    s->receiveDataThreadAttr = NULL;

    free(s->receiveFeedbackThreadAttr);
    s->receiveFeedbackThreadAttr = NULL;

    // Copy made by PrrtSocket_enable_hardware_timestamping().
    free(s->interfaceName);
    s->interfaceName = NULL;

    // Negative descriptors mark sockets that were never opened or are already closed.
    if (s->dataSocketFd >= 0) {
        close(s->dataSocketFd);
        s->dataSocketFd = -1;
    }
    if (s->feedbackSocketFd >= 0) {
        close(s->feedbackSocketFd);
        s->feedbackSocketFd = -1;
    }
    debug(DEBUG_SOCKET, "Socket closed.");
    return 0;

    error:
    PERROR("Closing socket failed.%s", "");
    return -1;
}

501
/**
 * Read a named socket option.
 *
 * Only "targetdelay" is supported; it yields the target delay stored in the
 * application constraints.
 *
 * @param s    Socket to query.
 * @param name Option name.
 * @return The option value, or 0 (after logging an error) for unknown names.
 */
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *s, const char *name) {
    if (strcmp(name, "targetdelay") == 0) {
        return PrrtApplicationConstraints_get_target_delay(s->applicationConstraints);
    } else {
        // Spelling of the message corrected (was "Unknwon").
        PERROR("Unknown property %s", name);
        return 0;
    }
}

510
/**
 * Set a named socket option.
 *
 * Only "app_queue_size" is supported; it is stored in the application
 * constraints.
 *
 * @param s     Socket to configure.
 * @param name  Option name.
 * @param value New value of the option.
 * @return true if the option was set, false for unknown names.
 */
bool PrrtSocket_set_sock_opt(PrrtSocket *s, const char *name, const uint32_t value) {
    // strcmp() returns 0 on equality; the previous truthiness test applied
    // the value for every name EXCEPT "app_queue_size".
    if (strcmp(name, "app_queue_size") == 0) {
        PrrtApplicationConstraints_set_app_queue_size(s->applicationConstraints, value);
        // TODO: MPSC_Queue does not provide a size.
    } else {
        return false;
    }

    return true;
}
520

rna's avatar
rna committed
521
522
/**
 * Update the socket's coding parameters with `k`, `n`, `c` and `n_cycle`.
 *
 * The values are forwarded unchanged to PrrtCodingParams_update().
 *
 * @return Always true.
 */
bool PrrtSocket_set_coding_parameters(PrrtSocket *s, uint8_t k, uint8_t n, uint8_t c, uint8_t *n_cycle) {
    PrrtCodingParams_update(s->codingParameters,
                            k,
                            n,
                            c,
                            n_cycle);

    return true;
}
525

526
527
528
529
/**
 * Return the result of PrrtCodingParams_copy() for the socket's coding parameters.
 */
PrrtCodingParams *PrrtSocket_get_coding_parameters(PrrtSocket *s) {
    PrrtCodingParams *snapshot = PrrtCodingParams_copy(s->codingParameters);
    return snapshot;
}

530

531
532
/**
 * Housekeeping for a socket.
 *
 * A sender socket has nothing to clean up. For a receiver socket this
 *  - expires timed-out packets and, if there are any, forwards the start of
 *    the delivered-packet table, expires the matching repair blocks and
 *    removes and destroys the matching data packets,
 *  - expires repair blocks that lie behind the delivered-packet table's start,
 *  - recomputes the loss statistics from both reception tables.
 *
 * @return Always true.
 */
bool PrrtSocket_cleanup(PrrtSocket *s) {
    if (s->isSender) {
        return true;
    }

    if (s->packetTimeoutTable != NULL) {
        List *timeouts = PrrtPacketTimeoutTable_expire_packets(s->packetTimeoutTable,
                                                               PrrtClock_get_prrt_time_us(&s->clock));
        uint32_t timeout_count = List_count(timeouts);
        if (timeout_count > 0) {
            PrrtPacketTimeout *oldest = List_first(timeouts);
            PrrtPacketTimeout *newest = List_last(timeouts);

            // Range of block base sequence numbers affected by the expiry.
            prrtSequenceNumber_t rangeStart = (prrtSequenceNumber_t) (oldest->sequenceNumber -
                                                                      oldest->index - SEQNO_SPACE / 2);
            prrtSequenceNumber_t rangeEnd = (prrtSequenceNumber_t) (newest->sequenceNumber -
                                                                    newest->index - 1);

            PrrtDeliveredPacketTable_forward_start(s->deliveredPacketTable, rangeEnd);

            PrrtRepairBlockStore_expire_block_range(s->repairBlockStore, rangeStart, rangeEnd);

            // Collect the removed data packets in a temporary list and destroy them.
            List *removed = List_create();
            PrrtDataPacketStore_remove_range(s->dataPacketStore, removed, rangeStart,
                                             newest->sequenceNumber);
            while (List_count(removed) > 0) {
                PrrtPacket_destroy((PrrtPacket *) List_shift(removed));
            }
            List_destroy(removed);

            while (List_count(timeouts) > 0) {
                PrrtPacketTimeout_destroy((PrrtPacketTimeout *) List_shift(timeouts));
            }
        }
        List_destroy(timeouts);
    }

    if (s->deliveredPacketTable != NULL) {
        prrtSequenceNumber_t tableStart = s->deliveredPacketTable->start;
        PrrtRepairBlockStore_expire_block_range(s->repairBlockStore,
                                                (prrtSequenceNumber_t) (tableStart - SEQNO_SPACE/2),
                                                (prrtSequenceNumber_t) (tableStart - 1));
    }

    s->lossStatistics = PrrtLossStatistics_add(PrrtReceptionTable_calculate_statistics(s->dataReceptionTable), PrrtReceptionTable_calculate_statistics(s->redundancyReceptionTable));

    return true;
}
581

582
/**
 * Report whether thread pinning is enabled for this socket.
 *
 * The flag is read with acquire ordering.
 */
bool PrrtSocket_uses_thread_pinning(PrrtSocket *s) {
    const bool pinning = atomic_load_explicit(&s->isThreadPinning, memory_order_acquire);
    return pinning;
}
585

586
587
/**
 * Return the `rtprop` value from the receiver's channel state information.
 *
 * Dereferences `s->receiver`, which is only set by PrrtSocket_connect().
 */
uint32_t PrrtSocket_get_rtprop(PrrtSocket *s) {
    const uint32_t rtprop = PrrtChannelStateInformation_get_rtprop(s->receiver->csi);
    return rtprop;
}
Andreas Schmidt's avatar
Andreas Schmidt committed
589

590
591
592
593
594
595
/**
 * Return the `plr` value from the receiver's channel state information.
 *
 * Dereferences `s->receiver`, which is only set by PrrtSocket_connect().
 */
prrtPacketLossRate_t PrrtSocket_get_plr(PrrtSocket *s) {
    const prrtPacketLossRate_t plr = PrrtChannelStateInformation_get_plr(s->receiver->csi);
    return plr;
}

/**
 * Return the delivery rate from the receiver's channel state information.
 *
 * Dereferences `s->receiver`, which is only set by PrrtSocket_connect().
 */
prrtDeliveryRate_t PrrtSocket_get_delivery_rate(PrrtSocket *s) {
    const prrtDeliveryRate_t rate = PrrtChannelStateInformation_get_delivery_rate(s->receiver->csi);
    return rate;
}
597
598
599
600

/**
 * Return the `app_limited` value from the receiver's channel state information.
 *
 * Dereferences `s->receiver`, which is only set by PrrtSocket_connect().
 */
bool PrrtSocket_get_app_limited(PrrtSocket *s) {
    const bool app_limited = PrrtChannelStateInformation_get_app_limited(s->receiver->csi);
    return app_limited;
}