Commit b10cd23a authored by Stefan Reif's avatar Stefan Reif
Browse files

Add support for TCP in time-{sender,receiver}

parent 07b5494c
Loading
Loading
Loading
Loading
Loading

prrt/time-protocol.h

0 → 100644
+144 −0
Original line number Diff line number Diff line

#include <assert.h>

#ifdef XLAP
#  error "does not support XLAP right now"
#endif

#ifdef TCP
#
#  define PROTOCOL "tcp"
#  include <sys/types.h>
#  include <sys/socket.h>
#  include <netdb.h>
#  include <unistd.h>

typedef int socket_t;

static inline socket_t _open_sender(struct arguments *args)
{
	struct addrinfo hints = {
	  .ai_socktype = SOCK_STREAM,
	  .ai_family = AF_UNSPEC,
	  .ai_flags = AI_ADDRCONFIG,
	};

	char port[16];
	snprintf(port, sizeof(port), "%d", (int) args->port);

	struct addrinfo *head;
	int fail = getaddrinfo(args->target, port, &hints, &head);
	assert(!fail);
	int sock;
	struct addrinfo *iter;

	for (iter = head; iter != NULL; iter = iter->ai_next) {
		sock = socket(iter->ai_family, iter->ai_socktype, iter->ai_protocol);
		if (0 ==connect(sock, iter->ai_addr, iter->ai_addrlen))
			break;
		close(sock);
	}
	assert(iter && "failed to connect via TCP");
	freeaddrinfo(head);
	return sock;
}

static inline socket_t _open_receiver(struct arguments *args)
{
	int ssock = socket(AF_INET6, SOCK_STREAM, 0);
	assert(ssock >= 0);

	int yes = 1;
	setsockopt(ssock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));

	struct sockaddr_in6 address = {
		.sin6_family = AF_INET6,
		.sin6_port = htons(args->port),
		.sin6_addr = in6addr_any,
	};
	assert(0 == bind(ssock, (const struct sockaddr *) &address, sizeof(address)));

	assert(0 == listen(ssock, SOMAXCONN));

	int sock = accept(ssock, NULL, NULL);
	assert(sock >= 0);

	close(ssock);

	return sock;
}

static inline void _send(socket_t conn, const char *buf, size_t size)
{
	write(conn, buf, size);
}

static inline ssize_t _recv(socket_t conn, char *buf, size_t size)
{
	return read(conn, buf, size);
}

static inline void _close(socket_t conn)
{
	close(conn);
}

#else
#  define PROTOCOL "prrt"

typedef PrrtSocket *socket_t;

static inline socket_t _open_sender(struct arguments *args)
{
    PrrtSocket *s = PrrtSocket_create(true, 10 * 1000 * 1000);
    assert(s != NULL && "Socket create failed.");

    if(args->thread_pinning) {
        PrrtSocket_enable_thread_pinning(s);
    }

    assert(PrrtSocket_bind(s, "0.0.0.0", args->local_port) && "bind failed");

    PrrtSocket_connect(s, args->target, args->port);
	return s;
}

static inline socket_t _open_receiver(struct arguments *args)
{
    socket_t s = PrrtSocket_create(false, HALF_TIMESTAMP-1);
    assert(s != NULL && "Could not create socket.");

    if(args->thread_pinning) {
        PrrtSocket_enable_thread_pinning(s);
    }

    assert(PrrtSocket_bind(s, "0.0.0.0", args->port) && "bind failed");
	return s;
}

static inline void _send(socket_t conn, const char *buf, size_t size)
{
    PrrtSocket_send(conn, (unsigned char *) buf, sizeof(buf));
}

static inline ssize_t _recv(socket_t conn, char *buf, size_t size)
{
	return PrrtSocket_receive_asap_wait(conn, buf);
}

static inline void _close(socket_t conn)
{
    PrrtSocket_close(conn);
    free(conn);
}

#endif

static inline socket_t _open(bool sender, struct arguments *args)
{
	if (sender)
		return _open_sender(args);
	else
		return _open_receiver(args);
}
+15 −34
Original line number Diff line number Diff line
@@ -18,11 +18,13 @@ static struct argp_option options[] = {
        { 0 }
};

static struct arguments
struct arguments
{
    uint16_t port;
	uint16_t local_port;
    uint16_t rounds;
    char* outfile;
	char *target;
    bool thread_pinning;
};

@@ -70,6 +72,8 @@ static long long timedelta(struct timespec *t1, struct timespec *t2)
	return delta;
}

#include "time-protocol.h"

int main(int argc, char **argv) {
    signal(SIGINT, intHandler);

@@ -101,28 +105,14 @@ int main(int argc, char **argv) {

	long long int *rtts = calloc((size_t) rounds, sizeof(long long int));
	long long int *rtts2 = calloc((size_t) rounds, sizeof(long long int));

    s = PrrtSocket_create(false, HALF_TIMESTAMP-1);
    check(s != NULL, "Could not create socket.");

    if(arguments.thread_pinning) {
        PrrtSocket_enable_thread_pinning(s);
    }

    check(PrrtSocket_bind(s, "0.0.0.0", arguments.port), "bind failed");

    XlapTimestampTable *tstable_data = malloc(sizeof(XlapTimestampTable));
    XlapTimestampTable *tstable_redundancy = malloc(sizeof(XlapTimestampTable));
    check(tstable_data != NULL, "malloc failed");
    check(tstable_redundancy != NULL, "malloc failed");
    XlapTimestampTableInstall(s, ts_data_packet, tstable_data);
    XlapTimestampTableInstall(s, ts_redundancy_packet, tstable_redundancy);
	assert(rtts && rtts2);
	socket_t s = _open(false, &arguments);

    uint32_t i = 0;
    while (i < rounds && keepRunning) {
        char buffer[MAX_PAYLOAD_LENGTH + 1];
        debug(DEBUG_RECEIVER, "About to receive.");
        int n = PrrtSocket_receive_asap_wait(s, buffer);
        int n = _recv(s, buffer, sizeof(buffer));
        if (n < 0) {
            continue;
        }
@@ -131,6 +121,7 @@ int main(int argc, char **argv) {
        struct timespec * old;
        struct timespec * channel;
		clock_gettime(CLOCK_REALTIME, &now);

		sscanf(buffer, "%10d", &round);
		old = (struct timespec *) (buffer + 20);
		channel = (struct timespec *) (buffer + n - sizeof(struct timespec));
@@ -145,25 +136,15 @@ int main(int argc, char **argv) {
    }


    XlapTimestampTableDumpHeader(out_desc);
    XlapTimestampTableDump(out_desc, ts_data_packet, tstable_data);
    XlapTimestampTableDump(out_desc, ts_redundancy_packet, tstable_redundancy);

    if (file_output) {
        fclose(out_desc);
    }

    PrrtSocket_close(s);
	_close(s);
    free(rtts);
    free(rtts2);
    free(s);
    free(tstable_data);
    free(tstable_redundancy);

    pthread_exit(NULL);

    return 0;

    error:
    return -1;
}
+6 −36
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@ static struct argp_option options[] = {
        { 0 }
};

static struct arguments
struct arguments
{
    char* target;
    uint16_t port;
@@ -74,6 +74,7 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
    return 0;
}

#include "time-protocol.h"

int main(int argc, char **argv) {
    struct argp argp = { options, parse_opt, args_doc, doc };
@@ -106,29 +107,7 @@ int main(int argc, char **argv) {
    }
    #endif


    PrrtSocket *s = PrrtSocket_create(true, 500 * 1000);
    check(s != NULL, "Socket create failed.");

    if(arguments.thread_pinning) {
        PrrtSocket_enable_thread_pinning(s);
    }

    check(PrrtSocket_bind(s, "0.0.0.0", arguments.local_port), "bind failed");

    PrrtSocket_set_coding_parameters(s, 1, 1, 1, NULL); // comment this line to re-enable coding.
    if(arguments.pacing) {
        PrrtSocket_enable_pacing(s);
    }

    PrrtSocket_connect(s, arguments.target, arguments.port);

    XlapTimestampTable *tstable_data = malloc(sizeof(XlapTimestampTable));
    XlapTimestampTable *tstable_redundancy = malloc(sizeof(XlapTimestampTable));
    check(tstable_data != NULL, "malloc failed");
    check(tstable_redundancy != NULL, "malloc failed");
    XlapTimestampTableInstall(s, ts_data_packet, tstable_data);
    XlapTimestampTableInstall(s, ts_redundancy_packet, tstable_redundancy);
	socket_t s = _open(true, &arguments);

    uint32_t j = 0;
    uint16_t buffer_size = arguments.size;
@@ -140,27 +119,18 @@ int main(int argc, char **argv) {
        sprintf(buf, "%10d", j);
		memcpy(buf + 20, &now, sizeof(struct timespec));

        PrrtSocket_send(s, (unsigned char *) buf, sizeof(buf));
        _send(s, buf, sizeof(buf));

        j++;
    }
    sleep_nano(10);

    XlapTimestampTableDumpHeader(out_desc);
    XlapTimestampTableDump(out_desc, ts_data_packet, tstable_data);
    XlapTimestampTableDump(out_desc, ts_redundancy_packet, tstable_redundancy);

    if (file_output) {
        fflush(out_desc);
        fclose(out_desc);
    }

    PrrtSocket_close(s);
    free(s);
    free(tstable_data);
    free(tstable_redundancy);
    return 0;
	_close(s);

    error:
    return -1;
    return 0;
}