diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-10-05 07:04:29 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-10-05 07:04:29 +0100 |
| commit | 74a1f7b6b600d4cb53352dde7def374c3125721a (patch) | |
| tree | 70b79ebae61456f6ff2cb02a37351fa9817fb342 /packages | |
| parent | 4a580d713621f99abb8baabc9b847ce039d44842 (diff) | |
| download | karakeep-74a1f7b6b600d4cb53352dde7def374c3125721a.tar.zst | |
feat: Restate-based queue plugin (#2011)
* WIP: Initial restate integration
* add retry
* add delay + idempotency
* implement concurrency limits
* add admin stats
* add todos
* add id provider
* handle onComplete failures
* add tests
* add pub key and fix logging
* add priorities
* fail call after retries
* more fixes
* fix retries left
* some refactoring
* fix package.json
* upgrade sdk
* some test cleanups
Diffstat (limited to 'packages')
22 files changed, 991 insertions, 11 deletions
diff --git a/packages/plugins-queue-liteque/package.json b/packages/plugins-queue-liteque/package.json index a31c9707..bb4b1aac 100644 --- a/packages/plugins-queue-liteque/package.json +++ b/packages/plugins-queue-liteque/package.json @@ -6,8 +6,8 @@ "type": "module", "scripts": { "typecheck": "tsc --noEmit", - "format": "prettier . --ignore-path ../../.prettierignore", - "format:fix": "prettier . --write --ignore-path ../../.prettierignore", + "format": "prettier . --cache --ignore-path ../../.prettierignore --check", + "format:fix": "prettier . --cache --ignore-path ../../.prettierignore --write", "lint": "oxlint .", "lint:fix": "oxlint . --fix", "test": "vitest" @@ -24,4 +24,3 @@ }, "prettier": "@karakeep/prettier-config" } - diff --git a/packages/plugins-queue-liteque/src/index.ts b/packages/plugins-queue-liteque/src/index.ts index 16e6e20a..ddc2181c 100644 --- a/packages/plugins-queue-liteque/src/index.ts +++ b/packages/plugins-queue-liteque/src/index.ts @@ -24,6 +24,7 @@ class LitequeQueueWrapper<T> implements Queue<T> { constructor( private readonly _name: string, private readonly lq: LQ<T>, + public readonly opts: QueueOptions, ) {} name(): string { @@ -60,10 +61,14 @@ class LitequeQueueClient implements QueueClient { private queues = new Map<string, LitequeQueueWrapper<unknown>>(); - async init(): Promise<void> { + async prepare(): Promise<void> { migrateDB(this.db); } + async start(): Promise<void> { + // No-op for sqlite + } + createQueue<T>(name: string, options: QueueOptions): Queue<T> { if (this.queues.has(name)) { throw new Error(`Queue ${name} already exists`); @@ -72,7 +77,7 @@ class LitequeQueueClient implements QueueClient { defaultJobArgs: { numRetries: options.defaultJobArgs.numRetries }, keepFailedJobs: options.keepFailedJobs, }); - const wrapper = new LitequeQueueWrapper<T>(name, lq); + const wrapper = new LitequeQueueWrapper<T>(name, lq, options); this.queues.set(name, wrapper); return wrapper; } diff --git a/packages/plugins-queue-restate/.oxlintrc.json b/packages/plugins-queue-restate/.oxlintrc.json new file mode 100644 index 00000000..79ba0255 --- /dev/null +++ b/packages/plugins-queue-restate/.oxlintrc.json @@ -0,0 +1,19 @@ +{ + "$schema": "../../node_modules/oxlint/configuration_schema.json", + "extends": [ + "../../tooling/oxlint/oxlint-base.json" + ], + "env": { + "builtin": true, + "commonjs": true + }, + "ignorePatterns": [ + "**/*.config.js", + "**/*.config.cjs", + "**/.eslintrc.cjs", + "**/.next", + "**/dist", + "**/build", + "**/pnpm-lock.yaml" + ] +} diff --git a/packages/plugins-queue-restate/index.ts b/packages/plugins-queue-restate/index.ts new file mode 100644 index 00000000..d313615c --- /dev/null +++ b/packages/plugins-queue-restate/index.ts @@ -0,0 +1,12 @@ +// Auto-register the Restate queue provider when this package is imported +import { PluginManager, PluginType } from "@karakeep/shared/plugins"; + +import { RestateQueueProvider } from "./src"; + +if (RestateQueueProvider.isConfigured()) { + PluginManager.register({ + type: PluginType.Queue, + name: "Restate", + provider: new RestateQueueProvider(), + }); +} diff --git a/packages/plugins-queue-restate/package.json b/packages/plugins-queue-restate/package.json new file mode 100644 index 00000000..16681150 --- /dev/null +++ b/packages/plugins-queue-restate/package.json @@ -0,0 +1,27 @@ +{ + "$schema": "https://json.schemastore.org/package.json", + "name": "@karakeep/plugins-queue-restate", + "version": "0.1.0", + "private": true, + "type": "module", + "scripts": { + "typecheck": "tsc --noEmit", + "format": "prettier . --cache --ignore-path ../../.prettierignore --check", + "format:fix": "prettier . --cache --ignore-path ../../.prettierignore --write", + "lint": "oxlint .", + "lint:fix": "oxlint . --fix", + "test": "vitest" + }, + "dependencies": { + "@karakeep/shared": "workspace:*", + "@restatedev/restate-sdk": "^1.9.0", + "@restatedev/restate-sdk-clients": "^1.9.0" + }, + "devDependencies": { + "@karakeep/prettier-config": "workspace:^0.1.0", + "@karakeep/tsconfig": "workspace:^0.1.0", + "vite-tsconfig-paths": "^4.3.1", + "vitest": "^3.2.4" + }, + "prettier": "@karakeep/prettier-config" +} diff --git a/packages/plugins-queue-restate/src/admin.ts b/packages/plugins-queue-restate/src/admin.ts new file mode 100644 index 00000000..dddc8f00 --- /dev/null +++ b/packages/plugins-queue-restate/src/admin.ts @@ -0,0 +1,75 @@ +import { z } from "zod"; + +export class AdminClient { + constructor(private addr: string) {} + + async upsertDeployment(deploymentAddr: string) { + const res = await fetch(`${this.addr}/deployments`, { + method: "POST", + body: JSON.stringify({ + uri: deploymentAddr, + force: true, + }), + headers: { + "Content-Type": "application/json", + }, + }); + + if (!res.ok) { + throw new Error(`Failed to upsert deployment: ${res.status}`); + } + } + + async getStats(serviceName: string) { + const query = `select status, count(*) as count from sys_invocation where target_service_name='${serviceName}' group by status`; + const res = await fetch(`${this.addr}/query`, { + method: "POST", + body: JSON.stringify({ + query, + }), + headers: { + "Content-Type": "application/json", + Accept: "application/json", + }, + }); + + if (!res.ok) { + throw new Error(`Failed to get stats: ${res.status}`); + } + const zStatus = z.enum([ + "pending", + "scheduled", + "ready", + "running", + "paused", + "backing-off", + "suspended", + "completed", + ]); + const zSchema = z.object({ + rows: z.array( + z.object({ + status: zStatus, + count: z.number(), + }), + ), + }); + + return zSchema.parse(await res.json()).rows.reduce( + (acc, cur) => { + acc[cur.status] = cur.count; + return acc; + }, + { + pending: 0, + scheduled: 0, + ready: 0, + running: 0, + paused: 0, + "backing-off": 0, + suspended: 0, + completed: 0, + } as Record<z.infer<typeof zStatus>, number>, + ); + } +} diff --git a/packages/plugins-queue-restate/src/env.ts b/packages/plugins-queue-restate/src/env.ts new file mode 100644 index 00000000..01175e86 --- /dev/null +++ b/packages/plugins-queue-restate/src/env.ts @@ -0,0 +1,13 @@ +import { z } from "zod"; + +export const envConfig = z + .object({ + RESTATE_LISTEN_PORT: z.coerce.number().optional(), + RESTATE_INGRESS_ADDR: z + .string() + .optional() + .default("http://localhost:8080"), + RESTATE_ADMIN_ADDR: z.string().optional().default("http://localhost:9070"), + RESTATE_PUB_KEY: z.string().optional(), + }) + .parse(process.env); diff --git a/packages/plugins-queue-restate/src/idProvider.ts b/packages/plugins-queue-restate/src/idProvider.ts new file mode 100644 index 00000000..72ebc860 --- /dev/null +++ b/packages/plugins-queue-restate/src/idProvider.ts @@ -0,0 +1,18 @@ +import { Context, object, ObjectContext } from "@restatedev/restate-sdk"; + +export const idProvider = object({ + name: "IdProvider", + handlers: { + get: async (ctx: ObjectContext<{ nextId: number }>): Promise<number> => { + const state = (await ctx.get("nextId")) ?? 0; + ctx.set("nextId", state + 1); + return state; + }, + }, +}); + +export async function genId(ctx: Context) { + return ctx + .objectClient<typeof idProvider>({ name: "IdProvider" }, "IdProvider") + .get(); +} diff --git a/packages/plugins-queue-restate/src/index.ts b/packages/plugins-queue-restate/src/index.ts new file mode 100644 index 00000000..bedc26af --- /dev/null +++ b/packages/plugins-queue-restate/src/index.ts @@ -0,0 +1,201 @@ +import * as restate from "@restatedev/restate-sdk"; +import * as restateClient from "@restatedev/restate-sdk-clients"; + +import type { PluginProvider } from "@karakeep/shared/plugins"; +import type { + EnqueueOptions, + Queue, + QueueClient, + QueueOptions, + Runner, + RunnerFuncs, + RunnerOptions, +} from "@karakeep/shared/queueing"; +import logger from "@karakeep/shared/logger"; + +import { AdminClient } from "./admin"; +import { envConfig } from "./env"; +import { idProvider } from "./idProvider"; +import { semaphore } from "./semaphore"; +import { buildRestateService } from "./service"; + +class RestateQueueWrapper<T> implements Queue<T> { + constructor( + private readonly _name: string, + private readonly client: restateClient.Ingress, + private readonly adminClient: AdminClient, + public readonly opts: QueueOptions, + ) {} + + name(): string { + return this._name; + } + + async enqueue( + payload: T, + options?: EnqueueOptions, + ): Promise<string | undefined> { + interface MyService { + run: ( + ctx: restate.Context, + data: { + payload: T; + priority: number; + }, + ) => Promise<void>; + } + const cl = this.client.serviceSendClient<MyService>({ name: this.name() }); + const res = await cl.run( + { + payload, + priority: options?.priority ?? 0, + }, + restateClient.rpc.sendOpts({ + delay: options?.delayMs + ? { + milliseconds: options.delayMs, + } + : undefined, + idempotencyKey: options?.idempotencyKey, + }), + ); + return res.invocationId; + } + + async stats(): Promise<{ + pending: number; + pending_retry: number; + running: number; + failed: number; + }> { + const res = await this.adminClient.getStats(this.name()); + return { + pending: res.pending + res.ready, + pending_retry: res["backing-off"] + res.paused + res.suspended, + running: res.running, + failed: 0, + }; + } + + async cancelAllNonRunning(): Promise<number> { + throw new Error("Method not implemented."); + } +} + +class RestateRunnerWrapper<T> implements Runner<T> { + constructor( + private readonly wf: restate.ServiceDefinition< + string, + { + run: (ctx: restate.Context, data: T) => Promise<void>; + } + >, + ) {} + + async run(): Promise<void> { + // No-op for restate + } + + async stop(): Promise<void> { + // No-op for restate + } + + async runUntilEmpty(): Promise<void> { + throw new Error("Method not implemented."); + } + + get def(): restate.WorkflowDefinition<string, unknown> { + return this.wf; + } +} + +class RestateQueueClient implements QueueClient { + private client: restateClient.Ingress; + private adminClient: AdminClient; + private queues = new Map<string, RestateQueueWrapper<unknown>>(); + private services = new Map<string, RestateRunnerWrapper<unknown>>(); + + constructor() { + this.client = restateClient.connect({ + url: envConfig.RESTATE_INGRESS_ADDR, + }); + this.adminClient = new AdminClient(envConfig.RESTATE_ADMIN_ADDR); + } + + async prepare(): Promise<void> { + // No-op for restate + } + + async start(): Promise<void> { + const port = await restate.serve({ + port: envConfig.RESTATE_LISTEN_PORT ?? 0, + services: [ + ...[...this.services.values()].map((svc) => svc.def), + semaphore, + idProvider, + ], + identityKeys: envConfig.RESTATE_PUB_KEY + ? [envConfig.RESTATE_PUB_KEY] + : undefined, + logger: (meta, msg) => { + if (meta.context) { + // No need to log invocation logs + } else { + logger.log(meta.level, `[restate] ${msg}`); + } + }, + }); + logger.info(`Restate listening on port ${port}`); + } + + createQueue<T>(name: string, opts: QueueOptions): Queue<T> { + if (this.queues.has(name)) { + throw new Error(`Queue ${name} already exists`); + } + const wrapper = new RestateQueueWrapper<T>( + name, + this.client, + this.adminClient, + opts, + ); + this.queues.set(name, wrapper); + return wrapper; + } + + createRunner<T>( + queue: Queue<T>, + funcs: RunnerFuncs<T>, + opts: RunnerOptions<T>, + ): Runner<T> { + const name = queue.name(); + let wrapper = this.services.get(name); + if (wrapper) { + throw new Error(`Queue ${name} already exists`); + } + const svc = new RestateRunnerWrapper<T>( + buildRestateService(queue, funcs, opts, queue.opts), + ); + this.services.set(name, svc); + return svc; + } + + async shutdown(): Promise<void> { + // No-op for sqlite + } +} + +export class RestateQueueProvider implements PluginProvider<QueueClient> { + private client: QueueClient | null = null; + + static isConfigured(): boolean { + return envConfig.RESTATE_LISTEN_PORT !== undefined; + } + + async getClient(): Promise<QueueClient | null> { + if (!this.client) { + const client = new RestateQueueClient(); + this.client = client; + } + return this.client; + } +} diff --git a/packages/plugins-queue-restate/src/semaphore.ts b/packages/plugins-queue-restate/src/semaphore.ts new file mode 100644 index 00000000..253dbe33 --- /dev/null +++ b/packages/plugins-queue-restate/src/semaphore.ts @@ -0,0 +1,105 @@ +// Inspired from https://github.com/restatedev/examples/blob/main/typescript/patterns-use-cases/src/priorityqueue/queue.ts + +import { Context, object, ObjectContext } from "@restatedev/restate-sdk"; + +interface QueueItem { + awakeable: string; + priority: number; +} + +interface QueueState { + items: QueueItem[]; + inFlight: number; +} + +export const semaphore = object({ + name: "Semaphore", + handlers: { + acquire: async ( + ctx: ObjectContext<QueueState>, + req: { awakeableId: string; priority: number; capacity: number }, + ): Promise<void> => { + const state = await getState(ctx); + + state.items.push({ + awakeable: req.awakeableId, + priority: req.priority, + }); + + tick(ctx, state, req.capacity); + + setState(ctx, state); + }, + + release: async ( + ctx: ObjectContext<QueueState>, + capacity: number, + ): Promise<void> => { + const state = await getState(ctx); + state.inFlight--; + tick(ctx, state, capacity); + setState(ctx, state); + }, + }, +}); + +function selectAndPopItem(items: QueueItem[]): QueueItem { + let highest = { priority: Number.MIN_SAFE_INTEGER, index: 0 }; + for (const [i, item] of items.entries()) { + if (item.priority > highest.priority) { + highest.priority = item.priority; + highest.index = i; + } + } + const [item] = items.splice(highest.index, 1); + return item; +} + +function tick( + ctx: ObjectContext<QueueState>, + state: QueueState, + capacity: number, +) { + while (state.inFlight < capacity && state.items.length > 0) { + const item = selectAndPopItem(state.items); + state.inFlight++; + ctx.resolveAwakeable(item.awakeable); + } +} + +async function getState(ctx: ObjectContext<QueueState>): Promise<QueueState> { + return { + items: (await ctx.get("items")) ?? [], + inFlight: (await ctx.get("inFlight")) ?? 0, + }; +} + +function setState(ctx: ObjectContext<QueueState>, state: QueueState) { + ctx.set("items", state.items); + ctx.set("inFlight", state.inFlight); +} + +export class RestateSemaphore { + constructor( + private readonly ctx: Context, + private readonly id: string, + private readonly capacity: number, + ) {} + + async acquire(priority: number) { + const awk = this.ctx.awakeable(); + await this.ctx + .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id) + .acquire({ + awakeableId: awk.id, + priority, + capacity: this.capacity, + }); + await awk.promise; + } + async release() { + await this.ctx + .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id) + .release(this.capacity); + } +} diff --git a/packages/plugins-queue-restate/src/service.ts b/packages/plugins-queue-restate/src/service.ts new file mode 100644 index 00000000..de5b070f --- /dev/null +++ b/packages/plugins-queue-restate/src/service.ts @@ -0,0 +1,133 @@ +import * as restate from "@restatedev/restate-sdk"; + +import type { + Queue, + QueueOptions, + RunnerFuncs, + RunnerOptions, +} from "@karakeep/shared/queueing"; +import { tryCatch } from "@karakeep/shared/tryCatch"; + +import { genId } from "./idProvider"; +import { RestateSemaphore } from "./semaphore"; + +export function buildRestateService<T>( + queue: Queue<T>, + funcs: RunnerFuncs<T>, + opts: RunnerOptions<T>, + queueOpts: QueueOptions, +) { + const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries; + return restate.service({ + name: queue.name(), + options: { + inactivityTimeout: { + seconds: opts.timeoutSecs, + }, + }, + handlers: { + run: async ( + ctx: restate.Context, + data: { + payload: T; + priority: number; + }, + ) => { + const id = `${await genId(ctx)}`; + let payload = data.payload; + if (opts.validator) { + const res = opts.validator.safeParse(data.payload); + if (!res.success) { + throw new restate.TerminalError(res.error.message, { + errorCode: 400, + }); + } + payload = res.data; + } + + const priority = data.priority ?? 0; + + const semaphore = new RestateSemaphore( + ctx, + `queue:${queue.name()}`, + opts.concurrency, + ); + + let lastError: Error | undefined; + for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) { + await semaphore.acquire(priority); + const res = await runWorkerLogic(ctx, funcs, { + id, + data: payload, + priority, + runNumber, + numRetriesLeft: NUM_RETRIES - runNumber, + abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000), + }); + await semaphore.release(); + if (res.error) { + lastError = res.error; + // TODO: add backoff + await ctx.sleep(1000); + } else { + break; + } + } + if (lastError) { + throw new restate.TerminalError(lastError.message, { + errorCode: 500, + cause: "cause" in lastError ? lastError.cause : undefined, + }); + } + }, + }, + }); +} + +async function runWorkerLogic<T>( + ctx: restate.Context, + { run, onError, onComplete }: RunnerFuncs<T>, + data: { + id: string; + data: T; + priority: number; + runNumber: number; + numRetriesLeft: number; + abortSignal: AbortSignal; + }, +) { + const res = await tryCatch( + ctx.run( + `main logic`, + async () => { + await run(data); + }, + { + maxRetryAttempts: 1, + }, + ), + ); + if (res.error) { + await tryCatch( + ctx.run( + `onError`, + async () => + onError?.({ + ...data, + error: res.error, + }), + { + maxRetryAttempts: 1, + }, + ), + ); + return res; + } + + await tryCatch( + ctx.run("onComplete", async () => await onComplete?.(data), { + maxRetryAttempts: 1, + }), + ); + return res; +} diff --git a/packages/plugins-queue-restate/src/tests/docker-compose.yml b/packages/plugins-queue-restate/src/tests/docker-compose.yml new file mode 100644 index 00000000..f24c2921 --- /dev/null +++ b/packages/plugins-queue-restate/src/tests/docker-compose.yml @@ -0,0 +1,8 @@ +services: + restate: + image: docker.restate.dev/restatedev/restate:1.5 + ports: + - "${RESTATE_INGRESS_PORT:-8080}:8080" + - "${RESTATE_ADMIN_PORT:-9070}:9070" + extra_hosts: + - "host.docker.internal:host-gateway" diff --git a/packages/plugins-queue-restate/src/tests/queue.test.ts b/packages/plugins-queue-restate/src/tests/queue.test.ts new file mode 100644 index 00000000..692581d6 --- /dev/null +++ b/packages/plugins-queue-restate/src/tests/queue.test.ts @@ -0,0 +1,221 @@ +import { + afterAll, + afterEach, + beforeAll, + beforeEach, + describe, + expect, + inject, + it, +} from "vitest"; + +import type { Queue, QueueClient } from "@karakeep/shared/queueing"; + +import { AdminClient } from "../admin.js"; +import { RestateQueueProvider } from "../index.js"; +import { waitUntil } from "./utils.js"; + +type TestAction = + | { type: "val"; val: number } + | { type: "err"; err: string } + | { type: "stall"; durSec: number }; + +describe("Restate Queue Provider", () => { + let queueClient: QueueClient; + let queue: Queue<TestAction>; + let adminClient: AdminClient; + + const testState = { + results: [] as number[], + errors: [] as string[], + inFlight: 0, + maxInFlight: 0, + }; + + async function waitUntilQueueEmpty() { + await waitUntil( + async () => { + const stats = await queue.stats(); + return stats.pending + stats.pending_retry + stats.running === 0; + }, + "Queue to be empty", + 60000, + ); + } + + beforeEach(async () => { + testState.results = []; + testState.errors = []; + testState.inFlight = 0; + testState.maxInFlight = 0; + }); + afterEach(async () => { + await waitUntilQueueEmpty(); + }); + + beforeAll(async () => { + const ingressPort = inject("restateIngressPort"); + const adminPort = inject("restateAdminPort"); + + process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`; + process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`; + process.env.RESTATE_LISTEN_PORT = "9080"; + + const provider = new RestateQueueProvider(); + const client = await provider.getClient(); + + if (!client) { + throw new Error("Failed to create queue client"); + } + + queueClient = client; + adminClient = new AdminClient(process.env.RESTATE_ADMIN_ADDR); + + queue = queueClient.createQueue<TestAction>("test-queue", { + defaultJobArgs: { + numRetries: 3, + }, + keepFailedJobs: false, + }); + + queueClient.createRunner( + queue, + { + run: async (job) => { + testState.inFlight++; + testState.maxInFlight = Math.max( + testState.maxInFlight, + testState.inFlight, + ); + const jobData = job.data; + switch (jobData.type) { + case "val": + testState.results.push(jobData.val); + break; + case "err": + throw new Error(jobData.err); + case "stall": + await new Promise((resolve) => + setTimeout(resolve, jobData.durSec * 1000), + ); + break; + } + }, + onError: async (job) => { + testState.inFlight--; + const jobData = job.data; + if (jobData && jobData.type === "err") { + testState.errors.push(jobData.err); + } + }, + onComplete: async () => { + testState.inFlight--; + }, + }, + { + concurrency: 3, + timeoutSecs: 2, + pollIntervalMs: 0 /* Doesn't matter */, + }, + ); + + await queueClient.prepare(); + await queueClient.start(); + + await adminClient.upsertDeployment("http://host.docker.internal:9080"); + }, 90000); + + afterAll(async () => { + if (queueClient?.shutdown) { + await queueClient.shutdown(); + } + }); + + it("should enqueue and process a job", async () => { + const jobId = await queue.enqueue({ type: "val", val: 42 }); + + expect(jobId).toBeDefined(); + expect(typeof jobId).toBe("string"); + + await waitUntilQueueEmpty(); + + expect(testState.results).toEqual([42]); + }, 60000); + + it("should process multiple jobs", async () => { + await queue.enqueue({ type: "val", val: 1 }); + await queue.enqueue({ type: "val", val: 2 }); + await queue.enqueue({ type: "val", val: 3 }); + + await waitUntilQueueEmpty(); + + expect(testState.results.length).toEqual(3); + expect(testState.results).toContain(1); + expect(testState.results).toContain(2); + expect(testState.results).toContain(3); + }, 60000); + + it("should retry failed jobs", async () => { + await queue.enqueue({ type: "err", err: "Test error" }); + + await waitUntilQueueEmpty(); + + // Initial attempt + 3 retries + expect(testState.errors).toEqual([ + "Test error", + "Test error", + "Test error", + "Test error", + ]); + }, 90000); + + it("should use idempotency key", async () => { + const idempotencyKey = `test-${Date.now()}`; + + await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey }); + await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey }); + + await waitUntilQueueEmpty(); + + expect(testState.results).toEqual([200]); + }, 60000); + + it("should handle concurrent jobs", async () => { + const promises = []; + for (let i = 300; i < 320; i++) { + promises.push(queue.enqueue({ type: "stall", durSec: 0.1 })); + } + await Promise.all(promises); + + await waitUntilQueueEmpty(); + + expect(testState.maxInFlight).toEqual(3); + }, 60000); + + it("should handle priorities", async () => { + // Hog the queue first + await Promise.all([ + queue.enqueue({ type: "stall", durSec: 1 }, { priority: 0 }), + queue.enqueue({ type: "stall", durSec: 1 }, { priority: 1 }), + queue.enqueue({ type: "stall", durSec: 1 }, { priority: 2 }), + ]); + + // Then those will get reprioritized + await Promise.all([ + queue.enqueue({ type: "val", val: 200 }, { priority: -1 }), + queue.enqueue({ type: "val", val: 201 }, { priority: -2 }), + queue.enqueue({ type: "val", val: 202 }, { priority: -3 }), + + queue.enqueue({ type: "val", val: 300 }, { priority: 0 }), + queue.enqueue({ type: "val", val: 301 }, { priority: 1 }), + queue.enqueue({ type: "val", val: 302 }, { priority: 2 }), + ]); + + await waitUntilQueueEmpty(); + + expect(testState.results).toEqual([ + // Then in order of increasing priority + 302, 301, 300, 200, 201, 202, + ]); + }, 60000); +}); diff --git a/packages/plugins-queue-restate/src/tests/setup/startContainers.ts b/packages/plugins-queue-restate/src/tests/setup/startContainers.ts new file mode 100644 index 00000000..7d9dea5c --- /dev/null +++ b/packages/plugins-queue-restate/src/tests/setup/startContainers.ts @@ -0,0 +1,90 @@ +import { execSync } from "child_process"; +import net from "net"; +import path from "path"; +import { fileURLToPath } from "url"; +import type { GlobalSetupContext } from "vitest/node"; + +import { waitUntil } from "../utils.js"; + +async function getRandomPort(): Promise<number> { + const server = net.createServer(); + return new Promise<number>((resolve, reject) => { + server.unref(); + server.on("error", reject); + server.listen(0, () => { + const port = (server.address() as net.AddressInfo).port; + server.close(() => resolve(port)); + }); + }); +} + +async function waitForHealthy( + ingressPort: number, + adminPort: number, + timeout = 60000, +): Promise<void> { + await waitUntil( + async () => { + const response = await fetch(`http://localhost:${adminPort}/health`); + return response.ok; + }, + "Restate admin API is healthy", + timeout, + ); + + await waitUntil( + async () => { + const response = await fetch( + `http://localhost:${ingressPort}/restate/health`, + ); + return response.ok; + }, + "Restate ingress is healthy", + timeout, + ); +} + +export default async function ({ provide }: GlobalSetupContext) { + const __dirname = path.dirname(fileURLToPath(import.meta.url)); + const ingressPort = await getRandomPort(); + const adminPort = await getRandomPort(); + + console.log( + `Starting Restate on ports ${ingressPort} (ingress) and ${adminPort} (admin)...`, + ); + execSync(`docker compose up -d`, { + cwd: path.join(__dirname, ".."), + stdio: "ignore", + env: { + ...process.env, + RESTATE_INGRESS_PORT: ingressPort.toString(), + RESTATE_ADMIN_PORT: adminPort.toString(), + }, + }); + + console.log("Waiting for Restate to become healthy..."); + await waitForHealthy(ingressPort, adminPort); + + provide("restateIngressPort", ingressPort); + provide("restateAdminPort", adminPort); + + process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`; + process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`; + process.env.RESTATE_LISTEN_PORT = "9080"; + + return async () => { + console.log("Stopping Restate..."); + execSync("docker compose down", { + cwd: path.join(__dirname, ".."), + stdio: "ignore", + }); + return Promise.resolve(); + }; +} + +declare module "vitest" { + export interface ProvidedContext { + restateIngressPort: number; + restateAdminPort: number; + } +} diff --git a/packages/plugins-queue-restate/src/tests/utils.ts b/packages/plugins-queue-restate/src/tests/utils.ts new file mode 100644 index 00000000..e02d2dee --- /dev/null +++ b/packages/plugins-queue-restate/src/tests/utils.ts @@ -0,0 +1,23 @@ +export async function waitUntil( + f: () => Promise<boolean>, + description: string, + timeoutMs = 60000, +): Promise<void> { + const startTime = Date.now(); + + while (Date.now() - startTime < timeoutMs) { + console.log(`Waiting for ${description}...`); + try { + const res = await f(); + if (res) { + console.log(`${description}: success`); + return; + } + } catch (error) { + console.log(`${description}: error, retrying...: ${error}`); + } + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + + throw new Error(`${description}: timeout after ${timeoutMs}ms`); +} diff --git a/packages/plugins-queue-restate/tsconfig.json b/packages/plugins-queue-restate/tsconfig.json new file mode 100644 index 00000000..3bfa695c --- /dev/null +++ b/packages/plugins-queue-restate/tsconfig.json @@ -0,0 +1,10 @@ +{ + "$schema": "https://json.schemastore.org/tsconfig", + "extends": "@karakeep/tsconfig/node.json", + "include": ["**/*.ts"], + "exclude": ["node_modules"], + "compilerOptions": { + "tsBuildInfoFile": "node_modules/.cache/tsbuildinfo.json" + } +} + diff --git a/packages/plugins-queue-restate/vitest.config.ts b/packages/plugins-queue-restate/vitest.config.ts new file mode 100644 index 00000000..73e0e1b9 --- /dev/null +++ b/packages/plugins-queue-restate/vitest.config.ts @@ -0,0 +1,14 @@ +/// <reference types="vitest" /> + +import tsconfigPaths from "vite-tsconfig-paths"; +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + plugins: [tsconfigPaths()], + test: { + globalSetup: ["./src/tests/setup/startContainers.ts"], + teardownTimeout: 30000, + include: ["src/tests/**/*.test.ts"], + testTimeout: 60000, + }, +}); diff --git a/packages/plugins-search-meilisearch/package.json b/packages/plugins-search-meilisearch/package.json index 3bc9db80..c9482731 100644 --- a/packages/plugins-search-meilisearch/package.json +++ b/packages/plugins-search-meilisearch/package.json @@ -6,8 +6,8 @@ "type": "module", "scripts": { "typecheck": "tsc --noEmit", - "format": "prettier . --ignore-path ../../.prettierignore", - "format:fix": "prettier . --write --ignore-path ../../.prettierignore", + "format": "prettier . --cache --ignore-path ../../.prettierignore --check", + "format:fix": "prettier . --cache --ignore-path ../../.prettierignore --write", "lint": "oxlint .", "lint:fix": "oxlint . --fix", "test": "vitest" diff --git a/packages/shared-server/package.json b/packages/shared-server/package.json index 9c1b52a8..d18eb4a0 100644 --- a/packages/shared-server/package.json +++ b/packages/shared-server/package.json @@ -7,6 +7,7 @@ "dependencies": { "@karakeep/db": "workspace:^0.1.0", "@karakeep/plugins-queue-liteque": "workspace:^0.1.0", + "@karakeep/plugins-queue-restate": "workspace:^0.1.0", "@karakeep/plugins-search-meilisearch": "workspace:^0.1.0", "@karakeep/shared": "workspace:^0.1.0" }, diff --git a/packages/shared-server/src/plugins.ts b/packages/shared-server/src/plugins.ts index b6a88462..de08b9b0 100644 --- a/packages/shared-server/src/plugins.ts +++ b/packages/shared-server/src/plugins.ts @@ -8,6 +8,7 @@ export async function loadAllPlugins() { // Load plugins here. Order of plugin loading matter. // Queue provider(s) await import("@karakeep/plugins-queue-liteque"); + await import("@karakeep/plugins-queue-restate"); await import("@karakeep/plugins-search-meilisearch"); PluginManager.logAllPlugins(); pluginsLoaded = true; diff --git a/packages/shared-server/src/queues.ts b/packages/shared-server/src/queues.ts index 1c4e0452..c9f8276d 100644 --- a/packages/shared-server/src/queues.ts +++ b/packages/shared-server/src/queues.ts @@ -8,8 +8,12 @@ import { loadAllPlugins } from "."; await loadAllPlugins(); const QUEUE_CLIENT = await getQueueClient(); -export function runQueueDBMigrations() { - QUEUE_CLIENT.init(); +export async function prepareQueue() { + await QUEUE_CLIENT.prepare(); +} + +export async function startQueue() { + await QUEUE_CLIENT.start(); } // Link Crawler diff --git a/packages/shared/queueing.ts b/packages/shared/queueing.ts index dfe3b31a..e401972b 100644 --- a/packages/shared/queueing.ts +++ b/packages/shared/queueing.ts @@ -3,7 +3,6 @@ import { ZodType } from "zod"; import { PluginManager, PluginType } from "./plugins"; export interface EnqueueOptions { - numRetries?: number; idempotencyKey?: string; priority?: number; delayMs?: number; @@ -47,6 +46,7 @@ export interface RunnerOptions<T> { } export interface Queue<T> { + opts: QueueOptions; name(): string; enqueue(payload: T, options?: EnqueueOptions): Promise<string | undefined>; stats(): Promise<{ @@ -65,7 +65,8 @@ export interface Runner<_T> { } export interface QueueClient { - init(): Promise<void>; + prepare(): Promise<void>; + start(): Promise<void>; createQueue<T>(name: string, options: QueueOptions): Queue<T>; createRunner<T>( queue: Queue<T>, |
