Skip to content
Commits on Source (17)
...@@ -65,6 +65,7 @@ test:prrt_functional: ...@@ -65,6 +65,7 @@ test:prrt_functional:
tags: tags:
- bash - bash
script: script:
- rm CMakeCache.txt
- CC=gcc-5 CXX=g++-5 cmake . -DPRRT_TESTS=1 - CC=gcc-5 CXX=g++-5 cmake . -DPRRT_TESTS=1
- make - make
- exec ./prrtTests - exec ./prrtTests
......
[submodule "prrt/xlap"] [submodule "prrt/xlap"]
path = prrt/xlap path = prrt/xlap
url = ../../as/X-Lap.git url = ../X-Lap.git
...@@ -9,5 +9,5 @@ s = prrt.PrrtSocket(("0.0.0.0", localport), maximum_payload_size=150) ...@@ -9,5 +9,5 @@ s = prrt.PrrtSocket(("0.0.0.0", localport), maximum_payload_size=150)
s.connect((host, port)) s.connect((host, port))
for i in range(10): for i in range(10):
s.send("Packet {}".format(i).encode("utf8")) s.send_sync("Packet {}".format(i).encode("utf8"))
s.send("Close".encode("utf8")) s.send_sync("Close".encode("utf8"))
...@@ -181,6 +181,8 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { ...@@ -181,6 +181,8 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
return false; return false;
} }
void block_timeout(void* arg);
typedef struct timer_arg { typedef struct timer_arg {
PrrtSocket* socket; PrrtSocket* socket;
PrrtBlock* block; PrrtBlock* block;
...@@ -261,9 +263,10 @@ void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet) { ...@@ -261,9 +263,10 @@ void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet) {
RetransmissionTimerArgs* args = (RetransmissionTimerArgs*) calloc(1, sizeof(RetransmissionTimerArgs)); RetransmissionTimerArgs* args = (RetransmissionTimerArgs*) calloc(1, sizeof(RetransmissionTimerArgs));
args->block = sock_ptr->receiveBlock; args->block = sock_ptr->receiveBlock;
sock_ptr->receiveBlock = NULL;
args->socket = sock_ptr; args->socket = sock_ptr;
retransmission_round_handler(args); retransmission_round_handler(args);
sock_ptr->receiveBlock = NULL;
} }
} else { } else {
PrrtPacket_destroy(packet); PrrtPacket_destroy(packet);
......
...@@ -97,6 +97,8 @@ PrrtSocket *PrrtSocket_create(prrtByteCount_t maximum_payload_size, prrtTimedelt ...@@ -97,6 +97,8 @@ PrrtSocket *PrrtSocket_create(prrtByteCount_t maximum_payload_size, prrtTimedelt
atomic_store_explicit(&s->closing, false, memory_order_release); atomic_store_explicit(&s->closing, false, memory_order_release);
s->receiver = NULL; s->receiver = NULL;
s->retransmissionTimer = PrrtTimer_create(0);
uint8_t n_cycle[1] = { N_START - K_START }; uint8_t n_cycle[1] = { N_START - K_START };
s->codingParameters = PrrtCodingConfiguration_create(K_START, N_START, 1, n_cycle); s->codingParameters = PrrtCodingConfiguration_create(K_START, N_START, 1, n_cycle);
s->coder = PrrtCoder_create(s->codingParameters); s->coder = PrrtCoder_create(s->codingParameters);
......
...@@ -100,7 +100,6 @@ typedef struct prrtSocket { ...@@ -100,7 +100,6 @@ typedef struct prrtSocket {
atomic_bool isThreadPinning; atomic_bool isThreadPinning;
prrtByteCount_t maximum_payload_size; prrtByteCount_t maximum_payload_size;
PrrtTimer *retransmissionTimer; PrrtTimer *retransmissionTimer;
} PrrtSocket; } PrrtSocket;
......
...@@ -133,7 +133,7 @@ struct prrtTimer { ...@@ -133,7 +133,7 @@ struct prrtTimer {
atomic_int wait; atomic_int wait;
_Atomic(TimerNode *) new; _Atomic(TimerNode *) new;
_Atomic(TimerNode *) old; _Atomic(TimerNode *) old;
_Atomic(TimerNode *) del; _Atomic(TimerNode *) cur;
TimerDateUDiff_t precision; TimerDateUDiff_t precision;
TimerDateUDiff_t lcp; TimerDateUDiff_t lcp;
TimerDateUDiff_t osp; TimerDateUDiff_t osp;
...@@ -143,7 +143,7 @@ struct prrtTimer { ...@@ -143,7 +143,7 @@ struct prrtTimer {
typedef struct prrtTimer Timer; typedef struct prrtTimer Timer;
static bool timer_date_is_due(Timer *self, TimerDate *when, const TimerDate *now) static bool timer_date_is_due(Timer *self, const TimerDate *when, const TimerDate *now)
{ {
// TODO: use self->precision to check whether now and *when are similar enough // TODO: use self->precision to check whether now and *when are similar enough
(void) self; (void) self;
...@@ -208,6 +208,8 @@ static void *timer_worker_loop(void *arg) ...@@ -208,6 +208,8 @@ static void *timer_worker_loop(void *arg)
TimerNode *task = atomic_load(&self->new); TimerNode *task = atomic_load(&self->new);
assert(task != NULL && "task list contains NULL node"); assert(task != NULL && "task list contains NULL node");
atomic_store(&self->cur, task);
if (timer_date_is_inf(&task->date)) { if (timer_date_is_inf(&task->date)) {
if (!atomic_load_explicit(&self->alive, memory_order_acquire)) { if (!atomic_load_explicit(&self->alive, memory_order_acquire)) {
if (task == atomic_load(&self->new)) if (task == atomic_load(&self->new))
...@@ -260,11 +262,9 @@ static void *timer_worker_loop(void *arg) ...@@ -260,11 +262,9 @@ static void *timer_worker_loop(void *arg)
task->done = true; task->done = true;
} }
atomic_store(&self->del, task);
TimerNode *next = atomic_load(&task->next); TimerNode *next = atomic_load(&task->next);
TimerNode *temp = task; TimerNode *temp = task;
atomic_compare_exchange_strong(&self->new, &temp, next); atomic_compare_exchange_strong(&self->new, &temp, next);
atomic_store(&self->del, NULL);
if (slept && !learned) { if (slept && !learned) {
clock_gettime(CLOCK_REALTIME, &td1); clock_gettime(CLOCK_REALTIME, &td1);
...@@ -306,7 +306,7 @@ PrrtTimer *PrrtTimer_create(unsigned int core) ...@@ -306,7 +306,7 @@ PrrtTimer *PrrtTimer_create(unsigned int core)
atomic_store_explicit(&self->wait, 0, memory_order_relaxed); atomic_store_explicit(&self->wait, 0, memory_order_relaxed);
atomic_store_explicit(&self->new, node, memory_order_relaxed); atomic_store_explicit(&self->new, node, memory_order_relaxed);
atomic_store_explicit(&self->old, node, memory_order_relaxed); atomic_store_explicit(&self->old, node, memory_order_relaxed);
atomic_store_explicit(&self->del, NULL, memory_order_relaxed); atomic_store_explicit(&self->cur, node, memory_order_relaxed);
self->precision = timer_measure_clock_precision(); self->precision = timer_measure_clock_precision();
for (int i = 0; i < OSP_WINDOW_SIZE; i++) for (int i = 0; i < OSP_WINDOW_SIZE; i++)
...@@ -364,7 +364,7 @@ PrrtTimer *PrrtTimer_create(unsigned int core) ...@@ -364,7 +364,7 @@ PrrtTimer *PrrtTimer_create(unsigned int core)
int PrrtTimer_submit(PrrtTimer *self, const TimerDate *when, const PrrtTimerTask *what) int PrrtTimer_submit(PrrtTimer *self, const TimerDate *when, const PrrtTimerTask *what)
{ {
TimerNode *iter, *stop, *next; TimerNode *iter, *stop, *next, *hold;
TimerNode *node = malloc(sizeof(TimerNode)); TimerNode *node = malloc(sizeof(TimerNode));
if (!node) if (!node)
return -1; return -1;
...@@ -380,13 +380,14 @@ int PrrtTimer_submit(PrrtTimer *self, const TimerDate *when, const PrrtTimerTask ...@@ -380,13 +380,14 @@ int PrrtTimer_submit(PrrtTimer *self, const TimerDate *when, const PrrtTimerTask
iter = atomic_load(&self->old); iter = atomic_load(&self->old);
stop = atomic_load(&self->new); stop = atomic_load(&self->new);
while (iter != stop) { hold = atomic_load(&self->cur);
while (iter != stop && iter != hold) {
next = iter->next; next = iter->next;
assert(iter->done && "cleanup task that is not marked as done"); assert(iter->done && "cleanup task that is not marked as done");
free(iter); free(iter);
iter = next; iter = next;
} }
atomic_store(&self->old, stop); atomic_store(&self->old, iter);
_Atomic(TimerNode *) *addr = &self->old; _Atomic(TimerNode *) *addr = &self->old;
while (1) { while (1) {
...@@ -407,9 +408,8 @@ int PrrtTimer_submit(PrrtTimer *self, const TimerDate *when, const PrrtTimerTask ...@@ -407,9 +408,8 @@ int PrrtTimer_submit(PrrtTimer *self, const TimerDate *when, const PrrtTimerTask
atomic_store(&node->next, iter); atomic_store(&node->next, iter);
atomic_store(addr, node); atomic_store(addr, node);
TimerNode *del = atomic_load(&self->del);
TimerNode *tail = atomic_load(&self->new); TimerNode *tail = atomic_load(&self->new);
if (del == tail || timer_date_is_lt(&node->date, &tail->date)) { if (timer_date_is_lt(&node->date, &tail->date) || (addr == &tail->next && atomic_load(&tail->done))) {
atomic_store(&self->new, node); atomic_store(&self->new, node);
timer_wake_worker(self, false); timer_wake_worker(self, false);
} }
...@@ -417,6 +417,50 @@ int PrrtTimer_submit(PrrtTimer *self, const TimerDate *when, const PrrtTimerTask ...@@ -417,6 +417,50 @@ int PrrtTimer_submit(PrrtTimer *self, const TimerDate *when, const PrrtTimerTask
return -1; return -1;
} }
static void wake_sleeping_thread(void *arg)
{
atomic_int *ip = (atomic_int *) arg;
atomic_store_explicit(ip, 1, memory_order_release);
futex((int *) ip, FUTEX_WAKE|FUTEX_PRIVATE_FLAG, 1, NULL, NULL, 0);
}
void PrrtTimer_sleep_until(PrrtTimer *self, const TimerDate *end)
{
atomic_int cond;
atomic_store_explicit(&cond, 0, memory_order_release);
TimerDate now;
TimerDate care = *end;
PrrtTimerTask what;
what.fun = wake_sleeping_thread;
what.arg = &cond;
timer_date_sub(&care, 2*self->osp);
clock_gettime(CLOCK_REALTIME, &now);
if (!timer_date_is_due(self, &care, &now)) {
PrrtTimer_submit(self, &care, &what);
while (!atomic_load(&cond)) {
clock_gettime(CLOCK_REALTIME, &now);
if (!timer_date_is_due(self, &care, &now))
//futex(&cond, FUTEX_WAIT_BITSET|FUTEX_PRIVATE_FLAG|FUTEX_CLOCK_REALTIME, 1, care, NULL, FUTEX_BITSET_MATCH_ANY);
futex(&cond, FUTEX_WAIT|FUTEX_PRIVATE_FLAG, 0, NULL, NULL, 0);
}
}
while (1) {
clock_gettime(CLOCK_REALTIME, &now);
if (timer_date_is_due(self, end, &now))
break;
}
}
void PrrtTimer_sleep_nanos(PrrtTimer *self, TimerDateUDiff_t nanos)
{
TimerDate when;
clock_gettime(CLOCK_REALTIME, &when);
timer_date_add(&when, nanos);
PrrtTimer_sleep_until(self, &when);
}
void PrrtTimer_end(PrrtTimer *self) void PrrtTimer_end(PrrtTimer *self)
{ {
atomic_store_explicit(&self->alive, false, memory_order_release); atomic_store_explicit(&self->alive, false, memory_order_release);
......
...@@ -10,6 +10,7 @@ typedef void *prrtTimerTaskArg; ...@@ -10,6 +10,7 @@ typedef void *prrtTimerTaskArg;
typedef void (*prrtTimerTaskFun)(prrtTimerTaskArg); typedef void (*prrtTimerTaskFun)(prrtTimerTaskArg);
typedef struct timespec prrtTimerDate; typedef struct timespec prrtTimerDate;
typedef unsigned long long TimerDateUDiff_t;
typedef struct prrtTimerTask { typedef struct prrtTimerTask {
prrtTimerTaskFun fun; prrtTimerTaskFun fun;
...@@ -24,4 +25,7 @@ int PrrtTimer_submit(PrrtTimer *timer, const prrtTimerDate *when, const PrrtTime ...@@ -24,4 +25,7 @@ int PrrtTimer_submit(PrrtTimer *timer, const prrtTimerDate *when, const PrrtTime
void PrrtTimer_end(PrrtTimer *timer); void PrrtTimer_end(PrrtTimer *timer);
void PrrtTimer_sleep_until(PrrtTimer *self, const prrtTimerDate *end);
void PrrtTimer_sleep_nanos(PrrtTimer *self, TimerDateUDiff_t nanos);
#endif // PRRT_TIMER_H #endif // PRRT_TIMER_H
...@@ -43,3 +43,17 @@ int pin_thread_to_core(pthread_attr_t *ap, int core) ...@@ -43,3 +43,17 @@ int pin_thread_to_core(pthread_attr_t *ap, int core)
return pthread_attr_setaffinity_np(ap, sizeof(cpu_set_t), &cpuset); return pthread_attr_setaffinity_np(ap, sizeof(cpu_set_t), &cpuset);
} }
struct timespec abstime_from_now(uint32_t wait_time) {
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
struct timespec deadline;
uint32_t diff_s = wait_time / 1000000;
uint32_t diff_ns = (wait_time % 1000000) * 1000;
__syscall_slong_t sum = diff_ns + now.tv_nsec;
__syscall_slong_t carry = (sum) / 1000000000;
__syscall_slong_t rest = (sum) % 1000000000;
deadline.tv_sec = diff_s + now.tv_sec + carry;
deadline.tv_nsec = rest;
return deadline;
}
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
int print_buffer(const char *buf, const int length); int print_buffer(const char *buf, const int length);
void print_gf(const gf *start, const int len); void print_gf(const gf *start, const int len);
int pin_thread_to_core(pthread_attr_t *ap, int core); int pin_thread_to_core(pthread_attr_t *ap, int core);
struct timespec abstime_from_now(uint32_t wait_time);
#define PERROR(fmt, args...) \ #define PERROR(fmt, args...) \
printf("PRRT ERROR (" __FILE__ ":%d)\n" fmt, __LINE__, ## args); printf("PRRT ERROR (" __FILE__ ":%d)\n" fmt, __LINE__, ## args);
......
...@@ -15,21 +15,6 @@ struct timespec abstime_now() { ...@@ -15,21 +15,6 @@ struct timespec abstime_now() {
return now; return now;
} }
struct timespec abstime_from_now(uint32_t wait_time) {
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
struct timespec deadline;
uint32_t diff_s = wait_time / 1000000;
uint32_t diff_ns = (wait_time % 1000000) * 1000;
__syscall_slong_t sum = diff_ns + now.tv_nsec;
__syscall_slong_t carry = (sum) / 1000000000;
__syscall_slong_t rest = (sum) % 1000000000;
deadline.tv_sec = diff_s + now.tv_sec + carry;
deadline.tv_nsec = rest;
return deadline;
}
// < 0: a less than b (b is in the future) // < 0: a less than b (b is in the future)
// > 0: a greater b (b is in the past) // > 0: a greater b (b is in the past)
// == 0: a equal b // == 0: a equal b
......
Subproject commit b87b0b7953c279c4acde42f2b3535c309048d8ab Subproject commit 956abea52c2a88e6d5124a5bf04289a1c2563ca4