Compare commits

...

3 Commits

Author SHA1 Message Date
Vincent Koc
69a2311681
fix: harden adapter workspace checks
Some checks failed
CI / Python ${{ matrix.python-version }} test suite (3.11) (push) Has been cancelled
CI / Python ${{ matrix.python-version }} test suite (3.12) (push) Has been cancelled
2026-04-29 13:53:44 -07:00
Vincent Koc
82eaadbc61
Merge remote-tracking branch 'origin/main' into pr17-nonrewrite
* origin/main:
  fix(worker): harden runtime result writes
  fix(client): clean pending rpc on send failure
  test: cover environment verifier success paths
  test: cover judge score gate propagation
  fix(scoring): gate judge-weighted scores
  fix(runtime): harden benchmark cache and task paths
  fix: flag credential file access in dangerous shell patterns (#6)
  fix: flag git push --force variants as dangerous shell commands (#5)
  chore: add open-source contribution scaffolding (#3)
  fix: strip quoted strings before checking for shell redirect operators (#2)
2026-04-29 13:52:41 -07:00
scoootscooob
30334cac88 feat: add adapter canonicalization layer
Some checks are pending
CI / Python ${{ matrix.python-version }} test suite (3.11) (push) Waiting to run
CI / Python ${{ matrix.python-version }} test suite (3.12) (push) Waiting to run
2026-04-29 11:15:11 -07:00
19 changed files with 5245 additions and 0 deletions

313
clawbench/ablation.py Normal file
View File

@ -0,0 +1,313 @@
"""Ablation profiles and fair-comparison helpers.
The benchmark can only explain model, harness, and tool effects if those
axes are represented explicitly in run metadata. This module keeps that
representation small and deterministic: a harness driver plus a tool
profile yields a fingerprint, and result comparison refuses to call a
delta fair when models or task sets drift.
"""
from __future__ import annotations
import hashlib
import json
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable
from pydantic import BaseModel, Field
from clawbench.adapters import get_adapter
from clawbench.adapters.base import AdapterConfig
from clawbench.canonical import AdapterCapability
from clawbench.canonical.convert import from_task_definition
from clawbench.schemas import BenchmarkResult, TaskDefinition
CAPABILITY_TO_INTERFACE: dict[AdapterCapability, str] = {
AdapterCapability.FILES: "filesystem",
AdapterCapability.EXECUTION: "shell",
AdapterCapability.MEMORY: "memory",
AdapterCapability.SESSION: "session",
AdapterCapability.CRON: "scheduler",
AdapterCapability.BROWSER: "browser",
AdapterCapability.GATEWAY_RPC: "gateway_rpc",
AdapterCapability.MULTI_TURN_INJECTION: "multi_turn",
}
class HarnessDescriptor(BaseModel):
"""Identifies the agent loop being measured."""
adapter: str
driver: str = ""
version: str = ""
git_sha: str = ""
source: str = ""
invocation: str = "clawbench"
class ToolProfile(BaseModel):
"""The tools/interfaces exposed to a harness run."""
name: str
mode: str = "native"
interfaces: list[str] = Field(default_factory=list)
adapter_capabilities: list[str] = Field(default_factory=list)
enabled_toolsets: list[str] = Field(default_factory=list)
disabled_toolsets: list[str] = Field(default_factory=list)
tools: list[str] = Field(default_factory=list)
fingerprint: str = ""
def with_fingerprint(self) -> "ToolProfile":
payload = {
"name": self.name,
"mode": self.mode,
"interfaces": sorted(self.interfaces),
"adapter_capabilities": sorted(self.adapter_capabilities),
"enabled_toolsets": sorted(self.enabled_toolsets),
"disabled_toolsets": sorted(self.disabled_toolsets),
"tools": sorted(self.tools),
}
digest = hashlib.sha256(
json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
).hexdigest()
return self.model_copy(update={"fingerprint": digest[:16]})
class AblationProfile(BaseModel):
"""Run-level axis metadata embedded in BenchmarkResult.environment."""
model: str
harness: HarnessDescriptor
tool_profile: ToolProfile
prompt_profile: str = "clear"
fingerprint: str = ""
def with_fingerprint(self) -> "AblationProfile":
tool_profile = self.tool_profile.with_fingerprint()
payload = {
"model": self.model,
"harness": self.harness.model_dump(),
"tool_profile": tool_profile.model_dump(),
"prompt_profile": self.prompt_profile,
}
digest = hashlib.sha256(
json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
).hexdigest()
return self.model_copy(
update={
"tool_profile": tool_profile,
"fingerprint": digest[:16],
}
)
@dataclass(frozen=True)
class FairTaskSet:
task_ids: list[str]
skipped: dict[str, list[str]] = field(default_factory=dict)
def capabilities_to_interfaces(capabilities: Iterable[AdapterCapability | str]) -> list[str]:
values: list[str] = []
for cap in capabilities:
enum_value = cap if isinstance(cap, AdapterCapability) else AdapterCapability(str(cap))
values.append(CAPABILITY_TO_INTERFACE.get(enum_value, enum_value.value))
return sorted(set(values))
def adapter_capabilities(
adapter: str,
config: AdapterConfig | None = None,
) -> set[AdapterCapability]:
adapter_cls = get_adapter(adapter)
return adapter_cls.supported_capabilities(config)
def default_tool_profile(
*,
adapter: str,
config: AdapterConfig | None = None,
name: str | None = None,
mode: str = "native",
enabled_toolsets: list[str] | None = None,
disabled_toolsets: list[str] | None = None,
) -> ToolProfile:
caps = adapter_capabilities(adapter, config)
profile = ToolProfile(
name=name or f"{adapter}-{mode}",
mode=mode,
interfaces=capabilities_to_interfaces(caps),
adapter_capabilities=sorted(cap.value for cap in caps),
enabled_toolsets=enabled_toolsets or [],
disabled_toolsets=disabled_toolsets or [],
)
return profile.with_fingerprint()
def compatible_task_ids(
tasks: Iterable[TaskDefinition],
*,
adapter: str,
config: AdapterConfig | None = None,
) -> tuple[list[str], dict[str, list[str]]]:
caps = adapter_capabilities(adapter, config)
task_ids: list[str] = []
skipped: dict[str, list[str]] = {}
for task in tasks:
canonical = from_task_definition(task)
missing = set(canonical.required_adapter_capabilities) - caps
if missing:
skipped[task.id] = sorted(cap.value for cap in missing)
else:
task_ids.append(task.id)
return task_ids, skipped
def common_compatible_task_set(
tasks: Iterable[TaskDefinition],
adapter_configs: dict[str, tuple[str, AdapterConfig | None]],
) -> FairTaskSet:
task_list = list(tasks)
common: set[str] | None = None
skipped: dict[str, list[str]] = {}
for label, (adapter, config) in adapter_configs.items():
ids, missing = compatible_task_ids(task_list, adapter=adapter, config=config)
ids_set = set(ids)
common = ids_set if common is None else common & ids_set
for task_id, caps in missing.items():
skipped.setdefault(task_id, []).append(f"{label}: {', '.join(caps)}")
ordered = [task.id for task in task_list if task.id in (common or set())]
return FairTaskSet(task_ids=ordered, skipped=skipped)
def build_ablation_profile(
*,
model: str,
adapter: str,
config: AdapterConfig | None = None,
prompt_profile: str = "clear",
harness_version: str = "",
harness_git_sha: str = "",
harness_source: str = "",
driver: str = "",
tool_profile_name: str | None = None,
enabled_toolsets: list[str] | None = None,
disabled_toolsets: list[str] | None = None,
) -> AblationProfile:
harness = HarnessDescriptor(
adapter=adapter,
driver=driver,
version=harness_version,
git_sha=harness_git_sha,
source=harness_source,
)
tool_profile = default_tool_profile(
adapter=adapter,
config=config,
name=tool_profile_name,
enabled_toolsets=enabled_toolsets,
disabled_toolsets=disabled_toolsets,
)
return AblationProfile(
model=model,
harness=harness,
tool_profile=tool_profile,
prompt_profile=prompt_profile,
).with_fingerprint()
def compare_results(results: dict[str, BenchmarkResult]) -> dict[str, Any]:
"""Return score deltas plus fairness checks for result JSONs."""
labels = list(results)
models = {label: result.model for label, result in results.items()}
task_sets = {
label: [task.task_id for task in result.task_results]
for label, result in results.items()
}
first_tasks = next(iter(task_sets.values()), [])
same_task_set = all(tasks == first_tasks for tasks in task_sets.values())
same_model = len(set(models.values())) == 1
snapshot_fingerprints = {
result.task_snapshot_fingerprint
for result in results.values()
if result.task_snapshot_fingerprint
}
same_task_snapshot = len(snapshot_fingerprints) <= 1
prompt_variants = {
str(result.environment.get("prompt_variant", ""))
for result in results.values()
if result.environment.get("prompt_variant", "")
}
same_prompt_variant = len(prompt_variants) <= 1
benchmark_releases = {
result.benchmark_release_id
for result in results.values()
if result.benchmark_release_id
}
same_benchmark_release = len(benchmark_releases) <= 1
task_verifier_fair = same_task_set and same_task_snapshot and same_prompt_variant and same_benchmark_release
rows: dict[str, Any] = {}
for label, result in results.items():
rows[label] = {
"model": result.model,
"adapter": result.environment.get("adapter", ""),
"score": result.overall_score,
"completion": result.overall_completion,
"trajectory": result.overall_trajectory,
"behavior": result.overall_behavior,
"reliability": result.overall_reliability,
"task_count": len(result.task_results),
"task_snapshot_fingerprint": result.task_snapshot_fingerprint,
"benchmark_release_id": result.benchmark_release_id,
"prompt_variant": result.environment.get("prompt_variant", ""),
"dimension_coverage": result.environment.get("dimension_coverage", {}),
"ablation": result.environment.get("ablation_profile", {}),
}
deltas: dict[str, float] = {}
if labels:
baseline = results[labels[0]].overall_score
for label in labels[1:]:
deltas[f"{label}_minus_{labels[0]}"] = round(
results[label].overall_score - baseline,
4,
)
return {
"fair": bool(task_verifier_fair),
"task_verifier_fair": bool(task_verifier_fair),
"controlled_ablation": bool(task_verifier_fair and same_model),
"same_model": same_model,
"same_task_set": same_task_set,
"same_task_snapshot": same_task_snapshot,
"same_prompt_variant": same_prompt_variant,
"same_benchmark_release": same_benchmark_release,
"models": models,
"task_sets": task_sets,
"rows": rows,
"deltas": deltas,
}
def git_head(path: Path) -> tuple[str, str]:
"""Best-effort `(sha, describe)` for harness provenance."""
try:
sha = subprocess.check_output(
["git", "-C", str(path), "rev-parse", "HEAD"],
text=True,
stderr=subprocess.DEVNULL,
).strip()
desc = subprocess.check_output(
["git", "-C", str(path), "describe", "--tags", "--always", "--dirty"],
text=True,
stderr=subprocess.DEVNULL,
).strip()
return sha, desc
except Exception:
return "", ""

View File

@ -0,0 +1,102 @@
"""Agent adapter layer — Phase-4 of CLAWBENCH_V0_4_SPEC.md.
Adapters plug an agent framework (OpenClaw, Hermes, Codex, Claude Code,
Deerflow, ) into ClawBench's canonical task pipeline. Each adapter is
responsible for:
- Setting up the workspace + seed state from a `CanonicalTask`.
- Driving the agent through each `CanonicalPhase`'s simulated user.
- Returning a canonical `Transcript` so the scorer, trajectory analyser,
and judge can score the run unchanged.
- Resolving `StateQuery` assertions that fall under its declared
capabilities; returning `capability_missing=True` for queries that
require a capability the adapter doesn't provide.
The `ADAPTERS` registry is populated by each adapter module at import
time. `get_adapter(name)` is the canonical lookup.
"""
from __future__ import annotations
from clawbench.adapters.base import (
AdapterConfig,
AdapterContext,
AgentAdapter,
PhaseResult,
StateQueryResult,
)
#: Registry of adapter_name → adapter class. Populated by the adapter
#: modules at import time (e.g. `from clawbench.adapters.openclaw import *`
#: registers the OpenClaw adapter). Callers should use `get_adapter`
#: rather than reading this dict directly.
ADAPTERS: dict[str, type[AgentAdapter]] = {}
def register_adapter(cls: type[AgentAdapter]) -> type[AgentAdapter]:
"""Decorator / direct-call helper that registers an adapter class.
Adapters declare themselves via:
```
@register_adapter
class HermesAdapter(AgentAdapter):
name = "hermes"
...
```
"""
name = getattr(cls, "name", "")
if not name:
raise ValueError(f"{cls.__name__} must set a non-empty `name` class attribute")
existing = ADAPTERS.get(name)
if existing is not None and existing is not cls:
raise ValueError(
f"Adapter name collision: '{name}' is already registered "
f"to {existing.__qualname__}"
)
ADAPTERS[name] = cls
return cls
def get_adapter(name: str) -> type[AgentAdapter]:
"""Look up an adapter class by its registered name.
Import the adapter module before calling this so the registration
has run. `clawbench.adapters.openclaw` always loads; optional
adapters (hermes, codex) guard their imports and raise a clear
error if their runtime dep isn't installed.
"""
try:
return ADAPTERS[name]
except KeyError as exc:
available = ", ".join(sorted(ADAPTERS)) or "(none)"
raise KeyError(
f"Unknown adapter '{name}'. Registered adapters: {available}"
) from exc
__all__ = [
"ADAPTERS",
"AdapterConfig",
"AdapterContext",
"AgentAdapter",
"PhaseResult",
"StateQueryResult",
"get_adapter",
"register_adapter",
]
# Register built-in adapters at import time. Each adapter module is
# expected to @register_adapter its class. OpenClaw is always
# available; optional adapters (hermes, codex) guard their imports and
# are registered only when their runtime dep is present.
from clawbench.adapters import openclaw as _openclaw # noqa: E402,F401
try:
from clawbench.adapters import hermes as _hermes # noqa: E402,F401
except Exception:
# hermes-agent is an optional extra; absence is fine.
_hermes = None # type: ignore[assignment]

234
clawbench/adapters/base.py Normal file
View File

@ -0,0 +1,234 @@
"""Agent adapter ABC and associated data shapes.
An `AgentAdapter` is the execution counterpart to a `CanonicalTask`. It
is the only place where framework-specific details (OpenClaw gateway
RPCs, Hermes `MiniSWERunner`, Claude Code SDK, etc.) live. Everything
downstream of the adapter trajectory analysis, scorer, judge, stats
consumes a canonical `Transcript` and `TaskRunResult` produced by the
adapter, so those modules stay unchanged across adapters.
Lifecycle per task run:
1. Harness instantiates `adapter = AdapterClass(config)`.
2. `async with adapter as adapter:` starts subprocesses / websockets
/ whatever this adapter needs to hold open across a run.
3. `await adapter.setup(ctx)` realizes seed state, workspace files,
background services, pre-run state queries.
4. For each `CanonicalPhase`: `await adapter.run_phase(phase, ctx)`
drives the simulated user against the agent, returns a
`PhaseResult` with the transcript increment.
5. For each `StateQuery` in `task.verifier.state_queries`:
`await adapter.verify_state_query(query, ctx)` returns whether
the assertion held, or that the adapter lacks the capability.
6. `await adapter.teardown(ctx)` cleans up agent-side state (the
workspace itself is harness-owned).
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, ClassVar
from clawbench.canonical import (
AdapterCapability,
CanonicalPhase,
CanonicalTask,
StateQuery,
)
from clawbench.schemas import Transcript, TranscriptMessage
@dataclass
class AdapterConfig:
"""Base config every adapter accepts.
Adapters subclass this to add their own fields. The harness builds
a config instance from CLI flags / env vars and passes it to the
adapter constructor.
"""
#: Primary model identifier. Semantics are adapter-specific (an
#: OpenClaw model id, a Hermes `--model` string, etc.).
model: str = ""
@dataclass
class AdapterContext:
"""Per-run context handed to every adapter method.
`transcript` is mutated in place across phases: each
`run_phase` call appends the messages it observed, so the scorer
sees one consolidated `Transcript` at the end.
"""
task: CanonicalTask
workspace: Path
runtime_values: dict[str, Any]
run_index: int
model: str
transcript: Transcript
#: Free-form adapter-owned scratch state (e.g. the OpenClaw
#: `session_key` and `agent_id`; the Hermes `MiniSWERunner`
#: instance). The harness never reads these — the adapter is free
#: to use the dict as its own in-context cache.
adapter_state: dict[str, Any] = field(default_factory=dict)
@dataclass
class PhaseResult:
"""The transcript increment produced by a single phase."""
messages: list[TranscriptMessage] = field(default_factory=list)
#: Adapter-specific metadata for this phase (token counts returned
#: by the adapter, session identifiers, etc.). Merged into
#: `TaskRunResult` under the `efficiency_result` / adapter metadata
#: fields where applicable.
adapter_metadata: dict[str, Any] = field(default_factory=dict)
#: True if the adapter detected that the agent completed normally
#: (e.g. Hermes's `completed=True`). Not a pass/fail signal — just
#: whether the trajectory ran out of work vs was cut short. The
#: scorer uses this in `delivery_outcome` classification.
completed_normally: bool = True
#: If the phase aborted due to the adapter itself (not the agent),
#: populated with an error message the harness surfaces.
error: str | None = None
@dataclass
class StateQueryResult:
"""Result of resolving a `StateQuery` against the adapter's state.
`capability_missing=True` means "this adapter cannot evaluate this
kind of query". The scorer treats that as neutral (neither pass nor
fail) and records a skip note in the `CompletionResult`; under
`--strict-compat` the harness will have filtered the task out before
the adapter ever saw it.
"""
ok: bool
detail: str = ""
capability_missing: bool = False
class AgentAdapter(ABC):
"""Abstract base class for agent adapters.
Subclasses MUST:
- Set a unique `name: ClassVar[str]`.
- Set a `capabilities: ClassVar[set[AdapterCapability]]` declaring
which state-query kinds the adapter can resolve.
- Implement `setup`, `run_phase`, `verify_state_query`, `teardown`.
- Optionally implement `__aenter__` / `__aexit__` for long-lived
resource setup (a persistent websocket, a subprocess pool).
"""
name: ClassVar[str] = ""
capabilities: ClassVar[set[AdapterCapability]] = set()
def __init__(self, config: AdapterConfig | None = None) -> None:
self.config: AdapterConfig = config or AdapterConfig()
# ------------------------------------------------------------------
# Optional long-lived resource management.
# ------------------------------------------------------------------
async def __aenter__(self) -> "AgentAdapter":
return self
async def __aexit__(self, exc_type: object, exc: object, tb: object) -> None:
return None
# ------------------------------------------------------------------
# Required per-run lifecycle.
# ------------------------------------------------------------------
@abstractmethod
async def setup(self, ctx: AdapterContext) -> None:
"""Realise the workspace, seed state, and any pre-run state.
The harness has already created the workspace dir and expanded
`CanonicalAssets.workspace_files` into it. The adapter is
responsible for:
- Applying `seed_state` entries via an adapter-appropriate
mechanism (OpenClaw memory RPCs; Hermes file writes).
- Starting the agent's process/session so `run_phase` can send
turns immediately.
"""
@abstractmethod
async def run_phase(
self,
phase: CanonicalPhase,
ctx: AdapterContext,
) -> PhaseResult:
"""Drive one `CanonicalPhase` to completion.
The simulated user in `phase.user` dictates what to send and
when. The adapter's job is to deliver those turns, observe the
agent's responses, and append canonical `TranscriptMessage`
entries to `ctx.transcript`.
"""
@abstractmethod
async def verify_state_query(
self,
query: StateQuery,
ctx: AdapterContext,
) -> StateQueryResult:
"""Resolve one `StateQuery` against the agent's post-run state.
Adapters whose `capabilities` don't cover `query.required_capability`
should return `StateQueryResult(ok=False, capability_missing=True)`.
"""
@abstractmethod
async def teardown(self, ctx: AdapterContext) -> None:
"""Release any agent-side state created during `setup`/`run_phase`.
The harness owns the workspace lifecycle; the adapter owns
sessions, subprocesses, and any in-memory caches it held open.
"""
# ------------------------------------------------------------------
# Convenience helpers available to every adapter.
# ------------------------------------------------------------------
@classmethod
def supported_capabilities(
cls,
config: AdapterConfig | None = None,
) -> set[AdapterCapability]:
"""Return capabilities available for a concrete adapter config.
Most adapters have a fixed surface and can use the class-level
`capabilities`. Adapters with multiple driver modes, such as Hermes
MiniSWE vs full AIAgent, override this to keep task gating honest.
"""
return set(cls.capabilities)
@classmethod
def missing_capabilities_for(
cls,
task: CanonicalTask,
config: AdapterConfig | None = None,
) -> set[AdapterCapability]:
"""Return the subset of `task.required_adapter_capabilities` this
adapter cannot cover. Empty set means the task is fully runnable
under this adapter.
"""
return set(task.required_adapter_capabilities) - cls.supported_capabilities(config)
@classmethod
def supports(
cls,
task: CanonicalTask,
config: AdapterConfig | None = None,
) -> bool:
"""True iff this adapter can cover every capability the task needs."""
return not cls.missing_capabilities_for(task, config)

View File

@ -0,0 +1,706 @@
"""Hermes adapter — drives Nous Research `hermes-agent`.
Hermes (https://github.com/NousResearch/hermes-agent) is a Python agent
framework with `MiniSWERunner` as its clean programmatic entry point.
This adapter:
1. Realizes the canonical workspace + seed state (seed_state entries
with `kind="memory"` become files, since Hermes has no memory RPC).
2. Constructs a `MiniSWERunner` scoped to the workspace.
3. For each canonical phase, renders the user turn and calls
`runner.run_task(prompt)` in a worker thread, with the phase's
timeout enforced as a wall clock.
4. Parses the returned `conversations` via
`clawbench.adapters.hermes_xml.parse_conversation` into a canonical
`Transcript` the scorer can consume unchanged.
5. For state queries the adapter can't resolve (session, cron, custom
gateway RPC), returns `capability_missing=True` so the harness
reports a clean skip. Memory queries fall back to workspace file
scanning via `environment_files.verify_memory_fallback`.
`hermes-agent` is an **optional** dependency (`clawbench[hermes]`). The
import is guarded so the base install stays lean; calling this adapter
without the dep installed raises a clear error rather than a cryptic
`ImportError`.
"""
from __future__ import annotations
import asyncio
import importlib.util
import json
import logging
import os
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from clawbench.adapters import register_adapter
from clawbench.adapters.base import (
AdapterConfig,
AdapterContext,
AgentAdapter,
PhaseResult,
StateQueryResult,
)
from clawbench.adapters.hermes_xml import parse_chat_messages, parse_conversation
from clawbench.canonical import (
AdapterCapability,
CanonicalPhase,
StateQuery,
)
from clawbench.environment_files import verify_memory_fallback
from clawbench.render import render_template
from clawbench.schemas import MemoryState, PromptVariant
from clawbench.simulated_user import UserSimulator
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Optional dependency import — guarded so the base install stays lean.
# ---------------------------------------------------------------------------
def _load_mini_swe_runner() -> tuple[Any, Exception | None]:
try: # pragma: no cover - import-guard branch
from mini_swe_runner import MiniSWERunner as runner_cls # type: ignore[import-not-found]
return runner_cls, None
except Exception as exc: # pragma: no cover - import-guard branch
import_error = exc
candidates: list[Path] = []
explicit_file = os.environ.get("HERMES_MINI_SWE_RUNNER")
if explicit_file:
candidates.append(Path(explicit_file).expanduser())
for env_name in ("HERMES_AGENT_REPO", "HERMES_INSTALL_DIR"):
value = os.environ.get(env_name)
if value:
candidates.append(Path(value).expanduser() / "mini_swe_runner.py")
hermes_home = Path(os.environ.get("HERMES_HOME", "~/.hermes")).expanduser()
candidates.append(hermes_home / "hermes-agent" / "mini_swe_runner.py")
for path in candidates:
if not path.is_file():
continue
try:
repo_root = str(path.parent)
if repo_root not in sys.path:
sys.path.insert(0, repo_root)
spec = importlib.util.spec_from_file_location(
"_clawbench_hermes_mini_swe_runner",
path,
)
if spec is None or spec.loader is None:
continue
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module.MiniSWERunner, None
except Exception as path_exc:
import_error = path_exc
continue
return None, import_error
MiniSWERunner, _HERMES_IMPORT_ERROR = _load_mini_swe_runner()
def _load_ai_agent() -> tuple[Any, Exception | None]:
try: # pragma: no cover - import-guard branch
from run_agent import AIAgent as agent_cls # type: ignore[import-not-found]
return agent_cls, None
except Exception as exc: # pragma: no cover - import-guard branch
import_error = exc
candidates: list[Path] = []
for env_name in ("HERMES_AGENT_REPO", "HERMES_INSTALL_DIR"):
value = os.environ.get(env_name)
if value:
candidates.append(Path(value).expanduser() / "run_agent.py")
hermes_home = Path(os.environ.get("HERMES_HOME", "~/.hermes")).expanduser()
candidates.append(hermes_home / "hermes-agent" / "run_agent.py")
for path in candidates:
if not path.is_file():
continue
try:
repo_root = str(path.parent)
if repo_root not in sys.path:
sys.path.insert(0, repo_root)
spec = importlib.util.spec_from_file_location(
"_clawbench_hermes_run_agent",
path,
)
if spec is None or spec.loader is None:
continue
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module.AIAgent, None
except Exception as path_exc:
import_error = path_exc
continue
return None, import_error
AIAgent, _HERMES_AGENT_IMPORT_ERROR = _load_ai_agent()
class _CodexToolMessageCompatClient:
"""Client wrapper for Hermes's Codex Responses shim.
The current Hermes MiniSWERunner feeds OpenAI chat-style `role="tool"`
messages back into `chat.completions.create()`. Hermes's Codex
Responses adapter accepts chat-shaped calls but currently forwards
those tool messages to Responses as plain input items, where Codex
rejects the unsupported role. Rewriting tool results as user-visible
text preserves the important observation for the next turn and keeps
the runner moving.
"""
def __init__(self, inner: Any) -> None:
self._inner = inner
self.chat = _CodexToolMessageCompatChat(inner.chat)
self.api_key = getattr(inner, "api_key", None)
self.base_url = getattr(inner, "base_url", None)
def close(self) -> None:
close = getattr(self._inner, "close", None)
if callable(close):
close()
class _CodexToolMessageCompatChat:
def __init__(self, inner_chat: Any) -> None:
self.completions = _CodexToolMessageCompatCompletions(inner_chat.completions)
class _CodexToolMessageCompatCompletions:
def __init__(self, inner_completions: Any) -> None:
self._inner = inner_completions
def create(self, **kwargs: Any) -> Any:
messages = kwargs.get("messages")
if isinstance(messages, list):
kwargs = dict(kwargs)
kwargs["messages"] = [_rewrite_codex_tool_message(message) for message in messages]
return self._inner.create(**kwargs)
def _rewrite_codex_tool_message(message: Any) -> Any:
if not isinstance(message, dict) or message.get("role") != "tool":
return message
content = message.get("content", "")
if not isinstance(content, str):
content = str(content)
tool_call_id = message.get("tool_call_id") or message.get("name") or "tool"
return {
"role": "user",
"content": f"Tool result ({tool_call_id}):\n{content}",
}
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
@dataclass
class HermesAdapterConfig(AdapterConfig):
"""Config for the Hermes adapter.
Fields map onto `MiniSWERunner` kwargs; ClawBench passes the
canonical model string through verbatim so users pick Hermes-
supported models via the existing `--model` flag.
"""
env_type: str = "local"
max_iterations: int = 15
timeout_seconds: int = 60
base_url: str | None = None
api_key: str | None = None
provider: str | None = None
api_mode: str | None = None
prompt_variant: str = PromptVariant.CLEAR.value
driver_mode: str = "mini_swe"
enabled_toolsets: list[str] | None = None
disabled_toolsets: list[str] | None = None
hermes_home: str | None = None
tool_delay_seconds: float = 0.0
# Optional: an explicit `MiniSWERunner` factory. Used by tests to
# plug in a stub; production code leaves this None and the adapter
# instantiates the real runner lazily.
runner_factory: Any = None
agent_factory: Any = None
@register_adapter
class HermesAdapter(AgentAdapter):
"""Adapter for the Nous Research hermes-agent."""
name = "hermes"
capabilities = {
AdapterCapability.FILES,
AdapterCapability.EXECUTION,
}
@classmethod
def supported_capabilities(cls, config: AdapterConfig | None = None) -> set[AdapterCapability]:
if isinstance(config, HermesAdapterConfig) and config.driver_mode == "ai_agent":
return {
AdapterCapability.FILES,
AdapterCapability.EXECUTION,
AdapterCapability.MEMORY,
AdapterCapability.CRON,
AdapterCapability.BROWSER,
AdapterCapability.MULTI_TURN_INJECTION,
}
return set(cls.capabilities)
def __init__(self, config: HermesAdapterConfig | None = None) -> None:
super().__init__(config or HermesAdapterConfig())
self._config: HermesAdapterConfig = self.config # type: ignore[assignment]
# ------------------------------------------------------------------
# Lifecycle.
# ------------------------------------------------------------------
async def setup(self, ctx: AdapterContext) -> None:
"""Realize memory seed state as files and build the runner.
Hermes-in-`env_type=local` operates directly on the workspace
filesystem, so memory `SeedEntry` entries are written out as
`memory/<key>.md` files. Callers that want a different mapping
can pre-populate the workspace before invoking the adapter.
"""
for seed in ctx.task.assets.seed_state:
if seed.kind == "memory" and seed.key:
target = ctx.workspace / "memory" / f"{seed.key}.md"
target.parent.mkdir(parents=True, exist_ok=True)
content = seed.content or ""
if not isinstance(content, str):
content = str(content)
target.write_text(content, encoding="utf-8")
if self._config.driver_mode == "ai_agent":
agent = self._build_ai_agent(ctx)
ctx.adapter_state["agent"] = agent
ctx.adapter_state["conversation_history"] = []
ctx.adapter_state["hermes_home"] = self._hermes_home(ctx)
else:
runner = self._build_runner(ctx)
ctx.adapter_state["runner"] = runner
ctx.adapter_state.setdefault("api_calls", 0)
def _hermes_home(self, ctx: AdapterContext) -> Path:
configured = self._config.hermes_home
if configured:
return Path(configured).expanduser()
return ctx.workspace / ".hermes"
def _prepare_process_env(self, ctx: AdapterContext) -> None:
hermes_home = self._hermes_home(ctx)
hermes_home.mkdir(parents=True, exist_ok=True)
os.environ["HERMES_HOME"] = str(hermes_home)
os.environ["TERMINAL_CWD"] = str(ctx.workspace)
os.environ.setdefault("TERMINAL_ENV", "local")
cron_jobs = sys.modules.get("cron.jobs")
if cron_jobs is not None:
cron_dir = hermes_home / "cron"
setattr(cron_jobs, "HERMES_DIR", hermes_home)
setattr(cron_jobs, "CRON_DIR", cron_dir)
setattr(cron_jobs, "JOBS_FILE", cron_dir / "jobs.json")
setattr(cron_jobs, "OUTPUT_DIR", cron_dir / "output")
def _effective_model(self, ctx: AdapterContext) -> str:
"""Translate ClawBench provider-prefixed slugs for direct providers."""
model = ctx.model
if self._config.provider:
return model
base_url = self._config.base_url or ""
try:
host = urlparse(base_url).hostname or ""
except Exception:
host = ""
if host == "api.openai.com" and model.startswith("openai/"):
return model.split("/", 1)[1]
return model
def _runtime_provider_hint(self) -> str | None:
"""Return the provider identity Hermes should expose to its runtime.
Hermes distinguishes the transport used for the main model from the
auxiliary routing metadata it exposes to side tasks. Direct
OpenAI-compatible endpoints need to keep their explicit base URL and
API key, but should still identify as ``custom`` so Hermes auxiliary
calls resolve to the same primary model instead of falling through to
auto-detected providers such as OpenRouter.
"""
if self._config.provider:
return self._config.provider
if self._config.base_url:
return "custom"
return None
def _build_runner(self, ctx: AdapterContext) -> Any:
explicit_api_key = None if self._config.provider else self._config.api_key
explicit_base_url = None if self._config.provider else self._config.base_url
effective_model = self._effective_model(ctx)
ctx.adapter_state["effective_model"] = effective_model
if self._config.runner_factory is not None:
return self._config.runner_factory(
model=effective_model,
env_type=self._config.env_type,
cwd=str(ctx.workspace),
max_iterations=self._config.max_iterations,
command_timeout=self._config.timeout_seconds,
base_url=explicit_base_url,
api_key=explicit_api_key,
)
if MiniSWERunner is None: # pragma: no cover - import-guard branch
raise RuntimeError(
"HermesAdapter requires Hermes Agent's `mini_swe_runner.py`. "
"Install Hermes with the official installer, or set "
"`HERMES_AGENT_REPO=/path/to/hermes-agent` / "
"`HERMES_MINI_SWE_RUNNER=/path/to/mini_swe_runner.py`. "
f"Underlying import error: {_HERMES_IMPORT_ERROR!r}"
)
runner = MiniSWERunner(
model=effective_model,
env_type=self._config.env_type,
cwd=str(ctx.workspace),
max_iterations=self._config.max_iterations,
command_timeout=self._config.timeout_seconds,
base_url=explicit_base_url,
api_key=explicit_api_key,
)
if self._config.provider:
try:
from agent.auxiliary_client import resolve_provider_client
except Exception as exc: # pragma: no cover - optional Hermes internals
raise RuntimeError(
f"Hermes provider routing requested for '{self._config.provider}', "
"but Hermes provider utilities could not be imported."
) from exc
client, resolved_model = resolve_provider_client(
self._config.provider,
model=ctx.model,
)
if client is None or not resolved_model:
raise RuntimeError(
f"Hermes provider '{self._config.provider}' did not resolve credentials."
)
if self._config.provider == "openai-codex":
client = _CodexToolMessageCompatClient(client)
runner.client = client
runner.model = str(resolved_model)
return runner
def _build_ai_agent(self, ctx: AdapterContext) -> Any:
self._prepare_process_env(ctx)
explicit_api_key = None if self._config.provider else self._config.api_key
explicit_base_url = None if self._config.provider else self._config.base_url
enabled_toolsets = self._config.enabled_toolsets or ["hermes-api-server"]
effective_model = self._effective_model(ctx)
provider_hint = self._runtime_provider_hint()
ctx.adapter_state["effective_model"] = effective_model
if self._config.agent_factory is not None:
return self._config.agent_factory(
model=effective_model,
base_url=explicit_base_url,
api_key=explicit_api_key,
provider=provider_hint,
api_mode=self._config.api_mode,
max_iterations=self._config.max_iterations,
enabled_toolsets=enabled_toolsets,
disabled_toolsets=self._config.disabled_toolsets,
)
if AIAgent is None: # pragma: no cover - import-guard branch
raise RuntimeError(
"HermesAdapter full mode requires Hermes Agent's `run_agent.py`. "
"Set `HERMES_AGENT_REPO=/path/to/hermes-agent` or install Hermes. "
f"Underlying import error: {_HERMES_AGENT_IMPORT_ERROR!r}"
)
return AIAgent(
base_url=explicit_base_url,
api_key=explicit_api_key,
provider=provider_hint,
api_mode=self._config.api_mode,
model=effective_model,
max_iterations=self._config.max_iterations,
tool_delay=self._config.tool_delay_seconds,
enabled_toolsets=enabled_toolsets,
disabled_toolsets=self._config.disabled_toolsets,
quiet_mode=True,
verbose_logging=False,
skip_context_files=True,
session_id=f"clawbench-{ctx.task.id}-run{ctx.run_index}",
platform="cli",
)
async def run_phase(
self,
phase: CanonicalPhase,
ctx: AdapterContext,
) -> PhaseResult:
"""Render the phase's first user turn, invoke Hermes, parse output.
v1 limitation: only the first turn of each phase is delivered.
Tasks that declare `MULTI_TURN_INJECTION` as a required
capability are filtered out at harness level before the adapter
is invoked (harness gating lands in a later step). Guarding
here too keeps the adapter honest if it is driven directly.
"""
if self._config.driver_mode == "ai_agent":
return await self._run_ai_agent_phase(phase, ctx)
runner = ctx.adapter_state.get("runner")
if runner is None:
return PhaseResult(
error="HermesAdapter.run_phase called before setup(); no runner",
completed_normally=False,
)
if not phase.user.turns:
return PhaseResult(completed_normally=True)
# Hermes cannot receive dynamic follow-ups; we render and send
# only the first turn. Later turns remain in the canonical
# phase description but are intentionally dropped here.
first_turn = phase.user.turns[0]
message = first_turn.variant_messages.get(
self._config.prompt_variant, first_turn.message
)
prompt = render_template(message, ctx.runtime_values)
phase_timeout = float(
phase.timeout_seconds
or ctx.task.budgets.timeout_seconds
or self._config.timeout_seconds * self._config.max_iterations
)
try:
result: dict[str, Any] = await asyncio.wait_for(
asyncio.to_thread(runner.run_task, prompt),
timeout=phase_timeout,
)
except asyncio.TimeoutError:
return PhaseResult(
error=f"Hermes phase '{phase.name}' exceeded {phase_timeout:.0f}s",
completed_normally=False,
)
except Exception as exc: # pragma: no cover - runner-internal error
return PhaseResult(
error=f"HermesAdapter runner error: {exc}",
completed_normally=False,
)
phase_transcript = parse_conversation(result or {})
ctx.transcript.messages.extend(phase_transcript.messages)
api_calls = int(result.get("api_calls", 0)) if isinstance(result, dict) else 0
ctx.adapter_state["api_calls"] = (
int(ctx.adapter_state.get("api_calls", 0)) + api_calls
)
return PhaseResult(
messages=phase_transcript.messages,
adapter_metadata={
"api_calls": api_calls,
"hermes_metadata": result.get("metadata", {}) if isinstance(result, dict) else {},
},
completed_normally=bool(result.get("completed", False)) if isinstance(result, dict) else False,
)
async def _run_ai_agent_phase(
self,
phase: CanonicalPhase,
ctx: AdapterContext,
) -> PhaseResult:
agent = ctx.adapter_state.get("agent")
if agent is None:
return PhaseResult(
error="HermesAdapter.run_phase called before setup(); no AIAgent",
completed_normally=False,
)
simulator = UserSimulator(
phase.user,
ctx.runtime_values,
prompt_variant=self._config.prompt_variant,
)
phase_timeout = float(
phase.timeout_seconds
or ctx.task.budgets.timeout_seconds
or self._config.timeout_seconds * self._config.max_iterations
)
appended_messages: list = []
phase_api_calls = 0
completed = True
while not simulator.is_done:
user_message = await simulator.next_message(ctx.transcript)
if user_message is None:
break
history = list(ctx.adapter_state.get("conversation_history") or [])
try:
result: dict[str, Any] = await asyncio.wait_for(
asyncio.to_thread(
agent.run_conversation,
user_message,
conversation_history=history or None,
task_id=f"{ctx.task.id}-run{ctx.run_index}",
),
timeout=phase_timeout,
)
except asyncio.TimeoutError:
return PhaseResult(
messages=appended_messages,
error=f"Hermes AIAgent phase '{phase.name}' exceeded {phase_timeout:.0f}s",
completed_normally=False,
)
except Exception as exc: # pragma: no cover - agent-internal error
return PhaseResult(
messages=appended_messages,
error=f"HermesAdapter AIAgent error: {exc}",
completed_normally=False,
)
messages = result.get("messages", []) if isinstance(result, dict) else []
if not isinstance(messages, list):
messages = []
delta = messages[len(history):] if len(messages) >= len(history) else messages
phase_transcript = parse_chat_messages(delta)
ctx.transcript.messages.extend(phase_transcript.messages)
appended_messages.extend(phase_transcript.messages)
ctx.adapter_state["conversation_history"] = messages
phase_api_calls += int(result.get("api_calls", 0)) if isinstance(result, dict) else 0
completed = completed and bool(result.get("completed", False))
ctx.adapter_state["api_calls"] = (
int(ctx.adapter_state.get("api_calls", 0)) + phase_api_calls
)
return PhaseResult(
messages=appended_messages,
adapter_metadata={
"api_calls": phase_api_calls,
"driver_mode": "ai_agent",
},
completed_normally=completed,
)
async def verify_state_query(
self,
query: StateQuery,
ctx: AdapterContext,
) -> StateQueryResult:
if query.kind == "memory":
fallback_state = MemoryState(
key_pattern=str(query.selector.get("key_pattern", "")),
exists=query.predicate != "absent",
value_contains=list(query.expected.get("value_contains", [])),
)
extra_memory_text = self._read_hermes_memory_text(ctx)
ok, detail = verify_memory_fallback(
fallback_state,
ctx.workspace,
transcript=ctx.transcript,
extra_memory_text=extra_memory_text,
)
return StateQueryResult(ok=ok, detail=detail)
if self._config.driver_mode == "ai_agent" and query.kind == "session":
expected_model = str(query.expected.get("model") or "")
if query.predicate == "absent":
return StateQueryResult(ok=False, detail="Hermes AIAgent session exists")
if expected_model and expected_model.lower() not in ctx.model.lower():
return StateQueryResult(
ok=False,
detail=f"Model mismatch: expected {expected_model}, got {ctx.model}",
)
return StateQueryResult(ok=True, detail="OK")
if self._config.driver_mode == "ai_agent" and query.kind == "cron":
return self._verify_cron_file(query, ctx)
# HermesAdapter does not currently expose session/cron/custom
# gateway state. Flag as capability-missing so the scorer can
# apply the neutral skip policy.
return StateQueryResult(
ok=False,
detail=(
f"HermesAdapter does not resolve '{query.kind}' state queries "
f"(missing capability {query.required_capability.value})"
),
capability_missing=True,
)
def _read_hermes_memory_text(self, ctx: AdapterContext) -> str:
hermes_home = Path(ctx.adapter_state.get("hermes_home") or self._hermes_home(ctx))
candidates = [
hermes_home / "memory",
hermes_home / "memories",
hermes_home / "user_memory",
]
chunks: list[str] = []
for candidate in candidates:
if candidate.is_file():
chunks.append(candidate.read_text(encoding="utf-8", errors="replace"))
elif candidate.is_dir():
for path in candidate.rglob("*"):
if path.is_file() and path.suffix.lower() in {".md", ".txt", ".json"}:
try:
chunks.append(path.read_text(encoding="utf-8", errors="replace"))
except Exception:
continue
return "\n".join(chunks)
def _verify_cron_file(
self,
query: StateQuery,
ctx: AdapterContext,
) -> StateQueryResult:
hermes_home = Path(ctx.adapter_state.get("hermes_home") or self._hermes_home(ctx))
jobs_file = hermes_home / "cron" / "jobs.json"
if not jobs_file.is_file():
if query.predicate == "absent":
return StateQueryResult(ok=True, detail="Correctly absent")
return StateQueryResult(ok=False, detail=f"No Hermes cron jobs file at {jobs_file}")
try:
payload = json.loads(jobs_file.read_text(encoding="utf-8"))
except Exception as exc:
return StateQueryResult(ok=False, detail=f"Could not read Hermes cron jobs: {exc}")
jobs = payload if isinstance(payload, list) else payload.get("jobs", [])
if not isinstance(jobs, list):
jobs = []
if query.predicate == "absent":
return StateQueryResult(
ok=not jobs,
detail="Correctly absent" if not jobs else "Cron jobs exist",
)
description_contains = query.selector.get("description_contains")
if not jobs:
return StateQueryResult(ok=False, detail="No cron jobs found")
if description_contains:
needle = str(description_contains).lower()
if not any(needle in json.dumps(job, sort_keys=True).lower() for job in jobs):
return StateQueryResult(
ok=False,
detail=f"No cron job matched '{description_contains}'",
)
return StateQueryResult(ok=True, detail="OK")
async def teardown(self, ctx: AdapterContext) -> None:
"""Release the runner reference so GC can reclaim its process pool."""
ctx.adapter_state.pop("runner", None)
ctx.adapter_state.pop("agent", None)
__all__ = ["HermesAdapter", "HermesAdapterConfig"]

View File

@ -0,0 +1,494 @@
"""Hermes agent conversation → ClawBench `Transcript` converter.
Hermes's `MiniSWERunner.run_task()` returns a dict shaped like:
```json
{
"conversations": [
{"from": "system", "value": "..."},
{"from": "user", "value": "..."},
{"from": "assistant", "value": "I'll look at the file.\\n<tool_call>{\\"name\\":\\"bash\\",\\"arguments\\":{\\"cmd\\":\\"ls\\"}}</tool_call>"},
{"from": "tool", "value": "<tool_response>{\\"stdout\\":\\"file.py\\"}</tool_response>"},
{"from": "assistant", "value": "<tool_call>...</tool_call>"},
...
],
"completed": true,
"api_calls": 7,
"metadata": {...}
}
```
This module parses that into a canonical `Transcript` with
`TranscriptMessage` + `ToolCall` entries so the scorer / trajectory /
judge layers can score the run without any Hermes-specific knowledge.
The XML parsing is deliberately tolerant: Hermes transcripts observed
in the wild sometimes have malformed JSON inside `<tool_call>` tags
(trailing commas, unescaped newlines). We fall back to a permissive
regex extraction in that case so a single bad tool call doesn't tank
the whole transcript.
"""
from __future__ import annotations
import json
import re
from typing import Any, Iterable
from clawbench.schemas import ToolCall, Transcript, TranscriptMessage
#: One `<tool_call>…</tool_call>` block. Non-greedy across newlines.
_TOOL_CALL_RE = re.compile(
r"<tool_call>\s*(?P<body>.*?)\s*</tool_call>", re.DOTALL
)
#: One `<tool_response>…</tool_response>` block.
_TOOL_RESPONSE_RE = re.compile(
r"<tool_response>\s*(?P<body>.*?)\s*</tool_response>", re.DOTALL
)
def _coerce_role(raw: str) -> str:
"""Normalize Hermes role labels to ClawBench `TranscriptMessage.role`.
ClawBench uses `"user"`, `"assistant"`, `"system"`, `"tool"`. Hermes
can emit `"human"`/`"gpt"`/`"function"` variants; we map them all
down to the canonical vocabulary.
"""
value = (raw or "").strip().lower()
if value in {"assistant", "gpt", "model"}:
return "assistant"
if value in {"user", "human"}:
return "user"
if value in {"tool", "function", "tool_response"}:
return "tool"
if value == "system":
return "system"
return value or "assistant"
def _extract_json_objects(text: str) -> list[dict[str, Any]]:
"""Parse 0-or-more top-level JSON objects from free-form text.
Hermes usually puts a single JSON object inside each `<tool_call>`,
but we handle multi-object payloads defensively. Returns an empty
list if no valid JSON is present.
"""
text = text.strip()
if not text:
return []
try:
parsed = json.loads(text)
if isinstance(parsed, dict):
return [parsed]
if isinstance(parsed, list):
return [item for item in parsed if isinstance(item, dict)]
except json.JSONDecodeError:
pass
# Fallback: scan for balanced `{...}` blocks. Useful when the
# assistant wrote slightly malformed JSON. We accept a best-effort
# parse and silently discard the rest.
results: list[dict[str, Any]] = []
depth = 0
start: int | None = None
for i, ch in enumerate(text):
if ch == "{":
if depth == 0:
start = i
depth += 1
elif ch == "}":
depth -= 1
if depth == 0 and start is not None:
candidate = text[start : i + 1]
try:
obj = json.loads(candidate)
if isinstance(obj, dict):
results.append(obj)
except json.JSONDecodeError:
pass
start = None
return results
def _tool_call_from_payload(
payload: dict[str, Any],
*,
index: int,
timestamp_ms: int,
) -> ToolCall:
"""Build a canonical `ToolCall` from a Hermes `<tool_call>` payload.
Hermes emits `{"name": "...", "arguments": {...}}` inside each
tool_call tag. Some Nous-trained models emit slight variants
`"function"` for the tool name, `"parameters"` or `"input"` for
the args. We accept any of those.
"""
name = (
payload.get("name")
or payload.get("function")
or payload.get("tool")
or ""
)
arguments = (
payload.get("arguments")
or payload.get("parameters")
or payload.get("args")
or payload.get("input")
or {}
)
if isinstance(arguments, str):
# Occasionally Hermes passes a JSON-encoded string of args.
try:
arguments = json.loads(arguments)
except json.JSONDecodeError:
arguments = {"raw": arguments}
if not isinstance(arguments, dict):
arguments = {"value": arguments}
call_id = str(payload.get("id") or payload.get("call_id") or f"hermes-{index}")
return ToolCall(
id=call_id,
name=str(name),
input=arguments,
timestamp_ms=timestamp_ms,
)
def _tool_response_summary(payload: dict[str, Any]) -> tuple[str, str, bool | None]:
"""Extract (output, error, success) from a `<tool_response>` payload."""
output = ""
error = ""
success: bool | None = None
stdout = payload.get("stdout")
stderr = payload.get("stderr")
result = payload.get("result")
err = payload.get("error")
msg = payload.get("message")
status = payload.get("status")
if isinstance(stdout, str):
output = stdout
elif isinstance(result, (str, dict, list)):
output = result if isinstance(result, str) else json.dumps(result)
elif isinstance(msg, str):
output = msg
if isinstance(stderr, str) and stderr.strip():
error = stderr
elif isinstance(err, (str, dict, list)):
error = err if isinstance(err, str) else json.dumps(err)
if isinstance(status, str):
lowered = status.lower()
if lowered in {"ok", "success", "succeeded"}:
success = True
elif lowered in {"error", "failed", "failure"}:
success = False
if error and success is None:
success = False
if not error and output and success is None:
success = True
return output, error, success
def _split_tagged(text: str, tag_re: re.Pattern[str]) -> list[tuple[str, str]]:
"""Split `text` into `(kind, body)` tuples where `kind` is `"text"` or
`"tag"`. Preserves ordering so we can thread tool calls/responses
back into the canonical transcript in the order they appeared.
"""
pieces: list[tuple[str, str]] = []
cursor = 0
for match in tag_re.finditer(text):
if match.start() > cursor:
pieces.append(("text", text[cursor : match.start()]))
pieces.append(("tag", match.group("body")))
cursor = match.end()
if cursor < len(text):
pieces.append(("text", text[cursor:]))
return pieces
def parse_conversation(result: dict[str, Any]) -> Transcript:
"""Parse a `MiniSWERunner.run_task` result dict into a `Transcript`.
The conversation is processed in order; tool calls are emitted into
the assistant message that contained them, and tool responses are
paired with the most recent unpaired call. The final Transcript is
ready for `annotate_transcript_tool_calls` scorer.
"""
transcript = Transcript()
conversations = result.get("conversations") or []
pending_calls: list[ToolCall] = []
call_counter = 0
for turn_index, entry in enumerate(conversations):
if not isinstance(entry, dict):
continue
role = _coerce_role(str(entry.get("from", "")))
value = str(entry.get("value", "") or "")
# Tool responses arrive from the tool/function role.
if role == "tool":
for response_body in _TOOL_RESPONSE_RE.findall(value):
payloads = _extract_json_objects(response_body)
if not payloads:
payloads = [{"result": response_body}]
for payload in payloads:
output, error, success = _tool_response_summary(payload)
if pending_calls:
target = pending_calls.pop(0)
target.output = output
target.error = error
if success is not None:
target.success = success
else:
# Orphan tool response — surface it as a tool
# message so nothing is silently dropped.
transcript.messages.append(
TranscriptMessage(
role="tool",
tool_result_content=output or error,
)
)
continue
# Everything else (assistant / user / system) may carry tool
# calls plus free-form text. We interleave them faithfully.
pieces = _split_tagged(value, _TOOL_CALL_RE)
text_chunks: list[str] = []
tool_calls: list[ToolCall] = []
for kind, body in pieces:
if kind == "text":
text_chunks.append(body)
else:
payloads = _extract_json_objects(body)
for payload in payloads:
call_counter += 1
tool_call = _tool_call_from_payload(
payload,
index=call_counter,
timestamp_ms=turn_index,
)
tool_calls.append(tool_call)
pending_calls.append(tool_call)
joined_text = "\n".join(chunk for chunk in text_chunks if chunk.strip()).strip()
if role == "assistant":
transcript.messages.append(
TranscriptMessage(
role="assistant",
text=joined_text,
tool_calls=tool_calls,
timestamp_ms=turn_index,
)
)
elif role == "user":
transcript.messages.append(
TranscriptMessage(
role="user",
text=joined_text,
timestamp_ms=turn_index,
)
)
elif role == "system":
if joined_text:
transcript.messages.append(
TranscriptMessage(
role="system",
text=joined_text,
timestamp_ms=turn_index,
)
)
else:
if joined_text:
transcript.messages.append(
TranscriptMessage(
role=role,
text=joined_text,
timestamp_ms=turn_index,
)
)
return transcript
def _content_to_text(content: Any) -> str:
"""Normalize OpenAI/Anthropic-style message content to plain text."""
if content is None:
return ""
if isinstance(content, str):
return content
if isinstance(content, list):
parts: list[str] = []
for part in content:
if isinstance(part, str):
parts.append(part)
elif isinstance(part, dict):
if isinstance(part.get("text"), str):
parts.append(part["text"])
elif isinstance(part.get("content"), str):
parts.append(part["content"])
return "\n".join(parts)
if isinstance(content, dict):
if isinstance(content.get("text"), str):
return content["text"]
if isinstance(content.get("content"), str):
return content["content"]
return str(content)
def _tool_call_from_chat_payload(
payload: dict[str, Any],
*,
index: int,
timestamp_ms: int,
) -> ToolCall:
"""Build a canonical tool call from chat-completions message payloads."""
function = payload.get("function")
if not isinstance(function, dict):
function = {}
name = (
function.get("name")
or payload.get("name")
or payload.get("tool")
or payload.get("type")
or ""
)
arguments = (
function.get("arguments")
or payload.get("arguments")
or payload.get("args")
or payload.get("input")
or {}
)
if isinstance(arguments, str):
try:
arguments = json.loads(arguments)
except json.JSONDecodeError:
arguments = {"raw": arguments}
if not isinstance(arguments, dict):
arguments = {"value": arguments}
return ToolCall(
id=str(payload.get("id") or payload.get("call_id") or f"hermes-chat-{index}"),
name=str(name),
input=arguments,
timestamp_ms=timestamp_ms,
)
def parse_chat_messages(messages: Iterable[dict[str, Any]]) -> Transcript:
"""Parse Hermes AIAgent/OpenAI-style message history to a Transcript.
`AIAgent.run_conversation()` returns a `messages` list with user,
assistant, and tool-role entries. This parser preserves ordering and
attaches tool-role output back to the assistant `ToolCall` it belongs to.
"""
transcript = Transcript()
pending_by_id: dict[str, ToolCall] = {}
pending_order: list[ToolCall] = []
call_counter = 0
for turn_index, entry in enumerate(messages):
if not isinstance(entry, dict):
continue
role = _coerce_role(str(entry.get("role") or entry.get("from") or ""))
text = _content_to_text(entry.get("content", entry.get("value", "")))
if role == "tool":
tool_call_id = str(entry.get("tool_call_id") or entry.get("id") or "")
target = pending_by_id.get(tool_call_id) if tool_call_id else None
if target is None and pending_order:
target = pending_order.pop(0)
if target is not None:
target.output = text
target.success = not _looks_like_error(text)
if not target.success:
target.error = text
elif text:
transcript.messages.append(
TranscriptMessage(
role="tool",
tool_result_for=tool_call_id or None,
tool_result_content=text,
timestamp_ms=turn_index,
)
)
continue
tool_calls: list[ToolCall] = []
raw_calls = entry.get("tool_calls") or []
if isinstance(raw_calls, list):
for payload in raw_calls:
if not isinstance(payload, dict):
continue
call_counter += 1
call = _tool_call_from_chat_payload(
payload,
index=call_counter,
timestamp_ms=turn_index,
)
tool_calls.append(call)
pending_by_id[call.id] = call
pending_order.append(call)
if role == "assistant":
transcript.messages.append(
TranscriptMessage(
role="assistant",
text=text,
tool_calls=tool_calls,
timestamp_ms=turn_index,
)
)
elif role in {"user", "system"}:
if text:
transcript.messages.append(
TranscriptMessage(
role=role,
text=text,
timestamp_ms=turn_index,
)
)
elif text:
transcript.messages.append(
TranscriptMessage(
role=role,
text=text,
timestamp_ms=turn_index,
)
)
return transcript
def _looks_like_error(text: str) -> bool:
lowered = text.lower()
return any(token in lowered for token in ("error", "traceback", "failed", "exception"))
def iter_tool_calls_from_conversations(conversations: Iterable[dict[str, Any]]) -> list[ToolCall]:
"""Helper used by tests: pull out just the tool-call sequence.
Equivalent to `parse_conversation({"conversations": list(conv)}).tool_call_sequence`
but skips the assistant-text assembly. Useful for asserting on call
order and arguments without noise.
"""
return parse_conversation({"conversations": list(conversations)}).tool_call_sequence
__all__ = [
"iter_tool_calls_from_conversations",
"parse_chat_messages",
"parse_conversation",
]

View File

@ -0,0 +1,467 @@
"""OpenClaw adapter — drives tasks through an OpenClaw gateway.
This is the adapter-shaped wrapper around the agent execution flow that
has lived inside `BenchmarkHarness._run_single` until now. It holds a
`GatewayClient` open for the run's duration, creates one agent per run
and one session per phase (matching the existing behavior), delivers
simulated-user turns, and resolves `StateQuery` assertions against the
gateway's `memory.search` / `sessions.resolve` / `cron.list` / arbitrary
`_rpc(method)` surface.
The legacy harness still owns the executable CLI path for now; this
adapter is the canonical wrapper used by adapter-level tests and later
harness wiring.
"""
from __future__ import annotations
import json
import logging
import uuid
from dataclasses import dataclass
from clawbench.adapters import register_adapter
from clawbench.adapters.base import (
AdapterConfig,
AdapterContext,
AgentAdapter,
PhaseResult,
StateQueryResult,
)
from clawbench.canonical import (
AdapterCapability,
CanonicalPhase,
StateQuery,
)
from clawbench.client import GatewayClient, GatewayConfig
from clawbench.environment_files import (
resolve_json_path,
verify_memory_fallback,
)
from clawbench.schemas import (
MemoryState,
PromptVariant,
)
from clawbench.session_labels import unique_session_label
from clawbench.simulated_user import UserSimulator
logger = logging.getLogger(__name__)
@dataclass
class OpenClawAdapterConfig(AdapterConfig):
"""Config for the OpenClaw adapter.
`gateway` holds the connection parameters the adapter uses to reach
the OpenClaw gateway. `prompt_variant` controls which wording of
each simulated-user turn is rendered.
"""
gateway: GatewayConfig | None = None
prompt_variant: str = PromptVariant.CLEAR.value
# Default per-turn timeout passed to `send_and_wait` when the
# phase does not override it. Matches the existing harness default.
turn_timeout_seconds: float = 180.0
@register_adapter
class OpenClawAdapter(AgentAdapter):
"""Adapter for the OpenClaw gateway (default harness path)."""
name = "openclaw"
capabilities = {
AdapterCapability.FILES,
AdapterCapability.EXECUTION,
AdapterCapability.MEMORY,
AdapterCapability.SESSION,
AdapterCapability.CRON,
AdapterCapability.BROWSER,
AdapterCapability.GATEWAY_RPC,
AdapterCapability.MULTI_TURN_INJECTION,
}
def __init__(self, config: OpenClawAdapterConfig | None = None) -> None:
super().__init__(config or OpenClawAdapterConfig())
self._config: OpenClawAdapterConfig = self.config # type: ignore[assignment]
self._gateway_config: GatewayConfig = self._config.gateway or GatewayConfig()
self._client: GatewayClient | None = None
# Dependency injection hook for tests: monkeypatch this to swap
# in a stub gateway without touching the class definition.
self._client_factory = lambda: GatewayClient(self._gateway_config)
# ------------------------------------------------------------------
# Long-lived gateway connection.
# ------------------------------------------------------------------
async def __aenter__(self) -> "OpenClawAdapter":
client = self._client_factory()
await client.__aenter__()
self._client = client
return self
async def __aexit__(self, exc_type: object, exc: object, tb: object) -> None:
if self._client is not None:
try:
await self._client.__aexit__(exc_type, exc, tb)
finally:
self._client = None
@property
def client(self) -> GatewayClient:
if self._client is None:
raise RuntimeError(
"OpenClawAdapter must be used as an async context manager "
"before calling setup/run_phase/teardown."
)
return self._client
# ------------------------------------------------------------------
# Lifecycle.
# ------------------------------------------------------------------
async def setup(self, ctx: AdapterContext) -> None:
"""Create the per-run agent and run pre-run state queries."""
self._realize_memory_seeds(ctx)
agent_name = (
f"clawbench-{ctx.task.id}-run-{ctx.run_index}-{uuid.uuid4().hex[:6]}"
)
agent_id = await self.client.create_agent(
name=agent_name, workspace=str(ctx.workspace)
)
ctx.adapter_state["agent_id"] = agent_id
ctx.adapter_state.setdefault("session_keys", [])
# Pre-run gateway assertions (ex-`setup.pre_check_gateway`) —
# evaluated immediately, failures are surfaced via the returned
# state via `ctx.adapter_state["pre_run_failures"]` so the
# harness can fail fast before doing any phase work.
failures: list[str] = []
for query in ctx.task.verifier.pre_run_queries:
result = await self.verify_state_query(query, ctx)
if not result.ok:
failures.append(result.detail or query.description)
if failures:
ctx.adapter_state["pre_run_failures"] = failures
def _realize_memory_seeds(self, ctx: AdapterContext) -> None:
"""Expose canonical memory seeds through the run workspace.
OpenClaw's native memory backend has no public seed/write RPC in the
benchmark client, but agents can read files in their workspace and the
verifier already falls back to these same memory files. This keeps
seeded-memory tasks fair across OpenClaw and filesystem-first harnesses.
"""
chunks: list[str] = []
for seed in ctx.task.assets.seed_state:
if seed.kind != "memory" or not seed.key:
continue
content = seed.content or ""
if not isinstance(content, str):
content = str(content)
safe_key = "".join(
ch if ch.isalnum() or ch in ("-", "_") else "_"
for ch in seed.key.strip()
).strip("_")
if not safe_key:
safe_key = "seed"
body = f"# {seed.key}\n\n{content.strip()}\n"
target = ctx.workspace / "memory" / f"{safe_key}.md"
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(body, encoding="utf-8")
chunks.append(body)
if chunks:
(ctx.workspace / "MEMORY.md").write_text("\n".join(chunks), encoding="utf-8")
async def run_phase(
self,
phase: CanonicalPhase,
ctx: AdapterContext,
) -> PhaseResult:
"""Create a session, drive the simulator, append to the transcript."""
agent_id = ctx.adapter_state.get("agent_id")
if not agent_id:
return PhaseResult(
error="OpenClawAdapter.run_phase called before setup(); no agent_id",
completed_normally=False,
)
session_keys: list[str] = ctx.adapter_state.setdefault("session_keys", [])
session_key = await self.client.create_session(
model=ctx.model,
agent_id=agent_id,
label=unique_session_label(
f"clawbench-{ctx.task.id}-run{ctx.run_index}-phase{phase.name}"
),
)
session_keys.append(session_key)
ctx.adapter_state["last_session_key"] = session_key
await self.client.subscribe(session_key)
# Browser tasks require the browser tool to actually be
# registered in the effective tool set for this session. If it
# isn't, fail the phase fast rather than letting the agent
# flounder against a missing tool.
if ctx.task.family.value == "browser":
try:
await self._assert_browser_support(session_key)
except Exception as exc:
return PhaseResult(
error=str(exc),
completed_normally=False,
)
simulator = UserSimulator(
phase.user,
ctx.runtime_values,
prompt_variant=self._config.prompt_variant,
)
turn_timeout = float(phase.timeout_seconds or ctx.task.budgets.timeout_seconds)
turn_timeout = min(turn_timeout, self._config.turn_timeout_seconds)
appended: list = []
turns_sent = 0
while not simulator.is_done:
user_message = await simulator.next_message(ctx.transcript)
if user_message is None:
break
phase_transcript = await self.client.send_and_wait(
session_key,
user_message,
timeout=turn_timeout,
)
ctx.transcript.messages.extend(phase_transcript.messages)
appended.extend(phase_transcript.messages)
turns_sent += 1
return PhaseResult(
messages=appended,
adapter_metadata={
"session_key": session_key,
"turns_sent": turns_sent,
},
)
async def _assert_browser_support(self, session_key: str) -> None:
inventory = await self.client.get_effective_tools(session_key)
tool_ids = {
str(tool.get("id", ""))
for group in inventory.get("groups", [])
for tool in group.get("tools", [])
}
if "browser" not in tool_ids:
raise RuntimeError(
"Browser tasks require the browser tool, but it is not available in this gateway."
)
async def teardown(self, ctx: AdapterContext) -> None:
"""Delete per-phase sessions and the per-run agent."""
client = self._client
if client is None:
return
session_keys: list[str] = ctx.adapter_state.get("session_keys", [])
agent_id: str | None = ctx.adapter_state.get("agent_id")
for session_key in session_keys:
try:
await client.delete_session(session_key)
except Exception as exc: # pragma: no cover - best effort
logger.warning("delete_session failed for %s: %s", session_key, exc)
if agent_id:
try:
await client.delete_agent(agent_id, delete_files=False)
except Exception as exc: # pragma: no cover - best effort
logger.warning("delete_agent failed for %s: %s", agent_id, exc)
# ------------------------------------------------------------------
# State query resolution.
# ------------------------------------------------------------------
async def verify_state_query(
self,
query: StateQuery,
ctx: AdapterContext,
) -> StateQueryResult:
try:
if query.kind == "memory":
return await self._verify_memory(query, ctx)
if query.kind == "session":
return await self._verify_session(query, ctx)
if query.kind == "cron":
return await self._verify_cron(query, ctx)
if query.kind == "custom":
return await self._verify_gateway(query, ctx)
except Exception as exc:
return StateQueryResult(ok=False, detail=str(exc))
return StateQueryResult(
ok=False,
detail=f"OpenClawAdapter has no handler for query kind '{query.kind}'",
capability_missing=True,
)
# --- memory ---
async def _verify_memory(
self, query: StateQuery, ctx: AdapterContext
) -> StateQueryResult:
key_pattern = str(query.selector.get("key_pattern", ""))
value_contains = list(query.expected.get("value_contains", []))
session_key = ctx.adapter_state.get("last_session_key", "")
agent_id = ctx.adapter_state.get("agent_id")
# Primary path: memory.search RPC.
try:
response = await self.client._rpc(
"memory.search",
{
"query": key_pattern,
"sessionKey": session_key,
"limit": 20,
},
)
entries = response.get("payload", {}).get("entries", [])
if query.predicate == "absent":
ok = not entries
return StateQueryResult(
ok=ok,
detail="Correctly absent" if ok else "Memory entry exists",
)
if not entries:
return StateQueryResult(ok=False, detail="No matching memory entries found")
all_values = " ".join(str(entry.get("value", "")) for entry in entries)
for token in value_contains:
if token.lower() not in all_values.lower():
return StateQueryResult(
ok=False, detail=f"Memory value missing '{token}'"
)
return StateQueryResult(ok=True, detail="OK")
except Exception as exc:
logger.info(
"memory.search unavailable for verification, falling back: %s",
exc,
)
# Fallback: gateway-sourced memory files + workspace scan + transcript.
fallback_state = MemoryState(
key_pattern=key_pattern,
exists=query.predicate != "absent",
value_contains=value_contains,
)
extra_memory_text = ""
if agent_id:
try:
from clawbench.environment import _read_agent_memory_text # local import to avoid cycle
extra_memory_text = await _read_agent_memory_text(self.client, agent_id)
except Exception:
extra_memory_text = ""
ok, detail = verify_memory_fallback(
fallback_state,
ctx.workspace,
transcript=ctx.transcript,
extra_memory_text=extra_memory_text,
)
return StateQueryResult(ok=ok, detail=detail)
# --- session ---
async def _verify_session(
self, query: StateQuery, ctx: AdapterContext
) -> StateQueryResult:
session_key = ctx.adapter_state.get("last_session_key", "")
expected_model = query.expected.get("model") or ""
try:
response = await self.client._rpc("sessions.resolve", {"key": session_key})
payload = response.get("payload", {})
if query.predicate == "absent":
return StateQueryResult(ok=False, detail="Session exists but should not")
if expected_model:
actual = str(payload.get("model", ""))
if str(expected_model).lower() not in actual.lower():
return StateQueryResult(
ok=False,
detail=f"Model mismatch: expected {expected_model}, got {actual}",
)
return StateQueryResult(ok=True, detail="OK")
except Exception as exc:
if query.predicate == "absent":
return StateQueryResult(ok=True, detail="Correctly absent")
return StateQueryResult(ok=False, detail=str(exc))
# --- cron ---
async def _verify_cron(
self, query: StateQuery, ctx: AdapterContext
) -> StateQueryResult:
description_contains = query.selector.get("description_contains")
try:
response = await self.client._rpc("cron.list", {})
jobs = response.get("payload", {}).get("jobs", [])
if query.predicate == "absent":
ok = not jobs
return StateQueryResult(
ok=ok,
detail="Correctly absent" if ok else "Cron jobs exist",
)
if not jobs:
return StateQueryResult(ok=False, detail="No cron jobs found")
if description_contains and not any(
str(description_contains).lower() in json.dumps(job).lower() for job in jobs
):
return StateQueryResult(
ok=False,
detail=f"No cron job matched '{description_contains}'",
)
return StateQueryResult(ok=True, detail="OK")
except Exception as exc:
return StateQueryResult(ok=False, detail=str(exc))
# --- arbitrary gateway RPC ---
async def _verify_gateway(
self, query: StateQuery, ctx: AdapterContext
) -> StateQueryResult:
method = str(query.selector.get("method", ""))
params = dict(query.selector.get("params", {}))
assert_path = str(query.selector.get("assert_path", "$"))
expected_equals = query.expected.get("equals")
expected_contains = query.expected.get("contains")
expected_exists = bool(query.expected.get("exists", True))
try:
response = await self.client._rpc(method, params)
payload = response.get("payload", {})
value = resolve_json_path(payload, assert_path)
if not expected_exists:
ok = value is None
return StateQueryResult(
ok=ok,
detail="Correctly absent" if ok else "Path exists",
)
if value is None:
return StateQueryResult(
ok=False, detail=f"Path {assert_path} not found"
)
if expected_equals is not None and value != expected_equals:
return StateQueryResult(
ok=False, detail=f"Expected {expected_equals}, got {value}"
)
if (
expected_contains is not None
and str(expected_contains).lower() not in str(value).lower()
):
return StateQueryResult(
ok=False,
detail=f"Expected '{expected_contains}' in {value}",
)
return StateQueryResult(ok=True, detail="OK")
except Exception as exc:
return StateQueryResult(ok=False, detail=str(exc))
__all__ = ["OpenClawAdapter", "OpenClawAdapterConfig"]

View File

@ -0,0 +1,45 @@
"""Canonical task schema — agent-agnostic intent layer.
Part of ClawBench Phase-4 per CLAWBENCH_V0_4_SPEC.md §"Canonical Task Schema".
Splits canonical task intent (what to set up, prompt with, and verify) from
OpenClaw-specific execution details (which become adapter responsibilities).
The existing `TaskDefinition` in `clawbench/schemas.py` stays as-is for
back-compat; this package adds a canonical view produced by
`convert.from_task_definition`, which is the single bridge between the two
shapes. Everything downstream of the harness (scorer, trajectory, judge,
stats) is already agent-agnostic those modules consume the transcript +
TaskRunResult and do not need changes.
"""
from clawbench.canonical.schema import (
AdapterCapability,
BudgetSpec,
CanonicalAssets,
CanonicalPhase,
CanonicalTask,
Deliverable,
InteractionPolicy,
SeedEntry,
StateQuery,
StateQueryKind,
StateQueryPredicate,
VerifierContract,
)
from clawbench.canonical.convert import from_task_definition
__all__ = [
"AdapterCapability",
"BudgetSpec",
"CanonicalAssets",
"CanonicalPhase",
"CanonicalTask",
"Deliverable",
"InteractionPolicy",
"SeedEntry",
"StateQuery",
"StateQueryKind",
"StateQueryPredicate",
"VerifierContract",
"from_task_definition",
]

View File

@ -0,0 +1,328 @@
"""Convert `TaskDefinition` → `CanonicalTask`.
This is the single bridge between the existing OpenClaw-entangled task
format (`clawbench.schemas.TaskDefinition`) and the agent-agnostic
canonical form (`CanonicalTask`). Callers load tasks as usual via
`clawbench.tasks.load_all_tasks` and then call
`from_task_definition(task)` to get the canonical view.
Field mappings (any field not mentioned is copied verbatim):
- `setup.asset_packs` `assets.seed_state` (kind="file", asset_pack=...)
- `setup.workspace_files` `assets.workspace_files`
- `setup.background_services` `assets.background_services`
- `setup.memory_seed` `assets.seed_state` (kind="memory")
- `setup.pre_check_gateway` `verifier.pre_run_queries` (GATEWAY_RPC)
- `completion.files` `verifier.file_states`
- `completion.execution_checks` `verifier.execution_checks`
- `completion.memory` `verifier.state_queries` (MEMORY)
- `completion.session` `verifier.state_queries` (SESSION)
- `completion.cron` `verifier.state_queries` (CRON)
- `completion.gateway_assertions` `verifier.state_queries` (GATEWAY_RPC)
- `trajectory` `verifier.trajectory`
- `behavior` `verifier.behavior`
- `judge` `verifier.judge`
- `user` / `phases` `phases` via `task.normalized_phases()`
- `timeout_seconds` `budgets.timeout_seconds` (also on each phase)
`required_adapter_capabilities` is computed from what the task actually
needs: always `{FILES, EXECUTION}`, plus `MEMORY`/`SESSION`/`CRON`/
`GATEWAY_RPC`/`BROWSER`/`MULTI_TURN_INJECTION` when the source task's
fields trigger those capabilities.
"""
from __future__ import annotations
from clawbench.canonical.schema import (
AdapterCapability,
BudgetSpec,
CanonicalAssets,
CanonicalPhase,
CanonicalTask,
InteractionPolicy,
SeedEntry,
StateQuery,
VerifierContract,
)
from clawbench.schemas import (
CronState,
GatewayAssertion,
MemoryState,
SessionState,
TaskDefinition,
TaskFamily,
UserTurn,
)
# ---------------------------------------------------------------------------
# Seed state
# ---------------------------------------------------------------------------
def _seeds_from_setup(task: TaskDefinition) -> list[SeedEntry]:
seeds: list[SeedEntry] = []
for pack in task.setup.asset_packs:
seeds.append(SeedEntry(kind="file", asset_pack=pack))
for entry in task.setup.memory_seed:
# memory_seed entries are free-form dicts in the existing schema;
# we preserve them verbatim in `metadata` and surface `key` +
# `content` when present so adapters can consume the structured
# pieces without re-parsing.
seeds.append(
SeedEntry(
kind="memory",
key=str(entry.get("key", "")),
content=entry.get("value") or entry.get("content"),
metadata=dict(entry),
)
)
return seeds
# ---------------------------------------------------------------------------
# State queries: memory / session / cron / gateway_assertions
# ---------------------------------------------------------------------------
def _memory_state_to_query(state: MemoryState) -> StateQuery:
expected: dict[str, object] = {}
if state.value_contains:
expected["value_contains"] = list(state.value_contains)
return StateQuery(
kind="memory",
predicate="exists" if state.exists else "absent",
selector={"key_pattern": state.key_pattern},
expected=expected,
required_capability=AdapterCapability.MEMORY,
description=f"memory key ~ /{state.key_pattern}/",
)
def _session_state_to_query(state: SessionState) -> StateQuery:
expected: dict[str, object] = {}
if state.model_should_be:
expected["model"] = state.model_should_be
return StateQuery(
kind="session",
predicate="exists" if state.should_exist else "absent",
selector={},
expected=expected,
required_capability=AdapterCapability.SESSION,
description="session state",
)
def _cron_state_to_query(state: CronState) -> StateQuery:
selector: dict[str, object] = {}
if state.description_contains:
selector["description_contains"] = state.description_contains
return StateQuery(
kind="cron",
predicate="exists" if state.exists else "absent",
selector=selector,
expected={},
required_capability=AdapterCapability.CRON,
description="cron schedule",
)
def _gateway_assertion_to_query(assertion: GatewayAssertion) -> StateQuery:
selector: dict[str, object] = {
"method": assertion.method,
"params": dict(assertion.params),
"assert_path": assertion.assert_path,
}
expected: dict[str, object] = {}
if assertion.assert_equals is not None:
expected["equals"] = assertion.assert_equals
if assertion.assert_contains is not None:
expected["contains"] = assertion.assert_contains
expected["exists"] = assertion.assert_exists
predicate = "exists"
if assertion.assert_equals is not None:
predicate = "equals"
elif assertion.assert_contains is not None:
predicate = "contains"
elif not assertion.assert_exists:
predicate = "absent"
return StateQuery(
kind="custom",
predicate=predicate,
selector=selector,
expected=expected,
required_capability=AdapterCapability.GATEWAY_RPC,
description=f"gateway rpc: {assertion.method}",
)
def _state_queries_from_completion(task: TaskDefinition) -> list[StateQuery]:
queries: list[StateQuery] = []
for mem in task.completion.memory:
queries.append(_memory_state_to_query(mem))
if task.completion.session is not None:
queries.append(_session_state_to_query(task.completion.session))
for cron in task.completion.cron:
queries.append(_cron_state_to_query(cron))
for assertion in task.completion.gateway_assertions:
queries.append(_gateway_assertion_to_query(assertion))
return queries
def _pre_run_queries_from_setup(task: TaskDefinition) -> list[StateQuery]:
return [_gateway_assertion_to_query(a) for a in task.setup.pre_check_gateway]
# ---------------------------------------------------------------------------
# Phases + dynamic-turn detection
# ---------------------------------------------------------------------------
_DYNAMIC_TURN_FIELDS = (
"when_tool_family",
"when_tool_name",
"when_assistant_contains",
"when_last_tool_failed",
)
def _turn_is_dynamic(turn: UserTurn) -> bool:
if turn.when_last_tool_failed:
return True
for name in _DYNAMIC_TURN_FIELDS:
value = getattr(turn, name, None)
if isinstance(value, bool):
if value:
return True
elif value:
return True
return False
def _phases_from_task(task: TaskDefinition) -> tuple[list[CanonicalPhase], bool]:
phases: list[CanonicalPhase] = []
any_dynamic = False
for phase in task.normalized_phases():
phases.append(
CanonicalPhase(
name=phase.name,
user=phase.user,
timeout_seconds=phase.timeout_seconds,
)
)
if len(phase.user.turns) > 1 or any(_turn_is_dynamic(t) for t in phase.user.turns):
any_dynamic = True
return phases, any_dynamic
# ---------------------------------------------------------------------------
# Capability inference
# ---------------------------------------------------------------------------
def _capabilities_for_task(task: TaskDefinition, *, uses_dynamic: bool) -> set[AdapterCapability]:
caps: set[AdapterCapability] = {AdapterCapability.FILES, AdapterCapability.EXECUTION}
if task.completion.memory or any(seed.get("key") for seed in task.setup.memory_seed):
caps.add(AdapterCapability.MEMORY)
if task.completion.session is not None:
caps.add(AdapterCapability.SESSION)
if task.completion.cron:
caps.add(AdapterCapability.CRON)
if task.completion.gateway_assertions or task.setup.pre_check_gateway:
caps.add(AdapterCapability.GATEWAY_RPC)
if task.family == TaskFamily.BROWSER:
caps.add(AdapterCapability.BROWSER)
if uses_dynamic:
caps.add(AdapterCapability.MULTI_TURN_INJECTION)
return caps
# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------
def from_task_definition(task: TaskDefinition) -> CanonicalTask:
"""Produce the canonical view of a legacy `TaskDefinition`.
This is lossless for fields that have a canonical equivalent.
OpenClaw-only constructs (gateway_assertions, pre_check_gateway,
memory_seed) become `StateQuery` entries / `SeedEntry` entries
tagged with the capability an adapter needs to resolve them.
"""
phases, any_dynamic = _phases_from_task(task)
assets = CanonicalAssets(
workspace_files=list(task.setup.workspace_files),
background_services=list(task.setup.background_services),
seed_state=_seeds_from_setup(task),
)
verifier = VerifierContract(
file_states=list(task.completion.files),
execution_checks=list(task.completion.execution_checks),
state_queries=_state_queries_from_completion(task),
pre_run_queries=_pre_run_queries_from_setup(task),
trajectory=task.trajectory,
behavior=task.behavior,
judge=task.judge,
)
interaction = InteractionPolicy(
max_turns=max((phase.user.max_turns for phase in phases), default=20),
allow_multi_phase=len(phases) > 1,
uses_dynamic_user_triggers=any_dynamic,
)
budgets = BudgetSpec(timeout_seconds=task.timeout_seconds)
capabilities = _capabilities_for_task(task, uses_dynamic=any_dynamic)
return CanonicalTask(
id=task.id,
name=task.name,
tier=task.tier,
family=task.family,
surface=task.surface,
scenario=task.scenario,
subscenario=task.subscenario,
capabilities=list(task.capabilities),
atomic_capabilities=list(task.atomic_capabilities),
pool=task.pool,
subsets=list(task.subsets),
variant_group=task.variant_group,
variant_id=task.variant_id,
template_id=task.template_id,
release_id=task.release_id,
source_kind=task.source_kind,
provenance_ids=list(task.provenance_ids),
privacy_tier=task.privacy_tier,
contamination_risk=task.contamination_risk,
freshness_epoch=task.freshness_epoch,
category=task.category,
domain=task.domain,
functionality=list(task.functionality),
trace_distribution=list(task.trace_distribution),
tool_surface=list(task.tool_surface),
risk_tags=list(task.risk_tags),
first_used_at=task.first_used_at,
retire_after_runs=task.retire_after_runs,
similarity_hash=task.similarity_hash,
canary_token=task.canary_token,
official=task.official,
query_difficulty=task.query_difficulty,
query_weight=task.query_weight,
artifact_type=task.artifact_type,
preconditions=list(task.preconditions),
source_dataset=task.source_dataset,
prompt_variants=list(task.prompt_variants),
pass_threshold=task.pass_threshold,
assets=assets,
phases=phases,
verifier=verifier,
budgets=budgets,
interaction=interaction,
deliverables=[],
required_adapter_capabilities=capabilities,
)

View File

@ -0,0 +1,296 @@
"""Canonical task schema — agent-agnostic intent.
This is the Phase-4 split of `TaskDefinition` (see CLAWBENCH_V0_4_SPEC.md
§"Canonical Task Schema"). The canonical layer expresses **what** a task
is its identity, prompts, assets, and verification contract without
saying **how** it gets executed. The "how" (gateway RPCs, session
lifecycle, tool-family normalization) lives in per-adapter code under
`clawbench/adapters/`.
The rule of thumb:
- If a field describes what the user asked for, what files/state the
agent is expected to produce, or what the run must satisfy to pass,
it belongs here.
- If a field describes how OpenClaw's gateway is called to drive the
run or read back state, it belongs in the OpenClaw adapter (and the
canonical version of that check is a `StateQuery` with a
`required_capability`).
Converting from `TaskDefinition` `CanonicalTask` is lossless for fields
that have a canonical equivalent; OpenClaw-only fields (like
`pre_check_gateway` and `gateway_assertions`) survive as `StateQuery`
entries tagged with `AdapterCapability.GATEWAY_RPC`, so adapters that
support them can still resolve them while adapters that don't can cleanly
report a capability gap.
"""
from __future__ import annotations
import enum
from typing import Any, Literal
from pydantic import BaseModel, Field, model_validator
from clawbench.schemas import (
ArtifactType,
BackgroundService,
BehaviorExpectations,
CapabilityTag,
ExecutionCheck,
FileState,
JudgeExpectations,
PromptVariant,
QueryDifficulty,
ScenarioDomain,
SimulatedUser,
TaskFamily,
TaskPool,
TaskSubset,
Tier,
TrajectoryExpectations,
)
class AdapterCapability(str, enum.Enum):
"""What an adapter is able to provide to a running task.
Each `StateQuery` declares a `required_capability`. If the selected
adapter's `capabilities` set does not include that capability, the
harness either skips the task entirely (strict mode) or scores the
query as neutral (partial mode). This keeps the leaderboard honest
about what an adapter can actually evaluate.
"""
FILES = "files"
EXECUTION = "execution"
MEMORY = "memory"
SESSION = "session"
CRON = "cron"
BROWSER = "browser"
GATEWAY_RPC = "gateway_rpc"
# The adapter can deliver additional user turns mid-trajectory in
# response to simulated-user triggers (when_tool_family,
# when_assistant_contains, etc). Single-shot drivers like Hermes's
# MiniSWERunner do not provide this.
MULTI_TURN_INJECTION = "multi_turn_injection"
StateQueryKind = Literal["memory", "session", "cron", "custom"]
StateQueryPredicate = Literal["exists", "absent", "equals", "contains"]
class StateQuery(BaseModel):
"""An abstract state assertion resolved by the active adapter.
The canonical layer does not commit to how the state is read. For
example, a `kind="memory"` query with `selector={"key_pattern":"alpha"}`
and `expected={"value_contains":["foo"]}` means "there is a memory
entry whose key matches /alpha/ and whose value contains 'foo'".
OpenClaw's adapter resolves that against the `memory.search` gateway
RPC; a filesystem-memory adapter (e.g. Hermes) resolves it by
scanning `MEMORY.md` / `memory/notes.md` in the workspace.
The `required_capability` is what the harness checks against the
adapter's declared capability set.
"""
kind: StateQueryKind
predicate: StateQueryPredicate = "exists"
selector: dict[str, Any] = Field(default_factory=dict)
expected: dict[str, Any] = Field(default_factory=dict)
required_capability: AdapterCapability
description: str = ""
class SeedEntry(BaseModel):
"""A single piece of pre-task state to seed into the workspace.
`kind="file"`: the adapter writes `content` (or copies a bundled
asset via `asset_pack`) to `path` inside the workspace.
`kind="memory"`: the adapter seeds a memory entry with `key` and
`content`. Adapters without memory support fall back to writing
the seed as a file (see `environment_files.verify_memory_fallback`).
"""
kind: Literal["file", "memory"]
path: str | None = None
content: str | None = None
key: str | None = None
asset_pack: str = ""
metadata: dict[str, Any] = Field(default_factory=dict)
@model_validator(mode="after")
def _validate_shape(self) -> SeedEntry:
if self.kind == "file" and not self.path and not self.asset_pack:
raise ValueError("SeedEntry(kind='file') requires `path` or `asset_pack`.")
if self.kind == "memory" and not self.key:
raise ValueError("SeedEntry(kind='memory') requires `key`.")
return self
class Deliverable(BaseModel):
"""A user-visible artifact the task is expected to produce."""
kind: ArtifactType
paths: list[str] = Field(default_factory=list)
description: str = ""
class BudgetSpec(BaseModel):
"""Per-task execution budgets.
`timeout_seconds` is the wall clock for the full run (all phases).
`max_tool_calls=0` means unbounded within the timeout. Adapters are
expected to honor these as soft caps; the harness will also enforce
the timeout as a hard deadline.
"""
timeout_seconds: int = 180
max_tool_calls: int = 0
per_turn_timeout_seconds: int = 0
class InteractionPolicy(BaseModel):
"""How the canonical phases drive the agent."""
max_turns: int = 20
allow_multi_phase: bool = True
# Declares that the task's simulated user sends follow-up turns
# based on trajectory triggers (not just counts). Adapters without
# MULTI_TURN_INJECTION cannot deliver these dynamically.
uses_dynamic_user_triggers: bool = False
class VerifierContract(BaseModel):
"""Everything needed to score a run, independent of how it ran.
The file/execution halves are fully agent-agnostic `environment_files`
evaluates them against the workspace directly. State queries are
resolved by `adapter.verify_state_query`. Trajectory and behavior
expectations are evaluated against the `Transcript` (already agent-
agnostic). The optional judge rubric is evaluated against artifacts
+ transcript + completion feedback.
"""
file_states: list[FileState] = Field(default_factory=list)
execution_checks: list[ExecutionCheck] = Field(default_factory=list)
state_queries: list[StateQuery] = Field(default_factory=list)
pre_run_queries: list[StateQuery] = Field(default_factory=list)
trajectory: TrajectoryExpectations = Field(default_factory=TrajectoryExpectations)
behavior: BehaviorExpectations = Field(default_factory=BehaviorExpectations)
judge: JudgeExpectations | None = None
class CanonicalAssets(BaseModel):
"""Workspace + seed state the harness realizes before phases run.
`workspace_files` is a list of relative paths (resolved against the
task's assets/ dir) to copy into the workspace. `background_services`
is already canonical (subprocess + readiness probe, no OpenClaw
coupling). `seed_state` replaces `asset_packs` + `memory_seed` with
a uniform per-entry list.
"""
workspace_files: list[str] = Field(default_factory=list)
background_services: list[BackgroundService] = Field(default_factory=list)
seed_state: list[SeedEntry] = Field(default_factory=list)
class CanonicalPhase(BaseModel):
"""One simulated-user phase in a multi-phase task.
`user` is reused verbatim from `clawbench.schemas.SimulatedUser`
it is already agent-agnostic (turn text + canonical trigger
predicates). Whether a specific trigger fires on a given adapter
depends on whether tool-family tags are populated, which is an
adapter responsibility.
"""
name: str
user: SimulatedUser
timeout_seconds: int | None = None
class CanonicalTask(BaseModel):
"""Agent-agnostic task definition.
Produced by `convert.from_task_definition` from an existing
`TaskDefinition`. Consumed by adapters via `AdapterContext` and by
the scorer + trajectory/judge layers. No field here is OpenClaw-
specific; OpenClaw-only semantics survive as `StateQuery` entries
with `required_capability=GATEWAY_RPC`.
"""
# Identity and taxonomy (already canonical in TaskDefinition).
id: str
name: str
tier: Tier
family: TaskFamily
surface: str
scenario: ScenarioDomain | None = None
subscenario: str = ""
capabilities: list[CapabilityTag] = Field(default_factory=list)
atomic_capabilities: list[str] = Field(default_factory=list)
# Pool / rotation / provenance.
pool: TaskPool = TaskPool.PUBLIC_DEV
subsets: list[TaskSubset] = Field(default_factory=list)
variant_group: str = ""
variant_id: str = "main"
template_id: str = ""
release_id: str = ""
source_kind: str = ""
provenance_ids: list[str] = Field(default_factory=list)
privacy_tier: str = ""
contamination_risk: str = ""
freshness_epoch: str = ""
category: str = ""
domain: str = ""
functionality: list[str] = Field(default_factory=list)
trace_distribution: list[str] = Field(default_factory=list)
tool_surface: list[str] = Field(default_factory=list)
risk_tags: list[str] = Field(default_factory=list)
first_used_at: str = ""
retire_after_runs: int = 0
similarity_hash: str = ""
canary_token: str = ""
official: bool = False
# Policy + prompts.
query_difficulty: QueryDifficulty | None = None
query_weight: float = 1.0
artifact_type: ArtifactType | None = None
preconditions: list[str] = Field(default_factory=list)
source_dataset: str = ""
prompt_variants: list[PromptVariant] = Field(default_factory=lambda: [PromptVariant.CLEAR])
pass_threshold: float = 0.7
# Canonical body.
assets: CanonicalAssets = Field(default_factory=CanonicalAssets)
phases: list[CanonicalPhase]
verifier: VerifierContract = Field(default_factory=VerifierContract)
budgets: BudgetSpec = Field(default_factory=BudgetSpec)
interaction: InteractionPolicy = Field(default_factory=InteractionPolicy)
deliverables: list[Deliverable] = Field(default_factory=list)
# Adapter gating.
required_adapter_capabilities: set[AdapterCapability] = Field(default_factory=set)
# Forward-compat: lets us evolve this schema while hidden / external
# task manifests continue to validate.
schema_version: str = "1"
@model_validator(mode="after")
def _defaults(self) -> CanonicalTask:
if not self.variant_group:
self.variant_group = self.id
if not self.prompt_variants:
self.prompt_variants = [PromptVariant.CLEAR]
else:
deduped: list[PromptVariant] = []
for variant in self.prompt_variants:
if variant not in deduped:
deduped.append(variant)
self.prompt_variants = deduped
return self

View File

@ -0,0 +1,438 @@
"""Agent-agnostic workspace verification primitives.
This is the half of `environment.py` that does not touch the OpenClaw
gateway: file-state checks, execution-check subprocessing, stdout/JSON
assertions, JSON path resolution, and the filesystem/transcript-based
memory fallback readers.
Adapters (OpenClaw, Hermes, future) consume these primitives directly.
`environment.py` re-exports them for back-compat so existing callers
keep working while the gateway-tied halves (`_verify_memory` primary
path, `_verify_session`, `_verify_cron`, `_verify_gateway_assertion`)
stay where they are and move to `adapters/openclaw.py` in a later step.
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import re
import shlex
import sys
from pathlib import Path
from typing import Any
from clawbench.paths import resolve_workspace_path
from clawbench.render import render_template, render_value
from clawbench.schemas import (
ExecutionCheck,
ExecutionCheckResult,
FileState,
MemoryState,
Transcript,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# File-state verification
# ---------------------------------------------------------------------------
def verify_file_state(
spec: FileState,
workspace: Path,
runtime_values: dict[str, Any],
) -> tuple[bool, str]:
"""Verify a single `FileState` against the workspace filesystem."""
try:
path = resolve_workspace_path(
workspace,
render_template(spec.path, runtime_values),
field=f"completion file {spec.path}",
)
except ValueError as exc:
return False, str(exc)
exists = path.exists() and path.is_file()
if not spec.exists:
return (not exists, "Correctly absent" if not exists else "File should not exist")
if not exists:
return False, "File does not exist"
content = path.read_text(encoding="utf-8", errors="replace")
if spec.min_size_bytes > 0 and path.stat().st_size < spec.min_size_bytes:
return False, f"File too small: {path.stat().st_size} < {spec.min_size_bytes}"
for token in spec.content_contains:
rendered = render_template(token, runtime_values)
if rendered not in content:
return False, f"Missing expected content '{rendered}'"
for token in spec.content_not_contains:
rendered = render_template(token, runtime_values)
if rendered in content:
return False, f"Contains forbidden content '{rendered}'"
if spec.content_matches and not re.search(
render_template(spec.content_matches, runtime_values),
content,
re.MULTILINE | re.DOTALL,
):
return False, f"Content does not match {spec.content_matches}"
return True, "OK"
# ---------------------------------------------------------------------------
# Execution checks
# ---------------------------------------------------------------------------
async def run_execution_check(
spec: ExecutionCheck,
*,
workspace: Path,
runtime_values: dict[str, Any],
) -> ExecutionCheckResult:
"""Run a single `ExecutionCheck` subprocess and evaluate its output."""
rendered_command = render_template(spec.command, runtime_values)
try:
rendered_cwd = resolve_workspace_path(
workspace,
render_template(spec.cwd, runtime_values),
field=f"execution check cwd for {spec.name}",
)
except ValueError as exc:
return ExecutionCheckResult(
name=spec.name,
command=rendered_command,
exit_code=-1,
passed=False,
reason=str(exc),
)
rendered_env = render_value(spec.env, runtime_values)
full_env = {
**os.environ,
**{key: str(value) for key, value in rendered_env.items()},
"PYTHONUNBUFFERED": "1",
}
python_bin_dir = str(Path(sys.executable).parent)
full_env["PATH"] = f"{python_bin_dir}:{full_env.get('PATH', '')}"
python_path_parts = [str(rendered_cwd), str(workspace)]
existing_pythonpath = full_env.get("PYTHONPATH")
if existing_pythonpath:
python_path_parts.append(existing_pythonpath)
full_env["PYTHONPATH"] = ":".join(python_path_parts)
try:
if spec.shell:
process = await asyncio.create_subprocess_shell(
rendered_command,
cwd=str(rendered_cwd),
env=full_env,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
else:
process = await asyncio.create_subprocess_exec(
*shlex.split(rendered_command),
cwd=str(rendered_cwd),
env=full_env,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout_bytes, stderr_bytes = await asyncio.wait_for(
process.communicate(),
timeout=spec.timeout_seconds,
)
except asyncio.TimeoutError:
process.kill()
await process.communicate()
return ExecutionCheckResult(
name=spec.name,
command=rendered_command,
exit_code=-1,
passed=False,
reason=f"Timed out after {spec.timeout_seconds}s",
)
except Exception as exc:
return ExecutionCheckResult(
name=spec.name,
command=rendered_command,
exit_code=-1,
passed=False,
reason=str(exc),
)
stdout = stdout_bytes.decode("utf-8", errors="replace")
stderr = stderr_bytes.decode("utf-8", errors="replace")
passed, reason = evaluate_execution_result(
spec, workspace, runtime_values, process.returncode, stdout, stderr
)
return ExecutionCheckResult(
name=spec.name,
command=rendered_command,
exit_code=process.returncode,
stdout=stdout,
stderr=stderr,
passed=passed,
reason=reason,
)
def evaluate_execution_result(
spec: ExecutionCheck,
workspace: Path,
runtime_values: dict[str, Any],
exit_code: int,
stdout: str,
stderr: str,
) -> tuple[bool, str]:
"""Apply every assertion declared on an `ExecutionCheck`."""
if exit_code != spec.expected_exit_code:
return False, f"Exit code {exit_code} != expected {spec.expected_exit_code}"
for token in spec.stdout_contains:
rendered = render_template(token, runtime_values)
if rendered not in stdout:
return False, f"stdout missing '{rendered}'"
for token in spec.stdout_not_contains:
rendered = render_template(token, runtime_values)
if rendered in stdout:
return False, f"stdout unexpectedly contains '{rendered}'"
for token in spec.stderr_contains:
rendered = render_template(token, runtime_values)
if rendered not in stderr:
return False, f"stderr missing '{rendered}'"
if spec.stdout_matches and not re.search(
render_template(spec.stdout_matches, runtime_values), stdout, re.MULTILINE | re.DOTALL
):
return False, f"stdout does not match {spec.stdout_matches}"
if spec.stderr_matches and not re.search(
render_template(spec.stderr_matches, runtime_values), stderr, re.MULTILINE | re.DOTALL
):
return False, f"stderr does not match {spec.stderr_matches}"
if spec.expected_stdout is not None:
rendered = render_template(spec.expected_stdout, runtime_values).strip()
if stdout.strip() != rendered:
return False, "stdout did not match expected text"
if spec.expected_stdout_file:
try:
expected_path = resolve_workspace_path(
workspace,
render_template(spec.expected_stdout_file, runtime_values),
field=f"expected_stdout_file for {spec.name}",
)
except ValueError as exc:
return False, str(exc)
if stdout.strip() != expected_path.read_text(encoding="utf-8").strip():
return False, f"stdout did not match {spec.expected_stdout_file}"
if spec.expected_json is not None:
try:
parsed = json.loads(stdout)
except json.JSONDecodeError as exc:
return False, f"stdout was not valid JSON: {exc}"
if parsed != render_value(spec.expected_json, runtime_values):
return False, "stdout JSON did not match expected JSON"
if spec.expected_json_file:
try:
expected_path = resolve_workspace_path(
workspace,
render_template(spec.expected_json_file, runtime_values),
field=f"expected_json_file for {spec.name}",
)
except ValueError as exc:
return False, str(exc)
try:
parsed = json.loads(stdout)
except json.JSONDecodeError as exc:
return False, f"stdout was not valid JSON: {exc}"
expected_json = json.loads(expected_path.read_text(encoding="utf-8"))
if parsed != expected_json:
return False, f"stdout JSON did not match {spec.expected_json_file}"
return True, "OK"
# ---------------------------------------------------------------------------
# Memory fallback: read well-known files from the workspace directly.
# ---------------------------------------------------------------------------
MEMORY_FILE_CANDIDATES: tuple[str, ...] = (
"MEMORY.md",
"memory.md",
"memory/MEMORY.md",
"memory/memory.md",
"memory/notes.md",
"memory/NOTES.md",
"notes.md",
)
def read_workspace_memory_text(workspace: Path) -> str:
"""Read concatenated memory-file contents straight from the workspace.
This is the adapter-free equivalent of
`environment._read_agent_memory_text`, which reads the same files via
`GatewayClient.get_agent_file`. Use this from any adapter whose agent
runs directly in the ClawBench workspace (Hermes, Claude Code, Codex).
"""
contents: list[str] = []
for name in MEMORY_FILE_CANDIDATES:
path = workspace / name
try:
if path.is_file():
text = path.read_text(encoding="utf-8", errors="replace")
if text.strip():
contents.append(text)
except Exception:
continue
return "\n".join(contents)
def memory_visible_in_transcript(spec: MemoryState, transcript: Transcript) -> bool:
"""Return True if the transcript shows a memory *write* matching `spec`.
Same heuristic as `environment._memory_visible_in_transcript` kept
agent-agnostic: it reads `ToolCall.family`, `call.name`, `call.input`,
`call.output`, `call.error`, all of which are canonical.
"""
needle = spec.key_pattern.lower()
for call in transcript.tool_call_sequence:
family = (call.family or "").lower()
name = call.name.lower()
path = str(call.input.get("path", "")).lower()
if family != "memory" and "memory" not in path:
continue
if (
family == "memory"
and "search" in name
and "write" not in name
and "store" not in name
and "save" not in name
):
continue
serialized_bits = [call.output, call.error]
try:
serialized_bits.append(json.dumps(call.input, sort_keys=True))
except TypeError:
serialized_bits.append(str(call.input))
haystack = " ".join(bit for bit in serialized_bits if bit).lower()
if needle not in haystack:
continue
if all(token.lower() in haystack for token in spec.value_contains):
return True
return False
def verify_memory_fallback(
spec: MemoryState,
workspace: Path,
*,
transcript: Transcript | None = None,
extra_memory_text: str = "",
) -> tuple[bool, str]:
"""Resolve a `MemoryState` assertion using workspace files + transcript.
Used by any adapter that doesn't expose an OpenClaw-style
`memory.search` RPC. The lookup strategy is deliberately permissive
(matches the existing fallback path in `environment._verify_memory`):
1. Concatenate every known memory file in the workspace.
2. Optionally add any adapter-supplied text (e.g. OpenClaw's
`_read_agent_memory_text`) via `extra_memory_text`.
3. If the key_pattern appears (case-insensitive), check every
`value_contains` token.
4. If that fails, fall back to scanning the transcript for a memory
write that matches.
"""
memory_text = (read_workspace_memory_text(workspace) + "\n" + extra_memory_text).lower()
needle = spec.key_pattern.lower()
found = needle in memory_text
if not spec.exists:
return (not found, "Correctly absent" if not found else "Memory entry exists")
if found:
for token in spec.value_contains:
if token.lower() not in memory_text:
return False, f"Memory value missing '{token}'"
return True, "OK"
if transcript is not None and memory_visible_in_transcript(spec, transcript):
return True, "Verified from transcript fallback"
return (
False,
"No matching memory content found in persisted memory files or transcript fallback",
)
# ---------------------------------------------------------------------------
# JSON-path resolver (pure function over dict/list payloads)
# ---------------------------------------------------------------------------
def resolve_json_path(payload: Any, path: str) -> Any:
"""Resolve a dotted `$.foo.bar[0].baz` path into `payload`.
Returns None if any part of the path is missing or the type is
wrong. Handles index syntax via `foo[3]`.
"""
if path == "$":
return payload
current = payload
for part in path.lstrip("$").lstrip(".").split("."):
if not part:
continue
match = re.fullmatch(r"([^\[]+)\[(\d+)\]", part)
if match:
key, index = match.groups()
if not isinstance(current, dict) or key not in current:
return None
current = current[key]
if not isinstance(current, list):
return None
idx = int(index)
if idx >= len(current):
return None
current = current[idx]
continue
if isinstance(current, dict) and part in current:
current = current[part]
continue
return None
return current
__all__ = [
"MEMORY_FILE_CANDIDATES",
"evaluate_execution_result",
"memory_visible_in_transcript",
"read_workspace_memory_text",
"resolve_json_path",
"run_execution_check",
"verify_file_state",
"verify_memory_fallback",
]

View File

@ -390,6 +390,12 @@ class TaskDefinition(BaseModel):
privacy_tier: str = ""
contamination_risk: str = ""
freshness_epoch: str = ""
category: str = ""
domain: str = ""
functionality: list[str] = Field(default_factory=list)
trace_distribution: list[str] = Field(default_factory=list)
tool_surface: list[str] = Field(default_factory=list)
risk_tags: list[str] = Field(default_factory=list)
first_used_at: str = ""
retire_after_runs: int = 0
similarity_hash: str = ""

View File

@ -32,6 +32,9 @@ dev = [
"pre-commit>=4.0,<5",
"ruff>=0.9,<1",
]
hermes = [
"hermes-agent @ git+https://github.com/NousResearch/hermes-agent.git@main",
]
[project.urls]
Homepage = "https://github.com/openclaw/clawbench"
@ -49,6 +52,9 @@ build-backend = "hatchling.build"
packages = ["clawbench"]
force-include = { "tasks-public" = "tasks-public", "tasks-domain" = "tasks-domain", "profiles" = "profiles", "baselines" = "baselines", "CLAWBENCH_V0_4_SPEC.md" = "CLAWBENCH_V0_4_SPEC.md", "PARTNER_TRACE_SPEC.md" = "PARTNER_TRACE_SPEC.md" }
[tool.hatch.metadata]
allow-direct-references = true
[tool.pytest.ini_options]
asyncio_mode = "auto"
addopts = ["-p", "no:opik"]

122
tests/test_ablation.py Normal file
View File

@ -0,0 +1,122 @@
from clawbench.ablation import (
common_compatible_task_set,
compare_results,
default_tool_profile,
)
from clawbench.adapters.hermes import HermesAdapterConfig
from clawbench.schemas import (
BenchmarkResult,
CompletionSpec,
FileState,
SimulatedUser,
TaskDefinition,
TaskFamily,
TaskStats,
Tier,
UserTurn,
)
def _task(task_id: str) -> TaskDefinition:
return TaskDefinition(
id=task_id,
name=task_id,
tier=Tier.TIER1,
family=TaskFamily.CODING,
surface="coding",
user=SimulatedUser(turns=[UserTurn(message="write out.txt")]),
completion=CompletionSpec(files=[FileState(path="out.txt")]),
)
def test_tool_profile_fingerprint_is_stable() -> None:
config = HermesAdapterConfig(driver_mode="ai_agent", enabled_toolsets=["hermes-api-server"])
a = default_tool_profile(adapter="hermes", config=config, enabled_toolsets=["hermes-api-server"])
b = default_tool_profile(adapter="hermes", config=config, enabled_toolsets=["hermes-api-server"])
assert a.fingerprint == b.fingerprint
assert "browser" in a.interfaces
assert "multi_turn" in a.interfaces
def test_common_compatible_task_set_uses_effective_adapter_config() -> None:
tasks = [_task("a"), _task("b")]
plan = common_compatible_task_set(
tasks,
{
"openclaw": ("openclaw", None),
"hermes": ("hermes", HermesAdapterConfig(driver_mode="ai_agent")),
},
)
assert plan.task_ids == ["a", "b"]
assert plan.skipped == {}
def _result(label: str, model: str, task_ids: list[str], score: float) -> BenchmarkResult:
task_results = [
TaskStats(
task_id=task_id,
tier="tier1",
family="coding",
runs=1,
mean_completion_score=1.0,
mean_trajectory_score=1.0,
mean_behavior_score=1.0,
mean_run_score=score,
reliability_score=1.0,
variance_score=1.0,
mean_task_score=score,
stddev=0.0,
min_score=score,
max_score=score,
pass_at_1=True,
pass_rate=1.0,
pass_hat_k=True,
)
for task_id in task_ids
]
return BenchmarkResult(
submission_id=label,
model=model,
provider="test",
timestamp="2026-04-25T00:00:00Z",
overall_score=score,
overall_completion=1.0,
overall_trajectory=1.0,
overall_behavior=1.0,
overall_reliability=1.0,
overall_ci_lower=score,
overall_ci_upper=score,
overall_pass_hat_k=1.0,
task_results=task_results,
)
def test_compare_results_rejects_different_task_sets() -> None:
comparison = compare_results(
{
"a": _result("a", "m", ["t1", "t2"], 0.8),
"b": _result("b", "m", ["t1"], 0.9),
}
)
assert comparison["fair"] is False
assert comparison["task_verifier_fair"] is False
assert comparison["controlled_ablation"] is False
assert comparison["same_model"] is True
assert comparison["same_task_set"] is False
def test_compare_results_allows_cross_model_same_task_leaderboard() -> None:
a = _result("a", "model-a", ["t1", "t2"], 0.8)
b = _result("b", "model-b", ["t1", "t2"], 0.9)
a.task_snapshot_fingerprint = "snapshot-1"
b.task_snapshot_fingerprint = "snapshot-1"
comparison = compare_results({"a": a, "b": b})
assert comparison["fair"] is True
assert comparison["task_verifier_fair"] is True
assert comparison["controlled_ablation"] is False
assert comparison["same_model"] is False

222
tests/test_adapter_base.py Normal file
View File

@ -0,0 +1,222 @@
"""Tests for `clawbench.adapters.base` + registry.
Keeps the adapter ABC and registration helpers honest before any
concrete adapter lands. A parametrized contract test in
`test_adapter_contract.py` will exercise the ABC against every shipped
adapter later.
"""
from __future__ import annotations
from pathlib import Path
import pytest
from clawbench.adapters import (
ADAPTERS,
AdapterContext,
AgentAdapter,
PhaseResult,
StateQueryResult,
get_adapter,
register_adapter,
)
from clawbench.canonical import (
AdapterCapability,
CanonicalPhase,
CanonicalTask,
StateQuery,
)
from clawbench.canonical.convert import from_task_definition
from clawbench.schemas import (
CompletionSpec,
ExecutionCheck,
FileState,
SimulatedUser,
TaskDefinition,
TaskFamily,
TaskSetup,
Tier,
Transcript,
UserTurn,
)
# ---------------------------------------------------------------------------
# Minimal adapter for contract verification.
# ---------------------------------------------------------------------------
class _EchoAdapter(AgentAdapter):
name = "echo-test-adapter"
capabilities = {AdapterCapability.FILES, AdapterCapability.EXECUTION}
async def setup(self, ctx: AdapterContext) -> None: # pragma: no cover - trivial
return None
async def run_phase(
self, phase: CanonicalPhase, ctx: AdapterContext
) -> PhaseResult:
return PhaseResult(messages=[], adapter_metadata={"phase": phase.name})
async def verify_state_query(
self, query: StateQuery, ctx: AdapterContext
) -> StateQueryResult:
if query.required_capability in self.capabilities:
return StateQueryResult(ok=True, detail="echo-adapter-always-ok")
return StateQueryResult(
ok=False,
detail=f"echo adapter does not provide {query.required_capability.value}",
capability_missing=True,
)
async def teardown(self, ctx: AdapterContext) -> None: # pragma: no cover - trivial
return None
# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
def test_register_adapter_adds_to_registry_and_get_adapter_resolves() -> None:
original = dict(ADAPTERS)
try:
register_adapter(_EchoAdapter)
assert ADAPTERS["echo-test-adapter"] is _EchoAdapter
assert get_adapter("echo-test-adapter") is _EchoAdapter
finally:
ADAPTERS.clear()
ADAPTERS.update(original)
def test_register_adapter_rejects_duplicate_name() -> None:
class _OtherEcho(AgentAdapter):
name = "echo-test-adapter"
capabilities = {AdapterCapability.FILES}
async def setup(self, ctx: AdapterContext) -> None: # pragma: no cover
return None
async def run_phase(self, phase, ctx) -> PhaseResult: # pragma: no cover
return PhaseResult()
async def verify_state_query(self, query, ctx) -> StateQueryResult: # pragma: no cover
return StateQueryResult(ok=False, capability_missing=True)
async def teardown(self, ctx: AdapterContext) -> None: # pragma: no cover
return None
original = dict(ADAPTERS)
try:
register_adapter(_EchoAdapter)
with pytest.raises(ValueError):
register_adapter(_OtherEcho)
finally:
ADAPTERS.clear()
ADAPTERS.update(original)
def test_register_adapter_requires_name() -> None:
class _Nameless(AgentAdapter):
capabilities = {AdapterCapability.FILES}
async def setup(self, ctx: AdapterContext) -> None: # pragma: no cover
return None
async def run_phase(self, phase, ctx) -> PhaseResult: # pragma: no cover
return PhaseResult()
async def verify_state_query(self, query, ctx) -> StateQueryResult: # pragma: no cover
return StateQueryResult(ok=False, capability_missing=True)
async def teardown(self, ctx: AdapterContext) -> None: # pragma: no cover
return None
with pytest.raises(ValueError):
register_adapter(_Nameless)
def test_get_adapter_raises_for_unknown_name() -> None:
with pytest.raises(KeyError):
get_adapter("no-such-adapter-exists")
# ---------------------------------------------------------------------------
# Capability gating helpers
# ---------------------------------------------------------------------------
def _file_task() -> CanonicalTask:
task = TaskDefinition(
id="capability-test",
name="capability test",
tier=Tier.TIER1,
family=TaskFamily.CODING,
surface="coding",
setup=TaskSetup(),
user=SimulatedUser(
max_turns=1, turns=[UserTurn(message="Do a thing.")]
),
completion=CompletionSpec(
files=[FileState(path="out.txt", exists=True)],
execution_checks=[ExecutionCheck(name="ok", command="true")],
),
)
return from_task_definition(task)
def test_supports_is_true_when_capabilities_cover_task() -> None:
task = _file_task()
assert _EchoAdapter.supports(task)
assert _EchoAdapter.missing_capabilities_for(task) == set()
def test_supports_is_false_when_task_needs_more() -> None:
task = _file_task()
task = task.model_copy(
update={
"required_adapter_capabilities": (
task.required_adapter_capabilities | {AdapterCapability.MEMORY}
)
}
)
assert not _EchoAdapter.supports(task)
assert _EchoAdapter.missing_capabilities_for(task) == {AdapterCapability.MEMORY}
# ---------------------------------------------------------------------------
# Context roundtrip (sanity: adapter methods can build and return
# PhaseResult / StateQueryResult without tripping dataclass defaults)
# ---------------------------------------------------------------------------
def test_adapter_phase_result_round_trip(tmp_path: Path) -> None:
task = _file_task()
adapter = _EchoAdapter()
ctx = AdapterContext(
task=task,
workspace=tmp_path,
runtime_values={},
run_index=0,
model="test-model",
transcript=Transcript(),
)
import asyncio
async def _go() -> None:
await adapter.setup(ctx)
result = await adapter.run_phase(task.phases[0], ctx)
assert isinstance(result, PhaseResult)
assert result.adapter_metadata == {"phase": task.phases[0].name}
query = StateQuery(
kind="memory",
required_capability=AdapterCapability.MEMORY,
selector={"key_pattern": "x"},
)
res = await adapter.verify_state_query(query, ctx)
assert res.capability_missing is True
await adapter.teardown(ctx)
asyncio.run(_go())

View File

@ -0,0 +1,268 @@
"""Tests for `clawbench.canonical.convert.from_task_definition`.
Covers the three representative task shapes:
1. A files + execution-only task (tier-1 bugfix) must produce
`required_adapter_capabilities == {FILES, EXECUTION}`.
2. A memory-using, multi-phase task (tier-2 memory roundtrip) must
include `MEMORY` and MULTI_TURN_INJECTION is NOT set since each
phase's user has exactly one static turn.
3. A synthetic task exercising gateway_assertions, session, cron, and
browser must surface each capability.
The tests also round-trip the real task corpus through the converter
to make sure every live YAML file produces a valid `CanonicalTask`
(no missing-field or validation errors), since the converter is how
every downstream adapter will see tasks.
"""
from __future__ import annotations
from clawbench.canonical import (
AdapterCapability,
CanonicalTask,
from_task_definition,
)
from clawbench.schemas import (
BackgroundService,
CompletionSpec,
CronState,
ExecutionCheck,
FileState,
GatewayAssertion,
MemoryState,
SessionState,
SimulatedUser,
TaskDefinition,
TaskFamily,
TaskSetup,
Tier,
UserTurn,
)
from clawbench.tasks import load_all_tasks
# ---------------------------------------------------------------------------
# Fixture builders
# ---------------------------------------------------------------------------
def _files_only_task() -> TaskDefinition:
return TaskDefinition(
id="test-files-only",
name="Files-only task",
tier=Tier.TIER1,
family=TaskFamily.CODING,
surface="coding",
setup=TaskSetup(asset_packs=["pack_a"]),
user=SimulatedUser(
max_turns=2,
turns=[UserTurn(message="Fix the bug and run the tests.")],
),
completion=CompletionSpec(
files=[FileState(path="src/main.py", exists=True)],
execution_checks=[ExecutionCheck(name="tests", command="pytest -q")],
),
)
def _memory_task() -> TaskDefinition:
return TaskDefinition(
id="test-memory-roundtrip",
name="Memory roundtrip",
tier=Tier.TIER2,
family=TaskFamily.MULTI_TOOL,
surface="tools",
setup=TaskSetup(
memory_seed=[{"key": "existing_key", "value": "existing_value"}],
),
phases=[
{
"name": "store",
"user": SimulatedUser(
max_turns=1,
turns=[UserTurn(message="Remember: stack = React, Node, Postgres.")],
),
},
{
"name": "recall",
"user": SimulatedUser(
max_turns=1,
turns=[UserTurn(message="What's my stack?")],
),
},
],
completion=CompletionSpec(
memory=[MemoryState(key_pattern="stack", exists=True, value_contains=["React"])],
),
)
def _full_surface_task() -> TaskDefinition:
# Synthetic task exercising session, cron, gateway_assertion, browser,
# and a dynamic follow-up turn.
return TaskDefinition(
id="test-full-surface",
name="Full surface",
tier=Tier.TIER3,
family=TaskFamily.BROWSER,
surface="browser",
setup=TaskSetup(
pre_check_gateway=[
GatewayAssertion(
method="agents.list",
assert_path="$.count",
assert_equals=0,
),
],
background_services=[
BackgroundService(
name="echo-service",
command="python3 -m http.server",
port=0,
ready_path="/",
),
],
),
user=SimulatedUser(
max_turns=4,
turns=[
UserTurn(message="Start the task."),
UserTurn(
message="Try again.",
when_tool_family="browser",
when_last_tool_failed=True,
),
],
),
completion=CompletionSpec(
session=SessionState(should_exist=True, model_should_be="claude-opus-4"),
cron=[CronState(exists=True, description_contains="daily")],
gateway_assertions=[
GatewayAssertion(
method="memory.list",
assert_path="$.count",
assert_equals=1,
),
],
),
)
# ---------------------------------------------------------------------------
# Capability inference
# ---------------------------------------------------------------------------
def test_files_only_task_requires_only_files_and_execution() -> None:
task = _files_only_task()
task.category = "software_engineering"
task.domain = "devtools"
task.functionality = ["bugfix", "test_verification"]
task.trace_distribution = ["read_heavy", "edit_heavy", "execute_heavy"]
task.tool_surface = ["filesystem", "shell"]
task.risk_tags = ["code_regression"]
canonical = from_task_definition(task)
assert isinstance(canonical, CanonicalTask)
assert canonical.required_adapter_capabilities == {
AdapterCapability.FILES,
AdapterCapability.EXECUTION,
}
assert canonical.category == "software_engineering"
assert canonical.domain == "devtools"
assert canonical.functionality == ["bugfix", "test_verification"]
assert canonical.trace_distribution == ["read_heavy", "edit_heavy", "execute_heavy"]
assert canonical.tool_surface == ["filesystem", "shell"]
assert canonical.risk_tags == ["code_regression"]
# Seed state should carry the asset pack through.
assert len(canonical.assets.seed_state) == 1
assert canonical.assets.seed_state[0].kind == "file"
assert canonical.assets.seed_state[0].asset_pack == "pack_a"
# File + execution checks carry over.
assert len(canonical.verifier.file_states) == 1
assert len(canonical.verifier.execution_checks) == 1
assert canonical.verifier.state_queries == []
# One non-dynamic phase → no dynamic-trigger capability.
assert canonical.interaction.uses_dynamic_user_triggers is False
def test_memory_task_requires_memory_capability() -> None:
canonical = from_task_definition(_memory_task())
assert AdapterCapability.MEMORY in canonical.required_adapter_capabilities
# Two phases with a single static turn each → dynamic-trigger is NOT
# required (the simulated user just sends one message per phase).
assert AdapterCapability.MULTI_TURN_INJECTION not in canonical.required_adapter_capabilities
assert canonical.interaction.allow_multi_phase is True
assert len(canonical.phases) == 2
# Memory seed lifted to SeedEntry.
memory_seeds = [s for s in canonical.assets.seed_state if s.kind == "memory"]
assert len(memory_seeds) == 1
assert memory_seeds[0].key == "existing_key"
# Memory completion check → StateQuery with MEMORY capability.
memory_queries = [q for q in canonical.verifier.state_queries if q.kind == "memory"]
assert len(memory_queries) == 1
assert memory_queries[0].required_capability is AdapterCapability.MEMORY
assert memory_queries[0].selector == {"key_pattern": "stack"}
assert memory_queries[0].expected == {"value_contains": ["React"]}
def test_full_surface_task_surfaces_every_capability() -> None:
canonical = from_task_definition(_full_surface_task())
caps = canonical.required_adapter_capabilities
assert AdapterCapability.FILES in caps
assert AdapterCapability.EXECUTION in caps
assert AdapterCapability.SESSION in caps
assert AdapterCapability.CRON in caps
assert AdapterCapability.GATEWAY_RPC in caps
assert AdapterCapability.BROWSER in caps
# Dynamic turn (when_tool_family + when_last_tool_failed) flags MTI.
assert AdapterCapability.MULTI_TURN_INJECTION in caps
# pre_check_gateway survives as a pre-run query.
assert len(canonical.verifier.pre_run_queries) == 1
assert canonical.verifier.pre_run_queries[0].required_capability is AdapterCapability.GATEWAY_RPC
# gateway_assertions route through the verifier state_queries.
gateway_queries = [
q for q in canonical.verifier.state_queries if q.kind == "custom"
]
assert len(gateway_queries) == 1
assert gateway_queries[0].selector["method"] == "memory.list"
# Session state with model constraint surfaces in expected.
session_queries = [q for q in canonical.verifier.state_queries if q.kind == "session"]
assert len(session_queries) == 1
assert session_queries[0].expected == {"model": "claude-opus-4"}
def test_background_services_pass_through_unchanged() -> None:
canonical = from_task_definition(_full_surface_task())
assert len(canonical.assets.background_services) == 1
service = canonical.assets.background_services[0]
assert service.name == "echo-service"
assert service.command == "python3 -m http.server"
# ---------------------------------------------------------------------------
# Whole-corpus smoke
# ---------------------------------------------------------------------------
def test_every_task_in_corpus_converts() -> None:
"""Every shipped task YAML must produce a valid CanonicalTask.
Acts as a regression gate: any new field added to TaskDefinition that
the converter doesn't know about will likely still work (fields it
ignores don't break canonical), but any task using new completion
shapes that the converter can't translate will raise here.
"""
tasks = load_all_tasks()
assert tasks, "expected at least one task in the corpus"
for task in tasks:
canonical = from_task_definition(task)
# Every canonical task must declare FILES + EXECUTION capability.
assert AdapterCapability.FILES in canonical.required_adapter_capabilities
assert AdapterCapability.EXECUTION in canonical.required_adapter_capabilities
# Phases always have at least one entry (normalized_phases fills
# one from `user` when `phases` is absent).
assert canonical.phases, f"{task.id}: canonical phases empty"
# Budgets honour the source timeout.
assert canonical.budgets.timeout_seconds == task.timeout_seconds

View File

@ -0,0 +1,98 @@
from pathlib import Path
import pytest
from clawbench.environment_files import run_execution_check, verify_file_state
from clawbench.schemas import ExecutionCheck, FileState
def test_verify_file_state_rejects_paths_outside_workspace(tmp_path: Path):
outside = tmp_path.parent / "outside.txt"
outside.write_text("secret", encoding="utf-8")
ok, reason = verify_file_state(
FileState(path="../outside.txt"),
workspace=tmp_path,
runtime_values={},
)
assert ok is False
assert "escapes workspace" in reason
@pytest.mark.asyncio
async def test_execution_check_supports_cwd_env_and_expected_json_file(tmp_path: Path):
expected = tmp_path / "expected.json"
expected.write_text('{"status": "ok"}', encoding="utf-8")
workdir = tmp_path / "subdir"
workdir.mkdir()
result = await run_execution_check(
ExecutionCheck(
name="json-check",
command=(
"python -c \"import json, os; "
"print(json.dumps({'status': os.environ['CHECK_STATUS']}))\""
),
cwd="subdir",
env={"CHECK_STATUS": "ok"},
expected_json_file="expected.json",
),
workspace=tmp_path,
runtime_values={},
)
assert result.passed is True
assert result.reason == "OK"
@pytest.mark.asyncio
async def test_execution_check_rejects_cwd_outside_workspace(tmp_path: Path):
result = await run_execution_check(
ExecutionCheck(
name="unsafe-cwd",
command="true",
cwd="../outside",
),
workspace=tmp_path,
runtime_values={},
)
assert result.passed is False
assert "escapes workspace" in result.reason
@pytest.mark.asyncio
async def test_execution_check_rejects_expected_stdout_file_outside_workspace(
tmp_path: Path,
):
result = await run_execution_check(
ExecutionCheck(
name="unsafe-expected-stdout",
command="printf secret",
expected_stdout_file="../outside.txt",
),
workspace=tmp_path,
runtime_values={},
)
assert result.passed is False
assert "escapes workspace" in result.reason
@pytest.mark.asyncio
async def test_execution_check_rejects_expected_json_file_outside_workspace(
tmp_path: Path,
):
result = await run_execution_check(
ExecutionCheck(
name="unsafe-expected-json",
command="printf '{}'",
expected_json_file="../outside.json",
),
workspace=tmp_path,
runtime_values={},
)
assert result.passed is False
assert "escapes workspace" in result.reason

View File

@ -0,0 +1,463 @@
"""Tests for `HermesAdapter` against a stub `MiniSWERunner`.
We don't pull in the real `hermes-agent` package — the adapter is
driven through its `runner_factory` hook, which lets tests plug in a
fixed conversation without any network / subprocess activity.
What's covered:
- The adapter registers under the `"hermes"` name.
- `capabilities` is the minimal `{FILES, EXECUTION}` set.
- `setup` realises memory seed entries as workspace files.
- `run_phase` renders the user turn, calls the stub runner, and
appends the parsed conversation into the shared transcript.
- `verify_state_query` falls back to workspace memory scanning for
memory queries, and returns `capability_missing=True` for other
kinds.
- Task gating: a task that requires MEMORY / SESSION / CRON is NOT
supported by HermesAdapter; a files-only task is.
"""
from __future__ import annotations
import asyncio
from pathlib import Path
from clawbench.adapters import get_adapter
from clawbench.adapters.base import AdapterContext, StateQueryResult
from clawbench.adapters.hermes import HermesAdapter, HermesAdapterConfig
from clawbench.canonical import (
AdapterCapability,
CanonicalTask,
StateQuery,
)
from clawbench.canonical.convert import from_task_definition
from clawbench.schemas import (
CompletionSpec,
ExecutionCheck,
FileState,
MemoryState,
SimulatedUser,
TaskDefinition,
TaskFamily,
TaskSetup,
Tier,
Transcript,
UserTurn,
)
# ---------------------------------------------------------------------------
# Stub MiniSWERunner
# ---------------------------------------------------------------------------
class _StubRunner:
"""Pretends to be `MiniSWERunner`; returns a canned conversation."""
def __init__(self, *, model: str, cwd: str, **_: object) -> None:
self.model = model
self.cwd = cwd
self.last_prompt: str | None = None
self.calls = 0
self.conversation = {
"conversations": [
{"from": "user", "value": "placeholder — filled per-test"},
{
"from": "assistant",
"value": (
"Running `ls`.\n"
'<tool_call>{"name":"bash","arguments":{"cmd":"ls"}}</tool_call>'
),
},
{
"from": "tool",
"value": '<tool_response>{"stdout":"main.py"}</tool_response>',
},
],
"completed": True,
"api_calls": 3,
"metadata": {"model": "stub", "env_type": "local"},
}
def run_task(self, prompt: str) -> dict:
self.last_prompt = prompt
self.calls += 1
# Swap the placeholder user value with the real prompt so the
# conversation reflects what the adapter actually sent.
convo = {**self.conversation}
convo["conversations"] = [
{"from": "user", "value": prompt}
if entry.get("from") == "user"
else entry
for entry in convo["conversations"]
]
return convo
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _files_only_task(memory_seed: bool = False) -> CanonicalTask:
setup = (
TaskSetup(memory_seed=[{"key": "stack", "value": "React, Node"}])
if memory_seed
else TaskSetup()
)
return from_task_definition(
TaskDefinition(
id="hermes-files-only",
name="Hermes files-only",
tier=Tier.TIER1,
family=TaskFamily.CODING,
surface="coding",
setup=setup,
user=SimulatedUser(
max_turns=1,
turns=[UserTurn(message="List the workspace files.")],
),
completion=CompletionSpec(
files=[FileState(path="main.py", exists=True)],
execution_checks=[ExecutionCheck(name="noop", command="true")],
),
)
)
def _memory_task() -> CanonicalTask:
return from_task_definition(
TaskDefinition(
id="hermes-memory",
name="Hermes memory",
tier=Tier.TIER2,
family=TaskFamily.MULTI_TOOL,
surface="tools",
setup=TaskSetup(),
user=SimulatedUser(max_turns=1, turns=[UserTurn(message="remember stack=X")]),
completion=CompletionSpec(
memory=[MemoryState(key_pattern="stack", exists=True, value_contains=["React"])],
),
)
)
def _make_adapter() -> tuple[HermesAdapter, list[_StubRunner]]:
runners: list[_StubRunner] = []
def _factory(**kwargs):
runner = _StubRunner(**kwargs)
runners.append(runner)
return runner
adapter = HermesAdapter(
HermesAdapterConfig(model="stub-model", runner_factory=_factory)
)
return adapter, runners
def _make_ctx(task: CanonicalTask, workspace: Path) -> AdapterContext:
return AdapterContext(
task=task,
workspace=workspace,
runtime_values={},
run_index=0,
model="stub-model",
transcript=Transcript(),
)
# ---------------------------------------------------------------------------
# Registration + capability shape
# ---------------------------------------------------------------------------
def test_hermes_adapter_is_registered() -> None:
cls = get_adapter("hermes")
assert cls is HermesAdapter
def test_hermes_capabilities_are_files_and_execution_only() -> None:
assert HermesAdapter.capabilities == {
AdapterCapability.FILES,
AdapterCapability.EXECUTION,
}
def test_hermes_supports_files_only_task() -> None:
task = _files_only_task()
assert HermesAdapter.supports(task)
def test_hermes_does_not_support_memory_task() -> None:
task = _memory_task()
assert not HermesAdapter.supports(task)
missing = HermesAdapter.missing_capabilities_for(task)
assert AdapterCapability.MEMORY in missing
def test_hermes_full_agent_capabilities_cover_memory_and_dynamic_tasks() -> None:
task = _memory_task()
config = HermesAdapterConfig(model="stub-model", driver_mode="ai_agent")
assert HermesAdapter.supports(task, config)
caps = HermesAdapter.supported_capabilities(config)
assert AdapterCapability.MEMORY in caps
assert AdapterCapability.CRON in caps
assert AdapterCapability.BROWSER in caps
assert AdapterCapability.MULTI_TURN_INJECTION in caps
# ---------------------------------------------------------------------------
# Lifecycle
# ---------------------------------------------------------------------------
def test_setup_realizes_memory_seed_as_workspace_files(tmp_path: Path) -> None:
task = _files_only_task(memory_seed=True)
adapter, _ = _make_adapter()
async def _go() -> None:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
asyncio.run(_go())
seeded = tmp_path / "memory" / "stack.md"
assert seeded.is_file()
assert "React" in seeded.read_text(encoding="utf-8")
def test_run_phase_sends_rendered_prompt_and_parses_conversation(tmp_path: Path) -> None:
task = _files_only_task()
adapter, runners = _make_adapter()
async def _go():
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
result = await adapter.run_phase(task.phases[0], ctx)
return ctx, result
ctx, result = asyncio.run(_go())
# The stub runner saw the rendered user message.
assert runners
assert runners[0].last_prompt == "List the workspace files."
# Conversation parsed into the shared transcript.
assert result.error is None
assert ctx.transcript.tool_call_sequence, "expected tool calls parsed out of Hermes conversation"
first_call = ctx.transcript.tool_call_sequence[0]
assert first_call.name == "bash"
assert first_call.input == {"cmd": "ls"}
assert "main.py" in first_call.output
assert result.adapter_metadata.get("api_calls") == 3
assert result.completed_normally is True
def test_runner_factory_uses_explicit_provider_instead_of_api_key(tmp_path: Path) -> None:
task = _files_only_task()
calls: list[dict] = []
def _factory(**kwargs):
calls.append(kwargs)
return _StubRunner(model=kwargs["model"], cwd=kwargs["cwd"])
adapter = HermesAdapter(
HermesAdapterConfig(
model="stub-model",
provider="openai-codex",
base_url="https://example.invalid/v1",
api_key="secret",
runner_factory=_factory,
)
)
async def _go() -> None:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
asyncio.run(_go())
assert calls
assert calls[0]["base_url"] is None
assert calls[0]["api_key"] is None
def test_direct_openai_endpoint_strips_provider_prefix_for_hermes(tmp_path: Path) -> None:
task = _files_only_task()
calls: list[dict] = []
def _factory(**kwargs):
calls.append(kwargs)
return _StubRunner(model=kwargs["model"], cwd=kwargs["cwd"])
adapter = HermesAdapter(
HermesAdapterConfig(
model="openai/gpt-5.4",
base_url="https://api.openai.com/v1",
api_key="secret",
runner_factory=_factory,
)
)
async def _go() -> None:
async with adapter:
ctx = AdapterContext(
task=task,
workspace=tmp_path,
runtime_values={},
run_index=0,
model="openai/gpt-5.4",
transcript=Transcript(),
)
await adapter.setup(ctx)
assert ctx.adapter_state["effective_model"] == "gpt-5.4"
asyncio.run(_go())
assert calls
assert calls[0]["model"] == "gpt-5.4"
def test_ai_agent_direct_endpoint_reports_custom_provider(tmp_path: Path) -> None:
task = _files_only_task()
calls: list[dict] = []
class _StubAgent:
pass
def _factory(**kwargs):
calls.append(kwargs)
return _StubAgent()
adapter = HermesAdapter(
HermesAdapterConfig(
model="openai/gpt-5.4",
base_url="https://api.openai.com/v1",
api_key="secret",
driver_mode="ai_agent",
agent_factory=_factory,
)
)
async def _go() -> None:
async with adapter:
ctx = AdapterContext(
task=task,
workspace=tmp_path,
runtime_values={},
run_index=0,
model="openai/gpt-5.4",
transcript=Transcript(),
)
await adapter.setup(ctx)
assert ctx.adapter_state["effective_model"] == "gpt-5.4"
asyncio.run(_go())
assert calls
assert calls[0]["model"] == "gpt-5.4"
assert calls[0]["base_url"] == "https://api.openai.com/v1"
assert calls[0]["api_key"] == "secret"
assert calls[0]["provider"] == "custom"
# ---------------------------------------------------------------------------
# State queries
# ---------------------------------------------------------------------------
def test_memory_query_uses_workspace_fallback(tmp_path: Path) -> None:
task = _memory_task()
adapter, _ = _make_adapter()
# Simulate a prior run that wrote a MEMORY.md into the workspace.
(tmp_path / "MEMORY.md").write_text("stack: React, Node, Postgres", encoding="utf-8")
query = StateQuery(
kind="memory",
predicate="exists",
selector={"key_pattern": "stack"},
expected={"value_contains": ["React"]},
required_capability=AdapterCapability.MEMORY,
)
async def _go() -> StateQueryResult:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
return await adapter.verify_state_query(query, ctx)
result = asyncio.run(_go())
assert result.ok is True
assert result.capability_missing is False
def test_session_query_is_reported_as_capability_missing(tmp_path: Path) -> None:
task = _memory_task()
adapter, _ = _make_adapter()
query = StateQuery(
kind="session",
predicate="exists",
selector={},
expected={},
required_capability=AdapterCapability.SESSION,
)
async def _go() -> StateQueryResult:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
return await adapter.verify_state_query(query, ctx)
result = asyncio.run(_go())
assert result.capability_missing is True
assert result.ok is False
# ---------------------------------------------------------------------------
# Timeouts
# ---------------------------------------------------------------------------
def test_run_phase_surfaces_runner_timeout(tmp_path: Path) -> None:
task = _files_only_task()
class _SlowRunner:
def __init__(self, **_: object) -> None:
pass
def run_task(self, prompt: str) -> dict:
import time
time.sleep(5) # will exceed the test's configured timeout
return {"conversations": [], "completed": False, "api_calls": 0}
adapter = HermesAdapter(
HermesAdapterConfig(
model="stub-model",
runner_factory=lambda **kw: _SlowRunner(**kw),
)
)
# Force a short phase timeout so the test stays fast.
task_with_short_timeout = task.model_copy(
update={
"phases": [
task.phases[0].model_copy(update={"timeout_seconds": 1})
]
}
)
async def _go():
async with adapter:
ctx = _make_ctx(task_with_short_timeout, tmp_path)
await adapter.setup(ctx)
return await adapter.run_phase(task_with_short_timeout.phases[0], ctx)
result = asyncio.run(_go())
assert result.error is not None
assert "exceeded" in result.error
assert result.completed_normally is False

193
tests/test_hermes_xml.py Normal file
View File

@ -0,0 +1,193 @@
"""Tests for `clawbench.adapters.hermes_xml.parse_conversation`.
Covers the Hermes conversation shapes we expect from the wild:
- Plain assistant turn with a single tool call + a following tool_response.
- Multiple tool calls in one assistant turn.
- Assistant turn with free-form text + a tool call.
- A malformed tool_call payload parser must recover gracefully
(no raise; surface a best-effort call).
- Name-variant keys (`function`, `parameters`) Hermes-variant models emit.
"""
from __future__ import annotations
from clawbench.adapters.hermes_xml import (
iter_tool_calls_from_conversations,
parse_chat_messages,
parse_conversation,
)
from clawbench.trajectory import annotate_transcript_tool_calls
def _conv(*entries: dict[str, str]) -> dict:
return {"conversations": list(entries), "completed": True, "api_calls": 1}
def test_single_tool_call_with_response() -> None:
convo = _conv(
{"from": "system", "value": "You are a helpful coding agent."},
{"from": "user", "value": "List files."},
{
"from": "assistant",
"value": "I'll run `ls`.\n"
'<tool_call>{"name":"bash","arguments":{"cmd":"ls"}}</tool_call>',
},
{
"from": "tool",
"value": '<tool_response>{"stdout":"main.py\\nREADME"}</tool_response>',
},
)
transcript = parse_conversation(convo)
calls = transcript.tool_call_sequence
assert len(calls) == 1
assert calls[0].name == "bash"
assert calls[0].input == {"cmd": "ls"}
assert "main.py" in calls[0].output
assert calls[0].success is True
# Assistant text preserved, tool-call body stripped out.
assistant = next(
msg for msg in transcript.messages if msg.role == "assistant"
)
assert "I'll run `ls`." in assistant.text
assert "<tool_call>" not in assistant.text
def test_multiple_tool_calls_in_one_turn() -> None:
convo = _conv(
{
"from": "assistant",
"value": (
'<tool_call>{"name":"bash","arguments":{"cmd":"ls"}}</tool_call>'
'<tool_call>{"name":"bash","arguments":{"cmd":"pwd"}}</tool_call>'
),
},
{
"from": "tool",
"value": '<tool_response>{"stdout":"a"}</tool_response>',
},
{
"from": "tool",
"value": '<tool_response>{"stdout":"/tmp"}</tool_response>',
},
)
calls = iter_tool_calls_from_conversations(convo["conversations"])
assert len(calls) == 2
assert calls[0].input == {"cmd": "ls"}
assert calls[1].input == {"cmd": "pwd"}
assert calls[0].output == "a"
assert calls[1].output == "/tmp"
def test_malformed_json_falls_back_to_best_effort() -> None:
convo = _conv(
{
"from": "assistant",
"value": (
'<tool_call>{"name":"bash","arguments":{"cmd":"ls"} <-- stray text }</tool_call>'
'<tool_call>{"name":"bash","arguments":{"cmd":"pwd"}}</tool_call>'
),
},
)
calls = iter_tool_calls_from_conversations(convo["conversations"])
# First is malformed; parser recovers one or two calls without
# raising, and the clean second call is always captured.
assert len(calls) >= 1
assert any(c.input == {"cmd": "pwd"} for c in calls)
def test_name_variants_are_accepted() -> None:
convo = _conv(
{
"from": "assistant",
"value": (
'<tool_call>{"function":"bash","parameters":{"cmd":"ls"}}</tool_call>'
),
},
)
calls = iter_tool_calls_from_conversations(convo["conversations"])
assert len(calls) == 1
assert calls[0].name == "bash"
assert calls[0].input == {"cmd": "ls"}
def test_tool_error_marks_call_failed() -> None:
convo = _conv(
{
"from": "assistant",
"value": '<tool_call>{"name":"bash","arguments":{"cmd":"nonsense"}}</tool_call>',
},
{
"from": "tool",
"value": '<tool_response>{"stderr":"command not found","status":"error"}</tool_response>',
},
)
calls = iter_tool_calls_from_conversations(convo["conversations"])
assert len(calls) == 1
assert calls[0].success is False
assert "command not found" in calls[0].error
def test_orphan_tool_response_not_silently_dropped() -> None:
convo = _conv(
{
"from": "tool",
"value": '<tool_response>{"stdout":"nothing to pair with"}</tool_response>',
},
)
transcript = parse_conversation(convo)
# No calls, but one tool-role transcript message surfaces the output.
assert transcript.tool_call_sequence == []
tool_messages = [msg for msg in transcript.messages if msg.role == "tool"]
assert tool_messages
assert "nothing to pair" in tool_messages[0].tool_result_content
def test_parser_output_annotates_with_canonical_families() -> None:
convo = _conv(
{
"from": "assistant",
"value": (
'<tool_call>{"name":"str_replace_based_edit_tool",'
'"arguments":{"path":"main.py","old":"a","new":"b"}}</tool_call>'
),
},
)
transcript = parse_conversation(convo)
# Running the existing trajectory classifier over the parsed
# transcript should assign a canonical family tag to every call.
annotated = annotate_transcript_tool_calls(transcript)
families = [c.family for c in annotated.tool_call_sequence]
assert all(f for f in families), f"expected every call to get a family tag, got {families}"
assert families == ["edit"]
def test_parse_chat_messages_pairs_tool_results() -> None:
transcript = parse_chat_messages(
[
{"role": "user", "content": "List files"},
{
"role": "assistant",
"content": "I'll inspect.",
"tool_calls": [
{
"id": "call-1",
"function": {
"name": "terminal",
"arguments": "{\"command\":\"ls\"}",
},
}
],
},
{"role": "tool", "tool_call_id": "call-1", "content": "main.py"},
{"role": "assistant", "content": "Found main.py"},
]
)
calls = transcript.tool_call_sequence
assert len(calls) == 1
assert calls[0].name == "terminal"
assert calls[0].input == {"command": "ls"}
assert calls[0].output == "main.py"
assert transcript.assistant_messages[-1].text == "Found main.py"

View File

@ -0,0 +1,444 @@
"""Tests for `OpenClawAdapter` — exercised against a stub gateway.
This validates the adapter wiring (lifecycle + state-query resolution)
in isolation, before the harness is rewired through it. The stub
`GatewayClient` records every call and produces canned responses so
the adapter's branches are covered end-to-end without a real gateway.
"""
from __future__ import annotations
import asyncio
from pathlib import Path
from typing import Any
import pytest
from clawbench.adapters import get_adapter
from clawbench.adapters.base import AdapterContext, StateQueryResult
from clawbench.adapters.openclaw import OpenClawAdapter, OpenClawAdapterConfig
from clawbench.canonical import (
AdapterCapability,
CanonicalTask,
StateQuery,
)
from clawbench.canonical.convert import from_task_definition
from clawbench.schemas import (
CompletionSpec,
ExecutionCheck,
FileState,
GatewayAssertion,
MemoryState,
SessionState,
SimulatedUser,
TaskDefinition,
TaskFamily,
TaskSetup,
Tier,
Transcript,
UserTurn,
)
# ---------------------------------------------------------------------------
# Stub GatewayClient
# ---------------------------------------------------------------------------
class _StubGateway:
"""Minimal GatewayClient stand-in for adapter tests.
Records every `create_agent`, `create_session`, `subscribe`,
`send_and_wait`, `delete_*` call in `.calls`, and serves canned
responses for the verification RPCs used by `OpenClawAdapter`.
"""
def __init__(self) -> None:
self.calls: list[tuple[str, dict[str, Any]]] = []
self.rpc_responses: dict[str, dict[str, Any]] = {}
self.send_transcript = Transcript()
async def __aenter__(self) -> "_StubGateway":
self.calls.append(("__aenter__", {}))
return self
async def __aexit__(self, *exc: object) -> None:
self.calls.append(("__aexit__", {}))
async def create_agent(self, *, name: str, workspace: str) -> str:
self.calls.append(("create_agent", {"name": name, "workspace": workspace}))
return "agent-stub"
async def create_session(self, *, model: str, agent_id: str, label: str) -> str:
self.calls.append(
("create_session", {"model": model, "agent_id": agent_id, "label": label})
)
return f"session-{label}"
async def subscribe(self, session_key: str) -> None:
self.calls.append(("subscribe", {"session_key": session_key}))
async def send_and_wait(
self,
session_key: str,
message: str,
*,
timeout: float,
) -> Transcript:
self.calls.append(
(
"send_and_wait",
{"session_key": session_key, "message": message, "timeout": timeout},
)
)
return self.send_transcript
async def delete_session(self, session_key: str) -> None:
self.calls.append(("delete_session", {"session_key": session_key}))
async def delete_agent(self, agent_id: str, *, delete_files: bool) -> None:
self.calls.append(
("delete_agent", {"agent_id": agent_id, "delete_files": delete_files})
)
async def get_effective_tools(self, session_key: str) -> dict[str, Any]:
self.calls.append(("get_effective_tools", {"session_key": session_key}))
return self.rpc_responses.get(
"tools.effective",
{"groups": [{"tools": [{"id": "bash"}, {"id": "browser"}]}]},
)
async def _rpc(self, method: str, params: dict[str, Any]) -> dict[str, Any]:
self.calls.append((f"_rpc:{method}", dict(params)))
if method in self.rpc_responses:
return self.rpc_responses[method]
raise RuntimeError(f"stub gateway: no response set for {method}")
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _coding_task() -> CanonicalTask:
return from_task_definition(
TaskDefinition(
id="oa-adapter-test",
name="OA adapter test",
tier=Tier.TIER1,
family=TaskFamily.CODING,
surface="coding",
setup=TaskSetup(),
user=SimulatedUser(
max_turns=1,
turns=[UserTurn(message="Do the task.")],
),
completion=CompletionSpec(
files=[FileState(path="out.txt", exists=True)],
execution_checks=[ExecutionCheck(name="ok", command="true")],
),
)
)
def _mixed_state_task() -> CanonicalTask:
return from_task_definition(
TaskDefinition(
id="oa-adapter-state-test",
name="OA state test",
tier=Tier.TIER2,
family=TaskFamily.MULTI_TOOL,
surface="tools",
setup=TaskSetup(
pre_check_gateway=[
GatewayAssertion(
method="agents.list",
assert_path="$.count",
assert_equals=0,
),
],
),
user=SimulatedUser(max_turns=1, turns=[UserTurn(message="go")]),
completion=CompletionSpec(
memory=[MemoryState(key_pattern="stack", exists=True, value_contains=["React"])],
session=SessionState(should_exist=True, model_should_be="opus"),
),
)
)
def _make_adapter_and_gateway() -> tuple[OpenClawAdapter, _StubGateway]:
gateway = _StubGateway()
adapter = OpenClawAdapter(OpenClawAdapterConfig(model="test-model"))
adapter._client_factory = lambda: gateway # type: ignore[assignment]
return adapter, gateway
def _make_ctx(task: CanonicalTask, workspace: Path) -> AdapterContext:
return AdapterContext(
task=task,
workspace=workspace,
runtime_values={},
run_index=0,
model="test-model",
transcript=Transcript(),
)
# ---------------------------------------------------------------------------
# Registration
# ---------------------------------------------------------------------------
def test_openclaw_adapter_is_registered() -> None:
cls = get_adapter("openclaw")
assert cls is OpenClawAdapter
def test_openclaw_declares_full_capability_set() -> None:
assert AdapterCapability.FILES in OpenClawAdapter.capabilities
assert AdapterCapability.EXECUTION in OpenClawAdapter.capabilities
assert AdapterCapability.MEMORY in OpenClawAdapter.capabilities
assert AdapterCapability.SESSION in OpenClawAdapter.capabilities
assert AdapterCapability.CRON in OpenClawAdapter.capabilities
assert AdapterCapability.GATEWAY_RPC in OpenClawAdapter.capabilities
assert AdapterCapability.BROWSER in OpenClawAdapter.capabilities
# ---------------------------------------------------------------------------
# Lifecycle
# ---------------------------------------------------------------------------
def test_setup_realizes_memory_seed_files(tmp_path: Path) -> None:
task = from_task_definition(
TaskDefinition(
id="oa-seeded-memory",
name="OA seeded memory",
tier=Tier.TIER2,
family=TaskFamily.MULTI_TOOL,
surface="tools",
setup=TaskSetup(
memory_seed=[
{
"key": "event profile",
"value": "Vegetarian food, quiet rooms, and no stairs.",
}
]
),
user=SimulatedUser(max_turns=1, turns=[UserTurn(message="go")]),
)
)
adapter, gateway = _make_adapter_and_gateway()
async def _go() -> None:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
asyncio.run(_go())
assert (tmp_path / "MEMORY.md").read_text(encoding="utf-8").count("event profile") == 1
assert "Vegetarian food" in (tmp_path / "memory" / "event_profile.md").read_text(encoding="utf-8")
assert any(call[0] == "create_agent" for call in gateway.calls)
def test_run_phase_creates_session_subscribes_and_drives_simulator(tmp_path: Path) -> None:
task = _coding_task()
adapter, gateway = _make_adapter_and_gateway()
async def _go() -> None:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
result = await adapter.run_phase(task.phases[0], ctx)
assert result.error is None
await adapter.teardown(ctx)
asyncio.run(_go())
methods = [name for name, _ in gateway.calls]
# Ordered sequence we expect:
assert "create_agent" in methods
assert "create_session" in methods
assert "subscribe" in methods
assert "send_and_wait" in methods
assert "delete_session" in methods
assert "delete_agent" in methods
# The send_and_wait call should use the rendered user turn text.
send_args = next(args for name, args in gateway.calls if name == "send_and_wait")
assert send_args["message"] == "Do the task."
def test_run_phase_fails_fast_without_setup(tmp_path: Path) -> None:
task = _coding_task()
adapter, _ = _make_adapter_and_gateway()
async def _go() -> None:
async with adapter:
ctx = _make_ctx(task, tmp_path)
# Skip setup() — run_phase should return an error phase.
result = await adapter.run_phase(task.phases[0], ctx)
assert result.completed_normally is False
assert result.error and "agent_id" in result.error
asyncio.run(_go())
# ---------------------------------------------------------------------------
# State queries
# ---------------------------------------------------------------------------
def test_memory_query_uses_memory_search_primary_path(tmp_path: Path) -> None:
task = _mixed_state_task()
adapter, gateway = _make_adapter_and_gateway()
gateway.rpc_responses["memory.search"] = {
"payload": {"entries": [{"value": "stack = React, Node, Postgres"}]}
}
query = StateQuery(
kind="memory",
predicate="exists",
selector={"key_pattern": "stack"},
expected={"value_contains": ["React"]},
required_capability=AdapterCapability.MEMORY,
)
async def _go() -> StateQueryResult:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
return await adapter.verify_state_query(query, ctx)
result = asyncio.run(_go())
assert result.ok is True
assert result.detail == "OK"
def test_memory_query_falls_back_to_workspace_on_rpc_failure(tmp_path: Path) -> None:
task = _mixed_state_task()
adapter, gateway = _make_adapter_and_gateway()
# No memory.search response → primary path raises, fallback runs.
# Seed a MEMORY.md file in the workspace so the fallback succeeds.
(tmp_path / "MEMORY.md").write_text(
"stack: React, Node, Postgres", encoding="utf-8"
)
query = StateQuery(
kind="memory",
predicate="exists",
selector={"key_pattern": "stack"},
expected={"value_contains": ["React"]},
required_capability=AdapterCapability.MEMORY,
)
async def _go() -> StateQueryResult:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
return await adapter.verify_state_query(query, ctx)
result = asyncio.run(_go())
assert result.ok is True
def test_session_query_uses_sessions_resolve(tmp_path: Path) -> None:
task = _mixed_state_task()
adapter, gateway = _make_adapter_and_gateway()
gateway.rpc_responses["sessions.resolve"] = {
"payload": {"model": "claude-opus-4"}
}
query = StateQuery(
kind="session",
predicate="exists",
selector={},
expected={"model": "opus"},
required_capability=AdapterCapability.SESSION,
)
async def _go() -> StateQueryResult:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
ctx.adapter_state["last_session_key"] = "some-session"
return await adapter.verify_state_query(query, ctx)
result = asyncio.run(_go())
assert result.ok is True
def test_gateway_query_resolves_json_path(tmp_path: Path) -> None:
task = _mixed_state_task()
adapter, gateway = _make_adapter_and_gateway()
gateway.rpc_responses["memory.list"] = {
"payload": {"count": 3}
}
query = StateQuery(
kind="custom",
predicate="equals",
selector={"method": "memory.list", "params": {}, "assert_path": "$.count"},
expected={"equals": 3, "exists": True},
required_capability=AdapterCapability.GATEWAY_RPC,
)
async def _go() -> StateQueryResult:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
return await adapter.verify_state_query(query, ctx)
result = asyncio.run(_go())
assert result.ok is True
def test_cron_query_returns_false_when_no_jobs(tmp_path: Path) -> None:
task = _mixed_state_task()
adapter, gateway = _make_adapter_and_gateway()
gateway.rpc_responses["cron.list"] = {"payload": {"jobs": []}}
query = StateQuery(
kind="cron",
predicate="exists",
selector={"description_contains": "daily"},
expected={},
required_capability=AdapterCapability.CRON,
)
async def _go() -> StateQueryResult:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
return await adapter.verify_state_query(query, ctx)
result = asyncio.run(_go())
assert result.ok is False
def test_pre_run_queries_evaluated_during_setup(tmp_path: Path) -> None:
task = _mixed_state_task()
adapter, gateway = _make_adapter_and_gateway()
# Deliberately return the wrong count to trigger a pre-run failure.
gateway.rpc_responses["agents.list"] = {"payload": {"count": 99}}
async def _go() -> list[str]:
async with adapter:
ctx = _make_ctx(task, tmp_path)
await adapter.setup(ctx)
return ctx.adapter_state.get("pre_run_failures", [])
failures = asyncio.run(_go())
assert failures, "pre-run gateway assertion should have failed"
# ---------------------------------------------------------------------------
# Requires-context guard
# ---------------------------------------------------------------------------
def test_client_accessor_errors_when_not_in_context() -> None:
adapter, _ = _make_adapter_and_gateway()
with pytest.raises(RuntimeError):
_ = adapter.client