Loading prrt/proto/CMakeLists.txt +1 −0 Original line number Diff line number Diff line Loading @@ -5,6 +5,7 @@ add_library(PRRT ../defines.h codingParams.c codingParams.h receiver.c receiver.h socket.c socket.h timer.c timer.h ../xlap/xlap.c ../xlap/xlap.h applicationConstraints.c applicationConstraints.h processes/dataReceiver.c processes/dataReceiver.h Loading prrt/proto/timer.c 0 → 100644 +210 −0 Original line number Diff line number Diff line #include "timer.h" #include <assert.h> #include <stdlib.h> #include <string.h> #include <stdbool.h> static unsigned long long timer_get_resolution(void) { #define N 100 struct timespec last; struct timespec iter; clock_gettime(CLOCK_MONOTONIC, &last); unsigned long long sum = 0; for (int n = 0; n < N; n++) { clock_gettime(CLOCK_MONOTONIC, &iter); sum += (iter.tv_sec - last.tv_sec) * 1000000000ULL + (iter.tv_nsec - last.tv_nsec); memcpy(&last, &iter, sizeof(struct timespec)); } return sum / N; } static bool timeout_is_imminent(PrrtTimer *timer, PrrtTimerListNode *node) { struct timespec now; clock_gettime(CLOCK_MONOTONIC, &now); struct timespec when = node->time; #define EPSILON (timer->resolution) uint64_t delta = 0; if (when.tv_sec > now.tv_sec) { return true; } else if (when.tv_sec == now.tv_sec) { if (when.tv_nsec > now.tv_nsec) return true; delta = now.tv_nsec - when.tv_nsec; } else if (when.tv_sec == now.tv_sec + 1){ delta = 1000000000ULL + (now.tv_nsec - when.tv_nsec); } else { return false; } return delta < EPSILON; } #define CMP_EARLIER (-1) #define CMP_EQUAL (0) #define CMP_LATER (+1) static int timeout_compare(PrrtTimerListNode *n1, PrrtTimerListNode *n2) { struct timespec t1 = n1->time; struct timespec t2 = n2->time; if (t1.tv_sec > t2.tv_sec) return CMP_LATER; if (t1.tv_sec < t2.tv_sec) return CMP_EARLIER; if (t1.tv_nsec > t2.tv_nsec) return CMP_LATER; if (t1.tv_nsec < t2.tv_nsec) return CMP_EARLIER; return CMP_EQUAL; } static bool compute_sleep_timeout(PrrtTimer *timer, struct timespec *timeout, struct timespec *sleep) { struct timespec now; clock_gettime(CLOCK_MONOTONIC, &now); if (timeout->tv_sec < now.tv_sec) return false; if (timeout->tv_sec == now.tv_sec && timeout->tv_nsec < now.tv_nsec) return false; unsigned long long delta = (timeout->tv_sec - now.tv_sec) * 1000000000ULL + (timeout->tv_nsec - now.tv_nsec); if (delta < timer->resolution) return false; delta -= timer->resolution; sleep->tv_sec = delta / 1000000000ULL; sleep->tv_nsec = delta % 1000000000ULL; return true; } static void task_run(PrrtTimerTask *task) { task->fun(task->arg); } static void *timer_loop(void *arg) { PrrtTimer *timer = (PrrtTimer *) arg; unsigned int oval = atomic_load_explicit(&timer->wake, memory_order_acquire); pthread_mutex_lock(&timer->lock); PrrtTimerListNode *item = timer->list; pthread_mutex_unlock(&timer->lock); while (true) { if (!item) { futex_wait((atomic_int *) &timer->wake, oval); oval = atomic_load_explicit(&timer->wake, memory_order_acquire); } pthread_mutex_lock(&timer->lock); item = timer->list; pthread_mutex_unlock(&timer->lock); if (timeout_is_imminent(timer, item)) { task_run(&item->task); pthread_mutex_lock(&timer->lock); PrrtTimerListNode *next = item->next; timer->list = next; pthread_mutex_unlock(&timer->lock); free(item); item = next; } if (item) { struct timespec sleep; if (compute_sleep_timeout(timer, &item->time, &sleep)) futex_wait_until((atomic_int *) &timer->wake, oval, &sleep); oval = atomic_load_explicit(&timer->wake, memory_order_acquire); } } return NULL; } static void timer_wake_worker(PrrtTimer *timer) { atomic_fetch_add_explicit(&timer->wake, 1, memory_order_acq_rel); futex_wake_one((atomic_int *) &timer->wake); } PrrtTimer *PrrtTimer_create(void) { PrrtTimer *timer = malloc(sizeof(PrrtTimer)); assert(timer); int failed = pthread_create(&timer->worker, NULL, timer_loop, timer); assert(!failed); timer->resolution = timer_get_resolution(); pthread_mutex_init(&timer->lock, NULL); timer->list = NULL; atomic_store_explicit(&timer->wake, 0, memory_order_release); return timer; } void PrrtTimer_submit(PrrtTimer *timer, struct timespec *time, const PrrtTimerTask *task) { PrrtTimerListNode *node = malloc(sizeof(PrrtTimerListNode)); assert(node); memcpy(&node->task, task, sizeof(PrrtTimerTask)); memcpy(&node->time, time, sizeof(struct timespec)); pthread_mutex_lock(&timer->lock); PrrtTimerListNode *oldhead = timer->list; PrrtTimerListNode **addr = &timer->list; while (true) { PrrtTimerListNode *iter = *addr; if (!iter) break; if (CMP_EARLIER != timeout_compare(node, iter)) break; addr = &iter->next; } PrrtTimerListNode *next = (*addr)->next; node->next = next; (*addr)->next = node; PrrtTimerListNode *newhead = timer->list; pthread_mutex_unlock(&timer->lock); if (oldhead != newhead) timer_wake_worker(timer); } void PrrtTimer_end(PrrtTimer *timer) { pthread_cancel(timer->worker); pthread_join(timer->worker, NULL); pthread_mutex_lock(&timer->lock); PrrtTimerListNode *node = timer->list; while (node) { PrrtTimerListNode *next = node->next; free(node); node = next; } pthread_mutex_unlock(&timer->lock); pthread_mutex_destroy(&timer->lock); } prrt/proto/timer.h 0 → 100644 +34 −0 Original line number Diff line number Diff line #ifndef PRRT_TIMER_H #define PRRT_TIMER_H #include <pthread.h> #include <stdatomic.h> #include "../util/futex.h" #include "../util/list.h" typedef struct prrtTimerTask { void (*fun)(void *); void *arg; } PrrtTimerTask; typedef struct prrtTimerListNode { struct prrtTimerListNode *next; struct timespec time; PrrtTimerTask task; } PrrtTimerListNode; typedef struct prrtTimer { pthread_t worker; pthread_mutex_t lock; PrrtTimerListNode *list; atomic_uint wake; unsigned long long resolution; } PrrtTimer; PrrtTimer *PrrtTimer_create(void); void *PrrtTimer_submit(PrrtTimer *timer, struct timespec *time, const PrrtTimerTask *task); void PrrtTimer_end(PrrtTimer *timer); #endif // PRRT_TIMER_H prrt/util/futex.h +21 −0 Original line number Diff line number Diff line Loading @@ -120,4 +120,25 @@ futex__exposed int futex_wake_all(atomic_int *addr) return syscall(SYS_futex, addr, FUTEX_WAKE_PRIVATE, INT_MAX); } /** * \brief Wait for a variable to change, with timeout * * This function is equivalent to futex_wait, but it takes an additional * timeout parameter that limits the waiting time. * * \param addr The address of the variable of interest * \param exp The expected value of that variable * \param time The waiting timeout * \returns 0 on success * \returns -1 on error * * \see [man 2 syscall](http://man7.org/linux/man-pages/man2/syscall.2.html) * \see [man 2 futex](http://man7.org/linux/man-pages/man2/futex.2.html) * \see [man 7 futex](http://man7.org/linux/man-pages/man7/futex.7.html) */ futex__exposed int futex_wait_until(atomic_int *addr, int exp, struct timespec *time) { return syscall(SYS_futex, addr, FUTEX_WAIT_PRIVATE, exp, time); } #endif /* __FUTEX_H_INCLUDED__ */ Loading
prrt/proto/CMakeLists.txt +1 −0 Original line number Diff line number Diff line Loading @@ -5,6 +5,7 @@ add_library(PRRT ../defines.h codingParams.c codingParams.h receiver.c receiver.h socket.c socket.h timer.c timer.h ../xlap/xlap.c ../xlap/xlap.h applicationConstraints.c applicationConstraints.h processes/dataReceiver.c processes/dataReceiver.h Loading
prrt/proto/timer.c 0 → 100644 +210 −0 Original line number Diff line number Diff line #include "timer.h" #include <assert.h> #include <stdlib.h> #include <string.h> #include <stdbool.h> static unsigned long long timer_get_resolution(void) { #define N 100 struct timespec last; struct timespec iter; clock_gettime(CLOCK_MONOTONIC, &last); unsigned long long sum = 0; for (int n = 0; n < N; n++) { clock_gettime(CLOCK_MONOTONIC, &iter); sum += (iter.tv_sec - last.tv_sec) * 1000000000ULL + (iter.tv_nsec - last.tv_nsec); memcpy(&last, &iter, sizeof(struct timespec)); } return sum / N; } static bool timeout_is_imminent(PrrtTimer *timer, PrrtTimerListNode *node) { struct timespec now; clock_gettime(CLOCK_MONOTONIC, &now); struct timespec when = node->time; #define EPSILON (timer->resolution) uint64_t delta = 0; if (when.tv_sec > now.tv_sec) { return true; } else if (when.tv_sec == now.tv_sec) { if (when.tv_nsec > now.tv_nsec) return true; delta = now.tv_nsec - when.tv_nsec; } else if (when.tv_sec == now.tv_sec + 1){ delta = 1000000000ULL + (now.tv_nsec - when.tv_nsec); } else { return false; } return delta < EPSILON; } #define CMP_EARLIER (-1) #define CMP_EQUAL (0) #define CMP_LATER (+1) static int timeout_compare(PrrtTimerListNode *n1, PrrtTimerListNode *n2) { struct timespec t1 = n1->time; struct timespec t2 = n2->time; if (t1.tv_sec > t2.tv_sec) return CMP_LATER; if (t1.tv_sec < t2.tv_sec) return CMP_EARLIER; if (t1.tv_nsec > t2.tv_nsec) return CMP_LATER; if (t1.tv_nsec < t2.tv_nsec) return CMP_EARLIER; return CMP_EQUAL; } static bool compute_sleep_timeout(PrrtTimer *timer, struct timespec *timeout, struct timespec *sleep) { struct timespec now; clock_gettime(CLOCK_MONOTONIC, &now); if (timeout->tv_sec < now.tv_sec) return false; if (timeout->tv_sec == now.tv_sec && timeout->tv_nsec < now.tv_nsec) return false; unsigned long long delta = (timeout->tv_sec - now.tv_sec) * 1000000000ULL + (timeout->tv_nsec - now.tv_nsec); if (delta < timer->resolution) return false; delta -= timer->resolution; sleep->tv_sec = delta / 1000000000ULL; sleep->tv_nsec = delta % 1000000000ULL; return true; } static void task_run(PrrtTimerTask *task) { task->fun(task->arg); } static void *timer_loop(void *arg) { PrrtTimer *timer = (PrrtTimer *) arg; unsigned int oval = atomic_load_explicit(&timer->wake, memory_order_acquire); pthread_mutex_lock(&timer->lock); PrrtTimerListNode *item = timer->list; pthread_mutex_unlock(&timer->lock); while (true) { if (!item) { futex_wait((atomic_int *) &timer->wake, oval); oval = atomic_load_explicit(&timer->wake, memory_order_acquire); } pthread_mutex_lock(&timer->lock); item = timer->list; pthread_mutex_unlock(&timer->lock); if (timeout_is_imminent(timer, item)) { task_run(&item->task); pthread_mutex_lock(&timer->lock); PrrtTimerListNode *next = item->next; timer->list = next; pthread_mutex_unlock(&timer->lock); free(item); item = next; } if (item) { struct timespec sleep; if (compute_sleep_timeout(timer, &item->time, &sleep)) futex_wait_until((atomic_int *) &timer->wake, oval, &sleep); oval = atomic_load_explicit(&timer->wake, memory_order_acquire); } } return NULL; } static void timer_wake_worker(PrrtTimer *timer) { atomic_fetch_add_explicit(&timer->wake, 1, memory_order_acq_rel); futex_wake_one((atomic_int *) &timer->wake); } PrrtTimer *PrrtTimer_create(void) { PrrtTimer *timer = malloc(sizeof(PrrtTimer)); assert(timer); int failed = pthread_create(&timer->worker, NULL, timer_loop, timer); assert(!failed); timer->resolution = timer_get_resolution(); pthread_mutex_init(&timer->lock, NULL); timer->list = NULL; atomic_store_explicit(&timer->wake, 0, memory_order_release); return timer; } void PrrtTimer_submit(PrrtTimer *timer, struct timespec *time, const PrrtTimerTask *task) { PrrtTimerListNode *node = malloc(sizeof(PrrtTimerListNode)); assert(node); memcpy(&node->task, task, sizeof(PrrtTimerTask)); memcpy(&node->time, time, sizeof(struct timespec)); pthread_mutex_lock(&timer->lock); PrrtTimerListNode *oldhead = timer->list; PrrtTimerListNode **addr = &timer->list; while (true) { PrrtTimerListNode *iter = *addr; if (!iter) break; if (CMP_EARLIER != timeout_compare(node, iter)) break; addr = &iter->next; } PrrtTimerListNode *next = (*addr)->next; node->next = next; (*addr)->next = node; PrrtTimerListNode *newhead = timer->list; pthread_mutex_unlock(&timer->lock); if (oldhead != newhead) timer_wake_worker(timer); } void PrrtTimer_end(PrrtTimer *timer) { pthread_cancel(timer->worker); pthread_join(timer->worker, NULL); pthread_mutex_lock(&timer->lock); PrrtTimerListNode *node = timer->list; while (node) { PrrtTimerListNode *next = node->next; free(node); node = next; } pthread_mutex_unlock(&timer->lock); pthread_mutex_destroy(&timer->lock); }
prrt/proto/timer.h 0 → 100644 +34 −0 Original line number Diff line number Diff line #ifndef PRRT_TIMER_H #define PRRT_TIMER_H #include <pthread.h> #include <stdatomic.h> #include "../util/futex.h" #include "../util/list.h" typedef struct prrtTimerTask { void (*fun)(void *); void *arg; } PrrtTimerTask; typedef struct prrtTimerListNode { struct prrtTimerListNode *next; struct timespec time; PrrtTimerTask task; } PrrtTimerListNode; typedef struct prrtTimer { pthread_t worker; pthread_mutex_t lock; PrrtTimerListNode *list; atomic_uint wake; unsigned long long resolution; } PrrtTimer; PrrtTimer *PrrtTimer_create(void); void *PrrtTimer_submit(PrrtTimer *timer, struct timespec *time, const PrrtTimerTask *task); void PrrtTimer_end(PrrtTimer *timer); #endif // PRRT_TIMER_H
prrt/util/futex.h +21 −0 Original line number Diff line number Diff line Loading @@ -120,4 +120,25 @@ futex__exposed int futex_wake_all(atomic_int *addr) return syscall(SYS_futex, addr, FUTEX_WAKE_PRIVATE, INT_MAX); } /** * \brief Wait for a variable to change, with timeout * * This function is equivalent to futex_wait, but it takes an additional * timeout parameter that limits the waiting time. * * \param addr The address of the variable of interest * \param exp The expected value of that variable * \param time The waiting timeout * \returns 0 on success * \returns -1 on error * * \see [man 2 syscall](http://man7.org/linux/man-pages/man2/syscall.2.html) * \see [man 2 futex](http://man7.org/linux/man-pages/man2/futex.2.html) * \see [man 7 futex](http://man7.org/linux/man-pages/man7/futex.7.html) */ futex__exposed int futex_wait_until(atomic_int *addr, int exp, struct timespec *time) { return syscall(SYS_futex, addr, FUTEX_WAIT_PRIVATE, exp, time); } #endif /* __FUTEX_H_INCLUDED__ */