354 lines
10 KiB
JavaScript
354 lines
10 KiB
JavaScript
import { execFileSync } from "node:child_process";
|
|
import { randomUUID } from "node:crypto";
|
|
import { readFileSync } from "node:fs";
|
|
import { dirname, join } from "node:path";
|
|
|
|
const GATEWAY_PROTOCOL_MIN_VERSION = 3;
|
|
const GATEWAY_PROTOCOL_MAX_VERSION = 4;
|
|
const GATEWAY_RPC_CLIENT_ID = "gateway-client";
|
|
const GATEWAY_RPC_CLIENT_MODE = "backend";
|
|
const GATEWAY_OPERATOR_SCOPES = ["operator.read", "operator.write"];
|
|
|
|
export function prepareOpenClawRuntimeFromOcmEnv(envName) {
|
|
if (!envName) {
|
|
throw new Error("--env is required");
|
|
}
|
|
const status = runOcmJson(["env", "status", envName, "--json"]);
|
|
const resolved = runOcmJson(["env", "resolve", envName, "--json", "--", "status"]);
|
|
const root = readRequiredString(status.root, "ocm env status root");
|
|
const port = Number(status.gatewayPort);
|
|
const binaryPath = readRequiredString(resolved.binaryPath, "ocm env resolve binaryPath");
|
|
if (!Number.isInteger(port) || port <= 0) {
|
|
throw new Error(`invalid gateway port from OCM status: ${JSON.stringify(status.gatewayPort)}`);
|
|
}
|
|
const packageRoot = dirname(binaryPath);
|
|
process.env.OPENCLAW_HOME = root;
|
|
process.env.OPENCLAW_GATEWAY_PORT = String(port);
|
|
process.chdir(packageRoot);
|
|
return {
|
|
envName,
|
|
root,
|
|
gatewayPort: port,
|
|
binaryPath,
|
|
packageRoot,
|
|
runtime: {
|
|
bindingKind: resolved.bindingKind ?? null,
|
|
bindingName: resolved.bindingName ?? null,
|
|
releaseVersion: resolved.runtimeReleaseVersion ?? null,
|
|
releaseChannel: resolved.runtimeReleaseChannel ?? null,
|
|
sourceKind: resolved.runtimeSourceKind ?? null
|
|
}
|
|
};
|
|
}
|
|
|
|
export function parseSupportArgs(argv) {
|
|
const parsed = {};
|
|
for (let index = 0; index < argv.length; index += 1) {
|
|
const arg = argv[index];
|
|
if (!arg.startsWith("--")) {
|
|
throw new Error(`unexpected argument: ${arg}`);
|
|
}
|
|
const key = arg.slice(2);
|
|
const value = argv[index + 1];
|
|
if (!value || value.startsWith("--")) {
|
|
throw new Error(`${arg} requires a value`);
|
|
}
|
|
parsed[key] = value;
|
|
index += 1;
|
|
}
|
|
return parsed;
|
|
}
|
|
|
|
export function readTimeoutMs(value, fallbackMs) {
|
|
if (value === undefined) {
|
|
return fallbackMs;
|
|
}
|
|
const parsed = Number(value);
|
|
if (!Number.isInteger(parsed) || parsed <= 0) {
|
|
throw new Error(`invalid timeout: ${value}`);
|
|
}
|
|
return parsed;
|
|
}
|
|
|
|
export function runOcmJson(args) {
|
|
let stdout = "";
|
|
try {
|
|
stdout = execFileSync("ocm", args, {
|
|
encoding: "utf8",
|
|
stdio: ["ignore", "pipe", "pipe"]
|
|
});
|
|
} catch (error) {
|
|
const stderr = error?.stderr ? String(error.stderr) : "";
|
|
throw new Error(`ocm ${args.join(" ")} failed: ${stderr.trim() || error.message}`);
|
|
}
|
|
try {
|
|
return JSON.parse(stdout);
|
|
} catch {
|
|
throw new Error(`ocm ${args.join(" ")} did not return JSON: ${stdout.slice(0, 1000)}`);
|
|
}
|
|
}
|
|
|
|
export async function openDirectGatewayRpcClient(runtimeContext) {
|
|
if (typeof WebSocket !== "function") {
|
|
return {
|
|
client: null,
|
|
transport: "shell",
|
|
fallbackReason: "websocket-unavailable"
|
|
};
|
|
}
|
|
const token = readGatewayAuthToken(runtimeContext.root);
|
|
if (!token) {
|
|
return {
|
|
client: null,
|
|
transport: "shell",
|
|
fallbackReason: "gateway-token-unavailable"
|
|
};
|
|
}
|
|
|
|
const client = new DirectGatewayRpcClient({
|
|
url: `ws://127.0.0.1:${runtimeContext.gatewayPort}`,
|
|
token
|
|
});
|
|
await client.connect();
|
|
return {
|
|
client,
|
|
transport: "direct-gateway-rpc",
|
|
fallbackReason: null
|
|
};
|
|
}
|
|
|
|
class DirectGatewayRpcClient {
|
|
constructor({ url, token }) {
|
|
this.url = url;
|
|
this.token = token;
|
|
this.ws = null;
|
|
this.pending = new Map();
|
|
this.connectStarted = false;
|
|
this.connected = false;
|
|
}
|
|
|
|
async connect(timeoutMs = 15000) {
|
|
if (this.connected) {
|
|
return;
|
|
}
|
|
const ws = new WebSocket(this.url);
|
|
this.ws = ws;
|
|
await new Promise((resolve, reject) => {
|
|
const timer = setTimeout(() => {
|
|
cleanup();
|
|
reject(new Error(`gateway direct RPC connect timeout after ${timeoutMs}ms`));
|
|
this.close();
|
|
}, timeoutMs);
|
|
const cleanup = () => {
|
|
clearTimeout(timer);
|
|
ws.removeEventListener("message", onMessage);
|
|
ws.removeEventListener("close", onClose);
|
|
ws.removeEventListener("error", onError);
|
|
};
|
|
const onClose = () => {
|
|
cleanup();
|
|
reject(new Error("gateway direct RPC closed before connect"));
|
|
};
|
|
const onError = () => {
|
|
cleanup();
|
|
reject(new Error("gateway direct RPC socket error before connect"));
|
|
};
|
|
const onMessage = (event) => {
|
|
void this.handleMessage(event)
|
|
.then((frame) => {
|
|
if (frame?.type === "event" && frame.event === "connect.challenge" && !this.connectStarted) {
|
|
this.connectStarted = true;
|
|
void this.request("connect", this.buildConnectParams(), { timeoutMs })
|
|
.then(() => {
|
|
this.connected = true;
|
|
cleanup();
|
|
ws.addEventListener("message", (messageEvent) => {
|
|
void this.handleMessage(messageEvent);
|
|
});
|
|
ws.addEventListener("close", () => {
|
|
this.rejectPending(new Error("gateway direct RPC closed"));
|
|
});
|
|
ws.addEventListener("error", () => {
|
|
this.rejectPending(new Error("gateway direct RPC socket error"));
|
|
});
|
|
resolve();
|
|
})
|
|
.catch((error) => {
|
|
cleanup();
|
|
reject(error);
|
|
});
|
|
}
|
|
})
|
|
.catch((error) => {
|
|
cleanup();
|
|
reject(error);
|
|
});
|
|
};
|
|
ws.addEventListener("message", onMessage);
|
|
ws.addEventListener("close", onClose);
|
|
ws.addEventListener("error", onError);
|
|
});
|
|
}
|
|
|
|
buildConnectParams() {
|
|
return {
|
|
minProtocol: GATEWAY_PROTOCOL_MIN_VERSION,
|
|
maxProtocol: GATEWAY_PROTOCOL_MAX_VERSION,
|
|
client: {
|
|
id: GATEWAY_RPC_CLIENT_ID,
|
|
displayName: "Kova Gateway RPC",
|
|
version: "kova",
|
|
platform: process.platform,
|
|
mode: GATEWAY_RPC_CLIENT_MODE,
|
|
instanceId: `kova-${randomUUID()}`
|
|
},
|
|
caps: [],
|
|
role: "operator",
|
|
scopes: GATEWAY_OPERATOR_SCOPES,
|
|
auth: {
|
|
token: this.token
|
|
}
|
|
};
|
|
}
|
|
|
|
async request(method, params, { timeoutMs = 15000 } = {}) {
|
|
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
|
|
throw new Error("gateway direct RPC is not connected");
|
|
}
|
|
const id = `kova-${randomUUID()}`;
|
|
const frame = { type: "req", id, method, params };
|
|
const response = new Promise((resolve, reject) => {
|
|
const timer = setTimeout(() => {
|
|
this.pending.delete(id);
|
|
reject(new Error(`gateway direct RPC request timeout for ${method}`));
|
|
}, timeoutMs);
|
|
this.pending.set(id, {
|
|
method,
|
|
resolve,
|
|
reject,
|
|
timer
|
|
});
|
|
});
|
|
this.ws.send(JSON.stringify(frame));
|
|
return await response;
|
|
}
|
|
|
|
async handleMessage(event) {
|
|
const raw = await readWebSocketData(event.data);
|
|
const frame = JSON.parse(raw);
|
|
if (frame?.type !== "res") {
|
|
return frame;
|
|
}
|
|
const pending = this.pending.get(frame.id);
|
|
if (!pending) {
|
|
return frame;
|
|
}
|
|
this.pending.delete(frame.id);
|
|
clearTimeout(pending.timer);
|
|
if (frame.ok) {
|
|
pending.resolve(frame.payload);
|
|
} else {
|
|
pending.reject(new Error(formatGatewayRpcError(pending.method, frame.error)));
|
|
}
|
|
return frame;
|
|
}
|
|
|
|
close() {
|
|
this.rejectPending(new Error("gateway direct RPC closed"));
|
|
if (!this.ws) {
|
|
return;
|
|
}
|
|
try {
|
|
this.ws.close();
|
|
} catch {}
|
|
this.ws = null;
|
|
}
|
|
|
|
rejectPending(error) {
|
|
for (const [id, pending] of this.pending.entries()) {
|
|
clearTimeout(pending.timer);
|
|
pending.reject(error);
|
|
this.pending.delete(id);
|
|
}
|
|
}
|
|
}
|
|
|
|
async function readWebSocketData(data) {
|
|
if (typeof data === "string") {
|
|
return data;
|
|
}
|
|
if (data instanceof ArrayBuffer) {
|
|
return Buffer.from(data).toString("utf8");
|
|
}
|
|
if (ArrayBuffer.isView(data)) {
|
|
return Buffer.from(data.buffer, data.byteOffset, data.byteLength).toString("utf8");
|
|
}
|
|
if (typeof Blob !== "undefined" && data instanceof Blob) {
|
|
return await data.text();
|
|
}
|
|
return String(data);
|
|
}
|
|
|
|
function formatGatewayRpcError(method, error) {
|
|
const message = typeof error?.message === "string" && error.message.trim()
|
|
? error.message.trim()
|
|
: "gateway request failed";
|
|
const code = typeof error?.code === "string" && error.code.trim() ? ` ${error.code.trim()}` : "";
|
|
return `${method}${code}: ${message}`;
|
|
}
|
|
|
|
function readGatewayAuthToken(root) {
|
|
const configPath = process.env.OPENCLAW_CONFIG_PATH || join(root, ".openclaw", "openclaw.json");
|
|
let config;
|
|
try {
|
|
config = JSON.parse(readFileSync(configPath, "utf8"));
|
|
} catch {
|
|
return null;
|
|
}
|
|
const authToken = config?.gateway?.auth?.token;
|
|
if (typeof authToken === "string" && authToken.length > 0) {
|
|
return authToken;
|
|
}
|
|
const remoteToken = config?.gateway?.remote?.token;
|
|
return typeof remoteToken === "string" && remoteToken.length > 0 ? remoteToken : null;
|
|
}
|
|
|
|
function readRequiredString(value, label) {
|
|
if (typeof value !== "string" || value.trim().length === 0) {
|
|
throw new Error(`${label} missing`);
|
|
}
|
|
return value;
|
|
}
|
|
|
|
export function extractText(value) {
|
|
if (typeof value === "string") {
|
|
return value;
|
|
}
|
|
if (!value || typeof value !== "object") {
|
|
return "";
|
|
}
|
|
if (Array.isArray(value)) {
|
|
return value.map(extractText).filter(Boolean).join("\n");
|
|
}
|
|
for (const key of ["finalAssistantVisibleText", "finalAssistantRawText", "text", "content", "reply"]) {
|
|
if (typeof value[key] === "string") {
|
|
return value[key];
|
|
}
|
|
}
|
|
return Object.values(value).map(extractText).filter(Boolean).join("\n");
|
|
}
|
|
|
|
export async function sleep(ms) {
|
|
await new Promise((resolve) => setTimeout(resolve, ms));
|
|
}
|
|
|
|
export function finishJson(payload) {
|
|
process.stdout.write(`${JSON.stringify(payload, null, 2)}\n`);
|
|
}
|
|
|
|
export function failJson(error, extra = {}) {
|
|
const message = error instanceof Error ? error.message : String(error);
|
|
process.stdout.write(`${JSON.stringify({ ok: false, error: message, ...extra }, null, 2)}\n`);
|
|
process.exit(1);
|
|
}
|