| author | MohamedBassem <me@mbassem.com> | 2024-10-12 16:47:22 +0000 |
|---|---|---|
| committer | MohamedBassem <me@mbassem.com> | 2024-10-12 17:37:42 +0000 |
| commit | c16173ea0fdbf6cc47b13756c0a77e8399669055 (patch) | |
| tree | 6b3ecd073259176059386eb16c6635e4699d26a3 /apps/workers/tidyAssetsWorker.ts | |
| parent | 9f87207d668fbe0a2039c63803128fbe5916f993 (diff) | |
| download | karakeep-c16173ea0fdbf6cc47b13756c0a77e8399669055.tar.zst | |
feature: Introduce a mechanism to cleanup dangling assets
Diffstat (limited to 'apps/workers/tidyAssetsWorker.ts')
| -rw-r--r-- | apps/workers/tidyAssetsWorker.ts | 107 |
1 file changed, 107 insertions, 0 deletions
diff --git a/apps/workers/tidyAssetsWorker.ts b/apps/workers/tidyAssetsWorker.ts
new file mode 100644
index 00000000..bc14aab9
--- /dev/null
+++ b/apps/workers/tidyAssetsWorker.ts
@@ -0,0 +1,107 @@
+import { eq } from "drizzle-orm";
+
+import { db } from "@hoarder/db";
+import { assets } from "@hoarder/db/schema";
+import { DequeuedJob, Runner } from "@hoarder/queue";
+import { deleteAsset, getAllAssets } from "@hoarder/shared/assetdb";
+import logger from "@hoarder/shared/logger";
+import {
+  TidyAssetsQueue,
+  ZTidyAssetsRequest,
+  zTidyAssetsRequestSchema,
+} from "@hoarder/shared/queues";
+
+export class TidyAssetsWorker {
+  static build() {
+    logger.info("Starting tidy assets worker ...");
+    const worker = new Runner<ZTidyAssetsRequest>(
+      TidyAssetsQueue,
+      {
+        run: runTidyAssets,
+        onComplete: (job) => {
+          const jobId = job?.id ?? "unknown";
+          logger.info(`[tidyAssets][${jobId}] Completed successfully`);
+          return Promise.resolve();
+        },
+        onError: (job) => {
+          const jobId = job?.id ?? "unknown";
+          logger.error(
+            `[tidyAssets][${jobId}] tidy assets job failed: ${job.error}\n${job.error.stack}`,
+          );
+          return Promise.resolve();
+        },
+      },
+      {
+        concurrency: 1,
+        pollIntervalMs: 1000,
+        timeoutSecs: 30,
+      },
+    );
+
+    return worker;
+  }
+}
+
+async function handleAsset(
+  asset: {
+    assetId: string;
+    userId: string;
+    size: number;
+    contentType: string;
+    fileName?: string | null;
+  },
+  request: ZTidyAssetsRequest,
+  jobId: string,
+) {
+  const dbRow = await db.query.assets.findFirst({
+    where: eq(assets.id, asset.assetId),
+  });
+  if (!dbRow) {
+    if (request.cleanDanglingAssets) {
+      await deleteAsset({ userId: asset.userId, assetId: asset.assetId });
+      logger.info(
+        `[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. Deleting it.`,
+      );
+    } else {
+      logger.warn(
+        `[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. Not deleting it because cleanDanglingAssets is false.`,
+      );
+    }
+    return;
+  }
+
+  if (request.syncAssetMetadata) {
+    await db
+      .update(assets)
+      .set({
+        contentType: asset.contentType,
+        fileName: asset.fileName,
+        size: asset.size,
+      })
+      .where(eq(assets.id, asset.assetId));
+    logger.info(
+      `[tidyAssets][${jobId}] Updated metadata for asset ${asset.assetId}`,
+    );
+  }
+}
+
+async function runTidyAssets(job: DequeuedJob<ZTidyAssetsRequest>) {
+  const jobId = job.id ?? "unknown";
+
+  const request = zTidyAssetsRequestSchema.safeParse(job.data);
+  if (!request.success) {
+    throw new Error(
+      `[tidyAssets][${jobId}] Got malformed job request: ${request.error.toString()}`,
+    );
+  }
+
+  for await (const asset of getAllAssets()) {
+    try {
+      await handleAsset(asset, request.data, jobId);
+    } catch (e) {
+      logger.error(
+        `[tidyAssets][${jobId}] Failed to tidy asset ${asset.assetId}: ${e}`,
+      );
+    }
+  }
+}
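
For context, the worker added in this diff walks every asset on disk (`getAllAssets()`), deletes assets that have no row in the `assets` table when `cleanDanglingAssets` is set, and refreshes `size`/`contentType`/`fileName` when `syncAssetMetadata` is set. The snippet below is a minimal usage sketch, not part of the commit: it assumes `TidyAssetsQueue` exposes an `enqueue()` method and that the `Runner` returned by `TidyAssetsWorker.build()` is started with `run()`; the actual signatures live in `@hoarder/queue` and `@hoarder/shared/queues`.

```typescript
// Hypothetical usage sketch (not part of this commit).
// Assumes TidyAssetsQueue.enqueue() and Runner.run() exist; check
// @hoarder/queue and @hoarder/shared/queues for the real APIs.
import { TidyAssetsQueue } from "@hoarder/shared/queues";

import { TidyAssetsWorker } from "./tidyAssetsWorker";

async function main() {
  // Start the worker; it polls the queue every second (pollIntervalMs: 1000)
  // and processes one job at a time (concurrency: 1).
  const worker = TidyAssetsWorker.build();
  const workerPromise = worker.run();

  // Enqueue a tidy request: remove assets on disk that have no DB row,
  // and sync metadata for the ones that do.
  await TidyAssetsQueue.enqueue({
    cleanDanglingAssets: true,
    syncAssetMetadata: true,
  });

  await workerPromise;
}

main();
```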
