Loading prrt/proto/processes/dataTransmitter.c +2 −2 Original line number Diff line number Diff line Loading @@ -107,7 +107,7 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { prrtTimestamp_t now = PrrtClock_get_current_time_us(); if (sock_ptr->pacingEnabled) { double pacing_rate = BBR_getPacingRate(sock_ptr->receiver->bbr); double pacing_rate = PrrtReceiver_getBBRPacingRate(sock_ptr->receiver); if(pacing_rate != 0) { prrtTimedelta_t pacingTime = (prrtTimedelta_t) round(((1000 * 1000 * ( (double)payloadLength)) / pacing_rate)); debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %f, Pacing Time: %u", packet->payloadLength, pacing_rate, pacingTime); Loading @@ -117,7 +117,7 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { // Update timestamp if(PrrtPacket_type(packet) == PACKET_TYPE_DATA) { ((PrrtPacketDataPayload*) (packet->payload))->timestamp = PrrtClock_get_current_time_us(); ((PrrtPacketDataPayload*) (packet->payload))->btlbw = BBR_getBtlBw(sock_ptr->receiver->bbr); ((PrrtPacketDataPayload*) (packet->payload))->btlbw = PrrtReceiver_getBBRBtlBw(sock_ptr->receiver); } else if(PrrtPacket_type(packet) == PACKET_TYPE_REDUNDANCY) { ((PrrtPacketRedundancyPayload*) (packet->payload))->timestamp = PrrtClock_get_current_time_us(); } Loading prrt/proto/receiver.c +54 −21 Original line number Diff line number Diff line Loading @@ -6,7 +6,26 @@ #include "stores/inFlightPacketStore.h" #include "receiver.h" PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port) { void update_rate_sample(PrrtRateSample *rateSample, PrrtPacket *packet, prrtTimestamp_t receiveTime, PrrtPacketTracking *packetTracking) { if (packet->delivered_time == 0) return; packetTracking->delivered += packet->payloadLength; packetTracking->delivered_time = receiveTime; if (packet->delivered > rateSample->prior_delivered) { rateSample->prior_delivered = packet->delivered; rateSample->prior_time = packet->delivered_time; rateSample->is_app_limited = packet->is_app_limited; rateSample->send_elapsed = packet->sent_time - packet->first_sent_time; rateSample->ack_elapsed = packetTracking->delivered_time - packet->delivered_time; packetTracking->first_sent_time = packet->sent_time; } packet->delivered_time = 0; } PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port, prrtByteCount_t mss) { PrrtReceiver *recv = calloc(1, sizeof(PrrtReceiver)); check_mem(recv); recv->host_name = strdup(host); Loading Loading @@ -148,25 +167,6 @@ prrtByteCount_t PrrtReceiver_get_pipe(PrrtReceiver *recv) { return 0; } void PrrtReceiver_updateRateSample(PrrtRateSample *rateSample, PrrtPacket *packet, prrtTimestamp_t receiveTime, PrrtPacketTracking *packetTracking) { if (packet->delivered_time == 0) return; packetTracking->delivered += packet->payloadLength; packetTracking->delivered_time = receiveTime; if (packet->delivered > rateSample->prior_delivered) { rateSample->prior_delivered = packet->delivered; rateSample->prior_time = packet->delivered_time; rateSample->is_app_limited = packet->is_app_limited; rateSample->send_elapsed = packet->sent_time - packet->first_sent_time; rateSample->ack_elapsed = packetTracking->delivered_time - packet->delivered_time; packetTracking->first_sent_time = packet->sent_time; } packet->delivered_time = 0; } void PrrtReceiver_on_application_write(PrrtReceiver *receiver, uint32_t send_queue_length, prrtSequenceNumber_t sequenceNumber) { check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed."); debug(DEBUG_RECEIVER, "OnApplicationWrite: %u, %d", sequenceNumber, send_queue_length); Loading Loading @@ -200,6 +200,39 @@ void PrrtReceiver_on_ack(PrrtReceiver *receiver, PrrtPacketFeedbackPayload * fee PERROR("Mutex error.%s", ""); } double PrrtReceiver_getBBRPacingRate(PrrtReceiver *receiver) { check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed."); double res = BBR_getPacingRate(receiver->bbr); check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed."); return res; error: PERROR("Mutex error.%s", ""); return 0.0; } prrtByteCount_t PrrtReceiver_getBBRCwnd(PrrtReceiver *receiver) { check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed."); prrtByteCount_t res = BBR_getCwnd(receiver->bbr); check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed."); return res; error: PERROR("Mutex error.%s", ""); return 0; } prrtDeliveryRate_t PrrtReceiver_getBBRBtlBw(PrrtReceiver *receiver) { check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed."); prrtDeliveryRate_t res = BBR_getBtlBw(receiver->bbr); check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed."); return res; error: PERROR("Mutex error.%s", ""); return 0; } bool PrrtReceiver_generateRateSample(PrrtRateSample *rateSample, PrrtPacketTracking *packetTracking) { /* Clear app-limited field */ Loading Loading @@ -244,7 +277,7 @@ bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNu check(pthread_mutex_lock(&recv->lock) == 0, "Lock failed."); PrrtPacket *packet = PrrtInFlightPacketStore_get_packet(inflightPacketStore, seqnum); if(packet != NULL) { PrrtReceiver_updateRateSample(recv->rateSample, packet, receiveTime, recv->packetTracking); update_rate_sample(recv->rateSample, packet, receiveTime, recv->packetTracking); prrtByteCount_t lostBytes = PrrtReceiver_removePacketAndLostData(recv, packet, inflightPacketStore, seqnum); recv->packetTracking->bytes_lost = lostBytes; Loading prrt/proto/receiver.h +5 −1 Original line number Diff line number Diff line Loading @@ -27,7 +27,7 @@ typedef struct prrtReceiver { PrrtPacketTracking *packetTracking; } PrrtReceiver; PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port); PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port, prrtByteCount_t mss); bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNumber_t seqnum, uint8_t packetType, prrtTimestamp_t receiveTime, prrtTimedelta_t rtt); Loading @@ -39,6 +39,10 @@ void PrrtReceiver_check_rto(PrrtReceiver *recv, prrtSequenceNumber_t i, prrtPack void PrrtReceiver_on_application_write(PrrtReceiver *receiver, uint32_t i, prrtSequenceNumber_t i1); void PrrtReceiver_on_ack(PrrtReceiver *receiver, PrrtPacketFeedbackPayload * feedbackPayload, prrtTimestamp_t receiveTime, prrtTimedelta_t rtt); double PrrtReceiver_getBBRPacingRate(PrrtReceiver *receiver); prrtDeliveryRate_t PrrtReceiver_getBBRBtlBw(PrrtReceiver *receiver); prrtByteCount_t PrrtReceiver_getBBRCwnd(PrrtReceiver *receiver); bool PrrtReceiver_destroy(PrrtReceiver *receiver); #endif //PRRT_RECEIVER_H prrt/proto/socket.c +3 −5 Original line number Diff line number Diff line Loading @@ -225,7 +225,7 @@ int PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port) { if(s->receiver != NULL) { PrrtReceiver_destroy(s->receiver); } s->receiver = PrrtReceiver_create(host, port); s->receiver = PrrtReceiver_create(host, port, s->mtu); return 0; } Loading Loading @@ -298,8 +298,7 @@ int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct soc PrrtPacket *packet; do { prrtTimestamp_t now = PrrtClock_get_current_time_us(); packet = PrrtPacketDeliveryStore_get_packet_wait(s->packetDeliveryStore, now, now + time_window_us); packet = PrrtPacketDeliveryStore_get_packet_wait(s->packetDeliveryStore, now, now + time_window_us); if (PrrtSocket_closing(s)) { return -1; } Loading @@ -310,8 +309,7 @@ int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct soc int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us, struct timespec* deadline) { prrtTimestamp_t now = PrrtClock_get_current_time_us(); PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now - time_window_us, now + time_window_us, deadline); PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now, now + time_window_us, deadline); if (packet == NULL && errno == ETIMEDOUT) { return -1 * ETIMEDOUT; } Loading Loading
prrt/proto/processes/dataTransmitter.c +2 −2 Original line number Diff line number Diff line Loading @@ -107,7 +107,7 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { prrtTimestamp_t now = PrrtClock_get_current_time_us(); if (sock_ptr->pacingEnabled) { double pacing_rate = BBR_getPacingRate(sock_ptr->receiver->bbr); double pacing_rate = PrrtReceiver_getBBRPacingRate(sock_ptr->receiver); if(pacing_rate != 0) { prrtTimedelta_t pacingTime = (prrtTimedelta_t) round(((1000 * 1000 * ( (double)payloadLength)) / pacing_rate)); debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %f, Pacing Time: %u", packet->payloadLength, pacing_rate, pacingTime); Loading @@ -117,7 +117,7 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { // Update timestamp if(PrrtPacket_type(packet) == PACKET_TYPE_DATA) { ((PrrtPacketDataPayload*) (packet->payload))->timestamp = PrrtClock_get_current_time_us(); ((PrrtPacketDataPayload*) (packet->payload))->btlbw = BBR_getBtlBw(sock_ptr->receiver->bbr); ((PrrtPacketDataPayload*) (packet->payload))->btlbw = PrrtReceiver_getBBRBtlBw(sock_ptr->receiver); } else if(PrrtPacket_type(packet) == PACKET_TYPE_REDUNDANCY) { ((PrrtPacketRedundancyPayload*) (packet->payload))->timestamp = PrrtClock_get_current_time_us(); } Loading
prrt/proto/receiver.c +54 −21 Original line number Diff line number Diff line Loading @@ -6,7 +6,26 @@ #include "stores/inFlightPacketStore.h" #include "receiver.h" PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port) { void update_rate_sample(PrrtRateSample *rateSample, PrrtPacket *packet, prrtTimestamp_t receiveTime, PrrtPacketTracking *packetTracking) { if (packet->delivered_time == 0) return; packetTracking->delivered += packet->payloadLength; packetTracking->delivered_time = receiveTime; if (packet->delivered > rateSample->prior_delivered) { rateSample->prior_delivered = packet->delivered; rateSample->prior_time = packet->delivered_time; rateSample->is_app_limited = packet->is_app_limited; rateSample->send_elapsed = packet->sent_time - packet->first_sent_time; rateSample->ack_elapsed = packetTracking->delivered_time - packet->delivered_time; packetTracking->first_sent_time = packet->sent_time; } packet->delivered_time = 0; } PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port, prrtByteCount_t mss) { PrrtReceiver *recv = calloc(1, sizeof(PrrtReceiver)); check_mem(recv); recv->host_name = strdup(host); Loading Loading @@ -148,25 +167,6 @@ prrtByteCount_t PrrtReceiver_get_pipe(PrrtReceiver *recv) { return 0; } void PrrtReceiver_updateRateSample(PrrtRateSample *rateSample, PrrtPacket *packet, prrtTimestamp_t receiveTime, PrrtPacketTracking *packetTracking) { if (packet->delivered_time == 0) return; packetTracking->delivered += packet->payloadLength; packetTracking->delivered_time = receiveTime; if (packet->delivered > rateSample->prior_delivered) { rateSample->prior_delivered = packet->delivered; rateSample->prior_time = packet->delivered_time; rateSample->is_app_limited = packet->is_app_limited; rateSample->send_elapsed = packet->sent_time - packet->first_sent_time; rateSample->ack_elapsed = packetTracking->delivered_time - packet->delivered_time; packetTracking->first_sent_time = packet->sent_time; } packet->delivered_time = 0; } void PrrtReceiver_on_application_write(PrrtReceiver *receiver, uint32_t send_queue_length, prrtSequenceNumber_t sequenceNumber) { check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed."); debug(DEBUG_RECEIVER, "OnApplicationWrite: %u, %d", sequenceNumber, send_queue_length); Loading Loading @@ -200,6 +200,39 @@ void PrrtReceiver_on_ack(PrrtReceiver *receiver, PrrtPacketFeedbackPayload * fee PERROR("Mutex error.%s", ""); } double PrrtReceiver_getBBRPacingRate(PrrtReceiver *receiver) { check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed."); double res = BBR_getPacingRate(receiver->bbr); check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed."); return res; error: PERROR("Mutex error.%s", ""); return 0.0; } prrtByteCount_t PrrtReceiver_getBBRCwnd(PrrtReceiver *receiver) { check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed."); prrtByteCount_t res = BBR_getCwnd(receiver->bbr); check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed."); return res; error: PERROR("Mutex error.%s", ""); return 0; } prrtDeliveryRate_t PrrtReceiver_getBBRBtlBw(PrrtReceiver *receiver) { check(pthread_mutex_lock(&receiver->lock) == 0, "Lock failed."); prrtDeliveryRate_t res = BBR_getBtlBw(receiver->bbr); check(pthread_mutex_unlock(&receiver->lock) == 0, "Unlock failed."); return res; error: PERROR("Mutex error.%s", ""); return 0; } bool PrrtReceiver_generateRateSample(PrrtRateSample *rateSample, PrrtPacketTracking *packetTracking) { /* Clear app-limited field */ Loading Loading @@ -244,7 +277,7 @@ bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNu check(pthread_mutex_lock(&recv->lock) == 0, "Lock failed."); PrrtPacket *packet = PrrtInFlightPacketStore_get_packet(inflightPacketStore, seqnum); if(packet != NULL) { PrrtReceiver_updateRateSample(recv->rateSample, packet, receiveTime, recv->packetTracking); update_rate_sample(recv->rateSample, packet, receiveTime, recv->packetTracking); prrtByteCount_t lostBytes = PrrtReceiver_removePacketAndLostData(recv, packet, inflightPacketStore, seqnum); recv->packetTracking->bytes_lost = lostBytes; Loading
prrt/proto/receiver.h +5 −1 Original line number Diff line number Diff line Loading @@ -27,7 +27,7 @@ typedef struct prrtReceiver { PrrtPacketTracking *packetTracking; } PrrtReceiver; PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port); PrrtReceiver *PrrtReceiver_create(const char *host, uint16_t port, prrtByteCount_t mss); bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNumber_t seqnum, uint8_t packetType, prrtTimestamp_t receiveTime, prrtTimedelta_t rtt); Loading @@ -39,6 +39,10 @@ void PrrtReceiver_check_rto(PrrtReceiver *recv, prrtSequenceNumber_t i, prrtPack void PrrtReceiver_on_application_write(PrrtReceiver *receiver, uint32_t i, prrtSequenceNumber_t i1); void PrrtReceiver_on_ack(PrrtReceiver *receiver, PrrtPacketFeedbackPayload * feedbackPayload, prrtTimestamp_t receiveTime, prrtTimedelta_t rtt); double PrrtReceiver_getBBRPacingRate(PrrtReceiver *receiver); prrtDeliveryRate_t PrrtReceiver_getBBRBtlBw(PrrtReceiver *receiver); prrtByteCount_t PrrtReceiver_getBBRCwnd(PrrtReceiver *receiver); bool PrrtReceiver_destroy(PrrtReceiver *receiver); #endif //PRRT_RECEIVER_H
prrt/proto/socket.c +3 −5 Original line number Diff line number Diff line Loading @@ -225,7 +225,7 @@ int PrrtSocket_connect(PrrtSocket *s, const char *host, const uint16_t port) { if(s->receiver != NULL) { PrrtReceiver_destroy(s->receiver); } s->receiver = PrrtReceiver_create(host, port); s->receiver = PrrtReceiver_create(host, port, s->mtu); return 0; } Loading Loading @@ -298,8 +298,7 @@ int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct soc PrrtPacket *packet; do { prrtTimestamp_t now = PrrtClock_get_current_time_us(); packet = PrrtPacketDeliveryStore_get_packet_wait(s->packetDeliveryStore, now, now + time_window_us); packet = PrrtPacketDeliveryStore_get_packet_wait(s->packetDeliveryStore, now, now + time_window_us); if (PrrtSocket_closing(s)) { return -1; } Loading @@ -310,8 +309,7 @@ int32_t PrrtSocket_receive_ordered_wait(PrrtSocket *s, void *buf_ptr, struct soc int32_t PrrtSocket_receive_ordered_timedwait(PrrtSocket *s, void *buf_ptr, struct sockaddr* addr, prrtTimedelta_t time_window_us, struct timespec* deadline) { prrtTimestamp_t now = PrrtClock_get_current_time_us(); PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now - time_window_us, now + time_window_us, deadline); PrrtPacket *packet = PrrtPacketDeliveryStore_get_packet_timedwait(s->packetDeliveryStore, now, now + time_window_us, deadline); if (packet == NULL && errno == ETIMEDOUT) { return -1 * ETIMEDOUT; } Loading