Commit e43934d4 authored by Andreas Schmidt's avatar Andreas Schmidt

Thread pinning is now enabled by a method call.

parent 59b4a78f
Pipeline #1580 failed with stages
in 1 minute and 32 seconds
......@@ -4,12 +4,6 @@ if (DEBUG)
add_definitions(-DDEBUG)
endif()
option(THREAD_PINNING "Sender and receiver threads are pinned to a core." OFF)
if (THREAD_PINNING)
add_definitions(-DTHREAD_PINNING)
endif()
option(XLAP "Run XLAP to trace packets." OFF)
if (XLAP)
......
......@@ -11,9 +11,6 @@ cdef extern from "pthread.h" nogil:
ctypedef struct pthread_cond_t:
pass
cdef extern from "py_defines.h":
pass
cdef extern from "proto/vdmcode/block_code.h":
cdef struct prrtCoder:
pass
......@@ -124,9 +121,12 @@ cdef extern from "proto/socket.h":
int32_t PrrtSocket_timedrecv(PrrtSocket *sock_ptr, void *buf_ptr, const uint32_t wait_time) nogil
bint PrrtSocket_set_sock_opt(PrrtSocket *sock_ptr, const_char *name, const uint32_t value)
uint32_t PrrtSocket_get_sock_opt(PrrtSocket *sock_ptr, const_char *name)
bint PrrtSocket_uses_thread_pinning(PrrtSocket *socket)
uint32_t PrrtSocket_get_rtt(PrrtSocket *socket)
bint PrrtSocket_uses_thread_pinning(PrrtSocket *socket)
bint PrrtSocket_enable_thread_pinning(PrrtSocket *socket)
cdef extern from "proto/stores/receiveDataQueue.h":
ctypedef struct PrrtReceiveDataQueue:
......
......@@ -31,6 +31,8 @@ PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay
s->isHardwareTimestamping = false;
s->interfaceName = NULL;
s->isThreadPinning = false;
PrrtClock_init(&s->clock);
s->isBound = false;
......@@ -83,6 +85,16 @@ PrrtSocket *PrrtSocket_create(const bool is_sender, prrtTimedelta_t target_delay
return NULL;
}
bool PrrtSocket_enable_thread_pinning(PrrtSocket *s) {
check(s->isBound == false, "Thread pinning can only be enabled before calling PrrtSocket_bind()");
s->isThreadPinning = true;
return true;
error:
PERROR("PrrtSocket_enable_thread_pinning() failed.%s", "");
return false;
}
bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interface_name) {
check(s->isBound == false, "Hardware timestamping can only be enabled before calling PrrtSocket_bind()");
s->isHardwareTimestamping = true;
......@@ -160,8 +172,10 @@ bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port)
pthread_attr_init(s->receiveFeedbackThreadAttr);
s->sendDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
pthread_attr_init(s->sendDataThreadAttr);
pin_thread_to_core(s->receiveFeedbackThreadAttr, 1);
pin_thread_to_core(s->sendDataThreadAttr, 2);
if(s->isThreadPinning) {
pin_thread_to_core(s->receiveFeedbackThreadAttr, 1);
pin_thread_to_core(s->sendDataThreadAttr, 2);
}
check(pthread_create(&s->receiveFeedbackThread, s->receiveFeedbackThreadAttr,
receive_feedback_loop,
......@@ -174,7 +188,9 @@ bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port)
} else {
s->receiveDataThreadAttr = calloc(1, sizeof(pthread_attr_t));
pthread_attr_init(s->receiveDataThreadAttr);
pin_thread_to_core(s->receiveDataThreadAttr, 3);
if(s->isThreadPinning) {
pin_thread_to_core(s->receiveDataThreadAttr, 3);
}
check(pthread_create(&s->receiveDataThread, s->receiveDataThreadAttr, receive_data_loop,
(void *) s) ==
EXIT_SUCCESS,
......@@ -518,11 +534,7 @@ bool PrrtSocket_cleanup(PrrtSocket *s) {
}
bool PrrtSocket_uses_thread_pinning(PrrtSocket *s) {
#ifdef THREAD_PINNING
return true;
#else
return false;
#endif
return atomic_load_explicit(&s->isThreadPinning, memory_order_acquire);
}
uint32_t PrrtSocket_get_rtt(PrrtSocket *s) {
......
......@@ -72,8 +72,10 @@ typedef struct prrtSocket {
pthread_attr_t *sendDataThreadAttr;
pthread_attr_t *receiveDataThreadAttr;
bool isHardwareTimestamping;
atomic_bool isHardwareTimestamping;
char *interfaceName;
atomic_bool isThreadPinning;
} PrrtSocket;
......@@ -81,6 +83,8 @@ PrrtSocket *PrrtSocket_create(bool is_sender, prrtTimedelta_t target_delay_us);
bool PrrtSocket_enable_hardware_timestamping(PrrtSocket *s, const char * interface_name);
bool PrrtSocket_enable_thread_pinning(PrrtSocket *s);
bool PrrtSocket_bind(PrrtSocket *s, const char *ipAddress, const uint16_t port);
bool PrrtSocket_set_sock_opt(PrrtSocket *s, const char *name, const uint32_t value);
......
......@@ -66,6 +66,9 @@ cdef extern from "util/bptree.c":
cdef extern from "util/bitmap.c":
pass
cdef extern from "util/common.c":
pass
cdef extern from "util/list.c":
pass
......@@ -76,9 +79,11 @@ cdef class PrrtSocket:
cdef cprrt.PrrtSocket* _c_socket
cdef bint isSender
def __cinit__(self, port, isSender, target_delay = 1):
def __cinit__(self, port, isSender, target_delay = 1, thread_pinning = False):
target_delay_us = target_delay * 1000**2
self._c_socket = cprrt.PrrtSocket_create(isSender, target_delay_us)
if thread_pinning:
cprrt.PrrtSocket_enable_thread_pinning(self._c_socket)
cprrt.PrrtSocket_bind(self._c_socket, "0.0.0.0", port)
self.isSender = isSender
......
#ifndef PRRT_PY_DEFINES_H
#define PRRT_PY_DEFINES_H
#define THREAD_PINNING
#endif //PRRT_PY_DEFINES_H
......@@ -14,6 +14,7 @@ static struct argp_option options[] = {
{"port", 'p', "PORT", 0, "Target Port" },
{"rounds", 'r', "ROUNDS", 0, "Rounds" },
{"output", 'o', "FILE", 0, "Output to FILE instead of standard output" },
{"threadpinning", 'T', 0, 0, "Enable thread pinning"},
{"hardwaretimestamping", 777, "INTERFACE", 0, "Enable hardware timestamping and bind to interface"},
{ 0 }
};
......@@ -23,6 +24,7 @@ static struct arguments
uint16_t port;
uint16_t rounds;
char* outfile;
bool thread_pinning;
char* hardwarestamping_interface;
};
......@@ -44,6 +46,9 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
case 'o':
arguments->outfile = arg;
break;
case 'T':
arguments->thread_pinning = true;
break;
case 777:
arguments->hardwarestamping_interface = arg;
break;
......@@ -71,6 +76,7 @@ int main(int argc, char **argv) {
.port = 5000,
.rounds = 127,
.outfile = "-",
.thread_pinning = false,
.hardwarestamping_interface = "-"
};
argp_parse (&argp, argc, argv, 0, 0, &arguments);
......@@ -97,6 +103,11 @@ int main(int argc, char **argv) {
if(strcmp(arguments.hardwarestamping_interface, "-") != 0) {
PrrtSocket_enable_hardware_timestamping(s, arguments.hardwarestamping_interface);
}
if(arguments.thread_pinning) {
PrrtSocket_enable_thread_pinning(s);
}
check(PrrtSocket_bind(s, "0.0.0.0", arguments.port), "bind failed");
XlapTimestampTable *tstable_data = malloc(sizeof(XlapTimestampTable));
......
......@@ -12,6 +12,7 @@ static struct argp_option options[] = {
{"port", 'p', "PORT", 0, "Target Port" },
{"rounds", 'r', "ROUNDS", 0, "Rounds" },
{"output", 'o', "FILE", 0, "Output to FILE instead of standard output" },
{"threadpinning", 'T', 0, 0, "Enable thread pinning"},
{"hardwaretimestamping", 777, "INTERFACE", 0, "Enable hardware timestamping and bind to interface"},
{ 0 }
};
......@@ -22,6 +23,7 @@ static struct arguments
uint16_t port;
uint16_t rounds;
char* outfile;
bool thread_pinning;
char* hardwarestamping_interface;
};
......@@ -46,6 +48,9 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
case 'o':
arguments->outfile = arg;
break;
case 'T':
arguments->thread_pinning = true;
break;
case 777:
arguments->hardwarestamping_interface = arg;
break;
......@@ -68,6 +73,7 @@ int main(int argc, char **argv) {
.port = 5000,
.rounds = 127,
.outfile = "-",
.thread_pinning = false,
.hardwarestamping_interface = "-"
};
argp_parse (&argp, argc, argv, 0, 0, &arguments);
......@@ -90,23 +96,29 @@ int main(int argc, char **argv) {
uint16_t local_port = 6000;
PrrtSocket *socket = PrrtSocket_create(true, 60 * 1000 * 1000);
check(socket != NULL, "Socket create failed.");
PrrtSocket *s = PrrtSocket_create(true, 60 * 1000 * 1000);
check(s != NULL, "Socket create failed.");
if(strcmp(arguments.hardwarestamping_interface, "-") != 0) {
PrrtSocket_enable_hardware_timestamping(socket, arguments.hardwarestamping_interface);
PrrtSocket_enable_hardware_timestamping(s, arguments.hardwarestamping_interface);
}
check(PrrtSocket_bind(socket, "0.0.0.0", local_port), "bind failed");
//PrrtSocket_set_coding_parameters(socket, 1, 1); // comment this line to re-enable coding.
if(arguments.thread_pinning) {
PrrtSocket_enable_thread_pinning(s);
}
check(PrrtSocket_bind(s, "0.0.0.0", local_port), "bind failed");
//PrrtSocket_set_coding_parameters(s, 1, 1); // comment this line to re-enable coding.
PrrtSocket_connect(socket, arguments.target, arguments.port);
PrrtSocket_connect(s, arguments.target, arguments.port);
XlapTimestampTable *tstable_data = malloc(sizeof(XlapTimestampTable));
XlapTimestampTable *tstable_redundancy = malloc(sizeof(XlapTimestampTable));
check(tstable_data != NULL, "malloc failed");
check(tstable_redundancy != NULL, "malloc failed");
XlapTimestampTableInstall(socket, ts_data_packet, tstable_data);
XlapTimestampTableInstall(socket, ts_redundancy_packet, tstable_redundancy);
XlapTimestampTableInstall(s, ts_data_packet, tstable_data);
XlapTimestampTableInstall(s, ts_redundancy_packet, tstable_redundancy);
uint32_t j = 0;
while (j < rounds) {
......@@ -114,7 +126,7 @@ int main(int argc, char **argv) {
// 1400 bytes.
sprintf(buf, "%1400d", j + 1);
PrrtSocket_send(socket, (unsigned char *) buf, strlen(buf));
PrrtSocket_send(s, (unsigned char *) buf, strlen(buf));
j++;
// Send every 1us, as this is a sensible packet interval.
......@@ -131,8 +143,8 @@ int main(int argc, char **argv) {
fclose(out_desc);
}
PrrtSocket_close(socket);
free(socket);
PrrtSocket_close(s);
free(s);
free(tstable_data);
free(tstable_redundancy);
return 0;
......
#include <stdio.h>
#include <arpa/inet.h>
#include "common.h"
int print_buffer(const char *buf, const int length) {
......@@ -26,3 +27,11 @@ void print_gf(const gf *start, const int len) {
}
printf("\n");
}
void pin_thread_to_core(pthread_attr_t *ap, int core)
{
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(core, &cpuset);
pthread_attr_setaffinity_np(ap, sizeof(cpu_set_t), &cpuset);
}
......@@ -7,6 +7,7 @@
int print_buffer(const char *buf, const int length);
void print_gf(const gf *start, const int len);
void pin_thread_to_core(pthread_attr_t *ap, int core);
#define PERROR(fmt, args...) \
printf("PRRT ERROR: \n" fmt, ## args);
......@@ -15,15 +16,4 @@ printf("PRRT ERROR: \n" fmt, ## args);
printf("NOT IMPLEMENTED: %s\n", args); \
exit(0)
#ifdef THREAD_PINNING
#define pin_thread_to_core(ap, core) { \
cpu_set_t cpuset; \
CPU_ZERO(&cpuset); \
CPU_SET(core, &cpuset); \
pthread_attr_setaffinity_np(ap, sizeof(cpu_set_t), &cpuset); \
}
#else
#define pin_thread_to_core(ap, core) { }
#endif
#endif //PRRT_COMMON_H
......@@ -10,7 +10,7 @@ import math
class ReceiverThread(threading.Thread):
def __init__(self, seqnoDigits, packetCount):
threading.Thread.__init__(self)
self.sock = prrt.PrrtSocket(7000, False)
self.sock = prrt.PrrtSocket(7000, False, thread_pinning=True)
self.running = True
self.stats = {}
......@@ -37,7 +37,7 @@ class ReceiverThread(threading.Thread):
class SenderThread(threading.Thread):
def __init__(self, seqnoDigits, packetCount, packetInterval):
threading.Thread.__init__(self)
self.sock = prrt.PrrtSocket(7005, True, 1)
self.sock = prrt.PrrtSocket(7005, True, target_delay=1, thread_pinning=True)
self.packetCount = packetCount
self.seqnoDigits = seqnoDigits
self.packetInterval = packetInterval
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment