Commit a49bb228 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Merge branch 'develop' into feature/evaluation

Conflicts:
	src/prrt/block.c
	src/prrt/packet.c
	src/prrt/packet.h
	src/prrt/socket.c
	src/prrt/socket.h
	src/sender.c
parents 38f90a60 72e50c57
.idea/
build/
bin/
lib/
*.cmake
prrt.cpython-35m-x86_64-linux-gnu.so
src/cython/prrt.c
......
before_script:
- which cmake
- which gcc
- which g++
- which valgrind
- cmake .
- make
run_tests:
script:
- ./bin/prrtTests
\ No newline at end of file
......@@ -8,13 +8,10 @@
#define MAX_PAYLOAD_LENGTH 65528 // maximum UDP packet length (2^16 - 8)
#define TRUE 1
#define FALSE 0
#define GF_BITS 8
#define K_START 2
#define N_START 4
#define K_START 4
#define N_START 7
#define N_P_START 1
// Uncomment the line below if you are compiling on Windows.
......@@ -22,10 +19,6 @@
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#ifdef WINDOWS
#define bool char
#define false 0
#define true 1
#endif
#endif //PRRT_DEFINES_H
......@@ -9,29 +9,31 @@
void gather_redundancy_packets(const PrrtBlock *block_ptr, gf *const *fec, int *idx_p)
{
uint32_t i;
uint32_t m = List_count(block_ptr->dataPackets);
int i;
uint32_t m = 0;
PrrtPacket *packet = NULL;
uint32_t redundancyBlocks = MIN(List_count(block_ptr->redundancyPackets), block_ptr->codingParams.k - m);
uint32_t redundancyBlocks = List_count(block_ptr->redundancyPackets);
for(i = 0; i < redundancyBlocks; i++) {
packet = List_shift(block_ptr->redundancyPackets);
PrrtPacket_copy_payload_to_buffer(fec[m + i], packet, PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
idx_p[m + i] = (int) packet->index;
while(idx_p[m] != -1) {
m++;
}
PrrtPacket_copy_payload_to_buffer(fec[m], packet, PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
idx_p[m] = packet->index;
PrrtPacket_destroy(packet);
}
}
void gather_data_packets(PrrtBlock *block_ptr, gf *const *fec, int *idx_p)
{
uint32_t m = 0;
uint32_t dataBlocks = List_count(block_ptr->dataPackets);
for(m = 0; m < dataBlocks; m++) {
PrrtPacket *packet = List_shift(block_ptr->dataPackets);
PrrtPacket_copy_payload_to_buffer(fec[m], packet, PRRT_PACKET_DATA_HEADER_SIZE);
idx_p[m] = packet->index;
PrrtPacket_destroy(packet);
LIST_FOREACH(block_ptr->dataPackets, first, next, current) {
PrrtPacket *packet = current->value;
PrrtPacket_copy_payload_to_buffer(fec[packet->index], packet, PRRT_PACKET_DATA_HEADER_SIZE);
idx_p[packet->index] = packet->index;
}
}
......@@ -71,40 +73,54 @@ int PrrtBlock_create(PrrtBlock *mblock, const PrrtCodingParams *cpar, uint16_t b
return 0;
}
int PrrtBlock_insert_data_packet(PrrtBlock *prrtBlock, const PrrtPacket *prrtPacket)
bool PrrtBlock_insert_data_packet(PrrtBlock *prrtBlock, const PrrtPacket *prrtPacket)
{
check(List_count(prrtBlock->dataPackets) < prrtBlock->codingParams.k, "Inserting an unnecessary item.");
bool found = false;
LIST_FOREACH(prrtBlock->dataPackets, first, next, cur) {
PrrtPacket *pkt = cur->value;
if(pkt->seqno == prrtPacket->seqno) {
found = true;
}
}
if(found == false) {
List_push(prrtBlock->dataPackets, prrtPacket);
check(prrtPacket->payload_len >= PRRT_PACKET_DATA_HEADER_SIZE, "Inserted packet too small.")
prrtBlock->largestDataLength = (uint32_t) MAX(prrtBlock->largestDataLength,
prrtPacket->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
return 0;
error:
PNOTIMPLEMENTED("HANDLING MISSING");
return true;
} else {
return false;
}
}
int PrrtBlock_insert_redundancy_packet(PrrtBlock *block_ptr, const PrrtPacket *ptr)
bool PrrtBlock_insert_redundancy_packet(PrrtBlock *block_ptr, const PrrtPacket *ptr)
{
bool found = false;
LIST_FOREACH(block_ptr->redundancyPackets, first, next, cur) {
PrrtPacket *pkt = cur->value;
if(pkt->seqno == ptr->seqno) {
found = true;
}
}
if(found == false) {
List_push(block_ptr->redundancyPackets, ptr);
check(ptr->payload_len >= PRRT_PACKET_REDUNDANCY_HEADER_SIZE, "Inserted packet too small.")
block_ptr->largestDataLength = (uint32_t) MAX(block_ptr->largestDataLength,
ptr->payload_len - PRRT_PACKET_REDUNDANCY_HEADER_SIZE);
return 0;
error:
PNOTIMPLEMENTED("HANDLING MISSING");
return true;
} else {
return false;
}
}
int PrrtBlock_encode_ready(const PrrtBlock *block_ptr)
bool PrrtBlock_encode_ready(const PrrtBlock *block_ptr)
{
return (List_count(block_ptr->dataPackets) == block_ptr->codingParams.k) ? TRUE : FALSE;
return (List_count(block_ptr->dataPackets) == block_ptr->codingParams.k);
}
int PrrtBlock_decode_ready(const PrrtBlock *block_ptr)
bool PrrtBlock_decode_ready(const PrrtBlock *block_ptr)
{
return (List_count(block_ptr->dataPackets) + List_count(block_ptr->redundancyPackets) == block_ptr->codingParams.k)
? TRUE : FALSE;
return (List_count(block_ptr->dataPackets) + List_count(block_ptr->redundancyPackets) == block_ptr->codingParams.k);
}
PrrtPacket *PrrtBlock_get_first_data(PrrtBlock *block_ptr)
......@@ -118,6 +134,7 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno)
uint8_t k = block_ptr->codingParams.k;
uint8_t n = block_ptr->codingParams.n;
uint8_t r = block_ptr->codingParams.r;
uint16_t baseSequenceNumber = block_ptr->baseSequenceNumber;
uint32_t length = block_ptr->largestDataLength;
PrrtCoder *coder = NULL;
......@@ -129,7 +146,7 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno)
LIST_FOREACH(block_ptr->dataPackets, first, next, cur) {
src[j] = calloc(length, sizeof(gf));
PrrtPacket *pkt = cur->value;
pkt->index = (uint8_t) j;
pkt->index = (uint8_t) ((pkt->seqno - baseSequenceNumber) % SEQNO_SPACE);
PrrtPacket_copy_payload_to_buffer(src[j], pkt, PRRT_PACKET_DATA_HEADER_SIZE);
j++;
}
......@@ -144,11 +161,11 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno)
PrrtPacket *red_packet_ptr = PrrtPacket_create_redundancy_packet(0, (void *) fec[j], length, *seqno,
(uint8_t) (k + j), base_seqno,
block_ptr->codingParams);
*seqno = (uint16_t) (*seqno + 1 % SEQNO_SPACE);
*seqno = (uint16_t) ((*seqno + 1) % SEQNO_SPACE);
PrrtBlock_insert_redundancy_packet(block_ptr, red_packet_ptr);
}
block_ptr->isCoded = TRUE;
block_ptr->isCoded = true;
PrrtCoder_destroy(coder);
......@@ -166,36 +183,46 @@ void PrrtBlock_decode(PrrtBlock *block_ptr)
int *idx_p = NULL;
uint8_t n = block_ptr->codingParams.n;
uint8_t k = block_ptr->codingParams.k;
uint16_t baseSequenceNumber = block_ptr->baseSequenceNumber;
uint32_t length = block_ptr->largestDataLength;
PrrtCoder *coder = NULL;
PrrtCoder_get_coder(&coder, n, k);
fec = calloc(n, sizeof(gf *));
fec = calloc(k, sizeof(gf *));
check_mem(fec);
for(i = 0; i < n; i++) {
for(i = 0; i < k; i++) {
fec[i] = calloc(block_ptr->largestDataLength, sizeof(gf));
}
idx_p = calloc(k, sizeof(int));
check_mem(idx_p);
for(i = 0; i < k; i++) {
idx_p[i] = -1;
}
gather_redundancy_packets(block_ptr, fec, idx_p);
gather_data_packets(block_ptr, fec, idx_p);
gather_redundancy_packets(block_ptr, fec, idx_p);
PrrtCoder_decode(coder, fec, idx_p, length);
int decodingRes = PrrtCoder_decode(coder, fec, idx_p, length);
check(decodingRes == EXIT_SUCCESS, "Decoding failed.");
for(j = 0; j < k; j++) {
PrrtPacket *packet = PrrtPacket_create_data_packet(0, fec[j], length, (uint16_t) (block_ptr->baseSequenceNumber + j));
PrrtBlock_insert_data_packet(block_ptr, packet);
if(idx_p[j] >= k) {
PrrtPacket *packet = PrrtPacket_create_data_packet(0, fec[j], length, (uint16_t) (baseSequenceNumber + j));
bool insertRes = PrrtBlock_insert_data_packet(block_ptr, packet);
if(insertRes == false) {
debug("Tried to insert unnecessary packet.");
PrrtPacket_destroy(packet);
}
}
}
error:
PrrtCoder_destroy(coder);
clear_list(fec, n);
clear_list(fec, k);
if(fec != NULL) {
free(fec);
}
......
......@@ -11,7 +11,7 @@ typedef struct prrtBlock {
uint16_t baseSequenceNumber;
List*dataPackets;
List*redundancyPackets;
uint8_t isCoded;
bool isCoded;
} PrrtBlock;
......@@ -25,11 +25,11 @@ int PrrtBlock_create(PrrtBlock *mblock, const PrrtCodingParams *cpar, uint16_t b
*/
void PrrtBlock_destroy(PrrtBlock *mblock);
int PrrtBlock_insert_data_packet(PrrtBlock *prrtBlock, const PrrtPacket *prrtPacket);
int PrrtBlock_insert_redundancy_packet(PrrtBlock *prrtBlock, const PrrtPacket *prrtPacket);
bool PrrtBlock_insert_data_packet(PrrtBlock *prrtBlock, const PrrtPacket *prrtPacket);
bool PrrtBlock_insert_redundancy_packet(PrrtBlock *prrtBlock, const PrrtPacket *prrtPacket);
int PrrtBlock_encode_ready(const PrrtBlock *block_ptr);
int PrrtBlock_decode_ready(const PrrtBlock *block_ptr);
bool PrrtBlock_encode_ready(const PrrtBlock *block_ptr);
bool PrrtBlock_decode_ready(const PrrtBlock *block_ptr);
PrrtPacket *PrrtBlock_get_first_data(PrrtBlock *block_ptr);
......
......@@ -5,6 +5,7 @@
#include <netinet/in.h>
#include "../util/common.h"
#include "../util/dbg.h"
#include <stdbool.h>
#include "packet.h"
void *encode_general_header(void *buf_ptr, const PrrtPacket *packet);
......@@ -125,7 +126,7 @@ PrrtPacket *create_header(uint8_t priority, uint16_t seqno, uint32_t size, uint8
return NULL;
}
int PrrtPacket_encode(void *buf_ptr, uint16_t buf_size, PrrtPacket *packet_ptr)
bool PrrtPacket_encode(void *buf_ptr, uint16_t buf_size, PrrtPacket *packet_ptr)
{
void *payload = packet_ptr->payload;
......@@ -145,12 +146,12 @@ int PrrtPacket_encode(void *buf_ptr, uint16_t buf_size, PrrtPacket *packet_ptr)
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet_ptr, PRRT_PACKET_FEEDBACK_HEADER_SIZE);
} else {
perror("NOT IMPLEMENTED");
return -1;
return false;
}
return 0;
return true;
error:
return -1;
return false;
}
void *encode_redundancy_header(void *buf_ptr, const void *payload)
......@@ -262,7 +263,7 @@ void *encode_general_header(void *buf_ptr, const PrrtPacket *packet)
return buf_ptr;
}
int PrrtPacket_decode(void *srcBuffer, uint16_t srcBufferSize, PrrtPacket *targetPacket)
bool PrrtPacket_decode(void *srcBuffer, uint16_t srcBufferSize, PrrtPacket *targetPacket)
{
uint32_t payload_len = (uint32_t) (srcBufferSize - PRRT_PACKET_GENERAL_HEADER_SIZE);
targetPacket->type_priority = *(uint8_t *) srcBuffer;
......@@ -292,10 +293,10 @@ int PrrtPacket_decode(void *srcBuffer, uint16_t srcBufferSize, PrrtPacket *targe
} else {
printf("NOT IMPLEMENTED\n");
}
return 0;
return true;
error:
return EXIT_FAILURE;
return false;
}
void *decode_redundancy_header(void *dstBuffer, const void *srcBuffer)
......
......@@ -8,6 +8,7 @@
#include <stdint.h>
#include "coding_params.h"
#include <stdbool.h>
#define PACKET_TYPE_DATA 0
#define PACKET_TYPE_REPETITION 1
......@@ -73,8 +74,8 @@ PrrtPacket *PrrtPacket_create_feedback_packet(uint8_t priority, uint8_t index, u
PrrtPacket *PrrtPacket_create_redundancy_packet(uint8_t priority, void *payloadPointer, uint32_t payloadLength,
uint16_t sequenceNumber, uint8_t index, uint16_t baseSequenceNumber, PrrtCodingParams codingParams);
int PrrtPacket_decode(void *srcBuffer, uint16_t srcBufferSize, PrrtPacket *targetPacket);
int PrrtPacket_encode(void *buf_ptr, uint16_t buf_size, PrrtPacket *packet_ptr);
bool PrrtPacket_decode(void *srcBuffer, uint16_t srcBufferSize, PrrtPacket *targetPacket);
bool PrrtPacket_encode(void *buf_ptr, uint16_t buf_size, PrrtPacket *packet_ptr);
int PrrtPacket_destroy(PrrtPacket *packet);
#define PrrtPacket_copy_payload_to_buffer(dst, packet, header_size) memcpy(dst, packet->payload + header_size, packet->payload_len - header_size);
......
......@@ -15,20 +15,16 @@ void retrieve_data_blocks(PrrtSocket *sock_ptr, uint16_t base_seqno, uint8_t k,
LIST_FOREACH(res, first, next, cur) {
PrrtPacket *packetPtr = cur->value;
PrrtBlock_insert_data_packet((PrrtBlock *) block, packetPtr);
check(PrrtBlock_insert_data_packet((PrrtBlock *) block, packetPtr), "Insert failed!")
sock_ptr->dataStore = BPTree_delete(sock_ptr->dataStore, packetPtr->seqno);
}
error:
List_destroy(res);
}
void decode_block(PrrtSocket *sock_ptr, uint16_t base_seqno)
void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block, uint16_t base_seqno)
{
PrrtBlock *block = BPTree_get(sock_ptr->blockStore, base_seqno);
if(block != NULL && PrrtBlock_decode_ready(block)) {
retrieve_data_blocks(sock_ptr, base_seqno, block->codingParams.k, block);
PrrtBlock_decode(block);
while(List_count(block->dataPackets) > 0) {
......@@ -49,7 +45,7 @@ void decode_block(PrrtSocket *sock_ptr, uint16_t base_seqno)
}
}
int send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote)
bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote)
{
uint16_t remote_port = ntohs(remote.sin_port);
char *remote_host = inet_ntoa(remote.sin_addr);
......@@ -63,23 +59,21 @@ int send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote)
hp = gethostbyname(remote_host);
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 7, 3, 50, 4, 6, 8, 9, 5, 1);
uint8_t buf[MAX_PAYLOAD_LENGTH];
memset(buf, 0, MAX_PAYLOAD_LENGTH);
uint32_t length = PrrtPacket_size(feedback_pkt_ptr);
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr) == EXIT_SUCCESS,
"Buffer for encoding feedback is too small");
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr), "Buffer for encoding feedback is too small");
check(sendto(sock_ptr->feedbackSocketFd, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) ==
length, "Sending feedback failed.");
PrrtPacket_destroy(feedback_pkt_ptr);
return EXIT_SUCCESS;
return true;
error:
return EXIT_FAILURE;
return false;
}
void *receive_data_loop(void *ptr)
......@@ -94,7 +88,7 @@ void *receive_data_loop(void *ptr)
while(1) {
memset(buffer, 0, MAX_PAYLOAD_LENGTH);
n = recvfrom(sock_ptr->dataSocketFd, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen);
check(send_feedback(sock_ptr, remote) == EXIT_SUCCESS, "Sending feedback failed.");
check(send_feedback(sock_ptr, remote), "Sending feedback failed.");
PrrtPacket *packet = (PrrtPacket *) calloc(1, sizeof(PrrtPacket));
check_mem(packet);
......@@ -102,18 +96,28 @@ void *receive_data_loop(void *ptr)
switch(PrrtPacket_type(packet)) {
case PACKET_TYPE_DATA:
// packet.timestamp + packet.timeout < now: break
// TODO: packet.timestamp + packet.timeout < now: break
if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, packet->seqno) ==
FALSE) {
false) {
PrrtPacket_destroy(packet);
} else {
// check incomplete_prrt_blocks for this seqno: insert if found
// else: insert in data_packet_store
uint16_t baseSequenceNumber = packet->seqno - packet->index;
PrrtPacket *reference = PrrtPacket_copy(packet);
PrrtPacket* reference = PrrtPacket_copy(packet);
PrrtBlock *block = BPTree_get(sock_ptr->blockStore, baseSequenceNumber);
if(block != NULL) {
check(PrrtBlock_insert_data_packet(block, reference), "Inserting failed");
decode_block(sock_ptr, block, baseSequenceNumber);
} else {
// Check for duplicate data packet.
if(BPTree_get(sock_ptr->dataStore, packet->seqno) == NULL) {
sock_ptr->dataStore = BPTree_insert(sock_ptr->dataStore, packet->seqno, reference);
} else {
PrrtPacket_destroy(reference);
}
}
// forward to application layer
......@@ -121,8 +125,6 @@ void *receive_data_loop(void *ptr)
List_push(sock_ptr->inQueue, packet);
pthread_cond_signal(&sock_ptr->inQueueFilledCv);
pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
decode_block(sock_ptr, packet->seqno - packet->index);
}
break;
case PACKET_TYPE_REDUNDANCY:
......@@ -148,8 +150,13 @@ void *receive_data_loop(void *ptr)
block);
}
PrrtBlock_insert_redundancy_packet(block, packet);
decode_block(sock_ptr, redundancyPayload->base_seqno);
retrieve_data_blocks(sock_ptr, redundancyPayload->base_seqno, block->codingParams.k, block);
if(PrrtBlock_insert_redundancy_packet(block, packet)) {
decode_block(sock_ptr, block, redundancyPayload->base_seqno);
} else {
PrrtPacket_destroy(packet);
}
}
break;
default:
......
......@@ -10,13 +10,12 @@
#include "data_transmitter.h"
int send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
uint8_t buf[MAX_PAYLOAD_LENGTH];
memset(buf, 0, sizeof(buf));
uint32_t length = PrrtPacket_size(packet);
int encodeResult = PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet);
check(encodeResult >= 0, "Buffer too small.");
check(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, packet), "Buffer too small.");
// SENDING TO ALL RECEIVERS
LIST_FOREACH(sock_ptr->receivers, first, next, cur) {
......@@ -37,11 +36,11 @@ int send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
PrrtPacket_destroy(packet);
return EXIT_SUCCESS;
return true;
error:
PERROR("Something is wrong%s.", "")
return EXIT_FAILURE;
return false;
}
void * send_data_loop(void *ptr) {
......@@ -60,8 +59,6 @@ void * send_data_loop(void *ptr) {
if(block != NULL) {
PrrtBlock_destroy(block);
}
block = NULL;
pthread_mutex_unlock(&sock_ptr->closingMutex);
pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
return NULL;
......@@ -74,10 +71,11 @@ void * send_data_loop(void *ptr) {
block = calloc(1, sizeof(PrrtBlock));
check_mem(block);
PrrtBlock_create(block, cpar, sock_ptr->sequenceNumberRedundancy);
PrrtBlock_create(block, cpar, sock_ptr->sequenceNumberSource);
}
PrrtPacket *packet = List_shift(sock_ptr->outQueue);
packet->seqno = sock_ptr->sequenceNumberSource++;
PrrtBlock_insert_data_packet(block, packet);
......
......@@ -11,7 +11,7 @@ void * receive_feedback_loop(void *ptr) {
PrrtSocket *sock_ptr = ptr;
pthread_mutex_lock(&sock_ptr->closingMutex);
while (sock_ptr->closing == FALSE) {
while (sock_ptr->closing == false) {
pthread_mutex_unlock(&sock_ptr->closingMutex);
memset(bufin, 0, MAX_PAYLOAD_LENGTH);
PrrtPacket *t = PrrtSocket_recv_feedback(sock_ptr, bufin, MAX_PAYLOAD_LENGTH);
......
......@@ -15,10 +15,12 @@
#include "socket.h"
#include "block.h"
PrrtSocket * PrrtSocket_create(uint16_t port, uint8_t is_sender) {
PrrtSocket* sock_ptr = calloc(1, sizeof(PrrtSocket));
PrrtSocket* PrrtSocket_create(const uint16_t port, const uint8_t is_sender) {
PrrtSocket *sock_ptr = (PrrtSocket*) calloc(1, sizeof(PrrtSocket));
check_mem(sock_ptr);
sock_ptr->is_sender = is_sender;
sock_ptr->sequenceNumberSource = 1;
sock_ptr->sequenceNumberRedundancy = 1;
......@@ -97,7 +99,7 @@ int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_le
check(sock_ptr->is_sender, "Cannot send on receiver socket.")
pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
PrrtPacket *packet = PrrtPacket_create_data_packet(5, data, (uint32_t) data_len, sock_ptr->sequenceNumberSource++);
PrrtPacket *packet = PrrtPacket_create_data_packet(5, data, (uint32_t) data_len, 0);
List_push(sock_ptr->outQueue, packet);
pthread_cond_signal(&sock_ptr->outQueueFilledCv);
......@@ -109,7 +111,7 @@ int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_le
}
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
check(sock_ptr->is_sender == FALSE, "Cannot receive on sender socket.")
check(sock_ptr->is_sender == false, "Cannot receive on sender socket.")
pthread_mutex_t t = sock_ptr->inQueueFilledMutex;
while (1) {
pthread_mutex_lock(&t);
......@@ -127,7 +129,6 @@ int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
uint32_t len = (uint32_t) (packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
PrrtPacket_destroy(packet);
pthread_mutex_unlock(&t);
......@@ -140,7 +141,7 @@ int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
int PrrtSocket_interrupt(PrrtSocket *sock_ptr) {
pthread_mutex_lock(&sock_ptr->closingMutex);
sock_ptr->closing = TRUE;
sock_ptr->closing = true;
pthread_mutex_unlock(&sock_ptr->closingMutex);
void **res = NULL;
......
......@@ -47,7 +47,7 @@ typedef struct prrtSocket {
} PrrtSocket;
PrrtSocket * PrrtSocket_create(uint16_t port, uint8_t is_sender);
PrrtSocket* PrrtSocket_create(const uint16_t port, const uint8_t is_sender);
int PrrtSocket_interrupt(PrrtSocket *sock_ptr);
int PrrtSocket_close(PrrtSocket *sock_ptr);
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port);
......
......@@ -5,12 +5,12 @@
#include "forward_packet_table.h"
int check_position(const PrrtForwardPacketTable *fpt_ptr, uint16_t seqno) {
int is_position_relevant(const PrrtForwardPacketTable *fpt_ptr, uint16_t seqno) {
uint16_t stop = (uint16_t) (SEQNO_SPACE / 2 + fpt_ptr->start);
if (fpt_ptr->start < stop && !(fpt_ptr->start <= seqno && seqno <= stop)) {
return FALSE;
return false;
} else if (!(seqno <= stop || fpt_ptr->start <= seqno)) {
return FALSE;
return false;
} else {
uint16_t which_byte = (uint16_t) (seqno / 32);