feat(flows): add decision() helper for constrained LLM branching (#278)

Wraps the existing acp + parse + switch pattern into a typed helper that
scaffolds the JSON-with-reason prompt and validates the chosen value
against the supplied choices tuple. decisionEdge() builds the matching
switch edge with exhaustively-typed cases. Returns a plain AcpNodeDefinition
so no schema, snapshot, or replay-viewer changes are required.

Rewrites examples/flows/branch.flow.ts to use the helper.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joshua Lelon Mitchell 2026-05-05 14:28:05 -05:00 committed by GitHub
parent fd67b109b3
commit 2d1e30d00c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 326 additions and 20 deletions

View File

@ -8,6 +8,8 @@ Repo: https://github.com/openclaw/acpx
### Changes
- Flows/authoring: add `decision()` and `decisionEdge()` helpers for constrained LLM branching on top of the existing `acp`, `parse`, and `switch` machinery. (#278) Thanks @JoshuaLelon.
### Breaking
### Fixes

View File

@ -225,6 +225,7 @@ runtime and persists run state under `~/.acpx/flows/runs/`.
Flows are for multi-step ACP work where one prompt is not enough:
- `acp` steps keep model-shaped work in ACP
- `decision()` and `decisionEdge()` wrap constrained-choice ACP branching without adding a new node type
- `action` steps handle deterministic mechanics like shell commands or GitHub calls
- `compute` steps do local routing or shaping
- `checkpoint` steps pause for something outside the runtime

View File

@ -10,7 +10,7 @@ They intentionally use the public authoring surface:
- export a flow via `defineFlow(...)`
- `echo.flow.ts`: one ACP step that returns a JSON reply
- `branch.flow.ts`: ACP classification followed by a deterministic branch into either `continue` or `checkpoint`
- `branch.flow.ts`: constrained-choice classification using `decision()` and `decisionEdge()`, followed by a deterministic branch into either `continue` or `checkpoint`
- `pr-triage/pr-triage.flow.ts`: a larger single-PR workflow example with a colocated written spec in `pr-triage/README.md`
- `replay-viewer/`: a browser app that visualizes saved flow run bundles with React Flow, a recent-runs picker, ACP session inspection, and a dedicated viewer spec in `docs/2026-03-27-flow-replay-viewer.md`
- `shell.flow.ts`: one native runtime-owned shell action that returns structured JSON

View File

@ -1,32 +1,29 @@
import { acp, checkpoint, defineFlow, extractJsonObject } from "acpx/flows";
import { acp, checkpoint, decision, decisionEdge, defineFlow, extractJsonObject } from "acpx/flows";
type BranchInput = {
task?: string;
};
const classifyChoices = ["continue", "checkpoint"] as const;
export default defineFlow({
name: "example-branch",
startAt: "classify",
nodes: {
classify: acp({
async prompt({ input }) {
classify: decision({
choices: classifyChoices,
question: ({ input }) => {
const task =
(input as BranchInput).task ??
"Investigate a flaky test and decide whether the request is clear enough to continue.";
return [
"Read the task below.",
"If it is concrete and scoped, route `continue`.",
"If it is ambiguous or needs clarification, route `checkpoint`.",
"Return exactly one JSON object with this shape:",
"{",
' "route": "continue" | "checkpoint",',
' "reason": "short explanation"',
"}",
"Pick `continue` if it is concrete and scoped.",
"Pick `checkpoint` if it is ambiguous or needs clarification.",
"",
`Task: ${task}`,
].join("\n");
},
parse: (text) => extractJsonObject(text),
}),
continue_lane: acp({
async prompt({ outputs }) {
@ -52,15 +49,13 @@ export default defineFlow({
}),
},
edges: [
{
decisionEdge({
from: "classify",
switch: {
on: "$.route",
cases: {
continue: "continue_lane",
checkpoint: "checkpoint_lane",
},
choices: classifyChoices,
cases: {
continue: "continue_lane",
checkpoint: "checkpoint_lane",
},
},
}),
],
});

View File

@ -31,6 +31,7 @@ Core capabilities:
- Stable ACP `authenticate` handshake via env/config credentials
- Structured streaming output (`text`, `json`, `quiet`) with optional `--suppress-reads`
- Built-in agent registry plus raw `--agent` escape hatch
- Experimental `flow run` support with `acpx/flows` helpers, including constrained-choice `decision()` branching
## Install

View File

@ -1,5 +1,7 @@
export { FlowRunner } from "./flows/runtime.js";
export { acp, action, checkpoint, compute, defineFlow, shell } from "./flows/definition.js";
export { decision, decisionEdge } from "./flows/decision.js";
export type { DecisionDefinition } from "./flows/decision.js";
export type {
AcpNodeDefinition,
ActionNodeDefinition,

114
src/flows/decision.ts Normal file
View File

@ -0,0 +1,114 @@
import { acp } from "./definition.js";
import { extractJsonObject } from "./json.js";
import type { AcpNodeDefinition, FlowEdge, FlowNodeContext } from "./types.js";
const DEFAULT_FIELD = "route";
const SIMPLE_FIELD_PATTERN = /^[A-Za-z_][A-Za-z0-9_]*$/;
// All `acp` node fields except the ones the decision helper owns.
type DecisionAcpOptions = Omit<AcpNodeDefinition, "nodeType" | "prompt" | "parse">;
export type DecisionDefinition<TChoice extends string> = DecisionAcpOptions & {
question: string | ((context: FlowNodeContext) => string | Promise<string>);
choices: readonly TChoice[];
field?: string;
};
// Build an `acp` node that asks the model to pick one of `choices` and reply
// with a JSON object whose chosen field is validated. Pair with `decisionEdge`
// (or any `switch` edge keyed on `$.<field>`) to route on the result.
export function decision<TChoice extends string>(
definition: DecisionDefinition<TChoice>,
): AcpNodeDefinition {
const { question, choices, field: fieldOverride, ...acpOptions } = definition;
const field = normalizeField(fieldOverride);
assertValidChoices(choices);
const allowed = new Set<string>(choices);
return acp({
...acpOptions,
async prompt(context) {
const text = typeof question === "function" ? await question(context) : question;
return formatDecisionPrompt(text, choices, field);
},
parse(text) {
const raw = extractJsonObject(text);
if (raw === null || typeof raw !== "object" || Array.isArray(raw)) {
throw new Error(`Decision response must be a JSON object, got ${typeof raw}`);
}
const value = (raw as Record<string, unknown>)[field];
if (typeof value !== "string" || !allowed.has(value)) {
const allowedLabels = choices.map((choice) => JSON.stringify(choice)).join(", ");
throw new Error(
`Decision returned invalid ${field}=${JSON.stringify(value)}; expected one of ${allowedLabels}`,
);
}
return raw;
},
});
}
// Build the matching `switch` edge for a `decision` node. Typing `cases` as
// `Record<TChoice, string>` makes a missing case a compile error.
export function decisionEdge<TChoice extends string>(args: {
from: string;
choices: readonly TChoice[];
field?: string;
cases: Record<TChoice, string>;
}): FlowEdge {
const field = normalizeField(args.field);
assertValidChoices(args.choices);
for (const choice of args.choices) {
if (!Object.hasOwn(args.cases, choice)) {
throw new Error(`Decision edge is missing case for choice ${JSON.stringify(choice)}`);
}
}
return {
from: args.from,
switch: {
on: `$.${field}`,
cases: args.cases as Record<string, string>,
},
};
}
function assertValidChoices(choices: readonly string[]): void {
if (choices.length === 0) {
throw new Error("Decision choices must include at least one value");
}
const seen = new Set<string>();
for (const choice of choices) {
if (typeof choice !== "string" || choice.length === 0) {
throw new Error("Decision choices must be non-empty strings");
}
if (seen.has(choice)) {
throw new Error(`Decision choices must be unique; duplicate ${JSON.stringify(choice)}`);
}
seen.add(choice);
}
}
function normalizeField(fieldOverride: string | undefined): string {
const field = fieldOverride ?? DEFAULT_FIELD;
if (!SIMPLE_FIELD_PATTERN.test(field)) {
throw new Error(
`Decision field must be a simple JSON key matching ${SIMPLE_FIELD_PATTERN.source}`,
);
}
return field;
}
function formatDecisionPrompt(question: string, choices: readonly string[], field: string): string {
const allowed = choices.map((choice) => JSON.stringify(choice)).join(" | ");
return [
question,
"",
"Return exactly one JSON object with this shape:",
"{",
` ${JSON.stringify(field)}: ${allowed},`,
' "reason": "short justification"',
"}",
"",
"Do not include any other text outside the JSON object.",
].join("\n");
}

View File

@ -5,6 +5,7 @@ import path from "node:path";
import test from "node:test";
import { fileURLToPath } from "node:url";
import { TimeoutError } from "../src/async-control.js";
import { decision, decisionEdge } from "../src/flows/decision.js";
import { validateFlowDefinition } from "../src/flows/graph.js";
import { extractJsonObject, parseJsonObject, parseStrictJsonObject } from "../src/flows/json.js";
import { createRunId } from "../src/flows/runtime-support.js";
@ -94,6 +95,196 @@ test("createRunId slugifies flow names without regex backtracking", () => {
);
});
test("decision builds an acp node that scaffolds the prompt and validates the choice", async () => {
const node = decision({
choices: ["continue", "checkpoint"] as const,
question: "Decide.",
});
assert.equal(node.nodeType, "acp");
assert.equal(typeof node.prompt, "function");
assert.equal(typeof node.parse, "function");
const promptText = (await node.prompt({
input: undefined,
outputs: {},
results: {},
state: {} as never,
services: {},
})) as string;
assert.match(promptText, /Decide\./);
assert.match(promptText, /Return exactly one JSON object with this shape:/);
assert.match(promptText, /"route": "continue" \| "checkpoint"/);
assert.match(promptText, /"reason": "short justification"/);
const valid = (await node.parse?.('{"route":"continue","reason":"clear"}', {} as never)) as
| Record<string, unknown>
| undefined;
assert.deepEqual(valid, { route: "continue", reason: "clear" });
await assert.rejects(
async () => node.parse?.('{"route":"sideways"}', {} as never),
/Decision returned invalid route="sideways"; expected one of "continue", "checkpoint"/,
);
await assert.rejects(
async () => node.parse?.('{"reason":"none"}', {} as never),
/Decision returned invalid route=undefined/,
);
await assert.rejects(
async () => node.parse?.("[1,2,3]", {} as never),
/Decision response must be a JSON object/,
);
});
test("decision honors a custom field name and forwards acp options", async () => {
const node = decision({
field: "verdict",
choices: ["yes", "no"] as const,
question: () => Promise.resolve("Approve?"),
profile: "claude",
timeoutMs: 1234,
});
assert.equal(node.profile, "claude");
assert.equal(node.timeoutMs, 1234);
const promptText = (await node.prompt({
input: undefined,
outputs: {},
results: {},
state: {} as never,
services: {},
})) as string;
assert.match(promptText, /"verdict": "yes" \| "no"/);
const valid = (await node.parse?.('{"verdict":"yes"}', {} as never)) as
| Record<string, unknown>
| undefined;
assert.deepEqual(valid, { verdict: "yes" });
});
test("decisionEdge produces a switch edge keyed on the chosen field", () => {
const edge = decisionEdge({
from: "classify",
choices: ["continue", "checkpoint"] as const,
cases: { continue: "continue_lane", checkpoint: "checkpoint_lane" },
});
assert.deepEqual(edge, {
from: "classify",
switch: {
on: "$.route",
cases: { continue: "continue_lane", checkpoint: "checkpoint_lane" },
},
});
const customField = decisionEdge({
from: "approve",
field: "verdict",
choices: ["yes", "no"] as const,
cases: { yes: "ship", no: "rollback" },
});
assert.equal("switch" in customField && customField.switch.on, "$.verdict");
});
test("decision validates choices, field names, and edge cases", () => {
assert.throws(
() =>
decision({
choices: [],
question: "Choose.",
}),
/Decision choices must include at least one value/,
);
assert.throws(
() =>
decision({
choices: ["yes", "yes"],
question: "Choose.",
}),
/Decision choices must be unique/,
);
assert.throws(
() =>
decision({
field: "bad.path",
choices: ["yes"],
question: "Choose.",
}),
/Decision field must be a simple JSON key/,
);
assert.throws(
() =>
decisionEdge({
from: "classify",
choices: ["yes", "no"] as const,
cases: { yes: "ship" } as Record<"yes" | "no", string>,
}),
/Decision edge is missing case for choice "no"/,
);
});
test("FlowRunner routes through decision helpers", async () => {
await withTempHome(async () => {
const cwd = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-flow-decision-cwd-"));
try {
const runner = new FlowRunner({
resolveAgent: () => ({
agentName: "mock",
agentCommand: MOCK_AGENT_COMMAND,
cwd,
}),
permissionMode: "approve-all",
ttlMs: 1_000,
});
const choices = ["continue", "checkpoint"] as const;
const flow = defineFlow({
name: "decision-branch-test",
startAt: "classify",
nodes: {
classify: decision({
session: {
isolated: true,
},
choices,
question: ({ input }) => {
const route = (input as { route: string }).route;
return `echo ${JSON.stringify({ route, reason: "mocked" })}`;
},
}),
continue_lane: action({
run: () => ({ ok: true }),
}),
checkpoint_lane: checkpoint({
run: () => ({ ok: false }),
}),
},
edges: [
decisionEdge({
from: "classify",
choices,
cases: {
continue: "continue_lane",
checkpoint: "checkpoint_lane",
},
}),
],
});
const result = await runner.run(flow, { route: "continue" });
assert.equal(result.state.status, "completed");
assert.deepEqual(result.state.outputs.classify, { route: "continue", reason: "mocked" });
assert.deepEqual(result.state.outputs.continue_lane, { ok: true });
assert.equal(result.state.outputs.checkpoint_lane, undefined);
} finally {
await fs.rm(cwd, { recursive: true, force: true });
}
});
});
test("flow node helpers validate node-local shape before runtime", () => {
const extensibleNode = compute({
run: () => ({ ok: true }),