Compare commits

...

16 Commits

Author SHA1 Message Date
Vincent Koc
7da58897af
ci: default crabbox owned capacity to standard (#22)
2026-05-07 02:47:04 -07:00
scoootscooob
e0a86b4232
Merge pull request #21 from sallyom/k8s-job
add docs, manifests for k8s
2026-05-06 15:02:15 -07:00
scoootscooob
a95423b3c6 Fix Kubernetes sidecar deploy flow 2026-05-06 14:51:54 -07:00
sallyom
7d75d99643
add docs, manifests for k8s
Signed-off-by: sallyom <somalley@redhat.com>
2026-05-06 08:19:58 -04:00
scoootscooob
d57e4a697d
Merge pull request #19 from openclaw/codex/openclaw-websocket-run-lifecycle
fix(eval): harden OpenClaw run lifecycle waits
2026-05-04 12:25:14 -07:00
scoootscooob
e3ad7ac173 fix(eval): isolate lane queues and configs
2026-05-04 12:19:20 -07:00
Vincent Koc
cce89d828b
feat: add crabbox validation wiring
2026-05-02 18:34:01 -07:00
scoootscooob
5dfa4c9280 fix(eval): stabilize OpenClaw container sweeps
2026-05-02 02:50:57 -07:00
scoootscooob
f09a9f4bf7 fix(eval): carry tool profile through harness 2026-05-02 02:01:13 -07:00
scoootscooob
f45eb288d9 fix(eval): harden OpenClaw run lifecycle waits 2026-05-02 01:38:08 -07:00
Vincent Koc
4e6a686ae5
fix(deps): update benchmark dependency bounds
2026-04-30 15:14:54 -07:00
Vincent Koc
01dd96c71c
fix(security): constrain research article paths
2026-04-30 02:57:52 -07:00
Vincent Koc
e80902bafa
chore: add codeowners 2026-04-29 16:02:36 -07:00
scoootscooob
56531fbf43
feat: add adapter canonicalization layer 2026-04-29 13:57:13 -07:00
Vincent Koc
dc8a1936ab
fix(worker): harden runtime result writes
2026-04-29 13:24:40 -07:00
Vincent Koc
ea17c715b3
fix(client): clean pending rpc on send failure
2026-04-29 00:09:27 -07:00
56 changed files with 8637 additions and 94 deletions


@ -10,6 +10,11 @@ agent dotfiles, Docker, or a benchmark run that is too heavy for the local
machine. Keep normal unit-test iteration local unless the user asks for
Testbox proof.
Crabbox is the sibling lane for reusable owned-capacity proof. Use
`.agents/skills/crabbox/SKILL.md` and `.crabbox.yaml` when ClawBench needs
AWS-backed reusable boxes or Crabbox sync/log/result inspection. Keep this
skill focused on Blacksmith CI parity.
## Warmup
Run from the repository root:


@ -0,0 +1,122 @@
---
name: crabbox
description: Use Crabbox for ClawBench remote Linux validation, warmed reusable boxes, GitHub Actions hydration, sync timing, logs, results, caches, and lease cleanup.
---
# Crabbox
Use Crabbox when ClawBench needs remote Linux proof on owned capacity, a large
runner class, reusable warm state, or a Blacksmith alternative.
## Before Running
- Run from the repo root. Crabbox sync mirrors the current checkout.
- Prefer local targeted tests for tight edit loops.
- Prefer Blacksmith Testbox when the task explicitly asks for Blacksmith or a
Blacksmith-specific CI comparison.
- Use Crabbox for broad ClawBench gates when owned AWS capacity is the right
remote lane.
- Check `.crabbox.yaml` for repo defaults before adding flags.
- Sanity-check the selected binary before remote work. Prefer the local
`openclaw/crabbox` checkout when present because the user PATH shim can be
stale: `command -v crabbox; ../crabbox/bin/crabbox --version`.
- Install with `brew install openclaw/tap/crabbox`; auth is required before use:
`crabbox login --url https://crabbox.openclaw.ai --provider aws`.
- On macOS the user config is `~/Library/Application Support/crabbox/config.yaml`;
it must include `broker.url`, `broker.token`, and usually `provider: aws`.
## ClawBench Flow
AWS/owned-capacity flow for Python tests:
```sh
crabbox warmup --class standard --idle-timeout 90m
crabbox actions hydrate --id <cbx_id-or-slug>
crabbox run --id <cbx_id-or-slug> --timing-json --shell -- "python -m pytest -q"
```
For commands that need hydrated HF/provider credentials or agent dotfiles, use
the helper installed by the hydration workflow:
```sh
crabbox run --id <cbx_id-or-slug> --timing-json --shell -- "clawbench-testbox-env python -m pytest -q"
crabbox run --id <cbx_id-or-slug> --timing-json --shell -- "clawbench-testbox-env clawbench run --model anthropic/claude-sonnet-4-6 --adapter simulated"
```
Blacksmith-backed Crabbox flow can delegate setup to the existing Testbox
workflow:
```sh
crabbox run --provider blacksmith-testbox --blacksmith-org openclaw --blacksmith-workflow .github/workflows/ci-check-testbox.yml --blacksmith-job check --blacksmith-ref main --idle-timeout 90m --timing-json --shell -- "python -m pytest -q"
```
Stop boxes you created before handoff:
```sh
crabbox stop <cbx_id-or-slug>
```
## Owned AWS Capacity
When AWS capacity is under pressure, do not start with `class=beast`.
`beast` begins at 48xlarge instances and can burn 192 vCPU quota per request.
ClawBench's owned-cloud default is `standard`; escalate to `fast`, then
`large`, and only use `beast` when the work is explicitly CPU-bound and the
smaller class already failed the goal.
Keep capacity hints enabled so brokered AWS leases print selected
region/market, quota pressure, Spot fallback, and high-pressure class warnings.
The ClawBench repo config sets `capacity.hints: true`; use
`CRABBOX_CAPACITY_HINTS=0` only when debugging hint rendering itself.
Use `beast` only for exceptional lanes:
- full benchmark sweeps where wall time is dominated by CPU, not dependency
install or network;
- release/blocker validation where a maintainer explicitly asks for the largest
owned AWS class;
- performance profiling where the point is to compare high-core behavior.
Do not use `beast` for ordinary `python -m pytest -q`, docs-only work, small
task repros, Blacksmith outage triage, or focused lint/type/test checks. Those
should use `standard` first and `fast` only when the extra cores materially
help.
## Useful Commands
```sh
crabbox status --id <id-or-slug> --wait
crabbox inspect --id <id-or-slug> --json
crabbox sync-plan
crabbox history --lease <id-or-slug>
crabbox logs <run_id>
crabbox results <run_id>
crabbox cache stats --id <id-or-slug>
crabbox ssh --id <id-or-slug>
```
Use `--debug` on `run` when measuring sync timing.
Use `--timing-json` on warmup, hydrate, and run when comparing AWS and
blacksmith-testbox timings.
Use `--market spot|on-demand` on AWS warmup or one-shot run when testing quota
or capacity behavior without changing `.crabbox.yaml`.
## Hydration Boundary
`.github/workflows/crabbox-hydrate.yml` is repo-specific on purpose. It owns
ClawBench checkout, setup-python, pip install, provider/HF env hydration,
agent-dotfile restoration, ready marker, and keepalive. Crabbox owns runner
registration, workflow dispatch, SSH sync, command execution, logs/results,
local lease claims, and idle cleanup.
Do not add ClawBench-specific setup to Crabbox. Put repo setup in the hydration
workflow and generic lease/sync behavior in Crabbox.
## Cleanup
Crabbox has coordinator-owned idle expiry and local lease claims, so ClawBench
does not need a custom ledger. Default idle timeout is 30 minutes unless config
or flags set a different value. Still stop boxes you created when done.
If `crabbox list` prints `orphan=no-active-lease`, treat it as an operator
review hint; do not delete `keep=true` machines without checking provider and
coordinator state.
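
Reviewer sketch (not part of the diff): the escalation rule described under "Owned AWS Capacity" can be written as a small helper. The class names and their order come from the skill text; the function name `next_class` and the two boolean flags are hypothetical, and Crabbox itself may expose nothing like this.

```python
# Escalation ladder as described in SKILL.md: standard, then fast, then large,
# and beast only for explicitly CPU-bound work after a smaller class failed.
ESCALATION = ["standard", "fast", "large", "beast"]


def next_class(current: str, *, cpu_bound: bool = False, smaller_failed: bool = False) -> str:
    """Return the next class to try, or `current` if escalation is not justified.

    Hypothetical helper for illustration only; it is not a Crabbox API.
    """
    idx = ESCALATION.index(current)
    if idx == len(ESCALATION) - 1:
        return current  # already at the largest class
    candidate = ESCALATION[idx + 1]
    if candidate == "beast" and not (cpu_bound and smaller_failed):
        return current  # beast needs both conditions from the skill text
    return candidate
```

Keeping the `beast` gate in one place makes it harder to reach the 48xlarge tier by accident when a lane is only slow because of dependency install or network time.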

48
.crabbox.yaml Normal file

@ -0,0 +1,48 @@
profile: clawbench-check
provider: aws
class: standard
capacity:
  market: spot
  strategy: most-available
  fallback: on-demand-after-120s
  hints: true
  regions:
    - eu-west-1
actions:
  workflow: .github/workflows/crabbox-hydrate.yml
  job: hydrate
  ref: main
  runnerLabels:
    - crabbox
    - clawbench
  runnerVersion: latest
  ephemeral: true
aws:
  region: eu-west-1
  rootGB: 400
sync:
  delete: true
  checksum: false
  gitSeed: true
  fingerprint: true
  baseRef: main
  exclude:
    - .artifacts
    - .codex
    - .DS_Store
    - .pytest_cache
    - .ruff_cache
    - .venv
    - dist
    - htmlcov
    - playwright-report
    - test-results
env:
  allow:
    - CI
    - CLAWBENCH_*
    - OPENCLAW_*
    - PYTHON*
ssh:
  user: crabbox
  port: "2222"

1
.github/CODEOWNERS vendored Normal file

@ -0,0 +1 @@
* @openclaw/openclaw-evals


@ -29,6 +29,22 @@ It installs ClawBench, hydrates provider/HF secrets into
dotfiles from repo or org secrets, and installs
`~/.local/bin/clawbench-testbox-env` for commands that need that live auth.
## `crabbox-hydrate.yml` — Crabbox Actions hydration
This workflow exists for the Crabbox CLI from `openclaw/crabbox`:
```bash
crabbox warmup --idle-timeout 90m
crabbox actions hydrate --id <cbx_id-or-slug>
crabbox run --id <cbx_id-or-slug> --shell -- "python -m pytest -q"
```
It runs on the dynamic self-hosted runner label registered by Crabbox, installs
ClawBench, hydrates the same provider/HF secrets and agent dotfiles as the
Blacksmith Testbox workflow, writes the Crabbox ready marker under
`~/.crabbox/actions/`, and keeps the job alive for follow-up SSH sync/run
commands.
## `sync-to-hf-space.yml` — auto-mirror main to the HF Space
Mirrors every push to `main` into the HF Space git remote so


@ -39,6 +39,9 @@ jobs:
- name: Run static lint
run: python -m ruff check clawbench app.py scripts tests
- name: Run runtime contract smoke tests
run: python -m pytest -q tests/test_runtime_contracts.py
- name: Run test suite
run: python -m pytest -q

166
.github/workflows/crabbox-hydrate.yml vendored Normal file

@ -0,0 +1,166 @@
name: Crabbox Hydrate

on:
  workflow_dispatch:
    inputs:
      crabbox_id:
        description: "Crabbox lease ID"
        required: true
        type: string
      ref:
        description: "Git ref to hydrate"
        required: false
        type: string
      crabbox_runner_label:
        description: "Dynamic Crabbox runner label"
        required: true
        type: string
      crabbox_job:
        description: "Hydration job identifier expected by Crabbox"
        required: false
        default: "hydrate"
        type: string
      crabbox_keep_alive_minutes:
        description: "Minutes to keep the hydrated job alive"
        required: false
        default: "90"
        type: string

permissions:
  contents: read

jobs:
  hydrate:
    name: hydrate
    runs-on: [self-hosted, "${{ inputs.crabbox_runner_label }}"]
    timeout-minutes: 120
    steps:
      - name: Checkout
        uses: actions/checkout@v4
        with:
          ref: ${{ inputs.ref || github.ref }}
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
          cache: pip
      - name: Install project
        run: |
          python -m pip install --upgrade pip
          python -m pip install -e .
      - name: Prepare Crabbox shell
        shell: bash
        run: |
          set -euo pipefail
          git fetch --no-tags --depth=50 origin "+refs/heads/main:refs/remotes/origin/main"
          python_dir="$(dirname "$(python -c 'import sys; print(sys.executable)')")"
          sudo ln -sf "$python_dir/python" /usr/local/bin/python
          sudo ln -sf "$python_dir/python" /usr/local/bin/python3
          sudo ln -sf "$python_dir/pip" /usr/local/bin/pip
          sudo ln -sf "$python_dir/pip" /usr/local/bin/pip3
          sudo ln -sf "$python_dir/pytest" /usr/local/bin/pytest
      - name: Hydrate Crabbox env helper
        shell: bash
        env:
          HF_TOKEN: ${{ secrets.HF_TOKEN }}
          HF_USERNAME: ${{ secrets.HF_USERNAME }}
          CLAWBENCH_QUEUE_DATASET: ${{ vars.CLAWBENCH_QUEUE_DATASET || 'openclaw/clawbench-results' }}
          CLAWBENCH_JUDGE_MODEL: ${{ vars.CLAWBENCH_JUDGE_MODEL }}
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
          ANTHROPIC_API_KEY_OLD: ${{ secrets.ANTHROPIC_API_KEY_OLD }}
          ANTHROPIC_API_TOKEN: ${{ secrets.ANTHROPIC_API_TOKEN }}
          CEREBRAS_API_KEY: ${{ secrets.CEREBRAS_API_KEY }}
          DEEPINFRA_API_KEY: ${{ secrets.DEEPINFRA_API_KEY }}
          FIREWORKS_API_KEY: ${{ secrets.FIREWORKS_API_KEY }}
          GEMINI_API_KEY: ${{ secrets.GEMINI_API_KEY }}
          GOOGLE_API_KEY: ${{ secrets.GOOGLE_API_KEY }}
          GROQ_API_KEY: ${{ secrets.GROQ_API_KEY }}
          KIMI_API_KEY: ${{ secrets.KIMI_API_KEY }}
          MINIMAX_API_KEY: ${{ secrets.MINIMAX_API_KEY }}
          MISTRAL_API_KEY: ${{ secrets.MISTRAL_API_KEY }}
          MOONSHOT_API_KEY: ${{ secrets.MOONSHOT_API_KEY }}
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
          OPENAI_BASE_URL: ${{ secrets.OPENAI_BASE_URL }}
          OPENROUTER_API_KEY: ${{ secrets.OPENROUTER_API_KEY }}
          QWEN_API_KEY: ${{ secrets.QWEN_API_KEY }}
          TOGETHER_API_KEY: ${{ secrets.TOGETHER_API_KEY }}
          XAI_API_KEY: ${{ secrets.XAI_API_KEY }}
          ZAI_API_KEY: ${{ secrets.ZAI_API_KEY }}
          Z_AI_API_KEY: ${{ secrets.Z_AI_API_KEY }}
          OPENCLAW_CODEX_AUTH_JSON: ${{ secrets.OPENCLAW_CODEX_AUTH_JSON }}
          OPENCLAW_CODEX_CONFIG_TOML: ${{ secrets.OPENCLAW_CODEX_CONFIG_TOML }}
          OPENCLAW_CLAUDE_JSON: ${{ secrets.OPENCLAW_CLAUDE_JSON }}
          OPENCLAW_CLAUDE_CREDENTIALS_JSON: ${{ secrets.OPENCLAW_CLAUDE_CREDENTIALS_JSON }}
          OPENCLAW_CLAUDE_SETTINGS_JSON: ${{ secrets.OPENCLAW_CLAUDE_SETTINGS_JSON }}
          OPENCLAW_CLAUDE_SETTINGS_LOCAL_JSON: ${{ secrets.OPENCLAW_CLAUDE_SETTINGS_LOCAL_JSON }}
          OPENCLAW_GEMINI_SETTINGS_JSON: ${{ secrets.OPENCLAW_GEMINI_SETTINGS_JSON }}
          CLAWBENCH_CODEX_AUTH_JSON: ${{ secrets.CLAWBENCH_CODEX_AUTH_JSON }}
          CLAWBENCH_CODEX_CONFIG_TOML: ${{ secrets.CLAWBENCH_CODEX_CONFIG_TOML }}
          CLAWBENCH_CLAUDE_JSON: ${{ secrets.CLAWBENCH_CLAUDE_JSON }}
          CLAWBENCH_CLAUDE_CREDENTIALS_JSON: ${{ secrets.CLAWBENCH_CLAUDE_CREDENTIALS_JSON }}
          CLAWBENCH_CLAUDE_SETTINGS_JSON: ${{ secrets.CLAWBENCH_CLAUDE_SETTINGS_JSON }}
          CLAWBENCH_CLAUDE_SETTINGS_LOCAL_JSON: ${{ secrets.CLAWBENCH_CLAUDE_SETTINGS_LOCAL_JSON }}
          CLAWBENCH_GEMINI_SETTINGS_JSON: ${{ secrets.CLAWBENCH_GEMINI_SETTINGS_JSON }}
        run: |
          bash scripts/ci-hydrate-testbox-env.sh
          sudo ln -sf "$HOME/.local/bin/clawbench-testbox-env" /usr/local/bin/clawbench-testbox-env
      - name: Mark Crabbox ready
        shell: bash
        run: |
          set -euo pipefail
          job="${{ inputs.crabbox_job }}"
          if [ -z "$job" ]; then job=hydrate; fi
          mkdir -p "$HOME/.crabbox/actions"
          state="$HOME/.crabbox/actions/${{ inputs.crabbox_id }}.env"
          env_file="$HOME/.crabbox/actions/${{ inputs.crabbox_id }}.env.sh"
          services_file="$HOME/.crabbox/actions/${{ inputs.crabbox_id }}.services"
          write_export() {
            key="$1"
            value="${!key-}"
            if [ -n "$value" ]; then
              printf 'export %s=%q\n' "$key" "$value"
            fi
          }
          {
            for key in CI GITHUB_ACTIONS GITHUB_WORKSPACE GITHUB_REPOSITORY GITHUB_RUN_ID GITHUB_RUN_NUMBER GITHUB_RUN_ATTEMPT GITHUB_REF GITHUB_REF_NAME GITHUB_SHA GITHUB_EVENT_NAME GITHUB_ACTOR RUNNER_OS RUNNER_ARCH RUNNER_TEMP RUNNER_TOOL_CACHE; do
              write_export "$key"
            done
          } > "${env_file}.tmp"
          mv "${env_file}.tmp" "$env_file"
          {
            echo "# Docker containers visible from the hydrated runner"
            docker ps --format '{{.Names}}\t{{.Image}}\t{{.Ports}}' 2>/dev/null || true
          } > "${services_file}.tmp"
          mv "${services_file}.tmp" "$services_file"
          tmp="${state}.tmp"
          {
            echo "WORKSPACE=${GITHUB_WORKSPACE}"
            echo "RUN_ID=${GITHUB_RUN_ID}"
            echo "JOB=${job}"
            echo "ENV_FILE=${env_file}"
            echo "SERVICES_FILE=${services_file}"
            echo "READY_AT=$(date -u +%Y-%m-%dT%H:%M:%SZ)"
          } > "$tmp"
          mv "$tmp" "$state"
      - name: Keep Crabbox job alive
        shell: bash
        run: |
          set -euo pipefail
          minutes="${{ inputs.crabbox_keep_alive_minutes }}"
          case "$minutes" in
            ''|*[!0-9]*) minutes=90 ;;
          esac
          stop="$HOME/.crabbox/actions/${{ inputs.crabbox_id }}.stop"
          deadline=$(( $(date +%s) + minutes * 60 ))
          while [ "$(date +%s)" -lt "$deadline" ]; do
            if [ -f "$stop" ]; then
              exit 0
            fi
            sleep 15
          done
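
Reviewer sketch (not part of the diff): the "Mark Crabbox ready" step writes each file to a `.tmp` sibling and then `mv`s it into place, so a reader polling for the marker never sees a half-written file. The "Keep Crabbox job alive" step falls back to 90 minutes when the input is empty or non-numeric. A minimal Python equivalent of both ideas follows; the function names are hypothetical and nothing here is Crabbox's own code.

```python
import os
from pathlib import Path


def write_marker_atomically(path: Path, fields: dict[str, str]) -> None:
    """Write KEY=VALUE lines to a sibling .tmp file, then rename it into place."""
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text("".join(f"{k}={v}\n" for k, v in fields.items()), encoding="utf-8")
    # os.replace is atomic when tmp and path live on the same filesystem,
    # which matches the workflow's `mv "$tmp" "$state"`.
    os.replace(tmp, path)


def parse_keep_alive_minutes(raw: str, default: int = 90) -> int:
    """Mirror the shell `case`: empty or non-digit input falls back to the default."""
    if raw and raw.isascii() and raw.isdigit():
        return int(raw)
    return default
```

The rename only helps if readers open the final path and never the `.tmp` one, which is presumably why the workflow writes the state file last, after the env and services files it points to.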


@ -14,7 +14,7 @@ RUN apt-get update && \
RUN ln -s /app /openclaw
ENV PLAYWRIGHT_BROWSERS_PATH=/ms-playwright
-RUN npx -y playwright@1.59.1 install --with-deps chromium && \
+RUN cd /tmp && npx -y playwright@1.59.1 install --with-deps chromium && \
CHROME_PATH="$(find /ms-playwright -path '*/chrome' -type f | sort | head -n 1)" && \
test -x "$CHROME_PATH" && \
ln -sf "$CHROME_PATH" /usr/bin/chromium
@ -28,6 +28,7 @@ COPY --chown=node:node tasks-public/ tasks-public/
COPY --chown=node:node tasks-domain/ tasks-domain/
COPY --chown=node:node profiles/ profiles/
COPY --chown=node:node baselines/ baselines/
COPY --chown=node:node scripts/ scripts/
COPY --chown=node:node app.py .
RUN python3 -m pip install --break-system-packages --no-cache-dir .


@ -461,6 +461,26 @@ python3 scripts/run_posterior_dynamics_pipeline.py \
clawbench diagnose profiles/local_ollama_gpt_oss.yaml
```
### Running on Kubernetes
See [`docs/kubernetes.md`](docs/kubernetes.md) for the full runbook. The short
version:
```bash
export CLAWBENCH_NAMESPACE=clawbench-eval
export OPENAI_API_KEY="sk-..." # or ANTHROPIC_API_KEY, OPENROUTER_API_KEY, etc.
export CLAWBENCH_MODEL="openai/gpt-5.5"
# export MLFLOW_NAMESPACE="mlflow" # MLflow deploys in a separate namespace (default: mlflow)
./scripts/k8s/deploy.sh # deploys OpenClaw + MLflow + starts eval
./scripts/k8s/deploy.sh --logs # follow progress
./scripts/k8s/deploy.sh --teardown # tear down openclaw & eval (does not delete MLflow)
```
API keys are stored in a Kubernetes Secret created by the deploy script.
MLflow is deployed in its own namespace (default: `mlflow`, configurable via
`MLFLOW_NAMESPACE`).
---
## Partner Trace Spec

313
clawbench/ablation.py Normal file

@ -0,0 +1,313 @@
"""Ablation profiles and fair-comparison helpers.

The benchmark can only explain model, harness, and tool effects if those
axes are represented explicitly in run metadata. This module keeps that
representation small and deterministic: a harness driver plus a tool
profile yields a fingerprint, and result comparison refuses to call a
delta fair when models or task sets drift.
"""

from __future__ import annotations

import hashlib
import json
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable

from pydantic import BaseModel, Field

from clawbench.adapters import get_adapter
from clawbench.adapters.base import AdapterConfig
from clawbench.canonical import AdapterCapability
from clawbench.canonical.convert import from_task_definition
from clawbench.schemas import BenchmarkResult, TaskDefinition

CAPABILITY_TO_INTERFACE: dict[AdapterCapability, str] = {
    AdapterCapability.FILES: "filesystem",
    AdapterCapability.EXECUTION: "shell",
    AdapterCapability.MEMORY: "memory",
    AdapterCapability.SESSION: "session",
    AdapterCapability.CRON: "scheduler",
    AdapterCapability.BROWSER: "browser",
    AdapterCapability.GATEWAY_RPC: "gateway_rpc",
    AdapterCapability.MULTI_TURN_INJECTION: "multi_turn",
}


class HarnessDescriptor(BaseModel):
    """Identifies the agent loop being measured."""

    adapter: str
    driver: str = ""
    version: str = ""
    git_sha: str = ""
    source: str = ""
    invocation: str = "clawbench"


class ToolProfile(BaseModel):
    """The tools/interfaces exposed to a harness run."""

    name: str
    mode: str = "native"
    interfaces: list[str] = Field(default_factory=list)
    adapter_capabilities: list[str] = Field(default_factory=list)
    enabled_toolsets: list[str] = Field(default_factory=list)
    disabled_toolsets: list[str] = Field(default_factory=list)
    tools: list[str] = Field(default_factory=list)
    fingerprint: str = ""

    def with_fingerprint(self) -> "ToolProfile":
        payload = {
            "name": self.name,
            "mode": self.mode,
            "interfaces": sorted(self.interfaces),
            "adapter_capabilities": sorted(self.adapter_capabilities),
            "enabled_toolsets": sorted(self.enabled_toolsets),
            "disabled_toolsets": sorted(self.disabled_toolsets),
            "tools": sorted(self.tools),
        }
        digest = hashlib.sha256(
            json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
        ).hexdigest()
        return self.model_copy(update={"fingerprint": digest[:16]})


class AblationProfile(BaseModel):
    """Run-level axis metadata embedded in BenchmarkResult.environment."""

    model: str
    harness: HarnessDescriptor
    tool_profile: ToolProfile
    prompt_profile: str = "clear"
    fingerprint: str = ""

    def with_fingerprint(self) -> "AblationProfile":
        tool_profile = self.tool_profile.with_fingerprint()
        payload = {
            "model": self.model,
            "harness": self.harness.model_dump(),
            "tool_profile": tool_profile.model_dump(),
            "prompt_profile": self.prompt_profile,
        }
        digest = hashlib.sha256(
            json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
        ).hexdigest()
        return self.model_copy(
            update={
                "tool_profile": tool_profile,
                "fingerprint": digest[:16],
            }
        )


@dataclass(frozen=True)
class FairTaskSet:
    task_ids: list[str]
    skipped: dict[str, list[str]] = field(default_factory=dict)


def capabilities_to_interfaces(capabilities: Iterable[AdapterCapability | str]) -> list[str]:
    values: list[str] = []
    for cap in capabilities:
        enum_value = cap if isinstance(cap, AdapterCapability) else AdapterCapability(str(cap))
        values.append(CAPABILITY_TO_INTERFACE.get(enum_value, enum_value.value))
    return sorted(set(values))


def adapter_capabilities(
    adapter: str,
    config: AdapterConfig | None = None,
) -> set[AdapterCapability]:
    adapter_cls = get_adapter(adapter)
    return adapter_cls.supported_capabilities(config)


def default_tool_profile(
    *,
    adapter: str,
    config: AdapterConfig | None = None,
    name: str | None = None,
    mode: str = "native",
    enabled_toolsets: list[str] | None = None,
    disabled_toolsets: list[str] | None = None,
) -> ToolProfile:
    caps = adapter_capabilities(adapter, config)
    profile = ToolProfile(
        name=name or f"{adapter}-{mode}",
        mode=mode,
        interfaces=capabilities_to_interfaces(caps),
        adapter_capabilities=sorted(cap.value for cap in caps),
        enabled_toolsets=enabled_toolsets or [],
        disabled_toolsets=disabled_toolsets or [],
    )
    return profile.with_fingerprint()


def compatible_task_ids(
    tasks: Iterable[TaskDefinition],
    *,
    adapter: str,
    config: AdapterConfig | None = None,
) -> tuple[list[str], dict[str, list[str]]]:
    caps = adapter_capabilities(adapter, config)
    task_ids: list[str] = []
    skipped: dict[str, list[str]] = {}
    for task in tasks:
        canonical = from_task_definition(task)
        missing = set(canonical.required_adapter_capabilities) - caps
        if missing:
            skipped[task.id] = sorted(cap.value for cap in missing)
        else:
            task_ids.append(task.id)
    return task_ids, skipped


def common_compatible_task_set(
    tasks: Iterable[TaskDefinition],
    adapter_configs: dict[str, tuple[str, AdapterConfig | None]],
) -> FairTaskSet:
    task_list = list(tasks)
    common: set[str] | None = None
    skipped: dict[str, list[str]] = {}
    for label, (adapter, config) in adapter_configs.items():
        ids, missing = compatible_task_ids(task_list, adapter=adapter, config=config)
        ids_set = set(ids)
        common = ids_set if common is None else common & ids_set
        for task_id, caps in missing.items():
            skipped.setdefault(task_id, []).append(f"{label}: {', '.join(caps)}")
    ordered = [task.id for task in task_list if task.id in (common or set())]
    return FairTaskSet(task_ids=ordered, skipped=skipped)


def build_ablation_profile(
    *,
    model: str,
    adapter: str,
    config: AdapterConfig | None = None,
    prompt_profile: str = "clear",
    harness_version: str = "",
    harness_git_sha: str = "",
    harness_source: str = "",
    driver: str = "",
    tool_profile_name: str | None = None,
    enabled_toolsets: list[str] | None = None,
    disabled_toolsets: list[str] | None = None,
) -> AblationProfile:
    harness = HarnessDescriptor(
        adapter=adapter,
        driver=driver,
        version=harness_version,
        git_sha=harness_git_sha,
        source=harness_source,
    )
    tool_profile = default_tool_profile(
        adapter=adapter,
        config=config,
        name=tool_profile_name,
        enabled_toolsets=enabled_toolsets,
        disabled_toolsets=disabled_toolsets,
    )
    return AblationProfile(
        model=model,
        harness=harness,
        tool_profile=tool_profile,
        prompt_profile=prompt_profile,
    ).with_fingerprint()


def compare_results(results: dict[str, BenchmarkResult]) -> dict[str, Any]:
    """Return score deltas plus fairness checks for result JSONs."""
    labels = list(results)
    models = {label: result.model for label, result in results.items()}
    task_sets = {
        label: [task.task_id for task in result.task_results]
        for label, result in results.items()
    }
    first_tasks = next(iter(task_sets.values()), [])
    same_task_set = all(tasks == first_tasks for tasks in task_sets.values())
    same_model = len(set(models.values())) == 1
    snapshot_fingerprints = {
        result.task_snapshot_fingerprint
        for result in results.values()
        if result.task_snapshot_fingerprint
    }
    same_task_snapshot = len(snapshot_fingerprints) <= 1
    prompt_variants = {
        str(result.environment.get("prompt_variant", ""))
        for result in results.values()
        if result.environment.get("prompt_variant", "")
    }
    same_prompt_variant = len(prompt_variants) <= 1
    benchmark_releases = {
        result.benchmark_release_id
        for result in results.values()
        if result.benchmark_release_id
    }
    same_benchmark_release = len(benchmark_releases) <= 1
    task_verifier_fair = same_task_set and same_task_snapshot and same_prompt_variant and same_benchmark_release

    rows: dict[str, Any] = {}
    for label, result in results.items():
        rows[label] = {
            "model": result.model,
            "adapter": result.environment.get("adapter", ""),
            "score": result.overall_score,
            "completion": result.overall_completion,
            "trajectory": result.overall_trajectory,
            "behavior": result.overall_behavior,
            "reliability": result.overall_reliability,
            "task_count": len(result.task_results),
            "task_snapshot_fingerprint": result.task_snapshot_fingerprint,
            "benchmark_release_id": result.benchmark_release_id,
            "prompt_variant": result.environment.get("prompt_variant", ""),
            "dimension_coverage": result.environment.get("dimension_coverage", {}),
            "ablation": result.environment.get("ablation_profile", {}),
        }

    deltas: dict[str, float] = {}
    if labels:
        baseline = results[labels[0]].overall_score
        for label in labels[1:]:
            deltas[f"{label}_minus_{labels[0]}"] = round(
                results[label].overall_score - baseline,
                4,
            )

    return {
        "fair": bool(task_verifier_fair),
        "task_verifier_fair": bool(task_verifier_fair),
        "controlled_ablation": bool(task_verifier_fair and same_model),
        "same_model": same_model,
        "same_task_set": same_task_set,
        "same_task_snapshot": same_task_snapshot,
        "same_prompt_variant": same_prompt_variant,
        "same_benchmark_release": same_benchmark_release,
        "models": models,
        "task_sets": task_sets,
        "rows": rows,
        "deltas": deltas,
    }


def git_head(path: Path) -> tuple[str, str]:
    """Best-effort `(sha, describe)` for harness provenance."""
    try:
        sha = subprocess.check_output(
            ["git", "-C", str(path), "rev-parse", "HEAD"],
            text=True,
            stderr=subprocess.DEVNULL,
        ).strip()
        desc = subprocess.check_output(
            ["git", "-C", str(path), "describe", "--tags", "--always", "--dirty"],
            text=True,
            stderr=subprocess.DEVNULL,
        ).strip()
        return sha, desc
    except Exception:
        return "", ""
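
Reviewer sketch (not part of the diff): both `with_fingerprint` methods use the same recipe. They sort the list fields, serialize with `sort_keys=True` and compact separators, hash with SHA-256, and keep the first 16 hex characters. The standalone version below uses only the standard library; the helper name `fingerprint` and the sample payloads are hypothetical.

```python
import hashlib
import json


def fingerprint(payload: dict, length: int = 16) -> str:
    """Hash a canonical JSON encoding of `payload` and truncate the hex digest."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:length]


# Key order and (pre-sorted) list order do not change the fingerprint.
fp_a = fingerprint({"name": "demo-native", "tools": sorted(["shell", "browser"])})
fp_b = fingerprint({"tools": sorted(["browser", "shell"]), "name": "demo-native"})

# Any change in content does.
fp_c = fingerprint({"name": "demo-native", "tools": sorted(["shell"])})
```

Truncating to 16 hex characters keeps 64 bits of the digest. That is plenty for telling a handful of run configurations apart, but it should not be treated as a security boundary.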


@ -0,0 +1,102 @@
"""Agent adapter layer — Phase-4 of CLAWBENCH_V0_4_SPEC.md.

Adapters plug an agent framework (OpenClaw, Hermes, Codex, Claude Code,
Deerflow, ...) into ClawBench's canonical task pipeline. Each adapter is
responsible for:

- Setting up the workspace + seed state from a `CanonicalTask`.
- Driving the agent through each `CanonicalPhase`'s simulated user.
- Returning a canonical `Transcript` so the scorer, trajectory analyser,
  and judge can score the run unchanged.
- Resolving `StateQuery` assertions that fall under its declared
  capabilities; returning `capability_missing=True` for queries that
  require a capability the adapter doesn't provide.

The `ADAPTERS` registry is populated by each adapter module at import
time. `get_adapter(name)` is the canonical lookup.
"""

from __future__ import annotations

from clawbench.adapters.base import (
    AdapterConfig,
    AdapterContext,
    AgentAdapter,
    PhaseResult,
    StateQueryResult,
)

#: Registry of adapter_name → adapter class. Populated by the adapter
#: modules at import time (e.g. `from clawbench.adapters.openclaw import *`
#: registers the OpenClaw adapter). Callers should use `get_adapter`
#: rather than reading this dict directly.
ADAPTERS: dict[str, type[AgentAdapter]] = {}


def register_adapter(cls: type[AgentAdapter]) -> type[AgentAdapter]:
    """Decorator / direct-call helper that registers an adapter class.

    Adapters declare themselves via:

    ```
    @register_adapter
    class HermesAdapter(AgentAdapter):
        name = "hermes"
        ...
    ```
    """
    name = getattr(cls, "name", "")
    if not name:
        raise ValueError(f"{cls.__name__} must set a non-empty `name` class attribute")
    existing = ADAPTERS.get(name)
    if existing is not None and existing is not cls:
        raise ValueError(
            f"Adapter name collision: '{name}' is already registered "
            f"to {existing.__qualname__}"
        )
    ADAPTERS[name] = cls
    return cls


def get_adapter(name: str) -> type[AgentAdapter]:
    """Look up an adapter class by its registered name.

    Import the adapter module before calling this so the registration
    has run. `clawbench.adapters.openclaw` always loads; optional
    adapters (hermes, codex) guard their imports and raise a clear
    error if their runtime dep isn't installed.
    """
    try:
        return ADAPTERS[name]
    except KeyError as exc:
        available = ", ".join(sorted(ADAPTERS)) or "(none)"
        raise KeyError(
            f"Unknown adapter '{name}'. Registered adapters: {available}"
        ) from exc


__all__ = [
    "ADAPTERS",
    "AdapterConfig",
    "AdapterContext",
    "AgentAdapter",
    "PhaseResult",
    "StateQueryResult",
    "get_adapter",
    "register_adapter",
]

# Register built-in adapters at import time. Each adapter module is
# expected to @register_adapter its class. OpenClaw is always
# available; optional adapters (hermes, codex) guard their imports and
# are registered only when their runtime dep is present.
from clawbench.adapters import openclaw as _openclaw  # noqa: E402,F401

try:
    from clawbench.adapters import hermes as _hermes  # noqa: E402,F401
except Exception:
    # hermes-agent is an optional extra; absence is fine.
    _hermes = None  # type: ignore[assignment]
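
Reviewer sketch (not part of the diff): the registration contract above can be exercised without the ClawBench package. It requires a non-empty `name`, allows the same class to register twice, and raises on a collision between different classes. `REGISTRY`, `register`, `EchoAdapter`, and `OtherEcho` are hypothetical stand-ins for `ADAPTERS`, `register_adapter`, and real adapter classes.

```python
REGISTRY: dict[str, type] = {}


def register(cls: type) -> type:
    """Register `cls` under its `name` attribute, rejecting collisions."""
    name = getattr(cls, "name", "")
    if not name:
        raise ValueError(f"{cls.__name__} must set a non-empty `name` class attribute")
    existing = REGISTRY.get(name)
    if existing is not None and existing is not cls:
        raise ValueError(f"Adapter name collision: '{name}'")
    REGISTRY[name] = cls
    return cls


@register
class EchoAdapter:
    name = "echo"


# Registering the same class again is a no-op rather than an error.
register(EchoAdapter)


def try_collision() -> bool:
    """Return True if a second class claiming 'echo' is rejected."""

    class OtherEcho:
        name = "echo"

    try:
        register(OtherEcho)
    except ValueError:
        return True
    return False
```

Allowing re-registration of the identical class presumably keeps repeated imports and module reloads harmless while still catching two different adapters that claim one name.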

234
clawbench/adapters/base.py Normal file

@ -0,0 +1,234 @@
"""Agent adapter ABC and associated data shapes.
An `AgentAdapter` is the execution counterpart to a `CanonicalTask`. It
is the only place where framework-specific details (OpenClaw gateway
RPCs, Hermes `MiniSWERunner`, Claude Code SDK, etc.) live. Everything
downstream of the adapter (trajectory analysis, scorer, judge, stats)
consumes a canonical `Transcript` and `TaskRunResult` produced by the
adapter, so those modules stay unchanged across adapters.
Lifecycle per task run:
1. Harness instantiates `adapter = AdapterClass(config)`.
2. `async with adapter as adapter:` starts subprocesses / websockets
/ whatever this adapter needs to hold open across a run.
3. `await adapter.setup(ctx)` realizes seed state, workspace files,
background services, pre-run state queries.
4. For each `CanonicalPhase`: `await adapter.run_phase(phase, ctx)`
drives the simulated user against the agent, returns a
`PhaseResult` with the transcript increment.
5. For each `StateQuery` in `task.verifier.state_queries`:
`await adapter.verify_state_query(query, ctx)` returns whether
the assertion held, or that the adapter lacks the capability.
6. `await adapter.teardown(ctx)` cleans up agent-side state (the
workspace itself is harness-owned).
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, ClassVar
from clawbench.canonical import (
AdapterCapability,
CanonicalPhase,
CanonicalTask,
StateQuery,
)
from clawbench.schemas import Transcript, TranscriptMessage
@dataclass
class AdapterConfig:
"""Base config every adapter accepts.
Adapters subclass this to add their own fields. The harness builds
a config instance from CLI flags / env vars and passes it to the
adapter constructor.
"""
#: Primary model identifier. Semantics are adapter-specific (an
#: OpenClaw model id, a Hermes `--model` string, etc.).
model: str = ""
@dataclass
class AdapterContext:
"""Per-run context handed to every adapter method.
`transcript` is mutated in place across phases: each
`run_phase` call appends the messages it observed, so the scorer
sees one consolidated `Transcript` at the end.
"""
task: CanonicalTask
workspace: Path
runtime_values: dict[str, Any]
run_index: int
model: str
transcript: Transcript
#: Free-form adapter-owned scratch state (e.g. the OpenClaw
#: `session_key` and `agent_id`; the Hermes `MiniSWERunner`
#: instance). The harness never reads these; the adapter is free
#: to use the dict as its own per-run cache.
adapter_state: dict[str, Any] = field(default_factory=dict)
@dataclass
class PhaseResult:
"""The transcript increment produced by a single phase."""
messages: list[TranscriptMessage] = field(default_factory=list)
#: Adapter-specific metadata for this phase (token counts returned
#: by the adapter, session identifiers, etc.). Merged into
#: `TaskRunResult` under the `efficiency_result` / adapter metadata
#: fields where applicable.
adapter_metadata: dict[str, Any] = field(default_factory=dict)
#: True if the adapter detected that the agent completed normally
#: (e.g. Hermes's `completed=True`). Not a pass/fail signal, just
#: whether the trajectory finished on its own or was cut short. The
#: scorer uses this in `delivery_outcome` classification.
completed_normally: bool = True
#: If the phase aborted due to the adapter itself (not the agent),
#: populated with an error message the harness surfaces.
error: str | None = None
@dataclass
class StateQueryResult:
"""Result of resolving a `StateQuery` against the adapter's state.
`capability_missing=True` means "this adapter cannot evaluate this
kind of query". The scorer treats that as neutral (neither pass nor
fail) and records a skip note in the `CompletionResult`; under
`--strict-compat` the harness will have filtered the task out before
the adapter ever saw it.
"""
ok: bool
detail: str = ""
capability_missing: bool = False
class AgentAdapter(ABC):
"""Abstract base class for agent adapters.
Subclasses MUST:
- Set a unique `name: ClassVar[str]`.
- Set a `capabilities: ClassVar[set[AdapterCapability]]` declaring
which state-query kinds the adapter can resolve.
- Implement `setup`, `run_phase`, `verify_state_query`, `teardown`.
- Optionally implement `__aenter__` / `__aexit__` for long-lived
resource setup (a persistent websocket, a subprocess pool).
"""
name: ClassVar[str] = ""
capabilities: ClassVar[set[AdapterCapability]] = set()
def __init__(self, config: AdapterConfig | None = None) -> None:
self.config: AdapterConfig = config or AdapterConfig()
# ------------------------------------------------------------------
# Optional long-lived resource management.
# ------------------------------------------------------------------
async def __aenter__(self) -> "AgentAdapter":
return self
async def __aexit__(self, exc_type: object, exc: object, tb: object) -> None:
return None
# ------------------------------------------------------------------
# Required per-run lifecycle.
# ------------------------------------------------------------------
@abstractmethod
async def setup(self, ctx: AdapterContext) -> None:
"""Realize the workspace, seed state, and any pre-run state.
The harness has already created the workspace dir and expanded
`CanonicalAssets.workspace_files` into it. The adapter is
responsible for:
- Applying `seed_state` entries via an adapter-appropriate
mechanism (OpenClaw memory RPCs; Hermes file writes).
- Starting the agent's process/session so `run_phase` can send
turns immediately.
"""
@abstractmethod
async def run_phase(
self,
phase: CanonicalPhase,
ctx: AdapterContext,
) -> PhaseResult:
"""Drive one `CanonicalPhase` to completion.
The simulated user in `phase.user` dictates what to send and
when. The adapter's job is to deliver those turns, observe the
agent's responses, and append canonical `TranscriptMessage`
entries to `ctx.transcript`.
"""
@abstractmethod
async def verify_state_query(
self,
query: StateQuery,
ctx: AdapterContext,
) -> StateQueryResult:
"""Resolve one `StateQuery` against the agent's post-run state.
Adapters whose `capabilities` don't cover `query.required_capability`
should return `StateQueryResult(ok=False, capability_missing=True)`.
"""
@abstractmethod
async def teardown(self, ctx: AdapterContext) -> None:
"""Release any agent-side state created during `setup`/`run_phase`.
The harness owns the workspace lifecycle; the adapter owns
sessions, subprocesses, and any in-memory caches it held open.
"""
# ------------------------------------------------------------------
# Convenience helpers available to every adapter.
# ------------------------------------------------------------------
@classmethod
def supported_capabilities(
cls,
config: AdapterConfig | None = None,
) -> set[AdapterCapability]:
"""Return capabilities available for a concrete adapter config.
Most adapters have a fixed surface and can use the class-level
`capabilities`. Adapters with multiple driver modes, such as Hermes
MiniSWE vs full AIAgent, override this to keep task gating honest.
"""
return set(cls.capabilities)
@classmethod
def missing_capabilities_for(
cls,
task: CanonicalTask,
config: AdapterConfig | None = None,
) -> set[AdapterCapability]:
"""Return the subset of `task.required_adapter_capabilities` this
adapter cannot cover. Empty set means the task is fully runnable
under this adapter.
"""
return set(task.required_adapter_capabilities) - cls.supported_capabilities(config)
@classmethod
def supports(
cls,
task: CanonicalTask,
config: AdapterConfig | None = None,
) -> bool:
"""True iff this adapter can cover every capability the task needs."""
return not cls.missing_capabilities_for(task, config)
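
The three gating helpers above reduce to a set difference between what a task requires and what an adapter supports under a given config. A minimal sketch follows, using hypothetical stand-ins (`Capability`, `ToyConfig`, `ToyTask`, `ToyAdapter`) rather than the real `AdapterCapability`, `AdapterConfig`, `CanonicalTask`, and `AgentAdapter` types.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class Capability(Enum):  # hypothetical stand-in for AdapterCapability
    FILES = "files"
    EXECUTION = "execution"
    MEMORY = "memory"


@dataclass
class ToyConfig:  # hypothetical stand-in for AdapterConfig
    full_mode: bool = False


@dataclass
class ToyTask:  # hypothetical stand-in for CanonicalTask
    required_adapter_capabilities: frozenset[Capability]


class ToyAdapter:
    # Fixed class-level surface, widened per config below.
    capabilities = {Capability.FILES, Capability.EXECUTION}

    @classmethod
    def supported_capabilities(cls, config: ToyConfig | None = None) -> set[Capability]:
        caps = set(cls.capabilities)  # copy so callers cannot mutate the class set
        if config is not None and config.full_mode:
            caps.add(Capability.MEMORY)
        return caps

    @classmethod
    def missing_capabilities_for(
        cls, task: ToyTask, config: ToyConfig | None = None
    ) -> set[Capability]:
        # Set difference: required minus supported.
        return set(task.required_adapter_capabilities) - cls.supported_capabilities(config)

    @classmethod
    def supports(cls, task: ToyTask, config: ToyConfig | None = None) -> bool:
        return not cls.missing_capabilities_for(task, config)
```

Routing `missing_capabilities_for` through `supported_capabilities(config)` rather than the class attribute is what lets an adapter with several driver modes report a different surface per config without duplicating the gating logic.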


@ -0,0 +1,706 @@
"""Hermes adapter — drives Nous Research `hermes-agent`.
Hermes (https://github.com/NousResearch/hermes-agent) is a Python agent
framework with `MiniSWERunner` as its clean programmatic entry point.
This adapter:
1. Realizes the canonical workspace + seed state (seed_state entries
with `kind="memory"` become files, since Hermes has no memory RPC).
2. Constructs a `MiniSWERunner` scoped to the workspace.
3. For each canonical phase, renders the user turn and calls
`runner.run_task(prompt)` in a worker thread, with the phase's
timeout enforced as a wall clock.
4. Parses the returned `conversations` via
`clawbench.adapters.hermes_xml.parse_conversation` into a canonical
`Transcript` the scorer can consume unchanged.
5. For state queries the adapter can't resolve (custom gateway RPC,
plus session and cron unless `driver_mode="ai_agent"`), returns
`capability_missing=True` so the harness reports a clean skip.
Memory queries fall back to workspace file scanning via
`environment_files.verify_memory_fallback`.
`hermes-agent` is an **optional** dependency (`clawbench[hermes]`). The
import is guarded so the base install stays lean; calling this adapter
without the dep installed raises a clear error rather than a cryptic
`ImportError`.
"""
from __future__ import annotations
import asyncio
import importlib.util
import json
import logging
import os
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from clawbench.adapters import register_adapter
from clawbench.adapters.base import (
AdapterConfig,
AdapterContext,
AgentAdapter,
PhaseResult,
StateQueryResult,
)
from clawbench.adapters.hermes_xml import parse_chat_messages, parse_conversation
from clawbench.canonical import (
AdapterCapability,
CanonicalPhase,
StateQuery,
)
from clawbench.environment_files import verify_memory_fallback
from clawbench.render import render_template
from clawbench.schemas import MemoryState, PromptVariant
from clawbench.simulated_user import UserSimulator
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Optional dependency import — guarded so the base install stays lean.
# ---------------------------------------------------------------------------
def _load_mini_swe_runner() -> tuple[Any, Exception | None]:
try: # pragma: no cover - import-guard branch
from mini_swe_runner import MiniSWERunner as runner_cls # type: ignore[import-not-found]
return runner_cls, None
except Exception as exc: # pragma: no cover - import-guard branch
import_error = exc
candidates: list[Path] = []
explicit_file = os.environ.get("HERMES_MINI_SWE_RUNNER")
if explicit_file:
candidates.append(Path(explicit_file).expanduser())
for env_name in ("HERMES_AGENT_REPO", "HERMES_INSTALL_DIR"):
value = os.environ.get(env_name)
if value:
candidates.append(Path(value).expanduser() / "mini_swe_runner.py")
hermes_home = Path(os.environ.get("HERMES_HOME", "~/.hermes")).expanduser()
candidates.append(hermes_home / "hermes-agent" / "mini_swe_runner.py")
for path in candidates:
if not path.is_file():
continue
try:
repo_root = str(path.parent)
if repo_root not in sys.path:
sys.path.insert(0, repo_root)
spec = importlib.util.spec_from_file_location(
"_clawbench_hermes_mini_swe_runner",
path,
)
if spec is None or spec.loader is None:
continue
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module.MiniSWERunner, None
except Exception as path_exc:
import_error = path_exc
continue
return None, import_error
MiniSWERunner, _HERMES_IMPORT_ERROR = _load_mini_swe_runner()
def _load_ai_agent() -> tuple[Any, Exception | None]:
try: # pragma: no cover - import-guard branch
from run_agent import AIAgent as agent_cls # type: ignore[import-not-found]
return agent_cls, None
except Exception as exc: # pragma: no cover - import-guard branch
import_error = exc
candidates: list[Path] = []
for env_name in ("HERMES_AGENT_REPO", "HERMES_INSTALL_DIR"):
value = os.environ.get(env_name)
if value:
candidates.append(Path(value).expanduser() / "run_agent.py")
hermes_home = Path(os.environ.get("HERMES_HOME", "~/.hermes")).expanduser()
candidates.append(hermes_home / "hermes-agent" / "run_agent.py")
for path in candidates:
if not path.is_file():
continue
try:
repo_root = str(path.parent)
if repo_root not in sys.path:
sys.path.insert(0, repo_root)
spec = importlib.util.spec_from_file_location(
"_clawbench_hermes_run_agent",
path,
)
if spec is None or spec.loader is None:
continue
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module.AIAgent, None
except Exception as path_exc:
import_error = path_exc
continue
return None, import_error
AIAgent, _HERMES_AGENT_IMPORT_ERROR = _load_ai_agent()
class _CodexToolMessageCompatClient:
"""Client wrapper for Hermes's Codex Responses shim.
The current Hermes MiniSWERunner feeds OpenAI chat-style `role="tool"`
messages back into `chat.completions.create()`. Hermes's Codex
Responses adapter accepts chat-shaped calls but currently forwards
those tool messages to Responses as plain input items, where Codex
rejects the unsupported role. Rewriting tool results as user-visible
text preserves the important observation for the next turn and keeps
the runner moving.
"""
def __init__(self, inner: Any) -> None:
self._inner = inner
self.chat = _CodexToolMessageCompatChat(inner.chat)
self.api_key = getattr(inner, "api_key", None)
self.base_url = getattr(inner, "base_url", None)
def close(self) -> None:
close = getattr(self._inner, "close", None)
if callable(close):
close()
class _CodexToolMessageCompatChat:
def __init__(self, inner_chat: Any) -> None:
self.completions = _CodexToolMessageCompatCompletions(inner_chat.completions)
class _CodexToolMessageCompatCompletions:
def __init__(self, inner_completions: Any) -> None:
self._inner = inner_completions
def create(self, **kwargs: Any) -> Any:
messages = kwargs.get("messages")
if isinstance(messages, list):
kwargs = dict(kwargs)
kwargs["messages"] = [_rewrite_codex_tool_message(message) for message in messages]
return self._inner.create(**kwargs)
def _rewrite_codex_tool_message(message: Any) -> Any:
if not isinstance(message, dict) or message.get("role") != "tool":
return message
content = message.get("content", "")
if not isinstance(content, str):
content = str(content)
tool_call_id = message.get("tool_call_id") or message.get("name") or "tool"
return {
"role": "user",
"content": f"Tool result ({tool_call_id}):\n{content}",
}
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
@dataclass
class HermesAdapterConfig(AdapterConfig):
"""Config for the Hermes adapter.
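Fields map onto `MiniSWERunner` kwargs (or `AIAgent` kwargs when
`driver_mode="ai_agent"`). ClawBench passes the canonical model string
through, stripping only the `openai/` prefix for direct
`api.openai.com` endpoints, so users pick Hermes-supported models via
the existing `--model` flag.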
"""
env_type: str = "local"
max_iterations: int = 15
timeout_seconds: int = 60
base_url: str | None = None
api_key: str | None = None
provider: str | None = None
api_mode: str | None = None
prompt_variant: str = PromptVariant.CLEAR.value
driver_mode: str = "mini_swe"
enabled_toolsets: list[str] | None = None
disabled_toolsets: list[str] | None = None
hermes_home: str | None = None
tool_delay_seconds: float = 0.0
# Optional: an explicit `MiniSWERunner` factory. Used by tests to
# plug in a stub; production code leaves this None and the adapter
# instantiates the real runner lazily.
runner_factory: Any = None
agent_factory: Any = None
@register_adapter
class HermesAdapter(AgentAdapter):
"""Adapter for the Nous Research hermes-agent."""
name = "hermes"
capabilities = {
AdapterCapability.FILES,
AdapterCapability.EXECUTION,
}
@classmethod
def supported_capabilities(cls, config: AdapterConfig | None = None) -> set[AdapterCapability]:
if isinstance(config, HermesAdapterConfig) and config.driver_mode == "ai_agent":
return {
AdapterCapability.FILES,
AdapterCapability.EXECUTION,
AdapterCapability.MEMORY,
AdapterCapability.CRON,
AdapterCapability.BROWSER,
AdapterCapability.MULTI_TURN_INJECTION,
}
return set(cls.capabilities)
def __init__(self, config: HermesAdapterConfig | None = None) -> None:
super().__init__(config or HermesAdapterConfig())
self._config: HermesAdapterConfig = self.config # type: ignore[assignment]
# ------------------------------------------------------------------
# Lifecycle.
# ------------------------------------------------------------------
async def setup(self, ctx: AdapterContext) -> None:
"""Realize memory seed state as files and build the runner.
Hermes-in-`env_type=local` operates directly on the workspace
filesystem, so memory `SeedEntry` entries are written out as
`memory/<key>.md` files. Callers that want a different mapping
can pre-populate the workspace before invoking the adapter.
"""
for seed in ctx.task.assets.seed_state:
if seed.kind == "memory" and seed.key:
target = ctx.workspace / "memory" / f"{seed.key}.md"
target.parent.mkdir(parents=True, exist_ok=True)
content = seed.content or ""
if not isinstance(content, str):
content = str(content)
target.write_text(content, encoding="utf-8")
if self._config.driver_mode == "ai_agent":
agent = self._build_ai_agent(ctx)
ctx.adapter_state["agent"] = agent
ctx.adapter_state["conversation_history"] = []
ctx.adapter_state["hermes_home"] = self._hermes_home(ctx)
else:
runner = self._build_runner(ctx)
ctx.adapter_state["runner"] = runner
ctx.adapter_state.setdefault("api_calls", 0)
def _hermes_home(self, ctx: AdapterContext) -> Path:
configured = self._config.hermes_home
if configured:
return Path(configured).expanduser()
return ctx.workspace / ".hermes"
def _prepare_process_env(self, ctx: AdapterContext) -> None:
hermes_home = self._hermes_home(ctx)
hermes_home.mkdir(parents=True, exist_ok=True)
os.environ["HERMES_HOME"] = str(hermes_home)
os.environ["TERMINAL_CWD"] = str(ctx.workspace)
os.environ.setdefault("TERMINAL_ENV", "local")
cron_jobs = sys.modules.get("cron.jobs")
if cron_jobs is not None:
cron_dir = hermes_home / "cron"
setattr(cron_jobs, "HERMES_DIR", hermes_home)
setattr(cron_jobs, "CRON_DIR", cron_dir)
setattr(cron_jobs, "JOBS_FILE", cron_dir / "jobs.json")
setattr(cron_jobs, "OUTPUT_DIR", cron_dir / "output")
def _effective_model(self, ctx: AdapterContext) -> str:
"""Translate ClawBench provider-prefixed slugs for direct providers."""
model = ctx.model
if self._config.provider:
return model
base_url = self._config.base_url or ""
try:
host = urlparse(base_url).hostname or ""
except Exception:
host = ""
if host == "api.openai.com" and model.startswith("openai/"):
return model.split("/", 1)[1]
return model
def _runtime_provider_hint(self) -> str | None:
"""Return the provider identity Hermes should expose to its runtime.
Hermes distinguishes the transport used for the main model from the
auxiliary routing metadata it exposes to side tasks. Direct
OpenAI-compatible endpoints need to keep their explicit base URL and
API key, but should still identify as ``custom`` so Hermes auxiliary
calls resolve to the same primary model instead of falling through to
auto-detected providers such as OpenRouter.
"""
if self._config.provider:
return self._config.provider
if self._config.base_url:
return "custom"
return None
def _build_runner(self, ctx: AdapterContext) -> Any:
explicit_api_key = None if self._config.provider else self._config.api_key
explicit_base_url = None if self._config.provider else self._config.base_url
effective_model = self._effective_model(ctx)
ctx.adapter_state["effective_model"] = effective_model
if self._config.runner_factory is not None:
return self._config.runner_factory(
model=effective_model,
env_type=self._config.env_type,
cwd=str(ctx.workspace),
max_iterations=self._config.max_iterations,
command_timeout=self._config.timeout_seconds,
base_url=explicit_base_url,
api_key=explicit_api_key,
)
if MiniSWERunner is None: # pragma: no cover - import-guard branch
raise RuntimeError(
"HermesAdapter requires Hermes Agent's `mini_swe_runner.py`. "
"Install Hermes with the official installer, or set "
"`HERMES_AGENT_REPO=/path/to/hermes-agent` / "
"`HERMES_MINI_SWE_RUNNER=/path/to/mini_swe_runner.py`. "
f"Underlying import error: {_HERMES_IMPORT_ERROR!r}"
)
runner = MiniSWERunner(
model=effective_model,
env_type=self._config.env_type,
cwd=str(ctx.workspace),
max_iterations=self._config.max_iterations,
command_timeout=self._config.timeout_seconds,
base_url=explicit_base_url,
api_key=explicit_api_key,
)
if self._config.provider:
try:
from agent.auxiliary_client import resolve_provider_client
except Exception as exc: # pragma: no cover - optional Hermes internals
raise RuntimeError(
f"Hermes provider routing requested for '{self._config.provider}', "
"but Hermes provider utilities could not be imported."
) from exc
client, resolved_model = resolve_provider_client(
self._config.provider,
model=ctx.model,
)
if client is None or not resolved_model:
raise RuntimeError(
f"Hermes provider '{self._config.provider}' did not resolve credentials."
)
if self._config.provider == "openai-codex":
client = _CodexToolMessageCompatClient(client)
runner.client = client
runner.model = str(resolved_model)
return runner
def _build_ai_agent(self, ctx: AdapterContext) -> Any:
self._prepare_process_env(ctx)
explicit_api_key = None if self._config.provider else self._config.api_key
explicit_base_url = None if self._config.provider else self._config.base_url
enabled_toolsets = self._config.enabled_toolsets or ["hermes-api-server"]
effective_model = self._effective_model(ctx)
provider_hint = self._runtime_provider_hint()
ctx.adapter_state["effective_model"] = effective_model
if self._config.agent_factory is not None:
return self._config.agent_factory(
model=effective_model,
base_url=explicit_base_url,
api_key=explicit_api_key,
provider=provider_hint,
api_mode=self._config.api_mode,
max_iterations=self._config.max_iterations,
enabled_toolsets=enabled_toolsets,
disabled_toolsets=self._config.disabled_toolsets,
)
if AIAgent is None: # pragma: no cover - import-guard branch
raise RuntimeError(
"HermesAdapter full mode requires Hermes Agent's `run_agent.py`. "
"Set `HERMES_AGENT_REPO=/path/to/hermes-agent` or install Hermes. "
f"Underlying import error: {_HERMES_AGENT_IMPORT_ERROR!r}"
)
return AIAgent(
base_url=explicit_base_url,
api_key=explicit_api_key,
provider=provider_hint,
api_mode=self._config.api_mode,
model=effective_model,
max_iterations=self._config.max_iterations,
tool_delay=self._config.tool_delay_seconds,
enabled_toolsets=enabled_toolsets,
disabled_toolsets=self._config.disabled_toolsets,
quiet_mode=True,
verbose_logging=False,
skip_context_files=True,
session_id=f"clawbench-{ctx.task.id}-run{ctx.run_index}",
platform="cli",
)
async def run_phase(
self,
phase: CanonicalPhase,
ctx: AdapterContext,
) -> PhaseResult:
"""Render the phase's first user turn, invoke Hermes, parse output.
In `driver_mode="ai_agent"` the phase is delegated to
`_run_ai_agent_phase`, which drives every turn through `UserSimulator`.
Limitation of the default `mini_swe` driver: only the first turn of
each phase is delivered. Tasks that declare `MULTI_TURN_INJECTION`
as a required capability are expected to be filtered out at harness
level before the adapter is invoked (harness gating lands in a later
step). If the adapter is driven directly, later turns are dropped.
"""
if self._config.driver_mode == "ai_agent":
return await self._run_ai_agent_phase(phase, ctx)
runner = ctx.adapter_state.get("runner")
if runner is None:
return PhaseResult(
error="HermesAdapter.run_phase called before setup(); no runner",
completed_normally=False,
)
if not phase.user.turns:
return PhaseResult(completed_normally=True)
# Hermes cannot receive dynamic follow-ups; we render and send
# only the first turn. Later turns remain in the canonical
# phase description but are intentionally dropped here.
first_turn = phase.user.turns[0]
message = first_turn.variant_messages.get(
self._config.prompt_variant, first_turn.message
)
prompt = render_template(message, ctx.runtime_values)
phase_timeout = float(
phase.timeout_seconds
or ctx.task.budgets.timeout_seconds
or self._config.timeout_seconds * self._config.max_iterations
)
try:
result: dict[str, Any] = await asyncio.wait_for(
asyncio.to_thread(runner.run_task, prompt),
timeout=phase_timeout,
)
except asyncio.TimeoutError:
return PhaseResult(
error=f"Hermes phase '{phase.name}' exceeded {phase_timeout:.0f}s",
completed_normally=False,
)
except Exception as exc: # pragma: no cover - runner-internal error
return PhaseResult(
error=f"HermesAdapter runner error: {exc}",
completed_normally=False,
)
phase_transcript = parse_conversation(result or {})
ctx.transcript.messages.extend(phase_transcript.messages)
api_calls = int(result.get("api_calls", 0)) if isinstance(result, dict) else 0
ctx.adapter_state["api_calls"] = (
int(ctx.adapter_state.get("api_calls", 0)) + api_calls
)
return PhaseResult(
messages=phase_transcript.messages,
adapter_metadata={
"api_calls": api_calls,
"hermes_metadata": result.get("metadata", {}) if isinstance(result, dict) else {},
},
completed_normally=bool(result.get("completed", False)) if isinstance(result, dict) else False,
)
async def _run_ai_agent_phase(
self,
phase: CanonicalPhase,
ctx: AdapterContext,
) -> PhaseResult:
agent = ctx.adapter_state.get("agent")
if agent is None:
return PhaseResult(
error="HermesAdapter.run_phase called before setup(); no AIAgent",
completed_normally=False,
)
simulator = UserSimulator(
phase.user,
ctx.runtime_values,
prompt_variant=self._config.prompt_variant,
)
phase_timeout = float(
phase.timeout_seconds
or ctx.task.budgets.timeout_seconds
or self._config.timeout_seconds * self._config.max_iterations
)
appended_messages: list = []
phase_api_calls = 0
completed = True
while not simulator.is_done:
user_message = await simulator.next_message(ctx.transcript)
if user_message is None:
break
history = list(ctx.adapter_state.get("conversation_history") or [])
try:
result: dict[str, Any] = await asyncio.wait_for(
asyncio.to_thread(
agent.run_conversation,
user_message,
conversation_history=history or None,
task_id=f"{ctx.task.id}-run{ctx.run_index}",
),
timeout=phase_timeout,
)
except asyncio.TimeoutError:
return PhaseResult(
messages=appended_messages,
error=f"Hermes AIAgent phase '{phase.name}' exceeded {phase_timeout:.0f}s",
completed_normally=False,
)
except Exception as exc: # pragma: no cover - agent-internal error
return PhaseResult(
messages=appended_messages,
error=f"HermesAdapter AIAgent error: {exc}",
completed_normally=False,
)
messages = result.get("messages", []) if isinstance(result, dict) else []
if not isinstance(messages, list):
messages = []
delta = messages[len(history):] if len(messages) >= len(history) else messages
phase_transcript = parse_chat_messages(delta)
ctx.transcript.messages.extend(phase_transcript.messages)
appended_messages.extend(phase_transcript.messages)
ctx.adapter_state["conversation_history"] = messages
phase_api_calls += int(result.get("api_calls", 0)) if isinstance(result, dict) else 0
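# Guard against a non-dict result, matching the isinstance checks above.
completed = completed and isinstance(result, dict) and bool(result.get("completed", False))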
ctx.adapter_state["api_calls"] = (
int(ctx.adapter_state.get("api_calls", 0)) + phase_api_calls
)
return PhaseResult(
messages=appended_messages,
adapter_metadata={
"api_calls": phase_api_calls,
"driver_mode": "ai_agent",
},
completed_normally=completed,
)
async def verify_state_query(
self,
query: StateQuery,
ctx: AdapterContext,
) -> StateQueryResult:
if query.kind == "memory":
fallback_state = MemoryState(
key_pattern=str(query.selector.get("key_pattern", "")),
exists=query.predicate != "absent",
value_contains=list(query.expected.get("value_contains", [])),
)
extra_memory_text = self._read_hermes_memory_text(ctx)
ok, detail = verify_memory_fallback(
fallback_state,
ctx.workspace,
transcript=ctx.transcript,
extra_memory_text=extra_memory_text,
)
return StateQueryResult(ok=ok, detail=detail)
if self._config.driver_mode == "ai_agent" and query.kind == "session":
expected_model = str(query.expected.get("model") or "")
if query.predicate == "absent":
return StateQueryResult(ok=False, detail="Hermes AIAgent session exists")
if expected_model and expected_model.lower() not in ctx.model.lower():
return StateQueryResult(
ok=False,
detail=f"Model mismatch: expected {expected_model}, got {ctx.model}",
)
return StateQueryResult(ok=True, detail="OK")
if self._config.driver_mode == "ai_agent" and query.kind == "cron":
return self._verify_cron_file(query, ctx)
# Remaining kinds (session/cron under the `mini_swe` driver, custom
# gateway state under either driver) are not exposed by HermesAdapter.
# Flag as capability-missing so the scorer can apply the neutral skip
# policy.
return StateQueryResult(
ok=False,
detail=(
f"HermesAdapter does not resolve '{query.kind}' state queries "
f"(missing capability {query.required_capability.value})"
),
capability_missing=True,
)
def _read_hermes_memory_text(self, ctx: AdapterContext) -> str:
hermes_home = Path(ctx.adapter_state.get("hermes_home") or self._hermes_home(ctx))
candidates = [
hermes_home / "memory",
hermes_home / "memories",
hermes_home / "user_memory",
]
chunks: list[str] = []
for candidate in candidates:
if candidate.is_file():
chunks.append(candidate.read_text(encoding="utf-8", errors="replace"))
elif candidate.is_dir():
for path in candidate.rglob("*"):
if path.is_file() and path.suffix.lower() in {".md", ".txt", ".json"}:
try:
chunks.append(path.read_text(encoding="utf-8", errors="replace"))
except Exception:
continue
return "\n".join(chunks)
def _verify_cron_file(
self,
query: StateQuery,
ctx: AdapterContext,
) -> StateQueryResult:
hermes_home = Path(ctx.adapter_state.get("hermes_home") or self._hermes_home(ctx))
jobs_file = hermes_home / "cron" / "jobs.json"
if not jobs_file.is_file():
if query.predicate == "absent":
return StateQueryResult(ok=True, detail="Correctly absent")
return StateQueryResult(ok=False, detail=f"No Hermes cron jobs file at {jobs_file}")
try:
payload = json.loads(jobs_file.read_text(encoding="utf-8"))
except Exception as exc:
return StateQueryResult(ok=False, detail=f"Could not read Hermes cron jobs: {exc}")
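# A jobs file holding neither a list nor a dict yields no jobs instead of raising.
jobs = payload if isinstance(payload, list) else (payload.get("jobs", []) if isinstance(payload, dict) else [])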
if not isinstance(jobs, list):
jobs = []
if query.predicate == "absent":
return StateQueryResult(
ok=not jobs,
detail="Correctly absent" if not jobs else "Cron jobs exist",
)
description_contains = query.selector.get("description_contains")
if not jobs:
return StateQueryResult(ok=False, detail="No cron jobs found")
if description_contains:
needle = str(description_contains).lower()
if not any(needle in json.dumps(job, sort_keys=True).lower() for job in jobs):
return StateQueryResult(
ok=False,
detail=f"No cron job matched '{description_contains}'",
)
return StateQueryResult(ok=True, detail="OK")
async def teardown(self, ctx: AdapterContext) -> None:
"""Release the runner and agent references so GC can reclaim them."""
ctx.adapter_state.pop("runner", None)
ctx.adapter_state.pop("agent", None)
__all__ = ["HermesAdapter", "HermesAdapterConfig"]
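
Both drivers above enforce the phase timeout by wrapping a blocking call in `asyncio.to_thread` and `asyncio.wait_for`. A minimal sketch of that pattern follows; `blocking_run_task` and `run_with_wall_clock` are hypothetical names standing in for `runner.run_task` and the adapter's phase logic.

```python
import asyncio
import time


def blocking_run_task(prompt: str, seconds: float) -> dict:
    """Hypothetical stand-in for a blocking runner call."""
    time.sleep(seconds)
    return {"completed": True, "prompt": prompt}


async def run_with_wall_clock(prompt: str, work_seconds: float, timeout: float) -> dict:
    """Run the blocking call in a worker thread under a wall-clock limit."""
    try:
        return await asyncio.wait_for(
            asyncio.to_thread(blocking_run_task, prompt, work_seconds),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        # The awaiting coroutine is released here, but the worker thread
        # itself cannot be interrupted and runs until the call returns.
        return {"completed": False, "error": "phase timed out"}


fast = asyncio.run(run_with_wall_clock("hi", 0.01, 1.0))
slow = asyncio.run(run_with_wall_clock("hi", 0.3, 0.05))
```

One consequence worth keeping in mind: a timed-out call keeps running in its thread, so it can still touch the workspace after the adapter has reported the phase as failed.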


@ -0,0 +1,494 @@
"""Hermes agent conversation → ClawBench `Transcript` converter.
Hermes's `MiniSWERunner.run_task()` returns a dict shaped like:
```json
{
"conversations": [
{"from": "system", "value": "..."},
{"from": "user", "value": "..."},
{"from": "assistant", "value": "I'll look at the file.\\n<tool_call>{\\"name\\":\\"bash\\",\\"arguments\\":{\\"cmd\\":\\"ls\\"}}</tool_call>"},
{"from": "tool", "value": "<tool_response>{\\"stdout\\":\\"file.py\\"}</tool_response>"},
{"from": "assistant", "value": "<tool_call>...</tool_call>"},
...
],
"completed": true,
"api_calls": 7,
"metadata": {...}
}
```
This module parses that into a canonical `Transcript` with
`TranscriptMessage` + `ToolCall` entries so the scorer / trajectory /
judge layers can score the run without any Hermes-specific knowledge.
The XML parsing is deliberately tolerant: Hermes transcripts observed
in the wild sometimes have malformed JSON inside `<tool_call>` tags
(trailing commas, unescaped newlines). We fall back to a permissive
regex extraction in that case so a single bad tool call doesn't tank
the whole transcript.
"""
from __future__ import annotations
import json
import re
from typing import Any, Iterable
from clawbench.schemas import ToolCall, Transcript, TranscriptMessage
#: One `<tool_call>…</tool_call>` block. Non-greedy across newlines.
_TOOL_CALL_RE = re.compile(
r"<tool_call>\s*(?P<body>.*?)\s*</tool_call>", re.DOTALL
)
#: One `<tool_response>…</tool_response>` block.
_TOOL_RESPONSE_RE = re.compile(
r"<tool_response>\s*(?P<body>.*?)\s*</tool_response>", re.DOTALL
)
def _coerce_role(raw: str) -> str:
"""Normalize Hermes role labels to ClawBench `TranscriptMessage.role`.
ClawBench uses `"user"`, `"assistant"`, `"system"`, `"tool"`. Hermes
can emit `"human"`/`"gpt"`/`"function"` variants; we map them all
down to the canonical vocabulary.
"""
value = (raw or "").strip().lower()
if value in {"assistant", "gpt", "model"}:
return "assistant"
if value in {"user", "human"}:
return "user"
if value in {"tool", "function", "tool_response"}:
return "tool"
if value == "system":
return "system"
return value or "assistant"
def _extract_json_objects(text: str) -> list[dict[str, Any]]:
    """Parse 0-or-more top-level JSON objects from free-form text.

    Hermes usually puts a single JSON object inside each `<tool_call>`,
    but we handle multi-object payloads defensively. Returns an empty
    list if no valid JSON is present.
    """
    text = text.strip()
    if not text:
        return []
    try:
        parsed = json.loads(text)
        if isinstance(parsed, dict):
            return [parsed]
        if isinstance(parsed, list):
            return [item for item in parsed if isinstance(item, dict)]
    except json.JSONDecodeError:
        pass
    # Fallback: scan for balanced `{...}` blocks. Useful when the
    # assistant wrote slightly malformed JSON. We accept a best-effort
    # parse and silently discard the rest. Braces inside string values
    # are not tracked, so such candidates simply fail to parse.
    results: list[dict[str, Any]] = []
    depth = 0
    start: int | None = None
    for i, ch in enumerate(text):
        if ch == "{":
            if depth == 0:
                start = i
            depth += 1
        elif ch == "}":
            if depth == 0:
                # Stray closing brace: ignore it so `depth` never goes
                # negative and later objects can still be found.
                continue
            depth -= 1
            if depth == 0 and start is not None:
                candidate = text[start : i + 1]
                try:
                    obj = json.loads(candidate)
                    if isinstance(obj, dict):
                        results.append(obj)
                except json.JSONDecodeError:
                    pass
                start = None
    return results
def _tool_call_from_payload(
payload: dict[str, Any],
*,
index: int,
timestamp_ms: int,
) -> ToolCall:
"""Build a canonical `ToolCall` from a Hermes `<tool_call>` payload.
Hermes emits `{"name": "...", "arguments": {...}}` inside each
tool_call tag. Some Nous-trained models emit slight variants:
`"function"` or `"tool"` for the tool name, and `"parameters"`,
`"args"`, or `"input"` for the arguments. We accept any of those.
"""
name = (
payload.get("name")
or payload.get("function")
or payload.get("tool")
or ""
)
arguments = (
payload.get("arguments")
or payload.get("parameters")
or payload.get("args")
or payload.get("input")
or {}
)
if isinstance(arguments, str):
# Occasionally Hermes passes a JSON-encoded string of args.
try:
arguments = json.loads(arguments)
except json.JSONDecodeError:
arguments = {"raw": arguments}
if not isinstance(arguments, dict):
arguments = {"value": arguments}
call_id = str(payload.get("id") or payload.get("call_id") or f"hermes-{index}")
return ToolCall(
id=call_id,
name=str(name),
input=arguments,
timestamp_ms=timestamp_ms,
)
def _tool_response_summary(payload: dict[str, Any]) -> tuple[str, str, bool | None]:
"""Extract (output, error, success) from a `<tool_response>` payload."""
output = ""
error = ""
success: bool | None = None
stdout = payload.get("stdout")
stderr = payload.get("stderr")
result = payload.get("result")
err = payload.get("error")
msg = payload.get("message")
status = payload.get("status")
if isinstance(stdout, str):
output = stdout
elif isinstance(result, (str, dict, list)):
output = result if isinstance(result, str) else json.dumps(result)
elif isinstance(msg, str):
output = msg
if isinstance(stderr, str) and stderr.strip():
error = stderr
elif isinstance(err, (str, dict, list)):
error = err if isinstance(err, str) else json.dumps(err)
if isinstance(status, str):
lowered = status.lower()
if lowered in {"ok", "success", "succeeded"}:
success = True
elif lowered in {"error", "failed", "failure"}:
success = False
if error and success is None:
success = False
if not error and output and success is None:
success = True
return output, error, success
def _split_tagged(text: str, tag_re: re.Pattern[str]) -> list[tuple[str, str]]:
"""Split `text` into `(kind, body)` tuples where `kind` is `"text"` or
`"tag"`. Preserves ordering so we can thread tool calls/responses
back into the canonical transcript in the order they appeared.
"""
pieces: list[tuple[str, str]] = []
cursor = 0
for match in tag_re.finditer(text):
if match.start() > cursor:
pieces.append(("text", text[cursor : match.start()]))
pieces.append(("tag", match.group("body")))
cursor = match.end()
if cursor < len(text):
pieces.append(("text", text[cursor:]))
return pieces
def parse_conversation(result: dict[str, Any]) -> Transcript:
"""Parse a `MiniSWERunner.run_task` result dict into a `Transcript`.
The conversation is processed in order; tool calls are emitted into
the assistant message that contained them, and each tool response is
paired with the oldest call that has not yet received one (FIFO). The
final Transcript is ready for `annotate_transcript_tool_calls` and the
scorer.
"""
transcript = Transcript()
conversations = result.get("conversations") or []
pending_calls: list[ToolCall] = []
call_counter = 0
for turn_index, entry in enumerate(conversations):
if not isinstance(entry, dict):
continue
role = _coerce_role(str(entry.get("from", "")))
value = str(entry.get("value", "") or "")
# Tool responses arrive from the tool/function role.
if role == "tool":
for response_body in _TOOL_RESPONSE_RE.findall(value):
payloads = _extract_json_objects(response_body)
if not payloads:
payloads = [{"result": response_body}]
for payload in payloads:
output, error, success = _tool_response_summary(payload)
if pending_calls:
target = pending_calls.pop(0)
target.output = output
target.error = error
if success is not None:
target.success = success
else:
# Orphan tool response — surface it as a tool
# message so nothing is silently dropped.
transcript.messages.append(
TranscriptMessage(
role="tool",
tool_result_content=output or error,
)
)
continue
# Everything else (assistant / user / system) may carry tool
# calls plus free-form text. We interleave them faithfully.
pieces = _split_tagged(value, _TOOL_CALL_RE)
text_chunks: list[str] = []
tool_calls: list[ToolCall] = []
for kind, body in pieces:
if kind == "text":
text_chunks.append(body)
else:
payloads = _extract_json_objects(body)
for payload in payloads:
call_counter += 1
tool_call = _tool_call_from_payload(
payload,
index=call_counter,
timestamp_ms=turn_index,
)
tool_calls.append(tool_call)
pending_calls.append(tool_call)
joined_text = "\n".join(chunk for chunk in text_chunks if chunk.strip()).strip()
if role == "assistant":
transcript.messages.append(
TranscriptMessage(
role="assistant",
text=joined_text,
tool_calls=tool_calls,
timestamp_ms=turn_index,
)
)
elif role == "user":
transcript.messages.append(
TranscriptMessage(
role="user",
text=joined_text,
timestamp_ms=turn_index,
)
)
elif role == "system":
if joined_text:
transcript.messages.append(
TranscriptMessage(
role="system",
text=joined_text,
timestamp_ms=turn_index,
)
)
else:
if joined_text:
transcript.messages.append(
TranscriptMessage(
role=role,
text=joined_text,
timestamp_ms=turn_index,
)
)
return transcript
def _content_to_text(content: Any) -> str:
"""Normalize OpenAI/Anthropic-style message content to plain text."""
if content is None:
return ""
if isinstance(content, str):
return content
if isinstance(content, list):
parts: list[str] = []
for part in content:
if isinstance(part, str):
parts.append(part)
elif isinstance(part, dict):
if isinstance(part.get("text"), str):
parts.append(part["text"])
elif isinstance(part.get("content"), str):
parts.append(part["content"])
return "\n".join(parts)
if isinstance(content, dict):
if isinstance(content.get("text"), str):
return content["text"]
if isinstance(content.get("content"), str):
return content["content"]
return str(content)
def _tool_call_from_chat_payload(
payload: dict[str, Any],
*,
index: int,
timestamp_ms: int,
) -> ToolCall:
"""Build a canonical tool call from chat-completions message payloads."""
function = payload.get("function")
if not isinstance(function, dict):
function = {}
name = (
function.get("name")
or payload.get("name")
or payload.get("tool")
or payload.get("type")
or ""
)
arguments = (
function.get("arguments")
or payload.get("arguments")
or payload.get("args")
or payload.get("input")
or {}
)
if isinstance(arguments, str):
try:
arguments = json.loads(arguments)
except json.JSONDecodeError:
arguments = {"raw": arguments}
if not isinstance(arguments, dict):
arguments = {"value": arguments}
return ToolCall(
id=str(payload.get("id") or payload.get("call_id") or f"hermes-chat-{index}"),
name=str(name),
input=arguments,
timestamp_ms=timestamp_ms,
)
def parse_chat_messages(messages: Iterable[dict[str, Any]]) -> Transcript:
    """Parse Hermes AIAgent/OpenAI-style message history to a Transcript.

    `AIAgent.run_conversation()` returns a `messages` list with user,
    assistant, and tool-role entries. This parser preserves ordering and
    attaches tool-role output back to the assistant `ToolCall` it belongs to.
    """
    transcript = Transcript()
    pending_by_id: dict[str, ToolCall] = {}
    pending_order: list[ToolCall] = []
    call_counter = 0
    for turn_index, entry in enumerate(messages):
        if not isinstance(entry, dict):
            continue
        role = _coerce_role(str(entry.get("role") or entry.get("from") or ""))
        text = _content_to_text(entry.get("content", entry.get("value", "")))
        if role == "tool":
            tool_call_id = str(entry.get("tool_call_id") or entry.get("id") or "")
            # Prefer an exact id match; otherwise fall back to the oldest
            # call that has not received a response yet.
            target = pending_by_id.get(tool_call_id) if tool_call_id else None
            if target is None and pending_order:
                target = pending_order[0]
            if target is not None:
                # Retire the call from both indexes so a later response
                # cannot be attached to it a second time.
                pending_order[:] = [c for c in pending_order if c is not target]
                if pending_by_id.get(target.id) is target:
                    del pending_by_id[target.id]
                target.output = text
                target.success = not _looks_like_error(text)
                if not target.success:
                    target.error = text
            elif text:
                # Orphan tool response: keep it as a tool message so
                # nothing is silently dropped.
                transcript.messages.append(
                    TranscriptMessage(
                        role="tool",
                        tool_result_for=tool_call_id or None,
                        tool_result_content=text,
                        timestamp_ms=turn_index,
                    )
                )
            continue
        tool_calls: list[ToolCall] = []
        raw_calls = entry.get("tool_calls") or []
        if isinstance(raw_calls, list):
            for payload in raw_calls:
                if not isinstance(payload, dict):
                    continue
                call_counter += 1
                call = _tool_call_from_chat_payload(
                    payload,
                    index=call_counter,
                    timestamp_ms=turn_index,
                )
                tool_calls.append(call)
                pending_by_id[call.id] = call
                pending_order.append(call)
        if role == "assistant":
            transcript.messages.append(
                TranscriptMessage(
                    role="assistant",
                    text=text,
                    tool_calls=tool_calls,
                    timestamp_ms=turn_index,
                )
            )
        elif text:
            # User, system, and any unrecognized roles are kept only
            # when they carry text.
            transcript.messages.append(
                TranscriptMessage(
                    role=role,
                    text=text,
                    timestamp_ms=turn_index,
                )
            )
    return transcript
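def _looks_like_error(text: str) -> bool:
    """Heuristic: flag tool output that mentions a failure keyword.

    This is a plain substring match, so benign output such as
    "0 errors" is also flagged.
    """
    lowered = text.lower()
    return any(token in lowered for token in ("error", "traceback", "failed", "exception"))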
def iter_tool_calls_from_conversations(conversations: Iterable[dict[str, Any]]) -> list[ToolCall]:
"""Helper used by tests: pull out just the tool-call sequence.
Equivalent to
`parse_conversation({"conversations": list(conversations)}).tool_call_sequence`.
Useful for asserting on call order and arguments without inspecting
the surrounding messages.
"""
return parse_conversation({"conversations": list(conversations)}).tool_call_sequence
__all__ = [
"iter_tool_calls_from_conversations",
"parse_chat_messages",
"parse_conversation",
]
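
The tolerant `<tool_call>` extraction described in the module docstring can be illustrated with a minimal standalone sketch. It uses only the standard library; `scan_json_objects` and `extract_tool_payloads` are hypothetical names chosen for illustration and are not part of the module, and the sample message is made up.

```python
import json
import re
from typing import Any

# Same shape as the module's pattern: one non-greedy block per tag pair.
TOOL_CALL_RE = re.compile(r"<tool_call>\s*(?P<body>.*?)\s*</tool_call>", re.DOTALL)


def scan_json_objects(text: str) -> list[dict[str, Any]]:
    """Best-effort scan for balanced top-level {...} blocks that parse as JSON.

    Braces inside string values are not tracked; such candidates just
    fail to parse and are skipped.
    """
    results: list[dict[str, Any]] = []
    depth = 0
    start = None
    for i, ch in enumerate(text):
        if ch == "{":
            if depth == 0:
                start = i
            depth += 1
        elif ch == "}" and depth > 0:  # ignore stray closing braces
            depth -= 1
            if depth == 0 and start is not None:
                try:
                    obj = json.loads(text[start : i + 1])
                except json.JSONDecodeError:
                    obj = None
                if isinstance(obj, dict):
                    results.append(obj)
                start = None
    return results


def extract_tool_payloads(message: str) -> list[dict[str, Any]]:
    """Strict JSON parse per tag first, permissive brace scan as fallback."""
    payloads: list[dict[str, Any]] = []
    for match in TOOL_CALL_RE.finditer(message):
        body = match.group("body")
        try:
            parsed = json.loads(body)
        except json.JSONDecodeError:
            payloads.extend(scan_json_objects(body))
            continue
        if isinstance(parsed, dict):
            payloads.append(parsed)
        elif isinstance(parsed, list):
            payloads.extend(item for item in parsed if isinstance(item, dict))
    return payloads


message = (
    "I'll look at the file.\n"
    '<tool_call>{"name": "bash", "arguments": {"cmd": "ls"}}</tool_call>\n'
    '<tool_call>noise {"name": "read", "arguments": {"path": "file.py"}} trailing</tool_call>'
)
payloads = extract_tool_payloads(message)
print([p["name"] for p in payloads])
```

The second tag is not valid JSON as a whole, so only the fallback scan recovers its payload; one bad tool call does not discard the rest of the message.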


@ -0,0 +1,467 @@
"""OpenClaw adapter — drives tasks through an OpenClaw gateway.
This is the adapter-shaped wrapper around the agent execution flow that
has lived inside `BenchmarkHarness._run_single` until now. It holds a
`GatewayClient` open for the run's duration, creates one agent per run
and one session per phase (matching the existing behavior), delivers
simulated-user turns, and resolves `StateQuery` assertions against the
gateway's `memory.search` / `sessions.resolve` / `cron.list` / arbitrary
`_rpc(method)` surface.
The legacy harness still owns the executable CLI path for now; this
adapter is the canonical wrapper used by adapter-level tests and later
harness wiring.
"""
from __future__ import annotations
import json
import logging
import uuid
from dataclasses import dataclass
from clawbench.adapters import register_adapter
from clawbench.adapters.base import (
AdapterConfig,
AdapterContext,
AgentAdapter,
PhaseResult,
StateQueryResult,
)
from clawbench.canonical import (
AdapterCapability,
CanonicalPhase,
StateQuery,
)
from clawbench.client import GatewayClient, GatewayConfig
from clawbench.environment_files import (
resolve_json_path,
verify_memory_fallback,
)
from clawbench.schemas import (
MemoryState,
PromptVariant,
)
from clawbench.session_labels import unique_session_label
from clawbench.simulated_user import UserSimulator
logger = logging.getLogger(__name__)
@dataclass
class OpenClawAdapterConfig(AdapterConfig):
    """Config for the OpenClaw adapter.

    `gateway` holds the connection parameters the adapter uses to reach
    the OpenClaw gateway. `prompt_variant` controls which wording of
    each simulated-user turn is rendered.
    """

    gateway: GatewayConfig | None = None
    prompt_variant: str = PromptVariant.CLEAR.value
    # Upper bound on the per-turn timeout passed to `send_and_wait`:
    # `run_phase` clamps the phase (or task) timeout to this value.
    # Matches the existing harness default.
    turn_timeout_seconds: float = 180.0
@register_adapter
class OpenClawAdapter(AgentAdapter):
"""Adapter for the OpenClaw gateway (default harness path)."""
name = "openclaw"
capabilities = {
AdapterCapability.FILES,
AdapterCapability.EXECUTION,
AdapterCapability.MEMORY,
AdapterCapability.SESSION,
AdapterCapability.CRON,
AdapterCapability.BROWSER,
AdapterCapability.GATEWAY_RPC,
AdapterCapability.MULTI_TURN_INJECTION,
}
def __init__(self, config: OpenClawAdapterConfig | None = None) -> None:
super().__init__(config or OpenClawAdapterConfig())
self._config: OpenClawAdapterConfig = self.config # type: ignore[assignment]
self._gateway_config: GatewayConfig = self._config.gateway or GatewayConfig()
self._client: GatewayClient | None = None
# Dependency injection hook for tests: monkeypatch this to swap
# in a stub gateway without touching the class definition.
self._client_factory = lambda: GatewayClient(self._gateway_config)
# ------------------------------------------------------------------
# Long-lived gateway connection.
# ------------------------------------------------------------------
async def __aenter__(self) -> "OpenClawAdapter":
client = self._client_factory()
await client.__aenter__()
self._client = client
return self
async def __aexit__(self, exc_type: object, exc: object, tb: object) -> None:
if self._client is not None:
try:
await self._client.__aexit__(exc_type, exc, tb)
finally:
self._client = None
@property
def client(self) -> GatewayClient:
if self._client is None:
raise RuntimeError(
"OpenClawAdapter must be used as an async context manager "
"before calling setup/run_phase/teardown."
)
return self._client
# ------------------------------------------------------------------
# Lifecycle.
# ------------------------------------------------------------------
async def setup(self, ctx: AdapterContext) -> None:
"""Create the per-run agent and run pre-run state queries."""
self._realize_memory_seeds(ctx)
agent_name = (
f"clawbench-{ctx.task.id}-run-{ctx.run_index}-{uuid.uuid4().hex[:6]}"
)
agent_id = await self.client.create_agent(
name=agent_name, workspace=str(ctx.workspace)
)
ctx.adapter_state["agent_id"] = agent_id
ctx.adapter_state.setdefault("session_keys", [])
# Pre-run gateway assertions (ex-`setup.pre_check_gateway`) are
# evaluated immediately. Failures are surfaced through
# `ctx.adapter_state["pre_run_failures"]` so the harness can fail
# fast before doing any phase work.
failures: list[str] = []
for query in ctx.task.verifier.pre_run_queries:
result = await self.verify_state_query(query, ctx)
if not result.ok:
failures.append(result.detail or query.description)
if failures:
ctx.adapter_state["pre_run_failures"] = failures
def _realize_memory_seeds(self, ctx: AdapterContext) -> None:
"""Expose canonical memory seeds through the run workspace.
OpenClaw's native memory backend has no public seed/write RPC in the
benchmark client, but agents can read files in their workspace and the
verifier already falls back to these same memory files. This keeps
seeded-memory tasks fair across OpenClaw and filesystem-first harnesses.
"""
chunks: list[str] = []
for seed in ctx.task.assets.seed_state:
if seed.kind != "memory" or not seed.key:
continue
content = seed.content or ""
if not isinstance(content, str):
content = str(content)
safe_key = "".join(
ch if ch.isalnum() or ch in ("-", "_") else "_"
for ch in seed.key.strip()
).strip("_")
if not safe_key:
safe_key = "seed"
body = f"# {seed.key}\n\n{content.strip()}\n"
target = ctx.workspace / "memory" / f"{safe_key}.md"
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(body, encoding="utf-8")
chunks.append(body)
if chunks:
(ctx.workspace / "MEMORY.md").write_text("\n".join(chunks), encoding="utf-8")
async def run_phase(
self,
phase: CanonicalPhase,
ctx: AdapterContext,
) -> PhaseResult:
"""Create a session, drive the simulator, append to the transcript."""
agent_id = ctx.adapter_state.get("agent_id")
if not agent_id:
return PhaseResult(
error="OpenClawAdapter.run_phase called before setup(); no agent_id",
completed_normally=False,
)
session_keys: list[str] = ctx.adapter_state.setdefault("session_keys", [])
session_key = await self.client.create_session(
model=ctx.model,
agent_id=agent_id,
label=unique_session_label(
f"clawbench-{ctx.task.id}-run{ctx.run_index}-phase{phase.name}"
),
)
session_keys.append(session_key)
ctx.adapter_state["last_session_key"] = session_key
await self.client.subscribe(session_key)
# Browser tasks require the browser tool to actually be
# registered in the effective tool set for this session. If it
# isn't, fail the phase fast rather than letting the agent
# flounder against a missing tool.
if ctx.task.family.value == "browser":
try:
await self._assert_browser_support(session_key)
except Exception as exc:
return PhaseResult(
error=str(exc),
completed_normally=False,
)
simulator = UserSimulator(
phase.user,
ctx.runtime_values,
prompt_variant=self._config.prompt_variant,
)
turn_timeout = float(phase.timeout_seconds or ctx.task.budgets.timeout_seconds)
turn_timeout = min(turn_timeout, self._config.turn_timeout_seconds)
appended: list = []
turns_sent = 0
while not simulator.is_done:
user_message = await simulator.next_message(ctx.transcript)
if user_message is None:
break
phase_transcript = await self.client.send_and_wait(
session_key,
user_message,
timeout=turn_timeout,
)
ctx.transcript.messages.extend(phase_transcript.messages)
appended.extend(phase_transcript.messages)
turns_sent += 1
return PhaseResult(
messages=appended,
adapter_metadata={
"session_key": session_key,
"turns_sent": turns_sent,
},
)
async def _assert_browser_support(self, session_key: str) -> None:
inventory = await self.client.get_effective_tools(session_key)
tool_ids = {
str(tool.get("id", ""))
for group in inventory.get("groups", [])
for tool in group.get("tools", [])
}
if "browser" not in tool_ids:
raise RuntimeError(
"Browser tasks require the browser tool, but it is not available in this gateway."
)
async def teardown(self, ctx: AdapterContext) -> None:
"""Delete per-phase sessions and the per-run agent."""
client = self._client
if client is None:
return
session_keys: list[str] = ctx.adapter_state.get("session_keys", [])
agent_id: str | None = ctx.adapter_state.get("agent_id")
for session_key in session_keys:
try:
await client.delete_session(session_key)
except Exception as exc: # pragma: no cover - best effort
logger.warning("delete_session failed for %s: %s", session_key, exc)
if agent_id:
try:
await client.delete_agent(agent_id, delete_files=False)
except Exception as exc: # pragma: no cover - best effort
logger.warning("delete_agent failed for %s: %s", agent_id, exc)
# ------------------------------------------------------------------
# State query resolution.
# ------------------------------------------------------------------
async def verify_state_query(
self,
query: StateQuery,
ctx: AdapterContext,
) -> StateQueryResult:
try:
if query.kind == "memory":
return await self._verify_memory(query, ctx)
if query.kind == "session":
return await self._verify_session(query, ctx)
if query.kind == "cron":
return await self._verify_cron(query, ctx)
if query.kind == "custom":
return await self._verify_gateway(query, ctx)
except Exception as exc:
return StateQueryResult(ok=False, detail=str(exc))
return StateQueryResult(
ok=False,
detail=f"OpenClawAdapter has no handler for query kind '{query.kind}'",
capability_missing=True,
)
# --- memory ---
async def _verify_memory(
self, query: StateQuery, ctx: AdapterContext
) -> StateQueryResult:
key_pattern = str(query.selector.get("key_pattern", ""))
value_contains = list(query.expected.get("value_contains", []))
session_key = ctx.adapter_state.get("last_session_key", "")
agent_id = ctx.adapter_state.get("agent_id")
# Primary path: memory.search RPC.
try:
response = await self.client._rpc(
"memory.search",
{
"query": key_pattern,
"sessionKey": session_key,
"limit": 20,
},
)
entries = response.get("payload", {}).get("entries", [])
if query.predicate == "absent":
ok = not entries
return StateQueryResult(
ok=ok,
detail="Correctly absent" if ok else "Memory entry exists",
)
if not entries:
return StateQueryResult(ok=False, detail="No matching memory entries found")
all_values = " ".join(str(entry.get("value", "")) for entry in entries)
for token in value_contains:
if token.lower() not in all_values.lower():
return StateQueryResult(
ok=False, detail=f"Memory value missing '{token}'"
)
return StateQueryResult(ok=True, detail="OK")
except Exception as exc:
logger.info(
"memory.search unavailable for verification, falling back: %s",
exc,
)
# Fallback: gateway-sourced memory files + workspace scan + transcript.
fallback_state = MemoryState(
key_pattern=key_pattern,
exists=query.predicate != "absent",
value_contains=value_contains,
)
extra_memory_text = ""
if agent_id:
try:
from clawbench.environment import _read_agent_memory_text # local import to avoid cycle
extra_memory_text = await _read_agent_memory_text(self.client, agent_id)
except Exception:
extra_memory_text = ""
ok, detail = verify_memory_fallback(
fallback_state,
ctx.workspace,
transcript=ctx.transcript,
extra_memory_text=extra_memory_text,
)
return StateQueryResult(ok=ok, detail=detail)
# --- session ---
async def _verify_session(
self, query: StateQuery, ctx: AdapterContext
) -> StateQueryResult:
session_key = ctx.adapter_state.get("last_session_key", "")
expected_model = query.expected.get("model") or ""
try:
response = await self.client._rpc("sessions.resolve", {"key": session_key})
payload = response.get("payload", {})
if query.predicate == "absent":
return StateQueryResult(ok=False, detail="Session exists but should not")
if expected_model:
actual = str(payload.get("model", ""))
if str(expected_model).lower() not in actual.lower():
return StateQueryResult(
ok=False,
detail=f"Model mismatch: expected {expected_model}, got {actual}",
)
return StateQueryResult(ok=True, detail="OK")
except Exception as exc:
if query.predicate == "absent":
return StateQueryResult(ok=True, detail="Correctly absent")
return StateQueryResult(ok=False, detail=str(exc))
# --- cron ---
async def _verify_cron(
self, query: StateQuery, ctx: AdapterContext
) -> StateQueryResult:
description_contains = query.selector.get("description_contains")
try:
response = await self.client._rpc("cron.list", {})
jobs = response.get("payload", {}).get("jobs", [])
if query.predicate == "absent":
ok = not jobs
return StateQueryResult(
ok=ok,
detail="Correctly absent" if ok else "Cron jobs exist",
)
if not jobs:
return StateQueryResult(ok=False, detail="No cron jobs found")
if description_contains and not any(
str(description_contains).lower() in json.dumps(job).lower() for job in jobs
):
return StateQueryResult(
ok=False,
detail=f"No cron job matched '{description_contains}'",
)
return StateQueryResult(ok=True, detail="OK")
except Exception as exc:
return StateQueryResult(ok=False, detail=str(exc))
# --- arbitrary gateway RPC ---
async def _verify_gateway(
self, query: StateQuery, ctx: AdapterContext
) -> StateQueryResult:
method = str(query.selector.get("method", ""))
params = dict(query.selector.get("params", {}))
assert_path = str(query.selector.get("assert_path", "$"))
expected_equals = query.expected.get("equals")
expected_contains = query.expected.get("contains")
expected_exists = bool(query.expected.get("exists", True))
try:
response = await self.client._rpc(method, params)
payload = response.get("payload", {})
value = resolve_json_path(payload, assert_path)
if not expected_exists:
ok = value is None
return StateQueryResult(
ok=ok,
detail="Correctly absent" if ok else "Path exists",
)
if value is None:
return StateQueryResult(
ok=False, detail=f"Path {assert_path} not found"
)
if expected_equals is not None and value != expected_equals:
return StateQueryResult(
ok=False, detail=f"Expected {expected_equals}, got {value}"
)
if (
expected_contains is not None
and str(expected_contains).lower() not in str(value).lower()
):
return StateQueryResult(
ok=False,
detail=f"Expected '{expected_contains}' in {value}",
)
return StateQueryResult(ok=True, detail="OK")
except Exception as exc:
return StateQueryResult(ok=False, detail=str(exc))
__all__ = ["OpenClawAdapter", "OpenClawAdapterConfig"]
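
The memory-seed layout written by `_realize_memory_seeds` (one sanitized `memory/<key>.md` file per seed plus a combined `MEMORY.md`) can be sketched in isolation. This is a simplified illustration under stated assumptions: seeds are passed as plain `(key, content)` tuples rather than `SeedEntry` objects, and `safe_seed_key` and `write_memory_seeds` are hypothetical helper names.

```python
import tempfile
from pathlib import Path


def safe_seed_key(key: str) -> str:
    """Map an arbitrary seed key to a filesystem-safe file stem."""
    cleaned = "".join(
        ch if ch.isalnum() or ch in ("-", "_") else "_" for ch in key.strip()
    ).strip("_")
    return cleaned or "seed"


def write_memory_seeds(workspace: Path, seeds: list[tuple[str, str]]) -> list[Path]:
    """Write one markdown file per seed and a combined MEMORY.md index."""
    written: list[Path] = []
    chunks: list[str] = []
    for key, content in seeds:
        if not key:
            continue  # seeds without a key are skipped, as in the adapter
        body = f"# {key}\n\n{content.strip()}\n"
        target = workspace / "memory" / f"{safe_seed_key(key)}.md"
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(body, encoding="utf-8")
        written.append(target)
        chunks.append(body)
    if chunks:
        (workspace / "MEMORY.md").write_text("\n".join(chunks), encoding="utf-8")
    return written


with tempfile.TemporaryDirectory() as tmp:
    workspace = Path(tmp)
    paths = write_memory_seeds(workspace, [("user/timezone pref", "Europe/Berlin")])
    names = [p.name for p in paths]
    index_text = (workspace / "MEMORY.md").read_text(encoding="utf-8")

print(names)
```

Note that two keys which sanitize to the same stem would write to the same file, so seed keys are assumed to stay distinct after sanitization.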


@ -0,0 +1,45 @@
"""Canonical task schema — agent-agnostic intent layer.
Part of ClawBench Phase-4 per CLAWBENCH_V0_4_SPEC.md §"Canonical Task Schema".
Splits canonical task intent (what to set up, prompt with, and verify) from
OpenClaw-specific execution details (which become adapter responsibilities).
The existing `TaskDefinition` in `clawbench/schemas.py` stays as-is for
back-compat; this package adds a canonical view produced by
`convert.from_task_definition`, which is the single bridge between the two
shapes. Everything downstream of the harness (scorer, trajectory, judge,
stats) is already agent-agnostic: those modules consume the transcript +
TaskRunResult and do not need changes.
"""
from clawbench.canonical.schema import (
AdapterCapability,
BudgetSpec,
CanonicalAssets,
CanonicalPhase,
CanonicalTask,
Deliverable,
InteractionPolicy,
SeedEntry,
StateQuery,
StateQueryKind,
StateQueryPredicate,
VerifierContract,
)
from clawbench.canonical.convert import from_task_definition
__all__ = [
"AdapterCapability",
"BudgetSpec",
"CanonicalAssets",
"CanonicalPhase",
"CanonicalTask",
"Deliverable",
"InteractionPolicy",
"SeedEntry",
"StateQuery",
"StateQueryKind",
"StateQueryPredicate",
"VerifierContract",
"from_task_definition",
]
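
One way the split between canonical intent and adapter responsibilities could be enforced is a capability check before a run: compare what the task requires with what the adapter declares in its `capabilities` set. The sketch below is an assumption about how a harness might do this, not the project's implementation; `Capability` is a hypothetical stand-in for `AdapterCapability` (its string values are invented) and `missing_capabilities` is a hypothetical helper.

```python
from enum import Enum


class Capability(str, Enum):
    """Hypothetical stand-in for `AdapterCapability`; the values are assumed."""

    FILES = "files"
    EXECUTION = "execution"
    MEMORY = "memory"
    SESSION = "session"
    CRON = "cron"
    GATEWAY_RPC = "gateway_rpc"
    BROWSER = "browser"
    MULTI_TURN_INJECTION = "multi_turn_injection"


def missing_capabilities(
    required: set[Capability], supported: set[Capability]
) -> list[str]:
    """Return the capabilities a task needs that the adapter does not declare."""
    return sorted(cap.value for cap in required - supported)


# A filesystem-first adapter versus a task that verifies a cron schedule.
filesystem_only = {Capability.FILES, Capability.EXECUTION}
cron_task_needs = {Capability.FILES, Capability.EXECUTION, Capability.CRON}

gaps = missing_capabilities(cron_task_needs, filesystem_only)
print(gaps)
```

A harness could skip or mark such a task as unsupported when the list is non-empty, rather than letting a state query fail at verification time.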


@ -0,0 +1,328 @@
"""Convert `TaskDefinition` → `CanonicalTask`.
This is the single bridge between the existing OpenClaw-entangled task
format (`clawbench.schemas.TaskDefinition`) and the agent-agnostic
canonical form (`CanonicalTask`). Callers load tasks as usual via
`clawbench.tasks.load_all_tasks` and then call
`from_task_definition(task)` to get the canonical view.
Field mappings (any field not mentioned is copied verbatim):
- `setup.asset_packs` → `assets.seed_state` (kind="file", asset_pack=...)
- `setup.workspace_files` → `assets.workspace_files`
- `setup.background_services` → `assets.background_services`
- `setup.memory_seed` → `assets.seed_state` (kind="memory")
- `setup.pre_check_gateway` → `verifier.pre_run_queries` (GATEWAY_RPC)
- `completion.files` → `verifier.file_states`
- `completion.execution_checks` → `verifier.execution_checks`
- `completion.memory` → `verifier.state_queries` (MEMORY)
- `completion.session` → `verifier.state_queries` (SESSION)
- `completion.cron` → `verifier.state_queries` (CRON)
- `completion.gateway_assertions` → `verifier.state_queries` (GATEWAY_RPC)
- `trajectory` → `verifier.trajectory`
- `behavior` → `verifier.behavior`
- `judge` → `verifier.judge`
- `user` / `phases` → `phases` via `task.normalized_phases()`
- `timeout_seconds` → `budgets.timeout_seconds` (also on each phase)
`required_adapter_capabilities` is computed from what the task actually
needs: always `{FILES, EXECUTION}`, plus `MEMORY`/`SESSION`/`CRON`/
`GATEWAY_RPC`/`BROWSER`/`MULTI_TURN_INJECTION` when the source task's
fields trigger those capabilities.
"""
from __future__ import annotations
from clawbench.canonical.schema import (
AdapterCapability,
BudgetSpec,
CanonicalAssets,
CanonicalPhase,
CanonicalTask,
InteractionPolicy,
SeedEntry,
StateQuery,
VerifierContract,
)
from clawbench.schemas import (
CronState,
GatewayAssertion,
MemoryState,
SessionState,
TaskDefinition,
TaskFamily,
UserTurn,
)
# ---------------------------------------------------------------------------
# Seed state
# ---------------------------------------------------------------------------
def _seeds_from_setup(task: TaskDefinition) -> list[SeedEntry]:
seeds: list[SeedEntry] = []
for pack in task.setup.asset_packs:
seeds.append(SeedEntry(kind="file", asset_pack=pack))
for entry in task.setup.memory_seed:
# memory_seed entries are free-form dicts in the existing schema;
# we preserve them verbatim in `metadata` and surface `key` +
# `content` when present so adapters can consume the structured
# pieces without re-parsing.
seeds.append(
SeedEntry(
kind="memory",
key=str(entry.get("key", "")),
content=entry.get("value") or entry.get("content"),
metadata=dict(entry),
)
)
return seeds
# ---------------------------------------------------------------------------
# State queries: memory / session / cron / gateway_assertions
# ---------------------------------------------------------------------------
def _memory_state_to_query(state: MemoryState) -> StateQuery:
expected: dict[str, object] = {}
if state.value_contains:
expected["value_contains"] = list(state.value_contains)
return StateQuery(
kind="memory",
predicate="exists" if state.exists else "absent",
selector={"key_pattern": state.key_pattern},
expected=expected,
required_capability=AdapterCapability.MEMORY,
description=f"memory key ~ /{state.key_pattern}/",
)
def _session_state_to_query(state: SessionState) -> StateQuery:
expected: dict[str, object] = {}
if state.model_should_be:
expected["model"] = state.model_should_be
return StateQuery(
kind="session",
predicate="exists" if state.should_exist else "absent",
selector={},
expected=expected,
required_capability=AdapterCapability.SESSION,
description="session state",
)
def _cron_state_to_query(state: CronState) -> StateQuery:
selector: dict[str, object] = {}
if state.description_contains:
selector["description_contains"] = state.description_contains
return StateQuery(
kind="cron",
predicate="exists" if state.exists else "absent",
selector=selector,
expected={},
required_capability=AdapterCapability.CRON,
description="cron schedule",
)
def _gateway_assertion_to_query(assertion: GatewayAssertion) -> StateQuery:
selector: dict[str, object] = {
"method": assertion.method,
"params": dict(assertion.params),
"assert_path": assertion.assert_path,
}
expected: dict[str, object] = {}
if assertion.assert_equals is not None:
expected["equals"] = assertion.assert_equals
if assertion.assert_contains is not None:
expected["contains"] = assertion.assert_contains
expected["exists"] = assertion.assert_exists
predicate = "exists"
if assertion.assert_equals is not None:
predicate = "equals"
elif assertion.assert_contains is not None:
predicate = "contains"
elif not assertion.assert_exists:
predicate = "absent"
return StateQuery(
kind="custom",
predicate=predicate,
selector=selector,
expected=expected,
required_capability=AdapterCapability.GATEWAY_RPC,
description=f"gateway rpc: {assertion.method}",
)
def _state_queries_from_completion(task: TaskDefinition) -> list[StateQuery]:
queries: list[StateQuery] = []
for mem in task.completion.memory:
queries.append(_memory_state_to_query(mem))
if task.completion.session is not None:
queries.append(_session_state_to_query(task.completion.session))
for cron in task.completion.cron:
queries.append(_cron_state_to_query(cron))
for assertion in task.completion.gateway_assertions:
queries.append(_gateway_assertion_to_query(assertion))
return queries
def _pre_run_queries_from_setup(task: TaskDefinition) -> list[StateQuery]:
return [_gateway_assertion_to_query(a) for a in task.setup.pre_check_gateway]
# ---------------------------------------------------------------------------
# Phases + dynamic-turn detection
# ---------------------------------------------------------------------------
_DYNAMIC_TURN_FIELDS = (
"when_tool_family",
"when_tool_name",
"when_assistant_contains",
"when_last_tool_failed",
)
def _turn_is_dynamic(turn: UserTurn) -> bool:
    """Return True when any conditional trigger field is set on the turn."""
    return any(getattr(turn, name, None) for name in _DYNAMIC_TURN_FIELDS)
def _phases_from_task(task: TaskDefinition) -> tuple[list[CanonicalPhase], bool]:
phases: list[CanonicalPhase] = []
any_dynamic = False
for phase in task.normalized_phases():
phases.append(
CanonicalPhase(
name=phase.name,
user=phase.user,
timeout_seconds=phase.timeout_seconds,
)
)
if len(phase.user.turns) > 1 or any(_turn_is_dynamic(t) for t in phase.user.turns):
any_dynamic = True
return phases, any_dynamic
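
Review note: a phase counts as dynamic when it has more than one user turn or when any turn carries a trigger field. The following is a minimal sketch of that rule, assuming string-typed trigger fields and a boolean `when_last_tool_failed`; `TurnSketch` and `phase_is_dynamic` are illustrative names only.

```python
from dataclasses import dataclass
from typing import Sequence

TRIGGER_FIELDS = (
    "when_tool_family",
    "when_tool_name",
    "when_assistant_contains",
    "when_last_tool_failed",
)


@dataclass
class TurnSketch:
    """Hypothetical stand-in for a user turn; field types are assumed."""

    text: str
    when_tool_family: str = ""
    when_tool_name: str = ""
    when_assistant_contains: str = ""
    when_last_tool_failed: bool = False


def turn_is_dynamic(turn: TurnSketch) -> bool:
    # Any truthy trigger field marks the turn as dynamic.
    return any(bool(getattr(turn, name, None)) for name in TRIGGER_FIELDS)


def phase_is_dynamic(turns: Sequence[TurnSketch]) -> bool:
    # More than one turn, or a single triggered turn, requires mid-run injection.
    return len(turns) > 1 or any(turn_is_dynamic(t) for t in turns)
```

Under this rule a phase with two plain, untriggered turns is still flagged, which in turn adds `MULTI_TURN_INJECTION` to the required capabilities.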
# ---------------------------------------------------------------------------
# Capability inference
# ---------------------------------------------------------------------------
def _capabilities_for_task(task: TaskDefinition, *, uses_dynamic: bool) -> set[AdapterCapability]:
caps: set[AdapterCapability] = {AdapterCapability.FILES, AdapterCapability.EXECUTION}
if task.completion.memory or any(seed.get("key") for seed in task.setup.memory_seed):
caps.add(AdapterCapability.MEMORY)
if task.completion.session is not None:
caps.add(AdapterCapability.SESSION)
if task.completion.cron:
caps.add(AdapterCapability.CRON)
if task.completion.gateway_assertions or task.setup.pre_check_gateway:
caps.add(AdapterCapability.GATEWAY_RPC)
if task.family == TaskFamily.BROWSER:
caps.add(AdapterCapability.BROWSER)
if uses_dynamic:
caps.add(AdapterCapability.MULTI_TURN_INJECTION)
return caps
# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------
def from_task_definition(task: TaskDefinition) -> CanonicalTask:
"""Produce the canonical view of a legacy `TaskDefinition`.
This is lossless for fields that have a canonical equivalent.
OpenClaw-only constructs (gateway_assertions, pre_check_gateway,
memory_seed) become `StateQuery` entries / `SeedEntry` entries
tagged with the capability an adapter needs to resolve them.
"""
phases, any_dynamic = _phases_from_task(task)
assets = CanonicalAssets(
workspace_files=list(task.setup.workspace_files),
background_services=list(task.setup.background_services),
seed_state=_seeds_from_setup(task),
)
verifier = VerifierContract(
file_states=list(task.completion.files),
execution_checks=list(task.completion.execution_checks),
state_queries=_state_queries_from_completion(task),
pre_run_queries=_pre_run_queries_from_setup(task),
trajectory=task.trajectory,
behavior=task.behavior,
judge=task.judge,
)
interaction = InteractionPolicy(
max_turns=max((phase.user.max_turns for phase in phases), default=20),
allow_multi_phase=len(phases) > 1,
uses_dynamic_user_triggers=any_dynamic,
)
budgets = BudgetSpec(timeout_seconds=task.timeout_seconds)
capabilities = _capabilities_for_task(task, uses_dynamic=any_dynamic)
return CanonicalTask(
id=task.id,
name=task.name,
tier=task.tier,
family=task.family,
surface=task.surface,
scenario=task.scenario,
subscenario=task.subscenario,
capabilities=list(task.capabilities),
atomic_capabilities=list(task.atomic_capabilities),
pool=task.pool,
subsets=list(task.subsets),
variant_group=task.variant_group,
variant_id=task.variant_id,
template_id=task.template_id,
release_id=task.release_id,
source_kind=task.source_kind,
provenance_ids=list(task.provenance_ids),
privacy_tier=task.privacy_tier,
contamination_risk=task.contamination_risk,
freshness_epoch=task.freshness_epoch,
category=task.category,
domain=task.domain,
functionality=list(task.functionality),
trace_distribution=list(task.trace_distribution),
tool_surface=list(task.tool_surface),
risk_tags=list(task.risk_tags),
first_used_at=task.first_used_at,
retire_after_runs=task.retire_after_runs,
similarity_hash=task.similarity_hash,
canary_token=task.canary_token,
official=task.official,
query_difficulty=task.query_difficulty,
query_weight=task.query_weight,
artifact_type=task.artifact_type,
preconditions=list(task.preconditions),
source_dataset=task.source_dataset,
prompt_variants=list(task.prompt_variants),
pass_threshold=task.pass_threshold,
assets=assets,
phases=phases,
verifier=verifier,
budgets=budgets,
interaction=interaction,
deliverables=[],
required_adapter_capabilities=capabilities,
)


@ -0,0 +1,296 @@
"""Canonical task schema — agent-agnostic intent.
This is the Phase-4 split of `TaskDefinition` (see CLAWBENCH_V0_4_SPEC.md
§"Canonical Task Schema"). The canonical layer expresses **what** a task
is (its identity, prompts, assets, and verification contract) without
saying **how** it gets executed. The "how" (gateway RPCs, session
lifecycle, tool-family normalization) lives in per-adapter code under
`clawbench/adapters/`.
The rule of thumb:
- If a field describes what the user asked for, what files/state the
agent is expected to produce, or what the run must satisfy to pass,
it belongs here.
- If a field describes how OpenClaw's gateway is called to drive the
run or read back state, it belongs in the OpenClaw adapter (and the
canonical version of that check is a `StateQuery` with a
`required_capability`).
Converting from `TaskDefinition` to `CanonicalTask` is lossless for fields
that have a canonical equivalent; OpenClaw-only fields (like
`pre_check_gateway` and `gateway_assertions`) survive as `StateQuery`
entries tagged with `AdapterCapability.GATEWAY_RPC`, so adapters that
support them can still resolve them while adapters that don't can cleanly
report a capability gap.
"""
from __future__ import annotations
import enum
from typing import Any, Literal
from pydantic import BaseModel, Field, model_validator
from clawbench.schemas import (
ArtifactType,
BackgroundService,
BehaviorExpectations,
CapabilityTag,
ExecutionCheck,
FileState,
JudgeExpectations,
PromptVariant,
QueryDifficulty,
ScenarioDomain,
SimulatedUser,
TaskFamily,
TaskPool,
TaskSubset,
Tier,
TrajectoryExpectations,
)
class AdapterCapability(str, enum.Enum):
"""What an adapter is able to provide to a running task.
Each `StateQuery` declares a `required_capability`. If the selected
adapter's `capabilities` set does not include that capability, the
harness either skips the task entirely (strict mode) or scores the
query as neutral (partial mode). This keeps the leaderboard honest
about what an adapter can actually evaluate.
"""
FILES = "files"
EXECUTION = "execution"
MEMORY = "memory"
SESSION = "session"
CRON = "cron"
BROWSER = "browser"
GATEWAY_RPC = "gateway_rpc"
# The adapter can deliver additional user turns mid-trajectory in
# response to simulated-user triggers (when_tool_family,
# when_assistant_contains, etc). Single-shot drivers like Hermes's
# MiniSWERunner do not provide this.
MULTI_TURN_INJECTION = "multi_turn_injection"
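
Review note: the docstring above describes two gating modes. The sketch below shows one way the harness-side decision could look. It is an assumption about behavior described in prose, not code from this change; `Cap`, `plan_queries`, and the `"evaluate"` / `"neutral"` labels are hypothetical.

```python
import enum
from typing import List, Optional, Set


class Cap(str, enum.Enum):
    """Hypothetical subset of the capability enum, for illustration only."""

    FILES = "files"
    MEMORY = "memory"
    GATEWAY_RPC = "gateway_rpc"


def plan_queries(
    query_caps: List[Cap], provided: Set[Cap], *, strict: bool
) -> Optional[List[str]]:
    """Return None to skip the task, else one decision per state query."""
    missing = {cap for cap in query_caps if cap not in provided}
    if missing and strict:
        # Strict mode: any capability gap skips the whole task.
        return None
    # Partial mode: unsupported queries are scored as neutral.
    return ["evaluate" if cap in provided else "neutral" for cap in query_caps]
```

For example, an adapter that provides only `FILES` and `MEMORY` would skip a task with a `GATEWAY_RPC` query in strict mode, and would evaluate the memory query while marking the gateway query neutral in partial mode.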
StateQueryKind = Literal["memory", "session", "cron", "custom"]
StateQueryPredicate = Literal["exists", "absent", "equals", "contains"]
class StateQuery(BaseModel):
"""An abstract state assertion resolved by the active adapter.
The canonical layer does not commit to how the state is read. For
example, a `kind="memory"` query with `selector={"key_pattern":"alpha"}`
and `expected={"value_contains":["foo"]}` means "there is a memory
entry whose key matches /alpha/ and whose value contains 'foo'".
OpenClaw's adapter resolves that against the `memory.search` gateway
RPC; a filesystem-memory adapter (e.g. Hermes) resolves it by
scanning `MEMORY.md` / `memory/notes.md` in the workspace.
The `required_capability` is what the harness checks against the
adapter's declared capability set.
"""
kind: StateQueryKind
predicate: StateQueryPredicate = "exists"
selector: dict[str, Any] = Field(default_factory=dict)
expected: dict[str, Any] = Field(default_factory=dict)
required_capability: AdapterCapability
description: str = ""
class SeedEntry(BaseModel):
"""A single piece of pre-task state to seed into the workspace.
`kind="file"`: the adapter writes `content` (or copies a bundled
asset via `asset_pack`) to `path` inside the workspace.
`kind="memory"`: the adapter seeds a memory entry with `key` and
`content`. Adapters without memory support fall back to writing
the seed as a file (see `environment_files.verify_memory_fallback`).
"""
kind: Literal["file", "memory"]
path: str | None = None
content: str | None = None
key: str | None = None
asset_pack: str = ""
metadata: dict[str, Any] = Field(default_factory=dict)
@model_validator(mode="after")
def _validate_shape(self) -> SeedEntry:
if self.kind == "file" and not self.path and not self.asset_pack:
raise ValueError("SeedEntry(kind='file') requires `path` or `asset_pack`.")
if self.kind == "memory" and not self.key:
raise ValueError("SeedEntry(kind='memory') requires `key`.")
return self
class Deliverable(BaseModel):
"""A user-visible artifact the task is expected to produce."""
kind: ArtifactType
paths: list[str] = Field(default_factory=list)
description: str = ""
class BudgetSpec(BaseModel):
"""Per-task execution budgets.
`timeout_seconds` is the wall clock for the full run (all phases).
`max_tool_calls=0` means unbounded within the timeout. Adapters are
expected to honor these as soft caps; the harness will also enforce
the timeout as a hard deadline.
"""
timeout_seconds: int = 180
max_tool_calls: int = 0
per_turn_timeout_seconds: int = 0
class InteractionPolicy(BaseModel):
"""How the canonical phases drive the agent."""
max_turns: int = 20
allow_multi_phase: bool = True
# Declares that the task's simulated user sends follow-up turns
# based on trajectory triggers (not just counts). Adapters without
# MULTI_TURN_INJECTION cannot deliver these dynamically.
uses_dynamic_user_triggers: bool = False
class VerifierContract(BaseModel):
"""Everything needed to score a run, independent of how it ran.
The file/execution halves are fully agent-agnostic: `environment_files`
evaluates them against the workspace directly. State queries are
resolved by `adapter.verify_state_query`. Trajectory and behavior
expectations are evaluated against the `Transcript` (already agent-
agnostic). The optional judge rubric is evaluated against artifacts
+ transcript + completion feedback.
"""
file_states: list[FileState] = Field(default_factory=list)
execution_checks: list[ExecutionCheck] = Field(default_factory=list)
state_queries: list[StateQuery] = Field(default_factory=list)
pre_run_queries: list[StateQuery] = Field(default_factory=list)
trajectory: TrajectoryExpectations = Field(default_factory=TrajectoryExpectations)
behavior: BehaviorExpectations = Field(default_factory=BehaviorExpectations)
judge: JudgeExpectations | None = None
class CanonicalAssets(BaseModel):
"""Workspace + seed state the harness realizes before phases run.
`workspace_files` is a list of relative paths (resolved against the
task's assets/ dir) to copy into the workspace. `background_services`
is already canonical (subprocess + readiness probe, no OpenClaw
coupling). `seed_state` replaces `asset_packs` + `memory_seed` with
a uniform per-entry list.
"""
workspace_files: list[str] = Field(default_factory=list)
background_services: list[BackgroundService] = Field(default_factory=list)
seed_state: list[SeedEntry] = Field(default_factory=list)
class CanonicalPhase(BaseModel):
"""One simulated-user phase in a multi-phase task.
`user` is reused verbatim from `clawbench.schemas.SimulatedUser`;
it is already agent-agnostic (turn text + canonical trigger
predicates). Whether a specific trigger fires on a given adapter
depends on whether tool-family tags are populated, which is an
adapter responsibility.
"""
name: str
user: SimulatedUser
timeout_seconds: int | None = None
class CanonicalTask(BaseModel):
"""Agent-agnostic task definition.
Produced by `convert.from_task_definition` from an existing
`TaskDefinition`. Consumed by adapters via `AdapterContext` and by
the scorer + trajectory/judge layers. No field here is OpenClaw-
specific; OpenClaw-only semantics survive as `StateQuery` entries
with `required_capability=GATEWAY_RPC`.
"""
# Identity and taxonomy (already canonical in TaskDefinition).
id: str
name: str
tier: Tier
family: TaskFamily
surface: str
scenario: ScenarioDomain | None = None
subscenario: str = ""
capabilities: list[CapabilityTag] = Field(default_factory=list)
atomic_capabilities: list[str] = Field(default_factory=list)
# Pool / rotation / provenance.
pool: TaskPool = TaskPool.PUBLIC_DEV
subsets: list[TaskSubset] = Field(default_factory=list)
variant_group: str = ""
variant_id: str = "main"
template_id: str = ""
release_id: str = ""
source_kind: str = ""
provenance_ids: list[str] = Field(default_factory=list)
privacy_tier: str = ""
contamination_risk: str = ""
freshness_epoch: str = ""
category: str = ""
domain: str = ""
functionality: list[str] = Field(default_factory=list)
trace_distribution: list[str] = Field(default_factory=list)
tool_surface: list[str] = Field(default_factory=list)
risk_tags: list[str] = Field(default_factory=list)
first_used_at: str = ""
retire_after_runs: int = 0
similarity_hash: str = ""
canary_token: str = ""
official: bool = False
# Policy + prompts.
query_difficulty: QueryDifficulty | None = None
query_weight: float = 1.0
artifact_type: ArtifactType | None = None
preconditions: list[str] = Field(default_factory=list)
source_dataset: str = ""
prompt_variants: list[PromptVariant] = Field(default_factory=lambda: [PromptVariant.CLEAR])
pass_threshold: float = 0.7
# Canonical body.
assets: CanonicalAssets = Field(default_factory=CanonicalAssets)
phases: list[CanonicalPhase]
verifier: VerifierContract = Field(default_factory=VerifierContract)
budgets: BudgetSpec = Field(default_factory=BudgetSpec)
interaction: InteractionPolicy = Field(default_factory=InteractionPolicy)
deliverables: list[Deliverable] = Field(default_factory=list)
# Adapter gating.
required_adapter_capabilities: set[AdapterCapability] = Field(default_factory=set)
# Forward-compat: lets us evolve this schema while hidden / external
# task manifests continue to validate.
schema_version: str = "1"
@model_validator(mode="after")
def _defaults(self) -> CanonicalTask:
if not self.variant_group:
self.variant_group = self.id
if not self.prompt_variants:
self.prompt_variants = [PromptVariant.CLEAR]
else:
deduped: list[PromptVariant] = []
for variant in self.prompt_variants:
if variant not in deduped:
deduped.append(variant)
self.prompt_variants = deduped
return self
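
Review note: the `_defaults` validator removes duplicate prompt variants while keeping first-seen order. A small sketch of that loop, next to an equivalent `dict.fromkeys` form, is shown here with plain strings standing in for `PromptVariant` members (an assumption for illustration; the alternative requires hashable items, which enum members are).

```python
from typing import Hashable, Iterable, List


def dedupe_keep_order(items: Iterable[Hashable]) -> List[Hashable]:
    # Same shape as the validator: append only items not seen before.
    deduped: List[Hashable] = []
    for item in items:
        if item not in deduped:
            deduped.append(item)
    return deduped


def dedupe_with_dict(items: Iterable[Hashable]) -> List[Hashable]:
    # dict preserves insertion order, so keys come back in first-seen order.
    return list(dict.fromkeys(items))
```

Both forms give the same result; the explicit loop is quadratic in the worst case, which is unlikely to matter for a handful of variants.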


@ -226,14 +226,73 @@ class GatewayClient:
attempt += 1
try:
remaining = max(1.0, deadline - asyncio.get_running_loop().time())
attempt_timeout = min(30.0, remaining)
self._ws = await websockets.connect(
self.config.url,
max_size=10 * 1024 * 1024,
open_timeout=min(self.config.connect_timeout, remaining),
open_timeout=attempt_timeout,
additional_headers={"Origin": host},
)
break
self._listen_task = asyncio.create_task(self._listener())
challenge = await self._wait_event(
"connect.challenge", timeout=attempt_timeout
)
challenge_payload = challenge.get("payload", {})
nonce = ""
if isinstance(challenge_payload, dict):
raw_nonce = challenge_payload.get("nonce", "")
if isinstance(raw_nonce, str):
nonce = raw_nonce.strip()
role = "operator"
scopes = [
"operator.admin",
"operator.read",
"operator.write",
"operator.approvals",
"operator.pairing",
]
client_info = {
"id": "openclaw-control-ui",
"version": __version__,
"platform": "linux",
"mode": "ui",
}
connect_params: dict[str, Any] = {
"minProtocol": PROTOCOL_VERSION,
"maxProtocol": PROTOCOL_VERSION,
"client": client_info,
"role": role,
"scopes": scopes,
"caps": [],
"commands": [],
"permissions": {},
"auth": {"token": self.config.token} if self.config.token else {},
}
device = _build_connect_device(
nonce=nonce,
token=self.config.token,
client_id=str(client_info["id"]),
client_mode=str(client_info["mode"]),
role=role,
scopes=scopes,
platform=str(client_info["platform"]),
)
if device:
connect_params["device"] = device
response = await self._rpc(
"connect",
connect_params,
timeout=attempt_timeout,
)
payload = response.get("payload", {})
if payload.get("type") != "hello-ok":
raise ConnectionError(f"Expected hello-ok, got: {payload}")
logger.info("Connected to gateway (protocol v%s)", payload.get("protocol", "?"))
return
except Exception as exc:
await self.close()
if not _is_transient_gateway_connect_error(exc):
raise
if asyncio.get_running_loop().time() >= deadline:
@ -245,60 +304,6 @@ class GatewayClient:
delay,
)
await asyncio.sleep(delay)
self._listen_task = asyncio.create_task(self._listener())
challenge = await self._wait_event("connect.challenge", timeout=self.config.connect_timeout)
challenge_payload = challenge.get("payload", {})
nonce = ""
if isinstance(challenge_payload, dict):
raw_nonce = challenge_payload.get("nonce", "")
if isinstance(raw_nonce, str):
nonce = raw_nonce.strip()
role = "operator"
scopes = [
"operator.admin",
"operator.read",
"operator.write",
"operator.approvals",
"operator.pairing",
]
client_info = {
"id": "openclaw-control-ui",
"version": __version__,
"platform": "linux",
"mode": "ui",
}
connect_params: dict[str, Any] = {
"minProtocol": PROTOCOL_VERSION,
"maxProtocol": PROTOCOL_VERSION,
"client": client_info,
"role": role,
"scopes": scopes,
"caps": [],
"commands": [],
"permissions": {},
"auth": {"token": self.config.token} if self.config.token else {},
}
device = _build_connect_device(
nonce=nonce,
token=self.config.token,
client_id=str(client_info["id"]),
client_mode=str(client_info["mode"]),
role=role,
scopes=scopes,
platform=str(client_info["platform"]),
)
if device:
connect_params["device"] = device
response = await self._rpc(
"connect",
connect_params,
)
payload = response.get("payload", {})
if payload.get("type") != "hello-ok":
raise ConnectionError(f"Expected hello-ok, got: {payload}")
logger.info("Connected to gateway (protocol v%s)", payload.get("protocol", "?"))
async def close(self) -> None:
if self._listen_task and not self._listen_task.done():
@ -394,6 +399,15 @@ class GatewayClient:
except Exception as exc:
logger.warning("Failed to delete session %s: %s", session_key, exc)
async def abort_session(self, session_key: str, *, run_id: str | None = None) -> None:
params: dict[str, Any] = {"key": session_key}
if run_id:
params["runId"] = run_id
try:
await self._rpc("sessions.abort", params, timeout=min(self.config.request_timeout, 10.0))
except Exception as exc:
logger.warning("Failed to abort session %s run %s: %s", session_key, run_id or "-", exc)
async def get_effective_tools(self, session_key: str) -> dict[str, Any]:
response = await self._rpc("tools.effective", {"sessionKey": session_key})
return response.get("payload", {})
@ -413,15 +427,27 @@ class GatewayClient:
msg_queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue()
self._event_queues[chat_queue_key] = chat_queue
self._event_queues[msg_queue_key] = msg_queue
timeout_ms = max(1, min(int(timeout * 1000), 2_147_483_647))
await self._rpc(
send_response = await self._rpc(
"sessions.send",
{
"key": session_key,
"message": message,
"idempotencyKey": idempotency_key,
"timeoutMs": timeout_ms,
},
)
send_payload = send_response.get("payload", {})
run_id = idempotency_key
if isinstance(send_payload, dict):
raw_run_id = send_payload.get("runId")
if isinstance(raw_run_id, str) and raw_run_id.strip():
run_id = raw_run_id.strip()
wait_task = asyncio.create_task(
self._wait_for_agent_run(run_id, timeout_ms=timeout_ms)
)
collected_messages: list[TranscriptMessage] = []
done = False
@ -430,8 +456,31 @@ class GatewayClient:
while not done:
remaining = deadline - asyncio.get_running_loop().time()
if remaining <= 0:
logger.warning("Timeout waiting for final state on session %s", session_key)
logger.warning(
"Timeout waiting for final state on session %s run %s",
session_key,
run_id,
)
break
if wait_task.done():
wait_payload = _task_result_or_empty(wait_task)
status = str(wait_payload.get("status", ""))
if status and status != "timeout":
logger.info(
"agent.wait observed terminal status for session %s run %s: %s",
session_key,
run_id,
status,
)
done = True
break
if status == "timeout":
logger.warning(
"agent.wait timed out for session %s run %s",
session_key,
run_id,
)
break
try:
event = await asyncio.wait_for(chat_queue.get(), timeout=min(0.5, remaining))
state = event.get("payload", {}).get("state", "")
@ -440,6 +489,9 @@ class GatewayClient:
except asyncio.TimeoutError:
pass
if not done:
await self.abort_session(session_key, run_id=run_id)
collected_messages.extend(
await _drain_message_queue(
msg_queue,
@ -464,11 +516,30 @@ class GatewayClient:
):
collected_messages = history_messages
finally:
if not wait_task.done():
wait_task.cancel()
try:
await wait_task
except asyncio.CancelledError:
pass
self._event_queues.pop(chat_queue_key, None)
self._event_queues.pop(msg_queue_key, None)
return _correlate_transcript(Transcript(messages=collected_messages))
async def _wait_for_agent_run(self, run_id: str, *, timeout_ms: int) -> dict[str, Any]:
try:
response = await self._rpc(
"agent.wait",
{"runId": run_id, "timeoutMs": timeout_ms},
timeout=(timeout_ms / 1000.0) + 10.0,
)
except Exception as exc:
logger.warning("agent.wait failed for run %s: %s", run_id, exc)
return {}
payload = response.get("payload", {})
return payload if isinstance(payload, dict) else {}
async def get_session_messages(self, session_key: str) -> list[TranscriptMessage]:
try:
response = await self._rpc("sessions.get", {"key": session_key})
@ -507,14 +578,17 @@ class GatewayClient:
effective_timeout = timeout if timeout is not None else self.config.request_timeout
future: asyncio.Future[dict[str, Any]] = asyncio.get_running_loop().create_future()
self._pending[request_id] = future
await self._ws.send(json.dumps(frame))
try:
await self._ws.send(json.dumps(frame))
response = await asyncio.wait_for(future, timeout=effective_timeout)
except asyncio.TimeoutError:
self._pending.pop(request_id, None)
raise TimeoutError(
f"RPC {method} timed out after {effective_timeout:.1f}s"
)
except Exception:
self._pending.pop(request_id, None)
raise
if not response.get("ok", False):
error = response.get("error", {})
@ -574,6 +648,13 @@ def _build_connect_device(
platform: str,
device_family: str | None = None,
) -> dict[str, Any] | None:
if os.environ.get("CLAWBENCH_DISABLE_GATEWAY_DEVICE_IDENTITY", "").strip().lower() in {
"1",
"true",
"yes",
"on",
}:
return None
if not nonce:
return None
@ -643,6 +724,10 @@ def _resolve_node_executable() -> str | None:
def _is_transient_gateway_connect_error(exc: Exception) -> bool:
if isinstance(exc, (TimeoutError, asyncio.TimeoutError)):
return True
if isinstance(exc, websockets.exceptions.ConnectionClosed):
return True
if isinstance(exc, InvalidStatus):
return exc.response.status_code in {502, 503, 504}
if isinstance(exc, InvalidMessage):
@ -658,6 +743,13 @@ def _describe_connect_error(exc: Exception) -> str:
return exc.__class__.__name__
def _task_result_or_empty(task: asyncio.Task[dict[str, Any]]) -> dict[str, Any]:
try:
return task.result()
except Exception:
return {}
def _parse_single_message(message_data: dict[str, Any]) -> TranscriptMessage | None:
role = message_data.get("role", "")
if not role:
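
Review note: the connect hunk moves the whole handshake inside the retry loop and caps each attempt at `min(30.0, remaining)`. The sketch below isolates that retry shape under stated assumptions: `TransientError`, `connect_with_retry`, and the fixed `delay` are hypothetical, and the real code classifies errors via `_is_transient_gateway_connect_error` instead of a dedicated exception type.

```python
import asyncio
from typing import Awaitable, Callable


class TransientError(Exception):
    """Hypothetical marker for errors that are worth retrying."""


async def connect_with_retry(
    attempt_fn: Callable[[int, float], Awaitable[str]],
    *,
    total_timeout: float,
    per_attempt_cap: float = 30.0,
    delay: float = 0.05,
) -> str:
    loop = asyncio.get_running_loop()
    deadline = loop.time() + total_timeout
    attempt = 0
    while True:
        attempt += 1
        # Never hand an attempt less than one second, never more than the cap.
        remaining = max(1.0, deadline - loop.time())
        attempt_timeout = min(per_attempt_cap, remaining)
        try:
            return await attempt_fn(attempt, attempt_timeout)
        except TransientError:
            # Give up once the overall deadline has passed.
            if loop.time() >= deadline:
                raise
            await asyncio.sleep(delay)
```

Passing the same `attempt_timeout` to every step of one attempt (socket open, challenge wait, `connect` RPC) keeps a single slow step from consuming the whole budget, which appears to be the intent of the change.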


@ -0,0 +1,438 @@
"""Agent-agnostic workspace verification primitives.
This is the half of `environment.py` that does not touch the OpenClaw
gateway: file-state checks, execution-check subprocessing, stdout/JSON
assertions, JSON path resolution, and the filesystem/transcript-based
memory fallback readers.
Adapters (OpenClaw, Hermes, future) consume these primitives directly.
`environment.py` re-exports them for back-compat so existing callers
keep working while the gateway-tied halves (`_verify_memory` primary
path, `_verify_session`, `_verify_cron`, `_verify_gateway_assertion`)
stay where they are and move to `adapters/openclaw.py` in a later step.
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import re
import shlex
import sys
from pathlib import Path
from typing import Any
from clawbench.paths import resolve_workspace_path
from clawbench.render import render_template, render_value
from clawbench.schemas import (
ExecutionCheck,
ExecutionCheckResult,
FileState,
MemoryState,
Transcript,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# File-state verification
# ---------------------------------------------------------------------------
def verify_file_state(
spec: FileState,
workspace: Path,
runtime_values: dict[str, Any],
) -> tuple[bool, str]:
"""Verify a single `FileState` against the workspace filesystem."""
try:
path = resolve_workspace_path(
workspace,
render_template(spec.path, runtime_values),
field=f"completion file {spec.path}",
)
except ValueError as exc:
return False, str(exc)
exists = path.exists() and path.is_file()
if not spec.exists:
return (not exists, "Correctly absent" if not exists else "File should not exist")
if not exists:
return False, "File does not exist"
content = path.read_text(encoding="utf-8", errors="replace")
if spec.min_size_bytes > 0 and path.stat().st_size < spec.min_size_bytes:
return False, f"File too small: {path.stat().st_size} < {spec.min_size_bytes}"
for token in spec.content_contains:
rendered = render_template(token, runtime_values)
if rendered not in content:
return False, f"Missing expected content '{rendered}'"
for token in spec.content_not_contains:
rendered = render_template(token, runtime_values)
if rendered in content:
return False, f"Contains forbidden content '{rendered}'"
if spec.content_matches and not re.search(
render_template(spec.content_matches, runtime_values),
content,
re.MULTILINE | re.DOTALL,
):
return False, f"Content does not match {spec.content_matches}"
return True, "OK"
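
Review note: `verify_file_state` applies its checks in a fixed order and returns on the first failure. The following simplified sketch keeps that ordering but omits template rendering, workspace path resolution, and the minimum-size check; `check_file` and `demo` are hypothetical names.

```python
import re
import tempfile
from pathlib import Path
from typing import List, Sequence, Tuple


def check_file(
    path: Path,
    *,
    should_exist: bool = True,
    contains: Sequence[str] = (),
    not_contains: Sequence[str] = (),
    matches: str = "",
) -> Tuple[bool, str]:
    exists = path.is_file()
    if not should_exist:
        return (not exists, "Correctly absent" if not exists else "File should not exist")
    if not exists:
        return False, "File does not exist"
    content = path.read_text(encoding="utf-8", errors="replace")
    for token in contains:
        if token not in content:
            return False, f"Missing expected content '{token}'"
    for token in not_contains:
        if token in content:
            return False, f"Contains forbidden content '{token}'"
    if matches and not re.search(matches, content, re.MULTILINE | re.DOTALL):
        return False, f"Content does not match {matches}"
    return True, "OK"


def demo() -> List[Tuple[bool, str]]:
    # Exercise the checker against a throwaway workspace.
    with tempfile.TemporaryDirectory() as tmp:
        report = Path(tmp) / "report.md"
        report.write_text("# Report\nstatus: done\n", encoding="utf-8")
        return [
            check_file(report, contains=["status: done"]),
            check_file(report, not_contains=["done"]),
            check_file(Path(tmp) / "missing.txt", should_exist=False),
        ]
```

Because the regex is searched with `re.DOTALL`, a `.` in `content_matches` also spans newlines, so patterns meant to stay on one line should avoid `.*`.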
# ---------------------------------------------------------------------------
# Execution checks
# ---------------------------------------------------------------------------
async def run_execution_check(
spec: ExecutionCheck,
*,
workspace: Path,
runtime_values: dict[str, Any],
) -> ExecutionCheckResult:
"""Run a single `ExecutionCheck` subprocess and evaluate its output."""
rendered_command = render_template(spec.command, runtime_values)
try:
rendered_cwd = resolve_workspace_path(
workspace,
render_template(spec.cwd, runtime_values),
field=f"execution check cwd for {spec.name}",
)
except ValueError as exc:
return ExecutionCheckResult(
name=spec.name,
command=rendered_command,
exit_code=-1,
passed=False,
reason=str(exc),
)
rendered_env = render_value(spec.env, runtime_values)
full_env = {
**os.environ,
**{key: str(value) for key, value in rendered_env.items()},
"PYTHONUNBUFFERED": "1",
}
python_bin_dir = str(Path(sys.executable).parent)
full_env["PATH"] = f"{python_bin_dir}:{full_env.get('PATH', '')}"
python_path_parts = [str(rendered_cwd), str(workspace)]
existing_pythonpath = full_env.get("PYTHONPATH")
if existing_pythonpath:
python_path_parts.append(existing_pythonpath)
full_env["PYTHONPATH"] = ":".join(python_path_parts)
try:
if spec.shell:
process = await asyncio.create_subprocess_shell(
rendered_command,
cwd=str(rendered_cwd),
env=full_env,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
else:
process = await asyncio.create_subprocess_exec(
*shlex.split(rendered_command),
cwd=str(rendered_cwd),
env=full_env,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout_bytes, stderr_bytes = await asyncio.wait_for(
process.communicate(),
timeout=spec.timeout_seconds,
)
except asyncio.TimeoutError:
process.kill()
await process.communicate()
return ExecutionCheckResult(
name=spec.name,
command=rendered_command,
exit_code=-1,
passed=False,
reason=f"Timed out after {spec.timeout_seconds}s",
)
except Exception as exc:
return ExecutionCheckResult(
name=spec.name,
command=rendered_command,
exit_code=-1,
passed=False,
reason=str(exc),
)
stdout = stdout_bytes.decode("utf-8", errors="replace")
stderr = stderr_bytes.decode("utf-8", errors="replace")
passed, reason = evaluate_execution_result(
spec, workspace, runtime_values, process.returncode, stdout, stderr
)
return ExecutionCheckResult(
name=spec.name,
command=rendered_command,
exit_code=process.returncode,
stdout=stdout,
stderr=stderr,
passed=passed,
reason=reason,
)
def evaluate_execution_result(
spec: ExecutionCheck,
workspace: Path,
runtime_values: dict[str, Any],
exit_code: int,
stdout: str,
stderr: str,
) -> tuple[bool, str]:
"""Apply every assertion declared on an `ExecutionCheck`."""
if exit_code != spec.expected_exit_code:
return False, f"Exit code {exit_code} != expected {spec.expected_exit_code}"
for token in spec.stdout_contains:
rendered = render_template(token, runtime_values)
if rendered not in stdout:
return False, f"stdout missing '{rendered}'"
for token in spec.stdout_not_contains:
rendered = render_template(token, runtime_values)
if rendered in stdout:
return False, f"stdout unexpectedly contains '{rendered}'"
for token in spec.stderr_contains:
rendered = render_template(token, runtime_values)
if rendered not in stderr:
return False, f"stderr missing '{rendered}'"
if spec.stdout_matches and not re.search(
render_template(spec.stdout_matches, runtime_values), stdout, re.MULTILINE | re.DOTALL
):
return False, f"stdout does not match {spec.stdout_matches}"
if spec.stderr_matches and not re.search(
render_template(spec.stderr_matches, runtime_values), stderr, re.MULTILINE | re.DOTALL
):
return False, f"stderr does not match {spec.stderr_matches}"
if spec.expected_stdout is not None:
rendered = render_template(spec.expected_stdout, runtime_values).strip()
if stdout.strip() != rendered:
return False, "stdout did not match expected text"
if spec.expected_stdout_file:
try:
expected_path = resolve_workspace_path(
workspace,
render_template(spec.expected_stdout_file, runtime_values),
field=f"expected_stdout_file for {spec.name}",
)
except ValueError as exc:
return False, str(exc)
if stdout.strip() != expected_path.read_text(encoding="utf-8").strip():
return False, f"stdout did not match {spec.expected_stdout_file}"
if spec.expected_json is not None:
try:
parsed = json.loads(stdout)
except json.JSONDecodeError as exc:
return False, f"stdout was not valid JSON: {exc}"
if parsed != render_value(spec.expected_json, runtime_values):
return False, "stdout JSON did not match expected JSON"
if spec.expected_json_file:
try:
expected_path = resolve_workspace_path(
workspace,
render_template(spec.expected_json_file, runtime_values),
field=f"expected_json_file for {spec.name}",
)
except ValueError as exc:
return False, str(exc)
try:
parsed = json.loads(stdout)
except json.JSONDecodeError as exc:
return False, f"stdout was not valid JSON: {exc}"
expected_json = json.loads(expected_path.read_text(encoding="utf-8"))
if parsed != expected_json:
return False, f"stdout JSON did not match {spec.expected_json_file}"
return True, "OK"
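
Review note: `evaluate_execution_result` compares text output after trimming whitespace, while JSON output is parsed and compared structurally. This sketch isolates those two comparisons; `compare_stdout` is a hypothetical helper and leaves out templating and the file-based variants.

```python
import json
from typing import Any, Optional, Tuple


def compare_stdout(
    stdout: str,
    *,
    expected_text: Optional[str] = None,
    expected_json: Any = None,
) -> Tuple[bool, str]:
    if expected_text is not None and stdout.strip() != expected_text.strip():
        return False, "stdout did not match expected text"
    if expected_json is not None:
        try:
            parsed = json.loads(stdout)
        except json.JSONDecodeError as exc:
            return False, f"stdout was not valid JSON: {exc}"
        # Structural comparison: key order and whitespace do not matter.
        if parsed != expected_json:
            return False, "stdout JSON did not match expected JSON"
    return True, "OK"
```

With this shape, `'{"b": 2, "a": 1}\n'` matches an expected value of `{"a": 1, "b": 2}`, whereas the same two strings would fail a text comparison.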
# ---------------------------------------------------------------------------
# Memory fallback: read well-known files from the workspace directly.
# ---------------------------------------------------------------------------
MEMORY_FILE_CANDIDATES: tuple[str, ...] = (
"MEMORY.md",
"memory.md",
"memory/MEMORY.md",
"memory/memory.md",
"memory/notes.md",
"memory/NOTES.md",
"notes.md",
)
def read_workspace_memory_text(workspace: Path) -> str:
"""Read concatenated memory-file contents straight from the workspace.
This is the adapter-free equivalent of
`environment._read_agent_memory_text`, which reads the same files via
`GatewayClient.get_agent_file`. Use this from any adapter whose agent
runs directly in the ClawBench workspace (Hermes, Claude Code, Codex).
"""
contents: list[str] = []
for name in MEMORY_FILE_CANDIDATES:
path = workspace / name
try:
if path.is_file():
text = path.read_text(encoding="utf-8", errors="replace")
if text.strip():
contents.append(text)
except Exception:
continue
return "\n".join(contents)
def memory_visible_in_transcript(spec: MemoryState, transcript: Transcript) -> bool:
"""Return True if the transcript shows a memory *write* matching `spec`.
Same heuristic as `environment._memory_visible_in_transcript`, kept
agent-agnostic: it reads `ToolCall.family`, `call.name`, `call.input`,
`call.output`, `call.error`, all of which are canonical.
"""
needle = spec.key_pattern.lower()
for call in transcript.tool_call_sequence:
family = (call.family or "").lower()
name = call.name.lower()
path = str(call.input.get("path", "")).lower()
if family != "memory" and "memory" not in path:
continue
if (
family == "memory"
and "search" in name
and "write" not in name
and "store" not in name
and "save" not in name
):
continue
serialized_bits = [call.output, call.error]
try:
serialized_bits.append(json.dumps(call.input, sort_keys=True))
except TypeError:
serialized_bits.append(str(call.input))
haystack = " ".join(bit for bit in serialized_bits if bit).lower()
if needle not in haystack:
continue
if all(token.lower() in haystack for token in spec.value_contains):
return True
return False
def verify_memory_fallback(
spec: MemoryState,
workspace: Path,
*,
transcript: Transcript | None = None,
extra_memory_text: str = "",
) -> tuple[bool, str]:
"""Resolve a `MemoryState` assertion using workspace files + transcript.
Used by any adapter that doesn't expose an OpenClaw-style
`memory.search` RPC. The lookup strategy is deliberately permissive
(matches the existing fallback path in `environment._verify_memory`):
1. Concatenate every known memory file in the workspace.
2. Optionally add any adapter-supplied text (e.g. OpenClaw's
`_read_agent_memory_text`) via `extra_memory_text`.
3. If the key_pattern appears (case-insensitive), check every
`value_contains` token.
4. If that fails, fall back to scanning the transcript for a memory
write that matches.
"""
memory_text = (read_workspace_memory_text(workspace) + "\n" + extra_memory_text).lower()
needle = spec.key_pattern.lower()
found = needle in memory_text
if not spec.exists:
return (not found, "Correctly absent" if not found else "Memory entry exists")
if found:
for token in spec.value_contains:
if token.lower() not in memory_text:
return False, f"Memory value missing '{token}'"
return True, "OK"
if transcript is not None and memory_visible_in_transcript(spec, transcript):
return True, "Verified from transcript fallback"
return (
False,
"No matching memory content found in persisted memory files or transcript fallback",
)
# ---------------------------------------------------------------------------
# JSON-path resolver (pure function over dict/list payloads)
# ---------------------------------------------------------------------------
def resolve_json_path(payload: Any, path: str) -> Any:
"""Resolve a dotted `$.foo.bar[0].baz` path into `payload`.
Returns None if any part of the path is missing or the type is
wrong. Handles index syntax via `foo[3]`.
"""
if path == "$":
return payload
current = payload
for part in path.lstrip("$").lstrip(".").split("."):
if not part:
continue
match = re.fullmatch(r"([^\[]+)\[(\d+)\]", part)
if match:
key, index = match.groups()
if not isinstance(current, dict) or key not in current:
return None
current = current[key]
if not isinstance(current, list):
return None
idx = int(index)
if idx >= len(current):
return None
current = current[idx]
continue
if isinstance(current, dict) and part in current:
current = current[part]
continue
return None
return current
__all__ = [
"MEMORY_FILE_CANDIDATES",
"evaluate_execution_result",
"memory_visible_in_transcript",
"read_workspace_memory_text",
"resolve_json_path",
"run_execution_check",
"verify_file_state",
"verify_memory_fallback",
]
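
For reference, the JSON-path resolver above can be exercised on a small payload. The function body below is copied from the diff with its indentation restored so the example runs on its own; the sample payload and its keys are made up for illustration.

```python
import re
from typing import Any


def resolve_json_path(payload: Any, path: str) -> Any:
    """Resolve a dotted `$.foo.bar[0].baz` path into `payload` (None if missing)."""
    if path == "$":
        return payload
    current = payload
    for part in path.lstrip("$").lstrip(".").split("."):
        if not part:
            continue
        # `foo[3]` means: look up key `foo`, then index into the list.
        match = re.fullmatch(r"([^\[]+)\[(\d+)\]", part)
        if match:
            key, index = match.groups()
            if not isinstance(current, dict) or key not in current:
                return None
            current = current[key]
            if not isinstance(current, list):
                return None
            idx = int(index)
            if idx >= len(current):
                return None
            current = current[idx]
            continue
        if isinstance(current, dict) and part in current:
            current = current[part]
            continue
        return None
    return current


# Hypothetical payload, not taken from any real task.
payload = {"jobs": [{"id": "a", "status": {"code": 0}}, {"id": "b"}]}

first_code = resolve_json_path(payload, "$.jobs[0].status.code")  # nested hit
missing_index = resolve_json_path(payload, "$.jobs[5].id")        # index out of range
wrong_type = resolve_json_path(payload, "$.jobs.id")              # list used as a dict
```

Because a missing path returns `None`, callers cannot distinguish an absent key from a key whose stored value is JSON `null`.
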


@ -19,6 +19,7 @@ from rich.console import Console
from rich.table import Table
from clawbench import __version__
from clawbench.ablation import build_ablation_profile
from clawbench.client import GatewayClient, GatewayConfig
from clawbench.releases import compute_task_snapshot_fingerprint, load_active_release
from clawbench.schemas import (
@ -86,6 +87,9 @@ class BenchmarkHarness:
browser_concurrency: int = 1,
adapter: str = "openclaw",
judge_affects_score: bool = False,
tool_profile_name: str | None = None,
enabled_toolsets: list[str] | None = None,
disabled_toolsets: list[str] | None = None,
) -> None:
self.gateway_config = gateway_config
self.model = model
@ -111,6 +115,9 @@ class BenchmarkHarness:
self.concurrency = max(1, int(concurrency))
self.browser_concurrency = max(1, int(browser_concurrency))
self.adapter = adapter
self.tool_profile_name = tool_profile_name
self.enabled_toolsets = enabled_toolsets or []
self.disabled_toolsets = disabled_toolsets or []
self.repo_root = Path(__file__).parent.parent
self.last_task_runs: dict[str, list[TaskRunResult]] = {}
@ -548,6 +555,9 @@ class BenchmarkHarness:
"prompt_variant": self.prompt_variant,
"judge_model": self.judge_model,
"judge_affects_score": self.judge_affects_score,
"tool_profile_name": self.tool_profile_name,
"enabled_toolsets": self.enabled_toolsets,
"disabled_toolsets": self.disabled_toolsets,
"benchmark_version": __version__,
"task_fingerprint": _task_definition_fingerprint(task),
}
@ -753,6 +763,15 @@ class BenchmarkHarness:
for _ in range(count)
)
active_release = load_active_release()
ablation_profile = build_ablation_profile(
model=self.model,
adapter=self.adapter,
prompt_profile=self.prompt_variant,
harness_version=__version__,
tool_profile_name=self.tool_profile_name,
enabled_toolsets=self.enabled_toolsets,
disabled_toolsets=self.disabled_toolsets,
)
result = BenchmarkResult(
submission_id=str(uuid.uuid4()),
model=self.model,
@ -770,6 +789,7 @@ class BenchmarkHarness:
"judge_model": self.judge_model,
"judge_affects_score": self.judge_affects_score,
"adapter": self.adapter,
"ablation_profile": ablation_profile.model_dump(),
"known_adapters": list(KNOWN_ADAPTERS),
"executable_adapters": sorted(EXECUTABLE_ADAPTERS),
"subsets": self.subsets,


@ -28,7 +28,14 @@ logger = logging.getLogger(__name__)
HF_TOKEN = os.environ.get("HF_TOKEN", "")
# Local fallback when HF is unavailable
LOCAL_QUEUE_DIR = Path("/data/queue") if Path("/data").exists() else Path("data/queue")
def _resolve_local_queue_dir() -> Path:
override = os.environ.get("CLAWBENCH_LOCAL_QUEUE_DIR", "").strip()
if override:
return Path(override).expanduser()
return Path("/data/queue") if Path("/data").exists() else Path("data/queue")
LOCAL_QUEUE_DIR = _resolve_local_queue_dir()
class JobStatus(str, Enum):
@ -50,6 +57,7 @@ class SubmissionRequest(BaseModel):
runs_per_task: int = Field(default=3, ge=1, le=10)
max_parallel_lanes: int = Field(default=1, ge=1, le=8)
tier: str | None = None # Filter to a specific tier
task_ids: list[str] = Field(default_factory=list)
scenario: str | None = None
prompt_variant: str = "clear"
submitter: str = "" # HF username
@ -65,6 +73,7 @@ class SubmissionRequest(BaseModel):
"runs_per_task": self.runs_per_task,
"max_parallel_lanes": self.max_parallel_lanes,
"tier": self.tier or "",
"task_ids": sorted({task_id.strip() for task_id in self.task_ids if task_id.strip()}),
"scenario": self.scenario or "",
"prompt_variant": self.prompt_variant,
}
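
The `task_ids` entry in the dict above strips whitespace, drops empty entries, de-duplicates, and sorts, so two requests that name the same tasks in a different order produce the same value. A standalone sketch of that expression (the helper name `normalize_task_ids` is hypothetical; the task IDs are the ones used as examples in `docs/kubernetes.md`):

```python
def normalize_task_ids(task_ids: list[str]) -> list[str]:
    """Strip, drop blanks, de-duplicate, and sort task IDs."""
    return sorted({task_id.strip() for task_id in task_ids if task_id.strip()})


messy = normalize_task_ids(
    [" t2-config-loader", "t1-bugfix-discount", "", "t2-config-loader "]
)
clean = normalize_task_ids(["t1-bugfix-discount", "t2-config-loader"])
```
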


@ -390,6 +390,12 @@ class TaskDefinition(BaseModel):
privacy_tier: str = ""
contamination_risk: str = ""
freshness_epoch: str = ""
category: str = ""
domain: str = ""
functionality: list[str] = Field(default_factory=list)
trace_distribution: list[str] = Field(default_factory=list)
tool_surface: list[str] = Field(default_factory=list)
risk_tags: list[str] = Field(default_factory=list)
first_used_at: str = ""
retire_after_runs: int = 0
similarity_hash: str = ""


@ -34,6 +34,13 @@ STALE_EVALUATION_SECONDS = max(
JOB_HEARTBEAT_INTERVAL_SECONDS * 4,
int(os.environ.get("CLAWBENCH_STALE_EVALUATION_SECONDS", "1800")),
)
OPENCLAW_EVAL_EXEC_HOSTS = {"auto", "gateway", "sandbox", "node"}
OPENCLAW_EVAL_SYSTEM_PROMPT = (
"You are running an OpenClaw benchmark task. Complete the user's request in the current "
"workspace using the available tools when needed. For file, code, browser, shell, or memory "
"tasks, make the requested changes directly and verify them when practical. Do not ask "
"follow-up questions during the benchmark. Keep any final reply brief."
)
@dataclass
@ -46,6 +53,12 @@ class ParallelLane:
state_dir: Path | None = None
log_path: Path | None = None
@property
def home_dir(self) -> Path | None:
if self.state_dir is None:
return None
return self.state_dir.parent / "home"
@property
def ws_url(self) -> str:
return f"ws://localhost:{self.port}"
@ -225,6 +238,7 @@ class EvalWorker:
job.job_id,
progress.mark_status("Uploading results", clear_active=True),
)
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
result_path = RESULTS_DIR / f"{result.submission_id}.json"
result_path.write_text(json.dumps(result.model_dump(), indent=2), encoding="utf-8")
@ -301,6 +315,7 @@ class EvalWorker:
prompt_variant=job.request.prompt_variant,
prepare_run=prepare_run,
progress_callback=progress_callback,
tool_profile_name=os.environ.get("CLAWBENCH_TOOL_PROFILE_NAME", "") or None,
)
return await harness.run()
@ -371,6 +386,7 @@ class EvalWorker:
tier=job.request.tier,
scenario=job.request.scenario,
prompt_variant=job.request.prompt_variant,
tool_profile_name=os.environ.get("CLAWBENCH_TOOL_PROFILE_NAME", "") or None,
)
return summary_harness.compose_result_from_task_stats(
ordered_stats,
@ -384,7 +400,8 @@ class EvalWorker:
)
finally:
self._stop_parallel_gateways()
shutil.rmtree(job_root, ignore_errors=True)
if os.environ.get("CLAWBENCH_KEEP_PARALLEL_LANE_ROOT", "").strip() != "1":
shutil.rmtree(job_root, ignore_errors=True)
async def _run_parallel_lane(self, job, lane: ParallelLane, progress: JobProgressTracker):
gateway_cmd = self._find_gateway_cmd()
@ -433,6 +450,7 @@ class EvalWorker:
progress_callback=progress_callback,
print_report=False,
quiet=True,
tool_profile_name=os.environ.get("CLAWBENCH_TOOL_PROFILE_NAME", "") or None,
)
result = await harness.run()
await self._sync_job_progress(job.job_id, progress.clear_lane(lane.index))
@ -447,6 +465,9 @@ class EvalWorker:
return load_all_tasks(
tier=job.request.tier,
scenario=job.request.scenario,
task_ids=list(getattr(job.request, "task_ids", []))
if getattr(job.request, "task_ids", None)
else None,
prompt_variant=job.request.prompt_variant,
)
@ -506,10 +527,36 @@ class EvalWorker:
def _materialize_lane_runtime(self, lane: ParallelLane, job_root: Path) -> None:
lane_root = job_root / f"lane-{lane.index}"
lane.state_dir = lane_root / "state"
lane_home = lane.home_dir
if lane_home is not None:
(lane_home / ".config").mkdir(parents=True, exist_ok=True)
lane.log_path = lane_root / "gateway.log"
lane.port = GATEWAY_PORT + (lane.index * GATEWAY_PORT_SPACING)
self._seed_lane_state_dir(lane.state_dir)
def _run_lane_prepare_hook(self, lane: ParallelLane) -> None:
hook = os.environ.get("CLAWBENCH_LANE_PREPARE_CMD", "").strip()
if not hook:
return
if lane.state_dir is None:
raise RuntimeError(f"Lane {lane.index + 1} state dir missing before prepare hook")
lane_home = lane.home_dir
if lane_home is None:
raise RuntimeError(f"Lane {lane.index + 1} home dir missing before prepare hook")
(lane_home / ".config").mkdir(parents=True, exist_ok=True)
hook_env = {
**os.environ,
"HOME": str(lane_home),
"OPENCLAW_HOME": str(lane_home),
"OPENCLAW_STATE_DIR": str(lane.state_dir),
"OPENCLAW_CONFIG_PATH": str(lane.state_dir / "openclaw.json"),
"XDG_CONFIG_HOME": str(lane_home / ".config"),
"CLAWBENCH_LANE_INDEX": str(lane.index),
"CLAWBENCH_LANE_PORT": str(lane.port),
}
logger.info("Running lane %d prepare hook", lane.index + 1)
subprocess.run([hook], env=hook_env, check=True)
def _seed_lane_state_dir(self, target_state_dir: Path) -> None:
source_state_dir = Path(os.environ.get("OPENCLAW_STATE_DIR", os.path.expanduser("~/.openclaw")))
shutil.rmtree(target_state_dir, ignore_errors=True)
@ -628,13 +675,19 @@ class EvalWorker:
_set_nested(data, "browser.headless", True)
_set_nested(data, "browser.noSandbox", True)
_set_nested(data, "agents.defaults.skipBootstrap", True)
_set_nested(data, "tools.exec.host", self._openclaw_eval_exec_host())
_set_nested(data, "tools.exec.security", "full")
_set_nested(data, "tools.exec.ask", "off")
_set_nested(data, "approvals.exec.enabled", False)
if self._active_model:
_set_nested(data, "agents.defaults.model.primary", self._active_model)
_set_nested(data, "agents.defaults.subagents.model.primary", self._active_model)
self._apply_eval_model_defaults(data, self._active_model)
tmp_path = cfg_path.with_suffix(".json.tmp")
tmp_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
tmp_path.replace(cfg_path)
self._write_eval_exec_approvals(lane_state_dir)
def _order_task_stats(self, tasks: list[TaskDefinition], combined_stats: list) -> list:
stats_by_id = {}
@ -729,6 +782,7 @@ class EvalWorker:
"token",
"--token",
gateway_token,
"--compact",
],
stdout=log_handle,
stderr=subprocess.STDOUT,
@ -767,6 +821,12 @@ class EvalWorker:
f"Gateway /health did not respond within {health_deadline_sec}s. Log:\n{self._read_gateway_log()}"
)
await self._wait_for_gateway_ready_marker(
process=self._gateway_process,
log_reader=lambda: self._read_gateway_log(limit=20_000),
description="Gateway",
)
# Phase B: control-plane probe with retries (see the parallel
# variant in _ensure_parallel_gateway for the detailed rationale).
gateway_config = GatewayConfig(url=GATEWAY_WS_URL, token=GATEWAY_TOKEN)
@ -816,21 +876,30 @@ class EvalWorker:
# Re-inject the host config's env + plugins before every restart.
if lane.state_dir is not None:
self._reinject_host_env_to_lane(lane.state_dir)
self._run_lane_prepare_hook(lane)
if lane.state_dir is None or lane.log_path is None:
raise RuntimeError(f"Lane {lane.index + 1} runtime was not materialized before gateway startup")
lane_home = lane.home_dir
if lane_home is None:
raise RuntimeError(f"Lane {lane.index + 1} home was not materialized before gateway startup")
(lane_home / ".config").mkdir(parents=True, exist_ok=True)
logger.info("Starting lane %d gateway on port %d", lane.index + 1, lane.port)
gateway_token = os.environ.get("OPENCLAW_GATEWAY_TOKEN", "clawbench-internal-token")
gateway_env = {
**os.environ,
"OPENCLAW_HOME": os.environ.get("OPENCLAW_HOME", os.path.expanduser("~")),
"HOME": str(lane_home),
"OPENCLAW_HOME": str(lane_home),
"OPENCLAW_STATE_DIR": str(lane.state_dir),
"OPENCLAW_CONFIG_PATH": str(lane.state_dir / "openclaw.json"),
"XDG_CONFIG_HOME": str(lane_home / ".config"),
"OPENCLAW_SKIP_GMAIL_WATCHER": "1",
"OPENCLAW_SKIP_CANVAS_HOST": "1",
"OPENCLAW_NO_RESPAWN": "1",
}
self._configure_browser_runtime(gateway_cmd, gateway_env)
lane.log_path.parent.mkdir(parents=True, exist_ok=True)
lane.log_path.write_text("", encoding="utf-8")
log_handle = lane.log_path.open("a", encoding="utf-8")
try:
process = subprocess.Popen(
@ -848,6 +917,7 @@ class EvalWorker:
"token",
"--token",
gateway_token,
"--compact",
],
stdout=log_handle,
stderr=subprocess.STDOUT,
@ -890,6 +960,12 @@ class EvalWorker:
f"Log:\n{self._read_parallel_gateway_log(lane)}"
)
await self._wait_for_gateway_ready_marker(
process=process,
log_reader=lambda: self._read_parallel_gateway_log(lane, limit=20_000),
description=f"Lane {lane.index + 1} gateway",
)
# Phase B: control-plane probe with explicit retries. A healthy
# /health response does not guarantee sessions.create works
# immediately — plugin registration races can leave the gateway
@ -1001,6 +1077,10 @@ class EvalWorker:
("agents.defaults.skipBootstrap", True),
("browser.headless", True),
("browser.noSandbox", True),
("tools.exec.host", self._openclaw_eval_exec_host()),
("tools.exec.security", "full"),
("tools.exec.ask", "off"),
("approvals.exec.enabled", False),
]
if self._active_model:
config_pairs.extend(
@ -1010,14 +1090,61 @@ class EvalWorker:
]
)
try:
self._patch_openclaw_config(config_pairs)
state_dir = Path(
gateway_env.get("OPENCLAW_STATE_DIR")
or os.environ.get("OPENCLAW_STATE_DIR")
or os.path.expanduser("~/.openclaw")
)
config_path = Path(gateway_env.get("OPENCLAW_CONFIG_PATH") or (state_dir / "openclaw.json"))
self._patch_openclaw_config(config_pairs, config_path=config_path)
self._write_eval_exec_approvals(state_dir)
except Exception as exc:
logger.warning("Direct openclaw.json patch failed: %s", exc)
@staticmethod
def _patch_openclaw_config(pairs: list[tuple[str, object]]) -> None:
state_dir = Path(os.environ.get("OPENCLAW_STATE_DIR") or os.path.expanduser("~/.openclaw"))
config_path = state_dir / "openclaw.json"
def _openclaw_eval_exec_host() -> str:
value = os.environ.get("OPENCLAW_EXEC_HOST", "gateway").strip().lower()
if value in OPENCLAW_EVAL_EXEC_HOSTS:
return value
logger.warning("Invalid OPENCLAW_EXEC_HOST=%r; using gateway", value)
return "gateway"
@staticmethod
def _write_eval_exec_approvals(state_dir: Path) -> None:
state_dir.mkdir(parents=True, exist_ok=True)
approvals_path = state_dir / "exec-approvals.json"
approvals = {
"version": 1,
"socket": {
"path": str(approvals_path.with_suffix(".sock")),
"token": "clawbench-eval-token",
},
"defaults": {
"security": "full",
"ask": "off",
"askFallback": "full",
},
"agents": {
"*": {
"security": "full",
"ask": "off",
"askFallback": "full",
}
},
}
tmp_path = approvals_path.with_suffix(".json.tmp")
tmp_path.write_text(json.dumps(approvals, indent=2), encoding="utf-8")
tmp_path.replace(approvals_path)
def _patch_openclaw_config(
self,
pairs: list[tuple[str, object]],
*,
config_path: Path | None = None,
) -> None:
if config_path is None:
state_dir = Path(os.environ.get("OPENCLAW_STATE_DIR") or os.path.expanduser("~/.openclaw"))
config_path = state_dir / "openclaw.json"
if not config_path.exists():
logger.warning("openclaw.json not found at %s; skipping direct patch", config_path)
return
@ -1033,12 +1160,50 @@ class EvalWorker:
if cursor.get(parts[-1]) != value:
cursor[parts[-1]] = value
changed = True
if self._active_model:
changed = self._apply_eval_model_defaults(data, self._active_model) or changed
if not changed:
return
tmp_path = config_path.with_suffix(".json.tmp")
tmp_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
tmp_path.replace(config_path)
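
The patch-then-replace pattern used by `_patch_openclaw_config` can be tried in isolation. This is a minimal sketch, not the worker's code: `set_nested` and `patch_json_file` are hypothetical stand-ins (the diff calls a `_set_nested` helper whose body is not shown), and the config lives in a temporary directory.

```python
import json
import tempfile
from pathlib import Path


def set_nested(data: dict, dotted_key: str, value: object) -> bool:
    """Set data[a][b][c] = value for "a.b.c"; return True if anything changed."""
    parts = dotted_key.split(".")
    cursor = data
    for part in parts[:-1]:
        child = cursor.get(part)
        if not isinstance(child, dict):
            # Create (or overwrite a non-dict) intermediate node.
            child = {}
            cursor[part] = child
        cursor = child
    if parts[-1] in cursor and cursor[parts[-1]] == value:
        return False
    cursor[parts[-1]] = value
    return True


def patch_json_file(config_path: Path, pairs: list[tuple[str, object]]) -> bool:
    """Apply dotted-key pairs to a JSON file; rewrite it only when needed."""
    data = json.loads(config_path.read_text(encoding="utf-8"))
    changed = False
    for key, value in pairs:
        changed = set_nested(data, key, value) or changed
    if not changed:
        return False
    # Write a sibling temp file, then rename it over the original so a reader
    # never observes a half-written config.
    tmp_path = config_path.with_suffix(".json.tmp")
    tmp_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
    tmp_path.replace(config_path)
    return True


with tempfile.TemporaryDirectory() as tmp:
    cfg = Path(tmp) / "openclaw.json"
    cfg.write_text(json.dumps({"tools": {"profile": "coding"}}), encoding="utf-8")
    pairs = [("tools.exec.ask", "off"), ("approvals.exec.enabled", False)]
    first = patch_json_file(cfg, pairs)
    second = patch_json_file(cfg, pairs)  # second pass finds nothing to change
    patched = json.loads(cfg.read_text(encoding="utf-8"))
```

Returning a "changed" flag is what lets the caller skip the rewrite entirely when the config already holds the eval settings.
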
@staticmethod
def _apply_eval_model_defaults(data: dict, model: str) -> bool:
"""Force eval model parameters that keep benchmark turns low-latency."""
agents = data.setdefault("agents", {})
if not isinstance(agents, dict):
data["agents"] = agents = {}
defaults = agents.setdefault("defaults", {})
if not isinstance(defaults, dict):
agents["defaults"] = defaults = {}
models = defaults.setdefault("models", {})
if not isinstance(models, dict):
defaults["models"] = models = {}
entry = models.setdefault(model, {})
if not isinstance(entry, dict):
entry = {}
models[model] = entry
params = entry.setdefault("params", {})
if not isinstance(params, dict):
params = {}
entry["params"] = params
changed = False
if defaults.get("systemPromptOverride") != OPENCLAW_EVAL_SYSTEM_PROMPT:
defaults["systemPromptOverride"] = OPENCLAW_EVAL_SYSTEM_PROMPT
changed = True
if params.get("fastMode") is not True:
params["fastMode"] = True
changed = True
if model.startswith("openai/"):
if params.get("transport") != "sse":
params["transport"] = "sse"
changed = True
if params.get("openaiWsWarmup") is not False:
params["openaiWsWarmup"] = False
changed = True
return changed
def _find_gateway_cmd(self) -> list[str] | None:
import shutil
@ -1058,13 +1223,15 @@ class EvalWorker:
# Use a generous dedicated config for the probe. A healthy gateway
# usually responds to sessions.create in under a second, but plugin
# initialization (especially OpenRouter model list fetch) can add
# 10-30s after /health reports 200. The 60s outer bound ensures we
# don't give up during a cold-start scenario.
# 10-30s after /health reports 200. On cold Docker lanes OpenClaw may
# also install provider runtime SDKs during the first sessions.create,
# so keep this bound configurable and separate from steady-state RPCs.
probe_timeout = float(os.environ.get("CLAWBENCH_GATEWAY_PROBE_TIMEOUT_SECONDS", "180"))
probe_config = GatewayConfig(
url=gateway_config.url,
token=gateway_config.token,
connect_timeout=gateway_config.connect_timeout,
request_timeout=30.0,
request_timeout=probe_timeout,
)
async def _probe() -> None:
@ -1075,25 +1242,67 @@ class EvalWorker:
await client.delete_session(session_key)
try:
await asyncio.wait_for(_probe(), timeout=60.0)
await asyncio.wait_for(_probe(), timeout=probe_timeout + 10.0)
except asyncio.TimeoutError as exc:
raise RuntimeError(
"Gateway control-plane probe timed out after 60s "
f"Gateway control-plane probe timed out after {probe_timeout:.0f}s "
"(sessions.create hung on a freshly-started gateway); "
"lane will be retried by the queue."
) from exc
def _read_gateway_log(self) -> str:
async def _wait_for_gateway_ready_marker(self, process: subprocess.Popen, log_reader, description: str) -> None:
# OpenClaw 2026.4.26 can answer /health before channels and sidecars
# finish startup. Probing sessions.create during that window can hold the
# session write lock for minutes. Some lane gateway modes do not emit
# the final ready marker, so wait for it briefly after sidecar startup
# and then let the bounded control-plane probe decide.
ready_deadline_sec = int(os.environ.get("CLAWBENCH_GATEWAY_READY_TIMEOUT_SECONDS", "420"))
marker_grace_sec = int(os.environ.get("CLAWBENCH_GATEWAY_READY_MARKER_GRACE_SECONDS", "90"))
saw_sidecar_start = False
sidecar_start_elapsed: int | None = None
for elapsed in range(ready_deadline_sec):
if process.poll() is not None:
raise RuntimeError(
f"{description} exited with code {process.returncode}. Log:\n{log_reader()[-4_000:]}"
)
log_text = log_reader()
if "[gateway] ready" in log_text:
logger.info("%s ready after %ss", description, elapsed)
return
if "[gateway] starting channels and sidecars" in log_text:
saw_sidecar_start = True
if sidecar_start_elapsed is None:
sidecar_start_elapsed = elapsed
if sidecar_start_elapsed is not None and elapsed - sidecar_start_elapsed >= marker_grace_sec:
logger.info(
"%s did not emit ready marker %ss after sidecar startup; probing control plane",
description,
marker_grace_sec,
)
return
if not saw_sidecar_start and elapsed >= 15:
return
await asyncio.sleep(1)
logger.warning(
"%s did not log ready within %ss; probing control plane anyway. Log:\n%s",
description,
ready_deadline_sec,
log_reader()[-4_000:],
)
def _read_gateway_log(self, limit: int = 4_000) -> str:
try:
return Path("/tmp/gateway.log").read_text(encoding="utf-8", errors="replace")[-4_000:]
return Path("/tmp/gateway.log").read_text(encoding="utf-8", errors="replace")[-limit:]
except Exception:
return "(no gateway log)"
def _read_parallel_gateway_log(self, lane: ParallelLane) -> str:
def _read_parallel_gateway_log(self, lane: ParallelLane, limit: int = 4_000) -> str:
if lane.log_path is None:
return "(no gateway log)"
try:
return lane.log_path.read_text(encoding="utf-8", errors="replace")[-4_000:]
return lane.log_path.read_text(encoding="utf-8", errors="replace")[-limit:]
except Exception:
return "(no gateway log)"
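
The early-exit rules in `_wait_for_gateway_ready_marker` are easier to follow without the sleeping and process polling. The sketch below replays a list of log snapshots, one per simulated second, and reports which rule fired. `ready_decision` and its return labels are hypothetical; only the two marker strings and the default thresholds (90s grace, 15s cutoff) come from the diff.

```python
READY = "[gateway] ready"
SIDECARS = "[gateway] starting channels and sidecars"


def ready_decision(
    snapshots: list[str],
    marker_grace_sec: int = 90,
    no_sidecar_cutoff: int = 15,
) -> tuple[str, int]:
    """Return (reason, elapsed_seconds) for the first rule that ends the wait."""
    sidecar_start = None
    for elapsed, log_text in enumerate(snapshots):
        if READY in log_text:
            return "ready", elapsed
        if SIDECARS in log_text and sidecar_start is None:
            sidecar_start = elapsed
        # Sidecars started but the ready marker never came: stop waiting and
        # let the bounded control-plane probe decide.
        if sidecar_start is not None and elapsed - sidecar_start >= marker_grace_sec:
            return "grace-expired", elapsed
        # No sidecar line at all: this gateway mode likely emits no markers.
        if sidecar_start is None and elapsed >= no_sidecar_cutoff:
            return "no-sidecar-line", elapsed
    return "deadline", len(snapshots)


boot = "boot"
normal = ready_decision([boot] * 3 + [SIDECARS] * 2 + [READY])
quiet = ready_decision([boot] * 30)
stuck = ready_decision([boot] * 2 + [SIDECARS] * 20, marker_grace_sec=5)
```

The real method additionally raises if the gateway process exits during the wait, which this sketch omits.
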

docs/kubernetes.md Normal file

@ -0,0 +1,367 @@
# Running ClawBench on Kubernetes
ClawBench runs as a **sidecar** in the OpenClaw gateway pod. The sidecar
connects to the gateway over loopback (`ws://localhost:18789`), runs the
19-task eval suite, and optionally logs results to MLflow.
```
┌─── OpenClaw Pod ─────────────────────────────┐
│ gateway container (ws://localhost:18789) │
│ clawbench sidecar ──► gateway via loopback │
└──────────────────────────────────────────────┘
        │                       │
        ▼                       ▼
Model provider API      MLflow (optional)
```
All commands use `scripts/k8s/deploy.sh`. The script has these modes:
| Flag | What it does |
|------|-------------|
| *(none)* | Full deploy: OpenClaw + MLflow + eval sidecar |
| `--openclaw-only` | Deploy OpenClaw gateway only |
| `--mlflow-only` | Deploy MLflow only |
| `--add-sidecar` | Inject clawbench sidecar (starts eval) |
| `--remove-sidecar` | Remove clawbench sidecar |
| `--logs` | Tail sidecar logs |
| `--teardown` | Delete eval namespace (keeps MLflow) |
---
## Prerequisites
- `kubectl` on PATH, connected to a cluster (`kubectl cluster-info` succeeds)
- A container image for ClawBench (see [Building images](#building-images))
- At least one model provider API key (`OPENAI_API_KEY`, `ANTHROPIC_API_KEY`, etc.)
For local testing with Kind, see the OpenClaw guide:
https://github.com/openclaw/openclaw/blob/main/docs/install/kubernetes.md#local-testing-with-kind
---
## Environment variables
Set these **before** running `deploy.sh`.
### Required
| Variable | Purpose |
|----------|---------|
| `CLAWBENCH_NAMESPACE` | Namespace for OpenClaw + eval (e.g. `clawbench-eval`) |
| `OPENAI_API_KEY` | Model provider key (or use another provider — see table below) |
### Optional
| Variable | Default | Purpose |
|----------|---------|---------|
| `CLAWBENCH_IMAGE` | `quay.io/sallyom/clawbench:latest` | ClawBench sidecar image |
| `OPENCLAW_IMAGE` | `ghcr.io/openclaw/openclaw:latest` | OpenClaw gateway image |
| `OPENCLAW_GATEWAY_TOKEN` | *(generated by script)* | Gateway token; set this when attaching the sidecar to an existing gateway |
| `CLAWBENCH_MODEL` | `openai/gpt-5.5` | Model to evaluate |
| `MLFLOW_NAMESPACE` | `mlflow` | MLflow namespace |
| `MLFLOW_TRACKING_URI` | *(deployed by script)* | External MLflow URI — skips MLflow deploy if set |
| `MLFLOW_EXPERIMENT_ID` | | MLflow experiment ID |
| `MLFLOW_EXPERIMENT_NAME` | `clawbench` | MLflow experiment name |
| `MLFLOW_IMAGE` | `ghcr.io/mlflow/mlflow:v2.21.3` | MLflow server image |
| `ANTHROPIC_API_KEY` | | Added to K8s secret if set |
| `OPENROUTER_API_KEY` | | Added to K8s secret if set |
| `GEMINI_API_KEY` | | Added to K8s secret if set |
| `OPENAI_API_BASE` | | Base URL for OpenAI-compatible endpoints (e.g. vLLM, Ollama); patched into gateway config |
### Model routing
The gateway routes by provider prefix:
| Model string | Required variables |
|-------------|-------------------|
| `openai/gpt-5.5` | `OPENAI_API_KEY` |
| `anthropic/claude-sonnet-4-6` | `ANTHROPIC_API_KEY` |
| `openrouter/anthropic/claude-sonnet-4-6` | `OPENROUTER_API_KEY` |
| `openai/my-local-model` | `OPENAI_API_KEY` + `OPENAI_API_BASE` |
For OpenAI-compatible endpoints (vLLM, Ollama, TGI, or any in-cluster model
server), set `OPENAI_API_BASE` to the endpoint URL and use the `openai/`
prefix for the model name:
```bash
export CLAWBENCH_MODEL="openai/meta-llama/Llama-4-Scout-17B"
export OPENAI_API_KEY="none" # dummy value if the endpoint doesn't require auth
export OPENAI_API_BASE="http://vllm-service.my-ns.svc.cluster.local:8000/v1"
```
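
A pre-flight check for the routing table above might look like the following. This is a sketch under assumptions: `REQUIRED_ENV` and `missing_env_for_model` are hypothetical names, not part of `deploy.sh`, and only the three prefixes listed in the table are covered.

```python
# Hypothetical lookup built from the routing table; not read from deploy.sh.
REQUIRED_ENV = {
    "openai": ["OPENAI_API_KEY"],
    "anthropic": ["ANTHROPIC_API_KEY"],
    "openrouter": ["OPENROUTER_API_KEY"],
}


def missing_env_for_model(
    model: str, env: dict[str, str], local_endpoint: bool = False
) -> list[str]:
    """Return the variables that still need to be exported for `model`."""
    provider = model.split("/", 1)[0]  # the gateway routes on the first segment
    if provider not in REQUIRED_ENV:
        raise ValueError(f"no routing rule known for provider prefix {provider!r}")
    required = list(REQUIRED_ENV[provider])
    if provider == "openai" and local_endpoint:
        # OpenAI-compatible servers (vLLM, Ollama, TGI) also need a base URL.
        required.append("OPENAI_API_BASE")
    return [name for name in required if not env.get(name)]


nothing_set = missing_env_for_model("openrouter/anthropic/claude-sonnet-4-6", {})
local_model = missing_env_for_model(
    "openai/meta-llama/Llama-4-Scout-17B",
    {"OPENAI_API_KEY": "none"},
    local_endpoint=True,
)
```
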
---
## Full deploy (quick start)
Deploys OpenClaw gateway, MLflow, and the eval sidecar in one command.
```bash
export CLAWBENCH_NAMESPACE=clawbench-eval
# Export API keys before running. The script stores them in a K8s Secret
# ("clawbench-secrets") that the gateway and sidecar containers read.
export OPENAI_API_KEY="sk-..."
# Model to evaluate (default: openai/gpt-5.5)
# export CLAWBENCH_MODEL="anthropic/claude-sonnet-4-6"
./scripts/k8s/deploy.sh
```
Verify:
```bash
# Should show 2/2 containers (gateway + clawbench)
kubectl get pods -n clawbench-eval
# Follow eval progress
./scripts/k8s/deploy.sh --logs
```
When the eval finishes, copy results and clean up:
```bash
# Copy results from the sidecar
POD=$(kubectl get pod -n $CLAWBENCH_NAMESPACE -l app=openclaw -o jsonpath='{.items[0].metadata.name}')
kubectl cp "$CLAWBENCH_NAMESPACE/$POD:/results/benchmark.json" -c clawbench ./benchmark.json
# Remove the sidecar (keeps OpenClaw + MLflow running)
./scripts/k8s/deploy.sh --remove-sidecar
# Or tear down everything
./scripts/k8s/deploy.sh --teardown
```
---
## Existing cluster + existing MLflow
If you already have an OpenShift or Kubernetes cluster and an MLflow instance,
you only need to deploy OpenClaw and run the eval — no cluster or MLflow setup
required.
```bash
export CLAWBENCH_NAMESPACE=clawbench-eval
# API keys — export before running deploy.sh. The script creates a
# Kubernetes Secret ("clawbench-secrets") from whichever keys are set.
# At least one provider key is required.
export OPENAI_API_KEY="sk-..."
# export ANTHROPIC_API_KEY="sk-ant-..."
# export OPENROUTER_API_KEY="sk-or-..."
# export GEMINI_API_KEY="..."
# Model to evaluate (default: openai/gpt-5.5)
export CLAWBENCH_MODEL="anthropic/claude-sonnet-4-6"
# If attaching to an existing OpenClaw gateway, this must match that gateway.
# If deploy.sh creates OpenClaw, it generates this token for you.
# export OPENCLAW_GATEWAY_TOKEN="..."
# Point to your existing MLflow
export MLFLOW_TRACKING_URI="https://mlflow.example.com"
export MLFLOW_EXPERIMENT_NAME="clawbench-claude-sonnet-4-6" # or use MLFLOW_EXPERIMENT_ID=42
# Deploy OpenClaw gateway into your cluster
./scripts/k8s/deploy.sh --openclaw-only
```
Verify OpenClaw is running:
```bash
kubectl get pods -n clawbench-eval
# Expect: openclaw-xxxx 1/1 Running
```
Then start the eval:
```bash
./scripts/k8s/deploy.sh --add-sidecar
./scripts/k8s/deploy.sh --logs
```
Because `MLFLOW_TRACKING_URI` is set, the deploy script skips its own MLflow
deployment and patches the experiment name/ID into the clawbench ConfigMap.
When the eval completes, `scripts/log_to_mlflow.py` logs results to your MLflow
instance under that experiment.
`MLFLOW_EXPERIMENT_NAME` creates the experiment if it doesn't exist.
`MLFLOW_EXPERIMENT_ID` requires an existing experiment.
---
## Step-by-step deploy
Use this when you want to deploy components individually or bring your own
OpenClaw/MLflow.
### Step 1: Deploy OpenClaw gateway
```bash
export CLAWBENCH_NAMESPACE=clawbench-eval
export OPENAI_API_KEY="sk-..."
./scripts/k8s/deploy.sh --openclaw-only
```
Verify:
```bash
kubectl get pods -n clawbench-eval
# Expect: openclaw-xxxx 1/1 Running
```
This deploys from `scripts/k8s/openclaw/`: a single gateway pod with token
auth, ClusterIP service, and 10Gi PVC. The deploy script generates a gateway
token and creates the `clawbench-secrets` Secret automatically.
**Skip this step** if you already have an OpenClaw deployment. Your existing
gateway must have this config (see `scripts/k8s/openclaw/configmap.yaml`):
```json
{
"browser": {
"enabled": true,
"headless": true,
"noSandbox": true,
"ssrfPolicy": {
"allowedHostnames": ["localhost", "127.0.0.1"]
}
},
"tools": {
"profile": "coding",
"alsoAllow": ["browser"]
}
}
```
Key requirements:
- `browser.enabled: true` — activates the bundled browser plugin
- `tools.alsoAllow: ["browser"]` — the `coding` profile does NOT include browser by default
- `browser.ssrfPolicy` — several eval tasks need localhost access
- Gateway must bind to loopback with token auth; export the matching
`OPENCLAW_GATEWAY_TOKEN` before running `--add-sidecar`
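
If you bring your own gateway, the requirements above can be checked against its config before attaching the sidecar. The sketch below is an assumption-labeled helper, not part of ClawBench: `check_gateway_config` is a hypothetical name, and it inspects only the keys shown in the JSON snippet above.

```python
def check_gateway_config(config: dict) -> list[str]:
    """Return human-readable problems; an empty list means the config passes."""
    problems: list[str] = []
    browser = config.get("browser") or {}
    tools = config.get("tools") or {}

    if browser.get("enabled") is not True:
        problems.append("browser.enabled must be true")
    if "browser" not in (tools.get("alsoAllow") or []):
        problems.append('tools.alsoAllow must include "browser"')

    allowed = (browser.get("ssrfPolicy") or {}).get("allowedHostnames") or []
    for host in ("localhost", "127.0.0.1"):
        if host not in allowed:
            problems.append(f"browser.ssrfPolicy.allowedHostnames must include {host}")
    return problems


good = {
    "browser": {
        "enabled": True,
        "headless": True,
        "noSandbox": True,
        "ssrfPolicy": {"allowedHostnames": ["localhost", "127.0.0.1"]},
    },
    "tools": {"profile": "coding", "alsoAllow": ["browser"]},
}
incomplete = {"browser": {"enabled": True}, "tools": {"profile": "coding"}}

good_problems = check_gateway_config(good)
incomplete_problems = check_gateway_config(incomplete)
```

The check does not cover loopback binding or token auth, which are properties of how the gateway is started rather than of this JSON file.
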
### Step 2: Deploy MLflow
```bash
./scripts/k8s/deploy.sh --mlflow-only
```
Verify:
```bash
kubectl get pods -n mlflow
# Expect: mlflow-xxxx 1/1 Running
```
This deploys a single-replica MLflow server with a SQLite backend into the
`mlflow` namespace. The clawbench ConfigMap defaults to
`http://mlflow-service.mlflow.svc.cluster.local:5000`.
**Skip this step** if you have an external MLflow — set `MLFLOW_TRACKING_URI`:
```bash
export MLFLOW_TRACKING_URI=http://my-mlflow.example.com:5000
export MLFLOW_EXPERIMENT_ID=4 # or MLFLOW_EXPERIMENT_NAME
```
### Step 3: Run the eval
```bash
./scripts/k8s/deploy.sh --add-sidecar
```
This patches the OpenClaw deployment to inject a clawbench sidecar that:
1. Waits for the gateway (TCP check on port 18789, up to 3 min)
2. Checks MLflow connectivity if configured
3. Runs `clawbench run` with settings from the ConfigMap
4. Logs results to MLflow on success
5. Sleeps indefinitely so you can retrieve logs and results
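
Step 1 is a plain TCP reachability loop. A minimal Python sketch of that idea follows; it is not the sidecar's actual entrypoint, and `wait_for_tcp` with its parameters is a hypothetical helper. The 180-second default mirrors the "up to 3 min" figure above.

```python
import socket
import time


def wait_for_tcp(
    host: str, port: int, timeout_s: float = 180.0, interval_s: float = 1.0
) -> bool:
    """Return True once a TCP connection to host:port succeeds, else False."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            # A successful connect only proves the port is open; it says
            # nothing about whether the gateway can serve requests yet.
            with socket.create_connection((host, port), timeout=interval_s):
                return True
        except OSError:
            time.sleep(interval_s)
    return False
```

A caller would use it as `wait_for_tcp("localhost", 18789)` and abort the eval when it returns `False`.
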
Verify:
```bash
kubectl get pods -n $CLAWBENCH_NAMESPACE
# Expect: openclaw-xxxx 2/2 Running (gateway + clawbench)
./scripts/k8s/deploy.sh --logs
# Should show "Waiting for gateway..." then "Starting eval..."
```
When finished, remove the sidecar:
```bash
./scripts/k8s/deploy.sh --remove-sidecar
```
---
## ConfigMap tuning
The clawbench ConfigMap (`scripts/k8s/manifests/configmap.yaml`) controls eval
behavior. Override at deploy time via env vars, or patch after deploy:
| Key | Default | What it controls |
|-----|---------|-----------------|
| `CLAWBENCH_MODEL` | `openai/gpt-5.5` | Model under test |
| `CLAWBENCH_RUNS` | `3` | Runs per task (19 tasks x 3 = 57 total) |
| `CLAWBENCH_CONCURRENCY` | `4` | Parallel eval lanes |
| `CLAWBENCH_JUDGE_MODEL` | *(empty)* | Separate judge model (optional) |
| `CLAWBENCH_TASKS` | *(empty — runs all)* | Space-separated task IDs (e.g. `t1-bugfix-discount t2-config-loader`) |
| `CLAWBENCH_CONNECT_TIMEOUT` | `120` | Gateway connect timeout in seconds |
| `CLAWBENCH_REQUEST_TIMEOUT` | `300` | Per-request timeout in seconds |
| `CLAWBENCH_PER_RUN_BUDGET_SECONDS` | `600` | Max wall time per run |
| `MLFLOW_TRACKING_URI` | `http://mlflow-service.mlflow.svc.cluster.local:5000` | MLflow endpoint |
| `MLFLOW_EXPERIMENT_NAME` | `clawbench` | MLflow experiment name |
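
To make the defaults in the table concrete, here is a small sketch of resolving those keys from the environment and computing the total run count. It assumes the sidecar sees each key as an environment variable; the names `DEFAULTS`, `resolve_settings`, and `total_runs` are hypothetical and are not part of clawbench.

```python
import os
from typing import Mapping, Optional

# Defaults copied from the table above.
DEFAULTS = {
    "CLAWBENCH_MODEL": "openai/gpt-5.5",
    "CLAWBENCH_RUNS": "3",
    "CLAWBENCH_CONCURRENCY": "4",
    "CLAWBENCH_JUDGE_MODEL": "",
    "CLAWBENCH_TASKS": "",
    "CLAWBENCH_CONNECT_TIMEOUT": "120",
    "CLAWBENCH_REQUEST_TIMEOUT": "300",
    "CLAWBENCH_PER_RUN_BUDGET_SECONDS": "600",
    "MLFLOW_TRACKING_URI": "http://mlflow-service.mlflow.svc.cluster.local:5000",
    "MLFLOW_EXPERIMENT_NAME": "clawbench",
}


def resolve_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Merge environment overrides with the table defaults (hypothetical helper)."""
    source = os.environ if env is None else env
    # An unset or empty variable falls back to the default.
    raw = {key: (source.get(key) or default) for key, default in DEFAULTS.items()}
    return {
        "model": raw["CLAWBENCH_MODEL"],
        "runs": int(raw["CLAWBENCH_RUNS"]),
        "concurrency": int(raw["CLAWBENCH_CONCURRENCY"]),
        "judge_model": raw["CLAWBENCH_JUDGE_MODEL"] or None,
        "tasks": raw["CLAWBENCH_TASKS"].split(),  # space-separated task IDs
        "connect_timeout_s": int(raw["CLAWBENCH_CONNECT_TIMEOUT"]),
        "request_timeout_s": int(raw["CLAWBENCH_REQUEST_TIMEOUT"]),
        "per_run_budget_s": int(raw["CLAWBENCH_PER_RUN_BUDGET_SECONDS"]),
        "mlflow_tracking_uri": raw["MLFLOW_TRACKING_URI"],
        "mlflow_experiment_name": raw["MLFLOW_EXPERIMENT_NAME"],
    }


def total_runs(settings: dict, all_task_count: int = 19) -> int:
    """Runs per task times task count; an empty task list means all 19 tasks."""
    task_count = len(settings["tasks"]) or all_task_count
    return task_count * settings["runs"]


if __name__ == "__main__":
    settings = resolve_settings()
    print(settings["model"], total_runs(settings))
```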
---
## MLflow integration
Results are logged via `scripts/log_to_mlflow.py` after a successful eval.
**What gets logged:**
- **Params**: model, provider, benchmark version, OpenClaw version, judge model
- **Metrics**: overall score, per-axis scores (completion, trajectory, behavior,
reliability), cost, tokens, latency, CI bounds, per-tier and per-task scores
- **Tags**: submission ID, timestamp, certified flag
- **Artifacts**: full benchmark result JSON
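
As a rough sketch of how the four categories above might map onto MLflow calls, the snippet below flattens a result dictionary into params, metrics, and tags and then logs them. The result schema (`overall_score`, `axes`, `submission_id`, `certified`, and so on) is assumed purely for illustration and may not match what `scripts/log_to_mlflow.py` actually reads; `build_mlflow_payload` and `log_payload` are hypothetical names. The `mlflow` import is deferred so the flattening step can be tried without MLflow installed.

```python
from typing import Dict, Tuple


def build_mlflow_payload(result: dict) -> Tuple[Dict[str, str], Dict[str, float], Dict[str, str]]:
    """Split a result dict into (params, metrics, tags).

    All keys read here are ASSUMED for illustration, not the real schema.
    """
    params = {
        key: str(result[key])
        for key in ("model", "provider", "benchmark_version", "judge_model")
        if result.get(key)
    }
    metrics = {"overall_score": float(result["overall_score"])}
    # One metric per scoring axis, e.g. axis_completion, axis_trajectory.
    for axis, score in result.get("axes", {}).items():
        metrics[f"axis_{axis}"] = float(score)
    tags = {
        "submission_id": str(result.get("submission_id", "")),
        "certified": str(bool(result.get("certified", False))).lower(),
    }
    return params, metrics, tags


def log_payload(result: dict, experiment_name: str = "clawbench") -> None:
    """Log the payload plus the full result JSON as an artifact."""
    import mlflow  # deferred: only needed when actually logging

    params, metrics, tags = build_mlflow_payload(result)
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run():
        mlflow.log_params(params)
        mlflow.log_metrics(metrics)
        mlflow.set_tags(tags)
        mlflow.log_dict(result, "result.json")
```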
---
## Building images
### ClawBench image
`quay.io/sallyom/clawbench:latest` is public.
For Kubernetes, use the lightweight sidecar image instead; it includes only
the eval harness and the MLflow client:
```bash
docker build -t clawbench:latest -f scripts/k8s/Dockerfile .
# For Kind clusters, load directly instead of pushing to a registry:
kind load docker-image clawbench:latest --name openclaw
# For non-Kind clusters, push to registry and set CLAWBENCH_IMAGE accordingly
# Ensure you build for the right architecture, usually amd64 for non-local k8s
```
Set `CLAWBENCH_IMAGE=clawbench:latest` when running `deploy.sh` to use it.
---
## Cleanup
```bash
# Remove eval sidecar only (keeps OpenClaw + MLflow running for another eval)
./scripts/k8s/deploy.sh --remove-sidecar
# Delete eval namespace (keeps MLflow running)
./scripts/k8s/deploy.sh --teardown
# Delete the Kind cluster entirely
kind delete cluster --name openclaw
```


@ -10,7 +10,8 @@ dependencies = [
"pydantic>=2.7,<3",
"pyyaml>=6.0,<7",
"datasets>=3.0,<4",
"gradio>=5.0,<6",
"gradio>=6.7.0,<7",
"pillow>=12.2.0,<13",
"httpx>=0.27,<1",
"numpy>=1.26,<3",
"rich>=13.0,<14",
@ -18,8 +19,8 @@ dependencies = [
# Runtime deps for the task completion verifier. The harness shells out
# to `pytest -q` / `pytest-asyncio` inside per-task workspaces as the
# execution check; the container must have them in PATH.
"pytest>=8.0,<9",
"pytest-asyncio>=0.24,<1",
"pytest>=9.0.3,<10",
"pytest-asyncio>=1,<2",
]
[project.optional-dependencies]
@ -27,11 +28,17 @@ dev = [
# Kept as an alias for historical `pip install .[dev]` invocations.
# pytest + pytest-asyncio are now in the base [dependencies] since the
# benchmark itself runs pytest in task workspaces.
"pytest>=8.0,<9",
"pytest-asyncio>=0.24,<1",
"pytest>=9.0.3,<10",
"pytest-asyncio>=1,<2",
"pre-commit>=4.0,<5",
"ruff>=0.9,<1",
]
mlflow = [
"mlflow>=2.10,<3",
]
hermes = [
"hermes-agent @ git+https://github.com/NousResearch/hermes-agent.git@main",
]
[project.urls]
Homepage = "https://github.com/openclaw/clawbench"
@ -49,6 +56,9 @@ build-backend = "hatchling.build"
packages = ["clawbench"]
force-include = { "tasks-public" = "tasks-public", "tasks-domain" = "tasks-domain", "profiles" = "profiles", "baselines" = "baselines", "CLAWBENCH_V0_4_SPEC.md" = "CLAWBENCH_V0_4_SPEC.md", "PARTNER_TRACE_SPEC.md" = "PARTNER_TRACE_SPEC.md" }
[tool.hatch.metadata]
allow-direct-references = true
[tool.pytest.ini_options]
asyncio_mode = "auto"
addopts = ["-p", "no:opik"]


@ -0,0 +1,198 @@
#!/bin/bash
# Cherry-pick variant of container_sweep_single.sh: runs ONLY the tasks listed
# in $CHERRY_TASKS (comma-separated task IDs), with state-dir isolation.
#
# Required env vars:
# SWEEP_LABEL (e.g. opus47)
# SWEEP_MODEL (e.g. anthropic/claude-opus-4-7)
# SWEEP_PROFILE (absolute path in container)
# CHERRY_TASKS (comma-separated task IDs, e.g. "t2-ctx-pronoun-resolve,t3-fin-budget-monthly")
# Optional env vars:
# SWEEP_LOGDIR (default /data/drift_2026-04-20-cherry)
# SWEEP_OUT_TAG (default v2026-4-20-cherry)
set -u
: "${SWEEP_LABEL:?SWEEP_LABEL required}"
: "${SWEEP_MODEL:?SWEEP_MODEL required}"
: "${SWEEP_PROFILE:?SWEEP_PROFILE required}"
: "${CHERRY_TASKS:?CHERRY_TASKS required (comma-separated task IDs)}"
: "${SWEEP_LOGDIR:=/data/drift_2026-04-20-cherry}"
: "${SWEEP_OUT_TAG:=v2026-4-20-cherry}"
cd /data
LOGDIR="$SWEEP_LOGDIR"
mkdir -p "$LOGDIR"
export OPENCLAW_GATEWAY_TOKEN="local-dev-token-for-testing"
export CLAWBENCH_RUN_CACHE_DIR="/data/run_cache"
mkdir -p "$CLAWBENCH_RUN_CACHE_DIR"
export NODE_OPTIONS="--max-old-space-size=4096"
# OpenClaw 4.22+ has slower agents.create / sessions.create on cold start
# (we observed 72s for opus-4-7). Bump RPC timeouts so the harness doesn't
# cancel mid-flight. Override defaults of 30s / 60s respectively.
export CLAWBENCH_CONNECT_TIMEOUT="${CLAWBENCH_CONNECT_TIMEOUT:-120}"
export CLAWBENCH_REQUEST_TIMEOUT="${CLAWBENCH_REQUEST_TIMEOUT:-300}"
export CLAWBENCH_PER_RUN_BUDGET_SECONDS="${CLAWBENCH_PER_RUN_BUDGET_SECONDS:-900}"
export HERMES_STEP_TIMEOUT_SECONDS="${HERMES_STEP_TIMEOUT_SECONDS:-180}"
# State-dir isolation (same as container_sweep_single.sh)
SRC_STATE="/home/node/.openclaw"
FRESH_STATE="/tmp/openclaw-state-${SWEEP_LABEL}-$$"
echo "[state-isolate] cloning config from $SRC_STATE to $FRESH_STATE"
mkdir -p "$FRESH_STATE"
[ -f "$SRC_STATE/openclaw.json" ] && cp "$SRC_STATE/openclaw.json" "$FRESH_STATE/openclaw.json"
[ -f "$SRC_STATE/exec-approvals.json" ] && cp "$SRC_STATE/exec-approvals.json" "$FRESH_STATE/exec-approvals.json"
for d in identity devices tasks subagents flows cron; do
[ -d "$SRC_STATE/$d" ] && cp -r "$SRC_STATE/$d" "$FRESH_STATE/$d"
done
mkdir -p "$FRESH_STATE/agents" "$FRESH_STATE/workspace" "$FRESH_STATE/logs" "$FRESH_STATE/memory" "$FRESH_STATE/cache"
export OPENCLAW_STATE_DIR="$FRESH_STATE"
export OPENCLAW_CONFIG_PATH="$FRESH_STATE/openclaw.json"
echo "[state-isolate] OPENCLAW_STATE_DIR=$OPENCLAW_STATE_DIR"
python - <<'PY'
import json
import os
from pathlib import Path

cfg_path = Path(os.environ["OPENCLAW_CONFIG_PATH"])
data = json.loads(cfg_path.read_text(encoding="utf-8")) if cfg_path.exists() else {}


def set_nested(root, dotted, value):
    cursor = root
    parts = dotted.split(".")
    for part in parts[:-1]:
        child = cursor.get(part)
        if not isinstance(child, dict):
            child = {}
            cursor[part] = child
        cursor = child
    cursor[parts[-1]] = value


exec_host = os.environ.get("OPENCLAW_EXEC_HOST", "gateway").strip().lower()
if exec_host not in {"auto", "gateway", "sandbox", "node"}:
    raise SystemExit(f"invalid OPENCLAW_EXEC_HOST={exec_host!r}")
set_nested(data, "tools.exec.host", exec_host)
set_nested(data, "tools.exec.security", "full")
set_nested(data, "tools.exec.ask", "off")
set_nested(data, "approvals.exec.enabled", False)
cfg_path.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8")

approvals_path = cfg_path.with_name("exec-approvals.json")
approvals = {
    "version": 1,
    "socket": {
        "path": str(approvals_path.with_suffix(".sock")),
        "token": "container-cherry-eval-token",
    },
    "defaults": {"security": "full", "ask": "off", "askFallback": "full"},
    "agents": {"*": {"security": "full", "ask": "off", "askFallback": "full"}},
}
approvals_path.write_text(json.dumps(approvals, indent=2) + "\n", encoding="utf-8")
PY
# Map model to cache subdir (for archiving)
case "$SWEEP_MODEL" in
anthropic/claude-opus-4-7) CACHE_SUB="anthropic_claude-opus-4-7" ;;
anthropic/claude-opus-4-6) CACHE_SUB="anthropic_claude-opus-4-6" ;;
anthropic/claude-sonnet-4-6) CACHE_SUB="anthropic_claude-sonnet-4-6" ;;
openai/gpt-5.5) CACHE_SUB="openai_gpt-5.5" ;;
openai/gpt-5.4) CACHE_SUB="openai_gpt-5.4" ;;
google/gemini-3.1-pro-preview) CACHE_SUB="google_gemini-3.1-pro-preview" ;;
openrouter/z-ai/glm-5.1) CACHE_SUB="openrouter_z-ai_glm-5.1" ;;
openrouter/qwen/qwen3.6-plus) CACHE_SUB="openrouter_qwen_qwen3.6-plus" ;;
openrouter/minimax/minimax-m2.7) CACHE_SUB="openrouter_minimax_minimax-m2.7" ;;
openrouter/moonshotai/kimi-k2.6) CACHE_SUB="openrouter_moonshotai_kimi-k2.6" ;;
openrouter/moonshotai/kimi-k2.5) CACHE_SUB="openrouter_moonshotai_kimi-k2.5" ;;
openrouter/deepseek/deepseek-v4-pro) CACHE_SUB="openrouter_deepseek_deepseek-v4-pro" ;;
deepseek/deepseek-v4-pro) CACHE_SUB="deepseek_deepseek-v4-pro" ;;
deepseek/v4-pro) CACHE_SUB="deepseek_v4-pro" ;;
*) CACHE_SUB="" ;;
esac
OUT="$LOGDIR/docker_${SWEEP_LABEL}_${SWEEP_OUT_TAG}.json"
LOG="$LOGDIR/docker_${SWEEP_LABEL}_${SWEEP_OUT_TAG}.log"
GWLOG="$LOGDIR/gateway_${SWEEP_LABEL}.log"
echo "===== CHERRY-PICK SWEEP $(date '+%Y-%m-%d %H:%M:%S') ====="
echo "label: $SWEEP_LABEL"
echo "model: $SWEEP_MODEL"
echo "tasks: $CHERRY_TASKS"
echo "out: $OUT"
# Force-clear this model's run_cache (including fixed-task slots — so they
# actually re-run against the new image instead of hitting old cache).
if [ -n "$CACHE_SUB" ] && [ -d "$CLAWBENCH_RUN_CACHE_DIR/$CACHE_SUB" ]; then
echo "clearing cache: $CLAWBENCH_RUN_CACHE_DIR/$CACHE_SUB"
rm -rf "$CLAWBENCH_RUN_CACHE_DIR/$CACHE_SUB"
fi
[ -f "$OUT" ] && rm -f "$OUT"
# Start gateway with bumped heap
echo "Starting gateway on :18789 (heap=4GB) ..."
openclaw gateway --port 18789 > "$GWLOG" 2>&1 &
GATEWAY_PID=$!
ready=0
for i in $(seq 1 120); do
if curl -sf -H "Authorization: Bearer $OPENCLAW_GATEWAY_TOKEN" http://127.0.0.1:18789/ready > /dev/null 2>&1; then
echo "Gateway ready after ${i}s"
ready=1
break
fi
sleep 1
done
if [ $ready -ne 1 ]; then
echo "ERROR: gateway failed to become ready within 120s"
tail -30 "$GWLOG"
exit 1
fi
# Build -t args from comma-separated list
TASK_ARGS=()
IFS=',' read -ra TASK_ARR <<< "$CHERRY_TASKS"
for t in "${TASK_ARR[@]}"; do
TASK_ARGS+=("-t" "$t")
done
echo "===== $(date '+%H:%M:%S') running clawbench with tasks: ${TASK_ARR[*]} ====="
# NOTE: --profile is intentionally OMITTED unless USE_PROFILE is set. The legacy
# frontier_*.yaml profile format is incompatible with OpenClaw 4.22+ (it loads
# n_tools_total=0 and starves the agent of tools, so all runs fail with
# environment_unavailable or timeout). By default the run uses the stock
# openclaw tool stack, which is the same for all models, so the comparison
# stays apples-to-apples.
PROFILE_ARG=""
if [ -n "${USE_PROFILE:-}" ] && [ -f "$SWEEP_PROFILE" ]; then
PROFILE_ARG="--profile $SWEEP_PROFILE"
fi
clawbench run \
--model "$SWEEP_MODEL" \
--runs 3 \
--concurrency "${CLAWBENCH_CONCURRENCY:-1}" \
$PROFILE_ARG \
--judge-model "anthropic/claude-sonnet-4-6" \
"${TASK_ARGS[@]}" \
-o "$OUT" \
> "$LOG" 2>&1
status=$?
if [ $status -eq 0 ]; then
echo "===== $(date '+%H:%M:%S') done $SWEEP_LABEL (exit 0) ====="
else
echo "===== $(date '+%H:%M:%S') FAILED $SWEEP_LABEL (exit $status) ====="
tail -20 "$LOG"
fi
# Archive cache to v2026-4-20-cherry tag
# shellcheck disable=SC1091
source "$(dirname "$0")/_archive_cache.sh" 2>/dev/null && archive_run_cache || echo "[archive] helper missing"
kill $GATEWAY_PID 2>/dev/null
wait $GATEWAY_PID 2>/dev/null
# Clean up isolated state dir
[ -n "${FRESH_STATE:-}" ] && [ -d "$FRESH_STATE" ] && rm -rf "$FRESH_STATE"
exit $status
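
The `set_nested` helper in the embedded Python above writes a value at a dotted path and replaces any non-dict intermediate along the way. The standalone sketch below copies the helper from the script; only the demonstration inputs are made up. It shows both behaviors: sibling keys survive, and a scalar standing where a dict is expected gets overwritten.

```python
import json


def set_nested(root, dotted, value):
    """Set root[a][b][c] = value for dotted == "a.b.c", creating dicts as needed."""
    cursor = root
    parts = dotted.split(".")
    for part in parts[:-1]:
        child = cursor.get(part)
        if not isinstance(child, dict):
            # A missing or non-dict intermediate is replaced with a fresh dict.
            child = {}
            cursor[part] = child
        cursor = child
    cursor[parts[-1]] = value


if __name__ == "__main__":
    # Illustrative inputs only; real configs come from openclaw.json.
    cfg = {"tools": {"exec": {"ask": "on"}}, "approvals": "legacy-string"}
    set_nested(cfg, "tools.exec.host", "gateway")
    set_nested(cfg, "tools.exec.ask", "off")
    set_nested(cfg, "approvals.exec.enabled", False)
    print(json.dumps(cfg, indent=2))
```

Note that the overwrite is silent: a config whose `approvals` key held a string loses that value without a warning, which is presumably acceptable for a throwaway per-run state directory.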

scripts/container_lane_eval.sh Executable file

@ -0,0 +1,231 @@
#!/bin/bash
# Run one OpenClaw model/profile through the HF-style isolated lane worker.
set -Eeuo pipefail
: "${SWEEP_MODEL:?SWEEP_MODEL required}"
: "${SWEEP_LABEL:?SWEEP_LABEL required}"
: "${SWEEP_OUT_TAG:=lane-container}"
: "${SWEEP_LANES:=3}"
: "${SWEEP_RUNS:=1}"
: "${SWEEP_LOGDIR:=/data/results}"
: "${CLAWBENCH_PER_RUN_BUDGET_SECONDS:=900}"
: "${CLAWBENCH_PER_TURN_TIMEOUT_SECONDS:=300}"
: "${OPENCLAW_EXEC_HOST:=gateway}"
cd /home/node/app
export CLAWBENCH_LOCAL_QUEUE_DIR="${CLAWBENCH_LOCAL_QUEUE_DIR:-/data/queue/$SWEEP_LABEL}"
mkdir -p "$SWEEP_LOGDIR" /data/results "$CLAWBENCH_LOCAL_QUEUE_DIR" /data/run_cache /data/lane_runtime
export HF_TOKEN=""
export OPENCLAW_GATEWAY_TOKEN="${OPENCLAW_GATEWAY_TOKEN:-local-dev-token-for-testing}"
export OPENCLAW_SKIP_GMAIL_WATCHER=1
export OPENCLAW_SKIP_CANVAS_HOST=1
export OPENCLAW_NO_RESPAWN=1
export CLAWBENCH_DISABLE_GATEWAY_DEVICE_IDENTITY=1
export CLAWBENCH_PER_RUN_BUDGET_SECONDS
export CLAWBENCH_PER_TURN_TIMEOUT_SECONDS
export CLAWBENCH_CONNECT_TIMEOUT="${CLAWBENCH_CONNECT_TIMEOUT:-180}"
export CLAWBENCH_REQUEST_TIMEOUT="${CLAWBENCH_REQUEST_TIMEOUT:-300}"
export CLAWBENCH_GATEWAY_HEALTH_TIMEOUT_SECONDS="${CLAWBENCH_GATEWAY_HEALTH_TIMEOUT_SECONDS:-240}"
export CLAWBENCH_LANE_STARTUP_STAGGER_SECONDS="${CLAWBENCH_LANE_STARTUP_STAGGER_SECONDS:-90}"
export CLAWBENCH_GATEWAY_READY_MARKER_GRACE_SECONDS="${CLAWBENCH_GATEWAY_READY_MARKER_GRACE_SECONDS:-90}"
export CLAWBENCH_KEEP_PARALLEL_LANE_ROOT="${CLAWBENCH_KEEP_PARALLEL_LANE_ROOT:-0}"
export CLAWBENCH_PARALLEL_LANE_ROOT="/data/lane_runtime/$SWEEP_LABEL"
export CLAWBENCH_TOOL_PROFILE_NAME="${CLAWBENCH_TOOL_PROFILE_NAME:-$SWEEP_LABEL}"
export NODE_OPTIONS="${NODE_OPTIONS:-"--max-old-space-size=4096"}"
if command -v npm >/dev/null 2>&1; then
export NODE_PATH="${NODE_PATH:-$(npm root -g 2>/dev/null || true)}"
fi
SRC_STATE="${OPENCLAW_CONFIG_SOURCE:-/config/openclaw}"
if [ ! -d "$SRC_STATE" ]; then
SRC_STATE="/home/node/.openclaw"
fi
safe_model="${SWEEP_MODEL//\//_}"
safe_model="${safe_model//:/_}"
OUT="$SWEEP_LOGDIR/${SWEEP_LABEL}_openclaw_${safe_model}_${SWEEP_OUT_TAG}.json"
LOG="$SWEEP_LOGDIR/${SWEEP_LABEL}_openclaw_${safe_model}_${SWEEP_OUT_TAG}.log"
export SWEEP_OUTPUT_PATH="$OUT"
FRESH_HOME="/tmp/openclaw-home-${SWEEP_LABEL}-$$"
FRESH_STATE="$FRESH_HOME/.openclaw"
rm -rf "$FRESH_HOME" "$CLAWBENCH_PARALLEL_LANE_ROOT"
mkdir -p "$FRESH_STATE" "$FRESH_HOME/.config"
if [ -f "$SRC_STATE/openclaw.json" ]; then
cp "$SRC_STATE/openclaw.json" "$FRESH_STATE/openclaw.json"
fi
if [ -d "$SRC_STATE/plugins" ]; then
mkdir -p "$FRESH_STATE/plugins"
cp -R "$SRC_STATE/plugins/." "$FRESH_STATE/plugins/" 2>/dev/null || true
fi
mkdir -p \
"$FRESH_STATE/agents" \
"$FRESH_STATE/workspace" \
"$FRESH_STATE/logs" \
"$FRESH_STATE/memory" \
"$FRESH_STATE/cache" \
"$FRESH_STATE/identity" \
"$FRESH_STATE/devices" \
"$FRESH_STATE/tasks" \
"$FRESH_STATE/subagents" \
"$FRESH_STATE/flows" \
"$FRESH_STATE/cron"
export HOME="$FRESH_HOME"
export OPENCLAW_HOME="$FRESH_HOME"
export OPENCLAW_STATE_DIR="$FRESH_STATE"
export OPENCLAW_CONFIG_PATH="$FRESH_STATE/openclaw.json"
export XDG_CONFIG_HOME="$FRESH_HOME/.config"
python - <<'PY'
import json
import os
from pathlib import Path

cfg_path = Path(os.environ["OPENCLAW_CONFIG_PATH"])
if not cfg_path.exists():
    raise SystemExit("missing openclaw.json")
data = json.loads(cfg_path.read_text(encoding="utf-8"))


def set_nested(root, dotted, value):
    cursor = root
    parts = dotted.split(".")
    for part in parts[:-1]:
        child = cursor.get(part)
        if not isinstance(child, dict):
            child = {}
            cursor[part] = child
        cursor = child
    cursor[parts[-1]] = value


agents = data.setdefault("agents", {})
if isinstance(agents, dict):
    agents["list"] = []

channels = data.get("channels")
if isinstance(channels, dict):
    for channel in channels.values():
        if isinstance(channel, dict):
            channel["enabled"] = False
            exec_approvals = channel.get("execApprovals")
            if not isinstance(exec_approvals, dict):
                exec_approvals = {}
                channel["execApprovals"] = exec_approvals
            exec_approvals["enabled"] = False

plugins = data.setdefault("plugins", {})
stale = {"marxbiotech-git-tools", "lab"}
allow = plugins.get("allow")
if isinstance(allow, list):
    plugins["allow"] = [item for item in allow if item not in stale]
entries = plugins.get("entries")
if isinstance(entries, dict):
    for item in stale:
        entries.pop(item, None)

set_nested(data, "browser.headless", True)
set_nested(data, "browser.noSandbox", True)
set_nested(data, "gateway.reload.mode", "off")
set_nested(data, "agents.defaults.skipBootstrap", True)
set_nested(data, "agents.defaults.sandbox.mode", "off")
set_nested(data, "agents.defaults.model.primary", os.environ["SWEEP_MODEL"])
set_nested(data, "agents.defaults.subagents.model.primary", os.environ["SWEEP_MODEL"])
set_nested(
    data,
    "agents.defaults.systemPromptOverride",
    "You are running an OpenClaw benchmark task. Complete the user's request in the current "
    "workspace using the available tools when needed. For file, code, browser, shell, or memory "
    "tasks, make the requested changes directly and verify them when practical. Do not ask "
    "follow-up questions during the benchmark. Keep any final reply brief.",
)
set_nested(data, "tools.exec.host", os.environ.get("OPENCLAW_EXEC_HOST", "gateway"))
set_nested(data, "tools.exec.security", "full")
set_nested(data, "tools.exec.ask", "off")
set_nested(data, "approvals.exec.enabled", False)

models = data.setdefault("agents", {}).setdefault("defaults", {}).setdefault("models", {})
model_entry = models.setdefault(os.environ["SWEEP_MODEL"], {})
params = model_entry.setdefault("params", {})
params["fastMode"] = True
if os.environ["SWEEP_MODEL"].startswith("openai/"):
    params["transport"] = "sse"
    params["openaiWsWarmup"] = False
cfg_path.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8")

approvals_path = cfg_path.with_name("exec-approvals.json")
approvals = {
    "version": 1,
    "socket": {
        "path": str(approvals_path.with_suffix(".sock")),
        "token": "container-lane-eval-token",
    },
    "defaults": {"security": "full", "ask": "off", "askFallback": "full"},
    "agents": {"*": {"security": "full", "ask": "off", "askFallback": "full"}},
}
approvals_path.write_text(json.dumps(approvals, indent=2) + "\n", encoding="utf-8")
PY
echo "===== CONTAINER LANE EVAL START $(date '+%Y-%m-%d %H:%M:%S') ====="
echo "label: $SWEEP_LABEL"
echo "model: $SWEEP_MODEL"
echo "runs: $SWEEP_RUNS"
echo "lanes: $SWEEP_LANES"
echo "tasks: ${SWEEP_TASKS:-${CHERRY_TASKS:-all}}"
echo "out: $OUT"
echo "log: $LOG"
echo "home: $HOME"
echo "state: $OPENCLAW_STATE_DIR"
openclaw --version 2>/dev/null || true
set +e
python - <<'PY' > "$LOG" 2>&1
import asyncio
import json
import logging
import os
import shutil
from pathlib import Path

from clawbench.queue import JobQueue, JobStatus, SubmissionRequest
from clawbench.worker import EvalWorker, RESULTS_DIR

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")


async def main() -> int:
    queue = JobQueue()
    queue._jobs.clear()
    queue._save_local()
    task_ids_raw = os.environ.get("SWEEP_TASKS") or os.environ.get("CHERRY_TASKS") or ""
    task_ids = [item.strip() for item in task_ids_raw.split(",") if item.strip()]
    request = SubmissionRequest(
        model=os.environ["SWEEP_MODEL"],
        runs_per_task=int(os.environ["SWEEP_RUNS"]),
        max_parallel_lanes=int(os.environ["SWEEP_LANES"]),
        task_ids=task_ids,
        prompt_variant=os.environ.get("SWEEP_PROMPT_VARIANT", "clear"),
        judge_model=os.environ.get("CLAWBENCH_JUDGE_MODEL", ""),
        notes=os.environ.get("SWEEP_LABEL", ""),
    )
    job = await queue.submit(request)
    worker = EvalWorker(queue)
    await worker._process_job(job)
    final = await queue.get_status(job.job_id)
    print(json.dumps(final.model_dump() if final else {}, indent=2), flush=True)
    if final is None or final.status != JobStatus.FINISHED or not final.result_id:
        return 1
    result_path = RESULTS_DIR / f"{final.result_id}.json"
    output_path = Path(os.environ["SWEEP_OUTPUT_PATH"])
    output_path.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(result_path, output_path)
    return 0


raise SystemExit(asyncio.run(main()))
PY
status=$?
set -e
echo "===== lane eval exit=$status $(date '+%Y-%m-%d %H:%M:%S') ====="
tail -120 "$LOG" 2>/dev/null || true
exit "$status"
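
The lane script derives its output file name by replacing `/` and `:` in the model ID with `_` (the `safe_model` parameter expansions above). The same transformation reproduces the entries in the `CACHE_SUB` case tables of the sweep scripts. Below is a minimal Python sketch of that naming rule; the function names `safe_model_name` and `lane_output_path` are hypothetical, and the example label `opus47` is borrowed from the cherry-pick script's comments.

```python
def safe_model_name(model: str) -> str:
    """Mirror the bash expansions ${SWEEP_MODEL//\\//_} and ${safe_model//:/_}."""
    return model.replace("/", "_").replace(":", "_")


def lane_output_path(logdir: str, label: str, model: str, out_tag: str) -> str:
    """Mirror OUT="$SWEEP_LOGDIR/${SWEEP_LABEL}_openclaw_${safe_model}_${SWEEP_OUT_TAG}.json"."""
    return f"{logdir}/{label}_openclaw_{safe_model_name(model)}_{out_tag}.json"


if __name__ == "__main__":
    # Defaults taken from the script: SWEEP_LOGDIR=/data/results, SWEEP_OUT_TAG=lane-container.
    print(lane_output_path("/data/results", "opus47", "anthropic/claude-opus-4-7", "lane-container"))
```

Deriving the name this way avoids maintaining a per-model lookup table, at the cost of assuming that the cache directory layout always follows the same substitution.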


@ -43,6 +43,13 @@ mkdir -p "$CLAWBENCH_RUN_CACHE_DIR"
# OOM fix: give the gateway Node process a 4GB old-space ceiling instead of the default ~2GB.
# Scoped via env so we don't stomp on other Node processes (clawbench itself is python).
export NODE_OPTIONS="--max-old-space-size=4096"
# OpenClaw 4.22+ has slower agents.create / sessions.create on cold start
# (we observed 72s for opus-4-7). Bump RPC timeouts so the harness doesn't
# cancel mid-flight. Override defaults of 30s / 60s respectively.
export CLAWBENCH_CONNECT_TIMEOUT="${CLAWBENCH_CONNECT_TIMEOUT:-120}"
export CLAWBENCH_REQUEST_TIMEOUT="${CLAWBENCH_REQUEST_TIMEOUT:-300}"
export CLAWBENCH_PER_RUN_BUDGET_SECONDS="${CLAWBENCH_PER_RUN_BUDGET_SECONDS:-900}"
export HERMES_STEP_TIMEOUT_SECONDS="${HERMES_STEP_TIMEOUT_SECONDS:-180}"
# State-dir isolation: the shared /home/node/.openclaw mount accumulates cruft
# across sweeps (agents/, workspace/, logs/, memory/, stale openclaw.json.*.tmp)
@ -73,23 +80,68 @@ done
# Ensure runtime dirs exist but are empty
mkdir -p "$FRESH_STATE/agents" "$FRESH_STATE/workspace" "$FRESH_STATE/logs" "$FRESH_STATE/memory" "$FRESH_STATE/cache"
export OPENCLAW_STATE_DIR="$FRESH_STATE"
export OPENCLAW_CONFIG_PATH="$FRESH_STATE/openclaw.json"
echo "[state-isolate] OPENCLAW_STATE_DIR=$OPENCLAW_STATE_DIR"
du -sh "$FRESH_STATE" 2>/dev/null | sed 's/^/[state-isolate] size: /'
python - <<'PY'
import json
import os
from pathlib import Path

cfg_path = Path(os.environ["OPENCLAW_CONFIG_PATH"])
data = json.loads(cfg_path.read_text(encoding="utf-8")) if cfg_path.exists() else {}


def set_nested(root, dotted, value):
    cursor = root
    parts = dotted.split(".")
    for part in parts[:-1]:
        child = cursor.get(part)
        if not isinstance(child, dict):
            child = {}
            cursor[part] = child
        cursor = child
    cursor[parts[-1]] = value


exec_host = os.environ.get("OPENCLAW_EXEC_HOST", "gateway").strip().lower()
if exec_host not in {"auto", "gateway", "sandbox", "node"}:
    raise SystemExit(f"invalid OPENCLAW_EXEC_HOST={exec_host!r}")
set_nested(data, "tools.exec.host", exec_host)
set_nested(data, "tools.exec.security", "full")
set_nested(data, "tools.exec.ask", "off")
set_nested(data, "approvals.exec.enabled", False)
cfg_path.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8")

approvals_path = cfg_path.with_name("exec-approvals.json")
approvals = {
    "version": 1,
    "socket": {
        "path": str(approvals_path.with_suffix(".sock")),
        "token": "container-single-eval-token",
    },
    "defaults": {"security": "full", "ask": "off", "askFallback": "full"},
    "agents": {"*": {"security": "full", "ask": "off", "askFallback": "full"}},
}
approvals_path.write_text(json.dumps(approvals, indent=2) + "\n", encoding="utf-8")
PY
# Map model -> cache subdir (matches what clawbench writes)
case "$SWEEP_MODEL" in
anthropic/claude-opus-4-7) CACHE_SUB="anthropic_claude-opus-4-7" ;;
anthropic/claude-sonnet-4-7) CACHE_SUB="anthropic_claude-sonnet-4-7" ;;
anthropic/claude-opus-4-6) CACHE_SUB="anthropic_claude-opus-4-6" ;;
anthropic/claude-sonnet-4-6) CACHE_SUB="anthropic_claude-sonnet-4-6" ;;
openai/gpt-5.5) CACHE_SUB="openai_gpt-5.5" ;;
openai/gpt-5.4) CACHE_SUB="openai_gpt-5.4" ;;
openai/gpt-5.2) CACHE_SUB="openai_gpt-5.2" ;;
google/gemini-3.1-pro-preview) CACHE_SUB="google_gemini-3.1-pro-preview" ;;
openrouter/z-ai/glm-5.1) CACHE_SUB="openrouter_z-ai_glm-5.1" ;;
openrouter/qwen/qwen3.6-plus) CACHE_SUB="openrouter_qwen_qwen3.6-plus" ;;
openrouter/minimax/minimax-m2.7) CACHE_SUB="openrouter_minimax_minimax-m2.7" ;;
openrouter/moonshotai/kimi-k2.6) CACHE_SUB="openrouter_moonshotai_kimi-k2.6" ;;
openrouter/moonshotai/kimi-k2.5) CACHE_SUB="openrouter_moonshotai_kimi-k2.5" ;;
# kimi-k2.6 is not yet supported in the openclaw version under test — skip.
deepseek/v4-pro) CACHE_SUB="deepseek_v4-pro" ;;
*) CACHE_SUB="" ;;
esac
@ -139,11 +191,19 @@ if [ $ready -ne 1 ]; then
fi
echo "===== $(date '+%H:%M:%S') starting $SWEEP_LABEL ($SWEEP_MODEL) ====="
# NOTE: --profile intentionally OMITTED unless USE_PROFILE=1 is set. The
# legacy frontier_*.yaml profile format is incompatible with OpenClaw
# 4.22+ (loads n_tools_total=0). Running with the default openclaw tool
# stack — identical across all models, so comparisons stay valid.
PROFILE_ARG=""
if [ -n "${USE_PROFILE:-}" ] && [ -f "$SWEEP_PROFILE" ]; then
PROFILE_ARG="--profile $SWEEP_PROFILE"
fi
clawbench run \
--model "$SWEEP_MODEL" \
--runs 3 \
--concurrency 4 \
--profile "$SWEEP_PROFILE" \
--concurrency "${CLAWBENCH_CONCURRENCY:-1}" \
$PROFILE_ARG \
--judge-model "anthropic/claude-sonnet-4-6" \
-o "$OUT" \
> "$LOG" 2>&1

scripts/k8s/Dockerfile Normal file

@ -0,0 +1,33 @@
# Lightweight ClawBench image for Kubernetes sidecar use.
# Does NOT include the full OpenClaw server or Chromium — the gateway runs
# in a separate container. Node.js is copied from the OpenClaw image for
# the device-identity handshake required by the gateway protocol.
FROM ghcr.io/openclaw/openclaw:latest AS openclaw
FROM python:3.12-slim
COPY --from=openclaw /usr/local/bin/node /usr/local/bin/node
RUN apt-get update && \
apt-get install -y --no-install-recommends git && \
rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY pyproject.toml README.md CLAWBENCH_V0_4_SPEC.md PARTNER_TRACE_SPEC.md ./
COPY clawbench/ clawbench/
COPY tasks-public/ tasks-public/
COPY tasks-domain/ tasks-domain/
COPY profiles/ profiles/
COPY baselines/ baselines/
COPY scripts/ scripts/
RUN pip install --no-cache-dir ".[mlflow]"
RUN mkdir -p /results && chmod 777 /results
RUN useradd -m -d /home/node clawbench
USER clawbench
ENV HOME=/home/node
ENTRYPOINT ["clawbench"]

scripts/k8s/deploy.sh Executable file

@ -0,0 +1,486 @@
#!/usr/bin/env bash
# Deploy ClawBench evals on Kubernetes (works on OpenShift too).
#
# 0-to-hero pipeline:
# Step 0: Create a cluster (see --help for Kind instructions)
# Step 1: Deploy OpenClaw gateway (optional — bring your own)
# Step 2: Deploy MLflow tracking server (optional — bring your own)
# Step 3: Run evals via sidecar (add / remove)
#
# Usage:
# ./scripts/k8s/deploy.sh # Full deploy: OpenClaw + MLflow + eval
# ./scripts/k8s/deploy.sh --openclaw-only # Step 1: deploy OpenClaw gateway
# ./scripts/k8s/deploy.sh --mlflow-only # Step 2: deploy MLflow
# ./scripts/k8s/deploy.sh --add-sidecar # Step 3: add eval sidecar (starts eval)
# ./scripts/k8s/deploy.sh --remove-sidecar # Step 3: remove eval sidecar
# ./scripts/k8s/deploy.sh --logs # Tail clawbench sidecar logs
# ./scripts/k8s/deploy.sh --teardown # Delete eval namespace (keeps MLflow)
#
# Environment (required):
# CLAWBENCH_NAMESPACE Namespace for OpenClaw + eval
# OPENAI_API_KEY Model provider API key (or another provider key)
#
# Environment (optional):
# CLAWBENCH_IMAGE Clawbench image (default: quay.io/sallyom/clawbench:latest)
# OPENCLAW_IMAGE OpenClaw image (default: ghcr.io/openclaw/openclaw:latest)
# OPENCLAW_GATEWAY_TOKEN Existing gateway token (generated if unset)
# CLAWBENCH_MODEL Model to eval (default: openai/gpt-5.5)
# MLFLOW_NAMESPACE MLflow namespace (default: mlflow)
# MLFLOW_TRACKING_URI External MLflow URI (skips MLflow deploy if set)
# MLFLOW_EXPERIMENT_ID MLflow experiment ID
# MLFLOW_EXPERIMENT_NAME MLflow experiment name
# MLFLOW_IMAGE MLflow image (default: ghcr.io/mlflow/mlflow:v2.21.3)
# ANTHROPIC_API_KEY Anthropic key (added to secret if set)
# OPENROUTER_API_KEY OpenRouter key (added to secret if set)
# GEMINI_API_KEY Gemini key (added to secret if set)
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
NS="${CLAWBENCH_NAMESPACE:-}"
MLFLOW_NS="${MLFLOW_NAMESPACE:-mlflow}"
CLAWBENCH_IMG="${CLAWBENCH_IMAGE:-quay.io/sallyom/clawbench:latest}"
OPENCLAW_IMG="${OPENCLAW_IMAGE:-ghcr.io/openclaw/openclaw:latest}"
MLFLOW_IMG="${MLFLOW_IMAGE:-ghcr.io/mlflow/mlflow:v2.21.3}"
# ---------------------------------------------------------------------------
if [[ "${1:-}" == "-h" || "${1:-}" == "--help" ]]; then
cat <<'HELP'
ClawBench Kubernetes Deployment
===============================
0-to-hero pipeline for running ClawBench evals on Kubernetes.
Step 0: Create a cluster
For local testing with Kind, see:
https://github.com/openclaw/openclaw/blob/main/docs/install/kubernetes.md#local-testing-with-kind
Step 1: Deploy OpenClaw gateway (optional — skip if you have one)
Step 2: Deploy MLflow tracking server (optional — skip if you have one)
Step 3: Run evals via sidecar (add/remove to OpenClaw deployment)
Usage:
./scripts/k8s/deploy.sh Full deploy (steps 1+2+3)
./scripts/k8s/deploy.sh --openclaw-only Step 1: OpenClaw only
./scripts/k8s/deploy.sh --mlflow-only Step 2: MLflow only
./scripts/k8s/deploy.sh --add-sidecar Step 3: add eval sidecar (starts eval)
./scripts/k8s/deploy.sh --remove-sidecar Step 3: remove eval sidecar
./scripts/k8s/deploy.sh --logs Tail clawbench sidecar logs
./scripts/k8s/deploy.sh --teardown Delete eval namespace (keeps MLflow)
Required environment:
CLAWBENCH_NAMESPACE Namespace for OpenClaw + eval
OPENAI_API_KEY Model provider API key (or ANTHROPIC_API_KEY, etc.)
Optional environment:
CLAWBENCH_IMAGE Clawbench image (default: quay.io/sallyom/clawbench:latest)
OPENCLAW_IMAGE OpenClaw image (default: ghcr.io/openclaw/openclaw:latest)
OPENCLAW_GATEWAY_TOKEN Existing gateway token (generated if unset)
CLAWBENCH_MODEL Model to eval (default: openai/gpt-5.5)
MLFLOW_NAMESPACE MLflow namespace (default: mlflow)
MLFLOW_TRACKING_URI External MLflow URI (skips MLflow deploy)
MLFLOW_EXPERIMENT_ID MLflow experiment ID
MLFLOW_EXPERIMENT_NAME MLflow experiment name
MLFLOW_IMAGE MLflow image (default: ghcr.io/mlflow/mlflow:v2.21.3)
ANTHROPIC_API_KEY Anthropic key (added to secret if set)
OPENROUTER_API_KEY OpenRouter key (added to secret if set)
GEMINI_API_KEY Gemini key (added to secret if set)
Works on Kubernetes and OpenShift.
HELP
exit 0
fi
command -v kubectl &>/dev/null || { echo "Missing: kubectl" >&2; exit 1; }
if [[ -z "$NS" ]]; then
echo "CLAWBENCH_NAMESPACE is required." >&2
echo " export CLAWBENCH_NAMESPACE=clawbench-eval" >&2
exit 1
fi
MODE="full"
while [[ $# -gt 0 ]]; do
case "$1" in
--openclaw-only) MODE="openclaw-only" ;;
--mlflow-only) MODE="mlflow-only" ;;
--add-sidecar) MODE="add-sidecar" ;;
--remove-sidecar) MODE="remove-sidecar" ;;
--logs) MODE="logs" ;;
--teardown) MODE="teardown" ;;
*) echo "Unknown option: $1" >&2; exit 1 ;;
esac
shift
done
kubectl cluster-info &>/dev/null || { echo "Cannot connect to cluster. Check kubeconfig." >&2; exit 1; }
# ---------------------------------------------------------------------------
# --logs
# ---------------------------------------------------------------------------
if [[ "$MODE" == "logs" ]]; then
kubectl logs deploy/openclaw -c clawbench -n "$NS" -f
exit 0
fi
# ---------------------------------------------------------------------------
# --teardown
# ---------------------------------------------------------------------------
if [[ "$MODE" == "teardown" ]]; then
echo "Deleting namespace '$NS'..."
kubectl delete namespace "$NS" --ignore-not-found
echo "Done. MLflow namespace '$MLFLOW_NS' was not deleted."
exit 0
fi
# ---------------------------------------------------------------------------
# --remove-sidecar
# ---------------------------------------------------------------------------
if [[ "$MODE" == "remove-sidecar" ]]; then
echo "Removing clawbench sidecar from openclaw in namespace '$NS'..."
INDEX=$(kubectl get deploy/openclaw -n "$NS" -o json \
| python3 -c "import json,sys; cs=json.load(sys.stdin)['spec']['template']['spec']['containers']; print(next((i for i,c in enumerate(cs) if c['name']=='clawbench'),-1))")
if [[ "$INDEX" == "-1" ]]; then
echo "No clawbench sidecar found."
else
kubectl patch deploy/openclaw -n "$NS" --type=json \
-p "[{\"op\":\"remove\",\"path\":\"/spec/template/spec/containers/$INDEX\"}]"
echo "Sidecar removed."
fi
exit 0
fi
# ---------------------------------------------------------------------------
# Create namespace + secret
# ---------------------------------------------------------------------------
ensure_namespace_and_secret() {
if ! kubectl get namespace "$NS" &>/dev/null; then
echo "Creating namespace '$NS'..."
kubectl create namespace "$NS"
fi
if ! kubectl get secret clawbench-secrets -n "$NS" &>/dev/null; then
echo "Creating clawbench-secrets..."
if [[ -n "${OPENCLAW_GATEWAY_TOKEN:-}" ]]; then
GATEWAY_TOKEN="$OPENCLAW_GATEWAY_TOKEN"
GATEWAY_TOKEN_SOURCE="from OPENCLAW_GATEWAY_TOKEN"
else
GATEWAY_TOKEN=$(python3 -c "import secrets,base64; print(base64.b64encode(secrets.token_bytes(32)).decode())")
GATEWAY_TOKEN_SOURCE="generated"
fi
SECRET_ARGS=(
--from-literal=OPENCLAW_GATEWAY_TOKEN="$GATEWAY_TOKEN"
)
[[ -n "${OPENAI_API_KEY:-}" ]] && SECRET_ARGS+=(--from-literal=OPENAI_API_KEY="$OPENAI_API_KEY")
[[ -n "${ANTHROPIC_API_KEY:-}" ]] && SECRET_ARGS+=(--from-literal=ANTHROPIC_API_KEY="$ANTHROPIC_API_KEY")
[[ -n "${OPENROUTER_API_KEY:-}" ]] && SECRET_ARGS+=(--from-literal=OPENROUTER_API_KEY="$OPENROUTER_API_KEY")
[[ -n "${GEMINI_API_KEY:-}" ]] && SECRET_ARGS+=(--from-literal=GEMINI_API_KEY="$GEMINI_API_KEY")
if [[ ${#SECRET_ARGS[@]} -eq 1 ]]; then
echo "Warning: No API keys provided. Set OPENAI_API_KEY or another provider key." >&2
fi
kubectl create secret generic clawbench-secrets -n "$NS" "${SECRET_ARGS[@]}"
echo " Gateway token: $GATEWAY_TOKEN_SOURCE"
[[ -n "${OPENAI_API_KEY:-}" ]] && echo " OPENAI_API_KEY: set"
[[ -n "${ANTHROPIC_API_KEY:-}" ]] && echo " ANTHROPIC_API_KEY: set"
[[ -n "${OPENROUTER_API_KEY:-}" ]] && echo " OPENROUTER_API_KEY: set"
[[ -n "${GEMINI_API_KEY:-}" ]] && echo " GEMINI_API_KEY: set"
else
echo "Secret clawbench-secrets already exists in '$NS'."
fi
return 0
}
# ---------------------------------------------------------------------------
# Step 1: Deploy OpenClaw
# ---------------------------------------------------------------------------
deploy_openclaw() {
echo ""
echo "Step 1: Deploying OpenClaw gateway (image: $OPENCLAW_IMG)..."
kubectl apply -f "$SCRIPT_DIR/openclaw/configmap.yaml" -n "$NS"
# Patch gateway config with custom OpenAI-compatible base URL
if [[ -n "${OPENAI_API_BASE:-}" ]]; then
echo " Patching gateway config: models.providers.openai.baseUrl = $OPENAI_API_BASE"
EXISTING_JSON=$(kubectl get configmap openclaw-config -n "$NS" -o jsonpath='{.data.openclaw\.json}')
PATCHED_JSON=$(echo "$EXISTING_JSON" | python3 -c "
import json, sys, os
cfg = json.load(sys.stdin)
openai_cfg = cfg.setdefault('models', {}).setdefault('providers', {}).setdefault('openai', {})
openai_cfg['baseUrl'] = os.environ['OPENAI_API_BASE']
openai_cfg.setdefault('models', [])
json.dump(cfg, sys.stdout, indent=2)
")
kubectl create configmap openclaw-config -n "$NS" \
--from-literal="openclaw.json=$PATCHED_JSON" \
--dry-run=client -o yaml | kubectl apply -f - -n "$NS" >/dev/null
fi
kubectl apply -f "$SCRIPT_DIR/openclaw/pvc.yaml" -n "$NS"
kubectl apply -f "$SCRIPT_DIR/openclaw/service.yaml" -n "$NS"
kubectl apply -f "$SCRIPT_DIR/openclaw/deployment.yaml" -n "$NS"
if [[ "$OPENCLAW_IMG" != "ghcr.io/openclaw/openclaw:latest" ]]; then
kubectl set image "deploy/openclaw" "gateway=$OPENCLAW_IMG" -n "$NS"
fi
echo "Waiting for OpenClaw rollout..."
kubectl rollout status deploy/openclaw -n "$NS" --timeout=180s || \
echo " (rollout still in progress)"
echo "OpenClaw deployed."
}
# ---------------------------------------------------------------------------
# Step 2: Deploy MLflow
# ---------------------------------------------------------------------------
deploy_mlflow() {
if [[ -n "${MLFLOW_TRACKING_URI:-}" ]]; then
echo ""
echo "Step 2: Skipping MLflow deploy (MLFLOW_TRACKING_URI is set: $MLFLOW_TRACKING_URI)"
return
fi
echo ""
echo "Step 2: Deploying MLflow (namespace: $MLFLOW_NS, image: $MLFLOW_IMG)..."
if ! kubectl get namespace "$MLFLOW_NS" &>/dev/null; then
kubectl create namespace "$MLFLOW_NS"
fi
kubectl apply -f "$SCRIPT_DIR/mlflow/pvc.yaml" -n "$MLFLOW_NS"
kubectl apply -f "$SCRIPT_DIR/mlflow/service.yaml" -n "$MLFLOW_NS"
kubectl apply -f "$SCRIPT_DIR/mlflow/deployment.yaml" -n "$MLFLOW_NS"
if [[ "$MLFLOW_IMG" != "ghcr.io/mlflow/mlflow:v2.21.3" ]]; then
kubectl set image "deploy/mlflow" "mlflow=$MLFLOW_IMG" -n "$MLFLOW_NS"
fi
echo "Waiting for MLflow rollout..."
kubectl rollout status deploy/mlflow -n "$MLFLOW_NS" --timeout=120s || \
echo " (rollout still in progress)"
MLFLOW_TRACKING_URI="http://mlflow-service.${MLFLOW_NS}.svc.cluster.local:5000"
echo "MLflow deployed: $MLFLOW_TRACKING_URI"
}
# ---------------------------------------------------------------------------
# Step 3: Add clawbench sidecar (starts eval)
# ---------------------------------------------------------------------------
add_sidecar() {
echo ""
echo "Step 3: Adding clawbench eval sidecar..."
echo "Applying clawbench ConfigMap..."
kubectl apply -f "$SCRIPT_DIR/manifests/configmap.yaml" -n "$NS" >/dev/null
if [[ -n "${CLAWBENCH_MODEL:-}" ]]; then
kubectl patch configmap clawbench-config -n "$NS" \
--type merge -p "{\"data\":{\"CLAWBENCH_MODEL\":\"$CLAWBENCH_MODEL\"}}" >/dev/null
echo " Model: $CLAWBENCH_MODEL"
fi
if [[ -n "${OPENAI_API_BASE:-}" ]]; then
kubectl patch configmap clawbench-config -n "$NS" \
--type merge -p "{\"data\":{\"OPENAI_API_BASE\":\"$OPENAI_API_BASE\"}}" >/dev/null
echo " OpenAI API base: $OPENAI_API_BASE"
fi
# Patch MLflow settings into ConfigMap
PATCH_DATA=""
MLFLOW_URI="${MLFLOW_TRACKING_URI:-http://mlflow-service.${MLFLOW_NS}.svc.cluster.local:5000}"
PATCH_DATA="\"MLFLOW_TRACKING_URI\":\"$MLFLOW_URI\""
if [[ -n "${MLFLOW_EXPERIMENT_ID:-}" ]]; then
PATCH_DATA="$PATCH_DATA,\"MLFLOW_EXPERIMENT_ID\":\"$MLFLOW_EXPERIMENT_ID\""
fi
if [[ -n "${MLFLOW_EXPERIMENT_NAME:-}" ]]; then
PATCH_DATA="$PATCH_DATA,\"MLFLOW_EXPERIMENT_NAME\":\"$MLFLOW_EXPERIMENT_NAME\""
fi
kubectl patch configmap clawbench-config -n "$NS" \
--type merge -p "{\"data\":{$PATCH_DATA}}" >/dev/null
echo " MLflow URI: $MLFLOW_URI"
[[ -n "${MLFLOW_EXPERIMENT_ID:-}" ]] && echo " MLflow experiment ID: $MLFLOW_EXPERIMENT_ID"
[[ -n "${MLFLOW_EXPERIMENT_NAME:-}" ]] && echo " MLflow experiment name: $MLFLOW_EXPERIMENT_NAME"
# Check if sidecar already exists
HAS_SIDECAR=$(kubectl get deploy/openclaw -n "$NS" -o json \
| python3 -c "import json,sys; cs=json.load(sys.stdin)['spec']['template']['spec']['containers']; print('yes' if any(c['name']=='clawbench' for c in cs) else 'no')")
if [[ "$HAS_SIDECAR" == "yes" ]]; then
echo "Removing existing clawbench sidecar..."
INDEX=$(kubectl get deploy/openclaw -n "$NS" -o json \
| python3 -c "import json,sys; cs=json.load(sys.stdin)['spec']['template']['spec']['containers']; print(next(i for i,c in enumerate(cs) if c['name']=='clawbench'))")
kubectl patch deploy/openclaw -n "$NS" --type=json \
-p "[{\"op\":\"remove\",\"path\":\"/spec/template/spec/containers/$INDEX\"}]" >/dev/null
fi
# Find the OpenClaw home volume, and capture existing volumes so add-sidecar
# also works with bring-your-own deployments that lack this repo's PVC layout.
VOLUME_INFO=$(kubectl get deploy/openclaw -n "$NS" -o json \
| python3 -c "
import json, sys
spec = json.load(sys.stdin)['spec']['template']['spec']
volume_names = [v.get('name') for v in spec.get('volumes', []) if v.get('name')]
home_volume = 'openclaw-home'
for c in spec['containers']:
    if c['name'] == 'gateway':
        for vm in c.get('volumeMounts', []):
            if vm['mountPath'] == '/home/node/.openclaw':
                home_volume = vm['name']
                break
print(json.dumps({
    'home_volume': home_volume,
    'volumes_present': 'volumes' in spec,
    'volume_names': volume_names,
}))
")
echo "Adding clawbench sidecar (image: $CLAWBENCH_IMG)..."
PATCH=$(VOLUME_INFO="$VOLUME_INFO" CLAWBENCH_IMG="$CLAWBENCH_IMG" python3 - <<'PY'
import json
import os
info = json.loads(os.environ["VOLUME_INFO"])
home_volume = info["home_volume"]
command = r"""echo "Waiting for gateway on localhost:18789..."
for i in $(seq 1 90); do
python3 -c "import socket; s=socket.create_connection((\"127.0.0.1\",18789),2); s.close()" 2>/dev/null && echo "Gateway ready" && break
sleep 2
done
if [ -n "${MLFLOW_TRACKING_URI:-}" ]; then
echo "Checking MLflow at ${MLFLOW_TRACKING_URI}..."
python3 -c "import httpx,os; r=httpx.get(os.environ[\"MLFLOW_TRACKING_URI\"]+\"/health\"); print(\"MLflow OK:\",r.status_code)" 2>&1 || echo "MLflow pre-check failed (will retry at log time)"
fi
echo "Starting eval..."
clawbench run \
--model "${CLAWBENCH_MODEL}" \
--gateway-token "${OPENCLAW_GATEWAY_TOKEN}" \
--runs "${CLAWBENCH_RUNS}" \
--concurrency "${CLAWBENCH_CONCURRENCY}" \
${CLAWBENCH_JUDGE_MODEL:+--judge-model "${CLAWBENCH_JUDGE_MODEL}"} \
$([ -n "${CLAWBENCH_TASKS:-}" ] && for t in ${CLAWBENCH_TASKS}; do printf -- "-t %s " "$t"; done) \
-o /results/benchmark.json
RC=$?
if [ $RC -eq 0 ] && [ -n "${MLFLOW_TRACKING_URI:-}" ]; then
python3 scripts/log_to_mlflow.py /results/benchmark.json
fi
echo "ClawBench finished (exit=$RC)"
sleep infinity"""
container = {
"name": "clawbench",
"image": os.environ["CLAWBENCH_IMG"],
"imagePullPolicy": "IfNotPresent",
"command": ["/bin/bash", "-c", command],
"envFrom": [{"configMapRef": {"name": "clawbench-config"}}],
"env": [
{
"name": "OPENCLAW_GATEWAY_TOKEN",
"valueFrom": {
"secretKeyRef": {
"name": "clawbench-secrets",
"key": "OPENCLAW_GATEWAY_TOKEN",
}
},
}
],
"resources": {
"requests": {"memory": "1Gi", "cpu": "500m"},
"limits": {"memory": "4Gi", "cpu": "2"},
},
"volumeMounts": [
{"name": home_volume, "mountPath": "/home/node/.openclaw"},
{"name": "clawbench-results", "mountPath": "/results"},
{"name": "tmp-volume", "mountPath": "/tmp"},
],
"securityContext": {
"allowPrivilegeEscalation": False,
"capabilities": {"drop": ["ALL"]},
},
}
patch = [{"op": "add", "path": "/spec/template/spec/containers/-", "value": container}]
existing_volumes = set(info["volume_names"])
required_volumes = [
{"name": home_volume, "emptyDir": {}},
{"name": "clawbench-results", "emptyDir": {}},
{"name": "tmp-volume", "emptyDir": {}},
]
missing_volumes = []
for volume in required_volumes:
    if volume["name"] not in existing_volumes and volume["name"] not in {
        item["name"] for item in missing_volumes
    }:
        missing_volumes.append(volume)
if missing_volumes:
    if info["volumes_present"]:
        patch.extend(
            {"op": "add", "path": "/spec/template/spec/volumes/-", "value": volume}
            for volume in missing_volumes
        )
    else:
        patch.append(
            {"op": "add", "path": "/spec/template/spec/volumes", "value": missing_volumes}
        )
print(json.dumps(patch))
PY
)
kubectl patch deploy/openclaw -n "$NS" --type=json -p "$PATCH" >/dev/null
echo ""
echo "Waiting for rollout..."
kubectl rollout status deploy/openclaw -n "$NS" --timeout=300s 2>/dev/null || \
echo " (rollout timeout — eval runs for 30-60 min)"
echo ""
echo "Eval is running. Follow logs with:"
echo " ./scripts/k8s/deploy.sh --logs"
echo ""
echo "When finished, remove the sidecar with:"
echo " ./scripts/k8s/deploy.sh --remove-sidecar"
}
# ---------------------------------------------------------------------------
# Execute
# ---------------------------------------------------------------------------
case "$MODE" in
full)
ensure_namespace_and_secret
deploy_openclaw
deploy_mlflow
add_sidecar
;;
openclaw-only)
ensure_namespace_and_secret
deploy_openclaw
echo ""
echo "OpenClaw is running. Next steps:"
echo " ./scripts/k8s/deploy.sh --mlflow-only # Deploy MLflow"
echo " ./scripts/k8s/deploy.sh --add-sidecar # Start eval"
;;
mlflow-only)
deploy_mlflow
;;
add-sidecar)
if ! kubectl get deploy/openclaw -n "$NS" &>/dev/null; then
echo "Deployment 'openclaw' not found in namespace '$NS'." >&2
echo "Deploy OpenClaw first with: ./scripts/k8s/deploy.sh --openclaw-only" >&2
exit 1
fi
ensure_namespace_and_secret
add_sidecar
;;
esac
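
The ConfigMap patches in `add_sidecar` interpolate environment values directly into a JSON string, so a value containing a double quote or a backslash would yield an invalid patch body. A minimal sketch of building the same merge patch with `json.dumps` instead; the helper name `configmap_merge_patch` is hypothetical and not part of the script:

```python
import json


def configmap_merge_patch(data: dict[str, str]) -> str:
    """Return a body suitable for `kubectl patch configmap --type merge -p`.

    Hypothetical helper: empty values are skipped, mirroring the script's
    `[[ -n ... ]]` guards, and json.dumps takes care of quoting and escaping.
    """
    filtered = {key: value for key, value in data.items() if value}
    return json.dumps({"data": filtered})


if __name__ == "__main__":
    # Illustrative values only; the quote in the model name is deliberate.
    print(
        configmap_merge_patch(
            {
                "CLAWBENCH_MODEL": 'vendor/"quoted"-model',
                "MLFLOW_EXPERIMENT_ID": "",
            }
        )
    )
```

The resulting string could be passed to `kubectl patch ... --type merge -p "$PATCH"` in the same way the script already passes its sidecar patch.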

@ -0,0 +1,18 @@
apiVersion: v1
kind: ConfigMap
metadata:
  name: clawbench-config
  labels:
    app: clawbench
data:
  CLAWBENCH_MODEL: "openai/gpt-5.5"
  OPENAI_API_BASE: ""
  CLAWBENCH_RUNS: "3"
  CLAWBENCH_CONCURRENCY: "4"
  CLAWBENCH_JUDGE_MODEL: ""
  CLAWBENCH_TASKS: ""
  CLAWBENCH_CONNECT_TIMEOUT: "120"
  CLAWBENCH_REQUEST_TIMEOUT: "300"
  CLAWBENCH_PER_RUN_BUDGET_SECONDS: "600"
  MLFLOW_TRACKING_URI: "http://mlflow-service.mlflow.svc.cluster.local:5000"
  MLFLOW_EXPERIMENT_NAME: "clawbench"
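
`CLAWBENCH_TASKS` defaults to an empty string, in which case the sidecar command passes no `-t` flags. A non-empty value is word-split by the shell and each entry becomes its own `-t <task>` pair. A rough Python equivalent of that expansion, with `task_flags` as a hypothetical name used only for illustration:

```python
def task_flags(raw: str) -> list[str]:
    """Expand a whitespace-separated task list into repeated `-t <task>` flags."""
    flags: list[str] = []
    for task in raw.split():  # str.split() with no argument splits on any whitespace
        flags.extend(["-t", task])
    return flags


if __name__ == "__main__":
    # Task names here are placeholders, not real ClawBench task IDs.
    print(task_flags("task-one task-two"))
```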

@ -0,0 +1,15 @@
# Reference template — do NOT apply directly.
# The deploy script (scripts/k8s/deploy.sh) creates this secret automatically
# from exported environment variables (OPENAI_API_KEY, etc.).
apiVersion: v1
kind: Secret
metadata:
  name: clawbench-secrets
  labels:
    app: clawbench
type: Opaque
stringData:
  OPENAI_API_KEY: "REPLACE_ME"
  # Add other provider keys as needed:
  # ANTHROPIC_API_KEY: "REPLACE_ME"
  # OPENROUTER_API_KEY: "REPLACE_ME"

@ -0,0 +1,68 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mlflow
  labels:
    app: mlflow
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: mlflow
  template:
    metadata:
      labels:
        app: mlflow
    spec:
      containers:
        - name: mlflow
          image: ghcr.io/mlflow/mlflow:v2.21.3
          command:
            - mlflow
            - server
            - --host
            - "0.0.0.0"
            - --port
            - "5000"
            - --backend-store-uri
            - sqlite:///mlflow/mlflow.db
            - --default-artifact-root
            - /mlflow/artifacts
            - --serve-artifacts
          ports:
            - name: http
              containerPort: 5000
              protocol: TCP
          livenessProbe:
            httpGet:
              path: /health
              port: 5000
            initialDelaySeconds: 15
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /health
              port: 5000
            initialDelaySeconds: 5
            periodSeconds: 10
          resources:
            requests:
              cpu: 100m
              memory: 256Mi
            limits:
              cpu: 500m
              memory: 1Gi
          securityContext:
            allowPrivilegeEscalation: false
            capabilities:
              drop:
                - ALL
          volumeMounts:
            - name: mlflow-data
              mountPath: /mlflow
      volumes:
        - name: mlflow-data
          persistentVolumeClaim:
            claimName: mlflow-data-pvc

@ -0,0 +1,12 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: mlflow-data-pvc
  labels:
    app: mlflow
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi

@ -0,0 +1,15 @@
apiVersion: v1
kind: Service
metadata:
  name: mlflow-service
  labels:
    app: mlflow
spec:
  type: ClusterIP
  selector:
    app: mlflow
  ports:
    - name: http
      port: 5000
      targetPort: 5000
      protocol: TCP

@ -0,0 +1,36 @@
apiVersion: v1
kind: ConfigMap
metadata:
  name: openclaw-config
  labels:
    app: openclaw
data:
  openclaw.json: |
    {
      "gateway": {
        "mode": "local",
        "bind": "loopback",
        "port": 18789,
        "auth": {
          "mode": "token"
        }
      },
      "browser": {
        "enabled": true,
        "headless": true,
        "noSandbox": true,
        "ssrfPolicy": {
          "allowedHostnames": ["localhost", "127.0.0.1"]
        }
      },
      "tools": {
        "profile": "coding",
        "alsoAllow": ["browser"]
      },
      "agents": {
        "defaults": {
          "workspace": "~/.openclaw/workspace"
        }
      },
      "cron": { "enabled": false }
    }

@ -0,0 +1,146 @@
# OpenClaw gateway deployment for ClawBench evals.
#
# Build the image with browser support:
# docker build --build-arg OPENCLAW_INSTALL_BROWSER=1 \
# -t quay.io/yourorg/openclaw:eval .
#
# Or use upstream without browser (browser eval tasks will score 0):
# image: ghcr.io/openclaw/openclaw:latest
apiVersion: apps/v1
kind: Deployment
metadata:
  name: openclaw
  labels:
    app: openclaw
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: openclaw
  template:
    metadata:
      labels:
        app: openclaw
    spec:
      initContainers:
        - name: init-config
          image: registry.access.redhat.com/ubi9-minimal:latest
          command:
            - sh
            - -c
            - |
              cp /config/openclaw.json /home/node/.openclaw/openclaw.json
              chmod 666 /home/node/.openclaw/openclaw.json
              mkdir -p /home/node/.openclaw/workspace
              mkdir -p /home/node/.openclaw/agents
              chmod 777 /home/node/.openclaw /home/node/.openclaw/workspace /home/node/.openclaw/agents
              echo "Config initialized"
          volumeMounts:
            - name: openclaw-home
              mountPath: /home/node/.openclaw
            - name: config-template
              mountPath: /config
          resources:
            limits:
              cpu: 200m
              memory: 128Mi
            requests:
              cpu: 50m
              memory: 64Mi
      containers:
        - name: gateway
          image: ghcr.io/openclaw/openclaw:latest
          imagePullPolicy: IfNotPresent
          command:
            - sh
            - -c
            - umask 007 && exec node dist/index.js gateway run --bind loopback --port 18789 --allow-unconfigured
          env:
            - name: HOME
              value: /home/node
            - name: NODE_ENV
              value: production
            - name: OPENCLAW_CONFIG_DIR
              value: /home/node/.openclaw
            - name: OPENCLAW_STATE_DIR
              value: /home/node/.openclaw
            - name: OPENCLAW_GATEWAY_TOKEN
              valueFrom:
                secretKeyRef:
                  name: clawbench-secrets
                  key: OPENCLAW_GATEWAY_TOKEN
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: clawbench-secrets
                  key: OPENAI_API_KEY
                  optional: true
            - name: ANTHROPIC_API_KEY
              valueFrom:
                secretKeyRef:
                  name: clawbench-secrets
                  key: ANTHROPIC_API_KEY
                  optional: true
            - name: OPENROUTER_API_KEY
              valueFrom:
                secretKeyRef:
                  name: clawbench-secrets
                  key: OPENROUTER_API_KEY
                  optional: true
            - name: GEMINI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: clawbench-secrets
                  key: GEMINI_API_KEY
                  optional: true
          ports:
            - name: gateway
              containerPort: 18789
              protocol: TCP
          livenessProbe:
            exec:
              command:
                - node
                - -e
                - "require('http').get('http://127.0.0.1:18789/',r=>process.exit(r.statusCode<400?0:1)).on('error',()=>process.exit(1))"
            initialDelaySeconds: 60
            periodSeconds: 30
            timeoutSeconds: 10
          readinessProbe:
            exec:
              command:
                - node
                - -e
                - "require('http').get('http://127.0.0.1:18789/',r=>process.exit(r.statusCode<400?0:1)).on('error',()=>process.exit(1))"
            initialDelaySeconds: 30
            periodSeconds: 10
            timeoutSeconds: 5
          resources:
            requests:
              cpu: 250m
              memory: 1Gi
            limits:
              cpu: "2"
              memory: 4Gi
          securityContext:
            allowPrivilegeEscalation: false
            capabilities:
              drop:
                - ALL
          volumeMounts:
            - name: openclaw-home
              mountPath: /home/node/.openclaw
            - name: tmp-volume
              mountPath: /tmp
      terminationGracePeriodSeconds: 30
      volumes:
        - name: openclaw-home
          persistentVolumeClaim:
            claimName: openclaw-home-pvc
        - name: config-template
          configMap:
            name: openclaw-config
        - name: tmp-volume
          emptyDir: {}

@ -0,0 +1,12 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: openclaw-home-pvc
  labels:
    app: openclaw
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi

@ -0,0 +1,17 @@
# Reference template — do NOT apply directly.
# The deploy script (scripts/k8s/deploy.sh) creates this secret automatically
# from exported environment variables (OPENAI_API_KEY, etc.).
apiVersion: v1
kind: Secret
metadata:
  name: clawbench-secrets
  labels:
    app: openclaw
type: Opaque
stringData:
  OPENCLAW_GATEWAY_TOKEN: "REPLACE_ME"
  OPENAI_API_KEY: "REPLACE_ME"
  # Add other provider keys as needed:
  # ANTHROPIC_API_KEY: "REPLACE_ME"
  # OPENROUTER_API_KEY: "REPLACE_ME"
  # GEMINI_API_KEY: "REPLACE_ME"

@ -0,0 +1,15 @@
apiVersion: v1
kind: Service
metadata:
  name: openclaw
  labels:
    app: openclaw
spec:
  type: ClusterIP
  selector:
    app: openclaw
  ports:
    - name: gateway
      port: 18789
      targetPort: 18789
      protocol: TCP

scripts/log_to_mlflow.py

@ -0,0 +1,125 @@
#!/usr/bin/env python3
"""Log a ClawBench BenchmarkResult to MLflow.

Standalone script -- not imported by the clawbench package.
Requires: pip install mlflow (or pip install clawbench[mlflow])

Usage:
    python scripts/log_to_mlflow.py /results/benchmark.json

Environment:
    MLFLOW_TRACKING_URI     MLflow tracking server (default: http://localhost:5000)
    MLFLOW_EXPERIMENT_ID    Experiment ID; takes precedence over MLFLOW_EXPERIMENT_NAME
    MLFLOW_EXPERIMENT_NAME  Experiment name (default: clawbench)
"""
from __future__ import annotations

import json
import os
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).resolve().parent.parent))


def main(result_path: str) -> None:
    try:
        import mlflow
    except ImportError:
        print(
            "mlflow is not installed. Install with: pip install mlflow"
            " (or pip install clawbench[mlflow])",
            file=sys.stderr,
        )
        sys.exit(1)
    from clawbench.schemas import BenchmarkResult

    with open(result_path, encoding="utf-8") as f:
        result = BenchmarkResult(**json.load(f))
    # An explicit experiment ID wins over the experiment name.
    experiment_id = os.environ.get("MLFLOW_EXPERIMENT_ID")
    if experiment_id:
        experiment = mlflow.set_experiment(experiment_id=experiment_id)
    else:
        experiment = mlflow.set_experiment(os.environ.get("MLFLOW_EXPERIMENT_NAME", "clawbench"))
    run_name = f"{result.model}-{result.submission_id[:8]}"
    with mlflow.start_run(run_name=run_name):
        mlflow.log_params(
            {
                "model": result.model,
                "provider": result.provider,
                "benchmark_version": result.benchmark_version,
                "openclaw_version": result.openclaw_version or "unknown",
                "judge_model": result.judge_model or "none",
                "task_snapshot_fingerprint": result.task_snapshot_fingerprint or "unknown",
            }
        )
        mlflow.log_metrics(
            {
                "overall_score": result.overall_score,
                "overall_completion": result.overall_completion,
                "overall_trajectory": result.overall_trajectory,
                "overall_behavior": result.overall_behavior,
                "overall_reliability": result.overall_reliability,
                "overall_pass_hat_k": result.overall_pass_hat_k,
                "overall_judge_score": result.overall_judge_score,
                "overall_judge_confidence": result.overall_judge_confidence,
                "overall_judge_pass_rate": result.overall_judge_pass_rate,
                "judge_task_coverage": result.judge_task_coverage,
                "overall_weighted_query_score": result.overall_weighted_query_score,
                "overall_median_latency_ms": result.overall_median_latency_ms,
                "overall_p95_latency_ms": result.overall_p95_latency_ms,
                "overall_total_tokens": result.overall_total_tokens,
                "overall_cost_usd": result.overall_cost_usd,
                "overall_tokens_per_pass": result.overall_tokens_per_pass,
                "overall_cost_per_pass": result.overall_cost_per_pass,
                "overall_ci_lower": result.overall_ci_lower,
                "overall_ci_upper": result.overall_ci_upper,
            }
        )
        # Per-tier aggregates.
        for tier in result.tier_results:
            mlflow.log_metrics(
                {
                    f"{tier.tier}/score": tier.mean_task_score,
                    f"{tier.tier}/completion": tier.mean_completion,
                    f"{tier.tier}/trajectory": tier.mean_trajectory,
                    f"{tier.tier}/behavior": tier.mean_behavior,
                    f"{tier.tier}/reliability": tier.mean_reliability,
                }
            )
        # Per-task scores, using the task's position in the result as the step.
        for i, task in enumerate(result.task_results):
            mlflow.log_metrics(
                {
                    f"task/{task.task_id}/score": task.mean_task_score,
                    f"task/{task.task_id}/reliability": task.reliability_score,
                },
                step=i,
            )
        mlflow.set_tags(
            {
                "submission_id": result.submission_id,
                "timestamp": result.timestamp,
                "certified": str(result.certified),
            }
        )
        # Artifact upload is best-effort; metrics and params are already logged.
        try:
            mlflow.log_artifact(result_path)
        except Exception as e:
            print(f"Warning: artifact upload failed: {e}", file=sys.stderr)
            print("Metrics and params were logged successfully.", file=sys.stderr)
    print(f"Logged to MLflow: experiment={experiment.name} run={run_name}")


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print(f"Usage: {sys.argv[0]} <result.json>", file=sys.stderr)
        sys.exit(1)
    main(sys.argv[1])
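
`mlflow.log_metrics` expects numeric values, so any optional field that is unset in a result would need to be dropped before logging. Whether `BenchmarkResult` actually allows `None` for fields such as the judge metrics is an assumption here, since the schema is not shown. A minimal sketch of such a filter, with `numeric_metrics` as a hypothetical helper that is not part of the script:

```python
from typing import Any


def numeric_metrics(raw: dict[str, Any]) -> dict[str, float]:
    """Keep only int/float values and coerce them to float.

    Hypothetical helper: None, strings, and booleans are skipped so the
    result can be handed to a metrics logger that requires numbers.
    """
    cleaned: dict[str, float] = {}
    for name, value in raw.items():
        # bool is a subclass of int, so exclude it explicitly.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            continue
        cleaned[name] = float(value)
    return cleaned


if __name__ == "__main__":
    # Illustrative values only, not real benchmark output.
    print(numeric_metrics({"overall_score": 0.8, "overall_judge_score": None}))
```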

@ -5,13 +5,23 @@ from __future__ import annotations
import os
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from urllib.parse import unquote, urlsplit
ROOT = Path(__file__).parent / "articles"
ARTICLES = {path.stem: path for path in ROOT.glob("*.html") if path.is_file()}
def article_for_request_path(request_path: str) -> Path | None:
path = unquote(urlsplit(request_path).path)
if not path.startswith("/article/"):
return None
slug = path.removeprefix("/article/")
return ARTICLES.get(slug)
class Handler(BaseHTTPRequestHandler):
def do_GET(self) -> None: # noqa: N802
path = self.path.split("?")[0]
path = unquote(urlsplit(self.path).path)
if path == "/health":
self.send_response(200)
self.send_header("Content-Type", "application/json")
@ -22,9 +32,8 @@ class Handler(BaseHTTPRequestHandler):
self._index()
return
if path.startswith("/article/"):
slug = path.split("/", 2)[2]
article = ROOT / f"{slug}.html"
if article.exists():
article = article_for_request_path(self.path)
if article is not None:
self._html(article.read_bytes())
return
self.send_response(404)
@ -33,8 +42,7 @@ class Handler(BaseHTTPRequestHandler):
def _index(self) -> None:
items = []
for f in sorted(ROOT.glob("*.html")):
slug = f.stem
for slug in sorted(ARTICLES):
items.append(f'<li><a href="/article/{slug}">{slug}</a></li>')
body = (
"<!doctype html><html><body>"

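The server change above replaces a filesystem lookup built from the request path with a lookup in a slug table computed at import time, so a request can only resolve to a file that was discovered up front. A self-contained sketch of that behavior, using a hypothetical in-memory `ARTICLES` table in place of the `ROOT.glob("*.html")` result:

```python
from pathlib import Path
from typing import Optional
from urllib.parse import unquote, urlsplit

# Hypothetical slug table; the real module builds it from ROOT.glob("*.html").
ARTICLES = {"intro": Path("articles/intro.html")}


def article_for_request_path(request_path: str) -> Optional[Path]:
    """Resolve /article/<slug> to a known file, or None for anything else."""
    path = unquote(urlsplit(request_path).path)  # drop the query, decode %XX
    if not path.startswith("/article/"):
        return None
    return ARTICLES.get(path.removeprefix("/article/"))


if __name__ == "__main__":
    print(article_for_request_path("/article/intro?ref=home"))
    print(article_for_request_path("/article/..%2F..%2Fetc%2Fpasswd"))
```

Because the decoded slug is only ever used as a dictionary key, a traversal attempt such as `..%2F..%2Fetc%2Fpasswd` simply misses the table instead of being joined onto a directory path.
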
tests/test_ablation.py

@ -0,0 +1,122 @@
from clawbench.ablation import (
    common_compatible_task_set,
    compare_results,
    default_tool_profile,
)
from clawbench.adapters.hermes import HermesAdapterConfig
from clawbench.schemas import (
    BenchmarkResult,
    CompletionSpec,
    FileState,
    SimulatedUser,
    TaskDefinition,
    TaskFamily,
    TaskStats,
    Tier,
    UserTurn,
)


def _task(task_id: str) -> TaskDefinition:
    return TaskDefinition(
        id=task_id,
        name=task_id,
        tier=Tier.TIER1,
        family=TaskFamily.CODING,
        surface="coding",
        user=SimulatedUser(turns=[UserTurn(message="write out.txt")]),
        completion=CompletionSpec(files=[FileState(path="out.txt")]),
    )


def test_tool_profile_fingerprint_is_stable() -> None:
    config = HermesAdapterConfig(driver_mode="ai_agent", enabled_toolsets=["hermes-api-server"])
    a = default_tool_profile(adapter="hermes", config=config, enabled_toolsets=["hermes-api-server"])
    b = default_tool_profile(adapter="hermes", config=config, enabled_toolsets=["hermes-api-server"])
    assert a.fingerprint == b.fingerprint
    assert "browser" in a.interfaces
    assert "multi_turn" in a.interfaces


def test_common_compatible_task_set_uses_effective_adapter_config() -> None:
    tasks = [_task("a"), _task("b")]
    plan = common_compatible_task_set(
        tasks,
        {
            "openclaw": ("openclaw", None),
            "hermes": ("hermes", HermesAdapterConfig(driver_mode="ai_agent")),
        },
    )
    assert plan.task_ids == ["a", "b"]
    assert plan.skipped == {}


def _result(label: str, model: str, task_ids: list[str], score: float) -> BenchmarkResult:
    task_results = [
        TaskStats(
            task_id=task_id,
            tier="tier1",
            family="coding",
            runs=1,
            mean_completion_score=1.0,
            mean_trajectory_score=1.0,
            mean_behavior_score=1.0,
            mean_run_score=score,
            reliability_score=1.0,
            variance_score=1.0,
            mean_task_score=score,
            stddev=0.0,
            min_score=score,
            max_score=score,
            pass_at_1=True,
            pass_rate=1.0,
            pass_hat_k=True,
        )
        for task_id in task_ids
    ]
    return BenchmarkResult(
        submission_id=label,
        model=model,
        provider="test",
        timestamp="2026-04-25T00:00:00Z",
        overall_score=score,
        overall_completion=1.0,
        overall_trajectory=1.0,
        overall_behavior=1.0,
        overall_reliability=1.0,
        overall_ci_lower=score,
        overall_ci_upper=score,
        overall_pass_hat_k=1.0,
        task_results=task_results,
    )


def test_compare_results_rejects_different_task_sets() -> None:
    comparison = compare_results(
        {
            "a": _result("a", "m", ["t1", "t2"], 0.8),
            "b": _result("b", "m", ["t1"], 0.9),
        }
    )
    assert comparison["fair"] is False
    assert comparison["task_verifier_fair"] is False
    assert comparison["controlled_ablation"] is False
    assert comparison["same_model"] is True
    assert comparison["same_task_set"] is False


def test_compare_results_allows_cross_model_same_task_leaderboard() -> None:
    a = _result("a", "model-a", ["t1", "t2"], 0.8)
    b = _result("b", "model-b", ["t1", "t2"], 0.9)
    a.task_snapshot_fingerprint = "snapshot-1"
    b.task_snapshot_fingerprint = "snapshot-1"
    comparison = compare_results({"a": a, "b": b})
    assert comparison["fair"] is True
    assert comparison["task_verifier_fair"] is True
    assert comparison["controlled_ablation"] is False
    assert comparison["same_model"] is False

tests/test_adapter_base.py

@ -0,0 +1,222 @@
"""Tests for `clawbench.adapters.base` + registry.
Keeps the adapter ABC and registration helpers honest before any
concrete adapter lands. A parametrized contract test in
`test_adapter_contract.py` will exercise the ABC against every shipped
adapter later.
"""
from __future__ import annotations

from pathlib import Path

import pytest

from clawbench.adapters import (
    ADAPTERS,
    AdapterContext,
    AgentAdapter,
    PhaseResult,
    StateQueryResult,
    get_adapter,
    register_adapter,
)
from clawbench.canonical import (
    AdapterCapability,
    CanonicalPhase,
    CanonicalTask,
    StateQuery,
)
from clawbench.canonical.convert import from_task_definition
from clawbench.schemas import (
    CompletionSpec,
    ExecutionCheck,
    FileState,
    SimulatedUser,
    TaskDefinition,
    TaskFamily,
    TaskSetup,
    Tier,
    Transcript,
    UserTurn,
)


# ---------------------------------------------------------------------------
# Minimal adapter for contract verification.
# ---------------------------------------------------------------------------
class _EchoAdapter(AgentAdapter):
    name = "echo-test-adapter"
    capabilities = {AdapterCapability.FILES, AdapterCapability.EXECUTION}

    async def setup(self, ctx: AdapterContext) -> None:  # pragma: no cover - trivial
        return None

    async def run_phase(
        self, phase: CanonicalPhase, ctx: AdapterContext
    ) -> PhaseResult:
        return PhaseResult(messages=[], adapter_metadata={"phase": phase.name})

    async def verify_state_query(
        self, query: StateQuery, ctx: AdapterContext
    ) -> StateQueryResult:
        if query.required_capability in self.capabilities:
            return StateQueryResult(ok=True, detail="echo-adapter-always-ok")
        return StateQueryResult(
            ok=False,
            detail=f"echo adapter does not provide {query.required_capability.value}",
            capability_missing=True,
        )

    async def teardown(self, ctx: AdapterContext) -> None:  # pragma: no cover - trivial
        return None


# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
def test_register_adapter_adds_to_registry_and_get_adapter_resolves() -> None:
    original = dict(ADAPTERS)
    try:
        register_adapter(_EchoAdapter)
        assert ADAPTERS["echo-test-adapter"] is _EchoAdapter
        assert get_adapter("echo-test-adapter") is _EchoAdapter
    finally:
        ADAPTERS.clear()
        ADAPTERS.update(original)


def test_register_adapter_rejects_duplicate_name() -> None:
    class _OtherEcho(AgentAdapter):
        name = "echo-test-adapter"
        capabilities = {AdapterCapability.FILES}

        async def setup(self, ctx: AdapterContext) -> None:  # pragma: no cover
            return None

        async def run_phase(self, phase, ctx) -> PhaseResult:  # pragma: no cover
            return PhaseResult()

        async def verify_state_query(self, query, ctx) -> StateQueryResult:  # pragma: no cover
            return StateQueryResult(ok=False, capability_missing=True)

        async def teardown(self, ctx: AdapterContext) -> None:  # pragma: no cover
            return None

    original = dict(ADAPTERS)
    try:
        register_adapter(_EchoAdapter)
        with pytest.raises(ValueError):
            register_adapter(_OtherEcho)
    finally:
        ADAPTERS.clear()
        ADAPTERS.update(original)


def test_register_adapter_requires_name() -> None:
    class _Nameless(AgentAdapter):
        capabilities = {AdapterCapability.FILES}

        async def setup(self, ctx: AdapterContext) -> None:  # pragma: no cover
            return None

        async def run_phase(self, phase, ctx) -> PhaseResult:  # pragma: no cover
            return PhaseResult()

        async def verify_state_query(self, query, ctx) -> StateQueryResult:  # pragma: no cover
            return StateQueryResult(ok=False, capability_missing=True)

        async def teardown(self, ctx: AdapterContext) -> None:  # pragma: no cover
            return None

    with pytest.raises(ValueError):
        register_adapter(_Nameless)


def test_get_adapter_raises_for_unknown_name() -> None:
    with pytest.raises(KeyError):
        get_adapter("no-such-adapter-exists")


# ---------------------------------------------------------------------------
# Capability gating helpers
# ---------------------------------------------------------------------------
def _file_task() -> CanonicalTask:
    task = TaskDefinition(
        id="capability-test",
        name="capability test",
        tier=Tier.TIER1,
        family=TaskFamily.CODING,
        surface="coding",
        setup=TaskSetup(),
        user=SimulatedUser(
            max_turns=1, turns=[UserTurn(message="Do a thing.")]
        ),
        completion=CompletionSpec(
            files=[FileState(path="out.txt", exists=True)],
            execution_checks=[ExecutionCheck(name="ok", command="true")],
        ),
    )
    return from_task_definition(task)


def test_supports_is_true_when_capabilities_cover_task() -> None:
    task = _file_task()
    assert _EchoAdapter.supports(task)
    assert _EchoAdapter.missing_capabilities_for(task) == set()


def test_supports_is_false_when_task_needs_more() -> None:
    task = _file_task()
    task = task.model_copy(
        update={
            "required_adapter_capabilities": (
                task.required_adapter_capabilities | {AdapterCapability.MEMORY}
            )
        }
    )
    assert not _EchoAdapter.supports(task)
    assert _EchoAdapter.missing_capabilities_for(task) == {AdapterCapability.MEMORY}


# ---------------------------------------------------------------------------
# Context roundtrip (sanity: adapter methods can build and return
# PhaseResult / StateQueryResult without tripping dataclass defaults)
# ---------------------------------------------------------------------------
def test_adapter_phase_result_round_trip(tmp_path: Path) -> None:
    task = _file_task()
    adapter = _EchoAdapter()
    ctx = AdapterContext(
        task=task,
        workspace=tmp_path,
        runtime_values={},
        run_index=0,
        model="test-model",
        transcript=Transcript(),
    )
    import asyncio

    async def _go() -> None:
        await adapter.setup(ctx)
        result = await adapter.run_phase(task.phases[0], ctx)
        assert isinstance(result, PhaseResult)
        assert result.adapter_metadata == {"phase": task.phases[0].name}
        query = StateQuery(
            kind="memory",
            required_capability=AdapterCapability.MEMORY,
            selector={"key_pattern": "x"},
        )
        res = await adapter.verify_state_query(query, ctx)
        assert res.capability_missing is True
        await adapter.teardown(ctx)

    asyncio.run(_go())
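
The `supports` and `missing_capabilities_for` tests above amount to a set-difference check between what a task requires and what an adapter declares. A minimal sketch of that idea, assuming a hypothetical `Capability` enum and free functions rather than the real `AgentAdapter` classmethods, whose implementation is not shown here:

```python
from enum import Enum


class Capability(Enum):
    """Hypothetical stand-in for AdapterCapability."""

    FILES = "files"
    EXECUTION = "execution"
    MEMORY = "memory"


def missing_capabilities(required: set[Capability], provided: set[Capability]) -> set[Capability]:
    """Capabilities the task needs that the adapter does not declare."""
    return required - provided


def supports(required: set[Capability], provided: set[Capability]) -> bool:
    """An adapter supports a task when nothing required is missing."""
    return not missing_capabilities(required, provided)


if __name__ == "__main__":
    provided = {Capability.FILES, Capability.EXECUTION}
    print(supports({Capability.FILES}, provided))
    print(missing_capabilities({Capability.FILES, Capability.MEMORY}, provided))
```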

@ -20,6 +20,46 @@ def test_testbox_workflow_hydrates_secrets_and_dotfiles():
    assert "CLAWBENCH_CODEX_AUTH_JSON" in workflow


def test_crabbox_config_uses_actions_hydration():
    config = Path(".crabbox.yaml").read_text(encoding="utf-8")
    assert "profile: clawbench-check" in config
    assert "provider: aws" in config
    assert "workflow: .github/workflows/crabbox-hydrate.yml" in config
    assert "job: hydrate" in config
    assert "baseRef: main" in config
    assert "- clawbench" in config
    assert "- CLAWBENCH_*" in config
    assert "- OPENCLAW_*" in config


def test_crabbox_workflow_hydrates_secrets_dotfiles_and_ready_marker():
    workflow = Path(".github/workflows/crabbox-hydrate.yml").read_text(encoding="utf-8")
    assert "crabbox_id:" in workflow
    assert "crabbox_runner_label:" in workflow
    assert 'runs-on: [self-hosted, "${{ inputs.crabbox_runner_label }}"]' in workflow
    assert "actions/setup-python@v5" in workflow
    assert "python -m pip install -e ." in workflow
    assert "scripts/ci-hydrate-testbox-env.sh" in workflow
    assert "HF_TOKEN" in workflow
    assert "OPENCLAW_CODEX_AUTH_JSON" in workflow
    assert "CLAWBENCH_CODEX_AUTH_JSON" in workflow
    assert "/usr/local/bin/clawbench-testbox-env" in workflow
    assert "$HOME/.crabbox/actions/${{ inputs.crabbox_id }}.env" in workflow
    assert "crabbox_keep_alive_minutes" in workflow


def test_crabbox_skill_documents_clawbench_flow():
    skill = Path(".agents/skills/crabbox/SKILL.md").read_text(encoding="utf-8")
    assert "openclaw/crabbox" in skill
    assert ".crabbox.yaml" in skill
    assert "crabbox actions hydrate" in skill
    assert "clawbench-testbox-env" in skill
    assert ".github/workflows/crabbox-hydrate.yml" in skill


def test_testbox_helper_sources_hydrated_profile():
    script = Path("scripts/ci-hydrate-testbox-env.sh").read_text(encoding="utf-8")


@ -0,0 +1,268 @@
"""Tests for `clawbench.canonical.convert.from_task_definition`.
Covers the three representative task shapes:
1. A files + execution-only task (tier-1 bugfix) must produce
`required_adapter_capabilities == {FILES, EXECUTION}`.
2. A memory-using, multi-phase task (tier-2 memory roundtrip) must
include `MEMORY` but must NOT set `MULTI_TURN_INJECTION`, since each
phase's user has exactly one static turn.
3. A synthetic task exercising gateway_assertions, session, cron, and
browser must surface each capability.
The tests also round-trip the real task corpus through the converter
to make sure every live YAML file produces a valid `CanonicalTask`
(no missing-field or validation errors), since the converter is how
every downstream adapter will see tasks.
"""
from __future__ import annotations
from clawbench.canonical import (
AdapterCapability,
CanonicalTask,
from_task_definition,
)
from clawbench.schemas import (
BackgroundService,
CompletionSpec,
CronState,
ExecutionCheck,
FileState,
GatewayAssertion,
MemoryState,
SessionState,
SimulatedUser,
TaskDefinition,
TaskFamily,
TaskSetup,
Tier,
UserTurn,
)
from clawbench.tasks import load_all_tasks
# ---------------------------------------------------------------------------
# Fixture builders
# ---------------------------------------------------------------------------
def _files_only_task() -> TaskDefinition:
return TaskDefinition(
id="test-files-only",
name="Files-only task",
tier=Tier.TIER1,
family=TaskFamily.CODING,
surface="coding",
setup=TaskSetup(asset_packs=["pack_a"]),
user=SimulatedUser(
max_turns=2,
turns=[UserTurn(message="Fix the bug and run the tests.")],
),
completion=CompletionSpec(
files=[FileState(path="src/main.py", exists=True)],
execution_checks=[ExecutionCheck(name="tests", command="pytest -q")],
),
)
def _memory_task() -> TaskDefinition:
return TaskDefinition(
id="test-memory-roundtrip",
name="Memory roundtrip",
tier=Tier.TIER2,
family=TaskFamily.MULTI_TOOL,
surface="tools",
setup=TaskSetup(
memory_seed=[{"key": "existing_key", "value": "existing_value"}],
),
phases=[
{
"name": "store",
"user": SimulatedUser(
max_turns=1,
turns=[UserTurn(message="Remember: stack = React, Node, Postgres.")],
),
},
{
"name": "recall",
"user": SimulatedUser(
max_turns=1,
turns=[UserTurn(message="What's my stack?")],
),
},
],
completion=CompletionSpec(
memory=[MemoryState(key_pattern="stack", exists=True, value_contains=["React"])],
),
)
def _full_surface_task() -> TaskDefinition:
# Synthetic task exercising session, cron, gateway_assertion, browser,
# and a dynamic follow-up turn.
return TaskDefinition(
id="test-full-surface",
name="Full surface",
tier=Tier.TIER3,
family=TaskFamily.BROWSER,
surface="browser",
setup=TaskSetup(
pre_check_gateway=[
GatewayAssertion(
method="agents.list",
assert_path="$.count",
assert_equals=0,
),
],
background_services=[
BackgroundService(
name="echo-service",
command="python3 -m http.server",
port=0,
ready_path="/",
),
],
),
user=SimulatedUser(
max_turns=4,
turns=[
UserTurn(message="Start the task."),
UserTurn(
message="Try again.",
when_tool_family="browser",
when_last_tool_failed=True,
),
],
),
completion=CompletionSpec(
session=SessionState(should_exist=True, model_should_be="claude-opus-4"),
cron=[CronState(exists=True, description_contains="daily")],
gateway_assertions=[
GatewayAssertion(
method="memory.list",
assert_path="$.count",
assert_equals=1,
),
],
),
)
# ---------------------------------------------------------------------------
# Capability inference
# ---------------------------------------------------------------------------
def test_files_only_task_requires_only_files_and_execution() -> None:
task = _files_only_task()
task.category = "software_engineering"
task.domain = "devtools"
task.functionality = ["bugfix", "test_verification"]
task.trace_distribution = ["read_heavy", "edit_heavy", "execute_heavy"]
task.tool_surface = ["filesystem", "shell"]
task.risk_tags = ["code_regression"]
canonical = from_task_definition(task)
assert isinstance(canonical, CanonicalTask)
assert canonical.required_adapter_capabilities == {
AdapterCapability.FILES,
AdapterCapability.EXECUTION,
}
assert canonical.category == "software_engineering"
assert canonical.domain == "devtools"
assert canonical.functionality == ["bugfix", "test_verification"]
assert canonical.trace_distribution == ["read_heavy", "edit_heavy", "execute_heavy"]
assert canonical.tool_surface == ["filesystem", "shell"]
assert canonical.risk_tags == ["code_regression"]
# Seed state should carry the asset pack through.
assert len(canonical.assets.seed_state) == 1
assert canonical.assets.seed_state[0].kind == "file"
assert canonical.assets.seed_state[0].asset_pack == "pack_a"
# File + execution checks carry over.
assert len(canonical.verifier.file_states) == 1
assert len(canonical.verifier.execution_checks) == 1
assert canonical.verifier.state_queries == []
# One non-dynamic phase → no dynamic-trigger capability.
assert canonical.interaction.uses_dynamic_user_triggers is False
def test_memory_task_requires_memory_capability() -> None:
canonical = from_task_definition(_memory_task())
assert AdapterCapability.MEMORY in canonical.required_adapter_capabilities
# Two phases with a single static turn each → dynamic-trigger is NOT
# required (the simulated user just sends one message per phase).
assert AdapterCapability.MULTI_TURN_INJECTION not in canonical.required_adapter_capabilities
assert canonical.interaction.allow_multi_phase is True
assert len(canonical.phases) == 2
# Memory seed lifted to SeedEntry.
memory_seeds = [s for s in canonical.assets.seed_state if s.kind == "memory"]
assert len(memory_seeds) == 1
assert memory_seeds[0].key == "existing_key"
# Memory completion check → StateQuery with MEMORY capability.
memory_queries = [q for q in canonical.verifier.state_queries if q.kind == "memory"]
assert len(memory_queries) == 1
assert memory_queries[0].required_capability is AdapterCapability.MEMORY
assert memory_queries[0].selector == {"key_pattern": "stack"}
assert memory_queries[0].expected == {"value_contains": ["React"]}
def test_full_surface_task_surfaces_every_capability() -> None:
canonical = from_task_definition(_full_surface_task())
caps = canonical.required_adapter_capabilities
assert AdapterCapability.FILES in caps
assert AdapterCapability.EXECUTION in caps
assert AdapterCapability.SESSION in caps
assert AdapterCapability.CRON in caps
assert AdapterCapability.GATEWAY_RPC in caps
assert AdapterCapability.BROWSER in caps
# Dynamic turn (when_tool_family + when_last_tool_failed) flags MTI.
assert AdapterCapability.MULTI_TURN_INJECTION in caps
# pre_check_gateway survives as a pre-run query.
assert len(canonical.verifier.pre_run_queries) == 1
assert canonical.verifier.pre_run_queries[0].required_capability is AdapterCapability.GATEWAY_RPC
# gateway_assertions route through the verifier state_queries.
gateway_queries = [
q for q in canonical.verifier.state_queries if q.kind == "custom"
]
assert len(gateway_queries) == 1
assert gateway_queries[0].selector["method"] == "memory.list"
# Session state with model constraint surfaces in expected.
session_queries = [q for q in canonical.verifier.state_queries if q.kind == "session"]
assert len(session_queries) == 1
assert session_queries[0].expected == {"model": "claude-opus-4"}
def test_background_services_pass_through_unchanged() -> None:
canonical = from_task_definition(_full_surface_task())
assert len(canonical.assets.background_services) == 1
service = canonical.assets.background_services[0]
assert service.name == "echo-service"
assert service.command == "python3 -m http.server"
# ---------------------------------------------------------------------------
# Whole-corpus smoke
# ---------------------------------------------------------------------------
def test_every_task_in_corpus_converts() -> None:
"""Every shipped task YAML must produce a valid CanonicalTask.
Acts as a regression gate: any new field added to TaskDefinition that
the converter doesn't know about will likely still work (fields it
ignores don't break canonical), but any task using new completion
shapes that the converter can't translate will raise here.
"""
tasks = load_all_tasks()
assert tasks, "expected at least one task in the corpus"
for task in tasks:
canonical = from_task_definition(task)
# Every canonical task must declare FILES + EXECUTION capability.
assert AdapterCapability.FILES in canonical.required_adapter_capabilities
assert AdapterCapability.EXECUTION in canonical.required_adapter_capabilities
# Phases always have at least one entry (normalized_phases fills
# one from `user` when `phases` is absent).
assert canonical.phases, f"{task.id}: canonical phases empty"
# Budgets honour the source timeout.
assert canonical.budgets.timeout_seconds == task.timeout_seconds
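
Reviewer note: the capability-inference tests above imply a rule of the form "start from FILES + EXECUTION, then add a capability for each completion check or dynamic turn that needs it". The sketch below illustrates that rule only. `Turn`, `Completion`, and `infer_capabilities` are hypothetical names, the capability strings are placeholders, and the real `from_task_definition` may apply different or additional rules (for example for browser and gateway checks).

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Turn:
    """Hypothetical simplified user turn."""

    message: str
    when_tool_family: Optional[str] = None
    when_last_tool_failed: bool = False


@dataclass
class Completion:
    """Hypothetical simplified completion spec."""

    memory: list = field(default_factory=list)
    cron: list = field(default_factory=list)
    session: Optional[dict] = None


def infer_capabilities(turns, completion):
    # Baseline assumed from the corpus smoke test: every task needs these.
    caps = {"files", "execution"}
    if completion.memory:
        caps.add("memory")
    if completion.cron:
        caps.add("cron")
    if completion.session is not None:
        caps.add("session")
    # A turn gated on tool outcome is "dynamic" and needs turn injection.
    if any(t.when_tool_family or t.when_last_tool_failed for t in turns):
        caps.add("multi_turn_injection")
    return caps


static_caps = infer_capabilities([Turn("Fix the bug.")], Completion())
dynamic_caps = infer_capabilities(
    [Turn("Start."), Turn("Try again.", when_last_tool_failed=True)],
    Completion(memory=["stack"]),
)
```

A single static turn with no state checks yields only the baseline pair, which matches the files-only expectation in the tests above.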


@ -1,6 +1,7 @@
from __future__ import annotations
import asyncio
import json
import pytest
from websockets.datastructures import Headers
@ -106,7 +107,7 @@ async def test_gateway_client_retries_transient_drain_errors(monkeypatch: pytest
async def fake_wait_event(self, event_name: str, *, timeout: float):
return {"payload": {"nonce": ""}}
-async def fake_rpc(self, method: str, params=None):
+async def fake_rpc(self, method: str, params=None, **kwargs):
return {"payload": {"type": "hello-ok", "protocol": 3}}
async def fake_listener(self):
@ -143,7 +144,7 @@ async def test_gateway_client_retries_half_closed_handshake_errors(
async def fake_wait_event(self, event_name: str, *, timeout: float):
return {"payload": {"nonce": ""}}
-async def fake_rpc(self, method: str, params=None):
+async def fake_rpc(self, method: str, params=None, **kwargs):
return {"payload": {"type": "hello-ok", "protocol": 3}}
async def fake_listener(self):
@ -192,3 +193,104 @@ async def test_send_and_wait_collects_messages_that_arrive_after_final_state():
transcript = await client.send_and_wait(session_key, "hello", timeout=1.0)
assert [message.text for message in transcript.assistant_messages] == ["Late but valid."]
@pytest.mark.asyncio
async def test_rpc_send_failure_cleans_pending_request():
    class FailingWebSocket:
        async def send(self, payload: str) -> None:  # noqa: ARG002
            raise ConnectionError("socket closed")

    client = GatewayClient(GatewayConfig(request_timeout=0.01))
    client._ws = FailingWebSocket()  # type: ignore[assignment]
    with pytest.raises(ConnectionError, match="socket closed"):
        await client._rpc("sessions.create", {"model": "test-model"})
    assert client._pending == {}


@pytest.mark.asyncio
async def test_rpc_timeout_cleans_pending_request():
    sent_frames: list[dict[str, object]] = []

    class SilentWebSocket:
        async def send(self, payload: str) -> None:
            sent_frames.append(json.loads(payload))

    client = GatewayClient(GatewayConfig(request_timeout=0.01))
    client._ws = SilentWebSocket()  # type: ignore[assignment]
    with pytest.raises(TimeoutError, match="RPC sessions.create timed out"):
        await client._rpc("sessions.create", {"model": "test-model"})
    assert sent_frames[0]["method"] == "sessions.create"
    assert client._pending == {}
@pytest.mark.asyncio
async def test_send_and_wait_passes_gateway_timeout_and_waits_for_run():
client = GatewayClient(GatewayConfig(request_timeout=1))
session_key = "session-1"
calls: list[tuple[str, dict | None, dict]] = []
async def fake_rpc(method: str, params=None, **kwargs):
calls.append((method, params, kwargs))
if method == "sessions.send":
return {"ok": True, "payload": {"runId": "run-1"}}
if method == "agent.wait":
return {"ok": True, "payload": {"runId": "run-1", "status": "completed"}}
if method == "sessions.get":
return {
"ok": True,
"payload": {
"messages": [
{
"role": "assistant",
"content": [{"type": "text", "text": "Done."}],
}
]
},
}
return {"ok": True, "payload": {}}
client._rpc = fake_rpc # type: ignore[method-assign]
transcript = await client.send_and_wait(session_key, "hello", timeout=1.5)
send_call = next(call for call in calls if call[0] == "sessions.send")
assert send_call[1] == {
"key": session_key,
"message": "hello",
"idempotencyKey": send_call[1]["idempotencyKey"],
"timeoutMs": 1500,
}
wait_call = next(call for call in calls if call[0] == "agent.wait")
assert wait_call[1] == {"runId": "run-1", "timeoutMs": 1500}
assert wait_call[2]["timeout"] == 11.5
assert [message.text for message in transcript.assistant_messages] == ["Done."]
@pytest.mark.asyncio
async def test_send_and_wait_aborts_run_when_no_terminal_state_arrives():
client = GatewayClient(GatewayConfig(request_timeout=1))
session_key = "session-1"
calls: list[tuple[str, dict | None, dict]] = []
async def fake_rpc(method: str, params=None, **kwargs):
calls.append((method, params, kwargs))
if method == "sessions.send":
return {"ok": True, "payload": {"runId": "run-timeout"}}
if method == "agent.wait":
await asyncio.sleep(60)
if method == "sessions.abort":
return {"ok": True, "payload": {"status": "aborted"}}
if method == "sessions.get":
return {"ok": True, "payload": {"messages": []}}
return {"ok": True, "payload": {}}
client._rpc = fake_rpc # type: ignore[method-assign]
await client.send_and_wait(session_key, "hello", timeout=0.01)
assert ("sessions.abort", {"key": session_key, "runId": "run-timeout"}, {"timeout": 1}) in calls
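
Reviewer note: the two cleanup tests in this hunk pin down one invariant: whether the send fails or the reply never arrives, the request must not linger in `_pending`. One way to get that is a `try`/`finally` around the whole request. The sketch below is an assumption about the shape of such a client, not the actual `GatewayClient._rpc`; `SketchClient` and its helpers are hypothetical.

```python
import asyncio
import itertools
import json


class SketchClient:
    """Hypothetical RPC client showing pending-request cleanup."""

    def __init__(self, send, request_timeout=0.01):
        self._send = send
        self._timeout = request_timeout
        self._pending = {}
        self._ids = itertools.count(1)

    async def rpc(self, method, params=None):
        request_id = str(next(self._ids))
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        try:
            frame = {"id": request_id, "method": method, "params": params or {}}
            await self._send(json.dumps(frame))
            return await asyncio.wait_for(future, self._timeout)
        except asyncio.TimeoutError:
            raise TimeoutError(f"RPC {method} timed out") from None
        finally:
            # Runs on success, send failure, and timeout alike.
            self._pending.pop(request_id, None)


async def failing_send(payload):
    raise ConnectionError("socket closed")


async def silent_send(payload):
    return None  # the frame is accepted but no reply ever arrives


async def demo():
    outcomes = []
    for send in (failing_send, silent_send):
        client = SketchClient(send)
        try:
            await client.rpc("sessions.create")
        except (ConnectionError, TimeoutError) as exc:
            outcomes.append((type(exc).__name__, dict(client._pending)))
    return outcomes


outcomes = asyncio.run(demo())
```

Separately, the `wait_call[2]["timeout"] == 11.5` assertion suggests the client pads its local wait by about ten seconds beyond the 1.5 s gateway timeout, though that is inferred from the test alone.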


@ -0,0 +1,98 @@
from pathlib import Path
import pytest
from clawbench.environment_files import run_execution_check, verify_file_state
from clawbench.schemas import ExecutionCheck, FileState
def test_verify_file_state_rejects_paths_outside_workspace(tmp_path: Path):
    outside = tmp_path.parent / "outside.txt"
    outside.write_text("secret", encoding="utf-8")
    ok, reason = verify_file_state(
        FileState(path="../outside.txt"),
        workspace=tmp_path,
        runtime_values={},
    )
    assert ok is False
    assert "escapes workspace" in reason
@pytest.mark.asyncio
async def test_execution_check_supports_cwd_env_and_expected_json_file(tmp_path: Path):
expected = tmp_path / "expected.json"
expected.write_text('{"status": "ok"}', encoding="utf-8")
workdir = tmp_path / "subdir"
workdir.mkdir()
result = await run_execution_check(
ExecutionCheck(
name="json-check",
command=(
"python -c \"import json, os; "
"print(json.dumps({'status': os.environ['CHECK_STATUS']}))\""
),
cwd="subdir",
env={"CHECK_STATUS": "ok"},
expected_json_file="expected.json",
),
workspace=tmp_path,
runtime_values={},
)
assert result.passed is True
assert result.reason == "OK"
@pytest.mark.asyncio
async def test_execution_check_rejects_cwd_outside_workspace(tmp_path: Path):
result = await run_execution_check(
ExecutionCheck(
name="unsafe-cwd",
command="true",
cwd="../outside",
),
workspace=tmp_path,
runtime_values={},
)
assert result.passed is False
assert "escapes workspace" in result.reason
@pytest.mark.asyncio
async def test_execution_check_rejects_expected_stdout_file_outside_workspace(
tmp_path: Path,
):
result = await run_execution_check(
ExecutionCheck(
name="unsafe-expected-stdout",
command="printf secret",
expected_stdout_file="../outside.txt",
),
workspace=tmp_path,
runtime_values={},
)
assert result.passed is False
assert "escapes workspace" in result.reason
@pytest.mark.asyncio
async def test_execution_check_rejects_expected_json_file_outside_workspace(
tmp_path: Path,
):
result = await run_execution_check(
ExecutionCheck(
name="unsafe-expected-json",
command="printf '{}'",
expected_json_file="../outside.json",
),
workspace=tmp_path,
runtime_values={},
)
assert result.passed is False
assert "escapes workspace" in result.reason
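
Reviewer note: all of the "escapes workspace" cases in this file come down to resolving a user-supplied relative path and checking that the result still sits under the workspace root. A minimal sketch of that check is below. `resolve_inside` is a hypothetical helper and the error text is chosen to echo the test expectations; how `clawbench.environment_files` actually implements the check is not visible in this diff.

```python
import tempfile
from pathlib import Path


def resolve_inside(workspace: Path, relative: str) -> Path:
    """Resolve `relative` under `workspace`, rejecting anything outside it."""
    root = workspace.resolve()
    # Resolving collapses ".." segments and follows symlinks; an absolute
    # `relative` replaces the root entirely and is rejected the same way.
    candidate = (root / relative).resolve()
    if not candidate.is_relative_to(root):
        raise ValueError(f"path {relative!r} escapes workspace")
    return candidate


with tempfile.TemporaryDirectory() as tmp:
    workspace = Path(tmp)
    inside = resolve_inside(workspace, "subdir/expected.json")
    inside_ok = inside.is_relative_to(workspace.resolve())
    try:
        resolve_inside(workspace, "../outside.txt")
        rejected = False
    except ValueError as exc:
        rejected = "escapes workspace" in str(exc)
```

`Path.is_relative_to` requires Python 3.9 or later, which fits the 3.11 and 3.12 CI matrix listed for these commits.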


@ -0,0 +1,463 @@
"""Tests for `HermesAdapter` against a stub `MiniSWERunner`.
We don't pull in the real `hermes-agent` package — the adapter is
driven through its `runner_factory` hook, which lets tests plug in a
fixed conversation without any network / subprocess activity.
What's covered:
- The adapter registers under the `"hermes"` name.
- `capabilities` is the minimal `{FILES, EXECUTION}` set.
- `setup` realises memory seed entries as workspace files.
- `run_phase` renders the user turn, calls the stub runner, and
appends the parsed conversation into the shared transcript.
- `verify_state_query` falls back to workspace memory scanning for
memory queries, and returns `capability_missing=True` for other
kinds.
- Task gating: a task that requires MEMORY / SESSION / CRON is NOT
supported by HermesAdapter; a files-only task is.
"""
from __future__ import annotations
import asyncio
from pathlib import Path
from clawbench.adapters import get_adapter
from clawbench.adapters.base import AdapterContext, StateQueryResult
from clawbench.adapters.hermes import HermesAdapter, HermesAdapterConfig
from clawbench.canonical import (
AdapterCapability,
CanonicalTask,
StateQuery,
)
from clawbench.canonical.convert import from_task_definition
from clawbench.schemas import (
CompletionSpec,
ExecutionCheck,
FileState,
MemoryState,
SimulatedUser,
TaskDefinition,
TaskFamily,
TaskSetup,
Tier,
Transcript,
UserTurn,
)
# ---------------------------------------------------------------------------
# Stub MiniSWERunner
# ---------------------------------------------------------------------------
class _StubRunner:
"""Pretends to be `MiniSWERunner`; returns a canned conversation."""
def __init__(self, *, model: str, cwd: str, **_: object) -> None:
self.model = model
self.cwd = cwd
self.last_prompt: str | None = None
self.calls = 0
self.conversation = {
"conversations": [
{"from": "user", "value": "placeholder — filled per-test"},
{
"from": "assistant",
"value": (
"Running `ls`.\n"
'<tool_call>{"name":"bash","arguments":{"cmd":"ls"}}</tool_call>'
),
},
{
"from": "tool",
"value": '<tool_response>{"stdout":"main.py"}</tool_response>',
},
],
"completed": True,
"api_calls": 3,
"metadata": {"model": "stub", "env_type": "local"},
}
def run_task(self, prompt: str) -> dict:
self.last_prompt = prompt
self.calls += 1
# Swap the placeholder user value with the real prompt so the
# conversation reflects what the adapter actually sent.
convo = {**self.conversation}
convo["conversations"] = [
{"from": "user", "value": prompt}
if entry.get("from") == "user"
else entry
for entry in convo["conversations"]
]
return convo
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _files_only_task(memory_seed: bool = False) -> CanonicalTask:
setup = (
TaskSetup(memory_seed=[{"key": "stack", "value": "React, Node"}])
if memory_seed
else TaskSetup()
)
return from_task_definition(
TaskDefinition(
id="hermes-files-only",
name="Hermes files-only",
tier=Tier.TIER1,
family=TaskFamily.CODING,
surface="coding",
setup=setup,
user=SimulatedUser(
max_turns=1,
turns=[UserTurn(message="List the workspace files.")],
),
completion=CompletionSpec(
files=[FileState(path="main.py", exists=True)],
execution_checks=[ExecutionCheck(name="noop", command="true")],
),
)
)
def _memory_task() -> CanonicalTask:
return from_task_definition(
TaskDefinition(
id="hermes-memory",
name="Hermes memory",
tier=Tier.TIER2,
family=TaskFamily.MULTI_TOOL,
surface="tools",
setup=TaskSetup(),
user=SimulatedUser(max_turns=1, turns=[UserTurn(message="remember stack=X")]),
completion=CompletionSpec(
memory=[MemoryState(key_pattern="stack", exists=True, value_contains=["React"])],
),
)
)
def _make_adapter() -> tuple[HermesAdapter, list[_StubRunner]]:
runners: list[_StubRunner] = []
def _factory(**kwargs):
runner = _StubRunner(**kwargs)
runners.append(runner)
return runner
adapter = HermesAdapter(
HermesAdapterConfig(model="stub-model", runner_factory=_factory)
)
return adapter, runners
def _make_ctx(task: CanonicalTask, workspace: Path) -> AdapterContext:
return AdapterContext(
task=task,
workspace=workspace,
runtime_values={},
run_index=0,
model="stub-model",
transcript=Transcript(),
)
# ---------------------------------------------------------------------------
# Registration + capability shape
# ---------------------------------------------------------------------------
def test_hermes_adapter_is_registered() -> None:
cls = get_adapter("hermes")
assert cls is HermesAdapter
def test_hermes_capabilities_are_files_and_execution_only() -> None:
assert HermesAdapter.capabilities == {
AdapterCapability.FILES,
AdapterCapability.EXECUTION,
}
def test_hermes_supports_files_only_task() -> None:
task = _files_only_task()
assert HermesAdapter.supports(task)
def test_hermes_does_not_support_memory_task() -> None:
task = _memory_task()
assert not HermesAdapter.supports(task)
missing = HermesAdapter.missing_capabilities_for(task)
assert AdapterCapability.MEMORY in missing
def test_hermes_full_agent_capabilities_cover_memory_and_dynamic_tasks() -> None:
task = _memory_task()
config = HermesAdapterConfig(model="stub-model", driver_mode="ai_agent")
assert HermesAdapter.supports(task, config)
caps = HermesAdapter.supported_capabilities(config)
assert AdapterCapability.MEMORY in caps
assert AdapterCapability.CRON in caps
assert AdapterCapability.BROWSER in caps
assert AdapterCapability.MULTI_TURN_INJECTION in caps
# ---------------------------------------------------------------------------
# Lifecycle
# ---------------------------------------------------------------------------
def test_setup_realizes_memory_seed_as_workspace_files(tmp_path: Path) -> None:
task = _files_only_task(memory_seed=True)
adapter, _ = _make_adapter()
async def _go() -> None:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
asyncio.run(_go())
seeded = tmp_path / "memory" / "stack.md"
assert seeded.is_file()
assert "React" in seeded.read_text(encoding="utf-8")
def test_run_phase_sends_rendered_prompt_and_parses_conversation(tmp_path: Path) -> None:
task = _files_only_task()
adapter, runners = _make_adapter()
async def _go():
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
result = await adapter.run_phase(task.phases[0], ctx)
return ctx, result
ctx, result = asyncio.run(_go())
# The stub runner saw the rendered user message.
assert runners
assert runners[0].last_prompt == "List the workspace files."
# Conversation parsed into the shared transcript.
assert result.error is None
assert ctx.transcript.tool_call_sequence, "expected tool calls parsed out of Hermes conversation"
first_call = ctx.transcript.tool_call_sequence[0]
assert first_call.name == "bash"
assert first_call.input == {"cmd": "ls"}
assert "main.py" in first_call.output
assert result.adapter_metadata.get("api_calls") == 3
assert result.completed_normally is True
def test_runner_factory_uses_explicit_provider_instead_of_api_key(tmp_path: Path) -> None:
task = _files_only_task()
calls: list[dict] = []
def _factory(**kwargs):
calls.append(kwargs)
return _StubRunner(model=kwargs["model"], cwd=kwargs["cwd"])
adapter = HermesAdapter(
HermesAdapterConfig(
model="stub-model",
provider="openai-codex",
base_url="https://example.invalid/v1",
api_key="secret",
runner_factory=_factory,
)
)
async def _go() -> None:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
asyncio.run(_go())
assert calls
assert calls[0]["base_url"] is None
assert calls[0]["api_key"] is None
def test_direct_openai_endpoint_strips_provider_prefix_for_hermes(tmp_path: Path) -> None:
task = _files_only_task()
calls: list[dict] = []
def _factory(**kwargs):
calls.append(kwargs)
return _StubRunner(model=kwargs["model"], cwd=kwargs["cwd"])
adapter = HermesAdapter(
HermesAdapterConfig(
model="openai/gpt-5.4",
base_url="https://api.openai.com/v1",
api_key="secret",
runner_factory=_factory,
)
)
async def _go() -> None:
async with adapter:
ctx = AdapterContext(
task=task,
workspace=tmp_path,
runtime_values={},
run_index=0,
model="openai/gpt-5.4",
transcript=Transcript(),
)
await adapter.setup(ctx)
assert ctx.adapter_state["effective_model"] == "gpt-5.4"
asyncio.run(_go())
assert calls
assert calls[0]["model"] == "gpt-5.4"
def test_ai_agent_direct_endpoint_reports_custom_provider(tmp_path: Path) -> None:
task = _files_only_task()
calls: list[dict] = []
class _StubAgent:
pass
def _factory(**kwargs):
calls.append(kwargs)
return _StubAgent()
adapter = HermesAdapter(
HermesAdapterConfig(
model="openai/gpt-5.4",
base_url="https://api.openai.com/v1",
api_key="secret",
driver_mode="ai_agent",
agent_factory=_factory,
)
)
async def _go() -> None:
async with adapter:
ctx = AdapterContext(
task=task,
workspace=tmp_path,
runtime_values={},
run_index=0,
model="openai/gpt-5.4",
transcript=Transcript(),
)
await adapter.setup(ctx)
assert ctx.adapter_state["effective_model"] == "gpt-5.4"
asyncio.run(_go())
assert calls
assert calls[0]["model"] == "gpt-5.4"
assert calls[0]["base_url"] == "https://api.openai.com/v1"
assert calls[0]["api_key"] == "secret"
assert calls[0]["provider"] == "custom"
# ---------------------------------------------------------------------------
# State queries
# ---------------------------------------------------------------------------
def test_memory_query_uses_workspace_fallback(tmp_path: Path) -> None:
task = _memory_task()
adapter, _ = _make_adapter()
# Simulate a prior run that wrote a MEMORY.md into the workspace.
(tmp_path / "MEMORY.md").write_text("stack: React, Node, Postgres", encoding="utf-8")
query = StateQuery(
kind="memory",
predicate="exists",
selector={"key_pattern": "stack"},
expected={"value_contains": ["React"]},
required_capability=AdapterCapability.MEMORY,
)
async def _go() -> StateQueryResult:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
return await adapter.verify_state_query(query, ctx)
result = asyncio.run(_go())
assert result.ok is True
assert result.capability_missing is False
def test_session_query_is_reported_as_capability_missing(tmp_path: Path) -> None:
task = _memory_task()
adapter, _ = _make_adapter()
query = StateQuery(
kind="session",
predicate="exists",
selector={},
expected={},
required_capability=AdapterCapability.SESSION,
)
async def _go() -> StateQueryResult:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
return await adapter.verify_state_query(query, ctx)
result = asyncio.run(_go())
assert result.capability_missing is True
assert result.ok is False
# ---------------------------------------------------------------------------
# Timeouts
# ---------------------------------------------------------------------------
def test_run_phase_surfaces_runner_timeout(tmp_path: Path) -> None:
    task = _files_only_task()

    class _SlowRunner:
        def __init__(self, **_: object) -> None:
            pass

        def run_task(self, prompt: str) -> dict:
            import time

            time.sleep(5)  # will exceed the test's configured timeout
            return {"conversations": [], "completed": False, "api_calls": 0}

    adapter = HermesAdapter(
        HermesAdapterConfig(
            model="stub-model",
            runner_factory=lambda **kw: _SlowRunner(**kw),
        )
    )
    # Force a short phase timeout so the test stays fast.
    task_with_short_timeout = task.model_copy(
        update={
            "phases": [
                task.phases[0].model_copy(update={"timeout_seconds": 1})
            ]
        }
    )

    async def _go():
        async with adapter:
            ctx = _make_ctx(task_with_short_timeout, tmp_path)
            await adapter.setup(ctx)
            return await adapter.run_phase(task_with_short_timeout.phases[0], ctx)

    result = asyncio.run(_go())
    assert result.error is not None
    assert "exceeded" in result.error
    assert result.completed_normally is False
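
Reviewer note: the timeout test above expects a blocking `run_task` call to be reported as an error rather than raised. One plausible way to get that behaviour is to run the synchronous runner in a worker thread and bound the wait. The sketch below assumes that approach; `run_with_timeout` and the result keys are hypothetical, and the real `HermesAdapter.run_phase` may be structured differently.

```python
import asyncio
import time


def slow_run_task(prompt: str) -> dict:
    """Stand-in for a blocking runner call."""
    time.sleep(0.2)
    return {"conversations": [], "completed": True}


async def run_with_timeout(run_task, prompt, timeout_seconds):
    try:
        convo = await asyncio.wait_for(
            asyncio.to_thread(run_task, prompt), timeout_seconds
        )
    except asyncio.TimeoutError:
        # The worker thread is not killed; it keeps running until it returns.
        return {
            "error": f"runner exceeded {timeout_seconds}s timeout",
            "completed_normally": False,
        }
    return {"error": None, "completed_normally": bool(convo.get("completed"))}


result = asyncio.run(run_with_timeout(slow_run_task, "List files.", 0.05))
```

One consequence worth noting: because a thread cannot be cancelled, `asyncio.run` still waits for the worker to finish during shutdown, so a test like the one above may take the full sleep duration even though the phase timeout is one second.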

tests/test_hermes_xml.py Normal file

@ -0,0 +1,193 @@
"""Tests for `clawbench.adapters.hermes_xml.parse_conversation`.
Covers the Hermes conversation shapes we expect from the wild:
- Plain assistant turn with a single tool call + a following tool_response.
- Multiple tool calls in one assistant turn.
- Assistant turn with free-form text + a tool call.
- A malformed tool_call payload: the parser must recover gracefully
(no raise; surface a best-effort call).
- Name-variant keys (`function`, `parameters`) that Hermes-variant models emit.
"""
from __future__ import annotations
from clawbench.adapters.hermes_xml import (
iter_tool_calls_from_conversations,
parse_chat_messages,
parse_conversation,
)
from clawbench.trajectory import annotate_transcript_tool_calls
def _conv(*entries: dict[str, str]) -> dict:
return {"conversations": list(entries), "completed": True, "api_calls": 1}
def test_single_tool_call_with_response() -> None:
convo = _conv(
{"from": "system", "value": "You are a helpful coding agent."},
{"from": "user", "value": "List files."},
{
"from": "assistant",
"value": "I'll run `ls`.\n"
'<tool_call>{"name":"bash","arguments":{"cmd":"ls"}}</tool_call>',
},
{
"from": "tool",
"value": '<tool_response>{"stdout":"main.py\\nREADME"}</tool_response>',
},
)
transcript = parse_conversation(convo)
calls = transcript.tool_call_sequence
assert len(calls) == 1
assert calls[0].name == "bash"
assert calls[0].input == {"cmd": "ls"}
assert "main.py" in calls[0].output
assert calls[0].success is True
# Assistant text preserved, tool-call body stripped out.
assistant = next(
msg for msg in transcript.messages if msg.role == "assistant"
)
assert "I'll run `ls`." in assistant.text
assert "<tool_call>" not in assistant.text
def test_multiple_tool_calls_in_one_turn() -> None:
convo = _conv(
{
"from": "assistant",
"value": (
'<tool_call>{"name":"bash","arguments":{"cmd":"ls"}}</tool_call>'
'<tool_call>{"name":"bash","arguments":{"cmd":"pwd"}}</tool_call>'
),
},
{
"from": "tool",
"value": '<tool_response>{"stdout":"a"}</tool_response>',
},
{
"from": "tool",
"value": '<tool_response>{"stdout":"/tmp"}</tool_response>',
},
)
calls = iter_tool_calls_from_conversations(convo["conversations"])
assert len(calls) == 2
assert calls[0].input == {"cmd": "ls"}
assert calls[1].input == {"cmd": "pwd"}
assert calls[0].output == "a"
assert calls[1].output == "/tmp"
def test_malformed_json_falls_back_to_best_effort() -> None:
    convo = _conv(
        {
            "from": "assistant",
            "value": (
                '<tool_call>{"name":"bash","arguments":{"cmd":"ls"} <-- stray text }</tool_call>'
                '<tool_call>{"name":"bash","arguments":{"cmd":"pwd"}}</tool_call>'
            ),
        },
    )
    calls = iter_tool_calls_from_conversations(convo["conversations"])
    # First is malformed; parser recovers one or two calls without
    # raising, and the clean second call is always captured.
    assert len(calls) >= 1
    assert any(c.input == {"cmd": "pwd"} for c in calls)
def test_name_variants_are_accepted() -> None:
convo = _conv(
{
"from": "assistant",
"value": (
'<tool_call>{"function":"bash","parameters":{"cmd":"ls"}}</tool_call>'
),
},
)
calls = iter_tool_calls_from_conversations(convo["conversations"])
assert len(calls) == 1
assert calls[0].name == "bash"
assert calls[0].input == {"cmd": "ls"}
def test_tool_error_marks_call_failed() -> None:
convo = _conv(
{
"from": "assistant",
"value": '<tool_call>{"name":"bash","arguments":{"cmd":"nonsense"}}</tool_call>',
},
{
"from": "tool",
"value": '<tool_response>{"stderr":"command not found","status":"error"}</tool_response>',
},
)
calls = iter_tool_calls_from_conversations(convo["conversations"])
assert len(calls) == 1
assert calls[0].success is False
assert "command not found" in calls[0].error
def test_orphan_tool_response_not_silently_dropped() -> None:
convo = _conv(
{
"from": "tool",
"value": '<tool_response>{"stdout":"nothing to pair with"}</tool_response>',
},
)
transcript = parse_conversation(convo)
# No calls, but one tool-role transcript message surfaces the output.
assert transcript.tool_call_sequence == []
tool_messages = [msg for msg in transcript.messages if msg.role == "tool"]
assert tool_messages
assert "nothing to pair" in tool_messages[0].tool_result_content
def test_parser_output_annotates_with_canonical_families() -> None:
convo = _conv(
{
"from": "assistant",
"value": (
'<tool_call>{"name":"str_replace_based_edit_tool",'
'"arguments":{"path":"main.py","old":"a","new":"b"}}</tool_call>'
),
},
)
transcript = parse_conversation(convo)
# Running the existing trajectory classifier over the parsed
# transcript should assign a canonical family tag to every call.
annotated = annotate_transcript_tool_calls(transcript)
families = [c.family for c in annotated.tool_call_sequence]
assert all(f for f in families), f"expected every call to get a family tag, got {families}"
assert families == ["edit"]
def test_parse_chat_messages_pairs_tool_results() -> None:
transcript = parse_chat_messages(
[
{"role": "user", "content": "List files"},
{
"role": "assistant",
"content": "I'll inspect.",
"tool_calls": [
{
"id": "call-1",
"function": {
"name": "terminal",
"arguments": "{\"command\":\"ls\"}",
},
}
],
},
{"role": "tool", "tool_call_id": "call-1", "content": "main.py"},
{"role": "assistant", "content": "Found main.py"},
]
)
calls = transcript.tool_call_sequence
assert len(calls) == 1
assert calls[0].name == "terminal"
assert calls[0].input == {"command": "ls"}
assert calls[0].output == "main.py"
assert transcript.assistant_messages[-1].text == "Found main.py"
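
Note on the parser tests above: they pin down three behaviours, namely several `<tool_call>` blocks in one assistant turn, recovery from a malformed block, and acceptance of the `function`/`parameters` key variants. A minimal sketch of one way to get that behaviour is given here. It is not the parser under test: `SketchToolCall` and `extract_tool_calls` are hypothetical names, and skipping a malformed block outright is an assumption (the tests also allow partial recovery).

```python
import json
import re
from dataclasses import dataclass, field
from typing import Any

# Hypothetical names throughout; this is not the parser under test.
_TOOL_CALL_RE = re.compile(r"<tool_call>(.*?)</tool_call>", re.DOTALL)


@dataclass
class SketchToolCall:
    name: str
    input: dict[str, Any] = field(default_factory=dict)


def extract_tool_calls(text: str) -> list[SketchToolCall]:
    """Return every well-formed <tool_call> block, skipping malformed ones."""
    calls: list[SketchToolCall] = []
    for match in _TOOL_CALL_RE.finditer(text):
        try:
            payload = json.loads(match.group(1))
        except json.JSONDecodeError:
            # Malformed block: skip it rather than abort the whole turn.
            continue
        if not isinstance(payload, dict):
            continue
        # Accept both {"name", "arguments"} and {"function", "parameters"}.
        name = payload.get("name") or payload.get("function")
        args = payload.get("arguments", payload.get("parameters", {}))
        if isinstance(name, str) and isinstance(args, dict):
            calls.append(SketchToolCall(name=name, input=args))
    return calls
```

Because each block is matched and decoded independently, one bad block cannot hide a clean one that follows it in the same turn.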

@ -0,0 +1,444 @@
"""Tests for `OpenClawAdapter` — exercised against a stub gateway.
This validates the adapter wiring (lifecycle + state-query resolution)
in isolation, before the harness is rewired through it. The stub
`GatewayClient` records every call and produces canned responses so
the adapter's branches are covered end-to-end without a real gateway.
"""
from __future__ import annotations
import asyncio
from pathlib import Path
from typing import Any
import pytest
from clawbench.adapters import get_adapter
from clawbench.adapters.base import AdapterContext, StateQueryResult
from clawbench.adapters.openclaw import OpenClawAdapter, OpenClawAdapterConfig
from clawbench.canonical import (
AdapterCapability,
CanonicalTask,
StateQuery,
)
from clawbench.canonical.convert import from_task_definition
from clawbench.schemas import (
CompletionSpec,
ExecutionCheck,
FileState,
GatewayAssertion,
MemoryState,
SessionState,
SimulatedUser,
TaskDefinition,
TaskFamily,
TaskSetup,
Tier,
Transcript,
UserTurn,
)
# ---------------------------------------------------------------------------
# Stub GatewayClient
# ---------------------------------------------------------------------------
class _StubGateway:
"""Minimal GatewayClient stand-in for adapter tests.
Records every `create_agent`, `create_session`, `subscribe`,
`send_and_wait`, `delete_*` call in `.calls`, and serves canned
responses for the verification RPCs used by `OpenClawAdapter`.
"""
def __init__(self) -> None:
self.calls: list[tuple[str, dict[str, Any]]] = []
self.rpc_responses: dict[str, dict[str, Any]] = {}
self.send_transcript = Transcript()
async def __aenter__(self) -> "_StubGateway":
self.calls.append(("__aenter__", {}))
return self
async def __aexit__(self, *exc: object) -> None:
self.calls.append(("__aexit__", {}))
async def create_agent(self, *, name: str, workspace: str) -> str:
self.calls.append(("create_agent", {"name": name, "workspace": workspace}))
return "agent-stub"
async def create_session(self, *, model: str, agent_id: str, label: str) -> str:
self.calls.append(
("create_session", {"model": model, "agent_id": agent_id, "label": label})
)
return f"session-{label}"
async def subscribe(self, session_key: str) -> None:
self.calls.append(("subscribe", {"session_key": session_key}))
async def send_and_wait(
self,
session_key: str,
message: str,
*,
timeout: float,
) -> Transcript:
self.calls.append(
(
"send_and_wait",
{"session_key": session_key, "message": message, "timeout": timeout},
)
)
return self.send_transcript
async def delete_session(self, session_key: str) -> None:
self.calls.append(("delete_session", {"session_key": session_key}))
async def delete_agent(self, agent_id: str, *, delete_files: bool) -> None:
self.calls.append(
("delete_agent", {"agent_id": agent_id, "delete_files": delete_files})
)
async def get_effective_tools(self, session_key: str) -> dict[str, Any]:
self.calls.append(("get_effective_tools", {"session_key": session_key}))
return self.rpc_responses.get(
"tools.effective",
{"groups": [{"tools": [{"id": "bash"}, {"id": "browser"}]}]},
)
async def _rpc(self, method: str, params: dict[str, Any]) -> dict[str, Any]:
self.calls.append((f"_rpc:{method}", dict(params)))
if method in self.rpc_responses:
return self.rpc_responses[method]
raise RuntimeError(f"stub gateway: no response set for {method}")
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _coding_task() -> CanonicalTask:
return from_task_definition(
TaskDefinition(
id="oa-adapter-test",
name="OA adapter test",
tier=Tier.TIER1,
family=TaskFamily.CODING,
surface="coding",
setup=TaskSetup(),
user=SimulatedUser(
max_turns=1,
turns=[UserTurn(message="Do the task.")],
),
completion=CompletionSpec(
files=[FileState(path="out.txt", exists=True)],
execution_checks=[ExecutionCheck(name="ok", command="true")],
),
)
)
def _mixed_state_task() -> CanonicalTask:
return from_task_definition(
TaskDefinition(
id="oa-adapter-state-test",
name="OA state test",
tier=Tier.TIER2,
family=TaskFamily.MULTI_TOOL,
surface="tools",
setup=TaskSetup(
pre_check_gateway=[
GatewayAssertion(
method="agents.list",
assert_path="$.count",
assert_equals=0,
),
],
),
user=SimulatedUser(max_turns=1, turns=[UserTurn(message="go")]),
completion=CompletionSpec(
memory=[MemoryState(key_pattern="stack", exists=True, value_contains=["React"])],
session=SessionState(should_exist=True, model_should_be="opus"),
),
)
)
def _make_adapter_and_gateway() -> tuple[OpenClawAdapter, _StubGateway]:
gateway = _StubGateway()
adapter = OpenClawAdapter(OpenClawAdapterConfig(model="test-model"))
adapter._client_factory = lambda: gateway # type: ignore[assignment]
return adapter, gateway
def _make_ctx(task: CanonicalTask, workspace: Path) -> AdapterContext:
return AdapterContext(
task=task,
workspace=workspace,
runtime_values={},
run_index=0,
model="test-model",
transcript=Transcript(),
)
# ---------------------------------------------------------------------------
# Registration
# ---------------------------------------------------------------------------
def test_openclaw_adapter_is_registered() -> None:
cls = get_adapter("openclaw")
assert cls is OpenClawAdapter
def test_openclaw_declares_full_capability_set() -> None:
assert AdapterCapability.FILES in OpenClawAdapter.capabilities
assert AdapterCapability.EXECUTION in OpenClawAdapter.capabilities
assert AdapterCapability.MEMORY in OpenClawAdapter.capabilities
assert AdapterCapability.SESSION in OpenClawAdapter.capabilities
assert AdapterCapability.CRON in OpenClawAdapter.capabilities
assert AdapterCapability.GATEWAY_RPC in OpenClawAdapter.capabilities
assert AdapterCapability.BROWSER in OpenClawAdapter.capabilities
# ---------------------------------------------------------------------------
# Lifecycle
# ---------------------------------------------------------------------------
def test_setup_realizes_memory_seed_files(tmp_path: Path) -> None:
task = from_task_definition(
TaskDefinition(
id="oa-seeded-memory",
name="OA seeded memory",
tier=Tier.TIER2,
family=TaskFamily.MULTI_TOOL,
surface="tools",
setup=TaskSetup(
memory_seed=[
{
"key": "event profile",
"value": "Vegetarian food, quiet rooms, and no stairs.",
}
]
),
user=SimulatedUser(max_turns=1, turns=[UserTurn(message="go")]),
)
)
adapter, gateway = _make_adapter_and_gateway()
async def _go() -> None:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
asyncio.run(_go())
assert (tmp_path / "MEMORY.md").read_text(encoding="utf-8").count("event profile") == 1
assert "Vegetarian food" in (tmp_path / "memory" / "event_profile.md").read_text(encoding="utf-8")
assert any(call[0] == "create_agent" for call in gateway.calls)
def test_run_phase_creates_session_subscribes_and_drives_simulator(tmp_path: Path) -> None:
task = _coding_task()
adapter, gateway = _make_adapter_and_gateway()
async def _go() -> None:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
result = await adapter.run_phase(task.phases[0], ctx)
assert result.error is None
await adapter.teardown(ctx)
asyncio.run(_go())
methods = [name for name, _ in gateway.calls]
# Lifecycle calls we expect to see (membership only; ordering is not asserted):
assert "create_agent" in methods
assert "create_session" in methods
assert "subscribe" in methods
assert "send_and_wait" in methods
assert "delete_session" in methods
assert "delete_agent" in methods
# The send_and_wait call should use the rendered user turn text.
send_args = next(args for name, args in gateway.calls if name == "send_and_wait")
assert send_args["message"] == "Do the task."
def test_run_phase_fails_fast_without_setup(tmp_path: Path) -> None:
task = _coding_task()
adapter, _ = _make_adapter_and_gateway()
async def _go() -> None:
async with adapter:
ctx = _make_ctx(task, tmp_path)
# Skip setup() — run_phase should return an error phase.
result = await adapter.run_phase(task.phases[0], ctx)
assert result.completed_normally is False
assert result.error and "agent_id" in result.error
asyncio.run(_go())
# ---------------------------------------------------------------------------
# State queries
# ---------------------------------------------------------------------------
def test_memory_query_uses_memory_search_primary_path(tmp_path: Path) -> None:
task = _mixed_state_task()
adapter, gateway = _make_adapter_and_gateway()
gateway.rpc_responses["memory.search"] = {
"payload": {"entries": [{"value": "stack = React, Node, Postgres"}]}
}
query = StateQuery(
kind="memory",
predicate="exists",
selector={"key_pattern": "stack"},
expected={"value_contains": ["React"]},
required_capability=AdapterCapability.MEMORY,
)
async def _go() -> StateQueryResult:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
return await adapter.verify_state_query(query, ctx)
result = asyncio.run(_go())
assert result.ok is True
assert result.detail == "OK"
def test_memory_query_falls_back_to_workspace_on_rpc_failure(tmp_path: Path) -> None:
task = _mixed_state_task()
adapter, gateway = _make_adapter_and_gateway()
# No memory.search response → primary path raises, fallback runs.
# Seed a MEMORY.md file in the workspace so the fallback succeeds.
(tmp_path / "MEMORY.md").write_text(
"stack: React, Node, Postgres", encoding="utf-8"
)
query = StateQuery(
kind="memory",
predicate="exists",
selector={"key_pattern": "stack"},
expected={"value_contains": ["React"]},
required_capability=AdapterCapability.MEMORY,
)
async def _go() -> StateQueryResult:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
return await adapter.verify_state_query(query, ctx)
result = asyncio.run(_go())
assert result.ok is True
def test_session_query_uses_sessions_resolve(tmp_path: Path) -> None:
task = _mixed_state_task()
adapter, gateway = _make_adapter_and_gateway()
gateway.rpc_responses["sessions.resolve"] = {
"payload": {"model": "claude-opus-4"}
}
query = StateQuery(
kind="session",
predicate="exists",
selector={},
expected={"model": "opus"},
required_capability=AdapterCapability.SESSION,
)
async def _go() -> StateQueryResult:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
ctx.adapter_state["last_session_key"] = "some-session"
return await adapter.verify_state_query(query, ctx)
result = asyncio.run(_go())
assert result.ok is True
def test_gateway_query_resolves_json_path(tmp_path: Path) -> None:
task = _mixed_state_task()
adapter, gateway = _make_adapter_and_gateway()
gateway.rpc_responses["memory.list"] = {
"payload": {"count": 3}
}
query = StateQuery(
kind="custom",
predicate="equals",
selector={"method": "memory.list", "params": {}, "assert_path": "$.count"},
expected={"equals": 3, "exists": True},
required_capability=AdapterCapability.GATEWAY_RPC,
)
async def _go() -> StateQueryResult:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
return await adapter.verify_state_query(query, ctx)
result = asyncio.run(_go())
assert result.ok is True
def test_cron_query_returns_false_when_no_jobs(tmp_path: Path) -> None:
task = _mixed_state_task()
adapter, gateway = _make_adapter_and_gateway()
gateway.rpc_responses["cron.list"] = {"payload": {"jobs": []}}
query = StateQuery(
kind="cron",
predicate="exists",
selector={"description_contains": "daily"},
expected={},
required_capability=AdapterCapability.CRON,
)
async def _go() -> StateQueryResult:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
return await adapter.verify_state_query(query, ctx)
result = asyncio.run(_go())
assert result.ok is False
def test_pre_run_queries_evaluated_during_setup(tmp_path: Path) -> None:
task = _mixed_state_task()
adapter, gateway = _make_adapter_and_gateway()
# Deliberately return the wrong count to trigger a pre-run failure.
gateway.rpc_responses["agents.list"] = {"payload": {"count": 99}}
async def _go() -> list[str]:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
return ctx.adapter_state.get("pre_run_failures", [])
failures = asyncio.run(_go())
assert failures, "pre-run gateway assertion should have failed"
# ---------------------------------------------------------------------------
# Requires-context guard
# ---------------------------------------------------------------------------
def test_client_accessor_errors_when_not_in_context() -> None:
adapter, _ = _make_adapter_and_gateway()
with pytest.raises(RuntimeError):
_ = adapter.client

@ -20,6 +20,13 @@ def test_submission_request_defaults_to_single_parallel_lane():
assert request.max_parallel_lanes == 1
assert request.runs_per_task == 3
assert request.judge_affects_score is False
assert request.task_ids == []
def test_local_queue_dir_honors_env_override(tmp_path, monkeypatch):
monkeypatch.setenv("CLAWBENCH_LOCAL_QUEUE_DIR", str(tmp_path / "queue"))
assert queue_module._resolve_local_queue_dir() == tmp_path / "queue"
def test_submission_request_fingerprint_includes_judge_score_gate():
@ -33,6 +40,29 @@ def test_submission_request_fingerprint_includes_judge_score_gate():
assert advisory.active_fingerprint() != weighted.active_fingerprint()
def test_submission_request_fingerprint_includes_task_ids():
all_tasks = SubmissionRequest(model="anthropic/claude-sonnet-4-6")
subset = SubmissionRequest(
model="anthropic/claude-sonnet-4-6",
task_ids=["t1-fs-quick-note"],
)
assert all_tasks.active_fingerprint() != subset.active_fingerprint()
def test_submission_request_fingerprint_canonicalizes_task_ids():
first = SubmissionRequest(
model="anthropic/claude-sonnet-4-6",
task_ids=[" t2-demo ", "t1-demo", "t2-demo"],
)
second = SubmissionRequest(
model="anthropic/claude-sonnet-4-6",
task_ids=["t1-demo", "t2-demo"],
)
assert first.active_fingerprint() == second.active_fingerprint()
def test_save_local_replaces_queue_file_atomically(tmp_path, monkeypatch):
monkeypatch.setattr(queue_module, "LOCAL_QUEUE_DIR", tmp_path)
monkeypatch.setattr(queue_module, "HF_TOKEN", "")
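
Note on the fingerprint tests above: they require that `task_ids` affects the fingerprint and that whitespace, duplicates, and ordering do not. One way to satisfy both is to canonicalize the list before hashing, as sketched here. `canonical_task_ids` and `request_fingerprint` are hypothetical names, and the use of SHA-256 over a JSON payload is an assumption; the source does not show how `active_fingerprint()` is computed.

```python
import hashlib
import json


def canonical_task_ids(task_ids: list[str]) -> list[str]:
    """Strip whitespace, drop empties and duplicates, and sort."""
    return sorted({task_id.strip() for task_id in task_ids if task_id.strip()})


def request_fingerprint(model: str, task_ids: list[str]) -> str:
    """Hash the model plus canonical task IDs into a stable hex digest."""
    payload = json.dumps(
        {"model": model, "task_ids": canonical_task_ids(task_ids)},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

With this shape, two requests that name the same tasks in a different order or with stray whitespace collapse to one fingerprint, while a subset request stays distinct from an all-tasks request.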

@ -0,0 +1,335 @@
from __future__ import annotations
import datetime
import importlib
import json
import sys
import threading
from pathlib import Path
import pytest
from clawbench.client import GatewayConfig
from clawbench.harness import BenchmarkHarness
from clawbench.queue import Job, JobQueue, JobStatus, SubmissionRequest
import clawbench.queue as queue_module
from clawbench.schemas import (
CompletionSpec,
ExecutionCheck,
SimulatedUser,
TaskDefinition,
TaskFamily,
Tier,
ToolCall,
TrajectoryExpectations,
Transcript,
TranscriptMessage,
UserTurn,
)
from clawbench.worker import EvalWorker
def _runtime_task() -> TaskDefinition:
return TaskDefinition(
id="runtime-contract-smoke",
name="Runtime Contract Smoke",
tier=Tier.TIER1,
family=TaskFamily.TOOLS,
surface="tools",
user=SimulatedUser(
max_turns=1,
turns=[UserTurn(message="create answer.txt with runtime ok, then verify it")],
),
completion=CompletionSpec(
execution_checks=[
ExecutionCheck(
name="answer artifact",
command=(
"{python_exe} -c "
"\"from pathlib import Path; "
"assert Path('answer.txt').read_text(encoding='utf-8') == 'runtime ok\\n'\""
),
)
]
),
trajectory=TrajectoryExpectations(
required_families=["read", "edit", "execute"],
min_distinct_families=3,
require_read_before_mutation=True,
require_self_verification=True,
),
)
class _GatewayState:
def __init__(self) -> None:
self.agent_workspaces: dict[str, Path] = {}
self.session_agents: dict[str, str] = {}
self.deleted_sessions: list[str] = []
self.deleted_agents: list[str] = []
class _SuccessfulGatewayClient:
state = _GatewayState()
def __init__(self, config: GatewayConfig | None = None) -> None:
self.config = config or GatewayConfig()
async def __aenter__(self) -> _SuccessfulGatewayClient:
return self
async def __aexit__(self, *exc: object) -> None:
return None
async def create_agent(self, *, name: str, workspace: str) -> str:
agent_id = f"agent-{len(self.state.agent_workspaces) + 1}"
self.state.agent_workspaces[agent_id] = Path(workspace)
return agent_id
async def create_session(self, *, model: str, agent_id: str, label: str) -> str: # noqa: ARG002
session_key = f"session-{len(self.state.session_agents) + 1}"
self.state.session_agents[session_key] = agent_id
return session_key
async def subscribe(self, session_key: str) -> None: # noqa: ARG002
return None
async def send_and_wait(self, session_key: str, message: str, *, timeout: float) -> Transcript: # noqa: ARG002
workspace = self.state.agent_workspaces[self.state.session_agents[session_key]]
(workspace / "answer.txt").write_text("runtime ok\n", encoding="utf-8")
return Transcript(
messages=[
TranscriptMessage(
role="assistant",
text="i'll inspect, write the answer, then verify it.",
tool_calls=[
ToolCall(
name="read_file",
input={"path": "answer.txt"},
output="missing",
success=True,
),
ToolCall(
name="write_file",
input={"path": "answer.txt"},
output="wrote answer.txt",
success=True,
),
ToolCall(
name="shell",
input={"command": "python -m pytest -q"},
output="1 passed",
success=True,
),
],
),
TranscriptMessage(role="assistant", text="done, verified."),
]
)
async def delete_session(self, session_key: str) -> None:
self.state.deleted_sessions.append(session_key)
async def delete_agent(self, agent_id: str, *, delete_files: bool = False) -> None: # noqa: ARG002
self.state.deleted_agents.append(agent_id)
class _DisconnectingGatewayClient(_SuccessfulGatewayClient):
async def send_and_wait(self, session_key: str, message: str, *, timeout: float) -> Transcript: # noqa: ARG002
raise ConnectionError("gateway connection dropped")
@pytest.mark.asyncio
async def test_queue_worker_harness_scorer_happy_path_writes_result(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
):
queue_dir = tmp_path / "queue"
results_dir = tmp_path / "results"
state_dir = tmp_path / "state"
monkeypatch.setattr(queue_module, "LOCAL_QUEUE_DIR", queue_dir)
monkeypatch.setattr(queue_module, "HF_TOKEN", "")
monkeypatch.setattr("clawbench.worker.RESULTS_DIR", results_dir)
monkeypatch.setenv("OPENCLAW_STATE_DIR", str(state_dir))
monkeypatch.setenv("CLAWBENCH_RUN_CACHE_DIR", str(tmp_path / "run-cache"))
monkeypatch.setattr("clawbench.harness.GatewayClient", _SuccessfulGatewayClient)
async def fake_upload_result(result) -> None: # noqa: ANN001
return None
async def fake_ensure_gateway() -> None:
return None
async def fake_preflight_browser_support_for_tasks(*args, **kwargs) -> None: # noqa: ANN002, ANN003
return None
task = _runtime_task()
queue = JobQueue()
job = await queue.submit(
SubmissionRequest(
model="test/model",
provider="test",
runs_per_task=1,
max_parallel_lanes=1,
)
)
claimed = await queue.claim_pending()
assert [claimed_job.job_id for claimed_job in claimed] == [job.job_id]
worker = EvalWorker(queue)
monkeypatch.setattr(worker, "_load_job_tasks", lambda current_job: [task])
monkeypatch.setattr("clawbench.harness.load_all_tasks", lambda **kwargs: [task])
monkeypatch.setattr(worker, "_ensure_gateway", fake_ensure_gateway)
monkeypatch.setattr(worker, "_preflight_browser_support_for_tasks", fake_preflight_browser_support_for_tasks)
monkeypatch.setattr(worker, "_stop_gateway", lambda: None)
monkeypatch.setattr(worker, "_stop_parallel_gateways", lambda: None)
monkeypatch.setattr("clawbench.upload.upload_result", fake_upload_result)
await worker._process_job(claimed[0])
finished = await queue.get_status(job.job_id)
assert finished is not None
assert finished.status == JobStatus.FINISHED
assert finished.result_id is not None
assert finished.progress_message == "Finished"
result_path = results_dir / f"{finished.result_id}.json"
result = json.loads(result_path.read_text(encoding="utf-8"))
assert result["model"] == "test/model"
assert result["overall_completion"] == 1.0
assert result["overall_pass_hat_k"] == 1.0
assert result["task_results"][0]["task_id"] == "runtime-contract-smoke"
@pytest.mark.asyncio
async def test_harness_turn_disconnect_becomes_failed_run(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
monkeypatch.setenv("OPENCLAW_STATE_DIR", str(tmp_path / "state"))
monkeypatch.setenv("CLAWBENCH_RUN_CACHE_DIR", str(tmp_path / "run-cache"))
monkeypatch.setattr("clawbench.harness.GatewayClient", _DisconnectingGatewayClient)
harness = BenchmarkHarness(
gateway_config=GatewayConfig(),
model="test/model",
randomize_order=False,
print_report=False,
quiet=True,
)
result = await harness._run_single(_runtime_task(), 0)
assert result.run_score == 0.0
assert result.delivery_outcome.value == "fail"
assert result.failure_mode is not None
assert result.failure_mode.value == "environment_unavailable"
assert "gateway connection dropped" in (result.error or "")
@pytest.mark.asyncio
async def test_harness_scorer_exception_becomes_failed_run(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
monkeypatch.setenv("OPENCLAW_STATE_DIR", str(tmp_path / "state"))
monkeypatch.setenv("CLAWBENCH_RUN_CACHE_DIR", str(tmp_path / "run-cache"))
monkeypatch.setattr("clawbench.harness.GatewayClient", _SuccessfulGatewayClient)
async def fail_score_task_run(**kwargs): # noqa: ANN003
raise RuntimeError("scorer exploded")
monkeypatch.setattr("clawbench.harness.score_task_run", fail_score_task_run)
harness = BenchmarkHarness(
gateway_config=GatewayConfig(),
model="test/model",
randomize_order=False,
print_report=False,
quiet=True,
)
result = await harness._run_single(_runtime_task(), 0)
assert result.run_score == 0.0
assert result.delivery_outcome.value == "fail"
assert result.failure_mode is not None
assert result.failure_mode.value == "state_regression"
assert result.error == "scorer exploded"
@pytest.mark.asyncio
async def test_stale_evaluating_job_can_be_reclaimed_and_claimed_again(monkeypatch: pytest.MonkeyPatch):
queue = JobQueue()
stale_started_at = (
datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)
).isoformat()
queue._jobs = {
"job-1": Job(
job_id="job-1",
status=JobStatus.EVALUATING,
started_at=stale_started_at,
last_progress_at=stale_started_at,
current_task_id="runtime-contract-smoke",
current_run_index=1,
current_run_total=1,
attempt_count=1,
request=SubmissionRequest(model="test/model"),
)
}
monkeypatch.setattr(queue, "_save_local", lambda: None)
async def fake_sync_to_hub() -> None:
return None
monkeypatch.setattr(queue, "_sync_to_hub", fake_sync_to_hub)
reclaimed = await queue.reclaim_stale_jobs(stale_after_seconds=300)
claimed = await queue.claim_pending()
assert [job.job_id for job in reclaimed] == ["job-1"]
assert [job.job_id for job in claimed] == ["job-1"]
job = queue._jobs["job-1"]
assert job.status == JobStatus.EVALUATING
assert job.attempt_count == 2
assert job.stale_requeues == 1
assert job.current_task_id is None
assert job.current_run_index is None
assert job.progress_message == "Queued for evaluation"
def test_leaderboard_skips_malformed_local_result_file(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
):
class NoopThread:
def __init__(self, *args, **kwargs) -> None: # noqa: ANN002, ANN003
return None
def start(self) -> None:
return None
monkeypatch.setattr(threading, "Thread", NoopThread)
monkeypatch.setattr(queue_module, "LOCAL_QUEUE_DIR", tmp_path / "queue")
monkeypatch.setattr(queue_module, "HF_TOKEN", "")
sys.modules.pop("app", None)
app = importlib.import_module("app")
results_dir = tmp_path / "results"
results_dir.mkdir()
(results_dir / "bad.json").write_text("{not json", encoding="utf-8")
(results_dir / "good.json").write_text(
json.dumps(
{
"model": "test/model",
"timestamp": "2026-04-29T00:00:00+00:00",
"overall_score": 0.91,
"overall_completion": 1.0,
"overall_trajectory": 0.8,
"overall_behavior": 1.0,
"overall_pass_hat_k": 1.0,
"environment": {"prompt_variant": "clear", "scenario": "all"},
"task_results": [{"task_id": "runtime-contract-smoke"}],
}
),
encoding="utf-8",
)
monkeypatch.setattr(app, "RESULTS_DIR", results_dir)
monkeypatch.setattr(app, "dataset_has_submission_results", lambda api, repo: False)
frame = app._load_leaderboard_uncached()
assert list(frame["Model"]) == ["test/model"]
assert list(frame["Score"]) == [0.91]
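
Note on the stale-job test above: it expects a job stuck in `EVALUATING` with no progress for longer than `stale_after_seconds` to be requeued, with `stale_requeues` incremented, so that a later claim can pick it up again. The sketch below shows that cutoff comparison on its own. `SketchJob` and `reclaim_stale` are hypothetical stand-ins; the real `JobQueue.reclaim_stale_jobs` is async and tracks more fields than shown here.

```python
import datetime
from dataclasses import dataclass


@dataclass
class SketchJob:
    job_id: str
    status: str  # "pending" or "evaluating"
    last_progress_at: datetime.datetime
    stale_requeues: int = 0


def reclaim_stale(
    jobs: list[SketchJob],
    now: datetime.datetime,
    stale_after_seconds: float,
) -> list[SketchJob]:
    """Return evaluating jobs with no recent progress to the pending state."""
    cutoff = now - datetime.timedelta(seconds=stale_after_seconds)
    reclaimed: list[SketchJob] = []
    for job in jobs:
        if job.status == "evaluating" and job.last_progress_at < cutoff:
            job.status = "pending"
            job.stale_requeues += 1
            reclaimed.append(job)
    return reclaimed
```

Taking `now` as an argument keeps the function deterministic under test, which avoids patching the clock.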

@ -0,0 +1,25 @@
from importlib import util
from pathlib import Path


def load_serve_module():
    serve_path = (
        Path(__file__).resolve().parents[1]
        / "tasks-public"
        / "assets"
        / "t3_web_research_and_cite"
        / "serve.py"
    )
    spec = util.spec_from_file_location("t3_web_research_serve", serve_path)
    assert spec is not None and spec.loader is not None
    module = util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def test_article_paths_resolve_only_known_article_slugs():
    serve = load_serve_module()

    assert serve.article_for_request_path("/article/01_grid_basics").name == "01_grid_basics.html"
    assert serve.article_for_request_path("/article/../../serve.py") is None
    assert serve.article_for_request_path("/article/%2e%2e/%2e%2e/serve.py") is None
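
Note on the path test above: it checks that a plain slug resolves to its HTML file while both literal and percent-encoded `..` traversal attempts return `None`. A minimal sketch of a resolver with that behaviour follows. It is not the `serve.py` under test: `resolve_article_path`, `ARTICLE_DIR`, and the slug alphabet are assumptions, and unlike the real handler (which the test name suggests checks against known slugs) this sketch only validates the slug's shape.

```python
import re
from pathlib import Path
from typing import Optional
from urllib.parse import unquote

# Hypothetical layout: article HTML files live in one flat directory.
ARTICLE_DIR = Path("articles")
_SLUG_RE = re.compile(r"[A-Za-z0-9_]+")
_PREFIX = "/article/"


def resolve_article_path(request_path: str) -> Optional[Path]:
    """Map /article/<slug> to <slug>.html, or None for anything else."""
    # Decode first so %2e%2e is judged as ".." rather than slipping through.
    decoded = unquote(request_path)
    if not decoded.startswith(_PREFIX):
        return None
    slug = decoded[len(_PREFIX):]
    # Whitelisting the slug alphabet rejects "..", "/", and "." outright.
    if not _SLUG_RE.fullmatch(slug):
        return None
    return ARTICLE_DIR / f"{slug}.html"
```

Validating against an allowed alphabet after decoding is simpler to reason about than trying to strip dangerous sequences, since anything outside the alphabet is refused.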

@ -5,8 +5,15 @@ from types import SimpleNamespace
import pytest
from clawbench.queue import JobQueue
from clawbench.worker import GATEWAY_PORT, GATEWAY_PORT_SPACING, EvalWorker, JobProgressTracker, ParallelLane
from clawbench.queue import Job, JobQueue, JobStatus, SubmissionRequest
from clawbench.worker import (
GATEWAY_PORT,
GATEWAY_PORT_SPACING,
OPENCLAW_EVAL_SYSTEM_PROMPT,
EvalWorker,
JobProgressTracker,
ParallelLane,
)
class DummyTask:
@ -28,6 +35,52 @@ class DummyTask:
return [object()] * self._phases
class FakeQueue:
def __init__(self) -> None:
self.evaluating: list[str] = []
self.finished: list[tuple[str, str]] = []
self.failed: list[tuple[str, str]] = []
self.progress: list[tuple[str, dict[str, object]]] = []
async def mark_evaluating(self, job_id: str) -> None:
self.evaluating.append(job_id)
async def mark_finished(self, job_id: str, result_id: str) -> None:
self.finished.append((job_id, result_id))
async def mark_failed(self, job_id: str, error: str) -> None:
self.failed.append((job_id, error))
async def update_progress(self, job_id: str, **kwargs) -> None:
self.progress.append((job_id, kwargs))
class FakeBenchmarkResult:
submission_id = "submission-1"
overall_score = 0.82
overall_pass_hat_k = 1.0
def model_dump(self):
return {
"submission_id": self.submission_id,
"overall_score": self.overall_score,
"overall_pass_hat_k": self.overall_pass_hat_k,
}
def make_job(*, status: JobStatus = JobStatus.PENDING, lanes: int = 1) -> Job:
return Job(
job_id="job-1",
status=status,
request=SubmissionRequest(
model="anthropic/claude-sonnet-4-6",
provider="anthropic",
runs_per_task=1,
max_parallel_lanes=lanes,
),
)
def test_configure_browser_runtime_sets_benchmark_safe_openclaw_config(monkeypatch):
worker = EvalWorker(JobQueue())
state_dir = Path("/tmp/test-openclaw-config-basic")
@ -45,7 +98,12 @@ def test_configure_browser_runtime_sets_benchmark_safe_openclaw_config(monkeypat
assert json.loads(config_path.read_text(encoding="utf-8")) == {
"agents": {"defaults": {"skipBootstrap": True}},
"browser": {"headless": True, "noSandbox": True},
"tools": {"exec": {"host": "gateway", "security": "full", "ask": "off"}},
"approvals": {"exec": {"enabled": False}},
}
approvals = json.loads((state_dir / "exec-approvals.json").read_text(encoding="utf-8"))
assert approvals["defaults"] == {"security": "full", "ask": "off", "askFallback": "full"}
assert approvals["agents"]["*"] == {"security": "full", "ask": "off", "askFallback": "full"}
def test_configure_browser_runtime_pins_subagents_to_active_model(monkeypatch):
@ -68,10 +126,56 @@ def test_configure_browser_runtime_pins_subagents_to_active_model(monkeypatch):
"defaults": {
"skipBootstrap": True,
"model": {"primary": "openai-codex/gpt-5.4"},
"models": {"openai-codex/gpt-5.4": {"params": {"fastMode": True}}},
"systemPromptOverride": OPENCLAW_EVAL_SYSTEM_PROMPT,
"subagents": {"model": {"primary": "openai-codex/gpt-5.4"}},
}
},
"browser": {"headless": True, "noSandbox": True},
"tools": {"exec": {"host": "gateway", "security": "full", "ask": "off"}},
"approvals": {"exec": {"enabled": False}},
}
def test_configure_browser_runtime_uses_gateway_env_config_path(tmp_path: Path, monkeypatch):
worker = EvalWorker(JobQueue())
worker.set_active_model("openai-codex/gpt-5.4")
parent_state = tmp_path / "parent"
lane_state = tmp_path / "lane"
parent_state.mkdir()
lane_state.mkdir()
parent_config = parent_state / "openclaw.json"
lane_config = lane_state / "openclaw.json"
parent_config.write_text("{}", encoding="utf-8")
lane_config.write_text("{}", encoding="utf-8")
monkeypatch.setenv("OPENCLAW_STATE_DIR", str(parent_state))
worker._configure_browser_runtime(
["node", "/openclaw/dist/cli.js"],
{
"OPENCLAW_STATE_DIR": str(lane_state),
"OPENCLAW_CONFIG_PATH": str(lane_config),
},
)
assert json.loads(parent_config.read_text(encoding="utf-8")) == {}
lane_data = json.loads(lane_config.read_text(encoding="utf-8"))
assert lane_data["agents"]["defaults"]["model"]["primary"] == "openai-codex/gpt-5.4"
assert lane_data["tools"]["exec"] == {"host": "gateway", "security": "full", "ask": "off"}
assert (lane_state / "exec-approvals.json").exists()
assert not (parent_state / "exec-approvals.json").exists()
def test_eval_model_defaults_pin_openai_to_sse_transport() -> None:
data: dict[str, object] = {}
changed = EvalWorker._apply_eval_model_defaults(data, "openai/gpt-5.5")
assert changed is True
assert data["agents"]["defaults"]["models"]["openai/gpt-5.5"]["params"] == {
"fastMode": True,
"transport": "sse",
"openaiWsWarmup": False,
}
@ -169,6 +273,90 @@ def test_materialize_lane_runtime_spaces_ports_and_copies_auth(tmp_path: Path, m
assert lane1.port == GATEWAY_PORT + GATEWAY_PORT_SPACING
assert lane1.state_dir is not None
assert (lane1.state_dir / "agents" / "main" / "agent" / "auth-profiles.json").exists()
lane_cfg = json.loads((lane1.state_dir / "openclaw.json").read_text(encoding="utf-8"))
assert lane_cfg["tools"]["exec"] == {"host": "gateway", "security": "full", "ask": "off"}
assert lane_cfg["approvals"]["exec"] == {"enabled": False}
lane_approvals = json.loads((lane1.state_dir / "exec-approvals.json").read_text(encoding="utf-8"))
assert lane_approvals["defaults"] == {"security": "full", "ask": "off", "askFallback": "full"}
@pytest.mark.asyncio
async def test_process_job_finishes_when_optional_result_upload_fails(tmp_path: Path, monkeypatch):
    queue = FakeQueue()
    worker = EvalWorker(queue)  # type: ignore[arg-type]
    cleanup_calls: list[str] = []

    async def fake_run_serial_benchmark(job, tasks, progress):  # noqa: ANN001
        progress.mark_serial(tasks[0].id, 0, stage="running")
        return FakeBenchmarkResult()

    async def fake_upload_result(result):  # noqa: ANN001
        raise RuntimeError("hub upload unavailable")

    monkeypatch.setattr("clawbench.worker.RESULTS_DIR", tmp_path)
    monkeypatch.setattr(worker, "_load_job_tasks", lambda job: [DummyTask("t1", "tier1", "coding")])
    monkeypatch.setattr(worker, "_run_serial_benchmark", fake_run_serial_benchmark)
    monkeypatch.setattr(worker, "_stop_gateway", lambda: cleanup_calls.append("serial"))
    monkeypatch.setattr(worker, "_stop_parallel_gateways", lambda: cleanup_calls.append("parallel"))
    monkeypatch.setattr("clawbench.upload.upload_result", fake_upload_result)

    await worker._process_job(make_job())

    assert queue.evaluating == ["job-1"]
    assert queue.finished == [("job-1", "submission-1")]
    assert queue.failed == []
    assert (tmp_path / "submission-1.json").exists()
    assert cleanup_calls[-2:] == ["serial", "parallel"]
    assert worker._active_model == ""
    assert worker._serial_last_task_id is None
@pytest.mark.asyncio
async def test_process_job_marks_failure_and_cleans_up_after_benchmark_error(monkeypatch):
    queue = FakeQueue()
    worker = EvalWorker(queue)  # type: ignore[arg-type]
    cleanup_calls: list[str] = []

    async def fail_run_serial_benchmark(job, tasks, progress):  # noqa: ANN001
        raise RuntimeError("gateway died")

    monkeypatch.setattr(worker, "_load_job_tasks", lambda job: [DummyTask("t1", "tier1", "coding")])
    monkeypatch.setattr(worker, "_run_serial_benchmark", fail_run_serial_benchmark)
    monkeypatch.setattr(worker, "_stop_gateway", lambda: cleanup_calls.append("serial"))
    monkeypatch.setattr(worker, "_stop_parallel_gateways", lambda: cleanup_calls.append("parallel"))

    await worker._process_job(make_job())

    assert queue.evaluating == ["job-1"]
    assert queue.finished == []
    assert queue.failed == [("job-1", "gateway died")]
    assert cleanup_calls[-2:] == ["serial", "parallel"]
    assert worker._active_model == ""
    assert worker._serial_last_task_id is None
@pytest.mark.asyncio
async def test_process_job_does_not_reclaim_already_claimed_evaluating_job(tmp_path: Path, monkeypatch):
    queue = FakeQueue()
    worker = EvalWorker(queue)  # type: ignore[arg-type]

    async def fake_run_serial_benchmark(job, tasks, progress):  # noqa: ANN001
        return FakeBenchmarkResult()

    async def fake_upload_result(result):  # noqa: ANN001
        return None

    monkeypatch.setattr("clawbench.worker.RESULTS_DIR", tmp_path)
    monkeypatch.setattr(worker, "_load_job_tasks", lambda job: [DummyTask("t1", "tier1", "coding")])
    monkeypatch.setattr(worker, "_run_serial_benchmark", fake_run_serial_benchmark)
    monkeypatch.setattr(worker, "_stop_gateway", lambda: None)
    monkeypatch.setattr(worker, "_stop_parallel_gateways", lambda: None)
    monkeypatch.setattr("clawbench.upload.upload_result", fake_upload_result)

    await worker._process_job(make_job(status=JobStatus.EVALUATING))

    assert queue.evaluating == []
    assert queue.finished == [("job-1", "submission-1")]
@pytest.mark.asyncio