feat: add concurrent provider pressure scenario

This commit is contained in:
Shakker 2026-05-01 08:39:54 +01:00
parent d601bde7f8
commit 3f649687f5
No known key found for this signature in database
10 changed files with 488 additions and 8 deletions

View File

@ -31,6 +31,7 @@
{ "scenario": "agent-provider-timeout", "state": "mock-openai-provider", "timeoutMs": 180000 },
{ "scenario": "agent-provider-malformed", "state": "mock-openai-provider", "timeoutMs": 180000 },
{ "scenario": "agent-provider-streaming-stall", "state": "mock-openai-provider", "timeoutMs": 180000 },
{ "scenario": "agent-provider-concurrent", "state": "mock-openai-provider", "timeoutMs": 240000 },
{ "scenario": "agent-provider-recovery", "state": "mock-openai-provider", "timeoutMs": 240000 },
{ "scenario": "dashboard-readiness", "state": "fresh" },
{ "scenario": "tui-responsiveness", "state": "fresh" },

View File

@ -97,6 +97,7 @@
{ "scenario": "agent-provider-timeout", "state": "mock-openai-provider" },
{ "scenario": "agent-provider-malformed", "state": "mock-openai-provider" },
{ "scenario": "agent-provider-streaming-stall", "state": "mock-openai-provider" },
{ "scenario": "agent-provider-concurrent", "state": "mock-openai-provider" },
{ "scenario": "agent-provider-recovery", "state": "mock-openai-provider" },
{ "scenario": "agent-long-session", "state": "mock-openai-provider" },
{ "scenario": "failure-injection", "state": "broken-plugin-deps" },
@ -187,6 +188,11 @@
"state": "mock-openai-provider",
"timeoutMs": 180000
},
{
"scenario": "agent-provider-concurrent",
"state": "mock-openai-provider",
"timeoutMs": 240000
},
{
"scenario": "agent-provider-recovery",
"state": "mock-openai-provider",

View File

@ -0,0 +1,55 @@
{
"id": "agent-provider-concurrent",
"surface": "agent-message",
"title": "Agent Provider Concurrent Pressure",
"objective": "Prove OpenClaw can process multiple overlapping local agent turns through the provider path, keep the gateway healthy, return correct responses, and leave no leaked child processes.",
"tags": ["agent", "message", "provider-failure", "concurrency", "containment"],
"timeoutMs": 240000,
"auth": { "mode": "mock" },
"mockProvider": {
"mode": "concurrent-pressure",
"delayMs": 1500,
"concurrency": 3
},
"agent": {
"expectedText": "KOVA_AGENT_OK"
},
"thresholds": {
"gatewayReadyMs": 30000,
"agentTurnMs": 120000,
"providerFinalMs": 30000,
"providerRequestCountMin": 3,
"providerConcurrencyMin": 2,
"providerFailureHealthFailures": 0,
"agentContainmentHealthFailures": 0,
"agentProcessLeaks": 0,
"peakRssMb": 900,
"missingDependencyErrors": 0,
"pluginLoadFailures": 0
},
"phases": [
{
"id": "provision",
"title": "Provision Agent Env",
"intent": "Start a disposable OpenClaw gateway before applying concurrent provider pressure.",
"commands": ["ocm start {env} {startSelector} --json"],
"evidence": ["gateway port", "runtime binding", "startup readiness"]
},
{
"id": "concurrent-provider-turns",
"title": "Concurrent Provider Turns",
"intent": "Send several real local agent messages at the same time and verify all hit the provider and return correct responses.",
"commands": [
"node {kovaRoot}/support/run-concurrent-agent-turns.mjs --env {env} --count 3 --session-prefix kova-agent-provider-concurrent --message 'Reply with exact ASCII text KOVA_AGENT_OK only.' --expected-text KOVA_AGENT_OK --timeout 120"
],
"evidence": ["assistant responses", "provider request count", "provider overlap timing", "pre-provider timing", "role resource samples", "process leak snapshot"]
},
{
"id": "post-concurrency-health",
"title": "Post-Concurrency Gateway Health",
"intent": "Verify the gateway remains responsive after concurrent agent/provider work.",
"commands": ["ocm @{env} -- status", "ocm logs {env} --tail 300 --raw"],
"evidence": ["gateway status", "provider logs", "plugin errors", "memory after concurrent turns"]
}
]
}

View File

@ -563,11 +563,14 @@ function mockProviderPolicy(scenario, state) {
throw new Error(`mockProvider.mode must be one of ${[...mockProviderModes].join(", ")}`);
}
const policy = { mode };
for (const key of ["delayMs", "stallMs", "errorStatus"]) {
for (const key of ["delayMs", "stallMs", "errorStatus", "concurrency"]) {
if (raw[key] !== undefined) {
const value = Number(raw[key]);
if (!Number.isInteger(value) || value < 0) {
throw new Error(`mockProvider.${key} must be a non-negative integer`);
const valid = key === "concurrency"
? Number.isInteger(value) && value > 0
: Number.isInteger(value) && value >= 0;
if (!valid) {
throw new Error(`mockProvider.${key} must be a ${key === "concurrency" ? "positive" : "non-negative"} integer`);
}
policy[key] = value;
}
@ -580,7 +583,8 @@ function mockProviderDisplay(policy) {
mode: policy.mode,
delayMs: policy.delayMs ?? null,
stallMs: policy.stallMs ?? null,
errorStatus: policy.errorStatus ?? null
errorStatus: policy.errorStatus ?? null,
concurrency: policy.concurrency ?? null
};
}

View File

@ -656,6 +656,27 @@ function summarizeNumericField(items, field) {
};
}
function maxProviderRequestConcurrency(requests) {
const events = [];
for (const request of requests ?? []) {
if (typeof request.receivedAtEpochMs !== "number" || typeof request.respondedAtEpochMs !== "number") {
continue;
}
if (request.respondedAtEpochMs < request.receivedAtEpochMs) {
continue;
}
events.push({ time: request.receivedAtEpochMs, delta: 1 });
events.push({ time: request.respondedAtEpochMs, delta: -1 });
}
let current = 0;
let max = 0;
for (const event of events.toSorted((left, right) => left.time - right.time || right.delta - left.delta)) {
current += event.delta;
max = Math.max(max, current);
}
return max;
}
function percentile(sortedValues, percentileValue) {
if (sortedValues.length === 0) {
return null;
@ -732,6 +753,15 @@ function evaluateProviderSimulation({ turns, scenario, record, thresholds }) {
const slowObserved = mode === "slow"
? turns.some((turn) => typeof turn.providerFinalMs === "number" && typeof providerSlowMinMs === "number" && turn.providerFinalMs >= providerSlowMinMs)
: null;
const providerRequestCount = record.providerEvidence?.requestCount ?? turns.reduce((total, turn) => total + (turn.requestCount ?? 0), 0);
const providerRequestCountMin = thresholds.providerRequestCountMin ?? scenario.mockProvider?.concurrency ?? null;
const providerMaxConcurrency = maxProviderRequestConcurrency(record.providerEvidence?.requests ?? []);
const providerConcurrencyMin = thresholds.providerConcurrencyMin ?? (typeof scenario.mockProvider?.concurrency === "number" ? Math.min(2, scenario.mockProvider.concurrency) : null);
const requestCountOk = typeof providerRequestCountMin === "number" ? providerRequestCount >= providerRequestCountMin : null;
const overlapObserved = typeof providerConcurrencyMin === "number" ? providerMaxConcurrency >= providerConcurrencyMin : null;
const concurrentObserved = mode === "concurrent-pressure"
? requestCountOk === true && overlapObserved === true
: null;
return {
schemaVersion: "kova.agentProviderSimulation.v1",
@ -748,7 +778,14 @@ function evaluateProviderSimulation({ turns, scenario, record, thresholds }) {
finalGatewayState,
healthFailures,
healthLimit,
providerSlowMinMs
providerSlowMinMs,
providerRequestCount,
providerRequestCountMin,
providerMaxConcurrency,
providerConcurrencyMin,
requestCountOk,
overlapObserved,
concurrentObserved
};
}
@ -801,6 +838,15 @@ function checkProviderSimulation(violations, simulation) {
message: "mock provider error-then-recover mode did not prove agent recovery"
});
}
if (simulation.mode === "concurrent-pressure" && simulation.concurrentObserved !== true) {
violations.push({
kind: "provider-simulation",
metric: "providerConcurrentPressureObserved",
expected: `>= ${simulation.providerRequestCountMin ?? "configured concurrency"} provider requests and max in-flight >= ${simulation.providerConcurrencyMin ?? "configured overlap"}`,
actual: `requests=${simulation.providerRequestCount}, maxInFlight=${simulation.providerMaxConcurrency}`,
message: "mock provider concurrent-pressure mode did not produce enough overlapping provider work"
});
}
if (simulation.containmentOk !== true) {
violations.push({
kind: "provider-containment",
@ -842,6 +888,13 @@ function buildAgentFailureFixerSummary(latencyDiagnosis, providerSimulation, con
likelyOwner: "provider retry / agent recovery"
});
}
if (providerSimulation?.mode === "concurrent-pressure") {
items.push({
kind: "provider-concurrent-pressure",
summary: `Concurrent provider pressure produced ${providerSimulation.providerRequestCount ?? "unknown"} provider request(s), max in-flight ${providerSimulation.providerMaxConcurrency ?? "unknown"}; verify OpenClaw keeps gateway and agent sessions responsive under overlapping turns.`,
likelyOwner: "agent concurrency / provider scheduling"
});
}
if (latencyDiagnosis?.kind === "cold-pre-provider-stall" || latencyDiagnosis?.kind === "pre-provider-stall") {
items.push({
kind: "pre-provider-stall",
@ -1895,7 +1948,8 @@ function countDiagnosticMetric(record, key) {
}
function isAgentMessageCommand(command) {
return command.includes(" -- agent ") && command.includes("--message");
return (command.includes(" -- agent ") && command.includes("--message")) ||
command.includes("run-concurrent-agent-turns.mjs");
}
function extractAgentResponse(result) {

View File

@ -81,6 +81,9 @@ function validateMockProvider(mockProvider, prefix, errors) {
errors.push(`${prefix}.${key} must be a non-negative integer when set`);
}
}
if (mockProvider.concurrency !== undefined && (!Number.isInteger(mockProvider.concurrency) || mockProvider.concurrency <= 0)) {
errors.push(`${prefix}.concurrency must be a positive integer when set`);
}
}
function validatePhases(phases, errors) {

View File

@ -161,7 +161,10 @@ export function renderMarkdownReport(report) {
}
if (record.measurements.agentProviderSimulation?.expected) {
const sim = record.measurements.agentProviderSimulation;
lines.push(`- Provider simulation: ${sim.mode}; observed ${sim.observedIssue}; containment ${sim.containmentOk}; recovery ${sim.recoveryOk ?? "n/a"}`);
const concurrent = sim.concurrentObserved === null || sim.concurrentObserved === undefined
? ""
: `; concurrent requests ${sim.providerRequestCount}/${sim.providerRequestCountMin}, max in-flight ${sim.providerMaxConcurrency}/${sim.providerConcurrencyMin}, ok ${sim.concurrentObserved}`;
lines.push(`- Provider simulation: ${sim.mode}; observed ${sim.observedIssue}; containment ${sim.containmentOk}; recovery ${sim.recoveryOk ?? "n/a"}${concurrent}`);
}
if (record.measurements.agentFailureContainment) {
const containment = record.measurements.agentFailureContainment;

View File

@ -638,7 +638,8 @@ async function runScenarioCommand(command, context, envName, artifactDir, phaseI
}
function isAgentMessageCommand(command) {
return command.includes(" -- agent ") && command.includes("--message");
return (command.includes(" -- agent ") && command.includes("--message")) ||
command.includes("run-concurrent-agent-turns.mjs");
}
function agentLeakRoles() {

View File

@ -168,6 +168,8 @@ export async function runSelfCheck(flags = {}) {
checks.push(await mockProviderBehaviorCheck(tmp));
checks.push(providerFailureEvaluationCheck());
checks.push(agentColdWarmEvaluationCheck());
checks.push(await concurrentAgentRunnerCheck(tmp));
checks.push(providerConcurrentEvaluationCheck());
checks.push(await jsonCommandCheck(
"dry-run-state-lifecycle-json",
`node bin/kova.mjs run --target runtime:stable --scenario fresh-install --state missing-plugin-index --report-dir ${quoteShell(tmp)} --json`,
@ -1352,6 +1354,45 @@ async function mockProviderBehaviorCheck(tmp) {
}
}
async function concurrentAgentRunnerCheck(tmp) {
const fakeBin = join(tmp, "concurrent-agent-runner-bin");
const fakeOcm = join(fakeBin, "ocm");
await mkdir(fakeBin, { recursive: true });
await writeFile(fakeOcm, [
"#!/usr/bin/env node",
"process.stdout.write(JSON.stringify({ finalAssistantVisibleText: 'KOVA_AGENT_OK' }) + '\\n');"
].join("\n"), "utf8");
await chmod(fakeOcm, 0o755);
const command = `PATH=${quoteShell(fakeBin)}:$PATH node support/run-concurrent-agent-turns.mjs --env kova-self-check --count 2 --session-prefix kova-self-check-concurrent --message hi --expected-text KOVA_AGENT_OK --timeout 5`;
const result = await runCommand(command, { timeoutMs: 10000 });
try {
if (result.status !== 0) {
throw new Error(`concurrent agent runner failed: ${result.stderr || result.stdout}`);
}
const summary = JSON.parse(result.stdout);
assertEqual(summary.schemaVersion, "kova.concurrentAgentTurns.v1", "concurrent runner schema");
assertEqual(summary.ok, true, "concurrent runner ok");
assertEqual(summary.count, 2, "concurrent runner count");
assertEqual(summary.successCount, 2, "concurrent runner success count");
assertEqual(summary.turns.every((turn) => turn.expectedTextPresent === true), true, "all concurrent turns included expected text");
return {
id: "concurrent-agent-runner",
status: "PASS",
command,
durationMs: result.durationMs
};
} catch (error) {
return {
id: "concurrent-agent-runner",
status: "FAIL",
command,
durationMs: result.durationMs,
message: error.message
};
}
}
function providerFailureEvaluationCheck() {
try {
const recoverCommand = "ocm @kova -- agent --local --agent main --session-id kova-agent-provider-recovery --message hi --json";
@ -1465,6 +1506,102 @@ function providerFailureEvaluationCheck() {
}
}
function providerConcurrentEvaluationCheck() {
try {
const command = "node support/run-concurrent-agent-turns.mjs --env kova-self-check --count 3 --message hi --expected-text KOVA_AGENT_OK";
const record = {
scenario: "agent-provider-concurrent",
status: "PASS",
auth: { mode: "mock", source: "mock", providerId: "openai" },
phases: [
{
id: "concurrent-provider-turns",
results: [{
command,
status: 0,
timedOut: false,
startedAt: "2026-04-30T10:00:01.000Z",
startedAtEpochMs: 1777543201000,
finishedAt: "2026-04-30T10:00:05.000Z",
finishedAtEpochMs: 1777543205000,
durationMs: 4000,
stdout: "{\"finalAssistantVisibleText\":\"KOVA_AGENT_OK\",\"successCount\":3}",
stderr: "",
processSnapshots: {
leaks: {
schemaVersion: "kova.processLeakSummary.v1",
leakCount: 0,
leakedProcesses: [],
leaksByRole: {}
}
}
}],
metrics: { logs: zeroLogMetrics(), health: { ok: true } }
}
],
providerEvidence: {
available: true,
requestCount: 3,
requests: [1, 2, 3].map((index) => ({
requestId: `concurrent-provider-${index}`,
mode: "concurrent-pressure",
outcome: "completed",
errorClass: null,
receivedAt: `2026-04-30T10:00:02.${index}00Z`,
receivedAtEpochMs: 1777543202000 + (index * 100),
respondedAt: `2026-04-30T10:00:03.${index}00Z`,
respondedAtEpochMs: 1777543203000 + (index * 100),
firstByteLatencyMs: 1000,
firstChunkLatencyMs: 1000,
route: "/v1/responses",
model: "gpt-5.5",
stream: true,
status: 200,
statusClass: "2xx"
}))
},
finalMetrics: {
service: { gatewayState: "running" },
logs: zeroLogMetrics()
}
};
evaluateRecord(record, {
id: "agent-provider-concurrent",
mockProvider: { mode: "concurrent-pressure", delayMs: 1500, concurrency: 3 },
agent: { expectedText: "KOVA_AGENT_OK" },
thresholds: {
providerRequestCountMin: 3,
providerConcurrencyMin: 2,
providerFailureHealthFailures: 0,
agentProcessLeaks: 0
}
}, { surface: { thresholds: {} }, targetPlan: { kind: "npm" } });
assertEqual(record.status, "PASS", "provider concurrent scenario status");
assertEqual(record.measurements.agentProviderSimulation.mode, "concurrent-pressure", "provider concurrent mode");
assertEqual(record.measurements.agentProviderSimulation.concurrentObserved, true, "provider concurrent observed");
assertEqual(record.measurements.agentProviderSimulation.providerRequestCount, 3, "provider concurrent request count");
assertEqual(record.measurements.agentProviderSimulation.providerMaxConcurrency, 3, "provider max concurrency");
assertEqual(record.measurements.agentTurns[0].requestCount, 3, "concurrent turn provider request count");
assertEqual(record.measurements.agentTurns[0].responseOk, true, "concurrent response ok");
return {
id: "provider-concurrent-evaluation",
status: "PASS",
command: "evaluate synthetic concurrent provider pressure",
durationMs: 0
};
} catch (error) {
return {
id: "provider-concurrent-evaluation",
status: "FAIL",
command: "evaluate synthetic concurrent provider pressure",
durationMs: 0,
message: error.message
};
}
}
function agentColdWarmEvaluationCheck() {
try {
const coldCommand = "ocm @kova -- agent --local --agent main --session-id kova-agent-cold-warm --message hi --json";

View File

@ -0,0 +1,216 @@
#!/usr/bin/env node
import { spawn } from "node:child_process";
const options = parseArgs(process.argv.slice(2));
const envName = requiredString(options.env, "--env");
const count = positiveInteger(options.count ?? "3", "--count");
const sessionPrefix = requiredString(options.sessionPrefix ?? "kova-agent-concurrent", "--session-prefix");
const message = requiredString(options.message, "--message");
const expectedText = requiredString(options.expectedText ?? "KOVA_AGENT_OK", "--expected-text");
const timeoutSeconds = positiveInteger(options.timeout ?? "120", "--timeout");
if (!/^kova-[a-z0-9][a-z0-9-]*$/i.test(envName)) {
failUsage(`refusing to run concurrent agent turns against non-Kova env: ${JSON.stringify(envName)}`);
}
const startedAtEpochMs = Date.now();
const startedAt = new Date(startedAtEpochMs).toISOString();
const turns = await Promise.all(
Array.from({ length: count }, (_, index) => runTurn(index + 1))
);
const finishedAtEpochMs = Date.now();
const failed = turns.filter((turn) => turn.ok !== true);
const summary = {
schemaVersion: "kova.concurrentAgentTurns.v1",
ok: failed.length === 0,
env: envName,
count,
successCount: turns.length - failed.length,
failedCount: failed.length,
expectedText,
startedAt,
startedAtEpochMs,
finishedAt: new Date(finishedAtEpochMs).toISOString(),
finishedAtEpochMs,
durationMs: finishedAtEpochMs - startedAtEpochMs,
finalAssistantVisibleText: failed.length === 0 ? expectedText : null,
turns
};
process.stdout.write(`${JSON.stringify(summary)}\n`);
process.exit(summary.ok ? 0 : 1);
function runTurn(index) {
const sessionId = `${sessionPrefix}-${index}`;
const command = "ocm";
const args = [
`@${envName}`,
"--",
"agent",
"--local",
"--agent",
"main",
"--session-id",
sessionId,
"--message",
message,
"--thinking",
"off",
"--timeout",
String(timeoutSeconds),
"--json"
];
const commandText = [command, ...args].join(" ");
const turnStartedAtEpochMs = Date.now();
return new Promise((resolve) => {
const child = spawn(command, args, {
env: process.env,
shell: false,
stdio: ["ignore", "pipe", "pipe"]
});
let stdout = "";
let stderr = "";
child.stdout.on("data", (chunk) => {
stdout += chunk.toString();
});
child.stderr.on("data", (chunk) => {
stderr += chunk.toString();
});
child.on("error", (error) => {
const turnFinishedAtEpochMs = Date.now();
resolve({
index,
sessionId,
ok: false,
command: commandText,
status: 127,
signal: null,
durationMs: turnFinishedAtEpochMs - turnStartedAtEpochMs,
startedAt: new Date(turnStartedAtEpochMs).toISOString(),
startedAtEpochMs: turnStartedAtEpochMs,
finishedAt: new Date(turnFinishedAtEpochMs).toISOString(),
finishedAtEpochMs: turnFinishedAtEpochMs,
responseText: null,
expectedTextPresent: false,
error: error.message,
stdout: "",
stderr: truncate(error.message)
});
});
child.on("close", (status, signal) => {
const turnFinishedAtEpochMs = Date.now();
const responseText = extractResponseText(stdout);
const expectedTextPresent = responseText?.includes(expectedText) || stdout.includes(expectedText) || stderr.includes(expectedText);
resolve({
index,
sessionId,
ok: status === 0 && !signal && expectedTextPresent === true,
command: commandText,
status: status ?? 1,
signal,
durationMs: turnFinishedAtEpochMs - turnStartedAtEpochMs,
startedAt: new Date(turnStartedAtEpochMs).toISOString(),
startedAtEpochMs: turnStartedAtEpochMs,
finishedAt: new Date(turnFinishedAtEpochMs).toISOString(),
finishedAtEpochMs: turnFinishedAtEpochMs,
responseText,
expectedTextPresent,
stdout: truncate(stdout),
stderr: truncate(stderr)
});
});
});
}
function extractResponseText(stdout) {
const text = String(stdout ?? "").trim();
if (!text) {
return null;
}
try {
const parsed = JSON.parse(text);
const found = findFirstString(parsed, [
"finalAssistantVisibleText",
"finalAssistantRawText",
"text",
"reply"
]);
return typeof found === "string" && found.trim() ? found.trim() : null;
} catch {
const match = text.match(/"finalAssistant(?:Raw|Visible)Text"\s*:\s*"([^"]+)"/);
return match?.[1]?.trim() ?? null;
}
}
function findFirstString(value, keys) {
if (!value || typeof value !== "object") {
return null;
}
for (const key of keys) {
if (typeof value[key] === "string") {
return value[key];
}
}
for (const child of Object.values(value)) {
const nested = findFirstString(child, keys);
if (typeof nested === "string") {
return nested;
}
}
return null;
}
function parseArgs(args) {
const parsed = {};
for (let index = 0; index < args.length; index += 1) {
const arg = args[index];
if (!arg.startsWith("--")) {
failUsage(`unexpected positional argument ${JSON.stringify(arg)}`);
}
const key = arg.slice(2).replaceAll("-", "_");
const next = args[index + 1];
if (next === undefined || next.startsWith("--")) {
failUsage(`${arg} requires a value`);
}
parsed[key] = next;
index += 1;
}
return {
env: parsed.env,
count: parsed.count,
sessionPrefix: parsed.session_prefix,
message: parsed.message,
expectedText: parsed.expected_text,
timeout: parsed.timeout
};
}
function requiredString(value, flag) {
if (typeof value !== "string" || value.length === 0) {
failUsage(`${flag} is required`);
}
return value;
}
function positiveInteger(value, flag) {
const number = Number(value);
if (!Number.isInteger(number) || number <= 0) {
failUsage(`${flag} must be a positive integer`);
}
return number;
}
function truncate(value, limit = 4000) {
const text = String(value ?? "");
if (text.length <= limit) {
return text;
}
return `${text.slice(0, limit)}\n[truncated ${text.length - limit} chars]`;
}
function failUsage(message) {
process.stderr.write(`${message}\n`);
process.stderr.write("usage: run-concurrent-agent-turns.mjs --env <kova-env> --count <n> --message <text> [--expected-text <text>] [--session-prefix <prefix>] [--timeout <seconds>]\n");
process.exit(2);
}