#!/usr/bin/env python3
"""
analyze_epoch_tail_csv.py

Compute correct tail proxy statistics from Phase 51/52 epoch CSV.

Input CSV (from scripts/soak_mixed_single_process.sh):
    epoch,iter,throughput_ops_s,rss_mb

Key points:
- Tail in throughput space is the *LOW* tail (p1/p0.1), not p99.
- Tail in latency space is the *HIGH* tail (p99/p999), computed from
  per-epoch latency values: latency_ns = 1e9 / throughput_ops_s
- Do NOT compute latency percentiles as 1e9 / throughput_percentile
  (nonlinear + order inversion).
"""
from __future__ import annotations

import argparse
import csv
import math
from dataclasses import dataclass
from typing import List, Tuple


def percentile(sorted_vals: List[float], p: float) -> float:
    """Return the p-th percentile of *sorted_vals*.

    Args:
        sorted_vals: values already sorted ascending (caller's responsibility).
        p: percentile in [0, 100]; out-of-range values clamp to min/max.

    Returns:
        Linearly interpolated percentile, or NaN for an empty input.
    """
    if not sorted_vals:
        return float("nan")
    if p <= 0:
        return sorted_vals[0]
    if p >= 100:
        return sorted_vals[-1]
    # Linear interpolation between closest ranks (same scheme as
    # numpy.percentile's default "linear" method).
    k = (len(sorted_vals) - 1) * (p / 100.0)
    f = math.floor(k)
    c = math.ceil(k)
    if f == c:
        # k landed exactly on a rank; f is already int(k).
        return sorted_vals[f]
    d0 = sorted_vals[f] * (c - k)
    d1 = sorted_vals[c] * (k - f)
    return d0 + d1


@dataclass
class Stats:
    """Summary statistics for one metric series (NaN-filled when empty)."""

    mean: float
    stdev: float
    cv: float    # coefficient of variation: stdev / mean
    p50: float
    p90: float
    p99: float
    p999: float  # 99.9th percentile
    p10: float
    p1: float
    p01: float   # 0.1th percentile (low tail)
    minv: float
    maxv: float


def compute_stats(vals: List[float]) -> Stats:
    """Compute mean/stdev/CV and a spread of percentiles for *vals*.

    Uses population variance (divide by n, not n-1). Returns an all-NaN
    Stats for an empty input rather than raising.
    """
    if not vals:
        nan = float("nan")
        return Stats(nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan)
    n = len(vals)
    mean = sum(vals) / n
    var = sum((v - mean) ** 2 for v in vals) / n
    stdev = math.sqrt(var)
    cv = (stdev / mean) if mean != 0 else float("nan")
    s = sorted(vals)
    return Stats(
        mean=mean,
        stdev=stdev,
        cv=cv,
        p50=percentile(s, 50),
        p90=percentile(s, 90),
        p99=percentile(s, 99),
        p999=percentile(s, 99.9),
        p10=percentile(s, 10),
        p1=percentile(s, 1),
        p01=percentile(s, 0.1),
        minv=s[0],
        maxv=s[-1],
    )


def read_csv(path: str) -> Tuple[List[float], List[float]]:
    """Read (throughput, rss) columns from the epoch CSV.

    Rows with a missing/unparseable throughput are skipped entirely
    (their rss value, if any, is also dropped). An unparseable rss on an
    otherwise-valid row is silently ignored, so the two returned lists
    may differ in length.
    """
    thr: List[float] = []
    rss: List[float] = []
    with open(path, newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            t = row.get("throughput_ops_s", "").strip()
            r = row.get("rss_mb", "").strip()
            if not t:
                continue
            try:
                thr.append(float(t))
            except ValueError:
                continue
            if r:
                try:
                    rss.append(float(r))
                except ValueError:
                    pass
    return thr, rss


def main() -> int:
    """Parse the CSV path argument, compute stats, and print a report."""
    ap = argparse.ArgumentParser()
    ap.add_argument("csv", help="epoch CSV (from scripts/soak_mixed_single_process.sh)")
    args = ap.parse_args()

    thr, rss = read_csv(args.csv)
    thr_stats = compute_stats(thr)
    # Latency proxy is computed per-epoch BEFORE taking percentiles;
    # 1e9 / percentile(throughput) would be wrong (nonlinear + order inversion).
    lat = [(1e9 / t) for t in thr if t > 0]
    lat_stats = compute_stats(lat)

    print(f"epochs={len(thr)}")
    print("")
    print("Throughput (ops/s) [NOTE: tail = low throughput]")
    print(f"  mean={thr_stats.mean:,.0f} stdev={thr_stats.stdev:,.0f} cv={thr_stats.cv*100:.2f}%")
    print(f"  p50={thr_stats.p50:,.0f} p10={thr_stats.p10:,.0f} p1={thr_stats.p1:,.0f} p0.1={thr_stats.p01:,.0f}")
    print(f"  min={thr_stats.minv:,.0f} max={thr_stats.maxv:,.0f}")
    print("")
    print("Latency proxy (ns/op) [NOTE: tail = high latency]")
    print(f"  mean={lat_stats.mean:,.2f} stdev={lat_stats.stdev:,.2f} cv={lat_stats.cv*100:.2f}%")
    print(f"  p50={lat_stats.p50:,.2f} p90={lat_stats.p90:,.2f} p99={lat_stats.p99:,.2f} p99.9={lat_stats.p999:,.2f}")
    print(f"  min={lat_stats.minv:,.2f} max={lat_stats.maxv:,.2f}")

    if rss:
        rss_stats = compute_stats(rss)
        print("")
        print("RSS (MB) [peak per epoch sample]")
        print(f"  mean={rss_stats.mean:,.2f} stdev={rss_stats.stdev:,.2f} cv={rss_stats.cv*100:.2f}%")
        print(f"  min={rss_stats.minv:,.2f} max={rss_stats.maxv:,.2f}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())