225 lines
7.8 KiB
Python
225 lines
7.8 KiB
Python
#!/usr/bin/env python3
|
|
"""Compute posterior Constraint Index C(q) from cached runs.
|
|
|
|
Task-level constraint index:
|
|
|
|
C(q) = -z(PR(q)) - z(H(q)) + z(BOPS(q))
|
|
|
|
Where:
|
|
|
|
PR(q) = participation ratio of the task response covariance
|
|
H(q) = Shannon entropy of the covariance eigenspectrum
|
|
BOPS(q) = within-model inter-run predictability proxy
|
|
|
|
High C(q) means a task is more constrained: models and repeated runs tend to
|
|
land in a narrower response manifold. Low C(q) means the task is more open or
|
|
stylistically underconstrained.
|
|
|
|
This implementation uses a normalized bag-of-words representation built from
|
|
the full assistant trajectory text plus tool-call names and compacted inputs.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
from collections import Counter, defaultdict
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
|
|
|
from clawbench.dynamics_archive import load_task_runs_by_model
|
|
|
|
WORD_RE = re.compile(r"[a-z]{3,}")
|
|
STOPWORDS = set(
|
|
"the and that with this have from what your will can but not "
|
|
"was are been one would there they their has had its were only some "
|
|
"than about these which into also each when where them how who very "
|
|
"much more most other then here such does like just make many want need take".split()
|
|
)
|
|
|
|
|
|
def _assistant_trajectory_text(run, max_chars: int = 4000) -> str:
|
|
parts = []
|
|
for message in run.transcript.assistant_messages:
|
|
if message.text:
|
|
parts.append(message.text)
|
|
for call in message.tool_calls:
|
|
parts.append(call.name)
|
|
if call.input:
|
|
parts.append(json.dumps(call.input, sort_keys=True)[:200])
|
|
return " ".join(p for p in parts if p).strip()[:max_chars]
|
|
|
|
|
|
def _fallback_text_from_any_message(run) -> str:
|
|
for msg in reversed(run.transcript.messages):
|
|
parts = []
|
|
if msg.text:
|
|
parts.append(msg.text)
|
|
for call in msg.tool_calls:
|
|
parts.append(call.name)
|
|
if call.input:
|
|
parts.append(json.dumps(call.input, sort_keys=True)[:200])
|
|
if parts:
|
|
return " ".join(parts).strip()
|
|
return ""
|
|
|
|
|
|
def tokenize(text: str) -> list[str]:
|
|
return [w for w in WORD_RE.findall((text or "").lower()) if w not in STOPWORDS]
|
|
|
|
|
|
def build_vocab(texts: list[str], top_k: int = 500) -> dict[str, int]:
|
|
counts = Counter()
|
|
for text in texts:
|
|
counts.update(set(tokenize(text)))
|
|
return {word: idx for idx, (word, _) in enumerate(counts.most_common(top_k))}
|
|
|
|
|
|
def vectorize(text: str, vocab: dict[str, int]) -> np.ndarray:
|
|
vec = np.zeros(len(vocab), dtype=np.float32)
|
|
toks = tokenize(text)
|
|
if not toks:
|
|
return vec
|
|
counts = Counter(toks)
|
|
for word, cnt in counts.items():
|
|
if word in vocab:
|
|
vec[vocab[word]] = cnt
|
|
norm = np.linalg.norm(vec)
|
|
return vec / norm if norm > 0 else vec
|
|
|
|
|
|
def participation_ratio(X: np.ndarray) -> float:
|
|
"""PR(X) = (tr Sigma)^2 / tr(Sigma^2), an effective dimensionality proxy."""
|
|
if X.shape[0] < 2:
|
|
return 1.0
|
|
sigma = np.cov(X.T)
|
|
if sigma.ndim == 0:
|
|
return 1.0
|
|
tr = np.trace(sigma)
|
|
tr_sq = np.trace(sigma @ sigma)
|
|
if tr_sq < 1e-12:
|
|
return 1.0
|
|
return float((tr**2) / tr_sq)
|
|
|
|
|
|
def response_entropy(X: np.ndarray) -> float:
|
|
"""Entropy over normalized covariance eigenvalues, in bits."""
|
|
if X.shape[0] < 2:
|
|
return 0.0
|
|
sigma = np.cov(X.T)
|
|
eigs = np.linalg.eigvalsh(sigma)
|
|
eigs = np.clip(eigs, 1e-12, None)
|
|
probs = eigs / eigs.sum()
|
|
return float(-np.sum(probs * np.log2(probs)))
|
|
|
|
|
|
def bops_inter_run_predictability(run_vecs: dict[str, list[np.ndarray]]) -> float:
|
|
"""Mean within-model pairwise cosine similarity across repeated runs."""
|
|
per_model_means = []
|
|
for vecs in run_vecs.values():
|
|
if len(vecs) < 2:
|
|
continue
|
|
sims = []
|
|
for i in range(len(vecs)):
|
|
for j in range(i + 1, len(vecs)):
|
|
v1, v2 = vecs[i], vecs[j]
|
|
n1, n2 = np.linalg.norm(v1), np.linalg.norm(v2)
|
|
if n1 > 0 and n2 > 0:
|
|
sims.append(float(v1 @ v2 / (n1 * n2)))
|
|
if sims:
|
|
per_model_means.append(float(np.mean(sims)))
|
|
return float(np.mean(per_model_means)) if per_model_means else 0.0
|
|
|
|
|
|
def zscore(value: float, arr: np.ndarray) -> float:
|
|
std = arr.std()
|
|
return float((value - arr.mean()) / std) if std > 1e-12 else 0.0
|
|
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser(description="Compute posterior constraint index per task")
|
|
parser.add_argument("--archive-dir", type=Path, default=Path(".clawbench/run_cache"))
|
|
parser.add_argument("--reports-dir", type=Path, default=Path("reports"))
|
|
parser.add_argument("--tier", choices=["tier1", "tier2", "tier3", "tier4", "tier5"], default=None)
|
|
args = parser.parse_args()
|
|
|
|
grouped = load_task_runs_by_model(args.archive_dir, tier=args.tier)
|
|
if not grouped:
|
|
raise SystemExit(f"No cached runs found under {args.archive_dir}")
|
|
|
|
per_task_texts: dict[str, list[str]] = defaultdict(list)
|
|
per_task_model_texts: dict[str, dict[str, list[str]]] = defaultdict(lambda: defaultdict(list))
|
|
|
|
use_fallback_messages = False
|
|
for model_name, task_runs in grouped.items():
|
|
for task_id, runs in task_runs.items():
|
|
for run in runs:
|
|
text = _assistant_trajectory_text(run)
|
|
if text:
|
|
per_task_texts[task_id].append(text)
|
|
per_task_model_texts[task_id][model_name].append(text)
|
|
|
|
all_texts = [text for texts in per_task_texts.values() for text in texts]
|
|
if not all_texts:
|
|
use_fallback_messages = True
|
|
for model_name, task_runs in grouped.items():
|
|
for task_id, runs in task_runs.items():
|
|
for run in runs:
|
|
text = _fallback_text_from_any_message(run)
|
|
if text:
|
|
per_task_texts[task_id].append(text)
|
|
per_task_model_texts[task_id][model_name].append(text)
|
|
all_texts = [text for texts in per_task_texts.values() for text in texts]
|
|
|
|
if not all_texts:
|
|
raise SystemExit("No usable text found in cached transcripts.")
|
|
|
|
vocab = build_vocab(all_texts, top_k=500)
|
|
per_task: dict[str, dict[str, float | str]] = {}
|
|
for task_id, texts in sorted(per_task_texts.items()):
|
|
X = np.stack([vectorize(text, vocab) for text in texts])
|
|
pr = participation_ratio(X)
|
|
ent = response_entropy(X)
|
|
model_vecs = {
|
|
model_name: [vectorize(text, vocab) for text in model_texts]
|
|
for model_name, model_texts in per_task_model_texts[task_id].items()
|
|
}
|
|
bops = bops_inter_run_predictability(model_vecs)
|
|
per_task[task_id] = {
|
|
"n_responses": len(texts),
|
|
"PR": pr,
|
|
"entropy": ent,
|
|
"BOPS": bops,
|
|
"data_source": "fallback_any_message" if use_fallback_messages else "assistant_final",
|
|
}
|
|
|
|
if not per_task:
|
|
raise SystemExit("Not enough data to compute C(q).")
|
|
|
|
prs = np.array([v["PR"] for v in per_task.values()])
|
|
ents = np.array([v["entropy"] for v in per_task.values()])
|
|
bopss = np.array([v["BOPS"] for v in per_task.values()])
|
|
|
|
for task_id, v in per_task.items():
|
|
z_pr = zscore(v["PR"], prs)
|
|
z_ent = zscore(v["entropy"], ents)
|
|
z_bops = zscore(v["BOPS"], bopss)
|
|
v["z_PR"] = z_pr
|
|
v["z_entropy"] = z_ent
|
|
v["z_BOPS"] = z_bops
|
|
v["C_q"] = -z_pr - z_ent + z_bops
|
|
|
|
args.reports_dir.mkdir(parents=True, exist_ok=True)
|
|
out_path = args.reports_dir / "constraint_index.json"
|
|
out_path.write_text(json.dumps(per_task, indent=2), encoding="utf-8")
|
|
print(f"Wrote: {out_path}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|