diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-09-14 18:16:40 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2025-09-14 18:16:57 +0000 |
| commit | 8d32055485858210252096483bb20533dc8bdf60 (patch) | |
| tree | ce8a1373411d1ce40aa0dbe6c37e707f0dbf4c98 /packages/shared-server/src | |
| parent | 6ba61b46154e076fca47d3841b158105dbeeef80 (diff) | |
| download | karakeep-8d32055485858210252096483bb20533dc8bdf60.tar.zst | |
refactor: Move callsites to liteque to be behind a plugin
Diffstat (limited to '')
| -rw-r--r-- | packages/shared-server/src/index.ts | 1 | ||||
| -rw-r--r-- | packages/shared-server/src/plugins.ts | 2 | ||||
| -rw-r--r-- | packages/shared-server/src/queues.ts (renamed from packages/shared/queues.ts) | 48 |
3 files changed, 20 insertions, 31 deletions
diff --git a/packages/shared-server/src/index.ts b/packages/shared-server/src/index.ts index ff3c6abc..d42118c2 100644 --- a/packages/shared-server/src/index.ts +++ b/packages/shared-server/src/index.ts @@ -1,2 +1,3 @@ export { loadAllPlugins } from "./plugins"; export { QuotaService, StorageQuotaError } from "./services/quotaService"; +export * from "./queues"; diff --git a/packages/shared-server/src/plugins.ts b/packages/shared-server/src/plugins.ts index 86a0b344..b6a88462 100644 --- a/packages/shared-server/src/plugins.ts +++ b/packages/shared-server/src/plugins.ts @@ -6,6 +6,8 @@ export async function loadAllPlugins() { return; } // Load plugins here. Order of plugin loading matter. + // Queue provider(s) + await import("@karakeep/plugins-queue-liteque"); await import("@karakeep/plugins-search-meilisearch"); PluginManager.logAllPlugins(); pluginsLoaded = true; diff --git a/packages/shared/queues.ts b/packages/shared-server/src/queues.ts index cf8920e1..c461c7cb 100644 --- a/packages/shared/queues.ts +++ b/packages/shared-server/src/queues.ts @@ -1,18 +1,15 @@ -import path from "node:path"; -import { buildDBClient, EnqueueOptions, migrateDB, SqliteQueue } from "liteque"; import { z } from "zod"; -import serverConfig from "./config"; -import { zRuleEngineEventSchema } from "./types/rules"; +import { EnqueueOptions, getQueueClient } from "@karakeep/shared/queueing"; +import { zRuleEngineEventSchema } from "@karakeep/shared/types/rules"; -const QUEUE_DB_PATH = path.join(serverConfig.dataDir, "queue.db"); +import { loadAllPlugins } from "."; -const queueDB = buildDBClient(QUEUE_DB_PATH, { - walEnabled: serverConfig.database.walMode, -}); +await loadAllPlugins(); +const QUEUE_CLIENT = await getQueueClient(); export function runQueueDBMigrations() { - migrateDB(queueDB); + QUEUE_CLIENT.init(); } // Link Crawler @@ -23,9 +20,8 @@ export const zCrawlLinkRequestSchema = z.object({ }); export type ZCrawlLinkRequest = z.input<typeof zCrawlLinkRequestSchema>; -export const LinkCrawlerQueue = new SqliteQueue<ZCrawlLinkRequest>( +export const LinkCrawlerQueue = QUEUE_CLIENT.createQueue<ZCrawlLinkRequest>( "link_crawler_queue", - queueDB, { defaultJobArgs: { numRetries: 5, @@ -41,9 +37,8 @@ export const zOpenAIRequestSchema = z.object({ }); export type ZOpenAIRequest = z.infer<typeof zOpenAIRequestSchema>; -export const OpenAIQueue = new SqliteQueue<ZOpenAIRequest>( +export const OpenAIQueue = QUEUE_CLIENT.createQueue<ZOpenAIRequest>( "openai_queue", - queueDB, { defaultJobArgs: { numRetries: 3, @@ -60,16 +55,13 @@ export const zSearchIndexingRequestSchema = z.object({ export type ZSearchIndexingRequest = z.infer< typeof zSearchIndexingRequestSchema >; -export const SearchIndexingQueue = new SqliteQueue<ZSearchIndexingRequest>( - "searching_indexing", - queueDB, - { +export const SearchIndexingQueue = + QUEUE_CLIENT.createQueue<ZSearchIndexingRequest>("searching_indexing", { defaultJobArgs: { numRetries: 5, }, keepFailedJobs: false, - }, -); + }); // Tidy Assets Worker export const zTidyAssetsRequestSchema = z.object({ @@ -77,9 +69,8 @@ export const zTidyAssetsRequestSchema = z.object({ syncAssetMetadata: z.boolean().optional().default(false), }); export type ZTidyAssetsRequest = z.infer<typeof zTidyAssetsRequestSchema>; -export const TidyAssetsQueue = new SqliteQueue<ZTidyAssetsRequest>( +export const TidyAssetsQueue = QUEUE_CLIENT.createQueue<ZTidyAssetsRequest>( "tidy_assets_queue", - queueDB, { defaultJobArgs: { numRetries: 1, @@ -107,9 +98,8 @@ export const zvideoRequestSchema = z.object({ }); export type ZVideoRequest = z.infer<typeof zvideoRequestSchema>; -export const VideoWorkerQueue = new SqliteQueue<ZVideoRequest>( +export const VideoWorkerQueue = QUEUE_CLIENT.createQueue<ZVideoRequest>( "video_queue", - queueDB, { defaultJobArgs: { numRetries: 5, @@ -124,9 +114,8 @@ export const zFeedRequestSchema = z.object({ }); export type ZFeedRequestSchema = z.infer<typeof zFeedRequestSchema>; -export const FeedQueue = new SqliteQueue<ZFeedRequestSchema>( +export const FeedQueue = QUEUE_CLIENT.createQueue<ZFeedRequestSchema>( "feed_queue", - queueDB, { defaultJobArgs: { // One retry is enough for the feed queue given that it's periodic @@ -145,9 +134,8 @@ export type AssetPreprocessingRequest = z.infer< typeof zAssetPreprocessingRequestSchema >; export const AssetPreprocessingQueue = - new SqliteQueue<AssetPreprocessingRequest>( + QUEUE_CLIENT.createQueue<AssetPreprocessingRequest>( "asset_preprocessing_queue", - queueDB, { defaultJobArgs: { numRetries: 2, @@ -163,9 +151,8 @@ export const zWebhookRequestSchema = z.object({ userId: z.string().optional(), }); export type ZWebhookRequest = z.infer<typeof zWebhookRequestSchema>; -export const WebhookQueue = new SqliteQueue<ZWebhookRequest>( +export const WebhookQueue = QUEUE_CLIENT.createQueue<ZWebhookRequest>( "webhook_queue", - queueDB, { defaultJobArgs: { numRetries: 3, @@ -196,9 +183,8 @@ export const zRuleEngineRequestSchema = z.object({ events: z.array(zRuleEngineEventSchema), }); export type ZRuleEngineRequest = z.infer<typeof zRuleEngineRequestSchema>; -export const RuleEngineQueue = new SqliteQueue<ZRuleEngineRequest>( +export const RuleEngineQueue = QUEUE_CLIENT.createQueue<ZRuleEngineRequest>( "rule_engine_queue", - queueDB, { defaultJobArgs: { numRetries: 1, |
