diff options
Diffstat (limited to 'apps/workers')
21 files changed, 2709 insertions, 891 deletions
diff --git a/apps/workers/index.ts b/apps/workers/index.ts index b605b50f..c7b9533d 100644 --- a/apps/workers/index.ts +++ b/apps/workers/index.ts @@ -3,9 +3,22 @@ import "dotenv/config"; import { buildServer } from "server"; import { + AdminMaintenanceQueue, + AssetPreprocessingQueue, + BackupQueue, + FeedQueue, + initTracing, + LinkCrawlerQueue, loadAllPlugins, + LowPriorityCrawlerQueue, + OpenAIQueue, prepareQueue, + RuleEngineQueue, + SearchIndexingQueue, + shutdownTracing, startQueue, + VideoWorkerQueue, + WebhookQueue, } from "@karakeep/shared-server"; import serverConfig from "@karakeep/shared/config"; import logger from "@karakeep/shared/logger"; @@ -16,6 +29,7 @@ import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker"; import { BackupSchedulingWorker, BackupWorker } from "./workers/backupWorker"; import { CrawlerWorker } from "./workers/crawlerWorker"; import { FeedRefreshingWorker, FeedWorker } from "./workers/feedWorker"; +import { ImportWorker } from "./workers/importWorker"; import { OpenAiWorker } from "./workers/inference/inferenceWorker"; import { RuleEngineWorker } from "./workers/ruleEngineWorker"; import { SearchIndexingWorker } from "./workers/searchWorker"; @@ -23,19 +37,53 @@ import { VideoWorker } from "./workers/videoWorker"; import { WebhookWorker } from "./workers/webhookWorker"; const workerBuilders = { - crawler: () => CrawlerWorker.build(), - inference: () => OpenAiWorker.build(), - search: () => SearchIndexingWorker.build(), - adminMaintenance: () => AdminMaintenanceWorker.build(), - video: () => VideoWorker.build(), - feed: () => FeedWorker.build(), - assetPreprocessing: () => AssetPreprocessingWorker.build(), - webhook: () => WebhookWorker.build(), - ruleEngine: () => RuleEngineWorker.build(), - backup: () => BackupWorker.build(), + crawler: async () => { + await LinkCrawlerQueue.ensureInit(); + return CrawlerWorker.build(LinkCrawlerQueue); + }, + lowPriorityCrawler: async () => { + await LowPriorityCrawlerQueue.ensureInit(); + return CrawlerWorker.build(LowPriorityCrawlerQueue); + }, + inference: async () => { + await OpenAIQueue.ensureInit(); + return OpenAiWorker.build(); + }, + search: async () => { + await SearchIndexingQueue.ensureInit(); + return SearchIndexingWorker.build(); + }, + adminMaintenance: async () => { + await AdminMaintenanceQueue.ensureInit(); + return AdminMaintenanceWorker.build(); + }, + video: async () => { + await VideoWorkerQueue.ensureInit(); + return VideoWorker.build(); + }, + feed: async () => { + await FeedQueue.ensureInit(); + return FeedWorker.build(); + }, + assetPreprocessing: async () => { + await AssetPreprocessingQueue.ensureInit(); + return AssetPreprocessingWorker.build(); + }, + webhook: async () => { + await WebhookQueue.ensureInit(); + return WebhookWorker.build(); + }, + ruleEngine: async () => { + await RuleEngineQueue.ensureInit(); + return RuleEngineWorker.build(); + }, + backup: async () => { + await BackupQueue.ensureInit(); + return BackupWorker.build(); + }, } as const; -type WorkerName = keyof typeof workerBuilders; +type WorkerName = keyof typeof workerBuilders | "import"; const enabledWorkers = new Set(serverConfig.workers.enabledWorkers); const disabledWorkers = new Set(serverConfig.workers.disabledWorkers); @@ -51,6 +99,7 @@ function isWorkerEnabled(name: WorkerName) { async function main() { await loadAllPlugins(); + initTracing("workers"); logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`); await prepareQueue(); @@ -75,10 +124,19 @@ async function main() { BackupSchedulingWorker.start(); } + // Start import polling worker + let importWorker: ImportWorker | null = null; + let importWorkerPromise: Promise<void> | null = null; + if (isWorkerEnabled("import")) { + importWorker = new ImportWorker(); + importWorkerPromise = importWorker.start(); + } + await Promise.any([ Promise.all([ ...workers.map(({ worker }) => worker.run()), httpServer.serve(), + ...(importWorkerPromise ? [importWorkerPromise] : []), ]), shutdownPromise, ]); @@ -93,10 +151,14 @@ async function main() { if (workers.some((w) => w.name === "backup")) { BackupSchedulingWorker.stop(); } + if (importWorker) { + importWorker.stop(); + } for (const { worker } of workers) { worker.stop(); } await httpServer.stop(); + await shutdownTracing(); process.exit(0); } diff --git a/apps/workers/metascraper-plugins/metascraper-amazon-improved.ts b/apps/workers/metascraper-plugins/metascraper-amazon-improved.ts new file mode 100644 index 00000000..ea9bf2e9 --- /dev/null +++ b/apps/workers/metascraper-plugins/metascraper-amazon-improved.ts @@ -0,0 +1,77 @@ +import type { Rules } from "metascraper"; + +/** + * Improved Amazon metascraper plugin that fixes image extraction. + * + * The default metascraper-amazon package uses `.a-dynamic-image` selector + * which matches the FIRST element with that class. On amazon.com pages, + * this is often the Prime logo instead of the product image. + * + * This plugin uses more specific selectors to target the actual product + * image: + * - #landingImage: The main product image ID + * - #imgTagWrapperId img: Fallback container for product images + * - #imageBlock img: Additional fallback for newer Amazon layouts + * + * By placing this plugin BEFORE metascraperAmazon() in the plugin chain, + * we ensure the correct image is extracted while keeping all other Amazon + * metadata (title, brand, description) from the original plugin. + */ + +const REGEX_AMAZON_URL = + /https?:\/\/(.*amazon\..*\/.*|.*amzn\..*\/.*|.*a\.co\/.*)/i; + +const test = ({ url }: { url: string }): boolean => REGEX_AMAZON_URL.test(url); + +const metascraperAmazonImproved = () => { + const rules: Rules = { + pkgName: "metascraper-amazon-improved", + test, + image: ({ htmlDom }) => { + // Try the main product image ID first (most reliable) + // Prefer data-old-hires attribute for high-resolution images + const landingImageHires = htmlDom("#landingImage").attr("data-old-hires"); + if (landingImageHires) { + return landingImageHires; + } + + const landingImageSrc = htmlDom("#landingImage").attr("src"); + if (landingImageSrc) { + return landingImageSrc; + } + + // Fallback to image block container + const imgTagHires = htmlDom("#imgTagWrapperId img").attr( + "data-old-hires", + ); + if (imgTagHires) { + return imgTagHires; + } + + const imgTagSrc = htmlDom("#imgTagWrapperId img").attr("src"); + if (imgTagSrc) { + return imgTagSrc; + } + + // Additional fallback for newer Amazon layouts + const imageBlockHires = htmlDom("#imageBlock img") + .first() + .attr("data-old-hires"); + if (imageBlockHires) { + return imageBlockHires; + } + + const imageBlockSrc = htmlDom("#imageBlock img").first().attr("src"); + if (imageBlockSrc) { + return imageBlockSrc; + } + + // Return undefined to allow next plugin to try + return undefined; + }, + }; + + return rules; +}; + +export default metascraperAmazonImproved; diff --git a/apps/workers/metascraper-plugins/metascraper-reddit.ts b/apps/workers/metascraper-plugins/metascraper-reddit.ts index 1fbee3ea..a5de5fe3 100644 --- a/apps/workers/metascraper-plugins/metascraper-reddit.ts +++ b/apps/workers/metascraper-plugins/metascraper-reddit.ts @@ -1,4 +1,8 @@ -import type { Rules } from "metascraper"; +import type { CheerioAPI } from "cheerio"; +import type { Rules, RulesOptions } from "metascraper"; +import { decode as decodeHtmlEntities } from "html-entities"; +import { fetchWithProxy } from "network"; +import { z } from "zod"; import logger from "@karakeep/shared/logger"; @@ -28,15 +32,267 @@ import logger from "@karakeep/shared/logger"; * will return 'undefined' and the next plugin * should continue to attempt to extract images. * - * Note: there is another way to accomplish this. - * If '.json' is appended to a Reddit url, the - * server will provide a JSON document summarizing - * the post. If there are preview images, they are - * included in a section of the JSON. To prevent - * additional server requests, this method is not - * currently being used. + * We also attempt to fetch the Reddit JSON response + * (by appending '.json' to the URL) to grab the + * title and preview images directly from the API. **/ +const redditPreviewImageSchema = z.object({ + source: z.object({ url: z.string().optional() }).optional(), + resolutions: z.array(z.object({ url: z.string().optional() })).optional(), +}); + +const redditMediaMetadataItemSchema = z.object({ + s: z.object({ u: z.string().optional() }).optional(), + p: z.array(z.object({ u: z.string().optional() })).optional(), +}); + +const redditPostSchema = z.object({ + title: z.string().optional(), + preview: z + .object({ images: z.array(redditPreviewImageSchema).optional() }) + .optional(), + url_overridden_by_dest: z.string().optional(), + url: z.string().optional(), + thumbnail: z.string().optional(), + media_metadata: z.record(redditMediaMetadataItemSchema).optional(), + author: z.string().optional(), + created_utc: z.number().optional(), + selftext: z.string().nullish(), + selftext_html: z.string().nullish(), + subreddit_name_prefixed: z.string().optional(), +}); + +type RedditPostData = z.infer<typeof redditPostSchema>; + +const redditResponseSchema = z.array( + z.object({ + data: z.object({ + children: z.array(z.object({ data: redditPostSchema })).optional(), + }), + }), +); + +interface RedditFetchResult { + fetched: boolean; + post?: RedditPostData; +} + +const REDDIT_CACHE_TTL_MS = 60 * 1000; // 1 minute TTL to avoid stale data + +interface RedditCacheEntry { + expiresAt: number; + promise: Promise<RedditFetchResult>; +} + +const redditJsonCache = new Map<string, RedditCacheEntry>(); + +const purgeExpiredCacheEntries = (now: number) => { + for (const [key, entry] of redditJsonCache.entries()) { + if (entry.expiresAt <= now) { + redditJsonCache.delete(key); + } + } +}; + +const decodeRedditUrl = (url?: string): string | undefined => { + if (!url) { + return undefined; + } + const decoded = decodeHtmlEntities(url); + return decoded || undefined; +}; + +const buildJsonUrl = (url: string): string => { + const urlObj = new URL(url); + + if (!urlObj.pathname.endsWith(".json")) { + urlObj.pathname = urlObj.pathname.replace(/\/?$/, ".json"); + } + + return urlObj.toString(); +}; + +const extractImageFromMediaMetadata = ( + media_metadata?: RedditPostData["media_metadata"], +): string | undefined => { + if (!media_metadata) { + return undefined; + } + const firstItem = Object.values(media_metadata)[0]; + if (!firstItem) { + return undefined; + } + + return ( + decodeRedditUrl(firstItem.s?.u) ?? + decodeRedditUrl(firstItem.p?.[0]?.u) ?? + undefined + ); +}; + +const isRedditImageHost = (urlCandidate: string): boolean => { + try { + const hostname = new URL(urlCandidate).hostname; + return hostname.includes("redd.it"); + } catch { + return false; + } +}; + +const extractImageFromPost = (post: RedditPostData): string | undefined => { + const previewImage = post.preview?.images?.[0]; + const previewUrl = + decodeRedditUrl(previewImage?.source?.url) ?? + decodeRedditUrl(previewImage?.resolutions?.[0]?.url); + if (previewUrl) { + return previewUrl; + } + + const mediaUrl = extractImageFromMediaMetadata(post.media_metadata); + if (mediaUrl) { + return mediaUrl; + } + + const directUrl = + decodeRedditUrl(post.url_overridden_by_dest) ?? + decodeRedditUrl(post.url) ?? + decodeRedditUrl(post.thumbnail); + + if (directUrl && isRedditImageHost(directUrl)) { + return directUrl; + } + + return undefined; +}; + +const extractTitleFromPost = (post: RedditPostData): string | undefined => + post.title?.trim() || undefined; + +const extractAuthorFromPost = (post: RedditPostData): string | undefined => + post.author?.trim() || undefined; + +const extractDateFromPost = (post: RedditPostData): string | undefined => { + if (!post.created_utc) { + return undefined; + } + const date = new Date(post.created_utc * 1000); + return Number.isNaN(date.getTime()) ? undefined : date.toISOString(); +}; + +const extractPublisherFromPost = (post: RedditPostData): string | undefined => + post.subreddit_name_prefixed?.trim() || "Reddit"; + +const REDDIT_LOGO_URL = + "https://www.redditstatic.com/desktop2x/img/favicon/android-icon-192x192.png"; + +const fallbackDomImage = ({ htmlDom }: { htmlDom: CheerioAPI }) => { + // 'preview' subdomain images are more likely to be what we're after + // but it could be in the 'i' subdomain. + // returns undefined if neither exists + const previewImages = htmlDom('img[src*="preview.redd.it"]') + .map((_, el) => htmlDom(el).attr("src")) + .get(); + const iImages = htmlDom('img[src*="i.redd.it"]') + .map((_, el) => htmlDom(el).attr("src")) + .get(); + return previewImages[0] || iImages[0]; +}; + +const fallbackDomTitle = ({ htmlDom }: { htmlDom: CheerioAPI }) => { + const title: string | undefined = htmlDom("shreddit-title[title]") + .first() + .attr("title"); + const postTitle: string | undefined = + title ?? htmlDom("shreddit-post[post-title]").first().attr("post-title"); + return postTitle ? postTitle.trim() : undefined; +}; + +const fetchRedditPostData = async (url: string): Promise<RedditFetchResult> => { + const cached = redditJsonCache.get(url); + const now = Date.now(); + + purgeExpiredCacheEntries(now); + + if (cached && cached.expiresAt > now) { + return cached.promise; + } + + const promise = (async () => { + let jsonUrl: string; + try { + jsonUrl = buildJsonUrl(url); + } catch (error) { + logger.warn( + "[MetascraperReddit] Failed to construct Reddit JSON URL", + error, + ); + return { fetched: false }; + } + + let response; + try { + response = await fetchWithProxy(jsonUrl, { + headers: { accept: "application/json" }, + }); + } catch (error) { + logger.warn( + `[MetascraperReddit] Failed to fetch Reddit JSON for ${jsonUrl}`, + error, + ); + return { fetched: false }; + } + + if (response.status === 403) { + // API forbidden; fall back to DOM scraping. + return { fetched: false }; + } + + if (!response.ok) { + logger.warn( + `[MetascraperReddit] Reddit JSON request failed for ${jsonUrl} with status ${response.status}`, + ); + return { fetched: false }; + } + + let payload: unknown; + try { + payload = await response.json(); + } catch (error) { + logger.warn( + `[MetascraperReddit] Failed to parse Reddit JSON for ${jsonUrl}`, + error, + ); + return { fetched: false }; + } + + const parsed = redditResponseSchema.safeParse(payload); + if (!parsed.success) { + logger.warn( + "[MetascraperReddit] Reddit JSON schema validation failed", + parsed.error, + ); + return { fetched: false }; + } + + const firstListingWithChildren = parsed.data.find( + (listing) => (listing.data.children?.length ?? 0) > 0, + ); + + return { + fetched: true, + post: firstListingWithChildren?.data.children?.[0]?.data, + }; + })(); + + redditJsonCache.set(url, { + promise, + expiresAt: now + REDDIT_CACHE_TTL_MS, + }); + + return promise; +}; + const domainFromUrl = (url: string): string => { /** * First-party metascraper plugins import metascraper-helpers, @@ -71,27 +327,71 @@ const metascraperReddit = () => { const rules: Rules = { pkgName: "metascraper-reddit", test, - image: ({ htmlDom }) => { - // 'preview' subdomain images are more likely to be what we're after - // but it could be in the 'i' subdomain. - // returns undefined if neither exists - const previewImages = htmlDom('img[src*="preview.redd.it"]') - .map((i, el) => htmlDom(el).attr("src")) - .get(); - const iImages = htmlDom('img[src*="i.redd.it"]') - .map((i, el) => htmlDom(el).attr("src")) - .get(); - return previewImages[0] || iImages[0]; - }, - title: ({ htmlDom }) => { - const title: string | undefined = htmlDom("shreddit-title[title]") - .first() - .attr("title"); - const postTitle: string | undefined = - title ?? - htmlDom("shreddit-post[post-title]").first().attr("post-title"); - return postTitle ? postTitle.trim() : undefined; - }, + image: (async ({ url, htmlDom }: { url: string; htmlDom: CheerioAPI }) => { + const result = await fetchRedditPostData(url); + if (result.post) { + const redditImage = extractImageFromPost(result.post); + if (redditImage) { + return redditImage; + } + } + + // If we successfully fetched JSON but found no Reddit image, + // avoid falling back to random DOM images. + if (result.fetched) { + return undefined; + } + + return fallbackDomImage({ htmlDom }); + }) as unknown as RulesOptions, + title: (async ({ url, htmlDom }: { url: string; htmlDom: CheerioAPI }) => { + const result = await fetchRedditPostData(url); + if (result.post) { + const redditTitle = extractTitleFromPost(result.post); + if (redditTitle) { + return redditTitle; + } + } + + return fallbackDomTitle({ htmlDom }); + }) as unknown as RulesOptions, + author: (async ({ url }: { url: string }) => { + const result = await fetchRedditPostData(url); + if (result.post) { + return extractAuthorFromPost(result.post); + } + return undefined; + }) as unknown as RulesOptions, + datePublished: (async ({ url }: { url: string }) => { + const result = await fetchRedditPostData(url); + if (result.post) { + return extractDateFromPost(result.post); + } + return undefined; + }) as unknown as RulesOptions, + publisher: (async ({ url }: { url: string }) => { + const result = await fetchRedditPostData(url); + if (result.post) { + return extractPublisherFromPost(result.post); + } + return undefined; + }) as unknown as RulesOptions, + logo: (async ({ url }: { url: string }) => { + const result = await fetchRedditPostData(url); + if (result.post) { + return REDDIT_LOGO_URL; + } + return undefined; + }) as unknown as RulesOptions, + readableContentHtml: (async ({ url }: { url: string }) => { + const result = await fetchRedditPostData(url); + if (result.post) { + const decoded = decodeHtmlEntities(result.post.selftext_html ?? ""); + // The post has no content, return the title + return (decoded || result.post.title) ?? null; + } + return undefined; + }) as unknown as RulesOptions, }; return rules; diff --git a/apps/workers/metrics.ts b/apps/workers/metrics.ts index 3dc4d2c0..42b5aa46 100644 --- a/apps/workers/metrics.ts +++ b/apps/workers/metrics.ts @@ -1,7 +1,7 @@ import { prometheus } from "@hono/prometheus"; -import { Counter, Registry } from "prom-client"; +import { Counter, Histogram, Registry } from "prom-client"; -const registry = new Registry(); +export const registry = new Registry(); export const { printMetrics } = prometheus({ registry: registry, @@ -21,5 +21,15 @@ export const crawlerStatusCodeCounter = new Counter({ labelNames: ["status_code"], }); +export const bookmarkCrawlLatencyHistogram = new Histogram({ + name: "karakeep_bookmark_crawl_latency_seconds", + help: "Latency from bookmark creation to crawl completion (excludes recrawls and imports)", + buckets: [ + 0.1, 0.25, 0.5, 1, 2.5, 5, 7.5, 10, 15, 20, 30, 45, 60, 90, 120, 180, 300, + 600, 900, 1200, + ], +}); + registry.registerMetric(workerStatsCounter); registry.registerMetric(crawlerStatusCodeCounter); +registry.registerMetric(bookmarkCrawlLatencyHistogram); diff --git a/apps/workers/network.ts b/apps/workers/network.ts index 0dc46da4..2ef8483f 100644 --- a/apps/workers/network.ts +++ b/apps/workers/network.ts @@ -86,6 +86,15 @@ function isAddressForbidden(address: string): boolean { return DISALLOWED_IP_RANGES.has(parsed.range()); } +export function getBookmarkDomain(url?: string | null): string | undefined { + if (!url) return undefined; + try { + return new URL(url).hostname; + } catch { + return undefined; + } +} + export type UrlValidationResult = | { ok: true; url: URL } | { ok: false; reason: string }; @@ -163,7 +172,7 @@ export async function validateUrl( if (isAddressForbidden(hostname)) { return { ok: false, - reason: `Refusing to access disallowed IP address ${hostname} (requested via ${parsedUrl.toString()})`, + reason: `Refusing to access disallowed IP address ${hostname} (requested via ${parsedUrl.toString()}). You can use CRAWLER_ALLOWED_INTERNAL_HOSTNAMES to allowlist specific hostnames for internal access.`, } as const; } return { ok: true, url: parsedUrl } as const; diff --git a/apps/workers/package.json b/apps/workers/package.json index 7a5a1c81..fdec2ebf 100644 --- a/apps/workers/package.json +++ b/apps/workers/package.json @@ -22,6 +22,7 @@ "drizzle-orm": "^0.44.2", "execa": "9.3.1", "hono": "^4.10.6", + "html-entities": "^2.6.0", "http-proxy-agent": "^7.0.2", "https-proxy-agent": "^7.0.6", "ipaddr.js": "^2.2.0", @@ -52,7 +53,7 @@ "prom-client": "^15.1.3", "puppeteer-extra-plugin-stealth": "^2.11.2", "rss-parser": "^3.13.0", - "tesseract.js": "^5.1.1", + "tesseract.js": "^7.0.0", "tsx": "^4.8.1", "typescript": "^5.9", "zod": "^3.24.2" diff --git a/apps/workers/workerTracing.ts b/apps/workers/workerTracing.ts new file mode 100644 index 00000000..3ff16d1c --- /dev/null +++ b/apps/workers/workerTracing.ts @@ -0,0 +1,43 @@ +import type { DequeuedJob } from "@karakeep/shared/queueing"; +import { getTracer, withSpan } from "@karakeep/shared-server"; + +const tracer = getTracer("@karakeep/workers"); + +type WorkerRunFn<TData, TResult = void> = ( + job: DequeuedJob<TData>, +) => Promise<TResult>; + +/** + * Wraps a worker run function with OpenTelemetry tracing. + * Creates a span for each job execution and automatically handles error recording. + * + * @param name - The name of the span (e.g., "feedWorker.run", "crawlerWorker.run") + * @param fn - The worker run function to wrap + * @returns A wrapped function that executes within a traced span + * + * @example + * ```ts + * const run = withWorkerTracing("feedWorker.run", async (job) => { + * // Your worker logic here + * }); + * ``` + */ +export function withWorkerTracing<TData, TResult = void>( + name: string, + fn: WorkerRunFn<TData, TResult>, +): WorkerRunFn<TData, TResult> { + return async (job: DequeuedJob<TData>): Promise<TResult> => { + return await withSpan( + tracer, + name, + { + attributes: { + "job.id": job.id, + "job.priority": job.priority, + "job.runNumber": job.runNumber, + }, + }, + () => fn(job), + ); + }; +} diff --git a/apps/workers/workerUtils.ts b/apps/workers/workerUtils.ts index 3eaf5b4b..48e3b277 100644 --- a/apps/workers/workerUtils.ts +++ b/apps/workers/workerUtils.ts @@ -31,9 +31,13 @@ export async function getBookmarkDetails(bookmarkId: string) { return {
url: bookmark.link.url,
userId: bookmark.userId,
+ createdAt: bookmark.createdAt,
+ crawledAt: bookmark.link.crawledAt,
screenshotAssetId: bookmark.assets.find(
(a) => a.assetType == AssetTypes.LINK_SCREENSHOT,
)?.id,
+ pdfAssetId: bookmark.assets.find((a) => a.assetType == AssetTypes.LINK_PDF)
+ ?.id,
imageAssetId: bookmark.assets.find(
(a) => a.assetType == AssetTypes.LINK_BANNER_IMAGE,
)?.id,
diff --git a/apps/workers/workers/adminMaintenanceWorker.ts b/apps/workers/workers/adminMaintenanceWorker.ts index e5312964..92d52a22 100644 --- a/apps/workers/workers/adminMaintenanceWorker.ts +++ b/apps/workers/workers/adminMaintenanceWorker.ts @@ -1,4 +1,5 @@ import { workerStatsCounter } from "metrics"; +import { withWorkerTracing } from "workerTracing"; import { AdminMaintenanceQueue, @@ -20,7 +21,10 @@ export class AdminMaintenanceWorker { (await getQueueClient())!.createRunner<ZAdminMaintenanceTask>( AdminMaintenanceQueue, { - run: runAdminMaintenance, + run: withWorkerTracing( + "adminMaintenanceWorker.run", + runAdminMaintenance, + ), onComplete: (job) => { workerStatsCounter .labels(`adminMaintenance:${job.data.type}`, "completed") diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts index ff16906d..d12457d3 100644 --- a/apps/workers/workers/assetPreprocessingWorker.ts +++ b/apps/workers/workers/assetPreprocessingWorker.ts @@ -4,6 +4,7 @@ import { workerStatsCounter } from "metrics"; import PDFParser from "pdf2json"; import { fromBuffer } from "pdf2pic"; import { createWorker } from "tesseract.js"; +import { withWorkerTracing } from "workerTracing"; import type { AssetPreprocessingRequest } from "@karakeep/shared-server"; import { db } from "@karakeep/db"; @@ -22,7 +23,9 @@ import { } from "@karakeep/shared-server"; import { newAssetId, readAsset, saveAsset } from "@karakeep/shared/assetdb"; import serverConfig from "@karakeep/shared/config"; +import { InferenceClientFactory } from "@karakeep/shared/inference"; import logger from "@karakeep/shared/logger"; +import { buildOCRPrompt } from "@karakeep/shared/prompts"; import { DequeuedJob, EnqueueOptions, @@ -36,7 +39,7 @@ export class AssetPreprocessingWorker { (await getQueueClient())!.createRunner<AssetPreprocessingRequest>( AssetPreprocessingQueue, { - run: run, + run: withWorkerTracing("assetPreprocessingWorker.run", run), onComplete: async (job) => { workerStatsCounter.labels("assetPreprocessing", "completed").inc(); const jobId = job.id; @@ -62,7 +65,7 @@ export class AssetPreprocessingWorker { { concurrency: serverConfig.assetPreprocessing.numWorkers, pollIntervalMs: 1000, - timeoutSecs: 30, + timeoutSecs: serverConfig.assetPreprocessing.jobTimeoutSec, }, ); @@ -88,6 +91,36 @@ async function readImageText(buffer: Buffer) { } } +async function readImageTextWithLLM( + buffer: Buffer, + contentType: string, +): Promise<string | null> { + const inferenceClient = InferenceClientFactory.build(); + if (!inferenceClient) { + logger.warn( + "[assetPreprocessing] LLM OCR is enabled but no inference client is configured. Falling back to Tesseract.", + ); + return readImageText(buffer); + } + + const base64 = buffer.toString("base64"); + const prompt = buildOCRPrompt(); + + const response = await inferenceClient.inferFromImage( + prompt, + contentType, + base64, + { schema: null }, + ); + + const extractedText = response.response.trim(); + if (!extractedText) { + return null; + } + + return extractedText; +} + async function readPDFText(buffer: Buffer): Promise<{ text: string; metadata: Record<string, object>; @@ -199,6 +232,7 @@ export async function extractAndSavePDFScreenshot( async function extractAndSaveImageText( jobId: string, asset: Buffer, + contentType: string, bookmark: NonNullable<Awaited<ReturnType<typeof getBookmark>>>, isFixMode: boolean, ): Promise<boolean> { @@ -212,16 +246,31 @@ async function extractAndSaveImageText( } } let imageText = null; - logger.info( - `[assetPreprocessing][${jobId}] Attempting to extract text from image.`, - ); - try { - imageText = await readImageText(asset); - } catch (e) { - logger.error( - `[assetPreprocessing][${jobId}] Failed to read image text: ${e}`, + + if (serverConfig.ocr.useLLM) { + logger.info( + `[assetPreprocessing][${jobId}] Attempting to extract text from image using LLM OCR.`, ); + try { + imageText = await readImageTextWithLLM(asset, contentType); + } catch (e) { + logger.error( + `[assetPreprocessing][${jobId}] Failed to read image text with LLM: ${e}`, + ); + } + } else { + logger.info( + `[assetPreprocessing][${jobId}] Attempting to extract text from image using Tesseract.`, + ); + try { + imageText = await readImageText(asset); + } catch (e) { + logger.error( + `[assetPreprocessing][${jobId}] Failed to read image text: ${e}`, + ); + } } + if (!imageText) { return false; } @@ -313,7 +362,7 @@ async function run(req: DequeuedJob<AssetPreprocessingRequest>) { ); } - const { asset } = await readAsset({ + const { asset, metadata } = await readAsset({ userId: bookmark.userId, assetId: bookmark.asset.assetId, }); @@ -330,6 +379,7 @@ async function run(req: DequeuedJob<AssetPreprocessingRequest>) { const extractedText = await extractAndSaveImageText( jobId, asset, + metadata.contentType, bookmark, isFixMode, ); diff --git a/apps/workers/workers/backupWorker.ts b/apps/workers/workers/backupWorker.ts index c2d1ae5a..01f54b28 100644 --- a/apps/workers/workers/backupWorker.ts +++ b/apps/workers/workers/backupWorker.ts @@ -8,6 +8,7 @@ import archiver from "archiver"; import { eq } from "drizzle-orm"; import { workerStatsCounter } from "metrics"; import cron from "node-cron"; +import { withWorkerTracing } from "workerTracing"; import type { ZBackupRequest } from "@karakeep/shared-server"; import { db } from "@karakeep/db"; @@ -107,7 +108,7 @@ export class BackupWorker { const worker = (await getQueueClient())!.createRunner<ZBackupRequest>( BackupQueue, { - run: run, + run: withWorkerTracing("backupWorker.run", run), onComplete: async (job) => { workerStatsCounter.labels("backup", "completed").inc(); const jobId = job.id; diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts index 740d5dac..9815571e 100644 --- a/apps/workers/workers/crawlerWorker.ts +++ b/apps/workers/workers/crawlerWorker.ts @@ -9,7 +9,7 @@ import { PlaywrightBlocker } from "@ghostery/adblocker-playwright"; import { Readability } from "@mozilla/readability"; import { Mutex } from "async-mutex"; import DOMPurify from "dompurify"; -import { eq } from "drizzle-orm"; +import { and, eq } from "drizzle-orm"; import { execa } from "execa"; import { exitAbortController } from "exit"; import { HttpProxyAgent } from "http-proxy-agent"; @@ -27,9 +27,14 @@ import metascraperTitle from "metascraper-title"; import metascraperUrl from "metascraper-url"; import metascraperX from "metascraper-x"; import metascraperYoutube from "metascraper-youtube"; -import { crawlerStatusCodeCounter, workerStatsCounter } from "metrics"; +import { + bookmarkCrawlLatencyHistogram, + crawlerStatusCodeCounter, + workerStatsCounter, +} from "metrics"; import { fetchWithProxy, + getBookmarkDomain, getRandomProxy, matchesNoProxy, validateUrl, @@ -37,6 +42,7 @@ import { import { Browser, BrowserContextOptions } from "playwright"; import { chromium } from "playwright-extra"; import StealthPlugin from "puppeteer-extra-plugin-stealth"; +import { withWorkerTracing } from "workerTracing"; import { getBookmarkDetails, updateAsset } from "workerUtils"; import { z } from "zod"; @@ -52,12 +58,14 @@ import { } from "@karakeep/db/schema"; import { AssetPreprocessingQueue, - LinkCrawlerQueue, + getTracer, OpenAIQueue, QuotaService, + setSpanAttributes, triggerSearchReindex, triggerWebhook, VideoWorkerQueue, + withSpan, zCrawlLinkRequestSchema, } from "@karakeep/shared-server"; import { @@ -75,15 +83,21 @@ import serverConfig from "@karakeep/shared/config"; import logger from "@karakeep/shared/logger"; import { DequeuedJob, + DequeuedJobError, EnqueueOptions, getQueueClient, + Queue, + QueueRetryAfterError, } from "@karakeep/shared/queueing"; import { getRateLimitClient } from "@karakeep/shared/ratelimiting"; import { tryCatch } from "@karakeep/shared/tryCatch"; import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; +import metascraperAmazonImproved from "../metascraper-plugins/metascraper-amazon-improved"; import metascraperReddit from "../metascraper-plugins/metascraper-reddit"; +const tracer = getTracer("@karakeep/workers"); + function abortPromise(signal: AbortSignal): Promise<never> { if (signal.aborted) { const p = Promise.reject(signal.reason ?? new Error("AbortError")); @@ -125,6 +139,7 @@ const metascraperParser = metascraper([ dateModified: true, datePublished: true, }), + metascraperAmazonImproved(), // Fix image extraction bug - must come before metascraperAmazon() metascraperAmazon(), metascraperYoutube({ gotOpts: { @@ -185,7 +200,7 @@ const cookieSchema = z.object({ const cookiesSchema = z.array(cookieSchema); interface CrawlerRunResult { - status: "completed" | "rescheduled"; + status: "completed"; } function getPlaywrightProxyConfig(): BrowserContextOptions["proxy"] { @@ -288,57 +303,68 @@ async function launchBrowser() { } export class CrawlerWorker { - static async build() { - chromium.use(StealthPlugin()); - if (serverConfig.crawler.enableAdblocker) { - logger.info("[crawler] Loading adblocker ..."); - const globalBlockerResult = await tryCatch( - PlaywrightBlocker.fromPrebuiltFull(fetchWithProxy, { - path: path.join(os.tmpdir(), "karakeep_adblocker.bin"), - read: fs.readFile, - write: fs.writeFile, - }), - ); - if (globalBlockerResult.error) { - logger.error( - `[crawler] Failed to load adblocker. Will not be blocking ads: ${globalBlockerResult.error}`, - ); - } else { - globalBlocker = globalBlockerResult.data; - } - } - if (!serverConfig.crawler.browserConnectOnDemand) { - await launchBrowser(); - } else { - logger.info( - "[Crawler] Browser connect on demand is enabled, won't proactively start the browser instance", - ); + private static initPromise: Promise<void> | null = null; + + private static ensureInitialized() { + if (!CrawlerWorker.initPromise) { + CrawlerWorker.initPromise = (async () => { + chromium.use(StealthPlugin()); + if (serverConfig.crawler.enableAdblocker) { + logger.info("[crawler] Loading adblocker ..."); + const globalBlockerResult = await tryCatch( + PlaywrightBlocker.fromPrebuiltFull(fetchWithProxy, { + path: path.join(os.tmpdir(), "karakeep_adblocker.bin"), + read: fs.readFile, + write: fs.writeFile, + }), + ); + if (globalBlockerResult.error) { + logger.error( + `[crawler] Failed to load adblocker. Will not be blocking ads: ${globalBlockerResult.error}`, + ); + } else { + globalBlocker = globalBlockerResult.data; + } + } + if (!serverConfig.crawler.browserConnectOnDemand) { + await launchBrowser(); + } else { + logger.info( + "[Crawler] Browser connect on demand is enabled, won't proactively start the browser instance", + ); + } + await loadCookiesFromFile(); + })(); } + return CrawlerWorker.initPromise; + } + + static async build(queue: Queue<ZCrawlLinkRequest>) { + await CrawlerWorker.ensureInitialized(); logger.info("Starting crawler worker ..."); - const worker = (await getQueueClient())!.createRunner< + const worker = (await getQueueClient()).createRunner< ZCrawlLinkRequest, CrawlerRunResult >( - LinkCrawlerQueue, + queue, { - run: runCrawler, - onComplete: async (job, result) => { - if (result.status === "rescheduled") { - logger.info( - `[Crawler][${job.id}] Rescheduled due to domain rate limiting`, - ); - return; - } + run: withWorkerTracing("crawlerWorker.run", runCrawler), + onComplete: async (job: DequeuedJob<ZCrawlLinkRequest>) => { workerStatsCounter.labels("crawler", "completed").inc(); const jobId = job.id; logger.info(`[Crawler][${jobId}] Completed successfully`); const bookmarkId = job.data.bookmarkId; if (bookmarkId) { - await changeBookmarkStatus(bookmarkId, "success"); + await db + .update(bookmarkLinks) + .set({ + crawlStatus: "success", + }) + .where(eq(bookmarkLinks.id, bookmarkId)); } }, - onError: async (job) => { + onError: async (job: DequeuedJobError<ZCrawlLinkRequest>) => { workerStatsCounter.labels("crawler", "failed").inc(); if (job.numRetriesLeft == 0) { workerStatsCounter.labels("crawler", "failed_permanent").inc(); @@ -349,7 +375,36 @@ export class CrawlerWorker { ); const bookmarkId = job.data?.bookmarkId; if (bookmarkId && job.numRetriesLeft == 0) { - await changeBookmarkStatus(bookmarkId, "failure"); + await db.transaction(async (tx) => { + await tx + .update(bookmarkLinks) + .set({ + crawlStatus: "failure", + }) + .where(eq(bookmarkLinks.id, bookmarkId)); + await tx + .update(bookmarks) + .set({ + taggingStatus: null, + }) + .where( + and( + eq(bookmarks.id, bookmarkId), + eq(bookmarks.taggingStatus, "pending"), + ), + ); + await tx + .update(bookmarks) + .set({ + summarizationStatus: null, + }) + .where( + and( + eq(bookmarks.id, bookmarkId), + eq(bookmarks.summarizationStatus, "pending"), + ), + ); + }); } }, }, @@ -360,8 +415,6 @@ export class CrawlerWorker { }, ); - await loadCookiesFromFile(); - return worker; } } @@ -391,239 +444,300 @@ async function loadCookiesFromFile(): Promise<void> { type DBAssetType = typeof assets.$inferInsert; -async function changeBookmarkStatus( - bookmarkId: string, - crawlStatus: "success" | "failure", -) { - await db - .update(bookmarkLinks) - .set({ - crawlStatus, - }) - .where(eq(bookmarkLinks.id, bookmarkId)); -} - async function browserlessCrawlPage( jobId: string, url: string, abortSignal: AbortSignal, ) { - logger.info( - `[Crawler][${jobId}] Running in browserless mode. Will do a plain http request to "${url}". Screenshots will be disabled.`, - ); - const response = await fetchWithProxy(url, { - signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]), - }); - logger.info( - `[Crawler][${jobId}] Successfully fetched the content of "${url}". Status: ${response.status}, Size: ${response.size}`, + return await withSpan( + tracer, + "crawlerWorker.browserlessCrawlPage", + { + attributes: { + "bookmark.url": url, + "bookmark.domain": getBookmarkDomain(url), + "job.id": jobId, + }, + }, + async () => { + logger.info( + `[Crawler][${jobId}] Running in browserless mode. Will do a plain http request to "${url}". Screenshots will be disabled.`, + ); + const response = await fetchWithProxy(url, { + signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]), + }); + logger.info( + `[Crawler][${jobId}] Successfully fetched the content of "${url}". Status: ${response.status}, Size: ${response.size}`, + ); + return { + htmlContent: await response.text(), + statusCode: response.status, + screenshot: undefined, + pdf: undefined, + url: response.url, + }; + }, ); - return { - htmlContent: await response.text(), - statusCode: response.status, - screenshot: undefined, - url: response.url, - }; } async function crawlPage( jobId: string, url: string, userId: string, + forceStorePdf: boolean, abortSignal: AbortSignal, ): Promise<{ htmlContent: string; screenshot: Buffer | undefined; + pdf: Buffer | undefined; statusCode: number; url: string; }> { - // Check user's browser crawling setting - const userData = await db.query.users.findFirst({ - where: eq(users.id, userId), - columns: { browserCrawlingEnabled: true }, - }); - if (!userData) { - logger.error(`[Crawler][${jobId}] User ${userId} not found`); - throw new Error(`User ${userId} not found`); - } + return await withSpan( + tracer, + "crawlerWorker.crawlPage", + { + attributes: { + "bookmark.url": url, + "bookmark.domain": getBookmarkDomain(url), + "job.id": jobId, + "user.id": userId, + "crawler.forceStorePdf": forceStorePdf, + }, + }, + async () => { + // Check user's browser crawling setting + const userData = await db.query.users.findFirst({ + where: eq(users.id, userId), + columns: { browserCrawlingEnabled: true }, + }); + if (!userData) { + logger.error(`[Crawler][${jobId}] User ${userId} not found`); + throw new Error(`User ${userId} not found`); + } - const browserCrawlingEnabled = userData.browserCrawlingEnabled; + const browserCrawlingEnabled = userData.browserCrawlingEnabled; - if (browserCrawlingEnabled !== null && !browserCrawlingEnabled) { - return browserlessCrawlPage(jobId, url, abortSignal); - } + if (browserCrawlingEnabled !== null && !browserCrawlingEnabled) { + return browserlessCrawlPage(jobId, url, abortSignal); + } - let browser: Browser | undefined; - if (serverConfig.crawler.browserConnectOnDemand) { - browser = await startBrowserInstance(); - } else { - browser = globalBrowser; - } - if (!browser) { - return browserlessCrawlPage(jobId, url, abortSignal); - } + let browser: Browser | undefined; + if (serverConfig.crawler.browserConnectOnDemand) { + browser = await startBrowserInstance(); + } else { + browser = globalBrowser; + } + if (!browser) { + return browserlessCrawlPage(jobId, url, abortSignal); + } - const proxyConfig = getPlaywrightProxyConfig(); - const isRunningInProxyContext = - proxyConfig !== undefined && - !matchesNoProxy(url, proxyConfig.bypass?.split(",") ?? []); - const context = await browser.newContext({ - viewport: { width: 1440, height: 900 }, - userAgent: - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", - proxy: proxyConfig, - }); + const proxyConfig = getPlaywrightProxyConfig(); + const isRunningInProxyContext = + proxyConfig !== undefined && + !matchesNoProxy(url, proxyConfig.bypass?.split(",") ?? []); + const context = await browser.newContext({ + viewport: { width: 1440, height: 900 }, + userAgent: + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", + proxy: proxyConfig, + }); - try { - if (globalCookies.length > 0) { - await context.addCookies(globalCookies); - logger.info( - `[Crawler][${jobId}] Cookies successfully loaded into browser context`, - ); - } + try { + if (globalCookies.length > 0) { + await context.addCookies(globalCookies); + logger.info( + `[Crawler][${jobId}] Cookies successfully loaded into browser context`, + ); + } - // Create a new page in the context - const page = await context.newPage(); + // Create a new page in the context + const page = await context.newPage(); - // Apply ad blocking - if (globalBlocker) { - await globalBlocker.enableBlockingInPage(page); - } + // Apply ad blocking + if (globalBlocker) { + await globalBlocker.enableBlockingInPage(page); + } - // Block audio/video resources and disallowed sub-requests - await page.route("**/*", async (route) => { - if (abortSignal.aborted) { - await route.abort("aborted"); - return; - } - const request = route.request(); - const resourceType = request.resourceType(); + // Block audio/video resources and disallowed sub-requests + await page.route("**/*", async (route) => { + if (abortSignal.aborted) { + await route.abort("aborted"); + return; + } + const request = route.request(); + const resourceType = request.resourceType(); - // Block audio/video resources - if ( - resourceType === "media" || - request.headers()["content-type"]?.includes("video/") || - request.headers()["content-type"]?.includes("audio/") - ) { - await route.abort("aborted"); - return; - } + // Block audio/video resources + if ( + resourceType === "media" || + request.headers()["content-type"]?.includes("video/") || + request.headers()["content-type"]?.includes("audio/") + ) { + await route.abort("aborted"); + return; + } - const requestUrl = request.url(); - const requestIsRunningInProxyContext = - proxyConfig !== undefined && - !matchesNoProxy(requestUrl, proxyConfig.bypass?.split(",") ?? []); - if ( - requestUrl.startsWith("http://") || - requestUrl.startsWith("https://") - ) { - const validation = await validateUrl( - requestUrl, - requestIsRunningInProxyContext, + const requestUrl = request.url(); + const requestIsRunningInProxyContext = + proxyConfig !== undefined && + !matchesNoProxy(requestUrl, proxyConfig.bypass?.split(",") ?? []); + if ( + requestUrl.startsWith("http://") || + requestUrl.startsWith("https://") + ) { + const validation = await validateUrl( + requestUrl, + requestIsRunningInProxyContext, + ); + if (!validation.ok) { + logger.warn( + `[Crawler][${jobId}] Blocking sub-request to disallowed URL "${requestUrl}": ${validation.reason}`, + ); + await route.abort("blockedbyclient"); + return; + } + } + + // Continue with other requests + await route.continue(); + }); + + // Navigate to the target URL + const navigationValidation = await validateUrl( + url, + isRunningInProxyContext, ); - if (!validation.ok) { - logger.warn( - `[Crawler][${jobId}] Blocking sub-request to disallowed URL "${requestUrl}": ${validation.reason}`, + if (!navigationValidation.ok) { + throw new Error( + `Disallowed navigation target "${url}": ${navigationValidation.reason}`, ); - await route.abort("blockedbyclient"); - return; } - } - - // Continue with other requests - await route.continue(); - }); + const targetUrl = navigationValidation.url.toString(); + logger.info(`[Crawler][${jobId}] Navigating to "${targetUrl}"`); + const response = await Promise.race([ + page.goto(targetUrl, { + timeout: serverConfig.crawler.navigateTimeoutSec * 1000, + waitUntil: "domcontentloaded", + }), + abortPromise(abortSignal).then(() => null), + ]); - // Navigate to the target URL - const navigationValidation = await validateUrl( - url, - isRunningInProxyContext, - ); - if (!navigationValidation.ok) { - throw new Error( - `Disallowed navigation target "${url}": ${navigationValidation.reason}`, - ); - } - const targetUrl = navigationValidation.url.toString(); - logger.info(`[Crawler][${jobId}] Navigating to "${targetUrl}"`); - const response = await Promise.race([ - page.goto(targetUrl, { - timeout: serverConfig.crawler.navigateTimeoutSec * 1000, - waitUntil: "domcontentloaded", - }), - abortPromise(abortSignal).then(() => null), - ]); + logger.info( + `[Crawler][${jobId}] Successfully navigated to "${targetUrl}". Waiting for the page to load ...`, + ); - logger.info( - `[Crawler][${jobId}] Successfully navigated to "${targetUrl}". Waiting for the page to load ...`, - ); + // Wait until network is relatively idle or timeout after 5 seconds + await Promise.race([ + page + .waitForLoadState("networkidle", { timeout: 5000 }) + .catch(() => ({})), + new Promise((resolve) => setTimeout(resolve, 5000)), + abortPromise(abortSignal), + ]); - // Wait until network is relatively idle or timeout after 5 seconds - await Promise.race([ - page.waitForLoadState("networkidle", { timeout: 5000 }).catch(() => ({})), - new Promise((resolve) => setTimeout(resolve, 5000)), - abortPromise(abortSignal), - ]); + abortSignal.throwIfAborted(); - abortSignal.throwIfAborted(); + logger.info( + `[Crawler][${jobId}] Finished waiting for the page to load.`, + ); - logger.info(`[Crawler][${jobId}] Finished waiting for the page to load.`); + // Extract content from the page + const htmlContent = await page.content(); - // Extract content from the page - const htmlContent = await page.content(); + abortSignal.throwIfAborted(); - abortSignal.throwIfAborted(); + logger.info( + `[Crawler][${jobId}] Successfully fetched the page content.`, + ); - logger.info(`[Crawler][${jobId}] Successfully fetched the page content.`); + // Take a screenshot if configured + let screenshot: Buffer | undefined = undefined; + if (serverConfig.crawler.storeScreenshot) { + const { data: screenshotData, error: screenshotError } = + await tryCatch( + Promise.race<Buffer>([ + page.screenshot({ + // If you change this, you need to change the asset type in the store function. + type: "jpeg", + fullPage: serverConfig.crawler.fullPageScreenshot, + quality: 80, + }), + new Promise((_, reject) => + setTimeout( + () => + reject( + "TIMED_OUT, consider increasing CRAWLER_SCREENSHOT_TIMEOUT_SEC", + ), + serverConfig.crawler.screenshotTimeoutSec * 1000, + ), + ), + abortPromise(abortSignal).then(() => Buffer.from("")), + ]), + ); + abortSignal.throwIfAborted(); + if (screenshotError) { + logger.warn( + `[Crawler][${jobId}] Failed to capture the screenshot. Reason: ${screenshotError}`, + ); + } else { + logger.info( + `[Crawler][${jobId}] Finished capturing page content and a screenshot. FullPageScreenshot: ${serverConfig.crawler.fullPageScreenshot}`, + ); + screenshot = screenshotData; + } + } - // Take a screenshot if configured - let screenshot: Buffer | undefined = undefined; - if (serverConfig.crawler.storeScreenshot) { - const { data: screenshotData, error: screenshotError } = await tryCatch( - Promise.race<Buffer>([ - page.screenshot({ - // If you change this, you need to change the asset type in the store function. - type: "jpeg", - fullPage: serverConfig.crawler.fullPageScreenshot, - quality: 80, - }), - new Promise((_, reject) => - setTimeout( - () => - reject( - "TIMED_OUT, consider increasing CRAWLER_SCREENSHOT_TIMEOUT_SEC", + // Capture PDF if configured or explicitly requested + let pdf: Buffer | undefined = undefined; + if (serverConfig.crawler.storePdf || forceStorePdf) { + const { data: pdfData, error: pdfError } = await tryCatch( + Promise.race<Buffer>([ + page.pdf({ + format: "A4", + printBackground: true, + }), + new Promise((_, reject) => + setTimeout( + () => + reject( + "TIMED_OUT, consider increasing CRAWLER_SCREENSHOT_TIMEOUT_SEC", + ), + serverConfig.crawler.screenshotTimeoutSec * 1000, ), - serverConfig.crawler.screenshotTimeoutSec * 1000, - ), - ), - abortPromise(abortSignal).then(() => Buffer.from("")), - ]), - ); - abortSignal.throwIfAborted(); - if (screenshotError) { - logger.warn( - `[Crawler][${jobId}] Failed to capture the screenshot. Reason: ${screenshotError}`, - ); - } else { - logger.info( - `[Crawler][${jobId}] Finished capturing page content and a screenshot. FullPageScreenshot: ${serverConfig.crawler.fullPageScreenshot}`, - ); - screenshot = screenshotData; - } - } + ), + abortPromise(abortSignal).then(() => Buffer.from("")), + ]), + ); + abortSignal.throwIfAborted(); + if (pdfError) { + logger.warn( + `[Crawler][${jobId}] Failed to capture the PDF. Reason: ${pdfError}`, + ); + } else { + logger.info( + `[Crawler][${jobId}] Finished capturing page content as PDF`, + ); + pdf = pdfData; + } + } - return { - htmlContent, - statusCode: response?.status() ?? 0, - screenshot, - url: page.url(), - }; - } finally { - await context.close(); - // Only close the browser if it was created on demand - if (serverConfig.crawler.browserConnectOnDemand) { - await browser.close(); - } - } + return { + htmlContent, + statusCode: response?.status() ?? 0, + screenshot, + pdf, + url: page.url(), + }; + } finally { + await context.close(); + // Only close the browser if it was created on demand + if (serverConfig.crawler.browserConnectOnDemand) { + await browser.close(); + } + } + }, + ); } async function extractMetadata( @@ -631,54 +745,82 @@ async function extractMetadata( url: string, jobId: string, ) { - logger.info( - `[Crawler][${jobId}] Will attempt to extract metadata from page ...`, + return await withSpan( + tracer, + "crawlerWorker.extractMetadata", + { + attributes: { + "bookmark.url": url, + "bookmark.domain": getBookmarkDomain(url), + "job.id": jobId, + }, + }, + async () => { + logger.info( + `[Crawler][${jobId}] Will attempt to extract metadata from page ...`, + ); + const meta = await metascraperParser({ + url, + html: htmlContent, + // We don't want to validate the URL again as we've already done it by visiting the page. + // This was added because URL validation fails if the URL ends with a question mark (e.g. empty query params). + validateUrl: false, + }); + logger.info( + `[Crawler][${jobId}] Done extracting metadata from the page.`, + ); + return meta; + }, ); - const meta = await metascraperParser({ - url, - html: htmlContent, - // We don't want to validate the URL again as we've already done it by visiting the page. - // This was added because URL validation fails if the URL ends with a question mark (e.g. empty query params). - validateUrl: false, - }); - logger.info(`[Crawler][${jobId}] Done extracting metadata from the page.`); - return meta; } -function extractReadableContent( +async function extractReadableContent( htmlContent: string, url: string, jobId: string, ) { - logger.info( - `[Crawler][${jobId}] Will attempt to extract readable content ...`, - ); - const virtualConsole = new VirtualConsole(); - const dom = new JSDOM(htmlContent, { url, virtualConsole }); - let result: { content: string } | null = null; - try { - const readableContent = new Readability(dom.window.document).parse(); - if (!readableContent || typeof readableContent.content !== "string") { - return null; - } + return await withSpan( + tracer, + "crawlerWorker.extractReadableContent", + { + attributes: { + "bookmark.url": url, + "bookmark.domain": getBookmarkDomain(url), + "job.id": jobId, + }, + }, + async () => { + logger.info( + `[Crawler][${jobId}] Will attempt to extract readable content ...`, + ); + const virtualConsole = new VirtualConsole(); + const dom = new JSDOM(htmlContent, { url, virtualConsole }); + let result: { content: string } | null = null; + try { + const readableContent = new Readability(dom.window.document).parse(); + if (!readableContent || typeof readableContent.content !== "string") { + return null; + } - const purifyWindow = new JSDOM("").window; - try { - const purify = DOMPurify(purifyWindow); - const purifiedHTML = purify.sanitize(readableContent.content); + const purifyWindow = new JSDOM("").window; + try { + const purify = DOMPurify(purifyWindow); + const purifiedHTML = purify.sanitize(readableContent.content); - logger.info(`[Crawler][${jobId}] Done extracting readable content.`); - result = { - content: purifiedHTML, - }; - } finally { - purifyWindow.close(); - } - } finally { - dom.window.close(); - } + logger.info(`[Crawler][${jobId}] Done extracting readable content.`); + result = { + content: purifiedHTML, + }; + } finally { + purifyWindow.close(); + } + } finally { + dom.window.close(); + } - return result; + return result; + }, + ); } async function storeScreenshot( @@ -686,45 +828,111 @@ async function storeScreenshot( userId: string, jobId: string, ) { - if (!serverConfig.crawler.storeScreenshot) { - logger.info( - `[Crawler][${jobId}] Skipping storing the screenshot as per the config.`, - ); - return null; - } - if (!screenshot) { - logger.info( - `[Crawler][${jobId}] Skipping storing the screenshot as it's empty.`, - ); - return null; - } - const assetId = newAssetId(); - const contentType = "image/jpeg"; - const fileName = "screenshot.jpeg"; + return await withSpan( + tracer, + "crawlerWorker.storeScreenshot", + { + attributes: { + "job.id": jobId, + "user.id": userId, + "asset.size": screenshot?.byteLength ?? 0, + }, + }, + async () => { + if (!serverConfig.crawler.storeScreenshot) { + logger.info( + `[Crawler][${jobId}] Skipping storing the screenshot as per the config.`, + ); + return null; + } + if (!screenshot) { + logger.info( + `[Crawler][${jobId}] Skipping storing the screenshot as it's empty.`, + ); + return null; + } + const assetId = newAssetId(); + const contentType = "image/jpeg"; + const fileName = "screenshot.jpeg"; - // Check storage quota before saving the screenshot - const { data: quotaApproved, error: quotaError } = await tryCatch( - QuotaService.checkStorageQuota(db, userId, screenshot.byteLength), + // Check storage quota before saving the screenshot + const { data: quotaApproved, error: quotaError } = await tryCatch( + QuotaService.checkStorageQuota(db, userId, screenshot.byteLength), + ); + + if (quotaError) { + logger.warn( + `[Crawler][${jobId}] Skipping screenshot storage due to quota exceeded: ${quotaError.message}`, + ); + return null; + } + + await saveAsset({ + userId, + assetId, + metadata: { contentType, fileName }, + asset: screenshot, + quotaApproved, + }); + logger.info( + `[Crawler][${jobId}] Stored the screenshot as assetId: ${assetId} (${screenshot.byteLength} bytes)`, + ); + return { assetId, contentType, fileName, size: screenshot.byteLength }; + }, ); +} - if (quotaError) { - logger.warn( - `[Crawler][${jobId}] Skipping screenshot storage due to quota exceeded: ${quotaError.message}`, - ); - return null; - } +async function storePdf( + pdf: Buffer | undefined, + userId: string, + jobId: string, +) { + return await withSpan( + tracer, + "crawlerWorker.storePdf", + { + attributes: { + "job.id": jobId, + "user.id": userId, + "asset.size": pdf?.byteLength ?? 0, + }, + }, + async () => { + if (!pdf) { + logger.info( + `[Crawler][${jobId}] Skipping storing the PDF as it's empty.`, + ); + return null; + } + const assetId = newAssetId(); + const contentType = "application/pdf"; + const fileName = "page.pdf"; - await saveAsset({ - userId, - assetId, - metadata: { contentType, fileName }, - asset: screenshot, - quotaApproved, - }); - logger.info( - `[Crawler][${jobId}] Stored the screenshot as assetId: ${assetId} (${screenshot.byteLength} bytes)`, + // Check storage quota before saving the PDF + const { data: quotaApproved, error: quotaError } = await tryCatch( + QuotaService.checkStorageQuota(db, userId, pdf.byteLength), + ); + + if (quotaError) { + logger.warn( + `[Crawler][${jobId}] Skipping PDF storage due to quota exceeded: ${quotaError.message}`, + ); + return null; + } + + await saveAsset({ + userId, + assetId, + metadata: { contentType, fileName }, + asset: pdf, + quotaApproved, + }); + logger.info( + `[Crawler][${jobId}] Stored the PDF as assetId: ${assetId} (${pdf.byteLength} bytes)`, + ); + return { assetId, contentType, fileName, size: pdf.byteLength }; + }, ); - return { assetId, contentType, fileName, size: screenshot.byteLength }; } async function downloadAndStoreFile( @@ -734,91 +942,106 @@ async function downloadAndStoreFile( fileType: string, abortSignal: AbortSignal, ) { - let assetPath: string | undefined; - try { - logger.info( - `[Crawler][${jobId}] Downloading ${fileType} from "${url.length > 100 ? url.slice(0, 100) + "..." : url}"`, - ); - const response = await fetchWithProxy(url, { - signal: abortSignal, - }); - if (!response.ok || response.body == null) { - throw new Error(`Failed to download ${fileType}: ${response.status}`); - } + return await withSpan( + tracer, + "crawlerWorker.downloadAndStoreFile", + { + attributes: { + "bookmark.url": url, + "bookmark.domain": getBookmarkDomain(url), + "job.id": jobId, + "user.id": userId, + "asset.type": fileType, + }, + }, + async () => { + let assetPath: string | undefined; + try { + logger.info( + `[Crawler][${jobId}] Downloading ${fileType} from "${url.length > 100 ? url.slice(0, 100) + "..." : url}"`, + ); + const response = await fetchWithProxy(url, { + signal: abortSignal, + }); + if (!response.ok || response.body == null) { + throw new Error(`Failed to download ${fileType}: ${response.status}`); + } - const contentType = normalizeContentType( - response.headers.get("content-type"), - ); - if (!contentType) { - throw new Error("No content type in the response"); - } + const contentType = normalizeContentType( + response.headers.get("content-type"), + ); + if (!contentType) { + throw new Error("No content type in the response"); + } - const assetId = newAssetId(); - assetPath = path.join(os.tmpdir(), assetId); + const assetId = newAssetId(); + assetPath = path.join(os.tmpdir(), assetId); - let bytesRead = 0; - const contentLengthEnforcer = new Transform({ - transform(chunk, _, callback) { - bytesRead += chunk.length; + let bytesRead = 0; + const contentLengthEnforcer = new Transform({ + transform(chunk, _, callback) { + bytesRead += chunk.length; - if (abortSignal.aborted) { - callback(new Error("AbortError")); - } else if (bytesRead > serverConfig.maxAssetSizeMb * 1024 * 1024) { - callback( - new Error( - `Content length exceeds maximum allowed size: ${serverConfig.maxAssetSizeMb}MB`, - ), - ); - } else { - callback(null, chunk); // pass data along unchanged - } - }, - flush(callback) { - callback(); - }, - }); + if (abortSignal.aborted) { + callback(new Error("AbortError")); + } else if (bytesRead > serverConfig.maxAssetSizeMb * 1024 * 1024) { + callback( + new Error( + `Content length exceeds maximum allowed size: ${serverConfig.maxAssetSizeMb}MB`, + ), + ); + } else { + callback(null, chunk); // pass data along unchanged + } + }, + flush(callback) { + callback(); + }, + }); - await pipeline( - response.body, - contentLengthEnforcer, - fsSync.createWriteStream(assetPath), - ); + await pipeline( + response.body, + contentLengthEnforcer, + fsSync.createWriteStream(assetPath), + ); - // Check storage quota before saving the asset - const { data: quotaApproved, error: quotaError } = await tryCatch( - QuotaService.checkStorageQuota(db, userId, bytesRead), - ); + // Check storage quota before saving the asset + const { data: quotaApproved, error: quotaError } = await tryCatch( + QuotaService.checkStorageQuota(db, userId, bytesRead), + ); - if (quotaError) { - logger.warn( - `[Crawler][${jobId}] Skipping ${fileType} storage due to quota exceeded: ${quotaError.message}`, - ); - return null; - } + if (quotaError) { + logger.warn( + `[Crawler][${jobId}] Skipping ${fileType} storage due to quota exceeded: ${quotaError.message}`, + ); + return null; + } - await saveAssetFromFile({ - userId, - assetId, - metadata: { contentType }, - assetPath, - quotaApproved, - }); + await saveAssetFromFile({ + userId, + assetId, + metadata: { contentType }, + assetPath, + quotaApproved, + }); - logger.info( - `[Crawler][${jobId}] Downloaded ${fileType} as assetId: ${assetId} (${bytesRead} bytes)`, - ); + logger.info( + `[Crawler][${jobId}] Downloaded ${fileType} as assetId: ${assetId} (${bytesRead} bytes)`, + ); - return { assetId, userId, contentType, size: bytesRead }; - } catch (e) { - logger.error( - `[Crawler][${jobId}] Failed to download and store ${fileType}: ${e}`, - ); - return null; - } finally { - if (assetPath) { - await tryCatch(fs.unlink(assetPath)); - } - } + return { assetId, userId, contentType, size: bytesRead }; + } catch (e) { + logger.error( + `[Crawler][${jobId}] Failed to download and store ${fileType}: ${e}`, + ); + return null; + } finally { + if (assetPath) { + await tryCatch(fs.unlink(assetPath)); + } + } + }, + ); } async function downloadAndStoreImage( @@ -843,77 +1066,91 @@ async function archiveWebpage( jobId: string, abortSignal: AbortSignal, ) { - logger.info(`[Crawler][${jobId}] Will attempt to archive page ...`); - const assetId = newAssetId(); - const assetPath = path.join(os.tmpdir(), assetId); - - let res = await execa({ - input: html, - cancelSignal: abortSignal, - env: { - https_proxy: serverConfig.proxy.httpsProxy - ? getRandomProxy(serverConfig.proxy.httpsProxy) - : undefined, - http_proxy: serverConfig.proxy.httpProxy - ? getRandomProxy(serverConfig.proxy.httpProxy) - : undefined, - no_proxy: serverConfig.proxy.noProxy?.join(","), + return await withSpan( + tracer, + "crawlerWorker.archiveWebpage", + { + attributes: { + "bookmark.url": url, + "bookmark.domain": getBookmarkDomain(url), + "job.id": jobId, + "user.id": userId, + }, }, - })("monolith", ["-", "-Ije", "-t", "5", "-b", url, "-o", assetPath]); + async () => { + logger.info(`[Crawler][${jobId}] Will attempt to archive page ...`); + const assetId = newAssetId(); + const assetPath = path.join(os.tmpdir(), assetId); - if (res.isCanceled) { - logger.error( - `[Crawler][${jobId}] Canceled archiving the page as we hit global timeout.`, - ); - await tryCatch(fs.unlink(assetPath)); - return null; - } + let res = await execa({ + input: html, + cancelSignal: abortSignal, + env: { + https_proxy: serverConfig.proxy.httpsProxy + ? getRandomProxy(serverConfig.proxy.httpsProxy) + : undefined, + http_proxy: serverConfig.proxy.httpProxy + ? getRandomProxy(serverConfig.proxy.httpProxy) + : undefined, + no_proxy: serverConfig.proxy.noProxy?.join(","), + }, + })("monolith", ["-", "-Ije", "-t", "5", "-b", url, "-o", assetPath]); - if (res.exitCode !== 0) { - logger.error( - `[Crawler][${jobId}] Failed to archive the page as the command exited with code ${res.exitCode}`, - ); - await tryCatch(fs.unlink(assetPath)); - return null; - } + if (res.isCanceled) { + logger.error( + `[Crawler][${jobId}] Canceled archiving the page as we hit global timeout.`, + ); + await tryCatch(fs.unlink(assetPath)); + return null; + } - const contentType = "text/html"; + if (res.exitCode !== 0) { + logger.error( + `[Crawler][${jobId}] Failed to archive the page as the command exited with code ${res.exitCode}`, + ); + await tryCatch(fs.unlink(assetPath)); + return null; + } - // Get file size and check quota before saving - const stats = await fs.stat(assetPath); - const fileSize = stats.size; + const contentType = "text/html"; - const { data: quotaApproved, error: quotaError } = await tryCatch( - QuotaService.checkStorageQuota(db, userId, fileSize), - ); + // Get file size and check quota before saving + const stats = await fs.stat(assetPath); + const fileSize = stats.size; - if (quotaError) { - logger.warn( - `[Crawler][${jobId}] Skipping page archive storage due to quota exceeded: ${quotaError.message}`, - ); - await tryCatch(fs.unlink(assetPath)); - return null; - } + const { data: quotaApproved, error: quotaError } = await tryCatch( + QuotaService.checkStorageQuota(db, userId, fileSize), + ); - await saveAssetFromFile({ - userId, - assetId, - assetPath, - metadata: { - contentType, - }, - quotaApproved, - }); + if (quotaError) { + logger.warn( + `[Crawler][${jobId}] Skipping page archive storage due to quota exceeded: ${quotaError.message}`, + ); + await tryCatch(fs.unlink(assetPath)); + return null; + } - logger.info( - `[Crawler][${jobId}] Done archiving the page as assetId: ${assetId}`, - ); + await saveAssetFromFile({ + userId, + assetId, + assetPath, + metadata: { + contentType, + }, + quotaApproved, + }); - return { - assetId, - contentType, - size: await getAssetSize({ userId, assetId }), - }; + logger.info( + `[Crawler][${jobId}] Done archiving the page as assetId: ${assetId}`, + ); + + return { + assetId, + contentType, + size: await getAssetSize({ userId, assetId }), + }; + }, + ); } async function getContentType( @@ -921,26 +1158,45 @@ async function getContentType( jobId: string, abortSignal: AbortSignal, ): Promise<string | null> { - try { - logger.info( - `[Crawler][${jobId}] Attempting to determine the content-type for the url ${url}`, - ); - const response = await fetchWithProxy(url, { - method: "HEAD", - signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]), - }); - const rawContentType = response.headers.get("content-type"); - const contentType = normalizeContentType(rawContentType); - logger.info( - `[Crawler][${jobId}] Content-type for the url ${url} is "${contentType}"`, - ); - return contentType; - } catch (e) { - logger.error( - `[Crawler][${jobId}] Failed to determine the content-type for the url ${url}: ${e}`, - ); - return null; - } + return await withSpan( + tracer, + "crawlerWorker.getContentType", + { + attributes: { + "bookmark.url": url, + "bookmark.domain": getBookmarkDomain(url), + "job.id": jobId, + }, + }, + async () => { + try { + logger.info( + `[Crawler][${jobId}] Attempting to determine the content-type for the url ${url}`, + ); + const response = await fetchWithProxy(url, { + method: "GET", + signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]), + }); + setSpanAttributes({ + "crawler.getContentType.statusCode": response.status, + }); + const rawContentType = response.headers.get("content-type"); + const contentType = normalizeContentType(rawContentType); + setSpanAttributes({ + "crawler.contentType": contentType ?? undefined, + }); + logger.info( + `[Crawler][${jobId}] Content-type for the url ${url} is "${contentType}"`, + ); + return contentType; + } catch (e) { + logger.error( + `[Crawler][${jobId}] Failed to determine the content-type for the url ${url}: ${e}`, + ); + return null; + } + }, + ); } /** @@ -959,53 +1215,69 @@ async function handleAsAssetBookmark( bookmarkId: string, abortSignal: AbortSignal, ) { - const downloaded = await downloadAndStoreFile( - url, - userId, - jobId, - assetType, - abortSignal, - ); - if (!downloaded) { - return; - } - const fileName = path.basename(new URL(url).pathname); - await db.transaction(async (trx) => { - await updateAsset( - undefined, - { - id: downloaded.assetId, - bookmarkId, - userId, - assetType: AssetTypes.BOOKMARK_ASSET, - contentType: downloaded.contentType, - size: downloaded.size, - fileName, - }, - trx, - ); - await trx.insert(bookmarkAssets).values({ - id: bookmarkId, - assetType, - assetId: downloaded.assetId, - content: null, - fileName, - sourceUrl: url, - }); - // Switch the type of the bookmark from LINK to ASSET - await trx - .update(bookmarks) - .set({ type: BookmarkTypes.ASSET }) - .where(eq(bookmarks.id, bookmarkId)); - await trx.delete(bookmarkLinks).where(eq(bookmarkLinks.id, bookmarkId)); - }); - await AssetPreprocessingQueue.enqueue( + return await withSpan( + tracer, + "crawlerWorker.handleAsAssetBookmark", { - bookmarkId, - fixMode: false, + attributes: { + "bookmark.url": url, + "bookmark.domain": getBookmarkDomain(url), + "job.id": jobId, + "user.id": userId, + "bookmark.id": bookmarkId, + "asset.type": assetType, + }, }, - { - groupId: userId, + async () => { + const downloaded = await downloadAndStoreFile( + url, + userId, + jobId, + assetType, + abortSignal, + ); + if (!downloaded) { + return; + } + const fileName = path.basename(new URL(url).pathname); + await db.transaction(async (trx) => { + await updateAsset( + undefined, + { + id: downloaded.assetId, + bookmarkId, + userId, + assetType: AssetTypes.BOOKMARK_ASSET, + contentType: downloaded.contentType, + size: downloaded.size, + fileName, + }, + trx, + ); + await trx.insert(bookmarkAssets).values({ + id: bookmarkId, + assetType, + assetId: downloaded.assetId, + content: null, + fileName, + sourceUrl: url, + }); + // Switch the type of the bookmark from LINK to ASSET + await trx + .update(bookmarks) + .set({ type: BookmarkTypes.ASSET }) + .where(eq(bookmarks.id, bookmarkId)); + await trx.delete(bookmarkLinks).where(eq(bookmarkLinks.id, bookmarkId)); + }); + await AssetPreprocessingQueue.enqueue( + { + bookmarkId, + fixMode: false, + }, + { + groupId: userId, + }, + ); }, ); } @@ -1020,60 +1292,75 @@ async function storeHtmlContent( userId: string, jobId: string, ): Promise<StoreHtmlResult> { - if (!htmlContent) { - return { result: "not_stored" }; - } + return await withSpan( + tracer, + "crawlerWorker.storeHtmlContent", + { + attributes: { + "job.id": jobId, + "user.id": userId, + "bookmark.content.size": htmlContent + ? Buffer.byteLength(htmlContent, "utf8") + : 0, + }, + }, + async () => { + if (!htmlContent) { + return { result: "not_stored" }; + } - const contentSize = Buffer.byteLength(htmlContent, "utf8"); + const contentSize = Buffer.byteLength(htmlContent, "utf8"); - // Only store in assets if content is >= 50KB - if (contentSize < serverConfig.crawler.htmlContentSizeThreshold) { - logger.info( - `[Crawler][${jobId}] HTML content size (${contentSize} bytes) is below threshold, storing inline`, - ); - return { result: "store_inline" }; - } + // Only store in assets if content is >= 50KB + if (contentSize < serverConfig.crawler.htmlContentSizeThreshold) { + logger.info( + `[Crawler][${jobId}] HTML content size (${contentSize} bytes) is below threshold, storing inline`, + ); + return { result: "store_inline" }; + } - const { data: quotaApproved, error: quotaError } = await tryCatch( - QuotaService.checkStorageQuota(db, userId, contentSize), - ); - if (quotaError) { - logger.warn( - `[Crawler][${jobId}] Skipping HTML content storage due to quota exceeded: ${quotaError.message}`, - ); - return { result: "not_stored" }; - } + const { data: quotaApproved, error: quotaError } = await tryCatch( + QuotaService.checkStorageQuota(db, userId, contentSize), + ); + if (quotaError) { + logger.warn( + `[Crawler][${jobId}] Skipping HTML content storage due to quota exceeded: ${quotaError.message}`, + ); + return { result: "not_stored" }; + } - const assetId = newAssetId(); + const assetId = newAssetId(); - const { error: saveError } = await tryCatch( - saveAsset({ - userId, - assetId, - asset: Buffer.from(htmlContent, "utf8"), - metadata: { - contentType: ASSET_TYPES.TEXT_HTML, - fileName: null, - }, - quotaApproved, - }), - ); - if (saveError) { - logger.error( - `[Crawler][${jobId}] Failed to store HTML content as asset: ${saveError}`, - ); - throw saveError; - } + const { error: saveError } = await tryCatch( + saveAsset({ + userId, + assetId, + asset: Buffer.from(htmlContent, "utf8"), + metadata: { + contentType: ASSET_TYPES.TEXT_HTML, + fileName: null, + }, + quotaApproved, + }), + ); + if (saveError) { + logger.error( + `[Crawler][${jobId}] Failed to store HTML content as asset: ${saveError}`, + ); + throw saveError; + } - logger.info( - `[Crawler][${jobId}] Stored large HTML content (${contentSize} bytes) as asset: ${assetId}`, - ); + logger.info( + `[Crawler][${jobId}] Stored large HTML content (${contentSize} bytes) as asset: ${assetId}`, + ); - return { - result: "stored", - assetId, - size: contentSize, - }; + return { + result: "stored", + assetId, + size: contentSize, + }; + }, + ); } async function crawlAndParseUrl( @@ -1082,268 +1369,352 @@ async function crawlAndParseUrl( jobId: string, bookmarkId: string, oldScreenshotAssetId: string | undefined, + oldPdfAssetId: string | undefined, oldImageAssetId: string | undefined, oldFullPageArchiveAssetId: string | undefined, oldContentAssetId: string | undefined, precrawledArchiveAssetId: string | undefined, archiveFullPage: boolean, + forceStorePdf: boolean, abortSignal: AbortSignal, ) { - let result: { - htmlContent: string; - screenshot: Buffer | undefined; - statusCode: number | null; - url: string; - }; - - if (precrawledArchiveAssetId) { - logger.info( - `[Crawler][${jobId}] The page has been precrawled. Will use the precrawled archive instead.`, - ); - const asset = await readAsset({ - userId, - assetId: precrawledArchiveAssetId, - }); - result = { - htmlContent: asset.asset.toString(), - screenshot: undefined, - statusCode: 200, - url, - }; - } else { - result = await crawlPage(jobId, url, userId, abortSignal); - } - abortSignal.throwIfAborted(); - - const { htmlContent, screenshot, statusCode, url: browserUrl } = result; + return await withSpan( + tracer, + "crawlerWorker.crawlAndParseUrl", + { + attributes: { + "bookmark.url": url, + "bookmark.domain": getBookmarkDomain(url), + "job.id": jobId, + "user.id": userId, + "bookmark.id": bookmarkId, + "crawler.archiveFullPage": archiveFullPage, + "crawler.forceStorePdf": forceStorePdf, + "crawler.hasPrecrawledArchive": !!precrawledArchiveAssetId, + }, + }, + async () => { + let result: { + htmlContent: string; + screenshot: Buffer | undefined; + pdf: Buffer | undefined; + statusCode: number | null; + url: string; + }; - // Track status code in Prometheus - if (statusCode !== null) { - crawlerStatusCodeCounter.labels(statusCode.toString()).inc(); - } + if (precrawledArchiveAssetId) { + logger.info( + `[Crawler][${jobId}] The page has been precrawled. Will use the precrawled archive instead.`, + ); + const asset = await readAsset({ + userId, + assetId: precrawledArchiveAssetId, + }); + result = { + htmlContent: asset.asset.toString(), + screenshot: undefined, + pdf: undefined, + statusCode: 200, + url, + }; + } else { + result = await crawlPage( + jobId, + url, + userId, + forceStorePdf, + abortSignal, + ); + } + abortSignal.throwIfAborted(); - const meta = await Promise.race([ - extractMetadata(htmlContent, browserUrl, jobId), - abortPromise(abortSignal), - ]); - abortSignal.throwIfAborted(); + const { + htmlContent, + screenshot, + pdf, + statusCode, + url: browserUrl, + } = result; - let readableContent = await Promise.race([ - extractReadableContent(htmlContent, browserUrl, jobId), - abortPromise(abortSignal), - ]); - abortSignal.throwIfAborted(); + // Track status code in Prometheus + if (statusCode !== null) { + crawlerStatusCodeCounter.labels(statusCode.toString()).inc(); + setSpanAttributes({ + "crawler.statusCode": statusCode, + }); + } - const screenshotAssetInfo = await Promise.race([ - storeScreenshot(screenshot, userId, jobId), - abortPromise(abortSignal), - ]); - abortSignal.throwIfAborted(); + const meta = await Promise.race([ + extractMetadata(htmlContent, browserUrl, jobId), + abortPromise(abortSignal), + ]); + abortSignal.throwIfAborted(); - const htmlContentAssetInfo = await storeHtmlContent( - readableContent?.content, - userId, - jobId, - ); - abortSignal.throwIfAborted(); - let imageAssetInfo: DBAssetType | null = null; - if (meta.image) { - const downloaded = await downloadAndStoreImage( - meta.image, - userId, - jobId, - abortSignal, - ); - if (downloaded) { - imageAssetInfo = { - id: downloaded.assetId, - bookmarkId, - userId, - assetType: AssetTypes.LINK_BANNER_IMAGE, - contentType: downloaded.contentType, - size: downloaded.size, + const parseDate = (date: string | undefined) => { + if (!date) { + return null; + } + try { + return new Date(date); + } catch { + return null; + } }; - } - } - abortSignal.throwIfAborted(); - const parseDate = (date: string | undefined) => { - if (!date) { - return null; - } - try { - return new Date(date); - } catch { - return null; - } - }; + // Phase 1: Write metadata immediately for fast user feedback. + // Content and asset storage happen later and can be slow (banner + // image download, screenshot/pdf upload, etc.). + await db + .update(bookmarkLinks) + .set({ + title: meta.title, + description: meta.description, + // Don't store data URIs as they're not valid URLs and are usually quite large + imageUrl: meta.image?.startsWith("data:") ? null : meta.image, + favicon: meta.logo, + crawlStatusCode: statusCode, + author: meta.author, + publisher: meta.publisher, + datePublished: parseDate(meta.datePublished), + dateModified: parseDate(meta.dateModified), + }) + .where(eq(bookmarkLinks.id, bookmarkId)); - // TODO(important): Restrict the size of content to store - const assetDeletionTasks: Promise<void>[] = []; - const inlineHtmlContent = - htmlContentAssetInfo.result === "store_inline" - ? (readableContent?.content ?? null) - : null; - readableContent = null; - await db.transaction(async (txn) => { - await txn - .update(bookmarkLinks) - .set({ - title: meta.title, - description: meta.description, - // Don't store data URIs as they're not valid URLs and are usually quite large - imageUrl: meta.image?.startsWith("data:") ? null : meta.image, - favicon: meta.logo, - htmlContent: inlineHtmlContent, - contentAssetId: - htmlContentAssetInfo.result === "stored" - ? htmlContentAssetInfo.assetId - : null, - crawledAt: new Date(), - crawlStatusCode: statusCode, - author: meta.author, - publisher: meta.publisher, - datePublished: parseDate(meta.datePublished), - dateModified: parseDate(meta.dateModified), - }) - .where(eq(bookmarkLinks.id, bookmarkId)); + let readableContent: { content: string } | null = meta.readableContentHtml + ? { content: meta.readableContentHtml } + : null; + if (!readableContent) { + readableContent = await Promise.race([ + extractReadableContent( + meta.contentHtml ?? htmlContent, + browserUrl, + jobId, + ), + abortPromise(abortSignal), + ]); + } + abortSignal.throwIfAborted(); - if (screenshotAssetInfo) { - await updateAsset( - oldScreenshotAssetId, - { - id: screenshotAssetInfo.assetId, - bookmarkId, - userId, - assetType: AssetTypes.LINK_SCREENSHOT, - contentType: screenshotAssetInfo.contentType, - size: screenshotAssetInfo.size, - fileName: screenshotAssetInfo.fileName, - }, - txn, - ); - assetDeletionTasks.push(silentDeleteAsset(userId, oldScreenshotAssetId)); - } - if (imageAssetInfo) { - await updateAsset(oldImageAssetId, imageAssetInfo, txn); - assetDeletionTasks.push(silentDeleteAsset(userId, oldImageAssetId)); - } - if (htmlContentAssetInfo.result === "stored") { - await updateAsset( - oldContentAssetId, - { - id: htmlContentAssetInfo.assetId, - bookmarkId, - userId, - assetType: AssetTypes.LINK_HTML_CONTENT, - contentType: ASSET_TYPES.TEXT_HTML, - size: htmlContentAssetInfo.size, - fileName: null, - }, - txn, - ); - assetDeletionTasks.push(silentDeleteAsset(userId, oldContentAssetId)); - } else if (oldContentAssetId) { - // Unlink the old content asset - await txn.delete(assets).where(eq(assets.id, oldContentAssetId)); - assetDeletionTasks.push(silentDeleteAsset(userId, oldContentAssetId)); - } - }); + const screenshotAssetInfo = await Promise.race([ + storeScreenshot(screenshot, userId, jobId), + abortPromise(abortSignal), + ]); + abortSignal.throwIfAborted(); - // Delete the old assets if any - await Promise.all(assetDeletionTasks); + const pdfAssetInfo = await Promise.race([ + storePdf(pdf, userId, jobId), + abortPromise(abortSignal), + ]); + abortSignal.throwIfAborted(); - return async () => { - if ( - !precrawledArchiveAssetId && - (serverConfig.crawler.fullPageArchive || archiveFullPage) - ) { - const archiveResult = await archiveWebpage( - htmlContent, - browserUrl, + const htmlContentAssetInfo = await storeHtmlContent( + readableContent?.content, userId, jobId, - abortSignal, ); + abortSignal.throwIfAborted(); + let imageAssetInfo: DBAssetType | null = null; + if (meta.image) { + const downloaded = await downloadAndStoreImage( + meta.image, + userId, + jobId, + abortSignal, + ); + if (downloaded) { + imageAssetInfo = { + id: downloaded.assetId, + bookmarkId, + userId, + assetType: AssetTypes.LINK_BANNER_IMAGE, + contentType: downloaded.contentType, + size: downloaded.size, + }; + } + } + abortSignal.throwIfAborted(); - if (archiveResult) { - const { - assetId: fullPageArchiveAssetId, - size, - contentType, - } = archiveResult; + // Phase 2: Write content and asset references. + // TODO(important): Restrict the size of content to store + const assetDeletionTasks: Promise<void>[] = []; + const inlineHtmlContent = + htmlContentAssetInfo.result === "store_inline" + ? (readableContent?.content ?? null) + : null; + readableContent = null; + await db.transaction(async (txn) => { + await txn + .update(bookmarkLinks) + .set({ + crawledAt: new Date(), + htmlContent: inlineHtmlContent, + contentAssetId: + htmlContentAssetInfo.result === "stored" + ? htmlContentAssetInfo.assetId + : null, + }) + .where(eq(bookmarkLinks.id, bookmarkId)); - await db.transaction(async (txn) => { + if (screenshotAssetInfo) { await updateAsset( - oldFullPageArchiveAssetId, + oldScreenshotAssetId, { - id: fullPageArchiveAssetId, + id: screenshotAssetInfo.assetId, bookmarkId, userId, - assetType: AssetTypes.LINK_FULL_PAGE_ARCHIVE, - contentType, - size, + assetType: AssetTypes.LINK_SCREENSHOT, + contentType: screenshotAssetInfo.contentType, + size: screenshotAssetInfo.size, + fileName: screenshotAssetInfo.fileName, + }, + txn, + ); + assetDeletionTasks.push( + silentDeleteAsset(userId, oldScreenshotAssetId), + ); + } + if (pdfAssetInfo) { + await updateAsset( + oldPdfAssetId, + { + id: pdfAssetInfo.assetId, + bookmarkId, + userId, + assetType: AssetTypes.LINK_PDF, + contentType: pdfAssetInfo.contentType, + size: pdfAssetInfo.size, + fileName: pdfAssetInfo.fileName, + }, + txn, + ); + assetDeletionTasks.push(silentDeleteAsset(userId, oldPdfAssetId)); + } + if (imageAssetInfo) { + await updateAsset(oldImageAssetId, imageAssetInfo, txn); + assetDeletionTasks.push(silentDeleteAsset(userId, oldImageAssetId)); + } + if (htmlContentAssetInfo.result === "stored") { + await updateAsset( + oldContentAssetId, + { + id: htmlContentAssetInfo.assetId, + bookmarkId, + userId, + assetType: AssetTypes.LINK_HTML_CONTENT, + contentType: ASSET_TYPES.TEXT_HTML, + size: htmlContentAssetInfo.size, fileName: null, }, txn, ); - }); - if (oldFullPageArchiveAssetId) { - await silentDeleteAsset(userId, oldFullPageArchiveAssetId); + assetDeletionTasks.push(silentDeleteAsset(userId, oldContentAssetId)); + } else if (oldContentAssetId) { + // Unlink the old content asset + await txn.delete(assets).where(eq(assets.id, oldContentAssetId)); + assetDeletionTasks.push(silentDeleteAsset(userId, oldContentAssetId)); } - } - } - }; + }); + + // Delete the old assets if any + await Promise.all(assetDeletionTasks); + + return async () => { + if ( + !precrawledArchiveAssetId && + (serverConfig.crawler.fullPageArchive || archiveFullPage) + ) { + const archiveResult = await archiveWebpage( + htmlContent, + browserUrl, + userId, + jobId, + abortSignal, + ); + + if (archiveResult) { + const { + assetId: fullPageArchiveAssetId, + size, + contentType, + } = archiveResult; + + await db.transaction(async (txn) => { + await updateAsset( + oldFullPageArchiveAssetId, + { + id: fullPageArchiveAssetId, + bookmarkId, + userId, + assetType: AssetTypes.LINK_FULL_PAGE_ARCHIVE, + contentType, + size, + fileName: null, + }, + txn, + ); + }); + if (oldFullPageArchiveAssetId) { + await silentDeleteAsset(userId, oldFullPageArchiveAssetId); + } + } + } + }; + }, + ); } /** - * Checks if the domain should be rate limited and reschedules the job if needed. - * @returns true if the job should continue, false if it was rescheduled + * Checks if the domain should be rate limited and throws QueueRetryAfterError if needed. + * @throws {QueueRetryAfterError} if the domain is rate limited */ -async function checkDomainRateLimit( - url: string, - jobId: string, - jobData: ZCrawlLinkRequest, - userId: string, - jobPriority?: number, -): Promise<boolean> { - const crawlerDomainRateLimitConfig = serverConfig.crawler.domainRatelimiting; - if (!crawlerDomainRateLimitConfig) { - return true; - } - - const rateLimitClient = await getRateLimitClient(); - if (!rateLimitClient) { - return true; - } - - const hostname = new URL(url).hostname; - const rateLimitResult = rateLimitClient.checkRateLimit( +async function checkDomainRateLimit(url: string, jobId: string): Promise<void> { + return await withSpan( + tracer, + "crawlerWorker.checkDomainRateLimit", { - name: "domain-ratelimit", - maxRequests: crawlerDomainRateLimitConfig.maxRequests, - windowMs: crawlerDomainRateLimitConfig.windowMs, + attributes: { + "bookmark.url": url, + "bookmark.domain": getBookmarkDomain(url), + "job.id": jobId, + }, }, - hostname, - ); + async () => { + const crawlerDomainRateLimitConfig = + serverConfig.crawler.domainRatelimiting; + if (!crawlerDomainRateLimitConfig) { + return; + } - if (!rateLimitResult.allowed) { - const resetInSeconds = rateLimitResult.resetInSeconds; - // Add jitter to prevent thundering herd: +40% random variation - const jitterFactor = 1.0 + Math.random() * 0.4; // Random value between 1.0 and 1.4 - const delayMs = Math.floor(resetInSeconds * 1000 * jitterFactor); - logger.info( - `[Crawler][${jobId}] Domain "${hostname}" is rate limited. Rescheduling in ${(delayMs / 1000).toFixed(2)} seconds (with jitter).`, - ); - await LinkCrawlerQueue.enqueue(jobData, { - priority: jobPriority, - delayMs, - groupId: userId, - }); - return false; - } + const rateLimitClient = await getRateLimitClient(); + if (!rateLimitClient) { + return; + } + + const hostname = new URL(url).hostname; + const rateLimitResult = rateLimitClient.checkRateLimit( + { + name: "domain-ratelimit", + maxRequests: crawlerDomainRateLimitConfig.maxRequests, + windowMs: crawlerDomainRateLimitConfig.windowMs, + }, + hostname, + ); - return true; + if (!rateLimitResult.allowed) { + const resetInSeconds = rateLimitResult.resetInSeconds; + // Add jitter to prevent thundering herd: +40% random variation + const jitterFactor = 1.0 + Math.random() * 0.4; // Random value between 1.0 and 1.4 + const delayMs = Math.floor(resetInSeconds * 1000 * jitterFactor); + logger.info( + `[Crawler][${jobId}] Domain "${hostname}" is rate limited. Will retry in ${(delayMs / 1000).toFixed(2)} seconds (with jitter).`, + ); + throw new QueueRetryAfterError( + `Domain "${hostname}" is rate limited`, + delayMs, + ); + } + }, + ); } async function runCrawler( @@ -1359,28 +1730,21 @@ async function runCrawler( return { status: "completed" }; } - const { bookmarkId, archiveFullPage } = request.data; + const { bookmarkId, archiveFullPage, storePdf } = request.data; const { url, userId, + createdAt, + crawledAt, screenshotAssetId: oldScreenshotAssetId, + pdfAssetId: oldPdfAssetId, imageAssetId: oldImageAssetId, fullPageArchiveAssetId: oldFullPageArchiveAssetId, contentAssetId: oldContentAssetId, precrawledArchiveAssetId, } = await getBookmarkDetails(bookmarkId); - const shouldContinue = await checkDomainRateLimit( - url, - jobId, - job.data, - userId, - job.priority, - ); - - if (!shouldContinue) { - return { status: "rescheduled" }; - } + await checkDomainRateLimit(url, jobId); logger.info( `[Crawler][${jobId}] Will crawl "${url}" for link with id "${bookmarkId}"`, @@ -1421,11 +1785,13 @@ async function runCrawler( jobId, bookmarkId, oldScreenshotAssetId, + oldPdfAssetId, oldImageAssetId, oldFullPageArchiveAssetId, oldContentAssetId, precrawledArchiveAssetId, archiveFullPage, + storePdf ?? false, job.abortSignal, ); @@ -1473,5 +1839,13 @@ async function runCrawler( // Do the archival as a separate last step as it has the potential for failure await archivalLogic(); } + + // Record the latency from bookmark creation to crawl completion. + // Only for first-time, high-priority crawls (excludes recrawls and imports). + if (crawledAt === null && job.priority === 0) { + const latencySeconds = (Date.now() - createdAt.getTime()) / 1000; + bookmarkCrawlLatencyHistogram.observe(latencySeconds); + } + return { status: "completed" }; } diff --git a/apps/workers/workers/feedWorker.ts b/apps/workers/workers/feedWorker.ts index 2a1334a9..eed7ccb1 100644 --- a/apps/workers/workers/feedWorker.ts +++ b/apps/workers/workers/feedWorker.ts @@ -4,6 +4,7 @@ import { fetchWithProxy } from "network"; import cron from "node-cron"; import Parser from "rss-parser"; import { buildImpersonatingTRPCClient } from "trpc"; +import { withWorkerTracing } from "workerTracing"; import { z } from "zod"; import type { ZFeedRequestSchema } from "@karakeep/shared-server"; @@ -88,7 +89,7 @@ export class FeedWorker { const worker = (await getQueueClient())!.createRunner<ZFeedRequestSchema>( FeedQueue, { - run: run, + run: withWorkerTracing("feedWorker.run", run), onComplete: async (job) => { workerStatsCounter.labels("feed", "completed").inc(); const jobId = job.id; @@ -155,9 +156,9 @@ async function run(req: DequeuedJob<ZFeedRequestSchema>) { const response = await fetchWithProxy(feed.url, { signal: AbortSignal.timeout(5000), headers: { - UserAgent: + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", - Accept: "application/rss+xml", + Accept: "application/rss+xml, application/xml;q=0.9, text/xml;q=0.8", }, }); if (response.status !== 200) { diff --git a/apps/workers/workers/importWorker.ts b/apps/workers/workers/importWorker.ts new file mode 100644 index 00000000..e5b5c27e --- /dev/null +++ b/apps/workers/workers/importWorker.ts @@ -0,0 +1,698 @@ +import { TRPCError } from "@trpc/server"; +import { + and, + count, + eq, + gt, + inArray, + isNotNull, + isNull, + lt, + or, +} from "drizzle-orm"; +import { Counter, Gauge, Histogram } from "prom-client"; +import { buildImpersonatingTRPCClient } from "trpc"; + +import { db } from "@karakeep/db"; +import { + bookmarkLinks, + bookmarks, + importSessions, + importStagingBookmarks, +} from "@karakeep/db/schema"; +import { LowPriorityCrawlerQueue, OpenAIQueue } from "@karakeep/shared-server"; +import logger, { throttledLogger } from "@karakeep/shared/logger"; +import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; + +import { registry } from "../metrics"; + +// Prometheus metrics +const importStagingProcessedCounter = new Counter({ + name: "karakeep_import_staging_processed_total", + help: "Total number of staged items processed", + labelNames: ["result"], + registers: [registry], +}); + +const importStagingStaleResetCounter = new Counter({ + name: "karakeep_import_staging_stale_reset_total", + help: "Total number of stale processing items reset to pending", + registers: [registry], +}); + +const importStagingInFlightGauge = new Gauge({ + name: "karakeep_import_staging_in_flight", + help: "Current number of in-flight items (processing + recently completed)", + registers: [registry], +}); + +const importSessionsGauge = new Gauge({ + name: "karakeep_import_sessions_active", + help: "Number of active import sessions by status", + labelNames: ["status"], + registers: [registry], +}); + +const importStagingPendingGauge = new Gauge({ + name: "karakeep_import_staging_pending_total", + help: "Total number of pending items in staging table", + registers: [registry], +}); + +const importBatchDurationHistogram = new Histogram({ + name: "karakeep_import_batch_duration_seconds", + help: "Time taken to process a batch of staged items", + buckets: [0.1, 0.5, 1, 2, 5, 10, 30], + registers: [registry], +}); + +const backpressureLogger = throttledLogger(60_000); + +function sleep(ms: number): Promise<void> { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +/** + * Extract a safe, user-facing error message from an error. + * Avoids leaking internal details like database errors, stack traces, or file paths. + */ +function getSafeErrorMessage(error: unknown): string { + // TRPCError client errors are designed to be user-facing + if (error instanceof TRPCError && error.code !== "INTERNAL_SERVER_ERROR") { + return error.message; + } + + // Known safe validation errors thrown within the import worker + if (error instanceof Error) { + const safeMessages = [ + "URL is required for link bookmarks", + "Content is required for text bookmarks", + ]; + if (safeMessages.includes(error.message)) { + return error.message; + } + } + + return "An unexpected error occurred while processing the bookmark"; +} + +export class ImportWorker { + private running = false; + private pollIntervalMs = 5000; + + // Backpressure settings + private maxInFlight = 50; + private batchSize = 10; + private staleThresholdMs = 60 * 60 * 1000; // 1 hour + + async start() { + this.running = true; + let iterationCount = 0; + + logger.info("[import] Starting import polling worker"); + + while (this.running) { + try { + // Periodically reset stale processing items (every 60 iterations ~= 1 min) + if (iterationCount % 60 === 0) { + await this.resetStaleProcessingItems(); + } + iterationCount++; + + // Check if any processing items have completed downstream work + await this.checkAndCompleteProcessingItems(); + + const processed = await this.processBatch(); + if (processed === 0) { + await this.checkAndCompleteIdleSessions(); + await this.updateGauges(); + // Nothing to do, wait before polling again + await sleep(this.pollIntervalMs); + } else { + await this.updateGauges(); + } + } catch (error) { + logger.error(`[import] Error in polling loop: ${error}`); + await sleep(this.pollIntervalMs); + } + } + } + + stop() { + logger.info("[import] Stopping import polling worker"); + this.running = false; + } + + private async processBatch(): Promise<number> { + const countPendingItems = await this.countPendingItems(); + importStagingPendingGauge.set(countPendingItems); + if (countPendingItems === 0) { + // Nothing to do, wait before polling again + return 0; + } + + // 1. Check backpressure - inflight items + queue sizes + const availableCapacity = await this.getAvailableCapacity(); + + if (availableCapacity <= 0) { + // At capacity, wait before trying again + backpressureLogger( + "info", + `[import] Pending import items: ${countPendingItems}, but current capacity is ${availableCapacity}. Will wait until capacity is available.`, + ); + return 0; + } + + logger.debug( + `[import] ${countPendingItems} pending items, available capacity: ${availableCapacity}`, + ); + + // 2. Get candidate IDs with fair scheduling across users + const batchLimit = Math.min(this.batchSize, availableCapacity); + const candidateIds = await this.getNextBatchFairly(batchLimit); + + if (candidateIds.length === 0) return 0; + + // 3. Atomically claim rows - only rows still pending will be claimed + // This prevents race conditions where multiple workers select the same rows + const batch = await db + .update(importStagingBookmarks) + .set({ status: "processing", processingStartedAt: new Date() }) + .where( + and( + eq(importStagingBookmarks.status, "pending"), + inArray(importStagingBookmarks.id, candidateIds), + ), + ) + .returning(); + + // If no rows were claimed (another worker got them first), skip processing + if (batch.length === 0) return 0; + + const batchTimer = importBatchDurationHistogram.startTimer(); + + // 4. Mark session(s) as running (using claimed rows, not candidates) + const sessionIds = [...new Set(batch.map((b) => b.importSessionId))]; + logger.info( + `[import] Claimed batch of ${batch.length} items from ${sessionIds.length} session(s): [${sessionIds.join(", ")}]`, + ); + await db + .update(importSessions) + .set({ status: "running" }) + .where( + and( + inArray(importSessions.id, sessionIds), + eq(importSessions.status, "pending"), + ), + ); + + // 5. Process in parallel + const results = await Promise.allSettled( + batch.map((staged) => this.processOneBookmark(staged)), + ); + + const outcomes: Record<string, number> = {}; + for (const r of results) { + const key = r.status === "fulfilled" ? r.value : "error"; + outcomes[key] = (outcomes[key] ?? 0) + 1; + } + logger.debug( + `[import] Batch results: ${Object.entries(outcomes) + .map(([k, v]) => `${k}=${v}`) + .join(", ")}`, + ); + + // 6. Check if any sessions are now complete + await this.checkAndCompleteEmptySessions(sessionIds); + + batchTimer(); // Record batch duration + + return batch.length; + } + + private async updateGauges() { + // Update active sessions gauge by status + const sessions = await db + .select({ + status: importSessions.status, + count: count(), + }) + .from(importSessions) + .where( + inArray(importSessions.status, [ + "staging", + "pending", + "running", + "paused", + ]), + ) + .groupBy(importSessions.status); + + // Reset all status gauges to 0 first + for (const status of ["staging", "pending", "running", "paused"]) { + importSessionsGauge.set({ status }, 0); + } + + // Set actual values + for (const s of sessions) { + importSessionsGauge.set({ status: s.status }, s.count); + } + } + + private async checkAndCompleteIdleSessions() { + const sessions = await db + .select({ id: importSessions.id }) + .from(importSessions) + .where(inArray(importSessions.status, ["pending", "running"])); + + const sessionIds = sessions.map((session) => session.id); + if (sessionIds.length === 0) { + return; + } + + await this.checkAndCompleteEmptySessions(sessionIds); + } + + private async countPendingItems(): Promise<number> { + const res = await db + .select({ count: count() }) + .from(importStagingBookmarks) + .innerJoin( + importSessions, + eq(importStagingBookmarks.importSessionId, importSessions.id), + ) + .where( + and( + eq(importStagingBookmarks.status, "pending"), + inArray(importSessions.status, ["pending", "running"]), + ), + ); + return res[0]?.count ?? 0; + } + + private async getNextBatchFairly(limit: number): Promise<string[]> { + // Query pending item IDs from active sessions, ordered by: + // 1. User's last-served timestamp (fairness) + // 2. Staging item creation time (FIFO within user) + // Returns only IDs - actual rows will be fetched atomically during claim + const results = await db + .select({ + id: importStagingBookmarks.id, + }) + .from(importStagingBookmarks) + .innerJoin( + importSessions, + eq(importStagingBookmarks.importSessionId, importSessions.id), + ) + .where( + and( + eq(importStagingBookmarks.status, "pending"), + inArray(importSessions.status, ["pending", "running"]), + ), + ) + .orderBy(importSessions.lastProcessedAt, importStagingBookmarks.createdAt) + .limit(limit); + + return results.map((r) => r.id); + } + + private async attachBookmarkToLists( + caller: Awaited<ReturnType<typeof buildImpersonatingTRPCClient>>, + session: typeof importSessions.$inferSelect, + staged: typeof importStagingBookmarks.$inferSelect, + bookmarkId: string, + ): Promise<void> { + const listIds = new Set<string>(); + + if (session.rootListId) { + listIds.add(session.rootListId); + } + + if (staged.listIds && staged.listIds.length > 0) { + for (const listId of staged.listIds) { + listIds.add(listId); + } + } + + for (const listId of listIds) { + try { + await caller.lists.addToList({ listId, bookmarkId }); + } catch (error) { + logger.warn( + `[import] Failed to add bookmark ${bookmarkId} to list ${listId}: ${error}`, + ); + } + } + } + + private async processOneBookmark( + staged: typeof importStagingBookmarks.$inferSelect, + ): Promise<string> { + const session = await db.query.importSessions.findFirst({ + where: eq(importSessions.id, staged.importSessionId), + }); + + if (!session || session.status === "paused") { + // Session paused mid-batch, reset item to pending + await db + .update(importStagingBookmarks) + .set({ status: "pending" }) + .where(eq(importStagingBookmarks.id, staged.id)); + return "reset"; + } + + try { + // Use existing tRPC mutation via internal caller + // Note: Duplicate detection is handled by createBookmark itself + const caller = await buildImpersonatingTRPCClient(session.userId); + + // Build the request based on bookmark type + type CreateBookmarkInput = Parameters< + typeof caller.bookmarks.createBookmark + >[0]; + + const baseRequest = { + title: staged.title ?? undefined, + note: staged.note ?? undefined, + createdAt: staged.sourceAddedAt ?? undefined, + crawlPriority: "low" as const, + }; + + let bookmarkRequest: CreateBookmarkInput; + + if (staged.type === "link") { + if (!staged.url) { + throw new Error("URL is required for link bookmarks"); + } + bookmarkRequest = { + ...baseRequest, + type: BookmarkTypes.LINK, + url: staged.url, + }; + } else if (staged.type === "text") { + if (!staged.content) { + throw new Error("Content is required for text bookmarks"); + } + bookmarkRequest = { + ...baseRequest, + type: BookmarkTypes.TEXT, + text: staged.content, + }; + } else { + // asset type - skip for now as it needs special handling + await db + .update(importStagingBookmarks) + .set({ + status: "failed", + result: "rejected", + resultReason: "Asset bookmarks not yet supported", + completedAt: new Date(), + }) + .where(eq(importStagingBookmarks.id, staged.id)); + await this.updateSessionLastProcessedAt(staged.importSessionId); + return "unsupported"; + } + + const result = await caller.bookmarks.createBookmark(bookmarkRequest); + + // Apply tags via existing mutation (for both new and duplicate bookmarks) + if (staged.tags && staged.tags.length > 0) { + await caller.bookmarks.updateTags({ + bookmarkId: result.id, + attach: staged.tags.map((t) => ({ tagName: t })), + detach: [], + }); + } + + // Handle duplicate case (createBookmark returns alreadyExists: true) + if (result.alreadyExists) { + await db + .update(importStagingBookmarks) + .set({ + status: "completed", + result: "skipped_duplicate", + resultReason: "URL already exists", + resultBookmarkId: result.id, + completedAt: new Date(), + }) + .where(eq(importStagingBookmarks.id, staged.id)); + + importStagingProcessedCounter.inc({ result: "skipped_duplicate" }); + await this.attachBookmarkToLists(caller, session, staged, result.id); + await this.updateSessionLastProcessedAt(staged.importSessionId); + return "duplicate"; + } + + // Mark as accepted but keep in "processing" until crawl/tag is done + // The item will be moved to "completed" by checkAndCompleteProcessingItems() + await db + .update(importStagingBookmarks) + .set({ + result: "accepted", + resultBookmarkId: result.id, + }) + .where(eq(importStagingBookmarks.id, staged.id)); + + await this.attachBookmarkToLists(caller, session, staged, result.id); + + await this.updateSessionLastProcessedAt(staged.importSessionId); + return "accepted"; + } catch (error) { + logger.error( + `[import] Error processing staged item ${staged.id}: ${error}`, + ); + await db + .update(importStagingBookmarks) + .set({ + status: "failed", + result: "rejected", + resultReason: getSafeErrorMessage(error), + completedAt: new Date(), + }) + .where(eq(importStagingBookmarks.id, staged.id)); + + importStagingProcessedCounter.inc({ result: "rejected" }); + await this.updateSessionLastProcessedAt(staged.importSessionId); + return "failed"; + } + } + + private async updateSessionLastProcessedAt(sessionId: string) { + await db + .update(importSessions) + .set({ lastProcessedAt: new Date() }) + .where(eq(importSessions.id, sessionId)); + } + + private async checkAndCompleteEmptySessions(sessionIds: string[]) { + for (const sessionId of sessionIds) { + const remaining = await db + .select({ count: count() }) + .from(importStagingBookmarks) + .where( + and( + eq(importStagingBookmarks.importSessionId, sessionId), + inArray(importStagingBookmarks.status, ["pending", "processing"]), + ), + ); + + if (remaining[0]?.count === 0) { + logger.info( + `[import] Session ${sessionId} completed, all items processed`, + ); + await db + .update(importSessions) + .set({ status: "completed" }) + .where(eq(importSessions.id, sessionId)); + } + } + } + + /** + * Check processing items that have a bookmark created and mark them as completed + * once downstream processing (crawling/tagging) is done. + */ + private async checkAndCompleteProcessingItems(): Promise<number> { + // Find processing items where: + // - A bookmark was created (resultBookmarkId is set) + // - Downstream processing is complete (crawl/tag not pending) + const completedItems = await db + .select({ + id: importStagingBookmarks.id, + importSessionId: importStagingBookmarks.importSessionId, + crawlStatus: bookmarkLinks.crawlStatus, + taggingStatus: bookmarks.taggingStatus, + }) + .from(importStagingBookmarks) + .leftJoin( + bookmarks, + eq(bookmarks.id, importStagingBookmarks.resultBookmarkId), + ) + .leftJoin( + bookmarkLinks, + eq(bookmarkLinks.id, importStagingBookmarks.resultBookmarkId), + ) + .where( + and( + eq(importStagingBookmarks.status, "processing"), + isNotNull(importStagingBookmarks.resultBookmarkId), + // Crawl is done (not pending) - either success, failure, or null (not a link) + or( + isNull(bookmarkLinks.crawlStatus), + eq(bookmarkLinks.crawlStatus, "success"), + eq(bookmarkLinks.crawlStatus, "failure"), + ), + // Tagging is done (not pending) - either success, failure, or null + or( + isNull(bookmarks.taggingStatus), + eq(bookmarks.taggingStatus, "success"), + eq(bookmarks.taggingStatus, "failure"), + ), + ), + ); + + if (completedItems.length === 0) { + return 0; + } + + const succeededItems = completedItems.filter( + (i) => i.crawlStatus !== "failure" && i.taggingStatus !== "failure", + ); + const failedItems = completedItems.filter( + (i) => i.crawlStatus === "failure" || i.taggingStatus === "failure", + ); + + logger.debug( + `[import] ${completedItems.length} item(s) finished downstream processing (${succeededItems.length} succeeded, ${failedItems.length} failed)`, + ); + + // Mark succeeded items as completed + if (succeededItems.length > 0) { + await db + .update(importStagingBookmarks) + .set({ + status: "completed", + completedAt: new Date(), + }) + .where( + inArray( + importStagingBookmarks.id, + succeededItems.map((i) => i.id), + ), + ); + + importStagingProcessedCounter.inc( + { result: "accepted" }, + succeededItems.length, + ); + } + + // Mark failed items as failed + if (failedItems.length > 0) { + for (const item of failedItems) { + const reason = + item.crawlStatus === "failure" ? "Crawl failed" : "Tagging failed"; + await db + .update(importStagingBookmarks) + .set({ + status: "failed", + result: "rejected", + resultReason: reason, + completedAt: new Date(), + }) + .where(eq(importStagingBookmarks.id, item.id)); + } + + importStagingProcessedCounter.inc( + { result: "rejected" }, + failedItems.length, + ); + } + + // Check if any sessions are now complete + const sessionIds = [ + ...new Set(completedItems.map((i) => i.importSessionId)), + ]; + await this.checkAndCompleteEmptySessions(sessionIds); + + return completedItems.length; + } + + /** + * Backpressure: Calculate available capacity based on number of items in flight and the health of the import queues. + */ + private async getAvailableCapacity(): Promise<number> { + const [processingCount, crawlerQueue, openaiQueue] = await Promise.all([ + db + .select({ count: count() }) + .from(importStagingBookmarks) + .where( + and( + eq(importStagingBookmarks.status, "processing"), + gt( + importStagingBookmarks.processingStartedAt, + new Date(Date.now() - this.staleThresholdMs), + ), + ), + ), + LowPriorityCrawlerQueue.stats(), + OpenAIQueue.stats(), + ]); + + const crawlerTotal = + crawlerQueue.pending + crawlerQueue.running + crawlerQueue.pending_retry; + const openaiTotal = + openaiQueue.pending + openaiQueue.running + openaiQueue.pending_retry; + const processingTotal = processingCount[0]?.count ?? 0; + + const inFlight = Math.max(crawlerTotal, openaiTotal, processingTotal); + importStagingInFlightGauge.set(inFlight); + + return this.maxInFlight - inFlight; + } + + /** + * Reset stale "processing" items back to "pending" so they can be retried. + * Called periodically to handle crashed workers or stuck items. + * + * Only resets items that don't have a resultBookmarkId - those with a bookmark + * are waiting for downstream processing (crawl/tag), not stale. + */ + private async resetStaleProcessingItems(): Promise<number> { + const staleThreshold = new Date(Date.now() - this.staleThresholdMs); + + const staleItems = await db + .select({ id: importStagingBookmarks.id }) + .from(importStagingBookmarks) + .where( + and( + eq(importStagingBookmarks.status, "processing"), + lt(importStagingBookmarks.processingStartedAt, staleThreshold), + // Only reset items that haven't created a bookmark yet + // Items with a bookmark are waiting for downstream, not stale + isNull(importStagingBookmarks.resultBookmarkId), + ), + ); + + if (staleItems.length > 0) { + logger.warn( + `[import] Resetting ${staleItems.length} stale processing items`, + ); + + await db + .update(importStagingBookmarks) + .set({ status: "pending", processingStartedAt: null }) + .where( + inArray( + importStagingBookmarks.id, + staleItems.map((i) => i.id), + ), + ); + + importStagingStaleResetCounter.inc(staleItems.length); + return staleItems.length; + } + + return 0; + } +} diff --git a/apps/workers/workers/inference/inferenceWorker.ts b/apps/workers/workers/inference/inferenceWorker.ts index eefc1dd8..57ad1a22 100644 --- a/apps/workers/workers/inference/inferenceWorker.ts +++ b/apps/workers/workers/inference/inferenceWorker.ts @@ -1,5 +1,6 @@ import { eq } from "drizzle-orm"; import { workerStatsCounter } from "metrics"; +import { withWorkerTracing } from "workerTracing"; import type { ZOpenAIRequest } from "@karakeep/shared-server"; import { db } from "@karakeep/db"; @@ -42,7 +43,7 @@ export class OpenAiWorker { const worker = (await getQueueClient())!.createRunner<ZOpenAIRequest>( OpenAIQueue, { - run: runOpenAI, + run: withWorkerTracing("inferenceWorker.run", runOpenAI), onComplete: async (job) => { workerStatsCounter.labels("inference", "completed").inc(); const jobId = job.id; diff --git a/apps/workers/workers/inference/summarize.ts b/apps/workers/workers/inference/summarize.ts index 23636961..922eb5b7 100644 --- a/apps/workers/workers/inference/summarize.ts +++ b/apps/workers/workers/inference/summarize.ts @@ -1,12 +1,17 @@ import { and, eq } from "drizzle-orm"; +import { getBookmarkDomain } from "network"; import { db } from "@karakeep/db"; -import { bookmarks, customPrompts } from "@karakeep/db/schema"; -import { triggerSearchReindex, ZOpenAIRequest } from "@karakeep/shared-server"; +import { bookmarks, customPrompts, users } from "@karakeep/db/schema"; +import { + setSpanAttributes, + triggerSearchReindex, + ZOpenAIRequest, +} from "@karakeep/shared-server"; import serverConfig from "@karakeep/shared/config"; import { InferenceClient } from "@karakeep/shared/inference"; import logger from "@karakeep/shared/logger"; -import { buildSummaryPrompt } from "@karakeep/shared/prompts"; +import { buildSummaryPrompt } from "@karakeep/shared/prompts.server"; import { DequeuedJob } from "@karakeep/shared/queueing"; import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; import { Bookmark } from "@karakeep/trpc/models/bookmarks"; @@ -22,6 +27,7 @@ async function fetchBookmarkDetailsForSummary(bookmarkId: string) { description: true, htmlContent: true, contentAssetId: true, + crawlStatusCode: true, publisher: true, author: true, url: true, @@ -56,6 +62,33 @@ export async function runSummarization( const bookmarkData = await fetchBookmarkDetailsForSummary(bookmarkId); + // Check user-level preference + const userSettings = await db.query.users.findFirst({ + where: eq(users.id, bookmarkData.userId), + columns: { + autoSummarizationEnabled: true, + inferredTagLang: true, + }, + }); + + setSpanAttributes({ + "user.id": bookmarkData.userId, + "bookmark.id": bookmarkData.id, + "bookmark.url": bookmarkData.link?.url, + "bookmark.domain": getBookmarkDomain(bookmarkData.link?.url), + "bookmark.content.type": bookmarkData.type, + "crawler.statusCode": bookmarkData.link?.crawlStatusCode ?? undefined, + "inference.type": "summarization", + "inference.model": serverConfig.inference.textModel, + }); + + if (userSettings?.autoSummarizationEnabled === false) { + logger.debug( + `[inference][${jobId}] Skipping summarization job for bookmark with id "${bookmarkId}" because user has disabled auto-summarization.`, + ); + return; + } + let textToSummarize = ""; if (bookmarkData.type === BookmarkTypes.LINK && bookmarkData.link) { const link = bookmarkData.link; @@ -105,13 +138,21 @@ URL: ${link.url ?? ""} }, }); + setSpanAttributes({ + "inference.prompt.customCount": prompts.length, + }); + const summaryPrompt = await buildSummaryPrompt( - serverConfig.inference.inferredTagLang, + userSettings?.inferredTagLang ?? serverConfig.inference.inferredTagLang, prompts.map((p) => p.text), textToSummarize, serverConfig.inference.contextLength, ); + setSpanAttributes({ + "inference.prompt.size": Buffer.byteLength(summaryPrompt, "utf8"), + }); + const summaryResult = await inferenceClient.inferFromText(summaryPrompt, { schema: null, // Summaries are typically free-form text abortSignal: job.abortSignal, @@ -123,6 +164,11 @@ URL: ${link.url ?? ""} ); } + setSpanAttributes({ + "inference.summary.size": Buffer.byteLength(summaryResult.response, "utf8"), + "inference.totalTokens": summaryResult.totalTokens, + }); + logger.info( `[inference][${jobId}] Generated summary for bookmark "${bookmarkId}" using ${summaryResult.totalTokens} tokens.`, ); diff --git a/apps/workers/workers/inference/tagging.ts b/apps/workers/workers/inference/tagging.ts index 5a79fd22..668c1d5e 100644 --- a/apps/workers/workers/inference/tagging.ts +++ b/apps/workers/workers/inference/tagging.ts @@ -1,4 +1,5 @@ -import { and, Column, eq, inArray, sql } from "drizzle-orm"; +import { and, eq, inArray } from "drizzle-orm"; +import { getBookmarkDomain } from "network"; import { buildImpersonatingTRPCClient } from "trpc"; import { z } from "zod"; @@ -7,14 +8,17 @@ import type { InferenceClient, InferenceResponse, } from "@karakeep/shared/inference"; +import type { ZTagStyle } from "@karakeep/shared/types/users"; import { db } from "@karakeep/db"; import { bookmarks, bookmarkTags, customPrompts, tagsOnBookmarks, + users, } from "@karakeep/db/schema"; import { + setSpanAttributes, triggerRuleEngineOnEvent, triggerSearchReindex, triggerWebhook, @@ -22,7 +26,8 @@ import { import { ASSET_TYPES, readAsset } from "@karakeep/shared/assetdb"; import serverConfig from "@karakeep/shared/config"; import logger from "@karakeep/shared/logger"; -import { buildImagePrompt, buildTextPrompt } from "@karakeep/shared/prompts"; +import { buildImagePrompt } from "@karakeep/shared/prompts"; +import { buildTextPrompt } from "@karakeep/shared/prompts.server"; import { DequeuedJob, EnqueueOptions } from "@karakeep/shared/queueing"; import { Bookmark } from "@karakeep/trpc/models/bookmarks"; @@ -66,18 +71,21 @@ function parseJsonFromLLMResponse(response: string): unknown { } } -function tagNormalizer(col: Column) { +function tagNormalizer() { + // This function needs to be in sync with the generated normalizedName column in bookmarkTags function normalizeTag(tag: string) { return tag.toLowerCase().replace(/[ \-_]/g, ""); } return { normalizeTag, - sql: sql`lower(replace(replace(replace(${col}, ' ', ''), '-', ''), '_', ''))`, }; } async function buildPrompt( bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>, + tagStyle: ZTagStyle, + inferredTagLang: string, + curatedTags?: string[], ): Promise<string | null> { const prompts = await fetchCustomPrompts(bookmark.userId, "text"); if (bookmark.link) { @@ -95,22 +103,26 @@ async function buildPrompt( return null; } return await buildTextPrompt( - serverConfig.inference.inferredTagLang, + inferredTagLang, prompts, `URL: ${bookmark.link.url} Title: ${bookmark.link.title ?? ""} Description: ${bookmark.link.description ?? ""} Content: ${content ?? ""}`, serverConfig.inference.contextLength, + tagStyle, + curatedTags, ); } if (bookmark.text) { return await buildTextPrompt( - serverConfig.inference.inferredTagLang, + inferredTagLang, prompts, bookmark.text.text ?? "", serverConfig.inference.contextLength, + tagStyle, + curatedTags, ); } @@ -122,6 +134,9 @@ async function inferTagsFromImage( bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>, inferenceClient: InferenceClient, abortSignal: AbortSignal, + tagStyle: ZTagStyle, + inferredTagLang: string, + curatedTags?: string[], ): Promise<InferenceResponse | null> { const { asset, metadata } = await readAsset({ userId: bookmark.userId, @@ -141,10 +156,15 @@ async function inferTagsFromImage( } const base64 = asset.toString("base64"); + setSpanAttributes({ + "inference.model": serverConfig.inference.imageModel, + }); return inferenceClient.inferFromImage( buildImagePrompt( - serverConfig.inference.inferredTagLang, + inferredTagLang, await fetchCustomPrompts(bookmark.userId, "images"), + tagStyle, + curatedTags, ), metadata.contentType, base64, @@ -166,6 +186,10 @@ async function fetchCustomPrompts( }, }); + setSpanAttributes({ + "inference.prompt.customCount": prompts.length, + }); + let promptTexts = prompts.map((p) => p.text); if (containsTagsPlaceholder(prompts)) { promptTexts = await replaceTagsPlaceholders(promptTexts, userId); @@ -214,13 +238,24 @@ async function inferTagsFromPDF( bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>, inferenceClient: InferenceClient, abortSignal: AbortSignal, + tagStyle: ZTagStyle, + inferredTagLang: string, + curatedTags?: string[], ) { const prompt = await buildTextPrompt( - serverConfig.inference.inferredTagLang, + inferredTagLang, await fetchCustomPrompts(bookmark.userId, "text"), `Content: ${bookmark.asset.content}`, serverConfig.inference.contextLength, + tagStyle, + curatedTags, ); + setSpanAttributes({ + "inference.model": serverConfig.inference.textModel, + }); + setSpanAttributes({ + "inference.prompt.size": Buffer.byteLength(prompt, "utf8"), + }); return inferenceClient.inferFromText(prompt, { schema: openAIResponseSchema, abortSignal, @@ -231,11 +266,25 @@ async function inferTagsFromText( bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>, inferenceClient: InferenceClient, abortSignal: AbortSignal, + tagStyle: ZTagStyle, + inferredTagLang: string, + curatedTags?: string[], ) { - const prompt = await buildPrompt(bookmark); + const prompt = await buildPrompt( + bookmark, + tagStyle, + inferredTagLang, + curatedTags, + ); if (!prompt) { return null; } + setSpanAttributes({ + "inference.model": serverConfig.inference.textModel, + }); + setSpanAttributes({ + "inference.prompt.size": Buffer.byteLength(prompt, "utf8"), + }); return await inferenceClient.inferFromText(prompt, { schema: openAIResponseSchema, abortSignal, @@ -247,10 +296,32 @@ async function inferTags( bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>, inferenceClient: InferenceClient, abortSignal: AbortSignal, + tagStyle: ZTagStyle, + inferredTagLang: string, + curatedTags?: string[], ) { + setSpanAttributes({ + "user.id": bookmark.userId, + "bookmark.id": bookmark.id, + "bookmark.url": bookmark.link?.url, + "bookmark.domain": getBookmarkDomain(bookmark.link?.url), + "bookmark.content.type": bookmark.type, + "crawler.statusCode": bookmark.link?.crawlStatusCode ?? undefined, + "inference.tagging.style": tagStyle, + "inference.lang": inferredTagLang, + "inference.type": "tagging", + }); + let response: InferenceResponse | null; if (bookmark.link || bookmark.text) { - response = await inferTagsFromText(bookmark, inferenceClient, abortSignal); + response = await inferTagsFromText( + bookmark, + inferenceClient, + abortSignal, + tagStyle, + inferredTagLang, + curatedTags, + ); } else if (bookmark.asset) { switch (bookmark.asset.assetType) { case "image": @@ -259,6 +330,9 @@ async function inferTags( bookmark, inferenceClient, abortSignal, + tagStyle, + inferredTagLang, + curatedTags, ); break; case "pdf": @@ -267,6 +341,9 @@ async function inferTags( bookmark, inferenceClient, abortSignal, + tagStyle, + inferredTagLang, + curatedTags, ); break; default: @@ -298,6 +375,10 @@ async function inferTags( } return tag.trim(); }); + setSpanAttributes({ + "inference.tagging.numGeneratedTags": tags.length, + "inference.totalTokens": response.totalTokens, + }); return tags; } catch (e) { @@ -317,12 +398,10 @@ async function connectTags( return; } - await db.transaction(async (tx) => { + const res = await db.transaction(async (tx) => { // Attempt to match exiting tags with the new ones const { matchedTagIds, notFoundTagNames } = await (async () => { - const { normalizeTag, sql: normalizedTagSql } = tagNormalizer( - bookmarkTags.name, - ); + const { normalizeTag } = tagNormalizer(); const normalizedInferredTags = inferredTags.map((t) => ({ originalTag: t, normalizedTag: normalizeTag(t), @@ -332,7 +411,7 @@ async function connectTags( where: and( eq(bookmarkTags.userId, userId), inArray( - normalizedTagSql, + bookmarkTags.normalizedName, normalizedInferredTags.map((t) => t.normalizedTag), ), ), @@ -394,17 +473,19 @@ async function connectTags( .onConflictDoNothing() .returning(); - await triggerRuleEngineOnEvent(bookmarkId, [ - ...detachedTags.map((t) => ({ - type: "tagRemoved" as const, - tagId: t.tagId, - })), - ...attachedTags.map((t) => ({ - type: "tagAdded" as const, - tagId: t.tagId, - })), - ]); + return { detachedTags, attachedTags }; }); + + await triggerRuleEngineOnEvent(bookmarkId, [ + ...res.detachedTags.map((t) => ({ + type: "tagRemoved" as const, + tagId: t.tagId, + })), + ...res.attachedTags.map((t) => ({ + type: "tagAdded" as const, + tagId: t.tagId, + })), + ]); } async function fetchBookmark(linkId: string) { @@ -437,6 +518,37 @@ export async function runTagging( ); } + // Check user-level preference + const userSettings = await db.query.users.findFirst({ + where: eq(users.id, bookmark.userId), + columns: { + autoTaggingEnabled: true, + tagStyle: true, + curatedTagIds: true, + inferredTagLang: true, + }, + }); + + if (userSettings?.autoTaggingEnabled === false) { + logger.debug( + `[inference][${jobId}] Skipping tagging job for bookmark with id "${bookmarkId}" because user has disabled auto-tagging.`, + ); + return; + } + + // Resolve curated tag names if configured + let curatedTagNames: string[] | undefined; + if (userSettings?.curatedTagIds && userSettings.curatedTagIds.length > 0) { + const tags = await db.query.bookmarkTags.findMany({ + where: and( + eq(bookmarkTags.userId, bookmark.userId), + inArray(bookmarkTags.id, userSettings.curatedTagIds), + ), + columns: { name: true }, + }); + curatedTagNames = tags.map((t) => t.name); + } + logger.info( `[inference][${jobId}] Starting an inference job for bookmark with id "${bookmark.id}"`, ); @@ -446,6 +558,9 @@ export async function runTagging( bookmark, inferenceClient, job.abortSignal, + userSettings?.tagStyle ?? "as-generated", + userSettings?.inferredTagLang ?? serverConfig.inference.inferredTagLang, + curatedTagNames, ); if (tags === null) { diff --git a/apps/workers/workers/ruleEngineWorker.ts b/apps/workers/workers/ruleEngineWorker.ts index 98a9de74..ecf733cd 100644 --- a/apps/workers/workers/ruleEngineWorker.ts +++ b/apps/workers/workers/ruleEngineWorker.ts @@ -1,6 +1,7 @@ import { eq } from "drizzle-orm"; import { workerStatsCounter } from "metrics"; import { buildImpersonatingAuthedContext } from "trpc"; +import { withWorkerTracing } from "workerTracing"; import type { ZRuleEngineRequest } from "@karakeep/shared-server"; import { db } from "@karakeep/db"; @@ -20,7 +21,7 @@ export class RuleEngineWorker { const worker = (await getQueueClient())!.createRunner<ZRuleEngineRequest>( RuleEngineQueue, { - run: runRuleEngine, + run: withWorkerTracing("ruleEngineWorker.run", runRuleEngine), onComplete: (job) => { workerStatsCounter.labels("ruleEngine", "completed").inc(); const jobId = job.id; @@ -66,14 +67,21 @@ async function runRuleEngine(job: DequeuedJob<ZRuleEngineRequest>) { const bookmark = await getBookmarkUserId(bookmarkId); if (!bookmark) { - throw new Error( - `[ruleEngine][${jobId}] bookmark with id ${bookmarkId} was not found`, + logger.info( + `[ruleEngine][${jobId}] bookmark with id ${bookmarkId} was not found, skipping`, ); + return; } const userId = bookmark.userId; const authedCtx = await buildImpersonatingAuthedContext(userId); const ruleEngine = await RuleEngine.forBookmark(authedCtx, bookmarkId); + if (!ruleEngine) { + logger.info( + `[ruleEngine][${jobId}] bookmark with id ${bookmarkId} was not found during rule evaluation, skipping`, + ); + return; + } const results = ( await Promise.all(events.map((event) => ruleEngine.onEvent(event))) diff --git a/apps/workers/workers/searchWorker.ts b/apps/workers/workers/searchWorker.ts index fed30c9b..b0608dce 100644 --- a/apps/workers/workers/searchWorker.ts +++ b/apps/workers/workers/searchWorker.ts @@ -1,5 +1,6 @@ import { eq } from "drizzle-orm"; import { workerStatsCounter } from "metrics"; +import { withWorkerTracing } from "workerTracing"; import type { ZSearchIndexingRequest } from "@karakeep/shared-server"; import { db } from "@karakeep/db"; @@ -25,7 +26,7 @@ export class SearchIndexingWorker { (await getQueueClient())!.createRunner<ZSearchIndexingRequest>( SearchIndexingQueue, { - run: runSearchIndexing, + run: withWorkerTracing("searchWorker.run", runSearchIndexing), onComplete: (job) => { workerStatsCounter.labels("search", "completed").inc(); const jobId = job.id; @@ -55,7 +56,11 @@ export class SearchIndexingWorker { } } -async function runIndex(searchClient: SearchIndexClient, bookmarkId: string) { +async function runIndex( + searchClient: SearchIndexClient, + bookmarkId: string, + batch: boolean, +) { const bookmark = await db.query.bookmarks.findFirst({ where: eq(bookmarks.id, bookmarkId), with: { @@ -106,11 +111,15 @@ async function runIndex(searchClient: SearchIndexClient, bookmarkId: string) { tags: bookmark.tagsOnBookmarks.map((t) => t.tag.name), }; - await searchClient.addDocuments([document]); + await searchClient.addDocuments([document], { batch }); } -async function runDelete(searchClient: SearchIndexClient, bookmarkId: string) { - await searchClient.deleteDocuments([bookmarkId]); +async function runDelete( + searchClient: SearchIndexClient, + bookmarkId: string, + batch: boolean, +) { + await searchClient.deleteDocuments([bookmarkId], { batch }); } async function runSearchIndexing(job: DequeuedJob<ZSearchIndexingRequest>) { @@ -132,17 +141,20 @@ async function runSearchIndexing(job: DequeuedJob<ZSearchIndexingRequest>) { } const bookmarkId = request.data.bookmarkId; + // Disable batching on retries (runNumber > 0) for improved reliability + const batch = job.runNumber === 0; + logger.info( - `[search][${jobId}] Attempting to index bookmark with id ${bookmarkId} ...`, + `[search][${jobId}] Attempting to index bookmark with id ${bookmarkId} (run ${job.runNumber}, batch=${batch}) ...`, ); switch (request.data.type) { case "index": { - await runIndex(searchClient, bookmarkId); + await runIndex(searchClient, bookmarkId, batch); break; } case "delete": { - await runDelete(searchClient, bookmarkId); + await runDelete(searchClient, bookmarkId, batch); break; } } diff --git a/apps/workers/workers/videoWorker.ts b/apps/workers/workers/videoWorker.ts index 03525fdf..1ffbf674 100644 --- a/apps/workers/workers/videoWorker.ts +++ b/apps/workers/workers/videoWorker.ts @@ -4,6 +4,7 @@ import path from "path"; import { execa } from "execa";
import { workerStatsCounter } from "metrics";
import { getProxyAgent, validateUrl } from "network";
+import { withWorkerTracing } from "workerTracing";
import { db } from "@karakeep/db";
import { AssetTypes } from "@karakeep/db/schema";
@@ -35,7 +36,7 @@ export class VideoWorker { return (await getQueueClient())!.createRunner<ZVideoRequest>(
VideoWorkerQueue,
{
- run: runWorker,
+ run: withWorkerTracing("videoWorker.run", runWorker),
onComplete: async (job) => {
workerStatsCounter.labels("video", "completed").inc();
const jobId = job.id;
diff --git a/apps/workers/workers/webhookWorker.ts b/apps/workers/workers/webhookWorker.ts index 0d661372..875a0ac6 100644 --- a/apps/workers/workers/webhookWorker.ts +++ b/apps/workers/workers/webhookWorker.ts @@ -1,6 +1,7 @@ import { eq } from "drizzle-orm"; import { workerStatsCounter } from "metrics"; import { fetchWithProxy } from "network"; +import { withWorkerTracing } from "workerTracing"; import { db } from "@karakeep/db"; import { bookmarks, webhooksTable } from "@karakeep/db/schema"; @@ -19,7 +20,7 @@ export class WebhookWorker { const worker = (await getQueueClient())!.createRunner<ZWebhookRequest>( WebhookQueue, { - run: runWebhook, + run: withWorkerTracing("webhookWorker.run", runWebhook), onComplete: async (job) => { workerStatsCounter.labels("webhook", "completed").inc(); const jobId = job.id; |
