aboutsummaryrefslogtreecommitdiffstats
path: root/apps/workers
diff options
context:
space:
mode:
authorMohamedBassem <me@mbassem.com>2024-10-12 16:47:22 +0000
committerMohamedBassem <me@mbassem.com>2024-10-12 17:37:42 +0000
commitc16173ea0fdbf6cc47b13756c0a77e8399669055 (patch)
tree6b3ecd073259176059386eb16c6635e4699d26a3 /apps/workers
parent9f87207d668fbe0a2039c63803128fbe5916f993 (diff)
downloadkarakeep-c16173ea0fdbf6cc47b13756c0a77e8399669055.tar.zst
feature: Introduce a mechanism to cleanup dangling assets
Diffstat (limited to 'apps/workers')
-rw-r--r--apps/workers/index.ts12
-rw-r--r--apps/workers/tidyAssetsWorker.ts107
2 files changed, 116 insertions, 3 deletions
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index e576776a..f9a05e59 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -1,5 +1,7 @@
import "dotenv/config";
+import { TidyAssetsWorker } from "tidyAssetsWorker";
+
import serverConfig from "@hoarder/shared/config";
import logger from "@hoarder/shared/logger";
import { runQueueDBMigrations } from "@hoarder/shared/queues";
@@ -13,21 +15,25 @@ async function main() {
logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
runQueueDBMigrations();
- const [crawler, openai, search] = [
+ const [crawler, openai, search, tidyAssets] = [
await CrawlerWorker.build(),
OpenAiWorker.build(),
SearchIndexingWorker.build(),
+ TidyAssetsWorker.build(),
];
await Promise.any([
- Promise.all([crawler.run(), openai.run(), search.run()]),
+ Promise.all([crawler.run(), openai.run(), search.run(), tidyAssets.run()]),
shutdownPromise,
]);
- logger.info("Shutting down crawler, openai and search workers ...");
+ logger.info(
+ "Shutting down crawler, openai, tidyAssets and search workers ...",
+ );
crawler.stop();
openai.stop();
search.stop();
+ tidyAssets.stop();
}
main();
diff --git a/apps/workers/tidyAssetsWorker.ts b/apps/workers/tidyAssetsWorker.ts
new file mode 100644
index 00000000..bc14aab9
--- /dev/null
+++ b/apps/workers/tidyAssetsWorker.ts
@@ -0,0 +1,107 @@
+import { eq } from "drizzle-orm";
+
+import { db } from "@hoarder/db";
+import { assets } from "@hoarder/db/schema";
+import { DequeuedJob, Runner } from "@hoarder/queue";
+import { deleteAsset, getAllAssets } from "@hoarder/shared/assetdb";
+import logger from "@hoarder/shared/logger";
+import {
+ TidyAssetsQueue,
+ ZTidyAssetsRequest,
+ zTidyAssetsRequestSchema,
+} from "@hoarder/shared/queues";
+
+export class TidyAssetsWorker {
+ static build() {
+ logger.info("Starting tidy assets worker ...");
+ const worker = new Runner<ZTidyAssetsRequest>(
+ TidyAssetsQueue,
+ {
+ run: runTidyAssets,
+ onComplete: (job) => {
+ const jobId = job?.id ?? "unknown";
+ logger.info(`[tidyAssets][${jobId}] Completed successfully`);
+ return Promise.resolve();
+ },
+ onError: (job) => {
+ const jobId = job?.id ?? "unknown";
+ logger.error(
+ `[tidyAssets][${jobId}] tidy assets job failed: ${job.error}\n${job.error.stack}`,
+ );
+ return Promise.resolve();
+ },
+ },
+ {
+ concurrency: 1,
+ pollIntervalMs: 1000,
+ timeoutSecs: 30,
+ },
+ );
+
+ return worker;
+ }
+}
+
+async function handleAsset(
+ asset: {
+ assetId: string;
+ userId: string;
+ size: number;
+ contentType: string;
+ fileName?: string | null;
+ },
+ request: ZTidyAssetsRequest,
+ jobId: string,
+) {
+ const dbRow = await db.query.assets.findFirst({
+ where: eq(assets.id, asset.assetId),
+ });
+ if (!dbRow) {
+ if (request.cleanDanglingAssets) {
+ await deleteAsset({ userId: asset.userId, assetId: asset.assetId });
+ logger.info(
+ `[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. Deleting it.`,
+ );
+ } else {
+ logger.warn(
+ `[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. Not deleting it because cleanDanglingAssets is false.`,
+ );
+ }
+ return;
+ }
+
+ if (request.syncAssetMetadata) {
+ await db
+ .update(assets)
+ .set({
+ contentType: asset.contentType,
+ fileName: asset.fileName,
+ size: asset.size,
+ })
+ .where(eq(assets.id, asset.assetId));
+ logger.info(
+ `[tidyAssets][${jobId}] Updated metadata for asset ${asset.assetId}`,
+ );
+ }
+}
+
+async function runTidyAssets(job: DequeuedJob<ZTidyAssetsRequest>) {
+ const jobId = job.id ?? "unknown";
+
+ const request = zTidyAssetsRequestSchema.safeParse(job.data);
+ if (!request.success) {
+ throw new Error(
+ `[tidyAssets][${jobId}] Got malformed job request: ${request.error.toString()}`,
+ );
+ }
+
+ for await (const asset of getAllAssets()) {
+ try {
+ handleAsset(asset, request.data, jobId);
+ } catch (e) {
+ logger.error(
+ `[tidyAssets][${jobId}] Failed to tidy asset ${asset.assetId}: ${e}`,
+ );
+ }
+ }
+}