Commit be7387d0 authored by Andreas Schmidt's avatar Andreas Schmidt

Use right paces. Expose paces.

parent c4f0c211
Pipeline #2835 passed with stages
in 1 minute and 41 seconds
......@@ -101,7 +101,7 @@ static bool send_feedback(PrrtSocket *sock_ptr,
PrrtLossStatistics stats = sock_ptr->lossStatistics;
int group_RTT = 0; // TODO: To be determined.
uint32_t local_bottleneck_pace = 0; // MAX(PrrtPace_get_eff(sock_ptr->appDeliverPace), PrrtPace_get_eff(sock_ptr->prrtReceivePace));
uint32_t local_bottleneck_pace = MAX(PrrtPace_get_effective(sock_ptr->appDeliverPace), PrrtPace_get_effective(sock_ptr->prrtReceivePace));
PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, sock_ptr->sequenceNumberFeedback++, group_RTT,
stats.gapLength, stats.gapCount, stats.burstLength,
stats.burstCount, forwardTripTime,
......
......@@ -112,14 +112,13 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
channelPacingTime = (prrtTimedelta_t) round(((1000 * 1000 * ((double) packet->payloadLength)) / pacing_rate));
}
// Cross-Pace iff PROBE_BW and unity gain
if(sock_ptr->recv_peer_btl_pace != 0 /*&& state == PROBE_BW*/) {
peerPacingTime = sock_ptr->recv_peer_btl_pace;
/*double pt = round(((double) sock_ptr->recv_peer_btl_pace) / pacing_gain;
if(sock_ptr->recv_peer_btl_pace != 0 && state == PROBE_BW) {
double pt = round(((double) sock_ptr->recv_peer_btl_pace) / pacing_gain);
if (pt > (TIMESTAMP_SPACE-1)) {
peerPacingTime = TIMESTAMP_SPACE-1;
} else {
peerPacingTime = (prrtTimedelta_t) pt;
}*/
}
}
prrtTimedelta_t pacingTime = MAX(channelPacingTime, peerPacingTime);
......@@ -127,7 +126,7 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
sock_ptr->nextSendTime = now + pacingTime;
}
// Update timestamp
prrtTimedelta_t btl_pace = 0; //MAX(MAX(PrrtPace_get_max(sock_ptr->prrtTransmitPace), PrrtPace_get_max(sock_ptr->appSendPace)), PrrtSocket_get_sock_opt(sock_ptr, "nw_pace"));
prrtTimedelta_t btl_pace = MAX(MAX(PrrtPace_get_effective(sock_ptr->prrtTransmitPace), PrrtPace_get_effective(sock_ptr->appSendPace)), PrrtSocket_get_sock_opt(sock_ptr, "nw_pace"));
if(PrrtPacket_type(packet) == PACKET_TYPE_DATA) {
((PrrtPacketDataPayload*) (packet->payload))->timestamp = PrrtClock_get_current_time_us();
((PrrtPacketDataPayload*) (packet->payload))->btlbw = PrrtReceiver_get_BBR_btlDr(sock_ptr->receiver);
......
......@@ -60,7 +60,7 @@ bool PrrtSocket_pace(PrrtSocket *s, bool prepace) {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
diff = PrrtTimestamp_cmp(s->nextSendTime, now);
if (!prepace) { // post-pacing removes appSendPace
// TODO: diff -= PrrtPace_get_diff(s->appSendPace);
diff -= PrrtPace_get_external(s->appSendPace);
diff = MAX(0, diff);
}
if (diff > 0) {
......@@ -280,12 +280,14 @@ int sendpacket(PrrtSocket *s, const uint8_t *data, const size_t data_len, bool s
XlapTimestampPlaceholderUse(s, ts_data_packet, packet->sequenceNumber, &tsph);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSubmitPackage);
PrrtPace_track_pause(s->appSendPace);
if (sync) {
PrrtDataTransmitter_transmit(s, packet);
PrrtSocket_pace(s, false);
} else {
Pipe_push(s->sendDataQueue, &packet->asListNode);
}
PrrtPace_track_resume(s->appSendPace);
PrrtReceiver_on_application_write(s->receiver, Pipe_get_size(s->sendDataQueue), s->sequenceNumberSource);
......@@ -321,7 +323,7 @@ int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr, struct sockad
PrrtPacket *packet;
do {
struct timespec deadline = abstime_from_now(100);
packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, 0, MAX_TIMESTAMP, &deadline);
packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, 0, MAX_TIMESTAMP, &deadline, s->appDeliverPace);
if (packet == NULL && errno == ETIMEDOUT) {
errno = 0;
}
......@@ -337,7 +339,8 @@ int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr, struct sockad
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, struct timespec* deadline) {
PrrtPace_track_start(s->appDeliverPace);
PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, 0, MAX_TIMESTAMP, deadline);
PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, 0, MAX_TIMESTAMP,
deadline, s->appDeliverPace);
if (packet == NULL && errno == ETIMEDOUT) {
PrrtPace_track_end(s->appDeliverPace);
return -1 * ETIMEDOUT;
......@@ -368,7 +371,8 @@ int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct soc
do {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
struct timespec deadline = abstime_from_now(time_window_us);
packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now, now + time_window_us, &deadline);
packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now, now + time_window_us,
&deadline, s->appDeliverPace);
if (packet == NULL && errno == ETIMEDOUT) {
errno = 0;
}
......@@ -386,7 +390,8 @@ int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, struc
PrrtPace_track_start(s->appDeliverPace);
prrtTimestamp_t now = PrrtClock_get_current_time_us();
PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now, now + time_window_us, deadline);
PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now, now + time_window_us,
deadline, s->appDeliverPace);
if (packet == NULL && errno == ETIMEDOUT) {
PrrtPace_track_end(s->appDeliverPace);
return -1 * ETIMEDOUT;
......@@ -551,6 +556,30 @@ int PrrtSocket_close(PrrtSocket *s) {
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *s, const char *name) {
if (strcmp(name, "targetdelay") == 0) {
return PrrtApplicationConstraints_get_target_delay(s->applicationConstraints);
} else if(strcmp(name, "appSend_pace_internal") == 0) {
return PrrtPace_get_internal(s->appSendPace);
} else if(strcmp(name, "appSend_pace_dependent") == 0) {
return PrrtPace_get_dependent(s->appSendPace);
} else if(strcmp(name, "appSend_pace_external") == 0) {
return PrrtPace_get_external(s->appSendPace);
} else if(strcmp(name, "prrtTransmit_pace_internal") == 0) {
return PrrtPace_get_internal(s->prrtTransmitPace);
} else if(strcmp(name, "prrtTransmit_pace_dependent") == 0) {
return PrrtPace_get_dependent(s->prrtTransmitPace);
} else if(strcmp(name, "prrtTransmit_pace_external") == 0) {
return PrrtPace_get_external(s->prrtTransmitPace);
} else if(strcmp(name, "prrtReceive_pace_internal") == 0) {
return PrrtPace_get_internal(s->prrtReceivePace);
} else if(strcmp(name, "prrtReceive_pace_dependent") == 0) {
return PrrtPace_get_dependent(s->prrtReceivePace);
} else if(strcmp(name, "prrtReceive_pace_external") == 0) {
return PrrtPace_get_external(s->prrtReceivePace);
} else if(strcmp(name, "appDeliver_pace_internal") == 0) {
return PrrtPace_get_internal(s->appDeliverPace);
} else if(strcmp(name, "appDeliver_pace_dependent") == 0) {
return PrrtPace_get_dependent(s->appDeliverPace);
} else if(strcmp(name, "appDeliver_pace_external") == 0) {
return PrrtPace_get_external(s->appDeliverPace);
} else if (strcmp(name, "send_peer_btl_pace") == 0) {
return s->send_peer_btl_pace;
} else if (strcmp(name, "recv_peer_btl_pace") == 0) {
......
......@@ -62,7 +62,7 @@ prrtTimedelta_t PrrtPace_get_total(PrrtPace* pace) {
}
prrtTimedelta_t PrrtPace_get_effective(PrrtPace* pace) {
return PrrtPaceFilter_get(pace->externalPace) + PrrtPaceFilter_get(pace->internalPace) - PrrtPaceFilter_get(pace->dependentPace);
return (prrtTimedelta_t) MAX(0, (uint64_t) PrrtPaceFilter_get(pace->externalPace) + PrrtPaceFilter_get(pace->internalPace) - PrrtPaceFilter_get(pace->dependentPace));
}
void PrrtPace_track_start(PrrtPace* pace) {
......@@ -73,7 +73,8 @@ void PrrtPace_track_start(PrrtPace* pace) {
long long int externalDelta_ns = timedelta(&now, &pace->lastEndTimestamp);
if (pace->firstRoundDone) {
PrrtPaceFilter_update(pace->internalPace, (prrtTimedelta_t) MAX(0, round(((double) internalDelta_ns) / 1000)));
// make sure internal >= dependent
PrrtPaceFilter_update(pace->internalPace, (prrtTimedelta_t) MAX(0, round(((double) MAX(internalDelta_ns, pace->totalPauseDuration_ns)) / 1000)));
PrrtPaceFilter_update(pace->externalPace, (prrtTimedelta_t) MAX(0, round(((double) externalDelta_ns) / 1000)));
PrrtPaceFilter_update(pace->dependentPace, (prrtTimedelta_t) MAX(0, round(((double) pace->totalPauseDuration_ns) / 1000)));
}
......
......@@ -6,6 +6,7 @@
#include "../../util/dbg.h"
#include "../../util/time.h"
#include "packetDeliveryStore.h"
#include "pace.h"
PrrtPacketDeliveryStore *PrrtPacketDeliveryStore_create() {
PrrtPacketDeliveryStore *store = (PrrtPacketDeliveryStore *) calloc(1, sizeof(PrrtPacketDeliveryStore));
......@@ -59,8 +60,9 @@ bool PrrtPacketDeliveryStore_destroy(PrrtPacketDeliveryStore *store) {
return true;
}
PrrtPacket * PrrtPacketDeliveryStore_get_packet_timedwait(PrrtPacketDeliveryStore *store, prrtTimestamp_t start,
prrtTimestamp_t stop, const struct timespec *deadline) {
PrrtPacket *PrrtPacketDeliveryStore_get_packet_timedwait(PrrtPacketDeliveryStore *store, prrtTimestamp_t start,
prrtTimestamp_t stop, const struct timespec *deadline,
PrrtPace *pace) {
PrrtPacket *packet = NULL;
check(pthread_mutex_lock(&store->lock) == EXIT_SUCCESS, "Lock failed.");
......@@ -70,7 +72,9 @@ PrrtPacket * PrrtPacketDeliveryStore_get_packet_timedwait(PrrtPacketDeliveryStor
store->last_start = start;
store->last_stop = stop;
store->in_ordered_wait = true;
PrrtPace_track_pause(pace);
int res = pthread_cond_timedwait(&store->wait_for_data, &store->lock, deadline);
PrrtPace_track_resume(pace);
debug(DEBUG_RECEIVER, "After wait: %d", res);
store->in_ordered_wait = false;
packet = PrrtPacketDeliveryStore_get_packet(store, start, stop);
......@@ -93,28 +97,6 @@ PrrtPacket * PrrtPacketDeliveryStore_get_packet_timedwait(PrrtPacketDeliveryStor
return NULL;
}
PrrtPacket *PrrtPacketDeliveryStore_get_packet_wait(PrrtPacketDeliveryStore *store) {
PrrtPacket *packet = NULL;
prrtTimestamp_t start = 0;
prrtTimestamp_t stop = MAX_TIMESTAMP;
check(pthread_mutex_lock(&store->lock) == EXIT_SUCCESS, "Lock failed.");
packet = PrrtPacketDeliveryStore_get_packet(store, start, stop);
if (!packet && !atomic_load_explicit(&store->closing, memory_order_acquire)) {
check(pthread_cond_wait(&store->wait_for_data, &store->lock), "Wait failed.");
packet = PrrtPacketDeliveryStore_get_packet(store, start, stop);
}
check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed.");
return packet;
error:
PERROR("PrrtPacketDeliveryStore_get_packet_timedwait failed%s.", "");
return NULL;
}
PrrtPacket *PrrtPacketDeliveryStore_get_packet(PrrtPacketDeliveryStore *store, prrtTimestamp_t start, prrtTimestamp_t stop) {
PrrtPacket *packet = NULL;
......
......@@ -2,6 +2,7 @@
#define PRRT_RECEIVEDATAQUEUE_H
#include <sys/types.h>
#include "pace.h"
#include "../../util/bptree.h"
typedef struct prrtReceiveDataQueue {
......@@ -21,7 +22,8 @@ PrrtPacket *PrrtPacketDeliveryStore_get_packet(PrrtPacketDeliveryStore *store, p
PrrtPacket *PrrtPacketDeliveryStore_get_packet_wait(PrrtPacketDeliveryStore *store);
PrrtPacket *PrrtPacketDeliveryStore_get_packet_timedwait(PrrtPacketDeliveryStore *store, prrtTimestamp_t start,
prrtTimestamp_t stop, const struct timespec *deadline);
prrtTimestamp_t stop, const struct timespec *deadline,
PrrtPace *pace);
bool PrrtPacketDeliveryStore_insert(PrrtPacketDeliveryStore *store, PrrtPacket *packet);
......
......@@ -146,6 +146,27 @@ class PrrtCodingConfiguration:
def __str__(self):
return "({},{},{})".format(self.n, self.k, self.n_cycle)
class PrrtPace:
def __init__(self, internal, dependent, external):
self.internal = internal
self.dependent = dependent
self.external = external
@property
def total(self):
return self.internal + self.external
@property
def effective(self):
return self.internal + self.external - self.dependent
class PrrtPaces:
def __init__(self, appSend, prrtTransmit, prrtReceive, appDeliver):
self.appSend = appSend
self.prrtTransmit = prrtTransmit
self.prrtReceive = prrtReceive
self.appDeliver = appDeliver
cdef class PrrtSocket:
cdef cprrt.PrrtSocket* _c_socket
_epoch = datetime.datetime.utcfromtimestamp(0)
......@@ -186,6 +207,26 @@ cdef class PrrtSocket:
return cprrt.PrrtSocket_get_sock_opt(self._c_socket, "maximum_payload_size")
# Pacing
property paces:
def __get__(self):
appSend = PrrtPace(cprrt.PrrtSocket_get_sock_opt(self._c_socket, "appSend_pace_internal") * 0.000001,
cprrt.PrrtSocket_get_sock_opt(self._c_socket, "appSend_pace_dependent") * 0.000001,
cprrt.PrrtSocket_get_sock_opt(self._c_socket, "appSend_pace_external") * 0.000001)
prrtTransmit = PrrtPace(cprrt.PrrtSocket_get_sock_opt(self._c_socket, "prrtTransmit_pace_internal") * 0.000001,
cprrt.PrrtSocket_get_sock_opt(self._c_socket, "prrtTransmit_pace_dependent") * 0.000001,
cprrt.PrrtSocket_get_sock_opt(self._c_socket, "prrtTransmit_pace_external") * 0.000001)
prrtReceive = PrrtPace(cprrt.PrrtSocket_get_sock_opt(self._c_socket, "prrtReceive_pace_internal") * 0.000001,
cprrt.PrrtSocket_get_sock_opt(self._c_socket, "prrtReceive_pace_dependent") * 0.000001,
cprrt.PrrtSocket_get_sock_opt(self._c_socket, "prrtReceive_pace_external") * 0.000001)
appDeliver = PrrtPace(cprrt.PrrtSocket_get_sock_opt(self._c_socket, "appDeliver_pace_internal") * 0.000001,
cprrt.PrrtSocket_get_sock_opt(self._c_socket, "appDeliver_pace_dependent") * 0.000001,
cprrt.PrrtSocket_get_sock_opt(self._c_socket, "appDeliver_pace_external") * 0.000001)
return PrrtPaces(appSend, prrtTransmit, prrtReceive, appDeliver)
property nw_pace:
def __get__(self):
return cprrt.PrrtSocket_get_sock_opt(self._c_socket, "nw_pace") * 0.000001
......@@ -378,9 +419,6 @@ cdef class PrrtSocket:
def __get__(self):
return cprrt.PrrtSocket_get_bbr_round_start(self._c_socket)
# Paces
def __dealloc__(self):
if self._c_socket != NULL:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment