Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
LARN
PRRT
Commits
0307ec2c
Commit
0307ec2c
authored
Mar 15, 2016
by
Andreas Schmidt
Browse files
Merged branch develop into feature/lossGathering
parents
44cca42d
293d07ce
Changes
19
Expand all
Hide whitespace changes
Inline
Side-by-side
src/cython/cprrt.pxd
View file @
0307ec2c
...
...
@@ -63,7 +63,7 @@ cdef extern from "prrt/block.h":
uint32_t
redundancy_count
PrrtCodingParams
coding_params
uint32_t
largest_data_length
uint16_t
base
_seqno
uint16_t
base
SequenceNumber
List
*
data_blocks
List
*
redundancy_blocks
uint8_t
is_coded
...
...
@@ -74,9 +74,9 @@ cdef extern from "prrt/packet.h":
cdef
struct
prrtPacket
:
uint8_t
type_priority
;
uint8_t
index
;
uint16_t
seq
no
;
uint16_t
seq
uenceNumber
;
void
*
payload
;
uint32_t
payload
_len
;
uint32_t
payload
Length
;
ctypedef
prrtPacket
PrrtPacket
...
...
@@ -121,7 +121,7 @@ cdef extern from "prrt/socket.h":
ctypedef
prrtSocket
PrrtSocket
cdef
PrrtSocket
*
PrrtSocket_create
(
bint
is
_s
ender
)
cdef
PrrtSocket
*
PrrtSocket_create
(
bint
is
S
ender
)
bint
PrrtSocket_bind
(
PrrtSocket
*
sock_ptr
,
const_char
*
ipAddress
,
const
uint16_t
port
)
int
PrrtSocket_close
(
const
PrrtSocket
*
sock_ptr
)
int
PrrtSocket_connect
(
PrrtSocket
*
sock_ptr
,
const_char
*
host
,
const
uint16_t
port
)
...
...
src/cython/prrt.pyx
View file @
0307ec2c
...
...
@@ -3,7 +3,7 @@ cimport cprrt
cdef
extern
from
"prrt/stores/forwardPacketTable.c"
:
int
PrrtForwardPacketTable_create
(
cprrt
.
PrrtForwardPacketTable
*
fpt_prt
);
int
PrrtForwardPacketTable_test_set_is_number_relevant
(
cprrt
.
PrrtForwardPacketTable
*
fpt_ptr
,
uint16_t
seq
no
);
int
PrrtForwardPacketTable_test_set_is_number_relevant
(
cprrt
.
PrrtForwardPacketTable
*
fpt_ptr
,
uint16_t
seq
uenceNumber
);
int
PrrtForwardPacketTable_test_is_block_relevant
(
cprrt
.
PrrtForwardPacketTable
*
forwardPacketTable
,
uint16_t
start
,
uint16_t
length
);
int
PrrtForwardPacketTable_destroy
(
cprrt
.
PrrtForwardPacketTable
*
fpt_prt
);
...
...
@@ -42,7 +42,7 @@ cdef extern from "prrt/packet.c":
int
PrrtPacket_print
(
cprrt
.
PrrtPacket
*
packet_ptr
)
cdef
extern
from
"prrt/socket.c"
:
cprrt
.
PrrtSocket
*
PrrtSocket_create
(
uint16_t
port
,
uint8_t
is
_s
ender
)
cprrt
.
PrrtSocket
*
PrrtSocket_create
(
uint16_t
port
,
uint8_t
is
S
ender
)
cdef
extern
from
"util/bptree.c"
:
cprrt
.
BPTreeNode
*
BPTree_insert
(
cprrt
.
BPTreeNode
*
root
,
int
key
,
void
*
value
)
...
...
src/prrt/block.c
View file @
0307ec2c
...
...
@@ -61,7 +61,7 @@ void PrrtBlock_destroy(PrrtBlock *mblock)
free
(
mblock
);
}
PrrtBlock
*
PrrtBlock_create
(
const
PrrtCodingParams
*
cpar
,
uint16_t
base_seqno
)
PrrtBlock
*
PrrtBlock_create
(
const
PrrtCodingParams
*
cpar
,
prrtSequenceNumber_t
baseSequenceNumber
)
{
PrrtBlock
*
block
=
calloc
(
1
,
sizeof
(
PrrtBlock
));
check_mem
(
block
);
...
...
@@ -69,7 +69,7 @@ PrrtBlock * PrrtBlock_create(const PrrtCodingParams *cpar, uint16_t base_seqno)
block
->
codingParams
=
*
cpar
;
block
->
dataPackets
=
List_create
();
block
->
redundancyPackets
=
List_create
();
block
->
baseSequenceNumber
=
base
_seqno
;
block
->
baseSequenceNumber
=
base
SequenceNumber
;
block
->
largestDataLength
=
0
;
return
block
;
...
...
@@ -84,15 +84,15 @@ bool PrrtBlock_insert_data_packet(PrrtBlock *prrtBlock, const PrrtPacket *prrtPa
bool
found
=
false
;
LIST_FOREACH
(
prrtBlock
->
dataPackets
,
first
,
next
,
cur
)
{
PrrtPacket
*
pkt
=
cur
->
value
;
if
(
pkt
->
seq
no
==
prrtPacket
->
seq
no
)
{
if
(
pkt
->
seq
uenceNumber
==
prrtPacket
->
seq
uenceNumber
)
{
found
=
true
;
}
}
if
(
found
==
false
)
{
List_push
(
prrtBlock
->
dataPackets
,
prrtPacket
);
prrtBlock
->
largestDataLength
=
(
uint32
_t
)
MAX
(
prrtBlock
->
largestDataLength
,
prrtPacket
->
payload
_len
-
PRRT_PACKET_DATA_HEADER_SIZE
);
prrtBlock
->
largestDataLength
=
(
prrtPacketLength
_t
)
MAX
(
prrtBlock
->
largestDataLength
,
prrtPacket
->
payload
Length
-
PRRT_PACKET_DATA_HEADER_SIZE
);
return
true
;
}
else
{
return
false
;
...
...
@@ -104,15 +104,15 @@ bool PrrtBlock_insert_redundancy_packet(PrrtBlock *block_ptr, const PrrtPacket *
bool
found
=
false
;
LIST_FOREACH
(
block_ptr
->
redundancyPackets
,
first
,
next
,
cur
)
{
PrrtPacket
*
pkt
=
cur
->
value
;
if
(
pkt
->
seq
no
==
ptr
->
seq
no
)
{
if
(
pkt
->
seq
uenceNumber
==
ptr
->
seq
uenceNumber
)
{
found
=
true
;
}
}
if
(
found
==
false
)
{
List_push
(
block_ptr
->
redundancyPackets
,
ptr
);
block_ptr
->
largestDataLength
=
(
uint32
_t
)
MAX
(
block_ptr
->
largestDataLength
,
ptr
->
payload
_len
-
PRRT_PACKET_REDUNDANCY_HEADER_SIZE
);
block_ptr
->
largestDataLength
=
(
prrtPacketLength
_t
)
MAX
(
block_ptr
->
largestDataLength
,
ptr
->
payload
Length
-
PRRT_PACKET_REDUNDANCY_HEADER_SIZE
);
return
true
;
}
else
{
return
false
;
...
...
@@ -134,14 +134,14 @@ PrrtPacket *PrrtBlock_get_first_data(PrrtBlock *block_ptr)
return
List_shift
(
block_ptr
->
dataPackets
);
}
void
PrrtBlock_encode
(
PrrtBlock
*
block_ptr
,
uint16
_t
*
seqno
)
void
PrrtBlock_encode
(
PrrtBlock
*
block_ptr
,
prrtSequenceNumber
_t
*
seqno
)
{
int
j
=
0
;
uint8_t
k
=
block_ptr
->
codingParams
.
k
;
uint8_t
n
=
block_ptr
->
codingParams
.
n
;
uint8_t
r
=
block_ptr
->
codingParams
.
r
;
uint16
_t
baseSequenceNumber
=
block_ptr
->
baseSequenceNumber
;
uint32
_t
length
=
block_ptr
->
largestDataLength
;
prrtSequenceNumber
_t
baseSequenceNumber
=
block_ptr
->
baseSequenceNumber
;
prrtPacketLength
_t
length
=
block_ptr
->
largestDataLength
;
PrrtCoder
*
coder
=
NULL
;
...
...
@@ -154,7 +154,7 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno)
src
[
j
]
=
calloc
(
length
,
sizeof
(
gf
));
check_mem
(
src
[
j
]);
PrrtPacket
*
pkt
=
cur
->
value
;
pkt
->
index
=
(
uint8_t
)
((
pkt
->
seq
no
-
baseSequenceNumber
)
%
SEQNO_SPACE
);
pkt
->
index
=
(
uint8_t
)
((
pkt
->
seq
uenceNumber
-
baseSequenceNumber
)
%
SEQNO_SPACE
);
PrrtPacket_copy_payload_to_buffer
(
src
[
j
],
pkt
,
PRRT_PACKET_DATA_HEADER_SIZE
);
j
++
;
}
...
...
@@ -168,7 +168,7 @@ void PrrtBlock_encode(PrrtBlock *block_ptr, uint16_t *seqno)
PrrtPacket
*
red_packet_ptr
=
PrrtPacket_create_redundancy_packet
(
0
,
(
void
*
)
fec
[
j
],
length
,
*
seqno
,
(
uint8_t
)
(
k
+
j
),
block_ptr
->
baseSequenceNumber
,
block_ptr
->
codingParams
);
*
seqno
=
(
uint16
_t
)
((
*
seqno
+
1
)
%
SEQNO_SPACE
);
*
seqno
=
(
prrtSequenceNumber
_t
)
((
*
seqno
+
1
)
%
SEQNO_SPACE
);
PrrtBlock_insert_redundancy_packet
(
block_ptr
,
red_packet_ptr
);
}
...
...
@@ -193,8 +193,8 @@ bool PrrtBlock_decode(PrrtBlock *block_ptr)
int
*
idx_p
=
NULL
;
uint8_t
n
=
block_ptr
->
codingParams
.
n
;
uint8_t
k
=
block_ptr
->
codingParams
.
k
;
uint16
_t
baseSequenceNumber
=
block_ptr
->
baseSequenceNumber
;
uint32
_t
length
=
block_ptr
->
largestDataLength
;
prrtSequenceNumber
_t
baseSequenceNumber
=
block_ptr
->
baseSequenceNumber
;
prrtPacketLength
_t
length
=
block_ptr
->
largestDataLength
;
PrrtCoder
*
coder
=
NULL
;
...
...
@@ -219,7 +219,7 @@ bool PrrtBlock_decode(PrrtBlock *block_ptr)
for
(
j
=
0
;
j
<
k
;
j
++
)
{
if
(
idx_p
[
j
]
>=
k
)
{
PrrtPacket
*
packet
=
PrrtPacket_create_data_packet
(
0
,
fec
[
j
],
length
,
(
uint16
_t
)
(
baseSequenceNumber
+
j
));
PrrtPacket
*
packet
=
PrrtPacket_create_data_packet
(
0
,
fec
[
j
],
length
,
(
prrtSequenceNumber
_t
)
(
baseSequenceNumber
+
j
));
if
(
PrrtBlock_insert_data_packet
(
block_ptr
,
packet
)
==
false
)
{
debug
(
"Tried to insert unnecessary packet."
);
PrrtPacket_destroy
(
packet
);
...
...
src/prrt/block.h
View file @
0307ec2c
...
...
@@ -7,8 +7,8 @@
typedef
struct
prrtBlock
{
PrrtCodingParams
codingParams
;
uint32
_t
largestDataLength
;
uint16
_t
baseSequenceNumber
;
prrtPacketLength
_t
largestDataLength
;
prrtSequenceNumber
_t
baseSequenceNumber
;
List
*
dataPackets
;
List
*
redundancyPackets
;
bool
isCoded
;
...
...
@@ -18,7 +18,7 @@ typedef struct prrtBlock {
/**
* Allocate space for a block.
*/
PrrtBlock
*
PrrtBlock_create
(
const
PrrtCodingParams
*
cpar
,
uint16_t
base_seqno
);
PrrtBlock
*
PrrtBlock_create
(
const
PrrtCodingParams
*
cpar
,
prrtSequenceNumber_t
baseSequenceNumber
);
/**
* Frees the PrrtBlock data structure.
...
...
@@ -33,7 +33,7 @@ bool PrrtBlock_decode_ready(const PrrtBlock *block_ptr);
PrrtPacket
*
PrrtBlock_get_first_data
(
PrrtBlock
*
block_ptr
);
void
PrrtBlock_encode
(
PrrtBlock
*
block_ptr
,
uint16
_t
*
seqno
);
void
PrrtBlock_encode
(
PrrtBlock
*
block_ptr
,
prrtSequenceNumber
_t
*
seqno
);
bool
PrrtBlock_decode
(
PrrtBlock
*
block_ptr
);
PrrtPacket
*
PrrtBlock_get_first_red_data
(
PrrtBlock
*
block_ptr
);
...
...
src/prrt/channelStateInformation.c
View file @
0307ec2c
...
...
@@ -16,12 +16,12 @@ void PrrtChannelStateInformation_init(PrrtChannelStateInformation *csi)
PERROR
(
"Should not happen.%s"
,
""
);
}
void
PrrtChannelStateInformation_update_rtt
(
PrrtChannelStateInformation
*
csi
,
uint32
_t
rtt
)
void
PrrtChannelStateInformation_update_rtt
(
PrrtChannelStateInformation
*
csi
,
prrtTimedelta
_t
rtt
)
{
int32_t
delta
=
rtt
-
csi
->
rttMean
;
// TODO: ensure that there are no arithemtic problems via rounding etc.
csi
->
rttMean
=
(
uint16
_t
)
(
csi
->
rttMean
+
RRT_ALPHA
*
delta
);
csi
->
rttDev
=
(
uint16
_t
)
(
csi
->
rttDev
+
RRT_ALPHA
*
(
labs
(
delta
)
-
csi
->
rttDev
));
csi
->
rttMean
=
(
prrtTimedelta
_t
)
(
csi
->
rttMean
+
RRT_ALPHA
*
delta
);
csi
->
rttDev
=
(
prrtTimedelta
_t
)
(
csi
->
rttDev
+
RRT_ALPHA
*
(
labs
(
delta
)
-
csi
->
rttDev
));
}
void
PrrtChannelStateInformation_print
(
PrrtChannelStateInformation
*
csi
)
...
...
src/prrt/channelStateInformation.h
View file @
0307ec2c
#ifndef PRRT_CHANNELSTATEINFORMATION_H
#define PRRT_CHANNELSTATEINFORMATION_H
#include
<stdin
t.h
>
#include
"packe
t.h
"
typedef
struct
prrtChannelStateInformation
{
uint16
_t
rttMean
;
uint16
_t
rttDev
;
prrtTimedelta
_t
rttMean
;
prrtTimedelta
_t
rttDev
;
}
PrrtChannelStateInformation
;
void
PrrtChannelStateInformation_init
(
PrrtChannelStateInformation
*
csi
);
void
PrrtChannelStateInformation_update_rtt
(
PrrtChannelStateInformation
*
csi
,
uint32
_t
rtt
);
void
PrrtChannelStateInformation_update_rtt
(
PrrtChannelStateInformation
*
csi
,
prrtTimedelta
_t
rtt
);
void
PrrtChannelStateInformation_print
(
PrrtChannelStateInformation
*
csi
);
...
...
src/prrt/clock.c
View file @
0307ec2c
#include
<sys/time.h>
#include
<stddef.h>
#include
"clock.h"
#include
"packet.h"
uint32
_t
PrrtClock_get_current_time
(
void
)
{
prrtTimestamp
_t
PrrtClock_get_current_time
(
void
)
{
struct
timeval
tv
;
gettimeofday
(
&
tv
,
NULL
);
uint32_t
time_in_micros
=
(
uint32_t
)
(
1000000
*
tv
.
tv_sec
+
tv
.
tv_usec
);
return
time_in_micros
;
return
(
prrtTimestamp_t
)
(
1000000
*
tv
.
tv_sec
+
tv
.
tv_usec
);
}
src/prrt/clock.h
View file @
0307ec2c
#ifndef PRRT_CLOCK_H
#define PRRT_CLOCK_H
#include
<stdin
t.h
>
#include
"packe
t.h
"
uint32
_t
PrrtClock_get_current_time
(
void
);
prrtTimestamp
_t
PrrtClock_get_current_time
(
void
);
#endif //PRRT_CLOCK_H
src/prrt/codingParams.h
View file @
0307ec2c
...
...
@@ -3,6 +3,7 @@
#include
<pthread.h>
#include
<stdint.h>
#include
<stdbool.h>
typedef
struct
prrtCodingParams
{
pthread_mutex_t
lock
;
...
...
src/prrt/packet.c
View file @
0307ec2c
This diff is collapsed.
Click to expand it.
src/prrt/packet.h
View file @
0307ec2c
#ifndef PRRT_FRAME_H
#define PRRT_FRAME_H
// CREATE: PRRT_PACKET
// ENCODE: PRRT_PACKET -> BYTES
// DECODE: BYTES -> PRRT_PACKET
// DESTROY:
#include
<stdint.h>
#include
"codingParams.h"
#include
<stdbool.h>
#define PACKET_TYPE_DATA 0
#define PACKET_TYPE_REPETITION 1
#define PACKET_TYPE_REDUNDANCY 2
#define PACKET_TYPE_FEEDBACK 3
#define PACKET_TYPE_PRE_REDUNDANCY 4
#define PACKET_TYPE_CHANNEL_FEEDBACK 5
typedef
enum
{
PACKET_TYPE_DATA
=
0
,
PACKET_TYPE_REPETITION
=
1
,
PACKET_TYPE_REDUNDANCY
=
2
,
PACKET_TYPE_FEEDBACK
=
3
,
PACKET_TYPE_PRESENT_REDUNDANCY
=
4
,
PACKET_TYPE_CHANNEL_FEEDBACK
=
5
}
prrtPacketType_t
;
typedef
uint16_t
prrtSequenceNumber_t
;
typedef
uint8_t
prrtIndex_t
;
typedef
uint32_t
prrtTimestamp_t
;
typedef
uint32_t
prrtTimedelta_t
;
typedef
uint32_t
prrtPacketLength_t
;
typedef
struct
prrtIncompleteBlock
{
prrtSequenceNumber_t
sequenceNumberBase
;
uint16_t
repairCycleIndex
;
}
PrrtIncompleteBlock
;
typedef
struct
prrtPacket
{
uint8_t
type_priority
;
uint8
_t
index
;
uint16_t
seqno
;
void
*
payload
;
uint32
_t
payload
_len
;
prrtIndex
_t
index
;
prrtSequenceNumber_t
sequenceNumber
;
void
*
payload
;
prrtPacketLength
_t
payload
Length
;
}
PrrtPacket
;
#define PRRT_PACKET_GENERAL_HEADER_SIZE 8
#define PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH 4
typedef
struct
prrtPacketDataPayload
{
uint32
_t
timestamp
;
uint16_t
group_round_trip_time
;
uint16
_t
packet
_t
imeout
;
uint16
_t
decoding
_t
imeout
;
uint16
_t
feedback
_t
imeout
;
prrtTimestamp
_t
timestamp
;
prrtTimedelta_t
groupRTT_us
;
prrtTimedelta
_t
packet
T
imeout
_us
;
prrtTimedelta
_t
decoding
T
imeout
_us
;
prrtTimedelta
_t
feedback
T
imeout
_us
;
}
PrrtPacketDataPayload
;
#define PRRT_PACKET_DATA_HEADER_SIZE sizeof(PrrtPacketDataPayload)
typedef
struct
prrtPacketRedundancyPayload
{
uint16_t
base_seqno
;
prrtSequenceNumber_t
baseSequenceNumber
;
uint8_t
n
;
uint8_t
k
;
}
PrrtPacketRedundancyPayload
;
#define PRRT_PACKET_REDUNDANCY_HEADER_SIZE sizeof(PrrtPacketRedundancyPayload)
typedef
struct
prrtPacketFeedbackPayload
{
uint32_t
receiver_addr
;
uint32_t
group_round_trip_time
;
uint32_t
forward_trip_timestamp
;
uint32_t
packet_loss_rate
;
uint16_t
gap
;
uint16_t
ngap
;
uint16_t
burst
;
uint16_t
nburst
;
uint32_t
bandwidth_estimate
;
uint32_t
buffer_feedback
;
uint32_t
receiverAddress
;
prrtTimedelta_t
groupRTT_us
;
prrtTimestamp_t
forwardTripTimestamp_us
;
prrtSequenceNumber_t
erasureCount
;
prrtSequenceNumber_t
packetCount
;
prrtSequenceNumber_t
gapLength
;
prrtSequenceNumber_t
gapCount
;
prrtSequenceNumber_t
burstLength
;
prrtSequenceNumber_t
burstCount
;
uint32_t
bandwidthEstimate
;
PrrtIncompleteBlock
*
incompleteBlocks
;
}
PrrtPacketFeedbackPayload
;
#define PRRT_PACKET_FEEDBACK_HEADER_SIZE sizeof(PrrtPacketFeedbackPayload)
#define PRRT_PACKET_FEEDBACK_HEADER_SIZE sizeof(PrrtPacketFeedbackPayload) - sizeof(PrrtIncompleteBlock*)
prrtPacketType_t
PrrtPacket_type
(
PrrtPacket
*
packet_ptr
);
uint8_t
PrrtPacket_type
(
PrrtPacket
*
packet_ptr
);
uint8_t
PrrtPacket_priority
(
PrrtPacket
*
packet_ptr
);
uint16_t
PrrtPacket_size
(
PrrtPacket
*
packet_ptr
);
prrtPacketLength_t
PrrtPacket_size
(
PrrtPacket
*
packet_ptr
);
int
PrrtPacket_print
(
PrrtPacket
*
packet_ptr
);
PrrtPacket
*
PrrtPacket_copy
(
PrrtPacket
*
original
);
PrrtPacket
*
PrrtPacket_create_data_packet
(
uint8_t
priority
,
const
void
*
payloadPointer
,
uint32_t
payloadLength
,
uint16_t
sequenceNumber
);
PrrtPacket
*
PrrtPacket_create_data_packet
(
uint8_t
priority
,
const
void
*
payloadPointer
,
prrtPacketLength_t
payloadLength
,
prrtSequenceNumber_t
sequenceNumber
);
PrrtPacket
*
PrrtPacket_create_feedback_packet
(
uint8_t
priority
,
uint8_t
index
,
uint16_t
sequenceNumber
,
uint32_t
roundTripTime
,
uint32_t
packetLossRate
,
uint16_t
gap
,
uint16_t
ngap
,
uint16_t
burst
,
uint16_t
nburst
,
uint32_t
bandwidth
,
uint32_t
bufferFeedback
,
uint32_t
receiverAddr
);
PrrtPacket
*
PrrtPacket_create_feedback_packet
(
uint8_t
priority
,
uint8_t
index
,
prrtSequenceNumber_t
sequenceNumber
,
prrtTimedelta_t
roundTripTime
,
prrtSequenceNumber_t
gapLength
,
prrtSequenceNumber_t
gapCount
,
prrtSequenceNumber_t
burstLength
,
prrtSequenceNumber_t
burstCount
,
uint32_t
bandwidth
,
uint32_t
receiverAddr
);
PrrtPacket
*
PrrtPacket_create_redundancy_packet
(
uint8_t
priority
,
void
*
payloadPointer
,
uint32_t
payloadLength
,
uint16_t
sequenceNumber
,
uint8_t
index
,
uint16_t
baseSequenceNumber
,
PrrtCodingParams
codingParams
);
PrrtPacket
*
PrrtPacket_create_redundancy_packet
(
uint8_t
priority
,
void
*
payloadPointer
,
prrtPacketLength_t
payloadLength
,
prrtSequenceNumber_t
sequenceNumber
,
uint8_t
index
,
prrtSequenceNumber_t
baseSequenceNumber
,
PrrtCodingParams
codingParams
);
bool
PrrtPacket_decode
(
void
*
srcBuffer
,
uint16_t
srcBufferSize
,
PrrtPacket
*
targetPacket
);
bool
PrrtPacket_encode
(
void
*
buf_ptr
,
uint16_t
buf_size
,
PrrtPacket
*
packet_ptr
);
int
PrrtPacket_destroy
(
PrrtPacket
*
packet
);
#define PrrtPacket_copy_payload_to_buffer(dst, packet, header_size) memcpy(dst, packet->payload + header_size, packet->payload
_len
- header_size);
#define PrrtPacket_copy_buffer_to_payload(packet, src, header_size) memcpy(packet->payload + header_size, src, packet->payload
_len
- header_size);
#define PrrtPacket_copy_payload_to_buffer(dst, packet, header_size) memcpy(dst, packet->payload + header_size, packet->payload
Length
- header_size);
#define PrrtPacket_copy_buffer_to_payload(packet, src, header_size) memcpy(packet->payload + header_size, src, packet->payload
Length
- header_size);
#define PrrtPacket_get_data_timestamp(packet) ((PrrtPacketDataPayload*) packet->payload)->timestamp;
...
...
src/prrt/processes/dataReceiver.c
View file @
0307ec2c
...
...
@@ -9,7 +9,7 @@
#include
"../socket.h"
#include
"dataReceiver.h"
void
retrieve_data_blocks
(
PrrtSocket
*
sock_ptr
,
uint16
_t
base_seqno
,
uint8_t
k
,
const
PrrtBlock
*
block
)
void
retrieve_data_blocks
(
PrrtSocket
*
sock_ptr
,
prrtSequenceNumber
_t
base_seqno
,
uint8_t
k
,
const
PrrtBlock
*
block
)
{
List
*
res
=
List_create
();
BPTree_get_range
(
sock_ptr
->
dataStore
,
res
,
base_seqno
,
base_seqno
+
k
-
1
);
...
...
@@ -17,20 +17,20 @@ void retrieve_data_blocks(PrrtSocket *sock_ptr, uint16_t base_seqno, uint8_t k,
LIST_FOREACH
(
res
,
first
,
next
,
cur
)
{
PrrtPacket
*
packetPtr
=
cur
->
value
;
check
(
PrrtBlock_insert_data_packet
((
PrrtBlock
*
)
block
,
packetPtr
),
"Insert failed!"
)
sock_ptr
->
dataStore
=
BPTree_delete
(
sock_ptr
->
dataStore
,
packetPtr
->
seq
no
);
sock_ptr
->
dataStore
=
BPTree_delete
(
sock_ptr
->
dataStore
,
packetPtr
->
seq
uenceNumber
);
}
error:
List_destroy
(
res
);
}
void
decode_block
(
PrrtSocket
*
sock_ptr
,
PrrtBlock
*
block
,
uint16
_t
base_seqno
)
void
decode_block
(
PrrtSocket
*
sock_ptr
,
PrrtBlock
*
block
,
prrtSequenceNumber
_t
base_seqno
)
{
if
(
block
!=
NULL
&&
PrrtBlock_decode_ready
(
block
))
{
check
(
PrrtBlock_decode
(
block
),
"Decoding failed"
);
while
(
List_count
(
block
->
dataPackets
)
>
0
)
{
PrrtPacket
*
pkt
=
List_shift
(
block
->
dataPackets
);
if
(
PrrtForwardPacketTable_test_set_is_number_relevant
(
sock_ptr
->
forwardPacketTable
,
pkt
->
seq
no
))
{
if
(
PrrtForwardPacketTable_test_set_is_number_relevant
(
sock_ptr
->
forwardPacketTable
,
pkt
->
seq
uenceNumber
))
{
check
(
pthread_mutex_lock
(
&
sock_ptr
->
inQueueFilledMutex
)
==
0
,
"Lock failed."
);
List_push
(
sock_ptr
->
inQueue
,
pkt
);
check
(
pthread_cond_signal
(
&
sock_ptr
->
inQueueFilledCv
)
==
0
,
"Signal failed."
);
...
...
@@ -62,15 +62,16 @@ bool send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote)
hp
=
gethostbyname
(
remote_host
);
memcpy
((
void
*
)
&
targetaddr
.
sin_addr
,
hp
->
h_addr_list
[
0
],
(
size_t
)
hp
->
h_length
);
PrrtPacket
*
feedback_pkt_ptr
=
PrrtPacket_create_feedback_packet
(
0
,
19
,
4715
,
3
,
50
,
4
,
6
,
8
,
9
,
5
,
1
,
sock_ptr
->
address
->
sin_addr
.
s_addr
);
uint32_t
length
=
PrrtPacket_size
(
feedback_pkt_ptr
);
PrrtPacket
*
feedback_pkt_ptr
=
PrrtPacket_create_feedback_packet
(
0
,
19
,
4715
,
3
,
4
,
6
,
8
,
9
,
5
,
sock_ptr
->
address
->
sin_addr
.
s_addr
);
prrtPacketLength_t
length
=
PrrtPacket_size
(
feedback_pkt_ptr
);
void
*
buf
=
calloc
(
1
,
length
);
check_mem
(
buf
);
check
(
PrrtPacket_encode
(
buf
,
MAX_PAYLOAD_LENGTH
,
feedback_pkt_ptr
),
"Buffer for encoding feedback is too small"
);
uint32
_t
forwardTripTime
=
htonl
(
PrrtClock_get_current_time
()
+
sock_ptr
->
lastSentTimestamp
-
sock_ptr
->
lastReceivedTimestamp
);
((
PrrtPacketFeedbackPayload
*
)
(
buf
+
PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH
))
->
forward
_t
rip
_t
imestamp
=
forwardTripTime
;
prrtTimestamp
_t
forwardTripTime
=
htonl
(
PrrtClock_get_current_time
()
+
sock_ptr
->
lastSentTimestamp
-
sock_ptr
->
lastReceivedTimestamp
);
((
PrrtPacketFeedbackPayload
*
)
(
buf
+
PRRT_PACKET_ENCODED_GENERAL_HEADER_LENGTH
))
->
forward
T
rip
T
imestamp
_us
=
forwardTripTime
;
// TODO: [LATENCY] By knowing the time for htonl and writing bytes, one could compensate the timestamp.
check
(
sendto
(
sock_ptr
->
feedbackSocketFd
,
buf
,
length
,
0
,
(
struct
sockaddr
*
)
&
targetaddr
,
sizeof
(
targetaddr
))
==
length
,
"Sending feedback failed."
);
...
...
@@ -105,21 +106,21 @@ void *receive_data_loop(void *ptr)
// TODO: make something useful with RTT approximation
PrrtPacketDataPayload
*
payload
=
packet
->
payload
;
debug
(
"RTT: %d"
,
payload
->
group
_round_trip_time
);
debug
(
"RTT: %d"
,
payload
->
group
RTT_us
);
sock_ptr
->
lastSentTimestamp
=
PrrtPacket_get_data_timestamp
(
packet
);
check
(
send_feedback
(
sock_ptr
,
remote
),
"Sending feedback failed."
);
uint8
_t
packetType
=
PrrtPacket_type
(
packet
);
prrtPacketType
_t
packetType
=
PrrtPacket_type
(
packet
);
if
(
packetType
==
PACKET_TYPE_DATA
)
{
// TODO: packet.timestamp + packet.timeout < now: break
if
(
PrrtForwardPacketTable_test_set_is_number_relevant
(
sock_ptr
->
forwardPacketTable
,
packet
->
seq
no
)
==
if
(
PrrtForwardPacketTable_test_set_is_number_relevant
(
sock_ptr
->
forwardPacketTable
,
packet
->
seq
uenceNumber
)
==
false
)
{
PrrtPacket_destroy
(
packet
);
}
else
{
uint16
_t
baseSequenceNumber
=
packet
->
seq
no
-
packet
->
index
;
prrtSequenceNumber
_t
baseSequenceNumber
=
packet
->
seq
uenceNumber
-
packet
->
index
;
PrrtPacket
*
reference
=
PrrtPacket_copy
(
packet
);
...
...
@@ -129,8 +130,8 @@ void *receive_data_loop(void *ptr)
decode_block
(
sock_ptr
,
block
,
baseSequenceNumber
);
}
else
{
// Check for duplicate data packet.
if
(
BPTree_get
(
sock_ptr
->
dataStore
,
packet
->
seq
no
)
==
NULL
)
{
sock_ptr
->
dataStore
=
BPTree_insert
(
sock_ptr
->
dataStore
,
packet
->
seq
no
,
reference
);
if
(
BPTree_get
(
sock_ptr
->
dataStore
,
packet
->
seq
uenceNumber
)
==
NULL
)
{
sock_ptr
->
dataStore
=
BPTree_insert
(
sock_ptr
->
dataStore
,
packet
->
seq
uenceNumber
,
reference
);
}
else
{
PrrtPacket_destroy
(
reference
);
}
...
...
@@ -147,28 +148,28 @@ void *receive_data_loop(void *ptr)
PrrtPacketRedundancyPayload
*
redundancyPayload
=
packet
->
payload
;
if
(
!
PrrtForwardPacketTable_test_is_block_relevant
(
sock_ptr
->
forwardPacketTable
,
redundancyPayload
->
base
_seqno
,
redundancyPayload
->
base
SequenceNumber
,
redundancyPayload
->
n
))
{
PrrtPacket_destroy
(
packet
);
}
else
{
PrrtBlock
*
block
=
BPTree_get
(
sock_ptr
->
blockStore
,
redundancyPayload
->
base
_seqno
);
PrrtBlock
*
block
=
BPTree_get
(
sock_ptr
->
blockStore
,
redundancyPayload
->
base
SequenceNumber
);
if
(
block
==
NULL
)
{
// TODO: PROPER CREATION
PrrtCodingParams
*
cpar
=
PrrtCodingParams_create
();
cpar
->
k
=
redundancyPayload
->
k
;
cpar
->
n
=
redundancyPayload
->
n
;
block
=
PrrtBlock_create
(
cpar
,
redundancyPayload
->
base
_seqno
);
block
=
PrrtBlock_create
(
cpar
,
redundancyPayload
->
base
SequenceNumber
);
PrrtCodingParams_destroy
(
cpar
);
sock_ptr
->
blockStore
=
BPTree_insert
(
sock_ptr
->
blockStore
,
redundancyPayload
->
base
_seqno
,
sock_ptr
->
blockStore
=
BPTree_insert
(
sock_ptr
->
blockStore
,
redundancyPayload
->
base
SequenceNumber
,
block
);
}
retrieve_data_blocks
(
sock_ptr
,
redundancyPayload
->
base
_seqno
,
block
->
codingParams
.
k
,
block
);
retrieve_data_blocks
(
sock_ptr
,
redundancyPayload
->
base
SequenceNumber
,
block
->
codingParams
.
k
,
block
);
if
(
PrrtBlock_insert_redundancy_packet
(
block
,
packet
))
{
decode_block
(
sock_ptr
,
block
,
redundancyPayload
->
base
_seqno
);
decode_block
(
sock_ptr
,
block
,
redundancyPayload
->
base
SequenceNumber
);
}
else
{
PrrtPacket_destroy
(
packet
);
}
...
...
src/prrt/processes/dataTransmitter.c
View file @
0307ec2c
...
...
@@ -15,8 +15,8 @@
bool
send_packet
(
PrrtSocket
*
sock_ptr
,
PrrtPacket
*
packet
)
{
uint8_t
buf
[
MAX_PAYLOAD_LENGTH
];
memset
(
buf
,
0
,
sizeof
(
buf
));
uint32
_t
length
=
PrrtPacket_size
(
packet
);
uint8
_t
type
=
PrrtPacket_type
(
packet
);
prrtPacketLength
_t
length
=
PrrtPacket_size
(
packet
);
prrtPacketType
_t
type
=
PrrtPacket_type
(
packet
);
check
(
PrrtPacket_encode
(
buf
,
MAX_PAYLOAD_LENGTH
,
packet
),
"Buffer too small."
);
...
...
@@ -77,12 +77,12 @@ void * send_data_loop(void *ptr) {
}
PrrtPacket
*
packet
=
List_shift
(
sock_ptr
->
outQueue
);
packet
->
seq
no
=
sock_ptr
->
sequenceNumberSource
++
;
packet
->
index
=
(
uint8_t
)
(
packet
->
seq
no
-
block
->
baseSequenceNumber
);
packet
->
seq
uenceNumber
=
sock_ptr
->
sequenceNumberSource
++
;
packet
->
index
=
(
uint8_t
)
(
packet
->
seq
uenceNumber
-
block
->
baseSequenceNumber
);
PrrtPacketDataPayload
*
payload
=
packet
->
payload
;
// TODO: should lock here !
payload
->
group
_round_trip_time
=
(
uint16
_t
)
(
sock_ptr
->
csi
.
rttMean
/
1000
);
payload
->
group
RTT_us
=
(
prrtTimedelta
_t
)
(
sock_ptr
->
csi
.
rttMean
/
1000
);
PrrtPacket
*
packetToSend
=
PrrtPacket_copy
(
packet
);
send_packet
(
sock_ptr
,
packetToSend
);
...
...
src/prrt/socket.c
View file @
0307ec2c
...
...
@@ -18,13 +18,13 @@
#include
"receiver.h"
#include
"clock.h"
PrrtSocket
*
PrrtSocket_create
(
const
uint8_t
is_sender
)
PrrtSocket
*
PrrtSocket_create
(
const
bool
is_sender
)
{
assert
(
sizeof
(
float
)
==
4
);
PrrtSocket
*
sock_ptr
=
(
PrrtSocket
*
)
calloc
(
1
,
sizeof
(
PrrtSocket
));
check_mem
(
sock_ptr
);
sock_ptr
->
is
_s
ender
=
is_sender
;
sock_ptr
->
is
S
ender
=
is_sender
;
sock_ptr
->
isBound
=
false
;
sock_ptr
->
sequenceNumberSource
=
1
;
...
...
@@ -85,7 +85,7 @@ bool PrrtSocket_bind(PrrtSocket *sock_ptr, const char* ipAddress, const uint16_t
check
(
bind
(
sock_ptr
->
dataSocketFd
,
(
struct
sockaddr
*
)
address
,
size
)
==
EXIT_SUCCESS
,
"Cannot bind data socket."
);
if
(
sock_ptr
->
is
_s
ender
)
{
if
(
sock_ptr
->
is
S
ender
)
{
check
(
pthread_create
(
&
sock_ptr
->
receiveFeedbackThread
,
NULL
,
receive_feedback_loop
,
(
void
*
)
sock_ptr
)
==
EXIT_SUCCESS
,
"Cannot create receive feedback thread."
);
check
(
pthread_create
(
&
sock_ptr
->
sendThread
,
NULL
,
send_data_loop
,
(
void
*
)
sock_ptr
)
==
EXIT_SUCCESS
,
...
...
@@ -109,10 +109,10 @@ int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t po
}
int
PrrtSocket_send
(
PrrtSocket
*
sock_ptr
,
const
uint8_t
*
data
,
const
size_t
data_len
)
{
check
(
sock_ptr
->
is
_s
ender
,
"Cannot send on receiver socket."
)
check
(
sock_ptr
->
is
S
ender
,
"Cannot send on receiver socket."
)
check
(
pthread_mutex_lock
(
&
sock_ptr
->
outQueueFilledMutex
)
==
0
,
"Lock failed."
);
PrrtPacket
*
packet
=
PrrtPacket_create_data_packet
(
5
,
data
,
(
uint32
_t
)
data_len
,
0
);
PrrtPacket
*
packet
=
PrrtPacket_create_data_packet
(
5
,
data
,
(
prrtPacketLength
_t
)
data_len
,
0
);
List_push
(
sock_ptr
->
outQueue
,
packet
);
check
(
pthread_cond_signal
(
&
sock_ptr
->
outQueueFilledCv
)
==
0
,
"Signal failed."
);
...
...
@@ -125,7 +125,7 @@ int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data
}
int32_t
PrrtSocket_recv
(
PrrtSocket
*
sock_ptr
,
void
*
buf_ptr
)
{
check
(
sock_ptr
->
is
_s
ender
==
false
,
"Cannot receive on sender socket."
)
check
(
sock_ptr
->
is
S
ender
==
false
,
"Cannot receive on sender socket."
)
while
(
1
)
{
check
(
pthread_mutex_lock
(
&
sock_ptr
->
inQueueFilledMutex
)
==
0
,
"Lock failed."
);