about | summary | refs | log | tree | commit | diff | stats
path: root/apps/workers
diff options
context:
space:
mode:
Diffstat (limited to 'apps/workers')
-rw-r--r-- apps/workers/index.ts 4
-rw-r--r-- apps/workers/workers/adminMaintenance/tasks/tidyAssets.ts 78
-rw-r--r-- apps/workers/workers/adminMaintenanceWorker.ts 74
-rw-r--r-- apps/workers/workers/tidyAssetsWorker.ts 110
4 files changed, 154 insertions, 112 deletions
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index c0270f0d..38f831d7 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -11,13 +11,13 @@ import serverConfig from "@karakeep/shared/config";
import logger from "@karakeep/shared/logger";
import { shutdownPromise } from "./exit";
+import { AdminMaintenanceWorker } from "./workers/adminMaintenanceWorker";
import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker";
import { CrawlerWorker } from "./workers/crawlerWorker";
import { FeedRefreshingWorker, FeedWorker } from "./workers/feedWorker";
import { OpenAiWorker } from "./workers/inference/inferenceWorker";
import { RuleEngineWorker } from "./workers/ruleEngineWorker";
import { SearchIndexingWorker } from "./workers/searchWorker";
-import { TidyAssetsWorker } from "./workers/tidyAssetsWorker";
import { VideoWorker } from "./workers/videoWorker";
import { WebhookWorker } from "./workers/webhookWorker";
@@ -25,7 +25,7 @@ const workerBuilders = {
crawler: () => CrawlerWorker.build(),
inference: () => OpenAiWorker.build(),
search: () => SearchIndexingWorker.build(),
- tidyAssets: () => TidyAssetsWorker.build(),
+ adminMaintenance: () => AdminMaintenanceWorker.build(),
video: () => VideoWorker.build(),
feed: () => FeedWorker.build(),
assetPreprocessing: () => AssetPreprocessingWorker.build(),
diff --git a/apps/workers/workers/adminMaintenance/tasks/tidyAssets.ts b/apps/workers/workers/adminMaintenance/tasks/tidyAssets.ts
new file mode 100644
index 00000000..8732c7c4
--- /dev/null
+++ b/apps/workers/workers/adminMaintenance/tasks/tidyAssets.ts
@@ -0,0 +1,78 @@
+import { eq } from "drizzle-orm";
+
+import { db } from "@karakeep/db";
+import { assets } from "@karakeep/db/schema";
+import {
+ ZAdminMaintenanceTidyAssetsTask,
+ ZTidyAssetsRequest,
+ zTidyAssetsRequestSchema,
+} from "@karakeep/shared-server";
+import { deleteAsset, getAllAssets } from "@karakeep/shared/assetdb";
+import logger from "@karakeep/shared/logger";
+import { DequeuedJob } from "@karakeep/shared/queueing";
+
+/**
+ * Reconciles one stored asset with its row in the `assets` table.
+ *
+ * - Row exists: when `request.syncAssetMetadata` is set, the row's
+ *   `contentType`, `fileName` and `size` are overwritten with the values
+ *   carried by `asset`; otherwise nothing happens.
+ * - No row with the asset's id: `deleteAsset` is called when
+ *   `request.cleanDanglingAssets` is set, otherwise only a warning is logged.
+ *
+ * @param asset - Descriptor of the asset being checked.
+ * @param request - Tidy-assets options controlling the two behaviours above.
+ * @param jobId - Queue job id, used only to tag the log lines.
+ */
+async function handleAsset(
+  asset: {
+    assetId: string;
+    userId: string;
+    size: number;
+    contentType: string;
+    fileName?: string | null;
+  },
+  request: ZTidyAssetsRequest,
+  jobId: string,
+) {
+  const existingRow = await db.query.assets.findFirst({
+    where: eq(assets.id, asset.assetId),
+  });
+
+  // Known asset: the only possible action is a metadata refresh.
+  if (existingRow) {
+    if (!request.syncAssetMetadata) {
+      return;
+    }
+    const { contentType, fileName, size } = asset;
+    await db
+      .update(assets)
+      .set({ contentType, fileName, size })
+      .where(eq(assets.id, asset.assetId));
+    logger.info(
+      `[adminMaintenance:tidy_assets][${jobId}] Updated metadata for asset ${asset.assetId}`,
+    );
+    return;
+  }
+
+  // Dangling asset (no database row): report it unless cleanup was requested.
+  if (!request.cleanDanglingAssets) {
+    logger.warn(
+      `[adminMaintenance:tidy_assets][${jobId}] Asset ${asset.assetId} not found in the database. Not deleting it because cleanDanglingAssets is false.`,
+    );
+    return;
+  }
+
+  await deleteAsset({ userId: asset.userId, assetId: asset.assetId });
+  logger.info(
+    `[adminMaintenance:tidy_assets][${jobId}] Asset ${asset.assetId} not found in the database. Deleting it.`,
+  );
+}
+
+/**
+ * Runs the `tidy_assets` admin maintenance task.
+ *
+ * Validates `task.args` with `zTidyAssetsRequestSchema`, then feeds every
+ * asset yielded by `getAllAssets()` through `handleAsset`, one at a time.
+ * A failure on a single asset is logged and does not stop the loop.
+ *
+ * @param job - Dequeued job; only its `id` is read, for log tagging.
+ * @param task - The task payload whose `args` hold the tidy options.
+ * @throws Error when `task.args` does not match the request schema.
+ */
+export async function runTidyAssetsTask(
+  job: DequeuedJob<ZAdminMaintenanceTidyAssetsTask>,
+  task: ZAdminMaintenanceTidyAssetsTask,
+) {
+  const { id: jobId } = job;
+
+  const validation = zTidyAssetsRequestSchema.safeParse(task.args);
+  if (!validation.success) {
+    throw new Error(
+      `[adminMaintenance:tidy_assets][${jobId}] Got malformed tidy asset args: ${validation.error.toString()}`,
+    );
+  }
+  const request = validation.data;
+
+  for await (const asset of getAllAssets()) {
+    try {
+      // Awaited so that a rejection lands in the catch below.
+      await handleAsset(asset, request, jobId);
+    } catch (error) {
+      logger.error(
+        `[adminMaintenance:tidy_assets][${jobId}] Failed to tidy asset ${asset.assetId}: ${error}`,
+      );
+    }
+  }
+}
diff --git a/apps/workers/workers/adminMaintenanceWorker.ts b/apps/workers/workers/adminMaintenanceWorker.ts
new file mode 100644
index 00000000..f8af3de0
--- /dev/null
+++ b/apps/workers/workers/adminMaintenanceWorker.ts
@@ -0,0 +1,74 @@
+import { workerStatsCounter } from "metrics";
+
+import {
+ AdminMaintenanceQueue,
+ ZAdminMaintenanceTask,
+ zAdminMaintenanceTaskSchema,
+ ZAdminMaintenanceTidyAssetsTask,
+} from "@karakeep/shared-server";
+import logger from "@karakeep/shared/logger";
+import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
+
+import { runTidyAssetsTask } from "./adminMaintenance/tasks/tidyAssets";
+
+/**
+ * Worker that consumes `AdminMaintenanceQueue` and hands each job to
+ * `runAdminMaintenance`.
+ */
+export class AdminMaintenanceWorker {
+  /**
+   * Creates the queue runner for admin maintenance jobs.
+   *
+   * Completion and failure are both counted in `workerStatsCounter` under the
+   * label `adminMaintenance:<task type>` and logged with the job id.
+   *
+   * @returns The runner created by the queue client.
+   * @throws Error when `getQueueClient()` resolves to no client.
+   */
+  static async build() {
+    logger.info("Starting admin maintenance worker ...");
+
+    // Explicit guard instead of a non-null assertion, so a missing queue
+    // client fails with a descriptive message rather than an opaque TypeError.
+    const queueClient = await getQueueClient();
+    if (!queueClient) {
+      throw new Error(
+        "[adminMaintenance] Queue client is not available; cannot start the admin maintenance worker",
+      );
+    }
+
+    const worker = queueClient.createRunner<ZAdminMaintenanceTask>(
+      AdminMaintenanceQueue,
+      {
+        run: runAdminMaintenance,
+        onComplete: (job) => {
+          workerStatsCounter
+            .labels(`adminMaintenance:${job.data.type}`, "completed")
+            .inc();
+          logger.info(
+            `[adminMaintenance:${job.data.type}][${job.id}] Completed successfully`,
+          );
+          return Promise.resolve();
+        },
+        onError: (job) => {
+          // `job.data` is accessed optionally here, unlike in onComplete.
+          workerStatsCounter
+            .labels(`adminMaintenance:${job.data?.type}`, "failed")
+            .inc();
+          logger.error(
+            `[adminMaintenance:${job.data?.type}][${job.id}] Job failed: ${job.error}\n${job.error.stack}`,
+          );
+          return Promise.resolve();
+        },
+      },
+      {
+        concurrency: 1,
+        pollIntervalMs: 1000,
+        // NOTE(review): tasks that walk every asset sequentially may need
+        // longer than 30 s on large installs — confirm how the queue treats
+        // a timed-out job.
+        timeoutSecs: 30,
+      },
+    );
+
+    return worker;
+  }
+}
+
+/**
+ * Validates a dequeued admin maintenance job and dispatches it to the handler
+ * registered for its `type`.
+ *
+ * @param job - The dequeued job; `job.data` is checked against
+ *   `zAdminMaintenanceTaskSchema` before use.
+ * @throws Error when the payload is malformed or no handler matches the type.
+ */
+async function runAdminMaintenance(job: DequeuedJob<ZAdminMaintenanceTask>) {
+  const { id: jobId } = job;
+
+  const validation = zAdminMaintenanceTaskSchema.safeParse(job.data);
+  if (!validation.success) {
+    throw new Error(
+      `[adminMaintenance][${jobId}] Got malformed job request: ${validation.error.toString()}`,
+    );
+  }
+  const task = validation.data;
+
+  if (task.type === "tidy_assets") {
+    return runTidyAssetsTask(
+      job as DequeuedJob<ZAdminMaintenanceTidyAssetsTask>,
+      task,
+    );
+  }
+
+  // Reached only for a task type that has no handler above.
+  throw new Error(
+    `[adminMaintenance][${jobId}] No handler registered for task ${task.type}`,
+  );
+}
diff --git a/apps/workers/workers/tidyAssetsWorker.ts b/apps/workers/workers/tidyAssetsWorker.ts
deleted file mode 100644
index b5b95185..00000000
--- a/apps/workers/workers/tidyAssetsWorker.ts
+++ /dev/null
@@ -1,110 +0,0 @@
-import { eq } from "drizzle-orm";
-import { workerStatsCounter } from "metrics";
-
-import { db } from "@karakeep/db";
-import { assets } from "@karakeep/db/schema";
-import {
- TidyAssetsQueue,
- ZTidyAssetsRequest,
- zTidyAssetsRequestSchema,
-} from "@karakeep/shared-server";
-import { deleteAsset, getAllAssets } from "@karakeep/shared/assetdb";
-import logger from "@karakeep/shared/logger";
-import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
-
-export class TidyAssetsWorker { // Removed in this change. Worker wrapper around a runner on TidyAssetsQueue.
- static async build() { // Creates the runner via the queue client and returns it.
- logger.info("Starting tidy assets worker ...");
- const worker = (await getQueueClient())!.createRunner<ZTidyAssetsRequest>( // Non-null assertion: a missing client would fail here at runtime.
- TidyAssetsQueue,
- {
- run: runTidyAssets,
- onComplete: (job) => { // Counts the completion under the "tidyAssets" label and logs it.
- workerStatsCounter.labels("tidyAssets", "completed").inc();
- const jobId = job.id;
- logger.info(`[tidyAssets][${jobId}] Completed successfully`);
- return Promise.resolve();
- },
- onError: (job) => { // Counts the failure under the "tidyAssets" label and logs the error with its stack.
- workerStatsCounter.labels("tidyAssets", "failed").inc();
- const jobId = job.id;
- logger.error(
- `[tidyAssets][${jobId}] tidy assets job failed: ${job.error}\n${job.error.stack}`,
- );
- return Promise.resolve();
- },
- },
- { // Runner options passed through to createRunner.
- concurrency: 1,
- pollIntervalMs: 1000,
- timeoutSecs: 30,
- },
- );
-
- return worker;
- }
-}
-
-async function handleAsset( // Removed in this change. Reconciles one stored asset with its row in the assets table.
- asset: {
- assetId: string;
- userId: string;
- size: number;
- contentType: string;
- fileName?: string | null;
- },
- request: ZTidyAssetsRequest,
- jobId: string, // Used only to tag log lines.
-) {
- const dbRow = await db.query.assets.findFirst({ // Look the asset up by id.
- where: eq(assets.id, asset.assetId),
- });
- if (!dbRow) { // No database row for this asset.
- if (request.cleanDanglingAssets) {
- await deleteAsset({ userId: asset.userId, assetId: asset.assetId });
- logger.info(
- `[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. Deleting it.`,
- );
- } else {
- logger.warn(
- `[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. Not deleting it because cleanDanglingAssets is false.`,
- );
- }
- return; // Nothing to sync when there is no row.
- }
-
- if (request.syncAssetMetadata) { // Overwrite the row's metadata with the values carried by `asset`.
- await db
- .update(assets)
- .set({
- contentType: asset.contentType,
- fileName: asset.fileName,
- size: asset.size,
- })
- .where(eq(assets.id, asset.assetId));
- logger.info(
- `[tidyAssets][${jobId}] Updated metadata for asset ${asset.assetId}`,
- );
- }
-}
-
-async function runTidyAssets(job: DequeuedJob<ZTidyAssetsRequest>) { // Removed in this change. Validates job.data, then runs handleAsset over every asset from getAllAssets().
- const jobId = job.id;
-
- const request = zTidyAssetsRequestSchema.safeParse(job.data);
- if (!request.success) { // Malformed payload: fail the whole job.
- throw new Error(
- `[tidyAssets][${jobId}] Got malformed job request: ${request.error.toString()}`,
- );
- }
-
- for await (const asset of getAllAssets()) {
- try {
- handleAsset(asset, request.data, jobId); // NOTE: not awaited, so a rejection from handleAsset never reaches the catch below.
- } catch (e) {
- logger.error(
- `[tidyAssets][${jobId}] Failed to tidy asset ${asset.assetId}: ${e}`,
- );
- }
- }
-}