#!/usr/bin/env python3 """Benchmark regression detection using Welch's t-test. Compares the current benchmark run against historical data stored on the perf-data git branch. A regression is flagged when: 1. Welch's t-test p-value < significance threshold (default 0.01) 2. The relative change exceeds a minimum percentage (default 5%) 3. The direction is a slowdown (higher real_time) Exit codes: 0 no regressions 1 regressions detected 2 error """ import argparse import glob import json import os import subprocess import sys import xml.etree.ElementTree as ET from collections import defaultdict, namedtuple # scipy is the only external dependency (pip-installed in the CI job). from scipy.stats import ttest_ind Regression = namedtuple( "Regression", ["target", "key", "current_mean", "historical_mean", "change_pct", "p_value"], ) def parse_args(): p = argparse.ArgumentParser(description=__doc__) p.add_argument( "--results-dir", required=True, help="Directory containing current run JSON files.", ) p.add_argument( "--perf-branch", default="perf-data", help="Git branch storing historical benchmark data.", ) p.add_argument( "--history-count", type=int, default=14, help="Number of past runs to compare against.", ) p.add_argument( "--significance", type=float, default=0.01, help="P-value threshold for Welch's t-test.", ) p.add_argument( "--min-change-pct", type=float, default=5.0, help="Minimum percentage change to flag.", ) p.add_argument( "--output-report", default="regression_report.txt", help="Path for text report.", ) return p.parse_args() def clone_perf_branch(branch, clone_dir): """Shallow-clone the perf-data branch. Returns True on success.""" # Construct clone URL from CI environment or fall back to current remote. url = os.environ.get("CI_REPOSITORY_URL", "") if not url: try: url = subprocess.check_output( ["git", "remote", "get-url", "origin"], text=True ).strip() except Exception: return False try: subprocess.check_call( [ "git", "clone", "--depth=1", "--single-branch", "--branch", branch, url, clone_dir, ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) return True except subprocess.CalledProcessError: return False def _history_sort_key(fpath): """Sort key for historical result files. Prefer the recorded UTC timestamp in the JSON metadata. Fall back to the filename so older date-only files still participate in the history window. """ try: with open(fpath) as f: metadata = json.load(f).get("metadata", {}) except Exception: metadata = {} return metadata.get("timestamp") or metadata.get("date") or os.path.basename(fpath) def load_historical_data(perf_dir, target, history_count): """Load per-repetition real_time values from the last *history_count* runs. Returns dict: benchmark_key -> list of raw real_time values (multiple per run). We load the same non-aggregate rows that load_current_results uses so both sides of the t-test contain the same kind of measurement (individual repetitions), avoiding a unit mismatch between per-rep and per-run means. """ target_dir = os.path.join(perf_dir, target) if not os.path.isdir(target_dir): return {} files = sorted( glob.glob(os.path.join(target_dir, "*.json")), key=_history_sort_key, reverse=True, ) files = files[:history_count] history = defaultdict(list) for fpath in files: with open(fpath) as f: data = json.load(f) for exe_name, exe_data in data.get("files", {}).items(): for bm in exe_data.get("benchmarks", []): run_type = bm.get("run_type", "") if run_type == "aggregate": continue name = bm.get("name", "") key = f"{exe_name}/{name}" rt = bm.get("real_time") if rt is not None: history[key].append(rt) return history def load_current_results(results_dir): """Load current run results, keyed by target. Returns dict: target -> dict(benchmark_key -> list of per-repetition real_time). """ data = defaultdict(lambda: defaultdict(list)) for jf in sorted(glob.glob(os.path.join(results_dir, "*_*_*.json"))): with open(jf) as f: run = json.load(f) meta = run.get("metadata", {}) target = meta.get("target", "unknown") for exe_name, exe_data in run.get("files", {}).items(): for bm in exe_data.get("benchmarks", []): name = bm.get("name", "") run_type = bm.get("run_type", "") # Use individual iteration rows (not aggregates) for the # current run so we have per-repetition samples. if run_type == "aggregate": continue key = f"{exe_name}/{name}" rt = bm.get("real_time") if rt is not None: data[target][key].append(rt) return data def find_regressions(current, historical, significance, min_change_pct): """Compare current vs historical using Welch's t-test. Returns (regressions, improvements, skipped_count). """ regressions = [] improvements = [] skipped = 0 for key, current_values in sorted(current.items()): hist_values = historical.get(key) if not hist_values or len(hist_values) < 5: skipped += 1 continue if len(current_values) < 3: skipped += 1 continue cur_mean = sum(current_values) / len(current_values) hist_mean = sum(hist_values) / len(hist_values) if hist_mean == 0: skipped += 1 continue change_pct = (cur_mean - hist_mean) / hist_mean * 100.0 _, p_value = ttest_ind(current_values, hist_values, equal_var=False) entry = Regression( target="", # filled in by caller key=key, current_mean=cur_mean, historical_mean=hist_mean, change_pct=change_pct, p_value=p_value, ) if p_value < significance and abs(change_pct) > min_change_pct: if change_pct > 0: # Higher real_time = slower = regression. regressions.append(entry) else: improvements.append(entry) return regressions, improvements, skipped def _qualified_key(r): """Target-qualified display key, e.g. '[x86-64-avx2] bench_gemm/BM_Gemm/256'.""" return f"[{r.target}] {r.key}" def write_text_report(regressions, improvements, skipped, total, path): """Write a human-readable summary.""" with open(path, "w") as f: f.write("# Benchmark Regression Report\n\n") if regressions: f.write(f"## Regressions ({len(regressions)})\n\n") f.write( f"{'Benchmark':<70s} {'Historical':>12s} {'Current':>12s} " f"{'Change':>8s} {'p-value':>8s}\n" ) f.write("-" * 114 + "\n") for r in sorted(regressions, key=lambda x: -x.change_pct): f.write( f"{_qualified_key(r):<70s} {r.historical_mean:>12.1f} {r.current_mean:>12.1f} " f"{r.change_pct:>+7.1f}% {r.p_value:>8.4f}\n" ) f.write("\n") if improvements: f.write(f"## Improvements ({len(improvements)})\n\n") f.write( f"{'Benchmark':<70s} {'Historical':>12s} {'Current':>12s} " f"{'Change':>8s} {'p-value':>8s}\n" ) f.write("-" * 114 + "\n") for r in sorted(improvements, key=lambda x: x.change_pct): f.write( f"{_qualified_key(r):<70s} {r.historical_mean:>12.1f} {r.current_mean:>12.1f} " f"{r.change_pct:>+7.1f}% {r.p_value:>8.4f}\n" ) f.write("\n") f.write(f"## Summary\n\n") f.write(f"- Benchmarks analyzed: {total}\n") f.write(f"- Regressions: {len(regressions)}\n") f.write(f"- Improvements: {len(improvements)}\n") f.write(f"- Skipped (insufficient data): {skipped}\n") def write_junit_report(regressions, analyzed_keys, path): """Write JUnit XML so GitLab displays results in the test report tab. Keys in *analyzed_keys* and regression entries are target-qualified (e.g. "[x86-64-avx2] bench_gemm/BM_Gemm/256") so the same benchmark on different ISA targets appears as separate test cases. """ suite = ET.Element( "testsuite", name="benchmark-regressions", tests=str(len(analyzed_keys)), failures=str(len(regressions)), ) regression_by_qkey = {_qualified_key(r): r for r in regressions} for key in sorted(analyzed_keys): tc = ET.SubElement(suite, "testcase", name=key, classname="benchmark") r = regression_by_qkey.get(key) if r is not None: ET.SubElement( tc, "failure", message=f"{r.change_pct:+.1f}% regression (p={r.p_value:.4f})", ).text = ( f"historical_mean={r.historical_mean:.1f} " f"current_mean={r.current_mean:.1f} " f"change={r.change_pct:+.1f}% p={r.p_value:.6f}" ) tree = ET.ElementTree(suite) ET.indent(tree) tree.write(path, xml_declaration=True, encoding="utf-8") def main(): args = parse_args() results_dir = args.results_dir # Load current results (keyed by target). current_by_target = load_current_results(results_dir) if not current_by_target: print("No current benchmark results found.") sys.exit(2) total_benchmarks = sum(len(v) for v in current_by_target.values()) print(f"Loaded {total_benchmarks} benchmarks from current run.") print(f"Targets: {', '.join(sorted(current_by_target.keys()))}") # Clone historical data. perf_dir = "/tmp/perf-data-history" has_history = clone_perf_branch(args.perf_branch, perf_dir) if not has_history: print("No historical data found (perf-data branch missing).") print("This is expected on the first run. Storing baseline only.") sys.exit(0) # Run analysis per target. all_regressions = [] all_improvements = [] total_analyzed = 0 total_skipped = 0 all_keys = set() for target in sorted(current_by_target.keys()): target_current = current_by_target[target] historical = load_historical_data(perf_dir, target, args.history_count) if not historical: print(f" {target}: no historical data, skipping analysis.") continue regs, imps, skipped = find_regressions( target_current, historical, args.significance, args.min_change_pct ) # Tag regressions with the target. regs = [r._replace(target=target) for r in regs] imps = [r._replace(target=target) for r in imps] all_regressions.extend(regs) all_improvements.extend(imps) total_analyzed += len(target_current) - skipped total_skipped += skipped # Use target-qualified keys so the same benchmark on different ISAs # shows up as separate entries in reports. all_keys.update(f"[{target}] {k}" for k in target_current) print( f" {target}: {len(regs)} regressions, " f"{len(imps)} improvements, {skipped} skipped" ) # Write reports. report_path = args.output_report write_text_report( all_regressions, all_improvements, total_skipped, total_analyzed, report_path ) print(f"\nText report: {report_path}") junit_path = report_path.replace(".txt", ".xml") write_junit_report(all_regressions, all_keys, junit_path) print(f"JUnit report: {junit_path}") # Print summary and exit. if all_regressions: print(f"\nREGRESSIONS DETECTED: {len(all_regressions)} benchmark(s)") for r in all_regressions: print(f" [{r.target}] {r.key}: {r.change_pct:+.1f}% (p={r.p_value:.4f})") sys.exit(1) else: n_imp = len(all_improvements) print(f"\nNo regressions detected. {n_imp} improvement(s) found.") sys.exit(0) if __name__ == "__main__": main()