socket.c 30.4 KB
Newer Older
1
#include <arpa/inet.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
2
#include <math.h>
3 4 5 6 7
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
8 9
#include <net/if.h>
#include <linux/net_tstamp.h>
10
#include <assert.h>
11
#include <sys/ioctl.h>
12
#include <netdb.h>
13 14
#include "../defines.h"
#include "../util/common.h"
15 16
#include "../util/dbg.h"
#include "../util/time.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
17 18
#include "processes/dataTransmitter.h"
#include "processes/dataReceiver.h"
19
#include "stores/deliveredPacketTable.h"
20
#include "types/packetTimeout.h"
21
#include "socket.h"
22
#include "timer.h"
23

24
/**
 * Hand a received packet over to the application.
 *
 * Copies the packet payload (without the PRRT data header) into `buffer`,
 * optionally appends the packet's channel receive timestamp, reports the
 * sender address and finally destroys the packet.
 *
 * @param s      Socket the packet belongs to; `s->withTimestamp` selects whether
 *               `packet->channelReceive` (a struct timespec) is appended.
 * @param buffer Destination buffer. No capacity is passed in, so the caller has
 *               to provide enough room for payload plus optional timestamp.
 * @param packet Packet to deliver, or NULL if there is nothing to deliver.
 *               Ownership is taken: the packet is destroyed before returning.
 * @param addr   Receives `sizeof(struct sockaddr_in)` bytes of sender address.
 *               May be NULL, in which case the address is not reported.
 * @return Number of bytes written to `buffer`; 0 when `packet` is NULL.
 */
static inline prrtPacketLength_t deliver_packet(const PrrtSocket *s, void *buffer, PrrtPacket *packet, struct sockaddr* addr) {
    const size_t timespec_size = sizeof(struct timespec);
    prrtPacketLength_t len = 0;

    if (packet == NULL) {
        return len;
    }

    XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);

    len = (prrtPacketLength_t) (packet->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE);
    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, CopyOutputStart);
    PrrtPacket_copy_payload_to_buffer(buffer, packet, PRRT_PACKET_DATA_HEADER_SIZE);
    if (s->withTimestamp) {
        // Arithmetic on void* is a GNU extension; go through a byte pointer instead.
        memcpy((unsigned char *) buffer + len, &packet->channelReceive, timespec_size);
        len += timespec_size;
    }
    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, CopyOutputEnd);

    if (addr != NULL) {
        memcpy(addr, &(packet->sender_addr), sizeof(struct sockaddr_in));
    }

    XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
    PrrtPacket_destroy(packet);

    return len;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
/**
 * Delay the calling thread until the socket's next send time is reached.
 *
 * Thread cancellation is disabled while this function runs and re-enabled
 * before it returns.
 *
 * @param s       Socket whose `pacingEnabled`, `nextSendTime` and `appSendPace` are consulted.
 * @param prepace If false (post-pacing), the external part of `appSendPace`
 *                is subtracted from the remaining wait time.
 * @return false if the socket started closing while waiting, true otherwise.
 */
bool PrrtSocket_pace(PrrtSocket *s, bool prepace) {
    bool completed = true;

    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);

    if (!s->pacingEnabled || s->nextSendTime == 0) {
        // Nothing to pace against: only perform a minimal sleep.
        usleep_nano(1);
    } else {
        debug(DEBUG_DATATRANSMITTER, "About to check for pacing.");
        for (;;) {
            if (PrrtSocket_closing(s)) {
                completed = false;
                break;
            }

            prrtTimestamp_t now = PrrtClock_get_current_time_us();
            int64_t remaining = PrrtTimestamp_cmp(s->nextSendTime, now);
            if (!prepace) { // post-pacing removes appSendPace
                remaining -= PrrtPace_get_external(s->appSendPace);
                remaining = MAX(0, remaining);
            }

            if (remaining <= 0) {
                break;
            }
            usleep_nano(remaining);
        }
    }

    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    return completed;
}

rna's avatar
rna committed
78
/**
 * Allocate and initialise a PRRT socket.
 *
 * Sets up the clock, retransmission timer, initial coding configuration and
 * coder, the packet stores/tables, the pace trackers and the underlying UDP
 * socket (with SO_BROADCAST and SO_REUSEADDR enabled). The socket is neither
 * bound nor connected afterwards.
 *
 * @param maximum_payload_size Largest payload accepted by the send functions.
 * @param target_delay_us      Application target delay; must be smaller than HALF_TIMESTAMP.
 * @return The new socket, or NULL on failure. On failure everything that was
 *         set up so far is released again.
 */
PrrtSocket *PrrtSocket_create(prrtByteCount_t maximum_payload_size, prrtTimedelta_t target_delay_us) {
    assert(sizeof(float) == 4);
    PrrtSocket *s = (PrrtSocket *) calloc(1, sizeof(PrrtSocket));
    check_mem(s);

    // calloc() leaves the descriptor at 0 (stdin). Mark it invalid so that the
    // error path never closes a descriptor this socket does not own.
    s->socketFd = -1;

    s->pacingEnabled = false;
    s->maximum_payload_size = maximum_payload_size;
    s->isHardwareTimestamping = false;
    s->interfaceName = NULL;

    s->retransmissionTimer = PrrtTimer_create(3);

    s->isThreadPinning = false;

    PrrtClock_init(&s->clock);
    s->nextSendTime = PrrtClock_get_current_time_us();

    s->isBound = false;
    atomic_store_explicit(&s->closing, false, memory_order_release);
    s->receiver = NULL;

    uint8_t n_cycle[1] = { N_START - K_START };
    s->codingParameters = PrrtCodingConfiguration_create(K_START, N_START, 1, n_cycle);
    s->coder = PrrtCoder_create(s->codingParameters);

    s->sequenceNumberSource = 1;
    s->sequenceNumberRedundancy = 1;
    s->sequenceNumberFeedback = 1;

    s->senderChannelStateInformation = PrrtChannelStateInformation_create();

    check(target_delay_us < HALF_TIMESTAMP, "Specify target delay between 0 and %i", HALF_TIMESTAMP-1);
    s->applicationConstraints = PrrtApplicationConstraints_create(target_delay_us);

    s->packetTimeoutTable = PrrtPacketTimeoutTable_create();

    s->dataPacketStore = PrrtDataPacketStore_create();

    int enabled = 1;

    // socket() signals failure with -1; 0 is a perfectly valid descriptor.
    s->socketFd = socket(AF_INET, SOCK_DGRAM, 0);
    check(s->socketFd >= 0, "Cannot create data socket.");
    check(setsockopt(s->socketFd, SOL_SOCKET, SO_BROADCAST, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");
    check(setsockopt(s->socketFd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");

    s->sendDataQueue = Pipe_create();
    s->deliveredPacketTable = PrrtDeliveredPacketTable_create();
    s->repairBlockStore = PrrtRepairBlockStore_create();

    s->packetDeliveryStore = PrrtPacketDeliveryStore_create();

    s->dataReceptionTable = PrrtReceptionTable_create();
    s->redundancyReceptionTable = PrrtReceptionTable_create();

    // Pacing
    s->appSendPace = PrrtPace_create();
    s->prrtTransmitPace = PrrtPace_create();
    s->prrtReceivePace = PrrtPace_create();
    s->appDeliverPace = PrrtPace_create();

    return s;

    error:
    if (s != NULL) {
        // PrrtSocket_close() releases the members but not the structure itself;
        // since the caller only receives NULL, it has to be freed here.
        PrrtSocket_close(s);
        free(s);
    }
    return NULL;
}

147 148 149 150 151 152 153 154 155 156
/**
 * Request that the socket's worker threads are pinned to CPU cores.
 *
 * Only allowed while the socket is not bound yet.
 *
 * @param s Socket to configure.
 * @return true if the flag was set, false if the socket is already bound.
 */
bool PrrtSocket_enable_thread_pinning(PrrtSocket *s) {
    check(!s->isBound, "Thread pinning can only be enabled before calling PrrtSocket_bind()");

    s->isThreadPinning = true;
    return true;

    error:
    PERROR("PrrtSocket_enable_thread_pinning() failed.%s", "");
    return false;
}

157

158 159 160 161 162 163 164 165 166 167 168
/**
 * Enable hardware timestamping on the given network interface.
 *
 * Only allowed while the socket is not bound yet. The interface name is
 * copied; a name stored by an earlier call is released.
 *
 * @param s              Socket to configure.
 * @param interface_name Name of the network interface; must not be NULL.
 * @return true on success; false if the socket is already bound, the name is
 *         NULL or the copy could not be allocated. On failure the socket's
 *         timestamping configuration is left unchanged.
 */
bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interface_name) {
    char *name_copy = NULL;

    check(s->isBound == false, "Hardware timestamping can only be enabled before calling PrrtSocket_bind()");
    check(interface_name != NULL, "Hardware timestamping requires an interface name.");

    name_copy = strdup(interface_name);
    check_mem(name_copy);

    // Drop a name from a previous call so that repeated calls do not leak.
    free(s->interfaceName);
    s->interfaceName = name_copy;
    s->isHardwareTimestamping = true;
    return true;

    error:
    PERROR("PrrtSocket_enable_hardware_timestamping() failed.%s", "");
    return false;
}

169
/**
 * Bind the socket to a local address and start the data receiving thread.
 *
 * Resolves `ipAddress`, optionally configures hardware timestamping on the
 * interface selected via PrrtSocket_enable_hardware_timestamping(), binds the
 * UDP socket and spawns the receive thread (pinned to core 2 if thread
 * pinning is enabled).
 *
 * @param s         Socket to bind.
 * @param ipAddress Host name or dotted-quad IPv4 address to bind to.
 * @param port      Local port; 65535 is rejected.
 * @return 0 on success. On failure the socket is closed via PrrtSocket_close()
 *         and the resolver error code (h_errno) is returned if name resolution
 *         failed, -2 otherwise.
 */
int PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port) {
    const size_t size = sizeof(struct sockaddr_in);
    struct sockaddr_in *address = NULL;
    struct hostent *host = NULL;
    bool addressStored = false;  // true once s->address owns `address`
    int resolveError = 0;        // h_errno of a failed lookup, 0 otherwise

    check(port <= 65534, "Port %d cannot be bound to.", port);

    address = calloc(1, size);
    check_mem(address);

    host = gethostbyname(ipAddress);
    if (host == NULL) {
        // Remember the resolver error now; h_errno may be stale or overwritten later.
        resolveError = h_errno;
    }
    check(host, "gethostbyname(%s): %s", ipAddress, hstrerror(h_errno));
    check(host->h_addrtype == AF_INET && host->h_length == (int) sizeof(address->sin_addr),
          "gethostbyname(%s) did not return an IPv4 address.", ipAddress);

    address->sin_family = AF_INET;
    // h_addr is already in network byte order, so it can be copied verbatim.
    memcpy(&address->sin_addr, host->h_addr, sizeof(address->sin_addr));
    address->sin_port = htons((uint16_t) (port));
    s->address = address;
    addressStored = true;

    if(s->isHardwareTimestamping) {
        struct ifreq hwtstamp;
        struct hwtstamp_config hwconfig, hwconfig_requested;

        char *interface = s->interfaceName;
        // Configure HW Timestamping
        memset(&hwtstamp, 0, sizeof(hwtstamp));
        // snprintf always NUL-terminates, unlike strncpy with a full-size bound.
        snprintf(hwtstamp.ifr_name, sizeof(hwtstamp.ifr_name), "%s", interface);
        hwtstamp.ifr_data = (void *) &hwconfig;
        memset(&hwconfig, 0, sizeof(hwconfig));
        hwconfig.flags = 0;
        hwconfig.tx_type = HWTSTAMP_TX_ON;
        hwconfig.rx_filter = HWTSTAMP_FILTER_ALL;
        hwconfig_requested = hwconfig;
        if(ioctl(s->socketFd, SIOCSHWTSTAMP, &hwtstamp) < 0) {
            check(errno != EPERM, "Insufficient permissions. Run in privileged mode.");

            if((errno == EINVAL || errno == ENOTSUP) && hwconfig_requested.tx_type == HWTSTAMP_TX_OFF &&
               hwconfig_requested.rx_filter == HWTSTAMP_FILTER_NONE) {
                debug(DEBUG_ALL, "Hardware timestamping not possible.");
                goto error; // TODO: Provide nice handling.
            } else {
                debug(DEBUG_ALL, "Failed SIOCSHWTSTAMP: %d.", errno);
                goto error; // TODO: Provide nice handling.
            }
        }
    }

    check(bind(s->socketFd, (struct sockaddr *) address, size) == EXIT_SUCCESS,
          "Cannot bind data socket.");

    if(s->isHardwareTimestamping) {
        int enabled = 1;
        check(setsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPNS, &enabled, sizeof(enabled)) >= 0, "Could not set SO_TIMESTAMPNS");
        int val = 0;
        socklen_t len = sizeof(val);
        check(getsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPNS, &val, &len) >= 0, "%s: %s\n", "Getsockopt SO_TIMESTAMPNS", strerror(errno));

        int enabled2 = SOF_TIMESTAMPING_TX_HARDWARE | SOF_TIMESTAMPING_SYS_HARDWARE | SOF_TIMESTAMPING_RAW_HARDWARE;
        check(setsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPING, &enabled2, sizeof(enabled2)) >= 0, "Could not set SO_TIMESTAMPING");
    }

    s->receiveDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
    check_mem(s->receiveDataThreadAttr);
    check(pthread_attr_init(s->receiveDataThreadAttr) == EXIT_SUCCESS,
          "Cannot initialise receive-data thread attributes.");
    if(s->isThreadPinning) {
        check(pin_thread_to_core(s->receiveDataThreadAttr, 2) == EXIT_SUCCESS, "Cannot pin receive-data thread to core 2.");
    }
    check(pthread_create(&s->receiveDataThread, s->receiveDataThreadAttr, receive_data_loop,
                         (void *) s) == EXIT_SUCCESS, "Cannot create data receiving thread.");

    s->isBound = true;

    return 0;

    error:
    if (!addressStored) {
        // Not yet handed over to the socket, so PrrtSocket_close() will not free it.
        free(address);
    }
    PrrtSocket_close(s);
    if (resolveError != 0) {
        return resolveError;
    }
    return -2;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
249 250
/**
 * Connect the socket to a remote receiver and start the send thread.
 *
 * Creates the receiver description for `host:port` and spawns the thread
 * running PrrtDataTransmitter_send_data_loop (pinned to core 1 if thread
 * pinning is enabled). May only be called once per socket.
 *
 * @param s    Socket to connect.
 * @param host Remote host.
 * @param port Remote port.
 * @return true on success, false otherwise.
 */
bool PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port) {
    check(s->receiver == NULL, "PrrtSocket_connect can only be called once.");
    s->receiver = PrrtReceiver_create(host, port, s->maximum_payload_size);
    check(s->receiver != NULL, "Cannot create receiver.");

    s->sendDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
    check_mem(s->sendDataThreadAttr);
    // The attribute object must be initialised BEFORE it is configured:
    // initialising it afterwards would discard the pinning again.
    check(pthread_attr_init(s->sendDataThreadAttr) == EXIT_SUCCESS,
          "Cannot initialise send-data thread attributes.");
    if(s->isThreadPinning) {
        check(pin_thread_to_core(s->sendDataThreadAttr, 1) == EXIT_SUCCESS, "Cannot pin send-data thread to core 1.");
    }

    check(pthread_create(&s->sendDataThread, s->sendDataThreadAttr, PrrtDataTransmitter_send_data_loop,
                         (void *) s) == EXIT_SUCCESS, "Cannot create send thread.");
    return true;

    error:
    PERROR("PrrtSocket_connect() failed.");
    return false;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
267 268
/**
 * Common implementation of the PrrtSocket_send_* functions.
 *
 * Wraps `data` into a data packet with the next source sequence number and
 * either transmits it directly (sync) or enqueues it for the send thread.
 *
 * @param s        Connected socket.
 * @param data     Payload to send.
 * @param data_len Payload length; must not exceed `s->maximum_payload_size`.
 * @param sync     true: transmit in the calling thread and pace afterwards;
 *                 false: push the packet to the send queue.
 * @param try      Only used when `sync` is false. true: use the non-blocking
 *                 Pipe_try_push() and drop the packet if it fails.
 * @return -1 if the payload is too long or the socket is not connected;
 *         the result of Pipe_try_push() for a try-send; 0 otherwise.
 */
int sendpacket(PrrtSocket *s, const uint8_t *data, const size_t data_len, bool sync, bool try) {
    int res = 0;
    if (data_len > s->maximum_payload_size) {
        PERROR("Data to be sent (%zu bytes) is too long, as MTU is %u.\n", data_len,
               (unsigned int) s->maximum_payload_size);
        return -1;
    }

    if (s->receiver == NULL) {
        PERROR("PrrtSocket_connect() must be called before PrrtSocket_send().\n");
        return -1;
    }
    PrrtPace_track_start(s->appSendPace);
    XlapTimestampPlaceholder tsph;
    XlapTimestampPlaceholderInitialize(&tsph);
    XlapTimeStampClock(&tsph, ts_any_packet, 0, PrrtSendStart);
    XlapTimeStampCycle(&tsph, ts_any_packet, 0, PrrtSendStart);
    prrtSequenceNumber_t sequenceNumber = s->sequenceNumberSource++;
    PrrtPacket *packet = PrrtPacket_create_data_packet(5, data, (prrtPacketLength_t) data_len, sequenceNumber,
                                                       PrrtApplicationConstraints_get_target_delay(
                                                               s->applicationConstraints));
    // Keep a copy of the sequence number: once the packet has been transmitted,
    // queued or destroyed below, it must not be dereferenced any more.
    const prrtSequenceNumber_t packetSequenceNumber = packet->sequenceNumber;
    (void) packetSequenceNumber; // only read by the Xlap macros, which may compile to nothing
    XlapTimestampPlaceholderUse(s, ts_data_packet, packetSequenceNumber, &tsph);

    XlapTimeStampCycle(s, ts_data_packet, packetSequenceNumber, PrrtSubmitPackage);
    PrrtPace_track_pause(s->appSendPace);
    if (sync) {
        PrrtDataTransmitter_transmit(s, packet);
        PrrtSocket_pace(s, false);
    } else {
        if(try) {
            res = Pipe_try_push(s->sendDataQueue, &packet->asListNode);
            if (res != 0) {
                PrrtPacket_destroy(packet);
            }
        } else {
            Pipe_push(s->sendDataQueue, &packet->asListNode);
        }
    }
    PrrtPace_track_resume(s->appSendPace);

    PrrtReceiver_on_application_write(s->receiver, Pipe_get_size(s->sendDataQueue), s->sequenceNumberSource);

    XlapTimeStampClock(s, ts_data_packet, packetSequenceNumber, PrrtSendEnd);
    XlapTimeStampCycle(s, ts_data_packet, packetSequenceNumber, PrrtSendEnd);
    PrrtPace_track_end(s->appSendPace);
    return res;
}

314
/**
 * Send `data` from the calling thread (synchronous mode of sendpacket()).
 *
 * @return The result of sendpacket().
 */
int PrrtSocket_send_sync(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
    const bool sync = true;
    const bool try_only = false;

    return sendpacket(s, data, data_len, sync, try_only);
}

/**
 * Queue `data` for the send thread (asynchronous, non-try mode of sendpacket()).
 *
 * @return The result of sendpacket().
 */
int PrrtSocket_send_async(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
    const bool sync = false;
    const bool try_only = false;

    return sendpacket(s, data, data_len, sync, try_only);
}

/**
 * Try to queue `data` for the send thread (asynchronous try mode of sendpacket()).
 *
 * @return The result of sendpacket(); non-zero if the packet could not be queued.
 */
int PrrtSocket_try_send_async(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
    const bool sync = false;
    const bool try_only = true;

    return sendpacket(s, data, data_len, sync, try_only);
}

326 327
/**
 * Report whether the socket has been asked to shut down.
 *
 * @return The current value of the atomic `closing` flag (acquire load).
 */
bool PrrtSocket_closing(PrrtSocket *s) {
    const bool is_closing = atomic_load_explicit(&s->closing, memory_order_acquire);
    return is_closing;
}

330
/**
 * Deliver the next available packet without waiting.
 *
 * Asks the delivery store for a packet in the window [0, MAX_TIMESTAMP] and
 * passes whatever it returns to deliver_packet().
 *
 * @return Number of bytes written to `buf_ptr`; 0 if no packet was available.
 */
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
    PrrtPace_track_start(s->appDeliverPace);

    PrrtPacket *next = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
    const prrtPacketLength_t delivered = deliver_packet(s, buf_ptr, next, addr);

    PrrtPace_track_end(s->appDeliverPace);
    return delivered;
}

339
/**
 * Deliver the next available packet, blocking until one arrives.
 *
 * Polls the delivery store with a fresh timeout obtained from
 * abstime_from_now(10 * 1000) on every round, so that a closing socket is
 * noticed even when no packet arrives.
 *
 * @return Number of bytes written to `buf_ptr`, or -1 if the socket is closing.
 */
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
    PrrtPace_track_start(s->appDeliverPace);
    PrrtPacket *packet;
    do {
        struct timespec deadline = abstime_from_now(10 * 1000);
        packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, 0, MAX_TIMESTAMP, &deadline, s->appDeliverPace);
        if (packet == NULL && errno == ETIMEDOUT) {
            errno = 0;
        }
        if (PrrtSocket_closing(s)) {
            // A packet fetched in this round is not delivered any more;
            // release it instead of leaking it.
            if (packet != NULL) {
                PrrtPacket_destroy(packet);
            }
            PrrtPace_track_end(s->appDeliverPace);
            return -1;
        }
    } while (!packet);
    prrtPacketLength_t l = deliver_packet(s, buf_ptr, packet, addr);
    PrrtPace_track_end(s->appDeliverPace);
    return l;
}

358
/**
 * Deliver the next available packet, waiting at most until `deadline`.
 *
 * @param deadline Passed on to PrrtPacketDeliveryStore_get_packet_timedwait().
 * @return Number of bytes written to `buf_ptr`, or -ETIMEDOUT if no packet
 *         was obtained and errno reports ETIMEDOUT.
 */
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, struct timespec* deadline) {
    PrrtPace_track_start(s->appDeliverPace);

    PrrtPacket *next = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, 0, MAX_TIMESTAMP,
                                                                    deadline, s->appDeliverPace);
    const bool timed_out = (next == NULL) && (errno == ETIMEDOUT);

    if (!timed_out) {
        const prrtPacketLength_t delivered = deliver_packet(s, buf_ptr, next, addr);
        PrrtPace_track_end(s->appDeliverPace);
        return delivered;
    }

    PrrtPace_track_end(s->appDeliverPace);
    return -ETIMEDOUT;
}

372 373
/**
 * Default receive call: alias for PrrtSocket_receive_asap_wait().
 *
 * @return The result of PrrtSocket_receive_asap_wait().
 */
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
    const int32_t received = PrrtSocket_receive_asap_wait(s, buf_ptr, addr);
    return received;
}

376
/**
 * Deliver a packet from the window [now, now + time_window_us] without waiting.
 *
 * @param time_window_us Length of the delivery window, starting at the current time.
 * @return Number of bytes written to `buf_ptr`; 0 if no packet was available.
 */
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us) {
    PrrtPace_track_start(s->appDeliverPace);

    const prrtTimestamp_t window_start = PrrtClock_get_current_time_us();
    PrrtPacket *next = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, window_start,
                                                          window_start + time_window_us);
    const prrtPacketLength_t delivered = deliver_packet(s, buf_ptr, next, addr);

    PrrtPace_track_end(s->appDeliverPace);
    return delivered;
}

386
/**
 * Ordered receive without a deadline: calls
 * PrrtSocket_receive_ordered_timedwait() with a NULL deadline.
 *
 * @return The result of PrrtSocket_receive_ordered_timedwait().
 */
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us) {
    struct timespec *no_deadline = NULL;

    return PrrtSocket_receive_ordered_timedwait(s, buf_ptr, addr, time_window_us, no_deadline);
}

390
/**
 * Deliver a packet from the window [now, now + time_window_us], waiting for
 * one to become available.
 *
 * First tries a non-blocking fetch. If that yields nothing, it waits on the
 * delivery store in rounds that end after half a window (or at `deadline`,
 * if the deadline comes first) and recomputes the window each round.
 *
 * @param time_window_us Length of the delivery window.
 * @param deadline       Optional absolute deadline; NULL waits indefinitely.
 * @return Number of bytes written to `buf_ptr`, -ETIMEDOUT once the deadline
 *         check fails, or -1 if the socket is closing.
 */
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us, struct timespec* deadline) {
    PrrtPace_track_start(s->appDeliverPace);
    prrtTimestamp_t now = PrrtClock_get_current_time_us();
    PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, now, now + time_window_us);

    while (packet == NULL) {
        struct timespec abs_now = abstime_now();
        if (deadline != NULL && timedelta(&abs_now, deadline) > 0) {
            // Close the pace measurement on this exit too, like every other exit.
            PrrtPace_track_end(s->appDeliverPace);
            return -1 * ETIMEDOUT;
        }

        now = PrrtClock_get_current_time_us();
        struct timespec window_end_time = abstime_from_now(time_window_us / 2);
        if (deadline != NULL && timedelta(&window_end_time, deadline) > 0) {
            memcpy(&window_end_time, deadline, sizeof(struct timespec));
        }
        packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now, now + time_window_us,
                                                              &window_end_time, s->appDeliverPace);
        if (packet == NULL && errno == ETIMEDOUT) {
            errno = 0;
        }
        if (PrrtSocket_closing(s)) {
            // A packet fetched in this round is not delivered any more;
            // release it instead of leaking it.
            if (packet != NULL) {
                PrrtPacket_destroy(packet);
            }
            PrrtPace_track_end(s->appDeliverPace);
            return -1;
        }
    }

    prrtPacketLength_t l = deliver_packet(s, buf_ptr, packet, addr);
    PrrtPace_track_end(s->appDeliverPace);
    return l;
}

422
/**
 * Signal shutdown to everything working on the socket and stop its threads.
 *
 * Sets the `closing` flag, interrupts the delivery store and the receiver,
 * wakes the send queue, ends the retransmission timer, joins the send thread
 * and cancels and joins the receive thread. The thread attribute objects are
 * destroyed but not freed here.
 *
 * @return EXIT_SUCCESS, or EXIT_FAILURE if a thread operation failed.
 */
int PrrtSocket_interrupt(PrrtSocket *s) {
    debug(DEBUG_SOCKET, "PrrtSocket_interrupt().");
    atomic_store_explicit(&s->closing, true, memory_order_release);

    // Wake up everything that might be blocked so it can observe the flag.
    if (s->packetDeliveryStore != NULL) {
        PrrtPacketDeliveryStore_interrupt(s->packetDeliveryStore);
    }
    if (s->receiver != NULL) {
        PrrtReceiver_interrupt(s->receiver);
    }
    if (s->sendDataQueue != NULL) {
        Pipe_wake(s->sendDataQueue);
    }

    if (s->retransmissionTimer != NULL) {
        PrrtTimer_end(s->retransmissionTimer);
        s->retransmissionTimer = NULL;
    }

    // The threads' return values are not of interest, hence the NULL result pointer.
    if (s->sendDataThread != 0) {
        check(pthread_join(s->sendDataThread, NULL) == EXIT_SUCCESS, "Join failed.");
        check(pthread_attr_destroy(s->sendDataThreadAttr) == EXIT_SUCCESS, "Destroy failed.");
        s->sendDataThread = 0;
    }

    if (s->receiveDataThread != 0) {
        int cancelStatus = pthread_cancel(s->receiveDataThread);
        check(cancelStatus == EXIT_SUCCESS, "Cancel failed: %d.", cancelStatus);
        int joinStatus = pthread_join(s->receiveDataThread, NULL);
        check(joinStatus == EXIT_SUCCESS, "Join failed: %d.", joinStatus);
        check(pthread_attr_destroy(s->receiveDataThreadAttr) == EXIT_SUCCESS, "Destroy failed.");
        s->receiveDataThread = 0;
    }

    return EXIT_SUCCESS;

    error:
    PERROR("PrrtSocket_interrupt() failed.\n");
    return EXIT_FAILURE;
}

466
/**
 * Shut the socket down and release everything it owns.
 *
 * Interrupts the socket first (unless that already happened), then destroys
 * all stores, tables, pace trackers, coding state and thread attribute
 * storage, and finally closes the file descriptor. Every released member is
 * reset, so calling this function a second time on the same socket does not
 * release anything twice. The PrrtSocket structure itself is NOT freed.
 *
 * @param s Socket to close.
 * @return 0 on success, -1 if one of the steps failed (remaining members are
 *         then left untouched).
 */
int PrrtSocket_close(PrrtSocket *s) {
    debug(DEBUG_SOCKET, "PrrtSocket_close() start.");
    if (!PrrtSocket_closing(s)) {
        check(PrrtSocket_interrupt(s) == EXIT_SUCCESS, "Interrupt failed.");
    }

    if (s->dataPacketStore != NULL) {
        check(PrrtDataPacketStore_destroy(s->dataPacketStore), "Destroy failed.");
        s->dataPacketStore = NULL;
    }

    if (s->repairBlockStore != NULL) {
        check(PrrtRepairBlockStore_destroy(s->repairBlockStore), "Destroy failed.");
        s->repairBlockStore = NULL;
    }

    if (s->receiver != NULL) {
        check(PrrtReceiver_destroy(s->receiver), "Destroy failed.");
        s->receiver = NULL;
    }

    if (s->sendDataQueue != NULL) {
        check(Pipe_destroy(s->sendDataQueue), "Destroy failed.");
        s->sendDataQueue = NULL;
    }

    if (s->dataReceptionTable != NULL) {
        check(PrrtReceptionTable_destroy(s->dataReceptionTable), "Destroy failed.");
        s->dataReceptionTable = NULL;
    }

    if (s->redundancyReceptionTable != NULL) {
        check(PrrtReceptionTable_destroy(s->redundancyReceptionTable), "Destroy failed.");
        s->redundancyReceptionTable = NULL;
    }

    if(s->senderChannelStateInformation != NULL) {
        PrrtChannelStateInformation_destroy(s->senderChannelStateInformation);
        s->senderChannelStateInformation = NULL;
    }

    if (s->packetDeliveryStore != NULL) {
        check(PrrtPacketDeliveryStore_destroy(s->packetDeliveryStore), "Destroy failed.");
        s->packetDeliveryStore = NULL;
    }

    if (s->deliveredPacketTable != NULL) {
        check(PrrtDeliveredPacketTable_destroy(s->deliveredPacketTable), "Destroy failed.");
        s->deliveredPacketTable = NULL;
    }

    if (s->packetTimeoutTable != NULL) {
        check(PrrtPacketTimeoutTable_destroy(s->packetTimeoutTable), "Destroy failed.");
        s->packetTimeoutTable = NULL;
    }

    free(s->address);
    s->address = NULL;

    if (s->applicationConstraints != NULL) {
        check(PrrtApplicationConstraints_destroy(s->applicationConstraints),
              "Could not destroy application constraints.");
        s->applicationConstraints = NULL;
    }

    if (s->codingParameters) {
        check(PrrtCodingConfiguration_destroy(s->codingParameters), "Could not destroy coding parameters.");
        s->codingParameters = NULL;
    }

    if(s->appSendPace != NULL) {
        check(PrrtPace_destroy(s->appSendPace), "Could not destroy appSendPace");
        s->appSendPace = NULL;
    }

    if(s->prrtTransmitPace != NULL) {
        check(PrrtPace_destroy(s->prrtTransmitPace), "Could not destroy prrtTransmitPace");
        s->prrtTransmitPace = NULL;
    }

    if(s->prrtReceivePace != NULL) {
        check(PrrtPace_destroy(s->prrtReceivePace), "Could not destroy prrtReceivePace");
        s->prrtReceivePace = NULL;
    }

    if(s->appDeliverPace != NULL) {
        check(PrrtPace_destroy(s->appDeliverPace), "Could not destroy appDeliverPace");
        s->appDeliverPace = NULL;
    }

    if(s->coder) {
        check(PrrtCoder_destroy(s->coder), "Could not destroy coder.");
        s->coder = NULL;
    }

    // free(NULL) is a no-op; resetting the pointers guards against a double free
    // when close is called again (e.g. after a failed PrrtSocket_bind()).
    free(s->sendDataThreadAttr);
    s->sendDataThreadAttr = NULL;

    free(s->receiveDataThreadAttr);
    s->receiveDataThreadAttr = NULL;

    // Copy made by PrrtSocket_enable_hardware_timestamping().
    free(s->interfaceName);
    s->interfaceName = NULL;

    if (s->socketFd >= 0) {
        close(s->socketFd);
        // Invalidate, so a repeated close cannot hit a descriptor number that
        // has been reused for something else in the meantime.
        s->socketFd = -1;
    }
    debug(DEBUG_SOCKET, "PrrtSocket_close() end.");
    return 0;

    error:
    PERROR("Closing socket failed.%s", "");
    return -1;
}

573
/**
 * Read a numeric socket property by name.
 *
 * Supported names:
 *  - "target_delay", "pacing_enabled", "connected", "maximum_payload_size"
 *  - "<tracker>_pace_<part>" with tracker in {appSend, prrtTransmit,
 *    prrtReceive, appDeliver} and part in {internal, dependent, external,
 *    effective, total}
 *  - "send_peer_btl_pace", "recv_peer_btl_pace"
 *  - "nw_pace": 1e6 * maximum_payload_size / BBR pacing rate, rounded;
 *    0 if the pacing rate is 0 or the socket is not connected.
 *
 * @param s    Socket to query.
 * @param name Property name.
 * @return The property value; 0 (after logging an error) for unknown names.
 */
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *s, const char *name) {
    if (strcmp(name, "target_delay") == 0) {
        return PrrtApplicationConstraints_get_target_delay(s->applicationConstraints);
    } else if(strcmp(name, "pacing_enabled") == 0) {
        return (uint32_t) s->pacingEnabled;
    } else if(strcmp(name, "appSend_pace_internal") == 0) {
        return PrrtPace_get_internal(s->appSendPace);
    } else if(strcmp(name, "appSend_pace_dependent") == 0) {
        return PrrtPace_get_dependent(s->appSendPace);
    } else if(strcmp(name, "appSend_pace_external") == 0) {
        return PrrtPace_get_external(s->appSendPace);
    } else if(strcmp(name, "appSend_pace_effective") == 0) {
        return PrrtPace_get_effective(s->appSendPace);
    } else if(strcmp(name, "appSend_pace_total") == 0) {
        return PrrtPace_get_total(s->appSendPace);
    } else if(strcmp(name, "prrtTransmit_pace_internal") == 0) {
        return PrrtPace_get_internal(s->prrtTransmitPace);
    } else if(strcmp(name, "prrtTransmit_pace_dependent") == 0) {
        return PrrtPace_get_dependent(s->prrtTransmitPace);
    } else if(strcmp(name, "prrtTransmit_pace_external") == 0) {
        return PrrtPace_get_external(s->prrtTransmitPace);
    } else if(strcmp(name, "prrtTransmit_pace_effective") == 0) {
        return PrrtPace_get_effective(s->prrtTransmitPace);
    } else if(strcmp(name, "prrtTransmit_pace_total") == 0) {
        return PrrtPace_get_total(s->prrtTransmitPace);
    } else if(strcmp(name, "prrtReceive_pace_internal") == 0) {
        return PrrtPace_get_internal(s->prrtReceivePace);
    } else if(strcmp(name, "prrtReceive_pace_dependent") == 0) {
        return PrrtPace_get_dependent(s->prrtReceivePace);
    } else if(strcmp(name, "prrtReceive_pace_external") == 0) {
        return PrrtPace_get_external(s->prrtReceivePace);
    } else if(strcmp(name, "prrtReceive_pace_effective") == 0) {
        return PrrtPace_get_effective(s->prrtReceivePace);
    } else if(strcmp(name, "prrtReceive_pace_total") == 0) {
        return PrrtPace_get_total(s->prrtReceivePace);
    } else if(strcmp(name, "appDeliver_pace_internal") == 0) {
        return PrrtPace_get_internal(s->appDeliverPace);
    } else if(strcmp(name, "appDeliver_pace_dependent") == 0) {
        return PrrtPace_get_dependent(s->appDeliverPace);
    } else if(strcmp(name, "appDeliver_pace_external") == 0) {
        return PrrtPace_get_external(s->appDeliverPace);
    } else if(strcmp(name, "appDeliver_pace_effective") == 0) {
        return PrrtPace_get_effective(s->appDeliverPace);
    } else if(strcmp(name, "appDeliver_pace_total") == 0) {
        return PrrtPace_get_total(s->appDeliverPace);
    } else if (strcmp(name, "send_peer_btl_pace") == 0) {
        return s->send_peer_btl_pace;
    } else if (strcmp(name, "recv_peer_btl_pace") == 0) {
        return s->recv_peer_btl_pace;
    } else if (strcmp(name, "nw_pace") == 0) {
        // The receiver only exists after PrrtSocket_connect(); without it
        // there is no pacing rate to report.
        if (s->receiver == NULL) {
            return 0;
        }
        double pacingRate = PrrtReceiver_get_BBR_pacingRate(s->receiver);
        if (pacingRate == 0) {
            return 0;
        }
        return (prrtTimedelta_t) round(((1000 * 1000 * ((double) s->maximum_payload_size)) / pacingRate));
    } else if (strcmp(name, "connected") == 0) {
        return (s->receiver != NULL) ? 1 : 0;
    } else if (strcmp(name, "maximum_payload_size") == 0) {
        return s->maximum_payload_size;
    } else {
        PERROR("Unknown property %s\n", name);
        return 0;
    }
}

638
/**
 * Set a socket option selected by its textual name.
 *
 * Recognised names:
 *   - "app_queue_size": forwarded to PrrtApplicationConstraints_set_app_queue_size.
 *   - "pacing_enabled": pacing is switched on for any value greater than zero.
 *   - "target_delay":   forwarded to PrrtApplicationConstraints_set_target_delay.
 *
 * @param s     Socket whose option is changed.
 * @param name  Option name; compared with strcmp, so it must be a valid C string.
 * @param value New value for the option.
 * @return false for an unknown name; for "target_delay" the result of
 *         PrrtApplicationConstraints_set_target_delay; true otherwise.
 */
bool PrrtSocket_set_sock_opt(PrrtSocket *s, const char *name, const uint32_t value) {
    if (strcmp(name, "app_queue_size") == 0) {
        PrrtApplicationConstraints_set_app_queue_size(s->applicationConstraints, value);
        // TODO: MPSC_Queue does not provide a size.
        return true;
    }

    if (strcmp(name, "pacing_enabled") == 0) {
        s->pacingEnabled = (value > 0);
        return true;
    }

    if (strcmp(name, "target_delay") == 0) {
        return PrrtApplicationConstraints_set_target_delay(s->applicationConstraints, value);
    }

    return false;
}
652

rna's avatar
rna committed
653
/**
 * Replace the socket's coding configuration and the coder derived from it.
 *
 * The previously installed coder and configuration (if any) are destroyed
 * before the new ones are created, so repeated calls do not leak them.
 *
 * @param s       Socket to reconfigure.
 * @param k       Passed through to PrrtCodingConfiguration_create.
 * @param n       Passed through to PrrtCodingConfiguration_create.
 * @param c       Passed through to PrrtCodingConfiguration_create.
 * @param n_cycle Passed through to PrrtCodingConfiguration_create.
 * @return Always true.
 */
bool PrrtSocket_set_coding_parameters(PrrtSocket *s, uint8_t k, uint8_t n, uint8_t c, uint8_t *n_cycle) {
    // Release the old coder first: it was built for the old configuration and
    // would otherwise be leaked when s->coder is overwritten below.
    if (s->coder != NULL) {
        PrrtCoder_destroy(s->coder);
        s->coder = NULL;
    }
    if (s->codingParameters != NULL) {
        PrrtCodingConfiguration_destroy(s->codingParameters);
    }
    s->codingParameters = PrrtCodingConfiguration_create(k, n, c, n_cycle);
    s->coder = PrrtCoder_create(s->codingParameters);
    return true;
}
661

662 663
/**
 * Return the coding configuration currently stored on the socket.
 *
 * The stored pointer itself is returned, not a copy.
 *
 * @param s Socket to query.
 * @return s->codingParameters.
 */
PrrtCodingConfiguration *PrrtSocket_get_coding_parameters(PrrtSocket *s) {
    PrrtCodingConfiguration *params = s->codingParameters;
    return params;
}

666

667
/**
 * Expire timed-out packets and refresh the socket's loss statistics.
 *
 * Steps, in order:
 *   1. If a packet timeout table exists, fetch the timeouts that expired as of
 *      "now". When there are any: forward the delivered-packet table start,
 *      remove and destroy the matching range from the data packet store,
 *      destroy the expired timeout entries and clean up the packet delivery
 *      store.
 *   2. If a delivered-packet table exists, expire the repair block range
 *      [start - SEQNO_SPACE/2, start - 1] relative to its current start.
 *   3. Recompute s->lossStatistics from the data and redundancy reception tables.
 *
 * @param s Socket to clean up.
 * @return Always true.
 */
bool PrrtSocket_cleanup(PrrtSocket *s) {
    debug(DEBUG_CLEANUP, "PrrtSocket_cleanup");
    if (s->packetTimeoutTable != NULL) {
        prrtTimestamp_t now = PrrtClock_get_current_time_us();
        List *expired_packets = PrrtPacketTimeoutTable_expire_packets(s->packetTimeoutTable, now);
        uint32_t expired_count = List_count(expired_packets);
        // expired_count is unsigned, so it must be printed with an unsigned conversion.
        debug(DEBUG_CLEANUP, "EXPIRED: %u", (unsigned int) expired_count);
        if (expired_count > 0) {
            // Lower bound of the range to drop: half the sequence number space
            // before the block base of the first expired entry.
            PrrtPacketTimeout *first = List_first(expired_packets);
            prrtSequenceNumber_t firstSequenceNumberBase = (prrtSequenceNumber_t) (first->sequenceNumber -
                                                                                   first->index - SEQNO_SPACE / 2);

            // New start for the delivered-packet table, derived from the last expired entry.
            PrrtPacketTimeout *last = List_last(expired_packets);
            prrtSequenceNumber_t lastSequenceNumberBase = (prrtSequenceNumber_t) (last->sequenceNumber -
                                                                                  last->index - 1);

            PrrtDeliveredPacketTable_forward_start(s->deliveredPacketTable, lastSequenceNumberBase);

            // Collect the data packets in the expired range and destroy them.
            List *list = List_create();
            PrrtDataPacketStore_remove_range(s->dataPacketStore, list, firstSequenceNumberBase,
                                             last->sequenceNumber);

            while (List_count(list) > 0) {
                PrrtPacket *packet = (PrrtPacket *) List_shift(list);
                PrrtPacket_destroy(packet);
            }
            List_destroy(list);

            // The timeout entries are only destroyed here, after first/last were read above.
            while (List_count(expired_packets) > 0) {
                PrrtPacketTimeout *packetTimeout = (PrrtPacketTimeout *) List_shift(expired_packets);
                PrrtPacketTimeout_destroy(packetTimeout);
            }
            PrrtPacketDeliveryStore_cleanup(s->packetDeliveryStore, now);
        }
        List_destroy(expired_packets);
    }
    debug(DEBUG_CLEANUP, "Expire block range.");
    if(s->deliveredPacketTable != NULL) {
        prrtSequenceNumber_t current_start = s->deliveredPacketTable->start;
        PrrtRepairBlockStore_expire_block_range(s->repairBlockStore, (prrtSequenceNumber_t) (current_start - SEQNO_SPACE/2),
                                                (prrtSequenceNumber_t) (current_start - 1));
    }

    debug(DEBUG_CLEANUP, "Loss stats.");
    s->lossStatistics = PrrtLossStatistics_add(PrrtReceptionTable_calculate_statistics(s->dataReceptionTable), PrrtReceptionTable_calculate_statistics(s->redundancyReceptionTable));
    debug(DEBUG_CLEANUP, "PrrtSocket_cleanup done");
    return true;
}
715

716
/**
 * Report whether thread pinning is enabled for this socket.
 *
 * The flag is read atomically with acquire ordering.
 *
 * @param s Socket to query.
 * @return Current value of s->isThreadPinning.
 */
bool PrrtSocket_uses_thread_pinning(PrrtSocket *s) {
    const bool pinning = atomic_load_explicit(&s->isThreadPinning, memory_order_acquire);
    return pinning;
}
719

Andreas Schmidt's avatar
Andreas Schmidt committed
720
/**
 * Return BBR_getRTProp() of the receiver's BBR state.
 *
 * s->receiver is dereferenced without a NULL check, so it must be set.
 *
 * @param s Socket to query.
 */
uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *s) {
    const uint32_t rtprop = BBR_getRTProp(s->receiver->bbr);
    return rtprop;
}
Andreas Schmidt's avatar
Andreas Schmidt committed
723

Andreas Schmidt's avatar
Andreas Schmidt committed
724
/**
 * Return the packet loss rate from the receiver's channel state information.
 *
 * s->receiver is dereferenced without a NULL check, so it must be set.
 *
 * @param s Socket to query.
 * @return Result of PrrtChannelStateInformation_get_plr on s->receiver->csi.
 */
prrtPacketLossRate_t PrrtSocket_get_plr_fwd(PrrtSocket *s) {
    const prrtPacketLossRate_t plr = PrrtChannelStateInformation_get_plr(s->receiver->csi);
    return plr;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
728
/**
 * Return the delivery rate from the receiver's channel state information.
 *
 * s->receiver is dereferenced without a NULL check, so it must be set.
 *
 * @param s Socket to query.
 * @return Result of PrrtChannelStateInformation_get_delivery_rate on s->receiver->csi.
 */
prrtDeliveryRate_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *s) {
    const prrtDeliveryRate_t rate = PrrtChannelStateInformation_get_delivery_rate(s->receiver->csi);
    return rate;
}
731

732 733 734 735 736 737 738 739 740 741 742 743
/**
 * Return BBR_getRoundStart() of the receiver's BBR state.
 *
 * s->receiver is dereferenced without a NULL check, so it must be set.
 *
 * @param s Socket to query.
 */
bool PrrtSocket_get_bbr_round_start(PrrtSocket *s) {
    const bool round_start = BBR_getRoundStart(s->receiver->bbr);
    return round_start;
}

/**
 * Return BBR_getState() of the receiver's BBR state, as a uint32_t.
 *
 * s->receiver is dereferenced without a NULL check, so it must be set.
 *
 * @param s Socket to query.
 */
uint32_t PrrtSocket_get_bbr_state(PrrtSocket *s) {
    const uint32_t state = BBR_getState(s->receiver->bbr);
    return state;
}

/**
 * Return BBR_getFilledPipe() of the receiver's BBR state.
 *
 * s->receiver is dereferenced without a NULL check, so it must be set.
 *
 * @param s Socket to query.
 */
bool PrrtSocket_get_filled_pipe(PrrtSocket *s) {
    const bool filled = BBR_getFilledPipe(s->receiver->bbr);
    return filled;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
744 745
/**
 * Return BBR_getFullDatarate() of the receiver's BBR state.
 *
 * s->receiver is dereferenced without a NULL check, so it must be set.
 *
 * @param s Socket to query.
 */
uint64_t PrrtSocket_get_full_datarate(PrrtSocket *s) {
    const uint64_t full_datarate = BBR_getFullDatarate(s->receiver->bbr);
    return full_datarate;
}

/**
 * Return BBR_getCwnd() of the receiver's BBR state.
 *
 * s->receiver is dereferenced without a NULL check, so it must be set.
 *
 * @param s Socket to query.
 */
prrtByteCount_t PrrtSocket_get_cwnd(PrrtSocket *s) {
    const prrtByteCount_t cwnd = BBR_getCwnd(s->receiver->bbr);
    return cwnd;
}

/**
 * Return BBR_getInflight() of the receiver's BBR state.
 *
 * s->receiver is dereferenced without a NULL check, so it must be set.
 *
 * @param s Socket to query.
 */
prrtByteCount_t PrrtSocket_get_inflight(PrrtSocket *s) {
    const prrtByteCount_t inflight = BBR_getInflight(s->receiver->bbr);
    return inflight;
}

/**
 * Return BBR_getSendQuantum() of the receiver's BBR state.
 *
 * s->receiver is dereferenced without a NULL check, so it must be set.
 *
 * @param s Socket to query.
 */
prrtByteCount_t PrrtSocket_get_send_quantum(PrrtSocket *s) {
    const prrtByteCount_t send_quantum = BBR_getSendQuantum(s->receiver->bbr);
    return send_quantum;
}

/**
 * Return PrrtReceiver_get_pipe() for the socket's receiver.
 *
 * s->receiver is passed on as-is; this function performs no NULL check.
 *
 * @param s Socket to query.
 */
prrtByteCount_t PrrtSocket_get_pipe(PrrtSocket *s) {
    const prrtByteCount_t pipe_value = PrrtReceiver_get_pipe(s->receiver);
    return pipe_value;
}

/**
 * Return the "delivered" counter of the receiver's packet tracking state.
 *
 * s->receiver and s->receiver->packetTracking are dereferenced without NULL
 * checks, so both must be set.
 *
 * @param s Socket to query.
 */
prrtByteCount_t PrrtSocket_get_delivered(PrrtSocket *s) {
    const prrtByteCount_t delivered = s->receiver->packetTracking->delivered;
    return delivered;
}

/**
 * Return BBR_getPacingGain() of the receiver's BBR state.
 *
 * s->receiver is dereferenced without a NULL check, so it must be set.
 *
 * @param s Socket to query.
 */
float PrrtSocket_get_pacing_gain(PrrtSocket *s) {
    const float pacing_gain = BBR_getPacingGain(s->receiver->bbr);
    return pacing_gain;
}

/**
 * Return BBR_getPacingRate() of the receiver's BBR state, converted to uint32_t.
 *
 * s->receiver is dereferenced without a NULL check, so it must be set.
 *
 * @param s Socket to query.
 */
uint32_t PrrtSocket_get_pacing_rate(PrrtSocket *s) {
    const uint32_t pacing_rate = BBR_getPacingRate(s->receiver->bbr);
    return pacing_rate;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
776 777
/**
 * Return BBR_getBtlDatarate() of the receiver's BBR state.
 *
 * s->receiver is dereferenced without a NULL check, so it must be set.
 *
 * @param s Socket to query.
 */
prrtDeliveryRate_t PrrtSocket_get_btldatarate_fwd(PrrtSocket *s) {
    const prrtDeliveryRate_t btl_datarate = BBR_getBtlDatarate(s->receiver->bbr);
    return btl_datarate;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
780
/**
 * Return the delivery rate recorded in the socket's sender channel state
 * information.
 *
 * Unlike the *_fwd getters this reads s->senderChannelStateInformation and
 * does not touch s->receiver.
 *
 * @param s Socket to query.
 * @return Result of PrrtChannelStateInformation_get_delivery_rate.
 */
prrtDeliveryRate_t PrrtSocket_get_btldatarate_back(PrrtSocket *s) {
    const prrtDeliveryRate_t rate = PrrtChannelStateInformation_get_delivery_rate(s->senderChannelStateInformation);
    return rate;
}

784 785 786
/**
 * Return BBR_getCycleIndex() of the receiver's BBR state.
 *
 * s->receiver is dereferenced without a NULL check, so it must be set.
 *
 * @param s Socket to query.
 */
uint32_t PrrtSocket_get_cycle_index(PrrtSocket *s) {
    const uint32_t cycle_index = BBR_getCycleIndex(s->receiver->bbr);
    return cycle_index;
}
Andreas Schmidt's avatar
Andreas Schmidt committed
787

788 789 790
/**
 * Return the is_app_limited flag of the receiver's current rate sample.
 *
 * s->receiver and s->receiver->rateSample are dereferenced without NULL
 * checks, so both must be set.
 *
 * @param s Socket to query.
 */
bool PrrtSocket_get_bbr_is_app_limited(PrrtSocket *s) {
    const bool is_app_limited = s->receiver->rateSample->is_app_limited;
    return is_app_limited;
}
Andreas Schmidt's avatar
Andreas Schmidt committed
791

792 793
/**
 * Return the app_limited field of the receiver's packet tracking state.
 *
 * s->receiver and s->receiver->packetTracking are dereferenced without NULL
 * checks, so both must be set.
 *
 * @param s Socket to query.
 */
uint32_t PrrtSocket_get_bbr_app_limited(PrrtSocket *s) {
    const uint32_t app_limited = s->receiver->packetTracking->app_limited;
    return app_limited;
}
795 796 797 798 799 800 801 802 803 804

/**
 * Return a copy of a coder matching the given coding configuration.
 *
 * The socket caches one coder in s->coder. If none is cached, or the cached
 * coder's k or n differs from codingParams, the cached coder is destroyed (if
 * present) and re-created from codingParams. Only k and n are compared.
 *
 * @param s            Socket holding the cached coder.
 * @param codingParams Configuration the coder must match; dereferenced without
 *                     a NULL check.
 * @return Result of PrrtCoder_copy on the cached coder.
 *         NOTE(review): presumably the caller owns the copy -- confirm against
 *         PrrtCoder_copy.
 */
PrrtCoder *PrrtSocket_get_matching_coder(PrrtSocket *s, PrrtCodingConfiguration *codingParams) {
    // Short-circuit evaluation keeps the getters from running on a NULL coder.
    const bool needs_new_coder = s->coder == NULL
                                 || PrrtCoder_get_k(s->coder) != codingParams->k
                                 || PrrtCoder_get_n(s->coder) != codingParams->n;

    if (needs_new_coder) {
        if (s->coder != NULL) {
            PrrtCoder_destroy(s->coder);
        }
        s->coder = PrrtCoder_create(codingParams);
    }

    return PrrtCoder_copy(s->coder);
}