Commit 51b80eb7 authored by Andreas Schmidt's avatar Andreas Schmidt

Add receive modes ordered & asap with direct return, wait and timedwait.

parent e135910d
Pipeline #1675 passed with stages
in 1 minute and 33 seconds
include "posix/time.pxd"
from libc.stdint cimport uint32_t, uint16_t, uint8_t, int32_t
from libc.string cimport const_char
......@@ -79,6 +81,8 @@ cdef extern from "proto/packet.h":
ctypedef prrtPacket PrrtPacket
ctypedef uint32_t prrtTimedelta_t;
cdef extern from "proto/receiver.h":
ctypedef struct PrrtReceiver:
const char* host_name
......@@ -121,9 +125,16 @@ cdef extern from "proto/socket.h":
int PrrtSocket_close(const PrrtSocket *sock_ptr)
int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) nogil
int32_t PrrtSocket_recv_sync(PrrtSocket *sock_ptr, void *buf_ptr, const uint32_t time_window_us) nogil
int32_t PrrtSocket_timedrecv(PrrtSocket *sock_ptr, void *buf_ptr, const uint32_t wait_time) nogil
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr) nogil
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr) nogil
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, timespec* deadline) nogil
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) nogil
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) nogil
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us, timespec* deadline) nogil
bint PrrtSocket_set_sock_opt(PrrtSocket *sock_ptr, const_char *name, const uint32_t value)
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *sock_ptr, const_char *name)
bint PrrtSocket_set_coding_parameters(PrrtSocket *s, uint8_t k, uint8_t n)
......
......@@ -20,6 +20,34 @@
#include "types/packetTimeout.h"
#include "socket.h"
prrtPacketLength_t receive_packet(const PrrtSocket *s, void *buffer, PrrtPacket *packet) {
prrtPacketLength_t len = 0;
XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
len = (prrtPacketLength_t) (packet->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, CopyOutputStart);
PrrtPacket_copy_payload_to_buffer(buffer, packet, PRRT_PACKET_DATA_HEADER_SIZE);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, CopyOutputEnd);
XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
PrrtPacket_destroy(packet);
return len;
}
struct timespec abstime_from_now(prrtTimedelta_t wait_time) {
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
struct timespec deadline;
prrtTimedelta_t diff_s = wait_time / 1000000;
prrtTimedelta_t diff_ns = (wait_time % 1000000) * 1000;
deadline.tv_sec = diff_s + now.tv_sec;
deadline.tv_nsec = diff_ns + now.tv_nsec;
return deadline;
}
PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay_us) {
assert(sizeof(float) == 4);
PrrtSocket *s = (PrrtSocket *) calloc(1, sizeof(PrrtSocket));
......@@ -238,100 +266,117 @@ bool PrrtSocket_closing(PrrtSocket *s) {
return atomic_load_explicit(&s->closing, memory_order_acquire);
}
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr) {
prrtPacketLength_t len = 0;
check(s->isSender == false, "Cannot receive on sender socket.")
PrrtPacket *packet;
packet = PrrtReceiveDataQueue_get_packet(s->receiveDataQueue, 0, MAX_TIMESTAMP);
if(packet != NULL) {
len = receive_packet(s, buf_ptr, packet);
}
return len;
error:
PERROR("There was a failure while receiving from socket.%s", "");
return -1;
}
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr) {
check(s->isSender == false, "Cannot receive on sender socket.")
PrrtPacket *packet;
do {
packet = PrrtReceiveDataQueue_get_packet_wait(s->receiveDataQueue, 0, MAX_TIMESTAMP);
if (PrrtSocket_closing(s)) {
return -1;
}
} while (!packet);
return receive_packet(s, buf_ptr, packet);
error:
PERROR("There was a failure while receiving from socket.%s", "");
return -1;
}
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct timespec* deadline) {
prrtPacketLength_t len = 0;
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr) {
check(s->isSender == false, "Cannot receive on sender socket.")
while (1) {
PrrtPacket *packet;
do {
packet = PrrtReceiveDataQueue_get_packet_sync(s->receiveDataQueue, 0, MAX_TIMESTAMP);
if (PrrtSocket_closing(s))
return -1;
} while (!packet);
XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
prrtPacketLength_t len = (prrtPacketLength_t) (packet->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, CopyOutputStart);
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, CopyOutputEnd);
debug(DEBUG_SOCKET, "len: %d", (int) len);
XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
PrrtPacket_destroy(packet);
return len;
PrrtPacket *packet = PrrtReceiveDataQueue_get_packet_timedwait(s->receiveDataQueue, 0, MAX_TIMESTAMP, deadline);
if(packet != NULL) {
len = receive_packet(s, buf_ptr, packet);
}
return len;
error:
PERROR("There was a failure while receiving from socket.%s", "");
return -1;
}
int32_t PrrtSocket_recv_sync(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) {
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr) {
return PrrtSocket_receive_asap_wait(s, buf_ptr);
}
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) {
prrtPacketLength_t len = 0;
check(s->isSender == false, "Cannot receive on sender socket.")
while (1) {
PrrtPacket *packet;
do {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
packet = PrrtReceiveDataQueue_get_packet_sync(s->receiveDataQueue, now - time_window_us, now + time_window_us);
if (PrrtSocket_closing(s))
return -1;
} while (!packet);
prrtPacketLength_t len = (prrtPacketLength_t) (packet->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE);
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
PrrtPacket_destroy(packet);
return len;
prrtTimestamp_t now = PrrtClock_get_current_time_us();
PrrtPacket *packet = PrrtReceiveDataQueue_get_packet(s->receiveDataQueue, now - time_window_us, now + time_window_us);
if(packet != NULL) {
len = receive_packet(s, buf_ptr, packet);
}
return len;
error:
PERROR("There was a failure while receiving from socket.%s", "");
return -1;
}
int32_t PrrtSocket_timedrecv(PrrtSocket *sock_ptr, void *buf_ptr, const uint32_t wait_time) {
PERROR("Not implemented.%s", "");
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) {
check(s->isSender == false, "Cannot receive on sender socket.")
PrrtPacket *packet;
do {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
struct timespec deadline = abstime_from_now(time_window_us);
packet = PrrtReceiveDataQueue_get_packet_timedwait(s->receiveDataQueue, now - time_window_us, now + time_window_us, &deadline);
if (PrrtSocket_closing(s)) {
return -1;
}
} while (!packet);
return receive_packet(s, buf_ptr, packet);
error:
PERROR("There was a failure while receiving from socket.%s", "");
return -1;
int clk_id = CLOCK_REALTIME;
struct timespec ts, res;
clock_getres(clk_id, &res);
clock_gettime(clk_id, &ts);
ts.tv_sec += floor(wait_time / 1000000);
ts.tv_nsec += (wait_time % 1000000) * 1000;
check(sock_ptr->isSender == false, "Cannot receive on sender socket.")
while (1) {
/* TODO: use the specified timeout */
PrrtPacket *packet;
do {
packet = PrrtReceiveDataQueue_get_packet(sock_ptr->receiveDataQueue, 0, MAX_TIMESTAMP);
if (PrrtSocket_closing(sock_ptr))
return -1;
} while (!packet);
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
prrtPacketLength_t len = (prrtPacketLength_t) (packet->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, CopyOutputStart);
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, CopyOutputEnd);
debug(DEBUG_SOCKET, "len: %d", (int) len);
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
PrrtPacket_destroy(packet);
return len;
}
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us, struct timespec* deadline) {
prrtPacketLength_t len = 0;
check(s->isSender == false, "Cannot receive on sender socket.")
prrtTimestamp_t now = PrrtClock_get_current_time_us();
PrrtPacket *packet = PrrtReceiveDataQueue_get_packet_timedwait(s->receiveDataQueue, now - time_window_us, now + time_window_us, deadline);
if(packet != NULL) {
len = receive_packet(s, buf_ptr, packet);
}
return len;
error:
PERROR("There was a failure while receiving from socket.%s", "");
return -1;
}
int PrrtSocket_interrupt(PrrtSocket *s) {
atomic_store_explicit(&s->closing, true, memory_order_release);
......
......@@ -106,9 +106,14 @@ int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, size_t data_len);
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr);
int32_t PrrtSocket_recv_sync(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us);
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr);
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr);
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct timespec* deadline);
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us);
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us);
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us, struct timespec* deadline);
int32_t PrrtSocket_timedrecv(PrrtSocket *sock_ptr, void *buf_ptr, const uint32_t wait_time);
bool PrrtSocket_cleanup(PrrtSocket *s);
......
......@@ -21,22 +21,15 @@ PrrtReceiveDataQueue *PrrtReceiveDataQueue_create() {
return NULL;
}
PrrtPacket * PrrtReceiveDataQueue_get_packet_sync(PrrtReceiveDataQueue *q, prrtTimestamp_t start, prrtTimestamp_t stop) {
PrrtPacket * PrrtReceiveDataQueue_get_packet_timedwait(PrrtReceiveDataQueue *q, prrtTimestamp_t start,
prrtTimestamp_t stop, const struct timespec *deadline) {
PrrtPacket *packet = NULL;
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
prrtTimedelta_t diff_us = stop - start;
struct timespec deadline;
prrtTimedelta_t diff_s = diff_us / 1000000;
prrtTimedelta_t diff_ns = (diff_us % 1000000) * 1000;
deadline.tv_sec = diff_s + now.tv_sec;
deadline.tv_nsec = diff_ns + now.tv_nsec;
check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");
packet = PrrtReceiveDataQueue_get_packet(q, start, stop);
if (!packet) {
pthread_cond_timedwait(&q->wait_for_data, &q->lock, &deadline);
pthread_cond_timedwait(&q->wait_for_data, &q->lock, deadline);
packet = PrrtReceiveDataQueue_get_packet(q, start, stop);
}
......@@ -45,7 +38,28 @@ PrrtPacket * PrrtReceiveDataQueue_get_packet_sync(PrrtReceiveDataQueue *q, prrtT
return packet;
error:
PERROR("PrrtReceiveDataQueue_get_packet_sync failed%s.", "");
PERROR("PrrtReceiveDataQueue_get_packet_timedwait failed%s.", "");
return NULL;
}
PrrtPacket * PrrtReceiveDataQueue_get_packet_wait(PrrtReceiveDataQueue *q, prrtTimestamp_t start,
prrtTimestamp_t stop) {
PrrtPacket *packet = NULL;
check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");
packet = PrrtReceiveDataQueue_get_packet(q, start, stop);
if (!packet) {
pthread_cond_wait(&q->wait_for_data, &q->lock);
packet = PrrtReceiveDataQueue_get_packet(q, start, stop);
}
check(pthread_mutex_unlock(&q->lock) == EXIT_SUCCESS, "Unlock failed.");
return packet;
error:
PERROR("PrrtReceiveDataQueue_get_packet_timedwait failed%s.", "");
return NULL;
}
......
......@@ -14,7 +14,10 @@ PrrtReceiveDataQueue* PrrtReceiveDataQueue_create(void);
PrrtPacket *PrrtReceiveDataQueue_get_packet(PrrtReceiveDataQueue *q, prrtTimestamp_t start, prrtTimestamp_t stop);
PrrtPacket *PrrtReceiveDataQueue_get_packet_sync(PrrtReceiveDataQueue *queue, prrtTimestamp_t start, prrtTimestamp_t stop);
PrrtPacket *PrrtReceiveDataQueue_get_packet_wait(PrrtReceiveDataQueue *q, prrtTimestamp_t start, prrtTimestamp_t stop);
PrrtPacket *PrrtReceiveDataQueue_get_packet_timedwait(PrrtReceiveDataQueue *queue, prrtTimestamp_t start,
prrtTimestamp_t stop, const struct timespec *deadline);
bool PrrtReceiveDataQueue_insert(PrrtReceiveDataQueue *q, PrrtPacket* packet);
......
......@@ -164,12 +164,12 @@ cdef class PrrtSocket:
len = cprrt.PrrtSocket_recv(self._c_socket, <void*> buffer)
return buffer[:len]
def recv_sync(self, granularity):
def receive_ordered_wait(self, time_window):
cdef char buffer[65536]
cdef int32_t len
cdef uint32_t time_window_us = granularity * (1000**2)
cdef uint32_t time_window_us = time_window * (1000**2)
with nogil:
len = cprrt.PrrtSocket_recv_sync(self._c_socket, <void*> buffer, time_window_us)
len = cprrt.PrrtSocket_receive_ordered_wait(self._c_socket, <void*> buffer, time_window_us)
return buffer[:len]
def connect(self, host, port):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment