diff --git a/examples/sender.py b/examples/sender.py index 433251186f1a8cd6ddfb1e647460ed1b172f26ad..fc1b98bd74dc652f7b88dab6aefe65261f333b1d 100644 --- a/examples/sender.py +++ b/examples/sender.py @@ -9,5 +9,5 @@ s = prrt.PrrtSocket(("0.0.0.0", localport), maximum_payload_size=150) s.connect((host, port)) for i in range(10): - s.send("Packet {}".format(i).encode("utf8")) -s.send("Close".encode("utf8")) + s.send_sync("Packet {}".format(i).encode("utf8")) +s.send_sync("Close".encode("utf8")) diff --git a/prrt/proto/processes/dataTransmitter.c b/prrt/proto/processes/dataTransmitter.c index f8356954d6bc0a5b52b44999960c7c90602b36e7..957cbb1df70be016db23bfe22a7fbc81c87f903c 100644 --- a/prrt/proto/processes/dataTransmitter.c +++ b/prrt/proto/processes/dataTransmitter.c @@ -181,6 +181,8 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { return false; } +void block_timeout(void* arg); + typedef struct timer_arg { PrrtSocket* socket; PrrtBlock* block; @@ -261,9 +263,10 @@ void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet) { RetransmissionTimerArgs* args = (RetransmissionTimerArgs*) calloc(1, sizeof(RetransmissionTimerArgs)); args->block = sock_ptr->receiveBlock; - sock_ptr->receiveBlock = NULL; args->socket = sock_ptr; retransmission_round_handler(args); + + sock_ptr->receiveBlock = NULL; } } else { PrrtPacket_destroy(packet); diff --git a/prrt/proto/socket.c b/prrt/proto/socket.c index e5e74938a0a8090b255be40a5e40d57a2b3c069a..457af8eda6ab0f3b1e1d6b52ae33ff2add328ad1 100644 --- a/prrt/proto/socket.c +++ b/prrt/proto/socket.c @@ -97,6 +97,8 @@ PrrtSocket *PrrtSocket_create(prrtByteCount_t maximum_payload_size, prrtTimedelt atomic_store_explicit(&s->closing, false, memory_order_release); s->receiver = NULL; + s->retransmissionTimer = PrrtTimer_create(0); + uint8_t n_cycle[1] = { N_START - K_START }; s->codingParameters = PrrtCodingConfiguration_create(K_START, N_START, 1, n_cycle); s->coder = PrrtCoder_create(s->codingParameters); diff --git a/prrt/proto/socket.h b/prrt/proto/socket.h index 7d86d754eae7e1ae3b2a552a9a13e28aeb8a3414..87a297e3e09ae4d7255dcf7f2edfa5faf67c790a 100644 --- a/prrt/proto/socket.h +++ b/prrt/proto/socket.h @@ -100,7 +100,6 @@ typedef struct prrtSocket { atomic_bool isThreadPinning; prrtByteCount_t maximum_payload_size; - PrrtTimer *retransmissionTimer; } PrrtSocket; diff --git a/prrt/proto/timer.c b/prrt/proto/timer.c index 2dc921ced5c8fa7bc52ca1aec652c04b62f12ac4..1ce9a491ab129ef9c95db249495335337b83cf61 100644 --- a/prrt/proto/timer.c +++ b/prrt/proto/timer.c @@ -133,7 +133,7 @@ struct prrtTimer { atomic_int wait; _Atomic(TimerNode *) new; _Atomic(TimerNode *) old; - _Atomic(TimerNode *) del; + _Atomic(TimerNode *) cur; TimerDateUDiff_t precision; TimerDateUDiff_t lcp; TimerDateUDiff_t osp; @@ -143,7 +143,7 @@ struct prrtTimer { typedef struct prrtTimer Timer; -static bool timer_date_is_due(Timer *self, TimerDate *when, const TimerDate *now) +static bool timer_date_is_due(Timer *self, const TimerDate *when, const TimerDate *now) { // TODO: use self->precision to check whether now and *when are similar enough (void) self; @@ -208,6 +208,8 @@ static void *timer_worker_loop(void *arg) TimerNode *task = atomic_load(&self->new); assert(task != NULL && "task list contains NULL node"); + atomic_store(&self->cur, task); + if (timer_date_is_inf(&task->date)) { if (!atomic_load_explicit(&self->alive, memory_order_acquire)) { if (task == atomic_load(&self->new)) @@ -260,11 +262,9 @@ static void *timer_worker_loop(void *arg) task->done = true; } - atomic_store(&self->del, task); TimerNode *next = atomic_load(&task->next); TimerNode *temp = task; atomic_compare_exchange_strong(&self->new, &temp, next); - atomic_store(&self->del, NULL); if (slept && !learned) { clock_gettime(CLOCK_REALTIME, &td1); @@ -306,7 +306,7 @@ PrrtTimer *PrrtTimer_create(unsigned int core) atomic_store_explicit(&self->wait, 0, memory_order_relaxed); atomic_store_explicit(&self->new, node, memory_order_relaxed); atomic_store_explicit(&self->old, node, memory_order_relaxed); - atomic_store_explicit(&self->del, NULL, memory_order_relaxed); + atomic_store_explicit(&self->cur, node, memory_order_relaxed); self->precision = timer_measure_clock_precision(); for (int i = 0; i < OSP_WINDOW_SIZE; i++) @@ -364,7 +364,7 @@ PrrtTimer *PrrtTimer_create(unsigned int core) int PrrtTimer_submit(PrrtTimer *self, const TimerDate *when, const PrrtTimerTask *what) { - TimerNode *iter, *stop, *next; + TimerNode *iter, *stop, *next, *hold; TimerNode *node = malloc(sizeof(TimerNode)); if (!node) return -1; @@ -380,13 +380,14 @@ int PrrtTimer_submit(PrrtTimer *self, const TimerDate *when, const PrrtTimerTask iter = atomic_load(&self->old); stop = atomic_load(&self->new); - while (iter != stop) { + hold = atomic_load(&self->cur); + while (iter != stop && iter != hold) { next = iter->next; assert(iter->done && "cleanup task that is not marked as done"); free(iter); iter = next; } - atomic_store(&self->old, stop); + atomic_store(&self->old, iter); _Atomic(TimerNode *) *addr = &self->old; while (1) { @@ -407,9 +408,8 @@ int PrrtTimer_submit(PrrtTimer *self, const TimerDate *when, const PrrtTimerTask atomic_store(&node->next, iter); atomic_store(addr, node); - TimerNode *del = atomic_load(&self->del); TimerNode *tail = atomic_load(&self->new); - if (del == tail || timer_date_is_lt(&node->date, &tail->date)) { + if (timer_date_is_lt(&node->date, &tail->date) || (addr == &tail->next && atomic_load(&tail->done))) { atomic_store(&self->new, node); timer_wake_worker(self, false); } @@ -417,6 +417,50 @@ int PrrtTimer_submit(PrrtTimer *self, const TimerDate *when, const PrrtTimerTask return -1; } +static void wake_sleeping_thread(void *arg) +{ + atomic_int *ip = (atomic_int *) arg; + atomic_store_explicit(ip, 1, memory_order_release); + futex((int *) ip, FUTEX_WAKE|FUTEX_PRIVATE_FLAG, 1, NULL, NULL, 0); +} + +void PrrtTimer_sleep_until(PrrtTimer *self, const TimerDate *end) +{ + atomic_int cond; + atomic_store_explicit(&cond, 0, memory_order_release); + + TimerDate now; + TimerDate care = *end; + PrrtTimerTask what; + what.fun = wake_sleeping_thread; + what.arg = &cond; + timer_date_sub(&care, 2*self->osp); + clock_gettime(CLOCK_REALTIME, &now); + if (!timer_date_is_due(self, &care, &now)) { + PrrtTimer_submit(self, &care, &what); + while (!atomic_load(&cond)) { + clock_gettime(CLOCK_REALTIME, &now); + if (!timer_date_is_due(self, &care, &now)) + //futex(&cond, FUTEX_WAIT_BITSET|FUTEX_PRIVATE_FLAG|FUTEX_CLOCK_REALTIME, 1, care, NULL, FUTEX_BITSET_MATCH_ANY); + futex(&cond, FUTEX_WAIT|FUTEX_PRIVATE_FLAG, 0, NULL, NULL, 0); + } + } + + while (1) { + clock_gettime(CLOCK_REALTIME, &now); + if (timer_date_is_due(self, end, &now)) + break; + } +} + +void PrrtTimer_sleep_nanos(PrrtTimer *self, TimerDateUDiff_t nanos) +{ + TimerDate when; + clock_gettime(CLOCK_REALTIME, &when); + timer_date_add(&when, nanos); + PrrtTimer_sleep_until(self, &when); +} + void PrrtTimer_end(PrrtTimer *self) { atomic_store_explicit(&self->alive, false, memory_order_release); diff --git a/prrt/proto/timer.h b/prrt/proto/timer.h index 232e05ebe5dbdb6a8337a5b86f08db0fd0854b12..0c0570cfd25b594bad0cf2489167df704d2829c9 100644 --- a/prrt/proto/timer.h +++ b/prrt/proto/timer.h @@ -10,6 +10,7 @@ typedef void *prrtTimerTaskArg; typedef void (*prrtTimerTaskFun)(prrtTimerTaskArg); typedef struct timespec prrtTimerDate; +typedef unsigned long long TimerDateUDiff_t; typedef struct prrtTimerTask { prrtTimerTaskFun fun; @@ -24,4 +25,7 @@ int PrrtTimer_submit(PrrtTimer *timer, const prrtTimerDate *when, const PrrtTime void PrrtTimer_end(PrrtTimer *timer); -#endif // PRRT_TIMER_H \ No newline at end of file +void PrrtTimer_sleep_until(PrrtTimer *self, const prrtTimerDate *end); +void PrrtTimer_sleep_nanos(PrrtTimer *self, TimerDateUDiff_t nanos); + +#endif // PRRT_TIMER_H diff --git a/prrt/util/common.c b/prrt/util/common.c index 37b2397eeaf1dc4bcc6b0ed61c6ac28068f4ec6f..9035ffe90176616b3290fda4f09375e363677760 100644 --- a/prrt/util/common.c +++ b/prrt/util/common.c @@ -43,3 +43,17 @@ int pin_thread_to_core(pthread_attr_t *ap, int core) return pthread_attr_setaffinity_np(ap, sizeof(cpu_set_t), &cpuset); } +struct timespec abstime_from_now(uint32_t wait_time) { + struct timespec now; + clock_gettime(CLOCK_REALTIME, &now); + + struct timespec deadline; + uint32_t diff_s = wait_time / 1000000; + uint32_t diff_ns = (wait_time % 1000000) * 1000; + __syscall_slong_t sum = diff_ns + now.tv_nsec; + __syscall_slong_t carry = (sum) / 1000000000; + __syscall_slong_t rest = (sum) % 1000000000; + deadline.tv_sec = diff_s + now.tv_sec + carry; + deadline.tv_nsec = rest; + return deadline; +} diff --git a/prrt/util/common.h b/prrt/util/common.h index 034cd44a89d5b841a84a63235e6afbb2ceb9f22f..1ed95272ffd4cdd5e423157f7d24d6b6d35f0c71 100644 --- a/prrt/util/common.h +++ b/prrt/util/common.h @@ -9,6 +9,7 @@ int print_buffer(const char *buf, const int length); void print_gf(const gf *start, const int len); int pin_thread_to_core(pthread_attr_t *ap, int core); +struct timespec abstime_from_now(uint32_t wait_time); #define PERROR(fmt, args...) \ printf("PRRT ERROR (" __FILE__ ":%d)\n" fmt, __LINE__, ## args); diff --git a/prrt/util/time.c b/prrt/util/time.c index ca191e8145dca9e5a1ff9266b5a6841cb8161378..e647f14a4177dfa7dd4c376be4816355b6244ad3 100644 --- a/prrt/util/time.c +++ b/prrt/util/time.c @@ -15,21 +15,6 @@ struct timespec abstime_now() { return now; } -struct timespec abstime_from_now(uint32_t wait_time) { - struct timespec now; - clock_gettime(CLOCK_REALTIME, &now); - - struct timespec deadline; - uint32_t diff_s = wait_time / 1000000; - uint32_t diff_ns = (wait_time % 1000000) * 1000; - __syscall_slong_t sum = diff_ns + now.tv_nsec; - __syscall_slong_t carry = (sum) / 1000000000; - __syscall_slong_t rest = (sum) % 1000000000; - deadline.tv_sec = diff_s + now.tv_sec + carry; - deadline.tv_nsec = rest; - return deadline; -} - // < 0: a less than b (b is in the future) // > 0: a greater b (b is in the past) // == 0: a equal b