fs-safe/test/json-durable-queue.test.ts
2026-05-06 22:32:09 +01:00

98 lines
3.4 KiB
TypeScript

import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import {
ackJsonDurableQueueEntry,
ensureJsonDurableQueueDirs,
jsonDurableQueueEntryExists,
loadJsonDurableQueueEntry,
loadPendingJsonDurableQueueEntries,
moveJsonDurableQueueEntryToFailed,
resolveJsonDurableQueueEntryPaths,
writeJsonDurableQueueEntry,
} from "../src/json-durable-queue.js";
// Scratch directory for the test currently running; assigned in beforeEach.
let root: string;

beforeEach(async () => {
  // mkdtemp appends a unique suffix to this prefix, so each test gets its own directory.
  const scratchPrefix = path.join(os.tmpdir(), "fs-safe-json-queue-");
  root = await fs.mkdtemp(scratchPrefix);
});

afterEach(async () => {
  // Remove the scratch tree created in beforeEach, including anything the test left behind.
  await fs.rm(root, { force: true, recursive: true });
});
describe("durable JSON queues", () => {
  // Entry shape used by the round-trip test.
  type VersionedEntry = { id: string; version: number };

  it("writes, migrates, loads, acks, and moves entries", async () => {
    const tempPrefix = "queue";
    const queueDir = path.join(root, "queue");
    const failedDir = path.join(root, "failed");
    await ensureJsonDurableQueueDirs({ queueDir, failedDir });
    const paths = resolveJsonDurableQueueEntryPaths(queueDir, "entry-1");

    // Write: after writing, the entry is expected to exist at its JSON path.
    await writeJsonDurableQueueEntry({
      filePath: paths.jsonPath,
      entry: { id: "entry-1", version: 1 },
      tempPrefix,
    });
    const existsAfterWrite = jsonDurableQueueEntryExists(paths.jsonPath);
    await expect(existsAfterWrite).resolves.toBe(true);

    // Migrate: the `read` hook returns a bumped entry flagged as migrated; the
    // load result and the file contents are both expected to show version 2.
    const migratedEntry = loadJsonDurableQueueEntry<VersionedEntry>({
      paths,
      tempPrefix,
      read: async (entry) => ({ entry: { ...entry, version: 2 }, migrated: true }),
    });
    await expect(migratedEntry).resolves.toEqual({ id: "entry-1", version: 2 });
    const fileAfterMigration = fs.readFile(paths.jsonPath, "utf8");
    await expect(fileAfterMigration).resolves.toContain("\"version\": 2");

    // Load pending: the single queued entry is returned with its migrated version.
    const pendingEntries = loadPendingJsonDurableQueueEntries<VersionedEntry>({
      queueDir,
      tempPrefix,
    });
    await expect(pendingEntries).resolves.toEqual([{ id: "entry-1", version: 2 }]);

    // Ack: the entry is expected to be gone from the queue afterwards.
    await ackJsonDurableQueueEntry(paths);
    const existsAfterAck = jsonDurableQueueEntryExists(paths.jsonPath);
    await expect(existsAfterAck).resolves.toBe(false);

    // Move to failed: a re-written entry should end up under failedDir.
    await writeJsonDurableQueueEntry({
      filePath: paths.jsonPath,
      entry: { id: "entry-1", version: 3 },
      tempPrefix,
    });
    await moveJsonDurableQueueEntryToFailed({ queueDir, failedDir, id: "entry-1" });
    const failedFile = fs.readFile(path.join(failedDir, "entry-1.json"), "utf8");
    await expect(failedFile).resolves.toContain("\"version\": 3");
  });

  it("skips bad pending entries and cleans stale temp files", async () => {
    const queueDir = path.join(root, "queue");
    const failedDir = path.join(root, "failed");
    await ensureJsonDurableQueueDirs({ queueDir, failedDir });

    // Resolves a file name inside the queue directory.
    const inQueue = (fileName: string): string => path.join(queueDir, fileName);

    await fs.writeFile(inQueue("good.json"), JSON.stringify({ ok: true }), "utf8");
    // A lone "{" is not parseable JSON.
    await fs.writeFile(inQueue("bad.json"), "{", "utf8");

    // Back-date the temp file by ten seconds so it is older than the 1 ms cleanup threshold.
    const tempPath = inQueue("orphan.tmp");
    await fs.writeFile(tempPath, "tmp", "utf8");
    const tenSecondsAgo = new Date(Date.now() - 10_000);
    await fs.utimes(tempPath, tenSecondsAgo, tenSecondsAgo);

    const pendingEntries = loadPendingJsonDurableQueueEntries<{ ok: boolean }>({
      queueDir,
      tempPrefix: "queue",
      cleanupTmpMaxAgeMs: 1,
    });
    // Only the well-formed entry comes back, and the temp file must no longer exist.
    await expect(pendingEntries).resolves.toEqual([{ ok: true }]);
    const tempFileAccess = fs.access(tempPath);
    await expect(tempFileAccess).rejects.toMatchObject({ code: "ENOENT" });
  });

  it("rejects whitespace-padded queue ids", async () => {
    const queueDir = path.join(root, "queue");
    // Trailing padding first, then leading padding.
    for (const paddedId of ["job ", " job"]) {
      expect(() => resolveJsonDurableQueueEntryPaths(queueDir, paddedId)).toThrow();
    }
  });
});