aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2026-01-25 22:27:05 +0000
committerMohamed Bassem <me@mbassem.com>2026-02-01 10:44:54 +0000
commit9decab898983bc132835d4c517fc02aa695cb4af (patch)
treead7092f1b123412fcbf89f85298f830285bd8a05
parent95bfa5691ce0d43fbaa9c011df4ec9561635ff8d (diff)
downloadkarakeep-9decab898983bc132835d4c517fc02aa695cb4af.tar.zst
refactor: lazy init background queues
-rw-r--r--apps/workers/index.ts60
-rw-r--r--packages/plugins/queue-liteque/src/index.ts4
-rw-r--r--packages/plugins/queue-restate/src/index.ts4
-rw-r--r--packages/shared-server/src/queues.ts144
-rw-r--r--packages/shared/queueing.ts1
5 files changed, 155 insertions, 58 deletions
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index 07840a4c..dfbac85b 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -3,11 +3,21 @@ import "dotenv/config";
import { buildServer } from "server";
import {
+ AdminMaintenanceQueue,
+ AssetPreprocessingQueue,
+ BackupQueue,
+ FeedQueue,
initTracing,
+ LinkCrawlerQueue,
loadAllPlugins,
+ OpenAIQueue,
prepareQueue,
+ RuleEngineQueue,
+ SearchIndexingQueue,
shutdownTracing,
startQueue,
+ VideoWorkerQueue,
+ WebhookQueue,
} from "@karakeep/shared-server";
import serverConfig from "@karakeep/shared/config";
import logger from "@karakeep/shared/logger";
@@ -25,16 +35,46 @@ import { VideoWorker } from "./workers/videoWorker";
import { WebhookWorker } from "./workers/webhookWorker";
const workerBuilders = {
- crawler: () => CrawlerWorker.build(),
- inference: () => OpenAiWorker.build(),
- search: () => SearchIndexingWorker.build(),
- adminMaintenance: () => AdminMaintenanceWorker.build(),
- video: () => VideoWorker.build(),
- feed: () => FeedWorker.build(),
- assetPreprocessing: () => AssetPreprocessingWorker.build(),
- webhook: () => WebhookWorker.build(),
- ruleEngine: () => RuleEngineWorker.build(),
- backup: () => BackupWorker.build(),
+ crawler: async () => {
+ await LinkCrawlerQueue.ensureInit();
+ return CrawlerWorker.build();
+ },
+ inference: async () => {
+ await OpenAIQueue.ensureInit();
+ return OpenAiWorker.build();
+ },
+ search: async () => {
+ await SearchIndexingQueue.ensureInit();
+ return SearchIndexingWorker.build();
+ },
+ adminMaintenance: async () => {
+ await AdminMaintenanceQueue.ensureInit();
+ return AdminMaintenanceWorker.build();
+ },
+ video: async () => {
+ await VideoWorkerQueue.ensureInit();
+ return VideoWorker.build();
+ },
+ feed: async () => {
+ await FeedQueue.ensureInit();
+ return FeedWorker.build();
+ },
+ assetPreprocessing: async () => {
+ await AssetPreprocessingQueue.ensureInit();
+ return AssetPreprocessingWorker.build();
+ },
+ webhook: async () => {
+ await WebhookQueue.ensureInit();
+ return WebhookWorker.build();
+ },
+ ruleEngine: async () => {
+ await RuleEngineQueue.ensureInit();
+ return RuleEngineWorker.build();
+ },
+ backup: async () => {
+ await BackupQueue.ensureInit();
+ return BackupWorker.build();
+ },
} as const;
type WorkerName = keyof typeof workerBuilders;
diff --git a/packages/plugins/queue-liteque/src/index.ts b/packages/plugins/queue-liteque/src/index.ts
index b809d158..48c4ed59 100644
--- a/packages/plugins/queue-liteque/src/index.ts
+++ b/packages/plugins/queue-liteque/src/index.ts
@@ -28,6 +28,10 @@ class LitequeQueueWrapper<T> implements Queue<T> {
public readonly opts: QueueOptions,
) {}
+ ensureInit(): Promise<void> {
+ return Promise.resolve();
+ }
+
name(): string {
return this._name;
}
diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts
index 64572b03..f8761291 100644
--- a/packages/plugins/queue-restate/src/index.ts
+++ b/packages/plugins/queue-restate/src/index.ts
@@ -27,6 +27,10 @@ class RestateQueueWrapper<T> implements Queue<T> {
public readonly opts: QueueOptions,
) {}
+ ensureInit(): Promise<void> {
+ return Promise.resolve();
+ }
+
name(): string {
return this._name;
}
diff --git a/packages/shared-server/src/queues.ts b/packages/shared-server/src/queues.ts
index 140d9c0b..38c7bfe1 100644
--- a/packages/shared-server/src/queues.ts
+++ b/packages/shared-server/src/queues.ts
@@ -1,19 +1,75 @@
import { z } from "zod";
-import { EnqueueOptions, getQueueClient } from "@karakeep/shared/queueing";
+import {
+ EnqueueOptions,
+ getQueueClient,
+ Queue,
+ QueueClient,
+ QueueOptions,
+} from "@karakeep/shared/queueing";
import { zRuleEngineEventSchema } from "@karakeep/shared/types/rules";
import { loadAllPlugins } from ".";
-await loadAllPlugins();
-const QUEUE_CLIENT = await getQueueClient();
+// Lazy client initialization - plugins are loaded on first access
+// We cache the promise to ensure only one initialization happens even with concurrent calls
+let clientPromise: Promise<QueueClient> | null = null;
+
+function getClient(): Promise<QueueClient> {
+ if (!clientPromise) {
+ clientPromise = (async () => {
+ await loadAllPlugins();
+ return await getQueueClient();
+ })();
+ }
+ return clientPromise;
+}
+
+/**
+ * Creates a deferred queue that initializes lazily on first use.
+ * This allows the module to be imported without requiring plugins to be loaded.
+ */
+function createDeferredQueue<T>(name: string, options: QueueOptions): Queue<T> {
+ // Cache the promise to ensure only one queue is created even with concurrent calls
+ let queuePromise: Promise<Queue<T>> | null = null;
+
+ const ensureQueue = (): Promise<Queue<T>> => {
+ if (!queuePromise) {
+ queuePromise = (async () => {
+ const client = await getClient();
+ return client.createQueue<T>(name, options);
+ })();
+ }
+ return queuePromise;
+ };
+
+ return {
+ opts: options,
+ name: () => name,
+ ensureInit: async () => {
+ await ensureQueue();
+ },
+ async enqueue(payload: T, opts?: EnqueueOptions) {
+ return (await ensureQueue()).enqueue(payload, opts);
+ },
+ async stats() {
+ return (await ensureQueue()).stats();
+ },
+ async cancelAllNonRunning() {
+ const q = await ensureQueue();
+ return q.cancelAllNonRunning?.() ?? 0;
+ },
+ };
+}
export async function prepareQueue() {
- await QUEUE_CLIENT.prepare();
+ const client = await getClient();
+ await client.prepare();
}
export async function startQueue() {
- await QUEUE_CLIENT.start();
+ const client = await getClient();
+ await client.start();
}
// Link Crawler
@@ -25,7 +81,7 @@ export const zCrawlLinkRequestSchema = z.object({
});
export type ZCrawlLinkRequest = z.input<typeof zCrawlLinkRequestSchema>;
-export const LinkCrawlerQueue = QUEUE_CLIENT.createQueue<ZCrawlLinkRequest>(
+export const LinkCrawlerQueue = createDeferredQueue<ZCrawlLinkRequest>(
"link_crawler_queue",
{
defaultJobArgs: {
@@ -42,15 +98,12 @@ export const zOpenAIRequestSchema = z.object({
});
export type ZOpenAIRequest = z.infer<typeof zOpenAIRequestSchema>;
-export const OpenAIQueue = QUEUE_CLIENT.createQueue<ZOpenAIRequest>(
- "openai_queue",
- {
- defaultJobArgs: {
- numRetries: 3,
- },
- keepFailedJobs: false,
+export const OpenAIQueue = createDeferredQueue<ZOpenAIRequest>("openai_queue", {
+ defaultJobArgs: {
+ numRetries: 3,
},
-);
+ keepFailedJobs: false,
+});
// Search Indexing Worker
export const zSearchIndexingRequestSchema = z.object({
@@ -60,13 +113,15 @@ export const zSearchIndexingRequestSchema = z.object({
export type ZSearchIndexingRequest = z.infer<
typeof zSearchIndexingRequestSchema
>;
-export const SearchIndexingQueue =
- QUEUE_CLIENT.createQueue<ZSearchIndexingRequest>("searching_indexing", {
+export const SearchIndexingQueue = createDeferredQueue<ZSearchIndexingRequest>(
+ "searching_indexing",
+ {
defaultJobArgs: {
numRetries: 5,
},
keepFailedJobs: false,
- });
+ },
+);
// Admin maintenance worker
export const zTidyAssetsRequestSchema = z.object({
@@ -96,13 +151,15 @@ export type ZAdminMaintenanceMigrateLargeLinkHtmlTask = Extract<
{ type: "migrate_large_link_html" }
>;
-export const AdminMaintenanceQueue =
- QUEUE_CLIENT.createQueue<ZAdminMaintenanceTask>("admin_maintenance_queue", {
+export const AdminMaintenanceQueue = createDeferredQueue<ZAdminMaintenanceTask>(
+ "admin_maintenance_queue",
+ {
defaultJobArgs: {
numRetries: 1,
},
keepFailedJobs: false,
- });
+ },
+);
export async function triggerSearchReindex(
bookmarkId: string,
@@ -126,7 +183,7 @@ export const zvideoRequestSchema = z.object({
});
export type ZVideoRequest = z.infer<typeof zvideoRequestSchema>;
-export const VideoWorkerQueue = QUEUE_CLIENT.createQueue<ZVideoRequest>(
+export const VideoWorkerQueue = createDeferredQueue<ZVideoRequest>(
"video_queue",
{
defaultJobArgs: {
@@ -142,16 +199,13 @@ export const zFeedRequestSchema = z.object({
});
export type ZFeedRequestSchema = z.infer<typeof zFeedRequestSchema>;
-export const FeedQueue = QUEUE_CLIENT.createQueue<ZFeedRequestSchema>(
- "feed_queue",
- {
- defaultJobArgs: {
- // One retry is enough for the feed queue given that it's periodic
- numRetries: 1,
- },
- keepFailedJobs: false,
+export const FeedQueue = createDeferredQueue<ZFeedRequestSchema>("feed_queue", {
+ defaultJobArgs: {
+ // One retry is enough for the feed queue given that it's periodic
+ numRetries: 1,
},
-);
+ keepFailedJobs: false,
+});
// Preprocess Assets
export const zAssetPreprocessingRequestSchema = z.object({
@@ -162,15 +216,12 @@ export type AssetPreprocessingRequest = z.infer<
typeof zAssetPreprocessingRequestSchema
>;
export const AssetPreprocessingQueue =
- QUEUE_CLIENT.createQueue<AssetPreprocessingRequest>(
- "asset_preprocessing_queue",
- {
- defaultJobArgs: {
- numRetries: 2,
- },
- keepFailedJobs: false,
+ createDeferredQueue<AssetPreprocessingRequest>("asset_preprocessing_queue", {
+ defaultJobArgs: {
+ numRetries: 2,
},
- );
+ keepFailedJobs: false,
+ });
// Webhook worker
export const zWebhookRequestSchema = z.object({
@@ -179,7 +230,7 @@ export const zWebhookRequestSchema = z.object({
userId: z.string().optional(),
});
export type ZWebhookRequest = z.infer<typeof zWebhookRequestSchema>;
-export const WebhookQueue = QUEUE_CLIENT.createQueue<ZWebhookRequest>(
+export const WebhookQueue = createDeferredQueue<ZWebhookRequest>(
"webhook_queue",
{
defaultJobArgs: {
@@ -211,7 +262,7 @@ export const zRuleEngineRequestSchema = z.object({
events: z.array(zRuleEngineEventSchema),
});
export type ZRuleEngineRequest = z.infer<typeof zRuleEngineRequestSchema>;
-export const RuleEngineQueue = QUEUE_CLIENT.createQueue<ZRuleEngineRequest>(
+export const RuleEngineQueue = createDeferredQueue<ZRuleEngineRequest>(
"rule_engine_queue",
{
defaultJobArgs: {
@@ -241,12 +292,9 @@ export const zBackupRequestSchema = z.object({
backupId: z.string().optional(),
});
export type ZBackupRequest = z.infer<typeof zBackupRequestSchema>;
-export const BackupQueue = QUEUE_CLIENT.createQueue<ZBackupRequest>(
- "backup_queue",
- {
- defaultJobArgs: {
- numRetries: 2,
- },
- keepFailedJobs: false,
+export const BackupQueue = createDeferredQueue<ZBackupRequest>("backup_queue", {
+ defaultJobArgs: {
+ numRetries: 2,
},
-);
+ keepFailedJobs: false,
+});
diff --git a/packages/shared/queueing.ts b/packages/shared/queueing.ts
index bc2c9cfa..d1f4bcef 100644
--- a/packages/shared/queueing.ts
+++ b/packages/shared/queueing.ts
@@ -63,6 +63,7 @@ export interface RunnerOptions<T> {
export interface Queue<T> {
opts: QueueOptions;
+ ensureInit(): Promise<void>;
name(): string;
enqueue(payload: T, options?: EnqueueOptions): Promise<string | undefined>;
stats(): Promise<{