Commit 23934970 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Merge branch 'feature/socketMerging' into feature/congestionControl

parents 92e34718 00f430d8
Loading
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -25,7 +25,7 @@ import prrt

port = int(sys.argv[1])

s = prrt.PrrtSocket(port=port, isSender=False)
s = prrt.PrrtSocket(port=port)

while True:
    d = s.recv()
@@ -44,7 +44,7 @@ import prrt
host = sys.argv[1]
port = int(sys.argv[2])

s = prrt.PrrtSocket(port=port, isSender=True)
s = prrt.PrrtSocket(port=port)
s.connect(host, port)

for i in range(10):
+10 −11
Original line number Diff line number Diff line
@@ -93,9 +93,7 @@ cdef extern from "proto/receiver.h":

cdef extern from "proto/socket.h":
    cdef struct prrtSocket:
        int dataSocketFd
        int feedbackSocketFd
        pthread_t receiveFeedbackThread
        int socketFd

        pthread_t sendDataThread
        pthread_mutex_t outQueueFilledMutex
@@ -119,7 +117,7 @@ cdef extern from "proto/socket.h":

    ctypedef prrtSocket PrrtSocket

    cdef PrrtSocket* PrrtSocket_create(bint isSender, const uint32_t target_delay)
    cdef PrrtSocket* PrrtSocket_create(const uint32_t target_delay)
    bint PrrtSocket_bind(PrrtSocket *sock_ptr, const_char *ipAddress, const uint16_t port)
    int PrrtSocket_close(const PrrtSocket *sock_ptr)
    int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
@@ -139,13 +137,15 @@ cdef extern from "proto/socket.h":
    bint PrrtSocket_set_coding_parameters(PrrtSocket *s, uint8_t k, uint8_t n, uint8_t c, uint8_t *n_cycle)
    PrrtCodingConfiguration *PrrtSocket_get_coding_parameters(PrrtSocket *s)
    bint PrrtSocket_uses_thread_pinning(PrrtSocket *socket)
    uint32_t PrrtSocket_get_rtprop(PrrtSocket *socket)

    bint PrrtSocket_uses_thread_pinning(PrrtSocket *socket)
    uint32_t PrrtSocket_get_rtt(PrrtSocket *socket)
    float PrrtSocket_get_plr(PrrtSocket *socket)
    uint32_t PrrtSocket_get_delivery_rate(PrrtSocket *socket)
    uint32_t PrrtSocket_get_btlbw(PrrtSocket *socket)

    uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *socket)
    float PrrtSocket_get_plr_fwd(PrrtSocket *socket)
    uint32_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *socket)
    uint32_t PrrtSocket_get_btlbw_fwd(PrrtSocket *s);
    uint32_t PrrtSocket_get_btlbw_back(PrrtSocket *s);

    uint32_t PrrtSocket_get_bbr_state(PrrtSocket *s)
    uint64_t PrrtSocket_get_full_bw(PrrtSocket *s)
    bint PrrtSocket_get_filled_pipe(PrrtSocket *s)
@@ -158,10 +158,9 @@ cdef extern from "proto/socket.h":
    uint32_t PrrtSocket_get_pipe(PrrtSocket *s)
    uint32_t PrrtSocket_get_delivered(PrrtSocket *s)
    bint PrrtSocket_get_bbr_round_start(PrrtSocket *s)


    uint32_t PrrtSocket_get_bbr_app_limited(PrrtSocket *socket)
    bint PrrtSocket_get_bbr_is_app_limited(PrrtSocket *socket)

    bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket)

cdef extern from "proto/stores/packetDeliveryStore.h":
+0 −1
Original line number Diff line number Diff line
@@ -9,7 +9,6 @@ add_library(PRRT ../defines.h
        ../xlap/xlap.c ../xlap/xlap.h
        applicationConstraints.c applicationConstraints.h
        processes/dataReceiver.c processes/dataReceiver.h
        processes/feedbackReceiver.c processes/feedbackReceiver.h
        processes/dataTransmitter.c processes/dataTransmitter.h
        stores/dataPacketStore.c stores/dataPacketStore.h
        stores/deliveredPacketTable.c stores/deliveredPacketTable.h
+31 −10
Original line number Diff line number Diff line
@@ -74,7 +74,7 @@ static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote, prrtS
    struct sockaddr_in targetaddr;
    memset((char *) &targetaddr, 0, sizeof(targetaddr));
    targetaddr.sin_family = AF_INET;
    targetaddr.sin_port = htons((uint16_t) (remote_port + 1));
    targetaddr.sin_port = htons((uint16_t) (remote_port));

    struct hostent *hp;
    hp = gethostbyname(remote_host);
@@ -97,7 +97,7 @@ static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote, prrtS

    check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");

    check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
    check(sendto(sock_ptr->socketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
          length, "Sending feedback failed.");
    free(buf);

@@ -207,6 +207,21 @@ static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) {
    return;
}

void handle_feedback_packet(const PrrtSocket *prrtSocket, const PrrtPacket *prrtPacket, prrtTimestamp_t receiveTime) {
    check(prrtPacket != NULL, "Cannot be null");
    debug(DEBUG_DATARECEIVER, "handle_feedback_packet");
    PrrtPacketFeedbackPayload *feedbackPayload = (PrrtPacketFeedbackPayload *) prrtPacket->payload;
    prrtTimestamp_t forwardTripTimestamp = feedbackPayload->forwardTripTimestamp_us;

    prrtTimedelta_t rtt = (prrtTimedelta_t) (receiveTime - forwardTripTimestamp);

    PrrtReceiver_on_ack(prrtSocket->receiver, feedbackPayload, receiveTime, rtt);
    return;

    error:
    PERROR("handle_feedback_packet failed.");
}

void receive_from_socket(PrrtSocket *socket_ptr, unsigned char buffer_ptr[65528], ssize_t *received_size,
                         struct sockaddr_in *remote_ptr, socklen_t *remote_len_ptr, struct timespec *packet_timestamp_ptr,
                         uint64_t *packet_cyclestamp_ptr) {
@@ -230,7 +245,7 @@ void receive_from_socket(PrrtSocket *socket_ptr, unsigned char buffer_ptr[65528]
        msg.msg_control = &control;
        msg.msg_controllen = sizeof(control);

        *received_size = recvmsg(socket_ptr->dataSocketFd, &msg, 0);
        *received_size = recvmsg(socket_ptr->socketFd, &msg, 0);

        for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
            switch (cmsg->cmsg_type) {
@@ -244,7 +259,7 @@ void receive_from_socket(PrrtSocket *socket_ptr, unsigned char buffer_ptr[65528]
            }
        }
    } else {
        *received_size = recvfrom(socket_ptr->dataSocketFd, buffer_ptr, MAX_PAYLOAD_LENGTH, 0,
        *received_size = recvfrom(socket_ptr->socketFd, buffer_ptr, MAX_PAYLOAD_LENGTH, 0,
                                  (struct sockaddr *) remote_ptr, remote_len_ptr);
        clock_gettime(CLOCK_REALTIME, packet_timestamp_ptr);
    }
@@ -259,6 +274,7 @@ void *receive_data_loop(void *ptr) {
    PrrtSocket *sock_ptr = ptr;

    while (1) {
        debug(DEBUG_DATARECEIVER, "About to receive.");
        XlapTimestampPlaceholder tsph1;
        XlapTimestampPlaceholder tsph2;
        XlapTimestampPlaceholder tsph3;
@@ -271,20 +287,16 @@ void *receive_data_loop(void *ptr) {
        receive_from_socket(sock_ptr, buffer, &n, &remote, &addrlen, &packet_recv_timestamp, &packet_recv_cyclestamp);
        debug(DEBUG_HARDSTAMPING, "Packet TS:\t%ld.%09ld; Who? %s", (long) packet_recv_timestamp.tv_sec,
              packet_recv_timestamp.tv_nsec, inet_ntoa(remote.sin_addr));
        prrtTimestamp_t prrt_recv_timestamp = PrrtClock_TimespecToPrrtTimestamp(packet_recv_timestamp);
        sock_ptr->lastReceivedTimestamp = prrt_recv_timestamp;

        XlapTimeStampClock(&tsph1, ts_any_packet, 0, LinkReceive);
        XlapTimeStampCycle(&tsph1, ts_any_packet, 0, LinkReceive);

        PrrtPacket *packet = (PrrtPacket *) calloc(1, sizeof(PrrtPacket));
        check_mem(packet);

        XlapTimeStampCycle(&tsph2, ts_any_packet, 0, DecodeStart);
        check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed.");
        XlapTimeStampCycle(&tsph3, ts_any_packet, 0, DecodeEnd);

        prrtSequenceNumber_t seqno = packet->sequenceNumber;
        prrtTimestamp_t prrt_recv_timestamp = PrrtClock_TimespecToPrrtTimestamp(packet_recv_timestamp);

        prrtPacketType_t packetType = PrrtPacket_type(packet);
        debug(DEBUG_DATARECEIVER, "received packet %d:%u", (int) packetType, seqno);
@@ -298,8 +310,12 @@ void *receive_data_loop(void *ptr) {
        } else if (packetType == PACKET_TYPE_REDUNDANCY) {
            kind = ts_redundancy_packet;
            sentTimestamp = PrrtPacket_get_redundancy_timestamp(packet);
        } else if (packetType == PACKET_TYPE_FEEDBACK) {
            kind = ts_feedback_packet;
        }
        if (packetType == PACKET_TYPE_DATA || packetType == PACKET_TYPE_REDUNDANCY) {
            sock_ptr->lastReceivedTimestamp = prrt_recv_timestamp;

            XlapCycleStampValue(sock_ptr, kind, seqno, ChannelReceive, packet_recv_cyclestamp);
            XlapTimeStampValue(sock_ptr, kind, seqno, ChannelReceive, packet_recv_timestamp);

@@ -318,12 +334,17 @@ void *receive_data_loop(void *ptr) {
            }
            send_feedback(sock_ptr, remote, seqno, prrt_recv_timestamp, sentTimestamp, packetType);
            XlapTimeStampCycle(sock_ptr, kind, seqno, HandlePacketEnd);
        } else if (packetType == PACKET_TYPE_FEEDBACK) {
            handle_feedback_packet(sock_ptr, packet, prrt_recv_timestamp);
            PrrtPacket_destroy(packet);
        } else {
            PrrtPacket_print(packet);
            PrrtPacket_destroy(packet);
        }

        debug(DEBUG_DATARECEIVER, "Cleanup");
        PrrtSocket_cleanup(sock_ptr);
        debug(DEBUG_DATARECEIVER, "Cleaned");
    }

    error:
+5 −5
Original line number Diff line number Diff line
@@ -28,14 +28,14 @@ bool send_to_socket(PrrtSocket* sock_ptr, PrrtReceiver *recv, uint8_t* buf, prrt
        msg.msg_control = control;
        msg.msg_controllen = 0;

        sendmsg(sock_ptr->dataSocketFd, &msg, 0);
        sendmsg(sock_ptr->socketFd, &msg, 0);
        *packet_clockstamp = __builtin_ia32_rdtsc();

        msg.msg_control = control;
        iov.iov_len = MAX_PAYLOAD_LENGTH;
        do {
            msg.msg_controllen = 1024;
            got = recvmsg(sock_ptr->dataSocketFd, &msg, MSG_ERRQUEUE);
            got = recvmsg(sock_ptr->socketFd, &msg, MSG_ERRQUEUE);
        } while(got < 0  && errno == EAGAIN && check++ < check_max);
        check(!(got < 0 && errno == EAGAIN), "Failed to get stamp. Gave up.");

@@ -59,7 +59,7 @@ bool send_to_socket(PrrtSocket* sock_ptr, PrrtReceiver *recv, uint8_t* buf, prrt
        }
    } else {
        // TODO: [LATENCY] By knowing the time encoding etc. that happens upfront, one could make an adjustment here.
        check(sendto(sock_ptr->dataSocketFd, buf, length, 0, recv->ai->ai_addr, recv->ai->ai_addrlen) ==
        check(sendto(sock_ptr->socketFd, buf, length, 0, recv->ai->ai_addr, recv->ai->ai_addrlen) ==
              length, "Sendto failed.");
        clock_gettime(CLOCK_REALTIME, packet_timestamp);
        *packet_clockstamp = __builtin_ia32_rdtsc();
@@ -194,9 +194,10 @@ void *send_data_loop(void *ptr) {
        packet->index = (uint8_t) (packet->sequenceNumber - block->baseSequenceNumber);

        PrrtPacketDataPayload *payload = packet->payload;
        payload->groupRTprop_us = PrrtSocket_get_rtprop(sock_ptr);
        payload->groupRTprop_us = PrrtSocket_get_rtprop_fwd(sock_ptr);

        PrrtPacket *packetToSend = PrrtPacket_copy(packet);
        debug(DEBUG_DATATRANSMITTER, "Send: %i", packet->sequenceNumber);
        send_packet(sock_ptr, packetToSend);
        XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
        XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
@@ -220,6 +221,5 @@ void *send_data_loop(void *ptr) {
            PrrtBlock_destroy(block);
            block = NULL;
        }
        PrrtSocket_cleanup(sock_ptr);
    }
}
Loading