Commit 70d02f04 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Rename receiveDataQueue into packetDeliveryStore.

parent ce45efc6
Pipeline #1709 failed with stages
in 1 minute and 16 seconds
...@@ -106,7 +106,7 @@ cdef extern from "proto/socket.h": ...@@ -106,7 +106,7 @@ cdef extern from "proto/socket.h":
pthread_t receiveDataThread pthread_t receiveDataThread
pthread_mutex_t inQueueFilledMutex pthread_mutex_t inQueueFilledMutex
pthread_cond_t inQueueFilledMutexCv pthread_cond_t inQueueFilledMutexCv
PrrtReceiveDataQueue *receiveDataQueue PrrtPacketDeliveryStore *packetDeliveryStore
BPTreeNode* dataStore BPTreeNode* dataStore
...@@ -147,8 +147,8 @@ cdef extern from "proto/socket.h": ...@@ -147,8 +147,8 @@ cdef extern from "proto/socket.h":
cdef extern from "proto/stores/receiveDataQueue.h": cdef extern from "proto/stores/packetDeliveryStore.h":
ctypedef struct PrrtReceiveDataQueue: ctypedef struct PrrtPacketDeliveryStore:
pass pass
cdef extern from "util/bptree.h": cdef extern from "util/bptree.h":
......
...@@ -18,6 +18,6 @@ add_library(PRRT ../defines.h ...@@ -18,6 +18,6 @@ add_library(PRRT ../defines.h
stores/packetTimeoutTable.c stores/packetTimeoutTable.h stores/packetTimeoutTable.c stores/packetTimeoutTable.h
stores/dataPacketStore.c stores/dataPacketStore.h stores/dataPacketStore.c stores/dataPacketStore.h
types/packetTimeout.c types/packetTimeout.h types/packetTimeout.c types/packetTimeout.h
stores/receiveDataQueue.c stores/receiveDataQueue.h) stores/packetDeliveryStore.c stores/packetDeliveryStore.h)
target_link_libraries(PRRT rt) target_link_libraries(PRRT rt)
...@@ -35,7 +35,7 @@ static void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block) { ...@@ -35,7 +35,7 @@ static void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block) {
while (List_count(block->dataPackets) > 0) { while (List_count(block->dataPackets) > 0) {
PrrtPacket *pkt = List_shift(block->dataPackets); PrrtPacket *pkt = List_shift(block->dataPackets);
if (PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable, pkt->sequenceNumber)) { if (PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable, pkt->sequenceNumber)) {
PrrtReceiveDataQueue_insert(sock_ptr->receiveDataQueue, pkt); PrrtPacketDeliveryStore_insert(sock_ptr->packetDeliveryStore, pkt);
} else { } else {
PrrtPacket_destroy(pkt); PrrtPacket_destroy(pkt);
} }
...@@ -141,7 +141,7 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet, struct ...@@ -141,7 +141,7 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet, struct
XlapTimeStampClock(sock_ptr, ts_data_packet, seqno, PrrtReturnPackage); XlapTimeStampClock(sock_ptr, ts_data_packet, seqno, PrrtReturnPackage);
XlapTimeStampCycle(sock_ptr, ts_data_packet, seqno, PrrtReturnPackage); XlapTimeStampCycle(sock_ptr, ts_data_packet, seqno, PrrtReturnPackage);
PrrtReceiveDataQueue_insert(sock_ptr->receiveDataQueue, packet); PrrtPacketDeliveryStore_insert(sock_ptr->packetDeliveryStore, packet);
} }
return; return;
......
...@@ -103,7 +103,7 @@ PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay ...@@ -103,7 +103,7 @@ PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay
s->deliveredPacketTable = PrrtDeliveredPacketTable_create(); s->deliveredPacketTable = PrrtDeliveredPacketTable_create();
s->repairBlockStore = PrrtRepairBlockStore_create(); s->repairBlockStore = PrrtRepairBlockStore_create();
s->receiveDataQueue = PrrtReceiveDataQueue_create(); s->packetDeliveryStore = PrrtPacketDeliveryStore_create();
} }
return s; return s;
...@@ -273,7 +273,7 @@ int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr) { ...@@ -273,7 +273,7 @@ int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr) {
check(s->isSender == false, "Cannot receive on sender socket.") check(s->isSender == false, "Cannot receive on sender socket.")
PrrtPacket *packet; PrrtPacket *packet;
packet = PrrtReceiveDataQueue_get_packet(s->receiveDataQueue, 0, MAX_TIMESTAMP); packet = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
return deliver_packet(s, buf_ptr, packet); return deliver_packet(s, buf_ptr, packet);
error: error:
...@@ -286,7 +286,7 @@ int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr) { ...@@ -286,7 +286,7 @@ int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr) {
PrrtPacket *packet; PrrtPacket *packet;
do { do {
packet = PrrtReceiveDataQueue_get_packet_wait(s->receiveDataQueue, 0, MAX_TIMESTAMP); packet = PrrtPacketDeliveryStore_get_packet_wait(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
if (PrrtSocket_closing(s)) { if (PrrtSocket_closing(s)) {
return -1; return -1;
} }
...@@ -302,7 +302,7 @@ int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr) { ...@@ -302,7 +302,7 @@ int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr) {
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct timespec* deadline) { int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct timespec* deadline) {
check(s->isSender == false, "Cannot receive on sender socket.") check(s->isSender == false, "Cannot receive on sender socket.")
PrrtPacket *packet = PrrtReceiveDataQueue_get_packet_timedwait(s->receiveDataQueue, 0, MAX_TIMESTAMP, deadline); PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, 0, MAX_TIMESTAMP, deadline);
return deliver_packet(s, buf_ptr, packet); return deliver_packet(s, buf_ptr, packet);
error: error:
...@@ -320,7 +320,8 @@ int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t ...@@ -320,7 +320,8 @@ int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t
check(s->isSender == false, "Cannot receive on sender socket.") check(s->isSender == false, "Cannot receive on sender socket.")
prrtTimestamp_t now = PrrtClock_get_current_time_us(); prrtTimestamp_t now = PrrtClock_get_current_time_us();
PrrtPacket *packet = PrrtReceiveDataQueue_get_packet(s->receiveDataQueue, now - time_window_us, now + time_window_us); PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, now - time_window_us,
now + time_window_us);
return deliver_packet(s, buf_ptr, packet); return deliver_packet(s, buf_ptr, packet);
error: error:
PERROR("There was a failure while receiving from socket.%s", ""); PERROR("There was a failure while receiving from socket.%s", "");
...@@ -335,7 +336,8 @@ int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, prrtTimede ...@@ -335,7 +336,8 @@ int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, prrtTimede
prrtTimestamp_t now = PrrtClock_get_current_time_us(); prrtTimestamp_t now = PrrtClock_get_current_time_us();
struct timespec deadline = abstime_from_now(time_window_us); struct timespec deadline = abstime_from_now(time_window_us);
packet = PrrtReceiveDataQueue_get_packet_timedwait(s->receiveDataQueue, now - time_window_us, now + time_window_us, &deadline); packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now - time_window_us,
now + time_window_us, &deadline);
if (PrrtSocket_closing(s)) { if (PrrtSocket_closing(s)) {
return -1; return -1;
} }
...@@ -353,7 +355,8 @@ int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtT ...@@ -353,7 +355,8 @@ int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtT
prrtTimestamp_t now = PrrtClock_get_current_time_us(); prrtTimestamp_t now = PrrtClock_get_current_time_us();
PrrtPacket *packet = PrrtReceiveDataQueue_get_packet_timedwait(s->receiveDataQueue, now - time_window_us, now + time_window_us, deadline); PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now - time_window_us,
now + time_window_us, deadline);
return deliver_packet(s, buf_ptr, packet); return deliver_packet(s, buf_ptr, packet);
error: error:
PERROR("There was a failure while receiving from socket.%s", ""); PERROR("There was a failure while receiving from socket.%s", "");
...@@ -364,8 +367,8 @@ int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtT ...@@ -364,8 +367,8 @@ int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtT
int PrrtSocket_interrupt(PrrtSocket *s) { int PrrtSocket_interrupt(PrrtSocket *s) {
atomic_store_explicit(&s->closing, true, memory_order_release); atomic_store_explicit(&s->closing, true, memory_order_release);
if (s->receiveDataQueue) if (s->packetDeliveryStore)
PrrtReceiveDataQueue_wake(s->receiveDataQueue); PrrtPacketDeliveryStore_wake(s->packetDeliveryStore);
void **res = NULL; void **res = NULL;
...@@ -420,9 +423,9 @@ int PrrtSocket_close(PrrtSocket *s) { ...@@ -420,9 +423,9 @@ int PrrtSocket_close(PrrtSocket *s) {
s->sendDataQueue = NULL; s->sendDataQueue = NULL;
} }
if (s->receiveDataQueue != NULL) { if (s->packetDeliveryStore != NULL) {
PrrtReceiveDataQueue_destroy(s->receiveDataQueue); PrrtPacketDeliveryStore_destroy(s->packetDeliveryStore);
s->receiveDataQueue = NULL; s->packetDeliveryStore = NULL;
} }
if (s->deliveredPacketTable != NULL) { if (s->deliveredPacketTable != NULL) {
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
#include "stores/deliveredPacketTable.h" #include "stores/deliveredPacketTable.h"
#include "stores/packetTimeoutTable.h" #include "stores/packetTimeoutTable.h"
#include "stores/repairBlockStore.h" #include "stores/repairBlockStore.h"
#include "stores/receiveDataQueue.h" #include "stores/packetDeliveryStore.h"
#include "clock.h" #include "clock.h"
#include "../xlap/xlap.h" #include "../xlap/xlap.h"
#include "receiver.h" #include "receiver.h"
...@@ -39,7 +39,7 @@ typedef struct prrtSocket { ...@@ -39,7 +39,7 @@ typedef struct prrtSocket {
MPSCQueue *sendDataQueue; MPSCQueue *sendDataQueue;
pthread_t receiveDataThread; pthread_t receiveDataThread;
PrrtReceiveDataQueue* receiveDataQueue; PrrtPacketDeliveryStore* packetDeliveryStore;
PrrtPacketTimeoutTable *packetTimeoutTable; PrrtPacketTimeoutTable *packetTimeoutTable;
......
...@@ -2,10 +2,10 @@ ...@@ -2,10 +2,10 @@
#include "../packet.h" #include "../packet.h"
#include "../../util/common.h" #include "../../util/common.h"
#include "../../util/dbg.h" #include "../../util/dbg.h"
#include "receiveDataQueue.h" #include "packetDeliveryStore.h"
PrrtReceiveDataQueue *PrrtReceiveDataQueue_create() { PrrtPacketDeliveryStore *PrrtPacketDeliveryStore_create() {
PrrtReceiveDataQueue *q = (PrrtReceiveDataQueue *) calloc(1, sizeof(PrrtReceiveDataQueue)); PrrtPacketDeliveryStore *q = (PrrtPacketDeliveryStore *) calloc(1, sizeof(PrrtPacketDeliveryStore));
q->tree = NULL; q->tree = NULL;
pthread_mutexattr_t attr; pthread_mutexattr_t attr;
...@@ -21,16 +21,16 @@ PrrtReceiveDataQueue *PrrtReceiveDataQueue_create() { ...@@ -21,16 +21,16 @@ PrrtReceiveDataQueue *PrrtReceiveDataQueue_create() {
return NULL; return NULL;
} }
PrrtPacket * PrrtReceiveDataQueue_get_packet_timedwait(PrrtReceiveDataQueue *q, prrtTimestamp_t start, PrrtPacket * PrrtPacketDeliveryStore_get_packet_timedwait(PrrtPacketDeliveryStore *q, prrtTimestamp_t start,
prrtTimestamp_t stop, const struct timespec *deadline) { prrtTimestamp_t stop, const struct timespec *deadline) {
PrrtPacket *packet = NULL; PrrtPacket *packet = NULL;
check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed."); check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");
packet = PrrtReceiveDataQueue_get_packet(q, start, stop); packet = PrrtPacketDeliveryStore_get_packet(q, start, stop);
if (!packet) { if (!packet) {
pthread_cond_timedwait(&q->wait_for_data, &q->lock, deadline); pthread_cond_timedwait(&q->wait_for_data, &q->lock, deadline);
packet = PrrtReceiveDataQueue_get_packet(q, start, stop); packet = PrrtPacketDeliveryStore_get_packet(q, start, stop);
} }
check(pthread_mutex_unlock(&q->lock) == EXIT_SUCCESS, "Unlock failed."); check(pthread_mutex_unlock(&q->lock) == EXIT_SUCCESS, "Unlock failed.");
...@@ -38,20 +38,20 @@ PrrtPacket * PrrtReceiveDataQueue_get_packet_timedwait(PrrtReceiveDataQueue *q, ...@@ -38,20 +38,20 @@ PrrtPacket * PrrtReceiveDataQueue_get_packet_timedwait(PrrtReceiveDataQueue *q,
return packet; return packet;
error: error:
PERROR("PrrtReceiveDataQueue_get_packet_timedwait failed%s.", ""); PERROR("PrrtPacketDeliveryStore_get_packet_timedwait failed%s.", "");
return NULL; return NULL;
} }
PrrtPacket * PrrtReceiveDataQueue_get_packet_wait(PrrtReceiveDataQueue *q, prrtTimestamp_t start, PrrtPacket * PrrtPacketDeliveryStore_get_packet_wait(PrrtPacketDeliveryStore *q, prrtTimestamp_t start,
prrtTimestamp_t stop) { prrtTimestamp_t stop) {
PrrtPacket *packet = NULL; PrrtPacket *packet = NULL;
check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed."); check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");
packet = PrrtReceiveDataQueue_get_packet(q, start, stop); packet = PrrtPacketDeliveryStore_get_packet(q, start, stop);
if (!packet) { if (!packet) {
pthread_cond_wait(&q->wait_for_data, &q->lock); pthread_cond_wait(&q->wait_for_data, &q->lock);
packet = PrrtReceiveDataQueue_get_packet(q, start, stop); packet = PrrtPacketDeliveryStore_get_packet(q, start, stop);
} }
check(pthread_mutex_unlock(&q->lock) == EXIT_SUCCESS, "Unlock failed."); check(pthread_mutex_unlock(&q->lock) == EXIT_SUCCESS, "Unlock failed.");
...@@ -59,11 +59,11 @@ PrrtPacket * PrrtReceiveDataQueue_get_packet_wait(PrrtReceiveDataQueue *q, prrtT ...@@ -59,11 +59,11 @@ PrrtPacket * PrrtReceiveDataQueue_get_packet_wait(PrrtReceiveDataQueue *q, prrtT
return packet; return packet;
error: error:
PERROR("PrrtReceiveDataQueue_get_packet_timedwait failed%s.", ""); PERROR("PrrtPacketDeliveryStore_get_packet_timedwait failed%s.", "");
return NULL; return NULL;
} }
PrrtPacket *PrrtReceiveDataQueue_get_packet(PrrtReceiveDataQueue *q, prrtTimestamp_t start, prrtTimestamp_t stop) { PrrtPacket *PrrtPacketDeliveryStore_get_packet(PrrtPacketDeliveryStore *q, prrtTimestamp_t start, prrtTimestamp_t stop) {
PrrtPacket *packet = NULL; PrrtPacket *packet = NULL;
check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed."); check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");
...@@ -90,7 +90,7 @@ PrrtPacket *PrrtReceiveDataQueue_get_packet(PrrtReceiveDataQueue *q, prrtTimesta ...@@ -90,7 +90,7 @@ PrrtPacket *PrrtReceiveDataQueue_get_packet(PrrtReceiveDataQueue *q, prrtTimesta
return NULL; return NULL;
} }
bool PrrtReceiveDataQueue_insert(PrrtReceiveDataQueue *q, PrrtPacket* packet) { bool PrrtPacketDeliveryStore_insert(PrrtPacketDeliveryStore *q, PrrtPacket *packet) {
check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed."); check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");
q->tree = BPTree_insert(q->tree, PrrtDataPacket_packet_timeout(packet), packet); q->tree = BPTree_insert(q->tree, PrrtDataPacket_packet_timeout(packet), packet);
pthread_cond_broadcast(&q->wait_for_data); pthread_cond_broadcast(&q->wait_for_data);
...@@ -102,14 +102,14 @@ bool PrrtReceiveDataQueue_insert(PrrtReceiveDataQueue *q, PrrtPacket* packet) { ...@@ -102,14 +102,14 @@ bool PrrtReceiveDataQueue_insert(PrrtReceiveDataQueue *q, PrrtPacket* packet) {
return false; return false;
} }
bool PrrtReceiveDataQueue_destroy(PrrtReceiveDataQueue *q) { bool PrrtPacketDeliveryStore_destroy(PrrtPacketDeliveryStore *q) {
pthread_mutex_destroy(&q->lock); pthread_mutex_destroy(&q->lock);
pthread_cond_destroy(&q->wait_for_data); pthread_cond_destroy(&q->wait_for_data);
free(q); free(q);
return true; return true;
} }
void PrrtReceiveDataQueue_wake(PrrtReceiveDataQueue *q) { void PrrtPacketDeliveryStore_wake(PrrtPacketDeliveryStore *q) {
pthread_mutex_lock(&q->lock); pthread_mutex_lock(&q->lock);
pthread_cond_broadcast(&q->wait_for_data); pthread_cond_broadcast(&q->wait_for_data);
pthread_mutex_unlock(&q->lock); pthread_mutex_unlock(&q->lock);
......
#ifndef PRRT_RECEIVEDATAQUEUE_H
#define PRRT_RECEIVEDATAQUEUE_H
#include <sys/types.h>
#include "../../util/bptree.h"
typedef struct prrtReceiveDataQueue {
pthread_mutex_t lock;
BPTreeNode *tree;
pthread_cond_t wait_for_data;
} PrrtPacketDeliveryStore;
PrrtPacketDeliveryStore* PrrtPacketDeliveryStore_create(void);
PrrtPacket *PrrtPacketDeliveryStore_get_packet(PrrtPacketDeliveryStore *q, prrtTimestamp_t start, prrtTimestamp_t stop);
PrrtPacket *PrrtPacketDeliveryStore_get_packet_wait(PrrtPacketDeliveryStore *q, prrtTimestamp_t start,
prrtTimestamp_t stop);
PrrtPacket *PrrtPacketDeliveryStore_get_packet_timedwait(PrrtPacketDeliveryStore *queue, prrtTimestamp_t start,
prrtTimestamp_t stop, const struct timespec *deadline);
bool PrrtPacketDeliveryStore_insert(PrrtPacketDeliveryStore *q, PrrtPacket *packet);
void PrrtPacketDeliveryStore_wake(PrrtPacketDeliveryStore *q);
bool PrrtPacketDeliveryStore_destroy(PrrtPacketDeliveryStore *q);
#endif //PRRT_RECEIVEDATAQUEUE_H
#ifndef PRRT_RECEIVEDATAQUEUE_H
#define PRRT_RECEIVEDATAQUEUE_H
#include <sys/types.h>
#include "../../util/bptree.h"
typedef struct prrtReceiveDataQueue {
pthread_mutex_t lock;
BPTreeNode *tree;
pthread_cond_t wait_for_data;
} PrrtReceiveDataQueue;
PrrtReceiveDataQueue* PrrtReceiveDataQueue_create(void);
PrrtPacket *PrrtReceiveDataQueue_get_packet(PrrtReceiveDataQueue *q, prrtTimestamp_t start, prrtTimestamp_t stop);
PrrtPacket *PrrtReceiveDataQueue_get_packet_wait(PrrtReceiveDataQueue *q, prrtTimestamp_t start, prrtTimestamp_t stop);
PrrtPacket *PrrtReceiveDataQueue_get_packet_timedwait(PrrtReceiveDataQueue *queue, prrtTimestamp_t start,
prrtTimestamp_t stop, const struct timespec *deadline);
bool PrrtReceiveDataQueue_insert(PrrtReceiveDataQueue *q, PrrtPacket* packet);
void PrrtReceiveDataQueue_wake(PrrtReceiveDataQueue* q);
bool PrrtReceiveDataQueue_destroy(PrrtReceiveDataQueue* q);
#endif //PRRT_RECEIVEDATAQUEUE_H
...@@ -21,7 +21,7 @@ cdef extern from "proto/stores/packetTimeoutTable.c": ...@@ -21,7 +21,7 @@ cdef extern from "proto/stores/packetTimeoutTable.c":
cdef extern from "proto/stores/repairBlockStore.c": cdef extern from "proto/stores/repairBlockStore.c":
pass pass
cdef extern from "proto/stores/receiveDataQueue.c": cdef extern from "proto/stores/packetDeliveryStore.c":
pass pass
cdef extern from "proto/types/packetTimeout.c": cdef extern from "proto/types/packetTimeout.c":
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment