Commit 3bc2234b authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

PacketDeliveryStore can be interrupted.

parent a42fb197
Pipeline #1848 failed with stages
in 1 minute and 30 seconds
...@@ -367,8 +367,9 @@ int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtT ...@@ -367,8 +367,9 @@ int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtT
int PrrtSocket_interrupt(PrrtSocket *s) { int PrrtSocket_interrupt(PrrtSocket *s) {
atomic_store_explicit(&s->closing, true, memory_order_release); atomic_store_explicit(&s->closing, true, memory_order_release);
if (s->packetDeliveryStore) if (s->packetDeliveryStore) {
PrrtPacketDeliveryStore_wake(s->packetDeliveryStore); PrrtPacketDeliveryStore_interrupt(s->packetDeliveryStore);
}
if(s->receiver != NULL) { if(s->receiver != NULL) {
PrrtReceiver_interrupt(s->receiver); PrrtReceiver_interrupt(s->receiver);
......
#include <pthread.h> #include <pthread.h>
#include <stdatomic.h>
#include "../types/packet.h" #include "../types/packet.h"
#include "../../util/common.h" #include "../../util/common.h"
#include "../../util/dbg.h" #include "../../util/dbg.h"
...@@ -7,6 +8,7 @@ ...@@ -7,6 +8,7 @@
PrrtPacketDeliveryStore *PrrtPacketDeliveryStore_create() { PrrtPacketDeliveryStore *PrrtPacketDeliveryStore_create() {
PrrtPacketDeliveryStore *q = (PrrtPacketDeliveryStore *) calloc(1, sizeof(PrrtPacketDeliveryStore)); PrrtPacketDeliveryStore *q = (PrrtPacketDeliveryStore *) calloc(1, sizeof(PrrtPacketDeliveryStore));
q->tree = NULL; q->tree = NULL;
q->closing = false;
pthread_mutexattr_t attr; pthread_mutexattr_t attr;
check(pthread_mutexattr_init(&attr) == EXIT_SUCCESS, "Mutex attr init failed."); check(pthread_mutexattr_init(&attr) == EXIT_SUCCESS, "Mutex attr init failed.");
...@@ -28,7 +30,7 @@ PrrtPacket * PrrtPacketDeliveryStore_get_packet_timedwait(PrrtPacketDeliveryStor ...@@ -28,7 +30,7 @@ PrrtPacket * PrrtPacketDeliveryStore_get_packet_timedwait(PrrtPacketDeliveryStor
check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed."); check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");
packet = PrrtPacketDeliveryStore_get_packet(q, start, stop); packet = PrrtPacketDeliveryStore_get_packet(q, start, stop);
if (!packet) { if (!packet && !atomic_load_explicit(&q->closing, memory_order_acquire)) {
pthread_cond_timedwait(&q->wait_for_data, &q->lock, deadline); pthread_cond_timedwait(&q->wait_for_data, &q->lock, deadline);
packet = PrrtPacketDeliveryStore_get_packet(q, start, stop); packet = PrrtPacketDeliveryStore_get_packet(q, start, stop);
} }
...@@ -49,7 +51,7 @@ PrrtPacket * PrrtPacketDeliveryStore_get_packet_wait(PrrtPacketDeliveryStore *q, ...@@ -49,7 +51,7 @@ PrrtPacket * PrrtPacketDeliveryStore_get_packet_wait(PrrtPacketDeliveryStore *q,
check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed."); check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");
packet = PrrtPacketDeliveryStore_get_packet(q, start, stop); packet = PrrtPacketDeliveryStore_get_packet(q, start, stop);
if (!packet) { if (!packet && !atomic_load_explicit(&q->closing, memory_order_acquire)) {
pthread_cond_wait(&q->wait_for_data, &q->lock); pthread_cond_wait(&q->wait_for_data, &q->lock);
packet = PrrtPacketDeliveryStore_get_packet(q, start, stop); packet = PrrtPacketDeliveryStore_get_packet(q, start, stop);
} }
...@@ -123,8 +125,9 @@ bool PrrtPacketDeliveryStore_destroy(PrrtPacketDeliveryStore *q) { ...@@ -123,8 +125,9 @@ bool PrrtPacketDeliveryStore_destroy(PrrtPacketDeliveryStore *q) {
return true; return true;
} }
void PrrtPacketDeliveryStore_wake(PrrtPacketDeliveryStore *q) { void PrrtPacketDeliveryStore_interrupt(PrrtPacketDeliveryStore *q) {
pthread_mutex_lock(&q->lock); pthread_mutex_lock(&q->lock);
atomic_store_explicit(&q->closing, true, memory_order_release);
pthread_cond_broadcast(&q->wait_for_data); pthread_cond_broadcast(&q->wait_for_data);
pthread_mutex_unlock(&q->lock); pthread_mutex_unlock(&q->lock);
} }
\ No newline at end of file
...@@ -8,6 +8,7 @@ typedef struct prrtReceiveDataQueue { ...@@ -8,6 +8,7 @@ typedef struct prrtReceiveDataQueue {
pthread_mutex_t lock; pthread_mutex_t lock;
BPTreeNode *tree; BPTreeNode *tree;
pthread_cond_t wait_for_data; pthread_cond_t wait_for_data;
atomic_bool closing;
} PrrtPacketDeliveryStore; } PrrtPacketDeliveryStore;
PrrtPacketDeliveryStore* PrrtPacketDeliveryStore_create(void); PrrtPacketDeliveryStore* PrrtPacketDeliveryStore_create(void);
...@@ -22,7 +23,7 @@ PrrtPacket *PrrtPacketDeliveryStore_get_packet_timedwait(PrrtPacketDeliveryStore ...@@ -22,7 +23,7 @@ PrrtPacket *PrrtPacketDeliveryStore_get_packet_timedwait(PrrtPacketDeliveryStore
bool PrrtPacketDeliveryStore_insert(PrrtPacketDeliveryStore *q, PrrtPacket *packet); bool PrrtPacketDeliveryStore_insert(PrrtPacketDeliveryStore *q, PrrtPacket *packet);
void PrrtPacketDeliveryStore_wake(PrrtPacketDeliveryStore *q); void PrrtPacketDeliveryStore_interrupt(PrrtPacketDeliveryStore *q);
bool PrrtPacketDeliveryStore_destroy(PrrtPacketDeliveryStore *q); bool PrrtPacketDeliveryStore_destroy(PrrtPacketDeliveryStore *q);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment