aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins-queue-restate/src
diff options
context:
space:
mode:
Diffstat (limited to 'packages/plugins-queue-restate/src')
-rw-r--r--packages/plugins-queue-restate/src/admin.ts75
-rw-r--r--packages/plugins-queue-restate/src/env.ts13
-rw-r--r--packages/plugins-queue-restate/src/idProvider.ts21
-rw-r--r--packages/plugins-queue-restate/src/index.ts201
-rw-r--r--packages/plugins-queue-restate/src/semaphore.ts109
-rw-r--r--packages/plugins-queue-restate/src/service.ts133
-rw-r--r--packages/plugins-queue-restate/src/tests/docker-compose.yml8
-rw-r--r--packages/plugins-queue-restate/src/tests/queue.test.ts221
-rw-r--r--packages/plugins-queue-restate/src/tests/setup/startContainers.ts90
-rw-r--r--packages/plugins-queue-restate/src/tests/utils.ts23
10 files changed, 0 insertions, 894 deletions
diff --git a/packages/plugins-queue-restate/src/admin.ts b/packages/plugins-queue-restate/src/admin.ts
deleted file mode 100644
index dddc8f00..00000000
--- a/packages/plugins-queue-restate/src/admin.ts
+++ /dev/null
@@ -1,75 +0,0 @@
-import { z } from "zod";
-
-export class AdminClient {
- constructor(private addr: string) {}
-
- async upsertDeployment(deploymentAddr: string) {
- const res = await fetch(`${this.addr}/deployments`, {
- method: "POST",
- body: JSON.stringify({
- uri: deploymentAddr,
- force: true,
- }),
- headers: {
- "Content-Type": "application/json",
- },
- });
-
- if (!res.ok) {
- throw new Error(`Failed to upsert deployment: ${res.status}`);
- }
- }
-
- async getStats(serviceName: string) {
- const query = `select status, count(*) as count from sys_invocation where target_service_name='${serviceName}' group by status`;
- const res = await fetch(`${this.addr}/query`, {
- method: "POST",
- body: JSON.stringify({
- query,
- }),
- headers: {
- "Content-Type": "application/json",
- Accept: "application/json",
- },
- });
-
- if (!res.ok) {
- throw new Error(`Failed to get stats: ${res.status}`);
- }
- const zStatus = z.enum([
- "pending",
- "scheduled",
- "ready",
- "running",
- "paused",
- "backing-off",
- "suspended",
- "completed",
- ]);
- const zSchema = z.object({
- rows: z.array(
- z.object({
- status: zStatus,
- count: z.number(),
- }),
- ),
- });
-
- return zSchema.parse(await res.json()).rows.reduce(
- (acc, cur) => {
- acc[cur.status] = cur.count;
- return acc;
- },
- {
- pending: 0,
- scheduled: 0,
- ready: 0,
- running: 0,
- paused: 0,
- "backing-off": 0,
- suspended: 0,
- completed: 0,
- } as Record<z.infer<typeof zStatus>, number>,
- );
- }
-}
diff --git a/packages/plugins-queue-restate/src/env.ts b/packages/plugins-queue-restate/src/env.ts
deleted file mode 100644
index 01175e86..00000000
--- a/packages/plugins-queue-restate/src/env.ts
+++ /dev/null
@@ -1,13 +0,0 @@
-import { z } from "zod";
-
-export const envConfig = z
- .object({
- RESTATE_LISTEN_PORT: z.coerce.number().optional(),
- RESTATE_INGRESS_ADDR: z
- .string()
- .optional()
- .default("http://localhost:8080"),
- RESTATE_ADMIN_ADDR: z.string().optional().default("http://localhost:9070"),
- RESTATE_PUB_KEY: z.string().optional(),
- })
- .parse(process.env);
diff --git a/packages/plugins-queue-restate/src/idProvider.ts b/packages/plugins-queue-restate/src/idProvider.ts
deleted file mode 100644
index ee85f46f..00000000
--- a/packages/plugins-queue-restate/src/idProvider.ts
+++ /dev/null
@@ -1,21 +0,0 @@
-import { Context, object, ObjectContext } from "@restatedev/restate-sdk";
-
-export const idProvider = object({
- name: "IdProvider",
- handlers: {
- get: async (ctx: ObjectContext<{ nextId: number }>): Promise<number> => {
- const state = (await ctx.get("nextId")) ?? 0;
- ctx.set("nextId", state + 1);
- return state;
- },
- },
- options: {
- ingressPrivate: true,
- },
-});
-
-export async function genId(ctx: Context) {
- return ctx
- .objectClient<typeof idProvider>({ name: "IdProvider" }, "IdProvider")
- .get();
-}
diff --git a/packages/plugins-queue-restate/src/index.ts b/packages/plugins-queue-restate/src/index.ts
deleted file mode 100644
index bedc26af..00000000
--- a/packages/plugins-queue-restate/src/index.ts
+++ /dev/null
@@ -1,201 +0,0 @@
-import * as restate from "@restatedev/restate-sdk";
-import * as restateClient from "@restatedev/restate-sdk-clients";
-
-import type { PluginProvider } from "@karakeep/shared/plugins";
-import type {
- EnqueueOptions,
- Queue,
- QueueClient,
- QueueOptions,
- Runner,
- RunnerFuncs,
- RunnerOptions,
-} from "@karakeep/shared/queueing";
-import logger from "@karakeep/shared/logger";
-
-import { AdminClient } from "./admin";
-import { envConfig } from "./env";
-import { idProvider } from "./idProvider";
-import { semaphore } from "./semaphore";
-import { buildRestateService } from "./service";
-
-class RestateQueueWrapper<T> implements Queue<T> {
- constructor(
- private readonly _name: string,
- private readonly client: restateClient.Ingress,
- private readonly adminClient: AdminClient,
- public readonly opts: QueueOptions,
- ) {}
-
- name(): string {
- return this._name;
- }
-
- async enqueue(
- payload: T,
- options?: EnqueueOptions,
- ): Promise<string | undefined> {
- interface MyService {
- run: (
- ctx: restate.Context,
- data: {
- payload: T;
- priority: number;
- },
- ) => Promise<void>;
- }
- const cl = this.client.serviceSendClient<MyService>({ name: this.name() });
- const res = await cl.run(
- {
- payload,
- priority: options?.priority ?? 0,
- },
- restateClient.rpc.sendOpts({
- delay: options?.delayMs
- ? {
- milliseconds: options.delayMs,
- }
- : undefined,
- idempotencyKey: options?.idempotencyKey,
- }),
- );
- return res.invocationId;
- }
-
- async stats(): Promise<{
- pending: number;
- pending_retry: number;
- running: number;
- failed: number;
- }> {
- const res = await this.adminClient.getStats(this.name());
- return {
- pending: res.pending + res.ready,
- pending_retry: res["backing-off"] + res.paused + res.suspended,
- running: res.running,
- failed: 0,
- };
- }
-
- async cancelAllNonRunning(): Promise<number> {
- throw new Error("Method not implemented.");
- }
-}
-
-class RestateRunnerWrapper<T> implements Runner<T> {
- constructor(
- private readonly wf: restate.ServiceDefinition<
- string,
- {
- run: (ctx: restate.Context, data: T) => Promise<void>;
- }
- >,
- ) {}
-
- async run(): Promise<void> {
- // No-op for restate
- }
-
- async stop(): Promise<void> {
- // No-op for restate
- }
-
- async runUntilEmpty(): Promise<void> {
- throw new Error("Method not implemented.");
- }
-
- get def(): restate.WorkflowDefinition<string, unknown> {
- return this.wf;
- }
-}
-
-class RestateQueueClient implements QueueClient {
- private client: restateClient.Ingress;
- private adminClient: AdminClient;
- private queues = new Map<string, RestateQueueWrapper<unknown>>();
- private services = new Map<string, RestateRunnerWrapper<unknown>>();
-
- constructor() {
- this.client = restateClient.connect({
- url: envConfig.RESTATE_INGRESS_ADDR,
- });
- this.adminClient = new AdminClient(envConfig.RESTATE_ADMIN_ADDR);
- }
-
- async prepare(): Promise<void> {
- // No-op for restate
- }
-
- async start(): Promise<void> {
- const port = await restate.serve({
- port: envConfig.RESTATE_LISTEN_PORT ?? 0,
- services: [
- ...[...this.services.values()].map((svc) => svc.def),
- semaphore,
- idProvider,
- ],
- identityKeys: envConfig.RESTATE_PUB_KEY
- ? [envConfig.RESTATE_PUB_KEY]
- : undefined,
- logger: (meta, msg) => {
- if (meta.context) {
- // No need to log invocation logs
- } else {
- logger.log(meta.level, `[restate] ${msg}`);
- }
- },
- });
- logger.info(`Restate listening on port ${port}`);
- }
-
- createQueue<T>(name: string, opts: QueueOptions): Queue<T> {
- if (this.queues.has(name)) {
- throw new Error(`Queue ${name} already exists`);
- }
- const wrapper = new RestateQueueWrapper<T>(
- name,
- this.client,
- this.adminClient,
- opts,
- );
- this.queues.set(name, wrapper);
- return wrapper;
- }
-
- createRunner<T>(
- queue: Queue<T>,
- funcs: RunnerFuncs<T>,
- opts: RunnerOptions<T>,
- ): Runner<T> {
- const name = queue.name();
- let wrapper = this.services.get(name);
- if (wrapper) {
- throw new Error(`Queue ${name} already exists`);
- }
- const svc = new RestateRunnerWrapper<T>(
- buildRestateService(queue, funcs, opts, queue.opts),
- );
- this.services.set(name, svc);
- return svc;
- }
-
- async shutdown(): Promise<void> {
- // No-op for sqlite
- }
-}
-
-export class RestateQueueProvider implements PluginProvider<QueueClient> {
- private client: QueueClient | null = null;
-
- static isConfigured(): boolean {
- return envConfig.RESTATE_LISTEN_PORT !== undefined;
- }
-
- async getClient(): Promise<QueueClient | null> {
- if (!this.client) {
- const client = new RestateQueueClient();
- this.client = client;
- }
- return this.client;
- }
-}
diff --git a/packages/plugins-queue-restate/src/semaphore.ts b/packages/plugins-queue-restate/src/semaphore.ts
deleted file mode 100644
index ad636f98..00000000
--- a/packages/plugins-queue-restate/src/semaphore.ts
+++ /dev/null
@@ -1,109 +0,0 @@
-// Inspired from https://github.com/restatedev/examples/blob/main/typescript/patterns-use-cases/src/priorityqueue/queue.ts
-
-import { Context, object, ObjectContext } from "@restatedev/restate-sdk";
-
-interface QueueItem {
- awakeable: string;
- priority: number;
-}
-
-interface QueueState {
- items: QueueItem[];
- inFlight: number;
-}
-
-export const semaphore = object({
- name: "Semaphore",
- handlers: {
- acquire: async (
- ctx: ObjectContext<QueueState>,
- req: { awakeableId: string; priority: number; capacity: number },
- ): Promise<void> => {
- const state = await getState(ctx);
-
- state.items.push({
- awakeable: req.awakeableId,
- priority: req.priority,
- });
-
- tick(ctx, state, req.capacity);
-
- setState(ctx, state);
- },
-
- release: async (
- ctx: ObjectContext<QueueState>,
- capacity: number,
- ): Promise<void> => {
- const state = await getState(ctx);
- state.inFlight--;
- tick(ctx, state, capacity);
- setState(ctx, state);
- },
- },
- options: {
- ingressPrivate: true,
- },
-});
-
-// Lower numbers represent higher priority, mirroring Liteque’s semantics.
-function selectAndPopItem(items: QueueItem[]): QueueItem {
- let selected = { priority: Number.MAX_SAFE_INTEGER, index: 0 };
- for (const [i, item] of items.entries()) {
- if (item.priority < selected.priority) {
- selected.priority = item.priority;
- selected.index = i;
- }
- }
- const [item] = items.splice(selected.index, 1);
- return item;
-}
-
-function tick(
- ctx: ObjectContext<QueueState>,
- state: QueueState,
- capacity: number,
-) {
- while (state.inFlight < capacity && state.items.length > 0) {
- const item = selectAndPopItem(state.items);
- state.inFlight++;
- ctx.resolveAwakeable(item.awakeable);
- }
-}
-
-async function getState(ctx: ObjectContext<QueueState>): Promise<QueueState> {
- return {
- items: (await ctx.get("items")) ?? [],
- inFlight: (await ctx.get("inFlight")) ?? 0,
- };
-}
-
-function setState(ctx: ObjectContext<QueueState>, state: QueueState) {
- ctx.set("items", state.items);
- ctx.set("inFlight", state.inFlight);
-}
-
-export class RestateSemaphore {
- constructor(
- private readonly ctx: Context,
- private readonly id: string,
- private readonly capacity: number,
- ) {}
-
- async acquire(priority: number) {
- const awk = this.ctx.awakeable();
- await this.ctx
- .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id)
- .acquire({
- awakeableId: awk.id,
- priority,
- capacity: this.capacity,
- });
- await awk.promise;
- }
- async release() {
- await this.ctx
- .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id)
- .release(this.capacity);
- }
-}
diff --git a/packages/plugins-queue-restate/src/service.ts b/packages/plugins-queue-restate/src/service.ts
deleted file mode 100644
index de5b070f..00000000
--- a/packages/plugins-queue-restate/src/service.ts
+++ /dev/null
@@ -1,133 +0,0 @@
-import * as restate from "@restatedev/restate-sdk";
-
-import type {
- Queue,
- QueueOptions,
- RunnerFuncs,
- RunnerOptions,
-} from "@karakeep/shared/queueing";
-import { tryCatch } from "@karakeep/shared/tryCatch";
-
-import { genId } from "./idProvider";
-import { RestateSemaphore } from "./semaphore";
-
-export function buildRestateService<T>(
- queue: Queue<T>,
- funcs: RunnerFuncs<T>,
- opts: RunnerOptions<T>,
- queueOpts: QueueOptions,
-) {
- const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries;
- return restate.service({
- name: queue.name(),
- options: {
- inactivityTimeout: {
- seconds: opts.timeoutSecs,
- },
- },
- handlers: {
- run: async (
- ctx: restate.Context,
- data: {
- payload: T;
- priority: number;
- },
- ) => {
- const id = `${await genId(ctx)}`;
- let payload = data.payload;
- if (opts.validator) {
- const res = opts.validator.safeParse(data.payload);
- if (!res.success) {
- throw new restate.TerminalError(res.error.message, {
- errorCode: 400,
- });
- }
- payload = res.data;
- }
-
- const priority = data.priority ?? 0;
-
- const semaphore = new RestateSemaphore(
- ctx,
- `queue:${queue.name()}`,
- opts.concurrency,
- );
-
- let lastError: Error | undefined;
- for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) {
- await semaphore.acquire(priority);
- const res = await runWorkerLogic(ctx, funcs, {
- id,
- data: payload,
- priority,
- runNumber,
- numRetriesLeft: NUM_RETRIES - runNumber,
- abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000),
- });
- await semaphore.release();
- if (res.error) {
- lastError = res.error;
- // TODO: add backoff
- await ctx.sleep(1000);
- } else {
- break;
- }
- }
- if (lastError) {
- throw new restate.TerminalError(lastError.message, {
- errorCode: 500,
- cause: "cause" in lastError ? lastError.cause : undefined,
- });
- }
- },
- },
- });
-}
-
-async function runWorkerLogic<T>(
- ctx: restate.Context,
- { run, onError, onComplete }: RunnerFuncs<T>,
- data: {
- id: string;
- data: T;
- priority: number;
- runNumber: number;
- numRetriesLeft: number;
- abortSignal: AbortSignal;
- },
-) {
- const res = await tryCatch(
- ctx.run(
- `main logic`,
- async () => {
- await run(data);
- },
- {
- maxRetryAttempts: 1,
- },
- ),
- );
- if (res.error) {
- await tryCatch(
- ctx.run(
- `onError`,
- async () =>
- onError?.({
- ...data,
- error: res.error,
- }),
- {
- maxRetryAttempts: 1,
- },
- ),
- );
- return res;
- }
-
- await tryCatch(
- ctx.run("onComplete", async () => await onComplete?.(data), {
- maxRetryAttempts: 1,
- }),
- );
- return res;
-}
diff --git a/packages/plugins-queue-restate/src/tests/docker-compose.yml b/packages/plugins-queue-restate/src/tests/docker-compose.yml
deleted file mode 100644
index f24c2921..00000000
--- a/packages/plugins-queue-restate/src/tests/docker-compose.yml
+++ /dev/null
@@ -1,8 +0,0 @@
-services:
- restate:
- image: docker.restate.dev/restatedev/restate:1.5
- ports:
- - "${RESTATE_INGRESS_PORT:-8080}:8080"
- - "${RESTATE_ADMIN_PORT:-9070}:9070"
- extra_hosts:
- - "host.docker.internal:host-gateway"
diff --git a/packages/plugins-queue-restate/src/tests/queue.test.ts b/packages/plugins-queue-restate/src/tests/queue.test.ts
deleted file mode 100644
index e59d47cb..00000000
--- a/packages/plugins-queue-restate/src/tests/queue.test.ts
+++ /dev/null
@@ -1,221 +0,0 @@
-import {
- afterAll,
- afterEach,
- beforeAll,
- beforeEach,
- describe,
- expect,
- inject,
- it,
-} from "vitest";
-
-import type { Queue, QueueClient } from "@karakeep/shared/queueing";
-
-import { AdminClient } from "../admin.js";
-import { RestateQueueProvider } from "../index.js";
-import { waitUntil } from "./utils.js";
-
-type TestAction =
- | { type: "val"; val: number }
- | { type: "err"; err: string }
- | { type: "stall"; durSec: number };
-
-describe("Restate Queue Provider", () => {
- let queueClient: QueueClient;
- let queue: Queue<TestAction>;
- let adminClient: AdminClient;
-
- const testState = {
- results: [] as number[],
- errors: [] as string[],
- inFlight: 0,
- maxInFlight: 0,
- };
-
- async function waitUntilQueueEmpty() {
- await waitUntil(
- async () => {
- const stats = await queue.stats();
- return stats.pending + stats.pending_retry + stats.running === 0;
- },
- "Queue to be empty",
- 60000,
- );
- }
-
- beforeEach(async () => {
- testState.results = [];
- testState.errors = [];
- testState.inFlight = 0;
- testState.maxInFlight = 0;
- });
- afterEach(async () => {
- await waitUntilQueueEmpty();
- });
-
- beforeAll(async () => {
- const ingressPort = inject("restateIngressPort");
- const adminPort = inject("restateAdminPort");
-
- process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`;
- process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`;
- process.env.RESTATE_LISTEN_PORT = "9080";
-
- const provider = new RestateQueueProvider();
- const client = await provider.getClient();
-
- if (!client) {
- throw new Error("Failed to create queue client");
- }
-
- queueClient = client;
- adminClient = new AdminClient(process.env.RESTATE_ADMIN_ADDR);
-
- queue = queueClient.createQueue<TestAction>("test-queue", {
- defaultJobArgs: {
- numRetries: 3,
- },
- keepFailedJobs: false,
- });
-
- queueClient.createRunner(
- queue,
- {
- run: async (job) => {
- testState.inFlight++;
- testState.maxInFlight = Math.max(
- testState.maxInFlight,
- testState.inFlight,
- );
- const jobData = job.data;
- switch (jobData.type) {
- case "val":
- testState.results.push(jobData.val);
- break;
- case "err":
- throw new Error(jobData.err);
- case "stall":
- await new Promise((resolve) =>
- setTimeout(resolve, jobData.durSec * 1000),
- );
- break;
- }
- },
- onError: async (job) => {
- testState.inFlight--;
- const jobData = job.data;
- if (jobData && jobData.type === "err") {
- testState.errors.push(jobData.err);
- }
- },
- onComplete: async () => {
- testState.inFlight--;
- },
- },
- {
- concurrency: 3,
- timeoutSecs: 2,
- pollIntervalMs: 0 /* Doesn't matter */,
- },
- );
-
- await queueClient.prepare();
- await queueClient.start();
-
- await adminClient.upsertDeployment("http://host.docker.internal:9080");
- }, 90000);
-
- afterAll(async () => {
- if (queueClient?.shutdown) {
- await queueClient.shutdown();
- }
- });
-
- it("should enqueue and process a job", async () => {
- const jobId = await queue.enqueue({ type: "val", val: 42 });
-
- expect(jobId).toBeDefined();
- expect(typeof jobId).toBe("string");
-
- await waitUntilQueueEmpty();
-
- expect(testState.results).toEqual([42]);
- }, 60000);
-
- it("should process multiple jobs", async () => {
- await queue.enqueue({ type: "val", val: 1 });
- await queue.enqueue({ type: "val", val: 2 });
- await queue.enqueue({ type: "val", val: 3 });
-
- await waitUntilQueueEmpty();
-
- expect(testState.results.length).toEqual(3);
- expect(testState.results).toContain(1);
- expect(testState.results).toContain(2);
- expect(testState.results).toContain(3);
- }, 60000);
-
- it("should retry failed jobs", async () => {
- await queue.enqueue({ type: "err", err: "Test error" });
-
- await waitUntilQueueEmpty();
-
- // Initial attempt + 3 retries
- expect(testState.errors).toEqual([
- "Test error",
- "Test error",
- "Test error",
- "Test error",
- ]);
- }, 90000);
-
- it("should use idempotency key", async () => {
- const idempotencyKey = `test-${Date.now()}`;
-
- await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey });
- await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey });
-
- await waitUntilQueueEmpty();
-
- expect(testState.results).toEqual([200]);
- }, 60000);
-
- it("should handle concurrent jobs", async () => {
- const promises = [];
- for (let i = 300; i < 320; i++) {
- promises.push(queue.enqueue({ type: "stall", durSec: 0.1 }));
- }
- await Promise.all(promises);
-
- await waitUntilQueueEmpty();
-
- expect(testState.maxInFlight).toEqual(3);
- }, 60000);
-
- it("should handle priorities", async () => {
- // Hog the queue first
- await Promise.all([
- queue.enqueue({ type: "stall", durSec: 1 }, { priority: 0 }),
- queue.enqueue({ type: "stall", durSec: 1 }, { priority: 1 }),
- queue.enqueue({ type: "stall", durSec: 1 }, { priority: 2 }),
- ]);
-
- // Then those will get reprioritized
- await Promise.all([
- queue.enqueue({ type: "val", val: 200 }, { priority: -1 }),
- queue.enqueue({ type: "val", val: 201 }, { priority: -2 }),
- queue.enqueue({ type: "val", val: 202 }, { priority: -3 }),
-
- queue.enqueue({ type: "val", val: 300 }, { priority: 0 }),
- queue.enqueue({ type: "val", val: 301 }, { priority: 1 }),
- queue.enqueue({ type: "val", val: 302 }, { priority: 2 }),
- ]);
-
- await waitUntilQueueEmpty();
-
- expect(testState.results).toEqual([
- // Lower numeric priority value should run first
- 202, 201, 200, 300, 301, 302,
- ]);
- }, 60000);
-});
diff --git a/packages/plugins-queue-restate/src/tests/setup/startContainers.ts b/packages/plugins-queue-restate/src/tests/setup/startContainers.ts
deleted file mode 100644
index 7d9dea5c..00000000
--- a/packages/plugins-queue-restate/src/tests/setup/startContainers.ts
+++ /dev/null
@@ -1,90 +0,0 @@
-import { execSync } from "child_process";
-import net from "net";
-import path from "path";
-import { fileURLToPath } from "url";
-import type { GlobalSetupContext } from "vitest/node";
-
-import { waitUntil } from "../utils.js";
-
-async function getRandomPort(): Promise<number> {
- const server = net.createServer();
- return new Promise<number>((resolve, reject) => {
- server.unref();
- server.on("error", reject);
- server.listen(0, () => {
- const port = (server.address() as net.AddressInfo).port;
- server.close(() => resolve(port));
- });
- });
-}
-
-async function waitForHealthy(
- ingressPort: number,
- adminPort: number,
- timeout = 60000,
-): Promise<void> {
- await waitUntil(
- async () => {
- const response = await fetch(`http://localhost:${adminPort}/health`);
- return response.ok;
- },
- "Restate admin API is healthy",
- timeout,
- );
-
- await waitUntil(
- async () => {
- const response = await fetch(
- `http://localhost:${ingressPort}/restate/health`,
- );
- return response.ok;
- },
- "Restate ingress is healthy",
- timeout,
- );
-}
-
-export default async function ({ provide }: GlobalSetupContext) {
- const __dirname = path.dirname(fileURLToPath(import.meta.url));
- const ingressPort = await getRandomPort();
- const adminPort = await getRandomPort();
-
- console.log(
- `Starting Restate on ports ${ingressPort} (ingress) and ${adminPort} (admin)...`,
- );
- execSync(`docker compose up -d`, {
- cwd: path.join(__dirname, ".."),
- stdio: "ignore",
- env: {
- ...process.env,
- RESTATE_INGRESS_PORT: ingressPort.toString(),
- RESTATE_ADMIN_PORT: adminPort.toString(),
- },
- });
-
- console.log("Waiting for Restate to become healthy...");
- await waitForHealthy(ingressPort, adminPort);
-
- provide("restateIngressPort", ingressPort);
- provide("restateAdminPort", adminPort);
-
- process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`;
- process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`;
- process.env.RESTATE_LISTEN_PORT = "9080";
-
- return async () => {
- console.log("Stopping Restate...");
- execSync("docker compose down", {
- cwd: path.join(__dirname, ".."),
- stdio: "ignore",
- });
- return Promise.resolve();
- };
-}
-
-declare module "vitest" {
- export interface ProvidedContext {
- restateIngressPort: number;
- restateAdminPort: number;
- }
-}
diff --git a/packages/plugins-queue-restate/src/tests/utils.ts b/packages/plugins-queue-restate/src/tests/utils.ts
deleted file mode 100644
index e02d2dee..00000000
--- a/packages/plugins-queue-restate/src/tests/utils.ts
+++ /dev/null
@@ -1,23 +0,0 @@
-export async function waitUntil(
- f: () => Promise<boolean>,
- description: string,
- timeoutMs = 60000,
-): Promise<void> {
- const startTime = Date.now();
-
- while (Date.now() - startTime < timeoutMs) {
- console.log(`Waiting for ${description}...`);
- try {
- const res = await f();
- if (res) {
- console.log(`${description}: success`);
- return;
- }
- } catch (error) {
- console.log(`${description}: error, retrying...: ${error}`);
- }
- await new Promise((resolve) => setTimeout(resolve, 1000));
- }
-
- throw new Error(`${description}: timeout after ${timeoutMs}ms`);
-}