prrt.pyx 15 KB
Newer Older
Andreas Schmidt's avatar
Andreas Schmidt committed
1
from libc.stdint cimport uint32_t, uint16_t, uint8_t, int32_t
Andreas Schmidt's avatar
Andreas Schmidt committed
2
from libc.stdlib cimport malloc, free
3
from libc.string cimport memset
4
from posix.time cimport timespec
rna's avatar
rna committed
5
cimport cython
Andreas Schmidt's avatar
Andreas Schmidt committed
6

7 8
cimport cprrt

9
import datetime
10 11
import socket
import ipaddress
12

13 14
include "sockets.pxd"

15 16 17 18
class TimeoutException(Exception):
    """Signals that a call timed out.

    The fixed text is available as ``message`` and is also passed to
    ``Exception`` so that ``str(exc)``, ``exc.args`` and tracebacks show it.
    """

    def __init__(self):
        self.message = "The call timed out."
        # Without this call the exception would carry empty args and an
        # empty string representation.
        Exception.__init__(self, self.message)

19 20 21
class PayloadTooBigException(Exception):
    """Raised when a payload is larger than the socket's maximum payload size."""

22
cdef extern from "proto/stores/dataPacketStore.c":
23 24
    pass

25
cdef extern from "proto/stores/deliveredPacketTable.c":
26
    pass
27

Andreas Schmidt's avatar
Andreas Schmidt committed
28 29 30 31 32 33
cdef extern from "proto/stores/pace.c":
    pass

cdef extern from "proto/stores/paceFilter.c":
    pass

Andreas Schmidt's avatar
Andreas Schmidt committed
34 35 36 37
cdef extern from "proto/stores/receptionTable.c":
    pass

cdef extern from "proto/stores/packetTimeoutTable.c":
38 39
    pass

40
cdef extern from "proto/stores/packetTimeoutTable.c":
41 42
    pass

43 44 45
cdef extern from "proto/stores/inFlightPacketStore.c":
    pass

46
cdef extern from "proto/stores/repairBlockStore.c":
47 48
    pass

49
cdef extern from "proto/stores/packetDeliveryStore.c":
50 51
    pass

Andreas Schmidt's avatar
Andreas Schmidt committed
52 53 54
cdef extern from "proto/types/applicationConstraints.c":
    pass

Andreas Schmidt's avatar
Andreas Schmidt committed
55 56 57
cdef extern from "proto/types/packetTimeout.c":
    pass

Andreas Schmidt's avatar
Andreas Schmidt committed
58 59 60
cdef extern from "proto/types/lossStatistics.c":
    pass

61
cdef extern from "proto/processes/dataReceiver.c":
62
    pass
63

64
cdef extern from "proto/processes/dataTransmitter.c":
65
    pass
66

Andreas Schmidt's avatar
Andreas Schmidt committed
67
cdef extern from "proto/clock.c":
68
    pass
69

Andreas Schmidt's avatar
Andreas Schmidt committed
70 71 72
cdef extern from "proto/bbr.c":
    pass

73
cdef extern from "proto/clock.c":
74
    pass
Andreas Schmidt's avatar
Andreas Schmidt committed
75

Andreas Schmidt's avatar
Andreas Schmidt committed
76
cdef extern from "proto/timer.c":
77
    pass
Andreas Schmidt's avatar
Andreas Schmidt committed
78

79
cdef extern from "proto/vdmcode/block_code.c":
80
    pass
81

Andreas Schmidt's avatar
Andreas Schmidt committed
82
cdef extern from "proto/receiver.c":
83
    pass
84

Andreas Schmidt's avatar
Andreas Schmidt committed
85
cdef extern from "proto/types/block.c":
86
    pass
87

Andreas Schmidt's avatar
Andreas Schmidt committed
88
cdef extern from "proto/types/channelStateInformation.c":
89
    pass
90

Andreas Schmidt's avatar
Andreas Schmidt committed
91
cdef extern from "proto/types/codingParams.c":
92
    pass
Andreas Schmidt's avatar
Andreas Schmidt committed
93

94
cdef extern from "proto/types/packet.c":
95
    pass
96

97
cdef extern from "proto/socket.c":
98
    pass
99 100

cdef extern from "util/bptree.c":
101
    pass
102

103 104
cdef extern from "util/bitmap.c":
    pass
105

106 107 108
cdef extern from "util/common.c":
    pass

109 110
cdef extern from "util/list.c":
    pass
111

Andreas Schmidt's avatar
Andreas Schmidt committed
112 113 114
cdef extern from "util/pipe.c":
    pass

rna's avatar
rna committed
115 116 117
cdef extern from "util/time.c":
    pass

Andreas Schmidt's avatar
Andreas Schmidt committed
118 119 120
cdef extern from "util/mpsc_queue.c":
    pass

Andreas Schmidt's avatar
Andreas Schmidt committed
121 122 123
cdef extern from "util/windowedFilter.c":
    pass

124
cdef sockaddr_to_addr_and_port(sockaddr_in addr):
    """Turn a C ``sockaddr_in`` into an ``(ip_address, port)`` tuple.

    The address is passed through ``socket.ntohl`` and wrapped with
    ``ipaddress.ip_address``; the port is passed through ``socket.ntohs``.
    """
    host = ipaddress.ip_address(socket.ntohl(addr.sin_addr.s_addr))
    port = socket.ntohs(addr.sin_port)
    return (host, port)
126

rna's avatar
rna committed
127
class PrrtCodingConfiguration:
    """Coding parameters ``n``, ``k`` and ``n_cycle``.

    Attributes:
        n, k: the values passed in; ``n`` must not be smaller than ``k``.
        r: ``n - k``.
        n_cycle: sequence whose elements sum to ``n - k``; defaults to
            ``[0]`` when omitted (only allowed if ``n == k``).

    Raises:
        ValueError: if ``n < k``, if ``n_cycle`` is missing although
            ``n != k``, or if ``sum(n_cycle) != n - k``.
    """

    def __init__(self, n, k, n_cycle=None):
        if n < k:
            raise ValueError("n must be greater or equal k.")

        self.n = n
        self.k = k
        self.r = n - k

        if n_cycle is None:
            if self.r != 0:
                raise ValueError("n_cycle cannot be None if (n-k) != 0.")
            # Apply the default before the sum check below; summing None
            # would raise TypeError instead of accepting the n == k case.
            n_cycle = [0]

        if sum(n_cycle) != self.r:
            raise ValueError("The elements in n_cycle must sum up to n-k.")

        self.n_cycle = n_cycle

    def __repr__(self):
        return "PrrtCodingConfiguration(n={},k={},n_cycle={})".format(self.n, self.k, self.n_cycle)

    def __str__(self):
        return "({},{},{})".format(self.n, self.k, self.n_cycle)

149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
class PrrtPace:
    """A pace value split into internal, dependent and external parts."""

    def __init__(self, internal, dependent, external):
        self.internal, self.dependent, self.external = internal, dependent, external

    @property
    def total(self):
        """Internal plus external part."""
        combined = self.internal + self.external
        return combined

    @property
    def effective(self):
        """Internal plus external part, minus the dependent part."""
        return self.total - self.dependent

class PrrtPaces:
    """Bundle of four pace values: appSend, prrtTransmit, prrtReceive, appDeliver."""

    def __init__(self, appSend, prrtTransmit, prrtReceive, appDeliver):
        # Sending side.
        self.appSend, self.prrtTransmit = appSend, prrtTransmit
        # Receiving side.
        self.prrtReceive, self.appDeliver = prrtReceive, appDeliver

170 171
cdef class PrrtSocket:
    cdef cprrt.PrrtSocket* _c_socket
172
    _epoch = datetime.datetime.utcfromtimestamp(0)
173

rna's avatar
rna committed
174
    def __cinit__(self, address, maximum_payload_size = 1400, target_delay = 1, thread_pinning = False):
        # Create the C socket and bind it to ``address`` (a ``(host, port)``
        # pair). ``target_delay`` is multiplied by 10**6 before it is handed
        # to PrrtSocket_create; thread pinning, if requested, is enabled
        # before the bind. Raises ValueError when PrrtSocket_bind returns a
        # non-zero code.
        host, port = address
        delay_us = target_delay * 1000**2
        self._c_socket = cprrt.PrrtSocket_create(maximum_payload_size, delay_us)
        if thread_pinning:
            cprrt.PrrtSocket_enable_thread_pinning(self._c_socket)

        bind_status = cprrt.PrrtSocket_bind(self._c_socket, host.encode("utf8"), port)
        if bind_status != 0:
            # PrrtSocket_bind calls PrrtSocket_close on error
            # so we need to set _c_socket to NULL because otherwise __dealloc__
            # will attempt to call PrrtSocket_close again on the closed socket
            self._c_socket = NULL
            # TODO: use hstrerror() instead
            reasons = {
                1: 'host not found.',
                2: 'try again.',
                3: 'no recovery.',
                4: 'no data.',
                -1: 'netdb internal error.',
            }
            detail = reasons.get(bind_status, 'unknown error.')
            raise ValueError("PrrtSocket_bind failed: " + detail)
Andreas Schmidt's avatar
Andreas Schmidt committed
194

Andreas Schmidt's avatar
Andreas Schmidt committed
195 196
    # Channel Properties
    property data_rate_btl_fwd:
        """Value reported by ``PrrtSocket_get_btlbw_fwd`` for this socket."""
        def __get__(self):
            btlbw_fwd = cprrt.PrrtSocket_get_btlbw_fwd(self._c_socket)
            return btlbw_fwd
Andreas Schmidt's avatar
Andreas Schmidt committed
199

Andreas Schmidt's avatar
Andreas Schmidt committed
200
    property data_rate_btl_back:
        """Value reported by ``PrrtSocket_get_btlbw_back`` for this socket."""
        def __get__(self):
            btlbw_back = cprrt.PrrtSocket_get_btlbw_back(self._c_socket)
            return btlbw_back
Andreas Schmidt's avatar
Andreas Schmidt committed
203

Andreas Schmidt's avatar
Andreas Schmidt committed
204
    property rtt_prop_fwd:
        """``PrrtSocket_get_rtprop_fwd`` scaled by 0.000001.

        NOTE(review): the factor suggests a microsecond-to-second
        conversion -- confirm against the C API.
        """
        def __get__(self):
            scaled = cprrt.PrrtSocket_get_rtprop_fwd(self._c_socket) * 0.000001
            return scaled

    property loss_rate_fwd:
        """Value reported by ``PrrtSocket_get_plr_fwd`` for this socket."""
        def __get__(self):
            plr_fwd = cprrt.PrrtSocket_get_plr_fwd(self._c_socket)
            return plr_fwd
211 212


Andreas Schmidt's avatar
Andreas Schmidt committed
213
    # Application Properties
214 215
    property target_delay:
        """The ``"targetdelay"`` socket option scaled by 0.000001."""
        def __get__(self):
            scaled = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "targetdelay") * 0.000001
            return scaled
217

rna's avatar
rna committed
218 219 220 221
    property maximum_payload_size:
        """The ``"maximum_payload_size"`` socket option."""
        def __get__(self):
            limit = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "maximum_payload_size")
            return limit

Andreas Schmidt's avatar
Andreas Schmidt committed
222
    # Pacing
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
    property paces:
        """Snapshot of the four pace triples as a ``PrrtPaces`` object.

        For each of appSend, prrtTransmit, prrtReceive and appDeliver the
        ``<name>_pace_internal``, ``<name>_pace_dependent`` and
        ``<name>_pace_external`` socket options are read, each scaled by
        0.000001, and wrapped in a ``PrrtPace``.
        """
        def __get__(self):
            internal = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "appSend_pace_internal") * 0.000001
            dependent = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "appSend_pace_dependent") * 0.000001
            external = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "appSend_pace_external") * 0.000001
            app_send = PrrtPace(internal, dependent, external)

            internal = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "prrtTransmit_pace_internal") * 0.000001
            dependent = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "prrtTransmit_pace_dependent") * 0.000001
            external = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "prrtTransmit_pace_external") * 0.000001
            prrt_transmit = PrrtPace(internal, dependent, external)

            internal = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "prrtReceive_pace_internal") * 0.000001
            dependent = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "prrtReceive_pace_dependent") * 0.000001
            external = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "prrtReceive_pace_external") * 0.000001
            prrt_receive = PrrtPace(internal, dependent, external)

            internal = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "appDeliver_pace_internal") * 0.000001
            dependent = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "appDeliver_pace_dependent") * 0.000001
            external = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "appDeliver_pace_external") * 0.000001
            app_deliver = PrrtPace(internal, dependent, external)

            return PrrtPaces(app_send, prrt_transmit, prrt_receive, app_deliver)

Andreas Schmidt's avatar
Andreas Schmidt committed
243 244 245
    property nw_pace:
        """The ``"nw_pace"`` socket option scaled by 0.000001."""
        def __get__(self):
            scaled = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "nw_pace") * 0.000001
            return scaled
Andreas Schmidt's avatar
Andreas Schmidt committed
246 247 248 249 250 251 252 253 254 255

    property recv_peer_btl_pace:
        """The ``"recv_peer_btl_pace"`` socket option scaled by 0.000001."""
        def __get__(self):
            scaled = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "recv_peer_btl_pace") * 0.000001
            return scaled

    property send_peer_btl_pace:
        """The ``"send_peer_btl_pace"`` socket option scaled by 0.000001."""
        def __get__(self):
            scaled = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "send_peer_btl_pace") * 0.000001
            return scaled


Andreas Schmidt's avatar
Andreas Schmidt committed
256 257 258 259 260
    # Protocol configuration
    property thread_pinning:
        """Result of ``PrrtSocket_uses_thread_pinning`` for this socket."""
        def __get__(self):
            pinned = cprrt.PrrtSocket_uses_thread_pinning(self._c_socket)
            return pinned

261 262 263 264 265 266 267
    property app_queue_size:
        """Read or write the ``"app_queue_size"`` socket option."""
        def __get__(self):
            queue_size = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "app_queue_size")
            return queue_size

        def __set__(self, value):
            # Forwarded unchanged to the C-level option setter.
            cprrt.PrrtSocket_set_sock_opt(self._c_socket, "app_queue_size", value)

268 269
    property coding_configuration:
        """Read or replace the socket's coding parameters.

        Reading wraps the struct returned by
        ``PrrtSocket_get_coding_parameters`` in a ``PrrtCodingConfiguration``.
        Writing copies ``params.n_cycle`` into a freshly allocated
        ``uint8_t`` array and passes it, with ``params.k`` and ``params.n``,
        to ``PrrtSocket_set_coding_parameters``.

        Raises:
            MemoryError: if the temporary ``n_cycle`` array cannot be allocated.
        """
        def __get__(self):
            cdef cprrt.PrrtCodingConfiguration *params = cprrt.PrrtSocket_get_coding_parameters(self._c_socket)
            return PrrtCodingConfiguration(params.n, params.k, list(<uint8_t[:params.c]> params.n_cycle))

        def __set__(self, params: PrrtCodingConfiguration):
            cdef uint8_t* n_cycle
            c = len(params.n_cycle)
            # The array holds uint8_t elements, so size it by that type.
            n_cycle = <uint8_t *> malloc(c * cython.sizeof(uint8_t))
            # malloc may legitimately return NULL for a zero-byte request;
            # only treat NULL as a failure when entries will be written.
            if n_cycle == NULL and c > 0:
                raise MemoryError()
            for i, x in enumerate(params.n_cycle):
                n_cycle[i] = x
            cprrt.PrrtSocket_set_coding_parameters(self._c_socket, params.k, params.n, c, n_cycle)
            # NOTE(review): n_cycle is never freed on this side. Whether the
            # C call copies the array or keeps the pointer is not visible
            # here -- confirm ownership before adding free().
280

Andreas Schmidt's avatar
Andreas Schmidt committed
281 282 283 284 285
    # Sending
    def connect(self, address):
        """Connect the socket to ``address``, a ``(host, port)`` pair.

        The host is UTF-8 encoded before being passed to
        ``PrrtSocket_connect``; that call's return value is not inspected.
        """
        cdef bytes host_bytes
        host, port = address
        host_bytes = host.encode("utf-8")
        cprrt.PrrtSocket_connect(self._c_socket, host_bytes, port)
Andreas Schmidt's avatar
Andreas Schmidt committed
286

Andreas Schmidt's avatar
Andreas Schmidt committed
287
    def send(self, data):
        """Hand ``data`` to ``PrrtSocket_send_async``.

        Raises:
            Exception: if the ``"connected"`` socket option is 0.
            PayloadTooBigException: if ``len(data)`` exceeds the
                ``"maximum_payload_size"`` socket option.
        """
        if cprrt.PrrtSocket_get_sock_opt(self._c_socket, "connected") == 0:
            raise Exception("PrrtSocket must be connected first, before data can be sent.")

        payload_limit = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "maximum_payload_size")
        size = len(data)
        if size > payload_limit:
            raise PayloadTooBigException("Sending packet of {} bytes on a socket with maximum payload size of {} bytes failed.".format(size, payload_limit))

        cprrt.PrrtSocket_send_async(self._c_socket, data, size)
296

Andreas Schmidt's avatar
Andreas Schmidt committed
297 298 299 300 301 302 303 304 305
    def send_sync(self, data):
        """Hand ``data`` to ``PrrtSocket_send_sync``.

        Raises:
            Exception: if the ``"connected"`` socket option is 0.
            PayloadTooBigException: if ``len(data)`` exceeds the
                ``"maximum_payload_size"`` socket option.
        """
        if cprrt.PrrtSocket_get_sock_opt(self._c_socket, "connected") == 0:
            raise Exception("PrrtSocket must be connected first, before data can be sent.")

        payload_limit = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "maximum_payload_size")
        size = len(data)
        if size > payload_limit:
            raise PayloadTooBigException("Sending packet of {} bytes on a socket with maximum payload size of {} bytes failed.".format(size, payload_limit))

        cprrt.PrrtSocket_send_sync(self._c_socket, data, size)
Andreas Schmidt's avatar
Andreas Schmidt committed
306

Andreas Schmidt's avatar
Andreas Schmidt committed
307
    # Receiving
Andreas Schmidt's avatar
Andreas Schmidt committed
308
    def recv(self):
        """Receive one payload via ``PrrtSocket_recv``.

        The GIL is released for the duration of the C call. Returns
        ``(payload, (ip_address, port))`` where ``payload`` is the first
        ``length`` bytes of a 65536-byte local buffer.

        NOTE(review): a negative length is not handled here -- confirm
        that PrrtSocket_recv cannot return one.
        """
        cdef char buf[65536]
        cdef int32_t length
        cdef sockaddr_in addr
        with nogil:
            length = cprrt.PrrtSocket_recv(self._c_socket, <void*> buf, <sockaddr*> &addr)
        payload = buf[:length]
        return payload, sockaddr_to_addr_and_port(addr)
315

316 317 318
    def receive_asap(self):
        """Receive one payload via ``PrrtSocket_receive_asap``.

        The GIL is released for the duration of the C call. Returns
        ``(payload, (ip_address, port))``.

        NOTE(review): a negative length is not handled here -- confirm
        that PrrtSocket_receive_asap cannot return one.
        """
        cdef char buf[65536]
        cdef int32_t length
        cdef sockaddr_in addr
        with nogil:
            length = cprrt.PrrtSocket_receive_asap(self._c_socket, <void*> buf, <sockaddr*> &addr)
        payload = buf[:length]
        return payload, sockaddr_to_addr_and_port(addr)
323 324 325 326

    def receive_asap_wait(self):
        """Receive one payload via ``PrrtSocket_receive_asap_wait``.

        The GIL is released for the duration of the C call. Returns
        ``(payload, (ip_address, port))``.

        NOTE(review): a negative length is not handled here -- confirm
        that PrrtSocket_receive_asap_wait cannot return one.
        """
        cdef char buf[65536]
        cdef int32_t length
        cdef sockaddr_in addr
        with nogil:
            length = cprrt.PrrtSocket_receive_asap_wait(self._c_socket, <void*> buf, <sockaddr*> &addr)
        payload = buf[:length]
        return payload, sockaddr_to_addr_and_port(addr)
331 332 333 334

    def receive_asap_timedwait(self, deadline):
        """Receive via ``PrrtSocket_receive_asap_timedwait`` until ``deadline``.

        ``deadline`` is converted with ``_convert_deadline`` before the GIL
        is released for the C call.

        Returns:
            ``(payload, (ip_address, port))``, or ``(None, None)`` when the
            C call reports a length of 0.

        Raises:
            TimeoutException: when the C call reports a negative length.
        """
        cdef char buf[65536]
        cdef int32_t length
        cdef sockaddr_in addr
        cdef timespec deadline_timespec = self._convert_deadline(deadline)
        with nogil:
            length = cprrt.PrrtSocket_receive_asap_timedwait(self._c_socket, <void*> buf, <sockaddr*> &addr, &deadline_timespec)
        if length < 0:
            raise TimeoutException()
        elif length == 0:
            return (None, None)
        payload = buf[:length]
        return payload, sockaddr_to_addr_and_port(addr)
344 345 346 347

    def receive_ordered(self, time_window):
        """Receive one payload via ``PrrtSocket_receive_ordered``.

        ``time_window`` is multiplied by 10**6 and passed as a ``uint32_t``.
        The GIL is released for the duration of the C call. Returns
        ``(payload, (ip_address, port))``.

        NOTE(review): a negative length is not handled here -- confirm
        that PrrtSocket_receive_ordered cannot return one.
        """
        cdef char buf[65536]
        cdef int32_t length
        cdef sockaddr_in addr
        cdef uint32_t time_window_us = time_window * (1000**2)
        with nogil:
            length = cprrt.PrrtSocket_receive_ordered(self._c_socket, <void*> buf, <sockaddr*> &addr, time_window_us)
        payload = buf[:length]
        return payload, sockaddr_to_addr_and_port(addr)
353

354
    def receive_ordered_wait(self, time_window):
        """Receive one payload via ``PrrtSocket_receive_ordered_wait``.

        ``time_window`` is multiplied by 10**6 and passed as a ``uint32_t``.
        The GIL is released for the duration of the C call. Returns
        ``(payload, (ip_address, port))``.

        NOTE(review): a negative length is not handled here -- confirm
        that PrrtSocket_receive_ordered_wait cannot return one.
        """
        cdef char buf[65536]
        cdef int32_t length
        cdef sockaddr_in addr
        cdef uint32_t time_window_us = time_window * (1000**2)
        with nogil:
            length = cprrt.PrrtSocket_receive_ordered_wait(self._c_socket, <void*> buf, <sockaddr*> &addr, time_window_us)
        payload = buf[:length]
        return payload, sockaddr_to_addr_and_port(addr)
362

363 364 365
    def receive_ordered_timedwait(self, time_window, deadline):
        """Receive via ``PrrtSocket_receive_ordered_timedwait`` until ``deadline``.

        ``time_window`` is multiplied by 10**6 and passed as a ``uint32_t``;
        ``deadline`` is converted with ``_convert_deadline``. The GIL is
        released for the duration of the C call.

        Returns:
            ``(payload, (ip_address, port))``, or ``(None, None)`` when the
            C call reports a length of 0.

        Raises:
            TimeoutException: when the C call reports a negative length.
        """
        cdef char buf[65536]
        cdef int32_t length
        cdef sockaddr_in addr
        cdef uint32_t time_window_us = time_window * (1000**2)
        cdef timespec deadline_timespec = self._convert_deadline(deadline)
        with nogil:
            length = cprrt.PrrtSocket_receive_ordered_timedwait(self._c_socket, <void*> buf, <sockaddr*>  &addr, time_window_us, &deadline_timespec)
        if length < 0:
            raise TimeoutException()
        elif length == 0:
            return (None, None)
        payload = buf[:length]
        return payload, sockaddr_to_addr_and_port(addr)
376

Andreas Schmidt's avatar
Andreas Schmidt committed
377

Andreas Schmidt's avatar
Andreas Schmidt committed
378
    # Internals
379 380 381 382 383 384 385 386 387 388 389 390
    property bbr_state:
        """Value reported by ``PrrtSocket_get_bbr_state`` for this socket."""
        def __get__(self):
            state = cprrt.PrrtSocket_get_bbr_state(self._c_socket)
            return state

    property bbr_filled_pipe:
        """Value reported by ``PrrtSocket_get_filled_pipe`` for this socket."""
        def __get__(self):
            filled_pipe = cprrt.PrrtSocket_get_filled_pipe(self._c_socket)
            return filled_pipe

    property bbr_full_bw:
        """Value reported by ``PrrtSocket_get_full_bw`` for this socket."""
        def __get__(self):
            full_bw = cprrt.PrrtSocket_get_full_bw(self._c_socket)
            return full_bw

Andreas Schmidt's avatar
Andreas Schmidt committed
391 392 393 394 395 396 397 398
    property bbr_cycle_index:
        """Value reported by ``PrrtSocket_get_cycle_index`` for this socket."""
        def __get__(self):
            cycle_index = cprrt.PrrtSocket_get_cycle_index(self._c_socket)
            return cycle_index

    property bbr_pacing_gain:
        """Value reported by ``PrrtSocket_get_pacing_gain`` for this socket."""
        def __get__(self):
            pacing_gain = cprrt.PrrtSocket_get_pacing_gain(self._c_socket)
            return pacing_gain

Andreas Schmidt's avatar
Andreas Schmidt committed
399 400 401 402 403 404 405 406
    property bbr_cwnd:
        """Value reported by ``PrrtSocket_get_cwnd`` for this socket."""
        def __get__(self):
            cwnd = cprrt.PrrtSocket_get_cwnd(self._c_socket)
            return cwnd

    property bbr_inflight:
        """Value reported by ``PrrtSocket_get_inflight`` for this socket."""
        def __get__(self):
            inflight = cprrt.PrrtSocket_get_inflight(self._c_socket)
            return inflight

407 408 409 410
    property bbr_pacing_rate:
        """Value reported by ``PrrtSocket_get_pacing_rate`` for this socket."""
        def __get__(self):
            pacing_rate = cprrt.PrrtSocket_get_pacing_rate(self._c_socket)
            return pacing_rate

Andreas Schmidt's avatar
Andreas Schmidt committed
411 412 413 414
    property bbr_send_quantum:
        """Value reported by ``PrrtSocket_get_send_quantum`` for this socket."""
        def __get__(self):
            send_quantum = cprrt.PrrtSocket_get_send_quantum(self._c_socket)
            return send_quantum

Andreas Schmidt's avatar
Andreas Schmidt committed
415 416 417 418
    property bbr_pipe:
        """Value reported by ``PrrtSocket_get_pipe`` for this socket."""
        def __get__(self):
            pipe = cprrt.PrrtSocket_get_pipe(self._c_socket)
            return pipe

Andreas Schmidt's avatar
Andreas Schmidt committed
419 420 421 422
    property bbr_delivered:
        """Value reported by ``PrrtSocket_get_delivered`` for this socket."""
        def __get__(self):
            delivered = cprrt.PrrtSocket_get_delivered(self._c_socket)
            return delivered

423
    property bbr_is_app_limited:
        """Value reported by ``PrrtSocket_get_bbr_is_app_limited`` for this socket."""
        def __get__(self):
            is_app_limited = cprrt.PrrtSocket_get_bbr_is_app_limited(self._c_socket)
            return is_app_limited

    property bbr_app_limited:
        """Value reported by ``PrrtSocket_get_bbr_app_limited`` for this socket."""
        def __get__(self):
            app_limited = cprrt.PrrtSocket_get_bbr_app_limited(self._c_socket)
            return app_limited
430

Andreas Schmidt's avatar
Fixes.  
Andreas Schmidt committed
431 432 433 434
    property bbr_round_start:
        """Value reported by ``PrrtSocket_get_bbr_round_start`` for this socket."""
        def __get__(self):
            round_start = cprrt.PrrtSocket_get_bbr_round_start(self._c_socket)
            return round_start

Andreas Schmidt's avatar
Andreas Schmidt committed
435

436 437 438
    def __dealloc__(self):
        # Close the C socket unless the pointer is NULL (__cinit__ resets
        # it to NULL after a failed bind, which has already closed it).
        if self._c_socket == NULL:
            return
        cprrt.PrrtSocket_close(self._c_socket)
439 440 441 442 443 444 445

    def _convert_deadline(self, deadline):
        """Convert a ``datetime`` deadline into a ``timespec`` relative to ``_epoch``.

        ``deadline`` is subtracted from the naive ``_epoch`` datetime, so it
        must itself be a naive datetime (presumably expressed in UTC, since
        ``_epoch`` is built with ``utcfromtimestamp(0)`` -- confirm with
        callers).

        The split into seconds and nanoseconds uses the integer fields of
        the resulting ``timedelta`` rather than float ``total_seconds()``.
        That avoids sub-microsecond rounding errors and keeps ``tv_sec`` /
        ``tv_nsec`` consistent for negative differences.
        """
        cdef timespec deadline_timespec
        diff = deadline - self._epoch
        # timedelta normalises to 0 <= seconds < 86400 and
        # 0 <= microseconds < 10**6, so tv_nsec always lands in [0, 10**9).
        deadline_timespec.tv_sec = diff.days * 86400 + diff.seconds
        deadline_timespec.tv_nsec = diff.microseconds * 1000
        return deadline_timespec