refactor: dedupe session and ACP helpers

This commit is contained in:
Peter Steinberger 2026-04-25 11:32:44 +01:00
parent 7ac8cf0bb3
commit 7f3c177e7c
No known key found for this signature in database
8 changed files with 162 additions and 263 deletions

View File

@ -152,54 +152,12 @@ function compareVersionParts(left: readonly number[], right: readonly number[]):
}
async function detectGeminiVersion(command: string): Promise<GeminiVersion | undefined> {
return await new Promise<GeminiVersion | undefined>((resolve) => {
const child = spawn(
command,
["--version"],
buildSpawnCommandOptions(command, {
stdio: ["ignore", "pipe", "pipe"],
windowsHide: true,
}),
);
let stdout = "";
let stderr = "";
let settled = false;
const finish = (value: GeminiVersion | undefined) => {
if (settled) {
return;
}
settled = true;
clearTimeout(timer);
child.removeAllListeners();
child.stdout?.removeAllListeners();
child.stderr?.removeAllListeners();
resolve(value);
};
const timer = setTimeout(() => {
child.kill("SIGKILL");
finish(undefined);
}, GEMINI_VERSION_TIMEOUT_MS);
child.stdout?.setEncoding("utf8");
child.stderr?.setEncoding("utf8");
child.stdout?.on("data", (chunk: string) => {
stdout += chunk;
});
child.stderr?.on("data", (chunk: string) => {
stderr += chunk;
});
child.once("error", () => {
finish(undefined);
});
child.once("close", () => {
const versionLine = `${stdout}\n${stderr}`
.split(/\r?\n/)
.map((line) => line.trim())
.find((line) => /\d+\.\d+\.\d+/.test(line));
finish(parseGeminiVersion(versionLine));
});
});
const output = await readCommandOutput(command, ["--version"], GEMINI_VERSION_TIMEOUT_MS);
const versionLine = output
?.split(/\r?\n/)
.map((line) => line.trim())
.find((line) => /\d+\.\d+\.\d+/.test(line));
return parseGeminiVersion(versionLine);
}
export async function resolveGeminiCommandArgs(

View File

@ -9,7 +9,7 @@ function asRecord(value: unknown): Record<string, unknown> | undefined {
return value as Record<string, unknown>;
}
function toAcpErrorPayload(value: unknown): OutputErrorAcpPayload | undefined {
export function toAcpErrorPayload(value: unknown): OutputErrorAcpPayload | undefined {
const record = asRecord(value);
if (!record) {
return undefined;

View File

@ -30,6 +30,7 @@ import {
resolvePermissionMode,
resolveSessionNameFromFlags,
type ExecFlags,
type GlobalFlags,
type PromptFlags,
type SessionsHistoryFlags,
type SessionsNewFlags,
@ -156,6 +157,70 @@ function resolveRequestedOutputPolicy(globalFlags: {
};
}
type ResolvedAgentInvocation = ReturnType<typeof resolveAgentInvocation>;
function sessionOptionsFromGlobalFlags(
globalFlags: GlobalFlags,
): NonNullable<Parameters<SessionModule["createSession"]>[0]["sessionOptions"]> {
return {
model: globalFlags.model,
allowedTools: globalFlags.allowedTools,
maxTurns: globalFlags.maxTurns,
systemPrompt: globalFlags.systemPrompt,
};
}
function buildSessionStartOptions(params: {
agent: ResolvedAgentInvocation;
flags: SessionsNewFlags;
globalFlags: GlobalFlags;
config: ResolvedAcpxConfig;
permissionMode: ReturnType<typeof resolvePermissionMode>;
}): Parameters<SessionModule["createSession"]>[0] {
return {
agentCommand: params.agent.agentCommand,
cwd: params.agent.cwd,
name: params.flags.name,
resumeSessionId: params.flags.resumeSession,
mcpServers: params.config.mcpServers,
permissionMode: params.permissionMode,
nonInteractivePermissions: params.globalFlags.nonInteractivePermissions,
authCredentials: params.config.auth,
authPolicy: params.globalFlags.authPolicy,
terminal: params.globalFlags.terminal,
timeoutMs: params.globalFlags.timeout,
verbose: params.globalFlags.verbose,
sessionOptions: sessionOptionsFromGlobalFlags(params.globalFlags),
};
}
function missingScopedSessionMessage(
agent: ResolvedAgentInvocation,
sessionName: string | undefined,
): string {
return sessionName
? `No named session "${sessionName}" for cwd ${agent.cwd} and agent ${agent.agentName}`
: `No cwd session for ${agent.cwd} and agent ${agent.agentName}`;
}
async function findScopedSessionOrThrow(
agent: ResolvedAgentInvocation,
sessionName: string | undefined,
): Promise<SessionRecord> {
const record = await findSession({
agentCommand: agent.agentCommand,
cwd: agent.cwd,
name: sessionName,
includeClosed: true,
});
if (!record) {
throw new Error(missingScopedSessionMessage(agent, sessionName));
}
return record;
}
async function findRoutedSessionOrThrow(
agentCommand: string,
agentName: string,
@ -589,11 +654,7 @@ export async function handleSessionsClose(
});
if (!record) {
throw new Error(
sessionName
? `No named session "${sessionName}" for cwd ${agent.cwd} and agent ${agent.agentName}`
: `No cwd session for ${agent.cwd} and agent ${agent.agentName}`,
);
throw new Error(missingScopedSessionMessage(agent, sessionName));
}
const closed = await closeSession(record.acpxRecordId);
@ -625,26 +686,9 @@ export async function handleSessionsNew(
}
}
const created = await createSession({
agentCommand: agent.agentCommand,
cwd: agent.cwd,
name: flags.name,
resumeSessionId: flags.resumeSession,
mcpServers: config.mcpServers,
permissionMode,
nonInteractivePermissions: globalFlags.nonInteractivePermissions,
authCredentials: config.auth,
authPolicy: globalFlags.authPolicy,
terminal: globalFlags.terminal,
timeoutMs: globalFlags.timeout,
verbose: globalFlags.verbose,
sessionOptions: {
model: globalFlags.model,
allowedTools: globalFlags.allowedTools,
maxTurns: globalFlags.maxTurns,
systemPrompt: globalFlags.systemPrompt,
},
});
const created = await createSession(
buildSessionStartOptions({ agent, flags, globalFlags, config, permissionMode }),
);
printCreatedSessionBanner(created, agent.agentName, globalFlags.format, globalFlags.jsonStrict);
@ -667,26 +711,9 @@ export async function handleSessionsEnsure(
const agent = resolveAgentInvocation(explicitAgentName, globalFlags, config);
const [{ ensureSession }, { printCreatedSessionBanner, printEnsuredSessionByFormat }] =
await Promise.all([loadSessionModule(), loadOutputRenderModule()]);
const result = await ensureSession({
agentCommand: agent.agentCommand,
cwd: agent.cwd,
name: flags.name,
resumeSessionId: flags.resumeSession,
mcpServers: config.mcpServers,
permissionMode,
nonInteractivePermissions: globalFlags.nonInteractivePermissions,
authCredentials: config.auth,
authPolicy: globalFlags.authPolicy,
terminal: globalFlags.terminal,
timeoutMs: globalFlags.timeout,
verbose: globalFlags.verbose,
sessionOptions: {
model: globalFlags.model,
allowedTools: globalFlags.allowedTools,
maxTurns: globalFlags.maxTurns,
systemPrompt: globalFlags.systemPrompt,
},
});
const result = await ensureSession(
buildSessionStartOptions({ agent, flags, globalFlags, config, permissionMode }),
);
if (result.created) {
printCreatedSessionBanner(
@ -846,20 +873,7 @@ export async function handleSessionsShow(
): Promise<void> {
const globalFlags = resolveGlobalFlags(command, config);
const agent = resolveAgentInvocation(explicitAgentName, globalFlags, config);
const record = await findSession({
agentCommand: agent.agentCommand,
cwd: agent.cwd,
name: sessionName,
includeClosed: true,
});
if (!record) {
throw new Error(
sessionName
? `No named session "${sessionName}" for cwd ${agent.cwd} and agent ${agent.agentName}`
: `No cwd session for ${agent.cwd} and agent ${agent.agentName}`,
);
}
const record = await findScopedSessionOrThrow(agent, sessionName);
printSessionDetailsByFormat(record, globalFlags.format);
}
@ -873,20 +887,7 @@ export async function handleSessionsHistory(
): Promise<void> {
const globalFlags = resolveGlobalFlags(command, config);
const agent = resolveAgentInvocation(explicitAgentName, globalFlags, config);
const record = await findSession({
agentCommand: agent.agentCommand,
cwd: agent.cwd,
name: sessionName,
includeClosed: true,
});
if (!record) {
throw new Error(
sessionName
? `No named session "${sessionName}" for cwd ${agent.cwd} and agent ${agent.agentName}`
: `No cwd session for ${agent.cwd} and agent ${agent.agentName}`,
);
}
const record = await findScopedSessionOrThrow(agent, sessionName);
printSessionHistoryByFormat(record, flags.limit, globalFlags.format);
}

View File

@ -1,4 +1,5 @@
import type { SetSessionConfigOptionResponse } from "@agentclientprotocol/sdk";
import { toAcpErrorPayload } from "../../acp/error-shapes.js";
import { isAcpJsonRpcMessage } from "../../acp/jsonrpc.js";
import { isPromptInput, textPrompt } from "../../prompt-content.js";
import {
@ -171,25 +172,6 @@ function isOutputErrorOrigin(value: unknown): value is OutputErrorOrigin {
return typeof value === "string" && OUTPUT_ERROR_ORIGINS.includes(value as OutputErrorOrigin);
}
function parseAcpError(value: unknown): OutputErrorAcpPayload | undefined {
const record = asRecord(value);
if (!record) {
return undefined;
}
if (typeof record.code !== "number" || !Number.isFinite(record.code)) {
return undefined;
}
if (typeof record.message !== "string" || record.message.length === 0) {
return undefined;
}
return {
code: record.code,
message: record.message,
data: record.data,
};
}
function parseSessionOptions(value: unknown): QueueSessionOptions | null | undefined {
if (value == null) {
return undefined;
@ -537,7 +519,7 @@ export function parseQueueOwnerMessage(raw: unknown): QueueOwnerMessage | null {
? message.detailCode
: undefined;
const retryable = typeof message.retryable === "boolean" ? message.retryable : undefined;
const acp = parseAcpError(message.acp);
const acp = toAcpErrorPayload(message.acp);
const outputAlreadyEmitted =
typeof message.outputAlreadyEmitted === "boolean" ? message.outputAlreadyEmitted : undefined;

View File

@ -0,0 +1,25 @@
import type { AcpClient, SessionCreateResult } from "../../acp/client.js";
import { withTimeout } from "../../async-control.js";
export async function applyRequestedModelIfAdvertised(params: {
client: AcpClient;
sessionId: string;
requestedModel: string | undefined;
models: SessionCreateResult["models"];
timeoutMs?: number;
}): Promise<boolean> {
const requestedModel =
typeof params.requestedModel === "string" ? params.requestedModel.trim() : "";
if (!requestedModel || !params.models) {
return false;
}
if (params.models.currentModelId === requestedModel) {
return true;
}
await withTimeout(
params.client.setSessionModel(params.sessionId, requestedModel),
params.timeoutMs,
);
return true;
}

View File

@ -2,6 +2,8 @@ import { withTimeout } from "../../async-control.js";
import {
withConnectedSession,
type FullConnectedSessionController,
type WithConnectedSessionOptions,
type WithConnectedSessionResult,
} from "../../runtime/engine/connected-session.js";
import {
setCurrentModelId,
@ -65,10 +67,24 @@ export type RunSessionSetModelDirectOptions = {
onClientClosed?: () => void;
};
export async function runSessionSetModeDirect(
options: RunSessionSetModeDirectOptions,
): Promise<SessionSetModeResult> {
const result = await withConnectedSession({
type DirectConnectedSessionOptions = {
sessionRecordId: string;
mcpServers?: McpServer[];
nonInteractivePermissions?: NonInteractivePermissionPolicy;
authCredentials?: Record<string, string>;
authPolicy?: AuthPolicy;
terminal?: boolean;
timeoutMs?: number;
verbose?: boolean;
onClientAvailable?: (controller: ActiveSessionController) => void;
onClientClosed?: () => void;
};
function buildDirectConnectedSessionOptions<T>(
options: DirectConnectedSessionOptions,
run: WithConnectedSessionOptions<T>["run"],
): WithConnectedSessionOptions<T> {
return {
sessionRecordId: options.sessionRecordId,
loadRecord: resolveSessionRecord,
saveRecord: writeSessionRecord,
@ -83,12 +99,13 @@ export async function runSessionSetModeDirect(
options.onClientAvailable?.(controller);
},
onClientClosed: options.onClientClosed,
run: async ({ client, sessionId, record }) => {
await withTimeout(client.setSessionMode(sessionId, options.modeId), options.timeoutMs);
setDesiredModeId(record, options.modeId);
},
});
run,
};
}
function toSessionMutationResult(
result: Pick<WithConnectedSessionResult<unknown>, "record" | "resumed" | "loadError">,
): Pick<SessionSetModeResult, "record" | "resumed" | "loadError"> {
return {
record: result.record,
resumed: result.resumed,
@ -96,57 +113,38 @@ export async function runSessionSetModeDirect(
};
}
export async function runSessionSetModeDirect(
options: RunSessionSetModeDirectOptions,
): Promise<SessionSetModeResult> {
const result = await withConnectedSession(
buildDirectConnectedSessionOptions(options, async ({ client, sessionId, record }) => {
await withTimeout(client.setSessionMode(sessionId, options.modeId), options.timeoutMs);
setDesiredModeId(record, options.modeId);
}),
);
return toSessionMutationResult(result);
}
export async function runSessionSetModelDirect(
options: RunSessionSetModelDirectOptions,
): Promise<SessionSetModelResult> {
const result = await withConnectedSession({
sessionRecordId: options.sessionRecordId,
loadRecord: resolveSessionRecord,
saveRecord: writeSessionRecord,
mcpServers: options.mcpServers,
nonInteractivePermissions: options.nonInteractivePermissions,
authCredentials: options.authCredentials,
authPolicy: options.authPolicy,
terminal: options.terminal,
timeoutMs: options.timeoutMs,
verbose: options.verbose,
onClientAvailable: (controller: FullConnectedSessionController) => {
options.onClientAvailable?.(controller);
},
onClientClosed: options.onClientClosed,
run: async ({ client, sessionId, record }) => {
const result = await withConnectedSession(
buildDirectConnectedSessionOptions(options, async ({ client, sessionId, record }) => {
await withTimeout(client.setSessionModel(sessionId, options.modelId), options.timeoutMs);
setDesiredModelId(record, options.modelId);
setCurrentModelId(record, options.modelId);
},
});
}),
);
return {
record: result.record,
resumed: result.resumed,
loadError: result.loadError,
};
return toSessionMutationResult(result);
}
export async function runSessionSetConfigOptionDirect(
options: RunSessionSetConfigOptionDirectOptions,
): Promise<SessionSetConfigOptionResult> {
const result = await withConnectedSession({
sessionRecordId: options.sessionRecordId,
loadRecord: resolveSessionRecord,
saveRecord: writeSessionRecord,
mcpServers: options.mcpServers,
nonInteractivePermissions: options.nonInteractivePermissions,
authCredentials: options.authCredentials,
authPolicy: options.authPolicy,
terminal: options.terminal,
timeoutMs: options.timeoutMs,
verbose: options.verbose,
onClientAvailable: (controller: FullConnectedSessionController) => {
options.onClientAvailable?.(controller);
},
onClientClosed: options.onClientClosed,
run: async ({ client, sessionId, record }) => {
const result = await withConnectedSession(
buildDirectConnectedSessionOptions(options, async ({ client, sessionId, record }) => {
const response = await withTimeout(
client.setSessionConfigOption(sessionId, options.configId, options.value),
options.timeoutMs,
@ -157,8 +155,8 @@ export async function runSessionSetConfigOptionDirect(
setDesiredConfigOption(record, options.configId, options.value);
}
return response;
},
});
}),
);
return {
record: result.record,

View File

@ -37,52 +37,32 @@ import {
} from "../../session/persistence.js";
import type {
AcpJsonRpcMessage,
AcpMessageDirection,
AuthPolicy,
ClientOperation,
McpServer,
NonInteractivePermissionPolicy,
OutputErrorAcpPayload,
OutputErrorCode,
OutputErrorOrigin,
OutputFormatter,
PermissionMode,
PromptInput,
RunPromptResult,
SessionNotification,
SessionRecord,
SessionResumePolicy,
SessionSendResult,
} from "../../types.js";
import { type QueueOwnerMessage, type QueueTask, waitMs } from "../queue/ipc.js";
import { type QueueOwnerActiveSessionController } from "../queue/owner-turn-controller.js";
import type { RunOnceOptions, SessionSendOptions } from "./contracts.js";
import { applyRequestedModelIfAdvertised } from "./model-helpers.js";
const INTERRUPT_CANCEL_WAIT_MS = 2_500;
type RunSessionPromptOptions = {
type RunSessionPromptOptions = Omit<
SessionSendOptions,
"errorEmissionPolicy" | "maxQueueDepth" | "sessionId" | "ttlMs" | "waitForCompletion"
> & {
sessionRecordId: string;
prompt: PromptInput;
resumePolicy?: SessionResumePolicy;
mcpServers?: McpServer[];
permissionMode: PermissionMode;
nonInteractivePermissions?: NonInteractivePermissionPolicy;
authCredentials?: Record<string, string>;
authPolicy?: AuthPolicy;
terminal?: boolean;
outputFormatter: OutputFormatter;
onAcpMessage?: (direction: AcpMessageDirection, message: AcpJsonRpcMessage) => void;
onSessionUpdate?: (notification: SessionNotification) => void;
onClientOperation?: (operation: ClientOperation) => void;
timeoutMs?: number;
suppressSdkConsoleErrors?: boolean;
verbose?: boolean;
promptRetries?: number;
sessionOptions?: SessionAgentOptions;
onClientAvailable?: (controller: ActiveSessionController) => void;
onClientClosed?: () => void;
onPromptActive?: () => Promise<void> | void;
client?: AcpClient;
};
type ActiveSessionController = QueueOwnerActiveSessionController;
@ -149,29 +129,6 @@ function toPromptResult(
};
}
async function applyRequestedModelIfAdvertised(params: {
client: AcpClient;
sessionId: string;
requestedModel: string | undefined;
models: import("../../acp/client.js").SessionCreateResult["models"];
timeoutMs?: number;
}): Promise<boolean> {
const requestedModel =
typeof params.requestedModel === "string" ? params.requestedModel.trim() : "";
if (!requestedModel || !params.models) {
return false;
}
if (params.models.currentModelId === requestedModel) {
return true;
}
await withTimeout(
params.client.setSessionModel(params.sessionId, requestedModel),
params.timeoutMs,
);
return true;
}
async function applyPromptModelIfAdvertised(params: {
client: AcpClient;
sessionId: string;

View File

@ -21,6 +21,7 @@ import type {
SessionCreateWithClientResult,
SessionEnsureOptions,
} from "./contracts.js";
import { applyRequestedModelIfAdvertised } from "./model-helpers.js";
import { setSessionModel } from "./session-control.js";
function persistSessionOptions(
@ -70,29 +71,6 @@ function persistSessionOptions(
delete record.acpx.session_options;
}
async function applyRequestedModelIfAdvertised(params: {
client: AcpClient;
sessionId: string;
requestedModel: string | undefined;
models: SessionCreateResult["models"];
timeoutMs?: number;
}): Promise<boolean> {
const requestedModel =
typeof params.requestedModel === "string" ? params.requestedModel.trim() : "";
if (!requestedModel || !params.models) {
return false;
}
if (params.models.currentModelId === requestedModel) {
return true;
}
await withTimeout(
params.client.setSessionModel(params.sessionId, requestedModel),
params.timeoutMs,
);
return true;
}
async function createSessionRecordWithClient(
client: AcpClient,
options: SessionCreateOptions,