eigen/ci/scripts/detect_regressions.py

#!/usr/bin/env python3
"""Benchmark regression detection using Welch's t-test.

Compares the current benchmark run against historical data stored on
the perf-data git branch. A regression is flagged when:

  1. Welch's t-test p-value < significance threshold (default 0.01)
  2. The relative change exceeds a minimum percentage (default 5%)
  3. The direction is a slowdown (higher real_time)

Exit codes:
  0  no regressions
  1  regressions detected
  2  error
"""

import argparse
import glob
import json
import os
import subprocess
import sys
import xml.etree.ElementTree as ET
from collections import defaultdict, namedtuple

# scipy is the only external dependency (pip-installed in the CI job).
from scipy.stats import ttest_ind

Regression = namedtuple(
    "Regression",
    ["target", "key", "current_mean", "historical_mean", "change_pct", "p_value"],
)


def parse_args():
    p = argparse.ArgumentParser(description=__doc__)
    p.add_argument(
        "--results-dir",
        required=True,
        help="Directory containing current run JSON files.",
    )
    p.add_argument(
        "--perf-branch",
        default="perf-data",
        help="Git branch storing historical benchmark data.",
    )
    p.add_argument(
        "--history-count",
        type=int,
        default=14,
        help="Number of past runs to compare against.",
    )
    p.add_argument(
        "--significance",
        type=float,
        default=0.01,
        help="P-value threshold for Welch's t-test.",
    )
    p.add_argument(
        "--min-change-pct",
        type=float,
        default=5.0,
        help="Minimum percentage change to flag.",
    )
    p.add_argument(
        "--output-report",
        default="regression_report.txt",
        help="Path for text report.",
    )
    return p.parse_args()
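
# Example invocation (illustrative; the results directory name is an assumption,
# the flags and defaults come from parse_args above):
#   python3 ci/scripts/detect_regressions.py \
#       --results-dir benchmark_results \
#       --history-count 14 --significance 0.01 --min-change-pct 5.0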


def clone_perf_branch(branch, clone_dir):
    """Shallow-clone the perf-data branch. Returns True on success."""
    # Construct clone URL from CI environment or fall back to current remote.
    url = os.environ.get("CI_REPOSITORY_URL", "")
    if not url:
        try:
            url = subprocess.check_output(
                ["git", "remote", "get-url", "origin"], text=True
            ).strip()
        except Exception:
            return False
    try:
        subprocess.check_call(
            [
                "git",
                "clone",
                "--depth=1",
                "--single-branch",
                "--branch",
                branch,
                url,
                clone_dir,
            ],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        return True
    except subprocess.CalledProcessError:
        return False


def _history_sort_key(fpath):
    """Sort key for historical result files.

    Prefer the recorded UTC timestamp in the JSON metadata. Fall back to the
    filename so older date-only files still participate in the history window.
    """
    try:
        with open(fpath) as f:
            metadata = json.load(f).get("metadata", {})
    except Exception:
        metadata = {}
    return metadata.get("timestamp") or metadata.get("date") or os.path.basename(fpath)
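
# Note: this assumes the perf-data writer records ISO-8601 UTC timestamps
# (e.g. "2026-03-28T15:00:00Z"), which sort correctly as plain strings, so the
# newest-first ordering in load_historical_data needs no date parsing.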


def load_historical_data(perf_dir, target, history_count):
    """Load per-repetition real_time values from the last *history_count* runs.

    Returns dict: benchmark_key -> list of raw real_time values (multiple per run).

    We load the same non-aggregate rows that load_current_results uses so both
    sides of the t-test contain the same kind of measurement (individual
    repetitions), avoiding a unit mismatch between per-rep and per-run means.
    """
    target_dir = os.path.join(perf_dir, target)
    if not os.path.isdir(target_dir):
        return {}
    files = sorted(
        glob.glob(os.path.join(target_dir, "*.json")),
        key=_history_sort_key,
        reverse=True,
    )
    files = files[:history_count]
    history = defaultdict(list)
    for fpath in files:
        with open(fpath) as f:
            data = json.load(f)
        for exe_name, exe_data in data.get("files", {}).items():
            for bm in exe_data.get("benchmarks", []):
                run_type = bm.get("run_type", "")
                if run_type == "aggregate":
                    continue
                name = bm.get("name", "")
                key = f"{exe_name}/{name}"
                rt = bm.get("real_time")
                if rt is not None:
                    history[key].append(rt)
    return history
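
# Assumed shape of the perf-data checkout and of each run file, inferred from
# load_historical_data above and load_current_results below (exact file names
# are illustrative):
#   <perf-data>/<target>/<run>.json            e.g. x86-64-avx2/2026-03-28.json
#   {
#     "metadata": {"target": "x86-64-avx2", "timestamp": "<ISO-8601 UTC>"},
#     "files": {
#       "bench_gemm": {
#         "benchmarks": [
#           {"name": "BM_Gemm/256", "run_type": "iteration", "real_time": 812.4}
#         ]
#       }
#     }
#   }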


def load_current_results(results_dir):
    """Load current run results, keyed by target.

    Returns dict: target -> dict(benchmark_key -> list of per-repetition real_time).
    """
    data = defaultdict(lambda: defaultdict(list))
    for jf in sorted(glob.glob(os.path.join(results_dir, "*_*_*.json"))):
        with open(jf) as f:
            run = json.load(f)
        meta = run.get("metadata", {})
        target = meta.get("target", "unknown")
        for exe_name, exe_data in run.get("files", {}).items():
            for bm in exe_data.get("benchmarks", []):
                name = bm.get("name", "")
                run_type = bm.get("run_type", "")
                # Use individual iteration rows (not aggregates) for the
                # current run so we have per-repetition samples.
                if run_type == "aggregate":
                    continue
                key = f"{exe_name}/{name}"
                rt = bm.get("real_time")
                if rt is not None:
                    data[target][key].append(rt)
    return data


def find_regressions(current, historical, significance, min_change_pct):
    """Compare current vs historical using Welch's t-test.

    Returns (regressions, improvements, skipped_count).
    """
    regressions = []
    improvements = []
    skipped = 0
    for key, current_values in sorted(current.items()):
        hist_values = historical.get(key)
        if not hist_values or len(hist_values) < 5:
            skipped += 1
            continue
        if len(current_values) < 3:
            skipped += 1
            continue
        cur_mean = sum(current_values) / len(current_values)
        hist_mean = sum(hist_values) / len(hist_values)
        if hist_mean == 0:
            skipped += 1
            continue
        change_pct = (cur_mean - hist_mean) / hist_mean * 100.0
        _, p_value = ttest_ind(current_values, hist_values, equal_var=False)
        entry = Regression(
            target="",  # filled in by caller
            key=key,
            current_mean=cur_mean,
            historical_mean=hist_mean,
            change_pct=change_pct,
            p_value=p_value,
        )
        if p_value < significance and abs(change_pct) > min_change_pct:
            if change_pct > 0:
                # Higher real_time = slower = regression.
                regressions.append(entry)
            else:
                improvements.append(entry)
    return regressions, improvements, skipped
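
# Minimal illustration with synthetic samples (all numbers made up):
#   cur = [105.0, 106.2, 104.8]                      # current per-rep real_time
#   hist = [100.1, 99.8, 100.3, 100.0, 99.9, 100.2]  # historical per-rep real_time
#   regs, imps, skipped = find_regressions(
#       {"bench_gemm/BM_Gemm/256": cur},
#       {"bench_gemm/BM_Gemm/256": hist},
#       significance=0.01, min_change_pct=5.0)
# A ~5% slowdown with variance this tight would typically land in `regs`.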


def _qualified_key(r):
    """Target-qualified display key, e.g. '[x86-64-avx2] bench_gemm/BM_Gemm/256'."""
    return f"[{r.target}] {r.key}"


def write_text_report(regressions, improvements, skipped, total, path):
    """Write a human-readable summary."""
    with open(path, "w") as f:
        f.write("# Benchmark Regression Report\n\n")
        if regressions:
            f.write(f"## Regressions ({len(regressions)})\n\n")
            f.write(
                f"{'Benchmark':<70s} {'Historical':>12s} {'Current':>12s} "
                f"{'Change':>8s} {'p-value':>8s}\n"
            )
            f.write("-" * 114 + "\n")
            for r in sorted(regressions, key=lambda x: -x.change_pct):
                f.write(
                    f"{_qualified_key(r):<70s} {r.historical_mean:>12.1f} {r.current_mean:>12.1f} "
                    f"{r.change_pct:>+7.1f}% {r.p_value:>8.4f}\n"
                )
            f.write("\n")
        if improvements:
            f.write(f"## Improvements ({len(improvements)})\n\n")
            f.write(
                f"{'Benchmark':<70s} {'Historical':>12s} {'Current':>12s} "
                f"{'Change':>8s} {'p-value':>8s}\n"
            )
            f.write("-" * 114 + "\n")
            for r in sorted(improvements, key=lambda x: x.change_pct):
                f.write(
                    f"{_qualified_key(r):<70s} {r.historical_mean:>12.1f} {r.current_mean:>12.1f} "
                    f"{r.change_pct:>+7.1f}% {r.p_value:>8.4f}\n"
                )
            f.write("\n")
        f.write("## Summary\n\n")
        f.write(f"- Benchmarks analyzed: {total}\n")
        f.write(f"- Regressions: {len(regressions)}\n")
        f.write(f"- Improvements: {len(improvements)}\n")
        f.write(f"- Skipped (insufficient data): {skipped}\n")


def write_junit_report(regressions, analyzed_keys, path):
    """Write JUnit XML so GitLab displays results in the test report tab.

    Keys in *analyzed_keys* and regression entries are target-qualified
    (e.g. "[x86-64-avx2] bench_gemm/BM_Gemm/256") so the same benchmark
    on different ISA targets appears as separate test cases.
    """
    suite = ET.Element(
        "testsuite",
        name="benchmark-regressions",
        tests=str(len(analyzed_keys)),
        failures=str(len(regressions)),
    )
    regression_by_qkey = {_qualified_key(r): r for r in regressions}
    for key in sorted(analyzed_keys):
        tc = ET.SubElement(suite, "testcase", name=key, classname="benchmark")
        r = regression_by_qkey.get(key)
        if r is not None:
            ET.SubElement(
                tc,
                "failure",
                message=f"{r.change_pct:+.1f}% regression (p={r.p_value:.4f})",
            ).text = (
                f"historical_mean={r.historical_mean:.1f} "
                f"current_mean={r.current_mean:.1f} "
                f"change={r.change_pct:+.1f}% p={r.p_value:.6f}"
            )
    tree = ET.ElementTree(suite)
    # ET.indent requires Python 3.9+.
    ET.indent(tree)
    tree.write(path, xml_declaration=True, encoding="utf-8")
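
# Resulting XML shape (sketch; attribute values are illustrative):
#   <testsuite name="benchmark-regressions" tests="42" failures="1">
#     <testcase name="[x86-64-avx2] bench_gemm/BM_Gemm/256" classname="benchmark">
#       <failure message="+11.0% regression (p=0.0003)">historical_mean=812.4 ...</failure>
#     </testcase>
#   </testsuite>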


def main():
    args = parse_args()
    results_dir = args.results_dir

    # Load current results (keyed by target).
    current_by_target = load_current_results(results_dir)
    if not current_by_target:
        print("No current benchmark results found.")
        sys.exit(2)
    total_benchmarks = sum(len(v) for v in current_by_target.values())
    print(f"Loaded {total_benchmarks} benchmarks from current run.")
    print(f"Targets: {', '.join(sorted(current_by_target.keys()))}")

    # Clone historical data.
    perf_dir = "/tmp/perf-data-history"
    has_history = clone_perf_branch(args.perf_branch, perf_dir)
    if not has_history:
        print("No historical data found (perf-data branch missing).")
        print("This is expected on the first run. Storing baseline only.")
        sys.exit(0)

    # Run analysis per target.
    all_regressions = []
    all_improvements = []
    total_analyzed = 0
    total_skipped = 0
    all_keys = set()
    for target in sorted(current_by_target.keys()):
        target_current = current_by_target[target]
        historical = load_historical_data(perf_dir, target, args.history_count)
        if not historical:
            print(f" {target}: no historical data, skipping analysis.")
            continue
        regs, imps, skipped = find_regressions(
            target_current, historical, args.significance, args.min_change_pct
        )
        # Tag regressions with the target.
        regs = [r._replace(target=target) for r in regs]
        imps = [r._replace(target=target) for r in imps]
        all_regressions.extend(regs)
        all_improvements.extend(imps)
        total_analyzed += len(target_current) - skipped
        total_skipped += skipped
        # Use target-qualified keys so the same benchmark on different ISAs
        # shows up as separate entries in reports.
        all_keys.update(f"[{target}] {k}" for k in target_current)
        print(
            f" {target}: {len(regs)} regressions, "
            f"{len(imps)} improvements, {skipped} skipped"
        )

    # Write reports.
    report_path = args.output_report
    write_text_report(
        all_regressions, all_improvements, total_skipped, total_analyzed, report_path
    )
    print(f"\nText report: {report_path}")
    junit_path = report_path.replace(".txt", ".xml")
    write_junit_report(all_regressions, all_keys, junit_path)
    print(f"JUnit report: {junit_path}")

    # Print summary and exit.
    if all_regressions:
        print(f"\nREGRESSIONS DETECTED: {len(all_regressions)} benchmark(s)")
        for r in all_regressions:
            print(f" [{r.target}] {r.key}: {r.change_pct:+.1f}% (p={r.p_value:.4f})")
        sys.exit(1)
    else:
        n_imp = len(all_improvements)
        print(f"\nNo regressions detected. {n_imp} improvement(s) found.")
        sys.exit(0)


if __name__ == "__main__":
    main()