aboutsummaryrefslogtreecommitdiffstats
path: root/packages/shared/queueing.ts
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-09-14 18:16:40 +0000
committerMohamed Bassem <me@mbassem.com>2025-09-14 18:16:57 +0000
commit8d32055485858210252096483bb20533dc8bdf60 (patch)
treece8a1373411d1ce40aa0dbe6c37e707f0dbf4c98 /packages/shared/queueing.ts
parent6ba61b46154e076fca47d3841b158105dbeeef80 (diff)
downloadkarakeep-8d32055485858210252096483bb20533dc8bdf60.tar.zst
refactor: Move callsites to liteque to be behind a plugin
Diffstat (limited to 'packages/shared/queueing.ts')
-rw-r--r--packages/shared/queueing.ts84
1 files changed, 84 insertions, 0 deletions
diff --git a/packages/shared/queueing.ts b/packages/shared/queueing.ts
new file mode 100644
index 00000000..dfe3b31a
--- /dev/null
+++ b/packages/shared/queueing.ts
@@ -0,0 +1,84 @@
+import { ZodType } from "zod";
+
+import { PluginManager, PluginType } from "./plugins";
+
+export interface EnqueueOptions {
+ numRetries?: number;
+ idempotencyKey?: string;
+ priority?: number;
+ delayMs?: number;
+}
+
+export interface QueueOptions {
+ defaultJobArgs: {
+ numRetries: number;
+ };
+ keepFailedJobs: boolean;
+}
+
+export interface DequeuedJob<T> {
+ id: string;
+ data: T;
+ priority: number;
+ runNumber: number;
+ abortSignal: AbortSignal;
+}
+
+export interface DequeuedJobError<T> {
+ id: string;
+ data?: T;
+ priority: number;
+ error: Error;
+ runNumber: number;
+ numRetriesLeft: number;
+}
+
+export interface RunnerFuncs<T> {
+ run: (job: DequeuedJob<T>) => Promise<void>;
+ onComplete?: (job: DequeuedJob<T>) => Promise<void>;
+ onError?: (job: DequeuedJobError<T>) => Promise<void>;
+}
+
+export interface RunnerOptions<T> {
+ pollIntervalMs?: number;
+ timeoutSecs: number;
+ concurrency: number;
+ validator?: ZodType<T>;
+}
+
+export interface Queue<T> {
+ name(): string;
+ enqueue(payload: T, options?: EnqueueOptions): Promise<string | undefined>;
+ stats(): Promise<{
+ pending: number;
+ pending_retry: number;
+ running: number;
+ failed: number;
+ }>;
+ cancelAllNonRunning?(): Promise<number>;
+}
+
+export interface Runner<_T> {
+ run(): Promise<void>;
+ stop(): void;
+ runUntilEmpty?(): Promise<void>;
+}
+
+export interface QueueClient {
+ init(): Promise<void>;
+ createQueue<T>(name: string, options: QueueOptions): Queue<T>;
+ createRunner<T>(
+ queue: Queue<T>,
+ funcs: RunnerFuncs<T>,
+ opts: RunnerOptions<T>,
+ ): Runner<T>;
+ shutdown?(): Promise<void>;
+}
+
+export async function getQueueClient(): Promise<QueueClient> {
+ const client = await PluginManager.getClient(PluginType.Queue);
+ if (!client) {
+ throw new Error("Failed to get queue client");
+ }
+ return client;
+}