diff options
Diffstat (limited to 'apps/workers')
| -rw-r--r-- | apps/workers/feedWorker.ts | 149 | ||||
| -rw-r--r-- | apps/workers/index.ts | 8 | ||||
| -rw-r--r-- | apps/workers/package.json | 2 | ||||
| -rw-r--r-- | apps/workers/trpc.ts | 33 |
4 files changed, 190 insertions, 2 deletions
diff --git a/apps/workers/feedWorker.ts b/apps/workers/feedWorker.ts new file mode 100644 index 00000000..1bd24641 --- /dev/null +++ b/apps/workers/feedWorker.ts @@ -0,0 +1,149 @@
import { and, eq, inArray } from "drizzle-orm";
import { DequeuedJob, Runner } from "liteque";
import Parser from "rss-parser";
import { buildImpersonatingTRPCClient } from "trpc";

import type { ZFeedRequestSchema } from "@hoarder/shared/queues";
import { db } from "@hoarder/db";
import { rssFeedImportsTable, rssFeedsTable } from "@hoarder/db/schema";
import logger from "@hoarder/shared/logger";
import { FeedQueue } from "@hoarder/shared/queues";
import { BookmarkTypes } from "@hoarder/shared/types/bookmarks";

/**
 * Worker that consumes FeedQueue jobs: fetches an RSS feed and imports its
 * new entries as link bookmarks on behalf of the feed's owner. The job
 * outcome is mirrored into rssFeedsTable.lastFetchedStatus/lastFetchedAt.
 */
export class FeedWorker {
  static build() {
    logger.info("Starting feed worker ...");
    const worker = new Runner<ZFeedRequestSchema>(
      FeedQueue,
      {
        run: run,
        onComplete: async (job) => {
          const jobId = job.id;
          logger.info(`[feed][${jobId}] Completed successfully`);
          // Guard on job.data the same way onError does; otherwise
          // eq(rssFeedsTable.id, undefined) builds a malformed where clause.
          if (job.data) {
            await db
              .update(rssFeedsTable)
              .set({ lastFetchedStatus: "success", lastFetchedAt: new Date() })
              .where(eq(rssFeedsTable.id, job.data.feedId));
          }
        },
        onError: async (job) => {
          const jobId = job.id;
          logger.error(
            `[feed][${jobId}] Feed fetch job failed: ${job.error}\n${job.error.stack}`,
          );
          if (job.data) {
            await db
              .update(rssFeedsTable)
              .set({ lastFetchedStatus: "failure", lastFetchedAt: new Date() })
              .where(eq(rssFeedsTable.id, job.data.feedId));
          }
        },
      },
      {
        concurrency: 1,
        pollIntervalMs: 1000,
        timeoutSecs: 30,
      },
    );

    return worker;
  }
}

/**
 * Fetches the feed referenced by the job, diffs its entries (by guid)
 * against rssFeedImportsTable, creates a bookmark for each new entry via
 * an impersonating tRPC caller, and records the imports.
 *
 * Throws when the feed row is missing, the fetch fails/times out (5s), or
 * the response doesn't look like XML — the throw routes into onError above.
 */
async function run(req: DequeuedJob<ZFeedRequestSchema>) {
  const jobId = req.id;
  const feed = await db.query.rssFeedsTable.findFirst({
    where: eq(rssFeedsTable.id, req.data.feedId),
  });
  if (!feed) {
    throw new Error(
      `[feed][${jobId}] Feed with id ${req.data.feedId} not found`,
    );
  }

  const response = await fetch(feed.url, {
    signal: AbortSignal.timeout(5000),
  });
  const contentType = response.headers.get("content-type");
  // Feeds are served under many XML media types (application/xml, text/xml,
  // application/rss+xml, application/atom+xml, ...). Requiring exactly
  // "application/xml" rejects most real-world feeds, so accept any content
  // type that mentions xml.
  if (!contentType || !contentType.includes("xml")) {
    throw new Error(
      `[feed][${jobId}] Feed with id ${req.data.feedId} is not a valid RSS feed`,
    );
  }
  const xmlData = await response.text();

  logger.info(
    `[feed][${jobId}] Successfully fetched feed "${feed.name}" (${feed.id}) ...`,
  );

  const parser = new Parser();
  const feedData = await parser.parseString(xmlData);

  logger.info(
    `[feed][${jobId}] Found ${feedData.items.length} entries in feed "${feed.name}" (${feed.id}) ...`,
  );

  if (feedData.items.length === 0) {
    logger.info(`[feed][${jobId}] No entries found.`);
    return;
  }

  // Entries already imported for this feed, matched by the entry guid.
  const existingEntries = await db.query.rssFeedImportsTable.findMany({
    where: and(
      eq(rssFeedImportsTable.rssFeedId, feed.id),
      inArray(
        rssFeedImportsTable.entryId,
        feedData.items
          .map((item) => item.guid)
          .filter((id): id is string => !!id),
      ),
    ),
  });

  // Require BOTH a guid (used as the import key below) and a link (needed
  // to create the bookmark). The original filter only required item.link,
  // yet later asserted item.guid! — a guid-less item would have produced an
  // import row with an undefined entryId.
  const newEntries = feedData.items.filter(
    (item) =>
      !!item.guid &&
      !!item.link &&
      !existingEntries.some((entry) => entry.entryId === item.guid),
  );

  if (newEntries.length === 0) {
    logger.info(
      `[feed][${jobId}] No new entries found in feed "${feed.name}" (${feed.id}).`,
    );
    return;
  }

  logger.info(
    `[feed][${jobId}] Found ${newEntries.length} new entries in feed "${feed.name}" (${feed.id}) ...`,
  );

  const trpcClient = await buildImpersonatingTRPCClient(feed.userId);

  // allSettled: one failed bookmark creation must not sink the whole batch.
  const createdBookmarks = await Promise.allSettled(
    newEntries.map((item) =>
      trpcClient.bookmarks.createBookmark({
        type: BookmarkTypes.LINK,
        url: item.link!,
      }),
    ),
  );

  // It's ok if this is not transactional as the bookmarks will get linked in the next iteration.
  await db
    .insert(rssFeedImportsTable)
    .values(
      newEntries.map((item, idx) => {
        const b = createdBookmarks[idx];
        return {
          entryId: item.guid!,
          bookmarkId: b.status === "fulfilled" ? b.value.id : null,
          rssFeedId: feed.id,
        };
      }),
    )
    .onConflictDoNothing();

  logger.info(
    `[feed][${jobId}] Successfully imported ${newEntries.length} new entries from feed "${feed.name}" (${feed.id}).`,
  );
}
diff --git a/apps/workers/index.ts b/apps/workers/index.ts index 3b5896e4..c8978adc 100644 --- a/apps/workers/index.ts +++ b/apps/workers/index.ts @@ -1,5 +1,6 @@ import "dotenv/config"; +import { FeedWorker } from "feedWorker"; import { TidyAssetsWorker } from "tidyAssetsWorker"; import serverConfig from "@hoarder/shared/config"; @@ -16,12 +17,13 @@ async function main() { logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`); runQueueDBMigrations(); - const [crawler, openai, search, tidyAssets, video] = [ + const [crawler, openai, search, tidyAssets, video, feed] = [ await CrawlerWorker.build(), OpenAiWorker.build(), SearchIndexingWorker.build(), TidyAssetsWorker.build(), VideoWorker.build(), + FeedWorker.build(), ]; await Promise.any([ @@ -31,11 +33,12 @@ search.run(), tidyAssets.run(), video.run(), + feed.run(), ]), shutdownPromise, ]); logger.info( - "Shutting down crawler, openai, tidyAssets, video and search workers ...", + "Shutting down crawler, openai, tidyAssets, video, feed and search workers ...", ); crawler.stop(); @@ -43,6 +46,7 @@ search.stop(); tidyAssets.stop(); video.stop(); + feed.stop(); } main(); diff --git a/apps/workers/package.json b/apps/workers/package.json index 7f64e715..a7579319 100644 --- a/apps/workers/package.json +++ b/apps/workers/package.json @@ -6,6 +6,7 @@ "dependencies": { "@hoarder/db": "workspace:^0.1.0", "@hoarder/shared": "workspace:^0.1.0", + "@hoarder/trpc": "workspace:^0.1.0", "@hoarder/tsconfig": "workspace:^0.1.0", "@mozilla/readability": "^0.5.0", "@tsconfig/node21": "^21.0.1", @@ -32,6 +33,7 @@ "puppeteer-extra": "^3.3.6", "puppeteer-extra-plugin-adblocker": "^2.13.6",
"puppeteer-extra-plugin-stealth": "^2.11.2", + "rss-parser": "^3.13.0", "tesseract.js": "^5.1.1", "tsx": "^4.7.1", "typescript": "^5.3.3", diff --git a/apps/workers/trpc.ts b/apps/workers/trpc.ts new file mode 100644 index 00000000..cd2e4c99 --- /dev/null +++ b/apps/workers/trpc.ts @@ -0,0 +1,33 @@
import { eq } from "drizzle-orm";

import { db } from "@hoarder/db";
import { users } from "@hoarder/db/schema";
import { createCallerFactory } from "@hoarder/trpc";
import { appRouter } from "@hoarder/trpc/routers/_app";

/**
 * Builds a tRPC caller that performs every call as the given user, with no
 * authentication involved.
 *
 * This is only safe to use in the context of a worker.
 *
 * @param userId id of the user to impersonate
 * @throws Error when no user with that id exists
 */
export async function buildImpersonatingTRPCClient(userId: string) {
  const user = await db.query.users.findFirst({
    where: eq(users.id, userId),
  });
  if (!user) {
    throw new Error("User not found");
  }

  // Only forward the identity fields the tRPC context expects.
  const { id, name, email, role } = user;
  const createCaller = createCallerFactory(appRouter);
  return createCaller({
    user: { id, name, email, role },
    db,
    // Workers have no inbound HTTP request, hence no client ip.
    req: { ip: null },
  });
}
