Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
LARN
PRRT
Commits
eb7f6c12
Commit
eb7f6c12
authored
Mar 08, 2016
by
Andreas Schmidt
Browse files
Merge branch 'feature/evaluation' into develop
parents
72e50c57
0511f92d
Pipeline
#54
passed with stage
Changes
26
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
.gitignore
View file @
eb7f6c12
.idea/
build/
bin/
*.cmake
prrt.cpython-35m-x86_64-linux-gnu.so
src/cython/prrt.c
CMakeCache.txt
CMakeFiles/
Makefile
\ No newline at end of file
build.sh
0 → 100644
View file @
eb7f6c12
#!/usr/bin/env bash
rm
src/cython/prrt.c
python setup.py build_ext
--inplace
eval.py
0 → 100644
View file @
eb7f6c12
import
sys
sys
.
path
.
insert
(
0
,
"./build"
)
import
prrt
import
time
as
time
import
threading
class
receiverThread
(
threading
.
Thread
):
def
__init__
(
self
,
seqnoDigits
):
threading
.
Thread
.
__init__
(
self
)
self
.
sock
=
prrt
.
PrrtSocket
(
5000
,
False
)
self
.
stats
=
{}
self
.
received
=
set
()
self
.
duplicates
=
0
self
.
seqnoDigits
=
seqnoDigits
def
run
(
self
):
while
(
True
):
len
,
data
=
self
.
sock
.
recv
()
v
=
data
[:
len
].
decode
(
'UTF-8'
)
seqno
=
v
[:
self
.
seqnoDigits
]
if
seqno
in
self
.
received
:
self
.
duplicates
=
self
.
duplicates
+
1
else
:
self
.
received
.
update
(
set
([
seqno
]))
time
.
sleep
(
0.000001
)
class
senderThread
(
threading
.
Thread
):
def
__init__
(
self
,
seqnoDigits
,
packetCount
):
threading
.
Thread
.
__init__
(
self
)
self
.
sock
=
prrt
.
PrrtSocket
(
6000
,
True
)
self
.
packetCount
=
packetCount
self
.
seqnoDigits
=
seqnoDigits
def
run
(
self
):
self
.
sock
.
connect
(
"127.0.0.1"
,
5000
)
for
i
in
range
(
self
.
packetCount
):
self
.
sock
.
send
(
str
(
i
).
zfill
(
self
.
seqnoDigits
))
time
.
sleep
(
0.001
)
self
.
sock
.
close
();
if
__name__
==
"__main__"
:
# Works: 2^15; Segfault at: 2^16
packetCount
=
2
**
16
seqnoDigits
=
10
recvThread
=
receiverThread
(
seqnoDigits
)
recvThread
.
daemon
=
True
sendThread
=
senderThread
(
seqnoDigits
,
packetCount
)
recvThread
.
start
()
sendThread
.
run
()
time
.
sleep
(
10
)
count
=
len
(
recvThread
.
received
)
print
(
"Residual Loss:"
,
1
-
(
count
/
packetCount
),
"% (received"
,
count
,
"of"
,
packetCount
,
")"
)
print
(
"Duplicates:"
,
recvThread
.
duplicates
)
setup.py
0 → 100644
View file @
eb7f6c12
from
distutils.core
import
setup
from
distutils.extension
import
Extension
from
Cython.Build
import
cythonize
setup
(
include_dirs
=
[
"./src"
],
ext_modules
=
cythonize
([
"src/cython/prrt.pyx"
],
gdb_debug
=
True
)
)
\ No newline at end of file
src/cython/cprrt.pxd
0 → 100644
View file @
eb7f6c12
from
libc.stdint
cimport
uint32_t
,
uint16_t
,
uint8_t
,
int32_t
from
libc.string
cimport
const_char
cdef
extern
from
"pthread.h"
nogil
:
ctypedef
struct
pthread_t
:
pass
ctypedef
struct
pthread_mutex_t
:
pass
ctypedef
struct
pthread_cond_t
:
pass
cdef
extern
from
"prrt/stores/forward_packet_table.h"
:
cdef
struct
prrtForwardPacketTable
:
pass
ctypedef
prrtForwardPacketTable
PrrtForwardPacketTable
cdef
extern
from
"prrt/vdmcode/block_code.h"
:
cdef
struct
prrtCoder
:
pass
ctypedef
prrtCoder
PrrtCoder
cdef
extern
from
"prrt/coding_params.h"
:
cdef
struct
prrtCodingParams
:
uint8_t
k
;
uint8_t
r
;
uint8_t
n
;
uint8_t
n_p
;
ctypedef
prrtCodingParams
PrrtCodingParams
cdef
extern
from
"util/list.h"
:
cdef
struct
list
:
pass
ctypedef
list
List
cdef
struct
listNode
:
pass
ctypedef
listNode
ListNode
void
List_push
(
List
*
list
,
const
void
*
value
)
void
*
List_pop
(
const
List
*
list
)
void
List_unshift
(
List
*
list
,
const
void
*
value
)
void
*
List_shift
(
const
List
*
list
)
void
*
List_remove
(
List
*
list
,
const
ListNode
*
node
)
cdef
extern
from
"prrt/block.h"
:
cdef
struct
prrtBlock
:
uint32_t
data_count
uint32_t
redundancy_count
PrrtCodingParams
coding_params
uint32_t
largest_data_length
uint16_t
base_seqno
List
*
data_blocks
List
*
redundancy_blocks
uint8_t
is_coded
ctypedef
prrtBlock
PrrtBlock
cdef
extern
from
"prrt/packet.h"
:
cdef
struct
prrtPacket
:
uint8_t
type_priority
;
uint8_t
index
;
uint16_t
seqno
;
void
*
payload
;
uint32_t
payload_len
;
ctypedef
prrtPacket
PrrtPacket
cdef
extern
from
"prrt/socket.h"
:
ctypedef
struct
PrrtReceiver
:
const
char
*
host_name
uint16_t
port
cdef
struct
prrtSocket
:
int
dataSocketFd
int
feedbackSocketFd
pthread_t
receiveFeedbackThread
pthread_t
sendThread
pthread_mutex_t
outQueueFilledMutex
pthread_cond_t
outQueueFilledCv
List
*
outQueue
pthread_t
receiveDataThread
pthread_mutex_t
inQueueFilledMutex
pthread_cond_t
inQueueFilledMutexCv
List
*
inQueue
BPTreeNode
*
dataStore
BPTreeNode
*
blockStore
PrrtForwardPacketTable
*
forwardPacketTable
PrrtReceiver
receivers
[
10
]
int
receiverLength
uint16_t
packetsCount
uint16_t
sequenceNumberSource
uint16_t
sequenceNumberRepetition
uint16_t
sequenceNumberRedundancy
uint16_t
sequenceNumberFeedback
ctypedef
prrtSocket
PrrtSocket
cdef
PrrtSocket
*
PrrtSocket_create
(
uint16_t
port
,
uint8_t
is_sender
)
int
PrrtSocket_close
(
const
PrrtSocket
*
sock_ptr
)
int
PrrtSocket_connect
(
PrrtSocket
*
sock_ptr
,
const_char
*
host
,
const
uint16_t
port
)
int
PrrtSocket_send
(
PrrtSocket
*
sock_ptr
,
const
uint8_t
*
data
,
const
size_t
data_len
)
int32_t
PrrtSocket_recv
(
PrrtSocket
*
sock_ptr
,
void
*
buf_ptr
)
nogil
PrrtPacket
*
PrrtSocket_recv_feedback
(
const
PrrtSocket
*
sock_ptr
,
void
*
bufin
,
const
size_t
length
)
cdef
extern
from
"util/bptree.h"
:
ctypedef
struct
BPTreeNode
:
pass
BPTreeNode
*
BPTree_insert
(
BPTreeNode
*
root
,
int
key
,
void
*
value
);
BPTreeNode
*
BPTree_delete
(
BPTreeNode
*
root
,
int
key
);
BPTreeNode
*
BPTree_destroy
(
BPTreeNode
*
root
);
void
*
BPTree_get
(
BPTreeNode
*
root
,
int
key
);
void
BPTree_get_range
(
BPTreeNode
*
root
,
List
*
list
,
int
key_start
,
int
key_end
);
src/cython/prrt.pyx
0 → 100644
View file @
eb7f6c12
from
libc.stdint
cimport
uint32_t
,
uint16_t
,
uint8_t
,
int32_t
cimport
cprrt
cdef
extern
from
"prrt/stores/forward_packet_table.c"
:
int
PrrtForwardPacketTable_create
(
cprrt
.
PrrtForwardPacketTable
*
fpt_prt
);
int
PrrtForwardPacketTable_test_set_is_number_relevant
(
cprrt
.
PrrtForwardPacketTable
*
fpt_ptr
,
uint16_t
seqno
);
int
PrrtForwardPacketTable_test_is_block_relevant
(
cprrt
.
PrrtForwardPacketTable
*
forwardPacketTable
,
uint16_t
start
,
uint16_t
length
);
int
PrrtForwardPacketTable_destroy
(
cprrt
.
PrrtForwardPacketTable
*
fpt_prt
);
cdef
extern
from
"prrt/processes/feedback_receiver.c"
:
void
receive_feedback_loop
(
void
*
ptr
)
cdef
extern
from
"prrt/processes/data_receiver.c"
:
void
receive_data_loop
(
void
*
ptr
)
cdef
extern
from
"prrt/processes/data_transmitter.c"
:
void
send_data_loop
(
void
*
ptr
)
cdef
extern
from
"prrt/block.c"
:
cprrt
.
PrrtPacket
*
PrrtBlock_get_first_data
(
cprrt
.
PrrtBlock
*
block_ptr
)
cdef
extern
from
"prrt/vdmcode/block_code.c"
:
int
PrrtCoder_get_coder
(
cprrt
.
PrrtCoder
**
cod
,
uint8_t
n
,
uint8_t
k
)
cdef
extern
from
"prrt/coding_params.c"
:
void
PrrtCodingParams_init
(
cprrt
.
PrrtCodingParams
*
cpar
)
cdef
extern
from
"prrt/packet.c"
:
uint8_t
PrrtPacket_type
(
cprrt
.
PrrtPacket
*
packet_ptr
)
uint8_t
PrrtPacket_priority
(
cprrt
.
PrrtPacket
*
packet_ptr
)
uint16_t
PrrtPacket_size
(
cprrt
.
PrrtPacket
*
packet_ptr
)
int
PrrtPacket_print
(
cprrt
.
PrrtPacket
*
packet_ptr
)
cdef
extern
from
"prrt/socket.c"
:
cprrt
.
PrrtSocket
*
PrrtSocket_create
(
uint16_t
port
,
uint8_t
is_sender
)
cdef
extern
from
"util/bptree.c"
:
cprrt
.
BPTreeNode
*
BPTree_insert
(
cprrt
.
BPTreeNode
*
root
,
int
key
,
void
*
value
)
cprrt
.
BPTreeNode
*
BPTree_delete
(
cprrt
.
BPTreeNode
*
root
,
int
key
)
cprrt
.
BPTreeNode
*
BPTree_destroy
(
cprrt
.
BPTreeNode
*
root
)
void
*
BPTree_get
(
cprrt
.
BPTreeNode
*
root
,
int
key
)
void
BPTree_get_range
(
cprrt
.
BPTreeNode
*
root
,
cprrt
.
List
*
list
,
int
key_start
,
int
key_end
)
cdef
extern
from
"util/list.c"
:
void
List_push
(
cprrt
.
List
*
list
,
const
void
*
value
)
void
*
List_pop
(
const
cprrt
.
List
*
list
)
void
List_unshift
(
cprrt
.
List
*
list
,
const
void
*
value
)
void
*
List_shift
(
const
cprrt
.
List
*
list
)
void
*
List_remove
(
cprrt
.
List
*
list
,
const
cprrt
.
ListNode
*
node
)
cdef
class
PrrtSocket
:
cdef
cprrt
.
PrrtSocket
*
_c_socket
cdef
bint
isSender
def
__cinit__
(
self
,
port
,
isSender
):
self
.
_c_socket
=
cprrt
.
PrrtSocket_create
(
port
,
isSender
)
self
.
isSender
=
isSender
def
recv
(
self
):
cdef
char
buffer
[
65536
]
cdef
int32_t
len
with
nogil
:
len
=
cprrt
.
PrrtSocket_recv
(
self
.
_c_socket
,
<
void
*>
buffer
)
return
len
,
buffer
def
connect
(
self
,
host
,
port
):
cdef
bytes
encodedHost
=
host
.
encode
(
"utf-8"
)
cprrt
.
PrrtSocket_connect
(
self
.
_c_socket
,
encodedHost
,
port
)
def
send
(
self
,
data
):
cdef
bytes
encodedData
=
data
.
encode
(
"utf-8"
)
cprrt
.
PrrtSocket_send
(
self
.
_c_socket
,
encodedData
,
len
(
data
))
def
close
(
self
):
cprrt
.
PrrtSocket_close
(
self
.
_c_socket
)
\ No newline at end of file
src/prrt/block.c
View file @
eb7f6c12
#include
<string.h>
#include
<src
/defines.h
>
#include
<src/prrt/vdmcode/block_code
.h
>
#include
<src
/util/
list
.h
>
#include
<src
/util/
dbg
.h
>
#include
<src/util/common
.h
>
#include
"..
/defines.h
"
#include
"../util/list
.h
"
#include
"..
/util/
dbg
.h
"
#include
"..
/util/
common
.h
"
#include
"packet
.h
"
#include
"block.h"
#include
"coding_params.h"
void
gather_redundancy_packets
(
const
PrrtBlock
*
block_ptr
,
gf
*
const
*
fec
,
int
*
idx_p
)
{
int
i
;
u
int
32_t
i
;
uint32_t
m
=
0
;
PrrtPacket
*
packet
=
NULL
;
...
...
src/prrt/block.h
View file @
eb7f6c12
#ifndef PRRT_BLOCK_H
#define PRRT_BLOCK_H
#include
<src/prrt/
coding_params.h
>
#include
<src/prrt/
packet.h
>
#include
<src
/util/list.h
>
#include
"
coding_params.h
"
#include
"
packet.h
"
#include
"..
/util/list.h
"
typedef
struct
{
typedef
struct
prrtBlock
{
PrrtCodingParams
codingParams
;
uint32_t
largestDataLength
;
uint16_t
baseSequenceNumber
;
...
...
src/prrt/coding_params.c
View file @
eb7f6c12
#include
<src
/defines.h
>
#include
<src/prrt/
coding_params.h
>
#include
"..
/defines.h
"
#include
"
coding_params.h
"
void
PrrtCodingParams_init
(
PrrtCodingParams
*
cpar
)
{
pthread_mutex_init
(
&
cpar
->
lock
,
NULL
);
...
...
src/prrt/coding_params.h
View file @
eb7f6c12
...
...
@@ -4,7 +4,7 @@
#include
<pthread.h>
#include
<stdint.h>
typedef
struct
{
typedef
struct
prrtCodingParams
{
pthread_mutex_t
lock
;
uint8_t
k
;
...
...
src/prrt/packet.c
View file @
eb7f6c12
...
...
@@ -3,8 +3,8 @@
#include
<sys/time.h>
#include
<string.h>
#include
<netinet/in.h>
#include
<src
/util/
dbg
.h
>
#include
<src
/util/
common
.h
>
#include
"..
/util/
common
.h
"
#include
"..
/util/
dbg
.h
"
#include
<stdbool.h>
#include
"packet.h"
...
...
@@ -104,7 +104,8 @@ PrrtPacket *PrrtPacket_copy(PrrtPacket *original)
return
newPacket
;
error:
PERROR
(
"Not enough memory for packet copies."
);
PERROR
(
"Not enough memory for packet copies.%s"
,
""
);
return
NULL
;
}
PrrtPacket
*
create_header
(
uint8_t
priority
,
uint16_t
seqno
,
uint32_t
size
,
uint8_t
type
,
uint8_t
index
)
...
...
@@ -122,6 +123,7 @@ PrrtPacket *create_header(uint8_t priority, uint16_t seqno, uint32_t size, uint8
error:
PERROR
(
"Could not create packet.%s"
,
""
);
return
NULL
;
}
bool
PrrtPacket_encode
(
void
*
buf_ptr
,
uint16_t
buf_size
,
PrrtPacket
*
packet_ptr
)
...
...
@@ -422,7 +424,8 @@ PrrtPacket *PrrtPacket_create_data_packet(uint8_t priority, const void *payloadP
return
packet
;
error:
PERROR
(
"Could not create packet."
);
PERROR
(
"Could not create packet.%s"
,
""
);
return
NULL
;
}
PrrtPacket
*
PrrtPacket_create_redundancy_packet
(
uint8_t
priority
,
void
*
payloadPointer
,
uint32_t
payloadLength
,
...
...
src/prrt/packet.h
View file @
eb7f6c12
...
...
@@ -7,7 +7,7 @@
// DESTROY:
#include
<stdint.h>
#include
<src/prrt/
coding_params.h
>
#include
"
coding_params.h
"
#include
<stdbool.h>
#define PACKET_TYPE_DATA 0
...
...
@@ -28,7 +28,7 @@ typedef struct prrtPacket {
#define SEQNO_SPACE 65536 // 2**16 as seqno is uint16_t
typedef
struct
{
typedef
struct
prrtPacketDataPayload
{
uint32_t
timestamp
;
uint16_t
group_round_trip_time
;
uint16_t
packet_timeout
;
...
...
@@ -37,14 +37,14 @@ typedef struct {
}
PrrtPacketDataPayload
;
#define PRRT_PACKET_DATA_HEADER_SIZE sizeof(PrrtPacketDataPayload)
typedef
struct
{
typedef
struct
prrtPacketRedundancyPayload
{
uint16_t
base_seqno
;
uint8_t
n
;
uint8_t
k
;
}
PrrtPacketRedundancyPayload
;
#define PRRT_PACKET_REDUNDANCY_HEADER_SIZE sizeof(PrrtPacketRedundancyPayload)
typedef
struct
{
typedef
struct
prrtPacketFeedbackPayload
{
uint32_t
receiver_addr
;
uint32_t
group_round_trip_time
;
uint32_t
forward_trip_time
;
...
...
src/prrt/processes/data_receiver.c
View file @
eb7f6c12
#include
<netdb.h>
#include
<stdio.h>
#include
<arpa/inet.h>
#include
<src
/defines.h
>
#include
<src
/util/dbg.h
>
#include
<src/prrt/socket
.h
>
#include
<src/prrt/bl
ock.h
>
#include
<src/util/common
.h
>
#include
"../..
/defines.h
"
#include
"../..
/util/dbg.h
"
#include
"../../util/common
.h
"
#include
"../s
ock
et
.h
"
#include
"../block
.h
"
#include
"data_receiver.h"
void
retrieve_data_blocks
(
PrrtSocket
*
sock_ptr
,
uint16_t
base_seqno
,
uint8_t
k
,
const
PrrtBlock
*
block
)
...
...
src/prrt/processes/data_transmitter.c
View file @
eb7f6c12
...
...
@@ -2,11 +2,11 @@
#include
<netdb.h>
#include
<unistd.h>
#include
<string.h>
#include
<src
/defines.h
>
#include
<src/prrt
/socket.h
>
#include
<src/prrt
/block.h
>
#include
<src
/util/dbg.h
>
#include
<src
/util/common.h
>
#include
"../..
/defines.h
"
#include
"..
/socket.h
"
#include
"..
/block.h
"
#include
"../..
/util/dbg.h
"
#include
"../..
/util/common.h
"
#include
"data_transmitter.h"
...
...
@@ -20,6 +20,7 @@ bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
// SENDING TO ALL RECEIVERS
LIST_FOREACH
(
sock_ptr
->
receivers
,
first
,
next
,
cur
)
{
PrrtReceiver
*
recv
=
cur
->
value
;
struct
hostent
*
hp
;
struct
sockaddr_in
targetaddr
;
...
...
@@ -28,6 +29,7 @@ bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
targetaddr
.
sin_port
=
htons
(
recv
->
port
);
hp
=
gethostbyname
(
recv
->
host_name
);
check
(
hp
!=
NULL
,
"Could not resolve host '%s'."
,
recv
->
host_name
)
memcpy
((
void
*
)
&
targetaddr
.
sin_addr
,
hp
->
h_addr_list
[
0
],
(
size_t
)
hp
->
h_length
);
ssize_t
sendtoRes
=
sendto
(
sock_ptr
->
dataSocketFd
,
buf
,
length
,
0
,
(
struct
sockaddr
*
)
&
targetaddr
,
sizeof
(
targetaddr
));
...
...
@@ -80,7 +82,7 @@ void * send_data_loop(void *ptr) {
PrrtBlock_insert_data_packet
(
block
,
packet
);
if
(
PrrtBlock_encode_ready
(
block
))
{
int
j
=
0
;
u
int
32_t
j
=
0
;
PrrtBlock_encode
(
block
,
&
sock_ptr
->
sequenceNumberRedundancy
);
uint32_t
redundancyBlocks
=
List_count
(
block
->
redundancyPackets
);
...
...
src/prrt/processes/feedback_receiver.c
View file @
eb7f6c12
#include
<string.h>
#include
<unistd.h>
#include
<src/defines.h>
#include
<src/prrt/packet.h>
#include
<src/prrt/socket.h>
#include
<src/util/dbg.h>
#include
"../../defines.h"
#include
"../packet.h"
#include
"../socket.h"
#include
"feedback_receiver.h"
void
*
receive_feedback_loop
(
void
*
ptr
)
{
void
*
receive_feedback_loop
(
void
*
ptr
)
{
char
bufin
[
MAX_PAYLOAD_LENGTH
];
PrrtSocket
*
sock_ptr
=
ptr
;
...
...
@@ -23,4 +22,6 @@ void *receive_feedback_loop(void *ptr) {
pthread_mutex_lock
(
&
sock_ptr
->
closingMutex
);
}
pthread_mutex_unlock
(
&
sock_ptr
->
closingMutex
);
return
NULL
;
}
\ No newline at end of file
src/prrt/processes/feedback_receiver.h
View file @
eb7f6c12
#ifndef PRRT_FEEDBACK_RECEIVER_H
#define PRRT_FEEDBACK_RECEIVER_H
void
*
receive_feedback_loop
(
void
*
ptr
);
void
*
receive_feedback_loop
(
void
*
ptr
);
#endif //PRRT_FEEDBACK_RECEIVER_H
src/prrt/socket.c
View file @
eb7f6c12
...
...
@@ -4,20 +4,23 @@
#include
<stdlib.h>
#include
<string.h>
#include
<unistd.h>
#include
<src/defines.h>
#include
<src/prrt/packet.h>
#include
<src/prrt/socket.h>
#include
<src/prrt/processes/feedback_receiver.h>
#include
<src/prrt/processes/data_transmitter.h>
#include
<src/prrt/processes/data_receiver.h>
#include
<src/util/dbg.h>
#include
<sys/poll.h>
#include
"../defines.h"
#include
"packet.h"
#include
"../util/dbg.h"
#include
"../util/common.h"
#include
"processes/feedback_receiver.h"
#include
"processes/data_transmitter.h"
#include
"processes/data_receiver.h"
#include
"socket.h"
#include
"block.h"
PrrtSocket
*
PrrtSocket_create
(
const
uint16_t
port
,
const
uint8_t
is_sender
)
{
PrrtSocket
*
sock_ptr
=
(
PrrtSocket
*
)
calloc
(
1
,
sizeof
(
PrrtSocket
));
check_mem
(
sock_ptr
);
sock_ptr
->
is_sender
=
is_sender
;
bool
PrrtSocket_create
(
PrrtSocket
*
sock_ptr
,
const
uint16_t
port
,
const
uint8_t
is_sender
)
{
sock_ptr
->
sequenceNumberSource
=
1
;
sock_ptr
->
sequenceNumberRedundancy
=
1
;
...
...
@@ -76,23 +79,26 @@ bool PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t
"Cannot create data receiving thread."
);
}
return
tr
ue
;
return
sock_p
tr
;
error:
PrrtSocket_close
(
sock_ptr
);
return
false
;
return
NULL
;
}
int
PrrtSocket_connect
(
PrrtSocket
*
sock_ptr
,
const
char
*
host
,
const
uint16_t
port
)
{
PrrtReceiver
*
recv
=
calloc
(
1
,
sizeof
(
PrrtReceiver
));
recv
->
host_name
=
host
;
uint32_t
hostLength
=
(
uint32_t
)
strlen
(
host
);
recv
->
host_name
=
calloc
(
1
,
hostLength
+
1
);
memcpy
((
void
*
)
recv
->
host_name
,
host
,
hostLength
);
recv
->
port
=
port
;
List_push
(
sock_ptr
->
receivers
,
recv
);
return
0
;
}
int
PrrtSocket_send
(
PrrtSocket
*
sock_ptr
,
const
void
*
data
,
const
size_t
data_len
)
{
int
PrrtSocket_send
(
PrrtSocket
*
sock_ptr
,
uint8_t
*
data
,
const
size_t
data_len
)
{
check
(
sock_ptr
->
is_sender
,
"Cannot send on receiver socket."
)
pthread_mutex_lock
(
&
sock_ptr
->
outQueueFilledMutex
);
PrrtPacket
*
packet
=
PrrtPacket_create_data_packet
(
5
,
data
,
(
uint32_t
)
data_len
,
0
);
...
...
@@ -102,32 +108,38 @@ int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_le
pthread_mutex_unlock
(
&
sock_ptr
->
outQueueFilledMutex
);
return
0
;
error:
return
-
1
;
}
int32_t
PrrtSocket_recv
(
PrrtSocket
*
sock_ptr
,
void
*
buf_ptr
)
{
pthread_mutex_t
t
=
sock_ptr
->
inQueueFilledMutex
;
check
(
sock_ptr
->
is_sender
==
false
,
"Cannot receive on sender socket."
)
pthread_mutex_t
filledMutex
=
sock_ptr
->
inQueueFilledMutex
;
pthread_mutex_t
closingMutex
=
sock_ptr
->
closingMutex
;
while
(
1
)
{
pthread_mutex_lock
(
&
t
);
pthread_mutex_lock
(
&
filledMutex
);
while
(
List_count
(
sock_ptr
->
inQueue
)
==
0
)
{
pthread_mutex_lock
(
&
sock_ptr
->
closingMutex
);
pthread_mutex_lock
(
&
closingMutex
);
if
(
sock_ptr
->
closing
)
{
pthread_mutex_unlock
(
&
sock_ptr
->
closingMutex
);
pthread_mutex_unlock
(
&
t
);
pthread_mutex_unlock
(
&
closingMutex
);
pthread_mutex_unlock
(
&
filledMutex
);
return
-
1
;
}
pthread_mutex_unlock
(
&
sock_ptr
->
closingMutex
);
pthread_cond_wait
(
&
sock_ptr
->
inQueueFilledCv
,
&
t
);
pthread_mutex_unlock
(
&
closingMutex
);
pthread_cond_wait
(
&
sock_ptr
->
inQueueFilledCv
,
&
filledMutex
);
}
PrrtPacket
*
packet
=
List_shift
(
sock_ptr
->
inQueue
);
uint32_t
len
=
packet
->
payload_len
-
PRRT_PACKET_DATA_HEADER_SIZE
;
uint32_t
len
=
(
uint32_t
)
(
packet
->
payload_len
-
PRRT_PACKET_DATA_HEADER_SIZE
)
;
PrrtPacket_copy_payload_to_buffer
(
buf_ptr
,
packet
,
PRRT_PACKET_DATA_HEADER_SIZE
);
PrrtPacket_destroy
(
packet
);
pthread_mutex_unlock
(
&
t
);
pthread_mutex_unlock
(
&
filledMutex
);