fix: harden resume security and runtime portability

This commit is contained in:
Vignesh Natarajan 2026-03-15 20:52:03 -07:00
parent e196a1ca51
commit 775c32b805
No known key found for this signature in database
GPG Key ID: C5E014CC92E2A144
18 changed files with 638 additions and 127 deletions

35
AGENTS.md Normal file
View File

@ -0,0 +1,35 @@
# AGENTS.md
Guidance for coding assistants operating in this repository.
## When To Use Lobster
- Prefer `lobster` for multi-step or repeatable workflows.
- Use direct shell commands for simple one-off tasks.
- Prefer deterministic pipelines/workflows over ad-hoc LLM re-planning loops.
## Invocation Contract
- Use tool mode for machine-readable output:
- `lobster run --mode tool '<pipeline>'`
- `lobster run --mode tool --file <workflow.lobster> --args-json '<json>'`
- If `lobster` is not on `PATH`, use:
- `node bin/lobster.js ...`
## Approval And Resume
- Treat `status: "needs_approval"` as a hard stop.
- Never auto-approve on behalf of a user.
- Resume only after explicit user decision:
- `lobster resume --token <resumeToken> --approve yes|no`
## Output Handling
- Parse the tool envelope JSON fields: `ok`, `status`, `output`, `requiresApproval`, `error`.
- On `ok: false`, surface the error and stop.
## Safety And Shell Usage
- For workflow-file commands, prefer environment variables (`LOBSTER_ARG_*`) for untrusted or quoted values.
- Avoid embedding unsafe user strings directly into shell command text.

View File

@ -4,6 +4,16 @@ import { runPipeline } from './runtime.js';
import { encodeToken } from './token.js';
import { decodeResumeToken, parseResumeArgs } from './resume.js';
import { runWorkflowFile } from './workflows/file.js';
import { randomUUID } from 'node:crypto';
import { deleteStateJson, readStateJson, writeStateJson } from './state/store.js';
type PipelineResumeState = {
pipeline: Array<{ name: string; args: Record<string, unknown>; raw: string }>;
resumeAtIndex: number;
items: unknown[];
prompt?: string;
createdAt: string;
};
export async function runCli(argv) {
const registry = createDefaultRegistry();
@ -161,13 +171,19 @@ async function handleRun({ argv, registry }) {
: null;
if (approval) {
const resumeToken = encodeToken({
protocolVersion: 1,
v: 1,
const stateKey = await savePipelineResumeState(process.env, {
pipeline,
resumeAtIndex: (output.haltedAt?.index ?? -1) + 1,
items: approval.items,
prompt: approval.prompt,
createdAt: new Date().toISOString(),
});
const resumeToken = encodeToken({
protocolVersion: 1,
v: 1,
kind: 'pipeline-resume',
stateKey,
});
writeToolEnvelope({
@ -296,10 +312,25 @@ async function resolveWorkflowFile(candidate) {
async function handleResume({ argv, registry }) {
const mode = 'tool';
const { token, approved } = parseResumeArgs(argv);
const payload = decodeResumeToken(token);
let approved: boolean;
let payload: any;
try {
const parsed = parseResumeArgs(argv);
approved = parsed.approved;
payload = decodeResumeToken(parsed.token);
} catch (err) {
writeToolEnvelope({ ok: false, error: { type: 'parse_error', message: err?.message ?? String(err) } });
process.exitCode = 2;
return;
}
if (!approved) {
if (payload.kind === 'workflow-file' && payload.stateKey) {
await deleteStateJson({ env: process.env, key: payload.stateKey });
}
if (payload.kind === 'pipeline-resume' && payload.stateKey) {
await deleteStateJson({ env: process.env, key: payload.stateKey });
}
writeToolEnvelope({ ok: true, status: 'cancelled', output: [], requiresApproval: null });
return;
}
@ -337,11 +368,17 @@ async function handleResume({ argv, registry }) {
return;
}
}
const remaining = payload.pipeline.slice(payload.resumeAtIndex);
const input = (async function* () {
for (const item of payload.items) yield item;
})();
const previousStateKey = payload.stateKey;
let resumeState: PipelineResumeState;
try {
resumeState = await loadPipelineResumeState(process.env, previousStateKey);
} catch (err) {
writeToolEnvelope({ ok: false, error: { type: 'runtime_error', message: err?.message ?? String(err) } });
process.exitCode = 1;
return;
}
const remaining = resumeState.pipeline.slice(resumeState.resumeAtIndex);
const input = streamFromItems(resumeState.items);
try {
const output = await runPipeline({
@ -360,13 +397,20 @@ async function handleResume({ argv, registry }) {
: null;
if (approval) {
const resumeToken = encodeToken({
protocolVersion: 1,
v: 1,
const nextStateKey = await savePipelineResumeState(process.env, {
pipeline: remaining,
resumeAtIndex: (output.haltedAt?.index ?? -1) + 1,
items: approval.items,
prompt: approval.prompt,
createdAt: new Date().toISOString(),
});
await deleteStateJson({ env: process.env, key: previousStateKey });
const resumeToken = encodeToken({
protocolVersion: 1,
v: 1,
kind: 'pipeline-resume',
stateKey: nextStateKey,
});
writeToolEnvelope({
@ -378,6 +422,7 @@ async function handleResume({ argv, registry }) {
return;
}
await deleteStateJson({ env: process.env, key: previousStateKey });
writeToolEnvelope({ ok: true, status: 'ok', output: output.items, requiresApproval: null });
} catch (err) {
writeToolEnvelope({ ok: false, error: { type: 'runtime_error', message: err?.message ?? String(err) } });
@ -385,6 +430,30 @@ async function handleResume({ argv, registry }) {
}
}
function streamFromItems(items: unknown[]) {
return (async function* () {
for (const item of items) yield item;
})();
}
async function savePipelineResumeState(env: Record<string, string | undefined>, state: PipelineResumeState) {
const stateKey = `pipeline_resume_${randomUUID()}`;
await writeStateJson({ env, key: stateKey, value: state });
return stateKey;
}
async function loadPipelineResumeState(env: Record<string, string | undefined>, stateKey: string) {
const stored = await readStateJson({ env, key: stateKey });
if (!stored || typeof stored !== 'object') {
throw new Error('Pipeline resume state not found');
}
const data = stored as Partial<PipelineResumeState>;
if (!Array.isArray(data.pipeline)) throw new Error('Invalid pipeline resume state');
if (typeof data.resumeAtIndex !== 'number') throw new Error('Invalid pipeline resume state');
if (!Array.isArray(data.items)) throw new Error('Invalid pipeline resume state');
return data as PipelineResumeState;
}
async function readVersion() {
const { readFile } = await import('node:fs/promises');
const { fileURLToPath } = await import('node:url');

View File

@ -1,3 +1,5 @@
import { readLineFromStream } from '../../read_line.js';
function isInteractive(stdin) {
return Boolean(stdin.isTTY);
}
@ -49,7 +51,9 @@ export const approveCommand = {
}
ctx.stdout.write(`${prompt} [y/N] `);
const answer = await readLine(ctx.stdin);
const answer = await readLineFromStream(ctx.stdin, {
timeoutMs: parseApprovalTimeoutMs(ctx.env),
});
if (!/^y(es)?$/i.test(String(answer).trim())) {
throw new Error('Not approved');
@ -67,21 +71,11 @@ function buildPreview(items) {
return JSON.stringify(items, null, 2);
}
function readLine(stdin) {
return new Promise((resolve) => {
let buf = '';
const onData = (chunk) => {
buf += chunk.toString('utf8');
const idx = buf.indexOf('\n');
if (idx !== -1) {
stdin.off('data', onData);
resolve(buf.slice(0, idx));
}
};
stdin.on('data', onData);
});
function parseApprovalTimeoutMs(env) {
const raw = env?.LOBSTER_APPROVAL_INPUT_TIMEOUT_MS;
const value = Number(raw);
if (!Number.isFinite(value) || value <= 0) return 0;
return Math.floor(value);
}
async function* asStream(items) {

View File

@ -182,10 +182,10 @@ export const emailTriageCommand = {
type: 'object',
properties: {
limit: { type: 'number', description: 'Maximum items to consume from input stream', default: 20 },
llm: { type: 'boolean', description: 'Use llm_task.invoke for categorization + draft replies (requires LLM_TASK_URL)' },
llm: { type: 'boolean', description: 'Use llm_task.invoke for categorization + draft replies (requires OPENCLAW_URL or CLAWD_URL)' },
model: { type: 'string', description: 'Model for llm_task.invoke (required when --llm true)' },
url: { type: 'string', description: 'llm-task base URL (or LLM_TASK_URL)' },
token: { type: 'string', description: 'Bearer token (or LLM_TASK_TOKEN)' },
url: { type: 'string', description: 'Reserved for compatibility (ignored in OpenClaw mode)' },
token: { type: 'string', description: 'Bearer token (or OPENCLAW_TOKEN/CLAWD_TOKEN)' },
temperature: { type: 'number', description: 'LLM temperature' },
'max-output-tokens': { type: 'number', description: 'Max completion tokens' },
emit: { type: 'string', description: "Output mode: 'report' (default) or 'drafts'", default: 'report' },
@ -222,9 +222,9 @@ export const emailTriageCommand = {
const wantLlm = Boolean(args.llm ?? false);
const env = ctx?.env ?? process.env;
const hasClawdUrl = Boolean(String(env.CLAWD_URL ?? '').trim());
const hasGatewayUrl = Boolean(String(env.OPENCLAW_URL ?? env.CLAWD_URL ?? '').trim());
if (!wantLlm || !hasClawdUrl) {
if (!wantLlm || !hasGatewayUrl) {
const report = buildDeterministicReport(emails);
if (emit === 'drafts') {
return { output: streamOf([]) };

View File

@ -1,4 +1,5 @@
import { spawn } from 'node:child_process';
import { resolveInlineShellCommand } from '../../shell.js';
export const execCommand = {
name: 'exec',
@ -8,7 +9,7 @@ export const execCommand = {
type: 'object',
properties: {
json: { type: 'boolean', description: 'Parse stdout as JSON (single value).' },
shell: { type: 'string', description: 'Run via /bin/sh -lc with this command line.' },
shell: { type: 'string', description: 'Run via the system shell with this command line.' },
_: { type: 'array', items: { type: 'string' }, description: 'Command + args.' },
},
required: ['_'],
@ -25,7 +26,7 @@ export const execCommand = {
`Notes:\n` +
` - With --json, parses stdout as JSON (single value).\n` +
` - With --stdin, writes pipeline input to stdin.\n` +
` - With --shell (or a single arg containing spaces), runs via /bin/sh -lc.\n`;
` - With --shell (or a single arg containing spaces), runs via the system shell.\n`;
},
async run({ input, args, ctx }) {
const cmd = args._;
@ -49,7 +50,7 @@ export const execCommand = {
}
const result = useShell
? await runProcess('/bin/sh', ['-lc', shellLine ?? cmd[0] ?? ''], { env: ctx.env, cwd: process.cwd(), stdin: stdinPayload })
? await runShellLine(shellLine ?? cmd[0] ?? '', { env: ctx.env, cwd: process.cwd(), stdin: stdinPayload })
: await runProcess(cmd[0], cmd.slice(1), { env: ctx.env, cwd: process.cwd(), stdin: stdinPayload });
if (args.json) {
@ -101,6 +102,11 @@ function runProcess(command, argv, { env, cwd, stdin }) {
});
}
function runShellLine(commandLine, { env, cwd, stdin }) {
const shell = resolveInlineShellCommand({ command: commandLine, env });
return runProcess(shell.command, shell.argv, { env, cwd, stdin });
}
function encodeStdin(items, mode) {
if (mode === 'json') return JSON.stringify(items);
if (mode === 'jsonl') {

View File

@ -20,9 +20,12 @@ function defaultCompare(a: any, b: any): number {
// number compare if both numbers
if (typeof a === 'number' && typeof b === 'number') return a - b;
// date-ish compare if both look like ISO/date strings?
// Keep it simple: string compare for everything else.
return String(a).localeCompare(String(b));
// Deterministic lexical compare independent of process locale.
const aStr = String(a);
const bStr = String(b);
if (aStr < bStr) return -1;
if (aStr > bStr) return 1;
return 0;
}
export const sortCommand = {

57
src/read_line.ts Normal file
View File

@ -0,0 +1,57 @@
export function readLineFromStream(
stream: NodeJS.ReadableStream,
opts?: { timeoutMs?: number },
) {
const timeoutMs = Number(opts?.timeoutMs ?? 0);
return new Promise<string>((resolve, reject) => {
let settled = false;
let buf = '';
let timer: NodeJS.Timeout | null = null;
const cleanup = () => {
stream.off('data', onData);
stream.off('end', onEnd);
stream.off('close', onClose);
stream.off('error', onError);
if (timer) clearTimeout(timer);
};
const finish = (value: string) => {
if (settled) return;
settled = true;
cleanup();
resolve(value);
};
const fail = (err: Error) => {
if (settled) return;
settled = true;
cleanup();
reject(err);
};
const onData = (chunk: Buffer | string) => {
buf += Buffer.isBuffer(chunk) ? chunk.toString('utf8') : String(chunk);
const idx = buf.indexOf('\n');
if (idx !== -1) {
finish(buf.slice(0, idx));
}
};
const onEnd = () => finish(buf);
const onClose = () => finish(buf);
const onError = (err: Error) => fail(err);
if (timeoutMs > 0) {
timer = setTimeout(() => {
fail(new Error(`Timed out waiting for input (${timeoutMs}ms)`));
}, timeoutMs);
}
stream.on('data', onData);
stream.on('end', onEnd);
stream.on('close', onClose);
stream.on('error', onError);
});
}

View File

@ -1,6 +1,13 @@
import { decodeToken } from './token.js';
import { decodeWorkflowResumePayload } from './workflows/file.js';
export type PipelineResumePayload = {
protocolVersion: 1;
v: 1;
kind: 'pipeline-resume';
stateKey: string;
};
export function parseResumeArgs(argv) {
const args = { decision: null, token: null };
@ -46,10 +53,21 @@ export function decodeResumeToken(token) {
if (payload.v !== 1) throw new Error('Unsupported token version');
const workflowPayload = decodeWorkflowResumePayload(payload);
if (workflowPayload) return workflowPayload;
if (!Array.isArray(payload.pipeline)) throw new Error('Invalid token');
if (typeof payload.resumeAtIndex !== 'number') throw new Error('Invalid token');
if (!Array.isArray(payload.items)) throw new Error('Invalid token');
return payload;
const pipelinePayload = decodePipelineResumePayload(payload);
if (pipelinePayload) return pipelinePayload;
throw new Error('Invalid token');
}
function decodePipelineResumePayload(payload: unknown): PipelineResumePayload | null {
if (!payload || typeof payload !== 'object') return null;
const data = payload as Partial<PipelineResumePayload>;
if (data.kind !== 'pipeline-resume') return null;
if (data.protocolVersion !== 1 || data.v !== 1) throw new Error('Unsupported token version');
if (!data.stateKey || typeof data.stateKey !== 'string') throw new Error('Invalid token');
return {
protocolVersion: 1,
v: 1,
kind: 'pipeline-resume',
stateKey: data.stateKey,
};
}

View File

@ -10,6 +10,7 @@
*/
import { spawn } from 'node:child_process';
import { resolveInlineShellCommand } from '../../shell.js';
/**
* Run a process and capture output
@ -133,7 +134,8 @@ export function exec(cmdString, options: any = {}) {
if (useShell) {
// Shell execution
const result = await runProcess('/bin/sh', ['-c', cmdString], { env, cwd });
const shell = resolveInlineShellCommand({ command: cmdString, env });
const result = await runProcess(shell.command, shell.argv, { env, cwd });
stdout = result.stdout;
} else {
// Direct execution

60
src/shell.ts Normal file
View File

@ -0,0 +1,60 @@
export function resolveInlineShellCommand({
command,
env,
platform = process.platform,
}: {
command: string;
env: Record<string, string | undefined>;
platform?: string;
}) {
const shellOverride = String(env?.LOBSTER_SHELL ?? '').trim();
const isWindows = platform === 'win32';
if (shellOverride) {
return {
command: shellOverride,
argv: buildShellArgs({ shellCommand: shellOverride, command, isWindows }),
};
}
if (isWindows) {
const comspec = String(env?.ComSpec ?? env?.COMSPEC ?? 'cmd.exe').trim() || 'cmd.exe';
return {
command: comspec,
argv: ['/d', '/s', '/c', command],
};
}
// Keep default behavior deterministic and POSIX-compatible across environments.
const shell = '/bin/sh';
return {
command: shell,
argv: ['-lc', command],
};
}
function buildShellArgs({
shellCommand,
command,
isWindows,
}: {
shellCommand: string;
command: string;
isWindows: boolean;
}) {
const lowered = shellCommand.toLowerCase();
const looksLikeCmd = lowered.endsWith('cmd') || lowered.endsWith('cmd.exe');
const looksLikePowerShell =
lowered.endsWith('powershell') ||
lowered.endsWith('powershell.exe') ||
lowered.endsWith('pwsh') ||
lowered.endsWith('pwsh.exe');
if (looksLikePowerShell) {
return ['-NoProfile', '-Command', command];
}
if (looksLikeCmd || isWindows) {
return ['/d', '/s', '/c', command];
}
return ['-lc', command];
}

View File

@ -49,6 +49,17 @@ export async function writeStateJson({ env, key, value }) {
await fsp.writeFile(filePath, JSON.stringify(value, null, 2) + '\n', 'utf8');
}
export async function deleteStateJson({ env, key }) {
const stateDir = defaultStateDir(env);
const filePath = keyToPath(stateDir, key);
try {
await fsp.unlink(filePath);
} catch (err) {
if (err?.code === 'ENOENT') return;
throw err;
}
}
export async function diffAndStore({ env, key, value }) {
const before = await readStateJson({ env, key });
const changed = stableStringify(before) !== stableStringify(value);

View File

@ -5,7 +5,9 @@ import { parse as parseYaml } from 'yaml';
import { randomUUID } from 'node:crypto';
import { encodeToken } from '../token.js';
import { readStateJson, writeStateJson } from '../state/store.js';
import { deleteStateJson, readStateJson, writeStateJson } from '../state/store.js';
import { readLineFromStream } from '../read_line.js';
import { resolveInlineShellCommand } from '../shell.js';
export type WorkflowFile = {
name?: string;
@ -143,6 +145,9 @@ export async function runWorkflowFile({
resume?: WorkflowResumePayload;
approved?: boolean;
}): Promise<WorkflowRunResult> {
const consumedResumeStateKey = resume?.stateKey && typeof resume.stateKey === 'string'
? resume.stateKey
: null;
const resumeState = resume?.stateKey
? await loadWorkflowResumeState(ctx.env, resume.stateKey)
: resume ?? null;
@ -198,6 +203,10 @@ export async function runWorkflowFile({
createdAt: new Date().toISOString(),
});
if (consumedResumeStateKey && consumedResumeStateKey !== stateKey) {
await deleteStateJson({ env: ctx.env, key: consumedResumeStateKey });
}
const resumeToken = encodeToken({
protocolVersion: 1,
v: 1,
@ -216,7 +225,9 @@ export async function runWorkflowFile({
}
ctx.stdout.write(`${approval.prompt} [y/N] `);
const answer = await readLine(ctx.stdin);
const answer = await readLineFromStream(ctx.stdin, {
timeoutMs: parseApprovalTimeoutMs(ctx.env),
});
if (!/^y(es)?$/i.test(String(answer).trim())) {
throw new Error('Not approved');
}
@ -225,6 +236,9 @@ export async function runWorkflowFile({
}
const output = lastStepId ? toOutputItems(results[lastStepId]) : [];
if (consumedResumeStateKey) {
await deleteStateJson({ env: ctx.env, key: consumedResumeStateKey });
}
return { status: 'ok', output };
}
@ -468,21 +482,11 @@ function isInteractive(stdin: NodeJS.ReadableStream) {
return Boolean((stdin as NodeJS.ReadStream).isTTY);
}
function readLine(stdin: NodeJS.ReadableStream) {
return new Promise((resolve) => {
let buf = '';
const onData = (chunk: Buffer) => {
buf += chunk.toString('utf8');
const idx = buf.indexOf('\n');
if (idx !== -1) {
stdin.off('data', onData);
resolve(buf.slice(0, idx));
}
};
stdin.on('data', onData);
});
function parseApprovalTimeoutMs(env: Record<string, string | undefined>) {
const raw = env?.LOBSTER_APPROVAL_INPUT_TIMEOUT_MS;
const value = Number(raw);
if (!Number.isFinite(value) || value <= 0) return 0;
return Math.floor(value);
}
async function runShellCommand({
@ -499,7 +503,8 @@ async function runShellCommand({
const { spawn } = await import('node:child_process');
return await new Promise<{ stdout: string; stderr: string }>((resolve, reject) => {
const child = spawn('/bin/sh', ['-lc', command], {
const shell = resolveInlineShellCommand({ command, env });
const child = spawn(shell.command, shell.argv, {
env,
cwd,
stdio: ['pipe', 'pipe', 'pipe'],

View File

@ -237,3 +237,88 @@ test("email.triage --llm uses llm_task.invoke to draft replies (and can emit dra
await closeServer(server);
}
});
test("email.triage --llm honors OPENCLAW_URL (not just CLAWD_URL)", async () => {
const registry = createDefaultRegistry();
const cacheDir = await mkdtemp(join(tmpdir(), "lobster-cache-"));
const emails = [
{
id: "m1",
threadId: "t1",
from: "Alice <alice@example.com>",
subject: "Quick question",
date: "2026-01-22T07:00:00Z",
snippet: "Hey, can you take a look?",
labels: ["INBOX", "UNREAD"],
},
];
let callCount = 0;
const server = http.createServer((req, res) => {
if (req.method !== "POST" || req.url !== "/tools/invoke") {
res.writeHead(404);
res.end("not found");
return;
}
callCount++;
res.writeHead(200, { "content-type": "application/json" });
res.end(
JSON.stringify({
ok: true,
result: {
ok: true,
result: {
runId: "triage_openclaw_url",
output: {
data: {
decisions: [
{
id: "m1",
category: "needs_reply",
reply: { body: "Absolutely — I can help." },
},
],
},
},
},
},
}),
);
});
await new Promise<void>((resolve) => server.listen(0, resolve));
const addr = server.address();
const port = typeof addr === "object" && addr ? addr.port : 0;
try {
const input = (async function* () {
for (const e of emails) yield e;
})();
const result = await runPipeline({
pipeline: [{ name: "email.triage", args: { llm: true, limit: 20 }, raw: "" }],
registry,
input,
stdin: process.stdin,
stdout: process.stdout,
stderr: process.stderr,
env: {
...process.env,
OPENCLAW_URL: `http://127.0.0.1:${port}`,
LOBSTER_CACHE_DIR: cacheDir,
LLM_TASK_FORCE_REFRESH: "1",
},
mode: "tool",
} as any);
assert.equal(callCount, 1);
assert.equal(result.items.length, 1);
assert.equal(result.items[0].mode, "llm");
assert.equal(result.items[0].drafts.length, 1);
} finally {
await rm(cacheDir, { recursive: true, force: true });
await closeServer(server);
}
});

View File

@ -2,24 +2,34 @@ import test from 'node:test';
import assert from 'node:assert/strict';
import { spawnSync } from 'node:child_process';
import path from 'node:path';
import os from 'node:os';
import { promises as fsp } from 'node:fs';
function runTool(pipeline) {
function runTool(pipeline, env) {
const bin = path.join(process.cwd(), 'bin', 'lobster.js');
const res = spawnSync('node', [bin, 'run', '--mode', 'tool', pipeline], { encoding: 'utf8' });
assert.equal(res.status, 0);
return JSON.parse(res.stdout);
}
function resume(token, approve) {
const bin = path.join(process.cwd(), 'bin', 'lobster.js');
const res = spawnSync('node', [bin, 'resume', '--token', token, '--approve', approve ? 'yes' : 'no'], {
const res = spawnSync('node', [bin, 'run', '--mode', 'tool', pipeline], {
encoding: 'utf8',
env: { ...process.env, ...env },
});
assert.equal(res.status, 0);
return JSON.parse(res.stdout);
}
test('two approve gates can be resumed sequentially', () => {
function resume(token, approve, env) {
const bin = path.join(process.cwd(), 'bin', 'lobster.js');
const res = spawnSync('node', [bin, 'resume', '--token', token, '--approve', approve ? 'yes' : 'no'], {
encoding: 'utf8',
env: { ...process.env, ...env },
});
assert.equal(res.status, 0);
return JSON.parse(res.stdout);
}
test('two approve gates can be resumed sequentially', async () => {
const tmpDir = await fsp.mkdtemp(path.join(os.tmpdir(), 'lobster-multi-approval-'));
const stateDir = path.join(tmpDir, 'state');
const env = { LOBSTER_STATE_DIR: stateDir };
const pipeline = [
"exec --json --shell \"printf '%s' '[{\\\"x\\\":1}]'\"",
"approve --prompt 'first?'",
@ -27,15 +37,19 @@ test('two approve gates can be resumed sequentially', () => {
'pick x',
].join(' | ');
const first = runTool(pipeline);
const first = runTool(pipeline, env);
assert.equal(first.status, 'needs_approval');
assert.equal(first.requiresApproval.prompt, 'first?');
const second = resume(first.requiresApproval.resumeToken, true);
const second = resume(first.requiresApproval.resumeToken, true, env);
assert.equal(second.status, 'needs_approval');
assert.equal(second.requiresApproval.prompt, 'second?');
const done = resume(second.requiresApproval.resumeToken, true);
const done = resume(second.requiresApproval.resumeToken, true, env);
assert.equal(done.status, 'ok');
assert.deepEqual(done.output, [{ x: 1 }]);
const files = await fsp.readdir(stateDir);
const pipelineResumeFiles = files.filter((name) => name.startsWith('pipeline_resume_'));
assert.deepEqual(pipelineResumeFiles, []);
});

30
test/read_line.test.ts Normal file
View File

@ -0,0 +1,30 @@
import test from 'node:test';
import assert from 'node:assert/strict';
import { PassThrough } from 'node:stream';
import { readLineFromStream } from '../src/read_line.js';
test('readLineFromStream resolves on newline', async () => {
const input = new PassThrough();
const promise = readLineFromStream(input);
input.write('yes\n');
input.end();
const value = await promise;
assert.equal(value, 'yes');
});
test('readLineFromStream resolves on end without newline', async () => {
const input = new PassThrough();
const promise = readLineFromStream(input);
input.write('partial');
input.end();
const value = await promise;
assert.equal(value, 'partial');
});
test('readLineFromStream times out when no input arrives', async () => {
const input = new PassThrough();
await assert.rejects(() => readLineFromStream(input, { timeoutMs: 5 }), /Timed out waiting for input/);
});

View File

@ -1,62 +1,82 @@
import test from 'node:test';
import assert from 'node:assert/strict';
import { createDefaultRegistry } from '../src/commands/registry.js';
import { parsePipeline } from '../src/parser.js';
import { runPipeline } from '../src/runtime.js';
import { encodeToken, decodeToken } from '../src/token.js';
import { promises as fsp } from 'node:fs';
import os from 'node:os';
import path from 'node:path';
import { spawnSync } from 'node:child_process';
function streamOf(items) {
return (async function* () {
for (const item of items) yield item;
})();
import { decodeResumeToken } from '../src/resume.js';
import { encodeToken } from '../src/token.js';
function runCli(args: string[], env: Record<string, string | undefined>) {
const bin = path.join(process.cwd(), 'bin', 'lobster.js');
return spawnSync('node', [bin, ...args], {
encoding: 'utf8',
env: { ...process.env, ...env },
});
}
test('resume token roundtrip and resume pipeline continues', async () => {
const registry = createDefaultRegistry();
test('state-backed resume token roundtrip and resume pipeline continues', async () => {
const tmpDir = await fsp.mkdtemp(path.join(os.tmpdir(), 'lobster-resume-'));
const stateDir = path.join(tmpDir, 'state');
const pipeline = parsePipeline(
"exec --json --shell \"node -e 'process.stdout.write(JSON.stringify([{a:1}]))'\" | approve --prompt 'ok?' | pick a"
const pipeline =
"exec --json --shell \"node -e 'process.stdout.write(JSON.stringify([{a:1}]))'\" | approve --prompt 'ok?' | pick a";
const first = runCli(['run', '--mode', 'tool', pipeline], { LOBSTER_STATE_DIR: stateDir });
assert.equal(first.status, 0);
const firstJson = JSON.parse(first.stdout);
assert.equal(firstJson.status, 'needs_approval');
assert.ok(firstJson.requiresApproval?.resumeToken);
const payload = decodeResumeToken(firstJson.requiresApproval.resumeToken);
assert.equal(payload.kind, 'pipeline-resume');
assert.equal(typeof payload.stateKey, 'string');
const resumed = runCli(
['resume', '--token', firstJson.requiresApproval.resumeToken, '--approve', 'yes'],
{ LOBSTER_STATE_DIR: stateDir },
);
assert.equal(resumed.status, 0);
const resumedJson = JSON.parse(resumed.stdout);
assert.equal(resumedJson.status, 'ok');
assert.deepEqual(resumedJson.output, [{ a: 1 }]);
});
const first = await runPipeline({
pipeline,
registry,
input: [],
stdin: process.stdin,
stdout: process.stdout,
stderr: process.stderr,
env: process.env,
mode: 'tool',
});
assert.equal(first.halted, true);
assert.equal(first.items[0].type, 'approval_request');
const token = encodeToken({
test('decodeResumeToken rejects inline executable pipeline tokens', () => {
const forgedToken = encodeToken({
protocolVersion: 1,
v: 1,
pipeline,
resumeAtIndex: (first.haltedAt?.index ?? -1) + 1,
items: first.items[0].items,
prompt: first.items[0].prompt,
pipeline: [{ name: 'exec', args: { shell: 'echo FORGED' }, raw: "exec --shell 'echo FORGED'" }],
resumeAtIndex: 0,
items: [],
prompt: 'ignored',
});
const decoded = decodeToken(token);
assert.equal(decoded.v, 1);
assert.equal(decoded.items.length, 1);
const remaining = decoded.pipeline.slice(decoded.resumeAtIndex);
const resumed = await runPipeline({
pipeline: remaining,
registry,
stdin: process.stdin,
stdout: process.stdout,
stderr: process.stderr,
env: process.env,
mode: 'tool',
input: streamOf(decoded.items),
});
assert.equal(resumed.halted, false);
assert.deepEqual(resumed.items, [{ a: 1 }]);
assert.throws(() => decodeResumeToken(forgedToken), /Invalid token/);
});
test('resume cancellation cleans up pipeline resume state', async () => {
const tmpDir = await fsp.mkdtemp(path.join(os.tmpdir(), 'lobster-resume-cancel-'));
const stateDir = path.join(tmpDir, 'state');
const pipeline =
"exec --json --shell \"node -e 'process.stdout.write(JSON.stringify([{a:1}]))'\" | approve --prompt 'ok?' | pick a";
const first = runCli(['run', '--mode', 'tool', pipeline], { LOBSTER_STATE_DIR: stateDir });
assert.equal(first.status, 0);
const firstJson = JSON.parse(first.stdout);
assert.equal(firstJson.status, 'needs_approval');
const cancelled = runCli(
['resume', '--token', firstJson.requiresApproval.resumeToken, '--approve', 'no'],
{ LOBSTER_STATE_DIR: stateDir },
);
assert.equal(cancelled.status, 0);
const cancelledJson = JSON.parse(cancelled.stdout);
assert.equal(cancelledJson.status, 'cancelled');
const files = await fsp.readdir(stateDir);
const pipelineResumeFiles = files.filter((name) => name.startsWith('pipeline_resume_'));
assert.deepEqual(pipelineResumeFiles, []);
});

37
test/shell.test.ts Normal file
View File

@ -0,0 +1,37 @@
import test from 'node:test';
import assert from 'node:assert/strict';
import { resolveInlineShellCommand } from '../src/shell.js';
test('resolveInlineShellCommand uses POSIX shell by default', () => {
const resolved = resolveInlineShellCommand({
command: 'echo hello',
env: { SHELL: '/bin/zsh' },
platform: 'darwin',
});
assert.equal(resolved.command, '/bin/sh');
assert.deepEqual(resolved.argv, ['-lc', 'echo hello']);
});
test('resolveInlineShellCommand uses cmd on Windows', () => {
const resolved = resolveInlineShellCommand({
command: 'echo hello',
env: { ComSpec: 'C:\\Windows\\System32\\cmd.exe' },
platform: 'win32',
});
assert.equal(resolved.command, 'C:\\Windows\\System32\\cmd.exe');
assert.deepEqual(resolved.argv, ['/d', '/s', '/c', 'echo hello']);
});
test('resolveInlineShellCommand respects powershell override', () => {
const resolved = resolveInlineShellCommand({
command: 'Write-Host hello',
env: { LOBSTER_SHELL: 'pwsh' },
platform: 'linux',
});
assert.equal(resolved.command, 'pwsh');
assert.deepEqual(resolved.argv, ['-NoProfile', '-Command', 'Write-Host hello']);
});

View File

@ -74,4 +74,69 @@ test('workflow file runs with approval and resume', async () => {
assert.equal(resumed.status, 'ok');
assert.deepEqual(resumed.output, [{ done: true, value: 2 }]);
const stateFiles = await fsp.readdir(stateDir);
const resumeStateFiles = stateFiles.filter((name) => name.startsWith('workflow_resume_'));
assert.deepEqual(resumeStateFiles, []);
});
test('workflow resume cancellation cleans up resume state', async () => {
const workflow = {
steps: [
{
id: 'approve_step',
command: "node -e \"process.stdout.write(JSON.stringify({requiresApproval:{prompt:'Proceed?', items:[{id:1}]}}))\"",
approval: 'required',
},
{
id: 'finish',
command: "node -e \"process.stdout.write(JSON.stringify({done:true}))\"",
condition: '$approve_step.approved',
},
],
};
const tmpDir = await fsp.mkdtemp(path.join(os.tmpdir(), 'lobster-workflow-cancel-'));
const stateDir = path.join(tmpDir, 'state');
const filePath = path.join(tmpDir, 'workflow.lobster');
await fsp.writeFile(filePath, JSON.stringify(workflow, null, 2), 'utf8');
const env = { ...process.env, LOBSTER_STATE_DIR: stateDir };
const first = await runWorkflowFile({
filePath,
ctx: {
stdin: process.stdin,
stdout: process.stdout,
stderr: process.stderr,
env,
mode: 'tool',
},
});
assert.equal(first.status, 'needs_approval');
const payload = decodeResumeToken(first.requiresApproval?.resumeToken ?? '');
assert.equal(payload.kind, 'workflow-file');
assert.ok(payload.stateKey);
await fsp.access(path.join(stateDir, `${payload.stateKey}.json`));
const cancelled = await runWorkflowFile({
filePath,
ctx: {
stdin: process.stdin,
stdout: process.stdout,
stderr: process.stderr,
env,
mode: 'tool',
},
resume: payload,
approved: false,
});
assert.equal(cancelled.status, 'ok');
assert.deepEqual(cancelled.output, []);
const files = await fsp.readdir(stateDir);
const resumeStateFiles = files.filter((name) => name.startsWith('workflow_resume_'));
assert.deepEqual(resumeStateFiles, []);
});