diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-09-14 18:16:40 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2025-09-14 18:16:57 +0000 |
| commit | 8d32055485858210252096483bb20533dc8bdf60 (patch) | |
| tree | ce8a1373411d1ce40aa0dbe6c37e707f0dbf4c98 /packages/shared/queueing.ts | |
| parent | 6ba61b46154e076fca47d3841b158105dbeeef80 (diff) | |
| download | karakeep-8d32055485858210252096483bb20533dc8bdf60.tar.zst | |
refactor: Move callsites to liteque to be behind a plugin
Diffstat (limited to 'packages/shared/queueing.ts')
| -rw-r--r-- | packages/shared/queueing.ts | 84 |
1 files changed, 84 insertions, 0 deletions
diff --git a/packages/shared/queueing.ts b/packages/shared/queueing.ts new file mode 100644 index 00000000..dfe3b31a --- /dev/null +++ b/packages/shared/queueing.ts @@ -0,0 +1,84 @@ +import { ZodType } from "zod"; + +import { PluginManager, PluginType } from "./plugins"; + +export interface EnqueueOptions { + numRetries?: number; + idempotencyKey?: string; + priority?: number; + delayMs?: number; +} + +export interface QueueOptions { + defaultJobArgs: { + numRetries: number; + }; + keepFailedJobs: boolean; +} + +export interface DequeuedJob<T> { + id: string; + data: T; + priority: number; + runNumber: number; + abortSignal: AbortSignal; +} + +export interface DequeuedJobError<T> { + id: string; + data?: T; + priority: number; + error: Error; + runNumber: number; + numRetriesLeft: number; +} + +export interface RunnerFuncs<T> { + run: (job: DequeuedJob<T>) => Promise<void>; + onComplete?: (job: DequeuedJob<T>) => Promise<void>; + onError?: (job: DequeuedJobError<T>) => Promise<void>; +} + +export interface RunnerOptions<T> { + pollIntervalMs?: number; + timeoutSecs: number; + concurrency: number; + validator?: ZodType<T>; +} + +export interface Queue<T> { + name(): string; + enqueue(payload: T, options?: EnqueueOptions): Promise<string | undefined>; + stats(): Promise<{ + pending: number; + pending_retry: number; + running: number; + failed: number; + }>; + cancelAllNonRunning?(): Promise<number>; +} + +export interface Runner<_T> { + run(): Promise<void>; + stop(): void; + runUntilEmpty?(): Promise<void>; +} + +export interface QueueClient { + init(): Promise<void>; + createQueue<T>(name: string, options: QueueOptions): Queue<T>; + createRunner<T>( + queue: Queue<T>, + funcs: RunnerFuncs<T>, + opts: RunnerOptions<T>, + ): Runner<T>; + shutdown?(): Promise<void>; +} + +export async function getQueueClient(): Promise<QueueClient> { + const client = await PluginManager.getClient(PluginType.Queue); + if (!client) { + throw new Error("Failed to get queue client"); + } + return client; +} |
