packetDeliveryStore.c 6.74 KB
Newer Older
1
#include <pthread.h>
2
#include "../clock.h"
3
#include "../types/packet.h"
4
#include "../../defines.h"
5
6
#include "../../util/common.h"
#include "../../util/dbg.h"
7
#include "../../util/time.h"
8
#include "packetDeliveryStore.h"
9

10
11
12
13
14
/**
 * Block on the store's condition variable until it is signalled or the
 * absolute deadline passes.
 *
 * While waiting, the requested [start, stop] window is published in the store
 * and in_ordered_wait is raised; PrrtPacketDeliveryStore_insert() reads these
 * fields to decide whether a freshly inserted packet should wake the waiter.
 *
 * The caller must already hold store->lock, because it is handed to
 * pthread_cond_timedwait().
 *
 * @param store    delivery store to wait on
 * @param start    lower bound of the awaited timestamp window
 * @param stop     upper bound of the awaited timestamp window
 * @param deadline absolute time passed through to pthread_cond_timedwait()
 * @return the value returned by pthread_cond_timedwait()
 */
int packet_wait(PrrtPacketDeliveryStore *store, prrtTimestamp_t start, prrtTimestamp_t stop, const struct timespec *deadline) {
    int wait_status;

    /* Announce what this waiter is interested in before going to sleep. */
    store->in_ordered_wait = true;
    store->last_stop = stop;
    store->last_start = start;

    wait_status = pthread_cond_timedwait(&store->wait_for_data, &store->lock, deadline);
    debug(DEBUG_RECEIVER, "After wait: %d", wait_status);

    /* The window is only meaningful while a waiter is actually blocked. */
    store->in_ordered_wait = false;
    return wait_status;
}

20
PrrtPacketDeliveryStore *PrrtPacketDeliveryStore_create() {
21
22
23
24
25
26
    PrrtPacketDeliveryStore *store = (PrrtPacketDeliveryStore *) calloc(1, sizeof(PrrtPacketDeliveryStore));
    store->tree = NULL;
    store->closing = false;
    store->last_start = 0;
    store->last_stop = 0;
    store->in_ordered_wait = false;
27
28
29
30

    pthread_mutexattr_t attr;
    check(pthread_mutexattr_init(&attr) == EXIT_SUCCESS, "Mutex attr init failed.");
    check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == EXIT_SUCCESS, "Setting type failed.");
31
    check(pthread_mutex_init(&store->lock, &attr) == 0, "Mutex init failed.");
32

33
    check(pthread_cond_init(&store->wait_for_data, NULL) == EXIT_SUCCESS, "Condition init failed.");
34

35
    return store;
36
37
38
39
40
    error:
    PERROR("Out of memory%s.", "");
    return NULL;
}

41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/**
 * Mark the store as closing and wake every thread blocked on it.
 *
 * The closing flag is set and the condition variable is broadcast while
 * holding store->lock, so waiters observe the flag once they reacquire the
 * lock.
 *
 * @param store delivery store to interrupt
 */
void PrrtPacketDeliveryStore_interrupt(PrrtPacketDeliveryStore *store) {
    const int lock_rc = pthread_mutex_lock(&store->lock);
    check(lock_rc == 0, "Lock failed.");

    atomic_store_explicit(&store->closing, true, memory_order_release);
    pthread_cond_broadcast(&store->wait_for_data);

    const int unlock_rc = pthread_mutex_unlock(&store->lock);
    check(unlock_rc == 0, "Unlock failed.");
    return;

    error:
    PERROR("PrrtPacketDeliveryStore_interrupt() failed.");
}

/**
 * Destroy every packet still held by the store, then release the tree, the
 * synchronisation primitives and the store itself.
 *
 * Packets are stored under their timeout timestamp (see
 * PrrtPacketDeliveryStore_insert()), so the whole timestamp space is
 * enumerated and each entry is removed under that same key. The lock is not
 * taken here; the caller must make sure no other thread still uses the store.
 *
 * @param store delivery store to destroy; must not be used afterwards
 * @return always true
 */
bool PrrtPacketDeliveryStore_destroy(PrrtPacketDeliveryStore *store) {
    List *packetList = List_create();

    /* Keys are timeouts, not sequence numbers: cover the full timestamp range. */
    BPTree_get_range(store->tree, packetList, 0, TIMESTAMP_SPACE - 1);
    while (List_count(packetList) > 0) {
        PrrtPacket *packet = List_shift(packetList);
        store->tree = BPTree_delete(store->tree, PrrtDataPacket_packet_timeout(packet));
        PrrtPacket_destroy(packet);
    }

    List_destroy(packetList);

    if (store->tree != NULL) {
        store->tree = BPTree_destroy(store->tree);
    }

    pthread_mutex_destroy(&store->lock);
    pthread_cond_destroy(&store->wait_for_data);
    free(store);
    return true;
}

/**
 * Fetch the first packet whose timeout lies in [start, stop], waiting for one
 * to arrive until the absolute deadline.
 *
 * A packet that is already available is returned even if the store is
 * closing. If none is available and the store is closing, the function
 * returns without blocking, so a call made after
 * PrrtPacketDeliveryStore_interrupt() does not sleep until the deadline.
 *
 * @param store    delivery store to read from
 * @param start    lower bound of the accepted timeout window
 * @param stop     upper bound of the accepted timeout window
 * @param deadline absolute time at which waiting gives up
 * @return the packet (already removed from the store), or NULL when the store
 *         is closing, the wait failed, or locking failed. When the wait ends
 *         with an error and no packet is available, errno is set to that
 *         error code (ETIMEDOUT when the deadline passed).
 */
PrrtPacket * PrrtPacketDeliveryStore_get_packet_timedwait(PrrtPacketDeliveryStore *store, prrtTimestamp_t start,
                                                          prrtTimestamp_t stop, const struct timespec *deadline) {
    PrrtPacket *packet = NULL;

    check(pthread_mutex_lock(&store->lock) == 0, "Lock failed.");

    while ((packet = PrrtPacketDeliveryStore_get_packet(store, start, stop)) == NULL) {
        /* Never go to sleep once shutdown was requested: the broadcast may
         * already have happened and nobody would wake us before the deadline. */
        if (atomic_load_explicit(&store->closing, memory_order_acquire)) {
            break;
        }

        int res = packet_wait(store, start, stop, deadline);
        if (res != 0) {
            /* Timeout or wait failure: take one last look, then give up
             * instead of spinning on a wait that keeps failing. */
            packet = PrrtPacketDeliveryStore_get_packet(store, start, stop);
            if (packet == NULL) {
                errno = res;
            }
            break;
        }
    }

    check(pthread_mutex_unlock(&store->lock) == 0, "Unlock failed.");

    return packet;

    error:
    PERROR("PrrtPacketDeliveryStore_get_packet_timedwait failed%s.", "");
    return NULL;
}

100
/**
 * Fetch the first packet in the store, blocking without a deadline until one
 * is available or the store is closing.
 *
 * The wait is wrapped in a predicate loop: condition variables may wake up
 * spuriously, and another consumer may take the packet first, so the store is
 * re-checked after every wakeup instead of returning NULL early.
 *
 * @param store delivery store to read from
 * @return the packet (already removed from the store), or NULL when the store
 *         is closing, the wait failed, or locking failed
 */
PrrtPacket *PrrtPacketDeliveryStore_get_packet_wait(PrrtPacketDeliveryStore *store) {
    PrrtPacket *packet = NULL;
    const prrtTimestamp_t start = 0;
    const prrtTimestamp_t stop = MAX_TIMESTAMP;

    check(pthread_mutex_lock(&store->lock) == 0, "Lock failed.");

    packet = PrrtPacketDeliveryStore_get_packet(store, start, stop);
    while (packet == NULL && !atomic_load_explicit(&store->closing, memory_order_acquire)) {
        int rc = pthread_cond_wait(&store->wait_for_data, &store->lock);
        packet = PrrtPacketDeliveryStore_get_packet(store, start, stop);
        if (rc != 0) {
            /* A failing wait would turn this loop into a busy spin. */
            break;
        }
    }

    check(pthread_mutex_unlock(&store->lock) == 0, "Unlock failed.");

    return packet;

    error:
    PERROR("PrrtPacketDeliveryStore_get_packet_wait failed%s.", "");
    return NULL;
}

122
/**
 * Remove and return the first packet whose key lies in [start, stop], without
 * blocking.
 *
 * When start > stop the window is treated as wrapping around the end of the
 * timestamp space: [start, TIMESTAMP_SPACE - 1] is searched first, then
 * [0, stop].
 *
 * @param store delivery store to read from
 * @param start lower bound of the window
 * @param stop  upper bound of the window
 * @return the packet, already deleted from the tree, or NULL if none matches
 *         or locking/unlocking fails
 */
PrrtPacket *PrrtPacketDeliveryStore_get_packet(PrrtPacketDeliveryStore *store, prrtTimestamp_t start, prrtTimestamp_t stop) {
    PrrtPacket *found = NULL;

    check(pthread_mutex_lock(&store->lock) == 0, "Lock failed.");

    if (start <= stop) {
        /* Plain, non-wrapping window. */
        found = BPTree_get_first_in_range(store->tree, (BPTreeKey_t) start, (BPTreeKey_t) stop);
    } else {
        /* Wrapping window: upper part first, then the part after the wrap. */
        found = BPTree_get_first_in_range(store->tree, (BPTreeKey_t) start, TIMESTAMP_SPACE - 1);
        if (found == NULL) {
            found = BPTree_get_first_in_range(store->tree, 0, (BPTreeKey_t) stop);
        }
    }

    if (found != NULL) {
        /* Packets are keyed by their timeout; drop the entry from the tree. */
        store->tree = BPTree_delete(store->tree, PrrtDataPacket_packet_timeout(found));
    }

    check(pthread_mutex_unlock(&store->lock) == 0, "Unlock failed.");

    return found;

    error:
    PERROR("Get failed%s.", "");
    return NULL;
}

149
150
/**
 * Insert a packet into the store, keyed by its timeout, and wake waiters if
 * the packet may be of interest to them.
 *
 * Waiters are woken when no ordered wait is in progress, or when the packet's
 * timeout falls inside the window [last_start, last_stop] recorded by the
 * current ordered waiter.
 *
 * @param store  delivery store to insert into
 * @param packet packet to store
 * @return true on success, false if locking or unlocking fails
 */
bool PrrtPacketDeliveryStore_insert(PrrtPacketDeliveryStore *store, PrrtPacket *packet) {
    check(pthread_mutex_lock(&store->lock) == 0, "Lock failed.");

    const prrtTimestamp_t key = PrrtDataPacket_packet_timeout(packet);
    store->tree = BPTree_insert(store->tree, key, packet);

    /* Is the new packet inside the window the ordered waiter asked for? */
    bool in_window = PrrtTimestamp_cmp(store->last_start, key) <= 0;
    if (in_window) {
        in_window = PrrtTimestamp_cmp(key, store->last_stop) <= 0;
    }

    if (in_window || !store->in_ordered_wait) {
        pthread_cond_broadcast(&store->wait_for_data);
    }

    check(pthread_mutex_unlock(&store->lock) == 0, "Unlock failed.");
    return true;

    error:
    PERROR("Insert failed%s.", "");
    return false;
}
166
167

/**
 * Remove and destroy every packet whose timeout lies in the half of the
 * timestamp space that ends at `now`.
 *
 * For now >= TIMESTAMP_SPACE/2 that is [now - TIMESTAMP_SPACE/2, now].
 * Otherwise the half-space wraps, so [0, now] and
 * [now + TIMESTAMP_SPACE/2, TIMESTAMP_SPACE - 1] are both collected.
 *
 * @param store delivery store to clean
 * @param now   current timestamp; packets at or before it are dropped
 */
void PrrtPacketDeliveryStore_cleanup(PrrtPacketDeliveryStore *store, prrtTimestamp_t now) {
    check(pthread_mutex_lock(&store->lock) == 0, "Lock failed.");

    List *expired = List_create();
    if (now < TIMESTAMP_SPACE/2) {
        /* The half-space before `now` wraps around zero. */
        BPTree_get_range(store->tree, expired, 0, now);
        BPTree_get_range(store->tree, expired, now + TIMESTAMP_SPACE/2, TIMESTAMP_SPACE - 1);
    } else {
        BPTree_get_range(store->tree, expired, now - TIMESTAMP_SPACE/2, now);
    }

    /* Drain the list: unlink each packet from the tree, then free it. */
    while (List_count(expired) > 0) {
        PrrtPacket *stale = List_shift(expired);
        store->tree = BPTree_delete(store->tree, PrrtDataPacket_packet_timeout(stale));
        PrrtPacket_destroy(stale);
    }
    List_destroy(expired);

    check(pthread_mutex_unlock(&store->lock) == 0, "Unlock failed.");
    return;

    error:
    PERROR("PrrtPacketDeliveryStore_cleanup failed%s.", "");
}