projectclownfish/scripts/requeue-job.mjs
2026-04-26 04:26:41 -07:00

244 lines
7.6 KiB
JavaScript

#!/usr/bin/env node
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { execFileSync, spawnSync } from "node:child_process";
import { parseArgs, parseJob, repoRoot, validateJob } from "./lib.mjs";
const DEFAULT_REPO = "openclaw/projectclownfish";
const DEFAULT_WORKFLOW = "cluster-worker.yml";
const DEFAULT_RUNNER = process.env.CLOWNFISH_WORKER_RUNNER ?? "blacksmith-4vcpu-ubuntu-2404";
const DEFAULT_EXECUTION_RUNNER = process.env.CLOWNFISH_EXECUTION_RUNNER ?? "blacksmith-16vcpu-ubuntu-2404";
const QUEUED_STATUSES = new Set(["queued", "requested", "waiting", "pending"]);
const args = parseArgs(process.argv.slice(2));
const repo = String(args.repo ?? DEFAULT_REPO);
const workflow = String(args.workflow ?? DEFAULT_WORKFLOW);
const runner = String(args.runner ?? DEFAULT_RUNNER);
const executionRunner = String(args["execution-runner"] ?? args.execution_runner ?? DEFAULT_EXECUTION_RUNNER);
const model = String(args.model ?? process.env.CLOWNFISH_MODEL ?? "gpt-5.5");
const execute = Boolean(args.execute || args.live);
const openExecuteWindow = Boolean(args["open-execute-window"] || args.live);
const requestedMode = typeof args.mode === "string" ? args.mode : null;
const requestedRunId = args["run-id"] ?? (looksLikeRunId(args._[0]) ? args._[0] : null);
const resolved = requestedRunId
? resolveFromRunId(String(requestedRunId))
: { source_job: args._[0], mode: requestedMode };
if (!resolved.source_job) {
console.error(
"usage: node scripts/requeue-job.mjs <job.md|run-id> [--mode plan|execute|autonomous] [--execute] [--open-execute-window] [--runner label] [--execution-runner label] [--model model]",
);
process.exit(2);
}
const job = parseJob(resolved.source_job);
const errors = validateJob(job);
if (errors.length > 0) {
console.error(`invalid job: ${job.relativePath}`);
for (const error of errors) console.error(`- ${error}`);
process.exit(1);
}
const mode = requestedMode ?? resolved.mode ?? job.frontmatter.mode;
if (!["plan", "execute", "autonomous"].includes(mode)) {
throw new Error(`unsupported mode: ${mode}`);
}
const summary = {
status: execute ? "dispatching" : "dry_run",
repo,
workflow,
source_run_id: requestedRunId,
source_job: job.relativePath,
mode,
runner,
execution_runner: executionRunner,
model,
};
if (!execute) {
console.log(JSON.stringify(summary, null, 2));
process.exit(0);
}
const gateRestores = [];
const headSha = currentHeadSha();
const dispatchStartedAt = new Date(Date.now() - 5000).toISOString();
try {
if (openExecuteWindow && ["execute", "autonomous"].includes(mode)) {
openGate("CLOWNFISH_ALLOW_EXECUTE");
if (job.frontmatter.allow_fix_pr === true || job.frontmatter.allowed_actions.includes("fix")) {
openGate("CLOWNFISH_ALLOW_FIX_PR");
}
}
assertGateOpenIfNeeded(mode);
dispatchJob(job.relativePath, mode);
const observedRuns = waitForStartedRuns({ headSha, since: dispatchStartedAt, expectedCount: 1 });
summary.status = "dispatched";
summary.observed_runs = observedRuns.map((run) => ({
run_id: String(run.databaseId),
status: run.status,
conclusion: run.conclusion ?? null,
created_at: run.createdAt,
url: run.url,
}));
console.log(JSON.stringify(summary, null, 2));
} finally {
for (const gate of gateRestores.reverse()) {
setGate(gate.name, gate.previous || "1");
}
}
function resolveFromRunId(runId) {
const fromLedger = readPublishedRunRecord(runId);
if (fromLedger?.source_job) {
return { source_job: fromLedger.source_job, mode: fromLedger.mode };
}
const artifactDir = fs.mkdtempSync(path.join(os.tmpdir(), `projectclownfish-requeue-${runId}-`));
const downloaded = spawnSync(
"gh",
["run", "download", runId, "--repo", repo, "--dir", artifactDir],
{ cwd: repoRoot(), encoding: "utf8", stdio: "pipe" },
);
if (downloaded.status !== 0) {
throw new Error(`could not resolve run ${runId}: ${downloaded.stderr || downloaded.stdout}`);
}
const planPath = findFirstFile(artifactDir, "cluster-plan.json");
const resultPath = findFirstFile(artifactDir, "result.json");
if (!planPath) throw new Error(`run ${runId} artifact did not include cluster-plan.json`);
const plan = JSON.parse(fs.readFileSync(planPath, "utf8"));
const result = resultPath ? JSON.parse(fs.readFileSync(resultPath, "utf8")) : null;
return { source_job: plan.source_job, mode: result?.mode ?? plan.mode };
}
function readPublishedRunRecord(runId) {
const file = path.join(repoRoot(), "results", "runs", `${runId}.json`);
if (!fs.existsSync(file)) return null;
return JSON.parse(fs.readFileSync(file, "utf8"));
}
function findFirstFile(root, basename) {
for (const entry of fs.readdirSync(root, { recursive: true })) {
const candidate = path.join(root, String(entry));
if (path.basename(candidate) === basename && fs.statSync(candidate).isFile()) return candidate;
}
return null;
}
function dispatchJob(jobPath, mode) {
const result = spawnSync(
"gh",
[
"workflow",
"run",
workflow,
"--repo",
repo,
"-f",
`job=${jobPath}`,
"-f",
`mode=${mode}`,
"-f",
`runner=${runner}`,
"-f",
`execution_runner=${executionRunner}`,
"-f",
`model=${model}`,
],
{ cwd: repoRoot(), encoding: "utf8", stdio: "pipe" },
);
if (result.status !== 0) {
throw new Error(`failed to dispatch ${jobPath}: ${result.stderr || result.stdout}`);
}
}
function waitForStartedRuns({ expectedCount, headSha, since }) {
const deadline = Date.now() + 5 * 60 * 1000;
let latest = [];
while (Date.now() < deadline) {
latest = listClusterRuns()
.filter((run) => run.headSha === headSha)
.filter((run) => Date.parse(run.createdAt) >= Date.parse(since))
.sort((left, right) => Date.parse(left.createdAt) - Date.parse(right.createdAt));
if (latest.length >= expectedCount && latest.every((run) => !QUEUED_STATUSES.has(run.status))) {
return latest.slice(-expectedCount);
}
sleepMs(5_000);
}
return latest.slice(-expectedCount);
}
function assertGateOpenIfNeeded(mode) {
if (!["execute", "autonomous"].includes(mode)) return;
if (readGate("CLOWNFISH_ALLOW_EXECUTE") !== "1") {
throw new Error("refusing write-mode requeue: CLOWNFISH_ALLOW_EXECUTE is not 1; use --open-execute-window");
}
}
function listClusterRuns() {
return ghJson([
"run",
"list",
"--repo",
repo,
"--workflow",
workflow,
"--limit",
"50",
"--json",
"databaseId,headSha,status,conclusion,createdAt,url",
]);
}
function readGate(name) {
const variables = ghJson(["variable", "list", "--repo", repo, "--json", "name,value"]);
return variables.find((variable) => variable.name === name)?.value ?? "";
}
function openGate(name) {
const previous = readGate(name);
gateRestores.push({ name, previous });
if (previous !== "1") setGate(name, "1");
}
function setGate(name, value) {
execFileSync("gh", ["variable", "set", name, "--repo", repo, "--body", value], {
cwd: repoRoot(),
encoding: "utf8",
stdio: ["ignore", "pipe", "pipe"],
});
console.log(`${name}=${value}`);
}
function currentHeadSha() {
return execFileSync("git", ["rev-parse", "origin/main"], {
cwd: repoRoot(),
encoding: "utf8",
stdio: ["ignore", "pipe", "pipe"],
}).trim();
}
function ghJson(ghArgs) {
const text = execFileSync("gh", ghArgs, {
cwd: repoRoot(),
encoding: "utf8",
stdio: ["ignore", "pipe", "pipe"],
maxBuffer: 64 * 1024 * 1024,
});
return JSON.parse(text || "null");
}
function looksLikeRunId(value) {
return /^[0-9]{6,}$/.test(String(value ?? ""));
}
function sleepMs(milliseconds) {
Atomics.wait(new Int32Array(new SharedArrayBuffer(4)), 0, 0, milliseconds);
}