Skip to content
Commits on Source (267)
......@@ -38,6 +38,17 @@ build:container:
- docker push $CI_REGISTRY_IMAGE:$DOCKER_TAG
- docker rmi $CI_REGISTRY_IMAGE:$DOCKER_TAG
build:container_tcp:
stage: build
tags:
- docker
script:
- export DOCKER_TAG=$(echo "$CI_BUILD_REF_NAME""_tcp" | sed 's#/#_#' | sed 's#^master$#latest#')
- docker build -t $CI_REGISTRY_IMAGE:$DOCKER_TAG --build-arg http_proxy=http://www-proxy.uni-saarland.de:3128 -f docker/Dockerfile_tcp .
- docker login -u gitlab-ci-token -p $CI_BUILD_TOKEN $CI_REGISTRY
- docker push $CI_REGISTRY_IMAGE:$DOCKER_TAG
- docker rmi $CI_REGISTRY_IMAGE:$DOCKER_TAG
test:prrt_mem:
stage: test
dependencies:
......
......@@ -29,9 +29,9 @@ port = int(sys.argv[1])
s = prrt.PrrtSocket(port=port)
while True:
d = s.recv()
d = s.recv().decode("utf8")
if d != "Close":
print d
print(d)
else:
break
```
......@@ -44,25 +44,27 @@ import prrt
host = sys.argv[1]
port = int(sys.argv[2])
localport = int(sys.argv[3])
s = prrt.PrrtSocket(port=port)
s = prrt.PrrtSocket(port=localport)
s.connect(host, port)
for i in range(10):
s.send("Packet {}".format(i))
s.send("Close")
s.send("Packet {}".format(i).encode("utf8"))
s.send("Close".encode("utf8"))
```
Start the receiver by:
```bash
python receiver.py 5000
python3 receiver.py 5000
```
In a separate terminal, run:
```bash
python sender.py 127.0.0.1 5000
python3 sender.py 127.0.0.1 5000 6000
```
This should generate the following output in the receiver console:
......
......@@ -25,16 +25,22 @@ local pf_data_length = ProtoField.uint32("prrt.data.length", "Length")
local pf_data_timestamp = ProtoField.uint32("prrt.data.timestamp", "Timestamp")
local pf_data_groupRTprop = ProtoField.uint32("prrt.data.grouprtprop", "Group RTprop")
local pf_data_packettimeout = ProtoField.uint32("prrt.data.packettimeout", "Packet Timeout")
local pf_data_btlbw = ProtoField.uint32("prrt.data.btlbw", "Bottleneck Bandwidth")
local pf_data_btl_pace = ProtoField.uint32("prrt.data.btl_pace", "Bottleneck Pace")
local pf_data_appSendTotal_pace = ProtoField.uint32("prrt.data.appSendTotal_pace", "Sender total application pace")
local pf_red = ProtoField.new("Redundancy", "prrt.redundancy", ftypes.BYTES, base.NONE)
local pf_red_baseSeqN = ProtoField.uint16("prrt.redundancy.baseSequenceNumber", "Base Sequence Number", base.DEC)
local pf_red_timestamp = ProtoField.uint32("prrt.redundancy.timestamp", "Timestamp")
local pf_red_btl_pace = ProtoField.uint32("prrt.redundancy.btl_pace", "Bottleneck Pace")
local pf_red_appSendTotal_pace = ProtoField.uint32("prrt.redundancy.appSendTotal_pace", "Sender total application pace")
local pf_red_n = ProtoField.uint8("prrt.redundancy.n", "n")
local pf_red_k = ProtoField.uint8("prrt.redundancy.k", "k")
local pf_fb = ProtoField.new("Feedback", "prrt.feedback", ftypes.BYTES, base.NONE)
local pf_fb_groupRTT = ProtoField.uint32("prrt.feedback.groupRTT", "Group RTT")
local pf_fb_ftt = ProtoField.uint32("prrt.feedback.FTT", "FTT")
local pf_fb_btlPace = ProtoField.uint32("prrt.feedback.btlPace", "Bottleneck pace")
local pf_fb_erasurecount = ProtoField.uint16("prrt.feedback.erasureCount", "Erasure count")
local pf_fb_packetcount = ProtoField.uint16("prrt.feedback.packetCount", "Packet count")
local pf_fb_gaplength = ProtoField.uint16("prrt.feedback.gapLength", "Gap length")
......@@ -56,16 +62,22 @@ prrt_proto.fields = {
pf_data_timestamp,
pf_data_groupRTprop,
pf_data_packettimeout,
pf_data_btlbw,
pf_data_btl_pace,
pf_data_appSendTotal_pace,
pf_red,
pf_red_baseSeqN,
pf_red_timestamp,
pf_red_btl_pace,
pf_red_appSendTotal_pace,
pf_red_n,
pf_red_k,
pf_fb,
pf_fb_groupRTT,
pf_fb_ftt,
pf_fb_btlPace,
pf_fb_erasurecount,
pf_fb_packetcount,
pf_fb_gaplength,
......@@ -89,9 +101,16 @@ local ex_type = Field.new("prrt.type")
local function getType() return ex_type()() end
local function getTypeName() return prrtPacketTypeNames[getType()] end
local ex_index = Field.new("prrt.index")
local function getIndex() return ex_index()() end
local ex_data_length = Field.new("prrt.data.length")
local function getDataLength() return ex_data_length()() end
local ex_red_baseseqno = Field.new("prrt.redundancy.baseSequenceNumber")
local function getRedBaseSeqNo() return ex_red_baseseqno()() end
local ex_red_n = Field.new("prrt.redundancy.n")
local function getRedN() return ex_red_n()() end
......@@ -108,8 +127,11 @@ local function dissect_data(buffer, pinfo, root)
tree:add(pf_data_timestamp, buffer:range(4,4))
tree:add(pf_data_groupRTprop, buffer:range(8,4))
tree:add(pf_data_packettimeout, buffer:range(12,4))
tree:add(pf_data_btlbw, buffer:range(16,4))
tree:add(pf_data_btl_pace, buffer:range(20,4))
tree:add(pf_data_appSendTotal_pace, buffer:range(24,4))
local label = "DATA Len=" .. getDataLength()
local label = "[D] Idx=" .. getIndex() .. " Len=" .. getDataLength()
tree:set_text(label)
pinfo.cols.info:set(label)
end
......@@ -118,10 +140,12 @@ local function dissect_redundancy(buffer, pinfo, root)
local tree = root:add(pf_red, buffer:range(0))
tree:add(pf_red_baseSeqN, buffer:range(0,2))
tree:add(pf_red_timestamp, buffer:range(2,4))
tree:add(pf_red_n, buffer:range(6,1))
tree:add(pf_red_k, buffer:range(7,1))
tree:add(pf_red_btl_pace, buffer:range(6,4))
tree:add(pf_red_appSendTotal_pace, buffer:range(10,4))
tree:add(pf_red_n, buffer:range(14,1))
tree:add(pf_red_k, buffer:range(15,1))
local label = "REDUNDANCY n=" .. getRedN() .. " k=" .. getRedK()
local label = "[R] Idx=" .. getIndex() .. " b=" .. getRedBaseSeqNo() .. " n=" .. getRedN() .. " k=" .. getRedK()
tree:set_text(label)
pinfo.cols.info:set(label)
end
......@@ -130,16 +154,17 @@ local function dissect_feedback(buffer, pinfo, root)
local tree = root:add(pf_fb, buffer:range(0))
tree:add(pf_fb_groupRTT, buffer:range(0,4))
tree:add(pf_fb_ftt, buffer:range(4,4))
tree:add(pf_fb_erasurecount, buffer:range(8,2))
tree:add(pf_fb_packetcount, buffer:range(10,2))
tree:add(pf_fb_gaplength, buffer:range(12,2))
tree:add(pf_fb_gapcount, buffer:range(14,2))
tree:add(pf_fb_burstlength, buffer:range(16,2))
tree:add(pf_fb_burstcount, buffer:range(18,2))
tree:add(pf_fb_acktype, buffer:range(20,1))
tree:add(pf_fb_ackSeqN, buffer:range(21, 2))
local label = "FEEDBACK"
tree:add(pf_fb_btlPace, buffer:range(8,4))
tree:add(pf_fb_erasurecount, buffer:range(12,2))
tree:add(pf_fb_packetcount, buffer:range(14,2))
tree:add(pf_fb_gaplength, buffer:range(16,2))
tree:add(pf_fb_gapcount, buffer:range(18,2))
tree:add(pf_fb_burstlength, buffer:range(20,2))
tree:add(pf_fb_burstcount, buffer:range(22,2))
tree:add(pf_fb_acktype, buffer:range(24,1))
tree:add(pf_fb_ackSeqN, buffer:range(25, 2))
local label = "[F]"
tree:set_text(label)
pinfo.cols.info:set(label)
end
......
FROM gcc:5
FROM gcc:8
MAINTAINER Andreas Schmidt <schmidt@nt.uni-saarland.de>
RUN apt-get update \
&& apt-get upgrade -y \
&& apt-get install -y cmake
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update && apt-get install --yes --force-yes \
bc \
cmake \
iperf3 \
psmisc \
traceroute \
tshark
COPY CMakeLists.txt /prrt/
COPY prrt /prrt/prrt
COPY tests /prrt/tests
COPY docker/entrypoint.sh /
COPY docker/sysctl.conf /etc/sysctl.d/01-disable-ipv6.conf
WORKDIR /prrt
RUN cmake . \
&& make
ENV PATH /prrt/bin:$PATH
WORKDIR /prrt/bin
ENV PATH /prrt:$PATH
VOLUME /output
......
FROM gcc:8
MAINTAINER Andreas Schmidt <schmidt@nt.uni-saarland.de>
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update && apt-get install --yes --force-yes \
bc \
cmake \
iperf3 \
psmisc \
traceroute \
tshark
COPY CMakeLists.txt /prrt/
COPY prrt /prrt/prrt
COPY tests /prrt/tests
COPY docker/entrypoint.sh /
COPY docker/sysctl.conf /etc/sysctl.d/01-disable-ipv6.conf
WORKDIR /prrt
RUN cmake -DTCP=1 . \
&& make
ENV PATH /prrt:$PATH
VOLUME /output
ENTRYPOINT ["/entrypoint.sh"]
......@@ -5,13 +5,15 @@ dev=eth0
command=$1
shift
if [[ "$command" == "sender" || "$command" == "receiver" ]]; then
if [[ "$command" == "sender" || "$command" == "receiver" || "$command" == "time-sender" || "$command" == "time-receiver" ]]; then
:
else
echo "Command should be either sender or receiver."
exit 0;
fi
TARGET="127.0.0.1"
OUTPUT="/dev/null"
NETEM=()
PRRT=()
while [[ $# -gt 0 ]]
......@@ -19,7 +21,24 @@ do
key="$1"
case $key in
-t|--target|-p|--port|-r|--rounds)
-t|--target)
if [[ "$command" == "sender" || "$command" == "time-sender" ]]; then
PRRT+=("$1 $2")
fi
TARGET=("$2")
shift
shift
;;
-w|--wireshark)
OUTPUT=("$2")
shift
shift
;;
-T|--threadpinning|-U)
PRRT+=("$1")
shift
;;
-p|--port|-r|--rounds|-s|--size|-R|--rcvbuf|-S|--sndbuf|-o|--output|-a|--appdelay|-j|--appjitter)
PRRT+=("$1 $2")
shift
shift
......@@ -34,8 +53,32 @@ done
PRRT_PARAMS="${PRRT[@]}"
NETEM_PARAMS="${NETEM[@]}"
echo "Starting Wireshark."
tshark -i eth0 -w $OUTPUT.pcap &
TSHARK_PID=$!
sleep 2
start=$(date +%s.%N);
echo "Checking reachability of $TARGET."
until ping -c1 $TARGET &>/dev/null; do sleep 1; done
dur=$(echo "$(date +%s.%N) - $start" | bc);
printf "Reachable after %.6f seconds\n" $dur
if [[ "$command" == "sender" || "$command" == "time-sender" ]]; then
echo "Delaying $command"
sleep 10
fi
start=$(date +%s.%N);
echo "Running PRRT with command: \"$command $PRRT_PARAMS\" and link parameters: \"$NETEM_PARAMS\""
tc qdisc add dev $dev root netem $NETEM_PARAMS
/prrt/$command $PRRT_PARAMS -o /output/log.csv
tc qdisc del dev $dev root
#tc qdisc add dev $dev root netem $NETEM_PARAMS
trap 'echo "Caught SIGINT."; echo "$(ps -a)"; killall -SIGINT $command' INT
LOG=$(/prrt/$command $PRRT_PARAMS 2>&1)
printf "$LOG\n"
echo "Exit status: $?"
dur=$(echo "$(date +%s.%N) - $start" | bc);
printf "Done after %.6f seconds\n" $dur
#tc qdisc del dev $dev root
kill $TSHARK_PID
net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 1
......@@ -10,14 +10,26 @@ if (XLAP)
add_definitions(-DXLAP)
endif()
option (TCP "Set time protocol to TCP.")
if (TCP)
add_definitions(-DTCP)
endif()
add_subdirectory(proto)
add_subdirectory(util)
add_executable(sender sender.c)
add_executable(receiver receiver.c)
add_executable(refcount refcount.c)
add_executable(time-sender time-sender.c)
add_executable(time-receiver time-receiver.c)
target_link_libraries(sender LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(receiver LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(time-sender LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(time-receiver LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
add_executable(refcount refcount.c)
target_link_libraries(refcount LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
......@@ -119,11 +119,12 @@ cdef extern from "proto/socket.h":
ctypedef prrtSocket PrrtSocket
cdef PrrtSocket* PrrtSocket_create(const uint32_t mtu, const uint32_t target_delay)
cdef PrrtSocket* PrrtSocket_create(const uint32_t maximum_payload_size, const uint32_t target_delay)
bint PrrtSocket_bind(PrrtSocket *sock_ptr, const_char *ipAddress, const uint16_t port)
int PrrtSocket_close(const PrrtSocket *sock_ptr)
int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int PrrtSocket_send_sync(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int PrrtSocket_send_async(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr, sockaddr* addr) nogil
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, sockaddr* addr) nogil
......@@ -144,11 +145,23 @@ cdef extern from "proto/socket.h":
uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *socket)
float PrrtSocket_get_plr_fwd(PrrtSocket *socket)
uint32_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *socket)
uint32_t PrrtSocket_get_btlbw_fwd(PrrtSocket *s);
uint32_t PrrtSocket_get_btlbw_back(PrrtSocket *s);
bint PrrtSocket_get_app_limited(PrrtSocket *socket)
uint32_t PrrtSocket_get_bbr_state(PrrtSocket *s)
uint64_t PrrtSocket_get_full_bw(PrrtSocket *s)
bint PrrtSocket_get_filled_pipe(PrrtSocket *s)
uint32_t PrrtSocket_get_cycle_index(PrrtSocket *s)
float PrrtSocket_get_pacing_gain(PrrtSocket *s)
uint32_t PrrtSocket_get_cwnd(PrrtSocket *s)
uint32_t PrrtSocket_get_inflight(PrrtSocket *s)
uint32_t PrrtSocket_get_pacing_rate(PrrtSocket *s)
uint32_t PrrtSocket_get_send_quantum(PrrtSocket *s)
uint32_t PrrtSocket_get_pipe(PrrtSocket *s)
uint32_t PrrtSocket_get_delivered(PrrtSocket *s)
bint PrrtSocket_get_bbr_round_start(PrrtSocket *s)
uint32_t PrrtSocket_get_bbr_app_limited(PrrtSocket *socket)
bint PrrtSocket_get_bbr_is_app_limited(PrrtSocket *socket)
bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket)
cdef extern from "proto/stores/packetDeliveryStore.h":
......
......@@ -30,8 +30,8 @@
#define GF_BITS 8
#define K_START 4
#define N_START 7
#define K_START 1
#define N_START 1
#define N_P_START 1
#define RRT_ALPHA 0.125
......
set (PRRT_SOURCES ../defines.h
types/block.c types/block.h
types/channelStateInformation.c types/channelStateInformation.h
bbr.c bbr.h
clock.c clock.h
types/codingParams.c types/codingParams.h
receiver.c receiver.h
socket.c socket.h
types/applicationConstraints.c types/applicationConstraints.h
timer.c timer.h
processes/dataReceiver.c processes/dataReceiver.h
processes/dataTransmitter.c processes/dataTransmitter.h
stores/dataPacketStore.c stores/dataPacketStore.h
stores/deliveredPacketTable.c stores/deliveredPacketTable.h
stores/inFlightPacketStore.c stores/inFlightPacketStore.h
stores/pace.c stores/pace.h
stores/paceFilter.c stores/paceFilter.h
stores/packetTimeoutTable.c stores/packetTimeoutTable.h
stores/packetDeliveryStore.c stores/packetDeliveryStore.h
stores/receptionTable.c stores/receptionTable.h
stores/repairBlockStore.c stores/repairBlockStore.h
types/packetTimeout.c types/packetTimeout.h
types/applicationConstraints.c types/applicationConstraints.h
types/block.c types/block.h
types/codingParams.c types/codingParams.h
types/channelStateInformation.c types/channelStateInformation.h
types/lossStatistics.c types/lossStatistics.h
types/packet.c types/packet.h
vdmcode/block_code.c vdmcode/block_code.h)
types/packetTimeout.c types/packetTimeout.h
vdmcode/block_code.c vdmcode/block_code.h types/packetTracking.c types/packetTracking.h types/rateSample.c types/rateSample.h)
if (XLAP)
set(PRRT_SOURCES ${PRRT_SOURCES} ../xlap/xlap.c ../xlap/xlap.h)
......
#include "bbr.h"
#include "../util/dbg.h"
#include "../util/common.h"
#include "receiver.h"
#include <math.h>
prrtByteCount_t BBR_Inflight(BBR* bbr, double gain)
{
if (bbr->rtprop == RTprop_Inf)
return bbr->initial_cwnd; /* no valid RTT samples yet */
uint32_t quanta = bbr->mps;
uint32_t estimated_bdp = (uint32_t) round((((double)bbr->bw) * bbr->rtprop) / (1000 * 1000));
return (uint32_t)(gain * estimated_bdp + quanta);
}
void BBR_EnterStartup(BBR* bbr)
{
bbr->state = STARTUP;
bbr->pacing_gain = BBRHighGain;
bbr->cwnd_gain = BBRHighGain;
}
void BBR_UpdateBtlBw(BBR* bbr, PrrtRateSample* rs, PrrtPacketTracking* tracking)
{
if (tracking->delivered >= bbr->next_round_delivered) {
bbr->next_round_delivered = tracking->delivered;
bbr->round_count++;
bbr->round_start = true;
} else {
bbr->round_start = false;
}
uint32_t delivery_rate_Bps = (uint32_t)((float) rs->delivery_rate);
if ((delivery_rate_Bps >= bbr->bw || !rs->is_app_limited) && delivery_rate_Bps != 0) {
bbr->bw = (uint32_t)WindowedFilter_push(bbr->btlBwFilter, delivery_rate_Bps);
debug(DEBUG_BBR, "Current BtlBw: %u, RS delivery rate: %u", bbr->bw, delivery_rate_Bps);
}
}
void BBR_CheckFullPipe(BBR* bbr, PrrtRateSample* rs)
{
if (bbr->filled_pipe || !bbr->round_start || rs->is_app_limited)
return; // no need to check for a full pipe now
if (bbr->bw >= bbr->full_bw * PROBE_GAIN) { // BBR.BtlBw still growing?
bbr->full_bw = bbr->bw; // record new baseline level
bbr->full_bw_count = 0;
return;
}
bbr->full_bw_count++; // another round w/o much growth
if (bbr->full_bw_count >= 3)
bbr->filled_pipe = true;
}
bool BBR_IsNextCyclePhase(BBR* bbr, prrtByteCount_t bytes_lost, prrtByteCount_t prior_inflight)
{
bool is_full_length = (PrrtClock_get_current_time_us() - bbr->cycle_stamp) > bbr->rtprop;
if (bbr->pacing_gain == 1)
return is_full_length;
if (bbr->pacing_gain > 1)
return is_full_length && (bytes_lost > 0 || prior_inflight >= BBR_Inflight(bbr, bbr->pacing_gain));
bool is_max_length = (PrrtClock_get_current_time_us() - bbr->cycle_stamp) > 4 * bbr->rtprop;
return is_max_length || (prior_inflight <= BBR_Inflight(bbr, 1.0));
}
void BBR_AdvanceCyclePhase(BBR* bbr)
{
bbr->cycle_stamp = PrrtClock_get_current_time_us();
bbr->cycle_index = (uint8_t )((bbr->cycle_index + 1) % BBRGainCycleLen);
float pacing_gain_cycle[BBRGainCycleLen] = {PROBE_GAIN, DRAIN_GAIN, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0};
bbr->pacing_gain = pacing_gain_cycle[bbr->cycle_index];
debug(DEBUG_BBR, "Advanced cycle with gain: %f", bbr->pacing_gain);
}
void BBR_CheckCyclePhase(BBR* bbr, prrtByteCount_t bytes_lost, prrtByteCount_t prior_inflight) {
if (bbr->state == PROBE_BW && BBR_IsNextCyclePhase(bbr, bytes_lost, prior_inflight))
BBR_AdvanceCyclePhase(bbr);
}
void BBR_EnterProbeBW(BBR* bbr)
{
bbr->state = PROBE_BW;
bbr->pacing_gain = 1;
bbr->cwnd_gain = 2;
bbr->cycle_index = (uint8_t)(BBRGainCycleLen - 1 - (random() % 7));
BBR_AdvanceCyclePhase(bbr);
}
void BBR_CheckDrain(BBR* bbr, prrtByteCount_t bytes_inflight)
{
if (bbr->state == STARTUP && bbr->filled_pipe) {
//Drain
bbr->state = DRAIN;
bbr->pacing_gain = 1 / BBRHighGain; // pace slowly
bbr->cwnd_gain = BBRHighGain; // maintain cwnd
}
if (bbr->state == DRAIN && bytes_inflight <= BBR_Inflight(bbr, 1.0))
BBR_EnterProbeBW(bbr); // we estimate queue is drained
}
void BBR_ExitProbeRTT(BBR* bbr)
{
if (bbr->filled_pipe)
BBR_EnterProbeBW(bbr);
else
BBR_EnterStartup(bbr);
}
uint32_t BBR_SaveCwnd(BBR* bbr)
{
if (!bbr->is_loss_recovery && bbr->state != PROBE_RTT)
return bbr->cwnd;
return MAX(bbr->prior_cwnd ,bbr->cwnd);
}
void BBR_RestoreCwnd(BBR* bbr)
{
bbr->cwnd = MAX(bbr->cwnd, bbr->prior_cwnd);
}
void BBR_UpdateRTprop(BBR* bbr, prrtTimedelta_t rtt)
{
bbr->rtprop_expired = PrrtClock_get_current_time_us() > (bbr->rtprop_stamp + RTpropFilterLen);
if (rtt >= 0 && (rtt <= bbr->rtprop || bbr->rtprop_expired)) {
bbr->rtprop = rtt;
bbr->rtprop_stamp = PrrtClock_get_current_time_us();
}
}
void BBR_EnterProbeRTT(BBR *bbr) {
bbr->state = PROBE_RTT;
bbr->pacing_gain = 1;
bbr->cwnd_gain = 1;
}
void BBR_HandleProbeRTT(BBR *bbr, PrrtPacketTracking *tracking) {
tracking->app_limited = (tracking->delivered + tracking->pipe) ? : 1;
/* Ignore low rate samples during ProbeRTT: */
prrtTimestamp_t now = PrrtClock_get_current_time_us();
if (bbr->probe_rtt_done_stamp == 0 && tracking->pipe <= bbr->min_pipe_cwnd) {
bbr->probe_rtt_done_stamp = now + ProbeRTTDuration;
bbr->probe_rtt_round_done = false;
bbr->next_round_delivered = tracking->delivered;
} else if (bbr->probe_rtt_done_stamp != 0) {
if (bbr->round_start) {
bbr->probe_rtt_round_done = true;
}
if (bbr->probe_rtt_round_done && (now > bbr->probe_rtt_done_stamp)) {
bbr->rtprop_stamp = now;
BBR_RestoreCwnd(bbr);
BBR_ExitProbeRTT(bbr);
}
}
}
void BBR_CheckProbeRTT(BBR *bbr, PrrtPacketTracking *tracking) {
if (bbr->state != PROBE_RTT && bbr->rtprop_expired && !bbr->idle_restart) {
BBR_EnterProbeRTT(bbr);
BBR_SaveCwnd(bbr);
bbr->probe_rtt_done_stamp = 0;
}
if (bbr->state == PROBE_RTT) {
BBR_HandleProbeRTT(bbr, tracking);
}
bbr->idle_restart = false;
}
void BBR_UpdateModelAndState(BBR *bbr, PrrtRateSample *rs, PrrtPacketTracking *packetTracking, prrtTimedelta_t rtt)
{
BBR_UpdateBtlBw(bbr, rs, packetTracking);
BBR_CheckCyclePhase(bbr, packetTracking->bytes_lost, packetTracking->prior_inflight);
BBR_CheckFullPipe(bbr, rs);
BBR_CheckDrain(bbr, packetTracking->pipe);
BBR_UpdateRTprop(bbr, rtt);
BBR_CheckProbeRTT(bbr, packetTracking);
}
void BBR_UpdateTargetCwnd(BBR* bbr)
{
bbr->target_cwnd = BBR_Inflight(bbr, bbr->cwnd_gain);
}
void BBR_ModulateCwndForProbeRTT(BBR* bbr)
{
if (bbr->state == PROBE_RTT)
bbr->cwnd = MIN(bbr->cwnd, bbr->min_pipe_cwnd);
}
void BBR_ModulateCwndForRecovery(BBR* bbr, prrtByteCount_t bytes_lost, prrtByteCount_t pipe, prrtByteCount_t delivered)
{
if (bytes_lost > 0) {
if (bbr->cwnd > bytes_lost) {
bbr->cwnd = MAX(bbr->cwnd - bytes_lost, bbr->mps);
} else {
bbr->cwnd = bbr->mps;
}
}
if (bbr->packet_conservation)
bbr->cwnd = MAX(bbr->cwnd, pipe + delivered);
}
void BBR_SetCwnd(BBR* bbr, PrrtPacketTracking* packetTracking)
{
BBR_UpdateTargetCwnd(bbr);
BBR_ModulateCwndForRecovery(bbr, packetTracking->bytes_lost, packetTracking->pipe, packetTracking->delivered);
if (!bbr->packet_conservation) {
if (bbr->filled_pipe)
bbr->cwnd = MIN(bbr->cwnd + packetTracking->delivered, bbr->target_cwnd);
else if (bbr->cwnd < bbr->target_cwnd || packetTracking->delivered < bbr->initial_cwnd)
bbr->cwnd = bbr->cwnd + packetTracking->delivered;
bbr->cwnd = MAX(bbr->cwnd, bbr->min_pipe_cwnd);
}
BBR_ModulateCwndForProbeRTT(bbr);
debug(DEBUG_BBR, "New cwnd: %u, State: %u", bbr->cwnd, bbr->state);
}
void BBR_SetPacingRateWithGain(BBR* bbr, double pacing_gain)
{
double rate = (pacing_gain * ((double)bbr->bw));
debug(DEBUG_BBR, "Current rate: %f, Pacing gain: %f, BtlBw: %u, Calc Rate: %f, Filled pipe: %u", bbr->pacing_rate,
pacing_gain, bbr->bw, rate, bbr->filled_pipe);
if (rate != 0 && (bbr->filled_pipe || rate > bbr->pacing_rate))
bbr->pacing_rate = rate;
}
void BBR_SetPacingRate(BBR* bbr)
{
BBR_SetPacingRateWithGain(bbr, bbr->pacing_gain);
}
void BBR_SetSendQuantum(BBR* bbr) {
if(bbr->pacing_rate < 150000) { // 1.2Mbps = 0.15 MBps = 150000 Bps
bbr->send_quantum = 1 * bbr->mps;
} else if (bbr->pacing_rate < 3000000) { // 24 Mbps = 20 * 1.2Mbps = 3000000
bbr->send_quantum = 2 * bbr->mps;
} else {
bbr->send_quantum = MIN((prrtByteCount_t) round((double) bbr->pacing_rate / 1000), 64000);
}
}
void BBR_OnACK(BBR* bbr, PrrtChannelStateInformation* csi, PrrtRateSample* rs, PrrtPacketTracking* packetTracking, prrtTimedelta_t rtt)
{
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
BBR_UpdateModelAndState(bbr, rs, packetTracking, rtt);
BBR_SetPacingRate(bbr);
BBR_SetSendQuantum(bbr);
BBR_SetCwnd(bbr, packetTracking);
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return;
error:
PERROR("BBR_OnACK failed.")
}
void BBR_OnSpuriousLoss(BBR *bbr, PrrtPacketTracking *tracking) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
if(!bbr->is_loss_recovery) {
bbr->is_loss_recovery = true;
bbr->loss_recovery_stamp = PrrtClock_get_current_time_us();
bbr->prior_cwnd = BBR_SaveCwnd(bbr);
bbr->cwnd = tracking->pipe + MAX(tracking->delivered, 1);
bbr->packet_conservation = true;
} else if (PrrtClock_get_current_time_us() > bbr->loss_recovery_stamp + bbr->rtprop){
bbr->packet_conservation = false;
}
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return;
error:
PERROR("BBR_OnACK failed.")
}
void BBR_OnRTOLoss(BBR *bbr) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
if(!bbr->is_loss_recovery) {
bbr->is_loss_recovery = true;
bbr->prior_cwnd = BBR_SaveCwnd(bbr);
bbr->cwnd = bbr->mps;
}
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return;
error:
PERROR("BBR_OnACK failed.")
}
void BBR_OnLossExit(BBR *bbr) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
if(bbr->is_loss_recovery) {
bbr->is_loss_recovery = false;
bbr->packet_conservation = false;
BBR_RestoreCwnd(bbr);
}
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return;
error:
PERROR("BBR_OnACK failed.")
}
BBR* BBR_Init(prrtByteCount_t maximum_payload_size)
{
BBR* bbr = calloc(1, sizeof(BBR));
check_mem(bbr);
pthread_mutexattr_t attr;
check(pthread_mutexattr_init(&attr) == EXIT_SUCCESS, "Mutex attr init failed.");
check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == EXIT_SUCCESS, "Setting type failed.");
check(pthread_mutex_init(&bbr->lock, &attr) == 0, "lock init failed.");
bbr->mps = maximum_payload_size;
bbr->min_pipe_cwnd = 4 * maximum_payload_size;
bbr->initial_cwnd = 4 * maximum_payload_size;
bbr->has_seen_rtt = false;
bbr->btlBwFilter = WindowedFilter_create(true, 10);
bbr->rtprop = RTprop_Inf;
bbr->rtprop_stamp = PrrtClock_get_current_time_us();
bbr->probe_rtt_done_stamp = 0;
bbr->probe_rtt_round_done = false;
bbr->packet_conservation = false;
bbr->prior_cwnd = 0;
bbr->cwnd = bbr->initial_cwnd;
bbr->idle_restart = false;
bbr->is_loss_recovery = false;
//Init round counting
bbr->next_round_delivered = 0;
bbr->round_start = false;
bbr->round_count = 0;
//Init full pipe
bbr->filled_pipe = false;
bbr->full_bw = 0;
bbr->full_bw_count = 0;
//Init pacing rate
double nominal_bandwidth = bbr->initial_cwnd / (bbr->has_seen_rtt ? bbr->rtprop : 1000);
bbr->pacing_rate = bbr->pacing_gain * nominal_bandwidth;
BBR_EnterStartup(bbr);
return bbr;
error:
PERROR("Failed to init BBR%s.", "");
return NULL;
}
void BBR_destroy(BBR* bbr)
{
WindowedFilter_destroy(bbr->btlBwFilter);
check(pthread_mutex_destroy(&bbr->lock) == 0, "lock destroy failed.");
free(bbr);
return;
error:
PERROR("BBR_destroy failed%s.", "");
}
double BBR_getPacingRate(BBR* bbr)
{
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
double res = bbr->pacing_rate;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getPacingRate failed.")
return 0;
}
prrtByteCount_t BBR_getCwnd(BBR* bbr)
{
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
prrtByteCount_t res = bbr->cwnd;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getCwnd failed.")
return 0;
}
prrtDeliveryRate_t BBR_getBtlBw(BBR* bbr)
{
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
prrtDeliveryRate_t res = bbr->bw;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getBtlBw failed.")
return 0;
}
uint32_t BBR_getRTProp(BBR* bbr)
{
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
uint32_t res = bbr->rtprop;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getRTProp failed.")
return 0;
}
uint32_t BBR_getState(BBR* bbr) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
uint32_t res = bbr->state;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getState failed.")
return 0;
}
uint32_t BBR_getCycleIndex(BBR* bbr) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
uint32_t res = bbr->cycle_index;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getCycleIndex failed.")
return 0;
}
double BBR_getPacingGain(BBR* bbr) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
double res = bbr->pacing_gain;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getPacingGain failed.")
return 0;
}
bool BBR_getFilledPipe(BBR* bbr) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
bool res = bbr->filled_pipe;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getFilledPipe failed.")
return 0;
}
bool BBR_getRoundStart(BBR* bbr) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
bool res = bbr->round_start;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getRoundStart failed.")
return 0;
}
prrtByteCount_t BBR_getFullBw(BBR* bbr) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
prrtByteCount_t res = bbr->full_bw;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getInflight failed.")
return 0;
}
prrtByteCount_t BBR_getInflight(BBR* bbr) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
prrtByteCount_t res = BBR_Inflight(bbr, bbr->pacing_gain);
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getInflight failed.")
return 0;
}
prrtByteCount_t BBR_getSendQuantum(BBR* bbr) {
check(pthread_mutex_lock(&bbr->lock) == 0, "Lock failed.");
prrtByteCount_t res = bbr->send_quantum;
check(pthread_mutex_unlock(&bbr->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("BBR_getSendQuantum failed.")
return 0;
}
\ No newline at end of file
#ifndef PRRT_BBR_H
#define PRRT_BBR_H
#include <stdint.h>
#include "stdbool.h"
#include "types/packet.h"
#include "clock.h"
#include "types/channelStateInformation.h"
#include "types/packetTracking.h"
#include "types/rateSample.h"
#include "../util/windowedFilter.h"
#define PROBE_GAIN 1.1
#define DRAIN_GAIN 0.9
#define RTpropFilterLen 10000000 //10s
#define BBRHighGain ((((float)2885) / 1000) + 1)
#define BBRGainCycleLen 8
#define ProbeRTTDuration 200000 //200ms
#define RTprop_Inf UINT32_MAX
enum bbr_state {
STARTUP,
DRAIN,
PROBE_BW,
PROBE_RTT
};
typedef struct bbr {
pthread_mutex_t lock;
prrtByteCount_t mps;
prrtByteCount_t min_pipe_cwnd;
prrtByteCount_t initial_cwnd;
prrtTimedelta_t rtprop;
prrtTimestamp_t rtprop_stamp;
prrtTimestamp_t probe_rtt_done_stamp;
bool probe_rtt_round_done;
bool packet_conservation;
prrtByteCount_t prior_cwnd;
prrtByteCount_t cwnd;
prrtByteCount_t target_cwnd;
bool idle_restart;
enum bbr_state state;
double pacing_gain;
float cwnd_gain;
bool filled_pipe;
prrtByteCount_t full_bw;
uint32_t full_bw_count;
double pacing_rate;
bool has_seen_rtt;
uint32_t next_round_delivered;
bool round_start;
uint32_t round_count;
uint32_t next_rtt_delivered;
uint32_t rtt_count;
bool rtprop_expired;
bool is_loss_recovery;
prrtTimestamp_t loss_recovery_stamp;
prrtTimestamp_t cycle_stamp;
uint8_t cycle_index;
float* pacing_gain_cycle;
prrtByteCount_t send_quantum;
prrtDeliveryRate_t bw;
WindowedFilter* btlBwFilter;
} BBR;
BBR* BBR_Init(prrtByteCount_t maximum_payload_size);
void BBR_OnACK(BBR* bbr, PrrtChannelStateInformation* csi, PrrtRateSample* rs, PrrtPacketTracking* packetTracking, prrtTimedelta_t rtt);
void BBR_OnSpuriousLoss(BBR *bbr, PrrtPacketTracking *tracking);
void BBR_OnRTOLoss(BBR *bbr);
void BBR_OnLossExit(BBR *bbr);
void BBR_destroy(BBR* bbr);
double BBR_getPacingRate(BBR* bbr);
prrtByteCount_t BBR_getCwnd(BBR* bbr);
prrtDeliveryRate_t BBR_getBtlBw(BBR* bbr);
uint32_t BBR_getState(BBR* bbr);
prrtByteCount_t BBR_getFullBw(BBR* bbr);
double BBR_getPacingGain(BBR* bbr);
uint32_t BBR_getCycleIndex(BBR* bbr);
bool BBR_getFilledPipe(BBR* bbr);
uint32_t BBR_getRTProp(BBR* bbr);
prrtByteCount_t BBR_getInflight(BBR* bbr);
prrtByteCount_t BBR_getSendQuantum(BBR* bbr);
bool BBR_getRoundStart(BBR* bbr);
#endif //PRRT_BBR_H
......@@ -4,24 +4,27 @@
#include "../../defines.h"
#include "../../util/dbg.h"
#include "../../util/common.h"
#include "../../util/time.h"
#include "../types/lossStatistics.h"
#include "../types/block.h"
#include "../clock.h"
#include "../socket.h"
#include "dataReceiver.h"
static void retrieve_data_blocks(PrrtSocket *sock_ptr,
static void retrieve_data_packets_for_block(PrrtSocket *sock_ptr,
prrtSequenceNumber_t base_seqno,
uint8_t k,
const PrrtBlock *block) {
List *res = List_create();
prrtSequenceNumber_t last_seqno = (prrtSequenceNumber_t) (base_seqno + k - 1);
debug(DEBUG_BLOCK, "Size: %d", PrrtDataPacketStore_size(sock_ptr->dataPacketStore));
PrrtDataPacketStore_remove_range(sock_ptr->dataPacketStore, res, base_seqno,
(prrtSequenceNumber_t) (base_seqno + k - 1));
last_seqno);
debug(DEBUG_BLOCK, "Retrieve %d packets in range: %u-%u.", List_count(res), base_seqno, last_seqno);
LIST_FOREACH(res, first, next, cur) {
PrrtPacket *packetPtr = cur->value;
check(PrrtBlock_insert_data_packet((PrrtBlock *) block, packetPtr), "Insert failed!")
PrrtPacket *packet = cur->value;
check(PrrtBlock_insert_data_packet((PrrtBlock *) block, packet), "Insert failed!")
}
List_destroy(res);
return;
......@@ -32,6 +35,10 @@ static void retrieve_data_blocks(PrrtSocket *sock_ptr,
static void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block) {
if (block != NULL && PrrtBlock_decode_ready(block)) {
bool data_relevant = PrrtDeliveredPacketTable_test_is_block_relevant(sock_ptr->deliveredPacketTable,
block->baseSequenceNumber,
block->codingParams->n);
if (data_relevant) {
check(PrrtBlock_decode(block), "Decoding failed");
while (List_count(block->dataPackets) > 0) {
......@@ -43,7 +50,7 @@ static void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block) {
PrrtPacket_destroy(pkt);
}
}
}
PrrtRepairBlockStore_delete(sock_ptr->repairBlockStore, block->baseSequenceNumber);
PrrtBlock_destroy(block);
}
......@@ -67,7 +74,6 @@ static bool send_feedback(PrrtSocket *sock_ptr,
kind = ts_redundancy_packet;
}
debug(DEBUG_FEEDBACK, "Send feedback %d %d", type, seqno);
XlapTimeStampCycle(sock_ptr, kind, seqno, SendFeedbackStart);
prrtFeedback_t feedback = {
......@@ -95,20 +101,23 @@ static bool send_feedback(PrrtSocket *sock_ptr,
PrrtLossStatistics stats = sock_ptr->lossStatistics;
int group_RTT = 0; // TODO: To be determined.
uint32_t local_bottleneck_pace = MAX(PrrtPace_get_effective(sock_ptr->appDeliverPace), PrrtPace_get_effective(sock_ptr->prrtReceivePace));
PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, sock_ptr->sequenceNumberFeedback++, group_RTT,
stats.gapLength, stats.gapCount, stats.burstLength,
stats.burstCount, forwardTripTime,
stats.erasureCount, stats.packetCount,
feedback.seqNo,
feedback.type);
feedback.seqNo, feedback.type,
local_bottleneck_pace);
prrtPacketLength_t length = PrrtPacket_size(feedback_pkt_ptr);
void *buf = calloc(1, length);
check_mem(buf);
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
check(sendto(sock_ptr->socketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
length, "Sending feedback failed.");
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
free(buf);
PrrtPacket_destroy(feedback_pkt_ptr);
......@@ -122,35 +131,40 @@ static bool send_feedback(PrrtSocket *sock_ptr,
return false;
}
static bool is_timeout(prrtTimestamp_t now, prrtTimestamp_t to) {
/* TODO: implement */
return false;
}
static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
PrrtPacketDataPayload *payload = packet->payload;
prrtTimestamp_t sentTimestamp = payload->timestamp;
sock_ptr->lastSentTimestamp = sentTimestamp;
PrrtClock_update(&sock_ptr->clock, sentTimestamp, payload->groupRTprop_us);
debug(DEBUG_DATARECEIVER, "Timeout: %lu", payload->packetTimeout_us);
PrrtPacketTimeout *packetTimeout = PrrtPacketTimeout_create(packet);
check(PrrtPacketTimeoutTable_insert(sock_ptr->packetTimeoutTable, packetTimeout), "Could not insert data packet.");
prrtSequenceNumber_t seqno = packet->sequenceNumber;
PrrtReceptionTable_mark_received(sock_ptr->dataReceptionTable, seqno);
PrrtReceptionTable_mark_received(sock_ptr->dataReceptionTable, seqno, sentTimestamp);
prrtTimestamp_t now = PrrtClock_get_prrt_time_us(&sock_ptr->clock);
if (is_timeout(now, payload->packetTimeout_us)) {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
if (PrrtTimestamp_cmp(now, payload->packetTimeout_us) > 0) {
debug(DEBUG_DATARECEIVER, "Timeout data packet %u (%lu > %lu)", seqno, (unsigned long) now,
(unsigned long) payload->packetTimeout_us);
PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable, packet->sequenceNumber);
PrrtPacket_destroy(packet);
debug(DEBUG_RECEIVER, "timeout data packet %u (%lu > %lu)", seqno, (unsigned long) now,
(unsigned long) payload->packetTimeout_us);
} else if (!PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable,
packet->sequenceNumber)) {
debug(DEBUG_DATARECEIVER, "Not relevant: %u", seqno);
PrrtPacket_destroy(packet);
} else {
PrrtChannelStateInformation_update_delivery_rate(sock_ptr->senderChannelStateInformation, payload->btlbw);
sock_ptr->send_peer_btl_pace = payload->btl_pace;
sock_ptr->send_peer_app_total_pace = payload->appSendTotal_pace;
sock_ptr->rtt = payload->groupRTprop_us;
prrtSequenceNumber_t baseSequenceNumber = packet->sequenceNumber - packet->index;
// forward to application layer
debug(DEBUG_DATARECEIVER, "Forward: %u", seqno);
PrrtPacketDeliveryStore_insert(sock_ptr->packetDeliveryStore, packet);
PrrtPacket *reference = PrrtPacket_copy(packet);
// forward to application layer
......@@ -159,18 +173,21 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
PrrtBlock *block = PrrtRepairBlockStore_get_block(sock_ptr->repairBlockStore, baseSequenceNumber);
if (block != NULL) {
check(PrrtBlock_insert_data_packet(block, reference), "Inserting failed: %d, %d", baseSequenceNumber,
seqno);
if(PrrtBlock_insert_data_packet(block, reference)) {
decode_block(sock_ptr, block);
} else {
PrrtPacket_destroy(reference);
}
} else {
debug(DEBUG_DATARECEIVER, "Inserting data packet %d for later.", reference->sequenceNumber);
if (PrrtDataStore_insert(sock_ptr->dataPacketStore, reference) == false) {
debug(DEBUG_DATARECEIVER, "Failed to insert %d.", reference->sequenceNumber);
PrrtPacket_destroy(reference);
} else {
debug(DEBUG_DATARECEIVER, "Inserted %d.", reference->sequenceNumber);
}
}
// forward to application layer
debug(DEBUG_DATARECEIVER, "forward %u", seqno);
XlapTimeStampClock(sock_ptr, ts_data_packet, seqno, PrrtReturnPackage);
XlapTimeStampCycle(sock_ptr, ts_data_packet, seqno, PrrtReturnPackage);
}
......@@ -181,9 +198,13 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
}
static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) {
PrrtPacketRedundancyPayload *payload = packet->payload;
prrtTimestamp_t sentTimestamp = payload->timestamp;
socket->lastSentTimestamp = sentTimestamp;
PrrtPacketRedundancyPayload *redundancyPayload = packet->payload;
PrrtReceptionTable_mark_received(socket->redundancyReceptionTable, packet->sequenceNumber);
PrrtReceptionTable_mark_received(socket->redundancyReceptionTable, packet->sequenceNumber, redundancyPayload->timestamp);
if (!PrrtDeliveredPacketTable_test_is_block_relevant(socket->deliveredPacketTable,
redundancyPayload->baseSequenceNumber,
......@@ -192,6 +213,7 @@ static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) {
} else {
PrrtBlock *block = PrrtRepairBlockStore_get_block(socket->repairBlockStore,
redundancyPayload->baseSequenceNumber);
socket->send_peer_btl_pace = payload->btl_pace;
if (block == NULL) {
uint8_t n_cycle[1] = {redundancyPayload->n - redundancyPayload->k};
PrrtCodingConfiguration *codingParams = PrrtCodingConfiguration_create(redundancyPayload->k,
......@@ -201,9 +223,9 @@ static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) {
redundancyPayload->baseSequenceNumber);
PrrtRepairBlockStore_insert(socket->repairBlockStore, block);
}
retrieve_data_blocks(socket, redundancyPayload->baseSequenceNumber, block->codingParams->k, block);
retrieve_data_packets_for_block(socket, redundancyPayload->baseSequenceNumber, block->codingParams->k, block);
}
if (PrrtBlock_insert_redundancy_packet(block, packet)) {
decode_block(socket, block);
......@@ -213,31 +235,16 @@ static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) {
}
}
void handle_feedback_packet(PrrtSocket *prrtSocket, PrrtPacket *prrtPacket, prrtTimestamp_t receiveTime) {
check(prrtPacket != NULL, "Cannot be null");
void handle_feedback_packet(PrrtSocket *socket, PrrtPacket *packet, prrtTimestamp_t receiveTime) {
check(packet != NULL, "Cannot be null");
debug(DEBUG_DATARECEIVER, "handle_feedback_packet");
PrrtPacketFeedbackPayload *feedbackPayload = (PrrtPacketFeedbackPayload *) prrtPacket->payload;
PrrtPacketFeedbackPayload *feedbackPayload = (PrrtPacketFeedbackPayload *) packet->payload;
prrtTimestamp_t forwardTripTimestamp = feedbackPayload->forwardTripTimestamp_us;
bool valid_sample = PrrtReceiver_updateAndGenerateRateSample(prrtSocket->receiver,
feedbackPayload->ackSequenceNumber,
feedbackPayload->ackPacketType, receiveTime);
debug(DEBUG_DATARECEIVER, "PrrtReceiver_updateAndGenerateRateSample ");
prrtTimedelta_t rtt = (prrtTimedelta_t) (receiveTime - forwardTripTimestamp);
socket->recv_peer_btl_pace = feedbackPayload->btl_pace;
if (valid_sample) {
PrrtChannelStateInformation_update_delivery_rate(prrtSocket->receiver->csi,
prrtSocket->receiver->rateSample->delivery_rate);
}
PrrtChannelStateInformation_update_app_limited(prrtSocket->receiver->csi,
prrtSocket->receiver->rateSample->is_app_limited);
debug(DEBUG_DATARECEIVER, "PrrtChannelStateInformation_update_app_limited ");
PrrtChannelStateInformation_update_rtprop(prrtSocket->receiver->csi,
(prrtTimedelta_t) (receiveTime - forwardTripTimestamp));
debug(DEBUG_DATARECEIVER, "PrrtChannelStateInformation_update_rtprop ");
PrrtChannelStateInformation_update_plr(prrtSocket->receiver->csi, feedbackPayload->erasureCount,
feedbackPayload->packetCount);
debug(DEBUG_DATARECEIVER, "PrrtChannelStateInformation_update_plr ");
PrrtReceiver_on_ack(socket->receiver, feedbackPayload, receiveTime, rtt, socket->applicationConstraints);
return;
error:
......@@ -297,6 +304,8 @@ void *receive_data_loop(void *ptr) {
PrrtSocket *s = ptr;
while (1) {
PrrtPace_track_start(s->prrtReceivePace);
debug(DEBUG_DATARECEIVER, "About to receive.");
XlapTimestampPlaceholder tsph1;
XlapTimestampPlaceholder tsph2;
XlapTimestampPlaceholder tsph3;
......@@ -306,8 +315,11 @@ void *receive_data_loop(void *ptr) {
struct timespec packet_recv_timestamp;
uint64_t packet_recv_cyclestamp = 0;
PrrtPace_track_pause(s->prrtReceivePace);
receive_from_socket(s, buffer, &n, &remote, &addrlen, &packet_recv_timestamp, &packet_recv_cyclestamp);
if (PrrtSocket_closing(s)) {
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
PrrtPace_track_resume(s->prrtReceivePace);
if (atomic_load_explicit(&s->closing, memory_order_acquire)) {
break;
}
......@@ -327,6 +339,7 @@ void *receive_data_loop(void *ptr) {
prrtPacketType_t packetType = PrrtPacket_type(packet);
debug(DEBUG_DATARECEIVER, "received packet %d:%u", (int) packetType, seqno);
packet->channelReceive = packet_recv_timestamp;
enum XlapTimestampPacketKind kind = ts_any_packet;
prrtTimestamp_t sentTimestamp;
......@@ -369,9 +382,9 @@ void *receive_data_loop(void *ptr) {
debug(DEBUG_DATARECEIVER, "Cleanup");
PrrtSocket_cleanup(s);
debug(DEBUG_DATARECEIVER, "Cleaned");
PrrtPace_track_end(s->prrtReceivePace);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
}
PrrtSocket_cleanup(s);
return NULL;
error:
PERROR("receive_data_loop() ended unexpectedly.");
......
......@@ -2,12 +2,15 @@
#include <netdb.h>
#include <string.h>
#include "../../defines.h"
#include "../timer.h"
#include "../receiver.h"
#include "../socket.h"
#include "../types/block.h"
#include "../../util/dbg.h"
#include "../../util/common.h"
#include "../../util/time.h"
#include "dataTransmitter.h"
#include <math.h>
bool send_to_socket(PrrtSocket* sock_ptr, uint8_t* buf, prrtPacketLength_t length, struct timespec *packet_timestamp, uint64_t* packet_clockstamp) {
PrrtReceiver *recv = sock_ptr->receiver;
......@@ -76,7 +79,65 @@ bool send_to_socket(PrrtSocket* sock_ptr, uint8_t* buf, prrtPacketLength_t lengt
static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
uint8_t buf[MAX_PAYLOAD_LENGTH];
memset(buf, 0, sizeof(buf));
prrtPacketLength_t length = PrrtPacket_size(packet);
PrrtPace_track_pause(sock_ptr->prrtTransmitPace);
bool paceSuccessful = PrrtSocket_pace(sock_ptr, true);
PrrtPace_track_resume(sock_ptr->prrtTransmitPace);
if (!paceSuccessful) {
debug(DEBUG_DATATRANSMITTER, "Pacing interrupted.");
PrrtPacket_destroy(packet);
return false;
}
debug(DEBUG_DATATRANSMITTER, "Pacing interval passed.");
PrrtPace_track_pause(sock_ptr->prrtTransmitPace);
bool waitSuccessful = PrrtReceiver_wait_for_space(sock_ptr->receiver, sock_ptr->maximum_payload_size,
sock_ptr->applicationConstraints);
PrrtPace_track_resume(sock_ptr->prrtTransmitPace);
if(!waitSuccessful) {
debug(DEBUG_DATATRANSMITTER, "Wait for space interrupted.");
PrrtPacket_destroy(packet);
return false;
}
debug(DEBUG_DATATRANSMITTER, "Space available.");
prrtTimestamp_t now = PrrtClock_get_current_time_us();
if (sock_ptr->pacingEnabled) {
prrtTimedelta_t peerPacingTime = 0;
prrtTimedelta_t channelPacingTime = 0;
double pacing_rate = PrrtReceiver_get_BBR_pacingRate(sock_ptr->receiver);
double pacing_gain = PrrtReceiver_get_BBR_pacingGain(sock_ptr->receiver) * 0.9;
uint32_t state = PrrtReceiver_get_BBR_state(sock_ptr->receiver);
if(pacing_rate != 0) {
channelPacingTime = (prrtTimedelta_t) round(((1000 * 1000 * ((double) packet->payloadLength)) / pacing_rate));
}
// Cross-Pace iff PROBE_BW and unity gain
if(sock_ptr->recv_peer_btl_pace != 0 && state == PROBE_BW) {
double pt = round(((double) sock_ptr->recv_peer_btl_pace) / pacing_gain);
if (pt > (TIMESTAMP_SPACE-1)) {
peerPacingTime = TIMESTAMP_SPACE-1;
} else {
peerPacingTime = (prrtTimedelta_t) pt;
}
}
prrtTimedelta_t pacingTime = MAX(channelPacingTime, peerPacingTime);
debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %f, Pacing Time: %u", packet->payloadLength, pacing_rate, pacingTime);
sock_ptr->nextSendTime = now + pacingTime;
}
// Update timestamp
prrtTimedelta_t btl_pace = MAX(MAX(PrrtPace_get_effective(sock_ptr->prrtTransmitPace), PrrtPace_get_effective(sock_ptr->appSendPace)), PrrtSocket_get_sock_opt(sock_ptr, "nw_pace"));
prrtTimedelta_t appSendTotal_pace = PrrtPace_get_total(sock_ptr->appSendPace);
if(PrrtPacket_type(packet) == PACKET_TYPE_DATA) {
((PrrtPacketDataPayload*) (packet->payload))->timestamp = PrrtClock_get_current_time_us();
((PrrtPacketDataPayload*) (packet->payload))->btlbw = PrrtReceiver_get_BBR_btlDr(sock_ptr->receiver);
((PrrtPacketDataPayload*) (packet->payload))->btl_pace = btl_pace;
((PrrtPacketDataPayload*) (packet->payload))->appSendTotal_pace = appSendTotal_pace;
} else if(PrrtPacket_type(packet) == PACKET_TYPE_REDUNDANCY) {
((PrrtPacketRedundancyPayload*) (packet->payload))->timestamp = PrrtClock_get_current_time_us();
((PrrtPacketRedundancyPayload*) (packet->payload))->btl_pace = btl_pace;
((PrrtPacketRedundancyPayload*) (packet->payload))->appSendTotal_pace = appSendTotal_pace;
}
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small.");
......@@ -96,7 +157,7 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
struct timespec timestamp;
uint64_t cyclestamp;
send_to_socket(sock_ptr, buf, length, &timestamp, &cyclestamp);
send_to_socket(sock_ptr, buf, PrrtPacket_size(packet), &timestamp, &cyclestamp);
XlapTimeStampValue(sock_ptr, ts_data_packet, packet->sequenceNumber, ChannelTransmit, timestamp);
XlapCycleStampValue(sock_ptr, ts_data_packet, packet->sequenceNumber, ChannelTransmit, cyclestamp);
......@@ -117,7 +178,6 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
case PACKET_TYPE_CHANNEL_FEEDBACK:
default:;
}
return true;
error:
......@@ -125,57 +185,112 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
return false;
}
void *send_data_loop(void *ptr) {
PrrtSocket *sock_ptr = ptr;
PrrtBlock *block = NULL;
typedef struct timer_arg {
PrrtSocket* socket;
PrrtBlock* block;
} RetransmissionTimerArgs;
while (1) {
ListNode *job;
do {
job = Pipe_pull(sock_ptr->sendDataQueue);
if (PrrtSocket_closing(sock_ptr)) {
if (block != NULL) {
void retransmission_round_handler(void *arg) {
uint8_t j;
RetransmissionTimerArgs* args = (RetransmissionTimerArgs*) arg;
PrrtBlock *block = args->block;
PrrtSocket *socket = args->socket;
if(block->inRound > 0) {
PrrtReceiver_rto_check(socket->receiver, socket->applicationConstraints);
}
if (PrrtSocket_closing(socket) || block->inRound >= block->codingParams->c) {
PrrtBlock_destroy(block);
free(arg);
return;
}
return NULL;
uint32_t redundancyPackets = block->codingParams->n_cycle[block->inRound];
for (j = 0; j < redundancyPackets; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
bool sendResult = send_packet(socket, red_pkt);
if(!sendResult) {
debug(DEBUG_DATATRANSMITTER, "Sending redundancy data failed.");
PrrtBlock_destroy(block);
free(arg);
return;
}
} while (!job);
PrrtPacket *packet = PrrtPacket_byListNode(job);
}
block->inRound++;
PrrtTimerTask task = {
.arg = arg,
.fun = retransmission_round_handler
};
uint32_t waittime_us = PrrtReceiver_get_retransmission_delay(socket->receiver, socket->applicationConstraints);
prrtTimerDate deadline = abstime_from_now(waittime_us);
debug(DEBUG_DATATRANSMITTER, "Set timer to expire in: %dus", waittime_us);
PrrtTimer_submit(socket->retransmissionTimer, &deadline, &task);
}
void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet) {
PrrtPace_track_start(sock_ptr->prrtTransmitPace);
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
if (block == NULL) {
block = PrrtBlock_create(PrrtCodingConfiguration_copy(sock_ptr->codingParameters), PrrtCoder_copy(sock_ptr->coder), packet->sequenceNumber);
if (sock_ptr->receiveBlock == NULL) {
sock_ptr->receiveBlock = PrrtBlock_create(PrrtCodingConfiguration_copy(sock_ptr->codingParameters), PrrtCoder_copy(sock_ptr->coder), packet->sequenceNumber);
sock_ptr->receiveBlock->senderBlock = true;
}
packet->index = (uint8_t) (packet->sequenceNumber - block->baseSequenceNumber);
packet->index = (uint8_t) (packet->sequenceNumber - sock_ptr->receiveBlock->baseSequenceNumber);
PrrtPacketDataPayload *payload = packet->payload;
payload->groupRTprop_us = PrrtSocket_get_rtprop_fwd(sock_ptr);
PrrtPacket *packetToSend = PrrtPacket_copy(packet);
debug(DEBUG_DATATRANSMITTER, "Send: %i", packet->sequenceNumber);
send_packet(sock_ptr, packetToSend);
int sendResult = send_packet(sock_ptr, packetToSend);
if (sendResult) {
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
PrrtBlock_insert_data_packet(block, packet);
if(PrrtBlock_insert_data_packet(sock_ptr->receiveBlock, packet) == false) {
PERROR("Failed to insert packet: %d", packet->sequenceNumber);
}
// TODO: redundancy should only be sent when necessary
if (PrrtBlock_encode_ready(block)) {
uint32_t j = 0;
if (PrrtBlock_encode_ready(sock_ptr->receiveBlock)) {
unsigned int redundancy_seqno = sock_ptr->sequenceNumberRedundancy;
XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeStart);
PrrtBlock_encode(block, &sock_ptr->sequenceNumberRedundancy);
PrrtBlock_encode(sock_ptr->receiveBlock, &sock_ptr->sequenceNumberRedundancy);
XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeEnd);
uint32_t redundancyPackets = List_count(block->redundancyPackets);
for (j = 0; j < redundancyPackets; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
send_packet(sock_ptr, red_pkt);
RetransmissionTimerArgs* args = (RetransmissionTimerArgs*) calloc(1, sizeof(RetransmissionTimerArgs));
args->block = sock_ptr->receiveBlock;
sock_ptr->receiveBlock = NULL;
args->socket = sock_ptr;
retransmission_round_handler(args);
}
} else {
PrrtPacket_destroy(packet);
}
PrrtPace_track_end(sock_ptr->prrtTransmitPace);
}
PrrtBlock_destroy(block);
block = NULL;
void *PrrtDataTransmitter_send_data_loop(void *ptr) {
PrrtSocket *s = ptr;
while (1) {
ListNode *job;
do {
job = Pipe_pull(s->sendDataQueue);
if (PrrtSocket_closing(s)) {
if (s->receiveBlock != NULL) {
PrrtBlock_destroy(s->receiveBlock);
s->receiveBlock = NULL;
}
return NULL;
}
} while (!job);
PrrtPacket *packet = PrrtPacket_byListNode(job);
PrrtDataTransmitter_transmit(s, packet);
}
}
#ifndef PRRT_DATA_TRANSMITTER_H
#define PRRT_DATA_TRANSMITTER_H
void * send_data_loop(void *ptr);
#include "../socket.h"
void * PrrtDataTransmitter_send_data_loop(void *ptr);
void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet);
#endif //PRRT_DATA_TRANSMITTER_H
......@@ -3,10 +3,110 @@
#include "xlap.h"
#include "../util/common.h"
#include "../util/dbg.h"
#include "../util/time.h"
#include "stores/inFlightPacketStore.h"
#include "receiver.h"
PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port) {
prrtByteCount_t clean_rto(PrrtReceiver *recv, PrrtApplicationConstraints *constraints) {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
uint32_t retransmission_delay = PrrtReceiver_get_retransmission_delay(recv, constraints);
prrtTimestamp_t deadline = now - retransmission_delay;
prrtByteCount_t lostBytes = 0;
lostBytes += PrrtInFlightPacketStore_clear_before(recv->dataInflightPacketStore, deadline);
lostBytes += PrrtInFlightPacketStore_clear_before(recv->redundancyInflightPacketStore, deadline);
recv->packetTracking->pipe -= lostBytes;
recv->packetTracking->bytes_lost = lostBytes;
return lostBytes;
}
void update_rate_sample(PrrtRateSample *rateSample, PrrtPacket *packet, prrtTimestamp_t receiveTime,
PrrtPacketTracking *packetTracking) {
if (packet->delivered_time == 0)
return;
packetTracking->delivered += packet->payloadLength;
packetTracking->delivered_time = receiveTime;
if (packet->delivered > rateSample->prior_delivered) {
rateSample->prior_delivered = packet->delivered;
rateSample->prior_time = packet->delivered_time;
rateSample->is_app_limited = packet->is_app_limited;
rateSample->send_elapsed = packet->sent_time - packet->first_sent_time;
rateSample->ack_elapsed = packetTracking->delivered_time - packet->delivered_time;
packetTracking->first_sent_time = packet->sent_time;
}
packet->delivered_time = 0;
}
bool gnerate_rate_sample(PrrtRateSample *rateSample, PrrtPacketTracking *packetTracking) {
/* Clear app-limited field */
if (packetTracking->app_limited > 0 && packetTracking->delivered > packetTracking->app_limited)
packetTracking->app_limited = 0;
if (rateSample->prior_time == 0) {
//printf("Prior Time is 0, Cancelling Rate sample calculation\n");
return false;
}
prrtTimedelta_t interval = MAX(rateSample->send_elapsed, rateSample->ack_elapsed);
if(interval < MIN_RTT) {
return false;
}
rateSample->interval = interval;
rateSample->delivered = packetTracking->delivered - rateSample->prior_delivered;
if (rateSample->interval != 0) {
// delivered: bytes; interval: us; convert to bps
rateSample->delivery_rate = (uint32_t) round(
(((double) rateSample->delivered) * 1000.0 * 1000.0) / ((double) rateSample->interval));
}
debug(DEBUG_FEEDBACK, "RS interval: %u, RS delivered: %u, RS delivery_rate: %u, App Limited: %u", rateSample->interval,
rateSample->delivered, rateSample->delivery_rate, rateSample->is_app_limited);
return true;
}
bool update_and_generate_rate_sample(PrrtReceiver *recv, prrtSequenceNumber_t seqnum, uint8_t packetType,
prrtTimestamp_t receiveTime, prrtTimedelta_t rtt,
PrrtApplicationConstraints *constraints) {
PrrtInFlightPacketStore *inflightPacketStore = NULL;
if (packetType == PACKET_TYPE_DATA) {
inflightPacketStore = recv->dataInflightPacketStore;
} else if (packetType == PACKET_TYPE_REDUNDANCY) {
inflightPacketStore = recv->redundancyInflightPacketStore;
} else return false;
bool result = false;
PrrtPacket *packet = PrrtInFlightPacketStore_get_packet_by_seqno(inflightPacketStore, seqnum);
if(packet != NULL) {
update_rate_sample(recv->rateSample, packet, receiveTime, recv->packetTracking);
recv->packetTracking->pipe -= packet->payloadLength;
PrrtInFlightPacketStore_remove_outstanding_packet_by_seqno(inflightPacketStore, seqnum);
prrtByteCount_t lostBytes = clean_rto(recv, constraints);
if (lostBytes > 0) {
BBR_OnSpuriousLoss(recv->bbr, recv->packetTracking);
}
if (lostBytes == 0) {
BBR_OnLossExit(recv->bbr);
}
result = gnerate_rate_sample(recv->rateSample, recv->packetTracking);
recv->packetTracking->prior_inflight = recv->packetTracking->pipe;
if(recv->rateSample != NULL) {
BBR_OnACK(recv->bbr, recv->csi, recv->rateSample, recv->packetTracking, rtt);
}
}
return result;
}
PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port, prrtByteCount_t maximum_payload_size) {
PrrtReceiver *recv = calloc(1, sizeof(PrrtReceiver));
check_mem(recv);
recv->host_name = strdup(host);
......@@ -24,8 +124,8 @@ PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port) {
recv->csi = PrrtChannelStateInformation_create();
recv->dataPacketStates = PrrtInFlightPacketStore_create();
recv->redundancyPacketStates = PrrtInFlightPacketStore_create();
recv->dataInflightPacketStore = PrrtInFlightPacketStore_create();
recv->redundancyInflightPacketStore = PrrtInFlightPacketStore_create();
struct addrinfo *info;
......@@ -38,12 +138,16 @@ PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port) {
check(0 == getaddrinfo(host, portstr, &hints, &info), "getaddrinfo");
recv->ai = info;
recv->bbr = BBR_Init(maximum_payload_size);
recv->closing = false;
pthread_mutexattr_t attr;
check(pthread_mutexattr_init(&attr) == EXIT_SUCCESS, "Mutex attr init failed.");
check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == EXIT_SUCCESS, "Setting type failed.");
check(pthread_mutex_init(&recv->lock, &attr) == 0, "lock init failed.");
check(pthread_cond_init(&recv->wait_for_space, NULL) == EXIT_SUCCESS, "Condition init failed.");
return recv;
......@@ -57,16 +161,20 @@ PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port) {
}
bool PrrtReceiver_destroy(PrrtReceiver *receiver) {
if(receiver->bbr) {
BBR_destroy(receiver->bbr);
}
if (receiver->csi != NULL) {
PrrtChannelStateInformation_destroy(receiver->csi);
}
if (receiver->dataPacketStates != NULL) {
PrrtInFlightPacketStore_destroy(receiver->dataPacketStates);
if (receiver->dataInflightPacketStore != NULL) {
PrrtInFlightPacketStore_destroy(receiver->dataInflightPacketStore);
}
if (receiver->redundancyPacketStates != NULL) {
PrrtInFlightPacketStore_destroy(receiver->redundancyPacketStates);
if (receiver->redundancyInflightPacketStore != NULL) {
PrrtInFlightPacketStore_destroy(receiver->redundancyInflightPacketStore);
}
if (receiver->packetTracking != NULL) {
......@@ -81,6 +189,7 @@ bool PrrtReceiver_destroy(PrrtReceiver *receiver) {
freeaddrinfo(receiver->ai);
free((void *) receiver->host_name);
check(pthread_mutex_destroy(&receiver->lock) == 0, "lock destroy failed.");
check(pthread_cond_destroy(&receiver->wait_for_space) == 0, "cond destroy failed.");
free(receiver);
return true;
......@@ -89,105 +198,177 @@ bool PrrtReceiver_destroy(PrrtReceiver *receiver) {
return false;
}
void PrrtReceiver_updateRateSample(PrrtRateSample *rateSample, PrrtPacket *packet, prrtTimestamp_t receiveTime,
PrrtPacketTracking *packetTracking) {
if (packet->delivered_time == 0)
return;
void PrrtReceiver_rto_check(PrrtReceiver *recv, PrrtApplicationConstraints *constraints) {
check(pthread_mutex_lock(&recv->lock) == 0, "Lock failed.");
packetTracking->delivered += packet->payloadLength;
packetTracking->delivered_time = receiveTime;
prrtByteCount_t lostBytes = clean_rto(recv, constraints);
if (packet->delivered > rateSample->prior_delivered) {
rateSample->prior_delivered = packet->delivered;
rateSample->prior_time = packet->delivered_time;
rateSample->is_app_limited = packet->is_app_limited;
rateSample->send_elapsed = packet->sent_time - packet->first_sent_time;
rateSample->ack_elapsed = packetTracking->delivered_time - packet->delivered_time;
packetTracking->first_sent_time = packet->sent_time;
if (lostBytes > 0) {
BBR_OnRTOLoss(recv->bbr);
}
packet->delivered_time = 0;
if (lostBytes == 0) {
BBR_OnLossExit(recv->bbr);
}
pthread_cond_broadcast(&recv->wait_for_space);
check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed.");
return;
error:
PERROR("PrrtReceiver_rto_check() failed.")
}
void PrrtReceiver_on_application_write(PrrtReceiver* receiver) {
prrtByteCount_t PrrtReceiver_get_pipe(PrrtReceiver *recv) {
prrtByteCount_t res = 0;
check(pthread_mutex_lock(&recv->lock) == 0, "Lock failed.");
res = recv->packetTracking->pipe;
check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("PrrtReceiver_get_pipe() failed.")
return 0;
}
void PrrtReceiver_on_application_write(PrrtReceiver *receiver, uint32_t send_queue_length, prrtSequenceNumber_t sequenceNumber) {
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
debug(DEBUG_RECEIVER, "OnApplicationWrite: %u, %d", sequenceNumber, send_queue_length);
PrrtPacketTracking *tracking = receiver->packetTracking;
if(PrrtInFlightPacketStore_get_queue_size(receiver->dataPacketStates) +
PrrtInFlightPacketStore_get_queue_size(receiver->redundancyPacketStates) == 0) {
if(send_queue_length == 0 && tracking->pipe < BBR_getCwnd(receiver->bbr)) {
tracking->app_limited = (tracking->delivered + tracking->pipe) ? : 1;
}
check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
debug(DEBUG_RECEIVER, "OnApplicationWrite done: %d.", send_queue_length);
return;
error:
PERROR("Mutex error.%s", "");
}
void PrrtReceiver_on_ack(PrrtReceiver *receiver,
PrrtPacketFeedbackPayload *feedbackPayload,
prrtTimestamp_t receiveTime,
prrtTimedelta_t rtt,
PrrtApplicationConstraints *constraints) {
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
//debug(DEBUG_RECEIVER, "PrrtReceiver_on_ack");
PrrtChannelStateInformation_update_plr(receiver->csi, feedbackPayload->erasureCount, feedbackPayload->packetCount);
bool valid_sample = update_and_generate_rate_sample(receiver, feedbackPayload->ackSequenceNumber,
feedbackPayload->ackPacketType, receiveTime, rtt,
constraints);
if(valid_sample) {
PrrtChannelStateInformation_update_delivery_rate(receiver->csi, receiver->rateSample->delivery_rate);
}
bool PrrtReceiver_generateRateSample(PrrtRateSample *rateSample, PrrtPacketTracking *packetTracking) {
/* Clear app-limited field */
if (packetTracking->app_limited && packetTracking->delivered > packetTracking->app_limited)
packetTracking->app_limited = 0;
pthread_cond_broadcast(&receiver->wait_for_space);
check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
return;
if (rateSample->prior_time == 0) {
//printf("Prior Time is 0, Cancelling Rate sample calculation\n");
return false;
error:
PERROR("Mutex error.%s", "");
}
prrtTimedelta_t interval = MAX(rateSample->send_elapsed, rateSample->ack_elapsed);
if(interval < MIN_RTT) {
return false;
double PrrtReceiver_get_BBR_pacingRate(PrrtReceiver *receiver) {
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
double res = BBR_getPacingRate(receiver->bbr);
check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("Mutex error.%s", "");
return 0.0;
}
rateSample->interval = interval;
uint32_t PrrtReceiver_get_BBR_state(PrrtReceiver *receiver) {
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
uint32_t res = BBR_getState(receiver->bbr);
check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
return res;
rateSample->delivered = packetTracking->delivered - rateSample->prior_delivered;
error:
PERROR("Mutex error.%s", "");
return 0;
}
if (rateSample->interval != 0) {
// delivered: bytes; interval: us; convert to bps
rateSample->delivery_rate = (uint32_t) round(
(((double) rateSample->delivered) * 1000.0 * 1000.0 * 8.0) / ((double) rateSample->interval));
double PrrtReceiver_get_BBR_pacingGain(PrrtReceiver *receiver) {
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
double res = BBR_getPacingGain(receiver->bbr);
check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("Mutex error.%s", "");
return 0.0;
}
debug(DEBUG_FEEDBACK, "RS interval: %u, RS delivered: %u, RS delivery_rate: %u, App Limited: %u", rateSample->interval,
rateSample->delivered, rateSample->delivery_rate, rateSample->is_app_limited);
return true;
prrtByteCount_t PrrtReceiver_get_space(PrrtReceiver *receiver) {
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
prrtByteCount_t cwnd = BBR_getCwnd(receiver->bbr);
prrtByteCount_t pipe = receiver->packetTracking->pipe;
check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
if (cwnd < pipe) {
return 0;
}
return cwnd - pipe;
bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNumber_t seqnum, uint8_t packetType,
prrtTimestamp_t receiveTime) {
PrrtInFlightPacketStore *packetStore = NULL;
if (packetType == PACKET_TYPE_DATA) {
packetStore = recv->dataPacketStates;
} else if (packetType == PACKET_TYPE_REDUNDANCY) {
packetStore = recv->redundancyPacketStates;
} else return false;
bool result = false;
error:
PERROR("Mutex error.%s", "");
return 0;
}
check(pthread_mutex_lock(&recv->lock) == 0, "Lock failed.");
PrrtPacket *packet = PrrtInFlightPacketStore_get_packet(packetStore, seqnum);
if (packet != NULL) {
PrrtReceiver_updateRateSample(recv->rateSample, packet, receiveTime, recv->packetTracking);
result = PrrtReceiver_generateRateSample(recv->rateSample, recv->packetTracking);
recv->packetTracking->pipe -= packet->payloadLength;
PrrtInFlightPacketStore_remove_outstanding_packet(packetStore, seqnum);
bool PrrtReceiver_wait_for_space(PrrtReceiver *receiver,
prrtByteCount_t maximum_payload_size,
PrrtApplicationConstraints *applicationConstraints) {
bool waitResult = true;
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
int64_t space = PrrtReceiver_get_space(receiver);
while (space < maximum_payload_size) {
uint32_t waittime_us = PrrtReceiver_get_retransmission_delay(receiver, applicationConstraints);
debug(DEBUG_RECEIVER, "Wait for: %d", waittime_us);
struct timespec deadline = abstime_from_now(waittime_us);
pthread_cond_timedwait(&receiver->wait_for_space, &receiver->lock, &deadline);
if (atomic_load_explicit(&receiver->closing, memory_order_acquire)) {
waitResult = false;
break;
}
PrrtReceiver_rto_check(receiver, applicationConstraints);
space = PrrtReceiver_get_space(receiver);
}
check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
return waitResult;
error:
PERROR("Mutex error.%s", "");
return false;
}
check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed.");
return result;
uint32_t PrrtReceiver_get_retransmission_delay(PrrtReceiver *receiver, PrrtApplicationConstraints* applicationConstraints) {
uint32_t rtprop = BBR_getRTProp(receiver->bbr);
if (rtprop == RTprop_Inf) {
rtprop = (uint32_t) round(0.5 * PrrtApplicationConstraints_get_target_delay(applicationConstraints));
}
int processing_margin_us = 1000;
double waittime_us = rtprop * 1.0 + processing_margin_us;
return (uint32_t) round(waittime_us);
}
prrtDeliveryRate_t PrrtReceiver_get_BBR_btlDr(PrrtReceiver *receiver) {
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
prrtDeliveryRate_t res = BBR_getBtlBw(receiver->bbr);
check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
return res;
error:
PERROR("Mutex error.%s", "");
return false;
return 0;
}
void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *packet, prrtTimestamp_t sentTime) {
PrrtInFlightPacketStore *packetStore = NULL;
if (PrrtPacket_type(packet) == PACKET_TYPE_DATA) {
packetStore = recv->dataPacketStates;
packetStore = recv->dataInflightPacketStore;
} else if (PrrtPacket_type(packet) == PACKET_TYPE_REDUNDANCY) {
packetStore = recv->redundancyPacketStates;
packetStore = recv->redundancyInflightPacketStore;
} else return;
//printf("Adding Packet #%u to %u\n", packet->sequenceNumber, PrrtPacket_type(packet));
check(pthread_mutex_lock(&recv->lock) == 0, "Lock failed.");
if (recv->packetTracking->pipe == 0) {
......@@ -202,10 +383,22 @@ void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *p
packet->is_app_limited = (recv->packetTracking->app_limited != 0);
PrrtInFlightPacketStore_add_outstanding_packet(packetStore, packet);
check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed.");
pthread_cond_broadcast(&recv->wait_for_space);
check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed.");
return;
error:
PERROR("Lock error.%s", "");
}
void PrrtReceiver_interrupt(PrrtReceiver *receiver) {
check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed.");
atomic_store_explicit(&receiver->closing, true, memory_order_release);
pthread_cond_broadcast(&receiver->wait_for_space);
check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed.");
return;
error:
PERROR("PrrtReceiver_interrupt failed.");
}
\ No newline at end of file
......@@ -7,28 +7,10 @@
#include <sys/socket.h>
#include <netdb.h>
#include "stores/inFlightPacketStore.h"
#include "bbr.h"
#include "types/applicationConstraints.h"
#include "types/channelStateInformation.h"
typedef struct prrtRateSample {
prrtByteCount_t prior_delivered;
prrtTimestamp_t prior_time;
prrtTimedelta_t send_elapsed;
prrtTimedelta_t ack_elapsed;
prrtTimedelta_t interval;
prrtByteCount_t delivered;
bool is_app_limited;
prrtDeliveryRate_t delivery_rate; // Bps
} PrrtRateSample;
typedef struct packetTracking {
prrtByteCount_t pipe;
prrtByteCount_t delivered;
prrtTimestamp_t delivered_time;
prrtTimestamp_t first_sent_time;
prrtByteCount_t app_limited;
} PrrtPacketTracking;
typedef struct prrtReceiver {
const char *host_name;
......@@ -36,23 +18,39 @@ typedef struct prrtReceiver {
struct addrinfo *ai;
PrrtChannelStateInformation *csi;
pthread_mutex_t lock;
pthread_cond_t wait_for_space;
atomic_bool closing;
BBR* bbr;
PrrtInFlightPacketStore *dataPacketStates;
PrrtInFlightPacketStore *redundancyPacketStates;
PrrtInFlightPacketStore *dataInflightPacketStore;
PrrtInFlightPacketStore *redundancyInflightPacketStore;
PrrtRateSample *rateSample;
PrrtPacketTracking *packetTracking;
} PrrtReceiver;
PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port);
bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNumber_t seqnum, uint8_t packetType,
prrtTimestamp_t receiveTime);
PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port, prrtByteCount_t maximum_payload_size);
void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *packet, prrtTimestamp_t sentTime);
prrtByteCount_t PrrtReceiver_get_pipe(PrrtReceiver *recv);
void PrrtReceiver_rto_check(PrrtReceiver *recv, PrrtApplicationConstraints *constraints);
void PrrtReceiver_on_application_write(PrrtReceiver *receiver, uint32_t send_queue_length, prrtSequenceNumber_t sequenceNumber);
void PrrtReceiver_on_ack(PrrtReceiver *receiver, PrrtPacketFeedbackPayload *feedbackPayload, prrtTimestamp_t receiveTime,
prrtTimedelta_t rtt, PrrtApplicationConstraints *constraints);
bool PrrtReceiver_wait_for_space(PrrtReceiver *receiver, prrtByteCount_t maximum_payload_size,
PrrtApplicationConstraints *pConstraints);
uint32_t PrrtReceiver_get_retransmission_delay(PrrtReceiver *socket, PrrtApplicationConstraints *applicationConstraints);
void PrrtReceiver_on_application_write(PrrtReceiver* receiver);
prrtByteCount_t PrrtReceiver_get_space(PrrtReceiver *receiver);
double PrrtReceiver_get_BBR_pacingRate(PrrtReceiver *receiver);
double PrrtReceiver_get_BBR_pacingGain(PrrtReceiver *receiver);
uint32_t PrrtReceiver_get_BBR_state(PrrtReceiver *receiver);
prrtDeliveryRate_t PrrtReceiver_get_BBR_btlDr(PrrtReceiver *receiver);
void PrrtReceiver_interrupt(PrrtReceiver *receiver);
bool PrrtReceiver_destroy(PrrtReceiver *receiver);
#endif //PRRT_RECEIVER_H
This diff is collapsed.
#ifndef PRRT_SOCKET_H
#define PRRT_SOCKET_H
#include "../defines.h"
#include "../util/list.h"
#include "../util/pipe.h"
......@@ -9,6 +10,8 @@
#include "stores/dataPacketStore.h"
#include "stores/deliveredPacketTable.h"
#include "stores/packetTimeoutTable.h"
#include "stores/pace.h"
#include "stores/paceFilter.h"
#include "stores/receptionTable.h"
#include "stores/repairBlockStore.h"
#include "stores/packetDeliveryStore.h"
......@@ -17,6 +20,7 @@
#include "types/lossStatistics.h"
#include "types/packet.h"
#include "clock.h"
#include "timer.h"
#include "xlap.h"
#include "receiver.h"
......@@ -31,8 +35,12 @@ typedef struct prrtSocket {
struct sockaddr_in *address;
bool isBound;
bool withTimestamp;
bool pacingEnabled;
PrrtClock clock;
PrrtBlock* receiveBlock;
pthread_t sendDataThread;
Pipe *sendDataQueue;
......@@ -50,6 +58,8 @@ typedef struct prrtSocket {
atomic_bool closing;
prrtTimestamp_t nextSendTime;
prrtSequenceNumber_t packetsCount;
prrtSequenceNumber_t sequenceNumberSource;
prrtSequenceNumber_t sequenceNumberRepetition;
......@@ -69,6 +79,18 @@ typedef struct prrtSocket {
PrrtCodingConfiguration *codingParameters;
PrrtCoder *coder;
prrtTimedelta_t rtt;
// Pacing
PrrtPace* appSendPace;
PrrtPace* prrtTransmitPace;
PrrtPace* prrtReceivePace;
PrrtPace* appDeliverPace;
prrtTimedelta_t send_peer_btl_pace;
prrtTimedelta_t recv_peer_btl_pace;
prrtTimedelta_t send_peer_app_total_pace;
_Atomic (XlapTimestampTable *) tstable[2];
pthread_attr_t *sendDataThreadAttr;
......@@ -77,8 +99,12 @@ typedef struct prrtSocket {
atomic_bool isHardwareTimestamping;
char *interfaceName;
PrrtChannelStateInformation* senderChannelStateInformation;
atomic_bool isThreadPinning;
prrtByteCount_t maximum_payload_size;
PrrtTimer *retransmissionTimer;
} PrrtSocket;
......@@ -106,7 +132,9 @@ int PrrtSocket_close(PrrtSocket *s);
bool PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port);
int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, size_t data_len);
int PrrtSocket_send_async(PrrtSocket *s, const uint8_t *data, const size_t data_len);
int PrrtSocket_send_sync(PrrtSocket *s, const uint8_t *data, size_t data_len);
bool PrrtSocket_pace(PrrtSocket *sock_ptr, bool prepace);
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr);
......@@ -118,7 +146,6 @@ int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, struct sockaddr
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us);
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us, struct timespec* deadline);
bool PrrtSocket_cleanup(PrrtSocket *s);
bool PrrtSocket_closing(PrrtSocket *s);
......@@ -126,11 +153,23 @@ bool PrrtSocket_closing(PrrtSocket *s);
bool PrrtSocket_uses_thread_pinning(PrrtSocket *s);
uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *s);
prrtPacketLossRate_t PrrtSocket_get_plr_fwd(PrrtSocket *s);
prrtDeliveryRate_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *s);
prrtDeliveryRate_t PrrtSocket_get_btlbw_fwd(PrrtSocket *s);
prrtDeliveryRate_t PrrtSocket_get_btlbw_back(PrrtSocket *s);
bool PrrtSocket_get_app_limited(PrrtSocket *s);
uint64_t PrrtSocket_get_full_bw(PrrtSocket *s);
bool PrrtSocket_get_filled_pipe(PrrtSocket *s);
uint32_t PrrtSocket_get_bbr_state(PrrtSocket *s);
uint32_t PrrtSocket_get_bbr_app_limited(PrrtSocket *s);
bool PrrtSocket_get_bbr_is_app_limited(PrrtSocket *s);
uint32_t PrrtSocket_get_cycle_index(PrrtSocket *s);
float PrrtSocket_get_pacing_gain(PrrtSocket *s);
uint32_t PrrtSocket_get_cwnd(PrrtSocket *s);
uint32_t PrrtSocket_get_inflight(PrrtSocket *s);
uint32_t PrrtSocket_get_pacing_rate(PrrtSocket *s);
prrtByteCount_t PrrtSocket_get_send_quantum(PrrtSocket *s);
prrtByteCount_t PrrtSocket_get_pipe(PrrtSocket *s);
prrtByteCount_t PrrtSocket_get_delivered(PrrtSocket *s);
bool PrrtSocket_get_bbr_round_start(PrrtSocket *s);
#endif // PRRT_SOCKET_H