fix(fs): serialize same-process file locks
This commit is contained in:
parent
c9325a6a4a
commit
870df28717
@ -6,6 +6,7 @@
|
||||
|
||||
- Add cache-friendly `disableOAuth` support across headless runtime, CLI, daemon, proxy, and `callOnce` paths so callers can suppress interactive OAuth without losing connection reuse. (Issues #197, #199, #201, thanks @feniix)
|
||||
- Recover cleanly from renamed OAuth server entries, invalid refresh tokens, and stale dynamic client registrations without reusing unrelated same-URL credentials.
|
||||
- Prevent concurrent OAuth vault updates from briefly exposing empty lock files and losing credential entries under load.
|
||||
|
||||
### CLI
|
||||
|
||||
|
||||
117
src/fs-json.ts
117
src/fs-json.ts
@ -8,6 +8,7 @@ const LOCK_POLL_MS = 25;
|
||||
const MALFORMED_LOCK_STALE_MS = 1_000;
|
||||
const MAX_SYMLINK_DEPTH = 40;
|
||||
const DEFAULT_ATOMIC_FILE_MODE = 0o600;
|
||||
const localLockTails = new Map<string, Promise<void>>();
|
||||
|
||||
// readJsonFile reads a JSON file and returns undefined when the file does not exist.
|
||||
export async function readJsonFile<T = unknown>(filePath: string): Promise<T | undefined> {
|
||||
@ -64,49 +65,51 @@ export async function withFileLock<T>(
|
||||
options: { timeoutMs?: number } = {}
|
||||
): Promise<T> {
|
||||
const lockTargetPath = await resolvePathFollowingSymlinks(filePath);
|
||||
await fs.mkdir(path.dirname(lockTargetPath), { recursive: true });
|
||||
let lockPath = `${lockTargetPath}.lock`;
|
||||
const fallbackLockPath = lockTargetPath !== filePath ? `${filePath}.lock` : undefined;
|
||||
const timeoutMs = options.timeoutMs ?? DEFAULT_LOCK_TIMEOUT_MS;
|
||||
const startedAt = Date.now();
|
||||
let acquired = false;
|
||||
return withLocalLock(lockTargetPath, timeoutMs, async () => {
|
||||
await fs.mkdir(path.dirname(lockTargetPath), { recursive: true });
|
||||
let lockPath = `${lockTargetPath}.lock`;
|
||||
const fallbackLockPath = lockTargetPath !== filePath ? `${filePath}.lock` : undefined;
|
||||
let acquired = false;
|
||||
|
||||
while (!acquired) {
|
||||
try {
|
||||
await fs.writeFile(lockPath, `${process.pid}\n${new Date().toISOString()}\n`, {
|
||||
encoding: 'utf8',
|
||||
flag: 'wx',
|
||||
});
|
||||
acquired = true;
|
||||
break;
|
||||
} catch (error) {
|
||||
if (fallbackLockPath && lockPath !== fallbackLockPath && isPermissionError(error)) {
|
||||
await fs.mkdir(path.dirname(fallbackLockPath), { recursive: true });
|
||||
lockPath = fallbackLockPath;
|
||||
continue;
|
||||
while (!acquired) {
|
||||
try {
|
||||
await fs.writeFile(lockPath, `${process.pid}\n${new Date().toISOString()}\n`, {
|
||||
encoding: 'utf8',
|
||||
flag: 'wx',
|
||||
});
|
||||
acquired = true;
|
||||
break;
|
||||
} catch (error) {
|
||||
if (fallbackLockPath && lockPath !== fallbackLockPath && isPermissionError(error)) {
|
||||
await fs.mkdir(path.dirname(fallbackLockPath), { recursive: true });
|
||||
lockPath = fallbackLockPath;
|
||||
continue;
|
||||
}
|
||||
if ((error as NodeJS.ErrnoException).code !== 'EEXIST') {
|
||||
throw error;
|
||||
}
|
||||
if (await removeRecoverableLock(lockPath)) {
|
||||
continue;
|
||||
}
|
||||
if (Date.now() - startedAt > timeoutMs) {
|
||||
throw new Error(`Timed out waiting for file lock ${lockPath}`, { cause: error });
|
||||
}
|
||||
await sleep(LOCK_POLL_MS);
|
||||
}
|
||||
if ((error as NodeJS.ErrnoException).code !== 'EEXIST') {
|
||||
throw error;
|
||||
}
|
||||
if (await removeRecoverableLock(lockPath)) {
|
||||
continue;
|
||||
}
|
||||
if (Date.now() - startedAt > timeoutMs) {
|
||||
throw new Error(`Timed out waiting for file lock ${lockPath}`, { cause: error });
|
||||
}
|
||||
await sleep(LOCK_POLL_MS);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
return await task();
|
||||
} finally {
|
||||
await fs.unlink(lockPath).catch((error) => {
|
||||
if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
|
||||
throw error;
|
||||
}
|
||||
});
|
||||
}
|
||||
try {
|
||||
return await task();
|
||||
} finally {
|
||||
await fs.unlink(lockPath).catch((error) => {
|
||||
if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
|
||||
throw error;
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function isPermissionError(error: unknown): boolean {
|
||||
@ -118,6 +121,46 @@ async function sleep(ms: number): Promise<void> {
|
||||
await new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
async function withLocalLock<T>(key: string, timeoutMs: number, task: () => Promise<T>): Promise<T> {
|
||||
const previous = localLockTails.get(key) ?? Promise.resolve();
|
||||
let release!: () => void;
|
||||
const current = new Promise<void>((resolve) => {
|
||||
release = resolve;
|
||||
});
|
||||
const tail = previous.then(() => current);
|
||||
localLockTails.set(key, tail);
|
||||
try {
|
||||
await waitForLocalLock(previous, timeoutMs, key);
|
||||
return await task();
|
||||
} finally {
|
||||
release();
|
||||
void tail.then(() => {
|
||||
if (localLockTails.get(key) === tail) {
|
||||
localLockTails.delete(key);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async function waitForLocalLock(previous: Promise<void>, timeoutMs: number, key: string): Promise<void> {
|
||||
let timer: NodeJS.Timeout | undefined;
|
||||
try {
|
||||
await Promise.race([
|
||||
previous,
|
||||
new Promise<never>((_, reject) => {
|
||||
timer = setTimeout(
|
||||
() => reject(new Error(`Timed out waiting for file lock ${key}.lock`)),
|
||||
Math.max(0, timeoutMs)
|
||||
);
|
||||
}),
|
||||
]);
|
||||
} finally {
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function resolveAtomicWriteTarget(filePath: string): Promise<{ path: string; mode?: number }> {
|
||||
try {
|
||||
const stats = await fs.lstat(filePath);
|
||||
|
||||
@ -238,6 +238,39 @@ describe('fs-json helpers', () => {
|
||||
await expect(fs.access(`${lockTarget}.lock`)).rejects.toThrow();
|
||||
});
|
||||
|
||||
it('applies the timeout while waiting for a same-process lock', async () => {
|
||||
const lockTarget = path.join(tempDir, 'shared.json');
|
||||
let enter!: () => void;
|
||||
let unblock!: () => void;
|
||||
const entered = new Promise<void>((resolve) => {
|
||||
enter = resolve;
|
||||
});
|
||||
const blocked = new Promise<void>((resolve) => {
|
||||
unblock = resolve;
|
||||
});
|
||||
const holder = withFileLock(lockTarget, async () => {
|
||||
enter();
|
||||
await blocked;
|
||||
});
|
||||
await entered;
|
||||
|
||||
await expect(withFileLock(lockTarget, async () => {}, { timeoutMs: 50 })).rejects.toThrow(
|
||||
/Timed out waiting for file lock/
|
||||
);
|
||||
|
||||
let followerEntered = false;
|
||||
const follower = withFileLock(lockTarget, async () => {
|
||||
followerEntered = true;
|
||||
});
|
||||
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||
expect(followerEntered).toBe(false);
|
||||
|
||||
unblock();
|
||||
await Promise.all([holder, follower]);
|
||||
expect(followerEntered).toBe(true);
|
||||
await expect(fs.access(`${lockTarget}.lock`)).rejects.toThrow();
|
||||
});
|
||||
|
||||
it('recovers lock files left by dead processes', async () => {
|
||||
const lockTarget = path.join(tempDir, 'shared.json');
|
||||
await fs.writeFile(`${lockTarget}.lock`, '99999999\n2026-01-01T00:00:00.000Z\n', 'utf8');
|
||||
|
||||
Loading…
Reference in New Issue
Block a user