Commit 7650f453 authored by rna's avatar rna

Merge branch 'develop' into feature/congestionControl

parents 0672d883 b98eda49
Pipeline #2531 failed with stages
in 1 minute and 12 seconds
......@@ -5,7 +5,7 @@ host = sys.argv[1]
port = int(sys.argv[2])
localport = int(sys.argv[3])
s = prrt.PrrtSocket(("127.0.1.1", localport), mtu=150)
s = prrt.PrrtSocket(("127.0.1.1", localport), maximum_payload_size=150)
s.connect((host, port))
for i in range(10):
......
......@@ -119,7 +119,7 @@ cdef extern from "proto/socket.h":
ctypedef prrtSocket PrrtSocket
cdef PrrtSocket* PrrtSocket_create(const uint32_t mtu, const uint32_t target_delay)
cdef PrrtSocket* PrrtSocket_create(const uint32_t maximum_payload_size, const uint32_t target_delay)
bint PrrtSocket_bind(PrrtSocket *sock_ptr, const_char *ipAddress, const uint16_t port)
int PrrtSocket_close(const PrrtSocket *sock_ptr)
int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
......
......@@ -85,7 +85,6 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
do {
prrtTimeDifference_t now = (prrtTimeDifference_t) PrrtClock_get_current_time_us();
diff = ((prrtTimeDifference_t) sock_ptr->nextSendTime) - now;
debug(DEBUG_DATATRANSMITTER, "C: %u, P: %u, S: %d", cwnd, pipe, space);
if (diff > 0) {
debug(DEBUG_DATATRANSMITTER, "S: %u, Pacing for %d (%u).", packet->sequenceNumber, diff, now);
usleep_nano((uint32_t) diff);
......@@ -96,7 +95,7 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
}
int64_t space = PrrtReceiver_get_space(sock_ptr->receiver);
while (space < sock_ptr->mtu) {
while (space < sock_ptr->maximum_payload_size) {
//PrrtReceiver_check_rto(sock_ptr->receiver, packet->sequenceNumber, PrrtPacket_type(packet));
PrrtReceiver_wait_for_space(sock_ptr->receiver);
space = PrrtReceiver_get_space(sock_ptr->receiver);
......
......@@ -10,8 +10,9 @@
#include <assert.h>
#include <sys/ioctl.h>
#include "../defines.h"
#include "../util/dbg.h"
#include "../util/common.h"
#include "../util/dbg.h"
#include "../util/time.h"
#include "processes/dataTransmitter.h"
#include "processes/dataReceiver.h"
#include "stores/deliveredPacketTable.h"
......@@ -44,14 +45,14 @@ static inline prrtPacketLength_t deliver_packet(const PrrtSocket *s, void *buffe
return len;
}
PrrtSocket *PrrtSocket_create(prrtByteCount_t mtu, prrtTimedelta_t target_delay_us) {
PrrtSocket *PrrtSocket_create(prrtByteCount_t maximum_payload_size, prrtTimedelta_t target_delay_us) {
assert(sizeof(float) == 4);
PrrtSocket *s = (PrrtSocket *) calloc(1, sizeof(PrrtSocket));
check_mem(s);
s->nextSendTime = 0;
s->pacingEnabled = true;
s->mtu = mtu;
s->maximum_payload_size = maximum_payload_size;
s->isHardwareTimestamping = false;
s->interfaceName = NULL;
......@@ -213,13 +214,13 @@ int PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port) {
if(s->receiver != NULL) {
PrrtReceiver_destroy(s->receiver);
}
s->receiver = PrrtReceiver_create(host, port, s->mtu);
s->receiver = PrrtReceiver_create(host, port, s->maximum_payload_size);
return 0;
}
int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, const size_t data_len) {
if (data_len > s->mtu) {
PERROR("Data to be sent (%ld bytes) is too long, as MTU is %d.\n", data_len, s->mtu);
if (data_len > s->maximum_payload_size) {
PERROR("Data to be sent (%ld bytes) is too long, as MTU is %d.\n", data_len, s->maximum_payload_size);
return -1;
}
XlapTimestampPlaceholder tsph;
......@@ -254,7 +255,7 @@ int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, struct sockaddr* a
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
PrrtPacket *packet;
do {
packet = PrrtPacketDeliveryStore_get_packet_wait(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
packet = PrrtPacketDeliveryStore_get_packet_wait(s->packetDeliveryStore);
if (PrrtSocket_closing(s)) {
return -1;
}
......@@ -286,7 +287,8 @@ int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct soc
PrrtPacket *packet;
do {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
packet = PrrtPacketDeliveryStore_get_packet_wait(s->packetDeliveryStore, now, now + time_window_us);
struct timespec deadline = abstime_from_now(time_window_us);
packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now, now + time_window_us, &deadline);
if (PrrtSocket_closing(s)) {
return -1;
}
......@@ -429,8 +431,8 @@ int PrrtSocket_close(PrrtSocket *s) {
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *s, const char *name) {
if (strcmp(name, "targetdelay") == 0) {
return PrrtApplicationConstraints_get_target_delay(s->applicationConstraints);
} else if (strcmp(name, "mtu") == 0) {
return s->mtu;
} else if (strcmp(name, "maximum_payload_size") == 0) {
return s->maximum_payload_size;
} else {
PERROR("Unknown property %s\n", name);
return 0;
......
......@@ -85,11 +85,11 @@ typedef struct prrtSocket {
PrrtChannelStateInformation* senderChannelStateInformation;
atomic_bool isThreadPinning;
prrtByteCount_t mtu;
prrtByteCount_t maximum_payload_size;
} PrrtSocket;
PrrtSocket *PrrtSocket_create(prrtByteCount_t mtu, prrtTimedelta_t target_delay_us);
PrrtSocket *PrrtSocket_create(prrtByteCount_t maximum_payload_size, prrtTimedelta_t target_delay_us);
bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interface_name);
......
#include <pthread.h>
#include "../../defines.h"
#include "../clock.h"
#include "../types/packet.h"
#include "../../defines.h"
#include "../../util/common.h"
#include "../../util/dbg.h"
#include "../../util/time.h"
#include "packetDeliveryStore.h"
int packet_wait(PrrtPacketDeliveryStore *q, prrtTimestamp_t start, prrtTimestamp_t stop, const struct timespec *deadline) {
q->last_start = start;
q->last_stop = stop;
q->in_ordered_wait = true;
int result = pthread_cond_timedwait(&q->wait_for_data, &q->lock, deadline);
debug(DEBUG_RECEIVER, "After wait: %d", result);
q->in_ordered_wait = false;
return result;
}
PrrtPacketDeliveryStore *PrrtPacketDeliveryStore_create() {
PrrtPacketDeliveryStore *q = (PrrtPacketDeliveryStore *) calloc(1, sizeof(PrrtPacketDeliveryStore));
q->tree = NULL;
q->closing = false;
q->last_start = 0;
q->last_stop = 0;
q->in_ordered_wait = false;
pthread_mutexattr_t attr;
check(pthread_mutexattr_init(&attr) == EXIT_SUCCESS, "Mutex attr init failed.");
......@@ -30,16 +45,18 @@ PrrtPacket * PrrtPacketDeliveryStore_get_packet_timedwait(PrrtPacketDeliveryStor
check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");
do {
int res = pthread_cond_timedwait(&q->wait_for_data, &q->lock, deadline);
if (res == ETIMEDOUT) {
errno = ETIMEDOUT;
break;
} else {
packet = PrrtPacketDeliveryStore_get_packet(q, start, stop);
if (packet == NULL) {
int res = packet_wait(q, start, stop, deadline);
packet = PrrtPacketDeliveryStore_get_packet(q, start, stop);
if (atomic_load_explicit(&q->closing, memory_order_acquire)) {
if (packet == NULL && res == ETIMEDOUT) {
errno = ETIMEDOUT;
break;
}
}
if (atomic_load_explicit(&q->closing, memory_order_acquire)) {
break;
}
} while(!packet);
check(pthread_mutex_unlock(&q->lock) == EXIT_SUCCESS, "Unlock failed.");
......@@ -50,9 +67,10 @@ PrrtPacket * PrrtPacketDeliveryStore_get_packet_timedwait(PrrtPacketDeliveryStor
return NULL;
}
PrrtPacket * PrrtPacketDeliveryStore_get_packet_wait(PrrtPacketDeliveryStore *q, prrtTimestamp_t start,
prrtTimestamp_t stop) {
PrrtPacket *PrrtPacketDeliveryStore_get_packet_wait(PrrtPacketDeliveryStore *q) {
PrrtPacket *packet = NULL;
prrtTimestamp_t start = 0;
prrtTimestamp_t stop = MAX_TIMESTAMP;
check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");
......@@ -100,8 +118,14 @@ PrrtPacket *PrrtPacketDeliveryStore_get_packet(PrrtPacketDeliveryStore *q, prrtT
bool PrrtPacketDeliveryStore_insert(PrrtPacketDeliveryStore *q, PrrtPacket *packet) {
check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");
q->tree = BPTree_insert(q->tree, PrrtDataPacket_packet_timeout(packet), packet);
pthread_cond_broadcast(&q->wait_for_data);
prrtTimestamp_t timeout = PrrtDataPacket_packet_timeout(packet);
q->tree = BPTree_insert(q->tree, timeout, packet);
bool packet_awaited = (PrrtTimestamp_cmp(q->last_start, timeout) <= 0) && (PrrtTimestamp_cmp(timeout, q->last_stop) <= 0);
if (!q->in_ordered_wait || packet_awaited) {
pthread_cond_broadcast(&q->wait_for_data);
}
check(pthread_mutex_unlock(&q->lock) == EXIT_SUCCESS, "Unlock failed.");
return true;
......
......@@ -9,14 +9,16 @@ typedef struct prrtReceiveDataQueue {
BPTreeNode *tree;
pthread_cond_t wait_for_data;
atomic_bool closing;
prrtTimestamp_t last_start;
prrtTimestamp_t last_stop;
bool in_ordered_wait;
} PrrtPacketDeliveryStore;
PrrtPacketDeliveryStore* PrrtPacketDeliveryStore_create(void);
PrrtPacket *PrrtPacketDeliveryStore_get_packet(PrrtPacketDeliveryStore *q, prrtTimestamp_t start, prrtTimestamp_t stop);
PrrtPacket *PrrtPacketDeliveryStore_get_packet_wait(PrrtPacketDeliveryStore *q, prrtTimestamp_t start,
prrtTimestamp_t stop);
PrrtPacket *PrrtPacketDeliveryStore_get_packet_wait(PrrtPacketDeliveryStore *q);
PrrtPacket *PrrtPacketDeliveryStore_get_packet_timedwait(PrrtPacketDeliveryStore *queue, prrtTimestamp_t start,
prrtTimestamp_t stop, const struct timespec *deadline);
......
......@@ -136,10 +136,10 @@ cdef class PrrtSocket:
cdef cprrt.PrrtSocket* _c_socket
_epoch = datetime.datetime.utcfromtimestamp(0)
def __cinit__(self, address, mtu = 1500, target_delay = 1, thread_pinning = False):
def __cinit__(self, address, maximum_payload_size = 1400, target_delay = 1, thread_pinning = False):
host, port = address
target_delay_us = target_delay * 1000**2
self._c_socket = cprrt.PrrtSocket_create(mtu, target_delay_us)
self._c_socket = cprrt.PrrtSocket_create(maximum_payload_size, target_delay_us)
if thread_pinning:
cprrt.PrrtSocket_enable_thread_pinning(self._c_socket)
cprrt.PrrtSocket_bind(self._c_socket, host.encode("utf8"), port)
......@@ -167,6 +167,10 @@ cdef class PrrtSocket:
def __get__(self):
return cprrt.PrrtSocket_get_sock_opt(self._c_socket, "targetdelay") * 0.000001
property maximum_payload_size:
def __get__(self):
return cprrt.PrrtSocket_get_sock_opt(self._c_socket, "maximum_payload_size")
# Protocol configuration
property thread_pinning:
......@@ -200,12 +204,12 @@ cdef class PrrtSocket:
cprrt.PrrtSocket_connect(self._c_socket, encodedHost, port)
def send(self, data):
mtu = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "mtu")
maximum_payload_size = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "maximum_payload_size")
data_len = len(data)
if len(data) < mtu:
if len(data) < maximum_payload_size:
cprrt.PrrtSocket_send(self._c_socket, data, data_len)
else:
raise PayloadTooBigException("Sending packet of {} bytes on a socket with MTU of {} bytes failed.".format(data_len, mtu))
raise PayloadTooBigException("Sending packet of {} bytes on a socket with maximum payload size of {} bytes failed.".format(data_len, maximum_payload_size))
# Receiving
......
......@@ -12,14 +12,14 @@
#define DEBUG_BBR 1
#define DEBUG_BLOCK 0
#define DEBUG_PACKET 0
#define DEBUG_RECEIVER 0
#define DEBUG_RECEIVER 1
#define DEBUG_SENDER 0
#define DEBUG_SOCKET 0
#define DEBUG_DATARECEIVER 0
#define DEBUG_CLEANUP 1
#define DEBUG_DATATRANSMITTER 0
#define DEBUG_HARDSTAMPING 0
#define DEBUG_FEEDBACK 1
#define DEBUG_FEEDBACK 0
#ifdef DEBUG
#define debug(DOMAIN, M, ...) do { if (DEBUG_ALL||(DOMAIN)) fprintf(stderr, "DEBUG %-20s %s:%d: " M "\n", #DOMAIN + 6, __FILE__, __LINE__, ##__VA_ARGS__); } while (0)
......
......@@ -21,3 +21,14 @@ struct timespec abstime_from_now(uint32_t wait_time) {
return deadline;
}
// < 0: a less than b (b is in the future)
// > 0: a greater b (b is in the past)
// == 0: a equal b
int64_t PrrtTimestamp_cmp(prrtTimestamp_t a, prrtTimestamp_t b) {
prrtTimestamp_t diff = (prrtTimestamp_t) (a - b);
if (diff < TIMESTAMP_SPACE / 2) {
return diff;
} else {
return ((int64_t)diff) - ((int64_t)(TIMESTAMP_SPACE) + 1);
}
}
\ No newline at end of file
#ifndef PRRT_TIME_H
#define PRRT_TIME_H
#include "../proto/types/packet.h"
long long timedelta(struct timespec *t1, struct timespec *t2);
struct timespec abstime_from_now(uint32_t wait_time);
int64_t PrrtTimestamp_cmp(prrtTimestamp_t a, prrtTimestamp_t b);
#endif //PRRT_TIME_H
add_subdirectory(lib/gtest-1.8.0)
include_directories(SYSTEM ${gtest_SOURCE_DIR}/include ${gtest_SOURCE_DIR})
add_executable(prrtTests common.h bitmap_tests.cpp receptionTable_tests.cpp delivered_packet_table_tests.cpp bptree_tests.cpp PrrtBlock_tests.cpp)
add_executable(prrtTests common.h util_tests.cpp bitmap_tests.cpp receptionTable_tests.cpp delivered_packet_table_tests.cpp bptree_tests.cpp PrrtBlock_tests.cpp)
target_link_libraries(prrtTests LINK_PUBLIC gtest PRRT UTIL gtest_main)
\ No newline at end of file
#include "common.h"
extern "C" {
#include "prrt/util/time.h"
}
class UtilTests : public ::testing::Test {
protected:
virtual void SetUp()
{
}
virtual void TearDown()
{
}
};
TEST_F(UtilTests, Timestamps)
{
prrtTimestamp_t a = 5;
prrtTimestamp_t b = 47;
prrtTimestamp_t c = 4294967295;
ASSERT_EQ(PrrtTimestamp_cmp(a,b), -42);
ASSERT_EQ(PrrtTimestamp_cmp(b,a), 42);
ASSERT_EQ(PrrtTimestamp_cmp(c,a), -6);
ASSERT_EQ(PrrtTimestamp_cmp(a,c), 6);
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment