Loading prrt/proto/timer.c +361 −135 Original line number Diff line number Diff line #include "timer.h" #include <assert.h> #include <errno.h> #include <pthread.h> #include <sched.h> #include <stdatomic.h> #include <stdbool.h> #include <stdlib.h> #include <string.h> #include <stdbool.h> #include <stdio.h> #include <time.h> #include <limits.h> #include <linux/futex.h> #include <sys/time.h> #include <sys/syscall.h> #include <sys/prctl.h> #include <unistd.h> typedef prrtTimerDate TimerDate; typedef prrtTimerTaskFun TimerTaskFun; typedef prrtTimerTaskArg TimerTaskArg; static unsigned long long timer_get_resolution(void) { #define N 100 struct timespec last; struct timespec iter; typedef unsigned long long TimerDateUDiff_t; #define TimerDateUDiff_MAX ULLONG_MAX clock_gettime(CLOCK_MONOTONIC, &last); unsigned long long sum = 0; #define NSEC_PER_SEC 1000000000 for (int n = 0; n < N; n++) { clock_gettime(CLOCK_MONOTONIC, &iter); sum += (iter.tv_sec - last.tv_sec) * 1000000000ULL + (iter.tv_nsec - last.tv_nsec); memcpy(&last, &iter, sizeof(struct timespec)); static inline void timer_date_make_inf(TimerDate *td) { td->tv_sec = td->tv_nsec = 0; } return sum / N; static inline bool timer_date_is_inf(const TimerDate *td) { return !td->tv_sec && !td->tv_nsec; } static bool timeout_is_imminent(PrrtTimer *timer, PrrtTimerListNode *node) static inline TimerDateUDiff_t timer_duration_finite(const TimerDate *t1, const TimerDate *t2) { struct timespec now; clock_gettime(CLOCK_MONOTONIC, &now); // assume that t1 and t2 are finite return (t2->tv_sec - t1->tv_sec) * NSEC_PER_SEC + (t2->tv_nsec - t1->tv_nsec); } static inline void timer_date_add(TimerDate *td, TimerDateUDiff_t nsec) { assert(td->tv_nsec >= 0 && "negative time"); while (td->tv_nsec >= NSEC_PER_SEC) { td->tv_sec++; td->tv_nsec -= NSEC_PER_SEC; } td->tv_sec += nsec / NSEC_PER_SEC; td->tv_nsec += nsec % NSEC_PER_SEC; if (td->tv_nsec >= NSEC_PER_SEC) { td->tv_sec++; td->tv_nsec -= NSEC_PER_SEC; } } struct timespec when = node->time; static inline void timer_date_sub(TimerDate *td, TimerDateUDiff_t nsec) { assert(td->tv_nsec >= 0 && "negative time"); td->tv_sec -= nsec / NSEC_PER_SEC; nsec %= NSEC_PER_SEC; if ((TimerDateUDiff_t) td->tv_nsec < nsec) { td->tv_sec--; td->tv_nsec += NSEC_PER_SEC; } td->tv_nsec -= nsec; } #define EPSILON (timer->resolution) static inline bool timer_date_is_lt(const TimerDate *ta, const TimerDate *tb) { if (timer_date_is_inf(ta)) return false; if (timer_date_is_inf(tb)) return true; uint64_t delta = 0; // TODO: is integer overflow relevant here? if (when.tv_sec > now.tv_sec) { if (ta->tv_sec < tb->tv_sec) return true; } else if (when.tv_sec == now.tv_sec) { if (when.tv_nsec > now.tv_nsec) if (ta->tv_sec > tb->tv_sec) return false; if (ta->tv_nsec < tb->tv_nsec) return true; delta = now.tv_nsec - when.tv_nsec; } else if (when.tv_sec == now.tv_sec + 1){ delta = 1000000000ULL + (now.tv_nsec - when.tv_nsec); } else { if (ta->tv_nsec > tb->tv_nsec) return false; return false; } return delta < EPSILON; static inline bool timer_date_eq(const TimerDate *ta, const TimerDate *tb) { return ta->tv_sec == tb->tv_sec && ta->tv_nsec == tb->tv_nsec; } #define CMP_EARLIER (-1) #define CMP_EQUAL (0) #define CMP_LATER (+1) static inline TimerDateUDiff_t timer_measure_clock_precision_round(void) { TimerDate a,b; clock_gettime(CLOCK_REALTIME, &a); do { clock_gettime(CLOCK_REALTIME, &b); } while (0 == timer_duration_finite(&a, &b)); return timer_duration_finite(&a, &b); } static int timeout_compare(PrrtTimerListNode *n1, PrrtTimerListNode *n2) static TimerDateUDiff_t timer_measure_clock_precision(void) { const unsigned int ROUNDS = 10; TimerDateUDiff_t sum = 0; for (int r = 0; r < ROUNDS; r++) sum += timer_measure_clock_precision_round(); return (sum + ROUNDS / 2) / ROUNDS; } struct timespec t1 = n1->time; struct timespec t2 = n2->time; typedef struct prrtTimerNode { _Atomic(struct prrtTimerNode *) next; atomic_bool done; TimerDate date; TimerTaskArg arg; TimerTaskFun fun; } TimerNode; #define OSP_WINDOW_SIZE 8 struct prrtTimer { pthread_t worker; atomic_bool alive; atomic_int wait; _Atomic(TimerNode *) new; _Atomic(TimerNode *) old; _Atomic(TimerNode *) del; TimerDateUDiff_t precision; TimerDateUDiff_t lcp; TimerDateUDiff_t osp; TimerDateUDiff_t osp_window[OSP_WINDOW_SIZE]; unsigned int osp_idx; }; typedef struct prrtTimer Timer; static bool timer_date_is_due(Timer *self, TimerDate *when, const TimerDate *now) { // TODO: use self->precision to check whether now and *when are similar enough (void) self; return timer_date_is_lt(when, now); } static void compute_sleep_end(Timer *self, TimerDate *out, const TimerDate *end) { *out = *end; if (timer_date_is_inf(end)) return; if (t1.tv_sec > t2.tv_sec) return CMP_LATER; if (t1.tv_sec < t2.tv_sec) return CMP_EARLIER; if (t1.tv_nsec > t2.tv_nsec) return CMP_LATER; if (t1.tv_nsec < t2.tv_nsec) return CMP_EARLIER; return CMP_EQUAL; // is is okay when the computed sleep end is in the past timer_date_sub(out, self->osp + self->lcp + 2 * self->precision); } static bool compute_sleep_timeout(PrrtTimer *timer, struct timespec *timeout, struct timespec *sleep) static inline void learn_osp(Timer *self, TimerDateUDiff_t new) { struct timespec now; clock_gettime(CLOCK_MONOTONIC, &now); if (timeout->tv_sec < now.tv_sec) return false; if (timeout->tv_sec == now.tv_sec && timeout->tv_nsec < now.tv_nsec) return false; unsigned long long delta = (timeout->tv_sec - now.tv_sec) * 1000000000ULL + (timeout->tv_nsec - now.tv_nsec); unsigned int idx = self->osp_idx; TimerDateUDiff_t old = self->osp_window[idx]; (void) old; self->osp_window[idx] = new; if (delta < timer->resolution) return false; TimerDateUDiff_t max = self->osp_window[0]; for (int i = 1; i < OSP_WINDOW_SIZE; i++) max = self->osp_window[i] > max ? self->osp_window[i] : max; self->osp = max; delta -= timer->resolution; self->osp_idx = (idx + 1) & (OSP_WINDOW_SIZE - 1); } sleep->tv_sec = delta / 1000000000ULL; sleep->tv_nsec = delta % 1000000000ULL; return true; static inline void learn_lcp(Timer *self, TimerDateUDiff_t new) { TimerDateUDiff_t old = self->lcp; self->lcp = (3 * old + new) / 4; } static void task_run(PrrtTimerTask *task) #define futex(...) syscall(SYS_futex, __VA_ARGS__) static void timer_wait_imprecise(Timer *self, const TimerDate *end) { task->fun(task->arg); bool forever = timer_date_is_inf(end); futex(&self->wait, FUTEX_WAIT_BITSET|FUTEX_PRIVATE_FLAG|FUTEX_CLOCK_REALTIME, 1, forever ? NULL : end, NULL, FUTEX_BITSET_MATCH_ANY); } static void *timer_loop(void *arg) static void timer_wake_worker(PrrtTimer *self, bool force) { PrrtTimer *timer = (PrrtTimer *) arg; int one = 1; if (atomic_compare_exchange_strong(&self->wait, &one, 0) || force) futex(&self->wait, FUTEX_WAKE|FUTEX_PRIVATE_FLAG, 1, NULL, NULL, 0); } unsigned int oval = atomic_load_explicit(&timer->wake, memory_order_acquire); static void *timer_worker_loop(void *arg) { PrrtTimer *self = (PrrtTimer *) arg; pthread_mutex_lock(&timer->lock); PrrtTimerListNode *item = timer->list; pthread_mutex_unlock(&timer->lock); bool slept, learned; while (true) { if (!item) { futex_wait((atomic_int *) &timer->wake, oval); oval = atomic_load_explicit(&timer->wake, memory_order_acquire); while (1) { loop:; atomic_store(&self->wait, 1); TimerNode *task = atomic_load(&self->new); assert(task != NULL && "task list contains NULL node"); if (timer_date_is_inf(&task->date)) { if (!atomic_load_explicit(&self->alive, memory_order_acquire)) { if (task == atomic_load(&self->new)) break; } } TimerDate sleep_end; compute_sleep_end(self, &sleep_end, &task->date); TimerDate now, td1, td2; TimerDateUDiff_t oversleep = 0; slept = false; learned = false; clock_gettime(CLOCK_REALTIME, &now); if (timer_date_is_lt(&now, &sleep_end)) { slept = true; timer_wait_imprecise(self, &sleep_end); clock_gettime(CLOCK_REALTIME, &td1); if (timer_date_is_lt(&td1, &sleep_end)) goto loop; oversleep = timer_duration_finite(&sleep_end, &td1); TimerDateUDiff_t avail = timer_duration_finite(&td1, &task->date); if (timer_date_is_lt(&td1, &task->date) && avail >= 2 * self->lcp) { learn_osp(self, oversleep); learned = true; clock_gettime(CLOCK_REALTIME, &td2); learn_lcp(self, timer_duration_finite(&td1, &td2)); } } pthread_mutex_lock(&timer->lock); item = timer->list; pthread_mutex_unlock(&timer->lock); if (timeout_is_imminent(timer, item)) { task_run(&item->task); atomic_store(&self->wait, 0); pthread_mutex_lock(&timer->lock); PrrtTimerListNode *next = item->next; timer->list = next; pthread_mutex_unlock(&timer->lock); while (true) { if (timer_date_is_due(self, &task->date, &now)) break; if (task != atomic_load(&self->new)) goto loop; clock_gettime(CLOCK_REALTIME, &now); } free(item); item = next; if (!task->done) { task->fun(task->arg); task->done = true; } if (item) { struct timespec sleep; if (compute_sleep_timeout(timer, &item->time, &sleep)) futex_wait_until((atomic_int *) &timer->wake, oval, &sleep); oval = atomic_load_explicit(&timer->wake, memory_order_acquire); atomic_store(&self->del, task); TimerNode *next = atomic_load(&task->next); TimerNode *temp = task; atomic_compare_exchange_strong(&self->new, &temp, next); atomic_store(&self->del, NULL); if (slept && !learned) { clock_gettime(CLOCK_REALTIME, &td1); learn_osp(self, oversleep); clock_gettime(CLOCK_REALTIME, &td2); learn_lcp(self, timer_duration_finite(&td1, &td2)); } if (!slept) { for (int i = 0; i < OSP_WINDOW_SIZE; i++) self->osp_window[i] = self->osp_window[i] / 4 * 3; self->osp = self->osp / 4 * 3; } return NULL; } static void timer_wake_worker(PrrtTimer *timer) { atomic_fetch_add_explicit(&timer->wake, 1, memory_order_acq_rel); futex_wake_one((atomic_int *) &timer->wake); return self; } PrrtTimer *PrrtTimer_create(void) PrrtTimer *PrrtTimer_create(unsigned int core) { PrrtTimer *timer = malloc(sizeof(PrrtTimer)); assert(timer); int err; PrrtTimer *self = malloc(sizeof(PrrtTimer)); if (!self) return NULL; int failed = pthread_create(&timer->worker, NULL, timer_loop, timer); assert(!failed); // create dummy node TimerNode *node = malloc(sizeof(TimerNode)); if (!node) { free(self); return NULL; } timer_date_make_inf(&node->date); atomic_store_explicit(&node->done, false, memory_order_relaxed); atomic_store_explicit(&node->next, NULL, memory_order_relaxed); atomic_store_explicit(&self->alive, true, memory_order_relaxed); atomic_store_explicit(&self->wait, 0, memory_order_relaxed); atomic_store_explicit(&self->new, node, memory_order_relaxed); atomic_store_explicit(&self->old, node, memory_order_relaxed); atomic_store_explicit(&self->del, NULL, memory_order_relaxed); self->precision = timer_measure_clock_precision(); for (int i = 0; i < OSP_WINDOW_SIZE; i++) self->osp_window[i] = self->precision; self->osp = self->precision; self->lcp = self->precision; self->osp_idx = 0; atomic_thread_fence(memory_order_release); // start worker thread pthread_attr_t attr; err = pthread_attr_init(&attr); if (err) { free(node); free(self); errno = err; return NULL; } timer->resolution = timer_get_resolution(); cpu_set_t set; CPU_ZERO(&set); CPU_SET(core, &set); pthread_mutex_init(&timer->lock, NULL); timer->list = NULL; atomic_store_explicit(&timer->wake, 0, memory_order_release); return timer; err = pthread_attr_setaffinity_np(&attr, sizeof(set), &set); if (err) { free(node); free(self); pthread_attr_destroy(&attr); errno = err; return NULL; } void PrrtTimer_submit(PrrtTimer *timer, struct timespec *time, const PrrtTimerTask *task) { PrrtTimerListNode *node = malloc(sizeof(PrrtTimerListNode)); assert(node); err = pthread_attr_setschedpolicy(&attr, SCHED_FIFO); if (err) { free(node); free(self); pthread_attr_destroy(&attr); errno = err; return NULL; } memcpy(&node->task, task, sizeof(PrrtTimerTask)); memcpy(&node->time, time, sizeof(struct timespec)); err = pthread_create(&self->worker, &attr, timer_worker_loop, self); if (err) { free(node); free(self); pthread_attr_destroy(&attr); errno = err; return NULL; } pthread_mutex_lock(&timer->lock); PrrtTimerListNode *oldhead = timer->list; pthread_attr_destroy(&attr); return self; } PrrtTimerListNode **addr = &timer->list; while (true) { PrrtTimerListNode *iter = *addr; if (!iter) break; if (CMP_EARLIER != timeout_compare(node, iter)) int PrrtTimer_submit(PrrtTimer *self, const TimerDate *when, const PrrtTimerTask *what) { TimerNode *iter, *stop, *next; TimerNode *node = malloc(sizeof(TimerNode)); if (!node) return -1; node->done = false; node->arg = what->arg; node->fun = what->fun; node->date = *when; // fix the date, if needed if (timer_date_is_inf(&node->date)) timer_date_add(&node->date, 1); iter = atomic_load(&self->old); stop = atomic_load(&self->new); while (iter != stop) { next = iter->next; assert(iter->done && "cleanup task that is not marked as done"); free(iter); iter = next; } atomic_store(&self->old, stop); _Atomic(TimerNode *) *addr = &self->old; while (1) { iter = atomic_load(addr); assert(iter && "unexpected NULL pointer in task list"); // make sure every date is unique if (timer_date_eq(&node->date, &iter->date)) { timer_date_add(&node->date, 1); if (timer_date_is_inf(&node->date)) timer_date_add(&node->date, 1); } else if (timer_date_is_lt(&node->date, &iter->date)) { break; } addr = &iter->next; } PrrtTimerListNode *next = (*addr)->next; node->next = next; (*addr)->next = node; atomic_store(&node->next, iter); atomic_store(addr, node); PrrtTimerListNode *newhead = timer->list; pthread_mutex_unlock(&timer->lock); TimerNode *del = atomic_load(&self->del); TimerNode *tail = atomic_load(&self->new); if (del == tail || timer_date_is_lt(&node->date, &tail->date)) { atomic_store(&self->new, node); timer_wake_worker(self, false); } if (oldhead != newhead) timer_wake_worker(timer); return -1; } void PrrtTimer_end(PrrtTimer *timer) void PrrtTimer_end(PrrtTimer *self) { pthread_cancel(timer->worker); pthread_join(timer->worker, NULL); pthread_mutex_lock(&timer->lock); PrrtTimerListNode *node = timer->list; while (node) { PrrtTimerListNode *next = node->next; free(node); node = next; atomic_store_explicit(&self->alive, false, memory_order_release); timer_wake_worker(self, true); pthread_join(self->worker, NULL); TimerNode *iter = atomic_load(&self->old); while (iter) { TimerNode *next = atomic_load(&iter->next); assert((iter->done || timer_date_is_inf(&iter->date)) && "cleanup task that is not marked as done"); free(iter); iter = next; } pthread_mutex_unlock(&timer->lock); pthread_mutex_destroy(&timer->lock); free(self); } prrt/proto/timer.h +10 −17 Original line number Diff line number Diff line Loading @@ -6,28 +6,21 @@ #include "../util/futex.h" #include "../util/list.h" typedef void *prrtTimerTaskArg; typedef void (*prrtTimerTaskFun)(prrtTimerTaskArg); typedef struct timespec prrtTimerDate; typedef struct prrtTimerTask { void (*fun)(void *); void *arg; prrtTimerTaskFun fun; prrtTimerTaskArg arg; } PrrtTimerTask; typedef struct prrtTimerListNode { struct prrtTimerListNode *next; struct timespec time; PrrtTimerTask task; } PrrtTimerListNode; typedef struct prrtTimer { pthread_t worker; pthread_mutex_t lock; PrrtTimerListNode *list; atomic_uint wake; unsigned long long resolution; } PrrtTimer; typedef struct prrtTimer PrrtTimer; PrrtTimer *PrrtTimer_create(void); PrrtTimer *PrrtTimer_create(unsigned int core); void *PrrtTimer_submit(PrrtTimer *timer, struct timespec *time, const PrrtTimerTask *task); int PrrtTimer_submit(PrrtTimer *timer, const prrtTimerDate *when, const PrrtTimerTask *what); void PrrtTimer_end(PrrtTimer *timer); Loading Loading
prrt/proto/timer.c +361 −135 Original line number Diff line number Diff line #include "timer.h" #include <assert.h> #include <errno.h> #include <pthread.h> #include <sched.h> #include <stdatomic.h> #include <stdbool.h> #include <stdlib.h> #include <string.h> #include <stdbool.h> #include <stdio.h> #include <time.h> #include <limits.h> #include <linux/futex.h> #include <sys/time.h> #include <sys/syscall.h> #include <sys/prctl.h> #include <unistd.h> typedef prrtTimerDate TimerDate; typedef prrtTimerTaskFun TimerTaskFun; typedef prrtTimerTaskArg TimerTaskArg; static unsigned long long timer_get_resolution(void) { #define N 100 struct timespec last; struct timespec iter; typedef unsigned long long TimerDateUDiff_t; #define TimerDateUDiff_MAX ULLONG_MAX clock_gettime(CLOCK_MONOTONIC, &last); unsigned long long sum = 0; #define NSEC_PER_SEC 1000000000 for (int n = 0; n < N; n++) { clock_gettime(CLOCK_MONOTONIC, &iter); sum += (iter.tv_sec - last.tv_sec) * 1000000000ULL + (iter.tv_nsec - last.tv_nsec); memcpy(&last, &iter, sizeof(struct timespec)); static inline void timer_date_make_inf(TimerDate *td) { td->tv_sec = td->tv_nsec = 0; } return sum / N; static inline bool timer_date_is_inf(const TimerDate *td) { return !td->tv_sec && !td->tv_nsec; } static bool timeout_is_imminent(PrrtTimer *timer, PrrtTimerListNode *node) static inline TimerDateUDiff_t timer_duration_finite(const TimerDate *t1, const TimerDate *t2) { struct timespec now; clock_gettime(CLOCK_MONOTONIC, &now); // assume that t1 and t2 are finite return (t2->tv_sec - t1->tv_sec) * NSEC_PER_SEC + (t2->tv_nsec - t1->tv_nsec); } static inline void timer_date_add(TimerDate *td, TimerDateUDiff_t nsec) { assert(td->tv_nsec >= 0 && "negative time"); while (td->tv_nsec >= NSEC_PER_SEC) { td->tv_sec++; td->tv_nsec -= NSEC_PER_SEC; } td->tv_sec += nsec / NSEC_PER_SEC; td->tv_nsec += nsec % NSEC_PER_SEC; if (td->tv_nsec >= NSEC_PER_SEC) { td->tv_sec++; td->tv_nsec -= NSEC_PER_SEC; } } struct timespec when = node->time; static inline void timer_date_sub(TimerDate *td, TimerDateUDiff_t nsec) { assert(td->tv_nsec >= 0 && "negative time"); td->tv_sec -= nsec / NSEC_PER_SEC; nsec %= NSEC_PER_SEC; if ((TimerDateUDiff_t) td->tv_nsec < nsec) { td->tv_sec--; td->tv_nsec += NSEC_PER_SEC; } td->tv_nsec -= nsec; } #define EPSILON (timer->resolution) static inline bool timer_date_is_lt(const TimerDate *ta, const TimerDate *tb) { if (timer_date_is_inf(ta)) return false; if (timer_date_is_inf(tb)) return true; uint64_t delta = 0; // TODO: is integer overflow relevant here? if (when.tv_sec > now.tv_sec) { if (ta->tv_sec < tb->tv_sec) return true; } else if (when.tv_sec == now.tv_sec) { if (when.tv_nsec > now.tv_nsec) if (ta->tv_sec > tb->tv_sec) return false; if (ta->tv_nsec < tb->tv_nsec) return true; delta = now.tv_nsec - when.tv_nsec; } else if (when.tv_sec == now.tv_sec + 1){ delta = 1000000000ULL + (now.tv_nsec - when.tv_nsec); } else { if (ta->tv_nsec > tb->tv_nsec) return false; return false; } return delta < EPSILON; static inline bool timer_date_eq(const TimerDate *ta, const TimerDate *tb) { return ta->tv_sec == tb->tv_sec && ta->tv_nsec == tb->tv_nsec; } #define CMP_EARLIER (-1) #define CMP_EQUAL (0) #define CMP_LATER (+1) static inline TimerDateUDiff_t timer_measure_clock_precision_round(void) { TimerDate a,b; clock_gettime(CLOCK_REALTIME, &a); do { clock_gettime(CLOCK_REALTIME, &b); } while (0 == timer_duration_finite(&a, &b)); return timer_duration_finite(&a, &b); } static int timeout_compare(PrrtTimerListNode *n1, PrrtTimerListNode *n2) static TimerDateUDiff_t timer_measure_clock_precision(void) { const unsigned int ROUNDS = 10; TimerDateUDiff_t sum = 0; for (int r = 0; r < ROUNDS; r++) sum += timer_measure_clock_precision_round(); return (sum + ROUNDS / 2) / ROUNDS; } struct timespec t1 = n1->time; struct timespec t2 = n2->time; typedef struct prrtTimerNode { _Atomic(struct prrtTimerNode *) next; atomic_bool done; TimerDate date; TimerTaskArg arg; TimerTaskFun fun; } TimerNode; #define OSP_WINDOW_SIZE 8 struct prrtTimer { pthread_t worker; atomic_bool alive; atomic_int wait; _Atomic(TimerNode *) new; _Atomic(TimerNode *) old; _Atomic(TimerNode *) del; TimerDateUDiff_t precision; TimerDateUDiff_t lcp; TimerDateUDiff_t osp; TimerDateUDiff_t osp_window[OSP_WINDOW_SIZE]; unsigned int osp_idx; }; typedef struct prrtTimer Timer; static bool timer_date_is_due(Timer *self, TimerDate *when, const TimerDate *now) { // TODO: use self->precision to check whether now and *when are similar enough (void) self; return timer_date_is_lt(when, now); } static void compute_sleep_end(Timer *self, TimerDate *out, const TimerDate *end) { *out = *end; if (timer_date_is_inf(end)) return; if (t1.tv_sec > t2.tv_sec) return CMP_LATER; if (t1.tv_sec < t2.tv_sec) return CMP_EARLIER; if (t1.tv_nsec > t2.tv_nsec) return CMP_LATER; if (t1.tv_nsec < t2.tv_nsec) return CMP_EARLIER; return CMP_EQUAL; // is is okay when the computed sleep end is in the past timer_date_sub(out, self->osp + self->lcp + 2 * self->precision); } static bool compute_sleep_timeout(PrrtTimer *timer, struct timespec *timeout, struct timespec *sleep) static inline void learn_osp(Timer *self, TimerDateUDiff_t new) { struct timespec now; clock_gettime(CLOCK_MONOTONIC, &now); if (timeout->tv_sec < now.tv_sec) return false; if (timeout->tv_sec == now.tv_sec && timeout->tv_nsec < now.tv_nsec) return false; unsigned long long delta = (timeout->tv_sec - now.tv_sec) * 1000000000ULL + (timeout->tv_nsec - now.tv_nsec); unsigned int idx = self->osp_idx; TimerDateUDiff_t old = self->osp_window[idx]; (void) old; self->osp_window[idx] = new; if (delta < timer->resolution) return false; TimerDateUDiff_t max = self->osp_window[0]; for (int i = 1; i < OSP_WINDOW_SIZE; i++) max = self->osp_window[i] > max ? self->osp_window[i] : max; self->osp = max; delta -= timer->resolution; self->osp_idx = (idx + 1) & (OSP_WINDOW_SIZE - 1); } sleep->tv_sec = delta / 1000000000ULL; sleep->tv_nsec = delta % 1000000000ULL; return true; static inline void learn_lcp(Timer *self, TimerDateUDiff_t new) { TimerDateUDiff_t old = self->lcp; self->lcp = (3 * old + new) / 4; } static void task_run(PrrtTimerTask *task) #define futex(...) syscall(SYS_futex, __VA_ARGS__) static void timer_wait_imprecise(Timer *self, const TimerDate *end) { task->fun(task->arg); bool forever = timer_date_is_inf(end); futex(&self->wait, FUTEX_WAIT_BITSET|FUTEX_PRIVATE_FLAG|FUTEX_CLOCK_REALTIME, 1, forever ? NULL : end, NULL, FUTEX_BITSET_MATCH_ANY); } static void *timer_loop(void *arg) static void timer_wake_worker(PrrtTimer *self, bool force) { PrrtTimer *timer = (PrrtTimer *) arg; int one = 1; if (atomic_compare_exchange_strong(&self->wait, &one, 0) || force) futex(&self->wait, FUTEX_WAKE|FUTEX_PRIVATE_FLAG, 1, NULL, NULL, 0); } unsigned int oval = atomic_load_explicit(&timer->wake, memory_order_acquire); static void *timer_worker_loop(void *arg) { PrrtTimer *self = (PrrtTimer *) arg; pthread_mutex_lock(&timer->lock); PrrtTimerListNode *item = timer->list; pthread_mutex_unlock(&timer->lock); bool slept, learned; while (true) { if (!item) { futex_wait((atomic_int *) &timer->wake, oval); oval = atomic_load_explicit(&timer->wake, memory_order_acquire); while (1) { loop:; atomic_store(&self->wait, 1); TimerNode *task = atomic_load(&self->new); assert(task != NULL && "task list contains NULL node"); if (timer_date_is_inf(&task->date)) { if (!atomic_load_explicit(&self->alive, memory_order_acquire)) { if (task == atomic_load(&self->new)) break; } } TimerDate sleep_end; compute_sleep_end(self, &sleep_end, &task->date); TimerDate now, td1, td2; TimerDateUDiff_t oversleep = 0; slept = false; learned = false; clock_gettime(CLOCK_REALTIME, &now); if (timer_date_is_lt(&now, &sleep_end)) { slept = true; timer_wait_imprecise(self, &sleep_end); clock_gettime(CLOCK_REALTIME, &td1); if (timer_date_is_lt(&td1, &sleep_end)) goto loop; oversleep = timer_duration_finite(&sleep_end, &td1); TimerDateUDiff_t avail = timer_duration_finite(&td1, &task->date); if (timer_date_is_lt(&td1, &task->date) && avail >= 2 * self->lcp) { learn_osp(self, oversleep); learned = true; clock_gettime(CLOCK_REALTIME, &td2); learn_lcp(self, timer_duration_finite(&td1, &td2)); } } pthread_mutex_lock(&timer->lock); item = timer->list; pthread_mutex_unlock(&timer->lock); if (timeout_is_imminent(timer, item)) { task_run(&item->task); atomic_store(&self->wait, 0); pthread_mutex_lock(&timer->lock); PrrtTimerListNode *next = item->next; timer->list = next; pthread_mutex_unlock(&timer->lock); while (true) { if (timer_date_is_due(self, &task->date, &now)) break; if (task != atomic_load(&self->new)) goto loop; clock_gettime(CLOCK_REALTIME, &now); } free(item); item = next; if (!task->done) { task->fun(task->arg); task->done = true; } if (item) { struct timespec sleep; if (compute_sleep_timeout(timer, &item->time, &sleep)) futex_wait_until((atomic_int *) &timer->wake, oval, &sleep); oval = atomic_load_explicit(&timer->wake, memory_order_acquire); atomic_store(&self->del, task); TimerNode *next = atomic_load(&task->next); TimerNode *temp = task; atomic_compare_exchange_strong(&self->new, &temp, next); atomic_store(&self->del, NULL); if (slept && !learned) { clock_gettime(CLOCK_REALTIME, &td1); learn_osp(self, oversleep); clock_gettime(CLOCK_REALTIME, &td2); learn_lcp(self, timer_duration_finite(&td1, &td2)); } if (!slept) { for (int i = 0; i < OSP_WINDOW_SIZE; i++) self->osp_window[i] = self->osp_window[i] / 4 * 3; self->osp = self->osp / 4 * 3; } return NULL; } static void timer_wake_worker(PrrtTimer *timer) { atomic_fetch_add_explicit(&timer->wake, 1, memory_order_acq_rel); futex_wake_one((atomic_int *) &timer->wake); return self; } PrrtTimer *PrrtTimer_create(void) PrrtTimer *PrrtTimer_create(unsigned int core) { PrrtTimer *timer = malloc(sizeof(PrrtTimer)); assert(timer); int err; PrrtTimer *self = malloc(sizeof(PrrtTimer)); if (!self) return NULL; int failed = pthread_create(&timer->worker, NULL, timer_loop, timer); assert(!failed); // create dummy node TimerNode *node = malloc(sizeof(TimerNode)); if (!node) { free(self); return NULL; } timer_date_make_inf(&node->date); atomic_store_explicit(&node->done, false, memory_order_relaxed); atomic_store_explicit(&node->next, NULL, memory_order_relaxed); atomic_store_explicit(&self->alive, true, memory_order_relaxed); atomic_store_explicit(&self->wait, 0, memory_order_relaxed); atomic_store_explicit(&self->new, node, memory_order_relaxed); atomic_store_explicit(&self->old, node, memory_order_relaxed); atomic_store_explicit(&self->del, NULL, memory_order_relaxed); self->precision = timer_measure_clock_precision(); for (int i = 0; i < OSP_WINDOW_SIZE; i++) self->osp_window[i] = self->precision; self->osp = self->precision; self->lcp = self->precision; self->osp_idx = 0; atomic_thread_fence(memory_order_release); // start worker thread pthread_attr_t attr; err = pthread_attr_init(&attr); if (err) { free(node); free(self); errno = err; return NULL; } timer->resolution = timer_get_resolution(); cpu_set_t set; CPU_ZERO(&set); CPU_SET(core, &set); pthread_mutex_init(&timer->lock, NULL); timer->list = NULL; atomic_store_explicit(&timer->wake, 0, memory_order_release); return timer; err = pthread_attr_setaffinity_np(&attr, sizeof(set), &set); if (err) { free(node); free(self); pthread_attr_destroy(&attr); errno = err; return NULL; } void PrrtTimer_submit(PrrtTimer *timer, struct timespec *time, const PrrtTimerTask *task) { PrrtTimerListNode *node = malloc(sizeof(PrrtTimerListNode)); assert(node); err = pthread_attr_setschedpolicy(&attr, SCHED_FIFO); if (err) { free(node); free(self); pthread_attr_destroy(&attr); errno = err; return NULL; } memcpy(&node->task, task, sizeof(PrrtTimerTask)); memcpy(&node->time, time, sizeof(struct timespec)); err = pthread_create(&self->worker, &attr, timer_worker_loop, self); if (err) { free(node); free(self); pthread_attr_destroy(&attr); errno = err; return NULL; } pthread_mutex_lock(&timer->lock); PrrtTimerListNode *oldhead = timer->list; pthread_attr_destroy(&attr); return self; } PrrtTimerListNode **addr = &timer->list; while (true) { PrrtTimerListNode *iter = *addr; if (!iter) break; if (CMP_EARLIER != timeout_compare(node, iter)) int PrrtTimer_submit(PrrtTimer *self, const TimerDate *when, const PrrtTimerTask *what) { TimerNode *iter, *stop, *next; TimerNode *node = malloc(sizeof(TimerNode)); if (!node) return -1; node->done = false; node->arg = what->arg; node->fun = what->fun; node->date = *when; // fix the date, if needed if (timer_date_is_inf(&node->date)) timer_date_add(&node->date, 1); iter = atomic_load(&self->old); stop = atomic_load(&self->new); while (iter != stop) { next = iter->next; assert(iter->done && "cleanup task that is not marked as done"); free(iter); iter = next; } atomic_store(&self->old, stop); _Atomic(TimerNode *) *addr = &self->old; while (1) { iter = atomic_load(addr); assert(iter && "unexpected NULL pointer in task list"); // make sure every date is unique if (timer_date_eq(&node->date, &iter->date)) { timer_date_add(&node->date, 1); if (timer_date_is_inf(&node->date)) timer_date_add(&node->date, 1); } else if (timer_date_is_lt(&node->date, &iter->date)) { break; } addr = &iter->next; } PrrtTimerListNode *next = (*addr)->next; node->next = next; (*addr)->next = node; atomic_store(&node->next, iter); atomic_store(addr, node); PrrtTimerListNode *newhead = timer->list; pthread_mutex_unlock(&timer->lock); TimerNode *del = atomic_load(&self->del); TimerNode *tail = atomic_load(&self->new); if (del == tail || timer_date_is_lt(&node->date, &tail->date)) { atomic_store(&self->new, node); timer_wake_worker(self, false); } if (oldhead != newhead) timer_wake_worker(timer); return -1; } void PrrtTimer_end(PrrtTimer *timer) void PrrtTimer_end(PrrtTimer *self) { pthread_cancel(timer->worker); pthread_join(timer->worker, NULL); pthread_mutex_lock(&timer->lock); PrrtTimerListNode *node = timer->list; while (node) { PrrtTimerListNode *next = node->next; free(node); node = next; atomic_store_explicit(&self->alive, false, memory_order_release); timer_wake_worker(self, true); pthread_join(self->worker, NULL); TimerNode *iter = atomic_load(&self->old); while (iter) { TimerNode *next = atomic_load(&iter->next); assert((iter->done || timer_date_is_inf(&iter->date)) && "cleanup task that is not marked as done"); free(iter); iter = next; } pthread_mutex_unlock(&timer->lock); pthread_mutex_destroy(&timer->lock); free(self); }
prrt/proto/timer.h +10 −17 Original line number Diff line number Diff line Loading @@ -6,28 +6,21 @@ #include "../util/futex.h" #include "../util/list.h" typedef void *prrtTimerTaskArg; typedef void (*prrtTimerTaskFun)(prrtTimerTaskArg); typedef struct timespec prrtTimerDate; typedef struct prrtTimerTask { void (*fun)(void *); void *arg; prrtTimerTaskFun fun; prrtTimerTaskArg arg; } PrrtTimerTask; typedef struct prrtTimerListNode { struct prrtTimerListNode *next; struct timespec time; PrrtTimerTask task; } PrrtTimerListNode; typedef struct prrtTimer { pthread_t worker; pthread_mutex_t lock; PrrtTimerListNode *list; atomic_uint wake; unsigned long long resolution; } PrrtTimer; typedef struct prrtTimer PrrtTimer; PrrtTimer *PrrtTimer_create(void); PrrtTimer *PrrtTimer_create(unsigned int core); void *PrrtTimer_submit(PrrtTimer *timer, struct timespec *time, const PrrtTimerTask *task); int PrrtTimer_submit(PrrtTimer *timer, const prrtTimerDate *when, const PrrtTimerTask *what); void PrrtTimer_end(PrrtTimer *timer); Loading