Commit 4c234cc4 authored by Andreas Schmidt's avatar Andreas Schmidt

Single UDP socket.

parent 71dfff6b
......@@ -26,7 +26,7 @@ import prrt
port = int(sys.argv[1])
s = prrt.PrrtSocket(port=port, isSender=False)
s = prrt.PrrtSocket(port=port)
while True:
d = s.recv()
......@@ -45,7 +45,7 @@ import prrt
host = sys.argv[1]
port = int(sys.argv[2])
s = prrt.PrrtSocket(port=port, isSender=True)
s = prrt.PrrtSocket(port=port)
s.connect(host, port)
for i in range(10):
......
......@@ -93,9 +93,7 @@ cdef extern from "proto/receiver.h":
cdef extern from "proto/socket.h":
cdef struct prrtSocket:
int dataSocketFd
int feedbackSocketFd
pthread_t receiveFeedbackThread
int socketFd
pthread_t sendDataThread
pthread_mutex_t outQueueFilledMutex
......@@ -119,7 +117,7 @@ cdef extern from "proto/socket.h":
ctypedef prrtSocket PrrtSocket
cdef PrrtSocket* PrrtSocket_create(bint isSender, const uint32_t target_delay)
cdef PrrtSocket* PrrtSocket_create(const uint32_t target_delay)
bint PrrtSocket_bind(PrrtSocket *sock_ptr, const_char *ipAddress, const uint16_t port)
int PrrtSocket_close(const PrrtSocket *sock_ptr)
int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
......@@ -139,13 +137,15 @@ cdef extern from "proto/socket.h":
bint PrrtSocket_set_coding_parameters(PrrtSocket *s, uint8_t k, uint8_t n, uint8_t c, uint8_t *n_cycle)
PrrtCodingConfiguration *PrrtSocket_get_coding_parameters(PrrtSocket *s)
bint PrrtSocket_uses_thread_pinning(PrrtSocket *socket)
uint32_t PrrtSocket_get_rtprop(PrrtSocket *socket)
bint PrrtSocket_uses_thread_pinning(PrrtSocket *socket)
uint32_t PrrtSocket_get_rtt(PrrtSocket *socket)
float PrrtSocket_get_plr(PrrtSocket *socket)
uint32_t PrrtSocket_get_delivery_rate(PrrtSocket *socket)
uint32_t PrrtSocket_get_btlbw(PrrtSocket *socket)
uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *socket)
float PrrtSocket_get_plr_fwd(PrrtSocket *socket)
uint32_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *socket)
uint32_t PrrtSocket_get_btlbw_fwd(PrrtSocket *s);
uint32_t PrrtSocket_get_btlbw_back(PrrtSocket *s);
bint PrrtSocket_get_app_limited(PrrtSocket *socket)
bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket)
......
......@@ -8,7 +8,6 @@ add_library(PRRT ../defines.h
../xlap/xlap.c ../xlap/xlap.h
applicationConstraints.c applicationConstraints.h
processes/dataReceiver.c processes/dataReceiver.h
processes/feedbackReceiver.c processes/feedbackReceiver.h
processes/dataTransmitter.c processes/dataTransmitter.h
stores/dataPacketStore.c stores/dataPacketStore.h
stores/deliveredPacketTable.c stores/deliveredPacketTable.h
......
......@@ -75,7 +75,7 @@ static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote, prrtS
struct sockaddr_in targetaddr;
memset((char *) &targetaddr, 0, sizeof(targetaddr));
targetaddr.sin_family = AF_INET;
targetaddr.sin_port = htons((uint16_t) (remote_port + 1));
targetaddr.sin_port = htons((uint16_t) (remote_port));
struct hostent *hp;
hp = gethostbyname(remote_host);
......@@ -98,7 +98,7 @@ static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote, prrtS
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");
check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
check(sendto(sock_ptr->socketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
length, "Sending feedback failed.");
free(buf);
......@@ -203,6 +203,32 @@ static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) {
return;
}
void handle_feedback_packet(const PrrtSocket *prrtSocket, const PrrtPacket *prrtPacket, prrtTimestamp_t receiveTime) {
check(prrtPacket != NULL, "Cannot be null");
debug(DEBUG_DATARECEIVER, "handle_feedback_packet");
PrrtPacketFeedbackPayload *feedbackPayload = (PrrtPacketFeedbackPayload *) prrtPacket->payload;
prrtTimestamp_t forwardTripTimestamp = feedbackPayload->forwardTripTimestamp_us;
bool valid_sample = PrrtReceiver_updateAndGenerateRateSample(prrtSocket->receiver, feedbackPayload->ackSequenceNumber, feedbackPayload->ackPacketType, receiveTime);
debug(DEBUG_DATARECEIVER, "PrrtReceiver_updateAndGenerateRateSample ");
if(valid_sample) {
PrrtChannelStateInformation_update_delivery_rate(prrtSocket->receiver->csi, prrtPacket, prrtSocket->receiver->rateSample);
}
PrrtChannelStateInformation_update_app_limited(prrtSocket->receiver->csi, prrtSocket->receiver->rateSample->is_app_limited);
debug(DEBUG_DATARECEIVER, "PrrtChannelStateInformation_update_app_limited ");
PrrtChannelStateInformation_update_rtprop(prrtSocket->receiver->csi,
(prrtTimedelta_t) (receiveTime - forwardTripTimestamp));
debug(DEBUG_DATARECEIVER, "PrrtChannelStateInformation_update_rtprop ");
PrrtChannelStateInformation_update_plr(prrtSocket->receiver->csi, feedbackPayload->erasureCount, feedbackPayload->packetCount);
debug(DEBUG_DATARECEIVER, "PrrtChannelStateInformation_update_plr ");
return;
error:
PERROR("handle_feedback_packet failed.");
}
void receive_from_socket(PrrtSocket *socket_ptr, unsigned char buffer_ptr[65528], ssize_t *received_size,
struct sockaddr_in *remote_ptr, socklen_t *remote_len_ptr, struct timespec *packet_timestamp_ptr,
uint64_t *packet_cyclestamp_ptr) {
......@@ -226,7 +252,7 @@ void receive_from_socket(PrrtSocket *socket_ptr, unsigned char buffer_ptr[65528]
msg.msg_control = &control;
msg.msg_controllen = sizeof(control);
*received_size = recvmsg(socket_ptr->dataSocketFd, &msg, 0);
*received_size = recvmsg(socket_ptr->socketFd, &msg, 0);
for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
switch (cmsg->cmsg_type) {
......@@ -240,7 +266,7 @@ void receive_from_socket(PrrtSocket *socket_ptr, unsigned char buffer_ptr[65528]
}
}
} else {
*received_size = recvfrom(socket_ptr->dataSocketFd, buffer_ptr, MAX_PAYLOAD_LENGTH, 0,
*received_size = recvfrom(socket_ptr->socketFd, buffer_ptr, MAX_PAYLOAD_LENGTH, 0,
(struct sockaddr *) remote_ptr, remote_len_ptr);
clock_gettime(CLOCK_REALTIME, packet_timestamp_ptr);
}
......@@ -267,20 +293,16 @@ void *receive_data_loop(void *ptr) {
receive_from_socket(sock_ptr, buffer, &n, &remote, &addrlen, &packet_recv_timestamp, &packet_recv_cyclestamp);
debug(DEBUG_HARDSTAMPING, "Packet TS:\t%ld.%09ld; Who? %s", (long) packet_recv_timestamp.tv_sec,
packet_recv_timestamp.tv_nsec, inet_ntoa(remote.sin_addr));
prrtTimestamp_t prrt_recv_timestamp = PrrtClock_TimespecToPrrtTimestamp(packet_recv_timestamp);
sock_ptr->lastReceivedTimestamp = prrt_recv_timestamp;
XlapTimeStampClock(&tsph1, ts_any_packet, 0, LinkReceive);
XlapTimeStampCycle(&tsph1, ts_any_packet, 0, LinkReceive);
PrrtPacket *packet = (PrrtPacket *) calloc(1, sizeof(PrrtPacket));
check_mem(packet);
XlapTimeStampCycle(&tsph2, ts_any_packet, 0, DecodeStart);
check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed.");
XlapTimeStampCycle(&tsph3, ts_any_packet, 0, DecodeEnd);
prrtSequenceNumber_t seqno = packet->sequenceNumber;
prrtTimestamp_t prrt_recv_timestamp = PrrtClock_TimespecToPrrtTimestamp(packet_recv_timestamp);
prrtPacketType_t packetType = PrrtPacket_type(packet);
debug(DEBUG_DATARECEIVER, "received packet %d:%u", (int) packetType, seqno);
......@@ -293,8 +315,12 @@ void *receive_data_loop(void *ptr) {
} else if (packetType == PACKET_TYPE_REDUNDANCY) {
kind = ts_redundancy_packet;
sentTimestamp = PrrtPacket_get_redundancy_timestamp(packet);
} else if (packetType == PACKET_TYPE_FEEDBACK) {
kind = ts_feedback_packet;
}
if (packetType == PACKET_TYPE_DATA || packetType == PACKET_TYPE_REDUNDANCY) {
sock_ptr->lastReceivedTimestamp = prrt_recv_timestamp;
XlapCycleStampValue(sock_ptr, kind, seqno, ChannelReceive, packet_recv_cyclestamp);
XlapTimeStampValue(sock_ptr, kind, seqno, ChannelReceive, packet_recv_timestamp);
......@@ -313,14 +339,19 @@ void *receive_data_loop(void *ptr) {
}
send_feedback(sock_ptr, remote, seqno, prrt_recv_timestamp, sentTimestamp, packetType);
XlapTimeStampCycle(sock_ptr, kind, seqno, HandlePacketEnd);
} else if (packetType == PACKET_TYPE_FEEDBACK) {
handle_feedback_packet(sock_ptr, packet, prrt_recv_timestamp);
PrrtPacket_destroy(packet);
} else {
PrrtPacket_print(packet);
PrrtPacket_destroy(packet);
}
debug(DEBUG_DATARECEIVER, "Cleanup");
PrrtSocket_cleanup(sock_ptr);
debug(DEBUG_DATARECEIVER, "Cleaned");
}
error:
PNOTIMPLEMENTED("SHOULD IMPLEMENT ERROR HANDLER HERE");
PNOTIMPLEMENTED("SHOULD IMPLEMENT ERROR HANDLER HERE");
}
......@@ -27,14 +27,14 @@ bool send_to_socket(PrrtSocket* sock_ptr, PrrtReceiver *recv, uint8_t* buf, prrt
msg.msg_control = control;
msg.msg_controllen = 0;
sendmsg(sock_ptr->dataSocketFd, &msg, 0);
sendmsg(sock_ptr->socketFd, &msg, 0);
*packet_clockstamp = __builtin_ia32_rdtsc();
msg.msg_control = control;
iov.iov_len = MAX_PAYLOAD_LENGTH;
do {
msg.msg_controllen = 1024;
got = recvmsg(sock_ptr->dataSocketFd, &msg, MSG_ERRQUEUE);
got = recvmsg(sock_ptr->socketFd, &msg, MSG_ERRQUEUE);
} while(got < 0 && errno == EAGAIN && check++ < check_max);
check(!(got < 0 && errno == EAGAIN), "Failed to get stamp. Gave up.");
......@@ -58,7 +58,7 @@ bool send_to_socket(PrrtSocket* sock_ptr, PrrtReceiver *recv, uint8_t* buf, prrt
}
} else {
// TODO: [LATENCY] By knowing the time encoding etc. that happens upfront, one could make an adjustment here.
check(sendto(sock_ptr->dataSocketFd, buf, length, 0, recv->ai->ai_addr, recv->ai->ai_addrlen) ==
check(sendto(sock_ptr->socketFd, buf, length, 0, recv->ai->ai_addr, recv->ai->ai_addrlen) ==
length, "Sendto failed.");
clock_gettime(CLOCK_REALTIME, packet_timestamp);
*packet_clockstamp = __builtin_ia32_rdtsc();
......@@ -149,9 +149,10 @@ void *send_data_loop(void *ptr) {
packet->index = (uint8_t) (packet->sequenceNumber - block->baseSequenceNumber);
PrrtPacketDataPayload *payload = packet->payload;
payload->groupRTprop_us = PrrtSocket_get_rtprop(sock_ptr);
payload->groupRTprop_us = PrrtSocket_get_rtprop_fwd(sock_ptr);
PrrtPacket *packetToSend = PrrtPacket_copy(packet);
debug(DEBUG_DATATRANSMITTER, "Send: %i", packet->sequenceNumber);
send_packet(sock_ptr, packetToSend);
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
......@@ -175,6 +176,5 @@ void *send_data_loop(void *ptr) {
PrrtBlock_destroy(block);
block = NULL;
}
PrrtSocket_cleanup(sock_ptr);
}
}
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/poll.h>
#include "../../defines.h"
#include "../../util/common.h"
#include "../../util/dbg.h"
#include "../clock.h"
#include "../socket.h"
#include "feedbackReceiver.h"
static void handle_feedback(PrrtSocket *prrtSocket, const size_t length)
{
char bufin[MAX_PAYLOAD_LENGTH];
PrrtPacket *prrtPacket = NULL;
ssize_t n;
struct sockaddr_in remote;
socklen_t addrlen = sizeof(remote);
struct pollfd fds;
int timeout_msecs = 1000;
fds.fd = prrtSocket->feedbackSocketFd;
fds.events = POLLIN;
n = poll(&fds, 1, timeout_msecs);
check(n >= 0, "Select failed.");
if(n == 0) {
return;
}
prrtTimestamp_t receiveTime = PrrtClock_get_current_time_us();
n = recvfrom(prrtSocket->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
check(n >= 0, "Receiving feedback failed.");
prrtPacket = calloc(1, sizeof(PrrtPacket));
check_mem(prrtPacket);
PrrtPacket_decode(bufin, (uint16_t) n, prrtPacket);
PrrtPacketFeedbackPayload *feedbackPayload = (PrrtPacketFeedbackPayload *) prrtPacket->payload;
prrtTimestamp_t forwardTripTimestamp = feedbackPayload->forwardTripTimestamp_us;
bool valid_sample = PrrtReceiver_updateAndGenerateRateSample(prrtSocket->receiver, feedbackPayload->ackSequenceNumber, feedbackPayload->ackPacketType, receiveTime);
if(valid_sample) {
PrrtChannelStateInformation_update_delivery_rate(prrtSocket->receiver->csi, prrtPacket, prrtSocket->receiver->rateSample);
}
PrrtChannelStateInformation_update_app_limited(prrtSocket->receiver->csi, prrtSocket->receiver->rateSample->is_app_limited);
PrrtChannelStateInformation_update_rtprop(prrtSocket->receiver->csi,
(prrtTimedelta_t) (receiveTime - forwardTripTimestamp));
PrrtChannelStateInformation_update_plr(prrtSocket->receiver->csi, feedbackPayload->erasureCount, feedbackPayload->packetCount);
error:
if(prrtPacket != NULL) { PrrtPacket_destroy(prrtPacket); }
}
void *receive_feedback_loop(void *ptr)
{
PrrtSocket *sock_ptr = ptr;
while (!atomic_load_explicit(&sock_ptr->closing, memory_order_acquire)) {
handle_feedback(sock_ptr, MAX_PAYLOAD_LENGTH);
}
return NULL;
// error:
// PERROR("Feedback reception failed.%s","");
// return NULL;
}
#ifndef PRRT_FEEDBACK_RECEIVER_H
#define PRRT_FEEDBACK_RECEIVER_H
void * receive_feedback_loop(void *ptr);
#endif //PRRT_FEEDBACK_RECEIVER_H
......@@ -39,9 +39,11 @@ PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port) {
recv->ai = info;
check(pthread_mutex_init(&recv->lock, NULL) == 0, "lock init failed.");
check(pthread_cond_init(&recv->pipeNotFullCv, NULL) == 0, "pipeNotFullCv init failed.");
check(pthread_cond_init(&recv->recordNotFoundCv, NULL) == 0, "recordNotFound init failed.");
pthread_mutexattr_t attr;
check(pthread_mutexattr_init(&attr) == EXIT_SUCCESS, "Mutex attr init failed.");
check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == EXIT_SUCCESS, "Setting type failed.");
check(pthread_mutex_init(&recv->lock, &attr) == 0, "lock init failed.");
return recv;
......@@ -79,8 +81,6 @@ bool PrrtReceiver_destroy(PrrtReceiver *receiver) {
freeaddrinfo(receiver->ai);
free((void *) receiver->host_name);
check(pthread_mutex_destroy(&receiver->lock) == 0, "lock destroy failed.");
check(pthread_cond_destroy(&receiver->pipeNotFullCv) == 0, "pipeNotFullCv destroy failed.");
check(pthread_cond_destroy(&receiver->recordNotFoundCv) == 0, "recordNotFoundCv destroy failed.");
free(receiver);
return true;
......@@ -161,19 +161,17 @@ bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNu
} else if (packetType == PACKET_TYPE_REDUNDANCY) {
packetStore = recv->redundancyPacketStates;
} else return false;
bool result = false;
check(pthread_mutex_lock(&recv->lock) == 0, "Lock failed.");
PrrtPacket *packet = PrrtInFlightPacketStore_get_packet(packetStore, seqnum);
while (packet == NULL) {
pthread_cond_wait(&recv->recordNotFoundCv, &recv->lock);
packet = PrrtInFlightPacketStore_get_packet(packetStore, seqnum);
if (packet != NULL) {
PrrtReceiver_updateRateSample(recv->rateSample, packet, receiveTime, recv->packetTracking);
PrrtInFlightPacketStore_remove_outstanding_packet(packetStore, seqnum);
result = PrrtReceiver_generateRateSample(recv->rateSample, recv->packetTracking);
recv->packetTracking->pipe -= packet->payloadLength;
}
PrrtReceiver_updateRateSample(recv->rateSample, packet, receiveTime, recv->packetTracking);
bool result = PrrtReceiver_generateRateSample(recv->rateSample, recv->packetTracking);
recv->packetTracking->pipe -= packet->payloadLength;
PrrtInFlightPacketStore_remove_outstanding_packet(packetStore, seqnum);
check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed.");
check(pthread_cond_broadcast(&recv->pipeNotFullCv) == 0, "Signal failed.");
return result;
......@@ -204,15 +202,9 @@ void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *p
PrrtInFlightPacketStore_add_outstanding_packet(packetStore, packet);
check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed.");
check(pthread_cond_broadcast(&recv->recordNotFoundCv) == 0, "Signal failed.");
return;
error:
PERROR("Lock error.%s", "");
}
void PrrtReceiver_interrupt(PrrtReceiver *recv) {
pthread_cond_broadcast(&recv->recordNotFoundCv);
pthread_cond_broadcast(&recv->pipeNotFullCv);
}
......@@ -35,8 +35,6 @@ typedef struct prrtReceiver {
uint16_t port;
struct addrinfo *ai;
PrrtChannelStateInformation *csi;
pthread_cond_t pipeNotFullCv;
pthread_cond_t recordNotFoundCv;
pthread_mutex_t lock;
PrrtInFlightPacketStore *dataPacketStates;
......@@ -55,8 +53,6 @@ void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *p
void PrrtReceiver_on_application_write(PrrtReceiver* receiver);
void PrrtReceiver_interrupt(PrrtReceiver *recv);
bool PrrtReceiver_destroy(PrrtReceiver *receiver);
#endif //PRRT_RECEIVER_H
......@@ -14,7 +14,6 @@
#include "../util/common.h"
#include "processes/dataTransmitter.h"
#include "processes/dataReceiver.h"
#include "processes/feedbackReceiver.h"
#include "stores/deliveredPacketTable.h"
#include "types/packetTimeout.h"
#include "socket.h"
......@@ -49,13 +48,12 @@ struct timespec abstime_from_now(prrtTimedelta_t wait_time) {
return deadline;
}
PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay_us) {
PrrtSocket *PrrtSocket_create(prrtTimedelta_t target_delay_us) {
assert(sizeof(float) == 4);
PrrtSocket *s = (PrrtSocket *) calloc(1, sizeof(PrrtSocket));
check_mem(s);
s->isSender = is_sender;
s->isHardwareTimestamping = false;
s->interfaceName = NULL;
......@@ -84,32 +82,22 @@ PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay
int enabled = 1;
check(s->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.");
check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_BROADCAST, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
check(s->socketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.");
check(setsockopt(s->socketFd, SOL_SOCKET, SO_BROADCAST, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
"Socket option set failed.");
check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
check(setsockopt(s->socketFd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
"Socket option set failed.");
check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_REUSEPORT, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
"Socket option set failed.");
check(s->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create feedback socket.");
check(setsockopt(s->feedbackSocketFd, SOL_SOCKET, SO_BROADCAST, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
"Socket option set failed.");
check(setsockopt(s->feedbackSocketFd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
"Socket option set failed.");
check(setsockopt(s->feedbackSocketFd, SOL_SOCKET, SO_REUSEPORT, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
check(setsockopt(s->socketFd, SOL_SOCKET, SO_REUSEPORT, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
"Socket option set failed.");
if (is_sender) {
s->sendDataQueue = Pipe_create();
} else {
s->deliveredPacketTable = PrrtDeliveredPacketTable_create();
s->repairBlockStore = PrrtRepairBlockStore_create();
s->sendDataQueue = Pipe_create();
s->deliveredPacketTable = PrrtDeliveredPacketTable_create();
s->repairBlockStore = PrrtRepairBlockStore_create();
s->packetDeliveryStore = PrrtPacketDeliveryStore_create();
s->packetDeliveryStore = PrrtPacketDeliveryStore_create();
s->dataReceptionTable = PrrtReceptionTable_create();
s->redundancyReceptionTable = PrrtReceptionTable_create();
}
s->dataReceptionTable = PrrtReceptionTable_create();
s->redundancyReceptionTable = PrrtReceptionTable_create();
return s;
......@@ -148,13 +136,8 @@ bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port)
address->sin_family = AF_INET;
address->sin_addr.s_addr = inet_addr(ipAddress);
address->sin_port = htons((uint16_t) (port + 1));
s->address = address;
check(bind(s->feedbackSocketFd, (struct sockaddr *) address, size) == EXIT_SUCCESS,
"Cannot bind feedback socket.");
address->sin_port = htons((uint16_t) (port));
s->address = address;
if(s->isHardwareTimestamping) {
struct ifreq hwtstamp;
......@@ -170,7 +153,7 @@ bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port)
hwconfig.tx_type = HWTSTAMP_TX_ON;
hwconfig.rx_filter = HWTSTAMP_FILTER_ALL;
hwconfig_requested = hwconfig;
if(ioctl(s->dataSocketFd, SIOCSHWTSTAMP, &hwtstamp) < 0) {
if(ioctl(s->socketFd, SIOCSHWTSTAMP, &hwtstamp) < 0) {
check(errno != EPERM, "Insufficient permissions. Run in privileged mode.")
if((errno == EINVAL || errno == ENOTSUP) && hwconfig_requested.tx_type == HWTSTAMP_TX_OFF &&
......@@ -184,52 +167,37 @@ bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port)
}
}
check(bind(s->dataSocketFd, (struct sockaddr *) address, size) == EXIT_SUCCESS,
check(bind(s->socketFd, (struct sockaddr *) address, size) == EXIT_SUCCESS,
"Cannot bind data socket.");
if(s->isHardwareTimestamping) {
if(!s->isSender) {
int enabled = 1;
check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_TIMESTAMPNS, &enabled, sizeof(enabled)) >= 0, "Could not set SO_TIMESTAMPNS");
socklen_t val, len;
len = sizeof(val);
check(getsockopt(s->dataSocketFd, SOL_SOCKET, SO_TIMESTAMPNS, &val, &len) >= 0, "%s: %s\n", "Getsockopt SO_TIMESTAMPNS", strerror(errno));
} else {
int enabled = SOF_TIMESTAMPING_TX_HARDWARE | SOF_TIMESTAMPING_SYS_HARDWARE | SOF_TIMESTAMPING_RAW_HARDWARE;
check(setsockopt(s->dataSocketFd, SOL_SOCKET, SO_TIMESTAMPING, &enabled, sizeof(enabled)) >= 0, "Could not set SO_TIMESTAMPING");
}
int enabled = 1;
check(setsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPNS, &enabled, sizeof(enabled)) >= 0, "Could not set SO_TIMESTAMPNS");
socklen_t val, len;
len = sizeof(val);
check(getsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPNS, &val, &len) >= 0, "%s: %s\n", "Getsockopt SO_TIMESTAMPNS", strerror(errno));
int enabled2 = SOF_TIMESTAMPING_TX_HARDWARE | SOF_TIMESTAMPING_SYS_HARDWARE | SOF_TIMESTAMPING_RAW_HARDWARE;
check(setsockopt(s->socketFd, SOL_SOCKET, SO_TIMESTAMPING, &enabled2, sizeof(enabled2)) >= 0, "Could not set SO_TIMESTAMPING");
}
if (s->isSender) {
s->receiveFeedbackThreadAttr = calloc(1, sizeof(pthread_attr_t));
pthread_attr_init(s->receiveFeedbackThreadAttr);
s->sendDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
pthread_attr_init(s->sendDataThreadAttr);
if(s->isThreadPinning) {
pin_thread_to_core(s->receiveFeedbackThreadAttr, 1);
pin_thread_to_core(s->sendDataThreadAttr, 2);
}
s->sendDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
pthread_attr_init(s->sendDataThreadAttr);
s->receiveDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
pthread_attr_init(s->receiveDataThreadAttr);
check(pthread_create(&s->receiveFeedbackThread, s->receiveFeedbackThreadAttr,
receive_feedback_loop,
(void *) s) ==
EXIT_SUCCESS, "Cannot create receive feedback thread.");
check(pthread_create(&s->sendDataThread, s->sendDataThreadAttr, send_data_loop,
(void *) s) ==
EXIT_SUCCESS,
"Cannot create send thread.");
} else {
s->receiveDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
pthread_attr_init(s->receiveDataThreadAttr);
if(s->isThreadPinning) {
pin_thread_to_core(s->receiveDataThreadAttr, 3);
}
check(pthread_create(&s->receiveDataThread, s->receiveDataThreadAttr, receive_data_loop,
(void *) s) ==
EXIT_SUCCESS,
"Cannot create data receiving thread.");
if(s->isThreadPinning) {
pin_thread_to_core(s->receiveDataThreadAttr, 2);
pin_thread_to_core(s->sendDataThreadAttr, 1);
}
check(pthread_create(&s->sendDataThread, s->sendDataThreadAttr, send_data_loop,
(void *) s) == EXIT_SUCCESS, "Cannot create send thread.");
check(pthread_create(&s->receiveDataThread, s->receiveDataThreadAttr, receive_data_loop,
(void *) s) == EXIT_SUCCESS, "Cannot create data receiving thread.");
s->isBound = true;
return true;
......@@ -247,7 +215,6 @@ int PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port) {
}
int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
check(s->isSender, "Cannot send on receiver socket.")
XlapTimestampPlaceholder tsph;
XlapTimestampPlaceholderInitialize(&tsph);
XlapTimeStampClock(&tsph, ts_any_packet, 0, PrrtSendStart);
......@@ -265,9 +232,6 @@ int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
return 0;
error:
PERROR("There was a failure while sending from socket.%s", "");
return -1;
}
bool PrrtSocket_closing(PrrtSocket *s) {
......@@ -275,20 +239,12 @@ bool PrrtSocket_closing(PrrtSocket *s) {
}
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr) {
check(s->isSender == false, "Cannot receive on sender socket.")
PrrtPacket *packet;
packet = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
return deliver_packet(s, buf_ptr, packet);
error:
PERROR("There was a failure while receiving from socket.%s", "");
return -1;
}
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr) {
check(s->isSender == false, "Cannot receive on sender socket.")
PrrtPacket *packet;
do {
packet = PrrtPacketDeliveryStore_get_packet_wait(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
......@@ -298,24 +254,15 @@ int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr) {
} while (!packet);
return deliver_packet(s, buf_ptr, packet);
error:
PERROR("There was a failure while receiving from socket.%s", "");
return -1;
}
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct timespec* deadline) {
check(s->isSender == false, "Cannot receive on sender socket.")
PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, 0, MAX_TIMESTAMP, deadline);
if (packet == NULL && errno == ETIMEDOUT) {
return -1 * ETIMEDOUT;
}
return deliver_packet(s, buf_ptr, packet);
error:
PERROR("There was a failure while receiving from socket.%s", "");
return -1;
}
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr) {
......@@ -323,20 +270,13 @@ int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr) {
}
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) {
check(s->isSender == false, "Cannot receive on sender socket.")