fix(worker): harden runtime result writes
This commit is contained in:
parent
ea17c715b3
commit
82bcfc1891
3
.github/workflows/ci.yml
vendored
3
.github/workflows/ci.yml
vendored
@ -39,6 +39,9 @@ jobs:
|
||||
- name: Run static lint
|
||||
run: python -m ruff check clawbench app.py scripts tests
|
||||
|
||||
- name: Run runtime contract smoke tests
|
||||
run: python -m pytest -q tests/test_runtime_contracts.py
|
||||
|
||||
- name: Run test suite
|
||||
run: python -m pytest -q
|
||||
|
||||
|
||||
@ -225,6 +225,7 @@ class EvalWorker:
|
||||
job.job_id,
|
||||
progress.mark_status("Uploading results", clear_active=True),
|
||||
)
|
||||
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
|
||||
result_path = RESULTS_DIR / f"{result.submission_id}.json"
|
||||
result_path.write_text(json.dumps(result.model_dump(), indent=2), encoding="utf-8")
|
||||
|
||||
|
||||
335
tests/test_runtime_contracts.py
Normal file
335
tests/test_runtime_contracts.py
Normal file
@ -0,0 +1,335 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
import importlib
|
||||
import json
|
||||
import sys
|
||||
import threading
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from clawbench.client import GatewayConfig
|
||||
from clawbench.harness import BenchmarkHarness
|
||||
from clawbench.queue import Job, JobQueue, JobStatus, SubmissionRequest
|
||||
import clawbench.queue as queue_module
|
||||
from clawbench.schemas import (
|
||||
CompletionSpec,
|
||||
ExecutionCheck,
|
||||
SimulatedUser,
|
||||
TaskDefinition,
|
||||
TaskFamily,
|
||||
Tier,
|
||||
ToolCall,
|
||||
TrajectoryExpectations,
|
||||
Transcript,
|
||||
TranscriptMessage,
|
||||
UserTurn,
|
||||
)
|
||||
from clawbench.worker import EvalWorker
|
||||
|
||||
|
||||
def _runtime_task() -> TaskDefinition:
|
||||
return TaskDefinition(
|
||||
id="runtime-contract-smoke",
|
||||
name="Runtime Contract Smoke",
|
||||
tier=Tier.TIER1,
|
||||
family=TaskFamily.TOOLS,
|
||||
surface="tools",
|
||||
user=SimulatedUser(
|
||||
max_turns=1,
|
||||
turns=[UserTurn(message="create answer.txt with runtime ok, then verify it")],
|
||||
),
|
||||
completion=CompletionSpec(
|
||||
execution_checks=[
|
||||
ExecutionCheck(
|
||||
name="answer artifact",
|
||||
command=(
|
||||
"{python_exe} -c "
|
||||
"\"from pathlib import Path; "
|
||||
"assert Path('answer.txt').read_text(encoding='utf-8') == 'runtime ok\\n'\""
|
||||
),
|
||||
)
|
||||
]
|
||||
),
|
||||
trajectory=TrajectoryExpectations(
|
||||
required_families=["read", "edit", "execute"],
|
||||
min_distinct_families=3,
|
||||
require_read_before_mutation=True,
|
||||
require_self_verification=True,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class _GatewayState:
|
||||
def __init__(self) -> None:
|
||||
self.agent_workspaces: dict[str, Path] = {}
|
||||
self.session_agents: dict[str, str] = {}
|
||||
self.deleted_sessions: list[str] = []
|
||||
self.deleted_agents: list[str] = []
|
||||
|
||||
|
||||
class _SuccessfulGatewayClient:
|
||||
state = _GatewayState()
|
||||
|
||||
def __init__(self, config: GatewayConfig | None = None) -> None:
|
||||
self.config = config or GatewayConfig()
|
||||
|
||||
async def __aenter__(self) -> _SuccessfulGatewayClient:
|
||||
return self
|
||||
|
||||
async def __aexit__(self, *exc: object) -> None:
|
||||
return None
|
||||
|
||||
async def create_agent(self, *, name: str, workspace: str) -> str:
|
||||
agent_id = f"agent-{len(self.state.agent_workspaces) + 1}"
|
||||
self.state.agent_workspaces[agent_id] = Path(workspace)
|
||||
return agent_id
|
||||
|
||||
async def create_session(self, *, model: str, agent_id: str, label: str) -> str: # noqa: ARG002
|
||||
session_key = f"session-{len(self.state.session_agents) + 1}"
|
||||
self.state.session_agents[session_key] = agent_id
|
||||
return session_key
|
||||
|
||||
async def subscribe(self, session_key: str) -> None: # noqa: ARG002
|
||||
return None
|
||||
|
||||
async def send_and_wait(self, session_key: str, message: str, *, timeout: float) -> Transcript: # noqa: ARG002
|
||||
workspace = self.state.agent_workspaces[self.state.session_agents[session_key]]
|
||||
(workspace / "answer.txt").write_text("runtime ok\n", encoding="utf-8")
|
||||
return Transcript(
|
||||
messages=[
|
||||
TranscriptMessage(
|
||||
role="assistant",
|
||||
text="i'll inspect, write the answer, then verify it.",
|
||||
tool_calls=[
|
||||
ToolCall(
|
||||
name="read_file",
|
||||
input={"path": "answer.txt"},
|
||||
output="missing",
|
||||
success=True,
|
||||
),
|
||||
ToolCall(
|
||||
name="write_file",
|
||||
input={"path": "answer.txt"},
|
||||
output="wrote answer.txt",
|
||||
success=True,
|
||||
),
|
||||
ToolCall(
|
||||
name="shell",
|
||||
input={"command": "python -m pytest -q"},
|
||||
output="1 passed",
|
||||
success=True,
|
||||
),
|
||||
],
|
||||
),
|
||||
TranscriptMessage(role="assistant", text="done, verified."),
|
||||
]
|
||||
)
|
||||
|
||||
async def delete_session(self, session_key: str) -> None:
|
||||
self.state.deleted_sessions.append(session_key)
|
||||
|
||||
async def delete_agent(self, agent_id: str, *, delete_files: bool = False) -> None: # noqa: ARG002
|
||||
self.state.deleted_agents.append(agent_id)
|
||||
|
||||
|
||||
class _DisconnectingGatewayClient(_SuccessfulGatewayClient):
|
||||
async def send_and_wait(self, session_key: str, message: str, *, timeout: float) -> Transcript: # noqa: ARG002
|
||||
raise ConnectionError("gateway connection dropped")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_queue_worker_harness_scorer_happy_path_writes_result(
|
||||
tmp_path: Path,
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
):
|
||||
queue_dir = tmp_path / "queue"
|
||||
results_dir = tmp_path / "results"
|
||||
state_dir = tmp_path / "state"
|
||||
monkeypatch.setattr(queue_module, "LOCAL_QUEUE_DIR", queue_dir)
|
||||
monkeypatch.setattr(queue_module, "HF_TOKEN", "")
|
||||
monkeypatch.setattr("clawbench.worker.RESULTS_DIR", results_dir)
|
||||
monkeypatch.setenv("OPENCLAW_STATE_DIR", str(state_dir))
|
||||
monkeypatch.setenv("CLAWBENCH_RUN_CACHE_DIR", str(tmp_path / "run-cache"))
|
||||
monkeypatch.setattr("clawbench.harness.GatewayClient", _SuccessfulGatewayClient)
|
||||
|
||||
async def fake_upload_result(result) -> None: # noqa: ANN001
|
||||
return None
|
||||
|
||||
async def fake_ensure_gateway() -> None:
|
||||
return None
|
||||
|
||||
async def fake_preflight_browser_support_for_tasks(*args, **kwargs) -> None: # noqa: ANN002, ANN003
|
||||
return None
|
||||
|
||||
task = _runtime_task()
|
||||
queue = JobQueue()
|
||||
job = await queue.submit(
|
||||
SubmissionRequest(
|
||||
model="test/model",
|
||||
provider="test",
|
||||
runs_per_task=1,
|
||||
max_parallel_lanes=1,
|
||||
)
|
||||
)
|
||||
claimed = await queue.claim_pending()
|
||||
assert [claimed_job.job_id for claimed_job in claimed] == [job.job_id]
|
||||
|
||||
worker = EvalWorker(queue)
|
||||
monkeypatch.setattr(worker, "_load_job_tasks", lambda current_job: [task])
|
||||
monkeypatch.setattr("clawbench.harness.load_all_tasks", lambda **kwargs: [task])
|
||||
monkeypatch.setattr(worker, "_ensure_gateway", fake_ensure_gateway)
|
||||
monkeypatch.setattr(worker, "_preflight_browser_support_for_tasks", fake_preflight_browser_support_for_tasks)
|
||||
monkeypatch.setattr(worker, "_stop_gateway", lambda: None)
|
||||
monkeypatch.setattr(worker, "_stop_parallel_gateways", lambda: None)
|
||||
monkeypatch.setattr("clawbench.upload.upload_result", fake_upload_result)
|
||||
|
||||
await worker._process_job(claimed[0])
|
||||
|
||||
finished = await queue.get_status(job.job_id)
|
||||
assert finished is not None
|
||||
assert finished.status == JobStatus.FINISHED
|
||||
assert finished.result_id is not None
|
||||
assert finished.progress_message == "Finished"
|
||||
result_path = results_dir / f"{finished.result_id}.json"
|
||||
result = json.loads(result_path.read_text(encoding="utf-8"))
|
||||
assert result["model"] == "test/model"
|
||||
assert result["overall_completion"] == 1.0
|
||||
assert result["overall_pass_hat_k"] == 1.0
|
||||
assert result["task_results"][0]["task_id"] == "runtime-contract-smoke"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_harness_turn_disconnect_becomes_failed_run(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
|
||||
monkeypatch.setenv("OPENCLAW_STATE_DIR", str(tmp_path / "state"))
|
||||
monkeypatch.setenv("CLAWBENCH_RUN_CACHE_DIR", str(tmp_path / "run-cache"))
|
||||
monkeypatch.setattr("clawbench.harness.GatewayClient", _DisconnectingGatewayClient)
|
||||
|
||||
harness = BenchmarkHarness(
|
||||
gateway_config=GatewayConfig(),
|
||||
model="test/model",
|
||||
randomize_order=False,
|
||||
print_report=False,
|
||||
quiet=True,
|
||||
)
|
||||
|
||||
result = await harness._run_single(_runtime_task(), 0)
|
||||
|
||||
assert result.run_score == 0.0
|
||||
assert result.delivery_outcome.value == "fail"
|
||||
assert result.failure_mode is not None
|
||||
assert result.failure_mode.value == "environment_unavailable"
|
||||
assert "gateway connection dropped" in (result.error or "")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_harness_scorer_exception_becomes_failed_run(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
|
||||
monkeypatch.setenv("OPENCLAW_STATE_DIR", str(tmp_path / "state"))
|
||||
monkeypatch.setenv("CLAWBENCH_RUN_CACHE_DIR", str(tmp_path / "run-cache"))
|
||||
monkeypatch.setattr("clawbench.harness.GatewayClient", _SuccessfulGatewayClient)
|
||||
|
||||
async def fail_score_task_run(**kwargs): # noqa: ANN003
|
||||
raise RuntimeError("scorer exploded")
|
||||
|
||||
monkeypatch.setattr("clawbench.harness.score_task_run", fail_score_task_run)
|
||||
harness = BenchmarkHarness(
|
||||
gateway_config=GatewayConfig(),
|
||||
model="test/model",
|
||||
randomize_order=False,
|
||||
print_report=False,
|
||||
quiet=True,
|
||||
)
|
||||
|
||||
result = await harness._run_single(_runtime_task(), 0)
|
||||
|
||||
assert result.run_score == 0.0
|
||||
assert result.delivery_outcome.value == "fail"
|
||||
assert result.failure_mode is not None
|
||||
assert result.failure_mode.value == "state_regression"
|
||||
assert result.error == "scorer exploded"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stale_evaluating_job_can_be_reclaimed_and_claimed_again(monkeypatch: pytest.MonkeyPatch):
|
||||
queue = JobQueue()
|
||||
stale_started_at = (
|
||||
datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)
|
||||
).isoformat()
|
||||
queue._jobs = {
|
||||
"job-1": Job(
|
||||
job_id="job-1",
|
||||
status=JobStatus.EVALUATING,
|
||||
started_at=stale_started_at,
|
||||
last_progress_at=stale_started_at,
|
||||
current_task_id="runtime-contract-smoke",
|
||||
current_run_index=1,
|
||||
current_run_total=1,
|
||||
attempt_count=1,
|
||||
request=SubmissionRequest(model="test/model"),
|
||||
)
|
||||
}
|
||||
monkeypatch.setattr(queue, "_save_local", lambda: None)
|
||||
|
||||
async def fake_sync_to_hub() -> None:
|
||||
return None
|
||||
|
||||
monkeypatch.setattr(queue, "_sync_to_hub", fake_sync_to_hub)
|
||||
|
||||
reclaimed = await queue.reclaim_stale_jobs(stale_after_seconds=300)
|
||||
claimed = await queue.claim_pending()
|
||||
|
||||
assert [job.job_id for job in reclaimed] == ["job-1"]
|
||||
assert [job.job_id for job in claimed] == ["job-1"]
|
||||
job = queue._jobs["job-1"]
|
||||
assert job.status == JobStatus.EVALUATING
|
||||
assert job.attempt_count == 2
|
||||
assert job.stale_requeues == 1
|
||||
assert job.current_task_id is None
|
||||
assert job.current_run_index is None
|
||||
assert job.progress_message == "Queued for evaluation"
|
||||
|
||||
|
||||
def test_leaderboard_skips_malformed_local_result_file(
|
||||
tmp_path: Path,
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
):
|
||||
class NoopThread:
|
||||
def __init__(self, *args, **kwargs) -> None: # noqa: ANN002, ANN003
|
||||
return None
|
||||
|
||||
def start(self) -> None:
|
||||
return None
|
||||
|
||||
monkeypatch.setattr(threading, "Thread", NoopThread)
|
||||
monkeypatch.setattr(queue_module, "LOCAL_QUEUE_DIR", tmp_path / "queue")
|
||||
monkeypatch.setattr(queue_module, "HF_TOKEN", "")
|
||||
sys.modules.pop("app", None)
|
||||
app = importlib.import_module("app")
|
||||
|
||||
results_dir = tmp_path / "results"
|
||||
results_dir.mkdir()
|
||||
(results_dir / "bad.json").write_text("{not json", encoding="utf-8")
|
||||
(results_dir / "good.json").write_text(
|
||||
json.dumps(
|
||||
{
|
||||
"model": "test/model",
|
||||
"timestamp": "2026-04-29T00:00:00+00:00",
|
||||
"overall_score": 0.91,
|
||||
"overall_completion": 1.0,
|
||||
"overall_trajectory": 0.8,
|
||||
"overall_behavior": 1.0,
|
||||
"overall_pass_hat_k": 1.0,
|
||||
"environment": {"prompt_variant": "clear", "scenario": "all"},
|
||||
"task_results": [{"task_id": "runtime-contract-smoke"}],
|
||||
}
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
monkeypatch.setattr(app, "RESULTS_DIR", results_dir)
|
||||
monkeypatch.setattr(app, "dataset_has_submission_results", lambda api, repo: False)
|
||||
|
||||
frame = app._load_leaderboard_uncached()
|
||||
|
||||
assert list(frame["Model"]) == ["test/model"]
|
||||
assert list(frame["Score"]) == [0.91]
|
||||
Loading…
Reference in New Issue
Block a user