aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins/queue-restate/src/tests/queue.test.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/plugins/queue-restate/src/tests/queue.test.ts')
-rw-r--r--packages/plugins/queue-restate/src/tests/queue.test.ts221
1 files changed, 221 insertions, 0 deletions
diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts
new file mode 100644
index 00000000..e59d47cb
--- /dev/null
+++ b/packages/plugins/queue-restate/src/tests/queue.test.ts
@@ -0,0 +1,221 @@
+import {
+ afterAll,
+ afterEach,
+ beforeAll,
+ beforeEach,
+ describe,
+ expect,
+ inject,
+ it,
+} from "vitest";
+
+import type { Queue, QueueClient } from "@karakeep/shared/queueing";
+
+import { AdminClient } from "../admin.js";
+import { RestateQueueProvider } from "../index.js";
+import { waitUntil } from "./utils.js";
+
+type TestAction =
+ | { type: "val"; val: number }
+ | { type: "err"; err: string }
+ | { type: "stall"; durSec: number };
+
+describe("Restate Queue Provider", () => {
+ let queueClient: QueueClient;
+ let queue: Queue<TestAction>;
+ let adminClient: AdminClient;
+
+ const testState = {
+ results: [] as number[],
+ errors: [] as string[],
+ inFlight: 0,
+ maxInFlight: 0,
+ };
+
+ async function waitUntilQueueEmpty() {
+ await waitUntil(
+ async () => {
+ const stats = await queue.stats();
+ return stats.pending + stats.pending_retry + stats.running === 0;
+ },
+ "Queue to be empty",
+ 60000,
+ );
+ }
+
+ beforeEach(async () => {
+ testState.results = [];
+ testState.errors = [];
+ testState.inFlight = 0;
+ testState.maxInFlight = 0;
+ });
+ afterEach(async () => {
+ await waitUntilQueueEmpty();
+ });
+
+ beforeAll(async () => {
+ const ingressPort = inject("restateIngressPort");
+ const adminPort = inject("restateAdminPort");
+
+ process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`;
+ process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`;
+ process.env.RESTATE_LISTEN_PORT = "9080";
+
+ const provider = new RestateQueueProvider();
+ const client = await provider.getClient();
+
+ if (!client) {
+ throw new Error("Failed to create queue client");
+ }
+
+ queueClient = client;
+ adminClient = new AdminClient(process.env.RESTATE_ADMIN_ADDR);
+
+ queue = queueClient.createQueue<TestAction>("test-queue", {
+ defaultJobArgs: {
+ numRetries: 3,
+ },
+ keepFailedJobs: false,
+ });
+
+ queueClient.createRunner(
+ queue,
+ {
+ run: async (job) => {
+ testState.inFlight++;
+ testState.maxInFlight = Math.max(
+ testState.maxInFlight,
+ testState.inFlight,
+ );
+ const jobData = job.data;
+ switch (jobData.type) {
+ case "val":
+ testState.results.push(jobData.val);
+ break;
+ case "err":
+ throw new Error(jobData.err);
+ case "stall":
+ await new Promise((resolve) =>
+ setTimeout(resolve, jobData.durSec * 1000),
+ );
+ break;
+ }
+ },
+ onError: async (job) => {
+ testState.inFlight--;
+ const jobData = job.data;
+ if (jobData && jobData.type === "err") {
+ testState.errors.push(jobData.err);
+ }
+ },
+ onComplete: async () => {
+ testState.inFlight--;
+ },
+ },
+ {
+ concurrency: 3,
+ timeoutSecs: 2,
+ pollIntervalMs: 0 /* Doesn't matter */,
+ },
+ );
+
+ await queueClient.prepare();
+ await queueClient.start();
+
+ await adminClient.upsertDeployment("http://host.docker.internal:9080");
+ }, 90000);
+
+ afterAll(async () => {
+ if (queueClient?.shutdown) {
+ await queueClient.shutdown();
+ }
+ });
+
+ it("should enqueue and process a job", async () => {
+ const jobId = await queue.enqueue({ type: "val", val: 42 });
+
+ expect(jobId).toBeDefined();
+ expect(typeof jobId).toBe("string");
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results).toEqual([42]);
+ }, 60000);
+
+ it("should process multiple jobs", async () => {
+ await queue.enqueue({ type: "val", val: 1 });
+ await queue.enqueue({ type: "val", val: 2 });
+ await queue.enqueue({ type: "val", val: 3 });
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results.length).toEqual(3);
+ expect(testState.results).toContain(1);
+ expect(testState.results).toContain(2);
+ expect(testState.results).toContain(3);
+ }, 60000);
+
+ it("should retry failed jobs", async () => {
+ await queue.enqueue({ type: "err", err: "Test error" });
+
+ await waitUntilQueueEmpty();
+
+ // Initial attempt + 3 retries
+ expect(testState.errors).toEqual([
+ "Test error",
+ "Test error",
+ "Test error",
+ "Test error",
+ ]);
+ }, 90000);
+
+ it("should use idempotency key", async () => {
+ const idempotencyKey = `test-${Date.now()}`;
+
+ await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey });
+ await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey });
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results).toEqual([200]);
+ }, 60000);
+
+ it("should handle concurrent jobs", async () => {
+ const promises = [];
+ for (let i = 300; i < 320; i++) {
+ promises.push(queue.enqueue({ type: "stall", durSec: 0.1 }));
+ }
+ await Promise.all(promises);
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.maxInFlight).toEqual(3);
+ }, 60000);
+
+ it("should handle priorities", async () => {
+ // Hog the queue first
+ await Promise.all([
+ queue.enqueue({ type: "stall", durSec: 1 }, { priority: 0 }),
+ queue.enqueue({ type: "stall", durSec: 1 }, { priority: 1 }),
+ queue.enqueue({ type: "stall", durSec: 1 }, { priority: 2 }),
+ ]);
+
+ // Then those will get reprioritized
+ await Promise.all([
+ queue.enqueue({ type: "val", val: 200 }, { priority: -1 }),
+ queue.enqueue({ type: "val", val: 201 }, { priority: -2 }),
+ queue.enqueue({ type: "val", val: 202 }, { priority: -3 }),
+
+ queue.enqueue({ type: "val", val: 300 }, { priority: 0 }),
+ queue.enqueue({ type: "val", val: 301 }, { priority: 1 }),
+ queue.enqueue({ type: "val", val: 302 }, { priority: 2 }),
+ ]);
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results).toEqual([
+ // Lower numeric priority value should run first
+ 202, 201, 200, 300, 301, 302,
+ ]);
+ }, 60000);
+});