aboutsummaryrefslogtreecommitdiffstats
path: root/packages/shared-server/src
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-09-14 18:16:40 +0000
committerMohamed Bassem <me@mbassem.com>2025-09-14 18:16:57 +0000
commit8d32055485858210252096483bb20533dc8bdf60 (patch)
treece8a1373411d1ce40aa0dbe6c37e707f0dbf4c98 /packages/shared-server/src
parent6ba61b46154e076fca47d3841b158105dbeeef80 (diff)
downloadkarakeep-8d32055485858210252096483bb20533dc8bdf60.tar.zst
refactor: Move callsites to liteque to be behind a plugin
Diffstat (limited to '')
-rw-r--r--packages/shared-server/src/index.ts1
-rw-r--r--packages/shared-server/src/plugins.ts2
-rw-r--r--packages/shared-server/src/queues.ts (renamed from packages/shared/queues.ts)48
3 files changed, 20 insertions, 31 deletions
diff --git a/packages/shared-server/src/index.ts b/packages/shared-server/src/index.ts
index ff3c6abc..d42118c2 100644
--- a/packages/shared-server/src/index.ts
+++ b/packages/shared-server/src/index.ts
@@ -1,2 +1,3 @@
export { loadAllPlugins } from "./plugins";
export { QuotaService, StorageQuotaError } from "./services/quotaService";
+export * from "./queues";
diff --git a/packages/shared-server/src/plugins.ts b/packages/shared-server/src/plugins.ts
index 86a0b344..b6a88462 100644
--- a/packages/shared-server/src/plugins.ts
+++ b/packages/shared-server/src/plugins.ts
@@ -6,6 +6,8 @@ export async function loadAllPlugins() {
return;
}
// Load plugins here. Order of plugin loading matter.
+ // Queue provider(s)
+ await import("@karakeep/plugins-queue-liteque");
await import("@karakeep/plugins-search-meilisearch");
PluginManager.logAllPlugins();
pluginsLoaded = true;
diff --git a/packages/shared/queues.ts b/packages/shared-server/src/queues.ts
index cf8920e1..c461c7cb 100644
--- a/packages/shared/queues.ts
+++ b/packages/shared-server/src/queues.ts
@@ -1,18 +1,15 @@
-import path from "node:path";
-import { buildDBClient, EnqueueOptions, migrateDB, SqliteQueue } from "liteque";
import { z } from "zod";
-import serverConfig from "./config";
-import { zRuleEngineEventSchema } from "./types/rules";
+import { EnqueueOptions, getQueueClient } from "@karakeep/shared/queueing";
+import { zRuleEngineEventSchema } from "@karakeep/shared/types/rules";
-const QUEUE_DB_PATH = path.join(serverConfig.dataDir, "queue.db");
+import { loadAllPlugins } from ".";
-const queueDB = buildDBClient(QUEUE_DB_PATH, {
- walEnabled: serverConfig.database.walMode,
-});
+await loadAllPlugins();
+const QUEUE_CLIENT = await getQueueClient();
export function runQueueDBMigrations() {
- migrateDB(queueDB);
+ QUEUE_CLIENT.init();
}
// Link Crawler
@@ -23,9 +20,8 @@ export const zCrawlLinkRequestSchema = z.object({
});
export type ZCrawlLinkRequest = z.input<typeof zCrawlLinkRequestSchema>;
-export const LinkCrawlerQueue = new SqliteQueue<ZCrawlLinkRequest>(
+export const LinkCrawlerQueue = QUEUE_CLIENT.createQueue<ZCrawlLinkRequest>(
"link_crawler_queue",
- queueDB,
{
defaultJobArgs: {
numRetries: 5,
@@ -41,9 +37,8 @@ export const zOpenAIRequestSchema = z.object({
});
export type ZOpenAIRequest = z.infer<typeof zOpenAIRequestSchema>;
-export const OpenAIQueue = new SqliteQueue<ZOpenAIRequest>(
+export const OpenAIQueue = QUEUE_CLIENT.createQueue<ZOpenAIRequest>(
"openai_queue",
- queueDB,
{
defaultJobArgs: {
numRetries: 3,
@@ -60,16 +55,13 @@ export const zSearchIndexingRequestSchema = z.object({
export type ZSearchIndexingRequest = z.infer<
typeof zSearchIndexingRequestSchema
>;
-export const SearchIndexingQueue = new SqliteQueue<ZSearchIndexingRequest>(
- "searching_indexing",
- queueDB,
- {
+export const SearchIndexingQueue =
+ QUEUE_CLIENT.createQueue<ZSearchIndexingRequest>("searching_indexing", {
defaultJobArgs: {
numRetries: 5,
},
keepFailedJobs: false,
- },
-);
+ });
// Tidy Assets Worker
export const zTidyAssetsRequestSchema = z.object({
@@ -77,9 +69,8 @@ export const zTidyAssetsRequestSchema = z.object({
syncAssetMetadata: z.boolean().optional().default(false),
});
export type ZTidyAssetsRequest = z.infer<typeof zTidyAssetsRequestSchema>;
-export const TidyAssetsQueue = new SqliteQueue<ZTidyAssetsRequest>(
+export const TidyAssetsQueue = QUEUE_CLIENT.createQueue<ZTidyAssetsRequest>(
"tidy_assets_queue",
- queueDB,
{
defaultJobArgs: {
numRetries: 1,
@@ -107,9 +98,8 @@ export const zvideoRequestSchema = z.object({
});
export type ZVideoRequest = z.infer<typeof zvideoRequestSchema>;
-export const VideoWorkerQueue = new SqliteQueue<ZVideoRequest>(
+export const VideoWorkerQueue = QUEUE_CLIENT.createQueue<ZVideoRequest>(
"video_queue",
- queueDB,
{
defaultJobArgs: {
numRetries: 5,
@@ -124,9 +114,8 @@ export const zFeedRequestSchema = z.object({
});
export type ZFeedRequestSchema = z.infer<typeof zFeedRequestSchema>;
-export const FeedQueue = new SqliteQueue<ZFeedRequestSchema>(
+export const FeedQueue = QUEUE_CLIENT.createQueue<ZFeedRequestSchema>(
"feed_queue",
- queueDB,
{
defaultJobArgs: {
// One retry is enough for the feed queue given that it's periodic
@@ -145,9 +134,8 @@ export type AssetPreprocessingRequest = z.infer<
typeof zAssetPreprocessingRequestSchema
>;
export const AssetPreprocessingQueue =
- new SqliteQueue<AssetPreprocessingRequest>(
+ QUEUE_CLIENT.createQueue<AssetPreprocessingRequest>(
"asset_preprocessing_queue",
- queueDB,
{
defaultJobArgs: {
numRetries: 2,
@@ -163,9 +151,8 @@ export const zWebhookRequestSchema = z.object({
userId: z.string().optional(),
});
export type ZWebhookRequest = z.infer<typeof zWebhookRequestSchema>;
-export const WebhookQueue = new SqliteQueue<ZWebhookRequest>(
+export const WebhookQueue = QUEUE_CLIENT.createQueue<ZWebhookRequest>(
"webhook_queue",
- queueDB,
{
defaultJobArgs: {
numRetries: 3,
@@ -196,9 +183,8 @@ export const zRuleEngineRequestSchema = z.object({
events: z.array(zRuleEngineEventSchema),
});
export type ZRuleEngineRequest = z.infer<typeof zRuleEngineRequestSchema>;
-export const RuleEngineQueue = new SqliteQueue<ZRuleEngineRequest>(
+export const RuleEngineQueue = QUEUE_CLIENT.createQueue<ZRuleEngineRequest>(
"rule_engine_queue",
- queueDB,
{
defaultJobArgs: {
numRetries: 1,