prrt.pyx 12 KB
Newer Older
Andreas Schmidt's avatar
Andreas Schmidt committed
1
from libc.stdint cimport uint32_t, uint16_t, uint8_t, int32_t
Andreas Schmidt's avatar
Andreas Schmidt committed
2
from libc.stdlib cimport malloc, free
3
from libc.string cimport memset
4
from posix.time cimport timespec
rna's avatar
rna committed
5
cimport cython
Andreas Schmidt's avatar
Andreas Schmidt committed
6

7 8
cimport cprrt

9
import datetime
10 11
import socket
import ipaddress
12

13 14
include "sockets.pxd"

15 16 17 18
class TimeoutException(Exception):
    """Raised by the timed-wait receive methods when the C call returns a negative length.

    Attributes:
        message: Fixed human-readable description, "The call timed out.".
    """

    def __init__(self):
        self.message = "The call timed out."
        # Forward the text to Exception so that str(exc), exc.args and
        # tracebacks carry the message instead of being empty.
        super().__init__(self.message)

19 20 21
class PayloadTooBigException(Exception):
    """Raised when data handed to a send call is longer than the socket's maximum payload size."""

22
cdef extern from "proto/stores/dataPacketStore.c":
23 24
    pass

25
cdef extern from "proto/stores/deliveredPacketTable.c":
26
    pass
27

Andreas Schmidt's avatar
Andreas Schmidt committed
28 29 30 31
cdef extern from "proto/stores/receptionTable.c":
    pass

cdef extern from "proto/stores/packetTimeoutTable.c":
32 33
    pass

34
cdef extern from "proto/stores/packetTimeoutTable.c":
35 36
    pass

37 38 39
cdef extern from "proto/stores/inFlightPacketStore.c":
    pass

40
cdef extern from "proto/stores/repairBlockStore.c":
41 42
    pass

43
cdef extern from "proto/stores/packetDeliveryStore.c":
44 45
    pass

Andreas Schmidt's avatar
Andreas Schmidt committed
46 47 48
cdef extern from "proto/types/applicationConstraints.c":
    pass

Andreas Schmidt's avatar
Andreas Schmidt committed
49 50 51
cdef extern from "proto/types/packetTimeout.c":
    pass

Andreas Schmidt's avatar
Andreas Schmidt committed
52 53 54
cdef extern from "proto/types/lossStatistics.c":
    pass

55
cdef extern from "proto/processes/dataReceiver.c":
56
    pass
57

58
cdef extern from "proto/processes/dataTransmitter.c":
59
    pass
60

Andreas Schmidt's avatar
Andreas Schmidt committed
61
cdef extern from "proto/clock.c":
62
    pass
63

Andreas Schmidt's avatar
Andreas Schmidt committed
64 65 66
cdef extern from "proto/bbr.c":
    pass

67
cdef extern from "proto/clock.c":
68
    pass
Andreas Schmidt's avatar
Andreas Schmidt committed
69

Andreas Schmidt's avatar
Andreas Schmidt committed
70 71 72
cdef extern from "proto/timer.c":
    pass

Andreas Schmidt's avatar
Andreas Schmidt committed
73
cdef extern from "proto/vdmcode/block_code.c":
74
    pass
Andreas Schmidt's avatar
Andreas Schmidt committed
75

Andreas Schmidt's avatar
Andreas Schmidt committed
76
cdef extern from "proto/receiver.c":
77
    pass
78

Andreas Schmidt's avatar
Andreas Schmidt committed
79
cdef extern from "proto/types/block.c":
80
    pass
81

Andreas Schmidt's avatar
Andreas Schmidt committed
82
cdef extern from "proto/types/channelStateInformation.c":
83
    pass
84

Andreas Schmidt's avatar
Andreas Schmidt committed
85
cdef extern from "proto/types/codingParams.c":
86
    pass
Andreas Schmidt's avatar
Andreas Schmidt committed
87

88
cdef extern from "proto/types/packet.c":
89
    pass
90

91
cdef extern from "proto/socket.c":
92
    pass
93 94

cdef extern from "util/bptree.c":
95
    pass
96

97 98
cdef extern from "util/bitmap.c":
    pass
99

100 101 102
cdef extern from "util/common.c":
    pass

103 104
cdef extern from "util/list.c":
    pass
105

Andreas Schmidt's avatar
Andreas Schmidt committed
106 107 108
cdef extern from "util/pipe.c":
    pass

rna's avatar
rna committed
109 110 111
cdef extern from "util/time.c":
    pass

Andreas Schmidt's avatar
Andreas Schmidt committed
112 113 114
cdef extern from "util/mpsc_queue.c":
    pass

Andreas Schmidt's avatar
Andreas Schmidt committed
115 116 117
cdef extern from "util/windowedFilter.c":
    pass

118
cdef sockaddr_to_addr_and_port(sockaddr_in addr):
    """Convert a `sockaddr_in` into an `(ip_address, port)` tuple.

    Both fields are passed through `socket.ntohl` / `socket.ntohs` before
    use; the address integer is wrapped with `ipaddress.ip_address`.
    """
    host = ipaddress.ip_address(socket.ntohl(addr.sin_addr.s_addr))
    port = socket.ntohs(addr.sin_port)
    return (host, port)
120

rna's avatar
rna committed
121
class PrrtCodingConfiguration:
    """Coding parameters (n, k, n_cycle) exchanged with a PrrtSocket.

    Attributes:
        n: First coding parameter; must be >= k.
        k: Second coding parameter.
        r: Derived value n - k.
        n_cycle: Sequence whose elements sum to n - k; [0] when omitted.
    """

    def __init__(self, n, k, n_cycle=None):
        """Validate and store the coding parameters.

        Args:
            n: Must be greater than or equal to k.
            k: See n.
            n_cycle: Iterable of numbers summing to n - k. May be omitted
                only when n == k, in which case [0] is used.

        Raises:
            ValueError: if n < k, if n_cycle is missing although n != k,
                or if the elements of n_cycle do not sum to n - k.
        """
        if n < k:
            raise ValueError("n must be greater or equal k.")

        self.n = n
        self.k = k
        self.r = n - k

        # Resolve the default before summing: sum(None) would raise a
        # TypeError for the perfectly valid "no redundancy" case (n == k).
        if n_cycle is None:
            if self.r != 0:
                raise ValueError("n_cycle cannot be None if (n-k) != 0.")
            n_cycle = [0]

        if sum(n_cycle) != self.r:
            raise ValueError("The elements in n_cycle must sum up to n-k.")

        self.n_cycle = n_cycle

    def __repr__(self):
        return "PrrtCodingConfiguration(n={},k={},n_cycle={})".format(self.n, self.k, self.n_cycle)

    def __str__(self):
        return "({},{},{})".format(self.n, self.k, self.n_cycle)

143 144
cdef class PrrtSocket:
    cdef cprrt.PrrtSocket* _c_socket
145
    _epoch = datetime.datetime.utcfromtimestamp(0)
146

rna's avatar
rna committed
147
    def __cinit__(self, address, maximum_payload_size = 1400, target_delay = 1, thread_pinning = False):
        # address is a (host, port) pair; target_delay is scaled by 1000**2
        # before being handed to PrrtSocket_create.
        host, port = address
        delay_us = target_delay * 1000**2
        self._c_socket = cprrt.PrrtSocket_create(maximum_payload_size, delay_us)

        if thread_pinning:
            cprrt.PrrtSocket_enable_thread_pinning(self._c_socket)

        bound = cprrt.PrrtSocket_bind(self._c_socket, host.encode("utf8"), port)
        if not bound:
            # PrrtSocket_bind already calls PrrtSocket_close when it fails.
            # Clear the pointer so __dealloc__ does not close the socket a
            # second time.
            self._c_socket = NULL
            raise ValueError("PrrtSocket_bind failed")
Andreas Schmidt's avatar
Andreas Schmidt committed
159

Andreas Schmidt's avatar
Andreas Schmidt committed
160 161
    # Channel Properties
    property data_rate_btl_fwd:
        "Value reported by PrrtSocket_get_btlbw_fwd for this socket (units not visible here)."

        def __get__(self):
            btlbw_fwd = cprrt.PrrtSocket_get_btlbw_fwd(self._c_socket)
            return btlbw_fwd
Andreas Schmidt's avatar
Andreas Schmidt committed
164

Andreas Schmidt's avatar
Andreas Schmidt committed
165
    property data_rate_btl_back:
        "Value reported by PrrtSocket_get_btlbw_back for this socket (units not visible here)."

        def __get__(self):
            btlbw_back = cprrt.PrrtSocket_get_btlbw_back(self._c_socket)
            return btlbw_back
Andreas Schmidt's avatar
Andreas Schmidt committed
168

Andreas Schmidt's avatar
Andreas Schmidt committed
169
    property rtt_prop_fwd:
        "Result of PrrtSocket_get_rtprop_fwd scaled by 0.000001 (presumably microseconds to seconds - confirm)."

        def __get__(self):
            rtprop_raw = cprrt.PrrtSocket_get_rtprop_fwd(self._c_socket)
            return rtprop_raw * 0.000001

    property loss_rate_fwd:
        "Value reported by PrrtSocket_get_plr_fwd for this socket."

        def __get__(self):
            plr_fwd = cprrt.PrrtSocket_get_plr_fwd(self._c_socket)
            return plr_fwd
176 177


Andreas Schmidt's avatar
Andreas Schmidt committed
178
    # Application Properties
179 180
    property target_delay:
        "The 'targetdelay' socket option scaled by 0.000001, i.e. the inverse of the 1000**2 scaling applied in __cinit__."

        def __get__(self):
            delay_raw = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "targetdelay")
            return delay_raw * 0.000001
182

rna's avatar
rna committed
183 184 185 186
    property maximum_payload_size:
        "The 'maximum_payload_size' socket option as reported by PrrtSocket_get_sock_opt."

        def __get__(self):
            payload_limit = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "maximum_payload_size")
            return payload_limit

Andreas Schmidt's avatar
Andreas Schmidt committed
187 188 189 190 191 192

    # Protocol configuration
    property thread_pinning:
        "Value reported by PrrtSocket_uses_thread_pinning for this socket."

        def __get__(self):
            pinned = cprrt.PrrtSocket_uses_thread_pinning(self._c_socket)
            return pinned

193 194 195 196 197 198 199
    property app_queue_size:
        "Read/write access to the 'app_queue_size' socket option."

        def __get__(self):
            queue_size = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "app_queue_size")
            return queue_size

        def __set__(self, value):
            # Forward the new value unchanged to the C socket option setter.
            cprrt.PrrtSocket_set_sock_opt(
                self._c_socket,
                "app_queue_size",
                value,
            )

200 201
    property coding_configuration:
        "Coding parameters of the socket, exchanged as a PrrtCodingConfiguration."

        def __get__(self):
            cdef cprrt.PrrtCodingConfiguration *params = cprrt.PrrtSocket_get_coding_parameters(self._c_socket)
            # View the first params.c entries of the C array and copy them
            # into a Python list.
            # NOTE(review): ownership of `params` is not visible here -
            # confirm whether the caller is expected to release it.
            return PrrtCodingConfiguration(params.n, params.k, list(<uint8_t[:params.c]> params.n_cycle))

        def __set__(self, params: PrrtCodingConfiguration):
            cdef uint8_t* n_cycle
            c = len(params.n_cycle)
            # The buffer holds uint8_t elements, so size it by the element
            # type rather than by sizeof(int).
            n_cycle = <uint8_t *> malloc(c * cython.sizeof(uint8_t))
            # malloc may legitimately return NULL for a zero-byte request;
            # only a failed non-empty allocation is an error.
            if n_cycle == NULL and c > 0:
                raise MemoryError("Could not allocate the n_cycle buffer.")
            for i, x in enumerate(params.n_cycle):
                n_cycle[i] = x
            # NOTE(review): the buffer is not freed here. Whether
            # PrrtSocket_set_coding_parameters copies it or takes ownership
            # cannot be told from this file - confirm, and free() after the
            # call if the C side copies.
            cprrt.PrrtSocket_set_coding_parameters(self._c_socket, params.k, params.n, c, n_cycle)
212

Andreas Schmidt's avatar
Andreas Schmidt committed
213 214 215 216 217
    # Sending
    def connect(self, address):
        """Pass the UTF-8 encoded host and the port of `address` to PrrtSocket_connect.

        Args:
            address: `(host, port)` pair; `host` is a str.
        """
        cdef bytes host_bytes
        host, port = address
        host_bytes = host.encode("utf-8")
        cprrt.PrrtSocket_connect(self._c_socket, host_bytes, port)
Andreas Schmidt's avatar
Andreas Schmidt committed
218

Andreas Schmidt's avatar
Andreas Schmidt committed
219
    def send(self, data):
        """Hand `data` to PrrtSocket_send_async.

        Raises:
            Exception: if the "connected" socket option reads 0.
            PayloadTooBigException: if len(data) exceeds the
                "maximum_payload_size" socket option.
        """
        if cprrt.PrrtSocket_get_sock_opt(self._c_socket, "connected") == 0:
            raise Exception("PrrtSocket must be connected first, before data can be sent.")

        payload_limit = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "maximum_payload_size")
        size = len(data)
        # Guard clause: reject oversized payloads before touching the C API.
        if size > payload_limit:
            raise PayloadTooBigException("Sending packet of {} bytes on a socket with maximum payload size of {} bytes failed.".format(size, payload_limit))

        cprrt.PrrtSocket_send_async(self._c_socket, data, size)
228

229 230 231 232 233 234 235 236 237
    def send_sync(self, data):
        """Hand `data` to PrrtSocket_send_sync.

        Raises:
            Exception: if the "connected" socket option reads 0.
            PayloadTooBigException: if len(data) exceeds the
                "maximum_payload_size" socket option.
        """
        if cprrt.PrrtSocket_get_sock_opt(self._c_socket, "connected") == 0:
            raise Exception("PrrtSocket must be connected first, before data can be sent.")

        payload_limit = cprrt.PrrtSocket_get_sock_opt(self._c_socket, "maximum_payload_size")
        size = len(data)
        # Guard clause: reject oversized payloads before touching the C API.
        if size > payload_limit:
            raise PayloadTooBigException("Sending packet of {} bytes on a socket with maximum payload size of {} bytes failed.".format(size, payload_limit))

        cprrt.PrrtSocket_send_sync(self._c_socket, data, size)
Andreas Schmidt's avatar
Andreas Schmidt committed
238

Andreas Schmidt's avatar
Andreas Schmidt committed
239
    # Receiving
Andreas Schmidt's avatar
Andreas Schmidt committed
240
    def recv(self):
        """Call PrrtSocket_recv with the GIL released.

        Returns:
            `(data, (ip_address, port))` where `data` is the first
            `n_bytes` bytes of the 65536-byte receive buffer.
        """
        # NOTE(review): a negative return value is not handled here -
        # confirm whether PrrtSocket_recv can report one.
        cdef char payload[65536]
        cdef int32_t n_bytes
        cdef sockaddr_in sender
        with nogil:
            n_bytes = cprrt.PrrtSocket_recv(self._c_socket, <void*> payload, <sockaddr*> &sender)
        data = payload[:n_bytes]
        origin = sockaddr_to_addr_and_port(sender)
        return data, origin
247

248 249 250
    def receive_asap(self):
        """Call PrrtSocket_receive_asap with the GIL released.

        Returns:
            `(data, (ip_address, port))` where `data` is the first
            `n_bytes` bytes of the 65536-byte receive buffer.
        """
        # NOTE(review): a negative return value is not handled here -
        # confirm whether PrrtSocket_receive_asap can report one.
        cdef char payload[65536]
        cdef int32_t n_bytes
        cdef sockaddr_in sender
        with nogil:
            n_bytes = cprrt.PrrtSocket_receive_asap(self._c_socket, <void*> payload, <sockaddr*> &sender)
        data = payload[:n_bytes]
        origin = sockaddr_to_addr_and_port(sender)
        return data, origin
255 256 257 258

    def receive_asap_wait(self):
        """Call PrrtSocket_receive_asap_wait with the GIL released.

        Returns:
            `(data, (ip_address, port))` where `data` is the first
            `n_bytes` bytes of the 65536-byte receive buffer.
        """
        cdef char payload[65536]
        cdef int32_t n_bytes
        cdef sockaddr_in sender
        with nogil:
            n_bytes = cprrt.PrrtSocket_receive_asap_wait(self._c_socket, <void*> payload, <sockaddr*> &sender)
        data = payload[:n_bytes]
        origin = sockaddr_to_addr_and_port(sender)
        return data, origin
263 264 265 266

    def receive_asap_timedwait(self, deadline):
        """Call PrrtSocket_receive_asap_timedwait with the GIL released.

        Args:
            deadline: Converted to a `timespec` via `_convert_deadline`.

        Returns:
            `(data, (ip_address, port))` for a positive length, or
            `(None, None)` when the C call reports a length of 0.

        Raises:
            TimeoutException: if the C call reports a negative length.
        """
        cdef char payload[65536]
        cdef int32_t n_bytes
        cdef sockaddr_in sender
        cdef timespec until = self._convert_deadline(deadline)
        with nogil:
            n_bytes = cprrt.PrrtSocket_receive_asap_timedwait(self._c_socket, <void*> payload, <sockaddr*> &sender, &until)
        if n_bytes > 0:
            return payload[:n_bytes], sockaddr_to_addr_and_port(sender)
        if n_bytes == 0:
            return (None, None)
        raise TimeoutException()
276 277 278 279

    def receive_ordered(self, time_window):
        """Call PrrtSocket_receive_ordered with the GIL released.

        Args:
            time_window: Multiplied by 1000**2 and passed on as a uint32_t.

        Returns:
            `(data, (ip_address, port))` where `data` is the first
            `n_bytes` bytes of the 65536-byte receive buffer.
        """
        # NOTE(review): a negative return value is not handled here -
        # confirm whether PrrtSocket_receive_ordered can report one.
        cdef char payload[65536]
        cdef int32_t n_bytes
        cdef sockaddr_in sender
        cdef uint32_t window_us = time_window * (1000**2)
        with nogil:
            n_bytes = cprrt.PrrtSocket_receive_ordered(self._c_socket, <void*> payload, <sockaddr*> &sender, window_us)
        data = payload[:n_bytes]
        origin = sockaddr_to_addr_and_port(sender)
        return data, origin
285

286
    def receive_ordered_wait(self, time_window):
        """Call PrrtSocket_receive_ordered_wait with the GIL released.

        Args:
            time_window: Multiplied by 1000**2 and passed on as a uint32_t.

        Returns:
            `(data, (ip_address, port))` where `data` is the first
            `n_bytes` bytes of the 65536-byte receive buffer.
        """
        cdef char payload[65536]
        cdef int32_t n_bytes
        cdef sockaddr_in sender
        cdef uint32_t window_us = time_window * (1000**2)
        with nogil:
            n_bytes = cprrt.PrrtSocket_receive_ordered_wait(self._c_socket, <void*> payload, <sockaddr*> &sender, window_us)
        data = payload[:n_bytes]
        origin = sockaddr_to_addr_and_port(sender)
        return data, origin
294

295 296 297
    def receive_ordered_timedwait(self, time_window, deadline):
        """Call PrrtSocket_receive_ordered_timedwait with the GIL released.

        Args:
            time_window: Multiplied by 1000**2 and passed on as a uint32_t.
            deadline: Converted to a `timespec` via `_convert_deadline`.

        Returns:
            `(data, (ip_address, port))` for a positive length, or
            `(None, None)` when the C call reports a length of 0.

        Raises:
            TimeoutException: if the C call reports a negative length.
        """
        cdef char payload[65536]
        cdef int32_t n_bytes
        cdef sockaddr_in sender
        cdef uint32_t window_us = time_window * (1000**2)
        cdef timespec until = self._convert_deadline(deadline)
        with nogil:
            n_bytes = cprrt.PrrtSocket_receive_ordered_timedwait(self._c_socket, <void*> payload, <sockaddr*>  &sender, window_us, &until)
        if n_bytes > 0:
            return payload[:n_bytes], sockaddr_to_addr_and_port(sender)
        if n_bytes == 0:
            return (None, None)
        raise TimeoutException()
308

Andreas Schmidt's avatar
Andreas Schmidt committed
309

Andreas Schmidt's avatar
Andreas Schmidt committed
310
    # Internals
311 312 313 314 315 316 317 318 319 320 321 322
    property bbr_state:
        "Value reported by PrrtSocket_get_bbr_state for this socket."

        def __get__(self):
            state = cprrt.PrrtSocket_get_bbr_state(self._c_socket)
            return state

    property bbr_filled_pipe:
        "Value reported by PrrtSocket_get_filled_pipe for this socket."

        def __get__(self):
            filled_pipe = cprrt.PrrtSocket_get_filled_pipe(self._c_socket)
            return filled_pipe

    property bbr_full_bw:
        "Value reported by PrrtSocket_get_full_bw for this socket."

        def __get__(self):
            full_bw = cprrt.PrrtSocket_get_full_bw(self._c_socket)
            return full_bw

Andreas Schmidt's avatar
Andreas Schmidt committed
323 324 325 326 327 328 329 330
    property bbr_cycle_index:
        "Value reported by PrrtSocket_get_cycle_index for this socket."

        def __get__(self):
            cycle_index = cprrt.PrrtSocket_get_cycle_index(self._c_socket)
            return cycle_index

    property bbr_pacing_gain:
        "Value reported by PrrtSocket_get_pacing_gain for this socket."

        def __get__(self):
            pacing_gain = cprrt.PrrtSocket_get_pacing_gain(self._c_socket)
            return pacing_gain

Andreas Schmidt's avatar
Andreas Schmidt committed
331 332 333 334 335 336 337 338
    property bbr_cwnd:
        "Value reported by PrrtSocket_get_cwnd for this socket."

        def __get__(self):
            cwnd = cprrt.PrrtSocket_get_cwnd(self._c_socket)
            return cwnd

    property bbr_inflight:
        "Value reported by PrrtSocket_get_inflight for this socket."

        def __get__(self):
            inflight = cprrt.PrrtSocket_get_inflight(self._c_socket)
            return inflight

339 340 341 342
    property bbr_pacing_rate:
        "Value reported by PrrtSocket_get_pacing_rate for this socket."

        def __get__(self):
            pacing_rate = cprrt.PrrtSocket_get_pacing_rate(self._c_socket)
            return pacing_rate

Andreas Schmidt's avatar
Andreas Schmidt committed
343 344 345 346
    property bbr_send_quantum:
        "Value reported by PrrtSocket_get_send_quantum for this socket."

        def __get__(self):
            send_quantum = cprrt.PrrtSocket_get_send_quantum(self._c_socket)
            return send_quantum

Andreas Schmidt's avatar
Andreas Schmidt committed
347 348 349 350
    property bbr_pipe:
        "Value reported by PrrtSocket_get_pipe for this socket."

        def __get__(self):
            pipe = cprrt.PrrtSocket_get_pipe(self._c_socket)
            return pipe

Andreas Schmidt's avatar
Andreas Schmidt committed
351 352 353 354
    property bbr_delivered:
        "Value reported by PrrtSocket_get_delivered for this socket."

        def __get__(self):
            delivered = cprrt.PrrtSocket_get_delivered(self._c_socket)
            return delivered

355
    property bbr_is_app_limited:
        "Value reported by PrrtSocket_get_bbr_is_app_limited for this socket."

        def __get__(self):
            is_app_limited = cprrt.PrrtSocket_get_bbr_is_app_limited(self._c_socket)
            return is_app_limited

    property bbr_app_limited:
        "Value reported by PrrtSocket_get_bbr_app_limited for this socket."

        def __get__(self):
            app_limited = cprrt.PrrtSocket_get_bbr_app_limited(self._c_socket)
            return app_limited
362

Andreas Schmidt's avatar
Fixes.  
Andreas Schmidt committed
363 364 365 366
    property bbr_round_start:
        "Value reported by PrrtSocket_get_bbr_round_start for this socket."

        def __get__(self):
            round_start = cprrt.PrrtSocket_get_bbr_round_start(self._c_socket)
            return round_start

367 368 369
    def __dealloc__(self):
        # Nothing to release when the pointer was cleared (e.g. after a
        # failed bind in __cinit__).
        if self._c_socket == NULL:
            return
        cprrt.PrrtSocket_close(self._c_socket)
370 371 372 373 374 375 376

    def _convert_deadline(self, deadline):
        """Convert `deadline` into a `timespec` measured from `self._epoch`.

        The whole seconds of `deadline - self._epoch` become the first
        field, the fractional part scaled by 1000**3 becomes the second.
        """
        cdef timespec deadline_timespec
        elapsed = (deadline - self._epoch).total_seconds()
        whole_seconds = int(elapsed)
        nanos = int((elapsed % 1) * (1000**3))
        deadline_timespec = timespec(whole_seconds, nanos)
        return deadline_timespec