aboutsummaryrefslogtreecommitdiffstats
path: root/packages/shared
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-09-14 18:16:40 +0000
committerMohamed Bassem <me@mbassem.com>2025-09-14 18:16:57 +0000
commit8d32055485858210252096483bb20533dc8bdf60 (patch)
treece8a1373411d1ce40aa0dbe6c37e707f0dbf4c98 /packages/shared
parent6ba61b46154e076fca47d3841b158105dbeeef80 (diff)
downloadkarakeep-8d32055485858210252096483bb20533dc8bdf60.tar.zst
refactor: Move callsites to liteque to be behind a plugin
Diffstat (limited to 'packages/shared')
-rw-r--r--packages/shared/index.ts1
-rw-r--r--packages/shared/package.json1
-rw-r--r--packages/shared/plugins.ts28
-rw-r--r--packages/shared/queueing.ts84
-rw-r--r--packages/shared/queues.ts222
5 files changed, 100 insertions, 236 deletions
diff --git a/packages/shared/index.ts b/packages/shared/index.ts
index e449443d..e69de29b 100644
--- a/packages/shared/index.ts
+++ b/packages/shared/index.ts
@@ -1 +0,0 @@
-export * as Queues from "./queues";
diff --git a/packages/shared/package.json b/packages/shared/package.json
index 29879868..93739354 100644
--- a/packages/shared/package.json
+++ b/packages/shared/package.json
@@ -9,7 +9,6 @@
"glob": "^11.0.0",
"html-to-text": "^9.0.5",
"js-tiktoken": "^1.0.20",
- "liteque": "^0.6.0",
"nodemailer": "^7.0.4",
"ollama": "^0.5.14",
"openai": "^4.86.1",
diff --git a/packages/shared/plugins.ts b/packages/shared/plugins.ts
index 2ce5826a..2aa7df4a 100644
--- a/packages/shared/plugins.ts
+++ b/packages/shared/plugins.ts
@@ -1,14 +1,17 @@
// Implementation inspired from Outline
+import type { QueueClient } from "./queueing";
import logger from "./logger";
import { SearchIndexClient } from "./search";
export enum PluginType {
Search = "search",
+ Queue = "queue",
}
interface PluginTypeMap {
[PluginType.Search]: SearchIndexClient;
+ [PluginType.Queue]: QueueClient;
}
export interface TPlugin<T extends PluginType> {
@@ -21,37 +24,38 @@ export interface PluginProvider<T> {
getClient(): Promise<T | null>;
}
+// Preserve the key-dependent value type: for K, store TPlugin<K>[]
+type ProviderMap = { [K in PluginType]: TPlugin<K>[] };
+
export class PluginManager {
- private static providers = new Map<PluginType, TPlugin<PluginType>[]>();
+ private static providers: ProviderMap = {
+ [PluginType.Search]: [],
+ [PluginType.Queue]: [],
+ };
static register<T extends PluginType>(plugin: TPlugin<T>): void {
- const p = PluginManager.providers.get(plugin.type);
- if (!p) {
- PluginManager.providers.set(plugin.type, [plugin]);
- return;
- }
- p.push(plugin);
+ PluginManager.providers[plugin.type].push(plugin);
}
static async getClient<T extends PluginType>(
type: T,
): Promise<PluginTypeMap[T] | null> {
- const provider = PluginManager.providers.get(type);
- if (!provider) {
+ const providers: TPlugin<T>[] = PluginManager.providers[type];
+ if (providers.length === 0) {
return null;
}
- return await provider[provider.length - 1].provider.getClient();
+ return await providers[providers.length - 1]!.provider.getClient();
}
static isRegistered<T extends PluginType>(type: T): boolean {
- return !!PluginManager.providers.get(type);
+ return PluginManager.providers[type].length > 0;
}
static logAllPlugins() {
logger.info("Plugins (Last one wins):");
for (const type of Object.values(PluginType)) {
logger.info(` ${type}:`);
- const plugins = PluginManager.providers.get(type);
+ const plugins = PluginManager.providers[type];
if (!plugins) {
logger.info(" - None");
continue;
diff --git a/packages/shared/queueing.ts b/packages/shared/queueing.ts
new file mode 100644
index 00000000..dfe3b31a
--- /dev/null
+++ b/packages/shared/queueing.ts
@@ -0,0 +1,84 @@
+import { ZodType } from "zod";
+
+import { PluginManager, PluginType } from "./plugins";
+
+export interface EnqueueOptions {
+ numRetries?: number;
+ idempotencyKey?: string;
+ priority?: number;
+ delayMs?: number;
+}
+
+export interface QueueOptions {
+ defaultJobArgs: {
+ numRetries: number;
+ };
+ keepFailedJobs: boolean;
+}
+
+export interface DequeuedJob<T> {
+ id: string;
+ data: T;
+ priority: number;
+ runNumber: number;
+ abortSignal: AbortSignal;
+}
+
+export interface DequeuedJobError<T> {
+ id: string;
+ data?: T;
+ priority: number;
+ error: Error;
+ runNumber: number;
+ numRetriesLeft: number;
+}
+
+export interface RunnerFuncs<T> {
+ run: (job: DequeuedJob<T>) => Promise<void>;
+ onComplete?: (job: DequeuedJob<T>) => Promise<void>;
+ onError?: (job: DequeuedJobError<T>) => Promise<void>;
+}
+
+export interface RunnerOptions<T> {
+ pollIntervalMs?: number;
+ timeoutSecs: number;
+ concurrency: number;
+ validator?: ZodType<T>;
+}
+
+export interface Queue<T> {
+ name(): string;
+ enqueue(payload: T, options?: EnqueueOptions): Promise<string | undefined>;
+ stats(): Promise<{
+ pending: number;
+ pending_retry: number;
+ running: number;
+ failed: number;
+ }>;
+ cancelAllNonRunning?(): Promise<number>;
+}
+
+export interface Runner<_T> {
+ run(): Promise<void>;
+ stop(): void;
+ runUntilEmpty?(): Promise<void>;
+}
+
+export interface QueueClient {
+ init(): Promise<void>;
+ createQueue<T>(name: string, options: QueueOptions): Queue<T>;
+ createRunner<T>(
+ queue: Queue<T>,
+ funcs: RunnerFuncs<T>,
+ opts: RunnerOptions<T>,
+ ): Runner<T>;
+ shutdown?(): Promise<void>;
+}
+
+export async function getQueueClient(): Promise<QueueClient> {
+ const client = await PluginManager.getClient(PluginType.Queue);
+ if (!client) {
+ throw new Error("Failed to get queue client");
+ }
+ return client;
+}
diff --git a/packages/shared/queues.ts b/packages/shared/queues.ts
deleted file mode 100644
index cf8920e1..00000000
--- a/packages/shared/queues.ts
+++ /dev/null
@@ -1,222 +0,0 @@
-import path from "node:path";
-import { buildDBClient, EnqueueOptions, migrateDB, SqliteQueue } from "liteque";
-import { z } from "zod";
-
-import serverConfig from "./config";
-import { zRuleEngineEventSchema } from "./types/rules";
-
-const QUEUE_DB_PATH = path.join(serverConfig.dataDir, "queue.db");
-
-const queueDB = buildDBClient(QUEUE_DB_PATH, {
- walEnabled: serverConfig.database.walMode,
-});
-
-export function runQueueDBMigrations() {
- migrateDB(queueDB);
-}
-
-// Link Crawler
-export const zCrawlLinkRequestSchema = z.object({
- bookmarkId: z.string(),
- runInference: z.boolean().optional(),
- archiveFullPage: z.boolean().optional().default(false),
-});
-export type ZCrawlLinkRequest = z.input<typeof zCrawlLinkRequestSchema>;
-
-export const LinkCrawlerQueue = new SqliteQueue<ZCrawlLinkRequest>(
- "link_crawler_queue",
- queueDB,
- {
- defaultJobArgs: {
- numRetries: 5,
- },
- keepFailedJobs: false,
- },
-);
-
-// Inference Worker
-export const zOpenAIRequestSchema = z.object({
- bookmarkId: z.string(),
- type: z.enum(["summarize", "tag"]).default("tag"),
-});
-export type ZOpenAIRequest = z.infer<typeof zOpenAIRequestSchema>;
-
-export const OpenAIQueue = new SqliteQueue<ZOpenAIRequest>(
- "openai_queue",
- queueDB,
- {
- defaultJobArgs: {
- numRetries: 3,
- },
- keepFailedJobs: false,
- },
-);
-
-// Search Indexing Worker
-export const zSearchIndexingRequestSchema = z.object({
- bookmarkId: z.string(),
- type: z.enum(["index", "delete"]),
-});
-export type ZSearchIndexingRequest = z.infer<
- typeof zSearchIndexingRequestSchema
->;
-export const SearchIndexingQueue = new SqliteQueue<ZSearchIndexingRequest>(
- "searching_indexing",
- queueDB,
- {
- defaultJobArgs: {
- numRetries: 5,
- },
- keepFailedJobs: false,
- },
-);
-
-// Tidy Assets Worker
-export const zTidyAssetsRequestSchema = z.object({
- cleanDanglingAssets: z.boolean().optional().default(false),
- syncAssetMetadata: z.boolean().optional().default(false),
-});
-export type ZTidyAssetsRequest = z.infer<typeof zTidyAssetsRequestSchema>;
-export const TidyAssetsQueue = new SqliteQueue<ZTidyAssetsRequest>(
- "tidy_assets_queue",
- queueDB,
- {
- defaultJobArgs: {
- numRetries: 1,
- },
- keepFailedJobs: false,
- },
-);
-
-export async function triggerSearchReindex(
- bookmarkId: string,
- opts?: EnqueueOptions,
-) {
- await SearchIndexingQueue.enqueue(
- {
- bookmarkId,
- type: "index",
- },
- opts,
- );
-}
-
-export const zvideoRequestSchema = z.object({
- bookmarkId: z.string(),
- url: z.string(),
-});
-export type ZVideoRequest = z.infer<typeof zvideoRequestSchema>;
-
-export const VideoWorkerQueue = new SqliteQueue<ZVideoRequest>(
- "video_queue",
- queueDB,
- {
- defaultJobArgs: {
- numRetries: 5,
- },
- keepFailedJobs: false,
- },
-);
-
-// Feed Worker
-export const zFeedRequestSchema = z.object({
- feedId: z.string(),
-});
-export type ZFeedRequestSchema = z.infer<typeof zFeedRequestSchema>;
-
-export const FeedQueue = new SqliteQueue<ZFeedRequestSchema>(
- "feed_queue",
- queueDB,
- {
- defaultJobArgs: {
- // One retry is enough for the feed queue given that it's periodic
- numRetries: 1,
- },
- keepFailedJobs: false,
- },
-);
-
-// Preprocess Assets
-export const zAssetPreprocessingRequestSchema = z.object({
- bookmarkId: z.string(),
- fixMode: z.boolean().optional().default(false),
-});
-export type AssetPreprocessingRequest = z.infer<
- typeof zAssetPreprocessingRequestSchema
->;
-export const AssetPreprocessingQueue =
- new SqliteQueue<AssetPreprocessingRequest>(
- "asset_preprocessing_queue",
- queueDB,
- {
- defaultJobArgs: {
- numRetries: 2,
- },
- keepFailedJobs: false,
- },
- );
-
-// Webhook worker
-export const zWebhookRequestSchema = z.object({
- bookmarkId: z.string(),
- operation: z.enum(["crawled", "created", "edited", "ai tagged", "deleted"]),
- userId: z.string().optional(),
-});
-export type ZWebhookRequest = z.infer<typeof zWebhookRequestSchema>;
-export const WebhookQueue = new SqliteQueue<ZWebhookRequest>(
- "webhook_queue",
- queueDB,
- {
- defaultJobArgs: {
- numRetries: 3,
- },
- keepFailedJobs: false,
- },
-);
-
-export async function triggerWebhook(
- bookmarkId: string,
- operation: ZWebhookRequest["operation"],
- userId?: string,
- opts?: EnqueueOptions,
-) {
- await WebhookQueue.enqueue(
- {
- bookmarkId,
- userId,
- operation,
- },
- opts,
- );
-}
-
-// RuleEngine worker
-export const zRuleEngineRequestSchema = z.object({
- bookmarkId: z.string(),
- events: z.array(zRuleEngineEventSchema),
-});
-export type ZRuleEngineRequest = z.infer<typeof zRuleEngineRequestSchema>;
-export const RuleEngineQueue = new SqliteQueue<ZRuleEngineRequest>(
- "rule_engine_queue",
- queueDB,
- {
- defaultJobArgs: {
- numRetries: 1,
- },
- keepFailedJobs: false,
- },
-);
-
-export async function triggerRuleEngineOnEvent(
- bookmarkId: string,
- events: z.infer<typeof zRuleEngineEventSchema>[],
- opts?: EnqueueOptions,
-) {
- await RuleEngineQueue.enqueue(
- {
- events,
- bookmarkId,
- },
- opts,
- );
-}