Commit 83bc59c4 authored by Andreas Schmidt's avatar Andreas Schmidt

Add NSDI changes.

parent db7bcf84
Pipeline #2615 failed with stages
in 1 minute and 11 seconds
......@@ -123,7 +123,8 @@ cdef extern from "proto/socket.h":
bint PrrtSocket_bind(PrrtSocket *sock_ptr, const_char *ipAddress, const uint16_t port)
int PrrtSocket_close(const PrrtSocket *sock_ptr)
int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int PrrtSocket_send_sync(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int PrrtSocket_send_async(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr, sockaddr* addr) nogil
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, sockaddr* addr) nogil
......
......@@ -30,8 +30,8 @@
#define GF_BITS 8
#define K_START 4
#define N_START 7
#define K_START 1
#define N_START 1
#define N_P_START 1
#define RRT_ALPHA 0.125
......
......@@ -12,6 +12,8 @@ set (PRRT_SOURCES ../defines.h
stores/dataPacketStore.c stores/dataPacketStore.h
stores/deliveredPacketTable.c stores/deliveredPacketTable.h
stores/inFlightPacketStore.c stores/inFlightPacketStore.h
stores/pace.c stores/pace.h
stores/paceFilter.c stores/paceFilter.h
stores/packetTimeoutTable.c stores/packetTimeoutTable.h
stores/packetDeliveryStore.c stores/packetDeliveryStore.h
stores/receptionTable.c stores/receptionTable.h
......
......@@ -86,11 +86,13 @@ static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote, prrtS
PrrtLossStatistics stats = sock_ptr->lossStatistics;
int group_RTT = 0; // TODO: To be determined.
uint32_t local_bottleneck_pace = MAX(PrrtPace_get_diff(sock_ptr->appDeliverPace), PrrtPace_get_max(sock_ptr->prrtReceivePace));
PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, sock_ptr->sequenceNumberFeedback++, group_RTT,
stats.gapLength, stats.gapCount, stats.burstLength,
stats.burstCount, forwardTripTime,
stats.erasureCount, stats.packetCount, feedback.seqNo,
feedback.type);
stats.erasureCount, stats.packetCount,
feedback.seqNo, feedback.type,
local_bottleneck_pace);
prrtPacketLength_t length = PrrtPacket_size(feedback_pkt_ptr);
void *buf = calloc(1, length);
check_mem(buf);
......@@ -142,6 +144,7 @@ handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
PrrtPacket_destroy(packet);
} else {
PrrtChannelStateInformation_update_delivery_rate(sock_ptr->senderChannelStateInformation, payload->btlbw);
sock_ptr->send_peer_btl_pace = payload->btl_pace;
prrtSequenceNumber_t baseSequenceNumber = packet->sequenceNumber - packet->index;
PrrtPacket *reference = PrrtPacket_copy(packet);
......@@ -187,6 +190,7 @@ static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) {
} else {
PrrtBlock *block = PrrtRepairBlockStore_get_block(socket->repairBlockStore,
redundancyPayload->baseSequenceNumber);
socket->send_peer_btl_pace = payload->btl_pace;
if (block == NULL) {
PrrtCodingConfiguration* codingParams = PrrtCodingConfiguration_create(redundancyPayload->k,
redundancyPayload->n, 0, NULL);
......@@ -214,6 +218,7 @@ void handle_feedback_packet(PrrtSocket *prrtSocket, PrrtPacket *prrtPacket, prrt
prrtTimestamp_t forwardTripTimestamp = feedbackPayload->forwardTripTimestamp_us;
prrtTimedelta_t rtt = (prrtTimedelta_t) (receiveTime - forwardTripTimestamp);
prrtSocket->recv_peer_btl_pace = feedbackPayload->btl_pace;
PrrtReceiver_on_ack(prrtSocket->receiver, feedbackPayload, receiveTime, rtt);
return;
......@@ -274,6 +279,7 @@ void *receive_data_loop(void *ptr) {
PrrtSocket *s = ptr;
while (1) {
PrrtPace_track_start(s->prrtReceivePace);
debug(DEBUG_DATARECEIVER, "About to receive.");
XlapTimestampPlaceholder tsph1;
XlapTimestampPlaceholder tsph2;
......@@ -284,6 +290,9 @@ void *receive_data_loop(void *ptr) {
struct timespec packet_recv_timestamp;
uint64_t packet_recv_cyclestamp = 0;
PrrtPace_track_pause(s->prrtReceivePace);
receive_from_socket(s, buffer, &n, &remote, &addrlen, &packet_recv_timestamp, &packet_recv_cyclestamp);
PrrtPace_track_resume(s->prrtReceivePace);
receive_from_socket(s, buffer, &n, &remote, &addrlen, &packet_recv_timestamp, &packet_recv_cyclestamp);
if (atomic_load_explicit(&s->closing, memory_order_acquire)) {
break;
......@@ -350,6 +359,7 @@ void *receive_data_loop(void *ptr) {
debug(DEBUG_DATARECEIVER, "Cleanup");
PrrtSocket_cleanup(s);
debug(DEBUG_DATARECEIVER, "Cleaned");
PrrtPace_track_end(s->prrtReceivePace);
}
PrrtSocket_cleanup(s);
return NULL;
......
......@@ -10,7 +10,7 @@
#include "dataTransmitter.h"
#include <math.h>
bool send_to_socket(PrrtSocket* sock_ptr, PrrtReceiver *recv, uint8_t* buf, prrtPacketLength_t length, struct timespec *packet_timestamp, uint64_t* packet_clockstamp) {
bool send_to_socket(PrrtSocket* sock_ptr, PrrtReceiver *recv, uint8_t* buf, prrtPacketLength_t length, struct timespec *packet_timestamp, uint64_t* packet_clockstamp) {
if(sock_ptr->isHardwareTimestamping) {
struct msghdr msg;
struct iovec iov;
......@@ -77,8 +77,7 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
uint8_t buf[MAX_PAYLOAD_LENGTH];
memset(buf, 0, sizeof(buf));
prrtPacketLength_t length = PrrtPacket_size(packet);
prrtPacketLength_t payloadLength = packet->payloadLength;
PrrtSocket_pace(sock_ptr);
PrrtSocket_pace(sock_ptr, true);
int64_t space = PrrtReceiver_get_space(sock_ptr->receiver);
while (space < sock_ptr->maximum_payload_size) {
......@@ -89,19 +88,32 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
if (sock_ptr->pacingEnabled) {
double pacing_rate = PrrtReceiver_getBBRPacingRate(sock_ptr->receiver);
prrtTimedelta_t peerPacingTime = 0;
prrtTimedelta_t channelPacingTime = 0;
double pacing_rate = PrrtReceiver_get_BBR_pacingRate(sock_ptr->receiver);
double pacing_gain = PrrtReceiver_get_BBR_pacingGain(sock_ptr->receiver);
uint32_t state = PrrtReceiver_get_BBR_state(sock_ptr->receiver);
if(pacing_rate != 0) {
prrtTimedelta_t pacingTime = (prrtTimedelta_t) round(((1000 * 1000 * ( (double)payloadLength)) / pacing_rate));
debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %f, Pacing Time: %u", packet->payloadLength, pacing_rate, pacingTime);
sock_ptr->nextSendTime = now + pacingTime;
channelPacingTime = (prrtTimedelta_t) round(((1000 * 1000 * ((double) packet->payloadLength)) / pacing_rate));
}
// Cross-Pace iff PROBE_BW and unity gain
if(state == PROBE_BW && pacing_gain == 1.0 && sock_ptr->recv_peer_btl_pace != 0) {
peerPacingTime = 0; //(prrtTimedelta_t) round(sock_ptr->recv_peer_btl_pace);
}
prrtTimedelta_t pacingTime = (prrtTimedelta_t) round(MAX(channelPacingTime, peerPacingTime));
debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %f, Pacing Time: %u", packet->payloadLength, pacing_rate, pacingTime);
sock_ptr->nextSendTime = now + pacingTime;
}
// Update timestamp
prrtTimedelta_t btl_pace = MAX(PrrtPace_get_max(sock_ptr->prrtTransmitPace), PrrtPace_get_max(sock_ptr->appSendPace));
if(PrrtPacket_type(packet) == PACKET_TYPE_DATA) {
((PrrtPacketDataPayload*) (packet->payload))->timestamp = PrrtClock_get_current_time_us();
((PrrtPacketDataPayload*) (packet->payload))->btlbw = PrrtReceiver_get_BBR_btlDr(sock_ptr->receiver);
((PrrtPacketDataPayload*) (packet->payload))->btl_pace = btl_pace;
} else if(PrrtPacket_type(packet) == PACKET_TYPE_REDUNDANCY) {
((PrrtPacketRedundancyPayload*) (packet->payload))->timestamp = PrrtClock_get_current_time_us();
((PrrtPacketRedundancyPayload*) (packet->payload))->btl_pace = btl_pace;
}
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small.");
......@@ -143,7 +155,6 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
case PACKET_TYPE_CHANNEL_FEEDBACK:
default:;
}
return true;
error:
......@@ -151,57 +162,63 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
return false;
}
void *send_data_loop(void *ptr) {
void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet) {
PrrtPace_track_start(sock_ptr->prrtTransmitPace);
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
if (sock_ptr->receiveBlock == NULL) {
sock_ptr->receiveBlock = PrrtBlock_create(PrrtCodingConfiguration_copy(sock_ptr->codingParameters), PrrtCoder_copy(sock_ptr->coder), packet->sequenceNumber);
}
packet->index = (uint8_t) (packet->sequenceNumber - sock_ptr->receiveBlock->baseSequenceNumber);
PrrtPacketDataPayload *payload = packet->payload;
payload->groupRTprop_us = PrrtSocket_get_rtprop_fwd(sock_ptr);
PrrtPacket *packetToSend = PrrtPacket_copy(packet);
debug(DEBUG_DATATRANSMITTER, "Send: %i", packet->sequenceNumber);
send_packet(sock_ptr, packetToSend);
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
PrrtBlock_insert_data_packet(sock_ptr->receiveBlock, packet);
// TODO: redundancy should only be sent when necessary
if (PrrtBlock_encode_ready(sock_ptr->receiveBlock)) {
uint32_t j = 0;
unsigned int redundancy_seqno = sock_ptr->sequenceNumberRedundancy;
XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeStart);
PrrtBlock_encode(sock_ptr->receiveBlock, &sock_ptr->sequenceNumberRedundancy);
XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeEnd);
uint32_t redundancyPackets = List_count(sock_ptr->receiveBlock->redundancyPackets);
for (j = 0; j < redundancyPackets; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(sock_ptr->receiveBlock);
send_packet(sock_ptr, red_pkt);
}
PrrtBlock_destroy(sock_ptr->receiveBlock);
sock_ptr->receiveBlock = NULL;
}
PrrtPace_track_end(sock_ptr->prrtTransmitPace);
}
void *PrrtDataTransmitter_send_data_loop(void *ptr) {
PrrtSocket *sock_ptr = ptr;
PrrtBlock *block = NULL;
while (1) {
ListNode *job;
do {
job = Pipe_pull(sock_ptr->sendDataQueue);
if (PrrtSocket_closing(sock_ptr)) {
if (block != NULL) {
PrrtBlock_destroy(block);
if (sock_ptr->receiveBlock != NULL) {
PrrtBlock_destroy(sock_ptr->receiveBlock);
sock_ptr->receiveBlock = NULL;
}
return NULL;
}
} while (!job);
PrrtPacket *packet = PrrtPacket_byListNode(job);
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
if (block == NULL) {
block = PrrtBlock_create(PrrtCodingConfiguration_copy(sock_ptr->codingParameters), PrrtCoder_copy(sock_ptr->coder), packet->sequenceNumber);
}
packet->index = (uint8_t) (packet->sequenceNumber - block->baseSequenceNumber);
PrrtPacketDataPayload *payload = packet->payload;
payload->groupRTprop_us = PrrtSocket_get_rtprop_fwd(sock_ptr);
PrrtPacket *packetToSend = PrrtPacket_copy(packet);
debug(DEBUG_DATATRANSMITTER, "Send: %i", packet->sequenceNumber);
send_packet(sock_ptr, packetToSend);
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
PrrtBlock_insert_data_packet(block, packet);
// TODO: redundancy should only be sent when necessary
if (PrrtBlock_encode_ready(block)) {
uint32_t j = 0;
unsigned int redundancy_seqno = sock_ptr->sequenceNumberRedundancy;
XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeStart);
PrrtBlock_encode(block, &sock_ptr->sequenceNumberRedundancy);
XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeEnd);
uint32_t redundancyPackets = List_count(block->redundancyPackets);
for (j = 0; j < redundancyPackets; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
send_packet(sock_ptr, red_pkt);
}
PrrtBlock_destroy(block);
block = NULL;
}
PrrtDataTransmitter_transmit(sock_ptr, packet);
}
}
#ifndef PRRT_DATA_TRANSMITTER_H
#define PRRT_DATA_TRANSMITTER_H
void * send_data_loop(void *ptr);
#include "../socket.h"
void * PrrtDataTransmitter_send_data_loop(void *ptr);
void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet);
#endif //PRRT_DATA_TRANSMITTER_H
......@@ -188,8 +188,6 @@ bool PrrtReceiver_destroy(PrrtReceiver *receiver) {
return false;
}
void PrrtReceiver_check_rto(PrrtReceiver *recv, prrtSequenceNumber_t seqNum, prrtPacketType_t packetType) {
prrtByteCount_t lostBytes = 0;
PrrtInFlightPacketStore *inflightPacketStore = NULL;
......@@ -273,7 +271,7 @@ void PrrtReceiver_on_ack(PrrtReceiver *receiver, PrrtPacketFeedbackPayload * fee
PERROR("Mutex error.%s", "");
}
double PrrtReceiver_getBBRPacingRate(PrrtReceiver *receiver) {
double PrrtReceiver_get_BBR_pacingRate(PrrtReceiver *receiver) {
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
double res = BBR_getPacingRate(receiver->bbr);
check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
......@@ -284,6 +282,28 @@ double PrrtReceiver_getBBRPacingRate(PrrtReceiver *receiver) {
return 0.0;
}
uint32_t PrrtReceiver_get_BBR_state(PrrtReceiver *receiver) {
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
uint32_t res = BBR_getState(receiver->bbr);
check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("Mutex error.%s", "");
return 0;
}
double PrrtReceiver_get_BBR_pacingGain(PrrtReceiver *receiver) {
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
double res = BBR_getPacingGain(receiver->bbr);
check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("Mutex error.%s", "");
return 0.0;
}
prrtByteCount_t PrrtReceiver_get_space(PrrtReceiver *receiver) {
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
prrtByteCount_t cwnd = BBR_getCwnd(receiver->bbr);
......
......@@ -37,10 +37,13 @@ void PrrtReceiver_check_rto(PrrtReceiver *recv, prrtSequenceNumber_t i, prrtPack
void PrrtReceiver_on_application_write(PrrtReceiver *receiver, uint32_t i, prrtSequenceNumber_t i1);
void PrrtReceiver_on_ack(PrrtReceiver *receiver, PrrtPacketFeedbackPayload * feedbackPayload, prrtTimestamp_t receiveTime, prrtTimedelta_t rtt);
void PrrtReceiver_wait_for_space(PrrtReceiver *receiver);
prrtByteCount_t PrrtReceiver_get_space(PrrtReceiver *receiver);
double PrrtReceiver_getBBRPacingRate(PrrtReceiver *receiver);
double PrrtReceiver_get_BBR_pacingRate(PrrtReceiver *receiver);
double PrrtReceiver_get_BBR_pacingGain(PrrtReceiver *receiver);
uint32_t PrrtReceiver_get_BBR_state(PrrtReceiver *receiver);
prrtDeliveryRate_t PrrtReceiver_get_BBR_btlDr(PrrtReceiver *receiver);
bool PrrtReceiver_destroy(PrrtReceiver *receiver);
......
......@@ -45,6 +45,28 @@ static inline prrtPacketLength_t deliver_packet(const PrrtSocket *s, void *buffe
return len;
}
void PrrtSocket_pace(PrrtSocket *sock_ptr, bool prepace) {
PrrtPace_track_pause(sock_ptr->prrtTransmitPace);
if (sock_ptr->pacingEnabled && sock_ptr->nextSendTime != 0) {
debug(DEBUG_DATATRANSMITTER, "About to check for pacing.");
prrtTimeDifference_t diff = 0;
do {
prrtTimeDifference_t now = (prrtTimeDifference_t) PrrtClock_get_current_time_us();
diff = ((prrtTimeDifference_t) sock_ptr->nextSendTime) - now;
if (!prepace) { // post-pacing removes appSendPace
diff -= PrrtPace_get_eff(sock_ptr->appSendPace);
diff = MAX(0, diff);
}
if (diff > 0) {
usleep_nano((uint32_t) diff);
}
} while (diff > 0);
} else {
usleep_nano(1);
}
PrrtPace_track_resume(sock_ptr->prrtTransmitPace);
}
PrrtSocket *PrrtSocket_create(prrtByteCount_t maximum_payload_size, prrtTimedelta_t target_delay_us) {
assert(sizeof(float) == 4);
PrrtSocket *s = (PrrtSocket *) calloc(1, sizeof(PrrtSocket));
......@@ -101,6 +123,12 @@ PrrtSocket *PrrtSocket_create(prrtByteCount_t maximum_payload_size, prrtTimedelt
s->dataReceptionTable = PrrtReceptionTable_create();
s->redundancyReceptionTable = PrrtReceptionTable_create();
// Pacing
s->appSendPace = PrrtPace_create();
s->prrtTransmitPace = PrrtPace_create();
s->prrtReceivePace = PrrtPace_create();
s->appDeliverPace = PrrtPace_create();
return s;
error:
......@@ -196,7 +224,7 @@ bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port)
pin_thread_to_core(s->sendDataThreadAttr, 1);
}
check(pthread_create(&s->sendDataThread, s->sendDataThreadAttr, send_data_loop,
check(pthread_create(&s->sendDataThread, s->sendDataThreadAttr, PrrtDataTransmitter_send_data_loop,
(void *) s) == EXIT_SUCCESS, "Cannot create send thread.");
check(pthread_create(&s->receiveDataThread, s->receiveDataThreadAttr, receive_data_loop,
......@@ -218,7 +246,7 @@ int PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port) {
return 0;
}
int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
int sendpacket(PrrtSocket *s, const uint8_t *data, const size_t data_len, bool sync) {
if (data_len > s->maximum_payload_size) {
PERROR("Data to be sent (%ld bytes) is too long, as MTU is %d.\n", data_len, s->maximum_payload_size);
return -1;
......@@ -228,7 +256,7 @@ int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
PERROR("PrrtSocket_connect() must be called before PrrtSocket_send().\n");
return -1;
}
PrrtPace_track_start(s->appSendPace);
XlapTimestampPlaceholder tsph;
XlapTimestampPlaceholderInitialize(&tsph);
XlapTimeStampClock(&tsph, ts_any_packet, 0, PrrtSendStart);
......@@ -240,41 +268,43 @@ int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
XlapTimestampPlaceholderUse(s, ts_data_packet, packet->sequenceNumber, &tsph);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSubmitPackage);
Pipe_push(s->sendDataQueue, &packet->asListNode);
if (sync) {
PrrtDataTransmitter_transmit(s, packet);
PrrtSocket_pace(s, false);
} else {
Pipe_push(s->sendDataQueue, &packet->asListNode);
}
PrrtReceiver_on_application_write(s->receiver, Pipe_get_size(s->sendDataQueue), s->sequenceNumberSource);
XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
PrrtPace_track_end(s->appSendPace);
return 0;
}
bool PrrtSocket_closing(PrrtSocket *s) {
return atomic_load_explicit(&s->closing, memory_order_acquire);
int PrrtSocket_send_sync(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
return sendpacket(s, data, data_len, true);
}
void PrrtSocket_pace(PrrtSocket *s) {
if (s->pacingEnabled && s->nextSendTime != 0) {
debug(DEBUG_DATATRANSMITTER, "About to check for pacing.");
prrtTimeDifference_t diff = 0;
do {
prrtTimeDifference_t now = (prrtTimeDifference_t) PrrtClock_get_current_time_us();
diff = ((prrtTimeDifference_t) s->nextSendTime) - now;
if (diff > 0) {
usleep_nano((uint32_t) diff);
}
} while (diff > 0);
} else {
usleep_nano(1);
}
int PrrtSocket_send_async(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
return sendpacket(s, data, data_len, false);
}
bool PrrtSocket_closing(PrrtSocket *s) {
return atomic_load_explicit(&s->closing, memory_order_acquire);
}
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
PrrtPace_track_start(s->appDeliverPace);
PrrtPacket *packet;
packet = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
return deliver_packet(s, buf_ptr, packet, addr);
prrtPacketLength_t l = deliver_packet(s, buf_ptr, packet, addr);
PrrtPace_track_end(s->appDeliverPace);
return l;
}
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
PrrtPace_track_start(s->appDeliverPace);
PrrtPacket *packet;
do {
packet = PrrtPacketDeliveryStore_get_packet_wait(s->packetDeliveryStore);
......@@ -282,16 +312,22 @@ int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr, struct sockad
return -1;
}
} while (!packet);
return deliver_packet(s, buf_ptr, packet, addr);
prrtPacketLength_t l = deliver_packet(s, buf_ptr, packet, addr);
PrrtPace_track_end(s->appDeliverPace);
return l;
}
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, struct timespec* deadline) {
PrrtPace_track_start(s->appDeliverPace);
PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, 0, MAX_TIMESTAMP, deadline);
if (packet == NULL && errno == ETIMEDOUT) {
PrrtPace_track_end(s->appDeliverPace);
return -1 * ETIMEDOUT;
}
return deliver_packet(s, buf_ptr, packet, addr);
prrtPacketLength_t l = deliver_packet(s, buf_ptr, packet, addr);
PrrtPace_track_end(s->appDeliverPace);
return l;
}
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
......@@ -299,13 +335,17 @@ int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
}
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us) {
PrrtPace_track_start(s->appDeliverPace);
prrtTimestamp_t now = PrrtClock_get_current_time_us();
PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, now,
now + time_window_us);
return deliver_packet(s, buf_ptr, packet, addr);
prrtPacketLength_t l = deliver_packet(s, buf_ptr, packet, addr);
PrrtPace_track_end(s->appDeliverPace);
return l;
}
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us) {
PrrtPace_track_start(s->appDeliverPace);
PrrtPacket *packet;
do {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
......@@ -315,21 +355,26 @@ int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct soc
return -1;
}
} while (!packet);
return deliver_packet(s, buf_ptr, packet, addr);
prrtPacketLength_t l = deliver_packet(s, buf_ptr, packet, addr);
PrrtPace_track_end(s->appDeliverPace);
return l;
}
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us, struct timespec* deadline) {
PrrtPace_track_start(s->appDeliverPace);
prrtTimestamp_t now = PrrtClock_get_current_time_us();
PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now, now + time_window_us, deadline);
if (packet == NULL && errno == ETIMEDOUT) {
PrrtPace_track_end(s->appDeliverPace);
return -1 * ETIMEDOUT;
}
return deliver_packet(s, buf_ptr, packet, addr);
prrtPacketLength_t l = deliver_packet(s, buf_ptr, packet, addr);
PrrtPace_track_end(s->appDeliverPace);
return l;
}
int PrrtSocket_interrupt(PrrtSocket *s) {
atomic_store_explicit(&s->closing, true, memory_order_release);
......@@ -429,6 +474,22 @@ int PrrtSocket_close(PrrtSocket *s) {
check(PrrtCodingConfiguration_destroy(s->codingParameters), "Could not destroy coding parameters.")
}
if(s->appSendPace != NULL) {
check(PrrtPace_destroy(s->appSendPace), "Could not destroy appSendPace");
}
if(s->prrtTransmitPace != NULL) {
check(PrrtPace_destroy(s->prrtTransmitPace), "Could not destroy prrtTransmitPace");
}
if(s->prrtReceivePace != NULL) {
check(PrrtPace_destroy(s->prrtReceivePace), "Could not destroy prrtReceivePace");
}
if(s->appDeliverPace != NULL) {
check(PrrtPace_destroy(s->appDeliverPace), "Could not destroy appDeliverPace");
}
if(s->coder) {
check(PrrtCoder_destroy(s->coder), "Could not destroy coder.")
}
......@@ -453,6 +514,26 @@ int PrrtSocket_close(PrrtSocket *s) {
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *s, const char *name) {
if (strcmp(name, "targetdelay") == 0) {
return PrrtApplicationConstraints_get_target_delay(s->applicationConstraints);
} else if (strcmp(name, "app_send_max_pace") == 0) {
return PrrtPace_get_max(s->appSendPace);
} else if (strcmp(name, "app_send_eff_pace") == 0) {
return PrrtPace_get_eff(s->appSendPace);
} else if (strcmp(name, "prrt_transmit_max_pace") == 0) {
return PrrtPace_get_max(s->prrtTransmitPace);
} else if (strcmp(name, "prrt_transmit_eff_pace") == 0) {
return PrrtPace_get_eff(s->prrtTransmitPace);
} else if (strcmp(name, "prrt_receive_max_pace") == 0) {
return PrrtPace_get_max(s->prrtReceivePace);
} else if (strcmp(name, "prrt_receive_eff_pace") == 0) {
return PrrtPace_get_eff(s->prrtReceivePace);
} else if (strcmp(name, "app_deliver_max_pace") == 0) {
return PrrtPace_get_max(s->appDeliverPace);
} else if (strcmp(name, "app_deliver_eff_pace") == 0) {
return PrrtPace_get_eff(s->appDeliverPace);
} else if (strcmp(name, "send_peer_btl_pace") == 0) {
return s->send_peer_btl_pace;
} else if (strcmp(name, "recv_peer_btl_pace") == 0) {
return s->recv_peer_btl_pace;
} else if (strcmp(name, "connected") == 0) {
return (s->receiver != NULL) ? 1 : 0;
} else if (strcmp(name, "maximum_payload_size") == 0) {
......
#ifndef PRRT_SOCKET_H
#define PRRT_SOCKET_H
#include "../defines.h"
#include "../util/list.h"
#include "../util/pipe.h"
......@@ -8,9 +9,11 @@
#include "../util/bptree.h"
#include "channelStateInformation.h"
#include "applicationConstraints.h"
#include "stores/pace.h"
#include "stores/dataPacketStore.h"
#include "stores/deliveredPacketTable.h"
#include "stores/packetTimeoutTable.h"
#include "stores/paceFilter.h"
#include "stores/receptionTable.h"
#include "stores/repairBlockStore.h"
#include "stores/packetDeliveryStore.h"
......@@ -47,6 +50,8 @@ typedef struct prrtSocket {
PrrtDataPacketStore *dataPacketStore;
PrrtRepairBlockStore *repairBlockStore;
PrrtBlock* receiveBlock;
PrrtDeliveredPacketTable *deliveredPacketTable;
PrrtReceiver* receiver;
......@@ -74,6 +79,15 @@ typedef struct prrtSocket {
PrrtCodingConfiguration *codingParameters;
PrrtCoder *coder;
// Pacing
PrrtPace* appSendPace;
PrrtPace* prrtTransmitPace;
PrrtPace* prrtReceivePace;
PrrtPace* appDeliverPace;
prrtTimedelta_t send_peer_btl_pace;
prrtTimedelta_t recv_peer_btl_pace;
_Atomic (XlapTimestampTable *) tstable[2];
pthread_attr_t *sendDataThreadAttr;
......@@ -113,7 +127,9 @@ int PrrtSocket_close(PrrtSocket *s);
int PrrtSocket_connect(PrrtSocket *s, const char *host, uint16_t port);
int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, size_t data_len);
int PrrtSocket_send_async(PrrtSocket *s, const uint8_t *data, const size_t data_len);
int PrrtSocket_send_sync(PrrtSocket *s, const uint8_t *data, size_t data_len);
void PrrtSocket_pace(PrrtSocket *sock_ptr, bool prepace);
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr);
......@@ -125,8 +141,6 @@ int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, struct sockaddr
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us);
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us, struct timespec* deadline);
void PrrtSocket_pace(PrrtSocket *s);
bool PrrtSocket_cleanup(PrrtSocket *s);
bool PrrtSocket_closing(PrrtSocket *s);
......
#include "pace.h"
#include "../clock.h"
#include "../../util/common.h"
#include "../../util/dbg.h"
#include "../../util/time.h"
#include <math.h>
PrrtPace* PrrtPace_create() {
PrrtPace* pace = (PrrtPace*) calloc(1, sizeof(PrrtPace));
check_mem(pace);
pace->effectivePace = PrrtPaceFilter_create(500 * 1000);
pace->maximumPace = PrrtPaceFilter_create(500 * 1000);
pace->totalPauseDuration = 0;
pace->init = false;
pace->first = false;
clock_gettime(CLOCK_REALTIME, &pace->lastStartTimestamp);
clock_gettime(CLOCK_REALTIME, &pace->lastEndTimestamp);
clock_gettime(CLOCK_REALTIME, &pace->lastPauseTimestamp);
return pace;
error:
PERROR("Out of memory%s.", "");
return NULL;
}
bool PrrtPace_destroy(PrrtPace* pace) {
if(pace->maximumPace != NULL) {
check(PrrtPaceFilter_destroy(pace->maximumPace), "Cannot destroy maximumPace.")
}
if(pace->effectivePace != NULL) {
check(PrrtPaceFilter_destroy(pace->effectivePace), "Cannot destroy effectivePace.")
}
free(pace);
return true;
error:
return false;
}
prrtTimedelta_t PrrtPace_get_eff(PrrtPace* pace) {
return PrrtPaceFilter_get(pace->effectivePace);
}
prrtTimedelta_t PrrtPace_get_max(PrrtPace* pace) {
return PrrtPaceFilter_get(pace->maximumPace);
}
prrtTimedelta_t PrrtPace_get_diff(PrrtPace* pace) {
int64_t paceMax = PrrtPaceFilter_get(pace->maximumPace);
int64_t paceEff = PrrtPaceFilter_get(pace->effectivePace);
uint32_t paceDiff = (uint32_t) MAX(0, paceEff - paceMax);
return paceDiff;
}
void PrrtPace_track_start(PrrtPace* pace) {