Commit 4ac6f179 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Refactor PrrtPacketDeliveryStore.

parent bdb8f771
Loading
Loading
Loading
Loading
+74 −72
Original line number Diff line number Diff line
@@ -7,58 +7,88 @@
#include "../../util/time.h"
#include "packetDeliveryStore.h"

int packet_wait(PrrtPacketDeliveryStore *q, prrtTimestamp_t start, prrtTimestamp_t stop, const struct timespec *deadline) {
    q->last_start = start;
    q->last_stop = stop;
    q->in_ordered_wait = true;
    int result = pthread_cond_timedwait(&q->wait_for_data, &q->lock, deadline);
int packet_wait(PrrtPacketDeliveryStore *store, prrtTimestamp_t start, prrtTimestamp_t stop, const struct timespec *deadline) {
    store->last_start = start;
    store->last_stop = stop;
    store->in_ordered_wait = true;
    int result = pthread_cond_timedwait(&store->wait_for_data, &store->lock, deadline);
    debug(DEBUG_RECEIVER, "After wait: %d", result);
    q->in_ordered_wait = false;
    store->in_ordered_wait = false;
    return result;
}

PrrtPacketDeliveryStore *PrrtPacketDeliveryStore_create() {
    PrrtPacketDeliveryStore *q = (PrrtPacketDeliveryStore *) calloc(1, sizeof(PrrtPacketDeliveryStore));
    q->tree = NULL;
    q->closing = false;
    q->last_start = 0;
    q->last_stop = 0;
    q->in_ordered_wait = false;
    PrrtPacketDeliveryStore *store = (PrrtPacketDeliveryStore *) calloc(1, sizeof(PrrtPacketDeliveryStore));
    store->tree = NULL;
    store->closing = false;
    store->last_start = 0;
    store->last_stop = 0;
    store->in_ordered_wait = false;

    pthread_mutexattr_t attr;
    check(pthread_mutexattr_init(&attr) == EXIT_SUCCESS, "Mutex attr init failed.");
    check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == EXIT_SUCCESS, "Setting type failed.");
    check(pthread_mutex_init(&q->lock, &attr) == 0, "Mutex init failed.");
    check(pthread_mutex_init(&store->lock, &attr) == 0, "Mutex init failed.");

    check(pthread_cond_init(&q->wait_for_data, NULL) == EXIT_SUCCESS, "Condition init failed.");
    check(pthread_cond_init(&store->wait_for_data, NULL) == EXIT_SUCCESS, "Condition init failed.");

    return q;
    return store;
    error:
    PERROR("Out of memory%s.", "");
    return NULL;
}

PrrtPacket * PrrtPacketDeliveryStore_get_packet_timedwait(PrrtPacketDeliveryStore *q, prrtTimestamp_t start,
void PrrtPacketDeliveryStore_interrupt(PrrtPacketDeliveryStore *store) {
    check(pthread_mutex_lock(&store->lock) == EXIT_SUCCESS, "Lock failed.");
    atomic_store_explicit(&store->closing, true, memory_order_release);
    pthread_cond_broadcast(&store->wait_for_data);
    check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed.");
    return;
    error:
    PERROR("PrrtPacketDeliveryStore_interrupt() failed.");
}

bool PrrtPacketDeliveryStore_destroy(PrrtPacketDeliveryStore *store) {
    List *packetList = List_create();
    BPTree_get_range(store->tree, packetList, 0, (prrtSequenceNumber_t) (SEQNO_SPACE - 1));
    while (List_count(packetList) > 0) {
        PrrtPacket *block = List_shift(packetList);
        store->tree = BPTree_delete(store->tree, block->sequenceNumber);
        PrrtPacket_destroy(block);
    }

    List_destroy(packetList);

    if(store->tree != NULL) {
        store->tree = BPTree_destroy(store->tree);
    }

    pthread_mutex_destroy(&store->lock);
    pthread_cond_destroy(&store->wait_for_data);
    free(store);
    return true;
}

PrrtPacket * PrrtPacketDeliveryStore_get_packet_timedwait(PrrtPacketDeliveryStore *store, prrtTimestamp_t start,
                                                          prrtTimestamp_t stop, const struct timespec *deadline) {
    PrrtPacket *packet = NULL;

    check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");

    check(pthread_mutex_lock(&store->lock) == EXIT_SUCCESS, "Lock failed.");
    do {
        packet = PrrtPacketDeliveryStore_get_packet(q, start, stop);
        packet = PrrtPacketDeliveryStore_get_packet(store, start, stop);
        if (packet == NULL) {
            int res = packet_wait(q, start, stop, deadline);
            packet = PrrtPacketDeliveryStore_get_packet(q, start, stop);
            int res = packet_wait(store, start, stop, deadline);
            packet = PrrtPacketDeliveryStore_get_packet(store, start, stop);
            if (packet == NULL && res == ETIMEDOUT) {
                errno = ETIMEDOUT;
                break;
            }
        }
        if (atomic_load_explicit(&q->closing, memory_order_acquire)) {
        if (atomic_load_explicit(&store->closing, memory_order_acquire)) {
            break;
        }
    } while(!packet);
    check(pthread_mutex_unlock(&q->lock) == EXIT_SUCCESS, "Unlock failed.");
    check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed.");

    return packet;

@@ -67,20 +97,20 @@ PrrtPacket * PrrtPacketDeliveryStore_get_packet_timedwait(PrrtPacketDeliveryStor
    return NULL;
}

PrrtPacket *PrrtPacketDeliveryStore_get_packet_wait(PrrtPacketDeliveryStore *q) {
PrrtPacket *PrrtPacketDeliveryStore_get_packet_wait(PrrtPacketDeliveryStore *store) {
    PrrtPacket *packet = NULL;
    prrtTimestamp_t start = 0;
    prrtTimestamp_t stop = MAX_TIMESTAMP;

    check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");
    check(pthread_mutex_lock(&store->lock) == EXIT_SUCCESS, "Lock failed.");

    packet = PrrtPacketDeliveryStore_get_packet(q, start, stop);
    if (!packet && !atomic_load_explicit(&q->closing, memory_order_acquire)) {
        pthread_cond_wait(&q->wait_for_data, &q->lock);
        packet = PrrtPacketDeliveryStore_get_packet(q, start, stop);
    packet = PrrtPacketDeliveryStore_get_packet(store, start, stop);
    if (!packet && !atomic_load_explicit(&store->closing, memory_order_acquire)) {
        pthread_cond_wait(&store->wait_for_data, &store->lock);
        packet = PrrtPacketDeliveryStore_get_packet(store, start, stop);
    }

    check(pthread_mutex_unlock(&q->lock) == EXIT_SUCCESS, "Unlock failed.");
    check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed.");

    return packet;

@@ -89,25 +119,25 @@ PrrtPacket *PrrtPacketDeliveryStore_get_packet_wait(PrrtPacketDeliveryStore *q)
    return NULL;
}

PrrtPacket *PrrtPacketDeliveryStore_get_packet(PrrtPacketDeliveryStore *q, prrtTimestamp_t start, prrtTimestamp_t stop) {
PrrtPacket *PrrtPacketDeliveryStore_get_packet(PrrtPacketDeliveryStore *store, prrtTimestamp_t start, prrtTimestamp_t stop) {
    PrrtPacket *packet = NULL;

    check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");
    check(pthread_mutex_lock(&store->lock) == EXIT_SUCCESS, "Lock failed.");

    if (start > stop) {
        packet = BPTree_get_first_in_range(q->tree, (BPTreeKey_t) start, TIMESTAMP_SPACE-1);
        packet = BPTree_get_first_in_range(store->tree, (BPTreeKey_t) start, TIMESTAMP_SPACE-1);
        if (packet == NULL) {
            packet = BPTree_get_first_in_range(q->tree, 0, (BPTreeKey_t) stop);
            packet = BPTree_get_first_in_range(store->tree, 0, (BPTreeKey_t) stop);
        }
    } else {
        packet = BPTree_get_first_in_range(q->tree, (BPTreeKey_t) start, (BPTreeKey_t) stop);
        packet = BPTree_get_first_in_range(store->tree, (BPTreeKey_t) start, (BPTreeKey_t) stop);
    }

    if(packet != NULL) {
        q->tree = BPTree_delete(q->tree, PrrtDataPacket_packet_timeout(packet));
        store->tree = BPTree_delete(store->tree, PrrtDataPacket_packet_timeout(packet));
    }

    check(pthread_mutex_unlock(&q->lock) == EXIT_SUCCESS, "Unlock failed.");
    check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed.");

    return packet;

@@ -116,48 +146,20 @@ PrrtPacket *PrrtPacketDeliveryStore_get_packet(PrrtPacketDeliveryStore *q, prrtT
    return NULL;
}

bool PrrtPacketDeliveryStore_insert(PrrtPacketDeliveryStore *q, PrrtPacket *packet) {
    check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");
bool PrrtPacketDeliveryStore_insert(PrrtPacketDeliveryStore *store, PrrtPacket *packet) {
    check(pthread_mutex_lock(&store->lock) == EXIT_SUCCESS, "Lock failed.");
    prrtTimestamp_t timeout = PrrtDataPacket_packet_timeout(packet);
    q->tree = BPTree_insert(q->tree, timeout, packet);
    store->tree = BPTree_insert(store->tree, timeout, packet);

    bool packet_awaited = (PrrtTimestamp_cmp(q->last_start, timeout) <= 0) && (PrrtTimestamp_cmp(timeout, q->last_stop) <= 0);
    if (!q->in_ordered_wait || packet_awaited) {
        pthread_cond_broadcast(&q->wait_for_data);
    bool packet_awaited = (PrrtTimestamp_cmp(store->last_start, timeout) <= 0) && (PrrtTimestamp_cmp(timeout, store->last_stop) <= 0);
    if (!store->in_ordered_wait || packet_awaited) {
        pthread_cond_broadcast(&store->wait_for_data);
    }

    check(pthread_mutex_unlock(&q->lock) == EXIT_SUCCESS, "Unlock failed.");
    check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed.");
    return true;

    error:
    PERROR("Insert failed%s.", "");
    return false;
}

bool PrrtPacketDeliveryStore_destroy(PrrtPacketDeliveryStore *q) {
    List *packetList = List_create();
    BPTree_get_range(q->tree, packetList, 0, (prrtSequenceNumber_t) (SEQNO_SPACE - 1));
    while (List_count(packetList) > 0) {
        PrrtPacket *block = List_shift(packetList);
        q->tree = BPTree_delete(q->tree, block->sequenceNumber);
        PrrtPacket_destroy(block);
    }

    List_destroy(packetList);

    if(q->tree != NULL) {
        q->tree = BPTree_destroy(q->tree);
    }

    pthread_mutex_destroy(&q->lock);
    pthread_cond_destroy(&q->wait_for_data);
    free(q);
    return true;
}

void PrrtPacketDeliveryStore_interrupt(PrrtPacketDeliveryStore *q) {
    pthread_mutex_lock(&q->lock);
    atomic_store_explicit(&q->closing, true, memory_order_release);
    pthread_cond_broadcast(&q->wait_for_data);
    pthread_mutex_unlock(&q->lock);
}
 No newline at end of file
+6 −6
Original line number Diff line number Diff line
@@ -16,17 +16,17 @@ typedef struct prrtReceiveDataQueue {

PrrtPacketDeliveryStore* PrrtPacketDeliveryStore_create(void);

PrrtPacket *PrrtPacketDeliveryStore_get_packet(PrrtPacketDeliveryStore *q, prrtTimestamp_t start, prrtTimestamp_t stop);
PrrtPacket *PrrtPacketDeliveryStore_get_packet(PrrtPacketDeliveryStore *store, prrtTimestamp_t start, prrtTimestamp_t stop);

PrrtPacket *PrrtPacketDeliveryStore_get_packet_wait(PrrtPacketDeliveryStore *q);
PrrtPacket *PrrtPacketDeliveryStore_get_packet_wait(PrrtPacketDeliveryStore *store);

PrrtPacket *PrrtPacketDeliveryStore_get_packet_timedwait(PrrtPacketDeliveryStore *queue, prrtTimestamp_t start,
PrrtPacket *PrrtPacketDeliveryStore_get_packet_timedwait(PrrtPacketDeliveryStore *store, prrtTimestamp_t start,
                                                         prrtTimestamp_t stop, const struct timespec *deadline);

bool PrrtPacketDeliveryStore_insert(PrrtPacketDeliveryStore *q, PrrtPacket *packet);
bool PrrtPacketDeliveryStore_insert(PrrtPacketDeliveryStore *store, PrrtPacket *packet);

void PrrtPacketDeliveryStore_interrupt(PrrtPacketDeliveryStore *q);
void PrrtPacketDeliveryStore_interrupt(PrrtPacketDeliveryStore *store);

bool PrrtPacketDeliveryStore_destroy(PrrtPacketDeliveryStore *q);
bool PrrtPacketDeliveryStore_destroy(PrrtPacketDeliveryStore *store);

#endif //PRRT_RECEIVEDATAQUEUE_H