Commit e2fbdea1 authored by Andreas Schmidt's avatar Andreas Schmidt

IP and port are return upon reception.

* Update examples.
parent 94baf44d
......@@ -3,11 +3,12 @@ import prrt
port = int(sys.argv[1])
s = prrt.PrrtSocket(port=port, isSender=False)
s = prrt.PrrtSocket(port=port)
while True:
d = s.recv()
d, addr = s.recv()
d = d.decode("utf8")
if d != "Close":
print d
print(d, addr)
else:
break
......@@ -3,10 +3,11 @@ import prrt
host = sys.argv[1]
port = int(sys.argv[2])
localport = int(sys.argv[3])
s = prrt.PrrtSocket(port=port, isSender=True)
s = prrt.PrrtSocket(port=localport)
s.connect(host, port)
for i in range(10):
s.send("Packet {}".format(i))
s.send("Close")
s.send("Packet {}".format(i).encode("utf8"))
s.send("Close".encode("utf8"))
include "posix/time.pxd"
include "sockets.pxd"
from libc.stdint cimport uint32_t, uint16_t, uint8_t, int32_t
from libc.string cimport const_char
cdef extern from "pthread.h" nogil:
ctypedef struct pthread_t:
pass
......@@ -123,14 +125,14 @@ cdef extern from "proto/socket.h":
int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) nogil
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr) nogil
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr) nogil
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, timespec* deadline) nogil
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr, sockaddr* addr) nogil
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, sockaddr* addr) nogil
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr, sockaddr* addr) nogil
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, sockaddr* addr, timespec* deadline) nogil
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) nogil
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) nogil
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us, timespec* deadline) nogil
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, sockaddr* addr, prrtTimedelta_t time_window_us) nogil
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, sockaddr* addr, prrtTimedelta_t time_window_us) nogil
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, sockaddr* addr, prrtTimedelta_t time_window_us, timespec* deadline) nogil
bint PrrtSocket_set_sock_opt(PrrtSocket *sock_ptr, const_char *name, const uint32_t value)
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *sock_ptr, const_char *name)
......@@ -149,6 +151,9 @@ cdef extern from "proto/socket.h":
bint PrrtSocket_get_app_limited(PrrtSocket *socket)
bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket)
char *PrrtSocket_inet_ntoa(in_addr*)
uint16_t PrrtSocket_ntohs(uint16_t v)
cdef extern from "proto/stores/packetDeliveryStore.h":
ctypedef struct PrrtPacketDeliveryStore:
pass
......@@ -171,3 +176,4 @@ cdef extern from "util/pipe.h":
cdef extern from "util/mpsc_queue.h":
ctypedef struct MPSCQueue:
pass
......@@ -303,6 +303,7 @@ void *receive_data_loop(void *ptr) {
XlapTimeStampCycle(&tsph3, ts_any_packet, 0, DecodeEnd);
prrtSequenceNumber_t seqno = packet->sequenceNumber;
prrtTimestamp_t prrt_recv_timestamp = PrrtClock_TimespecToPrrtTimestamp(packet_recv_timestamp);
memcpy(&packet->sender_addr, &remote, addrlen);
prrtPacketType_t packetType = PrrtPacket_type(packet);
debug(DEBUG_DATARECEIVER, "received packet %d:%u", (int) packetType, seqno);
......
......@@ -18,7 +18,7 @@
#include "types/packetTimeout.h"
#include "socket.h"
static inline prrtPacketLength_t deliver_packet(const PrrtSocket *s, void *buffer, PrrtPacket *packet) {
static inline prrtPacketLength_t deliver_packet(const PrrtSocket *s, void *buffer, PrrtPacket *packet, struct sockaddr* addr) {
prrtPacketLength_t len = 0;
if(packet != NULL) {
XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
......@@ -29,6 +29,8 @@ static inline prrtPacketLength_t deliver_packet(const PrrtSocket *s, void *buffe
PrrtPacket_copy_payload_to_buffer(buffer, packet, PRRT_PACKET_DATA_HEADER_SIZE);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, CopyOutputEnd);
memcpy(addr, &(packet->sender_addr), sizeof(struct sockaddr_in));
XlapTimeStampClock(s, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
XlapTimeStampCycle(s, ts_data_packet, packet->sequenceNumber, PrrtDeliver);
PrrtPacket_destroy(packet);
......@@ -238,13 +240,13 @@ bool PrrtSocket_closing(PrrtSocket *s) {
return atomic_load_explicit(&s->closing, memory_order_acquire);
}
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr) {
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
PrrtPacket *packet;
packet = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
return deliver_packet(s, buf_ptr, packet);
return deliver_packet(s, buf_ptr, packet, addr);
}
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr) {
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
PrrtPacket *packet;
do {
packet = PrrtPacketDeliveryStore_get_packet_wait(s->packetDeliveryStore, 0, MAX_TIMESTAMP);
......@@ -253,30 +255,30 @@ int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr) {
}
} while (!packet);
return deliver_packet(s, buf_ptr, packet);
return deliver_packet(s, buf_ptr, packet, addr);
}
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct timespec* deadline) {
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, struct timespec* deadline) {
PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, 0, MAX_TIMESTAMP, deadline);
if (packet == NULL && errno == ETIMEDOUT) {
return -1 * ETIMEDOUT;
}
return deliver_packet(s, buf_ptr, packet);
return deliver_packet(s, buf_ptr, packet, addr);
}
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr) {
return PrrtSocket_receive_asap_wait(s, buf_ptr);
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr) {
return PrrtSocket_receive_asap_wait(s, buf_ptr, addr);
}
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) {
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us) {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet(s->packetDeliveryStore, now - time_window_us,
now + time_window_us);
return deliver_packet(s, buf_ptr, packet);
return deliver_packet(s, buf_ptr, packet, addr);
}
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us) {
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us) {
PrrtPacket *packet;
do {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
......@@ -287,10 +289,10 @@ int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, prrtTimede
}
} while (!packet);
return deliver_packet(s, buf_ptr, packet);
return deliver_packet(s, buf_ptr, packet, addr);
}
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us, struct timespec* deadline) {
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us, struct timespec* deadline) {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now - time_window_us,
......@@ -299,7 +301,7 @@ int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtT
return -1 * ETIMEDOUT;
}
return deliver_packet(s, buf_ptr, packet);
return deliver_packet(s, buf_ptr, packet, addr);
}
......@@ -542,4 +544,12 @@ PrrtCoder *PrrtSocket_get_matching_coder(PrrtSocket *s, PrrtCodingConfiguration
s->coder = PrrtCoder_create(codingParams);
}
return PrrtCoder_copy(s->coder);
};
\ No newline at end of file
};
char *PrrtSocket_inet_ntoa(struct in_addr* in) {
return inet_ntoa(*in);
}
uint16_t PrrtSocket_ntohs(uint16_t v) {
return ntohs(v);
}
\ No newline at end of file
......@@ -107,15 +107,15 @@ int PrrtSocket_connect(PrrtSocket *s, const char *host, uint16_t port);
int PrrtSocket_send(PrrtSocket *s, const uint8_t *data, size_t data_len);
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr);
int32_t PrrtSocket_recv(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr);
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr);
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr);
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct timespec* deadline);
int32_t PrrtSocket_receive_asap(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr);
int32_t PrrtSocket_receive_asap_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr);
int32_t PrrtSocket_receive_asap_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, struct timespec* deadline);
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us);
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us);
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, prrtTimedelta_t time_window_us, struct timespec* deadline);
int32_t PrrtSocket_receive_ordered(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us);
int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us);
int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us, struct timespec* deadline);
bool PrrtSocket_cleanup(PrrtSocket *s);
......@@ -132,4 +132,7 @@ prrtDeliveryRate_t PrrtSocket_get_btlbw_fwd(PrrtSocket *s);
prrtDeliveryRate_t PrrtSocket_get_btlbw_back(PrrtSocket *s);
bool PrrtSocket_get_app_limited(PrrtSocket *s);
char *PrrtSocket_inet_ntoa(struct in_addr* in);
uint16_t PrrtSocket_ntohs(uint16_t v);
#endif // PRRT_SOCKET_H
......@@ -5,6 +5,7 @@
#include "../../util/pipe.h"
#include "../codingParams.h"
#include <stdbool.h>
#include <netinet/in.h>
typedef enum {
......@@ -47,6 +48,8 @@ typedef struct prrtPacket {
bool is_app_limited;
prrtTimestamp_t sent_time;
prrtTimedelta_t rtt;
struct sockaddr_in sender_addr;
} PrrtPacket;
#define PRRT_PACKET_GENERAL_HEADER_SIZE 8
#define PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH 4
......
from libc.stdint cimport uint32_t, uint16_t, uint8_t, int32_t
from libc.stdlib cimport malloc, free
from libc.string cimport memset
from posix.time cimport timespec
cimport cython
cimport cprrt
include "sockets.pxd"
cdef extern from "proto/applicationConstraints.c":
pass
......@@ -86,6 +89,9 @@ cdef extern from "util/pipe.c":
cdef extern from "util/mpsc_queue.c":
pass
cdef sockaddr_to_addr_and_port(sockaddr_in addr):
return (cprrt.PrrtSocket_inet_ntoa(&addr.sin_addr), cprrt.PrrtSocket_ntohs(addr.sin_port))
class PrrtCodingConfiguration:
def __init__(self, n, k, n_cycle=None):
if n < k:
......@@ -175,56 +181,63 @@ cdef class PrrtSocket:
def recv(self):
cdef char buffer[65536]
cdef int32_t len
cdef sockaddr_in addr
with nogil:
len = cprrt.PrrtSocket_recv(self._c_socket, <void*> buffer)
return buffer[:len]
len = cprrt.PrrtSocket_recv(self._c_socket, <void*> buffer, <sockaddr*> &addr)
return buffer[:len], sockaddr_to_addr_and_port(addr)
def receive_asap(self):
cdef char buffer[65536]
cdef int32_t len
cdef sockaddr_in addr
with nogil:
len = cprrt.PrrtSocket_receive_asap(self._c_socket, <void*> buffer)
return buffer[:len]
len = cprrt.PrrtSocket_receive_asap(self._c_socket, <void*> buffer, <sockaddr*> &addr)
return buffer[:len], sockaddr_to_addr_and_port(addr)
def receive_asap_wait(self):
cdef char buffer[65536]
cdef int32_t len
cdef sockaddr_in addr
with nogil:
len = cprrt.PrrtSocket_receive_asap_wait(self._c_socket, <void*> buffer)
return buffer[:len]
len = cprrt.PrrtSocket_receive_asap_wait(self._c_socket, <void*> buffer, <sockaddr*> &addr)
return buffer[:len], sockaddr_to_addr_and_port(addr)
def receive_asap_timedwait(self, deadline):
cdef char buffer[65536]
cdef int32_t len
cdef sockaddr_in addr
cdef timespec deadline_timespec = timespec(deadline.seconds, deadline.microseconds * 1000)
with nogil:
len = cprrt.PrrtSocket_receive_asap_timedwait(self._c_socket, <void*> buffer, &deadline_timespec)
return buffer[:len]
len = cprrt.PrrtSocket_receive_asap_timedwait(self._c_socket, <void*> buffer, <sockaddr*> &addr, &deadline_timespec)
return buffer[:len], sockaddr_to_addr_and_port(addr)
def receive_ordered(self, time_window):
cdef char buffer[65536]
cdef int32_t len
cdef sockaddr_in addr
cdef uint32_t time_window_us = time_window * (1000**2)
with nogil:
len = cprrt.PrrtSocket_receive_ordered(self._c_socket, <void*> buffer, time_window_us)
return buffer[:len]
len = cprrt.PrrtSocket_receive_ordered(self._c_socket, <void*> buffer, <sockaddr*> &addr, time_window_us)
return buffer[:len], sockaddr_to_addr_and_port(addr)
def receive_ordered_wait(self, time_window):
cdef char buffer[65536]
cdef int32_t len
cdef sockaddr_in addr
cdef uint32_t time_window_us = time_window * (1000**2)
with nogil:
len = cprrt.PrrtSocket_receive_ordered_wait(self._c_socket, <void*> buffer, time_window_us)
return buffer[:len]
len = cprrt.PrrtSocket_receive_ordered_wait(self._c_socket, <void*> buffer, <sockaddr*> &addr, time_window_us)
return buffer[:len], sockaddr_to_addr_and_port(addr)
def receive_ordered_timedwait(self, time_window, deadline):
cdef char buffer[65536]
cdef int32_t len
cdef sockaddr_in addr
cdef uint32_t time_window_us = time_window * (1000**2)
cdef timespec deadline_timespec = timespec(deadline.seconds, deadline.microseconds * 1000)
with nogil:
len = cprrt.PrrtSocket_receive_ordered_timedwait(self._c_socket, <void*> buffer, time_window_us, &deadline_timespec)
return buffer[:len]
len = cprrt.PrrtSocket_receive_ordered_timedwait(self._c_socket, <void*> buffer, <sockaddr*> &addr, time_window_us, &deadline_timespec)
return buffer[:len], sockaddr_to_addr_and_port(addr)
def connect(self, host, port):
cdef bytes encodedHost = host.encode("utf-8")
......
......@@ -118,10 +118,11 @@ int main(int argc, char **argv) {
XlapTimestampTableInstall(s, ts_redundancy_packet, tstable_redundancy);
uint32_t i = 0;
struct sockaddr_in addr;
while (i < rounds && keepRunning) {
char buffer[MAX_PAYLOAD_LENGTH + 1];
debug(DEBUG_RECEIVER, "About to receive.");
int n = PrrtSocket_recv(s, buffer);
int n = PrrtSocket_recv(s, buffer, (struct sockaddr *) &addr);
if (n < 0) {
continue;
}
......
from libc.stdint cimport uint32_t, uint16_t, uint8_t, int32_t
cdef struct in_addr:
uint32_t s_addr
ctypedef uint16_t sa_family_t
cdef struct sockaddr:
sa_family_t sa_family
char sa_data[14]
cdef struct sockaddr_in:
sa_family_t sin_family
unsigned short sin_port
in_addr sin_addr
char sa_data[250]
......@@ -7,7 +7,7 @@ import versioneer
os.environ["CC"] = "gcc-5"
os.environ["CXX"] = "g++-5"
ext = Extension(name='prrt', sources=["prrt/*.pyx"])
ext = Extension(name='prrt', language="c", sources=["prrt/*.pyx"])
try:
os.remove(os.path.join(os.path.dirname(os.path.realpath(__file__)), "prrt/prrt.c"))
except OSError as e:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment