aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins-queue-restate/src/tests
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-10-05 07:04:29 +0100
committerGitHub <noreply@github.com>2025-10-05 07:04:29 +0100
commit74a1f7b6b600d4cb53352dde7def374c3125721a (patch)
tree70b79ebae61456f6ff2cb02a37351fa9817fb342 /packages/plugins-queue-restate/src/tests
parent4a580d713621f99abb8baabc9b847ce039d44842 (diff)
downloadkarakeep-74a1f7b6b600d4cb53352dde7def374c3125721a.tar.zst
feat: Restate-based queue plugin (#2011)
* WIP: Initial restate integration * add retry * add delay + idempotency * implement concurrency limits * add admin stats * add todos * add id provider * handle onComplete failures * add tests * add pub key and fix logging * add priorities * fail call after retries * more fixes * fix retries left * some refactoring * fix package.json * upgrade sdk * some test cleanups
Diffstat (limited to 'packages/plugins-queue-restate/src/tests')
-rw-r--r--packages/plugins-queue-restate/src/tests/docker-compose.yml8
-rw-r--r--packages/plugins-queue-restate/src/tests/queue.test.ts221
-rw-r--r--packages/plugins-queue-restate/src/tests/setup/startContainers.ts90
-rw-r--r--packages/plugins-queue-restate/src/tests/utils.ts23
4 files changed, 342 insertions, 0 deletions
diff --git a/packages/plugins-queue-restate/src/tests/docker-compose.yml b/packages/plugins-queue-restate/src/tests/docker-compose.yml
new file mode 100644
index 00000000..f24c2921
--- /dev/null
+++ b/packages/plugins-queue-restate/src/tests/docker-compose.yml
@@ -0,0 +1,8 @@
+services:
+ restate:
+ image: docker.restate.dev/restatedev/restate:1.5
+ ports:
+ - "${RESTATE_INGRESS_PORT:-8080}:8080"
+ - "${RESTATE_ADMIN_PORT:-9070}:9070"
+ extra_hosts:
+ - "host.docker.internal:host-gateway"
diff --git a/packages/plugins-queue-restate/src/tests/queue.test.ts b/packages/plugins-queue-restate/src/tests/queue.test.ts
new file mode 100644
index 00000000..692581d6
--- /dev/null
+++ b/packages/plugins-queue-restate/src/tests/queue.test.ts
@@ -0,0 +1,221 @@
+import {
+ afterAll,
+ afterEach,
+ beforeAll,
+ beforeEach,
+ describe,
+ expect,
+ inject,
+ it,
+} from "vitest";
+
+import type { Queue, QueueClient } from "@karakeep/shared/queueing";
+
+import { AdminClient } from "../admin.js";
+import { RestateQueueProvider } from "../index.js";
+import { waitUntil } from "./utils.js";
+
+type TestAction =
+ | { type: "val"; val: number }
+ | { type: "err"; err: string }
+ | { type: "stall"; durSec: number };
+
+describe("Restate Queue Provider", () => {
+ let queueClient: QueueClient;
+ let queue: Queue<TestAction>;
+ let adminClient: AdminClient;
+
+ const testState = {
+ results: [] as number[],
+ errors: [] as string[],
+ inFlight: 0,
+ maxInFlight: 0,
+ };
+
+ async function waitUntilQueueEmpty() {
+ await waitUntil(
+ async () => {
+ const stats = await queue.stats();
+ return stats.pending + stats.pending_retry + stats.running === 0;
+ },
+ "Queue to be empty",
+ 60000,
+ );
+ }
+
+ beforeEach(async () => {
+ testState.results = [];
+ testState.errors = [];
+ testState.inFlight = 0;
+ testState.maxInFlight = 0;
+ });
+ afterEach(async () => {
+ await waitUntilQueueEmpty();
+ });
+
+ beforeAll(async () => {
+ const ingressPort = inject("restateIngressPort");
+ const adminPort = inject("restateAdminPort");
+
+ process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`;
+ process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`;
+ process.env.RESTATE_LISTEN_PORT = "9080";
+
+ const provider = new RestateQueueProvider();
+ const client = await provider.getClient();
+
+ if (!client) {
+ throw new Error("Failed to create queue client");
+ }
+
+ queueClient = client;
+ adminClient = new AdminClient(process.env.RESTATE_ADMIN_ADDR);
+
+ queue = queueClient.createQueue<TestAction>("test-queue", {
+ defaultJobArgs: {
+ numRetries: 3,
+ },
+ keepFailedJobs: false,
+ });
+
+ queueClient.createRunner(
+ queue,
+ {
+ run: async (job) => {
+ testState.inFlight++;
+ testState.maxInFlight = Math.max(
+ testState.maxInFlight,
+ testState.inFlight,
+ );
+ const jobData = job.data;
+ switch (jobData.type) {
+ case "val":
+ testState.results.push(jobData.val);
+ break;
+ case "err":
+ throw new Error(jobData.err);
+ case "stall":
+ await new Promise((resolve) =>
+ setTimeout(resolve, jobData.durSec * 1000),
+ );
+ break;
+ }
+ },
+ onError: async (job) => {
+ testState.inFlight--;
+ const jobData = job.data;
+ if (jobData && jobData.type === "err") {
+ testState.errors.push(jobData.err);
+ }
+ },
+ onComplete: async () => {
+ testState.inFlight--;
+ },
+ },
+ {
+ concurrency: 3,
+ timeoutSecs: 2,
+ pollIntervalMs: 0 /* Doesn't matter */,
+ },
+ );
+
+ await queueClient.prepare();
+ await queueClient.start();
+
+ await adminClient.upsertDeployment("http://host.docker.internal:9080");
+ }, 90000);
+
+ afterAll(async () => {
+ if (queueClient?.shutdown) {
+ await queueClient.shutdown();
+ }
+ });
+
+ it("should enqueue and process a job", async () => {
+ const jobId = await queue.enqueue({ type: "val", val: 42 });
+
+ expect(jobId).toBeDefined();
+ expect(typeof jobId).toBe("string");
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results).toEqual([42]);
+ }, 60000);
+
+ it("should process multiple jobs", async () => {
+ await queue.enqueue({ type: "val", val: 1 });
+ await queue.enqueue({ type: "val", val: 2 });
+ await queue.enqueue({ type: "val", val: 3 });
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results.length).toEqual(3);
+ expect(testState.results).toContain(1);
+ expect(testState.results).toContain(2);
+ expect(testState.results).toContain(3);
+ }, 60000);
+
+ it("should retry failed jobs", async () => {
+ await queue.enqueue({ type: "err", err: "Test error" });
+
+ await waitUntilQueueEmpty();
+
+ // Initial attempt + 3 retries
+ expect(testState.errors).toEqual([
+ "Test error",
+ "Test error",
+ "Test error",
+ "Test error",
+ ]);
+ }, 90000);
+
+ it("should use idempotency key", async () => {
+ const idempotencyKey = `test-${Date.now()}`;
+
+ await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey });
+ await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey });
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results).toEqual([200]);
+ }, 60000);
+
+ it("should handle concurrent jobs", async () => {
+ const promises = [];
+ for (let i = 300; i < 320; i++) {
+ promises.push(queue.enqueue({ type: "stall", durSec: 0.1 }));
+ }
+ await Promise.all(promises);
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.maxInFlight).toEqual(3);
+ }, 60000);
+
+ it("should handle priorities", async () => {
+ // Hog the queue first
+ await Promise.all([
+ queue.enqueue({ type: "stall", durSec: 1 }, { priority: 0 }),
+ queue.enqueue({ type: "stall", durSec: 1 }, { priority: 1 }),
+ queue.enqueue({ type: "stall", durSec: 1 }, { priority: 2 }),
+ ]);
+
+ // Then those will get reprioritized
+ await Promise.all([
+ queue.enqueue({ type: "val", val: 200 }, { priority: -1 }),
+ queue.enqueue({ type: "val", val: 201 }, { priority: -2 }),
+ queue.enqueue({ type: "val", val: 202 }, { priority: -3 }),
+
+ queue.enqueue({ type: "val", val: 300 }, { priority: 0 }),
+ queue.enqueue({ type: "val", val: 301 }, { priority: 1 }),
+ queue.enqueue({ type: "val", val: 302 }, { priority: 2 }),
+ ]);
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results).toEqual([
+ // Then in order of increasing priority
+ 302, 301, 300, 200, 201, 202,
+ ]);
+ }, 60000);
+});
diff --git a/packages/plugins-queue-restate/src/tests/setup/startContainers.ts b/packages/plugins-queue-restate/src/tests/setup/startContainers.ts
new file mode 100644
index 00000000..7d9dea5c
--- /dev/null
+++ b/packages/plugins-queue-restate/src/tests/setup/startContainers.ts
@@ -0,0 +1,90 @@
+import { execSync } from "child_process";
+import net from "net";
+import path from "path";
+import { fileURLToPath } from "url";
+import type { GlobalSetupContext } from "vitest/node";
+
+import { waitUntil } from "../utils.js";
+
+async function getRandomPort(): Promise<number> {
+ const server = net.createServer();
+ return new Promise<number>((resolve, reject) => {
+ server.unref();
+ server.on("error", reject);
+ server.listen(0, () => {
+ const port = (server.address() as net.AddressInfo).port;
+ server.close(() => resolve(port));
+ });
+ });
+}
+
+async function waitForHealthy(
+ ingressPort: number,
+ adminPort: number,
+ timeout = 60000,
+): Promise<void> {
+ await waitUntil(
+ async () => {
+ const response = await fetch(`http://localhost:${adminPort}/health`);
+ return response.ok;
+ },
+ "Restate admin API is healthy",
+ timeout,
+ );
+
+ await waitUntil(
+ async () => {
+ const response = await fetch(
+ `http://localhost:${ingressPort}/restate/health`,
+ );
+ return response.ok;
+ },
+ "Restate ingress is healthy",
+ timeout,
+ );
+}
+
+export default async function ({ provide }: GlobalSetupContext) {
+ const __dirname = path.dirname(fileURLToPath(import.meta.url));
+ const ingressPort = await getRandomPort();
+ const adminPort = await getRandomPort();
+
+ console.log(
+ `Starting Restate on ports ${ingressPort} (ingress) and ${adminPort} (admin)...`,
+ );
+ execSync(`docker compose up -d`, {
+ cwd: path.join(__dirname, ".."),
+ stdio: "ignore",
+ env: {
+ ...process.env,
+ RESTATE_INGRESS_PORT: ingressPort.toString(),
+ RESTATE_ADMIN_PORT: adminPort.toString(),
+ },
+ });
+
+ console.log("Waiting for Restate to become healthy...");
+ await waitForHealthy(ingressPort, adminPort);
+
+ provide("restateIngressPort", ingressPort);
+ provide("restateAdminPort", adminPort);
+
+ process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`;
+ process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`;
+ process.env.RESTATE_LISTEN_PORT = "9080";
+
+ return async () => {
+ console.log("Stopping Restate...");
+ execSync("docker compose down", {
+ cwd: path.join(__dirname, ".."),
+ stdio: "ignore",
+ });
+ return Promise.resolve();
+ };
+}
+
+declare module "vitest" {
+ export interface ProvidedContext {
+ restateIngressPort: number;
+ restateAdminPort: number;
+ }
+}
diff --git a/packages/plugins-queue-restate/src/tests/utils.ts b/packages/plugins-queue-restate/src/tests/utils.ts
new file mode 100644
index 00000000..e02d2dee
--- /dev/null
+++ b/packages/plugins-queue-restate/src/tests/utils.ts
@@ -0,0 +1,23 @@
+export async function waitUntil(
+ f: () => Promise<boolean>,
+ description: string,
+ timeoutMs = 60000,
+): Promise<void> {
+ const startTime = Date.now();
+
+ while (Date.now() - startTime < timeoutMs) {
+ console.log(`Waiting for ${description}...`);
+ try {
+ const res = await f();
+ if (res) {
+ console.log(`${description}: success`);
+ return;
+ }
+ } catch (error) {
+ console.log(`${description}: error, retrying...: ${error}`);
+ }
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+ }
+
+ throw new Error(`${description}: timeout after ${timeoutMs}ms`);
+}