socket.c 23 KB
Newer Older
1
#include <arpa/inet.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
2
#include <math.h>
3 4 5 6 7
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
8 9
#include <net/if.h>
#include <linux/net_tstamp.h>
10
#include <assert.h>
11
#include <sys/ioctl.h>
12 13
#include "../defines.h"
#include "../util/common.h"
14 15
#include "../util/dbg.h"
#include "../util/time.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
16 17
#include "processes/dataTransmitter.h"
#include "processes/dataReceiver.h"
18
#include "stores/deliveredPacketTable.h"
19
#include "types/packetTimeout.h"
20
#include "socket.h"
21

22
static inline prrtPacketLength_t deliver_packet(const PrrtSocket *s, void *buffer, PrrtPacket *packet, struct sockaddr* addr) {
23
    size_t timespec_size = sizeof(struct timespec);
24
    prrtPacketLength_t len = 0;
25 26 27
    if(packet != NULL) {
        XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
        XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
28

29 30 31
        len = (prrtPacketLength_t) (packet->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE);
        XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, CopyOutputStart);
        PrrtPacket_copy_payload_to_buffer(buffer, packet, PRRT_PACKET_DATA_HEADER_SIZE);
32 33 34 35
        if(s->withTimestamp) {
            memcpy(buffer + len, &packet->channelReceive, timespec_size);
            len += timespec_size;
        }
36

37
        XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, CopyOutputEnd);
38

39 40
        memcpy(addr, &(packet->sender_addr), sizeof(struct sockaddr_in));

41 42 43 44
        XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
        XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
        PrrtPacket_destroy(packet);
    }
45 46 47
    return len;
}

rna's avatar
rna committed
48
PrrtSocket *PrrtSocket_create(prrtByteCount_t maximum_payload_size, prrtTimedelta_t target_delay_us) {
49
    assert(sizeof(float) == 4);
50 51
    PrrtSocket *s = (PrrtSocket *) calloc(1, sizeof(PrrtSocket));
    check_mem(s);
52

Andreas Schmidt's avatar
Andreas Schmidt committed
53
    s->nextSendTime = 0;
Andreas Schmidt's avatar
Andreas Schmidt committed
54
    s->pacingEnabled = true;
rna's avatar
rna committed
55
    s->maximum_payload_size = maximum_payload_size;
56
    s->isHardwareTimestamping = false;
57
    s->interfaceName = NULL;
58

59 60
    s->isThreadPinning = false;

61
    PrrtClock_init(&s->clock);
Andreas Schmidt's avatar
Andreas Schmidt committed
62
    s->nextSendTime = PrrtClock_get_current_time_us();
63

64
    s->isBound = false;
65
    s->receiver = NULL;
66

67 68 69
    uint8_t n_cycle[1] = { N_START - K_START };
    s->codingParameters = PrrtCodingConfiguration_create(K_START, N_START, 1, n_cycle);
    s->coder = PrrtCoder_create(s->codingParameters);
70

71 72
    s->sequenceNumberSource = 1;
    s->sequenceNumberRedundancy = 1;
73
    s->sequenceNumberFeedback = 1;
74

75 76
    s->senderChannelStateInformation = PrrtChannelStateInformation_create();

77

78
    check(target_delay_us < HALF_TIMESTAMP, "Specify target delay between 0 and %i", HALF_TIMESTAMP-1)
79
    s->applicationConstraints = PrrtApplicationConstraints_create(target_delay_us);
80

81
    s->packetTimeoutTable = PrrtPacketTimeoutTable_create();
82

83
    s->dataPacketStore = PrrtDataPacketStore_create();
84

85 86
    int enabled = 1;

87 88
    check(s->socketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.");
    check(setsockopt(s->socketFd, SOL_SOCKET, SO_BROADCAST, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
89
          "Socket option set failed.");
90
    check(setsockopt(s->socketFd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
91
          "Socket option set failed.");
92
    check(setsockopt(s->socketFd, SOL_SOCKET, SO_REUSEPORT, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
93 94
          "Socket option set failed.");

Andreas Schmidt's avatar
Andreas Schmidt committed
95 96 97
    s->sendDataQueue = Pipe_create();
    s->deliveredPacketTable = PrrtDeliveredPacketTable_create();
    s->repairBlockStore = PrrtRepairBlockStore_create();
98

Andreas Schmidt's avatar
Andreas Schmidt committed
99
    s->packetDeliveryStore = PrrtPacketDeliveryStore_create();
Andreas Schmidt's avatar
Andreas Schmidt committed
100

Andreas Schmidt's avatar
Andreas Schmidt committed
101 102
    s->dataReceptionTable = PrrtReceptionTable_create();
    s->redundancyReceptionTable = PrrtReceptionTable_create();
103

104
    return s;
105 106

    error:
107
    PrrtSocket_close(s);
108
    return NULL;
109 110
}

111 112 113 114 115 116 117 118 119 120
bool PrrtSocket_enable_thread_pinning(PrrtSocket *s) {
    check(s->isBound == false, "Thread pinning can only be enabled before calling PrrtSocket_bind()");
    s->isThreadPinning = true;
    return true;

    error:
    PERROR("PrrtSocket_enable_thread_pinning() failed.%s", "");
    return false;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
121

122 123 124 125 126 127 128 129 130 131 132 133
bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interface_name) {
    check(s->isBound == false, "Hardware timestamping can only be enabled before calling PrrtSocket_bind()");
    s->isHardwareTimestamping = true;
    s->interfaceName = strdup(interface_name);
    return true;

    error:
    PERROR("PrrtSocket_enable_hardware_timestamping() failed.%s", "");
    return false;
}

bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port) {
Andreas Schmidt's avatar
Andreas Schmidt committed
134 135 136
    check(port <= 65534, "Port %d cannot be bound to.", port);

    size_t size = sizeof(struct sockaddr_in);
137
    struct sockaddr_in *address = calloc(1, size);
Andreas Schmidt's avatar
Andreas Schmidt committed
138 139
    check_mem(address);

140 141
    // TODO: Allow DNS names to be passed as ipAddress.

Andreas Schmidt's avatar
Andreas Schmidt committed
142 143 144
    address->sin_family = AF_INET;
    address->sin_addr.s_addr = inet_addr(ipAddress);
    address->sin_port = htons((uint16_t) (port));
145
    s->address = address;
Andreas Schmidt's avatar
Andreas Schmidt committed
146

147 148 149 150 151 152 153 154 155 156 157 158 159 160
    if(s->isHardwareTimestamping) {
        struct ifreq hwtstamp;
        struct hwtstamp_config hwconfig, hwconfig_requested;

        char *interface = s->interfaceName;
        // Configure HW Timestamping
        memset(&hwtstamp, 0, sizeof(hwtstamp));
        strncpy(hwtstamp.ifr_name, interface, sizeof(hwtstamp.ifr_name));
        hwtstamp.ifr_data = (void *) &hwconfig;
        memset(&hwconfig, 0, sizeof(hwconfig));
        hwconfig.flags = 0;
        hwconfig.tx_type = HWTSTAMP_TX_ON;
        hwconfig.rx_filter = HWTSTAMP_FILTER_ALL;
        hwconfig_requested = hwconfig;
161
        if(ioctl(s->socketFd, SIOCSHWTSTAMP, &hwtstamp) < 0) {
162 163 164 165 166 167 168 169 170 171 172 173
            check(errno != EPERM, "Insufficient permissions. Run in privileged mode.")

            if((errno == EINVAL || errno == ENOTSUP) && hwconfig_requested.tx_type == HWTSTAMP_TX_OFF &&
               hwconfig_requested.rx_filter == HWTSTAMP_FILTER_NONE) {
                debug(DEBUG_ALL, "Hardware timestamping not possible.");
                goto error; // TODO: Provide nice handling.
            } else {
                debug(DEBUG_ALL, "Failed SIOCSHWTSTAMP: %d.", errno);
                goto error; // TODO: Provide nice handling.
            }
        }
    }
Andreas Schmidt's avatar
Andreas Schmidt committed
174

175
    check(bind(s->socketFd, (struct sockaddr *) address, size) == EXIT_SUCCESS,
176
          "Cannot bind data socket.");
Stefan Reif's avatar
Stefan Reif committed
177

178
    if(s->isHardwareTimestamping) {
Andreas Schmidt's avatar
Andreas Schmidt committed
179 180 181 182 183 184 185 186
        int enabled = 1;
        check(setsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPNS, &enabled, sizeof(enabled)) >= 0, "Could not set SO_TIMESTAMPNS");
        socklen_t val, len;
        len = sizeof(val);
        check(getsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPNS, &val, &len) >= 0, "%s: %s\n", "Getsockopt SO_TIMESTAMPNS", strerror(errno));

        int enabled2 = SOF_TIMESTAMPING_TX_HARDWARE | SOF_TIMESTAMPING_SYS_HARDWARE | SOF_TIMESTAMPING_RAW_HARDWARE;
        check(setsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPING, &enabled2, sizeof(enabled2)) >= 0, "Could not set SO_TIMESTAMPING");
187
    }
Stefan Reif's avatar
Stefan Reif committed
188

Andreas Schmidt's avatar
Andreas Schmidt committed
189 190
    s->sendDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
    pthread_attr_init(s->sendDataThreadAttr);
191 192
    s->receiveDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
    pthread_attr_init(s->receiveDataThreadAttr);
193

Andreas Schmidt's avatar
Andreas Schmidt committed
194
    if(s->isThreadPinning) {
195 196
        pin_thread_to_core(s->receiveDataThreadAttr, 2);
        pin_thread_to_core(s->sendDataThreadAttr, 1);
Andreas Schmidt's avatar
Andreas Schmidt committed
197
    }
Andreas Schmidt's avatar
Andreas Schmidt committed
198

199
    check(pthread_create(&s->sendDataThread, s->sendDataThreadAttr, PrrtDataTransmitter_send_data_loop,
200
                         (void *) s) == EXIT_SUCCESS, "Cannot create send thread.");
Andreas Schmidt's avatar
Andreas Schmidt committed
201 202

    check(pthread_create(&s->receiveDataThread, s->receiveDataThreadAttr, receive_data_loop,
203
                         (void *) s) == EXIT_SUCCESS, "Cannot create data receiving thread.");
Andreas Schmidt's avatar
Andreas Schmidt committed
204

205 206
    s->isBound = true;

Andreas Schmidt's avatar
Andreas Schmidt committed
207 208
    return true;
    error:
209
    PrrtSocket_close(s);
Andreas Schmidt's avatar
Andreas Schmidt committed
210 211 212
    return false;
}

213
int PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port) {
214 215 216
    if(s->receiver != NULL) {
        PrrtReceiver_destroy(s->receiver);
    }
217
    s->receiver = PrrtReceiver_create(host, port, s->maximum_payload_size);
218 219 220
    return 0;
}

221
int sendpacket(PrrtSocket *s, const uint8_t *data, const size_t data_len, bool sync) {
rna's avatar
rna committed
222 223
    if (data_len > s->maximum_payload_size) {
        PERROR("Data to be sent (%ld bytes) is too long, as MTU is %d.\n", data_len, s->maximum_payload_size);
224 225
        return -1;
    }
226

227
    if (s->receiver == NULL) {
228 229 230 231
        PERROR("PrrtSocket_connect() must be called before PrrtSocket_send().\n");
        return -1;
    }

232 233 234 235
    XlapTimestampPlaceholder tsph;
    XlapTimestampPlaceholderInitialize(&tsph);
    XlapTimeStampClock(&tsph, ts_any_packet, 0, PrrtSendStart);
    XlapTimeStampCycle(&tsph, ts_any_packet, 0, PrrtSendStart);
236
    prrtSequenceNumber_t sequenceNumber = s->sequenceNumberSource++;
237 238
    PrrtPacket *packet = PrrtPacket_create_data_packet(5, data, (prrtPacketLength_t) data_len, sequenceNumber,
                                                       PrrtApplicationConstraints_get_target_delay(
239 240
                                                               s->applicationConstraints));
    XlapTimestampPlaceholderUse(s, ts_data_packet, packet->sequenceNumber, &tsph);
241

242
    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSubmitPackage);
243 244 245 246 247 248 249
    if (sync) {
        PrrtDataTransmitter_transmit(s, packet);
        PrrtSocket_pace(s);
    } else {
        Pipe_push(s->sendDataQueue, &packet->asListNode);
    }

Andreas Schmidt's avatar
Andreas Schmidt committed
250
    PrrtReceiver_on_application_write(s->receiver, Pipe_get_size(s->sendDataQueue), s->sequenceNumberSource);
251

252 253
    XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
254
    return 0;
255 256
}

257 258 259 260 261 262 263 264
int PrrtSocket_send_sync(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
    return sendpacket(s, data, data_len, true);
}

int PrrtSocket_send_async(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
    return sendpacket(s, data, data_len, false);
}

265 266
bool PrrtSocket_closing(PrrtSocket *s) {
    return atomic_load_explicit(&s->closing, memory_order_acquire);
Stefan Reif's avatar
Stefan Reif committed
267 268
}

Andreas Schmidt's avatar
Andreas Schmidt committed
269
void PrrtSocket_pace(PrrtSocket *s) {
Andreas Schmidt's avatar
Andreas Schmidt committed
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
    if (s->pacingEnabled && s->nextSendTime != 0) {
        debug(DEBUG_DATATRANSMITTER, "About to check for pacing.");
        prrtTimeDifference_t diff = 0;
        do {
            prrtTimeDifference_t now = (prrtTimeDifference_t) PrrtClock_get_current_time_us();
            diff = ((prrtTimeDifference_t) s->nextSendTime) - now;
            if (diff > 0) {
                usleep_nano((uint32_t) diff);
            }
        } while (diff > 0);
    } else {
        usleep_nano(1);
    }
}

285
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
286
    PrrtPacket *packet;
287
    packet = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
288
    return deliver_packet(s, buf_ptr, packet, addr);
289 290
}

291
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
292 293
    PrrtPacket *packet;
    do {
294
        packet = PrrtPacketDeliveryStore_get_packet_wait(s->packetDeliveryStore);
295 296 297 298
        if (PrrtSocket_closing(s)) {
            return -1;
        }
    } while (!packet);
299
    return deliver_packet(s, buf_ptr, packet, addr);
300 301
}

302
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, struct timespec* deadline) {
303
    PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, 0, MAX_TIMESTAMP, deadline);
Andreas Schmidt's avatar
Andreas Schmidt committed
304 305 306
    if (packet == NULL && errno == ETIMEDOUT) {
        return -1 * ETIMEDOUT;
    }
307

308
    return deliver_packet(s, buf_ptr, packet, addr);
Andreas Schmidt's avatar
Andreas Schmidt committed
309 310
}

311 312
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
    return PrrtSocket_receive_asap_wait(s, buf_ptr, addr);
313 314
}

315
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us) {
316
    prrtTimestamp_t now = PrrtClock_get_current_time_us();
317
    PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, now,
318
                                                            now + time_window_us);
319
    return deliver_packet(s, buf_ptr, packet, addr);
320 321
}

322
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us) {
323 324 325
    PrrtPacket *packet;
    do {
        prrtTimestamp_t now = PrrtClock_get_current_time_us();
326 327
        struct timespec deadline = abstime_from_now(time_window_us);
        packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now, now + time_window_us, &deadline);
328 329 330 331
        if (PrrtSocket_closing(s)) {
            return -1;
        }
    } while (!packet);
332
    return deliver_packet(s, buf_ptr, packet, addr);
333 334
}

335
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us, struct timespec* deadline) {
336 337
    prrtTimestamp_t now = PrrtClock_get_current_time_us();

338
    PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now, now + time_window_us, deadline);
Andreas Schmidt's avatar
Andreas Schmidt committed
339 340 341 342
    if (packet == NULL && errno == ETIMEDOUT) {
        return -1 * ETIMEDOUT;
    }

343
    return deliver_packet(s, buf_ptr, packet, addr);
344 345
}

346

347 348
int PrrtSocket_interrupt(PrrtSocket *s) {
    atomic_store_explicit(&s->closing, true, memory_order_release);
349

350
    if (s->packetDeliveryStore != NULL) {
351 352
        PrrtPacketDeliveryStore_interrupt(s->packetDeliveryStore);
    }
Stefan Reif's avatar
Stefan Reif committed
353

354 355 356 357
    if(s->sendDataQueue != NULL) {
      Pipe_wake(s->sendDataQueue);
    }

358
    void **res = NULL;
359 360 361 362
    if (s->sendDataThread != 0) {
        check(pthread_join(s->sendDataThread, res) == 0, "Join failed.");
        pthread_attr_destroy(s->sendDataThreadAttr);
        s->sendDataThread = 0;
363 364
    }

365
    if (s->receiveDataThread != 0) {
366
        pthread_cancel(s->receiveDataThread);
367 368 369
        check(pthread_join(s->receiveDataThread, res) == 0, "Join failed.");
        pthread_attr_destroy(s->receiveDataThreadAttr);
        s->receiveDataThread = 0;
370 371
    }

Andreas Schmidt's avatar
Andreas Schmidt committed
372
    return EXIT_SUCCESS;
373 374

    error:
375
    return EXIT_FAILURE;
376 377
}

378
int PrrtSocket_close(PrrtSocket *s) {
Stefan Reif's avatar
Stefan Reif committed
379
    debug(DEBUG_SOCKET, "Closing socket.");
380 381
    if (!atomic_load_explicit(&s->closing, memory_order_acquire)) {
        check(PrrtSocket_interrupt(s) == EXIT_SUCCESS, "Interrupt failed.");
382 383
    }

384 385 386
    if (s->dataPacketStore != NULL) {
        check(PrrtDataPacketStore_destroy(s->dataPacketStore), "Destroy failed.");
        s->dataPacketStore = NULL;
387 388
    }

389 390 391
    if (s->repairBlockStore != NULL) {
        PrrtRepairBlockStore_destroy(s->repairBlockStore);
        s->repairBlockStore = NULL;
392 393
    }

394 395
    if (s->receiver != NULL) {
        PrrtReceiver_destroy(s->receiver);
396 397
    }

398
    if (s->sendDataQueue != NULL) {
399
        Pipe_destroy(s->sendDataQueue);
400
        s->sendDataQueue = NULL;
401 402
    }

Andreas Schmidt's avatar
Andreas Schmidt committed
403 404 405 406 407 408 409 410 411 412
    if (s->dataReceptionTable != NULL) {
        PrrtReceptionTable_destroy(s->dataReceptionTable);
        s->dataReceptionTable = NULL;
    }

    if (s->redundancyReceptionTable != NULL) {
        PrrtReceptionTable_destroy(s->redundancyReceptionTable);
        s->redundancyReceptionTable = NULL;
    }

413 414 415 416 417
    if(s->senderChannelStateInformation != NULL) {
        PrrtChannelStateInformation_destroy(s->senderChannelStateInformation);
        s->senderChannelStateInformation = NULL;
    }

418 419 420
    if (s->packetDeliveryStore != NULL) {
        PrrtPacketDeliveryStore_destroy(s->packetDeliveryStore);
        s->packetDeliveryStore = NULL;
421 422
    }

423 424 425
    if (s->deliveredPacketTable != NULL) {
        check(PrrtDeliveredPacketTable_destroy(s->deliveredPacketTable), "Destroy failed.");
        s->deliveredPacketTable = NULL;
426
    }
427

428 429 430
    if (s->packetTimeoutTable != NULL) {
        check(PrrtPacketTimeoutTable_destroy(s->packetTimeoutTable), "Destroy failed.");
        s->packetTimeoutTable = NULL;
431 432
    }

433 434
    if (s->address != NULL) {
        free(s->address);
Andreas Schmidt's avatar
Andreas Schmidt committed
435 436
    }

437 438
    if (s->applicationConstraints != NULL) {
        check(PrrtApplicationConstraints_destroy(s->applicationConstraints),
439
              "Could not destroy application constraints.")
440 441
    }

442
    if (s->codingParameters) {
443 444 445 446 447
        check(PrrtCodingConfiguration_destroy(s->codingParameters), "Could not destroy coding parameters.")
    }

    if(s->coder) {
        check(PrrtCoder_destroy(s->coder), "Could not destroy coder.")
448 449
    }

450 451
    if (s->sendDataThreadAttr != NULL) {
        free(s->sendDataThreadAttr);
452 453
    }

454 455
    if (s->receiveDataThreadAttr != NULL) {
        free(s->receiveDataThreadAttr);
456 457
    }

458
    close(s->socketFd);
Stefan Reif's avatar
Stefan Reif committed
459
    debug(DEBUG_SOCKET, "Socket closed.");
460
    return 0;
461 462

    error:
463 464
    PERROR("Closing socket failed.%s", "");
    return -1;
465 466
}

467
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *s, const char *name) {
Andreas Schmidt's avatar
Andreas Schmidt committed
468
    if (strcmp(name, "targetdelay") == 0) {
469
        return PrrtApplicationConstraints_get_target_delay(s->applicationConstraints);
rna's avatar
rna committed
470 471
    } else if (strcmp(name, "connected") == 0) {
        return (s->receiver != NULL) ? 1 : 0;
rna's avatar
rna committed
472 473
    } else if (strcmp(name, "maximum_payload_size") == 0) {
        return s->maximum_payload_size;
474
    } else {
475
        PERROR("Unknown property %s\n", name);
476 477 478 479
        return 0;
    }
}

480
bool PrrtSocket_set_sock_opt(PrrtSocket *s, const char *name, const uint32_t value) {
481
    if (strcmp(name, "app_queue_size")) {
482
        PrrtApplicationConstraints_set_app_queue_size(s->applicationConstraints, value);
Andreas Schmidt's avatar
Andreas Schmidt committed
483
        // TODO: MPSC_Queue does not provide a size.
484 485 486 487 488 489
    } else {
        return false;
    }

    return true;
}
490

rna's avatar
rna committed
491
bool PrrtSocket_set_coding_parameters(PrrtSocket *s, uint8_t k, uint8_t n, uint8_t c, uint8_t *n_cycle) {
492 493 494 495 496
    if(s->codingParameters != NULL) {
        PrrtCodingConfiguration_destroy(s->codingParameters);
    }
    s->codingParameters = PrrtCodingConfiguration_create(k, n, c, n_cycle);
    s->coder = PrrtCoder_create(s->codingParameters);
497 498
    return true;
}
499

500 501
PrrtCodingConfiguration *PrrtSocket_get_coding_parameters(PrrtSocket *s) {
    return s->codingParameters;
502 503
}

504

505
bool PrrtSocket_cleanup(PrrtSocket *s) {
506
    debug(DEBUG_CLEANUP, "PrrtSocket_cleanup");
Andreas Schmidt's avatar
Andreas Schmidt committed
507 508 509 510 511
    if (s->packetTimeoutTable != NULL) {
        List *expired_packets = PrrtPacketTimeoutTable_expire_packets(s->packetTimeoutTable,
                                                                      PrrtClock_get_prrt_time_us(
                                                                              &s->clock));
        uint32_t expired_count = List_count(expired_packets);
512
        debug(DEBUG_CLEANUP, "EXPIRED");
Andreas Schmidt's avatar
Andreas Schmidt committed
513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535
        if (expired_count > 0) {
            PrrtPacketTimeout *first = List_first(expired_packets);
            prrtSequenceNumber_t firstSequenceNumberBase = (prrtSequenceNumber_t) (first->sequenceNumber -
                                                                                   first->index - SEQNO_SPACE / 2);

            PrrtPacketTimeout *last = List_last(expired_packets);
            prrtSequenceNumber_t lastSequenceNumberBase = (prrtSequenceNumber_t) (last->sequenceNumber -
                                                                                  last->index - 1);

            PrrtDeliveredPacketTable_forward_start(s->deliveredPacketTable, lastSequenceNumberBase);

            PrrtRepairBlockStore_expire_block_range(s->repairBlockStore, firstSequenceNumberBase,
                                                    lastSequenceNumberBase);

            List *list = List_create();
            PrrtDataPacketStore_remove_range(s->dataPacketStore, list, firstSequenceNumberBase,
                                             last->sequenceNumber);

            while (List_count(list) > 0) {
                PrrtPacket *packet = (PrrtPacket *) List_shift(list);
                PrrtPacket_destroy(packet);
            }
            List_destroy(list);
536

Andreas Schmidt's avatar
Andreas Schmidt committed
537 538 539
            while (List_count(expired_packets) > 0) {
                PrrtPacketTimeout *packetTimeout = (PrrtPacketTimeout *) List_shift(expired_packets);
                PrrtPacketTimeout_destroy(packetTimeout);
540 541
            }
        }
Andreas Schmidt's avatar
Andreas Schmidt committed
542
        List_destroy(expired_packets);
543
    }
544
    debug(DEBUG_CLEANUP, "Expire block range.");
Andreas Schmidt's avatar
Andreas Schmidt committed
545 546 547 548
    if(s->deliveredPacketTable != NULL) {
        prrtSequenceNumber_t current_start = s->deliveredPacketTable->start;
        PrrtRepairBlockStore_expire_block_range(s->repairBlockStore, (prrtSequenceNumber_t) (current_start - SEQNO_SPACE/2),
                                                (prrtSequenceNumber_t) (current_start - 1));
549
    }
Andreas Schmidt's avatar
Andreas Schmidt committed
550

551
    debug(DEBUG_CLEANUP, "Loss stats.");
Andreas Schmidt's avatar
Andreas Schmidt committed
552
    s->lossStatistics = PrrtLossStatistics_add(PrrtReceptionTable_calculate_statistics(s->dataReceptionTable), PrrtReceptionTable_calculate_statistics(s->redundancyReceptionTable));
553
    debug(DEBUG_CLEANUP, "PrrtSocket_cleanup done");
554 555
    return true;
}
556

557
bool PrrtSocket_uses_thread_pinning(PrrtSocket *s) {
558
    return atomic_load_explicit(&s->isThreadPinning, memory_order_acquire);
559
}
560

Andreas Schmidt's avatar
Andreas Schmidt committed
561
uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *s) {
562
    return BBR_getRTProp(s->receiver->bbr);
563
}
Andreas Schmidt's avatar
Andreas Schmidt committed
564

Andreas Schmidt's avatar
Andreas Schmidt committed
565
prrtPacketLossRate_t PrrtSocket_get_plr_fwd(PrrtSocket *s) {
566 567 568
    return PrrtChannelStateInformation_get_plr(s->receiver->csi);
}

Andreas Schmidt's avatar
Andreas Schmidt committed
569
prrtDeliveryRate_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *s) {
570
    return PrrtChannelStateInformation_get_delivery_rate(s->receiver->csi);
Andreas Schmidt's avatar
Andreas Schmidt committed
571
}
572

573 574 575 576
bool PrrtSocket_get_bbr_round_start(PrrtSocket *s) {
    return BBR_getRoundStart(s->receiver->bbr);
}

577 578 579 580 581 582 583 584 585 586 587 588
uint32_t PrrtSocket_get_bbr_state(PrrtSocket *s) {
    return BBR_getState(s->receiver->bbr);
}

bool PrrtSocket_get_filled_pipe(PrrtSocket *s) {
    return BBR_getFilledPipe(s->receiver->bbr);
}

uint64_t PrrtSocket_get_full_bw(PrrtSocket *s) {
    return BBR_getFullBw(s->receiver->bbr);
}

Andreas Schmidt's avatar
Andreas Schmidt committed
589
prrtByteCount_t PrrtSocket_get_cwnd(PrrtSocket *s) {
Andreas Schmidt's avatar
Andreas Schmidt committed
590 591 592
    return BBR_getCwnd(s->receiver->bbr);
}

Andreas Schmidt's avatar
Andreas Schmidt committed
593
prrtByteCount_t PrrtSocket_get_inflight(PrrtSocket *s) {
Andreas Schmidt's avatar
Andreas Schmidt committed
594 595 596
    return BBR_getInflight(s->receiver->bbr);
}

Andreas Schmidt's avatar
Andreas Schmidt committed
597 598
prrtByteCount_t PrrtSocket_get_send_quantum(PrrtSocket *s) {
    return BBR_getSendQuantum(s->receiver->bbr);
599
}
Andreas Schmidt's avatar
Andreas Schmidt committed
600

Andreas Schmidt's avatar
Andreas Schmidt committed
601
prrtByteCount_t PrrtSocket_get_pipe(PrrtSocket *s) {
Andreas Schmidt's avatar
Andreas Schmidt committed
602
    return PrrtReceiver_get_pipe(s->receiver);
603
}
Andreas Schmidt's avatar
Andreas Schmidt committed
604

Andreas Schmidt's avatar
Andreas Schmidt committed
605 606
prrtByteCount_t PrrtSocket_get_delivered(PrrtSocket *s) {
    return s->receiver->packetTracking->delivered;
607
}
Andreas Schmidt's avatar
Andreas Schmidt committed
608

Andreas Schmidt's avatar
Andreas Schmidt committed
609 610 611 612
float PrrtSocket_get_pacing_gain(PrrtSocket *s) {
    return BBR_getPacingGain(s->receiver->bbr);
}

613 614 615 616
uint32_t PrrtSocket_get_pacing_rate(PrrtSocket *s) {
    return BBR_getPacingRate(s->receiver->bbr);
}

Andreas Schmidt's avatar
Andreas Schmidt committed
617
prrtDeliveryRate_t PrrtSocket_get_btlbw_fwd(PrrtSocket *s) {
618
    return BBR_getBtlBw(s->receiver->bbr);
Andreas Schmidt's avatar
Andreas Schmidt committed
619 620
}

Andreas Schmidt's avatar
Andreas Schmidt committed
621
prrtDeliveryRate_t PrrtSocket_get_btlbw_back(PrrtSocket *s) {
622
    return PrrtChannelStateInformation_get_delivery_rate(s->senderChannelStateInformation);
Andreas Schmidt's avatar
Andreas Schmidt committed
623 624
}

Andreas Schmidt's avatar
Andreas Schmidt committed
625 626 627
uint32_t PrrtSocket_get_cycle_index(PrrtSocket *s) {
    return BBR_getCycleIndex(s->receiver->bbr);
}
628

629
bool PrrtSocket_get_bbr_is_app_limited(PrrtSocket *s) {
Andreas Schmidt's avatar
Andreas Schmidt committed
630
    return s->receiver->rateSample->is_app_limited;
631
}
632 633 634
uint32_t PrrtSocket_get_bbr_app_limited(PrrtSocket *s) {
    return s->receiver->packetTracking->app_limited;
}
Andreas Schmidt's avatar
Andreas Schmidt committed
635

636 637 638 639 640 641 642 643 644 645

PrrtCoder *PrrtSocket_get_matching_coder(PrrtSocket *s, PrrtCodingConfiguration *codingParams) {
    if (s->coder == NULL || PrrtCoder_get_k(s->coder) != codingParams->k ||
        PrrtCoder_get_n(s->coder) != codingParams->n) {
        if (s->coder != NULL) {
            PrrtCoder_destroy(s->coder);
        }
        s->coder = PrrtCoder_create(codingParams);
    }
    return PrrtCoder_copy(s->coder);
646
};