From 8d32055485858210252096483bb20533dc8bdf60 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sun, 14 Sep 2025 18:16:40 +0000 Subject: refactor: Move callsites to liteque to be behind a plugin --- packages/shared/index.ts | 1 - packages/shared/package.json | 1 - packages/shared/plugins.ts | 28 +++--- packages/shared/queueing.ts | 84 ++++++++++++++++ packages/shared/queues.ts | 222 ------------------------------------------- 5 files changed, 100 insertions(+), 236 deletions(-) create mode 100644 packages/shared/queueing.ts delete mode 100644 packages/shared/queues.ts (limited to 'packages/shared') diff --git a/packages/shared/index.ts b/packages/shared/index.ts index e449443d..e69de29b 100644 --- a/packages/shared/index.ts +++ b/packages/shared/index.ts @@ -1 +0,0 @@ -export * as Queues from "./queues"; diff --git a/packages/shared/package.json b/packages/shared/package.json index 29879868..93739354 100644 --- a/packages/shared/package.json +++ b/packages/shared/package.json @@ -9,7 +9,6 @@ "glob": "^11.0.0", "html-to-text": "^9.0.5", "js-tiktoken": "^1.0.20", - "liteque": "^0.6.0", "nodemailer": "^7.0.4", "ollama": "^0.5.14", "openai": "^4.86.1", diff --git a/packages/shared/plugins.ts b/packages/shared/plugins.ts index 2ce5826a..2aa7df4a 100644 --- a/packages/shared/plugins.ts +++ b/packages/shared/plugins.ts @@ -1,14 +1,17 @@ // Implementation inspired from Outline +import type { QueueClient } from "./queueing"; import logger from "./logger"; import { SearchIndexClient } from "./search"; export enum PluginType { Search = "search", + Queue = "queue", } interface PluginTypeMap { [PluginType.Search]: SearchIndexClient; + [PluginType.Queue]: QueueClient; } export interface TPlugin { @@ -21,37 +24,38 @@ export interface PluginProvider { getClient(): Promise; } +// Preserve the key-dependent value type: for K, store TPlugin[] +type ProviderMap = { [K in PluginType]: TPlugin[] }; + export class PluginManager { - private static providers = new Map[]>(); + private static providers: ProviderMap = { + [PluginType.Search]: [], + [PluginType.Queue]: [], + }; static register(plugin: TPlugin): void { - const p = PluginManager.providers.get(plugin.type); - if (!p) { - PluginManager.providers.set(plugin.type, [plugin]); - return; - } - p.push(plugin); + PluginManager.providers[plugin.type].push(plugin); } static async getClient( type: T, ): Promise { - const provider = PluginManager.providers.get(type); - if (!provider) { + const providers: TPlugin[] = PluginManager.providers[type]; + if (providers.length === 0) { return null; } - return await provider[provider.length - 1].provider.getClient(); + return await providers[providers.length - 1]!.provider.getClient(); } static isRegistered(type: T): boolean { - return !!PluginManager.providers.get(type); + return PluginManager.providers[type].length > 0; } static logAllPlugins() { logger.info("Plugins (Last one wins):"); for (const type of Object.values(PluginType)) { logger.info(` ${type}:`); - const plugins = PluginManager.providers.get(type); + const plugins = PluginManager.providers[type]; if (!plugins) { logger.info(" - None"); continue; diff --git a/packages/shared/queueing.ts b/packages/shared/queueing.ts new file mode 100644 index 00000000..dfe3b31a --- /dev/null +++ b/packages/shared/queueing.ts @@ -0,0 +1,84 @@ +import { ZodType } from "zod"; + +import { PluginManager, PluginType } from "./plugins"; + +export interface EnqueueOptions { + numRetries?: number; + idempotencyKey?: string; + priority?: number; + delayMs?: number; +} + +export interface QueueOptions { + defaultJobArgs: { + numRetries: number; + }; + keepFailedJobs: boolean; +} + +export interface DequeuedJob { + id: string; + data: T; + priority: number; + runNumber: number; + abortSignal: AbortSignal; +} + +export interface DequeuedJobError { + id: string; + data?: T; + priority: number; + error: Error; + runNumber: number; + numRetriesLeft: number; +} + +export interface RunnerFuncs { + run: (job: DequeuedJob) => Promise; + onComplete?: (job: DequeuedJob) => Promise; + onError?: (job: DequeuedJobError) => Promise; +} + +export interface RunnerOptions { + pollIntervalMs?: number; + timeoutSecs: number; + concurrency: number; + validator?: ZodType; +} + +export interface Queue { + name(): string; + enqueue(payload: T, options?: EnqueueOptions): Promise; + stats(): Promise<{ + pending: number; + pending_retry: number; + running: number; + failed: number; + }>; + cancelAllNonRunning?(): Promise; +} + +export interface Runner<_T> { + run(): Promise; + stop(): void; + runUntilEmpty?(): Promise; +} + +export interface QueueClient { + init(): Promise; + createQueue(name: string, options: QueueOptions): Queue; + createRunner( + queue: Queue, + funcs: RunnerFuncs, + opts: RunnerOptions, + ): Runner; + shutdown?(): Promise; +} + +export async function getQueueClient(): Promise { + const client = await PluginManager.getClient(PluginType.Queue); + if (!client) { + throw new Error("Failed to get queue client"); + } + return client; +} diff --git a/packages/shared/queues.ts b/packages/shared/queues.ts deleted file mode 100644 index cf8920e1..00000000 --- a/packages/shared/queues.ts +++ /dev/null @@ -1,222 +0,0 @@ -import path from "node:path"; -import { buildDBClient, EnqueueOptions, migrateDB, SqliteQueue } from "liteque"; -import { z } from "zod"; - -import serverConfig from "./config"; -import { zRuleEngineEventSchema } from "./types/rules"; - -const QUEUE_DB_PATH = path.join(serverConfig.dataDir, "queue.db"); - -const queueDB = buildDBClient(QUEUE_DB_PATH, { - walEnabled: serverConfig.database.walMode, -}); - -export function runQueueDBMigrations() { - migrateDB(queueDB); -} - -// Link Crawler -export const zCrawlLinkRequestSchema = z.object({ - bookmarkId: z.string(), - runInference: z.boolean().optional(), - archiveFullPage: z.boolean().optional().default(false), -}); -export type ZCrawlLinkRequest = z.input; - -export const LinkCrawlerQueue = new SqliteQueue( - "link_crawler_queue", - queueDB, - { - defaultJobArgs: { - numRetries: 5, - }, - keepFailedJobs: false, - }, -); - -// Inference Worker -export const zOpenAIRequestSchema = z.object({ - bookmarkId: z.string(), - type: z.enum(["summarize", "tag"]).default("tag"), -}); -export type ZOpenAIRequest = z.infer; - -export const OpenAIQueue = new SqliteQueue( - "openai_queue", - queueDB, - { - defaultJobArgs: { - numRetries: 3, - }, - keepFailedJobs: false, - }, -); - -// Search Indexing Worker -export const zSearchIndexingRequestSchema = z.object({ - bookmarkId: z.string(), - type: z.enum(["index", "delete"]), -}); -export type ZSearchIndexingRequest = z.infer< - typeof zSearchIndexingRequestSchema ->; -export const SearchIndexingQueue = new SqliteQueue( - "searching_indexing", - queueDB, - { - defaultJobArgs: { - numRetries: 5, - }, - keepFailedJobs: false, - }, -); - -// Tidy Assets Worker -export const zTidyAssetsRequestSchema = z.object({ - cleanDanglingAssets: z.boolean().optional().default(false), - syncAssetMetadata: z.boolean().optional().default(false), -}); -export type ZTidyAssetsRequest = z.infer; -export const TidyAssetsQueue = new SqliteQueue( - "tidy_assets_queue", - queueDB, - { - defaultJobArgs: { - numRetries: 1, - }, - keepFailedJobs: false, - }, -); - -export async function triggerSearchReindex( - bookmarkId: string, - opts?: EnqueueOptions, -) { - await SearchIndexingQueue.enqueue( - { - bookmarkId, - type: "index", - }, - opts, - ); -} - -export const zvideoRequestSchema = z.object({ - bookmarkId: z.string(), - url: z.string(), -}); -export type ZVideoRequest = z.infer; - -export const VideoWorkerQueue = new SqliteQueue( - "video_queue", - queueDB, - { - defaultJobArgs: { - numRetries: 5, - }, - keepFailedJobs: false, - }, -); - -// Feed Worker -export const zFeedRequestSchema = z.object({ - feedId: z.string(), -}); -export type ZFeedRequestSchema = z.infer; - -export const FeedQueue = new SqliteQueue( - "feed_queue", - queueDB, - { - defaultJobArgs: { - // One retry is enough for the feed queue given that it's periodic - numRetries: 1, - }, - keepFailedJobs: false, - }, -); - -// Preprocess Assets -export const zAssetPreprocessingRequestSchema = z.object({ - bookmarkId: z.string(), - fixMode: z.boolean().optional().default(false), -}); -export type AssetPreprocessingRequest = z.infer< - typeof zAssetPreprocessingRequestSchema ->; -export const AssetPreprocessingQueue = - new SqliteQueue( - "asset_preprocessing_queue", - queueDB, - { - defaultJobArgs: { - numRetries: 2, - }, - keepFailedJobs: false, - }, - ); - -// Webhook worker -export const zWebhookRequestSchema = z.object({ - bookmarkId: z.string(), - operation: z.enum(["crawled", "created", "edited", "ai tagged", "deleted"]), - userId: z.string().optional(), -}); -export type ZWebhookRequest = z.infer; -export const WebhookQueue = new SqliteQueue( - "webhook_queue", - queueDB, - { - defaultJobArgs: { - numRetries: 3, - }, - keepFailedJobs: false, - }, -); - -export async function triggerWebhook( - bookmarkId: string, - operation: ZWebhookRequest["operation"], - userId?: string, - opts?: EnqueueOptions, -) { - await WebhookQueue.enqueue( - { - bookmarkId, - userId, - operation, - }, - opts, - ); -} - -// RuleEngine worker -export const zRuleEngineRequestSchema = z.object({ - bookmarkId: z.string(), - events: z.array(zRuleEngineEventSchema), -}); -export type ZRuleEngineRequest = z.infer; -export const RuleEngineQueue = new SqliteQueue( - "rule_engine_queue", - queueDB, - { - defaultJobArgs: { - numRetries: 1, - }, - keepFailedJobs: false, - }, -); - -export async function triggerRuleEngineOnEvent( - bookmarkId: string, - events: z.infer[], - opts?: EnqueueOptions, -) { - await RuleEngineQueue.enqueue( - { - events, - bookmarkId, - }, - opts, - ); -} -- cgit v1.2.3-70-g09d2