aboutsummaryrefslogtreecommitdiffstats
path: root/apps/workers
diff options
context:
space:
mode:
Diffstat (limited to 'apps/workers')
-rw-r--r--apps/workers/index.ts84
-rw-r--r--apps/workers/metascraper-plugins/metascraper-amazon-improved.ts77
-rw-r--r--apps/workers/metascraper-plugins/metascraper-reddit.ts358
-rw-r--r--apps/workers/metrics.ts14
-rw-r--r--apps/workers/network.ts11
-rw-r--r--apps/workers/package.json3
-rw-r--r--apps/workers/workerTracing.ts43
-rw-r--r--apps/workers/workerUtils.ts4
-rw-r--r--apps/workers/workers/adminMaintenanceWorker.ts6
-rw-r--r--apps/workers/workers/assetPreprocessingWorker.ts72
-rw-r--r--apps/workers/workers/backupWorker.ts3
-rw-r--r--apps/workers/workers/crawlerWorker.ts1950
-rw-r--r--apps/workers/workers/feedWorker.ts7
-rw-r--r--apps/workers/workers/importWorker.ts698
-rw-r--r--apps/workers/workers/inference/inferenceWorker.ts3
-rw-r--r--apps/workers/workers/inference/summarize.ts54
-rw-r--r--apps/workers/workers/inference/tagging.ts165
-rw-r--r--apps/workers/workers/ruleEngineWorker.ts14
-rw-r--r--apps/workers/workers/searchWorker.ts28
-rw-r--r--apps/workers/workers/videoWorker.ts3
-rw-r--r--apps/workers/workers/webhookWorker.ts3
21 files changed, 2709 insertions, 891 deletions
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index b605b50f..c7b9533d 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -3,9 +3,22 @@ import "dotenv/config";
import { buildServer } from "server";
import {
+ AdminMaintenanceQueue,
+ AssetPreprocessingQueue,
+ BackupQueue,
+ FeedQueue,
+ initTracing,
+ LinkCrawlerQueue,
loadAllPlugins,
+ LowPriorityCrawlerQueue,
+ OpenAIQueue,
prepareQueue,
+ RuleEngineQueue,
+ SearchIndexingQueue,
+ shutdownTracing,
startQueue,
+ VideoWorkerQueue,
+ WebhookQueue,
} from "@karakeep/shared-server";
import serverConfig from "@karakeep/shared/config";
import logger from "@karakeep/shared/logger";
@@ -16,6 +29,7 @@ import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker";
import { BackupSchedulingWorker, BackupWorker } from "./workers/backupWorker";
import { CrawlerWorker } from "./workers/crawlerWorker";
import { FeedRefreshingWorker, FeedWorker } from "./workers/feedWorker";
+import { ImportWorker } from "./workers/importWorker";
import { OpenAiWorker } from "./workers/inference/inferenceWorker";
import { RuleEngineWorker } from "./workers/ruleEngineWorker";
import { SearchIndexingWorker } from "./workers/searchWorker";
@@ -23,19 +37,53 @@ import { VideoWorker } from "./workers/videoWorker";
import { WebhookWorker } from "./workers/webhookWorker";
const workerBuilders = {
- crawler: () => CrawlerWorker.build(),
- inference: () => OpenAiWorker.build(),
- search: () => SearchIndexingWorker.build(),
- adminMaintenance: () => AdminMaintenanceWorker.build(),
- video: () => VideoWorker.build(),
- feed: () => FeedWorker.build(),
- assetPreprocessing: () => AssetPreprocessingWorker.build(),
- webhook: () => WebhookWorker.build(),
- ruleEngine: () => RuleEngineWorker.build(),
- backup: () => BackupWorker.build(),
+ crawler: async () => {
+ await LinkCrawlerQueue.ensureInit();
+ return CrawlerWorker.build(LinkCrawlerQueue);
+ },
+ lowPriorityCrawler: async () => {
+ await LowPriorityCrawlerQueue.ensureInit();
+ return CrawlerWorker.build(LowPriorityCrawlerQueue);
+ },
+ inference: async () => {
+ await OpenAIQueue.ensureInit();
+ return OpenAiWorker.build();
+ },
+ search: async () => {
+ await SearchIndexingQueue.ensureInit();
+ return SearchIndexingWorker.build();
+ },
+ adminMaintenance: async () => {
+ await AdminMaintenanceQueue.ensureInit();
+ return AdminMaintenanceWorker.build();
+ },
+ video: async () => {
+ await VideoWorkerQueue.ensureInit();
+ return VideoWorker.build();
+ },
+ feed: async () => {
+ await FeedQueue.ensureInit();
+ return FeedWorker.build();
+ },
+ assetPreprocessing: async () => {
+ await AssetPreprocessingQueue.ensureInit();
+ return AssetPreprocessingWorker.build();
+ },
+ webhook: async () => {
+ await WebhookQueue.ensureInit();
+ return WebhookWorker.build();
+ },
+ ruleEngine: async () => {
+ await RuleEngineQueue.ensureInit();
+ return RuleEngineWorker.build();
+ },
+ backup: async () => {
+ await BackupQueue.ensureInit();
+ return BackupWorker.build();
+ },
} as const;
-type WorkerName = keyof typeof workerBuilders;
+type WorkerName = keyof typeof workerBuilders | "import";
const enabledWorkers = new Set(serverConfig.workers.enabledWorkers);
const disabledWorkers = new Set(serverConfig.workers.disabledWorkers);
@@ -51,6 +99,7 @@ function isWorkerEnabled(name: WorkerName) {
async function main() {
await loadAllPlugins();
+ initTracing("workers");
logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
await prepareQueue();
@@ -75,10 +124,19 @@ async function main() {
BackupSchedulingWorker.start();
}
+ // Start import polling worker
+ let importWorker: ImportWorker | null = null;
+ let importWorkerPromise: Promise<void> | null = null;
+ if (isWorkerEnabled("import")) {
+ importWorker = new ImportWorker();
+ importWorkerPromise = importWorker.start();
+ }
+
await Promise.any([
Promise.all([
...workers.map(({ worker }) => worker.run()),
httpServer.serve(),
+ ...(importWorkerPromise ? [importWorkerPromise] : []),
]),
shutdownPromise,
]);
@@ -93,10 +151,14 @@ async function main() {
if (workers.some((w) => w.name === "backup")) {
BackupSchedulingWorker.stop();
}
+ if (importWorker) {
+ importWorker.stop();
+ }
for (const { worker } of workers) {
worker.stop();
}
await httpServer.stop();
+ await shutdownTracing();
process.exit(0);
}
diff --git a/apps/workers/metascraper-plugins/metascraper-amazon-improved.ts b/apps/workers/metascraper-plugins/metascraper-amazon-improved.ts
new file mode 100644
index 00000000..ea9bf2e9
--- /dev/null
+++ b/apps/workers/metascraper-plugins/metascraper-amazon-improved.ts
@@ -0,0 +1,77 @@
+import type { Rules } from "metascraper";
+
+/**
+ * Improved Amazon metascraper plugin that fixes image extraction.
+ *
+ * The default metascraper-amazon package uses `.a-dynamic-image` selector
+ * which matches the FIRST element with that class. On amazon.com pages,
+ * this is often the Prime logo instead of the product image.
+ *
+ * This plugin uses more specific selectors to target the actual product
+ * image:
+ * - #landingImage: The main product image ID
+ * - #imgTagWrapperId img: Fallback container for product images
+ * - #imageBlock img: Additional fallback for newer Amazon layouts
+ *
+ * By placing this plugin BEFORE metascraperAmazon() in the plugin chain,
+ * we ensure the correct image is extracted while keeping all other Amazon
+ * metadata (title, brand, description) from the original plugin.
+ */
+
+const REGEX_AMAZON_URL =
+ /https?:\/\/(.*amazon\..*\/.*|.*amzn\..*\/.*|.*a\.co\/.*)/i;
+
+const test = ({ url }: { url: string }): boolean => REGEX_AMAZON_URL.test(url);
+
+const metascraperAmazonImproved = () => {
+ const rules: Rules = {
+ pkgName: "metascraper-amazon-improved",
+ test,
+ image: ({ htmlDom }) => {
+ // Try the main product image ID first (most reliable)
+ // Prefer data-old-hires attribute for high-resolution images
+ const landingImageHires = htmlDom("#landingImage").attr("data-old-hires");
+ if (landingImageHires) {
+ return landingImageHires;
+ }
+
+ const landingImageSrc = htmlDom("#landingImage").attr("src");
+ if (landingImageSrc) {
+ return landingImageSrc;
+ }
+
+ // Fallback to image block container
+ const imgTagHires = htmlDom("#imgTagWrapperId img").attr(
+ "data-old-hires",
+ );
+ if (imgTagHires) {
+ return imgTagHires;
+ }
+
+ const imgTagSrc = htmlDom("#imgTagWrapperId img").attr("src");
+ if (imgTagSrc) {
+ return imgTagSrc;
+ }
+
+ // Additional fallback for newer Amazon layouts
+ const imageBlockHires = htmlDom("#imageBlock img")
+ .first()
+ .attr("data-old-hires");
+ if (imageBlockHires) {
+ return imageBlockHires;
+ }
+
+ const imageBlockSrc = htmlDom("#imageBlock img").first().attr("src");
+ if (imageBlockSrc) {
+ return imageBlockSrc;
+ }
+
+ // Return undefined to allow next plugin to try
+ return undefined;
+ },
+ };
+
+ return rules;
+};
+
+export default metascraperAmazonImproved;
diff --git a/apps/workers/metascraper-plugins/metascraper-reddit.ts b/apps/workers/metascraper-plugins/metascraper-reddit.ts
index 1fbee3ea..a5de5fe3 100644
--- a/apps/workers/metascraper-plugins/metascraper-reddit.ts
+++ b/apps/workers/metascraper-plugins/metascraper-reddit.ts
@@ -1,4 +1,8 @@
-import type { Rules } from "metascraper";
+import type { CheerioAPI } from "cheerio";
+import type { Rules, RulesOptions } from "metascraper";
+import { decode as decodeHtmlEntities } from "html-entities";
+import { fetchWithProxy } from "network";
+import { z } from "zod";
import logger from "@karakeep/shared/logger";
@@ -28,15 +32,267 @@ import logger from "@karakeep/shared/logger";
* will return 'undefined' and the next plugin
* should continue to attempt to extract images.
*
- * Note: there is another way to accomplish this.
- * If '.json' is appended to a Reddit url, the
- * server will provide a JSON document summarizing
- * the post. If there are preview images, they are
- * included in a section of the JSON. To prevent
- * additional server requests, this method is not
- * currently being used.
+ * We also attempt to fetch the Reddit JSON response
+ * (by appending '.json' to the URL) to grab the
+ * title and preview images directly from the API.
**/
+const redditPreviewImageSchema = z.object({
+ source: z.object({ url: z.string().optional() }).optional(),
+ resolutions: z.array(z.object({ url: z.string().optional() })).optional(),
+});
+
+const redditMediaMetadataItemSchema = z.object({
+ s: z.object({ u: z.string().optional() }).optional(),
+ p: z.array(z.object({ u: z.string().optional() })).optional(),
+});
+
+const redditPostSchema = z.object({
+ title: z.string().optional(),
+ preview: z
+ .object({ images: z.array(redditPreviewImageSchema).optional() })
+ .optional(),
+ url_overridden_by_dest: z.string().optional(),
+ url: z.string().optional(),
+ thumbnail: z.string().optional(),
+ media_metadata: z.record(redditMediaMetadataItemSchema).optional(),
+ author: z.string().optional(),
+ created_utc: z.number().optional(),
+ selftext: z.string().nullish(),
+ selftext_html: z.string().nullish(),
+ subreddit_name_prefixed: z.string().optional(),
+});
+
+type RedditPostData = z.infer<typeof redditPostSchema>;
+
+const redditResponseSchema = z.array(
+ z.object({
+ data: z.object({
+ children: z.array(z.object({ data: redditPostSchema })).optional(),
+ }),
+ }),
+);
+
+interface RedditFetchResult {
+ fetched: boolean;
+ post?: RedditPostData;
+}
+
+const REDDIT_CACHE_TTL_MS = 60 * 1000; // 1 minute TTL to avoid stale data
+
+interface RedditCacheEntry {
+ expiresAt: number;
+ promise: Promise<RedditFetchResult>;
+}
+
+const redditJsonCache = new Map<string, RedditCacheEntry>();
+
+const purgeExpiredCacheEntries = (now: number) => {
+ for (const [key, entry] of redditJsonCache.entries()) {
+ if (entry.expiresAt <= now) {
+ redditJsonCache.delete(key);
+ }
+ }
+};
+
+const decodeRedditUrl = (url?: string): string | undefined => {
+ if (!url) {
+ return undefined;
+ }
+ const decoded = decodeHtmlEntities(url);
+ return decoded || undefined;
+};
+
+const buildJsonUrl = (url: string): string => {
+ const urlObj = new URL(url);
+
+ if (!urlObj.pathname.endsWith(".json")) {
+ urlObj.pathname = urlObj.pathname.replace(/\/?$/, ".json");
+ }
+
+ return urlObj.toString();
+};
+
+const extractImageFromMediaMetadata = (
+ media_metadata?: RedditPostData["media_metadata"],
+): string | undefined => {
+ if (!media_metadata) {
+ return undefined;
+ }
+ const firstItem = Object.values(media_metadata)[0];
+ if (!firstItem) {
+ return undefined;
+ }
+
+ return (
+ decodeRedditUrl(firstItem.s?.u) ??
+ decodeRedditUrl(firstItem.p?.[0]?.u) ??
+ undefined
+ );
+};
+
+const isRedditImageHost = (urlCandidate: string): boolean => {
+ try {
+ const hostname = new URL(urlCandidate).hostname;
+ return hostname.includes("redd.it");
+ } catch {
+ return false;
+ }
+};
+
+const extractImageFromPost = (post: RedditPostData): string | undefined => {
+ const previewImage = post.preview?.images?.[0];
+ const previewUrl =
+ decodeRedditUrl(previewImage?.source?.url) ??
+ decodeRedditUrl(previewImage?.resolutions?.[0]?.url);
+ if (previewUrl) {
+ return previewUrl;
+ }
+
+ const mediaUrl = extractImageFromMediaMetadata(post.media_metadata);
+ if (mediaUrl) {
+ return mediaUrl;
+ }
+
+ const directUrl =
+ decodeRedditUrl(post.url_overridden_by_dest) ??
+ decodeRedditUrl(post.url) ??
+ decodeRedditUrl(post.thumbnail);
+
+ if (directUrl && isRedditImageHost(directUrl)) {
+ return directUrl;
+ }
+
+ return undefined;
+};
+
+const extractTitleFromPost = (post: RedditPostData): string | undefined =>
+ post.title?.trim() || undefined;
+
+const extractAuthorFromPost = (post: RedditPostData): string | undefined =>
+ post.author?.trim() || undefined;
+
+const extractDateFromPost = (post: RedditPostData): string | undefined => {
+ if (!post.created_utc) {
+ return undefined;
+ }
+ const date = new Date(post.created_utc * 1000);
+ return Number.isNaN(date.getTime()) ? undefined : date.toISOString();
+};
+
+const extractPublisherFromPost = (post: RedditPostData): string | undefined =>
+ post.subreddit_name_prefixed?.trim() || "Reddit";
+
+const REDDIT_LOGO_URL =
+ "https://www.redditstatic.com/desktop2x/img/favicon/android-icon-192x192.png";
+
+const fallbackDomImage = ({ htmlDom }: { htmlDom: CheerioAPI }) => {
+ // 'preview' subdomain images are more likely to be what we're after
+ // but it could be in the 'i' subdomain.
+ // returns undefined if neither exists
+ const previewImages = htmlDom('img[src*="preview.redd.it"]')
+ .map((_, el) => htmlDom(el).attr("src"))
+ .get();
+ const iImages = htmlDom('img[src*="i.redd.it"]')
+ .map((_, el) => htmlDom(el).attr("src"))
+ .get();
+ return previewImages[0] || iImages[0];
+};
+
+const fallbackDomTitle = ({ htmlDom }: { htmlDom: CheerioAPI }) => {
+ const title: string | undefined = htmlDom("shreddit-title[title]")
+ .first()
+ .attr("title");
+ const postTitle: string | undefined =
+ title ?? htmlDom("shreddit-post[post-title]").first().attr("post-title");
+ return postTitle ? postTitle.trim() : undefined;
+};
+
+const fetchRedditPostData = async (url: string): Promise<RedditFetchResult> => {
+ const cached = redditJsonCache.get(url);
+ const now = Date.now();
+
+ purgeExpiredCacheEntries(now);
+
+ if (cached && cached.expiresAt > now) {
+ return cached.promise;
+ }
+
+ const promise = (async () => {
+ let jsonUrl: string;
+ try {
+ jsonUrl = buildJsonUrl(url);
+ } catch (error) {
+ logger.warn(
+ "[MetascraperReddit] Failed to construct Reddit JSON URL",
+ error,
+ );
+ return { fetched: false };
+ }
+
+ let response;
+ try {
+ response = await fetchWithProxy(jsonUrl, {
+ headers: { accept: "application/json" },
+ });
+ } catch (error) {
+ logger.warn(
+ `[MetascraperReddit] Failed to fetch Reddit JSON for ${jsonUrl}`,
+ error,
+ );
+ return { fetched: false };
+ }
+
+ if (response.status === 403) {
+ // API forbidden; fall back to DOM scraping.
+ return { fetched: false };
+ }
+
+ if (!response.ok) {
+ logger.warn(
+ `[MetascraperReddit] Reddit JSON request failed for ${jsonUrl} with status ${response.status}`,
+ );
+ return { fetched: false };
+ }
+
+ let payload: unknown;
+ try {
+ payload = await response.json();
+ } catch (error) {
+ logger.warn(
+ `[MetascraperReddit] Failed to parse Reddit JSON for ${jsonUrl}`,
+ error,
+ );
+ return { fetched: false };
+ }
+
+ const parsed = redditResponseSchema.safeParse(payload);
+ if (!parsed.success) {
+ logger.warn(
+ "[MetascraperReddit] Reddit JSON schema validation failed",
+ parsed.error,
+ );
+ return { fetched: false };
+ }
+
+ const firstListingWithChildren = parsed.data.find(
+ (listing) => (listing.data.children?.length ?? 0) > 0,
+ );
+
+ return {
+ fetched: true,
+ post: firstListingWithChildren?.data.children?.[0]?.data,
+ };
+ })();
+
+ redditJsonCache.set(url, {
+ promise,
+ expiresAt: now + REDDIT_CACHE_TTL_MS,
+ });
+
+ return promise;
+};
+
const domainFromUrl = (url: string): string => {
/**
* First-party metascraper plugins import metascraper-helpers,
@@ -71,27 +327,71 @@ const metascraperReddit = () => {
const rules: Rules = {
pkgName: "metascraper-reddit",
test,
- image: ({ htmlDom }) => {
- // 'preview' subdomain images are more likely to be what we're after
- // but it could be in the 'i' subdomain.
- // returns undefined if neither exists
- const previewImages = htmlDom('img[src*="preview.redd.it"]')
- .map((i, el) => htmlDom(el).attr("src"))
- .get();
- const iImages = htmlDom('img[src*="i.redd.it"]')
- .map((i, el) => htmlDom(el).attr("src"))
- .get();
- return previewImages[0] || iImages[0];
- },
- title: ({ htmlDom }) => {
- const title: string | undefined = htmlDom("shreddit-title[title]")
- .first()
- .attr("title");
- const postTitle: string | undefined =
- title ??
- htmlDom("shreddit-post[post-title]").first().attr("post-title");
- return postTitle ? postTitle.trim() : undefined;
- },
+ image: (async ({ url, htmlDom }: { url: string; htmlDom: CheerioAPI }) => {
+ const result = await fetchRedditPostData(url);
+ if (result.post) {
+ const redditImage = extractImageFromPost(result.post);
+ if (redditImage) {
+ return redditImage;
+ }
+ }
+
+ // If we successfully fetched JSON but found no Reddit image,
+ // avoid falling back to random DOM images.
+ if (result.fetched) {
+ return undefined;
+ }
+
+ return fallbackDomImage({ htmlDom });
+ }) as unknown as RulesOptions,
+ title: (async ({ url, htmlDom }: { url: string; htmlDom: CheerioAPI }) => {
+ const result = await fetchRedditPostData(url);
+ if (result.post) {
+ const redditTitle = extractTitleFromPost(result.post);
+ if (redditTitle) {
+ return redditTitle;
+ }
+ }
+
+ return fallbackDomTitle({ htmlDom });
+ }) as unknown as RulesOptions,
+ author: (async ({ url }: { url: string }) => {
+ const result = await fetchRedditPostData(url);
+ if (result.post) {
+ return extractAuthorFromPost(result.post);
+ }
+ return undefined;
+ }) as unknown as RulesOptions,
+ datePublished: (async ({ url }: { url: string }) => {
+ const result = await fetchRedditPostData(url);
+ if (result.post) {
+ return extractDateFromPost(result.post);
+ }
+ return undefined;
+ }) as unknown as RulesOptions,
+ publisher: (async ({ url }: { url: string }) => {
+ const result = await fetchRedditPostData(url);
+ if (result.post) {
+ return extractPublisherFromPost(result.post);
+ }
+ return undefined;
+ }) as unknown as RulesOptions,
+ logo: (async ({ url }: { url: string }) => {
+ const result = await fetchRedditPostData(url);
+ if (result.post) {
+ return REDDIT_LOGO_URL;
+ }
+ return undefined;
+ }) as unknown as RulesOptions,
+ readableContentHtml: (async ({ url }: { url: string }) => {
+ const result = await fetchRedditPostData(url);
+ if (result.post) {
+ const decoded = decodeHtmlEntities(result.post.selftext_html ?? "");
+      // If the post has no selftext content, fall back to the title
+ return (decoded || result.post.title) ?? null;
+ }
+ return undefined;
+ }) as unknown as RulesOptions,
};
return rules;
diff --git a/apps/workers/metrics.ts b/apps/workers/metrics.ts
index 3dc4d2c0..42b5aa46 100644
--- a/apps/workers/metrics.ts
+++ b/apps/workers/metrics.ts
@@ -1,7 +1,7 @@
import { prometheus } from "@hono/prometheus";
-import { Counter, Registry } from "prom-client";
+import { Counter, Histogram, Registry } from "prom-client";
-const registry = new Registry();
+export const registry = new Registry();
export const { printMetrics } = prometheus({
registry: registry,
@@ -21,5 +21,15 @@ export const crawlerStatusCodeCounter = new Counter({
labelNames: ["status_code"],
});
+export const bookmarkCrawlLatencyHistogram = new Histogram({
+ name: "karakeep_bookmark_crawl_latency_seconds",
+ help: "Latency from bookmark creation to crawl completion (excludes recrawls and imports)",
+ buckets: [
+ 0.1, 0.25, 0.5, 1, 2.5, 5, 7.5, 10, 15, 20, 30, 45, 60, 90, 120, 180, 300,
+ 600, 900, 1200,
+ ],
+});
+
registry.registerMetric(workerStatsCounter);
registry.registerMetric(crawlerStatusCodeCounter);
+registry.registerMetric(bookmarkCrawlLatencyHistogram);
diff --git a/apps/workers/network.ts b/apps/workers/network.ts
index 0dc46da4..2ef8483f 100644
--- a/apps/workers/network.ts
+++ b/apps/workers/network.ts
@@ -86,6 +86,15 @@ function isAddressForbidden(address: string): boolean {
return DISALLOWED_IP_RANGES.has(parsed.range());
}
+export function getBookmarkDomain(url?: string | null): string | undefined {
+ if (!url) return undefined;
+ try {
+ return new URL(url).hostname;
+ } catch {
+ return undefined;
+ }
+}
+
export type UrlValidationResult =
| { ok: true; url: URL }
| { ok: false; reason: string };
@@ -163,7 +172,7 @@ export async function validateUrl(
if (isAddressForbidden(hostname)) {
return {
ok: false,
- reason: `Refusing to access disallowed IP address ${hostname} (requested via ${parsedUrl.toString()})`,
+ reason: `Refusing to access disallowed IP address ${hostname} (requested via ${parsedUrl.toString()}). You can use CRAWLER_ALLOWED_INTERNAL_HOSTNAMES to allowlist specific hostnames for internal access.`,
} as const;
}
return { ok: true, url: parsedUrl } as const;
diff --git a/apps/workers/package.json b/apps/workers/package.json
index 7a5a1c81..fdec2ebf 100644
--- a/apps/workers/package.json
+++ b/apps/workers/package.json
@@ -22,6 +22,7 @@
"drizzle-orm": "^0.44.2",
"execa": "9.3.1",
"hono": "^4.10.6",
+ "html-entities": "^2.6.0",
"http-proxy-agent": "^7.0.2",
"https-proxy-agent": "^7.0.6",
"ipaddr.js": "^2.2.0",
@@ -52,7 +53,7 @@
"prom-client": "^15.1.3",
"puppeteer-extra-plugin-stealth": "^2.11.2",
"rss-parser": "^3.13.0",
- "tesseract.js": "^5.1.1",
+ "tesseract.js": "^7.0.0",
"tsx": "^4.8.1",
"typescript": "^5.9",
"zod": "^3.24.2"
diff --git a/apps/workers/workerTracing.ts b/apps/workers/workerTracing.ts
new file mode 100644
index 00000000..3ff16d1c
--- /dev/null
+++ b/apps/workers/workerTracing.ts
@@ -0,0 +1,43 @@
+import type { DequeuedJob } from "@karakeep/shared/queueing";
+import { getTracer, withSpan } from "@karakeep/shared-server";
+
+const tracer = getTracer("@karakeep/workers");
+
+type WorkerRunFn<TData, TResult = void> = (
+ job: DequeuedJob<TData>,
+) => Promise<TResult>;
+
+/**
+ * Wraps a worker run function with OpenTelemetry tracing.
+ * Creates a span for each job execution and automatically handles error recording.
+ *
+ * @param name - The name of the span (e.g., "feedWorker.run", "crawlerWorker.run")
+ * @param fn - The worker run function to wrap
+ * @returns A wrapped function that executes within a traced span
+ *
+ * @example
+ * ```ts
+ * const run = withWorkerTracing("feedWorker.run", async (job) => {
+ * // Your worker logic here
+ * });
+ * ```
+ */
+export function withWorkerTracing<TData, TResult = void>(
+ name: string,
+ fn: WorkerRunFn<TData, TResult>,
+): WorkerRunFn<TData, TResult> {
+ return async (job: DequeuedJob<TData>): Promise<TResult> => {
+ return await withSpan(
+ tracer,
+ name,
+ {
+ attributes: {
+ "job.id": job.id,
+ "job.priority": job.priority,
+ "job.runNumber": job.runNumber,
+ },
+ },
+ () => fn(job),
+ );
+ };
+}
diff --git a/apps/workers/workerUtils.ts b/apps/workers/workerUtils.ts
index 3eaf5b4b..48e3b277 100644
--- a/apps/workers/workerUtils.ts
+++ b/apps/workers/workerUtils.ts
@@ -31,9 +31,13 @@ export async function getBookmarkDetails(bookmarkId: string) {
return {
url: bookmark.link.url,
userId: bookmark.userId,
+ createdAt: bookmark.createdAt,
+ crawledAt: bookmark.link.crawledAt,
screenshotAssetId: bookmark.assets.find(
(a) => a.assetType == AssetTypes.LINK_SCREENSHOT,
)?.id,
+ pdfAssetId: bookmark.assets.find((a) => a.assetType == AssetTypes.LINK_PDF)
+ ?.id,
imageAssetId: bookmark.assets.find(
(a) => a.assetType == AssetTypes.LINK_BANNER_IMAGE,
)?.id,
diff --git a/apps/workers/workers/adminMaintenanceWorker.ts b/apps/workers/workers/adminMaintenanceWorker.ts
index e5312964..92d52a22 100644
--- a/apps/workers/workers/adminMaintenanceWorker.ts
+++ b/apps/workers/workers/adminMaintenanceWorker.ts
@@ -1,4 +1,5 @@
import { workerStatsCounter } from "metrics";
+import { withWorkerTracing } from "workerTracing";
import {
AdminMaintenanceQueue,
@@ -20,7 +21,10 @@ export class AdminMaintenanceWorker {
(await getQueueClient())!.createRunner<ZAdminMaintenanceTask>(
AdminMaintenanceQueue,
{
- run: runAdminMaintenance,
+ run: withWorkerTracing(
+ "adminMaintenanceWorker.run",
+ runAdminMaintenance,
+ ),
onComplete: (job) => {
workerStatsCounter
.labels(`adminMaintenance:${job.data.type}`, "completed")
diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts
index ff16906d..d12457d3 100644
--- a/apps/workers/workers/assetPreprocessingWorker.ts
+++ b/apps/workers/workers/assetPreprocessingWorker.ts
@@ -4,6 +4,7 @@ import { workerStatsCounter } from "metrics";
import PDFParser from "pdf2json";
import { fromBuffer } from "pdf2pic";
import { createWorker } from "tesseract.js";
+import { withWorkerTracing } from "workerTracing";
import type { AssetPreprocessingRequest } from "@karakeep/shared-server";
import { db } from "@karakeep/db";
@@ -22,7 +23,9 @@ import {
} from "@karakeep/shared-server";
import { newAssetId, readAsset, saveAsset } from "@karakeep/shared/assetdb";
import serverConfig from "@karakeep/shared/config";
+import { InferenceClientFactory } from "@karakeep/shared/inference";
import logger from "@karakeep/shared/logger";
+import { buildOCRPrompt } from "@karakeep/shared/prompts";
import {
DequeuedJob,
EnqueueOptions,
@@ -36,7 +39,7 @@ export class AssetPreprocessingWorker {
(await getQueueClient())!.createRunner<AssetPreprocessingRequest>(
AssetPreprocessingQueue,
{
- run: run,
+ run: withWorkerTracing("assetPreprocessingWorker.run", run),
onComplete: async (job) => {
workerStatsCounter.labels("assetPreprocessing", "completed").inc();
const jobId = job.id;
@@ -62,7 +65,7 @@ export class AssetPreprocessingWorker {
{
concurrency: serverConfig.assetPreprocessing.numWorkers,
pollIntervalMs: 1000,
- timeoutSecs: 30,
+ timeoutSecs: serverConfig.assetPreprocessing.jobTimeoutSec,
},
);
@@ -88,6 +91,36 @@ async function readImageText(buffer: Buffer) {
}
}
+async function readImageTextWithLLM(
+ buffer: Buffer,
+ contentType: string,
+): Promise<string | null> {
+ const inferenceClient = InferenceClientFactory.build();
+ if (!inferenceClient) {
+ logger.warn(
+ "[assetPreprocessing] LLM OCR is enabled but no inference client is configured. Falling back to Tesseract.",
+ );
+ return readImageText(buffer);
+ }
+
+ const base64 = buffer.toString("base64");
+ const prompt = buildOCRPrompt();
+
+ const response = await inferenceClient.inferFromImage(
+ prompt,
+ contentType,
+ base64,
+ { schema: null },
+ );
+
+ const extractedText = response.response.trim();
+ if (!extractedText) {
+ return null;
+ }
+
+ return extractedText;
+}
+
async function readPDFText(buffer: Buffer): Promise<{
text: string;
metadata: Record<string, object>;
@@ -199,6 +232,7 @@ export async function extractAndSavePDFScreenshot(
async function extractAndSaveImageText(
jobId: string,
asset: Buffer,
+ contentType: string,
bookmark: NonNullable<Awaited<ReturnType<typeof getBookmark>>>,
isFixMode: boolean,
): Promise<boolean> {
@@ -212,16 +246,31 @@ async function extractAndSaveImageText(
}
}
let imageText = null;
- logger.info(
- `[assetPreprocessing][${jobId}] Attempting to extract text from image.`,
- );
- try {
- imageText = await readImageText(asset);
- } catch (e) {
- logger.error(
- `[assetPreprocessing][${jobId}] Failed to read image text: ${e}`,
+
+ if (serverConfig.ocr.useLLM) {
+ logger.info(
+ `[assetPreprocessing][${jobId}] Attempting to extract text from image using LLM OCR.`,
);
+ try {
+ imageText = await readImageTextWithLLM(asset, contentType);
+ } catch (e) {
+ logger.error(
+ `[assetPreprocessing][${jobId}] Failed to read image text with LLM: ${e}`,
+ );
+ }
+ } else {
+ logger.info(
+ `[assetPreprocessing][${jobId}] Attempting to extract text from image using Tesseract.`,
+ );
+ try {
+ imageText = await readImageText(asset);
+ } catch (e) {
+ logger.error(
+ `[assetPreprocessing][${jobId}] Failed to read image text: ${e}`,
+ );
+ }
}
+
if (!imageText) {
return false;
}
@@ -313,7 +362,7 @@ async function run(req: DequeuedJob<AssetPreprocessingRequest>) {
);
}
- const { asset } = await readAsset({
+ const { asset, metadata } = await readAsset({
userId: bookmark.userId,
assetId: bookmark.asset.assetId,
});
@@ -330,6 +379,7 @@ async function run(req: DequeuedJob<AssetPreprocessingRequest>) {
const extractedText = await extractAndSaveImageText(
jobId,
asset,
+ metadata.contentType,
bookmark,
isFixMode,
);
diff --git a/apps/workers/workers/backupWorker.ts b/apps/workers/workers/backupWorker.ts
index c2d1ae5a..01f54b28 100644
--- a/apps/workers/workers/backupWorker.ts
+++ b/apps/workers/workers/backupWorker.ts
@@ -8,6 +8,7 @@ import archiver from "archiver";
import { eq } from "drizzle-orm";
import { workerStatsCounter } from "metrics";
import cron from "node-cron";
+import { withWorkerTracing } from "workerTracing";
import type { ZBackupRequest } from "@karakeep/shared-server";
import { db } from "@karakeep/db";
@@ -107,7 +108,7 @@ export class BackupWorker {
const worker = (await getQueueClient())!.createRunner<ZBackupRequest>(
BackupQueue,
{
- run: run,
+ run: withWorkerTracing("backupWorker.run", run),
onComplete: async (job) => {
workerStatsCounter.labels("backup", "completed").inc();
const jobId = job.id;
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index 740d5dac..9815571e 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -9,7 +9,7 @@ import { PlaywrightBlocker } from "@ghostery/adblocker-playwright";
import { Readability } from "@mozilla/readability";
import { Mutex } from "async-mutex";
import DOMPurify from "dompurify";
-import { eq } from "drizzle-orm";
+import { and, eq } from "drizzle-orm";
import { execa } from "execa";
import { exitAbortController } from "exit";
import { HttpProxyAgent } from "http-proxy-agent";
@@ -27,9 +27,14 @@ import metascraperTitle from "metascraper-title";
import metascraperUrl from "metascraper-url";
import metascraperX from "metascraper-x";
import metascraperYoutube from "metascraper-youtube";
-import { crawlerStatusCodeCounter, workerStatsCounter } from "metrics";
+import {
+ bookmarkCrawlLatencyHistogram,
+ crawlerStatusCodeCounter,
+ workerStatsCounter,
+} from "metrics";
import {
fetchWithProxy,
+ getBookmarkDomain,
getRandomProxy,
matchesNoProxy,
validateUrl,
@@ -37,6 +42,7 @@ import {
import { Browser, BrowserContextOptions } from "playwright";
import { chromium } from "playwright-extra";
import StealthPlugin from "puppeteer-extra-plugin-stealth";
+import { withWorkerTracing } from "workerTracing";
import { getBookmarkDetails, updateAsset } from "workerUtils";
import { z } from "zod";
@@ -52,12 +58,14 @@ import {
} from "@karakeep/db/schema";
import {
AssetPreprocessingQueue,
- LinkCrawlerQueue,
+ getTracer,
OpenAIQueue,
QuotaService,
+ setSpanAttributes,
triggerSearchReindex,
triggerWebhook,
VideoWorkerQueue,
+ withSpan,
zCrawlLinkRequestSchema,
} from "@karakeep/shared-server";
import {
@@ -75,15 +83,21 @@ import serverConfig from "@karakeep/shared/config";
import logger from "@karakeep/shared/logger";
import {
DequeuedJob,
+ DequeuedJobError,
EnqueueOptions,
getQueueClient,
+ Queue,
+ QueueRetryAfterError,
} from "@karakeep/shared/queueing";
import { getRateLimitClient } from "@karakeep/shared/ratelimiting";
import { tryCatch } from "@karakeep/shared/tryCatch";
import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
+import metascraperAmazonImproved from "../metascraper-plugins/metascraper-amazon-improved";
import metascraperReddit from "../metascraper-plugins/metascraper-reddit";
+const tracer = getTracer("@karakeep/workers");
+
function abortPromise(signal: AbortSignal): Promise<never> {
if (signal.aborted) {
const p = Promise.reject(signal.reason ?? new Error("AbortError"));
@@ -125,6 +139,7 @@ const metascraperParser = metascraper([
dateModified: true,
datePublished: true,
}),
+ metascraperAmazonImproved(), // Fix image extraction bug - must come before metascraperAmazon()
metascraperAmazon(),
metascraperYoutube({
gotOpts: {
@@ -185,7 +200,7 @@ const cookieSchema = z.object({
const cookiesSchema = z.array(cookieSchema);
interface CrawlerRunResult {
- status: "completed" | "rescheduled";
+ status: "completed";
}
function getPlaywrightProxyConfig(): BrowserContextOptions["proxy"] {
@@ -288,57 +303,68 @@ async function launchBrowser() {
}
export class CrawlerWorker {
- static async build() {
- chromium.use(StealthPlugin());
- if (serverConfig.crawler.enableAdblocker) {
- logger.info("[crawler] Loading adblocker ...");
- const globalBlockerResult = await tryCatch(
- PlaywrightBlocker.fromPrebuiltFull(fetchWithProxy, {
- path: path.join(os.tmpdir(), "karakeep_adblocker.bin"),
- read: fs.readFile,
- write: fs.writeFile,
- }),
- );
- if (globalBlockerResult.error) {
- logger.error(
- `[crawler] Failed to load adblocker. Will not be blocking ads: ${globalBlockerResult.error}`,
- );
- } else {
- globalBlocker = globalBlockerResult.data;
- }
- }
- if (!serverConfig.crawler.browserConnectOnDemand) {
- await launchBrowser();
- } else {
- logger.info(
- "[Crawler] Browser connect on demand is enabled, won't proactively start the browser instance",
- );
+ private static initPromise: Promise<void> | null = null;
+
+ private static ensureInitialized() {
+ if (!CrawlerWorker.initPromise) {
+ CrawlerWorker.initPromise = (async () => {
+ chromium.use(StealthPlugin());
+ if (serverConfig.crawler.enableAdblocker) {
+ logger.info("[crawler] Loading adblocker ...");
+ const globalBlockerResult = await tryCatch(
+ PlaywrightBlocker.fromPrebuiltFull(fetchWithProxy, {
+ path: path.join(os.tmpdir(), "karakeep_adblocker.bin"),
+ read: fs.readFile,
+ write: fs.writeFile,
+ }),
+ );
+ if (globalBlockerResult.error) {
+ logger.error(
+ `[crawler] Failed to load adblocker. Will not be blocking ads: ${globalBlockerResult.error}`,
+ );
+ } else {
+ globalBlocker = globalBlockerResult.data;
+ }
+ }
+ if (!serverConfig.crawler.browserConnectOnDemand) {
+ await launchBrowser();
+ } else {
+ logger.info(
+ "[Crawler] Browser connect on demand is enabled, won't proactively start the browser instance",
+ );
+ }
+ await loadCookiesFromFile();
+ })();
}
+ return CrawlerWorker.initPromise;
+ }
+
+ static async build(queue: Queue<ZCrawlLinkRequest>) {
+ await CrawlerWorker.ensureInitialized();
logger.info("Starting crawler worker ...");
- const worker = (await getQueueClient())!.createRunner<
+ const worker = (await getQueueClient()).createRunner<
ZCrawlLinkRequest,
CrawlerRunResult
>(
- LinkCrawlerQueue,
+ queue,
{
- run: runCrawler,
- onComplete: async (job, result) => {
- if (result.status === "rescheduled") {
- logger.info(
- `[Crawler][${job.id}] Rescheduled due to domain rate limiting`,
- );
- return;
- }
+ run: withWorkerTracing("crawlerWorker.run", runCrawler),
+ onComplete: async (job: DequeuedJob<ZCrawlLinkRequest>) => {
workerStatsCounter.labels("crawler", "completed").inc();
const jobId = job.id;
logger.info(`[Crawler][${jobId}] Completed successfully`);
const bookmarkId = job.data.bookmarkId;
if (bookmarkId) {
- await changeBookmarkStatus(bookmarkId, "success");
+ await db
+ .update(bookmarkLinks)
+ .set({
+ crawlStatus: "success",
+ })
+ .where(eq(bookmarkLinks.id, bookmarkId));
}
},
- onError: async (job) => {
+ onError: async (job: DequeuedJobError<ZCrawlLinkRequest>) => {
workerStatsCounter.labels("crawler", "failed").inc();
if (job.numRetriesLeft == 0) {
workerStatsCounter.labels("crawler", "failed_permanent").inc();
@@ -349,7 +375,36 @@ export class CrawlerWorker {
);
const bookmarkId = job.data?.bookmarkId;
if (bookmarkId && job.numRetriesLeft == 0) {
- await changeBookmarkStatus(bookmarkId, "failure");
+ await db.transaction(async (tx) => {
+ await tx
+ .update(bookmarkLinks)
+ .set({
+ crawlStatus: "failure",
+ })
+ .where(eq(bookmarkLinks.id, bookmarkId));
+ await tx
+ .update(bookmarks)
+ .set({
+ taggingStatus: null,
+ })
+ .where(
+ and(
+ eq(bookmarks.id, bookmarkId),
+ eq(bookmarks.taggingStatus, "pending"),
+ ),
+ );
+ await tx
+ .update(bookmarks)
+ .set({
+ summarizationStatus: null,
+ })
+ .where(
+ and(
+ eq(bookmarks.id, bookmarkId),
+ eq(bookmarks.summarizationStatus, "pending"),
+ ),
+ );
+ });
}
},
},
@@ -360,8 +415,6 @@ export class CrawlerWorker {
},
);
- await loadCookiesFromFile();
-
return worker;
}
}
@@ -391,239 +444,300 @@ async function loadCookiesFromFile(): Promise<void> {
type DBAssetType = typeof assets.$inferInsert;
-async function changeBookmarkStatus(
- bookmarkId: string,
- crawlStatus: "success" | "failure",
-) {
- await db
- .update(bookmarkLinks)
- .set({
- crawlStatus,
- })
- .where(eq(bookmarkLinks.id, bookmarkId));
-}
-
async function browserlessCrawlPage(
jobId: string,
url: string,
abortSignal: AbortSignal,
) {
- logger.info(
- `[Crawler][${jobId}] Running in browserless mode. Will do a plain http request to "${url}". Screenshots will be disabled.`,
- );
- const response = await fetchWithProxy(url, {
- signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]),
- });
- logger.info(
- `[Crawler][${jobId}] Successfully fetched the content of "${url}". Status: ${response.status}, Size: ${response.size}`,
+ return await withSpan(
+ tracer,
+ "crawlerWorker.browserlessCrawlPage",
+ {
+ attributes: {
+ "bookmark.url": url,
+ "bookmark.domain": getBookmarkDomain(url),
+ "job.id": jobId,
+ },
+ },
+ async () => {
+ logger.info(
+ `[Crawler][${jobId}] Running in browserless mode. Will do a plain http request to "${url}". Screenshots will be disabled.`,
+ );
+ const response = await fetchWithProxy(url, {
+ signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]),
+ });
+ logger.info(
+ `[Crawler][${jobId}] Successfully fetched the content of "${url}". Status: ${response.status}, Size: ${response.size}`,
+ );
+ return {
+ htmlContent: await response.text(),
+ statusCode: response.status,
+ screenshot: undefined,
+ pdf: undefined,
+ url: response.url,
+ };
+ },
);
- return {
- htmlContent: await response.text(),
- statusCode: response.status,
- screenshot: undefined,
- url: response.url,
- };
}
async function crawlPage(
jobId: string,
url: string,
userId: string,
+ forceStorePdf: boolean,
abortSignal: AbortSignal,
): Promise<{
htmlContent: string;
screenshot: Buffer | undefined;
+ pdf: Buffer | undefined;
statusCode: number;
url: string;
}> {
- // Check user's browser crawling setting
- const userData = await db.query.users.findFirst({
- where: eq(users.id, userId),
- columns: { browserCrawlingEnabled: true },
- });
- if (!userData) {
- logger.error(`[Crawler][${jobId}] User ${userId} not found`);
- throw new Error(`User ${userId} not found`);
- }
+ return await withSpan(
+ tracer,
+ "crawlerWorker.crawlPage",
+ {
+ attributes: {
+ "bookmark.url": url,
+ "bookmark.domain": getBookmarkDomain(url),
+ "job.id": jobId,
+ "user.id": userId,
+ "crawler.forceStorePdf": forceStorePdf,
+ },
+ },
+ async () => {
+ // Check user's browser crawling setting
+ const userData = await db.query.users.findFirst({
+ where: eq(users.id, userId),
+ columns: { browserCrawlingEnabled: true },
+ });
+ if (!userData) {
+ logger.error(`[Crawler][${jobId}] User ${userId} not found`);
+ throw new Error(`User ${userId} not found`);
+ }
- const browserCrawlingEnabled = userData.browserCrawlingEnabled;
+ const browserCrawlingEnabled = userData.browserCrawlingEnabled;
- if (browserCrawlingEnabled !== null && !browserCrawlingEnabled) {
- return browserlessCrawlPage(jobId, url, abortSignal);
- }
+ if (browserCrawlingEnabled !== null && !browserCrawlingEnabled) {
+ return browserlessCrawlPage(jobId, url, abortSignal);
+ }
- let browser: Browser | undefined;
- if (serverConfig.crawler.browserConnectOnDemand) {
- browser = await startBrowserInstance();
- } else {
- browser = globalBrowser;
- }
- if (!browser) {
- return browserlessCrawlPage(jobId, url, abortSignal);
- }
+ let browser: Browser | undefined;
+ if (serverConfig.crawler.browserConnectOnDemand) {
+ browser = await startBrowserInstance();
+ } else {
+ browser = globalBrowser;
+ }
+ if (!browser) {
+ return browserlessCrawlPage(jobId, url, abortSignal);
+ }
- const proxyConfig = getPlaywrightProxyConfig();
- const isRunningInProxyContext =
- proxyConfig !== undefined &&
- !matchesNoProxy(url, proxyConfig.bypass?.split(",") ?? []);
- const context = await browser.newContext({
- viewport: { width: 1440, height: 900 },
- userAgent:
- "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
- proxy: proxyConfig,
- });
+ const proxyConfig = getPlaywrightProxyConfig();
+ const isRunningInProxyContext =
+ proxyConfig !== undefined &&
+ !matchesNoProxy(url, proxyConfig.bypass?.split(",") ?? []);
+ const context = await browser.newContext({
+ viewport: { width: 1440, height: 900 },
+ userAgent:
+ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
+ proxy: proxyConfig,
+ });
- try {
- if (globalCookies.length > 0) {
- await context.addCookies(globalCookies);
- logger.info(
- `[Crawler][${jobId}] Cookies successfully loaded into browser context`,
- );
- }
+ try {
+ if (globalCookies.length > 0) {
+ await context.addCookies(globalCookies);
+ logger.info(
+ `[Crawler][${jobId}] Cookies successfully loaded into browser context`,
+ );
+ }
- // Create a new page in the context
- const page = await context.newPage();
+ // Create a new page in the context
+ const page = await context.newPage();
- // Apply ad blocking
- if (globalBlocker) {
- await globalBlocker.enableBlockingInPage(page);
- }
+ // Apply ad blocking
+ if (globalBlocker) {
+ await globalBlocker.enableBlockingInPage(page);
+ }
- // Block audio/video resources and disallowed sub-requests
- await page.route("**/*", async (route) => {
- if (abortSignal.aborted) {
- await route.abort("aborted");
- return;
- }
- const request = route.request();
- const resourceType = request.resourceType();
+ // Block audio/video resources and disallowed sub-requests
+ await page.route("**/*", async (route) => {
+ if (abortSignal.aborted) {
+ await route.abort("aborted");
+ return;
+ }
+ const request = route.request();
+ const resourceType = request.resourceType();
- // Block audio/video resources
- if (
- resourceType === "media" ||
- request.headers()["content-type"]?.includes("video/") ||
- request.headers()["content-type"]?.includes("audio/")
- ) {
- await route.abort("aborted");
- return;
- }
+ // Block audio/video resources
+ if (
+ resourceType === "media" ||
+ request.headers()["content-type"]?.includes("video/") ||
+ request.headers()["content-type"]?.includes("audio/")
+ ) {
+ await route.abort("aborted");
+ return;
+ }
- const requestUrl = request.url();
- const requestIsRunningInProxyContext =
- proxyConfig !== undefined &&
- !matchesNoProxy(requestUrl, proxyConfig.bypass?.split(",") ?? []);
- if (
- requestUrl.startsWith("http://") ||
- requestUrl.startsWith("https://")
- ) {
- const validation = await validateUrl(
- requestUrl,
- requestIsRunningInProxyContext,
+ const requestUrl = request.url();
+ const requestIsRunningInProxyContext =
+ proxyConfig !== undefined &&
+ !matchesNoProxy(requestUrl, proxyConfig.bypass?.split(",") ?? []);
+ if (
+ requestUrl.startsWith("http://") ||
+ requestUrl.startsWith("https://")
+ ) {
+ const validation = await validateUrl(
+ requestUrl,
+ requestIsRunningInProxyContext,
+ );
+ if (!validation.ok) {
+ logger.warn(
+ `[Crawler][${jobId}] Blocking sub-request to disallowed URL "${requestUrl}": ${validation.reason}`,
+ );
+ await route.abort("blockedbyclient");
+ return;
+ }
+ }
+
+ // Continue with other requests
+ await route.continue();
+ });
+
+ // Navigate to the target URL
+ const navigationValidation = await validateUrl(
+ url,
+ isRunningInProxyContext,
);
- if (!validation.ok) {
- logger.warn(
- `[Crawler][${jobId}] Blocking sub-request to disallowed URL "${requestUrl}": ${validation.reason}`,
+ if (!navigationValidation.ok) {
+ throw new Error(
+ `Disallowed navigation target "${url}": ${navigationValidation.reason}`,
);
- await route.abort("blockedbyclient");
- return;
}
- }
-
- // Continue with other requests
- await route.continue();
- });
+ const targetUrl = navigationValidation.url.toString();
+ logger.info(`[Crawler][${jobId}] Navigating to "${targetUrl}"`);
+ const response = await Promise.race([
+ page.goto(targetUrl, {
+ timeout: serverConfig.crawler.navigateTimeoutSec * 1000,
+ waitUntil: "domcontentloaded",
+ }),
+ abortPromise(abortSignal).then(() => null),
+ ]);
- // Navigate to the target URL
- const navigationValidation = await validateUrl(
- url,
- isRunningInProxyContext,
- );
- if (!navigationValidation.ok) {
- throw new Error(
- `Disallowed navigation target "${url}": ${navigationValidation.reason}`,
- );
- }
- const targetUrl = navigationValidation.url.toString();
- logger.info(`[Crawler][${jobId}] Navigating to "${targetUrl}"`);
- const response = await Promise.race([
- page.goto(targetUrl, {
- timeout: serverConfig.crawler.navigateTimeoutSec * 1000,
- waitUntil: "domcontentloaded",
- }),
- abortPromise(abortSignal).then(() => null),
- ]);
+ logger.info(
+ `[Crawler][${jobId}] Successfully navigated to "${targetUrl}". Waiting for the page to load ...`,
+ );
- logger.info(
- `[Crawler][${jobId}] Successfully navigated to "${targetUrl}". Waiting for the page to load ...`,
- );
+ // Wait until network is relatively idle or timeout after 5 seconds
+ await Promise.race([
+ page
+ .waitForLoadState("networkidle", { timeout: 5000 })
+ .catch(() => ({})),
+ new Promise((resolve) => setTimeout(resolve, 5000)),
+ abortPromise(abortSignal),
+ ]);
- // Wait until network is relatively idle or timeout after 5 seconds
- await Promise.race([
- page.waitForLoadState("networkidle", { timeout: 5000 }).catch(() => ({})),
- new Promise((resolve) => setTimeout(resolve, 5000)),
- abortPromise(abortSignal),
- ]);
+ abortSignal.throwIfAborted();
- abortSignal.throwIfAborted();
+ logger.info(
+ `[Crawler][${jobId}] Finished waiting for the page to load.`,
+ );
- logger.info(`[Crawler][${jobId}] Finished waiting for the page to load.`);
+ // Extract content from the page
+ const htmlContent = await page.content();
- // Extract content from the page
- const htmlContent = await page.content();
+ abortSignal.throwIfAborted();
- abortSignal.throwIfAborted();
+ logger.info(
+ `[Crawler][${jobId}] Successfully fetched the page content.`,
+ );
- logger.info(`[Crawler][${jobId}] Successfully fetched the page content.`);
+ // Take a screenshot if configured
+ let screenshot: Buffer | undefined = undefined;
+ if (serverConfig.crawler.storeScreenshot) {
+ const { data: screenshotData, error: screenshotError } =
+ await tryCatch(
+ Promise.race<Buffer>([
+ page.screenshot({
+ // If you change this, you need to change the asset type in the store function.
+ type: "jpeg",
+ fullPage: serverConfig.crawler.fullPageScreenshot,
+ quality: 80,
+ }),
+ new Promise((_, reject) =>
+ setTimeout(
+ () =>
+ reject(
+ "TIMED_OUT, consider increasing CRAWLER_SCREENSHOT_TIMEOUT_SEC",
+ ),
+ serverConfig.crawler.screenshotTimeoutSec * 1000,
+ ),
+ ),
+ abortPromise(abortSignal).then(() => Buffer.from("")),
+ ]),
+ );
+ abortSignal.throwIfAborted();
+ if (screenshotError) {
+ logger.warn(
+ `[Crawler][${jobId}] Failed to capture the screenshot. Reason: ${screenshotError}`,
+ );
+ } else {
+ logger.info(
+ `[Crawler][${jobId}] Finished capturing page content and a screenshot. FullPageScreenshot: ${serverConfig.crawler.fullPageScreenshot}`,
+ );
+ screenshot = screenshotData;
+ }
+ }
- // Take a screenshot if configured
- let screenshot: Buffer | undefined = undefined;
- if (serverConfig.crawler.storeScreenshot) {
- const { data: screenshotData, error: screenshotError } = await tryCatch(
- Promise.race<Buffer>([
- page.screenshot({
- // If you change this, you need to change the asset type in the store function.
- type: "jpeg",
- fullPage: serverConfig.crawler.fullPageScreenshot,
- quality: 80,
- }),
- new Promise((_, reject) =>
- setTimeout(
- () =>
- reject(
- "TIMED_OUT, consider increasing CRAWLER_SCREENSHOT_TIMEOUT_SEC",
+ // Capture PDF if configured or explicitly requested
+ let pdf: Buffer | undefined = undefined;
+ if (serverConfig.crawler.storePdf || forceStorePdf) {
+ const { data: pdfData, error: pdfError } = await tryCatch(
+ Promise.race<Buffer>([
+ page.pdf({
+ format: "A4",
+ printBackground: true,
+ }),
+ new Promise((_, reject) =>
+ setTimeout(
+ () =>
+ reject(
+ "TIMED_OUT, consider increasing CRAWLER_SCREENSHOT_TIMEOUT_SEC",
+ ),
+ serverConfig.crawler.screenshotTimeoutSec * 1000,
),
- serverConfig.crawler.screenshotTimeoutSec * 1000,
- ),
- ),
- abortPromise(abortSignal).then(() => Buffer.from("")),
- ]),
- );
- abortSignal.throwIfAborted();
- if (screenshotError) {
- logger.warn(
- `[Crawler][${jobId}] Failed to capture the screenshot. Reason: ${screenshotError}`,
- );
- } else {
- logger.info(
- `[Crawler][${jobId}] Finished capturing page content and a screenshot. FullPageScreenshot: ${serverConfig.crawler.fullPageScreenshot}`,
- );
- screenshot = screenshotData;
- }
- }
+ ),
+ abortPromise(abortSignal).then(() => Buffer.from("")),
+ ]),
+ );
+ abortSignal.throwIfAborted();
+ if (pdfError) {
+ logger.warn(
+ `[Crawler][${jobId}] Failed to capture the PDF. Reason: ${pdfError}`,
+ );
+ } else {
+ logger.info(
+ `[Crawler][${jobId}] Finished capturing page content as PDF`,
+ );
+ pdf = pdfData;
+ }
+ }
- return {
- htmlContent,
- statusCode: response?.status() ?? 0,
- screenshot,
- url: page.url(),
- };
- } finally {
- await context.close();
- // Only close the browser if it was created on demand
- if (serverConfig.crawler.browserConnectOnDemand) {
- await browser.close();
- }
- }
+ return {
+ htmlContent,
+ statusCode: response?.status() ?? 0,
+ screenshot,
+ pdf,
+ url: page.url(),
+ };
+ } finally {
+ await context.close();
+ // Only close the browser if it was created on demand
+ if (serverConfig.crawler.browserConnectOnDemand) {
+ await browser.close();
+ }
+ }
+ },
+ );
}
async function extractMetadata(
@@ -631,54 +745,82 @@ async function extractMetadata(
url: string,
jobId: string,
) {
- logger.info(
- `[Crawler][${jobId}] Will attempt to extract metadata from page ...`,
+ return await withSpan(
+ tracer,
+ "crawlerWorker.extractMetadata",
+ {
+ attributes: {
+ "bookmark.url": url,
+ "bookmark.domain": getBookmarkDomain(url),
+ "job.id": jobId,
+ },
+ },
+ async () => {
+ logger.info(
+ `[Crawler][${jobId}] Will attempt to extract metadata from page ...`,
+ );
+ const meta = await metascraperParser({
+ url,
+ html: htmlContent,
+ // We don't want to validate the URL again as we've already done it by visiting the page.
+ // This was added because URL validation fails if the URL ends with a question mark (e.g. empty query params).
+ validateUrl: false,
+ });
+ logger.info(
+ `[Crawler][${jobId}] Done extracting metadata from the page.`,
+ );
+ return meta;
+ },
);
- const meta = await metascraperParser({
- url,
- html: htmlContent,
- // We don't want to validate the URL again as we've already done it by visiting the page.
- // This was added because URL validation fails if the URL ends with a question mark (e.g. empty query params).
- validateUrl: false,
- });
- logger.info(`[Crawler][${jobId}] Done extracting metadata from the page.`);
- return meta;
}
-function extractReadableContent(
+async function extractReadableContent(
htmlContent: string,
url: string,
jobId: string,
) {
- logger.info(
- `[Crawler][${jobId}] Will attempt to extract readable content ...`,
- );
- const virtualConsole = new VirtualConsole();
- const dom = new JSDOM(htmlContent, { url, virtualConsole });
- let result: { content: string } | null = null;
- try {
- const readableContent = new Readability(dom.window.document).parse();
- if (!readableContent || typeof readableContent.content !== "string") {
- return null;
- }
+ return await withSpan(
+ tracer,
+ "crawlerWorker.extractReadableContent",
+ {
+ attributes: {
+ "bookmark.url": url,
+ "bookmark.domain": getBookmarkDomain(url),
+ "job.id": jobId,
+ },
+ },
+ async () => {
+ logger.info(
+ `[Crawler][${jobId}] Will attempt to extract readable content ...`,
+ );
+ const virtualConsole = new VirtualConsole();
+ const dom = new JSDOM(htmlContent, { url, virtualConsole });
+ let result: { content: string } | null = null;
+ try {
+ const readableContent = new Readability(dom.window.document).parse();
+ if (!readableContent || typeof readableContent.content !== "string") {
+ return null;
+ }
- const purifyWindow = new JSDOM("").window;
- try {
- const purify = DOMPurify(purifyWindow);
- const purifiedHTML = purify.sanitize(readableContent.content);
+ const purifyWindow = new JSDOM("").window;
+ try {
+ const purify = DOMPurify(purifyWindow);
+ const purifiedHTML = purify.sanitize(readableContent.content);
- logger.info(`[Crawler][${jobId}] Done extracting readable content.`);
- result = {
- content: purifiedHTML,
- };
- } finally {
- purifyWindow.close();
- }
- } finally {
- dom.window.close();
- }
+ logger.info(`[Crawler][${jobId}] Done extracting readable content.`);
+ result = {
+ content: purifiedHTML,
+ };
+ } finally {
+ purifyWindow.close();
+ }
+ } finally {
+ dom.window.close();
+ }
- return result;
+ return result;
+ },
+ );
}
async function storeScreenshot(
@@ -686,45 +828,111 @@ async function storeScreenshot(
userId: string,
jobId: string,
) {
- if (!serverConfig.crawler.storeScreenshot) {
- logger.info(
- `[Crawler][${jobId}] Skipping storing the screenshot as per the config.`,
- );
- return null;
- }
- if (!screenshot) {
- logger.info(
- `[Crawler][${jobId}] Skipping storing the screenshot as it's empty.`,
- );
- return null;
- }
- const assetId = newAssetId();
- const contentType = "image/jpeg";
- const fileName = "screenshot.jpeg";
+ return await withSpan(
+ tracer,
+ "crawlerWorker.storeScreenshot",
+ {
+ attributes: {
+ "job.id": jobId,
+ "user.id": userId,
+ "asset.size": screenshot?.byteLength ?? 0,
+ },
+ },
+ async () => {
+ if (!serverConfig.crawler.storeScreenshot) {
+ logger.info(
+ `[Crawler][${jobId}] Skipping storing the screenshot as per the config.`,
+ );
+ return null;
+ }
+ if (!screenshot) {
+ logger.info(
+ `[Crawler][${jobId}] Skipping storing the screenshot as it's empty.`,
+ );
+ return null;
+ }
+ const assetId = newAssetId();
+ const contentType = "image/jpeg";
+ const fileName = "screenshot.jpeg";
- // Check storage quota before saving the screenshot
- const { data: quotaApproved, error: quotaError } = await tryCatch(
- QuotaService.checkStorageQuota(db, userId, screenshot.byteLength),
+ // Check storage quota before saving the screenshot
+ const { data: quotaApproved, error: quotaError } = await tryCatch(
+ QuotaService.checkStorageQuota(db, userId, screenshot.byteLength),
+ );
+
+ if (quotaError) {
+ logger.warn(
+ `[Crawler][${jobId}] Skipping screenshot storage due to quota exceeded: ${quotaError.message}`,
+ );
+ return null;
+ }
+
+ await saveAsset({
+ userId,
+ assetId,
+ metadata: { contentType, fileName },
+ asset: screenshot,
+ quotaApproved,
+ });
+ logger.info(
+ `[Crawler][${jobId}] Stored the screenshot as assetId: ${assetId} (${screenshot.byteLength} bytes)`,
+ );
+ return { assetId, contentType, fileName, size: screenshot.byteLength };
+ },
);
+}
- if (quotaError) {
- logger.warn(
- `[Crawler][${jobId}] Skipping screenshot storage due to quota exceeded: ${quotaError.message}`,
- );
- return null;
- }
+async function storePdf(
+ pdf: Buffer | undefined,
+ userId: string,
+ jobId: string,
+) {
+ return await withSpan(
+ tracer,
+ "crawlerWorker.storePdf",
+ {
+ attributes: {
+ "job.id": jobId,
+ "user.id": userId,
+ "asset.size": pdf?.byteLength ?? 0,
+ },
+ },
+ async () => {
+ if (!pdf) {
+ logger.info(
+ `[Crawler][${jobId}] Skipping storing the PDF as it's empty.`,
+ );
+ return null;
+ }
+ const assetId = newAssetId();
+ const contentType = "application/pdf";
+ const fileName = "page.pdf";
- await saveAsset({
- userId,
- assetId,
- metadata: { contentType, fileName },
- asset: screenshot,
- quotaApproved,
- });
- logger.info(
- `[Crawler][${jobId}] Stored the screenshot as assetId: ${assetId} (${screenshot.byteLength} bytes)`,
+ // Check storage quota before saving the PDF
+ const { data: quotaApproved, error: quotaError } = await tryCatch(
+ QuotaService.checkStorageQuota(db, userId, pdf.byteLength),
+ );
+
+ if (quotaError) {
+ logger.warn(
+ `[Crawler][${jobId}] Skipping PDF storage due to quota exceeded: ${quotaError.message}`,
+ );
+ return null;
+ }
+
+ await saveAsset({
+ userId,
+ assetId,
+ metadata: { contentType, fileName },
+ asset: pdf,
+ quotaApproved,
+ });
+ logger.info(
+ `[Crawler][${jobId}] Stored the PDF as assetId: ${assetId} (${pdf.byteLength} bytes)`,
+ );
+ return { assetId, contentType, fileName, size: pdf.byteLength };
+ },
);
- return { assetId, contentType, fileName, size: screenshot.byteLength };
}
async function downloadAndStoreFile(
@@ -734,91 +942,106 @@ async function downloadAndStoreFile(
fileType: string,
abortSignal: AbortSignal,
) {
- let assetPath: string | undefined;
- try {
- logger.info(
- `[Crawler][${jobId}] Downloading ${fileType} from "${url.length > 100 ? url.slice(0, 100) + "..." : url}"`,
- );
- const response = await fetchWithProxy(url, {
- signal: abortSignal,
- });
- if (!response.ok || response.body == null) {
- throw new Error(`Failed to download ${fileType}: ${response.status}`);
- }
+ return await withSpan(
+ tracer,
+ "crawlerWorker.downloadAndStoreFile",
+ {
+ attributes: {
+ "bookmark.url": url,
+ "bookmark.domain": getBookmarkDomain(url),
+ "job.id": jobId,
+ "user.id": userId,
+ "asset.type": fileType,
+ },
+ },
+ async () => {
+ let assetPath: string | undefined;
+ try {
+ logger.info(
+ `[Crawler][${jobId}] Downloading ${fileType} from "${url.length > 100 ? url.slice(0, 100) + "..." : url}"`,
+ );
+ const response = await fetchWithProxy(url, {
+ signal: abortSignal,
+ });
+ if (!response.ok || response.body == null) {
+ throw new Error(`Failed to download ${fileType}: ${response.status}`);
+ }
- const contentType = normalizeContentType(
- response.headers.get("content-type"),
- );
- if (!contentType) {
- throw new Error("No content type in the response");
- }
+ const contentType = normalizeContentType(
+ response.headers.get("content-type"),
+ );
+ if (!contentType) {
+ throw new Error("No content type in the response");
+ }
- const assetId = newAssetId();
- assetPath = path.join(os.tmpdir(), assetId);
+ const assetId = newAssetId();
+ assetPath = path.join(os.tmpdir(), assetId);
- let bytesRead = 0;
- const contentLengthEnforcer = new Transform({
- transform(chunk, _, callback) {
- bytesRead += chunk.length;
+ let bytesRead = 0;
+ const contentLengthEnforcer = new Transform({
+ transform(chunk, _, callback) {
+ bytesRead += chunk.length;
- if (abortSignal.aborted) {
- callback(new Error("AbortError"));
- } else if (bytesRead > serverConfig.maxAssetSizeMb * 1024 * 1024) {
- callback(
- new Error(
- `Content length exceeds maximum allowed size: ${serverConfig.maxAssetSizeMb}MB`,
- ),
- );
- } else {
- callback(null, chunk); // pass data along unchanged
- }
- },
- flush(callback) {
- callback();
- },
- });
+ if (abortSignal.aborted) {
+ callback(new Error("AbortError"));
+ } else if (bytesRead > serverConfig.maxAssetSizeMb * 1024 * 1024) {
+ callback(
+ new Error(
+ `Content length exceeds maximum allowed size: ${serverConfig.maxAssetSizeMb}MB`,
+ ),
+ );
+ } else {
+ callback(null, chunk); // pass data along unchanged
+ }
+ },
+ flush(callback) {
+ callback();
+ },
+ });
- await pipeline(
- response.body,
- contentLengthEnforcer,
- fsSync.createWriteStream(assetPath),
- );
+ await pipeline(
+ response.body,
+ contentLengthEnforcer,
+ fsSync.createWriteStream(assetPath),
+ );
- // Check storage quota before saving the asset
- const { data: quotaApproved, error: quotaError } = await tryCatch(
- QuotaService.checkStorageQuota(db, userId, bytesRead),
- );
+ // Check storage quota before saving the asset
+ const { data: quotaApproved, error: quotaError } = await tryCatch(
+ QuotaService.checkStorageQuota(db, userId, bytesRead),
+ );
- if (quotaError) {
- logger.warn(
- `[Crawler][${jobId}] Skipping ${fileType} storage due to quota exceeded: ${quotaError.message}`,
- );
- return null;
- }
+ if (quotaError) {
+ logger.warn(
+ `[Crawler][${jobId}] Skipping ${fileType} storage due to quota exceeded: ${quotaError.message}`,
+ );
+ return null;
+ }
- await saveAssetFromFile({
- userId,
- assetId,
- metadata: { contentType },
- assetPath,
- quotaApproved,
- });
+ await saveAssetFromFile({
+ userId,
+ assetId,
+ metadata: { contentType },
+ assetPath,
+ quotaApproved,
+ });
- logger.info(
- `[Crawler][${jobId}] Downloaded ${fileType} as assetId: ${assetId} (${bytesRead} bytes)`,
- );
+ logger.info(
+ `[Crawler][${jobId}] Downloaded ${fileType} as assetId: ${assetId} (${bytesRead} bytes)`,
+ );
- return { assetId, userId, contentType, size: bytesRead };
- } catch (e) {
- logger.error(
- `[Crawler][${jobId}] Failed to download and store ${fileType}: ${e}`,
- );
- return null;
- } finally {
- if (assetPath) {
- await tryCatch(fs.unlink(assetPath));
- }
- }
+ return { assetId, userId, contentType, size: bytesRead };
+ } catch (e) {
+ logger.error(
+ `[Crawler][${jobId}] Failed to download and store ${fileType}: ${e}`,
+ );
+ return null;
+ } finally {
+ if (assetPath) {
+ await tryCatch(fs.unlink(assetPath));
+ }
+ }
+ },
+ );
}
async function downloadAndStoreImage(
@@ -843,77 +1066,91 @@ async function archiveWebpage(
jobId: string,
abortSignal: AbortSignal,
) {
- logger.info(`[Crawler][${jobId}] Will attempt to archive page ...`);
- const assetId = newAssetId();
- const assetPath = path.join(os.tmpdir(), assetId);
-
- let res = await execa({
- input: html,
- cancelSignal: abortSignal,
- env: {
- https_proxy: serverConfig.proxy.httpsProxy
- ? getRandomProxy(serverConfig.proxy.httpsProxy)
- : undefined,
- http_proxy: serverConfig.proxy.httpProxy
- ? getRandomProxy(serverConfig.proxy.httpProxy)
- : undefined,
- no_proxy: serverConfig.proxy.noProxy?.join(","),
+ return await withSpan(
+ tracer,
+ "crawlerWorker.archiveWebpage",
+ {
+ attributes: {
+ "bookmark.url": url,
+ "bookmark.domain": getBookmarkDomain(url),
+ "job.id": jobId,
+ "user.id": userId,
+ },
},
- })("monolith", ["-", "-Ije", "-t", "5", "-b", url, "-o", assetPath]);
+ async () => {
+ logger.info(`[Crawler][${jobId}] Will attempt to archive page ...`);
+ const assetId = newAssetId();
+ const assetPath = path.join(os.tmpdir(), assetId);
- if (res.isCanceled) {
- logger.error(
- `[Crawler][${jobId}] Canceled archiving the page as we hit global timeout.`,
- );
- await tryCatch(fs.unlink(assetPath));
- return null;
- }
+ let res = await execa({
+ input: html,
+ cancelSignal: abortSignal,
+ env: {
+ https_proxy: serverConfig.proxy.httpsProxy
+ ? getRandomProxy(serverConfig.proxy.httpsProxy)
+ : undefined,
+ http_proxy: serverConfig.proxy.httpProxy
+ ? getRandomProxy(serverConfig.proxy.httpProxy)
+ : undefined,
+ no_proxy: serverConfig.proxy.noProxy?.join(","),
+ },
+ })("monolith", ["-", "-Ije", "-t", "5", "-b", url, "-o", assetPath]);
- if (res.exitCode !== 0) {
- logger.error(
- `[Crawler][${jobId}] Failed to archive the page as the command exited with code ${res.exitCode}`,
- );
- await tryCatch(fs.unlink(assetPath));
- return null;
- }
+ if (res.isCanceled) {
+ logger.error(
+ `[Crawler][${jobId}] Canceled archiving the page as we hit global timeout.`,
+ );
+ await tryCatch(fs.unlink(assetPath));
+ return null;
+ }
- const contentType = "text/html";
+ if (res.exitCode !== 0) {
+ logger.error(
+ `[Crawler][${jobId}] Failed to archive the page as the command exited with code ${res.exitCode}`,
+ );
+ await tryCatch(fs.unlink(assetPath));
+ return null;
+ }
- // Get file size and check quota before saving
- const stats = await fs.stat(assetPath);
- const fileSize = stats.size;
+ const contentType = "text/html";
- const { data: quotaApproved, error: quotaError } = await tryCatch(
- QuotaService.checkStorageQuota(db, userId, fileSize),
- );
+ // Get file size and check quota before saving
+ const stats = await fs.stat(assetPath);
+ const fileSize = stats.size;
- if (quotaError) {
- logger.warn(
- `[Crawler][${jobId}] Skipping page archive storage due to quota exceeded: ${quotaError.message}`,
- );
- await tryCatch(fs.unlink(assetPath));
- return null;
- }
+ const { data: quotaApproved, error: quotaError } = await tryCatch(
+ QuotaService.checkStorageQuota(db, userId, fileSize),
+ );
- await saveAssetFromFile({
- userId,
- assetId,
- assetPath,
- metadata: {
- contentType,
- },
- quotaApproved,
- });
+ if (quotaError) {
+ logger.warn(
+ `[Crawler][${jobId}] Skipping page archive storage due to quota exceeded: ${quotaError.message}`,
+ );
+ await tryCatch(fs.unlink(assetPath));
+ return null;
+ }
- logger.info(
- `[Crawler][${jobId}] Done archiving the page as assetId: ${assetId}`,
- );
+ await saveAssetFromFile({
+ userId,
+ assetId,
+ assetPath,
+ metadata: {
+ contentType,
+ },
+ quotaApproved,
+ });
- return {
- assetId,
- contentType,
- size: await getAssetSize({ userId, assetId }),
- };
+ logger.info(
+ `[Crawler][${jobId}] Done archiving the page as assetId: ${assetId}`,
+ );
+
+ return {
+ assetId,
+ contentType,
+ size: await getAssetSize({ userId, assetId }),
+ };
+ },
+ );
}
async function getContentType(
@@ -921,26 +1158,45 @@ async function getContentType(
jobId: string,
abortSignal: AbortSignal,
): Promise<string | null> {
- try {
- logger.info(
- `[Crawler][${jobId}] Attempting to determine the content-type for the url ${url}`,
- );
- const response = await fetchWithProxy(url, {
- method: "HEAD",
- signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]),
- });
- const rawContentType = response.headers.get("content-type");
- const contentType = normalizeContentType(rawContentType);
- logger.info(
- `[Crawler][${jobId}] Content-type for the url ${url} is "${contentType}"`,
- );
- return contentType;
- } catch (e) {
- logger.error(
- `[Crawler][${jobId}] Failed to determine the content-type for the url ${url}: ${e}`,
- );
- return null;
- }
+ return await withSpan(
+ tracer,
+ "crawlerWorker.getContentType",
+ {
+ attributes: {
+ "bookmark.url": url,
+ "bookmark.domain": getBookmarkDomain(url),
+ "job.id": jobId,
+ },
+ },
+ async () => {
+ try {
+ logger.info(
+ `[Crawler][${jobId}] Attempting to determine the content-type for the url ${url}`,
+ );
+ const response = await fetchWithProxy(url, {
+ method: "GET",
+ signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]),
+ });
+ setSpanAttributes({
+ "crawler.getContentType.statusCode": response.status,
+ });
+ const rawContentType = response.headers.get("content-type");
+ const contentType = normalizeContentType(rawContentType);
+ setSpanAttributes({
+ "crawler.contentType": contentType ?? undefined,
+ });
+ logger.info(
+ `[Crawler][${jobId}] Content-type for the url ${url} is "${contentType}"`,
+ );
+ return contentType;
+ } catch (e) {
+ logger.error(
+ `[Crawler][${jobId}] Failed to determine the content-type for the url ${url}: ${e}`,
+ );
+ return null;
+ }
+ },
+ );
}
/**
@@ -959,53 +1215,69 @@ async function handleAsAssetBookmark(
bookmarkId: string,
abortSignal: AbortSignal,
) {
- const downloaded = await downloadAndStoreFile(
- url,
- userId,
- jobId,
- assetType,
- abortSignal,
- );
- if (!downloaded) {
- return;
- }
- const fileName = path.basename(new URL(url).pathname);
- await db.transaction(async (trx) => {
- await updateAsset(
- undefined,
- {
- id: downloaded.assetId,
- bookmarkId,
- userId,
- assetType: AssetTypes.BOOKMARK_ASSET,
- contentType: downloaded.contentType,
- size: downloaded.size,
- fileName,
- },
- trx,
- );
- await trx.insert(bookmarkAssets).values({
- id: bookmarkId,
- assetType,
- assetId: downloaded.assetId,
- content: null,
- fileName,
- sourceUrl: url,
- });
- // Switch the type of the bookmark from LINK to ASSET
- await trx
- .update(bookmarks)
- .set({ type: BookmarkTypes.ASSET })
- .where(eq(bookmarks.id, bookmarkId));
- await trx.delete(bookmarkLinks).where(eq(bookmarkLinks.id, bookmarkId));
- });
- await AssetPreprocessingQueue.enqueue(
+ return await withSpan(
+ tracer,
+ "crawlerWorker.handleAsAssetBookmark",
{
- bookmarkId,
- fixMode: false,
+ attributes: {
+ "bookmark.url": url,
+ "bookmark.domain": getBookmarkDomain(url),
+ "job.id": jobId,
+ "user.id": userId,
+ "bookmark.id": bookmarkId,
+ "asset.type": assetType,
+ },
},
- {
- groupId: userId,
+ async () => {
+ const downloaded = await downloadAndStoreFile(
+ url,
+ userId,
+ jobId,
+ assetType,
+ abortSignal,
+ );
+ if (!downloaded) {
+ return;
+ }
+ const fileName = path.basename(new URL(url).pathname);
+ await db.transaction(async (trx) => {
+ await updateAsset(
+ undefined,
+ {
+ id: downloaded.assetId,
+ bookmarkId,
+ userId,
+ assetType: AssetTypes.BOOKMARK_ASSET,
+ contentType: downloaded.contentType,
+ size: downloaded.size,
+ fileName,
+ },
+ trx,
+ );
+ await trx.insert(bookmarkAssets).values({
+ id: bookmarkId,
+ assetType,
+ assetId: downloaded.assetId,
+ content: null,
+ fileName,
+ sourceUrl: url,
+ });
+ // Switch the type of the bookmark from LINK to ASSET
+ await trx
+ .update(bookmarks)
+ .set({ type: BookmarkTypes.ASSET })
+ .where(eq(bookmarks.id, bookmarkId));
+ await trx.delete(bookmarkLinks).where(eq(bookmarkLinks.id, bookmarkId));
+ });
+ await AssetPreprocessingQueue.enqueue(
+ {
+ bookmarkId,
+ fixMode: false,
+ },
+ {
+ groupId: userId,
+ },
+ );
},
);
}
@@ -1020,60 +1292,75 @@ async function storeHtmlContent(
userId: string,
jobId: string,
): Promise<StoreHtmlResult> {
- if (!htmlContent) {
- return { result: "not_stored" };
- }
+ return await withSpan(
+ tracer,
+ "crawlerWorker.storeHtmlContent",
+ {
+ attributes: {
+ "job.id": jobId,
+ "user.id": userId,
+ "bookmark.content.size": htmlContent
+ ? Buffer.byteLength(htmlContent, "utf8")
+ : 0,
+ },
+ },
+ async () => {
+ if (!htmlContent) {
+ return { result: "not_stored" };
+ }
- const contentSize = Buffer.byteLength(htmlContent, "utf8");
+ const contentSize = Buffer.byteLength(htmlContent, "utf8");
- // Only store in assets if content is >= 50KB
- if (contentSize < serverConfig.crawler.htmlContentSizeThreshold) {
- logger.info(
- `[Crawler][${jobId}] HTML content size (${contentSize} bytes) is below threshold, storing inline`,
- );
- return { result: "store_inline" };
- }
+      // Store as an asset only when the content reaches the configured size threshold; smaller content is stored inline
+ if (contentSize < serverConfig.crawler.htmlContentSizeThreshold) {
+ logger.info(
+ `[Crawler][${jobId}] HTML content size (${contentSize} bytes) is below threshold, storing inline`,
+ );
+ return { result: "store_inline" };
+ }
- const { data: quotaApproved, error: quotaError } = await tryCatch(
- QuotaService.checkStorageQuota(db, userId, contentSize),
- );
- if (quotaError) {
- logger.warn(
- `[Crawler][${jobId}] Skipping HTML content storage due to quota exceeded: ${quotaError.message}`,
- );
- return { result: "not_stored" };
- }
+ const { data: quotaApproved, error: quotaError } = await tryCatch(
+ QuotaService.checkStorageQuota(db, userId, contentSize),
+ );
+ if (quotaError) {
+ logger.warn(
+ `[Crawler][${jobId}] Skipping HTML content storage due to quota exceeded: ${quotaError.message}`,
+ );
+ return { result: "not_stored" };
+ }
- const assetId = newAssetId();
+ const assetId = newAssetId();
- const { error: saveError } = await tryCatch(
- saveAsset({
- userId,
- assetId,
- asset: Buffer.from(htmlContent, "utf8"),
- metadata: {
- contentType: ASSET_TYPES.TEXT_HTML,
- fileName: null,
- },
- quotaApproved,
- }),
- );
- if (saveError) {
- logger.error(
- `[Crawler][${jobId}] Failed to store HTML content as asset: ${saveError}`,
- );
- throw saveError;
- }
+ const { error: saveError } = await tryCatch(
+ saveAsset({
+ userId,
+ assetId,
+ asset: Buffer.from(htmlContent, "utf8"),
+ metadata: {
+ contentType: ASSET_TYPES.TEXT_HTML,
+ fileName: null,
+ },
+ quotaApproved,
+ }),
+ );
+ if (saveError) {
+ logger.error(
+ `[Crawler][${jobId}] Failed to store HTML content as asset: ${saveError}`,
+ );
+ throw saveError;
+ }
- logger.info(
- `[Crawler][${jobId}] Stored large HTML content (${contentSize} bytes) as asset: ${assetId}`,
- );
+ logger.info(
+ `[Crawler][${jobId}] Stored large HTML content (${contentSize} bytes) as asset: ${assetId}`,
+ );
- return {
- result: "stored",
- assetId,
- size: contentSize,
- };
+ return {
+ result: "stored",
+ assetId,
+ size: contentSize,
+ };
+ },
+ );
}
async function crawlAndParseUrl(
@@ -1082,268 +1369,352 @@ async function crawlAndParseUrl(
jobId: string,
bookmarkId: string,
oldScreenshotAssetId: string | undefined,
+ oldPdfAssetId: string | undefined,
oldImageAssetId: string | undefined,
oldFullPageArchiveAssetId: string | undefined,
oldContentAssetId: string | undefined,
precrawledArchiveAssetId: string | undefined,
archiveFullPage: boolean,
+ forceStorePdf: boolean,
abortSignal: AbortSignal,
) {
- let result: {
- htmlContent: string;
- screenshot: Buffer | undefined;
- statusCode: number | null;
- url: string;
- };
-
- if (precrawledArchiveAssetId) {
- logger.info(
- `[Crawler][${jobId}] The page has been precrawled. Will use the precrawled archive instead.`,
- );
- const asset = await readAsset({
- userId,
- assetId: precrawledArchiveAssetId,
- });
- result = {
- htmlContent: asset.asset.toString(),
- screenshot: undefined,
- statusCode: 200,
- url,
- };
- } else {
- result = await crawlPage(jobId, url, userId, abortSignal);
- }
- abortSignal.throwIfAborted();
-
- const { htmlContent, screenshot, statusCode, url: browserUrl } = result;
+ return await withSpan(
+ tracer,
+ "crawlerWorker.crawlAndParseUrl",
+ {
+ attributes: {
+ "bookmark.url": url,
+ "bookmark.domain": getBookmarkDomain(url),
+ "job.id": jobId,
+ "user.id": userId,
+ "bookmark.id": bookmarkId,
+ "crawler.archiveFullPage": archiveFullPage,
+ "crawler.forceStorePdf": forceStorePdf,
+ "crawler.hasPrecrawledArchive": !!precrawledArchiveAssetId,
+ },
+ },
+ async () => {
+ let result: {
+ htmlContent: string;
+ screenshot: Buffer | undefined;
+ pdf: Buffer | undefined;
+ statusCode: number | null;
+ url: string;
+ };
- // Track status code in Prometheus
- if (statusCode !== null) {
- crawlerStatusCodeCounter.labels(statusCode.toString()).inc();
- }
+ if (precrawledArchiveAssetId) {
+ logger.info(
+ `[Crawler][${jobId}] The page has been precrawled. Will use the precrawled archive instead.`,
+ );
+ const asset = await readAsset({
+ userId,
+ assetId: precrawledArchiveAssetId,
+ });
+ result = {
+ htmlContent: asset.asset.toString(),
+ screenshot: undefined,
+ pdf: undefined,
+ statusCode: 200,
+ url,
+ };
+ } else {
+ result = await crawlPage(
+ jobId,
+ url,
+ userId,
+ forceStorePdf,
+ abortSignal,
+ );
+ }
+ abortSignal.throwIfAborted();
- const meta = await Promise.race([
- extractMetadata(htmlContent, browserUrl, jobId),
- abortPromise(abortSignal),
- ]);
- abortSignal.throwIfAborted();
+ const {
+ htmlContent,
+ screenshot,
+ pdf,
+ statusCode,
+ url: browserUrl,
+ } = result;
- let readableContent = await Promise.race([
- extractReadableContent(htmlContent, browserUrl, jobId),
- abortPromise(abortSignal),
- ]);
- abortSignal.throwIfAborted();
+ // Track status code in Prometheus
+ if (statusCode !== null) {
+ crawlerStatusCodeCounter.labels(statusCode.toString()).inc();
+ setSpanAttributes({
+ "crawler.statusCode": statusCode,
+ });
+ }
- const screenshotAssetInfo = await Promise.race([
- storeScreenshot(screenshot, userId, jobId),
- abortPromise(abortSignal),
- ]);
- abortSignal.throwIfAborted();
+ const meta = await Promise.race([
+ extractMetadata(htmlContent, browserUrl, jobId),
+ abortPromise(abortSignal),
+ ]);
+ abortSignal.throwIfAborted();
- const htmlContentAssetInfo = await storeHtmlContent(
- readableContent?.content,
- userId,
- jobId,
- );
- abortSignal.throwIfAborted();
- let imageAssetInfo: DBAssetType | null = null;
- if (meta.image) {
- const downloaded = await downloadAndStoreImage(
- meta.image,
- userId,
- jobId,
- abortSignal,
- );
- if (downloaded) {
- imageAssetInfo = {
- id: downloaded.assetId,
- bookmarkId,
- userId,
- assetType: AssetTypes.LINK_BANNER_IMAGE,
- contentType: downloaded.contentType,
- size: downloaded.size,
+ const parseDate = (date: string | undefined) => {
+ if (!date) {
+ return null;
+ }
+ try {
+ return new Date(date);
+ } catch {
+ return null;
+ }
};
- }
- }
- abortSignal.throwIfAborted();
- const parseDate = (date: string | undefined) => {
- if (!date) {
- return null;
- }
- try {
- return new Date(date);
- } catch {
- return null;
- }
- };
+ // Phase 1: Write metadata immediately for fast user feedback.
+ // Content and asset storage happen later and can be slow (banner
+ // image download, screenshot/pdf upload, etc.).
+ await db
+ .update(bookmarkLinks)
+ .set({
+ title: meta.title,
+ description: meta.description,
+ // Don't store data URIs as they're not valid URLs and are usually quite large
+ imageUrl: meta.image?.startsWith("data:") ? null : meta.image,
+ favicon: meta.logo,
+ crawlStatusCode: statusCode,
+ author: meta.author,
+ publisher: meta.publisher,
+ datePublished: parseDate(meta.datePublished),
+ dateModified: parseDate(meta.dateModified),
+ })
+ .where(eq(bookmarkLinks.id, bookmarkId));
- // TODO(important): Restrict the size of content to store
- const assetDeletionTasks: Promise<void>[] = [];
- const inlineHtmlContent =
- htmlContentAssetInfo.result === "store_inline"
- ? (readableContent?.content ?? null)
- : null;
- readableContent = null;
- await db.transaction(async (txn) => {
- await txn
- .update(bookmarkLinks)
- .set({
- title: meta.title,
- description: meta.description,
- // Don't store data URIs as they're not valid URLs and are usually quite large
- imageUrl: meta.image?.startsWith("data:") ? null : meta.image,
- favicon: meta.logo,
- htmlContent: inlineHtmlContent,
- contentAssetId:
- htmlContentAssetInfo.result === "stored"
- ? htmlContentAssetInfo.assetId
- : null,
- crawledAt: new Date(),
- crawlStatusCode: statusCode,
- author: meta.author,
- publisher: meta.publisher,
- datePublished: parseDate(meta.datePublished),
- dateModified: parseDate(meta.dateModified),
- })
- .where(eq(bookmarkLinks.id, bookmarkId));
+ let readableContent: { content: string } | null = meta.readableContentHtml
+ ? { content: meta.readableContentHtml }
+ : null;
+ if (!readableContent) {
+ readableContent = await Promise.race([
+ extractReadableContent(
+ meta.contentHtml ?? htmlContent,
+ browserUrl,
+ jobId,
+ ),
+ abortPromise(abortSignal),
+ ]);
+ }
+ abortSignal.throwIfAborted();
- if (screenshotAssetInfo) {
- await updateAsset(
- oldScreenshotAssetId,
- {
- id: screenshotAssetInfo.assetId,
- bookmarkId,
- userId,
- assetType: AssetTypes.LINK_SCREENSHOT,
- contentType: screenshotAssetInfo.contentType,
- size: screenshotAssetInfo.size,
- fileName: screenshotAssetInfo.fileName,
- },
- txn,
- );
- assetDeletionTasks.push(silentDeleteAsset(userId, oldScreenshotAssetId));
- }
- if (imageAssetInfo) {
- await updateAsset(oldImageAssetId, imageAssetInfo, txn);
- assetDeletionTasks.push(silentDeleteAsset(userId, oldImageAssetId));
- }
- if (htmlContentAssetInfo.result === "stored") {
- await updateAsset(
- oldContentAssetId,
- {
- id: htmlContentAssetInfo.assetId,
- bookmarkId,
- userId,
- assetType: AssetTypes.LINK_HTML_CONTENT,
- contentType: ASSET_TYPES.TEXT_HTML,
- size: htmlContentAssetInfo.size,
- fileName: null,
- },
- txn,
- );
- assetDeletionTasks.push(silentDeleteAsset(userId, oldContentAssetId));
- } else if (oldContentAssetId) {
- // Unlink the old content asset
- await txn.delete(assets).where(eq(assets.id, oldContentAssetId));
- assetDeletionTasks.push(silentDeleteAsset(userId, oldContentAssetId));
- }
- });
+ const screenshotAssetInfo = await Promise.race([
+ storeScreenshot(screenshot, userId, jobId),
+ abortPromise(abortSignal),
+ ]);
+ abortSignal.throwIfAborted();
- // Delete the old assets if any
- await Promise.all(assetDeletionTasks);
+ const pdfAssetInfo = await Promise.race([
+ storePdf(pdf, userId, jobId),
+ abortPromise(abortSignal),
+ ]);
+ abortSignal.throwIfAborted();
- return async () => {
- if (
- !precrawledArchiveAssetId &&
- (serverConfig.crawler.fullPageArchive || archiveFullPage)
- ) {
- const archiveResult = await archiveWebpage(
- htmlContent,
- browserUrl,
+ const htmlContentAssetInfo = await storeHtmlContent(
+ readableContent?.content,
userId,
jobId,
- abortSignal,
);
+ abortSignal.throwIfAborted();
+ let imageAssetInfo: DBAssetType | null = null;
+ if (meta.image) {
+ const downloaded = await downloadAndStoreImage(
+ meta.image,
+ userId,
+ jobId,
+ abortSignal,
+ );
+ if (downloaded) {
+ imageAssetInfo = {
+ id: downloaded.assetId,
+ bookmarkId,
+ userId,
+ assetType: AssetTypes.LINK_BANNER_IMAGE,
+ contentType: downloaded.contentType,
+ size: downloaded.size,
+ };
+ }
+ }
+ abortSignal.throwIfAborted();
- if (archiveResult) {
- const {
- assetId: fullPageArchiveAssetId,
- size,
- contentType,
- } = archiveResult;
+ // Phase 2: Write content and asset references.
+ // TODO(important): Restrict the size of content to store
+ const assetDeletionTasks: Promise<void>[] = [];
+ const inlineHtmlContent =
+ htmlContentAssetInfo.result === "store_inline"
+ ? (readableContent?.content ?? null)
+ : null;
+ readableContent = null;
+ await db.transaction(async (txn) => {
+ await txn
+ .update(bookmarkLinks)
+ .set({
+ crawledAt: new Date(),
+ htmlContent: inlineHtmlContent,
+ contentAssetId:
+ htmlContentAssetInfo.result === "stored"
+ ? htmlContentAssetInfo.assetId
+ : null,
+ })
+ .where(eq(bookmarkLinks.id, bookmarkId));
- await db.transaction(async (txn) => {
+ if (screenshotAssetInfo) {
await updateAsset(
- oldFullPageArchiveAssetId,
+ oldScreenshotAssetId,
{
- id: fullPageArchiveAssetId,
+ id: screenshotAssetInfo.assetId,
bookmarkId,
userId,
- assetType: AssetTypes.LINK_FULL_PAGE_ARCHIVE,
- contentType,
- size,
+ assetType: AssetTypes.LINK_SCREENSHOT,
+ contentType: screenshotAssetInfo.contentType,
+ size: screenshotAssetInfo.size,
+ fileName: screenshotAssetInfo.fileName,
+ },
+ txn,
+ );
+ assetDeletionTasks.push(
+ silentDeleteAsset(userId, oldScreenshotAssetId),
+ );
+ }
+ if (pdfAssetInfo) {
+ await updateAsset(
+ oldPdfAssetId,
+ {
+ id: pdfAssetInfo.assetId,
+ bookmarkId,
+ userId,
+ assetType: AssetTypes.LINK_PDF,
+ contentType: pdfAssetInfo.contentType,
+ size: pdfAssetInfo.size,
+ fileName: pdfAssetInfo.fileName,
+ },
+ txn,
+ );
+ assetDeletionTasks.push(silentDeleteAsset(userId, oldPdfAssetId));
+ }
+ if (imageAssetInfo) {
+ await updateAsset(oldImageAssetId, imageAssetInfo, txn);
+ assetDeletionTasks.push(silentDeleteAsset(userId, oldImageAssetId));
+ }
+ if (htmlContentAssetInfo.result === "stored") {
+ await updateAsset(
+ oldContentAssetId,
+ {
+ id: htmlContentAssetInfo.assetId,
+ bookmarkId,
+ userId,
+ assetType: AssetTypes.LINK_HTML_CONTENT,
+ contentType: ASSET_TYPES.TEXT_HTML,
+ size: htmlContentAssetInfo.size,
fileName: null,
},
txn,
);
- });
- if (oldFullPageArchiveAssetId) {
- await silentDeleteAsset(userId, oldFullPageArchiveAssetId);
+ assetDeletionTasks.push(silentDeleteAsset(userId, oldContentAssetId));
+ } else if (oldContentAssetId) {
+ // Unlink the old content asset
+ await txn.delete(assets).where(eq(assets.id, oldContentAssetId));
+ assetDeletionTasks.push(silentDeleteAsset(userId, oldContentAssetId));
}
- }
- }
- };
+ });
+
+ // Delete the old assets if any
+ await Promise.all(assetDeletionTasks);
+
+ return async () => {
+ if (
+ !precrawledArchiveAssetId &&
+ (serverConfig.crawler.fullPageArchive || archiveFullPage)
+ ) {
+ const archiveResult = await archiveWebpage(
+ htmlContent,
+ browserUrl,
+ userId,
+ jobId,
+ abortSignal,
+ );
+
+ if (archiveResult) {
+ const {
+ assetId: fullPageArchiveAssetId,
+ size,
+ contentType,
+ } = archiveResult;
+
+ await db.transaction(async (txn) => {
+ await updateAsset(
+ oldFullPageArchiveAssetId,
+ {
+ id: fullPageArchiveAssetId,
+ bookmarkId,
+ userId,
+ assetType: AssetTypes.LINK_FULL_PAGE_ARCHIVE,
+ contentType,
+ size,
+ fileName: null,
+ },
+ txn,
+ );
+ });
+ if (oldFullPageArchiveAssetId) {
+ await silentDeleteAsset(userId, oldFullPageArchiveAssetId);
+ }
+ }
+ }
+ };
+ },
+ );
}
/**
- * Checks if the domain should be rate limited and reschedules the job if needed.
- * @returns true if the job should continue, false if it was rescheduled
+ * Checks if the domain should be rate limited and throws QueueRetryAfterError if needed.
+ * @throws {QueueRetryAfterError} if the domain is rate limited
*/
-async function checkDomainRateLimit(
- url: string,
- jobId: string,
- jobData: ZCrawlLinkRequest,
- userId: string,
- jobPriority?: number,
-): Promise<boolean> {
- const crawlerDomainRateLimitConfig = serverConfig.crawler.domainRatelimiting;
- if (!crawlerDomainRateLimitConfig) {
- return true;
- }
-
- const rateLimitClient = await getRateLimitClient();
- if (!rateLimitClient) {
- return true;
- }
-
- const hostname = new URL(url).hostname;
- const rateLimitResult = rateLimitClient.checkRateLimit(
+async function checkDomainRateLimit(url: string, jobId: string): Promise<void> {
+ return await withSpan(
+ tracer,
+ "crawlerWorker.checkDomainRateLimit",
{
- name: "domain-ratelimit",
- maxRequests: crawlerDomainRateLimitConfig.maxRequests,
- windowMs: crawlerDomainRateLimitConfig.windowMs,
+ attributes: {
+ "bookmark.url": url,
+ "bookmark.domain": getBookmarkDomain(url),
+ "job.id": jobId,
+ },
},
- hostname,
- );
+ async () => {
+ const crawlerDomainRateLimitConfig =
+ serverConfig.crawler.domainRatelimiting;
+ if (!crawlerDomainRateLimitConfig) {
+ return;
+ }
- if (!rateLimitResult.allowed) {
- const resetInSeconds = rateLimitResult.resetInSeconds;
- // Add jitter to prevent thundering herd: +40% random variation
- const jitterFactor = 1.0 + Math.random() * 0.4; // Random value between 1.0 and 1.4
- const delayMs = Math.floor(resetInSeconds * 1000 * jitterFactor);
- logger.info(
- `[Crawler][${jobId}] Domain "${hostname}" is rate limited. Rescheduling in ${(delayMs / 1000).toFixed(2)} seconds (with jitter).`,
- );
- await LinkCrawlerQueue.enqueue(jobData, {
- priority: jobPriority,
- delayMs,
- groupId: userId,
- });
- return false;
- }
+ const rateLimitClient = await getRateLimitClient();
+ if (!rateLimitClient) {
+ return;
+ }
+
+ const hostname = new URL(url).hostname;
+ const rateLimitResult = rateLimitClient.checkRateLimit(
+ {
+ name: "domain-ratelimit",
+ maxRequests: crawlerDomainRateLimitConfig.maxRequests,
+ windowMs: crawlerDomainRateLimitConfig.windowMs,
+ },
+ hostname,
+ );
- return true;
+ if (!rateLimitResult.allowed) {
+ const resetInSeconds = rateLimitResult.resetInSeconds;
+      // Add jitter to prevent thundering herd: up to +40% random variation
+ const jitterFactor = 1.0 + Math.random() * 0.4; // Random value between 1.0 and 1.4
+ const delayMs = Math.floor(resetInSeconds * 1000 * jitterFactor);
+ logger.info(
+ `[Crawler][${jobId}] Domain "${hostname}" is rate limited. Will retry in ${(delayMs / 1000).toFixed(2)} seconds (with jitter).`,
+ );
+ throw new QueueRetryAfterError(
+ `Domain "${hostname}" is rate limited`,
+ delayMs,
+ );
+ }
+ },
+ );
}
async function runCrawler(
@@ -1359,28 +1730,21 @@ async function runCrawler(
return { status: "completed" };
}
- const { bookmarkId, archiveFullPage } = request.data;
+ const { bookmarkId, archiveFullPage, storePdf } = request.data;
const {
url,
userId,
+ createdAt,
+ crawledAt,
screenshotAssetId: oldScreenshotAssetId,
+ pdfAssetId: oldPdfAssetId,
imageAssetId: oldImageAssetId,
fullPageArchiveAssetId: oldFullPageArchiveAssetId,
contentAssetId: oldContentAssetId,
precrawledArchiveAssetId,
} = await getBookmarkDetails(bookmarkId);
- const shouldContinue = await checkDomainRateLimit(
- url,
- jobId,
- job.data,
- userId,
- job.priority,
- );
-
- if (!shouldContinue) {
- return { status: "rescheduled" };
- }
+ await checkDomainRateLimit(url, jobId);
logger.info(
`[Crawler][${jobId}] Will crawl "${url}" for link with id "${bookmarkId}"`,
@@ -1421,11 +1785,13 @@ async function runCrawler(
jobId,
bookmarkId,
oldScreenshotAssetId,
+ oldPdfAssetId,
oldImageAssetId,
oldFullPageArchiveAssetId,
oldContentAssetId,
precrawledArchiveAssetId,
archiveFullPage,
+ storePdf ?? false,
job.abortSignal,
);
@@ -1473,5 +1839,13 @@ async function runCrawler(
// Do the archival as a separate last step as it has the potential for failure
await archivalLogic();
}
+
+ // Record the latency from bookmark creation to crawl completion.
+ // Only for first-time, high-priority crawls (excludes recrawls and imports).
+ if (crawledAt === null && job.priority === 0) {
+ const latencySeconds = (Date.now() - createdAt.getTime()) / 1000;
+ bookmarkCrawlLatencyHistogram.observe(latencySeconds);
+ }
+
return { status: "completed" };
}
diff --git a/apps/workers/workers/feedWorker.ts b/apps/workers/workers/feedWorker.ts
index 2a1334a9..eed7ccb1 100644
--- a/apps/workers/workers/feedWorker.ts
+++ b/apps/workers/workers/feedWorker.ts
@@ -4,6 +4,7 @@ import { fetchWithProxy } from "network";
import cron from "node-cron";
import Parser from "rss-parser";
import { buildImpersonatingTRPCClient } from "trpc";
+import { withWorkerTracing } from "workerTracing";
import { z } from "zod";
import type { ZFeedRequestSchema } from "@karakeep/shared-server";
@@ -88,7 +89,7 @@ export class FeedWorker {
const worker = (await getQueueClient())!.createRunner<ZFeedRequestSchema>(
FeedQueue,
{
- run: run,
+ run: withWorkerTracing("feedWorker.run", run),
onComplete: async (job) => {
workerStatsCounter.labels("feed", "completed").inc();
const jobId = job.id;
@@ -155,9 +156,9 @@ async function run(req: DequeuedJob<ZFeedRequestSchema>) {
const response = await fetchWithProxy(feed.url, {
signal: AbortSignal.timeout(5000),
headers: {
- UserAgent:
+ "User-Agent":
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
- Accept: "application/rss+xml",
+ Accept: "application/rss+xml, application/xml;q=0.9, text/xml;q=0.8",
},
});
if (response.status !== 200) {
diff --git a/apps/workers/workers/importWorker.ts b/apps/workers/workers/importWorker.ts
new file mode 100644
index 00000000..e5b5c27e
--- /dev/null
+++ b/apps/workers/workers/importWorker.ts
@@ -0,0 +1,698 @@
+import { TRPCError } from "@trpc/server";
+import {
+ and,
+ count,
+ eq,
+ gt,
+ inArray,
+ isNotNull,
+ isNull,
+ lt,
+ or,
+} from "drizzle-orm";
+import { Counter, Gauge, Histogram } from "prom-client";
+import { buildImpersonatingTRPCClient } from "trpc";
+
+import { db } from "@karakeep/db";
+import {
+ bookmarkLinks,
+ bookmarks,
+ importSessions,
+ importStagingBookmarks,
+} from "@karakeep/db/schema";
+import { LowPriorityCrawlerQueue, OpenAIQueue } from "@karakeep/shared-server";
+import logger, { throttledLogger } from "@karakeep/shared/logger";
+import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
+
+import { registry } from "../metrics";
+
+// Prometheus metrics
+const importStagingProcessedCounter = new Counter({
+ name: "karakeep_import_staging_processed_total",
+ help: "Total number of staged items processed",
+ labelNames: ["result"],
+ registers: [registry],
+});
+
+const importStagingStaleResetCounter = new Counter({
+ name: "karakeep_import_staging_stale_reset_total",
+ help: "Total number of stale processing items reset to pending",
+ registers: [registry],
+});
+
+const importStagingInFlightGauge = new Gauge({
+ name: "karakeep_import_staging_in_flight",
+ help: "Current number of in-flight items (processing + recently completed)",
+ registers: [registry],
+});
+
+const importSessionsGauge = new Gauge({
+ name: "karakeep_import_sessions_active",
+ help: "Number of active import sessions by status",
+ labelNames: ["status"],
+ registers: [registry],
+});
+
+const importStagingPendingGauge = new Gauge({
+ name: "karakeep_import_staging_pending_total",
+ help: "Total number of pending items in staging table",
+ registers: [registry],
+});
+
+const importBatchDurationHistogram = new Histogram({
+ name: "karakeep_import_batch_duration_seconds",
+ help: "Time taken to process a batch of staged items",
+ buckets: [0.1, 0.5, 1, 2, 5, 10, 30],
+ registers: [registry],
+});
+
+const backpressureLogger = throttledLogger(60_000);
+
+function sleep(ms: number): Promise<void> {
+ return new Promise((resolve) => setTimeout(resolve, ms));
+}
+
+/**
+ * Extract a safe, user-facing error message from an error.
+ * Avoids leaking internal details like database errors, stack traces, or file paths.
+ */
+function getSafeErrorMessage(error: unknown): string {
+ // TRPCError client errors are designed to be user-facing
+ if (error instanceof TRPCError && error.code !== "INTERNAL_SERVER_ERROR") {
+ return error.message;
+ }
+
+ // Known safe validation errors thrown within the import worker
+ if (error instanceof Error) {
+ const safeMessages = [
+ "URL is required for link bookmarks",
+ "Content is required for text bookmarks",
+ ];
+ if (safeMessages.includes(error.message)) {
+ return error.message;
+ }
+ }
+
+ return "An unexpected error occurred while processing the bookmark";
+}
+
+export class ImportWorker {
+ private running = false;
+ private pollIntervalMs = 5000;
+
+ // Backpressure settings
+ private maxInFlight = 50;
+ private batchSize = 10;
+ private staleThresholdMs = 60 * 60 * 1000; // 1 hour
+
+ async start() {
+ this.running = true;
+ let iterationCount = 0;
+
+ logger.info("[import] Starting import polling worker");
+
+ while (this.running) {
+ try {
+        // Periodically reset stale processing items (every 60 iterations; ~5 min when idle at the 5s poll interval)
+ if (iterationCount % 60 === 0) {
+ await this.resetStaleProcessingItems();
+ }
+ iterationCount++;
+
+ // Check if any processing items have completed downstream work
+ await this.checkAndCompleteProcessingItems();
+
+ const processed = await this.processBatch();
+ if (processed === 0) {
+ await this.checkAndCompleteIdleSessions();
+ await this.updateGauges();
+ // Nothing to do, wait before polling again
+ await sleep(this.pollIntervalMs);
+ } else {
+ await this.updateGauges();
+ }
+ } catch (error) {
+ logger.error(`[import] Error in polling loop: ${error}`);
+ await sleep(this.pollIntervalMs);
+ }
+ }
+ }
+
+ stop() {
+ logger.info("[import] Stopping import polling worker");
+ this.running = false;
+ }
+
+ private async processBatch(): Promise<number> {
+ const countPendingItems = await this.countPendingItems();
+ importStagingPendingGauge.set(countPendingItems);
+ if (countPendingItems === 0) {
+ // Nothing to do, wait before polling again
+ return 0;
+ }
+
+ // 1. Check backpressure - inflight items + queue sizes
+ const availableCapacity = await this.getAvailableCapacity();
+
+ if (availableCapacity <= 0) {
+ // At capacity, wait before trying again
+ backpressureLogger(
+ "info",
+ `[import] Pending import items: ${countPendingItems}, but current capacity is ${availableCapacity}. Will wait until capacity is available.`,
+ );
+ return 0;
+ }
+
+ logger.debug(
+ `[import] ${countPendingItems} pending items, available capacity: ${availableCapacity}`,
+ );
+
+ // 2. Get candidate IDs with fair scheduling across users
+ const batchLimit = Math.min(this.batchSize, availableCapacity);
+ const candidateIds = await this.getNextBatchFairly(batchLimit);
+
+ if (candidateIds.length === 0) return 0;
+
+ // 3. Atomically claim rows - only rows still pending will be claimed
+ // This prevents race conditions where multiple workers select the same rows
+ const batch = await db
+ .update(importStagingBookmarks)
+ .set({ status: "processing", processingStartedAt: new Date() })
+ .where(
+ and(
+ eq(importStagingBookmarks.status, "pending"),
+ inArray(importStagingBookmarks.id, candidateIds),
+ ),
+ )
+ .returning();
+
+ // If no rows were claimed (another worker got them first), skip processing
+ if (batch.length === 0) return 0;
+
+ const batchTimer = importBatchDurationHistogram.startTimer();
+
+ // 4. Mark session(s) as running (using claimed rows, not candidates)
+ const sessionIds = [...new Set(batch.map((b) => b.importSessionId))];
+ logger.info(
+ `[import] Claimed batch of ${batch.length} items from ${sessionIds.length} session(s): [${sessionIds.join(", ")}]`,
+ );
+ await db
+ .update(importSessions)
+ .set({ status: "running" })
+ .where(
+ and(
+ inArray(importSessions.id, sessionIds),
+ eq(importSessions.status, "pending"),
+ ),
+ );
+
+ // 5. Process in parallel
+ const results = await Promise.allSettled(
+ batch.map((staged) => this.processOneBookmark(staged)),
+ );
+
+ const outcomes: Record<string, number> = {};
+ for (const r of results) {
+ const key = r.status === "fulfilled" ? r.value : "error";
+ outcomes[key] = (outcomes[key] ?? 0) + 1;
+ }
+ logger.debug(
+ `[import] Batch results: ${Object.entries(outcomes)
+ .map(([k, v]) => `${k}=${v}`)
+ .join(", ")}`,
+ );
+
+ // 6. Check if any sessions are now complete
+ await this.checkAndCompleteEmptySessions(sessionIds);
+
+ batchTimer(); // Record batch duration
+
+ return batch.length;
+ }
+
+ /**
+ * Refresh the Prometheus gauge tracking import sessions, labelled by
+ * status. Counts sessions in the four non-terminal states and zeroes
+ * absent statuses so stale values don't linger between ticks.
+ */
+ private async updateGauges() {
+ // Update active sessions gauge by status
+ const sessions = await db
+ .select({
+ status: importSessions.status,
+ count: count(),
+ })
+ .from(importSessions)
+ .where(
+ inArray(importSessions.status, [
+ "staging",
+ "pending",
+ "running",
+ "paused",
+ ]),
+ )
+ .groupBy(importSessions.status);
+
+ // Reset all status gauges to 0 first
+ // (a status with zero sessions produces no row in the grouped query,
+ // so without this the gauge would keep its previous value)
+ for (const status of ["staging", "pending", "running", "paused"]) {
+ importSessionsGauge.set({ status }, 0);
+ }
+
+ // Set actual values
+ for (const s of sessions) {
+ importSessionsGauge.set({ status: s.status }, s.count);
+ }
+ }
+
+ /**
+ * Periodic sweep: look at every session currently marked pending/running
+ * and complete those with no outstanding staged items, via
+ * checkAndCompleteEmptySessions().
+ */
+ private async checkAndCompleteIdleSessions() {
+ const sessions = await db
+ .select({ id: importSessions.id })
+ .from(importSessions)
+ .where(inArray(importSessions.status, ["pending", "running"]));
+
+ const sessionIds = sessions.map((session) => session.id);
+ if (sessionIds.length === 0) {
+ return;
+ }
+
+ await this.checkAndCompleteEmptySessions(sessionIds);
+ }
+
+ /**
+ * Count staged bookmarks still awaiting processing, restricted to
+ * sessions that are actively importing (pending/running). Items belonging
+ * to paused or still-staging sessions are deliberately excluded.
+ */
+ private async countPendingItems(): Promise<number> {
+ const res = await db
+ .select({ count: count() })
+ .from(importStagingBookmarks)
+ .innerJoin(
+ importSessions,
+ eq(importStagingBookmarks.importSessionId, importSessions.id),
+ )
+ .where(
+ and(
+ eq(importStagingBookmarks.status, "pending"),
+ inArray(importSessions.status, ["pending", "running"]),
+ ),
+ )
+ // Defensive fallback: an empty result set counts as 0
+ return res[0]?.count ?? 0;
+ }
+
+ /**
+ * Select up to `limit` candidate staged-bookmark IDs across all active
+ * sessions, ordered so that the least-recently-served session goes first.
+ *
+ * Note: this only nominates candidates — the caller re-checks status
+ * atomically when claiming, so two workers racing here is harmless.
+ */
+ private async getNextBatchFairly(limit: number): Promise<string[]> {
+ // Query pending item IDs from active sessions, ordered by:
+ // 1. User's last-served timestamp (fairness)
+ // 2. Staging item creation time (FIFO within user)
+ // Returns only IDs - actual rows will be fetched atomically during claim
+ const results = await db
+ .select({
+ id: importStagingBookmarks.id,
+ })
+ .from(importStagingBookmarks)
+ .innerJoin(
+ importSessions,
+ eq(importStagingBookmarks.importSessionId, importSessions.id),
+ )
+ .where(
+ and(
+ eq(importStagingBookmarks.status, "pending"),
+ inArray(importSessions.status, ["pending", "running"]),
+ ),
+ )
+ .orderBy(importSessions.lastProcessedAt, importStagingBookmarks.createdAt)
+ .limit(limit);
+
+ return results.map((r) => r.id);
+ }
+
+ /**
+ * Add an imported bookmark to the session's root list (if any) plus any
+ * per-item lists from the staging row. A Set dedupes overlapping IDs.
+ * Failures are logged and swallowed — list membership is best-effort and
+ * must not fail the import of the bookmark itself.
+ */
+ private async attachBookmarkToLists(
+ caller: Awaited<ReturnType<typeof buildImpersonatingTRPCClient>>,
+ session: typeof importSessions.$inferSelect,
+ staged: typeof importStagingBookmarks.$inferSelect,
+ bookmarkId: string,
+ ): Promise<void> {
+ const listIds = new Set<string>();
+
+ if (session.rootListId) {
+ listIds.add(session.rootListId);
+ }
+
+ if (staged.listIds && staged.listIds.length > 0) {
+ for (const listId of staged.listIds) {
+ listIds.add(listId);
+ }
+ }
+
+ for (const listId of listIds) {
+ try {
+ await caller.lists.addToList({ listId, bookmarkId });
+ } catch (error) {
+ logger.warn(
+ `[import] Failed to add bookmark ${bookmarkId} to list ${listId}: ${error}`,
+ );
+ }
+ }
+ }
+
+ /**
+ * Process a single claimed staging row: create the bookmark through the
+ * impersonating tRPC caller, apply tags and list memberships, and record
+ * the outcome on the staging row.
+ *
+ * Returns a short outcome label used for batch-level logging:
+ * "reset" | "unsupported" | "duplicate" | "accepted" | "failed".
+ */
+ private async processOneBookmark(
+ staged: typeof importStagingBookmarks.$inferSelect,
+ ): Promise<string> {
+ const session = await db.query.importSessions.findFirst({
+ where: eq(importSessions.id, staged.importSessionId),
+ });
+
+ if (!session || session.status === "paused") {
+ // Session paused mid-batch, reset item to pending
+ await db
+ .update(importStagingBookmarks)
+ .set({ status: "pending" })
+ .where(eq(importStagingBookmarks.id, staged.id));
+ return "reset";
+ }
+
+ try {
+ // Use existing tRPC mutation via internal caller
+ // Note: Duplicate detection is handled by createBookmark itself
+ const caller = await buildImpersonatingTRPCClient(session.userId);
+
+ // Build the request based on bookmark type
+ type CreateBookmarkInput = Parameters<
+ typeof caller.bookmarks.createBookmark
+ >[0];
+
+ // Fields shared by all bookmark types; imports crawl at low priority so
+ // interactive bookmarks are not starved
+ const baseRequest = {
+ title: staged.title ?? undefined,
+ note: staged.note ?? undefined,
+ createdAt: staged.sourceAddedAt ?? undefined,
+ crawlPriority: "low" as const,
+ };
+
+ let bookmarkRequest: CreateBookmarkInput;
+
+ if (staged.type === "link") {
+ if (!staged.url) {
+ throw new Error("URL is required for link bookmarks");
+ }
+ bookmarkRequest = {
+ ...baseRequest,
+ type: BookmarkTypes.LINK,
+ url: staged.url,
+ };
+ } else if (staged.type === "text") {
+ if (!staged.content) {
+ throw new Error("Content is required for text bookmarks");
+ }
+ bookmarkRequest = {
+ ...baseRequest,
+ type: BookmarkTypes.TEXT,
+ text: staged.content,
+ };
+ } else {
+ // asset type - skip for now as it needs special handling
+ await db
+ .update(importStagingBookmarks)
+ .set({
+ status: "failed",
+ result: "rejected",
+ resultReason: "Asset bookmarks not yet supported",
+ completedAt: new Date(),
+ })
+ .where(eq(importStagingBookmarks.id, staged.id));
+ await this.updateSessionLastProcessedAt(staged.importSessionId);
+ return "unsupported";
+ }
+
+ const result = await caller.bookmarks.createBookmark(bookmarkRequest);
+
+ // Apply tags via existing mutation (for both new and duplicate bookmarks)
+ if (staged.tags && staged.tags.length > 0) {
+ await caller.bookmarks.updateTags({
+ bookmarkId: result.id,
+ attach: staged.tags.map((t) => ({ tagName: t })),
+ detach: [],
+ });
+ }
+
+ // Handle duplicate case (createBookmark returns alreadyExists: true)
+ if (result.alreadyExists) {
+ await db
+ .update(importStagingBookmarks)
+ .set({
+ status: "completed",
+ result: "skipped_duplicate",
+ resultReason: "URL already exists",
+ resultBookmarkId: result.id,
+ completedAt: new Date(),
+ })
+ .where(eq(importStagingBookmarks.id, staged.id));
+
+ importStagingProcessedCounter.inc({ result: "skipped_duplicate" });
+ await this.attachBookmarkToLists(caller, session, staged, result.id);
+ await this.updateSessionLastProcessedAt(staged.importSessionId);
+ return "duplicate";
+ }
+
+ // Mark as accepted but keep in "processing" until crawl/tag is done
+ // The item will be moved to "completed" by checkAndCompleteProcessingItems()
+ await db
+ .update(importStagingBookmarks)
+ .set({
+ result: "accepted",
+ resultBookmarkId: result.id,
+ })
+ .where(eq(importStagingBookmarks.id, staged.id));
+
+ await this.attachBookmarkToLists(caller, session, staged, result.id);
+
+ await this.updateSessionLastProcessedAt(staged.importSessionId);
+ return "accepted";
+ } catch (error) {
+ // Any failure (validation, tRPC, DB) marks the row failed/rejected with
+ // a sanitized reason; the batch itself continues with other items.
+ logger.error(
+ `[import] Error processing staged item ${staged.id}: ${error}`,
+ );
+ await db
+ .update(importStagingBookmarks)
+ .set({
+ status: "failed",
+ result: "rejected",
+ resultReason: getSafeErrorMessage(error),
+ completedAt: new Date(),
+ })
+ .where(eq(importStagingBookmarks.id, staged.id));
+
+ importStagingProcessedCounter.inc({ result: "rejected" });
+ await this.updateSessionLastProcessedAt(staged.importSessionId);
+ return "failed";
+ }
+ }
+
+ /**
+ * Stamp the session's lastProcessedAt with the current time. This feeds
+ * the fairness ordering in getNextBatchFairly() (least-recently-served
+ * sessions are picked first).
+ */
+ private async updateSessionLastProcessedAt(sessionId: string) {
+ await db
+ .update(importSessions)
+ .set({ lastProcessedAt: new Date() })
+ .where(eq(importSessions.id, sessionId));
+ }
+
+ /**
+ * For each given session, mark it "completed" once no staged items remain
+ * in "pending" or "processing". Runs one count query per session id.
+ */
+ private async checkAndCompleteEmptySessions(sessionIds: string[]) {
+ for (const sessionId of sessionIds) {
+ const remaining = await db
+ .select({ count: count() })
+ .from(importStagingBookmarks)
+ .where(
+ and(
+ eq(importStagingBookmarks.importSessionId, sessionId),
+ inArray(importStagingBookmarks.status, ["pending", "processing"]),
+ ),
+ );
+
+ if (remaining[0]?.count === 0) {
+ logger.info(
+ `[import] Session ${sessionId} completed, all items processed`,
+ );
+ await db
+ .update(importSessions)
+ .set({ status: "completed" })
+ .where(eq(importSessions.id, sessionId));
+ }
+ }
+ }
+
+ /**
+ * Check processing items that have a bookmark created and mark them as completed
+ * once downstream processing (crawling/tagging) is done.
+ *
+ * @returns The number of items transitioned (completed + failed).
+ */
+ private async checkAndCompleteProcessingItems(): Promise<number> {
+ // Find processing items where:
+ // - A bookmark was created (resultBookmarkId is set)
+ // - Downstream processing is complete (crawl/tag not pending)
+ // Left joins are used so non-link bookmarks (no bookmarkLinks row) yield
+ // NULL crawlStatus, which the filters below treat as "done".
+ const completedItems = await db
+ .select({
+ id: importStagingBookmarks.id,
+ importSessionId: importStagingBookmarks.importSessionId,
+ crawlStatus: bookmarkLinks.crawlStatus,
+ taggingStatus: bookmarks.taggingStatus,
+ })
+ .from(importStagingBookmarks)
+ .leftJoin(
+ bookmarks,
+ eq(bookmarks.id, importStagingBookmarks.resultBookmarkId),
+ )
+ .leftJoin(
+ bookmarkLinks,
+ eq(bookmarkLinks.id, importStagingBookmarks.resultBookmarkId),
+ )
+ .where(
+ and(
+ eq(importStagingBookmarks.status, "processing"),
+ isNotNull(importStagingBookmarks.resultBookmarkId),
+ // Crawl is done (not pending) - either success, failure, or null (not a link)
+ or(
+ isNull(bookmarkLinks.crawlStatus),
+ eq(bookmarkLinks.crawlStatus, "success"),
+ eq(bookmarkLinks.crawlStatus, "failure"),
+ ),
+ // Tagging is done (not pending) - either success, failure, or null
+ or(
+ isNull(bookmarks.taggingStatus),
+ eq(bookmarks.taggingStatus, "success"),
+ eq(bookmarks.taggingStatus, "failure"),
+ ),
+ ),
+ );
+
+ if (completedItems.length === 0) {
+ return 0;
+ }
+
+ // Split into success vs failure: any failed downstream stage fails the item
+ const succeededItems = completedItems.filter(
+ (i) => i.crawlStatus !== "failure" && i.taggingStatus !== "failure",
+ );
+ const failedItems = completedItems.filter(
+ (i) => i.crawlStatus === "failure" || i.taggingStatus === "failure",
+ );
+
+ logger.debug(
+ `[import] ${completedItems.length} item(s) finished downstream processing (${succeededItems.length} succeeded, ${failedItems.length} failed)`,
+ );
+
+ // Mark succeeded items as completed
+ if (succeededItems.length > 0) {
+ await db
+ .update(importStagingBookmarks)
+ .set({
+ status: "completed",
+ completedAt: new Date(),
+ })
+ .where(
+ inArray(
+ importStagingBookmarks.id,
+ succeededItems.map((i) => i.id),
+ ),
+ );
+
+ importStagingProcessedCounter.inc(
+ { result: "accepted" },
+ succeededItems.length,
+ );
+ }
+
+ // Mark failed items as failed
+ // (one update per item because the reason differs per failure mode)
+ if (failedItems.length > 0) {
+ for (const item of failedItems) {
+ const reason =
+ item.crawlStatus === "failure" ? "Crawl failed" : "Tagging failed";
+ await db
+ .update(importStagingBookmarks)
+ .set({
+ status: "failed",
+ result: "rejected",
+ resultReason: reason,
+ completedAt: new Date(),
+ })
+ .where(eq(importStagingBookmarks.id, item.id));
+ }
+
+ importStagingProcessedCounter.inc(
+ { result: "rejected" },
+ failedItems.length,
+ );
+ }
+
+ // Check if any sessions are now complete
+ const sessionIds = [
+ ...new Set(completedItems.map((i) => i.importSessionId)),
+ ];
+ await this.checkAndCompleteEmptySessions(sessionIds);
+
+ return completedItems.length;
+ }
+
+ /**
+ * Backpressure: calculate the available intake capacity based on the
+ * number of items in flight and the depth of the downstream
+ * crawler/inference queues.
+ *
+ * The in-flight estimate is the max of the three counts, so a backlog in
+ * any single stage throttles intake. The result can be negative when the
+ * pipeline is over capacity.
+ */
+ private async getAvailableCapacity(): Promise<number> {
+ const [processingCount, crawlerQueue, openaiQueue] = await Promise.all([
+ // Only count fresh "processing" rows; stale ones are handled (reset)
+ // separately by resetStaleProcessingItems()
+ db
+ .select({ count: count() })
+ .from(importStagingBookmarks)
+ .where(
+ and(
+ eq(importStagingBookmarks.status, "processing"),
+ gt(
+ importStagingBookmarks.processingStartedAt,
+ new Date(Date.now() - this.staleThresholdMs),
+ ),
+ ),
+ ),
+ LowPriorityCrawlerQueue.stats(),
+ OpenAIQueue.stats(),
+ ]);
+
+ const crawlerTotal =
+ crawlerQueue.pending + crawlerQueue.running + crawlerQueue.pending_retry;
+ const openaiTotal =
+ openaiQueue.pending + openaiQueue.running + openaiQueue.pending_retry;
+ const processingTotal = processingCount[0]?.count ?? 0;
+
+ const inFlight = Math.max(crawlerTotal, openaiTotal, processingTotal);
+ importStagingInFlightGauge.set(inFlight);
+
+ return this.maxInFlight - inFlight;
+ }
+
+ /**
+ * Reset stale "processing" items back to "pending" so they can be retried.
+ * Called periodically to handle crashed workers or stuck items.
+ *
+ * Only resets items that don't have a resultBookmarkId - those with a bookmark
+ * are waiting for downstream processing (crawl/tag), not stale.
+ *
+ * @returns The number of stale items reset back to "pending".
+ */
+ private async resetStaleProcessingItems(): Promise<number> {
+ const staleThreshold = new Date(Date.now() - this.staleThresholdMs);
+
+ const staleItems = await db
+ .select({ id: importStagingBookmarks.id })
+ .from(importStagingBookmarks)
+ .where(
+ and(
+ eq(importStagingBookmarks.status, "processing"),
+ lt(importStagingBookmarks.processingStartedAt, staleThreshold),
+ // Only reset items that haven't created a bookmark yet
+ // Items with a bookmark are waiting for downstream, not stale
+ isNull(importStagingBookmarks.resultBookmarkId),
+ ),
+ );
+
+ if (staleItems.length > 0) {
+ logger.warn(
+ `[import] Resetting ${staleItems.length} stale processing items`,
+ );
+
+ // Flip the stale rows back to pending and clear their claim timestamp
+ await db
+ .update(importStagingBookmarks)
+ .set({ status: "pending", processingStartedAt: null })
+ .where(
+ inArray(
+ importStagingBookmarks.id,
+ staleItems.map((i) => i.id),
+ ),
+ );
+
+ importStagingStaleResetCounter.inc(staleItems.length);
+ return staleItems.length;
+ }
+
+ return 0;
+ }
+}
diff --git a/apps/workers/workers/inference/inferenceWorker.ts b/apps/workers/workers/inference/inferenceWorker.ts
index eefc1dd8..57ad1a22 100644
--- a/apps/workers/workers/inference/inferenceWorker.ts
+++ b/apps/workers/workers/inference/inferenceWorker.ts
@@ -1,5 +1,6 @@
import { eq } from "drizzle-orm";
import { workerStatsCounter } from "metrics";
+import { withWorkerTracing } from "workerTracing";
import type { ZOpenAIRequest } from "@karakeep/shared-server";
import { db } from "@karakeep/db";
@@ -42,7 +43,7 @@ export class OpenAiWorker {
const worker = (await getQueueClient())!.createRunner<ZOpenAIRequest>(
OpenAIQueue,
{
- run: runOpenAI,
+ run: withWorkerTracing("inferenceWorker.run", runOpenAI),
onComplete: async (job) => {
workerStatsCounter.labels("inference", "completed").inc();
const jobId = job.id;
diff --git a/apps/workers/workers/inference/summarize.ts b/apps/workers/workers/inference/summarize.ts
index 23636961..922eb5b7 100644
--- a/apps/workers/workers/inference/summarize.ts
+++ b/apps/workers/workers/inference/summarize.ts
@@ -1,12 +1,17 @@
import { and, eq } from "drizzle-orm";
+import { getBookmarkDomain } from "network";
import { db } from "@karakeep/db";
-import { bookmarks, customPrompts } from "@karakeep/db/schema";
-import { triggerSearchReindex, ZOpenAIRequest } from "@karakeep/shared-server";
+import { bookmarks, customPrompts, users } from "@karakeep/db/schema";
+import {
+ setSpanAttributes,
+ triggerSearchReindex,
+ ZOpenAIRequest,
+} from "@karakeep/shared-server";
import serverConfig from "@karakeep/shared/config";
import { InferenceClient } from "@karakeep/shared/inference";
import logger from "@karakeep/shared/logger";
-import { buildSummaryPrompt } from "@karakeep/shared/prompts";
+import { buildSummaryPrompt } from "@karakeep/shared/prompts.server";
import { DequeuedJob } from "@karakeep/shared/queueing";
import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
import { Bookmark } from "@karakeep/trpc/models/bookmarks";
@@ -22,6 +27,7 @@ async function fetchBookmarkDetailsForSummary(bookmarkId: string) {
description: true,
htmlContent: true,
contentAssetId: true,
+ crawlStatusCode: true,
publisher: true,
author: true,
url: true,
@@ -56,6 +62,33 @@ export async function runSummarization(
const bookmarkData = await fetchBookmarkDetailsForSummary(bookmarkId);
+ // Check user-level preference
+ const userSettings = await db.query.users.findFirst({
+ where: eq(users.id, bookmarkData.userId),
+ columns: {
+ autoSummarizationEnabled: true,
+ inferredTagLang: true,
+ },
+ });
+
+ setSpanAttributes({
+ "user.id": bookmarkData.userId,
+ "bookmark.id": bookmarkData.id,
+ "bookmark.url": bookmarkData.link?.url,
+ "bookmark.domain": getBookmarkDomain(bookmarkData.link?.url),
+ "bookmark.content.type": bookmarkData.type,
+ "crawler.statusCode": bookmarkData.link?.crawlStatusCode ?? undefined,
+ "inference.type": "summarization",
+ "inference.model": serverConfig.inference.textModel,
+ });
+
+ if (userSettings?.autoSummarizationEnabled === false) {
+ logger.debug(
+ `[inference][${jobId}] Skipping summarization job for bookmark with id "${bookmarkId}" because user has disabled auto-summarization.`,
+ );
+ return;
+ }
+
let textToSummarize = "";
if (bookmarkData.type === BookmarkTypes.LINK && bookmarkData.link) {
const link = bookmarkData.link;
@@ -105,13 +138,21 @@ URL: ${link.url ?? ""}
},
});
+ setSpanAttributes({
+ "inference.prompt.customCount": prompts.length,
+ });
+
const summaryPrompt = await buildSummaryPrompt(
- serverConfig.inference.inferredTagLang,
+ userSettings?.inferredTagLang ?? serverConfig.inference.inferredTagLang,
prompts.map((p) => p.text),
textToSummarize,
serverConfig.inference.contextLength,
);
+ setSpanAttributes({
+ "inference.prompt.size": Buffer.byteLength(summaryPrompt, "utf8"),
+ });
+
const summaryResult = await inferenceClient.inferFromText(summaryPrompt, {
schema: null, // Summaries are typically free-form text
abortSignal: job.abortSignal,
@@ -123,6 +164,11 @@ URL: ${link.url ?? ""}
);
}
+ setSpanAttributes({
+ "inference.summary.size": Buffer.byteLength(summaryResult.response, "utf8"),
+ "inference.totalTokens": summaryResult.totalTokens,
+ });
+
logger.info(
`[inference][${jobId}] Generated summary for bookmark "${bookmarkId}" using ${summaryResult.totalTokens} tokens.`,
);
diff --git a/apps/workers/workers/inference/tagging.ts b/apps/workers/workers/inference/tagging.ts
index 5a79fd22..668c1d5e 100644
--- a/apps/workers/workers/inference/tagging.ts
+++ b/apps/workers/workers/inference/tagging.ts
@@ -1,4 +1,5 @@
-import { and, Column, eq, inArray, sql } from "drizzle-orm";
+import { and, eq, inArray } from "drizzle-orm";
+import { getBookmarkDomain } from "network";
import { buildImpersonatingTRPCClient } from "trpc";
import { z } from "zod";
@@ -7,14 +8,17 @@ import type {
InferenceClient,
InferenceResponse,
} from "@karakeep/shared/inference";
+import type { ZTagStyle } from "@karakeep/shared/types/users";
import { db } from "@karakeep/db";
import {
bookmarks,
bookmarkTags,
customPrompts,
tagsOnBookmarks,
+ users,
} from "@karakeep/db/schema";
import {
+ setSpanAttributes,
triggerRuleEngineOnEvent,
triggerSearchReindex,
triggerWebhook,
@@ -22,7 +26,8 @@ import {
import { ASSET_TYPES, readAsset } from "@karakeep/shared/assetdb";
import serverConfig from "@karakeep/shared/config";
import logger from "@karakeep/shared/logger";
-import { buildImagePrompt, buildTextPrompt } from "@karakeep/shared/prompts";
+import { buildImagePrompt } from "@karakeep/shared/prompts";
+import { buildTextPrompt } from "@karakeep/shared/prompts.server";
import { DequeuedJob, EnqueueOptions } from "@karakeep/shared/queueing";
import { Bookmark } from "@karakeep/trpc/models/bookmarks";
@@ -66,18 +71,21 @@ function parseJsonFromLLMResponse(response: string): unknown {
}
}
-function tagNormalizer(col: Column) {
+function tagNormalizer() {
+ // This function needs to be in sync with the generated normalizedName column in bookmarkTags
function normalizeTag(tag: string) {
return tag.toLowerCase().replace(/[ \-_]/g, "");
}
return {
normalizeTag,
- sql: sql`lower(replace(replace(replace(${col}, ' ', ''), '-', ''), '_', ''))`,
};
}
async function buildPrompt(
bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
+ tagStyle: ZTagStyle,
+ inferredTagLang: string,
+ curatedTags?: string[],
): Promise<string | null> {
const prompts = await fetchCustomPrompts(bookmark.userId, "text");
if (bookmark.link) {
@@ -95,22 +103,26 @@ async function buildPrompt(
return null;
}
return await buildTextPrompt(
- serverConfig.inference.inferredTagLang,
+ inferredTagLang,
prompts,
`URL: ${bookmark.link.url}
Title: ${bookmark.link.title ?? ""}
Description: ${bookmark.link.description ?? ""}
Content: ${content ?? ""}`,
serverConfig.inference.contextLength,
+ tagStyle,
+ curatedTags,
);
}
if (bookmark.text) {
return await buildTextPrompt(
- serverConfig.inference.inferredTagLang,
+ inferredTagLang,
prompts,
bookmark.text.text ?? "",
serverConfig.inference.contextLength,
+ tagStyle,
+ curatedTags,
);
}
@@ -122,6 +134,9 @@ async function inferTagsFromImage(
bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
inferenceClient: InferenceClient,
abortSignal: AbortSignal,
+ tagStyle: ZTagStyle,
+ inferredTagLang: string,
+ curatedTags?: string[],
): Promise<InferenceResponse | null> {
const { asset, metadata } = await readAsset({
userId: bookmark.userId,
@@ -141,10 +156,15 @@ async function inferTagsFromImage(
}
const base64 = asset.toString("base64");
+ setSpanAttributes({
+ "inference.model": serverConfig.inference.imageModel,
+ });
return inferenceClient.inferFromImage(
buildImagePrompt(
- serverConfig.inference.inferredTagLang,
+ inferredTagLang,
await fetchCustomPrompts(bookmark.userId, "images"),
+ tagStyle,
+ curatedTags,
),
metadata.contentType,
base64,
@@ -166,6 +186,10 @@ async function fetchCustomPrompts(
},
});
+ setSpanAttributes({
+ "inference.prompt.customCount": prompts.length,
+ });
+
let promptTexts = prompts.map((p) => p.text);
if (containsTagsPlaceholder(prompts)) {
promptTexts = await replaceTagsPlaceholders(promptTexts, userId);
@@ -214,13 +238,24 @@ async function inferTagsFromPDF(
bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
inferenceClient: InferenceClient,
abortSignal: AbortSignal,
+ tagStyle: ZTagStyle,
+ inferredTagLang: string,
+ curatedTags?: string[],
) {
const prompt = await buildTextPrompt(
- serverConfig.inference.inferredTagLang,
+ inferredTagLang,
await fetchCustomPrompts(bookmark.userId, "text"),
`Content: ${bookmark.asset.content}`,
serverConfig.inference.contextLength,
+ tagStyle,
+ curatedTags,
);
+ setSpanAttributes({
+ "inference.model": serverConfig.inference.textModel,
+ });
+ setSpanAttributes({
+ "inference.prompt.size": Buffer.byteLength(prompt, "utf8"),
+ });
return inferenceClient.inferFromText(prompt, {
schema: openAIResponseSchema,
abortSignal,
@@ -231,11 +266,25 @@ async function inferTagsFromText(
bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
inferenceClient: InferenceClient,
abortSignal: AbortSignal,
+ tagStyle: ZTagStyle,
+ inferredTagLang: string,
+ curatedTags?: string[],
) {
- const prompt = await buildPrompt(bookmark);
+ const prompt = await buildPrompt(
+ bookmark,
+ tagStyle,
+ inferredTagLang,
+ curatedTags,
+ );
if (!prompt) {
return null;
}
+ setSpanAttributes({
+ "inference.model": serverConfig.inference.textModel,
+ });
+ setSpanAttributes({
+ "inference.prompt.size": Buffer.byteLength(prompt, "utf8"),
+ });
return await inferenceClient.inferFromText(prompt, {
schema: openAIResponseSchema,
abortSignal,
@@ -247,10 +296,32 @@ async function inferTags(
bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
inferenceClient: InferenceClient,
abortSignal: AbortSignal,
+ tagStyle: ZTagStyle,
+ inferredTagLang: string,
+ curatedTags?: string[],
) {
+ setSpanAttributes({
+ "user.id": bookmark.userId,
+ "bookmark.id": bookmark.id,
+ "bookmark.url": bookmark.link?.url,
+ "bookmark.domain": getBookmarkDomain(bookmark.link?.url),
+ "bookmark.content.type": bookmark.type,
+ "crawler.statusCode": bookmark.link?.crawlStatusCode ?? undefined,
+ "inference.tagging.style": tagStyle,
+ "inference.lang": inferredTagLang,
+ "inference.type": "tagging",
+ });
+
let response: InferenceResponse | null;
if (bookmark.link || bookmark.text) {
- response = await inferTagsFromText(bookmark, inferenceClient, abortSignal);
+ response = await inferTagsFromText(
+ bookmark,
+ inferenceClient,
+ abortSignal,
+ tagStyle,
+ inferredTagLang,
+ curatedTags,
+ );
} else if (bookmark.asset) {
switch (bookmark.asset.assetType) {
case "image":
@@ -259,6 +330,9 @@ async function inferTags(
bookmark,
inferenceClient,
abortSignal,
+ tagStyle,
+ inferredTagLang,
+ curatedTags,
);
break;
case "pdf":
@@ -267,6 +341,9 @@ async function inferTags(
bookmark,
inferenceClient,
abortSignal,
+ tagStyle,
+ inferredTagLang,
+ curatedTags,
);
break;
default:
@@ -298,6 +375,10 @@ async function inferTags(
}
return tag.trim();
});
+ setSpanAttributes({
+ "inference.tagging.numGeneratedTags": tags.length,
+ "inference.totalTokens": response.totalTokens,
+ });
return tags;
} catch (e) {
@@ -317,12 +398,10 @@ async function connectTags(
return;
}
- await db.transaction(async (tx) => {
+ const res = await db.transaction(async (tx) => {
// Attempt to match exiting tags with the new ones
const { matchedTagIds, notFoundTagNames } = await (async () => {
- const { normalizeTag, sql: normalizedTagSql } = tagNormalizer(
- bookmarkTags.name,
- );
+ const { normalizeTag } = tagNormalizer();
const normalizedInferredTags = inferredTags.map((t) => ({
originalTag: t,
normalizedTag: normalizeTag(t),
@@ -332,7 +411,7 @@ async function connectTags(
where: and(
eq(bookmarkTags.userId, userId),
inArray(
- normalizedTagSql,
+ bookmarkTags.normalizedName,
normalizedInferredTags.map((t) => t.normalizedTag),
),
),
@@ -394,17 +473,19 @@ async function connectTags(
.onConflictDoNothing()
.returning();
- await triggerRuleEngineOnEvent(bookmarkId, [
- ...detachedTags.map((t) => ({
- type: "tagRemoved" as const,
- tagId: t.tagId,
- })),
- ...attachedTags.map((t) => ({
- type: "tagAdded" as const,
- tagId: t.tagId,
- })),
- ]);
+ return { detachedTags, attachedTags };
});
+
+ await triggerRuleEngineOnEvent(bookmarkId, [
+ ...res.detachedTags.map((t) => ({
+ type: "tagRemoved" as const,
+ tagId: t.tagId,
+ })),
+ ...res.attachedTags.map((t) => ({
+ type: "tagAdded" as const,
+ tagId: t.tagId,
+ })),
+ ]);
}
async function fetchBookmark(linkId: string) {
@@ -437,6 +518,37 @@ export async function runTagging(
);
}
+ // Check user-level preference
+ const userSettings = await db.query.users.findFirst({
+ where: eq(users.id, bookmark.userId),
+ columns: {
+ autoTaggingEnabled: true,
+ tagStyle: true,
+ curatedTagIds: true,
+ inferredTagLang: true,
+ },
+ });
+
+ if (userSettings?.autoTaggingEnabled === false) {
+ logger.debug(
+ `[inference][${jobId}] Skipping tagging job for bookmark with id "${bookmarkId}" because user has disabled auto-tagging.`,
+ );
+ return;
+ }
+
+ // Resolve curated tag names if configured
+ let curatedTagNames: string[] | undefined;
+ if (userSettings?.curatedTagIds && userSettings.curatedTagIds.length > 0) {
+ const tags = await db.query.bookmarkTags.findMany({
+ where: and(
+ eq(bookmarkTags.userId, bookmark.userId),
+ inArray(bookmarkTags.id, userSettings.curatedTagIds),
+ ),
+ columns: { name: true },
+ });
+ curatedTagNames = tags.map((t) => t.name);
+ }
+
logger.info(
`[inference][${jobId}] Starting an inference job for bookmark with id "${bookmark.id}"`,
);
@@ -446,6 +558,9 @@ export async function runTagging(
bookmark,
inferenceClient,
job.abortSignal,
+ userSettings?.tagStyle ?? "as-generated",
+ userSettings?.inferredTagLang ?? serverConfig.inference.inferredTagLang,
+ curatedTagNames,
);
if (tags === null) {
diff --git a/apps/workers/workers/ruleEngineWorker.ts b/apps/workers/workers/ruleEngineWorker.ts
index 98a9de74..ecf733cd 100644
--- a/apps/workers/workers/ruleEngineWorker.ts
+++ b/apps/workers/workers/ruleEngineWorker.ts
@@ -1,6 +1,7 @@
import { eq } from "drizzle-orm";
import { workerStatsCounter } from "metrics";
import { buildImpersonatingAuthedContext } from "trpc";
+import { withWorkerTracing } from "workerTracing";
import type { ZRuleEngineRequest } from "@karakeep/shared-server";
import { db } from "@karakeep/db";
@@ -20,7 +21,7 @@ export class RuleEngineWorker {
const worker = (await getQueueClient())!.createRunner<ZRuleEngineRequest>(
RuleEngineQueue,
{
- run: runRuleEngine,
+ run: withWorkerTracing("ruleEngineWorker.run", runRuleEngine),
onComplete: (job) => {
workerStatsCounter.labels("ruleEngine", "completed").inc();
const jobId = job.id;
@@ -66,14 +67,21 @@ async function runRuleEngine(job: DequeuedJob<ZRuleEngineRequest>) {
const bookmark = await getBookmarkUserId(bookmarkId);
if (!bookmark) {
- throw new Error(
- `[ruleEngine][${jobId}] bookmark with id ${bookmarkId} was not found`,
+ logger.info(
+ `[ruleEngine][${jobId}] bookmark with id ${bookmarkId} was not found, skipping`,
);
+ return;
}
const userId = bookmark.userId;
const authedCtx = await buildImpersonatingAuthedContext(userId);
const ruleEngine = await RuleEngine.forBookmark(authedCtx, bookmarkId);
+ if (!ruleEngine) {
+ logger.info(
+ `[ruleEngine][${jobId}] bookmark with id ${bookmarkId} was not found during rule evaluation, skipping`,
+ );
+ return;
+ }
const results = (
await Promise.all(events.map((event) => ruleEngine.onEvent(event)))
diff --git a/apps/workers/workers/searchWorker.ts b/apps/workers/workers/searchWorker.ts
index fed30c9b..b0608dce 100644
--- a/apps/workers/workers/searchWorker.ts
+++ b/apps/workers/workers/searchWorker.ts
@@ -1,5 +1,6 @@
import { eq } from "drizzle-orm";
import { workerStatsCounter } from "metrics";
+import { withWorkerTracing } from "workerTracing";
import type { ZSearchIndexingRequest } from "@karakeep/shared-server";
import { db } from "@karakeep/db";
@@ -25,7 +26,7 @@ export class SearchIndexingWorker {
(await getQueueClient())!.createRunner<ZSearchIndexingRequest>(
SearchIndexingQueue,
{
- run: runSearchIndexing,
+ run: withWorkerTracing("searchWorker.run", runSearchIndexing),
onComplete: (job) => {
workerStatsCounter.labels("search", "completed").inc();
const jobId = job.id;
@@ -55,7 +56,11 @@ export class SearchIndexingWorker {
}
}
-async function runIndex(searchClient: SearchIndexClient, bookmarkId: string) {
+async function runIndex(
+ searchClient: SearchIndexClient,
+ bookmarkId: string,
+ batch: boolean,
+) {
const bookmark = await db.query.bookmarks.findFirst({
where: eq(bookmarks.id, bookmarkId),
with: {
@@ -106,11 +111,15 @@ async function runIndex(searchClient: SearchIndexClient, bookmarkId: string) {
tags: bookmark.tagsOnBookmarks.map((t) => t.tag.name),
};
- await searchClient.addDocuments([document]);
+ await searchClient.addDocuments([document], { batch });
}
-async function runDelete(searchClient: SearchIndexClient, bookmarkId: string) {
- await searchClient.deleteDocuments([bookmarkId]);
+async function runDelete(
+ searchClient: SearchIndexClient,
+ bookmarkId: string,
+ batch: boolean,
+) {
+ await searchClient.deleteDocuments([bookmarkId], { batch });
}
async function runSearchIndexing(job: DequeuedJob<ZSearchIndexingRequest>) {
@@ -132,17 +141,20 @@ async function runSearchIndexing(job: DequeuedJob<ZSearchIndexingRequest>) {
}
const bookmarkId = request.data.bookmarkId;
+ // Disable batching on retries (runNumber > 0) for improved reliability
+ const batch = job.runNumber === 0;
+
logger.info(
- `[search][${jobId}] Attempting to index bookmark with id ${bookmarkId} ...`,
+ `[search][${jobId}] Attempting to index bookmark with id ${bookmarkId} (run ${job.runNumber}, batch=${batch}) ...`,
);
switch (request.data.type) {
case "index": {
- await runIndex(searchClient, bookmarkId);
+ await runIndex(searchClient, bookmarkId, batch);
break;
}
case "delete": {
- await runDelete(searchClient, bookmarkId);
+ await runDelete(searchClient, bookmarkId, batch);
break;
}
}
diff --git a/apps/workers/workers/videoWorker.ts b/apps/workers/workers/videoWorker.ts
index 03525fdf..1ffbf674 100644
--- a/apps/workers/workers/videoWorker.ts
+++ b/apps/workers/workers/videoWorker.ts
@@ -4,6 +4,7 @@ import path from "path";
import { execa } from "execa";
import { workerStatsCounter } from "metrics";
import { getProxyAgent, validateUrl } from "network";
+import { withWorkerTracing } from "workerTracing";
import { db } from "@karakeep/db";
import { AssetTypes } from "@karakeep/db/schema";
@@ -35,7 +36,7 @@ export class VideoWorker {
return (await getQueueClient())!.createRunner<ZVideoRequest>(
VideoWorkerQueue,
{
- run: runWorker,
+ run: withWorkerTracing("videoWorker.run", runWorker),
onComplete: async (job) => {
workerStatsCounter.labels("video", "completed").inc();
const jobId = job.id;
diff --git a/apps/workers/workers/webhookWorker.ts b/apps/workers/workers/webhookWorker.ts
index 0d661372..875a0ac6 100644
--- a/apps/workers/workers/webhookWorker.ts
+++ b/apps/workers/workers/webhookWorker.ts
@@ -1,6 +1,7 @@
import { eq } from "drizzle-orm";
import { workerStatsCounter } from "metrics";
import { fetchWithProxy } from "network";
+import { withWorkerTracing } from "workerTracing";
import { db } from "@karakeep/db";
import { bookmarks, webhooksTable } from "@karakeep/db/schema";
@@ -19,7 +20,7 @@ export class WebhookWorker {
const worker = (await getQueueClient())!.createRunner<ZWebhookRequest>(
WebhookQueue,
{
- run: runWebhook,
+ run: withWorkerTracing("webhookWorker.run", runWebhook),
onComplete: async (job) => {
workerStatsCounter.labels("webhook", "completed").inc();
const jobId = job.id;