diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-05-18 20:22:59 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2025-05-18 20:22:59 +0000 |
| commit | 2743d9e38ecfdbf757d4d2f97bcf09d601245b59 (patch) | |
| tree | 3119581aafce5321aaba9719ba3b2597d000d564 /apps/workers/feedWorker.ts | |
| parent | a5ae67c241d8cdd452acd4d98800ec61740c041f (diff) | |
| download | karakeep-2743d9e38ecfdbf757d4d2f97bcf09d601245b59.tar.zst | |
feat: Add AI auto summarization. Fixes #1163
Diffstat (limited to 'apps/workers/feedWorker.ts')
| -rw-r--r-- | apps/workers/feedWorker.ts | 215 |
1 files changed, 0 insertions, 215 deletions
diff --git a/apps/workers/feedWorker.ts b/apps/workers/feedWorker.ts deleted file mode 100644 index 1eaba0c3..00000000 --- a/apps/workers/feedWorker.ts +++ /dev/null @@ -1,215 +0,0 @@ -import { and, eq, inArray } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; -import cron from "node-cron"; -import Parser from "rss-parser"; -import { buildImpersonatingTRPCClient } from "trpc"; -import { z } from "zod"; - -import type { ZFeedRequestSchema } from "@karakeep/shared/queues"; -import { db } from "@karakeep/db"; -import { rssFeedImportsTable, rssFeedsTable } from "@karakeep/db/schema"; -import logger from "@karakeep/shared/logger"; -import { FeedQueue } from "@karakeep/shared/queues"; -import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; - -export const FeedRefreshingWorker = cron.schedule( - "0 * * * *", - () => { - logger.info("[feed] Scheduling feed refreshing jobs ..."); - db.query.rssFeedsTable - .findMany({ - columns: { - id: true, - }, - where: eq(rssFeedsTable.enabled, true), - }) - .then((feeds) => { - for (const feed of feeds) { - FeedQueue.enqueue( - { - feedId: feed.id, - }, - { - idempotencyKey: feed.id, - }, - ); - } - }); - }, - { - runOnInit: false, - scheduled: false, - }, -); - -export class FeedWorker { - static build() { - logger.info("Starting feed worker ..."); - const worker = new Runner<ZFeedRequestSchema>( - FeedQueue, - { - run: run, - onComplete: async (job) => { - const jobId = job.id; - logger.info(`[feed][${jobId}] Completed successfully`); - await db - .update(rssFeedsTable) - .set({ lastFetchedStatus: "success", lastFetchedAt: new Date() }) - .where(eq(rssFeedsTable.id, job.data?.feedId)); - }, - onError: async (job) => { - const jobId = job.id; - logger.error( - `[feed][${jobId}] Feed fetch job failed: ${job.error}\n${job.error.stack}`, - ); - if (job.data) { - await db - .update(rssFeedsTable) - .set({ lastFetchedStatus: "failure", lastFetchedAt: new Date() }) - .where(eq(rssFeedsTable.id, job.data?.feedId)); - } - }, - }, - { - concurrency: 1, - pollIntervalMs: 1000, - timeoutSecs: 30, - }, - ); - - return worker; - } -} - -async function run(req: DequeuedJob<ZFeedRequestSchema>) { - const jobId = req.id; - const feed = await db.query.rssFeedsTable.findFirst({ - where: eq(rssFeedsTable.id, req.data.feedId), - }); - if (!feed) { - throw new Error( - `[feed][${jobId}] Feed with id ${req.data.feedId} not found`, - ); - } - logger.info( - `[feed][${jobId}] Starting fetching feed "${feed.name}" (${feed.id}) ...`, - ); - - const response = await fetch(feed.url, { - signal: AbortSignal.timeout(5000), - headers: { - UserAgent: - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", - Accept: "application/rss+xml", - }, - }); - if (response.status !== 200) { - throw new Error( - `[feed][${jobId}] Feed "${feed.name}" (${feed.id}) returned a non-success status: ${response.status}.`, - ); - } - const contentType = response.headers.get("content-type"); - if (!contentType || !contentType.includes("xml")) { - throw new Error( - `[feed][${jobId}] Feed "${feed.name}" (${feed.id}) is not a valid RSS feed`, - ); - } - const xmlData = await response.text(); - - logger.info( - `[feed][${jobId}] Successfully fetched feed "${feed.name}" (${feed.id}) ...`, - ); - - const parser = new Parser({ - customFields: { - item: ["id"], - }, - }); - const unparseFeedData = await parser.parseString(xmlData); - - // Apparently, we can't trust the output of the xml parser. So let's do our own type - // validation. - const feedItemsSchema = z.object({ - id: z.coerce.string(), - link: z.string().optional(), - guid: z.string().optional(), - }); - - const feedItems = unparseFeedData.items - .map((i) => feedItemsSchema.safeParse(i)) - .flatMap((i) => (i.success ? [i.data] : [])); - - logger.info( - `[feed][${jobId}] Found ${feedItems.length} entries in feed "${feed.name}" (${feed.id}) ...`, - ); - - if (feedItems.length === 0) { - logger.info(`[feed][${jobId}] No entries found.`); - return; - } - - // For feeds that don't have guids, use the link as the id - feedItems.forEach((item) => { - item.guid = item.guid ?? `${item.id}` ?? item.link; - }); - - const exitingEntries = await db.query.rssFeedImportsTable.findMany({ - where: and( - eq(rssFeedImportsTable.rssFeedId, feed.id), - inArray( - rssFeedImportsTable.entryId, - feedItems.map((item) => item.guid).filter((id): id is string => !!id), - ), - ), - }); - - const newEntries = feedItems.filter( - (item) => - !exitingEntries.some((entry) => entry.entryId === item.guid) && - item.link && - item.guid, - ); - - if (newEntries.length === 0) { - logger.info( - `[feed][${jobId}] No new entries found in feed "${feed.name}" (${feed.id}).`, - ); - return; - } - - logger.info( - `[feed][${jobId}] Found ${newEntries.length} new entries in feed "${feed.name}" (${feed.id}) ...`, - ); - - const trpcClient = await buildImpersonatingTRPCClient(feed.userId); - - const createdBookmarks = await Promise.allSettled( - newEntries.map((item) => - trpcClient.bookmarks.createBookmark({ - type: BookmarkTypes.LINK, - url: item.link!, - }), - ), - ); - - // It's ok if this is not transactional as the bookmarks will get linked in the next iteration. - await db - .insert(rssFeedImportsTable) - .values( - newEntries.map((item, idx) => { - const b = createdBookmarks[idx]; - return { - entryId: item.guid!, - bookmarkId: b.status === "fulfilled" ? b.value.id : null, - rssFeedId: feed.id, - }; - }), - ) - .onConflictDoNothing(); - - logger.info( - `[feed][${jobId}] Successfully imported ${newEntries.length} new enteries from feed "${feed.name}" (${feed.id}).`, - ); - - return Promise.resolve(); -} |
