Commit 93c0ca07 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Integrate MPSCQueue.

parent fcae02f4
Pipeline #1678 passed with stages
in 1 minute and 27 seconds
......@@ -101,7 +101,7 @@ cdef extern from "proto/socket.h":
pthread_t sendDataThread
pthread_mutex_t outQueueFilledMutex
pthread_cond_t outQueueFilledCv
Pipe* sendDataQueue
MPSCQueue* sendDataQueue
pthread_t receiveDataThread
pthread_mutex_t inQueueFilledMutex
......@@ -165,3 +165,7 @@ cdef extern from "util/bptree.h":
cdef extern from "util/pipe.h":
ctypedef struct Pipe:
pass
cdef extern from "util/mpsc_queue.h":
ctypedef struct MPSCQueue:
pass
......@@ -127,7 +127,7 @@ void *send_data_loop(void *ptr) {
while (1) {
ListNode *job;
do {
job = Pipe_pull(sock_ptr->sendDataQueue);
job = MPSCQueue_pop(sock_ptr->sendDataQueue);
if (PrrtSocket_closing(sock_ptr)) {
if (block != NULL) {
PrrtBlock_destroy(block);
......
......@@ -96,7 +96,7 @@ PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay
"Socket option set failed.");
if (is_sender) {
s->sendDataQueue = Pipe_create();
s->sendDataQueue = MPSCQueue_create();
} else {
s->deliveredPacketTable = PrrtDeliveredPacketTable_create();
s->repairBlockStore = PrrtRepairBlockStore_create();
......@@ -252,7 +252,7 @@ int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
XlapTimestampPlaceholderUse(s, ts_data_packet, packet->sequenceNumber, &tsph);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSubmitPackage);
Pipe_push(s->sendDataQueue, &packet->asListNode);
MPSCQueue_push(s->sendDataQueue, &packet->asListNode);
XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
......@@ -382,8 +382,6 @@ int PrrtSocket_interrupt(PrrtSocket *s) {
if (s->receiveDataQueue)
PrrtReceiveDataQueue_wake(s->receiveDataQueue);
if (s->sendDataQueue)
Pipe_wake(s->sendDataQueue);
void **res = NULL;
......@@ -434,7 +432,7 @@ int PrrtSocket_close(PrrtSocket *s) {
}
if (s->sendDataQueue != NULL) {
Pipe_destroy(s->sendDataQueue);
MPSCQueue_destroy(s->sendDataQueue);
s->sendDataQueue = NULL;
}
......@@ -505,7 +503,7 @@ uint32_t PrrtSocket_get_sock_opt(PrrtSocket *s, const char *name) {
bool PrrtSocket_set_sock_opt(PrrtSocket *s, const char *name, const uint32_t value) {
if (strcmp(name, "app_queue_size")) {
PrrtApplicationConstraints_set_app_queue_size(s->applicationConstraints, value);
Pipe_set_size(s->sendDataQueue, value);
// TODO: MPSC_Queue does not provide a size.
} else {
return false;
}
......
......@@ -6,6 +6,7 @@
#include "packet.h"
#include "../util/list.h"
#include "../util/pipe.h"
#include "../util/mpsc_queue.h"
#include "../util/bptree.h"
#include "channelStateInformation.h"
#include "applicationConstraints.h"
......@@ -35,7 +36,7 @@ typedef struct prrtSocket {
PrrtClock clock;
pthread_t sendDataThread;
Pipe *sendDataQueue;
MPSCQueue *sendDataQueue;
pthread_t receiveDataThread;
PrrtReceiveDataQueue* receiveDataQueue;
......
add_library(UTIL ../defines.h common.c common.h list.c list.h pipe.c pipe.h dbg.h bptree.c bptree.h bitmap.c bitmap.h)
add_library(UTIL ../defines.h common.c common.h list.c list.h pipe.c pipe.h dbg.h bptree.c bptree.h bitmap.c bitmap.h mpsc_queue.c mpsc_queue.h)
set_property(TARGET UTIL PROPERTY C_STANDARD 99)
target_link_libraries(UTIL ${M_LIB})
......@@ -10,13 +10,13 @@
#define DEBUG_ALL 0
#define DEBUG_BLOCK 0
#define DEBUG_PACKET 1
#define DEBUG_RECEIVER 1
#define DEBUG_PACKET 0
#define DEBUG_RECEIVER 0
#define DEBUG_SENDER 0
#define DEBUG_SOCKET 1
#define DEBUG_DATARECEIVER 1
#define DEBUG_DATATRANSMITTER 1
#define DEBUG_HARDSTAMPING 1
#define DEBUG_DATATRANSMITTER 0
#define DEBUG_HARDSTAMPING 0
#ifdef DEBUG
#define debug(DOMAIN, M, ...) do { if (DEBUG_ALL||(DOMAIN)) fprintf(stderr, "DEBUG %-20s %s:%d: " M "\n", #DOMAIN + 6, __FILE__, __LINE__, ##__VA_ARGS__); } while (0)
......
#include "mpsc_queue.h"
#include <stdlib.h>
#include <stdio.h>
#include <assert.h>
MPSCQueue* MPSCQueue_create()
{
MPSCQueue* self = calloc(1, sizeof(MPSCQueue));
mpscq_node_t* stub = calloc(1, sizeof(mpscq_node_t));
assert(self && stub);
atomic_store_explicit(&stub->next, NULL, memory_order_relaxed);
atomic_store_explicit(&self->head, stub, memory_order_relaxed);
self->tail = stub;
self->size = 0;
atomic_thread_fence(memory_order_release);
return self;
}
void MPSCQueue_push(MPSCQueue *self, void *value)
{
mpscq_node_t* n = calloc(1, sizeof(mpscq_node_t));
assert(n);
n->data = value;
atomic_store_explicit(&n->next, NULL, memory_order_relaxed);
mpscq_node_t* prev = atomic_exchange_explicit(&self->head, n, memory_order_acq_rel);
atomic_store_explicit(&prev->next, n, memory_order_release);
self->size++;
}
void* MPSCQueue_pop(MPSCQueue* self)
{
mpscq_node_t* tail = self->tail;
mpscq_node_t* next = atomic_load_explicit(&tail->next, memory_order_acquire);
if (next)
{
self->tail = next;
void* ret = next->data;
free(tail);
self->size--;
return ret;
}
return NULL;
}
void MPSCQueue_destroy(MPSCQueue *self)
{
mpscq_node_t* node = MPSCQueue_pop(self);
while (node != 0) {
free(node);
node = MPSCQueue_pop(self);
}
if (self->head != NULL) {
free(self->head);
self->head = NULL;
}
free(self);
}
// Taken from: http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
#ifndef PRRT_MPSC_QUEUE_H
#define PRRT_MPSC_QUEUE_H
#include <stdint.h>
#include <stdatomic.h>
typedef struct mpscq_node_t mpscq_node_t;
typedef struct mpscq_node_t
{
_Atomic(mpscq_node_t *) next;
void* data;
} mpscq_node_t;
typedef struct mpscq_t
{
_Atomic(mpscq_node_t *) head;
mpscq_node_t * tail;
volatile int size;
} MPSCQueue;
MPSCQueue* MPSCQueue_create();
void MPSCQueue_push(MPSCQueue *self, void *value);
void* MPSCQueue_pop(MPSCQueue* self);
void MPSCQueue_destroy(MPSCQueue *self);
#endif //PRRT_MPSC_QUEUE_H
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment