aboutsummaryrefslogtreecommitdiffstats
path: root/apps
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-10-26 09:58:51 +0000
committerGitHub <noreply@github.com>2025-10-26 09:58:51 +0000
commit6ea5dd194e7be62c1a51566f31808be076d3b139 (patch)
tree3a7d207bfd78a4e18d9ef135c7b5fef1106a318f /apps
parent046c29dcf1083f0ab89b080f7696e6d642a6bd17 (diff)
downloadkarakeep-6ea5dd194e7be62c1a51566f31808be076d3b139.tar.zst
refactor: generalize tidy assets queue into admin maintenance (#2059)
* refactor: generalize admin maintenance queue * more fixes
Diffstat (limited to 'apps')
-rw-r--r--apps/web/components/admin/BackgroundJobs.tsx53
-rw-r--r--apps/web/lib/i18n/locales/en/translation.json6
-rw-r--r--apps/workers/index.ts4
-rw-r--r--apps/workers/workers/adminMaintenance/tasks/tidyAssets.ts78
-rw-r--r--apps/workers/workers/adminMaintenanceWorker.ts74
-rw-r--r--apps/workers/workers/tidyAssetsWorker.ts110
6 files changed, 189 insertions, 136 deletions
diff --git a/apps/web/components/admin/BackgroundJobs.tsx b/apps/web/components/admin/BackgroundJobs.tsx
index 569fd67a..5920b1ab 100644
--- a/apps/web/components/admin/BackgroundJobs.tsx
+++ b/apps/web/components/admin/BackgroundJobs.tsx
@@ -320,20 +320,22 @@ function useJobActions() {
},
});
- const { mutateAsync: tidyAssets, isPending: isTidyAssetsPending } =
- api.admin.tidyAssets.useMutation({
- onSuccess: () => {
- toast({
- description: "Tidy assets request has been enqueued!",
- });
- },
- onError: (e) => {
- toast({
- variant: "destructive",
- description: e.message,
- });
- },
- });
+ const {
+ mutateAsync: runAdminMaintenanceTask,
+ isPending: isAdminMaintenancePending,
+ } = api.admin.runAdminMaintenanceTask.useMutation({
+ onSuccess: () => {
+ toast({
+ description: "Admin maintenance request has been enqueued!",
+ });
+ },
+ onError: (e) => {
+ toast({
+ variant: "destructive",
+ description: e.message,
+ });
+ },
+ });
return {
crawlActions: [
@@ -409,11 +411,18 @@ function useJobActions() {
loading: isReprocessingPending,
},
],
- tidyAssetsActions: [
+ adminMaintenanceActions: [
{
label: t("admin.background_jobs.actions.clean_assets"),
- onClick: () => tidyAssets(),
- loading: isTidyAssetsPending,
+ onClick: () =>
+ runAdminMaintenanceTask({
+ type: "tidy_assets",
+ args: {
+ cleanDanglingAssets: true,
+ syncAssetMetadata: true,
+ },
+ }),
+ loading: isAdminMaintenancePending,
},
],
};
@@ -480,11 +489,13 @@ export default function BackgroundJobs() {
actions: actions.assetPreprocessingActions,
},
{
- title: t("admin.background_jobs.jobs.tidy_assets.title"),
+ title: t("admin.background_jobs.jobs.admin_maintenance.title"),
icon: Database,
- stats: { queued: serverStats.tidyAssetsStats.queued },
- description: t("admin.background_jobs.jobs.tidy_assets.description"),
- actions: actions.tidyAssetsActions,
+ stats: { queued: serverStats.adminMaintenanceStats.queued },
+ description: t(
+ "admin.background_jobs.jobs.admin_maintenance.description",
+ ),
+ actions: actions.adminMaintenanceActions,
},
{
title: t("admin.background_jobs.jobs.video.title"),
diff --git a/apps/web/lib/i18n/locales/en/translation.json b/apps/web/lib/i18n/locales/en/translation.json
index 4e79be51..f000bab5 100644
--- a/apps/web/lib/i18n/locales/en/translation.json
+++ b/apps/web/lib/i18n/locales/en/translation.json
@@ -381,9 +381,9 @@
"title": "Asset Preprocessing Jobs",
"description": "Image and document preprocessing (screenshots, text extraction, etc.)"
},
- "tidy_assets": {
- "title": "Tidy Assets Jobs",
- "description": "Asset cleanup and storage optimization"
+ "admin_maintenance": {
+ "title": "Admin Maintenance Jobs",
+ "description": "Administrative cleanup and asset maintenance"
},
"video": {
"title": "Video Download Jobs",
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index c0270f0d..38f831d7 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -11,13 +11,13 @@ import serverConfig from "@karakeep/shared/config";
import logger from "@karakeep/shared/logger";
import { shutdownPromise } from "./exit";
+import { AdminMaintenanceWorker } from "./workers/adminMaintenanceWorker";
import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker";
import { CrawlerWorker } from "./workers/crawlerWorker";
import { FeedRefreshingWorker, FeedWorker } from "./workers/feedWorker";
import { OpenAiWorker } from "./workers/inference/inferenceWorker";
import { RuleEngineWorker } from "./workers/ruleEngineWorker";
import { SearchIndexingWorker } from "./workers/searchWorker";
-import { TidyAssetsWorker } from "./workers/tidyAssetsWorker";
import { VideoWorker } from "./workers/videoWorker";
import { WebhookWorker } from "./workers/webhookWorker";
@@ -25,7 +25,7 @@ const workerBuilders = {
crawler: () => CrawlerWorker.build(),
inference: () => OpenAiWorker.build(),
search: () => SearchIndexingWorker.build(),
- tidyAssets: () => TidyAssetsWorker.build(),
+ adminMaintenance: () => AdminMaintenanceWorker.build(),
video: () => VideoWorker.build(),
feed: () => FeedWorker.build(),
assetPreprocessing: () => AssetPreprocessingWorker.build(),
diff --git a/apps/workers/workers/adminMaintenance/tasks/tidyAssets.ts b/apps/workers/workers/adminMaintenance/tasks/tidyAssets.ts
new file mode 100644
index 00000000..8732c7c4
--- /dev/null
+++ b/apps/workers/workers/adminMaintenance/tasks/tidyAssets.ts
@@ -0,0 +1,78 @@
+import { eq } from "drizzle-orm";
+
+import { db } from "@karakeep/db";
+import { assets } from "@karakeep/db/schema";
+import {
+ ZAdminMaintenanceTidyAssetsTask,
+ ZTidyAssetsRequest,
+ zTidyAssetsRequestSchema,
+} from "@karakeep/shared-server";
+import { deleteAsset, getAllAssets } from "@karakeep/shared/assetdb";
+import logger from "@karakeep/shared/logger";
+import { DequeuedJob } from "@karakeep/shared/queueing";
+
+async function handleAsset(
+ asset: {
+ assetId: string;
+ userId: string;
+ size: number;
+ contentType: string;
+ fileName?: string | null;
+ },
+ request: ZTidyAssetsRequest,
+ jobId: string,
+) {
+ const dbRow = await db.query.assets.findFirst({
+ where: eq(assets.id, asset.assetId),
+ });
+ if (!dbRow) {
+ if (request.cleanDanglingAssets) {
+ await deleteAsset({ userId: asset.userId, assetId: asset.assetId });
+ logger.info(
+ `[adminMaintenance:tidy_assets][${jobId}] Asset ${asset.assetId} not found in the database. Deleting it.`,
+ );
+ } else {
+ logger.warn(
+ `[adminMaintenance:tidy_assets][${jobId}] Asset ${asset.assetId} not found in the database. Not deleting it because cleanDanglingAssets is false.`,
+ );
+ }
+ return;
+ }
+
+ if (request.syncAssetMetadata) {
+ await db
+ .update(assets)
+ .set({
+ contentType: asset.contentType,
+ fileName: asset.fileName,
+ size: asset.size,
+ })
+ .where(eq(assets.id, asset.assetId));
+ logger.info(
+ `[adminMaintenance:tidy_assets][${jobId}] Updated metadata for asset ${asset.assetId}`,
+ );
+ }
+}
+
+export async function runTidyAssetsTask(
+ job: DequeuedJob<ZAdminMaintenanceTidyAssetsTask>,
+ task: ZAdminMaintenanceTidyAssetsTask,
+) {
+ const jobId = job.id;
+ const parseResult = zTidyAssetsRequestSchema.safeParse(task.args);
+ if (!parseResult.success) {
+ throw new Error(
+ `[adminMaintenance:tidy_assets][${jobId}] Got malformed tidy asset args: ${parseResult.error.toString()}`,
+ );
+ }
+
+ for await (const asset of getAllAssets()) {
+ try {
+ await handleAsset(asset, parseResult.data, jobId);
+ } catch (error) {
+ logger.error(
+ `[adminMaintenance:tidy_assets][${jobId}] Failed to tidy asset ${asset.assetId}: ${error}`,
+ );
+ }
+ }
+}
diff --git a/apps/workers/workers/adminMaintenanceWorker.ts b/apps/workers/workers/adminMaintenanceWorker.ts
new file mode 100644
index 00000000..f8af3de0
--- /dev/null
+++ b/apps/workers/workers/adminMaintenanceWorker.ts
@@ -0,0 +1,74 @@
+import { workerStatsCounter } from "metrics";
+
+import {
+ AdminMaintenanceQueue,
+ ZAdminMaintenanceTask,
+ zAdminMaintenanceTaskSchema,
+ ZAdminMaintenanceTidyAssetsTask,
+} from "@karakeep/shared-server";
+import logger from "@karakeep/shared/logger";
+import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
+
+import { runTidyAssetsTask } from "./adminMaintenance/tasks/tidyAssets";
+
+export class AdminMaintenanceWorker {
+ static async build() {
+ logger.info("Starting admin maintenance worker ...");
+ const worker =
+ (await getQueueClient())!.createRunner<ZAdminMaintenanceTask>(
+ AdminMaintenanceQueue,
+ {
+ run: runAdminMaintenance,
+ onComplete: (job) => {
+ workerStatsCounter
+ .labels(`adminMaintenance:${job.data.type}`, "completed")
+ .inc();
+ logger.info(
+ `[adminMaintenance:${job.data.type}][${job.id}] Completed successfully`,
+ );
+ return Promise.resolve();
+ },
+ onError: (job) => {
+ workerStatsCounter
+ .labels(`adminMaintenance:${job.data?.type}`, "failed")
+ .inc();
+ logger.error(
+ `[adminMaintenance:${job.data?.type}][${job.id}] Job failed: ${job.error}\n${job.error.stack}`,
+ );
+ return Promise.resolve();
+ },
+ },
+ {
+ concurrency: 1,
+ pollIntervalMs: 1000,
+ timeoutSecs: 30,
+ },
+ );
+
+ return worker;
+ }
+}
+
+async function runAdminMaintenance(job: DequeuedJob<ZAdminMaintenanceTask>) {
+ const jobId = job.id;
+ const parsed = zAdminMaintenanceTaskSchema.safeParse(job.data);
+ if (!parsed.success) {
+ throw new Error(
+ `[adminMaintenance][${jobId}] Got malformed job request: ${parsed.error.toString()}`,
+ );
+ }
+
+ const task = parsed.data;
+
+ switch (task.type) {
+ case "tidy_assets":
+ return runTidyAssetsTask(
+ job as DequeuedJob<ZAdminMaintenanceTidyAssetsTask>,
+ task,
+ );
+ default:
+ throw new Error(
+ `[adminMaintenance][${jobId}] No handler registered for task ${task.type}`,
+ );
+ }
+}
diff --git a/apps/workers/workers/tidyAssetsWorker.ts b/apps/workers/workers/tidyAssetsWorker.ts
deleted file mode 100644
index b5b95185..00000000
--- a/apps/workers/workers/tidyAssetsWorker.ts
+++ /dev/null
@@ -1,110 +0,0 @@
-import { eq } from "drizzle-orm";
-import { workerStatsCounter } from "metrics";
-
-import { db } from "@karakeep/db";
-import { assets } from "@karakeep/db/schema";
-import {
- TidyAssetsQueue,
- ZTidyAssetsRequest,
- zTidyAssetsRequestSchema,
-} from "@karakeep/shared-server";
-import { deleteAsset, getAllAssets } from "@karakeep/shared/assetdb";
-import logger from "@karakeep/shared/logger";
-import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
-
-export class TidyAssetsWorker {
- static async build() {
- logger.info("Starting tidy assets worker ...");
- const worker = (await getQueueClient())!.createRunner<ZTidyAssetsRequest>(
- TidyAssetsQueue,
- {
- run: runTidyAssets,
- onComplete: (job) => {
- workerStatsCounter.labels("tidyAssets", "completed").inc();
- const jobId = job.id;
- logger.info(`[tidyAssets][${jobId}] Completed successfully`);
- return Promise.resolve();
- },
- onError: (job) => {
- workerStatsCounter.labels("tidyAssets", "failed").inc();
- const jobId = job.id;
- logger.error(
- `[tidyAssets][${jobId}] tidy assets job failed: ${job.error}\n${job.error.stack}`,
- );
- return Promise.resolve();
- },
- },
- {
- concurrency: 1,
- pollIntervalMs: 1000,
- timeoutSecs: 30,
- },
- );
-
- return worker;
- }
-}
-
-async function handleAsset(
- asset: {
- assetId: string;
- userId: string;
- size: number;
- contentType: string;
- fileName?: string | null;
- },
- request: ZTidyAssetsRequest,
- jobId: string,
-) {
- const dbRow = await db.query.assets.findFirst({
- where: eq(assets.id, asset.assetId),
- });
- if (!dbRow) {
- if (request.cleanDanglingAssets) {
- await deleteAsset({ userId: asset.userId, assetId: asset.assetId });
- logger.info(
- `[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. Deleting it.`,
- );
- } else {
- logger.warn(
- `[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. Not deleting it because cleanDanglingAssets is false.`,
- );
- }
- return;
- }
-
- if (request.syncAssetMetadata) {
- await db
- .update(assets)
- .set({
- contentType: asset.contentType,
- fileName: asset.fileName,
- size: asset.size,
- })
- .where(eq(assets.id, asset.assetId));
- logger.info(
- `[tidyAssets][${jobId}] Updated metadata for asset ${asset.assetId}`,
- );
- }
-}
-
-async function runTidyAssets(job: DequeuedJob<ZTidyAssetsRequest>) {
- const jobId = job.id;
-
- const request = zTidyAssetsRequestSchema.safeParse(job.data);
- if (!request.success) {
- throw new Error(
- `[tidyAssets][${jobId}] Got malformed job request: ${request.error.toString()}`,
- );
- }
-
- for await (const asset of getAllAssets()) {
- try {
- handleAsset(asset, request.data, jobId);
- } catch (e) {
- logger.error(
- `[tidyAssets][${jobId}] Failed to tidy asset ${asset.assetId}: ${e}`,
- );
- }
- }
-}