receiveDataQueue.c 3.78 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <pthread.h>
#include "../packet.h"
#include "../../util/common.h"
#include "../../util/dbg.h"
#include "receiveDataQueue.h"

PrrtReceiveDataQueue *PrrtReceiveDataQueue_create() {
    PrrtReceiveDataQueue *q = (PrrtReceiveDataQueue *) calloc(1, sizeof(PrrtReceiveDataQueue));
    q->tree = NULL;

    pthread_mutexattr_t attr;
    check(pthread_mutexattr_init(&attr) == EXIT_SUCCESS, "Mutex attr init failed.");
    check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == EXIT_SUCCESS, "Setting type failed.");
    check(pthread_mutex_init(&q->lock, &attr) == 0, "Mutex init failed.");

    check(pthread_cond_init(&q->wait_for_data, NULL) == EXIT_SUCCESS, "Condition init failed.");

    return q;
    error:
    PERROR("Out of memory%s.", "");
    return NULL;
}

24
25
PrrtPacket * PrrtReceiveDataQueue_get_packet_timedwait(PrrtReceiveDataQueue *q, prrtTimestamp_t start,
                                                       prrtTimestamp_t stop, const struct timespec *deadline) {
26
27
28
29
30
31
    PrrtPacket *packet = NULL;

    check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");

    packet = PrrtReceiveDataQueue_get_packet(q, start, stop);
    if (!packet) {
32
        pthread_cond_timedwait(&q->wait_for_data, &q->lock, deadline);
33
34
35
36
37
38
39
40
        packet = PrrtReceiveDataQueue_get_packet(q, start, stop);
    }

    check(pthread_mutex_unlock(&q->lock) == EXIT_SUCCESS, "Unlock failed.");

    return packet;

    error:
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
    PERROR("PrrtReceiveDataQueue_get_packet_timedwait failed%s.", "");
    return NULL;
}

PrrtPacket * PrrtReceiveDataQueue_get_packet_wait(PrrtReceiveDataQueue *q, prrtTimestamp_t start,
                                                       prrtTimestamp_t stop) {
    PrrtPacket *packet = NULL;

    check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");

    packet = PrrtReceiveDataQueue_get_packet(q, start, stop);
    if (!packet) {
        pthread_cond_wait(&q->wait_for_data, &q->lock);
        packet = PrrtReceiveDataQueue_get_packet(q, start, stop);
    }

    check(pthread_mutex_unlock(&q->lock) == EXIT_SUCCESS, "Unlock failed.");

    return packet;

    error:
    PERROR("PrrtReceiveDataQueue_get_packet_timedwait failed%s.", "");
63
64
65
66
67
68
69
70
71
    return NULL;
}

PrrtPacket *PrrtReceiveDataQueue_get_packet(PrrtReceiveDataQueue *q, prrtTimestamp_t start, prrtTimestamp_t stop) {
    PrrtPacket *packet = NULL;

    check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");

    if (start > stop) {
72
73
74
75
        packet = BPTree_get_first_in_range(q->tree, (BPTreeKey_t) start, TIMESTAMP_SPACE-1);
        if (packet == NULL) {
            packet = BPTree_get_first_in_range(q->tree, 0, (BPTreeKey_t) stop);
        }
76
    } else {
77
        packet = BPTree_get_first_in_range(q->tree, (BPTreeKey_t) start, (BPTreeKey_t) stop);
78
79
    }

80
    if(packet != NULL) {
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
        q->tree = BPTree_delete(q->tree, PrrtDataPacket_packet_timeout(packet));
    }

    check(pthread_mutex_unlock(&q->lock) == EXIT_SUCCESS, "Unlock failed.");

    return packet;

    error:
    PERROR("Get failed%s.", "");
    return NULL;
}

bool PrrtReceiveDataQueue_insert(PrrtReceiveDataQueue *q, PrrtPacket* packet) {
    check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");
    q->tree = BPTree_insert(q->tree, PrrtDataPacket_packet_timeout(packet), packet);
    pthread_cond_broadcast(&q->wait_for_data);
    check(pthread_mutex_unlock(&q->lock) == EXIT_SUCCESS, "Unlock failed.");
    return true;

    error:
    PERROR("Insert failed%s.", "");
    return false;
}

bool PrrtReceiveDataQueue_destroy(PrrtReceiveDataQueue *q) {
    pthread_mutex_destroy(&q->lock);
    pthread_cond_destroy(&q->wait_for_data);
    free(q);
    return true;
}

void PrrtReceiveDataQueue_wake(PrrtReceiveDataQueue *q) {
    pthread_mutex_lock(&q->lock);
    pthread_cond_broadcast(&q->wait_for_data);
    pthread_mutex_unlock(&q->lock);
}