socket.c 28.6 KB
Newer Older
1
#include <arpa/inet.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
2
#include <math.h>
3 4 5 6 7
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
8 9
#include <net/if.h>
#include <linux/net_tstamp.h>
10
#include <assert.h>
11
#include <sys/ioctl.h>
12
#include <netdb.h>
13 14
#include "../defines.h"
#include "../util/common.h"
15 16
#include "../util/dbg.h"
#include "../util/time.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
17 18
#include "processes/dataTransmitter.h"
#include "processes/dataReceiver.h"
19
#include "stores/deliveredPacketTable.h"
20
#include "types/packetTimeout.h"
21
#include "socket.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
22
#include "timer.h"
23

24
/**
 * Copy a received data packet into the caller's buffer and release the packet.
 *
 * The payload length is packet->payloadLength minus PRRT_PACKET_DATA_HEADER_SIZE.
 * When s->withTimestamp is set, packet->channelReceive (sizeof(struct timespec)
 * bytes) is appended directly behind the payload and counted in the result.
 *
 * @param s      socket the packet belongs to (only read).
 * @param buffer destination; the caller must provide room for the payload plus,
 *               if timestamps are enabled, one struct timespec.
 * @param packet packet to deliver, or NULL. A non-NULL packet is destroyed here.
 * @param addr   receives sizeof(struct sockaddr_in) bytes of packet->sender_addr;
 *               may be NULL if the caller does not need the sender address.
 * @return number of bytes written to buffer; 0 when packet is NULL.
 */
static inline prrtPacketLength_t deliver_packet(const PrrtSocket *s, void *buffer, PrrtPacket *packet, struct sockaddr* addr) {
    if (packet == NULL) {
        return 0;
    }

    XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);

    prrtPacketLength_t len = (prrtPacketLength_t) (packet->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE);
    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, CopyOutputStart);
    PrrtPacket_copy_payload_to_buffer(buffer, packet, PRRT_PACKET_DATA_HEADER_SIZE);

    if (s->withTimestamp) {
        const size_t timespec_size = sizeof(struct timespec);
        // Offset through a byte pointer: arithmetic on void* is a GNU extension, not ISO C.
        memcpy((unsigned char *) buffer + len, &packet->channelReceive, timespec_size);
        len += timespec_size;
    }

    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, CopyOutputEnd);

    // Callers that do not care about the sender may pass NULL.
    if (addr != NULL) {
        memcpy(addr, &(packet->sender_addr), sizeof(struct sockaddr_in));
    }

    XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
    PrrtPacket_destroy(packet);

    return len;
}

50 51 52 53
/**
 * Block the calling thread until the socket's next send time has been reached.
 *
 * Thread cancellation is disabled while waiting and the caller's previous
 * cancel state is restored afterwards (instead of unconditionally enabling it).
 *
 * @param s       socket whose nextSendTime / pacingEnabled are consulted.
 * @param prepace when false ("post-pacing"), the external part of appSendPace
 *                is subtracted from the remaining wait, clamped at 0.
 * @return true once the wait is over; false if the socket started closing
 *         while waiting.
 */
bool PrrtSocket_pace(PrrtSocket *s, bool prepace) {
    bool result = true;
    int previousCancelState = PTHREAD_CANCEL_ENABLE;
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &previousCancelState);

    if (s->pacingEnabled && s->nextSendTime != 0) {
        debug(DEBUG_DATATRANSMITTER, "About to check for pacing.");
        int64_t diff = 0;
        do {
            if (PrrtSocket_closing(s)) {
                result = false;
                break;
            }

            prrtTimestamp_t now = PrrtClock_get_current_time_us();
            diff = PrrtTimestamp_cmp(s->nextSendTime, now);
            if (!prepace) { // post-pacing removes appSendPace
                diff -= PrrtPace_get_external(s->appSendPace);
                diff = MAX(0, diff);
            }

            // NOTE(review): diff is derived from microsecond timestamps but is handed to
            // usleep_nano(); confirm the unit usleep_nano() expects.
            if (diff > 0) {
                usleep_nano(diff);
            }
        } while (diff > 0);
    } else {
        // Pacing disabled or no send time set: sleep for the minimal interval only.
        usleep_nano(1);
    }

    pthread_setcancelstate(previousCancelState, NULL);
    return result;
}

rna's avatar
rna committed
78
/**
 * Allocate and initialise a PRRT socket.
 *
 * Creates the UDP data socket (with SO_BROADCAST and SO_REUSEADDR), the
 * retransmission timer, the initial coding configuration (K_START / N_START),
 * all packet stores and tables, and the four pace trackers. No threads are
 * started here; that happens in PrrtSocket_bind() / PrrtSocket_connect().
 *
 * @param maximum_payload_size stored on the socket; sendpacket() rejects larger payloads.
 * @param target_delay_us      application target delay; must be < HALF_TIMESTAMP.
 * @return the new socket, or NULL on failure (everything created so far is released).
 */
PrrtSocket *PrrtSocket_create(prrtByteCount_t maximum_payload_size, prrtTimedelta_t target_delay_us) {
    assert(sizeof(float) == 4);

    int enabled = 1;
    uint8_t n_cycle[1] = { N_START - K_START };

    PrrtSocket *s = calloc(1, sizeof(*s));
    if (s == NULL) {
        // Nothing to tear down yet, and PrrtSocket_close() must never be handed NULL.
        PERROR("Out of memory.%s", "");
        return NULL;
    }

    // calloc() leaves the descriptor at 0 (stdin). Mark it invalid so that an early
    // failure does not make the error path close descriptor 0.
    s->socketFd = -1;

    s->pacingEnabled = true;
    s->maximum_payload_size = maximum_payload_size;
    s->isHardwareTimestamping = false;
    s->interfaceName = NULL;

    s->retransmissionTimer = PrrtTimer_create(3);

    s->isThreadPinning = false;

    PrrtClock_init(&s->clock);
    s->nextSendTime = PrrtClock_get_current_time_us();

    s->isBound = false;
    atomic_store_explicit(&s->closing, false, memory_order_release);
    s->receiver = NULL;

    s->codingParameters = PrrtCodingConfiguration_create(K_START, N_START, 1, n_cycle);
    s->coder = PrrtCoder_create(s->codingParameters);

    s->sequenceNumberSource = 1;
    s->sequenceNumberRedundancy = 1;
    s->sequenceNumberFeedback = 1;

    s->senderChannelStateInformation = PrrtChannelStateInformation_create();

    check(target_delay_us < HALF_TIMESTAMP, "Specify target delay between 0 and %i", HALF_TIMESTAMP-1)
    s->applicationConstraints = PrrtApplicationConstraints_create(target_delay_us);

    s->packetTimeoutTable = PrrtPacketTimeoutTable_create();

    s->dataPacketStore = PrrtDataPacketStore_create();

    // socket() reports failure with -1 and may legitimately return descriptor 0,
    // so the result has to be compared against 0 rather than tested for truthiness.
    check((s->socketFd = socket(AF_INET, SOCK_DGRAM, 0)) >= 0, "Cannot create data socket.");
    check(setsockopt(s->socketFd, SOL_SOCKET, SO_BROADCAST, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");
    check(setsockopt(s->socketFd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");

    s->sendDataQueue = Pipe_create();
    s->deliveredPacketTable = PrrtDeliveredPacketTable_create();
    s->repairBlockStore = PrrtRepairBlockStore_create();

    s->packetDeliveryStore = PrrtPacketDeliveryStore_create();

    s->dataReceptionTable = PrrtReceptionTable_create();
    s->redundancyReceptionTable = PrrtReceptionTable_create();

    // Pacing
    s->appSendPace = PrrtPace_create();
    s->prrtTransmitPace = PrrtPace_create();
    s->prrtReceivePace = PrrtPace_create();
    s->appDeliverPace = PrrtPace_create();

    return s;

    error:
    PrrtSocket_close(s);
    // PrrtSocket_close() releases the members but not the struct itself; the pointer
    // is never handed to the caller on this path, so it has to be freed here.
    free(s);
    return NULL;
}

147 148 149 150 151 152 153 154 155 156
/**
 * Ask for the socket's worker threads to be pinned to fixed CPU cores.
 *
 * Only sets s->isThreadPinning; the flag is read when the threads are created
 * in PrrtSocket_bind() and PrrtSocket_connect().
 *
 * @param s socket to configure; must not be bound yet.
 * @return true on success, false if the socket is already bound.
 */
bool PrrtSocket_enable_thread_pinning(PrrtSocket *s) {
    const bool alreadyBound = s->isBound;
    check(!alreadyBound, "Thread pinning can only be enabled before calling PrrtSocket_bind()");

    s->isThreadPinning = true;
    return true;

    error:
    PERROR("PrrtSocket_enable_thread_pinning() failed.%s", "");
    return false;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
157

158 159 160 161 162 163 164 165 166 167 168
/**
 * Enable hardware timestamping on the given network interface.
 *
 * Stores a private copy of interface_name on the socket; the actual device and
 * socket configuration happens in PrrtSocket_bind().
 *
 * @param s              socket to configure; must not be bound yet.
 * @param interface_name name of the network interface; must not be NULL.
 * @return true on success; false if the socket is already bound, the name is
 *         NULL, or the name could not be copied. On failure the socket is unchanged.
 */
bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interface_name) {
    char *name_copy = NULL;

    check(s->isBound == false, "Hardware timestamping can only be enabled before calling PrrtSocket_bind()");
    check(interface_name != NULL, "Hardware timestamping requires an interface name.");

    name_copy = strdup(interface_name);
    check_mem(name_copy);

    // Release the name stored by an earlier call so repeated calls do not leak.
    free(s->interfaceName);
    s->interfaceName = name_copy;
    s->isHardwareTimestamping = true;
    return true;

    error:
    PERROR("PrrtSocket_enable_hardware_timestamping() failed.%s", "");
    return false;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
169
/**
 * Bind the data socket to ipAddress:port and start the receive thread.
 *
 * Steps: resolve ipAddress with gethostbyname(); if hardware timestamping was
 * requested, configure the interface via SIOCSHWTSTAMP; bind(); enable the
 * timestamping socket options; create the receive-data thread (pinned to
 * core 2 when thread pinning is enabled).
 *
 * @param s         socket to bind.
 * @param ipAddress host name or dotted IPv4 address to bind to.
 * @param port      UDP port; 65535 is rejected.
 * @return 0 on success. On failure the socket is closed via PrrtSocket_close()
 *         and the current h_errno is returned if it is non-zero, otherwise -2.
 */
int PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port) {
    const size_t size = sizeof(struct sockaddr_in);
    struct sockaddr_in *address = NULL;
    struct hostent *host = NULL;

    check(port <= 65534, "Port %d cannot be bound to.", port);

    address = calloc(1, size);
    check_mem(address);
    // Hand the address to the socket right away so that PrrtSocket_close() on the
    // error path releases it; previously it leaked when name resolution failed.
    s->address = address;

    host = gethostbyname(ipAddress);
    check(host, "gethostbyname(%s): %s", ipAddress, hstrerror(h_errno));
    check(host->h_addrtype == AF_INET && host->h_length == (int) sizeof(address->sin_addr),
          "gethostbyname(%s) did not return an IPv4 address.", ipAddress);

    address->sin_family = AF_INET;
    // h_addr is already in network byte order; copy it directly instead of
    // formatting it as text and parsing it again.
    memcpy(&address->sin_addr, host->h_addr, sizeof(address->sin_addr));
    address->sin_port = htons(port);

    if(s->isHardwareTimestamping) {
        struct ifreq hwtstamp;
        struct hwtstamp_config hwconfig, hwconfig_requested;

        check(s->interfaceName != NULL, "Hardware timestamping enabled without an interface name.");

        // Configure HW Timestamping
        memset(&hwtstamp, 0, sizeof(hwtstamp));
        // Copy at most size-1 bytes: the memset above then guarantees NUL termination.
        strncpy(hwtstamp.ifr_name, s->interfaceName, sizeof(hwtstamp.ifr_name) - 1);
        hwtstamp.ifr_data = (void *) &hwconfig;
        memset(&hwconfig, 0, sizeof(hwconfig));
        hwconfig.flags = 0;
        hwconfig.tx_type = HWTSTAMP_TX_ON;
        hwconfig.rx_filter = HWTSTAMP_FILTER_ALL;
        hwconfig_requested = hwconfig;
        if(ioctl(s->socketFd, SIOCSHWTSTAMP, &hwtstamp) < 0) {
            check(errno != EPERM, "Insufficient permissions. Run in privileged mode.")

            if((errno == EINVAL || errno == ENOTSUP) && hwconfig_requested.tx_type == HWTSTAMP_TX_OFF &&
               hwconfig_requested.rx_filter == HWTSTAMP_FILTER_NONE) {
                debug(DEBUG_ALL, "Hardware timestamping not possible.");
                goto error; // TODO: Provide nice handling.
            } else {
                debug(DEBUG_ALL, "Failed SIOCSHWTSTAMP: %d.", errno);
                goto error; // TODO: Provide nice handling.
            }
        }
    }

    check(bind(s->socketFd, (struct sockaddr *) address, size) == EXIT_SUCCESS,
          "Cannot bind data socket.");

    if(s->isHardwareTimestamping) {
        int enabled = 1;
        check(setsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPNS, &enabled, sizeof(enabled)) >= 0, "Could not set SO_TIMESTAMPNS");
        // The option value is an int; socklen_t is only the type of the length argument.
        int val = 0;
        socklen_t len = sizeof(val);
        check(getsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPNS, &val, &len) >= 0, "%s: %s\n", "Getsockopt SO_TIMESTAMPNS", strerror(errno));

        int enabled2 = SOF_TIMESTAMPING_TX_HARDWARE | SOF_TIMESTAMPING_SYS_HARDWARE | SOF_TIMESTAMPING_RAW_HARDWARE;
        check(setsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPING, &enabled2, sizeof(enabled2)) >= 0, "Could not set SO_TIMESTAMPING");
    }

    s->receiveDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
    check_mem(s->receiveDataThreadAttr);
    check(pthread_attr_init(s->receiveDataThreadAttr) == EXIT_SUCCESS,
          "Cannot initialise receive-data thread attributes.");
    if(s->isThreadPinning) {
        check(pin_thread_to_core(s->receiveDataThreadAttr, 2) == EXIT_SUCCESS, "Cannot pin receive-data thread to core 2.");
    }
    check(pthread_create(&s->receiveDataThread, s->receiveDataThreadAttr, receive_data_loop,
                         (void *) s) == EXIT_SUCCESS, "Cannot create data receiving thread.");

    s->isBound = true;

    return 0;

    error:
    PrrtSocket_close(s);
    // NOTE(review): h_errno is only meaningful after a failed gethostbyname(); for other
    // failures a stale non-zero value may be returned here instead of -2.
    if (h_errno) {
        return h_errno;
    }
    return -2;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
249 250
/**
 * Set the remote peer of the socket and start the send thread.
 *
 * Creates the receiver description for host:port and spawns the thread that
 * runs PrrtDataTransmitter_send_data_loop() (pinned to core 1 when thread
 * pinning is enabled).
 *
 * @param s    socket to connect; may only be connected once.
 * @param host remote host, passed on to PrrtReceiver_create().
 * @param port remote port, passed on to PrrtReceiver_create().
 * @return true on success, false otherwise.
 */
bool PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port) {
    check(s->receiver == NULL, "PrrtSocket_connect can only be called once.");
    s->receiver = PrrtReceiver_create(host, port, s->maximum_payload_size);
    // NOTE(review): assumes PrrtReceiver_create() signals failure with NULL — confirm.
    check(s->receiver != NULL, "Cannot create receiver for %s:%d.", host, port);

    s->sendDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
    check_mem(s->sendDataThreadAttr);

    // The attribute object must be initialised BEFORE the affinity is set on it:
    // pinning first operated on an uninitialised object, and the subsequent
    // pthread_attr_init() then reset whatever had been configured.
    check(pthread_attr_init(s->sendDataThreadAttr) == EXIT_SUCCESS,
          "Cannot initialise send-data thread attributes.");
    if(s->isThreadPinning) {
        check(pin_thread_to_core(s->sendDataThreadAttr, 1) == EXIT_SUCCESS, "Cannot pin send-data thread to core 1.");
    }

    check(pthread_create(&s->sendDataThread, s->sendDataThreadAttr, PrrtDataTransmitter_send_data_loop,
                         (void *) s) == EXIT_SUCCESS, "Cannot create send thread.");
    return true;

    error:
    PERROR("PrrtSocket_connect() failed.");
    return false;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
267
/**
 * Wrap data into a PRRT data packet and hand it to the transmitter.
 *
 * @param s        connected socket.
 * @param data     payload bytes.
 * @param data_len payload length; must not exceed s->maximum_payload_size.
 * @param sync     true: transmit on the calling thread and then pace
 *                 (post-pacing); false: push onto s->sendDataQueue.
 * @return 0 on success, -1 if the payload is too long, the socket is not
 *         connected, or the packet could not be created.
 */
int sendpacket(PrrtSocket *s, const uint8_t *data, const size_t data_len, bool sync) {
    if (data_len > s->maximum_payload_size) {
        // %zu is the conversion for size_t; %ld is wrong wherever long and size_t differ.
        PERROR("Data to be sent (%zu bytes) is too long, as MTU is %d.\n", data_len, s->maximum_payload_size);
        return -1;
    }

    if (s->receiver == NULL) {
        PERROR("PrrtSocket_connect() must be called before PrrtSocket_send().\n");
        return -1;
    }

    PrrtPace_track_start(s->appSendPace);
    XlapTimestampPlaceholder tsph;
    XlapTimestampPlaceholderInitialize(&tsph);
    XlapTimeStampClock(&tsph, ts_any_packet, 0, PrrtSendStart);
    XlapTimeStampCycle(&tsph, ts_any_packet, 0, PrrtSendStart);

    prrtSequenceNumber_t sequenceNumber = s->sequenceNumberSource++;
    PrrtPacket *packet = PrrtPacket_create_data_packet(5, data, (prrtPacketLength_t) data_len, sequenceNumber,
                                                       PrrtApplicationConstraints_get_target_delay(
                                                               s->applicationConstraints));
    if (packet == NULL) {
        PERROR("Could not create data packet.%s", "");
        PrrtPace_track_end(s->appSendPace);
        return -1;
    }
    XlapTimestampPlaceholderUse(s, ts_data_packet, packet->sequenceNumber, &tsph);

    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSubmitPackage);
    PrrtPace_track_pause(s->appSendPace);
    if (sync) {
        PrrtDataTransmitter_transmit(s, packet);
        PrrtSocket_pace(s, false);
    } else {
        Pipe_push(s->sendDataQueue, &packet->asListNode);
    }
    PrrtPace_track_resume(s->appSendPace);

    PrrtReceiver_on_application_write(s->receiver, Pipe_get_size(s->sendDataQueue), s->sequenceNumberSource);

    // The packet now belongs to the transmitter and may already have been released,
    // so it must not be dereferenced any more; use the locally kept sequence number.
    XlapTimeStampClock(s, ts_data_packet, sequenceNumber, PrrtSendEnd);
    XlapTimeStampCycle(s, ts_data_packet, sequenceNumber, PrrtSendEnd);
    PrrtPace_track_end(s->appSendPace);
    return 0;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
306 307
/**
 * Send data synchronously: forwards to sendpacket() with sync = true.
 *
 * @return the result of sendpacket().
 */
int PrrtSocket_send_sync(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
    const bool synchronous = true;
    const int status = sendpacket(s, data, data_len, synchronous);
    return status;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
310 311 312 313 314 315
/**
 * Send data asynchronously: forwards to sendpacket() with sync = false.
 *
 * @return the result of sendpacket().
 */
int PrrtSocket_send_async(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
    const bool synchronous = false;
    const int status = sendpacket(s, data, data_len, synchronous);
    return status;
}

/**
 * Report whether the socket's closing flag is set.
 *
 * The flag is read with acquire ordering; PrrtSocket_interrupt() sets it with
 * release ordering.
 */
bool PrrtSocket_closing(PrrtSocket *s) {
    const bool isClosing = atomic_load_explicit(&s->closing, memory_order_acquire);
    return isClosing;
}

318
/**
 * Non-blocking receive: fetch a packet from the delivery store over the full
 * timestamp range (0 .. MAX_TIMESTAMP) and copy it to buf_ptr.
 *
 * @param s       socket to receive from.
 * @param buf_ptr destination buffer for the payload.
 * @param addr    receives the sender address.
 * @return number of bytes delivered; 0 if the store returned no packet.
 */
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
    PrrtPace_track_start(s->appDeliverPace);

    PrrtPacket *next = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
    const prrtPacketLength_t delivered = deliver_packet(s, buf_ptr, next, addr);

    PrrtPace_track_end(s->appDeliverPace);
    return delivered;
}

327
/**
 * Blocking receive: wait until a packet is available (full timestamp range)
 * or the socket starts closing.
 *
 * The delivery store is polled with a fresh deadline from
 * abstime_from_now(10 * 1000) on every round so that the closing flag is
 * re-checked regularly. ETIMEDOUT left in errno by an empty round is cleared.
 *
 * @param s       socket to receive from.
 * @param buf_ptr destination buffer for the payload.
 * @param addr    receives the sender address.
 * @return number of bytes delivered, or -1 if the socket is closing.
 */
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
    PrrtPace_track_start(s->appDeliverPace);
    PrrtPacket *packet;
    do {
        struct timespec deadline = abstime_from_now(10 * 1000);
        packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, 0, MAX_TIMESTAMP, &deadline, s->appDeliverPace);
        if (packet == NULL && errno == ETIMEDOUT) {
            errno = 0;
        }
        if (PrrtSocket_closing(s)) {
            // A packet may already have been taken out of the store; it is not going
            // to be delivered, so release it instead of leaking it.
            if (packet != NULL) {
                PrrtPacket_destroy(packet);
            }
            PrrtPace_track_end(s->appDeliverPace);
            return -1;
        }
    } while (!packet);

    prrtPacketLength_t l = deliver_packet(s, buf_ptr, packet, addr);
    PrrtPace_track_end(s->appDeliverPace);
    return l;
}

346
/**
 * Receive with a caller-supplied deadline (full timestamp range).
 *
 * @param s        socket to receive from.
 * @param buf_ptr  destination buffer for the payload.
 * @param addr     receives the sender address.
 * @param deadline passed through to the delivery store's timed wait.
 * @return number of bytes delivered; -ETIMEDOUT if no packet was returned and
 *         errno is ETIMEDOUT; 0 if no packet was returned for another reason.
 */
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, struct timespec* deadline) {
    PrrtPace_track_start(s->appDeliverPace);

    PrrtPacket *next = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, 0, MAX_TIMESTAMP,
                                                                    deadline, s->appDeliverPace);
    const bool timedOut = (next == NULL) && (errno == ETIMEDOUT);

    int32_t outcome;
    if (timedOut) {
        outcome = -1 * ETIMEDOUT;
    } else {
        const prrtPacketLength_t delivered = deliver_packet(s, buf_ptr, next, addr);
        outcome = delivered;
    }

    PrrtPace_track_end(s->appDeliverPace);
    return outcome;
}

360 361
/**
 * Default blocking receive; forwards to PrrtSocket_receive_asap_wait().
 *
 * @return the result of PrrtSocket_receive_asap_wait().
 */
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
    const int32_t received = PrrtSocket_receive_asap_wait(s, buf_ptr, addr);
    return received;
}

364
/**
 * Non-blocking receive restricted to a time window: asks the delivery store
 * for a packet in the range [now, now + time_window_us].
 *
 * @param s              socket to receive from.
 * @param buf_ptr        destination buffer for the payload.
 * @param addr           receives the sender address.
 * @param time_window_us width of the window, added to the current clock value.
 * @return number of bytes delivered; 0 if the store returned no packet.
 */
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us) {
    PrrtPace_track_start(s->appDeliverPace);

    const prrtTimestamp_t windowStart = PrrtClock_get_current_time_us();
    PrrtPacket *next = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, windowStart,
                                                          windowStart + time_window_us);
    const prrtPacketLength_t delivered = deliver_packet(s, buf_ptr, next, addr);

    PrrtPace_track_end(s->appDeliverPace);
    return delivered;
}

374
/**
 * Blocking receive restricted to a time window: waits until the delivery store
 * returns a packet in [now, now + time_window_us] or the socket starts closing.
 *
 * The window and the wait deadline (abstime_from_now(time_window_us)) are
 * recomputed on every round. ETIMEDOUT left in errno by an empty round is cleared.
 *
 * @param s              socket to receive from.
 * @param buf_ptr        destination buffer for the payload.
 * @param addr           receives the sender address.
 * @param time_window_us width of the window and length of each wait round.
 * @return number of bytes delivered, or -1 if the socket is closing.
 */
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us) {
    PrrtPace_track_start(s->appDeliverPace);
    PrrtPacket *packet;
    do {
        prrtTimestamp_t now = PrrtClock_get_current_time_us();
        struct timespec deadline = abstime_from_now(time_window_us);
        packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now, now + time_window_us,
                                                              &deadline, s->appDeliverPace);
        if (packet == NULL && errno == ETIMEDOUT) {
            errno = 0;
        }
        if (PrrtSocket_closing(s)) {
            // A packet may already have been taken out of the store; it is not going
            // to be delivered, so release it instead of leaking it.
            if (packet != NULL) {
                PrrtPacket_destroy(packet);
            }
            PrrtPace_track_end(s->appDeliverPace);
            return -1;
        }
    } while (!packet);

    prrtPacketLength_t l = deliver_packet(s, buf_ptr, packet, addr);
    PrrtPace_track_end(s->appDeliverPace);
    return l;
}

395
/**
 * Receive restricted to a time window, with a caller-supplied deadline.
 *
 * @param s              socket to receive from.
 * @param buf_ptr        destination buffer for the payload.
 * @param addr           receives the sender address.
 * @param time_window_us width of the window [now, now + time_window_us].
 * @param deadline       passed through to the delivery store's timed wait.
 * @return number of bytes delivered; -ETIMEDOUT if no packet was returned and
 *         errno is ETIMEDOUT; 0 if no packet was returned for another reason.
 */
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us, struct timespec* deadline) {
    PrrtPace_track_start(s->appDeliverPace);

    const prrtTimestamp_t windowStart = PrrtClock_get_current_time_us();
    PrrtPacket *next = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, windowStart,
                                                                    windowStart + time_window_us,
                                                                    deadline, s->appDeliverPace);
    const bool timedOut = (next == NULL) && (errno == ETIMEDOUT);

    int32_t outcome;
    if (timedOut) {
        outcome = -1 * ETIMEDOUT;
    } else {
        const prrtPacketLength_t delivered = deliver_packet(s, buf_ptr, next, addr);
        outcome = delivered;
    }

    PrrtPace_track_end(s->appDeliverPace);
    return outcome;
}

411
/**
 * Put the socket into the closing state and stop its worker threads.
 *
 * Sets the closing flag, wakes up everything that may be blocked (delivery
 * store, receiver, send queue), ends the retransmission timer, joins the send
 * thread, and cancels and joins the receive thread. The thread attribute
 * objects are destroyed here but not freed.
 *
 * @param s socket to interrupt.
 * @return EXIT_SUCCESS, or EXIT_FAILURE if joining, cancelling or destroying failed.
 */
int PrrtSocket_interrupt(PrrtSocket *s) {
    debug(DEBUG_SOCKET, "PrrtSocket_interrupt().");
    atomic_store_explicit(&s->closing, true, memory_order_release);

    // Wake up anything that may currently be blocked on the socket.
    if (s->packetDeliveryStore != NULL) {
        PrrtPacketDeliveryStore_interrupt(s->packetDeliveryStore);
    }
    if (s->receiver != NULL) {
        PrrtReceiver_interrupt(s->receiver);
    }
    if (s->sendDataQueue != NULL) {
        Pipe_wake(s->sendDataQueue);
    }

    if (s->retransmissionTimer != NULL) {
        PrrtTimer_end(s->retransmissionTimer);
        s->retransmissionTimer = NULL;
    }

    // Send thread: only joined, never cancelled.
    if (s->sendDataThread != 0) {
        const int sendJoin = pthread_join(s->sendDataThread, NULL);
        check(sendJoin == EXIT_SUCCESS, "Join failed.");
        const int sendAttrDestroy = pthread_attr_destroy(s->sendDataThreadAttr);
        check(sendAttrDestroy == EXIT_SUCCESS, "Destroy failed.");
        s->sendDataThread = 0;
    }

    // Receive thread: cancelled first, then joined.
    if (s->receiveDataThread != 0) {
        const int cancelResult = pthread_cancel(s->receiveDataThread);
        check(cancelResult == EXIT_SUCCESS, "Cancel failed: %d.", cancelResult);
        const int joinResult = pthread_join(s->receiveDataThread, NULL);
        check(joinResult == EXIT_SUCCESS, "Join failed: %d.", joinResult);
        const int recvAttrDestroy = pthread_attr_destroy(s->receiveDataThreadAttr);
        check(recvAttrDestroy == EXIT_SUCCESS, "Destroy failed.");
        s->receiveDataThread = 0;
    }

    return EXIT_SUCCESS;

    error:
    PERROR("PrrtSocket_interrupt() failed.\n");
    return EXIT_FAILURE;
}

455
/**
 * Release everything owned by the socket.
 *
 * Interrupts the socket first if that has not happened yet, then destroys all
 * stores, tables, pace trackers, the coder and coding configuration, frees the
 * bound address, interface name and thread attribute storage, and closes the
 * UDP descriptor. The PrrtSocket struct itself is NOT freed.
 *
 * Every member is reset after it has been released, so calling this function
 * again (e.g. after PrrtSocket_bind() already closed the socket on its error
 * path) does not release anything twice.
 *
 * @param s socket to close.
 * @return 0 on success, -1 if s is NULL or any step failed (remaining members
 *         are then left untouched).
 */
int PrrtSocket_close(PrrtSocket *s) {
    debug(DEBUG_SOCKET, "PrrtSocket_close() start.");
    check(s != NULL, "Cannot close a NULL socket.");

    if (!PrrtSocket_closing(s)) {
        check(PrrtSocket_interrupt(s) == EXIT_SUCCESS, "Interrupt failed.");
    }

    if (s->dataPacketStore != NULL) {
        check(PrrtDataPacketStore_destroy(s->dataPacketStore), "Destroy failed.");
        s->dataPacketStore = NULL;
    }

    if (s->repairBlockStore != NULL) {
        check(PrrtRepairBlockStore_destroy(s->repairBlockStore), "Destroy failed.");
        s->repairBlockStore = NULL;
    }

    if (s->receiver != NULL) {
        check(PrrtReceiver_destroy(s->receiver), "Destroy failed.");
        s->receiver = NULL;
    }

    if (s->sendDataQueue != NULL) {
        check(Pipe_destroy(s->sendDataQueue), "Destroy failed.");
        s->sendDataQueue = NULL;
    }

    if (s->dataReceptionTable != NULL) {
        check(PrrtReceptionTable_destroy(s->dataReceptionTable), "Destroy failed.");
        s->dataReceptionTable = NULL;
    }

    if (s->redundancyReceptionTable != NULL) {
        check(PrrtReceptionTable_destroy(s->redundancyReceptionTable), "Destroy failed.");
        s->redundancyReceptionTable = NULL;
    }

    if(s->senderChannelStateInformation != NULL) {
        PrrtChannelStateInformation_destroy(s->senderChannelStateInformation);
        s->senderChannelStateInformation = NULL;
    }

    if (s->packetDeliveryStore != NULL) {
        check(PrrtPacketDeliveryStore_destroy(s->packetDeliveryStore), "Destroy failed.");
        s->packetDeliveryStore = NULL;
    }

    if (s->deliveredPacketTable != NULL) {
        check(PrrtDeliveredPacketTable_destroy(s->deliveredPacketTable), "Destroy failed.");
        s->deliveredPacketTable = NULL;
    }

    if (s->packetTimeoutTable != NULL) {
        check(PrrtPacketTimeoutTable_destroy(s->packetTimeoutTable), "Destroy failed.");
        s->packetTimeoutTable = NULL;
    }

    free(s->address);
    s->address = NULL;

    // Copy made by PrrtSocket_enable_hardware_timestamping(); it was never released before.
    free(s->interfaceName);
    s->interfaceName = NULL;

    if (s->applicationConstraints != NULL) {
        check(PrrtApplicationConstraints_destroy(s->applicationConstraints),
              "Could not destroy application constraints.")
        s->applicationConstraints = NULL;
    }

    if (s->codingParameters) {
        check(PrrtCodingConfiguration_destroy(s->codingParameters), "Could not destroy coding parameters.")
        s->codingParameters = NULL;
    }

    if(s->appSendPace != NULL) {
        check(PrrtPace_destroy(s->appSendPace), "Could not destroy appSendPace");
        s->appSendPace = NULL;
    }

    if(s->prrtTransmitPace != NULL) {
        check(PrrtPace_destroy(s->prrtTransmitPace), "Could not destroy prrtTransmitPace");
        s->prrtTransmitPace = NULL;
    }

    if(s->prrtReceivePace != NULL) {
        check(PrrtPace_destroy(s->prrtReceivePace), "Could not destroy prrtReceivePace");
        s->prrtReceivePace = NULL;
    }

    if(s->appDeliverPace != NULL) {
        check(PrrtPace_destroy(s->appDeliverPace), "Could not destroy appDeliverPace");
        s->appDeliverPace = NULL;
    }

    if(s->coder) {
        check(PrrtCoder_destroy(s->coder), "Could not destroy coder.")
        s->coder = NULL;
    }

    free(s->sendDataThreadAttr);
    s->sendDataThreadAttr = NULL;

    free(s->receiveDataThreadAttr);
    s->receiveDataThreadAttr = NULL;

    // Skip invalid descriptors and make a repeated close a no-op.
    if (s->socketFd >= 0) {
        close(s->socketFd);
        s->socketFd = -1;
    }

    debug(DEBUG_SOCKET, "PrrtSocket_close() end.");
    return 0;

    error:
    PERROR("Closing socket failed.%s", "");
    return -1;
}

562
/**
 * Look up a numeric socket property by name.
 *
 * Supported names: "targetdelay"; "<stage>_pace_internal|dependent|external"
 * for the stages appSend, prrtTransmit, prrtReceive and appDeliver;
 * "send_peer_btl_pace"; "recv_peer_btl_pace"; "nw_pace"; "connected";
 * "maximum_payload_size".
 *
 * "nw_pace" is computed as round(1000 * 1000 * maximum_payload_size / BBR pacing rate)
 * and is 0 when the pacing rate is 0 or the socket is not connected.
 *
 * @param s    socket to query.
 * @param name property name.
 * @return the property value; 0 (after logging an error) for unknown names.
 */
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *s, const char *name) {
    if (strcmp(name, "targetdelay") == 0) {
        return PrrtApplicationConstraints_get_target_delay(s->applicationConstraints);
    }

    if (strcmp(name, "appSend_pace_internal") == 0) {
        return PrrtPace_get_internal(s->appSendPace);
    }
    if (strcmp(name, "appSend_pace_dependent") == 0) {
        return PrrtPace_get_dependent(s->appSendPace);
    }
    if (strcmp(name, "appSend_pace_external") == 0) {
        return PrrtPace_get_external(s->appSendPace);
    }

    if (strcmp(name, "prrtTransmit_pace_internal") == 0) {
        return PrrtPace_get_internal(s->prrtTransmitPace);
    }
    if (strcmp(name, "prrtTransmit_pace_dependent") == 0) {
        return PrrtPace_get_dependent(s->prrtTransmitPace);
    }
    if (strcmp(name, "prrtTransmit_pace_external") == 0) {
        return PrrtPace_get_external(s->prrtTransmitPace);
    }

    if (strcmp(name, "prrtReceive_pace_internal") == 0) {
        return PrrtPace_get_internal(s->prrtReceivePace);
    }
    if (strcmp(name, "prrtReceive_pace_dependent") == 0) {
        return PrrtPace_get_dependent(s->prrtReceivePace);
    }
    if (strcmp(name, "prrtReceive_pace_external") == 0) {
        return PrrtPace_get_external(s->prrtReceivePace);
    }

    if (strcmp(name, "appDeliver_pace_internal") == 0) {
        return PrrtPace_get_internal(s->appDeliverPace);
    }
    if (strcmp(name, "appDeliver_pace_dependent") == 0) {
        return PrrtPace_get_dependent(s->appDeliverPace);
    }
    if (strcmp(name, "appDeliver_pace_external") == 0) {
        return PrrtPace_get_external(s->appDeliverPace);
    }

    if (strcmp(name, "send_peer_btl_pace") == 0) {
        return s->send_peer_btl_pace;
    }
    if (strcmp(name, "recv_peer_btl_pace") == 0) {
        return s->recv_peer_btl_pace;
    }

    if (strcmp(name, "nw_pace") == 0) {
        // Before PrrtSocket_connect() there is no receiver to ask for a pacing rate.
        if (s->receiver == NULL) {
            return 0;
        }
        double pacingRate = PrrtReceiver_get_BBR_pacingRate(s->receiver);
        if (pacingRate == 0) {
            return 0;
        }
        return (prrtTimedelta_t) round(((1000 * 1000 * ((double) s->maximum_payload_size)) / pacingRate));
    }

    if (strcmp(name, "connected") == 0) {
        return (s->receiver != NULL) ? 1 : 0;
    }
    if (strcmp(name, "maximum_payload_size") == 0) {
        return s->maximum_payload_size;
    }

    PERROR("Unknown property %s\n", name);
    return 0;
}

609
/**
 * Set a numeric socket property by name.
 *
 * The only supported name is "app_queue_size", which is forwarded to the
 * socket's application constraints.
 *
 * @param s     socket to modify.
 * @param name  property name.
 * @param value new value.
 * @return true if the property was recognised and set, false otherwise.
 */
bool PrrtSocket_set_sock_opt(PrrtSocket *s, const char *name, const uint32_t value) {
    // strcmp() returns 0 on equality. The previous bare `if (strcmp(...))` was
    // inverted: it applied the value for every name EXCEPT "app_queue_size".
    if (strcmp(name, "app_queue_size") != 0) {
        return false;
    }

    PrrtApplicationConstraints_set_app_queue_size(s->applicationConstraints, value);
    // TODO: MPSC_Queue does not provide a size.
    return true;
}
619

rna's avatar
rna committed
620
/**
 * Replace the socket's coding configuration and build a coder for it.
 *
 * An existing configuration is destroyed before the new one is created from
 * (k, n, c, n_cycle); s->coder is then overwritten with a coder created from
 * the new configuration.
 *
 * NOTE(review): the previous s->coder is overwritten without being destroyed —
 * looks like a leak; confirm whether another thread may still be using it
 * before adding a PrrtCoder_destroy() here.
 *
 * @return always true.
 */
bool PrrtSocket_set_coding_parameters(PrrtSocket *s, uint8_t k, uint8_t n, uint8_t c, uint8_t *n_cycle) {
    PrrtCodingConfiguration *previous = s->codingParameters;
    if (previous != NULL) {
        PrrtCodingConfiguration_destroy(previous);
    }

    PrrtCodingConfiguration *replacement = PrrtCodingConfiguration_create(k, n, c, n_cycle);
    s->codingParameters = replacement;
    s->coder = PrrtCoder_create(replacement);

    return true;
}
628

629 630
/**
 * Return the socket's current coding configuration.
 *
 * The pointer stored on the socket is returned as is (no copy is made).
 */
PrrtCodingConfiguration *PrrtSocket_get_coding_parameters(PrrtSocket *s) {
    PrrtCodingConfiguration *current = s->codingParameters;
    return current;
}

633

634
/**
 * Housekeeping pass over the socket's stores.
 *
 * Three steps are performed, in this order:
 *   1. If a packet timeout table exists, expire its entries for the current
 *      time. When at least one entry expired, forward the delivered-packet
 *      table, drop the matching range from the data packet store, free the
 *      expired timeout entries and clean up the packet delivery store.
 *   2. If a delivered-packet table exists, expire the repair blocks in the
 *      half sequence-number space that ends just before the table's start.
 *   3. Recompute s->lossStatistics from the data and redundancy reception
 *      tables.
 *
 * @param s socket to clean up
 * @return always true
 */
bool PrrtSocket_cleanup(PrrtSocket *s) {
    debug(DEBUG_CLEANUP, "PrrtSocket_cleanup");

    if (s->packetTimeoutTable != NULL) {
        prrtTimestamp_t now = PrrtClock_get_current_time_us();
        List *expired_packets = PrrtPacketTimeoutTable_expire_packets(s->packetTimeoutTable, now);
        uint32_t expired_count = List_count(expired_packets);
        // expired_count is unsigned, so it is printed with %u (not %d).
        debug(DEBUG_CLEANUP, "EXPIRED: %u", (unsigned int) expired_count);

        if (expired_count > 0) {
            // Lower bound of the range to drop: (sequenceNumber - index) of the
            // first expired entry, moved back by half the sequence-number space.
            PrrtPacketTimeout *first = List_first(expired_packets);
            prrtSequenceNumber_t firstSequenceNumberBase = (prrtSequenceNumber_t) (first->sequenceNumber -
                                                                                   first->index - SEQNO_SPACE / 2);

            // New start for the delivered-packet table: one before
            // (sequenceNumber - index) of the last expired entry.
            PrrtPacketTimeout *last = List_last(expired_packets);
            prrtSequenceNumber_t lastSequenceNumberBase = (prrtSequenceNumber_t) (last->sequenceNumber -
                                                                                  last->index - 1);

            PrrtDeliveredPacketTable_forward_start(s->deliveredPacketTable, lastSequenceNumberBase);

            // Pull the stale data packets out of the store and free them.
            List *list = List_create();
            PrrtDataPacketStore_remove_range(s->dataPacketStore, list, firstSequenceNumberBase,
                                             last->sequenceNumber);

            while (List_count(list) > 0) {
                PrrtPacket *packet = (PrrtPacket *) List_shift(list);
                PrrtPacket_destroy(packet);
            }
            List_destroy(list);

            // Free the expired timeout entries themselves; `first` and `last`
            // must not be used past this loop.
            while (List_count(expired_packets) > 0) {
                PrrtPacketTimeout *packetTimeout = (PrrtPacketTimeout *) List_shift(expired_packets);
                PrrtPacketTimeout_destroy(packetTimeout);
            }

            PrrtPacketDeliveryStore_cleanup(s->packetDeliveryStore, now);
        }

        List_destroy(expired_packets);
    }

    debug(DEBUG_CLEANUP, "Expire block range.");
    if (s->deliveredPacketTable != NULL) {
        prrtSequenceNumber_t current_start = s->deliveredPacketTable->start;
        PrrtRepairBlockStore_expire_block_range(s->repairBlockStore,
                                                (prrtSequenceNumber_t) (current_start - SEQNO_SPACE / 2),
                                                (prrtSequenceNumber_t) (current_start - 1));
    }

    debug(DEBUG_CLEANUP, "Loss stats.");
    s->lossStatistics = PrrtLossStatistics_add(PrrtReceptionTable_calculate_statistics(s->dataReceptionTable),
                                               PrrtReceptionTable_calculate_statistics(s->redundancyReceptionTable));

    debug(DEBUG_CLEANUP, "PrrtSocket_cleanup done");
    return true;
}
682

683
/**
 * Report the socket's isThreadPinning flag.
 *
 * The flag is read atomically with acquire ordering.
 */
bool PrrtSocket_uses_thread_pinning(PrrtSocket *s) {
    bool pinning = atomic_load_explicit(&s->isThreadPinning, memory_order_acquire);
    return pinning;
}
686

Andreas Schmidt's avatar
Andreas Schmidt committed
687
/**
 * Return BBR_getRTProp() for the BBR state of the socket's receiver.
 *
 * s->receiver is dereferenced without a NULL check.
 */
uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *s) {
    uint32_t rtprop = BBR_getRTProp(s->receiver->bbr);
    return rtprop;
}
Andreas Schmidt's avatar
Andreas Schmidt committed
690

Andreas Schmidt's avatar
Andreas Schmidt committed
691
/**
 * Return the packet loss rate stored in the receiver's channel state
 * information (s->receiver->csi).
 *
 * s->receiver is dereferenced without a NULL check.
 */
prrtPacketLossRate_t PrrtSocket_get_plr_fwd(PrrtSocket *s) {
    prrtPacketLossRate_t plr = PrrtChannelStateInformation_get_plr(s->receiver->csi);
    return plr;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
695
/**
 * Return the delivery rate stored in the receiver's channel state
 * information (s->receiver->csi).
 *
 * s->receiver is dereferenced without a NULL check.
 */
prrtDeliveryRate_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *s) {
    prrtDeliveryRate_t rate = PrrtChannelStateInformation_get_delivery_rate(s->receiver->csi);
    return rate;
}
698

699 700 701 702
/**
 * Return BBR_getRoundStart() for the BBR state of the socket's receiver.
 *
 * s->receiver is dereferenced without a NULL check.
 */
bool PrrtSocket_get_bbr_round_start(PrrtSocket *s) {
    bool roundStart = BBR_getRoundStart(s->receiver->bbr);
    return roundStart;
}

703 704 705 706 707 708 709 710 711 712 713 714
/**
 * Return BBR_getState() for the BBR state of the socket's receiver.
 *
 * s->receiver is dereferenced without a NULL check.
 */
uint32_t PrrtSocket_get_bbr_state(PrrtSocket *s) {
    uint32_t state = BBR_getState(s->receiver->bbr);
    return state;
}

/**
 * Return BBR_getFilledPipe() for the BBR state of the socket's receiver.
 *
 * s->receiver is dereferenced without a NULL check.
 */
bool PrrtSocket_get_filled_pipe(PrrtSocket *s) {
    bool filledPipe = BBR_getFilledPipe(s->receiver->bbr);
    return filledPipe;
}

/**
 * Return BBR_getFullBw() for the BBR state of the socket's receiver.
 *
 * s->receiver is dereferenced without a NULL check.
 */
uint64_t PrrtSocket_get_full_bw(PrrtSocket *s) {
    uint64_t fullBw = BBR_getFullBw(s->receiver->bbr);
    return fullBw;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
715
/**
 * Return BBR_getCwnd() for the BBR state of the socket's receiver.
 *
 * s->receiver is dereferenced without a NULL check.
 */
prrtByteCount_t PrrtSocket_get_cwnd(PrrtSocket *s) {
    prrtByteCount_t cwnd = BBR_getCwnd(s->receiver->bbr);
    return cwnd;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
719
/**
 * Return BBR_getInflight() for the BBR state of the socket's receiver.
 *
 * s->receiver is dereferenced without a NULL check.
 */
prrtByteCount_t PrrtSocket_get_inflight(PrrtSocket *s) {
    prrtByteCount_t inflight = BBR_getInflight(s->receiver->bbr);
    return inflight;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
723 724
/**
 * Return BBR_getSendQuantum() for the BBR state of the socket's receiver.
 *
 * s->receiver is dereferenced without a NULL check.
 */
prrtByteCount_t PrrtSocket_get_send_quantum(PrrtSocket *s) {
    prrtByteCount_t sendQuantum = BBR_getSendQuantum(s->receiver->bbr);
    return sendQuantum;
}
Andreas Schmidt's avatar
Andreas Schmidt committed
726

Andreas Schmidt's avatar
Andreas Schmidt committed
727
/**
 * Return PrrtReceiver_get_pipe() for the socket's receiver.
 *
 * s->receiver is handed to the callee as-is; no NULL check is made here.
 */
prrtByteCount_t PrrtSocket_get_pipe(PrrtSocket *s) {
    prrtByteCount_t pipe = PrrtReceiver_get_pipe(s->receiver);
    return pipe;
}
Andreas Schmidt's avatar
Andreas Schmidt committed
730

Andreas Schmidt's avatar
Andreas Schmidt committed
731 732
/**
 * Return the `delivered` counter of the receiver's packet tracking state.
 *
 * s->receiver and its packetTracking member are dereferenced without NULL
 * checks.
 */
prrtByteCount_t PrrtSocket_get_delivered(PrrtSocket *s) {
    prrtByteCount_t delivered = s->receiver->packetTracking->delivered;
    return delivered;
}
Andreas Schmidt's avatar
Andreas Schmidt committed
734

Andreas Schmidt's avatar
Andreas Schmidt committed
735 736 737 738
/**
 * Return BBR_getPacingGain() for the BBR state of the socket's receiver.
 *
 * s->receiver is dereferenced without a NULL check.
 */
float PrrtSocket_get_pacing_gain(PrrtSocket *s) {
    float pacingGain = BBR_getPacingGain(s->receiver->bbr);
    return pacingGain;
}

739 740 741 742
/**
 * Return BBR_getPacingRate() for the BBR state of the socket's receiver.
 *
 * s->receiver is dereferenced without a NULL check.
 */
uint32_t PrrtSocket_get_pacing_rate(PrrtSocket *s) {
    uint32_t pacingRate = BBR_getPacingRate(s->receiver->bbr);
    return pacingRate;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
743
/**
 * Return BBR_getBtlBw() for the BBR state of the socket's receiver.
 *
 * s->receiver is dereferenced without a NULL check.
 */
prrtDeliveryRate_t PrrtSocket_get_btlbw_fwd(PrrtSocket *s) {
    prrtDeliveryRate_t btlBw = BBR_getBtlBw(s->receiver->bbr);
    return btlBw;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
747
/**
 * Return the delivery rate stored in the socket's own
 * senderChannelStateInformation.
 */
prrtDeliveryRate_t PrrtSocket_get_btlbw_back(PrrtSocket *s) {
    prrtDeliveryRate_t rate = PrrtChannelStateInformation_get_delivery_rate(s->senderChannelStateInformation);
    return rate;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
751 752 753
/**
 * Return BBR_getCycleIndex() for the BBR state of the socket's receiver.
 *
 * s->receiver is dereferenced without a NULL check.
 */
uint32_t PrrtSocket_get_cycle_index(PrrtSocket *s) {
    uint32_t cycleIndex = BBR_getCycleIndex(s->receiver->bbr);
    return cycleIndex;
}
754

755
/**
 * Return the `is_app_limited` field of the receiver's current rate sample.
 *
 * s->receiver and its rateSample member are dereferenced without NULL checks.
 */
bool PrrtSocket_get_bbr_is_app_limited(PrrtSocket *s) {
    bool isAppLimited = s->receiver->rateSample->is_app_limited;
    return isAppLimited;
}
758 759 760
/**
 * Return the `app_limited` field of the receiver's packet tracking state.
 *
 * s->receiver and its packetTracking member are dereferenced without NULL
 * checks.
 */
uint32_t PrrtSocket_get_bbr_app_limited(PrrtSocket *s) {
    uint32_t appLimited = s->receiver->packetTracking->app_limited;
    return appLimited;
}
Andreas Schmidt's avatar
Andreas Schmidt committed
761

762

Andreas Schmidt's avatar
Andreas Schmidt committed
763

764 765 766 767 768 769 770 771 772
/**
 * Return a copy of a coder that matches the given coding configuration.
 *
 * The socket caches one coder in s->coder. If there is no cached coder, or
 * its k or n differs from codingParams, the cached coder is destroyed (when
 * present) and a new one is created from codingParams. The result is always
 * produced by PrrtCoder_copy() on the cached coder.
 *
 * @param s            socket holding the cached coder
 * @param codingParams configuration the coder must match; dereferenced
 *                     without a NULL check
 * @return the value of PrrtCoder_copy(s->coder)
 */
PrrtCoder *PrrtSocket_get_matching_coder(PrrtSocket *s, PrrtCodingConfiguration *codingParams) {
    if (s->coder == NULL || PrrtCoder_get_k(s->coder) != codingParams->k ||
        PrrtCoder_get_n(s->coder) != codingParams->n) {
        // The cached coder is stale: release it before building a replacement.
        if (s->coder != NULL) {
            PrrtCoder_destroy(s->coder);
        }
        s->coder = PrrtCoder_create(codingParams);
    }
    return PrrtCoder_copy(s->coder);
}