aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins-queue-restate/src
diff options
context:
space:
mode:
Diffstat (limited to 'packages/plugins-queue-restate/src')
-rw-r--r--packages/plugins-queue-restate/src/admin.ts75
-rw-r--r--packages/plugins-queue-restate/src/env.ts13
-rw-r--r--packages/plugins-queue-restate/src/idProvider.ts18
-rw-r--r--packages/plugins-queue-restate/src/index.ts201
-rw-r--r--packages/plugins-queue-restate/src/semaphore.ts105
-rw-r--r--packages/plugins-queue-restate/src/service.ts133
-rw-r--r--packages/plugins-queue-restate/src/tests/docker-compose.yml8
-rw-r--r--packages/plugins-queue-restate/src/tests/queue.test.ts221
-rw-r--r--packages/plugins-queue-restate/src/tests/setup/startContainers.ts90
-rw-r--r--packages/plugins-queue-restate/src/tests/utils.ts23
10 files changed, 887 insertions, 0 deletions
diff --git a/packages/plugins-queue-restate/src/admin.ts b/packages/plugins-queue-restate/src/admin.ts
new file mode 100644
index 00000000..dddc8f00
--- /dev/null
+++ b/packages/plugins-queue-restate/src/admin.ts
@@ -0,0 +1,75 @@
+import { z } from "zod";
+
+export class AdminClient {
+ constructor(private addr: string) {}
+
+ async upsertDeployment(deploymentAddr: string) {
+ const res = await fetch(`${this.addr}/deployments`, {
+ method: "POST",
+ body: JSON.stringify({
+ uri: deploymentAddr,
+ force: true,
+ }),
+ headers: {
+ "Content-Type": "application/json",
+ },
+ });
+
+ if (!res.ok) {
+ throw new Error(`Failed to upsert deployment: ${res.status}`);
+ }
+ }
+
+ async getStats(serviceName: string) {
+ const query = `select status, count(*) as count from sys_invocation where target_service_name='${serviceName}' group by status`;
+ const res = await fetch(`${this.addr}/query`, {
+ method: "POST",
+ body: JSON.stringify({
+ query,
+ }),
+ headers: {
+ "Content-Type": "application/json",
+ Accept: "application/json",
+ },
+ });
+
+ if (!res.ok) {
+ throw new Error(`Failed to get stats: ${res.status}`);
+ }
+ const zStatus = z.enum([
+ "pending",
+ "scheduled",
+ "ready",
+ "running",
+ "paused",
+ "backing-off",
+ "suspended",
+ "completed",
+ ]);
+ const zSchema = z.object({
+ rows: z.array(
+ z.object({
+ status: zStatus,
+ count: z.number(),
+ }),
+ ),
+ });
+
+ return zSchema.parse(await res.json()).rows.reduce(
+ (acc, cur) => {
+ acc[cur.status] = cur.count;
+ return acc;
+ },
+ {
+ pending: 0,
+ scheduled: 0,
+ ready: 0,
+ running: 0,
+ paused: 0,
+ "backing-off": 0,
+ suspended: 0,
+ completed: 0,
+ } as Record<z.infer<typeof zStatus>, number>,
+ );
+ }
+}
diff --git a/packages/plugins-queue-restate/src/env.ts b/packages/plugins-queue-restate/src/env.ts
new file mode 100644
index 00000000..01175e86
--- /dev/null
+++ b/packages/plugins-queue-restate/src/env.ts
@@ -0,0 +1,13 @@
+import { z } from "zod";
+
+export const envConfig = z
+ .object({
+ RESTATE_LISTEN_PORT: z.coerce.number().optional(),
+ RESTATE_INGRESS_ADDR: z
+ .string()
+ .optional()
+ .default("http://localhost:8080"),
+ RESTATE_ADMIN_ADDR: z.string().optional().default("http://localhost:9070"),
+ RESTATE_PUB_KEY: z.string().optional(),
+ })
+ .parse(process.env);
diff --git a/packages/plugins-queue-restate/src/idProvider.ts b/packages/plugins-queue-restate/src/idProvider.ts
new file mode 100644
index 00000000..72ebc860
--- /dev/null
+++ b/packages/plugins-queue-restate/src/idProvider.ts
@@ -0,0 +1,18 @@
+import { Context, object, ObjectContext } from "@restatedev/restate-sdk";
+
+export const idProvider = object({
+ name: "IdProvider",
+ handlers: {
+ get: async (ctx: ObjectContext<{ nextId: number }>): Promise<number> => {
+ const state = (await ctx.get("nextId")) ?? 0;
+ ctx.set("nextId", state + 1);
+ return state;
+ },
+ },
+});
+
+export async function genId(ctx: Context) {
+ return ctx
+ .objectClient<typeof idProvider>({ name: "IdProvider" }, "IdProvider")
+ .get();
+}
diff --git a/packages/plugins-queue-restate/src/index.ts b/packages/plugins-queue-restate/src/index.ts
new file mode 100644
index 00000000..bedc26af
--- /dev/null
+++ b/packages/plugins-queue-restate/src/index.ts
@@ -0,0 +1,201 @@
+import * as restate from "@restatedev/restate-sdk";
+import * as restateClient from "@restatedev/restate-sdk-clients";
+
+import type { PluginProvider } from "@karakeep/shared/plugins";
+import type {
+ EnqueueOptions,
+ Queue,
+ QueueClient,
+ QueueOptions,
+ Runner,
+ RunnerFuncs,
+ RunnerOptions,
+} from "@karakeep/shared/queueing";
+import logger from "@karakeep/shared/logger";
+
+import { AdminClient } from "./admin";
+import { envConfig } from "./env";
+import { idProvider } from "./idProvider";
+import { semaphore } from "./semaphore";
+import { buildRestateService } from "./service";
+
+class RestateQueueWrapper<T> implements Queue<T> {
+ constructor(
+ private readonly _name: string,
+ private readonly client: restateClient.Ingress,
+ private readonly adminClient: AdminClient,
+ public readonly opts: QueueOptions,
+ ) {}
+
+ name(): string {
+ return this._name;
+ }
+
+ async enqueue(
+ payload: T,
+ options?: EnqueueOptions,
+ ): Promise<string | undefined> {
+ interface MyService {
+ run: (
+ ctx: restate.Context,
+ data: {
+ payload: T;
+ priority: number;
+ },
+ ) => Promise<void>;
+ }
+ const cl = this.client.serviceSendClient<MyService>({ name: this.name() });
+ const res = await cl.run(
+ {
+ payload,
+ priority: options?.priority ?? 0,
+ },
+ restateClient.rpc.sendOpts({
+ delay: options?.delayMs
+ ? {
+ milliseconds: options.delayMs,
+ }
+ : undefined,
+ idempotencyKey: options?.idempotencyKey,
+ }),
+ );
+ return res.invocationId;
+ }
+
+ async stats(): Promise<{
+ pending: number;
+ pending_retry: number;
+ running: number;
+ failed: number;
+ }> {
+ const res = await this.adminClient.getStats(this.name());
+ return {
+ pending: res.pending + res.ready,
+ pending_retry: res["backing-off"] + res.paused + res.suspended,
+ running: res.running,
+ failed: 0,
+ };
+ }
+
+ async cancelAllNonRunning(): Promise<number> {
+ throw new Error("Method not implemented.");
+ }
+}
+
+class RestateRunnerWrapper<T> implements Runner<T> {
+ constructor(
+ private readonly wf: restate.ServiceDefinition<
+ string,
+ {
+ run: (ctx: restate.Context, data: T) => Promise<void>;
+ }
+ >,
+ ) {}
+
+ async run(): Promise<void> {
+ // No-op for restate
+ }
+
+ async stop(): Promise<void> {
+ // No-op for restate
+ }
+
+ async runUntilEmpty(): Promise<void> {
+ throw new Error("Method not implemented.");
+ }
+
+ get def(): restate.WorkflowDefinition<string, unknown> {
+ return this.wf;
+ }
+}
+
+class RestateQueueClient implements QueueClient {
+ private client: restateClient.Ingress;
+ private adminClient: AdminClient;
+ private queues = new Map<string, RestateQueueWrapper<unknown>>();
+ private services = new Map<string, RestateRunnerWrapper<unknown>>();
+
+ constructor() {
+ this.client = restateClient.connect({
+ url: envConfig.RESTATE_INGRESS_ADDR,
+ });
+ this.adminClient = new AdminClient(envConfig.RESTATE_ADMIN_ADDR);
+ }
+
+ async prepare(): Promise<void> {
+ // No-op for restate
+ }
+
+ async start(): Promise<void> {
+ const port = await restate.serve({
+ port: envConfig.RESTATE_LISTEN_PORT ?? 0,
+ services: [
+ ...[...this.services.values()].map((svc) => svc.def),
+ semaphore,
+ idProvider,
+ ],
+ identityKeys: envConfig.RESTATE_PUB_KEY
+ ? [envConfig.RESTATE_PUB_KEY]
+ : undefined,
+ logger: (meta, msg) => {
+ if (meta.context) {
+ // No need to log invocation logs
+ } else {
+ logger.log(meta.level, `[restate] ${msg}`);
+ }
+ },
+ });
+ logger.info(`Restate listening on port ${port}`);
+ }
+
+ createQueue<T>(name: string, opts: QueueOptions): Queue<T> {
+ if (this.queues.has(name)) {
+ throw new Error(`Queue ${name} already exists`);
+ }
+ const wrapper = new RestateQueueWrapper<T>(
+ name,
+ this.client,
+ this.adminClient,
+ opts,
+ );
+ this.queues.set(name, wrapper);
+ return wrapper;
+ }
+
+ createRunner<T>(
+ queue: Queue<T>,
+ funcs: RunnerFuncs<T>,
+ opts: RunnerOptions<T>,
+ ): Runner<T> {
+ const name = queue.name();
+ let wrapper = this.services.get(name);
+ if (wrapper) {
+ throw new Error(`Queue ${name} already exists`);
+ }
+ const svc = new RestateRunnerWrapper<T>(
+ buildRestateService(queue, funcs, opts, queue.opts),
+ );
+ this.services.set(name, svc);
+ return svc;
+ }
+
+ async shutdown(): Promise<void> {
+ // No-op for sqlite
+ }
+}
+
+export class RestateQueueProvider implements PluginProvider<QueueClient> {
+ private client: QueueClient | null = null;
+
+ static isConfigured(): boolean {
+ return envConfig.RESTATE_LISTEN_PORT !== undefined;
+ }
+
+ async getClient(): Promise<QueueClient | null> {
+ if (!this.client) {
+ const client = new RestateQueueClient();
+ this.client = client;
+ }
+ return this.client;
+ }
+}
diff --git a/packages/plugins-queue-restate/src/semaphore.ts b/packages/plugins-queue-restate/src/semaphore.ts
new file mode 100644
index 00000000..253dbe33
--- /dev/null
+++ b/packages/plugins-queue-restate/src/semaphore.ts
@@ -0,0 +1,105 @@
+// Inspired from https://github.com/restatedev/examples/blob/main/typescript/patterns-use-cases/src/priorityqueue/queue.ts
+
+import { Context, object, ObjectContext } from "@restatedev/restate-sdk";
+
+interface QueueItem {
+ awakeable: string;
+ priority: number;
+}
+
+interface QueueState {
+ items: QueueItem[];
+ inFlight: number;
+}
+
+export const semaphore = object({
+ name: "Semaphore",
+ handlers: {
+ acquire: async (
+ ctx: ObjectContext<QueueState>,
+ req: { awakeableId: string; priority: number; capacity: number },
+ ): Promise<void> => {
+ const state = await getState(ctx);
+
+ state.items.push({
+ awakeable: req.awakeableId,
+ priority: req.priority,
+ });
+
+ tick(ctx, state, req.capacity);
+
+ setState(ctx, state);
+ },
+
+ release: async (
+ ctx: ObjectContext<QueueState>,
+ capacity: number,
+ ): Promise<void> => {
+ const state = await getState(ctx);
+ state.inFlight--;
+ tick(ctx, state, capacity);
+ setState(ctx, state);
+ },
+ },
+});
+
+function selectAndPopItem(items: QueueItem[]): QueueItem {
+ let highest = { priority: Number.MIN_SAFE_INTEGER, index: 0 };
+ for (const [i, item] of items.entries()) {
+ if (item.priority > highest.priority) {
+ highest.priority = item.priority;
+ highest.index = i;
+ }
+ }
+ const [item] = items.splice(highest.index, 1);
+ return item;
+}
+
+function tick(
+ ctx: ObjectContext<QueueState>,
+ state: QueueState,
+ capacity: number,
+) {
+ while (state.inFlight < capacity && state.items.length > 0) {
+ const item = selectAndPopItem(state.items);
+ state.inFlight++;
+ ctx.resolveAwakeable(item.awakeable);
+ }
+}
+
+async function getState(ctx: ObjectContext<QueueState>): Promise<QueueState> {
+ return {
+ items: (await ctx.get("items")) ?? [],
+ inFlight: (await ctx.get("inFlight")) ?? 0,
+ };
+}
+
+function setState(ctx: ObjectContext<QueueState>, state: QueueState) {
+ ctx.set("items", state.items);
+ ctx.set("inFlight", state.inFlight);
+}
+
+export class RestateSemaphore {
+ constructor(
+ private readonly ctx: Context,
+ private readonly id: string,
+ private readonly capacity: number,
+ ) {}
+
+ async acquire(priority: number) {
+ const awk = this.ctx.awakeable();
+ await this.ctx
+ .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id)
+ .acquire({
+ awakeableId: awk.id,
+ priority,
+ capacity: this.capacity,
+ });
+ await awk.promise;
+ }
+ async release() {
+ await this.ctx
+ .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id)
+ .release(this.capacity);
+ }
+}
diff --git a/packages/plugins-queue-restate/src/service.ts b/packages/plugins-queue-restate/src/service.ts
new file mode 100644
index 00000000..de5b070f
--- /dev/null
+++ b/packages/plugins-queue-restate/src/service.ts
@@ -0,0 +1,133 @@
+import * as restate from "@restatedev/restate-sdk";
+
+import type {
+ Queue,
+ QueueOptions,
+ RunnerFuncs,
+ RunnerOptions,
+} from "@karakeep/shared/queueing";
+import { tryCatch } from "@karakeep/shared/tryCatch";
+
+import { genId } from "./idProvider";
+import { RestateSemaphore } from "./semaphore";
+
+export function buildRestateService<T>(
+ queue: Queue<T>,
+ funcs: RunnerFuncs<T>,
+ opts: RunnerOptions<T>,
+ queueOpts: QueueOptions,
+) {
+ const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries;
+ return restate.service({
+ name: queue.name(),
+ options: {
+ inactivityTimeout: {
+ seconds: opts.timeoutSecs,
+ },
+ },
+ handlers: {
+ run: async (
+ ctx: restate.Context,
+ data: {
+ payload: T;
+ priority: number;
+ },
+ ) => {
+ const id = `${await genId(ctx)}`;
+ let payload = data.payload;
+ if (opts.validator) {
+ const res = opts.validator.safeParse(data.payload);
+ if (!res.success) {
+ throw new restate.TerminalError(res.error.message, {
+ errorCode: 400,
+ });
+ }
+ payload = res.data;
+ }
+
+ const priority = data.priority ?? 0;
+
+ const semaphore = new RestateSemaphore(
+ ctx,
+ `queue:${queue.name()}`,
+ opts.concurrency,
+ );
+
+ let lastError: Error | undefined;
+ for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) {
+ await semaphore.acquire(priority);
+ const res = await runWorkerLogic(ctx, funcs, {
+ id,
+ data: payload,
+ priority,
+ runNumber,
+ numRetriesLeft: NUM_RETRIES - runNumber,
+ abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000),
+ });
+ await semaphore.release();
+ if (res.error) {
+ lastError = res.error;
+ // TODO: add backoff
+ await ctx.sleep(1000);
+ } else {
+ break;
+ }
+ }
+ if (lastError) {
+ throw new restate.TerminalError(lastError.message, {
+ errorCode: 500,
+ cause: "cause" in lastError ? lastError.cause : undefined,
+ });
+ }
+ },
+ },
+ });
+}
+
+async function runWorkerLogic<T>(
+ ctx: restate.Context,
+ { run, onError, onComplete }: RunnerFuncs<T>,
+ data: {
+ id: string;
+ data: T;
+ priority: number;
+ runNumber: number;
+ numRetriesLeft: number;
+ abortSignal: AbortSignal;
+ },
+) {
+ const res = await tryCatch(
+ ctx.run(
+ `main logic`,
+ async () => {
+ await run(data);
+ },
+ {
+ maxRetryAttempts: 1,
+ },
+ ),
+ );
+ if (res.error) {
+ await tryCatch(
+ ctx.run(
+ `onError`,
+ async () =>
+ onError?.({
+ ...data,
+ error: res.error,
+ }),
+ {
+ maxRetryAttempts: 1,
+ },
+ ),
+ );
+ return res;
+ }
+
+ await tryCatch(
+ ctx.run("onComplete", async () => await onComplete?.(data), {
+ maxRetryAttempts: 1,
+ }),
+ );
+ return res;
+}
diff --git a/packages/plugins-queue-restate/src/tests/docker-compose.yml b/packages/plugins-queue-restate/src/tests/docker-compose.yml
new file mode 100644
index 00000000..f24c2921
--- /dev/null
+++ b/packages/plugins-queue-restate/src/tests/docker-compose.yml
@@ -0,0 +1,8 @@
+services:
+ restate:
+ image: docker.restate.dev/restatedev/restate:1.5
+ ports:
+ - "${RESTATE_INGRESS_PORT:-8080}:8080"
+ - "${RESTATE_ADMIN_PORT:-9070}:9070"
+ extra_hosts:
+ - "host.docker.internal:host-gateway"
diff --git a/packages/plugins-queue-restate/src/tests/queue.test.ts b/packages/plugins-queue-restate/src/tests/queue.test.ts
new file mode 100644
index 00000000..692581d6
--- /dev/null
+++ b/packages/plugins-queue-restate/src/tests/queue.test.ts
@@ -0,0 +1,221 @@
+import {
+ afterAll,
+ afterEach,
+ beforeAll,
+ beforeEach,
+ describe,
+ expect,
+ inject,
+ it,
+} from "vitest";
+
+import type { Queue, QueueClient } from "@karakeep/shared/queueing";
+
+import { AdminClient } from "../admin.js";
+import { RestateQueueProvider } from "../index.js";
+import { waitUntil } from "./utils.js";
+
+type TestAction =
+ | { type: "val"; val: number }
+ | { type: "err"; err: string }
+ | { type: "stall"; durSec: number };
+
+describe("Restate Queue Provider", () => {
+ let queueClient: QueueClient;
+ let queue: Queue<TestAction>;
+ let adminClient: AdminClient;
+
+ const testState = {
+ results: [] as number[],
+ errors: [] as string[],
+ inFlight: 0,
+ maxInFlight: 0,
+ };
+
+ async function waitUntilQueueEmpty() {
+ await waitUntil(
+ async () => {
+ const stats = await queue.stats();
+ return stats.pending + stats.pending_retry + stats.running === 0;
+ },
+ "Queue to be empty",
+ 60000,
+ );
+ }
+
+ beforeEach(async () => {
+ testState.results = [];
+ testState.errors = [];
+ testState.inFlight = 0;
+ testState.maxInFlight = 0;
+ });
+ afterEach(async () => {
+ await waitUntilQueueEmpty();
+ });
+
+ beforeAll(async () => {
+ const ingressPort = inject("restateIngressPort");
+ const adminPort = inject("restateAdminPort");
+
+ process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`;
+ process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`;
+ process.env.RESTATE_LISTEN_PORT = "9080";
+
+ const provider = new RestateQueueProvider();
+ const client = await provider.getClient();
+
+ if (!client) {
+ throw new Error("Failed to create queue client");
+ }
+
+ queueClient = client;
+ adminClient = new AdminClient(process.env.RESTATE_ADMIN_ADDR);
+
+ queue = queueClient.createQueue<TestAction>("test-queue", {
+ defaultJobArgs: {
+ numRetries: 3,
+ },
+ keepFailedJobs: false,
+ });
+
+ queueClient.createRunner(
+ queue,
+ {
+ run: async (job) => {
+ testState.inFlight++;
+ testState.maxInFlight = Math.max(
+ testState.maxInFlight,
+ testState.inFlight,
+ );
+ const jobData = job.data;
+ switch (jobData.type) {
+ case "val":
+ testState.results.push(jobData.val);
+ break;
+ case "err":
+ throw new Error(jobData.err);
+ case "stall":
+ await new Promise((resolve) =>
+ setTimeout(resolve, jobData.durSec * 1000),
+ );
+ break;
+ }
+ },
+ onError: async (job) => {
+ testState.inFlight--;
+ const jobData = job.data;
+ if (jobData && jobData.type === "err") {
+ testState.errors.push(jobData.err);
+ }
+ },
+ onComplete: async () => {
+ testState.inFlight--;
+ },
+ },
+ {
+ concurrency: 3,
+ timeoutSecs: 2,
+ pollIntervalMs: 0 /* Doesn't matter */,
+ },
+ );
+
+ await queueClient.prepare();
+ await queueClient.start();
+
+ await adminClient.upsertDeployment("http://host.docker.internal:9080");
+ }, 90000);
+
+ afterAll(async () => {
+ if (queueClient?.shutdown) {
+ await queueClient.shutdown();
+ }
+ });
+
+ it("should enqueue and process a job", async () => {
+ const jobId = await queue.enqueue({ type: "val", val: 42 });
+
+ expect(jobId).toBeDefined();
+ expect(typeof jobId).toBe("string");
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results).toEqual([42]);
+ }, 60000);
+
+ it("should process multiple jobs", async () => {
+ await queue.enqueue({ type: "val", val: 1 });
+ await queue.enqueue({ type: "val", val: 2 });
+ await queue.enqueue({ type: "val", val: 3 });
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results.length).toEqual(3);
+ expect(testState.results).toContain(1);
+ expect(testState.results).toContain(2);
+ expect(testState.results).toContain(3);
+ }, 60000);
+
+ it("should retry failed jobs", async () => {
+ await queue.enqueue({ type: "err", err: "Test error" });
+
+ await waitUntilQueueEmpty();
+
+ // Initial attempt + 3 retries
+ expect(testState.errors).toEqual([
+ "Test error",
+ "Test error",
+ "Test error",
+ "Test error",
+ ]);
+ }, 90000);
+
+ it("should use idempotency key", async () => {
+ const idempotencyKey = `test-${Date.now()}`;
+
+ await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey });
+ await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey });
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results).toEqual([200]);
+ }, 60000);
+
+ it("should handle concurrent jobs", async () => {
+ const promises = [];
+ for (let i = 300; i < 320; i++) {
+ promises.push(queue.enqueue({ type: "stall", durSec: 0.1 }));
+ }
+ await Promise.all(promises);
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.maxInFlight).toEqual(3);
+ }, 60000);
+
+ it("should handle priorities", async () => {
+ // Hog the queue first
+ await Promise.all([
+ queue.enqueue({ type: "stall", durSec: 1 }, { priority: 0 }),
+ queue.enqueue({ type: "stall", durSec: 1 }, { priority: 1 }),
+ queue.enqueue({ type: "stall", durSec: 1 }, { priority: 2 }),
+ ]);
+
+ // Then those will get reprioritized
+ await Promise.all([
+ queue.enqueue({ type: "val", val: 200 }, { priority: -1 }),
+ queue.enqueue({ type: "val", val: 201 }, { priority: -2 }),
+ queue.enqueue({ type: "val", val: 202 }, { priority: -3 }),
+
+ queue.enqueue({ type: "val", val: 300 }, { priority: 0 }),
+ queue.enqueue({ type: "val", val: 301 }, { priority: 1 }),
+ queue.enqueue({ type: "val", val: 302 }, { priority: 2 }),
+ ]);
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results).toEqual([
+ // Then in order of increasing priority
+ 302, 301, 300, 200, 201, 202,
+ ]);
+ }, 60000);
+});
diff --git a/packages/plugins-queue-restate/src/tests/setup/startContainers.ts b/packages/plugins-queue-restate/src/tests/setup/startContainers.ts
new file mode 100644
index 00000000..7d9dea5c
--- /dev/null
+++ b/packages/plugins-queue-restate/src/tests/setup/startContainers.ts
@@ -0,0 +1,90 @@
+import { execSync } from "child_process";
+import net from "net";
+import path from "path";
+import { fileURLToPath } from "url";
+import type { GlobalSetupContext } from "vitest/node";
+
+import { waitUntil } from "../utils.js";
+
+async function getRandomPort(): Promise<number> {
+ const server = net.createServer();
+ return new Promise<number>((resolve, reject) => {
+ server.unref();
+ server.on("error", reject);
+ server.listen(0, () => {
+ const port = (server.address() as net.AddressInfo).port;
+ server.close(() => resolve(port));
+ });
+ });
+}
+
+async function waitForHealthy(
+ ingressPort: number,
+ adminPort: number,
+ timeout = 60000,
+): Promise<void> {
+ await waitUntil(
+ async () => {
+ const response = await fetch(`http://localhost:${adminPort}/health`);
+ return response.ok;
+ },
+ "Restate admin API is healthy",
+ timeout,
+ );
+
+ await waitUntil(
+ async () => {
+ const response = await fetch(
+ `http://localhost:${ingressPort}/restate/health`,
+ );
+ return response.ok;
+ },
+ "Restate ingress is healthy",
+ timeout,
+ );
+}
+
+export default async function ({ provide }: GlobalSetupContext) {
+ const __dirname = path.dirname(fileURLToPath(import.meta.url));
+ const ingressPort = await getRandomPort();
+ const adminPort = await getRandomPort();
+
+ console.log(
+ `Starting Restate on ports ${ingressPort} (ingress) and ${adminPort} (admin)...`,
+ );
+ execSync(`docker compose up -d`, {
+ cwd: path.join(__dirname, ".."),
+ stdio: "ignore",
+ env: {
+ ...process.env,
+ RESTATE_INGRESS_PORT: ingressPort.toString(),
+ RESTATE_ADMIN_PORT: adminPort.toString(),
+ },
+ });
+
+ console.log("Waiting for Restate to become healthy...");
+ await waitForHealthy(ingressPort, adminPort);
+
+ provide("restateIngressPort", ingressPort);
+ provide("restateAdminPort", adminPort);
+
+ process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`;
+ process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`;
+ process.env.RESTATE_LISTEN_PORT = "9080";
+
+ return async () => {
+ console.log("Stopping Restate...");
+ execSync("docker compose down", {
+ cwd: path.join(__dirname, ".."),
+ stdio: "ignore",
+ });
+ return Promise.resolve();
+ };
+}
+
+declare module "vitest" {
+ export interface ProvidedContext {
+ restateIngressPort: number;
+ restateAdminPort: number;
+ }
+}
diff --git a/packages/plugins-queue-restate/src/tests/utils.ts b/packages/plugins-queue-restate/src/tests/utils.ts
new file mode 100644
index 00000000..e02d2dee
--- /dev/null
+++ b/packages/plugins-queue-restate/src/tests/utils.ts
@@ -0,0 +1,23 @@
+export async function waitUntil(
+ f: () => Promise<boolean>,
+ description: string,
+ timeoutMs = 60000,
+): Promise<void> {
+ const startTime = Date.now();
+
+ while (Date.now() - startTime < timeoutMs) {
+ console.log(`Waiting for ${description}...`);
+ try {
+ const res = await f();
+ if (res) {
+ console.log(`${description}: success`);
+ return;
+ }
+ } catch (error) {
+ console.log(`${description}: error, retrying...: ${error}`);
+ }
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+ }
+
+ throw new Error(`${description}: timeout after ${timeoutMs}ms`);
+}