block.c 12.1 KB
Newer Older
1
#include <string.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
2
3
4
#include "../../defines.h"
#include "../../util/dbg.h"
#include "../../util/common.h"
5
#include "block.h"
6

rna's avatar
rna committed
7
/**
 * Drains the redundancy packet list of a block into the decode matrix.
 *
 * Every packet is removed from block_ptr->redundancyPackets. Its payload
 * (skipping the redundancy header) is decoded into the first row of `fec`
 * whose idx_p entry is still -1, idx_p for that row is set to the packet's
 * index, and the packet is destroyed afterwards.
 *
 * The search for a free row is bounded by block_ptr->codingParams->k, which
 * is the number of rows PrrtBlock_decode() allocates for `fec` and `idx_p`.
 * A packet for which no free row is left is destroyed without being decoded
 * instead of writing past the end of the arrays.
 *
 * @param block_ptr block whose redundancy list is consumed
 * @param fec       decode matrix, one row per index slot
 * @param idx_p     row-to-packet-index map; -1 marks a free row
 */
static void gather_redundancy_packets(const PrrtBlock *block_ptr, gf *const *fec, int16_t *idx_p)
{
    uint32_t i;
    uint32_t m = 0;
    const uint32_t rows = block_ptr->codingParams->k;
    uint32_t redundancyBlocks = List_count(block_ptr->redundancyPackets);

    for(i = 0; i < redundancyBlocks; i++) {
        PrrtPacket *packet = List_shift(block_ptr->redundancyPackets);

        // Skip rows that are already occupied, but never leave the arrays.
        while(m < rows && idx_p[m] != -1) {
            m++;
        }

        if(m < rows) {
            // Take the data payload out of the redundancy payload and
            // decode/deserialize it into the fec matrix
            PrrtPacket_decode_payload(
                    packet->payload + PRRT_PACKET_REDUNDANCY_HEADER_SIZE,
                    PACKET_TYPE_DATA,
                    fec[m],
                    packet->payloadLength
            );
            idx_p[m] = packet->index;
        }

        // The packet has been taken off the list, so it is released here
        // whether or not a row was available for it.
        PrrtPacket_destroy(packet);
    }
}

rna's avatar
rna committed
34
/**
 * Copies the payload of every received data packet into the decode matrix.
 *
 * Each packet in block_ptr->dataPackets is written to the row of `fec`
 * selected by the packet's own index, and the same index is recorded in
 * idx_p at that position. The packets stay in the list.
 *
 * @param block_ptr block whose data packets are read
 * @param fec       decode matrix, addressed by packet index
 * @param idx_p     row-to-packet-index map that is filled in
 */
static void gather_data_packets(PrrtBlock *block_ptr, gf *const *fec, int16_t *idx_p)
{
    LIST_FOREACH(block_ptr->dataPackets, first, next, node) {
        PrrtPacket *dataPacket = node->value;

        PrrtPacket_copy_payload_to_buffer(fec[dataPacket->index], dataPacket, 0);
        idx_p[dataPacket->index] = dataPacket->index;
    }
}
Andreas Schmidt's avatar
Andreas Schmidt committed
42

43
/**
 * Frees the first k row buffers of a pointer table.
 *
 * Only the rows are released; the table itself remains owned by the caller.
 * A NULL table is accepted and ignored so that cleanup paths may call this
 * before the table has been allocated.
 *
 * @param src table of heap-allocated rows (entries may be NULL)
 * @param k   number of rows to free
 */
static void clear_list(gf *const *src, uint8_t k)
{
    int j = 0;

    if(src == NULL) {
        return;
    }

    for(j = 0; j < k; j++) {
        free(src[j]);
    }
}
Andreas Schmidt's avatar
Andreas Schmidt committed
50

51
52
53
54
55
56
57
58
59
/**
 * Prints a bit-ruler style table describing a block to stdout.
 *
 * The first row shows the base sequence number and the coding parameters
 * n and k; the second row shows how many data and redundancy packets the
 * block currently holds.
 *
 * @param block block to print
 */
void PrrtBlock_print(PrrtBlock *block) {
    static const char separator[] =
            "+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n";

    // Bit position ruler (tens, then units)
    fputs(" 0                   1                   2                   3\n", stdout);
    fputs(" 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1\n", stdout);
    fputs(separator, stdout);

    printf("| %29u | %13u | %13u |\n", block->baseSequenceNumber, block->codingParams->n, block->codingParams->k);
    fputs(separator, stdout);

    printf("| %13d | %13d |\n", List_count(block->dataPackets), List_count(block->redundancyPackets));
    fputs(separator, stdout);
}
Andreas Schmidt's avatar
Andreas Schmidt committed
60

61
62
63
64
65
/**
 * Releases a block together with everything it holds.
 *
 * While holding the block lock, all queued data and redundancy packets are
 * destroyed, the coder and the coding configuration are destroyed when
 * present, and both packet lists are destroyed. The lock is then released
 * and destroyed and the block itself is freed.
 *
 * @param block block to destroy
 * @return true on success; false if locking, unlocking or destroying the
 *         mutex failed (the block is not freed in that case)
 */
bool PrrtBlock_destroy(PrrtBlock *block)
{
    check(pthread_mutex_lock(&block->lock) == EXIT_SUCCESS, "Lock failed.");

    while(List_count(block->dataPackets) > 0) {
        PrrtPacket *pkt = List_shift(block->dataPackets);
        PrrtPacket_destroy(pkt);
    }

    while(List_count(block->redundancyPackets) > 0) {
        PrrtPacket *pkt = List_shift(block->redundancyPackets);
        PrrtPacket_destroy(pkt);
    }

    if(block->coder != NULL) {
        PrrtCoder_destroy(block->coder);
    }

    if(block->codingParams != NULL) {
        PrrtCodingConfiguration_destroy(block->codingParams);
    }

    List_destroy(block->dataPackets);
    List_destroy(block->redundancyPackets);

    check(pthread_mutex_unlock(&block->lock) == EXIT_SUCCESS, "Unlock failed.");
    // This step destroys the mutex, so report it as such when it fails.
    check(pthread_mutex_destroy(&block->lock) == EXIT_SUCCESS, "Mutex destroy failed.");
    free(block);

    return true;

    error:
    PERROR("PrrtBlock_destroy() failed.");
    return false;
}

95
/**
 * Allocates and initialises an empty block.
 *
 * The block stores the given coding configuration and coder pointers,
 * starts with two empty packet lists and is protected by a recursive mutex.
 *
 * On failure everything allocated by this function is released again and
 * NULL is returned; `cpar` and `coder` are left untouched in that case.
 *
 * @param cpar               coding configuration stored in the block
 * @param coder              coder stored in the block
 * @param baseSequenceNumber sequence number the block starts at
 * @return the new block, or NULL if allocation or mutex setup failed
 */
PrrtBlock * PrrtBlock_create(PrrtCodingConfiguration *cpar, PrrtCoder* coder, prrtSequenceNumber_t baseSequenceNumber)
{
    pthread_mutexattr_t attr;
    bool attrInitialized = false;

    PrrtBlock *block_ptr = calloc(1, sizeof(PrrtBlock));
    check_mem(block_ptr);

    block_ptr->coder = coder;
    block_ptr->codingParams = cpar;
    block_ptr->baseSequenceNumber = baseSequenceNumber;
    block_ptr->largestPayloadLength = 0;
    block_ptr->nextRedundancyPacket = 0;
    block_ptr->senderBlock = false;

    block_ptr->dataPackets = List_create();
    check_mem(block_ptr->dataPackets);
    block_ptr->redundancyPackets = List_create();
    check_mem(block_ptr->redundancyPackets);

    check(pthread_mutexattr_init(&attr) == EXIT_SUCCESS, "Mutex attr init failed.");
    attrInitialized = true;
    check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == EXIT_SUCCESS, "Setting type failed.");
    check(pthread_mutex_init(&block_ptr->lock, &attr) == 0, "Mutex init failed.");

    // The attribute object is only needed while initialising the mutex.
    pthread_mutexattr_destroy(&attr);

    return block_ptr;

    error:
        PERROR("Memory issue.%s","");
        if(attrInitialized) {
            pthread_mutexattr_destroy(&attr);
        }
        if(block_ptr != NULL) {
            // Only release what this function created; coder and cpar
            // still belong to the caller.
            if(block_ptr->dataPackets != NULL) {
                List_destroy(block_ptr->dataPackets);
            }
            if(block_ptr->redundancyPackets != NULL) {
                List_destroy(block_ptr->redundancyPackets);
            }
            free(block_ptr);
        }
        return NULL;
}
120

121
/**
 * Appends a data packet to the block unless a packet with the same
 * sequence number is already queued. Does not take the block lock.
 *
 * @param block  block to insert into
 * @param packet packet to append
 * @return true if a packet with that sequence number was already present
 *         (nothing was inserted), false if the packet was appended
 */
bool insert_data_packet(PrrtBlock * block, const PrrtPacket *packet) {
    debug(DEBUG_BLOCK, "S: %d, Insert [D]: %d", block->senderBlock, packet->sequenceNumber);

    LIST_FOREACH(block->dataPackets, first, next, node) {
        const PrrtPacket *queued = node->value;
        if(queued->sequenceNumber == packet->sequenceNumber) {
            // Duplicate: report it and leave the list untouched.
            return true;
        }
    }

    List_push(block->dataPackets, packet);
    return false;
}

/**
 * Inserts a data packet into a block under the block lock.
 *
 * Insertion is refused once the block's isDecoded flag is set. Otherwise
 * the packet is appended unless its sequence number is already queued, and
 * largestPayloadLength is raised to the packet's payload length if needed.
 *
 * The lock is released on every path that acquired it, including the
 * refusal path.
 *
 * @param block  block to insert into
 * @param packet data packet to insert
 * @return true if the packet was appended; false if it was a duplicate,
 *         the block is already decoded, or locking/unlocking failed
 */
bool PrrtBlock_insert_data_packet(PrrtBlock *block, const PrrtPacket *packet)
{
    bool locked = false;

    check(pthread_mutex_lock(&block->lock) == EXIT_SUCCESS, "Lock failed.");
    locked = true;

    check(!block->isDecoded, "Block already coded.");
    bool found = insert_data_packet(block, packet);
    block->largestPayloadLength = (prrtPacketLength_t) MAX(block->largestPayloadLength,
                                                           packet->payloadLength);

    // Cleared before the attempt so a failed unlock is not retried below.
    locked = false;
    check(pthread_mutex_unlock(&block->lock) == EXIT_SUCCESS, "Unlock failed.");

    return found == false;

    error:
    PERROR("Insert data failed.");
    if(locked) {
        pthread_mutex_unlock(&block->lock);
    }
    return false;
}
Andreas Schmidt's avatar
Andreas Schmidt committed
151

152
/**
 * Inserts a redundancy packet into a block under the block lock.
 *
 * The packet is appended unless a redundancy packet with the same sequence
 * number is already queued. When it is appended, largestPayloadLength is
 * raised to the packet's payload length minus the redundancy header size
 * if that is larger.
 *
 * @param block  block to insert into
 * @param packet redundancy packet to insert
 * @return true if the packet was appended; false if it was a duplicate or
 *         locking/unlocking failed
 */
bool PrrtBlock_insert_redundancy_packet(PrrtBlock *block, const PrrtPacket *packet)
{
    bool duplicate = false;

    check(pthread_mutex_lock(&block->lock) == EXIT_SUCCESS, "Lock failed.");
    debug(DEBUG_BLOCK, "S: %d, Insert [R]: %d", block->senderBlock, packet->sequenceNumber);

    LIST_FOREACH(block->redundancyPackets, first, next, node) {
        const PrrtPacket *queued = node->value;
        duplicate = duplicate || (queued->sequenceNumber == packet->sequenceNumber);
    }

    if(!duplicate) {
        List_push(block->redundancyPackets, packet);
        block->largestPayloadLength = (prrtPacketLength_t) MAX(block->largestPayloadLength,
                                                                packet->payloadLength - PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
    }

    check(pthread_mutex_unlock(&block->lock) == EXIT_SUCCESS, "Unlock failed.");

    return !duplicate;

    error:
    PERROR("Insert data failed.");
    return false;
}
175

Andreas Schmidt's avatar
Andreas Schmidt committed
176
/**
 * Tells whether the block holds exactly k data packets.
 *
 * @param block_ptr block to inspect
 * @return true if the number of queued data packets equals codingParams->k;
 *         false otherwise or if locking/unlocking failed
 */
bool PrrtBlock_encode_ready(PrrtBlock *block_ptr)
{
    bool ready = false;

    check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");
    ready = (List_count(block_ptr->dataPackets) == block_ptr->codingParams->k);
    check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");

    return ready;

    error:
    PERROR("Encode ready failed.");
    return false;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
188
/**
 * Tells whether the block holds exactly k packets in total.
 *
 * @param block_ptr block to inspect
 * @return true if the data and redundancy packet counts add up to
 *         codingParams->k; false otherwise or if locking/unlocking failed
 */
bool PrrtBlock_decode_ready(PrrtBlock *block_ptr)
{
    bool ready = false;

    check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");
    ready = (List_count(block_ptr->dataPackets) + List_count(block_ptr->redundancyPackets)
             == block_ptr->codingParams->k);
    check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");

    return ready;

    error:
    PERROR("Decode ready failed.");
    return false;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
200
201
/**
 * Removes the first data packet from the block and hands it to the caller.
 *
 * @param block_ptr block to take the packet from
 * @return the result of shifting the data packet list, or NULL if
 *         locking/unlocking failed
 */
PrrtPacket *PrrtBlock_get_first_data(PrrtBlock *block_ptr)
{
    PrrtPacket *head = NULL;

    check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");
    head = List_shift(block_ptr->dataPackets);
    check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");

    return head;

    error:
    PERROR("Get first data failed.");
    return NULL;
}

212
/**
 * Generates the redundancy packets of a block from its data packets.
 *
 * Under the block lock, every queued data packet gets its index set to its
 * offset from the block's base sequence number and is serialised into one
 * source row. For each of the r redundancy slots a coded row is computed
 * with PrrtCoder_encode(), wrapped in a redundancy packet carrying *seqno,
 * and inserted into the block; *seqno is advanced (modulo SEQNO_SPACE) per
 * packet. Finally the block's isCoded flag is set.
 *
 * All temporary buffers are released and the lock is dropped on both the
 * success and the failure path. Failures are only reported through PERROR.
 *
 * @param block_ptr block to encode
 * @param seqno     in/out: sequence number for the next redundancy packet
 */
void PrrtBlock_encode(PrrtBlock *block_ptr, prrtSequenceNumber_t *seqno)
{
    gf **src = NULL;
    gf **fec = NULL;
    uint8_t k = 0;
    uint8_t r = 0;
    bool locked = false;
    int j = 0;

    check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");
    locked = true;

    k = block_ptr->codingParams->k;
    r = block_ptr->codingParams->r;
    prrtSequenceNumber_t baseSequenceNumber = block_ptr->baseSequenceNumber;
    prrtPacketLength_t length = block_ptr->largestPayloadLength;

    src = calloc(k, sizeof(gf *));
    check_mem(src);

    LIST_FOREACH(block_ptr->dataPackets, first, next, cur) {
        // src only has k rows; refuse to write beyond them.
        check(j < k, "More data packets than source rows.");

        src[j] = calloc(length, sizeof(gf));
        check_mem(src[j]);

        PrrtPacket *pkt = cur->value;
        pkt->index = (uint8_t) ((pkt->sequenceNumber - baseSequenceNumber) % SEQNO_SPACE);
        // Serialise the packet into its source row.
        PrrtPacket_encode_payload(src[j], pkt);
        j++;
    }

    fec = calloc(r, sizeof(gf *));
    check_mem(fec);

    for(j = 0; j < r; j++) {
        fec[j] = calloc(length, sizeof(gf));
        check_mem(fec[j]);

        // Redundancy rows use the indices following the k data rows.
        PrrtCoder_encode(block_ptr->coder, src, fec[j], j + k, length);

        PrrtPacket *red_packet_ptr = PrrtPacket_create_redundancy_packet(0, (void *) fec[j], length, *seqno,
                                                                         (uint8_t) (k + j), block_ptr->baseSequenceNumber,
                                                                         block_ptr->codingParams);
        check_mem(red_packet_ptr);
        *seqno = (prrtSequenceNumber_t) ((*seqno + 1) % SEQNO_SPACE);

        if(!PrrtBlock_insert_redundancy_packet(block_ptr, red_packet_ptr)) {
            // The block did not take the packet, so nobody else will free it.
            PrrtPacket_destroy(red_packet_ptr);
        }
    }

    block_ptr->isCoded = true;

    clear_list(fec, r);
    free(fec);
    fec = NULL;

    clear_list(src, k);
    free(src);
    src = NULL;

    // Cleared before the attempt so a failed unlock is not retried below.
    locked = false;
    check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");

    return;

    error:
        PERROR("Encoding failed%s.", "");
        if(fec != NULL) {
            clear_list(fec, r);
            free(fec);
        }
        if(src != NULL) {
            clear_list(src, k);
            free(src);
        }
        if(locked) {
            pthread_mutex_unlock(&block_ptr->lock);
        }
}

264
/**
 * Reconstructs missing data packets of a block from its redundancy packets.
 *
 * Under the block lock the block is marked as decoded, a k-row matrix is
 * filled with the received data packets (at their own index) and the
 * redundancy packets (in the remaining rows), and PrrtCoder_decode() is run
 * on it in place. Every row that was filled from a redundancy packet
 * (recorded index >= k) is then turned into a data packet with sequence
 * number base + row and inserted into the block; a packet the block
 * already has is destroyed again.
 *
 * All temporary buffers are released and the lock is dropped on both the
 * success and the failure path.
 *
 * @param block_ptr block to decode
 * @return true on success, false if locking, allocation or decoding failed
 */
bool PrrtBlock_decode(PrrtBlock *block_ptr)
{
    debug(DEBUG_BLOCK, "Decode Block: %d", block_ptr->baseSequenceNumber);
    prrtIndex_t i = 0;
    prrtIndex_t j = 0;
    gf **fec = NULL;
    int16_t *idx_p = NULL;
    prrtIndex_t k = 0;
    bool locked = false;

    check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");
    locked = true;

    block_ptr->isDecoded = true;
    k = block_ptr->codingParams->k;
    prrtSequenceNumber_t baseSequenceNumber = block_ptr->baseSequenceNumber;
    prrtPacketLength_t length = block_ptr->largestPayloadLength;

    fec = calloc(k, sizeof(gf *));
    check_mem(fec);
    for(i = 0; i < k; i++) {
        fec[i] = calloc(length, sizeof(gf));
        check_mem(fec[i]);
    }

    idx_p = calloc(k, sizeof *idx_p);
    check_mem(idx_p);
    // -1 marks a row that has not been filled yet.
    for(i = 0; i < k; i++) {
        idx_p[i] = -1;
    }

    // Store the data payload with header in the fec matrix
    gather_data_packets(block_ptr, fec, idx_p);
    // Store the data payload with header from the redundancy packets in the fec matrix
    gather_redundancy_packets(block_ptr, fec, idx_p);

    // Decode the fec matrix in place
    check(PrrtCoder_decode(block_ptr->coder, fec, idx_p, length) == EXIT_SUCCESS, "Could not decode current block.");

    for(j = 0; j < k; j++) {
        if(idx_p[j] >= k) {
            // The content of fec[j] is correctly deserialized and can be
            // cast into a payload struct
            PrrtPacketDataPayload* packet_and_payload = (PrrtPacketDataPayload *) fec[j];
            PrrtPacket *packet = PrrtPacket_reconstruct_data_packet(packet_and_payload, j,
                                                                    (prrtSequenceNumber_t) (baseSequenceNumber + j));
            check_mem(packet);

            debug(DEBUG_BLOCK, "Reconstructed [D]: %d", packet->sequenceNumber);
            if(insert_data_packet(block_ptr, packet)) {
                debug(DEBUG_BLOCK, "Tried to insert unnecessary packet.");
                PrrtPacket_destroy(packet);
            }
        }
    }

    clear_list(fec, k);
    free(fec);
    fec = NULL;
    free(idx_p);
    idx_p = NULL;

    // Cleared before the attempt so a failed unlock is not retried below.
    locked = false;
    check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");

    return true;

    error:
    // fec is still NULL when the lock or the table allocation failed.
    if(fec != NULL) {
        clear_list(fec, k);
        free(fec);
    }
    free(idx_p);
    if(locked) {
        pthread_mutex_unlock(&block_ptr->lock);
    }
    return false;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
331
332
/**
 * Returns a copy of the next not-yet-handed-out redundancy packet.
 *
 * The packet at position nextRedundancyPacket is looked up via
 * PrrtBlock_get_red_data(), copied with PrrtPacket_copy(), and the position
 * counter is advanced by one. The original stays in the block.
 *
 * PrrtBlock_get_red_data() takes the block lock again while this function
 * already holds it; PrrtBlock_create() sets the mutex up as recursive.
 *
 * @param block_ptr block to read from
 * @return the copy produced by PrrtPacket_copy(), or NULL if
 *         locking/unlocking failed
 */
PrrtPacket *PrrtBlock_get_first_red_data(PrrtBlock *block_ptr)
{
    PrrtPacket *copy = NULL;

    check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");

    // TODO: This copy should be avoided.
    PrrtPacket *stored = PrrtBlock_get_red_data(block_ptr, block_ptr->nextRedundancyPacket);
    copy = PrrtPacket_copy(stored);
    block_ptr->nextRedundancyPacket++;

    check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");

    return copy;

    error:
    PERROR("Get first data failed.");
    return NULL;
}

/**
 * Looks up a redundancy packet of the block by list position.
 *
 * The packet is not removed from the block.
 *
 * @param block_ptr block to read from
 * @param index     position in the redundancy packet list
 * @return whatever List_get() yields for that position, or NULL if
 *         locking/unlocking failed
 */
PrrtPacket *PrrtBlock_get_red_data(PrrtBlock *block_ptr, uint8_t index)
{
    PrrtPacket *entry = NULL;

    check(pthread_mutex_lock(&block_ptr->lock) == EXIT_SUCCESS, "Lock failed.");
    entry = (PrrtPacket *) List_get(block_ptr->redundancyPackets, index);
    check(pthread_mutex_unlock(&block_ptr->lock) == EXIT_SUCCESS, "Unlock failed.");

    return entry;

    error:
    PERROR("Get first data failed.");
    return NULL;
}