#include #include #include #include #include #include "../../defines.h" #include "../../util/common.h" #include "../../util/dbg.h" #include "../clock.h" #include "../socket.h" #include "feedbackReceiver.h" void handle_feedback(PrrtSocket *prrtSocket, const size_t length) { char bufin[MAX_PAYLOAD_LENGTH]; ssize_t n; struct sockaddr_in remote; socklen_t addrlen = sizeof(remote); struct pollfd fds; int timeout_msecs = 1000; fds.fd = prrtSocket->feedbackSocketFd; fds.events = POLLIN; n = poll(&fds, 1, timeout_msecs); check(n >= 0, "Select failed.") if(n == 0) { return; } prrtTimestamp_t receiveTime = PrrtClock_get_current_time_us(); n = recvfrom(prrtSocket->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen); check(n >= 0, "Receiving feedback failed."); PrrtPacket *prrtPacket = calloc(1, sizeof(PrrtPacket)); check_mem(prrtPacket); PrrtPacket_decode(bufin, (uint16_t) n, prrtPacket); //PrrtPacketFeedbackPayload* payload = prrtPacket->payload; //struct in_addr a; //a.s_addr = payload->receiverAddress; //debug("Receiver Address: %s", inet_ntoa(a)); prrtTimestamp_t forwardTripTimestamp = ((PrrtPacketFeedbackPayload *) prrtPacket->payload)->forwardTripTimestamp_us; PrrtChannelStateInformation_update_rtt(prrtSocket->csi, (prrtTimedelta_t) (receiveTime - forwardTripTimestamp)); PrrtChannelStateInformation_print(prrtSocket->csi); error: if(prrtPacket != NULL) { PrrtPacket_destroy(prrtPacket); } } void *receive_feedback_loop(void *ptr) { PrrtSocket *sock_ptr = ptr; check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed."); while(sock_ptr->closing == false) { check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed."); handle_feedback(sock_ptr, MAX_PAYLOAD_LENGTH); usleep(1); check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed."); } check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed."); return NULL; error: PERROR("Feedback reception failed."); return NULL; }