diff options
Diffstat (limited to 'packages/plugins-queue-restate')
| -rw-r--r-- | packages/plugins-queue-restate/.oxlintrc.json | 19 | ||||
| -rw-r--r-- | packages/plugins-queue-restate/index.ts | 12 | ||||
| -rw-r--r-- | packages/plugins-queue-restate/package.json | 27 | ||||
| -rw-r--r-- | packages/plugins-queue-restate/src/admin.ts | 75 | ||||
| -rw-r--r-- | packages/plugins-queue-restate/src/env.ts | 13 | ||||
| -rw-r--r-- | packages/plugins-queue-restate/src/idProvider.ts | 21 | ||||
| -rw-r--r-- | packages/plugins-queue-restate/src/index.ts | 201 | ||||
| -rw-r--r-- | packages/plugins-queue-restate/src/semaphore.ts | 109 | ||||
| -rw-r--r-- | packages/plugins-queue-restate/src/service.ts | 133 | ||||
| -rw-r--r-- | packages/plugins-queue-restate/src/tests/docker-compose.yml | 8 | ||||
| -rw-r--r-- | packages/plugins-queue-restate/src/tests/queue.test.ts | 221 | ||||
| -rw-r--r-- | packages/plugins-queue-restate/src/tests/setup/startContainers.ts | 90 | ||||
| -rw-r--r-- | packages/plugins-queue-restate/src/tests/utils.ts | 23 | ||||
| -rw-r--r-- | packages/plugins-queue-restate/tsconfig.json | 10 | ||||
| -rw-r--r-- | packages/plugins-queue-restate/vitest.config.ts | 14 |
15 files changed, 0 insertions, 976 deletions
diff --git a/packages/plugins-queue-restate/.oxlintrc.json b/packages/plugins-queue-restate/.oxlintrc.json deleted file mode 100644 index 79ba0255..00000000 --- a/packages/plugins-queue-restate/.oxlintrc.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "$schema": "../../node_modules/oxlint/configuration_schema.json", - "extends": [ - "../../tooling/oxlint/oxlint-base.json" - ], - "env": { - "builtin": true, - "commonjs": true - }, - "ignorePatterns": [ - "**/*.config.js", - "**/*.config.cjs", - "**/.eslintrc.cjs", - "**/.next", - "**/dist", - "**/build", - "**/pnpm-lock.yaml" - ] -} diff --git a/packages/plugins-queue-restate/index.ts b/packages/plugins-queue-restate/index.ts deleted file mode 100644 index d313615c..00000000 --- a/packages/plugins-queue-restate/index.ts +++ /dev/null @@ -1,12 +0,0 @@ -// Auto-register the Restate queue provider when this package is imported -import { PluginManager, PluginType } from "@karakeep/shared/plugins"; - -import { RestateQueueProvider } from "./src"; - -if (RestateQueueProvider.isConfigured()) { - PluginManager.register({ - type: PluginType.Queue, - name: "Restate", - provider: new RestateQueueProvider(), - }); -} diff --git a/packages/plugins-queue-restate/package.json b/packages/plugins-queue-restate/package.json deleted file mode 100644 index 16681150..00000000 --- a/packages/plugins-queue-restate/package.json +++ /dev/null @@ -1,27 +0,0 @@ -{ - "$schema": "https://json.schemastore.org/package.json", - "name": "@karakeep/plugins-queue-restate", - "version": "0.1.0", - "private": true, - "type": "module", - "scripts": { - "typecheck": "tsc --noEmit", - "format": "prettier . --cache --ignore-path ../../.prettierignore --check", - "format:fix": "prettier . --cache --ignore-path ../../.prettierignore --write", - "lint": "oxlint .", - "lint:fix": "oxlint . --fix", - "test": "vitest" - }, - "dependencies": { - "@karakeep/shared": "workspace:*", - "@restatedev/restate-sdk": "^1.9.0", - "@restatedev/restate-sdk-clients": "^1.9.0" - }, - "devDependencies": { - "@karakeep/prettier-config": "workspace:^0.1.0", - "@karakeep/tsconfig": "workspace:^0.1.0", - "vite-tsconfig-paths": "^4.3.1", - "vitest": "^3.2.4" - }, - "prettier": "@karakeep/prettier-config" -} diff --git a/packages/plugins-queue-restate/src/admin.ts b/packages/plugins-queue-restate/src/admin.ts deleted file mode 100644 index dddc8f00..00000000 --- a/packages/plugins-queue-restate/src/admin.ts +++ /dev/null @@ -1,75 +0,0 @@ -import { z } from "zod"; - -export class AdminClient { - constructor(private addr: string) {} - - async upsertDeployment(deploymentAddr: string) { - const res = await fetch(`${this.addr}/deployments`, { - method: "POST", - body: JSON.stringify({ - uri: deploymentAddr, - force: true, - }), - headers: { - "Content-Type": "application/json", - }, - }); - - if (!res.ok) { - throw new Error(`Failed to upsert deployment: ${res.status}`); - } - } - - async getStats(serviceName: string) { - const query = `select status, count(*) as count from sys_invocation where target_service_name='${serviceName}' group by status`; - const res = await fetch(`${this.addr}/query`, { - method: "POST", - body: JSON.stringify({ - query, - }), - headers: { - "Content-Type": "application/json", - Accept: "application/json", - }, - }); - - if (!res.ok) { - throw new Error(`Failed to get stats: ${res.status}`); - } - const zStatus = z.enum([ - "pending", - "scheduled", - "ready", - "running", - "paused", - "backing-off", - "suspended", - "completed", - ]); - const zSchema = z.object({ - rows: z.array( - z.object({ - status: zStatus, - count: z.number(), - }), - ), - }); - - return zSchema.parse(await res.json()).rows.reduce( - (acc, cur) => { - acc[cur.status] = cur.count; - return acc; - }, - { - pending: 0, - scheduled: 0, - ready: 0, - running: 0, - paused: 0, - "backing-off": 0, - suspended: 0, - completed: 0, - } as Record<z.infer<typeof zStatus>, number>, - ); - } -} diff --git a/packages/plugins-queue-restate/src/env.ts b/packages/plugins-queue-restate/src/env.ts deleted file mode 100644 index 01175e86..00000000 --- a/packages/plugins-queue-restate/src/env.ts +++ /dev/null @@ -1,13 +0,0 @@ -import { z } from "zod"; - -export const envConfig = z - .object({ - RESTATE_LISTEN_PORT: z.coerce.number().optional(), - RESTATE_INGRESS_ADDR: z - .string() - .optional() - .default("http://localhost:8080"), - RESTATE_ADMIN_ADDR: z.string().optional().default("http://localhost:9070"), - RESTATE_PUB_KEY: z.string().optional(), - }) - .parse(process.env); diff --git a/packages/plugins-queue-restate/src/idProvider.ts b/packages/plugins-queue-restate/src/idProvider.ts deleted file mode 100644 index ee85f46f..00000000 --- a/packages/plugins-queue-restate/src/idProvider.ts +++ /dev/null @@ -1,21 +0,0 @@ -import { Context, object, ObjectContext } from "@restatedev/restate-sdk"; - -export const idProvider = object({ - name: "IdProvider", - handlers: { - get: async (ctx: ObjectContext<{ nextId: number }>): Promise<number> => { - const state = (await ctx.get("nextId")) ?? 0; - ctx.set("nextId", state + 1); - return state; - }, - }, - options: { - ingressPrivate: true, - }, -}); - -export async function genId(ctx: Context) { - return ctx - .objectClient<typeof idProvider>({ name: "IdProvider" }, "IdProvider") - .get(); -} diff --git a/packages/plugins-queue-restate/src/index.ts b/packages/plugins-queue-restate/src/index.ts deleted file mode 100644 index bedc26af..00000000 --- a/packages/plugins-queue-restate/src/index.ts +++ /dev/null @@ -1,201 +0,0 @@ -import * as restate from "@restatedev/restate-sdk"; -import * as restateClient from "@restatedev/restate-sdk-clients"; - -import type { PluginProvider } from "@karakeep/shared/plugins"; -import type { - EnqueueOptions, - Queue, - QueueClient, - QueueOptions, - Runner, - RunnerFuncs, - RunnerOptions, -} from "@karakeep/shared/queueing"; -import logger from "@karakeep/shared/logger"; - -import { AdminClient } from "./admin"; -import { envConfig } from "./env"; -import { idProvider } from "./idProvider"; -import { semaphore } from "./semaphore"; -import { buildRestateService } from "./service"; - -class RestateQueueWrapper<T> implements Queue<T> { - constructor( - private readonly _name: string, - private readonly client: restateClient.Ingress, - private readonly adminClient: AdminClient, - public readonly opts: QueueOptions, - ) {} - - name(): string { - return this._name; - } - - async enqueue( - payload: T, - options?: EnqueueOptions, - ): Promise<string | undefined> { - interface MyService { - run: ( - ctx: restate.Context, - data: { - payload: T; - priority: number; - }, - ) => Promise<void>; - } - const cl = this.client.serviceSendClient<MyService>({ name: this.name() }); - const res = await cl.run( - { - payload, - priority: options?.priority ?? 0, - }, - restateClient.rpc.sendOpts({ - delay: options?.delayMs - ? { - milliseconds: options.delayMs, - } - : undefined, - idempotencyKey: options?.idempotencyKey, - }), - ); - return res.invocationId; - } - - async stats(): Promise<{ - pending: number; - pending_retry: number; - running: number; - failed: number; - }> { - const res = await this.adminClient.getStats(this.name()); - return { - pending: res.pending + res.ready, - pending_retry: res["backing-off"] + res.paused + res.suspended, - running: res.running, - failed: 0, - }; - } - - async cancelAllNonRunning(): Promise<number> { - throw new Error("Method not implemented."); - } -} - -class RestateRunnerWrapper<T> implements Runner<T> { - constructor( - private readonly wf: restate.ServiceDefinition< - string, - { - run: (ctx: restate.Context, data: T) => Promise<void>; - } - >, - ) {} - - async run(): Promise<void> { - // No-op for restate - } - - async stop(): Promise<void> { - // No-op for restate - } - - async runUntilEmpty(): Promise<void> { - throw new Error("Method not implemented."); - } - - get def(): restate.WorkflowDefinition<string, unknown> { - return this.wf; - } -} - -class RestateQueueClient implements QueueClient { - private client: restateClient.Ingress; - private adminClient: AdminClient; - private queues = new Map<string, RestateQueueWrapper<unknown>>(); - private services = new Map<string, RestateRunnerWrapper<unknown>>(); - - constructor() { - this.client = restateClient.connect({ - url: envConfig.RESTATE_INGRESS_ADDR, - }); - this.adminClient = new AdminClient(envConfig.RESTATE_ADMIN_ADDR); - } - - async prepare(): Promise<void> { - // No-op for restate - } - - async start(): Promise<void> { - const port = await restate.serve({ - port: envConfig.RESTATE_LISTEN_PORT ?? 0, - services: [ - ...[...this.services.values()].map((svc) => svc.def), - semaphore, - idProvider, - ], - identityKeys: envConfig.RESTATE_PUB_KEY - ? [envConfig.RESTATE_PUB_KEY] - : undefined, - logger: (meta, msg) => { - if (meta.context) { - // No need to log invocation logs - } else { - logger.log(meta.level, `[restate] ${msg}`); - } - }, - }); - logger.info(`Restate listening on port ${port}`); - } - - createQueue<T>(name: string, opts: QueueOptions): Queue<T> { - if (this.queues.has(name)) { - throw new Error(`Queue ${name} already exists`); - } - const wrapper = new RestateQueueWrapper<T>( - name, - this.client, - this.adminClient, - opts, - ); - this.queues.set(name, wrapper); - return wrapper; - } - - createRunner<T>( - queue: Queue<T>, - funcs: RunnerFuncs<T>, - opts: RunnerOptions<T>, - ): Runner<T> { - const name = queue.name(); - let wrapper = this.services.get(name); - if (wrapper) { - throw new Error(`Queue ${name} already exists`); - } - const svc = new RestateRunnerWrapper<T>( - buildRestateService(queue, funcs, opts, queue.opts), - ); - this.services.set(name, svc); - return svc; - } - - async shutdown(): Promise<void> { - // No-op for sqlite - } -} - -export class RestateQueueProvider implements PluginProvider<QueueClient> { - private client: QueueClient | null = null; - - static isConfigured(): boolean { - return envConfig.RESTATE_LISTEN_PORT !== undefined; - } - - async getClient(): Promise<QueueClient | null> { - if (!this.client) { - const client = new RestateQueueClient(); - this.client = client; - } - return this.client; - } -} diff --git a/packages/plugins-queue-restate/src/semaphore.ts b/packages/plugins-queue-restate/src/semaphore.ts deleted file mode 100644 index ad636f98..00000000 --- a/packages/plugins-queue-restate/src/semaphore.ts +++ /dev/null @@ -1,109 +0,0 @@ -// Inspired from https://github.com/restatedev/examples/blob/main/typescript/patterns-use-cases/src/priorityqueue/queue.ts - -import { Context, object, ObjectContext } from "@restatedev/restate-sdk"; - -interface QueueItem { - awakeable: string; - priority: number; -} - -interface QueueState { - items: QueueItem[]; - inFlight: number; -} - -export const semaphore = object({ - name: "Semaphore", - handlers: { - acquire: async ( - ctx: ObjectContext<QueueState>, - req: { awakeableId: string; priority: number; capacity: number }, - ): Promise<void> => { - const state = await getState(ctx); - - state.items.push({ - awakeable: req.awakeableId, - priority: req.priority, - }); - - tick(ctx, state, req.capacity); - - setState(ctx, state); - }, - - release: async ( - ctx: ObjectContext<QueueState>, - capacity: number, - ): Promise<void> => { - const state = await getState(ctx); - state.inFlight--; - tick(ctx, state, capacity); - setState(ctx, state); - }, - }, - options: { - ingressPrivate: true, - }, -}); - -// Lower numbers represent higher priority, mirroring Liteque’s semantics. -function selectAndPopItem(items: QueueItem[]): QueueItem { - let selected = { priority: Number.MAX_SAFE_INTEGER, index: 0 }; - for (const [i, item] of items.entries()) { - if (item.priority < selected.priority) { - selected.priority = item.priority; - selected.index = i; - } - } - const [item] = items.splice(selected.index, 1); - return item; -} - -function tick( - ctx: ObjectContext<QueueState>, - state: QueueState, - capacity: number, -) { - while (state.inFlight < capacity && state.items.length > 0) { - const item = selectAndPopItem(state.items); - state.inFlight++; - ctx.resolveAwakeable(item.awakeable); - } -} - -async function getState(ctx: ObjectContext<QueueState>): Promise<QueueState> { - return { - items: (await ctx.get("items")) ?? [], - inFlight: (await ctx.get("inFlight")) ?? 0, - }; -} - -function setState(ctx: ObjectContext<QueueState>, state: QueueState) { - ctx.set("items", state.items); - ctx.set("inFlight", state.inFlight); -} - -export class RestateSemaphore { - constructor( - private readonly ctx: Context, - private readonly id: string, - private readonly capacity: number, - ) {} - - async acquire(priority: number) { - const awk = this.ctx.awakeable(); - await this.ctx - .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id) - .acquire({ - awakeableId: awk.id, - priority, - capacity: this.capacity, - }); - await awk.promise; - } - async release() { - await this.ctx - .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id) - .release(this.capacity); - } -} diff --git a/packages/plugins-queue-restate/src/service.ts b/packages/plugins-queue-restate/src/service.ts deleted file mode 100644 index de5b070f..00000000 --- a/packages/plugins-queue-restate/src/service.ts +++ /dev/null @@ -1,133 +0,0 @@ -import * as restate from "@restatedev/restate-sdk"; - -import type { - Queue, - QueueOptions, - RunnerFuncs, - RunnerOptions, -} from "@karakeep/shared/queueing"; -import { tryCatch } from "@karakeep/shared/tryCatch"; - -import { genId } from "./idProvider"; -import { RestateSemaphore } from "./semaphore"; - -export function buildRestateService<T>( - queue: Queue<T>, - funcs: RunnerFuncs<T>, - opts: RunnerOptions<T>, - queueOpts: QueueOptions, -) { - const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries; - return restate.service({ - name: queue.name(), - options: { - inactivityTimeout: { - seconds: opts.timeoutSecs, - }, - }, - handlers: { - run: async ( - ctx: restate.Context, - data: { - payload: T; - priority: number; - }, - ) => { - const id = `${await genId(ctx)}`; - let payload = data.payload; - if (opts.validator) { - const res = opts.validator.safeParse(data.payload); - if (!res.success) { - throw new restate.TerminalError(res.error.message, { - errorCode: 400, - }); - } - payload = res.data; - } - - const priority = data.priority ?? 0; - - const semaphore = new RestateSemaphore( - ctx, - `queue:${queue.name()}`, - opts.concurrency, - ); - - let lastError: Error | undefined; - for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) { - await semaphore.acquire(priority); - const res = await runWorkerLogic(ctx, funcs, { - id, - data: payload, - priority, - runNumber, - numRetriesLeft: NUM_RETRIES - runNumber, - abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000), - }); - await semaphore.release(); - if (res.error) { - lastError = res.error; - // TODO: add backoff - await ctx.sleep(1000); - } else { - break; - } - } - if (lastError) { - throw new restate.TerminalError(lastError.message, { - errorCode: 500, - cause: "cause" in lastError ? lastError.cause : undefined, - }); - } - }, - }, - }); -} - -async function runWorkerLogic<T>( - ctx: restate.Context, - { run, onError, onComplete }: RunnerFuncs<T>, - data: { - id: string; - data: T; - priority: number; - runNumber: number; - numRetriesLeft: number; - abortSignal: AbortSignal; - }, -) { - const res = await tryCatch( - ctx.run( - `main logic`, - async () => { - await run(data); - }, - { - maxRetryAttempts: 1, - }, - ), - ); - if (res.error) { - await tryCatch( - ctx.run( - `onError`, - async () => - onError?.({ - ...data, - error: res.error, - }), - { - maxRetryAttempts: 1, - }, - ), - ); - return res; - } - - await tryCatch( - ctx.run("onComplete", async () => await onComplete?.(data), { - maxRetryAttempts: 1, - }), - ); - return res; -} diff --git a/packages/plugins-queue-restate/src/tests/docker-compose.yml b/packages/plugins-queue-restate/src/tests/docker-compose.yml deleted file mode 100644 index f24c2921..00000000 --- a/packages/plugins-queue-restate/src/tests/docker-compose.yml +++ /dev/null @@ -1,8 +0,0 @@ -services: - restate: - image: docker.restate.dev/restatedev/restate:1.5 - ports: - - "${RESTATE_INGRESS_PORT:-8080}:8080" - - "${RESTATE_ADMIN_PORT:-9070}:9070" - extra_hosts: - - "host.docker.internal:host-gateway" diff --git a/packages/plugins-queue-restate/src/tests/queue.test.ts b/packages/plugins-queue-restate/src/tests/queue.test.ts deleted file mode 100644 index e59d47cb..00000000 --- a/packages/plugins-queue-restate/src/tests/queue.test.ts +++ /dev/null @@ -1,221 +0,0 @@ -import { - afterAll, - afterEach, - beforeAll, - beforeEach, - describe, - expect, - inject, - it, -} from "vitest"; - -import type { Queue, QueueClient } from "@karakeep/shared/queueing"; - -import { AdminClient } from "../admin.js"; -import { RestateQueueProvider } from "../index.js"; -import { waitUntil } from "./utils.js"; - -type TestAction = - | { type: "val"; val: number } - | { type: "err"; err: string } - | { type: "stall"; durSec: number }; - -describe("Restate Queue Provider", () => { - let queueClient: QueueClient; - let queue: Queue<TestAction>; - let adminClient: AdminClient; - - const testState = { - results: [] as number[], - errors: [] as string[], - inFlight: 0, - maxInFlight: 0, - }; - - async function waitUntilQueueEmpty() { - await waitUntil( - async () => { - const stats = await queue.stats(); - return stats.pending + stats.pending_retry + stats.running === 0; - }, - "Queue to be empty", - 60000, - ); - } - - beforeEach(async () => { - testState.results = []; - testState.errors = []; - testState.inFlight = 0; - testState.maxInFlight = 0; - }); - afterEach(async () => { - await waitUntilQueueEmpty(); - }); - - beforeAll(async () => { - const ingressPort = inject("restateIngressPort"); - const adminPort = inject("restateAdminPort"); - - process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`; - process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`; - process.env.RESTATE_LISTEN_PORT = "9080"; - - const provider = new RestateQueueProvider(); - const client = await provider.getClient(); - - if (!client) { - throw new Error("Failed to create queue client"); - } - - queueClient = client; - adminClient = new AdminClient(process.env.RESTATE_ADMIN_ADDR); - - queue = queueClient.createQueue<TestAction>("test-queue", { - defaultJobArgs: { - numRetries: 3, - }, - keepFailedJobs: false, - }); - - queueClient.createRunner( - queue, - { - run: async (job) => { - testState.inFlight++; - testState.maxInFlight = Math.max( - testState.maxInFlight, - testState.inFlight, - ); - const jobData = job.data; - switch (jobData.type) { - case "val": - testState.results.push(jobData.val); - break; - case "err": - throw new Error(jobData.err); - case "stall": - await new Promise((resolve) => - setTimeout(resolve, jobData.durSec * 1000), - ); - break; - } - }, - onError: async (job) => { - testState.inFlight--; - const jobData = job.data; - if (jobData && jobData.type === "err") { - testState.errors.push(jobData.err); - } - }, - onComplete: async () => { - testState.inFlight--; - }, - }, - { - concurrency: 3, - timeoutSecs: 2, - pollIntervalMs: 0 /* Doesn't matter */, - }, - ); - - await queueClient.prepare(); - await queueClient.start(); - - await adminClient.upsertDeployment("http://host.docker.internal:9080"); - }, 90000); - - afterAll(async () => { - if (queueClient?.shutdown) { - await queueClient.shutdown(); - } - }); - - it("should enqueue and process a job", async () => { - const jobId = await queue.enqueue({ type: "val", val: 42 }); - - expect(jobId).toBeDefined(); - expect(typeof jobId).toBe("string"); - - await waitUntilQueueEmpty(); - - expect(testState.results).toEqual([42]); - }, 60000); - - it("should process multiple jobs", async () => { - await queue.enqueue({ type: "val", val: 1 }); - await queue.enqueue({ type: "val", val: 2 }); - await queue.enqueue({ type: "val", val: 3 }); - - await waitUntilQueueEmpty(); - - expect(testState.results.length).toEqual(3); - expect(testState.results).toContain(1); - expect(testState.results).toContain(2); - expect(testState.results).toContain(3); - }, 60000); - - it("should retry failed jobs", async () => { - await queue.enqueue({ type: "err", err: "Test error" }); - - await waitUntilQueueEmpty(); - - // Initial attempt + 3 retries - expect(testState.errors).toEqual([ - "Test error", - "Test error", - "Test error", - "Test error", - ]); - }, 90000); - - it("should use idempotency key", async () => { - const idempotencyKey = `test-${Date.now()}`; - - await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey }); - await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey }); - - await waitUntilQueueEmpty(); - - expect(testState.results).toEqual([200]); - }, 60000); - - it("should handle concurrent jobs", async () => { - const promises = []; - for (let i = 300; i < 320; i++) { - promises.push(queue.enqueue({ type: "stall", durSec: 0.1 })); - } - await Promise.all(promises); - - await waitUntilQueueEmpty(); - - expect(testState.maxInFlight).toEqual(3); - }, 60000); - - it("should handle priorities", async () => { - // Hog the queue first - await Promise.all([ - queue.enqueue({ type: "stall", durSec: 1 }, { priority: 0 }), - queue.enqueue({ type: "stall", durSec: 1 }, { priority: 1 }), - queue.enqueue({ type: "stall", durSec: 1 }, { priority: 2 }), - ]); - - // Then those will get reprioritized - await Promise.all([ - queue.enqueue({ type: "val", val: 200 }, { priority: -1 }), - queue.enqueue({ type: "val", val: 201 }, { priority: -2 }), - queue.enqueue({ type: "val", val: 202 }, { priority: -3 }), - - queue.enqueue({ type: "val", val: 300 }, { priority: 0 }), - queue.enqueue({ type: "val", val: 301 }, { priority: 1 }), - queue.enqueue({ type: "val", val: 302 }, { priority: 2 }), - ]); - - await waitUntilQueueEmpty(); - - expect(testState.results).toEqual([ - // Lower numeric priority value should run first - 202, 201, 200, 300, 301, 302, - ]); - }, 60000); -}); diff --git a/packages/plugins-queue-restate/src/tests/setup/startContainers.ts b/packages/plugins-queue-restate/src/tests/setup/startContainers.ts deleted file mode 100644 index 7d9dea5c..00000000 --- a/packages/plugins-queue-restate/src/tests/setup/startContainers.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { execSync } from "child_process"; -import net from "net"; -import path from "path"; -import { fileURLToPath } from "url"; -import type { GlobalSetupContext } from "vitest/node"; - -import { waitUntil } from "../utils.js"; - -async function getRandomPort(): Promise<number> { - const server = net.createServer(); - return new Promise<number>((resolve, reject) => { - server.unref(); - server.on("error", reject); - server.listen(0, () => { - const port = (server.address() as net.AddressInfo).port; - server.close(() => resolve(port)); - }); - }); -} - -async function waitForHealthy( - ingressPort: number, - adminPort: number, - timeout = 60000, -): Promise<void> { - await waitUntil( - async () => { - const response = await fetch(`http://localhost:${adminPort}/health`); - return response.ok; - }, - "Restate admin API is healthy", - timeout, - ); - - await waitUntil( - async () => { - const response = await fetch( - `http://localhost:${ingressPort}/restate/health`, - ); - return response.ok; - }, - "Restate ingress is healthy", - timeout, - ); -} - -export default async function ({ provide }: GlobalSetupContext) { - const __dirname = path.dirname(fileURLToPath(import.meta.url)); - const ingressPort = await getRandomPort(); - const adminPort = await getRandomPort(); - - console.log( - `Starting Restate on ports ${ingressPort} (ingress) and ${adminPort} (admin)...`, - ); - execSync(`docker compose up -d`, { - cwd: path.join(__dirname, ".."), - stdio: "ignore", - env: { - ...process.env, - RESTATE_INGRESS_PORT: ingressPort.toString(), - RESTATE_ADMIN_PORT: adminPort.toString(), - }, - }); - - console.log("Waiting for Restate to become healthy..."); - await waitForHealthy(ingressPort, adminPort); - - provide("restateIngressPort", ingressPort); - provide("restateAdminPort", adminPort); - - process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`; - process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`; - process.env.RESTATE_LISTEN_PORT = "9080"; - - return async () => { - console.log("Stopping Restate..."); - execSync("docker compose down", { - cwd: path.join(__dirname, ".."), - stdio: "ignore", - }); - return Promise.resolve(); - }; -} - -declare module "vitest" { - export interface ProvidedContext { - restateIngressPort: number; - restateAdminPort: number; - } -} diff --git a/packages/plugins-queue-restate/src/tests/utils.ts b/packages/plugins-queue-restate/src/tests/utils.ts deleted file mode 100644 index e02d2dee..00000000 --- a/packages/plugins-queue-restate/src/tests/utils.ts +++ /dev/null @@ -1,23 +0,0 @@ -export async function waitUntil( - f: () => Promise<boolean>, - description: string, - timeoutMs = 60000, -): Promise<void> { - const startTime = Date.now(); - - while (Date.now() - startTime < timeoutMs) { - console.log(`Waiting for ${description}...`); - try { - const res = await f(); - if (res) { - console.log(`${description}: success`); - return; - } - } catch (error) { - console.log(`${description}: error, retrying...: ${error}`); - } - await new Promise((resolve) => setTimeout(resolve, 1000)); - } - - throw new Error(`${description}: timeout after ${timeoutMs}ms`); -} diff --git a/packages/plugins-queue-restate/tsconfig.json b/packages/plugins-queue-restate/tsconfig.json deleted file mode 100644 index 3bfa695c..00000000 --- a/packages/plugins-queue-restate/tsconfig.json +++ /dev/null @@ -1,10 +0,0 @@ -{ - "$schema": "https://json.schemastore.org/tsconfig", - "extends": "@karakeep/tsconfig/node.json", - "include": ["**/*.ts"], - "exclude": ["node_modules"], - "compilerOptions": { - "tsBuildInfoFile": "node_modules/.cache/tsbuildinfo.json" - } -} - diff --git a/packages/plugins-queue-restate/vitest.config.ts b/packages/plugins-queue-restate/vitest.config.ts deleted file mode 100644 index 73e0e1b9..00000000 --- a/packages/plugins-queue-restate/vitest.config.ts +++ /dev/null @@ -1,14 +0,0 @@ -/// <reference types="vitest" /> - -import tsconfigPaths from "vite-tsconfig-paths"; -import { defineConfig } from "vitest/config"; - -export default defineConfig({ - plugins: [tsconfigPaths()], - test: { - globalSetup: ["./src/tests/setup/startContainers.ts"], - teardownTimeout: 30000, - include: ["src/tests/**/*.test.ts"], - testTimeout: 60000, - }, -}); |
