Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
LARN
PRRT
Commits
aec84586
Commit
aec84586
authored
Mar 11, 2021
by
Sven Liefgen
Browse files
Use xlap macros
parent
a6f47d88
Pipeline
#4779
passed with stages
in 3 minutes and 32 seconds
Changes
4
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
prrt/Cargo.lock
View file @
aec84586
...
...
@@ -1254,6 +1254,7 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "xlap"
version = "0.1.0"
source = "git+https://git.nt.uni-saarland.de/LARN/X-Lap.git?branch=rust#254dfd5ab3b7f45e01fecb594f1cca1dc06e835c"
dependencies = [
"strum",
"yaml-rust",
...
...
prrt/Cargo.toml
View file @
aec84586
...
...
@@ -32,9 +32,9 @@ socket2 = "0.3"
bit-vec
=
"0.6"
thread_timer
=
"0.3"
[dependencies.xlap]
#
git = "https://git.nt.uni-saarland.de/LARN/X-Lap.git"
#
branch = "rust"
path
=
"../../xlap"
git
=
"https://git.nt.uni-saarland.de/LARN/X-Lap.git"
branch
=
"rust"
#
path = "../../xlap"
[dependencies.uom]
version
=
"0.30"
features
=
[
...
...
prrt/examples/sender.rs
View file @
aec84586
...
...
@@ -25,7 +25,7 @@ fn main() {
let
opt
=
Opt
::
from_args
();
let
s
=
Socket
::
<
VdmCoder
>
::
new
(
Information
::
new
::
<
byte
>
(
150
),
Duration
::
from_secs
(
1
));
unsafe
{
xlap
::
init
()
};
unsafe
{
xlap
::
init
!
()
};
let
mut
s
=
s
.bind
(
format!
(
"0.0.0.0:{}"
,
opt
.local
))
.unwrap
();
s
.connect
(
format!
(
"{}:{}"
,
opt
.host
,
opt
.port
))
.unwrap
();
...
...
prrt/src/socket.rs
View file @
aec84586
...
...
@@ -5,7 +5,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use
std
::
sync
::
mpsc
::{
channel
,
Receiver
as
Queue
,
RecvTimeoutError
,
Sender
};
use
std
::
sync
::{
Arc
,
Mutex
};
use
std
::
thread
;
use
std
::
time
::{
Duration
,
SystemTime
,
Instant
};
use
std
::
time
::{
Duration
,
Instant
,
SystemTime
};
use
typenum
::
Bit
;
use
typenum
::{
False
,
True
};
...
...
@@ -689,9 +689,15 @@ impl<C: 'static + Coder> BoundSocket<C> {
fn
deliver_packet
(
&
self
,
mut
buf
:
&
mut
[
u8
],
packet
:
Packet
)
->
(
usize
,
SocketAddr
)
{
let
seqno
=
packet
.sequence_number
as
usize
;
unsafe
{
xlap
::
timestamp_clock
(
TS_DATA
,
seqno
,
PrrtReceivePackage
);
}
unsafe
{
xlap
::
timestamp_cycle
(
TS_DATA
,
seqno
,
PrrtReceivePackage
);
}
unsafe
{
xlap
::
timestamp_cycle
(
TS_DATA
,
seqno
,
CopyOutputStart
);
}
unsafe
{
xlap
::
timestamp_clock!
(
TS_DATA
,
seqno
,
PrrtReceivePackage
);
}
unsafe
{
xlap
::
timestamp_cycle!
(
TS_DATA
,
seqno
,
PrrtReceivePackage
);
}
unsafe
{
xlap
::
timestamp_cycle!
(
TS_DATA
,
seqno
,
CopyOutputStart
);
}
let
channel_receive
=
packet
.channel_receive
;
// TODO: unwrap
let
addr
=
packet
.sender_addr
.unwrap
();
...
...
@@ -704,9 +710,15 @@ impl<C: 'static + Coder> BoundSocket<C> {
len
+=
channel_receive
.len
()
.get
::
<
byte
>
()
as
usize
;
}
unsafe
{
xlap
::
timestamp_cycle
(
TS_DATA
,
seqno
,
CopyOutputEnd
);
}
unsafe
{
xlap
::
timestamp_clock
(
TS_DATA
,
seqno
,
PrrtDeliver
);
}
unsafe
{
xlap
::
timestamp_cycle
(
TS_DATA
,
seqno
,
PrrtDeliver
);
}
unsafe
{
xlap
::
timestamp_cycle!
(
TS_DATA
,
seqno
,
CopyOutputEnd
);
}
unsafe
{
xlap
::
timestamp_clock!
(
TS_DATA
,
seqno
,
PrrtDeliver
);
}
unsafe
{
xlap
::
timestamp_cycle!
(
TS_DATA
,
seqno
,
PrrtDeliver
);
}
(
len
,
addr
)
}
...
...
@@ -725,7 +737,7 @@ impl<C: 'static + Coder> BoundSocket<C> {
if
self
.inner.receiver
.is_set
()
{
lock!
(
self
.inner.app_send_pace
)
.track_start
();
let
prrt_send_start
=
(
xlap
::
get_timestamp_clock
(),
xlap
::
get_timestamp_cycle
());
let
prrt_send_start
=
(
xlap
::
get_timestamp_clock
!
(),
xlap
::
get_timestamp_cycle
!
());
let
packet
=
DataPayload
::
new
(
5
,
// TODO: magic number
buf
,
...
...
@@ -733,10 +745,16 @@ impl<C: 'static + Coder> BoundSocket<C> {
self
.inner.application_constraints.target_delay
,
);
let
seqno
=
packet
.sequence_number
as
usize
;
unsafe
{
xlap
::
set_timestamp
(
TS_DATA
,
seqno
,
PrrtSendStart
,
prrt_send_start
);
}
unsafe
{
xlap
::
set_timestamp!
(
TS_DATA
,
seqno
,
PrrtSendStart
,
prrt_send_start
);
}
self
._send_packet
::
<
Sync
,
Try
>
(
packet
)
?
;
unsafe
{
xlap
::
timestamp_clock
(
TS_DATA
,
seqno
,
PrrtSendEnd
);
}
unsafe
{
xlap
::
timestamp_cycle
(
TS_DATA
,
seqno
,
PrrtSendEnd
);
}
unsafe
{
xlap
::
timestamp_clock!
(
TS_DATA
,
seqno
,
PrrtSendEnd
);
}
unsafe
{
xlap
::
timestamp_cycle!
(
TS_DATA
,
seqno
,
PrrtSendEnd
);
}
lock!
(
self
.inner.app_send_pace
)
.track_end
();
self
.sequence_number_source
+=
1
;
}
else
{
...
...
@@ -763,11 +781,19 @@ impl<C: 'static + Coder> BoundSocket<C> {
if
self
.inner.receiver
.is_set
()
{
let
seqno
=
packet
.sequence_number
;
lock!
(
self
.inner.app_send_pace
)
.track_start
();
unsafe
{
xlap
::
timestamp_cycle
(
TS_DATA
,
seqno
.into
(),
PrrtSendStart
);
}
unsafe
{
xlap
::
timestamp_clock
(
TS_DATA
,
seqno
.into
(),
PrrtSendStart
);
}
unsafe
{
xlap
::
timestamp_cycle!
(
TS_DATA
,
seqno
.into
(),
PrrtSendStart
);
}
unsafe
{
xlap
::
timestamp_clock!
(
TS_DATA
,
seqno
.into
(),
PrrtSendStart
);
}
self
._send_packet
::
<
Sync
,
Try
>
(
packet
)
?
;
unsafe
{
xlap
::
timestamp_clock
(
TS_DATA
,
seqno
.into
(),
PrrtSendEnd
);
}
unsafe
{
xlap
::
timestamp_cycle
(
TS_DATA
,
seqno
.into
(),
PrrtSendEnd
);
}
unsafe
{
xlap
::
timestamp_clock!
(
TS_DATA
,
seqno
.into
(),
PrrtSendEnd
);
}
unsafe
{
xlap
::
timestamp_cycle!
(
TS_DATA
,
seqno
.into
(),
PrrtSendEnd
);
}
lock!
(
self
.inner.app_send_pace
)
.track_end
();
}
else
{
return
Err
(
io
::
Error
::
new
(
...
...
@@ -779,7 +805,9 @@ impl<C: 'static + Coder> BoundSocket<C> {
}
fn
_send_packet
<
Sync
:
Bit
,
Try
:
Bit
>
(
&
mut
self
,
packet
:
Packet
)
->
std
::
io
::
Result
<
()
>
{
unsafe
{
xlap
::
timestamp_cycle
(
TS_DATA
,
packet
.sequence_number
.into
(),
PrrtSubmitPackage
);
}
unsafe
{
xlap
::
timestamp_cycle!
(
TS_DATA
,
packet
.sequence_number
.into
(),
PrrtSubmitPackage
);
}
lock!
(
self
.inner.app_send_pace
)
.track_pause
();
if
Sync
::
BOOL
{
if
self
...
...
@@ -905,11 +933,11 @@ fn receive_data_loop<C: Coder>(mut s: ReceiveSocket<C>) {
}
// debug(DEBUG_HARDSTAMPING, "Packet TS:\t%ld.%09ld; Who? %s", (long) packet_recv_timestamp.tv_sec,
// packet_recv_timestamp.tv_nsec, inet_ntoa(remote.sin_addr));
let
link_receive
=
(
xlap
::
get_timestamp_clock
(),
xlap
::
get_timestamp_cycle
());
let
decode_start
=
xlap
::
get_timestamp_cycle
();
let
link_receive
=
(
xlap
::
get_timestamp_clock
!
(),
xlap
::
get_timestamp_cycle
!
());
let
decode_start
=
xlap
::
get_timestamp_cycle
!
();
let
mut
packet
=
Packet
::
decode
(
&
buf
[
..
len
])
.expect
(
"Decoding packet failed!"
);
// TODO: better error handling?
let
decode_end
=
xlap
::
get_timestamp_cycle
();
let
decode_end
=
xlap
::
get_timestamp_cycle
!
();
let
seqno
=
packet
.sequence_number
;
let
prrt_recv_timestamp
=
timestamp
.into
();
// TODO: error handling if not IPv4
...
...
@@ -919,16 +947,16 @@ fn receive_data_loop<C: Coder>(mut s: ReceiveSocket<C>) {
.duration_since
(
SystemTime
::
UNIX_EPOCH
)
.expect
(
"Timestamp before UNIX_EPOCH"
);
if
let
Some
(
sent_timestamp
)
=
packet
.timestamp
()
{
let
kind
=
match
packet
.packet_type
{
let
kind
=
xlap
::
map_kind!
(
packet
.packet_type
,
PacketType
::
Data
=>
TS_DATA
,
PacketType
::
Redundancy
=>
TS_REDUNDANCY
,
_
=>
unreachable!
(),
};
unsafe
{
xlap
::
set_timestamp
(
kind
,
seqno
.into
(),
ChannelReceive
,
packet_receive
)
};
unsafe
{
xlap
::
set_timestamp
(
kind
,
seqno
.into
(),
LinkReceive
,
link_receive
)
};
unsafe
{
xlap
::
set_timestamp_cycle
(
kind
,
seqno
.into
(),
DecodeStart
,
decode_start
)
};
PacketType
::
Redundancy
=>
TS_REDUNDANCY
);
unsafe
{
xlap
::
set_timestamp!
(
kind
,
seqno
.into
(),
ChannelReceive
,
packet_receive
)
};
unsafe
{
xlap
::
set_timestamp!
(
kind
,
seqno
.into
(),
LinkReceive
,
link_receive
)
};
unsafe
{
xlap
::
set_timestamp_cycle!
(
kind
,
seqno
.into
(),
DecodeStart
,
decode_start
)
};
unsafe
{
xlap
::
timestamp_cycle
(
kind
,
seqno
.into
(),
HandlePacketStart
)
};
unsafe
{
xlap
::
timestamp_cycle
!
(
kind
,
seqno
.into
(),
HandlePacketStart
)
};
let
packet_type
=
packet
.packet_type
;
handle_packet
(
&
mut
s
,
packet
);
send_feedback
(
...
...
@@ -960,7 +988,12 @@ fn receive_from_socket(
unimplemented!
()
}
else
{
let
(
len
,
address
)
=
socket
.recv_from
(
buf
)
?
;
Ok
((
len
,
address
,
(
Instant
::
now
(),
xlap
::
get_timestamp_cycle
()),
SystemTime
::
now
()))
Ok
((
len
,
address
,
(
Instant
::
now
(),
xlap
::
get_timestamp_cycle!
()),
SystemTime
::
now
(),
))
}
}
...
...
@@ -986,12 +1019,8 @@ fn handle_data_packet<C: Coder>(s: &mut ReceiveSocket<C>, packet: Packet) {
let
now
=
Timestamp
::
now
();
if
now
>
payload
.packet_timeout
{
info!
[
target
:
"Receiver"
,
"Timeout data packet {} ({:?} > {:?})"
,
seqno
,
now
,
payload
.packet_timeout
];
s
.delivered_packet_table
.test_set_is_number_relevant
(
seqno
);
}
else
if
!
s
.delivered_packet_table
.test_set_is_number_relevant
(
seqno
)
{
s
.delivered_packet_table
.test_set_is_number_relevant
(
seqno
);
}
else
if
!
s
.delivered_packet_table
.test_set_is_number_relevant
(
seqno
)
{
info!
[
target
:
"Receiver"
,
"Packet not relevant: {}"
,
seqno
];
}
else
{
s
.sender_csi
.set_delivery_rate
(
payload
.btl_datarate
);
...
...
@@ -1018,8 +1047,8 @@ fn handle_data_packet<C: Coder>(s: &mut ReceiveSocket<C>, packet: Packet) {
info!
[
target
:
"Receiver"
,
"Inserted {}"
,
seqno
];
}
}
unsafe
{
xlap
::
timestamp_clock
(
TS_DATA
,
seqno
.into
(),
PrrtReturnPackage
)
};
unsafe
{
xlap
::
timestamp_cycle
(
TS_DATA
,
seqno
.into
(),
PrrtReturnPackage
)
};
unsafe
{
xlap
::
timestamp_clock
!
(
TS_DATA
,
seqno
.into
(),
PrrtReturnPackage
)
};
unsafe
{
xlap
::
timestamp_cycle
!
(
TS_DATA
,
seqno
.into
(),
PrrtReturnPackage
)
};
}
}
...
...
@@ -1113,12 +1142,12 @@ fn send_feedback<C: Coder>(
_sent_timestamp
:
Timestamp
,
ty
:
PacketType
,
)
->
bool
{
let
kind
=
match
ty
{
let
kind
=
xlap
::
map_kind!
(
packet
.packet_type
,
PacketType
::
Data
=>
TS_DATA
,
PacketType
::
Redundancy
=>
TS_REDUNDANCY
,
_
=>
unreachable!
(),
};
unsafe
{
xlap
::
timestamp_cycle
(
kind
,
seqno
.into
(),
SendFeedbackStart
)
};
PacketType
::
Redundancy
=>
TS_REDUNDANCY
);
unsafe
{
xlap
::
timestamp_cycle!
(
kind
,
seqno
.into
(),
SendFeedbackStart
)
};
let
forward_trip_timestamp
=
Timestamp
::
now
()
+
(
s
.last_sent_timestamp
-
receive_stamp
);
...
...
@@ -1148,7 +1177,7 @@ fn send_feedback<C: Coder>(
packet
.encode
(
buf
.as_mut_slice
());
let
result
=
s
.inner.socket
.send_to
(
buf
.as_slice
(),
&
remote
)
.is_ok
();
unsafe
{
xlap
::
timestamp_cycle
(
kind
,
seqno
.into
(),
SendFeedbackEnd
)
};
unsafe
{
xlap
::
timestamp_cycle
!
(
kind
,
seqno
.into
(),
SendFeedbackEnd
)
};
result
}
...
...
@@ -1187,8 +1216,8 @@ fn send_data_loop<C: 'static + Coder>(mut s: SendSocket<C>, send_data_queue: Que
fn
transmit
<
C
:
'static
+
Coder
>
(
s
:
&
mut
SendSocket
<
C
>
,
mut
packet
:
Packet
)
{
let
seqno
=
packet
.sequence_number
;
s
.inner.prrt_transmit_pace
.lock
()
.unwrap
()
.track_start
();
unsafe
{
xlap
::
timestamp_clock
(
TS_DATA
,
seqno
.into
(),
PrrtTransmitStart
)
};
unsafe
{
xlap
::
timestamp_cycle
(
TS_DATA
,
seqno
.into
(),
PrrtTransmitStart
)
};
unsafe
{
xlap
::
timestamp_clock
!
(
TS_DATA
,
seqno
.into
(),
PrrtTransmitStart
)
};
unsafe
{
xlap
::
timestamp_cycle
!
(
TS_DATA
,
seqno
.into
(),
PrrtTransmitStart
)
};
let
mut
block
=
s
.receive_block
.take
()
.unwrap_or_else
(||
{
Block
::
sender
(
&
s
.inner.coding_configuration
.lock
()
.unwrap
(),
...
...
@@ -1206,17 +1235,17 @@ fn transmit<C: 'static + Coder>(s: &mut SendSocket<C>, mut packet: Packet) {
// Clone packet to be able to insert it at two different places (outstanding packet store and
// block). In neither of those places the packet is mutated.
if
send_packet
(
s
,
packet
.clone
())
{
unsafe
{
xlap
::
timestamp_cycle
(
TS_DATA
,
seqno
.into
(),
PrrtTransmitEnd
)
};
unsafe
{
xlap
::
timestamp_clock
(
TS_DATA
,
seqno
.into
(),
PrrtTransmitEnd
)
};
unsafe
{
xlap
::
timestamp_cycle
!
(
TS_DATA
,
seqno
.into
(),
PrrtTransmitEnd
)
};
unsafe
{
xlap
::
timestamp_clock
!
(
TS_DATA
,
seqno
.into
(),
PrrtTransmitEnd
)
};
if
!
block
.insert_data_packet
(
packet
)
{
panic!
(
"Failed to insert packet {}"
,
seqno
);
}
if
block
.encode_ready
()
{
unsafe
{
xlap
::
timestamp_cycle
(
TS_REDUNDANCY
,
seqno
.into
(),
PrrtEncodeStart
)
};
unsafe
{
xlap
::
timestamp_cycle
!
(
TS_REDUNDANCY
,
seqno
.into
(),
PrrtEncodeStart
)
};
block
.encode
(
&
mut
s
.sequence_number_redundancy
);
unsafe
{
xlap
::
timestamp_cycle
(
TS_REDUNDANCY
,
seqno
.into
(),
PrrtEncodeEnd
)
};
unsafe
{
xlap
::
timestamp_cycle
!
(
TS_REDUNDANCY
,
seqno
.into
(),
PrrtEncodeEnd
)
};
// TODO: unwrap
retransmission_round_handler
(
s
.inner
.clone
(),
block
,
s
.send_data_queue
.clone
());
...
...
@@ -1312,23 +1341,23 @@ fn send_packet<C: Coder>(s: &mut SendSocket<C>, mut packet: Packet) -> bool {
packet
.encode
(
&
mut
buf
);
let
kind
=
match
packet
.packet_type
{
let
kind
=
xlap
::
map_kind!
(
packet
.packet_type
,
PacketType
::
Data
=>
TS_DATA
,
PacketType
::
Redundancy
=>
TS_REDUNDANCY
,
_
=>
unreachable!
(),
};
unsafe
{
xlap
::
timestamp_cycle
(
kind
,
seqno
.into
(),
LinkTransmitStart
)
};
unsafe
{
xlap
::
timestamp_clock
(
kind
,
seqno
.into
(),
LinkTransmitStart
)
};
PacketType
::
Redundancy
=>
TS_REDUNDANCY
);
unsafe
{
xlap
::
timestamp_cycle!
(
kind
,
seqno
.into
(),
LinkTransmitStart
)
};
unsafe
{
xlap
::
timestamp_clock!
(
kind
,
seqno
.into
(),
LinkTransmitStart
)
};
let
channel_transmit
=
send_to_socket
(
s
,
&
buf
);
unsafe
{
xlap
::
set_timestamp
(
kind
,
seqno
.into
(),
ChannelTransmit
,
channel_transmit
)
};
unsafe
{
xlap
::
set_timestamp
!
(
kind
,
seqno
.into
(),
ChannelTransmit
,
channel_transmit
)
};
s
.inner
.receiver
.add_outstanding_packet_state
(
packet
,
Timestamp
::
now
());
unsafe
{
xlap
::
timestamp_clock
(
kind
,
seqno
.into
(),
LinkTransmitEnd
)
};
unsafe
{
xlap
::
timestamp_cycle
(
kind
,
seqno
.into
(),
LinkTransmitEnd
)
};
unsafe
{
xlap
::
timestamp_clock
!
(
kind
,
seqno
.into
(),
LinkTransmitEnd
)
};
unsafe
{
xlap
::
timestamp_cycle
!
(
kind
,
seqno
.into
(),
LinkTransmitEnd
)
};
true
}
...
...
@@ -1416,10 +1445,7 @@ fn send_to_socket<C: Coder>(s: &SendSocket<C>, buf: &[u8]) -> (std::time::Instan
.socket
.send_to
(
buf
,
&
recv
.address
())
.expect
(
"Sendto failed"
);
(
xlap
::
get_timestamp_clock
(),
xlap
::
get_timestamp_cycle
()
)
(
xlap
::
get_timestamp_clock!
(),
xlap
::
get_timestamp_cycle!
())
}
}
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment