Commit fe116471 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Cleaning up data store and block store. Proper locking in dataPacketStore.

parent 744f9201
Pipeline #106 failed with stage
...@@ -16,16 +16,17 @@ void *cleanup(void *ptr) ...@@ -16,16 +16,17 @@ void *cleanup(void *ptr)
socket->clock)); socket->clock));
if(List_count(expired_packets) > 0) { if(List_count(expired_packets) > 0) {
PrrtPacket *first = List_first(expired_packets); PrrtPacket *first = List_first(expired_packets);
prrtSequenceNumber_t firstSequenceNumberBase = first->sequenceNumber - first->index; prrtSequenceNumber_t firstSequenceNumberBase = first->sequenceNumber - first->index - SEQNO_SPACE/4;
PrrtPacket *last = List_last(expired_packets); PrrtPacket *last = List_last(expired_packets);
prrtSequenceNumber_t lastSequenceNumberBase = last->sequenceNumber - last->index - 1; prrtSequenceNumber_t lastSequenceNumberBase = last->sequenceNumber - last->index - 1;
// TODO: clean block store
PrrtRepairBlockStore_expire_block_range(socket->repairBlockStore, firstSequenceNumberBase, PrrtRepairBlockStore_expire_block_range(socket->repairBlockStore, firstSequenceNumberBase,
lastSequenceNumberBase); lastSequenceNumberBase);
// TODO: clean data store List *list = List_create();
PrrtDataPacketStore_remove_range(socket->dataPacketStore, list, firstSequenceNumberBase, last->sequenceNumber);
List_destroy(list);
while(List_count(expired_packets) > 0) { while(List_count(expired_packets) > 0) {
PrrtPacket* packet = (PrrtPacket*) List_shift(expired_packets); PrrtPacket* packet = (PrrtPacket*) List_shift(expired_packets);
......
...@@ -13,14 +13,17 @@ void retrieve_data_blocks(PrrtSocket *sock_ptr, prrtSequenceNumber_t base_seqno, ...@@ -13,14 +13,17 @@ void retrieve_data_blocks(PrrtSocket *sock_ptr, prrtSequenceNumber_t base_seqno,
{ {
List *res = List_create(); List *res = List_create();
PrrtDataPacketStore_remove_range(sock_ptr->dataPacketStore, res, base_seqno, (prrtSequenceNumber_t) (base_seqno + k - 1)); PrrtDataPacketStore_remove_range(sock_ptr->dataPacketStore, res, base_seqno,
(prrtSequenceNumber_t) (base_seqno + k - 1));
LIST_FOREACH(res, first, next, cur) { LIST_FOREACH(res, first, next, cur) {
PrrtPacket *packetPtr = cur->value; PrrtPacket *packetPtr = cur->value;
check(PrrtBlock_insert_data_packet((PrrtBlock *) block, packetPtr), "Insert failed!") check(PrrtBlock_insert_data_packet((PrrtBlock *) block, packetPtr), "Insert failed!")
} }
List_destroy(res);
return;
error: error:
PERROR("Insert failed.");
List_destroy(res); List_destroy(res);
} }
......
...@@ -59,19 +59,34 @@ bool PrrtDataPacketStore_destroy(PrrtDataPacketStore *store) ...@@ -59,19 +59,34 @@ bool PrrtDataPacketStore_destroy(PrrtDataPacketStore *store)
void PrrtDataPacketStore_remove_range(PrrtDataPacketStore *store, List *res, prrtSequenceNumber_t start, prrtSequenceNumber_t stop) void PrrtDataPacketStore_remove_range(PrrtDataPacketStore *store, List *res, prrtSequenceNumber_t start, prrtSequenceNumber_t stop)
{ {
// TODO: handle if start > stop check(pthread_mutex_lock(&store->lock) == EXIT_SUCCESS, "Lock failed.");
BPTree_get_range(store->dataStore, res, start, stop);
if(start > stop) {
BPTree_get_range(store->dataStore, res, start, SEQNO_SPACE-1);
BPTree_get_range(store->dataStore, res, 0, stop);
} else {
BPTree_get_range(store->dataStore, res, start, stop);
}
LIST_FOREACH(res, first, next, current) { LIST_FOREACH(res, first, next, current) {
PrrtPacket *packet = current->value; PrrtPacket *packet = current->value;
store->dataStore = BPTree_delete(store->dataStore, packet->sequenceNumber); store->dataStore = BPTree_delete(store->dataStore, packet->sequenceNumber);
} }
check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed.");
return;
error:
PERROR("Failed to remove range%s.","");
} }
bool PrrtDataStore_insert(PrrtDataPacketStore *store, PrrtPacket *packet) bool PrrtDataStore_insert(PrrtDataPacketStore *store, PrrtPacket *packet)
{ {
check(pthread_mutex_lock(&store->lock) == EXIT_SUCCESS, "Lock failed.");
if(BPTree_get(store->dataStore, packet->sequenceNumber) == NULL) { if(BPTree_get(store->dataStore, packet->sequenceNumber) == NULL) {
store->dataStore = BPTree_insert(store->dataStore, packet->sequenceNumber, packet); store->dataStore = BPTree_insert(store->dataStore, packet->sequenceNumber, packet);
check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed.");
return true; return true;
} }
check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed.");
error:
return false; return false;
} }
\ No newline at end of file
...@@ -88,10 +88,15 @@ bool PrrtRepairBlockStore_expire_block_range(PrrtRepairBlockStore *store, prrtSe ...@@ -88,10 +88,15 @@ bool PrrtRepairBlockStore_expire_block_range(PrrtRepairBlockStore *store, prrtSe
{ {
check(pthread_mutex_lock(&store->lock) == EXIT_SUCCESS, "Lock failed."); check(pthread_mutex_lock(&store->lock) == EXIT_SUCCESS, "Lock failed.");
while(start != stop) { while(start != stop) {
store->blockTree = BPTree_delete(store->blockTree, start); PrrtBlock* block = BPTree_get(store->blockTree, start);
if(block != NULL) {
PrrtBlock_destroy(block);
store->blockTree = BPTree_delete(store->blockTree, start);
}
start++; start++;
} }
check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed."); check(pthread_mutex_unlock(&store->lock) == EXIT_SUCCESS, "Unlock failed.");
return true; return true;
error: error:
PERROR("Could not expire%s.",""); PERROR("Could not expire%s.","");
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment