Commit a979f6c3 authored by Andreas Schmidt's avatar Andreas Schmidt

~= Aachen2018 changes

parent 3514decf
Pipeline #3928 failed with stages
in 5 seconds
[submodule "prrt/xlap"]
path = prrt/xlap
url = ../../as/X-Lap.git
url = ../X-Lap.git
......@@ -38,7 +38,7 @@ local pf_red_k = ProtoField.uint8("prrt.redundancy.k", "k")
local pf_fb = ProtoField.new("Feedback", "prrt.feedback", ftypes.BYTES, base.NONE)
local pf_fb_groupRTT = ProtoField.uint32("prrt.feedback.groupRTT", "Group RTT")
local pf_fb_ftt = ProtoField.uint32("prrt.feedback.FTT", "FTT")
local pf_fb_btl_pace = ProtoField.uint32("prrt.feedback.btl_pace", "Bottleneck Pace")
local pf_fb_btlPace = ProtoField.uint32("prrt.feedback.btlPace", "Bottleneck pace")
local pf_fb_erasurecount = ProtoField.uint16("prrt.feedback.erasureCount", "Erasure count")
local pf_fb_packetcount = ProtoField.uint16("prrt.feedback.packetCount", "Packet count")
local pf_fb_gaplength = ProtoField.uint16("prrt.feedback.gapLength", "Gap length")
......@@ -73,7 +73,7 @@ prrt_proto.fields = {
pf_fb,
pf_fb_groupRTT,
pf_fb_ftt,
pf_fb_btl_pace,
pf_fb_btlPace,
pf_fb_erasurecount,
pf_fb_packetcount,
pf_fb_gaplength,
......@@ -148,7 +148,7 @@ local function dissect_feedback(buffer, pinfo, root)
local tree = root:add(pf_fb, buffer:range(0))
tree:add(pf_fb_groupRTT, buffer:range(0,4))
tree:add(pf_fb_ftt, buffer:range(4,4))
tree:add(pf_fb_btl_pace, buffer:range(8,4))
tree:add(pf_fb_btlPace, buffer:range(8,4))
tree:add(pf_fb_erasurecount, buffer:range(12,2))
tree:add(pf_fb_packetcount, buffer:range(14,2))
tree:add(pf_fb_gaplength, buffer:range(16,2))
......@@ -217,4 +217,3 @@ function prrt_proto.prefs_changed()
end
end
end
......@@ -148,7 +148,7 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
prrtSequenceNumber_t baseSequenceNumber = packet->sequenceNumber - packet->index;
PrrtPacket *reference = PrrtPacket_copy(packet);
// forward to application layer
debug(DEBUG_DATARECEIVER, "Forward: %u", seqno);
PrrtPacketDeliveryStore_insert(sock_ptr->packetDeliveryStore, packet);
......@@ -287,7 +287,7 @@ void *receive_data_loop(void *ptr) {
while (1) {
PrrtPace_track_start(s->prrtReceivePace);
debug(DEBUG_DATARECEIVER, "About to receive.");
debug(DEBUG_DATARECEIVER, "About to recv on UDP socket.");
XlapTimestampPlaceholder tsph1;
XlapTimestampPlaceholder tsph2;
XlapTimestampPlaceholder tsph3;
......
......@@ -119,7 +119,7 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
peerPacingTime = (prrtTimedelta_t) pt;
}
}
prrtTimedelta_t pacingTime = MAX(channelPacingTime, peerPacingTime);
debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %f, Pacing Time: %u", packet->payloadLength, pacing_rate, pacingTime);
sock_ptr->nextSendTime = now + pacingTime;
......
......@@ -264,7 +264,8 @@ bool PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port) {
return false;
}
int sendpacket(PrrtSocket *s, const uint8_t *data, const size_t data_len, bool sync) {
int sendpacket(PrrtSocket *s, const uint8_t *data, const size_t data_len, bool sync, bool try) {
int res = 0;
if (data_len > s->maximum_payload_size) {
PERROR("Data to be sent (%ld bytes) is too long, as MTU is %d.\n", data_len, s->maximum_payload_size);
return -1;
......@@ -291,7 +292,14 @@ int sendpacket(PrrtSocket *s, const uint8_t *data, const size_t data_len, bool s
PrrtDataTransmitter_transmit(s, packet);
PrrtSocket_pace(s, false);
} else {
Pipe_push(s->sendDataQueue, &packet->asListNode);
if(try) {
res = Pipe_try_push(s->sendDataQueue, &packet->asListNode);
if (res != 0) {
PrrtPacket_destroy(packet);
}
} else {
Pipe_push(s->sendDataQueue, &packet->asListNode);
}
}
PrrtPace_track_resume(s->appSendPace);
......@@ -300,15 +308,19 @@ int sendpacket(PrrtSocket *s, const uint8_t *data, const size_t data_len, bool s
XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtSendEnd);
PrrtPace_track_end(s->appSendPace);
return 0;
return res;
}
int PrrtSocket_send_sync(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
return sendpacket(s, data, data_len, true);
return sendpacket(s, data, data_len, true, false);
}
int PrrtSocket_send_async(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
return sendpacket(s, data, data_len, false);
return sendpacket(s, data, data_len, false, false);
}
int PrrtSocket_try_send_async(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
return sendpacket(s, data, data_len, false, true);
}
bool PrrtSocket_closing(PrrtSocket *s) {
......@@ -372,37 +384,36 @@ int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, struct sockaddr
}
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us) {
PrrtPace_track_start(s->appDeliverPace);
PrrtPacket *packet;
do {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
struct timespec deadline = abstime_from_now(time_window_us);
packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now, now + time_window_us,
&deadline, s->appDeliverPace);
if (packet == NULL && errno == ETIMEDOUT) {
errno = 0;
}
if (PrrtSocket_closing(s)) {
PrrtPace_track_end(s->appDeliverPace);
return -1;
}
} while (!packet);
prrtPacketLength_t l = deliver_packet(s, buf_ptr, packet, addr);
PrrtPace_track_end(s->appDeliverPace);
return l;
return PrrtSocket_receive_ordered_timedwait(s, buf_ptr, addr, time_window_us, NULL);
}
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us, struct timespec* deadline) {
PrrtPace_track_start(s->appDeliverPace);
prrtTimestamp_t now = PrrtClock_get_current_time_us();
PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, now, now + time_window_us);
if (packet == NULL) {
do {
struct timespec abs_now = abstime_now();
if (deadline != NULL && timedelta(&abs_now, deadline) > 0) {
return -1 * ETIMEDOUT;
}
PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now, now + time_window_us,
deadline, s->appDeliverPace);
if (packet == NULL && errno == ETIMEDOUT) {
PrrtPace_track_end(s->appDeliverPace);
return -1 * ETIMEDOUT;
now = PrrtClock_get_current_time_us();
struct timespec window_end_time = abstime_from_now(time_window_us / 2);
if (deadline != NULL && timedelta(&window_end_time, deadline) > 0) {
memcpy(&window_end_time, deadline, sizeof(struct timespec));
}
packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now, now + time_window_us,
&window_end_time, s->appDeliverPace);
if (packet == NULL && errno == ETIMEDOUT) {
errno = 0;
}
if (PrrtSocket_closing(s)) {
PrrtPace_track_end(s->appDeliverPace);
return -1;
}
} while (!packet);
}
prrtPacketLength_t l = deliver_packet(s, buf_ptr, packet, addr);
PrrtPace_track_end(s->appDeliverPace);
return l;
......@@ -560,8 +571,10 @@ int PrrtSocket_close(PrrtSocket *s) {
}
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *s, const char *name) {
if (strcmp(name, "targetdelay") == 0) {
if (strcmp(name, "target_delay") == 0) {
return PrrtApplicationConstraints_get_target_delay(s->applicationConstraints);
} else if(strcmp(name, "pacing_enabled") == 0) {
return (uint32_t) s->pacingEnabled;
} else if(strcmp(name, "appSend_pace_internal") == 0) {
return PrrtPace_get_internal(s->appSendPace);
} else if(strcmp(name, "appSend_pace_dependent") == 0) {
......@@ -623,9 +636,13 @@ uint32_t PrrtSocket_get_sock_opt(PrrtSocket *s, const char *name) {
}
bool PrrtSocket_set_sock_opt(PrrtSocket *s, const char *name, const uint32_t value) {
if (strcmp(name, "app_queue_size")) {
if (strcmp(name, "app_queue_size") == 0) {
PrrtApplicationConstraints_set_app_queue_size(s->applicationConstraints, value);
// TODO: MPSC_Queue does not provide a size.
} else if (strcmp(name, "pacing_enabled") == 0) {
s->pacingEnabled = (value > 0) ? true : false;
} else if (strcmp(name, "target_delay") == 0) {
return PrrtApplicationConstraints_set_target_delay(s->applicationConstraints, value);
} else {
return false;
}
......@@ -776,8 +793,6 @@ uint32_t PrrtSocket_get_bbr_app_limited(PrrtSocket *s) {
return s->receiver->packetTracking->app_limited;
}
PrrtCoder *PrrtSocket_get_matching_coder(PrrtSocket *s, PrrtCodingConfiguration *codingParams) {
if (s->coder == NULL || PrrtCoder_get_k(s->coder) != codingParams->k ||
PrrtCoder_get_n(s->coder) != codingParams->n) {
......
......@@ -131,6 +131,8 @@ bool PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port);
int PrrtSocket_send_async(PrrtSocket *s, const uint8_t *data, const size_t data_len);
int PrrtSocket_send_sync(PrrtSocket *s, const uint8_t *data, size_t data_len);
int PrrtSocket_try_send_async(PrrtSocket *s, const uint8_t *data, const size_t data_len);
bool PrrtSocket_pace(PrrtSocket *sock_ptr, bool prepace);
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr);
......
......@@ -26,7 +26,13 @@ bool PrrtApplicationConstraints_destroy(PrrtApplicationConstraints *applicationC
prrtTimedelta_t PrrtApplicationConstraints_get_target_delay(PrrtApplicationConstraints *applicationConstraints)
{
return applicationConstraints->targetDelay_us;
return (prrtTimedelta_t) atomic_load_explicit(&applicationConstraints->targetDelay_us, memory_order_acquire);
}
bool PrrtApplicationConstraints_set_target_delay(PrrtApplicationConstraints *applicationConstraints, prrtTimedelta_t target_delay_us)
{
atomic_store_explicit(&applicationConstraints->targetDelay_us, target_delay_us, memory_order_release);
return true;
}
uint32_t PrrtApplicationConstraints_get_app_queue_size(PrrtApplicationConstraints *applicationConstraints)
......
......@@ -12,6 +12,7 @@ PrrtApplicationConstraints *PrrtApplicationConstraints_create(prrtTimedelta_t ta
bool PrrtApplicationConstraints_destroy(PrrtApplicationConstraints *applicationConstraints);
prrtTimedelta_t PrrtApplicationConstraints_get_target_delay(PrrtApplicationConstraints *applicationConstraints);
bool PrrtApplicationConstraints_set_target_delay(PrrtApplicationConstraints *applicationConstraints, prrtTimedelta_t target_delay_us);
uint32_t PrrtApplicationConstraints_get_app_queue_size(PrrtApplicationConstraints *applicationConstraints);
bool PrrtApplicationConstraints_set_app_queue_size(PrrtApplicationConstraints *applicationConstraints, uint32_t size);
......
......@@ -217,7 +217,7 @@ cdef class PrrtSocket:
# Application Properties
property target_delay:
def __get__(self):
return cprrt.PrrtSocket_get_sock_opt(self._c_socket, "targetdelay") * 0.000001
return cprrt.PrrtSocket_get_sock_opt(self._c_socket, "target_delay") * 0.000001
property maximum_payload_size:
def __get__(self):
......
......@@ -153,9 +153,12 @@ int main(int argc, char **argv) {
// 1400 bytes.
sprintf(buf, "%1400d", j + 1);
PrrtSocket_send_sync(s, (unsigned char *) buf, strlen(buf));
j++;
int res = PrrtSocket_try_send_async(s, (unsigned char *) buf, strlen(buf));
if (res != 0) {
// EAGAIN
} else {
j++;
}
// Send every 100us, as this is a sensible packet interval.
if (arguments.ipt != 0) {
usleep_nano(arguments.ipt);
......
......@@ -68,19 +68,32 @@ static void pipe_wake(atomic_int *var) {
futex_wake_all(var);
}
void Pipe_push(Pipe *p, ListNode *value) {
void do_push(Pipe *p, ListNode *value) {
mpscq_node_t *n = calloc(1, sizeof(mpscq_node_t));
assert(n);
pipe_wait_for(&p->space);
n->data = value;
atomic_store_explicit(&n->next, NULL, memory_order_relaxed);
mpscq_node_t* prev = atomic_exchange_explicit(&p->head, n, memory_order_acq_rel);
atomic_store_explicit(&prev->next, n, memory_order_release);
pipe_wake(&p->items);
}
mpscq_node_t *n = calloc(1, sizeof(mpscq_node_t));
assert(n);
void Pipe_push(Pipe *p, ListNode *value) {
pipe_wait_for(&p->space);
do_push(p, value);
n->data = value;
atomic_store_explicit(&n->next, NULL, memory_order_relaxed);
mpscq_node_t* prev = atomic_exchange_explicit(&p->head, n, memory_order_acq_rel);
atomic_store_explicit(&prev->next, n, memory_order_release);
}
pipe_wake(&p->items);
int Pipe_try_push(Pipe *p, ListNode *value) {
bool res = pipe_wait_try(&p->space);
if (res) {
do_push(p, value);
return 0;
} else {
return EAGAIN;
}
}
static ListNode *Pipe_deq(Pipe *p) {
......
......@@ -22,6 +22,7 @@ Pipe *Pipe_create(void);
bool Pipe_destroy(Pipe *);
void Pipe_push(Pipe *, ListNode *);
int Pipe_try_push(Pipe *, ListNode *);
ListNode *Pipe_pull(Pipe *);
......
......@@ -9,6 +9,12 @@ long long timedelta(struct timespec *t1, struct timespec *t2) {
return delta;
}
struct timespec abstime_now() {
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
return now;
}
struct timespec abstime_from_now(uint32_t wait_time) {
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
......
......@@ -5,6 +5,7 @@
long long timedelta(struct timespec *t1, struct timespec *t2);
struct timespec abstime_from_now(uint32_t wait_time);
struct timespec abstime_now();
int64_t PrrtTimestamp_cmp(prrtTimestamp_t a, prrtTimestamp_t b);
#endif //PRRT_TIME_H
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment