Commit 973501b7 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Remove isSender flag.

parent 01b825e1
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -25,7 +25,7 @@ import prrt

port = int(sys.argv[1])

s = prrt.PrrtSocket(port=port, isSender=False)
s = prrt.PrrtSocket(port=port)

while True:
    d = s.recv()
@@ -44,7 +44,7 @@ import prrt
host = sys.argv[1]
port = int(sys.argv[2])

s = prrt.PrrtSocket(port=port, isSender=True)
s = prrt.PrrtSocket(port=port)
s.connect(host, port)

for i in range(10):
+8 −6
Original line number Diff line number Diff line
@@ -118,7 +118,7 @@ cdef extern from "proto/socket.h":

    ctypedef prrtSocket PrrtSocket

    cdef PrrtSocket* PrrtSocket_create(bint isSender, const uint32_t target_delay)
    cdef PrrtSocket* PrrtSocket_create(const uint32_t target_delay)
    bint PrrtSocket_bind(PrrtSocket *sock_ptr, const_char *ipAddress, const uint16_t port)
    int PrrtSocket_close(const PrrtSocket *sock_ptr)
    int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
@@ -138,13 +138,15 @@ cdef extern from "proto/socket.h":
    bint PrrtSocket_set_coding_parameters(PrrtSocket *s, uint8_t k, uint8_t n, uint8_t c, uint8_t *n_cycle)
    PrrtCodingConfiguration *PrrtSocket_get_coding_parameters(PrrtSocket *s)
    bint PrrtSocket_uses_thread_pinning(PrrtSocket *socket)
    uint32_t PrrtSocket_get_rtprop(PrrtSocket *socket)

    bint PrrtSocket_uses_thread_pinning(PrrtSocket *socket)
    uint32_t PrrtSocket_get_rtt(PrrtSocket *socket)
    float PrrtSocket_get_plr(PrrtSocket *socket)
    uint32_t PrrtSocket_get_delivery_rate(PrrtSocket *socket)
    uint32_t PrrtSocket_get_btlbw(PrrtSocket *socket)
    uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *socket)
    float PrrtSocket_get_plr_fwd(PrrtSocket *socket)
    uint32_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *socket)

    uint32_t PrrtSocket_get_btlbw_fwd(PrrtSocket *s);
    uint32_t PrrtSocket_get_btlbw_back(PrrtSocket *s);

    bint PrrtSocket_get_app_limited(PrrtSocket *socket)
    bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket)

+1 −1
Original line number Diff line number Diff line
@@ -149,7 +149,7 @@ void *send_data_loop(void *ptr) {
        packet->index = (uint8_t) (packet->sequenceNumber - block->baseSequenceNumber);

        PrrtPacketDataPayload *payload = packet->payload;
        payload->groupRTprop_us = PrrtSocket_get_rtprop(sock_ptr);
        payload->groupRTprop_us = PrrtSocket_get_rtprop_fwd(sock_ptr);

        PrrtPacket *packetToSend = PrrtPacket_copy(packet);
        send_packet(sock_ptr, packetToSend);
+92 −136
Original line number Diff line number Diff line
@@ -49,13 +49,12 @@ struct timespec abstime_from_now(prrtTimedelta_t wait_time) {
    return deadline;
}

PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay_us) {
PrrtSocket *PrrtSocket_create(prrtTimedelta_t target_delay_us) {
    assert(sizeof(float) == 4);
    PrrtSocket *s = (PrrtSocket *) calloc(1, sizeof(PrrtSocket));
    check_mem(s);


    s->isSender = is_sender;
    s->isHardwareTimestamping = false;
    s->interfaceName = NULL;

@@ -92,9 +91,7 @@ PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay
    check(setsockopt(s->socketFd, SOL_SOCKET, SO_REUSEPORT, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");

    if (is_sender) {
    s->sendDataQueue = Pipe_create();
    } else {
    s->deliveredPacketTable = PrrtDeliveredPacketTable_create();
    s->repairBlockStore = PrrtRepairBlockStore_create();

@@ -102,7 +99,6 @@ PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay

    s->dataReceptionTable = PrrtReceptionTable_create();
    s->redundancyReceptionTable = PrrtReceptionTable_create();
    }

    return s;

@@ -176,19 +172,16 @@ bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port)
          "Cannot bind data socket.");

    if(s->isHardwareTimestamping) {
        if(!s->isSender) {
        int enabled = 1;
        check(setsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPNS, &enabled, sizeof(enabled)) >= 0, "Could not set SO_TIMESTAMPNS");
        socklen_t val, len;
        len = sizeof(val);
        check(getsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPNS, &val, &len) >= 0, "%s: %s\n", "Getsockopt SO_TIMESTAMPNS", strerror(errno));
        } else {
            int enabled = SOF_TIMESTAMPING_TX_HARDWARE | SOF_TIMESTAMPING_SYS_HARDWARE | SOF_TIMESTAMPING_RAW_HARDWARE;
            check(setsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPING, &enabled, sizeof(enabled)) >= 0, "Could not set SO_TIMESTAMPING");
        }

        int enabled2 = SOF_TIMESTAMPING_TX_HARDWARE | SOF_TIMESTAMPING_SYS_HARDWARE | SOF_TIMESTAMPING_RAW_HARDWARE;
        check(setsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPING, &enabled2, sizeof(enabled2)) >= 0, "Could not set SO_TIMESTAMPING");
    }

    if (s->isSender) {
    s->receiveFeedbackThreadAttr = calloc(1, sizeof(pthread_attr_t));
    pthread_attr_init(s->receiveFeedbackThreadAttr);
    s->sendDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
@@ -206,7 +199,7 @@ bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port)
                         (void *) s) ==
          EXIT_SUCCESS,
          "Cannot create send thread.");
    } else {

    s->receiveDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
    pthread_attr_init(s->receiveDataThreadAttr);
    if(s->isThreadPinning) {
@@ -216,7 +209,6 @@ bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port)
                         (void *) s) ==
          EXIT_SUCCESS,
          "Cannot create data receiving thread.");
    }

    s->isBound = true;

@@ -235,7 +227,6 @@ int PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port) {
}

int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
    check(s->isSender, "Cannot send on receiver socket.")
    XlapTimestampPlaceholder tsph;
    XlapTimestampPlaceholderInitialize(&tsph);
    XlapTimeStampClock(&tsph, ts_any_packet, 0, PrrtSendStart);
@@ -253,9 +244,6 @@ int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
    XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
    XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
    return 0;
    error:
    PERROR("There was a failure while sending from socket.%s", "");
    return -1;
}

bool PrrtSocket_closing(PrrtSocket *s) {
@@ -263,20 +251,12 @@ bool PrrtSocket_closing(PrrtSocket *s) {
}

int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr) {
    check(s->isSender == false, "Cannot receive on sender socket.")

    PrrtPacket *packet;
    packet = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
    return deliver_packet(s, buf_ptr, packet);

    error:
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}

int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr) {
    check(s->isSender == false, "Cannot receive on sender socket.")

    PrrtPacket *packet;
    do {
        packet = PrrtPacketDeliveryStore_get_packet_wait(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
@@ -286,24 +266,15 @@ int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr) {
    } while (!packet);

    return deliver_packet(s, buf_ptr, packet);

    error:
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}

int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct timespec* deadline) {
    check(s->isSender == false, "Cannot receive on sender socket.")

    PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, 0, MAX_TIMESTAMP, deadline);
    if (packet == NULL && errno == ETIMEDOUT) {
        return -1 * ETIMEDOUT;
    }

    return deliver_packet(s, buf_ptr, packet);
    error:
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}

int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr) {
@@ -311,20 +282,13 @@ int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr) {
}

int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) {
    check(s->isSender == false, "Cannot receive on sender socket.")

    prrtTimestamp_t now = PrrtClock_get_current_time_us();
    PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, now - time_window_us,
                                                            now + time_window_us);
    return deliver_packet(s, buf_ptr, packet);
    error:
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}

int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) {
    check(s->isSender == false, "Cannot receive on sender socket.")

    PrrtPacket *packet;
    do {
        prrtTimestamp_t now = PrrtClock_get_current_time_us();
@@ -336,15 +300,9 @@ int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, prrtTimede
    } while (!packet);

    return deliver_packet(s, buf_ptr, packet);

    error:
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}

int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us, struct timespec* deadline) {
    check(s->isSender == false, "Cannot receive on sender socket.")

    prrtTimestamp_t now = PrrtClock_get_current_time_us();

    PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now - time_window_us,
@@ -354,9 +312,6 @@ int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtT
    }

    return deliver_packet(s, buf_ptr, packet);
    error:
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}


@@ -526,9 +481,6 @@ PrrtCodingConfiguration *PrrtSocket_get_coding_parameters(PrrtSocket *s) {


bool PrrtSocket_cleanup(PrrtSocket *s) {
    if (s->isSender) {

    } else {
    if (s->packetTimeoutTable != NULL) {
        List *expired_packets = PrrtPacketTimeoutTable_expire_packets(s->packetTimeoutTable,
                                                                      PrrtClock_get_prrt_time_us(
@@ -572,7 +524,6 @@ bool PrrtSocket_cleanup(PrrtSocket *s) {
    }

    s->lossStatistics = PrrtLossStatistics_add(PrrtReceptionTable_calculate_statistics(s->dataReceptionTable), PrrtReceptionTable_calculate_statistics(s->redundancyReceptionTable));
    }
    return true;
}

@@ -580,22 +531,27 @@ bool PrrtSocket_uses_thread_pinning(PrrtSocket *s) {
    return atomic_load_explicit(&s->isThreadPinning, memory_order_acquire);
}

uint32_t PrrtSocket_get_rtprop(PrrtSocket *s) {
uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *s) {
    return PrrtChannelStateInformation_get_rtprop(s->receiver->csi);
}

prrtPacketLossRate_t PrrtSocket_get_plr(PrrtSocket *s) {
prrtPacketLossRate_t PrrtSocket_get_plr_fwd(PrrtSocket *s) {
    return PrrtChannelStateInformation_get_plr(s->receiver->csi);
}

prrtDeliveryRate_t PrrtSocket_get_delivery_rate(PrrtSocket *s) {
prrtDeliveryRate_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *s) {
    return PrrtChannelStateInformation_get_delivery_rate(s->receiver->csi);
}

prrtDeliveryRate_t PrrtSocket_get_btlbw(PrrtSocket *s) {
prrtDeliveryRate_t PrrtSocket_get_btlbw_fwd(PrrtSocket *s) {
    return PrrtChannelStateInformation_get_btlbw(s->receiver->csi);
}

prrtDeliveryRate_t PrrtSocket_get_btlbw_back(PrrtSocket *s) {
    return 0;
}


bool PrrtSocket_get_app_limited(PrrtSocket *s) {
    return PrrtChannelStateInformation_get_app_limited(s->receiver->csi);
}
+6 −6
Original line number Diff line number Diff line
@@ -57,7 +57,6 @@ typedef struct prrtSocket {
    prrtSequenceNumber_t sequenceNumberRepetition;
    prrtSequenceNumber_t sequenceNumberRedundancy;
    prrtSequenceNumber_t sequenceNumberFeedback;
    bool isSender;

    prrtTimestamp_t lastSentTimestamp;
    prrtTimestamp_t lastReceivedTimestamp;
@@ -85,7 +84,7 @@ typedef struct prrtSocket {
} PrrtSocket;


PrrtSocket *PrrtSocket_create(bool is_sender, prrtTimedelta_t target_delay_us);
PrrtSocket *PrrtSocket_create(prrtTimedelta_t target_delay_us);

bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interface_name);

@@ -128,11 +127,12 @@ bool PrrtSocket_closing(PrrtSocket *s);

bool PrrtSocket_uses_thread_pinning(PrrtSocket *s);

uint32_t PrrtSocket_get_rtprop(PrrtSocket *s);
uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *s);

prrtPacketLossRate_t PrrtSocket_get_plr(PrrtSocket *s);
prrtDeliveryRate_t PrrtSocket_get_delivery_rate(PrrtSocket *s);
prrtDeliveryRate_t PrrtSocket_get_btlbw(PrrtSocket *s);
prrtPacketLossRate_t PrrtSocket_get_plr_fwd(PrrtSocket *s);
prrtDeliveryRate_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *s);
prrtDeliveryRate_t PrrtSocket_get_btlbw_fwd(PrrtSocket *s);
prrtDeliveryRate_t PrrtSocket_get_btlbw_back(PrrtSocket *s);
bool PrrtSocket_get_app_limited(PrrtSocket *s);

#endif // PRRT_SOCKET_H
Loading