Commit ab2b00fd authored by Andreas Schmidt's avatar Andreas Schmidt

Merge branch 'develop'

parents 8293e674 d8ad1570
Pipeline #2321 passed with stages
in 1 minute and 26 seconds
......@@ -54,7 +54,7 @@ test:prrt_functional:
- bash
- exec ./bin/prrtTests
- exec ./prrtTests
stage: test
[submodule "prrt/xlap"]
path = prrt/xlap
url = ../../as/X-Lap.git
url = ../X-Lap.git
......@@ -3,11 +3,11 @@ project (PRRT)
option(PRRT_TESTS "Build tests" OFF)
set(CMAKE_C_FLAGS "-O2 -Wall -std=gnu11 -D_GNU_SOURCE" )
set(CMAKE_C_FLAGS "-O2 -Wall -std=gnu11 -D_GNU_SOURCE -fPIC" )
set(CMAKE_CXX_FLAGS "-fstack-protector -fstack-protector-all -Wall -std=gnu++11 -D_GNU_SOURCE" )
set(CMAKE_CXX_FLAGS_DEBUG "-O2 -Wall -ggdb" )
......@@ -8,6 +8,7 @@
* Clock synchronization between sending stack and receiving stack
* Applications can specify packet-level expiration times
* Different receive modes for ASAP and time-synchronized operation
* Passive measurement of propagation delay, bottleneck data rate and packet loss rate
* Packet-level timing analysis using [X-Lap](
* [Hardware timestamping support](
......@@ -25,7 +26,7 @@ import prrt
port = int(sys.argv[1])
s = prrt.PrrtSocket(port=port, isSender=False)
s = prrt.PrrtSocket(port=port)
while True:
d = s.recv()
......@@ -44,7 +45,7 @@ import prrt
host = sys.argv[1]
port = int(sys.argv[2])
s = prrt.PrrtSocket(port=port, isSender=True)
s = prrt.PrrtSocket(port=port)
s.connect(host, port)
for i in range(10):
PRRT Dissector for Wireshark
* Download Wireshark Source Code:
apt-get install qt-sdk libgtk-3-dev libpcap-dev
tar xvjf wireshark-1.10.2.tar.bz2
* Copy `packet-prrt.c` to `epan/dissectors`.
* Add a line `packet-prrt.c \` in `DISSECTOR_SRC` of `epan/dissectors/Makefile.common`.
* Call make in the root folder. You can now start wireshark using `./wireshark`.
This tutorial will show you how to apply the dissector of PRRT in your wireshark.
What our PRRT dissector can do?
Our PRRT dissector can dissector the following PRRT packets: data, repeated data, redundancy, pre-sent redundancy, feedback and channel feedback. By now it can basically display the header info of PRRT packets. The sophistical display and advanced features will be supported later.
How to use it?
To use this dissector in the wireshark basically you need to perform the following steps:
0. Download the source code of wireshark and extract it.
1. Copy our dissector code (packet-prrt.c) to the directory epan/dissectors/ of the source code.
2. Add a line "packet-prrt.c \" in DISSECTOR_SRC of epan/dissectors/Makefile.common, so that we can compile our dissector code.
3. This step is optional and for advanced usage.
If you want our dissector to call subdissectors for some purposes (e.g. using mp2t to parse the payload as MPEG TS.), you need to register the subdissector by adding the following _pattern_ line in the handoff function of the subdissector:
heur_dissector_add("prrt", heuristic_dissect_method, protocol_id);
e.g. add the line in the function proto_reg_handoff_mp2t() of packet-mp2t.c for parsing PRRT payload as MPEG TS.
heur_dissector_add("prrt", heur_dissect_mp2t, proto_mp2t);
NOTE : The subdissector called by PRRT dissector at the moment must support heuristic dissecting.
NOTE : By now we provide two dissector files respectively for two wireshark versions (1.4.6 and 1.6.1), because different version of wireshark provides different methods used in the dissector code. Even the dissector code for the version 1.4.6 is compatible with wireshark 1.6.1, we still recommand you use the dissector code for the version 1.6.1 and wireshark 1.6.1, since the later version of wireshark would not support the old method any more.
The following is fully commands under a specific scenario (see the prerequisite), you may follow, to install wireshark and apply our dissector code. For installation you can choose 1.a or 1.b depending on the version of wireshark you prefer to.
0. Prerequisite :
+ Linux OS : ubuntu 10.04 or later.
+ wireshark 1.4.6 or wireshark 1.6.1.
+ libgtk2.0-dev, bison, automake1.9 and libpcap. (Maybe more dependencies required.)
1.a Installation (wireshark 1.4.6)
sudo apt-get source wireshark-dev
sudo chown -R your_account:your_account wireshark-1.4.6/
mkdir wireshark-installed
svn co
cp prrt-dissector/wireshark-1.4.6/packet-prrt.c wireshark-1.4.6/epan/dissectors/
cd wireshark-1.4.6
./configure --prefix=$PWD/../wireshark-installed/
make -j 2
make install
2.b Installation (wireshark 1.6.1)
download wireshark 1.6.1 from the
tar -xjvf wireshark-1.6.1.tar.bz2
mkdir wireshark-installed
svn co
cp prrt-dissector/wireshark-1.6.1/packet-prrt.c wireshark-1.6.1/epan/dissectors/
cd wireshark-1.6.1
./configure --prefix=$PWD/../wireshark-installed/
make -j 2
make install
3. Running the wireshark
sudo ../wireshark-installed/bin/wireshark
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -37,5 +37,5 @@ NETEM_PARAMS="${NETEM[@]}"
echo "Running PRRT with command: \"$command $PRRT_PARAMS\" and link parameters: \"$NETEM_PARAMS\""
tc qdisc add dev $dev root netem $NETEM_PARAMS
/prrt/bin/$command $PRRT_PARAMS -o /output/log.csv
/prrt/$command $PRRT_PARAMS -o /output/log.csv
tc qdisc del dev $dev root
......@@ -3,11 +3,12 @@ import prrt
port = int(sys.argv[1])
s = prrt.PrrtSocket(port=port, isSender=False)
s = prrt.PrrtSocket(("", port))
while True:
d = s.recv()
d, addr = s.recv()
d = d.decode("utf8")
if d != "Close":
print d
print(d, addr)
......@@ -3,10 +3,11 @@ import prrt
host = sys.argv[1]
port = int(sys.argv[2])
localport = int(sys.argv[3])
s = prrt.PrrtSocket(port=port, isSender=True)
s.connect(host, port)
s = prrt.PrrtSocket(("", localport))
s.connect((host, port))
for i in range(10):
s.send("Packet {}".format(i))
s.send("Packet {}".format(i).encode("utf8"))
......@@ -15,7 +15,7 @@ add_subdirectory(proto)
add_executable(sender sender.c)
add_executable(receiver receiver.c)
add_executable(receiver receiver.c ../tests/common.h)
target_link_libraries(sender LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(receiver LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
include "posix/time.pxd"
include "sockets.pxd"
from libc.stdint cimport uint32_t, uint16_t, uint8_t, int32_t
from libc.string cimport const_char
cdef extern from "pthread.h" nogil:
ctypedef struct pthread_t:
......@@ -24,13 +28,17 @@ cdef extern from "proto/channelStateInformation.h":
ctypedef prrtChannelStateInformation PrrtChannelStateInformation
cdef extern from "proto/codingParams.h":
cdef struct prrtCodingParams:
uint8_t k;
uint8_t r;
uint8_t n;
uint8_t n_p;
ctypedef prrtCodingParams PrrtCodingParams
ctypedef struct prrtCodingConfiguration:
uint8_t k
uint8_t r
uint8_t n
uint8_t c
uint8_t *n_cycle
ctypedef prrtCodingConfiguration PrrtCodingConfiguration
PrrtCodingConfiguration *PrrtCodingConfiguration_create()
PrrtCodingConfiguration *PrrtCodingConfiguration_copy(PrrtCodingConfiguration *cpar)
bint PrrtCodingConfiguration_destroy(PrrtCodingConfiguration *cpar)
cdef extern from "util/list.h":
cdef struct list:
......@@ -55,7 +63,7 @@ cdef extern from "proto/block.h":
cdef struct prrtBlock:
uint32_t data_count
uint32_t redundancy_count
PrrtCodingParams coding_params
PrrtCodingConfiguration coding_params
uint32_t largest_data_length
uint16_t baseSequenceNumber
List* data_blocks
......@@ -64,7 +72,7 @@ cdef extern from "proto/block.h":
ctypedef prrtBlock PrrtBlock
cdef extern from "proto/packet.h":
cdef extern from "proto/types/packet.h":
cdef struct prrtPacket:
uint8_t type_priority;
uint8_t index;
......@@ -74,6 +82,8 @@ cdef extern from "proto/packet.h":
ctypedef prrtPacket PrrtPacket
ctypedef uint32_t prrtTimedelta_t;
cdef extern from "proto/receiver.h":
ctypedef struct PrrtReceiver:
const char* host_name
......@@ -85,23 +95,21 @@ cdef extern from "proto/receiver.h":
cdef extern from "proto/socket.h":
cdef struct prrtSocket:
int dataSocketFd
int feedbackSocketFd
pthread_t receiveFeedbackThread
int socketFd
pthread_t sendDataThread
pthread_mutex_t outQueueFilledMutex
pthread_cond_t outQueueFilledCv
Pipe* sendDataQueue
MPSCQueue* sendDataQueue
pthread_t receiveDataThread
pthread_mutex_t inQueueFilledMutex
pthread_cond_t inQueueFilledMutexCv
PrrtReceiveDataQueue *receiveDataQueue
PrrtPacketDeliveryStore *packetDeliveryStore
BPTreeNode* dataStore
List* receivers
PrrtReceiver* receiver
uint16_t packetsCount
uint16_t sequenceNumberSource
......@@ -111,25 +119,43 @@ cdef extern from "proto/socket.h":
ctypedef prrtSocket PrrtSocket
cdef PrrtSocket* PrrtSocket_create(bint isSender, const uint32_t target_delay)
cdef PrrtSocket* PrrtSocket_create(const uint32_t target_delay)
bint PrrtSocket_bind(PrrtSocket *sock_ptr, const_char *ipAddress, const uint16_t port)
int PrrtSocket_close(const PrrtSocket *sock_ptr)
int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) nogil
int32_t PrrtSocket_recv_sync(PrrtSocket *sock_ptr, void *buf_ptr, const uint32_t time_window_us) nogil
int32_t PrrtSocket_timedrecv(PrrtSocket *sock_ptr, void *buf_ptr, const uint32_t wait_time) nogil
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr, sockaddr* addr) nogil
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, sockaddr* addr) nogil
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr, sockaddr* addr) nogil
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, sockaddr* addr, timespec* deadline) nogil
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, sockaddr* addr, prrtTimedelta_t time_window_us) nogil
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, sockaddr* addr, prrtTimedelta_t time_window_us) nogil
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, sockaddr* addr, prrtTimedelta_t time_window_us, timespec* deadline) nogil
bint PrrtSocket_set_sock_opt(PrrtSocket *sock_ptr, const_char *name, const uint32_t value)
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *sock_ptr, const_char *name)
uint32_t PrrtSocket_get_rtt(PrrtSocket *socket)
bint PrrtSocket_set_coding_parameters(PrrtSocket *s, uint8_t k, uint8_t n, uint8_t c, uint8_t *n_cycle)
PrrtCodingConfiguration *PrrtSocket_get_coding_parameters(PrrtSocket *s)
bint PrrtSocket_uses_thread_pinning(PrrtSocket *socket)
bint PrrtSocket_uses_thread_pinning(PrrtSocket *socket)
bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket)
uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *socket)
float PrrtSocket_get_plr_fwd(PrrtSocket *socket)
uint32_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *socket)
uint32_t PrrtSocket_get_btlbw_fwd(PrrtSocket *s);
uint32_t PrrtSocket_get_btlbw_back(PrrtSocket *s);
bint PrrtSocket_get_app_limited(PrrtSocket *socket)
bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket)
char *PrrtSocket_inet_ntoa(in_addr*)
uint16_t PrrtSocket_ntohs(uint16_t v)
cdef extern from "proto/stores/receiveDataQueue.h":
ctypedef struct PrrtReceiveDataQueue:
cdef extern from "proto/stores/packetDeliveryStore.h":
ctypedef struct PrrtPacketDeliveryStore:
cdef extern from "util/bptree.h":
......@@ -145,4 +171,9 @@ cdef extern from "util/bptree.h":
cdef extern from "util/pipe.h":
ctypedef struct Pipe:
\ No newline at end of file
cdef extern from "util/mpsc_queue.h":
ctypedef struct MPSCQueue:
#if defined __arm__
# define __builtin_ia32_rdtsc() (0)
#ifndef __cplusplus
# include <stdatomic.h>
# include <atomic>
# define _Atomic(X) std::atomic< X >
#ifndef MAX
#define MAX(x, y) (((x) > (y)) ? (x) : (y))
#ifndef MIN
#define MIN(x, y) (((x) < (y)) ? (x) : (y))
......@@ -20,6 +36,8 @@
#define RRT_ALPHA 0.125
#define MIN_RTT 300
// Uncomment the line below if you are compiling on Windows.
// #define WINDOWS
#include <stdio.h>
add_library(PRRT ../defines.h
set (PRRT_SOURCES ../defines.h
block.c block.h
channelStateInformation.c channelStateInformation.h
clock.c clock.h
codingParams.c codingParams.h
packet.c packet.h
receiver.c receiver.h
socket.c socket.h
../xlap/xlap.c ../xlap/xlap.h
applicationConstraints.c applicationConstraints.h
vdmcode/block_code.c vdmcode/block_code.h
stores/forwardPacketTable.c stores/forwardPacketTable.h
stores/lossGatherer.c stores/lossGatherer.h
processes/dataReceiver.c processes/dataReceiver.h
processes/feedbackReceiver.c processes/feedbackReceiver.h
processes/dataTransmitter.c processes/dataTransmitter.h
stores/repairBlockStore.c stores/repairBlockStore.h
stores/packetTimeoutTable.c stores/packetTimeoutTable.h
stores/dataPacketStore.c stores/dataPacketStore.h
stores/deliveredPacketTable.c stores/deliveredPacketTable.h
stores/inFlightPacketStore.c stores/inFlightPacketStore.h
stores/packetTimeoutTable.c stores/packetTimeoutTable.h
stores/packetDeliveryStore.c stores/packetDeliveryStore.h
stores/receptionTable.c stores/receptionTable.h
stores/repairBlockStore.c stores/repairBlockStore.h
types/packetTimeout.c types/packetTimeout.h
stores/receiveDataQueue.c stores/receiveDataQueue.h)
types/lossStatistics.c types/lossStatistics.h
types/packet.c types/packet.h
vdmcode/block_code.c vdmcode/block_code.h)
if (XLAP)
set(PRRT_SOURCES ${PRRT_SOURCES} ../xlap/xlap.c ../xlap/xlap.h)
add_library(PRRT ${PRRT_SOURCES})
target_link_libraries(PRRT rt)
#include "packet.h"
#include "types/packet.h"
typedef struct applicationConstraints {
prrtTimedelta_t targetDelay_us;
......@@ -3,7 +3,7 @@
#include "../util/list.h"
#include "../util/dbg.h"
#include "../util/common.h"
#include "packet.h"
#include "types/packet.h"
#include "block.h"
static void gather_redundancy_packets(const PrrtBlock *block_ptr, gf *const *fec, int16_t *idx_p)
......@@ -57,6 +57,14 @@ bool PrrtBlock_destroy(PrrtBlock *block_ptr)
if(block_ptr->coder != NULL) {
if(block_ptr->codingParams != NULL) {
......@@ -68,17 +76,17 @@ bool PrrtBlock_destroy(PrrtBlock *block_ptr)
return false;
PrrtBlock * PrrtBlock_create(PrrtCodingParams *cpar, prrtSequenceNumber_t baseSequenceNumber)
PrrtBlock * PrrtBlock_create(PrrtCodingConfiguration *cpar, PrrtCoder* coder, prrtSequenceNumber_t baseSequenceNumber)
PrrtBlock *block_ptr = calloc(1, sizeof(PrrtBlock));
block_ptr->coder = coder;
block_ptr->codingParams = cpar;
block_ptr->dataPackets = List_create();
block_ptr->redundancyPackets = List_create();
block_ptr->baseSequenceNumber = baseSequenceNumber;
block_ptr->largestPayloadLength = 0;
block_ptr->coder = NULL;
pthread_mutexattr_t attr;
check(pthread_mutexattr_init(&attr) == EXIT_SUCCESS, "Mutex attr init failed.");
......@@ -184,8 +192,6 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, prrtSequenceNumber_t *seqno)
prrtSequenceNumber_t baseSequenceNumber = block_ptr->baseSequenceNumber;
prrtPacketLength_t length = block_ptr->largestPayloadLength;
PrrtCoder *coder = block_ptr->codingParams->coder;
gf **src = calloc(k, sizeof(gf *));
......@@ -203,7 +209,7 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, prrtSequenceNumber_t *seqno)
for(j = 0; j < r; j++) {
fec[j] = calloc(length, sizeof(gf));
PrrtCoder_encode(coder, src, fec[j], j + k, length);
PrrtCoder_encode(block_ptr->coder, src, fec[j], j + k, length);
PrrtPacket *red_packet_ptr = PrrtPacket_create_redundancy_packet(0, (void *) fec[j], length, *seqno,
(uint8_t) (k + j), block_ptr->baseSequenceNumber,
......@@ -237,8 +243,6 @@ bool PrrtBlock_decode(PrrtBlock *block_ptr)
prrtSequenceNumber_t baseSequenceNumber = block_ptr->baseSequenceNumber;
prrtPacketLength_t length = block_ptr->largestPayloadLength;
PrrtCoder *coder = block_ptr->codingParams->coder;
fec = calloc(k, sizeof(gf *));
for(i = 0; i < k; i++) {
......@@ -254,7 +258,7 @@ bool PrrtBlock_decode(PrrtBlock *block_ptr)
gather_data_packets(block_ptr, fec, idx_p);
gather_redundancy_packets(block_ptr, fec, idx_p);
check(PrrtCoder_decode(coder, fec, idx_p, length) == EXIT_SUCCESS, "Could not decode current block.");
check(PrrtCoder_decode(block_ptr->coder, fec, idx_p, length) == EXIT_SUCCESS, "Could not decode current block.");
for(j = 0; j < k; j++) {
if(idx_p[j] >= k) {
......@@ -3,26 +3,26 @@
#include <stdbool.h>
#include "codingParams.h"
#include "packet.h"
#include "types/packet.h"
#include "../util/list.h"
#include "vdmcode/block_code.h"
typedef struct prrtBlock {
PrrtCodingParams* codingParams;
PrrtCodingConfiguration* codingParams;
prrtPacketLength_t largestPayloadLength;
prrtSequenceNumber_t baseSequenceNumber;
bool isCoded;
PrrtCoder *coder;
pthread_mutex_t lock;
PrrtCoder *coder;
} PrrtBlock;
* Allocate space for a block.
PrrtBlock * PrrtBlock_create(PrrtCodingParams *cpar, prrtSequenceNumber_t baseSequenceNumber);
PrrtBlock * PrrtBlock_create(PrrtCodingConfiguration *cpar, PrrtCoder* coder, prrtSequenceNumber_t baseSequenceNumber);
* Frees the PrrtBlock data structure.
......@@ -3,6 +3,8 @@
#include "../util/common.h"
#include "../util/dbg.h"
#include "channelStateInformation.h"
#include "clock.h"
#include "receiver.h"
PrrtChannelStateInformation * PrrtChannelStateInformation_create()
......@@ -11,8 +13,18 @@ PrrtChannelStateInformation * PrrtChannelStateInformation_create()
check(pthread_mutex_init(&csi->lock, NULL) == 0, "Mutex init failed.");
csi->rttMean = 0;
csi->rttDev = 0;
csi->rtprop = TIMESTAMP_SPACE;
csi->rtprop_filter_length_us = 2 * 1000 * 1000; // 2 seconds
csi->deliveryRate = 0;
csi->btlbw = 0;
csi->btlbw_next_round_delivered = 0;
csi->btlbw_round_start = false;
csi->btlbw_round_count = 0;
csi->btlbw_filter_length = 10;
csi->appLimited = 0;
csi->plr = 0.0;
return csi;
......@@ -20,31 +32,41 @@ PrrtChannelStateInformation * PrrtChannelStateInformation_create()
return NULL;
void PrrtChannelStateInformation_update_rtt(PrrtChannelStateInformation *csi, prrtTimedelta_t rtt)
void PrrtChannelStateInformation_update_rtprop(PrrtChannelStateInformation *csi, prrtTimedelta_t rtprop)
int32_t delta = (int32_t) rtt - (int32_t) csi->rttMean;
// TODO: ensure that there are no arithemtic problems via rounding etc.
csi->rttMean = (prrtTimedelta_t) (csi->rttMean + RRT_ALPHA * delta);
csi->rttDev = (prrtTimedelta_t) (csi->rttDev + RRT_ALPHA * (labs(delta) - csi->rttDev));
prrtTimestamp_t now = PrrtClock_get_current_time_us();
csi->rtprop_expired = now > (csi->rtprop_stamp + csi->rtprop_filter_length_us);
if (rtprop >= 0 && (rtprop <= csi->rtprop || csi->rtprop_expired)) {
csi->rtprop = rtprop;
csi->rtprop_stamp = now;
void PrrtChannelStateInformation_print(PrrtChannelStateInformation *csi)
void PrrtChannelStateInformation_update_plr(PrrtChannelStateInformation *csi, prrtSequenceNumber_t erasures,
prrtSequenceNumber_t packets) {
check(csi != NULL, "Input should not be NULL.");
csi->plr = ((float) erasures) / packets;
PERROR("Should not happen.%s","");
void PrrtChannelStateInformation_update_delivery_rate(PrrtChannelStateInformation *csi, PrrtPacket* packet, prrtDeliveryRate_t rate) {
csi->deliveryRate = rate;
void PrrtChannelStateInformation_update_app_limited(PrrtChannelStateInformation *csi, bool appLimited) {
csi->appLimited = appLimited;
prrtTimedelta_t PrrtChannelStateInformation_get_rtt(PrrtChannelStateInformation *csi)
prrtTimedelta_t PrrtChannelStateInformation_get_rtprop(PrrtChannelStateInformation *csi)
prrtTimedelta_t res = csi->rttMean;
prrtTimedelta_t res = csi->rtprop;
return res;
......@@ -57,3 +79,19 @@ bool PrrtChannelStateInformation_destroy(PrrtChannelStateInformation *csi)
return false;
prrtPacketLossRate_t PrrtChannelStateInformation_get_plr(PrrtChannelStateInformation *csi) {
return csi->plr;
prrtDeliveryRate_t PrrtChannelStateInformation_get_delivery_rate(PrrtChannelStateInformation *csi) {
return csi->deliveryRate;
prrtDeliveryRate_t PrrtChannelStateInformation_get_btlbw(PrrtChannelStateInformation *csi) {