time-protocol.h 9.8 KB
Newer Older
1
#include <assert.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
2
#include "util/time.h"
3
4
5
6
7
8
9
10
11
12
13
14

#ifdef XLAP
#  error "does not support XLAP right now"
#endif

#ifdef TCP
#
#  define PROTOCOL "tcp"
#  include <sys/types.h>
#  include <sys/socket.h>
#  include <netdb.h>
#  include <unistd.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
15
#  include <netinet/in.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
16
#  include <netinet/tcp.h>
17
18
19

typedef int socket_t;

Andreas Schmidt's avatar
Andreas Schmidt committed
20
static inline void _time() {
Andreas Schmidt's avatar
Andreas Schmidt committed
21
22
23
    long long       ns; // Milliseconds
    time_t          s;  // Seconds
    struct timespec spec;
Andreas Schmidt's avatar
Andreas Schmidt committed
24

Andreas Schmidt's avatar
Andreas Schmidt committed
25
26
27
28
    clock_gettime(CLOCK_REALTIME, &spec);

    s  = spec.tv_sec;
    ns = spec.tv_nsec;
Andreas Schmidt's avatar
Fix.    
Andreas Schmidt committed
29
    printf("T: %09ld.%09ld\n", s, ns);
Andreas Schmidt's avatar
Andreas Schmidt committed
30
31
}

32
33
/*
 * Open a TCP connection to args->target on args->port.
 *
 * Every address returned by getaddrinfo() is tried in order until connect()
 * succeeds. When args->optimized is set, Nagle's algorithm is disabled and
 * the send buffer is set to args->sndbuf before connecting; the size that
 * getsockopt() reports back is printed.
 *
 * Returns the connected socket. Asserts when name resolution fails or no
 * address could be connected; if assertions are compiled out, -1 is
 * returned in those cases instead of a stale descriptor.
 */
static inline socket_t _open_sender(struct arguments *args)
{
    struct addrinfo hints = {
      .ai_socktype = SOCK_STREAM,
      .ai_family = AF_UNSPEC,
      .ai_flags = AI_ADDRCONFIG,
    };

    char port[16];
    snprintf(port, sizeof(port), "%d", (int) args->port);

    struct addrinfo *head = NULL;
    int fail = getaddrinfo(args->target, port, &hints, &head);
    assert(!fail);
    if (fail) {
        return -1;
    }

    int sock = -1;
    struct addrinfo *iter;

    for (iter = head; iter != NULL; iter = iter->ai_next) {
        sock = socket(iter->ai_family, iter->ai_socktype, iter->ai_protocol);
        if (sock < 0) {
            // Nothing to configure or close for this candidate.
            printf("SOCKET ERROR: %d, errno: %d\n", sock, errno);
            continue;
        }

        if (args->optimized) {
            // Disable Nagle's algorithm.
            int flag = 1;
            setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));

            // Restart from the requested size for every candidate:
            // getsockopt() overwrites the variable with the effective size.
            int send_buffer_size = args->sndbuf;
            socklen_t send_buffer_size_len = sizeof(send_buffer_size);
            setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &send_buffer_size, send_buffer_size_len);
            getsockopt(sock, SOL_SOCKET, SO_SNDBUF, &send_buffer_size, &send_buffer_size_len);
            printf("SO_SNDBUF: %d\n", send_buffer_size);
        }

        int connect_res = connect(sock, iter->ai_addr, iter->ai_addrlen);
        if (connect_res == 0) {
            break;
        }
        printf("CONNECT ERROR: %d, errno: %d\n", connect_res, errno);

        close(sock);
        sock = -1; // never hand a closed descriptor back to the caller
    }
    assert(iter && "failed to connect via TCP");
    freeaddrinfo(head);
    return sock;
}

static inline socket_t _open_receiver(struct arguments *args)
{
Andreas Schmidt's avatar
Andreas Schmidt committed
77
    int ssock = socket(AF_INET, SOCK_STREAM, 0);
78
    assert(ssock >= 0);
79

80
81
82
83
    if(args->optimized) {
      int yes = 1;
      setsockopt(ssock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
      setsockopt(ssock, SOL_TCP, TCP_QUICKACK, &yes, sizeof(yes));
Andreas Schmidt's avatar
Andreas Schmidt committed
84

85
86
87
88
89
90
      int recv_buffer_size = args->rcvbuf;
      uint32_t recv_buffer_size_len = sizeof(recv_buffer_size);
      setsockopt(ssock, SOL_SOCKET, SO_RCVBUF, &recv_buffer_size, recv_buffer_size_len);
      getsockopt(ssock, SOL_SOCKET, SO_RCVBUF, &recv_buffer_size, &recv_buffer_size_len);
      printf("SO_RCVBUF: %d\n", recv_buffer_size);
    }
91

Andreas Schmidt's avatar
Andreas Schmidt committed
92
93
94
    struct sockaddr_in address = {
        .sin_family = AF_INET,
        .sin_port = htons(args->port),
95
    };
Andreas Schmidt's avatar
Andreas Schmidt committed
96
97
    memset(address.sin_zero, '\0', sizeof(address.sin_zero));
    address.sin_addr.s_addr = htonl(INADDR_ANY);
98
    assert(0 == bind(ssock, (const struct sockaddr *) &address, sizeof(address)));
99

100
    assert(0 == listen(ssock, SOMAXCONN));
101

102
103
    int sock = accept(ssock, NULL, NULL);
    assert(sock >= 0);
104

Andreas Schmidt's avatar
Andreas Schmidt committed
105
106
    int res = close(ssock);
    assert(res >= 0);
107

108
    return sock;
109
110
}

Andreas Schmidt's avatar
Andreas Schmidt committed
111
/*
 * Send exactly `size` bytes from `buf` over `conn`, calling send() again
 * after short writes.
 *
 * Returns the number of bytes handed to the kernel: `size` on success, or
 * the partial count reached when send() fails. On failure a timestamp, the
 * send() result with its errno, and the payload are printed.
 */
static inline ssize_t _send(socket_t conn, const char *buf, size_t size)
{
    const char *start = buf; // kept so the whole payload can be printed on error
    size_t written = 0;

    while (written < size) {
        ssize_t n = send(conn, buf, size - written, 0);
        if (n < 0) {
            // Capture errno first: _time() calls into libc and may change it.
            int err = errno;
            _time();
            printf("WRITE FAILED: %zd, errno: %d\n", n, err);
            // Bound the dump to the payload length; the buffer is not
            // guaranteed to be NUL-terminated. The precision is an int, so
            // clamp very large sizes.
            int shown = size > 0x7fffffff ? 0x7fffffff : (int) size;
            printf("%.*s\n", shown, start);
            return (ssize_t) written;
        }
        written += (size_t) n;
        buf += n;
    }
    return (ssize_t) written;
}

/*
 * Receive up to `size` bytes from `conn` into `buf`.
 *
 * The loop waits for readability with select() using a one-second timeout so
 * that the keepRunning flag is re-examined at least once per second.
 *
 * Returns the number of bytes received. This is less than `size` when the
 * peer closes the connection (recv() returns 0) or when keepRunning is
 * cleared while waiting. Returns -1 when select() or recv() fails, or when
 * keepRunning is found cleared right after select() reported the socket
 * readable.
 */
static inline ssize_t _recv(socket_t conn, char *buf, size_t size)
{
    size_t received = 0;

    while (received < size && keepRunning) {
        fd_set rfds;
        FD_ZERO(&rfds);
        FD_SET(conn, &rfds);

        struct timeval tv = { .tv_sec = 1, .tv_usec = 0 };

        int retval = select(conn + 1, &rfds, NULL, NULL, &tv);
        if (retval == -1) {
            // Look at the result before touching rfds, and keep errno safe
            // from the printf() calls below.
            int err = errno;
            if (!keepRunning) {
                printf("No longer running.\n");
                return -1;
            }
            printf("Select failed: %d.\n", err);
            return -1;
        }

        if (retval == 0 || !FD_ISSET(conn, &rfds)) {
            // Timeout: go round again so the loop condition sees keepRunning.
            continue;
        }

        if (!keepRunning) {
            printf("No longer running.\n");
            return -1;
        }

        ssize_t n = recv(conn, buf, size - received, 0);
        if (n < 0) {
            // Capture errno first: _time() calls into libc and may change it.
            int err = errno;
            _time();
            printf("recv failed: %zd, errno: %d, received: %zu, size: %zu\n", n, err, received, size);
            return -1;
        }
        if (n == 0) {
            break; // recv() returned 0: nothing more will arrive
        }
        buf += n;
        received += (size_t) n;
    }
    return (ssize_t) received;
}

/*
 * Shut down the write side of `conn`, read and discard incoming data until
 * read() reports end-of-file, then close the descriptor.
 *
 * A read() error is reported with perror() and terminates the process with
 * exit status 1. The close() result is checked with assert().
 */
static inline void _close(socket_t conn)
{
    char buffer[10];

    shutdown(conn, SHUT_WR);
    for (;;) {
        // Drain with the whole scratch buffer rather than one byte per call.
        ssize_t got = read(conn, buffer, sizeof(buffer));
        if (got < 0) {
            perror("reading");
            exit(1);
        }
        if (got == 0) {
            break;
        }
    }

    int res = close(conn);
    assert(res >= 0);
    (void) res; // silences the unused-value warning when NDEBUG is defined
}

Andreas Schmidt's avatar
Andreas Schmidt committed
191
192
193
194
195
196
197
198
/* TCP variant of _send_peer_btl_pace: ignores conn and always yields 0. */
static inline long long int _send_peer_btl_pace(socket_t conn) {
    (void) conn;
    return 0LL;
}

/* TCP variant of _recv_peer_btl_pace: ignores conn and always yields 0. */
static inline long long int _recv_peer_btl_pace(socket_t conn) {
    (void) conn;
    return 0LL;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
199
200
201
202
/* TCP variant of _nw_pace: ignores conn and always yields 0. */
static inline long long int _nw_pace(socket_t conn) {
    (void) conn;
    return 0LL;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
203
/* TCP variant of _app_send_pace_internal: ignores conn and always yields 0. */
static inline long long int _app_send_pace_internal(socket_t conn) {
    (void) conn;
    return 0LL;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
207
/* TCP variant of _app_send_pace_dependent: ignores conn and always yields 0. */
static inline long long int _app_send_pace_dependent(socket_t conn) {
    (void) conn;
    return 0LL;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
211
/* TCP variant of _app_send_pace_external: ignores conn and always yields 0. */
static inline long long int _app_send_pace_external(socket_t conn) {
    (void) conn;
    return 0LL;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
215
/* TCP variant of _trans_transmit_pace_internal: ignores conn and always yields 0. */
static inline long long int _trans_transmit_pace_internal(socket_t conn) {
    (void) conn;
    return 0LL;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
219
/* TCP variant of _trans_transmit_pace_dependent: ignores conn and always yields 0. */
static inline long long int _trans_transmit_pace_dependent(socket_t conn) {
    (void) conn;
    return 0LL;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
223
/* TCP variant of _trans_transmit_pace_external: ignores conn and always yields 0. */
static inline long long int _trans_transmit_pace_external(socket_t conn) {
    (void) conn;
    return 0LL;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
227
/* TCP variant of _trans_receive_pace_internal: ignores conn and always yields 0. */
static inline long long int _trans_receive_pace_internal(socket_t conn) {
    (void) conn;
    return 0LL;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
231
/* TCP variant of _trans_receive_pace_dependent: ignores conn and always yields 0. */
static inline long long int _trans_receive_pace_dependent(socket_t conn) {
    (void) conn;
    return 0LL;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
/* TCP variant of _trans_receive_pace_external: ignores conn and always yields 0. */
static inline long long int _trans_receive_pace_external(socket_t conn) {
    (void) conn;
    return 0LL;
}

/* TCP variant of _app_deliver_pace_internal: ignores conn and always yields 0. */
static inline long long int _app_deliver_pace_internal(socket_t conn) {
    (void) conn;
    return 0LL;
}

/* TCP variant of _app_deliver_pace_dependent: ignores conn and always yields 0. */
static inline long long int _app_deliver_pace_dependent(socket_t conn) {
    (void) conn;
    return 0LL;
}

/* TCP variant of _app_deliver_pace_external: ignores conn and always yields 0. */
static inline long long int _app_deliver_pace_external(socket_t conn) {
    (void) conn;
    return 0LL;
}


252
253
254
255
256
#else
#  define PROTOCOL "prrt"

typedef PrrtSocket *socket_t;

257
static inline socket_t _open_sender(struct arguments *args) {
Andreas Schmidt's avatar
Andreas Schmidt committed
258
    PrrtSocket *s = PrrtSocket_create(args->size, 100 * 1000);
259
    s->withTimestamp = true;
260
261
    assert(s != NULL && "Socket create failed.");

262
    if (args->thread_pinning) {
263
264
        PrrtSocket_enable_thread_pinning(s);
    }
Andreas Schmidt's avatar
Andreas Schmidt committed
265
    PrrtSocket_set_coding_parameters(s, 1, 1, 0, NULL);
266
267
268
269

    assert(PrrtSocket_bind(s, "0.0.0.0", args->local_port) && "bind failed");

    PrrtSocket_connect(s, args->target, args->port);
270
    return s;
271
272
}

273
static inline socket_t _open_receiver(struct arguments *args) {
Andreas Schmidt's avatar
Andreas Schmidt committed
274
    socket_t s = PrrtSocket_create(args->size, HALF_TIMESTAMP - 1);
275
276
    assert(s != NULL && "Could not create socket.");

277
    if (args->thread_pinning) {
278
279
280
281
        PrrtSocket_enable_thread_pinning(s);
    }

    assert(PrrtSocket_bind(s, "0.0.0.0", args->port) && "bind failed");
282
    return s;
283
284
}

Andreas Schmidt's avatar
Andreas Schmidt committed
285
/*
 * Pass `size` bytes starting at `buf` to PrrtSocket_send_sync() and report
 * `size` as the amount sent. The result of the library call is not
 * inspected.
 */
static inline ssize_t _send(socket_t conn, const char *buf, size_t size)
{
    unsigned char *payload = (unsigned char *) buf;

    PrrtSocket_send_sync(conn, payload, size);
    return (ssize_t) size;
}

290
/*
 * Receive into `buf` through PrrtSocket_receive_asap_wait() and return
 * whatever that call returns. The sender address is written to a local that
 * is then discarded. `size` is not consulted.
 */
static inline ssize_t _recv(socket_t conn, char *buf, size_t size)
{
    struct sockaddr_in peer;
    struct sockaddr *peer_addr = (struct sockaddr *) &peer;

    (void) size;
    const ssize_t got = PrrtSocket_receive_asap_wait(conn, buf, peer_addr);
    return got;
}

295
/*
 * Tear down a PRRT connection: close it with PrrtSocket_close(), then
 * release the handle itself with free().
 */
static inline void _close(socket_t conn)
{
    PrrtSocket_close(conn);
    free(conn);
}

Andreas Schmidt's avatar
Andreas Schmidt committed
300
301
302
303
304
305
306
307
/* Query the "send_peer_btl_pace" option of conn via PrrtSocket_get_sock_opt(). */
static inline long long int _send_peer_btl_pace(socket_t conn) {
    const long long int pace = PrrtSocket_get_sock_opt(conn, "send_peer_btl_pace");
    return pace;
}

/* Query the "recv_peer_btl_pace" option of conn via PrrtSocket_get_sock_opt(). */
static inline long long int _recv_peer_btl_pace(socket_t conn) {
    const long long int pace = PrrtSocket_get_sock_opt(conn, "recv_peer_btl_pace");
    return pace;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
308
309
310
311
/* Query the "nw_pace" option of conn via PrrtSocket_get_sock_opt(). */
static inline long long int _nw_pace(socket_t conn) {
    const long long int pace = PrrtSocket_get_sock_opt(conn, "nw_pace");
    return pace;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
312
313
314
315
316
317
/* Query the "appSend_pace_internal" option of conn via PrrtSocket_get_sock_opt(). */
static inline long long int _app_send_pace_internal(socket_t conn) {
    const long long int pace = PrrtSocket_get_sock_opt(conn, "appSend_pace_internal");
    return pace;
}

/* Query the "appSend_pace_dependent" option of conn via PrrtSocket_get_sock_opt(). */
static inline long long int _app_send_pace_dependent(socket_t conn) {
    const long long int pace = PrrtSocket_get_sock_opt(conn, "appSend_pace_dependent");
    return pace;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
320
321
/* Query the "appSend_pace_external" option of conn via PrrtSocket_get_sock_opt(). */
static inline long long int _app_send_pace_external(socket_t conn) {
    const long long int pace = PrrtSocket_get_sock_opt(conn, "appSend_pace_external");
    return pace;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
324
325
/* Query the "prrtTransmit_pace_internal" option of conn via PrrtSocket_get_sock_opt(). */
static inline long long int _trans_transmit_pace_internal(socket_t conn) {
    const long long int pace = PrrtSocket_get_sock_opt(conn, "prrtTransmit_pace_internal");
    return pace;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
328
329
/* Query the "prrtTransmit_pace_dependent" option of conn via PrrtSocket_get_sock_opt(). */
static inline long long int _trans_transmit_pace_dependent(socket_t conn) {
    const long long int pace = PrrtSocket_get_sock_opt(conn, "prrtTransmit_pace_dependent");
    return pace;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
332
333
/* Query the "prrtTransmit_pace_external" option of conn via PrrtSocket_get_sock_opt(). */
static inline long long int _trans_transmit_pace_external(socket_t conn) {
    const long long int pace = PrrtSocket_get_sock_opt(conn, "prrtTransmit_pace_external");
    return pace;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
336
337
/* Query the "prrtReceive_pace_internal" option of conn via PrrtSocket_get_sock_opt(). */
static inline long long int _trans_receive_pace_internal(socket_t conn) {
    const long long int pace = PrrtSocket_get_sock_opt(conn, "prrtReceive_pace_internal");
    return pace;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
340
341
/* Query the "prrtReceive_pace_dependent" option of conn via PrrtSocket_get_sock_opt(). */
static inline long long int _trans_receive_pace_dependent(socket_t conn) {
    const long long int pace = PrrtSocket_get_sock_opt(conn, "prrtReceive_pace_dependent");
    return pace;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
344
345
/* Query the "prrtReceive_pace_external" option of conn via PrrtSocket_get_sock_opt(). */
static inline long long int _trans_receive_pace_external(socket_t conn) {
    const long long int pace = PrrtSocket_get_sock_opt(conn, "prrtReceive_pace_external");
    return pace;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
348
349
350
351
352
353
354
355
356
357
358
/* Query the "appDeliver_pace_internal" option of conn via PrrtSocket_get_sock_opt(). */
static inline long long int _app_deliver_pace_internal(socket_t conn) {
    const long long int pace = PrrtSocket_get_sock_opt(conn, "appDeliver_pace_internal");
    return pace;
}

/* Query the "appDeliver_pace_dependent" option of conn via PrrtSocket_get_sock_opt(). */
static inline long long int _app_deliver_pace_dependent(socket_t conn) {
    const long long int pace = PrrtSocket_get_sock_opt(conn, "appDeliver_pace_dependent");
    return pace;
}

/* Query the "appDeliver_pace_external" option of conn via PrrtSocket_get_sock_opt(). */
static inline long long int _app_deliver_pace_external(socket_t conn) {
    const long long int pace = PrrtSocket_get_sock_opt(conn, "appDeliver_pace_external");
    return pace;
}
Andreas Schmidt's avatar
Andreas Schmidt committed
359

360
361
#endif

362
363
364
365
366
/*
 * Open the endpoint for the requested role: the sending side when `sender`
 * is true, the receiving side otherwise.
 */
static inline socket_t _open(bool sender, struct arguments *args) {
    return sender ? _open_sender(args) : _open_receiver(args);
}