...
 
Commits (261)
......@@ -9,3 +9,4 @@ tests/__pycache__/
MANIFEST
prrt.cpython*.so
prrt.so
.ipynb_checkpoints/
......@@ -22,8 +22,8 @@ build:prrt:
- which cmake
- which gcc
- which g++
- pip3 list | grep Cython
- pip3 list | grep numpy
- pip3 list --format=legacy | grep Cython
- pip3 list --format=legacy | grep numpy
- CC=gcc-5 CXX=g++-5 cmake . -DPRRT_TESTS=1
- make
......@@ -38,6 +38,17 @@ build:container:
- docker push $CI_REGISTRY_IMAGE:$DOCKER_TAG
- docker rmi $CI_REGISTRY_IMAGE:$DOCKER_TAG
build:container_tcp:
stage: build
tags:
- docker
script:
- export DOCKER_TAG=$(echo "$CI_BUILD_REF_NAME""_tcp" | sed 's#/#_#' | sed 's#^master$#latest#')
- docker build -t $CI_REGISTRY_IMAGE:$DOCKER_TAG --build-arg http_proxy=http://www-proxy.uni-saarland.de:3128 -f docker/Dockerfile_tcp .
- docker login -u gitlab-ci-token -p $CI_BUILD_TOKEN $CI_REGISTRY
- docker push $CI_REGISTRY_IMAGE:$DOCKER_TAG
- docker rmi $CI_REGISTRY_IMAGE:$DOCKER_TAG
test:prrt_mem:
stage: test
dependencies:
......@@ -94,4 +105,4 @@ deploy:profile:
paths:
- gprof-send.txt
- gprof-recv.txt
expire_in: 30 days
\ No newline at end of file
expire_in: 30 days
......@@ -36,4 +36,4 @@ if(PRRT_TESTS)
endif()
add_custom_target(perftest COMMAND python3 tests/eval.py)
add_custom_target(memtest COMMAND bash ./memtest.sh DEPENDS sender receiver)
add_custom_target(memtest COMMAND bash ./memtest.sh DEPENDS sender receiver)
\ No newline at end of file
......@@ -5,11 +5,13 @@
## Features
* Hybrid error control (FEC + ARQ) using systematic Vandermonde codes
* Congestion control and pacing using a variant of [BBR](https://groups.google.com/forum/#!forum/bbr-dev)
* Clock synchronization between sending stack and receiving stack
* Applications can specify packet-level expiration times
* Different receive modes for ASAP and time-synchronized operation
* Passive measurement of propagation delay, bottleneck data rate and packet loss rate
* Packet-level timing analysis using [X-Lap](http://xlap.larn.systems)
* Wireshark dissector written in Lua
* [Hardware timestamping support](https://git.nt.uni-saarland.de/LARN/PRRT/wikis/hardware-timestamping)
## Installation
......@@ -29,9 +31,9 @@ port = int(sys.argv[1])
s = prrt.PrrtSocket(port=port)
while True:
d = s.recv()
d = s.recv().decode("utf8")
if d != "Close":
print d
print(d)
else:
break
```
......@@ -44,25 +46,27 @@ import prrt
host = sys.argv[1]
port = int(sys.argv[2])
localport = int(sys.argv[3])
s = prrt.PrrtSocket(port=port)
s = prrt.PrrtSocket(port=localport)
s.connect(host, port)
for i in range(10):
s.send("Packet {}".format(i))
s.send("Close")
s.send("Packet {}".format(i).encode("utf8"))
s.send("Close".encode("utf8"))
```
Start the receiver by:
```bash
python receiver.py 5000
python3 receiver.py 5000
```
In a separate terminal, run:
```bash
python sender.py 127.0.0.1 5000
python3 sender.py 127.0.0.1 5000 6000
```
This should generate the following output in the receiver console:
......@@ -84,13 +88,13 @@ Packet 9
If you find PRRT useful and incorporate it in your works, we are very happy to hear about it. Please also consider to cite us like this:
```bibtex
@misc{sic2018prrt,
author = {Schmidt, Andreas},
title = {PRRT: Predictably Reliable Real-time Transport},
howpublished={Web page},
url = {http://prrt.larn.systems},
year = {2018}
}
@misc{sic2018prrt,
author = {Schmidt, Andreas},
title = {PRRT: Predictably Reliable Real-time Transport},
howpublished={Web page},
url = {http://prrt.larn.systems},
year = {2018}
}
```
## License
......
......@@ -35,6 +35,7 @@ local pf_red_k = ProtoField.uint8("prrt.redundancy.k", "k")
local pf_fb = ProtoField.new("Feedback", "prrt.feedback", ftypes.BYTES, base.NONE)
local pf_fb_groupRTT = ProtoField.uint32("prrt.feedback.groupRTT", "Group RTT")
local pf_fb_ftt = ProtoField.uint32("prrt.feedback.FTT", "FTT")
local pf_fb_btlPace = ProtoField.uint32("prrt.feedback.btlPace", "Bottleneck pace")
local pf_fb_erasurecount = ProtoField.uint16("prrt.feedback.erasureCount", "Erasure count")
local pf_fb_packetcount = ProtoField.uint16("prrt.feedback.packetCount", "Packet count")
local pf_fb_gaplength = ProtoField.uint16("prrt.feedback.gapLength", "Gap length")
......@@ -66,6 +67,7 @@ prrt_proto.fields = {
pf_fb,
pf_fb_groupRTT,
pf_fb_ftt,
pf_fb_btlPace,
pf_fb_erasurecount,
pf_fb_packetcount,
pf_fb_gaplength,
......@@ -89,9 +91,16 @@ local ex_type = Field.new("prrt.type")
local function getType() return ex_type()() end
local function getTypeName() return prrtPacketTypeNames[getType()] end
local ex_index = Field.new("prrt.index")
local function getIndex() return ex_index()() end
local ex_data_length = Field.new("prrt.data.length")
local function getDataLength() return ex_data_length()() end
local ex_red_baseseqno = Field.new("prrt.redundancy.baseSequenceNumber")
local function getRedBaseSeqNo() return ex_red_baseseqno()() end
local ex_red_n = Field.new("prrt.redundancy.n")
local function getRedN() return ex_red_n()() end
......@@ -109,7 +118,7 @@ local function dissect_data(buffer, pinfo, root)
tree:add(pf_data_groupRTprop, buffer:range(8,4))
tree:add(pf_data_packettimeout, buffer:range(12,4))
local label = "DATA Len=" .. getDataLength()
local label = "[D] Idx=" .. getIndex() .. " Len=" .. getDataLength()
tree:set_text(label)
pinfo.cols.info:set(label)
end
......@@ -121,7 +130,7 @@ local function dissect_redundancy(buffer, pinfo, root)
tree:add(pf_red_n, buffer:range(6,1))
tree:add(pf_red_k, buffer:range(7,1))
local label = "REDUNDANCY n=" .. getRedN() .. " k=" .. getRedK()
local label = "[R] Idx=" .. getIndex() .. " b=" .. getRedBaseSeqNo() .. " n=" .. getRedN() .. " k=" .. getRedK()
tree:set_text(label)
pinfo.cols.info:set(label)
end
......@@ -130,16 +139,17 @@ local function dissect_feedback(buffer, pinfo, root)
local tree = root:add(pf_fb, buffer:range(0))
tree:add(pf_fb_groupRTT, buffer:range(0,4))
tree:add(pf_fb_ftt, buffer:range(4,4))
tree:add(pf_fb_erasurecount, buffer:range(8,2))
tree:add(pf_fb_packetcount, buffer:range(10,2))
tree:add(pf_fb_gaplength, buffer:range(12,2))
tree:add(pf_fb_gapcount, buffer:range(14,2))
tree:add(pf_fb_burstlength, buffer:range(16,2))
tree:add(pf_fb_burstcount, buffer:range(18,2))
tree:add(pf_fb_acktype, buffer:range(20,1))
tree:add(pf_fb_ackSeqN, buffer:range(21, 2))
local label = "FEEDBACK"
tree:add(pf_fb_btlPace, buffer:range(8,4))
tree:add(pf_fb_erasurecount, buffer:range(12,2))
tree:add(pf_fb_packetcount, buffer:range(14,2))
tree:add(pf_fb_gaplength, buffer:range(16,2))
tree:add(pf_fb_gapcount, buffer:range(18,2))
tree:add(pf_fb_burstlength, buffer:range(20,2))
tree:add(pf_fb_burstcount, buffer:range(22,2))
tree:add(pf_fb_acktype, buffer:range(24,1))
tree:add(pf_fb_ackSeqN, buffer:range(25, 2))
local label = "[F]"
tree:set_text(label)
pinfo.cols.info:set(label)
end
......
FROM gcc:5
FROM gcc:8
MAINTAINER Andreas Schmidt <schmidt@nt.uni-saarland.de>
RUN apt-get update \
&& apt-get upgrade -y \
&& apt-get install -y cmake
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update && apt-get install --yes --force-yes \
bc \
cmake \
traceroute \
tshark
COPY CMakeLists.txt /prrt/
COPY prrt /prrt/prrt
COPY tests /prrt/tests
COPY docker/entrypoint.sh /
COPY docker/sysctl.conf /etc/sysctl.d/01-disable-ipv6.conf
WORKDIR /prrt
RUN cmake . \
&& make
ENV PATH /prrt/bin:$PATH
WORKDIR /prrt/bin
ENV PATH /prrt:$PATH
VOLUME /output
......
FROM gcc:8
MAINTAINER Andreas Schmidt <schmidt@nt.uni-saarland.de>
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update && apt-get install --yes --force-yes \
bc \
cmake \
traceroute \
tshark
COPY CMakeLists.txt /prrt/
COPY prrt /prrt/prrt
COPY tests /prrt/tests
COPY docker/entrypoint.sh /
COPY docker/sysctl.conf /etc/sysctl.d/01-disable-ipv6.conf
WORKDIR /prrt
RUN cmake -DTCP=1 . \
&& make
ENV PATH /prrt:$PATH
VOLUME /output
ENTRYPOINT ["/entrypoint.sh"]
#!/bin/bash
#!/bin/sh
dev=eth0
command=$1
shift
if [[ "$command" == "sender" || "$command" == "receiver" ]]; then
if [[ "$command" == "sender" || "$command" == "receiver" || "$command" == "time-sender" || "$command" == "time-receiver" ]]; then
:
else
echo "Command should be either sender or receiver."
exit 0;
fi
TARGET="127.0.0.1"
OUTPUT="/dev/null"
NETEM=()
PRRT=()
while [[ $# -gt 0 ]]
......@@ -19,7 +21,24 @@ do
key="$1"
case $key in
-t|--target|-p|--port|-r|--rounds)
-t|--target)
if [[ "$command" == "sender" || "$command" == "time-sender" ]]; then
PRRT+=("$1 $2")
fi
TARGET=("$2")
shift
shift
;;
-w|--wireshark)
OUTPUT=("$2")
shift
shift
;;
-T|--threadpinning|-U)
PRRT+=("$1")
shift
;;
-p|--port|-r|--rounds|-s|--size|-R|--rcvbuf|-S|--sndbuf|-o|--output|-a|--appdelay|-j|--appjitter)
PRRT+=("$1 $2")
shift
shift
......@@ -34,8 +53,34 @@ done
PRRT_PARAMS="${PRRT[@]}"
NETEM_PARAMS="${NETEM[@]}"
echo "Starting Wireshark."
tshark -i eth0 -w $OUTPUT.pcap &
TSHARK_PID=$!
sleep 2
start=$(date +%s.%N);
echo "Checking reachability of $TARGET."
until ping -c1 $TARGET &>/dev/null; do sleep 1; done
dur=$(echo "$(date +%s.%N) - $start" | bc);
printf "Reachable after %.6f seconds\n" $dur
if [[ "$command" == "sender" || "$command" == "time-sender" ]]; then
echo "Delaying $command"
sleep 10
fi
start=$(date +%s.%N);
echo "Running PRRT with command: \"$command $PRRT_PARAMS\" and link parameters: \"$NETEM_PARAMS\""
tc qdisc add dev $dev root netem $NETEM_PARAMS
/prrt/$command $PRRT_PARAMS -o /output/log.csv
tc qdisc del dev $dev root
#tc qdisc add dev $dev root netem $NETEM_PARAMS
trap 'echo "Caught SIGINT."; echo "$(ps -a)"; killall -SIGINT $command' INT
LOG=$(/prrt/$command $PRRT_PARAMS 2>&1)
printf "$LOG\n"
echo "Exit status: $?"
dur=$(echo "$(date +%s.%N) - $start" | bc);
printf "Done after %.6f seconds\n" $dur
#tc qdisc del dev $dev root
kill $TSHARK_PID
net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 1
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pd.read_csv(\"eval.csv\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
import sys
sys.path.insert(0, "./build")
import tests.perf as perf
def run_setup_and_report(setup):
results = setup.run()
print("Setup:\n ", setup)
print("Results:\n " + str(results).replace("\n","\n "))
results.export()
if __name__ == "__main__":
# Packet Count Works: 2^15; Segfault at: 2^16
# TODO: support multiple tests via proper socket termination
setups = [
perf.TestSetup(packets=2**17,delay=1,loss=0,reorder=0,duplicate=0)
]
for setup in setups:
run_setup_and_report(setup)
......@@ -3,7 +3,7 @@ import prrt
port = int(sys.argv[1])
s = prrt.PrrtSocket(("127.0.0.1", port))
s = prrt.PrrtSocket(("0.0.0.0", port))
while True:
d, addr = s.recv()
......
......@@ -5,7 +5,7 @@ host = sys.argv[1]
port = int(sys.argv[2])
localport = int(sys.argv[3])
s = prrt.PrrtSocket(("127.0.1.1", localport), maximum_payload_size=150)
s = prrt.PrrtSocket(("0.0.0.0", localport), maximum_payload_size=150)
s.connect((host, port))
for i in range(10):
......
......@@ -10,14 +10,29 @@ if (XLAP)
add_definitions(-DXLAP)
endif()
option (TCP "Set time protocol to TCP.")
if (TCP)
add_definitions(-DTCP)
endif()
add_subdirectory(proto)
add_subdirectory(util)
add_executable(sender sender.c)
add_executable(receiver receiver.c)
add_executable(time-sender time-sender.c)
add_executable(time-receiver time-receiver.c)
target_link_libraries(sender LINK_PUBLIC prrt_shared prrtUtil_shared ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(receiver LINK_PUBLIC prrt_shared prrtUtil_shared ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(time-sender LINK_PUBLIC prrt_shared prrtUtil_shared ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(time-receiver LINK_PUBLIC prrt_shared prrtUtil_shared ${CMAKE_THREAD_LIBS_INIT})
add_executable(refcount refcount.c)
target_link_libraries(refcount LINK_PUBLIC prrt_shared prrtUtil_shared ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(sender LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(receiver LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(refcount LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
install(DIRECTORY ./ DESTINATION include/prrt
FILES_MATCHING PATTERN "*.h")
......@@ -119,11 +119,12 @@ cdef extern from "proto/socket.h":
ctypedef prrtSocket PrrtSocket
cdef PrrtSocket* PrrtSocket_create(const uint32_t mtu, const uint32_t target_delay)
bint PrrtSocket_bind(PrrtSocket *sock_ptr, const_char *ipAddress, const uint16_t port)
cdef PrrtSocket* PrrtSocket_create(const uint32_t maximum_payload_size, const uint32_t target_delay)
int PrrtSocket_bind(PrrtSocket *sock_ptr, const_char *ipAddress, const uint16_t port)
int PrrtSocket_close(const PrrtSocket *sock_ptr)
int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int PrrtSocket_send_sync(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int PrrtSocket_send_async(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr, sockaddr* addr) nogil
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, sockaddr* addr) nogil
......@@ -144,11 +145,23 @@ cdef extern from "proto/socket.h":
uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *socket)
float PrrtSocket_get_plr_fwd(PrrtSocket *socket)
uint32_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *socket)
uint32_t PrrtSocket_get_btlbw_fwd(PrrtSocket *s);
uint32_t PrrtSocket_get_btlbw_back(PrrtSocket *s);
bint PrrtSocket_get_app_limited(PrrtSocket *socket)
uint32_t PrrtSocket_get_bbr_state(PrrtSocket *s)
uint64_t PrrtSocket_get_full_bw(PrrtSocket *s)
bint PrrtSocket_get_filled_pipe(PrrtSocket *s)
uint32_t PrrtSocket_get_cycle_index(PrrtSocket *s)
float PrrtSocket_get_pacing_gain(PrrtSocket *s)
uint32_t PrrtSocket_get_cwnd(PrrtSocket *s)
uint32_t PrrtSocket_get_inflight(PrrtSocket *s)
uint32_t PrrtSocket_get_pacing_rate(PrrtSocket *s)
uint32_t PrrtSocket_get_send_quantum(PrrtSocket *s)
uint32_t PrrtSocket_get_pipe(PrrtSocket *s)
uint32_t PrrtSocket_get_delivered(PrrtSocket *s)
bint PrrtSocket_get_bbr_round_start(PrrtSocket *s)
uint32_t PrrtSocket_get_bbr_app_limited(PrrtSocket *socket)
bint PrrtSocket_get_bbr_is_app_limited(PrrtSocket *socket)
bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket)
cdef extern from "proto/stores/packetDeliveryStore.h":
......
......@@ -30,8 +30,8 @@
#define GF_BITS 8
#define K_START 4
#define N_START 7
#define K_START 1
#define N_START 1
#define N_P_START 1
#define RRT_ALPHA 0.125
......
set (PRRT_SOURCES ../defines.h
types/block.c types/block.h
types/channelStateInformation.c types/channelStateInformation.h
bbr.c bbr.h
clock.c clock.h
types/codingParams.c types/codingParams.h
receiver.c receiver.h
socket.c socket.h
types/applicationConstraints.c types/applicationConstraints.h
timer.c timer.h
processes/dataReceiver.c processes/dataReceiver.h
processes/dataTransmitter.c processes/dataTransmitter.h
stores/dataPacketStore.c stores/dataPacketStore.h
stores/deliveredPacketTable.c stores/deliveredPacketTable.h
stores/inFlightPacketStore.c stores/inFlightPacketStore.h
stores/pace.c stores/pace.h
stores/paceFilter.c stores/paceFilter.h
stores/packetTimeoutTable.c stores/packetTimeoutTable.h
stores/packetDeliveryStore.c stores/packetDeliveryStore.h
stores/receptionTable.c stores/receptionTable.h
stores/repairBlockStore.c stores/repairBlockStore.h
types/packetTimeout.c types/packetTimeout.h
types/applicationConstraints.c types/applicationConstraints.h
types/block.c types/block.h
types/codingParams.c types/codingParams.h
types/channelStateInformation.c types/channelStateInformation.h
types/lossStatistics.c types/lossStatistics.h
types/packet.c types/packet.h
vdmcode/block_code.c vdmcode/block_code.h)
types/packetTimeout.c types/packetTimeout.h
vdmcode/block_code.c vdmcode/block_code.h types/packetTracking.c types/packetTracking.h types/rateSample.c types/rateSample.h)
if (XLAP)
set(PRRT_SOURCES ${PRRT_SOURCES} ../xlap/xlap.c ../xlap/xlap.h)
endif()
add_library(PRRT ${PRRT_SOURCES})
add_library(prrt_obj OBJECT ${PRRT_SOURCES})
set_property(TARGET prrt_obj PROPERTY POSITION_INDEPENDENT_CODE 1)
add_library(prrt_static STATIC $<TARGET_OBJECTS:prrt_obj>)
target_link_libraries(prrt_static rt)
set_target_properties(prrt_static PROPERTIES OUTPUT_NAME "prrt")
add_library(prrt_shared SHARED $<TARGET_OBJECTS:prrt_obj>)
target_link_libraries(prrt_shared rt)
set_target_properties(prrt_shared PROPERTIES OUTPUT_NAME "prrt")
target_link_libraries(PRRT rt)
install(TARGETS prrt_static prrt_shared
RUNTIME DESTINATION bin
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib)
#include "bbr.h"
#include "../util/dbg.h"
#include "../util/common.h"
#include "receiver.h"
#include <math.h>
prrtByteCount_t BBR_Inflight(BBR* bbr, double gain)
{
if (bbr->rtprop == RTprop_Inf)
return bbr->initial_cwnd; /* no valid RTT samples yet */
uint32_t quanta = bbr->mps;
uint32_t estimated_bdp = (uint32_t) round((((double)bbr->bw) * bbr->rtprop) / (1000 * 1000));
return (uint32_t)(gain * estimated_bdp + quanta);
}
void BBR_EnterStartup(BBR* bbr)
{
bbr->state = STARTUP;
bbr->pacing_gain = BBRHighGain;
bbr->cwnd_gain = BBRHighGain;
}
void BBR_UpdateBtlBw(BBR* bbr, PrrtRateSample* rs, PrrtPacketTracking* tracking)
{
if (tracking->delivered >= bbr->next_round_delivered) {
bbr->next_round_delivered = tracking->delivered;
bbr->round_count++;
bbr->round_start = true;
} else {
bbr->round_start = false;
}
uint32_t delivery_rate_Bps = (uint32_t)((float) rs->delivery_rate);
if ((delivery_rate_Bps >= bbr->bw || !rs->is_app_limited) && delivery_rate_Bps != 0) {
bbr->bw = (uint32_t)WindowedFilter_push(bbr->btlBwFilter, delivery_rate_Bps);
debug(DEBUG_BBR, "Current BtlBw: %u, RS delivery rate: %u", bbr->bw, delivery_rate_Bps);
}
}
void BBR_CheckFullPipe(BBR* bbr, PrrtRateSample* rs)
{
if (bbr->filled_pipe || !bbr->round_start || rs->is_app_limited)
return; // no need to check for a full pipe now
if (bbr->bw >= bbr->full_bw * PROBE_GAIN) { // BBR.BtlBw still growing?
bbr->full_bw = bbr->bw; // record new baseline level
bbr->full_bw_count = 0;
return;
}
bbr->full_bw_count++; // another round w/o much growth
if (bbr->full_bw_count >= 3)
bbr->filled_pipe = true;
}
bool BBR_IsNextCyclePhase(BBR* bbr, prrtByteCount_t bytes_lost, prrtByteCount_t prior_inflight)
{
bool is_full_length = (PrrtClock_get_current_time_us() - bbr->cycle_stamp) > bbr->rtprop;
if (bbr->pacing_gain == 1)
return is_full_length;
if (bbr->pacing_gain > 1)
return is_full_length && (bytes_lost > 0 || prior_inflight >= BBR_Inflight(bbr, bbr->pacing_gain));
bool is_max_length = (PrrtClock_get_current_time_us() - bbr->cycle_stamp) > 4 * bbr->rtprop;
return is_max_length || (prior_inflight <= BBR_Inflight(bbr, 1.0));
}
void BBR_AdvanceCyclePhase(BBR* bbr)
{
bbr->cycle_stamp = PrrtClock_get_current_time_us();
bbr->cycle_index = (uint8_t )((bbr->cycle_index + 1) % BBRGainCycleLen);
float pacing_gain_cycle[BBRGainCycleLen] = {PROBE_GAIN, DRAIN_GAIN, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0};
bbr->pacing_gain = pacing_gain_cycle[bbr->cycle_index];
debug(DEBUG_BBR, "Advanced cycle with gain: %f", bbr->pacing_gain);
}
void BBR_CheckCyclePhase(BBR* bbr, prrtByteCount_t bytes_lost, prrtByteCount_t prior_inflight) {
if (bbr->state == PROBE_BW && BBR_IsNextCyclePhase(bbr, bytes_lost, prior_inflight))
BBR_AdvanceCyclePhase(bbr);
}
void BBR_EnterProbeBW(BBR* bbr)
{
bbr->state = PROBE_BW;
bbr->pacing_gain = 1;
bbr->cwnd_gain = 2;
bbr->cycle_index = (uint8_t)(BBRGainCycleLen - 1 - (random() % 7));
BBR_AdvanceCyclePhase(bbr);
}
void BBR_CheckDrain(BBR* bbr, prrtByteCount_t bytes_inflight)
{
if (bbr->state == STARTUP && bbr->filled_pipe) {
//Drain
bbr->state = DRAIN;
bbr->pacing_gain = 1 / BBRHighGain; // pace slowly
bbr->cwnd_gain = BBRHighGain; // maintain cwnd
}
if (bbr->state == DRAIN && bytes_inflight <= BBR_Inflight(bbr, 1.0))
BBR_EnterProbeBW(bbr); // we estimate queue is drained
}
void BBR_ExitProbeRTT(BBR* bbr)
{
if (bbr->filled_pipe)
BBR_EnterProbeBW(bbr);
else
BBR_EnterStartup(bbr);
}
uint32_t BBR_SaveCwnd(BBR* bbr)
{
if (!bbr->is_loss_recovery && bbr->state != PROBE_RTT)
return bbr->cwnd;
return MAX(bbr->prior_cwnd ,bbr->cwnd);
}
void BBR_RestoreCwnd(BBR* bbr)
{
bbr->cwnd = MAX(bbr->cwnd, bbr->prior_cwnd);
}
void BBR_UpdateRTprop(BBR* bbr, prrtTimedelta_t rtt)
{
bbr->rtprop_expired = PrrtClock_get_current_time_us() > (bbr->rtprop_stamp + RTpropFilterLen);
if (rtt >= 0 && (rtt <= bbr->rtprop || bbr->rtprop_expired)) {
bbr->rtprop = rtt;
bbr->rtprop_stamp = PrrtClock_get_current_time_us();
}
}
void BBR_EnterProbeRTT(BBR *bbr) {
bbr->state = PROBE_RTT;
bbr->pacing_gain = 1;
bbr->cwnd_gain = 1;
}
void BBR_HandleProbeRTT(BBR *bbr, PrrtPacketTracking *tracking) {
tracking->app_limited = (tracking->delivered + tracking->pipe) ? : 1;
/* Ignore low rate samples during ProbeRTT: */
prrtTimestamp_t now = PrrtClock_get_current_time_us();
if (bbr->probe_rtt_done_stamp == 0 && tracking->pipe <= bbr->min_pipe_cwnd) {
bbr->probe_rtt_done_stamp = now + ProbeRTTDuration;
bbr->probe_rtt_round_done = false;
bbr->next_round_delivered = tracking->delivered;
} else if (bbr->probe_rtt_done_stamp != 0) {
if (bbr->round_start) {
bbr->probe_rtt_round_done = true;
}
if (bbr->probe_rtt_round_done && (now > bbr->probe_rtt_done_stamp)) {
bbr->rtprop_stamp = now;
BBR_RestoreCwnd(bbr);
BBR_ExitProbeRTT(bbr);
}
}
}
void BBR_CheckProbeRTT(BBR *bbr, PrrtPacketTracking *tracking) {
if (bbr->state != PROBE_RTT && bbr->rtprop_expired && !bbr->idle_restart) {
BBR_EnterProbeRTT(bbr);
BBR_SaveCwnd(bbr);
bbr->probe_rtt_done_stamp = 0;
}
if (bbr->state == PROBE_RTT) {
BBR_HandleProbeRTT(bbr, tracking);
}
bbr->idle_restart = false;
}
void BBR_UpdateModelAndState(BBR *bbr, PrrtRateSample *rs, PrrtPacketTracking *packetTracking, prrtTimedelta_t rtt)
{
BBR_UpdateBtlBw(bbr, rs, packetTracking);
BBR_CheckCyclePhase(bbr, packetTracking->bytes_lost, packetTracking->prior_inflight);
BBR_CheckFullPipe(bbr, rs);
BBR_CheckDrain(bbr, packetTracking->pipe);
BBR_UpdateRTprop(bbr, rtt);
BBR_CheckProbeRTT(bbr, packetTracking);
}
void BBR_UpdateTargetCwnd(BBR* bbr)
{
bbr->target_cwnd = BBR_Inflight(bbr, bbr->cwnd_gain);
}
void BBR_ModulateCwndForProbeRTT(BBR* bbr)
{
if (bbr->state == PROBE_RTT)
bbr->cwnd = MIN(bbr->cwnd, bbr->min_pipe_cwnd);
}
void BBR_ModulateCwndForRecovery(BBR* bbr, prrtByteCount_t bytes_lost, prrtByteCount_t pipe, prrtByteCount_t delivered)
{
if (bytes_lost > 0) {
if (bbr->cwnd > bytes_lost) {
bbr->cwnd = MAX(bbr->cwnd - bytes_lost, bbr->mps);
} else {
bbr->cwnd = bbr->mps;
}
}
if (bbr->packet_conservation)
bbr->cwnd = MAX(bbr->cwnd, pipe + delivered);
}
void BBR_SetCwnd(BBR* bbr, PrrtPacketTracking* packetTracking)
{
BBR_UpdateTargetCwnd(bbr);
BBR_ModulateCwndForRecovery(bbr, packetTracking->bytes_lost, packetTracking->pipe, packetTracking->delivered);
if (!bbr->packet_conservation) {
if (bbr->filled_pipe)
bbr->cwnd = MIN(bbr->cwnd + packetTracking->delivered, bbr->target_cwnd);
else if (bbr->cwnd < bbr->target_cwnd || packetTracking->delivered < bbr->initial_cwnd)
bbr->cwnd = bbr->cwnd + packetTracking->delivered;
bbr->cwnd = MAX(bbr->cwnd, bbr->min_pipe_cwnd);
}
BBR_ModulateCwndForProbeRTT(bbr);
debug(DEBUG_BBR, "New cwnd: %u, State: %u", bbr->cwnd, bbr->state);
}
void BBR_SetPacingRateWithGain(BBR* bbr, double pacing_gain)
{
double rate = (pacing_gain * ((double)bbr->bw));
debug(DEBUG_BBR, "Current rate: %f, Pacing gain: %f, BtlBw: %u, Calc Rate: %f, Filled pipe: %u", bbr->pacing_rate,
pacing_gain, bbr->bw, rate, bbr->filled_pipe);
if (rate != 0 && (bbr->filled_pipe || rate > bbr->pacing_rate))
bbr->pacing_rate = rate;
}
void BBR_SetPacingRate(BBR* bbr)
{
BBR_SetPacingRateWithGain(bbr, bbr->pacing_gain);
}
void BBR_SetSendQuantum(BBR* bbr) {
if(bbr->pacing_rate < 150000) { // 1.2Mbps = 0.15 MBps = 150000 Bps
bbr->send_quantum = 1 * bbr->mps;
} else if (bbr->pacing_rate < 3000000) { // 24 Mbps = 20 * 1.2Mbps = 3000000
bbr->send_quantum = 2 * bbr->mps;
} else {
bbr->send_quantum = MIN((prrtByteCount_t) round((double) bbr->pacing_rate / 1000), 64000);
}
}
void BBR_OnACK(BBR* bbr, PrrtChannelStateInformation* csi, PrrtRateSample* rs, PrrtPacketTracking* packetTracking, prrtTimedelta_t rtt)
{
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
BBR_UpdateModelAndState(bbr, rs, packetTracking, rtt);
BBR_SetPacingRate(bbr);
BBR_SetSendQuantum(bbr);
BBR_SetCwnd(bbr, packetTracking);
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return;
error:
PERROR("BBR_OnACK failed.")
}
void BBR_OnSpuriousLoss(BBR *bbr, PrrtPacketTracking *tracking) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
if(!bbr->is_loss_recovery) {
bbr->is_loss_recovery = true;
bbr->loss_recovery_stamp = PrrtClock_get_current_time_us();
bbr->prior_cwnd = BBR_SaveCwnd(bbr);
bbr->cwnd = tracking->pipe + MAX(tracking->delivered, 1);
bbr->packet_conservation = true;
} else if (PrrtClock_get_current_time_us() > bbr->loss_recovery_stamp + bbr->rtprop){
bbr->packet_conservation = false;
}
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return;
error:
PERROR("BBR_OnACK failed.")
}
void BBR_OnRTOLoss(BBR *bbr) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
if(!bbr->is_loss_recovery) {
bbr->is_loss_recovery = true;
bbr->prior_cwnd = BBR_SaveCwnd(bbr);
bbr->cwnd = bbr->mps;
}
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return;
error:
PERROR("BBR_OnACK failed.")
}
void BBR_OnLossExit(BBR *bbr) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
if(bbr->is_loss_recovery) {
bbr->is_loss_recovery = false;
bbr->packet_conservation = false;
BBR_RestoreCwnd(bbr);
}
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return;
error:
PERROR("BBR_OnACK failed.")
}
BBR* BBR_Init(prrtByteCount_t maximum_payload_size)
{
BBR* bbr = calloc(1, sizeof(BBR));
check_mem(bbr);
pthread_mutexattr_t attr;
check(pthread_mutexattr_init(&attr) == EXIT_SUCCESS, "Mutex attr init failed.");
check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == EXIT_SUCCESS, "Setting type failed.");
check(pthread_mutex_init(&bbr->lock, &attr) == 0, "lock init failed.");
bbr->mps = maximum_payload_size;
bbr->min_pipe_cwnd = 4 * maximum_payload_size;
bbr->initial_cwnd = 4 * maximum_payload_size;
bbr->has_seen_rtt = false;
bbr->btlBwFilter = WindowedFilter_create(true, 10);
bbr->rtprop = RTprop_Inf;
bbr->rtprop_stamp = PrrtClock_get_current_time_us();
bbr->probe_rtt_done_stamp = 0;
bbr->probe_rtt_round_done = false;
bbr->packet_conservation = false;
bbr->prior_cwnd = 0;
bbr->cwnd = bbr->initial_cwnd;
bbr->idle_restart = false;
bbr->is_loss_recovery = false;
//Init round counting
bbr->next_round_delivered = 0;
bbr->round_start = false;
bbr->round_count = 0;
//Init full pipe
bbr->filled_pipe = false;
bbr->full_bw = 0;
bbr->full_bw_count = 0;
//Init pacing rate
double nominal_bandwidth = bbr->initial_cwnd / (bbr->has_seen_rtt ? bbr->rtprop : 1000);
bbr->pacing_rate = bbr->pacing_gain * nominal_bandwidth;
BBR_EnterStartup(bbr);
return bbr;
error:
PERROR("Failed to init BBR%s.", "");
return NULL;
}
void BBR_destroy(BBR* bbr)
{
WindowedFilter_destroy(bbr->btlBwFilter);
check(pthread_mutex_destroy(&bbr->lock) == 0, "lock destroy failed.");
free(bbr);
return;
error:
PERROR("BBR_destroy failed%s.", "");
}
double BBR_getPacingRate(BBR* bbr)
{
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
double res = bbr->pacing_rate;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getPacingRate failed.")
return 0;
}
prrtByteCount_t BBR_getCwnd(BBR* bbr)
{
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
prrtByteCount_t res = bbr->cwnd;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getCwnd failed.")
return 0;
}
prrtDeliveryRate_t BBR_getBtlBw(BBR* bbr)
{
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
prrtDeliveryRate_t res = bbr->bw;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getBtlBw failed.")
return 0;
}
uint32_t BBR_getRTProp(BBR* bbr)
{
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
uint32_t res = bbr->rtprop;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getRTProp failed.")
return 0;
}
uint32_t BBR_getState(BBR* bbr) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
uint32_t res = bbr->state;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getState failed.")
return 0;
}
uint32_t BBR_getCycleIndex(BBR* bbr) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
uint32_t res = bbr->cycle_index;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getCycleIndex failed.")
return 0;
}
double BBR_getPacingGain(BBR* bbr) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
double res = bbr->pacing_gain;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getPacingGain failed.")
return 0;
}
bool BBR_getFilledPipe(BBR* bbr) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
bool res = bbr->filled_pipe;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getFilledPipe failed.")
return 0;
}
bool BBR_getRoundStart(BBR* bbr) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
bool res = bbr->round_start;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getRoundStart failed.")
return 0;
}
prrtByteCount_t BBR_getFullBw(BBR* bbr) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
prrtByteCount_t res = bbr->full_bw;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getInflight failed.")
return 0;
}
prrtByteCount_t BBR_getInflight(BBR* bbr) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
prrtByteCount_t res = BBR_Inflight(bbr, bbr->pacing_gain);
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getInflight failed.")
return 0;
}
prrtByteCount_t BBR_getSendQuantum(BBR* bbr) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
prrtByteCount_t res = bbr->send_quantum;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getSendQuantum failed.")
return 0;
}
\ No newline at end of file
#ifndef PRRT_BBR_H
#define PRRT_BBR_H
#include <stdint.h>
#include "stdbool.h"
#include "types/packet.h"
#include "clock.h"
#include "types/channelStateInformation.h"
#include "types/packetTracking.h"
#include "types/rateSample.h"
#include "../util/windowedFilter.h"
#define PROBE_GAIN 1.1
#define DRAIN_GAIN 0.9
#define RTpropFilterLen 10000000 //10s
#define BBRHighGain ((((float)2885) / 1000) + 1)
#define BBRGainCycleLen 8
#define ProbeRTTDuration 200000 //200ms
#define RTprop_Inf UINT32_MAX
enum bbr_state {
STARTUP,
DRAIN,
PROBE_BW,
PROBE_RTT
};
typedef struct bbr {
pthread_mutex_t lock;
prrtByteCount_t mps;
prrtByteCount_t min_pipe_cwnd;
prrtByteCount_t initial_cwnd;
prrtTimedelta_t rtprop;
prrtTimestamp_t rtprop_stamp;
prrtTimestamp_t probe_rtt_done_stamp;
bool probe_rtt_round_done;
bool packet_conservation;
prrtByteCount_t prior_cwnd;
prrtByteCount_t cwnd;
prrtByteCount_t target_cwnd;
bool idle_restart;
enum bbr_state state;
double pacing_gain;
float cwnd_gain;
bool filled_pipe;
prrtByteCount_t full_bw;
uint32_t full_bw_count;
double pacing_rate;
bool has_seen_rtt;
uint32_t next_round_delivered;
bool round_start;
uint32_t round_count;
uint32_t next_rtt_delivered;
uint32_t rtt_count;
bool rtprop_expired;
bool is_loss_recovery;
prrtTimestamp_t loss_recovery_stamp;
prrtTimestamp_t cycle_stamp;
uint8_t cycle_index;
float* pacing_gain_cycle;
prrtByteCount_t send_quantum;
prrtDeliveryRate_t bw;
WindowedFilter* btlBwFilter;
} BBR;
BBR* BBR_Init(prrtByteCount_t maximum_payload_size);
void BBR_OnACK(BBR* bbr, PrrtChannelStateInformation* csi, PrrtRateSample* rs, PrrtPacketTracking* packetTracking, prrtTimedelta_t rtt);
void BBR_OnSpuriousLoss(BBR *bbr, PrrtPacketTracking *tracking);
void BBR_OnRTOLoss(BBR *bbr);
void BBR_OnLossExit(BBR *bbr);
void BBR_destroy(BBR* bbr);
double BBR_getPacingRate(BBR* bbr);
prrtByteCount_t BBR_getCwnd(BBR* bbr);
prrtDeliveryRate_t BBR_getBtlBw(BBR* bbr);
uint32_t BBR_getState(BBR* bbr);
prrtByteCount_t BBR_getFullBw(BBR* bbr);
double BBR_getPacingGain(BBR* bbr);
uint32_t BBR_getCycleIndex(BBR* bbr);
bool BBR_getFilledPipe(BBR* bbr);
uint32_t BBR_getRTProp(BBR* bbr);
prrtByteCount_t BBR_getInflight(BBR* bbr);
prrtByteCount_t BBR_getSendQuantum(BBR* bbr);
bool BBR_getRoundStart(BBR* bbr);
#endif //PRRT_BBR_H
#include <sys/time.h>
#include <stdlib.h>
#include "../util/time.h"
#include "../util/common.h"
#include "clock.h"
......@@ -19,50 +20,45 @@ prrtTimestamp_t PrrtClock_get_prrt_time_us(PrrtClock *clock)
prrtTimestamp_t lastMeasurement = clock->lastMeasurement;
prrtTimeDifference_t diff = diff_abs_ts(currentTime, lastMeasurement);
prrtTimeDifference_t skew = (diff * clock->skew) / 400;
return (prrtTimestamp_t) virtualTime + clock->meanDeviation + skew;
return (prrtTimestamp_t) (virtualTime + clock->meanDeviation + skew);
} else {
return currentTime;
}
}
bool PrrtClock_update(PrrtClock *clock, uint32_t referenceTime, uint32_t rtt)
bool PrrtClock_update(PrrtClock *clock, uint32_t referenceTime, uint32_t propagation_delay)
{
prrtTimestamp_t virtualTime = PrrtClock_get_prrt_time_us(clock);
prrtTimestamp_t currentTime = PrrtClock_get_current_time_us();
prrtTimestamp_t virtualTime = clock->virtualTime;
prrtTimeDifference_t clockSkew = clock->skew;
prrtTimedelta_t delay = rtt/2; // half the rtt
prrtTimeDifference_t phaseError = referenceTime - virtualTime + delay;
//printf("V: %13u, C: %13u, S: %13d, R: %13u, E: %13d, D: %3u L: %u\n", virtualTime, currentTime, clockSkew, referenceTime, phaseError, delay, clock->lastMeasurement);
int64_t clockSkew = clock->skew;
prrtTimedelta_t delay = propagation_delay / 2; // half the rtt
int64_t phaseError = PrrtTimestamp_cmp(referenceTime, virtualTime) + delay;
//printf("V: %13u, C: %13u, S: %13d, R: %13u, E: %13ld, D: %3u L: %u\n", virtualTime, currentTime, clockSkew, referenceTime, phaseError, delay, clock->lastMeasurement);
if(abs(phaseError) > 10000) {
if(labs(phaseError) > 10000) {
clock->meanDeviation = 0;
clock->skew = 0;
clock->lastMeasurement = currentTime;
clock->virtualTime = currentTime;
}
prrtTimeDifference_t deviationSum = (prrtTimeDifference_t) (phaseError + 3 * clock->meanDeviation); // TODO: why 3?
int64_t deviationSum =(phaseError + 3 * clock->meanDeviation); // TODO: why 3?
prrtTimeDifference_t meanDeviation = (prrtTimeDifference_t) (deviationSum / 4); // TODO: find out why???
prrtTimeDifference_t period = diff_abs_ts(currentTime, clock->lastMeasurement);
int64_t period = PrrtTimestamp_cmp(currentTime, clock->lastMeasurement);
if(period > 0) {
clockSkew = (meanDeviation / period) + 15 * clock->skew;
clockSkew = (prrtTimeDifference_t) (clockSkew / 16);
}
virtualTime = virtualTime + meanDeviation + period * clockSkew / 400;
virtualTime = clock->virtualTime + meanDeviation + period * clockSkew / 400;
clock->meanDeviation = meanDeviation;
clock->skew = clockSkew;
clock->skew = (prrtTimeDifference_t) clockSkew;
clock->lastMeasurement = currentTime;
clock->virtualTime = virtualTime;
if(abs(phaseError) > 10000) {
clock->virtualTime = currentTime;
}
//printf("V: %13u, C: %13u; S: %13d; M: %13d, P:%13u\n", virtualTime, currentTime, clockSkew, meanDeviation, period);
//printf("V: %13u, C: %13u; S: %13d; M: %13d, P:%13u\n\n", virtualTime, currentTime, clockSkew, meanDeviation, period);
return true;
}
......
......@@ -26,7 +26,7 @@ prrtTimestamp_t PrrtClock_get_current_time_us(void);
prrtTimestamp_t PrrtClock_get_prrt_time_us(PrrtClock *clock);
bool PrrtClock_update(PrrtClock *clock, prrtTimestamp_t referenceTime, prrtTimedelta_t rtt);
bool PrrtClock_update(PrrtClock *clock, prrtTimestamp_t referenceTime, prrtTimedelta_t propagation_delay);
#define diff_abs_ts(timeA, timeB) (prrtTimeDifference_t) abs(((prrtTimeDifference_t) timeA) - ((prrtTimeDifference_t)timeB))
......
......@@ -4,24 +4,27 @@
#include "../../defines.h"
#include "../../util/dbg.h"
#include "../../util/common.h"
#include "../../util/time.h"
#include "../types/lossStatistics.h"
#include "../types/block.h"
#include "../clock.h"
#include "../socket.h"
#include "dataReceiver.h"
static void retrieve_data_blocks(PrrtSocket *sock_ptr,
prrtSequenceNumber_t base_seqno,
uint8_t k,
const PrrtBlock *block) {
static void retrieve_data_packets_for_block(PrrtSocket *sock_ptr,
prrtSequenceNumber_t base_seqno,
uint8_t k,
const PrrtBlock *block) {
List *res = List_create();
prrtSequenceNumber_t last_seqno = (prrtSequenceNumber_t) (base_seqno + k - 1);
debug(DEBUG_BLOCK, "Size: %d", PrrtDataPacketStore_size(sock_ptr->dataPacketStore));
PrrtDataPacketStore_remove_range(sock_ptr->dataPacketStore, res, base_seqno,
(prrtSequenceNumber_t) (base_seqno + k - 1));
last_seqno);
debug(DEBUG_BLOCK, "Retrieve %d packets in range: %u-%u.", List_count(res), base_seqno, last_seqno);
LIST_FOREACH(res, first, next, cur) {
PrrtPacket *packetPtr = cur->value;
check(PrrtBlock_insert_data_packet((PrrtBlock *) block, packetPtr), "Insert failed!")
PrrtPacket *packet = cur->value;
check(PrrtBlock_insert_data_packet((PrrtBlock *) block, packet), "Insert failed!")
}
List_destroy(res);
return;
......@@ -32,18 +35,22 @@ static void retrieve_data_blocks(PrrtSocket *sock_ptr,
static void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block) {
if (block != NULL && PrrtBlock_decode_ready(block)) {
check(PrrtBlock_decode(block), "Decoding failed");
while (List_count(block->dataPackets) > 0) {
PrrtPacket *pkt = List_shift(block->dataPackets);
if (PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable,
pkt->sequenceNumber)) {
PrrtPacketDeliveryStore_insert(sock_ptr->packetDeliveryStore, pkt);
} else {
PrrtPacket_destroy(pkt);
bool data_relevant = PrrtDeliveredPacketTable_test_is_block_relevant(sock_ptr->deliveredPacketTable,
block->baseSequenceNumber,
block->codingParams->n);
if (data_relevant) {
check(PrrtBlock_decode(block), "Decoding failed");
while (List_count(block->dataPackets) > 0) {
PrrtPacket *pkt = List_shift(block->dataPackets);
if (PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable,
pkt->sequenceNumber)) {
PrrtPacketDeliveryStore_insert(sock_ptr->packetDeliveryStore, pkt);
} else {
PrrtPacket_destroy(pkt);
}
}
}
PrrtRepairBlockStore_delete(sock_ptr->repairBlockStore, block->baseSequenceNumber);
PrrtBlock_destroy(block);
}
......@@ -67,7 +74,6 @@ static bool send_feedback(PrrtSocket *sock_ptr,
kind = ts_redundancy_packet;
}
debug(DEBUG_FEEDBACK, "Send feedback %d %d", type, seqno);
XlapTimeStampCycle(sock_ptr, kind, seqno, SendFeedbackStart);
prrtFeedback_t feedback = {
......@@ -77,38 +83,29 @@ static bool send_feedback(PrrtSocket *sock_ptr,
.sentTime = sentTimestamp
};
uint16_t remote_port = ntohs(remote.sin_port);
char *remote_host = inet_ntoa(remote.sin_addr);
struct sockaddr_in targetaddr;
memset((char *) &targetaddr, 0, sizeof(targetaddr));
targetaddr.sin_family = AF_INET;
targetaddr.sin_port = htons((uint16_t) (remote_port));
struct hostent *hp;
hp = gethostbyname(remote_host);
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
prrtTimestamp_t forwardTripTime = (uint32_t) (((int64_t) PrrtClock_get_current_time_us()) +
(sock_ptr->lastSentTimestamp - sock_ptr->lastReceivedTimestamp));
PrrtLossStatistics stats = sock_ptr->lossStatistics;
int group_RTT = 0; // TODO: To be determined.
uint32_t local_bottleneck_pace = MAX(PrrtPace_get_effective(sock_ptr->appDeliverPace), PrrtPace_get_effective(sock_ptr->prrtReceivePace));
PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, sock_ptr->sequenceNumberFeedback++, group_RTT,
stats.gapLength, stats.gapCount, stats.burstLength,
stats.burstCount, forwardTripTime,
stats.erasureCount, stats.packetCount,
feedback.seqNo,
feedback.type);
feedback.seqNo, feedback.type,
local_bottleneck_pace);
prrtPacketLength_t length = PrrtPacket_size(feedback_pkt_ptr);
void *buf = calloc(1, length);
check_mem(buf);
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");
check(sendto(sock_ptr->socketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
check(sendto(sock_ptr->socketFd, buf, length, 0, (struct sockaddr *) &remote, sizeof(remote)) ==
length, "Sending feedback failed.");
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
free(buf);
PrrtPacket_destroy(feedback_pkt_ptr);
......@@ -122,16 +119,12 @@ static bool send_feedback(PrrtSocket *sock_ptr,
return false;
}
static bool is_timeout(prrtTimestamp_t now, prrtTimestamp_t to) {
/* TODO: implement */
return false;
}
static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
PrrtPacketDataPayload *payload = packet->payload;
prrtTimestamp_t sentTimestamp = payload->timestamp;
sock_ptr->lastSentTimestamp = sentTimestamp;
PrrtClock_update(&sock_ptr->clock, sentTimestamp, payload->groupRTprop_us);
debug(DEBUG_DATARECEIVER, "Timeout: %lu", payload->packetTimeout_us);
PrrtPacketTimeout *packetTimeout = PrrtPacketTimeout_create(packet);
check(PrrtPacketTimeoutTable_insert(sock_ptr->packetTimeoutTable, packetTimeout), "Could not insert data packet.");
......@@ -139,18 +132,25 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
prrtSequenceNumber_t seqno = packet->sequenceNumber;
PrrtReceptionTable_mark_received(sock_ptr->dataReceptionTable, seqno);
prrtTimestamp_t now = PrrtClock_get_prrt_time_us(&sock_ptr->clock);
if (is_timeout(now, payload->packetTimeout_us)) {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
if (PrrtTimestamp_cmp(now, payload->packetTimeout_us) > 0) {
debug(DEBUG_DATARECEIVER, "Timeout data packet %u (%lu > %lu)", seqno, (unsigned long) now,
(unsigned long) payload->packetTimeout_us);
PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable, packet->sequenceNumber);
PrrtPacket_destroy(packet);
debug(DEBUG_RECEIVER, "timeout data packet %u (%lu > %lu)", seqno, (unsigned long) now,
(unsigned long) payload->packetTimeout_us);
} else if (!PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable,
packet->sequenceNumber)) {
debug(DEBUG_DATARECEIVER, "Not relevant: %u", seqno);
PrrtPacket_destroy