Commit 07e048f0 authored by Stefan Reif's avatar Stefan Reif
Browse files

Fix the whole pipe/MPSC_queue situation

- Use the much faster MPSC_queue algorithm to implement the pipe
- Replace the MPSC_queue by the propper pipe abstraction instead
parent f06a86ed
Pipeline #2012 failed with stages
in 28 seconds
...@@ -131,7 +131,7 @@ void *send_data_loop(void *ptr) { ...@@ -131,7 +131,7 @@ void *send_data_loop(void *ptr) {
while (1) { while (1) {
ListNode *job; ListNode *job;
do { do {
job = MPSCQueue_pop(sock_ptr->sendDataQueue); job = Pipe_poll(sock_ptr->sendDataQueue);
if (PrrtSocket_closing(sock_ptr)) { if (PrrtSocket_closing(sock_ptr)) {
if (block != NULL) { if (block != NULL) {
PrrtBlock_destroy(block); PrrtBlock_destroy(block);
......
...@@ -98,7 +98,7 @@ PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay ...@@ -98,7 +98,7 @@ PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay
"Socket option set failed."); "Socket option set failed.");
if (is_sender) { if (is_sender) {
s->sendDataQueue = MPSCQueue_create(); s->sendDataQueue = Pipe_create();
} else { } else {
s->deliveredPacketTable = PrrtDeliveredPacketTable_create(); s->deliveredPacketTable = PrrtDeliveredPacketTable_create();
s->repairBlockStore = PrrtRepairBlockStore_create(); s->repairBlockStore = PrrtRepairBlockStore_create();
...@@ -257,7 +257,7 @@ int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) { ...@@ -257,7 +257,7 @@ int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
XlapTimestampPlaceholderUse(s, ts_data_packet, packet->sequenceNumber, &tsph); XlapTimestampPlaceholderUse(s, ts_data_packet, packet->sequenceNumber, &tsph);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSubmitPackage); XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSubmitPackage);
MPSCQueue_push(s->sendDataQueue, &packet->asListNode); Pipe_push(s->sendDataQueue, &packet->asListNode);
XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd); XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd); XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
...@@ -428,7 +428,7 @@ int PrrtSocket_close(PrrtSocket *s) { ...@@ -428,7 +428,7 @@ int PrrtSocket_close(PrrtSocket *s) {
} }
if (s->sendDataQueue != NULL) { if (s->sendDataQueue != NULL) {
MPSCQueue_destroy(s->sendDataQueue); Pipe_destroy(s->sendDataQueue);
s->sendDataQueue = NULL; s->sendDataQueue = NULL;
} }
......
...@@ -38,7 +38,7 @@ typedef struct prrtSocket { ...@@ -38,7 +38,7 @@ typedef struct prrtSocket {
PrrtClock clock; PrrtClock clock;
pthread_t sendDataThread; pthread_t sendDataThread;
MPSCQueue *sendDataQueue; Pipe *sendDataQueue;
pthread_t receiveDataThread; pthread_t receiveDataThread;
PrrtPacketDeliveryStore* packetDeliveryStore; PrrtPacketDeliveryStore* packetDeliveryStore;
......
/**
* \brief Simple wrapper functions for the Linux Futex system call
* \file futex.h
*
* This header provides small wrapper functions for the futex system call. It
* has, however, the following limitations:
* - no support for wait operations with timeout
* - no support for intra-process futexes via shared memory mappings
* - no sophisticated futex operations like \c FUTEX_REQUEUE, \c FUTEX_WAIT_BITSET,
* or \c FUTEX_WAKE_OP.
*
* \warning This code requires gcc or a compatible compiler (e.g. clang)
* \warning The futex system call is linux-specific and therefore not portable
*/
#ifndef __FUTEX_H_INCLUDED__
#define __FUTEX_H_INCLUDED__
#include <linux/futex.h>
#include <limits.h>
#include <stddef.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <unistd.h>
#include <stdatomic.h>
#ifndef SYS_futex
# error "The futex system call is not available"
#endif
#ifndef __GNUC__
# error "please use gcc, clang, or a compatible compiler"
#endif
/**
* \brief Let functions behave like macros
*
* This modifier allows placement of functions in header files. The compiler
* will inline them (just like macros) and will not complain about unused
* functions.
*
* In general, functions have multple advantages over macros, e.g.
* - Type safety
* - No double argument expansion
* - No unexpected text replacement in other source code files
* - Better debugging information
*
* \warning This macro is intended for internal use only
*/
#ifdef __GNUC__
# define futex__exposed static inline __attribute__((__always_inline__,__unused__))
#else
# define futex__exposed /* this macro confuses doxygen */
#endif
/**
* \brief Wait for a variable to change
*
* This function causes the calling thread to sleep, but only if the value at
* \c *addr is equal to \c exp. If the values are not equal (e.g. due to a
* concurrent modification), this system call returns.
*
* The thread will not wake up automatically when the variable changes.
* Instead, each thread that updates the variable is responsible to call
* futex_wake_one or futex_wake_all, to wake up sleeping threads.
*
* \param addr The address of the variable of interest
* \param exp The expected value of that variable
* \returns 0 on success
* \returns -1 on error
*
* \see [man 2 syscall](http://man7.org/linux/man-pages/man2/syscall.2.html)
* \see [man 2 futex](http://man7.org/linux/man-pages/man2/futex.2.html)
* \see [man 7 futex](http://man7.org/linux/man-pages/man7/futex.7.html)
*/
futex__exposed int futex_wait(atomic_int *addr, int exp)
{
return syscall(SYS_futex, addr, FUTEX_WAIT_PRIVATE, exp, NULL);
}
/**
* \brief Wake up a thread waiting on a futex variable
*
* This function wakes up a thread waiting on a futex. However, the number of
* threads that actually wake up is not clear. If there is a thread waiting on
* the given variable, then at least one thread will wake up. However, it is
* possible that no thread wakes up (if no thread is waiting on the variable),
* and it is also possible that more than one thread wakes up (e.g. if a thread
* is concurrently going to sleep).
*
* \param addr The address of the variable of interest
* \returns 0 on success
* \returns -1 on error
*
* \see [man 2 syscall](http://man7.org/linux/man-pages/man2/syscall.2.html)
* \see [man 2 futex](http://man7.org/linux/man-pages/man2/futex.2.html)
* \see [man 7 futex](http://man7.org/linux/man-pages/man7/futex.7.html)
*/
futex__exposed int futex_wake_one(atomic_int *addr)
{
return syscall(SYS_futex, addr, FUTEX_WAKE_PRIVATE, 1);
}
/**
* \brief Wake up a thread waiting on a futex variable
*
* This function wakes up all threads waiting on a futex. However, it is not
* clear whether a thread that concurrently calls futex_wait will wake up as
* well.
*
* \param addr The address of the variable of interest
* \returns 0 on success
* \returns -1 on error
*
* \see [man 2 syscall](http://man7.org/linux/man-pages/man2/syscall.2.html)
* \see [man 2 futex](http://man7.org/linux/man-pages/man2/futex.2.html)
* \see [man 7 futex](http://man7.org/linux/man-pages/man7/futex.7.html)
*/
futex__exposed int futex_wake_all(atomic_int *addr)
{
return syscall(SYS_futex, addr, FUTEX_WAKE_PRIVATE, INT_MAX);
}
#endif /* __FUTEX_H_INCLUDED__ */
#include "pipe.h" #include "pipe.h"
#include "dbg.h" #include "dbg.h"
#include "common.h" #include "common.h"
#include "futex.h"
#include <stdlib.h> #include <stdlib.h>
#include <assert.h>
#include <stdbool.h>
Pipe *Pipe_create(void) { Pipe *Pipe_create(void) {
Pipe *p = malloc(sizeof(Pipe)); Pipe *p = malloc(sizeof(Pipe));
check_mem(p); check_mem(p);
p->list = List_create();
p->size = 1; mpscq_node_t* stub = calloc(1, sizeof(mpscq_node_t));
pthread_mutex_init(&p->lock, NULL); check_mem(stub);
pthread_cond_init(&p->wait_for_data, NULL);
pthread_cond_init(&p->wait_for_space, NULL); p->tail = stub;
atomic_store_explicit(&stub->next, NULL, memory_order_relaxed);
atomic_store_explicit(&p->head, stub, memory_order_relaxed);
atomic_store_explicit(&p->items, 0, memory_order_relaxed);
atomic_store_explicit(&p->space, 1, memory_order_relaxed);
atomic_thread_fence(memory_order_release);
return p; return p;
...@@ -20,67 +29,107 @@ Pipe *Pipe_create(void) { ...@@ -20,67 +29,107 @@ Pipe *Pipe_create(void) {
abort(); abort();
} }
void Pipe_destroy(Pipe *p) { void Pipe_destroy(Pipe *self) {
List_clear_destroy(p->list); void* node = Pipe_poll(self);
pthread_mutex_destroy(&p->lock); while (node != 0) {
pthread_cond_destroy(&p->wait_for_data); free(node);
pthread_cond_destroy(&p->wait_for_space); node = Pipe_poll(self);
free(p); }
if (self->head != NULL) {
free(self->head);
self->head = NULL;
}
free(self);
} }
void Pipe_push(Pipe *p, ListNode *n) { static void pipe_wait_for(atomic_int *var) {
pthread_mutex_lock(&p->lock); int val = atomic_load_explicit(var, memory_order_acquire);
while (List_count(p->list) > p->size) { while (true) {
pthread_cond_wait(&p->wait_for_space, &p->lock); if (!val) {
} futex_wait(var, 0);
List_push(p->list, n); val = atomic_load_explicit(var, memory_order_acquire);
pthread_cond_broadcast(&p->wait_for_data); } else if (atomic_compare_exchange_weak_explicit(var, &val, val - 1, memory_order_acq_rel, memory_order_acquire)) {
pthread_mutex_unlock(&p->lock); return;
}
}
}
static bool pipe_wait_try(atomic_int *var) {
int val = atomic_load_explicit(var, memory_order_acquire);
if (val > 0 && atomic_compare_exchange_weak_explicit(var, &val, val - 1, memory_order_acq_rel, memory_order_acquire))
return true;
return false;
}
static void pipe_wake(atomic_int *var) {
int val = atomic_fetch_add_explicit(var, 1, memory_order_release);
if (!val)
futex_wake_all(var);
}
void Pipe_push(Pipe *p, ListNode *value) {
pipe_wait_for(&p->space);
mpscq_node_t *n = calloc(1, sizeof(mpscq_node_t));
assert(n);
n->data = value;
atomic_store_explicit(&n->next, NULL, memory_order_relaxed);
mpscq_node_t* prev = atomic_exchange_explicit(&p->head, n, memory_order_acq_rel);
atomic_store_explicit(&prev->next, n, memory_order_release);
pipe_wake(&p->items);
}
static ListNode *Pipe_deq(Pipe *p) {
mpscq_node_t* tail = p->tail;
assert(tail);
mpscq_node_t* next = atomic_load_explicit(&tail->next, memory_order_acquire);
if (!next) {
pipe_wake(&p->items);
return NULL;
}
p->tail = next;
void* ret = next->data;
free(tail);
pipe_wake(&p->space);
return (ListNode *) ret;
} }
ListNode *Pipe_pull(Pipe *p) { ListNode *Pipe_pull(Pipe *p) {
ListNode *n;
pthread_mutex_lock(&p->lock); pipe_wait_for(&p->items);
n = List_shift(p->list); return Pipe_deq(p);
if (!n) {
pthread_cond_wait(&p->wait_for_data, &p->lock);
n = List_shift(p->list);
}
pthread_cond_broadcast(&p->wait_for_space);
pthread_mutex_unlock(&p->lock);
return n;
} }
ListNode *Pipe_poll(Pipe *p) { ListNode *Pipe_poll(Pipe *p) {
ListNode *n;
pthread_mutex_lock(&p->lock); if (!pipe_wait_try(&p->items))
n = List_shift(p->list); return NULL;
pthread_mutex_unlock(&p->lock); return Pipe_deq(p);
return n;
} }
void Pipe_wake(Pipe *p) { void Pipe_wake(Pipe *p) {
pthread_mutex_lock(&p->lock); pipe_wake(&p->items);
pthread_cond_broadcast(&p->wait_for_data); pipe_wake(&p->space);
pthread_mutex_unlock(&p->lock);
} }
void Pipe_set_size(Pipe *p, uint32_t size) { void Pipe_set_size(Pipe *p, uint32_t size) {
uint32_t old_size = 0; int items = atomic_load(&p->items);
pthread_mutex_lock(&p->lock); int space = size - items;
old_size = p->size; if (space < 0)
p->size = size; space = 0;
if (old_size < size) { atomic_store(&p->space, space);
pthread_cond_broadcast(&p->wait_for_space);
}
pthread_mutex_unlock(&p->lock);
} }
uint32_t Pipe_get_size(Pipe *p) { uint32_t Pipe_get_size(Pipe *p) {
uint32_t size = 0; return atomic_load(&p->space) + atomic_load(&p->items);
pthread_mutex_lock(&p->lock);
size = p->size;
pthread_mutex_unlock(&p->lock);
return size;
} }
...@@ -2,17 +2,18 @@ ...@@ -2,17 +2,18 @@
#define PRRT_PIPE_H #define PRRT_PIPE_H
#include "list.h" #include "list.h"
#include "mpsc_queue.h"
#include <pthread.h> #include <pthread.h>
#include <stdatomic.h>
typedef struct pipe { typedef struct pipe {
List *list; _Atomic(mpscq_node_t *) head;
pthread_mutex_t lock; mpscq_node_t * tail;
pthread_cond_t wait_for_data;
pthread_cond_t wait_for_space;
uint32_t size; atomic_int space;
atomic_int items;
} Pipe; } Pipe;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment