Commit 96a298a8 authored by Andreas Schmidt

Add evaluation code and data for NSDI'18.

parent 7817e094
data_files:
  sender: "sender.csv"
  receiver: "receiver.csv"
threads:
  - app_send
  - trans_send
  - trans_recv
  - app_recv
cycle_reference:
  app_send:
    Start: PrrtSendStart
    Stop: PrrtSendEnd
  trans_send:
    Start: PrrtTransmitStart
    Stop: LinkTransmitEnd
  trans_recv:
    Start: LinkReceive
    Stop: PrrtReturnPackage
  app_recv:
    Start: PrrtReceivePackage
    Stop: PrrtDeliver
time_reference:
  sender:
    Start: PrrtSendStart
    Stop: LinkTransmitEnd
  receiver:
    Start: LinkReceive
    Stop: PrrtDeliver
stamps:
  PrrtSendStart:
    Source: sender
    Thread: app_send
    Type: time
  PrrtSubmitPackage:
    Source: sender
    Thread: app_send
    Type: cycle
  PrrtSendEnd:
    Source: sender
    Thread: app_send
    Type: time
  PrrtTransmitStart:
    Source: sender
    Thread: trans_send
    Type: time
  PrrtTransmitEnd:
    Source: sender
    Thread: trans_send
    Type: cycle
  PrrtEncodeStart:
    Source: sender
    Thread: trans_send
    Type: cycle
  PrrtEncodeEnd:
    Source: sender
    Thread: trans_send
    Type: cycle
  LinkTransmitStart:
    Source: sender
    Thread: trans_send
    Type: cycle
  ChannelTransmit:
    Source: sender
    Thread: trans_send
    Type: time
  LinkTransmitEnd:
    Source: sender
    Thread: trans_send
    Type: time
  ChannelReceive:
    Source: receiver
    Thread: trans_recv
    Type: time
  LinkReceive:
    Source: receiver
    Thread: trans_recv
    Type: time
  SendFeedbackStart:
    Source: receiver
    Thread: trans_recv
    Type: cycle
  SendFeedbackEnd:
    Source: receiver
    Thread: trans_recv
    Type: cycle
  DecodeStart:
    Source: receiver
    Thread: trans_recv
    Type: cycle
  DecodeEnd:
    Source: receiver
    Thread: trans_recv
    Type: cycle
  HandlePacketStart:
    Source: receiver
    Thread: trans_recv
    Type: cycle
  HandlePacketEnd:
    Source: receiver
    Thread: trans_recv
    Type: cycle
  PrrtReturnPackage:
    Source: receiver
    Thread: trans_recv
    Type: time
  PrrtDeliver:
    Source: receiver
    Thread: app_recv
    Type: time
  CopyOutputStart:
    Source: receiver
    Thread: app_recv
    Type: cycle
  CopyOutputEnd:
    Source: receiver
    Thread: app_recv
    Type: cycle
  PrrtReceivePackage:
    Source: receiver
    Thread: app_recv
    Type: time
durations:
  Send:
    Start: PrrtSendStart
    Stop: PrrtSendEnd
    Source: sender
  PrrtTransmit:
    Start: PrrtTransmitStart
    Stop: PrrtTransmitEnd
    Source: sender
  LinkTransmit:
    Start: LinkTransmitStart
    Stop: LinkTransmitEnd
    Source: sender
  Submit:
    Start: PrrtSendStart
    Stop: PrrtSubmitPackage
    Source: sender
  Enqueue:
    Start: PrrtSubmitPackage
    Stop: PrrtSendEnd
    Source: sender
  SenderIPC:
    Start: PrrtSubmitPackage
    Stop: PrrtTransmitStart
    Source: sender
  SenderEnqueued:
    Start: PrrtSendEnd
    Stop: LinkTransmitStart
    Source: sender
  ReceiverIPC:
    Start: PrrtReturnPackage
    Stop: PrrtReceivePackage
    Source: receiver
  HandlePacket:
    Start: HandlePacketStart
    Stop: HandlePacketEnd
    Source: receiver
  Feedback:
    Start: SendFeedbackStart
    Stop: SendFeedbackEnd
    Source: receiver
  Decoding:
    Start: DecodeStart
    Stop: DecodeEnd
    Source: receiver
packet_types:
  Data: 0
  Redundancy: 1
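For orientation, this is how a configuration of this shape can be consumed. A minimal sketch, assuming the file above is saved as "xlap.yml" (name hypothetical); it uses ruamel.yaml, which the evaluation code itself imports:

import ruamel.yaml as yaml

with open("xlap.yml") as f:
    config = yaml.safe_load(f)

# Every duration is a start/stop pair of declared stamps on one host.
stamps = config["stamps"]
for name, spec in config["durations"].items():
    assert spec["Start"] in stamps and spec["Stop"] in stamps
    print("{}: {} -> {} ({})".format(name, spec["Start"], spec["Stop"], spec["Source"]))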
@@ -24,7 +24,8 @@ static inline unsigned long long timestampByTime(struct timespec *ts)
 void XlapTimestampTableDump(FILE *out, XlapTimestampPacketKind kind, XlapTimestampTable *table)
 {
 # define OUT(id) fprintf(out, ",%llu,%llu", timestampByTime(&table->rows[row].time[ts_##id].actual.t), (unsigned long long) table->rows[row].time[ts_##id].actual.c);
-    for (unsigned int row = 0; row < TS_ROWS; row++) {
+    // Start at 1 to remove the scratch row 0 from output.
+    for (unsigned int row = 1; row < TS_ROWS; row++) {
         fprintf(out, "%u,%u", row, (unsigned) kind);
         PP_foreach(PP_join_space, OUT, TIMESTAMP_ID_LIST)
         fprintf(out, "\n");
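Since row 0 is no longer dumped, the evaluation side needs no dummy-row workaround when loading the CSV (the matching change to the parse module appears further down in this commit). A hedged reader sketch, with the Kind and SeqNo column names taken from that module and the path hypothetical:

import pandas as pd

# The old reader had to slice off the scratch row with pd.read_csv(...)[1:];
# with this commit the dump starts at row 1, so the file loads directly.
df = pd.read_csv("sender.csv")
df = df[df["Kind"] == 0].drop(["Kind"], axis=1).set_index("SeqNo")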
 data_files:
-  sender: "rtn2017/results/on/2017_03_28_09_33_00_Sender.csv"
-  receiver: "rtn2017/results/on/2017_03_28_09_33_00_Receiver.csv"
+  sender: "nsdi2018/results/2018_02_14_12_57_01_sender.csv"
+  receiver: "nsdi2018/results/2018_02_14_12_57_01_receiver.csv"
+threads:
+  - app_send
+  - trans_send
+  - trans_recv
+  - app_recv
 cycle_reference:
-  sender:
+  app_send:
     Start: PrrtSendStart
     Stop: PrrtSendEnd
+  trans_send:
+    Start: PrrtTransmitStart
+    Stop: LinkTransmitEnd
-  receiver:
+  trans_recv:
     Start: LinkReceive
     Stop: PrrtReturnPackage
+  app_recv:
+    Start: PrrtReceivePackage
+    Stop: PrrtDeliver
 time_reference:
@@ -18,70 +29,98 @@ time_reference:
 stamps:
   PrrtSendStart:
     Source: sender
-    Type: time
-  PrrtSendEnd:
-    Source: sender
+    Thread: app_send
     Type: time
+  PrrtSubmitPackage:
+    Source: sender
+    Thread: app_send
+    Type: cycle
-  PrrtEncodeStart:
-    Source: sender
-    Type: cycle
-  PrrtEncodeEnd:
+  PrrtSendEnd:
     Source: sender
-    Type: cycle
+    Thread: app_send
+    Type: time
+  PrrtTransmitStart:
+    Source: sender
+    Thread: trans_send
+    Type: time
+  PrrtTransmitEnd:
+    Source: sender
+    Thread: trans_send
+    Type: cycle
-  PrrtTransmitStart:
+  PrrtEncodeStart:
     Source: sender
+    Thread: trans_send
     Type: cycle
-  PrrtTransmitEnd:
+  PrrtEncodeEnd:
     Source: sender
+    Thread: trans_send
     Type: cycle
   LinkTransmitStart:
     Source: sender
+    Thread: trans_send
     Type: cycle
   ChannelTransmit:
     Source: sender
+    Thread: trans_send
     Type: time
   LinkTransmitEnd:
     Source: sender
+    Thread: trans_send
     Type: time
-  LinkReceive:
+  ChannelReceive:
     Source: receiver
+    Thread: trans_recv
     Type: time
-  PrrtDeliver:
+  LinkReceive:
     Source: receiver
+    Thread: trans_recv
     Type: time
+  SendFeedbackStart:
+    Source: receiver
+    Thread: trans_recv
+    Type: cycle
+  SendFeedbackEnd:
+    Source: receiver
+    Thread: trans_recv
+    Type: cycle
+  DecodeStart:
+    Source: receiver
+    Thread: trans_recv
+    Type: cycle
+  DecodeEnd:
+    Source: receiver
+    Thread: trans_recv
+    Type: cycle
+  HandlePacketStart:
+    Source: receiver
+    Thread: trans_recv
+    Type: cycle
+  HandlePacketEnd:
+    Source: receiver
+    Thread: trans_recv
+    Type: cycle
+  PrrtReturnPackage:
+    Source: receiver
+    Thread: trans_recv
+    Type: time
+  PrrtDeliver:
+    Source: receiver
+    Thread: app_recv
+    Type: time
   CopyOutputStart:
     Source: receiver
+    Thread: app_recv
     Type: cycle
   CopyOutputEnd:
     Source: receiver
-    Type: cycle
-  PrrtReturnPackage:
-    Source: receiver
+    Thread: app_recv
     Type: cycle
   PrrtReceivePackage:
     Source: receiver
+    Thread: app_recv
     Type: time
 durations:
   Send:
@@ -129,6 +168,7 @@ durations:
     Start: DecodeStart
     Stop: DecodeEnd
     Source: receiver
 packet_types:
   Data: 0
   Redundancy: 1
@@ -28,12 +28,13 @@ def jitter_causes(df, durations, export=False, file_name=None):
     print("Outliers:", len(outliers), ";", "Threshold[us]:", threshold)
 
-def trace_jitter(data_frame, export=False, file_name=None):
+def trace_jitter(data_frame, threshold=None, export=False, file_name=None):
     """
     Displays (and saves) a stacked boxplot of durations.
     """
-    thresh = get_outlier_threshold(data_frame["EndToEnd_D"].describe())
-    df_no_outliers = data_frame[data_frame["EndToEnd_D"] <= thresh]
+    if threshold is None:
+        threshold = get_outlier_threshold(data_frame["EndToEnd_D"].describe())
+    df_no_outliers = data_frame[data_frame["EndToEnd_D"] <= threshold]
     box(df_no_outliers, export, file_name)
     print("{} / {} are no outliers.".format(len(df_no_outliers), len(data_frame)))
     fig = plt.gcf()
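The new parameter lets callers pin the outlier cutoff across runs instead of re-deriving it from each data set. A usage sketch with hypothetical paths; the parse_config call shape is assumed (it is not shown in this commit), and 500 us is an arbitrary example cutoff:

from xlap.parse import evaluate, parse_config
import xlap.analyse.jitter as jitter

config = parse_config("config.yml")  # path hypothetical
df_data = evaluate("sender.csv", "receiver.csv", config=config, kind=0)
df = jitter.prep(df_data, config)
jitter.trace_jitter(df, threshold=500)  # fixed cutoff in microseconds
jitter.trace_jitter(df)                 # cutoff derived via get_outlier_threshold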
import pandas as pd
import matplotlib.pyplot as plt
from scipy.stats.stats import pearsonr
import numpy as np
import sys
import graphviz


class LatencyAnalysis():
    def __init__(self, cfg=None, hdb=None):
        self.cfg = cfg
        correlations = []
        labels = []
        for x in hdb:
            correlations += [x["Correlation"]]
            labels += ["{} -> {}".format(x["Start"], x["End"])]
        corr = pd.Series(correlations, index=labels)
        self.corr = corr
def _get_thread_for_event(config, e):
    name = str(e)[:-2]
    try:
        return config["stamps"][name]["Thread"]
    except KeyError:
        print("Cannot find {}".format(name), file=sys.stderr)
        return None
def _happens_before(df, a, b, config):
    """
    Check if a happens-before b in the trace.
    """
    # Check whether a and b occur in the same thread. If so, a and b cannot be
    # concurrent.
    ta = _get_thread_for_event(config, a)
    tb = _get_thread_for_event(config, b)
    cname_a = str(a)[:-2] + "_C"
    cname_b = str(b)[:-2] + "_C"
    if ta == tb and ta is not None and tb is not None:
        tg = df[a] > df[b]
        if tg.any():
            return False
        df2 = df[df[a] == df[b]]
        # Note the parentheses around the zero check: "&" binds tighter than
        # "!=" in Python, so the comparison must be grouped explicitly.
        return not ((df2[cname_a] > df2[cname_b]) & (df2[cname_b] != 0)).any()
    # Since a and b occur in different threads, we cannot compare cyclestamps.
    # If in doubt, a and b are concurrent.
    return not (df[a] >= df[b]).any()
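As a toy illustration of the intra-thread branch: the stamp names A and B below are hypothetical, and the config dict mirrors the YAML structure above.

import pandas as pd
from xlap.analyse.latency import _happens_before

cfg = {"stamps": {"A": {"Thread": "app_send"}, "B": {"Thread": "app_send"}}}
df = pd.DataFrame({"A_T": [1, 2], "B_T": [3, 5],
                   "A_C": [10, 20], "B_C": [30, 50]})
print(_happens_before(df, "A_T", "B_T", cfg))  # True: A never occurs after B
print(_happens_before(df, "B_T", "A_T", cfg))  # False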
def _fast_happens_before(df, a, b, hb):
    """
    Check if a happens-before b, using a pre-computed relation.
    """
    return any(r['Start'] == str(a) and r['End'] == str(b) for r in hb)


def _happens_directly_before(df, a, b, hb):
    """
    Check if a happens-directly-before b in the trace.
    """
    if not _fast_happens_before(df, a, b, hb):
        return False
    for event in df:
        if str(event) == str(a) or str(event) == str(b):
            continue
        if _fast_happens_before(df, a, event, hb) and _fast_happens_before(df, event, b, hb):
            return False
    return True
def _locally_happens_directly_before(df, a, b, hb, config):
    """
    Check if a happens-directly-before b in the trace, but be a little more
    tolerant regarding intra-thread durations. Consider the following scenario:

    Thread A: A1 -> A2 -> A3
    Thread B: B1 -> B2

    This setup can result in the following happens-directly-before graph:

    A1   B1
      \  /
       A2
      /  \
    A3   B2

    In this case, B1 does not happen-directly-before B2 because, by chance, A2
    happens in between. If we ignore this anomaly, we only analyse <B1,A2> and
    <A2,B2>. Both ranges probably have low latency criticality, since their
    durations are random (because they depend on thread interleavings).
    However, we really want to analyse <B1,B2> because they actually represent
    a meaningful range in thread B.

    This function therefore computes a modified happens-directly-before graph
    with relaxed restrictions for intra-thread ranges:

    A1   B1
      \  /|
       A2 |
      /  \|
    A3   B2
    """
    if not _fast_happens_before(df, a, b, hb):
        return False
    ta = _get_thread_for_event(config, a)
    tb = _get_thread_for_event(config, b)
    if ta == tb and ta is not None and tb is not None:
        for c in df:
            if _get_thread_for_event(config, c) != ta:
                continue
            if _fast_happens_before(df, a, c, hb) and _fast_happens_before(df, c, b, hb):
                return False
        return True
    else:
        return _happens_directly_before(df, a, b, hb)
def _plot_controlflow_graph(df, hdb):
    """
    Generate the control flow graph using dot.
    """
    t_columns = [x for x in df.columns if x.endswith("_T")]
    graph = graphviz.Digraph(filename="graph", format="pdf")
    for event1 in df[t_columns]:
        graph.node(str(event1)[:-2])
    for edge in hdb:
        graph.edge(edge["Start"][:-2], edge["End"][:-2])
    graph.render()  # saves to graph.pdf in the local folder
    return graph
# Adapted from: http://composition.al/blog/2015/11/29/a-better-way-to-add-labels-to-bar-charts-with-matplotlib/
def autolabel(rects, ax, labels):
    # Get the x-axis width to calculate label positions from.
    (x_left, x_right) = ax.get_xlim()
    x_width = x_right - x_left
    for i, rect in enumerate(rects):
        width = rect.get_width()
        color = "black"
        align = "left"
        # Fraction of the axis width taken up by this rectangle.
        p_width = (width / x_width)
        # If the bar is wide enough, put the label inside it (right-aligned,
        # in white); otherwise, place it just past the end of the bar.
        if p_width > 0.50:  # arbitrary cutoff
            label_position = width - (x_width) + 0.7
            color = "white"
            align = "right"
        else:
            label_position = width + (x_width * 0.01)
        ax.text(label_position, rect.get_y(), labels[i], ha=align, va='bottom', rotation=0, color=color)
def _plot_critical_regions(hdb):
    """
    Plot regions, sorted by latency criticality.
    """
    # The negated key together with reverse=True yields ascending order, so
    # the most critical region ends up as the topmost bar of the barh plot.
    relevant = sorted([x for x in hdb if x['Correlation'] > 0], key=lambda x: -x['Correlation'], reverse=True)
    x = np.arange(len(relevant))
    correlations = list(map(lambda x: x['Correlation'], relevant))
    ticks = list(map(lambda x: "%s-%s" % (x['Start'][:-2], x['End'][:-2]), relevant))
    fig, ax = plt.subplots()
    rects = ax.barh(x, correlations, align="center", tick_label="")
    autolabel(rects, ax, ticks)
    plt.tight_layout()
    plt.savefig("latency-criticality.pdf")
    plt.close()
def analyse(df, config):
    hb = []
    events = [column for column in df.columns if column.endswith("_T")]
    for event1 in df[events]:
        for event2 in df[events]:
            if str(event1) == str(event2):
                continue
            if _happens_before(df, event1, event2, config):
                hb += [{'Start': str(event1), 'End': str(event2)}]
    hdb = []
    e2e = list(df['EndToEnd_D'])
    for event1 in df[events]:
        for event2 in df[events]:
            if str(event1) == str(event2):
                continue
            # if _locally_happens_directly_before(df, event1, event2, hb, config):
            if _happens_directly_before(df, event1, event2, hb):
                # Compute the correlation between e2e latency and event1-event2 latency.
                l3 = list(df[event2] - df[event1])
                if any(map(lambda x: x != 0, l3)):
                    correlation = pearsonr(l3, e2e)[0]
                else:
                    correlation = 0
                hdb += [{'Start': str(event1), 'End': str(event2), 'Correlation': correlation}]
    cfg = _plot_controlflow_graph(df, hdb)
    _plot_critical_regions(hdb)
    return LatencyAnalysis(cfg=cfg, hdb=hdb)
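Putting it together: analyse() is what the CLI below wires up for its new latency task. A sketch with hypothetical paths (the parse_config call shape is assumed):

from xlap.parse import evaluate, parse_config
import xlap.analyse.latency as latency

config = parse_config("config.yml")
df_data = evaluate("sender.csv", "receiver.csv", config=config, kind=0)
analysis = latency.analyse(df_data, config)
# Ranges whose duration correlates most with end-to-end latency come first.
print(analysis.corr.sort_values(ascending=False))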
@@ -2,9 +2,11 @@ import argparse
 from xlap.parse import evaluate, parse_config
 import xlap.analyse.jitter as jitter
+import xlap.analyse.latency as latency
 
 tasks = {
     "jitter": None,
+    "latency": None,
     "capture": None
 }
@@ -39,6 +41,10 @@ def main():
                 f.write("\n")
         else:
             print(output)
+    elif command == "latency":
+        df_data = evaluate(data_files["sender"], data_files["receiver"], config=config, kind=0)
+        a = latency.analyse(df_data, config)
+        print(a.corr.sort_values(ascending=False))
     else:
         df_data = evaluate(data_files["sender"], data_files["receiver"], config=config, kind=0)
@@ -53,3 +59,6 @@ def main():
         df = jitter.prep(df_data, config)
         jitter.trace_jitter(df, **params1)
         jitter.jitter_causes(df, config["durations"], **params2)
+
+if __name__ == "__main__":
+    main()
 import pandas as pd
+import numpy as np
-import ruamel.yaml
+import ruamel.yaml as yaml
+import logging
+import copy
 
 def _stamp_name_by_src_and_type(all_stamps, src, kind=None):
     if kind is None:
-        kind = ["time", "cycle"]
+        kind = ["time", "cycle", "none"]
     return [c for c, v in all_stamps.items() if v["Source"] == src and v["Type"] in kind]
 
+def _stamp_name_by_thread_and_type(all_stamps, thread, kind=None):
+    if kind is None:
+        kind = ["time", "cycle"]
+    return [c for c, v in all_stamps.items() if v["Thread"] == thread and v["Type"] in kind]
 
-def _extract_stamps_by_type(all_stamps, src, kind=None):
+def _extract_stamps_by_src_and_kind(all_stamps, src, kind=None):
     columns = _stamp_name_by_src_and_type(all_stamps, src, kind)
     return [x + "_T" for x in columns] + [x + "_C" for x in columns]
 
-def _evaluate_file(file_name, stamps, kind, sender=False):
-    # Remove first line, as this is the dummy line for intermittently storing data.
-    df = pd.read_csv(file_name)[1:]
+def _evaluate_file(file_name, stamps, kind, measured_column, sender=False):
+    df = pd.read_csv(file_name)
     df = df[df["Kind"] == kind].drop(["Kind"], axis=1).set_index("SeqNo")
     # Drop columns of opposing side.
     if sender:
-        df.drop(_extract_stamps_by_type(stamps, "receiver"), axis=1, inplace=True)
+        df.drop(_extract_stamps_by_src_and_kind(stamps, "receiver"), axis=1, inplace=True)
     else:
-        df.drop(_extract_stamps_by_type(stamps, "sender"), axis=1, inplace=True)
+        df.drop(_extract_stamps_by_src_and_kind(stamps, "sender"), axis=1, inplace=True)
-    # Drop empty rows (as they have probably not been written out).
-    return df[pd.notnull(df).all(axis=1)]
+    # Drop rows with value 0 (as they have probably not been written out).
+    return df[df[measured_column + "_T"] != 0]
def _diff_t_c(df, start, stop):
......@@ -36,49 +41,79 @@ def _diff_t_c(df, start, stop):
return time.astype(float), cycles.astype(float)
def _generate_thread_durations(df, cycle_reference, thread, stamps):
# Generate Processing Duration
src_name = "".join(map(str.capitalize, thread.split("_")))
# Generate Cycle Times
time, cycles = _diff_t_c(df, cycle_reference[thread]["Start"], cycle_reference[thread]["Stop"])
# TODO: Introduce check if both are on the same host.
df[src_name + "_D"] = time
df[src_name + "_C"] = cycles
df[src_name + "Cycle_D"] = time / cycles
# Recreate missing timestamps from cycles
for stamp_name in _stamp_name_by_thread_and_type(stamps, thread, "cycle"):
start_stamp = cycle_reference[thread]["Start"]
diff = df[stamp_name + "_C"] - df[start_stamp + "_C"]
try:
df[stamp_name + "_T"] = (diff * df[src_name + "Cycle_D"] + df[start_stamp + "_T"]).astype(int)
except ValueError as e:
df[stamp_name + "_T"] = np.nan
logging.debug("Stamp '%s' caused a ValueError (Src: %s, Start: %s)", stamp_name, src_name, start_stamp)
def evaluate_side(file, config, side="sender", kind=0):
config = copy.deepcopy(config)
stamps = config["stamps"]
df = _evaluate_file(file, stamps, kind, _stamp_name_by_src_and_type(stamps, "sender", kind=["time"])[0], True)
tr = config["time_reference"]
cr = config["cycle_reference"]
for src in config["threads"]:
_generate_thread_durations(df, cr, src, config["stamps"])
config["durations"] = dict([(x,y) for (x,y) in config["durations"].items() if y["Source"] == side])
_generate_durations(df, config)
df["Sender_D"] = 0
df["Receiver_D"] = 0
df["EndToEnd_D"] = df[tr[side]["Stop"] + "_T"] - df[tr[side]["Start"] + "_T"]
return df
 def evaluate(sender_file, receiver_file, config, kind=0):
     stamps = config["stamps"]
-    df1 = _evaluate_file(sender_file, stamps, kind, True)
-    df2 = _evaluate_file(receiver_file, stamps, kind)
+    df1 = _evaluate_file(sender_file, stamps, kind, _stamp_name_by_src_and_type(stamps, "sender", kind=["time"])[0], True)
+    df2 = _evaluate_file(receiver_file, stamps, kind, _stamp_name_by_src_and_type(stamps, "receiver", kind=["time"])[0])
     df = df1.join(df2)
     tr = config["time_reference"]
     cr = config["cycle_reference"]
-    # Determine Channel Duration
-    df["Channel_D"] = df[tr["receiver"]["Start"] + "_T"] - df[tr["sender"]["Stop"] + "_T"]
+    df["Transport_D"] = df[tr["receiver"]["Start"] + "_T"] - df[tr["sender"]["Stop"] + "_T"]
     # Correlate Receiver Timestamps with Sender Timestamps (subtracting Channel Duration)
     for s in _stamp_name_by_src_and_type(stamps, "receiver", kind=["time"]):
-        df[s + "_T"] -= df["Channel_D"]
+        df[s + "_T"] -= df["Transport_D"]
-    for src in ["sender", "receiver"]:
-        # Generate Processing Duration
-        src_name = src.capitalize()
+    for src in config["threads"]:
+        _generate_thread_durations(df, cr, src, stamps)
-        time, cycles = _diff_t_c(df, tr[src]["Start"], tr[src]["Stop"])
-        df[src_name + "_D"] = time
-        df[src_name + "_C"] = cycles
+    _generate_durations(df, config)
-        # Generate Cycle Times
-        time, cycles = _diff_t_c(df, cr[src]["Start"], cr[src]["Stop"])
-        df[src_name + "Cycle_D"] = time / cycles
+    df["Sender_D"] = df[tr["sender"]["Stop"] + "_T"] - df[tr["sender"]["Start"] + "_T"]
+    df["Receiver_D"] = df[tr["receiver"]["Stop"] + "_T"] - df[tr["receiver"]["Start"] + "_T"]
+    df["EndToEnd_D"] = df[tr["receiver"]["Stop"] + "_T"] - df[tr["sender"]["Start"] + "_T"]
-        # Recreate missing timestamps from cycles
-        for stamp_name in _stamp_name_by_src_and_type(stamps, src, "cycle"):
-            start_stamp = tr[src]["Start"]
-            diff = df[stamp_name + "_C"] - df[start_stamp + "_C"]
-            df[stamp_name + "_T"] = (diff * df[src_name + "Cycle_D"] + df[start_stamp + "_T"]).astype(int)
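For completeness, the new evaluate_side() entry point can be exercised on a single capture in isolation. A hedged sketch (paths hypothetical, parse_config call shape assumed):

from xlap.parse import evaluate_side, parse_config

config = parse_config("config.yml")
df = evaluate_side("sender.csv", config, side="sender", kind=0)
print(df["EndToEnd_D"].describe())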