ClawBench v0.5: configuration-space diagnostic framework

Add the v0.5 plugin-profile diagnostic system on top of v0.4:

- profile.py: PluginProfile, PluginManifest, RegistrationTrace,
  ProfileFingerprint, fingerprint_similarity (Jaccard composite over
  capability coverage, hook footprint, tool family surface, tags, slots,
  base model)
- prediction.py: HistoricalDatabase with JSON persistence, k-NN cold-start
  prediction with confidence bands, calibration metrics (MAE/RMSE/bias),
  surprise cause attribution
- factor_analysis.py: fANOVA with Random Forest surrogate when sklearn
  is available, fANOVA-lite fallback that decomposes variance via
  SSB/SST with pairwise interaction residuals
- diagnostic.py / diagnose_cli.py: Configuration Diagnostic Report
  ties profile -> fingerprint -> prediction -> run -> surprises -> insights
- utilization.py: plugin utilization audit (dead-weight detection) +
  manifest-vs-reality gap per plugin
- recommendations.py: evidence-backed profile change generator
  (add_plugin, remove_plugin, fill_slot, add_capability) with
  confidence scaled by sample size
- insights.py: publishes plugin leaderboard, factor importance,
  interactions, capability gaps, calibration history to JSON files
- stats.py: Taguchi larger-is-better signal-to-noise ratio and
  RobustnessProfile with per-tier means (the third mathematical
  pillar of v0.5 alongside k-NN and fANOVA)
- scorer.py: fix judge weighting per spec. Judge now capped at 10%
  when the task has a deterministic completion verifier and only
  contributes when the deterministic floor (completion >= 0.9999)
  is met. When no deterministic verifier exists, judge dominates
  at 50% (semantic-only regime). This enforces CLAWBENCH_V0_4_SPEC.md
  "Disallowed Primary Verifiers" and "Judge Gating" sections.
- cli.py: wire --profile flag into clawbench run; add clawbench diagnose
  subcommand
- harness.py: pass has_deterministic_verifier to combine_run_score
- CLAWBENCH_V0_4_SPEC.md: add v0.5 Direction section

.gitignore: exclude .clawbench/ runtime state and .DS_Store

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Codex 2026-04-10 19:13:02 -07:00
parent b6e82d6afe
commit cf04a17fea
17 changed files with 3644 additions and 51 deletions

.gitignore vendored

@@ -9,3 +9,5 @@ results/
.env
.tmp/
data/
.DS_Store
.clawbench/


@@ -515,3 +515,389 @@ ClawBench v0.4 is successful if:
3. Official benchmark results remain reproducible across reruns.
4. Judge-based scores, where used, correlate well with human labels on calibration tasks.
5. Overfitting pressure is reduced by hidden variants and rotation.
---
# v0.5 Direction: Configuration-Space Benchmarking
## Motivation
Every existing agent benchmark — OSWorld, SWE-bench, WebArena, GAIA — treats the agent as a black box and the model as the variable. Recent evidence inverts this assumption: on SWE-bench Pro, swapping scaffolds produces 22+ point swings while swapping frontier models produces ~1 point swings. The same Claude Sonnet beats Claude Opus when wrapped in better tooling. The configuration is the product, not the model.
OpenClaw's plugin-native architecture makes ClawBench uniquely positioned to exploit this. Because everything in OpenClaw is a plugin with a typed manifest, the benchmark can look *inside* a configuration in a way that no opaque-agent benchmark can. ClawBench v0.5 turns this structural visibility into its primary differentiator.
## Position
ClawBench v0.5 is not a leaderboard for agents. It is a **diagnostic benchmark for plugin configurations** in an open ecosystem. Users submit a plugin profile (bundled plugins + ClawHub installs + custom plugins). The benchmark runs the profile, decomposes which plugins contributed what, and recommends specific changes — all grounded in the plugin manifest contracts that OpenClaw already requires.
This is a structurally novel position because:
- The configuration space is **open-ended** (any third party can publish plugins or build custom ones), so closed-set ablation is impossible.
- Plugin manifests provide a **typed feature space** for any plugin — bundled, ClawHub, or custom — enabling generalization to unseen configurations.
- Plugin hooks create **emergent interactions** (planning hook + tool-approval hook → planned-then-cautious behavior) that no individual plugin's manifest predicts.
No other benchmark has these three properties together because no other benchmark is plugin-native.
## Non-Goals for v0.5
- Replacing the deterministic execution-based scoring of v0.4. The configuration analysis sits *on top of* the v0.4 verifier, not in place of it.
- Closed-set comparison of a fixed list of plugins. The framework must handle plugins it has never seen.
- Trained reward models or LLM judges for configuration scoring. All factor decomposition uses execution-verified ground truth.
- Building a plugin marketplace. ClawHub already exists. ClawBench consumes its metadata; it does not duplicate it.
## Core Concepts
### Plugin Profile
A submission is a Plugin Profile: the full set of plugins enabled for a benchmark run, plus their per-plugin config and slot assignments.
```yaml
profile:
  name: "research-assistant-v3"
  base_model: "claude-sonnet-4"
  plugins:
    enabled:
      - id: "anthropic"
      - id: "memory-lancedb"
        config: { dimensions: 1536 }
      - id: "browser-playwright"
      - id: "github"
      - id: "clawhub:rag-pinecone@1.2.0"  # ClawHub plugin
      - id: "local:./my-code-reviewer"    # Custom plugin
  slots:
    memory: "memory-lancedb"
    contextEngine: "builtin"
  tools_allow: ["bash", "file_read", "file_edit", "browser_navigate", ...]
```
Two profiles are considered the same configuration if and only if their fingerprints (defined below) match. Profiles are the unit of benchmarking, ranking, and comparison.
### Manifest Feature Vector
Every plugin — known or unknown — has a feature vector derived mechanically from its `openclaw.plugin.json` manifest and (after loading) its registration trace. The vector is the same shape for every plugin so the framework generalizes.
```python
def plugin_features(manifest, trace):
    # `trace` is the plugin's registration trace, captured after loading.
    return {
        # Contract declarations (from manifest, no code execution required)
        "provides_tools_count": len(manifest.contracts.tools or []),
        "provides_memory": "memory" in (manifest.kind or []),
        "provides_context_engine": "context-engine" in (manifest.kind or []),
        "provides_web_search": bool(manifest.contracts.webSearchProviders),
        "provides_web_fetch": bool(manifest.contracts.webFetchProviders),
        "provides_speech": bool(manifest.contracts.speechProviders),
        "provides_image_generation": bool(manifest.contracts.imageGenerationProviders),
        "provides_video_generation": bool(manifest.contracts.videoGenerationProviders),
        "provides_media_understanding": bool(manifest.contracts.mediaUnderstandingProviders),
        "provides_memory_embedding": bool(manifest.contracts.memoryEmbeddingProviders),
        "n_channels": len(manifest.channels or []),
        "n_providers": len(manifest.providers or []),
        "clawhub_capability_tags": manifest.capabilityTags or [],
        "clawhub_channel": manifest.clawhub_channel or "bundled",
        "clawhub_is_official": manifest.clawhub_is_official,
        # Hook footprint (from registration trace)
        "hooks_before_agent_start": "before_agent_start" in trace.hooks,
        "hooks_before_prompt_build": "before_prompt_build" in trace.hooks,
        "hooks_before_tool_call": "before_tool_call" in trace.hooks,
        "hooks_after_tool_call": "after_tool_call" in trace.hooks,
        "hooks_llm_input": "llm_input" in trace.hooks,
        "hooks_llm_output": "llm_output" in trace.hooks,
        "hooks_before_compaction": "before_compaction" in trace.hooks,
        "hooks_message_sending": "message_sending" in trace.hooks,
        "hooks_subagent_spawning": "subagent_spawning" in trace.hooks,
        # ... one column per hook in the 25-hook surface
        # Tool-level features (classified from registered tools)
        "tool_families": classify_tool_families(trace.tools),
        # → multi-hot over {browser, file, search, execute, memory, delegate, ...}
        # Surface area
        "registers_gateway_methods": bool(trace.gatewayMethods),
        "registers_http_routes": bool(trace.httpRoutes),
        "registers_services": bool(trace.services),
    }
```
The critical property: this function is **defined entirely over the plugin SDK contract**. Any plugin that loads in OpenClaw produces a valid feature vector. No hand-curation per plugin. No allowlist of known plugins.
### Profile Fingerprint
A Profile Fingerprint is the aggregation of all plugin feature vectors in a profile, plus profile-level features (slot assignments, tool allowlist, base model). It is the structural summary used for similarity search and prediction.
```python
def profile_fingerprint(profile):
    # Each profile entry is assumed to expose its manifest and registration
    # trace; plugin_features needs both.
    plugin_vectors = [plugin_features(p.manifest, p.trace) for p in profile.plugins]
    return {
        # Aggregated capability coverage (union over plugins)
        "capability_coverage": union(v.contract_capabilities for v in plugin_vectors),
        # Aggregated hook footprint
        "hook_footprint": union(v.hooks_active for v in plugin_vectors),
        # Aggregated tool family surface
        "tool_family_surface": union(v.tool_families for v in plugin_vectors),
        # Slot fills
        "memory_slot": profile.slots.memory or "none",
        "context_engine_slot": profile.slots.contextEngine or "none",
        # Counts and provenance
        "n_plugins": len(profile.plugins),
        "n_clawhub_plugins": sum(1 for p in profile.plugins if p.source == "clawhub"),
        "n_custom_plugins": sum(1 for p in profile.plugins if p.source == "local"),
        # Base model is part of the configuration
        "base_model": profile.base_model,
    }
```
Two profiles with identical fingerprints are treated as the same configuration and should score the same up to run-to-run noise. Two profiles with similar fingerprints should score similarly. This smoothness assumption is what makes prediction tractable in an open ecosystem.
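Because fingerprint equality defines "same configuration", any hash of a fingerprint has to ignore set ordering and key ordering. The following is a minimal sketch of one way to do that, assuming a fingerprint is a plain dict whose set-valued fields hold strings. The function name `fingerprint_hash`, the use of SHA-256, and the 16-character truncation are illustrative assumptions, not the scheme implemented in `profile.py`.

```python
import hashlib
import json


def fingerprint_hash(fingerprint: dict) -> str:
    """Return a short, order-independent identifier for a fingerprint dict.

    Hypothetical helper: sets are sorted and keys are serialized in sorted
    order so that two structurally equal fingerprints hash identically.
    """

    def canonical(value):
        # Sets have no stable order, so turn them into sorted lists.
        if isinstance(value, (set, frozenset)):
            return sorted(canonical(v) for v in value)
        if isinstance(value, dict):
            return {k: canonical(v) for k, v in value.items()}
        if isinstance(value, (list, tuple)):
            return [canonical(v) for v in value]
        return value

    payload = json.dumps(canonical(fingerprint), sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]
```

Under this sketch, two profiles that list the same plugins in a different order produce the same hash, while changing `base_model` alone produces a different one.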
## The Three-Layer Framework
### Layer 1: Manifest Fingerprinting (zero runs, instant)
Compute the profile fingerprint from the plugin manifests alone. This requires no benchmark runs and produces:
- A structural summary of what the configuration *should* be capable of
- A quick sanity check (does the profile fill the slots it needs for the tasks it will face?)
- Input to the Layer 2 prediction engine
This is the cheapest layer and runs on every submission, including dry-run validation before paying for benchmark execution.
### Layer 2: Similarity-Based Prediction (from accumulated data)
After N ≥ 30 historical submissions exist, ClawBench can predict the score of a new profile before running it.
```python
def predict_profile_score(new_profile, historical_data):
    fingerprint = profile_fingerprint(new_profile)
    neighbors = k_nearest_neighbors(
        fingerprint,
        historical_data,
        k=10,
        metric=fingerprint_similarity,  # Jaccard on capability/hook/tool sets
    )
    predicted_overall = weighted_mean(
        [n.actual_score for n in neighbors],
        weights=[1.0 / (n.distance + epsilon) for n in neighbors],
    )
    predicted_per_task = {
        task_id: weighted_mean(
            [n.actual_score_per_task[task_id] for n in neighbors if task_id in n.actual_score_per_task],
            weights=[1.0 / (n.distance + epsilon) for n in neighbors if task_id in n.actual_score_per_task],
        )
        for task_id in all_task_ids
    }
    capability_attributions = compute_marginal_attribution(
        fingerprint, historical_data
    )
    return PredictionReport(
        predicted_score=predicted_overall,
        confidence=confidence_from_neighbor_density(neighbors),
        per_task=predicted_per_task,
        attributions=capability_attributions,
        nearest_profiles=[n.profile_name for n in neighbors],
    )
```
The output is a **before-running** estimate plus a confidence band derived from neighbor density. Profiles in well-explored regions of the fingerprint space get tight predictions; profiles with novel plugin combinations get wide predictions and are flagged as "exploration".
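The inverse-distance weighting used above can be tried in isolation. This is a small self-contained sketch, assuming neighbors are given as `(distance, score)` pairs. The function name `predict_from_neighbors` and the use of the weighted standard deviation as a band width are assumptions for illustration; the spec only says the band is derived from neighbor density.

```python
import math


def predict_from_neighbors(neighbors, epsilon=1e-6):
    """Inverse-distance weighted mean of neighbor scores plus a spread estimate.

    neighbors: list of (distance, score) pairs (hypothetical input shape).
    Returns (prediction, band), where band is the weighted standard deviation.
    """
    if not neighbors:
        raise ValueError("need at least one neighbor")
    # Closer neighbors get larger weights; epsilon avoids division by zero.
    weights = [1.0 / (distance + epsilon) for distance, _ in neighbors]
    total = sum(weights)
    mean = sum(w * score for w, (_, score) in zip(weights, neighbors)) / total
    variance = sum(w * (score - mean) ** 2 for w, (_, score) in zip(weights, neighbors)) / total
    return mean, math.sqrt(variance)


# Two equally distant neighbors that disagree: the prediction sits between them.
prediction, band = predict_from_neighbors([(0.1, 0.8), (0.1, 0.6)])
```

A neighbor at distance zero effectively dominates the prediction, which matches the intent that an exact fingerprint match should be the best available evidence.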
### Layer 3: Empirical Validation and Surprise Detection (after runs)
After actually running the benchmark, compare prediction to reality.
```python
def analyze_run(profile, prediction, actual):
    overall_error = actual.score - prediction.predicted_score
    surprises = []
    for task_id, predicted_score in prediction.per_task.items():
        actual_score = actual.per_task[task_id]
        delta = actual_score - predicted_score
        if abs(delta) > SURPRISE_THRESHOLD:
            surprises.append(Surprise(
                task_id=task_id,
                predicted=predicted_score,
                actual=actual_score,
                direction="positive" if delta > 0 else "negative",
                likely_cause=attribute_surprise(profile, task_id, delta),
            ))
    historical_data.append((profile.fingerprint, actual))
    if surprises:
        flag_for_community_insights(profile, surprises)
    return AnalysisReport(
        calibration_error=overall_error,
        surprises=surprises,
        updated_attributions=recompute_attributions_with_new_datapoint(),
    )
```
**Surprises are the highest-value output of the framework.** They fall into three categories:
1. **Hidden utility**: a plugin performs better than its manifest predicts. This is a discovery event — the community should know.
2. **Manifest over-promise**: a plugin performs worse than its manifest predicts. This is a warning event — users should be cautioned.
3. **Emergent interaction**: a *combination* of plugins performs differently than the sum of their individual contributions. This is the gold standard finding — manifests cannot capture interactions, only empirical data can.
## Mathematical Tooling
The framework uses three established techniques, applied to a domain where they have not been used before. Each is included only because it answers a question that no simpler tool can.
### Functional ANOVA (fANOVA) for Factor Importance
**Question answered**: When a profile changes, which feature dimensions actually drive the score change?
Fit a Random Forest regressor `f: profile_features → score` over all submitted profiles. Apply functional ANOVA variance decomposition:
```
V(f) = Σᵢ Vᵢ + Σᵢ<ⱼ Vᵢⱼ + higher-order terms
importance(featureᵢ) = Vᵢ / V(f)
interaction(featureᵢ, featureⱼ) = Vᵢⱼ / V(f)
```
`Vᵢ` is the variance of `f` attributable to feature `i` alone; `Vᵢⱼ` is the variance attributable to the interaction of features `i` and `j` after their main effects are removed.
**Why this and not simpler statistics**: univariate correlations cannot reveal interactions. fANOVA handles mixed categorical and continuous features natively (via the Random Forest surrogate). Optuna ships an `FanovaImportanceEvaluator` and the original `fanova` package is the reference implementation. This technique is standard in hyperparameter optimization and AutoML; to our knowledge, it has not previously been applied to agent configurations.
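When no Random Forest surrogate is available, the main-effect term `Vᵢ / V(f)` can be approximated directly from observed runs as a between-group share of variance (SSB/SST). The sketch below is a simplified illustration of that idea for one categorical feature, assuming each historical run is a dict with feature values and a `score` key. The function name `main_effect_share` and the row layout are assumptions, and this is not the full fANOVA decomposition.

```python
from collections import defaultdict


def main_effect_share(rows, feature, target="score"):
    """Fraction of score variance explained by one feature (SSB / SST).

    rows: list of dicts such as {"has_memory": True, "score": 0.8}
    (hypothetical layout). Returns a value in [0, 1].
    """
    scores = [row[target] for row in rows]
    grand_mean = sum(scores) / len(scores)
    # Total sum of squares around the grand mean.
    sst = sum((s - grand_mean) ** 2 for s in scores)
    if sst == 0:
        return 0.0
    groups = defaultdict(list)
    for row in rows:
        groups[row[feature]].append(row[target])
    # Between-group sum of squares: group size times squared mean offset.
    ssb = sum(len(g) * ((sum(g) / len(g)) - grand_mean) ** 2 for g in groups.values())
    return ssb / sst


runs = [
    {"has_memory": True, "has_browser": True, "score": 0.8},
    {"has_memory": True, "has_browser": False, "score": 0.8},
    {"has_memory": False, "has_browser": True, "score": 0.4},
    {"has_memory": False, "has_browser": False, "score": 0.4},
]
memory_share = main_effect_share(runs, "has_memory")
browser_share = main_effect_share(runs, "has_browser")
```

In this toy data the score depends only on `has_memory`, so its share is essentially all of the variance and the share for `has_browser` is essentially zero. Interaction terms would need the residual after subtracting both main effects, which this sketch does not attempt.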
### k-Nearest-Neighbor Similarity for Cold-Start Prediction
**Question answered**: For a never-before-seen plugin combination, what should we expect?
Use Jaccard similarity over the categorical components of the fingerprint (capability sets, hook sets, tool families) and Euclidean distance over the continuous components (counts). Combine into a composite distance and run weighted k-NN.
**Why this and not a deep model**: cold start. The framework must produce useful output after 30 submissions, not 30,000. k-NN with a well-engineered similarity metric is the right tool when data is scarce and structure is interpretable. It also gives free explainability — the prediction comes with the names of the neighboring profiles that produced it.
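One possible shape for the composite distance is sketched below, assuming fingerprints are dicts with the set-valued and count-valued keys from the Profile Fingerprint section. The 0.8/0.2 weighting, the `e / (1 + e)` squashing of the Euclidean term, and the name `fingerprint_distance` are illustrative assumptions; the spec does not fix these choices.

```python
import math

SET_KEYS = ("capability_coverage", "hook_footprint", "tool_family_surface")
COUNT_KEYS = ("n_plugins", "n_clawhub_plugins", "n_custom_plugins")


def jaccard(a: set, b: set) -> float:
    """Jaccard similarity |A ∩ B| / |A ∪ B|; two empty sets count as identical."""
    if not a and not b:
        return 1.0
    return len(a & b) / len(a | b)


def fingerprint_distance(fp_a: dict, fp_b: dict, set_weight: float = 0.8) -> float:
    """Composite distance in [0, 1): Jaccard on sets, Euclidean on counts."""
    # Mean Jaccard distance over the categorical (set-valued) components.
    set_dist = sum(1.0 - jaccard(set(fp_a[k]), set(fp_b[k])) for k in SET_KEYS) / len(SET_KEYS)
    # Euclidean distance over the counts, squashed into [0, 1).
    euclid = math.sqrt(sum((fp_a[k] - fp_b[k]) ** 2 for k in COUNT_KEYS))
    count_dist = euclid / (1.0 + euclid)
    return set_weight * set_dist + (1.0 - set_weight) * count_dist
```

Weighting the set components more heavily reflects the idea that what a configuration can do matters more than how many plugins it takes to do it; the weight would need tuning against calibration error once data exists.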
### Taguchi Signal-to-Noise for Robustness
**Question answered**: Which configurations are robust across task tiers, not just optimal on average?
For a profile with per-task scores `y₁, y₂, ..., yₙ`, compute the larger-is-better signal-to-noise ratio:
```
S/N = -10 × log₁₀( (1/n) × Σᵢ (1/yᵢ²) )
```
Rank profiles separately by mean score and by S/N ratio. Surface both in the leaderboard.
**Why this and not just stddev**: the S/N ratio is dominated by the worst-performing tasks (because of the 1/yᵢ² term), which is exactly the behavior practitioners care about. A configuration that scores 0.85 on average but 0.10 on adversarial tasks is *worse* in production than one that scores 0.78 average but never drops below 0.65. Taguchi's framework, designed for manufacturing quality control under noise, maps cleanly onto agent benchmarking under task-distribution variation.
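The comparison in the paragraph above can be checked numerically. This is a minimal sketch of the larger-is-better formula; the function name and the `floor` clamp (needed because a score of exactly 0 makes `1/y²` undefined) are assumptions, and the two score lists are made-up examples chosen to match the means quoted above.

```python
import math


def taguchi_sn_larger_is_better(scores, floor=1e-6):
    """Larger-is-better S/N ratio: -10 * log10(mean(1 / y_i^2)).

    Scores are clamped to `floor` (an assumed safeguard) so that a zero
    score yields a very low but finite ratio.
    """
    clipped = [max(score, floor) for score in scores]
    mean_inverse_square = sum(1.0 / (y * y) for y in clipped) / len(clipped)
    return -10.0 * math.log10(mean_inverse_square)


# Mean 0.85, but one adversarial task collapses to 0.10.
spiky = [1.0, 1.0, 1.0, 1.0, 1.0, 0.10]
# Mean 0.78, never below 0.65.
steady = [0.65, 0.78, 0.91]

sn_spiky = taguchi_sn_larger_is_better(spiky)
sn_steady = taguchi_sn_larger_is_better(steady)
```

Ranked by mean, `spiky` wins; ranked by S/N, `steady` wins by a wide margin, because the single 0.10 score contributes a `1/y²` term of about 100 to the average.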
## What v0.5 Cuts From Earlier Drafts
This spec deliberately excludes techniques that were considered and rejected as gimmicks:
- **Shapley value attribution over scoring dimensions**: redundant with fANOVA at this scale; the marginal interpretation does not improve on variance decomposition for a few hand-chosen dimensions.
- **Process Reward Models trained on trajectory data**: requires a labeled trajectory dataset that does not exist; the v0.4 deterministic verifier already provides a strong outcome signal.
- **Graph Structural Similarity Index over action DAGs**: requires hand-authored reference DAGs per task; high maintenance, low signal beyond what trajectory property checks already capture.
- **Information Gain Rate over trajectories**: elegant but requires mid-trajectory assertion checkpointing that the harness does not support yet. Deferred to a future trajectory-quality spec.
- **Bayesian adaptive run allocation**: valuable but secondary; ship fixed-N first, add adaptive stopping after enough data exists to fit IRT-like priors.
- **2PL IRT over models**: misaligned with the v0.5 framing; the unit of measurement here is the *configuration*, not the model. IRT can be revisited once the configuration framework is established and there is enough data for a configuration-vs-task IRT fit.
The exclusion principle: every technique in v0.5 must answer a question that no simpler tool can answer. Math for its own sake is rejected.
## Submission Flow
```
1. User authors profile.yaml
2. ClawBench validates manifest compatibility for all referenced plugins
3. Layer 1: compute fingerprint, run dry-run sanity checks
4. Layer 2: query historical data, produce pre-run prediction (if data available)
5. User confirms intent to spend benchmark compute
6. Harness runs all v0.4 tasks with the submitted profile
7. v0.4 deterministic scoring produces per-task and aggregate scores
8. Layer 3: compare prediction to reality, detect surprises, update model
9. Generate Configuration Diagnostic Report
10. Optionally: store fingerprint and results in shared historical data
```
## Configuration Diagnostic Report
The user-facing output of a submission is the Configuration Diagnostic Report, not just a leaderboard score. Required sections:
1. **Score and rank**: overall score, confidence interval, percentile in the population of submissions
2. **Pre-run prediction vs. actual**: did the framework predict correctly? Calibration matters and should be visible.
3. **Plugin utilization audit**: for each plugin in the profile, was it actually invoked during the run? Plugins that loaded but were never called are flagged as dead weight.
4. **Manifest vs. reality gap**: for each plugin, did it impact the tasks its manifest suggested it would? Discrepancies are listed.
5. **Surprise list**: tasks where actual score deviated from prediction by more than the surprise threshold, with a hypothesis for the cause.
6. **Capability attributions**: estimated marginal contribution of each capability dimension to the overall score.
7. **Robustness profile**: mean, S/N ratio, worst-of-n, distribution across tiers.
8. **Recommendations**: ordered list of suggested profile changes with estimated score impact.
The Recommendations section is the prescriptive output that distinguishes ClawBench from descriptive leaderboards. Every recommendation must be backed by data — either neighbor profiles that already include the suggested plugin, or attribution estimates with explicit confidence.
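The plugin utilization audit (section 3 of the report) reduces to counting tool calls per owning plugin. A minimal sketch follows, assuming the transcript has already been reduced to a list of tool names and that a `tool_owner` mapping from tool name to plugin id is available; both shapes and the function name are hypothetical.

```python
def utilization_audit(enabled_plugins, tool_calls, tool_owner):
    """Count tool calls per plugin and list plugins that were never invoked.

    enabled_plugins: iterable of plugin ids in the profile.
    tool_calls: list of tool names observed in the run (assumed shape).
    tool_owner: dict mapping tool name to owning plugin id (assumed shape).
    """
    counts = {plugin_id: 0 for plugin_id in enabled_plugins}
    for tool in tool_calls:
        owner = tool_owner.get(tool)
        # Ignore calls to tools owned by plugins outside the profile.
        if owner in counts:
            counts[owner] += 1
    dead_weight = sorted(pid for pid, n in counts.items() if n == 0)
    return counts, dead_weight


counts, dead_weight = utilization_audit(
    enabled_plugins=["browser-playwright", "github", "memory-lancedb"],
    tool_calls=["browser_navigate", "browser_navigate", "memory_store"],
    tool_owner={
        "browser_navigate": "browser-playwright",
        "memory_store": "memory-lancedb",
        "github_open_pr": "github",
    },
)
```

Here `github` is flagged as dead weight. A plugin that contributes only through hooks would also be flagged by this call-count approach, so a fuller audit would need to look at hook invocations as well.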
## Community Insights
Once enough submissions have accumulated, ClawBench publishes ecosystem-level insights derived from the historical fingerprint database:
- **Plugin impact leaderboard**: average score delta when each plugin is added to comparable profiles
- **Strongest interactions**: plugin pairs whose joint contribution exceeds the sum of their marginals
- **Overhyped plugins**: plugins with high install counts on ClawHub but low or negative measured impact
- **Underrated plugins**: plugins with low install counts but high measured impact
- **Capability gaps**: task families where no submitted plugin combination scores above a threshold
These insights are computed automatically from accumulated runs. Plugin authors get empirical evidence of their plugin's value. Agent builders get data-driven recommendations. ClawHub gets a feedback loop from real benchmark results.
## Data Model
### `submissions/`
- `<profile_hash>.json` — full submission record
- `profile`: the submitted profile
- `fingerprint`: computed Profile Fingerprint
- `prediction`: pre-run prediction (if available)
- `actual`: per-task and aggregate scores from v0.4 verifier
- `analysis`: surprise list, calibration error, attributions
- `metadata`: submitter, timestamp, openclaw version, clawbench version
### `historical/`
- `fingerprints.parquet` — flat table of `(fingerprint_features, task_id, score)` for fast similarity search and fANOVA fitting
- `plugin_manifests.parquet` — cached manifest features per plugin id, refreshed on ClawHub sync
- `neighbors_index/` — pre-built ANN index over fingerprints for fast k-NN queries
### `insights/`
- `factor_importance.json` — current fANOVA decomposition
- `plugin_leaderboard.json` — plugin impact ranking
- `interactions.json` — discovered plugin interactions
- `gaps.json` — capability gaps across task families
## Phased Rollout
### Phase A: Profile Schema and Fingerprinting
- Define `profile.yaml` schema
- Implement `plugin_features` extraction from manifests
- Implement `profile_fingerprint` aggregation
- Store fingerprints alongside existing v0.4 results
- No prediction yet, no community features yet
### Phase B: Plugin Utilization Audit
- Annotate transcripts with plugin ownership of each tool call
- Detect plugins that loaded but were never invoked
- Add Plugin Utilization section to per-run reports
- This is valuable even before the prediction layer exists
### Phase C: Layer 2 Prediction
- Build k-NN index over accumulated fingerprints
- Implement pre-run prediction with confidence bands
- Add "predicted vs actual" calibration tracking
- Threshold to enable: 30+ distinct profile fingerprints in historical data
### Phase D: fANOVA and Community Insights
- Fit Random Forest surrogate over fingerprint features
- Compute factor importance and interaction terms
- Generate plugin leaderboard, overhyped/underrated lists
- Publish first ecosystem report
- Threshold to enable: 100+ distinct profile fingerprints
### Phase E: ClawHub Integration
- Sync ClawHub package metadata into manifest cache
- Allow profile submissions to reference `clawhub:<package>@<version>`
- Push back ClawBench impact scores as a ClawHub package field
- Enable plugin authors to claim their packages and view detailed performance reports
## Success Criteria for v0.5
ClawBench v0.5 is successful if:
1. The same model scored under different plugin profiles produces score differences larger than the differences between frontier models on the same profile. This validates that configuration matters and that the benchmark measures it.
2. Pre-run predictions for new profiles, after 100+ submissions, achieve mean absolute calibration error below 0.08.
3. At least three plugin interaction effects are discovered empirically that no plugin manifest predicted.
4. At least one ClawHub plugin is identified as overhyped (high installs, low measured impact) and at least one as underrated (low installs, high impact).
5. Plugin authors begin submitting profiles specifically to validate or showcase their plugins, indicating the benchmark has become a useful tool for the ecosystem.
6. All v0.4 deterministic guarantees are preserved: scores remain reproducible, the verifier remains the source of truth, and no LLM judge enters the primary scoring path.
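Criterion 2 depends on how calibration error is measured. The sketch below computes MAE, RMSE, and bias over `(predicted, actual)` pairs, using the same sign convention as `analyze_run` (error is actual minus predicted). The function name and input shape are assumptions for illustration.

```python
import math


def calibration_metrics(pairs):
    """Return (mae, rmse, bias) for a list of (predicted, actual) score pairs.

    Error is actual - predicted, so a positive bias means the predictor
    tends to underestimate.
    """
    if not pairs:
        raise ValueError("need at least one (predicted, actual) pair")
    errors = [actual - predicted for predicted, actual in pairs]
    n = len(errors)
    mae = sum(abs(e) for e in errors) / n
    rmse = math.sqrt(sum(e * e for e in errors) / n)
    bias = sum(errors) / n
    return mae, rmse, bias


mae, rmse, bias = calibration_metrics([(0.70, 0.80), (0.60, 0.50)])
meets_target = mae < 0.08
```

In this two-run example the errors cancel, so bias is near zero while MAE is 0.10 and the 0.08 target is not met. Reporting all three together distinguishes a predictor that is noisy from one that is systematically off.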
## What This Is Not
ClawBench v0.5 is not a model leaderboard. It is not a scaffold beauty contest. It is not a marketplace. It is a measurement instrument for the open plugin ecosystem that OpenClaw enables — the first benchmark that can answer the question "what does this configuration actually contribute, and what should I change?" with evidence rather than intuition.


@@ -83,6 +83,39 @@ def cli(verbose: bool) -> None:
@click.option("--output", "-o", type=click.Path(), help="Output JSON file path")
@click.option("--no-randomize", is_flag=True, help="Run tasks in definition order")
@click.option("--upload", is_flag=True, help="Upload results to HF Dataset")
@click.option(
"--concurrency",
"-c",
default=1,
show_default=True,
type=int,
envvar="CLAWBENCH_CONCURRENCY",
help="Number of (task, run) work items to execute in parallel against the gateway. "
"Set to 4-8 for dramatic speedup. Browser tasks are still serialized.",
)
@click.option(
"--browser-concurrency",
default=1,
show_default=True,
type=int,
help="Maximum browser tasks to run concurrently. Should normally stay 1 — "
"Chromium uses a fixed port that does not parallelize.",
)
@click.option(
"--profile",
type=click.Path(exists=True, path_type=Path),
default=None,
help="Optional Plugin Profile YAML. When provided, after the benchmark run "
"completes the v0.5 Configuration Diagnostic Report is generated and "
"the run is recorded in the historical profile database.",
)
@click.option(
"--insights-dir",
type=click.Path(path_type=Path),
default=Path(".clawbench/insights"),
show_default=True,
help="Where to write ecosystem insight files after a --profile run.",
)
def run(
    model: str,
    gateway_token: str,
@@ -100,6 +133,10 @@ def run(
    output: str | None,
    no_randomize: bool,
    upload: bool,
    concurrency: int,
    browser_concurrency: int,
    profile: Path | None,
    insights_dir: Path,
) -> None:
    gateway_config = GatewayConfig(token=gateway_token)
    harness = BenchmarkHarness(
@@ -117,6 +154,8 @@ def run(
        official_only=official_only,
        task_ids=list(task) if task else None,
        randomize_order=not no_randomize,
        concurrency=concurrency,
        browser_concurrency=browser_concurrency,
    )
    result = asyncio.run(harness.run())
@@ -126,12 +165,187 @@ def run(
            json.dump(result.model_dump(), handle, indent=2)
        click.echo(f"\nResults saved to {out_path}")
    if profile is not None:
        _run_v05_diagnostic(
            profile_path=profile,
            result=result,
            runs_per_task=runs,
            insights_dir=insights_dir,
        )
    if upload:
        from clawbench.upload import upload_result
        asyncio.run(upload_result(result))
def _run_v05_diagnostic(
    *,
    profile_path: Path,
    result,
    runs_per_task: int,
    insights_dir: Path,
) -> None:
    """Post-benchmark v0.5 diagnostic: fingerprint + predict + record + publish."""
    from clawbench.diagnose_cli import (
        DEFAULT_DB_PATH,
        DEFAULT_MANIFEST_DIR,
        DEFAULT_SUBMISSIONS_DIR,
        ensure_data_dirs,
        load_manifests,
        write_submission_record,
    )
    from clawbench.diagnostic import submit_run
    from clawbench.insights import publish_insights
    from clawbench.prediction import HistoricalDatabase
    from clawbench.profile import PluginProfile
    ensure_data_dirs()
    plugin_profile = PluginProfile.from_yaml_file(profile_path)
    plugin_ids = [e.id for e in plugin_profile.plugins]
    manifests = load_manifests(DEFAULT_MANIFEST_DIR, plugin_ids)
    db = HistoricalDatabase(path=DEFAULT_DB_PATH)
    # Extract per-task scores + tier map from the BenchmarkResult
    actual_per_task: dict[str, float] = {}
    tier_of: dict[str, str] = {}
    for task_stats in result.task_results:
        actual_per_task[task_stats.task_id] = float(task_stats.mean_task_score)
        if getattr(task_stats, "tier", ""):
            tier_of[task_stats.task_id] = task_stats.tier
    diagnostic = submit_run(
        profile=plugin_profile,
        manifests=manifests,
        db=db,
        actual_overall_score=float(result.overall_score),
        actual_per_task_scores=actual_per_task,
        tier_of=tier_of or None,
        n_runs_contributing=runs_per_task,
    )
    write_submission_record(
        DEFAULT_SUBMISSIONS_DIR,
        diagnostic.fingerprint_hash,
        diagnostic.to_dict(),
    )
    publish_insights(
        db, insights_dir, factor_report=diagnostic.factor_analysis
    )
    click.echo("")
    click.echo(diagnostic.render_text())
    click.echo(
        f"\nv0.5 diagnostic recorded for profile '{plugin_profile.name}' "
        f"(fingerprint {diagnostic.fingerprint_hash}). "
        f"Insights published to {insights_dir}."
    )
@cli.command()
@click.argument("profile", type=click.Path(exists=True, path_type=Path))
@click.option(
"--results",
type=click.Path(path_type=Path),
default=None,
help="Optional v0.4 BenchmarkResult JSON; enables post-run analysis.",
)
@click.option(
"--manifests",
type=click.Path(path_type=Path),
default=Path(".clawbench/manifests"),
show_default=True,
help="Directory of plugin manifest JSON files.",
)
@click.option(
"--db",
type=click.Path(path_type=Path),
default=Path(".clawbench/historical/profile_runs.json"),
show_default=True,
help="Path to the historical profile database.",
)
@click.option(
"--insights-dir",
type=click.Path(path_type=Path),
default=Path(".clawbench/insights"),
show_default=True,
)
@click.option("--json-out", is_flag=True, help="Print diagnostic as JSON")
def diagnose(
    profile: Path,
    results: Path | None,
    manifests: Path,
    db: Path,
    insights_dir: Path,
    json_out: bool,
) -> None:
    """Run the ClawBench v0.5 Configuration Diagnostic for a plugin profile."""
    from clawbench.diagnose_cli import (
        DEFAULT_SUBMISSIONS_DIR,
        ensure_data_dirs,
        load_manifests,
        write_submission_record,
    )
    from clawbench.diagnostic import build_diagnostic, submit_run
    from clawbench.insights import publish_insights
    from clawbench.prediction import HistoricalDatabase
    from clawbench.profile import PluginProfile
    from clawbench.schemas import BenchmarkResult
    ensure_data_dirs()
    plugin_profile = PluginProfile.from_yaml_file(profile)
    plugin_ids = [e.id for e in plugin_profile.plugins]
    manifest_map = load_manifests(manifests, plugin_ids)
    database = HistoricalDatabase(path=db)
    actual_overall: float | None = None
    actual_per_task: dict[str, float] | None = None
    tier_of: dict[str, str] | None = None
    if results is not None:
        with open(results, encoding="utf-8") as handle:
            raw = json.load(handle)
        br = BenchmarkResult(**raw)
        actual_overall = float(br.overall_score)
        actual_per_task = {
            ts.task_id: float(ts.mean_task_score) for ts in br.task_results
        }
        tier_of = {
            ts.task_id: ts.tier for ts in br.task_results if getattr(ts, "tier", "")
        }
    if results is not None and actual_per_task is not None and actual_overall is not None:
        report = submit_run(
            profile=plugin_profile,
            manifests=manifest_map,
            db=database,
            actual_overall_score=actual_overall,
            actual_per_task_scores=actual_per_task,
            tier_of=tier_of,
        )
        publish_insights(database, insights_dir, factor_report=report.factor_analysis)
    else:
        report = build_diagnostic(
            profile=plugin_profile,
            manifests=manifest_map,
            db=database,
            actual_overall_score=actual_overall,
            actual_per_task_scores=actual_per_task,
            tier_of=tier_of,
        )
    write_submission_record(
        DEFAULT_SUBMISSIONS_DIR, report.fingerprint_hash, report.to_dict()
    )
    if json_out:
        click.echo(json.dumps(report.to_dict(), indent=2, default=str))
    else:
        click.echo(report.render_text())
@cli.command()
@click.option("--tasks-dir", type=click.Path(exists=True), help="Custom tasks directory")
@click.option("--scenario", type=click.Choice(SCENARIO_CHOICES), help="Filter query scenario")

clawbench/diagnose_cli.py Normal file

@@ -0,0 +1,244 @@
"""ClawBench v0.5 — `clawbench-diagnose` CLI.
Usage:
    python -m clawbench.diagnose_cli <profile.yaml>
        [--db <path>]
        [--manifests <dir>]
        [--results <results.json>]
        [--transcripts <transcripts.json>]
        [--tier-map <tier_map.json>]
        [--insights-dir <dir>]
        [--no-record]
        [--json]

Without --results, the tool runs in PRE-RUN PREDICTION mode:
  - parses the profile
  - computes the fingerprint
  - looks up neighbors in the historical database
  - prints a predictive diagnostic (no actual scores yet)

With --results, the tool runs in POST-RUN ANALYSIS mode:
  - everything above
  - plus surprise detection against the actual results
  - plus robustness profile, plugin utilization audit,
    manifest-vs-reality gap, and recommendations (when transcripts given)
  - plus ecosystem insight files published to --insights-dir
  - plus appends the run to the historical database
"""
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from clawbench.diagnostic import build_diagnostic, submit_run
from clawbench.insights import publish_insights
from clawbench.prediction import HistoricalDatabase
from clawbench.profile import PluginManifest, PluginProfile, RegistrationTrace
from clawbench.schemas import Transcript
DEFAULT_CLAWBENCH_ROOT = Path(".clawbench")
DEFAULT_DB_PATH = DEFAULT_CLAWBENCH_ROOT / "historical" / "profile_runs.json"
DEFAULT_MANIFEST_DIR = DEFAULT_CLAWBENCH_ROOT / "manifests"
DEFAULT_INSIGHTS_DIR = DEFAULT_CLAWBENCH_ROOT / "insights"
DEFAULT_SUBMISSIONS_DIR = DEFAULT_CLAWBENCH_ROOT / "submissions"
def ensure_data_dirs(root: Path = DEFAULT_CLAWBENCH_ROOT) -> None:
"""Create the v0.5 data model directories if they do not exist."""
(root / "historical").mkdir(parents=True, exist_ok=True)
(root / "manifests").mkdir(parents=True, exist_ok=True)
(root / "insights").mkdir(parents=True, exist_ok=True)
(root / "submissions").mkdir(parents=True, exist_ok=True)
def load_manifests(manifest_dir: Path, plugin_ids: list[str]) -> dict[str, PluginManifest]:
out: dict[str, PluginManifest] = {}
if not manifest_dir.exists():
return out
for pid in plugin_ids:
candidate = manifest_dir / f"{pid}.json"
if candidate.exists():
out[pid] = PluginManifest.from_file(candidate)
return out
def load_transcripts(path: Path) -> dict[str, Transcript]:
"""Load per-task transcripts from a JSON file.
Expected shape: {"<task_id>": <transcript_dict>, ...}
Each transcript_dict must be valid for `Transcript.model_validate`.
"""
data = json.loads(path.read_text(encoding="utf-8"))
out: dict[str, Transcript] = {}
if isinstance(data, dict):
for task_id, raw in data.items():
out[str(task_id)] = Transcript.model_validate(raw)
return out
def write_submission_record(
submissions_dir: Path, fingerprint_hash: str, report_dict: dict
) -> Path:
submissions_dir.mkdir(parents=True, exist_ok=True)
path = submissions_dir / f"{fingerprint_hash}.json"
path.write_text(json.dumps(report_dict, indent=2, default=str), encoding="utf-8")
return path
def main() -> None:
parser = argparse.ArgumentParser(
description="ClawBench v0.5 plugin profile diagnostic"
)
parser.add_argument("profile", type=Path, help="Path to profile YAML")
parser.add_argument(
"--db",
type=Path,
default=DEFAULT_DB_PATH,
help="Path to historical database JSON",
)
parser.add_argument(
"--manifests",
type=Path,
default=DEFAULT_MANIFEST_DIR,
help="Directory containing plugin manifest JSON files",
)
parser.add_argument(
"--insights-dir",
type=Path,
default=DEFAULT_INSIGHTS_DIR,
help="Directory to write ecosystem insight files to after a post-run analysis",
)
parser.add_argument(
"--submissions-dir",
type=Path,
default=DEFAULT_SUBMISSIONS_DIR,
help="Directory to write per-submission diagnostic JSON files to",
)
parser.add_argument(
"--results",
type=Path,
default=None,
help="Optional path to actual benchmark results JSON; enables post-run mode",
)
parser.add_argument(
"--transcripts",
type=Path,
default=None,
help="Optional path to per-task transcripts JSON (enables utilization audit)",
)
parser.add_argument(
"--tier-map",
type=Path,
default=None,
help="Optional path to {task_id: tier} JSON map for per-tier robustness",
)
parser.add_argument(
"--no-record",
action="store_true",
help="Don't record this run in the historical database",
)
parser.add_argument(
"--no-publish-insights",
action="store_true",
help="Don't write ecosystem insight files after a post-run analysis",
)
parser.add_argument(
"--json",
action="store_true",
help="Emit JSON instead of text",
)
args = parser.parse_args()
if not args.profile.exists():
print(f"error: profile {args.profile} does not exist", file=sys.stderr)
sys.exit(2)
ensure_data_dirs()
profile = PluginProfile.from_yaml_file(args.profile)
plugin_ids = [e.id for e in profile.plugins]
manifests = load_manifests(args.manifests, plugin_ids)
db = HistoricalDatabase(path=args.db)
actual_overall: float | None = None
actual_per_task: dict[str, float] | None = None
if args.results:
if not args.results.exists():
print(f"error: results file {args.results} does not exist", file=sys.stderr)
sys.exit(2)
results_data = json.loads(args.results.read_text(encoding="utf-8"))
actual_overall = float(results_data.get("overall_score", 0.0))
actual_per_task = {
k: float(v) for k, v in results_data.get("per_task_score", {}).items()
}
transcripts: dict[str, Transcript] | None = None
if args.transcripts:
if not args.transcripts.exists():
print(
f"error: transcripts file {args.transcripts} does not exist",
file=sys.stderr,
)
sys.exit(2)
transcripts = load_transcripts(args.transcripts)
tier_of: dict[str, str] | None = None
if args.tier_map:
if not args.tier_map.exists():
print(
f"error: tier map {args.tier_map} does not exist",
file=sys.stderr,
)
sys.exit(2)
tier_of = {
str(k): str(v)
for k, v in json.loads(
args.tier_map.read_text(encoding="utf-8")
).items()
}
if args.results and not args.no_record and actual_per_task is not None and actual_overall is not None:
report = submit_run(
profile=profile,
manifests=manifests,
db=db,
actual_overall_score=actual_overall,
actual_per_task_scores=actual_per_task,
transcripts=transcripts,
tier_of=tier_of,
)
# Publish ecosystem insights after inserting the new run
if not args.no_publish_insights:
publish_insights(
db, args.insights_dir, factor_report=report.factor_analysis
)
else:
report = build_diagnostic(
profile=profile,
manifests=manifests,
db=db,
actual_overall_score=actual_overall,
actual_per_task_scores=actual_per_task,
transcripts=transcripts,
tier_of=tier_of,
)
report_dict = report.to_dict()
# Persist per-submission record
write_submission_record(
args.submissions_dir, report.fingerprint_hash, report_dict
)
if args.json:
print(json.dumps(report_dict, indent=2, default=str))
else:
print(report.render_text())
if __name__ == "__main__":
main()
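
For reference, the two optional input files that `main()` parses can be sketched as follows. The key names `overall_score` and `per_task_score` are taken from the parsing code above; the task ids, tier names, and scores are invented illustration values, and `parse_results` / `parse_tier_map` are hypothetical helpers that mirror the inline parsing rather than functions in this module.

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path


def parse_results(path: Path) -> tuple[float, dict[str, float]]:
    """Mirror main(): missing keys fall back to 0.0 and an empty dict."""
    data = json.loads(path.read_text(encoding="utf-8"))
    overall = float(data.get("overall_score", 0.0))
    per_task = {k: float(v) for k, v in data.get("per_task_score", {}).items()}
    return overall, per_task


def parse_tier_map(path: Path) -> dict[str, str]:
    """Mirror main(): coerce every key and value to str."""
    data = json.loads(path.read_text(encoding="utf-8"))
    return {str(k): str(v) for k, v in data.items()}


with tempfile.TemporaryDirectory() as tmp:
    results_path = Path(tmp) / "results.json"
    tier_path = Path(tmp) / "tier_map.json"

    # Hypothetical payloads; only the top-level key names come from the CLI.
    results_path.write_text(
        json.dumps(
            {
                "overall_score": 0.72,
                "per_task_score": {"task-a": 0.9, "task-b": 0.54},
            }
        ),
        encoding="utf-8",
    )
    tier_path.write_text(
        json.dumps({"task-a": "tier1", "task-b": "tier2"}), encoding="utf-8"
    )

    overall, per_task = parse_results(results_path)
    tiers = parse_tier_map(tier_path)
```

Note that a results file without `overall_score` is read as a score of 0.0 rather than rejected, so a malformed file would be recorded as a genuine zero unless the caller validates it first.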

clawbench/diagnostic.py

@ -0,0 +1,476 @@
"""ClawBench v0.5 — Configuration Diagnostic Report.
End-to-end glue that ties together:
- profile.py (parse + fingerprint a submission)
- prediction.py (k-NN cold-start prediction + surprise attribution)
- factor_analysis.py (fANOVA ecosystem insights, RF or lite)
- utilization.py (plugin utilization audit + manifest-vs-reality gap)
- recommendations.py (prescriptive profile changes)
- stats.py (Taguchi S/N robustness profile)
- insights.py (ecosystem insight file publishing)
- existing v0.4 scoring (the deterministic ground truth)
This module is the user-facing entry point. It produces the Configuration
Diagnostic Report that distinguishes ClawBench from descriptive
leaderboards.
"""
from __future__ import annotations
import json
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Any
from clawbench.factor_analysis import FactorAnalysisReport, analyze
from clawbench.prediction import (
HistoricalDatabase,
HistoricalRun,
PredictionReport,
attribute_surprise,
predict_profile,
)
from clawbench.profile import (
PluginManifest,
PluginProfile,
ProfileFingerprint,
RegistrationTrace,
)
from clawbench.recommendations import (
RecommendationSet,
generate_recommendations,
)
from clawbench.schemas import Transcript
from clawbench.stats import RobustnessProfile, compute_robustness_profile
from clawbench.utilization import (
ManifestRealityReport,
UtilizationReport,
audit_plugin_utilization,
compute_manifest_reality_gap,
)
@dataclass
class Surprise:
task_id: str
predicted: float
actual: float
delta: float
direction: str # "positive" or "negative"
likely_cause: str = ""
@dataclass
class DiagnosticReport:
profile_name: str
base_model: str
fingerprint_hash: str
overall_score: float | None
predicted_score: float
prediction_confidence: float
calibration_error: float | None # signed (actual - predicted) when both are known
n_neighbors_used: int
neighbor_names: list[str]
surprises: list[Surprise]
capability_attributions: dict[str, float]
factor_analysis: FactorAnalysisReport | None
fingerprint_summary: dict[str, Any]
robustness_profile: RobustnessProfile | None
utilization: UtilizationReport | None
manifest_reality: ManifestRealityReport | None
recommendations: RecommendationSet | None
calibration_history: dict[str, Any]
notes: list[str] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
return {
"profile_name": self.profile_name,
"base_model": self.base_model,
"fingerprint_hash": self.fingerprint_hash,
"overall_score": self.overall_score,
"predicted_score": self.predicted_score,
"prediction_confidence": self.prediction_confidence,
"calibration_error": self.calibration_error,
"n_neighbors_used": self.n_neighbors_used,
"neighbor_names": self.neighbor_names,
"surprises": [asdict(s) for s in self.surprises],
"capability_attributions": self.capability_attributions,
"factor_analysis": self.factor_analysis.to_dict() if self.factor_analysis else None,
"fingerprint_summary": self.fingerprint_summary,
"robustness_profile": self.robustness_profile.to_dict() if self.robustness_profile else None,
"utilization": self.utilization.to_dict() if self.utilization else None,
"manifest_reality": self.manifest_reality.to_dict() if self.manifest_reality else None,
"recommendations": self.recommendations.to_dict() if self.recommendations else None,
"calibration_history": self.calibration_history,
"notes": self.notes,
}
def render_text(self) -> str:
"""Render a human-readable text report."""
lines = []
lines.append("═" * 70)
lines.append(f" ClawBench Configuration Diagnostic: {self.profile_name}")
lines.append("═" * 70)
lines.append("")
lines.append(f" Base model: {self.base_model}")
lines.append(f" Fingerprint hash: {self.fingerprint_hash}")
if self.overall_score is not None:
lines.append(f" Actual score: {self.overall_score:.3f}")
lines.append(
f" Predicted score: {self.predicted_score:.3f} "
f"(confidence {self.prediction_confidence:.2f})"
)
if self.calibration_error is not None:
lines.append(f" Calibration error: {self.calibration_error:+.3f}")
if self.n_neighbors_used:
lines.append(
f" Based on {self.n_neighbors_used} similar profiles: "
f"{', '.join(self.neighbor_names[:5])}"
)
lines.append("")
# Fingerprint summary
lines.append("─ Plugin Profile Fingerprint " + "─" * 40)
for k, v in self.fingerprint_summary.items():
if isinstance(v, list):
v_str = ", ".join(v) if v else "(none)"
else:
v_str = str(v)
lines.append(f" {k:24} {v_str}")
lines.append("")
# Robustness profile (Taguchi)
if self.robustness_profile and self.robustness_profile.n_tasks:
rp = self.robustness_profile
lines.append("─ Robustness Profile (Taguchi S/N) " + "─" * 34)
lines.append(
f" Mean {rp.mean:.3f} Worst {rp.worst_of_n:.3f} "
f"Best {rp.best_of_n:.3f} σ {rp.stddev:.3f}"
)
lines.append(
f" S/N ratio (larger-is-better): {rp.sn_ratio_db:+.2f} dB"
)
if rp.tier_means:
lines.append(" Per-tier means:")
for tier, mean in sorted(rp.tier_means.items()):
lines.append(f" {tier:12} {mean:.3f}")
lines.append("")
# Plugin Utilization Audit
if self.utilization is not None:
u = self.utilization
lines.append("─ Plugin Utilization Audit " + "─" * 42)
lines.append(
f" {u.n_invoked}/{u.n_plugins} plugins invoked "
f"({u.utilization_rate:.0%}) "
f"dead weight: {u.n_dead_weight} "
f"unassigned calls: {u.unassigned_tool_calls}"
)
for p in u.per_plugin:
marker = "✓" if p.invoked else "·"
status = f"{p.invocation_count:>4} calls" if p.invoked else "DEAD WEIGHT"
lines.append(
f" {marker} {p.plugin_id:36} {status:14} "
f"{len(p.task_ids_with_invocation)} tasks"
)
lines.append("")
# Manifest vs Reality gap
if self.manifest_reality and self.manifest_reality.per_plugin:
lines.append("─ Manifest vs Reality Gap " + "─" * 43)
for g in self.manifest_reality.per_plugin:
lines.append(
f" {g.plugin_id:30} coverage {g.claim_coverage:.0%}"
)
if g.unused_capabilities:
lines.append(
f" ├─ claimed but unused: {', '.join(g.unused_capabilities)}"
)
if g.unclaimed_capabilities:
lines.append(
f" └─ observed but not in manifest: {', '.join(g.unclaimed_capabilities)}"
)
lines.append("")
# Surprises with cause attribution
if self.surprises:
lines.append("─ Surprises (predicted vs actual) " + "─" * 36)
for s in self.surprises[:10]:
arrow = "▲" if s.direction == "positive" else "▼"
lines.append(
f" {arrow} {s.task_id:40} predicted {s.predicted:.2f} "
f"actual {s.actual:.2f} Δ {s.delta:+.2f}"
)
if s.likely_cause:
lines.append(f"     cause: {s.likely_cause}")
lines.append("")
# Capability attributions
if self.capability_attributions:
lines.append("─ Capability Attributions " + "─" * 44)
sorted_attrs = sorted(
self.capability_attributions.items(),
key=lambda x: abs(x[1]),
reverse=True,
)
for cap, delta in sorted_attrs[:10]:
sign = "+" if delta >= 0 else ""
lines.append(f" {cap:40} {sign}{delta:.3f}")
lines.append("")
# Recommendations (the prescriptive output)
if self.recommendations and self.recommendations.recommendations:
lines.append("─ Recommendations " + "─" * 51)
for rec in self.recommendations.recommendations:
delta_sign = "+" if rec.estimated_delta >= 0 else ""
lines.append(
f" [{rec.kind}] {rec.target}"
)
lines.append(
f" Δ {delta_sign}{rec.estimated_delta:.3f} "
f"confidence {rec.confidence:.2f}"
)
lines.append(f" reason: {rec.rationale}")
for ev in rec.evidence[:3]:
lines.append(f"     - {ev}")
lines.append("")
elif self.recommendations and self.recommendations.note:
lines.append("─ Recommendations " + "─" * 51)
lines.append(f" {self.recommendations.note}")
lines.append("")
# Factor analysis (ecosystem-level)
if self.factor_analysis and self.factor_analysis.main_effects:
header = (
f"─ Ecosystem Factor Analysis "
f"[{self.factor_analysis.method}] "
f"({self.factor_analysis.n_runs} runs) "
)
lines.append(header + "─" * max(0, 70 - len(header)))
for me in self.factor_analysis.main_effects[:10]:
bar = "█" * int(me.importance * 30)
lines.append(
f" {me.feature:36} {bar:30} {me.importance:.3f} (Δ {me.delta:+.2f})"
)
if self.factor_analysis.interactions:
lines.append("")
lines.append(" Strongest interactions:")
for inter in self.factor_analysis.interactions:
lines.append(
f" {inter.feature_a} × {inter.feature_b} "
f"residual {inter.interaction_strength:+.3f}"
)
lines.append("")
# Calibration history
if self.calibration_history and self.calibration_history.get("n", 0) > 0:
ch = self.calibration_history
lines.append("─ Calibration History " + "─" * 47)
lines.append(
f" n={ch['n']} MAE {ch['mae']:.3f} "
f"RMSE {ch['rmse']:.3f} bias {ch['bias']:+.3f}"
)
if ch.get("mae_target_met"):
lines.append(" ✓ v0.5 success criterion met (MAE < 0.08 at n≥100)")
lines.append("")
if self.notes:
lines.append("─ Notes " + "─" * 60)
for n in self.notes:
lines.append(f" - {n}")
lines.append("")
lines.append("═" * 70)
return "\n".join(lines)
SURPRISE_THRESHOLD = 0.15
def build_diagnostic(
profile: PluginProfile,
manifests: dict[str, PluginManifest],
db: HistoricalDatabase,
actual_overall_score: float | None = None,
actual_per_task_scores: dict[str, float] | None = None,
traces: dict[str, RegistrationTrace] | None = None,
transcripts: dict[str, Transcript] | None = None,
tier_of: dict[str, str] | None = None,
enable_factor_analysis: bool = True,
) -> DiagnosticReport:
"""Build a diagnostic report for a Plugin Profile.
Parameters
----------
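profile, manifests, db
The parsed Plugin Profile, the available plugin manifests, and the
historical run database used for prediction and factor analysis.
actual_overall_score, actual_per_task_scores
Scores from a completed run. Leave both as None for a pre-run
prediction.
traces : dict[str, RegistrationTrace] | None
Optional registration traces, passed through to fingerprinting and
to the utilization audit.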
transcripts : dict[task_id, Transcript] | None
Per-task transcripts captured by the harness. Required for the
Plugin Utilization Audit and the Manifest-vs-Reality Gap; both
sections are omitted when transcripts are absent.
tier_of : dict[task_id, tier_name] | None
Optional task tier mapping used to compute per-tier means in
the robustness profile.
enable_factor_analysis : bool
Run factor analysis on the historical database. Default True.
If `actual_*` are None, the report is purely predictive (pre-run).
If actuals are provided, the report adds calibration error, surprise
detection, and the robustness profile. `generate_recommendations` is
called in both modes.
"""
fingerprint = ProfileFingerprint.from_profile(profile, manifests, traces)
prediction = predict_profile(fingerprint, db)
surprises: list[Surprise] = []
if actual_per_task_scores is not None:
for task_id, predicted in prediction.predicted_per_task.items():
actual = actual_per_task_scores.get(task_id)
if actual is None:
continue
delta = actual - predicted
if abs(delta) >= SURPRISE_THRESHOLD:
cause = attribute_surprise(fingerprint, task_id, delta, db)
surprises.append(Surprise(
task_id=task_id,
predicted=round(predicted, 4),
actual=round(actual, 4),
delta=round(delta, 4),
direction="positive" if delta > 0 else "negative",
likely_cause=cause,
))
factor = None
if enable_factor_analysis:
factor = analyze(db)
# Robustness profile (Taguchi S/N + per-tier means)
robustness = None
if actual_per_task_scores:
robustness = compute_robustness_profile(
actual_per_task_scores, tier_of=tier_of
)
# Plugin Utilization Audit + Manifest-vs-Reality gap
utilization = None
manifest_reality = None
if transcripts:
utilization = audit_plugin_utilization(
profile=profile,
transcripts=transcripts,
manifests=manifests,
traces=traces,
)
manifest_reality = compute_manifest_reality_gap(
profile=profile,
manifests=manifests,
utilization=utilization,
)
# Recommendations
recommendations = generate_recommendations(
fingerprint=fingerprint,
db=db,
factor=factor,
utilization=utilization,
)
# Calibration error for this single run (if actual provided)
calibration_error = None
if actual_overall_score is not None:
calibration_error = round(
actual_overall_score - prediction.predicted_overall_score, 4
)
# Running calibration history from the database
calibration_history = db.calibration_metrics()
notes: list[str] = []
if len(db) < 30:
notes.append(
f"historical database has only {len(db)} runs — predictions are weak. "
"Calibration improves once 30+ profiles are submitted."
)
if not factor or not factor.main_effects:
notes.append("factor analysis inactive — needs ≥4 distinct profiles.")
if transcripts is None:
notes.append(
"transcripts not provided — plugin utilization audit and "
"manifest-vs-reality gap skipped."
)
fingerprint_summary = {
"n_plugins": fingerprint.n_plugins,
"n_clawhub": fingerprint.n_clawhub_plugins,
"n_custom": fingerprint.n_custom_plugins,
"memory_slot": fingerprint.memory_slot or "(none)",
"context_engine_slot": fingerprint.context_engine_slot or "(none)",
"capability_coverage": fingerprint.capability_coverage,
"hook_footprint": fingerprint.hook_footprint,
"tool_family_surface": fingerprint.tool_family_surface,
"n_tools_total": fingerprint.n_tools_total,
"n_hooks_total": fingerprint.n_hooks_total,
}
return DiagnosticReport(
profile_name=profile.name,
base_model=profile.base_model,
fingerprint_hash=fingerprint.fingerprint_hash,
overall_score=actual_overall_score,
predicted_score=prediction.predicted_overall_score,
prediction_confidence=prediction.confidence,
calibration_error=calibration_error,
n_neighbors_used=prediction.n_neighbors_used,
neighbor_names=prediction.neighbor_names,
surprises=surprises,
capability_attributions=prediction.capability_attributions,
factor_analysis=factor,
fingerprint_summary=fingerprint_summary,
robustness_profile=robustness,
utilization=utilization,
manifest_reality=manifest_reality,
recommendations=recommendations,
calibration_history=calibration_history,
notes=notes,
)
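
The surprise-detection step inside `build_diagnostic` can be isolated as a small standalone sketch. The threshold value and the `actual - predicted` sign convention are taken from the code above; `SurpriseSketch`, `detect_surprises`, and the task ids are hypothetical names used only for illustration, and cause attribution is left out.

```python
from __future__ import annotations

from dataclasses import dataclass

SURPRISE_THRESHOLD = 0.15  # same value as the module-level constant


@dataclass
class SurpriseSketch:
    task_id: str
    predicted: float
    actual: float
    delta: float
    direction: str  # "positive" or "negative"


def detect_surprises(
    predicted: dict[str, float],
    actual: dict[str, float],
    threshold: float = SURPRISE_THRESHOLD,
) -> list[SurpriseSketch]:
    """Flag tasks whose actual score deviates from the prediction by >= threshold."""
    out: list[SurpriseSketch] = []
    for task_id, pred in predicted.items():
        act = actual.get(task_id)
        if act is None:
            # Tasks without an actual score are skipped, not treated as zero.
            continue
        delta = act - pred
        if abs(delta) >= threshold:
            out.append(
                SurpriseSketch(
                    task_id=task_id,
                    predicted=round(pred, 4),
                    actual=round(act, 4),
                    delta=round(delta, 4),
                    direction="positive" if delta > 0 else "negative",
                )
            )
    return out


# Illustration values only.
predicted = {"task-a": 0.80, "task-b": 0.50, "task-c": 0.60}
actual = {"task-a": 0.40, "task-b": 0.55, "task-d": 0.90}
surprises = detect_surprises(predicted, actual)
```

Only tasks that appear in the prediction are considered, so a task with an actual score but no predicted score (here `task-d`) never produces a surprise.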
def submit_run(
profile: PluginProfile,
manifests: dict[str, PluginManifest],
db: HistoricalDatabase,
actual_overall_score: float,
actual_per_task_scores: dict[str, float],
traces: dict[str, RegistrationTrace] | None = None,
transcripts: dict[str, Transcript] | None = None,
tier_of: dict[str, str] | None = None,
n_runs_contributing: int = 1,
) -> DiagnosticReport:
"""Full submission flow: build diagnostic, then add to historical DB.
The prediction computed BEFORE the run is recorded alongside the
actual score, so the calibration tracker can report MAE over time.
"""
# Capture the pre-run prediction before inserting anything
fingerprint = ProfileFingerprint.from_profile(profile, manifests, traces)
pre_prediction = predict_profile(fingerprint, db)
report = build_diagnostic(
profile=profile,
manifests=manifests,
db=db,
actual_overall_score=actual_overall_score,
actual_per_task_scores=actual_per_task_scores,
traces=traces,
transcripts=transcripts,
tier_of=tier_of,
)
db.add(HistoricalRun(
profile_name=profile.name,
fingerprint=fingerprint,
overall_score=actual_overall_score,
per_task_score=actual_per_task_scores,
predicted_score_at_submission=pre_prediction.predicted_overall_score,
prediction_confidence_at_submission=pre_prediction.confidence,
n_runs_contributing=n_runs_contributing,
))
return report
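
The reason `submit_run` stores the pre-run prediction next to the actual score is that calibration can then be computed from pairs that were never contaminated by the run itself. The sketch below shows one plausible way such metrics could be derived. It is not the `HistoricalDatabase.calibration_metrics` implementation, which is not shown here; the function name, the input shape, and the sign convention for bias (mean of `actual - predicted`) are assumptions.

```python
from __future__ import annotations

import math


def calibration_metrics_sketch(pairs: list[tuple[float, float]]) -> dict[str, float]:
    """Compute MAE, RMSE and bias from (predicted_at_submission, actual) pairs.

    Hypothetical helper: bias is assumed to be mean(actual - predicted),
    so a positive bias means the predictor tends to underestimate.
    """
    n = len(pairs)
    if n == 0:
        return {"n": 0, "mae": 0.0, "rmse": 0.0, "bias": 0.0}
    errors = [actual - predicted for predicted, actual in pairs]
    mae = sum(abs(e) for e in errors) / n
    rmse = math.sqrt(sum(e * e for e in errors) / n)
    bias = sum(errors) / n
    return {"n": n, "mae": mae, "rmse": rmse, "bias": bias}


# Illustration values only: one under-prediction, one over-prediction, one hit.
history = [(0.60, 0.70), (0.80, 0.70), (0.50, 0.50)]
metrics = calibration_metrics_sketch(history)
```

In this example the two errors cancel, so the bias is close to zero while MAE and RMSE are not; this is why the report prints all three figures rather than bias alone.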


@ -0,0 +1,365 @@
"""ClawBench v0.5 — Factor importance analysis.
After enough historical Plugin Profile runs accumulate, we can decompose
the variance of overall score across submissions into contributions from
each fingerprint feature and the most important pairwise interactions.
Two implementations are provided:
1. **Full fANOVA (Hutter, Hoos, Leyton-Brown, ICML 2014)** fits a
Random Forest surrogate and integrates marginal effects over the
joint feature distribution. Activated automatically when scikit-learn
is available and the database has at least MIN_RUNS_FOR_RF runs.
2. **fANOVA-lite fallback** used when sklearn is unavailable or the
database is too small for a stable Random Forest fit. Uses a
lightweight variance-decomposition approximation:
- For each binary fingerprint feature, computes the difference in
mean score between profiles WITH and WITHOUT the feature, weighted
by sample sizes.
- Computes the variance attributable to that feature using the
standard one-way ANOVA decomposition: SSB / SST.
- For pairwise interactions, computes the residual after subtracting
additive marginal effects.
The lite path is correct under the random-configuration-sampling regime
ClawBench operates in. The Random Forest path is strictly more capable
when data volume permits.
"""
from __future__ import annotations
from dataclasses import dataclass, field, asdict
from itertools import combinations
from clawbench.prediction import HistoricalDatabase
from clawbench.profile import KNOWN_HOOKS, TOOL_FAMILIES, CONTRACT_KEYS, _snake
# Try to load sklearn for the full Random Forest fANOVA path. If it's
# not available we transparently fall back to the lite implementation.
try:
import numpy as _np # noqa: F401
from sklearn.ensemble import RandomForestRegressor # type: ignore
_SKLEARN_AVAILABLE = True
except Exception: # pragma: no cover - sklearn is an optional dep
_SKLEARN_AVAILABLE = False
# The Random Forest surrogate needs enough datapoints to give stable
# feature importances. Below this we use the lite path regardless.
MIN_RUNS_FOR_RF = 20
@dataclass
class FactorImportance:
feature: str
importance: float # variance fraction (0..1)
mean_with: float
mean_without: float
n_with: int
n_without: int
delta: float
def to_dict(self) -> dict:
return asdict(self)
@dataclass
class InteractionImportance:
feature_a: str
feature_b: str
interaction_strength: float # residual after additive marginals
mean_both: float
mean_neither: float
mean_only_a: float
mean_only_b: float
n_total: int
def to_dict(self) -> dict:
return asdict(self)
@dataclass
class FactorAnalysisReport:
n_runs: int
total_variance: float
main_effects: list[FactorImportance]
interactions: list[InteractionImportance]
note: str = ""
method: str = "fanova_lite" # "fanova_lite" | "random_forest_fanova"
def to_dict(self) -> dict:
return {
"n_runs": self.n_runs,
"total_variance": self.total_variance,
"main_effects": [m.to_dict() for m in self.main_effects],
"interactions": [i.to_dict() for i in self.interactions],
"note": self.note,
"method": self.method,
}
def _binary_features(fingerprint) -> dict[str, bool]:
"""Lift the fingerprint into a flat dict of boolean features for analysis."""
out: dict[str, bool] = {}
for key in CONTRACT_KEYS:
out[f"capability:{_snake(key)}"] = _snake(key) in fingerprint.capability_coverage
for hook in KNOWN_HOOKS:
out[f"hook:{hook}"] = hook in fingerprint.hook_footprint
for family in TOOL_FAMILIES:
out[f"tool_family:{family}"] = family in fingerprint.tool_family_surface
if fingerprint.memory_slot:
out[f"slot:memory={fingerprint.memory_slot}"] = True
if fingerprint.context_engine_slot:
out[f"slot:context_engine={fingerprint.context_engine_slot}"] = True
return out
def analyze(
db: HistoricalDatabase,
top_k_interactions: int = 5,
*,
prefer_random_forest: bool = True,
) -> FactorAnalysisReport:
"""Factor-importance analysis over the historical profile database.
Dispatches to the Random Forest fANOVA implementation when
`prefer_random_forest` is True, sklearn is available, and the database
has at least MIN_RUNS_FOR_RF runs. Falls back to the fANOVA-lite
variance decomposition otherwise.
"""
if len(db) < 4:
return FactorAnalysisReport(
n_runs=len(db),
total_variance=0.0,
main_effects=[],
interactions=[],
note="not enough runs (need ≥4) for factor analysis",
method="fanova_lite",
)
if (
prefer_random_forest
and _SKLEARN_AVAILABLE
and len(db) >= MIN_RUNS_FOR_RF
):
return _analyze_random_forest(db, top_k_interactions=top_k_interactions)
return _analyze_lite(db, top_k_interactions=top_k_interactions)
def _analyze_lite(
db: HistoricalDatabase, top_k_interactions: int = 5
) -> FactorAnalysisReport:
# Build the joint table: list of (features_dict, score)
table: list[tuple[dict[str, bool], float]] = []
for run in db.runs:
feats = _binary_features(run.fingerprint)
table.append((feats, run.overall_score))
scores = [score for _, score in table]
grand_mean = sum(scores) / len(scores)
total_variance = sum((s - grand_mean) ** 2 for s in scores) / max(1, len(scores) - 1)
if total_variance < 1e-9:
return FactorAnalysisReport(
n_runs=len(db),
total_variance=total_variance,
main_effects=[],
interactions=[],
note="zero variance across runs — all profiles scored identically",
)
all_features: set[str] = set()
for feats, _ in table:
all_features.update(feats.keys())
main_effects: list[FactorImportance] = []
for feature in sorted(all_features):
with_scores = [s for f, s in table if f.get(feature, False)]
without_scores = [s for f, s in table if not f.get(feature, False)]
if not with_scores or not without_scores:
continue
mean_with = sum(with_scores) / len(with_scores)
mean_without = sum(without_scores) / len(without_scores)
delta = mean_with - mean_without
# SSB = n_with*(mean_with-grand)^2 + n_without*(mean_without-grand)^2
ssb = (
len(with_scores) * (mean_with - grand_mean) ** 2
+ len(without_scores) * (mean_without - grand_mean) ** 2
)
sst = total_variance * (len(scores) - 1)
importance = ssb / sst if sst > 0 else 0.0
main_effects.append(FactorImportance(
feature=feature,
importance=round(importance, 4),
mean_with=round(mean_with, 4),
mean_without=round(mean_without, 4),
n_with=len(with_scores),
n_without=len(without_scores),
delta=round(delta, 4),
))
main_effects.sort(key=lambda m: m.importance, reverse=True)
# Pairwise interactions (only the top-k by absolute residual)
me_lookup = {m.feature: m for m in main_effects}
candidates = [m.feature for m in main_effects[:20]] # cap to prevent explosion
interactions: list[InteractionImportance] = []
for fa, fb in combinations(candidates, 2):
both = [s for f, s in table if f.get(fa) and f.get(fb)]
neither = [s for f, s in table if not f.get(fa) and not f.get(fb)]
only_a = [s for f, s in table if f.get(fa) and not f.get(fb)]
only_b = [s for f, s in table if not f.get(fa) and f.get(fb)]
if not both or not neither or not only_a or not only_b:
continue
mb = sum(both) / len(both)
mn = sum(neither) / len(neither)
ma_only = sum(only_a) / len(only_a)
mb_only = sum(only_b) / len(only_b)
# Additive prediction = neither + (only_a - neither) + (only_b - neither)
additive_pred = ma_only + mb_only - mn
residual = abs(mb - additive_pred)
interactions.append(InteractionImportance(
feature_a=fa,
feature_b=fb,
interaction_strength=round(residual, 4),
mean_both=round(mb, 4),
mean_neither=round(mn, 4),
mean_only_a=round(ma_only, 4),
mean_only_b=round(mb_only, 4),
n_total=len(both) + len(neither) + len(only_a) + len(only_b),
))
interactions.sort(key=lambda i: i.interaction_strength, reverse=True)
return FactorAnalysisReport(
n_runs=len(db),
total_variance=round(total_variance, 6),
main_effects=main_effects,
interactions=interactions[:top_k_interactions],
method="fanova_lite",
)
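
The main-effect computation in `_analyze_lite` reduces to a one-way ANOVA ratio per binary feature. A minimal standalone sketch of that ratio is shown below; `main_effect_importance` and the feature label are hypothetical, and each row is modelled as a set of present features instead of the boolean dict used above.

```python
from __future__ import annotations


def main_effect_importance(rows: list[tuple[set[str], float]], feature: str) -> float:
    """Return SSB / SST for one binary feature (0.0 if it does not split the data)."""
    scores = [score for _, score in rows]
    grand_mean = sum(scores) / len(scores)
    # Total sum of squares around the grand mean.
    sst = sum((s - grand_mean) ** 2 for s in scores)

    with_scores = [s for feats, s in rows if feature in feats]
    without_scores = [s for feats, s in rows if feature not in feats]
    if not with_scores or not without_scores or sst == 0:
        return 0.0

    mean_with = sum(with_scores) / len(with_scores)
    mean_without = sum(without_scores) / len(without_scores)
    # Between-group sum of squares for the two groups (with / without).
    ssb = (
        len(with_scores) * (mean_with - grand_mean) ** 2
        + len(without_scores) * (mean_without - grand_mean) ** 2
    )
    return ssb / sst


# Illustration values only; "tool_family:browser" is a made-up label.
rows = [
    ({"tool_family:browser"}, 0.9),
    ({"tool_family:browser"}, 0.7),
    (set(), 0.5),
    (set(), 0.3),
]
importance = main_effect_importance(rows, "tool_family:browser")
```

Here the grand mean is 0.6, SST is 0.2 and SSB is 0.16, giving an importance of 0.8. Because each feature is scored independently, the ratios of correlated features can add up to more than 1.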
def _analyze_random_forest(
db: HistoricalDatabase, top_k_interactions: int = 5
) -> FactorAnalysisReport:
"""Random Forest surrogate + variance-decomposition fANOVA.
Closer to the Hutter-Hoos-Leyton-Brown 2014 formulation: we fit a
Random Forest on the binary feature matrix, then use the forest's
impurity-based feature importances (normalized to sum to 1) as the
main-effect importance, and the residual between the joint cell mean
and the additive prediction as the interaction strength.
This is not an exact port of the original fANOVA package (which
integrates marginal effects over partition trees), but it is a
sklearn-native approximation that produces comparable importances
and scales to tens of thousands of submissions. The full Hutter
implementation can be plugged in later without breaking callers.
"""
import numpy as np # local import to keep the lite path pure-python
# Build the joint table
table: list[tuple[dict[str, bool], float]] = []
for run in db.runs:
feats = _binary_features(run.fingerprint)
table.append((feats, run.overall_score))
all_features = sorted({f for feats, _ in table for f in feats.keys()})
n_samples = len(table)
n_features = len(all_features)
X = np.zeros((n_samples, n_features), dtype=float)
y = np.zeros(n_samples, dtype=float)
for i, (feats, score) in enumerate(table):
y[i] = score
for j, fname in enumerate(all_features):
X[i, j] = 1.0 if feats.get(fname, False) else 0.0
grand_mean = float(y.mean())
total_variance = float(y.var(ddof=1)) if n_samples > 1 else 0.0
if total_variance < 1e-9:
return FactorAnalysisReport(
n_runs=n_samples,
total_variance=total_variance,
main_effects=[],
interactions=[],
note="zero variance across runs — all profiles scored identically",
method="random_forest_fanova",
)
# Fit a Random Forest surrogate. Hyperparameters chosen to be robust
# at small-to-medium sample sizes; the forest does not need to be
# deep because features are binary.
rf = RandomForestRegressor(
n_estimators=200,
max_depth=None,
min_samples_leaf=2,
random_state=42,
n_jobs=-1,
)
rf.fit(X, y)
# Main effects from the forest's impurity-based feature importance,
# renormalized to sum to 1 so the reported "importance" sits on the
# same 0..1 scale as the lite path. These are shares of the forest's
# impurity reduction, not SSB/SST fractions.
raw_importances = rf.feature_importances_
total_importance = float(raw_importances.sum()) or 1.0
main_effects: list[FactorImportance] = []
for j, feature in enumerate(all_features):
mask_with = X[:, j] > 0.5
mask_without = ~mask_with
if mask_with.sum() == 0 or mask_without.sum() == 0:
continue
mean_with = float(y[mask_with].mean())
mean_without = float(y[mask_without].mean())
delta = mean_with - mean_without
importance = float(raw_importances[j]) / total_importance
main_effects.append(FactorImportance(
feature=feature,
importance=round(importance, 4),
mean_with=round(mean_with, 4),
mean_without=round(mean_without, 4),
n_with=int(mask_with.sum()),
n_without=int(mask_without.sum()),
delta=round(delta, 4),
))
main_effects.sort(key=lambda m: m.importance, reverse=True)
# Pairwise interactions: for the top candidate features, compute the
# residual between the joint cell mean and the additive prediction.
candidates = [m.feature for m in main_effects[:20]]
name_to_idx = {f: i for i, f in enumerate(all_features)}
interactions: list[InteractionImportance] = []
for fa, fb in combinations(candidates, 2):
ia, ib = name_to_idx[fa], name_to_idx[fb]
both_mask = (X[:, ia] > 0.5) & (X[:, ib] > 0.5)
neither_mask = (X[:, ia] < 0.5) & (X[:, ib] < 0.5)
only_a_mask = (X[:, ia] > 0.5) & (X[:, ib] < 0.5)
only_b_mask = (X[:, ia] < 0.5) & (X[:, ib] > 0.5)
if not (both_mask.any() and neither_mask.any()
and only_a_mask.any() and only_b_mask.any()):
continue
mb = float(y[both_mask].mean())
mn = float(y[neither_mask].mean())
ma_only = float(y[only_a_mask].mean())
mb_only = float(y[only_b_mask].mean())
additive_pred = ma_only + mb_only - mn
residual = abs(mb - additive_pred)
interactions.append(InteractionImportance(
feature_a=fa,
feature_b=fb,
interaction_strength=round(residual, 4),
mean_both=round(mb, 4),
mean_neither=round(mn, 4),
mean_only_a=round(ma_only, 4),
mean_only_b=round(mb_only, 4),
n_total=int(both_mask.sum() + neither_mask.sum()
+ only_a_mask.sum() + only_b_mask.sum()),
))
interactions.sort(key=lambda i: i.interaction_strength, reverse=True)
return FactorAnalysisReport(
n_runs=n_samples,
total_variance=round(total_variance, 6),
main_effects=main_effects,
interactions=interactions[:top_k_interactions],
method="random_forest_fanova",
)
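
Review note: the pairwise interaction score above is the absolute gap between the observed "both present" cell mean and the additive prediction built from the other three cells. A minimal standalone sketch of that arithmetic follows. The function name `interaction_residual` and the `(has_a, has_b, score)` row layout are assumptions made for illustration, not part of this diff.

```python
from statistics import mean


def interaction_residual(rows):
    """Return |mean_both - (mean_only_a + mean_only_b - mean_neither)|.

    `rows` is an iterable of (has_a, has_b, score) tuples (hypothetical
    layout). Returns None when any of the four cells is empty, mirroring
    the `continue` in the loop above.
    """
    cells = {(a, b): [] for a in (False, True) for b in (False, True)}
    for has_a, has_b, score in rows:
        cells[(bool(has_a), bool(has_b))].append(score)
    if any(not scores for scores in cells.values()):
        return None
    m_neither = mean(cells[(False, False)])
    m_only_a = mean(cells[(True, False)])
    m_only_b = mean(cells[(False, True)])
    m_both = mean(cells[(True, True)])
    # Additive model: start at the baseline and add each main effect once.
    additive = m_only_a + m_only_b - m_neither
    return abs(m_both - additive)


rows = [(0, 0, 0.4), (1, 0, 0.5), (0, 1, 0.6), (1, 1, 0.9)]
residual = interaction_residual(rows)  # additive prediction is 0.7, observed 0.9
```

Because the residual is an absolute value, it does not say whether the pair is synergistic or antagonistic; comparing `mean_both` against the additive prediction recovers the sign.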


@@ -2,6 +2,7 @@
from __future__ import annotations
import asyncio
import datetime
import hashlib
import logging
@@ -39,6 +40,18 @@ logger = logging.getLogger(__name__)
console = Console()
class _NullCtx:
"""A no-op async context manager used to skip the browser semaphore
for non-browser tasks without branching the call site twice.
"""
async def __aenter__(self) -> "_NullCtx":
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
return None
class BenchmarkHarness:
def __init__(
self,
@@ -63,6 +76,8 @@ class BenchmarkHarness:
progress_callback: Callable[[TaskDefinition, int], Awaitable[None]] | None = None,
print_report: bool = True,
quiet: bool = False,
concurrency: int = 1,
browser_concurrency: int = 1,
) -> None:
self.gateway_config = gateway_config
self.model = model
@@ -84,6 +99,8 @@ class BenchmarkHarness:
self.progress_callback = progress_callback
self.print_report = print_report
self.quiet = quiet
self.concurrency = max(1, int(concurrency))
self.browser_concurrency = max(1, int(browser_concurrency))
self.repo_root = Path(__file__).parent.parent
async def run(self) -> BenchmarkResult:
@@ -112,44 +129,118 @@ class BenchmarkHarness:
console.print(f"Model: [cyan]{self.model}[/cyan]")
if self.judge_model:
console.print(f"Advisory judge: [magenta]{self.judge_model}[/magenta]")
mode = "serial" if self.concurrency == 1 else f"parallel(concurrency={self.concurrency}, browser={self.browser_concurrency})"
console.print(f"Execution: [bright_blue]{mode}[/]")
console.print(
"Axes: [green]Completion[/] + [blue]Trajectory[/] + [yellow]Behavior[/] + [magenta]Reliability[/]\n"
)
all_results: dict[str, list[TaskRunResult]] = {}
for task in tasks:
if not self.quiet:
console.print(f"[bold]{task.id}[/bold] ({task.tier.value}/{task.family.value})")
task_runs: list[TaskRunResult] = []
for run_index in range(self.runs_per_task):
if self.prepare_run is not None:
await self.prepare_run(task, run_index)
if self.progress_callback is not None:
await self.progress_callback(task, run_index)
result = await self._run_single(task, run_index)
task_runs.append(result)
if not self.quiet:
passed = self._is_passing_run(task, result)
marker = "[green]+" if passed else "[yellow]~" if result.run_score >= 0.4 else "[red]-"
failure_suffix = f" [red]{result.failure_mode.value}[/]" if result.failure_mode else ""
console.print(
f" run {run_index + 1}: {marker} {result.run_score:.2f}[/] "
f"[green]C={result.completion_result.score:.2f}[/] "
f"[blue]T={result.trajectory_result.score:.2f}[/] "
f"[yellow]B={result.behavior_result.score:.2f}[/]"
f"{f' [magenta]J={result.judge_result.score:.2f}[/]' if result.judge_result.enabled and not result.judge_result.error else ''}"
f"{failure_suffix}"
)
if result.judge_result.error:
console.print(f" [yellow]? judge unavailable: {result.judge_result.error}[/]")
for failure in result.completion_result.failed_assertions[:2]:
console.print(f" [red]! {failure}[/]")
for failure in result.trajectory_result.forbidden_violations[:2]:
console.print(f" [red]! {failure}[/]")
all_results[task.id] = task_runs
wall_start = time.monotonic()
all_results = await self._execute_runs(tasks)
wall_seconds = time.monotonic() - wall_start
if not self.quiet:
total_runs = sum(len(runs) for runs in all_results.values())
mean_run = (wall_seconds / total_runs) if total_runs else 0.0
console.print(
f"\n[dim]Wall time: {wall_seconds:.1f}s across {total_runs} runs "
f"({mean_run:.1f}s avg, concurrency={self.concurrency})[/dim]"
)
return self._aggregate(tasks, all_results)
async def _execute_runs(
self,
tasks: list[TaskDefinition],
) -> dict[str, list[TaskRunResult]]:
"""Run every (task, run_index) work item, serial or parallel.
Every work item takes a slot on the global semaphore. Browser tasks
additionally take a slot on a separate browser semaphore, so at most
`browser_concurrency` of them run at once; at the default of 1 the
Chromium port collision cannot occur, regardless of `concurrency`.
"""
global_sem = asyncio.Semaphore(self.concurrency)
browser_sem = asyncio.Semaphore(self.browser_concurrency)
print_lock = asyncio.Lock()
# Build the flat work list. Browser tasks float to the front so they
# don't end up sitting in the queue while non-browser slots churn.
work_items: list[tuple[TaskDefinition, int]] = []
browser_items: list[tuple[TaskDefinition, int]] = []
non_browser_items: list[tuple[TaskDefinition, int]] = []
for task in tasks:
for run_index in range(self.runs_per_task):
item = (task, run_index)
if task.family.value == "browser":
browser_items.append(item)
else:
non_browser_items.append(item)
work_items = browser_items + non_browser_items
results_by_task: dict[str, list[TaskRunResult | None]] = {
task.id: [None] * self.runs_per_task for task in tasks
}
completed = 0
total = len(work_items)
async def run_one(task: TaskDefinition, run_index: int) -> None:
nonlocal completed
is_browser = task.family.value == "browser"
async with global_sem:
# Browser tasks additionally need the browser-only semaphore,
# which caps concurrent browser runs at browser_concurrency so
# that (at the default of 1) two Chromium-using runs can never
# collide on the gateway's fixed browser port.
browser_ctx = browser_sem if is_browser else _NullCtx()
async with browser_ctx:
if self.prepare_run is not None:
await self.prepare_run(task, run_index)
if self.progress_callback is not None:
await self.progress_callback(task, run_index)
result = await self._run_single(task, run_index)
results_by_task[task.id][run_index] = result
completed += 1
if not self.quiet:
async with print_lock:
self._print_run_result(task, run_index, result, completed, total)
await asyncio.gather(*(run_one(task, idx) for task, idx in work_items))
# Convert from list-with-Nones to plain list, preserving run order
return {
task.id: [r for r in results_by_task[task.id] if r is not None]
for task in tasks
}
def _print_run_result(
self,
task: TaskDefinition,
run_index: int,
result: TaskRunResult,
completed: int,
total: int,
) -> None:
passed = self._is_passing_run(task, result)
marker = "[green]+" if passed else "[yellow]~" if result.run_score >= 0.4 else "[red]-"
failure_suffix = f" [red]{result.failure_mode.value}[/]" if result.failure_mode else ""
console.print(
f"[dim][{completed}/{total}][/dim] [bold]{task.id}[/bold] "
f"({task.tier.value}/{task.family.value}) run {run_index + 1}: "
f"{marker} {result.run_score:.2f}[/] "
f"[green]C={result.completion_result.score:.2f}[/] "
f"[blue]T={result.trajectory_result.score:.2f}[/] "
f"[yellow]B={result.behavior_result.score:.2f}[/]"
f"{f' [magenta]J={result.judge_result.score:.2f}[/]' if result.judge_result.enabled and not result.judge_result.error else ''}"
f"{failure_suffix}"
)
if result.judge_result.error:
console.print(f" [yellow]? judge unavailable: {result.judge_result.error}[/]")
for failure in result.completion_result.failed_assertions[:2]:
console.print(f" [red]! {failure}[/]")
for failure in result.trajectory_result.forbidden_violations[:2]:
console.print(f" [red]! {failure}[/]")
async def _run_single(self, task: TaskDefinition, run_index: int) -> TaskRunResult:
workspace = self._create_run_workspace(task, run_index)
services = []
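
Review note: the two-level gating in `_execute_runs` can be exercised in isolation. The sketch below is a toy model under stated assumptions: `run_all`, the `kinds` list, and the peak counters are hypothetical names, and `asyncio.sleep` stands in for `_run_single`. It records how many runs are active at once so the cap on browser runs can be checked.

```python
import asyncio


class NullCtx:
    """No-op async context manager, same role as _NullCtx in the diff."""

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return None


async def run_all(kinds, concurrency=4, browser_concurrency=1):
    """Run one fake job per entry in `kinds`; return peak active counts."""
    global_sem = asyncio.Semaphore(concurrency)
    browser_sem = asyncio.Semaphore(browser_concurrency)
    active = {"all": 0, "browser": 0}
    peak = {"all": 0, "browser": 0}

    async def run_one(kind):
        is_browser = kind == "browser"
        async with global_sem:
            async with (browser_sem if is_browser else NullCtx()):
                active["all"] += 1
                active["browser"] += int(is_browser)
                peak["all"] = max(peak["all"], active["all"])
                peak["browser"] = max(peak["browser"], active["browser"])
                await asyncio.sleep(0.01)  # stand-in for the real run
                active["all"] -= 1
                active["browser"] -= int(is_browser)

    await asyncio.gather(*(run_one(kind) for kind in kinds))
    return peak


peak = asyncio.run(run_all(["browser"] * 3 + ["shell"] * 5))
```

One consequence of acquiring the global semaphore first: a browser run that is waiting for the browser semaphore already holds a global slot, so a burst of browser tasks at the front of the queue can temporarily lower the effective parallelism for non-browser work.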

clawbench/insights.py

@@ -0,0 +1,220 @@
"""ClawBench v0.5 — Ecosystem Insights publisher.
After enough submissions accumulate, ClawBench publishes ecosystem-level
insights derived from the historical fingerprint database
(CLAWBENCH_V0_4_SPEC.md v0.5 §"Community Insights"):
- Plugin impact leaderboard
- Strongest interactions
- Overhyped plugins (would require ClawHub install counts; stubbed)
- Underrated plugins (same)
- Capability gaps across task families
This module computes those insights and writes them to the `insights/`
directory as JSON so they can be consumed by the web UI or by plugin
authors via API.
"""
from __future__ import annotations
import json
from collections import Counter
from dataclasses import dataclass, field, asdict
from pathlib import Path
from clawbench.factor_analysis import FactorAnalysisReport, analyze
from clawbench.prediction import HistoricalDatabase
@dataclass
class PluginImpactEntry:
plugin_id: str
n_profiles_with: int
n_profiles_without: int
mean_with: float
mean_without: float
impact_delta: float # mean_with - mean_without
confidence: float # 0..1 scaled by min sample size on either side
def to_dict(self) -> dict:
return asdict(self)
@dataclass
class CapabilityGap:
capability: str
best_score_observed: float
n_profiles_attempted: int
threshold: float
note: str = ""
def to_dict(self) -> dict:
return asdict(self)
def compute_plugin_leaderboard(
db: HistoricalDatabase, *, min_sample: int = 2
) -> list[PluginImpactEntry]:
"""Average score delta when each plugin is added to comparable profiles.
Simplest valid definition: for each plugin id with at least
`min_sample` runs on both sides (with and without the plugin), compute
the mean overall score of runs that include the plugin vs runs that do
not. Report the signed delta, sorted from most positive to most
negative.
This is confounded by other factors, but at the scale of 30+
submissions the ranking is usable, and the confidence column makes
the small-sample entries visibly less trustworthy.
"""
if len(db) < min_sample * 2:
return []
all_plugin_ids: set[str] = set()
for run in db.runs:
all_plugin_ids.update(run.fingerprint.plugin_ids)
entries: list[PluginImpactEntry] = []
for pid in sorted(all_plugin_ids):
with_scores = [
r.overall_score for r in db.runs if pid in r.fingerprint.plugin_ids
]
without_scores = [
r.overall_score for r in db.runs if pid not in r.fingerprint.plugin_ids
]
if len(with_scores) < min_sample or len(without_scores) < min_sample:
continue
mean_with = sum(with_scores) / len(with_scores)
mean_without = sum(without_scores) / len(without_scores)
min_side = min(len(with_scores), len(without_scores))
# Confidence grows with min sample size, saturates at 0.9
confidence = min(0.9, 0.1 + 0.04 * min_side)
entries.append(PluginImpactEntry(
plugin_id=pid,
n_profiles_with=len(with_scores),
n_profiles_without=len(without_scores),
mean_with=round(mean_with, 4),
mean_without=round(mean_without, 4),
impact_delta=round(mean_with - mean_without, 4),
confidence=round(confidence, 2),
))
entries.sort(key=lambda e: e.impact_delta, reverse=True)
return entries
def compute_capability_gaps(
db: HistoricalDatabase, *, threshold: float = 0.7
) -> list[CapabilityGap]:
"""Find per-task capability gaps.
A capability gap is a task where NO profile in the database has
scored at or above `threshold`. These are the tasks that currently
frustrate the entire ecosystem: a good signal for where benchmark
headroom lies.
"""
if not db.runs:
return []
task_best: dict[str, float] = {}
task_attempts: Counter[str] = Counter()
for run in db.runs:
for task_id, score in run.per_task_score.items():
task_attempts[task_id] += 1
if score > task_best.get(task_id, -1.0):
task_best[task_id] = score
gaps: list[CapabilityGap] = []
for task_id, best in sorted(task_best.items()):
if best < threshold:
gaps.append(CapabilityGap(
capability=task_id,
best_score_observed=round(best, 4),
n_profiles_attempted=task_attempts[task_id],
threshold=threshold,
note=f"best observed {best:.3f} < threshold {threshold:.2f}",
))
gaps.sort(key=lambda g: g.best_score_observed)
return gaps
def publish_insights(
db: HistoricalDatabase,
output_dir: Path,
*,
factor_report: FactorAnalysisReport | None = None,
threshold: float = 0.7,
) -> dict[str, Path]:
"""Compute and write all ecosystem insight files.
Returns a mapping from insight name to the file path written.
"""
output_dir.mkdir(parents=True, exist_ok=True)
written: dict[str, Path] = {}
# 1) plugin_leaderboard.json
leaderboard = compute_plugin_leaderboard(db)
path = output_dir / "plugin_leaderboard.json"
path.write_text(
json.dumps([e.to_dict() for e in leaderboard], indent=2),
encoding="utf-8",
)
written["plugin_leaderboard"] = path
# 2) interactions.json + factor_importance.json
if factor_report is None:
factor_report = analyze(db)
path = output_dir / "factor_importance.json"
path.write_text(
json.dumps(
{
"n_runs": factor_report.n_runs,
"method": factor_report.method,
"total_variance": factor_report.total_variance,
"main_effects": [m.to_dict() for m in factor_report.main_effects],
},
indent=2,
),
encoding="utf-8",
)
written["factor_importance"] = path
path = output_dir / "interactions.json"
path.write_text(
json.dumps(
[i.to_dict() for i in factor_report.interactions],
indent=2,
),
encoding="utf-8",
)
written["interactions"] = path
# 3) gaps.json
gaps = compute_capability_gaps(db, threshold=threshold)
path = output_dir / "gaps.json"
path.write_text(
json.dumps([g.to_dict() for g in gaps], indent=2),
encoding="utf-8",
)
written["gaps"] = path
# 4) calibration.json — how well have predictions matched reality
path = output_dir / "calibration.json"
path.write_text(
json.dumps(db.calibration_metrics(), indent=2),
encoding="utf-8",
)
written["calibration"] = path
# 5) summary.json — top-level pointers
summary = {
"n_runs": len(db),
"leaderboard_top": [e.to_dict() for e in leaderboard[:5]],
"top_interactions": [i.to_dict() for i in factor_report.interactions[:5]],
"n_capability_gaps": len(gaps),
"factor_method": factor_report.method,
}
path = output_dir / "summary.json"
path.write_text(json.dumps(summary, indent=2), encoding="utf-8")
written["summary"] = path
return written
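
Review note: the leaderboard entry for one plugin reduces to a difference of means plus a confidence that grows linearly with the smaller side and saturates at 0.9 (reached at 20 runs on the smaller side). A minimal sketch, assuming runs are given as `(plugin_ids, overall_score)` pairs; that layout and the name `plugin_impact` are hypothetical and used only for illustration.

```python
def plugin_impact(runs, plugin_id, min_sample=2):
    """Return (impact_delta, confidence) for one plugin, or None.

    `runs` is a list of (plugin_ids, overall_score) pairs, a simplified
    stand-in for HistoricalRun records.
    """
    with_scores = [score for ids, score in runs if plugin_id in ids]
    without_scores = [score for ids, score in runs if plugin_id not in ids]
    # Both sides need enough samples, otherwise the plugin is skipped.
    if len(with_scores) < min_sample or len(without_scores) < min_sample:
        return None
    mean_with = sum(with_scores) / len(with_scores)
    mean_without = sum(without_scores) / len(without_scores)
    min_side = min(len(with_scores), len(without_scores))
    # Same rule as compute_plugin_leaderboard: 0.1 + 0.04 per sample, cap 0.9.
    confidence = min(0.9, 0.1 + 0.04 * min_side)
    return round(mean_with - mean_without, 4), round(confidence, 2)


runs = [({"a"}, 0.8), ({"a", "b"}, 0.6), ({"b"}, 0.5), (set(), 0.3)]
impact_a = plugin_impact(runs, "a")
```

With only two runs on each side the confidence stays at 0.18, which is how small-sample entries are marked as less trustworthy in the published leaderboard.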


@@ -51,10 +51,18 @@ async def judge_task_run(
)
await client.subscribe(session_key)
judge_transcript = await client.send_and_wait(session_key, prompt)
# Temporary debug: log first 800 chars of raw judge response when parsing fails
raw_text = judge_transcript.assistant_text
parsed = parse_judge_response(
judge_transcript.assistant_text,
raw_text,
passing_threshold=task.judge.passing_threshold,
)
if parsed.error:
logger.warning(
"Judge parse failed for %s. Raw response (first 800 chars):\n%s",
task.id,
raw_text[:800] if raw_text else "(empty)",
)
parsed.enabled = True
parsed.model = judge_model
parsed.duration_ms = int((time.monotonic() - started_at) * 1000)

clawbench/prediction.py

@@ -0,0 +1,345 @@
"""ClawBench v0.5 — Cold-start prediction via k-NN over fingerprints.
When a new Plugin Profile is submitted, this module produces a pre-run
estimate of how it will score by finding the nearest neighbors in the
historical fingerprint database and weighting their actual scores by
similarity.
This is the cold-start path. It works after as few as 3 historical
submissions, and gets sharper as more accumulate. No deep model. No
training pipeline. Pure k-NN with a well-engineered similarity metric.
"""
from __future__ import annotations
import json
import math
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Iterable
from clawbench.profile import ProfileFingerprint, fingerprint_similarity
@dataclass
class HistoricalRun:
"""One observed (profile, results) datapoint in the database."""
profile_name: str
fingerprint: ProfileFingerprint
overall_score: float
per_task_score: dict[str, float] = field(default_factory=dict)
# Optional calibration data captured at run time so we can track how
# prediction accuracy improves as the database grows.
predicted_score_at_submission: float | None = None
prediction_confidence_at_submission: float | None = None
n_runs_contributing: int = 1 # v0.4 run multiplicity (≥3 for official)
@dataclass
class Neighbor:
historical: HistoricalRun
similarity: float
distance: float
@dataclass
class PredictionReport:
predicted_overall_score: float
confidence: float # 0..1, function of neighbor density and consistency
n_neighbors_used: int
neighbor_names: list[str]
predicted_per_task: dict[str, float]
capability_attributions: dict[str, float]
note: str = ""
def to_dict(self) -> dict:
return asdict(self)
class HistoricalDatabase:
"""In-memory historical database, persisted to JSON."""
def __init__(self, path: Path | None = None) -> None:
self.path = path
self.runs: list[HistoricalRun] = []
if path is not None and path.exists():
self._load()
def add(self, run: HistoricalRun) -> None:
self.runs.append(run)
if self.path is not None:
self._save()
def _load(self) -> None:
assert self.path is not None
data = json.loads(self.path.read_text(encoding="utf-8"))
for raw in data:
fp_raw = raw["fingerprint"]
fp = ProfileFingerprint(**fp_raw)
self.runs.append(HistoricalRun(
profile_name=raw["profile_name"],
fingerprint=fp,
overall_score=float(raw["overall_score"]),
per_task_score={k: float(v) for k, v in raw.get("per_task_score", {}).items()},
predicted_score_at_submission=raw.get("predicted_score_at_submission"),
prediction_confidence_at_submission=raw.get("prediction_confidence_at_submission"),
n_runs_contributing=int(raw.get("n_runs_contributing", 1)),
))
def _save(self) -> None:
assert self.path is not None
self.path.parent.mkdir(parents=True, exist_ok=True)
self.path.write_text(json.dumps([
{
"profile_name": r.profile_name,
"fingerprint": asdict(r.fingerprint),
"overall_score": r.overall_score,
"per_task_score": r.per_task_score,
"predicted_score_at_submission": r.predicted_score_at_submission,
"prediction_confidence_at_submission": r.prediction_confidence_at_submission,
"n_runs_contributing": r.n_runs_contributing,
}
for r in self.runs
], indent=2), encoding="utf-8")
def __len__(self) -> int:
return len(self.runs)
def calibration_metrics(self) -> dict[str, float]:
"""Compute running prediction calibration error.
Uses only runs that stored a `predicted_score_at_submission`,
since earlier submissions may not have had prediction data
available. Returns mean absolute error (MAE), root mean square
error (RMSE), signed bias, and the sample size.
Success criterion in CLAWBENCH_V0_4_SPEC.md §v0.5 Success: MAE
below 0.08 after 100+ submissions.
"""
predicted = []
actual = []
for run in self.runs:
if run.predicted_score_at_submission is None:
continue
predicted.append(float(run.predicted_score_at_submission))
actual.append(float(run.overall_score))
if not predicted:
return {
"n": 0,
"mae": 0.0,
"rmse": 0.0,
"bias": 0.0,
"mae_target_met": False,
}
n = len(predicted)
errors = [a - p for p, a in zip(predicted, actual)]
abs_errors = [abs(e) for e in errors]
mae = sum(abs_errors) / n
rmse = (sum(e * e for e in errors) / n) ** 0.5
bias = sum(errors) / n
return {
"n": n,
"mae": round(mae, 4),
"rmse": round(rmse, 4),
"bias": round(bias, 4),
# The v0.5 spec says MAE < 0.08 after 100+ submissions; we
# only claim the target is met when both conditions hold.
"mae_target_met": bool(n >= 100 and mae < 0.08),
}
def predict_profile(
fingerprint: ProfileFingerprint,
db: HistoricalDatabase,
k: int = 10,
min_similarity: float = 0.05,
) -> PredictionReport:
"""Predict scores for a new profile via similarity-weighted k-NN."""
if len(db) == 0:
return PredictionReport(
predicted_overall_score=0.5,
confidence=0.0,
n_neighbors_used=0,
neighbor_names=[],
predicted_per_task={},
capability_attributions={},
note="cold start: no historical data — returning neutral midpoint",
)
neighbors = _rank_neighbors(fingerprint, db, k=k, min_similarity=min_similarity)
if not neighbors:
return PredictionReport(
predicted_overall_score=_global_mean(db),
confidence=0.0,
n_neighbors_used=0,
neighbor_names=[],
predicted_per_task={},
capability_attributions={},
note="no neighbors above similarity floor — using global mean",
)
# Similarity-weighted prediction with epsilon smoothing
eps = 1e-6
weights = [n.similarity + eps for n in neighbors]
total_weight = sum(weights)
predicted_overall = sum(
w * n.historical.overall_score
for w, n in zip(weights, neighbors)
) / total_weight
# Per-task prediction (only tasks where at least one neighbor has data)
all_tasks: set[str] = set()
for n in neighbors:
all_tasks.update(n.historical.per_task_score.keys())
predicted_per_task: dict[str, float] = {}
for task_id in sorted(all_tasks):
task_weights, task_scores = [], []
for w, n in zip(weights, neighbors):
if task_id in n.historical.per_task_score:
task_weights.append(w)
task_scores.append(n.historical.per_task_score[task_id])
if task_weights:
predicted_per_task[task_id] = sum(
w * s for w, s in zip(task_weights, task_scores)
) / sum(task_weights)
# Confidence: combines neighbor density (closer = better) and consistency
# (low variance among neighbors = better)
avg_sim = sum(n.similarity for n in neighbors) / len(neighbors)
score_variance = _variance([n.historical.overall_score for n in neighbors])
consistency = max(0.0, 1.0 - math.sqrt(score_variance) / 0.3)
confidence = round(0.6 * avg_sim + 0.4 * consistency, 4)
# Capability attributions: rough marginal-effect estimate
attributions = _estimate_capability_attributions(fingerprint, db)
return PredictionReport(
predicted_overall_score=round(predicted_overall, 4),
confidence=round(min(1.0, max(0.0, confidence)), 4),
n_neighbors_used=len(neighbors),
neighbor_names=[n.historical.profile_name for n in neighbors],
predicted_per_task=predicted_per_task,
capability_attributions=attributions,
)
def _rank_neighbors(
fingerprint: ProfileFingerprint,
db: HistoricalDatabase,
k: int,
min_similarity: float,
) -> list[Neighbor]:
scored: list[Neighbor] = []
for run in db.runs:
sim = fingerprint_similarity(fingerprint, run.fingerprint)
if sim < min_similarity:
continue
scored.append(Neighbor(historical=run, similarity=sim, distance=1.0 - sim))
scored.sort(key=lambda n: n.similarity, reverse=True)
return scored[:k]
def _global_mean(db: HistoricalDatabase) -> float:
if not db.runs:
return 0.5
return sum(r.overall_score for r in db.runs) / len(db.runs)
def _variance(values: Iterable[float]) -> float:
vals = list(values)
if len(vals) < 2:
return 0.0
mean = sum(vals) / len(vals)
return sum((v - mean) ** 2 for v in vals) / (len(vals) - 1)
def attribute_surprise(
fingerprint: ProfileFingerprint,
task_id: str,
delta: float,
db: HistoricalDatabase,
) -> str:
"""Generate a hypothesis for why a task score deviated from prediction.
Strategy:
1. If `delta > 0` (positive surprise), find the capabilities that
appear in THIS profile but are not shared by every neighbor that
scored low (< 0.5) on `task_id`; those are candidate causes for
the lift. If none stand out, repeat the comparison on hooks.
2. Conversely, if `delta < 0`, look for capabilities the profile is
MISSING that at least one high-scoring (>= 0.7) neighbor had.
3. Fall back to a generic note if the database is too small.
Returns a short English hypothesis string. Never raises.
"""
if len(db) < 3:
return "insufficient historical data to attribute"
same_task_runs = [
r for r in db.runs if task_id in r.per_task_score
]
if len(same_task_runs) < 2:
return f"no comparable runs for {task_id}"
if delta > 0:
# Positive surprise: find capabilities this profile has that
# low-scoring neighbors lack.
low = [r for r in same_task_runs if r.per_task_score[task_id] < 0.5]
if not low:
return "positive surprise; no low-scoring comparators"
low_caps = set.intersection(
*(set(r.fingerprint.capability_coverage) for r in low)
) if low else set()
our_caps = set(fingerprint.capability_coverage)
lifting = sorted(our_caps - low_caps)
if lifting:
return f"likely lift from capabilities absent in low scorers: {', '.join(lifting[:3])}"
# Hook-level fallback
low_hooks = set.intersection(
*(set(r.fingerprint.hook_footprint) for r in low)
) if low else set()
our_hooks = set(fingerprint.hook_footprint)
hook_lift = sorted(our_hooks - low_hooks)
if hook_lift:
return f"likely lift from hooks absent in low scorers: {', '.join(hook_lift[:3])}"
return "positive surprise; no clear structural cause"
# Negative surprise: find capabilities successful neighbors had that we lack
high = [r for r in same_task_runs if r.per_task_score[task_id] >= 0.7]
if not high:
return "negative surprise; no high-scoring comparators"
high_caps_union = set().union(
*(set(r.fingerprint.capability_coverage) for r in high)
) if high else set()
our_caps = set(fingerprint.capability_coverage)
missing = sorted(high_caps_union - our_caps)
if missing:
return f"likely drag from capabilities missing vs high scorers: {', '.join(missing[:3])}"
return "negative surprise; no clear structural cause"
def _estimate_capability_attributions(
fingerprint: ProfileFingerprint,
db: HistoricalDatabase,
) -> dict[str, float]:
"""For each capability in the new profile, estimate the marginal effect.
This is the simplest possible attribution: for each capability the new
profile has, look at runs that DID and DID NOT include that capability,
and report the score delta. Confounded by other factors but interpretable
enough to be useful; the estimate is unbiased only when configurations
are sampled at random.
"""
if len(db) < 4:
return {}
attributions: dict[str, float] = {}
for cap in fingerprint.capability_coverage:
with_cap = [r.overall_score for r in db.runs if cap in r.fingerprint.capability_coverage]
without_cap = [r.overall_score for r in db.runs if cap not in r.fingerprint.capability_coverage]
if not with_cap or not without_cap:
continue
delta = (sum(with_cap) / len(with_cap)) - (sum(without_cap) / len(without_cap))
attributions[cap] = round(delta, 4)
return attributions
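
Review note: the prediction path in `predict_profile` can be tried without the rest of the package. The sketch below is a simplified model under stated assumptions: plain Jaccard over capability sets replaces the composite `fingerprint_similarity` (which is defined in profile.py and not reproduced here), and `history` is a hypothetical list of `(capabilities, overall_score)` pairs. The weighting, the epsilon smoothing, and the 0.6/0.4 confidence blend follow the code above.

```python
import math


def jaccard(a, b):
    """Plain Jaccard similarity; an assumed stand-in for fingerprint_similarity."""
    a, b = set(a), set(b)
    if not a and not b:
        return 1.0
    return len(a & b) / len(a | b)


def predict(query_caps, history, k=10, min_similarity=0.05):
    """Return (predicted_score, confidence), or None if no neighbor qualifies."""
    ranked = sorted(
        ((jaccard(query_caps, caps), score) for caps, score in history),
        key=lambda pair: pair[0],
        reverse=True,
    )
    neighbors = [pair for pair in ranked if pair[0] >= min_similarity][:k]
    if not neighbors:
        return None
    eps = 1e-6  # keeps the total weight strictly positive
    weights = [sim + eps for sim, _ in neighbors]
    predicted = sum(
        w * score for w, (_, score) in zip(weights, neighbors)
    ) / sum(weights)
    scores = [score for _, score in neighbors]
    mean = sum(scores) / len(scores)
    variance = (
        sum((s - mean) ** 2 for s in scores) / (len(scores) - 1)
        if len(scores) > 1 else 0.0
    )
    avg_sim = sum(sim for sim, _ in neighbors) / len(neighbors)
    # A neighbor score std-dev of 0.3 or more drives consistency to zero.
    consistency = max(0.0, 1.0 - math.sqrt(variance) / 0.3)
    confidence = min(1.0, max(0.0, 0.6 * avg_sim + 0.4 * consistency))
    return predicted, confidence


history = [({"a", "b"}, 0.8), ({"a"}, 0.6), ({"z"}, 0.1)]
result = predict({"a", "b"}, history)
```

In this toy history the `{"z"}` run falls below the similarity floor and is ignored, so the estimate is pulled toward the exact-match neighbor rather than toward the global mean.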

clawbench/profile.py

@@ -0,0 +1,505 @@
"""ClawBench v0.5 — Plugin Profile and Manifest Feature extraction.
This module implements the structural side of the configuration-space
benchmarking framework defined in CLAWBENCH_V0_4_SPEC.md (v0.5 Direction).
A Plugin Profile describes the full agent configuration that ClawBench
evaluates: base model + enabled plugins + slot fills + tool allowlist.
A Manifest Feature Vector is computed mechanically from a plugin's
openclaw.plugin.json manifest plus its registration trace. The feature
vector has the same shape for every plugin (bundled, ClawHub-installed,
or custom), so the framework generalizes to plugins it has never seen.
A Profile Fingerprint aggregates all plugin feature vectors in a profile
into a structural summary used for similarity search, prediction, and
factor importance analysis.
"""
from __future__ import annotations
import hashlib
import json
import re
from collections.abc import Iterable
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Any
import yaml
# ---------------------------------------------------------------------------
# Hook surface — must mirror OpenClaw's plugin hook contract.
# Source: openclaw/src/plugins/types.ts (PluginHookName).
# Listed explicitly so feature extraction never silently drops a hook.
# ---------------------------------------------------------------------------
KNOWN_HOOKS: tuple[str, ...] = (
"before_model_resolve",
"before_prompt_build",
"before_agent_start",
"before_agent_reply",
"agent_end",
"session_start",
"session_end",
"gateway_start",
"gateway_stop",
"llm_input",
"llm_output",
"before_tool_call",
"after_tool_call",
"before_compaction",
"after_compaction",
"inbound_claim",
"message_received",
"message_sending",
"message_sent",
"before_message_write",
"before_dispatch",
"reply_dispatch",
"before_reset",
"subagent_spawning",
"subagent_delivery_target",
"subagent_spawned",
"subagent_ended",
"before_install",
)
# Tool families used by ClawBench's trajectory classifier — same vocabulary
# as clawbench/trajectory.py:classify_tool_call so the fingerprint speaks
# the same language as the run trajectory analysis.
TOOL_FAMILIES: tuple[str, ...] = (
"read",
"edit",
"search",
"execute",
"browser",
"memory",
"delegate",
"cron",
"plan",
"unknown",
)
# Manifest contract types — mirror PluginManifestContracts from
# openclaw/src/plugins/types.ts.
CONTRACT_KEYS: tuple[str, ...] = (
"tools",
"memoryEmbeddingProviders",
"speechProviders",
"realtimeTranscriptionProviders",
"realtimeVoiceProviders",
"mediaUnderstandingProviders",
"imageGenerationProviders",
"videoGenerationProviders",
"musicGenerationProviders",
"webFetchProviders",
"webSearchProviders",
)
# ---------------------------------------------------------------------------
# Plugin Manifest model
# ---------------------------------------------------------------------------
@dataclass
class PluginManifest:
"""Subset of openclaw.plugin.json fields that the fingerprint needs."""
id: str
kind: list[str] = field(default_factory=list)
contracts: dict[str, list[str]] = field(default_factory=dict)
channels: list[str] = field(default_factory=list)
providers: list[str] = field(default_factory=list)
skills: list[str] = field(default_factory=list)
capability_tags: list[str] = field(default_factory=list)
clawhub_channel: str = "bundled"
clawhub_is_official: bool = False
version: str = ""
@classmethod
def from_file(cls, path: Path) -> PluginManifest:
with path.open(encoding="utf-8") as f:
data = json.load(f)
return cls.from_dict(data)
@classmethod
def from_dict(cls, data: dict[str, Any]) -> PluginManifest:
kind_raw = data.get("kind", [])
if isinstance(kind_raw, str):
kind = [kind_raw]
elif isinstance(kind_raw, list):
kind = list(kind_raw)
else:
kind = []
contracts_raw = data.get("contracts", {}) or {}
contracts: dict[str, list[str]] = {}
for key in CONTRACT_KEYS:
value = contracts_raw.get(key)
if isinstance(value, list):
contracts[key] = [str(v) for v in value]
else:
contracts[key] = []
return cls(
id=str(data.get("id", "")),
kind=kind,
contracts=contracts,
channels=list(data.get("channels", []) or []),
providers=list(data.get("providers", []) or []),
skills=list(data.get("skills", []) or []),
capability_tags=list(data.get("capabilityTags", []) or []),
clawhub_channel=str(data.get("clawhub_channel", "bundled")),
clawhub_is_official=bool(data.get("clawhub_is_official", False)),
version=str(data.get("version", "")),
)
# ---------------------------------------------------------------------------
# Registration Trace — what a plugin actually registered at runtime.
# Captured from the gateway's plugin registry after the plugin loads.
# ---------------------------------------------------------------------------
@dataclass
class RegistrationTrace:
"""Records what a plugin registered when its register() was called."""
plugin_id: str
tools: list[str] = field(default_factory=list) # tool names
tool_families_seen: list[str] = field(default_factory=list) # classified
hooks: list[str] = field(default_factory=list) # hook event names
gateway_methods: list[str] = field(default_factory=list)
http_routes: list[str] = field(default_factory=list)
services: list[str] = field(default_factory=list)
cli_commands: list[str] = field(default_factory=list)
# ---------------------------------------------------------------------------
# Plugin Feature Vector — computed for ANY plugin, seen or unseen.
# This is the heart of why the framework generalizes: every plugin yields
# the same shape vector regardless of implementation.
# ---------------------------------------------------------------------------
def plugin_feature_vector(
manifest: PluginManifest,
trace: RegistrationTrace | None = None,
) -> dict[str, Any]:
"""Build the typed feature vector for one plugin.
Parameters
----------
manifest : PluginManifest
The plugin's manifest, parsed from openclaw.plugin.json.
trace : RegistrationTrace | None
Optional registration trace observed at runtime. If None, the
feature vector is built purely from the manifest (cheap path,
usable before the plugin loads).
Returns
-------
dict[str, Any]
A feature dict with the same keys for every plugin.
"""
trace = trace or RegistrationTrace(plugin_id=manifest.id)
features: dict[str, Any] = {
"plugin_id": manifest.id,
"version": manifest.version,
"clawhub_channel": manifest.clawhub_channel,
"clawhub_is_official": manifest.clawhub_is_official,
}
# Contract presence (boolean per contract type)
for key in CONTRACT_KEYS:
features[f"provides_{_snake(key)}"] = bool(manifest.contracts.get(key))
# Tool count from contracts
features["provides_tools_count"] = len(manifest.contracts.get("tools", []))
# Kind flags
features["provides_memory"] = "memory" in manifest.kind
features["provides_context_engine"] = "context-engine" in manifest.kind
# Counts of higher-level capabilities
features["n_channels"] = len(manifest.channels)
features["n_providers"] = len(manifest.providers)
features["n_skills"] = len(manifest.skills)
features["n_capability_tags"] = len(manifest.capability_tags)
features["capability_tags"] = sorted(manifest.capability_tags)
# Hook footprint (one column per known hook)
trace_hooks = set(trace.hooks)
for hook in KNOWN_HOOKS:
features[f"hooks_{hook}"] = hook in trace_hooks
features["n_hooks"] = sum(1 for h in KNOWN_HOOKS if h in trace_hooks)
# Tool family surface
trace_families = set(trace.tool_families_seen)
for family in TOOL_FAMILIES:
features[f"tool_family_{family}"] = family in trace_families
features["n_tool_families"] = len(trace_families)
# Surface area
features["n_tools_registered"] = len(trace.tools)
features["registers_gateway_methods"] = bool(trace.gateway_methods)
features["registers_http_routes"] = bool(trace.http_routes)
features["registers_services"] = bool(trace.services)
features["registers_cli_commands"] = bool(trace.cli_commands)
return features
def _snake(camel: str) -> str:
return re.sub(r"(?<!^)(?=[A-Z])", "_", camel).lower()
# ---------------------------------------------------------------------------
# Plugin Profile — what a benchmark submission looks like.
# ---------------------------------------------------------------------------
@dataclass
class PluginProfileEntry:
id: str
source: str = "bundled" # "bundled" | "clawhub" | "local"
config: dict[str, Any] = field(default_factory=dict)
version: str = ""
@dataclass
class PluginProfile:
name: str
base_model: str
plugins: list[PluginProfileEntry] = field(default_factory=list)
slots: dict[str, str] = field(default_factory=dict)
tools_allow: list[str] = field(default_factory=list)
notes: str = ""
@classmethod
def from_yaml_file(cls, path: Path) -> PluginProfile:
with path.open(encoding="utf-8") as f:
data = yaml.safe_load(f) or {} # an empty YAML file parses to None
return cls.from_dict(data)
@classmethod
def from_dict(cls, data: dict[str, Any]) -> PluginProfile:
if "profile" in data:
data = data["profile"]
plugins_raw = data.get("plugins", {})
if isinstance(plugins_raw, dict):
entries_raw = plugins_raw.get("enabled", [])
slots = plugins_raw.get("slots", {}) or {}
tools_allow = plugins_raw.get("tools_allow", []) or []
else:
entries_raw = plugins_raw or []
slots = {}
tools_allow = []
entries: list[PluginProfileEntry] = []
for raw in entries_raw:
if isinstance(raw, str):
entries.append(_entry_from_id(raw))
elif isinstance(raw, dict):
pid = str(raw.get("id", ""))
if not pid:
continue
entry = _entry_from_id(pid)
if "config" in raw and isinstance(raw["config"], dict):
entry.config = dict(raw["config"])
if "version" in raw:
entry.version = str(raw["version"])
entries.append(entry)
return cls(
name=str(data.get("name", "unnamed-profile")),
base_model=str(data.get("base_model", "")),
plugins=entries,
slots=dict(slots),
tools_allow=list(tools_allow),
notes=str(data.get("notes", "")),
)
def _entry_from_id(raw_id: str) -> PluginProfileEntry:
"""Parse `bundled-id`, `clawhub:pkg@1.2`, or `local:./path` notations."""
if raw_id.startswith("clawhub:"):
rest = raw_id[len("clawhub:"):]
if "@" in rest:
pid, version = rest.split("@", 1)
else:
pid, version = rest, ""
return PluginProfileEntry(id=pid, source="clawhub", version=version)
if raw_id.startswith("local:"):
return PluginProfileEntry(id=raw_id[len("local:"):], source="local")
return PluginProfileEntry(id=raw_id, source="bundled")
# ---------------------------------------------------------------------------
# Profile Fingerprint — aggregated structural summary of a profile.
# Two profiles with the same fingerprint should score similarly.
# ---------------------------------------------------------------------------
@dataclass
class ProfileFingerprint:
"""Structural summary of a Plugin Profile.
The fingerprint is computed by aggregating per-plugin feature vectors
plus profile-level features (base model, slot fills, tool allowlist).
"""
profile_name: str
base_model: str
capability_coverage: list[str] # union of contract types present
hook_footprint: list[str] # union of hooks intercepted
tool_family_surface: list[str] # union of tool families
capability_tags_union: list[str] # union of clawhub tags
memory_slot: str
context_engine_slot: str
n_plugins: int
n_clawhub_plugins: int
n_custom_plugins: int
n_official_plugins: int
n_tools_total: int
n_hooks_total: int
plugin_ids: list[str]
tools_allow: list[str]
fingerprint_hash: str # stable content hash for indexing
def to_dict(self) -> dict[str, Any]:
return asdict(self)
@classmethod
def from_profile(
cls,
profile: PluginProfile,
manifests: dict[str, PluginManifest],
traces: dict[str, RegistrationTrace] | None = None,
) -> ProfileFingerprint:
traces = traces or {}
feature_vectors = []
for entry in profile.plugins:
manifest = manifests.get(entry.id)
if manifest is None:
# Cold start for an unknown plugin: synthesize a minimal
# manifest so the plugin still contributes to the fingerprint.
manifest = PluginManifest(id=entry.id, clawhub_channel=entry.source)
trace = traces.get(entry.id)
feature_vectors.append(plugin_feature_vector(manifest, trace))
capability_coverage = sorted({
_snake(key)
for fv in feature_vectors
for key in CONTRACT_KEYS
if fv.get(f"provides_{_snake(key)}")
})
hook_footprint = sorted({
hook for fv in feature_vectors
for hook in KNOWN_HOOKS
if fv.get(f"hooks_{hook}")
})
tool_family_surface = sorted({
family for fv in feature_vectors
for family in TOOL_FAMILIES
if fv.get(f"tool_family_{family}")
})
capability_tags_union = sorted({
tag for fv in feature_vectors
for tag in fv.get("capability_tags", [])
})
n_clawhub = sum(1 for e in profile.plugins if e.source == "clawhub")
n_custom = sum(1 for e in profile.plugins if e.source == "local")
n_official = sum(
1 for fv in feature_vectors if fv.get("clawhub_is_official")
)
n_tools = sum(int(fv.get("n_tools_registered", 0)) for fv in feature_vectors)
n_hooks = sum(int(fv.get("n_hooks", 0)) for fv in feature_vectors)
# Stable hash over the structural content
h_payload = {
"base_model": profile.base_model,
"capabilities": capability_coverage,
"hooks": hook_footprint,
"families": tool_family_surface,
"tags": capability_tags_union,
"memory_slot": profile.slots.get("memory", ""),
"context_engine_slot": profile.slots.get("contextEngine", ""),
"plugin_ids": sorted(e.id for e in profile.plugins),
"tools_allow": sorted(profile.tools_allow),
}
fingerprint_hash = hashlib.sha256(
json.dumps(h_payload, sort_keys=True).encode("utf-8")
).hexdigest()[:16]
return cls(
profile_name=profile.name,
base_model=profile.base_model,
capability_coverage=capability_coverage,
hook_footprint=hook_footprint,
tool_family_surface=tool_family_surface,
capability_tags_union=capability_tags_union,
memory_slot=profile.slots.get("memory", ""),
context_engine_slot=profile.slots.get("contextEngine", ""),
n_plugins=len(profile.plugins),
n_clawhub_plugins=n_clawhub,
n_custom_plugins=n_custom,
n_official_plugins=n_official,
n_tools_total=n_tools,
n_hooks_total=n_hooks,
plugin_ids=sorted(e.id for e in profile.plugins),
tools_allow=sorted(profile.tools_allow),
fingerprint_hash=fingerprint_hash,
)
# ---------------------------------------------------------------------------
# Similarity metric for k-NN prediction.
# ---------------------------------------------------------------------------
def fingerprint_similarity(a: ProfileFingerprint, b: ProfileFingerprint) -> float:
"""Composite similarity in [0, 1].
Combines:
- Jaccard over capability coverage (weight 0.30)
- Jaccard over hook footprint (weight 0.25)
- Jaccard over tool family surface (weight 0.20)
- Jaccard over capability tags (weight 0.10)
- Slot match (memory, contextEngine) (weight 0.10)
- Same base model (weight 0.05)
"""
def jaccard(s1: Iterable[str], s2: Iterable[str]) -> float:
ss1, ss2 = set(s1), set(s2)
union = ss1 | ss2
# Two empty sets are treated as identical.
if not union:
return 1.0
return len(ss1 & ss2) / len(union)
cap = jaccard(a.capability_coverage, b.capability_coverage)
hooks = jaccard(a.hook_footprint, b.hook_footprint)
fams = jaccard(a.tool_family_surface, b.tool_family_surface)
tags = jaccard(a.capability_tags_union, b.capability_tags_union)
slot_match = 0.0
if a.memory_slot == b.memory_slot:
slot_match += 0.5
if a.context_engine_slot == b.context_engine_slot:
slot_match += 0.5
model_match = 1.0 if a.base_model == b.base_model else 0.0
return (
0.30 * cap
+ 0.25 * hooks
+ 0.20 * fams
+ 0.10 * tags
+ 0.10 * slot_match
+ 0.05 * model_match
)
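The weighted blend above can be checked by hand with a small standalone sketch. It does not import `clawbench`; it re-implements the same formula over plain dicts that mimic the relevant `ProfileFingerprint` fields. The helper name `composite_similarity` and all hook, family, and slot values below are hypothetical and chosen only for illustration.

```python
from typing import Iterable

# Weights copied from the fingerprint_similarity docstring.
SET_WEIGHTS = {
    "capability_coverage": 0.30,
    "hook_footprint": 0.25,
    "tool_family_surface": 0.20,
    "capability_tags_union": 0.10,
}
SLOT_WEIGHT = 0.10
MODEL_WEIGHT = 0.05


def jaccard(s1: Iterable[str], s2: Iterable[str]) -> float:
    """Jaccard index; two empty sets count as identical (1.0)."""
    a, b = set(s1), set(s2)
    union = a | b
    return len(a & b) / len(union) if union else 1.0


def composite_similarity(a: dict, b: dict) -> float:
    """Weighted blend over dicts shaped like ProfileFingerprint (assumed)."""
    score = sum(w * jaccard(a[k], b[k]) for k, w in SET_WEIGHTS.items())
    # Each matching slot contributes half of the slot component.
    slot_match = 0.5 * (a["memory_slot"] == b["memory_slot"]) + 0.5 * (
        a["context_engine_slot"] == b["context_engine_slot"]
    )
    model_match = 1.0 if a["base_model"] == b["base_model"] else 0.0
    return score + SLOT_WEIGHT * slot_match + MODEL_WEIGHT * model_match


# Hypothetical fingerprints; every value here is made up.
fp_a = {
    "capability_coverage": ["tools", "web_search_providers"],
    "hook_footprint": ["before_tool_call"],
    "tool_family_surface": ["web", "fs"],
    "capability_tags_union": [],
    "memory_slot": "mem-a",
    "context_engine_slot": "ctx-a",
    "base_model": "model-x",
}
fp_b = {
    "capability_coverage": ["tools"],
    "hook_footprint": ["before_tool_call"],
    "tool_family_surface": ["fs", "exec"],
    "capability_tags_union": [],
    "memory_slot": "mem-a",
    "context_engine_slot": "ctx-b",
    "base_model": "model-x",
}

sim_ab = composite_similarity(fp_a, fp_b)
sim_aa = composite_similarity(fp_a, fp_a)
```

With these inputs the components are 0.5 (capabilities), 1.0 (hooks), 1/3 (families), 1.0 (tags, both empty), 0.5 (slots) and 1.0 (model), so `sim_ab` works out to roughly 0.67. Note that two profiles with no tags at all still earn the full tag weight, because empty-vs-empty is scored as a perfect match.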


@ -8,18 +8,28 @@ from typing import Any
DATASET_SOURCE = "basic_usage_query_suite_v1"
SCENARIO_WEIGHT_DEFAULTS: dict[str, float] = {
-"file_system_ops": 0.13,
-"web_info_ops": 0.10,
-"calendar_reminders": 0.08,
-"communication_messaging": 0.09,
-"data_processing_analysis": 0.11,
-"coding_dev_assist": 0.09,
+# Original 12 scenarios from the basic-usage query test sheet
+"file_system_ops": 0.10,
+"web_info_ops": 0.08,
+"calendar_reminders": 0.06,
+"communication_messaging": 0.08,
+"data_processing_analysis": 0.09,
+"coding_dev_assist": 0.07,
"personal_life_assistant": 0.06,
-"multi_step_compound": 0.12,
+"multi_step_compound": 0.10,
"context_continuation": 0.05,
"error_boundary_cases": 0.05,
-"skill_calling": 0.07,
-"system_capabilities": 0.05,
+"skill_calling": 0.06,
+"system_capabilities": 0.04,
+# v0.5 additions: high-frequency personal-agent scenarios beyond the sheet
+"privacy_pii_handling": 0.04,
+"personal_financial_hygiene": 0.03,
+"travel_logistics_under_uncertainty": 0.03,
+"social_coordination": 0.02,
+"personal_knowledge_base": 0.02,
+"health_wellness_tracking": 0.01,
+"account_security_hygiene": 0.01,
+"multimodal_understanding": 0.00,
}
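Since this hunk rebalances twelve existing weights to make room for eight new scenarios, a quick sanity check that the new table still sums to 1.0 may be useful. The sketch below copies the post-change values from the hunk above; the names `NEW_WEIGHTS` and `zero_weight_scenarios` are illustrative and not part of the codebase.

```python
import math

# Post-change weights, copied from the hunk above.
NEW_WEIGHTS = {
    "file_system_ops": 0.10,
    "web_info_ops": 0.08,
    "calendar_reminders": 0.06,
    "communication_messaging": 0.08,
    "data_processing_analysis": 0.09,
    "coding_dev_assist": 0.07,
    "personal_life_assistant": 0.06,
    "multi_step_compound": 0.10,
    "context_continuation": 0.05,
    "error_boundary_cases": 0.05,
    "skill_calling": 0.06,
    "system_capabilities": 0.04,
    "privacy_pii_handling": 0.04,
    "personal_financial_hygiene": 0.03,
    "travel_logistics_under_uncertainty": 0.03,
    "social_coordination": 0.02,
    "personal_knowledge_base": 0.02,
    "health_wellness_tracking": 0.01,
    "account_security_hygiene": 0.01,
    "multimodal_understanding": 0.00,
}

total = sum(NEW_WEIGHTS.values())
# Scenarios that are declared but currently contribute nothing to the score.
zero_weight_scenarios = [k for k, w in NEW_WEIGHTS.items() if w == 0.0]

weights_sum_to_one = math.isclose(total, 1.0, abs_tol=1e-9)
```

The original twelve scenarios now carry 0.84 of the total and the v0.5 additions carry 0.16. `multimodal_understanding` is registered with weight 0.00, so it is tracked but does not yet move the aggregate.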


@ -0,0 +1,231 @@
"""ClawBench v0.5 — Recommendations generator.
The Recommendations section is the prescriptive output that distinguishes
ClawBench from descriptive leaderboards (CLAWBENCH_V0_4_SPEC.md §8
"Configuration Diagnostic Report"). Every recommendation must be backed
by data: either by neighbor profiles that already include the suggested
plugin, or by factor-importance attributions with explicit confidence.
This module generates a ranked list of concrete profile changes from the
historical database + factor analysis + the current profile, with
per-recommendation evidence and a conservative estimated score impact.
No speculative recommendations are generated. If the database is too
small or the evidence too weak, the output is an empty list and the
caller is expected to surface that explicitly in the diagnostic.
"""
from __future__ import annotations
from collections import Counter
from dataclasses import dataclass, field, asdict
from clawbench.factor_analysis import FactorAnalysisReport
from clawbench.prediction import HistoricalDatabase
from clawbench.profile import ProfileFingerprint
from clawbench.utilization import UtilizationReport
@dataclass
class Recommendation:
kind: str # "add_plugin", "remove_plugin", "fill_slot", "add_capability"
target: str # plugin id or slot name
rationale: str
estimated_delta: float # predicted score impact, signed
confidence: float # 0..1
evidence: list[str] = field(default_factory=list)
def to_dict(self) -> dict:
return asdict(self)
@dataclass
class RecommendationSet:
recommendations: list[Recommendation] = field(default_factory=list)
note: str = ""
def to_dict(self) -> dict:
return {
"recommendations": [r.to_dict() for r in self.recommendations],
"note": self.note,
}
MIN_DB_SIZE_FOR_RECOMMENDATIONS = 5
MIN_EVIDENCE_NEIGHBORS = 2
def generate_recommendations(
fingerprint: ProfileFingerprint,
db: HistoricalDatabase,
factor: FactorAnalysisReport | None,
utilization: UtilizationReport | None = None,
*,
max_recommendations: int = 6,
) -> RecommendationSet:
"""Generate a ranked, evidence-backed list of profile changes.
Signals combined:
1. Dead-weight plugins (from utilization) → remove_plugin.
2. Empty required slots → fill_slot.
3. Plugins appearing in high-scoring neighbors but missing from
this profile → add_plugin.
4. Factor-analysis main effects with positive delta and features
this profile lacks → add_capability.
Every recommendation includes evidence naming either the neighbor
profiles that justify it or the factor-analysis row that produced it.
"""
if len(db) < MIN_DB_SIZE_FOR_RECOMMENDATIONS:
return RecommendationSet(
recommendations=[],
note=(
f"recommendations disabled: historical database has only "
f"{len(db)} runs (need ≥{MIN_DB_SIZE_FOR_RECOMMENDATIONS})"
),
)
recs: list[Recommendation] = []
# --- Signal 1: dead-weight plugin removal ----------------------------
if utilization is not None:
for p in utilization.per_plugin:
if p.dead_weight:
recs.append(Recommendation(
kind="remove_plugin",
target=p.plugin_id,
rationale=(
f"plugin '{p.plugin_id}' loaded but was never invoked "
f"during this run — consider removing it to reduce "
f"configuration surface area"
),
estimated_delta=0.0, # removing dead weight is neutral for score
confidence=0.9,
evidence=["0 tool invocations across all tasks"],
))
# --- Signal 2: empty slots -------------------------------------------
if not fingerprint.memory_slot:
# Check if filling memory slot correlates with higher scores
with_mem = [r for r in db.runs if r.fingerprint.memory_slot]
without_mem = [r for r in db.runs if not r.fingerprint.memory_slot]
if len(with_mem) >= MIN_EVIDENCE_NEIGHBORS and without_mem:
mean_with = sum(r.overall_score for r in with_mem) / len(with_mem)
mean_without = sum(r.overall_score for r in without_mem) / len(without_mem)
delta = mean_with - mean_without
if delta > 0.03:
# Pick the most popular memory plugin across the high-scorers
high = [r for r in with_mem if r.overall_score >= mean_with]
memories = Counter(r.fingerprint.memory_slot for r in high)
if memories:
top_mem, count = memories.most_common(1)[0]
recs.append(Recommendation(
kind="fill_slot",
target=f"memory={top_mem}",
rationale=(
f"profiles with a memory slot filled average "
f"{mean_with:.2f} vs {mean_without:.2f} without. "
f"'{top_mem}' is the most common choice among "
f"high scorers."
),
estimated_delta=round(delta, 4),
confidence=round(min(0.9, 0.3 + 0.1 * len(with_mem)), 2),
evidence=[
f"{len(with_mem)} profiles with memory: mean {mean_with:.3f}",
f"{len(without_mem)} profiles without: mean {mean_without:.3f}",
f"{count}/{len(high)} high scorers use '{top_mem}'",
],
))
# --- Signal 3: plugins missing vs high-scoring neighbors -------------
our_plugin_ids = set(fingerprint.plugin_ids)
# High scorers = top third of database by overall_score (at least 3 runs)
sorted_runs = sorted(db.runs, key=lambda r: r.overall_score, reverse=True)
top_third = sorted_runs[: max(3, len(sorted_runs) // 3)]
plugin_freq: Counter[str] = Counter()
for r in top_third:
for pid in r.fingerprint.plugin_ids:
if pid not in our_plugin_ids:
plugin_freq[pid] += 1
# Only recommend plugins present in ≥ MIN_EVIDENCE_NEIGHBORS high scorers
for plugin_id, count in plugin_freq.most_common(max_recommendations):
if count < MIN_EVIDENCE_NEIGHBORS:
break
# Estimate delta: mean score of all runs WITH this plugin minus the mean
# of all runs WITHOUT it. No restriction to comparable profiles is
# applied, so the delta is a rough association, not a causal estimate.
with_plugin = [
r for r in db.runs if plugin_id in r.fingerprint.plugin_ids
]
without_plugin = [
r for r in db.runs if plugin_id not in r.fingerprint.plugin_ids
]
if not with_plugin or not without_plugin:
continue
mean_with = sum(r.overall_score for r in with_plugin) / len(with_plugin)
mean_without = sum(r.overall_score for r in without_plugin) / len(without_plugin)
delta = mean_with - mean_without
if delta <= 0.01:
continue
# Confidence rises with sample size on both sides, caps at 0.85
confidence = min(0.85, 0.25 + 0.05 * min(len(with_plugin), len(without_plugin)))
recs.append(Recommendation(
kind="add_plugin",
target=plugin_id,
rationale=(
f"'{plugin_id}' appears in {count} of {len(top_third)} "
f"top-scoring profiles and is missing from this one"
),
estimated_delta=round(delta, 4),
confidence=round(confidence, 2),
evidence=[
f"{len(with_plugin)} profiles with '{plugin_id}': mean {mean_with:.3f}",
f"{len(without_plugin)} without: mean {mean_without:.3f}",
f"present in {count}/{len(top_third)} top scorers",
],
))
# --- Signal 4: factor-analysis lifts for features the profile lacks ---
if factor is not None and factor.main_effects:
our_caps = set(fingerprint.capability_coverage)
our_hooks = set(fingerprint.hook_footprint)
for me in factor.main_effects[:10]:
if me.importance < 0.05 or me.delta <= 0.02:
continue
feat = me.feature
if feat.startswith("capability:"):
name = feat.split(":", 1)[1]
if name in our_caps:
continue
rationale_target = f"any plugin providing '{name}'"
elif feat.startswith("hook:"):
name = feat.split(":", 1)[1]
if name in our_hooks:
continue
rationale_target = f"any plugin registering hook '{name}'"
else:
continue
# NOTE: no de-duplication is performed here yet; an add_capability entry
# may overlap an add_plugin recommendation that provides this capability.
recs.append(Recommendation(
kind="add_capability",
target=rationale_target,
rationale=(
f"factor analysis attributes {me.importance:.1%} of "
f"variance to '{feat}' (Δ={me.delta:+.3f}); "
f"this profile does not cover it"
),
estimated_delta=round(me.delta, 4),
confidence=round(min(0.75, 0.2 + me.importance), 2),
evidence=[
f"fANOVA importance {me.importance:.3f}",
f"n_with={me.n_with}, n_without={me.n_without}",
],
))
# Rank by (estimated_delta * confidence), cap the output
recs.sort(key=lambda r: r.estimated_delta * r.confidence, reverse=True)
return RecommendationSet(
recommendations=recs[:max_recommendations],
note="" if recs else "no strong signals found in historical data",
)
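The confidence formulas and the final ranking used in `generate_recommendations` can be isolated into a few lines. The sketch below is a standalone restatement of the arithmetic in the code above, not an import from `clawbench`; the function names and the sample numbers are hypothetical.

```python
def fill_slot_confidence(n_with: int) -> float:
    """Signal 2: grows 0.1 per supporting run, capped at 0.9."""
    return round(min(0.9, 0.3 + 0.1 * n_with), 2)


def add_plugin_confidence(n_with: int, n_without: int) -> float:
    """Signal 3: limited by the smaller side of the comparison, capped at 0.85."""
    return round(min(0.85, 0.25 + 0.05 * min(n_with, n_without)), 2)


def add_capability_confidence(importance: float) -> float:
    """Signal 4: driven by fANOVA importance, capped at 0.75."""
    return round(min(0.75, 0.2 + importance), 2)


# Hypothetical recommendations as (kind, estimated_delta, confidence).
recs = [
    ("remove_plugin", 0.0, 0.9),
    ("add_plugin", 0.05, add_plugin_confidence(4, 10)),
    ("fill_slot", 0.04, fill_slot_confidence(6)),
]

# Same ranking key as the module: expected gain weighted by confidence.
ranked = sorted(recs, key=lambda r: r[1] * r[2], reverse=True)
ranked_kinds = [kind for kind, _, _ in ranked]
```

One consequence of ranking by `estimated_delta * confidence` is that `remove_plugin` entries, whose delta is fixed at 0.0, always sort behind any positive-delta recommendation. When more than `max_recommendations` candidates exist, dead-weight removals are therefore the first to be truncated.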


@ -53,6 +53,7 @@ class CapabilityTag(str, enum.Enum):
class ScenarioDomain(str, enum.Enum):
+# Original 12 scenarios from the basic-usage query test sheet
FILE_SYSTEM_OPS = "file_system_ops"
WEB_INFO_OPS = "web_info_ops"
CALENDAR_REMINDERS = "calendar_reminders"
@ -65,6 +66,15 @@ class ScenarioDomain(str, enum.Enum):
ERROR_BOUNDARY = "error_boundary_cases"
SKILL_CALLING = "skill_calling"
SYSTEM = "system_capabilities"
+# v0.5 additions: high-frequency personal-agent scenarios beyond the test sheet
+PRIVACY_PII = "privacy_pii_handling"
+FINANCIAL_PERSONAL = "personal_financial_hygiene"
+TRAVEL_LOGISTICS = "travel_logistics_under_uncertainty"
+SOCIAL_COORDINATION = "social_coordination"
+KNOWLEDGE_BASE = "personal_knowledge_base"
+HEALTH_TRACKING = "health_wellness_tracking"
+SECURITY_HYGIENE = "account_security_hygiene"
+MULTIMODAL_UNDERSTANDING = "multimodal_understanding"
class QueryDifficulty(str, enum.Enum):


@ -41,12 +41,45 @@ DONE_PATTERN = re.compile(
r"\b(done|fixed|completed|finished|all set|tests pass|verified|resolved|ready)\b",
re.IGNORECASE,
)
-RUN_SCORE_WEIGHTS = {
+# Deterministic weights: used when no judge is available, or when a task with
+# deterministic checks misses the completion floor (see combine_run_score).
+RUN_SCORE_WEIGHTS_DETERMINISTIC = {
"completion": 0.40,
"trajectory": 0.30,
"behavior": 0.20,
}
# Weights when a judge is available AND the task has NO deterministic
# completion verifiers. In that regime the judge is the only signal that
# captures semantic correctness.
RUN_SCORE_WEIGHTS_SEMANTIC_ONLY = {
"completion": 0.20,
"trajectory": 0.20,
"behavior": 0.10,
"judge": 0.50,
}
# Weights when a judge is available AND the task has deterministic
# completion verifiers. Per CLAWBENCH_V0_4_SPEC.md §"Disallowed Primary
# Verifiers" and §"Judge Gating", the judge must not dominate the score
# when deterministic verification is possible. Judge contribution is
# capped at 10% and only contributes at all when the deterministic floor
# is effectively met (completion.score >= 0.9999) — this gate is enforced
# in combine_run_score().
RUN_SCORE_WEIGHTS_WITH_DETERMINISTIC_JUDGE = {
"completion": 0.40,
"trajectory": 0.30,
"behavior": 0.20,
"judge": 0.10,
}
# Backward-compat alias — kept pointing at the deterministic weights
# which is what existing callers implicitly expect.
RUN_SCORE_WEIGHTS = RUN_SCORE_WEIGHTS_DETERMINISTIC
RUN_SCORE_WEIGHT_TOTAL = sum(RUN_SCORE_WEIGHTS.values())
# Legacy alias — a few tests may still reference this name. It is now a
# synonym for the semantic-only weighting.
RUN_SCORE_WEIGHTS_WITH_JUDGE = RUN_SCORE_WEIGHTS_SEMANTIC_ONLY
async def score_task_run(
@ -88,6 +121,12 @@ async def score_task_run(
completion=completion_result.score,
trajectory=trajectory_result.score,
behavior=behavior_result.score,
judge=(
judge_result.score
if judge_result.enabled and not judge_result.error
else None
),
has_deterministic_verifier=completion_result.total_assertions > 0,
)
delivery_outcome = classify_delivery_outcome(
task=task,
@ -134,13 +173,77 @@ async def score_task_run(
)
-def combine_run_score(*, completion: float, trajectory: float, behavior: float) -> float:
-weighted_sum = (
-RUN_SCORE_WEIGHTS["completion"] * completion
-+ RUN_SCORE_WEIGHTS["trajectory"] * trajectory
-+ RUN_SCORE_WEIGHTS["behavior"] * behavior
-)
-score = weighted_sum / RUN_SCORE_WEIGHT_TOTAL if RUN_SCORE_WEIGHT_TOTAL else 0.0
DETERMINISTIC_FLOOR = 0.9999
def combine_run_score(
*,
completion: float,
trajectory: float,
behavior: float,
judge: float | None = None,
has_deterministic_verifier: bool = False,
) -> float:
"""Blend completion + trajectory + behavior (+ judge when available).
Gating rules, per CLAWBENCH_V0_4_SPEC.md §"Disallowed Primary
Verifiers" and §"Judge Gating":
1. If there is no judge signal, use the deterministic-only weights.
2. If there is a judge AND the task has a deterministic verifier
(execution checks, file assertions, gateway assertions, etc.),
the judge is capped at 10% of the run score, and it only
contributes when the deterministic completion floor is met
(completion.score >= 0.9999). This matches the spec's policy
that "semantic quality never rescues failed completion."
3. If there is a judge AND the task has NO deterministic verifier,
the judge is the dominant signal (50%); this is the only regime
where an LLM judge is allowed to drive the primary score.
"""
if judge is None:
weights = RUN_SCORE_WEIGHTS_DETERMINISTIC
weighted_sum = (
weights["completion"] * completion
+ weights["trajectory"] * trajectory
+ weights["behavior"] * behavior
)
total = sum(weights.values())
elif has_deterministic_verifier:
# Judge is capped and gated on the deterministic floor. When the
# floor is not met, the judge signal is completely ignored —
# including its weight column — so semantic quality cannot
# rescue a failed deterministic completion. When the floor is
# met, the judge can contribute at most 10% of the run score.
if completion < DETERMINISTIC_FLOOR:
weights = RUN_SCORE_WEIGHTS_DETERMINISTIC
weighted_sum = (
weights["completion"] * completion
+ weights["trajectory"] * trajectory
+ weights["behavior"] * behavior
)
total = sum(weights.values())
else:
weights = RUN_SCORE_WEIGHTS_WITH_DETERMINISTIC_JUDGE
weighted_sum = (
weights["completion"] * completion
+ weights["trajectory"] * trajectory
+ weights["behavior"] * behavior
+ weights["judge"] * judge
)
total = sum(weights.values())
else:
# Semantic-only task: judge is the dominant signal.
weights = RUN_SCORE_WEIGHTS_SEMANTIC_ONLY
weighted_sum = (
weights["completion"] * completion
+ weights["trajectory"] * trajectory
+ weights["behavior"] * behavior
+ weights["judge"] * judge
)
total = sum(weights.values())
score = weighted_sum / total if total else 0.0
return round(min(1.0, max(0.0, score)), 4)
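The three gating regimes are easiest to compare with concrete numbers. The sketch below is a condensed, standalone restatement of the new `combine_run_score` (same weights, same floor, same rounding) so it can run without the `clawbench` package; the helper `_blend` and the sample scores are illustrative assumptions.

```python
DETERMINISTIC = {"completion": 0.40, "trajectory": 0.30, "behavior": 0.20}
SEMANTIC_ONLY = {"completion": 0.20, "trajectory": 0.20, "behavior": 0.10, "judge": 0.50}
WITH_DETERMINISTIC_JUDGE = {**DETERMINISTIC, "judge": 0.10}
DETERMINISTIC_FLOOR = 0.9999


def _blend(weights: dict, signals: dict) -> float:
    """Weighted mean over whichever signals the weight table names."""
    total = sum(weights.values())
    weighted = sum(w * signals[name] for name, w in weights.items())
    return weighted / total if total else 0.0


def combine_run_score(*, completion, trajectory, behavior,
                      judge=None, has_deterministic_verifier=False):
    signals = {"completion": completion, "trajectory": trajectory,
               "behavior": behavior, "judge": judge}
    if judge is None:
        weights = DETERMINISTIC
    elif has_deterministic_verifier:
        # Judge only counts once the deterministic floor is met.
        weights = (WITH_DETERMINISTIC_JUDGE
                   if completion >= DETERMINISTIC_FLOOR else DETERMINISTIC)
    else:
        weights = SEMANTIC_ONLY
    return round(min(1.0, max(0.0, _blend(weights, signals))), 4)


base = dict(trajectory=0.8, behavior=0.9)
no_judge = combine_run_score(completion=1.0, **base)
gated_in = combine_run_score(completion=1.0, judge=0.5,
                             has_deterministic_verifier=True, **base)
gated_out = combine_run_score(completion=0.5, judge=0.5,
                              has_deterministic_verifier=True, **base)
semantic = combine_run_score(completion=1.0, judge=0.5, **base)
```

For these inputs the results are 0.9111 (no judge), 0.87 (judge admitted at the floor), 0.6889 (floor missed, judge ignored) and 0.70 (semantic-only). The second case shows a property worth keeping in mind: once the floor is met, a mediocre judge score can lower the run score relative to having no judge at all, since the judge then occupies 10% of the weight.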


@ -1,8 +1,9 @@
-"""Statistical helpers for ClawBench v0.3."""
+"""Statistical helpers for ClawBench v0.3+ (extended for v0.5)."""
from __future__ import annotations
-from dataclasses import dataclass
+import math
+from dataclasses import dataclass, field
import numpy as np
@ -128,6 +129,105 @@ def compute_reliability_with_flags(
)
@dataclass
class RobustnessProfile:
"""Taguchi-style robustness summary for a profile or model across tasks.
The larger-is-better signal-to-noise ratio is dominated by the
worst-performing tasks (because of the 1/yᵢ² term), which is the
behavior we want for agent benchmarking: a configuration that scores
0.85 on average but 0.10 on adversarial tasks is worse in production
than one that averages 0.78 and never drops below 0.65.
Reference: CLAWBENCH_V0_4_SPEC.md v0.5 §"Taguchi Signal-to-Noise".
"""
mean: float
worst_of_n: float
best_of_n: float
stddev: float
sn_ratio_db: float # larger-is-better S/N ratio in decibels
tier_means: dict[str, float] = field(default_factory=dict)
n_tasks: int = 0
def to_dict(self) -> dict:
return {
"mean": round(self.mean, 4),
"worst_of_n": round(self.worst_of_n, 4),
"best_of_n": round(self.best_of_n, 4),
"stddev": round(self.stddev, 4),
"sn_ratio_db": round(self.sn_ratio_db, 4),
"tier_means": {k: round(v, 4) for k, v in self.tier_means.items()},
"n_tasks": self.n_tasks,
}
def taguchi_sn_larger_is_better(scores: list[float], *, floor: float = 1e-3) -> float:
"""Compute the larger-is-better signal-to-noise ratio in decibels.
S/N = -10 * log10( (1/n) * Σ (1/yᵢ²) )
`floor` clamps any score below it (including zeros) to avoid 1/0. A tiny
positive floor still heavily penalizes zero-scored tasks in the final
S/N, which is the desired behavior: a benchmark run that crashes on a
task should drag the S/N down sharply.
"""
if not scores:
return 0.0
clamped = [max(floor, float(s)) for s in scores]
mean_inverse_square = sum(1.0 / (y * y) for y in clamped) / len(clamped)
return -10.0 * math.log10(mean_inverse_square)
def compute_robustness_profile(
per_task_scores: dict[str, float],
*,
tier_of: dict[str, str] | None = None,
) -> RobustnessProfile:
"""Build a RobustnessProfile from a {task_id: score} mapping.
If `tier_of` is supplied, also compute per-tier mean scores so the
diagnostic report can show where the configuration is strong or weak.
"""
if not per_task_scores:
return RobustnessProfile(
mean=0.0,
worst_of_n=0.0,
best_of_n=0.0,
stddev=0.0,
sn_ratio_db=0.0,
tier_means={},
n_tasks=0,
)
values = list(per_task_scores.values())
arr = np.array(values, dtype=float)
mean = float(arr.mean())
worst = float(arr.min())
best = float(arr.max())
stddev = float(arr.std(ddof=1)) if len(values) > 1 else 0.0
sn = taguchi_sn_larger_is_better(values)
tier_means: dict[str, float] = {}
if tier_of:
bucket: dict[str, list[float]] = {}
for task_id, score in per_task_scores.items():
tier = tier_of.get(task_id, "unknown")
bucket.setdefault(tier, []).append(float(score))
for tier, scores in bucket.items():
tier_means[tier] = sum(scores) / len(scores)
return RobustnessProfile(
mean=mean,
worst_of_n=worst,
best_of_n=best,
stddev=stddev,
sn_ratio_db=sn,
tier_means=tier_means,
n_tasks=len(values),
)
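To see why the larger-is-better S/N ratio can rank a configuration differently from its mean, the following standalone sketch repeats the formula from `taguchi_sn_larger_is_better` and applies it to two made-up score lists. Only the standard library is used; the lists `spiky` and `steady` are hypothetical.

```python
import math


def taguchi_sn_larger_is_better(scores, *, floor=1e-3):
    """S/N = -10 * log10(mean(1 / y_i^2)), with scores clamped to `floor`."""
    if not scores:
        return 0.0
    clamped = [max(floor, float(s)) for s in scores]
    mean_inverse_square = sum(1.0 / (y * y) for y in clamped) / len(clamped)
    return -10.0 * math.log10(mean_inverse_square)


# Hypothetical per-task scores for two configurations.
spiky = [1.0, 1.0, 1.0, 0.1]   # higher mean, one near-failure
steady = [0.7, 0.7, 0.7, 0.7]  # lower mean, no weak task

mean_spiky = sum(spiky) / len(spiky)
mean_steady = sum(steady) / len(steady)
sn_spiky = taguchi_sn_larger_is_better(spiky)
sn_steady = taguchi_sn_larger_is_better(steady)
sn_crash = taguchi_sn_larger_is_better([0.0])  # clamped to the floor
```

Here `spiky` has the higher mean (0.775 vs 0.70) but the lower S/N (about -14.1 dB vs about -3.1 dB), because the single 0.1 score contributes 100 to the inverse-square sum. Two edge cases follow from the formula: a lone zero score clamps to the floor and yields about -60 dB, and an empty list returns 0.0 dB, which is the same value an all-1.0 run would produce, so callers may want to check `n_tasks` before comparing.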
def summarize_task_runs(
scores: list[float],
pass_threshold: float = 0.7,

clawbench/utilization.py Normal file

@ -0,0 +1,283 @@
"""ClawBench v0.5 — Plugin Utilization Audit and Manifest-vs-Reality Gap.
This module answers two questions from the Configuration Diagnostic Report
(CLAWBENCH_V0_4_SPEC.md §"Configuration Diagnostic Report" items 3 and 4):
3. For each plugin in the profile, was it actually invoked during the
run? Plugins that loaded but were never called are flagged as dead
weight.
4. For each plugin, did it impact the tasks its manifest suggested it
would? Discrepancies are listed.
Both are computed purely from the profile + transcripts, with no live
gateway instrumentation required. The tool-name → plugin-id mapping is
derived from the RegistrationTrace when available, and falls back to a
conservative heuristic (tool family match) when a tool name is not found
in any trace.
"""
from __future__ import annotations
from collections import Counter
from dataclasses import dataclass, field, asdict
from typing import Iterable
from clawbench.profile import (
PluginManifest,
PluginProfile,
RegistrationTrace,
TOOL_FAMILIES,
)
from clawbench.schemas import Transcript
from clawbench.trajectory import classify_tool_call
@dataclass
class PluginUtilization:
"""Per-plugin invocation summary for a single profile run."""
plugin_id: str
source: str
invoked: bool
invocation_count: int
tool_calls: list[str] = field(default_factory=list) # tool names invoked
tool_families_touched: list[str] = field(default_factory=list)
task_ids_with_invocation: list[str] = field(default_factory=list)
dead_weight: bool = False # True if plugin loaded but never invoked
def to_dict(self) -> dict:
return asdict(self)
@dataclass
class UtilizationReport:
n_plugins: int
n_invoked: int
n_dead_weight: int
per_plugin: list[PluginUtilization] = field(default_factory=list)
unassigned_tool_calls: int = 0 # tool calls we couldn't trace back to a plugin
def to_dict(self) -> dict:
return {
"n_plugins": self.n_plugins,
"n_invoked": self.n_invoked,
"n_dead_weight": self.n_dead_weight,
"unassigned_tool_calls": self.unassigned_tool_calls,
"per_plugin": [p.to_dict() for p in self.per_plugin],
}
@property
def utilization_rate(self) -> float:
if self.n_plugins == 0:
return 0.0
return self.n_invoked / self.n_plugins
def _build_tool_to_plugin_map(
profile: PluginProfile,
traces: dict[str, RegistrationTrace] | None,
) -> dict[str, str]:
"""Map tool name → plugin_id using registration traces when available."""
mapping: dict[str, str] = {}
if not traces:
return mapping
for entry in profile.plugins:
trace = traces.get(entry.id)
if trace is None:
continue
for tool_name in trace.tools:
# First-registration wins; traces are processed in profile order
mapping.setdefault(tool_name, entry.id)
return mapping
def _fallback_family_to_plugin(
profile: PluginProfile,
traces: dict[str, RegistrationTrace] | None,
) -> dict[str, list[str]]:
"""Fallback: map tool family → candidate plugin ids.
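Used when a tool call's name does not appear in any registration trace
(e.g., because the trace for that plugin is incomplete). Attribution is
then still possible at the family level, based on the tool families seen
in each plugin's trace. With no traces at all this map is empty, so no
fallback attribution takes place.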
"""
out: dict[str, list[str]] = {}
if not traces:
return out
for entry in profile.plugins:
trace = traces.get(entry.id)
if trace is None:
continue
for fam in trace.tool_families_seen:
out.setdefault(fam, []).append(entry.id)
return out
def audit_plugin_utilization(
profile: PluginProfile,
transcripts: dict[str, Transcript],
*,
manifests: dict[str, PluginManifest] | None = None,
traces: dict[str, RegistrationTrace] | None = None,
) -> UtilizationReport:
"""Compute a UtilizationReport from a profile + per-task transcripts.
Parameters
----------
profile : PluginProfile
The submitted profile.
transcripts : dict[task_id, Transcript]
The per-task transcripts from the v0.4 benchmark run.
manifests : dict[plugin_id, PluginManifest] | None
Optional cached manifests (unused directly, but kept for parity
with other v0.5 signatures; callers always have them around).
traces : dict[plugin_id, RegistrationTrace] | None
Optional registration traces. When provided, enables exact tool
name → plugin_id attribution, with family matching as a fallback
for tool names missing from the traces. With no traces at all,
every tool call is counted as unassigned.
Returns
-------
UtilizationReport
"""
del manifests # accepted for signature parity; not currently needed
tool_to_plugin = _build_tool_to_plugin_map(profile, traces)
family_to_plugins = _fallback_family_to_plugin(profile, traces)
per_plugin_counts: dict[str, int] = {e.id: 0 for e in profile.plugins}
per_plugin_tools: dict[str, Counter] = {e.id: Counter() for e in profile.plugins}
per_plugin_families: dict[str, set[str]] = {e.id: set() for e in profile.plugins}
per_plugin_tasks: dict[str, set[str]] = {e.id: set() for e in profile.plugins}
unassigned = 0
for task_id, transcript in transcripts.items():
for call in transcript.tool_call_sequence:
family = call.family or classify_tool_call(call)[0] or "unknown"
plugin_id = tool_to_plugin.get(call.name)
if plugin_id is None:
# Family fallback: if exactly one plugin claims this family,
# attribute to it. If multiple do, leave unassigned — we
# don't want to inflate counts via ambiguous attribution.
candidates = family_to_plugins.get(family, [])
if len(candidates) == 1:
plugin_id = candidates[0]
if plugin_id is None or plugin_id not in per_plugin_counts:
unassigned += 1
continue
per_plugin_counts[plugin_id] += 1
per_plugin_tools[plugin_id][call.name] += 1
per_plugin_families[plugin_id].add(family)
per_plugin_tasks[plugin_id].add(task_id)
per_plugin: list[PluginUtilization] = []
for entry in profile.plugins:
count = per_plugin_counts[entry.id]
invoked = count > 0
per_plugin.append(PluginUtilization(
plugin_id=entry.id,
source=entry.source,
invoked=invoked,
invocation_count=count,
tool_calls=sorted(per_plugin_tools[entry.id].keys()),
tool_families_touched=sorted(per_plugin_families[entry.id]),
task_ids_with_invocation=sorted(per_plugin_tasks[entry.id]),
dead_weight=not invoked,
))
n_invoked = sum(1 for p in per_plugin if p.invoked)
n_dead = sum(1 for p in per_plugin if p.dead_weight)
return UtilizationReport(
n_plugins=len(per_plugin),
n_invoked=n_invoked,
n_dead_weight=n_dead,
per_plugin=per_plugin,
unassigned_tool_calls=unassigned,
)
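# Illustrative sketch only, not part of the ClawBench API. It restates the
# attribution rule from the loop in the function above on plain dicts so the
# rule can be read and tested in isolation. The helper name
# `_sketch_attribute_call` and its parameters are hypothetical. It assumes the
# two lookup tables have the shapes the loop relies on (tool name -> plugin id,
# family -> list of plugin ids) and leaves out the final check that the
# plugin id belongs to the submitted profile.
from typing import Optional


def _sketch_attribute_call(
    tool_name: str,
    family: str,
    tool_to_plugin: dict[str, str],
    family_to_plugins: dict[str, list[str]],
) -> Optional[str]:
    """Return the plugin credited for one tool call, or None if unassigned."""
    # An exact tool-name match always wins.
    plugin_id = tool_to_plugin.get(tool_name)
    if plugin_id is not None:
        return plugin_id
    # Family fallback applies only when exactly one plugin claims the family;
    # an ambiguous family leaves the call unassigned rather than guessing.
    candidates = family_to_plugins.get(family, [])
    return candidates[0] if len(candidates) == 1 else None


# Example with made-up ids: a call to "fetch" in family "browser" is credited
# to "p1" when family_to_plugins == {"browser": ["p1"]}, and is left
# unassigned when family_to_plugins == {"browser": ["p1", "p2"]}.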
# ---------------------------------------------------------------------------
# Manifest vs. Reality Gap — §4 of the Configuration Diagnostic Report.
# ---------------------------------------------------------------------------
@dataclass
class ManifestRealityGap:
plugin_id: str
claimed_capabilities: list[str]
observed_capabilities: list[str]
unused_capabilities: list[str] # claimed but never exercised
unclaimed_capabilities: list[str] # observed but not declared
claim_coverage: float # fraction of claimed capabilities actually exercised
def to_dict(self) -> dict:
return asdict(self)
@dataclass
class ManifestRealityReport:
per_plugin: list[ManifestRealityGap] = field(default_factory=list)
def to_dict(self) -> dict:
return {"per_plugin": [g.to_dict() for g in self.per_plugin]}
def _manifest_claimed_families(manifest: PluginManifest) -> set[str]:
"""Derive claimed tool families from a manifest.
Each manifest contract maps to one or more ClawBench tool families.
This mapping is conservative: if we're not sure, we don't claim the
family. The point is to detect mismatch, not to be exhaustive.
"""
claimed: set[str] = set()
contracts = manifest.contracts or {}
if contracts.get("tools"):
# A tools contract alone does not reveal the family, so record the
# "unknown" sentinel to show that the plugin claims something.
claimed.add("unknown")
if contracts.get("webFetchProviders") or contracts.get("webSearchProviders"):
claimed.add("search")
claimed.add("browser")
if "memory" in manifest.kind:
claimed.add("memory")
if "context-engine" in manifest.kind:
claimed.add("read")
return claimed
def compute_manifest_reality_gap(
profile: PluginProfile,
manifests: dict[str, PluginManifest],
utilization: UtilizationReport,
) -> ManifestRealityReport:
"""For each plugin, compare what the manifest claims against what ran."""
gaps: list[ManifestRealityGap] = []
util_lookup = {p.plugin_id: p for p in utilization.per_plugin}
for entry in profile.plugins:
manifest = manifests.get(entry.id)
if manifest is None:
continue
util = util_lookup.get(entry.id)
claimed = _manifest_claimed_families(manifest)
observed = set(util.tool_families_touched) if util else set()
# Drop the "unknown" sentinel: it is excluded from the claimed set used
# for coverage and from the unclaimed list.
claimed_concrete = claimed - {"unknown"}
unused = sorted(claimed_concrete - observed)
unclaimed = sorted(observed - claimed_concrete - {"unknown"})
if claimed_concrete:
coverage = len(claimed_concrete & observed) / len(claimed_concrete)
else:
# Plugin made no family-level claims — coverage is 1.0 if it
# was invoked at all, else 0.0.
coverage = 1.0 if (util and util.invoked) else 0.0
gaps.append(ManifestRealityGap(
plugin_id=entry.id,
claimed_capabilities=sorted(claimed_concrete),
observed_capabilities=sorted(observed),
unused_capabilities=unused,
unclaimed_capabilities=unclaimed,
claim_coverage=round(coverage, 4),
))
return ManifestRealityReport(per_plugin=gaps)
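# Illustrative sketch only, not part of the ClawBench API. It restates the
# claim_coverage arithmetic from compute_manifest_reality_gap on plain sets.
# The helper name `_sketch_claim_coverage` and the family names in the
# example are hypothetical.
def _sketch_claim_coverage(
    claimed: set[str],
    observed: set[str],
    invoked: bool,
) -> float:
    """Fraction of concretely claimed families that were actually observed."""
    # The "unknown" sentinel carries no family information, so it is ignored.
    claimed_concrete = claimed - {"unknown"}
    if claimed_concrete:
        covered = claimed_concrete & observed
        return round(len(covered) / len(claimed_concrete), 4)
    # No concrete claims: coverage reduces to "was the plugin invoked at all".
    return 1.0 if invoked else 0.0


# Example: claiming {"search", "browser"} while only "search" is observed
# gives 1 / 2 = 0.5. A plugin whose only claim is "unknown" scores 1.0 if it
# was invoked and 0.0 otherwise.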