Loading prrt/cprrt.pxd +3 −3 Original line number Diff line number Diff line Loading @@ -21,13 +21,13 @@ cdef extern from "proto/vdmcode/block_code.h": ctypedef prrtCoder PrrtCoder cdef extern from "proto/channelStateInformation.h": cdef extern from "proto/types/channelStateInformation.h": cdef struct prrtChannelStateInformation: pass ctypedef prrtChannelStateInformation PrrtChannelStateInformation cdef extern from "proto/codingParams.h": cdef extern from "proto/types/codingParams.h": ctypedef struct prrtCodingConfiguration: uint8_t k uint8_t r Loading Loading @@ -59,7 +59,7 @@ cdef extern from "util/list.h": void *List_remove(List *list, const ListNode *node) cdef extern from "proto/block.h": cdef extern from "proto/types/block.h": cdef struct prrtBlock: uint32_t data_count uint32_t redundancy_count Loading prrt/proto/CMakeLists.txt +4 −4 Original line number Diff line number Diff line set (PRRT_SOURCES ../defines.h block.c block.h channelStateInformation.c channelStateInformation.h types/block.c types/block.h types/channelStateInformation.c types/channelStateInformation.h clock.c clock.h codingParams.c codingParams.h types/codingParams.c types/codingParams.h receiver.c receiver.h socket.c socket.h applicationConstraints.c applicationConstraints.h types/applicationConstraints.c types/applicationConstraints.h processes/dataReceiver.c processes/dataReceiver.h processes/dataTransmitter.c processes/dataTransmitter.h stores/dataPacketStore.c stores/dataPacketStore.h Loading prrt/proto/processes/dataReceiver.c +34 −26 Original line number Diff line number Diff line Loading @@ -5,13 +5,15 @@ #include "../../util/dbg.h" #include "../../util/common.h" #include "../types/lossStatistics.h" #include "../block.h" #include "../types/block.h" #include "../clock.h" #include "../socket.h" #include "dataReceiver.h" static void retrieve_data_blocks(PrrtSocket *sock_ptr, prrtSequenceNumber_t base_seqno, uint8_t k, const PrrtBlock *block) { static void retrieve_data_blocks(PrrtSocket *sock_ptr, prrtSequenceNumber_t base_seqno, uint8_t k, const PrrtBlock *block) { List *res = List_create(); PrrtDataPacketStore_remove_range(sock_ptr->dataPacketStore, res, base_seqno, Loading @@ -34,7 +36,8 @@ static void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block) { while (List_count(block->dataPackets) > 0) { PrrtPacket *pkt = List_shift(block->dataPackets); if (PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable, pkt->sequenceNumber)) { if (PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable, pkt->sequenceNumber)) { PrrtPacketDeliveryStore_insert(sock_ptr->packetDeliveryStore, pkt); } else { PrrtPacket_destroy(pkt); Loading @@ -50,8 +53,13 @@ static void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block) { PERROR("Decoding failed.%s", "") } static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote, prrtSequenceNumber_t seqno, prrtTimestamp_t receiveStamp, prrtTimestamp_t sentTimestamp, prrtPacketType_t type) { static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote, prrtSequenceNumber_t seqno, prrtTimestamp_t receiveStamp, prrtTimestamp_t sentTimestamp, prrtPacketType_t type) { enum XlapTimestampPacketKind kind = ts_data_packet; if (type == PACKET_TYPE_DATA) { kind = ts_data_packet; Loading Loading @@ -90,7 +98,8 @@ static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote, prrtS PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, sock_ptr->sequenceNumberFeedback++, group_RTT, stats.gapLength, stats.gapCount, stats.burstLength, stats.burstCount, forwardTripTime, stats.erasureCount, stats.packetCount, feedback.seqNo, stats.erasureCount, stats.packetCount, feedback.seqNo, feedback.type); prrtPacketLength_t length = PrrtPacket_size(feedback_pkt_ptr); void *buf = calloc(1, length); Loading Loading @@ -118,8 +127,7 @@ static bool is_timeout(prrtTimestamp_t now, prrtTimestamp_t to) { return false; } static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { PrrtPacketDataPayload *payload = packet->payload; prrtTimestamp_t sentTimestamp = payload->timestamp; sock_ptr->lastSentTimestamp = sentTimestamp; Loading @@ -137,9 +145,8 @@ handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { PrrtPacket_destroy(packet); debug(DEBUG_RECEIVER, "timeout data packet %u (%lu > %lu)", seqno, (unsigned long) now, (unsigned long) payload->packetTimeout_us); } else if (PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable, packet->sequenceNumber) == false) { } else if (!PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable, packet->sequenceNumber)) { PrrtPacket_destroy(packet); } else { prrtSequenceNumber_t baseSequenceNumber = packet->sequenceNumber - packet->index; Loading Loading @@ -168,7 +175,6 @@ handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { error: PERROR("Handling data packet failed%s.", ""); return; } static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) { Loading @@ -187,7 +193,8 @@ static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) { PrrtCodingConfiguration *codingParams = PrrtCodingConfiguration_create(redundancyPayload->k, redundancyPayload->n, 0, NULL); block = PrrtBlock_create(codingParams, PrrtSocket_get_matching_coder(socket, codingParams), redundancyPayload->baseSequenceNumber); block = PrrtBlock_create(codingParams, PrrtSocket_get_matching_coder(socket, codingParams), redundancyPayload->baseSequenceNumber); PrrtRepairBlockStore_insert(socket->repairBlockStore, block); } Loading @@ -200,7 +207,6 @@ static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) { PrrtPacket_destroy(packet); } } return; } void handle_feedback_packet(PrrtSocket *prrtSocket, PrrtPacket *prrtPacket, prrtTimestamp_t receiveTime) { Loading @@ -209,20 +215,24 @@ void handle_feedback_packet(PrrtSocket *prrtSocket, PrrtPacket *prrtPacket, prrt PrrtPacketFeedbackPayload *feedbackPayload = (PrrtPacketFeedbackPayload *) prrtPacket->payload; prrtTimestamp_t forwardTripTimestamp = feedbackPayload->forwardTripTimestamp_us; bool valid_sample = PrrtReceiver_updateAndGenerateRateSample(prrtSocket->receiver, feedbackPayload->ackSequenceNumber, feedbackPayload->ackPacketType, receiveTime); bool valid_sample = PrrtReceiver_updateAndGenerateRateSample(prrtSocket->receiver, feedbackPayload->ackSequenceNumber, feedbackPayload->ackPacketType, receiveTime); debug(DEBUG_DATARECEIVER, "PrrtReceiver_updateAndGenerateRateSample "); if (valid_sample) { PrrtChannelStateInformation_update_delivery_rate(prrtSocket->receiver->csi, prrtSocket->receiver->rateSample->delivery_rate); } PrrtChannelStateInformation_update_app_limited(prrtSocket->receiver->csi, prrtSocket->receiver->rateSample->is_app_limited); PrrtChannelStateInformation_update_app_limited(prrtSocket->receiver->csi, prrtSocket->receiver->rateSample->is_app_limited); debug(DEBUG_DATARECEIVER, "PrrtChannelStateInformation_update_app_limited "); PrrtChannelStateInformation_update_rtprop(prrtSocket->receiver->csi, (prrtTimedelta_t) (receiveTime - forwardTripTimestamp)); debug(DEBUG_DATARECEIVER, "PrrtChannelStateInformation_update_rtprop "); PrrtChannelStateInformation_update_plr(prrtSocket->receiver->csi, feedbackPayload->erasureCount, feedbackPayload->packetCount); PrrtChannelStateInformation_update_plr(prrtSocket->receiver->csi, feedbackPayload->erasureCount, feedbackPayload->packetCount); debug(DEBUG_DATARECEIVER, "PrrtChannelStateInformation_update_plr "); return; Loading @@ -231,7 +241,8 @@ void handle_feedback_packet(PrrtSocket *prrtSocket, PrrtPacket *prrtPacket, prrt } void receive_from_socket(PrrtSocket *socket_ptr, unsigned char buffer_ptr[65528], ssize_t *received_size, struct sockaddr_in *remote_ptr, socklen_t *remote_len_ptr, struct timespec *packet_timestamp_ptr, struct sockaddr_in *remote_ptr, socklen_t *remote_len_ptr, struct timespec *packet_timestamp_ptr, uint64_t *packet_cyclestamp_ptr) { if (socket_ptr->isHardwareTimestamping) { struct cmsghdr *cmsg; Loading Loading @@ -338,10 +349,8 @@ void *receive_data_loop(void *ptr) { if (packetType == PACKET_TYPE_DATA) { handle_data_packet(s, packet); } else if (packetType == PACKET_TYPE_REDUNDANCY) { handle_redundancy_packet(s, packet); } else { goto error; handle_redundancy_packet(s, packet); } send_feedback(s, remote, seqno, prrt_recv_timestamp, sentTimestamp, packetType); XlapTimeStampCycle(s, kind, seqno, HandlePacketEnd); Loading @@ -361,7 +370,6 @@ void *receive_data_loop(void *ptr) { return NULL; error: PNOTIMPLEMENTED("SHOULD IMPLEMENT ERROR HANDLER HERE"); PERROR("receive_data_loop() ended unexpectedly."); PrrtSocket_cleanup(s); return NULL; } prrt/proto/processes/dataTransmitter.c +1 −1 Original line number Diff line number Diff line Loading @@ -2,9 +2,9 @@ #include <netdb.h> #include <string.h> #include "../../defines.h" #include "../block.h" #include "../receiver.h" #include "../socket.h" #include "../types/block.h" #include "../../util/dbg.h" #include "../../util/common.h" #include "dataTransmitter.h" Loading prrt/proto/receiver.h +1 −1 Original line number Diff line number Diff line Loading @@ -6,8 +6,8 @@ #include <sys/types.h> #include <sys/socket.h> #include <netdb.h> #include "channelStateInformation.h" #include "stores/inFlightPacketStore.h" #include "types/channelStateInformation.h" typedef struct prrtRateSample { prrtByteCount_t prior_delivered; Loading Loading
prrt/cprrt.pxd +3 −3 Original line number Diff line number Diff line Loading @@ -21,13 +21,13 @@ cdef extern from "proto/vdmcode/block_code.h": ctypedef prrtCoder PrrtCoder cdef extern from "proto/channelStateInformation.h": cdef extern from "proto/types/channelStateInformation.h": cdef struct prrtChannelStateInformation: pass ctypedef prrtChannelStateInformation PrrtChannelStateInformation cdef extern from "proto/codingParams.h": cdef extern from "proto/types/codingParams.h": ctypedef struct prrtCodingConfiguration: uint8_t k uint8_t r Loading Loading @@ -59,7 +59,7 @@ cdef extern from "util/list.h": void *List_remove(List *list, const ListNode *node) cdef extern from "proto/block.h": cdef extern from "proto/types/block.h": cdef struct prrtBlock: uint32_t data_count uint32_t redundancy_count Loading
prrt/proto/CMakeLists.txt +4 −4 Original line number Diff line number Diff line set (PRRT_SOURCES ../defines.h block.c block.h channelStateInformation.c channelStateInformation.h types/block.c types/block.h types/channelStateInformation.c types/channelStateInformation.h clock.c clock.h codingParams.c codingParams.h types/codingParams.c types/codingParams.h receiver.c receiver.h socket.c socket.h applicationConstraints.c applicationConstraints.h types/applicationConstraints.c types/applicationConstraints.h processes/dataReceiver.c processes/dataReceiver.h processes/dataTransmitter.c processes/dataTransmitter.h stores/dataPacketStore.c stores/dataPacketStore.h Loading
prrt/proto/processes/dataReceiver.c +34 −26 Original line number Diff line number Diff line Loading @@ -5,13 +5,15 @@ #include "../../util/dbg.h" #include "../../util/common.h" #include "../types/lossStatistics.h" #include "../block.h" #include "../types/block.h" #include "../clock.h" #include "../socket.h" #include "dataReceiver.h" static void retrieve_data_blocks(PrrtSocket *sock_ptr, prrtSequenceNumber_t base_seqno, uint8_t k, const PrrtBlock *block) { static void retrieve_data_blocks(PrrtSocket *sock_ptr, prrtSequenceNumber_t base_seqno, uint8_t k, const PrrtBlock *block) { List *res = List_create(); PrrtDataPacketStore_remove_range(sock_ptr->dataPacketStore, res, base_seqno, Loading @@ -34,7 +36,8 @@ static void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block) { while (List_count(block->dataPackets) > 0) { PrrtPacket *pkt = List_shift(block->dataPackets); if (PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable, pkt->sequenceNumber)) { if (PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable, pkt->sequenceNumber)) { PrrtPacketDeliveryStore_insert(sock_ptr->packetDeliveryStore, pkt); } else { PrrtPacket_destroy(pkt); Loading @@ -50,8 +53,13 @@ static void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block) { PERROR("Decoding failed.%s", "") } static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote, prrtSequenceNumber_t seqno, prrtTimestamp_t receiveStamp, prrtTimestamp_t sentTimestamp, prrtPacketType_t type) { static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote, prrtSequenceNumber_t seqno, prrtTimestamp_t receiveStamp, prrtTimestamp_t sentTimestamp, prrtPacketType_t type) { enum XlapTimestampPacketKind kind = ts_data_packet; if (type == PACKET_TYPE_DATA) { kind = ts_data_packet; Loading Loading @@ -90,7 +98,8 @@ static bool send_feedback(PrrtSocket *sock_ptr, struct sockaddr_in remote, prrtS PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, sock_ptr->sequenceNumberFeedback++, group_RTT, stats.gapLength, stats.gapCount, stats.burstLength, stats.burstCount, forwardTripTime, stats.erasureCount, stats.packetCount, feedback.seqNo, stats.erasureCount, stats.packetCount, feedback.seqNo, feedback.type); prrtPacketLength_t length = PrrtPacket_size(feedback_pkt_ptr); void *buf = calloc(1, length); Loading Loading @@ -118,8 +127,7 @@ static bool is_timeout(prrtTimestamp_t now, prrtTimestamp_t to) { return false; } static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { PrrtPacketDataPayload *payload = packet->payload; prrtTimestamp_t sentTimestamp = payload->timestamp; sock_ptr->lastSentTimestamp = sentTimestamp; Loading @@ -137,9 +145,8 @@ handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { PrrtPacket_destroy(packet); debug(DEBUG_RECEIVER, "timeout data packet %u (%lu > %lu)", seqno, (unsigned long) now, (unsigned long) payload->packetTimeout_us); } else if (PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable, packet->sequenceNumber) == false) { } else if (!PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable, packet->sequenceNumber)) { PrrtPacket_destroy(packet); } else { prrtSequenceNumber_t baseSequenceNumber = packet->sequenceNumber - packet->index; Loading Loading @@ -168,7 +175,6 @@ handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { error: PERROR("Handling data packet failed%s.", ""); return; } static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) { Loading @@ -187,7 +193,8 @@ static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) { PrrtCodingConfiguration *codingParams = PrrtCodingConfiguration_create(redundancyPayload->k, redundancyPayload->n, 0, NULL); block = PrrtBlock_create(codingParams, PrrtSocket_get_matching_coder(socket, codingParams), redundancyPayload->baseSequenceNumber); block = PrrtBlock_create(codingParams, PrrtSocket_get_matching_coder(socket, codingParams), redundancyPayload->baseSequenceNumber); PrrtRepairBlockStore_insert(socket->repairBlockStore, block); } Loading @@ -200,7 +207,6 @@ static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) { PrrtPacket_destroy(packet); } } return; } void handle_feedback_packet(PrrtSocket *prrtSocket, PrrtPacket *prrtPacket, prrtTimestamp_t receiveTime) { Loading @@ -209,20 +215,24 @@ void handle_feedback_packet(PrrtSocket *prrtSocket, PrrtPacket *prrtPacket, prrt PrrtPacketFeedbackPayload *feedbackPayload = (PrrtPacketFeedbackPayload *) prrtPacket->payload; prrtTimestamp_t forwardTripTimestamp = feedbackPayload->forwardTripTimestamp_us; bool valid_sample = PrrtReceiver_updateAndGenerateRateSample(prrtSocket->receiver, feedbackPayload->ackSequenceNumber, feedbackPayload->ackPacketType, receiveTime); bool valid_sample = PrrtReceiver_updateAndGenerateRateSample(prrtSocket->receiver, feedbackPayload->ackSequenceNumber, feedbackPayload->ackPacketType, receiveTime); debug(DEBUG_DATARECEIVER, "PrrtReceiver_updateAndGenerateRateSample "); if (valid_sample) { PrrtChannelStateInformation_update_delivery_rate(prrtSocket->receiver->csi, prrtSocket->receiver->rateSample->delivery_rate); } PrrtChannelStateInformation_update_app_limited(prrtSocket->receiver->csi, prrtSocket->receiver->rateSample->is_app_limited); PrrtChannelStateInformation_update_app_limited(prrtSocket->receiver->csi, prrtSocket->receiver->rateSample->is_app_limited); debug(DEBUG_DATARECEIVER, "PrrtChannelStateInformation_update_app_limited "); PrrtChannelStateInformation_update_rtprop(prrtSocket->receiver->csi, (prrtTimedelta_t) (receiveTime - forwardTripTimestamp)); debug(DEBUG_DATARECEIVER, "PrrtChannelStateInformation_update_rtprop "); PrrtChannelStateInformation_update_plr(prrtSocket->receiver->csi, feedbackPayload->erasureCount, feedbackPayload->packetCount); PrrtChannelStateInformation_update_plr(prrtSocket->receiver->csi, feedbackPayload->erasureCount, feedbackPayload->packetCount); debug(DEBUG_DATARECEIVER, "PrrtChannelStateInformation_update_plr "); return; Loading @@ -231,7 +241,8 @@ void handle_feedback_packet(PrrtSocket *prrtSocket, PrrtPacket *prrtPacket, prrt } void receive_from_socket(PrrtSocket *socket_ptr, unsigned char buffer_ptr[65528], ssize_t *received_size, struct sockaddr_in *remote_ptr, socklen_t *remote_len_ptr, struct timespec *packet_timestamp_ptr, struct sockaddr_in *remote_ptr, socklen_t *remote_len_ptr, struct timespec *packet_timestamp_ptr, uint64_t *packet_cyclestamp_ptr) { if (socket_ptr->isHardwareTimestamping) { struct cmsghdr *cmsg; Loading Loading @@ -338,10 +349,8 @@ void *receive_data_loop(void *ptr) { if (packetType == PACKET_TYPE_DATA) { handle_data_packet(s, packet); } else if (packetType == PACKET_TYPE_REDUNDANCY) { handle_redundancy_packet(s, packet); } else { goto error; handle_redundancy_packet(s, packet); } send_feedback(s, remote, seqno, prrt_recv_timestamp, sentTimestamp, packetType); XlapTimeStampCycle(s, kind, seqno, HandlePacketEnd); Loading @@ -361,7 +370,6 @@ void *receive_data_loop(void *ptr) { return NULL; error: PNOTIMPLEMENTED("SHOULD IMPLEMENT ERROR HANDLER HERE"); PERROR("receive_data_loop() ended unexpectedly."); PrrtSocket_cleanup(s); return NULL; }
prrt/proto/processes/dataTransmitter.c +1 −1 Original line number Diff line number Diff line Loading @@ -2,9 +2,9 @@ #include <netdb.h> #include <string.h> #include "../../defines.h" #include "../block.h" #include "../receiver.h" #include "../socket.h" #include "../types/block.h" #include "../../util/dbg.h" #include "../../util/common.h" #include "dataTransmitter.h" Loading
prrt/proto/receiver.h +1 −1 Original line number Diff line number Diff line Loading @@ -6,8 +6,8 @@ #include <sys/types.h> #include <sys/socket.h> #include <netdb.h> #include "channelStateInformation.h" #include "stores/inFlightPacketStore.h" #include "types/channelStateInformation.h" typedef struct prrtRateSample { prrtByteCount_t prior_delivered; Loading