Adds a separate merge gate, human-review labels for merge-ready targets, wave dispatching, and promotes the stuck job backlog into inbox.
133 lines
4.0 KiB
JavaScript
Executable File
133 lines
4.0 KiB
JavaScript
Executable File
#!/usr/bin/env node
|
|
import fs from "node:fs";
|
|
import path from "node:path";
|
|
import { spawnSync } from "node:child_process";
|
|
import {
|
|
assertLiveWorkerCapacity,
|
|
currentProjectRepo,
|
|
liveWorkerCapacity,
|
|
parseArgs,
|
|
parseJob,
|
|
readMaxLiveWorkers,
|
|
repoRoot,
|
|
validateJob,
|
|
waitForLiveWorkerCapacity,
|
|
} from "./lib.mjs";
|
|
|
|
const args = parseArgs(process.argv.slice(2));
|
|
const defaultRunner = process.env.CLOWNFISH_WORKER_RUNNER ?? "blacksmith-4vcpu-ubuntu-2404";
|
|
const defaultExecutionRunner = process.env.CLOWNFISH_EXECUTION_RUNNER ?? "blacksmith-16vcpu-ubuntu-2404";
|
|
const mode = args.mode ?? "plan";
|
|
const runner = args.runner ?? defaultRunner;
|
|
const executionRunner = args["execution-runner"] ?? args.execution_runner ?? defaultExecutionRunner;
|
|
const workflow = args.workflow ?? "cluster-worker.yml";
|
|
const repo = String(args.repo ?? currentProjectRepo());
|
|
const model = String(args.model ?? process.env.CLOWNFISH_MODEL ?? "gpt-5.5");
|
|
const maxLiveWorkers = readMaxLiveWorkers(args);
|
|
const waitForCapacity = Boolean(args["wait-for-capacity"]);
|
|
const ref = args.ref ? String(args.ref) : "";
|
|
const files = args._;
|
|
|
|
if (files.length === 0) {
|
|
console.error(
|
|
"usage: node scripts/dispatch-jobs.mjs <job.md> [...] [--mode plan|execute|autonomous] [--runner label] [--execution-runner label] [--model model] [--max-live-workers 50] [--wait-for-capacity]",
|
|
);
|
|
process.exit(2);
|
|
}
|
|
|
|
let failed = false;
|
|
const jobs = [];
|
|
for (const file of files) {
|
|
const job = parseJob(file);
|
|
const errors = validateJob(job);
|
|
if (errors.length > 0) {
|
|
failed = true;
|
|
console.error(`invalid job: ${file}`);
|
|
for (const error of errors) console.error(`- ${error}`);
|
|
continue;
|
|
}
|
|
|
|
const relative = job.relativePath;
|
|
if (!fs.existsSync(path.join(repoRoot(), relative))) {
|
|
failed = true;
|
|
console.error(`job does not exist inside repo: ${file}`);
|
|
continue;
|
|
}
|
|
jobs.push(relative);
|
|
}
|
|
|
|
if (!failed) {
|
|
const requested = waitForCapacity ? Math.min(jobs.length, 1) : jobs.length;
|
|
const capacityOptions = { repo, workflow, requested, maxLiveWorkers };
|
|
const capacity = waitForCapacity ? waitForLiveWorkerCapacity(capacityOptions) : assertLiveWorkerCapacity(capacityOptions);
|
|
console.log(
|
|
`live worker capacity: ${capacity.active}/${capacity.max_live_workers} active; dispatching ${jobs.length} ${workflow} run(s)`,
|
|
);
|
|
}
|
|
|
|
let dispatched = 0;
|
|
let index = 0;
|
|
while (!failed && index < jobs.length) {
|
|
let batchSize = jobs.length - index;
|
|
if (waitForCapacity) {
|
|
const capacity = waitForLiveWorkerCapacity({
|
|
repo,
|
|
workflow,
|
|
requested: 1,
|
|
maxLiveWorkers,
|
|
});
|
|
const refreshed = liveWorkerCapacity({ repo, workflow, requested: 1, maxLiveWorkers });
|
|
batchSize = Math.min(batchSize, Math.max(1, refreshed.available || capacity.available || 1));
|
|
console.log(
|
|
`live worker capacity: ${refreshed.active}/${refreshed.max_live_workers} active; dispatching next ${batchSize} run(s)`,
|
|
);
|
|
}
|
|
|
|
for (const relative of jobs.slice(index, index + batchSize)) {
|
|
if (failed) break;
|
|
dispatched += 1;
|
|
dispatchJob(relative, dispatched, jobs.length);
|
|
}
|
|
index += batchSize;
|
|
if (waitForCapacity && !failed && index < jobs.length) {
|
|
sleepMs(15_000);
|
|
}
|
|
}
|
|
|
|
function dispatchJob(relative, position, total) {
|
|
const result = spawnSync(
|
|
"gh",
|
|
[
|
|
"workflow",
|
|
"run",
|
|
workflow,
|
|
"--repo",
|
|
repo,
|
|
...(ref ? ["--ref", ref] : []),
|
|
"-f",
|
|
`job=${relative}`,
|
|
"-f",
|
|
`mode=${mode}`,
|
|
"-f",
|
|
`runner=${runner}`,
|
|
"-f",
|
|
`execution_runner=${executionRunner}`,
|
|
"-f",
|
|
`model=${model}`,
|
|
],
|
|
{ cwd: repoRoot(), encoding: "utf8", stdio: "pipe" },
|
|
);
|
|
if (result.status !== 0) {
|
|
failed = true;
|
|
console.error(result.stderr || result.stdout);
|
|
} else {
|
|
console.log(`dispatched ${position}/${total} ${relative} (${mode}) on ${runner}; execution on ${executionRunner}`);
|
|
}
|
|
}
|
|
|
|
function sleepMs(milliseconds) {
|
|
Atomics.wait(new Int32Array(new SharedArrayBuffer(4)), 0, 0, milliseconds);
|
|
}
|
|
|
|
if (failed) process.exit(1);
|