445 lines
15 KiB
Python
445 lines
15 KiB
Python
"""Tests for `OpenClawAdapter` — exercised against a stub gateway.
|
|
|
|
This validates the adapter wiring (lifecycle + state-query resolution)
|
|
in isolation, before the harness is rewired through it. The stub
|
|
`GatewayClient` records every call and produces canned responses so
|
|
the adapter's branches are covered end-to-end without a real gateway.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import pytest
|
|
|
|
from clawbench.adapters import get_adapter
|
|
from clawbench.adapters.base import AdapterContext, StateQueryResult
|
|
from clawbench.adapters.openclaw import OpenClawAdapter, OpenClawAdapterConfig
|
|
from clawbench.canonical import (
|
|
AdapterCapability,
|
|
CanonicalTask,
|
|
StateQuery,
|
|
)
|
|
from clawbench.canonical.convert import from_task_definition
|
|
from clawbench.schemas import (
|
|
CompletionSpec,
|
|
ExecutionCheck,
|
|
FileState,
|
|
GatewayAssertion,
|
|
MemoryState,
|
|
SessionState,
|
|
SimulatedUser,
|
|
TaskDefinition,
|
|
TaskFamily,
|
|
TaskSetup,
|
|
Tier,
|
|
Transcript,
|
|
UserTurn,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Stub GatewayClient
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class _StubGateway:
    """In-memory double for `GatewayClient`, used by the adapter tests.

    Every method appends a ``(name, payload)`` tuple to `.calls` and then
    returns a fixed value. Verification RPCs are answered from
    `.rpc_responses`, which tests fill in keyed by RPC method name.
    """

    def __init__(self) -> None:
        # Canned verification responses, keyed by RPC method name.
        self.rpc_responses: dict[str, dict[str, Any]] = {}
        # Chronological log of (method name, argument payload) pairs.
        self.calls: list[tuple[str, dict[str, Any]]] = []
        # Object handed back by every `send_and_wait` call.
        self.send_transcript = Transcript()

    def _note(self, method_name: str, payload: dict[str, Any]) -> None:
        """Append one ``(method_name, payload)`` entry to the call log."""
        self.calls.append((method_name, payload))

    async def __aenter__(self) -> "_StubGateway":
        """Log the context entry and return the stub itself."""
        self._note("__aenter__", {})
        return self

    async def __aexit__(self, *exc: object) -> None:
        """Log the context exit; exception details are ignored."""
        self._note("__aexit__", {})

    async def create_agent(self, *, name: str, workspace: str) -> str:
        """Log the request and return the fixed id ``"agent-stub"``."""
        self._note("create_agent", {"name": name, "workspace": workspace})
        return "agent-stub"

    async def create_session(self, *, model: str, agent_id: str, label: str) -> str:
        """Log the request and return a session key derived from `label`."""
        payload = {"model": model, "agent_id": agent_id, "label": label}
        self._note("create_session", payload)
        return f"session-{label}"

    async def subscribe(self, session_key: str) -> None:
        """Log the subscription; nothing else happens."""
        self._note("subscribe", {"session_key": session_key})

    async def send_and_wait(
        self,
        session_key: str,
        message: str,
        *,
        timeout: float,
    ) -> Transcript:
        """Log the outgoing message and return `.send_transcript`."""
        payload = {"session_key": session_key, "message": message, "timeout": timeout}
        self._note("send_and_wait", payload)
        return self.send_transcript

    async def delete_session(self, session_key: str) -> None:
        """Log the session deletion."""
        self._note("delete_session", {"session_key": session_key})

    async def delete_agent(self, agent_id: str, *, delete_files: bool) -> None:
        """Log the agent deletion together with its `delete_files` flag."""
        self._note("delete_agent", {"agent_id": agent_id, "delete_files": delete_files})

    async def get_effective_tools(self, session_key: str) -> dict[str, Any]:
        """Return the canned ``tools.effective`` response, or a two-tool default."""
        self._note("get_effective_tools", {"session_key": session_key})
        if "tools.effective" in self.rpc_responses:
            return self.rpc_responses["tools.effective"]
        # Default when a test did not override the response.
        return {"groups": [{"tools": [{"id": "bash"}, {"id": "browser"}]}]}

    async def _rpc(self, method: str, params: dict[str, Any]) -> dict[str, Any]:
        """Return the canned response for `method`.

        Raises:
            RuntimeError: if no response was registered for `method`.
        """
        # Store a copy so later mutation of `params` cannot change the log.
        self._note(f"_rpc:{method}", dict(params))
        if method not in self.rpc_responses:
            raise RuntimeError(f"stub gateway: no response set for {method}")
        return self.rpc_responses[method]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _coding_task() -> CanonicalTask:
    """Build a one-turn tier-1 coding task and convert it to canonical form.

    The single user turn says "Do the task."; completion expects `out.txt`
    to exist and an execution check named "ok" that runs `true`.
    """
    single_turn_user = SimulatedUser(
        max_turns=1,
        turns=[UserTurn(message="Do the task.")],
    )
    done_when = CompletionSpec(
        files=[FileState(path="out.txt", exists=True)],
        execution_checks=[ExecutionCheck(name="ok", command="true")],
    )
    definition = TaskDefinition(
        id="oa-adapter-test",
        name="OA adapter test",
        tier=Tier.TIER1,
        family=TaskFamily.CODING,
        surface="coding",
        setup=TaskSetup(),
        user=single_turn_user,
        completion=done_when,
    )
    return from_task_definition(definition)
|
|
|
|
|
|
def _mixed_state_task() -> CanonicalTask:
    """Build a tier-2 multi-tool task with gateway, memory and session checks.

    Setup carries one pre-check gateway assertion (`agents.list`, `$.count`
    equal to 0). Completion expects a memory entry matching "stack" that
    contains "React", and an existing session whose model should be "opus".
    """
    no_agents_yet = GatewayAssertion(
        method="agents.list",
        assert_path="$.count",
        assert_equals=0,
    )
    stack_memory = MemoryState(
        key_pattern="stack",
        exists=True,
        value_contains=["React"],
    )
    opus_session = SessionState(should_exist=True, model_should_be="opus")
    definition = TaskDefinition(
        id="oa-adapter-state-test",
        name="OA state test",
        tier=Tier.TIER2,
        family=TaskFamily.MULTI_TOOL,
        surface="tools",
        setup=TaskSetup(pre_check_gateway=[no_agents_yet]),
        user=SimulatedUser(max_turns=1, turns=[UserTurn(message="go")]),
        completion=CompletionSpec(memory=[stack_memory], session=opus_session),
    )
    return from_task_definition(definition)
|
|
|
|
|
|
def _make_adapter_and_gateway() -> tuple[OpenClawAdapter, _StubGateway]:
    """Return an adapter whose client factory yields a fresh stub gateway.

    The stub is returned alongside the adapter so tests can configure
    canned responses and inspect the recorded calls.
    """
    stub = _StubGateway()
    config = OpenClawAdapterConfig(model="test-model")
    adapter = OpenClawAdapter(config)

    def _use_stub() -> _StubGateway:
        return stub

    # Swap the adapter's private client factory for one that returns the stub.
    adapter._client_factory = _use_stub  # type: ignore[assignment]
    return adapter, stub
|
|
|
|
|
|
def _make_ctx(task: CanonicalTask, workspace: Path) -> AdapterContext:
    """Build an `AdapterContext` for `task` rooted at `workspace`.

    All other fields use fixed test values: no runtime values, run index 0,
    model "test-model", and a fresh empty `Transcript`.
    """
    fields: dict[str, Any] = {
        "task": task,
        "workspace": workspace,
        "runtime_values": {},
        "run_index": 0,
        "model": "test-model",
        "transcript": Transcript(),
    }
    return AdapterContext(**fields)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Registration
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_openclaw_adapter_is_registered() -> None:
    """Looking up "openclaw" in the adapter registry yields `OpenClawAdapter`."""
    assert get_adapter("openclaw") is OpenClawAdapter
|
|
|
|
|
|
def test_openclaw_declares_full_capability_set() -> None:
    """`OpenClawAdapter.capabilities` contains each of the seven listed capabilities."""
    required = (
        AdapterCapability.FILES,
        AdapterCapability.EXECUTION,
        AdapterCapability.MEMORY,
        AdapterCapability.SESSION,
        AdapterCapability.CRON,
        AdapterCapability.GATEWAY_RPC,
        AdapterCapability.BROWSER,
    )
    declared = OpenClawAdapter.capabilities
    for capability in required:
        assert capability in declared
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Lifecycle
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_setup_realizes_memory_seed_files(tmp_path: Path) -> None:
    """`setup()` writes the task's memory seed into the workspace.

    After setup, `MEMORY.md` must mention the seed key exactly once,
    `memory/event_profile.md` must contain the seed value, and the stub
    must have recorded a `create_agent` call.
    """
    seed_entry = {
        "key": "event profile",
        "value": "Vegetarian food, quiet rooms, and no stairs.",
    }
    definition = TaskDefinition(
        id="oa-seeded-memory",
        name="OA seeded memory",
        tier=Tier.TIER2,
        family=TaskFamily.MULTI_TOOL,
        surface="tools",
        setup=TaskSetup(memory_seed=[seed_entry]),
        user=SimulatedUser(max_turns=1, turns=[UserTurn(message="go")]),
    )
    task = from_task_definition(definition)
    adapter, gateway = _make_adapter_and_gateway()

    async def _setup_only() -> None:
        async with adapter:
            seeded_ctx = _make_ctx(task, tmp_path)
            await adapter.setup(seeded_ctx)

    asyncio.run(_setup_only())

    index_text = (tmp_path / "MEMORY.md").read_text(encoding="utf-8")
    assert index_text.count("event profile") == 1
    entry_text = (tmp_path / "memory" / "event_profile.md").read_text(encoding="utf-8")
    assert "Vegetarian food" in entry_text
    recorded_names = [name for name, _ in gateway.calls]
    assert "create_agent" in recorded_names
|
|
|
|
|
|
def test_run_phase_creates_session_subscribes_and_drives_simulator(tmp_path: Path) -> None:
    """Drive setup → run_phase → teardown and check the gateway call sequence.

    All six lifecycle calls must reach the stub gateway, and they must
    arrive in the order their data dependencies imply: the agent exists
    before its session, the session before `subscribe`/`send_and_wait`,
    and both deletions only after the message was sent. The relative order
    of `delete_session` and `delete_agent` is intentionally not asserted.
    """
    task = _coding_task()
    adapter, gateway = _make_adapter_and_gateway()

    async def _go() -> None:
        async with adapter:
            ctx = _make_ctx(task, tmp_path)
            await adapter.setup(ctx)
            result = await adapter.run_phase(task.phases[0], ctx)
            assert result.error is None
            await adapter.teardown(ctx)

    asyncio.run(_go())

    methods = [name for name, _ in gateway.calls]

    # Every lifecycle call must have been made at least once.
    startup_order = ["create_agent", "create_session", "subscribe", "send_and_wait"]
    cleanup_calls = ["delete_session", "delete_agent"]
    for expected in (*startup_order, *cleanup_calls):
        assert expected in methods, f"{expected} was never called"

    # Ordered sequence we expect: first occurrences must be ascending.
    first_seen = [methods.index(name) for name in startup_order]
    assert first_seen == sorted(first_seen), f"unexpected call order: {methods}"

    # Cleanup may only start once the user turn has been sent.
    sent_at = methods.index("send_and_wait")
    for cleanup in cleanup_calls:
        assert methods.index(cleanup) > sent_at, f"{cleanup} ran before send_and_wait"

    # The send_and_wait call should use the rendered user turn text.
    send_args = next(args for name, args in gateway.calls if name == "send_and_wait")
    assert send_args["message"] == "Do the task."
|
|
|
|
|
|
def test_run_phase_fails_fast_without_setup(tmp_path: Path) -> None:
    """Calling `run_phase()` without `setup()` yields an error phase result.

    The result must not be marked as completed normally, and its error
    text must mention "agent_id".
    """
    adapter, _ = _make_adapter_and_gateway()
    task = _coding_task()

    async def _run_without_setup() -> Any:
        async with adapter:
            # setup() is deliberately skipped here.
            unprepared_ctx = _make_ctx(task, tmp_path)
            return await adapter.run_phase(task.phases[0], unprepared_ctx)

    phase_result = asyncio.run(_run_without_setup())
    assert phase_result.completed_normally is False
    assert phase_result.error and "agent_id" in phase_result.error
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# State queries
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_memory_query_uses_memory_search_primary_path(tmp_path: Path) -> None:
    """A memory query passes when the stub serves a matching `memory.search` reply.

    The canned entry contains "React", which the query expects; the result
    must be ok with detail "OK".
    """
    task = _mixed_state_task()
    adapter, gateway = _make_adapter_and_gateway()
    canned_entry = {"value": "stack = React, Node, Postgres"}
    gateway.rpc_responses["memory.search"] = {"payload": {"entries": [canned_entry]}}

    memory_query = StateQuery(
        kind="memory",
        predicate="exists",
        selector={"key_pattern": "stack"},
        expected={"value_contains": ["React"]},
        required_capability=AdapterCapability.MEMORY,
    )

    async def _verify() -> StateQueryResult:
        async with adapter:
            ctx = _make_ctx(task, tmp_path)
            await adapter.setup(ctx)
            outcome = await adapter.verify_state_query(memory_query, ctx)
        return outcome

    outcome = asyncio.run(_verify())
    assert outcome.ok is True
    assert outcome.detail == "OK"
|
|
|
|
|
|
def test_memory_query_falls_back_to_workspace_on_rpc_failure(tmp_path: Path) -> None:
    """A memory query still passes when no `memory.search` response exists.

    No `memory.search` response is registered, so the stub's `_rpc` raises
    for that method. A `MEMORY.md` containing "React" is seeded in the
    workspace so the workspace fallback can satisfy the query.
    """
    task = _mixed_state_task()
    adapter, _ = _make_adapter_and_gateway()

    # Seed the workspace file the fallback path is expected to read.
    memory_file = tmp_path / "MEMORY.md"
    memory_file.write_text("stack: React, Node, Postgres", encoding="utf-8")

    memory_query = StateQuery(
        kind="memory",
        predicate="exists",
        selector={"key_pattern": "stack"},
        expected={"value_contains": ["React"]},
        required_capability=AdapterCapability.MEMORY,
    )

    async def _verify() -> StateQueryResult:
        async with adapter:
            ctx = _make_ctx(task, tmp_path)
            await adapter.setup(ctx)
            outcome = await adapter.verify_state_query(memory_query, ctx)
        return outcome

    outcome = asyncio.run(_verify())
    assert outcome.ok is True
|
|
|
|
|
|
def test_session_query_uses_sessions_resolve(tmp_path: Path) -> None:
    """A session query passes against a canned `sessions.resolve` reply.

    The stub reports model "claude-opus-4" while the query expects "opus";
    the result must be ok.
    """
    task = _mixed_state_task()
    adapter, gateway = _make_adapter_and_gateway()
    resolved_session = {"model": "claude-opus-4"}
    gateway.rpc_responses["sessions.resolve"] = {"payload": resolved_session}

    session_query = StateQuery(
        kind="session",
        predicate="exists",
        selector={},
        expected={"model": "opus"},
        required_capability=AdapterCapability.SESSION,
    )

    async def _verify() -> StateQueryResult:
        async with adapter:
            ctx = _make_ctx(task, tmp_path)
            await adapter.setup(ctx)
            # No phase is run in this test, so set the session key by hand.
            ctx.adapter_state["last_session_key"] = "some-session"
            outcome = await adapter.verify_state_query(session_query, ctx)
        return outcome

    outcome = asyncio.run(_verify())
    assert outcome.ok is True
|
|
|
|
|
|
def test_gateway_query_resolves_json_path(tmp_path: Path) -> None:
    """A custom gateway query passes when `$.count` equals the expected value.

    The stub answers `memory.list` with a payload whose `count` is 3, and
    the query expects `$.count` to exist and equal 3.
    """
    task = _mixed_state_task()
    adapter, gateway = _make_adapter_and_gateway()
    gateway.rpc_responses["memory.list"] = {"payload": {"count": 3}}

    rpc_selector = {"method": "memory.list", "params": {}, "assert_path": "$.count"}
    gateway_query = StateQuery(
        kind="custom",
        predicate="equals",
        selector=rpc_selector,
        expected={"equals": 3, "exists": True},
        required_capability=AdapterCapability.GATEWAY_RPC,
    )

    async def _verify() -> StateQueryResult:
        async with adapter:
            ctx = _make_ctx(task, tmp_path)
            await adapter.setup(ctx)
            outcome = await adapter.verify_state_query(gateway_query, ctx)
        return outcome

    outcome = asyncio.run(_verify())
    assert outcome.ok is True
|
|
|
|
|
|
def test_cron_query_returns_false_when_no_jobs(tmp_path: Path) -> None:
    """A cron "exists" query fails when `cron.list` reports no jobs."""
    task = _mixed_state_task()
    adapter, gateway = _make_adapter_and_gateway()
    empty_job_list: dict[str, Any] = {"jobs": []}
    gateway.rpc_responses["cron.list"] = {"payload": empty_job_list}

    cron_query = StateQuery(
        kind="cron",
        predicate="exists",
        selector={"description_contains": "daily"},
        expected={},
        required_capability=AdapterCapability.CRON,
    )

    async def _verify() -> StateQueryResult:
        async with adapter:
            ctx = _make_ctx(task, tmp_path)
            await adapter.setup(ctx)
            outcome = await adapter.verify_state_query(cron_query, ctx)
        return outcome

    outcome = asyncio.run(_verify())
    assert outcome.ok is False
|
|
|
|
|
|
def test_pre_run_queries_evaluated_during_setup(tmp_path: Path) -> None:
    """`setup()` records a failure when a pre-check gateway assertion is violated.

    The task expects `agents.list` to report a count of 0; the stub reports
    99, so `pre_run_failures` in the adapter state must be non-empty.
    """
    task = _mixed_state_task()
    adapter, gateway = _make_adapter_and_gateway()
    # A count that deliberately contradicts the task's pre-check.
    wrong_count = {"count": 99}
    gateway.rpc_responses["agents.list"] = {"payload": wrong_count}

    async def _collect_failures() -> list[str]:
        async with adapter:
            ctx = _make_ctx(task, tmp_path)
            await adapter.setup(ctx)
            recorded = ctx.adapter_state.get("pre_run_failures", [])
        return recorded

    recorded_failures = asyncio.run(_collect_failures())
    assert recorded_failures, "pre-run gateway assertion should have failed"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Requires-context guard
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_client_accessor_errors_when_not_in_context() -> None:
    """Reading `adapter.client` before entering the adapter raises `RuntimeError`."""
    unentered_adapter = _make_adapter_and_gateway()[0]
    with pytest.raises(RuntimeError):
        _ = unentered_adapter.client
|