Loading prrt/proto/socket.c +7 −1 Original line number Diff line number Diff line Loading @@ -49,6 +49,8 @@ PrrtSocket *PrrtSocket_create(prrtByteCount_t maximum_payload_size, prrtTimedelt s->interfaceName = NULL; s->isThreadPinning = false; s->isGapFilling = true; s->gapFillingByte = 'A'; PrrtClock_init(&s->clock); Loading Loading @@ -475,7 +477,11 @@ bool PrrtSocket_cleanup(PrrtSocket *s) { prrtSequenceNumber_t lastSequenceNumberBase = (prrtSequenceNumber_t) (last->sequenceNumber - last->index - 1); PrrtDeliveredPacketTable_forward_start(s->deliveredPacketTable, lastSequenceNumberBase); prrtSequenceNumber_t purgePackets = PrrtDeliveredPacketTable_forward_start(s->deliveredPacketTable, lastSequenceNumberBase); if(s->isGapFilling) { PrrtDeliveryStore_insert_gap_packets(s->packetDeliveryStore, purgePackets, s->gapFillingByte, s->maximum_payload_size); } PrrtRepairBlockStore_expire_block_range(s->repairBlockStore, firstSequenceNumberBase, lastSequenceNumberBase); Loading prrt/proto/socket.h +2 −0 Original line number Diff line number Diff line Loading @@ -79,6 +79,8 @@ typedef struct prrtSocket { atomic_bool isThreadPinning; prrtByteCount_t maximum_payload_size; bool isGapFilling; uint8_t gapFillingByte; } PrrtSocket; Loading prrt/proto/stores/deliveredPacketTable.c +9 −4 Original line number Diff line number Diff line Loading @@ -88,18 +88,23 @@ bool PrrtDeliveredPacketTable_test_is_block_relevant(PrrtDeliveredPacketTable *t return res; } void PrrtDeliveredPacketTable_forward_start(PrrtDeliveredPacketTable *fpt_ptr, prrtSequenceNumber_t new_start) prrtSequenceNumber_t PrrtDeliveredPacketTable_forward_start(PrrtDeliveredPacketTable *fpt_ptr, prrtSequenceNumber_t new_start) { pthread_mutex_lock(&fpt_ptr->lock); prrtSequenceNumber_t purged_packets = 0; check(pthread_mutex_lock(&fpt_ptr->lock), "Lock failed."); prrtSequenceNumber_t seqno = fpt_ptr->start; purged_packets = new_start - fpt_ptr->start; while (seqno != new_start) { Bitmap_set(fpt_ptr->bitmap, seqno, true); seqno++; } // TODO: make more efficient by using the Bitmap_set_range function fpt_ptr->start = seqno; pthread_mutex_unlock(&fpt_ptr->lock); return purged_packets; error: PERROR("PrrtDeliveredPacketTable_forward_start failed.") return 0; } prrt/proto/stores/deliveredPacketTable.h +1 −1 Original line number Diff line number Diff line Loading @@ -22,7 +22,7 @@ bool PrrtDeliveredPacketTable_test_is_block_relevant(PrrtDeliveredPacketTable *t prrtSequenceNumber_t start, prrtSequenceNumber_t length); void PrrtDeliveredPacketTable_forward_start(PrrtDeliveredPacketTable *fpt_ptr, prrtSequenceNumber_t new_start); prrtSequenceNumber_t PrrtDeliveredPacketTable_forward_start(PrrtDeliveredPacketTable *fpt_ptr, prrtSequenceNumber_t new_start); #endif //PRRT_DELIVERED_PACKET_TABLE_H prrt/proto/stores/packetDeliveryStore.c +24 −2 Original line number Diff line number Diff line Loading @@ -130,10 +130,31 @@ bool PrrtPacketDeliveryStore_insert(PrrtPacketDeliveryStore *q, PrrtPacket *pack return true; error: PERROR("Insert failed%s.", ""); PERROR("PrrtPacketDeliveryStore_insert failed%s.", ""); return false; } bool PrrtDeliveryStore_insert_gap_packets(PrrtPacketDeliveryStore *pds, prrtSequenceNumber_t packets, uint8_t gap_filling_byte, prrtByteCount_t gap_size) { check(pthread_mutex_lock(&pds->lock) == EXIT_SUCCESS, "Lock failed."); int i = 0; uint8_t* buffer = calloc(1, gap_size); PrrtPacket* packet = PrrtPacket_create_data_packet(0, buffer, gap_size, ) for (i = 0; i < packets; i++) { } check(pthread_mutex_unlock(&pds->lock) == EXIT_SUCCESS, "Unlock failed."); return true; error: PERROR("PrrtDeliveryStore_insert_gap_packets failed%s.", ""); return false; } bool PrrtPacketDeliveryStore_destroy(PrrtPacketDeliveryStore *q) { List *packetList = List_create(); BPTree_get_range(q->tree, packetList, 0, (prrtSequenceNumber_t) (SEQNO_SPACE - 1)); Loading Loading @@ -161,3 +182,4 @@ void PrrtPacketDeliveryStore_interrupt(PrrtPacketDeliveryStore *q) { pthread_cond_broadcast(&q->wait_for_data); pthread_mutex_unlock(&q->lock); } Loading
prrt/proto/socket.c +7 −1 Original line number Diff line number Diff line Loading @@ -49,6 +49,8 @@ PrrtSocket *PrrtSocket_create(prrtByteCount_t maximum_payload_size, prrtTimedelt s->interfaceName = NULL; s->isThreadPinning = false; s->isGapFilling = true; s->gapFillingByte = 'A'; PrrtClock_init(&s->clock); Loading Loading @@ -475,7 +477,11 @@ bool PrrtSocket_cleanup(PrrtSocket *s) { prrtSequenceNumber_t lastSequenceNumberBase = (prrtSequenceNumber_t) (last->sequenceNumber - last->index - 1); PrrtDeliveredPacketTable_forward_start(s->deliveredPacketTable, lastSequenceNumberBase); prrtSequenceNumber_t purgePackets = PrrtDeliveredPacketTable_forward_start(s->deliveredPacketTable, lastSequenceNumberBase); if(s->isGapFilling) { PrrtDeliveryStore_insert_gap_packets(s->packetDeliveryStore, purgePackets, s->gapFillingByte, s->maximum_payload_size); } PrrtRepairBlockStore_expire_block_range(s->repairBlockStore, firstSequenceNumberBase, lastSequenceNumberBase); Loading
prrt/proto/socket.h +2 −0 Original line number Diff line number Diff line Loading @@ -79,6 +79,8 @@ typedef struct prrtSocket { atomic_bool isThreadPinning; prrtByteCount_t maximum_payload_size; bool isGapFilling; uint8_t gapFillingByte; } PrrtSocket; Loading
prrt/proto/stores/deliveredPacketTable.c +9 −4 Original line number Diff line number Diff line Loading @@ -88,18 +88,23 @@ bool PrrtDeliveredPacketTable_test_is_block_relevant(PrrtDeliveredPacketTable *t return res; } void PrrtDeliveredPacketTable_forward_start(PrrtDeliveredPacketTable *fpt_ptr, prrtSequenceNumber_t new_start) prrtSequenceNumber_t PrrtDeliveredPacketTable_forward_start(PrrtDeliveredPacketTable *fpt_ptr, prrtSequenceNumber_t new_start) { pthread_mutex_lock(&fpt_ptr->lock); prrtSequenceNumber_t purged_packets = 0; check(pthread_mutex_lock(&fpt_ptr->lock), "Lock failed."); prrtSequenceNumber_t seqno = fpt_ptr->start; purged_packets = new_start - fpt_ptr->start; while (seqno != new_start) { Bitmap_set(fpt_ptr->bitmap, seqno, true); seqno++; } // TODO: make more efficient by using the Bitmap_set_range function fpt_ptr->start = seqno; pthread_mutex_unlock(&fpt_ptr->lock); return purged_packets; error: PERROR("PrrtDeliveredPacketTable_forward_start failed.") return 0; }
prrt/proto/stores/deliveredPacketTable.h +1 −1 Original line number Diff line number Diff line Loading @@ -22,7 +22,7 @@ bool PrrtDeliveredPacketTable_test_is_block_relevant(PrrtDeliveredPacketTable *t prrtSequenceNumber_t start, prrtSequenceNumber_t length); void PrrtDeliveredPacketTable_forward_start(PrrtDeliveredPacketTable *fpt_ptr, prrtSequenceNumber_t new_start); prrtSequenceNumber_t PrrtDeliveredPacketTable_forward_start(PrrtDeliveredPacketTable *fpt_ptr, prrtSequenceNumber_t new_start); #endif //PRRT_DELIVERED_PACKET_TABLE_H
prrt/proto/stores/packetDeliveryStore.c +24 −2 Original line number Diff line number Diff line Loading @@ -130,10 +130,31 @@ bool PrrtPacketDeliveryStore_insert(PrrtPacketDeliveryStore *q, PrrtPacket *pack return true; error: PERROR("Insert failed%s.", ""); PERROR("PrrtPacketDeliveryStore_insert failed%s.", ""); return false; } bool PrrtDeliveryStore_insert_gap_packets(PrrtPacketDeliveryStore *pds, prrtSequenceNumber_t packets, uint8_t gap_filling_byte, prrtByteCount_t gap_size) { check(pthread_mutex_lock(&pds->lock) == EXIT_SUCCESS, "Lock failed."); int i = 0; uint8_t* buffer = calloc(1, gap_size); PrrtPacket* packet = PrrtPacket_create_data_packet(0, buffer, gap_size, ) for (i = 0; i < packets; i++) { } check(pthread_mutex_unlock(&pds->lock) == EXIT_SUCCESS, "Unlock failed."); return true; error: PERROR("PrrtDeliveryStore_insert_gap_packets failed%s.", ""); return false; } bool PrrtPacketDeliveryStore_destroy(PrrtPacketDeliveryStore *q) { List *packetList = List_create(); BPTree_get_range(q->tree, packetList, 0, (prrtSequenceNumber_t) (SEQNO_SPACE - 1)); Loading Loading @@ -161,3 +182,4 @@ void PrrtPacketDeliveryStore_interrupt(PrrtPacketDeliveryStore *q) { pthread_cond_broadcast(&q->wait_for_data); pthread_mutex_unlock(&q->lock); }