Commit f17b8b5f authored by Andreas Schmidt's avatar Andreas Schmidt

Add PrrtSocket_timedrecv().

parent 7b6deaa3
Pipeline #953 passed with stages
in 57 seconds
......@@ -117,6 +117,7 @@ cdef extern from "proto/socket.h":
int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) nogil
int32_t PrrtSocket_timedrecv(PrrtSocket *sock_ptr, void *buf_ptr, const uint32_t wait_time) nogil
bint PrrtSocket_set_sock_opt(PrrtSocket *sock_ptr, const_char *name, const uint32_t value)
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *sock_ptr, const_char *name)
#include <arpa/inet.h>
#include <math.h>
#include <pthread.h>
#include <sched.h>
#include <stdio.h>
......@@ -214,6 +215,53 @@ int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr)
return -1;
int32_t PrrtSocket_timedrecv(PrrtSocket *sock_ptr, void *buf_ptr, const uint32_t wait_time)
int clk_id = CLOCK_REALTIME;
struct timespec ts, res;
clock_getres(clk_id, &res);
clock_gettime(clk_id, &ts);
ts.tv_sec += floor(wait_time / 1000000);
ts.tv_nsec += (wait_time % 1000000) * 1000;
check(sock_ptr->isSender == false, "Cannot receive on sender socket.")
while(1) {
if (pthread_mutex_timedlock(&sock_ptr->inQueueFilledMutex, &ts) != 0) { return -1; }
while(List_count(sock_ptr->inQueue) == 0) {
debug(DEBUG_SOCKET, "poll");
if(atomic_load_explicit(&sock_ptr->closing, memory_order_acquire)) {
check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
return -1;
if (pthread_cond_timedwait(&sock_ptr->inQueueFilledCv, &sock_ptr->inQueueFilledMutex, &ts) != 0) {
return -1;
PrrtPacket *packet = List_shift(sock_ptr->inQueue);
PrrtTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
prrtPacketLength_t len = (prrtPacketLength_t) (packet->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, CopyOutputStart);
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, CopyOutputEnd);
debug(DEBUG_SOCKET, "len: %d", (int) len);
check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
PrrtTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
PrrtTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
return len;
PERROR("There was a failure while receiving from socket.%s", "");
return -1;
int PrrtSocket_interrupt(PrrtSocket *sock_ptr)
atomic_store_explicit(&sock_ptr->closing, true, memory_order_release);
......@@ -83,5 +83,6 @@ int PrrtSocket_close(PrrtSocket *sock_ptr);
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port);
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len);
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr);
int32_t PrrtSocket_timedrecv(PrrtSocket *sock_ptr, void *buf_ptr, const uint32_t wait_time);
#endif // PRRT_SOCKET_H
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment