Commit d97830b8 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

merge develop

parent 08b1d0e6
Pipeline #3377 passed with stages
in 9 minutes and 4 seconds
......@@ -9,3 +9,4 @@ tests/__pycache__/
MANIFEST
prrt.cpython*.so
prrt.so
.ipynb_checkpoints/
......@@ -22,8 +22,8 @@ build:prrt:
- which cmake
- which gcc
- which g++
- pip3 list | grep Cython
- pip3 list | grep numpy
- pip3 list --format=legacy | grep Cython
- pip3 list --format=legacy | grep numpy
- CC=gcc-5 CXX=g++-5 cmake . -DPRRT_TESTS=1
- make
......@@ -105,4 +105,4 @@ deploy:profile:
paths:
- gprof-send.txt
- gprof-recv.txt
expire_in: 30 days
\ No newline at end of file
expire_in: 30 days
......@@ -36,4 +36,4 @@ if(PRRT_TESTS)
endif()
add_custom_target(perftest COMMAND python3 tests/eval.py)
add_custom_target(memtest COMMAND bash ./memtest.sh DEPENDS sender receiver)
add_custom_target(memtest COMMAND bash ./memtest.sh DEPENDS sender receiver)
\ No newline at end of file
......@@ -5,12 +5,13 @@
## Features
* Hybrid error control (FEC + ARQ) using systematic Vandermonde codes
* Congestion control using a variant of [BBR](https://groups.google.com/forum/#!forum/bbr-dev)
* Congestion control and pacing using a variant of [BBR](https://groups.google.com/forum/#!forum/bbr-dev)
* Clock synchronization between sending stack and receiving stack
* Applications can specify packet-level expiration times
* Different receive modes for ASAP and time-synchronized operation
* Passive measurement of propagation delay, bottleneck data rate and packet loss rate
* Packet-level timing analysis using [X-Lap](http://xlap.larn.systems)
* Wireshark dissector written in Lua
* [Hardware timestamping support](https://git.nt.uni-saarland.de/LARN/PRRT/wikis/hardware-timestamping)
## Installation
......
......@@ -7,8 +7,6 @@ ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update && apt-get install --yes --force-yes \
bc \
cmake \
iperf3 \
psmisc \
traceroute \
tshark
......
......@@ -7,8 +7,6 @@ ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update && apt-get install --yes --force-yes \
bc \
cmake \
iperf3 \
psmisc \
traceroute \
tshark
......
......@@ -81,4 +81,6 @@ dur=$(echo "$(date +%s.%N) - $start" | bc);
printf "Done after %.6f seconds\n" $dur
#tc qdisc del dev $dev root
kill $TSHARK_PID
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pd.read_csv(\"eval.csv\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
import sys
sys.path.insert(0, "./build")
import tests.perf as perf
def run_setup_and_report(setup):
results = setup.run()
print("Setup:\n ", setup)
print("Results:\n " + str(results).replace("\n","\n "))
results.export()
if __name__ == "__main__":
# Packet Count Works: 2^15; Segfault at: 2^16
# TODO: support multiple tests via proper socket termination
setups = [
perf.TestSetup(packets=2**17,delay=1,loss=0,reorder=0,duplicate=0)
]
for setup in setups:
run_setup_and_report(setup)
......@@ -3,7 +3,7 @@ import prrt
port = int(sys.argv[1])
s = prrt.PrrtSocket(("127.0.0.1", port))
s = prrt.PrrtSocket(("0.0.0.0", port))
while True:
d, addr = s.recv()
......
......@@ -5,7 +5,7 @@ host = sys.argv[1]
port = int(sys.argv[2])
localport = int(sys.argv[3])
s = prrt.PrrtSocket(("127.0.1.1", localport), maximum_payload_size=150)
s = prrt.PrrtSocket(("0.0.0.0", localport), maximum_payload_size=150)
s.connect((host, port))
for i in range(10):
......
......@@ -25,11 +25,14 @@ add_executable(receiver receiver.c)
add_executable(time-sender time-sender.c)
add_executable(time-receiver time-receiver.c)
target_link_libraries(sender LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(receiver LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(sender LINK_PUBLIC prrt_shared prrtUtil_shared ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(receiver LINK_PUBLIC prrt_shared prrtUtil_shared ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(time-sender LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(time-receiver LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(time-sender LINK_PUBLIC prrt_shared prrtUtil_shared ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(time-receiver LINK_PUBLIC prrt_shared prrtUtil_shared ${CMAKE_THREAD_LIBS_INIT})
add_executable(refcount refcount.c)
target_link_libraries(refcount LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(refcount LINK_PUBLIC prrt_shared prrtUtil_shared ${CMAKE_THREAD_LIBS_INIT})
install(DIRECTORY ./ DESTINATION include/prrt
FILES_MATCHING PATTERN "*.h")
......@@ -120,7 +120,7 @@ cdef extern from "proto/socket.h":
ctypedef prrtSocket PrrtSocket
cdef PrrtSocket* PrrtSocket_create(const uint32_t maximum_payload_size, const uint32_t target_delay)
bint PrrtSocket_bind(PrrtSocket *sock_ptr, const_char *ipAddress, const uint16_t port)
int PrrtSocket_bind(PrrtSocket *sock_ptr, const_char *ipAddress, const uint16_t port)
int PrrtSocket_close(const PrrtSocket *sock_ptr)
int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
int PrrtSocket_send_sync(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
......
......@@ -28,6 +28,19 @@ if (XLAP)
set(PRRT_SOURCES ${PRRT_SOURCES} ../xlap/xlap.c ../xlap/xlap.h)
endif()
add_library(PRRT ${PRRT_SOURCES})
add_library(prrt_obj OBJECT ${PRRT_SOURCES})
set_property(TARGET prrt_obj PROPERTY POSITION_INDEPENDENT_CODE 1)
target_link_libraries(PRRT rt)
add_library(prrt_static STATIC $<TARGET_OBJECTS:prrt_obj>)
target_link_libraries(prrt_static rt)
set_target_properties(prrt_static PROPERTIES OUTPUT_NAME "prrt")
add_library(prrt_shared SHARED $<TARGET_OBJECTS:prrt_obj>)
target_link_libraries(prrt_shared rt)
set_target_properties(prrt_shared PROPERTIES OUTPUT_NAME "prrt")
install(TARGETS prrt_static prrt_shared
RUNTIME DESTINATION bin
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib)
#include <sys/time.h>
#include <stdlib.h>
#include "../util/time.h"
#include "../util/common.h"
#include "clock.h"
......@@ -19,50 +20,45 @@ prrtTimestamp_t PrrtClock_get_prrt_time_us(PrrtClock *clock)
prrtTimestamp_t lastMeasurement = clock->lastMeasurement;
prrtTimeDifference_t diff = diff_abs_ts(currentTime, lastMeasurement);
prrtTimeDifference_t skew = (diff * clock->skew) / 400;
return (prrtTimestamp_t) virtualTime + clock->meanDeviation + skew;
return (prrtTimestamp_t) (virtualTime + clock->meanDeviation + skew);
} else {
return currentTime;
}
}
bool PrrtClock_update(PrrtClock *clock, uint32_t referenceTime, uint32_t rtt)
bool PrrtClock_update(PrrtClock *clock, uint32_t referenceTime, uint32_t propagation_delay)
{
prrtTimestamp_t virtualTime = PrrtClock_get_prrt_time_us(clock);
prrtTimestamp_t currentTime = PrrtClock_get_current_time_us();
prrtTimestamp_t virtualTime = clock->virtualTime;
prrtTimeDifference_t clockSkew = clock->skew;
prrtTimedelta_t delay = rtt/2; // half the rtt
prrtTimeDifference_t phaseError = referenceTime - virtualTime + delay;
//printf("V: %13u, C: %13u, S: %13d, R: %13u, E: %13d, D: %3u L: %u\n", virtualTime, currentTime, clockSkew, referenceTime, phaseError, delay, clock->lastMeasurement);
int64_t clockSkew = clock->skew;
prrtTimedelta_t delay = propagation_delay / 2; // half the rtt
int64_t phaseError = PrrtTimestamp_cmp(referenceTime, virtualTime) + delay;
//printf("V: %13u, C: %13u, S: %13d, R: %13u, E: %13ld, D: %3u L: %u\n", virtualTime, currentTime, clockSkew, referenceTime, phaseError, delay, clock->lastMeasurement);
if(abs(phaseError) > 10000) {
if(labs(phaseError) > 10000) {
clock->meanDeviation = 0;
clock->skew = 0;
clock->lastMeasurement = currentTime;
clock->virtualTime = currentTime;
}
prrtTimeDifference_t deviationSum = (prrtTimeDifference_t) (phaseError + 3 * clock->meanDeviation); // TODO: why 3?
int64_t deviationSum =(phaseError + 3 * clock->meanDeviation); // TODO: why 3?
prrtTimeDifference_t meanDeviation = (prrtTimeDifference_t) (deviationSum / 4); // TODO: find out why???
prrtTimeDifference_t period = diff_abs_ts(currentTime, clock->lastMeasurement);
int64_t period = PrrtTimestamp_cmp(currentTime, clock->lastMeasurement);
if(period > 0) {
clockSkew = (meanDeviation / period) + 15 * clock->skew;
clockSkew = (prrtTimeDifference_t) (clockSkew / 16);
}
virtualTime = virtualTime + meanDeviation + period * clockSkew / 400;
virtualTime = clock->virtualTime + meanDeviation + period * clockSkew / 400;
clock->meanDeviation = meanDeviation;
clock->skew = clockSkew;
clock->skew = (prrtTimeDifference_t) clockSkew;
clock->lastMeasurement = currentTime;
clock->virtualTime = virtualTime;
if(abs(phaseError) > 10000) {
clock->virtualTime = currentTime;
}
//printf("V: %13u, C: %13u; S: %13d; M: %13d, P:%13u\n", virtualTime, currentTime, clockSkew, meanDeviation, period);
//printf("V: %13u, C: %13u; S: %13d; M: %13d, P:%13u\n\n", virtualTime, currentTime, clockSkew, meanDeviation, period);
return true;
}
......
......@@ -26,7 +26,7 @@ prrtTimestamp_t PrrtClock_get_current_time_us(void);
prrtTimestamp_t PrrtClock_get_prrt_time_us(PrrtClock *clock);
bool PrrtClock_update(PrrtClock *clock, prrtTimestamp_t referenceTime, prrtTimedelta_t rtt);
bool PrrtClock_update(PrrtClock *clock, prrtTimestamp_t referenceTime, prrtTimedelta_t propagation_delay);
#define diff_abs_ts(timeA, timeB) (prrtTimeDifference_t) abs(((prrtTimeDifference_t) timeA) - ((prrtTimeDifference_t)timeB))
......
......@@ -83,18 +83,6 @@ static bool send_feedback(PrrtSocket *sock_ptr,
.sentTime = sentTimestamp
};
uint16_t remote_port = ntohs(remote.sin_port);
char *remote_host = inet_ntoa(remote.sin_addr);
struct sockaddr_in targetaddr;
memset((char *) &targetaddr, 0, sizeof(targetaddr));
targetaddr.sin_family = AF_INET;
targetaddr.sin_port = htons((uint16_t) (remote_port));
struct hostent *hp;
hp = gethostbyname(remote_host);
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
prrtTimestamp_t forwardTripTime = (uint32_t) (((int64_t) PrrtClock_get_current_time_us()) +
(sock_ptr->lastSentTimestamp - sock_ptr->lastReceivedTimestamp));
......@@ -115,7 +103,7 @@ static bool send_feedback(PrrtSocket *sock_ptr,
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
check(sendto(sock_ptr->socketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
check(sendto(sock_ptr->socketFd, buf, length, 0, (struct sockaddr *) &remote, sizeof(remote)) ==
length, "Sending feedback failed.");
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
free(buf);
......
......@@ -107,7 +107,6 @@ PrrtSocket *PrrtSocket_create(prrtByteCount_t maximum_payload_size, prrtTimedelt
s->senderChannelStateInformation = PrrtChannelStateInformation_create();
check(target_delay_us < HALF_TIMESTAMP, "Specify target delay between 0 and %i", HALF_TIMESTAMP-1)
s->applicationConstraints = PrrtApplicationConstraints_create(target_delay_us);
......@@ -122,8 +121,6 @@ PrrtSocket *PrrtSocket_create(prrtByteCount_t maximum_payload_size, prrtTimedelt
"Socket option set failed.");
check(setsockopt(s->socketFd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
"Socket option set failed.");
check(setsockopt(s->socketFd, SOL_SOCKET, SO_REUSEPORT, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
"Socket option set failed.");
s->sendDataQueue = Pipe_create();
s->deliveredPacketTable = PrrtDeliveredPacketTable_create();
......@@ -169,7 +166,7 @@ bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interfa
return false;
}
bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port) {
int PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port) {
check(port <= 65534, "Port %d cannot be bound to.", port);
size_t size = sizeof(struct sockaddr_in);
......@@ -233,17 +230,20 @@ bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port)
s->receiveDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
pthread_attr_init(s->receiveDataThreadAttr);
if(s->isThreadPinning) {
pin_thread_to_core(s->receiveDataThreadAttr, 2);
check(pin_thread_to_core(s->receiveDataThreadAttr, 2) == EXIT_SUCCESS, "Cannot pin receive-data thread to core 2.");
}
check(pthread_create(&s->receiveDataThread, s->receiveDataThreadAttr, receive_data_loop,
(void *) s) == EXIT_SUCCESS, "Cannot create data receiving thread.");
s->isBound = true;
return true;
return 0;
error:
PrrtSocket_close(s);
return false;
if (h_errno) {
return h_errno;
}
return -2;
}
bool PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port) {
......@@ -251,7 +251,7 @@ bool PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port) {
s->receiver = PrrtReceiver_create(host, port, s->maximum_payload_size);
s->sendDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
if(s->isThreadPinning) {
pin_thread_to_core(s->sendDataThreadAttr, 1);
check(pin_thread_to_core(s->sendDataThreadAttr, 1) == EXIT_SUCCESS, "Cannot pin send-data thread to core 1.");
}
pthread_attr_init(s->sendDataThreadAttr);
......@@ -649,9 +649,6 @@ bool PrrtSocket_cleanup(PrrtSocket *s) {
PrrtDeliveredPacketTable_forward_start(s->deliveredPacketTable, lastSequenceNumberBase);
PrrtRepairBlockStore_expire_block_range(s->repairBlockStore, firstSequenceNumberBase,
lastSequenceNumberBase);
List *list = List_create();
PrrtDataPacketStore_remove_range(s->dataPacketStore, list, firstSequenceNumberBase,
last->sequenceNumber);
......@@ -763,6 +760,7 @@ uint32_t PrrtSocket_get_bbr_app_limited(PrrtSocket *s) {
}
PrrtCoder *PrrtSocket_get_matching_coder(PrrtSocket *s, PrrtCodingConfiguration *codingParams) {
if (s->coder == NULL || PrrtCoder_get_k(s->coder) != codingParams->k ||
PrrtCoder_get_n(s->coder) != codingParams->n) {
......
......@@ -111,7 +111,7 @@ bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interfa
bool PrrtSocket_enable_thread_pinning(PrrtSocket *s);
bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port);
int PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port);
bool PrrtSocket_set_sock_opt(PrrtSocket *s, const char *name, const uint32_t value);
......
......@@ -177,12 +177,20 @@ cdef class PrrtSocket:
self._c_socket = cprrt.PrrtSocket_create(maximum_payload_size, target_delay_us)
if thread_pinning:
cprrt.PrrtSocket_enable_thread_pinning(self._c_socket)
if not cprrt.PrrtSocket_bind(self._c_socket, host.encode("utf8"), port):
h_errno = cprrt.PrrtSocket_bind(self._c_socket, host.encode("utf8"), port)
if h_errno != 0:
# PrrtSocket_bind calls PrrtSocket_close on error
# so we need to set _c_socket to NULL because otherwise __dealloc__
# will attempt to call PrrtSocket_close again on the closed socket
self._c_socket = NULL
raise ValueError("PrrtSocket_bind failed")
# TODO: use hstrerror() instead
raise ValueError("PrrtSocket_bind failed: " + ({
1: 'host not found.',
2: 'try again.',
3: 'no recovery.',
4: 'no data.',
-1: 'netdb internal error.',
}).get(h_errno, 'unknown error.'))
# Channel Properties
property data_rate_btl_fwd:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment