Commit 01b825e1 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Use the same socket for data and feedback.

parent 6191755a
Loading
Loading
Loading
Loading
+1 −2
Original line number Diff line number Diff line
@@ -93,8 +93,7 @@ cdef extern from "proto/receiver.h":

cdef extern from "proto/socket.h":
    cdef struct prrtSocket:
        int dataSocketFd
        int feedbackSocketFd
        int socketFd
        pthread_t receiveFeedbackThread

        pthread_t sendDataThread
+4 −4
Original line number Diff line number Diff line
@@ -75,7 +75,7 @@ static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote, prrtS
    struct sockaddr_in targetaddr;
    memset((char *) &targetaddr, 0, sizeof(targetaddr));
    targetaddr.sin_family = AF_INET;
    targetaddr.sin_port = htons((uint16_t) (remote_port + 1));
    targetaddr.sin_port = htons((uint16_t) (remote_port));

    struct hostent *hp;
    hp = gethostbyname(remote_host);
@@ -98,7 +98,7 @@ static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote, prrtS

    check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");

    check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
    check(sendto(sock_ptr->socketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
          length, "Sending feedback failed.");
    free(buf);

@@ -226,7 +226,7 @@ void receive_from_socket(PrrtSocket *socket_ptr, unsigned char buffer_ptr[65528]
        msg.msg_control = &control;
        msg.msg_controllen = sizeof(control);

        *received_size = recvmsg(socket_ptr->dataSocketFd, &msg, 0);
        *received_size = recvmsg(socket_ptr->socketFd, &msg, 0);

        for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
            switch (cmsg->cmsg_type) {
@@ -240,7 +240,7 @@ void receive_from_socket(PrrtSocket *socket_ptr, unsigned char buffer_ptr[65528]
            }
        }
    } else {
        *received_size = recvfrom(socket_ptr->dataSocketFd, buffer_ptr, MAX_PAYLOAD_LENGTH, 0,
        *received_size = recvfrom(socket_ptr->socketFd, buffer_ptr, MAX_PAYLOAD_LENGTH, 0,
                                  (struct sockaddr *) remote_ptr, remote_len_ptr);
        clock_gettime(CLOCK_REALTIME, packet_timestamp_ptr);
    }
+3 −3
Original line number Diff line number Diff line
@@ -27,14 +27,14 @@ bool send_to_socket(PrrtSocket* sock_ptr, PrrtReceiver *recv, uint8_t* buf, prrt
        msg.msg_control = control;
        msg.msg_controllen = 0;

        sendmsg(sock_ptr->dataSocketFd, &msg, 0);
        sendmsg(sock_ptr->socketFd, &msg, 0);
        *packet_clockstamp = __builtin_ia32_rdtsc();

        msg.msg_control = control;
        iov.iov_len = MAX_PAYLOAD_LENGTH;
        do {
            msg.msg_controllen = 1024;
            got = recvmsg(sock_ptr->dataSocketFd, &msg, MSG_ERRQUEUE);
            got = recvmsg(sock_ptr->socketFd, &msg, MSG_ERRQUEUE);
        } while(got < 0  && errno == EAGAIN && check++ < check_max);
        check(!(got < 0 && errno == EAGAIN), "Failed to get stamp. Gave up.");

@@ -58,7 +58,7 @@ bool send_to_socket(PrrtSocket* sock_ptr, PrrtReceiver *recv, uint8_t* buf, prrt
        }
    } else {
        // TODO: [LATENCY] By knowing the time encoding etc. that happens upfront, one could make an adjustment here.
        check(sendto(sock_ptr->dataSocketFd, buf, length, 0, recv->ai->ai_addr, recv->ai->ai_addrlen) ==
        check(sendto(sock_ptr->socketFd, buf, length, 0, recv->ai->ai_addr, recv->ai->ai_addrlen) ==
              length, "Sendto failed.");
        clock_gettime(CLOCK_REALTIME, packet_timestamp);
        *packet_clockstamp = __builtin_ia32_rdtsc();
+2 −2
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@ static void handle_feedback(PrrtSocket *prrtSocket, const size_t length)

    struct pollfd fds;
    int timeout_msecs = 1000;
    fds.fd = prrtSocket->feedbackSocketFd;
    fds.fd = prrtSocket->socketFd;
    fds.events = POLLIN;

    n = poll(&fds, 1, timeout_msecs);
@@ -30,7 +30,7 @@ static void handle_feedback(PrrtSocket *prrtSocket, const size_t length)
    }
    prrtTimestamp_t receiveTime = PrrtClock_get_current_time_us();

    n = recvfrom(prrtSocket->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
    n = recvfrom(prrtSocket->socketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
    check(n >= 0, "Receiving feedback failed.");

    prrtPacket = calloc(1, sizeof(PrrtPacket));
+11 −24
Original line number Diff line number Diff line
@@ -84,19 +84,12 @@ PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay

    int enabled = 1;

    check(s->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.");
    check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_BROADCAST, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
    check(s->socketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.");
    check(setsockopt(s->socketFd, SOL_SOCKET, SO_BROADCAST, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");
    check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
    check(setsockopt(s->socketFd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");
    check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_REUSEPORT, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");
    check(s->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create feedback socket.");
    check(setsockopt(s->feedbackSocketFd, SOL_SOCKET, SO_BROADCAST, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");
    check(setsockopt(s->feedbackSocketFd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");
    check(setsockopt(s->feedbackSocketFd, SOL_SOCKET, SO_REUSEPORT, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
    check(setsockopt(s->socketFd, SOL_SOCKET, SO_REUSEPORT, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
          "Socket option set failed.");

    if (is_sender) {
@@ -148,13 +141,8 @@ bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port)

    address->sin_family = AF_INET;
    address->sin_addr.s_addr = inet_addr(ipAddress);
    address->sin_port = htons((uint16_t) (port + 1));
    s->address = address;

    check(bind(s->feedbackSocketFd, (struct sockaddr *) address, size) == EXIT_SUCCESS,
          "Cannot bind feedback socket.");

    address->sin_port = htons((uint16_t) (port));
    s->address = address;

    if(s->isHardwareTimestamping) {
        struct ifreq hwtstamp;
@@ -170,7 +158,7 @@ bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port)
        hwconfig.tx_type = HWTSTAMP_TX_ON;
        hwconfig.rx_filter = HWTSTAMP_FILTER_ALL;
        hwconfig_requested = hwconfig;
        if(ioctl(s->dataSocketFd, SIOCSHWTSTAMP, &hwtstamp) < 0) {
        if(ioctl(s->socketFd, SIOCSHWTSTAMP, &hwtstamp) < 0) {
            check(errno != EPERM, "Insufficient permissions. Run in privileged mode.")

            if((errno == EINVAL || errno == ENOTSUP) && hwconfig_requested.tx_type == HWTSTAMP_TX_OFF &&
@@ -184,19 +172,19 @@ bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port)
        }
    }

    check(bind(s->dataSocketFd, (struct sockaddr *) address, size) == EXIT_SUCCESS,
    check(bind(s->socketFd, (struct sockaddr *) address, size) == EXIT_SUCCESS,
          "Cannot bind data socket.");

    if(s->isHardwareTimestamping) {
        if(!s->isSender) {
            int enabled = 1;
            check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_TIMESTAMPNS, &enabled, sizeof(enabled)) >= 0, "Could not set SO_TIMESTAMPNS");
            check(setsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPNS, &enabled, sizeof(enabled)) >= 0, "Could not set SO_TIMESTAMPNS");
            socklen_t val, len;
            len = sizeof(val);
            check(getsockopt(s->dataSocketFd, SOL_SOCKET, SO_TIMESTAMPNS, &val, &len) >= 0, "%s: %s\n", "Getsockopt SO_TIMESTAMPNS", strerror(errno));
            check(getsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPNS, &val, &len) >= 0, "%s: %s\n", "Getsockopt SO_TIMESTAMPNS", strerror(errno));
        } else {
            int enabled = SOF_TIMESTAMPING_TX_HARDWARE | SOF_TIMESTAMPING_SYS_HARDWARE | SOF_TIMESTAMPING_RAW_HARDWARE;
            check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_TIMESTAMPING, &enabled, sizeof(enabled)) >= 0, "Could not set SO_TIMESTAMPING");
            check(setsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPING, &enabled, sizeof(enabled)) >= 0, "Could not set SO_TIMESTAMPING");
        }
    }

@@ -494,8 +482,7 @@ int PrrtSocket_close(PrrtSocket *s) {
    }


    close(s->dataSocketFd);
    close(s->feedbackSocketFd);
    close(s->socketFd);
    debug(DEBUG_SOCKET, "Socket closed.");
    return 0;

Loading