clawbench/scripts/compute_constraint_index.py

225 lines
7.8 KiB
Python

#!/usr/bin/env python3
"""Compute posterior Constraint Index C(q) from cached runs.

Task-level constraint index:

    C(q) = -z(PR(q)) - z(H(q)) + z(BOPS(q))

Where:
    PR(q)   = participation ratio of the task response covariance
    H(q)    = Shannon entropy (in bits) of the normalized covariance eigenspectrum
    BOPS(q) = within-model inter-run predictability proxy (mean pairwise cosine
              similarity between repeated runs of the same model)
    z(.)    = z-score of the quantity across all tasks that have usable text

High C(q) means a task is more constrained: models and repeated runs tend to
land in a narrower response manifold. Low C(q) means the task is more open or
stylistically underconstrained.

This implementation uses a normalized bag-of-words representation built from
the full assistant trajectory text plus tool-call names and compacted inputs.
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from collections import Counter, defaultdict
from pathlib import Path
import numpy as np
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from clawbench.dynamics_archive import load_task_runs_by_model
# A token is a run of three or more ASCII lowercase letters.
WORD_RE = re.compile(r"[a-z]{3,}")

# Common English function words that tokenize() drops from every text.
STOPWORDS = {
    "the", "and", "that", "with", "this", "have", "from", "what", "your",
    "will", "can", "but", "not", "was", "are", "been", "one", "would",
    "there", "they", "their", "has", "had", "its", "were", "only", "some",
    "than", "about", "these", "which", "into", "also", "each", "when",
    "where", "them", "how", "who", "very", "much", "more", "most", "other",
    "then", "here", "such", "does", "like", "just", "make", "many", "want",
    "need", "take",
}
def _assistant_trajectory_text(run, max_chars: int = 4000) -> str:
parts = []
for message in run.transcript.assistant_messages:
if message.text:
parts.append(message.text)
for call in message.tool_calls:
parts.append(call.name)
if call.input:
parts.append(json.dumps(call.input, sort_keys=True)[:200])
return " ".join(p for p in parts if p).strip()[:max_chars]
def _fallback_text_from_any_message(run) -> str:
for msg in reversed(run.transcript.messages):
parts = []
if msg.text:
parts.append(msg.text)
for call in msg.tool_calls:
parts.append(call.name)
if call.input:
parts.append(json.dumps(call.input, sort_keys=True)[:200])
if parts:
return " ".join(parts).strip()
return ""
def tokenize(text: str) -> list[str]:
    """Lowercase ``text`` and return its WORD_RE matches that are not stopwords.

    A falsy ``text`` (``None`` or ``""``) yields an empty list.
    """
    if not text:
        return []
    candidates = WORD_RE.findall(text.lower())
    return [token for token in candidates if token not in STOPWORDS]
def build_vocab(texts: list[str], top_k: int = 500) -> dict[str, int]:
    """Map the ``top_k`` tokens with the highest document frequency to indices.

    Each text contributes at most one count per distinct token. Tokens are
    ranked by descending document frequency, with ties broken alphabetically
    so that the vocabulary is reproducible across interpreter runs.

    Args:
        texts: Raw texts; each one is passed through ``tokenize``.
        top_k: Maximum vocabulary size. ``None`` keeps every token; values
            below zero are treated as zero.

    Returns:
        Dict from token to its 0-based rank, usable as a vector column index.
    """
    doc_freq = Counter()
    for text in texts:
        doc_freq.update(set(tokenize(text)))
    # Counter.most_common breaks ties by insertion order, and the sets above
    # iterate in hash-randomized order, so rank explicitly instead.
    ranked = sorted(doc_freq.items(), key=lambda item: (-item[1], item[0]))
    limit = None if top_k is None else max(top_k, 0)
    return {word: idx for idx, (word, _) in enumerate(ranked[:limit])}
def vectorize(text: str, vocab: dict[str, int]) -> np.ndarray:
    """Turn ``text`` into an L2-normalized float32 term-count vector.

    Column ``vocab[token]`` holds how often ``token`` occurs in the tokenized
    text; tokens missing from ``vocab`` are ignored. When no vocabulary token
    occurs, the all-zero vector is returned unnormalized.
    """
    features = np.zeros(len(vocab), dtype=np.float32)
    term_counts = Counter(tokenize(text))
    for token, count in term_counts.items():
        if token not in vocab:
            continue
        features[vocab[token]] = count
    length = np.linalg.norm(features)
    if length > 0:
        return features / length
    return features
def participation_ratio(X: np.ndarray) -> float:
    """PR(X) = (tr Sigma)^2 / tr(Sigma^2), an effective dimensionality proxy.

    Args:
        X: 2-D array with one row per response and one column per feature.

    Returns:
        The participation ratio of the feature covariance of ``X``. ``1.0`` is
        returned when there are fewer than two rows, when there is a single
        feature column, or when the covariance is numerically zero.
    """
    if X.shape[0] < 2:
        return 1.0
    sigma = np.cov(X.T)
    if sigma.ndim == 0:
        # A single feature column collapses the covariance to a scalar.
        return 1.0
    total_variance = np.trace(sigma)
    # Sigma is symmetric, so tr(Sigma @ Sigma) equals the sum of its squared
    # entries; this avoids forming the full matrix product.
    trace_of_square = float(np.sum(sigma * sigma))
    if trace_of_square < 1e-12:
        return 1.0
    return float((total_variance**2) / trace_of_square)
def response_entropy(X: np.ndarray) -> float:
    """Entropy over normalized covariance eigenvalues, in bits.

    Args:
        X: 2-D array with one row per response and one column per feature.

    Returns:
        Shannon entropy of the eigenvalues of the feature covariance of ``X``
        after normalizing them to sum to one. ``0.0`` is returned when there
        are fewer than two rows, no features, or no variance at all.
    """
    if X.shape[0] < 2 or X.size == 0:
        return 0.0
    # np.cov yields a 0-d array for a single feature column; eigvalsh needs 2-D.
    sigma = np.atleast_2d(np.cov(X.T))
    # Tiny negative eigenvalues are numerical noise of a PSD matrix.
    eigs = np.clip(np.linalg.eigvalsh(sigma), 0.0, None)
    total = float(eigs.sum())
    if total < 1e-12:
        # Identical responses: flooring every eigenvalue would make the
        # spectrum uniform and report the maximum entropy instead of none.
        return 0.0
    probs = eigs / total
    probs = probs[probs > 0.0]  # 0 * log2(0) is taken as 0
    entropy = float(-np.sum(probs * np.log2(probs)))
    return entropy if entropy > 0.0 else 0.0
def bops_inter_run_predictability(run_vecs: dict[str, list[np.ndarray]]) -> float:
    """Mean within-model pairwise cosine similarity across repeated runs.

    For each model, the cosine similarity of every unordered pair of its
    non-zero run vectors is averaged; the per-model averages are then averaged
    across models. Models that do not provide at least two non-zero vectors
    contribute nothing.

    Args:
        run_vecs: Mapping from model name to that model's run vectors, all of
            the same length.

    Returns:
        The mean of the per-model mean similarities, or ``0.0`` when no model
        has a usable pair.
    """
    per_model_means = []
    for vecs in run_vecs.values():
        if len(vecs) < 2:
            continue
        matrix = np.stack(vecs).astype(np.float64)
        norms = np.linalg.norm(matrix, axis=1)
        nonzero = norms > 0
        # Zero vectors have no direction, so pairs involving them are skipped.
        unit = matrix[nonzero] / norms[nonzero, None]
        n_usable = unit.shape[0]
        if n_usable < 2:
            continue
        gram = unit @ unit.T
        # The strict upper triangle holds each unordered pair exactly once.
        pair_sims = gram[np.triu_indices(n_usable, k=1)]
        per_model_means.append(float(pair_sims.mean()))
    return float(np.mean(per_model_means)) if per_model_means else 0.0
def zscore(value: float, arr: np.ndarray) -> float:
    """Return the z-score of ``value`` relative to the values in ``arr``.

    Uses the population mean and standard deviation of ``arr``.

    Args:
        value: The value to standardize.
        arr: Reference values; any array-like of numbers is accepted.

    Returns:
        ``(value - mean) / std``, or ``0.0`` when ``arr`` is empty or its
        standard deviation is at most 1e-12.
    """
    data = np.asarray(arr, dtype=float)
    if data.size == 0:
        # Avoid the NaN (and RuntimeWarning) that mean/std give on empty input.
        return 0.0
    std = data.std()
    return float((value - data.mean()) / std) if std > 1e-12 else 0.0
def _collect_task_texts(grouped, extract_text):
    """Gather the non-empty text of every run, per task and per task and model.

    Args:
        grouped: Mapping ``model_name -> task_id -> list of runs``.
        extract_text: Callable taking a run and returning its text.

    Returns:
        ``(per_task_texts, per_task_model_texts)``: the first maps a task id to
        all of its texts, the second maps task id then model name to the texts
        of that model. Runs whose text is empty are skipped.
    """
    per_task_texts: dict[str, list[str]] = defaultdict(list)
    per_task_model_texts: dict[str, dict[str, list[str]]] = defaultdict(
        lambda: defaultdict(list)
    )
    for model_name, task_runs in grouped.items():
        for task_id, runs in task_runs.items():
            for run in runs:
                text = extract_text(run)
                if text:
                    per_task_texts[task_id].append(text)
                    per_task_model_texts[task_id][model_name].append(text)
    return per_task_texts, per_task_model_texts


def main() -> None:
    """Compute C(q) for every cached task and write ``constraint_index.json``.

    Loads cached runs from ``--archive-dir`` (optionally restricted to
    ``--tier``), builds one bag-of-words vector per run, derives PR, entropy
    and BOPS per task, z-scores each quantity across tasks, and writes the
    per-task results as JSON into ``--reports-dir``.

    Raises:
        SystemExit: If no cached runs are found or no run yields usable text.
    """
    parser = argparse.ArgumentParser(description="Compute posterior constraint index per task")
    parser.add_argument("--archive-dir", type=Path, default=Path(".clawbench/run_cache"))
    parser.add_argument("--reports-dir", type=Path, default=Path("reports"))
    parser.add_argument(
        "--tier", choices=["tier1", "tier2", "tier3", "tier4", "tier5"], default=None
    )
    args = parser.parse_args()

    grouped = load_task_runs_by_model(args.archive_dir, tier=args.tier)
    if not grouped:
        raise SystemExit(f"No cached runs found under {args.archive_dir}")

    # Prefer assistant trajectories; fall back to any message only when no run
    # at all produced assistant text.
    use_fallback_messages = False
    per_task_texts, per_task_model_texts = _collect_task_texts(
        grouped, _assistant_trajectory_text
    )
    if not per_task_texts:
        use_fallback_messages = True
        per_task_texts, per_task_model_texts = _collect_task_texts(
            grouped, _fallback_text_from_any_message
        )
    all_texts = [text for texts in per_task_texts.values() for text in texts]
    if not all_texts:
        raise SystemExit("No usable text found in cached transcripts.")

    # One vocabulary shared by all tasks keeps the vectors comparable.
    vocab = build_vocab(all_texts, top_k=500)

    per_task: dict[str, dict[str, float | str]] = {}
    for task_id in sorted(per_task_texts):
        # Vectorize each text once; the per-model lists concatenated in model
        # order are exactly this task's texts in their collected order.
        model_vecs = {
            model_name: [vectorize(text, vocab) for text in model_texts]
            for model_name, model_texts in per_task_model_texts[task_id].items()
        }
        X = np.stack([vec for vecs in model_vecs.values() for vec in vecs])
        per_task[task_id] = {
            "n_responses": len(per_task_texts[task_id]),
            "PR": participation_ratio(X),
            "entropy": response_entropy(X),
            "BOPS": bops_inter_run_predictability(model_vecs),
            # NOTE(review): the label reads "assistant_final" although the text
            # covers the whole assistant trajectory - confirm with consumers
            # of the report before renaming it.
            "data_source": "fallback_any_message" if use_fallback_messages else "assistant_final",
        }
    if not per_task:
        raise SystemExit("Not enough data to compute C(q).")

    prs = np.array([v["PR"] for v in per_task.values()])
    ents = np.array([v["entropy"] for v in per_task.values()])
    bopss = np.array([v["BOPS"] for v in per_task.values()])
    for v in per_task.values():
        z_pr = zscore(v["PR"], prs)
        z_ent = zscore(v["entropy"], ents)
        z_bops = zscore(v["BOPS"], bopss)
        v["z_PR"] = z_pr
        v["z_entropy"] = z_ent
        v["z_BOPS"] = z_bops
        # Low dimensionality and entropy, and high repeatability, raise C(q).
        v["C_q"] = -z_pr - z_ent + z_bops

    args.reports_dir.mkdir(parents=True, exist_ok=True)
    out_path = args.reports_dir / "constraint_index.json"
    out_path.write_text(json.dumps(per_task, indent=2), encoding="utf-8")
    print(f"Wrote: {out_path}")


if __name__ == "__main__":
    main()