Commit bc42bc24 authored by Andreas Schmidt's avatar Andreas Schmidt

Merge branch 'develop' into feature/congestionControl

parents 733996d1 a9f17210
Pipeline #2439 failed with stages
in 16 seconds
......@@ -5,7 +5,7 @@ host = sys.argv[1]
port = int(sys.argv[2])
localport = int(sys.argv[3])
s = prrt.PrrtSocket(("127.0.1.1", localport))
s = prrt.PrrtSocket(("127.0.1.1", localport), mtu=150)
s.connect((host, port))
for i in range(10):
......
from ._version import get_versions
__version__ = get_versions()['version']
del get_versions
......@@ -119,7 +119,7 @@ cdef extern from "proto/socket.h":
ctypedef prrtSocket PrrtSocket
cdef PrrtSocket* PrrtSocket_create(const uint32_t target_delay)
cdef PrrtSocket* PrrtSocket_create(const uint32_t mtu, const uint32_t target_delay)
bint PrrtSocket_bind(PrrtSocket *sock_ptr, const_char *ipAddress, const uint16_t port)
int PrrtSocket_close(const PrrtSocket *sock_ptr)
int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
......
add_library(PRRT ../defines.h
set (PRRT_SOURCES ../defines.h
block.c block.h
bbr.c bbr.h
channelStateInformation.c channelStateInformation.h
......@@ -6,7 +6,6 @@ add_library(PRRT ../defines.h
codingParams.c codingParams.h
receiver.c receiver.h
socket.c socket.h
../xlap/xlap.c ../xlap/xlap.h
applicationConstraints.c applicationConstraints.h
processes/dataReceiver.c processes/dataReceiver.h
processes/dataTransmitter.c processes/dataTransmitter.h
......@@ -22,4 +21,10 @@ add_library(PRRT ../defines.h
types/packet.c types/packet.h
vdmcode/block_code.c vdmcode/block_code.h types/packetTracking.c types/packetTracking.h types/rateSample.c types/rateSample.h)
if (XLAP)
set(PRRT_SOURCES ${PRRT_SOURCES} ../xlap/xlap.c ../xlap/xlap.h)
endif()
add_library(PRRT ${PRRT_SOURCES})
target_link_libraries(PRRT rt)
This diff is collapsed.
#include <malloc.h>
#include <math.h>
#include "../xlap/xlap.h"
#include "xlap.h"
#include "../util/common.h"
#include "../util/dbg.h"
#include "stores/inFlightPacketStore.h"
......
......@@ -56,14 +56,14 @@ struct timespec abstime_from_now(prrtTimedelta_t wait_time) {
return deadline;
}
PrrtSocket *PrrtSocket_create(prrtTimedelta_t target_delay_us) {
PrrtSocket *PrrtSocket_create(prrtByteCount_t mtu, prrtTimedelta_t target_delay_us) {
assert(sizeof(float) == 4);
PrrtSocket *s = (PrrtSocket *) calloc(1, sizeof(PrrtSocket));
check_mem(s);
s->nextSendTime = 0;
s->pacingEnabled = true;
s->mtu = mtu;
s->isHardwareTimestamping = false;
s->interfaceName = NULL;
......@@ -230,6 +230,10 @@ int PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port) {
}
int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
if (data_len > s->mtu) {
PERROR("Data to be sent (%d bytes) is too long, as MTU is %d.\n", data_len, s->mtu);
return -1;
}
XlapTimestampPlaceholder tsph;
XlapTimestampPlaceholderInitialize(&tsph);
XlapTimeStampClock(&tsph, ts_any_packet, 0, PrrtSendStart);
......@@ -439,8 +443,10 @@ int PrrtSocket_close(PrrtSocket *s) {
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *s, const char *name) {
if (strcmp(name, "targetdelay") == 0) {
return PrrtApplicationConstraints_get_target_delay(s->applicationConstraints);
} else if (strcmp(name, "mtu") == 0) {
return s->mtu;
} else {
PERROR("Unknwon property %s", name);
PERROR("Unknown property %s\n", name);
return 0;
}
}
......
......@@ -17,7 +17,7 @@
#include "types/lossStatistics.h"
#include "types/packet.h"
#include "clock.h"
#include "../xlap/xlap.h"
#include "xlap.h"
#include "receiver.h"
#ifndef SIOCSHWTSTAMP
......@@ -85,10 +85,11 @@ typedef struct prrtSocket {
PrrtChannelStateInformation* senderChannelStateInformation;
atomic_bool isThreadPinning;
prrtByteCount_t mtu;
} PrrtSocket;
PrrtSocket *PrrtSocket_create(prrtTimedelta_t target_delay_us);
PrrtSocket *PrrtSocket_create(prrtByteCount_t mtu, prrtTimedelta_t target_delay_us);
bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interface_name);
......
......@@ -29,16 +29,18 @@ PrrtPacket * PrrtPacketDeliveryStore_get_packet_timedwait(PrrtPacketDeliveryStor
check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");
packet = PrrtPacketDeliveryStore_get_packet(q, start, stop);
if (!packet && !atomic_load_explicit(&q->closing, memory_order_acquire)) {
do {
int res = pthread_cond_timedwait(&q->wait_for_data, &q->lock, deadline);
if (res == ETIMEDOUT) {
errno = ETIMEDOUT;
return NULL;
break;
} else {
packet = PrrtPacketDeliveryStore_get_packet(q, start, stop);
if (atomic_load_explicit(&q->closing, memory_order_acquire)) {
break;
}
}
packet = PrrtPacketDeliveryStore_get_packet(q, start, stop);
}
} while(!packet);
check(pthread_mutex_unlock(&q->lock) == EXIT_SUCCESS, "Unlock failed.");
return packet;
......
#ifndef PRRT_XLAP_H
#define PRRT_XLAP_H
#include "pp.h"
#ifdef XLAP
#include "../xlap/xlap.h"
#else
#define TIMESTAMP_ID_LIST \
PrrtSendStart, \
PrrtSendEnd, \
PrrtSubmitPackage, \
PrrtEncodeStart, \
PrrtEncodeEnd, \
PrrtTransmitStart, \
PrrtTransmitEnd, \
LinkTransmitStart, \
LinkTransmitEnd, \
ChannelTransmit, \
ChannelReceive, \
LinkReceive, \
DecodeStart, \
DecodeEnd, \
HandlePacketStart, \
HandlePacketEnd, \
CopyOutputStart, \
CopyOutputEnd, \
SendFeedbackStart, \
SendFeedbackEnd, \
PrrtReturnPackage, \
PrrtReceivePackage, \
PrrtDeliver \
#define TIMESSTAMP_ID_TO_NAME(id) ts_##id
typedef enum XlapTimestampId {
PP_foreach(PP_join_comma, TIMESSTAMP_ID_TO_NAME, TIMESTAMP_ID_LIST),
ts_count
} XlapTimestampId;
typedef char XlapTimestampTable;
typedef char XlapTimestampPlaceholder;
typedef enum XlapTimestampPacketKind {
ts_data_packet = 0,
ts_any_packet = 0,
ts_redundancy_packet = 1,
ts_feedback_packet = 2
} XlapTimestampPacketKind;
#define XlapTimeStampValue(sck, kind, seqno, id, value) do { \
(void) (sck); \
(void) (kind); \
(void) (seqno); \
(void) (ts_##id); \
(void) (value); \
} while (0)
#define XlapCycleStampValue(sck, kind, seqno, id, value) do { \
(void) (sck); \
(void) (kind); \
(void) (seqno); \
(void) (ts_##id); \
(void) (value); \
} while (0)
#define XlapTimeStampClock(sck, kind, seqno, id) do { \
(void) (sck); \
(void) (kind); \
(void) (seqno); \
(void) (ts_##id); \
} while (0)
#define XlapTimeStampCycle(sck, kind, seqno, id) do { \
(void) (sck); \
(void) (kind); \
(void) (seqno); \
(void) (ts_##id); \
} while (0)
#define XlapTimestampPlaceholderUse(sck, kind, seqno, ph) do { \
(void) (sck); \
(void) (kind); \
(void) (seqno); \
(void) (ph); \
} while (0)
#define XlapTimestampTableDump(f, k, t) do { \
(void) (f); \
(void) (k); \
(void) (t); \
} while (0)
#define XlapTimestampTableInstall(sck, kind, tstp) do { \
(void) (sck); \
(void) (kind); \
(void) (tstp); \
} while (0)
#define XlapTimestampTableDumpHeader(f) do { \
(void) (f); \
} while (0)
#define XlapTimestampPlaceholderInitialize(ph) do { \
(void) (ph); \
} while (0)
#endif
#endif
......@@ -6,8 +6,17 @@ cimport cython
cimport cprrt
import datetime
include "sockets.pxd"
class TimeoutException(Exception):
def __init__(self):
self.message = "The call timed out."
class PayloadTooBigException(Exception):
pass
cdef extern from "proto/applicationConstraints.c":
pass
......@@ -98,7 +107,6 @@ cdef extern from "util/windowedFilter.c":
cdef sockaddr_to_addr_and_port(sockaddr_in addr):
return (cprrt.PrrtSocket_inet_ntoa(&addr.sin_addr).decode("utf8"), cprrt.PrrtSocket_ntohs(addr.sin_port))
class PrrtCodingConfiguration:
def __init__(self, n, k, n_cycle=None):
if n < k:
......@@ -121,15 +129,14 @@ class PrrtCodingConfiguration:
def __repr__(self):
return "({},{},{})".format(self.n, self.k, self.n_cycle)
cdef class PrrtSocket:
cdef cprrt.PrrtSocket* _c_socket
_epoch = datetime.datetime.utcfromtimestamp(0)
def __cinit__(self, address, target_delay = 1, thread_pinning = False):
def __cinit__(self, address, mtu = 1500, target_delay = 1, thread_pinning = False):
host, port = address
target_delay_us = target_delay * 1000**2
self._c_socket = cprrt.PrrtSocket_create(target_delay_us)
self._c_socket = cprrt.PrrtSocket_create(mtu, target_delay_us)
if thread_pinning:
cprrt.PrrtSocket_enable_thread_pinning(self._c_socket)
cprrt.PrrtSocket_bind(self._c_socket, host.encode("utf8"), port)
......@@ -190,7 +197,12 @@ cdef class PrrtSocket:
cprrt.PrrtSocket_connect(self._c_socket, encodedHost, port)
def send(self, data):
cprrt.PrrtSocket_send(self._c_socket, data, len(data))
mtu = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "mtu")
data_len = len(data)
if len(data) < mtu:
cprrt.PrrtSocket_send(self._c_socket, data, data_len)
else:
raise PayloadTooBigException("Sending packet of {} bytes on a socket with MTU of {} bytes failed.".format(data_len, mtu))
# Receiving
......@@ -222,9 +234,13 @@ cdef class PrrtSocket:
cdef char buffer[65536]
cdef int32_t len
cdef sockaddr_in addr
cdef timespec deadline_timespec = timespec(deadline.seconds, deadline.microseconds * 1000)
cdef timespec deadline_timespec = self._convert_deadline(deadline)
with nogil:
len = cprrt.PrrtSocket_receive_asap_timedwait(self._c_socket, <void*> buffer, <sockaddr*> &addr, &deadline_timespec)
if len < 0:
raise TimeoutException()
if len == 0:
return (None, None)
return buffer[:len], sockaddr_to_addr_and_port(addr)
def receive_ordered(self, time_window):
......@@ -250,9 +266,13 @@ cdef class PrrtSocket:
cdef int32_t len
cdef sockaddr_in addr
cdef uint32_t time_window_us = time_window * (1000**2)
cdef timespec deadline_timespec = timespec(deadline.seconds, deadline.microseconds * 1000)
cdef timespec deadline_timespec = self._convert_deadline(deadline)
with nogil:
len = cprrt.PrrtSocket_receive_ordered_timedwait(self._c_socket, <void*> buffer, <sockaddr*> &addr, time_window_us, &deadline_timespec)
if len < 0:
raise TimeoutException()
if len == 0:
return (None, None)
return buffer[:len], sockaddr_to_addr_and_port(addr)
......@@ -316,3 +336,10 @@ cdef class PrrtSocket:
def __dealloc__(self):
if self._c_socket != NULL:
cprrt.PrrtSocket_close(self._c_socket)
def _convert_deadline(self, deadline):
diff = deadline - self._epoch
seconds = int(diff.total_seconds())
nanoseconds = int((diff.total_seconds() % 1) * (1000**3))
cdef timespec deadline_timespec = timespec(seconds, nanoseconds)
return deadline_timespec
......@@ -97,7 +97,7 @@ int main(int argc, char **argv) {
}
#endif
s = PrrtSocket_create(HALF_TIMESTAMP-1);
s = PrrtSocket_create(1500, HALF_TIMESTAMP - 1);
check(s != NULL, "Could not create socket.");
if(strcmp(arguments.hardwarestamping_interface, "-") != 0) {
......
......@@ -101,7 +101,7 @@ int main(int argc, char **argv) {
#endif
PrrtSocket *s = PrrtSocket_create(10 * 1000 * 1000);
PrrtSocket *s = PrrtSocket_create(1400, 10 * 1000 * 1000);
check(s != NULL, "Socket create failed.");
if(strcmp(arguments.hardwarestamping_interface, "-") != 0) {
......
cython==0.25.2
cython==0.28.3
\ No newline at end of file
......@@ -4,9 +4,6 @@ from Cython.Build import cythonize
import os, errno
import versioneer
os.environ["CC"] = "gcc-5"
os.environ["CXX"] = "g++-5"
ext = Extension(name='prrt', language="c", sources=["prrt/*.pyx"])
try:
os.remove(os.path.join(os.path.dirname(os.path.realpath(__file__)), "prrt/prrt.c"))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment