fix: skip active repair job dispatches
Some checks are pending
CI / pnpm check (push) Waiting to run
CodeQL / Analyze (${{ matrix.language }}) (actions) (push) Waiting to run
CodeQL / Analyze (${{ matrix.language }}) (javascript-typescript) (push) Waiting to run
Pages / Deploy docs (push) Waiting to run

This commit is contained in:
Peter Steinberger 2026-05-02 08:12:06 +01:00
parent ec423bfc44
commit 4a59e45a3f
No known key found for this signature in database
7 changed files with 147 additions and 43 deletions

View File

@ -156,9 +156,10 @@ The loop is intentionally idempotent.
- One durable review comment edited in place.
- Comment-router ledger keys use comment id plus `updated_at`.
- Response markers include the PR head SHA.
- Before dispatching repair, the router checks for an active run with the same
adopted job path. If one exists, the command stays `waiting` and points at
that run instead of enqueueing another repair.
- Before dispatching repair, the router and repair dispatchers check for an
active run with the same adopted job path. If one exists, the command stays
`waiting` or the dispatcher skips that job instead of enqueueing another
repair.
- Repair workers still keep a workflow concurrency group for the same job path
as a last-resort race guard.
- Automatic repairs are capped by

View File

@ -3,8 +3,8 @@ import type { JsonValue, LooseRecord } from "./json-types.js";
import fs from "node:fs";
import path from "node:path";
import {
activeRepairWorkflowRunForJob,
assertLiveWorkerCapacity,
listActiveWorkflowRuns,
parseArgs,
parseJob,
repoRoot,
@ -1436,24 +1436,13 @@ function commandHasWaitingRepairDispatch(command: LooseRecord) {
function activeRepairRunForCommand(command: LooseRecord) {
const jobPath = String(command.target?.job_path ?? "");
if (!jobPath) return null;
const automergeJob = jobPath.includes("/inbox/automerge-");
const prefix = automergeJob ? automergeRunNamePrefix : "repair cluster ";
const expectedTitle = `${prefix}${jobPath}`;
if (!activeRepairRunsByPrefix.has(prefix)) {
activeRepairRunsByPrefix.set(
prefix,
listActiveWorkflowRuns({
repo: repairRepo,
workflow,
runNamePrefix: prefix,
}),
);
}
return (
activeRepairRunsByPrefix
.get(prefix)
?.find((run: JsonValue) => String(run.displayTitle ?? "") === expectedTitle) ?? null
);
return activeRepairWorkflowRunForJob({
repo: repairRepo,
workflow,
jobPath,
automergeRunNamePrefix,
activeRunsByPrefix: activeRepairRunsByPrefix,
});
}
function dispatchTokenEnv(): NodeJS.ProcessEnv {

View File

@ -1,9 +1,10 @@
#!/usr/bin/env node
import type { JsonValue } from "./json-types.js";
import type { JsonValue, LooseRecord } from "./json-types.js";
import fs from "node:fs";
import path from "node:path";
import { spawnSync } from "node:child_process";
import {
activeRepairWorkflowRunForJob,
assertLiveWorkerCapacity,
currentProjectRepo,
liveWorkerCapacity,
@ -31,6 +32,7 @@ const maxLiveWorkers = readMaxLiveWorkers(args);
const waitForCapacity = Boolean(args["wait-for-capacity"]);
const ref = args.ref ? String(args.ref) : "";
const files = args._;
const activeRepairRunsByPrefix = new Map<string, LooseRecord[]>();
if (files.length === 0) {
console.error(
@ -40,7 +42,7 @@ if (files.length === 0) {
}
let failed = false;
const jobs: JsonValue[] = [];
const validatedJobs: JsonValue[] = [];
for (const file of files) {
const job = parseJob(file);
const errors = validateJob(job);
@ -57,9 +59,11 @@ for (const file of files) {
console.error(`job does not exist inside repo: ${file}`);
continue;
}
jobs.push(relative);
validatedJobs.push(relative);
}
const jobs = failed ? [] : validatedJobs.filter((relative) => shouldDispatchJob(relative));
if (!failed) {
const requested = waitForCapacity ? Math.min(jobs.length, 1) : jobs.length;
const capacityOptions = { repo, workflow, requested, maxLiveWorkers };
@ -133,4 +137,18 @@ function dispatchJob(relative: JsonValue, position: JsonValue, total: JsonValue)
}
}
function shouldDispatchJob(relative: JsonValue) {
const activeRun = activeRepairWorkflowRunForJob({
repo,
workflow,
jobPath: relative,
activeRunsByPrefix: activeRepairRunsByPrefix,
});
if (!activeRun) return true;
console.log(
`skipping ${relative}: active ${workflow} run already exists (${activeRun.url ?? activeRun.databaseId ?? "unknown run"})`,
);
return false;
}
if (failed) process.exit(1);

View File

@ -3,6 +3,7 @@ import type { JsonValue, LooseRecord } from "./json-types.js";
import fs from "node:fs";
import path from "node:path";
import {
activeRepairWorkflowRunForJob,
assertLiveWorkerCapacity,
currentProjectRepo,
hasDeterministicSecuritySignal,
@ -52,6 +53,7 @@ const maxPrs = Number(args["max-prs"] ?? args.limit ?? 5);
const maxLiveWorkers = readMaxLiveWorkers(args);
const waitForCapacity = Boolean(args["wait-for-capacity"]);
const allowRepeat = Boolean(args["allow-repeat"]);
const activeRepairRunsByPrefix = new Map<string, LooseRecord[]>();
if (!/^[A-Za-z0-9_.-]+\/[A-Za-z0-9_.-]+$/.test(repo)) {
throw new Error(`repo must be owner/repo, got ${repo}`);
@ -587,25 +589,38 @@ function executeDispatches(candidates: LooseRecord[], dispatchSummary: JsonValue
throw new Error("refusing finalizer dispatch: CLAWSWEEPER_ALLOW_FIX_PR must be 1");
}
const capacity = waitForCapacity
? waitForLiveWorkerCapacity({
repo: repairRepo,
workflow,
requested: candidates.length,
maxLiveWorkers,
})
: assertLiveWorkerCapacity({
repo: repairRepo,
workflow,
requested: candidates.length,
maxLiveWorkers,
});
summary.live_worker_capacity_before_dispatch = capacity;
const activeRunsByJobPath = new Map<string, LooseRecord>();
for (const candidate of candidates) {
const activeRun = activeRepairWorkflowRunForJob({
repo: repairRepo,
workflow,
jobPath: candidate.job_path,
activeRunsByPrefix: activeRepairRunsByPrefix,
});
if (activeRun) activeRunsByJobPath.set(String(candidate.job_path), activeRun);
}
const dispatchCount = candidates.length - activeRunsByJobPath.size;
if (dispatchCount > 0) {
const capacity = waitForCapacity
? waitForLiveWorkerCapacity({
repo: repairRepo,
workflow,
requested: dispatchCount,
maxLiveWorkers,
})
: assertLiveWorkerCapacity({
repo: repairRepo,
workflow,
requested: dispatchCount,
maxLiveWorkers,
});
summary.live_worker_capacity_before_dispatch = capacity;
}
const ledger = readDispatchLedger();
const batchId = `finalize-open-prs-${new Date().toISOString().replace(/[:.]/g, "-")}`;
for (const candidate of candidates) {
const attempt = {
const attempt: LooseRecord = {
batch_id: batchId,
idempotency_key: candidate.idempotency_key,
target_repo: repo,
@ -625,8 +640,17 @@ function executeDispatches(candidates: LooseRecord[], dispatchSummary: JsonValue
dispatched_at: new Date().toISOString(),
status: "pending",
};
dispatchRepair(candidate);
attempt.status = "dispatched";
const activeRun = activeRunsByJobPath.get(String(candidate.job_path));
if (activeRun) {
attempt.status = "waiting";
attempt.reason = "repair worker already active for this job path";
attempt.run_url = activeRun.url;
attempt.run_id = activeRun.databaseId ?? activeRun.id;
attempt.run_status = activeRun.status;
} else {
dispatchRepair(candidate);
attempt.status = "dispatched";
}
summary.attempts.push(attempt);
ledger.attempts.push(attempt);
}

View File

@ -27,9 +27,12 @@ export type ParsedJob = {
export { githubActionsRunUrl, currentProjectRepo } from "./project-repo.js";
export {
assertLiveWorkerCapacity,
activeRepairWorkflowRunForJob,
listActiveWorkflowRuns,
liveWorkerCapacity,
readMaxLiveWorkers,
repairRunNameForJob,
repairRunNamePrefixForJob,
waitForLiveWorkerCapacity,
} from "./live-worker-capacity.js";
export { hasDeterministicSecuritySignal, hasSecuritySignalText } from "./security-signals.js";

View File

@ -6,6 +6,8 @@ import { sleepMs } from "./timing.js";
const DEFAULT_MAX_LIVE_WORKERS = 50;
export const MAX_LIVE_WORKERS = 100;
export const DEFAULT_AUTOMERGE_REPAIR_RUN_NAME_PREFIX = "automerge repair ";
export const DEFAULT_REPAIR_RUN_NAME_PREFIX = "repair cluster ";
const DEFAULT_CAPACITY_POLL_MS = 30_000;
const DEFAULT_CAPACITY_TIMEOUT_MS = 30 * 60 * 1000;
const ACTIVE_WORKFLOW_STATUSES = ["queued", "in_progress", "waiting", "requested", "pending"];
@ -135,6 +137,56 @@ export function listActiveWorkflowRuns({
);
}
export function repairRunNamePrefixForJob(
jobPath: JsonValue,
automergeRunNamePrefix: JsonValue = DEFAULT_AUTOMERGE_REPAIR_RUN_NAME_PREFIX,
) {
return String(jobPath ?? "").includes("/inbox/automerge-")
? String(automergeRunNamePrefix ?? DEFAULT_AUTOMERGE_REPAIR_RUN_NAME_PREFIX)
: DEFAULT_REPAIR_RUN_NAME_PREFIX;
}
export function repairRunNameForJob(
jobPath: JsonValue,
automergeRunNamePrefix: JsonValue = DEFAULT_AUTOMERGE_REPAIR_RUN_NAME_PREFIX,
) {
return `${repairRunNamePrefixForJob(jobPath, automergeRunNamePrefix)}${String(jobPath ?? "")}`;
}
export function activeRepairWorkflowRunForJob({
repo = currentProjectRepo(),
workflow = REPAIR_CLUSTER_WORKFLOW,
jobPath,
automergeRunNamePrefix = DEFAULT_AUTOMERGE_REPAIR_RUN_NAME_PREFIX,
activeRunsByPrefix,
}: LooseRecord = {}) {
const job = String(jobPath ?? "");
if (!job) return null;
const prefix = repairRunNamePrefixForJob(job, automergeRunNamePrefix);
const expectedTitle = repairRunNameForJob(job, automergeRunNamePrefix);
if (activeRunsByPrefix instanceof Map && !activeRunsByPrefix.has(prefix)) {
activeRunsByPrefix.set(
prefix,
listActiveWorkflowRuns({
repo,
workflow,
runNamePrefix: prefix,
}),
);
}
const activeRuns =
activeRunsByPrefix instanceof Map
? activeRunsByPrefix.get(prefix)
: listActiveWorkflowRuns({
repo,
workflow,
runNamePrefix: prefix,
});
return (
activeRuns?.find((run: JsonValue) => String(run.displayTitle ?? "") === expectedTitle) ?? null
);
}
function runMatchesNameFilter(
run: LooseRecord,
runNamePrefix: JsonValue,

View File

@ -1,7 +1,12 @@
import assert from "node:assert/strict";
import test from "node:test";
import { MAX_LIVE_WORKERS, readMaxLiveWorkers } from "../../dist/repair/live-worker-capacity.js";
import {
MAX_LIVE_WORKERS,
readMaxLiveWorkers,
repairRunNameForJob,
repairRunNamePrefixForJob,
} from "../../dist/repair/live-worker-capacity.js";
test("live worker capacity refuses limits above the global Codex cap", () => {
assert.equal(MAX_LIVE_WORKERS, 100);
@ -22,3 +27,15 @@ test("live worker capacity accepts env default within the global Codex cap", ()
else process.env.CLAWSWEEPER_MAX_LIVE_WORKERS = previous;
}
});
test("repair run names match workflow dispatch titles", () => {
assert.equal(
repairRunNameForJob("jobs/openclaw/inbox/automerge-openclaw-openclaw-75363.md"),
"automerge repair jobs/openclaw/inbox/automerge-openclaw-openclaw-75363.md",
);
assert.equal(repairRunNamePrefixForJob("jobs/openclaw/inbox/cluster-abc.md"), "repair cluster ");
assert.equal(
repairRunNameForJob("jobs/openclaw/inbox/automerge-openclaw-openclaw-75363.md", "auto "),
"auto jobs/openclaw/inbox/automerge-openclaw-openclaw-75363.md",
);
});