diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-10-26 09:58:51 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-10-26 09:58:51 +0000 |
| commit | 6ea5dd194e7be62c1a51566f31808be076d3b139 (patch) | |
| tree | 3a7d207bfd78a4e18d9ef135c7b5fef1106a318f /apps | |
| parent | 046c29dcf1083f0ab89b080f7696e6d642a6bd17 (diff) | |
| download | karakeep-6ea5dd194e7be62c1a51566f31808be076d3b139.tar.zst | |
refactor: generalize tidy assets queue into admin maintenance (#2059)
* refactor: generalize admin maintenance queue
* more fixes
Diffstat (limited to 'apps')
| -rw-r--r-- | apps/web/components/admin/BackgroundJobs.tsx | 53 | ||||
| -rw-r--r-- | apps/web/lib/i18n/locales/en/translation.json | 6 | ||||
| -rw-r--r-- | apps/workers/index.ts | 4 | ||||
| -rw-r--r-- | apps/workers/workers/adminMaintenance/tasks/tidyAssets.ts | 78 | ||||
| -rw-r--r-- | apps/workers/workers/adminMaintenanceWorker.ts | 74 | ||||
| -rw-r--r-- | apps/workers/workers/tidyAssetsWorker.ts | 110 |
6 files changed, 189 insertions, 136 deletions
diff --git a/apps/web/components/admin/BackgroundJobs.tsx b/apps/web/components/admin/BackgroundJobs.tsx index 569fd67a..5920b1ab 100644 --- a/apps/web/components/admin/BackgroundJobs.tsx +++ b/apps/web/components/admin/BackgroundJobs.tsx @@ -320,20 +320,22 @@ function useJobActions() { }, }); - const { mutateAsync: tidyAssets, isPending: isTidyAssetsPending } = - api.admin.tidyAssets.useMutation({ - onSuccess: () => { - toast({ - description: "Tidy assets request has been enqueued!", - }); - }, - onError: (e) => { - toast({ - variant: "destructive", - description: e.message, - }); - }, - }); + const { + mutateAsync: runAdminMaintenanceTask, + isPending: isAdminMaintenancePending, + } = api.admin.runAdminMaintenanceTask.useMutation({ + onSuccess: () => { + toast({ + description: "Admin maintenance request has been enqueued!", + }); + }, + onError: (e) => { + toast({ + variant: "destructive", + description: e.message, + }); + }, + }); return { crawlActions: [ @@ -409,11 +411,18 @@ function useJobActions() { loading: isReprocessingPending, }, ], - tidyAssetsActions: [ + adminMaintenanceActions: [ { label: t("admin.background_jobs.actions.clean_assets"), - onClick: () => tidyAssets(), - loading: isTidyAssetsPending, + onClick: () => + runAdminMaintenanceTask({ + type: "tidy_assets", + args: { + cleanDanglingAssets: true, + syncAssetMetadata: true, + }, + }), + loading: isAdminMaintenancePending, }, ], }; @@ -480,11 +489,13 @@ export default function BackgroundJobs() { actions: actions.assetPreprocessingActions, }, { - title: t("admin.background_jobs.jobs.tidy_assets.title"), + title: t("admin.background_jobs.jobs.admin_maintenance.title"), icon: Database, - stats: { queued: serverStats.tidyAssetsStats.queued }, - description: t("admin.background_jobs.jobs.tidy_assets.description"), - actions: actions.tidyAssetsActions, + stats: { queued: serverStats.adminMaintenanceStats.queued }, + description: t( + "admin.background_jobs.jobs.admin_maintenance.description", + ), + actions: actions.adminMaintenanceActions, }, { title: t("admin.background_jobs.jobs.video.title"), diff --git a/apps/web/lib/i18n/locales/en/translation.json b/apps/web/lib/i18n/locales/en/translation.json index 4e79be51..f000bab5 100644 --- a/apps/web/lib/i18n/locales/en/translation.json +++ b/apps/web/lib/i18n/locales/en/translation.json @@ -381,9 +381,9 @@ "title": "Asset Preprocessing Jobs", "description": "Image and document preprocessing (screenshots, text extraction, etc.)" }, - "tidy_assets": { - "title": "Tidy Assets Jobs", - "description": "Asset cleanup and storage optimization" + "admin_maintenance": { + "title": "Admin Maintenance Jobs", + "description": "Administrative cleanup and asset maintenance" }, "video": { "title": "Video Download Jobs", diff --git a/apps/workers/index.ts b/apps/workers/index.ts index c0270f0d..38f831d7 100644 --- a/apps/workers/index.ts +++ b/apps/workers/index.ts @@ -11,13 +11,13 @@ import serverConfig from "@karakeep/shared/config"; import logger from "@karakeep/shared/logger"; import { shutdownPromise } from "./exit"; +import { AdminMaintenanceWorker } from "./workers/adminMaintenanceWorker"; import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker"; import { CrawlerWorker } from "./workers/crawlerWorker"; import { FeedRefreshingWorker, FeedWorker } from "./workers/feedWorker"; import { OpenAiWorker } from "./workers/inference/inferenceWorker"; import { RuleEngineWorker } from "./workers/ruleEngineWorker"; import { SearchIndexingWorker } from "./workers/searchWorker"; -import { TidyAssetsWorker } from "./workers/tidyAssetsWorker"; import { VideoWorker } from "./workers/videoWorker"; import { WebhookWorker } from "./workers/webhookWorker"; @@ -25,7 +25,7 @@ const workerBuilders = { crawler: () => CrawlerWorker.build(), inference: () => OpenAiWorker.build(), search: () => SearchIndexingWorker.build(), - tidyAssets: () => TidyAssetsWorker.build(), + adminMaintenance: () => AdminMaintenanceWorker.build(), video: () => VideoWorker.build(), feed: () => FeedWorker.build(), assetPreprocessing: () => AssetPreprocessingWorker.build(), diff --git a/apps/workers/workers/adminMaintenance/tasks/tidyAssets.ts b/apps/workers/workers/adminMaintenance/tasks/tidyAssets.ts new file mode 100644 index 00000000..8732c7c4 --- /dev/null +++ b/apps/workers/workers/adminMaintenance/tasks/tidyAssets.ts @@ -0,0 +1,78 @@ +import { eq } from "drizzle-orm"; + +import { db } from "@karakeep/db"; +import { assets } from "@karakeep/db/schema"; +import { + ZAdminMaintenanceTidyAssetsTask, + ZTidyAssetsRequest, + zTidyAssetsRequestSchema, +} from "@karakeep/shared-server"; +import { deleteAsset, getAllAssets } from "@karakeep/shared/assetdb"; +import logger from "@karakeep/shared/logger"; +import { DequeuedJob } from "@karakeep/shared/queueing"; + +async function handleAsset( + asset: { + assetId: string; + userId: string; + size: number; + contentType: string; + fileName?: string | null; + }, + request: ZTidyAssetsRequest, + jobId: string, +) { + const dbRow = await db.query.assets.findFirst({ + where: eq(assets.id, asset.assetId), + }); + if (!dbRow) { + if (request.cleanDanglingAssets) { + await deleteAsset({ userId: asset.userId, assetId: asset.assetId }); + logger.info( + `[adminMaintenance:tidy_assets][${jobId}] Asset ${asset.assetId} not found in the database. Deleting it.`, + ); + } else { + logger.warn( + `[adminMaintenance:tidy_assets][${jobId}] Asset ${asset.assetId} not found in the database. Not deleting it because cleanDanglingAssets is false.`, + ); + } + return; + } + + if (request.syncAssetMetadata) { + await db + .update(assets) + .set({ + contentType: asset.contentType, + fileName: asset.fileName, + size: asset.size, + }) + .where(eq(assets.id, asset.assetId)); + logger.info( + `[adminMaintenance:tidy_assets][${jobId}] Updated metadata for asset ${asset.assetId}`, + ); + } +} + +export async function runTidyAssetsTask( + job: DequeuedJob<ZAdminMaintenanceTidyAssetsTask>, + task: ZAdminMaintenanceTidyAssetsTask, +) { + const jobId = job.id; + const parseResult = zTidyAssetsRequestSchema.safeParse(task.args); + if (!parseResult.success) { + throw new Error( + `[adminMaintenance:tidy_assets][${jobId}] Got malformed tidy asset args: ${parseResult.error.toString()}`, + ); + } + + for await (const asset of getAllAssets()) { + try { + await handleAsset(asset, parseResult.data, jobId); + } catch (error) { + logger.error( + `[adminMaintenance:tidy_assets][${jobId}] Failed to tidy asset ${asset.assetId}: ${error}`, + ); + } + } +} diff --git a/apps/workers/workers/adminMaintenanceWorker.ts b/apps/workers/workers/adminMaintenanceWorker.ts new file mode 100644 index 00000000..f8af3de0 --- /dev/null +++ b/apps/workers/workers/adminMaintenanceWorker.ts @@ -0,0 +1,74 @@ +import { workerStatsCounter } from "metrics"; + +import { + AdminMaintenanceQueue, + ZAdminMaintenanceTask, + zAdminMaintenanceTaskSchema, + ZAdminMaintenanceTidyAssetsTask, +} from "@karakeep/shared-server"; +import logger from "@karakeep/shared/logger"; +import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing"; + +import { runTidyAssetsTask } from "./adminMaintenance/tasks/tidyAssets"; + +export class AdminMaintenanceWorker { + static async build() { + logger.info("Starting admin maintenance worker ..."); + const worker = + (await getQueueClient())!.createRunner<ZAdminMaintenanceTask>( + AdminMaintenanceQueue, + { + run: runAdminMaintenance, + onComplete: (job) => { + workerStatsCounter + .labels(`adminMaintenance:${job.data.type}`, "completed") + .inc(); + logger.info( + `[adminMaintenance:${job.data.type}][${job.id}] Completed successfully`, + ); + return Promise.resolve(); + }, + onError: (job) => { + workerStatsCounter + .labels(`adminMaintenance:${job.data?.type}`, "failed") + .inc(); + logger.error( + `[adminMaintenance:${job.data?.type}][${job.id}] Job failed: ${job.error}\n${job.error.stack}`, + ); + return Promise.resolve(); + }, + }, + { + concurrency: 1, + pollIntervalMs: 1000, + timeoutSecs: 30, + }, + ); + + return worker; + } +} + +async function runAdminMaintenance(job: DequeuedJob<ZAdminMaintenanceTask>) { + const jobId = job.id; + const parsed = zAdminMaintenanceTaskSchema.safeParse(job.data); + if (!parsed.success) { + throw new Error( + `[adminMaintenance][${jobId}] Got malformed job request: ${parsed.error.toString()}`, + ); + } + + const task = parsed.data; + + switch (task.type) { + case "tidy_assets": + return runTidyAssetsTask( + job as DequeuedJob<ZAdminMaintenanceTidyAssetsTask>, + task, + ); + default: + throw new Error( + `[adminMaintenance][${jobId}] No handler registered for task ${task.type}`, + ); + } +} diff --git a/apps/workers/workers/tidyAssetsWorker.ts b/apps/workers/workers/tidyAssetsWorker.ts deleted file mode 100644 index b5b95185..00000000 --- a/apps/workers/workers/tidyAssetsWorker.ts +++ /dev/null @@ -1,110 +0,0 @@ -import { eq } from "drizzle-orm"; -import { workerStatsCounter } from "metrics"; - -import { db } from "@karakeep/db"; -import { assets } from "@karakeep/db/schema"; -import { - TidyAssetsQueue, - ZTidyAssetsRequest, - zTidyAssetsRequestSchema, -} from "@karakeep/shared-server"; -import { deleteAsset, getAllAssets } from "@karakeep/shared/assetdb"; -import logger from "@karakeep/shared/logger"; -import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing"; - -export class TidyAssetsWorker { - static async build() { - logger.info("Starting tidy assets worker ..."); - const worker = (await getQueueClient())!.createRunner<ZTidyAssetsRequest>( - TidyAssetsQueue, - { - run: runTidyAssets, - onComplete: (job) => { - workerStatsCounter.labels("tidyAssets", "completed").inc(); - const jobId = job.id; - logger.info(`[tidyAssets][${jobId}] Completed successfully`); - return Promise.resolve(); - }, - onError: (job) => { - workerStatsCounter.labels("tidyAssets", "failed").inc(); - const jobId = job.id; - logger.error( - `[tidyAssets][${jobId}] tidy assets job failed: ${job.error}\n${job.error.stack}`, - ); - return Promise.resolve(); - }, - }, - { - concurrency: 1, - pollIntervalMs: 1000, - timeoutSecs: 30, - }, - ); - - return worker; - } -} - -async function handleAsset( - asset: { - assetId: string; - userId: string; - size: number; - contentType: string; - fileName?: string | null; - }, - request: ZTidyAssetsRequest, - jobId: string, -) { - const dbRow = await db.query.assets.findFirst({ - where: eq(assets.id, asset.assetId), - }); - if (!dbRow) { - if (request.cleanDanglingAssets) { - await deleteAsset({ userId: asset.userId, assetId: asset.assetId }); - logger.info( - `[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. Deleting it.`, - ); - } else { - logger.warn( - `[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. Not deleting it because cleanDanglingAssets is false.`, - ); - } - return; - } - - if (request.syncAssetMetadata) { - await db - .update(assets) - .set({ - contentType: asset.contentType, - fileName: asset.fileName, - size: asset.size, - }) - .where(eq(assets.id, asset.assetId)); - logger.info( - `[tidyAssets][${jobId}] Updated metadata for asset ${asset.assetId}`, - ); - } -} - -async function runTidyAssets(job: DequeuedJob<ZTidyAssetsRequest>) { - const jobId = job.id; - - const request = zTidyAssetsRequestSchema.safeParse(job.data); - if (!request.success) { - throw new Error( - `[tidyAssets][${jobId}] Got malformed job request: ${request.error.toString()}`, - ); - } - - for await (const asset of getAllAssets()) { - try { - handleAsset(asset, request.data, jobId); - } catch (e) { - logger.error( - `[tidyAssets][${jobId}] Failed to tidy asset ${asset.assetId}: ${e}`, - ); - } - } -} |
