Loading prrt/proto/bbr.c +1 −1 Original line number Diff line number Diff line Loading @@ -10,7 +10,7 @@ prrtByteCount_t BBR_Inflight(BBR* bbr, double gain) if (bbr->rtprop == Inf) return InitialCwnd; /* no valid RTT samples yet */ uint32_t quanta = 0; uint32_t estimated_bdp = (uint32_t) round(((double)bbr->bw * bbr->rtprop) / (1000 * 1000)); uint32_t estimated_bdp = (uint32_t) round((((double)bbr->bw) * bbr->rtprop) / (1000 * 1000)); return (uint32_t)(gain * estimated_bdp + quanta); } Loading prrt/proto/processes/dataTransmitter.c +7 −3 Original line number Diff line number Diff line Loading @@ -94,14 +94,18 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { default:; } double payloadLength = (double)packet->payloadLength; if (sock_ptr->pacingEnabled) { prrtTimeDifference_t diff; prrtByteCount_t space; do { diff = sock_ptr->nextSendTime - PrrtClock_get_current_time_us(); if (diff > 0) { space = MAX(BBR_getCwnd(sock_ptr->receiver->bbr) - PrrtReceiver_get_pipe(sock_ptr->receiver), 0); if (diff > 0 && payloadLength <= space) { usleep_nano(diff / 2); } } while (diff > 0); } while (diff > 0 && payloadLength <= space); } else { usleep_nano(1); } Loading @@ -112,7 +116,7 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { if(pacing_rate != 0) { debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %u, Pacing Time: %u", packet->payloadLength, pacing_rate, packet->payloadLength / pacing_rate); sock_ptr->nextSendTime = now + ((prrtTimedelta_t) round(((1000 * 1000 * ((double)packet->payloadLength)) / pacing_rate))); sock_ptr->nextSendTime = now + ((prrtTimedelta_t) round(((1000 * 1000 * payloadLength) / pacing_rate))); } } Loading prrt/proto/receiver.c +12 −0 Original line number Diff line number Diff line Loading @@ -99,6 +99,18 @@ bool PrrtReceiver_destroy(PrrtReceiver *receiver) { return false; } prrtByteCount_t PrrtReceiver_get_pipe(PrrtReceiver *recv) { prrtByteCount_t res = 0; check(pthread_mutex_lock(&recv->lock) == 0, "Lock failed."); res = recv->packetTracking->pipe; check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed."); return res; error: PERROR("PrrtReceiver_get_pipe failed.") return 0; } void PrrtReceiver_updateRateSample(PrrtRateSample *rateSample, PrrtPacket *packet, prrtTimestamp_t receiveTime, PrrtPacketTracking *packetTracking) { if (packet->delivered_time == 0) Loading prrt/proto/receiver.h +1 −0 Original line number Diff line number Diff line Loading @@ -35,6 +35,7 @@ bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNu prrtTimestamp_t receiveTime, prrtTimedelta_t rtt); void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *packet, prrtTimestamp_t sentTime); prrtByteCount_t PrrtReceiver_get_pipe(PrrtReceiver *recv); void PrrtReceiver_on_application_write(PrrtReceiver *receiver, uint32_t i); void PrrtReceiver_on_ack(PrrtReceiver *receiver, PrrtPacketFeedbackPayload * feedbackPayload, prrtTimestamp_t receiveTime, prrtTimedelta_t rtt); Loading prrt/proto/socket.c +1 −1 Original line number Diff line number Diff line Loading @@ -637,7 +637,7 @@ prrtByteCount_t PrrtSocket_get_send_quantum(PrrtSocket *s) { }; prrtByteCount_t PrrtSocket_get_pipe(PrrtSocket *s) { return s->receiver->packetTracking->pipe; return PrrtReceiver_get_pipe(s->receiver); }; prrtByteCount_t PrrtSocket_get_delivered(PrrtSocket *s) { Loading Loading
prrt/proto/bbr.c +1 −1 Original line number Diff line number Diff line Loading @@ -10,7 +10,7 @@ prrtByteCount_t BBR_Inflight(BBR* bbr, double gain) if (bbr->rtprop == Inf) return InitialCwnd; /* no valid RTT samples yet */ uint32_t quanta = 0; uint32_t estimated_bdp = (uint32_t) round(((double)bbr->bw * bbr->rtprop) / (1000 * 1000)); uint32_t estimated_bdp = (uint32_t) round((((double)bbr->bw) * bbr->rtprop) / (1000 * 1000)); return (uint32_t)(gain * estimated_bdp + quanta); } Loading
prrt/proto/processes/dataTransmitter.c +7 −3 Original line number Diff line number Diff line Loading @@ -94,14 +94,18 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { default:; } double payloadLength = (double)packet->payloadLength; if (sock_ptr->pacingEnabled) { prrtTimeDifference_t diff; prrtByteCount_t space; do { diff = sock_ptr->nextSendTime - PrrtClock_get_current_time_us(); if (diff > 0) { space = MAX(BBR_getCwnd(sock_ptr->receiver->bbr) - PrrtReceiver_get_pipe(sock_ptr->receiver), 0); if (diff > 0 && payloadLength <= space) { usleep_nano(diff / 2); } } while (diff > 0); } while (diff > 0 && payloadLength <= space); } else { usleep_nano(1); } Loading @@ -112,7 +116,7 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { if(pacing_rate != 0) { debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %u, Pacing Time: %u", packet->payloadLength, pacing_rate, packet->payloadLength / pacing_rate); sock_ptr->nextSendTime = now + ((prrtTimedelta_t) round(((1000 * 1000 * ((double)packet->payloadLength)) / pacing_rate))); sock_ptr->nextSendTime = now + ((prrtTimedelta_t) round(((1000 * 1000 * payloadLength) / pacing_rate))); } } Loading
prrt/proto/receiver.c +12 −0 Original line number Diff line number Diff line Loading @@ -99,6 +99,18 @@ bool PrrtReceiver_destroy(PrrtReceiver *receiver) { return false; } prrtByteCount_t PrrtReceiver_get_pipe(PrrtReceiver *recv) { prrtByteCount_t res = 0; check(pthread_mutex_lock(&recv->lock) == 0, "Lock failed."); res = recv->packetTracking->pipe; check(pthread_mutex_unlock(&recv->lock) == 0, "Unlock failed."); return res; error: PERROR("PrrtReceiver_get_pipe failed.") return 0; } void PrrtReceiver_updateRateSample(PrrtRateSample *rateSample, PrrtPacket *packet, prrtTimestamp_t receiveTime, PrrtPacketTracking *packetTracking) { if (packet->delivered_time == 0) Loading
prrt/proto/receiver.h +1 −0 Original line number Diff line number Diff line Loading @@ -35,6 +35,7 @@ bool PrrtReceiver_updateAndGenerateRateSample(PrrtReceiver *recv, prrtSequenceNu prrtTimestamp_t receiveTime, prrtTimedelta_t rtt); void PrrtReceiver_add_outstanding_packet_state(PrrtReceiver *recv, PrrtPacket *packet, prrtTimestamp_t sentTime); prrtByteCount_t PrrtReceiver_get_pipe(PrrtReceiver *recv); void PrrtReceiver_on_application_write(PrrtReceiver *receiver, uint32_t i); void PrrtReceiver_on_ack(PrrtReceiver *receiver, PrrtPacketFeedbackPayload * feedbackPayload, prrtTimestamp_t receiveTime, prrtTimedelta_t rtt); Loading
prrt/proto/socket.c +1 −1 Original line number Diff line number Diff line Loading @@ -637,7 +637,7 @@ prrtByteCount_t PrrtSocket_get_send_quantum(PrrtSocket *s) { }; prrtByteCount_t PrrtSocket_get_pipe(PrrtSocket *s) { return s->receiver->packetTracking->pipe; return PrrtReceiver_get_pipe(s->receiver); }; prrtByteCount_t PrrtSocket_get_delivered(PrrtSocket *s) { Loading