Skip to content
GitLab
Explore
Sign in
Commits on Source (267)
44b41686
Add first BBR pieces.
Mar 15, 2018
06afdefd
Fix Python bindings.
Mar 15, 2018
02539561
Fix BtlBw bindings.
Mar 15, 2018
e4cd39e1
Fix bps conversion. Add missing tracking field.
Mar 15, 2018
f58b26e5
Add time-sender and time-receiver
Mar 16, 2018
37f5e170
Improve BBR.
Mar 16, 2018
546873a1
Fix maximum payload length.
Mar 16, 2018
3f93b30f
Pacing is now an option.
Mar 16, 2018
606bbbc0
Fix float error. Smaller payloads.
Mar 16, 2018
117f862c
Add BBR state.
Mar 16, 2018
1799a7be
Revert BBR state.
Mar 16, 2018
518bac8b
Update time-apps.
Mar 19, 2018
bf36d136
Add channel time in evaluation.
Mar 19, 2018
a2371171
Add both to CSV.
Mar 19, 2018
a8884ece
Increase payload.
Mar 19, 2018
269d70ba
Increase payload.
Mar 19, 2018
368c430e
Time sender payload size is now configurable.
Mar 19, 2018
07b5494c
Fix assertion.
Mar 19, 2018
b10cd23a
Add support for TCP in time-{sender,receiver}
Mar 19, 2018
1833bee4
Fix indexing.
Mar 19, 2018
17dc6991
Fix memory leaks.
Mar 19, 2018
d3a1dda6
Fix memory problem.
Mar 19, 2018
f6420c8a
Fix SIGSEGV and reformat code.
Mar 20, 2018
3ff66af4
Add TCP as method.
Mar 20, 2018
cea54405
Fix Cmake.
Mar 20, 2018
6085dafc
Fix superflous sleep.
Mar 20, 2018
2614d54c
TCP time-apps.
Mar 20, 2018
afcb020a
Add size parameter to time-receiver.
Mar 20, 2018
37681fe7
Reduce sleep.
Mar 20, 2018
1a41c73a
Free buffer.
Mar 20, 2018
323cef5f
Fix memory problem.
Mar 20, 2018
7f88c93b
Update BBR.
Mar 21, 2018
bdc6bc20
Extract pacing rate. Update app_limited.
Mar 21, 2018
7c690edc
Fix app_limited criteria.
Mar 21, 2018
24236605
Update BBR.
Mar 21, 2018
61fb6772
Update time protocol.
Mar 21, 2018
56324b8a
Revert "Update time protocol."
Mar 21, 2018
4762a168
Add proper target delay.
Mar 21, 2018
42e04a48
Ordered wait.
Mar 21, 2018
90829abc
Receive window.
Mar 21, 2018
892d8089
Cwnd in bytes. Proper pacing.
Mar 21, 2018
5bb63d08
Add loss detection to BBR.
Mar 21, 2018
4c2bedbc
Call OnAck for all ACKs.
Mar 21, 2018
98a7103f
Move OnACK back.
Mar 21, 2018
4e1beda4
Asap in time-protocol.
Mar 21, 2018
90bb4cdb
Fixes.
Mar 21, 2018
c4695481
Pacing.
Mar 21, 2018
253f8338
Add locks to channel state information.
Mar 21, 2018
ce9f2b74
Improve BBR.
Mar 22, 2018
6db56833
Add cwnd control.
Mar 22, 2018
00c47278
Fix BBR.
Mar 22, 2018
7aef4f30
Expose app-limited. Space calculations.
Mar 22, 2018
a5b90be5
Enable pacing on demand.
Mar 22, 2018
7b0d48f8
Typo.
Mar 22, 2018
6002448a
Fix missing field.
Mar 22, 2018
1fa687f4
Enable pacing by default.
Mar 22, 2018
4288f189
Drop CVs from receiver.
Mar 22, 2018
faee8a34
Add minimum sender buffer
Mar 23, 2018
51c616ab
Merge branch 'develop' into feature/congestionControl
Mar 26, 2018
14f6eb21
Bps instead of bps.
Mar 26, 2018
efe6da33
Do not incorporate pacing in RTprop.
Mar 26, 2018
c8e456d6
Add RTprop measurement for redundancy packets.
Mar 26, 2018
18429509
Use packet before free.
Mar 26, 2018
92e34718
Btlbw is now also available at the receiver.
Apr 12, 2018
01b825e1
Use the same socket for data and feedback.
Apr 18, 2018
973501b7
Remove isSender flag.
Apr 18, 2018
00f430d8
Multiplex packets by type and not socket.
Apr 18, 2018
23934970
Merge branch 'feature/socketMerging' into feature/congestionControl
Apr 18, 2018
70af6898
Update xlap.
Apr 18, 2018
fbc5ed58
Timestamp appending to payload is now an option.
Apr 19, 2018
7bc0e9e9
Fix time-* programs.
Apr 19, 2018
e7808ea1
Python3 code examples.
Apr 19, 2018
385537fb
Merge branch 'develop' into feature/congestionControl
Apr 27, 2018
733996d1
Merge branch 'develop' into feature/congestionControl
Apr 27, 2018
bc42bc24
Merge branch 'develop' into feature/congestionControl
Jun 15, 2018
f76f6549
Merge branch 'develop' into feature/congestionControl
Jun 22, 2018
91b784dc
Fix MTU.
Jun 22, 2018
70070657
PrrtReceiver properly encapsulates BBR.
Jun 22, 2018
b0b0226b
Merge branch 'develop' into feature/congestionControl
Jun 22, 2018
ba811c2c
Remove call-trough.
Jun 22, 2018
085ff10d
Separate pacing wait from cwnd wait.
Jun 22, 2018
e6e59503
Refactor receiver. Add condition variable.
Jun 22, 2018
a891f072
Refactor handler to wait for cwnd space.
Jun 22, 2018
c1063ac4
Merge branch 'develop' into feature/congestionControl
Jun 22, 2018
3ea610e2
Fix binding.
Jun 22, 2018
0672d883
Use timedwait for space.
Jun 22, 2018
7650f453
Merge branch 'develop' into feature/congestionControl
Jun 25, 2018
d44b6c69
Merge branch 'develop' into feature/congestionControl
Jun 26, 2018
a08fa1d6
Merge branch 'develop' into feature/congestionControl
Jun 26, 2018
67f15d71
Refactor BBR to use maximum payload size.
Jun 26, 2018
45693578
Merge branch 'develop' into feature/congestionControl
Jun 28, 2018
535cf490
Merge branch 'develop' into feature/congestionControl
Jun 28, 2018
4dbc0b92
Fix btlbw.
Jun 28, 2018
c1f3364f
Merge branch 'develop' into feature/congestionControl
Jun 29, 2018
db6d95b3
Merge branch 'develop' into feature/congestionControl
Jul 09, 2018
e38e26d6
Refactor out pace method.
Jul 09, 2018
e8e952b8
Update Docker files.
Jul 09, 2018
7099d1af
Refactor windowed filter.
Jul 09, 2018
db7bcf84
Remove inline.
Jul 09, 2018
83bc59c4
Add NSDI changes.
Jul 09, 2018
Expand all
Show whitespace changes
Inline
Side-by-side
.gitlab-ci.yml
View file @
b51f0a89
...
...
@@ -38,6 +38,17 @@ build:container:
-
docker push $CI_REGISTRY_IMAGE:$DOCKER_TAG
-
docker rmi $CI_REGISTRY_IMAGE:$DOCKER_TAG
build:container_tcp
:
stage
:
build
tags
:
-
docker
script
:
-
export DOCKER_TAG=$(echo "$CI_BUILD_REF_NAME""_tcp" | sed 's#/#_#' | sed 's#^master$#latest#')
-
docker build -t $CI_REGISTRY_IMAGE:$DOCKER_TAG --build-arg http_proxy=http://www-proxy.uni-saarland.de:3128 -f docker/Dockerfile_tcp .
-
docker login -u gitlab-ci-token -p $CI_BUILD_TOKEN $CI_REGISTRY
-
docker push $CI_REGISTRY_IMAGE:$DOCKER_TAG
-
docker rmi $CI_REGISTRY_IMAGE:$DOCKER_TAG
test:prrt_mem
:
stage
:
test
dependencies
:
...
...
README.md
View file @
b51f0a89
...
...
@@ -29,9 +29,9 @@ port = int(sys.argv[1])
s
=
prrt
.
PrrtSocket
(
port
=
port
)
while
True
:
d
=
s
.
recv
()
d
=
s
.
recv
()
.
decode
(
"
utf8
"
)
if
d
!=
"
Close
"
:
print
d
print
(
d
)
else
:
break
```
...
...
@@ -44,25 +44,27 @@ import prrt
host
=
sys
.
argv
[
1
]
port
=
int
(
sys
.
argv
[
2
])
localport
=
int
(
sys
.
argv
[
3
])
s
=
prrt
.
PrrtSocket
(
port
=
port
)
s
=
prrt
.
PrrtSocket
(
port
=
local
port
)
s
.
connect
(
host
,
port
)
for
i
in
range
(
10
):
s
.
send
(
"
Packet {}
"
.
format
(
i
))
s
.
send
(
"
Close
"
)
s
.
send
(
"
Packet {}
"
.
format
(
i
)
.
encode
(
"
utf8
"
)
)
s
.
send
(
"
Close
"
.
encode
(
"
utf8
"
)
)
```
Start the receiver by:
```
bash
python receiver.py 5000
python
3
receiver.py 5000
```
In a separate terminal, run:
```
bash
python sender.py 127.0.0.1 5000
python
3
sender.py 127.0.0.1 5000
6000
```
This should generate the following output in the receiver console:
...
...
dissect/prrt.lua
View file @
b51f0a89
...
...
@@ -25,16 +25,22 @@ local pf_data_length = ProtoField.uint32("prrt.data.length", "Length")
local
pf_data_timestamp
=
ProtoField
.
uint32
(
"prrt.data.timestamp"
,
"Timestamp"
)
local
pf_data_groupRTprop
=
ProtoField
.
uint32
(
"prrt.data.grouprtprop"
,
"Group RTprop"
)
local
pf_data_packettimeout
=
ProtoField
.
uint32
(
"prrt.data.packettimeout"
,
"Packet Timeout"
)
local
pf_data_btlbw
=
ProtoField
.
uint32
(
"prrt.data.btlbw"
,
"Bottleneck Bandwidth"
)
local
pf_data_btl_pace
=
ProtoField
.
uint32
(
"prrt.data.btl_pace"
,
"Bottleneck Pace"
)
local
pf_data_appSendTotal_pace
=
ProtoField
.
uint32
(
"prrt.data.appSendTotal_pace"
,
"Sender total application pace"
)
local
pf_red
=
ProtoField
.
new
(
"Redundancy"
,
"prrt.redundancy"
,
ftypes
.
BYTES
,
base
.
NONE
)
local
pf_red_baseSeqN
=
ProtoField
.
uint16
(
"prrt.redundancy.baseSequenceNumber"
,
"Base Sequence Number"
,
base
.
DEC
)
local
pf_red_timestamp
=
ProtoField
.
uint32
(
"prrt.redundancy.timestamp"
,
"Timestamp"
)
local
pf_red_btl_pace
=
ProtoField
.
uint32
(
"prrt.redundancy.btl_pace"
,
"Bottleneck Pace"
)
local
pf_red_appSendTotal_pace
=
ProtoField
.
uint32
(
"prrt.redundancy.appSendTotal_pace"
,
"Sender total application pace"
)
local
pf_red_n
=
ProtoField
.
uint8
(
"prrt.redundancy.n"
,
"n"
)
local
pf_red_k
=
ProtoField
.
uint8
(
"prrt.redundancy.k"
,
"k"
)
local
pf_fb
=
ProtoField
.
new
(
"Feedback"
,
"prrt.feedback"
,
ftypes
.
BYTES
,
base
.
NONE
)
local
pf_fb_groupRTT
=
ProtoField
.
uint32
(
"prrt.feedback.groupRTT"
,
"Group RTT"
)
local
pf_fb_ftt
=
ProtoField
.
uint32
(
"prrt.feedback.FTT"
,
"FTT"
)
local
pf_fb_btlPace
=
ProtoField
.
uint32
(
"prrt.feedback.btlPace"
,
"Bottleneck pace"
)
local
pf_fb_erasurecount
=
ProtoField
.
uint16
(
"prrt.feedback.erasureCount"
,
"Erasure count"
)
local
pf_fb_packetcount
=
ProtoField
.
uint16
(
"prrt.feedback.packetCount"
,
"Packet count"
)
local
pf_fb_gaplength
=
ProtoField
.
uint16
(
"prrt.feedback.gapLength"
,
"Gap length"
)
...
...
@@ -56,16 +62,22 @@ prrt_proto.fields = {
pf_data_timestamp
,
pf_data_groupRTprop
,
pf_data_packettimeout
,
pf_data_btlbw
,
pf_data_btl_pace
,
pf_data_appSendTotal_pace
,
pf_red
,
pf_red_baseSeqN
,
pf_red_timestamp
,
pf_red_btl_pace
,
pf_red_appSendTotal_pace
,
pf_red_n
,
pf_red_k
,
pf_fb
,
pf_fb_groupRTT
,
pf_fb_ftt
,
pf_fb_btlPace
,
pf_fb_erasurecount
,
pf_fb_packetcount
,
pf_fb_gaplength
,
...
...
@@ -89,9 +101,16 @@ local ex_type = Field.new("prrt.type")
local
function
getType
()
return
ex_type
()()
end
local
function
getTypeName
()
return
prrtPacketTypeNames
[
getType
()]
end
local
ex_index
=
Field
.
new
(
"prrt.index"
)
local
function
getIndex
()
return
ex_index
()()
end
local
ex_data_length
=
Field
.
new
(
"prrt.data.length"
)
local
function
getDataLength
()
return
ex_data_length
()()
end
local
ex_red_baseseqno
=
Field
.
new
(
"prrt.redundancy.baseSequenceNumber"
)
local
function
getRedBaseSeqNo
()
return
ex_red_baseseqno
()()
end
local
ex_red_n
=
Field
.
new
(
"prrt.redundancy.n"
)
local
function
getRedN
()
return
ex_red_n
()()
end
...
...
@@ -108,8 +127,11 @@ local function dissect_data(buffer, pinfo, root)
tree
:
add
(
pf_data_timestamp
,
buffer
:
range
(
4
,
4
))
tree
:
add
(
pf_data_groupRTprop
,
buffer
:
range
(
8
,
4
))
tree
:
add
(
pf_data_packettimeout
,
buffer
:
range
(
12
,
4
))
tree
:
add
(
pf_data_btlbw
,
buffer
:
range
(
16
,
4
))
tree
:
add
(
pf_data_btl_pace
,
buffer
:
range
(
20
,
4
))
tree
:
add
(
pf_data_appSendTotal_pace
,
buffer
:
range
(
24
,
4
))
local
label
=
"
DATA
Len="
..
getDataLength
()
local
label
=
"
[D] Idx="
..
getIndex
()
..
"
Len="
..
getDataLength
()
tree
:
set_text
(
label
)
pinfo
.
cols
.
info
:
set
(
label
)
end
...
...
@@ -118,10 +140,12 @@ local function dissect_redundancy(buffer, pinfo, root)
local
tree
=
root
:
add
(
pf_red
,
buffer
:
range
(
0
))
tree
:
add
(
pf_red_baseSeqN
,
buffer
:
range
(
0
,
2
))
tree
:
add
(
pf_red_timestamp
,
buffer
:
range
(
2
,
4
))
tree
:
add
(
pf_red_n
,
buffer
:
range
(
6
,
1
))
tree
:
add
(
pf_red_k
,
buffer
:
range
(
7
,
1
))
tree
:
add
(
pf_red_btl_pace
,
buffer
:
range
(
6
,
4
))
tree
:
add
(
pf_red_appSendTotal_pace
,
buffer
:
range
(
10
,
4
))
tree
:
add
(
pf_red_n
,
buffer
:
range
(
14
,
1
))
tree
:
add
(
pf_red_k
,
buffer
:
range
(
15
,
1
))
local
label
=
"
REDUNDANCY
n="
..
getRedN
()
..
" k="
..
getRedK
()
local
label
=
"
[R] Idx="
..
getIndex
()
..
" b="
..
getRedBaseSeqNo
()
..
"
n="
..
getRedN
()
..
" k="
..
getRedK
()
tree
:
set_text
(
label
)
pinfo
.
cols
.
info
:
set
(
label
)
end
...
...
@@ -130,16 +154,17 @@ local function dissect_feedback(buffer, pinfo, root)
local
tree
=
root
:
add
(
pf_fb
,
buffer
:
range
(
0
))
tree
:
add
(
pf_fb_groupRTT
,
buffer
:
range
(
0
,
4
))
tree
:
add
(
pf_fb_ftt
,
buffer
:
range
(
4
,
4
))
tree
:
add
(
pf_fb_erasurecount
,
buffer
:
range
(
8
,
2
))
tree
:
add
(
pf_fb_packetcount
,
buffer
:
range
(
10
,
2
))
tree
:
add
(
pf_fb_gaplength
,
buffer
:
range
(
12
,
2
))
tree
:
add
(
pf_fb_gapcount
,
buffer
:
range
(
14
,
2
))
tree
:
add
(
pf_fb_burstlength
,
buffer
:
range
(
16
,
2
))
tree
:
add
(
pf_fb_burstcount
,
buffer
:
range
(
18
,
2
))
tree
:
add
(
pf_fb_acktype
,
buffer
:
range
(
20
,
1
))
tree
:
add
(
pf_fb_ackSeqN
,
buffer
:
range
(
21
,
2
))
local
label
=
"FEEDBACK"
tree
:
add
(
pf_fb_btlPace
,
buffer
:
range
(
8
,
4
))
tree
:
add
(
pf_fb_erasurecount
,
buffer
:
range
(
12
,
2
))
tree
:
add
(
pf_fb_packetcount
,
buffer
:
range
(
14
,
2
))
tree
:
add
(
pf_fb_gaplength
,
buffer
:
range
(
16
,
2
))
tree
:
add
(
pf_fb_gapcount
,
buffer
:
range
(
18
,
2
))
tree
:
add
(
pf_fb_burstlength
,
buffer
:
range
(
20
,
2
))
tree
:
add
(
pf_fb_burstcount
,
buffer
:
range
(
22
,
2
))
tree
:
add
(
pf_fb_acktype
,
buffer
:
range
(
24
,
1
))
tree
:
add
(
pf_fb_ackSeqN
,
buffer
:
range
(
25
,
2
))
local
label
=
"[F]"
tree
:
set_text
(
label
)
pinfo
.
cols
.
info
:
set
(
label
)
end
...
...
docker/Dockerfile
View file @
b51f0a89
FROM
gcc:
5
FROM
gcc:
8
MAINTAINER
Andreas Schmidt <schmidt@nt.uni-saarland.de>
RUN
apt-get update
\
&&
apt-get upgrade
-y
\
&&
apt-get
install
-y
cmake
ENV
DEBIAN_FRONTEND noninteractive
RUN
apt-get update
&&
apt-get
install
--yes
--force-yes
\
bc
\
cmake
\
iperf3
\
psmisc
\
traceroute
\
tshark
COPY
CMakeLists.txt /prrt/
COPY
prrt /prrt/prrt
COPY
tests /prrt/tests
COPY
docker/entrypoint.sh /
COPY
docker/sysctl.conf /etc/sysctl.d/01-disable-ipv6.conf
WORKDIR
/prrt
RUN
cmake
.
\
&&
make
ENV
PATH /prrt/bin:$PATH
WORKDIR
/prrt/bin
ENV
PATH /prrt:$PATH
VOLUME
/output
...
...
docker/Dockerfile_tcp
0 → 100644
View file @
b51f0a89
FROM gcc:8
MAINTAINER Andreas Schmidt <schmidt@nt.uni-saarland.de>
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update && apt-get install --yes --force-yes \
bc \
cmake \
iperf3 \
psmisc \
traceroute \
tshark
COPY CMakeLists.txt /prrt/
COPY prrt /prrt/prrt
COPY tests /prrt/tests
COPY docker/entrypoint.sh /
COPY docker/sysctl.conf /etc/sysctl.d/01-disable-ipv6.conf
WORKDIR /prrt
RUN cmake -DTCP=1 . \
&& make
ENV PATH /prrt:$PATH
VOLUME /output
ENTRYPOINT ["/entrypoint.sh"]
docker/entrypoint.sh
View file @
b51f0a89
...
...
@@ -5,13 +5,15 @@ dev=eth0
command
=
$1
shift
if
[[
"
$command
"
==
"sender"
||
"
$command
"
==
"receiver"
]]
;
then
if
[[
"
$command
"
==
"sender"
||
"
$command
"
==
"receiver"
||
"
$command
"
==
"time-sender"
||
"
$command
"
==
"time-receiver"
]]
;
then
:
else
echo
"Command should be either sender or receiver."
exit
0
;
fi
TARGET
=
"127.0.0.1"
OUTPUT
=
"/dev/null"
NETEM
=()
PRRT
=()
while
[[
$#
-gt
0
]]
...
...
@@ -19,7 +21,24 @@ do
key
=
"
$1
"
case
$key
in
-t
|
--target
|
-p
|
--port
|
-r
|
--rounds
)
-t
|
--target
)
if
[[
"
$command
"
==
"sender"
||
"
$command
"
==
"time-sender"
]]
;
then
PRRT+
=(
"
$1
$2
"
)
fi
TARGET
=(
"
$2
"
)
shift
shift
;;
-w
|
--wireshark
)
OUTPUT
=(
"
$2
"
)
shift
shift
;;
-T
|
--threadpinning
|
-U
)
PRRT+
=(
"
$1
"
)
shift
;;
-p
|
--port
|
-r
|
--rounds
|
-s
|
--size
|
-R
|
--rcvbuf
|
-S
|
--sndbuf
|
-o
|
--output
|
-a
|
--appdelay
|
-j
|
--appjitter
)
PRRT+
=(
"
$1
$2
"
)
shift
shift
...
...
@@ -34,8 +53,32 @@ done
PRRT_PARAMS
=
"
${
PRRT
[@]
}
"
NETEM_PARAMS
=
"
${
NETEM
[@]
}
"
echo
"Starting Wireshark."
tshark
-i
eth0
-w
$OUTPUT
.pcap &
TSHARK_PID
=
$!
sleep
2
start
=
$(
date
+%s.%N
)
;
echo
"Checking reachability of
$TARGET
."
until
ping
-c1
$TARGET
&>/dev/null
;
do
sleep
1
;
done
dur
=
$(
echo
"
$(
date
+%s.%N
)
-
$start
"
| bc
)
;
printf
"Reachable after %.6f seconds
\n
"
$dur
if
[[
"
$command
"
==
"sender"
||
"
$command
"
==
"time-sender"
]]
;
then
echo
"Delaying
$command
"
sleep
10
fi
start
=
$(
date
+%s.%N
)
;
echo
"Running PRRT with command:
\"
$command
$PRRT_PARAMS
\"
and link parameters:
\"
$NETEM_PARAMS
\"
"
tc qdisc add dev
$dev
root netem
$NETEM_PARAMS
/prrt/
$command
$PRRT_PARAMS
-o
/output/log.csv
tc qdisc del dev
$dev
root
#tc qdisc add dev $dev root netem $NETEM_PARAMS
trap
'echo "Caught SIGINT."; echo "$(ps -a)"; killall -SIGINT $command'
INT
LOG
=
$(
/prrt/
$command
$PRRT_PARAMS
2>&1
)
printf
"
$LOG
\n
"
echo
"Exit status:
$?
"
dur
=
$(
echo
"
$(
date
+%s.%N
)
-
$start
"
| bc
)
;
printf
"Done after %.6f seconds
\n
"
$dur
#tc qdisc del dev $dev root
kill
$TSHARK_PID
docker/sysctl.conf
0 → 100644
View file @
b51f0a89
net
.
ipv6
.
conf
.
all
.
disable_ipv6
=
1
net
.
ipv6
.
conf
.
default
.
disable_ipv6
=
1
net
.
ipv6
.
conf
.
lo
.
disable_ipv6
=
1
prrt/CMakeLists.txt
View file @
b51f0a89
...
...
@@ -10,14 +10,26 @@ if (XLAP)
add_definitions
(
-DXLAP
)
endif
()
option
(
TCP
"Set time protocol to TCP."
)
if
(
TCP
)
add_definitions
(
-DTCP
)
endif
()
add_subdirectory
(
proto
)
add_subdirectory
(
util
)
add_executable
(
sender sender.c
)
add_executable
(
receiver receiver.c
)
add_executable
(
refcount refcount.c
)
add_executable
(
time-sender time-sender.c
)
add_executable
(
time-receiver time-receiver.c
)
target_link_libraries
(
sender LINK_PUBLIC PRRT UTIL
${
CMAKE_THREAD_LIBS_INIT
}
)
target_link_libraries
(
receiver LINK_PUBLIC PRRT UTIL
${
CMAKE_THREAD_LIBS_INIT
}
)
target_link_libraries
(
time-sender LINK_PUBLIC PRRT UTIL
${
CMAKE_THREAD_LIBS_INIT
}
)
target_link_libraries
(
time-receiver LINK_PUBLIC PRRT UTIL
${
CMAKE_THREAD_LIBS_INIT
}
)
add_executable
(
refcount refcount.c
)
target_link_libraries
(
refcount LINK_PUBLIC PRRT UTIL
${
CMAKE_THREAD_LIBS_INIT
}
)
prrt/cprrt.pxd
View file @
b51f0a89
...
...
@@ -119,11 +119,12 @@ cdef extern from "proto/socket.h":
ctypedef
prrtSocket
PrrtSocket
cdef
PrrtSocket
*
PrrtSocket_create
(
const
uint32_t
m
tu
,
const
uint32_t
target_delay
)
cdef
PrrtSocket
*
PrrtSocket_create
(
const
uint32_t
m
aximum_payload_size
,
const
uint32_t
target_delay
)
bint
PrrtSocket_bind
(
PrrtSocket
*
sock_ptr
,
const_char
*
ipAddress
,
const
uint16_t
port
)
int
PrrtSocket_close
(
const
PrrtSocket
*
sock_ptr
)
int
PrrtSocket_connect
(
PrrtSocket
*
sock_ptr
,
const_char
*
host
,
const
uint16_t
port
)
int
PrrtSocket_send
(
PrrtSocket
*
sock_ptr
,
const
uint8_t
*
data
,
const
size_t
data_len
)
int
PrrtSocket_send_sync
(
PrrtSocket
*
sock_ptr
,
const
uint8_t
*
data
,
const
size_t
data_len
)
int
PrrtSocket_send_async
(
PrrtSocket
*
sock_ptr
,
const
uint8_t
*
data
,
const
size_t
data_len
)
int32_t
PrrtSocket_recv
(
PrrtSocket
*
sock_ptr
,
void
*
buf_ptr
,
sockaddr
*
addr
)
nogil
int32_t
PrrtSocket_receive_asap
(
PrrtSocket
*
s
,
void
*
buf_ptr
,
sockaddr
*
addr
)
nogil
...
...
@@ -144,11 +145,23 @@ cdef extern from "proto/socket.h":
uint32_t
PrrtSocket_get_rtprop_fwd
(
PrrtSocket
*
socket
)
float
PrrtSocket_get_plr_fwd
(
PrrtSocket
*
socket
)
uint32_t
PrrtSocket_get_delivery_rate_fwd
(
PrrtSocket
*
socket
)
uint32_t
PrrtSocket_get_btlbw_fwd
(
PrrtSocket
*
s
);
uint32_t
PrrtSocket_get_btlbw_back
(
PrrtSocket
*
s
);
bint
PrrtSocket_get_app_limited
(
PrrtSocket
*
socket
)
uint32_t
PrrtSocket_get_bbr_state
(
PrrtSocket
*
s
)
uint64_t
PrrtSocket_get_full_bw
(
PrrtSocket
*
s
)
bint
PrrtSocket_get_filled_pipe
(
PrrtSocket
*
s
)
uint32_t
PrrtSocket_get_cycle_index
(
PrrtSocket
*
s
)
float
PrrtSocket_get_pacing_gain
(
PrrtSocket
*
s
)
uint32_t
PrrtSocket_get_cwnd
(
PrrtSocket
*
s
)
uint32_t
PrrtSocket_get_inflight
(
PrrtSocket
*
s
)
uint32_t
PrrtSocket_get_pacing_rate
(
PrrtSocket
*
s
)
uint32_t
PrrtSocket_get_send_quantum
(
PrrtSocket
*
s
)
uint32_t
PrrtSocket_get_pipe
(
PrrtSocket
*
s
)
uint32_t
PrrtSocket_get_delivered
(
PrrtSocket
*
s
)
bint
PrrtSocket_get_bbr_round_start
(
PrrtSocket
*
s
)
uint32_t
PrrtSocket_get_bbr_app_limited
(
PrrtSocket
*
socket
)
bint
PrrtSocket_get_bbr_is_app_limited
(
PrrtSocket
*
socket
)
bint
PrrtSocket_enable_thread_pinning
(
PrrtSocket
*
socket
)
cdef
extern
from
"
proto/stores/packetDeliveryStore.h
"
:
...
...
prrt/defines.h
View file @
b51f0a89
...
...
@@ -30,8 +30,8 @@
#define GF_BITS 8
#define K_START
4
#define N_START
7
#define K_START
1
#define N_START
1
#define N_P_START 1
#define RRT_ALPHA 0.125
...
...
prrt/proto/CMakeLists.txt
View file @
b51f0a89
set
(
PRRT_SOURCES ../defines.h
types/block.c types/block.h
types/channelStateInformation.c types/channelStateInformation.h
bbr.c bbr.h
clock.c clock.h
types/codingParams.c types/codingParams.h
receiver.c receiver.h
socket.c socket.h
t
ypes/applicationConstraints.c types/applicationConstraints
.h
t
imer.c timer
.h
processes/dataReceiver.c processes/dataReceiver.h
processes/dataTransmitter.c processes/dataTransmitter.h
stores/dataPacketStore.c stores/dataPacketStore.h
stores/deliveredPacketTable.c stores/deliveredPacketTable.h
stores/inFlightPacketStore.c stores/inFlightPacketStore.h
stores/pace.c stores/pace.h
stores/paceFilter.c stores/paceFilter.h
stores/packetTimeoutTable.c stores/packetTimeoutTable.h
stores/packetDeliveryStore.c stores/packetDeliveryStore.h
stores/receptionTable.c stores/receptionTable.h
stores/repairBlockStore.c stores/repairBlockStore.h
types/packetTimeout.c types/packetTimeout.h
types/applicationConstraints.c types/applicationConstraints.h
types/block.c types/block.h
types/codingParams.c types/codingParams.h
types/channelStateInformation.c types/channelStateInformation.h
types/lossStatistics.c types/lossStatistics.h
types/packet.c types/packet.h
vdmcode/block_code.c vdmcode/block_code.h
)
types/packetTimeout.c types/packetTimeout.h
vdmcode/block_code.c vdmcode/block_code.h types/packetTracking.c types/packetTracking.h types/rateSample.c types/rateSample.h
)
if
(
XLAP
)
set
(
PRRT_SOURCES
${
PRRT_SOURCES
}
../xlap/xlap.c ../xlap/xlap.h
)
...
...
prrt/proto/bbr.c
0 → 100644
View file @
b51f0a89
#include
"bbr.h"
#include
"../util/dbg.h"
#include
"../util/common.h"
#include
"receiver.h"
#include
<math.h>
prrtByteCount_t
BBR_Inflight
(
BBR
*
bbr
,
double
gain
)
{
if
(
bbr
->
rtprop
==
RTprop_Inf
)
return
bbr
->
initial_cwnd
;
/* no valid RTT samples yet */
uint32_t
quanta
=
bbr
->
mps
;
uint32_t
estimated_bdp
=
(
uint32_t
)
round
((((
double
)
bbr
->
bw
)
*
bbr
->
rtprop
)
/
(
1000
*
1000
));
return
(
uint32_t
)(
gain
*
estimated_bdp
+
quanta
);
}
void
BBR_EnterStartup
(
BBR
*
bbr
)
{
bbr
->
state
=
STARTUP
;
bbr
->
pacing_gain
=
BBRHighGain
;
bbr
->
cwnd_gain
=
BBRHighGain
;
}
void
BBR_UpdateBtlBw
(
BBR
*
bbr
,
PrrtRateSample
*
rs
,
PrrtPacketTracking
*
tracking
)
{
if
(
tracking
->
delivered
>=
bbr
->
next_round_delivered
)
{
bbr
->
next_round_delivered
=
tracking
->
delivered
;
bbr
->
round_count
++
;
bbr
->
round_start
=
true
;
}
else
{
bbr
->
round_start
=
false
;
}
uint32_t
delivery_rate_Bps
=
(
uint32_t
)((
float
)
rs
->
delivery_rate
);
if
((
delivery_rate_Bps
>=
bbr
->
bw
||
!
rs
->
is_app_limited
)
&&
delivery_rate_Bps
!=
0
)
{
bbr
->
bw
=
(
uint32_t
)
WindowedFilter_push
(
bbr
->
btlBwFilter
,
delivery_rate_Bps
);
debug
(
DEBUG_BBR
,
"Current BtlBw: %u, RS delivery rate: %u"
,
bbr
->
bw
,
delivery_rate_Bps
);
}
}
void
BBR_CheckFullPipe
(
BBR
*
bbr
,
PrrtRateSample
*
rs
)
{
if
(
bbr
->
filled_pipe
||
!
bbr
->
round_start
||
rs
->
is_app_limited
)
return
;
// no need to check for a full pipe now
if
(
bbr
->
bw
>=
bbr
->
full_bw
*
PROBE_GAIN
)
{
// BBR.BtlBw still growing?
bbr
->
full_bw
=
bbr
->
bw
;
// record new baseline level
bbr
->
full_bw_count
=
0
;
return
;
}
bbr
->
full_bw_count
++
;
// another round w/o much growth
if
(
bbr
->
full_bw_count
>=
3
)
bbr
->
filled_pipe
=
true
;
}
bool
BBR_IsNextCyclePhase
(
BBR
*
bbr
,
prrtByteCount_t
bytes_lost
,
prrtByteCount_t
prior_inflight
)
{
bool
is_full_length
=
(
PrrtClock_get_current_time_us
()
-
bbr
->
cycle_stamp
)
>
bbr
->
rtprop
;
if
(
bbr
->
pacing_gain
==
1
)
return
is_full_length
;
if
(
bbr
->
pacing_gain
>
1
)
return
is_full_length
&&
(
bytes_lost
>
0
||
prior_inflight
>=
BBR_Inflight
(
bbr
,
bbr
->
pacing_gain
));
bool
is_max_length
=
(
PrrtClock_get_current_time_us
()
-
bbr
->
cycle_stamp
)
>
4
*
bbr
->
rtprop
;
return
is_max_length
||
(
prior_inflight
<=
BBR_Inflight
(
bbr
,
1
.
0
));
}
void
BBR_AdvanceCyclePhase
(
BBR
*
bbr
)
{
bbr
->
cycle_stamp
=
PrrtClock_get_current_time_us
();
bbr
->
cycle_index
=
(
uint8_t
)((
bbr
->
cycle_index
+
1
)
%
BBRGainCycleLen
);
float
pacing_gain_cycle
[
BBRGainCycleLen
]
=
{
PROBE_GAIN
,
DRAIN_GAIN
,
1
.
0
,
1
.
0
,
1
.
0
,
1
.
0
,
1
.
0
,
1
.
0
};
bbr
->
pacing_gain
=
pacing_gain_cycle
[
bbr
->
cycle_index
];
debug
(
DEBUG_BBR
,
"Advanced cycle with gain: %f"
,
bbr
->
pacing_gain
);
}
void
BBR_CheckCyclePhase
(
BBR
*
bbr
,
prrtByteCount_t
bytes_lost
,
prrtByteCount_t
prior_inflight
)
{
if
(
bbr
->
state
==
PROBE_BW
&&
BBR_IsNextCyclePhase
(
bbr
,
bytes_lost
,
prior_inflight
))
BBR_AdvanceCyclePhase
(
bbr
);
}
void
BBR_EnterProbeBW
(
BBR
*
bbr
)
{
bbr
->
state
=
PROBE_BW
;
bbr
->
pacing_gain
=
1
;
bbr
->
cwnd_gain
=
2
;
bbr
->
cycle_index
=
(
uint8_t
)(
BBRGainCycleLen
-
1
-
(
random
()
%
7
));
BBR_AdvanceCyclePhase
(
bbr
);
}
void
BBR_CheckDrain
(
BBR
*
bbr
,
prrtByteCount_t
bytes_inflight
)
{
if
(
bbr
->
state
==
STARTUP
&&
bbr
->
filled_pipe
)
{
//Drain
bbr
->
state
=
DRAIN
;
bbr
->
pacing_gain
=
1
/
BBRHighGain
;
// pace slowly
bbr
->
cwnd_gain
=
BBRHighGain
;
// maintain cwnd
}
if
(
bbr
->
state
==
DRAIN
&&
bytes_inflight
<=
BBR_Inflight
(
bbr
,
1
.
0
))
BBR_EnterProbeBW
(
bbr
);
// we estimate queue is drained
}
void
BBR_ExitProbeRTT
(
BBR
*
bbr
)
{
if
(
bbr
->
filled_pipe
)
BBR_EnterProbeBW
(
bbr
);
else
BBR_EnterStartup
(
bbr
);
}
uint32_t
BBR_SaveCwnd
(
BBR
*
bbr
)
{
if
(
!
bbr
->
is_loss_recovery
&&
bbr
->
state
!=
PROBE_RTT
)
return
bbr
->
cwnd
;
return
MAX
(
bbr
->
prior_cwnd
,
bbr
->
cwnd
);
}
void
BBR_RestoreCwnd
(
BBR
*
bbr
)
{
bbr
->
cwnd
=
MAX
(
bbr
->
cwnd
,
bbr
->
prior_cwnd
);
}
void
BBR_UpdateRTprop
(
BBR
*
bbr
,
prrtTimedelta_t
rtt
)
{
bbr
->
rtprop_expired
=
PrrtClock_get_current_time_us
()
>
(
bbr
->
rtprop_stamp
+
RTpropFilterLen
);
if
(
rtt
>=
0
&&
(
rtt
<=
bbr
->
rtprop
||
bbr
->
rtprop_expired
))
{
bbr
->
rtprop
=
rtt
;
bbr
->
rtprop_stamp
=
PrrtClock_get_current_time_us
();
}
}
void
BBR_EnterProbeRTT
(
BBR
*
bbr
)
{
bbr
->
state
=
PROBE_RTT
;
bbr
->
pacing_gain
=
1
;
bbr
->
cwnd_gain
=
1
;
}
void
BBR_HandleProbeRTT
(
BBR
*
bbr
,
PrrtPacketTracking
*
tracking
)
{
tracking
->
app_limited
=
(
tracking
->
delivered
+
tracking
->
pipe
)
?
:
1
;
/* Ignore low rate samples during ProbeRTT: */
prrtTimestamp_t
now
=
PrrtClock_get_current_time_us
();
if
(
bbr
->
probe_rtt_done_stamp
==
0
&&
tracking
->
pipe
<=
bbr
->
min_pipe_cwnd
)
{
bbr
->
probe_rtt_done_stamp
=
now
+
ProbeRTTDuration
;
bbr
->
probe_rtt_round_done
=
false
;
bbr
->
next_round_delivered
=
tracking
->
delivered
;
}
else
if
(
bbr
->
probe_rtt_done_stamp
!=
0
)
{
if
(
bbr
->
round_start
)
{
bbr
->
probe_rtt_round_done
=
true
;
}
if
(
bbr
->
probe_rtt_round_done
&&
(
now
>
bbr
->
probe_rtt_done_stamp
))
{
bbr
->
rtprop_stamp
=
now
;
BBR_RestoreCwnd
(
bbr
);
BBR_ExitProbeRTT
(
bbr
);
}
}
}
void
BBR_CheckProbeRTT
(
BBR
*
bbr
,
PrrtPacketTracking
*
tracking
)
{
if
(
bbr
->
state
!=
PROBE_RTT
&&
bbr
->
rtprop_expired
&&
!
bbr
->
idle_restart
)
{
BBR_EnterProbeRTT
(
bbr
);
BBR_SaveCwnd
(
bbr
);
bbr
->
probe_rtt_done_stamp
=
0
;
}
if
(
bbr
->
state
==
PROBE_RTT
)
{
BBR_HandleProbeRTT
(
bbr
,
tracking
);
}
bbr
->
idle_restart
=
false
;
}
void
BBR_UpdateModelAndState
(
BBR
*
bbr
,
PrrtRateSample
*
rs
,
PrrtPacketTracking
*
packetTracking
,
prrtTimedelta_t
rtt
)
{
BBR_UpdateBtlBw
(
bbr
,
rs
,
packetTracking
);
BBR_CheckCyclePhase
(
bbr
,
packetTracking
->
bytes_lost
,
packetTracking
->
prior_inflight
);
BBR_CheckFullPipe
(
bbr
,
rs
);
BBR_CheckDrain
(
bbr
,
packetTracking
->
pipe
);
BBR_UpdateRTprop
(
bbr
,
rtt
);
BBR_CheckProbeRTT
(
bbr
,
packetTracking
);
}
void
BBR_UpdateTargetCwnd
(
BBR
*
bbr
)
{
bbr
->
target_cwnd
=
BBR_Inflight
(
bbr
,
bbr
->
cwnd_gain
);
}
void
BBR_ModulateCwndForProbeRTT
(
BBR
*
bbr
)
{
if
(
bbr
->
state
==
PROBE_RTT
)
bbr
->
cwnd
=
MIN
(
bbr
->
cwnd
,
bbr
->
min_pipe_cwnd
);
}
void
BBR_ModulateCwndForRecovery
(
BBR
*
bbr
,
prrtByteCount_t
bytes_lost
,
prrtByteCount_t
pipe
,
prrtByteCount_t
delivered
)
{
if
(
bytes_lost
>
0
)
{
if
(
bbr
->
cwnd
>
bytes_lost
)
{
bbr
->
cwnd
=
MAX
(
bbr
->
cwnd
-
bytes_lost
,
bbr
->
mps
);
}
else
{
bbr
->
cwnd
=
bbr
->
mps
;
}
}
if
(
bbr
->
packet_conservation
)
bbr
->
cwnd
=
MAX
(
bbr
->
cwnd
,
pipe
+
delivered
);
}
void
BBR_SetCwnd
(
BBR
*
bbr
,
PrrtPacketTracking
*
packetTracking
)
{
BBR_UpdateTargetCwnd
(
bbr
);
BBR_ModulateCwndForRecovery
(
bbr
,
packetTracking
->
bytes_lost
,
packetTracking
->
pipe
,
packetTracking
->
delivered
);
if
(
!
bbr
->
packet_conservation
)
{
if
(
bbr
->
filled_pipe
)
bbr
->
cwnd
=
MIN
(
bbr
->
cwnd
+
packetTracking
->
delivered
,
bbr
->
target_cwnd
);
else
if
(
bbr
->
cwnd
<
bbr
->
target_cwnd
||
packetTracking
->
delivered
<
bbr
->
initial_cwnd
)
bbr
->
cwnd
=
bbr
->
cwnd
+
packetTracking
->
delivered
;
bbr
->
cwnd
=
MAX
(
bbr
->
cwnd
,
bbr
->
min_pipe_cwnd
);
}
BBR_ModulateCwndForProbeRTT
(
bbr
);
debug
(
DEBUG_BBR
,
"New cwnd: %u, State: %u"
,
bbr
->
cwnd
,
bbr
->
state
);
}
void
BBR_SetPacingRateWithGain
(
BBR
*
bbr
,
double
pacing_gain
)
{
double
rate
=
(
pacing_gain
*
((
double
)
bbr
->
bw
));
debug
(
DEBUG_BBR
,
"Current rate: %f, Pacing gain: %f, BtlBw: %u, Calc Rate: %f, Filled pipe: %u"
,
bbr
->
pacing_rate
,
pacing_gain
,
bbr
->
bw
,
rate
,
bbr
->
filled_pipe
);
if
(
rate
!=
0
&&
(
bbr
->
filled_pipe
||
rate
>
bbr
->
pacing_rate
))
bbr
->
pacing_rate
=
rate
;
}
void
BBR_SetPacingRate
(
BBR
*
bbr
)
{
BBR_SetPacingRateWithGain
(
bbr
,
bbr
->
pacing_gain
);
}
void
BBR_SetSendQuantum
(
BBR
*
bbr
)
{
if
(
bbr
->
pacing_rate
<
150000
)
{
// 1.2Mbps = 0.15 MBps = 150000 Bps
bbr
->
send_quantum
=
1
*
bbr
->
mps
;
}
else
if
(
bbr
->
pacing_rate
<
3000000
)
{
// 24 Mbps = 20 * 1.2Mbps = 3000000
bbr
->
send_quantum
=
2
*
bbr
->
mps
;
}
else
{
bbr
->
send_quantum
=
MIN
((
prrtByteCount_t
)
round
((
double
)
bbr
->
pacing_rate
/
1000
),
64000
);
}
}
void
BBR_OnACK
(
BBR
*
bbr
,
PrrtChannelStateInformation
*
csi
,
PrrtRateSample
*
rs
,
PrrtPacketTracking
*
packetTracking
,
prrtTimedelta_t
rtt
)
{
check
(
pthread_mutex_lock
(
&
bbr
->
lock
)
==
0
,
"Lock failed."
);
BBR_UpdateModelAndState
(
bbr
,
rs
,
packetTracking
,
rtt
);
BBR_SetPacingRate
(
bbr
);
BBR_SetSendQuantum
(
bbr
);
BBR_SetCwnd
(
bbr
,
packetTracking
);
check
(
pthread_mutex_unlock
(
&
bbr
->
lock
)
==
0
,
"Unlock failed."
);
return
;
error:
PERROR
(
"BBR_OnACK failed."
)
}
void
BBR_OnSpuriousLoss
(
BBR
*
bbr
,
PrrtPacketTracking
*
tracking
)
{
check
(
pthread_mutex_lock
(
&
bbr
->
lock
)
==
0
,
"Lock failed."
);
if
(
!
bbr
->
is_loss_recovery
)
{
bbr
->
is_loss_recovery
=
true
;
bbr
->
loss_recovery_stamp
=
PrrtClock_get_current_time_us
();
bbr
->
prior_cwnd
=
BBR_SaveCwnd
(
bbr
);
bbr
->
cwnd
=
tracking
->
pipe
+
MAX
(
tracking
->
delivered
,
1
);
bbr
->
packet_conservation
=
true
;
}
else
if
(
PrrtClock_get_current_time_us
()
>
bbr
->
loss_recovery_stamp
+
bbr
->
rtprop
){
bbr
->
packet_conservation
=
false
;
}
check
(
pthread_mutex_unlock
(
&
bbr
->
lock
)
==
0
,
"Unlock failed."
);
return
;
error:
PERROR
(
"BBR_OnACK failed."
)
}
void
BBR_OnRTOLoss
(
BBR
*
bbr
)
{
check
(
pthread_mutex_lock
(
&
bbr
->
lock
)
==
0
,
"Lock failed."
);
if
(
!
bbr
->
is_loss_recovery
)
{
bbr
->
is_loss_recovery
=
true
;
bbr
->
prior_cwnd
=
BBR_SaveCwnd
(
bbr
);
bbr
->
cwnd
=
bbr
->
mps
;
}
check
(
pthread_mutex_unlock
(
&
bbr
->
lock
)
==
0
,
"Unlock failed."
);
return
;
error:
PERROR
(
"BBR_OnACK failed."
)
}
void
BBR_OnLossExit
(
BBR
*
bbr
)
{
check
(
pthread_mutex_lock
(
&
bbr
->
lock
)
==
0
,
"Lock failed."
);
if
(
bbr
->
is_loss_recovery
)
{
bbr
->
is_loss_recovery
=
false
;
bbr
->
packet_conservation
=
false
;
BBR_RestoreCwnd
(
bbr
);
}
check
(
pthread_mutex_unlock
(
&
bbr
->
lock
)
==
0
,
"Unlock failed."
);
return
;
error:
PERROR
(
"BBR_OnACK failed."
)
}
BBR
*
BBR_Init
(
prrtByteCount_t
maximum_payload_size
)
{
BBR
*
bbr
=
calloc
(
1
,
sizeof
(
BBR
));
check_mem
(
bbr
);
pthread_mutexattr_t
attr
;
check
(
pthread_mutexattr_init
(
&
attr
)
==
EXIT_SUCCESS
,
"Mutex attr init failed."
);
check
(
pthread_mutexattr_settype
(
&
attr
,
PTHREAD_MUTEX_RECURSIVE
)
==
EXIT_SUCCESS
,
"Setting type failed."
);
check
(
pthread_mutex_init
(
&
bbr
->
lock
,
&
attr
)
==
0
,
"lock init failed."
);
bbr
->
mps
=
maximum_payload_size
;
bbr
->
min_pipe_cwnd
=
4
*
maximum_payload_size
;
bbr
->
initial_cwnd
=
4
*
maximum_payload_size
;
bbr
->
has_seen_rtt
=
false
;
bbr
->
btlBwFilter
=
WindowedFilter_create
(
true
,
10
);
bbr
->
rtprop
=
RTprop_Inf
;
bbr
->
rtprop_stamp
=
PrrtClock_get_current_time_us
();
bbr
->
probe_rtt_done_stamp
=
0
;
bbr
->
probe_rtt_round_done
=
false
;
bbr
->
packet_conservation
=
false
;
bbr
->
prior_cwnd
=
0
;
bbr
->
cwnd
=
bbr
->
initial_cwnd
;
bbr
->
idle_restart
=
false
;
bbr
->
is_loss_recovery
=
false
;
//Init round counting
bbr
->
next_round_delivered
=
0
;
bbr
->
round_start
=
false
;
bbr
->
round_count
=
0
;
//Init full pipe
bbr
->
filled_pipe
=
false
;
bbr
->
full_bw
=
0
;
bbr
->
full_bw_count
=
0
;
//Init pacing rate
double
nominal_bandwidth
=
bbr
->
initial_cwnd
/
(
bbr
->
has_seen_rtt
?
bbr
->
rtprop
:
1000
);
bbr
->
pacing_rate
=
bbr
->
pacing_gain
*
nominal_bandwidth
;
BBR_EnterStartup
(
bbr
);
return
bbr
;
error:
PERROR
(
"Failed to init BBR%s."
,
""
);
return
NULL
;
}
void
BBR_destroy
(
BBR
*
bbr
)
{
WindowedFilter_destroy
(
bbr
->
btlBwFilter
);
check
(
pthread_mutex_destroy
(
&
bbr
->
lock
)
==
0
,
"lock destroy failed."
);
free
(
bbr
);
return
;
error:
PERROR
(
"BBR_destroy failed%s."
,
""
);
}
double
BBR_getPacingRate
(
BBR
*
bbr
)
{
check
(
pthread_mutex_lock
(
&
bbr
->
lock
)
==
0
,
"Lock failed."
);
double
res
=
bbr
->
pacing_rate
;
check
(
pthread_mutex_unlock
(
&
bbr
->
lock
)
==
0
,
"Unlock failed."
);
return
res
;
error:
PERROR
(
"BBR_getPacingRate failed."
)
return
0
;
}
prrtByteCount_t
BBR_getCwnd
(
BBR
*
bbr
)
{
check
(
pthread_mutex_lock
(
&
bbr
->
lock
)
==
0
,
"Lock failed."
);
prrtByteCount_t
res
=
bbr
->
cwnd
;
check
(
pthread_mutex_unlock
(
&
bbr
->
lock
)
==
0
,
"Unlock failed."
);
return
res
;
error:
PERROR
(
"BBR_getCwnd failed."
)
return
0
;
}
prrtDeliveryRate_t
BBR_getBtlBw
(
BBR
*
bbr
)
{
check
(
pthread_mutex_lock
(
&
bbr
->
lock
)
==
0
,
"Lock failed."
);
prrtDeliveryRate_t
res
=
bbr
->
bw
;
check
(
pthread_mutex_unlock
(
&
bbr
->
lock
)
==
0
,
"Unlock failed."
);
return
res
;
error:
PERROR
(
"BBR_getBtlBw failed."
)
return
0
;
}
uint32_t
BBR_getRTProp
(
BBR
*
bbr
)
{
check
(
pthread_mutex_lock
(
&
bbr
->
lock
)
==
0
,
"Lock failed."
);
uint32_t
res
=
bbr
->
rtprop
;
check
(
pthread_mutex_unlock
(
&
bbr
->
lock
)
==
0
,
"Unlock failed."
);
return
res
;
error:
PERROR
(
"BBR_getRTProp failed."
)
return
0
;
}
uint32_t
BBR_getState
(
BBR
*
bbr
)
{
check
(
pthread_mutex_lock
(
&
bbr
->
lock
)
==
0
,
"Lock failed."
);
uint32_t
res
=
bbr
->
state
;
check
(
pthread_mutex_unlock
(
&
bbr
->
lock
)
==
0
,
"Unlock failed."
);
return
res
;
error:
PERROR
(
"BBR_getState failed."
)
return
0
;
}
uint32_t
BBR_getCycleIndex
(
BBR
*
bbr
)
{
check
(
pthread_mutex_lock
(
&
bbr
->
lock
)
==
0
,
"Lock failed."
);
uint32_t
res
=
bbr
->
cycle_index
;
check
(
pthread_mutex_unlock
(
&
bbr
->
lock
)
==
0
,
"Unlock failed."
);
return
res
;
error:
PERROR
(
"BBR_getCycleIndex failed."
)
return
0
;
}
double
BBR_getPacingGain
(
BBR
*
bbr
)
{
check
(
pthread_mutex_lock
(
&
bbr
->
lock
)
==
0
,
"Lock failed."
);
double
res
=
bbr
->
pacing_gain
;
check
(
pthread_mutex_unlock
(
&
bbr
->
lock
)
==
0
,
"Unlock failed."
);
return
res
;
error:
PERROR
(
"BBR_getPacingGain failed."
)
return
0
;
}
bool
BBR_getFilledPipe
(
BBR
*
bbr
)
{
check
(
pthread_mutex_lock
(
&
bbr
->
lock
)
==
0
,
"Lock failed."
);
bool
res
=
bbr
->
filled_pipe
;
check
(
pthread_mutex_unlock
(
&
bbr
->
lock
)
==
0
,
"Unlock failed."
);
return
res
;
error:
PERROR
(
"BBR_getFilledPipe failed."
)
return
0
;
}
bool
BBR_getRoundStart
(
BBR
*
bbr
)
{
check
(
pthread_mutex_lock
(
&
bbr
->
lock
)
==
0
,
"Lock failed."
);
bool
res
=
bbr
->
round_start
;
check
(
pthread_mutex_unlock
(
&
bbr
->
lock
)
==
0
,
"Unlock failed."
);
return
res
;
error:
PERROR
(
"BBR_getRoundStart failed."
)
return
0
;
}
prrtByteCount_t
BBR_getFullBw
(
BBR
*
bbr
)
{
check
(
pthread_mutex_lock
(
&
bbr
->
lock
)
==
0
,
"Lock failed."
);
prrtByteCount_t
res
=
bbr
->
full_bw
;
check
(
pthread_mutex_unlock
(
&
bbr
->
lock
)
==
0
,
"Unlock failed."
);
return
res
;
error:
PERROR
(
"BBR_getInflight failed."
)
return
0
;
}
prrtByteCount_t
BBR_getInflight
(
BBR
*
bbr
)
{
check
(
pthread_mutex_lock
(
&
bbr
->
lock
)
==
0
,
"Lock failed."
);
prrtByteCount_t
res
=
BBR_Inflight
(
bbr
,
bbr
->
pacing_gain
);
check
(
pthread_mutex_unlock
(
&
bbr
->
lock
)
==
0
,
"Unlock failed."
);
return
res
;
error:
PERROR
(
"BBR_getInflight failed."
)
return
0
;
}
prrtByteCount_t
BBR_getSendQuantum
(
BBR
*
bbr
)
{
check
(
pthread_mutex_lock
(
&
bbr
->
lock
)
==
0
,
"Lock failed."
);
prrtByteCount_t
res
=
bbr
->
send_quantum
;
check
(
pthread_mutex_unlock
(
&
bbr
->
lock
)
==
0
,
"Unlock failed."
);
return
res
;
error:
PERROR
(
"BBR_getSendQuantum failed."
)
return
0
;
}
\ No newline at end of file
prrt/proto/bbr.h
0 → 100644
View file @
b51f0a89
#ifndef PRRT_BBR_H
#define PRRT_BBR_H
#include
<stdint.h>
#include
"stdbool.h"
#include
"types/packet.h"
#include
"clock.h"
#include
"types/channelStateInformation.h"
#include
"types/packetTracking.h"
#include
"types/rateSample.h"
#include
"../util/windowedFilter.h"
#define PROBE_GAIN 1.1
#define DRAIN_GAIN 0.9
#define RTpropFilterLen 10000000 //10s
#define BBRHighGain ((((float)2885) / 1000) + 1)
#define BBRGainCycleLen 8
#define ProbeRTTDuration 200000 //200ms
#define RTprop_Inf UINT32_MAX
enum
bbr_state
{
STARTUP
,
DRAIN
,
PROBE_BW
,
PROBE_RTT
};
typedef
struct
bbr
{
pthread_mutex_t
lock
;
prrtByteCount_t
mps
;
prrtByteCount_t
min_pipe_cwnd
;
prrtByteCount_t
initial_cwnd
;
prrtTimedelta_t
rtprop
;
prrtTimestamp_t
rtprop_stamp
;
prrtTimestamp_t
probe_rtt_done_stamp
;
bool
probe_rtt_round_done
;
bool
packet_conservation
;
prrtByteCount_t
prior_cwnd
;
prrtByteCount_t
cwnd
;
prrtByteCount_t
target_cwnd
;
bool
idle_restart
;
enum
bbr_state
state
;
double
pacing_gain
;
float
cwnd_gain
;
bool
filled_pipe
;
prrtByteCount_t
full_bw
;
uint32_t
full_bw_count
;
double
pacing_rate
;
bool
has_seen_rtt
;
uint32_t
next_round_delivered
;
bool
round_start
;
uint32_t
round_count
;
uint32_t
next_rtt_delivered
;
uint32_t
rtt_count
;
bool
rtprop_expired
;
bool
is_loss_recovery
;
prrtTimestamp_t
loss_recovery_stamp
;
prrtTimestamp_t
cycle_stamp
;
uint8_t
cycle_index
;
float
*
pacing_gain_cycle
;
prrtByteCount_t
send_quantum
;
prrtDeliveryRate_t
bw
;
WindowedFilter
*
btlBwFilter
;
}
BBR
;
BBR
*
BBR_Init
(
prrtByteCount_t
maximum_payload_size
);
void
BBR_OnACK
(
BBR
*
bbr
,
PrrtChannelStateInformation
*
csi
,
PrrtRateSample
*
rs
,
PrrtPacketTracking
*
packetTracking
,
prrtTimedelta_t
rtt
);
void
BBR_OnSpuriousLoss
(
BBR
*
bbr
,
PrrtPacketTracking
*
tracking
);
void
BBR_OnRTOLoss
(
BBR
*
bbr
);
void
BBR_OnLossExit
(
BBR
*
bbr
);
void
BBR_destroy
(
BBR
*
bbr
);
double
BBR_getPacingRate
(
BBR
*
bbr
);
prrtByteCount_t
BBR_getCwnd
(
BBR
*
bbr
);
prrtDeliveryRate_t
BBR_getBtlBw
(
BBR
*
bbr
);
uint32_t
BBR_getState
(
BBR
*
bbr
);
prrtByteCount_t
BBR_getFullBw
(
BBR
*
bbr
);
double
BBR_getPacingGain
(
BBR
*
bbr
);
uint32_t
BBR_getCycleIndex
(
BBR
*
bbr
);
bool
BBR_getFilledPipe
(
BBR
*
bbr
);
uint32_t
BBR_getRTProp
(
BBR
*
bbr
);
prrtByteCount_t
BBR_getInflight
(
BBR
*
bbr
);
prrtByteCount_t
BBR_getSendQuantum
(
BBR
*
bbr
);
bool
BBR_getRoundStart
(
BBR
*
bbr
);
#endif //PRRT_BBR_H
prrt/proto/processes/dataReceiver.c
View file @
b51f0a89
...
...
@@ -4,24 +4,27 @@
#include
"../../defines.h"
#include
"../../util/dbg.h"
#include
"../../util/common.h"
#include
"../../util/time.h"
#include
"../types/lossStatistics.h"
#include
"../types/block.h"
#include
"../clock.h"
#include
"../socket.h"
#include
"dataReceiver.h"
static
void
retrieve_data_block
s
(
PrrtSocket
*
sock_ptr
,
static
void
retrieve_data_
packets_for_
block
(
PrrtSocket
*
sock_ptr
,
prrtSequenceNumber_t
base_seqno
,
uint8_t
k
,
const
PrrtBlock
*
block
)
{
List
*
res
=
List_create
();
prrtSequenceNumber_t
last_seqno
=
(
prrtSequenceNumber_t
)
(
base_seqno
+
k
-
1
);
debug
(
DEBUG_BLOCK
,
"Size: %d"
,
PrrtDataPacketStore_size
(
sock_ptr
->
dataPacketStore
));
PrrtDataPacketStore_remove_range
(
sock_ptr
->
dataPacketStore
,
res
,
base_seqno
,
(
prrtSequenceNumber_t
)
(
base_seqno
+
k
-
1
));
last_seqno
);
debug
(
DEBUG_BLOCK
,
"Retrieve %d packets in range: %u-%u."
,
List_count
(
res
),
base_seqno
,
last_seqno
);
LIST_FOREACH
(
res
,
first
,
next
,
cur
)
{
PrrtPacket
*
packet
Ptr
=
cur
->
value
;
check
(
PrrtBlock_insert_data_packet
((
PrrtBlock
*
)
block
,
packet
Ptr
),
"Insert failed!"
)
PrrtPacket
*
packet
=
cur
->
value
;
check
(
PrrtBlock_insert_data_packet
((
PrrtBlock
*
)
block
,
packet
),
"Insert failed!"
)
}
List_destroy
(
res
);
return
;
...
...
@@ -32,6 +35,10 @@ static void retrieve_data_blocks(PrrtSocket *sock_ptr,
static
void
decode_block
(
PrrtSocket
*
sock_ptr
,
PrrtBlock
*
block
)
{
if
(
block
!=
NULL
&&
PrrtBlock_decode_ready
(
block
))
{
bool
data_relevant
=
PrrtDeliveredPacketTable_test_is_block_relevant
(
sock_ptr
->
deliveredPacketTable
,
block
->
baseSequenceNumber
,
block
->
codingParams
->
n
);
if
(
data_relevant
)
{
check
(
PrrtBlock_decode
(
block
),
"Decoding failed"
);
while
(
List_count
(
block
->
dataPackets
)
>
0
)
{
...
...
@@ -43,7 +50,7 @@ static void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block) {
PrrtPacket_destroy
(
pkt
);
}
}
}
PrrtRepairBlockStore_delete
(
sock_ptr
->
repairBlockStore
,
block
->
baseSequenceNumber
);
PrrtBlock_destroy
(
block
);
}
...
...
@@ -67,7 +74,6 @@ static bool send_feedback(PrrtSocket *sock_ptr,
kind
=
ts_redundancy_packet
;
}
debug
(
DEBUG_FEEDBACK
,
"Send feedback %d %d"
,
type
,
seqno
);
XlapTimeStampCycle
(
sock_ptr
,
kind
,
seqno
,
SendFeedbackStart
);
prrtFeedback_t
feedback
=
{
...
...
@@ -95,20 +101,23 @@ static bool send_feedback(PrrtSocket *sock_ptr,
PrrtLossStatistics
stats
=
sock_ptr
->
lossStatistics
;
int
group_RTT
=
0
;
// TODO: To be determined.
uint32_t
local_bottleneck_pace
=
MAX
(
PrrtPace_get_effective
(
sock_ptr
->
appDeliverPace
),
PrrtPace_get_effective
(
sock_ptr
->
prrtReceivePace
));
PrrtPacket
*
feedback_pkt_ptr
=
PrrtPacket_create_feedback_packet
(
0
,
sock_ptr
->
sequenceNumberFeedback
++
,
group_RTT
,
stats
.
gapLength
,
stats
.
gapCount
,
stats
.
burstLength
,
stats
.
burstCount
,
forwardTripTime
,
stats
.
erasureCount
,
stats
.
packetCount
,
feedback
.
seqNo
,
feedback
.
typ
e
);
feedback
.
seqNo
,
feedback
.
type
,
local_bottleneck_pac
e
);
prrtPacketLength_t
length
=
PrrtPacket_size
(
feedback_pkt_ptr
);
void
*
buf
=
calloc
(
1
,
length
);
check_mem
(
buf
);
check
(
PrrtPacket_encode
(
buf
,
MAX_PAYLOAD_LENGTH
,
feedback_pkt_ptr
),
"Buffer for encoding feedback is too small"
);
pthread_setcancelstate
(
PTHREAD_CANCEL_DISABLE
,
NULL
);
check
(
sendto
(
sock_ptr
->
socketFd
,
buf
,
length
,
0
,
(
struct
sockaddr
*
)
&
targetaddr
,
sizeof
(
targetaddr
))
==
length
,
"Sending feedback failed."
);
pthread_setcancelstate
(
PTHREAD_CANCEL_ENABLE
,
NULL
);
free
(
buf
);
PrrtPacket_destroy
(
feedback_pkt_ptr
);
...
...
@@ -122,35 +131,40 @@ static bool send_feedback(PrrtSocket *sock_ptr,
return
false
;
}
static
bool
is_timeout
(
prrtTimestamp_t
now
,
prrtTimestamp_t
to
)
{
/* TODO: implement */
return
false
;
}
static
void
handle_data_packet
(
PrrtSocket
*
sock_ptr
,
PrrtPacket
*
packet
)
{
PrrtPacketDataPayload
*
payload
=
packet
->
payload
;
prrtTimestamp_t
sentTimestamp
=
payload
->
timestamp
;
sock_ptr
->
lastSentTimestamp
=
sentTimestamp
;
PrrtClock_update
(
&
sock_ptr
->
clock
,
sentTimestamp
,
payload
->
groupRTprop_us
);
debug
(
DEBUG_DATARECEIVER
,
"Timeout: %lu"
,
payload
->
packetTimeout_us
);
PrrtPacketTimeout
*
packetTimeout
=
PrrtPacketTimeout_create
(
packet
);
check
(
PrrtPacketTimeoutTable_insert
(
sock_ptr
->
packetTimeoutTable
,
packetTimeout
),
"Could not insert data packet."
);
prrtSequenceNumber_t
seqno
=
packet
->
sequenceNumber
;
PrrtReceptionTable_mark_received
(
sock_ptr
->
dataReceptionTable
,
seqno
);
PrrtReceptionTable_mark_received
(
sock_ptr
->
dataReceptionTable
,
seqno
,
sentTimestamp
);
prrtTimestamp_t
now
=
PrrtClock_get_prrt_time_us
(
&
sock_ptr
->
clock
);
if
(
is_timeout
(
now
,
payload
->
packetTimeout_us
))
{
prrtTimestamp_t
now
=
PrrtClock_get_current_time_us
();
if
(
PrrtTimestamp_cmp
(
now
,
payload
->
packetTimeout_us
)
>
0
)
{
debug
(
DEBUG_DATARECEIVER
,
"Timeout data packet %u (%lu > %lu)"
,
seqno
,
(
unsigned
long
)
now
,
(
unsigned
long
)
payload
->
packetTimeout_us
);
PrrtDeliveredPacketTable_test_set_is_number_relevant
(
sock_ptr
->
deliveredPacketTable
,
packet
->
sequenceNumber
);
PrrtPacket_destroy
(
packet
);
debug
(
DEBUG_RECEIVER
,
"timeout data packet %u (%lu > %lu)"
,
seqno
,
(
unsigned
long
)
now
,
(
unsigned
long
)
payload
->
packetTimeout_us
);
}
else
if
(
!
PrrtDeliveredPacketTable_test_set_is_number_relevant
(
sock_ptr
->
deliveredPacketTable
,
packet
->
sequenceNumber
))
{
debug
(
DEBUG_DATARECEIVER
,
"Not relevant: %u"
,
seqno
);
PrrtPacket_destroy
(
packet
);
}
else
{
PrrtChannelStateInformation_update_delivery_rate
(
sock_ptr
->
senderChannelStateInformation
,
payload
->
btlbw
);
sock_ptr
->
send_peer_btl_pace
=
payload
->
btl_pace
;
sock_ptr
->
send_peer_app_total_pace
=
payload
->
appSendTotal_pace
;
sock_ptr
->
rtt
=
payload
->
groupRTprop_us
;
prrtSequenceNumber_t
baseSequenceNumber
=
packet
->
sequenceNumber
-
packet
->
index
;
// forward to application layer
debug
(
DEBUG_DATARECEIVER
,
"Forward: %u"
,
seqno
);
PrrtPacketDeliveryStore_insert
(
sock_ptr
->
packetDeliveryStore
,
packet
);
PrrtPacket
*
reference
=
PrrtPacket_copy
(
packet
);
// forward to application layer
...
...
@@ -159,18 +173,21 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
PrrtBlock
*
block
=
PrrtRepairBlockStore_get_block
(
sock_ptr
->
repairBlockStore
,
baseSequenceNumber
);
if
(
block
!=
NULL
)
{
check
(
PrrtBlock_insert_data_packet
(
block
,
reference
),
"Inserting failed: %d, %d"
,
baseSequenceNumber
,
seqno
);
if
(
PrrtBlock_insert_data_packet
(
block
,
reference
))
{
decode_block
(
sock_ptr
,
block
);
}
else
{
PrrtPacket_destroy
(
reference
);
}
}
else
{
debug
(
DEBUG_DATARECEIVER
,
"Inserting data packet %d for later."
,
reference
->
sequenceNumber
);
if
(
PrrtDataStore_insert
(
sock_ptr
->
dataPacketStore
,
reference
)
==
false
)
{
debug
(
DEBUG_DATARECEIVER
,
"Failed to insert %d."
,
reference
->
sequenceNumber
);
PrrtPacket_destroy
(
reference
);
}
else
{
debug
(
DEBUG_DATARECEIVER
,
"Inserted %d."
,
reference
->
sequenceNumber
);
}
}
// forward to application layer
debug
(
DEBUG_DATARECEIVER
,
"forward %u"
,
seqno
);
XlapTimeStampClock
(
sock_ptr
,
ts_data_packet
,
seqno
,
PrrtReturnPackage
);
XlapTimeStampCycle
(
sock_ptr
,
ts_data_packet
,
seqno
,
PrrtReturnPackage
);
}
...
...
@@ -181,9 +198,13 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
}
static
void
handle_redundancy_packet
(
PrrtSocket
*
socket
,
PrrtPacket
*
packet
)
{
PrrtPacketRedundancyPayload
*
payload
=
packet
->
payload
;
prrtTimestamp_t
sentTimestamp
=
payload
->
timestamp
;
socket
->
lastSentTimestamp
=
sentTimestamp
;
PrrtPacketRedundancyPayload
*
redundancyPayload
=
packet
->
payload
;
PrrtReceptionTable_mark_received
(
socket
->
redundancyReceptionTable
,
packet
->
sequenceNumber
);
PrrtReceptionTable_mark_received
(
socket
->
redundancyReceptionTable
,
packet
->
sequenceNumber
,
redundancyPayload
->
timestamp
);
if
(
!
PrrtDeliveredPacketTable_test_is_block_relevant
(
socket
->
deliveredPacketTable
,
redundancyPayload
->
baseSequenceNumber
,
...
...
@@ -192,6 +213,7 @@ static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) {
}
else
{
PrrtBlock
*
block
=
PrrtRepairBlockStore_get_block
(
socket
->
repairBlockStore
,
redundancyPayload
->
baseSequenceNumber
);
socket
->
send_peer_btl_pace
=
payload
->
btl_pace
;
if
(
block
==
NULL
)
{
uint8_t
n_cycle
[
1
]
=
{
redundancyPayload
->
n
-
redundancyPayload
->
k
};
PrrtCodingConfiguration
*
codingParams
=
PrrtCodingConfiguration_create
(
redundancyPayload
->
k
,
...
...
@@ -201,9 +223,9 @@ static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) {
redundancyPayload
->
baseSequenceNumber
);
PrrtRepairBlockStore_insert
(
socket
->
repairBlockStore
,
block
);
}
retrieve_data_blocks
(
socket
,
redundancyPayload
->
baseSequenceNumber
,
block
->
codingParams
->
k
,
block
);
retrieve_data_packets_for_block
(
socket
,
redundancyPayload
->
baseSequenceNumber
,
block
->
codingParams
->
k
,
block
);
}
if
(
PrrtBlock_insert_redundancy_packet
(
block
,
packet
))
{
decode_block
(
socket
,
block
);
...
...
@@ -213,31 +235,16 @@ static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) {
}
}
void
handle_feedback_packet
(
PrrtSocket
*
prrtS
ocket
,
PrrtPacket
*
p
rrtP
acket
,
prrtTimestamp_t
receiveTime
)
{
check
(
p
rrtP
acket
!=
NULL
,
"Cannot be null"
);
void
handle_feedback_packet
(
PrrtSocket
*
s
ocket
,
PrrtPacket
*
packet
,
prrtTimestamp_t
receiveTime
)
{
check
(
packet
!=
NULL
,
"Cannot be null"
);
debug
(
DEBUG_DATARECEIVER
,
"handle_feedback_packet"
);
PrrtPacketFeedbackPayload
*
feedbackPayload
=
(
PrrtPacketFeedbackPayload
*
)
p
rrtP
acket
->
payload
;
PrrtPacketFeedbackPayload
*
feedbackPayload
=
(
PrrtPacketFeedbackPayload
*
)
packet
->
payload
;
prrtTimestamp_t
forwardTripTimestamp
=
feedbackPayload
->
forwardTripTimestamp_us
;
bool
valid_sample
=
PrrtReceiver_updateAndGenerateRateSample
(
prrtSocket
->
receiver
,
feedbackPayload
->
ackSequenceNumber
,
feedbackPayload
->
ackPacketType
,
receiveTime
);
debug
(
DEBUG_DATARECEIVER
,
"PrrtReceiver_updateAndGenerateRateSample "
);
prrtTimedelta_t
rtt
=
(
prrtTimedelta_t
)
(
receiveTime
-
forwardTripTimestamp
);
socket
->
recv_peer_btl_pace
=
feedbackPayload
->
btl_pace
;
if
(
valid_sample
)
{
PrrtChannelStateInformation_update_delivery_rate
(
prrtSocket
->
receiver
->
csi
,
prrtSocket
->
receiver
->
rateSample
->
delivery_rate
);
}
PrrtChannelStateInformation_update_app_limited
(
prrtSocket
->
receiver
->
csi
,
prrtSocket
->
receiver
->
rateSample
->
is_app_limited
);
debug
(
DEBUG_DATARECEIVER
,
"PrrtChannelStateInformation_update_app_limited "
);
PrrtChannelStateInformation_update_rtprop
(
prrtSocket
->
receiver
->
csi
,
(
prrtTimedelta_t
)
(
receiveTime
-
forwardTripTimestamp
));
debug
(
DEBUG_DATARECEIVER
,
"PrrtChannelStateInformation_update_rtprop "
);
PrrtChannelStateInformation_update_plr
(
prrtSocket
->
receiver
->
csi
,
feedbackPayload
->
erasureCount
,
feedbackPayload
->
packetCount
);
debug
(
DEBUG_DATARECEIVER
,
"PrrtChannelStateInformation_update_plr "
);
PrrtReceiver_on_ack
(
socket
->
receiver
,
feedbackPayload
,
receiveTime
,
rtt
,
socket
->
applicationConstraints
);
return
;
error:
...
...
@@ -297,6 +304,8 @@ void *receive_data_loop(void *ptr) {
PrrtSocket
*
s
=
ptr
;
while
(
1
)
{
PrrtPace_track_start
(
s
->
prrtReceivePace
);
debug
(
DEBUG_DATARECEIVER
,
"About to receive."
);
XlapTimestampPlaceholder
tsph1
;
XlapTimestampPlaceholder
tsph2
;
XlapTimestampPlaceholder
tsph3
;
...
...
@@ -306,8 +315,11 @@ void *receive_data_loop(void *ptr) {
struct
timespec
packet_recv_timestamp
;
uint64_t
packet_recv_cyclestamp
=
0
;
PrrtPace_track_pause
(
s
->
prrtReceivePace
);
receive_from_socket
(
s
,
buffer
,
&
n
,
&
remote
,
&
addrlen
,
&
packet_recv_timestamp
,
&
packet_recv_cyclestamp
);
if
(
PrrtSocket_closing
(
s
))
{
pthread_setcancelstate
(
PTHREAD_CANCEL_DISABLE
,
NULL
);
PrrtPace_track_resume
(
s
->
prrtReceivePace
);
if
(
atomic_load_explicit
(
&
s
->
closing
,
memory_order_acquire
))
{
break
;
}
...
...
@@ -327,6 +339,7 @@ void *receive_data_loop(void *ptr) {
prrtPacketType_t
packetType
=
PrrtPacket_type
(
packet
);
debug
(
DEBUG_DATARECEIVER
,
"received packet %d:%u"
,
(
int
)
packetType
,
seqno
);
packet
->
channelReceive
=
packet_recv_timestamp
;
enum
XlapTimestampPacketKind
kind
=
ts_any_packet
;
prrtTimestamp_t
sentTimestamp
;
...
...
@@ -369,9 +382,9 @@ void *receive_data_loop(void *ptr) {
debug
(
DEBUG_DATARECEIVER
,
"Cleanup"
);
PrrtSocket_cleanup
(
s
);
debug
(
DEBUG_DATARECEIVER
,
"Cleaned"
);
PrrtPace_track_end
(
s
->
prrtReceivePace
);
pthread_setcancelstate
(
PTHREAD_CANCEL_ENABLE
,
NULL
);
}
PrrtSocket_cleanup
(
s
);
return
NULL
;
error:
PERROR
(
"receive_data_loop() ended unexpectedly."
);
...
...
prrt/proto/processes/dataTransmitter.c
View file @
b51f0a89
...
...
@@ -2,12 +2,15 @@
#include
<netdb.h>
#include
<string.h>
#include
"../../defines.h"
#include
"../timer.h"
#include
"../receiver.h"
#include
"../socket.h"
#include
"../types/block.h"
#include
"../../util/dbg.h"
#include
"../../util/common.h"
#include
"../../util/time.h"
#include
"dataTransmitter.h"
#include
<math.h>
bool
send_to_socket
(
PrrtSocket
*
sock_ptr
,
uint8_t
*
buf
,
prrtPacketLength_t
length
,
struct
timespec
*
packet_timestamp
,
uint64_t
*
packet_clockstamp
)
{
PrrtReceiver
*
recv
=
sock_ptr
->
receiver
;
...
...
@@ -76,7 +79,65 @@ bool send_to_socket(PrrtSocket* sock_ptr, uint8_t* buf, prrtPacketLength_t lengt
static
bool
send_packet
(
PrrtSocket
*
sock_ptr
,
PrrtPacket
*
packet
)
{
uint8_t
buf
[
MAX_PAYLOAD_LENGTH
];
memset
(
buf
,
0
,
sizeof
(
buf
));
prrtPacketLength_t
length
=
PrrtPacket_size
(
packet
);
PrrtPace_track_pause
(
sock_ptr
->
prrtTransmitPace
);
bool
paceSuccessful
=
PrrtSocket_pace
(
sock_ptr
,
true
);
PrrtPace_track_resume
(
sock_ptr
->
prrtTransmitPace
);
if
(
!
paceSuccessful
)
{
debug
(
DEBUG_DATATRANSMITTER
,
"Pacing interrupted."
);
PrrtPacket_destroy
(
packet
);
return
false
;
}
debug
(
DEBUG_DATATRANSMITTER
,
"Pacing interval passed."
);
PrrtPace_track_pause
(
sock_ptr
->
prrtTransmitPace
);
bool
waitSuccessful
=
PrrtReceiver_wait_for_space
(
sock_ptr
->
receiver
,
sock_ptr
->
maximum_payload_size
,
sock_ptr
->
applicationConstraints
);
PrrtPace_track_resume
(
sock_ptr
->
prrtTransmitPace
);
if
(
!
waitSuccessful
)
{
debug
(
DEBUG_DATATRANSMITTER
,
"Wait for space interrupted."
);
PrrtPacket_destroy
(
packet
);
return
false
;
}
debug
(
DEBUG_DATATRANSMITTER
,
"Space available."
);
prrtTimestamp_t
now
=
PrrtClock_get_current_time_us
();
if
(
sock_ptr
->
pacingEnabled
)
{
prrtTimedelta_t
peerPacingTime
=
0
;
prrtTimedelta_t
channelPacingTime
=
0
;
double
pacing_rate
=
PrrtReceiver_get_BBR_pacingRate
(
sock_ptr
->
receiver
);
double
pacing_gain
=
PrrtReceiver_get_BBR_pacingGain
(
sock_ptr
->
receiver
)
*
0
.
9
;
uint32_t
state
=
PrrtReceiver_get_BBR_state
(
sock_ptr
->
receiver
);
if
(
pacing_rate
!=
0
)
{
channelPacingTime
=
(
prrtTimedelta_t
)
round
(((
1000
*
1000
*
((
double
)
packet
->
payloadLength
))
/
pacing_rate
));
}
// Cross-Pace iff PROBE_BW and unity gain
if
(
sock_ptr
->
recv_peer_btl_pace
!=
0
&&
state
==
PROBE_BW
)
{
double
pt
=
round
(((
double
)
sock_ptr
->
recv_peer_btl_pace
)
/
pacing_gain
);
if
(
pt
>
(
TIMESTAMP_SPACE
-
1
))
{
peerPacingTime
=
TIMESTAMP_SPACE
-
1
;
}
else
{
peerPacingTime
=
(
prrtTimedelta_t
)
pt
;
}
}
prrtTimedelta_t
pacingTime
=
MAX
(
channelPacingTime
,
peerPacingTime
);
debug
(
DEBUG_DATATRANSMITTER
,
"Payload: %u, PacingRate: %f, Pacing Time: %u"
,
packet
->
payloadLength
,
pacing_rate
,
pacingTime
);
sock_ptr
->
nextSendTime
=
now
+
pacingTime
;
}
// Update timestamp
prrtTimedelta_t
btl_pace
=
MAX
(
MAX
(
PrrtPace_get_effective
(
sock_ptr
->
prrtTransmitPace
),
PrrtPace_get_effective
(
sock_ptr
->
appSendPace
)),
PrrtSocket_get_sock_opt
(
sock_ptr
,
"nw_pace"
));
prrtTimedelta_t
appSendTotal_pace
=
PrrtPace_get_total
(
sock_ptr
->
appSendPace
);
if
(
PrrtPacket_type
(
packet
)
==
PACKET_TYPE_DATA
)
{
((
PrrtPacketDataPayload
*
)
(
packet
->
payload
))
->
timestamp
=
PrrtClock_get_current_time_us
();
((
PrrtPacketDataPayload
*
)
(
packet
->
payload
))
->
btlbw
=
PrrtReceiver_get_BBR_btlDr
(
sock_ptr
->
receiver
);
((
PrrtPacketDataPayload
*
)
(
packet
->
payload
))
->
btl_pace
=
btl_pace
;
((
PrrtPacketDataPayload
*
)
(
packet
->
payload
))
->
appSendTotal_pace
=
appSendTotal_pace
;
}
else
if
(
PrrtPacket_type
(
packet
)
==
PACKET_TYPE_REDUNDANCY
)
{
((
PrrtPacketRedundancyPayload
*
)
(
packet
->
payload
))
->
timestamp
=
PrrtClock_get_current_time_us
();
((
PrrtPacketRedundancyPayload
*
)
(
packet
->
payload
))
->
btl_pace
=
btl_pace
;
((
PrrtPacketRedundancyPayload
*
)
(
packet
->
payload
))
->
appSendTotal_pace
=
appSendTotal_pace
;
}
check
(
PrrtPacket_encode
(
buf
,
MAX_PAYLOAD_LENGTH
,
packet
),
"Buffer too small."
);
...
...
@@ -96,7 +157,7 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
struct
timespec
timestamp
;
uint64_t
cyclestamp
;
send_to_socket
(
sock_ptr
,
buf
,
length
,
&
timestamp
,
&
cyclestamp
);
send_to_socket
(
sock_ptr
,
buf
,
PrrtPacket_size
(
packet
)
,
&
timestamp
,
&
cyclestamp
);
XlapTimeStampValue
(
sock_ptr
,
ts_data_packet
,
packet
->
sequenceNumber
,
ChannelTransmit
,
timestamp
);
XlapCycleStampValue
(
sock_ptr
,
ts_data_packet
,
packet
->
sequenceNumber
,
ChannelTransmit
,
cyclestamp
);
...
...
@@ -117,7 +178,6 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
case
PACKET_TYPE_CHANNEL_FEEDBACK
:
default:
;
}
return
true
;
error:
...
...
@@ -125,57 +185,112 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
return
false
;
}
void
*
send_data_loop
(
void
*
ptr
)
{
PrrtSocket
*
sock_ptr
=
ptr
;
PrrtBlock
*
block
=
NULL
;
typedef
struct
timer_arg
{
PrrtSocket
*
socket
;
PrrtBlock
*
block
;
}
RetransmissionTimerArgs
;
while
(
1
)
{
ListNode
*
job
;
do
{
job
=
Pipe_pull
(
sock_ptr
->
sendDataQueue
);
if
(
PrrtSocket_closing
(
sock_ptr
))
{
if
(
block
!=
NULL
)
{
void
retransmission_round_handler
(
void
*
arg
)
{
uint8_t
j
;
RetransmissionTimerArgs
*
args
=
(
RetransmissionTimerArgs
*
)
arg
;
PrrtBlock
*
block
=
args
->
block
;
PrrtSocket
*
socket
=
args
->
socket
;
if
(
block
->
inRound
>
0
)
{
PrrtReceiver_rto_check
(
socket
->
receiver
,
socket
->
applicationConstraints
);
}
if
(
PrrtSocket_closing
(
socket
)
||
block
->
inRound
>=
block
->
codingParams
->
c
)
{
PrrtBlock_destroy
(
block
);
free
(
arg
);
return
;
}
return
NULL
;
uint32_t
redundancyPackets
=
block
->
codingParams
->
n_cycle
[
block
->
inRound
];
for
(
j
=
0
;
j
<
redundancyPackets
;
j
++
)
{
PrrtPacket
*
red_pkt
=
PrrtBlock_get_first_red_data
(
block
);
bool
sendResult
=
send_packet
(
socket
,
red_pkt
);
if
(
!
sendResult
)
{
debug
(
DEBUG_DATATRANSMITTER
,
"Sending redundancy data failed."
);
PrrtBlock_destroy
(
block
);
free
(
arg
);
return
;
}
}
while
(
!
job
);
PrrtPacket
*
packet
=
PrrtPacket_byListNode
(
job
);
}
block
->
inRound
++
;
PrrtTimerTask
task
=
{
.
arg
=
arg
,
.
fun
=
retransmission_round_handler
};
uint32_t
waittime_us
=
PrrtReceiver_get_retransmission_delay
(
socket
->
receiver
,
socket
->
applicationConstraints
);
prrtTimerDate
deadline
=
abstime_from_now
(
waittime_us
);
debug
(
DEBUG_DATATRANSMITTER
,
"Set timer to expire in: %dus"
,
waittime_us
);
PrrtTimer_submit
(
socket
->
retransmissionTimer
,
&
deadline
,
&
task
);
}
void
PrrtDataTransmitter_transmit
(
PrrtSocket
*
sock_ptr
,
PrrtPacket
*
packet
)
{
PrrtPace_track_start
(
sock_ptr
->
prrtTransmitPace
);
XlapTimeStampClock
(
sock_ptr
,
ts_data_packet
,
packet
->
sequenceNumber
,
PrrtTransmitStart
);
XlapTimeStampCycle
(
sock_ptr
,
ts_data_packet
,
packet
->
sequenceNumber
,
PrrtTransmitStart
);
if
(
block
==
NULL
)
{
block
=
PrrtBlock_create
(
PrrtCodingConfiguration_copy
(
sock_ptr
->
codingParameters
),
PrrtCoder_copy
(
sock_ptr
->
coder
),
packet
->
sequenceNumber
);
if
(
sock_ptr
->
receiveBlock
==
NULL
)
{
sock_ptr
->
receiveBlock
=
PrrtBlock_create
(
PrrtCodingConfiguration_copy
(
sock_ptr
->
codingParameters
),
PrrtCoder_copy
(
sock_ptr
->
coder
),
packet
->
sequenceNumber
);
sock_ptr
->
receiveBlock
->
senderBlock
=
true
;
}
packet
->
index
=
(
uint8_t
)
(
packet
->
sequenceNumber
-
b
lock
->
baseSequenceNumber
);
packet
->
index
=
(
uint8_t
)
(
packet
->
sequenceNumber
-
sock_ptr
->
receiveB
lock
->
baseSequenceNumber
);
PrrtPacketDataPayload
*
payload
=
packet
->
payload
;
payload
->
groupRTprop_us
=
PrrtSocket_get_rtprop_fwd
(
sock_ptr
);
PrrtPacket
*
packetToSend
=
PrrtPacket_copy
(
packet
);
debug
(
DEBUG_DATATRANSMITTER
,
"Send: %i"
,
packet
->
sequenceNumber
);
send_packet
(
sock_ptr
,
packetToSend
);
int
sendResult
=
send_packet
(
sock_ptr
,
packetToSend
);
if
(
sendResult
)
{
XlapTimeStampClock
(
sock_ptr
,
ts_data_packet
,
packet
->
sequenceNumber
,
PrrtTransmitEnd
);
XlapTimeStampCycle
(
sock_ptr
,
ts_data_packet
,
packet
->
sequenceNumber
,
PrrtTransmitEnd
);
PrrtBlock_insert_data_packet
(
block
,
packet
);
if
(
PrrtBlock_insert_data_packet
(
sock_ptr
->
receiveBlock
,
packet
)
==
false
)
{
PERROR
(
"Failed to insert packet: %d"
,
packet
->
sequenceNumber
);
}
// TODO: redundancy should only be sent when necessary
if
(
PrrtBlock_encode_ready
(
block
))
{
uint32_t
j
=
0
;
if
(
PrrtBlock_encode_ready
(
sock_ptr
->
receiveBlock
))
{
unsigned
int
redundancy_seqno
=
sock_ptr
->
sequenceNumberRedundancy
;
XlapTimeStampCycle
(
sock_ptr
,
ts_redundancy_packet
,
redundancy_seqno
,
PrrtEncodeStart
);
PrrtBlock_encode
(
b
lock
,
&
sock_ptr
->
sequenceNumberRedundancy
);
PrrtBlock_encode
(
sock_ptr
->
receiveB
lock
,
&
sock_ptr
->
sequenceNumberRedundancy
);
XlapTimeStampCycle
(
sock_ptr
,
ts_redundancy_packet
,
redundancy_seqno
,
PrrtEncodeEnd
);
uint32_t
redundancyPackets
=
List_count
(
block
->
redundancyPackets
);
for
(
j
=
0
;
j
<
redundancyPackets
;
j
++
)
{
PrrtPacket
*
red_pkt
=
PrrtBlock_get_first_red_data
(
block
);
send_packet
(
sock_ptr
,
red_pkt
);
RetransmissionTimerArgs
*
args
=
(
RetransmissionTimerArgs
*
)
calloc
(
1
,
sizeof
(
RetransmissionTimerArgs
));
args
->
block
=
sock_ptr
->
receiveBlock
;
sock_ptr
->
receiveBlock
=
NULL
;
args
->
socket
=
sock_ptr
;
retransmission_round_handler
(
args
);
}
}
else
{
PrrtPacket_destroy
(
packet
);
}
PrrtPace_track_end
(
sock_ptr
->
prrtTransmitPace
);
}
PrrtBlock_destroy
(
block
);
block
=
NULL
;
void
*
PrrtDataTransmitter_send_data_loop
(
void
*
ptr
)
{
PrrtSocket
*
s
=
ptr
;
while
(
1
)
{
ListNode
*
job
;
do
{
job
=
Pipe_pull
(
s
->
sendDataQueue
);
if
(
PrrtSocket_closing
(
s
))
{
if
(
s
->
receiveBlock
!=
NULL
)
{
PrrtBlock_destroy
(
s
->
receiveBlock
);
s
->
receiveBlock
=
NULL
;
}
return
NULL
;
}
}
while
(
!
job
);
PrrtPacket
*
packet
=
PrrtPacket_byListNode
(
job
);
PrrtDataTransmitter_transmit
(
s
,
packet
);
}
}
prrt/proto/processes/dataTransmitter.h
View file @
b51f0a89
#ifndef PRRT_DATA_TRANSMITTER_H
#define PRRT_DATA_TRANSMITTER_H
void
*
send_data_loop
(
void
*
ptr
);
#include
"../socket.h"
void
*
PrrtDataTransmitter_send_data_loop
(
void
*
ptr
);
void
PrrtDataTransmitter_transmit
(
PrrtSocket
*
sock_ptr
,
PrrtPacket
*
packet
);
#endif //PRRT_DATA_TRANSMITTER_H
prrt/proto/receiver.c
View file @
b51f0a89
...
...
@@ -3,10 +3,110 @@
#include
"xlap.h"
#include
"../util/common.h"
#include
"../util/dbg.h"
#include
"../util/time.h"
#include
"stores/inFlightPacketStore.h"
#include
"receiver.h"
PrrtReceiver
*
PrrtReceiver_create
(
const
char
*
host
,
uint16_t
port
)
{
prrtByteCount_t
clean_rto
(
PrrtReceiver
*
recv
,
PrrtApplicationConstraints
*
constraints
)
{
prrtTimestamp_t
now
=
PrrtClock_get_current_time_us
();
uint32_t
retransmission_delay
=
PrrtReceiver_get_retransmission_delay
(
recv
,
constraints
);
prrtTimestamp_t
deadline
=
now
-
retransmission_delay
;
prrtByteCount_t
lostBytes
=
0
;
lostBytes
+=
PrrtInFlightPacketStore_clear_before
(
recv
->
dataInflightPacketStore
,
deadline
);
lostBytes
+=
PrrtInFlightPacketStore_clear_before
(
recv
->
redundancyInflightPacketStore
,
deadline
);
recv
->
packetTracking
->
pipe
-=
lostBytes
;
recv
->
packetTracking
->
bytes_lost
=
lostBytes
;
return
lostBytes
;
}
void
update_rate_sample
(
PrrtRateSample
*
rateSample
,
PrrtPacket
*
packet
,
prrtTimestamp_t
receiveTime
,
PrrtPacketTracking
*
packetTracking
)
{
if
(
packet
->
delivered_time
==
0
)
return
;
packetTracking
->
delivered
+=
packet
->
payloadLength
;
packetTracking
->
delivered_time
=
receiveTime
;
if
(
packet
->
delivered
>
rateSample
->
prior_delivered
)
{
rateSample
->
prior_delivered
=
packet
->
delivered
;
rateSample
->
prior_time
=
packet
->
delivered_time
;
rateSample
->
is_app_limited
=
packet
->
is_app_limited
;
rateSample
->
send_elapsed
=
packet
->
sent_time
-
packet
->
first_sent_time
;
rateSample
->
ack_elapsed
=
packetTracking
->
delivered_time
-
packet
->
delivered_time
;
packetTracking
->
first_sent_time
=
packet
->
sent_time
;
}
packet
->
delivered_time
=
0
;
}
bool
gnerate_rate_sample
(
PrrtRateSample
*
rateSample
,
PrrtPacketTracking
*
packetTracking
)
{
/* Clear app-limited field */
if
(
packetTracking
->
app_limited
>
0
&&
packetTracking
->
delivered
>
packetTracking
->
app_limited
)
packetTracking
->
app_limited
=
0
;
if
(
rateSample
->
prior_time
==
0
)
{
//printf("Prior Time is 0, Cancelling Rate sample calculation\n");
return
false
;
}
prrtTimedelta_t
interval
=
MAX
(
rateSample
->
send_elapsed
,
rateSample
->
ack_elapsed
);
if
(
interval
<
MIN_RTT
)
{
return
false
;
}
rateSample
->
interval
=
interval
;
rateSample
->
delivered
=
packetTracking
->
delivered
-
rateSample
->
prior_delivered
;
if
(
rateSample
->
interval
!=
0
)
{
// delivered: bytes; interval: us; convert to bps
rateSample
->
delivery_rate
=
(
uint32_t
)
round
(
(((
double
)
rateSample
->
delivered
)
*
1000
.
0
*
1000
.
0
)
/
((
double
)
rateSample
->
interval
));
}
debug
(
DEBUG_FEEDBACK
,
"RS interval: %u, RS delivered: %u, RS delivery_rate: %u, App Limited: %u"
,
rateSample
->
interval
,
rateSample
->
delivered
,
rateSample
->
delivery_rate
,
rateSample
->
is_app_limited
);
return
true
;
}
bool
update_and_generate_rate_sample
(
PrrtReceiver
*
recv
,
prrtSequenceNumber_t
seqnum
,
uint8_t
packetType
,
prrtTimestamp_t
receiveTime
,
prrtTimedelta_t
rtt
,
PrrtApplicationConstraints
*
constraints
)
{
PrrtInFlightPacketStore
*
inflightPacketStore
=
NULL
;
if
(
packetType
==
PACKET_TYPE_DATA
)
{
inflightPacketStore
=
recv
->
dataInflightPacketStore
;
}
else
if
(
packetType
==
PACKET_TYPE_REDUNDANCY
)
{
inflightPacketStore
=
recv
->
redundancyInflightPacketStore
;
}
else
return
false
;
bool
result
=
false
;
PrrtPacket
*
packet
=
PrrtInFlightPacketStore_get_packet_by_seqno
(
inflightPacketStore
,
seqnum
);
if
(
packet
!=
NULL
)
{
update_rate_sample
(
recv
->
rateSample
,
packet
,
receiveTime
,
recv
->
packetTracking
);
recv
->
packetTracking
->
pipe
-=
packet
->
payloadLength
;
PrrtInFlightPacketStore_remove_outstanding_packet_by_seqno
(
inflightPacketStore
,
seqnum
);
prrtByteCount_t
lostBytes
=
clean_rto
(
recv
,
constraints
);
if
(
lostBytes
>
0
)
{
BBR_OnSpuriousLoss
(
recv
->
bbr
,
recv
->
packetTracking
);
}
if
(
lostBytes
==
0
)
{
BBR_OnLossExit
(
recv
->
bbr
);
}
result
=
gnerate_rate_sample
(
recv
->
rateSample
,
recv
->
packetTracking
);
recv
->
packetTracking
->
prior_inflight
=
recv
->
packetTracking
->
pipe
;
if
(
recv
->
rateSample
!=
NULL
)
{
BBR_OnACK
(
recv
->
bbr
,
recv
->
csi
,
recv
->
rateSample
,
recv
->
packetTracking
,
rtt
);
}
}
return
result
;
}
PrrtReceiver
*
PrrtReceiver_create
(
const
char
*
host
,
uint16_t
port
,
prrtByteCount_t
maximum_payload_size
)
{
PrrtReceiver
*
recv
=
calloc
(
1
,
sizeof
(
PrrtReceiver
));
check_mem
(
recv
);
recv
->
host_name
=
strdup
(
host
);
...
...
@@ -24,8 +124,8 @@ PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port) {
recv
->
csi
=
PrrtChannelStateInformation_create
();
recv
->
dataPacketSt
ates
=
PrrtInFlightPacketStore_create
();
recv
->
redundancyPacketSt
ates
=
PrrtInFlightPacketStore_create
();
recv
->
data
Inflight
PacketSt
ore
=
PrrtInFlightPacketStore_create
();
recv
->
redundancy
Inflight
PacketSt
ore
=
PrrtInFlightPacketStore_create
();
struct
addrinfo
*
info
;
...
...
@@ -38,12 +138,16 @@ PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port) {
check
(
0
==
getaddrinfo
(
host
,
portstr
,
&
hints
,
&
info
),
"getaddrinfo"
);
recv
->
ai
=
info
;
recv
->
bbr
=
BBR_Init
(
maximum_payload_size
);
recv
->
closing
=
false
;
pthread_mutexattr_t
attr
;
check
(
pthread_mutexattr_init
(
&
attr
)
==
EXIT_SUCCESS
,
"Mutex attr init failed."
);
check
(
pthread_mutexattr_settype
(
&
attr
,
PTHREAD_MUTEX_RECURSIVE
)
==
EXIT_SUCCESS
,
"Setting type failed."
);
check
(
pthread_mutex_init
(
&
recv
->
lock
,
&
attr
)
==
0
,
"lock init failed."
);
check
(
pthread_cond_init
(
&
recv
->
wait_for_space
,
NULL
)
==
EXIT_SUCCESS
,
"Condition init failed."
);
return
recv
;
...
...
@@ -57,16 +161,20 @@ PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port) {
}
bool
PrrtReceiver_destroy
(
PrrtReceiver
*
receiver
)
{
if
(
receiver
->
bbr
)
{
BBR_destroy
(
receiver
->
bbr
);
}
if
(
receiver
->
csi
!=
NULL
)
{
PrrtChannelStateInformation_destroy
(
receiver
->
csi
);
}
if
(
receiver
->
dataPacketSt
ates
!=
NULL
)
{
PrrtInFlightPacketStore_destroy
(
receiver
->
dataPacketSt
ates
);
if
(
receiver
->
data
Inflight
PacketSt
ore
!=
NULL
)
{
PrrtInFlightPacketStore_destroy
(
receiver
->
data
Inflight
PacketSt
ore
);
}
if
(
receiver
->
redundancyPacketSt
ates
!=
NULL
)
{
PrrtInFlightPacketStore_destroy
(
receiver
->
redundancyPacketSt
ates
);
if
(
receiver
->
redundancy
Inflight
PacketSt
ore
!=
NULL
)
{
PrrtInFlightPacketStore_destroy
(
receiver
->
redundancy
Inflight
PacketSt
ore
);
}
if
(
receiver
->
packetTracking
!=
NULL
)
{
...
...
@@ -81,6 +189,7 @@ bool PrrtReceiver_destroy(PrrtReceiver *receiver) {
freeaddrinfo
(
receiver
->
ai
);
free
((
void
*
)
receiver
->
host_name
);
check
(
pthread_mutex_destroy
(
&
receiver
->
lock
)
==
0
,
"lock destroy failed."
);
check
(
pthread_cond_destroy
(
&
receiver
->
wait_for_space
)
==
0
,
"cond destroy failed."
);
free
(
receiver
);
return
true
;
...
...
@@ -89,105 +198,177 @@ bool PrrtReceiver_destroy(PrrtReceiver *receiver) {
return
false
;
}
void
PrrtReceiver_updateRateSample
(
PrrtRateSample
*
rateSample
,
PrrtPacket
*
packet
,
prrtTimestamp_t
receiveTime
,
PrrtPacketTracking
*
packetTracking
)
{
if
(
packet
->
delivered_time
==
0
)
return
;
void
PrrtReceiver_rto_check
(
PrrtReceiver
*
recv
,
PrrtApplicationConstraints
*
constraints
)
{
check
(
pthread_mutex_lock
(
&
recv
->
lock
)
==
0
,
"Lock failed."
);
packetTracking
->
delivered
+=
packet
->
payloadLength
;
packetTracking
->
delivered_time
=
receiveTime
;
prrtByteCount_t
lostBytes
=
clean_rto
(
recv
,
constraints
);
if
(
packet
->
delivered
>
rateSample
->
prior_delivered
)
{
rateSample
->
prior_delivered
=
packet
->
delivered
;
rateSample
->
prior_time
=
packet
->
delivered_time
;
rateSample
->
is_app_limited
=
packet
->
is_app_limited
;
rateSample
->
send_elapsed
=
packet
->
sent_time
-
packet
->
first_sent_time
;
rateSample
->
ack_elapsed
=
packetTracking
->
delivered_time
-
packet
->
delivered_time
;
packetTracking
->
first_sent_time
=
packet
->
sent_time
;
if
(
lostBytes
>
0
)
{
BBR_OnRTOLoss
(
recv
->
bbr
);
}
packet
->
delivered_time
=
0
;
if
(
lostBytes
==
0
)
{
BBR_OnLossExit
(
recv
->
bbr
);
}
pthread_cond_broadcast
(
&
recv
->
wait_for_space
);
check
(
pthread_mutex_unlock
(
&
recv
->
lock
)
==
0
,
"Unlock failed."
);
return
;
error:
PERROR
(
"PrrtReceiver_rto_check() failed."
)
}
void
PrrtReceiver_on_application_write
(
PrrtReceiver
*
receiver
)
{
prrtByteCount_t
PrrtReceiver_get_pipe
(
PrrtReceiver
*
recv
)
{
prrtByteCount_t
res
=
0
;
check
(
pthread_mutex_lock
(
&
recv
->
lock
)
==
0
,
"Lock failed."
);
res
=
recv
->
packetTracking
->
pipe
;
check
(
pthread_mutex_unlock
(
&
recv
->
lock
)
==
0
,
"Unlock failed."
);
return
res
;
error:
PERROR
(
"PrrtReceiver_get_pipe() failed."
)
return
0
;
}
void
PrrtReceiver_on_application_write
(
PrrtReceiver
*
receiver
,
uint32_t
send_queue_length
,
prrtSequenceNumber_t
sequenceNumber
)
{
check
(
pthread_mutex_lock
(
&
receiver
->
lock
)
==
0
,
"Lock failed."
);
debug
(
DEBUG_RECEIVER
,
"OnApplicationWrite: %u, %d"
,
sequenceNumber
,
send_queue_length
);
PrrtPacketTracking
*
tracking
=
receiver
->
packetTracking
;
if
(
PrrtInFlightPacketStore_get_queue_size
(
receiver
->
dataPacketStates
)
+
PrrtInFlightPacketStore_get_queue_size
(
receiver
->
redundancyPacketStates
)
==
0
)
{
if
(
send_queue_length
==
0
&&
tracking
->
pipe
<
BBR_getCwnd
(
receiver
->
bbr
))
{
tracking
->
app_limited
=
(
tracking
->
delivered
+
tracking
->
pipe
)
?
:
1
;
}
check
(
pthread_mutex_unlock
(
&
receiver
->
lock
)
==
0
,
"Unlock failed."
);
debug
(
DEBUG_RECEIVER
,
"OnApplicationWrite done: %d."
,
send_queue_length
);
return
;
error:
PERROR
(
"Mutex error.%s"
,
""
);
}
void
PrrtReceiver_on_ack
(
PrrtReceiver
*
receiver
,
PrrtPacketFeedbackPayload
*
feedbackPayload
,
prrtTimestamp_t
receiveTime
,
prrtTimedelta_t
rtt
,
PrrtApplicationConstraints
*
constraints
)
{
check
(
pthread_mutex_lock
(
&
receiver
->
lock
)
==
0
,
"Lock failed."
);
//debug(DEBUG_RECEIVER, "PrrtReceiver_on_ack");
PrrtChannelStateInformation_update_plr
(
receiver
->
csi
,
feedbackPayload
->
erasureCount
,
feedbackPayload
->
packetCount
);
bool
valid_sample
=
update_and_generate_rate_sample
(
receiver
,
feedbackPayload
->
ackSequenceNumber
,
feedbackPayload
->
ackPacketType
,
receiveTime
,
rtt
,
constraints
);
if
(
valid_sample
)
{
PrrtChannelStateInformation_update_delivery_rate
(
receiver
->
csi
,
receiver
->
rateSample
->
delivery_rate
);
}
bool
PrrtReceiver_generateRateSample
(
PrrtRateSample
*
rateSample
,
PrrtPacketTracking
*
packetTracking
)
{
/* Clear app-limited field */
if
(
packetTracking
->
app_limited
&&
packetTracking
->
delivered
>
packetTracking
->
app_limited
)
packetTracking
->
app_limited
=
0
;
pthread_cond_broadcast
(
&
receiver
->
wait_for_space
);
check
(
pthread_mutex_unlock
(
&
receiver
->
lock
)
==
0
,
"Unlock failed."
);
return
;
if
(
rateSample
->
prior_time
==
0
)
{
//printf("Prior Time is 0, Cancelling Rate sample calculation\n");
return
false
;
error:
PERROR
(
"Mutex error.%s"
,
""
);
}
prrtTimedelta_t
interval
=
MAX
(
rateSample
->
send_elapsed
,
rateSample
->
ack_elapsed
);
if
(
interval
<
MIN_RTT
)
{
return
false
;
double
PrrtReceiver_get_BBR_pacingRate
(
PrrtReceiver
*
receiver
)
{
check
(
pthread_mutex_lock
(
&
receiver
->
lock
)
==
0
,
"Lock failed."
);
double
res
=
BBR_getPacingRate
(
receiver
->
bbr
);
check
(
pthread_mutex_unlock
(
&
receiver
->
lock
)
==
0
,
"Unlock failed."
);
return
res
;
error:
PERROR
(
"Mutex error.%s"
,
""
);
return
0
.
0
;
}
rateSample
->
interval
=
interval
;
uint32_t
PrrtReceiver_get_BBR_state
(
PrrtReceiver
*
receiver
)
{
check
(
pthread_mutex_lock
(
&
receiver
->
lock
)
==
0
,
"Lock failed."
);
uint32_t
res
=
BBR_getState
(
receiver
->
bbr
);
check
(
pthread_mutex_unlock
(
&
receiver
->
lock
)
==
0
,
"Unlock failed."
);
return
res
;
rateSample
->
delivered
=
packetTracking
->
delivered
-
rateSample
->
prior_delivered
;
error:
PERROR
(
"Mutex error.%s"
,
""
);
return
0
;
}
if
(
rateSample
->
interval
!=
0
)
{
// delivered: bytes; interval: us; convert to bps
rateSample
->
delivery_rate
=
(
uint32_t
)
round
(
(((
double
)
rateSample
->
delivered
)
*
1000
.
0
*
1000
.
0
*
8
.
0
)
/
((
double
)
rateSample
->
interval
));
double
PrrtReceiver_get_BBR_pacingGain
(
PrrtReceiver
*
receiver
)
{
check
(
pthread_mutex_lock
(
&
receiver
->
lock
)
==
0
,
"Lock failed."
);
double
res
=
BBR_getPacingGain
(
receiver
->
bbr
);
check
(
pthread_mutex_unlock
(
&
receiver
->
lock
)
==
0
,
"Unlock failed."
);
return
res
;
error:
PERROR
(
"Mutex error.%s"
,
""
);
return
0
.
0
;
}
debug
(
DEBUG_FEEDBACK
,
"RS interval: %u, RS delivered: %u, RS delivery_rate: %u, App Limited: %u"
,
rateSample
->
interval
,
rateSample
->
delivered
,
rateSample
->
delivery_rate
,
rateSample
->
is_app_limited
);
return
true
;
prrtByteCount_t
PrrtReceiver_get_space
(
PrrtReceiver
*
receiver
)
{
check
(
pthread_mutex_lock
(
&
receiver
->
lock
)
==
0
,
"Lock failed."
);
prrtByteCount_t
cwnd
=
BBR_getCwnd
(
receiver
->
bbr
);
prrtByteCount_t
pipe
=
receiver
->
packetTracking
->
pipe
;
check
(
pthread_mutex_unlock
(
&
receiver
->
lock
)
==
0
,
"Unlock failed."
);
if
(
cwnd
<
pipe
)
{
return
0
;
}
return
cwnd
-
pipe
;
bool
PrrtReceiver_updateAndGenerateRateSample
(
PrrtReceiver
*
recv
,
prrtSequenceNumber_t
seqnum
,
uint8_t
packetType
,
prrtTimestamp_t
receiveTime
)
{
PrrtInFlightPacketStore
*
packetStore
=
NULL
;
if
(
packetType
==
PACKET_TYPE_DATA
)
{
packetStore
=
recv
->
dataPacketStates
;
}
else
if
(
packetType
==
PACKET_TYPE_REDUNDANCY
)
{
packetStore
=
recv
->
redundancyPacketStates
;
}
else
return
false
;
bool
result
=
false
;
error:
PERROR
(
"Mutex error.%s"
,
""
);
return
0
;
}
check
(
pthread_mutex_lock
(
&
recv
->
lock
)
==
0
,
"Lock failed."
);
PrrtPacket
*
packet
=
PrrtInFlightPacketStore_get_packet
(
packetStore
,
seqnum
);
if
(
packet
!=
NULL
)
{
PrrtReceiver_updateRateSample
(
recv
->
rateSample
,
packet
,
receiveTime
,
recv
->
packetTracking
);
result
=
PrrtReceiver_generateRateSample
(
recv
->
rateSample
,
recv
->
packetTracking
);
recv
->
packetTracking
->
pipe
-=
packet
->
payloadLength
;
PrrtInFlightPacketStore_remove_outstanding_packet
(
packetStore
,
seqnum
);
bool
PrrtReceiver_wait_for_space
(
PrrtReceiver
*
receiver
,
prrtByteCount_t
maximum_payload_size
,
PrrtApplicationConstraints
*
applicationConstraints
)
{
bool
waitResult
=
true
;
check
(
pthread_mutex_lock
(
&
receiver
->
lock
)
==
0
,
"Lock failed."
);
int64_t
space
=
PrrtReceiver_get_space
(
receiver
);
while
(
space
<
maximum_payload_size
)
{
uint32_t
waittime_us
=
PrrtReceiver_get_retransmission_delay
(
receiver
,
applicationConstraints
);
debug
(
DEBUG_RECEIVER
,
"Wait for: %d"
,
waittime_us
);
struct
timespec
deadline
=
abstime_from_now
(
waittime_us
);
pthread_cond_timedwait
(
&
receiver
->
wait_for_space
,
&
receiver
->
lock
,
&
deadline
);
if
(
atomic_load_explicit
(
&
receiver
->
closing
,
memory_order_acquire
))
{
waitResult
=
false
;
break
;
}
PrrtReceiver_rto_check
(
receiver
,
applicationConstraints
);
space
=
PrrtReceiver_get_space
(
receiver
);
}
check
(
pthread_mutex_unlock
(
&
receiver
->
lock
)
==
0
,
"Unlock failed."
);
return
waitResult
;
error:
PERROR
(
"Mutex error.%s"
,
""
);
return
false
;
}
check
(
pthread_mutex_unlock
(
&
recv
->
lock
)
==
0
,
"Unlock failed."
);
return
result
;
uint32_t
PrrtReceiver_get_retransmission_delay
(
PrrtReceiver
*
receiver
,
PrrtApplicationConstraints
*
applicationConstraints
)
{
uint32_t
rtprop
=
BBR_getRTProp
(
receiver
->
bbr
);
if
(
rtprop
==
RTprop_Inf
)
{
rtprop
=
(
uint32_t
)
round
(
0
.
5
*
PrrtApplicationConstraints_get_target_delay
(
applicationConstraints
));
}
int
processing_margin_us
=
1000
;
double
waittime_us
=
rtprop
*
1
.
0
+
processing_margin_us
;
return
(
uint32_t
)
round
(
waittime_us
);
}
prrtDeliveryRate_t
PrrtReceiver_get_BBR_btlDr
(
PrrtReceiver
*
receiver
)
{
check
(
pthread_mutex_lock
(
&
receiver
->
lock
)
==
0
,
"Lock failed."
);
prrtDeliveryRate_t
res
=
BBR_getBtlBw
(
receiver
->
bbr
);
check
(
pthread_mutex_unlock
(
&
receiver
->
lock
)
==
0
,
"Unlock failed."
);
return
res
;
error:
PERROR
(
"Mutex error.%s"
,
""
);
return
false
;
return
0
;
}
void
PrrtReceiver_add_outstanding_packet_state
(
PrrtReceiver
*
recv
,
PrrtPacket
*
packet
,
prrtTimestamp_t
sentTime
)
{
PrrtInFlightPacketStore
*
packetStore
=
NULL
;
if
(
PrrtPacket_type
(
packet
)
==
PACKET_TYPE_DATA
)
{
packetStore
=
recv
->
dataPacketSt
ates
;
packetStore
=
recv
->
data
Inflight
PacketSt
ore
;
}
else
if
(
PrrtPacket_type
(
packet
)
==
PACKET_TYPE_REDUNDANCY
)
{
packetStore
=
recv
->
redundancyPacketSt
ates
;
packetStore
=
recv
->
redundancy
Inflight
PacketSt
ore
;
}
else
return
;
//printf("Adding Packet #%u to %u\n", packet->sequenceNumber, PrrtPacket_type(packet));
check
(
pthread_mutex_lock
(
&
recv
->
lock
)
==
0
,
"Lock failed."
);
if
(
recv
->
packetTracking
->
pipe
==
0
)
{
...
...
@@ -202,10 +383,22 @@ void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *p
packet
->
is_app_limited
=
(
recv
->
packetTracking
->
app_limited
!=
0
);
PrrtInFlightPacketStore_add_outstanding_packet
(
packetStore
,
packet
);
check
(
pthread_mutex_unlock
(
&
recv
->
lock
)
==
0
,
"Unlock failed."
);
pthread_cond_broadcast
(
&
recv
->
wait_for_space
);
check
(
pthread_mutex_unlock
(
&
recv
->
lock
)
==
0
,
"Unlock failed."
);
return
;
error:
PERROR
(
"Lock error.%s"
,
""
);
}
void
PrrtReceiver_interrupt
(
PrrtReceiver
*
receiver
)
{
check
(
pthread_mutex_lock
(
&
receiver
->
lock
)
==
0
,
"Lock failed."
);
atomic_store_explicit
(
&
receiver
->
closing
,
true
,
memory_order_release
);
pthread_cond_broadcast
(
&
receiver
->
wait_for_space
);
check
(
pthread_mutex_unlock
(
&
receiver
->
lock
)
==
0
,
"Unlock failed."
);
return
;
error:
PERROR
(
"PrrtReceiver_interrupt failed."
);
}
\ No newline at end of file
prrt/proto/receiver.h
View file @
b51f0a89
...
...
@@ -7,28 +7,10 @@
#include
<sys/socket.h>
#include
<netdb.h>
#include
"stores/inFlightPacketStore.h"
#include
"bbr.h"
#include
"types/applicationConstraints.h"
#include
"types/channelStateInformation.h"
typedef
struct
prrtRateSample
{
prrtByteCount_t
prior_delivered
;
prrtTimestamp_t
prior_time
;
prrtTimedelta_t
send_elapsed
;
prrtTimedelta_t
ack_elapsed
;
prrtTimedelta_t
interval
;
prrtByteCount_t
delivered
;
bool
is_app_limited
;
prrtDeliveryRate_t
delivery_rate
;
// Bps
}
PrrtRateSample
;
typedef
struct
packetTracking
{
prrtByteCount_t
pipe
;
prrtByteCount_t
delivered
;
prrtTimestamp_t
delivered_time
;
prrtTimestamp_t
first_sent_time
;
prrtByteCount_t
app_limited
;
}
PrrtPacketTracking
;
typedef
struct
prrtReceiver
{
const
char
*
host_name
;
...
...
@@ -36,23 +18,39 @@ typedef struct prrtReceiver {
struct
addrinfo
*
ai
;
PrrtChannelStateInformation
*
csi
;
pthread_mutex_t
lock
;
pthread_cond_t
wait_for_space
;
atomic_bool
closing
;
BBR
*
bbr
;
PrrtInFlightPacketStore
*
dataPacketSt
ates
;
PrrtInFlightPacketStore
*
redundancyPacketSt
ates
;
PrrtInFlightPacketStore
*
data
Inflight
PacketSt
ore
;
PrrtInFlightPacketStore
*
redundancy
Inflight
PacketSt
ore
;
PrrtRateSample
*
rateSample
;
PrrtPacketTracking
*
packetTracking
;
}
PrrtReceiver
;
PrrtReceiver
*
PrrtReceiver_create
(
const
char
*
host
,
uint16_t
port
);
bool
PrrtReceiver_updateAndGenerateRateSample
(
PrrtReceiver
*
recv
,
prrtSequenceNumber_t
seqnum
,
uint8_t
packetType
,
prrtTimestamp_t
receiveTime
);
PrrtReceiver
*
PrrtReceiver_create
(
const
char
*
host
,
uint16_t
port
,
prrtByteCount_t
maximum_payload_size
);
void
PrrtReceiver_add_outstanding_packet_state
(
PrrtReceiver
*
recv
,
PrrtPacket
*
packet
,
prrtTimestamp_t
sentTime
);
prrtByteCount_t
PrrtReceiver_get_pipe
(
PrrtReceiver
*
recv
);
void
PrrtReceiver_rto_check
(
PrrtReceiver
*
recv
,
PrrtApplicationConstraints
*
constraints
);
void
PrrtReceiver_on_application_write
(
PrrtReceiver
*
receiver
,
uint32_t
send_queue_length
,
prrtSequenceNumber_t
sequenceNumber
);
void
PrrtReceiver_on_ack
(
PrrtReceiver
*
receiver
,
PrrtPacketFeedbackPayload
*
feedbackPayload
,
prrtTimestamp_t
receiveTime
,
prrtTimedelta_t
rtt
,
PrrtApplicationConstraints
*
constraints
);
bool
PrrtReceiver_wait_for_space
(
PrrtReceiver
*
receiver
,
prrtByteCount_t
maximum_payload_size
,
PrrtApplicationConstraints
*
pConstraints
);
uint32_t
PrrtReceiver_get_retransmission_delay
(
PrrtReceiver
*
socket
,
PrrtApplicationConstraints
*
applicationConstraints
);
void
PrrtReceiver_on_application_write
(
PrrtReceiver
*
receiver
);
prrtByteCount_t
PrrtReceiver_get_space
(
PrrtReceiver
*
receiver
);
double
PrrtReceiver_get_BBR_pacingRate
(
PrrtReceiver
*
receiver
);
double
PrrtReceiver_get_BBR_pacingGain
(
PrrtReceiver
*
receiver
);
uint32_t
PrrtReceiver_get_BBR_state
(
PrrtReceiver
*
receiver
);
prrtDeliveryRate_t
PrrtReceiver_get_BBR_btlDr
(
PrrtReceiver
*
receiver
);
void
PrrtReceiver_interrupt
(
PrrtReceiver
*
receiver
);
bool
PrrtReceiver_destroy
(
PrrtReceiver
*
receiver
);
#endif //PRRT_RECEIVER_H
prrt/proto/socket.c
View file @
b51f0a89
This diff is collapsed.
Click to expand it.
prrt/proto/socket.h
View file @
b51f0a89
#ifndef PRRT_SOCKET_H
#define PRRT_SOCKET_H
#include
"../defines.h"
#include
"../util/list.h"
#include
"../util/pipe.h"
...
...
@@ -9,6 +10,8 @@
#include
"stores/dataPacketStore.h"
#include
"stores/deliveredPacketTable.h"
#include
"stores/packetTimeoutTable.h"
#include
"stores/pace.h"
#include
"stores/paceFilter.h"
#include
"stores/receptionTable.h"
#include
"stores/repairBlockStore.h"
#include
"stores/packetDeliveryStore.h"
...
...
@@ -17,6 +20,7 @@
#include
"types/lossStatistics.h"
#include
"types/packet.h"
#include
"clock.h"
#include
"timer.h"
#include
"xlap.h"
#include
"receiver.h"
...
...
@@ -31,8 +35,12 @@ typedef struct prrtSocket {
struct
sockaddr_in
*
address
;
bool
isBound
;
bool
withTimestamp
;
bool
pacingEnabled
;
PrrtClock
clock
;
PrrtBlock
*
receiveBlock
;
pthread_t
sendDataThread
;
Pipe
*
sendDataQueue
;
...
...
@@ -50,6 +58,8 @@ typedef struct prrtSocket {
atomic_bool
closing
;
prrtTimestamp_t
nextSendTime
;
prrtSequenceNumber_t
packetsCount
;
prrtSequenceNumber_t
sequenceNumberSource
;
prrtSequenceNumber_t
sequenceNumberRepetition
;
...
...
@@ -69,6 +79,18 @@ typedef struct prrtSocket {
PrrtCodingConfiguration
*
codingParameters
;
PrrtCoder
*
coder
;
prrtTimedelta_t
rtt
;
// Pacing
PrrtPace
*
appSendPace
;
PrrtPace
*
prrtTransmitPace
;
PrrtPace
*
prrtReceivePace
;
PrrtPace
*
appDeliverPace
;
prrtTimedelta_t
send_peer_btl_pace
;
prrtTimedelta_t
recv_peer_btl_pace
;
prrtTimedelta_t
send_peer_app_total_pace
;
_Atomic
(
XlapTimestampTable
*
)
tstable
[
2
];
pthread_attr_t
*
sendDataThreadAttr
;
...
...
@@ -77,8 +99,12 @@ typedef struct prrtSocket {
atomic_bool
isHardwareTimestamping
;
char
*
interfaceName
;
PrrtChannelStateInformation
*
senderChannelStateInformation
;
atomic_bool
isThreadPinning
;
prrtByteCount_t
maximum_payload_size
;
PrrtTimer
*
retransmissionTimer
;
}
PrrtSocket
;
...
...
@@ -106,7 +132,9 @@ int PrrtSocket_close(PrrtSocket *s);
bool
PrrtSocket_connect
(
PrrtSocket
*
s
,
const
char
*
host
,
const
uint16_t
port
);
int
PrrtSocket_send
(
PrrtSocket
*
s
,
const
uint8_t
*
data
,
size_t
data_len
);
int
PrrtSocket_send_async
(
PrrtSocket
*
s
,
const
uint8_t
*
data
,
const
size_t
data_len
);
int
PrrtSocket_send_sync
(
PrrtSocket
*
s
,
const
uint8_t
*
data
,
size_t
data_len
);
bool
PrrtSocket_pace
(
PrrtSocket
*
sock_ptr
,
bool
prepace
);
int32_t
PrrtSocket_recv
(
PrrtSocket
*
s
,
void
*
buf_ptr
,
struct
sockaddr
*
addr
);
...
...
@@ -118,7 +146,6 @@ int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, struct sockaddr
int32_t
PrrtSocket_receive_ordered_wait
(
PrrtSocket
*
s
,
void
*
buf_ptr
,
struct
sockaddr
*
addr
,
prrtTimedelta_t
time_window_us
);
int32_t
PrrtSocket_receive_ordered_timedwait
(
PrrtSocket
*
s
,
void
*
buf_ptr
,
struct
sockaddr
*
addr
,
prrtTimedelta_t
time_window_us
,
struct
timespec
*
deadline
);
bool
PrrtSocket_cleanup
(
PrrtSocket
*
s
);
bool
PrrtSocket_closing
(
PrrtSocket
*
s
);
...
...
@@ -126,11 +153,23 @@ bool PrrtSocket_closing(PrrtSocket *s);
bool
PrrtSocket_uses_thread_pinning
(
PrrtSocket
*
s
);
uint32_t
PrrtSocket_get_rtprop_fwd
(
PrrtSocket
*
s
);
prrtPacketLossRate_t
PrrtSocket_get_plr_fwd
(
PrrtSocket
*
s
);
prrtDeliveryRate_t
PrrtSocket_get_delivery_rate_fwd
(
PrrtSocket
*
s
);
prrtDeliveryRate_t
PrrtSocket_get_btlbw_fwd
(
PrrtSocket
*
s
);
prrtDeliveryRate_t
PrrtSocket_get_btlbw_back
(
PrrtSocket
*
s
);
bool
PrrtSocket_get_app_limited
(
PrrtSocket
*
s
);
uint64_t
PrrtSocket_get_full_bw
(
PrrtSocket
*
s
);
bool
PrrtSocket_get_filled_pipe
(
PrrtSocket
*
s
);
uint32_t
PrrtSocket_get_bbr_state
(
PrrtSocket
*
s
);
uint32_t
PrrtSocket_get_bbr_app_limited
(
PrrtSocket
*
s
);
bool
PrrtSocket_get_bbr_is_app_limited
(
PrrtSocket
*
s
);
uint32_t
PrrtSocket_get_cycle_index
(
PrrtSocket
*
s
);
float
PrrtSocket_get_pacing_gain
(
PrrtSocket
*
s
);
uint32_t
PrrtSocket_get_cwnd
(
PrrtSocket
*
s
);
uint32_t
PrrtSocket_get_inflight
(
PrrtSocket
*
s
);
uint32_t
PrrtSocket_get_pacing_rate
(
PrrtSocket
*
s
);
prrtByteCount_t
PrrtSocket_get_send_quantum
(
PrrtSocket
*
s
);
prrtByteCount_t
PrrtSocket_get_pipe
(
PrrtSocket
*
s
);
prrtByteCount_t
PrrtSocket_get_delivered
(
PrrtSocket
*
s
);
bool
PrrtSocket_get_bbr_round_start
(
PrrtSocket
*
s
);
#endif // PRRT_SOCKET_H
Prev
1
2
3
4
Next