Commit 02a43b57 authored by Andreas Schmidt's avatar Andreas Schmidt

Add sync/async receive modes.

parent db7bcf84
Pipeline #2616 failed with stages
in 19 seconds
...@@ -123,7 +123,9 @@ cdef extern from "proto/socket.h": ...@@ -123,7 +123,9 @@ cdef extern from "proto/socket.h":
bint PrrtSocket_bind(PrrtSocket *sock_ptr, const_char *ipAddress, const uint16_t port) bint PrrtSocket_bind(PrrtSocket *sock_ptr, const_char *ipAddress, const uint16_t port)
int PrrtSocket_close(const PrrtSocket *sock_ptr) int PrrtSocket_close(const PrrtSocket *sock_ptr)
int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port) int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len) int PrrtSocket_send_sync(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int PrrtSocket_send_async(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr, sockaddr* addr) nogil int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr, sockaddr* addr) nogil
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, sockaddr* addr) nogil int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, sockaddr* addr) nogil
......
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
#include "dataTransmitter.h" #include "dataTransmitter.h"
#include <math.h> #include <math.h>
bool send_to_socket(PrrtSocket* sock_ptr, PrrtReceiver *recv, uint8_t* buf, prrtPacketLength_t length, struct timespec *packet_timestamp, uint64_t* packet_clockstamp) { bool send_to_socket(PrrtSocket* sock_ptr, PrrtReceiver *recv, uint8_t* buf, prrtPacketLength_t length, struct timespec *packet_timestamp, uint64_t* packet_clockstamp) {
if(sock_ptr->isHardwareTimestamping) { if(sock_ptr->isHardwareTimestamping) {
struct msghdr msg; struct msghdr msg;
struct iovec iov; struct iovec iov;
...@@ -151,57 +151,61 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { ...@@ -151,57 +151,61 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
return false; return false;
} }
void *send_data_loop(void *ptr) { void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet) {
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
if (sock_ptr->receiveBlock == NULL) {
sock_ptr->receiveBlock = PrrtBlock_create(PrrtCodingConfiguration_copy(sock_ptr->codingParameters), PrrtCoder_copy(sock_ptr->coder), packet->sequenceNumber);
}
packet->index = (uint8_t) (packet->sequenceNumber - sock_ptr->receiveBlock->baseSequenceNumber);
PrrtPacketDataPayload *payload = packet->payload;
payload->groupRTprop_us = PrrtSocket_get_rtprop_fwd(sock_ptr);
PrrtPacket *packetToSend = PrrtPacket_copy(packet);
debug(DEBUG_DATATRANSMITTER, "Send: %i", packet->sequenceNumber);
send_packet(sock_ptr, packetToSend);
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
PrrtBlock_insert_data_packet(sock_ptr->receiveBlock, packet);
// TODO: redundancy should only be sent when necessary
if (PrrtBlock_encode_ready(sock_ptr->receiveBlock)) {
uint32_t j = 0;
unsigned int redundancy_seqno = sock_ptr->sequenceNumberRedundancy;
XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeStart);
PrrtBlock_encode(sock_ptr->receiveBlock, &sock_ptr->sequenceNumberRedundancy);
XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeEnd);
uint32_t redundancyPackets = List_count(sock_ptr->receiveBlock->redundancyPackets);
for (j = 0; j < redundancyPackets; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(sock_ptr->receiveBlock);
send_packet(sock_ptr, red_pkt);
}
PrrtBlock_destroy(sock_ptr->receiveBlock);
sock_ptr->receiveBlock = NULL;
}
}
void *PrrtDataTransmitter_send_data_loop(void *ptr) {
PrrtSocket *sock_ptr = ptr; PrrtSocket *sock_ptr = ptr;
PrrtBlock *block = NULL;
while (1) { while (1) {
ListNode *job; ListNode *job;
do { do {
job = Pipe_pull(sock_ptr->sendDataQueue); job = Pipe_pull(sock_ptr->sendDataQueue);
if (PrrtSocket_closing(sock_ptr)) { if (PrrtSocket_closing(sock_ptr)) {
if (block != NULL) { if (sock_ptr->receiveBlock != NULL) {
PrrtBlock_destroy(block); PrrtBlock_destroy(sock_ptr->receiveBlock);
sock_ptr->receiveBlock = NULL;
} }
return NULL; return NULL;
} }
} while (!job); } while (!job);
PrrtPacket *packet = PrrtPacket_byListNode(job); PrrtPacket *packet = PrrtPacket_byListNode(job);
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart); PrrtDataTransmitter_transmit(sock_ptr, packet);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
if (block == NULL) {
block = PrrtBlock_create(PrrtCodingConfiguration_copy(sock_ptr->codingParameters), PrrtCoder_copy(sock_ptr->coder), packet->sequenceNumber);
}
packet->index = (uint8_t) (packet->sequenceNumber - block->baseSequenceNumber);
PrrtPacketDataPayload *payload = packet->payload;
payload->groupRTprop_us = PrrtSocket_get_rtprop_fwd(sock_ptr);
PrrtPacket *packetToSend = PrrtPacket_copy(packet);
debug(DEBUG_DATATRANSMITTER, "Send: %i", packet->sequenceNumber);
send_packet(sock_ptr, packetToSend);
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
PrrtBlock_insert_data_packet(block, packet);
// TODO: redundancy should only be sent when necessary
if (PrrtBlock_encode_ready(block)) {
uint32_t j = 0;
unsigned int redundancy_seqno = sock_ptr->sequenceNumberRedundancy;
XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeStart);
PrrtBlock_encode(block, &sock_ptr->sequenceNumberRedundancy);
XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeEnd);
uint32_t redundancyPackets = List_count(block->redundancyPackets);
for (j = 0; j < redundancyPackets; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
send_packet(sock_ptr, red_pkt);
}
PrrtBlock_destroy(block);
block = NULL;
}
} }
} }
#ifndef PRRT_DATA_TRANSMITTER_H #ifndef PRRT_DATA_TRANSMITTER_H
#define PRRT_DATA_TRANSMITTER_H #define PRRT_DATA_TRANSMITTER_H
void * send_data_loop(void *ptr); #include "../socket.h"
void * PrrtDataTransmitter_send_data_loop(void *ptr);
void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet);
#endif //PRRT_DATA_TRANSMITTER_H #endif //PRRT_DATA_TRANSMITTER_H
...@@ -196,7 +196,7 @@ bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port) ...@@ -196,7 +196,7 @@ bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port)
pin_thread_to_core(s->sendDataThreadAttr, 1); pin_thread_to_core(s->sendDataThreadAttr, 1);
} }
check(pthread_create(&s->sendDataThread, s->sendDataThreadAttr, send_data_loop, check(pthread_create(&s->sendDataThread, s->sendDataThreadAttr, PrrtDataTransmitter_send_data_loop,
(void *) s) == EXIT_SUCCESS, "Cannot create send thread."); (void *) s) == EXIT_SUCCESS, "Cannot create send thread.");
check(pthread_create(&s->receiveDataThread, s->receiveDataThreadAttr, receive_data_loop, check(pthread_create(&s->receiveDataThread, s->receiveDataThreadAttr, receive_data_loop,
...@@ -218,13 +218,13 @@ int PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port) { ...@@ -218,13 +218,13 @@ int PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port) {
return 0; return 0;
} }
int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) { int sendpacket(PrrtSocket *s, const uint8_t *data, const size_t data_len, bool sync) {
if (data_len > s->maximum_payload_size) { if (data_len > s->maximum_payload_size) {
PERROR("Data to be sent (%ld bytes) is too long, as MTU is %d.\n", data_len, s->maximum_payload_size); PERROR("Data to be sent (%ld bytes) is too long, as MTU is %d.\n", data_len, s->maximum_payload_size);
return -1; return -1;
} }
if(s->receiver == NULL) { if (s->receiver == NULL) {
PERROR("PrrtSocket_connect() must be called before PrrtSocket_send().\n"); PERROR("PrrtSocket_connect() must be called before PrrtSocket_send().\n");
return -1; return -1;
} }
...@@ -240,7 +240,13 @@ int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) { ...@@ -240,7 +240,13 @@ int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
XlapTimestampPlaceholderUse(s, ts_data_packet, packet->sequenceNumber, &tsph); XlapTimestampPlaceholderUse(s, ts_data_packet, packet->sequenceNumber, &tsph);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSubmitPackage); XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSubmitPackage);
Pipe_push(s->sendDataQueue, &packet->asListNode); if (sync) {
PrrtDataTransmitter_transmit(s, packet);
PrrtSocket_pace(s);
} else {
Pipe_push(s->sendDataQueue, &packet->asListNode);
}
PrrtReceiver_on_application_write(s->receiver, Pipe_get_size(s->sendDataQueue), s->sequenceNumberSource); PrrtReceiver_on_application_write(s->receiver, Pipe_get_size(s->sendDataQueue), s->sequenceNumberSource);
XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd); XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
...@@ -248,6 +254,14 @@ int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) { ...@@ -248,6 +254,14 @@ int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
return 0; return 0;
} }
int PrrtSocket_send_sync(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
return sendpacket(s, data, data_len, true);
}
int PrrtSocket_send_async(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
return sendpacket(s, data, data_len, false);
}
bool PrrtSocket_closing(PrrtSocket *s) { bool PrrtSocket_closing(PrrtSocket *s) {
return atomic_load_explicit(&s->closing, memory_order_acquire); return atomic_load_explicit(&s->closing, memory_order_acquire);
} }
......
...@@ -36,6 +36,7 @@ typedef struct prrtSocket { ...@@ -36,6 +36,7 @@ typedef struct prrtSocket {
bool pacingEnabled; bool pacingEnabled;
PrrtClock clock; PrrtClock clock;
PrrtBlock* receiveBlock;
pthread_t sendDataThread; pthread_t sendDataThread;
Pipe *sendDataQueue; Pipe *sendDataQueue;
...@@ -113,7 +114,10 @@ int PrrtSocket_close(PrrtSocket *s); ...@@ -113,7 +114,10 @@ int PrrtSocket_close(PrrtSocket *s);
int PrrtSocket_connect(PrrtSocket *s, const char *host, uint16_t port); int PrrtSocket_connect(PrrtSocket *s, const char *host, uint16_t port);
int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, size_t data_len); int PrrtSocket_send_async(PrrtSocket *s, const uint8_t *data, const size_t data_len);
int PrrtSocket_send_sync(PrrtSocket *s, const uint8_t *data, size_t data_len);
void PrrtSocket_pace(PrrtSocket *s);
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr); int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr);
...@@ -125,8 +129,6 @@ int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, struct sockaddr ...@@ -125,8 +129,6 @@ int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, struct sockaddr
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us); int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us);
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us, struct timespec* deadline); int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us, struct timespec* deadline);
void PrrtSocket_pace(PrrtSocket *s);
bool PrrtSocket_cleanup(PrrtSocket *s); bool PrrtSocket_cleanup(PrrtSocket *s);
bool PrrtSocket_closing(PrrtSocket *s); bool PrrtSocket_closing(PrrtSocket *s);
......
...@@ -211,10 +211,19 @@ cdef class PrrtSocket: ...@@ -211,10 +211,19 @@ cdef class PrrtSocket:
maximum_payload_size = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "maximum_payload_size") maximum_payload_size = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "maximum_payload_size")
data_len = len(data) data_len = len(data)
if len(data) <= maximum_payload_size: if len(data) <= maximum_payload_size:
cprrt.PrrtSocket_send(self._c_socket, data, data_len) cprrt.PrrtSocket_send_async(self._c_socket, data, data_len)
else: else:
raise PayloadTooBigException("Sending packet of {} bytes on a socket with maximum payload size of {} bytes failed.".format(data_len, maximum_payload_size)) raise PayloadTooBigException("Sending packet of {} bytes on a socket with maximum payload size of {} bytes failed.".format(data_len, maximum_payload_size))
def send_sync(self, data):
if cprrt.PrrtSocket_get_sock_opt(self._c_socket, "connected") == 0:
raise Exception("PrrtSocket must be connected first, before data can be sent.")
maximum_payload_size = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "maximum_payload_size")
data_len = len(data)
if len(data) <= maximum_payload_size:
cprrt.PrrtSocket_send_sync(self._c_socket, data, data_len)
else:
raise PayloadTooBigException("Sending packet of {} bytes on a socket with maximum payload size of {} bytes failed.".format(data_len, maximum_payload_size))
# Receiving # Receiving
def recv(self): def recv(self):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment