...
 
Commits (261)
...@@ -9,3 +9,4 @@ tests/__pycache__/ ...@@ -9,3 +9,4 @@ tests/__pycache__/
MANIFEST MANIFEST
prrt.cpython*.so prrt.cpython*.so
prrt.so prrt.so
.ipynb_checkpoints/
...@@ -22,8 +22,8 @@ build:prrt: ...@@ -22,8 +22,8 @@ build:prrt:
- which cmake - which cmake
- which gcc - which gcc
- which g++ - which g++
- pip3 list | grep Cython - pip3 list --format=legacy | grep Cython
- pip3 list | grep numpy - pip3 list --format=legacy | grep numpy
- CC=gcc-5 CXX=g++-5 cmake . -DPRRT_TESTS=1 - CC=gcc-5 CXX=g++-5 cmake . -DPRRT_TESTS=1
- make - make
...@@ -38,6 +38,17 @@ build:container: ...@@ -38,6 +38,17 @@ build:container:
- docker push $CI_REGISTRY_IMAGE:$DOCKER_TAG - docker push $CI_REGISTRY_IMAGE:$DOCKER_TAG
- docker rmi $CI_REGISTRY_IMAGE:$DOCKER_TAG - docker rmi $CI_REGISTRY_IMAGE:$DOCKER_TAG
build:container_tcp:
stage: build
tags:
- docker
script:
- export DOCKER_TAG=$(echo "$CI_BUILD_REF_NAME""_tcp" | sed 's#/#_#' | sed 's#^master$#latest#')
- docker build -t $CI_REGISTRY_IMAGE:$DOCKER_TAG --build-arg http_proxy=http://www-proxy.uni-saarland.de:3128 -f docker/Dockerfile_tcp .
- docker login -u gitlab-ci-token -p $CI_BUILD_TOKEN $CI_REGISTRY
- docker push $CI_REGISTRY_IMAGE:$DOCKER_TAG
- docker rmi $CI_REGISTRY_IMAGE:$DOCKER_TAG
test:prrt_mem: test:prrt_mem:
stage: test stage: test
dependencies: dependencies:
...@@ -94,4 +105,4 @@ deploy:profile: ...@@ -94,4 +105,4 @@ deploy:profile:
paths: paths:
- gprof-send.txt - gprof-send.txt
- gprof-recv.txt - gprof-recv.txt
expire_in: 30 days expire_in: 30 days
\ No newline at end of file
...@@ -36,4 +36,4 @@ if(PRRT_TESTS) ...@@ -36,4 +36,4 @@ if(PRRT_TESTS)
endif() endif()
add_custom_target(perftest COMMAND python3 tests/eval.py) add_custom_target(perftest COMMAND python3 tests/eval.py)
add_custom_target(memtest COMMAND bash ./memtest.sh DEPENDS sender receiver) add_custom_target(memtest COMMAND bash ./memtest.sh DEPENDS sender receiver)
\ No newline at end of file
...@@ -5,11 +5,13 @@ ...@@ -5,11 +5,13 @@
## Features ## Features
* Hybrid error control (FEC + ARQ) using systematic Vandermonde codes * Hybrid error control (FEC + ARQ) using systematic Vandermonde codes
* Congestion control and pacing using a variant of [BBR](https://groups.google.com/forum/#!forum/bbr-dev)
* Clock synchronization between sending stack and receiving stack * Clock synchronization between sending stack and receiving stack
* Applications can specify packet-level expiration times * Applications can specify packet-level expiration times
* Different receive modes for ASAP and time-synchronized operation * Different receive modes for ASAP and time-synchronized operation
* Passive measurement of propagation delay, bottleneck data rate and packet loss rate * Passive measurement of propagation delay, bottleneck data rate and packet loss rate
* Packet-level timing analysis using [X-Lap](http://xlap.larn.systems) * Packet-level timing analysis using [X-Lap](http://xlap.larn.systems)
* Wireshark dissector written in Lua
* [Hardware timestamping support](https://git.nt.uni-saarland.de/LARN/PRRT/wikis/hardware-timestamping) * [Hardware timestamping support](https://git.nt.uni-saarland.de/LARN/PRRT/wikis/hardware-timestamping)
## Installation ## Installation
...@@ -29,9 +31,9 @@ port = int(sys.argv[1]) ...@@ -29,9 +31,9 @@ port = int(sys.argv[1])
s = prrt.PrrtSocket(port=port) s = prrt.PrrtSocket(port=port)
while True: while True:
d = s.recv() d = s.recv().decode("utf8")
if d != "Close": if d != "Close":
print d print(d)
else: else:
break break
``` ```
...@@ -44,25 +46,27 @@ import prrt ...@@ -44,25 +46,27 @@ import prrt
host = sys.argv[1] host = sys.argv[1]
port = int(sys.argv[2]) port = int(sys.argv[2])
localport = int(sys.argv[3])
s = prrt.PrrtSocket(port=port) s = prrt.PrrtSocket(port=localport)
s.connect(host, port) s.connect(host, port)
for i in range(10): for i in range(10):
s.send("Packet {}".format(i)) s.send("Packet {}".format(i).encode("utf8"))
s.send("Close") s.send("Close".encode("utf8"))
``` ```
Start the receiver by: Start the receiver by:
```bash ```bash
python receiver.py 5000 python3 receiver.py 5000
``` ```
In a separate terminal, run: In a separate terminal, run:
```bash ```bash
python sender.py 127.0.0.1 5000 python3 sender.py 127.0.0.1 5000 6000
``` ```
This should generate the following output in the receiver console: This should generate the following output in the receiver console:
...@@ -84,13 +88,13 @@ Packet 9 ...@@ -84,13 +88,13 @@ Packet 9
If you find PRRT useful and incorporate it in your works, we are very happy to hear about it. Please also consider to cite us like this: If you find PRRT useful and incorporate it in your works, we are very happy to hear about it. Please also consider to cite us like this:
```bibtex ```bibtex
@misc{sic2018prrt, @misc{sic2018prrt,
author = {Schmidt, Andreas}, author = {Schmidt, Andreas},
title = {PRRT: Predictably Reliable Real-time Transport}, title = {PRRT: Predictably Reliable Real-time Transport},
howpublished={Web page}, howpublished={Web page},
url = {http://prrt.larn.systems}, url = {http://prrt.larn.systems},
year = {2018} year = {2018}
} }
``` ```
## License ## License
......
...@@ -35,6 +35,7 @@ local pf_red_k = ProtoField.uint8("prrt.redundancy.k", "k") ...@@ -35,6 +35,7 @@ local pf_red_k = ProtoField.uint8("prrt.redundancy.k", "k")
local pf_fb = ProtoField.new("Feedback", "prrt.feedback", ftypes.BYTES, base.NONE) local pf_fb = ProtoField.new("Feedback", "prrt.feedback", ftypes.BYTES, base.NONE)
local pf_fb_groupRTT = ProtoField.uint32("prrt.feedback.groupRTT", "Group RTT") local pf_fb_groupRTT = ProtoField.uint32("prrt.feedback.groupRTT", "Group RTT")
local pf_fb_ftt = ProtoField.uint32("prrt.feedback.FTT", "FTT") local pf_fb_ftt = ProtoField.uint32("prrt.feedback.FTT", "FTT")
local pf_fb_btlPace = ProtoField.uint32("prrt.feedback.btlPace", "Bottleneck pace")
local pf_fb_erasurecount = ProtoField.uint16("prrt.feedback.erasureCount", "Erasure count") local pf_fb_erasurecount = ProtoField.uint16("prrt.feedback.erasureCount", "Erasure count")
local pf_fb_packetcount = ProtoField.uint16("prrt.feedback.packetCount", "Packet count") local pf_fb_packetcount = ProtoField.uint16("prrt.feedback.packetCount", "Packet count")
local pf_fb_gaplength = ProtoField.uint16("prrt.feedback.gapLength", "Gap length") local pf_fb_gaplength = ProtoField.uint16("prrt.feedback.gapLength", "Gap length")
...@@ -66,6 +67,7 @@ prrt_proto.fields = { ...@@ -66,6 +67,7 @@ prrt_proto.fields = {
pf_fb, pf_fb,
pf_fb_groupRTT, pf_fb_groupRTT,
pf_fb_ftt, pf_fb_ftt,
pf_fb_btlPace,
pf_fb_erasurecount, pf_fb_erasurecount,
pf_fb_packetcount, pf_fb_packetcount,
pf_fb_gaplength, pf_fb_gaplength,
...@@ -89,9 +91,16 @@ local ex_type = Field.new("prrt.type") ...@@ -89,9 +91,16 @@ local ex_type = Field.new("prrt.type")
local function getType() return ex_type()() end local function getType() return ex_type()() end
local function getTypeName() return prrtPacketTypeNames[getType()] end local function getTypeName() return prrtPacketTypeNames[getType()] end
local ex_index = Field.new("prrt.index")
local function getIndex() return ex_index()() end
local ex_data_length = Field.new("prrt.data.length") local ex_data_length = Field.new("prrt.data.length")
local function getDataLength() return ex_data_length()() end local function getDataLength() return ex_data_length()() end
local ex_red_baseseqno = Field.new("prrt.redundancy.baseSequenceNumber")
local function getRedBaseSeqNo() return ex_red_baseseqno()() end
local ex_red_n = Field.new("prrt.redundancy.n") local ex_red_n = Field.new("prrt.redundancy.n")
local function getRedN() return ex_red_n()() end local function getRedN() return ex_red_n()() end
...@@ -109,7 +118,7 @@ local function dissect_data(buffer, pinfo, root) ...@@ -109,7 +118,7 @@ local function dissect_data(buffer, pinfo, root)
tree:add(pf_data_groupRTprop, buffer:range(8,4)) tree:add(pf_data_groupRTprop, buffer:range(8,4))
tree:add(pf_data_packettimeout, buffer:range(12,4)) tree:add(pf_data_packettimeout, buffer:range(12,4))
local label = "DATA Len=" .. getDataLength() local label = "[D] Idx=" .. getIndex() .. " Len=" .. getDataLength()
tree:set_text(label) tree:set_text(label)
pinfo.cols.info:set(label) pinfo.cols.info:set(label)
end end
...@@ -121,7 +130,7 @@ local function dissect_redundancy(buffer, pinfo, root) ...@@ -121,7 +130,7 @@ local function dissect_redundancy(buffer, pinfo, root)
tree:add(pf_red_n, buffer:range(6,1)) tree:add(pf_red_n, buffer:range(6,1))
tree:add(pf_red_k, buffer:range(7,1)) tree:add(pf_red_k, buffer:range(7,1))
local label = "REDUNDANCY n=" .. getRedN() .. " k=" .. getRedK() local label = "[R] Idx=" .. getIndex() .. " b=" .. getRedBaseSeqNo() .. " n=" .. getRedN() .. " k=" .. getRedK()
tree:set_text(label) tree:set_text(label)
pinfo.cols.info:set(label) pinfo.cols.info:set(label)
end end
...@@ -130,16 +139,17 @@ local function dissect_feedback(buffer, pinfo, root) ...@@ -130,16 +139,17 @@ local function dissect_feedback(buffer, pinfo, root)
local tree = root:add(pf_fb, buffer:range(0)) local tree = root:add(pf_fb, buffer:range(0))
tree:add(pf_fb_groupRTT, buffer:range(0,4)) tree:add(pf_fb_groupRTT, buffer:range(0,4))
tree:add(pf_fb_ftt, buffer:range(4,4)) tree:add(pf_fb_ftt, buffer:range(4,4))
tree:add(pf_fb_erasurecount, buffer:range(8,2)) tree:add(pf_fb_btlPace, buffer:range(8,4))
tree:add(pf_fb_packetcount, buffer:range(10,2)) tree:add(pf_fb_erasurecount, buffer:range(12,2))
tree:add(pf_fb_gaplength, buffer:range(12,2)) tree:add(pf_fb_packetcount, buffer:range(14,2))
tree:add(pf_fb_gapcount, buffer:range(14,2)) tree:add(pf_fb_gaplength, buffer:range(16,2))
tree:add(pf_fb_burstlength, buffer:range(16,2)) tree:add(pf_fb_gapcount, buffer:range(18,2))
tree:add(pf_fb_burstcount, buffer:range(18,2)) tree:add(pf_fb_burstlength, buffer:range(20,2))
tree:add(pf_fb_acktype, buffer:range(20,1)) tree:add(pf_fb_burstcount, buffer:range(22,2))
tree:add(pf_fb_ackSeqN, buffer:range(21, 2)) tree:add(pf_fb_acktype, buffer:range(24,1))
tree:add(pf_fb_ackSeqN, buffer:range(25, 2))
local label = "FEEDBACK"
local label = "[F]"
tree:set_text(label) tree:set_text(label)
pinfo.cols.info:set(label) pinfo.cols.info:set(label)
end end
......
FROM gcc:5 FROM gcc:8
MAINTAINER Andreas Schmidt <schmidt@nt.uni-saarland.de> MAINTAINER Andreas Schmidt <schmidt@nt.uni-saarland.de>
RUN apt-get update \ ENV DEBIAN_FRONTEND noninteractive
&& apt-get upgrade -y \
&& apt-get install -y cmake RUN apt-get update && apt-get install --yes --force-yes \
bc \
cmake \
traceroute \
tshark
COPY CMakeLists.txt /prrt/ COPY CMakeLists.txt /prrt/
COPY prrt /prrt/prrt COPY prrt /prrt/prrt
COPY tests /prrt/tests COPY tests /prrt/tests
COPY docker/entrypoint.sh / COPY docker/entrypoint.sh /
COPY docker/sysctl.conf /etc/sysctl.d/01-disable-ipv6.conf
WORKDIR /prrt WORKDIR /prrt
RUN cmake . \ RUN cmake . \
&& make && make
ENV PATH /prrt/bin:$PATH ENV PATH /prrt:$PATH
WORKDIR /prrt/bin
VOLUME /output VOLUME /output
......
FROM gcc:8
MAINTAINER Andreas Schmidt <schmidt@nt.uni-saarland.de>
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update && apt-get install --yes --force-yes \
bc \
cmake \
traceroute \
tshark
COPY CMakeLists.txt /prrt/
COPY prrt /prrt/prrt
COPY tests /prrt/tests
COPY docker/entrypoint.sh /
COPY docker/sysctl.conf /etc/sysctl.d/01-disable-ipv6.conf
WORKDIR /prrt
RUN cmake -DTCP=1 . \
&& make
ENV PATH /prrt:$PATH
VOLUME /output
ENTRYPOINT ["/entrypoint.sh"]
#!/bin/bash #!/bin/sh
dev=eth0 dev=eth0
command=$1 command=$1
shift shift
if [[ "$command" == "sender" || "$command" == "receiver" ]]; then if [[ "$command" == "sender" || "$command" == "receiver" || "$command" == "time-sender" || "$command" == "time-receiver" ]]; then
: :
else else
echo "Command should be either sender or receiver." echo "Command should be either sender or receiver."
exit 0; exit 0;
fi fi
TARGET="127.0.0.1"
OUTPUT="/dev/null"
NETEM=() NETEM=()
PRRT=() PRRT=()
while [[ $# -gt 0 ]] while [[ $# -gt 0 ]]
...@@ -19,7 +21,24 @@ do ...@@ -19,7 +21,24 @@ do
key="$1" key="$1"
case $key in case $key in
-t|--target|-p|--port|-r|--rounds) -t|--target)
if [[ "$command" == "sender" || "$command" == "time-sender" ]]; then
PRRT+=("$1 $2")
fi
TARGET=("$2")
shift
shift
;;
-w|--wireshark)
OUTPUT=("$2")
shift
shift
;;
-T|--threadpinning|-U)
PRRT+=("$1")
shift
;;
-p|--port|-r|--rounds|-s|--size|-R|--rcvbuf|-S|--sndbuf|-o|--output|-a|--appdelay|-j|--appjitter)
PRRT+=("$1 $2") PRRT+=("$1 $2")
shift shift
shift shift
...@@ -34,8 +53,34 @@ done ...@@ -34,8 +53,34 @@ done
PRRT_PARAMS="${PRRT[@]}" PRRT_PARAMS="${PRRT[@]}"
NETEM_PARAMS="${NETEM[@]}" NETEM_PARAMS="${NETEM[@]}"
echo "Starting Wireshark."
tshark -i eth0 -w $OUTPUT.pcap &
TSHARK_PID=$!
sleep 2
start=$(date +%s.%N);
echo "Checking reachability of $TARGET."
until ping -c1 $TARGET &>/dev/null; do sleep 1; done
dur=$(echo "$(date +%s.%N) - $start" | bc);
printf "Reachable after %.6f seconds\n" $dur
if [[ "$command" == "sender" || "$command" == "time-sender" ]]; then
echo "Delaying $command"
sleep 10
fi
start=$(date +%s.%N);
echo "Running PRRT with command: \"$command $PRRT_PARAMS\" and link parameters: \"$NETEM_PARAMS\"" echo "Running PRRT with command: \"$command $PRRT_PARAMS\" and link parameters: \"$NETEM_PARAMS\""
tc qdisc add dev $dev root netem $NETEM_PARAMS #tc qdisc add dev $dev root netem $NETEM_PARAMS
/prrt/$command $PRRT_PARAMS -o /output/log.csv trap 'echo "Caught SIGINT."; echo "$(ps -a)"; killall -SIGINT $command' INT
tc qdisc del dev $dev root LOG=$(/prrt/$command $PRRT_PARAMS 2>&1)
printf "$LOG\n"
echo "Exit status: $?"
dur=$(echo "$(date +%s.%N) - $start" | bc);
printf "Done after %.6f seconds\n" $dur
#tc qdisc del dev $dev root
kill $TSHARK_PID
net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 1
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pd.read_csv(\"eval.csv\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
import sys
sys.path.insert(0, "./build")
import tests.perf as perf
def run_setup_and_report(setup):
results = setup.run()
print("Setup:\n ", setup)
print("Results:\n " + str(results).replace("\n","\n "))
results.export()
if __name__ == "__main__":
# Packet Count Works: 2^15; Segfault at: 2^16
# TODO: support multiple tests via proper socket termination
setups = [
perf.TestSetup(packets=2**17,delay=1,loss=0,reorder=0,duplicate=0)
]
for setup in setups:
run_setup_and_report(setup)
...@@ -3,7 +3,7 @@ import prrt ...@@ -3,7 +3,7 @@ import prrt
port = int(sys.argv[1]) port = int(sys.argv[1])
s = prrt.PrrtSocket(("127.0.0.1", port)) s = prrt.PrrtSocket(("0.0.0.0", port))
while True: while True:
d, addr = s.recv() d, addr = s.recv()
......
...@@ -5,7 +5,7 @@ host = sys.argv[1] ...@@ -5,7 +5,7 @@ host = sys.argv[1]
port = int(sys.argv[2]) port = int(sys.argv[2])
localport = int(sys.argv[3]) localport = int(sys.argv[3])
s = prrt.PrrtSocket(("127.0.1.1", localport), maximum_payload_size=150) s = prrt.PrrtSocket(("0.0.0.0", localport), maximum_payload_size=150)
s.connect((host, port)) s.connect((host, port))
for i in range(10): for i in range(10):
......
...@@ -10,14 +10,29 @@ if (XLAP) ...@@ -10,14 +10,29 @@ if (XLAP)
add_definitions(-DXLAP) add_definitions(-DXLAP)
endif() endif()
option (TCP "Set time protocol to TCP.")
if (TCP)
add_definitions(-DTCP)
endif()
add_subdirectory(proto) add_subdirectory(proto)
add_subdirectory(util) add_subdirectory(util)
add_executable(sender sender.c) add_executable(sender sender.c)
add_executable(receiver receiver.c) add_executable(receiver receiver.c)
add_executable(time-sender time-sender.c)
add_executable(time-receiver time-receiver.c)
target_link_libraries(sender LINK_PUBLIC prrt_shared prrtUtil_shared ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(receiver LINK_PUBLIC prrt_shared prrtUtil_shared ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(time-sender LINK_PUBLIC prrt_shared prrtUtil_shared ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(time-receiver LINK_PUBLIC prrt_shared prrtUtil_shared ${CMAKE_THREAD_LIBS_INIT})
add_executable(refcount refcount.c) add_executable(refcount refcount.c)
target_link_libraries(refcount LINK_PUBLIC prrt_shared prrtUtil_shared ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(sender LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT}) install(DIRECTORY ./ DESTINATION include/prrt
target_link_libraries(receiver LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT}) FILES_MATCHING PATTERN "*.h")
target_link_libraries(refcount LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
...@@ -119,11 +119,12 @@ cdef extern from "proto/socket.h": ...@@ -119,11 +119,12 @@ cdef extern from "proto/socket.h":
ctypedef prrtSocket PrrtSocket ctypedef prrtSocket PrrtSocket
cdef PrrtSocket* PrrtSocket_create(const uint32_t mtu, const uint32_t target_delay) cdef PrrtSocket* PrrtSocket_create(const uint32_t maximum_payload_size, const uint32_t target_delay)
bint PrrtSocket_bind(PrrtSocket *sock_ptr, const_char *ipAddress, const uint16_t port) int PrrtSocket_bind(PrrtSocket *sock_ptr, const_char *ipAddress, const uint16_t port)
int PrrtSocket_close(const PrrtSocket *sock_ptr) int PrrtSocket_close(const PrrtSocket *sock_ptr)
int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port) int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len) int PrrtSocket_send_sync(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int PrrtSocket_send_async(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr, sockaddr* addr) nogil int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr, sockaddr* addr) nogil
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, sockaddr* addr) nogil int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, sockaddr* addr) nogil
...@@ -144,11 +145,23 @@ cdef extern from "proto/socket.h": ...@@ -144,11 +145,23 @@ cdef extern from "proto/socket.h":
uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *socket) uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *socket)
float PrrtSocket_get_plr_fwd(PrrtSocket *socket) float PrrtSocket_get_plr_fwd(PrrtSocket *socket)
uint32_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *socket) uint32_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *socket)
uint32_t PrrtSocket_get_btlbw_fwd(PrrtSocket *s); uint32_t PrrtSocket_get_btlbw_fwd(PrrtSocket *s);
uint32_t PrrtSocket_get_btlbw_back(PrrtSocket *s); uint32_t PrrtSocket_get_btlbw_back(PrrtSocket *s);
bint PrrtSocket_get_app_limited(PrrtSocket *socket) uint32_t PrrtSocket_get_bbr_state(PrrtSocket *s)
uint64_t PrrtSocket_get_full_bw(PrrtSocket *s)
bint PrrtSocket_get_filled_pipe(PrrtSocket *s)
uint32_t PrrtSocket_get_cycle_index(PrrtSocket *s)
float PrrtSocket_get_pacing_gain(PrrtSocket *s)
uint32_t PrrtSocket_get_cwnd(PrrtSocket *s)
uint32_t PrrtSocket_get_inflight(PrrtSocket *s)
uint32_t PrrtSocket_get_pacing_rate(PrrtSocket *s)
uint32_t PrrtSocket_get_send_quantum(PrrtSocket *s)
uint32_t PrrtSocket_get_pipe(PrrtSocket *s)
uint32_t PrrtSocket_get_delivered(PrrtSocket *s)
bint PrrtSocket_get_bbr_round_start(PrrtSocket *s)
uint32_t PrrtSocket_get_bbr_app_limited(PrrtSocket *socket)
bint PrrtSocket_get_bbr_is_app_limited(PrrtSocket *socket)
bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket) bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket)
cdef extern from "proto/stores/packetDeliveryStore.h": cdef extern from "proto/stores/packetDeliveryStore.h":
......
...@@ -30,8 +30,8 @@ ...@@ -30,8 +30,8 @@
#define GF_BITS 8 #define GF_BITS 8
#define K_START 4 #define K_START 1
#define N_START 7 #define N_START 1
#define N_P_START 1 #define N_P_START 1
#define RRT_ALPHA 0.125 #define RRT_ALPHA 0.125
......
set (PRRT_SOURCES ../defines.h set (PRRT_SOURCES ../defines.h
types/block.c types/block.h bbr.c bbr.h
types/channelStateInformation.c types/channelStateInformation.h
clock.c clock.h clock.c clock.h
types/codingParams.c types/codingParams.h
receiver.c receiver.h receiver.c receiver.h
socket.c socket.h socket.c socket.h
types/applicationConstraints.c types/applicationConstraints.h timer.c timer.h
processes/dataReceiver.c processes/dataReceiver.h processes/dataReceiver.c processes/dataReceiver.h
processes/dataTransmitter.c processes/dataTransmitter.h processes/dataTransmitter.c processes/dataTransmitter.h
stores/dataPacketStore.c stores/dataPacketStore.h stores/dataPacketStore.c stores/dataPacketStore.h
stores/deliveredPacketTable.c stores/deliveredPacketTable.h stores/deliveredPacketTable.c stores/deliveredPacketTable.h
stores/inFlightPacketStore.c stores/inFlightPacketStore.h stores/inFlightPacketStore.c stores/inFlightPacketStore.h
stores/pace.c stores/pace.h
stores/paceFilter.c stores/paceFilter.h
stores/packetTimeoutTable.c stores/packetTimeoutTable.h stores/packetTimeoutTable.c stores/packetTimeoutTable.h
stores/packetDeliveryStore.c stores/packetDeliveryStore.h stores/packetDeliveryStore.c stores/packetDeliveryStore.h
stores/receptionTable.c stores/receptionTable.h stores/receptionTable.c stores/receptionTable.h
stores/repairBlockStore.c stores/repairBlockStore.h stores/repairBlockStore.c stores/repairBlockStore.h
types/packetTimeout.c types/packetTimeout.h types/applicationConstraints.c types/applicationConstraints.h
types/block.c types/block.h
types/codingParams.c types/codingParams.h
types/channelStateInformation.c types/channelStateInformation.h
types/lossStatistics.c types/lossStatistics.h types/lossStatistics.c types/lossStatistics.h
types/packet.c types/packet.h types/packet.c types/packet.h
vdmcode/block_code.c vdmcode/block_code.h) types/packetTimeout.c types/packetTimeout.h
vdmcode/block_code.c vdmcode/block_code.h types/packetTracking.c types/packetTracking.h types/rateSample.c types/rateSample.h)
if (XLAP) if (XLAP)
set(PRRT_SOURCES ${PRRT_SOURCES} ../xlap/xlap.c ../xlap/xlap.h) set(PRRT_SOURCES ${PRRT_SOURCES} ../xlap/xlap.c ../xlap/xlap.h)
endif() endif()
add_library(PRRT ${PRRT_SOURCES}) add_library(prrt_obj OBJECT ${PRRT_SOURCES})
set_property(TARGET prrt_obj PROPERTY POSITION_INDEPENDENT_CODE 1)
add_library(prrt_static STATIC $<TARGET_OBJECTS:prrt_obj>)
target_link_libraries(prrt_static rt)
set_target_properties(prrt_static PROPERTIES OUTPUT_NAME "prrt")
add_library(prrt_shared SHARED $<TARGET_OBJECTS:prrt_obj>)
target_link_libraries(prrt_shared rt)
set_target_properties(prrt_shared PROPERTIES OUTPUT_NAME "prrt")
target_link_libraries(PRRT rt) install(TARGETS prrt_static prrt_shared
RUNTIME DESTINATION bin
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib)
This diff is collapsed.
#ifndef PRRT_BBR_H
#define PRRT_BBR_H
#include <stdint.h>
#include "stdbool.h"
#include "types/packet.h"
#include "clock.h"
#include "types/channelStateInformation.h"
#include "types/packetTracking.h"
#include "types/rateSample.h"
#include "../util/windowedFilter.h"
#define PROBE_GAIN 1.1
#define DRAIN_GAIN 0.9
#define RTpropFilterLen 10000000 //10s
#define BBRHighGain ((((float)2885) / 1000) + 1)
#define BBRGainCycleLen 8
#define ProbeRTTDuration 200000 //200ms
#define RTprop_Inf UINT32_MAX
enum bbr_state {
STARTUP,
DRAIN,
PROBE_BW,
PROBE_RTT
};
typedef struct bbr {
pthread_mutex_t lock;
prrtByteCount_t mps;
prrtByteCount_t min_pipe_cwnd;
prrtByteCount_t initial_cwnd;
prrtTimedelta_t rtprop;
prrtTimestamp_t rtprop_stamp;
prrtTimestamp_t probe_rtt_done_stamp;
bool probe_rtt_round_done;
bool packet_conservation;
prrtByteCount_t prior_cwnd;
prrtByteCount_t cwnd;
prrtByteCount_t target_cwnd;
bool idle_restart;
enum bbr_state state;
double pacing_gain;
float cwnd_gain;
bool filled_pipe;
prrtByteCount_t full_bw;
uint32_t full_bw_count;
double pacing_rate;
bool has_seen_rtt;
uint32_t next_round_delivered;
bool round_start;
uint32_t round_count;
uint32_t next_rtt_delivered;
uint32_t rtt_count;
bool rtprop_expired;
bool is_loss_recovery;
prrtTimestamp_t loss_recovery_stamp;
prrtTimestamp_t cycle_stamp;
uint8_t cycle_index;
float* pacing_gain_cycle;
prrtByteCount_t send_quantum;
prrtDeliveryRate_t bw;
WindowedFilter* btlBwFilter;
} BBR;
BBR* BBR_Init(prrtByteCount_t maximum_payload_size);
void BBR_OnACK(BBR* bbr, PrrtChannelStateInformation* csi, PrrtRateSample* rs, PrrtPacketTracking* packetTracking, prrtTimedelta_t rtt);
void BBR_OnSpuriousLoss(BBR *bbr, PrrtPacketTracking *tracking);
void BBR_OnRTOLoss(BBR *bbr);
void BBR_OnLossExit(BBR *bbr);
void BBR_destroy(BBR* bbr);
double BBR_getPacingRate(BBR* bbr);
prrtByteCount_t BBR_getCwnd(BBR* bbr);
prrtDeliveryRate_t BBR_getBtlBw(BBR* bbr);
uint32_t BBR_getState(BBR* bbr);
prrtByteCount_t BBR_getFullBw(BBR* bbr);
double BBR_getPacingGain(BBR* bbr);
uint32_t BBR_getCycleIndex(BBR* bbr);
bool BBR_getFilledPipe(BBR* bbr);
uint32_t BBR_getRTProp(BBR* bbr);
prrtByteCount_t BBR_getInflight(BBR* bbr);
prrtByteCount_t BBR_getSendQuantum(BBR* bbr);
bool BBR_getRoundStart(BBR* bbr);
#endif //PRRT_BBR_H
#include <sys/time.h> #include <sys/time.h>
#include <stdlib.h> #include <stdlib.h>
#include "../util/time.h"
#include "../util/common.h" #include "../util/common.h"
#include "clock.h" #include "clock.h"
...@@ -19,50 +20,45 @@ prrtTimestamp_t PrrtClock_get_prrt_time_us(PrrtClock *clock) ...@@ -19,50 +20,45 @@ prrtTimestamp_t PrrtClock_get_prrt_time_us(PrrtClock *clock)
prrtTimestamp_t lastMeasurement = clock->lastMeasurement; prrtTimestamp_t lastMeasurement = clock->lastMeasurement;
prrtTimeDifference_t diff = diff_abs_ts(currentTime, lastMeasurement); prrtTimeDifference_t diff = diff_abs_ts(currentTime, lastMeasurement);
prrtTimeDifference_t skew = (diff * clock->skew) / 400; prrtTimeDifference_t skew = (diff * clock->skew) / 400;
return (prrtTimestamp_t) virtualTime + clock->meanDeviation + skew; return (prrtTimestamp_t) (virtualTime + clock->meanDeviation + skew);
} else { } else {
return currentTime; return currentTime;
} }
} }
bool PrrtClock_update(PrrtClock *clock, uint32_t referenceTime, uint32_t propagation_delay)
bool PrrtClock_update(PrrtClock *clock, uint32_t referenceTime, uint32_t rtt)
{ {
prrtTimestamp_t virtualTime = PrrtClock_get_prrt_time_us(clock);
prrtTimestamp_t currentTime = PrrtClock_get_current_time_us(); prrtTimestamp_t currentTime = PrrtClock_get_current_time_us();
prrtTimestamp_t virtualTime = clock->virtualTime; int64_t clockSkew = clock->skew;
prrtTimeDifference_t clockSkew = clock->skew; prrtTimedelta_t delay = propagation_delay / 2; // half the rtt
prrtTimedelta_t delay = rtt/2; // half the rtt int64_t phaseError = PrrtTimestamp_cmp(referenceTime, virtualTime) + delay;
prrtTimeDifference_t phaseError = referenceTime - virtualTime + delay; //printf("V: %13u, C: %13u, S: %13d, R: %13u, E: %13ld, D: %3u L: %u\n", virtualTime, currentTime, clockSkew, referenceTime, phaseError, delay, clock->lastMeasurement);
//printf("V: %13u, C: %13u, S: %13d, R: %13u, E: %13d, D: %3u L: %u\n", virtualTime, currentTime, clockSkew, referenceTime, phaseError, delay, clock->lastMeasurement);
if(abs(phaseError) > 10000) { if(labs(phaseError) > 10000) {
clock->meanDeviation = 0; clock->meanDeviation = 0;
clock->skew = 0; clock->skew = 0;
clock->lastMeasurement = currentTime; clock->lastMeasurement = currentTime;
clock->virtualTime = currentTime; clock->virtualTime = currentTime;
} }
prrtTimeDifference_t deviationSum = (prrtTimeDifference_t) (phaseError + 3 * clock->meanDeviation); // TODO: why 3? int64_t deviationSum =(phaseError + 3 * clock->meanDeviation); // TODO: why 3?
prrtTimeDifference_t meanDeviation = (prrtTimeDifference_t) (deviationSum / 4); // TODO: find out why??? prrtTimeDifference_t meanDeviation = (prrtTimeDifference_t) (deviationSum / 4); // TODO: find out why???
prrtTimeDifference_t period = diff_abs_ts(currentTime, clock->lastMeasurement); int64_t period = PrrtTimestamp_cmp(currentTime, clock->lastMeasurement);
if(period > 0) { if(period > 0) {
clockSkew = (meanDeviation / period) + 15 * clock->skew; clockSkew = (meanDeviation / period) + 15 * clock->skew;
clockSkew = (prrtTimeDifference_t) (clockSkew / 16); clockSkew = (prrtTimeDifference_t) (clockSkew / 16);
} }
virtualTime = virtualTime + meanDeviation + period * clockSkew / 400; virtualTime = clock->virtualTime + meanDeviation + period * clockSkew / 400;
clock->meanDeviation = meanDeviation; clock->meanDeviation = meanDeviation;
clock->skew = clockSkew; clock->skew = (prrtTimeDifference_t) clockSkew;
clock->lastMeasurement = currentTime; clock->lastMeasurement = currentTime;
clock->virtualTime = virtualTime; clock->virtualTime = virtualTime;
if(abs(phaseError) > 10000) { //printf("V: %13u, C: %13u; S: %13d; M: %13d, P:%13u\n\n", virtualTime, currentTime, clockSkew, meanDeviation, period);
clock->virtualTime = currentTime;
}
//printf("V: %13u, C: %13u; S: %13d; M: %13d, P:%13u\n", virtualTime, currentTime, clockSkew, meanDeviation, period);
return true; return true;
} }
......
...@@ -26,7 +26,7 @@ prrtTimestamp_t PrrtClock_get_current_time_us(void); ...@@ -26,7 +26,7 @@ prrtTimestamp_t PrrtClock_get_current_time_us(void);
prrtTimestamp_t PrrtClock_get_prrt_time_us(PrrtClock *clock); prrtTimestamp_t PrrtClock_get_prrt_time_us(PrrtClock *clock);
bool PrrtClock_update(PrrtClock *clock, prrtTimestamp_t referenceTime, prrtTimedelta_t rtt); bool PrrtClock_update(PrrtClock *clock, prrtTimestamp_t referenceTime, prrtTimedelta_t propagation_delay);
#define diff_abs_ts(timeA, timeB) (prrtTimeDifference_t) abs(((prrtTimeDifference_t) timeA) - ((prrtTimeDifference_t)timeB)) #define diff_abs_ts(timeA, timeB) (prrtTimeDifference_t) abs(((prrtTimeDifference_t) timeA) - ((prrtTimeDifference_t)timeB))
......
This diff is collapsed.
This diff is collapsed.
#ifndef PRRT_DATA_TRANSMITTER_H #ifndef PRRT_DATA_TRANSMITTER_H
#define PRRT_DATA_TRANSMITTER_H #define PRRT_DATA_TRANSMITTER_H
void * send_data_loop(void *ptr); #include "../socket.h"
void * PrrtDataTransmitter_send_data_loop(void *ptr);
void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet);
#endif //PRRT_DATA_TRANSMITTER_H #endif //PRRT_DATA_TRANSMITTER_H
This diff is collapsed.
...@@ -7,28 +7,10 @@ ...@@ -7,28 +7,10 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <netdb.h> #include <netdb.h>
#include "stores/inFlightPacketStore.h" #include "stores/inFlightPacketStore.h"
#include "bbr.h"
#include "types/applicationConstraints.h"
#include "types/channelStateInformation.h" #include "types/channelStateInformation.h"
typedef struct prrtRateSample {
prrtByteCount_t prior_delivered;
prrtTimestamp_t prior_time;
prrtTimedelta_t send_elapsed;
prrtTimedelta_t ack_elapsed;
prrtTimedelta_t interval;
prrtByteCount_t delivered;
bool is_app_limited;
prrtDeliveryRate_t delivery_rate; // Bps
} PrrtRateSample;
typedef struct packetTracking {
prrtByteCount_t pipe;
prrtByteCount_t delivered;
prrtTimestamp_t delivered_time;
prrtTimestamp_t first_sent_time;
prrtByteCount_t app_limited;
} PrrtPacketTracking;
typedef struct prrtReceiver { typedef struct prrtReceiver {
const char *host_name; const char *host_name;
...@@ -36,23 +18,39 @@ typedef struct prrtReceiver { ...@@ -36,23 +18,39 @@ typedef struct prrtReceiver {
struct addrinfo *ai; struct addrinfo *ai;
PrrtChannelStateInformation *csi; PrrtChannelStateInformation *csi;
pthread_mutex_t lock; pthread_mutex_t lock;
pthread_cond_t wait_for_space;
atomic_bool closing;
BBR* bbr;
PrrtInFlightPacketStore *dataPacketStates; PrrtInFlightPacketStore *dataInflightPacketStore;
PrrtInFlightPacketStore *redundancyPacketStates; PrrtInFlightPacketStore *redundancyInflightPacketStore;
PrrtRateSample *rateSample; PrrtRateSample *rateSample;
PrrtPacketTracking *packetTracking; PrrtPacketTracking *packetTracking;
} PrrtReceiver; } PrrtReceiver;
PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port); PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port, prrtByteCount_t maximum_payload_size);
bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNumber_t seqnum, uint8_t packetType,
prrtTimestamp_t receiveTime);
void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *packet, prrtTimestamp_t sentTime); void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *packet, prrtTimestamp_t sentTime);
prrtByteCount_t PrrtReceiver_get_pipe(PrrtReceiver *recv);
void PrrtReceiver_rto_check(PrrtReceiver *recv, PrrtApplicationConstraints *constraints);
void PrrtReceiver_on_application_write(PrrtReceiver *receiver, uint32_t send_queue_length, prrtSequenceNumber_t sequenceNumber);
void PrrtReceiver_on_ack(PrrtReceiver *receiver, PrrtPacketFeedbackPayload *feedbackPayload, prrtTimestamp_t receiveTime,
prrtTimedelta_t rtt, PrrtApplicationConstraints *constraints);
bool PrrtReceiver_wait_for_space(PrrtReceiver *receiver, prrtByteCount_t maximum_payload_size,
PrrtApplicationConstraints *pConstraints);
uint32_t PrrtReceiver_get_retransmission_delay(PrrtReceiver *socket, PrrtApplicationConstraints *applicationConstraints);
void PrrtReceiver_on_application_write(PrrtReceiver* receiver); prrtByteCount_t PrrtReceiver_get_space(PrrtReceiver *receiver);
double PrrtReceiver_get_BBR_pacingRate(PrrtReceiver *receiver);
double PrrtReceiver_get_BBR_pacingGain(PrrtReceiver *receiver);
uint32_t PrrtReceiver_get_BBR_state(PrrtReceiver *receiver);
prrtDeliveryRate_t PrrtReceiver_get_BBR_btlDr(PrrtReceiver *receiver);
void PrrtReceiver_interrupt(PrrtReceiver *receiver);
bool PrrtReceiver_destroy(PrrtReceiver *receiver); bool PrrtReceiver_destroy(PrrtReceiver *receiver);
#endif //PRRT_RECEIVER_H #endif //PRRT_RECEIVER_H
This diff is collapsed.
#ifndef PRRT_SOCKET_H #ifndef PRRT_SOCKET_H
#define PRRT_SOCKET_H #define PRRT_SOCKET_H
#include "../defines.h" #include "../defines.h"
#include "../util/list.h" #include "../util/list.h"
#include "../util/pipe.h" #include "../util/pipe.h"
...@@ -9,6 +10,8 @@ ...@@ -9,6 +10,8 @@
#include "stores/dataPacketStore.h" #include "stores/dataPacketStore.h"
#include "stores/deliveredPacketTable.h" #include "stores/deliveredPacketTable.h"
#include "stores/packetTimeoutTable.h" #include "stores/packetTimeoutTable.h"
#include "stores/pace.h"
#include "stores/paceFilter.h"
#include "stores/receptionTable.h" #include "stores/receptionTable.h"
#include "stores/repairBlockStore.h" #include "stores/repairBlockStore.h"
#include "stores/packetDeliveryStore.h" #include "stores/packetDeliveryStore.h"
...@@ -17,6 +20,7 @@ ...@@ -17,6 +20,7 @@
#include "types/lossStatistics.h" #include "types/lossStatistics.h"
#include "types/packet.h" #include "types/packet.h"
#include "clock.h" #include "clock.h"
#include "timer.h"
#include "xlap.h" #include "xlap.h"
#include "receiver.h" #include "receiver.h"
...@@ -31,8 +35,12 @@ typedef struct prrtSocket { ...@@ -31,8 +35,12 @@ typedef struct prrtSocket {
struct sockaddr_in *address; struct sockaddr_in *address;
bool isBound; bool isBound;
bool withTimestamp;
bool pacingEnabled;
PrrtClock clock; PrrtClock clock;
PrrtBlock* receiveBlock;
pthread_t sendDataThread; pthread_t sendDataThread;
Pipe *sendDataQueue; Pipe *sendDataQueue;
...@@ -50,6 +58,8 @@ typedef struct prrtSocket { ...@@ -50,6 +58,8 @@ typedef struct prrtSocket {
atomic_bool closing; atomic_bool closing;
prrtTimestamp_t nextSendTime;
prrtSequenceNumber_t packetsCount; prrtSequenceNumber_t packetsCount;
prrtSequenceNumber_t sequenceNumberSource; prrtSequenceNumber_t sequenceNumberSource;
prrtSequenceNumber_t sequenceNumberRepetition; prrtSequenceNumber_t sequenceNumberRepetition;
...@@ -69,6 +79,15 @@ typedef struct prrtSocket { ...@@ -69,6 +79,15 @@ typedef struct prrtSocket {
PrrtCodingConfiguration *codingParameters; PrrtCodingConfiguration *codingParameters;
PrrtCoder *coder; PrrtCoder *coder;
// Pacing
PrrtPace* appSendPace;
PrrtPace* prrtTransmitPace;
PrrtPace* prrtReceivePace;
PrrtPace* appDeliverPace;
prrtTimedelta_t send_peer_btl_pace;
prrtTimedelta_t recv_peer_btl_pace;
_Atomic (XlapTimestampTable *) tstable[2]; _Atomic (XlapTimestampTable *) tstable[2];
pthread_attr_t *sendDataThreadAttr; pthread_attr_t *sendDataThreadAttr;
...@@ -77,8 +96,12 @@ typedef struct prrtSocket { ...@@ -77,8 +96,12 @@ typedef struct prrtSocket {
atomic_bool isHardwareTimestamping; atomic_bool isHardwareTimestamping;
char *interfaceName; char *interfaceName;
PrrtChannelStateInformation* senderChannelStateInformation;
atomic_bool isThreadPinning; atomic_bool isThreadPinning;
prrtByteCount_t maximum_payload_size; prrtByteCount_t maximum_payload_size;
PrrtTimer *retransmissionTimer;
} PrrtSocket; } PrrtSocket;
...@@ -88,7 +111,7 @@ bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interfa ...@@ -88,7 +111,7 @@ bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interfa
bool PrrtSocket_enable_thread_pinning(PrrtSocket *s); bool PrrtSocket_enable_thread_pinning(PrrtSocket *s);
bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port); int PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port);
bool PrrtSocket_set_sock_opt(PrrtSocket *s, const char *name, const uint32_t value); bool PrrtSocket_set_sock_opt(PrrtSocket *s, const char *name, const uint32_t value);
...@@ -106,7 +129,9 @@ int PrrtSocket_close(PrrtSocket *s); ...@@ -106,7 +129,9 @@ int PrrtSocket_close(PrrtSocket *s);
bool PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port); bool PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port);
int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, size_t data_len); int PrrtSocket_send_async(PrrtSocket *s, const uint8_t *data, const size_t data_len);
int PrrtSocket_send_sync(PrrtSocket *s, const uint8_t *data, size_t data_len);
bool PrrtSocket_pace(PrrtSocket *sock_ptr, bool prepace);
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr); int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr);
...@@ -118,7 +143,6 @@ int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, struct sockaddr ...@@ -118,7 +143,6 @@ int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, struct sockaddr
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us); int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us);
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us, struct timespec* deadline); int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us, struct timespec* deadline);
bool PrrtSocket_cleanup(PrrtSocket *s); bool PrrtSocket_cleanup(PrrtSocket *s);
bool PrrtSocket_closing(PrrtSocket *s); bool PrrtSocket_closing(PrrtSocket *s);
...@@ -126,11 +150,23 @@ bool PrrtSocket_closing(PrrtSocket *s); ...@@ -126,11 +150,23 @@ bool PrrtSocket_closing(PrrtSocket *s);
bool PrrtSocket_uses_thread_pinning(PrrtSocket *s); bool PrrtSocket_uses_thread_pinning(PrrtSocket *s);
uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *s); uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *s);
prrtPacketLossRate_t PrrtSocket_get_plr_fwd(PrrtSocket *s); prrtPacketLossRate_t PrrtSocket_get_plr_fwd(PrrtSocket *s);
prrtDeliveryRate_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *s); prrtDeliveryRate_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *s);
prrtDeliveryRate_t PrrtSocket_get_btlbw_fwd(PrrtSocket *s); prrtDeliveryRate_t PrrtSocket_get_btlbw_fwd(PrrtSocket *s);
prrtDeliveryRate_t PrrtSocket_get_btlbw_back(PrrtSocket *s); prrtDeliveryRate_t PrrtSocket_get_btlbw_back(PrrtSocket *s);
bool PrrtSocket_get_app_limited(PrrtSocket *s); uint64_t PrrtSocket_get_full_bw(PrrtSocket *s);
bool PrrtSocket_get_filled_pipe(PrrtSocket *s);
uint32_t PrrtSocket_get_bbr_state(PrrtSocket *s);
uint32_t PrrtSocket_get_bbr_app_limited(PrrtSocket *s);
bool PrrtSocket_get_bbr_is_app_limited(PrrtSocket *s);
uint32_t PrrtSocket_get_cycle_index(PrrtSocket *s);
float PrrtSocket_get_pacing_gain(PrrtSocket *s);
uint32_t PrrtSocket_get_cwnd(PrrtSocket *s);
uint32_t PrrtSocket_get_inflight(PrrtSocket *s);
uint32_t PrrtSocket_get_pacing_rate(PrrtSocket *s);
prrtByteCount_t PrrtSocket_get_send_quantum(PrrtSocket *s);
prrtByteCount_t PrrtSocket_get_pipe(PrrtSocket *s);
prrtByteCount_t PrrtSocket_get_delivered(PrrtSocket *s);
bool PrrtSocket_get_bbr_round_start(PrrtSocket *s);
#endif // PRRT_SOCKET_H #endif // PRRT_SOCKET_H
...@@ -80,9 +80,11 @@ void PrrtDataPacketStore_remove_range(PrrtDataPacketStore *store, List *res, prr ...@@ -80,9 +80,11 @@ void PrrtDataPacketStore_remove_range(PrrtDataPacketStore *store, List *res, prr
bool PrrtDataStore_insert(PrrtDataPacketStore *store, PrrtPacket *packet) bool PrrtDataStore_insert(PrrtDataPacketStore *store, PrrtPacket *packet)
{ {
check(pthread_mutex_lock(&store->lock) == EXIT_SUCCESS, "Lock failed."); check(pthread_mutex_lock(&store->lock) == EXIT_SUCCESS, "Lock failed.");
debug(DEBUG_BLOCK, "PrrtDataStore_insert: %d", packet->sequenceNumber);
if(BPTree_get(store->dataStore, packet->sequenceNumber) == NULL) { if(BPTree_get(store->dataStore, packet->sequenceNumber) == NULL) {
store->dataStore = BPTree_insert(store->dataStore, packet->sequenceNumber, packet); store->dataStore = BPTree_insert(store->dataStore, packet->sequenceNumber, packet);
check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed."); check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed.");
debug(DEBUG_BLOCK, "PrrtDataStore_size: %d", PrrtDataPacketStore_size(store));
return true; return true;
} }
check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed."); check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed.");
......
#include <stdint.h> #include <stdint.h>
#include "inFlightPacketStore.h" #include "../clock.h"
#include "../../util/common.h" #include "../../util/common.h"
#include "../../util/dbg.h" #include "../../util/dbg.h"
#include "inFlightPacketStore.h"
PrrtInFlightPacketStore* PrrtInFlightPacketStore_create(void) PrrtInFlightPacketStore* PrrtInFlightPacketStore_create(void)
...@@ -9,7 +10,8 @@ PrrtInFlightPacketStore* PrrtInFlightPacketStore_create(void) ...@@ -9,7 +10,8 @@ PrrtInFlightPacketStore* PrrtInFlightPacketStore_create(void)
PrrtInFlightPacketStore* packetStore = calloc(1, sizeof(PrrtInFlightPacketStore)); PrrtInFlightPacketStore* packetStore = calloc(1, sizeof(PrrtInFlightPacketStore));
check_mem(packetStore); check_mem(packetStore);
packetStore->outstandingPackets = NULL; packetStore->outstandingPackets_by_seqno = NULL;
packetStore->outstandingPackets_by_senttime = NULL;
return packetStore; return packetStore;
...@@ -20,34 +22,63 @@ PrrtInFlightPacketStore* PrrtInFlightPacketStore_create(void) ...@@ -20,34 +22,63 @@ PrrtInFlightPacketStore* PrrtInFlightPacketStore_create(void)
void PrrtInFlightPacketStore_add_outstanding_packet(PrrtInFlightPacketStore *packetStore, PrrtPacket *packet) void PrrtInFlightPacketStore_add_outstanding_packet(PrrtInFlightPacketStore *packetStore, PrrtPacket *packet)
{ {
packetStore->outstandingPackets = BPTree_insert(packetStore->outstandingPackets, packet->sequenceNumber, packetStore->outstandingPackets_by_seqno = BPTree_insert(packetStore->outstandingPackets_by_seqno, packet->sequenceNumber,
packet); packet);
packetStore->packetQueueSize++; packetStore->outstandingPackets_by_senttime = BPTree_insert(packetStore->outstandingPackets_by_senttime, packet->sent_time,
packet);
}
void* PrrtInFlightPacketStore_get_packet_by_seqno(PrrtInFlightPacketStore *packetStore, BPTreeKey_t seqNum)
{
return BPTree_get(packetStore->outstandingPackets_by_seqno, seqNum);
} }
void* PrrtInFlightPacketStore_get_packet(PrrtInFlightPacketStore *packetStore, BPTreeKey_t seqNum) void* PrrtInFlightPacketStore_get_packet_by_senttime(PrrtInFlightPacketStore *packetStore, prrtTimestamp_t senttime)
{ {
return BPTree_get(packetStore->outstandingPackets, seqNum); return BPTree_get(packetStore->outstandingPackets_by_senttime, senttime);
} }
void PrrtInFlightPacketStore_remove_outstanding_packet(PrrtInFlightPacketStore *packetStore, BPTreeKey_t seqNum) void PrrtInFlightPacketStore_remove_outstanding_packet_by_seqno(PrrtInFlightPacketStore *packetStore,
BPTreeKey_t seqNum)
{ {
PrrtPacket* packet = (PrrtPacket*) BPTree_get(packetStore->outstandingPackets, seqNum); PrrtPacket* outstanding_packet = (PrrtPacket*) BPTree_get(packetStore->outstandingPackets_by_seqno, seqNum);
if (packet != NULL) { if(outstanding_packet != NULL) {
packetStore->outstandingPackets = BPTree_delete(packetStore->outstandingPackets, seqNum); packetStore->outstandingPackets_by_seqno = BPTree_delete(packetStore->outstandingPackets_by_seqno, seqNum);
packetStore->packetQueueSize--; packetStore->outstandingPackets_by_senttime = BPTree_delete(packetStore->outstandingPackets_by_senttime, outstanding_packet->sent_time);
PrrtPacket_destroy(outstanding_packet);
}
}
prrtByteCount_t PrrtInFlightPacketStore_clear_before(PrrtInFlightPacketStore *packetStore,
prrtTimestamp_t time) {
prrtByteCount_t lostBytes = 0;
List *packetStateList = List_create();
if(time >= HALF_TIMESTAMP) {
BPTree_get_range(packetStore->outstandingPackets_by_senttime, packetStateList, time - HALF_TIMESTAMP, time - 1);
} else {
BPTree_get_range(packetStore->outstandingPackets_by_senttime, packetStateList, 0, time - 1);
BPTree_get_range(packetStore->outstandingPackets_by_senttime, packetStateList, time + HALF_TIMESTAMP, MAX_TIMESTAMP);
}
while (List_count(packetStateList) > 0) {
PrrtPacket *packet = List_shift(packetStateList);
lostBytes += packet->payloadLength;
packetStore->outstandingPackets_by_seqno = BPTree_delete(packetStore->outstandingPackets_by_seqno, packet->sequenceNumber);
packetStore->outstandingPackets_by_senttime = BPTree_delete(packetStore->outstandingPackets_by_senttime, packet->sent_time);
PrrtPacket_destroy(packet); PrrtPacket_destroy(packet);
} }
List_destroy(packetStateList);
return lostBytes;
} }
bool PrrtInFlightPacketStore_destroy(PrrtInFlightPacketStore *packetStore) bool PrrtInFlightPacketStore_destroy(PrrtInFlightPacketStore *packetStore)
{ {
if (packetStore->outstandingPackets != NULL) { if (packetStore->outstandingPackets_by_seqno != NULL) {
List *packetStateList = List_create(); List *packetStateList = List_create();
BPTree_get_range(packetStore->outstandingPackets, packetStateList, 0, SEQNO_SPACE - 1); BPTree_get_range(packetStore->outstandingPackets_by_seqno, packetStateList, 0, SEQNO_SPACE - 1);
while (List_count(packetStateList) > 0) { while (List_count(packetStateList) > 0) {
PrrtPacket *packet = List_shift(packetStateList); PrrtPacket *packet = List_shift(packetStateList);
packetStore->outstandingPackets = BPTree_delete(packetStore->outstandingPackets, packet->sequenceNumber); packetStore->outstandingPackets_by_seqno = BPTree_delete(packetStore->outstandingPackets_by_seqno, packet->sequenceNumber);
packetStore->outstandingPackets_by_senttime = BPTree_delete(packetStore->outstandingPackets_by_senttime, packet->sent_time);
PrrtPacket_destroy(packet); PrrtPacket_destroy(packet);
} }
...@@ -56,8 +87,3 @@ bool PrrtInFlightPacketStore_destroy(PrrtInFlightPacketStore *packetStore) ...@@ -56,8 +87,3 @@ bool PrrtInFlightPacketStore_destroy(PrrtInFlightPacketStore *packetStore)
free(packetStore); free(packetStore);
return true; return true;
} }
uint32_t PrrtInFlightPacketStore_get_queue_size(PrrtInFlightPacketStore *packetStore)
{
return packetStore->packetQueueSize;
}
\ No newline at end of file
...@@ -5,16 +5,19 @@ ...@@ -5,16 +5,19 @@
#include "../types/packet.h" #include "../types/packet.h"
typedef struct inFlightPackets { typedef struct inFlightPackets {
BPTreeNode* outstandingPackets; BPTreeNode* outstandingPackets_by_seqno;
uint32_t packetQueueSize; BPTreeNode* outstandingPackets_by_senttime;
} PrrtInFlightPacketStore; } PrrtInFlightPacketStore;
PrrtInFlightPacketStore* PrrtInFlightPacketStore_create(void); PrrtInFlightPacketStore* PrrtInFlightPacketStore_create(void);
void PrrtInFlightPacketStore_add_outstanding_packet(PrrtInFlightPacketStore *packetStore, PrrtPacket *packet); void PrrtInFlightPacketStore_add_outstanding_packet(PrrtInFlightPacketStore *packetStore, PrrtPacket *packet);
void PrrtInFlightPacketStore_remove_outstanding_packet(PrrtInFlightPacketStore *packetStore, BPTreeKey_t seqNum); void PrrtInFlightPacketStore_remove_outstanding_packet_by_seqno(PrrtInFlightPacketStore *packetStore,
BPTreeKey_t seqNum);
bool PrrtInFlightPacketStore_destroy(PrrtInFlightPacketStore *packetStore); bool PrrtInFlightPacketStore_destroy(PrrtInFlightPacketStore *packetStore);
uint32_t PrrtInFlightPacketStore_get_queue_size(PrrtInFlightPacketStore *packetStore); void* PrrtInFlightPacketStore_get_packet_by_seqno(PrrtInFlightPacketStore *packetStore, BPTreeKey_t seqNum);
void* PrrtInFlightPacketStore_get_packet(PrrtInFlightPacketStore *packetStore, BPTreeKey_t seqNum); PrrtPacket * PrrtInFlightPacketStore_get_first_packet_before_seqno(PrrtInFlightPacketStore *packetStore,
prrtSequenceNumber_t seqNum);
prrtByteCount_t PrrtInFlightPacketStore_clear_before(PrrtInFlightPacketStore *packetStore, prrtTimestamp_t time);
......
#include "pace.h"
#include "../clock.h"
#include "../../util/common.h"
#include "../../util/dbg.h"
#include "../../util/time.h"
#include <math.h>
PrrtPace* PrrtPace_create(void) {
PrrtPace* pace = (PrrtPace*) calloc(1, sizeof(PrrtPace));
check_mem(pace);
prrtTimedelta_t filterLength_us = 2 * 1000 * 1000; // 2 seconds
pace->internalPace = PrrtPaceFilter_create(filterLength_us, FILTER_TYPE_MAX);
pace->dependentPace = PrrtPaceFilter_create(filterLength_us, FILTER_TYPE_MAX);
pace->externalPace = PrrtPaceFilter_create(filterLength_us, FILTER_TYPE_MAX);
pace->totalPauseDuration_ns = 0;
pace->initialized = false;
pace->firstRoundDone = false;
clock_gettime(CLOCK_REALTIME, &pace->lastStartTimestamp);
clock_gettime(CLOCK_REALTIME, &pace->lastEndTimestamp);
clock_gettime(CLOCK_REALTIME, &pace->lastPauseTimestamp);
return pace;
error:
PERROR("Out of memory%s.", "");
return NULL;
}
bool PrrtPace_destroy(PrrtPace* pace) {
if(pace->internalPace != NULL) {
check(PrrtPaceFilter_destroy(pace->internalPace), "Cannot destroy internalPace.")
}
if(pace->dependentPace != NULL) {
check(PrrtPaceFilter_destroy(pace->dependentPace), "Cannot destroy dependentPace.")
}
if(pace->externalPace != NULL) {
check(PrrtPaceFilter_destroy(pace->externalPace), "Cannot destroy externalPace.")
}
free(pace);
return true;
error:
return false;
}
prrtTimedelta_t PrrtPace_get_internal(PrrtPace* pace) {
return PrrtPaceFilter_get(pace->internalPace);
}
prrtTimedelta_t PrrtPace_get_external(PrrtPace* pace) {
return PrrtPaceFilter_get(pace->externalPace);
}
prrtTimedelta_t PrrtPace_get_dependent(PrrtPace* pace) {
return PrrtPaceFilter_get(pace->dependentPace);
}
prrtTimedelta_t PrrtPace_get_total(PrrtPace* pace) {
return PrrtPaceFilter_get(pace->internalPace) + PrrtPaceFilter_get(pace->externalPace);
}
prrtTimedelta_t PrrtPace_get_effective(PrrtPace* pace) {
return (prrtTimedelta_t) MAX(0, ((int64_t) PrrtPaceFilter_get(pace->externalPace)) + ((int64_t)PrrtPaceFilter_get(pace->internalPace)) - ((int64_t)PrrtPaceFilter_get(pace->dependentPace)));
}
void PrrtPace_track_start(PrrtPace* pace) {
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
if (pace->initialized) {
long long int internalDelta_ns = timedelta(&pace->lastEndTimestamp, &pace->lastStartTimestamp);
long long int externalDelta_ns = timedelta(&now, &pace->lastEndTimestamp);
if (pace->firstRoundDone) {
// make sure internal >= dependent
PrrtPaceFilter_update(pace->internalPace, (prrtTimedelta_t) MAX(0, round(((double) MAX(internalDelta_ns, pace->totalPauseDuration_ns)) / 1000)));
PrrtPaceFilter_update(pace->externalPace, (prrtTimedelta_t) MAX(0, round(((double) externalDelta_ns) / 1000)));
PrrtPaceFilter_update(pace->dependentPace, (prrtTimedelta_t) MAX(0, round(((double) pace->totalPauseDuration_ns) / 1000)));
}
pace->firstRoundDone = true;
}
pace->totalPauseDuration_ns = 0;
pace->lastStartTimestamp = now;
pace->initialized = true;
}
void PrrtPace_track_end(PrrtPace* pace) {
clock_gettime(CLOCK_REALTIME, &pace->lastEndTimestamp);
}
void PrrtPace_track_pause(PrrtPace* pace) {
clock_gettime(CLOCK_REALTIME, &pace->lastPauseTimestamp);
}
void PrrtPace_track_resume(PrrtPace* pace) {
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
long long int delta = timedelta(&now, &pace->lastPauseTimestamp);
clock_gettime(CLOCK_REALTIME, &pace->lastPauseTimestamp);
pace->totalPauseDuration_ns += delta;
}
#ifndef PRRT_PACE_H
#define PRRT_PACE_H
#include "paceFilter.h"
typedef struct prrtPace {
PrrtPaceFilter* internalPace;
PrrtPaceFilter* externalPace;
PrrtPaceFilter* dependentPace;
struct timespec lastStartTimestamp;
struct timespec lastEndTimestamp;
struct timespec lastPauseTimestamp;
prrtTimedelta_t totalPauseDuration_ns;
bool initialized;
bool firstRoundDone;
} PrrtPace;
PrrtPace* PrrtPace_create(void);
bool PrrtPace_destroy(PrrtPace* pace);
prrtTimedelta_t PrrtPace_get_internal(PrrtPace* pace);
prrtTimedelta_t PrrtPace_get_external(PrrtPace* pace);
prrtTimedelta_t PrrtPace_get_dependent(PrrtPace* pace);
prrtTimedelta_t PrrtPace_get_total(PrrtPace* pace);
prrtTimedelta_t PrrtPace_get_effective(PrrtPace* pace);
void PrrtPace_track_start(PrrtPace* pace);
void PrrtPace_track_end(PrrtPace* pace);