Commit 385537fb authored by Andreas Schmidt's avatar Andreas Schmidt

Merge branch 'develop' into feature/congestionControl

parents e7808ea1 feadf321
Pipeline #2238 passed with stages
in 1 minute and 10 seconds
......@@ -8,6 +8,7 @@
* Clock synchronization between sending stack and receiving stack
* Applications can specify packet-level expiration times
* Different receive modes for ASAP and time-synchronized operation
* Passive measurement of propagation delay, bottleneck data rate and packet loss rate
* Packet-level timing analysis using [X-Lap](http://xlap.larn.systems)
* [Hardware timestamping support](https://git.nt.uni-saarland.de/LARN/PRRT/wikis/hardware-timestamping)
......
......@@ -3,11 +3,12 @@ import prrt
port = int(sys.argv[1])
s = prrt.PrrtSocket(port=port, isSender=False)
s = prrt.PrrtSocket(("127.0.0.1", port))
while True:
d = s.recv()
d, addr = s.recv()
d = d.decode("utf8")
if d != "Close":
print d
print(d, addr)
else:
break
......@@ -3,10 +3,11 @@ import prrt
host = sys.argv[1]
port = int(sys.argv[2])
localport = int(sys.argv[3])
s = prrt.PrrtSocket(port=port, isSender=True)
s.connect(host, port)
s = prrt.PrrtSocket(("127.0.1.1", localport))
s.connect((host, port))
for i in range(10):
s.send("Packet {}".format(i))
s.send("Close")
s.send("Packet {}".format(i).encode("utf8"))
s.send("Close".encode("utf8"))
include "posix/time.pxd"
include "sockets.pxd"
from libc.stdint cimport uint32_t, uint16_t, uint8_t, int32_t, uint64_t
from libc.string cimport const_char
cdef extern from "pthread.h" nogil:
ctypedef struct pthread_t:
pass
......@@ -123,14 +125,14 @@ cdef extern from "proto/socket.h":
int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) nogil
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr) nogil
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr) nogil
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, timespec* deadline) nogil
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr, sockaddr* addr) nogil
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, sockaddr* addr) nogil
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr, sockaddr* addr) nogil
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, sockaddr* addr, timespec* deadline) nogil
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) nogil
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) nogil
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us, timespec* deadline) nogil
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, sockaddr* addr, prrtTimedelta_t time_window_us) nogil
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, sockaddr* addr, prrtTimedelta_t time_window_us) nogil
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, sockaddr* addr, prrtTimedelta_t time_window_us, timespec* deadline) nogil
bint PrrtSocket_set_sock_opt(PrrtSocket *sock_ptr, const_char *name, const uint32_t value)
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *sock_ptr, const_char *name)
......@@ -139,7 +141,6 @@ cdef extern from "proto/socket.h":
bint PrrtSocket_uses_thread_pinning(PrrtSocket *socket)
bint PrrtSocket_uses_thread_pinning(PrrtSocket *socket)
uint32_t PrrtSocket_get_rtprop_fwd(PrrtSocket *socket)
float PrrtSocket_get_plr_fwd(PrrtSocket *socket)
uint32_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *socket)
......@@ -160,9 +161,11 @@ cdef extern from "proto/socket.h":
bint PrrtSocket_get_bbr_round_start(PrrtSocket *s)
uint32_t PrrtSocket_get_bbr_app_limited(PrrtSocket *socket)
bint PrrtSocket_get_bbr_is_app_limited(PrrtSocket *socket)
bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket)
char *PrrtSocket_inet_ntoa(in_addr*)
uint16_t PrrtSocket_ntohs(uint16_t v)
cdef extern from "proto/stores/packetDeliveryStore.h":
ctypedef struct PrrtPacketDeliveryStore:
pass
......@@ -185,3 +188,4 @@ cdef extern from "util/pipe.h":
cdef extern from "util/mpsc_queue.h":
ctypedef struct MPSCQueue:
pass
......@@ -17,7 +17,6 @@ typedef struct prrtChannelStateInformation {
PrrtChannelStateInformation* PrrtChannelStateInformation_create(void);
prrtDeliveryRate_t PrrtChannelStateInformation_get_delivery_rate(PrrtChannelStateInformation *csi);
bool PrrtChannelStateInformation_get_app_limited(PrrtChannelStateInformation *csi);
prrtPacketLossRate_t PrrtChannelStateInformation_get_plr(PrrtChannelStateInformation* csi);
void PrrtChannelStateInformation_update_plr(PrrtChannelStateInformation *csi, prrtSequenceNumber_t erasures,
......
......@@ -207,7 +207,7 @@ static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) {
return;
}
void handle_feedback_packet(const PrrtSocket *prrtSocket, const PrrtPacket *prrtPacket, prrtTimestamp_t receiveTime) {
void handle_feedback_packet(PrrtSocket *prrtSocket, PrrtPacket *prrtPacket, prrtTimestamp_t receiveTime) {
check(prrtPacket != NULL, "Cannot be null");
debug(DEBUG_DATARECEIVER, "handle_feedback_packet");
PrrtPacketFeedbackPayload *feedbackPayload = (PrrtPacketFeedbackPayload *) prrtPacket->payload;
......@@ -297,6 +297,7 @@ void *receive_data_loop(void *ptr) {
XlapTimeStampCycle(&tsph3, ts_any_packet, 0, DecodeEnd);
prrtSequenceNumber_t seqno = packet->sequenceNumber;
prrtTimestamp_t prrt_recv_timestamp = PrrtClock_TimespecToPrrtTimestamp(packet_recv_timestamp);
memcpy(&packet->sender_addr, &remote, addrlen);
prrtPacketType_t packetType = PrrtPacket_type(packet);
debug(DEBUG_DATARECEIVER, "received packet %d:%u", (int) packetType, seqno);
......
......@@ -18,7 +18,7 @@
#include "types/packetTimeout.h"
#include "socket.h"
static inline prrtPacketLength_t deliver_packet(const PrrtSocket *s, void *buffer, PrrtPacket *packet) {
static inline prrtPacketLength_t deliver_packet(const PrrtSocket *s, void *buffer, PrrtPacket *packet, struct sockaddr* addr) {
size_t timespec_size = sizeof(struct timespec);
prrtPacketLength_t len = 0;
if(packet != NULL) {
......@@ -35,6 +35,8 @@ static inline prrtPacketLength_t deliver_packet(const PrrtSocket *s, void *buffe
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, CopyOutputEnd);
memcpy(addr, &(packet->sender_addr), sizeof(struct sockaddr_in));
XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
PrrtPacket_destroy(packet);
......@@ -146,6 +148,8 @@ bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port)
struct sockaddr_in *address = calloc(1, size);
check_mem(address);
// TODO: Allow DNS names to be passed as ipAddress.
address->sin_family = AF_INET;
address->sin_addr.s_addr = inet_addr(ipAddress);
address->sin_port = htons((uint16_t) (port));
......@@ -249,13 +253,13 @@ bool PrrtSocket_closing(PrrtSocket *s) {
return atomic_load_explicit(&s->closing, memory_order_acquire);
}
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr) {
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
PrrtPacket *packet;
packet = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
return deliver_packet(s, buf_ptr, packet);
return deliver_packet(s, buf_ptr, packet, addr);
}
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr) {
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
PrrtPacket *packet;
do {
packet = PrrtPacketDeliveryStore_get_packet_wait(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
......@@ -263,31 +267,30 @@ int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr) {
return -1;
}
} while (!packet);
return deliver_packet(s, buf_ptr, packet);
return deliver_packet(s, buf_ptr, packet, addr);
}
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct timespec* deadline) {
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, struct timespec* deadline) {
PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, 0, MAX_TIMESTAMP, deadline);
if (packet == NULL && errno == ETIMEDOUT) {
return -1 * ETIMEDOUT;
}
return deliver_packet(s, buf_ptr, packet);
return deliver_packet(s, buf_ptr, packet, addr);
}
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr) {
return PrrtSocket_receive_asap_wait(s, buf_ptr);
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
return PrrtSocket_receive_asap_wait(s, buf_ptr, addr);
}
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) {
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us) {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, now - time_window_us,
now + time_window_us);
return deliver_packet(s, buf_ptr, packet);
return deliver_packet(s, buf_ptr, packet, addr);
}
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) {
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us) {
PrrtPacket *packet;
do {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
......@@ -297,11 +300,10 @@ int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, prrtTimede
return -1;
}
} while (!packet);
return deliver_packet(s, buf_ptr, packet);
return deliver_packet(s, buf_ptr, packet, addr);
}
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us, struct timespec* deadline) {
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us, struct timespec* deadline) {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now - time_window_us,
......@@ -310,7 +312,7 @@ int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtT
return -1 * ETIMEDOUT;
}
return deliver_packet(s, buf_ptr, packet);
return deliver_packet(s, buf_ptr, packet, addr);
}
......@@ -536,11 +538,14 @@ prrtDeliveryRate_t PrrtSocket_get_delivery_rate_fwd(PrrtSocket *s) {
return PrrtChannelStateInformation_get_delivery_rate(s->receiver->csi);
}
bool PrrtSocket_get_bbr_round_start(PrrtSocket *s) {
return BBR_getRoundStart(s->receiver->bbr);
}
uint32_t PrrtSocket_get_bbr_state(PrrtSocket *s) {
return BBR_getState(s->receiver->bbr);
}
bool PrrtSocket_get_filled_pipe(PrrtSocket *s) {
return BBR_getFilledPipe(s->receiver->bbr);
}
......@@ -596,9 +601,6 @@ uint32_t PrrtSocket_get_bbr_app_limited(PrrtSocket *s) {
return s->receiver->packetTracking->app_limited;
}
bool PrrtSocket_get_bbr_round_start(PrrtSocket *s) {
return BBR_getRoundStart(s->receiver->bbr);
}
PrrtCoder *PrrtSocket_get_matching_coder(PrrtSocket *s, PrrtCodingConfiguration *codingParams) {
if (s->coder == NULL || PrrtCoder_get_k(s->coder) != codingParams->k ||
......@@ -609,4 +611,12 @@ PrrtCoder *PrrtSocket_get_matching_coder(PrrtSocket *s, PrrtCodingConfiguration
s->coder = PrrtCoder_create(codingParams);
}
return PrrtCoder_copy(s->coder);
};
\ No newline at end of file
};
char *PrrtSocket_inet_ntoa(struct in_addr* in) {
return inet_ntoa(*in);
}
uint16_t PrrtSocket_ntohs(uint16_t v) {
return ntohs(v);
}
\ No newline at end of file
......@@ -114,15 +114,15 @@ int PrrtSocket_connect(PrrtSocket *s, const char *host, uint16_t port);
int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, size_t data_len);
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr);
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr);
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr);
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr);
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct timespec* deadline);
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr);
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr);
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, struct timespec* deadline);
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us);
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us);
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us, struct timespec* deadline);
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us);
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us);
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us, struct timespec* deadline);
bool PrrtSocket_cleanup(PrrtSocket *s);
......@@ -150,6 +150,8 @@ prrtByteCount_t PrrtSocket_get_send_quantum(PrrtSocket *s);
prrtByteCount_t PrrtSocket_get_pipe(PrrtSocket *s);
prrtByteCount_t PrrtSocket_get_delivered(PrrtSocket *s);
bool PrrtSocket_get_bbr_round_start(PrrtSocket *s);
bool PrrtSocket_get_app_limited(PrrtSocket *s);
char *PrrtSocket_inet_ntoa(struct in_addr* in);
uint16_t PrrtSocket_ntohs(uint16_t v);
#endif // PRRT_SOCKET_H
......@@ -5,6 +5,7 @@
#include "../../util/pipe.h"
#include "../codingParams.h"
#include <stdbool.h>
#include <netinet/in.h>
typedef enum {
......@@ -48,6 +49,8 @@ typedef struct prrtPacket {
bool is_app_limited;
prrtTimestamp_t sent_time;
prrtTimedelta_t rtt;
struct sockaddr_in sender_addr;
} PrrtPacket;
#define PRRT_PACKET_GENERAL_HEADER_SIZE 8
#define PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH 4
......
from libc.stdint cimport uint32_t, uint16_t, uint8_t, int32_t
from libc.stdlib cimport malloc, free
from libc.string cimport memset
from posix.time cimport timespec
cimport cython
cimport cprrt
include "sockets.pxd"
cdef extern from "proto/applicationConstraints.c":
pass
......@@ -92,6 +95,10 @@ cdef extern from "util/mpsc_queue.c":
cdef extern from "util/windowedFilter.c":
pass
cdef sockaddr_to_addr_and_port(sockaddr_in addr):
return (cprrt.PrrtSocket_inet_ntoa(&addr.sin_addr), cprrt.PrrtSocket_ntohs(addr.sin_port))
class PrrtCodingConfiguration:
def __init__(self, n, k, n_cycle=None):
if n < k:
......@@ -118,26 +125,44 @@ class PrrtCodingConfiguration:
cdef class PrrtSocket:
cdef cprrt.PrrtSocket* _c_socket
def __cinit__(self, port, target_delay = 1, thread_pinning = False):
def __cinit__(self, address, target_delay = 1, thread_pinning = False):
host, port = address
target_delay_us = target_delay * 1000**2
self._c_socket = cprrt.PrrtSocket_create(target_delay_us)
if thread_pinning:
cprrt.PrrtSocket_enable_thread_pinning(self._c_socket)
cprrt.PrrtSocket_bind(self._c_socket, "0.0.0.0", port)
cprrt.PrrtSocket_bind(self._c_socket, host.encode("utf8"), port)
# Channel Properties
property data_rate_btl_fwd:
def __get__(self):
return cprrt.PrrtSocket_get_btlbw_fwd(self._c_socket)
property plr_fwd:
property data_rate_btl_back:
def __get__(self):
return cprrt.PrrtSocket_get_plr_fwd(self._c_socket)
return cprrt.PrrtSocket_get_btlbw_back(self._c_socket)
property thread_pinning:
property rtt_prop_fwd:
def __get__(self):
return cprrt.PrrtSocket_uses_thread_pinning(self._c_socket)
return cprrt.PrrtSocket_get_rtprop_fwd(self._c_socket) * 0.000001
property loss_rate_fwd:
def __get__(self):
return cprrt.PrrtSocket_get_plr_fwd(self._c_socket)
# Application Properties
property target_delay:
def __get__(self):
return cprrt.PrrtSocket_get_sock_opt(self._c_socket, "targetdelay") * 0.000001
# Protocol configuration
property thread_pinning:
def __get__(self):
return cprrt.PrrtSocket_uses_thread_pinning(self._c_socket)
property app_queue_size:
def __get__(self):
return cprrt.PrrtSocket_get_sock_opt(self._c_socket, "app_queue_size")
......@@ -145,10 +170,6 @@ cdef class PrrtSocket:
def __set__(self, value):
cprrt.PrrtSocket_set_sock_opt(self._c_socket, "app_queue_size", value)
property rtt_fwd:
def __get__(self):
return cprrt.PrrtSocket_get_rtprop_fwd(self._c_socket) * 0.000001
property coding_configuration:
def __get__(self):
cdef cprrt.PrrtCodingConfiguration *params = cprrt.PrrtSocket_get_coding_parameters(self._c_socket)
......@@ -162,18 +183,80 @@ cdef class PrrtSocket:
n_cycle[i] = x
cprrt.PrrtSocket_set_coding_parameters(self._c_socket, params.k, params.n, c, n_cycle)
property delivery_rate_fwd:
def __get__(self):
return cprrt.PrrtSocket_get_delivery_rate_fwd(self._c_socket)
# Sending
def connect(self, address):
host, port = address
cdef bytes encodedHost = host.encode("utf-8")
cprrt.PrrtSocket_connect(self._c_socket, encodedHost, port)
property btlbw_fwd:
def __get__(self):
return cprrt.PrrtSocket_get_btlbw_fwd(self._c_socket)
def send(self, data):
cprrt.PrrtSocket_send(self._c_socket, data, len(data))
property btlbw_back:
def __get__(self):
return cprrt.PrrtSocket_get_btlbw_back(self._c_socket)
# Receiving
def recv(self):
cdef char buffer[65536]
cdef int32_t len
cdef sockaddr_in addr
with nogil:
len = cprrt.PrrtSocket_recv(self._c_socket, <void*> buffer, <sockaddr*> &addr)
return buffer[:len], sockaddr_to_addr_and_port(addr)
def receive_asap(self):
cdef char buffer[65536]
cdef int32_t len
cdef sockaddr_in addr
with nogil:
len = cprrt.PrrtSocket_receive_asap(self._c_socket, <void*> buffer, <sockaddr*> &addr)
return buffer[:len], sockaddr_to_addr_and_port(addr)
def receive_asap_wait(self):
cdef char buffer[65536]
cdef int32_t len
cdef sockaddr_in addr
with nogil:
len = cprrt.PrrtSocket_receive_asap_wait(self._c_socket, <void*> buffer, <sockaddr*> &addr)
return buffer[:len], sockaddr_to_addr_and_port(addr)
def receive_asap_timedwait(self, deadline):
cdef char buffer[65536]
cdef int32_t len
cdef sockaddr_in addr
cdef timespec deadline_timespec = timespec(deadline.seconds, deadline.microseconds * 1000)
with nogil:
len = cprrt.PrrtSocket_receive_asap_timedwait(self._c_socket, <void*> buffer, <sockaddr*> &addr, &deadline_timespec)
return buffer[:len], sockaddr_to_addr_and_port(addr)
def receive_ordered(self, time_window):
cdef char buffer[65536]
cdef int32_t len
cdef sockaddr_in addr
cdef uint32_t time_window_us = time_window * (1000**2)
with nogil:
len = cprrt.PrrtSocket_receive_ordered(self._c_socket, <void*> buffer, <sockaddr*> &addr, time_window_us)
return buffer[:len], sockaddr_to_addr_and_port(addr)
def receive_ordered_wait(self, time_window):
cdef char buffer[65536]
cdef int32_t len
cdef sockaddr_in addr
cdef uint32_t time_window_us = time_window * (1000**2)
with nogil:
len = cprrt.PrrtSocket_receive_ordered_wait(self._c_socket, <void*> buffer, <sockaddr*> &addr, time_window_us)
return buffer[:len], sockaddr_to_addr_and_port(addr)
def receive_ordered_timedwait(self, time_window, deadline):
cdef char buffer[65536]
cdef int32_t len
cdef sockaddr_in addr
cdef uint32_t time_window_us = time_window * (1000**2)
cdef timespec deadline_timespec = timespec(deadline.seconds, deadline.microseconds * 1000)
with nogil:
len = cprrt.PrrtSocket_receive_ordered_timedwait(self._c_socket, <void*> buffer, <sockaddr*> &addr, time_window_us, &deadline_timespec)
return buffer[:len], sockaddr_to_addr_and_port(addr)
# Internals
property bbr_state:
def __get__(self):
return cprrt.PrrtSocket_get_bbr_state(self._c_socket)
......@@ -230,67 +313,6 @@ cdef class PrrtSocket:
def __get__(self):
return cprrt.PrrtSocket_get_bbr_round_start(self._c_socket)
def recv(self):
cdef char buffer[65536]
cdef int32_t len
with nogil:
len = cprrt.PrrtSocket_recv(self._c_socket, <void*> buffer)
return buffer[:len]
def receive_asap(self):
cdef char buffer[65536]
cdef int32_t len
with nogil:
len = cprrt.PrrtSocket_receive_asap(self._c_socket, <void*> buffer)
return buffer[:len]
def receive_asap_wait(self):
cdef char buffer[65536]
cdef int32_t len
with nogil:
len = cprrt.PrrtSocket_receive_asap_wait(self._c_socket, <void*> buffer)
return buffer[:len]
def receive_asap_timedwait(self, deadline):
cdef char buffer[65536]
cdef int32_t len
cdef timespec deadline_timespec = timespec(deadline.seconds, deadline.microseconds * 1000)
with nogil:
len = cprrt.PrrtSocket_receive_asap_timedwait(self._c_socket, <void*> buffer, &deadline_timespec)
return buffer[:len]
def receive_ordered(self, time_window):
cdef char buffer[65536]
cdef int32_t len
cdef uint32_t time_window_us = time_window * (1000**2)
with nogil:
len = cprrt.PrrtSocket_receive_ordered(self._c_socket, <void*> buffer, time_window_us)
return buffer[:len]
def receive_ordered_wait(self, time_window):
cdef char buffer[65536]
cdef int32_t len
cdef uint32_t time_window_us = time_window * (1000**2)
with nogil:
len = cprrt.PrrtSocket_receive_ordered_wait(self._c_socket, <void*> buffer, time_window_us)
return buffer[:len]
def receive_ordered_timedwait(self, time_window, deadline):
cdef char buffer[65536]
cdef int32_t len
cdef uint32_t time_window_us = time_window * (1000**2)
cdef timespec deadline_timespec = timespec(deadline.seconds, deadline.microseconds * 1000)
with nogil:
len = cprrt.PrrtSocket_receive_ordered_timedwait(self._c_socket, <void*> buffer, time_window_us, &deadline_timespec)
return buffer[:len]
def connect(self, host, port):
cdef bytes encodedHost = host.encode("utf-8")
cprrt.PrrtSocket_connect(self._c_socket, encodedHost, port)
def send(self, data):
cprrt.PrrtSocket_send(self._c_socket, data, len(data))
def __dealloc__(self):
if self._c_socket != NULL:
cprrt.PrrtSocket_close(self._c_socket)
......@@ -118,10 +118,11 @@ int main(int argc, char **argv) {
XlapTimestampTableInstall(s, ts_redundancy_packet, tstable_redundancy);
uint32_t i = 0;
struct sockaddr_in addr;
while (i < rounds && keepRunning) {
char buffer[MAX_PAYLOAD_LENGTH + 1];
debug(DEBUG_RECEIVER, "About to receive.");
int n = PrrtSocket_recv(s, buffer);
int n = PrrtSocket_recv(s, buffer, (struct sockaddr *) &addr);
if (n < 0) {
continue;
}
......
from libc.stdint cimport uint32_t, uint16_t, uint8_t, int32_t
cdef struct in_addr:
uint32_t s_addr
ctypedef uint16_t sa_family_t
cdef struct sockaddr:
sa_family_t sa_family
char sa_data[14]
cdef struct sockaddr_in:
sa_family_t sin_family
unsigned short sin_port
in_addr sin_addr
char sa_data[250]
......@@ -140,7 +140,8 @@ static inline void _send(socket_t conn, const char *buf, size_t size) {
}
static inline ssize_t _recv(socket_t conn, char *buf, size_t size) {
return PrrtSocket_receive_asap_wait(conn, buf);
struct sockaddr_in addr;
return PrrtSocket_receive_asap_wait(conn, buf, (struct sockaddr*) &addr);
}
static inline void _close(socket_t conn) {
......
......@@ -7,7 +7,7 @@ import versioneer
os.environ["CC"] = "gcc-5"
os.environ["CXX"] = "g++-5"
ext = Extension(name='prrt', sources=["prrt/*.pyx"])
ext = Extension(name='prrt', language="c", sources=["prrt/*.pyx"])
try:
os.remove(os.path.join(os.path.dirname(os.path.realpath(__file__)), "prrt/prrt.c"))
except OSError as e:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment