diff --git a/src/daemon/client.ts b/src/daemon/client.ts index 91b3f2e..7bbc6db 100644 --- a/src/daemon/client.ts +++ b/src/daemon/client.ts @@ -3,7 +3,16 @@ import net from 'node:net'; import path from 'node:path'; import { launchDaemonDetached } from './launch.js'; import { getDaemonMetadataPath, getDaemonSocketPath } from './paths.js'; -import type { CallToolParams, CloseServerParams, DaemonRequest, DaemonRequestMethod, DaemonResponse, ListResourcesParams, ListToolsParams, StatusResult } from './protocol.js'; +import type { + CallToolParams, + CloseServerParams, + DaemonRequest, + DaemonRequestMethod, + DaemonResponse, + ListResourcesParams, + ListToolsParams, + StatusResult, +} from './protocol.js'; export interface DaemonClientOptions { readonly configPath: string; diff --git a/src/daemon/host.ts b/src/daemon/host.ts index 6d5cb17..1091af0 100644 --- a/src/daemon/host.ts +++ b/src/daemon/host.ts @@ -1,10 +1,10 @@ -import fsSync from "node:fs"; -import fs from "node:fs/promises"; -import net from "node:net"; -import path from "node:path"; -import type { ServerDefinition } from "../config.js"; -import { isKeepAliveServer, keepAliveIdleTimeout } from "../lifecycle.js"; -import { createRuntime, type Runtime } from "../runtime.js"; +import fsSync from 'node:fs'; +import fs from 'node:fs/promises'; +import net from 'node:net'; +import path from 'node:path'; +import type { ServerDefinition } from '../config.js'; +import { isKeepAliveServer, keepAliveIdleTimeout } from '../lifecycle.js'; +import { createRuntime, type Runtime } from '../runtime.js'; import type { CallToolParams, CloseServerParams, @@ -13,7 +13,7 @@ import type { ListResourcesParams, ListToolsParams, StatusResult, -} from "./protocol.js"; +} from './protocol.js'; interface DaemonHostOptions { readonly socketPath: string; @@ -35,13 +35,9 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise { configPath: options.configPath, rootDir: options.rootDir, }); - const keepAliveDefinitions = runtime - .getDefinitions() - .filter(isKeepAliveServer); + const keepAliveDefinitions = runtime.getDefinitions().filter(isKeepAliveServer); if (keepAliveDefinitions.length === 0) { - throw new Error( - "No MCP servers require keep-alive; daemon will not start." - ); + throw new Error('No MCP servers require keep-alive; daemon will not start.'); } const managedServers = new Map(); for (const definition of keepAliveDefinitions) { @@ -77,12 +73,12 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise { }, 30_000); idleWatcher.unref(); - logEvent(logContext, "Daemon host started."); + logEvent(logContext, 'Daemon host started.'); const startedAt = Date.now(); const server = net.createServer({ allowHalfOpen: true }, (socket) => { - socket.setEncoding("utf8"); - let buffer = ""; + socket.setEncoding('utf8'); + let buffer = ''; let handled = false; const tryHandle = () => { if (handled) { @@ -93,8 +89,9 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise { return; } // Attempt to parse immediately; if it parses, handle the request now. + let parsedRequest: DaemonRequest; try { - JSON.parse(trimmed); + parsedRequest = JSON.parse(trimmed) as DaemonRequest; } catch { // Not a complete JSON yet; wait for more data or 'end' return; @@ -113,28 +110,29 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise { logPath: options.logPath ?? null, }, logContext, - shutdown + shutdown, + parsedRequest ); }; - socket.on("data", (chunk) => { + socket.on('data', (chunk) => { buffer += chunk; tryHandle(); }); - socket.on("end", () => { + socket.on('end', () => { // Fallback: if we haven't handled yet, try now (for compatibility) if (!handled) { tryHandle(); } }); - socket.on("error", () => { + socket.on('error', () => { socket.destroy(); }); }); await new Promise((resolve, reject) => { - server.once("error", reject); + server.once('error', reject); server.listen(options.socketPath, () => { - server.off("error", reject); + server.off('error', reject); resolve(); }); }); @@ -152,7 +150,7 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise { null, 2 ), - "utf8" + 'utf8' ); let shuttingDown = false; @@ -161,7 +159,7 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise { return; } shuttingDown = true; - logEvent(logContext, "Shutting down daemon host."); + logEvent(logContext, 'Shutting down daemon host.'); clearInterval(idleWatcher); server.close(); await runtime.close().catch(() => {}); @@ -170,19 +168,19 @@ export async function runDaemonHost(options: DaemonHostOptions): Promise { process.exit(0); }; - process.once("SIGINT", shutdown); - process.once("SIGTERM", shutdown); - process.once("SIGQUIT", shutdown); + process.once('SIGINT', shutdown); + process.once('SIGTERM', shutdown); + process.once('SIGQUIT', shutdown); } async function prepareSocket(socketPath: string): Promise { - if (process.platform === "win32") { + if (process.platform === 'win32') { return; } try { await fs.unlink(socketPath); } catch (error) { - if ((error as NodeJS.ErrnoException).code !== "ENOENT") { + if ((error as NodeJS.ErrnoException).code !== 'ENOENT') { throw error; } } @@ -190,7 +188,7 @@ async function prepareSocket(socketPath: string): Promise { } async function cleanupArtifacts(options: DaemonHostOptions): Promise { - if (process.platform !== "win32") { + if (process.platform !== 'win32') { try { await fs.unlink(options.socketPath); } catch { @@ -217,7 +215,8 @@ async function handleSocketRequest( logPath: string | null; }, logContext: LogContext, - shutdown: () => Promise + shutdown: () => Promise, + preParsedRequest?: DaemonRequest ): Promise { const { response, shouldShutdown } = await processRequest( rawPayload, @@ -225,7 +224,8 @@ async function handleSocketRequest( managedServers, activity, metadata, - logContext + logContext, + preParsedRequest ); socket.write(JSON.stringify(response), () => { socket.end(() => { @@ -247,36 +247,38 @@ async function processRequest( startedAt: number; logPath: string | null; }, - logContext: LogContext + logContext: LogContext, + preParsedRequest?: DaemonRequest ): Promise<{ response: DaemonResponse; shouldShutdown: boolean }> { const trimmed = rawPayload.trim(); - if (!trimmed) { + if (!trimmed && !preParsedRequest) { return { - response: buildErrorResponse("unknown", "empty_request"), + response: buildErrorResponse('unknown', 'empty_request'), shouldShutdown: false, }; } let request: DaemonRequest; - try { - request = JSON.parse(trimmed) as DaemonRequest; - } catch (error) { - return { - response: buildErrorResponse("unknown", "invalid_json", error), - shouldShutdown: false, - }; + if (preParsedRequest) { + request = preParsedRequest; + } else { + try { + request = JSON.parse(trimmed) as DaemonRequest; + } catch (error) { + return { + response: buildErrorResponse('unknown', 'invalid_json', error), + shouldShutdown: false, + }; + } } - const id = request.id ?? "unknown"; + const id = request.id ?? 'unknown'; try { switch (request.method) { - case "callTool": { + case 'callTool': { const params = request.params as CallToolParams; ensureManaged(params.server, managedServers); const loggable = shouldLogServer(logContext, params.server); if (loggable) { - logEvent( - logContext, - `callTool start server=${params.server} tool=${params.tool}` - ); + logEvent(logContext, `callTool start server=${params.server} tool=${params.tool}`); } try { const result = await runtime.callTool(params.server, params.tool, { @@ -285,24 +287,18 @@ async function processRequest( }); markActivity(params.server, activity); if (loggable) { - logEvent( - logContext, - `callTool success server=${params.server} tool=${params.tool}` - ); + logEvent(logContext, `callTool success server=${params.server} tool=${params.tool}`); } return { response: { id, ok: true, result }, shouldShutdown: false }; } catch (error) { if (loggable) { const detail = formatError(error); - logEvent( - logContext, - `callTool error server=${params.server} tool=${params.tool} err=${detail}` - ); + logEvent(logContext, `callTool error server=${params.server} tool=${params.tool} err=${detail}`); } throw error; } } - case "listTools": { + case 'listTools': { const params = request.params as ListToolsParams; ensureManaged(params.server, managedServers); const loggable = shouldLogServer(logContext, params.server); @@ -322,15 +318,12 @@ async function processRequest( } catch (error) { if (loggable) { const detail = formatError(error); - logEvent( - logContext, - `listTools error server=${params.server} err=${detail}` - ); + logEvent(logContext, `listTools error server=${params.server} err=${detail}`); } throw error; } } - case "listResources": { + case 'listResources': { const params = request.params as ListResourcesParams; ensureManaged(params.server, managedServers); const loggable = shouldLogServer(logContext, params.server); @@ -338,30 +331,21 @@ async function processRequest( logEvent(logContext, `listResources start server=${params.server}`); } try { - const result = await runtime.listResources( - params.server, - params.params - ); + const result = await runtime.listResources(params.server, params.params); markActivity(params.server, activity); if (loggable) { - logEvent( - logContext, - `listResources success server=${params.server}` - ); + logEvent(logContext, `listResources success server=${params.server}`); } return { response: { id, ok: true, result }, shouldShutdown: false }; } catch (error) { if (loggable) { const detail = formatError(error); - logEvent( - logContext, - `listResources error server=${params.server} err=${detail}` - ); + logEvent(logContext, `listResources error server=${params.server} err=${detail}`); } throw error; } } - case "closeServer": { + case 'closeServer': { const params = request.params as CloseServerParams; ensureManaged(params.server, managedServers); const loggable = shouldLogServer(logContext, params.server); @@ -381,15 +365,12 @@ async function processRequest( } catch (error) { if (loggable) { const detail = formatError(error); - logEvent( - logContext, - `closeServer error server=${params.server} err=${detail}` - ); + logEvent(logContext, `closeServer error server=${params.server} err=${detail}`); } throw error; } } - case "status": { + case 'status': { const result: StatusResult = { pid: process.pid, startedAt: metadata.startedAt, @@ -407,8 +388,8 @@ async function processRequest( }; return { response: { id, ok: true, result }, shouldShutdown: false }; } - case "stop": { - logEvent(logContext, "Received stop request."); + case 'stop': { + logEvent(logContext, 'Received stop request.'); return { response: { id, ok: true, result: true }, shouldShutdown: true, @@ -416,31 +397,25 @@ async function processRequest( } default: return { - response: buildErrorResponse(id, "unknown_method"), + response: buildErrorResponse(id, 'unknown_method'), shouldShutdown: false, }; } } catch (error) { return { - response: buildErrorResponse(id, "runtime_error", error), + response: buildErrorResponse(id, 'runtime_error', error), shouldShutdown: false, }; } } -function ensureManaged( - server: string, - managedServers: Map -): void { +function ensureManaged(server: string, managedServers: Map): void { if (!managedServers.has(server)) { throw new Error(`Server '${server}' is not managed by the daemon.`); } } -function markActivity( - server: string, - activity: Map -): void { +function markActivity(server: string, activity: Map): void { const entry = activity.get(server); if (entry) { entry.connected = true; @@ -475,15 +450,11 @@ async function evictIdleServers( ); } -function buildErrorResponse( - id: string, - code: string, - error?: unknown -): DaemonResponse { +function buildErrorResponse(id: string, code: string, error?: unknown): DaemonResponse { let message = code; if (error instanceof Error) { message = error.message; - } else if (typeof error === "string") { + } else if (typeof error === 'string') { message = error; } return { @@ -509,8 +480,7 @@ function createLogContext(options: { servers: Set; logPath?: string; }): LogContext { - const derivedEnabled = - options.enabled || options.logAllServers || options.servers.size > 0; + const derivedEnabled = options.enabled || options.logAllServers || options.servers.size > 0; const context: LogContext = { enabled: derivedEnabled, logAllServers: options.logAllServers, @@ -520,14 +490,10 @@ function createLogContext(options: { try { fsSync.mkdirSync(path.dirname(options.logPath), { recursive: true }); context.writer = fsSync.createWriteStream(options.logPath, { - flags: "a", + flags: 'a', }); } catch (error) { - console.warn( - `[daemon] Failed to open log file ${options.logPath}: ${ - (error as Error).message - }` - ); + console.warn(`[daemon] Failed to open log file ${options.logPath}: ${(error as Error).message}`); } } return context; @@ -553,7 +519,7 @@ async function disposeLogContext(context: LogContext): Promise { } await new Promise((resolve) => { writer.end(() => resolve()); - writer.on("error", () => resolve()); + writer.on('error', () => resolve()); }); } @@ -571,8 +537,25 @@ function formatError(error: unknown): string { if (error instanceof Error) { return error.message; } - if (typeof error === "string") { + if (typeof error === 'string') { return error; } - return "unknown"; + return 'unknown'; +} + +export async function __testProcessRequest( + rawPayload: string, + runtime: Runtime, + managedServers: Map, + activity: Map, + metadata: { + configPath: string; + socketPath: string; + startedAt: number; + logPath: string | null; + }, + logContext: LogContext, + preParsedRequest?: DaemonRequest +): Promise<{ response: DaemonResponse; shouldShutdown: boolean }> { + return await processRequest(rawPayload, runtime, managedServers, activity, metadata, logContext, preParsedRequest); } diff --git a/tests/daemon-host.test.ts b/tests/daemon-host.test.ts new file mode 100644 index 0000000..d78775d --- /dev/null +++ b/tests/daemon-host.test.ts @@ -0,0 +1,31 @@ +import { describe, expect, it } from 'vitest'; +import type { ServerDefinition } from '../src/config.js'; +import { __testProcessRequest } from '../src/daemon/host.js'; +import type { DaemonRequest } from '../src/daemon/protocol.js'; +import type { Runtime } from '../src/runtime.js'; + +describe('daemon host request handling', () => { + it('reuses pre-parsed requests without reparsing payloads', async () => { + const metadata = { + configPath: '/tmp/config.json', + socketPath: '/tmp/socket', + startedAt: Date.now(), + logPath: null, + }; + const logContext = { enabled: false, logAllServers: false, servers: new Set() }; + + const parsedRequest: DaemonRequest = { id: '1', method: 'status', params: {} }; + const result = await __testProcessRequest( + '!!!invalid-json!!!', + {} as Runtime, + new Map(), + new Map(), + metadata, + logContext, + parsedRequest + ); + + expect(result.response.ok).toBe(true); + expect(result.shouldShutdown).toBe(false); + }); +});