time-protocol.h 10.5 KB
Newer Older
1
#include <assert.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
2
#include "util/time.h"
3 4 5 6 7 8 9 10 11 12 13 14

#ifdef XLAP
#  error "does not support XLAP right now"
#endif

#ifdef TCP
#
#  define PROTOCOL "tcp"
#  include <sys/types.h>
#  include <sys/socket.h>
#  include <netdb.h>
#  include <unistd.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
15
#  include <netinet/in.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
16
#  include <netinet/tcp.h>
17 18 19

typedef int socket_t;

Andreas Schmidt's avatar
Andreas Schmidt committed
20
static inline void _time() {
Andreas Schmidt's avatar
Andreas Schmidt committed
21 22 23
    long long       ns; // Milliseconds
    time_t          s;  // Seconds
    struct timespec spec;
Andreas Schmidt's avatar
Andreas Schmidt committed
24

Andreas Schmidt's avatar
Andreas Schmidt committed
25 26 27 28
    clock_gettime(CLOCK_REALTIME, &spec);

    s  = spec.tv_sec;
    ns = spec.tv_nsec;
Andreas Schmidt's avatar
Fix.  
Andreas Schmidt committed
29
    printf("T: %09ld.%09ld\n", s, ns);
Andreas Schmidt's avatar
Andreas Schmidt committed
30 31
}

32 33
// TCP variant: resolve args->target / args->port and connect a stream socket
// to the first resolved address that accepts the connection.
//
// When args->optimized is set, TCP_NODELAY is enabled and SO_SNDBUF is set to
// args->sndbuf; the size read back from the socket is printed as
// "SO_SNDBUF: <n>".
//
// Returns the connected descriptor. A resolution failure, or failing to
// connect to every candidate, trips an assert.
static inline socket_t _open_sender(struct arguments *args)
{
    struct addrinfo hints = {
      .ai_socktype = SOCK_STREAM,
      .ai_family = AF_UNSPEC,
      .ai_flags = AI_ADDRCONFIG,
    };

    char port[16];
    snprintf(port, sizeof(port), "%d", (int) args->port);

    struct addrinfo *head = NULL;
    int fail = getaddrinfo(args->target, port, &hints, &head);
    assert(!fail);
    (void) fail;

    int sock = -1;
    struct addrinfo *iter;

    for (iter = head; iter != NULL; iter = iter->ai_next) {
        sock = socket(iter->ai_family, iter->ai_socktype, iter->ai_protocol);
        if (sock < 0) {
            // No descriptor to configure, connect or close for this candidate.
            printf("SOCKET ERROR: %d, errno: %d\n", sock, errno);
            continue;
        }

        if (args->optimized) {
            // Disable Nagle's algorithm.
            int flag = 1;
            setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));

            // Start from the requested size on every candidate: getsockopt()
            // below overwrites the variable with the value read back.
            int send_buffer_size = args->sndbuf;
            socklen_t send_buffer_size_len = sizeof(send_buffer_size);
            setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &send_buffer_size, send_buffer_size_len);
            getsockopt(sock, SOL_SOCKET, SO_SNDBUF, &send_buffer_size, &send_buffer_size_len);
            printf("SO_SNDBUF: %d\n", send_buffer_size);
        }

        int connect_res = connect(sock, iter->ai_addr, iter->ai_addrlen);
        if (connect_res == 0) {
            break;
        }
        printf("CONNECT ERROR: %d, errno: %d\n", connect_res, errno);

        close(sock);
        // Never hand a closed descriptor back to the caller.
        sock = -1;
    }
    assert(iter && "failed to connect via TCP");
    if (head != NULL) {
        freeaddrinfo(head);
    }
    return sock;
}

static inline socket_t _open_receiver(struct arguments *args)
{
Andreas Schmidt's avatar
Andreas Schmidt committed
77
    int ssock = socket(AF_INET, SOCK_STREAM, 0);
78
    assert(ssock >= 0);
79

rna's avatar
rna committed
80 81 82
    int yes = 1;
    setsockopt(ssock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));

83 84
    if(args->optimized) {
      setsockopt(ssock, SOL_TCP, TCP_QUICKACK, &yes, sizeof(yes));
Andreas Schmidt's avatar
Andreas Schmidt committed
85

86 87 88 89 90 91
      int recv_buffer_size = args->rcvbuf;
      uint32_t recv_buffer_size_len = sizeof(recv_buffer_size);
      setsockopt(ssock, SOL_SOCKET, SO_RCVBUF, &recv_buffer_size, recv_buffer_size_len);
      getsockopt(ssock, SOL_SOCKET, SO_RCVBUF, &recv_buffer_size, &recv_buffer_size_len);
      printf("SO_RCVBUF: %d\n", recv_buffer_size);
    }
92

Andreas Schmidt's avatar
Andreas Schmidt committed
93 94 95
    struct sockaddr_in address = {
        .sin_family = AF_INET,
        .sin_port = htons(args->port),
96
    };
Andreas Schmidt's avatar
Andreas Schmidt committed
97 98
    memset(address.sin_zero, '\0', sizeof(address.sin_zero));
    address.sin_addr.s_addr = htonl(INADDR_ANY);
99
    assert(0 == bind(ssock, (const struct sockaddr *) &address, sizeof(address)));
100

101
    assert(0 == listen(ssock, SOMAXCONN));
102

103 104
    int sock = accept(ssock, NULL, NULL);
    assert(sock >= 0);
105

Andreas Schmidt's avatar
Andreas Schmidt committed
106 107
    int res = close(ssock);
    assert(res >= 0);
108

109
    return sock;
110 111
}

Andreas Schmidt's avatar
Andreas Schmidt committed
112
// TCP variant: write up to `size` bytes from `buf` to `conn`, waiting with
// select() (1 second timeout, retried indefinitely) until the descriptor is
// writable before each write().
//
// Returns the number of bytes written. This is less than `size` if write()
// returned 0 or failed part-way (the failure is logged and the partial count
// is returned). Returns -1 if select() itself fails.
static inline ssize_t _send(socket_t conn, const char *buf, size_t size)
{
    // Unsigned counter: it is compared against and subtracted from `size`.
    size_t written = 0;

    while (written < size) {
        fd_set wfds;
        struct timeval tv;
        FD_ZERO(&wfds);
        FD_SET(conn, &wfds);

        // select() may modify the timeout, so it is rebuilt on every pass.
        tv.tv_sec = 1;
        tv.tv_usec = 0;
        int retval = select(conn + 1, NULL, &wfds, NULL, &tv);
        if (retval == -1) {
            printf("Select failed: %d.\n", errno);
            return -1;
        }
        if (retval == 0) {
            // Timed out without becoming writable; wait again.
            continue;
        }
        if (!FD_ISSET(conn, &wfds)) {
            printf("FD not set: %d\n", conn);
            continue;
        }

        ssize_t n = write(conn, buf, size - written);
        if (n < 0) {
            // Capture errno before _time(), which may overwrite it.
            int err = errno;
            _time();
            printf("SEND FAILED: %zd, errno: %d, written: %zu / %zu\n", n, err, written, size);

            // Dump the payload from its start, as text up to the first NUL,
            // but never read past the `size` bytes the caller supplied.
            const char *start = buf - written;
            size_t dump_len = 0;
            while (dump_len < size && start[dump_len] != '\0') {
                dump_len++;
            }
            fwrite(start, 1, dump_len, stdout);
            fputc('\n', stdout);
            return (ssize_t) written;
        }
        if (n == 0) {
            break;
        }
        written += (size_t) n;
        buf += n;
    }
    return (ssize_t) written;
}

// TCP variant: read up to `size` bytes from `conn` into `buf`, waiting with
// select() (1 second timeout) until the descriptor is readable before each
// recv(). The loop runs only while `keepRunning` is set.
//
// Returns the number of bytes received, which is less than `size` if recv()
// returned 0. Returns -1 if `keepRunning` is found cleared after a select()
// call, or if select() or recv() fails.
static inline ssize_t _recv(socket_t conn, char *buf, size_t size)
{
    // Unsigned counter: it is compared against and subtracted from `size`.
    size_t received = 0;

    while (received < size && keepRunning) {
        fd_set rfds;
        struct timeval tv;
        FD_ZERO(&rfds);
        FD_SET(conn, &rfds);

        // select() may modify the timeout, so it is rebuilt on every pass.
        tv.tv_sec = 1;
        tv.tv_usec = 0;
        int retval = select(conn + 1, &rfds, NULL, NULL, &tv);
        if (!keepRunning) {
            printf("No longer running.\n");
            return -1;
        }

        if (retval == -1) {
            printf("Select failed: %d.\n", errno);
            return -1;
        }
        if (retval == 0) {
            // Timed out with nothing to read; wait again.
            continue;
        }
        if (!FD_ISSET(conn, &rfds)) {
            printf("FD not set: %d\n", conn);
            continue;
        }

        ssize_t n = recv(conn, buf, size - received, 0);
        if (n < 0) {
            // Capture errno before _time(), which may overwrite it.
            int err = errno;
            _time();
            printf("RECV FAILED: %zd, errno: %d, received: %zu / %zu\n", n, err, received, size);
            return -1;
        }
        if (n == 0) {
            break;
        }
        buf += n;
        received += (size_t) n;
    }
    return (ssize_t) received;
}

// TCP variant: shut down the write side of `conn`, read and discard incoming
// data until read() returns 0, then close the descriptor.
//
// A read() error is reported with perror("reading") and terminates the
// process with exit status 1. A failing close() trips an assert.
static inline void _close(socket_t conn)
{
    char buffer[10];

    shutdown(conn, SHUT_WR);
    for (;;) {
        // Drain using the whole scratch buffer rather than one byte per call;
        // read() returns ssize_t, so keep the result in that type.
        ssize_t got = read(conn, buffer, sizeof(buffer));
        if (got < 0) {
            perror("reading");
            exit(1);
        }
        if (got == 0) {
            break;
        }
    }

    int res = close(conn);
    assert(res >= 0);
    (void) res;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
217 218 219 220 221 222 223 224
// TCP variant: ignores `conn` and always returns 0.
static inline long long int _send_peer_btl_pace(socket_t conn) {
    (void) conn;
    return 0;
}

// TCP variant: ignores `conn` and always returns 0.
static inline long long int _recv_peer_btl_pace(socket_t conn) {
    (void) conn;
    return 0;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
225 226 227 228
// TCP variant: ignores `conn` and always returns 0.
static inline long long int _nw_pace(socket_t conn) {
    (void) conn;
    return 0;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
229
// TCP variant: ignores `conn` and always returns 0.
static inline long long int _app_send_pace_internal(socket_t conn) {
    (void) conn;
    return 0;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
233
// TCP variant: ignores `conn` and always returns 0.
static inline long long int _app_send_pace_dependent(socket_t conn) {
    (void) conn;
    return 0;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
237
// TCP variant: ignores `conn` and always returns 0.
static inline long long int _app_send_pace_external(socket_t conn) {
    (void) conn;
    return 0;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
241
// TCP variant: ignores `conn` and always returns 0.
static inline long long int _trans_transmit_pace_internal(socket_t conn) {
    (void) conn;
    return 0;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
245
// TCP variant: ignores `conn` and always returns 0.
static inline long long int _trans_transmit_pace_dependent(socket_t conn) {
    (void) conn;
    return 0;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
249
// TCP variant: ignores `conn` and always returns 0.
static inline long long int _trans_transmit_pace_external(socket_t conn) {
    (void) conn;
    return 0;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
253
// TCP variant: ignores `conn` and always returns 0.
static inline long long int _trans_receive_pace_internal(socket_t conn) {
    (void) conn;
    return 0;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
257
// TCP variant: ignores `conn` and always returns 0.
static inline long long int _trans_receive_pace_dependent(socket_t conn) {
    (void) conn;
    return 0;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
// TCP variant: ignores `conn` and always returns 0.
static inline long long int _trans_receive_pace_external(socket_t conn) {
    (void) conn;
    return 0;
}

// TCP variant: ignores `conn` and always returns 0.
static inline long long int _app_deliver_pace_internal(socket_t conn) {
    (void) conn;
    return 0;
}

// TCP variant: ignores `conn` and always returns 0.
static inline long long int _app_deliver_pace_dependent(socket_t conn) {
    (void) conn;
    return 0;
}

// TCP variant: ignores `conn` and always returns 0.
static inline long long int _app_deliver_pace_external(socket_t conn) {
    (void) conn;
    return 0;
}

277 278 279 280 281
#else
#  define PROTOCOL "prrt"

typedef PrrtSocket *socket_t;

282
static inline socket_t _open_sender(struct arguments *args) {
Andreas Schmidt's avatar
Andreas Schmidt committed
283
    PrrtSocket *s = PrrtSocket_create(args->size, 100 * 1000);
284
    s->withTimestamp = true;
285 286
    assert(s != NULL && "Socket create failed.");

287
    if (args->thread_pinning) {
288 289
        PrrtSocket_enable_thread_pinning(s);
    }
Andreas Schmidt's avatar
Andreas Schmidt committed
290
    PrrtSocket_set_coding_parameters(s, 1, 1, 0, NULL);
291 292 293 294

    assert(PrrtSocket_bind(s, "0.0.0.0", args->local_port) && "bind failed");

    PrrtSocket_connect(s, args->target, args->port);
295
    return s;
296 297
}

298
static inline socket_t _open_receiver(struct arguments *args) {
Andreas Schmidt's avatar
Andreas Schmidt committed
299
    socket_t s = PrrtSocket_create(args->size, HALF_TIMESTAMP - 1);
300 301
    assert(s != NULL && "Could not create socket.");

302
    if (args->thread_pinning) {
303 304 305 306
        PrrtSocket_enable_thread_pinning(s);
    }

    assert(PrrtSocket_bind(s, "0.0.0.0", args->port) && "bind failed");
307
    return s;
308 309
}

Andreas Schmidt's avatar
Andreas Schmidt committed
310
// PRRT variant: hand `size` bytes starting at `buf` to PrrtSocket_send_sync()
// on `conn`. The result of that call is not inspected; `size` is returned
// unconditionally.
static inline ssize_t _send(socket_t conn, const char *buf, size_t size) {
    unsigned char *payload = (unsigned char *) buf;

    PrrtSocket_send_sync(conn, payload, size);

    return size;
}

315
// PRRT variant: receive into `buf` via PrrtSocket_receive_asap_wait() on
// `conn` and return that call's result. The address argument is a local
// scratch structure that is discarded afterwards.
// NOTE(review): `size` is not forwarded to the receive call - confirm `buf`
// is always large enough for whatever the call writes.
static inline ssize_t _recv(socket_t conn, char *buf, size_t size) {
    struct sockaddr_in peer;
    struct sockaddr *peer_addr = (struct sockaddr *) &peer;

    return PrrtSocket_receive_asap_wait(conn, buf, peer_addr);
}

320
// PRRT variant: close `conn` with PrrtSocket_close() and then release the
// socket object itself with free().
static inline void _close(socket_t conn) {
    PrrtSocket *sock = conn;

    PrrtSocket_close(sock);
    free(sock);
}

Andreas Schmidt's avatar
Andreas Schmidt committed
325 326 327 328 329 330 331 332
// PRRT variant: returns the "send_peer_btl_pace" socket option of `conn`.
static inline long long int _send_peer_btl_pace(socket_t conn) {
    long long int value = PrrtSocket_get_sock_opt(conn, "send_peer_btl_pace");
    return value;
}

// PRRT variant: returns the "recv_peer_btl_pace" socket option of `conn`.
static inline long long int _recv_peer_btl_pace(socket_t conn) {
    long long int value = PrrtSocket_get_sock_opt(conn, "recv_peer_btl_pace");
    return value;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
333 334 335 336
// PRRT variant: returns the "nw_pace" socket option of `conn`.
static inline long long int _nw_pace(socket_t conn) {
    long long int value = PrrtSocket_get_sock_opt(conn, "nw_pace");
    return value;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
337 338 339 340 341 342
// PRRT variant: returns the "appSend_pace_internal" socket option of `conn`.
static inline long long int _app_send_pace_internal(socket_t conn) {
    long long int value = PrrtSocket_get_sock_opt(conn, "appSend_pace_internal");
    return value;
}

// PRRT variant: returns the "appSend_pace_dependent" socket option of `conn`.
static inline long long int _app_send_pace_dependent(socket_t conn) {
    long long int value = PrrtSocket_get_sock_opt(conn, "appSend_pace_dependent");
    return value;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
345 346
// PRRT variant: returns the "appSend_pace_external" socket option of `conn`.
static inline long long int _app_send_pace_external(socket_t conn) {
    long long int value = PrrtSocket_get_sock_opt(conn, "appSend_pace_external");
    return value;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
349 350
// PRRT variant: returns the "prrtTransmit_pace_internal" socket option of `conn`.
static inline long long int _trans_transmit_pace_internal(socket_t conn) {
    long long int value = PrrtSocket_get_sock_opt(conn, "prrtTransmit_pace_internal");
    return value;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
353 354
// PRRT variant: returns the "prrtTransmit_pace_dependent" socket option of `conn`.
static inline long long int _trans_transmit_pace_dependent(socket_t conn) {
    long long int value = PrrtSocket_get_sock_opt(conn, "prrtTransmit_pace_dependent");
    return value;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
357 358
// PRRT variant: returns the "prrtTransmit_pace_external" socket option of `conn`.
static inline long long int _trans_transmit_pace_external(socket_t conn) {
    long long int value = PrrtSocket_get_sock_opt(conn, "prrtTransmit_pace_external");
    return value;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
361 362
// PRRT variant: returns the "prrtReceive_pace_internal" socket option of `conn`.
static inline long long int _trans_receive_pace_internal(socket_t conn) {
    long long int value = PrrtSocket_get_sock_opt(conn, "prrtReceive_pace_internal");
    return value;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
365 366
// PRRT variant: returns the "prrtReceive_pace_dependent" socket option of `conn`.
static inline long long int _trans_receive_pace_dependent(socket_t conn) {
    long long int value = PrrtSocket_get_sock_opt(conn, "prrtReceive_pace_dependent");
    return value;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
369 370
// PRRT variant: returns the "prrtReceive_pace_external" socket option of `conn`.
static inline long long int _trans_receive_pace_external(socket_t conn) {
    long long int value = PrrtSocket_get_sock_opt(conn, "prrtReceive_pace_external");
    return value;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
373 374 375 376 377 378 379 380 381 382 383
// PRRT variant: returns the "appDeliver_pace_internal" socket option of `conn`.
static inline long long int _app_deliver_pace_internal(socket_t conn) {
    long long int value = PrrtSocket_get_sock_opt(conn, "appDeliver_pace_internal");
    return value;
}

// PRRT variant: returns the "appDeliver_pace_dependent" socket option of `conn`.
static inline long long int _app_deliver_pace_dependent(socket_t conn) {
    long long int value = PrrtSocket_get_sock_opt(conn, "appDeliver_pace_dependent");
    return value;
}

// PRRT variant: returns the "appDeliver_pace_external" socket option of `conn`.
static inline long long int _app_deliver_pace_external(socket_t conn) {
    long long int value = PrrtSocket_get_sock_opt(conn, "appDeliver_pace_external");
    return value;
}
Andreas Schmidt's avatar
Andreas Schmidt committed
384

385 386
#endif

387 388 389 390 391
// Open a connection for the selected role: _open_sender(args) when `sender`
// is true, _open_receiver(args) otherwise. Returns whatever the chosen
// opener returns.
static inline socket_t _open(bool sender, struct arguments *args) {
    return sender ? _open_sender(args) : _open_receiver(args);
}