...
 
Commits (280)
...@@ -10,3 +10,4 @@ MANIFEST ...@@ -10,3 +10,4 @@ MANIFEST
prrt.cpython*.so prrt.cpython*.so
prrt.so prrt.so
.ipynb_checkpoints/ .ipynb_checkpoints/
.idea/
...@@ -22,19 +22,23 @@ local pf_seqN = ProtoField.uint16("prrt.sequenceNumber", "Sequence Number") ...@@ -22,19 +22,23 @@ local pf_seqN = ProtoField.uint16("prrt.sequenceNumber", "Sequence Number")
local pf_data = ProtoField.new("Data", "prrt.data", ftypes.BYTES, base.NONE) local pf_data = ProtoField.new("Data", "prrt.data", ftypes.BYTES, base.NONE)
local pf_data_length = ProtoField.uint32("prrt.data.length", "Length") local pf_data_length = ProtoField.uint32("prrt.data.length", "Length")
local pf_data_btl_pace = ProtoField.uint32("prrt.data.btl_pace", "Bottleneck Pace")
local pf_data_timestamp = ProtoField.uint32("prrt.data.timestamp", "Timestamp") local pf_data_timestamp = ProtoField.uint32("prrt.data.timestamp", "Timestamp")
local pf_data_groupRTprop = ProtoField.uint32("prrt.data.grouprtprop", "Group RTprop") local pf_data_groupRTprop = ProtoField.uint32("prrt.data.grouprtprop", "Group RTprop")
local pf_data_packettimeout = ProtoField.uint32("prrt.data.packettimeout", "Packet Timeout") local pf_data_packettimeout = ProtoField.uint32("prrt.data.packettimeout", "Packet Timeout")
local pf_data_btlDatarate = ProtoField.uint32("prrt.data.btl_datarate", "Bottleneck Datarate")
local pf_red = ProtoField.new("Redundancy", "prrt.redundancy", ftypes.BYTES, base.NONE) local pf_red = ProtoField.new("Redundancy", "prrt.redundancy", ftypes.BYTES, base.NONE)
local pf_red_baseSeqN = ProtoField.uint16("prrt.redundancy.baseSequenceNumber", "Base Sequence Number", base.DEC) local pf_red_baseSeqN = ProtoField.uint16("prrt.redundancy.baseSequenceNumber", "Base Sequence Number", base.DEC)
local pf_red_timestamp = ProtoField.uint32("prrt.redundancy.timestamp", "Timestamp") local pf_red_timestamp = ProtoField.uint32("prrt.redundancy.timestamp", "Timestamp")
local pf_red_btl_pace = ProtoField.uint32("prrt.redundancy.btl_pace", "Bottleneck Pace")
local pf_red_n = ProtoField.uint8("prrt.redundancy.n", "n") local pf_red_n = ProtoField.uint8("prrt.redundancy.n", "n")
local pf_red_k = ProtoField.uint8("prrt.redundancy.k", "k") local pf_red_k = ProtoField.uint8("prrt.redundancy.k", "k")
local pf_fb = ProtoField.new("Feedback", "prrt.feedback", ftypes.BYTES, base.NONE) local pf_fb = ProtoField.new("Feedback", "prrt.feedback", ftypes.BYTES, base.NONE)
local pf_fb_groupRTT = ProtoField.uint32("prrt.feedback.groupRTT", "Group RTT") local pf_fb_groupRTT = ProtoField.uint32("prrt.feedback.groupRTT", "Group RTT")
local pf_fb_ftt = ProtoField.uint32("prrt.feedback.FTT", "FTT") local pf_fb_ftt = ProtoField.uint32("prrt.feedback.FTT", "FTT")
local pf_fb_btl_pace = ProtoField.uint32("prrt.feedback.btl_pace", "Bottleneck Pace")
local pf_fb_erasurecount = ProtoField.uint16("prrt.feedback.erasureCount", "Erasure count") local pf_fb_erasurecount = ProtoField.uint16("prrt.feedback.erasureCount", "Erasure count")
local pf_fb_packetcount = ProtoField.uint16("prrt.feedback.packetCount", "Packet count") local pf_fb_packetcount = ProtoField.uint16("prrt.feedback.packetCount", "Packet count")
local pf_fb_gaplength = ProtoField.uint16("prrt.feedback.gapLength", "Gap length") local pf_fb_gaplength = ProtoField.uint16("prrt.feedback.gapLength", "Gap length")
...@@ -53,19 +57,23 @@ prrt_proto.fields = { ...@@ -53,19 +57,23 @@ prrt_proto.fields = {
pf_data, pf_data,
pf_data_length, pf_data_length,
pf_data_btl_pace,
pf_data_timestamp, pf_data_timestamp,
pf_data_groupRTprop, pf_data_groupRTprop,
pf_data_packettimeout, pf_data_packettimeout,
pf_data_btlDatarate,
pf_red, pf_red,
pf_red_baseSeqN, pf_red_baseSeqN,
pf_red_timestamp, pf_red_timestamp,
pf_red_btl_pace,
pf_red_n, pf_red_n,
pf_red_k, pf_red_k,
pf_fb, pf_fb,
pf_fb_groupRTT, pf_fb_groupRTT,
pf_fb_ftt, pf_fb_ftt,
pf_fb_btl_pace,
pf_fb_erasurecount, pf_fb_erasurecount,
pf_fb_packetcount, pf_fb_packetcount,
pf_fb_gaplength, pf_fb_gaplength,
...@@ -111,10 +119,12 @@ local PRRT_MIN_SIZE = 8 ...@@ -111,10 +119,12 @@ local PRRT_MIN_SIZE = 8
-- create sub-dissectors for different types -- create sub-dissectors for different types
local function dissect_data(buffer, pinfo, root) local function dissect_data(buffer, pinfo, root)
local tree = root:add(pf_data, buffer:range(0)) local tree = root:add(pf_data, buffer:range(0))
tree:add(pf_data_length, buffer:range(0,4)) tree:add(pf_data_btl_pace, buffer:range(0,4))
tree:add(pf_data_timestamp, buffer:range(4,4)) tree:add(pf_data_length, buffer:range(4,4))
tree:add(pf_data_groupRTprop, buffer:range(8,4)) tree:add(pf_data_timestamp, buffer:range(8,4))
tree:add(pf_data_packettimeout, buffer:range(12,4)) tree:add(pf_data_groupRTprop, buffer:range(12,4))
tree:add(pf_data_packettimeout, buffer:range(16,4))
tree:add(pf_data_btlDatarate, buffer:range(20,4))
local label = "[D] Idx=" .. getIndex() .. " Len=" .. getDataLength() local label = "[D] Idx=" .. getIndex() .. " Len=" .. getDataLength()
tree:set_text(label) tree:set_text(label)
...@@ -125,8 +135,9 @@ local function dissect_redundancy(buffer, pinfo, root) ...@@ -125,8 +135,9 @@ local function dissect_redundancy(buffer, pinfo, root)
local tree = root:add(pf_red, buffer:range(0)) local tree = root:add(pf_red, buffer:range(0))
tree:add(pf_red_baseSeqN, buffer:range(0,2)) tree:add(pf_red_baseSeqN, buffer:range(0,2))
tree:add(pf_red_timestamp, buffer:range(2,4)) tree:add(pf_red_timestamp, buffer:range(2,4))
tree:add(pf_red_n, buffer:range(6,1)) tree:add(pf_red_btl_pace, buffer:range(6,4))
tree:add(pf_red_k, buffer:range(7,1)) tree:add(pf_red_n, buffer:range(10,1))
tree:add(pf_red_k, buffer:range(11,1))
local label = "[R] Idx=" .. getIndex() .. " b=" .. getRedBaseSeqNo() .. " n=" .. getRedN() .. " k=" .. getRedK() local label = "[R] Idx=" .. getIndex() .. " b=" .. getRedBaseSeqNo() .. " n=" .. getRedN() .. " k=" .. getRedK()
tree:set_text(label) tree:set_text(label)
...@@ -137,14 +148,15 @@ local function dissect_feedback(buffer, pinfo, root) ...@@ -137,14 +148,15 @@ local function dissect_feedback(buffer, pinfo, root)
local tree = root:add(pf_fb, buffer:range(0)) local tree = root:add(pf_fb, buffer:range(0))
tree:add(pf_fb_groupRTT, buffer:range(0,4)) tree:add(pf_fb_groupRTT, buffer:range(0,4))
tree:add(pf_fb_ftt, buffer:range(4,4)) tree:add(pf_fb_ftt, buffer:range(4,4))
tree:add(pf_fb_erasurecount, buffer:range(8,2)) tree:add(pf_fb_btl_pace, buffer:range(8,4))
tree:add(pf_fb_packetcount, buffer:range(10,2)) tree:add(pf_fb_erasurecount, buffer:range(12,2))
tree:add(pf_fb_gaplength, buffer:range(12,2)) tree:add(pf_fb_packetcount, buffer:range(14,2))
tree:add(pf_fb_gapcount, buffer:range(14,2)) tree:add(pf_fb_gaplength, buffer:range(16,2))
tree:add(pf_fb_burstlength, buffer:range(16,2)) tree:add(pf_fb_gapcount, buffer:range(18,2))
tree:add(pf_fb_burstcount, buffer:range(18,2)) tree:add(pf_fb_burstlength, buffer:range(20,2))
tree:add(pf_fb_acktype, buffer:range(20,1)) tree:add(pf_fb_burstcount, buffer:range(22,2))
tree:add(pf_fb_ackSeqN, buffer:range(21, 2)) tree:add(pf_fb_acktype, buffer:range(24,1))
tree:add(pf_fb_ackSeqN, buffer:range(25, 2))
local label = "[F]" local label = "[F]"
tree:set_text(label) tree:set_text(label)
......
...@@ -87,7 +87,7 @@ static int hf_prrt_payload = -1; /* payload with arbitrary length, depend ...@@ -87,7 +87,7 @@ static int hf_prrt_payload = -1; /* payload with arbitrary length, depend
static int hf_prrt_receiver_addr = -1; /* 32 bits IP address of the receiver */ static int hf_prrt_receiver_addr = -1; /* 32 bits IP address of the receiver */
static int hf_prrt_rtt_probe = -1; /* 32 bits timestamp of most recent data packet plus delay since reception of this packet. */ static int hf_prrt_rtt_probe = -1; /* 32 bits timestamp of most recent data packet plus delay since reception of this packet. */
static int hf_prrt_plr = -1; /* 32 bits packet loss rate measured at the receiver. */ static int hf_prrt_plr = -1; /* 32 bits packet loss rate measured at the receiver. */
static int hf_prrt_bw_est = -1; /* 32 bits bandwidth estimated at the receiver. */ static int hf_prrt_datarate_est = -1; /* 32 bits datarate estimated at the receiver. */
/* The following two fields are the payload of PRRT feedback packet (packet type: 3). */ /* The following two fields are the payload of PRRT feedback packet (packet type: 3). */
static int hf_prrt_packet_no = -1; /* 16 bits packet no of the first lost packet. */ static int hf_prrt_packet_no = -1; /* 16 bits packet no of the first lost packet. */
static int hf_prrt_retr_round = -1; /* 16 bits retransmission round of the first lost. (first transmission is 0.) */ static int hf_prrt_retr_round = -1; /* 16 bits retransmission round of the first lost. (first transmission is 0.) */
...@@ -323,7 +323,7 @@ dissect_prrt(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree) ...@@ -323,7 +323,7 @@ dissect_prrt(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree)
offset += 4; offset += 4;
proto_tree_add_item(prrt_tree, hf_prrt_plr, tvb, offset, 4, FALSE); proto_tree_add_item(prrt_tree, hf_prrt_plr, tvb, offset, 4, FALSE);
offset += 4; offset += 4;
proto_tree_add_item(prrt_tree, hf_prrt_bw_est, tvb, offset, 4, FALSE); proto_tree_add_item(prrt_tree, hf_prrt_datarate_est, tvb, offset, 4, FALSE);
offset += 4; offset += 4;
if (len == 24) { if (len == 24) {
...@@ -364,7 +364,7 @@ dissect_prrt(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree) ...@@ -364,7 +364,7 @@ dissect_prrt(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree)
offset += 4; offset += 4;
proto_tree_add_item(prrt_tree, hf_prrt_plr, tvb, offset, 4, FALSE); proto_tree_add_item(prrt_tree, hf_prrt_plr, tvb, offset, 4, FALSE);
offset += 4; offset += 4;
proto_tree_add_item(prrt_tree, hf_prrt_bw_est, tvb, offset, 4, FALSE); proto_tree_add_item(prrt_tree, hf_prrt_datarate_est, tvb, offset, 4, FALSE);
offset += 4; offset += 4;
if (len == 28) { if (len == 28) {
...@@ -484,10 +484,10 @@ proto_register_prrt(void) ...@@ -484,10 +484,10 @@ proto_register_prrt(void)
FT_UINT32, BASE_DEC, NULL, 0x0, FT_UINT32, BASE_DEC, NULL, 0x0,
"The packet loss rate measured at the receiver, stored in general PRRT feedback header", HFILL } "The packet loss rate measured at the receiver, stored in general PRRT feedback header", HFILL }
}, },
{ &hf_prrt_bw_est, { &hf_prrt_datarate_est,
{ "Estimated Bandwidth", "prrt.feedback.bw", { "Estimated Datarate", "prrt.feedback.datarate",
FT_UINT32, BASE_DEC, NULL, 0x0, FT_UINT32, BASE_DEC, NULL, 0x0,
"The bandwidth estimated at the receiver, stored in general PRRT feedback header", HFILL } "The datarate estimated at the receiver, stored in general PRRT feedback header", HFILL }
}, },
{ &hf_prrt_packet_no, { &hf_prrt_packet_no,
{ "Packet No", "prrt.feedback.packetno", { "Packet No", "prrt.feedback.packetno",
......
...@@ -92,7 +92,7 @@ static int hf_prrt_payload = -1; /* payload with arbitrary length, depend ...@@ -92,7 +92,7 @@ static int hf_prrt_payload = -1; /* payload with arbitrary length, depend
static int hf_prrt_receiver_addr = -1; /* 32 bits IP address of the receiver */ static int hf_prrt_receiver_addr = -1; /* 32 bits IP address of the receiver */
static int hf_prrt_rtt_probe = -1; /* 32 bits timestamp of most recent data packet plus delay since reception of this packet. */ static int hf_prrt_rtt_probe = -1; /* 32 bits timestamp of most recent data packet plus delay since reception of this packet. */
static int hf_prrt_plr = -1; /* 32 bits packet loss rate measured at the receiver. */ static int hf_prrt_plr = -1; /* 32 bits packet loss rate measured at the receiver. */
static int hf_prrt_bw_est = -1; /* 32 bits bandwidth estimated at the receiver. */ static int hf_prrt_datarate_est = -1; /* 32 bits datarate estimated at the receiver. */
/* The following two fields are the payload of PRRT feedback packet (packet type: 3). */ /* The following two fields are the payload of PRRT feedback packet (packet type: 3). */
static int hf_prrt_packet_no = -1; /* 16 bits packet no of the first lost packet. */ static int hf_prrt_packet_no = -1; /* 16 bits packet no of the first lost packet. */
static int hf_prrt_retr_round = -1; /* 16 bits retransmission round of the first lost. (first transmission is 0.) */ static int hf_prrt_retr_round = -1; /* 16 bits retransmission round of the first lost. (first transmission is 0.) */
...@@ -361,7 +361,7 @@ dissect_prrt(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree) ...@@ -361,7 +361,7 @@ dissect_prrt(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree)
offset += 4; offset += 4;
proto_tree_add_item(prrt_tree, hf_prrt_plr, tvb, offset, 4, FALSE); proto_tree_add_item(prrt_tree, hf_prrt_plr, tvb, offset, 4, FALSE);
offset += 4; offset += 4;
proto_tree_add_item(prrt_tree, hf_prrt_bw_est, tvb, offset, 4, FALSE); proto_tree_add_item(prrt_tree, hf_prrt_datarate_est, tvb, offset, 4, FALSE);
offset += 4; offset += 4;
if (len == 24) { if (len == 24) {
...@@ -402,7 +402,7 @@ dissect_prrt(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree) ...@@ -402,7 +402,7 @@ dissect_prrt(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree)
offset += 4; offset += 4;
proto_tree_add_item(prrt_tree, hf_prrt_plr, tvb, offset, 4, FALSE); proto_tree_add_item(prrt_tree, hf_prrt_plr, tvb, offset, 4, FALSE);
offset += 4; offset += 4;
proto_tree_add_item(prrt_tree, hf_prrt_bw_est, tvb, offset, 4, FALSE); proto_tree_add_item(prrt_tree, hf_prrt_datarate_est, tvb, offset, 4, FALSE);
offset += 4; offset += 4;
if (len == 28) { if (len == 28) {
...@@ -524,10 +524,10 @@ proto_register_prrt(void) ...@@ -524,10 +524,10 @@ proto_register_prrt(void)
FT_UINT32, BASE_DEC, NULL, 0x0, FT_UINT32, BASE_DEC, NULL, 0x0,
"The packet loss rate measured at the receiver, stored in general PRRT feedback header", HFILL } "The packet loss rate measured at the receiver, stored in general PRRT feedback header", HFILL }
}, },
{ &hf_prrt_bw_est, { &hf_prrt_datarate_est,
{ "Estimated Bandwidth", "prrt.feedback.bw", { "Estimated Datarate", "prrt.feedback.datarate",
FT_UINT32, BASE_DEC, NULL, 0x0, FT_UINT32, BASE_DEC, NULL, 0x0,
"The bandwidth estimated at the receiver, stored in general PRRT feedback header", HFILL } "The datarate estimated at the receiver, stored in general PRRT feedback header", HFILL }
}, },
{ &hf_prrt_packet_no, { &hf_prrt_packet_no,
{ "Packet No", "prrt.feedback.packetno", { "Packet No", "prrt.feedback.packetno",
......
...@@ -74,7 +74,7 @@ static int hf_prrt_gap = -1; ...@@ -74,7 +74,7 @@ static int hf_prrt_gap = -1;
static int hf_prrt_ngap = -1; static int hf_prrt_ngap = -1;
static int hf_prrt_burst = -1; static int hf_prrt_burst = -1;
static int hf_prrt_nburst = -1; static int hf_prrt_nburst = -1;
static int hf_prrt_bw_est = -1; static int hf_prrt_datarate_est = -1;
static int hf_prrt_buf_fb = -1; static int hf_prrt_buf_fb = -1;
/* Payload */ /* Payload */
...@@ -206,7 +206,7 @@ static void dissect_prrt(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree) ...@@ -206,7 +206,7 @@ static void dissect_prrt(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree)
offset += 2; offset += 2;
proto_tree_add_item(prrt_tree, hf_prrt_nburst, tvb, offset, 2, FALSE); proto_tree_add_item(prrt_tree, hf_prrt_nburst, tvb, offset, 2, FALSE);
offset += 2; offset += 2;
proto_tree_add_item(prrt_tree, hf_prrt_bw_est, tvb, offset, 4, FALSE); proto_tree_add_item(prrt_tree, hf_prrt_datarate_est, tvb, offset, 4, FALSE);
offset += 4; offset += 4;
proto_tree_add_item(prrt_tree, hf_prrt_buf_fb, tvb, offset, 4, FALSE); proto_tree_add_item(prrt_tree, hf_prrt_buf_fb, tvb, offset, 4, FALSE);
} else if(packet_type == 4) { } else if(packet_type == 4) {
...@@ -236,7 +236,7 @@ static void dissect_prrt(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree) ...@@ -236,7 +236,7 @@ static void dissect_prrt(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree)
offset += 2; offset += 2;
proto_tree_add_item(prrt_tree, hf_prrt_nburst, tvb, offset, 2, FALSE); proto_tree_add_item(prrt_tree, hf_prrt_nburst, tvb, offset, 2, FALSE);
offset += 2; offset += 2;
proto_tree_add_item(prrt_tree, hf_prrt_bw_est, tvb, offset, 4, FALSE); proto_tree_add_item(prrt_tree, hf_prrt_datarate_est, tvb, offset, 4, FALSE);
offset += 4; offset += 4;
proto_tree_add_item(prrt_tree, hf_prrt_buf_fb, tvb, offset, 4, FALSE); proto_tree_add_item(prrt_tree, hf_prrt_buf_fb, tvb, offset, 4, FALSE);
} else { } else {
...@@ -292,8 +292,8 @@ void proto_register_prrt(void) ...@@ -292,8 +292,8 @@ void proto_register_prrt(void)
{ "Aggregated Burst Length", "prrt.feedback.burst", FT_UINT16, BASE_DEC, NULL, 0x0, "", HFILL }}, { "Aggregated Burst Length", "prrt.feedback.burst", FT_UINT16, BASE_DEC, NULL, 0x0, "", HFILL }},
{ &hf_prrt_nburst, { &hf_prrt_nburst,
{ "Burst Count", "prrt.feedback.nburst", FT_UINT16, BASE_DEC, NULL, 0x0, "", HFILL }}, { "Burst Count", "prrt.feedback.nburst", FT_UINT16, BASE_DEC, NULL, 0x0, "", HFILL }},
{ &hf_prrt_bw_est, { &hf_prrt_datarate_est,
{ "Estimated Bandwidth", "prrt.feedback.bw", FT_UINT32, BASE_DEC, NULL, 0x0, "The bandwidth estimated at the receiver, stored in general PRRT feedback header", HFILL }}, { "Estimated Datarate", "prrt.feedback.datarate", FT_UINT32, BASE_DEC, NULL, 0x0, "The datarate estimated at the receiver, stored in general PRRT feedback header", HFILL }},
{ &hf_prrt_buf_fb, { &hf_prrt_buf_fb,
{ "Buffer Feedback", "prrt.feedback.buf_fb", FT_UINT32, BASE_DEC, NULL, 0x0, "", HFILL }}, { "Buffer Feedback", "prrt.feedback.buf_fb", FT_UINT32, BASE_DEC, NULL, 0x0, "", HFILL }},
{ &hf_prrt_payload, { &hf_prrt_payload,
......
FROM gcc:5 FROM gcc:8
MAINTAINER Andreas Schmidt <schmidt@nt.uni-saarland.de> MAINTAINER Andreas Schmidt <schmidt@nt.uni-saarland.de>
...@@ -7,6 +7,8 @@ ENV DEBIAN_FRONTEND noninteractive ...@@ -7,6 +7,8 @@ ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update && apt-get install --yes --force-yes \ RUN apt-get update && apt-get install --yes --force-yes \
bc \ bc \
cmake \ cmake \
iperf3 \
psmisc \
traceroute \ traceroute \
tshark tshark
...@@ -14,10 +16,11 @@ COPY CMakeLists.txt /prrt/ ...@@ -14,10 +16,11 @@ COPY CMakeLists.txt /prrt/
COPY prrt /prrt/prrt COPY prrt /prrt/prrt
COPY tests /prrt/tests COPY tests /prrt/tests
COPY docker/entrypoint.sh / COPY docker/entrypoint.sh /
COPY docker/sysctl.conf /etc/sysctl.d/01-disable-ipv6.conf
WORKDIR /prrt WORKDIR /prrt
RUN cmake . \ RUN cmake . -DCMAKE_BUILD_TYPE=DEBUG \
&& make && make
ENV PATH /prrt:$PATH ENV PATH /prrt:$PATH
......
FROM gcc:5 FROM gcc:8
MAINTAINER Andreas Schmidt <schmidt@nt.uni-saarland.de> MAINTAINER Andreas Schmidt <schmidt@nt.uni-saarland.de>
...@@ -7,6 +7,8 @@ ENV DEBIAN_FRONTEND noninteractive ...@@ -7,6 +7,8 @@ ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update && apt-get install --yes --force-yes \ RUN apt-get update && apt-get install --yes --force-yes \
bc \ bc \
cmake \ cmake \
iperf3 \
psmisc \
traceroute \ traceroute \
tshark tshark
...@@ -17,7 +19,7 @@ COPY docker/entrypoint.sh / ...@@ -17,7 +19,7 @@ COPY docker/entrypoint.sh /
WORKDIR /prrt WORKDIR /prrt
RUN cmake -DTCP=1 . \ RUN cmake -DTCP=1 -DCMAKE_BUILD_TYPE=DEBUG . \
&& make && make
ENV PATH /prrt:$PATH ENV PATH /prrt:$PATH
......
#!/bin/bash #!/bin/sh
dev=eth0 dev=eth0
...@@ -34,11 +34,11 @@ case $key in ...@@ -34,11 +34,11 @@ case $key in
shift shift
shift shift
;; ;;
-T|--threadpinning) -T|--threadpinning|-U)
PRRT+=("$1") PRRT+=("$1")
shift shift
;; ;;
-p|--port|-r|--rounds|-s|--size|-R|--rcvbuf|-S|--sndbuf|-o|--output|-a|--appdelay) -p|--port|-r|--rounds|-s|--size|-R|--rcvbuf|-S|--sndbuf|-o|--output|-a|--appdelay|-j|--appjitter)
PRRT+=("$1 $2") PRRT+=("$1 $2")
shift shift
shift shift
...@@ -56,16 +56,14 @@ NETEM_PARAMS="${NETEM[@]}" ...@@ -56,16 +56,14 @@ NETEM_PARAMS="${NETEM[@]}"
echo "Starting Wireshark." echo "Starting Wireshark."
tshark -i eth0 -w $OUTPUT.pcap & tshark -i eth0 -w $OUTPUT.pcap &
TSHARK_PID=$! TSHARK_PID=$!
sleep 5 sleep 2
start=$(date +%s.%N); start=$(date +%s.%N);
echo "Checking reachability of $TARGET." echo "Checking reachability of $TARGET."
until ping -c1 $TARGET &>/dev/null; sleep 1; do :; done until ping -c1 $TARGET &>/dev/null; do sleep 1; done
dur=$(echo "$(date +%s.%N) - $start" | bc); dur=$(echo "$(date +%s.%N) - $start" | bc);
printf "Reachable after %.6f seconds\n" $dur
traceroute $TARGET > $OUTPUT.tr printf "Reachable after %.6f seconds\n" $dur
echo "Traceroute done."
if [[ "$command" == "sender" || "$command" == "time-sender" ]]; then if [[ "$command" == "sender" || "$command" == "time-sender" ]]; then
echo "Delaying sender start." echo "Delaying sender start."
...@@ -73,11 +71,12 @@ else ...@@ -73,11 +71,12 @@ else
echo "Delaying receiver start." echo "Delaying receiver start."
fi fi
start=$(date +%s.%N);
echo "Running PRRT with command: \"$command $PRRT_PARAMS\" and link parameters: \"$NETEM_PARAMS\"" echo "Running PRRT with command: \"$command $PRRT_PARAMS\" and link parameters: \"$NETEM_PARAMS\""
trap 'echo "Caught SIGINT."; echo "$(ps -a)"; killall -SIGINT $command' INT
tc qdisc add dev $dev root netem $NETEM_PARAMS LOG=$(/prrt/$command $PRRT_PARAMS 2>&1)
/prrt/$command $PRRT_PARAMS echo "Exit status: $?"
echo "Done." printf "$LOG\n"
tc qdisc del dev $dev root dur=$(echo "$(date +%s.%N) - $start" | bc);
printf "Done after %.6f seconds\n" $dur
kill $TSHARK_PID kill $TSHARK_PID
net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 1
...@@ -126,7 +126,6 @@ cdef extern from "proto/socket.h": ...@@ -126,7 +126,6 @@ cdef extern from "proto/socket.h":
int PrrtSocket_send_sync(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len) int PrrtSocket_send_sync(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int PrrtSocket_send_async(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len) int PrrtSocket_send_async(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr, sockaddr* addr) nogil int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr, sockaddr* addr) nogil
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, sockaddr* addr) nogil int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, sockaddr* addr) nogil
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr, sockaddr* addr) nogil int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr, sockaddr* addr) nogil
...@@ -146,11 +145,11 @@ cdef extern from "proto/socket.h": ...@@ -146,11 +145,11 @@ cdef extern from "proto/socket.h":
uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *socket) uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *socket)
float PrrtSocket_get_plr_fwd(PrrtSocket *socket) float PrrtSocket_get_plr_fwd(PrrtSocket *socket)
uint32_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *socket) uint32_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *socket)
uint32_t PrrtSocket_get_btlbw_fwd(PrrtSocket *s); uint32_t PrrtSocket_get_btldatarate_fwd(PrrtSocket *s);
uint32_t PrrtSocket_get_btlbw_back(PrrtSocket *s); uint32_t PrrtSocket_get_btldatarate_back(PrrtSocket *s);
uint32_t PrrtSocket_get_bbr_state(PrrtSocket *s) uint32_t PrrtSocket_get_bbr_state(PrrtSocket *s)
uint64_t PrrtSocket_get_full_bw(PrrtSocket *s) uint64_t PrrtSocket_get_full_datarate(PrrtSocket *s)
bint PrrtSocket_get_filled_pipe(PrrtSocket *s) bint PrrtSocket_get_filled_pipe(PrrtSocket *s)
uint32_t PrrtSocket_get_cycle_index(PrrtSocket *s) uint32_t PrrtSocket_get_cycle_index(PrrtSocket *s)
float PrrtSocket_get_pacing_gain(PrrtSocket *s) float PrrtSocket_get_pacing_gain(PrrtSocket *s)
......
...@@ -30,8 +30,8 @@ ...@@ -30,8 +30,8 @@
#define GF_BITS 8 #define GF_BITS 8
#define K_START 4 #define K_START 1
#define N_START 7 #define N_START 1
#define N_P_START 1 #define N_P_START 1
#define RRT_ALPHA 0.125 #define RRT_ALPHA 0.125
......
...@@ -9,6 +9,8 @@ set (PRRT_SOURCES ../defines.h ...@@ -9,6 +9,8 @@ set (PRRT_SOURCES ../defines.h
stores/dataPacketStore.c stores/dataPacketStore.h stores/dataPacketStore.c stores/dataPacketStore.h
stores/deliveredPacketTable.c stores/deliveredPacketTable.h stores/deliveredPacketTable.c stores/deliveredPacketTable.h
stores/inFlightPacketStore.c stores/inFlightPacketStore.h stores/inFlightPacketStore.c stores/inFlightPacketStore.h
stores/pace.c stores/pace.h
stores/paceFilter.c stores/paceFilter.h
stores/packetTimeoutTable.c stores/packetTimeoutTable.h stores/packetTimeoutTable.c stores/packetTimeoutTable.h
stores/packetDeliveryStore.c stores/packetDeliveryStore.h stores/packetDeliveryStore.c stores/packetDeliveryStore.h
stores/receptionTable.c stores/receptionTable.h stores/receptionTable.c stores/receptionTable.h
......
...@@ -9,7 +9,7 @@ prrtByteCount_t BBR_Inflight(BBR* bbr, double gain) ...@@ -9,7 +9,7 @@ prrtByteCount_t BBR_Inflight(BBR* bbr, double gain)
if (bbr->rtprop == RTprop_Inf) if (bbr->rtprop == RTprop_Inf)
return bbr->initial_cwnd; /* no valid RTT samples yet */ return bbr->initial_cwnd; /* no valid RTT samples yet */
uint32_t quanta = bbr->mps; uint32_t quanta = bbr->mps;
uint32_t estimated_bdp = (uint32_t) round((((double)bbr->bw) * bbr->rtprop) / (1000 * 1000)); uint32_t estimated_bdp = (uint32_t) round((((double)bbr->datarate) * bbr->rtprop) / (1000 * 1000));
return (uint32_t)(gain * estimated_bdp + quanta); return (uint32_t)(gain * estimated_bdp + quanta);
} }
...@@ -20,7 +20,7 @@ void BBR_EnterStartup(BBR* bbr) ...@@ -20,7 +20,7 @@ void BBR_EnterStartup(BBR* bbr)
bbr->cwnd_gain = BBRHighGain; bbr->cwnd_gain = BBRHighGain;
} }
void BBR_UpdateBtlBw(BBR* bbr, PrrtRateSample* rs, PrrtPacketTracking* tracking) void BBR_UpdateBtlDatarate(BBR* bbr, PrrtRateSample* rs, PrrtPacketTracking* tracking)
{ {
if (tracking->delivered >= bbr->next_round_delivered) { if (tracking->delivered >= bbr->next_round_delivered) {
bbr->next_round_delivered = tracking->delivered; bbr->next_round_delivered = tracking->delivered;
...@@ -30,9 +30,9 @@ void BBR_UpdateBtlBw(BBR* bbr, PrrtRateSample* rs, PrrtPacketTracking* tracking) ...@@ -30,9 +30,9 @@ void BBR_UpdateBtlBw(BBR* bbr, PrrtRateSample* rs, PrrtPacketTracking* tracking)
bbr->round_start = false; bbr->round_start = false;
} }
uint32_t delivery_rate_Bps = (uint32_t)((float) rs->delivery_rate); uint32_t delivery_rate_Bps = (uint32_t)((float) rs->delivery_rate);
if ((delivery_rate_Bps >= bbr->bw || !rs->is_app_limited) && delivery_rate_Bps != 0) { if ((delivery_rate_Bps >= bbr->datarate || !rs->is_app_limited) && delivery_rate_Bps != 0) {
bbr->bw = (uint32_t)WindowedFilter_push(bbr->btlBwFilter, delivery_rate_Bps); bbr->datarate = (uint32_t)WindowedFilter_push(bbr->btlDatarateFilter, delivery_rate_Bps);
debug(DEBUG_BBR, "Current BtlBw: %u, RS delivery rate: %u", bbr->bw, delivery_rate_Bps); debug(DEBUG_BBR, "Current BtlDatarate: %u, RS delivery rate: %u", bbr->datarate, delivery_rate_Bps);
} }
} }
...@@ -40,13 +40,13 @@ void BBR_CheckFullPipe(BBR* bbr, PrrtRateSample* rs) ...@@ -40,13 +40,13 @@ void BBR_CheckFullPipe(BBR* bbr, PrrtRateSample* rs)
{ {
if (bbr->filled_pipe || !bbr->round_start || rs->is_app_limited) if (bbr->filled_pipe || !bbr->round_start || rs->is_app_limited)
return; // no need to check for a full pipe now return; // no need to check for a full pipe now
if (bbr->bw >= bbr->full_bw * PROBE_GAIN) { // BBR.BtlBw still growing? if (bbr->datarate >= bbr->full_datarate * PROBE_GAIN) { // BBR.BtlDatarate still growing?
bbr->full_bw = bbr->bw; // record new baseline level bbr->full_datarate = bbr->datarate; // record new baseline level
bbr->full_bw_count = 0; bbr->full_datarate_count = 0;
return; return;
} }
bbr->full_bw_count++; // another round w/o much growth bbr->full_datarate_count++; // another round w/o much growth
if (bbr->full_bw_count >= 3) if (bbr->full_datarate_count >= 3)
bbr->filled_pipe = true; bbr->filled_pipe = true;
} }
...@@ -71,13 +71,13 @@ void BBR_AdvanceCyclePhase(BBR* bbr) ...@@ -71,13 +71,13 @@ void BBR_AdvanceCyclePhase(BBR* bbr)
} }
void BBR_CheckCyclePhase(BBR* bbr, prrtByteCount_t bytes_lost, prrtByteCount_t prior_inflight) { void BBR_CheckCyclePhase(BBR* bbr, prrtByteCount_t bytes_lost, prrtByteCount_t prior_inflight) {
if (bbr->state == PROBE_BW && BBR_IsNextCyclePhase(bbr, bytes_lost, prior_inflight)) if (bbr->state == PROBE_DR && BBR_IsNextCyclePhase(bbr, bytes_lost, prior_inflight))
BBR_AdvanceCyclePhase(bbr); BBR_AdvanceCyclePhase(bbr);
} }
void BBR_EnterProbeBW(BBR* bbr) void BBR_EnterProbeDR(BBR* bbr)
{ {
bbr->state = PROBE_BW; bbr->state = PROBE_DR;
bbr->pacing_gain = 1; bbr->pacing_gain = 1;
bbr->cwnd_gain = 2; bbr->cwnd_gain = 2;
bbr->cycle_index = (uint8_t)(BBRGainCycleLen - 1 - (random() % 7)); bbr->cycle_index = (uint8_t)(BBRGainCycleLen - 1 - (random() % 7));
...@@ -93,13 +93,13 @@ void BBR_CheckDrain(BBR* bbr, prrtByteCount_t bytes_inflight) ...@@ -93,13 +93,13 @@ void BBR_CheckDrain(BBR* bbr, prrtByteCount_t bytes_inflight)
bbr->cwnd_gain = BBRHighGain; // maintain cwnd bbr->cwnd_gain = BBRHighGain; // maintain cwnd
} }
if (bbr->state == DRAIN && bytes_inflight <= BBR_Inflight(bbr, 1.0)) if (bbr->state == DRAIN && bytes_inflight <= BBR_Inflight(bbr, 1.0))
BBR_EnterProbeBW(bbr); // we estimate queue is drained BBR_EnterProbeDR(bbr); // we estimate queue is drained
} }
void BBR_ExitProbeRTT(BBR* bbr) void BBR_ExitProbeRTT(BBR* bbr)
{ {
if (bbr->filled_pipe) if (bbr->filled_pipe)
BBR_EnterProbeBW(bbr); BBR_EnterProbeDR(bbr);
else else
BBR_EnterStartup(bbr); BBR_EnterStartup(bbr);
} }
...@@ -167,7 +167,7 @@ void BBR_CheckProbeRTT(BBR *bbr, PrrtPacketTracking *tracking) { ...@@ -167,7 +167,7 @@ void BBR_CheckProbeRTT(BBR *bbr, PrrtPacketTracking *tracking) {
void BBR_UpdateModelAndState(BBR *bbr, PrrtRateSample *rs, PrrtPacketTracking *packetTracking, prrtTimedelta_t rtt) void BBR_UpdateModelAndState(BBR *bbr, PrrtRateSample *rs, PrrtPacketTracking *packetTracking, prrtTimedelta_t rtt)
{ {
BBR_UpdateBtlBw(bbr, rs, packetTracking); BBR_UpdateBtlDatarate(bbr, rs, packetTracking);
BBR_CheckCyclePhase(bbr, packetTracking->bytes_lost, packetTracking->prior_inflight); BBR_CheckCyclePhase(bbr, packetTracking->bytes_lost, packetTracking->prior_inflight);
BBR_CheckFullPipe(bbr, rs); BBR_CheckFullPipe(bbr, rs);
BBR_CheckDrain(bbr, packetTracking->pipe); BBR_CheckDrain(bbr, packetTracking->pipe);
...@@ -217,9 +217,9 @@ void BBR_SetCwnd(BBR* bbr, PrrtPacketTracking* packetTracking) ...@@ -217,9 +217,9 @@ void BBR_SetCwnd(BBR* bbr, PrrtPacketTracking* packetTracking)
void BBR_SetPacingRateWithGain(BBR* bbr, double pacing_gain) void BBR_SetPacingRateWithGain(BBR* bbr, double pacing_gain)
{ {
double rate = (pacing_gain * ((double)bbr->bw)); double rate = (pacing_gain * ((double)bbr->datarate));
debug(DEBUG_BBR, "Current rate: %f, Pacing gain: %f, BtlBw: %u, Calc Rate: %f, Filled pipe: %u", bbr->pacing_rate, debug(DEBUG_BBR, "Current rate: %f, Pacing gain: %f, BtlDatarate: %u, Calc Rate: %f, Filled pipe: %u", bbr->pacing_rate,
pacing_gain, bbr->bw, rate, bbr->filled_pipe); pacing_gain, bbr->datarate, rate, bbr->filled_pipe);
if (rate != 0 && (bbr->filled_pipe || rate > bbr->pacing_rate)) if (rate != 0 && (bbr->filled_pipe || rate > bbr->pacing_rate))
bbr->pacing_rate = rate; bbr->pacing_rate = rate;
} }
...@@ -310,7 +310,7 @@ BBR* BBR_Init(prrtByteCount_t maximum_payload_size) ...@@ -310,7 +310,7 @@ BBR* BBR_Init(prrtByteCount_t maximum_payload_size)
bbr->min_pipe_cwnd = 4 * maximum_payload_size; bbr->min_pipe_cwnd = 4 * maximum_payload_size;
bbr->initial_cwnd = 4 * maximum_payload_size; bbr->initial_cwnd = 4 * maximum_payload_size;
bbr->has_seen_rtt = false; bbr->has_seen_rtt = false;
bbr->btlBwFilter = WindowedFilter_create(true, 10); bbr->btlDatarateFilter = WindowedFilter_create(true, 10);
bbr->rtprop = RTprop_Inf; bbr->rtprop = RTprop_Inf;
bbr->rtprop_stamp = PrrtClock_get_current_time_us(); bbr->rtprop_stamp = PrrtClock_get_current_time_us();
bbr->probe_rtt_done_stamp = 0; bbr->probe_rtt_done_stamp = 0;
...@@ -328,12 +328,12 @@ BBR* BBR_Init(prrtByteCount_t maximum_payload_size) ...@@ -328,12 +328,12 @@ BBR* BBR_Init(prrtByteCount_t maximum_payload_size)
//Init full pipe //Init full pipe
bbr->filled_pipe = false; bbr->filled_pipe = false;
bbr->full_bw = 0; bbr->full_datarate = 0;
bbr->full_bw_count = 0; bbr->full_datarate_count = 0;
//Init pacing rate //Init pacing rate
double nominal_bandwidth = bbr->initial_cwnd / (bbr->has_seen_rtt ? bbr->rtprop : 1000); double nominal_datarate = bbr->initial_cwnd / (bbr->has_seen_rtt ? bbr->rtprop : 1000);
bbr->pacing_rate = bbr->pacing_gain * nominal_bandwidth; bbr->pacing_rate = bbr->pacing_gain * nominal_datarate;
BBR_EnterStartup(bbr); BBR_EnterStartup(bbr);
...@@ -346,7 +346,7 @@ BBR* BBR_Init(prrtByteCount_t maximum_payload_size) ...@@ -346,7 +346,7 @@ BBR* BBR_Init(prrtByteCount_t maximum_payload_size)
void BBR_destroy(BBR* bbr) void BBR_destroy(BBR* bbr)
{ {
WindowedFilter_destroy(bbr->btlBwFilter); WindowedFilter_destroy(bbr->btlDatarateFilter);
check(pthread_mutex_destroy(&bbr->lock) == 0, "lock destroy failed."); check(pthread_mutex_destroy(&bbr->lock) == 0, "lock destroy failed.");
free(bbr); free(bbr);
return; return;
...@@ -376,14 +376,14 @@ prrtByteCount_t BBR_getCwnd(BBR* bbr) ...@@ -376,14 +376,14 @@ prrtByteCount_t BBR_getCwnd(BBR* bbr)
return 0; return 0;
} }
prrtDeliveryRate_t BBR_getBtlBw(BBR* bbr) prrtDeliveryRate_t BBR_getBtlDatarate(BBR* bbr)
{ {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed."); check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
prrtDeliveryRate_t res = bbr->bw; prrtDeliveryRate_t res = bbr->datarate;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed."); check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res; return res;
error: error:
PERROR("BBR_getBtlBw failed.") PERROR("BBR_getBtlDatarate failed.")
return 0; return 0;
} }
...@@ -448,9 +448,9 @@ bool BBR_getRoundStart(BBR* bbr) { ...@@ -448,9 +448,9 @@ bool BBR_getRoundStart(BBR* bbr) {
return 0; return 0;
} }
prrtByteCount_t BBR_getFullBw(BBR* bbr) { prrtByteCount_t BBR_getFullDatarate(BBR* bbr) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed."); check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
prrtByteCount_t res = bbr->full_bw; prrtByteCount_t res = bbr->full_datarate;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed."); check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res; return res;
error: error:
......
...@@ -10,8 +10,8 @@ ...@@ -10,8 +10,8 @@
#include "types/rateSample.h" #include "types/rateSample.h"
#include "../util/windowedFilter.h" #include "../util/windowedFilter.h"
#define PROBE_GAIN 1.25 #define PROBE_GAIN 1.1
#define DRAIN_GAIN 0.75 #define DRAIN_GAIN 0.9
#define RTpropFilterLen 10000000 //10s #define RTpropFilterLen 10000000 //10s
#define BBRHighGain ((((float)2885) / 1000) + 1) #define BBRHighGain ((((float)2885) / 1000) + 1)
#define BBRGainCycleLen 8 #define BBRGainCycleLen 8
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
enum bbr_state { enum bbr_state {
STARTUP, STARTUP,
DRAIN, DRAIN,
PROBE_BW, PROBE_DR,
PROBE_RTT PROBE_RTT
}; };
...@@ -47,8 +47,8 @@ typedef struct bbr { ...@@ -47,8 +47,8 @@ typedef struct bbr {
float cwnd_gain; float cwnd_gain;
bool filled_pipe; bool filled_pipe;
prrtByteCount_t full_bw; prrtByteCount_t full_datarate;
uint32_t full_bw_count; uint32_t full_datarate_count;
double pacing_rate; double pacing_rate;
bool has_seen_rtt; bool has_seen_rtt;
...@@ -70,9 +70,9 @@ typedef struct bbr { ...@@ -70,9 +70,9 @@ typedef struct bbr {
prrtByteCount_t send_quantum; prrtByteCount_t send_quantum;
prrtDeliveryRate_t bw; prrtDeliveryRate_t datarate;
WindowedFilter* btlBwFilter; WindowedFilter* btlDatarateFilter;
} BBR; } BBR;
BBR* BBR_Init(prrtByteCount_t maximum_payload_size); BBR* BBR_Init(prrtByteCount_t maximum_payload_size);
...@@ -84,9 +84,9 @@ void BBR_destroy(BBR* bbr); ...@@ -84,9 +84,9 @@ void BBR_destroy(BBR* bbr);
double BBR_getPacingRate(BBR* bbr); double BBR_getPacingRate(BBR* bbr);
prrtByteCount_t BBR_getCwnd(BBR* bbr); prrtByteCount_t BBR_getCwnd(BBR* bbr);
prrtDeliveryRate_t BBR_getBtlBw(BBR* bbr); prrtDeliveryRate_t BBR_getBtlDatarate(BBR* bbr);
uint32_t BBR_getState(BBR* bbr); uint32_t BBR_getState(BBR* bbr);
prrtByteCount_t BBR_getFullBw(BBR* bbr); prrtByteCount_t BBR_getFullDatarate(BBR* bbr);
double BBR_getPacingGain(BBR* bbr); double BBR_getPacingGain(BBR* bbr);
uint32_t BBR_getCycleIndex(BBR* bbr); uint32_t BBR_getCycleIndex(BBR* bbr);
bool BBR_getFilledPipe(BBR* bbr); bool BBR_getFilledPipe(BBR* bbr);
......
...@@ -89,12 +89,13 @@ static bool send_feedback(PrrtSocket *sock_ptr, ...@@ -89,12 +89,13 @@ static bool send_feedback(PrrtSocket *sock_ptr,
PrrtLossStatistics stats = sock_ptr->lossStatistics; PrrtLossStatistics stats = sock_ptr->lossStatistics;
int group_RTT = 0; // TODO: To be determined. int group_RTT = 0; // TODO: To be determined.
uint32_t local_bottleneck_pace = MAX(PrrtPace_get_effective(sock_ptr->appDeliverPace), PrrtPace_get_effective(sock_ptr->prrtReceivePace));
PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, sock_ptr->sequenceNumberFeedback++, group_RTT, PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, sock_ptr->sequenceNumberFeedback++, group_RTT,
stats.gapLength, stats.gapCount, stats.burstLength, stats.gapLength, stats.gapCount, stats.burstLength,
stats.burstCount, forwardTripTime, stats.burstCount, forwardTripTime,
stats.erasureCount, stats.packetCount, stats.erasureCount, stats.packetCount,
feedback.seqNo, feedback.seqNo, feedback.type,
feedback.type); local_bottleneck_pace);
prrtPacketLength_t length = PrrtPacket_size(feedback_pkt_ptr); prrtPacketLength_t length = PrrtPacket_size(feedback_pkt_ptr);
void *buf = calloc(1, length); void *buf = calloc(1, length);
check_mem(buf); check_mem(buf);
...@@ -142,7 +143,8 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { ...@@ -142,7 +143,8 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
debug(DEBUG_DATARECEIVER, "Not relevant: %u", seqno); debug(DEBUG_DATARECEIVER, "Not relevant: %u", seqno);
PrrtPacket_destroy(packet); PrrtPacket_destroy(packet);
} else { } else {
PrrtChannelStateInformation_update_delivery_rate(sock_ptr->senderChannelStateInformation, payload->btlbw); PrrtChannelStateInformation_update_delivery_rate(sock_ptr->senderChannelStateInformation, payload->btl_datarate);
sock_ptr->send_peer_btl_pace = payload->btl_pace;
prrtSequenceNumber_t baseSequenceNumber = packet->sequenceNumber - packet->index; prrtSequenceNumber_t baseSequenceNumber = packet->sequenceNumber - packet->index;
PrrtPacket *reference = PrrtPacket_copy(packet); PrrtPacket *reference = PrrtPacket_copy(packet);
...@@ -193,6 +195,7 @@ static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) { ...@@ -193,6 +195,7 @@ static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) {
} else { } else {
PrrtBlock *block = PrrtRepairBlockStore_get_block(socket->repairBlockStore, PrrtBlock *block = PrrtRepairBlockStore_get_block(socket->repairBlockStore,
redundancyPayload->baseSequenceNumber); redundancyPayload->baseSequenceNumber);
socket->send_peer_btl_pace = payload->btl_pace;
if (block == NULL) { if (block == NULL) {
uint8_t n_cycle[1] = {redundancyPayload->n - redundancyPayload->k}; uint8_t n_cycle[1] = {redundancyPayload->n - redundancyPayload->k};
PrrtCodingConfiguration *codingParams = PrrtCodingConfiguration_create(redundancyPayload->k, PrrtCodingConfiguration *codingParams = PrrtCodingConfiguration_create(redundancyPayload->k,
...@@ -221,6 +224,7 @@ void handle_feedback_packet(PrrtSocket *socket, PrrtPacket *packet, prrtTimestam ...@@ -221,6 +224,7 @@ void handle_feedback_packet(PrrtSocket *socket, PrrtPacket *packet, prrtTimestam
prrtTimestamp_t forwardTripTimestamp = feedbackPayload->forwardTripTimestamp_us; prrtTimestamp_t forwardTripTimestamp = feedbackPayload->forwardTripTimestamp_us;
prrtTimedelta_t rtt = (prrtTimedelta_t) (receiveTime - forwardTripTimestamp); prrtTimedelta_t rtt = (prrtTimedelta_t) (receiveTime - forwardTripTimestamp);
socket->recv_peer_btl_pace = feedbackPayload->btl_pace;
PrrtReceiver_on_ack(socket->receiver, feedbackPayload, receiveTime, rtt, socket->applicationConstraints); PrrtReceiver_on_ack(socket->receiver, feedbackPayload, receiveTime, rtt, socket->applicationConstraints);
return; return;
...@@ -282,6 +286,7 @@ void *receive_data_loop(void *ptr) { ...@@ -282,6 +286,7 @@ void *receive_data_loop(void *ptr) {
PrrtSocket *s = ptr; PrrtSocket *s = ptr;
while (1) { while (1) {
PrrtPace_track_start(s->prrtReceivePace);
debug(DEBUG_DATARECEIVER, "About to receive."); debug(DEBUG_DATARECEIVER, "About to receive.");
XlapTimestampPlaceholder tsph1; XlapTimestampPlaceholder tsph1;
XlapTimestampPlaceholder tsph2; XlapTimestampPlaceholder tsph2;
...@@ -292,8 +297,13 @@ void *receive_data_loop(void *ptr) { ...@@ -292,8 +297,13 @@ void *receive_data_loop(void *ptr) {
struct timespec packet_recv_timestamp; struct timespec packet_recv_timestamp;
uint64_t packet_recv_cyclestamp = 0; uint64_t packet_recv_cyclestamp = 0;
PrrtPace_track_pause(s->prrtReceivePace);
receive_from_socket(s, buffer, &n, &remote, &addrlen, &packet_recv_timestamp, &packet_recv_cyclestamp); receive_from_socket(s, buffer, &n, &remote, &addrlen, &packet_recv_timestamp, &packet_recv_cyclestamp);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
PrrtPace_track_resume(s->prrtReceivePace);
if (atomic_load_explicit(&s->closing, memory_order_acquire)) {
break;
}
debug(DEBUG_HARDSTAMPING, "Packet TS:\t%ld.%09ld; Who? %s", (long) packet_recv_timestamp.tv_sec, debug(DEBUG_HARDSTAMPING, "Packet TS:\t%ld.%09ld; Who? %s", (long) packet_recv_timestamp.tv_sec,
packet_recv_timestamp.tv_nsec, inet_ntoa(remote.sin_addr)); packet_recv_timestamp.tv_nsec, inet_ntoa(remote.sin_addr));
...@@ -354,6 +364,7 @@ void *receive_data_loop(void *ptr) { ...@@ -354,6 +364,7 @@ void *receive_data_loop(void *ptr) {
debug(DEBUG_DATARECEIVER, "Cleanup"); debug(DEBUG_DATARECEIVER, "Cleanup");
PrrtSocket_cleanup(s); PrrtSocket_cleanup(s);
debug(DEBUG_DATARECEIVER, "Cleaned"); debug(DEBUG_DATARECEIVER, "Cleaned");
PrrtPace_track_end(s->prrtReceivePace);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
} }
......
...@@ -79,10 +79,9 @@ bool send_to_socket(PrrtSocket* sock_ptr, uint8_t* buf, prrtPacketLength_t lengt ...@@ -79,10 +79,9 @@ bool send_to_socket(PrrtSocket* sock_ptr, uint8_t* buf, prrtPacketLength_t lengt
static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
uint8_t buf[MAX_PAYLOAD_LENGTH]; uint8_t buf[MAX_PAYLOAD_LENGTH];
memset(buf, 0, sizeof(buf)); memset(buf, 0, sizeof(buf));
prrtPacketLength_t length = PrrtPacket_size(packet); PrrtPace_track_pause(sock_ptr->prrtTransmitPace);
prrtPacketLength_t payloadLength = packet->payloadLength; bool paceSuccessful = PrrtSocket_pace(sock_ptr, true);
PrrtPace_track_resume(sock_ptr->prrtTransmitPace);
bool paceSuccessful = PrrtSocket_pace(sock_ptr);
if (!paceSuccessful) { if (!paceSuccessful) {
debug(DEBUG_DATATRANSMITTER, "Pacing interrupted."); debug(DEBUG_DATATRANSMITTER, "Pacing interrupted.");
PrrtPacket_destroy(packet); PrrtPacket_destroy(packet);
...@@ -90,8 +89,10 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { ...@@ -90,8 +89,10 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
} }
debug(DEBUG_DATATRANSMITTER, "Pacing interval passed."); debug(DEBUG_DATATRANSMITTER, "Pacing interval passed.");
PrrtPace_track_pause(sock_ptr->prrtTransmitPace);
bool waitSuccessful = PrrtReceiver_wait_for_space(sock_ptr->receiver, sock_ptr->maximum_payload_size, bool waitSuccessful = PrrtReceiver_wait_for_space(sock_ptr->receiver, sock_ptr->maximum_payload_size,
sock_ptr->applicationConstraints); sock_ptr->applicationConstraints);
PrrtPace_track_resume(sock_ptr->prrtTransmitPace);
if(!waitSuccessful) { if(!waitSuccessful) {
debug(DEBUG_DATATRANSMITTER, "Wait for space interrupted."); debug(DEBUG_DATATRANSMITTER, "Wait for space interrupted.");
PrrtPacket_destroy(packet); PrrtPacket_destroy(packet);
...@@ -101,19 +102,37 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { ...@@ -101,19 +102,37 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
prrtTimestamp_t now = PrrtClock_get_current_time_us(); prrtTimestamp_t now = PrrtClock_get_current_time_us();
if (sock_ptr->pacingEnabled) { if (sock_ptr->pacingEnabled) {
double pacing_rate = PrrtReceiver_getBBRPacingRate(sock_ptr->receiver); prrtTimedelta_t peerPacingTime = 0;
prrtTimedelta_t channelPacingTime = 0;
double pacing_rate = PrrtReceiver_get_BBR_pacingRate(sock_ptr->receiver);
double pacing_gain = PrrtReceiver_get_BBR_pacingGain(sock_ptr->receiver) * 0.9;
uint32_t state = PrrtReceiver_get_BBR_state(sock_ptr->receiver);
if(pacing_rate != 0) { if(pacing_rate != 0) {
prrtTimedelta_t pacingTime = (prrtTimedelta_t) round(((1000 * 1000 * ( (double)payloadLength)) / pacing_rate)); channelPacingTime = (prrtTimedelta_t) round(((1000 * 1000 * ((double) packet->payloadLength)) / pacing_rate));
debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %f, Pacing Time: %u", packet->payloadLength, pacing_rate, pacingTime); }
sock_ptr->nextSendTime = now + pacingTime; // Cross-Pace iff PROBE_DR and unity gain
if(sock_ptr->recv_peer_btl_pace != 0 && state == PROBE_DR) {
double pt = round(((double) sock_ptr->recv_peer_btl_pace) / pacing_gain);
if (pt > (TIMESTAMP_SPACE-1)) {
peerPacingTime = TIMESTAMP_SPACE-1;
} else {
peerPacingTime = (prrtTimedelta_t) pt;
}
} }
prrtTimedelta_t pacingTime = MAX(channelPacingTime, peerPacingTime);
debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %f, Pacing Time: %u", packet->payloadLength, pacing_rate, pacingTime);
sock_ptr->nextSendTime = now + pacingTime;
} }
// Update timestamp // Update timestamp
prrtTimedelta_t btl_pace = MAX(MAX(PrrtPace_get_effective(sock_ptr->prrtTransmitPace), PrrtPace_get_effective(sock_ptr->appSendPace)), PrrtSocket_get_sock_opt(sock_ptr, "nw_pace"));
if(PrrtPacket_type(packet) == PACKET_TYPE_DATA) { if(PrrtPacket_type(packet) == PACKET_TYPE_DATA) {
((PrrtPacketDataPayload*) (packet->payload))->timestamp = PrrtClock_get_current_time_us(); ((PrrtPacketDataPayload*) (packet->payload))->timestamp = PrrtClock_get_current_time_us();
((PrrtPacketDataPayload*) (packet->payload))->btlbw = PrrtReceiver_get_BBR_btlDr(sock_ptr->receiver); ((PrrtPacketDataPayload*) (packet->payload))->btl_datarate = PrrtReceiver_get_BBR_btlDr(sock_ptr->receiver);
((PrrtPacketDataPayload*) (packet->payload))->btl_pace = btl_pace;
} else if(PrrtPacket_type(packet) == PACKET_TYPE_REDUNDANCY) { } else if(PrrtPacket_type(packet) == PACKET_TYPE_REDUNDANCY) {
((PrrtPacketRedundancyPayload*) (packet->payload))->timestamp = PrrtClock_get_current_time_us(); ((PrrtPacketRedundancyPayload*) (packet->payload))->timestamp = PrrtClock_get_current_time_us();
((PrrtPacketRedundancyPayload*) (packet->payload))->btl_pace = btl_pace;
} }
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small."); check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small.");
...@@ -134,7 +153,7 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { ...@@ -134,7 +153,7 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
struct timespec timestamp; struct timespec timestamp;
uint64_t cyclestamp; uint64_t cyclestamp;
send_to_socket(sock_ptr, buf, length, &timestamp, &cyclestamp); send_to_socket(sock_ptr, buf, PrrtPacket_size(packet), &timestamp, &cyclestamp);
XlapTimeStampValue(sock_ptr, ts_data_packet, packet->sequenceNumber, ChannelTransmit, timestamp); XlapTimeStampValue(sock_ptr, ts_data_packet, packet->sequenceNumber, ChannelTransmit, timestamp);
XlapCycleStampValue(sock_ptr, ts_data_packet, packet->sequenceNumber, ChannelTransmit, cyclestamp); XlapCycleStampValue(sock_ptr, ts_data_packet, packet->sequenceNumber, ChannelTransmit, cyclestamp);
...@@ -155,7 +174,6 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { ...@@ -155,7 +174,6 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
case PACKET_TYPE_CHANNEL_FEEDBACK: case PACKET_TYPE_CHANNEL_FEEDBACK:
default:; default:;
} }
return true; return true;
error: error:
...@@ -211,6 +229,7 @@ void retransmission_round_handler(void *arg) { ...@@ -211,6 +229,7 @@ void retransmission_round_handler(void *arg) {
void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet) { void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet) {
PrrtPace_track_start(sock_ptr->prrtTransmitPace);
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart); XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart); XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
if (sock_ptr->receiveBlock == NULL) { if (sock_ptr->receiveBlock == NULL) {
...@@ -249,6 +268,7 @@ void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet) { ...@@ -249,6 +268,7 @@ void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet) {
} else { } else {
PrrtPacket_destroy(packet); PrrtPacket_destroy(packet);
} }
PrrtPace_track_end(sock_ptr->prrtTransmitPace);
} }
void *PrrtDataTransmitter_send_data_loop(void *ptr) { void *PrrtDataTransmitter_send_data_loop(void *ptr) {
......
...@@ -267,7 +267,7 @@ void PrrtReceiver_on_ack(PrrtReceiver *receiver, ...@@ -267,7 +267,7 @@ void PrrtReceiver_on_ack(PrrtReceiver *receiver,
PERROR("Mutex error.%s", ""); PERROR("Mutex error.%s", "");
} }
double PrrtReceiver_getBBRPacingRate(PrrtReceiver *receiver) { double PrrtReceiver_get_BBR_pacingRate(PrrtReceiver *receiver) {
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed."); check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
double res = BBR_getPacingRate(receiver->bbr); double res = BBR_getPacingRate(receiver->bbr);
check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed."); check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
...@@ -278,6 +278,28 @@ double PrrtReceiver_getBBRPacingRate(PrrtReceiver *receiver) { ...@@ -278,6 +278,28 @@ double PrrtReceiver_getBBRPacingRate(PrrtReceiver *receiver) {
return 0.0; return 0.0;
} }
uint32_t PrrtReceiver_get_BBR_state(PrrtReceiver *receiver) {
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
uint32_t res = BBR_getState(receiver->bbr);
check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("Mutex error.%s", "");
return 0;
}
double PrrtReceiver_get_BBR_pacingGain(PrrtReceiver *receiver) {
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
double res = BBR_getPacingGain(receiver->bbr);
check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("Mutex error.%s", "");
return 0.0;
}
prrtByteCount_t PrrtReceiver_get_space(PrrtReceiver *receiver) { prrtByteCount_t PrrtReceiver_get_space(PrrtReceiver *receiver) {
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed."); check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
prrtByteCount_t cwnd = BBR_getCwnd(receiver->bbr); prrtByteCount_t cwnd = BBR_getCwnd(receiver->bbr);
...@@ -330,7 +352,7 @@ uint32_t PrrtReceiver_get_retransmission_delay(PrrtReceiver *receiver, PrrtAppli ...@@ -330,7 +352,7 @@ uint32_t PrrtReceiver_get_retransmission_delay(PrrtReceiver *receiver, PrrtAppli
prrtDeliveryRate_t PrrtReceiver_get_BBR_btlDr(PrrtReceiver *receiver) { prrtDeliveryRate_t PrrtReceiver_get_BBR_btlDr(PrrtReceiver *receiver) {
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed."); check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
prrtDeliveryRate_t res = BBR_getBtlBw(receiver->bbr); prrtDeliveryRate_t res = BBR_getBtlDatarate(receiver->bbr);
check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed."); check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
return res; return res;
......
...@@ -40,13 +40,14 @@ void PrrtReceiver_on_application_write(PrrtReceiver *receiver, uint32_t send_que ...@@ -40,13 +40,14 @@ void PrrtReceiver_on_application_write(PrrtReceiver *receiver, uint32_t send_que
void PrrtReceiver_on_ack(PrrtReceiver *receiver, PrrtPacketFeedbackPayload *feedbackPayload, prrtTimestamp_t receiveTime, void PrrtReceiver_on_ack(PrrtReceiver *receiver, PrrtPacketFeedbackPayload *feedbackPayload, prrtTimestamp_t receiveTime,
prrtTimedelta_t rtt, PrrtApplicationConstraints *constraints); prrtTimedelta_t rtt, PrrtApplicationConstraints *constraints);
uint32_t PrrtReceiver_get_retransmission_delay(PrrtReceiver *socket, PrrtApplicationConstraints *applicationConstraints);
bool PrrtReceiver_wait_for_space(PrrtReceiver *receiver, prrtByteCount_t maximum_payload_size, bool PrrtReceiver_wait_for_space(PrrtReceiver *receiver, prrtByteCount_t maximum_payload_size,
PrrtApplicationConstraints *pConstraints); PrrtApplicationConstraints *pConstraints);
uint32_t PrrtReceiver_get_retransmission_delay(PrrtReceiver *socket, PrrtApplicationConstraints *applicationConstraints);
prrtByteCount_t PrrtReceiver_get_space(PrrtReceiver *receiver); prrtByteCount_t PrrtReceiver_get_space(PrrtReceiver *receiver);
double PrrtReceiver_getBBRPacingRate(PrrtReceiver *receiver); double PrrtReceiver_get_BBR_pacingRate(PrrtReceiver *receiver);
double PrrtReceiver_get_BBR_pacingGain(PrrtReceiver *receiver);
uint32_t PrrtReceiver_get_BBR_state(PrrtReceiver *receiver);
prrtDeliveryRate_t PrrtReceiver_get_BBR_btlDr(PrrtReceiver *receiver); prrtDeliveryRate_t PrrtReceiver_get_BBR_btlDr(PrrtReceiver *receiver);
void PrrtReceiver_interrupt(PrrtReceiver *receiver); void PrrtReceiver_interrupt(PrrtReceiver *receiver);
......
This diff is collapsed.
#ifndef PRRT_SOCKET_H #ifndef PRRT_SOCKET_H
#define PRRT_SOCKET_H #define PRRT_SOCKET_H
#include "../defines.h" #include "../defines.h"
#include "../util/list.h" #include "../util/list.h"
#include "../util/pipe.h" #include "../util/pipe.h"
...@@ -9,6 +10,8 @@ ...@@ -9,6 +10,8 @@
#include "stores/dataPacketStore.h" #include "stores/dataPacketStore.h"
#include "stores/deliveredPacketTable.h" #include "stores/deliveredPacketTable.h"
#include "stores/packetTimeoutTable.h" #include "stores/packetTimeoutTable.h"
#include "stores/pace.h"
#include "stores/paceFilter.h"
#include "stores/receptionTable.h" #include "stores/receptionTable.h"
#include "stores/repairBlockStore.h" #include "stores/repairBlockStore.h"
#include "stores/packetDeliveryStore.h" #include "stores/packetDeliveryStore.h"
...@@ -76,6 +79,15 @@ typedef struct prrtSocket { ...@@ -76,6 +79,15 @@ typedef struct prrtSocket {
PrrtCodingConfiguration *codingParameters; PrrtCodingConfiguration *codingParameters;
PrrtCoder *coder; PrrtCoder *coder;
// Pacing
PrrtPace* appSendPace;
PrrtPace* prrtTransmitPace;
PrrtPace* prrtReceivePace;
PrrtPace* appDeliverPace;
prrtTimedelta_t send_peer_btl_pace;
prrtTimedelta_t recv_peer_btl_pace;
_Atomic (XlapTimestampTable *) tstable[2]; _Atomic (XlapTimestampTable *) tstable[2];
pthread_attr_t *sendDataThreadAttr; pthread_attr_t *sendDataThreadAttr;
...@@ -119,8 +131,7 @@ bool PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port); ...@@ -119,8 +131,7 @@ bool PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port);
int PrrtSocket_send_async(PrrtSocket *s, const uint8_t *data, const size_t data_len); int PrrtSocket_send_async(PrrtSocket *s, const uint8_t *data, const size_t data_len);
int PrrtSocket_send_sync(PrrtSocket *s, const uint8_t *data, size_t data_len); int PrrtSocket_send_sync(PrrtSocket *s, const uint8_t *data, size_t data_len);
bool PrrtSocket_pace(PrrtSocket *s); bool PrrtSocket_pace(PrrtSocket *sock_ptr, bool prepace);
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr); int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr);
...@@ -141,9 +152,9 @@ bool PrrtSocket_uses_thread_pinning(PrrtSocket *s); ...@@ -141,9 +152,9 @@ bool PrrtSocket_uses_thread_pinning(PrrtSocket *s);
uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *s); uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *s);
prrtPacketLossRate_t PrrtSocket_get_plr_fwd(PrrtSocket *s); prrtPacketLossRate_t PrrtSocket_get_plr_fwd(PrrtSocket *s);
prrtDeliveryRate_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *s); prrtDeliveryRate_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *s);
prrtDeliveryRate_t PrrtSocket_get_btlbw_fwd(PrrtSocket *s); prrtDeliveryRate_t PrrtSocket_get_btldatarate_fwd(PrrtSocket *s);
prrtDeliveryRate_t PrrtSocket_get_btlbw_back(PrrtSocket *s); prrtDeliveryRate_t PrrtSocket_get_btldatarate_back(PrrtSocket *s);
uint64_t PrrtSocket_get_full_bw(PrrtSocket *s); uint64_t PrrtSocket_get_full_datarate(PrrtSocket *s);
bool PrrtSocket_get_filled_pipe(PrrtSocket *s); bool PrrtSocket_get_filled_pipe(PrrtSocket *s);
uint32_t PrrtSocket_get_bbr_state(PrrtSocket *s); uint32_t PrrtSocket_get_bbr_state(PrrtSocket *s);
uint32_t PrrtSocket_get_bbr_app_limited(PrrtSocket *s); uint32_t PrrtSocket_get_bbr_app_limited(PrrtSocket *s);
......
#include "pace.h"
#include "../clock.h"
#include "../../util/common.h"
#include "../../util/dbg.h"
#include "../../util/time.h"
#include <math.h>
PrrtPace* PrrtPace_create(void) {
PrrtPace* pace = (PrrtPace*) calloc(1, sizeof(PrrtPace));
check_mem(pace);
prrtTimedelta_t filterLength_us = 2 * 1000 * 1000; // 2 seconds
pace->internalPace = PrrtPaceFilter_create(filterLength_us, FILTER_TYPE_MIN);
pace->dependentPace = PrrtPaceFilter_create(filterLength_us, FILTER_TYPE_MIN);
pace->externalPace = PrrtPaceFilter_create(filterLength_us, FILTER_TYPE_MIN);
pace->totalPauseDuration_ns = 0;
pace->initialized = false;
pace->firstRoundDone = false;
clock_gettime(CLOCK_REALTIME, &pace->lastStartTimestamp);
clock_gettime(CLOCK_REALTIME, &pace->lastEndTimestamp);
clock_gettime(CLOCK_REALTIME, &pace->lastPauseTimestamp);
return pace;
error:
PERROR("Out of memory%s.", "");
return NULL;
}
bool PrrtPace_destroy(PrrtPace* pace) {
if(pace->internalPace != NULL) {
check(PrrtPaceFilter_destroy(pace->internalPace), "Cannot destroy internalPace.")
}
if(pace->dependentPace != NULL) {
check(PrrtPaceFilter_destroy(pace->dependentPace), "Cannot destroy dependentPace.")
}
if(pace->externalPace != NULL) {
check(PrrtPaceFilter_destroy(pace->externalPace), "Cannot destroy externalPace.")
}
free(pace);
return true;
error:
return false;
}
prrtTimedelta_t PrrtPace_get_internal(PrrtPace* pace) {
return PrrtPaceFilter_get(pace->internalPace);
}
prrtTimedelta_t PrrtPace_get_external(PrrtPace* pace) {
return PrrtPaceFilter_get(pace->externalPace);
}
prrtTimedelta_t PrrtPace_get_dependent(PrrtPace* pace) {
return PrrtPaceFilter_get(pace->dependentPace);
}
prrtTimedelta_t PrrtPace_get_total(PrrtPace* pace) {
return PrrtPaceFilter_get(pace->internalPace) + PrrtPaceFilter_get(pace->externalPace);
}
prrtTimedelta_t PrrtPace_get_effective(PrrtPace* pace) {
return (prrtTimedelta_t) MAX(0, ((int64_t) PrrtPaceFilter_get(pace->externalPace)) + ((int64_t)PrrtPaceFilter_get(pace->internalPace)) - ((int64_t)PrrtPaceFilter_get(pace->dependentPace)));
}
void PrrtPace_track_start(PrrtPace* pace) {
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
if (pace->initialized) {
long long int internalDelta_ns = timedelta(&pace->lastEndTimestamp, &pace->lastStartTimestamp);
long long int externalDelta_ns = timedelta(&now, &pace->lastEndTimestamp);
if (pace->firstRoundDone) {
// make sure internal >= dependent
PrrtPaceFilter_update(pace->internalPace, (prrtTimedelta_t) MAX(0, round(((double) MAX(internalDelta_ns, pace->totalPauseDuration_ns)) / 1000)));
PrrtPaceFilter_update(pace->externalPace, (prrtTimedelta_t) MAX(0, round(((double) externalDelta_ns) / 1000)));
PrrtPaceFilter_update(pace->dependentPace, (prrtTimedelta_t) MAX(0, round(((double) pace->totalPauseDuration_ns) / 1000)));
}
pace->firstRoundDone = true;
}
pace->totalPauseDuration_ns = 0;
pace->lastStartTimestamp = now;
pace->initialized = true;
}
void PrrtPace_track_end(PrrtPace* pace) {
clock_gettime(CLOCK_REALTIME, &pace->lastEndTimestamp);
}
void PrrtPace_track_pause(PrrtPace* pace) {
clock_gettime(CLOCK_REALTIME, &pace->lastPauseTimestamp);
}
void PrrtPace_track_resume(PrrtPace* pace) {
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
long long int delta = timedelta(&now, &pace->lastPauseTimestamp);
clock_gettime(CLOCK_REALTIME, &pace->lastPauseTimestamp);
pace->totalPauseDuration_ns += delta;
}
#ifndef PRRT_PACE_H
#define PRRT_PACE_H
#include "paceFilter.h"
typedef struct prrtPace {
PrrtPaceFilter* internalPace;
PrrtPaceFilter* externalPace;
PrrtPaceFilter* dependentPace;
struct timespec lastStartTimestamp;
struct timespec lastEndTimestamp;
struct timespec lastPauseTimestamp;
prrtTimedelta_t totalPauseDuration_ns;
bool initialized;
bool firstRoundDone;
} PrrtPace;
PrrtPace* PrrtPace_create(void);
bool PrrtPace_destroy(PrrtPace* pace);
prrtTimedelta_t PrrtPace_get_internal(PrrtPace* pace);
prrtTimedelta_t PrrtPace_get_external(PrrtPace* pace);
prrtTimedelta_t PrrtPace_get_dependent(PrrtPace* pace);
prrtTimedelta_t PrrtPace_get_total(PrrtPace* pace);
prrtTimedelta_t PrrtPace_get_effective(PrrtPace* pace);
void PrrtPace_track_start(PrrtPace* pace);
void PrrtPace_track_end(PrrtPace* pace);
void PrrtPace_track_pause(PrrtPace* pace);
void PrrtPace_track_resume(PrrtPace* pace);
#endif //PRRT_PACE_H
#include "paceFilter.h"
#include "../clock.h"
#include "../../util/common.h"
#include "../../util/dbg.h"
#include <math.h>
#include <assert.h>
void invalidate(PrrtPaceFilter* filter) {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
filter->valid = false;
filter->updated = now;
//filter->value = 0;
}
PrrtPaceFilter* PrrtPaceFilter_create(prrtTimedelta_t window_us, prrtPaceFilterType type) {
PrrtPaceFilter* filter = (PrrtPaceFilter*) calloc(1, sizeof(PrrtPaceFilter));
check_mem(filter);
assert(type < FILTER_TYPE__COUNT);
filter->type = type;
filter->updated = PrrtClock_get_current_time_us();
filter->valid = false;
filter->value = 0;
filter->window_us = window_us;
return filter;
error:
PERROR("Out of memory%s.", "");
return NULL;
}
bool PrrtPaceFilter_destroy(PrrtPaceFilter* filter) {
free(filter);
return true;
}
prrtTimedelta_t PrrtPaceFilter_get(PrrtPaceFilter* filter) {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
if (filter->updated + filter->window_us < now) {
invalidate(filter);
}
return filter->value;
}
static void PrrtPaceFilter_max_update(PrrtPaceFilter* filter, prrtTimedelta_t value) {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
if (filter->valid == false) {
filter->value = value;
filter->valid = true;
filter->updated = now;
} else {
if (value > filter->value) {
filter->value = value;
filter->updated = now;
} else {
// ignore sample
}
}
if (filter->updated + filter->window_us < now) {
invalidate(filter);
}
}
static void PrrtPaceFilter_min_update(PrrtPaceFilter* filter, prrtTimedelta_t value) {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
if (filter->valid == false) {
filter->value = value;
filter->valid = true;
filter->updated = now;
} else {
if (value < filter->value) {
filter->value = value;
filter->updated = now;
} else {
// ignore sample
}
}
if (filter->updated + filter->window_us < now) {
invalidate(filter);
}
}
typedef void (*PrrtPaceUpdateFunction)(PrrtPaceFilter *, prrtTimedelta_t);
static PrrtPaceUpdateFunction updates[] = {
[FILTER_TYPE_MIN] = PrrtPaceFilter_min_update,
[FILTER_TYPE_MAX] = PrrtPaceFilter_max_update,
NULL
};
void PrrtPaceFilter_update(PrrtPaceFilter *filter, prrtTimedelta_t value)
{
updates[filter->type](filter, value);
}
#ifndef PRRT_PACEFILTER_H
#define PRRT_PACEFILTER_H
#include"../types/packet.h"
typedef enum prrtPaceFilterType {
FILTER_TYPE_MIN, // maximum filter
FILTER_TYPE_MAX, // minimum filter
FILTER_TYPE__COUNT // only used for bounds checking
} prrtPaceFilterType;
typedef struct prrtPaceFilter {
prrtPaceFilterType type;