Commit 86f60908 authored by Andreas Schmidt's avatar Andreas Schmidt

Add cleanup method instead of separate thread.

parent c8769948
Pipeline #966 failed with stages
in 1 minute
......@@ -14,7 +14,6 @@ add_library(PRRT ../defines.h
processes/dataReceiver.c processes/dataReceiver.h
processes/feedbackReceiver.c processes/feedbackReceiver.h
processes/dataTransmitter.c processes/dataTransmitter.h
processes/cleaner.c processes/cleaner.h
stores/repairBlockStore.c stores/repairBlockStore.h
stores/packetTimeoutTable.c stores/packetTimeoutTable.h
stores/dataPacketStore.c stores/dataPacketStore.h)
......
#include <unistd.h>
#include <stdatomic.h>
#include "../socket.h"
#include"../../util/dbg.h"
#include "cleaner.h"
void *cleanup(void *ptr)
{
PrrtSocket *socket = ptr;
while(1) {
if(socket->isSender) {
} else {
if(socket->packetTimeoutTable != NULL) {
List *expired_packets = PrrtPacketTimeoutTable_expire_packets(socket->packetTimeoutTable,
PrrtClock_get_prrt_time_us(
&socket->clock));
if(List_count(expired_packets) > 0) {
PrrtPacket *first = List_first(expired_packets);
prrtSequenceNumber_t firstSequenceNumberBase = first->sequenceNumber - first->index - SEQNO_SPACE/4;
PrrtPacket *last = List_last(expired_packets);
prrtSequenceNumber_t lastSequenceNumberBase = last->sequenceNumber - last->index - 1;
PrrtRepairBlockStore_expire_block_range(socket->repairBlockStore, firstSequenceNumberBase,
lastSequenceNumberBase);
List *list = List_create();
PrrtDataPacketStore_remove_range(socket->dataPacketStore, list, firstSequenceNumberBase, last->sequenceNumber);
List_destroy(list);
while(List_count(expired_packets) > 0) {
PrrtPacket* packet = (PrrtPacket*) List_shift(expired_packets);
PrrtPacket_destroy(packet);
}
}
List_destroy(expired_packets);
}
}
if (!atomic_load_explicit(&socket->closing, memory_order_acquire))
break;
usleep(1000000);
}
return NULL;
}
......@@ -27,26 +27,25 @@ static void retrieve_data_blocks(PrrtSocket *sock_ptr, prrtSequenceNumber_t base
List_destroy(res);
}
static void decode_block(PrrtSocket *socket, PrrtBlock *block)
static void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block)
{
if(block != NULL && PrrtBlock_decode_ready(block)) {
check(PrrtBlock_decode(block), "Decoding failed");
while(List_count(block->dataPackets) > 0) {
PrrtPacket *pkt = List_shift(block->dataPackets);
if(PrrtForwardPacketTable_test_set_is_number_relevant(socket->forwardPacketTable, pkt->sequenceNumber)) {
check(pthread_mutex_lock(&socket->inQueueFilledMutex) == 0, "Lock failed.");
List_push(socket->inQueue, pkt);
check(pthread_cond_broadcast(&socket->inQueueFilledCv) == 0, "Signal failed.");
check(pthread_mutex_unlock(&socket->inQueueFilledMutex) == 0, "Unlock failed.");
if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, pkt->sequenceNumber)) {
check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
List_push(sock_ptr->inQueue, pkt);
check(pthread_cond_broadcast(&sock_ptr->inQueueFilledCv) == 0, "Signal failed.");
check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
} else {
PrrtPacket_destroy(pkt);
}
}
PrrtRepairBlockStore_delete(sock_ptr->repairBlockStore, block->baseSequenceNumber);
PrrtBlock_destroy(block);
PrrtRepairBlockStore_delete(socket->repairBlockStore, block->baseSequenceNumber);
}
return;
......@@ -243,6 +242,8 @@ void *receive_data_loop(void *ptr)
PrrtPacket_print(packet);
PrrtPacket_destroy(packet);
}
PrrtSocket_cleanup(sock_ptr);
}
error:
......
......@@ -12,7 +12,6 @@
#include "packet.h"
#include "../util/dbg.h"
#include "../util/common.h"
#include "processes/cleaner.h"
#include "processes/dataTransmitter.h"
#include "processes/dataReceiver.h"
#include "processes/feedbackReceiver.h"
......@@ -137,9 +136,6 @@ bool PrrtSocket_bind(PrrtSocket *sock_ptr, const char *ipAddress, const uint16_t
"Cannot create data receiving thread.");
}
check(pthread_create(&sock_ptr->cleanupThread, NULL, cleanup, (void *) sock_ptr) == EXIT_SUCCESS,
"Cannot create cleanup thread.");
return true;
error:
PrrtSocket_close(sock_ptr);
......@@ -295,12 +291,6 @@ int PrrtSocket_interrupt(PrrtSocket *sock_ptr)
sock_ptr->receiveFeedbackThread = 0;
}
if(sock_ptr->cleanupThread != 0) {
check(pthread_join(sock_ptr->cleanupThread, res) == 0, "Join failed.");
sock_ptr->cleanupThread = 0;
}
return EXIT_SUCCESS;
error:
......@@ -423,3 +413,44 @@ bool PrrtSocket_set_coding_parameters(PrrtSocket *sock_ptr, uint8_t k, uint8_t n
PrrtCodingParams_update(sock_ptr->codingParameters, k, n);
return true;
}
bool PrrtSocket_cleanup(PrrtSocket *socket)
{
if(socket->isSender) {
} else {
if(socket->packetTimeoutTable != NULL) {
List *expired_packets = PrrtPacketTimeoutTable_expire_packets(socket->packetTimeoutTable,
PrrtClock_get_prrt_time_us(
&socket->clock));
if(List_count(expired_packets) > 0) {
PrrtPacket *first = List_first(expired_packets);
prrtSequenceNumber_t firstSequenceNumberBase = (prrtSequenceNumber_t) (first->sequenceNumber -
first->index - SEQNO_SPACE / 2);
PrrtPacket *last = List_last(expired_packets);
prrtSequenceNumber_t lastSequenceNumberBase = (prrtSequenceNumber_t) (last->sequenceNumber -
last->index - 1);
PrrtRepairBlockStore_expire_block_range(socket->repairBlockStore, firstSequenceNumberBase, lastSequenceNumberBase);
List *list = List_create();
PrrtDataPacketStore_remove_range(socket->dataPacketStore, list, firstSequenceNumberBase,
last->sequenceNumber);
while(List_count(list) > 0) {
PrrtPacket* packet = (PrrtPacket*) List_shift(list);
PrrtPacket_destroy(packet);
}
List_destroy(list);
while(List_count(expired_packets) > 0) {
PrrtPacket *packet = (PrrtPacket *) List_shift(expired_packets);
PrrtPacket_destroy(packet);
}
}
List_destroy(expired_packets);
}
}
return true;
}
......@@ -37,8 +37,6 @@ typedef struct prrtSocket {
pthread_cond_t inQueueFilledCv;
List *inQueue;
pthread_t cleanupThread;
PrrtPacketTimeoutTable* packetTimeoutTable;
PrrtDataPacketStore* dataPacketStore;
PrrtRepairBlockStore* repairBlockStore;
......@@ -84,5 +82,6 @@ int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t po
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len);
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr);
int32_t PrrtSocket_timedrecv(PrrtSocket *sock_ptr, void *buf_ptr, const uint32_t wait_time);
bool PrrtSocket_cleanup(PrrtSocket *socket);
#endif // PRRT_SOCKET_H
......@@ -27,9 +27,6 @@ cdef extern from "proto/stores/repairBlockStore.c":
cdef extern from "proto/processes/feedbackReceiver.c":
pass
cdef extern from "proto/processes/cleaner.c":
pass
cdef extern from "proto/processes/dataReceiver.c":
pass
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment