Commit 3063e8bc authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Merge remote-tracking branch 'origin/timer' into develop

parents 6c34299c def4cf26
Loading
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -9,5 +9,5 @@ s = prrt.PrrtSocket(("0.0.0.0", localport), maximum_payload_size=150)
s.connect((host, port))

for i in range(10):
    s.send("Packet {}".format(i).encode("utf8"))
s.send("Close".encode("utf8"))
    s.send_sync("Packet {}".format(i).encode("utf8"))
s.send_sync("Close".encode("utf8"))
+4 −1
Original line number Diff line number Diff line
@@ -181,6 +181,8 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
    return false;
}

void block_timeout(void* arg);

typedef struct timer_arg {
    PrrtSocket* socket;
    PrrtBlock* block;
@@ -261,9 +263,10 @@ void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet) {

            RetransmissionTimerArgs* args = (RetransmissionTimerArgs*) calloc(1, sizeof(RetransmissionTimerArgs));
            args->block = sock_ptr->receiveBlock;
            sock_ptr->receiveBlock = NULL;
            args->socket = sock_ptr;
            retransmission_round_handler(args);

            sock_ptr->receiveBlock = NULL;
        }
    } else {
        PrrtPacket_destroy(packet);
+2 −0
Original line number Diff line number Diff line
@@ -97,6 +97,8 @@ PrrtSocket *PrrtSocket_create(prrtByteCount_t maximum_payload_size, prrtTimedelt
    atomic_store_explicit(&s->closing, false, memory_order_release);
    s->receiver = NULL;

    s->retransmissionTimer = PrrtTimer_create(0);

    uint8_t n_cycle[1] = { N_START - K_START };
    s->codingParameters = PrrtCodingConfiguration_create(K_START, N_START, 1, n_cycle);
    s->coder = PrrtCoder_create(s->codingParameters);
+0 −1
Original line number Diff line number Diff line
@@ -100,7 +100,6 @@ typedef struct prrtSocket {

    atomic_bool isThreadPinning;
    prrtByteCount_t maximum_payload_size;

    PrrtTimer *retransmissionTimer;
} PrrtSocket;

+54 −10
Original line number Diff line number Diff line
@@ -133,7 +133,7 @@ struct prrtTimer {
	atomic_int           wait;
	_Atomic(TimerNode *) new;
	_Atomic(TimerNode *) old;
	_Atomic(TimerNode *) del;
	_Atomic(TimerNode *) cur;
	TimerDateUDiff_t     precision;
	TimerDateUDiff_t     lcp;
	TimerDateUDiff_t     osp;
@@ -143,7 +143,7 @@ struct prrtTimer {

typedef struct prrtTimer Timer;

static bool timer_date_is_due(Timer *self, TimerDate *when, const TimerDate *now)
static bool timer_date_is_due(Timer *self, const TimerDate *when, const TimerDate *now)
{
	// TODO: use self->precision to check whether now and *when are similar enough
	(void) self;
@@ -208,6 +208,8 @@ static void *timer_worker_loop(void *arg)
		TimerNode *task = atomic_load(&self->new);
		assert(task != NULL && "task list contains NULL node");

		atomic_store(&self->cur, task);

		if (timer_date_is_inf(&task->date)) {
			if (!atomic_load_explicit(&self->alive, memory_order_acquire)) {
				if (task == atomic_load(&self->new))
@@ -260,11 +262,9 @@ static void *timer_worker_loop(void *arg)
			task->done = true;
		}

		atomic_store(&self->del, task);
		TimerNode *next = atomic_load(&task->next);
		TimerNode *temp = task;
		atomic_compare_exchange_strong(&self->new, &temp, next);
		atomic_store(&self->del, NULL);

		if (slept && !learned) {
			clock_gettime(CLOCK_REALTIME, &td1);
@@ -306,7 +306,7 @@ PrrtTimer *PrrtTimer_create(unsigned int core)
	atomic_store_explicit(&self->wait,  0,    memory_order_relaxed);
	atomic_store_explicit(&self->new,   node, memory_order_relaxed);
	atomic_store_explicit(&self->old,   node, memory_order_relaxed);
	atomic_store_explicit(&self->del,   NULL, memory_order_relaxed);
	atomic_store_explicit(&self->cur,   node, memory_order_relaxed);

	self->precision = timer_measure_clock_precision();
	for (int i = 0; i < OSP_WINDOW_SIZE; i++)
@@ -364,7 +364,7 @@ PrrtTimer *PrrtTimer_create(unsigned int core)

int PrrtTimer_submit(PrrtTimer *self, const TimerDate *when, const PrrtTimerTask *what)
{
	TimerNode *iter, *stop, *next;
	TimerNode *iter, *stop, *next, *hold;
	TimerNode *node = malloc(sizeof(TimerNode));
	if (!node)
		return -1;
@@ -380,13 +380,14 @@ int PrrtTimer_submit(PrrtTimer *self, const TimerDate *when, const PrrtTimerTask

	iter = atomic_load(&self->old);
	stop = atomic_load(&self->new);
	while (iter != stop) {
	hold = atomic_load(&self->cur);
	while (iter != stop && iter != hold) {
		next = iter->next;
		assert(iter->done && "cleanup task that is not marked as done");
		free(iter);
		iter = next;
	}
	atomic_store(&self->old, stop);
	atomic_store(&self->old, iter);

	_Atomic(TimerNode *) *addr = &self->old;
	while (1) {
@@ -407,9 +408,8 @@ int PrrtTimer_submit(PrrtTimer *self, const TimerDate *when, const PrrtTimerTask
	atomic_store(&node->next, iter);
	atomic_store(addr, node);

	TimerNode *del = atomic_load(&self->del);
	TimerNode *tail = atomic_load(&self->new);
	if (del == tail || timer_date_is_lt(&node->date, &tail->date)) {
	if (timer_date_is_lt(&node->date, &tail->date) || (addr == &tail->next && atomic_load(&tail->done))) {
		atomic_store(&self->new, node);
		timer_wake_worker(self, false);
	}
@@ -417,6 +417,50 @@ int PrrtTimer_submit(PrrtTimer *self, const TimerDate *when, const PrrtTimerTask
	return -1;
}

static void wake_sleeping_thread(void *arg)
{
	atomic_int *ip = (atomic_int *) arg;
	atomic_store_explicit(ip, 1, memory_order_release);
	futex((int *) ip, FUTEX_WAKE|FUTEX_PRIVATE_FLAG, 1, NULL, NULL, 0);
}

void PrrtTimer_sleep_until(PrrtTimer *self, const TimerDate *end)
{
	atomic_int cond;
	atomic_store_explicit(&cond, 0, memory_order_release);

	TimerDate now;
	TimerDate care = *end;
	PrrtTimerTask what;
	what.fun = wake_sleeping_thread;
	what.arg = &cond;
	timer_date_sub(&care, 2*self->osp);
	clock_gettime(CLOCK_REALTIME, &now);
	if (!timer_date_is_due(self, &care, &now)) {
		PrrtTimer_submit(self, &care, &what);
		while (!atomic_load(&cond)) {
			clock_gettime(CLOCK_REALTIME, &now);
			if (!timer_date_is_due(self, &care, &now))
				//futex(&cond, FUTEX_WAIT_BITSET|FUTEX_PRIVATE_FLAG|FUTEX_CLOCK_REALTIME, 1, care, NULL, FUTEX_BITSET_MATCH_ANY);
				futex(&cond, FUTEX_WAIT|FUTEX_PRIVATE_FLAG, 0, NULL, NULL, 0);
		}
	}

	while (1) {
		clock_gettime(CLOCK_REALTIME, &now);
		if (timer_date_is_due(self, end, &now))
			break;
	}
}

void PrrtTimer_sleep_nanos(PrrtTimer *self, TimerDateUDiff_t nanos)
{
	TimerDate when;
	clock_gettime(CLOCK_REALTIME, &when);
	timer_date_add(&when, nanos);
	PrrtTimer_sleep_until(self, &when);
}

void PrrtTimer_end(PrrtTimer *self)
{
	atomic_store_explicit(&self->alive, false, memory_order_release);
Loading