From 7ab7db8e48360417498643eec2384b0fcb7fbdfb Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Tue, 30 Dec 2025 12:43:08 +0200 Subject: chore: worker tracing (#2321) --- apps/workers/index.ts | 4 + apps/workers/workerTracing.ts | 43 + apps/workers/workers/adminMaintenanceWorker.ts | 6 +- apps/workers/workers/assetPreprocessingWorker.ts | 3 +- apps/workers/workers/backupWorker.ts | 3 +- apps/workers/workers/crawlerWorker.ts | 1774 +++++++++++---------- apps/workers/workers/feedWorker.ts | 3 +- apps/workers/workers/inference/inferenceWorker.ts | 3 +- apps/workers/workers/ruleEngineWorker.ts | 3 +- apps/workers/workers/searchWorker.ts | 3 +- apps/workers/workers/videoWorker.ts | 3 +- apps/workers/workers/webhookWorker.ts | 3 +- 12 files changed, 1030 insertions(+), 821 deletions(-) create mode 100644 apps/workers/workerTracing.ts (limited to 'apps/workers') diff --git a/apps/workers/index.ts b/apps/workers/index.ts index b605b50f..07840a4c 100644 --- a/apps/workers/index.ts +++ b/apps/workers/index.ts @@ -3,8 +3,10 @@ import "dotenv/config"; import { buildServer } from "server"; import { + initTracing, loadAllPlugins, prepareQueue, + shutdownTracing, startQueue, } from "@karakeep/shared-server"; import serverConfig from "@karakeep/shared/config"; @@ -51,6 +53,7 @@ function isWorkerEnabled(name: WorkerName) { async function main() { await loadAllPlugins(); + initTracing("workers"); logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`); await prepareQueue(); @@ -97,6 +100,7 @@ async function main() { worker.stop(); } await httpServer.stop(); + await shutdownTracing(); process.exit(0); } diff --git a/apps/workers/workerTracing.ts b/apps/workers/workerTracing.ts new file mode 100644 index 00000000..3ff16d1c --- /dev/null +++ b/apps/workers/workerTracing.ts @@ -0,0 +1,43 @@ +import type { DequeuedJob } from "@karakeep/shared/queueing"; +import { getTracer, withSpan } from "@karakeep/shared-server"; + +const tracer = getTracer("@karakeep/workers"); + +type WorkerRunFn = ( + job: DequeuedJob, +) => Promise; + +/** + * Wraps a worker run function with OpenTelemetry tracing. + * Creates a span for each job execution and automatically handles error recording. 
+ * + * @param name - The name of the span (e.g., "feedWorker.run", "crawlerWorker.run") + * @param fn - The worker run function to wrap + * @returns A wrapped function that executes within a traced span + * + * @example + * ```ts + * const run = withWorkerTracing("feedWorker.run", async (job) => { + * // Your worker logic here + * }); + * ``` + */ +export function withWorkerTracing( + name: string, + fn: WorkerRunFn, +): WorkerRunFn { + return async (job: DequeuedJob): Promise => { + return await withSpan( + tracer, + name, + { + attributes: { + "job.id": job.id, + "job.priority": job.priority, + "job.runNumber": job.runNumber, + }, + }, + () => fn(job), + ); + }; +} diff --git a/apps/workers/workers/adminMaintenanceWorker.ts b/apps/workers/workers/adminMaintenanceWorker.ts index e5312964..92d52a22 100644 --- a/apps/workers/workers/adminMaintenanceWorker.ts +++ b/apps/workers/workers/adminMaintenanceWorker.ts @@ -1,4 +1,5 @@ import { workerStatsCounter } from "metrics"; +import { withWorkerTracing } from "workerTracing"; import { AdminMaintenanceQueue, @@ -20,7 +21,10 @@ export class AdminMaintenanceWorker { (await getQueueClient())!.createRunner( AdminMaintenanceQueue, { - run: runAdminMaintenance, + run: withWorkerTracing( + "adminMaintenanceWorker.run", + runAdminMaintenance, + ), onComplete: (job) => { workerStatsCounter .labels(`adminMaintenance:${job.data.type}`, "completed") diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts index b585a15e..a5b439fc 100644 --- a/apps/workers/workers/assetPreprocessingWorker.ts +++ b/apps/workers/workers/assetPreprocessingWorker.ts @@ -4,6 +4,7 @@ import { workerStatsCounter } from "metrics"; import PDFParser from "pdf2json"; import { fromBuffer } from "pdf2pic"; import { createWorker } from "tesseract.js"; +import { withWorkerTracing } from "workerTracing"; import type { AssetPreprocessingRequest } from "@karakeep/shared-server"; import { db } from "@karakeep/db"; @@ -36,7 +37,7 @@ export class AssetPreprocessingWorker { (await getQueueClient())!.createRunner( AssetPreprocessingQueue, { - run: run, + run: withWorkerTracing("assetPreprocessingWorker.run", run), onComplete: async (job) => { workerStatsCounter.labels("assetPreprocessing", "completed").inc(); const jobId = job.id; diff --git a/apps/workers/workers/backupWorker.ts b/apps/workers/workers/backupWorker.ts index c2d1ae5a..01f54b28 100644 --- a/apps/workers/workers/backupWorker.ts +++ b/apps/workers/workers/backupWorker.ts @@ -8,6 +8,7 @@ import archiver from "archiver"; import { eq } from "drizzle-orm"; import { workerStatsCounter } from "metrics"; import cron from "node-cron"; +import { withWorkerTracing } from "workerTracing"; import type { ZBackupRequest } from "@karakeep/shared-server"; import { db } from "@karakeep/db"; @@ -107,7 +108,7 @@ export class BackupWorker { const worker = (await getQueueClient())!.createRunner( BackupQueue, { - run: run, + run: withWorkerTracing("backupWorker.run", run), onComplete: async (job) => { workerStatsCounter.labels("backup", "completed").inc(); const jobId = job.id; diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts index 411f615a..597d45d3 100644 --- a/apps/workers/workers/crawlerWorker.ts +++ b/apps/workers/workers/crawlerWorker.ts @@ -37,6 +37,7 @@ import { import { Browser, BrowserContextOptions } from "playwright"; import { chromium } from "playwright-extra"; import StealthPlugin from "puppeteer-extra-plugin-stealth"; +import { 
withWorkerTracing } from "workerTracing"; import { getBookmarkDetails, updateAsset } from "workerUtils"; import { z } from "zod"; @@ -52,12 +53,14 @@ import { } from "@karakeep/db/schema"; import { AssetPreprocessingQueue, + getTracer, LinkCrawlerQueue, OpenAIQueue, QuotaService, triggerSearchReindex, triggerWebhook, VideoWorkerQueue, + withSpan, zCrawlLinkRequestSchema, } from "@karakeep/shared-server"; import { @@ -86,6 +89,8 @@ import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; import metascraperAmazonImproved from "../metascraper-plugins/metascraper-amazon-improved"; import metascraperReddit from "../metascraper-plugins/metascraper-reddit"; +const tracer = getTracer("@karakeep/workers"); + function abortPromise(signal: AbortSignal): Promise { if (signal.aborted) { const p = Promise.reject(signal.reason ?? new Error("AbortError")); @@ -325,7 +330,7 @@ export class CrawlerWorker { >( LinkCrawlerQueue, { - run: runCrawler, + run: withWorkerTracing("crawlerWorker.run", runCrawler), onComplete: async (job) => { workerStatsCounter.labels("crawler", "completed").inc(); const jobId = job.id; @@ -427,22 +432,29 @@ async function browserlessCrawlPage( url: string, abortSignal: AbortSignal, ) { - logger.info( - `[Crawler][${jobId}] Running in browserless mode. Will do a plain http request to "${url}". Screenshots will be disabled.`, - ); - const response = await fetchWithProxy(url, { - signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]), - }); - logger.info( - `[Crawler][${jobId}] Successfully fetched the content of "${url}". Status: ${response.status}, Size: ${response.size}`, + return await withSpan( + tracer, + "crawlerWorker.browserlessCrawlPage", + { attributes: { url, jobId } }, + async () => { + logger.info( + `[Crawler][${jobId}] Running in browserless mode. Will do a plain http request to "${url}". Screenshots will be disabled.`, + ); + const response = await fetchWithProxy(url, { + signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]), + }); + logger.info( + `[Crawler][${jobId}] Successfully fetched the content of "${url}". Status: ${response.status}, Size: ${response.size}`, + ); + return { + htmlContent: await response.text(), + statusCode: response.status, + screenshot: undefined, + pdf: undefined, + url: response.url, + }; + }, ); - return { - htmlContent: await response.text(), - statusCode: response.status, - screenshot: undefined, - pdf: undefined, - url: response.url, - }; } async function crawlPage( @@ -458,229 +470,243 @@ async function crawlPage( statusCode: number; url: string; }> { - // Check user's browser crawling setting - const userData = await db.query.users.findFirst({ - where: eq(users.id, userId), - columns: { browserCrawlingEnabled: true }, - }); - if (!userData) { - logger.error(`[Crawler][${jobId}] User ${userId} not found`); - throw new Error(`User ${userId} not found`); - } - - const browserCrawlingEnabled = userData.browserCrawlingEnabled; - - if (browserCrawlingEnabled !== null && !browserCrawlingEnabled) { - return browserlessCrawlPage(jobId, url, abortSignal); - } - - let browser: Browser | undefined; - if (serverConfig.crawler.browserConnectOnDemand) { - browser = await startBrowserInstance(); - } else { - browser = globalBrowser; - } - if (!browser) { - return browserlessCrawlPage(jobId, url, abortSignal); - } - - const proxyConfig = getPlaywrightProxyConfig(); - const isRunningInProxyContext = - proxyConfig !== undefined && - !matchesNoProxy(url, proxyConfig.bypass?.split(",") ?? 
[]); - const context = await browser.newContext({ - viewport: { width: 1440, height: 900 }, - userAgent: - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", - proxy: proxyConfig, - }); - - try { - if (globalCookies.length > 0) { - await context.addCookies(globalCookies); - logger.info( - `[Crawler][${jobId}] Cookies successfully loaded into browser context`, - ); - } + return await withSpan( + tracer, + "crawlerWorker.crawlPage", + { attributes: { url, jobId, userId, forceStorePdf } }, + async () => { + // Check user's browser crawling setting + const userData = await db.query.users.findFirst({ + where: eq(users.id, userId), + columns: { browserCrawlingEnabled: true }, + }); + if (!userData) { + logger.error(`[Crawler][${jobId}] User ${userId} not found`); + throw new Error(`User ${userId} not found`); + } - // Create a new page in the context - const page = await context.newPage(); + const browserCrawlingEnabled = userData.browserCrawlingEnabled; - // Apply ad blocking - if (globalBlocker) { - await globalBlocker.enableBlockingInPage(page); - } + if (browserCrawlingEnabled !== null && !browserCrawlingEnabled) { + return browserlessCrawlPage(jobId, url, abortSignal); + } - // Block audio/video resources and disallowed sub-requests - await page.route("**/*", async (route) => { - if (abortSignal.aborted) { - await route.abort("aborted"); - return; + let browser: Browser | undefined; + if (serverConfig.crawler.browserConnectOnDemand) { + browser = await startBrowserInstance(); + } else { + browser = globalBrowser; } - const request = route.request(); - const resourceType = request.resourceType(); - - // Block audio/video resources - if ( - resourceType === "media" || - request.headers()["content-type"]?.includes("video/") || - request.headers()["content-type"]?.includes("audio/") - ) { - await route.abort("aborted"); - return; + if (!browser) { + return browserlessCrawlPage(jobId, url, abortSignal); } - const requestUrl = request.url(); - const requestIsRunningInProxyContext = + const proxyConfig = getPlaywrightProxyConfig(); + const isRunningInProxyContext = proxyConfig !== undefined && - !matchesNoProxy(requestUrl, proxyConfig.bypass?.split(",") ?? []); - if ( - requestUrl.startsWith("http://") || - requestUrl.startsWith("https://") - ) { - const validation = await validateUrl( - requestUrl, - requestIsRunningInProxyContext, - ); - if (!validation.ok) { - logger.warn( - `[Crawler][${jobId}] Blocking sub-request to disallowed URL "${requestUrl}": ${validation.reason}`, + !matchesNoProxy(url, proxyConfig.bypass?.split(",") ?? 
[]); + const context = await browser.newContext({ + viewport: { width: 1440, height: 900 }, + userAgent: + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", + proxy: proxyConfig, + }); + + try { + if (globalCookies.length > 0) { + await context.addCookies(globalCookies); + logger.info( + `[Crawler][${jobId}] Cookies successfully loaded into browser context`, ); - await route.abort("blockedbyclient"); - return; } - } - // Continue with other requests - await route.continue(); - }); + // Create a new page in the context + const page = await context.newPage(); - // Navigate to the target URL - const navigationValidation = await validateUrl( - url, - isRunningInProxyContext, - ); - if (!navigationValidation.ok) { - throw new Error( - `Disallowed navigation target "${url}": ${navigationValidation.reason}`, - ); - } - const targetUrl = navigationValidation.url.toString(); - logger.info(`[Crawler][${jobId}] Navigating to "${targetUrl}"`); - const response = await Promise.race([ - page.goto(targetUrl, { - timeout: serverConfig.crawler.navigateTimeoutSec * 1000, - waitUntil: "domcontentloaded", - }), - abortPromise(abortSignal).then(() => null), - ]); + // Apply ad blocking + if (globalBlocker) { + await globalBlocker.enableBlockingInPage(page); + } - logger.info( - `[Crawler][${jobId}] Successfully navigated to "${targetUrl}". Waiting for the page to load ...`, - ); + // Block audio/video resources and disallowed sub-requests + await page.route("**/*", async (route) => { + if (abortSignal.aborted) { + await route.abort("aborted"); + return; + } + const request = route.request(); + const resourceType = request.resourceType(); + + // Block audio/video resources + if ( + resourceType === "media" || + request.headers()["content-type"]?.includes("video/") || + request.headers()["content-type"]?.includes("audio/") + ) { + await route.abort("aborted"); + return; + } - // Wait until network is relatively idle or timeout after 5 seconds - await Promise.race([ - page.waitForLoadState("networkidle", { timeout: 5000 }).catch(() => ({})), - new Promise((resolve) => setTimeout(resolve, 5000)), - abortPromise(abortSignal), - ]); + const requestUrl = request.url(); + const requestIsRunningInProxyContext = + proxyConfig !== undefined && + !matchesNoProxy(requestUrl, proxyConfig.bypass?.split(",") ?? 
[]); + if ( + requestUrl.startsWith("http://") || + requestUrl.startsWith("https://") + ) { + const validation = await validateUrl( + requestUrl, + requestIsRunningInProxyContext, + ); + if (!validation.ok) { + logger.warn( + `[Crawler][${jobId}] Blocking sub-request to disallowed URL "${requestUrl}": ${validation.reason}`, + ); + await route.abort("blockedbyclient"); + return; + } + } - abortSignal.throwIfAborted(); + // Continue with other requests + await route.continue(); + }); - logger.info(`[Crawler][${jobId}] Finished waiting for the page to load.`); + // Navigate to the target URL + const navigationValidation = await validateUrl( + url, + isRunningInProxyContext, + ); + if (!navigationValidation.ok) { + throw new Error( + `Disallowed navigation target "${url}": ${navigationValidation.reason}`, + ); + } + const targetUrl = navigationValidation.url.toString(); + logger.info(`[Crawler][${jobId}] Navigating to "${targetUrl}"`); + const response = await Promise.race([ + page.goto(targetUrl, { + timeout: serverConfig.crawler.navigateTimeoutSec * 1000, + waitUntil: "domcontentloaded", + }), + abortPromise(abortSignal).then(() => null), + ]); - // Extract content from the page - const htmlContent = await page.content(); + logger.info( + `[Crawler][${jobId}] Successfully navigated to "${targetUrl}". Waiting for the page to load ...`, + ); - abortSignal.throwIfAborted(); + // Wait until network is relatively idle or timeout after 5 seconds + await Promise.race([ + page + .waitForLoadState("networkidle", { timeout: 5000 }) + .catch(() => ({})), + new Promise((resolve) => setTimeout(resolve, 5000)), + abortPromise(abortSignal), + ]); - logger.info(`[Crawler][${jobId}] Successfully fetched the page content.`); + abortSignal.throwIfAborted(); - // Take a screenshot if configured - let screenshot: Buffer | undefined = undefined; - if (serverConfig.crawler.storeScreenshot) { - const { data: screenshotData, error: screenshotError } = await tryCatch( - Promise.race([ - page.screenshot({ - // If you change this, you need to change the asset type in the store function. - type: "jpeg", - fullPage: serverConfig.crawler.fullPageScreenshot, - quality: 80, - }), - new Promise((_, reject) => - setTimeout( - () => - reject( - "TIMED_OUT, consider increasing CRAWLER_SCREENSHOT_TIMEOUT_SEC", - ), - serverConfig.crawler.screenshotTimeoutSec * 1000, - ), - ), - abortPromise(abortSignal).then(() => Buffer.from("")), - ]), - ); - abortSignal.throwIfAborted(); - if (screenshotError) { - logger.warn( - `[Crawler][${jobId}] Failed to capture the screenshot. Reason: ${screenshotError}`, - ); - } else { logger.info( - `[Crawler][${jobId}] Finished capturing page content and a screenshot. FullPageScreenshot: ${serverConfig.crawler.fullPageScreenshot}`, + `[Crawler][${jobId}] Finished waiting for the page to load.`, ); - screenshot = screenshotData; - } - } - // Capture PDF if configured or explicitly requested - let pdf: Buffer | undefined = undefined; - if (serverConfig.crawler.storePdf || forceStorePdf) { - const { data: pdfData, error: pdfError } = await tryCatch( - Promise.race([ - page.pdf({ - format: "A4", - printBackground: true, - }), - new Promise((_, reject) => - setTimeout( - () => - reject( - "TIMED_OUT, consider increasing CRAWLER_SCREENSHOT_TIMEOUT_SEC", - ), - serverConfig.crawler.screenshotTimeoutSec * 1000, - ), - ), - abortPromise(abortSignal).then(() => Buffer.from("")), - ]), - ); - abortSignal.throwIfAborted(); - if (pdfError) { - logger.warn( - `[Crawler][${jobId}] Failed to capture the PDF. 
Reason: ${pdfError}`, - ); - } else { + // Extract content from the page + const htmlContent = await page.content(); + + abortSignal.throwIfAborted(); + logger.info( - `[Crawler][${jobId}] Finished capturing page content as PDF`, + `[Crawler][${jobId}] Successfully fetched the page content.`, ); - pdf = pdfData; - } - } - return { - htmlContent, - statusCode: response?.status() ?? 0, - screenshot, - pdf, - url: page.url(), - }; - } finally { - await context.close(); - // Only close the browser if it was created on demand - if (serverConfig.crawler.browserConnectOnDemand) { - await browser.close(); - } - } + // Take a screenshot if configured + let screenshot: Buffer | undefined = undefined; + if (serverConfig.crawler.storeScreenshot) { + const { data: screenshotData, error: screenshotError } = + await tryCatch( + Promise.race([ + page.screenshot({ + // If you change this, you need to change the asset type in the store function. + type: "jpeg", + fullPage: serverConfig.crawler.fullPageScreenshot, + quality: 80, + }), + new Promise((_, reject) => + setTimeout( + () => + reject( + "TIMED_OUT, consider increasing CRAWLER_SCREENSHOT_TIMEOUT_SEC", + ), + serverConfig.crawler.screenshotTimeoutSec * 1000, + ), + ), + abortPromise(abortSignal).then(() => Buffer.from("")), + ]), + ); + abortSignal.throwIfAborted(); + if (screenshotError) { + logger.warn( + `[Crawler][${jobId}] Failed to capture the screenshot. Reason: ${screenshotError}`, + ); + } else { + logger.info( + `[Crawler][${jobId}] Finished capturing page content and a screenshot. FullPageScreenshot: ${serverConfig.crawler.fullPageScreenshot}`, + ); + screenshot = screenshotData; + } + } + + // Capture PDF if configured or explicitly requested + let pdf: Buffer | undefined = undefined; + if (serverConfig.crawler.storePdf || forceStorePdf) { + const { data: pdfData, error: pdfError } = await tryCatch( + Promise.race([ + page.pdf({ + format: "A4", + printBackground: true, + }), + new Promise((_, reject) => + setTimeout( + () => + reject( + "TIMED_OUT, consider increasing CRAWLER_SCREENSHOT_TIMEOUT_SEC", + ), + serverConfig.crawler.screenshotTimeoutSec * 1000, + ), + ), + abortPromise(abortSignal).then(() => Buffer.from("")), + ]), + ); + abortSignal.throwIfAborted(); + if (pdfError) { + logger.warn( + `[Crawler][${jobId}] Failed to capture the PDF. Reason: ${pdfError}`, + ); + } else { + logger.info( + `[Crawler][${jobId}] Finished capturing page content as PDF`, + ); + pdf = pdfData; + } + } + + return { + htmlContent, + statusCode: response?.status() ?? 0, + screenshot, + pdf, + url: page.url(), + }; + } finally { + await context.close(); + // Only close the browser if it was created on demand + if (serverConfig.crawler.browserConnectOnDemand) { + await browser.close(); + } + } + }, + ); } async function extractMetadata( @@ -688,54 +714,70 @@ async function extractMetadata( url: string, jobId: string, ) { - logger.info( - `[Crawler][${jobId}] Will attempt to extract metadata from page ...`, + return await withSpan( + tracer, + "crawlerWorker.extractMetadata", + { attributes: { url, jobId } }, + async () => { + logger.info( + `[Crawler][${jobId}] Will attempt to extract metadata from page ...`, + ); + const meta = await metascraperParser({ + url, + html: htmlContent, + // We don't want to validate the URL again as we've already done it by visiting the page. + // This was added because URL validation fails if the URL ends with a question mark (e.g. empty query params). 
+ validateUrl: false, + }); + logger.info( + `[Crawler][${jobId}] Done extracting metadata from the page.`, + ); + return meta; + }, ); - const meta = await metascraperParser({ - url, - html: htmlContent, - // We don't want to validate the URL again as we've already done it by visiting the page. - // This was added because URL validation fails if the URL ends with a question mark (e.g. empty query params). - validateUrl: false, - }); - logger.info(`[Crawler][${jobId}] Done extracting metadata from the page.`); - return meta; } -function extractReadableContent( +async function extractReadableContent( htmlContent: string, url: string, jobId: string, ) { - logger.info( - `[Crawler][${jobId}] Will attempt to extract readable content ...`, - ); - const virtualConsole = new VirtualConsole(); - const dom = new JSDOM(htmlContent, { url, virtualConsole }); - let result: { content: string } | null = null; - try { - const readableContent = new Readability(dom.window.document).parse(); - if (!readableContent || typeof readableContent.content !== "string") { - return null; - } - - const purifyWindow = new JSDOM("").window; - try { - const purify = DOMPurify(purifyWindow); - const purifiedHTML = purify.sanitize(readableContent.content); + return await withSpan( + tracer, + "crawlerWorker.extractReadableContent", + { attributes: { url, jobId } }, + async () => { + logger.info( + `[Crawler][${jobId}] Will attempt to extract readable content ...`, + ); + const virtualConsole = new VirtualConsole(); + const dom = new JSDOM(htmlContent, { url, virtualConsole }); + let result: { content: string } | null = null; + try { + const readableContent = new Readability(dom.window.document).parse(); + if (!readableContent || typeof readableContent.content !== "string") { + return null; + } - logger.info(`[Crawler][${jobId}] Done extracting readable content.`); - result = { - content: purifiedHTML, - }; - } finally { - purifyWindow.close(); - } - } finally { - dom.window.close(); - } + const purifyWindow = new JSDOM("").window; + try { + const purify = DOMPurify(purifyWindow); + const purifiedHTML = purify.sanitize(readableContent.content); + + logger.info(`[Crawler][${jobId}] Done extracting readable content.`); + result = { + content: purifiedHTML, + }; + } finally { + purifyWindow.close(); + } + } finally { + dom.window.close(); + } - return result; + return result; + }, + ); } async function storeScreenshot( @@ -743,45 +785,58 @@ async function storeScreenshot( userId: string, jobId: string, ) { - if (!serverConfig.crawler.storeScreenshot) { - logger.info( - `[Crawler][${jobId}] Skipping storing the screenshot as per the config.`, - ); - return null; - } - if (!screenshot) { - logger.info( - `[Crawler][${jobId}] Skipping storing the screenshot as it's empty.`, - ); - return null; - } - const assetId = newAssetId(); - const contentType = "image/jpeg"; - const fileName = "screenshot.jpeg"; + return await withSpan( + tracer, + "crawlerWorker.storeScreenshot", + { + attributes: { + jobId, + userId, + size: screenshot?.byteLength ?? 
0, + }, + }, + async () => { + if (!serverConfig.crawler.storeScreenshot) { + logger.info( + `[Crawler][${jobId}] Skipping storing the screenshot as per the config.`, + ); + return null; + } + if (!screenshot) { + logger.info( + `[Crawler][${jobId}] Skipping storing the screenshot as it's empty.`, + ); + return null; + } + const assetId = newAssetId(); + const contentType = "image/jpeg"; + const fileName = "screenshot.jpeg"; - // Check storage quota before saving the screenshot - const { data: quotaApproved, error: quotaError } = await tryCatch( - QuotaService.checkStorageQuota(db, userId, screenshot.byteLength), - ); + // Check storage quota before saving the screenshot + const { data: quotaApproved, error: quotaError } = await tryCatch( + QuotaService.checkStorageQuota(db, userId, screenshot.byteLength), + ); - if (quotaError) { - logger.warn( - `[Crawler][${jobId}] Skipping screenshot storage due to quota exceeded: ${quotaError.message}`, - ); - return null; - } + if (quotaError) { + logger.warn( + `[Crawler][${jobId}] Skipping screenshot storage due to quota exceeded: ${quotaError.message}`, + ); + return null; + } - await saveAsset({ - userId, - assetId, - metadata: { contentType, fileName }, - asset: screenshot, - quotaApproved, - }); - logger.info( - `[Crawler][${jobId}] Stored the screenshot as assetId: ${assetId} (${screenshot.byteLength} bytes)`, + await saveAsset({ + userId, + assetId, + metadata: { contentType, fileName }, + asset: screenshot, + quotaApproved, + }); + logger.info( + `[Crawler][${jobId}] Stored the screenshot as assetId: ${assetId} (${screenshot.byteLength} bytes)`, + ); + return { assetId, contentType, fileName, size: screenshot.byteLength }; + }, ); - return { assetId, contentType, fileName, size: screenshot.byteLength }; } async function storePdf( @@ -789,37 +844,52 @@ async function storePdf( userId: string, jobId: string, ) { - if (!pdf) { - logger.info(`[Crawler][${jobId}] Skipping storing the PDF as it's empty.`); - return null; - } - const assetId = newAssetId(); - const contentType = "application/pdf"; - const fileName = "page.pdf"; + return await withSpan( + tracer, + "crawlerWorker.storePdf", + { + attributes: { + jobId, + userId, + size: pdf?.byteLength ?? 
0, + }, + }, + async () => { + if (!pdf) { + logger.info( + `[Crawler][${jobId}] Skipping storing the PDF as it's empty.`, + ); + return null; + } + const assetId = newAssetId(); + const contentType = "application/pdf"; + const fileName = "page.pdf"; - // Check storage quota before saving the PDF - const { data: quotaApproved, error: quotaError } = await tryCatch( - QuotaService.checkStorageQuota(db, userId, pdf.byteLength), - ); + // Check storage quota before saving the PDF + const { data: quotaApproved, error: quotaError } = await tryCatch( + QuotaService.checkStorageQuota(db, userId, pdf.byteLength), + ); - if (quotaError) { - logger.warn( - `[Crawler][${jobId}] Skipping PDF storage due to quota exceeded: ${quotaError.message}`, - ); - return null; - } + if (quotaError) { + logger.warn( + `[Crawler][${jobId}] Skipping PDF storage due to quota exceeded: ${quotaError.message}`, + ); + return null; + } - await saveAsset({ - userId, - assetId, - metadata: { contentType, fileName }, - asset: pdf, - quotaApproved, - }); - logger.info( - `[Crawler][${jobId}] Stored the PDF as assetId: ${assetId} (${pdf.byteLength} bytes)`, + await saveAsset({ + userId, + assetId, + metadata: { contentType, fileName }, + asset: pdf, + quotaApproved, + }); + logger.info( + `[Crawler][${jobId}] Stored the PDF as assetId: ${assetId} (${pdf.byteLength} bytes)`, + ); + return { assetId, contentType, fileName, size: pdf.byteLength }; + }, ); - return { assetId, contentType, fileName, size: pdf.byteLength }; } async function downloadAndStoreFile( @@ -829,91 +899,98 @@ async function downloadAndStoreFile( fileType: string, abortSignal: AbortSignal, ) { - let assetPath: string | undefined; - try { - logger.info( - `[Crawler][${jobId}] Downloading ${fileType} from "${url.length > 100 ? url.slice(0, 100) + "..." : url}"`, - ); - const response = await fetchWithProxy(url, { - signal: abortSignal, - }); - if (!response.ok || response.body == null) { - throw new Error(`Failed to download ${fileType}: ${response.status}`); - } - - const contentType = normalizeContentType( - response.headers.get("content-type"), - ); - if (!contentType) { - throw new Error("No content type in the response"); - } + return await withSpan( + tracer, + "crawlerWorker.downloadAndStoreFile", + { attributes: { url, jobId, userId, fileType } }, + async () => { + let assetPath: string | undefined; + try { + logger.info( + `[Crawler][${jobId}] Downloading ${fileType} from "${url.length > 100 ? url.slice(0, 100) + "..." 
: url}"`, + ); + const response = await fetchWithProxy(url, { + signal: abortSignal, + }); + if (!response.ok || response.body == null) { + throw new Error(`Failed to download ${fileType}: ${response.status}`); + } - const assetId = newAssetId(); - assetPath = path.join(os.tmpdir(), assetId); - - let bytesRead = 0; - const contentLengthEnforcer = new Transform({ - transform(chunk, _, callback) { - bytesRead += chunk.length; - - if (abortSignal.aborted) { - callback(new Error("AbortError")); - } else if (bytesRead > serverConfig.maxAssetSizeMb * 1024 * 1024) { - callback( - new Error( - `Content length exceeds maximum allowed size: ${serverConfig.maxAssetSizeMb}MB`, - ), - ); - } else { - callback(null, chunk); // pass data along unchanged + const contentType = normalizeContentType( + response.headers.get("content-type"), + ); + if (!contentType) { + throw new Error("No content type in the response"); } - }, - flush(callback) { - callback(); - }, - }); - await pipeline( - response.body, - contentLengthEnforcer, - fsSync.createWriteStream(assetPath), - ); + const assetId = newAssetId(); + assetPath = path.join(os.tmpdir(), assetId); - // Check storage quota before saving the asset - const { data: quotaApproved, error: quotaError } = await tryCatch( - QuotaService.checkStorageQuota(db, userId, bytesRead), - ); + let bytesRead = 0; + const contentLengthEnforcer = new Transform({ + transform(chunk, _, callback) { + bytesRead += chunk.length; - if (quotaError) { - logger.warn( - `[Crawler][${jobId}] Skipping ${fileType} storage due to quota exceeded: ${quotaError.message}`, - ); - return null; - } + if (abortSignal.aborted) { + callback(new Error("AbortError")); + } else if (bytesRead > serverConfig.maxAssetSizeMb * 1024 * 1024) { + callback( + new Error( + `Content length exceeds maximum allowed size: ${serverConfig.maxAssetSizeMb}MB`, + ), + ); + } else { + callback(null, chunk); // pass data along unchanged + } + }, + flush(callback) { + callback(); + }, + }); - await saveAssetFromFile({ - userId, - assetId, - metadata: { contentType }, - assetPath, - quotaApproved, - }); + await pipeline( + response.body, + contentLengthEnforcer, + fsSync.createWriteStream(assetPath), + ); - logger.info( - `[Crawler][${jobId}] Downloaded ${fileType} as assetId: ${assetId} (${bytesRead} bytes)`, - ); + // Check storage quota before saving the asset + const { data: quotaApproved, error: quotaError } = await tryCatch( + QuotaService.checkStorageQuota(db, userId, bytesRead), + ); - return { assetId, userId, contentType, size: bytesRead }; - } catch (e) { - logger.error( - `[Crawler][${jobId}] Failed to download and store ${fileType}: ${e}`, - ); - return null; - } finally { - if (assetPath) { - await tryCatch(fs.unlink(assetPath)); - } - } + if (quotaError) { + logger.warn( + `[Crawler][${jobId}] Skipping ${fileType} storage due to quota exceeded: ${quotaError.message}`, + ); + return null; + } + + await saveAssetFromFile({ + userId, + assetId, + metadata: { contentType }, + assetPath, + quotaApproved, + }); + + logger.info( + `[Crawler][${jobId}] Downloaded ${fileType} as assetId: ${assetId} (${bytesRead} bytes)`, + ); + + return { assetId, userId, contentType, size: bytesRead }; + } catch (e) { + logger.error( + `[Crawler][${jobId}] Failed to download and store ${fileType}: ${e}`, + ); + return null; + } finally { + if (assetPath) { + await tryCatch(fs.unlink(assetPath)); + } + } + }, + ); } async function downloadAndStoreImage( @@ -938,77 +1015,84 @@ async function archiveWebpage( jobId: string, abortSignal: 
AbortSignal, ) { - logger.info(`[Crawler][${jobId}] Will attempt to archive page ...`); - const assetId = newAssetId(); - const assetPath = path.join(os.tmpdir(), assetId); - - let res = await execa({ - input: html, - cancelSignal: abortSignal, - env: { - https_proxy: serverConfig.proxy.httpsProxy - ? getRandomProxy(serverConfig.proxy.httpsProxy) - : undefined, - http_proxy: serverConfig.proxy.httpProxy - ? getRandomProxy(serverConfig.proxy.httpProxy) - : undefined, - no_proxy: serverConfig.proxy.noProxy?.join(","), - }, - })("monolith", ["-", "-Ije", "-t", "5", "-b", url, "-o", assetPath]); + return await withSpan( + tracer, + "crawlerWorker.archiveWebpage", + { attributes: { url, jobId, userId } }, + async () => { + logger.info(`[Crawler][${jobId}] Will attempt to archive page ...`); + const assetId = newAssetId(); + const assetPath = path.join(os.tmpdir(), assetId); + + let res = await execa({ + input: html, + cancelSignal: abortSignal, + env: { + https_proxy: serverConfig.proxy.httpsProxy + ? getRandomProxy(serverConfig.proxy.httpsProxy) + : undefined, + http_proxy: serverConfig.proxy.httpProxy + ? getRandomProxy(serverConfig.proxy.httpProxy) + : undefined, + no_proxy: serverConfig.proxy.noProxy?.join(","), + }, + })("monolith", ["-", "-Ije", "-t", "5", "-b", url, "-o", assetPath]); - if (res.isCanceled) { - logger.error( - `[Crawler][${jobId}] Canceled archiving the page as we hit global timeout.`, - ); - await tryCatch(fs.unlink(assetPath)); - return null; - } + if (res.isCanceled) { + logger.error( + `[Crawler][${jobId}] Canceled archiving the page as we hit global timeout.`, + ); + await tryCatch(fs.unlink(assetPath)); + return null; + } - if (res.exitCode !== 0) { - logger.error( - `[Crawler][${jobId}] Failed to archive the page as the command exited with code ${res.exitCode}`, - ); - await tryCatch(fs.unlink(assetPath)); - return null; - } + if (res.exitCode !== 0) { + logger.error( + `[Crawler][${jobId}] Failed to archive the page as the command exited with code ${res.exitCode}`, + ); + await tryCatch(fs.unlink(assetPath)); + return null; + } - const contentType = "text/html"; + const contentType = "text/html"; - // Get file size and check quota before saving - const stats = await fs.stat(assetPath); - const fileSize = stats.size; + // Get file size and check quota before saving + const stats = await fs.stat(assetPath); + const fileSize = stats.size; - const { data: quotaApproved, error: quotaError } = await tryCatch( - QuotaService.checkStorageQuota(db, userId, fileSize), - ); + const { data: quotaApproved, error: quotaError } = await tryCatch( + QuotaService.checkStorageQuota(db, userId, fileSize), + ); - if (quotaError) { - logger.warn( - `[Crawler][${jobId}] Skipping page archive storage due to quota exceeded: ${quotaError.message}`, - ); - await tryCatch(fs.unlink(assetPath)); - return null; - } + if (quotaError) { + logger.warn( + `[Crawler][${jobId}] Skipping page archive storage due to quota exceeded: ${quotaError.message}`, + ); + await tryCatch(fs.unlink(assetPath)); + return null; + } - await saveAssetFromFile({ - userId, - assetId, - assetPath, - metadata: { - contentType, - }, - quotaApproved, - }); + await saveAssetFromFile({ + userId, + assetId, + assetPath, + metadata: { + contentType, + }, + quotaApproved, + }); - logger.info( - `[Crawler][${jobId}] Done archiving the page as assetId: ${assetId}`, - ); + logger.info( + `[Crawler][${jobId}] Done archiving the page as assetId: ${assetId}`, + ); - return { - assetId, - contentType, - size: await getAssetSize({ 
userId, assetId }), - }; + return { + assetId, + contentType, + size: await getAssetSize({ userId, assetId }), + }; + }, + ); } async function getContentType( @@ -1016,26 +1100,33 @@ async function getContentType( jobId: string, abortSignal: AbortSignal, ): Promise { - try { - logger.info( - `[Crawler][${jobId}] Attempting to determine the content-type for the url ${url}`, - ); - const response = await fetchWithProxy(url, { - method: "GET", - signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]), - }); - const rawContentType = response.headers.get("content-type"); - const contentType = normalizeContentType(rawContentType); - logger.info( - `[Crawler][${jobId}] Content-type for the url ${url} is "${contentType}"`, - ); - return contentType; - } catch (e) { - logger.error( - `[Crawler][${jobId}] Failed to determine the content-type for the url ${url}: ${e}`, - ); - return null; - } + return await withSpan( + tracer, + "crawlerWorker.getContentType", + { attributes: { url, jobId } }, + async () => { + try { + logger.info( + `[Crawler][${jobId}] Attempting to determine the content-type for the url ${url}`, + ); + const response = await fetchWithProxy(url, { + method: "GET", + signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]), + }); + const rawContentType = response.headers.get("content-type"); + const contentType = normalizeContentType(rawContentType); + logger.info( + `[Crawler][${jobId}] Content-type for the url ${url} is "${contentType}"`, + ); + return contentType; + } catch (e) { + logger.error( + `[Crawler][${jobId}] Failed to determine the content-type for the url ${url}: ${e}`, + ); + return null; + } + }, + ); } /** @@ -1054,53 +1145,60 @@ async function handleAsAssetBookmark( bookmarkId: string, abortSignal: AbortSignal, ) { - const downloaded = await downloadAndStoreFile( - url, - userId, - jobId, - assetType, - abortSignal, - ); - if (!downloaded) { - return; - } - const fileName = path.basename(new URL(url).pathname); - await db.transaction(async (trx) => { - await updateAsset( - undefined, - { - id: downloaded.assetId, - bookmarkId, + return await withSpan( + tracer, + "crawlerWorker.handleAsAssetBookmark", + { attributes: { url, jobId, userId, bookmarkId, assetType } }, + async () => { + const downloaded = await downloadAndStoreFile( + url, userId, - assetType: AssetTypes.BOOKMARK_ASSET, - contentType: downloaded.contentType, - size: downloaded.size, - fileName, - }, - trx, - ); - await trx.insert(bookmarkAssets).values({ - id: bookmarkId, - assetType, - assetId: downloaded.assetId, - content: null, - fileName, - sourceUrl: url, - }); - // Switch the type of the bookmark from LINK to ASSET - await trx - .update(bookmarks) - .set({ type: BookmarkTypes.ASSET }) - .where(eq(bookmarks.id, bookmarkId)); - await trx.delete(bookmarkLinks).where(eq(bookmarkLinks.id, bookmarkId)); - }); - await AssetPreprocessingQueue.enqueue( - { - bookmarkId, - fixMode: false, - }, - { - groupId: userId, + jobId, + assetType, + abortSignal, + ); + if (!downloaded) { + return; + } + const fileName = path.basename(new URL(url).pathname); + await db.transaction(async (trx) => { + await updateAsset( + undefined, + { + id: downloaded.assetId, + bookmarkId, + userId, + assetType: AssetTypes.BOOKMARK_ASSET, + contentType: downloaded.contentType, + size: downloaded.size, + fileName, + }, + trx, + ); + await trx.insert(bookmarkAssets).values({ + id: bookmarkId, + assetType, + assetId: downloaded.assetId, + content: null, + fileName, + sourceUrl: url, + }); + // Switch the type of the 
bookmark from LINK to ASSET + await trx + .update(bookmarks) + .set({ type: BookmarkTypes.ASSET }) + .where(eq(bookmarks.id, bookmarkId)); + await trx.delete(bookmarkLinks).where(eq(bookmarkLinks.id, bookmarkId)); + }); + await AssetPreprocessingQueue.enqueue( + { + bookmarkId, + fixMode: false, + }, + { + groupId: userId, + }, + ); }, ); } @@ -1115,60 +1213,73 @@ async function storeHtmlContent( userId: string, jobId: string, ): Promise { - if (!htmlContent) { - return { result: "not_stored" }; - } + return await withSpan( + tracer, + "crawlerWorker.storeHtmlContent", + { + attributes: { + jobId, + userId, + contentSize: htmlContent ? Buffer.byteLength(htmlContent, "utf8") : 0, + }, + }, + async () => { + if (!htmlContent) { + return { result: "not_stored" }; + } - const contentSize = Buffer.byteLength(htmlContent, "utf8"); + const contentSize = Buffer.byteLength(htmlContent, "utf8"); - // Only store in assets if content is >= 50KB - if (contentSize < serverConfig.crawler.htmlContentSizeThreshold) { - logger.info( - `[Crawler][${jobId}] HTML content size (${contentSize} bytes) is below threshold, storing inline`, - ); - return { result: "store_inline" }; - } + // Only store in assets if content is >= 50KB + if (contentSize < serverConfig.crawler.htmlContentSizeThreshold) { + logger.info( + `[Crawler][${jobId}] HTML content size (${contentSize} bytes) is below threshold, storing inline`, + ); + return { result: "store_inline" }; + } - const { data: quotaApproved, error: quotaError } = await tryCatch( - QuotaService.checkStorageQuota(db, userId, contentSize), - ); - if (quotaError) { - logger.warn( - `[Crawler][${jobId}] Skipping HTML content storage due to quota exceeded: ${quotaError.message}`, - ); - return { result: "not_stored" }; - } + const { data: quotaApproved, error: quotaError } = await tryCatch( + QuotaService.checkStorageQuota(db, userId, contentSize), + ); + if (quotaError) { + logger.warn( + `[Crawler][${jobId}] Skipping HTML content storage due to quota exceeded: ${quotaError.message}`, + ); + return { result: "not_stored" }; + } - const assetId = newAssetId(); + const assetId = newAssetId(); - const { error: saveError } = await tryCatch( - saveAsset({ - userId, - assetId, - asset: Buffer.from(htmlContent, "utf8"), - metadata: { - contentType: ASSET_TYPES.TEXT_HTML, - fileName: null, - }, - quotaApproved, - }), - ); - if (saveError) { - logger.error( - `[Crawler][${jobId}] Failed to store HTML content as asset: ${saveError}`, - ); - throw saveError; - } + const { error: saveError } = await tryCatch( + saveAsset({ + userId, + assetId, + asset: Buffer.from(htmlContent, "utf8"), + metadata: { + contentType: ASSET_TYPES.TEXT_HTML, + fileName: null, + }, + quotaApproved, + }), + ); + if (saveError) { + logger.error( + `[Crawler][${jobId}] Failed to store HTML content as asset: ${saveError}`, + ); + throw saveError; + } - logger.info( - `[Crawler][${jobId}] Stored large HTML content (${contentSize} bytes) as asset: ${assetId}`, - ); + logger.info( + `[Crawler][${jobId}] Stored large HTML content (${contentSize} bytes) as asset: ${assetId}`, + ); - return { - result: "stored", - assetId, - size: contentSize, - }; + return { + result: "stored", + assetId, + size: contentSize, + }; + }, + ); } async function crawlAndParseUrl( @@ -1186,244 +1297,275 @@ async function crawlAndParseUrl( forceStorePdf: boolean, abortSignal: AbortSignal, ) { - let result: { - htmlContent: string; - screenshot: Buffer | undefined; - pdf: Buffer | undefined; - statusCode: number | null; - url: string; - }; 
- - if (precrawledArchiveAssetId) { - logger.info( - `[Crawler][${jobId}] The page has been precrawled. Will use the precrawled archive instead.`, - ); - const asset = await readAsset({ - userId, - assetId: precrawledArchiveAssetId, - }); - result = { - htmlContent: asset.asset.toString(), - screenshot: undefined, - pdf: undefined, - statusCode: 200, - url, - }; - } else { - result = await crawlPage(jobId, url, userId, forceStorePdf, abortSignal); - } - abortSignal.throwIfAborted(); - - const { htmlContent, screenshot, pdf, statusCode, url: browserUrl } = result; - - // Track status code in Prometheus - if (statusCode !== null) { - crawlerStatusCodeCounter.labels(statusCode.toString()).inc(); - } - - const meta = await Promise.race([ - extractMetadata(htmlContent, browserUrl, jobId), - abortPromise(abortSignal), - ]); - abortSignal.throwIfAborted(); - - let readableContent: { content: string } | null = meta.readableContentHtml - ? { content: meta.readableContentHtml } - : null; - if (!readableContent) { - readableContent = await Promise.race([ - extractReadableContent( - meta.contentHtml ?? htmlContent, - browserUrl, + return await withSpan( + tracer, + "crawlerWorker.crawlAndParseUrl", + { + attributes: { + url, jobId, - ), - abortPromise(abortSignal), - ]); - } - abortSignal.throwIfAborted(); - - const screenshotAssetInfo = await Promise.race([ - storeScreenshot(screenshot, userId, jobId), - abortPromise(abortSignal), - ]); - abortSignal.throwIfAborted(); - - const pdfAssetInfo = await Promise.race([ - storePdf(pdf, userId, jobId), - abortPromise(abortSignal), - ]); - abortSignal.throwIfAborted(); - - const htmlContentAssetInfo = await storeHtmlContent( - readableContent?.content, - userId, - jobId, - ); - abortSignal.throwIfAborted(); - let imageAssetInfo: DBAssetType | null = null; - if (meta.image) { - const downloaded = await downloadAndStoreImage( - meta.image, - userId, - jobId, - abortSignal, - ); - if (downloaded) { - imageAssetInfo = { - id: downloaded.assetId, - bookmarkId, userId, - assetType: AssetTypes.LINK_BANNER_IMAGE, - contentType: downloaded.contentType, - size: downloaded.size, + bookmarkId, + archiveFullPage, + forceStorePdf, + hasPrecrawledArchive: !!precrawledArchiveAssetId, + }, + }, + async () => { + let result: { + htmlContent: string; + screenshot: Buffer | undefined; + pdf: Buffer | undefined; + statusCode: number | null; + url: string; }; - } - } - abortSignal.throwIfAborted(); - const parseDate = (date: string | undefined) => { - if (!date) { - return null; - } - try { - return new Date(date); - } catch { - return null; - } - }; - - // TODO(important): Restrict the size of content to store - const assetDeletionTasks: Promise[] = []; - const inlineHtmlContent = - htmlContentAssetInfo.result === "store_inline" - ? (readableContent?.content ?? null) - : null; - readableContent = null; - await db.transaction(async (txn) => { - await txn - .update(bookmarkLinks) - .set({ - title: meta.title, - description: meta.description, - // Don't store data URIs as they're not valid URLs and are usually quite large - imageUrl: meta.image?.startsWith("data:") ? null : meta.image, - favicon: meta.logo, - htmlContent: inlineHtmlContent, - contentAssetId: - htmlContentAssetInfo.result === "stored" - ? 
htmlContentAssetInfo.assetId - : null, - crawledAt: new Date(), - crawlStatusCode: statusCode, - author: meta.author, - publisher: meta.publisher, - datePublished: parseDate(meta.datePublished), - dateModified: parseDate(meta.dateModified), - }) - .where(eq(bookmarkLinks.id, bookmarkId)); - - if (screenshotAssetInfo) { - await updateAsset( - oldScreenshotAssetId, - { - id: screenshotAssetInfo.assetId, - bookmarkId, - userId, - assetType: AssetTypes.LINK_SCREENSHOT, - contentType: screenshotAssetInfo.contentType, - size: screenshotAssetInfo.size, - fileName: screenshotAssetInfo.fileName, - }, - txn, - ); - assetDeletionTasks.push(silentDeleteAsset(userId, oldScreenshotAssetId)); - } - if (pdfAssetInfo) { - await updateAsset( - oldPdfAssetId, - { - id: pdfAssetInfo.assetId, - bookmarkId, + if (precrawledArchiveAssetId) { + logger.info( + `[Crawler][${jobId}] The page has been precrawled. Will use the precrawled archive instead.`, + ); + const asset = await readAsset({ userId, - assetType: AssetTypes.LINK_PDF, - contentType: pdfAssetInfo.contentType, - size: pdfAssetInfo.size, - fileName: pdfAssetInfo.fileName, - }, - txn, - ); - assetDeletionTasks.push(silentDeleteAsset(userId, oldPdfAssetId)); - } - if (imageAssetInfo) { - await updateAsset(oldImageAssetId, imageAssetInfo, txn); - assetDeletionTasks.push(silentDeleteAsset(userId, oldImageAssetId)); - } - if (htmlContentAssetInfo.result === "stored") { - await updateAsset( - oldContentAssetId, - { - id: htmlContentAssetInfo.assetId, - bookmarkId, + assetId: precrawledArchiveAssetId, + }); + result = { + htmlContent: asset.asset.toString(), + screenshot: undefined, + pdf: undefined, + statusCode: 200, + url, + }; + } else { + result = await crawlPage( + jobId, + url, userId, - assetType: AssetTypes.LINK_HTML_CONTENT, - contentType: ASSET_TYPES.TEXT_HTML, - size: htmlContentAssetInfo.size, - fileName: null, - }, - txn, - ); - assetDeletionTasks.push(silentDeleteAsset(userId, oldContentAssetId)); - } else if (oldContentAssetId) { - // Unlink the old content asset - await txn.delete(assets).where(eq(assets.id, oldContentAssetId)); - assetDeletionTasks.push(silentDeleteAsset(userId, oldContentAssetId)); - } - }); - - // Delete the old assets if any - await Promise.all(assetDeletionTasks); + forceStorePdf, + abortSignal, + ); + } + abortSignal.throwIfAborted(); - return async () => { - if ( - !precrawledArchiveAssetId && - (serverConfig.crawler.fullPageArchive || archiveFullPage) - ) { - const archiveResult = await archiveWebpage( + const { htmlContent, - browserUrl, + screenshot, + pdf, + statusCode, + url: browserUrl, + } = result; + + // Track status code in Prometheus + if (statusCode !== null) { + crawlerStatusCodeCounter.labels(statusCode.toString()).inc(); + } + + const meta = await Promise.race([ + extractMetadata(htmlContent, browserUrl, jobId), + abortPromise(abortSignal), + ]); + abortSignal.throwIfAborted(); + + let readableContent: { content: string } | null = meta.readableContentHtml + ? { content: meta.readableContentHtml } + : null; + if (!readableContent) { + readableContent = await Promise.race([ + extractReadableContent( + meta.contentHtml ?? 
htmlContent, + browserUrl, + jobId, + ), + abortPromise(abortSignal), + ]); + } + abortSignal.throwIfAborted(); + + const screenshotAssetInfo = await Promise.race([ + storeScreenshot(screenshot, userId, jobId), + abortPromise(abortSignal), + ]); + abortSignal.throwIfAborted(); + + const pdfAssetInfo = await Promise.race([ + storePdf(pdf, userId, jobId), + abortPromise(abortSignal), + ]); + abortSignal.throwIfAborted(); + + const htmlContentAssetInfo = await storeHtmlContent( + readableContent?.content, userId, jobId, - abortSignal, ); + abortSignal.throwIfAborted(); + let imageAssetInfo: DBAssetType | null = null; + if (meta.image) { + const downloaded = await downloadAndStoreImage( + meta.image, + userId, + jobId, + abortSignal, + ); + if (downloaded) { + imageAssetInfo = { + id: downloaded.assetId, + bookmarkId, + userId, + assetType: AssetTypes.LINK_BANNER_IMAGE, + contentType: downloaded.contentType, + size: downloaded.size, + }; + } + } + abortSignal.throwIfAborted(); - if (archiveResult) { - const { - assetId: fullPageArchiveAssetId, - size, - contentType, - } = archiveResult; + const parseDate = (date: string | undefined) => { + if (!date) { + return null; + } + try { + return new Date(date); + } catch { + return null; + } + }; - await db.transaction(async (txn) => { + // TODO(important): Restrict the size of content to store + const assetDeletionTasks: Promise[] = []; + const inlineHtmlContent = + htmlContentAssetInfo.result === "store_inline" + ? (readableContent?.content ?? null) + : null; + readableContent = null; + await db.transaction(async (txn) => { + await txn + .update(bookmarkLinks) + .set({ + title: meta.title, + description: meta.description, + // Don't store data URIs as they're not valid URLs and are usually quite large + imageUrl: meta.image?.startsWith("data:") ? null : meta.image, + favicon: meta.logo, + htmlContent: inlineHtmlContent, + contentAssetId: + htmlContentAssetInfo.result === "stored" + ? 
htmlContentAssetInfo.assetId + : null, + crawledAt: new Date(), + crawlStatusCode: statusCode, + author: meta.author, + publisher: meta.publisher, + datePublished: parseDate(meta.datePublished), + dateModified: parseDate(meta.dateModified), + }) + .where(eq(bookmarkLinks.id, bookmarkId)); + + if (screenshotAssetInfo) { await updateAsset( - oldFullPageArchiveAssetId, + oldScreenshotAssetId, { - id: fullPageArchiveAssetId, + id: screenshotAssetInfo.assetId, bookmarkId, userId, - assetType: AssetTypes.LINK_FULL_PAGE_ARCHIVE, - contentType, - size, + assetType: AssetTypes.LINK_SCREENSHOT, + contentType: screenshotAssetInfo.contentType, + size: screenshotAssetInfo.size, + fileName: screenshotAssetInfo.fileName, + }, + txn, + ); + assetDeletionTasks.push( + silentDeleteAsset(userId, oldScreenshotAssetId), + ); + } + if (pdfAssetInfo) { + await updateAsset( + oldPdfAssetId, + { + id: pdfAssetInfo.assetId, + bookmarkId, + userId, + assetType: AssetTypes.LINK_PDF, + contentType: pdfAssetInfo.contentType, + size: pdfAssetInfo.size, + fileName: pdfAssetInfo.fileName, + }, + txn, + ); + assetDeletionTasks.push(silentDeleteAsset(userId, oldPdfAssetId)); + } + if (imageAssetInfo) { + await updateAsset(oldImageAssetId, imageAssetInfo, txn); + assetDeletionTasks.push(silentDeleteAsset(userId, oldImageAssetId)); + } + if (htmlContentAssetInfo.result === "stored") { + await updateAsset( + oldContentAssetId, + { + id: htmlContentAssetInfo.assetId, + bookmarkId, + userId, + assetType: AssetTypes.LINK_HTML_CONTENT, + contentType: ASSET_TYPES.TEXT_HTML, + size: htmlContentAssetInfo.size, fileName: null, }, txn, ); - }); - if (oldFullPageArchiveAssetId) { - await silentDeleteAsset(userId, oldFullPageArchiveAssetId); + assetDeletionTasks.push(silentDeleteAsset(userId, oldContentAssetId)); + } else if (oldContentAssetId) { + // Unlink the old content asset + await txn.delete(assets).where(eq(assets.id, oldContentAssetId)); + assetDeletionTasks.push(silentDeleteAsset(userId, oldContentAssetId)); } - } - } - }; + }); + + // Delete the old assets if any + await Promise.all(assetDeletionTasks); + + return async () => { + if ( + !precrawledArchiveAssetId && + (serverConfig.crawler.fullPageArchive || archiveFullPage) + ) { + const archiveResult = await archiveWebpage( + htmlContent, + browserUrl, + userId, + jobId, + abortSignal, + ); + + if (archiveResult) { + const { + assetId: fullPageArchiveAssetId, + size, + contentType, + } = archiveResult; + + await db.transaction(async (txn) => { + await updateAsset( + oldFullPageArchiveAssetId, + { + id: fullPageArchiveAssetId, + bookmarkId, + userId, + assetType: AssetTypes.LINK_FULL_PAGE_ARCHIVE, + contentType, + size, + fileName: null, + }, + txn, + ); + }); + if (oldFullPageArchiveAssetId) { + await silentDeleteAsset(userId, oldFullPageArchiveAssetId); + } + } + } + }; + }, + ); } /** @@ -1431,39 +1573,47 @@ async function crawlAndParseUrl( * @throws {QueueRetryAfterError} if the domain is rate limited */ async function checkDomainRateLimit(url: string, jobId: string): Promise { - const crawlerDomainRateLimitConfig = serverConfig.crawler.domainRatelimiting; - if (!crawlerDomainRateLimitConfig) { - return; - } + return await withSpan( + tracer, + "crawlerWorker.checkDomainRateLimit", + { attributes: { url, jobId } }, + async () => { + const crawlerDomainRateLimitConfig = + serverConfig.crawler.domainRatelimiting; + if (!crawlerDomainRateLimitConfig) { + return; + } - const rateLimitClient = await getRateLimitClient(); - if (!rateLimitClient) { - return; - } + const 
rateLimitClient = await getRateLimitClient(); + if (!rateLimitClient) { + return; + } - const hostname = new URL(url).hostname; - const rateLimitResult = rateLimitClient.checkRateLimit( - { - name: "domain-ratelimit", - maxRequests: crawlerDomainRateLimitConfig.maxRequests, - windowMs: crawlerDomainRateLimitConfig.windowMs, + const hostname = new URL(url).hostname; + const rateLimitResult = rateLimitClient.checkRateLimit( + { + name: "domain-ratelimit", + maxRequests: crawlerDomainRateLimitConfig.maxRequests, + windowMs: crawlerDomainRateLimitConfig.windowMs, + }, + hostname, + ); + + if (!rateLimitResult.allowed) { + const resetInSeconds = rateLimitResult.resetInSeconds; + // Add jitter to prevent thundering herd: +40% random variation + const jitterFactor = 1.0 + Math.random() * 0.4; // Random value between 1.0 and 1.4 + const delayMs = Math.floor(resetInSeconds * 1000 * jitterFactor); + logger.info( + `[Crawler][${jobId}] Domain "${hostname}" is rate limited. Will retry in ${(delayMs / 1000).toFixed(2)} seconds (with jitter).`, + ); + throw new QueueRetryAfterError( + `Domain "${hostname}" is rate limited`, + delayMs, + ); + } }, - hostname, ); - - if (!rateLimitResult.allowed) { - const resetInSeconds = rateLimitResult.resetInSeconds; - // Add jitter to prevent thundering herd: +40% random variation - const jitterFactor = 1.0 + Math.random() * 0.4; // Random value between 1.0 and 1.4 - const delayMs = Math.floor(resetInSeconds * 1000 * jitterFactor); - logger.info( - `[Crawler][${jobId}] Domain "${hostname}" is rate limited. Will retry in ${(delayMs / 1000).toFixed(2)} seconds (with jitter).`, - ); - throw new QueueRetryAfterError( - `Domain "${hostname}" is rate limited`, - delayMs, - ); - } } async function runCrawler( diff --git a/apps/workers/workers/feedWorker.ts b/apps/workers/workers/feedWorker.ts index 2a1334a9..cd4f3e82 100644 --- a/apps/workers/workers/feedWorker.ts +++ b/apps/workers/workers/feedWorker.ts @@ -4,6 +4,7 @@ import { fetchWithProxy } from "network"; import cron from "node-cron"; import Parser from "rss-parser"; import { buildImpersonatingTRPCClient } from "trpc"; +import { withWorkerTracing } from "workerTracing"; import { z } from "zod"; import type { ZFeedRequestSchema } from "@karakeep/shared-server"; @@ -88,7 +89,7 @@ export class FeedWorker { const worker = (await getQueueClient())!.createRunner( FeedQueue, { - run: run, + run: withWorkerTracing("feedWorker.run", run), onComplete: async (job) => { workerStatsCounter.labels("feed", "completed").inc(); const jobId = job.id; diff --git a/apps/workers/workers/inference/inferenceWorker.ts b/apps/workers/workers/inference/inferenceWorker.ts index eefc1dd8..57ad1a22 100644 --- a/apps/workers/workers/inference/inferenceWorker.ts +++ b/apps/workers/workers/inference/inferenceWorker.ts @@ -1,5 +1,6 @@ import { eq } from "drizzle-orm"; import { workerStatsCounter } from "metrics"; +import { withWorkerTracing } from "workerTracing"; import type { ZOpenAIRequest } from "@karakeep/shared-server"; import { db } from "@karakeep/db"; @@ -42,7 +43,7 @@ export class OpenAiWorker { const worker = (await getQueueClient())!.createRunner( OpenAIQueue, { - run: runOpenAI, + run: withWorkerTracing("inferenceWorker.run", runOpenAI), onComplete: async (job) => { workerStatsCounter.labels("inference", "completed").inc(); const jobId = job.id; diff --git a/apps/workers/workers/ruleEngineWorker.ts b/apps/workers/workers/ruleEngineWorker.ts index 98a9de74..00e20127 100644 --- a/apps/workers/workers/ruleEngineWorker.ts +++ 
b/apps/workers/workers/ruleEngineWorker.ts @@ -1,6 +1,7 @@ import { eq } from "drizzle-orm"; import { workerStatsCounter } from "metrics"; import { buildImpersonatingAuthedContext } from "trpc"; +import { withWorkerTracing } from "workerTracing"; import type { ZRuleEngineRequest } from "@karakeep/shared-server"; import { db } from "@karakeep/db"; @@ -20,7 +21,7 @@ export class RuleEngineWorker { const worker = (await getQueueClient())!.createRunner( RuleEngineQueue, { - run: runRuleEngine, + run: withWorkerTracing("ruleEngineWorker.run", runRuleEngine), onComplete: (job) => { workerStatsCounter.labels("ruleEngine", "completed").inc(); const jobId = job.id; diff --git a/apps/workers/workers/searchWorker.ts b/apps/workers/workers/searchWorker.ts index fed30c9b..7a9c3a8d 100644 --- a/apps/workers/workers/searchWorker.ts +++ b/apps/workers/workers/searchWorker.ts @@ -1,5 +1,6 @@ import { eq } from "drizzle-orm"; import { workerStatsCounter } from "metrics"; +import { withWorkerTracing } from "workerTracing"; import type { ZSearchIndexingRequest } from "@karakeep/shared-server"; import { db } from "@karakeep/db"; @@ -25,7 +26,7 @@ export class SearchIndexingWorker { (await getQueueClient())!.createRunner( SearchIndexingQueue, { - run: runSearchIndexing, + run: withWorkerTracing("searchWorker.run", runSearchIndexing), onComplete: (job) => { workerStatsCounter.labels("search", "completed").inc(); const jobId = job.id; diff --git a/apps/workers/workers/videoWorker.ts b/apps/workers/workers/videoWorker.ts index 03525fdf..1ffbf674 100644 --- a/apps/workers/workers/videoWorker.ts +++ b/apps/workers/workers/videoWorker.ts @@ -4,6 +4,7 @@ import path from "path"; import { execa } from "execa"; import { workerStatsCounter } from "metrics"; import { getProxyAgent, validateUrl } from "network"; +import { withWorkerTracing } from "workerTracing"; import { db } from "@karakeep/db"; import { AssetTypes } from "@karakeep/db/schema"; @@ -35,7 +36,7 @@ export class VideoWorker { return (await getQueueClient())!.createRunner( VideoWorkerQueue, { - run: runWorker, + run: withWorkerTracing("videoWorker.run", runWorker), onComplete: async (job) => { workerStatsCounter.labels("video", "completed").inc(); const jobId = job.id; diff --git a/apps/workers/workers/webhookWorker.ts b/apps/workers/workers/webhookWorker.ts index 0d661372..875a0ac6 100644 --- a/apps/workers/workers/webhookWorker.ts +++ b/apps/workers/workers/webhookWorker.ts @@ -1,6 +1,7 @@ import { eq } from "drizzle-orm"; import { workerStatsCounter } from "metrics"; import { fetchWithProxy } from "network"; +import { withWorkerTracing } from "workerTracing"; import { db } from "@karakeep/db"; import { bookmarks, webhooksTable } from "@karakeep/db/schema"; @@ -19,7 +20,7 @@ export class WebhookWorker { const worker = (await getQueueClient())!.createRunner( WebhookQueue, { - run: runWebhook, + run: withWorkerTracing("webhookWorker.run", runWebhook), onComplete: async (job) => { workerStatsCounter.labels("webhook", "completed").inc(); const jobId = job.id; -- cgit v1.2.3-70-g09d2
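
Since this patch is limited to `apps/workers`, the `getTracer`/`withSpan` helpers it imports from `@karakeep/shared-server` are not part of the diff. The sketch below shows what a `withSpan`-style helper might look like on top of the standard `@opentelemetry/api` package; the helper names mirror the imports used above, but the body is an assumption for illustration, not the actual shared-server implementation.

```ts
// Illustrative only: a withSpan-style helper over @opentelemetry/api.
// The real getTracer/withSpan live in @karakeep/shared-server and may differ.
import {
  SpanStatusCode,
  trace,
  type Attributes,
  type Tracer,
} from "@opentelemetry/api";

export function getTracer(name: string): Tracer {
  return trace.getTracer(name);
}

export async function withSpan<T>(
  tracer: Tracer,
  name: string,
  opts: { attributes?: Attributes },
  fn: () => Promise<T>,
): Promise<T> {
  // startActiveSpan keeps the span as the active context while fn runs,
  // so nested withSpan calls become child spans automatically.
  return tracer.startActiveSpan(
    name,
    { attributes: opts.attributes },
    async (span) => {
      try {
        return await fn();
      } catch (err) {
        // Record the failure on the span, then rethrow so the queue's
        // normal retry/error handling still applies.
        span.recordException(err as Error);
        span.setStatus({ code: SpanStatusCode.ERROR });
        throw err;
      } finally {
        span.end();
      }
    },
  );
}
```

Assuming the shared-server helper behaves roughly like this, the nested `withSpan` calls added in `crawlerWorker.ts` (e.g. `crawlerWorker.crawlPage`, `crawlerWorker.storeScreenshot`) would appear as child spans of the per-job span created by `withWorkerTracing`, since the active span context propagates across the awaited calls.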