From e787bc50c6408311e0ff73c9639bcfce51024983 Mon Sep 17 00:00:00 2001 From: Andreas Schmidt Date: Fri, 18 Mar 2016 10:55:15 +0100 Subject: [PATCH] Remove feedback handling from API. --- src/cython/cprrt.pxd | 1 - src/prrt/processes/feedbackReceiver.c | 58 +++++++++++++++++++++++---- src/prrt/socket.c | 42 ------------------- src/prrt/socket.h | 1 - 4 files changed, 50 insertions(+), 52 deletions(-) diff --git a/src/cython/cprrt.pxd b/src/cython/cprrt.pxd index 3adeff2c..e6983f98 100644 --- a/src/cython/cprrt.pxd +++ b/src/cython/cprrt.pxd @@ -117,7 +117,6 @@ cdef extern from "prrt/socket.h": int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port) int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len) int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) nogil - PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length) cdef extern from "util/bptree.h": ctypedef struct BPTreeNode: diff --git a/src/prrt/processes/feedbackReceiver.c b/src/prrt/processes/feedbackReceiver.c index a6c97bd5..3139eaef 100644 --- a/src/prrt/processes/feedbackReceiver.c +++ b/src/prrt/processes/feedbackReceiver.c @@ -1,5 +1,9 @@ #include #include +#include +#include +#include +#include #include "../../defines.h" #include "../../util/common.h" #include "../../util/dbg.h" @@ -8,16 +12,54 @@ #include "../socket.h" #include "feedbackReceiver.h" -void * receive_feedback_loop(void *ptr) { +void handle_feedback(PrrtSocket *prrtSocket, const size_t length) +{ + char bufin[MAX_PAYLOAD_LENGTH]; + ssize_t n; + struct sockaddr_in remote; + socklen_t addrlen = sizeof(remote); + + struct pollfd fds; + int timeout_msecs = 1000; + fds.fd = prrtSocket->feedbackSocketFd; + fds.events = POLLIN; + + n = poll(&fds, 1, timeout_msecs); + check(n >= 0, "Select failed.") + if(n == 0) { + return; + } + prrtTimestamp_t receiveTime = PrrtClock_get_current_time_us(); + + n = recvfrom(prrtSocket->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen); + check(n >= 0, "Receiving feedback failed."); + + PrrtPacket *prrtPacket = calloc(1, sizeof(PrrtPacket)); + check_mem(prrtPacket); + PrrtPacket_decode(bufin, (uint16_t) n, prrtPacket); + + //PrrtPacketFeedbackPayload* payload = prrtPacket->payload; + //struct in_addr a; + //a.s_addr = payload->receiverAddress; + //debug("Receiver Address: %s", inet_ntoa(a)); + + prrtTimestamp_t forwardTripTimestamp = ((PrrtPacketFeedbackPayload *) prrtPacket->payload)->forwardTripTimestamp_us; + + PrrtChannelStateInformation_update_rtt(prrtSocket->csi, (prrtTimedelta_t) (receiveTime - forwardTripTimestamp)); + PrrtChannelStateInformation_print(prrtSocket->csi); + + error: + if(prrtPacket != NULL) { PrrtPacket_destroy(prrtPacket); } +} + +void *receive_feedback_loop(void *ptr) +{ PrrtSocket *sock_ptr = ptr; check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed."); - while (sock_ptr->closing == false) { + while(sock_ptr->closing == false) { check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed."); - PrrtPacket *t = PrrtSocket_recv_feedback(sock_ptr, MAX_PAYLOAD_LENGTH); - if (t != NULL) { - PrrtPacket_destroy(t); - } + handle_feedback(sock_ptr, MAX_PAYLOAD_LENGTH); usleep(1); check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed."); } @@ -26,6 +68,6 @@ void * receive_feedback_loop(void *ptr) { return NULL; error: - PERROR("Feedback reception failed."); - return NULL; + PERROR("Feedback reception failed."); + return NULL; } \ No newline at end of file diff --git a/src/prrt/socket.c b/src/prrt/socket.c index 9ebcc19b..54477cc6 100644 --- a/src/prrt/socket.c +++ b/src/prrt/socket.c @@ -286,48 +286,6 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) { return -1; } -PrrtPacket *PrrtSocket_recv_feedback(PrrtSocket *prrtSocket, const size_t length) -{ - char bufin[MAX_PAYLOAD_LENGTH]; - ssize_t n; - struct sockaddr_in remote; - socklen_t addrlen = sizeof(remote); - - struct pollfd fds; - int timeout_msecs = 1000; - fds.fd = prrtSocket->feedbackSocketFd; - fds.events = POLLIN; - - n = poll(&fds, 1, timeout_msecs); - check(n >= 0, "Select failed.") - if (n == 0) { - return NULL; - } - prrtTimestamp_t receiveTime = PrrtClock_get_current_time_us(); - - n = recvfrom(prrtSocket->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen); - check(n >= 0, "Receiving feedback failed."); - - PrrtPacket *prrtPacket = calloc(1, sizeof(PrrtPacket)); - check_mem(prrtPacket); - PrrtPacket_decode(bufin, (uint16_t) n, prrtPacket); - - //PrrtPacketFeedbackPayload* payload = prrtPacket->payload; - //struct in_addr a; - //a.s_addr = payload->receiverAddress; - //debug("Receiver Address: %s", inet_ntoa(a)); - - prrtTimestamp_t forwardTripTimestamp = ((PrrtPacketFeedbackPayload*) prrtPacket->payload)->forwardTripTimestamp_us; - - PrrtChannelStateInformation_update_rtt(prrtSocket->csi, (prrtTimedelta_t) (receiveTime - forwardTripTimestamp)); - PrrtChannelStateInformation_print(prrtSocket->csi); - - return prrtPacket; - - error: - return NULL; -} - bool PrrtSocket_set_sock_opt(PrrtSocket *sock_ptr, const char *name, const uint32_t value) { if(strcmp(name, "targetdelay") == 0) { diff --git a/src/prrt/socket.h b/src/prrt/socket.h index 2aa51b2b..b38c55d6 100644 --- a/src/prrt/socket.h +++ b/src/prrt/socket.h @@ -71,6 +71,5 @@ int PrrtSocket_close(PrrtSocket *sock_ptr); int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port); int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len); int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr); -PrrtPacket *PrrtSocket_recv_feedback(PrrtSocket *prrtSocket, const size_t length); #endif // PRRT_SOCKET_H -- GitLab