From 73fa3fa387268d51300ba5eb140f83db70f3e1c3 Mon Sep 17 00:00:00 2001 From: Benjamin Grosse Date: Wed, 12 Nov 2025 23:58:57 +0100 Subject: [PATCH] Fix with windows pipe failure --- src/daemon/client.ts | 19 ++-- src/daemon/host.ts | 222 +++++++++++++++++++++++++++++++------------ 2 files changed, 166 insertions(+), 75 deletions(-) diff --git a/src/daemon/client.ts b/src/daemon/client.ts index 0f464e9..91b3f2e 100644 --- a/src/daemon/client.ts +++ b/src/daemon/client.ts @@ -3,16 +3,7 @@ import net from 'node:net'; import path from 'node:path'; import { launchDaemonDetached } from './launch.js'; import { getDaemonMetadataPath, getDaemonSocketPath } from './paths.js'; -import type { - CallToolParams, - CloseServerParams, - DaemonRequest, - DaemonRequestMethod, - DaemonResponse, - ListResourcesParams, - ListToolsParams, - StatusResult, -} from './protocol.js'; +import type { CallToolParams, CloseServerParams, DaemonRequest, DaemonRequestMethod, DaemonResponse, ListResourcesParams, ListToolsParams, StatusResult } from './protocol.js'; export interface DaemonClientOptions { readonly configPath: string; @@ -183,7 +174,11 @@ export class DaemonClient { socket.setTimeout(timeoutMs, () => { // If the daemon doesn't answer in time we treat it as a transport error, destroy the socket, // and let invoke() restart the daemon so hung keep-alive servers get a fresh start. - socket.destroy(Object.assign(new Error('Daemon request timed out.'), { code: 'ETIMEDOUT' })); + socket.destroy( + Object.assign(new Error('Daemon request timed out.'), { + code: 'ETIMEDOUT', + }) + ); }); let buffer = ''; socket.on('connect', () => { @@ -191,7 +186,7 @@ export class DaemonClient { if (error) { finishReject(error); } - socket.end(); + // Do not end the socket here; allow the server to respond and close. }); }); socket.on('data', (chunk) => { diff --git a/src/daemon/host.ts b/src/daemon/host.ts index 5fcf22d..6d5cb17 100644 --- a/src/daemon/host.ts +++ b/src/daemon/host.ts @@ -1,10 +1,10 @@ -import fsSync from 'node:fs'; -import fs from 'node:fs/promises'; -import net from 'node:net'; -import path from 'node:path'; -import type { ServerDefinition } from '../config.js'; -import { isKeepAliveServer, keepAliveIdleTimeout } from '../lifecycle.js'; -import { createRuntime, type Runtime } from '../runtime.js'; +import fsSync from "node:fs"; +import fs from "node:fs/promises"; +import net from "node:net"; +import path from "node:path"; +import type { ServerDefinition } from "../config.js"; +import { isKeepAliveServer, keepAliveIdleTimeout } from "../lifecycle.js"; +import { createRuntime, type Runtime } from "../runtime.js"; import type { CallToolParams, CloseServerParams, @@ -13,7 +13,7 @@ import type { ListResourcesParams, ListToolsParams, StatusResult, -} from './protocol.js'; +} from "./protocol.js"; interface DaemonHostOptions { readonly socketPath: string; @@ -35,9 +35,13 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise { configPath: options.configPath, rootDir: options.rootDir, }); - const keepAliveDefinitions = runtime.getDefinitions().filter(isKeepAliveServer); + const keepAliveDefinitions = runtime + .getDefinitions() + .filter(isKeepAliveServer); if (keepAliveDefinitions.length === 0) { - throw new Error('No MCP servers require keep-alive; daemon will not start.'); + throw new Error( + "No MCP servers require keep-alive; daemon will not start." + ); } const managedServers = new Map(); for (const definition of keepAliveDefinitions) { @@ -73,18 +77,31 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise { }, 30_000); idleWatcher.unref(); - logEvent(logContext, 'Daemon host started.'); + logEvent(logContext, "Daemon host started."); const startedAt = Date.now(); const server = net.createServer({ allowHalfOpen: true }, (socket) => { - socket.setEncoding('utf8'); - let buffer = ''; - socket.on('data', (chunk) => { - buffer += chunk; - }); - socket.on('end', () => { + socket.setEncoding("utf8"); + let buffer = ""; + let handled = false; + const tryHandle = () => { + if (handled) { + return; + } + const trimmed = buffer.trim(); + if (trimmed.length === 0) { + return; + } + // Attempt to parse immediately; if it parses, handle the request now. + try { + JSON.parse(trimmed); + } catch { + // Not a complete JSON yet; wait for more data or 'end' + return; + } + handled = true; void handleSocketRequest( - buffer, + trimmed, socket, runtime, managedServers, @@ -98,16 +115,26 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise { logContext, shutdown ); + }; + socket.on("data", (chunk) => { + buffer += chunk; + tryHandle(); }); - socket.on('error', () => { + socket.on("end", () => { + // Fallback: if we haven't handled yet, try now (for compatibility) + if (!handled) { + tryHandle(); + } + }); + socket.on("error", () => { socket.destroy(); }); }); await new Promise((resolve, reject) => { - server.once('error', reject); + server.once("error", reject); server.listen(options.socketPath, () => { - server.off('error', reject); + server.off("error", reject); resolve(); }); }); @@ -125,7 +152,7 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise { null, 2 ), - 'utf8' + "utf8" ); let shuttingDown = false; @@ -134,7 +161,7 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise { return; } shuttingDown = true; - logEvent(logContext, 'Shutting down daemon host.'); + logEvent(logContext, "Shutting down daemon host."); clearInterval(idleWatcher); server.close(); await runtime.close().catch(() => {}); @@ -143,19 +170,19 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise { process.exit(0); }; - process.once('SIGINT', shutdown); - process.once('SIGTERM', shutdown); - process.once('SIGQUIT', shutdown); + process.once("SIGINT", shutdown); + process.once("SIGTERM", shutdown); + process.once("SIGQUIT", shutdown); } async function prepareSocket(socketPath: string): Promise { - if (process.platform === 'win32') { + if (process.platform === "win32") { return; } try { await fs.unlink(socketPath); } catch (error) { - if ((error as NodeJS.ErrnoException).code !== 'ENOENT') { + if ((error as NodeJS.ErrnoException).code !== "ENOENT") { throw error; } } @@ -163,7 +190,7 @@ async function prepareSocket(socketPath: string): Promise { } async function cleanupArtifacts(options: DaemonHostOptions): Promise { - if (process.platform !== 'win32') { + if (process.platform !== "win32") { try { await fs.unlink(options.socketPath); } catch { @@ -183,7 +210,12 @@ async function handleSocketRequest( runtime: Runtime, managedServers: Map, activity: Map, - metadata: { configPath: string; socketPath: string; startedAt: number; logPath: string | null }, + metadata: { + configPath: string; + socketPath: string; + startedAt: number; + logPath: string | null; + }, logContext: LogContext, shutdown: () => Promise ): Promise { @@ -209,28 +241,42 @@ async function processRequest( runtime: Runtime, managedServers: Map, activity: Map, - metadata: { configPath: string; socketPath: string; startedAt: number; logPath: string | null }, + metadata: { + configPath: string; + socketPath: string; + startedAt: number; + logPath: string | null; + }, logContext: LogContext ): Promise<{ response: DaemonResponse; shouldShutdown: boolean }> { const trimmed = rawPayload.trim(); if (!trimmed) { - return { response: buildErrorResponse('unknown', 'empty_request'), shouldShutdown: false }; + return { + response: buildErrorResponse("unknown", "empty_request"), + shouldShutdown: false, + }; } let request: DaemonRequest; try { request = JSON.parse(trimmed) as DaemonRequest; } catch (error) { - return { response: buildErrorResponse('unknown', 'invalid_json', error), shouldShutdown: false }; + return { + response: buildErrorResponse("unknown", "invalid_json", error), + shouldShutdown: false, + }; } - const id = request.id ?? 'unknown'; + const id = request.id ?? "unknown"; try { switch (request.method) { - case 'callTool': { + case "callTool": { const params = request.params as CallToolParams; ensureManaged(params.server, managedServers); const loggable = shouldLogServer(logContext, params.server); if (loggable) { - logEvent(logContext, `callTool start server=${params.server} tool=${params.tool}`); + logEvent( + logContext, + `callTool start server=${params.server} tool=${params.tool}` + ); } try { const result = await runtime.callTool(params.server, params.tool, { @@ -239,18 +285,24 @@ async function processRequest( }); markActivity(params.server, activity); if (loggable) { - logEvent(logContext, `callTool success server=${params.server} tool=${params.tool}`); + logEvent( + logContext, + `callTool success server=${params.server} tool=${params.tool}` + ); } return { response: { id, ok: true, result }, shouldShutdown: false }; } catch (error) { if (loggable) { const detail = formatError(error); - logEvent(logContext, `callTool error server=${params.server} tool=${params.tool} err=${detail}`); + logEvent( + logContext, + `callTool error server=${params.server} tool=${params.tool} err=${detail}` + ); } throw error; } } - case 'listTools': { + case "listTools": { const params = request.params as ListToolsParams; ensureManaged(params.server, managedServers); const loggable = shouldLogServer(logContext, params.server); @@ -270,12 +322,15 @@ async function processRequest( } catch (error) { if (loggable) { const detail = formatError(error); - logEvent(logContext, `listTools error server=${params.server} err=${detail}`); + logEvent( + logContext, + `listTools error server=${params.server} err=${detail}` + ); } throw error; } } - case 'listResources': { + case "listResources": { const params = request.params as ListResourcesParams; ensureManaged(params.server, managedServers); const loggable = shouldLogServer(logContext, params.server); @@ -283,21 +338,30 @@ async function processRequest( logEvent(logContext, `listResources start server=${params.server}`); } try { - const result = await runtime.listResources(params.server, params.params); + const result = await runtime.listResources( + params.server, + params.params + ); markActivity(params.server, activity); if (loggable) { - logEvent(logContext, `listResources success server=${params.server}`); + logEvent( + logContext, + `listResources success server=${params.server}` + ); } return { response: { id, ok: true, result }, shouldShutdown: false }; } catch (error) { if (loggable) { const detail = formatError(error); - logEvent(logContext, `listResources error server=${params.server} err=${detail}`); + logEvent( + logContext, + `listResources error server=${params.server} err=${detail}` + ); } throw error; } } - case 'closeServer': { + case "closeServer": { const params = request.params as CloseServerParams; ensureManaged(params.server, managedServers); const loggable = shouldLogServer(logContext, params.server); @@ -310,16 +374,22 @@ async function processRequest( if (loggable) { logEvent(logContext, `closeServer success server=${params.server}`); } - return { response: { id, ok: true, result: true }, shouldShutdown: false }; + return { + response: { id, ok: true, result: true }, + shouldShutdown: false, + }; } catch (error) { if (loggable) { const detail = formatError(error); - logEvent(logContext, `closeServer error server=${params.server} err=${detail}`); + logEvent( + logContext, + `closeServer error server=${params.server} err=${detail}` + ); } throw error; } } - case 'status': { + case "status": { const result: StatusResult = { pid: process.pid, startedAt: metadata.startedAt, @@ -337,25 +407,40 @@ async function processRequest( }; return { response: { id, ok: true, result }, shouldShutdown: false }; } - case 'stop': { - logEvent(logContext, 'Received stop request.'); - return { response: { id, ok: true, result: true }, shouldShutdown: true }; + case "stop": { + logEvent(logContext, "Received stop request."); + return { + response: { id, ok: true, result: true }, + shouldShutdown: true, + }; } default: - return { response: buildErrorResponse(id, 'unknown_method'), shouldShutdown: false }; + return { + response: buildErrorResponse(id, "unknown_method"), + shouldShutdown: false, + }; } } catch (error) { - return { response: buildErrorResponse(id, 'runtime_error', error), shouldShutdown: false }; + return { + response: buildErrorResponse(id, "runtime_error", error), + shouldShutdown: false, + }; } } -function ensureManaged(server: string, managedServers: Map): void { +function ensureManaged( + server: string, + managedServers: Map +): void { if (!managedServers.has(server)) { throw new Error(`Server '${server}' is not managed by the daemon.`); } } -function markActivity(server: string, activity: Map): void { +function markActivity( + server: string, + activity: Map +): void { const entry = activity.get(server); if (entry) { entry.connected = true; @@ -390,11 +475,15 @@ async function evictIdleServers( ); } -function buildErrorResponse(id: string, code: string, error?: unknown): DaemonResponse { +function buildErrorResponse( + id: string, + code: string, + error?: unknown +): DaemonResponse { let message = code; if (error instanceof Error) { message = error.message; - } else if (typeof error === 'string') { + } else if (typeof error === "string") { message = error; } return { @@ -420,7 +509,8 @@ function createLogContext(options: { servers: Set; logPath?: string; }): LogContext { - const derivedEnabled = options.enabled || options.logAllServers || options.servers.size > 0; + const derivedEnabled = + options.enabled || options.logAllServers || options.servers.size > 0; const context: LogContext = { enabled: derivedEnabled, logAllServers: options.logAllServers, @@ -429,9 +519,15 @@ function createLogContext(options: { if (derivedEnabled && options.logPath) { try { fsSync.mkdirSync(path.dirname(options.logPath), { recursive: true }); - context.writer = fsSync.createWriteStream(options.logPath, { flags: 'a' }); + context.writer = fsSync.createWriteStream(options.logPath, { + flags: "a", + }); } catch (error) { - console.warn(`[daemon] Failed to open log file ${options.logPath}: ${(error as Error).message}`); + console.warn( + `[daemon] Failed to open log file ${options.logPath}: ${ + (error as Error).message + }` + ); } } return context; @@ -457,7 +553,7 @@ async function disposeLogContext(context: LogContext): Promise { } await new Promise((resolve) => { writer.end(() => resolve()); - writer.on('error', () => resolve()); + writer.on("error", () => resolve()); }); } @@ -475,8 +571,8 @@ function formatError(error: unknown): string { if (error instanceof Error) { return error.message; } - if (typeof error === 'string') { + if (typeof error === "string") { return error; } - return 'unknown'; + return "unknown"; }