Loading prrt/cprrt.pxd +0 −3 Original line number Diff line number Diff line Loading @@ -163,9 +163,6 @@ cdef extern from "proto/socket.h": bint PrrtSocket_get_bbr_is_app_limited(PrrtSocket *socket) bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket) char *PrrtSocket_inet_ntoa(in_addr*) uint16_t PrrtSocket_ntohs(uint16_t v) cdef extern from "proto/stores/packetDeliveryStore.h": ctypedef struct PrrtPacketDeliveryStore: pass Loading prrt/proto/processes/dataReceiver.c +23 −15 Original line number Diff line number Diff line Loading @@ -271,7 +271,7 @@ void *receive_data_loop(void *ptr) { struct sockaddr_in remote; socklen_t addrlen = sizeof(remote); unsigned char buffer[MAX_PAYLOAD_LENGTH]; PrrtSocket *sock_ptr = ptr; PrrtSocket *s = ptr; while (1) { debug(DEBUG_DATARECEIVER, "About to receive."); Loading @@ -284,7 +284,11 @@ void *receive_data_loop(void *ptr) { struct timespec packet_recv_timestamp; uint64_t packet_recv_cyclestamp = 0; receive_from_socket(sock_ptr, buffer, &n, &remote, &addrlen, &packet_recv_timestamp, &packet_recv_cyclestamp); receive_from_socket(s, buffer, &n, &remote, &addrlen, &packet_recv_timestamp, &packet_recv_cyclestamp); if (!atomic_load_explicit(&s->closing, memory_order_acquire)) { break; } debug(DEBUG_HARDSTAMPING, "Packet TS:\t%ld.%09ld; Who? %s", (long) packet_recv_timestamp.tv_sec, packet_recv_timestamp.tv_nsec, inet_ntoa(remote.sin_addr)); XlapTimeStampClock(&tsph1, ts_any_packet, 0, LinkReceive); Loading Loading @@ -315,28 +319,28 @@ void *receive_data_loop(void *ptr) { kind = ts_feedback_packet; } if (packetType == PACKET_TYPE_DATA || packetType == PACKET_TYPE_REDUNDANCY) { sock_ptr->lastReceivedTimestamp = prrt_recv_timestamp; s->lastReceivedTimestamp = prrt_recv_timestamp; XlapCycleStampValue(sock_ptr, kind, seqno, ChannelReceive, packet_recv_cyclestamp); XlapTimeStampValue(sock_ptr, kind, seqno, ChannelReceive, packet_recv_timestamp); XlapCycleStampValue(s, kind, seqno, ChannelReceive, packet_recv_cyclestamp); XlapTimeStampValue(s, kind, seqno, ChannelReceive, packet_recv_timestamp); XlapTimestampPlaceholderUse(sock_ptr, kind, seqno, &tsph1); XlapTimestampPlaceholderUse(sock_ptr, kind, seqno, &tsph2); XlapTimestampPlaceholderUse(sock_ptr, kind, seqno, &tsph3); XlapTimestampPlaceholderUse(s, kind, seqno, &tsph1); XlapTimestampPlaceholderUse(s, kind, seqno, &tsph2); XlapTimestampPlaceholderUse(s, kind, seqno, &tsph3); XlapTimeStampCycle(sock_ptr, kind, seqno, HandlePacketStart); XlapTimeStampCycle(s, kind, seqno, HandlePacketStart); if (packetType == PACKET_TYPE_DATA) { handle_data_packet(sock_ptr, packet); handle_data_packet(s, packet); } else if (packetType == PACKET_TYPE_REDUNDANCY) { handle_redundancy_packet(sock_ptr, packet); handle_redundancy_packet(s, packet); } else { goto error; } send_feedback(sock_ptr, remote, seqno, prrt_recv_timestamp, sentTimestamp, packetType); XlapTimeStampCycle(sock_ptr, kind, seqno, HandlePacketEnd); send_feedback(s, remote, seqno, prrt_recv_timestamp, sentTimestamp, packetType); XlapTimeStampCycle(s, kind, seqno, HandlePacketEnd); } else if (packetType == PACKET_TYPE_FEEDBACK) { handle_feedback_packet(sock_ptr, packet, prrt_recv_timestamp); handle_feedback_packet(s, packet, prrt_recv_timestamp); PrrtPacket_destroy(packet); } else { PrrtPacket_print(packet); Loading @@ -344,10 +348,14 @@ void *receive_data_loop(void *ptr) { } debug(DEBUG_DATARECEIVER, "Cleanup"); PrrtSocket_cleanup(sock_ptr); PrrtSocket_cleanup(s); debug(DEBUG_DATARECEIVER, "Cleaned"); } PrrtSocket_cleanup(s); return NULL; error: PNOTIMPLEMENTED("SHOULD IMPLEMENT ERROR HANDLER HERE"); PrrtSocket_cleanup(s); return NULL; } prrt/proto/socket.c +1 −9 Original line number Diff line number Diff line Loading @@ -333,7 +333,7 @@ int PrrtSocket_interrupt(PrrtSocket *s) { } if (s->receiveDataThread != 0) { check(pthread_cancel(s->receiveDataThread) == 0, "Cancel failed."); pthread_cancel(s->receiveDataThread); check(pthread_join(s->receiveDataThread, res) == 0, "Join failed."); pthread_attr_destroy(s->receiveDataThreadAttr); s->receiveDataThread = 0; Loading Loading @@ -614,11 +614,3 @@ PrrtCoder *PrrtSocket_get_matching_coder(PrrtSocket *s, PrrtCodingConfiguration } return PrrtCoder_copy(s->coder); }; char *PrrtSocket_inet_ntoa(struct in_addr* in) { return inet_ntoa(*in); } uint16_t PrrtSocket_ntohs(uint16_t v) { return ntohs(v); } No newline at end of file prrt/proto/socket.h +0 −3 Original line number Diff line number Diff line Loading @@ -152,7 +152,4 @@ prrtByteCount_t PrrtSocket_get_pipe(PrrtSocket *s); prrtByteCount_t PrrtSocket_get_delivered(PrrtSocket *s); bool PrrtSocket_get_bbr_round_start(PrrtSocket *s); char *PrrtSocket_inet_ntoa(struct in_addr* in); uint16_t PrrtSocket_ntohs(uint16_t v); #endif // PRRT_SOCKET_H prrt/prrt.pyx +3 −1 Original line number Diff line number Diff line Loading @@ -7,6 +7,8 @@ cimport cython cimport cprrt import datetime import socket import ipaddress include "sockets.pxd" Loading Loading @@ -108,7 +110,7 @@ cdef extern from "util/windowedFilter.c": pass cdef sockaddr_to_addr_and_port(sockaddr_in addr): return (cprrt.PrrtSocket_inet_ntoa(&addr.sin_addr).decode("utf8"), cprrt.PrrtSocket_ntohs(addr.sin_port)) return (ipaddress.ip_address(socket.ntohl(addr.sin_addr.s_addr)), socket.ntohs(addr.sin_port)) class PrrtCodingConfiguration: def __init__(self, n, k, n_cycle=None): Loading Loading
prrt/cprrt.pxd +0 −3 Original line number Diff line number Diff line Loading @@ -163,9 +163,6 @@ cdef extern from "proto/socket.h": bint PrrtSocket_get_bbr_is_app_limited(PrrtSocket *socket) bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket) char *PrrtSocket_inet_ntoa(in_addr*) uint16_t PrrtSocket_ntohs(uint16_t v) cdef extern from "proto/stores/packetDeliveryStore.h": ctypedef struct PrrtPacketDeliveryStore: pass Loading
prrt/proto/processes/dataReceiver.c +23 −15 Original line number Diff line number Diff line Loading @@ -271,7 +271,7 @@ void *receive_data_loop(void *ptr) { struct sockaddr_in remote; socklen_t addrlen = sizeof(remote); unsigned char buffer[MAX_PAYLOAD_LENGTH]; PrrtSocket *sock_ptr = ptr; PrrtSocket *s = ptr; while (1) { debug(DEBUG_DATARECEIVER, "About to receive."); Loading @@ -284,7 +284,11 @@ void *receive_data_loop(void *ptr) { struct timespec packet_recv_timestamp; uint64_t packet_recv_cyclestamp = 0; receive_from_socket(sock_ptr, buffer, &n, &remote, &addrlen, &packet_recv_timestamp, &packet_recv_cyclestamp); receive_from_socket(s, buffer, &n, &remote, &addrlen, &packet_recv_timestamp, &packet_recv_cyclestamp); if (!atomic_load_explicit(&s->closing, memory_order_acquire)) { break; } debug(DEBUG_HARDSTAMPING, "Packet TS:\t%ld.%09ld; Who? %s", (long) packet_recv_timestamp.tv_sec, packet_recv_timestamp.tv_nsec, inet_ntoa(remote.sin_addr)); XlapTimeStampClock(&tsph1, ts_any_packet, 0, LinkReceive); Loading Loading @@ -315,28 +319,28 @@ void *receive_data_loop(void *ptr) { kind = ts_feedback_packet; } if (packetType == PACKET_TYPE_DATA || packetType == PACKET_TYPE_REDUNDANCY) { sock_ptr->lastReceivedTimestamp = prrt_recv_timestamp; s->lastReceivedTimestamp = prrt_recv_timestamp; XlapCycleStampValue(sock_ptr, kind, seqno, ChannelReceive, packet_recv_cyclestamp); XlapTimeStampValue(sock_ptr, kind, seqno, ChannelReceive, packet_recv_timestamp); XlapCycleStampValue(s, kind, seqno, ChannelReceive, packet_recv_cyclestamp); XlapTimeStampValue(s, kind, seqno, ChannelReceive, packet_recv_timestamp); XlapTimestampPlaceholderUse(sock_ptr, kind, seqno, &tsph1); XlapTimestampPlaceholderUse(sock_ptr, kind, seqno, &tsph2); XlapTimestampPlaceholderUse(sock_ptr, kind, seqno, &tsph3); XlapTimestampPlaceholderUse(s, kind, seqno, &tsph1); XlapTimestampPlaceholderUse(s, kind, seqno, &tsph2); XlapTimestampPlaceholderUse(s, kind, seqno, &tsph3); XlapTimeStampCycle(sock_ptr, kind, seqno, HandlePacketStart); XlapTimeStampCycle(s, kind, seqno, HandlePacketStart); if (packetType == PACKET_TYPE_DATA) { handle_data_packet(sock_ptr, packet); handle_data_packet(s, packet); } else if (packetType == PACKET_TYPE_REDUNDANCY) { handle_redundancy_packet(sock_ptr, packet); handle_redundancy_packet(s, packet); } else { goto error; } send_feedback(sock_ptr, remote, seqno, prrt_recv_timestamp, sentTimestamp, packetType); XlapTimeStampCycle(sock_ptr, kind, seqno, HandlePacketEnd); send_feedback(s, remote, seqno, prrt_recv_timestamp, sentTimestamp, packetType); XlapTimeStampCycle(s, kind, seqno, HandlePacketEnd); } else if (packetType == PACKET_TYPE_FEEDBACK) { handle_feedback_packet(sock_ptr, packet, prrt_recv_timestamp); handle_feedback_packet(s, packet, prrt_recv_timestamp); PrrtPacket_destroy(packet); } else { PrrtPacket_print(packet); Loading @@ -344,10 +348,14 @@ void *receive_data_loop(void *ptr) { } debug(DEBUG_DATARECEIVER, "Cleanup"); PrrtSocket_cleanup(sock_ptr); PrrtSocket_cleanup(s); debug(DEBUG_DATARECEIVER, "Cleaned"); } PrrtSocket_cleanup(s); return NULL; error: PNOTIMPLEMENTED("SHOULD IMPLEMENT ERROR HANDLER HERE"); PrrtSocket_cleanup(s); return NULL; }
prrt/proto/socket.c +1 −9 Original line number Diff line number Diff line Loading @@ -333,7 +333,7 @@ int PrrtSocket_interrupt(PrrtSocket *s) { } if (s->receiveDataThread != 0) { check(pthread_cancel(s->receiveDataThread) == 0, "Cancel failed."); pthread_cancel(s->receiveDataThread); check(pthread_join(s->receiveDataThread, res) == 0, "Join failed."); pthread_attr_destroy(s->receiveDataThreadAttr); s->receiveDataThread = 0; Loading Loading @@ -614,11 +614,3 @@ PrrtCoder *PrrtSocket_get_matching_coder(PrrtSocket *s, PrrtCodingConfiguration } return PrrtCoder_copy(s->coder); }; char *PrrtSocket_inet_ntoa(struct in_addr* in) { return inet_ntoa(*in); } uint16_t PrrtSocket_ntohs(uint16_t v) { return ntohs(v); } No newline at end of file
prrt/proto/socket.h +0 −3 Original line number Diff line number Diff line Loading @@ -152,7 +152,4 @@ prrtByteCount_t PrrtSocket_get_pipe(PrrtSocket *s); prrtByteCount_t PrrtSocket_get_delivered(PrrtSocket *s); bool PrrtSocket_get_bbr_round_start(PrrtSocket *s); char *PrrtSocket_inet_ntoa(struct in_addr* in); uint16_t PrrtSocket_ntohs(uint16_t v); #endif // PRRT_SOCKET_H
prrt/prrt.pyx +3 −1 Original line number Diff line number Diff line Loading @@ -7,6 +7,8 @@ cimport cython cimport cprrt import datetime import socket import ipaddress include "sockets.pxd" Loading Loading @@ -108,7 +110,7 @@ cdef extern from "util/windowedFilter.c": pass cdef sockaddr_to_addr_and_port(sockaddr_in addr): return (cprrt.PrrtSocket_inet_ntoa(&addr.sin_addr).decode("utf8"), cprrt.PrrtSocket_ntohs(addr.sin_port)) return (ipaddress.ip_address(socket.ntohl(addr.sin_addr.s_addr)), socket.ntohs(addr.sin_port)) class PrrtCodingConfiguration: def __init__(self, n, k, n_cycle=None): Loading