Compare commits

...

1 Commit

Author SHA1 Message Date
Vincent Koc
82bcfc1891
fix(worker): harden runtime result writes
Some checks failed
CI / Python ${{ matrix.python-version }} test suite (3.11) (push) Has been cancelled
CI / Python ${{ matrix.python-version }} test suite (3.12) (push) Has been cancelled
2026-04-29 13:16:40 -07:00
3 changed files with 339 additions and 0 deletions

View File

@@ -39,6 +39,9 @@ jobs:
- name: Run static lint
run: python -m ruff check clawbench app.py scripts tests
- name: Run runtime contract smoke tests
run: python -m pytest -q tests/test_runtime_contracts.py
- name: Run test suite
run: python -m pytest -q

View File

@@ -225,6 +225,7 @@ class EvalWorker:
job.job_id,
progress.mark_status("Uploading results", clear_active=True),
)
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
result_path = RESULTS_DIR / f"{result.submission_id}.json"
result_path.write_text(json.dumps(result.model_dump(), indent=2), encoding="utf-8")

View File

@@ -0,0 +1,335 @@
from __future__ import annotations
import datetime
import importlib
import json
import sys
import threading
from pathlib import Path
import pytest
from clawbench.client import GatewayConfig
from clawbench.harness import BenchmarkHarness
from clawbench.queue import Job, JobQueue, JobStatus, SubmissionRequest
import clawbench.queue as queue_module
from clawbench.schemas import (
CompletionSpec,
ExecutionCheck,
SimulatedUser,
TaskDefinition,
TaskFamily,
Tier,
ToolCall,
TrajectoryExpectations,
Transcript,
TranscriptMessage,
UserTurn,
)
from clawbench.worker import EvalWorker
def _runtime_task() -> TaskDefinition:
    """Build the tier-1 tools task used by the runtime contract tests.

    The simulated user sends a single turn asking for ``answer.txt``. The task
    is completed by one execution check that re-reads that file, and the
    trajectory expectations require the read, edit and execute tool families,
    a read before any mutation, and a self-verification step.
    """
    # ``{python_exe}`` is kept as a literal placeholder in the command string;
    # presumably it is substituted when the check is run -- confirm against the
    # code that executes ``ExecutionCheck`` commands.
    verify_command = (
        "{python_exe} -c "
        "\"from pathlib import Path; "
        "assert Path('answer.txt').read_text(encoding='utf-8') == 'runtime ok\\n'\""
    )

    simulated_user = SimulatedUser(
        max_turns=1,
        turns=[UserTurn(message="create answer.txt with runtime ok, then verify it")],
    )
    completion_spec = CompletionSpec(
        execution_checks=[
            ExecutionCheck(name="answer artifact", command=verify_command),
        ],
    )
    trajectory_spec = TrajectoryExpectations(
        required_families=["read", "edit", "execute"],
        min_distinct_families=3,
        require_read_before_mutation=True,
        require_self_verification=True,
    )

    return TaskDefinition(
        id="runtime-contract-smoke",
        name="Runtime Contract Smoke",
        tier=Tier.TIER1,
        family=TaskFamily.TOOLS,
        surface="tools",
        user=simulated_user,
        completion=completion_spec,
        trajectory=trajectory_spec,
    )
class _GatewayState:
def __init__(self) -> None:
self.agent_workspaces: dict[str, Path] = {}
self.session_agents: dict[str, str] = {}
self.deleted_sessions: list[str] = []
self.deleted_agents: list[str] = []
class _SuccessfulGatewayClient:
    """In-memory gateway client fake whose turns always succeed.

    It works as an async context manager, hands out sequential agent and
    session ids, and answers every turn by writing ``answer.txt`` into the
    agent's workspace and returning a fixed transcript.
    """

    # Defined on the class, so every instance (and subclass) records into the
    # same ``_GatewayState`` object.
    state = _GatewayState()

    def __init__(self, config: GatewayConfig | None = None) -> None:
        self.config = config if config else GatewayConfig()

    async def __aenter__(self) -> _SuccessfulGatewayClient:
        return self

    async def __aexit__(self, *exc: object) -> None:
        return None

    async def create_agent(self, *, name: str, workspace: str) -> str:
        """Register ``workspace`` under a new ``agent-N`` id and return the id."""
        registry = self.state.agent_workspaces
        agent_id = f"agent-{len(registry) + 1}"
        registry[agent_id] = Path(workspace)
        return agent_id

    async def create_session(self, *, model: str, agent_id: str, label: str) -> str:  # noqa: ARG002
        """Bind a new ``session-N`` key to ``agent_id`` and return the key."""
        sessions = self.state.session_agents
        session_key = f"session-{len(sessions) + 1}"
        sessions[session_key] = agent_id
        return session_key

    async def subscribe(self, session_key: str) -> None:  # noqa: ARG002
        return None

    async def send_and_wait(self, session_key: str, message: str, *, timeout: float) -> Transcript:  # noqa: ARG002
        """Write the answer file for the session's agent and return a fixed transcript."""
        workspaces = self.state.agent_workspaces
        workspace = workspaces[self.state.session_agents[session_key]]
        (workspace / "answer.txt").write_text("runtime ok\n", encoding="utf-8")

        # One read, one write and one shell call, in that order.
        scripted_calls = [
            ToolCall(name=tool_name, input=tool_input, output=tool_output, success=True)
            for tool_name, tool_input, tool_output in (
                ("read_file", {"path": "answer.txt"}, "missing"),
                ("write_file", {"path": "answer.txt"}, "wrote answer.txt"),
                ("shell", {"command": "python -m pytest -q"}, "1 passed"),
            )
        ]
        working_message = TranscriptMessage(
            role="assistant",
            text="i'll inspect, write the answer, then verify it.",
            tool_calls=scripted_calls,
        )
        closing_message = TranscriptMessage(role="assistant", text="done, verified.")
        return Transcript(messages=[working_message, closing_message])

    async def delete_session(self, session_key: str) -> None:
        self.state.deleted_sessions.append(session_key)

    async def delete_agent(self, agent_id: str, *, delete_files: bool = False) -> None:  # noqa: ARG002
        self.state.deleted_agents.append(agent_id)
class _DisconnectingGatewayClient(_SuccessfulGatewayClient):
    """Gateway fake that behaves like its parent except that every turn raises."""

    async def send_and_wait(self, session_key: str, message: str, *, timeout: float) -> Transcript:  # noqa: ARG002
        """Always raise ``ConnectionError`` instead of returning a transcript."""
        dropped = ConnectionError("gateway connection dropped")
        raise dropped
@pytest.mark.asyncio
async def test_queue_worker_harness_scorer_happy_path_writes_result(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
):
    """Run one job through queue, worker, harness and scorer and check the result file.

    All filesystem locations are redirected under ``tmp_path`` and the gateway
    client is replaced by ``_SuccessfulGatewayClient``. The results directory
    is never created by the test itself, so the final read only succeeds if the
    worker creates it before writing the result JSON.
    """
    queue_dir = tmp_path / "queue"
    results_dir = tmp_path / "results"
    state_dir = tmp_path / "state"

    monkeypatch.setattr(queue_module, "LOCAL_QUEUE_DIR", queue_dir)
    monkeypatch.setattr(queue_module, "HF_TOKEN", "")
    monkeypatch.setattr("clawbench.worker.RESULTS_DIR", results_dir)
    monkeypatch.setenv("OPENCLAW_STATE_DIR", str(state_dir))
    monkeypatch.setenv("CLAWBENCH_RUN_CACHE_DIR", str(tmp_path / "run-cache"))
    monkeypatch.setattr("clawbench.harness.GatewayClient", _SuccessfulGatewayClient)
    # The fake client keeps its bookkeeping on a class attribute. Swap in a
    # fresh state object for this test so agents, sessions and workspace paths
    # recorded by other tests cannot leak in (monkeypatch restores it afterwards).
    monkeypatch.setattr(_SuccessfulGatewayClient, "state", _GatewayState())

    async def fake_upload_result(result) -> None:  # noqa: ANN001
        return None

    async def fake_ensure_gateway() -> None:
        return None

    async def fake_preflight_browser_support_for_tasks(*args, **kwargs) -> None:  # noqa: ANN002, ANN003
        return None

    task = _runtime_task()
    queue = JobQueue()
    job = await queue.submit(
        SubmissionRequest(
            model="test/model",
            provider="test",
            runs_per_task=1,
            max_parallel_lanes=1,
        )
    )
    claimed = await queue.claim_pending()
    assert [claimed_job.job_id for claimed_job in claimed] == [job.job_id]

    worker = EvalWorker(queue)
    # Replace task loading, gateway lifecycle and upload on the worker so the
    # run uses only the single in-memory task and the fake gateway client.
    monkeypatch.setattr(worker, "_load_job_tasks", lambda current_job: [task])
    monkeypatch.setattr("clawbench.harness.load_all_tasks", lambda **kwargs: [task])
    monkeypatch.setattr(worker, "_ensure_gateway", fake_ensure_gateway)
    monkeypatch.setattr(worker, "_preflight_browser_support_for_tasks", fake_preflight_browser_support_for_tasks)
    monkeypatch.setattr(worker, "_stop_gateway", lambda: None)
    monkeypatch.setattr(worker, "_stop_parallel_gateways", lambda: None)
    monkeypatch.setattr("clawbench.upload.upload_result", fake_upload_result)

    await worker._process_job(claimed[0])

    finished = await queue.get_status(job.job_id)
    assert finished is not None
    assert finished.status == JobStatus.FINISHED
    assert finished.result_id is not None
    assert finished.progress_message == "Finished"

    result_path = results_dir / f"{finished.result_id}.json"
    # Fail with a readable message instead of a bare FileNotFoundError.
    assert result_path.is_file(), f"worker did not write {result_path}"
    result = json.loads(result_path.read_text(encoding="utf-8"))
    assert result["model"] == "test/model"
    assert result["overall_completion"] == 1.0
    assert result["overall_pass_hat_k"] == 1.0
    assert result["task_results"][0]["task_id"] == "runtime-contract-smoke"
@pytest.mark.asyncio
async def test_harness_turn_disconnect_becomes_failed_run(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
    """A ``ConnectionError`` raised during a turn must yield a failed run, not an exception.

    The harness is driven with ``_DisconnectingGatewayClient`` and the returned
    run result is expected to score 0.0, report a failed delivery with the
    ``environment_unavailable`` failure mode, and carry the error text.
    """
    monkeypatch.setenv("OPENCLAW_STATE_DIR", str(tmp_path / "state"))
    monkeypatch.setenv("CLAWBENCH_RUN_CACHE_DIR", str(tmp_path / "run-cache"))
    monkeypatch.setattr("clawbench.harness.GatewayClient", _DisconnectingGatewayClient)
    # ``state`` lives on the base fake class and is inherited by the
    # disconnecting subclass. Give this test its own instance so bookkeeping
    # from other tests does not carry over (monkeypatch restores it afterwards).
    monkeypatch.setattr(_SuccessfulGatewayClient, "state", _GatewayState())

    harness = BenchmarkHarness(
        gateway_config=GatewayConfig(),
        model="test/model",
        randomize_order=False,
        print_report=False,
        quiet=True,
    )

    result = await harness._run_single(_runtime_task(), 0)

    assert result.run_score == 0.0
    assert result.delivery_outcome.value == "fail"
    assert result.failure_mode is not None
    assert result.failure_mode.value == "environment_unavailable"
    assert "gateway connection dropped" in (result.error or "")
@pytest.mark.asyncio
async def test_harness_scorer_exception_becomes_failed_run(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
    """An exception raised by the scorer must yield a failed run, not propagate.

    The gateway turn succeeds, but ``clawbench.harness.score_task_run`` is
    replaced by a coroutine that raises. The run result is expected to score
    0.0 with the ``state_regression`` failure mode and the scorer's message as
    the error.
    """
    monkeypatch.setenv("OPENCLAW_STATE_DIR", str(tmp_path / "state"))
    monkeypatch.setenv("CLAWBENCH_RUN_CACHE_DIR", str(tmp_path / "run-cache"))
    monkeypatch.setattr("clawbench.harness.GatewayClient", _SuccessfulGatewayClient)
    # The fake client's bookkeeping is a class attribute. Use a fresh instance
    # for this test so earlier tests' agents and sessions do not carry over
    # (monkeypatch restores the original afterwards).
    monkeypatch.setattr(_SuccessfulGatewayClient, "state", _GatewayState())

    async def fail_score_task_run(**kwargs):  # noqa: ANN003
        raise RuntimeError("scorer exploded")

    monkeypatch.setattr("clawbench.harness.score_task_run", fail_score_task_run)

    harness = BenchmarkHarness(
        gateway_config=GatewayConfig(),
        model="test/model",
        randomize_order=False,
        print_report=False,
        quiet=True,
    )

    result = await harness._run_single(_runtime_task(), 0)

    assert result.run_score == 0.0
    assert result.delivery_outcome.value == "fail"
    assert result.failure_mode is not None
    assert result.failure_mode.value == "state_regression"
    assert result.error == "scorer exploded"
@pytest.mark.asyncio
async def test_stale_evaluating_job_can_be_reclaimed_and_claimed_again(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
):
    """A job stuck in EVALUATING for an hour is reclaimed and can be claimed again.

    The queue is seeded directly with one job whose start and last-progress
    timestamps are an hour old. After ``reclaim_stale_jobs`` and
    ``claim_pending`` the job must be evaluating again with its attempt and
    stale-requeue counters bumped and its per-run progress fields cleared.
    """
    # Redirect the queue directory and blank the hub token *before* building
    # the queue, the same way the end-to-end queue test in this module does,
    # so constructing ``JobQueue`` cannot touch a real queue directory or pick
    # up a token from the developer's environment.
    monkeypatch.setattr(queue_module, "LOCAL_QUEUE_DIR", tmp_path / "queue")
    monkeypatch.setattr(queue_module, "HF_TOKEN", "")

    queue = JobQueue()
    stale_started_at = (
        datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)
    ).isoformat()
    # Seed the in-memory job table directly instead of going through submit().
    queue._jobs = {
        "job-1": Job(
            job_id="job-1",
            status=JobStatus.EVALUATING,
            started_at=stale_started_at,
            last_progress_at=stale_started_at,
            current_task_id="runtime-contract-smoke",
            current_run_index=1,
            current_run_total=1,
            attempt_count=1,
            request=SubmissionRequest(model="test/model"),
        )
    }
    # Persistence and hub sync are replaced with no-ops for this test.
    monkeypatch.setattr(queue, "_save_local", lambda: None)

    async def fake_sync_to_hub() -> None:
        return None

    monkeypatch.setattr(queue, "_sync_to_hub", fake_sync_to_hub)

    reclaimed = await queue.reclaim_stale_jobs(stale_after_seconds=300)
    claimed = await queue.claim_pending()

    assert [job.job_id for job in reclaimed] == ["job-1"]
    assert [job.job_id for job in claimed] == ["job-1"]

    job = queue._jobs["job-1"]
    assert job.status == JobStatus.EVALUATING
    assert job.attempt_count == 2
    assert job.stale_requeues == 1
    assert job.current_task_id is None
    assert job.current_run_index is None
    assert job.progress_message == "Queued for evaluation"
def test_leaderboard_skips_malformed_local_result_file(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
):
    """The leaderboard loader must ignore an unparsable result file.

    ``app`` is imported fresh with ``threading.Thread`` stubbed out, pointed at
    a results directory holding one malformed and one valid JSON file, and the
    resulting frame is expected to contain only the valid entry.
    """

    class NoopThread:
        """Stand-in for ``threading.Thread`` that accepts anything and never runs."""

        def __init__(self, *args, **kwargs) -> None:  # noqa: ANN002, ANN003
            return None

        def start(self) -> None:
            return None

    valid_result = {
        "model": "test/model",
        "timestamp": "2026-04-29T00:00:00+00:00",
        "overall_score": 0.91,
        "overall_completion": 1.0,
        "overall_trajectory": 0.8,
        "overall_behavior": 1.0,
        "overall_pass_hat_k": 1.0,
        "environment": {"prompt_variant": "clear", "scenario": "all"},
        "task_results": [{"task_id": "runtime-contract-smoke"}],
    }

    # Patches must be in place before ``app`` is imported.
    # NOTE(review): presumably ``app`` starts background threads at import
    # time, which is why Thread is stubbed first -- confirm against app.py.
    monkeypatch.setattr(threading, "Thread", NoopThread)
    monkeypatch.setattr(queue_module, "LOCAL_QUEUE_DIR", tmp_path / "queue")
    monkeypatch.setattr(queue_module, "HF_TOKEN", "")

    # Drop any cached module so the import below executes ``app`` again.
    sys.modules.pop("app", None)
    app = importlib.import_module("app")

    results_dir = tmp_path / "results"
    results_dir.mkdir()
    malformed_file = results_dir / "bad.json"
    malformed_file.write_text("{not json", encoding="utf-8")
    valid_file = results_dir / "good.json"
    valid_file.write_text(json.dumps(valid_result), encoding="utf-8")

    monkeypatch.setattr(app, "RESULTS_DIR", results_dir)
    monkeypatch.setattr(app, "dataset_has_submission_results", lambda api, repo: False)

    frame = app._load_leaderboard_uncached()

    assert list(frame["Model"]) == ["test/model"]
    assert list(frame["Score"]) == [0.91]