lobster/test/llm_task_invoke.test.ts
2026-05-04 01:56:15 +01:00

393 lines
12 KiB
TypeScript

import test from "node:test";
import assert from "node:assert/strict";
import http from "node:http";
import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import path from "node:path";
import { createDefaultRegistry } from "../src/commands/registry.js";
/**
 * Wraps an in-memory array in an async generator so it can be passed where a
 * command expects an async input stream.
 *
 * @param items Values to emit, in array order.
 * @returns An async generator that yields each element of `items` once.
 */
function streamOf(items: any[]) {
  async function* replay() {
    for (const entry of items) {
      yield entry;
    }
  }
  return replay();
}
/**
 * Drains an async iterable into an array.
 *
 * @param iterable Source to consume until it is exhausted.
 * @returns Every yielded value, in the order it was produced.
 */
async function collect(iterable: AsyncIterable<any>) {
  const gathered: any[] = [];
  for await (const value of iterable) {
    gathered.push(value);
  }
  return gathered;
}
// Happy path against a stub HTTP server: one text artifact is sent to
// POST /tools/invoke, the stub answers with a nested
// { ok, result: { ok, result } } envelope, and the test checks both the item
// the command emits and the request body the stub received.
test("llm_task.invoke posts to /tools/invoke (clawd) and normalizes result", async () => {
  const registry = createDefaultRegistry();
  const cmd = registry.get("llm_task.invoke");
  assert.ok(cmd, "llm_task.invoke should be registered");

  const cacheDir = await mkdtemp(path.join(tmpdir(), "lobster-cache-"));
  // Parsed request bodies received by the stub, in arrival order.
  const bodyLog: any[] = [];

  const server = http.createServer((req, res) => {
    const isInvoke = req.method === "POST" && req.url === "/tools/invoke";
    if (!isInvoke) {
      res.writeHead(404);
      res.end("nope");
      return;
    }

    const chunks: string[] = [];
    req.setEncoding("utf8");
    req.on("data", (chunk) => {
      chunks.push(chunk);
    });
    req.on("end", () => {
      const parsed = JSON.parse(chunks.join("") || "{}");
      bodyLog.push(parsed);

      // Echo model and prompt back so the assertions can see what was sent.
      const envelope = {
        ok: true,
        result: {
          ok: true,
          result: {
            runId: "task_1",
            model: parsed.args?.model,
            prompt: parsed.args?.prompt,
            output: {
              text: "done",
              data: { summary: "hello world" },
            },
            usage: { inputTokens: 12, outputTokens: 2, totalTokens: 14 },
          },
        },
      };
      res.writeHead(200, { "content-type": "application/json" });
      res.end(JSON.stringify(envelope));
    });
  });

  // Port 0 lets the OS pick a free port; read it back once listening.
  await new Promise<void>((ready) => {
    server.listen(0, ready);
  });
  const bound = server.address();
  const port = bound !== null && typeof bound === "object" ? bound.port : 0;

  try {
    const env = { LOBSTER_CACHE_DIR: cacheDir, CLAWD_URL: `http://localhost:${port}` };
    const result = await cmd.run({
      input: streamOf([{ kind: "text", text: "doc" }]),
      args: {
        _: [],
        token: "test-token",
        model: "claude-3-sonnet",
        prompt: "Summarize",
      },
      ctx: baseCtx(env, registry),
    } as any);

    // Emitted item.
    const items = await collect(result.output!);
    assert.equal(items.length, 1);
    const [payload] = items;
    assert.equal(payload.kind, "llm_task.invoke");
    assert.equal(payload.runId, "task_1");
    assert.equal(payload.output.data.summary, "hello world");
    assert.equal(payload.model, "claude-3-sonnet");
    assert.equal(payload.source, "clawd");
    assert.equal(payload.cached, false);
    assert.ok(payload.cacheKey);

    // Request the stub actually saw.
    assert.equal(bodyLog.length, 1);
    const [sent] = bodyLog;
    assert.equal(sent.tool, "llm-task");
    assert.equal(sent.action, "invoke");
    assert.equal(sent.args.prompt, "Summarize");
    assert.equal(sent.args.model, "claude-3-sonnet");
    assert.equal(sent.args.artifacts.length, 1);
    assert.equal(sent.args.artifactHashes.length, 1);
  } finally {
    await rm(cacheDir, { recursive: true, force: true });
    await closeServer(server);
  }
});
// Retry path: the stub's first reply lacks the "decision" field required by
// the output schema, every later reply includes it. The test expects the
// command to emit the second reply after exactly two requests.
test("llm_task.invoke retries when schema validation fails", async () => {
  const registry = createDefaultRegistry();
  const cmd = registry.get("llm_task.invoke");
  assert.ok(cmd);

  const cacheDir = await mkdtemp(path.join(tmpdir(), "lobster-cache-"));
  // Number of POST /tools/invoke requests the stub has answered so far.
  let calls = 0;

  const server = http.createServer((req, res) => {
    const isInvoke = req.method === "POST" && req.url === "/tools/invoke";
    if (!isInvoke) {
      res.writeHead(404);
      res.end();
      return;
    }

    calls += 1;
    // Only the second and later attempts satisfy required: ["decision"].
    const output = calls >= 2 ? { data: { decision: "send" } } : { data: { foo: "bar" } };
    const body = JSON.stringify({
      ok: true,
      result: {
        ok: true,
        result: { runId: `attempt_${calls}`, output },
      },
    });
    res.writeHead(200, { "content-type": "application/json" });
    res.end(body);
  });

  await new Promise<void>((ready) => {
    server.listen(0, ready);
  });
  const bound = server.address();
  const port = bound !== null && typeof bound === "object" ? bound.port : 0;

  try {
    const env = { LOBSTER_CACHE_DIR: cacheDir, CLAWD_URL: `http://localhost:${port}` };
    const result = await cmd.run({
      input: streamOf([]),
      args: {
        _: [],
        model: "claude-3-opus",
        prompt: "Decide",
        "output-schema": '{"type":"object","required":["decision"]}',
        "max-validation-retries": 2,
      },
      ctx: baseCtx(env, registry),
    } as any);

    const items = await collect(result.output!);
    assert.equal(items.length, 1);
    const [accepted] = items;
    assert.equal(accepted.runId, "attempt_2");
    assert.equal(accepted.output.data.decision, "send");
    assert.equal(calls, 2);
  } finally {
    await rm(cacheDir, { recursive: true, force: true });
    await closeServer(server);
  }
});
// Resume path: the first invocation (with a state-key) is answered by the stub
// server; the server is then shut down and the identical invocation is
// repeated. The second result must be reported with source "run_state",
// i.e. it was served without the remote endpoint being reachable.
test("llm_task.invoke persists to run state so resume skips remote call", async () => {
  const registry = createDefaultRegistry();
  const cmd = registry.get("llm_task.invoke");
  // Check the precondition before allocating anything that needs cleanup.
  assert.ok(cmd);

  // createServer does not bind a port; listening starts inside the try block.
  const server = http.createServer((req, res) => {
    if (req.method !== "POST" || req.url !== "/tools/invoke") {
      res.writeHead(404);
      res.end("not found");
      return;
    }
    // The request body is irrelevant to this test: discard it and reply once
    // the request has been fully read.
    req.on("end", () => {
      res.writeHead(200, { "content-type": "application/json" });
      res.end(
        JSON.stringify({
          ok: true,
          result: { ok: true, result: { runId: "state_run", output: { data: { ok: true } } } },
        }),
      );
    });
    req.resume();
  });

  // Temp directories created so far; the finally block removes whatever is
  // listed here even if setup failed part-way through.
  const tempDirs: string[] = [];
  try {
    const stateDir = await mkdtemp(path.join(tmpdir(), "lobster-state-"));
    tempDirs.push(stateDir);
    const cacheDir = await mkdtemp(path.join(tmpdir(), "lobster-cache-"));
    tempDirs.push(cacheDir);

    await new Promise<void>((resolve) => server.listen(0, resolve));
    const addr = server.address();
    const port = typeof addr === "object" && addr ? addr.port : 0;

    const env = {
      LOBSTER_STATE_DIR: stateDir,
      LOBSTER_CACHE_DIR: cacheDir,
      CLAWD_URL: `http://localhost:${port}`,
    };
    // Both runs must use identical input and args, so build them in one place.
    const invoke = () =>
      cmd.run({
        input: streamOf([{ foo: "bar" }]),
        args: {
          _: [],
          model: "claude",
          prompt: "Do thing",
          "state-key": "run123",
        },
        ctx: baseCtx(env, registry),
      } as any);

    const first = await invoke();
    const firstItems = await collect(first.output!);
    assert.equal(firstItems[0].source, "clawd");

    // Take the endpoint away so the second run cannot be served remotely.
    await closeServer(server);

    const second = await invoke();
    const secondItems = await collect(second.output!);
    assert.equal(secondItems.length, 1);
    assert.equal(secondItems[0].source, "run_state");
  } finally {
    // closeServer only ever resolves, so run it before the removals, which can
    // reject; a failing rm must not leave the server listening.
    await closeServer(server);
    await Promise.all(tempDirs.map((dir) => rm(dir, { recursive: true, force: true })));
  }
});
// Cache path: the first invocation is answered by the stub server; the server
// is then shut down and the identical invocation is repeated with the same
// LOBSTER_CACHE_DIR. The second result must be reported with source "cache"
// and cached === true.
test("llm_task.invoke reuses file cache when URL unavailable", async () => {
  const registry = createDefaultRegistry();
  const cmd = registry.get("llm_task.invoke");
  // Check the precondition before allocating anything that needs cleanup.
  assert.ok(cmd);

  // createServer does not bind a port; listening starts inside the try block.
  const server = http.createServer((req, res) => {
    if (req.method !== "POST" || req.url !== "/tools/invoke") {
      res.writeHead(404);
      res.end("not found");
      return;
    }
    // The request body is irrelevant to this test: discard it and reply once
    // the request has been fully read.
    req.on("end", () => {
      res.writeHead(200, { "content-type": "application/json" });
      res.end(
        JSON.stringify({
          ok: true,
          result: { ok: true, result: { runId: "cache_run", output: { text: "cached" } } },
        }),
      );
    });
    req.resume();
  });

  // Stays undefined until mkdtemp succeeds, so cleanup knows whether to run.
  let cacheDir: string | undefined;
  try {
    cacheDir = await mkdtemp(path.join(tmpdir(), "lobster-cache-"));

    await new Promise<void>((resolve) => server.listen(0, resolve));
    const addr = server.address();
    const port = typeof addr === "object" && addr ? addr.port : 0;

    // The URL keeps pointing at the stub's port even after the stub is closed.
    const ctxEnv = { LOBSTER_CACHE_DIR: cacheDir, CLAWD_URL: `http://localhost:${port}` };
    // Both runs must use identical input and args, so build them in one place.
    const invoke = () =>
      cmd.run({
        input: streamOf([]),
        args: {
          _: [],
          model: "claude",
          prompt: "Cache me",
        },
        ctx: baseCtx(ctxEnv, registry),
      } as any);

    const first = await invoke();
    const firstItems = await collect(first.output!);
    assert.equal(firstItems[0].source, "clawd");

    // Take the endpoint away so the second run cannot be served remotely.
    await closeServer(server);

    const second = await invoke();
    const secondItems = await collect(second.output!);
    assert.equal(secondItems.length, 1);
    assert.equal(secondItems[0].source, "cache");
    assert.equal(secondItems[0].cached, true);
  } finally {
    // closeServer only ever resolves, so run it before the removal, which can
    // reject; a failing rm must not leave the server listening.
    await closeServer(server);
    if (cacheDir !== undefined) {
      await rm(cacheDir, { recursive: true, force: true });
    }
  }
});
// Minimal-arguments path: only a prompt (plus refresh: true) is passed, with
// no url and no model argument; the endpoint comes solely from CLAWD_URL in
// the context env. The test checks the emitted item and the request body.
test("llm_task.invoke uses CLAWD_URL (/tools/invoke) without requiring --url/--model", async () => {
  const registry = createDefaultRegistry();
  const cmd = registry.get("llm_task.invoke");
  assert.ok(cmd);

  const cacheDir = await mkdtemp(path.join(tmpdir(), "lobster-cache-"));
  // Parsed request bodies received by the stub, in arrival order.
  const bodyLog: any[] = [];

  const server = http.createServer((req, res) => {
    const isInvoke = req.method === "POST" && req.url === "/tools/invoke";
    if (!isInvoke) {
      res.writeHead(404);
      res.end("not found");
      return;
    }

    const chunks: string[] = [];
    req.setEncoding("utf8");
    req.on("data", (chunk) => {
      chunks.push(chunk);
    });
    req.on("end", () => {
      bodyLog.push(JSON.parse(chunks.join("") || "{}"));

      // This is the OpenClaw tool router envelope.
      const envelope = {
        ok: true,
        result: {
          ok: true,
          result: {
            runId: "task_clawd_1",
            output: { data: { hello: "world" } },
          },
        },
      };
      res.writeHead(200, { "content-type": "application/json" });
      res.end(JSON.stringify(envelope));
    });
  });

  await new Promise<void>((ready) => {
    server.listen(0, ready);
  });
  const bound = server.address();
  const port = bound !== null && typeof bound === "object" ? bound.port : 0;

  try {
    const env = { CLAWD_URL: `http://localhost:${port}`, LOBSTER_CACHE_DIR: cacheDir };
    const result = await cmd.run({
      input: streamOf([{ kind: "text", text: "doc" }]),
      args: {
        _: [],
        // no url, no model
        prompt: "Summarize",
        refresh: true,
      },
      ctx: baseCtx(env, registry),
    } as any);

    // Emitted item.
    const items = await collect(result.output!);
    assert.equal(items.length, 1);
    const [item] = items;
    assert.equal(item.source, "clawd");
    assert.equal(item.cached, false);
    assert.equal(item.runId, "task_clawd_1");
    assert.equal(item.output.data.hello, "world");

    // Request the stub actually saw.
    assert.equal(bodyLog.length, 1);
    const [sent] = bodyLog;
    assert.equal(sent.tool, "llm-task");
    assert.equal(sent.action, "invoke");
    assert.equal(sent.args.prompt, "Summarize");
    assert.ok(Array.isArray(sent.args.artifactHashes));
  } finally {
    await rm(cacheDir, { recursive: true, force: true });
    await closeServer(server);
  }
});
/**
 * Builds the context object the tests pass to `cmd.run`.
 *
 * @param envOverrides Environment variables layered on top of `process.env`;
 *   an override wins when both define the same key.
 * @param registry Optional command registry to expose on the context. Typed
 *   `unknown` because this helper only stores it; `null` is used when omitted.
 * @returns A context wired to the real process stdio, in "tool" mode, with
 *   no-op `render.json` / `render.lines` functions.
 */
function baseCtx(envOverrides: Record<string, string>, registry?: unknown) {
  return {
    stdin: process.stdin,
    stdout: process.stdout,
    stderr: process.stderr,
    // Spread order matters: later keys replace inherited ones.
    env: { ...process.env, ...envOverrides },
    registry: registry ?? null,
    mode: "tool",
    render: { json() {}, lines() {} },
  };
}
/**
 * Stops an HTTP server and waits for its close callback.
 *
 * Does nothing when the server is not listening, so it is safe to call more
 * than once on the same server.
 *
 * @param server Server to shut down.
 */
async function closeServer(server: http.Server) {
  if (server.listening) {
    await new Promise<void>((done) => {
      server.close(() => done());
    });
  }
}