From 4a13c36da50f6b3171d817edebefe96ba85dc666 Mon Sep 17 00:00:00 2001 From: kamtschatka Date: Mon, 28 Oct 2024 02:51:00 +0100 Subject: feature: Archive videos using yt-dlp. Fixes #215 (#525) * Allow downloading more content from a webpage and index it #215 Added a worker that allows downloading videos depending on the environment variables refactored the code a bit added new video asset updated documentation * Some tweaks * Drop the dependency on the yt-dlp wrapper * Update openapi specs * Dont log an error when the url is not supported * Better handle supported websites that dont download anything --------- Co-authored-by: Mohamed Bassem --- apps/workers/crawlerWorker.ts | 59 +++--------- apps/workers/index.ts | 15 +++- apps/workers/videoWorker.ts | 202 ++++++++++++++++++++++++++++++++++++++++++ apps/workers/workerUtils.ts | 48 ++++++++++ 4 files changed, 272 insertions(+), 52 deletions(-) create mode 100644 apps/workers/videoWorker.ts create mode 100644 apps/workers/workerUtils.ts (limited to 'apps/workers') diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts index ca0f6608..d5bc555e 100644 --- a/apps/workers/crawlerWorker.ts +++ b/apps/workers/crawlerWorker.ts @@ -23,9 +23,10 @@ import puppeteer from "puppeteer-extra"; import AdblockerPlugin from "puppeteer-extra-plugin-adblocker"; import StealthPlugin from "puppeteer-extra-plugin-stealth"; import { withTimeout } from "utils"; +import { getBookmarkDetails, updateAsset } from "workerUtils"; import type { ZCrawlLinkRequest } from "@hoarder/shared/queues"; -import { db, HoarderDBTransaction } from "@hoarder/db"; +import { db } from "@hoarder/db"; import { assets, AssetTypes, @@ -35,12 +36,12 @@ import { } from "@hoarder/db/schema"; import { ASSET_TYPES, - deleteAsset, getAssetSize, IMAGE_ASSET_TYPES, newAssetId, saveAsset, saveAssetFromFile, + silentDeleteAsset, SUPPORTED_UPLOAD_ASSET_TYPES, } from "@hoarder/shared/assetdb"; import serverConfig from "@hoarder/shared/config"; @@ -49,6 +50,7 @@ import { LinkCrawlerQueue, OpenAIQueue, triggerSearchReindex, + triggerVideoWorker, zCrawlLinkRequestSchema, } from "@hoarder/shared/queues"; import { BookmarkTypes } from "@hoarder/shared/types/bookmarks"; @@ -207,33 +209,6 @@ async function changeBookmarkStatus( .where(eq(bookmarkLinks.id, bookmarkId)); } -async function getBookmarkDetails(bookmarkId: string) { - const bookmark = await db.query.bookmarks.findFirst({ - where: eq(bookmarks.id, bookmarkId), - with: { - link: true, - assets: true, - }, - }); - - if (!bookmark || !bookmark.link) { - throw new Error("The bookmark either doesn't exist or is not a link"); - } - return { - url: bookmark.link.url, - userId: bookmark.userId, - screenshotAssetId: bookmark.assets.find( - (a) => a.assetType == AssetTypes.LINK_SCREENSHOT, - )?.id, - imageAssetId: bookmark.assets.find( - (a) => a.assetType == AssetTypes.LINK_BANNER_IMAGE, - )?.id, - fullPageArchiveAssetId: bookmark.assets.find( - (a) => a.assetType == AssetTypes.LINK_FULL_PAGE_ARCHIVE, - )?.id, - }; -} - /** * This provides some "basic" protection from malicious URLs. However, all of those * can be easily circumvented by pointing dns of origin to localhost, or with @@ -609,12 +584,8 @@ async function crawlAndParseUrl( // Delete the old assets if any await Promise.all([ - oldScreenshotAssetId - ? deleteAsset({ userId, assetId: oldScreenshotAssetId }).catch(() => ({})) - : {}, - oldImageAssetId - ? deleteAsset({ userId, assetId: oldImageAssetId }).catch(() => ({})) - : {}, + silentDeleteAsset(userId, oldScreenshotAssetId), + silentDeleteAsset(userId, oldImageAssetId), ]); return async () => { @@ -641,9 +612,7 @@ async function crawlAndParseUrl( ); }); if (oldFullPageArchiveAssetId) { - await deleteAsset({ userId, assetId: oldFullPageArchiveAssetId }).catch( - () => ({}), - ); + silentDeleteAsset(userId, oldFullPageArchiveAssetId); } } }; @@ -713,17 +682,9 @@ async function runCrawler(job: DequeuedJob) { // Update the search index await triggerSearchReindex(bookmarkId); + // Trigger a potential download of a video from the URL + await triggerVideoWorker(bookmarkId, url); + // Do the archival as a separate last step as it has the potential for failure await archivalLogic(); } - -async function updateAsset( - oldAssetId: string | undefined, - newAsset: DBAssetType, - txn: HoarderDBTransaction, -) { - if (oldAssetId) { - await txn.delete(assets).where(eq(assets.id, oldAssetId)); - } - await txn.insert(assets).values(newAsset); -} diff --git a/apps/workers/index.ts b/apps/workers/index.ts index f9a05e59..3b5896e4 100644 --- a/apps/workers/index.ts +++ b/apps/workers/index.ts @@ -10,30 +10,39 @@ import { CrawlerWorker } from "./crawlerWorker"; import { shutdownPromise } from "./exit"; import { OpenAiWorker } from "./openaiWorker"; import { SearchIndexingWorker } from "./searchWorker"; +import { VideoWorker } from "./videoWorker"; async function main() { logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`); runQueueDBMigrations(); - const [crawler, openai, search, tidyAssets] = [ + const [crawler, openai, search, tidyAssets, video] = [ await CrawlerWorker.build(), OpenAiWorker.build(), SearchIndexingWorker.build(), TidyAssetsWorker.build(), + VideoWorker.build(), ]; await Promise.any([ - Promise.all([crawler.run(), openai.run(), search.run(), tidyAssets.run()]), + Promise.all([ + crawler.run(), + openai.run(), + search.run(), + tidyAssets.run(), + video.run(), + ]), shutdownPromise, ]); logger.info( - "Shutting down crawler, openai, tidyAssets and search workers ...", + "Shutting down crawler, openai, tidyAssets, video and search workers ...", ); crawler.stop(); openai.stop(); search.stop(); tidyAssets.stop(); + video.stop(); } main(); diff --git a/apps/workers/videoWorker.ts b/apps/workers/videoWorker.ts new file mode 100644 index 00000000..5448f0fa --- /dev/null +++ b/apps/workers/videoWorker.ts @@ -0,0 +1,202 @@ +import fs from "fs"; +import * as os from "os"; +import path from "path"; +import { execa } from "execa"; +import { DequeuedJob, Runner } from "liteque"; + +import { db } from "@hoarder/db"; +import { AssetTypes } from "@hoarder/db/schema"; +import { + ASSET_TYPES, + getAssetSize, + newAssetId, + saveAssetFromFile, +} from "@hoarder/shared/assetdb"; +import serverConfig from "@hoarder/shared/config"; +import logger from "@hoarder/shared/logger"; +import { VideoWorkerQueue, ZVideoRequest } from "@hoarder/shared/queues"; + +import { withTimeout } from "./utils"; +import { getBookmarkDetails, updateAsset } from "./workerUtils"; + +const TMP_FOLDER = path.join(os.tmpdir(), "video_downloads"); + +export class VideoWorker { + static build() { + logger.info("Starting video worker ..."); + + return new Runner( + VideoWorkerQueue, + { + run: withTimeout( + runWorker, + /* timeoutSec */ serverConfig.crawler.downloadVideoTimeout, + ), + onComplete: async (job) => { + const jobId = job?.id ?? "unknown"; + logger.info( + `[VideoCrawler][${jobId}] Video Download Completed successfully`, + ); + return Promise.resolve(); + }, + onError: async (job) => { + const jobId = job?.id ?? "unknown"; + logger.error( + `[VideoCrawler][${jobId}] Video Download job failed: ${job.error}`, + ); + return Promise.resolve(); + }, + }, + { + pollIntervalMs: 1000, + timeoutSecs: serverConfig.crawler.downloadVideoTimeout, + concurrency: 1, + }, + ); + } +} + +function prepareYtDlpArguments(url: string, assetPath: string) { + // TODO allow custom commandline arguments? + const ytDlpArguments = [url]; + if (serverConfig.crawler.maxVideoDownloadSize > 0) { + ytDlpArguments.push( + "-f", + `best[filesize<${serverConfig.crawler.maxVideoDownloadSize}M]`, + ); + } + ytDlpArguments.push("-o", assetPath); + ytDlpArguments.push("--no-playlist"); + return ytDlpArguments; +} + +async function runWorker(job: DequeuedJob) { + const jobId = job.id ?? "unknown"; + const { bookmarkId } = job.data; + + const { + url, + userId, + videoAssetId: oldVideoAssetId, + } = await getBookmarkDetails(bookmarkId); + + if (!serverConfig.crawler.downloadVideo) { + logger.info( + `[VideoCrawler][${jobId}] Skipping video download from "${url}", because it is disabled in the config.`, + ); + return; + } + + const videoAssetId = newAssetId(); + let assetPath = `${TMP_FOLDER}/${videoAssetId}`; + await fs.promises.mkdir(TMP_FOLDER, { recursive: true }); + + const ytDlpArguments = prepareYtDlpArguments(url, assetPath); + + try { + logger.info( + `[VideoCrawler][${jobId}] Attempting to download a file from "${url}" to "${assetPath}" using the following arguments: "${ytDlpArguments}"`, + ); + + await execa`yt-dlp ${ytDlpArguments}`; + const downloadPath = await findAssetFile(videoAssetId); + if (!downloadPath) { + logger.info( + "[VideoCrawler][${jobId}] yt-dlp didn't download anything. Skipping ...", + ); + return; + } + assetPath = downloadPath; + } catch (e) { + const err = e as Error; + if (err.message.includes("ERROR: Unsupported URL:")) { + logger.info( + `[VideoCrawler][${jobId}] Skipping video download from "${url}", because it's not one of the supported yt-dlp URLs`, + ); + return; + } + console.log(JSON.stringify(err)); + logger.error( + `[VideoCrawler][${jobId}] Failed to download a file from "${url}" to "${assetPath}"`, + ); + await deleteLeftOverAssetFile(jobId, videoAssetId); + return; + } + + logger.info( + `[VideoCrawler][${jobId}] Finished downloading a file from "${url}" to "${assetPath}"`, + ); + await saveAssetFromFile({ + userId, + assetId: videoAssetId, + assetPath, + metadata: { contentType: ASSET_TYPES.VIDEO_MP4 }, + }); + + await db.transaction(async (txn) => { + await updateAsset( + oldVideoAssetId, + { + id: videoAssetId, + bookmarkId, + userId, + assetType: AssetTypes.LINK_VIDEO, + contentType: ASSET_TYPES.VIDEO_MP4, + size: await getAssetSize({ userId, assetId: videoAssetId }), + }, + txn, + ); + }); + + logger.info( + `[VideoCrawler][${jobId}] Finished downloading video from "${url}" and adding it to the database`, + ); +} + +/** + * Deletes leftover assets in case the download fails + * + * @param jobId the id of the job + * @param assetId the id of the asset to delete + */ +async function deleteLeftOverAssetFile( + jobId: string, + assetId: string, +): Promise { + let assetFile; + try { + assetFile = await findAssetFile(assetId); + } catch { + // ignore exception, no asset file was found + return; + } + if (!assetFile) { + return; + } + logger.info( + `[VideoCrawler][${jobId}] Deleting leftover video asset "${assetFile}".`, + ); + try { + await fs.promises.rm(assetFile); + } catch (e) { + logger.error( + `[VideoCrawler][${jobId}] Failed deleting leftover video asset "${assetFile}".`, + ); + } +} + +/** + * yt-dlp automatically adds a file ending to the passed in filename --> we have to search it again in the folder + * + * @param assetId the id of the asset to search + * @returns the path to the downloaded asset + */ +async function findAssetFile(assetId: string): Promise { + const files = await fs.promises.readdir(TMP_FOLDER); + for (const file of files) { + if (file.startsWith(assetId)) { + return path.join(TMP_FOLDER, file); + } + } + return null; +} diff --git a/apps/workers/workerUtils.ts b/apps/workers/workerUtils.ts new file mode 100644 index 00000000..e93d241b --- /dev/null +++ b/apps/workers/workerUtils.ts @@ -0,0 +1,48 @@ +import { eq } from "drizzle-orm"; + +import { db, HoarderDBTransaction } from "@hoarder/db"; +import { assets, AssetTypes, bookmarks } from "@hoarder/db/schema"; + +type DBAssetType = typeof assets.$inferInsert; + +export async function updateAsset( + oldAssetId: string | undefined, + newAsset: DBAssetType, + txn: HoarderDBTransaction, +) { + if (oldAssetId) { + await txn.delete(assets).where(eq(assets.id, oldAssetId)); + } + + await txn.insert(assets).values(newAsset); +} + +export async function getBookmarkDetails(bookmarkId: string) { + const bookmark = await db.query.bookmarks.findFirst({ + where: eq(bookmarks.id, bookmarkId), + with: { + link: true, + assets: true, + }, + }); + + if (!bookmark || !bookmark.link) { + throw new Error("The bookmark either doesn't exist or is not a link"); + } + return { + url: bookmark.link.url, + userId: bookmark.userId, + screenshotAssetId: bookmark.assets.find( + (a) => a.assetType == AssetTypes.LINK_SCREENSHOT, + )?.id, + imageAssetId: bookmark.assets.find( + (a) => a.assetType == AssetTypes.LINK_BANNER_IMAGE, + )?.id, + fullPageArchiveAssetId: bookmark.assets.find( + (a) => a.assetType == AssetTypes.LINK_FULL_PAGE_ARCHIVE, + )?.id, + videoAssetId: bookmark.assets.find( + (a) => a.assetType == AssetTypes.LINK_VIDEO, + )?.id, + }; +} -- cgit v1.2.3-70-g09d2