Fix with windows pipe failure

This commit is contained in:
Benjamin Grosse 2025-11-12 23:58:57 +01:00 committed by Peter Steinberger
parent fcca731977
commit 73fa3fa387
2 changed files with 166 additions and 75 deletions

View File

@ -3,16 +3,7 @@ import net from 'node:net';
import path from 'node:path';
import { launchDaemonDetached } from './launch.js';
import { getDaemonMetadataPath, getDaemonSocketPath } from './paths.js';
import type {
CallToolParams,
CloseServerParams,
DaemonRequest,
DaemonRequestMethod,
DaemonResponse,
ListResourcesParams,
ListToolsParams,
StatusResult,
} from './protocol.js';
import type { CallToolParams, CloseServerParams, DaemonRequest, DaemonRequestMethod, DaemonResponse, ListResourcesParams, ListToolsParams, StatusResult } from './protocol.js';
export interface DaemonClientOptions {
readonly configPath: string;
@ -183,7 +174,11 @@ export class DaemonClient {
socket.setTimeout(timeoutMs, () => {
// If the daemon doesn't answer in time we treat it as a transport error, destroy the socket,
// and let invoke() restart the daemon so hung keep-alive servers get a fresh start.
socket.destroy(Object.assign(new Error('Daemon request timed out.'), { code: 'ETIMEDOUT' }));
socket.destroy(
Object.assign(new Error('Daemon request timed out.'), {
code: 'ETIMEDOUT',
})
);
});
let buffer = '';
socket.on('connect', () => {
@ -191,7 +186,7 @@ export class DaemonClient {
if (error) {
finishReject(error);
}
socket.end();
// Do not end the socket here; allow the server to respond and close.
});
});
socket.on('data', (chunk) => {

View File

@ -1,10 +1,10 @@
import fsSync from 'node:fs';
import fs from 'node:fs/promises';
import net from 'node:net';
import path from 'node:path';
import type { ServerDefinition } from '../config.js';
import { isKeepAliveServer, keepAliveIdleTimeout } from '../lifecycle.js';
import { createRuntime, type Runtime } from '../runtime.js';
import fsSync from "node:fs";
import fs from "node:fs/promises";
import net from "node:net";
import path from "node:path";
import type { ServerDefinition } from "../config.js";
import { isKeepAliveServer, keepAliveIdleTimeout } from "../lifecycle.js";
import { createRuntime, type Runtime } from "../runtime.js";
import type {
CallToolParams,
CloseServerParams,
@ -13,7 +13,7 @@ import type {
ListResourcesParams,
ListToolsParams,
StatusResult,
} from './protocol.js';
} from "./protocol.js";
interface DaemonHostOptions {
readonly socketPath: string;
@ -35,9 +35,13 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise<void> {
configPath: options.configPath,
rootDir: options.rootDir,
});
const keepAliveDefinitions = runtime.getDefinitions().filter(isKeepAliveServer);
const keepAliveDefinitions = runtime
.getDefinitions()
.filter(isKeepAliveServer);
if (keepAliveDefinitions.length === 0) {
throw new Error('No MCP servers require keep-alive; daemon will not start.');
throw new Error(
"No MCP servers require keep-alive; daemon will not start."
);
}
const managedServers = new Map<string, ServerDefinition>();
for (const definition of keepAliveDefinitions) {
@ -73,18 +77,31 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise<void> {
}, 30_000);
idleWatcher.unref();
logEvent(logContext, 'Daemon host started.');
logEvent(logContext, "Daemon host started.");
const startedAt = Date.now();
const server = net.createServer({ allowHalfOpen: true }, (socket) => {
socket.setEncoding('utf8');
let buffer = '';
socket.on('data', (chunk) => {
buffer += chunk;
});
socket.on('end', () => {
socket.setEncoding("utf8");
let buffer = "";
let handled = false;
const tryHandle = () => {
if (handled) {
return;
}
const trimmed = buffer.trim();
if (trimmed.length === 0) {
return;
}
// Attempt to parse immediately; if it parses, handle the request now.
try {
JSON.parse(trimmed);
} catch {
// Not a complete JSON yet; wait for more data or 'end'
return;
}
handled = true;
void handleSocketRequest(
buffer,
trimmed,
socket,
runtime,
managedServers,
@ -98,16 +115,26 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise<void> {
logContext,
shutdown
);
};
socket.on("data", (chunk) => {
buffer += chunk;
tryHandle();
});
socket.on('error', () => {
socket.on("end", () => {
// Fallback: if we haven't handled yet, try now (for compatibility)
if (!handled) {
tryHandle();
}
});
socket.on("error", () => {
socket.destroy();
});
});
await new Promise<void>((resolve, reject) => {
server.once('error', reject);
server.once("error", reject);
server.listen(options.socketPath, () => {
server.off('error', reject);
server.off("error", reject);
resolve();
});
});
@ -125,7 +152,7 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise<void> {
null,
2
),
'utf8'
"utf8"
);
let shuttingDown = false;
@ -134,7 +161,7 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise<void> {
return;
}
shuttingDown = true;
logEvent(logContext, 'Shutting down daemon host.');
logEvent(logContext, "Shutting down daemon host.");
clearInterval(idleWatcher);
server.close();
await runtime.close().catch(() => {});
@ -143,19 +170,19 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise<void> {
process.exit(0);
};
process.once('SIGINT', shutdown);
process.once('SIGTERM', shutdown);
process.once('SIGQUIT', shutdown);
process.once("SIGINT", shutdown);
process.once("SIGTERM", shutdown);
process.once("SIGQUIT", shutdown);
}
async function prepareSocket(socketPath: string): Promise<void> {
if (process.platform === 'win32') {
if (process.platform === "win32") {
return;
}
try {
await fs.unlink(socketPath);
} catch (error) {
if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
if ((error as NodeJS.ErrnoException).code !== "ENOENT") {
throw error;
}
}
@ -163,7 +190,7 @@ async function prepareSocket(socketPath: string): Promise<void> {
}
async function cleanupArtifacts(options: DaemonHostOptions): Promise<void> {
if (process.platform !== 'win32') {
if (process.platform !== "win32") {
try {
await fs.unlink(options.socketPath);
} catch {
@ -183,7 +210,12 @@ async function handleSocketRequest(
runtime: Runtime,
managedServers: Map<string, ServerDefinition>,
activity: Map<string, ServerActivity>,
metadata: { configPath: string; socketPath: string; startedAt: number; logPath: string | null },
metadata: {
configPath: string;
socketPath: string;
startedAt: number;
logPath: string | null;
},
logContext: LogContext,
shutdown: () => Promise<void>
): Promise<void> {
@ -209,28 +241,42 @@ async function processRequest(
runtime: Runtime,
managedServers: Map<string, ServerDefinition>,
activity: Map<string, ServerActivity>,
metadata: { configPath: string; socketPath: string; startedAt: number; logPath: string | null },
metadata: {
configPath: string;
socketPath: string;
startedAt: number;
logPath: string | null;
},
logContext: LogContext
): Promise<{ response: DaemonResponse; shouldShutdown: boolean }> {
const trimmed = rawPayload.trim();
if (!trimmed) {
return { response: buildErrorResponse('unknown', 'empty_request'), shouldShutdown: false };
return {
response: buildErrorResponse("unknown", "empty_request"),
shouldShutdown: false,
};
}
let request: DaemonRequest;
try {
request = JSON.parse(trimmed) as DaemonRequest;
} catch (error) {
return { response: buildErrorResponse('unknown', 'invalid_json', error), shouldShutdown: false };
return {
response: buildErrorResponse("unknown", "invalid_json", error),
shouldShutdown: false,
};
}
const id = request.id ?? 'unknown';
const id = request.id ?? "unknown";
try {
switch (request.method) {
case 'callTool': {
case "callTool": {
const params = request.params as CallToolParams;
ensureManaged(params.server, managedServers);
const loggable = shouldLogServer(logContext, params.server);
if (loggable) {
logEvent(logContext, `callTool start server=${params.server} tool=${params.tool}`);
logEvent(
logContext,
`callTool start server=${params.server} tool=${params.tool}`
);
}
try {
const result = await runtime.callTool(params.server, params.tool, {
@ -239,18 +285,24 @@ async function processRequest(
});
markActivity(params.server, activity);
if (loggable) {
logEvent(logContext, `callTool success server=${params.server} tool=${params.tool}`);
logEvent(
logContext,
`callTool success server=${params.server} tool=${params.tool}`
);
}
return { response: { id, ok: true, result }, shouldShutdown: false };
} catch (error) {
if (loggable) {
const detail = formatError(error);
logEvent(logContext, `callTool error server=${params.server} tool=${params.tool} err=${detail}`);
logEvent(
logContext,
`callTool error server=${params.server} tool=${params.tool} err=${detail}`
);
}
throw error;
}
}
case 'listTools': {
case "listTools": {
const params = request.params as ListToolsParams;
ensureManaged(params.server, managedServers);
const loggable = shouldLogServer(logContext, params.server);
@ -270,12 +322,15 @@ async function processRequest(
} catch (error) {
if (loggable) {
const detail = formatError(error);
logEvent(logContext, `listTools error server=${params.server} err=${detail}`);
logEvent(
logContext,
`listTools error server=${params.server} err=${detail}`
);
}
throw error;
}
}
case 'listResources': {
case "listResources": {
const params = request.params as ListResourcesParams;
ensureManaged(params.server, managedServers);
const loggable = shouldLogServer(logContext, params.server);
@ -283,21 +338,30 @@ async function processRequest(
logEvent(logContext, `listResources start server=${params.server}`);
}
try {
const result = await runtime.listResources(params.server, params.params);
const result = await runtime.listResources(
params.server,
params.params
);
markActivity(params.server, activity);
if (loggable) {
logEvent(logContext, `listResources success server=${params.server}`);
logEvent(
logContext,
`listResources success server=${params.server}`
);
}
return { response: { id, ok: true, result }, shouldShutdown: false };
} catch (error) {
if (loggable) {
const detail = formatError(error);
logEvent(logContext, `listResources error server=${params.server} err=${detail}`);
logEvent(
logContext,
`listResources error server=${params.server} err=${detail}`
);
}
throw error;
}
}
case 'closeServer': {
case "closeServer": {
const params = request.params as CloseServerParams;
ensureManaged(params.server, managedServers);
const loggable = shouldLogServer(logContext, params.server);
@ -310,16 +374,22 @@ async function processRequest(
if (loggable) {
logEvent(logContext, `closeServer success server=${params.server}`);
}
return { response: { id, ok: true, result: true }, shouldShutdown: false };
return {
response: { id, ok: true, result: true },
shouldShutdown: false,
};
} catch (error) {
if (loggable) {
const detail = formatError(error);
logEvent(logContext, `closeServer error server=${params.server} err=${detail}`);
logEvent(
logContext,
`closeServer error server=${params.server} err=${detail}`
);
}
throw error;
}
}
case 'status': {
case "status": {
const result: StatusResult = {
pid: process.pid,
startedAt: metadata.startedAt,
@ -337,25 +407,40 @@ async function processRequest(
};
return { response: { id, ok: true, result }, shouldShutdown: false };
}
case 'stop': {
logEvent(logContext, 'Received stop request.');
return { response: { id, ok: true, result: true }, shouldShutdown: true };
case "stop": {
logEvent(logContext, "Received stop request.");
return {
response: { id, ok: true, result: true },
shouldShutdown: true,
};
}
default:
return { response: buildErrorResponse(id, 'unknown_method'), shouldShutdown: false };
return {
response: buildErrorResponse(id, "unknown_method"),
shouldShutdown: false,
};
}
} catch (error) {
return { response: buildErrorResponse(id, 'runtime_error', error), shouldShutdown: false };
return {
response: buildErrorResponse(id, "runtime_error", error),
shouldShutdown: false,
};
}
}
function ensureManaged(server: string, managedServers: Map<string, ServerDefinition>): void {
function ensureManaged(
server: string,
managedServers: Map<string, ServerDefinition>
): void {
if (!managedServers.has(server)) {
throw new Error(`Server '${server}' is not managed by the daemon.`);
}
}
function markActivity(server: string, activity: Map<string, ServerActivity>): void {
function markActivity(
server: string,
activity: Map<string, ServerActivity>
): void {
const entry = activity.get(server);
if (entry) {
entry.connected = true;
@ -390,11 +475,15 @@ async function evictIdleServers(
);
}
function buildErrorResponse(id: string, code: string, error?: unknown): DaemonResponse {
function buildErrorResponse(
id: string,
code: string,
error?: unknown
): DaemonResponse {
let message = code;
if (error instanceof Error) {
message = error.message;
} else if (typeof error === 'string') {
} else if (typeof error === "string") {
message = error;
}
return {
@ -420,7 +509,8 @@ function createLogContext(options: {
servers: Set<string>;
logPath?: string;
}): LogContext {
const derivedEnabled = options.enabled || options.logAllServers || options.servers.size > 0;
const derivedEnabled =
options.enabled || options.logAllServers || options.servers.size > 0;
const context: LogContext = {
enabled: derivedEnabled,
logAllServers: options.logAllServers,
@ -429,9 +519,15 @@ function createLogContext(options: {
if (derivedEnabled && options.logPath) {
try {
fsSync.mkdirSync(path.dirname(options.logPath), { recursive: true });
context.writer = fsSync.createWriteStream(options.logPath, { flags: 'a' });
context.writer = fsSync.createWriteStream(options.logPath, {
flags: "a",
});
} catch (error) {
console.warn(`[daemon] Failed to open log file ${options.logPath}: ${(error as Error).message}`);
console.warn(
`[daemon] Failed to open log file ${options.logPath}: ${
(error as Error).message
}`
);
}
}
return context;
@ -457,7 +553,7 @@ async function disposeLogContext(context: LogContext): Promise<void> {
}
await new Promise<void>((resolve) => {
writer.end(() => resolve());
writer.on('error', () => resolve());
writer.on("error", () => resolve());
});
}
@ -475,8 +571,8 @@ function formatError(error: unknown): string {
if (error instanceof Error) {
return error.message;
}
if (typeof error === 'string') {
if (typeof error === "string") {
return error;
}
return 'unknown';
return "unknown";
}