diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-10-05 07:04:29 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-10-05 07:04:29 +0100 |
| commit | 74a1f7b6b600d4cb53352dde7def374c3125721a (patch) | |
| tree | 70b79ebae61456f6ff2cb02a37351fa9817fb342 /packages/plugins-queue-restate/src/tests | |
| parent | 4a580d713621f99abb8baabc9b847ce039d44842 (diff) | |
| download | karakeep-74a1f7b6b600d4cb53352dde7def374c3125721a.tar.zst | |
feat: Restate-based queue plugin (#2011)
* WIP: Initial restate integration
* add retry
* add delay + idempotency
* implement concurrency limits
* add admin stats
* add todos
* add id provider
* handle onComplete failures
* add tests
* add pub key and fix logging
* add priorities
* fail call after retries
* more fixes
* fix retries left
* some refactoring
* fix package.json
* upgrade sdk
* some test cleanups
Diffstat (limited to 'packages/plugins-queue-restate/src/tests')
4 files changed, 342 insertions, 0 deletions
diff --git a/packages/plugins-queue-restate/src/tests/docker-compose.yml b/packages/plugins-queue-restate/src/tests/docker-compose.yml new file mode 100644 index 00000000..f24c2921 --- /dev/null +++ b/packages/plugins-queue-restate/src/tests/docker-compose.yml @@ -0,0 +1,8 @@ +services: + restate: + image: docker.restate.dev/restatedev/restate:1.5 + ports: + - "${RESTATE_INGRESS_PORT:-8080}:8080" + - "${RESTATE_ADMIN_PORT:-9070}:9070" + extra_hosts: + - "host.docker.internal:host-gateway" diff --git a/packages/plugins-queue-restate/src/tests/queue.test.ts b/packages/plugins-queue-restate/src/tests/queue.test.ts new file mode 100644 index 00000000..692581d6 --- /dev/null +++ b/packages/plugins-queue-restate/src/tests/queue.test.ts @@ -0,0 +1,221 @@ +import { + afterAll, + afterEach, + beforeAll, + beforeEach, + describe, + expect, + inject, + it, +} from "vitest"; + +import type { Queue, QueueClient } from "@karakeep/shared/queueing"; + +import { AdminClient } from "../admin.js"; +import { RestateQueueProvider } from "../index.js"; +import { waitUntil } from "./utils.js"; + +type TestAction = + | { type: "val"; val: number } + | { type: "err"; err: string } + | { type: "stall"; durSec: number }; + +describe("Restate Queue Provider", () => { + let queueClient: QueueClient; + let queue: Queue<TestAction>; + let adminClient: AdminClient; + + const testState = { + results: [] as number[], + errors: [] as string[], + inFlight: 0, + maxInFlight: 0, + }; + + async function waitUntilQueueEmpty() { + await waitUntil( + async () => { + const stats = await queue.stats(); + return stats.pending + stats.pending_retry + stats.running === 0; + }, + "Queue to be empty", + 60000, + ); + } + + beforeEach(async () => { + testState.results = []; + testState.errors = []; + testState.inFlight = 0; + testState.maxInFlight = 0; + }); + afterEach(async () => { + await waitUntilQueueEmpty(); + }); + + beforeAll(async () => { + const ingressPort = inject("restateIngressPort"); + const adminPort = inject("restateAdminPort"); + + process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`; + process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`; + process.env.RESTATE_LISTEN_PORT = "9080"; + + const provider = new RestateQueueProvider(); + const client = await provider.getClient(); + + if (!client) { + throw new Error("Failed to create queue client"); + } + + queueClient = client; + adminClient = new AdminClient(process.env.RESTATE_ADMIN_ADDR); + + queue = queueClient.createQueue<TestAction>("test-queue", { + defaultJobArgs: { + numRetries: 3, + }, + keepFailedJobs: false, + }); + + queueClient.createRunner( + queue, + { + run: async (job) => { + testState.inFlight++; + testState.maxInFlight = Math.max( + testState.maxInFlight, + testState.inFlight, + ); + const jobData = job.data; + switch (jobData.type) { + case "val": + testState.results.push(jobData.val); + break; + case "err": + throw new Error(jobData.err); + case "stall": + await new Promise((resolve) => + setTimeout(resolve, jobData.durSec * 1000), + ); + break; + } + }, + onError: async (job) => { + testState.inFlight--; + const jobData = job.data; + if (jobData && jobData.type === "err") { + testState.errors.push(jobData.err); + } + }, + onComplete: async () => { + testState.inFlight--; + }, + }, + { + concurrency: 3, + timeoutSecs: 2, + pollIntervalMs: 0 /* Doesn't matter */, + }, + ); + + await queueClient.prepare(); + await queueClient.start(); + + await adminClient.upsertDeployment("http://host.docker.internal:9080"); + }, 90000); + + afterAll(async () => { + if (queueClient?.shutdown) { + await queueClient.shutdown(); + } + }); + + it("should enqueue and process a job", async () => { + const jobId = await queue.enqueue({ type: "val", val: 42 }); + + expect(jobId).toBeDefined(); + expect(typeof jobId).toBe("string"); + + await waitUntilQueueEmpty(); + + expect(testState.results).toEqual([42]); + }, 60000); + + it("should process multiple jobs", async () => { + await queue.enqueue({ type: "val", val: 1 }); + await queue.enqueue({ type: "val", val: 2 }); + await queue.enqueue({ type: "val", val: 3 }); + + await waitUntilQueueEmpty(); + + expect(testState.results.length).toEqual(3); + expect(testState.results).toContain(1); + expect(testState.results).toContain(2); + expect(testState.results).toContain(3); + }, 60000); + + it("should retry failed jobs", async () => { + await queue.enqueue({ type: "err", err: "Test error" }); + + await waitUntilQueueEmpty(); + + // Initial attempt + 3 retries + expect(testState.errors).toEqual([ + "Test error", + "Test error", + "Test error", + "Test error", + ]); + }, 90000); + + it("should use idempotency key", async () => { + const idempotencyKey = `test-${Date.now()}`; + + await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey }); + await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey }); + + await waitUntilQueueEmpty(); + + expect(testState.results).toEqual([200]); + }, 60000); + + it("should handle concurrent jobs", async () => { + const promises = []; + for (let i = 300; i < 320; i++) { + promises.push(queue.enqueue({ type: "stall", durSec: 0.1 })); + } + await Promise.all(promises); + + await waitUntilQueueEmpty(); + + expect(testState.maxInFlight).toEqual(3); + }, 60000); + + it("should handle priorities", async () => { + // Hog the queue first + await Promise.all([ + queue.enqueue({ type: "stall", durSec: 1 }, { priority: 0 }), + queue.enqueue({ type: "stall", durSec: 1 }, { priority: 1 }), + queue.enqueue({ type: "stall", durSec: 1 }, { priority: 2 }), + ]); + + // Then those will get reprioritized + await Promise.all([ + queue.enqueue({ type: "val", val: 200 }, { priority: -1 }), + queue.enqueue({ type: "val", val: 201 }, { priority: -2 }), + queue.enqueue({ type: "val", val: 202 }, { priority: -3 }), + + queue.enqueue({ type: "val", val: 300 }, { priority: 0 }), + queue.enqueue({ type: "val", val: 301 }, { priority: 1 }), + queue.enqueue({ type: "val", val: 302 }, { priority: 2 }), + ]); + + await waitUntilQueueEmpty(); + + expect(testState.results).toEqual([ + // Then in order of increasing priority + 302, 301, 300, 200, 201, 202, + ]); + }, 60000); +}); diff --git a/packages/plugins-queue-restate/src/tests/setup/startContainers.ts b/packages/plugins-queue-restate/src/tests/setup/startContainers.ts new file mode 100644 index 00000000..7d9dea5c --- /dev/null +++ b/packages/plugins-queue-restate/src/tests/setup/startContainers.ts @@ -0,0 +1,90 @@ +import { execSync } from "child_process"; +import net from "net"; +import path from "path"; +import { fileURLToPath } from "url"; +import type { GlobalSetupContext } from "vitest/node"; + +import { waitUntil } from "../utils.js"; + +async function getRandomPort(): Promise<number> { + const server = net.createServer(); + return new Promise<number>((resolve, reject) => { + server.unref(); + server.on("error", reject); + server.listen(0, () => { + const port = (server.address() as net.AddressInfo).port; + server.close(() => resolve(port)); + }); + }); +} + +async function waitForHealthy( + ingressPort: number, + adminPort: number, + timeout = 60000, +): Promise<void> { + await waitUntil( + async () => { + const response = await fetch(`http://localhost:${adminPort}/health`); + return response.ok; + }, + "Restate admin API is healthy", + timeout, + ); + + await waitUntil( + async () => { + const response = await fetch( + `http://localhost:${ingressPort}/restate/health`, + ); + return response.ok; + }, + "Restate ingress is healthy", + timeout, + ); +} + +export default async function ({ provide }: GlobalSetupContext) { + const __dirname = path.dirname(fileURLToPath(import.meta.url)); + const ingressPort = await getRandomPort(); + const adminPort = await getRandomPort(); + + console.log( + `Starting Restate on ports ${ingressPort} (ingress) and ${adminPort} (admin)...`, + ); + execSync(`docker compose up -d`, { + cwd: path.join(__dirname, ".."), + stdio: "ignore", + env: { + ...process.env, + RESTATE_INGRESS_PORT: ingressPort.toString(), + RESTATE_ADMIN_PORT: adminPort.toString(), + }, + }); + + console.log("Waiting for Restate to become healthy..."); + await waitForHealthy(ingressPort, adminPort); + + provide("restateIngressPort", ingressPort); + provide("restateAdminPort", adminPort); + + process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`; + process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`; + process.env.RESTATE_LISTEN_PORT = "9080"; + + return async () => { + console.log("Stopping Restate..."); + execSync("docker compose down", { + cwd: path.join(__dirname, ".."), + stdio: "ignore", + }); + return Promise.resolve(); + }; +} + +declare module "vitest" { + export interface ProvidedContext { + restateIngressPort: number; + restateAdminPort: number; + } +} diff --git a/packages/plugins-queue-restate/src/tests/utils.ts b/packages/plugins-queue-restate/src/tests/utils.ts new file mode 100644 index 00000000..e02d2dee --- /dev/null +++ b/packages/plugins-queue-restate/src/tests/utils.ts @@ -0,0 +1,23 @@ +export async function waitUntil( + f: () => Promise<boolean>, + description: string, + timeoutMs = 60000, +): Promise<void> { + const startTime = Date.now(); + + while (Date.now() - startTime < timeoutMs) { + console.log(`Waiting for ${description}...`); + try { + const res = await f(); + if (res) { + console.log(`${description}: success`); + return; + } + } catch (error) { + console.log(`${description}: error, retrying...: ${error}`); + } + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + + throw new Error(`${description}: timeout after ${timeoutMs}ms`); +} |
