357 lines
11 KiB
Python
357 lines
11 KiB
Python
"""Tests for clawbench.dynamics."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import math
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from clawbench.dynamics import (
|
|
TOOL_FAMILIES,
|
|
Dynamics,
|
|
Regime,
|
|
Sensitivity,
|
|
SurvivalPoint,
|
|
StratumStats,
|
|
StratifiedAssessment,
|
|
_classify_tool,
|
|
_cosine_dist,
|
|
_entropy,
|
|
_js_divergence,
|
|
_levenshtein,
|
|
build_strata,
|
|
compute_dynamics,
|
|
compute_sensitivity,
|
|
find_event_step,
|
|
kaplan_meier,
|
|
stratify_by_regime,
|
|
stratify_by_tier,
|
|
)
|
|
from clawbench.schemas import (
|
|
TokenUsage,
|
|
ToolCall,
|
|
Transcript,
|
|
TranscriptMessage,
|
|
TaskRunResult,
|
|
)
|
|
|
|
|
|
# ── helpers ──────────────────────────────────────────────────────────
|
|
|
|
|
|
def _msg(role, text="", family=None, success=True, error="", ts=0, tok=100):
|
|
tcs = []
|
|
if family:
|
|
tcs.append(ToolCall(
|
|
name=f"tool_{family}", family=family,
|
|
success=success, error=error, mutating=family == "edit",
|
|
))
|
|
return TranscriptMessage(
|
|
role=role, text=text, tool_calls=tcs, timestamp_ms=ts,
|
|
usage=TokenUsage(input_tokens=tok, output_tokens=tok // 2,
|
|
total_tokens=tok + tok // 2),
|
|
)
|
|
|
|
|
|
def _simple_transcript(families, errors=None):
|
|
if errors is None:
|
|
errors = [False] * len(families)
|
|
msgs = [_msg("user", "task")]
|
|
for i, (fam, err) in enumerate(zip(families, errors)):
|
|
msgs.append(_msg("assistant", f"step {i}", family=fam,
|
|
success=not err, error="err" if err else "",
|
|
ts=(i + 1) * 1000, tok=100 + i * 10))
|
|
return Transcript(messages=msgs)
|
|
|
|
|
|
def _run(transcript, score=0.5, task_id="t1"):
|
|
return TaskRunResult(
|
|
task_id=task_id, run_index=0, transcript=transcript,
|
|
run_score=score, duration_ms=10000,
|
|
token_usage=transcript.total_usage,
|
|
)
|
|
|
|
|
|
# ── _cosine_dist ─────────────────────────────────────────────────────
|
|
|
|
|
|
def test_cosine_dist_identical():
|
|
a = np.array([1.0, 0.0, 0.5])
|
|
assert _cosine_dist(a, a) == pytest.approx(0.0, abs=1e-9)
|
|
|
|
|
|
def test_cosine_dist_orthogonal():
|
|
assert _cosine_dist(np.array([1, 0, 0.0]), np.array([0, 1, 0.0])) == pytest.approx(1.0)
|
|
|
|
|
|
def test_cosine_dist_zero_vector():
|
|
assert _cosine_dist(np.zeros(3), np.array([1, 2, 3.0])) == 1.0
|
|
|
|
|
|
# ── _entropy ─────────────────────────────────────────────────────────
|
|
|
|
|
|
def test_entropy_uniform():
|
|
assert _entropy({"a": 10, "b": 10}) == pytest.approx(1.0)
|
|
|
|
|
|
def test_entropy_single():
|
|
assert _entropy({"a": 100}) == pytest.approx(0.0)
|
|
|
|
|
|
def test_entropy_empty():
|
|
assert _entropy({}) == 0.0
|
|
|
|
|
|
# ── _js_divergence ───────────────────────────────────────────────────
|
|
|
|
|
|
def test_jsd_identical():
|
|
d = {"a": 5, "b": 5}
|
|
assert _js_divergence(d, d) == pytest.approx(0.0, abs=1e-9)
|
|
|
|
|
|
def test_jsd_disjoint():
|
|
assert _js_divergence({"a": 10}, {"b": 10}) > 0.5
|
|
|
|
|
|
# ── _levenshtein ────────────────────────────────────────────────────
|
|
|
|
|
|
def test_levenshtein_equal():
|
|
assert _levenshtein([1, 2, 3], [1, 2, 3]) == 0
|
|
|
|
|
|
def test_levenshtein_empty():
|
|
assert _levenshtein([], [1, 2]) == 2
|
|
|
|
|
|
def test_levenshtein_different():
|
|
assert _levenshtein(["a", "b"], ["c", "d"]) == 2
|
|
|
|
|
|
# ── _classify_tool ──────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.parametrize("name,expected", [
|
|
("bash_execute", "execute"),
|
|
("file_read", "read"),
|
|
("tool_edit", "edit"),
|
|
("web_browser", "browser"),
|
|
("grep_search", "search"),
|
|
("write_file", "edit"),
|
|
("run_tests", "execute"),
|
|
])
|
|
def test_classify_tool(name, expected):
|
|
assert _classify_tool(name) == expected
|
|
|
|
|
|
# ── compute_dynamics ─────────────────────────────────────────────────
|
|
|
|
|
|
def test_dynamics_basic():
|
|
t = _simple_transcript(["read", "edit", "execute", "read", "edit"])
|
|
d = compute_dynamics(t)
|
|
assert d.n_steps == 5
|
|
assert len(d.drift) == 5
|
|
assert len(d.step_size) == 5
|
|
assert len(d.entropy_series) == 5
|
|
assert len(d.tool_sequence) == 5
|
|
assert d.tool_entropy > 0
|
|
|
|
|
|
def test_dynamics_empty():
|
|
t = Transcript(messages=[_msg("user", "hi")])
|
|
d = compute_dynamics(t)
|
|
assert d.n_steps == 0
|
|
assert d.regime == Regime.unknown
|
|
|
|
|
|
def test_dynamics_trapped():
|
|
t = _simple_transcript(["execute"] * 15, errors=[True] * 15)
|
|
d = compute_dynamics(t)
|
|
assert d.regime == Regime.trapped
|
|
assert d.error_rate > 0.5
|
|
|
|
|
|
def test_dynamics_convergent():
|
|
cycle = ["read", "search", "edit", "read", "execute"] * 6
|
|
t = _simple_transcript(cycle[:30])
|
|
d = compute_dynamics(t)
|
|
assert d.regime in (Regime.convergent, Regime.limit_cycle, Regime.diffusive, Regime.unknown)
|
|
assert d.error_rate == 0.0
|
|
|
|
|
|
def test_dynamics_markov_keys():
|
|
t = _simple_transcript(["read", "edit", "read"])
|
|
d = compute_dynamics(t)
|
|
assert "read" in d.markov
|
|
assert "edit" in d.markov["read"]
|
|
|
|
|
|
def test_dynamics_constraint_index_range():
|
|
t = _simple_transcript(["read", "edit", "search", "execute", "browser", "memory"] * 3)
|
|
d = compute_dynamics(t)
|
|
assert 0 <= d.constraint_index <= 1
|
|
|
|
|
|
def test_dynamics_memory_depth():
|
|
t = _simple_transcript(["read", "edit", "read", "edit", "read", "edit"] * 3)
|
|
d = compute_dynamics(t)
|
|
assert d.memory_depth >= 0
|
|
|
|
|
|
def test_dynamics_normalizes_unknown_tool_family():
|
|
transcript = Transcript(
|
|
messages=[
|
|
_msg("user", "task"),
|
|
TranscriptMessage(
|
|
role="assistant",
|
|
text="searching",
|
|
tool_calls=[
|
|
ToolCall(
|
|
name="grep_search",
|
|
family="unknown",
|
|
success=True,
|
|
error="",
|
|
mutating=False,
|
|
)
|
|
],
|
|
timestamp_ms=1000,
|
|
usage=TokenUsage(input_tokens=10, output_tokens=5, total_tokens=15),
|
|
),
|
|
_msg("assistant", "next", family="read", ts=2000),
|
|
_msg("assistant", "done", family="edit", ts=3000),
|
|
]
|
|
)
|
|
|
|
dynamics = compute_dynamics(transcript)
|
|
|
|
assert dynamics.tool_sequence[0] == "search"
|
|
assert "search" in dynamics.markov
|
|
|
|
|
|
# ── compute_sensitivity ──────────────────────────────────────────────
|
|
|
|
|
|
def test_sensitivity_identical_runs():
|
|
t = _simple_transcript(["read", "edit", "execute"])
|
|
ra = _run(t, score=0.8)
|
|
rb = _run(t, score=0.8)
|
|
s = compute_sensitivity(ra, rb)
|
|
assert s.score_delta == pytest.approx(0.0)
|
|
assert s.tool_edit_distance == 0
|
|
|
|
|
|
def test_sensitivity_different_runs():
|
|
ta = _simple_transcript(["read", "edit", "execute"])
|
|
tb = _simple_transcript(["search", "browser", "memory"])
|
|
ra = _run(ta, score=0.9)
|
|
rb = _run(tb, score=0.3)
|
|
s = compute_sensitivity(ra, rb)
|
|
assert s.score_delta == pytest.approx(0.6)
|
|
assert s.tool_edit_distance > 0
|
|
assert s.family_js_divergence > 0
|
|
|
|
|
|
# ── kaplan_meier ─────────────────────────────────────────────────────
|
|
|
|
|
|
def test_km_basic():
|
|
pts = kaplan_meier([1, 2, 3])
|
|
assert pts[0].time == 0.0
|
|
assert pts[0].survival == 1.0
|
|
assert pts[-1].survival == pytest.approx(0.0)
|
|
|
|
|
|
def test_km_with_censoring():
|
|
pts = kaplan_meier([1, 5, 3], censored=[False, True, False])
|
|
assert len(pts) == 3
|
|
assert pts[-1].survival > 0
|
|
|
|
|
|
def test_km_empty():
|
|
assert kaplan_meier([]) == []
|
|
|
|
|
|
# ── find_event_step ──────────────────────────────────────────────────
|
|
|
|
|
|
def test_find_first_correct_write():
|
|
t = _simple_transcript(["read", "search", "edit", "execute"])
|
|
assert find_event_step(t, "first_correct_write") == 2.0
|
|
|
|
|
|
def test_find_first_error_recovery():
|
|
t = _simple_transcript(
|
|
["read", "execute", "read"],
|
|
errors=[False, True, False],
|
|
)
|
|
assert find_event_step(t, "first_error_recovery") == 2.0
|
|
|
|
|
|
def test_find_task_completion():
|
|
t = _simple_transcript(["read", "edit"])
|
|
assert find_event_step(t, "task_completion") == 1.0
|
|
|
|
|
|
def test_find_event_none():
|
|
t = _simple_transcript(["read", "read"])
|
|
assert find_event_step(t, "first_correct_write") is None
|
|
|
|
|
|
# ── build_strata + reweight ──────────────────────────────────────────
|
|
|
|
|
|
def test_build_strata_by_tier():
|
|
runs, dyns, scores = [], [], []
|
|
for tid, sc in [("t1-a", 0.8), ("t1-b", 0.6), ("t2-a", 0.4), ("t2-b", 0.3)]:
|
|
t = _simple_transcript(["read", "edit", "execute"])
|
|
r = _run(t, score=sc, task_id=tid)
|
|
runs.append(r)
|
|
dyns.append(compute_dynamics(t))
|
|
scores.append(sc)
|
|
|
|
sa = build_strata(runs, dyns, scores, stratify_by_tier, "tier")
|
|
assert sa.total_runs == 4
|
|
names = sa.stratum_names()
|
|
assert "tier1" in names
|
|
assert "tier2" in names
|
|
for s in sa.strata:
|
|
assert s.n_runs == 2
|
|
assert s.weight == pytest.approx(0.5)
|
|
|
|
|
|
def test_reweight_shifts_mean():
|
|
runs, dyns, scores = [], [], []
|
|
for tid, sc in [("t1-a", 0.9), ("t1-b", 0.8), ("t2-a", 0.2), ("t2-b", 0.1)]:
|
|
t = _simple_transcript(["read", "edit", "execute"])
|
|
r = _run(t, score=sc, task_id=tid)
|
|
runs.append(r)
|
|
dyns.append(compute_dynamics(t))
|
|
scores.append(sc)
|
|
|
|
sa = build_strata(runs, dyns, scores, stratify_by_tier, "tier")
|
|
|
|
# Reweight towards tier1 (high scores)
|
|
high = sa.reweight({"tier1": 0.9, "tier2": 0.1})
|
|
# Reweight towards tier2 (low scores)
|
|
low = sa.reweight({"tier1": 0.1, "tier2": 0.9})
|
|
|
|
assert high["score_mean"] > low["score_mean"]
|
|
|
|
|
|
def test_reweight_unknown_stratum():
|
|
runs, dyns, scores = [], [], []
|
|
t = _simple_transcript(["read", "edit"])
|
|
r = _run(t, score=0.5, task_id="t1-x")
|
|
runs.append(r)
|
|
dyns.append(compute_dynamics(t))
|
|
scores.append(0.5)
|
|
|
|
sa = build_strata(runs, dyns, scores, stratify_by_tier, "tier")
|
|
# Reweight with a stratum that doesn't exist — should fall back
|
|
result = sa.reweight({"nonexistent": 1.0})
|
|
assert "score_mean" in result
|