feat: send dashboard turns over direct gateway rpc

This commit is contained in:
Shakker 2026-05-07 08:33:44 +01:00
parent fe3d40552f
commit 9d9ff32d32
No known key found for this signature in database
2 changed files with 318 additions and 72 deletions

View File

@ -1,5 +1,12 @@
import { dirname } from "node:path";
import { execFileSync } from "node:child_process";
import { randomUUID } from "node:crypto";
import { readFileSync } from "node:fs";
import { dirname, join } from "node:path";
const GATEWAY_PROTOCOL_VERSION = 3;
const GATEWAY_RPC_CLIENT_ID = "gateway-client";
const GATEWAY_RPC_CLIENT_MODE = "backend";
const GATEWAY_OPERATOR_SCOPES = ["operator.read", "operator.write"];
export function prepareOpenClawRuntimeFromOcmEnv(envName) {
if (!envName) {
@ -80,6 +87,231 @@ export function runOcmJson(args) {
}
}
export async function openDirectGatewayRpcClient(runtimeContext) {
if (typeof WebSocket !== "function") {
return {
client: null,
transport: "shell",
fallbackReason: "websocket-unavailable"
};
}
const token = readGatewayAuthToken(runtimeContext.root);
if (!token) {
return {
client: null,
transport: "shell",
fallbackReason: "gateway-token-unavailable"
};
}
const client = new DirectGatewayRpcClient({
url: `ws://127.0.0.1:${runtimeContext.gatewayPort}`,
token
});
await client.connect();
return {
client,
transport: "direct-gateway-rpc",
fallbackReason: null
};
}
class DirectGatewayRpcClient {
constructor({ url, token }) {
this.url = url;
this.token = token;
this.ws = null;
this.pending = new Map();
this.connectStarted = false;
this.connected = false;
}
async connect(timeoutMs = 15000) {
if (this.connected) {
return;
}
const ws = new WebSocket(this.url);
this.ws = ws;
await new Promise((resolve, reject) => {
const timer = setTimeout(() => {
cleanup();
reject(new Error(`gateway direct RPC connect timeout after ${timeoutMs}ms`));
this.close();
}, timeoutMs);
const cleanup = () => {
clearTimeout(timer);
ws.removeEventListener("message", onMessage);
ws.removeEventListener("close", onClose);
ws.removeEventListener("error", onError);
};
const onClose = () => {
cleanup();
reject(new Error("gateway direct RPC closed before connect"));
};
const onError = () => {
cleanup();
reject(new Error("gateway direct RPC socket error before connect"));
};
const onMessage = (event) => {
void this.handleMessage(event)
.then((frame) => {
if (frame?.type === "event" && frame.event === "connect.challenge" && !this.connectStarted) {
this.connectStarted = true;
void this.request("connect", this.buildConnectParams(), { timeoutMs })
.then(() => {
this.connected = true;
cleanup();
ws.addEventListener("message", (messageEvent) => {
void this.handleMessage(messageEvent);
});
ws.addEventListener("close", () => {
this.rejectPending(new Error("gateway direct RPC closed"));
});
ws.addEventListener("error", () => {
this.rejectPending(new Error("gateway direct RPC socket error"));
});
resolve();
})
.catch((error) => {
cleanup();
reject(error);
});
}
})
.catch((error) => {
cleanup();
reject(error);
});
};
ws.addEventListener("message", onMessage);
ws.addEventListener("close", onClose);
ws.addEventListener("error", onError);
});
}
buildConnectParams() {
return {
minProtocol: GATEWAY_PROTOCOL_VERSION,
maxProtocol: GATEWAY_PROTOCOL_VERSION,
client: {
id: GATEWAY_RPC_CLIENT_ID,
displayName: "Kova Gateway RPC",
version: "kova",
platform: process.platform,
mode: GATEWAY_RPC_CLIENT_MODE,
instanceId: `kova-${randomUUID()}`
},
caps: [],
role: "operator",
scopes: GATEWAY_OPERATOR_SCOPES,
auth: {
token: this.token
}
};
}
async request(method, params, { timeoutMs = 15000 } = {}) {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
throw new Error("gateway direct RPC is not connected");
}
const id = `kova-${randomUUID()}`;
const frame = { type: "req", id, method, params };
const response = new Promise((resolve, reject) => {
const timer = setTimeout(() => {
this.pending.delete(id);
reject(new Error(`gateway direct RPC request timeout for ${method}`));
}, timeoutMs);
this.pending.set(id, {
method,
resolve,
reject,
timer
});
});
this.ws.send(JSON.stringify(frame));
return await response;
}
async handleMessage(event) {
const raw = await readWebSocketData(event.data);
const frame = JSON.parse(raw);
if (frame?.type !== "res") {
return frame;
}
const pending = this.pending.get(frame.id);
if (!pending) {
return frame;
}
this.pending.delete(frame.id);
clearTimeout(pending.timer);
if (frame.ok) {
pending.resolve(frame.payload);
} else {
pending.reject(new Error(formatGatewayRpcError(pending.method, frame.error)));
}
return frame;
}
close() {
this.rejectPending(new Error("gateway direct RPC closed"));
if (!this.ws) {
return;
}
try {
this.ws.close();
} catch {}
this.ws = null;
}
rejectPending(error) {
for (const [id, pending] of this.pending.entries()) {
clearTimeout(pending.timer);
pending.reject(error);
this.pending.delete(id);
}
}
}
async function readWebSocketData(data) {
if (typeof data === "string") {
return data;
}
if (data instanceof ArrayBuffer) {
return Buffer.from(data).toString("utf8");
}
if (ArrayBuffer.isView(data)) {
return Buffer.from(data.buffer, data.byteOffset, data.byteLength).toString("utf8");
}
if (typeof Blob !== "undefined" && data instanceof Blob) {
return await data.text();
}
return String(data);
}
function formatGatewayRpcError(method, error) {
const message = typeof error?.message === "string" && error.message.trim()
? error.message.trim()
: "gateway request failed";
const code = typeof error?.code === "string" && error.code.trim() ? ` ${error.code.trim()}` : "";
return `${method}${code}: ${message}`;
}
function readGatewayAuthToken(root) {
const configPath = process.env.OPENCLAW_CONFIG_PATH || join(root, ".openclaw", "openclaw.json");
let config;
try {
config = JSON.parse(readFileSync(configPath, "utf8"));
} catch {
return null;
}
const authToken = config?.gateway?.auth?.token;
if (typeof authToken === "string" && authToken.length > 0) {
return authToken;
}
const remoteToken = config?.gateway?.remote?.token;
return typeof remoteToken === "string" && remoteToken.length > 0 ? remoteToken : null;
}
function readRequiredString(value, label) {
if (typeof value !== "string" || value.trim().length === 0) {
throw new Error(`${label} missing`);

View File

@ -5,6 +5,7 @@ import {
extractText,
failJson,
finishJson,
openDirectGatewayRpcClient,
parseSupportArgs,
prepareOpenClawRuntimeFromOcmEnv,
readTimeoutMs,
@ -23,81 +24,91 @@ try {
const sessionKey = args["session-key"] ?? `kova-dashboard-${randomUUID()}`;
const createSession = readBoolean(args["create-session"], true);
const minAssistantCount = readPositiveInteger(args["min-assistant-count"], 1);
const gatewayTransport = await openDirectGatewayRpcClient(runtimeContext);
let created = null;
let sessionCreateStartedAtEpochMs = null;
let sessionCreateFinishedAtEpochMs = null;
if (createSession) {
sessionCreateStartedAtEpochMs = Date.now();
created = gatewayCall(runtimeContext.envName, "sessions.create", {
agentId: "main",
key: sessionKey,
label: "Kova Dashboard Session Send"
}, Math.min(timeoutMs, 60000));
sessionCreateFinishedAtEpochMs = Date.now();
}
const canonicalKey = created?.key ?? sessionKey;
const sendStartedAtEpochMs = Date.now();
const sent = gatewayCall(runtimeContext.envName, "sessions.send", {
key: canonicalKey,
message,
thinking: "off",
try {
let created = null;
let sessionCreateStartedAtEpochMs = null;
let sessionCreateFinishedAtEpochMs = null;
if (createSession) {
sessionCreateStartedAtEpochMs = Date.now();
created = await gatewayCall(runtimeContext, gatewayTransport, "sessions.create", {
agentId: "main",
key: sessionKey,
label: "Kova Dashboard Session Send"
}, Math.min(timeoutMs, 60000));
sessionCreateFinishedAtEpochMs = Date.now();
}
const canonicalKey = created?.key ?? sessionKey;
const sendStartedAtEpochMs = Date.now();
const sent = await gatewayCall(runtimeContext, gatewayTransport, "sessions.send", {
key: canonicalKey,
message,
thinking: "off",
timeoutMs,
idempotencyKey: `kova-dashboard-${randomUUID()}`
}, Math.min(timeoutMs, 60000));
const sendFinishedAtEpochMs = Date.now();
const runId = typeof sent?.runId === "string" ? sent.runId : null;
const history = await waitForAssistantText({
runtimeContext,
gatewayTransport,
sessionKey: canonicalKey,
expectedText,
timeoutMs,
idempotencyKey: `kova-dashboard-${randomUUID()}`
}, Math.min(timeoutMs, 60000));
const sendFinishedAtEpochMs = Date.now();
const runId = typeof sent?.runId === "string" ? sent.runId : null;
minAssistantCount
});
const finishedAtEpochMs = Date.now();
const activeFinishedAtEpochMs = history.assistantMatchedAtEpochMs ?? finishedAtEpochMs;
const history = await waitForAssistantText({
envName: runtimeContext.envName,
sessionKey: canonicalKey,
expectedText,
timeoutMs,
minAssistantCount
});
const finishedAtEpochMs = Date.now();
const activeFinishedAtEpochMs = history.assistantMatchedAtEpochMs ?? finishedAtEpochMs;
finishJson({
ok: true,
surface: "dashboard-session-send-turn",
method: "sessions.send",
createSession,
minAssistantCount,
envName: runtimeContext.envName,
runtime: runtimeContext.runtime,
sessionKey: canonicalKey,
runId,
startedAtEpochMs,
sessionCreateStartedAtEpochMs,
sessionCreateFinishedAtEpochMs,
sessionCreateDurationMs: sessionCreateStartedAtEpochMs === null || sessionCreateFinishedAtEpochMs === null
? null
: sessionCreateFinishedAtEpochMs - sessionCreateStartedAtEpochMs,
sendStartedAtEpochMs,
sendFinishedAtEpochMs,
sendDurationMs: sendFinishedAtEpochMs - sendStartedAtEpochMs,
activeStartedAtEpochMs: sendStartedAtEpochMs,
activeFinishedAtEpochMs,
activeTurnMs: activeFinishedAtEpochMs - sendStartedAtEpochMs,
finishedAtEpochMs,
assistantFirstSeenAtEpochMs: history.assistantFirstSeenAtEpochMs,
assistantMatchedAtEpochMs: history.assistantMatchedAtEpochMs,
timeToFirstAssistantMs: history.assistantFirstSeenAtEpochMs === null ? null : history.assistantFirstSeenAtEpochMs - sendStartedAtEpochMs,
timeToMatchedAssistantMs: history.assistantMatchedAtEpochMs === null ? null : history.assistantMatchedAtEpochMs - sendStartedAtEpochMs,
historyPollCount: history.pollCount,
historyErrorCount: history.errorCount,
lastHistoryError: history.lastHistoryErrorMessage,
finalAssistantVisibleText: history.matchedAssistantText,
finalAssistantRawText: history.lastAssistantText,
assistantMessageCount: history.assistantTexts.length,
expectedTextPresent: history.matchedAssistantText.includes(expectedText)
});
finishJson({
ok: true,
surface: "dashboard-session-send-turn",
method: "sessions.send",
createSession,
minAssistantCount,
envName: runtimeContext.envName,
runtime: runtimeContext.runtime,
gatewayTransport: {
kind: gatewayTransport.transport,
fallbackReason: gatewayTransport.fallbackReason
},
sessionKey: canonicalKey,
runId,
startedAtEpochMs,
sessionCreateStartedAtEpochMs,
sessionCreateFinishedAtEpochMs,
sessionCreateDurationMs: sessionCreateStartedAtEpochMs === null || sessionCreateFinishedAtEpochMs === null
? null
: sessionCreateFinishedAtEpochMs - sessionCreateStartedAtEpochMs,
sendStartedAtEpochMs,
sendFinishedAtEpochMs,
sendDurationMs: sendFinishedAtEpochMs - sendStartedAtEpochMs,
activeStartedAtEpochMs: sendStartedAtEpochMs,
activeFinishedAtEpochMs,
activeTurnMs: activeFinishedAtEpochMs - sendStartedAtEpochMs,
finishedAtEpochMs,
assistantFirstSeenAtEpochMs: history.assistantFirstSeenAtEpochMs,
assistantMatchedAtEpochMs: history.assistantMatchedAtEpochMs,
timeToFirstAssistantMs: history.assistantFirstSeenAtEpochMs === null ? null : history.assistantFirstSeenAtEpochMs - sendStartedAtEpochMs,
timeToMatchedAssistantMs: history.assistantMatchedAtEpochMs === null ? null : history.assistantMatchedAtEpochMs - sendStartedAtEpochMs,
historyPollCount: history.pollCount,
historyErrorCount: history.errorCount,
lastHistoryError: history.lastHistoryErrorMessage,
finalAssistantVisibleText: history.matchedAssistantText,
finalAssistantRawText: history.lastAssistantText,
assistantMessageCount: history.assistantTexts.length,
expectedTextPresent: history.matchedAssistantText.includes(expectedText)
});
} finally {
gatewayTransport.client?.close();
}
} catch (error) {
failJson(error, { surface: "dashboard-session-send-turn", finishedAtEpochMs: Date.now() });
}
async function waitForAssistantText({ envName, sessionKey, expectedText, timeoutMs, minAssistantCount }) {
async function waitForAssistantText({ runtimeContext, gatewayTransport, sessionKey, expectedText, timeoutMs, minAssistantCount }) {
const deadline = Date.now() + timeoutMs;
let lastAssistantText = "";
let lastHistoryError = null;
@ -108,7 +119,7 @@ async function waitForAssistantText({ envName, sessionKey, expectedText, timeout
while (Date.now() < deadline) {
try {
pollCount += 1;
const history = gatewayCall(envName, "chat.history", { sessionKey, limit: 16 }, Math.min(15000, Math.max(1000, deadline - Date.now())));
const history = await gatewayCall(runtimeContext, gatewayTransport, "chat.history", { sessionKey, limit: 16 }, Math.min(15000, Math.max(1000, deadline - Date.now())));
lastHistoryError = null;
assistantTexts = extractAssistantTexts(history?.messages ?? []);
lastAssistantText = assistantTexts.at(-1) ?? "";
@ -140,9 +151,12 @@ async function waitForAssistantText({ envName, sessionKey, expectedText, timeout
);
}
function gatewayCall(envName, method, params, timeoutMs) {
async function gatewayCall(runtimeContext, gatewayTransport, method, params, timeoutMs) {
if (gatewayTransport.client) {
return await gatewayTransport.client.request(method, params, { timeoutMs });
}
return runOcmJson([
`@${envName}`,
`@${runtimeContext.envName}`,
"--",
"gateway",
"call",