Commit 5cc72c25 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Evalution works technically.

parent f575c60a
Pipeline #53 passed with stage
import sys import sys
sys.path.insert(0, "./build") sys.path.insert(0, "./build")
import prrt import prrt
import time as time
import threading import threading
class receiverThread(threading.Thread): class receiverThread(threading.Thread):
...@@ -12,8 +13,11 @@ class receiverThread(threading.Thread): ...@@ -12,8 +13,11 @@ class receiverThread(threading.Thread):
print("Receiving") print("Receiving")
while(1): while(1):
print("Recv") print("Recv")
p = self.sock.recv() len, data = self.sock.recv()
print(p) v = data[:len].decode('UTF-8')
print(len, v)
time.sleep(1)
print("Receiver Thread") print("Receiver Thread")
...@@ -24,16 +28,17 @@ class senderThread(threading.Thread): ...@@ -24,16 +28,17 @@ class senderThread(threading.Thread):
def run(self): def run(self):
print("Connecting") print("Connecting")
self.sock.connect("localhost", 5000) self.sock.connect("127.0.0.1", 5000)
print("Connected") print("Connected")
for i in range(10):
self.sock.send("Test") self.sock.send("Test")
print("Sent") time.sleep(0.001)
self.sock.close();
if __name__ == "__main__": if __name__ == "__main__":
recvThread = receiverThread() recvThread = receiverThread()
recvThread.daemon = True
sendThread = senderThread() sendThread = senderThread()
sendThread.daemon = True
recvThread.start() recvThread.start()
sendThread.start() sendThread.start()
from distutils.core import setup from distutils.core import setup
from distutils.extension import Extension
from Cython.Build import cythonize from Cython.Build import cythonize
setup( setup(
include_dirs=["./src"], include_dirs=["./src"],
ext_modules = cythonize("src/cython/prrt.pyx") ext_modules = cythonize(["src/cython/prrt.pyx"], gdb_debug=True)
) )
\ No newline at end of file
from libc.stdint cimport uint32_t, uint16_t, uint8_t, int32_t from libc.stdint cimport uint32_t, uint16_t, uint8_t, int32_t
from libc.string cimport const_char
cdef extern from "pthread.h" nogil: cdef extern from "pthread.h" nogil:
ctypedef struct pthread_t: ctypedef struct pthread_t:
...@@ -10,19 +11,19 @@ cdef extern from "pthread.h" nogil: ...@@ -10,19 +11,19 @@ cdef extern from "pthread.h" nogil:
ctypedef struct pthread_cond_t: ctypedef struct pthread_cond_t:
pass pass
cdef extern from "prrt/stores/forward_packet_table.h" nogil: cdef extern from "prrt/stores/forward_packet_table.h":
cdef struct prrtForwardPacketTable: cdef struct prrtForwardPacketTable:
pass pass
ctypedef prrtForwardPacketTable PrrtForwardPacketTable ctypedef prrtForwardPacketTable PrrtForwardPacketTable
cdef extern from "prrt/vdmcode/block_code.h" nogil: cdef extern from "prrt/vdmcode/block_code.h":
cdef struct prrtCoder: cdef struct prrtCoder:
pass pass
ctypedef prrtCoder PrrtCoder ctypedef prrtCoder PrrtCoder
cdef extern from "prrt/coding_params.h" nogil: cdef extern from "prrt/coding_params.h":
cdef struct prrtCodingParams: cdef struct prrtCodingParams:
uint8_t k; uint8_t k;
uint8_t r; uint8_t r;
...@@ -31,7 +32,7 @@ cdef extern from "prrt/coding_params.h" nogil: ...@@ -31,7 +32,7 @@ cdef extern from "prrt/coding_params.h" nogil:
ctypedef prrtCodingParams PrrtCodingParams ctypedef prrtCodingParams PrrtCodingParams
cdef extern from "util/list.h" nogil: cdef extern from "util/list.h":
cdef struct list: cdef struct list:
pass pass
...@@ -50,7 +51,7 @@ cdef extern from "util/list.h" nogil: ...@@ -50,7 +51,7 @@ cdef extern from "util/list.h" nogil:
void *List_remove(List *list, const ListNode *node) void *List_remove(List *list, const ListNode *node)
cdef extern from "prrt/block.h" nogil: cdef extern from "prrt/block.h":
cdef struct prrtBlock: cdef struct prrtBlock:
uint32_t data_count uint32_t data_count
uint32_t redundancy_count uint32_t redundancy_count
...@@ -63,7 +64,7 @@ cdef extern from "prrt/block.h" nogil: ...@@ -63,7 +64,7 @@ cdef extern from "prrt/block.h" nogil:
ctypedef prrtBlock PrrtBlock ctypedef prrtBlock PrrtBlock
cdef extern from "prrt/packet.h" nogil: cdef extern from "prrt/packet.h":
cdef struct prrtPacket: cdef struct prrtPacket:
uint8_t type_priority; uint8_t type_priority;
uint8_t index; uint8_t index;
...@@ -73,7 +74,7 @@ cdef extern from "prrt/packet.h" nogil: ...@@ -73,7 +74,7 @@ cdef extern from "prrt/packet.h" nogil:
ctypedef prrtPacket PrrtPacket ctypedef prrtPacket PrrtPacket
cdef extern from "prrt/socket.h" nogil: cdef extern from "prrt/socket.h":
ctypedef struct PrrtReceiver: ctypedef struct PrrtReceiver:
const char* host_name const char* host_name
uint16_t port uint16_t port
...@@ -109,14 +110,14 @@ cdef extern from "prrt/socket.h" nogil: ...@@ -109,14 +110,14 @@ cdef extern from "prrt/socket.h" nogil:
ctypedef prrtSocket PrrtSocket ctypedef prrtSocket PrrtSocket
cdef PrrtSocket* PrrtSocket_create(uint16_t port, uint8_t is_sender) nogil cdef PrrtSocket* PrrtSocket_create(uint16_t port, uint8_t is_sender)
int PrrtSocket_close(const PrrtSocket *sock_ptr) nogil int PrrtSocket_close(const PrrtSocket *sock_ptr)
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) nogil int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len) nogil int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) nogil int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) nogil
PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length) nogil PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length)
cdef extern from "util/bptree.h" nogil: cdef extern from "util/bptree.h":
ctypedef struct BPTreeNode: ctypedef struct BPTreeNode:
pass pass
......
from libc.stdint cimport uint32_t, uint16_t, uint8_t from libc.stdint cimport uint32_t, uint16_t, uint8_t, int32_t
cimport cprrt cimport cprrt
cdef extern from "prrt/stores/forward_packet_table.c": cdef extern from "prrt/stores/forward_packet_table.c":
...@@ -60,13 +60,19 @@ cdef class PrrtSocket: ...@@ -60,13 +60,19 @@ cdef class PrrtSocket:
self.isSender = isSender self.isSender = isSender
def recv(self): def recv(self):
cdef char buf[65536] cdef char buffer[65536]
cdef bytes s cdef int32_t len
len = cprrt.PrrtSocket_recv(self._c_socket, <void*> buf) with nogil:
s = buf len = cprrt.PrrtSocket_recv(self._c_socket, <void*> buffer)
s[len] = '\0' return len, buffer
return s
def connect(self, host, port): def connect(self, host, port):
print "Connecting to", host, "on port", str(port) cdef bytes encodedHost = host.encode("utf-8")
cprrt.PrrtSocket_connect(self._c_socket, host, port) cprrt.PrrtSocket_connect(self._c_socket, encodedHost, port)
def send(self, data):
cdef bytes encodedData = data.encode("utf-8")
cprrt.PrrtSocket_send(self._c_socket, encodedData, len(data))
def close(self):
cprrt.PrrtSocket_close(self._c_socket)
\ No newline at end of file
...@@ -20,6 +20,7 @@ bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { ...@@ -20,6 +20,7 @@ bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
// SENDING TO ALL RECEIVERS // SENDING TO ALL RECEIVERS
LIST_FOREACH(sock_ptr->receivers, first, next, cur) { LIST_FOREACH(sock_ptr->receivers, first, next, cur) {
PrrtReceiver* recv = cur->value; PrrtReceiver* recv = cur->value;
struct hostent *hp; struct hostent *hp;
struct sockaddr_in targetaddr; struct sockaddr_in targetaddr;
...@@ -28,6 +29,7 @@ bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { ...@@ -28,6 +29,7 @@ bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
targetaddr.sin_port = htons(recv->port); targetaddr.sin_port = htons(recv->port);
hp = gethostbyname(recv->host_name); hp = gethostbyname(recv->host_name);
check(hp != NULL, "Could not resolve host '%s'.", recv->host_name)
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length); memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
ssize_t sendtoRes = sendto(sock_ptr->dataSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)); ssize_t sendtoRes = sendto(sock_ptr->dataSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr));
......
...@@ -88,14 +88,16 @@ PrrtSocket* PrrtSocket_create(const uint16_t port, const uint8_t is_sender) { ...@@ -88,14 +88,16 @@ PrrtSocket* PrrtSocket_create(const uint16_t port, const uint8_t is_sender) {
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) { int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) {
PrrtReceiver *recv = calloc(1, sizeof(PrrtReceiver)); PrrtReceiver *recv = calloc(1, sizeof(PrrtReceiver));
recv->host_name = host; uint32_t hostLength = (uint32_t) strlen(host);
recv->host_name = calloc(1, hostLength + 1);
memcpy((void *) recv->host_name, host, hostLength);
recv->port = port; recv->port = port;
List_push(sock_ptr->receivers, recv); List_push(sock_ptr->receivers, recv);
return 0; return 0;
} }
int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len) { int PrrtSocket_send(PrrtSocket *sock_ptr, uint8_t *data, const size_t data_len) {
check(sock_ptr->is_sender, "Cannot send on receiver socket.") check(sock_ptr->is_sender, "Cannot send on receiver socket.")
pthread_mutex_lock(&sock_ptr->outQueueFilledMutex); pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
...@@ -112,18 +114,19 @@ int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_le ...@@ -112,18 +114,19 @@ int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_le
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) { int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
check(sock_ptr->is_sender == false, "Cannot receive on sender socket.") check(sock_ptr->is_sender == false, "Cannot receive on sender socket.")
pthread_mutex_t t = sock_ptr->inQueueFilledMutex; pthread_mutex_t filledMutex = sock_ptr->inQueueFilledMutex;
pthread_mutex_t closingMutex = sock_ptr->closingMutex;
while (1) { while (1) {
pthread_mutex_lock(&t); pthread_mutex_lock(&filledMutex);
while (List_count(sock_ptr->inQueue) == 0) { while (List_count(sock_ptr->inQueue) == 0) {
pthread_mutex_lock(&sock_ptr->closingMutex); pthread_mutex_lock(&closingMutex);
if (sock_ptr->closing) { if (sock_ptr->closing) {
pthread_mutex_unlock(&sock_ptr->closingMutex); pthread_mutex_unlock(&closingMutex);
pthread_mutex_unlock(&t); pthread_mutex_unlock(&filledMutex);
return -1; return -1;
} }
pthread_mutex_unlock(&sock_ptr->closingMutex); pthread_mutex_unlock(&closingMutex);
pthread_cond_wait(&sock_ptr->inQueueFilledCv, &t); pthread_cond_wait(&sock_ptr->inQueueFilledCv, &filledMutex);
} }
PrrtPacket *packet = List_shift(sock_ptr->inQueue); PrrtPacket *packet = List_shift(sock_ptr->inQueue);
...@@ -131,7 +134,7 @@ int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) { ...@@ -131,7 +134,7 @@ int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE); PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
PrrtPacket_destroy(packet); PrrtPacket_destroy(packet);
pthread_mutex_unlock(&t); pthread_mutex_unlock(&filledMutex);
return len; return len;
} }
error: error:
......
...@@ -51,7 +51,7 @@ PrrtSocket* PrrtSocket_create(const uint16_t port, const uint8_t is_sender); ...@@ -51,7 +51,7 @@ PrrtSocket* PrrtSocket_create(const uint16_t port, const uint8_t is_sender);
int PrrtSocket_interrupt(PrrtSocket *sock_ptr); int PrrtSocket_interrupt(PrrtSocket *sock_ptr);
int PrrtSocket_close(PrrtSocket *sock_ptr); int PrrtSocket_close(PrrtSocket *sock_ptr);
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port); int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port);
int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len); int PrrtSocket_send(PrrtSocket *sock_ptr, uint8_t *data, const size_t data_len);
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr); int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr);
PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length); PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment