Commit 9681988e authored by Andreas Schmidt's avatar Andreas Schmidt

Created first prototype of Python bindings.

Conflicts:
	src/prrt/block.h
	src/prrt/packet.c
	src/prrt/processes/data_receiver.c
	src/prrt/processes/data_transmitter.c
	src/prrt/processes/feedback_receiver.c
	src/prrt/socket.c
	src/prrt/socket.h
	src/prrt/stores/forward_packet_table.c
	src/receiver.c
	src/sender.c
	src/util/list.c
	src/util/list.h
parent ba9d6c5b
.idea/
build/
bin/
lib/
*.cmake
prrt.cpython-35m-x86_64-linux-gnu.so
src/cython/prrt.c
CMakeCache.txt
CMakeFiles/
Makefile
\ No newline at end of file
#!/usr/bin/env bash
rm src/cython/prrt.c
python setup.py build_ext --inplace
\ No newline at end of file
import sys
sys.path.insert(0, "./build")
import prrt
import threading
class receiverThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.sock = prrt.PrrtSocket(5000, False)
def run(self):
print("Receiving")
while(1):
print("Recv")
p = self.sock.recv()
print(p)
print("Receiver Thread")
class senderThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.sock = prrt.PrrtSocket(6000, True)
print("CREATED sock")
def run(self):
print("Connecting")
self.sock.connect("localhost", 5000)
print("Connected")
self.sock.send("Test")
print("Sent")
if __name__ == "__main__":
recvThread = receiverThread()
sendThread = senderThread()
recvThread.start()
sendThread.run()
from distutils.core import setup
from Cython.Build import cythonize
setup(
include_dirs=["./src"],
ext_modules = cythonize("src/cython/prrt.pyx")
)
\ No newline at end of file
from libc.stdint cimport uint32_t, uint16_t, uint8_t
cdef extern from "pthread.h" nogil:
ctypedef struct pthread_t:
pass
ctypedef struct pthread_mutex_t:
pass
ctypedef struct pthread_cond_t:
pass
cdef extern from "prrt/stores/forward_packet_table.h":
cdef struct prrtForwardPacketTable:
pass
ctypedef prrtForwardPacketTable PrrtForwardPacketTable
cdef extern from "prrt/vdmcode/block_code.h":
cdef struct prrtCoder:
pass
ctypedef prrtCoder PrrtCoder
cdef extern from "prrt/coding_params.h":
cdef struct prrtCodingParams:
uint8_t k;
uint8_t r;
uint8_t n;
uint8_t n_p;
ctypedef prrtCodingParams PrrtCodingParams
cdef extern from "util/list.h":
cdef struct list:
pass
ctypedef list List
cdef struct listNode:
pass
ctypedef listNode ListNode
void List_push(List *list, const void *value)
void *List_pop(const List *list)
void List_unshift(List *list, const void *value)
void *List_shift(const List *list)
void *List_remove(List *list, const ListNode *node)
cdef extern from "prrt/block.h":
cdef struct prrtBlock:
uint32_t data_count
uint32_t redundancy_count
PrrtCodingParams coding_params
uint32_t largest_data_length
uint16_t base_seqno
List* data_blocks
List* redundancy_blocks
uint8_t is_coded
ctypedef prrtBlock PrrtBlock
cdef extern from "prrt/packet.h":
cdef struct prrtPacket:
uint8_t type_priority;
uint8_t index;
uint16_t seqno;
void* payload;
uint32_t payload_len;
ctypedef prrtPacket PrrtPacket
cdef extern from "prrt/socket.h":
ctypedef struct PrrtReceiver:
const char* host_name
uint16_t port
cdef struct prrtSocket:
int dataSocketFd
int feedbackSocketFd
pthread_t receiveFeedbackThread
pthread_t sendThread
pthread_mutex_t outQueueFilledMutex
pthread_cond_t outQueueFilledCv
List* outQueue
pthread_t receiveDataThread
pthread_mutex_t inQueueFilledMutex
pthread_cond_t inQueueFilledMutexCv
List *inQueue
BPTreeNode* dataStore
BPTreeNode* blockStore
PrrtForwardPacketTable* forwardPacketTable
PrrtReceiver receivers[10]
int receiverLength
uint16_t packetsCount
uint16_t sequenceNumberSource
uint16_t sequenceNumberRepetition
uint16_t sequenceNumberRedundancy
uint16_t sequenceNumberFeedback
ctypedef prrtSocket PrrtSocket
cdef PrrtSocket* PrrtSocket_create(uint16_t port, uint8_t is_sender)
int PrrtSocket_close(const PrrtSocket *sock_ptr)
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port)
int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len)
void PrrtSocket_recv(const PrrtSocket *sock_ptr, void **buf_ptr, ssize_t *length)
PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length)
cdef extern from "util/bptree.h":
ctypedef struct BPTreeNode:
pass
BPTreeNode *BPTree_insert(BPTreeNode *root, int key, void *value);
BPTreeNode *BPTree_delete(BPTreeNode *root, int key);
BPTreeNode *BPTree_destroy(BPTreeNode *root);
void *BPTree_get(BPTreeNode *root, int key);
void BPTree_get_range(BPTreeNode *root, List *list, int key_start, int key_end);
from libc.stdint cimport uint32_t, uint16_t, uint8_t
cimport cprrt
cdef extern from "prrt/stores/forward_packet_table.c":
int PrrtForwardPacketTable_create(cprrt.PrrtForwardPacketTable* fpt_prt);
int PrrtForwardPacketTable_test_set_is_number_relevant(cprrt.PrrtForwardPacketTable *fpt_ptr, uint16_t seqno);
int PrrtForwardPacketTable_test_is_block_relevant(cprrt.PrrtForwardPacketTable * forwardPacketTable, uint16_t start, uint16_t length);
int PrrtForwardPacketTable_destroy(cprrt.PrrtForwardPacketTable* fpt_prt);
cdef extern from "prrt/processes/feedback_receiver.c":
void receive_feedback_loop(void *ptr)
cdef extern from "prrt/processes/data_receiver.c":
void receive_data_loop(void *ptr)
cdef extern from "prrt/processes/data_transmitter.c":
void send_data_loop(void *ptr)
cdef extern from "prrt/block.c":
cprrt.PrrtPacket *PrrtBlock_get_first_data(cprrt.PrrtBlock *block_ptr)
cdef extern from "prrt/vdmcode/block_code.c":
int PrrtCoder_get_coder(cprrt.PrrtCoder **cod, uint8_t n, uint8_t k)
cdef extern from "prrt/coding_params.c":
void PrrtCodingParams_init(cprrt.PrrtCodingParams *cpar)
cdef extern from "prrt/packet.c":
uint8_t PrrtPacket_type(cprrt.PrrtPacket *packet_ptr)
uint8_t PrrtPacket_priority(cprrt.PrrtPacket *packet_ptr)
uint16_t PrrtPacket_size(cprrt.PrrtPacket *packet_ptr)
int PrrtPacket_print(cprrt.PrrtPacket *packet_ptr)
cdef extern from "prrt/socket.c":
cprrt.PrrtSocket* PrrtSocket_create(uint16_t port, uint8_t is_sender)
cdef extern from "util/bptree.c":
cprrt.BPTreeNode *BPTree_insert(cprrt.BPTreeNode *root, int key, void *value)
cprrt.BPTreeNode *BPTree_delete(cprrt.BPTreeNode *root, int key)
cprrt.BPTreeNode *BPTree_destroy(cprrt.BPTreeNode *root)
void *BPTree_get(cprrt.BPTreeNode *root, int key)
void BPTree_get_range(cprrt.BPTreeNode *root, cprrt.List *list, int key_start, int key_end)
cdef extern from "util/list.c":
void List_push(cprrt.List *list, const void *value)
void *List_pop(const cprrt.List *list)
void List_unshift(cprrt.List *list, const void *value)
void *List_shift(const cprrt.List *list)
void *List_remove(cprrt.List *list, const cprrt.ListNode *node)
cdef class PrrtSocket:
cdef cprrt.PrrtSocket* _c_socket
def __cinit__(self, port, sender):
self._c_socket = cprrt.PrrtSocket_create(port, sender)
def recv(self):
cdef char* buf = NULL
cdef Py_ssize_t length = 0
cprrt.PrrtSocket_recv(self._c_socket, <void**> &buf, &length)
cdef bytes s = buf
return s
def connect(self, host, port):
cprrt.PrrtSocket_connect(self._c_socket, host, port)
#include <string.h>
#include <src/defines.h>
#include <src/prrt/vdmcode/block_code.h>
#include <src/util/list.h>
#include <src/util/dbg.h>
#include <src/util/common.h>
#include "../defines.h"
#include "../util/list.h"
#include "../util/dbg.h"
#include "../util/common.h"
#include "packet.h"
#include "block.h"
#include "coding_params.h"
......
#ifndef PRRT_BLOCK_H
#define PRRT_BLOCK_H
#include <src/prrt/coding_params.h>
#include <src/prrt/packet.h>
#include <src/util/list.h>
#include "coding_params.h"
#include "packet.h"
#include "../util/list.h"
typedef struct {
typedef struct prrtBlock {
PrrtCodingParams codingParams;
uint32_t largestDataLength;
uint16_t baseSequenceNumber;
......
#include <src/defines.h>
#include <src/prrt/coding_params.h>
#include "../defines.h"
#include "coding_params.h"
void PrrtCodingParams_init(PrrtCodingParams *cpar) {
pthread_mutex_init(&cpar->lock, NULL);
......
......@@ -4,7 +4,7 @@
#include <pthread.h>
#include <stdint.h>
typedef struct {
typedef struct prrtCodingParams {
pthread_mutex_t lock;
uint8_t k;
......
......@@ -3,8 +3,8 @@
#include <sys/time.h>
#include <string.h>
#include <netinet/in.h>
#include <src/util/dbg.h>
#include <src/util/common.h>
#include "../util/common.h"
#include "../util/dbg.h"
#include "packet.h"
void *encode_general_header(void *buf_ptr, const PrrtPacket *packet);
......@@ -103,7 +103,7 @@ PrrtPacket *PrrtPacket_copy(PrrtPacket *original)
return newPacket;
error:
PERROR("Not enough memory for packet copies.");
PERROR("Not enough memory for packet copies.%s", "");
}
PrrtPacket *create_header(uint8_t priority, uint16_t seqno, uint32_t size, uint8_t type, uint8_t index)
......@@ -421,7 +421,7 @@ PrrtPacket *PrrtPacket_create_data_packet(uint8_t priority, const void *payloadP
return packet;
error:
PERROR("Could not create packet.");
PERROR("Could not create packet.%s","");
}
PrrtPacket *PrrtPacket_create_redundancy_packet(uint8_t priority, void *payloadPointer, uint32_t payloadLength,
......
......@@ -7,7 +7,7 @@
// DESTROY:
#include <stdint.h>
#include <src/prrt/coding_params.h>
#include "coding_params.h"
#define PACKET_TYPE_DATA 0
#define PACKET_TYPE_REPETITION 1
......@@ -27,7 +27,7 @@ typedef struct prrtPacket {
#define SEQNO_SPACE 65536 // 2**16 as seqno is uint16_t
typedef struct {
typedef struct prrtPacketDataPayload {
uint32_t timestamp;
uint16_t group_round_trip_time;
uint16_t packet_timeout;
......@@ -36,14 +36,14 @@ typedef struct {
} PrrtPacketDataPayload;
#define PRRT_PACKET_DATA_HEADER_SIZE sizeof(PrrtPacketDataPayload)
typedef struct {
typedef struct prrtPacketRedundancyPayload {
uint16_t base_seqno;
uint8_t n;
uint8_t k;
} PrrtPacketRedundancyPayload;
#define PRRT_PACKET_REDUNDANCY_HEADER_SIZE sizeof(PrrtPacketRedundancyPayload)
typedef struct {
typedef struct prrtPacketFeedbackPayload {
uint32_t receiver_addr;
uint32_t group_round_trip_time;
uint32_t forward_trip_time;
......
#include <netdb.h>
#include <stdio.h>
#include <arpa/inet.h>
#include <src/defines.h>
#include <src/util/dbg.h>
#include <src/prrt/socket.h>
#include <src/prrt/block.h>
#include <src/util/common.h>
#include "../../defines.h"
#include "../../util/dbg.h"
#include "../../util/common.h"
#include "../socket.h"
#include "../block.h"
#include "data_receiver.h"
void retrieve_data_blocks(PrrtSocket *sock_ptr, uint16_t base_seqno, uint8_t k, const PrrtBlock *block)
......
......@@ -2,11 +2,11 @@
#include <netdb.h>
#include <unistd.h>
#include <string.h>
#include <src/defines.h>
#include <src/prrt/socket.h>
#include <src/prrt/block.h>
#include <src/util/dbg.h>
#include <src/util/common.h>
#include "../../defines.h"
#include "../socket.h"
#include "../block.h"
#include "../../util/dbg.h"
#include "../../util/common.h"
#include "data_transmitter.h"
......
#include <string.h>
#include <unistd.h>
#include <src/defines.h>
#include <src/prrt/packet.h>
#include <src/prrt/socket.h>
#include <src/util/dbg.h>
#include "../../defines.h"
#include "../packet.h"
#include "../socket.h"
#include "feedback_receiver.h"
void *receive_feedback_loop(void *ptr) {
void * receive_feedback_loop(void *ptr) {
char bufin[MAX_PAYLOAD_LENGTH];
PrrtSocket *sock_ptr = ptr;
......@@ -23,4 +22,6 @@ void *receive_feedback_loop(void *ptr) {
pthread_mutex_lock(&sock_ptr->closingMutex);
}
pthread_mutex_unlock(&sock_ptr->closingMutex);
return NULL;
}
\ No newline at end of file
#ifndef PRRT_FEEDBACK_RECEIVER_H
#define PRRT_FEEDBACK_RECEIVER_H
void *receive_feedback_loop(void *ptr);
void * receive_feedback_loop(void *ptr);
#endif //PRRT_FEEDBACK_RECEIVER_H
......@@ -4,20 +4,21 @@
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <src/defines.h>
#include <src/prrt/packet.h>
#include <src/prrt/socket.h>
#include <src/prrt/processes/feedback_receiver.h>
#include <src/prrt/processes/data_transmitter.h>
#include <src/prrt/processes/data_receiver.h>
#include <src/util/dbg.h>
#include <sys/poll.h>
#include "../defines.h"
#include "packet.h"
#include "../util/dbg.h"
#include "../util/common.h"
#include "processes/feedback_receiver.h"
#include "processes/data_transmitter.h"
#include "processes/data_receiver.h"
#include "socket.h"
#include "block.h"
int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t is_sender) {
PrrtSocket * PrrtSocket_create(uint16_t port, uint8_t is_sender) {
PrrtSocket* sock_ptr = calloc(1, sizeof(PrrtSocket));
check_mem(sock_ptr);
sock_ptr->is_sender = is_sender;
sock_ptr->sequenceNumberSource = 1;
sock_ptr->sequenceNumberRedundancy = 1;
......@@ -76,11 +77,11 @@ int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t i
"Cannot create data receiving thread.");
}
return EXIT_SUCCESS;
return sock_ptr;
error:
PrrtSocket_close(sock_ptr);
return EXIT_FAILURE;
return NULL;
}
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) {
......@@ -93,6 +94,7 @@ int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t po
}
int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len) {
check(sock_ptr->is_sender, "Cannot send on receiver socket.")
pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
PrrtPacket *packet = PrrtPacket_create_data_packet(5, data, (uint32_t) data_len, sock_ptr->sequenceNumberSource++);
......@@ -102,9 +104,12 @@ int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_le
pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
return 0;
error:
return -1;
}
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
check(sock_ptr->is_sender == FALSE, "Cannot receive on sender socket.")
pthread_mutex_t t = sock_ptr->inQueueFilledMutex;
while (1) {
pthread_mutex_lock(&t);
......@@ -118,10 +123,9 @@ int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
pthread_mutex_unlock(&sock_ptr->closingMutex);
pthread_cond_wait(&sock_ptr->inQueueFilledCv, &t);
}
PrrtPacket *packet = List_shift(sock_ptr->inQueue);
uint32_t len = packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE;
uint32_t len = (uint32_t) (packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
PrrtPacket_destroy(packet);
......@@ -129,6 +133,9 @@ int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
pthread_mutex_unlock(&t);
return len;
}
error:
PERROR("Wrong socket type.", "");
return -1;
}
int PrrtSocket_interrupt(PrrtSocket *sock_ptr) {
......
#ifndef PRRT_SOCKET_H
#define PRRT_SOCKET_H
#include <src/defines.h>
#include <src/util/list.h>
#include <src/prrt/packet.h>
#include <src/prrt/stores/forward_packet_table.h>
#include <src/util/bptree.h>
#include "../defines.h"
#include "packet.h"
#include "stores/forward_packet_table.h"
#include "../util/list.h"
#include "../util/bptree.h"
typedef struct {
typedef struct prrtReceiver {
const char* host_name;
uint16_t port;
} PrrtReceiver;
typedef struct {
typedef struct prrtSocket {
int dataSocketFd;
int feedbackSocketFd;
pthread_t receiveFeedbackThread;
......@@ -43,10 +43,11 @@ typedef struct {
uint16_t sequenceNumberRepetition;
uint16_t sequenceNumberRedundancy;
uint16_t sequenceNumberFeedback;
uint8_t is_sender;
} PrrtSocket;
int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t is_sender);
PrrtSocket * PrrtSocket_create(uint16_t port, uint8_t is_sender);
int PrrtSocket_interrupt(PrrtSocket *sock_ptr);
int PrrtSocket_close(PrrtSocket *sock_ptr);
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port);
......
#include <stdint.h>
#include <string.h>
#include <src/defines.h>
#include <src/prrt/packet.h>
#include "../../defines.h"
#include "../packet.h"
#include "forward_packet_table.h"
......@@ -39,6 +39,12 @@ void move_start(PrrtForwardPacketTable *fpt_ptr) {
int PrrtForwardPacketTable_create(PrrtForwardPacketTable *fpt_prt) {
fpt_prt->start = 1;
memset(fpt_prt->data, 0, sizeof(fpt_prt->data));
return EXIT_SUCCESS;
}
int PrrtForwardPacketTable_destroy(PrrtForwardPacketTable* fpt_prt) {
free(fpt_prt);
return EXIT_SUCCESS;
}
int PrrtForwardPacketTable_test_set_is_number_relevant(PrrtForwardPacketTable *fpt_ptr, uint16_t seqno) {
......
......@@ -77,7 +77,7 @@ struct FecParams {
* This struture contains the members for a coder that can en/decode.
* To change the code you need to free the old coder and allocate a new one.
*/
typedef struct PrrtCoder {
typedef struct prrtCoder {
struct FecParams params;
gf gf_exp[2*GF_SIZE]; /* index->poly form conversion table */
int gf_log[GF_SIZE + 1];/* Poly->index form conversion table */
......
......@@ -2,6 +2,7 @@
#include <unistd.h>
#include <stdlib.h>
#include <signal.h>
#include "util/dbg.h"
#include "prrt/socket.h"
PrrtSocket sock;
......@@ -24,15 +25,13 @@ int main(int argc, char* const argv[]) {
printf("PRRT - RECEIVER\n");
if(PrrtSocket_create(&sock, port, 0) < 0) {
perror("could not create socket");
return 0;
}
PrrtSocket* sock = PrrtSocket_create(port, FALSE);
check(sock != NULL, "Could not create socket.");
int i = 1;
while(keepRunning) {
unsigned char buffer[MAX_PAYLOAD_LENGTH];
int n = PrrtSocket_recv(&sock, buffer);
int n = PrrtSocket_recv(sock, buffer);
if(n < 0 ) {
continue;
}
......@@ -42,6 +41,12 @@ int main(int argc, char* const argv[]) {
usleep(1);
}
PrrtSocket_close(&sock);
PrrtSocket_close(sock);
pthread_exit(NULL);
return 0;
error:
return -1;
}
......@@ -2,7 +2,7 @@
#include <string.h>
#include <stdint.h>
#include <unistd.h>
#include <src/util/dbg.h>
#include "util/dbg.h"
#include "prrt/socket.h"
int main(int argc, char *const argv) {
......@@ -12,19 +12,19 @@ int main(int argc, char *const argv) {
}
uint16_t local_port = 6000;
PrrtSocket* sock = calloc(1, sizeof(PrrtSocket));
debug("PRRT - SENDER\n");
PrrtSocket* socket = NULL;
printf("PRRT - SENDER\n");
int res = PrrtSocket_create(sock, local_port, TRUE);
check(res == EXIT_SUCCESS, "Socket creation failed.");
socket = PrrtSocket_create(local_port, TRUE);
check(socket != NULL, "Socket creation failed.");
char *remote_host = "127.0.0.1";
uint16_t remote_port = 5000;
PrrtSocket_connect(sock, remote_host, remote_port);
PrrtSocket_connect(socket, remote_host, remote_port);
char *remote_host2 = "127.0.0.1";
uint16_t remote_port2 = 5004;
PrrtSocket_connect(sock, remote_host2, remote_port2);
PrrtSocket_connect(socket, remote_host2, remote_port2);
debug("SENDING\n");
......@@ -39,21 +39,18 @@ int main(int argc, char *const argv) {
while ((getline(&line, &len, fp)) != -1) {
char buf[MAX_PAYLOAD_LENGTH];
sprintf(buf, "%s", line);
PrrtSocket_send(sock, buf, strlen(buf));
PrrtSocket_send(socket, buf, strlen(buf));
}
fclose(fp);
if (line)
free(line);
usleep(1000 * 1000);
PrrtSocket_close(sock);
free(sock);
debug("COMPLETELY CLOSED\n");
usleep(1000*1000);
PrrtSocket_close(socket);
printf("COMPLETELY CLOSED\n");
return 0;
error:
return EXIT_FAILURE;
return -1;
}
......@@ -43,7 +43,7 @@
#ifndef PRRT_BPTREE_H
#define PRRT_BPTREE_H
#include <src/defines.h>
#include "../defines.h"
#include "list.h"
/* Type representing a node in the B+ tree. This type is general enough to serve for both the leaf and the internal
......
......@@ -87,6 +87,7 @@ void List_unshift(List *list, const void *value)
list->count++;
return;
error:
PERROR("Cannot add necessary list item: %p.", value);
}
......
......@@ -6,15 +6,15 @@
// Taken from: http://c.learncodethehardway.org/book/ex32.html
struct ListNode;
struct listNode;
typedef struct ListNode {
struct ListNode *next;
struct ListNode *prev;
typedef struct listNode {
struct listNode *next;
struct listNode *prev;
void *value;
} ListNode;
typedef struct List {
typedef struct list {
uint32_t count;
ListNode *first;
ListNode *last;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment