| | import difflib |
| | from typing import Any, Dict, List, Optional |
| |
|
| | from .bundle import load_bundle |
| |
|
| |
|
| | def _normalize_for_compare(x: Any) -> Any: |
| | if isinstance(x, dict): |
| | return {k: _normalize_for_compare(x[k]) for k in sorted(x.keys())} |
| | if isinstance(x, list): |
| | return [_normalize_for_compare(v) for v in x] |
| | return x |
| |
|
| |
|
def _event_core(ev: Dict[str, Any]) -> Any:
    """Project an event onto its ``kind``, ``step`` and ``payload`` fields.

    Fields absent from ``ev`` are filled with ``None``. The projection is
    passed through ``_normalize_for_compare`` before being returned.
    """
    core = {}
    for field in ("kind", "step", "payload"):
        core[field] = ev.get(field)
    return _normalize_for_compare(core)
| |
|
| |
|
def build_alignment(A_events: List[Dict[str, Any]], B_events: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Pair the two event lists position by position and label each pair.

    One row is produced per index up to the longer list's length. A row's
    ``status`` is ``missing_in_A`` / ``missing_in_B`` when that side has no
    event at the index, otherwise ``same`` or ``diff`` depending on whether
    the two events' ``_event_core`` projections are equal. Each row also
    carries the ``kind`` and ``step`` of both sides (``None`` when absent).
    """

    def _field(event: Optional[Dict[str, Any]], key: str) -> Any:
        # A missing (or empty) event contributes None for every field.
        return event.get(key) if event else None

    len_a = len(A_events)
    len_b = len(B_events)

    table: List[Dict[str, Any]] = []
    for idx in range(max(len_a, len_b)):
        left = A_events[idx] if idx < len_a else None
        right = B_events[idx] if idx < len_b else None

        if left is None:
            verdict = "missing_in_A"
        elif right is None:
            verdict = "missing_in_B"
        elif _event_core(left) == _event_core(right):
            verdict = "same"
        else:
            verdict = "diff"

        entry: Dict[str, Any] = {"i": idx, "status": verdict}
        entry["kind_a"] = _field(left, "kind")
        entry["step_a"] = _field(left, "step")
        entry["kind_b"] = _field(right, "kind")
        entry["step_b"] = _field(right, "step")
        table.append(entry)
    return table
| |
|
| |
|
| | def _json_diff(a: Any, b: Any, path: str = "") -> List[Dict[str, Any]]: |
| | diffs: List[Dict[str, Any]] = [] |
| |
|
| | if type(a) != type(b): |
| | diffs.append({"path": path or "$", "kind": "type", "a": str(type(a)), "b": str(type(b))}) |
| | return diffs |
| |
|
| | if isinstance(a, dict): |
| | akeys = set(a.keys()) |
| | bkeys = set(b.keys()) |
| | for k in sorted(akeys - bkeys): |
| | diffs.append({"path": f"{path}.{k}" if path else k, "kind": "removed", "a": a[k], "b": None}) |
| | for k in sorted(bkeys - akeys): |
| | diffs.append({"path": f"{path}.{k}" if path else k, "kind": "added", "a": None, "b": b[k]}) |
| | for k in sorted(akeys & bkeys): |
| | diffs.extend(_json_diff(a[k], b[k], f"{path}.{k}" if path else k)) |
| | return diffs |
| |
|
| | if isinstance(a, list): |
| | n = max(len(a), len(b)) |
| | for i in range(n): |
| | pa = a[i] if i < len(a) else None |
| | pb = b[i] if i < len(b) else None |
| | if i >= len(a): |
| | diffs.append({"path": f"{path}[{i}]", "kind": "added", "a": None, "b": pb}) |
| | elif i >= len(b): |
| | diffs.append({"path": f"{path}[{i}]", "kind": "removed", "a": pa, "b": None}) |
| | else: |
| | diffs.extend(_json_diff(pa, pb, f"{path}[{i}]")) |
| | return diffs |
| |
|
| | if a != b: |
| | diffs.append({"path": path or "$", "kind": "value", "a": a, "b": b}) |
| | return diffs |
| |
|
| |
|
| | def _classify_divergence(kind_a: Optional[str], kind_b: Optional[str]) -> str: |
| | if kind_a != kind_b: |
| | return "control-flow" |
| | if kind_a in ("tool_call", "tool_result"): |
| | return "tool" |
| | if kind_a in ("memory_write", "memory_read"): |
| | return "memory" |
| | if kind_a in ("llm_sample", "llm_call"): |
| | return "sampling" |
| | if kind_a in ("guardrail",): |
| | return "governance" |
| | return "state" |
| |
|
| |
|
| | def _text_delta(a: str, b: str) -> str: |
| | a_lines = a.splitlines() |
| | b_lines = b.splitlines() |
| | diff = difflib.unified_diff(a_lines, b_lines, fromfile="A", tofile="B", lineterm="") |
| | return "\n".join(diff) |
| |
|
| |
|
| | def _extract_final_reward(events: List[Dict[str, Any]]) -> Optional[float]: |
| | """ |
| | Looks for last state_snapshot payload containing: |
| | - payload.reward_total |
| | - payload.metrics.reward_total |
| | """ |
| | for ev in reversed(events): |
| | if ev.get("kind") != "state_snapshot": |
| | continue |
| | p = ev.get("payload", {}) or {} |
| | if isinstance(p, dict): |
| | rt = p.get("reward_total") |
| | if isinstance(rt, (int, float)): |
| | return float(rt) |
| | m = p.get("metrics") |
| | if isinstance(m, dict): |
| | rt2 = m.get("reward_total") |
| | if isinstance(rt2, (int, float)): |
| | return float(rt2) |
| | return None |
| |
|
| |
|
| | def _event_link(manifest: Dict[str, Any], i: int) -> Optional[str]: |
| | """ |
| | Optional deep-link generation. |
| | Supported: |
| | - manifest.replay.base_url + manifest.replay.pattern with {run_id} and {i} |
| | - manifest.run_url + ?i={i} |
| | """ |
| | run_id = manifest.get("run_id") |
| | replay = manifest.get("replay") |
| |
|
| | if isinstance(replay, dict): |
| | base = replay.get("base_url") |
| | pattern = replay.get("pattern", "") |
| | if isinstance(base, str) and isinstance(pattern, str) and run_id: |
| | try: |
| | return base.rstrip("/") + pattern.format(run_id=run_id, i=i) |
| | except Exception: |
| | return None |
| |
|
| | run_url = manifest.get("run_url") |
| | if isinstance(run_url, str) and run_url: |
| | |
| | joiner = "&" if "?" in run_url else "?" |
| | return f"{run_url}{joiner}i={i}" |
| |
|
| | return None |
| |
|
| |
|
def _payload_text(ev: Dict[str, Any]) -> Optional[str]:
    """Return ``ev["payload"]["text"]`` if the payload is a dict and the text a str, else ``None``."""
    payload = ev.get("payload")
    if not isinstance(payload, dict):
        # Payloads that are missing, None, or not a mapping have no text field.
        return None
    text = payload.get("text")
    return text if isinstance(text, str) else None


def diff_bundles(zip_a: str, zip_b: str) -> Dict[str, Any]:
    """Load two bundles and report how their event streams differ.

    Both arguments are passed to ``load_bundle``; the loaded objects are read
    through their ``events`` and ``manifest`` attributes.

    Returns a dict with:

    - ``summary``: run identifiers, framework and model ids, event counts,
      the index of the first non-identical position, counts of differing and
      missing events, the final rewards of both runs with their delta
      (``B - A``, or ``None`` if either is unknown), and run links.
    - ``class_counts``: number of differing events per divergence class,
      counted over all differing events (before the cap below).
    - ``alignment``: the full position-by-position table from
      ``build_alignment``.
    - ``differences``: detailed entries for positions where both runs have an
      event and the events differ, capped at 400 entries. Each entry holds at
      most 200 field-level diffs and, when both payloads carry a differing
      string ``text``, a unified diff truncated to 20000 characters.
    """
    A = load_bundle(zip_a)
    B = load_bundle(zip_b)

    ea = A.events
    eb = B.events

    alignment = build_alignment(ea, eb)

    # First position that is not identical (different or missing on one side).
    first_div: Optional[int] = next(
        (row["i"] for row in alignment if row["status"] != "same"), None
    )

    # Detailed diffs. The alignment has already classified every position, so
    # only rows marked "diff" (both events present, cores unequal) are revisited.
    per_event: List[Dict[str, Any]] = []
    for row in alignment:
        if row["status"] != "diff":
            continue

        i = row["i"]
        ev_a = ea[i]
        ev_b = eb[i]
        kind_a = ev_a.get("kind")
        kind_b = ev_b.get("kind")

        diffs = _json_diff(_event_core(ev_a), _event_core(ev_b))
        item: Dict[str, Any] = {
            "i": i,
            "kind_a": kind_a,
            "kind_b": kind_b,
            "step_a": ev_a.get("step"),
            "step_b": ev_b.get("step"),
            "class": _classify_divergence(kind_a, kind_b),
            "diffs": diffs[:200],
            "link_a": _event_link(A.manifest, i),
            "link_b": _event_link(B.manifest, i),
        }

        # A non-dict payload (e.g. a list or a bare string) simply has no text.
        ta = _payload_text(ev_a)
        tb = _payload_text(ev_b)
        if ta is not None and tb is not None and ta != tb:
            item["text_unified_diff"] = _text_delta(ta, tb)[:20000]

        per_event.append(item)

    diff_count = sum(1 for r in alignment if r["status"] == "diff")
    missing_count = sum(1 for r in alignment if r["status"] in ("missing_in_A", "missing_in_B"))

    ra = _extract_final_reward(ea)
    rb = _extract_final_reward(eb)
    reward_delta = (rb - ra) if (ra is not None and rb is not None) else None

    # Tally of differing events per divergence class.
    counts: Dict[str, int] = {}
    for item in per_event:
        c = item["class"]
        counts[c] = counts.get(c, 0) + 1

    summary: Dict[str, Any] = {
        "run_a": A.manifest.get("run_id"),
        "run_b": B.manifest.get("run_id"),
        "framework_a": A.manifest.get("framework"),
        "framework_b": B.manifest.get("framework"),
        "model_a": A.manifest.get("model_id"),
        "model_b": B.manifest.get("model_id"),
        "events_a": len(ea),
        "events_b": len(eb),
        "first_divergence_index": first_div,
        "identical_until_index": first_div,
        "diff_event_count": diff_count,
        "missing_event_count": missing_count,
        "final_reward_a": ra,
        "final_reward_b": rb,
        "final_reward_delta": reward_delta,
        "run_link_a": _event_link(A.manifest, 0),
        "run_link_b": _event_link(B.manifest, 0),
    }

    return {
        "summary": summary,
        "class_counts": counts,
        "alignment": alignment,
        "differences": per_event[:400],
    }