fix(daemon): stabilize windows pipe parsing

This commit is contained in:
Peter Steinberger 2025-11-17 19:55:21 +00:00
parent 73fa3fa387
commit a515dcfc72
3 changed files with 135 additions and 112 deletions

View File

@ -3,7 +3,16 @@ import net from 'node:net';
import path from 'node:path';
import { launchDaemonDetached } from './launch.js';
import { getDaemonMetadataPath, getDaemonSocketPath } from './paths.js';
import type { CallToolParams, CloseServerParams, DaemonRequest, DaemonRequestMethod, DaemonResponse, ListResourcesParams, ListToolsParams, StatusResult } from './protocol.js';
import type {
CallToolParams,
CloseServerParams,
DaemonRequest,
DaemonRequestMethod,
DaemonResponse,
ListResourcesParams,
ListToolsParams,
StatusResult,
} from './protocol.js';
export interface DaemonClientOptions {
readonly configPath: string;

View File

@ -1,10 +1,10 @@
import fsSync from "node:fs";
import fs from "node:fs/promises";
import net from "node:net";
import path from "node:path";
import type { ServerDefinition } from "../config.js";
import { isKeepAliveServer, keepAliveIdleTimeout } from "../lifecycle.js";
import { createRuntime, type Runtime } from "../runtime.js";
import fsSync from 'node:fs';
import fs from 'node:fs/promises';
import net from 'node:net';
import path from 'node:path';
import type { ServerDefinition } from '../config.js';
import { isKeepAliveServer, keepAliveIdleTimeout } from '../lifecycle.js';
import { createRuntime, type Runtime } from '../runtime.js';
import type {
CallToolParams,
CloseServerParams,
@ -13,7 +13,7 @@ import type {
ListResourcesParams,
ListToolsParams,
StatusResult,
} from "./protocol.js";
} from './protocol.js';
interface DaemonHostOptions {
readonly socketPath: string;
@ -35,13 +35,9 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise<void> {
configPath: options.configPath,
rootDir: options.rootDir,
});
const keepAliveDefinitions = runtime
.getDefinitions()
.filter(isKeepAliveServer);
const keepAliveDefinitions = runtime.getDefinitions().filter(isKeepAliveServer);
if (keepAliveDefinitions.length === 0) {
throw new Error(
"No MCP servers require keep-alive; daemon will not start."
);
throw new Error('No MCP servers require keep-alive; daemon will not start.');
}
const managedServers = new Map<string, ServerDefinition>();
for (const definition of keepAliveDefinitions) {
@ -77,12 +73,12 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise<void> {
}, 30_000);
idleWatcher.unref();
logEvent(logContext, "Daemon host started.");
logEvent(logContext, 'Daemon host started.');
const startedAt = Date.now();
const server = net.createServer({ allowHalfOpen: true }, (socket) => {
socket.setEncoding("utf8");
let buffer = "";
socket.setEncoding('utf8');
let buffer = '';
let handled = false;
const tryHandle = () => {
if (handled) {
@ -93,8 +89,9 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise<void> {
return;
}
// Attempt to parse immediately; if it parses, handle the request now.
let parsedRequest: DaemonRequest;
try {
JSON.parse(trimmed);
parsedRequest = JSON.parse(trimmed) as DaemonRequest;
} catch {
// Not a complete JSON yet; wait for more data or 'end'
return;
@ -113,28 +110,29 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise<void> {
logPath: options.logPath ?? null,
},
logContext,
shutdown
shutdown,
parsedRequest
);
};
socket.on("data", (chunk) => {
socket.on('data', (chunk) => {
buffer += chunk;
tryHandle();
});
socket.on("end", () => {
socket.on('end', () => {
// Fallback: if we haven't handled yet, try now (for compatibility)
if (!handled) {
tryHandle();
}
});
socket.on("error", () => {
socket.on('error', () => {
socket.destroy();
});
});
await new Promise<void>((resolve, reject) => {
server.once("error", reject);
server.once('error', reject);
server.listen(options.socketPath, () => {
server.off("error", reject);
server.off('error', reject);
resolve();
});
});
@ -152,7 +150,7 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise<void> {
null,
2
),
"utf8"
'utf8'
);
let shuttingDown = false;
@ -161,7 +159,7 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise<void> {
return;
}
shuttingDown = true;
logEvent(logContext, "Shutting down daemon host.");
logEvent(logContext, 'Shutting down daemon host.');
clearInterval(idleWatcher);
server.close();
await runtime.close().catch(() => {});
@ -170,19 +168,19 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise<void> {
process.exit(0);
};
process.once("SIGINT", shutdown);
process.once("SIGTERM", shutdown);
process.once("SIGQUIT", shutdown);
process.once('SIGINT', shutdown);
process.once('SIGTERM', shutdown);
process.once('SIGQUIT', shutdown);
}
async function prepareSocket(socketPath: string): Promise<void> {
if (process.platform === "win32") {
if (process.platform === 'win32') {
return;
}
try {
await fs.unlink(socketPath);
} catch (error) {
if ((error as NodeJS.ErrnoException).code !== "ENOENT") {
if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
throw error;
}
}
@ -190,7 +188,7 @@ async function prepareSocket(socketPath: string): Promise<void> {
}
async function cleanupArtifacts(options: DaemonHostOptions): Promise<void> {
if (process.platform !== "win32") {
if (process.platform !== 'win32') {
try {
await fs.unlink(options.socketPath);
} catch {
@ -217,7 +215,8 @@ async function handleSocketRequest(
logPath: string | null;
},
logContext: LogContext,
shutdown: () => Promise<void>
shutdown: () => Promise<void>,
preParsedRequest?: DaemonRequest
): Promise<void> {
const { response, shouldShutdown } = await processRequest(
rawPayload,
@ -225,7 +224,8 @@ async function handleSocketRequest(
managedServers,
activity,
metadata,
logContext
logContext,
preParsedRequest
);
socket.write(JSON.stringify(response), () => {
socket.end(() => {
@ -247,36 +247,38 @@ async function processRequest(
startedAt: number;
logPath: string | null;
},
logContext: LogContext
logContext: LogContext,
preParsedRequest?: DaemonRequest
): Promise<{ response: DaemonResponse; shouldShutdown: boolean }> {
const trimmed = rawPayload.trim();
if (!trimmed) {
if (!trimmed && !preParsedRequest) {
return {
response: buildErrorResponse("unknown", "empty_request"),
response: buildErrorResponse('unknown', 'empty_request'),
shouldShutdown: false,
};
}
let request: DaemonRequest;
try {
request = JSON.parse(trimmed) as DaemonRequest;
} catch (error) {
return {
response: buildErrorResponse("unknown", "invalid_json", error),
shouldShutdown: false,
};
if (preParsedRequest) {
request = preParsedRequest;
} else {
try {
request = JSON.parse(trimmed) as DaemonRequest;
} catch (error) {
return {
response: buildErrorResponse('unknown', 'invalid_json', error),
shouldShutdown: false,
};
}
}
const id = request.id ?? "unknown";
const id = request.id ?? 'unknown';
try {
switch (request.method) {
case "callTool": {
case 'callTool': {
const params = request.params as CallToolParams;
ensureManaged(params.server, managedServers);
const loggable = shouldLogServer(logContext, params.server);
if (loggable) {
logEvent(
logContext,
`callTool start server=${params.server} tool=${params.tool}`
);
logEvent(logContext, `callTool start server=${params.server} tool=${params.tool}`);
}
try {
const result = await runtime.callTool(params.server, params.tool, {
@ -285,24 +287,18 @@ async function processRequest(
});
markActivity(params.server, activity);
if (loggable) {
logEvent(
logContext,
`callTool success server=${params.server} tool=${params.tool}`
);
logEvent(logContext, `callTool success server=${params.server} tool=${params.tool}`);
}
return { response: { id, ok: true, result }, shouldShutdown: false };
} catch (error) {
if (loggable) {
const detail = formatError(error);
logEvent(
logContext,
`callTool error server=${params.server} tool=${params.tool} err=${detail}`
);
logEvent(logContext, `callTool error server=${params.server} tool=${params.tool} err=${detail}`);
}
throw error;
}
}
case "listTools": {
case 'listTools': {
const params = request.params as ListToolsParams;
ensureManaged(params.server, managedServers);
const loggable = shouldLogServer(logContext, params.server);
@ -322,15 +318,12 @@ async function processRequest(
} catch (error) {
if (loggable) {
const detail = formatError(error);
logEvent(
logContext,
`listTools error server=${params.server} err=${detail}`
);
logEvent(logContext, `listTools error server=${params.server} err=${detail}`);
}
throw error;
}
}
case "listResources": {
case 'listResources': {
const params = request.params as ListResourcesParams;
ensureManaged(params.server, managedServers);
const loggable = shouldLogServer(logContext, params.server);
@ -338,30 +331,21 @@ async function processRequest(
logEvent(logContext, `listResources start server=${params.server}`);
}
try {
const result = await runtime.listResources(
params.server,
params.params
);
const result = await runtime.listResources(params.server, params.params);
markActivity(params.server, activity);
if (loggable) {
logEvent(
logContext,
`listResources success server=${params.server}`
);
logEvent(logContext, `listResources success server=${params.server}`);
}
return { response: { id, ok: true, result }, shouldShutdown: false };
} catch (error) {
if (loggable) {
const detail = formatError(error);
logEvent(
logContext,
`listResources error server=${params.server} err=${detail}`
);
logEvent(logContext, `listResources error server=${params.server} err=${detail}`);
}
throw error;
}
}
case "closeServer": {
case 'closeServer': {
const params = request.params as CloseServerParams;
ensureManaged(params.server, managedServers);
const loggable = shouldLogServer(logContext, params.server);
@ -381,15 +365,12 @@ async function processRequest(
} catch (error) {
if (loggable) {
const detail = formatError(error);
logEvent(
logContext,
`closeServer error server=${params.server} err=${detail}`
);
logEvent(logContext, `closeServer error server=${params.server} err=${detail}`);
}
throw error;
}
}
case "status": {
case 'status': {
const result: StatusResult = {
pid: process.pid,
startedAt: metadata.startedAt,
@ -407,8 +388,8 @@ async function processRequest(
};
return { response: { id, ok: true, result }, shouldShutdown: false };
}
case "stop": {
logEvent(logContext, "Received stop request.");
case 'stop': {
logEvent(logContext, 'Received stop request.');
return {
response: { id, ok: true, result: true },
shouldShutdown: true,
@ -416,31 +397,25 @@ async function processRequest(
}
default:
return {
response: buildErrorResponse(id, "unknown_method"),
response: buildErrorResponse(id, 'unknown_method'),
shouldShutdown: false,
};
}
} catch (error) {
return {
response: buildErrorResponse(id, "runtime_error", error),
response: buildErrorResponse(id, 'runtime_error', error),
shouldShutdown: false,
};
}
}
function ensureManaged(
server: string,
managedServers: Map<string, ServerDefinition>
): void {
function ensureManaged(server: string, managedServers: Map<string, ServerDefinition>): void {
if (!managedServers.has(server)) {
throw new Error(`Server '${server}' is not managed by the daemon.`);
}
}
function markActivity(
server: string,
activity: Map<string, ServerActivity>
): void {
function markActivity(server: string, activity: Map<string, ServerActivity>): void {
const entry = activity.get(server);
if (entry) {
entry.connected = true;
@ -475,15 +450,11 @@ async function evictIdleServers(
);
}
function buildErrorResponse(
id: string,
code: string,
error?: unknown
): DaemonResponse {
function buildErrorResponse(id: string, code: string, error?: unknown): DaemonResponse {
let message = code;
if (error instanceof Error) {
message = error.message;
} else if (typeof error === "string") {
} else if (typeof error === 'string') {
message = error;
}
return {
@ -509,8 +480,7 @@ function createLogContext(options: {
servers: Set<string>;
logPath?: string;
}): LogContext {
const derivedEnabled =
options.enabled || options.logAllServers || options.servers.size > 0;
const derivedEnabled = options.enabled || options.logAllServers || options.servers.size > 0;
const context: LogContext = {
enabled: derivedEnabled,
logAllServers: options.logAllServers,
@ -520,14 +490,10 @@ function createLogContext(options: {
try {
fsSync.mkdirSync(path.dirname(options.logPath), { recursive: true });
context.writer = fsSync.createWriteStream(options.logPath, {
flags: "a",
flags: 'a',
});
} catch (error) {
console.warn(
`[daemon] Failed to open log file ${options.logPath}: ${
(error as Error).message
}`
);
console.warn(`[daemon] Failed to open log file ${options.logPath}: ${(error as Error).message}`);
}
}
return context;
@ -553,7 +519,7 @@ async function disposeLogContext(context: LogContext): Promise<void> {
}
await new Promise<void>((resolve) => {
writer.end(() => resolve());
writer.on("error", () => resolve());
writer.on('error', () => resolve());
});
}
@ -571,8 +537,25 @@ function formatError(error: unknown): string {
if (error instanceof Error) {
return error.message;
}
if (typeof error === "string") {
if (typeof error === 'string') {
return error;
}
return "unknown";
return 'unknown';
}
export async function __testProcessRequest(
rawPayload: string,
runtime: Runtime,
managedServers: Map<string, ServerDefinition>,
activity: Map<string, ServerActivity>,
metadata: {
configPath: string;
socketPath: string;
startedAt: number;
logPath: string | null;
},
logContext: LogContext,
preParsedRequest?: DaemonRequest
): Promise<{ response: DaemonResponse; shouldShutdown: boolean }> {
return await processRequest(rawPayload, runtime, managedServers, activity, metadata, logContext, preParsedRequest);
}

31
tests/daemon-host.test.ts Normal file
View File

@ -0,0 +1,31 @@
import { describe, expect, it } from 'vitest';
import type { ServerDefinition } from '../src/config.js';
import { __testProcessRequest } from '../src/daemon/host.js';
import type { DaemonRequest } from '../src/daemon/protocol.js';
import type { Runtime } from '../src/runtime.js';
describe('daemon host request handling', () => {
it('reuses pre-parsed requests without reparsing payloads', async () => {
const metadata = {
configPath: '/tmp/config.json',
socketPath: '/tmp/socket',
startedAt: Date.now(),
logPath: null,
};
const logContext = { enabled: false, logAllServers: false, servers: new Set<string>() };
const parsedRequest: DaemonRequest = { id: '1', method: 'status', params: {} };
const result = await __testProcessRequest(
'!!!invalid-json!!!',
{} as Runtime,
new Map<string, ServerDefinition>(),
new Map(),
metadata,
logContext,
parsedRequest
);
expect(result.response.ok).toBe(true);
expect(result.shouldShutdown).toBe(false);
});
});