Commit 81f0d394 authored by rna's avatar rna

Receive mode that takes target_delay into account.

* sendQueue is a pipe.
* receiveQueue is a B-tree.
* recv_sync mode yields packet that are due.
parent 8a0682a7
Pipeline #1458 passed with stages
in 2 minutes and 7 seconds
......@@ -95,12 +95,12 @@ cdef extern from "proto/socket.h":
pthread_t sendDataThread
pthread_mutex_t outQueueFilledMutex
pthread_cond_t outQueueFilledCv
List* sendDataQueue
Pipe* sendDataQueue
pthread_t receiveDataThread
pthread_mutex_t inQueueFilledMutex
pthread_cond_t inQueueFilledMutexCv
List *receiveDataQueue
PrrtReceiveDataQueue *receiveDataQueue
BPTreeNode* dataStore
......@@ -120,12 +120,18 @@ cdef extern from "proto/socket.h":
int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) nogil
int32_t PrrtSocket_recv_sync(PrrtSocket *sock_ptr, void *buf_ptr, const uint32_t time_window_us) nogil
int32_t PrrtSocket_timedrecv(PrrtSocket *sock_ptr, void *buf_ptr, const uint32_t wait_time) nogil
bint PrrtSocket_set_sock_opt(PrrtSocket *sock_ptr, const_char *name, const uint32_t value)
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *sock_ptr, const_char *name)
bint PrrtSocket_uses_thread_pinning(PrrtSocket *socket)
uint32_t PrrtSocket_get_rtt(PrrtSocket *socket)
cdef extern from "proto/stores/receiveDataQueue.h":
ctypedef struct PrrtReceiveDataQueue:
pass
cdef extern from "util/bptree.h":
ctypedef struct BPTreeNode:
pass
......@@ -135,3 +141,8 @@ cdef extern from "util/bptree.h":
BPTreeNode *BPTree_destroy(BPTreeNode *root);
void *BPTree_get(BPTreeNode *root, int key);
void BPTree_get_range(BPTreeNode *root, List *list, int key_start, int key_end);
cdef extern from "util/pipe.h":
ctypedef struct Pipe:
pass
\ No newline at end of file
......@@ -10,6 +10,8 @@
#define SEQNO_SPACE UINT16_MAX // 2**16 as seqno is uint16_t
#define TIMESTAMP_SPACE UINT32_MAX
#define GF_BITS 8
#define K_START 4
......
......@@ -17,6 +17,7 @@ add_library(PRRT ../defines.h
stores/repairBlockStore.c stores/repairBlockStore.h
stores/packetTimeoutTable.c stores/packetTimeoutTable.h
stores/dataPacketStore.c stores/dataPacketStore.h
types/packetTimeout.c types/packetTimeout.h)
types/packetTimeout.c types/packetTimeout.h
stores/receiveDataQueue.c stores/receiveDataQueue.h)
target_link_libraries(PRRT rt)
......@@ -108,4 +108,6 @@ int PrrtPacket_destroy(PrrtPacket *packet);
#define PrrtPacket_byListNode(lnptr) ((PrrtPacket *) (lnptr))
#define PrrtDataPacket_packet_timeout(packet) ((PrrtPacketDataPayload*) packet->payload)->packetTimeout_us
#endif //PRRT_FRAME_H
......@@ -34,7 +34,7 @@ static void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block) {
while (List_count(block->dataPackets) > 0) {
PrrtPacket *pkt = List_shift(block->dataPackets);
if (PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, pkt->sequenceNumber)) {
Pipe_push(sock_ptr->receiveDataQueue, &pkt->asListNode);
PrrtReceiveDataQueue_insert(sock_ptr->receiveDataQueue, pkt);
} else {
PrrtPacket_destroy(pkt);
}
......@@ -139,7 +139,7 @@ static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet, struct
XlapTimeStampClock(sock_ptr, ts_data_packet, seqno, PrrtReturnPackage);
XlapTimeStampCycle(sock_ptr, ts_data_packet, seqno, PrrtReturnPackage);
Pipe_push(sock_ptr->receiveDataQueue, &packet->asListNode);
PrrtReceiveDataQueue_insert(sock_ptr->receiveDataQueue, packet);
}
return;
......
......@@ -68,7 +68,7 @@ PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay
sock_ptr->forwardPacketTable = PrrtForwardPacketTable_create();
sock_ptr->repairBlockStore = PrrtRepairBlockStore_create();
sock_ptr->receiveDataQueue = Pipe_create();
sock_ptr->receiveDataQueue = PrrtReceiveDataQueue_create();
}
return sock_ptr;
......@@ -169,13 +169,13 @@ bool PrrtSocket_closing(PrrtSocket *sock_ptr) {
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
check(sock_ptr->isSender == false, "Cannot receive on sender socket.")
while (1) {
ListNode *job;
PrrtPacket *packet;
do {
job = Pipe_pull(sock_ptr->receiveDataQueue);
packet = PrrtReceiveDataQueue_get_packet_sync(sock_ptr->receiveDataQueue, 0, MAX_TIMESTAMP);
if (PrrtSocket_closing(sock_ptr))
return -1;
} while (!job);
PrrtPacket *packet = PrrtPacket_byListNode(job);
} while (!packet);
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
......@@ -196,7 +196,31 @@ int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
return -1;
}
int32_t PrrtSocket_recv_sync(PrrtSocket *sock_ptr, void *buf_ptr, prrtTimedelta_t time_window_us) {
check(sock_ptr->isSender == false, "Cannot receive on sender socket.")
while (1) {
PrrtPacket *packet;
do {
prrtTimestamp_t now = PrrtClock_get_current_time_us();
packet = PrrtReceiveDataQueue_get_packet_sync(sock_ptr->receiveDataQueue, now - time_window_us, now + time_window_us);
if (PrrtSocket_closing(sock_ptr))
return -1;
} while (!packet);
prrtPacketLength_t len = (prrtPacketLength_t) (packet->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE);
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
PrrtPacket_destroy(packet);
return len;
}
error:
PERROR("There was a failure while receiving from socket.%s", "");
return -1;
}
int32_t PrrtSocket_timedrecv(PrrtSocket *sock_ptr, void *buf_ptr, const uint32_t wait_time) {
PERROR("Not implemented.%s", "");
return -1;
int clk_id = CLOCK_REALTIME;
struct timespec ts, res;
......@@ -209,13 +233,12 @@ int32_t PrrtSocket_timedrecv(PrrtSocket *sock_ptr, void *buf_ptr, const uint32_t
check(sock_ptr->isSender == false, "Cannot receive on sender socket.")
while (1) {
/* TODO: use the specified timeout */
ListNode *job;
PrrtPacket *packet;
do {
job = Pipe_pull(sock_ptr->receiveDataQueue);
packet = PrrtReceiveDataQueue_get_packet(sock_ptr->receiveDataQueue, 0, MAX_TIMESTAMP);
if (PrrtSocket_closing(sock_ptr))
return -1;
} while (!job);
PrrtPacket *packet = PrrtPacket_byListNode(job);
} while (!packet);
XlapTimeStampClock(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
XlapTimeStampCycle(sock_ptr, ts_data_packet, packet->sequenceNumber, PrrtReceivePackage);
......@@ -240,7 +263,7 @@ int PrrtSocket_interrupt(PrrtSocket *sock_ptr) {
atomic_store_explicit(&sock_ptr->closing, true, memory_order_release);
if (sock_ptr->receiveDataQueue)
Pipe_wake(sock_ptr->receiveDataQueue);
PrrtReceiveDataQueue_wake(sock_ptr->receiveDataQueue);
if (sock_ptr->sendDataQueue)
Pipe_wake(sock_ptr->sendDataQueue);
......@@ -303,7 +326,7 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) {
}
if (sock_ptr->receiveDataQueue != NULL) {
Pipe_destroy(sock_ptr->receiveDataQueue);
PrrtReceiveDataQueue_destroy(sock_ptr->receiveDataQueue);
sock_ptr->receiveDataQueue = NULL;
}
......
......@@ -2,7 +2,6 @@
#define PRRT_SOCKET_H
#include <stdatomic.h>
#include "../defines.h"
#include "packet.h"
#include "stores/forwardPacketTable.h"
......@@ -14,6 +13,7 @@
#include "stores/dataPacketStore.h"
#include "stores/packetTimeoutTable.h"
#include "stores/repairBlockStore.h"
#include "stores/receiveDataQueue.h"
#include "clock.h"
#include "../xlap/xlap.h"
......@@ -32,7 +32,8 @@ typedef struct prrtSocket {
Pipe *sendDataQueue;
pthread_t receiveDataThread;
Pipe *receiveDataQueue;
PrrtReceiveDataQueue* receiveDataQueue;
PrrtPacketTimeoutTable *packetTimeoutTable;
PrrtDataPacketStore *dataPacketStore;
......@@ -88,6 +89,8 @@ int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr);
int32_t PrrtSocket_recv_sync(PrrtSocket *sock_ptr, void *buf_ptr, prrtTimedelta_t time_window_us);
int32_t PrrtSocket_timedrecv(PrrtSocket *sock_ptr, void *buf_ptr, const uint32_t wait_time);
bool PrrtSocket_cleanup(PrrtSocket *socket);
......
#include <pthread.h>
#include "../packet.h"
#include "../../util/common.h"
#include "../../util/dbg.h"
#include "receiveDataQueue.h"
PrrtReceiveDataQueue *PrrtReceiveDataQueue_create() {
PrrtReceiveDataQueue *q = (PrrtReceiveDataQueue *) calloc(1, sizeof(PrrtReceiveDataQueue));
q->tree = NULL;
pthread_mutexattr_t attr;
check(pthread_mutexattr_init(&attr) == EXIT_SUCCESS, "Mutex attr init failed.");
check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == EXIT_SUCCESS, "Setting type failed.");
check(pthread_mutex_init(&q->lock, &attr) == 0, "Mutex init failed.");
check(pthread_cond_init(&q->wait_for_data, NULL) == EXIT_SUCCESS, "Condition init failed.");
return q;
error:
PERROR("Out of memory%s.", "");
return NULL;
}
PrrtPacket * PrrtReceiveDataQueue_get_packet_sync(PrrtReceiveDataQueue *q, prrtTimestamp_t start, prrtTimestamp_t stop) {
PrrtPacket *packet = NULL;
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
prrtTimedelta_t diff_us = stop - start;
struct timespec deadline;
prrtTimedelta_t diff_s = diff_us / 1000000;
prrtTimedelta_t diff_ns = (diff_us % 1000000) * 1000;
deadline.tv_sec = diff_s + now.tv_sec;
deadline.tv_nsec = diff_ns + now.tv_nsec;
check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");
packet = PrrtReceiveDataQueue_get_packet(q, start, stop);
if (!packet) {
pthread_cond_timedwait(&q->wait_for_data, &q->lock, &deadline);
packet = PrrtReceiveDataQueue_get_packet(q, start, stop);
}
check(pthread_mutex_unlock(&q->lock) == EXIT_SUCCESS, "Unlock failed.");
return packet;
error:
PERROR("PrrtReceiveDataQueue_get_packet_sync failed%s.", "");
return NULL;
}
PrrtPacket *PrrtReceiveDataQueue_get_packet(PrrtReceiveDataQueue *q, prrtTimestamp_t start, prrtTimestamp_t stop) {
PrrtPacket *packet = NULL;
List *packets = List_create();
check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");
if (start > stop) {
BPTree_get_range(q->tree, packets, (BPTreeKey_t) start, TIMESTAMP_SPACE-1);
BPTree_get_range(q->tree, packets, 0, (BPTreeKey_t) stop);
} else {
BPTree_get_range(q->tree, packets, (BPTreeKey_t) start, (BPTreeKey_t) stop);
}
if(List_count(packets) > 0) {
packet = List_shift(packets);
q->tree = BPTree_delete(q->tree, PrrtDataPacket_packet_timeout(packet));
}
check(pthread_mutex_unlock(&q->lock) == EXIT_SUCCESS, "Unlock failed.");
List_destroy(packets);
return packet;
error:
PERROR("Get failed%s.", "");
return NULL;
}
bool PrrtReceiveDataQueue_insert(PrrtReceiveDataQueue *q, PrrtPacket* packet) {
check(pthread_mutex_lock(&q->lock) == EXIT_SUCCESS, "Lock failed.");
q->tree = BPTree_insert(q->tree, PrrtDataPacket_packet_timeout(packet), packet);
pthread_cond_broadcast(&q->wait_for_data);
check(pthread_mutex_unlock(&q->lock) == EXIT_SUCCESS, "Unlock failed.");
return true;
error:
PERROR("Insert failed%s.", "");
return false;
}
bool PrrtReceiveDataQueue_destroy(PrrtReceiveDataQueue *q) {
pthread_mutex_destroy(&q->lock);
pthread_cond_destroy(&q->wait_for_data);
free(q);
return true;
}
void PrrtReceiveDataQueue_wake(PrrtReceiveDataQueue *q) {
pthread_mutex_lock(&q->lock);
pthread_cond_broadcast(&q->wait_for_data);
pthread_mutex_unlock(&q->lock);
}
#ifndef PRRT_RECEIVEDATAQUEUE_H
#define PRRT_RECEIVEDATAQUEUE_H
#include <sys/types.h>
#include "../../util/bptree.h"
typedef struct prrtReceiveDataQueue {
pthread_mutex_t lock;
BPTreeNode *tree;
pthread_cond_t wait_for_data;
} PrrtReceiveDataQueue;
PrrtReceiveDataQueue* PrrtReceiveDataQueue_create();
PrrtPacket *PrrtReceiveDataQueue_get_packet(PrrtReceiveDataQueue *q, prrtTimestamp_t start, prrtTimestamp_t stop);
PrrtPacket *PrrtReceiveDataQueue_get_packet_sync(PrrtReceiveDataQueue *queue, prrtTimestamp_t start, prrtTimestamp_t stop);
bool PrrtReceiveDataQueue_insert(PrrtReceiveDataQueue *q, PrrtPacket* packet);
void PrrtReceiveDataQueue_wake(PrrtReceiveDataQueue* q);
bool PrrtReceiveDataQueue_destroy(PrrtReceiveDataQueue* q);
#endif //PRRT_RECEIVEDATAQUEUE_H
......@@ -21,6 +21,9 @@ cdef extern from "proto/stores/packetTimeoutTable.c":
cdef extern from "proto/stores/repairBlockStore.c":
pass
cdef extern from "proto/stores/receiveDataQueue.c":
pass
cdef extern from "proto/types/packetTimeout.c":
pass
......@@ -116,6 +119,14 @@ cdef class PrrtSocket:
len = cprrt.PrrtSocket_recv(self._c_socket, <void*> buffer)
return buffer[:len]
def recv_sync(self, granularity):
cdef char buffer[65536]
cdef int32_t len
cdef uint32_t time_window_us = granularity * (1000**2)
with nogil:
len = cprrt.PrrtSocket_recv_sync(self._c_socket, <void*> buffer, time_window_us)
return buffer[:len]
def connect(self, host, port):
cdef bytes encodedHost = host.encode("utf-8")
cprrt.PrrtSocket_connect(self._c_socket, encodedHost, port)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment