data_receiver.c 6.03 KB
Newer Older
1
#include <netdb.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
2
#include <stdio.h>
3
#include <arpa/inet.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
4
5
6
#include <src/defines.h>
#include <src/util/dbg.h>
#include <src/prrt/socket.h>
7
#include <src/prrt/block.h>
8
#include <src/prrt/packet.h>
9
10
#include "data_receiver.h"

11
12
13
14
void decode_block(const PrrtSocket *sock_ptr, uint16_t base_seqno);

void retrieve_data_blocks(PrrtSocket *sock_ptr, uint16_t base_seqno, uint8_t k, const PrrtBlock *block);

15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
int send_feedback(const PrrtSocket *sock_ptr, const char *remote_host, const int remote_port) {
    struct sockaddr_in targetaddr;
    memset((char *) &targetaddr, 0, sizeof(targetaddr));
    targetaddr.sin_family = AF_INET;
    targetaddr.sin_port = htons((uint16_t) (remote_port + 1));

    struct hostent *hp;
    hp = gethostbyname(remote_host);
    memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);

    PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 7, 3, 50, 4, 6, 8, 9, 5, 1);
    uint8_t buf[MAX_PAYLOAD_LENGTH];
    uint32_t length = PrrtPacket_size(feedback_pkt_ptr);

    check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr) == EXIT_SUCCESS,
          "Buffer for encoding feedback is too small");
31
32
    check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
          length, "Sending feedback failed.");
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

    return EXIT_SUCCESS;

    error:
    return EXIT_FAILURE;
}

void *receive_data_loop(void *ptr) {
    ssize_t n;
    uint16_t remote_port;
    char *remote_host;
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);
    unsigned char buffer[MAX_PAYLOAD_LENGTH];
    PrrtSocket *sock_ptr = ptr;
    PrrtPacket *packet;
49
50
    PrrtPacketRedundancyPayload *redundancyPayload;
    PrrtBlock *block;
51
52
53
54
55
56
57

    while (1) {
        memset(buffer, 0, MAX_PAYLOAD_LENGTH);
        n = recvfrom(sock_ptr->dataSocketFd, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen);
        remote_port = ntohs(remote.sin_port);
        remote_host = inet_ntoa(remote.sin_addr);

Andreas Schmidt's avatar
Andreas Schmidt committed
58
        packet = calloc(1, sizeof(PrrtPacket));
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
        check_mem(packet);
        PrrtPacket_decode(buffer, (uint16_t) n, packet);

        check(send_feedback(sock_ptr, remote_host, remote_port) == EXIT_SUCCESS, "Sending feedback failed.");

        switch (PrrtPacket_type(packet)) {
            case PACKET_TYPE_DATA:
                // packet.timestamp + packet.timeout < now: break

                if (PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, packet->seqno) ==
                    FALSE) {
                    break;
                }

                // check incomplete_prrt_blocks for this seqno: insert if found
                // else: insert in data_packet_store

76
77
                sock_ptr->dataStore = BPTree_insert(sock_ptr->dataStore, packet->seqno, packet);

78
79
80
81
82
83
                // forward to application layer

                pthread_mutex_lock(&sock_ptr->inQueueFilledMutex);
                List_push(sock_ptr->inQueue, packet);
                pthread_cond_signal(&sock_ptr->inQueueFilledMutexCv);
                pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
84
85

                decode_block(sock_ptr, packet->seqno - packet->index);
86
                break;
87
88
89
            case PACKET_TYPE_REDUNDANCY:
                redundancyPayload = packet->payload;

90
91
92
                if (!PrrtForwardPacketTable_test_is_block_relevant(sock_ptr->forwardPacketTable,
                                                                   redundancyPayload->base_seqno,
                                                                   redundancyPayload->n)) {
93
94
95
                    //printf("-------- IRRELEVANT -----------------------\n");
                    //PrrtPacket_print(packet);
                    //printf("-------- IRRELEVANT -----------------------\n");
96
97
98
99
100
                    PrrtPacket_destroy(packet);
                    break;
                }

                block = BPTree_get(sock_ptr->blockStore, redundancyPayload->base_seqno);
101
                if (block == NULL) {
Andreas Schmidt's avatar
Andreas Schmidt committed
102
                    PrrtCodingParams *cpar = calloc(1, sizeof(PrrtCodingParams));
103
104
105
                    PrrtCodingParams_init(cpar);
                    cpar->k = redundancyPayload->k;
                    cpar->n = redundancyPayload->n;
106
107
                    PrrtBlock_alloc(&block, cpar, redundancyPayload->base_seqno);
                    sock_ptr->blockStore = BPTree_insert(sock_ptr->blockStore, redundancyPayload->base_seqno, block);
108
109
110
                }

                PrrtBlock_insert_redundancy_packet(block, packet);
111
                decode_block(sock_ptr, redundancyPayload->base_seqno);
112
                break;
113
114
115
116
            default:
                //PrrtPacket_print(packet);
                break;
        }
117
118


119
120
121
122
    }

    error:
    free(packet);
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
}

void decode_block(const PrrtSocket *sock_ptr, uint16_t base_seqno) {
    PrrtBlock *block = BPTree_get(sock_ptr->blockStore, base_seqno);

    if (block != NULL && PrrtBlock_decode_ready(block)) {
        retrieve_data_blocks(sock_ptr, base_seqno, block->coding_params.k, block);

        PrrtBlock_decode(block);

        LIST_FOREACH(block->data_blocks, first, next, cur) {
            PrrtPacket *pkt = cur->value;
            if (PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, pkt->seqno)) {
                printf("RECOVERED %d via coding.\n", pkt->seqno);
                pthread_mutex_lock(&sock_ptr->inQueueFilledMutex);
                List_push(sock_ptr->inQueue, pkt);
                pthread_cond_signal(&sock_ptr->inQueueFilledMutexCv);
                pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
            }
        }
    }
}

void retrieve_data_blocks(PrrtSocket *sock_ptr, uint16_t base_seqno, uint8_t k, const PrrtBlock *block) {
    List *res = List_create();
    BPTree_get_range(sock_ptr->dataStore, res, base_seqno, base_seqno + k - 1);

    LIST_FOREACH(res, first, next, cur) {
            PrrtPacket *packetPtr = cur->value;
            PrrtBlock_insert_data_packet(block, packetPtr);
            sock_ptr->blockStore = BPTree_delete(sock_ptr->blockStore, packetPtr->seqno);
        }
155
}