feedbackReceiver.c 1.94 KB
Newer Older
1
2
#include <string.h>
#include <unistd.h>
3
4
5
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/poll.h>
6
#include "../../defines.h"
7
8
#include "../../util/common.h"
#include "../../util/dbg.h"
9
#include "../clock.h"
10
#include "../socket.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
11
#include "feedbackReceiver.h"
12

13
static void handle_feedback(PrrtSocket *prrtSocket, const size_t length)
14
15
{
    char bufin[MAX_PAYLOAD_LENGTH];
16
    PrrtPacket *prrtPacket = NULL;
17
18
19
20
21
22
23
24
25
26
    ssize_t n;
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);

    struct pollfd fds;
    int timeout_msecs = 1000;
    fds.fd = prrtSocket->feedbackSocketFd;
    fds.events = POLLIN;

    n = poll(&fds, 1, timeout_msecs);
Andreas Schmidt's avatar
Andreas Schmidt committed
27
    check(n >= 0, "Select failed.");
28
29
30
31
32
33
34
35
    if(n == 0) {
        return;
    }
    prrtTimestamp_t receiveTime = PrrtClock_get_current_time_us();

    n = recvfrom(prrtSocket->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
    check(n >= 0, "Receiving feedback failed.");

36
    prrtPacket = calloc(1, sizeof(PrrtPacket));
37
38
39
    check_mem(prrtPacket);
    PrrtPacket_decode(bufin, (uint16_t) n, prrtPacket);

Andreas Schmidt's avatar
Andreas Schmidt committed
40
41
    PrrtPacketFeedbackPayload *feedbackPayload = (PrrtPacketFeedbackPayload *) prrtPacket->payload;
    prrtTimestamp_t forwardTripTimestamp = feedbackPayload->forwardTripTimestamp_us;
42

43
44
    PrrtChannelStateInformation_update_rtprop(prrtSocket->receiver->csi,
                                              (prrtTimedelta_t) (receiveTime - forwardTripTimestamp));
Andreas Schmidt's avatar
Andreas Schmidt committed
45
    PrrtChannelStateInformation_update_plr(prrtSocket->receiver->csi, feedbackPayload->erasureCount, feedbackPayload->packetCount);
46
47
48
49
50
51
52

    error:
    if(prrtPacket != NULL) { PrrtPacket_destroy(prrtPacket); }
}

void *receive_feedback_loop(void *ptr)
{
53
    PrrtSocket *sock_ptr = ptr;
54

55
    while (!atomic_load_explicit(&sock_ptr->closing, memory_order_acquire)) {
56
        handle_feedback(sock_ptr, MAX_PAYLOAD_LENGTH);
57
    }
58
59

    return NULL;
60

61
62
63
64
//    error:
//    PERROR("Feedback reception failed.%s","");
//    return NULL;
}