aboutsummaryrefslogtreecommitdiffstats
path: root/packages/shared-server/src
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2026-01-25 22:27:05 +0000
committerMohamed Bassem <me@mbassem.com>2026-02-01 10:44:54 +0000
commit9decab898983bc132835d4c517fc02aa695cb4af (patch)
treead7092f1b123412fcbf89f85298f830285bd8a05 /packages/shared-server/src
parent95bfa5691ce0d43fbaa9c011df4ec9561635ff8d (diff)
downloadkarakeep-9decab898983bc132835d4c517fc02aa695cb4af.tar.zst
refactor: lazy init background queues
Diffstat (limited to 'packages/shared-server/src')
-rw-r--r--packages/shared-server/src/queues.ts144
1 files changed, 96 insertions, 48 deletions
diff --git a/packages/shared-server/src/queues.ts b/packages/shared-server/src/queues.ts
index 140d9c0b..38c7bfe1 100644
--- a/packages/shared-server/src/queues.ts
+++ b/packages/shared-server/src/queues.ts
@@ -1,19 +1,75 @@
import { z } from "zod";
-import { EnqueueOptions, getQueueClient } from "@karakeep/shared/queueing";
+import {
+ EnqueueOptions,
+ getQueueClient,
+ Queue,
+ QueueClient,
+ QueueOptions,
+} from "@karakeep/shared/queueing";
import { zRuleEngineEventSchema } from "@karakeep/shared/types/rules";
import { loadAllPlugins } from ".";
-await loadAllPlugins();
-const QUEUE_CLIENT = await getQueueClient();
+// Lazy client initialization - plugins are loaded on first access
+// We cache the promise to ensure only one initialization happens even with concurrent calls
+let clientPromise: Promise<QueueClient> | null = null;
+
+function getClient(): Promise<QueueClient> {
+ if (!clientPromise) {
+ clientPromise = (async () => {
+ await loadAllPlugins();
+ return await getQueueClient();
+ })();
+ }
+ return clientPromise;
+}
+
+/**
+ * Creates a deferred queue that initializes lazily on first use.
+ * This allows the module to be imported without requiring plugins to be loaded.
+ */
+function createDeferredQueue<T>(name: string, options: QueueOptions): Queue<T> {
+ // Cache the promise to ensure only one queue is created even with concurrent calls
+ let queuePromise: Promise<Queue<T>> | null = null;
+
+ const ensureQueue = (): Promise<Queue<T>> => {
+ if (!queuePromise) {
+ queuePromise = (async () => {
+ const client = await getClient();
+ return client.createQueue<T>(name, options);
+ })();
+ }
+ return queuePromise;
+ };
+
+ return {
+ opts: options,
+ name: () => name,
+ ensureInit: async () => {
+ await ensureQueue();
+ },
+ async enqueue(payload: T, opts?: EnqueueOptions) {
+ return (await ensureQueue()).enqueue(payload, opts);
+ },
+ async stats() {
+ return (await ensureQueue()).stats();
+ },
+ async cancelAllNonRunning() {
+ const q = await ensureQueue();
+ return q.cancelAllNonRunning?.() ?? 0;
+ },
+ };
+}
export async function prepareQueue() {
- await QUEUE_CLIENT.prepare();
+ const client = await getClient();
+ await client.prepare();
}
export async function startQueue() {
- await QUEUE_CLIENT.start();
+ const client = await getClient();
+ await client.start();
}
// Link Crawler
@@ -25,7 +81,7 @@ export const zCrawlLinkRequestSchema = z.object({
});
export type ZCrawlLinkRequest = z.input<typeof zCrawlLinkRequestSchema>;
-export const LinkCrawlerQueue = QUEUE_CLIENT.createQueue<ZCrawlLinkRequest>(
+export const LinkCrawlerQueue = createDeferredQueue<ZCrawlLinkRequest>(
"link_crawler_queue",
{
defaultJobArgs: {
@@ -42,15 +98,12 @@ export const zOpenAIRequestSchema = z.object({
});
export type ZOpenAIRequest = z.infer<typeof zOpenAIRequestSchema>;
-export const OpenAIQueue = QUEUE_CLIENT.createQueue<ZOpenAIRequest>(
- "openai_queue",
- {
- defaultJobArgs: {
- numRetries: 3,
- },
- keepFailedJobs: false,
+export const OpenAIQueue = createDeferredQueue<ZOpenAIRequest>("openai_queue", {
+ defaultJobArgs: {
+ numRetries: 3,
},
-);
+ keepFailedJobs: false,
+});
// Search Indexing Worker
export const zSearchIndexingRequestSchema = z.object({
@@ -60,13 +113,15 @@ export const zSearchIndexingRequestSchema = z.object({
export type ZSearchIndexingRequest = z.infer<
typeof zSearchIndexingRequestSchema
>;
-export const SearchIndexingQueue =
- QUEUE_CLIENT.createQueue<ZSearchIndexingRequest>("searching_indexing", {
+export const SearchIndexingQueue = createDeferredQueue<ZSearchIndexingRequest>(
+ "searching_indexing",
+ {
defaultJobArgs: {
numRetries: 5,
},
keepFailedJobs: false,
- });
+ },
+);
// Admin maintenance worker
export const zTidyAssetsRequestSchema = z.object({
@@ -96,13 +151,15 @@ export type ZAdminMaintenanceMigrateLargeLinkHtmlTask = Extract<
{ type: "migrate_large_link_html" }
>;
-export const AdminMaintenanceQueue =
- QUEUE_CLIENT.createQueue<ZAdminMaintenanceTask>("admin_maintenance_queue", {
+export const AdminMaintenanceQueue = createDeferredQueue<ZAdminMaintenanceTask>(
+ "admin_maintenance_queue",
+ {
defaultJobArgs: {
numRetries: 1,
},
keepFailedJobs: false,
- });
+ },
+);
export async function triggerSearchReindex(
bookmarkId: string,
@@ -126,7 +183,7 @@ export const zvideoRequestSchema = z.object({
});
export type ZVideoRequest = z.infer<typeof zvideoRequestSchema>;
-export const VideoWorkerQueue = QUEUE_CLIENT.createQueue<ZVideoRequest>(
+export const VideoWorkerQueue = createDeferredQueue<ZVideoRequest>(
"video_queue",
{
defaultJobArgs: {
@@ -142,16 +199,13 @@ export const zFeedRequestSchema = z.object({
});
export type ZFeedRequestSchema = z.infer<typeof zFeedRequestSchema>;
-export const FeedQueue = QUEUE_CLIENT.createQueue<ZFeedRequestSchema>(
- "feed_queue",
- {
- defaultJobArgs: {
- // One retry is enough for the feed queue given that it's periodic
- numRetries: 1,
- },
- keepFailedJobs: false,
+export const FeedQueue = createDeferredQueue<ZFeedRequestSchema>("feed_queue", {
+ defaultJobArgs: {
+ // One retry is enough for the feed queue given that it's periodic
+ numRetries: 1,
},
-);
+ keepFailedJobs: false,
+});
// Preprocess Assets
export const zAssetPreprocessingRequestSchema = z.object({
@@ -162,15 +216,12 @@ export type AssetPreprocessingRequest = z.infer<
typeof zAssetPreprocessingRequestSchema
>;
export const AssetPreprocessingQueue =
- QUEUE_CLIENT.createQueue<AssetPreprocessingRequest>(
- "asset_preprocessing_queue",
- {
- defaultJobArgs: {
- numRetries: 2,
- },
- keepFailedJobs: false,
+ createDeferredQueue<AssetPreprocessingRequest>("asset_preprocessing_queue", {
+ defaultJobArgs: {
+ numRetries: 2,
},
- );
+ keepFailedJobs: false,
+ });
// Webhook worker
export const zWebhookRequestSchema = z.object({
@@ -179,7 +230,7 @@ export const zWebhookRequestSchema = z.object({
userId: z.string().optional(),
});
export type ZWebhookRequest = z.infer<typeof zWebhookRequestSchema>;
-export const WebhookQueue = QUEUE_CLIENT.createQueue<ZWebhookRequest>(
+export const WebhookQueue = createDeferredQueue<ZWebhookRequest>(
"webhook_queue",
{
defaultJobArgs: {
@@ -211,7 +262,7 @@ export const zRuleEngineRequestSchema = z.object({
events: z.array(zRuleEngineEventSchema),
});
export type ZRuleEngineRequest = z.infer<typeof zRuleEngineRequestSchema>;
-export const RuleEngineQueue = QUEUE_CLIENT.createQueue<ZRuleEngineRequest>(
+export const RuleEngineQueue = createDeferredQueue<ZRuleEngineRequest>(
"rule_engine_queue",
{
defaultJobArgs: {
@@ -241,12 +292,9 @@ export const zBackupRequestSchema = z.object({
backupId: z.string().optional(),
});
export type ZBackupRequest = z.infer<typeof zBackupRequestSchema>;
-export const BackupQueue = QUEUE_CLIENT.createQueue<ZBackupRequest>(
- "backup_queue",
- {
- defaultJobArgs: {
- numRetries: 2,
- },
- keepFailedJobs: false,
+export const BackupQueue = createDeferredQueue<ZBackupRequest>("backup_queue", {
+ defaultJobArgs: {
+ numRetries: 2,
},
-);
+ keepFailedJobs: false,
+});