aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins-queue-restate/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'packages/plugins-queue-restate/src/tests')
-rw-r--r--packages/plugins-queue-restate/src/tests/docker-compose.yml8
-rw-r--r--packages/plugins-queue-restate/src/tests/queue.test.ts221
-rw-r--r--packages/plugins-queue-restate/src/tests/setup/startContainers.ts90
-rw-r--r--packages/plugins-queue-restate/src/tests/utils.ts23
4 files changed, 0 insertions, 342 deletions
diff --git a/packages/plugins-queue-restate/src/tests/docker-compose.yml b/packages/plugins-queue-restate/src/tests/docker-compose.yml
deleted file mode 100644
index f24c2921..00000000
--- a/packages/plugins-queue-restate/src/tests/docker-compose.yml
+++ /dev/null
@@ -1,8 +0,0 @@
-services:
- restate:
- image: docker.restate.dev/restatedev/restate:1.5
- ports:
- - "${RESTATE_INGRESS_PORT:-8080}:8080"
- - "${RESTATE_ADMIN_PORT:-9070}:9070"
- extra_hosts:
- - "host.docker.internal:host-gateway"
diff --git a/packages/plugins-queue-restate/src/tests/queue.test.ts b/packages/plugins-queue-restate/src/tests/queue.test.ts
deleted file mode 100644
index e59d47cb..00000000
--- a/packages/plugins-queue-restate/src/tests/queue.test.ts
+++ /dev/null
@@ -1,221 +0,0 @@
-import {
- afterAll,
- afterEach,
- beforeAll,
- beforeEach,
- describe,
- expect,
- inject,
- it,
-} from "vitest";
-
-import type { Queue, QueueClient } from "@karakeep/shared/queueing";
-
-import { AdminClient } from "../admin.js";
-import { RestateQueueProvider } from "../index.js";
-import { waitUntil } from "./utils.js";
-
-type TestAction =
- | { type: "val"; val: number }
- | { type: "err"; err: string }
- | { type: "stall"; durSec: number };
-
-describe("Restate Queue Provider", () => {
- let queueClient: QueueClient;
- let queue: Queue<TestAction>;
- let adminClient: AdminClient;
-
- const testState = {
- results: [] as number[],
- errors: [] as string[],
- inFlight: 0,
- maxInFlight: 0,
- };
-
- async function waitUntilQueueEmpty() {
- await waitUntil(
- async () => {
- const stats = await queue.stats();
- return stats.pending + stats.pending_retry + stats.running === 0;
- },
- "Queue to be empty",
- 60000,
- );
- }
-
- beforeEach(async () => {
- testState.results = [];
- testState.errors = [];
- testState.inFlight = 0;
- testState.maxInFlight = 0;
- });
- afterEach(async () => {
- await waitUntilQueueEmpty();
- });
-
- beforeAll(async () => {
- const ingressPort = inject("restateIngressPort");
- const adminPort = inject("restateAdminPort");
-
- process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`;
- process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`;
- process.env.RESTATE_LISTEN_PORT = "9080";
-
- const provider = new RestateQueueProvider();
- const client = await provider.getClient();
-
- if (!client) {
- throw new Error("Failed to create queue client");
- }
-
- queueClient = client;
- adminClient = new AdminClient(process.env.RESTATE_ADMIN_ADDR);
-
- queue = queueClient.createQueue<TestAction>("test-queue", {
- defaultJobArgs: {
- numRetries: 3,
- },
- keepFailedJobs: false,
- });
-
- queueClient.createRunner(
- queue,
- {
- run: async (job) => {
- testState.inFlight++;
- testState.maxInFlight = Math.max(
- testState.maxInFlight,
- testState.inFlight,
- );
- const jobData = job.data;
- switch (jobData.type) {
- case "val":
- testState.results.push(jobData.val);
- break;
- case "err":
- throw new Error(jobData.err);
- case "stall":
- await new Promise((resolve) =>
- setTimeout(resolve, jobData.durSec * 1000),
- );
- break;
- }
- },
- onError: async (job) => {
- testState.inFlight--;
- const jobData = job.data;
- if (jobData && jobData.type === "err") {
- testState.errors.push(jobData.err);
- }
- },
- onComplete: async () => {
- testState.inFlight--;
- },
- },
- {
- concurrency: 3,
- timeoutSecs: 2,
- pollIntervalMs: 0 /* Doesn't matter */,
- },
- );
-
- await queueClient.prepare();
- await queueClient.start();
-
- await adminClient.upsertDeployment("http://host.docker.internal:9080");
- }, 90000);
-
- afterAll(async () => {
- if (queueClient?.shutdown) {
- await queueClient.shutdown();
- }
- });
-
- it("should enqueue and process a job", async () => {
- const jobId = await queue.enqueue({ type: "val", val: 42 });
-
- expect(jobId).toBeDefined();
- expect(typeof jobId).toBe("string");
-
- await waitUntilQueueEmpty();
-
- expect(testState.results).toEqual([42]);
- }, 60000);
-
- it("should process multiple jobs", async () => {
- await queue.enqueue({ type: "val", val: 1 });
- await queue.enqueue({ type: "val", val: 2 });
- await queue.enqueue({ type: "val", val: 3 });
-
- await waitUntilQueueEmpty();
-
- expect(testState.results.length).toEqual(3);
- expect(testState.results).toContain(1);
- expect(testState.results).toContain(2);
- expect(testState.results).toContain(3);
- }, 60000);
-
- it("should retry failed jobs", async () => {
- await queue.enqueue({ type: "err", err: "Test error" });
-
- await waitUntilQueueEmpty();
-
- // Initial attempt + 3 retries
- expect(testState.errors).toEqual([
- "Test error",
- "Test error",
- "Test error",
- "Test error",
- ]);
- }, 90000);
-
- it("should use idempotency key", async () => {
- const idempotencyKey = `test-${Date.now()}`;
-
- await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey });
- await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey });
-
- await waitUntilQueueEmpty();
-
- expect(testState.results).toEqual([200]);
- }, 60000);
-
- it("should handle concurrent jobs", async () => {
- const promises = [];
- for (let i = 300; i < 320; i++) {
- promises.push(queue.enqueue({ type: "stall", durSec: 0.1 }));
- }
- await Promise.all(promises);
-
- await waitUntilQueueEmpty();
-
- expect(testState.maxInFlight).toEqual(3);
- }, 60000);
-
- it("should handle priorities", async () => {
- // Hog the queue first
- await Promise.all([
- queue.enqueue({ type: "stall", durSec: 1 }, { priority: 0 }),
- queue.enqueue({ type: "stall", durSec: 1 }, { priority: 1 }),
- queue.enqueue({ type: "stall", durSec: 1 }, { priority: 2 }),
- ]);
-
- // Then those will get reprioritized
- await Promise.all([
- queue.enqueue({ type: "val", val: 200 }, { priority: -1 }),
- queue.enqueue({ type: "val", val: 201 }, { priority: -2 }),
- queue.enqueue({ type: "val", val: 202 }, { priority: -3 }),
-
- queue.enqueue({ type: "val", val: 300 }, { priority: 0 }),
- queue.enqueue({ type: "val", val: 301 }, { priority: 1 }),
- queue.enqueue({ type: "val", val: 302 }, { priority: 2 }),
- ]);
-
- await waitUntilQueueEmpty();
-
- expect(testState.results).toEqual([
- // Lower numeric priority value should run first
- 202, 201, 200, 300, 301, 302,
- ]);
- }, 60000);
-});
diff --git a/packages/plugins-queue-restate/src/tests/setup/startContainers.ts b/packages/plugins-queue-restate/src/tests/setup/startContainers.ts
deleted file mode 100644
index 7d9dea5c..00000000
--- a/packages/plugins-queue-restate/src/tests/setup/startContainers.ts
+++ /dev/null
@@ -1,90 +0,0 @@
-import { execSync } from "child_process";
-import net from "net";
-import path from "path";
-import { fileURLToPath } from "url";
-import type { GlobalSetupContext } from "vitest/node";
-
-import { waitUntil } from "../utils.js";
-
-async function getRandomPort(): Promise<number> {
- const server = net.createServer();
- return new Promise<number>((resolve, reject) => {
- server.unref();
- server.on("error", reject);
- server.listen(0, () => {
- const port = (server.address() as net.AddressInfo).port;
- server.close(() => resolve(port));
- });
- });
-}
-
-async function waitForHealthy(
- ingressPort: number,
- adminPort: number,
- timeout = 60000,
-): Promise<void> {
- await waitUntil(
- async () => {
- const response = await fetch(`http://localhost:${adminPort}/health`);
- return response.ok;
- },
- "Restate admin API is healthy",
- timeout,
- );
-
- await waitUntil(
- async () => {
- const response = await fetch(
- `http://localhost:${ingressPort}/restate/health`,
- );
- return response.ok;
- },
- "Restate ingress is healthy",
- timeout,
- );
-}
-
-export default async function ({ provide }: GlobalSetupContext) {
- const __dirname = path.dirname(fileURLToPath(import.meta.url));
- const ingressPort = await getRandomPort();
- const adminPort = await getRandomPort();
-
- console.log(
- `Starting Restate on ports ${ingressPort} (ingress) and ${adminPort} (admin)...`,
- );
- execSync(`docker compose up -d`, {
- cwd: path.join(__dirname, ".."),
- stdio: "ignore",
- env: {
- ...process.env,
- RESTATE_INGRESS_PORT: ingressPort.toString(),
- RESTATE_ADMIN_PORT: adminPort.toString(),
- },
- });
-
- console.log("Waiting for Restate to become healthy...");
- await waitForHealthy(ingressPort, adminPort);
-
- provide("restateIngressPort", ingressPort);
- provide("restateAdminPort", adminPort);
-
- process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`;
- process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`;
- process.env.RESTATE_LISTEN_PORT = "9080";
-
- return async () => {
- console.log("Stopping Restate...");
- execSync("docker compose down", {
- cwd: path.join(__dirname, ".."),
- stdio: "ignore",
- });
- return Promise.resolve();
- };
-}
-
-declare module "vitest" {
- export interface ProvidedContext {
- restateIngressPort: number;
- restateAdminPort: number;
- }
-}
diff --git a/packages/plugins-queue-restate/src/tests/utils.ts b/packages/plugins-queue-restate/src/tests/utils.ts
deleted file mode 100644
index e02d2dee..00000000
--- a/packages/plugins-queue-restate/src/tests/utils.ts
+++ /dev/null
@@ -1,23 +0,0 @@
-export async function waitUntil(
- f: () => Promise<boolean>,
- description: string,
- timeoutMs = 60000,
-): Promise<void> {
- const startTime = Date.now();
-
- while (Date.now() - startTime < timeoutMs) {
- console.log(`Waiting for ${description}...`);
- try {
- const res = await f();
- if (res) {
- console.log(`${description}: success`);
- return;
- }
- } catch (error) {
- console.log(`${description}: error, retrying...: ${error}`);
- }
- await new Promise((resolve) => setTimeout(resolve, 1000));
- }
-
- throw new Error(`${description}: timeout after ${timeoutMs}ms`);
-}