...
 
Commits (10)
......@@ -36,4 +36,4 @@ if(PRRT_TESTS)
endif()
add_custom_target(perftest COMMAND python3 tests/eval.py)
add_custom_target(memtest COMMAND bash ./memtest.sh DEPENDS sender receiver)
add_custom_target(memtest COMMAND bash ./memtest.sh DEPENDS sender receiver)
\ No newline at end of file
......@@ -3,7 +3,7 @@ import prrt
port = int(sys.argv[1])
s = prrt.PrrtSocket(("127.0.0.1", port))
s = prrt.PrrtSocket(("0.0.0.0", port))
while True:
d, addr = s.recv()
......
......@@ -5,7 +5,7 @@ host = sys.argv[1]
port = int(sys.argv[2])
localport = int(sys.argv[3])
s = prrt.PrrtSocket(("127.0.1.1", localport), maximum_payload_size=150)
s = prrt.PrrtSocket(("0.0.0.0", localport), maximum_payload_size=150)
s.connect((host, port))
for i in range(10):
......
......@@ -25,11 +25,14 @@ add_executable(receiver receiver.c)
add_executable(time-sender time-sender.c)
add_executable(time-receiver time-receiver.c)
target_link_libraries(sender LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(receiver LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(sender LINK_PUBLIC prrt_shared prrtUtil_shared ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(receiver LINK_PUBLIC prrt_shared prrtUtil_shared ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(time-sender LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(time-receiver LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(time-sender LINK_PUBLIC prrt_shared prrtUtil_shared ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(time-receiver LINK_PUBLIC prrt_shared prrtUtil_shared ${CMAKE_THREAD_LIBS_INIT})
add_executable(refcount refcount.c)
target_link_libraries(refcount LINK_PUBLIC PRRT UTIL ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(refcount LINK_PUBLIC prrt_shared prrtUtil_shared ${CMAKE_THREAD_LIBS_INIT})
install(DIRECTORY ./ DESTINATION include/prrt
FILES_MATCHING PATTERN "*.h")
\ No newline at end of file
......@@ -120,7 +120,7 @@ cdef extern from "proto/socket.h":
ctypedef prrtSocket PrrtSocket
cdef PrrtSocket* PrrtSocket_create(const uint32_t maximum_payload_size, const uint32_t target_delay)
bint PrrtSocket_bind(PrrtSocket *sock_ptr, const_char *ipAddress, const uint16_t port)
int PrrtSocket_bind(PrrtSocket *sock_ptr, const_char *ipAddress, const uint16_t port)
int PrrtSocket_close(const PrrtSocket *sock_ptr)
int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
int PrrtSocket_send_sync(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
......
......@@ -26,6 +26,19 @@ if (XLAP)
set(PRRT_SOURCES ${PRRT_SOURCES} ../xlap/xlap.c ../xlap/xlap.h)
endif()
add_library(PRRT ${PRRT_SOURCES})
add_library(prrt_obj OBJECT ${PRRT_SOURCES})
set_property(TARGET prrt_obj PROPERTY POSITION_INDEPENDENT_CODE 1)
target_link_libraries(PRRT rt)
add_library(prrt_static STATIC $<TARGET_OBJECTS:prrt_obj>)
target_link_libraries(prrt_static rt)
set_target_properties(prrt_static PROPERTIES OUTPUT_NAME "prrt")
add_library(prrt_shared SHARED $<TARGET_OBJECTS:prrt_obj>)
target_link_libraries(prrt_shared rt)
set_target_properties(prrt_shared PROPERTIES OUTPUT_NAME "prrt")
install(TARGETS prrt_static prrt_shared
RUNTIME DESTINATION bin
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib)
#include <sys/time.h>
#include <stdlib.h>
#include "../util/time.h"
#include "../util/common.h"
#include "clock.h"
......@@ -19,50 +20,45 @@ prrtTimestamp_t PrrtClock_get_prrt_time_us(PrrtClock *clock)
prrtTimestamp_t lastMeasurement = clock->lastMeasurement;
prrtTimeDifference_t diff = diff_abs_ts(currentTime, lastMeasurement);
prrtTimeDifference_t skew = (diff * clock->skew) / 400;
return (prrtTimestamp_t) virtualTime + clock->meanDeviation + skew;
return (prrtTimestamp_t) (virtualTime + clock->meanDeviation + skew);
} else {
return currentTime;
}
}
bool PrrtClock_update(PrrtClock *clock, uint32_t referenceTime, uint32_t rtt)
bool PrrtClock_update(PrrtClock *clock, uint32_t referenceTime, uint32_t propagation_delay)
{
prrtTimestamp_t virtualTime = PrrtClock_get_prrt_time_us(clock);
prrtTimestamp_t currentTime = PrrtClock_get_current_time_us();
prrtTimestamp_t virtualTime = clock->virtualTime;
prrtTimeDifference_t clockSkew = clock->skew;
prrtTimedelta_t delay = rtt/2; // half the rtt
prrtTimeDifference_t phaseError = referenceTime - virtualTime + delay;
//printf("V: %13u, C: %13u, S: %13d, R: %13u, E: %13d, D: %3u L: %u\n", virtualTime, currentTime, clockSkew, referenceTime, phaseError, delay, clock->lastMeasurement);
int64_t clockSkew = clock->skew;
prrtTimedelta_t delay = propagation_delay / 2; // half the rtt
int64_t phaseError = PrrtTimestamp_cmp(referenceTime, virtualTime) + delay;
//printf("V: %13u, C: %13u, S: %13d, R: %13u, E: %13ld, D: %3u L: %u\n", virtualTime, currentTime, clockSkew, referenceTime, phaseError, delay, clock->lastMeasurement);
if(abs(phaseError) > 10000) {
if(labs(phaseError) > 10000) {
clock->meanDeviation = 0;
clock->skew = 0;
clock->lastMeasurement = currentTime;
clock->virtualTime = currentTime;
}
prrtTimeDifference_t deviationSum = (prrtTimeDifference_t) (phaseError + 3 * clock->meanDeviation); // TODO: why 3?
int64_t deviationSum =(phaseError + 3 * clock->meanDeviation); // TODO: why 3?
prrtTimeDifference_t meanDeviation = (prrtTimeDifference_t) (deviationSum / 4); // TODO: find out why???
prrtTimeDifference_t period = diff_abs_ts(currentTime, clock->lastMeasurement);
int64_t period = PrrtTimestamp_cmp(currentTime, clock->lastMeasurement);
if(period > 0) {
clockSkew = (meanDeviation / period) + 15 * clock->skew;
clockSkew = (prrtTimeDifference_t) (clockSkew / 16);
}
virtualTime = virtualTime + meanDeviation + period * clockSkew / 400;
virtualTime = clock->virtualTime + meanDeviation + period * clockSkew / 400;
clock->meanDeviation = meanDeviation;
clock->skew = clockSkew;
clock->skew = (prrtTimeDifference_t) clockSkew;
clock->lastMeasurement = currentTime;
clock->virtualTime = virtualTime;
if(abs(phaseError) > 10000) {
clock->virtualTime = currentTime;
}
//printf("V: %13u, C: %13u; S: %13d; M: %13d, P:%13u\n", virtualTime, currentTime, clockSkew, meanDeviation, period);
//printf("V: %13u, C: %13u; S: %13d; M: %13d, P:%13u\n\n", virtualTime, currentTime, clockSkew, meanDeviation, period);
return true;
}
......
......@@ -26,7 +26,7 @@ prrtTimestamp_t PrrtClock_get_current_time_us(void);
prrtTimestamp_t PrrtClock_get_prrt_time_us(PrrtClock *clock);
bool PrrtClock_update(PrrtClock *clock, prrtTimestamp_t referenceTime, prrtTimedelta_t rtt);
bool PrrtClock_update(PrrtClock *clock, prrtTimestamp_t referenceTime, prrtTimedelta_t propagation_delay);
#define diff_abs_ts(timeA, timeB) (prrtTimeDifference_t) abs(((prrtTimeDifference_t) timeA) - ((prrtTimeDifference_t)timeB))
......
......@@ -83,18 +83,6 @@ static bool send_feedback(PrrtSocket *sock_ptr,
.sentTime = sentTimestamp
};
uint16_t remote_port = ntohs(remote.sin_port);
char *remote_host = inet_ntoa(remote.sin_addr);
struct sockaddr_in targetaddr;
memset((char *) &targetaddr, 0, sizeof(targetaddr));
targetaddr.sin_family = AF_INET;
targetaddr.sin_port = htons((uint16_t) (remote_port));
struct hostent *hp;
hp = gethostbyname(remote_host);
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
prrtTimestamp_t forwardTripTime = (uint32_t) (((int64_t) PrrtClock_get_current_time_us()) +
(sock_ptr->lastSentTimestamp - sock_ptr->lastReceivedTimestamp));
......@@ -114,7 +102,7 @@ static bool send_feedback(PrrtSocket *sock_ptr,
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
check(sendto(sock_ptr->socketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
check(sendto(sock_ptr->socketFd, buf, length, 0, (struct sockaddr *) &remote, sizeof(remote)) ==
length, "Sending feedback failed.");
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
free(buf);
......@@ -157,10 +145,6 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
PrrtChannelStateInformation_update_delivery_rate(sock_ptr->senderChannelStateInformation, payload->btlbw);
prrtSequenceNumber_t baseSequenceNumber = packet->sequenceNumber - packet->index;
// forward to application layer
debug(DEBUG_DATARECEIVER, "Forward: %u", seqno);
PrrtPacketDeliveryStore_insert(sock_ptr->packetDeliveryStore, packet);
PrrtPacket *reference = PrrtPacket_copy(packet);
// forward to application layer
......
......@@ -93,8 +93,6 @@ PrrtSocket *PrrtSocket_create(prrtByteCount_t maximum_payload_size, prrtTimedelt
"Socket option set failed.");
check(setsockopt(s->socketFd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
"Socket option set failed.");
check(setsockopt(s->socketFd, SOL_SOCKET, SO_REUSEPORT, &enabled, sizeof(enabled)) == EXIT_SUCCESS,
"Socket option set failed.");
s->sendDataQueue = Pipe_create();
s->deliveredPacketTable = PrrtDeliveredPacketTable_create();
......@@ -134,7 +132,7 @@ bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interfa
return false;
}
bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port) {
int PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port) {
check(port <= 65534, "Port %d cannot be bound to.", port);
size_t size = sizeof(struct sockaddr_in);
......@@ -205,10 +203,13 @@ bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port)
s->isBound = true;
return true;
return 0;
error:
PrrtSocket_close(s);
return false;
if (h_errno) {
return h_errno;
}
return -2;
}
bool PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port) {
......@@ -554,9 +555,6 @@ bool PrrtSocket_cleanup(PrrtSocket *s) {
PrrtDeliveredPacketTable_forward_start(s->deliveredPacketTable, lastSequenceNumberBase);
PrrtRepairBlockStore_expire_block_range(s->repairBlockStore, firstSequenceNumberBase,
lastSequenceNumberBase);
List *list = List_create();
PrrtDataPacketStore_remove_range(s->dataPacketStore, list, firstSequenceNumberBase,
last->sequenceNumber);
......
......@@ -99,7 +99,7 @@ bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interfa
bool PrrtSocket_enable_thread_pinning(PrrtSocket *s);
bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port);
int PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port);
bool PrrtSocket_set_sock_opt(PrrtSocket *s, const char *name, const uint32_t value);
......
......@@ -150,12 +150,20 @@ cdef class PrrtSocket:
self._c_socket = cprrt.PrrtSocket_create(maximum_payload_size, target_delay_us)
if thread_pinning:
cprrt.PrrtSocket_enable_thread_pinning(self._c_socket)
if not cprrt.PrrtSocket_bind(self._c_socket, host.encode("utf8"), port):
h_errno = cprrt.PrrtSocket_bind(self._c_socket, host.encode("utf8"), port)
if h_errno != 0:
# PrrtSocket_bind calls PrrtSocket_close on error
# so we need to set _c_socket to NULL because otherwise __dealloc__
# will attempt to call PrrtSocket_close again on the closed socket
self._c_socket = NULL
raise ValueError("PrrtSocket_bind failed")
# TODO: use hstrerror() instead
raise ValueError("PrrtSocket_bind failed: " + ({
1: 'host not found.',
2: 'try again.',
3: 'no recovery.',
4: 'no data.',
-1: 'netdb internal error.',
}).get(h_errno, 'unknown error.'))
# Channel Properties
property data_rate_btl_fwd:
......
......@@ -108,7 +108,7 @@ int main(int argc, char **argv) {
PrrtSocket_enable_thread_pinning(s);
}
check(PrrtSocket_bind(s, "0.0.0.0", arguments.port), "bind failed");
check(PrrtSocket_bind(s, "0.0.0.0", arguments.port) == 0, "bind failed");
XlapTimestampTable *tstable_data = malloc(sizeof(XlapTimestampTable));
XlapTimestampTable *tstable_redundancy = malloc(sizeof(XlapTimestampTable));
......@@ -129,6 +129,10 @@ int main(int argc, char **argv) {
buffer[n] = '\0';
unsigned int seqno;
sscanf(buffer, "%u", &seqno);
prrtTimestamp_t current_time = PrrtClock_get_current_time_us();
prrtTimestamp_t prrt_time = PrrtClock_get_prrt_time_us(&s->clock);
printf("C: %d\nP: %d\nD: \n%d\n\n", current_time, prrt_time, prrt_time - current_time);
debug(DEBUG_RECEIVER, "[B (n: %3d, i: %3d)] %s", n, i, buffer);
i++;
}
......
......@@ -122,7 +122,7 @@ int main(int argc, char **argv) {
PrrtSocket_enable_thread_pinning(s);
}
check(PrrtSocket_bind(s, "0.0.0.0", arguments.local_port), "bind failed");
check(PrrtSocket_bind(s, "0.0.0.0", arguments.local_port) == EXIT_SUCCESS, "bind failed");
//PrrtSocket_set_coding_parameters(s, 1, 1); // comment this line to re-enable coding.
......
......@@ -24,6 +24,7 @@ struct arguments {
uint16_t size;
char *outfile;
char *target;
bool optimized;
bool thread_pinning;
};
......@@ -89,6 +90,7 @@ int main(int argc, char **argv) {
.rounds = 127,
.size = 1500,
.outfile = "-",
.optimized = true,
.thread_pinning = false,
};
argp_parse(&argp, argc, argv, 0, 0, &arguments);
......
......@@ -15,6 +15,7 @@ static struct argp_option options[] = {
{"rounds", 'r', "ROUNDS", 0, "Rounds" },
{"size", 's', "SIZE", 0, "Size" },
{"output", 'o', "FILE", 0, "Output to FILE instead of standard output" },
{"unoptimized", 'U', 0, 0, "Disable TCP optimizations"},
{"threadpinning", 'T', 0, 0, "Enable thread pinning"},
{ 0 }
};
......@@ -27,6 +28,7 @@ struct arguments
uint16_t rounds;
uint16_t size;
char* outfile;
bool optimized;
bool thread_pinning;
};
......@@ -57,6 +59,9 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
case 'o':
arguments->outfile = arg;
break;
case 'U':
arguments->optimized = false;
break;
case 'T':
arguments->thread_pinning = true;
break;
......@@ -81,6 +86,7 @@ int main(int argc, char **argv) {
.size = 1500,
.rounds = 127,
.outfile = "-",
.optimized = true,
.thread_pinning = false,
};
argp_parse (&argp, argc, argv, 0, 0, &arguments);
......
add_library(UTIL ../defines.h
set (UTIL_SOURCES ../defines.h
common.c common.h
list.c list.h
pipe.c pipe.h
dbg.h
bptree.c bptree.h
bitmap.c bitmap.h
mpsc_queue.c mpsc_queue.h
time.c time.h
mpsc_queue.c mpsc_queue.h
windowedFilter.c windowedFilter.h)
set_property(TARGET UTIL PROPERTY C_STANDARD 99)
target_link_libraries(UTIL ${M_LIB})
add_library(prrtUtil_obj OBJECT ${UTIL_SOURCES})
set_property(TARGET prrtUtil_obj PROPERTY C_STANDARD 99)
add_library(prrtUtil_shared SHARED $<TARGET_OBJECTS:prrtUtil_obj>)
set_target_properties(prrtUtil_shared PROPERTIES OUTPUT_NAME "prrtUtil")
add_library(prrtUtil_static SHARED $<TARGET_OBJECTS:prrtUtil_obj>)
set_target_properties(prrtUtil_static PROPERTIES OUTPUT_NAME "prrtUtil")
target_link_libraries(prrtUtil_shared ${M_LIB})
target_link_libraries(prrtUtil_static ${M_LIB})
install(TARGETS prrtUtil_shared prrtUtil_static
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib)
......@@ -2,4 +2,4 @@ add_subdirectory(lib/gtest-1.8.0)
include_directories(SYSTEM ${gtest_SOURCE_DIR}/include ${gtest_SOURCE_DIR})
add_executable(prrtTests common.h util_tests.cpp bitmap_tests.cpp receptionTable_tests.cpp delivered_packet_table_tests.cpp bptree_tests.cpp PrrtBlock_tests.cpp)
target_link_libraries(prrtTests LINK_PUBLIC gtest PRRT UTIL gtest_main)
\ No newline at end of file
target_link_libraries(prrtTests LINK_PUBLIC gtest prrt_shared prrtUtil_shared gtest_main)
\ No newline at end of file