aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--apps/web/components/admin/BackgroundJobs.tsx53
-rw-r--r--apps/web/lib/i18n/locales/en/translation.json6
-rw-r--r--apps/workers/index.ts4
-rw-r--r--apps/workers/workers/adminMaintenance/tasks/tidyAssets.ts78
-rw-r--r--apps/workers/workers/adminMaintenanceWorker.ts74
-rw-r--r--apps/workers/workers/tidyAssetsWorker.ts110
-rw-r--r--docs/docs/03-configuration.md2
-rw-r--r--packages/shared-server/src/queues.ts25
-rw-r--r--packages/trpc/routers/admin.ts30
-rw-r--r--packages/trpc/stats.ts4
10 files changed, 227 insertions, 159 deletions
diff --git a/apps/web/components/admin/BackgroundJobs.tsx b/apps/web/components/admin/BackgroundJobs.tsx
index 569fd67a..5920b1ab 100644
--- a/apps/web/components/admin/BackgroundJobs.tsx
+++ b/apps/web/components/admin/BackgroundJobs.tsx
@@ -320,20 +320,22 @@ function useJobActions() {
},
});
- const { mutateAsync: tidyAssets, isPending: isTidyAssetsPending } =
- api.admin.tidyAssets.useMutation({
- onSuccess: () => {
- toast({
- description: "Tidy assets request has been enqueued!",
- });
- },
- onError: (e) => {
- toast({
- variant: "destructive",
- description: e.message,
- });
- },
- });
+ const {
+ mutateAsync: runAdminMaintenanceTask,
+ isPending: isAdminMaintenancePending,
+ } = api.admin.runAdminMaintenanceTask.useMutation({
+ onSuccess: () => {
+ toast({
+ description: "Admin maintenance request has been enqueued!",
+ });
+ },
+ onError: (e) => {
+ toast({
+ variant: "destructive",
+ description: e.message,
+ });
+ },
+ });
return {
crawlActions: [
@@ -409,11 +411,18 @@ function useJobActions() {
loading: isReprocessingPending,
},
],
- tidyAssetsActions: [
+ adminMaintenanceActions: [
{
label: t("admin.background_jobs.actions.clean_assets"),
- onClick: () => tidyAssets(),
- loading: isTidyAssetsPending,
+ onClick: () =>
+ runAdminMaintenanceTask({
+ type: "tidy_assets",
+ args: {
+ cleanDanglingAssets: true,
+ syncAssetMetadata: true,
+ },
+ }),
+ loading: isAdminMaintenancePending,
},
],
};
@@ -480,11 +489,13 @@ export default function BackgroundJobs() {
actions: actions.assetPreprocessingActions,
},
{
- title: t("admin.background_jobs.jobs.tidy_assets.title"),
+ title: t("admin.background_jobs.jobs.admin_maintenance.title"),
icon: Database,
- stats: { queued: serverStats.tidyAssetsStats.queued },
- description: t("admin.background_jobs.jobs.tidy_assets.description"),
- actions: actions.tidyAssetsActions,
+ stats: { queued: serverStats.adminMaintenanceStats.queued },
+ description: t(
+ "admin.background_jobs.jobs.admin_maintenance.description",
+ ),
+ actions: actions.adminMaintenanceActions,
},
{
title: t("admin.background_jobs.jobs.video.title"),
diff --git a/apps/web/lib/i18n/locales/en/translation.json b/apps/web/lib/i18n/locales/en/translation.json
index 4e79be51..f000bab5 100644
--- a/apps/web/lib/i18n/locales/en/translation.json
+++ b/apps/web/lib/i18n/locales/en/translation.json
@@ -381,9 +381,9 @@
"title": "Asset Preprocessing Jobs",
"description": "Image and document preprocessing (screenshots, text extraction, etc.)"
},
- "tidy_assets": {
- "title": "Tidy Assets Jobs",
- "description": "Asset cleanup and storage optimization"
+ "admin_maintenance": {
+ "title": "Admin Maintenance Jobs",
+ "description": "Administrative cleanup and asset maintenance"
},
"video": {
"title": "Video Download Jobs",
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index c0270f0d..38f831d7 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -11,13 +11,13 @@ import serverConfig from "@karakeep/shared/config";
import logger from "@karakeep/shared/logger";
import { shutdownPromise } from "./exit";
+import { AdminMaintenanceWorker } from "./workers/adminMaintenanceWorker";
import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker";
import { CrawlerWorker } from "./workers/crawlerWorker";
import { FeedRefreshingWorker, FeedWorker } from "./workers/feedWorker";
import { OpenAiWorker } from "./workers/inference/inferenceWorker";
import { RuleEngineWorker } from "./workers/ruleEngineWorker";
import { SearchIndexingWorker } from "./workers/searchWorker";
-import { TidyAssetsWorker } from "./workers/tidyAssetsWorker";
import { VideoWorker } from "./workers/videoWorker";
import { WebhookWorker } from "./workers/webhookWorker";
@@ -25,7 +25,7 @@ const workerBuilders = {
crawler: () => CrawlerWorker.build(),
inference: () => OpenAiWorker.build(),
search: () => SearchIndexingWorker.build(),
- tidyAssets: () => TidyAssetsWorker.build(),
+ adminMaintenance: () => AdminMaintenanceWorker.build(),
video: () => VideoWorker.build(),
feed: () => FeedWorker.build(),
assetPreprocessing: () => AssetPreprocessingWorker.build(),
diff --git a/apps/workers/workers/adminMaintenance/tasks/tidyAssets.ts b/apps/workers/workers/adminMaintenance/tasks/tidyAssets.ts
new file mode 100644
index 00000000..8732c7c4
--- /dev/null
+++ b/apps/workers/workers/adminMaintenance/tasks/tidyAssets.ts
@@ -0,0 +1,78 @@
+import { eq } from "drizzle-orm";
+
+import { db } from "@karakeep/db";
+import { assets } from "@karakeep/db/schema";
+import {
+ ZAdminMaintenanceTidyAssetsTask,
+ ZTidyAssetsRequest,
+ zTidyAssetsRequestSchema,
+} from "@karakeep/shared-server";
+import { deleteAsset, getAllAssets } from "@karakeep/shared/assetdb";
+import logger from "@karakeep/shared/logger";
+import { DequeuedJob } from "@karakeep/shared/queueing";
+
+async function handleAsset(
+ asset: {
+ assetId: string;
+ userId: string;
+ size: number;
+ contentType: string;
+ fileName?: string | null;
+ },
+ request: ZTidyAssetsRequest,
+ jobId: string,
+) {
+ const dbRow = await db.query.assets.findFirst({
+ where: eq(assets.id, asset.assetId),
+ });
+ if (!dbRow) {
+ if (request.cleanDanglingAssets) {
+ await deleteAsset({ userId: asset.userId, assetId: asset.assetId });
+ logger.info(
+ `[adminMaintenance:tidy_assets][${jobId}] Asset ${asset.assetId} not found in the database. Deleting it.`,
+ );
+ } else {
+ logger.warn(
+ `[adminMaintenance:tidy_assets][${jobId}] Asset ${asset.assetId} not found in the database. Not deleting it because cleanDanglingAssets is false.`,
+ );
+ }
+ return;
+ }
+
+ if (request.syncAssetMetadata) {
+ await db
+ .update(assets)
+ .set({
+ contentType: asset.contentType,
+ fileName: asset.fileName,
+ size: asset.size,
+ })
+ .where(eq(assets.id, asset.assetId));
+ logger.info(
+ `[adminMaintenance:tidy_assets][${jobId}] Updated metadata for asset ${asset.assetId}`,
+ );
+ }
+}
+
+export async function runTidyAssetsTask(
+ job: DequeuedJob<ZAdminMaintenanceTidyAssetsTask>,
+ task: ZAdminMaintenanceTidyAssetsTask,
+) {
+ const jobId = job.id;
+ const parseResult = zTidyAssetsRequestSchema.safeParse(task.args);
+ if (!parseResult.success) {
+ throw new Error(
+ `[adminMaintenance:tidy_assets][${jobId}] Got malformed tidy asset args: ${parseResult.error.toString()}`,
+ );
+ }
+
+ for await (const asset of getAllAssets()) {
+ try {
+ await handleAsset(asset, parseResult.data, jobId);
+ } catch (error) {
+ logger.error(
+ `[adminMaintenance:tidy_assets][${jobId}] Failed to tidy asset ${asset.assetId}: ${error}`,
+ );
+ }
+ }
+}
diff --git a/apps/workers/workers/adminMaintenanceWorker.ts b/apps/workers/workers/adminMaintenanceWorker.ts
new file mode 100644
index 00000000..f8af3de0
--- /dev/null
+++ b/apps/workers/workers/adminMaintenanceWorker.ts
@@ -0,0 +1,74 @@
+import { workerStatsCounter } from "metrics";
+
+import {
+ AdminMaintenanceQueue,
+ ZAdminMaintenanceTask,
+ zAdminMaintenanceTaskSchema,
+ ZAdminMaintenanceTidyAssetsTask,
+} from "@karakeep/shared-server";
+import logger from "@karakeep/shared/logger";
+import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
+
+import { runTidyAssetsTask } from "./adminMaintenance/tasks/tidyAssets";
+
+export class AdminMaintenanceWorker {
+ static async build() {
+ logger.info("Starting admin maintenance worker ...");
+ const worker =
+ (await getQueueClient())!.createRunner<ZAdminMaintenanceTask>(
+ AdminMaintenanceQueue,
+ {
+ run: runAdminMaintenance,
+ onComplete: (job) => {
+ workerStatsCounter
+ .labels(`adminMaintenance:${job.data.type}`, "completed")
+ .inc();
+ logger.info(
+ `[adminMaintenance:${job.data.type}][${job.id}] Completed successfully`,
+ );
+ return Promise.resolve();
+ },
+ onError: (job) => {
+ workerStatsCounter
+ .labels(`adminMaintenance:${job.data?.type}`, "failed")
+ .inc();
+ logger.error(
+ `[adminMaintenance:${job.data?.type}][${job.id}] Job failed: ${job.error}\n${job.error.stack}`,
+ );
+ return Promise.resolve();
+ },
+ },
+ {
+ concurrency: 1,
+ pollIntervalMs: 1000,
+ timeoutSecs: 30,
+ },
+ );
+
+ return worker;
+ }
+}
+
+async function runAdminMaintenance(job: DequeuedJob<ZAdminMaintenanceTask>) {
+ const jobId = job.id;
+ const parsed = zAdminMaintenanceTaskSchema.safeParse(job.data);
+ if (!parsed.success) {
+ throw new Error(
+ `[adminMaintenance][${jobId}] Got malformed job request: ${parsed.error.toString()}`,
+ );
+ }
+
+ const task = parsed.data;
+
+ switch (task.type) {
+ case "tidy_assets":
+ return runTidyAssetsTask(
+ job as DequeuedJob<ZAdminMaintenanceTidyAssetsTask>,
+ task,
+ );
+ default:
+ throw new Error(
+ `[adminMaintenance][${jobId}] No handler registered for task ${task.type}`,
+ );
+ }
+}
diff --git a/apps/workers/workers/tidyAssetsWorker.ts b/apps/workers/workers/tidyAssetsWorker.ts
deleted file mode 100644
index b5b95185..00000000
--- a/apps/workers/workers/tidyAssetsWorker.ts
+++ /dev/null
@@ -1,110 +0,0 @@
-import { eq } from "drizzle-orm";
-import { workerStatsCounter } from "metrics";
-
-import { db } from "@karakeep/db";
-import { assets } from "@karakeep/db/schema";
-import {
- TidyAssetsQueue,
- ZTidyAssetsRequest,
- zTidyAssetsRequestSchema,
-} from "@karakeep/shared-server";
-import { deleteAsset, getAllAssets } from "@karakeep/shared/assetdb";
-import logger from "@karakeep/shared/logger";
-import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
-
-export class TidyAssetsWorker {
- static async build() {
- logger.info("Starting tidy assets worker ...");
- const worker = (await getQueueClient())!.createRunner<ZTidyAssetsRequest>(
- TidyAssetsQueue,
- {
- run: runTidyAssets,
- onComplete: (job) => {
- workerStatsCounter.labels("tidyAssets", "completed").inc();
- const jobId = job.id;
- logger.info(`[tidyAssets][${jobId}] Completed successfully`);
- return Promise.resolve();
- },
- onError: (job) => {
- workerStatsCounter.labels("tidyAssets", "failed").inc();
- const jobId = job.id;
- logger.error(
- `[tidyAssets][${jobId}] tidy assets job failed: ${job.error}\n${job.error.stack}`,
- );
- return Promise.resolve();
- },
- },
- {
- concurrency: 1,
- pollIntervalMs: 1000,
- timeoutSecs: 30,
- },
- );
-
- return worker;
- }
-}
-
-async function handleAsset(
- asset: {
- assetId: string;
- userId: string;
- size: number;
- contentType: string;
- fileName?: string | null;
- },
- request: ZTidyAssetsRequest,
- jobId: string,
-) {
- const dbRow = await db.query.assets.findFirst({
- where: eq(assets.id, asset.assetId),
- });
- if (!dbRow) {
- if (request.cleanDanglingAssets) {
- await deleteAsset({ userId: asset.userId, assetId: asset.assetId });
- logger.info(
- `[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. Deleting it.`,
- );
- } else {
- logger.warn(
- `[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. Not deleting it because cleanDanglingAssets is false.`,
- );
- }
- return;
- }
-
- if (request.syncAssetMetadata) {
- await db
- .update(assets)
- .set({
- contentType: asset.contentType,
- fileName: asset.fileName,
- size: asset.size,
- })
- .where(eq(assets.id, asset.assetId));
- logger.info(
- `[tidyAssets][${jobId}] Updated metadata for asset ${asset.assetId}`,
- );
- }
-}
-
-async function runTidyAssets(job: DequeuedJob<ZTidyAssetsRequest>) {
- const jobId = job.id;
-
- const request = zTidyAssetsRequestSchema.safeParse(job.data);
- if (!request.success) {
- throw new Error(
- `[tidyAssets][${jobId}] Got malformed job request: ${request.error.toString()}`,
- );
- }
-
- for await (const asset of getAllAssets()) {
- try {
- handleAsset(asset, request.data, jobId);
- } catch (e) {
- logger.error(
- `[tidyAssets][${jobId}] Failed to tidy asset ${asset.assetId}: ${e}`,
- );
- }
- }
-}
diff --git a/docs/docs/03-configuration.md b/docs/docs/03-configuration.md
index d9e55322..a7682e72 100644
--- a/docs/docs/03-configuration.md
+++ b/docs/docs/03-configuration.md
@@ -7,7 +7,7 @@ The app is mainly configured by environment variables. All the used environment
| PORT | No | 3000 | The port on which the web server will listen. DON'T CHANGE THIS IF YOU'RE USING DOCKER, instead changed the docker bound external port. |
| WORKERS_PORT | No | 0 (Random Port) | The port on which the worker will export its prometheus metrics on `/metrics`. By default it's a random unused port. If you want to utilize those metrics, fix the port to a value (and export it in docker if you're using docker). |
| WORKERS_HOST | No | 127.0.0.1 | Host to listen to for requests to WORKERS_PORT. You will need to set this if running in a container, since localhost will not be reachable from outside |
-| WORKERS_ENABLED_WORKERS | No | Not set | Comma separated list of worker names to enable. If set, only these workers will run. Valid values: crawler,inference,search,tidyAssets,video,feed,assetPreprocessing,webhook,ruleEngine. |
+| WORKERS_ENABLED_WORKERS | No | Not set | Comma separated list of worker names to enable. If set, only these workers will run. Valid values: crawler,inference,search,adminMaintenance,video,feed,assetPreprocessing,webhook,ruleEngine. |
| WORKERS_DISABLED_WORKERS | No | Not set | Comma separated list of worker names to disable. Takes precedence over `WORKERS_ENABLED_WORKERS`. |
| DATA_DIR | Yes | Not set | The path for the persistent data directory. This is where the db lives. Assets are stored here by default unless `ASSETS_DIR` is set. |
| ASSETS_DIR | No | Not set | The path where crawled assets will be stored. If not set, defaults to `${DATA_DIR}/assets`. |
diff --git a/packages/shared-server/src/queues.ts b/packages/shared-server/src/queues.ts
index a666446e..813b9c3b 100644
--- a/packages/shared-server/src/queues.ts
+++ b/packages/shared-server/src/queues.ts
@@ -67,21 +67,34 @@ export const SearchIndexingQueue =
keepFailedJobs: false,
});
-// Tidy Assets Worker
+// Admin maintenance worker
export const zTidyAssetsRequestSchema = z.object({
cleanDanglingAssets: z.boolean().optional().default(false),
syncAssetMetadata: z.boolean().optional().default(false),
});
export type ZTidyAssetsRequest = z.infer<typeof zTidyAssetsRequestSchema>;
-export const TidyAssetsQueue = QUEUE_CLIENT.createQueue<ZTidyAssetsRequest>(
- "tidy_assets_queue",
- {
+
+export const zAdminMaintenanceTaskSchema = z.discriminatedUnion("type", [
+ z.object({
+ type: z.literal("tidy_assets"),
+ args: zTidyAssetsRequestSchema,
+ }),
+]);
+
+export type ZAdminMaintenanceTask = z.infer<typeof zAdminMaintenanceTaskSchema>;
+export type ZAdminMaintenanceTaskType = ZAdminMaintenanceTask["type"];
+export type ZAdminMaintenanceTidyAssetsTask = Extract<
+ ZAdminMaintenanceTask,
+ { type: "tidy_assets" }
+>;
+
+export const AdminMaintenanceQueue =
+ QUEUE_CLIENT.createQueue<ZAdminMaintenanceTask>("admin_maintenance_queue", {
defaultJobArgs: {
numRetries: 1,
},
keepFailedJobs: false,
- },
-);
+ });
export async function triggerSearchReindex(
bookmarkId: string,
diff --git a/packages/trpc/routers/admin.ts b/packages/trpc/routers/admin.ts
index a40dfa6f..881d947c 100644
--- a/packages/trpc/routers/admin.ts
+++ b/packages/trpc/routers/admin.ts
@@ -4,15 +4,16 @@ import { z } from "zod";
import { assets, bookmarkLinks, bookmarks, users } from "@karakeep/db/schema";
import {
+ AdminMaintenanceQueue,
AssetPreprocessingQueue,
FeedQueue,
LinkCrawlerQueue,
OpenAIQueue,
SearchIndexingQueue,
- TidyAssetsQueue,
triggerSearchReindex,
VideoWorkerQueue,
WebhookQueue,
+ zAdminMaintenanceTaskSchema,
} from "@karakeep/shared-server";
import serverConfig from "@karakeep/shared/config";
import { PluginManager, PluginType } from "@karakeep/shared/plugins";
@@ -63,7 +64,7 @@ export const adminAppRouter = router({
indexingStats: z.object({
queued: z.number(),
}),
- tidyAssetsStats: z.object({
+ adminMaintenanceStats: z.object({
queued: z.number(),
}),
videoStats: z.object({
@@ -95,8 +96,8 @@ export const adminAppRouter = router({
[{ value: pendingInference }],
[{ value: failedInference }],
- // Tidy Assets
- queuedTidyAssets,
+ // Admin maintenance
+ queuedAdminMaintenance,
// Video
queuedVideo,
@@ -145,8 +146,8 @@ export const adminAppRouter = router({
),
),
- // Tidy Assets
- TidyAssetsQueue.stats(),
+ // Admin maintenance
+ AdminMaintenanceQueue.stats(),
// Video
VideoWorkerQueue.stats(),
@@ -175,8 +176,10 @@ export const adminAppRouter = router({
indexingStats: {
queued: queuedIndexing.pending + queuedIndexing.pending_retry,
},
- tidyAssetsStats: {
- queued: queuedTidyAssets.pending + queuedTidyAssets.pending_retry,
+ adminMaintenanceStats: {
+ queued:
+ queuedAdminMaintenance.pending +
+ queuedAdminMaintenance.pending_retry,
},
videoStats: {
queued: queuedVideo.pending + queuedVideo.pending_retry,
@@ -277,12 +280,11 @@ export const adminAppRouter = router({
),
);
}),
- tidyAssets: adminProcedure.mutation(async () => {
- await TidyAssetsQueue.enqueue({
- cleanDanglingAssets: true,
- syncAssetMetadata: true,
- });
- }),
+ runAdminMaintenanceTask: adminProcedure
+ .input(zAdminMaintenanceTaskSchema)
+ .mutation(async ({ input }) => {
+ await AdminMaintenanceQueue.enqueue(input);
+ }),
userStats: adminProcedure
.output(
z.record(
diff --git a/packages/trpc/stats.ts b/packages/trpc/stats.ts
index c6d5c94c..60766069 100644
--- a/packages/trpc/stats.ts
+++ b/packages/trpc/stats.ts
@@ -4,13 +4,13 @@ import { Counter, Gauge, Histogram, register } from "prom-client";
import { db } from "@karakeep/db";
import { assets, bookmarks, users } from "@karakeep/db/schema";
import {
+ AdminMaintenanceQueue,
AssetPreprocessingQueue,
FeedQueue,
LinkCrawlerQueue,
OpenAIQueue,
RuleEngineQueue,
SearchIndexingQueue,
- TidyAssetsQueue,
VideoWorkerQueue,
WebhookQueue,
} from "@karakeep/shared-server";
@@ -25,7 +25,7 @@ const queuePendingJobsGauge = new Gauge({
{ name: "link_crawler", queue: LinkCrawlerQueue },
{ name: "openai", queue: OpenAIQueue },
{ name: "search_indexing", queue: SearchIndexingQueue },
- { name: "tidy_assets", queue: TidyAssetsQueue },
+ { name: "admin_maintenance", queue: AdminMaintenanceQueue },
{ name: "video_worker", queue: VideoWorkerQueue },
{ name: "feed", queue: FeedQueue },
{ name: "asset_preprocessing", queue: AssetPreprocessingQueue },