aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins-queue-restate/src/tests/queue.test.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/plugins-queue-restate/src/tests/queue.test.ts')
-rw-r--r--packages/plugins-queue-restate/src/tests/queue.test.ts221
1 files changed, 0 insertions, 221 deletions
diff --git a/packages/plugins-queue-restate/src/tests/queue.test.ts b/packages/plugins-queue-restate/src/tests/queue.test.ts
deleted file mode 100644
index e59d47cb..00000000
--- a/packages/plugins-queue-restate/src/tests/queue.test.ts
+++ /dev/null
@@ -1,221 +0,0 @@
-import {
- afterAll,
- afterEach,
- beforeAll,
- beforeEach,
- describe,
- expect,
- inject,
- it,
-} from "vitest";
-
-import type { Queue, QueueClient } from "@karakeep/shared/queueing";
-
-import { AdminClient } from "../admin.js";
-import { RestateQueueProvider } from "../index.js";
-import { waitUntil } from "./utils.js";
-
-type TestAction =
- | { type: "val"; val: number }
- | { type: "err"; err: string }
- | { type: "stall"; durSec: number };
-
-describe("Restate Queue Provider", () => {
- let queueClient: QueueClient;
- let queue: Queue<TestAction>;
- let adminClient: AdminClient;
-
- const testState = {
- results: [] as number[],
- errors: [] as string[],
- inFlight: 0,
- maxInFlight: 0,
- };
-
- async function waitUntilQueueEmpty() {
- await waitUntil(
- async () => {
- const stats = await queue.stats();
- return stats.pending + stats.pending_retry + stats.running === 0;
- },
- "Queue to be empty",
- 60000,
- );
- }
-
- beforeEach(async () => {
- testState.results = [];
- testState.errors = [];
- testState.inFlight = 0;
- testState.maxInFlight = 0;
- });
- afterEach(async () => {
- await waitUntilQueueEmpty();
- });
-
- beforeAll(async () => {
- const ingressPort = inject("restateIngressPort");
- const adminPort = inject("restateAdminPort");
-
- process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`;
- process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`;
- process.env.RESTATE_LISTEN_PORT = "9080";
-
- const provider = new RestateQueueProvider();
- const client = await provider.getClient();
-
- if (!client) {
- throw new Error("Failed to create queue client");
- }
-
- queueClient = client;
- adminClient = new AdminClient(process.env.RESTATE_ADMIN_ADDR);
-
- queue = queueClient.createQueue<TestAction>("test-queue", {
- defaultJobArgs: {
- numRetries: 3,
- },
- keepFailedJobs: false,
- });
-
- queueClient.createRunner(
- queue,
- {
- run: async (job) => {
- testState.inFlight++;
- testState.maxInFlight = Math.max(
- testState.maxInFlight,
- testState.inFlight,
- );
- const jobData = job.data;
- switch (jobData.type) {
- case "val":
- testState.results.push(jobData.val);
- break;
- case "err":
- throw new Error(jobData.err);
- case "stall":
- await new Promise((resolve) =>
- setTimeout(resolve, jobData.durSec * 1000),
- );
- break;
- }
- },
- onError: async (job) => {
- testState.inFlight--;
- const jobData = job.data;
- if (jobData && jobData.type === "err") {
- testState.errors.push(jobData.err);
- }
- },
- onComplete: async () => {
- testState.inFlight--;
- },
- },
- {
- concurrency: 3,
- timeoutSecs: 2,
- pollIntervalMs: 0 /* Doesn't matter */,
- },
- );
-
- await queueClient.prepare();
- await queueClient.start();
-
- await adminClient.upsertDeployment("http://host.docker.internal:9080");
- }, 90000);
-
- afterAll(async () => {
- if (queueClient?.shutdown) {
- await queueClient.shutdown();
- }
- });
-
- it("should enqueue and process a job", async () => {
- const jobId = await queue.enqueue({ type: "val", val: 42 });
-
- expect(jobId).toBeDefined();
- expect(typeof jobId).toBe("string");
-
- await waitUntilQueueEmpty();
-
- expect(testState.results).toEqual([42]);
- }, 60000);
-
- it("should process multiple jobs", async () => {
- await queue.enqueue({ type: "val", val: 1 });
- await queue.enqueue({ type: "val", val: 2 });
- await queue.enqueue({ type: "val", val: 3 });
-
- await waitUntilQueueEmpty();
-
- expect(testState.results.length).toEqual(3);
- expect(testState.results).toContain(1);
- expect(testState.results).toContain(2);
- expect(testState.results).toContain(3);
- }, 60000);
-
- it("should retry failed jobs", async () => {
- await queue.enqueue({ type: "err", err: "Test error" });
-
- await waitUntilQueueEmpty();
-
- // Initial attempt + 3 retries
- expect(testState.errors).toEqual([
- "Test error",
- "Test error",
- "Test error",
- "Test error",
- ]);
- }, 90000);
-
- it("should use idempotency key", async () => {
- const idempotencyKey = `test-${Date.now()}`;
-
- await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey });
- await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey });
-
- await waitUntilQueueEmpty();
-
- expect(testState.results).toEqual([200]);
- }, 60000);
-
- it("should handle concurrent jobs", async () => {
- const promises = [];
- for (let i = 300; i < 320; i++) {
- promises.push(queue.enqueue({ type: "stall", durSec: 0.1 }));
- }
- await Promise.all(promises);
-
- await waitUntilQueueEmpty();
-
- expect(testState.maxInFlight).toEqual(3);
- }, 60000);
-
- it("should handle priorities", async () => {
- // Hog the queue first
- await Promise.all([
- queue.enqueue({ type: "stall", durSec: 1 }, { priority: 0 }),
- queue.enqueue({ type: "stall", durSec: 1 }, { priority: 1 }),
- queue.enqueue({ type: "stall", durSec: 1 }, { priority: 2 }),
- ]);
-
- // Then those will get reprioritized
- await Promise.all([
- queue.enqueue({ type: "val", val: 200 }, { priority: -1 }),
- queue.enqueue({ type: "val", val: 201 }, { priority: -2 }),
- queue.enqueue({ type: "val", val: 202 }, { priority: -3 }),
-
- queue.enqueue({ type: "val", val: 300 }, { priority: 0 }),
- queue.enqueue({ type: "val", val: 301 }, { priority: 1 }),
- queue.enqueue({ type: "val", val: 302 }, { priority: 2 }),
- ]);
-
- await waitUntilQueueEmpty();
-
- expect(testState.results).toEqual([
- // Lower numeric priority value should run first
- 202, 201, 200, 300, 301, 302,
- ]);
- }, 60000);
-});