Commit 390d2240 authored by Andreas Schmidt's avatar Andreas Schmidt

Add cleaner thread and special structures.

* Refactor dataReceiver.
* Add packetTimeoutTable.
* Add repairBlockStore.
parent 44f47d78
......@@ -12,6 +12,6 @@ add_library(PRRT ../defines.h
stores/lossGatherer.c stores/lossGatherer.h
processes/dataReceiver.c processes/dataReceiver.h
processes/feedbackReceiver.c processes/feedbackReceiver.h
processes/dataTransmitter.c processes/dataTransmitter.h)
processes/dataTransmitter.c processes/dataTransmitter.h processes/cleaner.c processes/cleaner.h stores/repairBlockStore.c stores/repairBlockStore.h stores/packetTimeoutTable.c stores/packetTimeoutTable.h)
set_property(TARGET PRRT PROPERTY C_STANDARD 99)
\ No newline at end of file
......@@ -5,6 +5,9 @@
#include <stdint.h>
#include <stdbool.h>
#define MAX_TIMESTAMP (prrtTimestamp_t) 4294967295
#define HALF_TIMESTAMP (prrtTimestamp_t) 2147483648
typedef struct prrtClock {
prrtTimestamp_t lastMeasurement;
prrtTimedelta_t meanDeviation;
......
#include <unistd.h>
#include "../socket.h"
#include"../../util/dbg.h"
#include "cleaner.h"
void *cleanup(void *ptr)
{
PrrtSocket *socket = ptr;
while(1) {
if(socket->isSender) {
} else {
if(socket->packetTimeoutTable != NULL) {
List *expired_packets = PrrtPacketTimeoutTable_expire_packets(socket->packetTimeoutTable,
PrrtClock_get_prrt_time_us(
socket->clock));
if(List_count(expired_packets) > 0) {
PrrtPacket *first = List_first(expired_packets);
prrtSequenceNumber_t firstSequenceNumberBase = first->sequenceNumber - first->index;
PrrtPacket *last = List_last(expired_packets);
prrtSequenceNumber_t lastSequenceNumberBase = last->sequenceNumber - last->index - 1;
// TODO: clean block store
PrrtRepairBlockStore_expire_block_range(socket->repairBlockStore, firstSequenceNumberBase,
lastSequenceNumberBase);
// TODO: clean data store
while(List_count(expired_packets) > 0) {
PrrtPacket* packet = (PrrtPacket*) List_shift(expired_packets);
PrrtPacket_destroy(packet);
}
}
List_destroy(expired_packets);
debug("I CLEAN; Data-Store Size: %d, Block-Store Size: %d", PrrtDataPacketStore_size(socket->dataPacketStore), PrrtRepairBlockStore_size(socket->repairBlockStore));
}
}
usleep(1000000);
}
return NULL;
}
#ifndef PRRT_CLEANER_H
#define PRRT_CLEANER_H
void * cleanup(void *ptr);
#endif //PRRT_CLEANER_H
......@@ -23,18 +23,18 @@ void retrieve_data_blocks(PrrtSocket *sock_ptr, prrtSequenceNumber_t base_seqno,
List_destroy(res);
}
void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block, prrtSequenceNumber_t base_seqno)
void decode_block(PrrtSocket *socket, PrrtBlock *block)
{
if(block != NULL && PrrtBlock_decode_ready(block)) {
check(PrrtBlock_decode(block), "Decoding failed");
while(List_count(block->dataPackets) > 0) {
PrrtPacket *pkt = List_shift(block->dataPackets);
if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, pkt->sequenceNumber)) {
check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
List_push(sock_ptr->inQueue, pkt);
check(pthread_cond_signal(&sock_ptr->inQueueFilledCv) == 0, "Signal failed.");
check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
if(PrrtForwardPacketTable_test_set_is_number_relevant(socket->forwardPacketTable, pkt->sequenceNumber)) {
check(pthread_mutex_lock(&socket->inQueueFilledMutex) == 0, "Lock failed.");
List_push(socket->inQueue, pkt);
check(pthread_cond_signal(&socket->inQueueFilledCv) == 0, "Signal failed.");
check(pthread_mutex_unlock(&socket->inQueueFilledMutex) == 0, "Unlock failed.");
} else {
PrrtPacket_destroy(pkt);
}
......@@ -42,7 +42,7 @@ void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block, prrtSequenceNumber_t b
PrrtBlock_destroy(block);
sock_ptr->blockStore = BPTree_delete(sock_ptr->blockStore, base_seqno);
PrrtRepairBlockStore_delete(socket->repairBlockStore, block->baseSequenceNumber);
}
return;
......@@ -90,6 +90,95 @@ bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote)
return false;
}
void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet, struct sockaddr_in remote)
{
PrrtPacketDataPayload *payload = packet->payload;
prrtTimestamp_t dataTimestamp = payload->timestamp;
sock_ptr->lastSentTimestamp = dataTimestamp;
PrrtClock_update(sock_ptr->clock, dataTimestamp, payload->groupRTT_us);
PrrtPacket *copiedPacket = PrrtPacket_copy(packet);
PrrtPacketTimeoutTable_insert(sock_ptr->packetTimeoutTable, copiedPacket);
check(send_feedback(sock_ptr, remote), "Sending feedback failed.");
prrtTimestamp_t now = PrrtClock_get_prrt_time_us(sock_ptr->clock);
if(now > payload->packetTimeout_us) {
// TODO: note this as loss
PrrtPacket_destroy(packet);
}
else if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable,
packet->sequenceNumber) ==
false) {
PrrtPacket_destroy(packet);
} else {
prrtSequenceNumber_t baseSequenceNumber = packet->sequenceNumber - packet->index;
PrrtPacket *reference = PrrtPacket_copy(packet);
PrrtBlock *block = PrrtRepairBlockStore_get_block(sock_ptr->repairBlockStore, baseSequenceNumber);
if(block != NULL) {
check(PrrtBlock_insert_data_packet(block, reference), "Inserting failed");
decode_block(sock_ptr, block);
} else {
// Check for duplicate data packet.
if(BPTree_get(sock_ptr->dataStore, packet->sequenceNumber) == NULL) {
sock_ptr->dataStore = BPTree_insert(sock_ptr->dataStore, packet->sequenceNumber, reference);
} else {
PrrtPacket_destroy(reference);
}
}
// forward to application layer
check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
List_push(sock_ptr->inQueue, packet);
check(pthread_cond_signal(&sock_ptr->inQueueFilledCv) == 0, "Cond signal failed.");
check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
}
return;
error:
PERROR("Handling data packet failed%s.", "");
return;
}
void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet)
{
PrrtPacketRedundancyPayload *redundancyPayload = packet->payload;
if(!PrrtForwardPacketTable_test_is_block_relevant(socket->forwardPacketTable,
redundancyPayload->baseSequenceNumber,
redundancyPayload->n)) {
PrrtPacket_destroy(packet);
} else {
PrrtBlock *block = PrrtRepairBlockStore_get_block(socket->repairBlockStore,
redundancyPayload->baseSequenceNumber);
if(block == NULL) {
// TODO: PROPER CREATION
PrrtCodingParams *cpar = PrrtCodingParams_create();
cpar->k = redundancyPayload->k;
cpar->n = redundancyPayload->n;
block = PrrtBlock_create(cpar, redundancyPayload->baseSequenceNumber);
PrrtCodingParams_destroy(cpar);
PrrtRepairBlockStore_insert(socket->repairBlockStore, redundancyPayload->baseSequenceNumber,
block);
}
retrieve_data_blocks(socket, redundancyPayload->baseSequenceNumber, block->codingParams.k, block);
if(PrrtBlock_insert_redundancy_packet(block, packet)) {
decode_block(socket, block);
} else {
PrrtPacket_destroy(packet);
}
}
return;
}
void *receive_data_loop(void *ptr)
{
ssize_t n;
......@@ -109,86 +198,16 @@ void *receive_data_loop(void *ptr)
prrtPacketType_t packetType = PrrtPacket_type(packet);
if(packetType == PACKET_TYPE_DATA) {
PrrtPacketDataPayload *payload = packet->payload;
prrtTimestamp_t dataTimestamp = payload->timestamp;
sock_ptr->lastSentTimestamp = dataTimestamp;
PrrtClock_update(sock_ptr->clock, dataTimestamp, payload->groupRTT_us);
check(send_feedback(sock_ptr, remote), "Sending feedback failed.");
prrtTimestamp_t now = PrrtClock_get_prrt_time_us(sock_ptr->clock);
if(now > payload->packetTimeout_us) {
debug("LOSS due to %u > %u", now, payload->packetTimeout_us);
// TODO: note this as loss
PrrtPacket_destroy(packet);
}
else if(PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable,
packet->sequenceNumber) ==
false) {
PrrtPacket_destroy(packet);
} else {
prrtSequenceNumber_t baseSequenceNumber = packet->sequenceNumber - packet->index;
PrrtPacket *reference = PrrtPacket_copy(packet);
PrrtBlock *block = BPTree_get(sock_ptr->blockStore, baseSequenceNumber);
if(block != NULL) {
check(PrrtBlock_insert_data_packet(block, reference), "Inserting failed");
decode_block(sock_ptr, block, baseSequenceNumber);
} else {
// Check for duplicate data packet.
if(BPTree_get(sock_ptr->dataStore, packet->sequenceNumber) == NULL) {
sock_ptr->dataStore = BPTree_insert(sock_ptr->dataStore, packet->sequenceNumber, reference);
} else {
PrrtPacket_destroy(reference);
}
}
// forward to application layer
check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
List_push(sock_ptr->inQueue, packet);
check(pthread_cond_signal(&sock_ptr->inQueueFilledCv) == 0, "Cond signal failed.");
check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
}
handle_data_packet(sock_ptr, packet, remote);
} else if(packetType == PACKET_TYPE_REDUNDANCY) {
PrrtPacketRedundancyPayload *redundancyPayload = packet->payload;
if(!PrrtForwardPacketTable_test_is_block_relevant(sock_ptr->forwardPacketTable,
redundancyPayload->baseSequenceNumber,
redundancyPayload->n)) {
PrrtPacket_destroy(packet);
} else {
PrrtBlock *block = BPTree_get(sock_ptr->blockStore, redundancyPayload->baseSequenceNumber);
if(block == NULL) {
// TODO: PROPER CREATION
PrrtCodingParams *cpar = PrrtCodingParams_create();
cpar->k = redundancyPayload->k;
cpar->n = redundancyPayload->n;
block = PrrtBlock_create(cpar, redundancyPayload->baseSequenceNumber);
PrrtCodingParams_destroy(cpar);
sock_ptr->blockStore = BPTree_insert(sock_ptr->blockStore, redundancyPayload->baseSequenceNumber,
block);
}
retrieve_data_blocks(sock_ptr, redundancyPayload->baseSequenceNumber, block->codingParams.k, block);
if(PrrtBlock_insert_redundancy_packet(block, packet)) {
decode_block(sock_ptr, block, redundancyPayload->baseSequenceNumber);
} else {
PrrtPacket_destroy(packet);
}
}
handle_redundancy_packet(sock_ptr, packet);
} else {
PrrtPacket_print(packet);
PrrtPacket_destroy(packet);
}
}
// TODO: occasionally clean up dataStore and blockStore !!!
// TODO: occasionally clean up dataStore !!!
error:
PNOTIMPLEMENTED("SHOULD IMPLEMENT ERROR HANDLER HERE");
......
......@@ -10,11 +10,11 @@
#include "packet.h"
#include "../util/dbg.h"
#include "../util/common.h"
#include "processes/feedbackReceiver.h"
#include "processes/cleaner.h"
#include "processes/dataTransmitter.h"
#include "processes/dataReceiver.h"
#include "processes/feedbackReceiver.h"
#include "socket.h"
#include "block.h"
#include "receiver.h"
PrrtSocket *PrrtSocket_create(const bool is_sender)
......@@ -36,6 +36,8 @@ PrrtSocket *PrrtSocket_create(const bool is_sender)
sock_ptr->csi = PrrtChannelStateInformation_create();
sock_ptr->applicationConstraints = PrrtApplicationConstraints_create();
sock_ptr->packetTimeoutTable = PrrtPacketTimeoutTable_create();
sock_ptr->dataStore = NULL;
check(sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.")
......@@ -54,6 +56,7 @@ PrrtSocket *PrrtSocket_create(const bool is_sender)
sock_ptr->receivers = List_create();
} else {
sock_ptr->forwardPacketTable = PrrtForwardPacketTable_create();
sock_ptr->repairBlockStore = PrrtRepairBlockStore_create();
check(pthread_mutex_init(&sock_ptr->inQueueFilledMutex, NULL) == 0, "Mutex init failed.");
check(pthread_cond_init(&sock_ptr->inQueueFilledCv, NULL) == 0, "Cond init failed.");
......@@ -97,6 +100,9 @@ bool PrrtSocket_bind(PrrtSocket *sock_ptr, const char* ipAddress, const uint16_t
"Cannot create data receiving thread.");
}
check(pthread_create(&sock_ptr->cleanupThread, NULL, cleanup, (void *) sock_ptr) ==
EXIT_SUCCESS, "Cannot create cleanup thread.");
return true;
error:
PrrtSocket_close(sock_ptr);
......@@ -185,6 +191,14 @@ int PrrtSocket_interrupt(PrrtSocket *sock_ptr) {
sock_ptr->receiveFeedbackThread = 0;
}
if(sock_ptr->cleanupThread != 0) {
check(pthread_cancel(sock_ptr->cleanupThread) == 0, "Cancel failed.");
check(pthread_join(sock_ptr->cleanupThread, res) == 0, "Join failed.");
sock_ptr->cleanupThread = 0;
}
return EXIT_SUCCESS;
error:
......@@ -214,17 +228,9 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) {
sock_ptr->dataStore = BPTree_destroy(sock_ptr->dataStore);
}
if (sock_ptr->blockStore != NULL) {
List *blockList = List_create();
BPTree_get_range(sock_ptr->blockStore, blockList, 0, SEQNO_SPACE - 1);
while (List_count(blockList) > 0) {
PrrtBlock *block = List_shift(blockList);
PrrtBlock_destroy(block);
}
List_destroy(blockList);
sock_ptr->blockStore = BPTree_destroy(sock_ptr->blockStore);
if(sock_ptr->repairBlockStore != NULL) {
PrrtRepairBlockStore_destroy(sock_ptr->repairBlockStore);
sock_ptr->repairBlockStore = NULL;
}
if (sock_ptr->receivers != NULL) {
......@@ -255,6 +261,11 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) {
sock_ptr->forwardPacketTable = NULL;
}
if(sock_ptr->packetTimeoutTable != NULL) {
check(PrrtPacketTimeoutTable_destroy(sock_ptr->packetTimeoutTable), "Destroy failed.");
sock_ptr->packetTimeoutTable = NULL;
}
check(pthread_mutex_destroy(&sock_ptr->closingMutex) == 0, "Mutex destroy failed.");
if(sock_ptr->address != NULL) {
......
......@@ -8,6 +8,8 @@
#include "../util/bptree.h"
#include "channelStateInformation.h"
#include "applicationConstraints.h"
#include "stores/packetTimeoutTable.h"
#include "stores/repairBlockStore.h"
#include "clock.h"
......@@ -31,9 +33,14 @@ typedef struct prrtSocket {
pthread_cond_t inQueueFilledCv;
List *inQueue;
pthread_t cleanupThread;
BPTreeNode* dataStore;
BPTreeNode* blockStore;
PrrtPacketTimeoutTable* packetTimeoutTable;
PrrtDataPacketStore* dataPacketStore;
PrrtRepairBlockStore* repairBlockStore;
PrrtForwardPacketTable* forwardPacketTable;
......
#include "../../util/common.h"
#include "../../util/dbg.h"
#include "../clock.h"
#include "packetTimeoutTable.h"
PrrtPacketTimeoutTable *PrrtPacketTimeoutTable_create(void)
{
PrrtPacketTimeoutTable *table = (PrrtPacketTimeoutTable *) calloc(1, sizeof(PrrtPacketTimeoutTable));
check_mem(table);
table->packetList = NULL;
pthread_mutex_init(&table->lock, NULL);
return table;
error:
PERROR("Out of memory%s.","");
return NULL;
}
bool PrrtPacketTimeoutTable_destroy(PrrtPacketTimeoutTable *table)
{
List* list = List_create();
BPTree_get_range(table->packetList, list, 0, SEQNO_SPACE - 1);
while (List_count(list) > 0) {
PrrtPacket *packet = List_shift(list);
PrrtPacket_destroy(packet);
}
List_destroy(list);
if(table->packetList != NULL) {
table->packetList = BPTree_destroy(table->packetList);
}
pthread_mutex_destroy(&table->lock);
free(table);
return true;
}
bool PrrtPacketTimeoutTable_insert(PrrtPacketTimeoutTable *table, PrrtPacket *packet)
{
check(pthread_mutex_lock(&table->lock) == EXIT_SUCCESS, "Lock failed.");
PrrtPacketDataPayload* payload = packet->payload;
table->packetList = BPTree_insert(table->packetList, payload->packetTimeout_us, packet);
check(pthread_mutex_unlock(&table->lock) == EXIT_SUCCESS, "Unlock failed.");
return true;
error:
PERROR("Insert failed%s.","");
return false;
}
List *PrrtPacketTimeoutTable_expire_packets(PrrtPacketTimeoutTable *table, prrtTimestamp_t now)
{
check(pthread_mutex_lock(&table->lock) == EXIT_SUCCESS, "Lock failed.");
List* list = List_create();
if(table->packetList != NULL) {
if(now > HALF_TIMESTAMP) {
BPTree_get_range(table->packetList, list, now - HALF_TIMESTAMP, now);
} else {
BPTree_get_range(table->packetList, list, 0, now);
BPTree_get_range(table->packetList, list, now - HALF_TIMESTAMP, MAX_TIMESTAMP);
}
}
LIST_FOREACH(list, first, next, current) {
PrrtPacket *packet = current->value;
PrrtPacketDataPayload* payload = packet->payload;
table->packetList = BPTree_delete(table->packetList, payload->packetTimeout_us);
}
check(pthread_mutex_unlock(&table->lock) == EXIT_SUCCESS, "Unlock failed.");
return list;
error:
PERROR("Get expired timestamps failed%s.","");
return NULL;
}
PrrtPacket *PrrtPacketTimeoutTable_remove(PrrtPacketTimeoutTable *table, prrtTimestamp_t stamp)
{
check(pthread_mutex_lock(&table->lock) == EXIT_SUCCESS, "Lock failed.");
PrrtPacket *packet = (PrrtPacket *) BPTree_delete(table->packetList, stamp);
check(pthread_mutex_unlock(&table->lock) == EXIT_SUCCESS, "Unlock failed.");
return packet;
error:
PERROR("Get expired timestamps failed%s.","");
return NULL;
}
#ifndef PRRT_PACKETTIMEOUTTABLE_H
#define PRRT_PACKETTIMEOUTTABLE_H
#include <bits/pthreadtypes.h>
#include "../../util/bptree.h"
#include "../packet.h"
typedef struct prrtPacketTimeoutTable {
pthread_mutex_t lock;
BPTreeNode* packetList;
} PrrtPacketTimeoutTable;
PrrtPacketTimeoutTable* PrrtPacketTimeoutTable_create(void);
bool PrrtPacketTimeoutTable_insert(PrrtPacketTimeoutTable* table, PrrtPacket* packet);
List *PrrtPacketTimeoutTable_expire_packets(PrrtPacketTimeoutTable *table, prrtTimestamp_t now);
PrrtPacket *PrrtPacketTimeoutTable_remove(PrrtPacketTimeoutTable *table, prrtTimestamp_t stamp);
bool PrrtPacketTimeoutTable_destroy(PrrtPacketTimeoutTable* table);
#endif //PRRT_PACKETTIMEOUTTABLE_H
#include <pthread.h>
#include "../../util/dbg.h"
#include "../../util/common.h"
#include "repairBlockStore.h"
PrrtRepairBlockStore *PrrtRepairBlockStore_create(void)
{
PrrtRepairBlockStore *store = (PrrtRepairBlockStore *) calloc(1, sizeof(PrrtRepairBlockStore));
check_mem(store);
pthread_mutex_init(&store->lock, NULL);
store->blockTree = NULL;
return store;
error:
PERROR("Out of memory%s.", "");
return NULL;
}
bool PrrtRepairBlockStore_destroy(PrrtRepairBlockStore *store)
{
List *blockList = List_create();
BPTree_get_range(store->blockTree, blockList, 0, SEQNO_SPACE - 1);
while (List_count(blockList) > 0) {
PrrtBlock *block = List_shift(blockList);
PrrtBlock_destroy(block);
}
List_destroy(blockList);
store->blockTree = BPTree_destroy(store->blockTree);
pthread_mutex_destroy(&store->lock);
free(store);
return true;
}
bool PrrtRepairBlockStore_delete(PrrtRepairBlockStore *store, prrtSequenceNumber_t sequenceNumber)
{
check(pthread_mutex_lock(&store->lock) == EXIT_SUCCESS, "Lock failed.");
store->blockTree = BPTree_delete(store->blockTree, sequenceNumber);
check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed.");
return true;
error:
PERROR("Could not delete%s.","");
return false;
}
PrrtBlock *PrrtRepairBlockStore_get_block(PrrtRepairBlockStore *store, prrtSequenceNumber_t sequenceNumber)
{
check(pthread_mutex_lock(&store->lock) == EXIT_SUCCESS, "Lock failed.");
void *block = BPTree_get(store->blockTree, sequenceNumber);
check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed.");
return block;
error:
PERROR("Could not get block%s.","");
return NULL;
}
bool PrrtRepairBlockStore_insert(PrrtRepairBlockStore *store, prrtSequenceNumber_t sequenceNumber, PrrtBlock *block)
{
check(pthread_mutex_lock(&store->lock) == EXIT_SUCCESS, "Lock failed.");
store->blockTree = BPTree_insert(store->blockTree, sequenceNumber, block);
check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed.");
return true;
error:
PERROR("Could not insert%s.","");
return false;
}
uint32_t PrrtRepairBlockStore_size(PrrtRepairBlockStore *store)
{
check(pthread_mutex_lock(&store->lock) == EXIT_SUCCESS, "Lock failed.");
uint32_t size = BPTree_size(store->blockTree);
check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed.");
return size;
error:
PERROR("Could not get size%s.","");
return 0;
}
bool PrrtRepairBlockStore_expire_block_range(PrrtRepairBlockStore *store, prrtSequenceNumber_t start,
prrtSequenceNumber_t stop)
{
check(pthread_mutex_lock(&store->lock) == EXIT_SUCCESS, "Lock failed.");
while(start != stop) {
store->blockTree = BPTree_delete(store->blockTree, start);
debug("EXPIRE BLOCK: %d", start);
start++;
}
check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed.");
return true;
error:
PERROR("Could not expire%s.","");
return false;
}
#ifndef PRRT_REPAIRBLOCKSTORE_H
#define PRRT_REPAIRBLOCKSTORE_H
#include <bits/pthreadtypes.h>
#include "../packet.h"
#include "../block.h"
#include "../../util/bptree.h"
typedef struct prrtRepairBlockStore {
pthread_mutex_t lock;
BPTreeNode *blockTree;
} PrrtRepairBlockStore;
PrrtRepairBlockStore *PrrtRepairBlockStore_create(void);
bool PrrtRepairBlockStore_delete(PrrtRepairBlockStore *store, prrtSequenceNumber_t sequenceNumber);
PrrtBlock *PrrtRepairBlockStore_get_block(PrrtRepairBlockStore *store, prrtSequenceNumber_t sequenceNumber);
bool PrrtRepairBlockStore_insert(PrrtRepairBlockStore *store, prrtSequenceNumber_t sequenceNumber, PrrtBlock *block);