From 6ea5dd194e7be62c1a51566f31808be076d3b139 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sun, 26 Oct 2025 09:58:51 +0000 Subject: refactor: generalize tidy assets queue into admin maintenance (#2059) * refactor: generalize admin maintenance queue * more fixes --- apps/web/components/admin/BackgroundJobs.tsx | 53 ++++++---- apps/web/lib/i18n/locales/en/translation.json | 6 +- apps/workers/index.ts | 4 +- .../workers/adminMaintenance/tasks/tidyAssets.ts | 78 +++++++++++++++ apps/workers/workers/adminMaintenanceWorker.ts | 74 ++++++++++++++ apps/workers/workers/tidyAssetsWorker.ts | 110 --------------------- docs/docs/03-configuration.md | 2 +- packages/shared-server/src/queues.ts | 25 +++-- packages/trpc/routers/admin.ts | 30 +++--- packages/trpc/stats.ts | 4 +- 10 files changed, 227 insertions(+), 159 deletions(-) create mode 100644 apps/workers/workers/adminMaintenance/tasks/tidyAssets.ts create mode 100644 apps/workers/workers/adminMaintenanceWorker.ts delete mode 100644 apps/workers/workers/tidyAssetsWorker.ts diff --git a/apps/web/components/admin/BackgroundJobs.tsx b/apps/web/components/admin/BackgroundJobs.tsx index 569fd67a..5920b1ab 100644 --- a/apps/web/components/admin/BackgroundJobs.tsx +++ b/apps/web/components/admin/BackgroundJobs.tsx @@ -320,20 +320,22 @@ function useJobActions() { }, }); - const { mutateAsync: tidyAssets, isPending: isTidyAssetsPending } = - api.admin.tidyAssets.useMutation({ - onSuccess: () => { - toast({ - description: "Tidy assets request has been enqueued!", - }); - }, - onError: (e) => { - toast({ - variant: "destructive", - description: e.message, - }); - }, - }); + const { + mutateAsync: runAdminMaintenanceTask, + isPending: isAdminMaintenancePending, + } = api.admin.runAdminMaintenanceTask.useMutation({ + onSuccess: () => { + toast({ + description: "Admin maintenance request has been enqueued!", + }); + }, + onError: (e) => { + toast({ + variant: "destructive", + description: e.message, + }); + }, + }); return { crawlActions: [ @@ -409,11 +411,18 @@ function useJobActions() { loading: isReprocessingPending, }, ], - tidyAssetsActions: [ + adminMaintenanceActions: [ { label: t("admin.background_jobs.actions.clean_assets"), - onClick: () => tidyAssets(), - loading: isTidyAssetsPending, + onClick: () => + runAdminMaintenanceTask({ + type: "tidy_assets", + args: { + cleanDanglingAssets: true, + syncAssetMetadata: true, + }, + }), + loading: isAdminMaintenancePending, }, ], }; @@ -480,11 +489,13 @@ export default function BackgroundJobs() { actions: actions.assetPreprocessingActions, }, { - title: t("admin.background_jobs.jobs.tidy_assets.title"), + title: t("admin.background_jobs.jobs.admin_maintenance.title"), icon: Database, - stats: { queued: serverStats.tidyAssetsStats.queued }, - description: t("admin.background_jobs.jobs.tidy_assets.description"), - actions: actions.tidyAssetsActions, + stats: { queued: serverStats.adminMaintenanceStats.queued }, + description: t( + "admin.background_jobs.jobs.admin_maintenance.description", + ), + actions: actions.adminMaintenanceActions, }, { title: t("admin.background_jobs.jobs.video.title"), diff --git a/apps/web/lib/i18n/locales/en/translation.json b/apps/web/lib/i18n/locales/en/translation.json index 4e79be51..f000bab5 100644 --- a/apps/web/lib/i18n/locales/en/translation.json +++ b/apps/web/lib/i18n/locales/en/translation.json @@ -381,9 +381,9 @@ "title": "Asset Preprocessing Jobs", "description": "Image and document preprocessing (screenshots, text extraction, etc.)" }, - "tidy_assets": { - "title": "Tidy Assets Jobs", - "description": "Asset cleanup and storage optimization" + "admin_maintenance": { + "title": "Admin Maintenance Jobs", + "description": "Administrative cleanup and asset maintenance" }, "video": { "title": "Video Download Jobs", diff --git a/apps/workers/index.ts b/apps/workers/index.ts index c0270f0d..38f831d7 100644 --- a/apps/workers/index.ts +++ b/apps/workers/index.ts @@ -11,13 +11,13 @@ import serverConfig from "@karakeep/shared/config"; import logger from "@karakeep/shared/logger"; import { shutdownPromise } from "./exit"; +import { AdminMaintenanceWorker } from "./workers/adminMaintenanceWorker"; import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker"; import { CrawlerWorker } from "./workers/crawlerWorker"; import { FeedRefreshingWorker, FeedWorker } from "./workers/feedWorker"; import { OpenAiWorker } from "./workers/inference/inferenceWorker"; import { RuleEngineWorker } from "./workers/ruleEngineWorker"; import { SearchIndexingWorker } from "./workers/searchWorker"; -import { TidyAssetsWorker } from "./workers/tidyAssetsWorker"; import { VideoWorker } from "./workers/videoWorker"; import { WebhookWorker } from "./workers/webhookWorker"; @@ -25,7 +25,7 @@ const workerBuilders = { crawler: () => CrawlerWorker.build(), inference: () => OpenAiWorker.build(), search: () => SearchIndexingWorker.build(), - tidyAssets: () => TidyAssetsWorker.build(), + adminMaintenance: () => AdminMaintenanceWorker.build(), video: () => VideoWorker.build(), feed: () => FeedWorker.build(), assetPreprocessing: () => AssetPreprocessingWorker.build(), diff --git a/apps/workers/workers/adminMaintenance/tasks/tidyAssets.ts b/apps/workers/workers/adminMaintenance/tasks/tidyAssets.ts new file mode 100644 index 00000000..8732c7c4 --- /dev/null +++ b/apps/workers/workers/adminMaintenance/tasks/tidyAssets.ts @@ -0,0 +1,78 @@ +import { eq } from "drizzle-orm"; + +import { db } from "@karakeep/db"; +import { assets } from "@karakeep/db/schema"; +import { + ZAdminMaintenanceTidyAssetsTask, + ZTidyAssetsRequest, + zTidyAssetsRequestSchema, +} from "@karakeep/shared-server"; +import { deleteAsset, getAllAssets } from "@karakeep/shared/assetdb"; +import logger from "@karakeep/shared/logger"; +import { DequeuedJob } from "@karakeep/shared/queueing"; + +async function handleAsset( + asset: { + assetId: string; + userId: string; + size: number; + contentType: string; + fileName?: string | null; + }, + request: ZTidyAssetsRequest, + jobId: string, +) { + const dbRow = await db.query.assets.findFirst({ + where: eq(assets.id, asset.assetId), + }); + if (!dbRow) { + if (request.cleanDanglingAssets) { + await deleteAsset({ userId: asset.userId, assetId: asset.assetId }); + logger.info( + `[adminMaintenance:tidy_assets][${jobId}] Asset ${asset.assetId} not found in the database. Deleting it.`, + ); + } else { + logger.warn( + `[adminMaintenance:tidy_assets][${jobId}] Asset ${asset.assetId} not found in the database. Not deleting it because cleanDanglingAssets is false.`, + ); + } + return; + } + + if (request.syncAssetMetadata) { + await db + .update(assets) + .set({ + contentType: asset.contentType, + fileName: asset.fileName, + size: asset.size, + }) + .where(eq(assets.id, asset.assetId)); + logger.info( + `[adminMaintenance:tidy_assets][${jobId}] Updated metadata for asset ${asset.assetId}`, + ); + } +} + +export async function runTidyAssetsTask( + job: DequeuedJob, + task: ZAdminMaintenanceTidyAssetsTask, +) { + const jobId = job.id; + const parseResult = zTidyAssetsRequestSchema.safeParse(task.args); + if (!parseResult.success) { + throw new Error( + `[adminMaintenance:tidy_assets][${jobId}] Got malformed tidy asset args: ${parseResult.error.toString()}`, + ); + } + + for await (const asset of getAllAssets()) { + try { + await handleAsset(asset, parseResult.data, jobId); + } catch (error) { + logger.error( + `[adminMaintenance:tidy_assets][${jobId}] Failed to tidy asset ${asset.assetId}: ${error}`, + ); + } + } +} diff --git a/apps/workers/workers/adminMaintenanceWorker.ts b/apps/workers/workers/adminMaintenanceWorker.ts new file mode 100644 index 00000000..f8af3de0 --- /dev/null +++ b/apps/workers/workers/adminMaintenanceWorker.ts @@ -0,0 +1,74 @@ +import { workerStatsCounter } from "metrics"; + +import { + AdminMaintenanceQueue, + ZAdminMaintenanceTask, + zAdminMaintenanceTaskSchema, + ZAdminMaintenanceTidyAssetsTask, +} from "@karakeep/shared-server"; +import logger from "@karakeep/shared/logger"; +import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing"; + +import { runTidyAssetsTask } from "./adminMaintenance/tasks/tidyAssets"; + +export class AdminMaintenanceWorker { + static async build() { + logger.info("Starting admin maintenance worker ..."); + const worker = + (await getQueueClient())!.createRunner( + AdminMaintenanceQueue, + { + run: runAdminMaintenance, + onComplete: (job) => { + workerStatsCounter + .labels(`adminMaintenance:${job.data.type}`, "completed") + .inc(); + logger.info( + `[adminMaintenance:${job.data.type}][${job.id}] Completed successfully`, + ); + return Promise.resolve(); + }, + onError: (job) => { + workerStatsCounter + .labels(`adminMaintenance:${job.data?.type}`, "failed") + .inc(); + logger.error( + `[adminMaintenance:${job.data?.type}][${job.id}] Job failed: ${job.error}\n${job.error.stack}`, + ); + return Promise.resolve(); + }, + }, + { + concurrency: 1, + pollIntervalMs: 1000, + timeoutSecs: 30, + }, + ); + + return worker; + } +} + +async function runAdminMaintenance(job: DequeuedJob) { + const jobId = job.id; + const parsed = zAdminMaintenanceTaskSchema.safeParse(job.data); + if (!parsed.success) { + throw new Error( + `[adminMaintenance][${jobId}] Got malformed job request: ${parsed.error.toString()}`, + ); + } + + const task = parsed.data; + + switch (task.type) { + case "tidy_assets": + return runTidyAssetsTask( + job as DequeuedJob, + task, + ); + default: + throw new Error( + `[adminMaintenance][${jobId}] No handler registered for task ${task.type}`, + ); + } +} diff --git a/apps/workers/workers/tidyAssetsWorker.ts b/apps/workers/workers/tidyAssetsWorker.ts deleted file mode 100644 index b5b95185..00000000 --- a/apps/workers/workers/tidyAssetsWorker.ts +++ /dev/null @@ -1,110 +0,0 @@ -import { eq } from "drizzle-orm"; -import { workerStatsCounter } from "metrics"; - -import { db } from "@karakeep/db"; -import { assets } from "@karakeep/db/schema"; -import { - TidyAssetsQueue, - ZTidyAssetsRequest, - zTidyAssetsRequestSchema, -} from "@karakeep/shared-server"; -import { deleteAsset, getAllAssets } from "@karakeep/shared/assetdb"; -import logger from "@karakeep/shared/logger"; -import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing"; - -export class TidyAssetsWorker { - static async build() { - logger.info("Starting tidy assets worker ..."); - const worker = (await getQueueClient())!.createRunner( - TidyAssetsQueue, - { - run: runTidyAssets, - onComplete: (job) => { - workerStatsCounter.labels("tidyAssets", "completed").inc(); - const jobId = job.id; - logger.info(`[tidyAssets][${jobId}] Completed successfully`); - return Promise.resolve(); - }, - onError: (job) => { - workerStatsCounter.labels("tidyAssets", "failed").inc(); - const jobId = job.id; - logger.error( - `[tidyAssets][${jobId}] tidy assets job failed: ${job.error}\n${job.error.stack}`, - ); - return Promise.resolve(); - }, - }, - { - concurrency: 1, - pollIntervalMs: 1000, - timeoutSecs: 30, - }, - ); - - return worker; - } -} - -async function handleAsset( - asset: { - assetId: string; - userId: string; - size: number; - contentType: string; - fileName?: string | null; - }, - request: ZTidyAssetsRequest, - jobId: string, -) { - const dbRow = await db.query.assets.findFirst({ - where: eq(assets.id, asset.assetId), - }); - if (!dbRow) { - if (request.cleanDanglingAssets) { - await deleteAsset({ userId: asset.userId, assetId: asset.assetId }); - logger.info( - `[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. Deleting it.`, - ); - } else { - logger.warn( - `[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. Not deleting it because cleanDanglingAssets is false.`, - ); - } - return; - } - - if (request.syncAssetMetadata) { - await db - .update(assets) - .set({ - contentType: asset.contentType, - fileName: asset.fileName, - size: asset.size, - }) - .where(eq(assets.id, asset.assetId)); - logger.info( - `[tidyAssets][${jobId}] Updated metadata for asset ${asset.assetId}`, - ); - } -} - -async function runTidyAssets(job: DequeuedJob) { - const jobId = job.id; - - const request = zTidyAssetsRequestSchema.safeParse(job.data); - if (!request.success) { - throw new Error( - `[tidyAssets][${jobId}] Got malformed job request: ${request.error.toString()}`, - ); - } - - for await (const asset of getAllAssets()) { - try { - handleAsset(asset, request.data, jobId); - } catch (e) { - logger.error( - `[tidyAssets][${jobId}] Failed to tidy asset ${asset.assetId}: ${e}`, - ); - } - } -} diff --git a/docs/docs/03-configuration.md b/docs/docs/03-configuration.md index d9e55322..a7682e72 100644 --- a/docs/docs/03-configuration.md +++ b/docs/docs/03-configuration.md @@ -7,7 +7,7 @@ The app is mainly configured by environment variables. All the used environment | PORT | No | 3000 | The port on which the web server will listen. DON'T CHANGE THIS IF YOU'RE USING DOCKER, instead changed the docker bound external port. | | WORKERS_PORT | No | 0 (Random Port) | The port on which the worker will export its prometheus metrics on `/metrics`. By default it's a random unused port. If you want to utilize those metrics, fix the port to a value (and export it in docker if you're using docker). | | WORKERS_HOST | No | 127.0.0.1 | Host to listen to for requests to WORKERS_PORT. You will need to set this if running in a container, since localhost will not be reachable from outside | -| WORKERS_ENABLED_WORKERS | No | Not set | Comma separated list of worker names to enable. If set, only these workers will run. Valid values: crawler,inference,search,tidyAssets,video,feed,assetPreprocessing,webhook,ruleEngine. | +| WORKERS_ENABLED_WORKERS | No | Not set | Comma separated list of worker names to enable. If set, only these workers will run. Valid values: crawler,inference,search,adminMaintenance,video,feed,assetPreprocessing,webhook,ruleEngine. | | WORKERS_DISABLED_WORKERS | No | Not set | Comma separated list of worker names to disable. Takes precedence over `WORKERS_ENABLED_WORKERS`. | | DATA_DIR | Yes | Not set | The path for the persistent data directory. This is where the db lives. Assets are stored here by default unless `ASSETS_DIR` is set. | | ASSETS_DIR | No | Not set | The path where crawled assets will be stored. If not set, defaults to `${DATA_DIR}/assets`. | diff --git a/packages/shared-server/src/queues.ts b/packages/shared-server/src/queues.ts index a666446e..813b9c3b 100644 --- a/packages/shared-server/src/queues.ts +++ b/packages/shared-server/src/queues.ts @@ -67,21 +67,34 @@ export const SearchIndexingQueue = keepFailedJobs: false, }); -// Tidy Assets Worker +// Admin maintenance worker export const zTidyAssetsRequestSchema = z.object({ cleanDanglingAssets: z.boolean().optional().default(false), syncAssetMetadata: z.boolean().optional().default(false), }); export type ZTidyAssetsRequest = z.infer; -export const TidyAssetsQueue = QUEUE_CLIENT.createQueue( - "tidy_assets_queue", - { + +export const zAdminMaintenanceTaskSchema = z.discriminatedUnion("type", [ + z.object({ + type: z.literal("tidy_assets"), + args: zTidyAssetsRequestSchema, + }), +]); + +export type ZAdminMaintenanceTask = z.infer; +export type ZAdminMaintenanceTaskType = ZAdminMaintenanceTask["type"]; +export type ZAdminMaintenanceTidyAssetsTask = Extract< + ZAdminMaintenanceTask, + { type: "tidy_assets" } +>; + +export const AdminMaintenanceQueue = + QUEUE_CLIENT.createQueue("admin_maintenance_queue", { defaultJobArgs: { numRetries: 1, }, keepFailedJobs: false, - }, -); + }); export async function triggerSearchReindex( bookmarkId: string, diff --git a/packages/trpc/routers/admin.ts b/packages/trpc/routers/admin.ts index a40dfa6f..881d947c 100644 --- a/packages/trpc/routers/admin.ts +++ b/packages/trpc/routers/admin.ts @@ -4,15 +4,16 @@ import { z } from "zod"; import { assets, bookmarkLinks, bookmarks, users } from "@karakeep/db/schema"; import { + AdminMaintenanceQueue, AssetPreprocessingQueue, FeedQueue, LinkCrawlerQueue, OpenAIQueue, SearchIndexingQueue, - TidyAssetsQueue, triggerSearchReindex, VideoWorkerQueue, WebhookQueue, + zAdminMaintenanceTaskSchema, } from "@karakeep/shared-server"; import serverConfig from "@karakeep/shared/config"; import { PluginManager, PluginType } from "@karakeep/shared/plugins"; @@ -63,7 +64,7 @@ export const adminAppRouter = router({ indexingStats: z.object({ queued: z.number(), }), - tidyAssetsStats: z.object({ + adminMaintenanceStats: z.object({ queued: z.number(), }), videoStats: z.object({ @@ -95,8 +96,8 @@ export const adminAppRouter = router({ [{ value: pendingInference }], [{ value: failedInference }], - // Tidy Assets - queuedTidyAssets, + // Admin maintenance + queuedAdminMaintenance, // Video queuedVideo, @@ -145,8 +146,8 @@ export const adminAppRouter = router({ ), ), - // Tidy Assets - TidyAssetsQueue.stats(), + // Admin maintenance + AdminMaintenanceQueue.stats(), // Video VideoWorkerQueue.stats(), @@ -175,8 +176,10 @@ export const adminAppRouter = router({ indexingStats: { queued: queuedIndexing.pending + queuedIndexing.pending_retry, }, - tidyAssetsStats: { - queued: queuedTidyAssets.pending + queuedTidyAssets.pending_retry, + adminMaintenanceStats: { + queued: + queuedAdminMaintenance.pending + + queuedAdminMaintenance.pending_retry, }, videoStats: { queued: queuedVideo.pending + queuedVideo.pending_retry, @@ -277,12 +280,11 @@ export const adminAppRouter = router({ ), ); }), - tidyAssets: adminProcedure.mutation(async () => { - await TidyAssetsQueue.enqueue({ - cleanDanglingAssets: true, - syncAssetMetadata: true, - }); - }), + runAdminMaintenanceTask: adminProcedure + .input(zAdminMaintenanceTaskSchema) + .mutation(async ({ input }) => { + await AdminMaintenanceQueue.enqueue(input); + }), userStats: adminProcedure .output( z.record( diff --git a/packages/trpc/stats.ts b/packages/trpc/stats.ts index c6d5c94c..60766069 100644 --- a/packages/trpc/stats.ts +++ b/packages/trpc/stats.ts @@ -4,13 +4,13 @@ import { Counter, Gauge, Histogram, register } from "prom-client"; import { db } from "@karakeep/db"; import { assets, bookmarks, users } from "@karakeep/db/schema"; import { + AdminMaintenanceQueue, AssetPreprocessingQueue, FeedQueue, LinkCrawlerQueue, OpenAIQueue, RuleEngineQueue, SearchIndexingQueue, - TidyAssetsQueue, VideoWorkerQueue, WebhookQueue, } from "@karakeep/shared-server"; @@ -25,7 +25,7 @@ const queuePendingJobsGauge = new Gauge({ { name: "link_crawler", queue: LinkCrawlerQueue }, { name: "openai", queue: OpenAIQueue }, { name: "search_indexing", queue: SearchIndexingQueue }, - { name: "tidy_assets", queue: TidyAssetsQueue }, + { name: "admin_maintenance", queue: AdminMaintenanceQueue }, { name: "video_worker", queue: VideoWorkerQueue }, { name: "feed", queue: FeedQueue }, { name: "asset_preprocessing", queue: AssetPreprocessingQueue }, -- cgit v1.2.3-70-g09d2