Loading prrt/proto/processes/dataTransmitter.c +10 −8 Original line number Original line Diff line number Diff line Loading @@ -93,16 +93,18 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { default:; default:; } } if (sock_ptr->pacingEnabled) { prrtTimeDifference_t diff; prrtTimeDifference_t diff; do { do { diff = sock_ptr->nextSendTime - PrrtClock_get_current_time_us(); diff = sock_ptr->nextSendTime - PrrtClock_get_current_time_us(); if (diff > 0) { if (diff > 0) { usleep_nano(diff / 2); usleep_nano(diff / 2); } } } while (diff > 0); } } while (diff > 0); prrtTimestamp_t now = PrrtClock_get_current_time_us(); prrtTimestamp_t now = PrrtClock_get_current_time_us(); if (BBR_getPacingRate(sock_ptr->receiver->bbr) != 0) { if (!sock_ptr->pacingEnabled || BBR_getPacingRate(sock_ptr->receiver->bbr) != 0) { debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %u, Pacing Time: %u", packet->payloadLength, BBR_getPacingRate(sock_ptr->receiver->bbr), packet->payloadLength / BBR_getPacingRate(sock_ptr->receiver->bbr)); debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %u, Pacing Time: %u", packet->payloadLength, BBR_getPacingRate(sock_ptr->receiver->bbr), packet->payloadLength / BBR_getPacingRate(sock_ptr->receiver->bbr)); sock_ptr->nextSendTime = now + (packet->payloadLength / BBR_getPacingRate(sock_ptr->receiver->bbr)); sock_ptr->nextSendTime = now + (packet->payloadLength / BBR_getPacingRate(sock_ptr->receiver->bbr)); } } Loading prrt/proto/socket.c +7 −0 Original line number Original line Diff line number Diff line Loading @@ -55,6 +55,7 @@ PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay check_mem(s); check_mem(s); s->nextSendTime = 0; s->nextSendTime = 0; s->pacingEnabled = false; s->isSender = is_sender; s->isSender = is_sender; s->isHardwareTimestamping = false; s->isHardwareTimestamping = false; Loading Loading @@ -128,6 +129,12 @@ bool PrrtSocket_enable_thread_pinning(PrrtSocket *s) { return false; return false; } } bool PrrtSocket_enable_pacing(PrrtSocket *s) { s->pacingEnabled = true; return true; } bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interface_name) { bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interface_name) { check(s->isBound == false, "Hardware timestamping can only be enabled before calling PrrtSocket_bind()"); check(s->isBound == false, "Hardware timestamping can only be enabled before calling PrrtSocket_bind()"); s->isHardwareTimestamping = true; s->isHardwareTimestamping = true; Loading prrt/proto/socket.h +3 −0 Original line number Original line Diff line number Diff line Loading @@ -35,6 +35,8 @@ typedef struct prrtSocket { struct sockaddr_in *address; struct sockaddr_in *address; bool isBound; bool isBound; bool pacingEnabled; PrrtClock clock; PrrtClock clock; pthread_t sendDataThread; pthread_t sendDataThread; Loading Loading @@ -93,6 +95,7 @@ PrrtSocket *PrrtSocket_create(bool is_sender, prrtTimedelta_t target_delay_us); bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interface_name); bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interface_name); bool PrrtSocket_enable_thread_pinning(PrrtSocket *s); bool PrrtSocket_enable_thread_pinning(PrrtSocket *s); bool PrrtSocket_enable_pacing(PrrtSocket *s); bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port); bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port); Loading prrt/time-receiver.c +1 −1 Original line number Original line Diff line number Diff line Loading @@ -119,7 +119,7 @@ int main(int argc, char **argv) { while (i < rounds && keepRunning) { while (i < rounds && keepRunning) { char buffer[MAX_PAYLOAD_LENGTH + 1]; char buffer[MAX_PAYLOAD_LENGTH + 1]; debug(DEBUG_RECEIVER, "About to receive."); debug(DEBUG_RECEIVER, "About to receive."); int n = PrrtSocket_recv(s, buffer); int n = PrrtSocket_receive_asap_wait(s, buffer); if (n < 0) { if (n < 0) { continue; continue; } } Loading prrt/time-sender.c +11 −2 Original line number Original line Diff line number Diff line Loading @@ -15,6 +15,7 @@ static struct argp_option options[] = { {"rounds", 'r', "ROUNDS", 0, "Rounds" }, {"rounds", 'r', "ROUNDS", 0, "Rounds" }, {"output", 'o', "FILE", 0, "Output to FILE instead of standard output" }, {"output", 'o', "FILE", 0, "Output to FILE instead of standard output" }, {"threadpinning", 'T', 0, 0, "Enable thread pinning"}, {"threadpinning", 'T', 0, 0, "Enable thread pinning"}, {"pacing", 'P', 0, 0, "Enable pacing"}, { 0 } { 0 } }; }; Loading @@ -26,6 +27,7 @@ static struct arguments uint16_t rounds; uint16_t rounds; char* outfile; char* outfile; bool thread_pinning; bool thread_pinning; bool pacing; }; }; static char args_doc[] = ""; static char args_doc[] = ""; Loading @@ -43,6 +45,9 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state) case 'p': case 'p': arguments->port = (uint16_t) strtol(arg, &pEnd, 10); arguments->port = (uint16_t) strtol(arg, &pEnd, 10); break; break; case 'P': arguments->pacing = true; break; case 'l': case 'l': arguments->local_port = (uint16_t) strtol(arg, &pEnd, 10); arguments->local_port = (uint16_t) strtol(arg, &pEnd, 10); break; break; Loading Loading @@ -73,6 +78,7 @@ int main(int argc, char **argv) { .port = 5000, .port = 5000, .local_port = 6000, .local_port = 6000, .rounds = 127, .rounds = 127, .pacing = false, .outfile = "-", .outfile = "-", .thread_pinning = false, .thread_pinning = false, }; }; Loading @@ -95,7 +101,7 @@ int main(int argc, char **argv) { #endif #endif PrrtSocket *s = PrrtSocket_create(true, 10 * 1000 * 1000); PrrtSocket *s = PrrtSocket_create(true, 250 * 1000); check(s != NULL, "Socket create failed."); check(s != NULL, "Socket create failed."); if(arguments.thread_pinning) { if(arguments.thread_pinning) { Loading @@ -104,7 +110,10 @@ int main(int argc, char **argv) { check(PrrtSocket_bind(s, "0.0.0.0", arguments.local_port), "bind failed"); check(PrrtSocket_bind(s, "0.0.0.0", arguments.local_port), "bind failed"); //PrrtSocket_set_coding_parameters(s, 1, 1); // comment this line to re-enable coding. PrrtSocket_set_coding_parameters(s, 1, 1, 1, NULL); // comment this line to re-enable coding. if(arguments.pacing) { PrrtSocket_enable_pacing(s); } PrrtSocket_connect(s, arguments.target, arguments.port); PrrtSocket_connect(s, arguments.target, arguments.port); Loading Loading
prrt/proto/processes/dataTransmitter.c +10 −8 Original line number Original line Diff line number Diff line Loading @@ -93,16 +93,18 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { default:; default:; } } if (sock_ptr->pacingEnabled) { prrtTimeDifference_t diff; prrtTimeDifference_t diff; do { do { diff = sock_ptr->nextSendTime - PrrtClock_get_current_time_us(); diff = sock_ptr->nextSendTime - PrrtClock_get_current_time_us(); if (diff > 0) { if (diff > 0) { usleep_nano(diff / 2); usleep_nano(diff / 2); } } } while (diff > 0); } } while (diff > 0); prrtTimestamp_t now = PrrtClock_get_current_time_us(); prrtTimestamp_t now = PrrtClock_get_current_time_us(); if (BBR_getPacingRate(sock_ptr->receiver->bbr) != 0) { if (!sock_ptr->pacingEnabled || BBR_getPacingRate(sock_ptr->receiver->bbr) != 0) { debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %u, Pacing Time: %u", packet->payloadLength, BBR_getPacingRate(sock_ptr->receiver->bbr), packet->payloadLength / BBR_getPacingRate(sock_ptr->receiver->bbr)); debug(DEBUG_DATATRANSMITTER, "Payload: %u, PacingRate: %u, Pacing Time: %u", packet->payloadLength, BBR_getPacingRate(sock_ptr->receiver->bbr), packet->payloadLength / BBR_getPacingRate(sock_ptr->receiver->bbr)); sock_ptr->nextSendTime = now + (packet->payloadLength / BBR_getPacingRate(sock_ptr->receiver->bbr)); sock_ptr->nextSendTime = now + (packet->payloadLength / BBR_getPacingRate(sock_ptr->receiver->bbr)); } } Loading
prrt/proto/socket.c +7 −0 Original line number Original line Diff line number Diff line Loading @@ -55,6 +55,7 @@ PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay check_mem(s); check_mem(s); s->nextSendTime = 0; s->nextSendTime = 0; s->pacingEnabled = false; s->isSender = is_sender; s->isSender = is_sender; s->isHardwareTimestamping = false; s->isHardwareTimestamping = false; Loading Loading @@ -128,6 +129,12 @@ bool PrrtSocket_enable_thread_pinning(PrrtSocket *s) { return false; return false; } } bool PrrtSocket_enable_pacing(PrrtSocket *s) { s->pacingEnabled = true; return true; } bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interface_name) { bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interface_name) { check(s->isBound == false, "Hardware timestamping can only be enabled before calling PrrtSocket_bind()"); check(s->isBound == false, "Hardware timestamping can only be enabled before calling PrrtSocket_bind()"); s->isHardwareTimestamping = true; s->isHardwareTimestamping = true; Loading
prrt/proto/socket.h +3 −0 Original line number Original line Diff line number Diff line Loading @@ -35,6 +35,8 @@ typedef struct prrtSocket { struct sockaddr_in *address; struct sockaddr_in *address; bool isBound; bool isBound; bool pacingEnabled; PrrtClock clock; PrrtClock clock; pthread_t sendDataThread; pthread_t sendDataThread; Loading Loading @@ -93,6 +95,7 @@ PrrtSocket *PrrtSocket_create(bool is_sender, prrtTimedelta_t target_delay_us); bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interface_name); bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interface_name); bool PrrtSocket_enable_thread_pinning(PrrtSocket *s); bool PrrtSocket_enable_thread_pinning(PrrtSocket *s); bool PrrtSocket_enable_pacing(PrrtSocket *s); bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port); bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port); Loading
prrt/time-receiver.c +1 −1 Original line number Original line Diff line number Diff line Loading @@ -119,7 +119,7 @@ int main(int argc, char **argv) { while (i < rounds && keepRunning) { while (i < rounds && keepRunning) { char buffer[MAX_PAYLOAD_LENGTH + 1]; char buffer[MAX_PAYLOAD_LENGTH + 1]; debug(DEBUG_RECEIVER, "About to receive."); debug(DEBUG_RECEIVER, "About to receive."); int n = PrrtSocket_recv(s, buffer); int n = PrrtSocket_receive_asap_wait(s, buffer); if (n < 0) { if (n < 0) { continue; continue; } } Loading
prrt/time-sender.c +11 −2 Original line number Original line Diff line number Diff line Loading @@ -15,6 +15,7 @@ static struct argp_option options[] = { {"rounds", 'r', "ROUNDS", 0, "Rounds" }, {"rounds", 'r', "ROUNDS", 0, "Rounds" }, {"output", 'o', "FILE", 0, "Output to FILE instead of standard output" }, {"output", 'o', "FILE", 0, "Output to FILE instead of standard output" }, {"threadpinning", 'T', 0, 0, "Enable thread pinning"}, {"threadpinning", 'T', 0, 0, "Enable thread pinning"}, {"pacing", 'P', 0, 0, "Enable pacing"}, { 0 } { 0 } }; }; Loading @@ -26,6 +27,7 @@ static struct arguments uint16_t rounds; uint16_t rounds; char* outfile; char* outfile; bool thread_pinning; bool thread_pinning; bool pacing; }; }; static char args_doc[] = ""; static char args_doc[] = ""; Loading @@ -43,6 +45,9 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state) case 'p': case 'p': arguments->port = (uint16_t) strtol(arg, &pEnd, 10); arguments->port = (uint16_t) strtol(arg, &pEnd, 10); break; break; case 'P': arguments->pacing = true; break; case 'l': case 'l': arguments->local_port = (uint16_t) strtol(arg, &pEnd, 10); arguments->local_port = (uint16_t) strtol(arg, &pEnd, 10); break; break; Loading Loading @@ -73,6 +78,7 @@ int main(int argc, char **argv) { .port = 5000, .port = 5000, .local_port = 6000, .local_port = 6000, .rounds = 127, .rounds = 127, .pacing = false, .outfile = "-", .outfile = "-", .thread_pinning = false, .thread_pinning = false, }; }; Loading @@ -95,7 +101,7 @@ int main(int argc, char **argv) { #endif #endif PrrtSocket *s = PrrtSocket_create(true, 10 * 1000 * 1000); PrrtSocket *s = PrrtSocket_create(true, 250 * 1000); check(s != NULL, "Socket create failed."); check(s != NULL, "Socket create failed."); if(arguments.thread_pinning) { if(arguments.thread_pinning) { Loading @@ -104,7 +110,10 @@ int main(int argc, char **argv) { check(PrrtSocket_bind(s, "0.0.0.0", arguments.local_port), "bind failed"); check(PrrtSocket_bind(s, "0.0.0.0", arguments.local_port), "bind failed"); //PrrtSocket_set_coding_parameters(s, 1, 1); // comment this line to re-enable coding. PrrtSocket_set_coding_parameters(s, 1, 1, 1, NULL); // comment this line to re-enable coding. if(arguments.pacing) { PrrtSocket_enable_pacing(s); } PrrtSocket_connect(s, arguments.target, arguments.port); PrrtSocket_connect(s, arguments.target, arguments.port); Loading