...
 
Commits (116)
......@@ -22,6 +22,9 @@ build:prrt:
- which cmake
- which gcc
- which g++
- cmake --version
- gcc --version
- g++ --version
- pip3 list | grep Cython
- pip3 list | grep numpy
- CC=gcc-5 CXX=g++-5 cmake . -DPRRT_TESTS=1
......@@ -38,6 +41,28 @@ build:container:
- docker push $CI_REGISTRY_IMAGE:$DOCKER_TAG
- docker rmi $CI_REGISTRY_IMAGE:$DOCKER_TAG
build:container_tcp:
stage: build
tags:
- docker
script:
- export DOCKER_TAG=$(echo "$CI_BUILD_REF_NAME""_tcp" | sed 's#/#_#' | sed 's#^master$#latest#')
- docker build -t $CI_REGISTRY_IMAGE:$DOCKER_TAG --build-arg http_proxy=http://www-proxy.uni-saarland.de:3128 -f docker/Dockerfile_tcp .
- docker login -u gitlab-ci-token -p $CI_BUILD_TOKEN $CI_REGISTRY
- docker push $CI_REGISTRY_IMAGE:$DOCKER_TAG
- docker rmi $CI_REGISTRY_IMAGE:$DOCKER_TAG
build:container_xlap:
stage: build
tags:
- docker
script:
- export DOCKER_TAG=$(echo "$CI_BUILD_REF_NAME""_xlap" | sed 's#/#_#' | sed 's#^master$#latest#')
- docker build -t $CI_REGISTRY_IMAGE:$DOCKER_TAG --build-arg http_proxy=http://www-proxy.uni-saarland.de:3128 -f docker/Dockerfile_xlap .
- docker login -u gitlab-ci-token -p $CI_BUILD_TOKEN $CI_REGISTRY
- docker push $CI_REGISTRY_IMAGE:$DOCKER_TAG
- docker rmi $CI_REGISTRY_IMAGE:$DOCKER_TAG
test:prrt_mem:
stage: test
dependencies:
......
......@@ -10,7 +10,7 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR})
set(CMAKE_C_FLAGS "-O2 -Wall -std=gnu11 -D_GNU_SOURCE -fPIC" )
set(CMAKE_C_FLAGS_DEBUG "-O0 -fsanitize=undefined -fsanitize=address -g3" )
set(CMAKE_CXX_FLAGS "-fstack-protector -fstack-protector-all -Wall -std=gnu++11 -D_GNU_SOURCE" )
set(CMAKE_CXX_FLAGS "-fstack-protector -fstack-protector-all -Wall -std=gnu++11 -D_GNU_SOURCE -fsanitize=undefined -fsanitize=address" )
set(CMAKE_CXX_FLAGS_DEBUG "-O0 -Wall -ggdb -fsanitize=undefined -fsanitize=address -g3" )
set(CMAKE_CXX_FLAGS_RELEASE "-Os -Wall" )
......
......@@ -29,9 +29,9 @@ port = int(sys.argv[1])
s = prrt.PrrtSocket(port=port)
while True:
d = s.recv()
d = s.recv().decode("utf8")
if d != "Close":
print d
print(d)
else:
break
```
......@@ -44,25 +44,27 @@ import prrt
host = sys.argv[1]
port = int(sys.argv[2])
localport = int(sys.argv[3])
s = prrt.PrrtSocket(port=port)
s = prrt.PrrtSocket(port=localport)
s.connect(host, port)
for i in range(10):
s.send("Packet {}".format(i))
s.send("Close")
s.send("Packet {}".format(i).encode("utf8"))
s.send("Close".encode("utf8"))
```
Start the receiver by:
```bash
python receiver.py 5000
python3 receiver.py 5000
```
In a separate terminal, run:
```bash
python sender.py 127.0.0.1 5000
python3 sender.py 127.0.0.1 5000 6000
```
This should generate the following output in the receiver console:
......
......@@ -89,9 +89,16 @@ local ex_type = Field.new("prrt.type")
local function getType() return ex_type()() end
local function getTypeName() return prrtPacketTypeNames[getType()] end
local ex_index = Field.new("prrt.index")
local function getIndex() return ex_index()() end
local ex_data_length = Field.new("prrt.data.length")
local function getDataLength() return ex_data_length()() end
local ex_red_baseseqno = Field.new("prrt.redundancy.baseSequenceNumber")
local function getRedBaseSeqNo() return ex_red_baseseqno()() end
local ex_red_n = Field.new("prrt.redundancy.n")
local function getRedN() return ex_red_n()() end
......@@ -109,7 +116,7 @@ local function dissect_data(buffer, pinfo, root)
tree:add(pf_data_groupRTprop, buffer:range(8,4))
tree:add(pf_data_packettimeout, buffer:range(12,4))
local label = "DATA Len=" .. getDataLength()
local label = "[D] Idx=" .. getIndex() .. " Len=" .. getDataLength()
tree:set_text(label)
pinfo.cols.info:set(label)
end
......@@ -121,7 +128,7 @@ local function dissect_redundancy(buffer, pinfo, root)
tree:add(pf_red_n, buffer:range(6,1))
tree:add(pf_red_k, buffer:range(7,1))
local label = "REDUNDANCY n=" .. getRedN() .. " k=" .. getRedK()
local label = "[R] Idx=" .. getIndex() .. " b=" .. getRedBaseSeqNo() .. " n=" .. getRedN() .. " k=" .. getRedK()
tree:set_text(label)
pinfo.cols.info:set(label)
end
......@@ -139,7 +146,7 @@ local function dissect_feedback(buffer, pinfo, root)
tree:add(pf_fb_acktype, buffer:range(20,1))
tree:add(pf_fb_ackSeqN, buffer:range(21, 2))
local label = "FEEDBACK"
local label = "[F]"
tree:set_text(label)
pinfo.cols.info:set(label)
end
......
......@@ -2,9 +2,13 @@ FROM gcc:5
MAINTAINER Andreas Schmidt <schmidt@nt.uni-saarland.de>
RUN apt-get update \
&& apt-get upgrade -y \
&& apt-get install -y cmake
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update && apt-get install --yes --force-yes \
bc \
cmake \
traceroute \
tshark
COPY CMakeLists.txt /prrt/
COPY prrt /prrt/prrt
......@@ -15,8 +19,7 @@ WORKDIR /prrt
RUN cmake . \
&& make
ENV PATH /prrt/bin:$PATH
WORKDIR /prrt/bin
ENV PATH /prrt:$PATH
VOLUME /output
......
FROM gcc:5
MAINTAINER Andreas Schmidt <schmidt@nt.uni-saarland.de>
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update && apt-get install --yes --force-yes \
bc \
cmake \
traceroute \
tshark
COPY CMakeLists.txt /prrt/
COPY prrt /prrt/prrt
COPY tests /prrt/tests
COPY docker/entrypoint.sh /
WORKDIR /prrt
RUN cmake -DTCP=1 . \
&& make
ENV PATH /prrt:$PATH
VOLUME /output
ENTRYPOINT ["/entrypoint.sh"]
FROM gcc:5
MAINTAINER Andreas Schmidt <schmidt@nt.uni-saarland.de>
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update && apt-get install --yes --force-yes \
bc \
cmake \
traceroute \
tshark
COPY CMakeLists.txt /prrt/
COPY prrt /prrt/prrt
COPY tests /prrt/tests
COPY docker/entrypoint.sh /
WORKDIR /prrt
RUN cmake -DCMAKE_BUILD_TYPE=Debug -DXLAP=1 -DDEBUG=1 . \
&& make -i
ENV PATH /prrt:$PATH
VOLUME /output
ENTRYPOINT ["/entrypoint.sh"]
......@@ -5,13 +5,15 @@ dev=eth0
command=$1
shift
if [[ "$command" == "sender" || "$command" == "receiver" ]]; then
if [[ "$command" == "sender" || "$command" == "receiver" || "$command" == "time-sender" || "$command" == "time-receiver" ]]; then
:
else
echo "Command should be either sender or receiver."
exit 0;
fi
TARGET="127.0.0.1"
OUTPUT="/dev/null"
NETEM=()
PRRT=()
while [[ $# -gt 0 ]]
......@@ -19,7 +21,24 @@ do
key="$1"
case $key in
-t|--target|-p|--port|-r|--rounds)
-t|--target)
if [[ "$command" == "sender" || "$command" == "time-sender" ]]; then
PRRT+=("$1 $2")
fi
TARGET=("$2")
shift
shift
;;
-w|--wireshark)
OUTPUT=("$2")
shift
shift
;;
-T|--threadpinning)
PRRT+=("$1")
shift
;;
-p|--port|-r|--rounds|-s|--size|-R|--rcvbuf|-S|--sndbuf|-o|--output|-a|--appdelay)
PRRT+=("$1 $2")
shift
shift
......@@ -34,8 +53,31 @@ done
PRRT_PARAMS="${PRRT[@]}"
NETEM_PARAMS="${NETEM[@]}"
echo "Starting Wireshark."
tshark -i eth0 -w $OUTPUT.pcap &
TSHARK_PID=$!
sleep 5
start=$(date +%s.%N);
echo "Checking reachability of $TARGET."
until ping -c1 $TARGET &>/dev/null; sleep 1; do :; done
dur=$(echo "$(date +%s.%N) - $start" | bc);
printf "Reachable after %.6f seconds\n" $dur
traceroute $TARGET > $OUTPUT.tr
echo "Traceroute done."
if [[ "$command" == "sender" || "$command" == "time-sender" ]]; then
echo "Delaying sender start."
else
echo "Delaying receiver start."
fi
echo "Running PRRT with command: \"$command $PRRT_PARAMS\" and link parameters: \"$NETEM_PARAMS\""
tc qdisc add dev $dev root netem $NETEM_PARAMS
/prrt/$command $PRRT_PARAMS -o /output/log.csv
/prrt/$command $PRRT_PARAMS
echo "Done."
tc qdisc del dev $dev root
kill $TSHARK_PID
......@@ -10,14 +10,26 @@ if (XLAP)
add_definitions(-DXLAP)
endif()
option (TCP "Set time protocol to TCP.")
if (TCP)
add_definitions(-DTCP)
endif()
add_subdirectory(proto)
add_subdirectory(util)
add_executable(sender sender.c)
add_executable(receiver receiver.c)
add_executable(refcount refcount.c)
add_executable(time-sender time-sender.c)
add_executable(time-receiver time-receiver.c)
target_link_libraries(sender LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(receiver LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(time-sender LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(time-receiver LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
add_executable(refcount refcount.c)
target_link_libraries(refcount LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
......@@ -119,11 +119,13 @@ cdef extern from "proto/socket.h":
ctypedef prrtSocket PrrtSocket
cdef PrrtSocket* PrrtSocket_create(const uint32_t mtu, const uint32_t target_delay)
cdef PrrtSocket* PrrtSocket_create(const uint32_t maximum_payload_size, const uint32_t target_delay)
bint PrrtSocket_bind(PrrtSocket *sock_ptr, const_char *ipAddress, const uint16_t port)
int PrrtSocket_close(const PrrtSocket *sock_ptr)
int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int PrrtSocket_send_sync(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int PrrtSocket_send_async(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr, sockaddr* addr) nogil
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, sockaddr* addr) nogil
......@@ -144,11 +146,23 @@ cdef extern from "proto/socket.h":
uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *socket)
float PrrtSocket_get_plr_fwd(PrrtSocket *socket)
uint32_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *socket)
uint32_t PrrtSocket_get_btlbw_fwd(PrrtSocket *s);
uint32_t PrrtSocket_get_btlbw_back(PrrtSocket *s);
bint PrrtSocket_get_app_limited(PrrtSocket *socket)
uint32_t PrrtSocket_get_bbr_state(PrrtSocket *s)
uint64_t PrrtSocket_get_full_bw(PrrtSocket *s)
bint PrrtSocket_get_filled_pipe(PrrtSocket *s)
uint32_t PrrtSocket_get_cycle_index(PrrtSocket *s)
float PrrtSocket_get_pacing_gain(PrrtSocket *s)
uint32_t PrrtSocket_get_cwnd(PrrtSocket *s)
uint32_t PrrtSocket_get_inflight(PrrtSocket *s)
uint32_t PrrtSocket_get_pacing_rate(PrrtSocket *s)
uint32_t PrrtSocket_get_send_quantum(PrrtSocket *s)
uint32_t PrrtSocket_get_pipe(PrrtSocket *s)
uint32_t PrrtSocket_get_delivered(PrrtSocket *s)
bint PrrtSocket_get_bbr_round_start(PrrtSocket *s)
uint32_t PrrtSocket_get_bbr_app_limited(PrrtSocket *socket)
bint PrrtSocket_get_bbr_is_app_limited(PrrtSocket *socket)
bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket)
cdef extern from "proto/stores/packetDeliveryStore.h":
......
set (PRRT_SOURCES ../defines.h
types/block.c types/block.h
types/channelStateInformation.c types/channelStateInformation.h
bbr.c bbr.h
clock.c clock.h
types/codingParams.c types/codingParams.h
receiver.c receiver.h
socket.c socket.h
types/applicationConstraints.c types/applicationConstraints.h
timer.c timer.h
processes/dataReceiver.c processes/dataReceiver.h
processes/dataTransmitter.c processes/dataTransmitter.h
stores/dataPacketStore.c stores/dataPacketStore.h
......@@ -15,10 +13,14 @@ set (PRRT_SOURCES ../defines.h
stores/packetDeliveryStore.c stores/packetDeliveryStore.h
stores/receptionTable.c stores/receptionTable.h
stores/repairBlockStore.c stores/repairBlockStore.h
types/packetTimeout.c types/packetTimeout.h
types/applicationConstraints.c types/applicationConstraints.h
types/block.c types/block.h
types/codingParams.c types/codingParams.h
types/channelStateInformation.c types/channelStateInformation.h
types/lossStatistics.c types/lossStatistics.h
types/packet.c types/packet.h
vdmcode/block_code.c vdmcode/block_code.h)
types/packetTimeout.c types/packetTimeout.h
vdmcode/block_code.c vdmcode/block_code.h types/packetTracking.c types/packetTracking.h types/rateSample.c types/rateSample.h)
if (XLAP)
set(PRRT_SOURCES ${PRRT_SOURCES} ../xlap/xlap.c ../xlap/xlap.h)
......
This diff is collapsed.
#ifndef PRRT_BBR_H
#define PRRT_BBR_H
#include <stdint.h>
#include "stdbool.h"
#include "types/packet.h"
#include "clock.h"
#include "types/channelStateInformation.h"
#include "types/packetTracking.h"
#include "types/rateSample.h"
#include "../util/windowedFilter.h"
#define PROBE_GAIN 1.25
#define DRAIN_GAIN 0.75
#define RTpropFilterLen 10000000 //10s
#define BBRHighGain ((((float)2885) / 1000) + 1)
#define BBRGainCycleLen 8
#define ProbeRTTDuration 200000 //200ms
#define RTprop_Inf UINT32_MAX
enum bbr_state {
STARTUP,
DRAIN,
PROBE_BW,
PROBE_RTT
};
typedef struct bbr {
pthread_mutex_t lock;
prrtByteCount_t mps;
prrtByteCount_t min_pipe_cwnd;
prrtByteCount_t initial_cwnd;
prrtTimedelta_t rtprop;
prrtTimestamp_t rtprop_stamp;
prrtTimestamp_t probe_rtt_done_stamp;
bool probe_rtt_round_done;
bool packet_conservation;
prrtByteCount_t prior_cwnd;
prrtByteCount_t cwnd;
prrtByteCount_t target_cwnd;
bool idle_restart;
enum bbr_state state;
double pacing_gain;
float cwnd_gain;
bool filled_pipe;
prrtByteCount_t full_bw;
uint32_t full_bw_count;
double pacing_rate;
bool has_seen_rtt;
uint32_t next_round_delivered;
bool round_start;
uint32_t round_count;
uint32_t next_rtt_delivered;
uint32_t rtt_count;
bool rtprop_expired;
bool is_loss_recovery;
prrtTimestamp_t loss_recovery_stamp;
prrtTimestamp_t cycle_stamp;
uint8_t cycle_index;
float* pacing_gain_cycle;
prrtByteCount_t send_quantum;
prrtDeliveryRate_t bw;
WindowedFilter* btlBwFilter;
} BBR;
BBR* BBR_Init(prrtByteCount_t maximum_payload_size);
void BBR_OnACK(BBR* bbr, PrrtChannelStateInformation* csi, PrrtRateSample* rs, PrrtPacketTracking* packetTracking, prrtTimedelta_t rtt);
void BBR_OnSpuriousLoss(BBR *bbr, PrrtPacketTracking *tracking);
void BBR_OnRTOLoss(BBR *bbr);
void BBR_OnLossExit(BBR *bbr);
void BBR_destroy(BBR* bbr);
double BBR_getPacingRate(BBR* bbr);
prrtByteCount_t BBR_getCwnd(BBR* bbr);
prrtDeliveryRate_t BBR_getBtlBw(BBR* bbr);
uint32_t BBR_getState(BBR* bbr);
prrtByteCount_t BBR_getFullBw(BBR* bbr);
double BBR_getPacingGain(BBR* bbr);
uint32_t BBR_getCycleIndex(BBR* bbr);
bool BBR_getFilledPipe(BBR* bbr);
uint32_t BBR_getRTProp(BBR* bbr);
prrtByteCount_t BBR_getInflight(BBR* bbr);
prrtByteCount_t BBR_getSendQuantum(BBR* bbr);
bool BBR_getRoundStart(BBR* bbr);
#endif //PRRT_BBR_H
This diff is collapsed.
......@@ -2,12 +2,15 @@
#include <netdb.h>
#include <string.h>
#include "../../defines.h"
#include "../timer.h"
#include "../receiver.h"
#include "../socket.h"
#include "../types/block.h"
#include "../../util/dbg.h"
#include "../../util/common.h"
#include "../../util/time.h"
#include "dataTransmitter.h"
#include <math.h>
bool send_to_socket(PrrtSocket* sock_ptr, uint8_t* buf, prrtPacketLength_t length, struct timespec *packet_timestamp, uint64_t* packet_clockstamp) {
PrrtReceiver *recv = sock_ptr->receiver;
......@@ -77,6 +80,41 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
uint8_t buf[MAX_PAYLOAD_LENGTH];
memset(buf, 0, sizeof(buf));
prrtPacketLength_t length = PrrtPacket_size(packet);
prrtPacketLength_t payloadLength = packet->payloadLength;
bool paceSuccessful = PrrtSocket_pace(sock_ptr);
if (!paceSuccessful) {
debug(DEBUG_DATATRANSMITTER, "Pacing interrupted.");
PrrtPacket_destroy(packet);
return false;
}
debug(DEBUG_DATATRANSMITTER, "Pacing interval passed.");
bool waitSuccessful = PrrtReceiver_wait_for_space(sock_ptr->receiver, sock_ptr->maximum_payload_size,
sock_ptr->applicationConstraints);
if(!waitSuccessful) {
debug(DEBUG_DATATRANSMITTER, "Wait for space interrupted.");
PrrtPacket_destroy(packet);
return false;
}
debug(DEBUG_DATATRANSMITTER, "Space available.");
prrtTimestamp_t now = PrrtClock_get_current_time_us();
if (sock_ptr->pacingEnabled) {
double pacing_rate = PrrtReceiver_getBBRPacingRate(sock_ptr->receiver);
if(pacing_rate != 0) {
prrtTimedelta_t pacingTime = (prrtTimedelta_t) round(((1000 * 1000 * ( (double)payloadLength)) / pacing_rate));
debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %f, Pacing Time: %u", packet->payloadLength, pacing_rate, pacingTime);
sock_ptr->nextSendTime = now + pacingTime;
}
}
// Update timestamp
if(PrrtPacket_type(packet) == PACKET_TYPE_DATA) {
((PrrtPacketDataPayload*) (packet->payload))->timestamp = PrrtClock_get_current_time_us();
((PrrtPacketDataPayload*) (packet->payload))->btlbw = PrrtReceiver_get_BBR_btlDr(sock_ptr->receiver);
} else if(PrrtPacket_type(packet) == PACKET_TYPE_REDUNDANCY) {
((PrrtPacketRedundancyPayload*) (packet->payload))->timestamp = PrrtClock_get_current_time_us();
}
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small.");
......@@ -125,57 +163,110 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
return false;
}
void *send_data_loop(void *ptr) {
PrrtSocket *sock_ptr = ptr;
PrrtBlock *block = NULL;
typedef struct timer_arg {
PrrtSocket* socket;
PrrtBlock* block;
} RetransmissionTimerArgs;
while (1) {
ListNode *job;
do {
job = Pipe_pull(sock_ptr->sendDataQueue);
if (PrrtSocket_closing(sock_ptr)) {
if (block != NULL) {
PrrtBlock_destroy(block);
}
return NULL;
}
} while (!job);
PrrtPacket *packet = PrrtPacket_byListNode(job);
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
if (block == NULL) {
block = PrrtBlock_create(PrrtCodingConfiguration_copy(sock_ptr->codingParameters), PrrtCoder_copy(sock_ptr->coder), packet->sequenceNumber);
void retransmission_round_handler(void *arg) {
uint8_t j;
RetransmissionTimerArgs* args = (RetransmissionTimerArgs*) arg;
PrrtBlock *block = args->block;
PrrtSocket *socket = args->socket;
if(block->inRound > 0) {
PrrtReceiver_rto_check(socket->receiver, socket->applicationConstraints);
}
if (PrrtSocket_closing(socket) || block->inRound >= block->codingParams->c) {
PrrtBlock_destroy(block);
free(arg);
return;
}
uint32_t redundancyPackets = block->codingParams->n_cycle[block->inRound];
for (j = 0; j < redundancyPackets; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_next_red_data(block);
bool sendResult = send_packet(socket, red_pkt);
if(!sendResult) {
debug(DEBUG_DATATRANSMITTER, "Sending redundancy data failed.");
PrrtBlock_destroy(block);
free(arg);
return;
}
}
packet->index = (uint8_t) (packet->sequenceNumber - block->baseSequenceNumber);
block->inRound++;
PrrtTimerTask task = {
.arg = arg,
.fun = retransmission_round_handler
};
PrrtPacketDataPayload *payload = packet->payload;
payload->groupRTprop_us = PrrtSocket_get_rtprop_fwd(sock_ptr);
uint32_t waittime_us = PrrtReceiver_get_retransmission_delay(socket->receiver, socket->applicationConstraints);
prrtTimerDate deadline = abstime_from_now(waittime_us);
debug(DEBUG_DATATRANSMITTER, "Set timer to expire in: %dus", waittime_us);
PrrtTimer_submit(socket->retransmissionTimer, &deadline, &task);
}
void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet) {
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitStart);
if (sock_ptr->receiveBlock == NULL) {
sock_ptr->receiveBlock = PrrtBlock_create(PrrtCodingConfiguration_copy(sock_ptr->codingParameters), PrrtCoder_copy(sock_ptr->coder), packet->sequenceNumber);
sock_ptr->receiveBlock->senderBlock = true;
}
packet->index = (uint8_t) (packet->sequenceNumber - sock_ptr->receiveBlock->baseSequenceNumber);
PrrtPacketDataPayload *payload = packet->payload;
payload->groupRTprop_us = PrrtSocket_get_rtprop_fwd(sock_ptr);
PrrtPacket *packetToSend = PrrtPacket_copy(packet);
debug(DEBUG_DATATRANSMITTER, "Send: %i", packet->sequenceNumber);
send_packet(sock_ptr, packetToSend);
PrrtPacket *packetToSend = PrrtPacket_copy(packet);
debug(DEBUG_DATATRANSMITTER, "Send: %i", packet->sequenceNumber);
int sendResult = send_packet(sock_ptr, packetToSend);
if (sendResult) {
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtTransmitEnd);
PrrtBlock_insert_data_packet(block, packet);
if(PrrtBlock_insert_data_packet(sock_ptr->receiveBlock, packet) == false) {
PERROR("Failed to insert packet: %d", packet->sequenceNumber);
}
// TODO: redundancy should only be sent when necessary
if (PrrtBlock_encode_ready(block)) {
uint32_t j = 0;
if (PrrtBlock_encode_ready(sock_ptr->receiveBlock)) {
unsigned int redundancy_seqno = sock_ptr->sequenceNumberRedundancy;
XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeStart);
PrrtBlock_encode(block, &sock_ptr->sequenceNumberRedundancy);
PrrtBlock_encode(sock_ptr->receiveBlock, &sock_ptr->sequenceNumberRedundancy);
XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeEnd);
uint32_t redundancyPackets = List_count(block->redundancyPackets);
for (j = 0; j < redundancyPackets; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
send_packet(sock_ptr, red_pkt);
}
PrrtBlock_destroy(block);
block = NULL;
RetransmissionTimerArgs* args = (RetransmissionTimerArgs*) calloc(1, sizeof(RetransmissionTimerArgs));
args->block = sock_ptr->receiveBlock;
sock_ptr->receiveBlock = NULL;
args->socket = sock_ptr;
retransmission_round_handler(args);
}
} else {
PrrtPacket_destroy(packet);
}
}
void *PrrtDataTransmitter_send_data_loop(void *ptr) {
PrrtSocket *s = ptr;
while (1) {
ListNode *job;
do {
job = Pipe_pull(s->sendDataQueue);
if (PrrtSocket_closing(s)) {
if (s->receiveBlock != NULL) {
PrrtBlock_destroy(s->receiveBlock);
s->receiveBlock = NULL;
}
return NULL;
}
} while (!job);
PrrtPacket *packet = PrrtPacket_byListNode(job);
PrrtDataTransmitter_transmit(s, packet);
}
}
#ifndef PRRT_DATA_TRANSMITTER_H
#define PRRT_DATA_TRANSMITTER_H
void * send_data_loop(void *ptr);
#include "../socket.h"
void * PrrtDataTransmitter_send_data_loop(void *ptr);
void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet);
#endif //PRRT_DATA_TRANSMITTER_H
This diff is collapsed.
......@@ -7,28 +7,10 @@
#include <sys/socket.h>
#include <netdb.h>
#include "stores/inFlightPacketStore.h"
#include "bbr.h"
#include "types/applicationConstraints.h"
#include "types/channelStateInformation.h"
typedef struct prrtRateSample {
prrtByteCount_t prior_delivered;
prrtTimestamp_t prior_time;
prrtTimedelta_t send_elapsed;
prrtTimedelta_t ack_elapsed;
prrtTimedelta_t interval;
prrtByteCount_t delivered;
bool is_app_limited;
prrtDeliveryRate_t delivery_rate; // Bps
} PrrtRateSample;
typedef struct packetTracking {
prrtByteCount_t pipe;
prrtByteCount_t delivered;
prrtTimestamp_t delivered_time;
prrtTimestamp_t first_sent_time;
prrtByteCount_t app_limited;
} PrrtPacketTracking;
typedef struct prrtReceiver {
const char *host_name;
......@@ -36,23 +18,38 @@ typedef struct prrtReceiver {
struct addrinfo *ai;
PrrtChannelStateInformation *csi;
pthread_mutex_t lock;
pthread_cond_t wait_for_space;
atomic_bool closing;
BBR* bbr;
PrrtInFlightPacketStore *dataPacketStates;
PrrtInFlightPacketStore *redundancyPacketStates;
PrrtInFlightPacketStore *dataInflightPacketStore;
PrrtInFlightPacketStore *redundancyInflightPacketStore;
PrrtRateSample *rateSample;
PrrtPacketTracking *packetTracking;
} PrrtReceiver;
PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port);
bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNumber_t seqnum, uint8_t packetType,
prrtTimestamp_t receiveTime);
PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port, prrtByteCount_t maximum_payload_size);
void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *packet, prrtTimestamp_t sentTime);
prrtByteCount_t PrrtReceiver_get_pipe(PrrtReceiver *recv);
void PrrtReceiver_rto_check(PrrtReceiver *recv, PrrtApplicationConstraints *constraints);
void PrrtReceiver_on_application_write(PrrtReceiver *receiver, uint32_t send_queue_length, prrtSequenceNumber_t sequenceNumber);
void PrrtReceiver_on_ack(PrrtReceiver *receiver, PrrtPacketFeedbackPayload *feedbackPayload, prrtTimestamp_t receiveTime,
prrtTimedelta_t rtt, PrrtApplicationConstraints *constraints);
uint32_t PrrtReceiver_get_retransmission_delay(PrrtReceiver *socket, PrrtApplicationConstraints *applicationConstraints);
bool PrrtReceiver_wait_for_space(PrrtReceiver *receiver, prrtByteCount_t maximum_payload_size,
PrrtApplicationConstraints *pConstraints);
void PrrtReceiver_on_application_write(PrrtReceiver* receiver);
prrtByteCount_t PrrtReceiver_get_space(PrrtReceiver *receiver);
double PrrtReceiver_getBBRPacingRate(PrrtReceiver *receiver);
prrtDeliveryRate_t PrrtReceiver_get_BBR_btlDr(PrrtReceiver *receiver);
void PrrtReceiver_interrupt(PrrtReceiver *receiver);
bool PrrtReceiver_destroy(PrrtReceiver *receiver);
#endif //PRRT_RECEIVER_H
This diff is collapsed.
......@@ -17,6 +17,7 @@
#include "types/lossStatistics.h"
#include "types/packet.h"
#include "clock.h"
#include "timer.h"
#include "xlap.h"
#include "receiver.h"
......@@ -31,8 +32,12 @@ typedef struct prrtSocket {
struct sockaddr_in *address;
bool isBound;
bool withTimestamp;
bool pacingEnabled;
PrrtClock clock;
PrrtBlock* receiveBlock;
pthread_t sendDataThread;
Pipe *sendDataQueue;
......@@ -50,6 +55,8 @@ typedef struct prrtSocket {
atomic_bool closing;
prrtTimestamp_t nextSendTime;
prrtSequenceNumber_t packetsCount;
prrtSequenceNumber_t sequenceNumberSource;
prrtSequenceNumber_t sequenceNumberRepetition;
......@@ -77,8 +84,12 @@ typedef struct prrtSocket {
atomic_bool isHardwareTimestamping;
char *interfaceName;
PrrtChannelStateInformation* senderChannelStateInformation;
atomic_bool isThreadPinning;
prrtByteCount_t maximum_payload_size;
PrrtTimer *retransmissionTimer;
} PrrtSocket;
......@@ -106,7 +117,10 @@ int PrrtSocket_close(PrrtSocket *s);
bool PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port);
int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, size_t data_len);
int PrrtSocket_send_async(PrrtSocket *s, const uint8_t *data, const size_t data_len);
int PrrtSocket_send_sync(PrrtSocket *s, const uint8_t *data, size_t data_len);
bool PrrtSocket_pace(PrrtSocket *s);
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr);
......@@ -118,7 +132,6 @@ int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, struct sockaddr
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us);
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us, struct timespec* deadline);
bool PrrtSocket_cleanup(PrrtSocket *s);
bool PrrtSocket_closing(PrrtSocket *s);
......@@ -126,11 +139,23 @@ bool PrrtSocket_closing(PrrtSocket *s);
bool PrrtSocket_uses_thread_pinning(PrrtSocket *s);
uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *s);
prrtPacketLossRate_t PrrtSocket_get_plr_fwd(PrrtSocket *s);
prrtDeliveryRate_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *s);
prrtDeliveryRate_t PrrtSocket_get_btlbw_fwd(PrrtSocket *s);
prrtDeliveryRate_t PrrtSocket_get_btlbw_back(PrrtSocket *s);
bool PrrtSocket_get_app_limited(PrrtSocket *s);
uint64_t PrrtSocket_get_full_bw(PrrtSocket *s);
bool PrrtSocket_get_filled_pipe(PrrtSocket *s);
uint32_t PrrtSocket_get_bbr_state(PrrtSocket *s);
uint32_t PrrtSocket_get_bbr_app_limited(PrrtSocket *s);
bool PrrtSocket_get_bbr_is_app_limited(PrrtSocket *s);
uint32_t PrrtSocket_get_cycle_index(PrrtSocket *s);
float PrrtSocket_get_pacing_gain(PrrtSocket *s);
uint32_t PrrtSocket_get_cwnd(PrrtSocket *s);
uint32_t PrrtSocket_get_inflight(PrrtSocket *s);
uint32_t PrrtSocket_get_pacing_rate(PrrtSocket *s);
prrtByteCount_t PrrtSocket_get_send_quantum(PrrtSocket *s);
prrtByteCount_t PrrtSocket_get_pipe(PrrtSocket *s);
prrtByteCount_t PrrtSocket_get_delivered(PrrtSocket *s);
bool PrrtSocket_get_bbr_round_start(PrrtSocket *s);
#endif // PRRT_SOCKET_H
......@@ -80,9 +80,11 @@ void PrrtDataPacketStore_remove_range(PrrtDataPacketStore *store, List *res, prr
bool PrrtDataStore_insert(PrrtDataPacketStore *store, PrrtPacket *packet)
{
check(pthread_mutex_lock(&store->lock) == EXIT_SUCCESS, "Lock failed.");
debug(DEBUG_BLOCK, "PrrtDataStore_insert: %d", packet->sequenceNumber);
if(BPTree_get(store->dataStore, packet->sequenceNumber) == NULL) {
store->dataStore = BPTree_insert(store->dataStore, packet->sequenceNumber, packet);
check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed.");
debug(DEBUG_BLOCK, "PrrtDataStore_size: %d", PrrtDataPacketStore_size(store));
return true;
}
check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed.");
......
#include <stdint.h>
#include "inFlightPacketStore.h"
#include "../clock.h"
#include "../../util/common.h"
#include "../../util/dbg.h"
#include "inFlightPacketStore.h"
PrrtInFlightPacketStore* PrrtInFlightPacketStore_create(void)
......@@ -9,7 +10,8 @@ PrrtInFlightPacketStore* PrrtInFlightPacketStore_create(void)
PrrtInFlightPacketStore* packetStore = calloc(1, sizeof(PrrtInFlightPacketStore));
check_mem(packetStore);
packetStore->outstandingPackets = NULL;
packetStore->outstandingPackets_by_seqno = NULL;
packetStore->outstandingPackets_by_senttime = NULL;
return packetStore;
......@@ -20,34 +22,63 @@ PrrtInFlightPacketStore* PrrtInFlightPacketStore_create(void)
void PrrtInFlightPacketStore_add_outstanding_packet(PrrtInFlightPacketStore *packetStore, PrrtPacket *packet)
{
packetStore->outstandingPackets = BPTree_insert(packetStore->outstandingPackets, packet->sequenceNumber,
packetStore->outstandingPackets_by_seqno = BPTree_insert(packetStore->outstandingPackets_by_seqno, packet->sequenceNumber,
packet);
packetStore->packetQueueSize++;
packetStore->outstandingPackets_by_senttime = BPTree_insert(packetStore->outstandingPackets_by_senttime, packet->sent_time,
packet);
}
void* PrrtInFlightPacketStore_get_packet_by_seqno(PrrtInFlightPacketStore *packetStore, BPTreeKey_t seqNum)
{
return BPTree_get(packetStore->outstandingPackets_by_seqno, seqNum);
}
void* PrrtInFlightPacketStore_get_packet(PrrtInFlightPacketStore *packetStore, BPTreeKey_t seqNum)
void* PrrtInFlightPacketStore_get_packet_by_senttime(PrrtInFlightPacketStore *packetStore, prrtTimestamp_t senttime)
{
return BPTree_get(packetStore->outstandingPackets, seqNum);
return BPTree_get(packetStore->outstandingPackets_by_senttime, senttime);
}
void PrrtInFlightPacketStore_remove_outstanding_packet(PrrtInFlightPacketStore *packetStore, BPTreeKey_t seqNum)
void PrrtInFlightPacketStore_remove_outstanding_packet_by_seqno(PrrtInFlightPacketStore *packetStore,
BPTreeKey_t seqNum)
{
PrrtPacket* packet = (PrrtPacket*) BPTree_get(packetStore->outstandingPackets, seqNum);
if (packet != NULL) {
packetStore->outstandingPackets = BPTree_delete(packetStore->outstandingPackets, seqNum);
packetStore->packetQueueSize--;
PrrtPacket* outstanding_packet = (PrrtPacket*) BPTree_get(packetStore->outstandingPackets_by_seqno, seqNum);
if(outstanding_packet != NULL) {
packetStore->outstandingPackets_by_seqno = BPTree_delete(packetStore->outstandingPackets_by_seqno, seqNum);
packetStore->outstandingPackets_by_senttime = BPTree_delete(packetStore->outstandingPackets_by_senttime, outstanding_packet->sent_time);
PrrtPacket_destroy(outstanding_packet);
}
}
prrtByteCount_t PrrtInFlightPacketStore_clear_before(PrrtInFlightPacketStore *packetStore,
prrtTimestamp_t time) {
prrtByteCount_t lostBytes = 0;
List *packetStateList = List_create();
if(time >= HALF_TIMESTAMP) {
BPTree_get_range(packetStore->outstandingPackets_by_senttime, packetStateList, time - HALF_TIMESTAMP, time - 1);
} else {
BPTree_get_range(packetStore->outstandingPackets_by_senttime, packetStateList, 0, time - 1);
BPTree_get_range(packetStore->outstandingPackets_by_senttime, packetStateList, time + HALF_TIMESTAMP, MAX_TIMESTAMP);
}
while (List_count(packetStateList) > 0) {
PrrtPacket *packet = List_shift(packetStateList);
lostBytes += packet->payloadLength;
packetStore->outstandingPackets_by_seqno = BPTree_delete(packetStore->outstandingPackets_by_seqno, packet->sequenceNumber);
packetStore->outstandingPackets_by_senttime = BPTree_delete(packetStore->outstandingPackets_by_senttime, packet->sent_time);
PrrtPacket_destroy(packet);
}
List_destroy(packetStateList);
return lostBytes;
}
bool PrrtInFlightPacketStore_destroy(PrrtInFlightPacketStore *packetStore)
{
if (packetStore->outstandingPackets != NULL) {
if (packetStore->outstandingPackets_by_seqno != NULL) {
List *packetStateList = List_create();
BPTree_get_range(packetStore->outstandingPackets, packetStateList, 0, SEQNO_SPACE - 1);
BPTree_get_range(packetStore->outstandingPackets_by_seqno, packetStateList, 0, SEQNO_SPACE - 1);
while (List_count(packetStateList) > 0) {
PrrtPacket *packet = List_shift(packetStateList);
packetStore->outstandingPackets = BPTree_delete(packetStore->outstandingPackets, packet->sequenceNumber);
packetStore->outstandingPackets_by_seqno = BPTree_delete(packetStore->outstandingPackets_by_seqno, packet->sequenceNumber);
packetStore->outstandingPackets_by_senttime = BPTree_delete(packetStore->outstandingPackets_by_senttime, packet->sent_time);
PrrtPacket_destroy(packet);
}
......@@ -56,8 +87,3 @@ bool PrrtInFlightPacketStore_destroy(PrrtInFlightPacketStore *packetStore)
free(packetStore);
return true;
}
uint32_t PrrtInFlightPacketStore_get_queue_size(PrrtInFlightPacketStore *packetStore)
{
return packetStore->packetQueueSize;
}
\ No newline at end of file
......@@ -5,16 +5,19 @@
#include "../types/packet.h"
typedef struct inFlightPackets {
BPTreeNode* outstandingPackets;
uint32_t packetQueueSize;
BPTreeNode* outstandingPackets_by_seqno;
BPTreeNode* outstandingPackets_by_senttime;
} PrrtInFlightPacketStore;
PrrtInFlightPacketStore* PrrtInFlightPacketStore_create(void);
void PrrtInFlightPacketStore_add_outstanding_packet(PrrtInFlightPacketStore *packetStore, PrrtPacket *packet);
void PrrtInFlightPacketStore_remove_outstanding_packet(PrrtInFlightPacketStore *packetStore, BPTreeKey_t seqNum);
void PrrtInFlightPacketStore_remove_outstanding_packet_by_seqno(PrrtInFlightPacketStore *packetStore,
BPTreeKey_t seqNum);
bool PrrtInFlightPacketStore_destroy(PrrtInFlightPacketStore *packetStore);
uint32_t PrrtInFlightPacketStore_get_queue_size(PrrtInFlightPacketStore *packetStore);
void* PrrtInFlightPacketStore_get_packet(PrrtInFlightPacketStore *packetStore, BPTreeKey_t seqNum);
void* PrrtInFlightPacketStore_get_packet_by_seqno(PrrtInFlightPacketStore *packetStore, BPTreeKey_t seqNum);
PrrtPacket * PrrtInFlightPacketStore_get_first_packet_before_seqno(PrrtInFlightPacketStore *packetStore,
prrtSequenceNumber_t seqNum);
prrtByteCount_t PrrtInFlightPacketStore_clear_before(PrrtInFlightPacketStore *packetStore, prrtTimestamp_t time);
......
......@@ -20,6 +20,7 @@ PrrtRepairBlockStore *PrrtRepairBlockStore_create(void)
bool PrrtRepairBlockStore_destroy(PrrtRepairBlockStore *store)
{
check(pthread_mutex_lock(&store->lock) == EXIT_SUCCESS, "Lock failed.");
List *blockList = List_create();
BPTree_get_range(store->blockTree, blockList, 0, (prrtSequenceNumber_t) (SEQNO_SPACE - 1));
while (List_count(blockList) > 0) {
......@@ -27,16 +28,19 @@ bool PrrtRepairBlockStore_destroy(PrrtRepairBlockStore *store)
store->blockTree = BPTree_delete(store->blockTree, block->baseSequenceNumber);
PrrtBlock_destroy(block);
}
List_destroy(blockList);
if(store->blockTree != NULL) {
store->blockTree = BPTree_destroy(store->blockTree);
}
pthread_mutex_destroy(&store->lock);
check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed.");
check(pthread_mutex_destroy(&store->lock) == EXIT_SUCCESS, "Destroy lock failed.");
free(store);
return true;
error:
PERROR("PrrtRepairBlockStore_destroy() failed.");
return false;
}
bool PrrtRepairBlockStore_delete(PrrtRepairBlockStore *store, prrtSequenceNumber_t sequenceNumber)
......@@ -68,7 +72,7 @@ bool PrrtRepairBlockStore_insert(PrrtRepairBlockStore *store, PrrtBlock *block)
check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed.");
return true;
error:
PERROR("Could not insert%s.","");
PERROR("Could not insert block with base sequence number %d.",block->baseSequenceNumber);
return false;
}
......
This diff is collapsed.
#ifndef PRRT_TIMER_H
#define PRRT_TIMER_H
#include <pthread.h>
#include <stdatomic.h>
#include "../util/futex.h"
#include "../util/list.h"
typedef void *prrtTimerTaskArg;
typedef void (*prrtTimerTaskFun)(prrtTimerTaskArg);
typedef struct timespec prrtTimerDate;
typedef struct prrtTimerTask {
prrtTimerTaskFun fun;
prrtTimerTaskArg arg;
} PrrtTimerTask;
typedef struct prrtTimer PrrtTimer;
PrrtTimer *PrrtTimer_create(unsigned int core);
int PrrtTimer_submit(PrrtTimer *timer, const prrtTimerDate *when, const PrrtTimerTask *what);
void PrrtTimer_end(PrrtTimer *timer);
#endif // PRRT_TIMER_H
\ No newline at end of file
This diff is collapsed.
......@@ -11,11 +11,19 @@ typedef struct prrtBlock {
PrrtCodingConfiguration* codingParams;
prrtPacketLength_t largestPayloadLength;
prrtSequenceNumber_t baseSequenceNumber;
List*dataPackets;
List*redundancyPackets;
PrrtPacket **packets;
int dataPacketCount;
int redPacketCount;
bool isCoded;
bool isDecoded;
pthread_mutex_t lock;
PrrtCoder *coder;
uint16_t inRound;
uint8_t nextRedundancyPacket;
uint8_t nextDataPacket;
bool senderBlock;
} PrrtBlock;
......@@ -27,19 +35,22 @@ PrrtBlock * PrrtBlock_create(PrrtCodingConfiguration *cpar, PrrtCoder* coder, pr
/**
* Frees the PrrtBlock data structure.
*/
bool PrrtBlock_destroy(PrrtBlock *block_ptr);
bool PrrtBlock_destroy(PrrtBlock *block);
bool PrrtBlock_insert_data_packet(PrrtBlock *block_ptr, const PrrtPacket *prrtPacket);
bool PrrtBlock_insert_redundancy_packet(PrrtBlock *prrtBlock, const PrrtPacket *prrtPacket);
bool PrrtBlock_insert_redundancy_packet(PrrtBlock *prrtBlock, const PrrtPacket *packet);
bool PrrtBlock_encode_ready(PrrtBlock *block_ptr);
bool PrrtBlock_decode_ready(PrrtBlock *block_ptr);
PrrtPacket *PrrtBlock_get_first_data(PrrtBlock *block_ptr);
void PrrtBlock_print(PrrtBlock *block);
void PrrtBlock_encode(PrrtBlock *block_ptr, prrtSequenceNumber_t *seqno);
bool PrrtBlock_decode(PrrtBlock *block_ptr);
PrrtPacket *PrrtBlock_get_first_red_data(PrrtBlock *block_ptr);
PrrtPacket *PrrtBlock_get_next_data(PrrtBlock *block_ptr);
PrrtPacket *PrrtBlock_get_next_red_data(PrrtBlock *block_ptr);
PrrtPacket *PrrtBlock_get_packet(PrrtBlock *block_ptr, int index);
#endif //PRRT_BLOCK_H
#include <stdlib.h>
#include "channelStateInformation.h"
#include "../../defines.h"
#include "../../util/common.h"
#include "../../util/dbg.h"
#include "../clock.h"
#include "../receiver.h"
#include "channelStateInformation.h"
PrrtChannelStateInformation * PrrtChannelStateInformation_create()
{
PrrtChannelStateInformation *csi = calloc(1, sizeof(PrrtChannelStateInformation));
check_mem(csi);
check(pthread_mutex_init(&csi->lock, NULL) == 0, "Mutex init failed.");
pthread_mutexattr_t attr;
check(pthread_mutexattr_init(&attr) == EXIT_SUCCESS, "Mutex attr init failed.");
check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == EXIT_SUCCESS, "Setting type failed.");
check(pthread_mutex_init(&csi->lock, &attr) == 0, "Mutex init failed.");
csi->rtprop = TIMESTAMP_SPACE - 1;
csi->rtprop_filter_length_us = 2 * 1000 * 1000; // 2 seconds
csi->deliveryRate = 0;
csi->btlbw = 0;
csi->btlbw_next_round_delivered = 0;
csi->btlbw_round_start = false;
csi->btlbw_round_count = 0;
csi->btlbw_filter_length = 10;
csi->appLimited = 0;
csi->plr = 0.0;
return csi;
......@@ -63,33 +57,6 @@ prrtPacketLossRate_t PrrtChannelStateInformation_get_plr(PrrtChannelStateInforma
return 0;
}
// RTprop
void PrrtChannelStateInformation_update_rtprop(PrrtChannelStateInformation *csi, prrtTimedelta_t rtprop)
{
check(pthread_mutex_lock(&csi->lock) == EXIT_SUCCESS, "Lock failed.");
prrtTimestamp_t now = PrrtClock_get_current_time_us();
csi->rtprop_expired = now > (csi->rtprop_stamp + csi->rtprop_filter_length_us);
if (rtprop >= 0 && (rtprop <= csi->rtprop || csi->rtprop_expired)) {
csi->rtprop = rtprop;
csi->rtprop_stamp = now;
}
check(pthread_mutex_unlock(&csi->lock) == EXIT_SUCCESS, "Unlock failed.");
return;
error:
PERROR("PrrtChannelStateInformation_update_rtprop() failed.");
}
prrtTimedelta_t PrrtChannelStateInformation_get_rtprop(PrrtChannelStateInformation *csi)
{
check(pthread_mutex_lock(&csi->lock) == EXIT_SUCCESS, "Lock failed.");
prrtAtomicTimedelta_t res = csi->rtprop;
check(pthread_mutex_unlock(&csi->lock) == EXIT_SUCCESS, "Unlock failed.");
return (prrtTimedelta_t) res;
error:
PERROR("PrrtChannelStateInformation_get_rtprop() failed.");
return 0;
}
// Delivery Rate
void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformation *csi, prrtDeliveryRate_t rate) {
atomic_store_explicit(&csi->deliveryRate, rate, memory_order_release);
......@@ -98,18 +65,3 @@ void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformatio
prrtDeliveryRate_t PrrtChannelStateInformation_get_delivery_rate(PrrtChannelStateInformation *csi) {
return (prrtDeliveryRate_t) atomic_load_explicit(&csi->deliveryRate, memory_order_acquire);
}
// App Limited
void PrrtChannelStateInformation_update_app_limited(PrrtChannelStateInformation *csi, bool appLimited) {
atomic_store_explicit(&csi->appLimited, appLimited, memory_order_release);
}
bool PrrtChannelStateInformation_get_app_limited(PrrtChannelStateInformation *csi) {
return atomic_load_explicit(&csi->appLimited, memory_order_acquire);
}
// BtlBw
prrtDeliveryRate_t PrrtChannelStateInformation_get_btlbw(PrrtChannelStateInformation *csi) {
return (prrtDeliveryRate_t) atomic_load_explicit(&csi->btlbw, memory_order_acquire);
}
......@@ -2,43 +2,25 @@
#define PRRT_CHANNELSTATEINFORMATION_H
#include <stdbool.h>
#include "rateSample.h"
#include "packet.h"
typedef struct prrtChannelStateInformation {
pthread_mutex_t lock;
prrtAtomicTimedelta_t rtprop;
prrtTimestamp_t rtprop_stamp;
prrtTimedelta_t rtprop_filter_length_us;
bool rtprop_expired;
prrtPacketLossRate_t plr;
prrtAtomicDeliveryRate_t deliveryRate;
prrtAtomicDeliveryRate_t btlbw;
prrtByteCount_t btlbw_next_round_delivered;
bool btlbw_round_start;
uint32_t btlbw_round_count;
uint8_t btlbw_filter_length;
atomic_bool appLimited;
} PrrtChannelStateInformation;
PrrtChannelStateInformation* PrrtChannelStateInformation_create(void);
void PrrtChannelStateInformation_update_rtprop(PrrtChannelStateInformation *csi, prrtTimedelta_t rtprop);
prrtTimedelta_t PrrtChannelStateInformation_get_rtprop(PrrtChannelStateInformation *csi);
prrtDeliveryRate_t PrrtChannelStateInformation_get_delivery_rate(PrrtChannelStateInformation *csi);
prrtDeliveryRate_t PrrtChannelStateInformation_get_btlbw(PrrtChannelStateInformation *csi);
bool PrrtChannelStateInformation_get_app_limited(PrrtChannelStateInformation *csi);
prrtPacketLossRate_t PrrtChannelStateInformation_get_plr(PrrtChannelStateInformation* csi);
void PrrtChannelStateInformation_update_plr(PrrtChannelStateInformation *csi, prrtSequenceNumber_t erasures,
prrtSequenceNumber_t packets);
void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformation *csi, prrtDeliveryRate_t rate);
void PrrtChannelStateInformation_update_app_limited(PrrtChannelStateInformation *csi, bool appLimited);
bool PrrtChannelStateInformation_destroy(PrrtChannelStateInformation* csi);
void PrrtChannelStateInformation_print(PrrtChannelStateInformation *csi);
bool PrrtChannelStateInformation_destroy(PrrtChannelStateInformation* csi);
#endif //PRRT_CHANNELSTATEINFORMATION_H
......@@ -202,6 +202,7 @@ void *encode_data_header(void *buf_ptr, const void *payload) {
PrrtPacketField_encode(data_payload, buf_ptr, prrtTimestamp_t, timestamp, htonl);
PrrtPacketField_encode(data_payload, buf_ptr, prrtTimedelta_t, groupRTprop_us, htonl);
PrrtPacketField_encode(data_payload, buf_ptr, prrtTimestamp_t, packetTimeout_us, htonl);
PrrtPacketField_encode(data_payload, buf_ptr, prrtDeliveryRate_t, btlbw, htonl);
return buf_ptr;
}
......@@ -299,6 +300,7 @@ void *decode_data_header(void *dstBuffer, const void *srcBuffer) {
PrrtPacketField_decode(data_payload, dstBuffer, prrtTimestamp_t, timestamp, ntohl);
PrrtPacketField_decode(data_payload, dstBuffer, prrtTimedelta_t, groupRTprop_us, ntohl);
PrrtPacketField_decode(data_payload, dstBuffer, prrtTimedelta_t, packetTimeout_us, ntohl);
PrrtPacketField_decode(data_payload, dstBuffer, prrtDeliveryRate_t, btlbw, ntohl);
return dstBuffer;
}
......
......@@ -49,6 +49,7 @@ typedef struct prrtPacket {
prrtIndex_t index;
prrtSequenceNumber_t sequenceNumber;
void *payload;
struct timespec channelReceive;
prrtPacketLength_t payloadLength;
// Packet State field that are not put into the packet on the wire
......@@ -71,13 +72,15 @@ typedef struct prrtPacketDataPayload {
prrtTimestamp_t timestamp;
prrtTimedelta_t groupRTprop_us;
prrtTimestamp_t packetTimeout_us;
prrtDeliveryRate_t btlbw;
} __attribute__((packed)) PrrtPacketDataPayload;
#define PRRT_PACKET_DATA_HEADER_SIZE (\
sizeof(prrtPacketLength_t) + \
sizeof(prrtTimestamp_t) + \
sizeof(prrtTimedelta_t) + \
sizeof(prrtTimestamp_t) )
sizeof(prrtTimestamp_t) + \
sizeof(prrtDeliveryRate_t))
typedef struct prrtPacketRedundancyPayload {
prrtSequenceNumber_t baseSequenceNumber;
......
#include "packetTracking.h"
#ifndef PRRT_PACKETTRACKING_H
#define PRRT_PACKETTRACKING_H
#include "packet.h"
typedef struct packetTracking {
prrtByteCount_t pipe;
prrtByteCount_t delivered;
prrtTimestamp_t delivered_time;
prrtTimestamp_t first_sent_time;
prrtByteCount_t app_limited;
prrtByteCount_t bytes_lost;
prrtByteCount_t prior_inflight;
} PrrtPacketTracking;
#endif //PRRT_PACKETTRACKING_H
#include "rateSample.h"
#ifndef PRRT_RATESAMPLE_H
#define PRRT_RATESAMPLE_H
#include "packet.h"
typedef struct prrtRateSample {
prrtByteCount_t prior_delivered;
prrtTimestamp_t prior_time;
prrtTimedelta_t send_elapsed;
prrtTimedelta_t ack_elapsed;
prrtTimedelta_t interval;
prrtByteCount_t delivered;
bool is_app_limited;
prrtDeliveryRate_t delivery_rate; // bps
} PrrtRateSample;
#endif //PRRT_RATESAMPLE_H
......@@ -16,6 +16,9 @@ class TimeoutException(Exception):
def __init__(self):
self.message = "The call timed out."
class PayloadTooBigException(Exception):
pass
cdef extern from "proto/stores/dataPacketStore.c":
pass
......@@ -58,6 +61,15 @@ cdef extern from "proto/processes/dataTransmitter.c":
cdef extern from "proto/clock.c":
pass
cdef extern from "proto/bbr.c":
pass
cdef extern from "proto/clock.c":
pass
cdef extern from "proto/timer.c":
pass
cdef extern from "proto/vdmcode/block_code.c":
pass
......@@ -100,12 +112,12 @@ cdef extern from "util/time.c":
cdef extern from "util/mpsc_queue.c":
pass
cdef extern from "util/windowedFilter.c":
pass
cdef sockaddr_to_addr_and_port(sockaddr_in addr):
return (ipaddress.ip_address(socket.ntohl(addr.sin_addr.s_addr)), socket.ntohs(addr.sin_port))
class PayloadTooBigException(Exception):
pass
class PrrtCodingConfiguration:
def __init__(self, n, k, n_cycle=None):
if n < k:
......@@ -162,6 +174,7 @@ cdef class PrrtSocket:
def __get__(self):
return cprrt.PrrtSocket_get_plr_fwd(self._c_socket)
# Application Properties
property target_delay:
def __get__(self):
......@@ -209,10 +222,19 @@ cdef class PrrtSocket:
maximum_payload_size = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "maximum_payload_size")
data_len = len(data)
if len(data) <= maximum_payload_size:
cprrt.PrrtSocket_send(self._c_socket, data, data_len)
cprrt.PrrtSocket_send_async(self._c_socket, data, data_len)
else:
raise PayloadTooBigException("Sending packet of {} bytes on a socket with maximum payload size of {} bytes failed.".format(data_len, maximum_payload_size))