import * as restate from "@restatedev/restate-sdk"; import * as restateClient from "@restatedev/restate-sdk-clients"; import type { PluginProvider } from "@karakeep/shared/plugins"; import type { EnqueueOptions, Queue, QueueClient, QueueOptions, Runner, RunnerFuncs, RunnerOptions, } from "@karakeep/shared/queueing"; import logger from "@karakeep/shared/logger"; import { AdminClient } from "./admin"; import { envConfig } from "./env"; import { idProvider } from "./idProvider"; import { semaphore } from "./semaphore"; import { buildRestateService } from "./service"; class RestateQueueWrapper implements Queue { constructor( private readonly _name: string, private readonly client: restateClient.Ingress, private readonly adminClient: AdminClient, public readonly opts: QueueOptions, ) {} name(): string { return this._name; } async enqueue( payload: T, options?: EnqueueOptions, ): Promise { interface MyService { run: ( ctx: restate.Context, data: { payload: T; priority: number; }, ) => Promise; } const cl = this.client.serviceSendClient({ name: this.name() }); const res = await cl.run( { payload, priority: options?.priority ?? 0, }, restateClient.rpc.sendOpts({ delay: options?.delayMs ? { milliseconds: options.delayMs, } : undefined, idempotencyKey: options?.idempotencyKey, }), ); return res.invocationId; } async stats(): Promise<{ pending: number; pending_retry: number; running: number; failed: number; }> { const res = await this.adminClient.getStats(this.name()); return { pending: res.pending + res.ready, pending_retry: res["backing-off"] + res.paused + res.suspended, running: res.running, failed: 0, }; } async cancelAllNonRunning(): Promise { throw new Error("Method not implemented."); } } class RestateRunnerWrapper implements Runner { constructor( private readonly wf: restate.ServiceDefinition< string, { run: (ctx: restate.Context, data: T) => Promise; } >, ) {} async run(): Promise { // No-op for restate } async stop(): Promise { // No-op for restate } async runUntilEmpty(): Promise { throw new Error("Method not implemented."); } get def(): restate.WorkflowDefinition { return this.wf; } } class RestateQueueClient implements QueueClient { private client: restateClient.Ingress; private adminClient: AdminClient; private queues = new Map>(); private services = new Map>(); constructor() { this.client = restateClient.connect({ url: envConfig.RESTATE_INGRESS_ADDR, }); this.adminClient = new AdminClient(envConfig.RESTATE_ADMIN_ADDR); } async prepare(): Promise { // No-op for restate } async start(): Promise { const port = await restate.serve({ port: envConfig.RESTATE_LISTEN_PORT ?? 0, services: [ ...[...this.services.values()].map((svc) => svc.def), semaphore, idProvider, ], identityKeys: envConfig.RESTATE_PUB_KEY ? [envConfig.RESTATE_PUB_KEY] : undefined, logger: (meta, msg) => { if (meta.context) { // No need to log invocation logs } else { logger.log(meta.level, `[restate] ${msg}`); } }, }); logger.info(`Restate listening on port ${port}`); } createQueue(name: string, opts: QueueOptions): Queue { if (this.queues.has(name)) { throw new Error(`Queue ${name} already exists`); } const wrapper = new RestateQueueWrapper( name, this.client, this.adminClient, opts, ); this.queues.set(name, wrapper); return wrapper; } createRunner( queue: Queue, funcs: RunnerFuncs, opts: RunnerOptions, ): Runner { const name = queue.name(); let wrapper = this.services.get(name); if (wrapper) { throw new Error(`Queue ${name} already exists`); } const svc = new RestateRunnerWrapper( buildRestateService(queue, funcs, opts, queue.opts), ); this.services.set(name, svc); return svc; } async shutdown(): Promise { // No-op for sqlite } } export class RestateQueueProvider implements PluginProvider { private client: QueueClient | null = null; static isConfigured(): boolean { return envConfig.RESTATE_LISTEN_PORT !== undefined; } async getClient(): Promise { if (!this.client) { const client = new RestateQueueClient(); this.client = client; } return this.client; } }