465 lines
16 KiB
Python
465 lines
16 KiB
Python
import datetime
|
|
import json
|
|
|
|
import pytest
|
|
|
|
from clawbench.hub import (
|
|
dataset_has_submission_results,
|
|
ensure_dataset_repo,
|
|
load_submission_rows_from_parquet,
|
|
resolve_dataset_repo,
|
|
submission_parquet_files,
|
|
)
|
|
from clawbench.queue import Job, JobQueue, JobStatus, SubmissionRequest
|
|
import clawbench.queue as queue_module
|
|
|
|
|
|
def test_submission_request_defaults_to_single_parallel_lane():
    """Check the default field values of a request built from a model name only."""
    default_request = SubmissionRequest(model="openai-codex/gpt-5.4")

    # One lane, three runs per task, judge advisory only, no task filter.
    assert default_request.max_parallel_lanes == 1
    assert default_request.runs_per_task == 3
    assert default_request.judge_affects_score is False
    assert default_request.task_ids == []
|
|
|
|
|
|
def test_local_queue_dir_honors_env_override(tmp_path, monkeypatch):
    """Check that ``CLAWBENCH_LOCAL_QUEUE_DIR`` decides the resolved queue directory."""
    override_dir = tmp_path / "queue"
    monkeypatch.setenv("CLAWBENCH_LOCAL_QUEUE_DIR", str(override_dir))

    resolved_dir = queue_module._resolve_local_queue_dir()

    assert resolved_dir == override_dir
|
|
|
|
|
|
def test_submission_request_fingerprint_includes_judge_score_gate():
    """Check that toggling ``judge_affects_score`` changes the active fingerprint."""
    shared_fields = {"model": "anthropic/claude-sonnet-4-6", "judge_model": "judge"}
    advisory_request = SubmissionRequest(**shared_fields)
    weighted_request = SubmissionRequest(**shared_fields, judge_affects_score=True)

    advisory_fingerprint = advisory_request.active_fingerprint()
    weighted_fingerprint = weighted_request.active_fingerprint()

    assert advisory_fingerprint != weighted_fingerprint
|
|
|
|
|
|
def test_submission_request_fingerprint_includes_task_ids():
    """Check that restricting ``task_ids`` yields a different active fingerprint."""
    model_name = "anthropic/claude-sonnet-4-6"
    full_run = SubmissionRequest(model=model_name)
    partial_run = SubmissionRequest(model=model_name, task_ids=["t1-fs-quick-note"])

    full_fingerprint = full_run.active_fingerprint()
    partial_fingerprint = partial_run.active_fingerprint()

    assert full_fingerprint != partial_fingerprint
|
|
|
|
|
|
def test_submission_request_fingerprint_canonicalizes_task_ids():
    """Check that padding, order and duplicates in ``task_ids`` do not alter the fingerprint."""
    model_name = "anthropic/claude-sonnet-4-6"
    # Same two tasks, but padded with whitespace, out of order and repeated.
    messy_request = SubmissionRequest(
        model=model_name,
        task_ids=[" t2-demo ", "t1-demo", "t2-demo"],
    )
    tidy_request = SubmissionRequest(model=model_name, task_ids=["t1-demo", "t2-demo"])

    messy_fingerprint = messy_request.active_fingerprint()
    tidy_fingerprint = tidy_request.active_fingerprint()

    assert messy_fingerprint == tidy_fingerprint
|
|
|
|
|
|
def test_save_local_replaces_queue_file_atomically(tmp_path, monkeypatch):
    """Check that ``_save_local`` writes ``jobs.json`` and leaves no ``jobs.*.tmp`` files."""
    monkeypatch.setattr(queue_module, "LOCAL_QUEUE_DIR", tmp_path)
    monkeypatch.setattr(queue_module, "HF_TOKEN", "")
    job_queue = JobQueue()
    pending_job = Job(
        job_id="job-1",
        status=JobStatus.PENDING,
        submitted_at="2026-04-09T00:00:01+00:00",
        request=SubmissionRequest(model="anthropic/claude-sonnet-4-6"),
    )
    job_queue._jobs = {"job-1": pending_job}

    job_queue._save_local()

    saved_path = tmp_path / "jobs.json"
    assert saved_path.exists()

    # The first stored record must round-trip back into a Job.
    stored_records = json.loads(saved_path.read_text(encoding="utf-8"))
    assert Job(**stored_records[0]).job_id == "job-1"

    leftover_temp_files = list(tmp_path.glob("jobs.*.tmp"))
    assert not leftover_temp_files
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_submit_dedupes_equivalent_active_jobs(monkeypatch):
    """Check that an equivalent submission returns the existing active job.

    The queue is seeded with one pending job. A second request for the same
    model, from a different submitter, must resolve to that job. Both
    persistence hooks are replaced with stubs that raise, so any attempt to
    save or sync fails the test.
    """
    queue = JobQueue()
    request = SubmissionRequest(model="anthropic/claude-sonnet-4-6", submitter="vincent")
    queue._jobs = {
        "job-1": Job(
            job_id="job-1",
            status=JobStatus.PENDING,
            submitted_at="2026-04-09T00:00:01+00:00",
            request=request,
        )
    }

    # Explicit raising stubs; a plain function is clearer than a
    # generator-throw lambda and mirrors the async stub below.
    def fail_save() -> None:
        raise AssertionError("should not save")

    async def fail_sync() -> None:
        raise AssertionError("should not sync")

    monkeypatch.setattr(queue, "_save_local", fail_save)
    monkeypatch.setattr(queue, "_sync_to_hub", fail_sync)

    job = await queue.submit(
        SubmissionRequest(model="anthropic/claude-sonnet-4-6", submitter="someone-else")
    )

    assert job.job_id == "job-1"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_submit_enforces_queue_capacity(monkeypatch):
    """Check that a submission is rejected once the active-job cap of 1 is reached."""
    monkeypatch.setenv("CLAWBENCH_MAX_ACTIVE_QUEUE_JOBS", "1")
    job_queue = JobQueue()
    # A single evaluating job already fills the configured capacity.
    running_job = Job(
        job_id="job-1",
        status=JobStatus.EVALUATING,
        submitted_at="2026-04-09T00:00:01+00:00",
        request=SubmissionRequest(model="anthropic/claude-sonnet-4-6"),
    )
    job_queue._jobs = {"job-1": running_job}

    with pytest.raises(ValueError, match="Queue is at capacity"):
        await job_queue.submit(SubmissionRequest(model="huggingface/Qwen/Qwen3-32B"))
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_submit_enforces_submitter_limit(monkeypatch):
    """Check that the per-submitter cap rejects a second job from the same submitter.

    The existing job is owned by "Vincent" and the new request comes from
    " vincent ", so the rejection must hold across case and padding differences.
    """
    monkeypatch.setenv("CLAWBENCH_MAX_ACTIVE_JOBS_PER_SUBMITTER", "1")
    job_queue = JobQueue()
    owned_job = Job(
        job_id="job-1",
        status=JobStatus.PENDING,
        submitted_at="2026-04-09T00:00:01+00:00",
        request=SubmissionRequest(model="anthropic/claude-sonnet-4-6", submitter="Vincent"),
    )
    job_queue._jobs = {"job-1": owned_job}

    with pytest.raises(ValueError, match="already has 1 active"):
        await job_queue.submit(
            SubmissionRequest(model="huggingface/Qwen/Qwen3-32B", submitter=" vincent ")
        )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_submit_enforces_per_submission_runtime_caps(monkeypatch):
    """Check that requests above the runs-per-task or lane caps raise ``ValueError``."""
    monkeypatch.setenv("CLAWBENCH_MAX_RUNS_PER_SUBMISSION", "2")
    monkeypatch.setenv("CLAWBENCH_MAX_LANES_PER_SUBMISSION", "1")
    job_queue = JobQueue()
    job_queue._jobs = {}
    model_name = "anthropic/claude-sonnet-4-6"

    # Three runs exceeds the cap of two.
    with pytest.raises(ValueError, match="runs_per_task=3"):
        await job_queue.submit(SubmissionRequest(model=model_name, runs_per_task=3))

    # Runs are within the cap here, but two lanes exceeds the cap of one.
    with pytest.raises(ValueError, match="max_parallel_lanes=2"):
        await job_queue.submit(
            SubmissionRequest(model=model_name, runs_per_task=2, max_parallel_lanes=2)
        )
|
|
|
|
|
|
def test_resolve_dataset_repo_prefers_explicit_env(monkeypatch):
    """Check that ``CLAWBENCH_QUEUE_DATASET`` wins when ``SPACE_ID`` is also set."""
    explicit_repo = "custom-owner/custom-results"
    monkeypatch.setenv("CLAWBENCH_QUEUE_DATASET", explicit_repo)
    monkeypatch.setenv("SPACE_ID", "ScoootScooob/clawbench")

    resolved_repo = resolve_dataset_repo()

    assert resolved_repo == explicit_repo
|
|
|
|
|
|
def test_resolve_dataset_repo_derives_owner_from_space_id(monkeypatch):
    """Check the repo resolved from ``SPACE_ID`` when no explicit dataset is configured."""
    # Remove any explicit override so only SPACE_ID is available.
    monkeypatch.delenv("CLAWBENCH_QUEUE_DATASET", raising=False)
    monkeypatch.setenv("SPACE_ID", "ScoootScooob/clawbench")

    resolved_repo = resolve_dataset_repo()

    assert resolved_repo == "ScoootScooob/clawbench-results"
|
|
|
|
|
|
def test_ensure_dataset_repo_creates_missing_repo_and_bootstrap_readme():
    """Check that a failing ``repo_info`` leads to one repo creation and one README upload."""
    repo_name = "ScoootScooob/clawbench-results"

    class RecordingApi:
        """Hub API stand-in whose ``repo_info`` always raises and which records calls."""

        def __init__(self) -> None:
            self.created: list[tuple[str, str, bool, bool]] = []
            self.uploaded: list[tuple[str, str, str]] = []

        def repo_info(self, repo_id: str, repo_type: str) -> None:
            raise RuntimeError("missing")

        def create_repo(self, repo_id: str, repo_type: str, private: bool, exist_ok: bool) -> None:
            call_record = (repo_id, repo_type, private, exist_ok)
            self.created.append(call_record)

        def upload_file(
            self,
            *,
            path_or_fileobj: bytes,
            path_in_repo: str,
            repo_id: str,
            repo_type: str,
        ) -> None:
            # The uploaded payload must contain the results heading.
            assert b"ClawBench Results" in path_or_fileobj
            call_record = (path_in_repo, repo_id, repo_type)
            self.uploaded.append(call_record)

    recording_api = RecordingApi()

    ensure_dataset_repo(recording_api, repo_name)

    assert recording_api.created == [("ScoootScooob/clawbench-results", "dataset", False, True)]
    assert recording_api.uploaded == [("README.md", "ScoootScooob/clawbench-results", "dataset")]
|
|
|
|
|
|
def test_dataset_has_submission_results_ignores_queue_only_files():
    """Check that a repo holding only a README and queue state has no submission results."""
    repo_name = "ScoootScooob/clawbench-results"

    class ListingApi:
        """Hub API stand-in returning a fixed file listing."""

        def list_repo_files(self, repo_id: str, repo_type: str) -> list[str]:
            assert repo_id == "ScoootScooob/clawbench-results"
            assert repo_type == "dataset"
            return ["README.md", "queue/jobs.json"]

    has_results = dataset_has_submission_results(ListingApi(), repo_name)

    assert not has_results
|
|
|
|
|
|
def test_dataset_has_submission_results_detects_uploaded_parquet():
    """Check that a submissions parquet shard in the listing counts as results."""
    repo_name = "ScoootScooob/clawbench-results"

    class ListingApi:
        """Hub API stand-in returning a fixed file listing."""

        def list_repo_files(self, repo_id: str, repo_type: str) -> list[str]:
            assert repo_id == "ScoootScooob/clawbench-results"
            assert repo_type == "dataset"
            return ["README.md", "data/submissions-00000-of-00001.parquet"]

    has_results = dataset_has_submission_results(ListingApi(), repo_name)

    assert has_results
|
|
|
|
|
|
def test_submission_parquet_files_filters_to_submissions_split():
    """Check that only the ``data/submissions-*`` parquet shards are returned, in order."""
    repo_name = "ScoootScooob/clawbench-results"

    class ListingApi:
        """Hub API stand-in listing submission shards mixed with other files."""

        def list_repo_files(self, repo_id: str, repo_type: str) -> list[str]:
            assert repo_id == "ScoootScooob/clawbench-results"
            assert repo_type == "dataset"
            listing = [
                "README.md",
                "queue/jobs.json",
                "data/submissions-00000-of-00001.parquet",
                "data/requests-00000-of-00001.parquet",
                "data/submissions-00001-of-00002.parquet",
            ]
            return listing

    selected_files = submission_parquet_files(ListingApi(), repo_name)

    assert selected_files == [
        "data/submissions-00000-of-00001.parquet",
        "data/submissions-00001-of-00002.parquet",
    ]
|
|
|
|
|
|
def test_load_submission_rows_from_parquet_uses_direct_hub_download(tmp_path):
    """Check that rows are read from a parquet file fetched via the injected downloader.

    The API, downloader and pandas module are all stand-ins. The test verifies
    the arguments passed to the downloader and the rows handed back.
    """
    local_parquet = tmp_path / "submissions.parquet"
    download_calls: list[tuple[str, str, str, str | None]] = []

    class ListingApi:
        """Hub API stand-in listing a single submissions shard."""

        def list_repo_files(self, repo_id: str, repo_type: str) -> list[str]:
            assert repo_id == "ScoootScooob/clawbench-results"
            assert repo_type == "dataset"
            return ["data/submissions-00000-of-00001.parquet"]

    def fake_downloader(*, repo_id: str, repo_type: str, filename: str, token: str | None = None) -> str:
        # Record the call and return the local path as a string.
        call_record = (repo_id, repo_type, filename, token)
        download_calls.append(call_record)
        return str(local_parquet)

    class RecordsFrame:
        """Frame stand-in that only supports ``to_dict("records")``."""

        def to_dict(self, orient: str):
            assert orient == "records"
            return [{"model": "anthropic/claude-sonnet-4-6", "overall_score": 0.7}]

    class PandasStub:
        """pandas stand-in whose ``read_parquet`` checks the path it receives."""

        @staticmethod
        def read_parquet(path):
            assert str(path) == str(local_parquet)
            return RecordsFrame()

    loaded_rows = load_submission_rows_from_parquet(
        "ScoootScooob/clawbench-results",
        token="hf_test",
        api=ListingApi(),
        downloader=fake_downloader,
        pandas_module=PandasStub(),
    )

    expected_call = (
        "ScoootScooob/clawbench-results",
        "dataset",
        "data/submissions-00000-of-00001.parquet",
        "hf_test",
    )
    assert download_calls == [expected_call]
    assert loaded_rows == [{"model": "anthropic/claude-sonnet-4-6", "overall_score": 0.7}]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_mark_evaluating_syncs_to_hub(monkeypatch):
    """Check that ``mark_evaluating`` flips the job state and persists exactly once."""
    job_queue = JobQueue()
    pending_job = Job(
        job_id="job-1",
        status=JobStatus.PENDING,
        request=SubmissionRequest(model="anthropic/claude-sonnet-4-6"),
    )
    job_queue._jobs = {"job-1": pending_job}
    saves: list[str] = []
    syncs: list[str] = []

    def record_save() -> None:
        saves.append("saved")

    async def record_sync() -> None:
        syncs.append("synced")

    # Swap both persistence hooks for recorders.
    monkeypatch.setattr(job_queue, "_save_local", record_save)
    monkeypatch.setattr(job_queue, "_sync_to_hub", record_sync)

    await job_queue.mark_evaluating("job-1")

    marked_job = job_queue._jobs["job-1"]
    assert marked_job.status == JobStatus.EVALUATING
    assert marked_job.started_at is not None
    assert saves == ["saved"]
    assert syncs == ["synced"]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_claim_pending_marks_multiple_jobs_evaluating(monkeypatch):
    """Check that ``claim_pending(limit=2)`` claims both pending jobs and skips the finished one."""
    job_queue = JobQueue()
    # (job id, status, submitted_at, model) for each seeded job, in insertion order.
    seeded_jobs = [
        ("job-1", JobStatus.PENDING, "2026-04-09T00:00:01+00:00", "anthropic/claude-sonnet-4-6"),
        ("job-2", JobStatus.PENDING, "2026-04-09T00:00:02+00:00", "huggingface/Qwen/Qwen3-32B"),
        ("job-3", JobStatus.FINISHED, "2026-04-09T00:00:03+00:00", "huggingface/zai-org/GLM-5"),
    ]
    job_queue._jobs = {
        job_id: Job(
            job_id=job_id,
            status=status,
            submitted_at=submitted_at,
            request=SubmissionRequest(model=model_name),
        )
        for job_id, status, submitted_at, model_name in seeded_jobs
    }
    saves: list[str] = []
    syncs: list[str] = []

    def record_save() -> None:
        saves.append("saved")

    async def record_sync() -> None:
        syncs.append("synced")

    monkeypatch.setattr(job_queue, "_save_local", record_save)
    monkeypatch.setattr(job_queue, "_sync_to_hub", record_sync)

    claimed_jobs = await job_queue.claim_pending(limit=2)

    claimed_ids = [claimed.job_id for claimed in claimed_jobs]
    assert claimed_ids == ["job-1", "job-2"]

    jobs_by_id = job_queue._jobs
    assert jobs_by_id["job-1"].status == JobStatus.EVALUATING
    assert jobs_by_id["job-2"].status == JobStatus.EVALUATING
    assert jobs_by_id["job-3"].status == JobStatus.FINISHED
    assert jobs_by_id["job-1"].started_at is not None
    assert jobs_by_id["job-2"].started_at is not None
    # Claiming two jobs still persists only once.
    assert saves == ["saved"]
    assert syncs == ["synced"]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_progress_tracks_current_task_and_heartbeat(monkeypatch):
    """Check that ``update_progress`` stores the progress fields, stamps a heartbeat and persists once."""
    job_queue = JobQueue()
    running_job = Job(
        job_id="job-1",
        status=JobStatus.EVALUATING,
        started_at="2026-04-09T00:00:00+00:00",
        request=SubmissionRequest(model="anthropic/claude-sonnet-4-6"),
    )
    job_queue._jobs = {"job-1": running_job}
    saves: list[str] = []
    syncs: list[str] = []

    def record_save() -> None:
        saves.append("saved")

    async def record_sync() -> None:
        syncs.append("synced")

    monkeypatch.setattr(job_queue, "_save_local", record_save)
    monkeypatch.setattr(job_queue, "_sync_to_hub", record_sync)

    progress_fields = {
        "current_task_id": "t3-monitoring-automation",
        "current_run_index": 2,
        "current_run_total": 3,
        "progress_message": "Running t3-monitoring-automation (run 2/3)",
    }
    await job_queue.update_progress("job-1", **progress_fields)

    updated_job = job_queue._jobs["job-1"]
    assert updated_job.current_task_id == "t3-monitoring-automation"
    assert updated_job.current_run_index == 2
    assert updated_job.current_run_total == 3
    assert updated_job.progress_message == "Running t3-monitoring-automation (run 2/3)"
    assert updated_job.last_progress_at is not None
    assert saves == ["saved"]
    assert syncs == ["synced"]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_reclaim_stale_jobs_requeues_only_expired_evaluations(monkeypatch):
    """Check that only the evaluating job older than the stale threshold is requeued.

    Two evaluating jobs are seeded: one whose timestamps are an hour old and
    one whose timestamps are two minutes old. With a 300-second threshold the
    first must be reset to pending with its progress fields cleared, and the
    second must be left as it was.
    """
    job_queue = JobQueue()

    def timestamp_ago(age: datetime.timedelta) -> str:
        # ISO-8601 UTC timestamp for "now minus age".
        return (datetime.datetime.now(datetime.timezone.utc) - age).isoformat()

    stale_timestamp = timestamp_ago(datetime.timedelta(hours=1))
    fresh_timestamp = timestamp_ago(datetime.timedelta(minutes=2))

    expired_job = Job(
        job_id="job-1",
        status=JobStatus.EVALUATING,
        started_at=stale_timestamp,
        last_progress_at=stale_timestamp,
        current_task_id="t1-architecture-brief",
        current_run_index=1,
        current_run_total=3,
        progress_message="Running t1-architecture-brief (run 1/3)",
        request=SubmissionRequest(model="anthropic/claude-sonnet-4-6"),
    )
    active_job = Job(
        job_id="job-2",
        status=JobStatus.EVALUATING,
        started_at=fresh_timestamp,
        last_progress_at=fresh_timestamp,
        current_task_id="t1-bugfix-discount",
        current_run_index=1,
        current_run_total=3,
        progress_message="Running t1-bugfix-discount (run 1/3)",
        request=SubmissionRequest(model="huggingface/Qwen/Qwen3-32B"),
    )
    job_queue._jobs = {"job-1": expired_job, "job-2": active_job}
    saves: list[str] = []
    syncs: list[str] = []

    def record_save() -> None:
        saves.append("saved")

    async def record_sync() -> None:
        syncs.append("synced")

    monkeypatch.setattr(job_queue, "_save_local", record_save)
    monkeypatch.setattr(job_queue, "_sync_to_hub", record_sync)

    reclaimed_jobs = await job_queue.reclaim_stale_jobs(stale_after_seconds=300)

    reclaimed_ids = [reclaimed.job_id for reclaimed in reclaimed_jobs]
    assert reclaimed_ids == ["job-1"]

    # The expired job goes back to pending with its run state wiped.
    requeued = job_queue._jobs["job-1"]
    assert requeued.status == JobStatus.PENDING
    assert requeued.started_at is None
    assert requeued.current_task_id is None
    assert requeued.current_run_index is None
    assert requeued.current_run_total is None
    assert requeued.stale_requeues == 1
    assert requeued.progress_message is not None
    assert requeued.progress_message.startswith("Auto-requeued after stale evaluation lease")

    # The recent job keeps evaluating the same task.
    untouched = job_queue._jobs["job-2"]
    assert untouched.status == JobStatus.EVALUATING
    assert untouched.current_task_id == "t1-bugfix-discount"
    assert saves == ["saved"]
    assert syncs == ["synced"]
|