Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
LARN
PRRT
Commits
b6175edc
Commit
b6175edc
authored
Nov 03, 2020
by
Pablo Gil Pereira
Browse files
Merge branch 'fix/red_package_encoding'
parents
43b221f7
8b8e6efb
Pipeline
#4676
failed with stages
in 0 seconds
Changes
15
Pipelines
2
Hide whitespace changes
Inline
Side-by-side
.gitignore
View file @
b6175edc
...
...
@@ -13,3 +13,4 @@ prrt.so
.idea/
.venv/
Pipfile.lock
.vscode/
.gitlab-ci.yml
View file @
b6175edc
...
...
@@ -27,28 +27,6 @@ build:prrt:
-
CC=gcc-5 CXX=g++-5 cmake .
-
make
build:container:
stage
:
build
tags
:
-
docker
script
:
-
export DOCKER_TAG=$(echo "$CI_BUILD_REF_NAME" | sed 's#/#_#' | sed 's#^master$#latest#')
-
docker build -t $CI_REGISTRY_IMAGE:$DOCKER_TAG --build-arg http_proxy=http://www-proxy.uni-saarland.de:3128 -f docker/Dockerfile .
-
docker login -u gitlab-ci-token -p $CI_BUILD_TOKEN $CI_REGISTRY
-
docker push $CI_REGISTRY_IMAGE:$DOCKER_TAG
-
docker rmi $CI_REGISTRY_IMAGE:$DOCKER_TAG
build:container_tcp:
stage
:
build
tags
:
-
docker
script
:
-
export DOCKER_TAG=$(echo "$CI_BUILD_REF_NAME""_tcp" | sed 's#/#_#' | sed 's#^master$#latest#')
-
docker build -t $CI_REGISTRY_IMAGE:$DOCKER_TAG --build-arg http_proxy=http://www-proxy.uni-saarland.de:3128 -f docker/Dockerfile_tcp .
-
docker login -u gitlab-ci-token -p $CI_BUILD_TOKEN $CI_REGISTRY
-
docker push $CI_REGISTRY_IMAGE:$DOCKER_TAG
-
docker rmi $CI_REGISTRY_IMAGE:$DOCKER_TAG
test:prrt_mem:
stage
:
test
dependencies
:
...
...
examples/sender.py
View file @
b6175edc
...
...
@@ -9,5 +9,5 @@ s = prrt.PrrtSocket(("0.0.0.0", localport), maximum_payload_size=150)
s
.
connect
((
host
,
port
))
for
i
in
range
(
10
):
s
.
send
(
"Packet {}"
.
format
(
i
).
encode
(
"utf8"
))
s
.
send
(
"Close"
.
encode
(
"utf8"
))
s
.
send
_sync
(
"Packet {}"
.
format
(
i
).
encode
(
"utf8"
))
s
.
send
_sync
(
"Close"
.
encode
(
"utf8"
))
prrt/proto/processes/dataTransmitter.c
View file @
b6175edc
...
...
@@ -181,6 +181,8 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
return
false
;
}
void
block_timeout
(
void
*
arg
);
typedef
struct
timer_arg
{
PrrtSocket
*
socket
;
PrrtBlock
*
block
;
...
...
@@ -261,9 +263,10 @@ void PrrtDataTransmitter_transmit(PrrtSocket *sock_ptr, PrrtPacket *packet) {
RetransmissionTimerArgs
*
args
=
(
RetransmissionTimerArgs
*
)
calloc
(
1
,
sizeof
(
RetransmissionTimerArgs
));
args
->
block
=
sock_ptr
->
receiveBlock
;
sock_ptr
->
receiveBlock
=
NULL
;
args
->
socket
=
sock_ptr
;
retransmission_round_handler
(
args
);
sock_ptr
->
receiveBlock
=
NULL
;
}
}
else
{
PrrtPacket_destroy
(
packet
);
...
...
prrt/proto/socket.c
View file @
b6175edc
...
...
@@ -86,8 +86,6 @@ PrrtSocket *PrrtSocket_create(prrtByteCount_t maximum_payload_size, prrtTimedelt
s
->
isHardwareTimestamping
=
false
;
s
->
interfaceName
=
NULL
;
s
->
retransmissionTimer
=
PrrtTimer_create
(
3
);
s
->
isThreadPinning
=
false
;
PrrtClock_init
(
&
s
->
clock
);
...
...
@@ -97,6 +95,8 @@ PrrtSocket *PrrtSocket_create(prrtByteCount_t maximum_payload_size, prrtTimedelt
atomic_store_explicit
(
&
s
->
closing
,
false
,
memory_order_release
);
s
->
receiver
=
NULL
;
s
->
retransmissionTimer
=
PrrtTimer_create
(
0
);
uint8_t
n_cycle
[
1
]
=
{
N_START
-
K_START
};
s
->
codingParameters
=
PrrtCodingConfiguration_create
(
K_START
,
N_START
,
1
,
n_cycle
);
s
->
coder
=
PrrtCoder_create
(
s
->
codingParameters
);
...
...
prrt/proto/socket.h
View file @
b6175edc
...
...
@@ -100,7 +100,6 @@ typedef struct prrtSocket {
atomic_bool
isThreadPinning
;
prrtByteCount_t
maximum_payload_size
;
PrrtTimer
*
retransmissionTimer
;
}
PrrtSocket
;
...
...
prrt/proto/stores/packetDeliveryStore.c
View file @
b6175edc
...
...
@@ -99,22 +99,32 @@ PrrtPacket *PrrtPacketDeliveryStore_get_packet_timedwait(PrrtPacketDeliveryStore
PrrtPacket
*
PrrtPacketDeliveryStore_get_packet
(
PrrtPacketDeliveryStore
*
store
,
prrtTimestamp_t
start
,
prrtTimestamp_t
stop
)
{
PrrtPacket
*
packet
=
NULL
;
bool
timed_out
;
check
(
pthread_mutex_lock
(
&
store
->
lock
)
==
EXIT_SUCCESS
,
"Lock failed."
);
if
(
start
>
stop
)
{
packet
=
BPTree_get_first_in_range
(
store
->
tree
,
(
BPTreeKey_t
)
start
,
TIMESTAMP_SPACE
-
1
);
if
(
packet
==
NULL
)
{
packet
=
BPTree_get_first_in_range
(
store
->
tree
,
0
,
(
BPTreeKey_t
)
stop
);
do
{
timed_out
=
false
;
if
(
start
>
stop
)
{
packet
=
BPTree_get_first_in_range
(
store
->
tree
,
(
BPTreeKey_t
)
start
,
TIMESTAMP_SPACE
-
1
);
if
(
packet
==
NULL
)
{
packet
=
BPTree_get_first_in_range
(
store
->
tree
,
0
,
(
BPTreeKey_t
)
stop
);
}
}
else
{
packet
=
BPTree_get_first_in_range
(
store
->
tree
,
(
BPTreeKey_t
)
start
,
(
BPTreeKey_t
)
stop
);
}
}
else
{
packet
=
BPTree_get_first_in_range
(
store
->
tree
,
(
BPTreeKey_t
)
start
,
(
BPTreeKey_t
)
stop
);
}
if
(
packet
!=
NULL
)
{
store
->
tree
=
BPTree_delete
(
store
->
tree
,
PrrtDataPacket_packet_timeout
(
packet
));
}
if
(
packet
!=
NULL
)
{
store
->
tree
=
BPTree_delete
(
store
->
tree
,
PrrtDataPacket_packet_timeout
(
packet
));
prrtTimestamp_t
now
=
PrrtClock_get_current_time_us
();
if
(
PrrtTimestamp_cmp
(
now
,
PrrtDataPacket_packet_timeout
(
packet
))
>
0
)
{
timed_out
=
true
;
PrrtPacket_destroy
(
packet
);
packet
=
NULL
;
}
}
}
while
(
timed_out
);
check
(
pthread_mutex_unlock
(
&
store
->
lock
)
==
EXIT_SUCCESS
,
"Unlock failed."
);
return
packet
;
...
...
prrt/proto/timer.c
View file @
b6175edc
...
...
@@ -133,7 +133,7 @@ struct prrtTimer {
atomic_int
wait
;
_Atomic
(
TimerNode
*
)
new
;
_Atomic
(
TimerNode
*
)
old
;
_Atomic
(
TimerNode
*
)
del
;
_Atomic
(
TimerNode
*
)
cur
;
TimerDateUDiff_t
precision
;
TimerDateUDiff_t
lcp
;
TimerDateUDiff_t
osp
;
...
...
@@ -143,7 +143,7 @@ struct prrtTimer {
typedef
struct
prrtTimer
Timer
;
static
bool
timer_date_is_due
(
Timer
*
self
,
TimerDate
*
when
,
const
TimerDate
*
now
)
static
bool
timer_date_is_due
(
Timer
*
self
,
const
TimerDate
*
when
,
const
TimerDate
*
now
)
{
// TODO: use self->precision to check whether now and *when are similar enough
(
void
)
self
;
...
...
@@ -208,6 +208,8 @@ static void *timer_worker_loop(void *arg)
TimerNode
*
task
=
atomic_load
(
&
self
->
new
);
assert
(
task
!=
NULL
&&
"task list contains NULL node"
);
atomic_store
(
&
self
->
cur
,
task
);
if
(
timer_date_is_inf
(
&
task
->
date
))
{
if
(
!
atomic_load_explicit
(
&
self
->
alive
,
memory_order_acquire
))
{
if
(
task
==
atomic_load
(
&
self
->
new
))
...
...
@@ -260,11 +262,9 @@ static void *timer_worker_loop(void *arg)
task
->
done
=
true
;
}
atomic_store
(
&
self
->
del
,
task
);
TimerNode
*
next
=
atomic_load
(
&
task
->
next
);
TimerNode
*
temp
=
task
;
atomic_compare_exchange_strong
(
&
self
->
new
,
&
temp
,
next
);
atomic_store
(
&
self
->
del
,
NULL
);
if
(
slept
&&
!
learned
)
{
clock_gettime
(
CLOCK_REALTIME
,
&
td1
);
...
...
@@ -306,7 +306,7 @@ PrrtTimer *PrrtTimer_create(unsigned int core)
atomic_store_explicit
(
&
self
->
wait
,
0
,
memory_order_relaxed
);
atomic_store_explicit
(
&
self
->
new
,
node
,
memory_order_relaxed
);
atomic_store_explicit
(
&
self
->
old
,
node
,
memory_order_relaxed
);
atomic_store_explicit
(
&
self
->
del
,
NULL
,
memory_order_relaxed
);
atomic_store_explicit
(
&
self
->
cur
,
node
,
memory_order_relaxed
);
self
->
precision
=
timer_measure_clock_precision
();
for
(
int
i
=
0
;
i
<
OSP_WINDOW_SIZE
;
i
++
)
...
...
@@ -364,7 +364,7 @@ PrrtTimer *PrrtTimer_create(unsigned int core)
int
PrrtTimer_submit
(
PrrtTimer
*
self
,
const
TimerDate
*
when
,
const
PrrtTimerTask
*
what
)
{
TimerNode
*
iter
,
*
stop
,
*
next
;
TimerNode
*
iter
,
*
stop
,
*
next
,
*
hold
;
TimerNode
*
node
=
malloc
(
sizeof
(
TimerNode
));
if
(
!
node
)
return
-
1
;
...
...
@@ -380,13 +380,14 @@ int PrrtTimer_submit(PrrtTimer *self, const TimerDate *when, const PrrtTimerTask
iter
=
atomic_load
(
&
self
->
old
);
stop
=
atomic_load
(
&
self
->
new
);
while
(
iter
!=
stop
)
{
hold
=
atomic_load
(
&
self
->
cur
);
while
(
iter
!=
stop
&&
iter
!=
hold
)
{
next
=
iter
->
next
;
assert
(
iter
->
done
&&
"cleanup task that is not marked as done"
);
free
(
iter
);
iter
=
next
;
}
atomic_store
(
&
self
->
old
,
stop
);
atomic_store
(
&
self
->
old
,
iter
);
_Atomic
(
TimerNode
*
)
*
addr
=
&
self
->
old
;
while
(
1
)
{
...
...
@@ -407,9 +408,8 @@ int PrrtTimer_submit(PrrtTimer *self, const TimerDate *when, const PrrtTimerTask
atomic_store
(
&
node
->
next
,
iter
);
atomic_store
(
addr
,
node
);
TimerNode
*
del
=
atomic_load
(
&
self
->
del
);
TimerNode
*
tail
=
atomic_load
(
&
self
->
new
);
if
(
del
==
tail
||
timer_date_is_lt
(
&
node
->
date
,
&
tail
->
date
))
{
if
(
timer_date_is_lt
(
&
node
->
date
,
&
tail
->
date
)
||
(
addr
==
&
tail
->
next
&&
atomic_load
(
&
tail
->
done
))
)
{
atomic_store
(
&
self
->
new
,
node
);
timer_wake_worker
(
self
,
false
);
}
...
...
@@ -417,6 +417,50 @@ int PrrtTimer_submit(PrrtTimer *self, const TimerDate *when, const PrrtTimerTask
return
-
1
;
}
static
void
wake_sleeping_thread
(
void
*
arg
)
{
atomic_int
*
ip
=
(
atomic_int
*
)
arg
;
atomic_store_explicit
(
ip
,
1
,
memory_order_release
);
futex
((
int
*
)
ip
,
FUTEX_WAKE
|
FUTEX_PRIVATE_FLAG
,
1
,
NULL
,
NULL
,
0
);
}
void
PrrtTimer_sleep_until
(
PrrtTimer
*
self
,
const
TimerDate
*
end
)
{
atomic_int
cond
;
atomic_store_explicit
(
&
cond
,
0
,
memory_order_release
);
TimerDate
now
;
TimerDate
care
=
*
end
;
PrrtTimerTask
what
;
what
.
fun
=
wake_sleeping_thread
;
what
.
arg
=
&
cond
;
timer_date_sub
(
&
care
,
2
*
self
->
osp
);
clock_gettime
(
CLOCK_REALTIME
,
&
now
);
if
(
!
timer_date_is_due
(
self
,
&
care
,
&
now
))
{
PrrtTimer_submit
(
self
,
&
care
,
&
what
);
while
(
!
atomic_load
(
&
cond
))
{
clock_gettime
(
CLOCK_REALTIME
,
&
now
);
if
(
!
timer_date_is_due
(
self
,
&
care
,
&
now
))
//futex(&cond, FUTEX_WAIT_BITSET|FUTEX_PRIVATE_FLAG|FUTEX_CLOCK_REALTIME, 1, care, NULL, FUTEX_BITSET_MATCH_ANY);
futex
(
&
cond
,
FUTEX_WAIT
|
FUTEX_PRIVATE_FLAG
,
0
,
NULL
,
NULL
,
0
);
}
}
while
(
1
)
{
clock_gettime
(
CLOCK_REALTIME
,
&
now
);
if
(
timer_date_is_due
(
self
,
end
,
&
now
))
break
;
}
}
void
PrrtTimer_sleep_nanos
(
PrrtTimer
*
self
,
TimerDateUDiff_t
nanos
)
{
TimerDate
when
;
clock_gettime
(
CLOCK_REALTIME
,
&
when
);
timer_date_add
(
&
when
,
nanos
);
PrrtTimer_sleep_until
(
self
,
&
when
);
}
void
PrrtTimer_end
(
PrrtTimer
*
self
)
{
atomic_store_explicit
(
&
self
->
alive
,
false
,
memory_order_release
);
...
...
prrt/proto/timer.h
View file @
b6175edc
...
...
@@ -10,6 +10,7 @@ typedef void *prrtTimerTaskArg;
typedef
void
(
*
prrtTimerTaskFun
)(
prrtTimerTaskArg
);
typedef
struct
timespec
prrtTimerDate
;
typedef
unsigned
long
long
TimerDateUDiff_t
;
typedef
struct
prrtTimerTask
{
prrtTimerTaskFun
fun
;
...
...
@@ -24,4 +25,7 @@ int PrrtTimer_submit(PrrtTimer *timer, const prrtTimerDate *when, const PrrtTime
void
PrrtTimer_end
(
PrrtTimer
*
timer
);
#endif // PRRT_TIMER_H
\ No newline at end of file
void
PrrtTimer_sleep_until
(
PrrtTimer
*
self
,
const
prrtTimerDate
*
end
);
void
PrrtTimer_sleep_nanos
(
PrrtTimer
*
self
,
TimerDateUDiff_t
nanos
);
#endif // PRRT_TIMER_H
prrt/proto/types/block.c
View file @
b6175edc
...
...
@@ -18,7 +18,14 @@ static void gather_redundancy_packets(const PrrtBlock *block_ptr, gf *const *fec
m
++
;
}
PrrtPacket_copy_payload_to_buffer
(
fec
[
m
],
packet
,
PRRT_PACKET_REDUNDANCY_HEADER_SIZE
);
// Take the data payload out of the redundancy payload and
// decode/deserialize it into the fec matrix
PrrtPacket_decode_payload
(
packet
->
payload
+
PRRT_PACKET_REDUNDANCY_HEADER_SIZE
,
PACKET_TYPE_DATA
,
fec
[
m
],
packet
->
payloadLength
);
idx_p
[
m
]
=
packet
->
index
;
PrrtPacket_destroy
(
packet
);
}
...
...
@@ -219,7 +226,7 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, prrtSequenceNumber_t *seqno)
check_mem
(
src
[
j
]);
PrrtPacket
*
pkt
=
cur
->
value
;
pkt
->
index
=
(
uint8_t
)
((
pkt
->
sequenceNumber
-
baseSequenceNumber
)
%
SEQNO_SPACE
);
PrrtPacket_
copy
_payload
_to_buffer
(
src
[
j
],
pkt
,
0
);
PrrtPacket_
encode
_payload
(
src
[
j
],
pkt
);
j
++
;
}
...
...
@@ -228,6 +235,9 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, prrtSequenceNumber_t *seqno)
for
(
j
=
0
;
j
<
r
;
j
++
)
{
fec
[
j
]
=
calloc
(
length
,
sizeof
(
gf
));
check_mem
(
fec
[
j
]);
// Encode/Serialize the payload and store it in the fec matrix
// A simple copy does not suffice, as the payload is send 'as is'
// over the wire. It however has to conform the protocol specification.
PrrtCoder_encode
(
block_ptr
->
coder
,
src
,
fec
[
j
],
j
+
k
,
length
);
PrrtPacket
*
red_packet_ptr
=
PrrtPacket_create_redundancy_packet
(
0
,
(
void
*
)
fec
[
j
],
length
,
*
seqno
,
(
uint8_t
)
(
k
+
j
),
block_ptr
->
baseSequenceNumber
,
...
...
@@ -276,15 +286,19 @@ bool PrrtBlock_decode(PrrtBlock *block_ptr)
idx_p
[
i
]
=
-
1
;
}
// Store the data payload with header in the fec matrix
gather_data_packets
(
block_ptr
,
fec
,
idx_p
);
// Store the data payload with header from the redundancy packets in the fec matrix
gather_redundancy_packets
(
block_ptr
,
fec
,
idx_p
);
// Decode the fec matrix in place
check
(
PrrtCoder_decode
(
block_ptr
->
coder
,
fec
,
idx_p
,
length
)
==
EXIT_SUCCESS
,
"Could not decode current block."
);
for
(
j
=
0
;
j
<
k
;
j
++
)
{
if
(
idx_p
[
j
]
>=
k
)
{
// The content of fec[j] is correctly deserialized and can be
// cast into a payload struct
PrrtPacketDataPayload
*
packet_and_payload
=
(
PrrtPacketDataPayload
*
)
fec
[
j
];
PrrtPacket
*
packet
=
PrrtPacket_reconstruct_data_packet
(
packet_and_payload
,
j
,
(
prrtSequenceNumber_t
)
(
baseSequenceNumber
+
j
));
debug
(
DEBUG_BLOCK
,
"Reconstructed [D]: %d"
,
packet
->
sequenceNumber
);
...
...
prrt/proto/types/packet.c
View file @
b6175edc
...
...
@@ -128,27 +128,30 @@ create_header(uint8_t priority, prrtSequenceNumber_t seqno, prrtPacketLength_t s
return
NULL
;
}
bool
PrrtPacket_encode
(
void
*
buf_ptr
,
uint16_t
buf_size
,
PrrtPacket
*
packet_ptr
)
{
void
*
payload
=
packet_ptr
->
payload
;
check
(
packet_ptr
->
payloadLength
+
PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH
<=
buf_size
,
"Buffer too small."
);
buf_ptr
=
encode_general_header
(
buf_ptr
,
packet_ptr
);
bool
PrrtPacket_encode_payload
(
void
*
buf_ptr
,
PrrtPacket
*
packet_ptr
)
{
prrtPacketType_t
type
=
PrrtPacket_type
(
packet_ptr
);
if
(
type
==
PACKET_TYPE_DATA
)
{
buf_ptr
=
encode_data_header
(
buf_ptr
,
payload
);
buf_ptr
=
encode_data_header
(
buf_ptr
,
packet_ptr
->
payload
);
PrrtPacket_copy_payload_to_buffer
(
buf_ptr
,
packet_ptr
,
PRRT_PACKET_DATA_HEADER_SIZE
);
}
else
if
(
type
==
PACKET_TYPE_REDUNDANCY
)
{
buf_ptr
=
encode_redundancy_header
(
buf_ptr
,
payload
);
buf_ptr
=
encode_redundancy_header
(
buf_ptr
,
packet_ptr
->
payload
);
PrrtPacket_copy_payload_to_buffer
(
buf_ptr
,
packet_ptr
,
PRRT_PACKET_REDUNDANCY_HEADER_SIZE
);
}
else
if
(
type
==
PACKET_TYPE_FEEDBACK
)
{
encode_feedback_header
(
buf_ptr
,
payload
);
encode_feedback_header
(
buf_ptr
,
packet_ptr
->
payload
);
}
else
{
perror
(
"NOT IMPLEMENTED"
);
return
false
;
}
return
true
;
}
bool
PrrtPacket_encode
(
void
*
buf_ptr
,
uint16_t
buf_size
,
PrrtPacket
*
packet_ptr
)
{
check
(
packet_ptr
->
payloadLength
+
PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH
<=
buf_size
,
"Buffer too small."
);
buf_ptr
=
encode_general_header
(
buf_ptr
,
packet_ptr
);
return
PrrtPacket_encode_payload
(
buf_ptr
,
packet_ptr
);
error:
return
false
;
...
...
@@ -216,6 +219,22 @@ void *encode_general_header(void *buf_ptr, const PrrtPacket *packet) {
return
buf_ptr
;
}
bool
PrrtPacket_decode_payload
(
void
*
srcBuffer
,
prrtPacketType_t
packetType
,
void
*
payload
,
uint32_t
payload_len
)
{
if
(
packetType
==
PACKET_TYPE_DATA
)
{
srcBuffer
=
decode_data_header
(
srcBuffer
,
payload
);
memcpy
(
payload
+
PRRT_PACKET_DATA_HEADER_SIZE
,
srcBuffer
,
payload_len
-
PRRT_PACKET_DATA_HEADER_SIZE
);
}
else
if
(
packetType
==
PACKET_TYPE_REDUNDANCY
)
{
srcBuffer
=
decode_redundancy_header
(
srcBuffer
,
payload
);
memcpy
(
payload
+
PRRT_PACKET_REDUNDANCY_HEADER_SIZE
,
srcBuffer
,
payload_len
-
PRRT_PACKET_REDUNDANCY_HEADER_SIZE
);
}
else
if
(
packetType
==
PACKET_TYPE_FEEDBACK
)
{
decode_feedback_header
(
srcBuffer
,
payload
);
}
else
{
perror
(
"NOT IMPLEMENTED
\n
"
);
return
false
;
}
return
true
;
}
bool
PrrtPacket_decode
(
void
*
srcBuffer
,
uint16_t
srcBufferSize
,
PrrtPacket
*
targetPacket
)
{
// targetPacket is uninitialized, so we need to set the reference count
...
...
@@ -238,18 +257,7 @@ bool PrrtPacket_decode(void *srcBuffer, uint16_t srcBufferSize, PrrtPacket *targ
targetPacket
->
payloadLength
=
payload_len
;
prrtPacketType_t
packetType
=
PrrtPacket_type
(
targetPacket
);
if
(
packetType
==
PACKET_TYPE_DATA
)
{
srcBuffer
=
decode_data_header
(
srcBuffer
,
payload_buffer
);
PrrtPacket_copy_buffer_to_payload
(
targetPacket
,
srcBuffer
,
PRRT_PACKET_DATA_HEADER_SIZE
);
}
else
if
(
packetType
==
PACKET_TYPE_REDUNDANCY
)
{
srcBuffer
=
decode_redundancy_header
(
srcBuffer
,
payload_buffer
);
PrrtPacket_copy_buffer_to_payload
(
targetPacket
,
srcBuffer
,
PRRT_PACKET_REDUNDANCY_HEADER_SIZE
);
}
else
if
(
packetType
==
PACKET_TYPE_FEEDBACK
)
{
decode_feedback_header
(
srcBuffer
,
payload_buffer
);
}
else
{
printf
(
"NOT IMPLEMENTED
\n
"
);
}
return
true
;
return
PrrtPacket_decode_payload
(
srcBuffer
,
packetType
,
payload_buffer
,
payload_len
);
error:
return
false
;
...
...
prrt/proto/types/packet.h
View file @
b6175edc
...
...
@@ -162,8 +162,10 @@ PrrtPacket *PrrtPacket_create_redundancy_packet(uint8_t priority, void *payloadP
PrrtCodingConfiguration
*
codingParams
);
bool
PrrtPacket_decode
(
void
*
srcBuffer
,
uint16_t
srcBufferSize
,
PrrtPacket
*
targetPacket
);
bool
PrrtPacket_decode_payload
(
void
*
srcBuffer
,
prrtPacketType_t
packetType
,
void
*
payload
,
uint32_t
payload_len
);
bool
PrrtPacket_encode
(
void
*
buf_ptr
,
uint16_t
buf_size
,
PrrtPacket
*
packet_ptr
);
bool
PrrtPacket_encode_payload
(
void
*
buf_ptr
,
PrrtPacket
*
packet_ptr
);
int
PrrtPacket_destroy
(
PrrtPacket
*
packet
);
...
...
prrt/util/common.c
View file @
b6175edc
...
...
@@ -43,3 +43,17 @@ int pin_thread_to_core(pthread_attr_t *ap, int core)
return
pthread_attr_setaffinity_np
(
ap
,
sizeof
(
cpu_set_t
),
&
cpuset
);
}
struct
timespec
abstime_from_now
(
uint32_t
wait_time
)
{
struct
timespec
now
;
clock_gettime
(
CLOCK_REALTIME
,
&
now
);
struct
timespec
deadline
;
uint32_t
diff_s
=
wait_time
/
1000000
;
uint32_t
diff_ns
=
(
wait_time
%
1000000
)
*
1000
;
__syscall_slong_t
sum
=
diff_ns
+
now
.
tv_nsec
;
__syscall_slong_t
carry
=
(
sum
)
/
1000000000
;
__syscall_slong_t
rest
=
(
sum
)
%
1000000000
;
deadline
.
tv_sec
=
diff_s
+
now
.
tv_sec
+
carry
;
deadline
.
tv_nsec
=
rest
;
return
deadline
;
}
prrt/util/common.h
View file @
b6175edc
...
...
@@ -9,6 +9,7 @@
int
print_buffer
(
const
char
*
buf
,
const
int
length
);
void
print_gf
(
const
gf
*
start
,
const
int
len
);
int
pin_thread_to_core
(
pthread_attr_t
*
ap
,
int
core
);
struct
timespec
abstime_from_now
(
uint32_t
wait_time
);
#define PERROR(fmt, args...) \
printf("PRRT ERROR (" __FILE__ ":%d)\n" fmt, __LINE__, ## args);
...
...
prrt/util/time.c
View file @
b6175edc
...
...
@@ -15,21 +15,6 @@ struct timespec abstime_now() {
return
now
;
}
struct
timespec
abstime_from_now
(
uint32_t
wait_time
)
{
struct
timespec
now
;
clock_gettime
(
CLOCK_REALTIME
,
&
now
);
struct
timespec
deadline
;
uint32_t
diff_s
=
wait_time
/
1000000
;
uint32_t
diff_ns
=
(
wait_time
%
1000000
)
*
1000
;
__syscall_slong_t
sum
=
diff_ns
+
now
.
tv_nsec
;
__syscall_slong_t
carry
=
(
sum
)
/
1000000000
;
__syscall_slong_t
rest
=
(
sum
)
%
1000000000
;
deadline
.
tv_sec
=
diff_s
+
now
.
tv_sec
+
carry
;
deadline
.
tv_nsec
=
rest
;
return
deadline
;
}
// < 0: a less than b (b is in the future)
// > 0: a greater b (b is in the past)
// == 0: a equal b
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment