diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-09-14 18:16:40 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2025-09-14 18:16:57 +0000 |
| commit | 8d32055485858210252096483bb20533dc8bdf60 (patch) | |
| tree | ce8a1373411d1ce40aa0dbe6c37e707f0dbf4c98 /packages/shared-server/src | |
| parent | 6ba61b46154e076fca47d3841b158105dbeeef80 (diff) | |
| download | karakeep-8d32055485858210252096483bb20533dc8bdf60.tar.zst | |
refactor: Move callsites to liteque to be behind a plugin
Diffstat (limited to 'packages/shared-server/src')
| -rw-r--r-- | packages/shared-server/src/index.ts | 1 | ||||
| -rw-r--r-- | packages/shared-server/src/plugins.ts | 2 | ||||
| -rw-r--r-- | packages/shared-server/src/queues.ts | 208 |
3 files changed, 211 insertions, 0 deletions
diff --git a/packages/shared-server/src/index.ts b/packages/shared-server/src/index.ts index ff3c6abc..d42118c2 100644 --- a/packages/shared-server/src/index.ts +++ b/packages/shared-server/src/index.ts @@ -1,2 +1,3 @@ export { loadAllPlugins } from "./plugins"; export { QuotaService, StorageQuotaError } from "./services/quotaService"; +export * from "./queues"; diff --git a/packages/shared-server/src/plugins.ts b/packages/shared-server/src/plugins.ts index 86a0b344..b6a88462 100644 --- a/packages/shared-server/src/plugins.ts +++ b/packages/shared-server/src/plugins.ts @@ -6,6 +6,8 @@ export async function loadAllPlugins() { return; } // Load plugins here. Order of plugin loading matter. + // Queue provider(s) + await import("@karakeep/plugins-queue-liteque"); await import("@karakeep/plugins-search-meilisearch"); PluginManager.logAllPlugins(); pluginsLoaded = true; diff --git a/packages/shared-server/src/queues.ts b/packages/shared-server/src/queues.ts new file mode 100644 index 00000000..c461c7cb --- /dev/null +++ b/packages/shared-server/src/queues.ts @@ -0,0 +1,208 @@ +import { z } from "zod"; + +import { EnqueueOptions, getQueueClient } from "@karakeep/shared/queueing"; +import { zRuleEngineEventSchema } from "@karakeep/shared/types/rules"; + +import { loadAllPlugins } from "."; + +await loadAllPlugins(); +const QUEUE_CLIENT = await getQueueClient(); + +export function runQueueDBMigrations() { + QUEUE_CLIENT.init(); +} + +// Link Crawler +export const zCrawlLinkRequestSchema = z.object({ + bookmarkId: z.string(), + runInference: z.boolean().optional(), + archiveFullPage: z.boolean().optional().default(false), +}); +export type ZCrawlLinkRequest = z.input<typeof zCrawlLinkRequestSchema>; + +export const LinkCrawlerQueue = QUEUE_CLIENT.createQueue<ZCrawlLinkRequest>( + "link_crawler_queue", + { + defaultJobArgs: { + numRetries: 5, + }, + keepFailedJobs: false, + }, +); + +// Inference Worker +export const zOpenAIRequestSchema = z.object({ + bookmarkId: z.string(), + type: z.enum(["summarize", "tag"]).default("tag"), +}); +export type ZOpenAIRequest = z.infer<typeof zOpenAIRequestSchema>; + +export const OpenAIQueue = QUEUE_CLIENT.createQueue<ZOpenAIRequest>( + "openai_queue", + { + defaultJobArgs: { + numRetries: 3, + }, + keepFailedJobs: false, + }, +); + +// Search Indexing Worker +export const zSearchIndexingRequestSchema = z.object({ + bookmarkId: z.string(), + type: z.enum(["index", "delete"]), +}); +export type ZSearchIndexingRequest = z.infer< + typeof zSearchIndexingRequestSchema +>; +export const SearchIndexingQueue = + QUEUE_CLIENT.createQueue<ZSearchIndexingRequest>("searching_indexing", { + defaultJobArgs: { + numRetries: 5, + }, + keepFailedJobs: false, + }); + +// Tidy Assets Worker +export const zTidyAssetsRequestSchema = z.object({ + cleanDanglingAssets: z.boolean().optional().default(false), + syncAssetMetadata: z.boolean().optional().default(false), +}); +export type ZTidyAssetsRequest = z.infer<typeof zTidyAssetsRequestSchema>; +export const TidyAssetsQueue = QUEUE_CLIENT.createQueue<ZTidyAssetsRequest>( + "tidy_assets_queue", + { + defaultJobArgs: { + numRetries: 1, + }, + keepFailedJobs: false, + }, +); + +export async function triggerSearchReindex( + bookmarkId: string, + opts?: EnqueueOptions, +) { + await SearchIndexingQueue.enqueue( + { + bookmarkId, + type: "index", + }, + opts, + ); +} + +export const zvideoRequestSchema = z.object({ + bookmarkId: z.string(), + url: z.string(), +}); +export type ZVideoRequest = z.infer<typeof zvideoRequestSchema>; + +export const VideoWorkerQueue = QUEUE_CLIENT.createQueue<ZVideoRequest>( + "video_queue", + { + defaultJobArgs: { + numRetries: 5, + }, + keepFailedJobs: false, + }, +); + +// Feed Worker +export const zFeedRequestSchema = z.object({ + feedId: z.string(), +}); +export type ZFeedRequestSchema = z.infer<typeof zFeedRequestSchema>; + +export const FeedQueue = QUEUE_CLIENT.createQueue<ZFeedRequestSchema>( + "feed_queue", + { + defaultJobArgs: { + // One retry is enough for the feed queue given that it's periodic + numRetries: 1, + }, + keepFailedJobs: false, + }, +); + +// Preprocess Assets +export const zAssetPreprocessingRequestSchema = z.object({ + bookmarkId: z.string(), + fixMode: z.boolean().optional().default(false), +}); +export type AssetPreprocessingRequest = z.infer< + typeof zAssetPreprocessingRequestSchema +>; +export const AssetPreprocessingQueue = + QUEUE_CLIENT.createQueue<AssetPreprocessingRequest>( + "asset_preprocessing_queue", + { + defaultJobArgs: { + numRetries: 2, + }, + keepFailedJobs: false, + }, + ); + +// Webhook worker +export const zWebhookRequestSchema = z.object({ + bookmarkId: z.string(), + operation: z.enum(["crawled", "created", "edited", "ai tagged", "deleted"]), + userId: z.string().optional(), +}); +export type ZWebhookRequest = z.infer<typeof zWebhookRequestSchema>; +export const WebhookQueue = QUEUE_CLIENT.createQueue<ZWebhookRequest>( + "webhook_queue", + { + defaultJobArgs: { + numRetries: 3, + }, + keepFailedJobs: false, + }, +); + +export async function triggerWebhook( + bookmarkId: string, + operation: ZWebhookRequest["operation"], + userId?: string, + opts?: EnqueueOptions, +) { + await WebhookQueue.enqueue( + { + bookmarkId, + userId, + operation, + }, + opts, + ); +} + +// RuleEngine worker +export const zRuleEngineRequestSchema = z.object({ + bookmarkId: z.string(), + events: z.array(zRuleEngineEventSchema), +}); +export type ZRuleEngineRequest = z.infer<typeof zRuleEngineRequestSchema>; +export const RuleEngineQueue = QUEUE_CLIENT.createQueue<ZRuleEngineRequest>( + "rule_engine_queue", + { + defaultJobArgs: { + numRetries: 1, + }, + keepFailedJobs: false, + }, +); + +export async function triggerRuleEngineOnEvent( + bookmarkId: string, + events: z.infer<typeof zRuleEngineEventSchema>[], + opts?: EnqueueOptions, +) { + await RuleEngineQueue.enqueue( + { + events, + bookmarkId, + }, + opts, + ); +} |
