data_receiver.c 2.85 KB
Newer Older
1
#include <netdb.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
2
#include <stdio.h>
3
#include <arpa/inet.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
4
5
6
#include <src/defines.h>
#include <src/util/dbg.h>
#include <src/prrt/socket.h>
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include "data_receiver.h"

int send_feedback(const PrrtSocket *sock_ptr, const char *remote_host, const int remote_port) {
    struct sockaddr_in targetaddr;
    memset((char *) &targetaddr, 0, sizeof(targetaddr));
    targetaddr.sin_family = AF_INET;
    targetaddr.sin_port = htons((uint16_t) (remote_port + 1));

    struct hostent *hp;
    hp = gethostbyname(remote_host);
    memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);

    PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 7, 3, 50, 4, 6, 8, 9, 5, 1);
    uint8_t buf[MAX_PAYLOAD_LENGTH];
    uint32_t length = PrrtPacket_size(feedback_pkt_ptr);

    check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr) == EXIT_SUCCESS,
          "Buffer for encoding feedback is too small");
25
26
    check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
          length, "Sending feedback failed.");
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83

    return EXIT_SUCCESS;

    error:
    return EXIT_FAILURE;
}

/**
 * Receive loop for the data socket; has the signature of a thread start routine.
 *
 * Each iteration reads one datagram from sock_ptr->dataSocketFd, decodes it
 * into a newly allocated PrrtPacket, sends feedback to the sender and, for
 * data packets whose sequence number the forward packet table reports as
 * relevant, pushes the packet onto sock_ptr->inQueue and signals
 * inQueueFilledMutexCv. Packets that are not queued are freed again.
 *
 * The loop ends when receiving, allocating or sending feedback fails.
 *
 * @param ptr Pointer to the PrrtSocket to serve.
 * @return Always NULL.
 */
void *receive_data_loop(void *ptr) {
    unsigned char buffer[MAX_PAYLOAD_LENGTH];
    PrrtSocket *sock_ptr = ptr;
    /* Only ever points to a packet this loop still owns; NULL otherwise. */
    PrrtPacket *packet = NULL;

    while (1) {
        struct sockaddr_in remote;
        /* addrlen is a value-result argument, so it is reset for every call. */
        socklen_t addrlen = sizeof(remote);

        memset(&remote, 0, sizeof(remote));
        memset(buffer, 0, MAX_PAYLOAD_LENGTH);
        ssize_t n = recvfrom(sock_ptr->dataSocketFd, buffer, MAX_PAYLOAD_LENGTH, 0,
                             (struct sockaddr *) &remote, &addrlen);
        /* Without this, -1 would be cast to uint16_t and decoded as 65535 bytes. */
        check(n >= 0, "Receiving data failed.");

        uint16_t remote_port = ntohs(remote.sin_port);
        char *remote_host = inet_ntoa(remote.sin_addr);

        packet = malloc(sizeof(*packet));
        check_mem(packet);
        /* NOTE(review): the result of PrrtPacket_decode is not inspected - confirm whether it can fail. */
        PrrtPacket_decode(buffer, (uint16_t) n, packet);

        check(send_feedback(sock_ptr, remote_host, remote_port) == EXIT_SUCCESS, "Sending feedback failed.");

        switch (PrrtPacket_type(packet)) {
            case PACKET_TYPE_DATA:
                // packet.timestamp + packet.timeout < now: break

                if (PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, packet->seqno) ==
                    FALSE) {
                    break;
                }

                // check incomplete_prrt_blocks for this seqno: insert if found
                // else: insert in data_packet_store

                // forward to application layer

                pthread_mutex_lock(&sock_ptr->inQueueFilledMutex);
                List_push(sock_ptr->inQueue, packet);
                pthread_cond_signal(&sock_ptr->inQueueFilledMutexCv);
                pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
                /* The queue now holds the packet; drop our reference so it is not freed below. */
                packet = NULL;
                break;
            default:
                //PrrtPacket_print(packet);
                break;
        }

        /* Release packets that were not queued; free(NULL) is a no-op. */
        /* NOTE(review): if decoding allocates memory inside the packet, that needs releasing too - confirm. */
        free(packet);
        packet = NULL;
    }

    error:
    free(packet);
    return NULL;
}