-rw-r--r--  apps/workers/index.ts                              |    4
-rw-r--r--  apps/workers/workerTracing.ts                      |   43
-rw-r--r--  apps/workers/workers/adminMaintenanceWorker.ts     |    6
-rw-r--r--  apps/workers/workers/assetPreprocessingWorker.ts   |    3
-rw-r--r--  apps/workers/workers/backupWorker.ts               |    3
-rw-r--r--  apps/workers/workers/crawlerWorker.ts              | 1774
-rw-r--r--  apps/workers/workers/feedWorker.ts                 |    3
-rw-r--r--  apps/workers/workers/inference/inferenceWorker.ts  |    3
-rw-r--r--  apps/workers/workers/ruleEngineWorker.ts           |    3
-rw-r--r--  apps/workers/workers/searchWorker.ts               |    3
-rw-r--r--  apps/workers/workers/videoWorker.ts                |    3
-rw-r--r--  apps/workers/workers/webhookWorker.ts              |    3
12 files changed, 1030 insertions(+), 821 deletions(-)
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index b605b50f..07840a4c 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -3,8 +3,10 @@ import "dotenv/config";
import { buildServer } from "server";
import {
+ initTracing,
loadAllPlugins,
prepareQueue,
+ shutdownTracing,
startQueue,
} from "@karakeep/shared-server";
import serverConfig from "@karakeep/shared/config";
@@ -51,6 +53,7 @@ function isWorkerEnabled(name: WorkerName) {
async function main() {
await loadAllPlugins();
+ initTracing("workers");
logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
await prepareQueue();
@@ -97,6 +100,7 @@ async function main() {
worker.stop();
}
await httpServer.stop();
+ await shutdownTracing();
process.exit(0);
}
diff --git a/apps/workers/workerTracing.ts b/apps/workers/workerTracing.ts
new file mode 100644
index 00000000..3ff16d1c
--- /dev/null
+++ b/apps/workers/workerTracing.ts
@@ -0,0 +1,43 @@
+import type { DequeuedJob } from "@karakeep/shared/queueing";
+import { getTracer, withSpan } from "@karakeep/shared-server";
+
+const tracer = getTracer("@karakeep/workers");
+
+type WorkerRunFn<TData, TResult = void> = (
+ job: DequeuedJob<TData>,
+) => Promise<TResult>;
+
+/**
+ * Wraps a worker run function with OpenTelemetry tracing.
+ * Creates a span for each job execution and automatically handles error recording.
+ *
+ * @param name - The name of the span (e.g., "feedWorker.run", "crawlerWorker.run")
+ * @param fn - The worker run function to wrap
+ * @returns A wrapped function that executes within a traced span
+ *
+ * @example
+ * ```ts
+ * const run = withWorkerTracing("feedWorker.run", async (job) => {
+ * // Your worker logic here
+ * });
+ * ```
+ */
+export function withWorkerTracing<TData, TResult = void>(
+ name: string,
+ fn: WorkerRunFn<TData, TResult>,
+): WorkerRunFn<TData, TResult> {
+ return async (job: DequeuedJob<TData>): Promise<TResult> => {
+ return await withSpan(
+ tracer,
+ name,
+ {
+ attributes: {
+ "job.id": job.id,
+ "job.priority": job.priority,
+ "job.runNumber": job.runNumber,
+ },
+ },
+ () => fn(job),
+ );
+ };
+}
diff --git a/apps/workers/workers/adminMaintenanceWorker.ts b/apps/workers/workers/adminMaintenanceWorker.ts
index e5312964..92d52a22 100644
--- a/apps/workers/workers/adminMaintenanceWorker.ts
+++ b/apps/workers/workers/adminMaintenanceWorker.ts
@@ -1,4 +1,5 @@
import { workerStatsCounter } from "metrics";
+import { withWorkerTracing } from "workerTracing";
import {
AdminMaintenanceQueue,
@@ -20,7 +21,10 @@ export class AdminMaintenanceWorker {
(await getQueueClient())!.createRunner<ZAdminMaintenanceTask>(
AdminMaintenanceQueue,
{
- run: runAdminMaintenance,
+ run: withWorkerTracing(
+ "adminMaintenanceWorker.run",
+ runAdminMaintenance,
+ ),
onComplete: (job) => {
workerStatsCounter
.labels(`adminMaintenance:${job.data.type}`, "completed")
diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts
index b585a15e..a5b439fc 100644
--- a/apps/workers/workers/assetPreprocessingWorker.ts
+++ b/apps/workers/workers/assetPreprocessingWorker.ts
@@ -4,6 +4,7 @@ import { workerStatsCounter } from "metrics";
import PDFParser from "pdf2json";
import { fromBuffer } from "pdf2pic";
import { createWorker } from "tesseract.js";
+import { withWorkerTracing } from "workerTracing";
import type { AssetPreprocessingRequest } from "@karakeep/shared-server";
import { db } from "@karakeep/db";
@@ -36,7 +37,7 @@ export class AssetPreprocessingWorker {
(await getQueueClient())!.createRunner<AssetPreprocessingRequest>(
AssetPreprocessingQueue,
{
- run: run,
+ run: withWorkerTracing("assetPreprocessingWorker.run", run),
onComplete: async (job) => {
workerStatsCounter.labels("assetPreprocessing", "completed").inc();
const jobId = job.id;
diff --git a/apps/workers/workers/backupWorker.ts b/apps/workers/workers/backupWorker.ts
index c2d1ae5a..01f54b28 100644
--- a/apps/workers/workers/backupWorker.ts
+++ b/apps/workers/workers/backupWorker.ts
@@ -8,6 +8,7 @@ import archiver from "archiver";
import { eq } from "drizzle-orm";
import { workerStatsCounter } from "metrics";
import cron from "node-cron";
+import { withWorkerTracing } from "workerTracing";
import type { ZBackupRequest } from "@karakeep/shared-server";
import { db } from "@karakeep/db";
@@ -107,7 +108,7 @@ export class BackupWorker {
const worker = (await getQueueClient())!.createRunner<ZBackupRequest>(
BackupQueue,
{
- run: run,
+ run: withWorkerTracing("backupWorker.run", run),
onComplete: async (job) => {
workerStatsCounter.labels("backup", "completed").inc();
const jobId = job.id;
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index 411f615a..597d45d3 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -37,6 +37,7 @@ import {
import { Browser, BrowserContextOptions } from "playwright";
import { chromium } from "playwright-extra";
import StealthPlugin from "puppeteer-extra-plugin-stealth";
+import { withWorkerTracing } from "workerTracing";
import { getBookmarkDetails, updateAsset } from "workerUtils";
import { z } from "zod";
@@ -52,12 +53,14 @@ import {
} from "@karakeep/db/schema";
import {
AssetPreprocessingQueue,
+ getTracer,
LinkCrawlerQueue,
OpenAIQueue,
QuotaService,
triggerSearchReindex,
triggerWebhook,
VideoWorkerQueue,
+ withSpan,
zCrawlLinkRequestSchema,
} from "@karakeep/shared-server";
import {
@@ -86,6 +89,8 @@ import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
import metascraperAmazonImproved from "../metascraper-plugins/metascraper-amazon-improved";
import metascraperReddit from "../metascraper-plugins/metascraper-reddit";
+const tracer = getTracer("@karakeep/workers");
+
function abortPromise(signal: AbortSignal): Promise<never> {
if (signal.aborted) {
const p = Promise.reject(signal.reason ?? new Error("AbortError"));
@@ -325,7 +330,7 @@ export class CrawlerWorker {
>(
LinkCrawlerQueue,
{
- run: runCrawler,
+ run: withWorkerTracing("crawlerWorker.run", runCrawler),
onComplete: async (job) => {
workerStatsCounter.labels("crawler", "completed").inc();
const jobId = job.id;
@@ -427,22 +432,29 @@ async function browserlessCrawlPage(
url: string,
abortSignal: AbortSignal,
) {
- logger.info(
- `[Crawler][${jobId}] Running in browserless mode. Will do a plain http request to "${url}". Screenshots will be disabled.`,
- );
- const response = await fetchWithProxy(url, {
- signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]),
- });
- logger.info(
- `[Crawler][${jobId}] Successfully fetched the content of "${url}". Status: ${response.status}, Size: ${response.size}`,
+ return await withSpan(
+ tracer,
+ "crawlerWorker.browserlessCrawlPage",
+ { attributes: { url, jobId } },
+ async () => {
+ logger.info(
+ `[Crawler][${jobId}] Running in browserless mode. Will do a plain http request to "${url}". Screenshots will be disabled.`,
+ );
+ const response = await fetchWithProxy(url, {
+ signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]),
+ });
+ logger.info(
+ `[Crawler][${jobId}] Successfully fetched the content of "${url}". Status: ${response.status}, Size: ${response.size}`,
+ );
+ return {
+ htmlContent: await response.text(),
+ statusCode: response.status,
+ screenshot: undefined,
+ pdf: undefined,
+ url: response.url,
+ };
+ },
);
- return {
- htmlContent: await response.text(),
- statusCode: response.status,
- screenshot: undefined,
- pdf: undefined,
- url: response.url,
- };
}
async function crawlPage(
@@ -458,229 +470,243 @@ async function crawlPage(
statusCode: number;
url: string;
}> {
- // Check user's browser crawling setting
- const userData = await db.query.users.findFirst({
- where: eq(users.id, userId),
- columns: { browserCrawlingEnabled: true },
- });
- if (!userData) {
- logger.error(`[Crawler][${jobId}] User ${userId} not found`);
- throw new Error(`User ${userId} not found`);
- }
-
- const browserCrawlingEnabled = userData.browserCrawlingEnabled;
-
- if (browserCrawlingEnabled !== null && !browserCrawlingEnabled) {
- return browserlessCrawlPage(jobId, url, abortSignal);
- }
-
- let browser: Browser | undefined;
- if (serverConfig.crawler.browserConnectOnDemand) {
- browser = await startBrowserInstance();
- } else {
- browser = globalBrowser;
- }
- if (!browser) {
- return browserlessCrawlPage(jobId, url, abortSignal);
- }
-
- const proxyConfig = getPlaywrightProxyConfig();
- const isRunningInProxyContext =
- proxyConfig !== undefined &&
- !matchesNoProxy(url, proxyConfig.bypass?.split(",") ?? []);
- const context = await browser.newContext({
- viewport: { width: 1440, height: 900 },
- userAgent:
- "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
- proxy: proxyConfig,
- });
-
- try {
- if (globalCookies.length > 0) {
- await context.addCookies(globalCookies);
- logger.info(
- `[Crawler][${jobId}] Cookies successfully loaded into browser context`,
- );
- }
+ return await withSpan(
+ tracer,
+ "crawlerWorker.crawlPage",
+ { attributes: { url, jobId, userId, forceStorePdf } },
+ async () => {
+ // Check user's browser crawling setting
+ const userData = await db.query.users.findFirst({
+ where: eq(users.id, userId),
+ columns: { browserCrawlingEnabled: true },
+ });
+ if (!userData) {
+ logger.error(`[Crawler][${jobId}] User ${userId} not found`);
+ throw new Error(`User ${userId} not found`);
+ }
- // Create a new page in the context
- const page = await context.newPage();
+ const browserCrawlingEnabled = userData.browserCrawlingEnabled;
- // Apply ad blocking
- if (globalBlocker) {
- await globalBlocker.enableBlockingInPage(page);
- }
+ if (browserCrawlingEnabled !== null && !browserCrawlingEnabled) {
+ return browserlessCrawlPage(jobId, url, abortSignal);
+ }
- // Block audio/video resources and disallowed sub-requests
- await page.route("**/*", async (route) => {
- if (abortSignal.aborted) {
- await route.abort("aborted");
- return;
+ let browser: Browser | undefined;
+ if (serverConfig.crawler.browserConnectOnDemand) {
+ browser = await startBrowserInstance();
+ } else {
+ browser = globalBrowser;
}
- const request = route.request();
- const resourceType = request.resourceType();
-
- // Block audio/video resources
- if (
- resourceType === "media" ||
- request.headers()["content-type"]?.includes("video/") ||
- request.headers()["content-type"]?.includes("audio/")
- ) {
- await route.abort("aborted");
- return;
+ if (!browser) {
+ return browserlessCrawlPage(jobId, url, abortSignal);
}
- const requestUrl = request.url();
- const requestIsRunningInProxyContext =
+ const proxyConfig = getPlaywrightProxyConfig();
+ const isRunningInProxyContext =
proxyConfig !== undefined &&
- !matchesNoProxy(requestUrl, proxyConfig.bypass?.split(",") ?? []);
- if (
- requestUrl.startsWith("http://") ||
- requestUrl.startsWith("https://")
- ) {
- const validation = await validateUrl(
- requestUrl,
- requestIsRunningInProxyContext,
- );
- if (!validation.ok) {
- logger.warn(
- `[Crawler][${jobId}] Blocking sub-request to disallowed URL "${requestUrl}": ${validation.reason}`,
+ !matchesNoProxy(url, proxyConfig.bypass?.split(",") ?? []);
+ const context = await browser.newContext({
+ viewport: { width: 1440, height: 900 },
+ userAgent:
+ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
+ proxy: proxyConfig,
+ });
+
+ try {
+ if (globalCookies.length > 0) {
+ await context.addCookies(globalCookies);
+ logger.info(
+ `[Crawler][${jobId}] Cookies successfully loaded into browser context`,
);
- await route.abort("blockedbyclient");
- return;
}
- }
- // Continue with other requests
- await route.continue();
- });
+ // Create a new page in the context
+ const page = await context.newPage();
- // Navigate to the target URL
- const navigationValidation = await validateUrl(
- url,
- isRunningInProxyContext,
- );
- if (!navigationValidation.ok) {
- throw new Error(
- `Disallowed navigation target "${url}": ${navigationValidation.reason}`,
- );
- }
- const targetUrl = navigationValidation.url.toString();
- logger.info(`[Crawler][${jobId}] Navigating to "${targetUrl}"`);
- const response = await Promise.race([
- page.goto(targetUrl, {
- timeout: serverConfig.crawler.navigateTimeoutSec * 1000,
- waitUntil: "domcontentloaded",
- }),
- abortPromise(abortSignal).then(() => null),
- ]);
+ // Apply ad blocking
+ if (globalBlocker) {
+ await globalBlocker.enableBlockingInPage(page);
+ }
- logger.info(
- `[Crawler][${jobId}] Successfully navigated to "${targetUrl}". Waiting for the page to load ...`,
- );
+ // Block audio/video resources and disallowed sub-requests
+ await page.route("**/*", async (route) => {
+ if (abortSignal.aborted) {
+ await route.abort("aborted");
+ return;
+ }
+ const request = route.request();
+ const resourceType = request.resourceType();
+
+ // Block audio/video resources
+ if (
+ resourceType === "media" ||
+ request.headers()["content-type"]?.includes("video/") ||
+ request.headers()["content-type"]?.includes("audio/")
+ ) {
+ await route.abort("aborted");
+ return;
+ }
- // Wait until network is relatively idle or timeout after 5 seconds
- await Promise.race([
- page.waitForLoadState("networkidle", { timeout: 5000 }).catch(() => ({})),
- new Promise((resolve) => setTimeout(resolve, 5000)),
- abortPromise(abortSignal),
- ]);
+ const requestUrl = request.url();
+ const requestIsRunningInProxyContext =
+ proxyConfig !== undefined &&
+ !matchesNoProxy(requestUrl, proxyConfig.bypass?.split(",") ?? []);
+ if (
+ requestUrl.startsWith("http://") ||
+ requestUrl.startsWith("https://")
+ ) {
+ const validation = await validateUrl(
+ requestUrl,
+ requestIsRunningInProxyContext,
+ );
+ if (!validation.ok) {
+ logger.warn(
+ `[Crawler][${jobId}] Blocking sub-request to disallowed URL "${requestUrl}": ${validation.reason}`,
+ );
+ await route.abort("blockedbyclient");
+ return;
+ }
+ }
- abortSignal.throwIfAborted();
+ // Continue with other requests
+ await route.continue();
+ });
- logger.info(`[Crawler][${jobId}] Finished waiting for the page to load.`);
+ // Navigate to the target URL
+ const navigationValidation = await validateUrl(
+ url,
+ isRunningInProxyContext,
+ );
+ if (!navigationValidation.ok) {
+ throw new Error(
+ `Disallowed navigation target "${url}": ${navigationValidation.reason}`,
+ );
+ }
+ const targetUrl = navigationValidation.url.toString();
+ logger.info(`[Crawler][${jobId}] Navigating to "${targetUrl}"`);
+ const response = await Promise.race([
+ page.goto(targetUrl, {
+ timeout: serverConfig.crawler.navigateTimeoutSec * 1000,
+ waitUntil: "domcontentloaded",
+ }),
+ abortPromise(abortSignal).then(() => null),
+ ]);
- // Extract content from the page
- const htmlContent = await page.content();
+ logger.info(
+ `[Crawler][${jobId}] Successfully navigated to "${targetUrl}". Waiting for the page to load ...`,
+ );
- abortSignal.throwIfAborted();
+ // Wait until network is relatively idle or timeout after 5 seconds
+ await Promise.race([
+ page
+ .waitForLoadState("networkidle", { timeout: 5000 })
+ .catch(() => ({})),
+ new Promise((resolve) => setTimeout(resolve, 5000)),
+ abortPromise(abortSignal),
+ ]);
- logger.info(`[Crawler][${jobId}] Successfully fetched the page content.`);
+ abortSignal.throwIfAborted();
- // Take a screenshot if configured
- let screenshot: Buffer | undefined = undefined;
- if (serverConfig.crawler.storeScreenshot) {
- const { data: screenshotData, error: screenshotError } = await tryCatch(
- Promise.race<Buffer>([
- page.screenshot({
- // If you change this, you need to change the asset type in the store function.
- type: "jpeg",
- fullPage: serverConfig.crawler.fullPageScreenshot,
- quality: 80,
- }),
- new Promise((_, reject) =>
- setTimeout(
- () =>
- reject(
- "TIMED_OUT, consider increasing CRAWLER_SCREENSHOT_TIMEOUT_SEC",
- ),
- serverConfig.crawler.screenshotTimeoutSec * 1000,
- ),
- ),
- abortPromise(abortSignal).then(() => Buffer.from("")),
- ]),
- );
- abortSignal.throwIfAborted();
- if (screenshotError) {
- logger.warn(
- `[Crawler][${jobId}] Failed to capture the screenshot. Reason: ${screenshotError}`,
- );
- } else {
logger.info(
- `[Crawler][${jobId}] Finished capturing page content and a screenshot. FullPageScreenshot: ${serverConfig.crawler.fullPageScreenshot}`,
+ `[Crawler][${jobId}] Finished waiting for the page to load.`,
);
- screenshot = screenshotData;
- }
- }
- // Capture PDF if configured or explicitly requested
- let pdf: Buffer | undefined = undefined;
- if (serverConfig.crawler.storePdf || forceStorePdf) {
- const { data: pdfData, error: pdfError } = await tryCatch(
- Promise.race<Buffer>([
- page.pdf({
- format: "A4",
- printBackground: true,
- }),
- new Promise((_, reject) =>
- setTimeout(
- () =>
- reject(
- "TIMED_OUT, consider increasing CRAWLER_SCREENSHOT_TIMEOUT_SEC",
- ),
- serverConfig.crawler.screenshotTimeoutSec * 1000,
- ),
- ),
- abortPromise(abortSignal).then(() => Buffer.from("")),
- ]),
- );
- abortSignal.throwIfAborted();
- if (pdfError) {
- logger.warn(
- `[Crawler][${jobId}] Failed to capture the PDF. Reason: ${pdfError}`,
- );
- } else {
+ // Extract content from the page
+ const htmlContent = await page.content();
+
+ abortSignal.throwIfAborted();
+
logger.info(
- `[Crawler][${jobId}] Finished capturing page content as PDF`,
+ `[Crawler][${jobId}] Successfully fetched the page content.`,
);
- pdf = pdfData;
- }
- }
- return {
- htmlContent,
- statusCode: response?.status() ?? 0,
- screenshot,
- pdf,
- url: page.url(),
- };
- } finally {
- await context.close();
- // Only close the browser if it was created on demand
- if (serverConfig.crawler.browserConnectOnDemand) {
- await browser.close();
- }
- }
+ // Take a screenshot if configured
+ let screenshot: Buffer | undefined = undefined;
+ if (serverConfig.crawler.storeScreenshot) {
+ const { data: screenshotData, error: screenshotError } =
+ await tryCatch(
+ Promise.race<Buffer>([
+ page.screenshot({
+ // If you change this, you need to change the asset type in the store function.
+ type: "jpeg",
+ fullPage: serverConfig.crawler.fullPageScreenshot,
+ quality: 80,
+ }),
+ new Promise((_, reject) =>
+ setTimeout(
+ () =>
+ reject(
+ "TIMED_OUT, consider increasing CRAWLER_SCREENSHOT_TIMEOUT_SEC",
+ ),
+ serverConfig.crawler.screenshotTimeoutSec * 1000,
+ ),
+ ),
+ abortPromise(abortSignal).then(() => Buffer.from("")),
+ ]),
+ );
+ abortSignal.throwIfAborted();
+ if (screenshotError) {
+ logger.warn(
+ `[Crawler][${jobId}] Failed to capture the screenshot. Reason: ${screenshotError}`,
+ );
+ } else {
+ logger.info(
+ `[Crawler][${jobId}] Finished capturing page content and a screenshot. FullPageScreenshot: ${serverConfig.crawler.fullPageScreenshot}`,
+ );
+ screenshot = screenshotData;
+ }
+ }
+
+ // Capture PDF if configured or explicitly requested
+ let pdf: Buffer | undefined = undefined;
+ if (serverConfig.crawler.storePdf || forceStorePdf) {
+ const { data: pdfData, error: pdfError } = await tryCatch(
+ Promise.race<Buffer>([
+ page.pdf({
+ format: "A4",
+ printBackground: true,
+ }),
+ new Promise((_, reject) =>
+ setTimeout(
+ () =>
+ reject(
+ "TIMED_OUT, consider increasing CRAWLER_SCREENSHOT_TIMEOUT_SEC",
+ ),
+ serverConfig.crawler.screenshotTimeoutSec * 1000,
+ ),
+ ),
+ abortPromise(abortSignal).then(() => Buffer.from("")),
+ ]),
+ );
+ abortSignal.throwIfAborted();
+ if (pdfError) {
+ logger.warn(
+ `[Crawler][${jobId}] Failed to capture the PDF. Reason: ${pdfError}`,
+ );
+ } else {
+ logger.info(
+ `[Crawler][${jobId}] Finished capturing page content as PDF`,
+ );
+ pdf = pdfData;
+ }
+ }
+
+ return {
+ htmlContent,
+ statusCode: response?.status() ?? 0,
+ screenshot,
+ pdf,
+ url: page.url(),
+ };
+ } finally {
+ await context.close();
+ // Only close the browser if it was created on demand
+ if (serverConfig.crawler.browserConnectOnDemand) {
+ await browser.close();
+ }
+ }
+ },
+ );
}
async function extractMetadata(
@@ -688,54 +714,70 @@ async function extractMetadata(
url: string,
jobId: string,
) {
- logger.info(
- `[Crawler][${jobId}] Will attempt to extract metadata from page ...`,
+ return await withSpan(
+ tracer,
+ "crawlerWorker.extractMetadata",
+ { attributes: { url, jobId } },
+ async () => {
+ logger.info(
+ `[Crawler][${jobId}] Will attempt to extract metadata from page ...`,
+ );
+ const meta = await metascraperParser({
+ url,
+ html: htmlContent,
+ // We don't want to validate the URL again as we've already done it by visiting the page.
+ // This was added because URL validation fails if the URL ends with a question mark (e.g. empty query params).
+ validateUrl: false,
+ });
+ logger.info(
+ `[Crawler][${jobId}] Done extracting metadata from the page.`,
+ );
+ return meta;
+ },
);
- const meta = await metascraperParser({
- url,
- html: htmlContent,
- // We don't want to validate the URL again as we've already done it by visiting the page.
- // This was added because URL validation fails if the URL ends with a question mark (e.g. empty query params).
- validateUrl: false,
- });
- logger.info(`[Crawler][${jobId}] Done extracting metadata from the page.`);
- return meta;
}
-function extractReadableContent(
+async function extractReadableContent(
htmlContent: string,
url: string,
jobId: string,
) {
- logger.info(
- `[Crawler][${jobId}] Will attempt to extract readable content ...`,
- );
- const virtualConsole = new VirtualConsole();
- const dom = new JSDOM(htmlContent, { url, virtualConsole });
- let result: { content: string } | null = null;
- try {
- const readableContent = new Readability(dom.window.document).parse();
- if (!readableContent || typeof readableContent.content !== "string") {
- return null;
- }
-
- const purifyWindow = new JSDOM("").window;
- try {
- const purify = DOMPurify(purifyWindow);
- const purifiedHTML = purify.sanitize(readableContent.content);
+ return await withSpan(
+ tracer,
+ "crawlerWorker.extractReadableContent",
+ { attributes: { url, jobId } },
+ async () => {
+ logger.info(
+ `[Crawler][${jobId}] Will attempt to extract readable content ...`,
+ );
+ const virtualConsole = new VirtualConsole();
+ const dom = new JSDOM(htmlContent, { url, virtualConsole });
+ let result: { content: string } | null = null;
+ try {
+ const readableContent = new Readability(dom.window.document).parse();
+ if (!readableContent || typeof readableContent.content !== "string") {
+ return null;
+ }
- logger.info(`[Crawler][${jobId}] Done extracting readable content.`);
- result = {
- content: purifiedHTML,
- };
- } finally {
- purifyWindow.close();
- }
- } finally {
- dom.window.close();
- }
+ const purifyWindow = new JSDOM("").window;
+ try {
+ const purify = DOMPurify(purifyWindow);
+ const purifiedHTML = purify.sanitize(readableContent.content);
+
+ logger.info(`[Crawler][${jobId}] Done extracting readable content.`);
+ result = {
+ content: purifiedHTML,
+ };
+ } finally {
+ purifyWindow.close();
+ }
+ } finally {
+ dom.window.close();
+ }
- return result;
+ return result;
+ },
+ );
}
async function storeScreenshot(
@@ -743,45 +785,58 @@ async function storeScreenshot(
userId: string,
jobId: string,
) {
- if (!serverConfig.crawler.storeScreenshot) {
- logger.info(
- `[Crawler][${jobId}] Skipping storing the screenshot as per the config.`,
- );
- return null;
- }
- if (!screenshot) {
- logger.info(
- `[Crawler][${jobId}] Skipping storing the screenshot as it's empty.`,
- );
- return null;
- }
- const assetId = newAssetId();
- const contentType = "image/jpeg";
- const fileName = "screenshot.jpeg";
+ return await withSpan(
+ tracer,
+ "crawlerWorker.storeScreenshot",
+ {
+ attributes: {
+ jobId,
+ userId,
+ size: screenshot?.byteLength ?? 0,
+ },
+ },
+ async () => {
+ if (!serverConfig.crawler.storeScreenshot) {
+ logger.info(
+ `[Crawler][${jobId}] Skipping storing the screenshot as per the config.`,
+ );
+ return null;
+ }
+ if (!screenshot) {
+ logger.info(
+ `[Crawler][${jobId}] Skipping storing the screenshot as it's empty.`,
+ );
+ return null;
+ }
+ const assetId = newAssetId();
+ const contentType = "image/jpeg";
+ const fileName = "screenshot.jpeg";
- // Check storage quota before saving the screenshot
- const { data: quotaApproved, error: quotaError } = await tryCatch(
- QuotaService.checkStorageQuota(db, userId, screenshot.byteLength),
- );
+ // Check storage quota before saving the screenshot
+ const { data: quotaApproved, error: quotaError } = await tryCatch(
+ QuotaService.checkStorageQuota(db, userId, screenshot.byteLength),
+ );
- if (quotaError) {
- logger.warn(
- `[Crawler][${jobId}] Skipping screenshot storage due to quota exceeded: ${quotaError.message}`,
- );
- return null;
- }
+ if (quotaError) {
+ logger.warn(
+ `[Crawler][${jobId}] Skipping screenshot storage due to quota exceeded: ${quotaError.message}`,
+ );
+ return null;
+ }
- await saveAsset({
- userId,
- assetId,
- metadata: { contentType, fileName },
- asset: screenshot,
- quotaApproved,
- });
- logger.info(
- `[Crawler][${jobId}] Stored the screenshot as assetId: ${assetId} (${screenshot.byteLength} bytes)`,
+ await saveAsset({
+ userId,
+ assetId,
+ metadata: { contentType, fileName },
+ asset: screenshot,
+ quotaApproved,
+ });
+ logger.info(
+ `[Crawler][${jobId}] Stored the screenshot as assetId: ${assetId} (${screenshot.byteLength} bytes)`,
+ );
+ return { assetId, contentType, fileName, size: screenshot.byteLength };
+ },
);
- return { assetId, contentType, fileName, size: screenshot.byteLength };
}
async function storePdf(
@@ -789,37 +844,52 @@ async function storePdf(
userId: string,
jobId: string,
) {
- if (!pdf) {
- logger.info(`[Crawler][${jobId}] Skipping storing the PDF as it's empty.`);
- return null;
- }
- const assetId = newAssetId();
- const contentType = "application/pdf";
- const fileName = "page.pdf";
+ return await withSpan(
+ tracer,
+ "crawlerWorker.storePdf",
+ {
+ attributes: {
+ jobId,
+ userId,
+ size: pdf?.byteLength ?? 0,
+ },
+ },
+ async () => {
+ if (!pdf) {
+ logger.info(
+ `[Crawler][${jobId}] Skipping storing the PDF as it's empty.`,
+ );
+ return null;
+ }
+ const assetId = newAssetId();
+ const contentType = "application/pdf";
+ const fileName = "page.pdf";
- // Check storage quota before saving the PDF
- const { data: quotaApproved, error: quotaError } = await tryCatch(
- QuotaService.checkStorageQuota(db, userId, pdf.byteLength),
- );
+ // Check storage quota before saving the PDF
+ const { data: quotaApproved, error: quotaError } = await tryCatch(
+ QuotaService.checkStorageQuota(db, userId, pdf.byteLength),
+ );
- if (quotaError) {
- logger.warn(
- `[Crawler][${jobId}] Skipping PDF storage due to quota exceeded: ${quotaError.message}`,
- );
- return null;
- }
+ if (quotaError) {
+ logger.warn(
+ `[Crawler][${jobId}] Skipping PDF storage due to quota exceeded: ${quotaError.message}`,
+ );
+ return null;
+ }
- await saveAsset({
- userId,
- assetId,
- metadata: { contentType, fileName },
- asset: pdf,
- quotaApproved,
- });
- logger.info(
- `[Crawler][${jobId}] Stored the PDF as assetId: ${assetId} (${pdf.byteLength} bytes)`,
+ await saveAsset({
+ userId,
+ assetId,
+ metadata: { contentType, fileName },
+ asset: pdf,
+ quotaApproved,
+ });
+ logger.info(
+ `[Crawler][${jobId}] Stored the PDF as assetId: ${assetId} (${pdf.byteLength} bytes)`,
+ );
+ return { assetId, contentType, fileName, size: pdf.byteLength };
+ },
);
- return { assetId, contentType, fileName, size: pdf.byteLength };
}
async function downloadAndStoreFile(
@@ -829,91 +899,98 @@ async function downloadAndStoreFile(
fileType: string,
abortSignal: AbortSignal,
) {
- let assetPath: string | undefined;
- try {
- logger.info(
- `[Crawler][${jobId}] Downloading ${fileType} from "${url.length > 100 ? url.slice(0, 100) + "..." : url}"`,
- );
- const response = await fetchWithProxy(url, {
- signal: abortSignal,
- });
- if (!response.ok || response.body == null) {
- throw new Error(`Failed to download ${fileType}: ${response.status}`);
- }
-
- const contentType = normalizeContentType(
- response.headers.get("content-type"),
- );
- if (!contentType) {
- throw new Error("No content type in the response");
- }
+ return await withSpan(
+ tracer,
+ "crawlerWorker.downloadAndStoreFile",
+ { attributes: { url, jobId, userId, fileType } },
+ async () => {
+ let assetPath: string | undefined;
+ try {
+ logger.info(
+ `[Crawler][${jobId}] Downloading ${fileType} from "${url.length > 100 ? url.slice(0, 100) + "..." : url}"`,
+ );
+ const response = await fetchWithProxy(url, {
+ signal: abortSignal,
+ });
+ if (!response.ok || response.body == null) {
+ throw new Error(`Failed to download ${fileType}: ${response.status}`);
+ }
- const assetId = newAssetId();
- assetPath = path.join(os.tmpdir(), assetId);
-
- let bytesRead = 0;
- const contentLengthEnforcer = new Transform({
- transform(chunk, _, callback) {
- bytesRead += chunk.length;
-
- if (abortSignal.aborted) {
- callback(new Error("AbortError"));
- } else if (bytesRead > serverConfig.maxAssetSizeMb * 1024 * 1024) {
- callback(
- new Error(
- `Content length exceeds maximum allowed size: ${serverConfig.maxAssetSizeMb}MB`,
- ),
- );
- } else {
- callback(null, chunk); // pass data along unchanged
+ const contentType = normalizeContentType(
+ response.headers.get("content-type"),
+ );
+ if (!contentType) {
+ throw new Error("No content type in the response");
}
- },
- flush(callback) {
- callback();
- },
- });
- await pipeline(
- response.body,
- contentLengthEnforcer,
- fsSync.createWriteStream(assetPath),
- );
+ const assetId = newAssetId();
+ assetPath = path.join(os.tmpdir(), assetId);
- // Check storage quota before saving the asset
- const { data: quotaApproved, error: quotaError } = await tryCatch(
- QuotaService.checkStorageQuota(db, userId, bytesRead),
- );
+ let bytesRead = 0;
+ const contentLengthEnforcer = new Transform({
+ transform(chunk, _, callback) {
+ bytesRead += chunk.length;
- if (quotaError) {
- logger.warn(
- `[Crawler][${jobId}] Skipping ${fileType} storage due to quota exceeded: ${quotaError.message}`,
- );
- return null;
- }
+ if (abortSignal.aborted) {
+ callback(new Error("AbortError"));
+ } else if (bytesRead > serverConfig.maxAssetSizeMb * 1024 * 1024) {
+ callback(
+ new Error(
+ `Content length exceeds maximum allowed size: ${serverConfig.maxAssetSizeMb}MB`,
+ ),
+ );
+ } else {
+ callback(null, chunk); // pass data along unchanged
+ }
+ },
+ flush(callback) {
+ callback();
+ },
+ });
- await saveAssetFromFile({
- userId,
- assetId,
- metadata: { contentType },
- assetPath,
- quotaApproved,
- });
+ await pipeline(
+ response.body,
+ contentLengthEnforcer,
+ fsSync.createWriteStream(assetPath),
+ );
- logger.info(
- `[Crawler][${jobId}] Downloaded ${fileType} as assetId: ${assetId} (${bytesRead} bytes)`,
- );
+ // Check storage quota before saving the asset
+ const { data: quotaApproved, error: quotaError } = await tryCatch(
+ QuotaService.checkStorageQuota(db, userId, bytesRead),
+ );
- return { assetId, userId, contentType, size: bytesRead };
- } catch (e) {
- logger.error(
- `[Crawler][${jobId}] Failed to download and store ${fileType}: ${e}`,
- );
- return null;
- } finally {
- if (assetPath) {
- await tryCatch(fs.unlink(assetPath));
- }
- }
+ if (quotaError) {
+ logger.warn(
+ `[Crawler][${jobId}] Skipping ${fileType} storage due to quota exceeded: ${quotaError.message}`,
+ );
+ return null;
+ }
+
+ await saveAssetFromFile({
+ userId,
+ assetId,
+ metadata: { contentType },
+ assetPath,
+ quotaApproved,
+ });
+
+ logger.info(
+ `[Crawler][${jobId}] Downloaded ${fileType} as assetId: ${assetId} (${bytesRead} bytes)`,
+ );
+
+ return { assetId, userId, contentType, size: bytesRead };
+ } catch (e) {
+ logger.error(
+ `[Crawler][${jobId}] Failed to download and store ${fileType}: ${e}`,
+ );
+ return null;
+ } finally {
+ if (assetPath) {
+ await tryCatch(fs.unlink(assetPath));
+ }
+ }
+ },
+ );
}
async function downloadAndStoreImage(
@@ -938,77 +1015,84 @@ async function archiveWebpage(
jobId: string,
abortSignal: AbortSignal,
) {
- logger.info(`[Crawler][${jobId}] Will attempt to archive page ...`);
- const assetId = newAssetId();
- const assetPath = path.join(os.tmpdir(), assetId);
-
- let res = await execa({
- input: html,
- cancelSignal: abortSignal,
- env: {
- https_proxy: serverConfig.proxy.httpsProxy
- ? getRandomProxy(serverConfig.proxy.httpsProxy)
- : undefined,
- http_proxy: serverConfig.proxy.httpProxy
- ? getRandomProxy(serverConfig.proxy.httpProxy)
- : undefined,
- no_proxy: serverConfig.proxy.noProxy?.join(","),
- },
- })("monolith", ["-", "-Ije", "-t", "5", "-b", url, "-o", assetPath]);
+ return await withSpan(
+ tracer,
+ "crawlerWorker.archiveWebpage",
+ { attributes: { url, jobId, userId } },
+ async () => {
+ logger.info(`[Crawler][${jobId}] Will attempt to archive page ...`);
+ const assetId = newAssetId();
+ const assetPath = path.join(os.tmpdir(), assetId);
+
+ let res = await execa({
+ input: html,
+ cancelSignal: abortSignal,
+ env: {
+ https_proxy: serverConfig.proxy.httpsProxy
+ ? getRandomProxy(serverConfig.proxy.httpsProxy)
+ : undefined,
+ http_proxy: serverConfig.proxy.httpProxy
+ ? getRandomProxy(serverConfig.proxy.httpProxy)
+ : undefined,
+ no_proxy: serverConfig.proxy.noProxy?.join(","),
+ },
+ })("monolith", ["-", "-Ije", "-t", "5", "-b", url, "-o", assetPath]);
- if (res.isCanceled) {
- logger.error(
- `[Crawler][${jobId}] Canceled archiving the page as we hit global timeout.`,
- );
- await tryCatch(fs.unlink(assetPath));
- return null;
- }
+ if (res.isCanceled) {
+ logger.error(
+ `[Crawler][${jobId}] Canceled archiving the page as we hit global timeout.`,
+ );
+ await tryCatch(fs.unlink(assetPath));
+ return null;
+ }
- if (res.exitCode !== 0) {
- logger.error(
- `[Crawler][${jobId}] Failed to archive the page as the command exited with code ${res.exitCode}`,
- );
- await tryCatch(fs.unlink(assetPath));
- return null;
- }
+ if (res.exitCode !== 0) {
+ logger.error(
+ `[Crawler][${jobId}] Failed to archive the page as the command exited with code ${res.exitCode}`,
+ );
+ await tryCatch(fs.unlink(assetPath));
+ return null;
+ }
- const contentType = "text/html";
+ const contentType = "text/html";
- // Get file size and check quota before saving
- const stats = await fs.stat(assetPath);
- const fileSize = stats.size;
+ // Get file size and check quota before saving
+ const stats = await fs.stat(assetPath);
+ const fileSize = stats.size;
- const { data: quotaApproved, error: quotaError } = await tryCatch(
- QuotaService.checkStorageQuota(db, userId, fileSize),
- );
+ const { data: quotaApproved, error: quotaError } = await tryCatch(
+ QuotaService.checkStorageQuota(db, userId, fileSize),
+ );
- if (quotaError) {
- logger.warn(
- `[Crawler][${jobId}] Skipping page archive storage due to quota exceeded: ${quotaError.message}`,
- );
- await tryCatch(fs.unlink(assetPath));
- return null;
- }
+ if (quotaError) {
+ logger.warn(
+ `[Crawler][${jobId}] Skipping page archive storage due to quota exceeded: ${quotaError.message}`,
+ );
+ await tryCatch(fs.unlink(assetPath));
+ return null;
+ }
- await saveAssetFromFile({
- userId,
- assetId,
- assetPath,
- metadata: {
- contentType,
- },
- quotaApproved,
- });
+ await saveAssetFromFile({
+ userId,
+ assetId,
+ assetPath,
+ metadata: {
+ contentType,
+ },
+ quotaApproved,
+ });
- logger.info(
- `[Crawler][${jobId}] Done archiving the page as assetId: ${assetId}`,
- );
+ logger.info(
+ `[Crawler][${jobId}] Done archiving the page as assetId: ${assetId}`,
+ );
- return {
- assetId,
- contentType,
- size: await getAssetSize({ userId, assetId }),
- };
+ return {
+ assetId,
+ contentType,
+ size: await getAssetSize({ userId, assetId }),
+ };
+ },
+ );
}
async function getContentType(
@@ -1016,26 +1100,33 @@ async function getContentType(
jobId: string,
abortSignal: AbortSignal,
): Promise<string | null> {
- try {
- logger.info(
- `[Crawler][${jobId}] Attempting to determine the content-type for the url ${url}`,
- );
- const response = await fetchWithProxy(url, {
- method: "GET",
- signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]),
- });
- const rawContentType = response.headers.get("content-type");
- const contentType = normalizeContentType(rawContentType);
- logger.info(
- `[Crawler][${jobId}] Content-type for the url ${url} is "${contentType}"`,
- );
- return contentType;
- } catch (e) {
- logger.error(
- `[Crawler][${jobId}] Failed to determine the content-type for the url ${url}: ${e}`,
- );
- return null;
- }
+ return await withSpan(
+ tracer,
+ "crawlerWorker.getContentType",
+ { attributes: { url, jobId } },
+ async () => {
+ try {
+ logger.info(
+ `[Crawler][${jobId}] Attempting to determine the content-type for the url ${url}`,
+ );
+ const response = await fetchWithProxy(url, {
+ method: "GET",
+ signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]),
+ });
+ const rawContentType = response.headers.get("content-type");
+ const contentType = normalizeContentType(rawContentType);
+ logger.info(
+ `[Crawler][${jobId}] Content-type for the url ${url} is "${contentType}"`,
+ );
+ return contentType;
+ } catch (e) {
+ logger.error(
+ `[Crawler][${jobId}] Failed to determine the content-type for the url ${url}: ${e}`,
+ );
+ return null;
+ }
+ },
+ );
}
/**
@@ -1054,53 +1145,60 @@ async function handleAsAssetBookmark(
bookmarkId: string,
abortSignal: AbortSignal,
) {
- const downloaded = await downloadAndStoreFile(
- url,
- userId,
- jobId,
- assetType,
- abortSignal,
- );
- if (!downloaded) {
- return;
- }
- const fileName = path.basename(new URL(url).pathname);
- await db.transaction(async (trx) => {
- await updateAsset(
- undefined,
- {
- id: downloaded.assetId,
- bookmarkId,
+ return await withSpan(
+ tracer,
+ "crawlerWorker.handleAsAssetBookmark",
+ { attributes: { url, jobId, userId, bookmarkId, assetType } },
+ async () => {
+ const downloaded = await downloadAndStoreFile(
+ url,
userId,
- assetType: AssetTypes.BOOKMARK_ASSET,
- contentType: downloaded.contentType,
- size: downloaded.size,
- fileName,
- },
- trx,
- );
- await trx.insert(bookmarkAssets).values({
- id: bookmarkId,
- assetType,
- assetId: downloaded.assetId,
- content: null,
- fileName,
- sourceUrl: url,
- });
- // Switch the type of the bookmark from LINK to ASSET
- await trx
- .update(bookmarks)
- .set({ type: BookmarkTypes.ASSET })
- .where(eq(bookmarks.id, bookmarkId));
- await trx.delete(bookmarkLinks).where(eq(bookmarkLinks.id, bookmarkId));
- });
- await AssetPreprocessingQueue.enqueue(
- {
- bookmarkId,
- fixMode: false,
- },
- {
- groupId: userId,
+ jobId,
+ assetType,
+ abortSignal,
+ );
+ if (!downloaded) {
+ return;
+ }
+ const fileName = path.basename(new URL(url).pathname);
+ await db.transaction(async (trx) => {
+ await updateAsset(
+ undefined,
+ {
+ id: downloaded.assetId,
+ bookmarkId,
+ userId,
+ assetType: AssetTypes.BOOKMARK_ASSET,
+ contentType: downloaded.contentType,
+ size: downloaded.size,
+ fileName,
+ },
+ trx,
+ );
+ await trx.insert(bookmarkAssets).values({
+ id: bookmarkId,
+ assetType,
+ assetId: downloaded.assetId,
+ content: null,
+ fileName,
+ sourceUrl: url,
+ });
+ // Switch the type of the bookmark from LINK to ASSET
+ await trx
+ .update(bookmarks)
+ .set({ type: BookmarkTypes.ASSET })
+ .where(eq(bookmarks.id, bookmarkId));
+ await trx.delete(bookmarkLinks).where(eq(bookmarkLinks.id, bookmarkId));
+ });
+ await AssetPreprocessingQueue.enqueue(
+ {
+ bookmarkId,
+ fixMode: false,
+ },
+ {
+ groupId: userId,
+ },
+ );
},
);
}
@@ -1115,60 +1213,73 @@ async function storeHtmlContent(
userId: string,
jobId: string,
): Promise<StoreHtmlResult> {
- if (!htmlContent) {
- return { result: "not_stored" };
- }
+ return await withSpan(
+ tracer,
+ "crawlerWorker.storeHtmlContent",
+ {
+ attributes: {
+ jobId,
+ userId,
+ contentSize: htmlContent ? Buffer.byteLength(htmlContent, "utf8") : 0,
+ },
+ },
+ async () => {
+ if (!htmlContent) {
+ return { result: "not_stored" };
+ }
- const contentSize = Buffer.byteLength(htmlContent, "utf8");
+ const contentSize = Buffer.byteLength(htmlContent, "utf8");
- // Only store in assets if content is >= 50KB
- if (contentSize < serverConfig.crawler.htmlContentSizeThreshold) {
- logger.info(
- `[Crawler][${jobId}] HTML content size (${contentSize} bytes) is below threshold, storing inline`,
- );
- return { result: "store_inline" };
- }
+ // Only store in assets if content is >= 50KB
+ if (contentSize < serverConfig.crawler.htmlContentSizeThreshold) {
+ logger.info(
+ `[Crawler][${jobId}] HTML content size (${contentSize} bytes) is below threshold, storing inline`,
+ );
+ return { result: "store_inline" };
+ }
- const { data: quotaApproved, error: quotaError } = await tryCatch(
- QuotaService.checkStorageQuota(db, userId, contentSize),
- );
- if (quotaError) {
- logger.warn(
- `[Crawler][${jobId}] Skipping HTML content storage due to quota exceeded: ${quotaError.message}`,
- );
- return { result: "not_stored" };
- }
+ const { data: quotaApproved, error: quotaError } = await tryCatch(
+ QuotaService.checkStorageQuota(db, userId, contentSize),
+ );
+ if (quotaError) {
+ logger.warn(
+ `[Crawler][${jobId}] Skipping HTML content storage due to quota exceeded: ${quotaError.message}`,
+ );
+ return { result: "not_stored" };
+ }
- const assetId = newAssetId();
+ const assetId = newAssetId();
- const { error: saveError } = await tryCatch(
- saveAsset({
- userId,
- assetId,
- asset: Buffer.from(htmlContent, "utf8"),
- metadata: {
- contentType: ASSET_TYPES.TEXT_HTML,
- fileName: null,
- },
- quotaApproved,
- }),
- );
- if (saveError) {
- logger.error(
- `[Crawler][${jobId}] Failed to store HTML content as asset: ${saveError}`,
- );
- throw saveError;
- }
+ const { error: saveError } = await tryCatch(
+ saveAsset({
+ userId,
+ assetId,
+ asset: Buffer.from(htmlContent, "utf8"),
+ metadata: {
+ contentType: ASSET_TYPES.TEXT_HTML,
+ fileName: null,
+ },
+ quotaApproved,
+ }),
+ );
+ if (saveError) {
+ logger.error(
+ `[Crawler][${jobId}] Failed to store HTML content as asset: ${saveError}`,
+ );
+ throw saveError;
+ }
- logger.info(
- `[Crawler][${jobId}] Stored large HTML content (${contentSize} bytes) as asset: ${assetId}`,
- );
+ logger.info(
+ `[Crawler][${jobId}] Stored large HTML content (${contentSize} bytes) as asset: ${assetId}`,
+ );
- return {
- result: "stored",
- assetId,
- size: contentSize,
- };
+ return {
+ result: "stored",
+ assetId,
+ size: contentSize,
+ };
+ },
+ );
}
async function crawlAndParseUrl(
@@ -1186,244 +1297,275 @@ async function crawlAndParseUrl(
forceStorePdf: boolean,
abortSignal: AbortSignal,
) {
- let result: {
- htmlContent: string;
- screenshot: Buffer | undefined;
- pdf: Buffer | undefined;
- statusCode: number | null;
- url: string;
- };
-
- if (precrawledArchiveAssetId) {
- logger.info(
- `[Crawler][${jobId}] The page has been precrawled. Will use the precrawled archive instead.`,
- );
- const asset = await readAsset({
- userId,
- assetId: precrawledArchiveAssetId,
- });
- result = {
- htmlContent: asset.asset.toString(),
- screenshot: undefined,
- pdf: undefined,
- statusCode: 200,
- url,
- };
- } else {
- result = await crawlPage(jobId, url, userId, forceStorePdf, abortSignal);
- }
- abortSignal.throwIfAborted();
-
- const { htmlContent, screenshot, pdf, statusCode, url: browserUrl } = result;
-
- // Track status code in Prometheus
- if (statusCode !== null) {
- crawlerStatusCodeCounter.labels(statusCode.toString()).inc();
- }
-
- const meta = await Promise.race([
- extractMetadata(htmlContent, browserUrl, jobId),
- abortPromise(abortSignal),
- ]);
- abortSignal.throwIfAborted();
-
- let readableContent: { content: string } | null = meta.readableContentHtml
- ? { content: meta.readableContentHtml }
- : null;
- if (!readableContent) {
- readableContent = await Promise.race([
- extractReadableContent(
- meta.contentHtml ?? htmlContent,
- browserUrl,
+ return await withSpan(
+ tracer,
+ "crawlerWorker.crawlAndParseUrl",
+ {
+ attributes: {
+ url,
jobId,
- ),
- abortPromise(abortSignal),
- ]);
- }
- abortSignal.throwIfAborted();
-
- const screenshotAssetInfo = await Promise.race([
- storeScreenshot(screenshot, userId, jobId),
- abortPromise(abortSignal),
- ]);
- abortSignal.throwIfAborted();
-
- const pdfAssetInfo = await Promise.race([
- storePdf(pdf, userId, jobId),
- abortPromise(abortSignal),
- ]);
- abortSignal.throwIfAborted();
-
- const htmlContentAssetInfo = await storeHtmlContent(
- readableContent?.content,
- userId,
- jobId,
- );
- abortSignal.throwIfAborted();
- let imageAssetInfo: DBAssetType | null = null;
- if (meta.image) {
- const downloaded = await downloadAndStoreImage(
- meta.image,
- userId,
- jobId,
- abortSignal,
- );
- if (downloaded) {
- imageAssetInfo = {
- id: downloaded.assetId,
- bookmarkId,
userId,
- assetType: AssetTypes.LINK_BANNER_IMAGE,
- contentType: downloaded.contentType,
- size: downloaded.size,
+ bookmarkId,
+ archiveFullPage,
+ forceStorePdf,
+ hasPrecrawledArchive: !!precrawledArchiveAssetId,
+ },
+ },
+ async () => {
+ let result: {
+ htmlContent: string;
+ screenshot: Buffer | undefined;
+ pdf: Buffer | undefined;
+ statusCode: number | null;
+ url: string;
};
- }
- }
- abortSignal.throwIfAborted();
- const parseDate = (date: string | undefined) => {
- if (!date) {
- return null;
- }
- try {
- return new Date(date);
- } catch {
- return null;
- }
- };
-
- // TODO(important): Restrict the size of content to store
- const assetDeletionTasks: Promise<void>[] = [];
- const inlineHtmlContent =
- htmlContentAssetInfo.result === "store_inline"
- ? (readableContent?.content ?? null)
- : null;
- readableContent = null;
- await db.transaction(async (txn) => {
- await txn
- .update(bookmarkLinks)
- .set({
- title: meta.title,
- description: meta.description,
- // Don't store data URIs as they're not valid URLs and are usually quite large
- imageUrl: meta.image?.startsWith("data:") ? null : meta.image,
- favicon: meta.logo,
- htmlContent: inlineHtmlContent,
- contentAssetId:
- htmlContentAssetInfo.result === "stored"
- ? htmlContentAssetInfo.assetId
- : null,
- crawledAt: new Date(),
- crawlStatusCode: statusCode,
- author: meta.author,
- publisher: meta.publisher,
- datePublished: parseDate(meta.datePublished),
- dateModified: parseDate(meta.dateModified),
- })
- .where(eq(bookmarkLinks.id, bookmarkId));
-
- if (screenshotAssetInfo) {
- await updateAsset(
- oldScreenshotAssetId,
- {
- id: screenshotAssetInfo.assetId,
- bookmarkId,
- userId,
- assetType: AssetTypes.LINK_SCREENSHOT,
- contentType: screenshotAssetInfo.contentType,
- size: screenshotAssetInfo.size,
- fileName: screenshotAssetInfo.fileName,
- },
- txn,
- );
- assetDeletionTasks.push(silentDeleteAsset(userId, oldScreenshotAssetId));
- }
- if (pdfAssetInfo) {
- await updateAsset(
- oldPdfAssetId,
- {
- id: pdfAssetInfo.assetId,
- bookmarkId,
+ if (precrawledArchiveAssetId) {
+ logger.info(
+ `[Crawler][${jobId}] The page has been precrawled. Will use the precrawled archive instead.`,
+ );
+ const asset = await readAsset({
userId,
- assetType: AssetTypes.LINK_PDF,
- contentType: pdfAssetInfo.contentType,
- size: pdfAssetInfo.size,
- fileName: pdfAssetInfo.fileName,
- },
- txn,
- );
- assetDeletionTasks.push(silentDeleteAsset(userId, oldPdfAssetId));
- }
- if (imageAssetInfo) {
- await updateAsset(oldImageAssetId, imageAssetInfo, txn);
- assetDeletionTasks.push(silentDeleteAsset(userId, oldImageAssetId));
- }
- if (htmlContentAssetInfo.result === "stored") {
- await updateAsset(
- oldContentAssetId,
- {
- id: htmlContentAssetInfo.assetId,
- bookmarkId,
+ assetId: precrawledArchiveAssetId,
+ });
+ result = {
+ htmlContent: asset.asset.toString(),
+ screenshot: undefined,
+ pdf: undefined,
+ statusCode: 200,
+ url,
+ };
+ } else {
+ result = await crawlPage(
+ jobId,
+ url,
userId,
- assetType: AssetTypes.LINK_HTML_CONTENT,
- contentType: ASSET_TYPES.TEXT_HTML,
- size: htmlContentAssetInfo.size,
- fileName: null,
- },
- txn,
- );
- assetDeletionTasks.push(silentDeleteAsset(userId, oldContentAssetId));
- } else if (oldContentAssetId) {
- // Unlink the old content asset
- await txn.delete(assets).where(eq(assets.id, oldContentAssetId));
- assetDeletionTasks.push(silentDeleteAsset(userId, oldContentAssetId));
- }
- });
-
- // Delete the old assets if any
- await Promise.all(assetDeletionTasks);
+ forceStorePdf,
+ abortSignal,
+ );
+ }
+ abortSignal.throwIfAborted();
- return async () => {
- if (
- !precrawledArchiveAssetId &&
- (serverConfig.crawler.fullPageArchive || archiveFullPage)
- ) {
- const archiveResult = await archiveWebpage(
+ const {
htmlContent,
- browserUrl,
+ screenshot,
+ pdf,
+ statusCode,
+ url: browserUrl,
+ } = result;
+
+ // Track status code in Prometheus
+ if (statusCode !== null) {
+ crawlerStatusCodeCounter.labels(statusCode.toString()).inc();
+ }
+
+ const meta = await Promise.race([
+ extractMetadata(htmlContent, browserUrl, jobId),
+ abortPromise(abortSignal),
+ ]);
+ abortSignal.throwIfAborted();
+
+ let readableContent: { content: string } | null = meta.readableContentHtml
+ ? { content: meta.readableContentHtml }
+ : null;
+ if (!readableContent) {
+ readableContent = await Promise.race([
+ extractReadableContent(
+ meta.contentHtml ?? htmlContent,
+ browserUrl,
+ jobId,
+ ),
+ abortPromise(abortSignal),
+ ]);
+ }
+ abortSignal.throwIfAborted();
+
+ const screenshotAssetInfo = await Promise.race([
+ storeScreenshot(screenshot, userId, jobId),
+ abortPromise(abortSignal),
+ ]);
+ abortSignal.throwIfAborted();
+
+ const pdfAssetInfo = await Promise.race([
+ storePdf(pdf, userId, jobId),
+ abortPromise(abortSignal),
+ ]);
+ abortSignal.throwIfAborted();
+
+ const htmlContentAssetInfo = await storeHtmlContent(
+ readableContent?.content,
userId,
jobId,
- abortSignal,
);
+ abortSignal.throwIfAborted();
+ let imageAssetInfo: DBAssetType | null = null;
+ if (meta.image) {
+ const downloaded = await downloadAndStoreImage(
+ meta.image,
+ userId,
+ jobId,
+ abortSignal,
+ );
+ if (downloaded) {
+ imageAssetInfo = {
+ id: downloaded.assetId,
+ bookmarkId,
+ userId,
+ assetType: AssetTypes.LINK_BANNER_IMAGE,
+ contentType: downloaded.contentType,
+ size: downloaded.size,
+ };
+ }
+ }
+ abortSignal.throwIfAborted();
- if (archiveResult) {
- const {
- assetId: fullPageArchiveAssetId,
- size,
- contentType,
- } = archiveResult;
+ const parseDate = (date: string | undefined) => {
+ if (!date) {
+ return null;
+ }
+ try {
+ return new Date(date);
+ } catch {
+ return null;
+ }
+ };
- await db.transaction(async (txn) => {
+ // TODO(important): Restrict the size of content to store
+ const assetDeletionTasks: Promise<void>[] = [];
+ const inlineHtmlContent =
+ htmlContentAssetInfo.result === "store_inline"
+ ? (readableContent?.content ?? null)
+ : null;
+ readableContent = null;
+ await db.transaction(async (txn) => {
+ await txn
+ .update(bookmarkLinks)
+ .set({
+ title: meta.title,
+ description: meta.description,
+ // Don't store data URIs as they're not valid URLs and are usually quite large
+ imageUrl: meta.image?.startsWith("data:") ? null : meta.image,
+ favicon: meta.logo,
+ htmlContent: inlineHtmlContent,
+ contentAssetId:
+ htmlContentAssetInfo.result === "stored"
+ ? htmlContentAssetInfo.assetId
+ : null,
+ crawledAt: new Date(),
+ crawlStatusCode: statusCode,
+ author: meta.author,
+ publisher: meta.publisher,
+ datePublished: parseDate(meta.datePublished),
+ dateModified: parseDate(meta.dateModified),
+ })
+ .where(eq(bookmarkLinks.id, bookmarkId));
+
+ if (screenshotAssetInfo) {
await updateAsset(
- oldFullPageArchiveAssetId,
+ oldScreenshotAssetId,
{
- id: fullPageArchiveAssetId,
+ id: screenshotAssetInfo.assetId,
bookmarkId,
userId,
- assetType: AssetTypes.LINK_FULL_PAGE_ARCHIVE,
- contentType,
- size,
+ assetType: AssetTypes.LINK_SCREENSHOT,
+ contentType: screenshotAssetInfo.contentType,
+ size: screenshotAssetInfo.size,
+ fileName: screenshotAssetInfo.fileName,
+ },
+ txn,
+ );
+ assetDeletionTasks.push(
+ silentDeleteAsset(userId, oldScreenshotAssetId),
+ );
+ }
+ if (pdfAssetInfo) {
+ await updateAsset(
+ oldPdfAssetId,
+ {
+ id: pdfAssetInfo.assetId,
+ bookmarkId,
+ userId,
+ assetType: AssetTypes.LINK_PDF,
+ contentType: pdfAssetInfo.contentType,
+ size: pdfAssetInfo.size,
+ fileName: pdfAssetInfo.fileName,
+ },
+ txn,
+ );
+ assetDeletionTasks.push(silentDeleteAsset(userId, oldPdfAssetId));
+ }
+ if (imageAssetInfo) {
+ await updateAsset(oldImageAssetId, imageAssetInfo, txn);
+ assetDeletionTasks.push(silentDeleteAsset(userId, oldImageAssetId));
+ }
+ if (htmlContentAssetInfo.result === "stored") {
+ await updateAsset(
+ oldContentAssetId,
+ {
+ id: htmlContentAssetInfo.assetId,
+ bookmarkId,
+ userId,
+ assetType: AssetTypes.LINK_HTML_CONTENT,
+ contentType: ASSET_TYPES.TEXT_HTML,
+ size: htmlContentAssetInfo.size,
fileName: null,
},
txn,
);
- });
- if (oldFullPageArchiveAssetId) {
- await silentDeleteAsset(userId, oldFullPageArchiveAssetId);
+ assetDeletionTasks.push(silentDeleteAsset(userId, oldContentAssetId));
+ } else if (oldContentAssetId) {
+ // Unlink the old content asset
+ await txn.delete(assets).where(eq(assets.id, oldContentAssetId));
+ assetDeletionTasks.push(silentDeleteAsset(userId, oldContentAssetId));
}
- }
- }
- };
+ });
+
+ // Delete the old assets if any
+ await Promise.all(assetDeletionTasks);
+
+ return async () => {
+ if (
+ !precrawledArchiveAssetId &&
+ (serverConfig.crawler.fullPageArchive || archiveFullPage)
+ ) {
+ const archiveResult = await archiveWebpage(
+ htmlContent,
+ browserUrl,
+ userId,
+ jobId,
+ abortSignal,
+ );
+
+ if (archiveResult) {
+ const {
+ assetId: fullPageArchiveAssetId,
+ size,
+ contentType,
+ } = archiveResult;
+
+ await db.transaction(async (txn) => {
+ await updateAsset(
+ oldFullPageArchiveAssetId,
+ {
+ id: fullPageArchiveAssetId,
+ bookmarkId,
+ userId,
+ assetType: AssetTypes.LINK_FULL_PAGE_ARCHIVE,
+ contentType,
+ size,
+ fileName: null,
+ },
+ txn,
+ );
+ });
+ if (oldFullPageArchiveAssetId) {
+ await silentDeleteAsset(userId, oldFullPageArchiveAssetId);
+ }
+ }
+ }
+ };
+ },
+ );
}
/**
@@ -1431,39 +1573,47 @@ async function crawlAndParseUrl(
* @throws {QueueRetryAfterError} if the domain is rate limited
*/
async function checkDomainRateLimit(url: string, jobId: string): Promise<void> {
- const crawlerDomainRateLimitConfig = serverConfig.crawler.domainRatelimiting;
- if (!crawlerDomainRateLimitConfig) {
- return;
- }
+ return await withSpan(
+ tracer,
+ "crawlerWorker.checkDomainRateLimit",
+ { attributes: { url, jobId } },
+ async () => {
+ const crawlerDomainRateLimitConfig =
+ serverConfig.crawler.domainRatelimiting;
+ if (!crawlerDomainRateLimitConfig) {
+ return;
+ }
- const rateLimitClient = await getRateLimitClient();
- if (!rateLimitClient) {
- return;
- }
+ const rateLimitClient = await getRateLimitClient();
+ if (!rateLimitClient) {
+ return;
+ }
- const hostname = new URL(url).hostname;
- const rateLimitResult = rateLimitClient.checkRateLimit(
- {
- name: "domain-ratelimit",
- maxRequests: crawlerDomainRateLimitConfig.maxRequests,
- windowMs: crawlerDomainRateLimitConfig.windowMs,
+ const hostname = new URL(url).hostname;
+ const rateLimitResult = rateLimitClient.checkRateLimit(
+ {
+ name: "domain-ratelimit",
+ maxRequests: crawlerDomainRateLimitConfig.maxRequests,
+ windowMs: crawlerDomainRateLimitConfig.windowMs,
+ },
+ hostname,
+ );
+
+ if (!rateLimitResult.allowed) {
+ const resetInSeconds = rateLimitResult.resetInSeconds;
+ // Add jitter to prevent thundering herd: +40% random variation
+ const jitterFactor = 1.0 + Math.random() * 0.4; // Random value between 1.0 and 1.4
+ const delayMs = Math.floor(resetInSeconds * 1000 * jitterFactor);
+ logger.info(
+ `[Crawler][${jobId}] Domain "${hostname}" is rate limited. Will retry in ${(delayMs / 1000).toFixed(2)} seconds (with jitter).`,
+ );
+ throw new QueueRetryAfterError(
+ `Domain "${hostname}" is rate limited`,
+ delayMs,
+ );
+ }
},
- hostname,
);
-
- if (!rateLimitResult.allowed) {
- const resetInSeconds = rateLimitResult.resetInSeconds;
- // Add jitter to prevent thundering herd: +40% random variation
- const jitterFactor = 1.0 + Math.random() * 0.4; // Random value between 1.0 and 1.4
- const delayMs = Math.floor(resetInSeconds * 1000 * jitterFactor);
- logger.info(
- `[Crawler][${jobId}] Domain "${hostname}" is rate limited. Will retry in ${(delayMs / 1000).toFixed(2)} seconds (with jitter).`,
- );
- throw new QueueRetryAfterError(
- `Domain "${hostname}" is rate limited`,
- delayMs,
- );
- }
}
async function runCrawler(
diff --git a/apps/workers/workers/feedWorker.ts b/apps/workers/workers/feedWorker.ts
index 2a1334a9..cd4f3e82 100644
--- a/apps/workers/workers/feedWorker.ts
+++ b/apps/workers/workers/feedWorker.ts
@@ -4,6 +4,7 @@ import { fetchWithProxy } from "network";
import cron from "node-cron";
import Parser from "rss-parser";
import { buildImpersonatingTRPCClient } from "trpc";
+import { withWorkerTracing } from "workerTracing";
import { z } from "zod";
import type { ZFeedRequestSchema } from "@karakeep/shared-server";
@@ -88,7 +89,7 @@ export class FeedWorker {
const worker = (await getQueueClient())!.createRunner<ZFeedRequestSchema>(
FeedQueue,
{
- run: run,
+ run: withWorkerTracing("feedWorker.run", run),
onComplete: async (job) => {
workerStatsCounter.labels("feed", "completed").inc();
const jobId = job.id;
diff --git a/apps/workers/workers/inference/inferenceWorker.ts b/apps/workers/workers/inference/inferenceWorker.ts
index eefc1dd8..57ad1a22 100644
--- a/apps/workers/workers/inference/inferenceWorker.ts
+++ b/apps/workers/workers/inference/inferenceWorker.ts
@@ -1,5 +1,6 @@
import { eq } from "drizzle-orm";
import { workerStatsCounter } from "metrics";
+import { withWorkerTracing } from "workerTracing";
import type { ZOpenAIRequest } from "@karakeep/shared-server";
import { db } from "@karakeep/db";
@@ -42,7 +43,7 @@ export class OpenAiWorker {
const worker = (await getQueueClient())!.createRunner<ZOpenAIRequest>(
OpenAIQueue,
{
- run: runOpenAI,
+ run: withWorkerTracing("inferenceWorker.run", runOpenAI),
onComplete: async (job) => {
workerStatsCounter.labels("inference", "completed").inc();
const jobId = job.id;
diff --git a/apps/workers/workers/ruleEngineWorker.ts b/apps/workers/workers/ruleEngineWorker.ts
index 98a9de74..00e20127 100644
--- a/apps/workers/workers/ruleEngineWorker.ts
+++ b/apps/workers/workers/ruleEngineWorker.ts
@@ -1,6 +1,7 @@
import { eq } from "drizzle-orm";
import { workerStatsCounter } from "metrics";
import { buildImpersonatingAuthedContext } from "trpc";
+import { withWorkerTracing } from "workerTracing";
import type { ZRuleEngineRequest } from "@karakeep/shared-server";
import { db } from "@karakeep/db";
@@ -20,7 +21,7 @@ export class RuleEngineWorker {
const worker = (await getQueueClient())!.createRunner<ZRuleEngineRequest>(
RuleEngineQueue,
{
- run: runRuleEngine,
+ run: withWorkerTracing("ruleEngineWorker.run", runRuleEngine),
onComplete: (job) => {
workerStatsCounter.labels("ruleEngine", "completed").inc();
const jobId = job.id;
diff --git a/apps/workers/workers/searchWorker.ts b/apps/workers/workers/searchWorker.ts
index fed30c9b..7a9c3a8d 100644
--- a/apps/workers/workers/searchWorker.ts
+++ b/apps/workers/workers/searchWorker.ts
@@ -1,5 +1,6 @@
import { eq } from "drizzle-orm";
import { workerStatsCounter } from "metrics";
+import { withWorkerTracing } from "workerTracing";
import type { ZSearchIndexingRequest } from "@karakeep/shared-server";
import { db } from "@karakeep/db";
@@ -25,7 +26,7 @@ export class SearchIndexingWorker {
(await getQueueClient())!.createRunner<ZSearchIndexingRequest>(
SearchIndexingQueue,
{
- run: runSearchIndexing,
+ run: withWorkerTracing("searchWorker.run", runSearchIndexing),
onComplete: (job) => {
workerStatsCounter.labels("search", "completed").inc();
const jobId = job.id;
diff --git a/apps/workers/workers/videoWorker.ts b/apps/workers/workers/videoWorker.ts
index 03525fdf..1ffbf674 100644
--- a/apps/workers/workers/videoWorker.ts
+++ b/apps/workers/workers/videoWorker.ts
@@ -4,6 +4,7 @@ import path from "path";
import { execa } from "execa";
import { workerStatsCounter } from "metrics";
import { getProxyAgent, validateUrl } from "network";
+import { withWorkerTracing } from "workerTracing";
import { db } from "@karakeep/db";
import { AssetTypes } from "@karakeep/db/schema";
@@ -35,7 +36,7 @@ export class VideoWorker {
return (await getQueueClient())!.createRunner<ZVideoRequest>(
VideoWorkerQueue,
{
- run: runWorker,
+ run: withWorkerTracing("videoWorker.run", runWorker),
onComplete: async (job) => {
workerStatsCounter.labels("video", "completed").inc();
const jobId = job.id;
diff --git a/apps/workers/workers/webhookWorker.ts b/apps/workers/workers/webhookWorker.ts
index 0d661372..875a0ac6 100644
--- a/apps/workers/workers/webhookWorker.ts
+++ b/apps/workers/workers/webhookWorker.ts
@@ -1,6 +1,7 @@
import { eq } from "drizzle-orm";
import { workerStatsCounter } from "metrics";
import { fetchWithProxy } from "network";
+import { withWorkerTracing } from "workerTracing";
import { db } from "@karakeep/db";
import { bookmarks, webhooksTable } from "@karakeep/db/schema";
@@ -19,7 +20,7 @@ export class WebhookWorker {
const worker = (await getQueueClient())!.createRunner<ZWebhookRequest>(
WebhookQueue,
{
- run: runWebhook,
+ run: withWorkerTracing("webhookWorker.run", runWebhook),
onComplete: async (job) => {
workerStatsCounter.labels("webhook", "completed").inc();
const jobId = job.id;
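Note: the diff calls `getTracer` and `withSpan` from `@karakeep/shared-server` but does not include their implementation. Below is a minimal sketch of what such helpers could look like, assuming they are thin wrappers over `@opentelemetry/api`; the actual shared-server code (including `initTracing`/`shutdownTracing`) may differ.

```ts
import {
  Span,
  SpanOptions,
  SpanStatusCode,
  Tracer,
  trace,
} from "@opentelemetry/api";

// Returns a tracer from the globally registered provider
// (assumed to be configured elsewhere, e.g. by initTracing() at startup).
export function getTracer(name: string): Tracer {
  return trace.getTracer(name);
}

// Runs fn inside an active span, recording failures on the span before
// rethrowing and always ending the span.
export async function withSpan<T>(
  tracer: Tracer,
  name: string,
  options: SpanOptions,
  fn: (span: Span) => Promise<T> | T,
): Promise<T> {
  return tracer.startActiveSpan(name, options, async (span) => {
    try {
      return await fn(span);
    } catch (err) {
      span.recordException(err as Error);
      span.setStatus({ code: SpanStatusCode.ERROR });
      throw err;
    } finally {
      span.end();
    }
  });
}
```

This matches how the crawler code above uses the helper: spans are named per function (e.g. "crawlerWorker.crawlPage"), request-scoped values such as `url`, `jobId`, and `userId` go into `attributes`, and the wrapped callback's return value is passed straight through.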