prrt.pyx 11.6 KB
Newer Older
Andreas Schmidt's avatar
Andreas Schmidt committed
1
from libc.stdint cimport uint32_t, uint16_t, uint8_t, int32_t
Andreas Schmidt's avatar
Andreas Schmidt committed
2
from libc.stdlib cimport malloc, free
3
from libc.string cimport memset
4
from posix.time cimport timespec
rna's avatar
rna committed
5
cimport cython
Andreas Schmidt's avatar
Andreas Schmidt committed
6

7 8
cimport cprrt

9
import datetime
10 11
import socket
import ipaddress
12

13 14
include "sockets.pxd"

15 16 17 18
class TimeoutException(Exception):
    """Raised by the timed receive calls when the native call reports a timeout.

    The fixed text is stored in ``message`` and is also what ``str(exc)``
    returns, so it shows up in tracebacks and log output.
    """

    def __init__(self):
        self.message = "The call timed out."

    def __str__(self):
        # ``args`` is deliberately left empty (the constructor takes no
        # arguments), so the text has to be supplied here instead.
        return self.message

19 20 21
class PayloadTooBigException(Exception):
    """Raised when data handed to a send call exceeds the maximum payload size."""

22
cdef extern from "proto/applicationConstraints.c":
23 24
    pass

25
cdef extern from "proto/stores/dataPacketStore.c":
26 27
    pass

28
cdef extern from "proto/stores/deliveredPacketTable.c":
29
    pass
30

Andreas Schmidt's avatar
Andreas Schmidt committed
31 32 33 34
cdef extern from "proto/stores/receptionTable.c":
    pass

# Pulls the C source into the generated module; one declaration is sufficient.
cdef extern from "proto/stores/packetTimeoutTable.c":
    pass

40 41 42
cdef extern from "proto/stores/inFlightPacketStore.c":
    pass

43
cdef extern from "proto/stores/repairBlockStore.c":
44 45
    pass

46
cdef extern from "proto/stores/packetDeliveryStore.c":
47 48
    pass

Andreas Schmidt's avatar
Andreas Schmidt committed
49 50 51
cdef extern from "proto/types/packetTimeout.c":
    pass

Andreas Schmidt's avatar
Andreas Schmidt committed
52 53 54
cdef extern from "proto/types/lossStatistics.c":
    pass

55
cdef extern from "proto/processes/dataReceiver.c":
56
    pass
57

58
cdef extern from "proto/processes/dataTransmitter.c":
59
    pass
60

61
cdef extern from "proto/block.c":
62
    pass
63

Andreas Schmidt's avatar
Andreas Schmidt committed
64 65 66
cdef extern from "proto/bbr.c":
    pass

67
cdef extern from "proto/clock.c":
68
    pass
Andreas Schmidt's avatar
Andreas Schmidt committed
69

70
cdef extern from "proto/channelStateInformation.c":
71
    pass
Andreas Schmidt's avatar
Andreas Schmidt committed
72

73
cdef extern from "proto/vdmcode/block_code.c":
74
    pass
75

76
cdef extern from "proto/codingParams.c":
77
    pass
78

79
cdef extern from "proto/receiver.c":
80
    pass
Andreas Schmidt's avatar
Andreas Schmidt committed
81

82
cdef extern from "proto/types/packet.c":
83
    pass
84

85
cdef extern from "proto/socket.c":
86
    pass
87 88

cdef extern from "util/bptree.c":
89
    pass
90

91 92
cdef extern from "util/bitmap.c":
    pass
93

94 95 96
cdef extern from "util/common.c":
    pass

97 98
cdef extern from "util/list.c":
    pass
99

Andreas Schmidt's avatar
Andreas Schmidt committed
100 101 102
cdef extern from "util/pipe.c":
    pass

rna's avatar
rna committed
103 104 105
cdef extern from "util/time.c":
    pass

Andreas Schmidt's avatar
Andreas Schmidt committed
106 107 108
cdef extern from "util/mpsc_queue.c":
    pass

Andreas Schmidt's avatar
Andreas Schmidt committed
109 110 111
cdef extern from "util/windowedFilter.c":
    pass

112
cdef sockaddr_to_addr_and_port(sockaddr_in addr):
    """Convert a ``sockaddr_in`` into an ``(ip_address, port)`` tuple.

    Both fields are passed through ``ntohl``/``ntohs`` before use; the
    address is wrapped with ``ipaddress.ip_address``.
    """
    host = ipaddress.ip_address(socket.ntohl(addr.sin_addr.s_addr))
    port = socket.ntohs(addr.sin_port)
    return (host, port)
114

rna's avatar
rna committed
115
class PrrtCodingConfiguration:
    """Holds the coding parameters ``n``, ``k`` and ``n_cycle``.

    Attributes:
        n: first parameter; must be greater than or equal to ``k``.
        k: second parameter.
        r: derived value ``n - k``.
        n_cycle: sequence whose elements sum to ``n - k`` when ``n != k``;
            defaults to ``[0]`` when not supplied.

    Raises:
        ValueError: if ``n < k``, if ``n != k`` and ``n_cycle`` is missing,
            or if the elements of ``n_cycle`` do not sum to ``n - k``.
    """

    def __init__(self, n, k, n_cycle=None):
        if n < k:
            raise ValueError("n must be greater or equal k.")

        self.n = n
        self.k = k
        self.r = n - k

        # A cycle is only mandatory (and only validated) when r is non-zero.
        if self.r != 0:
            if n_cycle is None:
                raise ValueError("n_cycle cannot be None if n != k.")
            if sum(n_cycle) != self.r:
                raise ValueError("The elements in n_cycle must sum up to n-k.")

        self.n_cycle = n_cycle if n_cycle is not None else [0]

    def __repr__(self):
        return "PrrtCodingConfiguration(n={},k={},n_cycle={})".format(self.n, self.k, self.n_cycle)

    def __str__(self):
        return "({},{},{})".format(self.n, self.k, self.n_cycle)

137 138
cdef class PrrtSocket:
    """Python wrapper around a native ``cprrt.PrrtSocket``.

    The native socket is created and bound in ``__cinit__`` and closed in
    ``__dealloc__``.  Times on the Python side are given in seconds and are
    scaled by 10**6 before being handed to the native calls; deadlines are
    naive ``datetime`` objects measured against ``_epoch`` (1970-01-01,
    presumably UTC -- confirm with callers).
    """
    cdef cprrt.PrrtSocket* _c_socket
    # Naive reference point for deadlines; same value that
    # datetime.utcfromtimestamp(0) yields, without the deprecated call.
    _epoch = datetime.datetime(1970, 1, 1)

    def __cinit__(self, address, maximum_payload_size = 1400, target_delay = 1, thread_pinning = False):
        # address is a (host, port) pair; target_delay is given in seconds.
        host, port = address
        # Force an integer so that fractional delays (e.g. 0.5) can be
        # converted to the C integer argument.
        target_delay_us = int(round(target_delay * 1000**2))
        self._c_socket = cprrt.PrrtSocket_create(maximum_payload_size, target_delay_us)
        if self._c_socket == NULL:
            # __dealloc__ skips the close call for a NULL handle.
            raise RuntimeError("PrrtSocket_create returned NULL.")
        if thread_pinning:
            cprrt.PrrtSocket_enable_thread_pinning(self._c_socket)
        # NOTE(review): the result of PrrtSocket_bind is not inspected here;
        # confirm whether it reports bind failures.
        cprrt.PrrtSocket_bind(self._c_socket, host.encode("utf8"), port)

    # Channel Properties
    property data_rate_btl_fwd:
        # Value reported by PrrtSocket_get_btlbw_fwd (unit not visible here).
        def __get__(self):
            return cprrt.PrrtSocket_get_btlbw_fwd(self._c_socket)

    property data_rate_btl_back:
        # Value reported by PrrtSocket_get_btlbw_back (unit not visible here).
        def __get__(self):
            return cprrt.PrrtSocket_get_btlbw_back(self._c_socket)

    property rtt_prop_fwd:
        # Native value scaled by 1e-6 (presumably microseconds -> seconds).
        def __get__(self):
            return cprrt.PrrtSocket_get_rtprop_fwd(self._c_socket) * 0.000001

    property loss_rate_fwd:
        # Value reported by PrrtSocket_get_plr_fwd.
        def __get__(self):
            return cprrt.PrrtSocket_get_plr_fwd(self._c_socket)

    # Application Properties
    property target_delay:
        # "targetdelay" socket option scaled by 1e-6, mirroring the
        # seconds -> microseconds scaling applied in __cinit__.
        def __get__(self):
            return cprrt.PrrtSocket_get_sock_opt(self._c_socket, "targetdelay") * 0.000001

    property maximum_payload_size:
        # "maximum_payload_size" socket option; also the limit checked by send().
        def __get__(self):
            return cprrt.PrrtSocket_get_sock_opt(self._c_socket, "maximum_payload_size")

    # Protocol configuration
    property thread_pinning:
        # Value reported by PrrtSocket_uses_thread_pinning.
        def __get__(self):
            return cprrt.PrrtSocket_uses_thread_pinning(self._c_socket)

    property app_queue_size:
        # Read/write access to the "app_queue_size" socket option.
        def __get__(self):
            return cprrt.PrrtSocket_get_sock_opt(self._c_socket, "app_queue_size")

        def __set__(self, value):
            cprrt.PrrtSocket_set_sock_opt(self._c_socket, "app_queue_size", value)

    property coding_configuration:
        # Exchanges a PrrtCodingConfiguration with the native socket.
        def __get__(self):
            cdef cprrt.PrrtCodingConfiguration *params = cprrt.PrrtSocket_get_coding_parameters(self._c_socket)
            return PrrtCodingConfiguration(params.n, params.k, list(<uint8_t[:params.c]> params.n_cycle))

        def __set__(self, params: PrrtCodingConfiguration):
            cdef uint8_t* n_cycle
            c = len(params.n_cycle)
            # Elements are uint8_t.  At least one byte is requested so that an
            # empty cycle does not depend on what malloc(0) returns.
            n_cycle = <uint8_t *> malloc(max(c, 1) * sizeof(uint8_t))
            if n_cycle == NULL:
                raise MemoryError()
            try:
                for i, x in enumerate(params.n_cycle):
                    n_cycle[i] = x
            except Exception:
                # The native side has not seen the buffer yet, so it is ours
                # to release when a value cannot be stored.
                free(n_cycle)
                raise
            # NOTE(review): the buffer is not freed after this call because it
            # is not visible here whether the native side copies it or keeps
            # the pointer -- confirm and free it if it is copied.
            cprrt.PrrtSocket_set_coding_parameters(self._c_socket, params.k, params.n, c, n_cycle)

    # Sending
    def connect(self, address):
        """Connect the socket to ``address``, a ``(host, port)`` pair."""
        host, port = address
        cdef bytes encodedHost = host.encode("utf-8")
        cprrt.PrrtSocket_connect(self._c_socket, encodedHost, port)

    def _checked_payload_length(self, data):
        # Shared precondition check for send()/send_sync(): the socket must be
        # connected and the payload must fit; returns len(data).
        if cprrt.PrrtSocket_get_sock_opt(self._c_socket, "connected") == 0:
            raise Exception("PrrtSocket must be connected first, before data can be sent.")
        maximum_payload_size = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "maximum_payload_size")
        data_len = len(data)
        if data_len > maximum_payload_size:
            raise PayloadTooBigException("Sending packet of {} bytes on a socket with maximum payload size of {} bytes failed.".format(data_len, maximum_payload_size))
        return data_len

    def send(self, data):
        """Hand ``data`` to ``PrrtSocket_send_async``.

        Raises:
            Exception: if the socket is not connected.
            PayloadTooBigException: if ``data`` exceeds the maximum payload size.
        """
        data_len = self._checked_payload_length(data)
        cprrt.PrrtSocket_send_async(self._c_socket, data, data_len)

    def send_sync(self, data):
        """Hand ``data`` to ``PrrtSocket_send_sync``.

        Raises:
            Exception: if the socket is not connected.
            PayloadTooBigException: if ``data`` exceeds the maximum payload size.
        """
        data_len = self._checked_payload_length(data)
        cprrt.PrrtSocket_send_sync(self._c_socket, data, data_len)

    # Receiving
    #
    # Every receive method releases the GIL around the native call and returns
    # ``(payload_bytes, (ip_address, port))``.  The address structure is zeroed
    # beforehand so that the returned address is deterministic even if the
    # native call does not fill it in.
    def recv(self):
        """Receive via ``PrrtSocket_recv``."""
        cdef char buffer[65536]
        cdef int32_t received
        cdef sockaddr_in addr
        memset(&addr, 0, sizeof(sockaddr_in))
        with nogil:
            received = cprrt.PrrtSocket_recv(self._c_socket, <void*> buffer, <sockaddr*> &addr)
        return buffer[:received], sockaddr_to_addr_and_port(addr)

    def receive_asap(self):
        """Receive via ``PrrtSocket_receive_asap``."""
        cdef char buffer[65536]
        cdef int32_t received
        cdef sockaddr_in addr
        memset(&addr, 0, sizeof(sockaddr_in))
        with nogil:
            received = cprrt.PrrtSocket_receive_asap(self._c_socket, <void*> buffer, <sockaddr*> &addr)
        return buffer[:received], sockaddr_to_addr_and_port(addr)

    def receive_asap_wait(self):
        """Receive via ``PrrtSocket_receive_asap_wait``."""
        cdef char buffer[65536]
        cdef int32_t received
        cdef sockaddr_in addr
        memset(&addr, 0, sizeof(sockaddr_in))
        with nogil:
            received = cprrt.PrrtSocket_receive_asap_wait(self._c_socket, <void*> buffer, <sockaddr*> &addr)
        return buffer[:received], sockaddr_to_addr_and_port(addr)

    def receive_asap_timedwait(self, deadline):
        """Receive via ``PrrtSocket_receive_asap_timedwait`` until ``deadline``.

        Returns ``(None, None)`` when the native call reports a length of 0.

        Raises:
            TimeoutException: if the native call reports a negative length.
        """
        cdef char buffer[65536]
        cdef int32_t received
        cdef sockaddr_in addr
        cdef timespec deadline_timespec = self._convert_deadline(deadline)
        memset(&addr, 0, sizeof(sockaddr_in))
        with nogil:
            received = cprrt.PrrtSocket_receive_asap_timedwait(self._c_socket, <void*> buffer, <sockaddr*> &addr, &deadline_timespec)
        if received < 0:
            raise TimeoutException()
        if received == 0:
            return (None, None)
        return buffer[:received], sockaddr_to_addr_and_port(addr)

    def receive_ordered(self, time_window):
        """Receive via ``PrrtSocket_receive_ordered``; ``time_window`` is in seconds."""
        cdef char buffer[65536]
        cdef int32_t received
        cdef sockaddr_in addr
        # Integer conversion keeps fractional windows usable.
        cdef uint32_t time_window_us = int(round(time_window * (1000**2)))
        memset(&addr, 0, sizeof(sockaddr_in))
        with nogil:
            received = cprrt.PrrtSocket_receive_ordered(self._c_socket, <void*> buffer, <sockaddr*> &addr, time_window_us)
        return buffer[:received], sockaddr_to_addr_and_port(addr)

    def receive_ordered_wait(self, time_window):
        """Receive via ``PrrtSocket_receive_ordered_wait``; ``time_window`` is in seconds."""
        cdef char buffer[65536]
        cdef int32_t received
        cdef sockaddr_in addr
        cdef uint32_t time_window_us = int(round(time_window * (1000**2)))
        memset(&addr, 0, sizeof(sockaddr_in))
        with nogil:
            received = cprrt.PrrtSocket_receive_ordered_wait(self._c_socket, <void*> buffer, <sockaddr*> &addr, time_window_us)
        return buffer[:received], sockaddr_to_addr_and_port(addr)

    def receive_ordered_timedwait(self, time_window, deadline):
        """Receive via ``PrrtSocket_receive_ordered_timedwait`` until ``deadline``.

        ``time_window`` is in seconds.  Returns ``(None, None)`` when the
        native call reports a length of 0.

        Raises:
            TimeoutException: if the native call reports a negative length.
        """
        cdef char buffer[65536]
        cdef int32_t received
        cdef sockaddr_in addr
        cdef uint32_t time_window_us = int(round(time_window * (1000**2)))
        cdef timespec deadline_timespec = self._convert_deadline(deadline)
        memset(&addr, 0, sizeof(sockaddr_in))
        with nogil:
            received = cprrt.PrrtSocket_receive_ordered_timedwait(self._c_socket, <void*> buffer, <sockaddr*> &addr, time_window_us, &deadline_timespec)
        if received < 0:
            raise TimeoutException()
        if received == 0:
            return (None, None)
        return buffer[:received], sockaddr_to_addr_and_port(addr)

    # Internals
    #
    # Read-only pass-throughs to the corresponding native getters.
    property bbr_state:
        def __get__(self):
            return cprrt.PrrtSocket_get_bbr_state(self._c_socket)

    property bbr_filled_pipe:
        def __get__(self):
            return cprrt.PrrtSocket_get_filled_pipe(self._c_socket)

    property bbr_full_bw:
        def __get__(self):
            return cprrt.PrrtSocket_get_full_bw(self._c_socket)

    property bbr_cycle_index:
        def __get__(self):
            return cprrt.PrrtSocket_get_cycle_index(self._c_socket)

    property bbr_pacing_gain:
        def __get__(self):
            return cprrt.PrrtSocket_get_pacing_gain(self._c_socket)

    property bbr_cwnd:
        def __get__(self):
            return cprrt.PrrtSocket_get_cwnd(self._c_socket)

    property bbr_inflight:
        def __get__(self):
            return cprrt.PrrtSocket_get_inflight(self._c_socket)

    property bbr_pacing_rate:
        def __get__(self):
            return cprrt.PrrtSocket_get_pacing_rate(self._c_socket)

    property bbr_send_quantum:
        def __get__(self):
            return cprrt.PrrtSocket_get_send_quantum(self._c_socket)

    property bbr_pipe:
        def __get__(self):
            return cprrt.PrrtSocket_get_pipe(self._c_socket)

    property bbr_delivered:
        def __get__(self):
            return cprrt.PrrtSocket_get_delivered(self._c_socket)

    property bbr_is_app_limited:
        def __get__(self):
            return cprrt.PrrtSocket_get_bbr_is_app_limited(self._c_socket)

    property bbr_app_limited:
        def __get__(self):
            return cprrt.PrrtSocket_get_bbr_app_limited(self._c_socket)

    property bbr_round_start:
        def __get__(self):
            return cprrt.PrrtSocket_get_bbr_round_start(self._c_socket)

    def __dealloc__(self):
        # The handle stays NULL when creation failed in __cinit__.
        if self._c_socket != NULL:
            cprrt.PrrtSocket_close(self._c_socket)

    def _convert_deadline(self, deadline):
        """Convert a naive ``datetime`` deadline into a ``timespec``.

        The deadline is measured against ``_epoch``.  The integer fields of
        the resulting ``timedelta`` are used directly, so no precision is
        lost to floating point and the nanosecond part is always
        non-negative.
        """
        cdef timespec deadline_timespec
        diff = deadline - self._epoch
        deadline_timespec.tv_sec = diff.days * 86400 + diff.seconds
        deadline_timespec.tv_nsec = diff.microseconds * 1000
        return deadline_timespec