From 8d32055485858210252096483bb20533dc8bdf60 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sun, 14 Sep 2025 18:16:40 +0000 Subject: refactor: Move callsites to liteque to be behind a plugin --- packages/shared/queueing.ts | 84 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 packages/shared/queueing.ts (limited to 'packages/shared/queueing.ts') diff --git a/packages/shared/queueing.ts b/packages/shared/queueing.ts new file mode 100644 index 00000000..dfe3b31a --- /dev/null +++ b/packages/shared/queueing.ts @@ -0,0 +1,84 @@ +import { ZodType } from "zod"; + +import { PluginManager, PluginType } from "./plugins"; + +export interface EnqueueOptions { + numRetries?: number; + idempotencyKey?: string; + priority?: number; + delayMs?: number; +} + +export interface QueueOptions { + defaultJobArgs: { + numRetries: number; + }; + keepFailedJobs: boolean; +} + +export interface DequeuedJob { + id: string; + data: T; + priority: number; + runNumber: number; + abortSignal: AbortSignal; +} + +export interface DequeuedJobError { + id: string; + data?: T; + priority: number; + error: Error; + runNumber: number; + numRetriesLeft: number; +} + +export interface RunnerFuncs { + run: (job: DequeuedJob) => Promise; + onComplete?: (job: DequeuedJob) => Promise; + onError?: (job: DequeuedJobError) => Promise; +} + +export interface RunnerOptions { + pollIntervalMs?: number; + timeoutSecs: number; + concurrency: number; + validator?: ZodType; +} + +export interface Queue { + name(): string; + enqueue(payload: T, options?: EnqueueOptions): Promise; + stats(): Promise<{ + pending: number; + pending_retry: number; + running: number; + failed: number; + }>; + cancelAllNonRunning?(): Promise; +} + +export interface Runner<_T> { + run(): Promise; + stop(): void; + runUntilEmpty?(): Promise; +} + +export interface QueueClient { + init(): Promise; + createQueue(name: string, options: QueueOptions): Queue; + createRunner( + queue: Queue, + funcs: RunnerFuncs, + opts: RunnerOptions, + ): Runner; + shutdown?(): Promise; +} + +export async function getQueueClient(): Promise { + const client = await PluginManager.getClient(PluginType.Queue); + if (!client) { + throw new Error("Failed to get queue client"); + } + return client; +} -- cgit v1.2.3-70-g09d2