prrt.pyx 15.5 KB
Newer Older
Andreas Schmidt's avatar
Andreas Schmidt committed
1
from libc.stdint cimport uint32_t, uint16_t, uint8_t, int32_t
Andreas Schmidt's avatar
Andreas Schmidt committed
2
from libc.stdlib cimport malloc, free
3
from libc.string cimport memset
4
from posix.time cimport timespec
rna's avatar
rna committed
5
cimport cython
Andreas Schmidt's avatar
Andreas Schmidt committed
6

7 8
cimport cprrt

9
import datetime
10 11
import socket
import ipaddress
12

13 14
include "sockets.pxd"

15 16 17 18
class TimeoutException(Exception):
    """Raised when a timed receive call does not complete successfully.

    The human-readable text is available both as ``message`` and, because it
    is forwarded to ``Exception.__init__``, through ``str(exc)`` / ``exc.args``.
    """

    def __init__(self):
        self.message = "The call timed out."
        # Forward the message so tracebacks and str() are not empty.
        super().__init__(self.message)

19 20 21
class PayloadTooBigException(Exception):
    """Raised when a payload is larger than the socket's maximum payload size."""

22
cdef extern from "proto/stores/dataPacketStore.c":
23 24
    pass

25
cdef extern from "proto/stores/deliveredPacketTable.c":
26
    pass
27

Andreas Schmidt's avatar
Andreas Schmidt committed
28 29 30 31 32 33
cdef extern from "proto/stores/pace.c":
    pass

cdef extern from "proto/stores/paceFilter.c":
    pass

Andreas Schmidt's avatar
Andreas Schmidt committed
34 35 36 37
cdef extern from "proto/stores/receptionTable.c":
    pass

cdef extern from "proto/stores/packetTimeoutTable.c":
38 39
    pass

40
cdef extern from "proto/stores/packetTimeoutTable.c":
41 42
    pass

43 44 45
cdef extern from "proto/stores/inFlightPacketStore.c":
    pass

46
cdef extern from "proto/stores/repairBlockStore.c":
47 48
    pass

49
cdef extern from "proto/stores/packetDeliveryStore.c":
50 51
    pass

Andreas Schmidt's avatar
Andreas Schmidt committed
52 53 54
cdef extern from "proto/types/applicationConstraints.c":
    pass

Andreas Schmidt's avatar
Andreas Schmidt committed
55 56 57
cdef extern from "proto/types/packetTimeout.c":
    pass

Andreas Schmidt's avatar
Andreas Schmidt committed
58 59 60
cdef extern from "proto/types/lossStatistics.c":
    pass

61
cdef extern from "proto/processes/dataReceiver.c":
62
    pass
63

64
cdef extern from "proto/processes/dataTransmitter.c":
65
    pass
66

Andreas Schmidt's avatar
Andreas Schmidt committed
67
cdef extern from "proto/clock.c":
68
    pass
69

70 71 72 73 74 75 76 77 78
cdef extern from "proto/bbr.c":
    pass

cdef extern from "proto/clock.c":
    pass

cdef extern from "proto/timer.c":
    pass

Andreas Schmidt's avatar
Andreas Schmidt committed
79
cdef extern from "proto/vdmcode/block_code.c":
80
    pass
Andreas Schmidt's avatar
Andreas Schmidt committed
81

Andreas Schmidt's avatar
Andreas Schmidt committed
82
cdef extern from "proto/receiver.c":
83
    pass
Andreas Schmidt's avatar
Andreas Schmidt committed
84

Andreas Schmidt's avatar
Andreas Schmidt committed
85
cdef extern from "proto/types/block.c":
86
    pass
87

Andreas Schmidt's avatar
Andreas Schmidt committed
88
cdef extern from "proto/types/channelStateInformation.c":
89
    pass
90

Andreas Schmidt's avatar
Andreas Schmidt committed
91
cdef extern from "proto/types/codingParams.c":
92
    pass
Andreas Schmidt's avatar
Andreas Schmidt committed
93

94
cdef extern from "proto/types/packet.c":
95
    pass
96

97
cdef extern from "proto/socket.c":
98
    pass
99 100

cdef extern from "util/bptree.c":
101
    pass
102

103 104
cdef extern from "util/bitmap.c":
    pass
105

106 107 108
cdef extern from "util/common.c":
    pass

109 110
cdef extern from "util/list.c":
    pass
111

Andreas Schmidt's avatar
Andreas Schmidt committed
112 113 114
cdef extern from "util/pipe.c":
    pass

rna's avatar
rna committed
115 116 117
cdef extern from "util/time.c":
    pass

Andreas Schmidt's avatar
Andreas Schmidt committed
118 119 120
cdef extern from "util/mpsc_queue.c":
    pass

121 122 123
cdef extern from "util/windowedFilter.c":
    pass

124
cdef sockaddr_to_addr_and_port(sockaddr_in addr):
    """Convert a C ``sockaddr_in`` into an ``(ip_address, port)`` tuple.

    Both fields are passed through ``socket.ntohl`` / ``socket.ntohs`` before
    use; the address integer is wrapped with ``ipaddress.ip_address``.
    """
    host = ipaddress.ip_address(socket.ntohl(addr.sin_addr.s_addr))
    port = socket.ntohs(addr.sin_port)
    return (host, port)
126

rna's avatar
rna committed
127
class PrrtCodingConfiguration:
    """Coding parameters ``(n, k, n_cycle)`` handed to a ``PrrtSocket``.

    Attributes:
        n: first constructor argument; must be >= ``k``.
        k: second constructor argument.
        r: ``n - k``.
        n_cycle: sequence whose elements sum to ``n - k``; defaults to
            ``[0]`` when ``n == k`` and no cycle is given.
    """

    def __init__(self, n, k, n_cycle=None):
        """Validate and store the coding parameters.

        Raises:
            ValueError: if ``n < k``, if ``n_cycle`` is omitted although
                ``n != k``, or if ``sum(n_cycle) != n - k``.
        """
        if n < k:
            raise ValueError("n must be greater or equal k.")
        self.n = n
        self.k = k
        self.r = n - k

        if n_cycle is None:
            if self.r != 0:
                raise ValueError("n_cycle cannot be None if (n-k) != 0.")
            # Apply the default *before* summing: sum(None) would raise
            # TypeError and make the n == k case unusable without n_cycle.
            n_cycle = [0]

        if sum(n_cycle) != self.r:
            raise ValueError("The elements in n_cycle must sum up to n-k.")

        self.n_cycle = n_cycle

    def __repr__(self):
        return "PrrtCodingConfiguration(n={},k={},n_cycle={})".format(self.n, self.k, self.n_cycle)

    def __str__(self):
        return "({},{},{})".format(self.n, self.k, self.n_cycle)

Andreas Schmidt's avatar
Andreas Schmidt committed
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
class PrrtPace:
    """One pace measurement split into internal, dependent and external parts."""

    def __init__(self, internal, dependent, external):
        self.internal, self.dependent, self.external = internal, dependent, external

    @property
    def total(self):
        """Sum of the internal and external parts."""
        combined = self.internal + self.external
        return combined

    @property
    def effective(self):
        """Internal plus external, reduced by the dependent part."""
        gross = self.internal + self.external
        return gross - self.dependent

class PrrtPaces:
    """Bundle of the pace values reported by ``PrrtSocket.paces``."""

    def __init__(self, appSend, prrtTransmit, prrtReceive, appDeliver, peerPace, networkPace):
        # Per-stage paces.
        self.appSend, self.prrtTransmit = appSend, prrtTransmit
        self.prrtReceive, self.appDeliver = prrtReceive, appDeliver
        # Peer and network paces.
        self.peerPace, self.networkPace = peerPace, networkPace
Andreas Schmidt's avatar
Andreas Schmidt committed
171

172 173
cdef class PrrtSocket:
    """Python wrapper around a native ``cprrt.PrrtSocket``.

    Constructing an instance creates the native socket and binds it to
    ``address``; ``__dealloc__`` closes it.  Channel, pacing and BBR state are
    exposed as properties that forward to the corresponding native getters.
    """
    cdef cprrt.PrrtSocket* _c_socket
    # Naive reference point used by _convert_deadline.  Same value as
    # datetime.datetime.utcfromtimestamp(0), without the deprecated call.
    _epoch = datetime.datetime(1970, 1, 1)

    def __cinit__(self, address, maximum_payload_size = 1400, target_delay = 1, thread_pinning = False, hardware_timestamping_interface=None):
        """Create the native socket and bind it to ``address``.

        Args:
            address: ``(host, port)`` tuple; ``host`` is a ``str``.
            maximum_payload_size: forwarded to ``PrrtSocket_create``.
            target_delay: target delay in seconds (converted to microseconds).
            thread_pinning: enable thread pinning on the native socket.
            hardware_timestamping_interface: interface name (``str``) for
                hardware timestamping, or ``None`` to leave it disabled.

        Raises:
            MemoryError: if the native socket could not be created.
            ValueError: if ``PrrtSocket_bind`` reports an error.
        """
        host, port = address
        # int() so fractional-second delays (floats) become an integral
        # microsecond count before being handed to the C function.
        target_delay_us = int(target_delay * 1000**2)
        self._c_socket = cprrt.PrrtSocket_create(maximum_payload_size, target_delay_us)
        if self._c_socket == NULL:
            # Every later call dereferences the pointer; fail loudly instead.
            raise MemoryError("PrrtSocket_create failed.")
        if thread_pinning:
            cprrt.PrrtSocket_enable_thread_pinning(self._c_socket)
        if hardware_timestamping_interface is not None:
            cprrt.PrrtSocket_enable_hardware_timestamping(self._c_socket, hardware_timestamping_interface.encode("UTF-8"))
        h_errno = cprrt.PrrtSocket_bind(self._c_socket, host.encode("utf8"), port)
        if h_errno != 0:
            # PrrtSocket_bind calls PrrtSocket_close on error
            # so we need to set _c_socket to NULL because otherwise __dealloc__
            # will attempt to call PrrtSocket_close again on the closed socket
            self._c_socket = NULL
            # TODO: use hstrerror() instead
            raise ValueError("PrrtSocket_bind failed: " + ({
                1: 'host not found.',
                2: 'try again.',
                3: 'no recovery.',
                4: 'no data.',
                -1: 'netdb internal error.',
            }).get(h_errno, 'unknown error.'))

    # Channel Properties
    property data_rate_btl_fwd:
        "Value reported by the native ``PrrtSocket_get_btldatarate_fwd``."
        def __get__(self):
            return cprrt.PrrtSocket_get_btldatarate_fwd(self._c_socket)

    property data_rate_btl_back:
        "Value reported by the native ``PrrtSocket_get_btldatarate_back``."
        def __get__(self):
            return cprrt.PrrtSocket_get_btldatarate_back(self._c_socket)

    property rtt_prop_fwd:
        "``PrrtSocket_get_rtprop_fwd`` scaled by 1e-6 (presumably microseconds to seconds)."
        def __get__(self):
            return cprrt.PrrtSocket_get_rtprop_fwd(self._c_socket) * 0.000001

    property loss_rate_fwd:
        "Value reported by the native ``PrrtSocket_get_plr_fwd``."
        def __get__(self):
            return cprrt.PrrtSocket_get_plr_fwd(self._c_socket)

    # Application Properties
    property target_delay:
        "The ``target_delay`` socket option scaled by 1e-6 (microseconds to seconds)."
        def __get__(self):
            return cprrt.PrrtSocket_get_sock_opt(self._c_socket, "target_delay") * 0.000001

    property maximum_payload_size:
        "The ``maximum_payload_size`` socket option."
        def __get__(self):
            return cprrt.PrrtSocket_get_sock_opt(self._c_socket, "maximum_payload_size")

    # Pacing
    property paces:
        "Snapshot of all pace socket options as a ``PrrtPaces`` (each value scaled by 1e-6)."
        def __get__(self):
            # The option names stay string literals on purpose: Cython passes
            # literals to the C call directly, which a runtime-built str
            # would not allow.
            appSend = PrrtPace(
                cprrt.PrrtSocket_get_sock_opt(self._c_socket, "appSend_pace_internal") * 0.000001,
                cprrt.PrrtSocket_get_sock_opt(self._c_socket, "appSend_pace_dependent") * 0.000001,
                cprrt.PrrtSocket_get_sock_opt(self._c_socket, "appSend_pace_external") * 0.000001)

            prrtTransmit = PrrtPace(
                cprrt.PrrtSocket_get_sock_opt(self._c_socket, "prrtTransmit_pace_internal") * 0.000001,
                cprrt.PrrtSocket_get_sock_opt(self._c_socket, "prrtTransmit_pace_dependent") * 0.000001,
                cprrt.PrrtSocket_get_sock_opt(self._c_socket, "prrtTransmit_pace_external") * 0.000001)

            prrtReceive = PrrtPace(
                cprrt.PrrtSocket_get_sock_opt(self._c_socket, "prrtReceive_pace_internal") * 0.000001,
                cprrt.PrrtSocket_get_sock_opt(self._c_socket, "prrtReceive_pace_dependent") * 0.000001,
                cprrt.PrrtSocket_get_sock_opt(self._c_socket, "prrtReceive_pace_external") * 0.000001)

            appDeliver = PrrtPace(
                cprrt.PrrtSocket_get_sock_opt(self._c_socket, "appDeliver_pace_internal") * 0.000001,
                cprrt.PrrtSocket_get_sock_opt(self._c_socket, "appDeliver_pace_dependent") * 0.000001,
                cprrt.PrrtSocket_get_sock_opt(self._c_socket, "appDeliver_pace_external") * 0.000001)

            peerPace = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "recv_peer_btl_pace") * 0.000001
            networkPace = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "nw_pace") * 0.000001

            return PrrtPaces(appSend, prrtTransmit, prrtReceive, appDeliver, peerPace, networkPace)

    property nw_pace:
        "The ``nw_pace`` socket option scaled by 1e-6."
        def __get__(self):
            return cprrt.PrrtSocket_get_sock_opt(self._c_socket, "nw_pace") * 0.000001

    property recv_peer_btl_pace:
        "The ``recv_peer_btl_pace`` socket option scaled by 1e-6."
        def __get__(self):
            return cprrt.PrrtSocket_get_sock_opt(self._c_socket, "recv_peer_btl_pace") * 0.000001

    property send_peer_btl_pace:
        "The ``send_peer_btl_pace`` socket option scaled by 1e-6."
        def __get__(self):
            return cprrt.PrrtSocket_get_sock_opt(self._c_socket, "send_peer_btl_pace") * 0.000001

    # Protocol configuration
    property thread_pinning:
        "Value reported by the native ``PrrtSocket_uses_thread_pinning``."
        def __get__(self):
            return cprrt.PrrtSocket_uses_thread_pinning(self._c_socket)

    property app_queue_size:
        "The ``app_queue_size`` socket option (readable and writable)."
        def __get__(self):
            return cprrt.PrrtSocket_get_sock_opt(self._c_socket, "app_queue_size")

        def __set__(self, value):
            cprrt.PrrtSocket_set_sock_opt(self._c_socket, "app_queue_size", value)

    property coding_configuration:
        "Coding parameters of the native socket as a ``PrrtCodingConfiguration``."
        def __get__(self):
            cdef cprrt.PrrtCodingConfiguration *params = cprrt.PrrtSocket_get_coding_parameters(self._c_socket)
            # View the first params.c bytes of the native array and copy them
            # into a Python list.
            return PrrtCodingConfiguration(params.n, params.k, list(<uint8_t[:params.c]> params.n_cycle))

        def __set__(self, params: PrrtCodingConfiguration):
            cdef uint8_t* n_cycle
            c = len(params.n_cycle)
            # NOTE(review): sized with sizeof(int) although the elements are
            # uint8_t; kept as-is because the native side's expectations are
            # not visible from here -- confirm before shrinking.
            n_cycle = <uint8_t *> malloc(c * cython.sizeof(int))
            if n_cycle == NULL and c > 0:
                # malloc(0) may legitimately return NULL, hence the c > 0 guard.
                raise MemoryError("Unable to allocate the n_cycle buffer.")
            for i, x in enumerate(params.n_cycle):
                n_cycle[i] = x
            # NOTE(review): the buffer is not freed here; whether the native
            # call copies it or takes ownership is not visible -- confirm, and
            # free() it afterwards if it is copied.
            cprrt.PrrtSocket_set_coding_parameters(self._c_socket, params.k, params.n, c, n_cycle)

    # Sending
    def connect(self, address):
        """Connect the native socket to ``address``, a ``(host, port)`` tuple."""
        host, port = address
        cdef bytes encodedHost = host.encode("utf-8")
        cprrt.PrrtSocket_connect(self._c_socket, encodedHost, port)

    def _checked_payload_length(self, data):
        """Return ``len(data)`` after verifying that ``data`` may be sent.

        Shared precondition check for ``send`` and ``send_sync``.

        Raises:
            Exception: if the ``connected`` socket option is 0.
            PayloadTooBigException: if ``data`` is longer than the
                ``maximum_payload_size`` socket option.
        """
        if cprrt.PrrtSocket_get_sock_opt(self._c_socket, "connected") == 0:
            raise Exception("PrrtSocket must be connected first, before data can be sent.")
        maximum_payload_size = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "maximum_payload_size")
        data_len = len(data)
        if data_len > maximum_payload_size:
            raise PayloadTooBigException("Sending packet of {} bytes on a socket with maximum payload size of {} bytes failed.".format(data_len, maximum_payload_size))
        return data_len

    def send(self, data):
        """Hand ``data`` to ``PrrtSocket_send_async``.

        Raises:
            Exception: if the socket is not connected.
            PayloadTooBigException: if ``data`` exceeds the maximum payload size.
        """
        data_len = self._checked_payload_length(data)
        cprrt.PrrtSocket_send_async(self._c_socket, data, data_len)

    def send_sync(self, data):
        """Hand ``data`` to ``PrrtSocket_send_sync``.

        Raises:
            Exception: if the socket is not connected.
            PayloadTooBigException: if ``data`` exceeds the maximum payload size.
        """
        data_len = self._checked_payload_length(data)
        cprrt.PrrtSocket_send_sync(self._c_socket, data, data_len)

    # Receiving
    #
    # Every receive method fills a 65536-byte local buffer through the native
    # call (made with the GIL released) and returns
    # ``(payload_bytes, (ip_address, port))``.
    def recv(self):
        """Receive one payload via ``PrrtSocket_recv``."""
        cdef char buffer[65536]
        cdef int32_t length
        cdef sockaddr_in addr
        with nogil:
            length = cprrt.PrrtSocket_recv(self._c_socket, <void*> buffer, <sockaddr*> &addr)
        return buffer[:length], sockaddr_to_addr_and_port(addr)

    def receive_asap(self):
        """Receive one payload via ``PrrtSocket_receive_asap``."""
        cdef char buffer[65536]
        cdef int32_t length
        cdef sockaddr_in addr
        with nogil:
            length = cprrt.PrrtSocket_receive_asap(self._c_socket, <void*> buffer, <sockaddr*> &addr)
        return buffer[:length], sockaddr_to_addr_and_port(addr)

    def receive_asap_wait(self):
        """Receive one payload via ``PrrtSocket_receive_asap_wait``."""
        cdef char buffer[65536]
        cdef int32_t length
        cdef sockaddr_in addr
        with nogil:
            length = cprrt.PrrtSocket_receive_asap_wait(self._c_socket, <void*> buffer, <sockaddr*> &addr)
        return buffer[:length], sockaddr_to_addr_and_port(addr)

    def receive_asap_timedwait(self, deadline):
        """Receive one payload via ``PrrtSocket_receive_asap_timedwait``.

        Args:
            deadline: ``datetime`` converted with ``_convert_deadline``.

        Returns:
            ``(payload, address)``, or ``(None, None)`` if the native call
            returned 0.

        Raises:
            TimeoutException: if the native call returned a negative value.
        """
        cdef char buffer[65536]
        cdef int32_t length
        cdef sockaddr_in addr
        cdef timespec deadline_timespec = self._convert_deadline(deadline)
        with nogil:
            length = cprrt.PrrtSocket_receive_asap_timedwait(self._c_socket, <void*> buffer, <sockaddr*> &addr, &deadline_timespec)
        if length < 0:
            raise TimeoutException()
        if length == 0:
            return (None, None)
        return buffer[:length], sockaddr_to_addr_and_port(addr)

    def receive_ordered(self, time_window):
        """Receive one payload via ``PrrtSocket_receive_ordered``.

        Args:
            time_window: window in seconds (converted to microseconds).
        """
        cdef char buffer[65536]
        cdef int32_t length
        cdef sockaddr_in addr
        # int() so a fractional-second window converts to an integral count.
        cdef uint32_t time_window_us = int(time_window * (1000**2))
        with nogil:
            length = cprrt.PrrtSocket_receive_ordered(self._c_socket, <void*> buffer, <sockaddr*> &addr, time_window_us)
        return buffer[:length], sockaddr_to_addr_and_port(addr)

    def receive_ordered_wait(self, time_window):
        """Receive one payload via ``PrrtSocket_receive_ordered_wait``.

        Args:
            time_window: window in seconds (converted to microseconds).
        """
        cdef char buffer[65536]
        cdef int32_t length
        cdef sockaddr_in addr
        cdef uint32_t time_window_us = int(time_window * (1000**2))
        with nogil:
            length = cprrt.PrrtSocket_receive_ordered_wait(self._c_socket, <void*> buffer, <sockaddr*> &addr, time_window_us)
        return buffer[:length], sockaddr_to_addr_and_port(addr)

    def receive_ordered_timedwait(self, time_window, deadline):
        """Receive one payload via ``PrrtSocket_receive_ordered_timedwait``.

        Args:
            time_window: window in seconds (converted to microseconds).
            deadline: ``datetime`` converted with ``_convert_deadline``.

        Returns:
            ``(payload, address)``, or ``(None, None)`` if the native call
            returned 0.

        Raises:
            TimeoutException: if the native call returned a negative value.
        """
        cdef char buffer[65536]
        cdef int32_t length
        cdef sockaddr_in addr
        cdef uint32_t time_window_us = int(time_window * (1000**2))
        cdef timespec deadline_timespec = self._convert_deadline(deadline)
        with nogil:
            length = cprrt.PrrtSocket_receive_ordered_timedwait(self._c_socket, <void*> buffer, <sockaddr*> &addr, time_window_us, &deadline_timespec)
        if length < 0:
            raise TimeoutException()
        if length == 0:
            return (None, None)
        return buffer[:length], sockaddr_to_addr_and_port(addr)

    # Internals
    property bbr_state:
        "Value reported by the native ``PrrtSocket_get_bbr_state``."
        def __get__(self):
            return cprrt.PrrtSocket_get_bbr_state(self._c_socket)

    property bbr_filled_pipe:
        "Value reported by the native ``PrrtSocket_get_filled_pipe``."
        def __get__(self):
            return cprrt.PrrtSocket_get_filled_pipe(self._c_socket)

    property bbr_full_datarate:
        "Value reported by the native ``PrrtSocket_get_full_datarate``."
        def __get__(self):
            return cprrt.PrrtSocket_get_full_datarate(self._c_socket)

    property bbr_cycle_index:
        "Value reported by the native ``PrrtSocket_get_cycle_index``."
        def __get__(self):
            return cprrt.PrrtSocket_get_cycle_index(self._c_socket)

    property bbr_pacing_gain:
        "Value reported by the native ``PrrtSocket_get_pacing_gain``."
        def __get__(self):
            return cprrt.PrrtSocket_get_pacing_gain(self._c_socket)

    property bbr_cwnd:
        "Value reported by the native ``PrrtSocket_get_cwnd``."
        def __get__(self):
            return cprrt.PrrtSocket_get_cwnd(self._c_socket)

    property bbr_inflight:
        "Value reported by the native ``PrrtSocket_get_inflight``."
        def __get__(self):
            return cprrt.PrrtSocket_get_inflight(self._c_socket)

    property bbr_pacing_rate:
        "Value reported by the native ``PrrtSocket_get_pacing_rate``."
        def __get__(self):
            return cprrt.PrrtSocket_get_pacing_rate(self._c_socket)

    property bbr_send_quantum:
        "Value reported by the native ``PrrtSocket_get_send_quantum``."
        def __get__(self):
            return cprrt.PrrtSocket_get_send_quantum(self._c_socket)

    property bbr_pipe:
        "Value reported by the native ``PrrtSocket_get_pipe``."
        def __get__(self):
            return cprrt.PrrtSocket_get_pipe(self._c_socket)

    property bbr_delivered:
        "Value reported by the native ``PrrtSocket_get_delivered``."
        def __get__(self):
            return cprrt.PrrtSocket_get_delivered(self._c_socket)

    property bbr_is_app_limited:
        "Value reported by the native ``PrrtSocket_get_bbr_is_app_limited``."
        def __get__(self):
            return cprrt.PrrtSocket_get_bbr_is_app_limited(self._c_socket)

    property bbr_app_limited:
        "Value reported by the native ``PrrtSocket_get_bbr_app_limited``."
        def __get__(self):
            return cprrt.PrrtSocket_get_bbr_app_limited(self._c_socket)

    property bbr_round_start:
        "Value reported by the native ``PrrtSocket_get_bbr_round_start``."
        def __get__(self):
            return cprrt.PrrtSocket_get_bbr_round_start(self._c_socket)

    def __dealloc__(self):
        # _c_socket is NULL when creation or binding failed in __cinit__.
        if self._c_socket != NULL:
            cprrt.PrrtSocket_close(self._c_socket)

    def _convert_deadline(self, deadline):
        """Convert ``deadline`` into a ``timespec`` measured from ``_epoch``.

        Args:
            deadline: naive ``datetime`` (it is subtracted from the naive
                ``_epoch``; presumably expressed in UTC -- confirm with callers).

        Returns:
            ``timespec`` holding whole seconds and the nanosecond remainder.
        """
        diff = deadline - self._epoch
        # Use the normalized integer fields of the timedelta rather than the
        # float total_seconds(): this is exact to the microsecond and also
        # yields a consistent (seconds, 0 <= nanoseconds < 1e9) pair for
        # deadlines that lie before the epoch.
        seconds = diff.days * 86400 + diff.seconds
        nanoseconds = diff.microseconds * 1000
        cdef timespec deadline_timespec = timespec(seconds, nanoseconds)
        return deadline_timespec