Diffstat (limited to 'apps/workers')
-rw-r--r--  apps/workers/feedWorker.ts   154
-rw-r--r--  apps/workers/index.ts          8
-rw-r--r--  apps/workers/package.json      2
-rw-r--r--  apps/workers/trpc.ts          34
4 files changed, 196 insertions, 2 deletions
diff --git a/apps/workers/feedWorker.ts b/apps/workers/feedWorker.ts
new file mode 100644
index 00000000..1bd24641
--- /dev/null
+++ b/apps/workers/feedWorker.ts
@@ -0,0 +1,154 @@
+import { and, eq, inArray } from "drizzle-orm";
+import { DequeuedJob, Runner } from "liteque";
+import Parser from "rss-parser";
+import { buildImpersonatingTRPCClient } from "trpc";
+
+import type { ZFeedRequestSchema } from "@hoarder/shared/queues";
+import { db } from "@hoarder/db";
+import { rssFeedImportsTable, rssFeedsTable } from "@hoarder/db/schema";
+import logger from "@hoarder/shared/logger";
+import { FeedQueue } from "@hoarder/shared/queues";
+import { BookmarkTypes } from "@hoarder/shared/types/bookmarks";
+
+export class FeedWorker {
+  static build() {
+    logger.info("Starting feed worker ...");
+    const worker = new Runner<ZFeedRequestSchema>(
+      FeedQueue,
+      {
+        run: run,
+        onComplete: async (job) => {
+          const jobId = job.id;
+          logger.info(`[feed][${jobId}] Completed successfully`);
+          await db
+            .update(rssFeedsTable)
+            .set({ lastFetchedStatus: "success", lastFetchedAt: new Date() })
+            .where(eq(rssFeedsTable.id, job.data?.feedId));
+        },
+        onError: async (job) => {
+          const jobId = job.id;
+          logger.error(
+            `[feed][${jobId}] Feed fetch job failed: ${job.error}\n${job.error.stack}`,
+          );
+          if (job.data) {
+            await db
+              .update(rssFeedsTable)
+              .set({ lastFetchedStatus: "failure", lastFetchedAt: new Date() })
+              .where(eq(rssFeedsTable.id, job.data.feedId));
+          }
+        },
+      },
+      {
+        concurrency: 1,
+        pollIntervalMs: 1000,
+        timeoutSecs: 30,
+      },
+    );
+
+    return worker;
+  }
+}
+
+async function run(req: DequeuedJob<ZFeedRequestSchema>) {
+  const jobId = req.id;
+  const feed = await db.query.rssFeedsTable.findFirst({
+    where: eq(rssFeedsTable.id, req.data.feedId),
+  });
+  if (!feed) {
+    throw new Error(
+      `[feed][${jobId}] Feed with id ${req.data.feedId} not found`,
+    );
+  }
+
+  const response = await fetch(feed.url, {
+    signal: AbortSignal.timeout(5000),
+  });
+  // Feeds are served with a variety of XML content types (application/rss+xml,
+  // application/atom+xml, text/xml, ...), so accept anything XML-like instead
+  // of only application/xml.
+  const contentType = response.headers.get("content-type");
+  if (!contentType || !contentType.includes("xml")) {
+    throw new Error(
+      `[feed][${jobId}] Feed with id ${req.data.feedId} is not a valid RSS feed`,
+    );
+  }
+  const xmlData = await response.text();
+
+  logger.info(
+    `[feed][${jobId}] Successfully fetched feed "${feed.name}" (${feed.id}) ...`,
+  );
+
+  const parser = new Parser();
+  const feedData = await parser.parseString(xmlData);
+
+  logger.info(
+    `[feed][${jobId}] Found ${feedData.items.length} entries in feed "${feed.name}" (${feed.id}) ...`,
+  );
+
+  if (feedData.items.length === 0) {
+    logger.info(`[feed][${jobId}] No entries found.`);
+    return;
+  }
+
+  const existingEntries = await db.query.rssFeedImportsTable.findMany({
+    where: and(
+      eq(rssFeedImportsTable.rssFeedId, feed.id),
+      inArray(
+        rssFeedImportsTable.entryId,
+        feedData.items
+          .map((item) => item.guid)
+          .filter((id): id is string => !!id),
+      ),
+    ),
+  });
+
+  // Entries without a guid can't be deduplicated, so only import entries that
+  // have both a guid and a link.
+  const newEntries = feedData.items.filter(
+    (item) =>
+      item.guid &&
+      item.link &&
+      !existingEntries.some((entry) => entry.entryId === item.guid),
+  );
+
+  if (newEntries.length === 0) {
+    logger.info(
+      `[feed][${jobId}] No new entries found in feed "${feed.name}" (${feed.id}).`,
+    );
+    return;
+  }
+
+  logger.info(
+    `[feed][${jobId}] Found ${newEntries.length} new entries in feed "${feed.name}" (${feed.id}) ...`,
+  );
+
+  const trpcClient = await buildImpersonatingTRPCClient(feed.userId);
+
+  const createdBookmarks = await Promise.allSettled(
+    newEntries.map((item) =>
+      trpcClient.bookmarks.createBookmark({
+        type: BookmarkTypes.LINK,
+        url: item.link!,
+      }),
+    ),
+  );
+
+  // It's ok if this is not transactional as the bookmarks will get linked in the next iteration.
+  await db
+    .insert(rssFeedImportsTable)
+    .values(
+      newEntries.map((item, idx) => {
+        const b = createdBookmarks[idx];
+        return {
+          entryId: item.guid!,
+          bookmarkId: b.status === "fulfilled" ? b.value.id : null,
+          rssFeedId: feed.id,
+        };
+      }),
+    )
+    .onConflictDoNothing();
+
+  logger.info(
+    `[feed][${jobId}] Successfully imported ${newEntries.length} new entries from feed "${feed.name}" (${feed.id}).`,
+  );
+}
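
For context on how this worker receives work: a refresh is triggered by enqueuing a ZFeedRequestSchema payload onto FeedQueue. A minimal sketch of such a call site, assuming FeedQueue exposes liteque's enqueue method (the helper below is hypothetical and not part of this diff):

import { FeedQueue } from "@hoarder/shared/queues";

// Hypothetical call site: ask the feed worker to refresh one feed. The
// payload must match ZFeedRequestSchema, since run() reads req.data.feedId.
export async function requestFeedRefresh(feedId: string) {
  await FeedQueue.enqueue({ feedId });
}
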
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index 3b5896e4..c8978adc 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -1,5 +1,6 @@
 import "dotenv/config";
 
+import { FeedWorker } from "feedWorker";
 import { TidyAssetsWorker } from "tidyAssetsWorker";
 
 import serverConfig from "@hoarder/shared/config";
@@ -16,12 +17,13 @@ async function main() {
   logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
   runQueueDBMigrations();
 
-  const [crawler, openai, search, tidyAssets, video] = [
+  const [crawler, openai, search, tidyAssets, video, feed] = [
     await CrawlerWorker.build(),
     OpenAiWorker.build(),
     SearchIndexingWorker.build(),
     TidyAssetsWorker.build(),
     VideoWorker.build(),
+    FeedWorker.build(),
   ];
 
   await Promise.any([
@@ -31,11 +33,12 @@ async function main() {
       search.run(),
       tidyAssets.run(),
       video.run(),
+      feed.run(),
     ]),
     shutdownPromise,
   ]);
 
   logger.info(
-    "Shutting down crawler, openai, tidyAssets, video and search workers ...",
+    "Shutting down crawler, openai, tidyAssets, video, feed and search workers ...",
   );
   crawler.stop();
@@ -43,6 +46,7 @@ async function main() {
   search.stop();
   tidyAssets.stop();
   video.stop();
+  feed.stop();
 }
 
 main();
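
A note on the startup pattern above: Promise.any resolves as soon as either shutdownPromise resolves or the Promise.all over the workers' run() loops settles, after which main() falls through to the stop() calls. A distilled sketch of the same shape (the helper name is illustrative):

// Block until shutdown (or until every worker's run() loop returns),
// then stop all workers.
async function runUntilShutdown(
  workers: { run(): Promise<void>; stop(): void }[],
  shutdown: Promise<void>,
) {
  await Promise.any([Promise.all(workers.map((w) => w.run())), shutdown]);
  workers.forEach((w) => w.stop());
}
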
diff --git a/apps/workers/package.json b/apps/workers/package.json
index 7f64e715..a7579319 100644
--- a/apps/workers/package.json
+++ b/apps/workers/package.json
@@ -6,6 +6,7 @@
   "dependencies": {
     "@hoarder/db": "workspace:^0.1.0",
     "@hoarder/shared": "workspace:^0.1.0",
+    "@hoarder/trpc": "workspace:^0.1.0",
     "@hoarder/tsconfig": "workspace:^0.1.0",
     "@mozilla/readability": "^0.5.0",
     "@tsconfig/node21": "^21.0.1",
@@ -32,6 +33,7 @@
     "puppeteer-extra": "^3.3.6",
     "puppeteer-extra-plugin-adblocker": "^2.13.6",
     "puppeteer-extra-plugin-stealth": "^2.11.2",
+    "rss-parser": "^3.13.0",
     "tesseract.js": "^5.1.1",
     "tsx": "^4.7.1",
     "typescript": "^5.3.3",
diff --git a/apps/workers/trpc.ts b/apps/workers/trpc.ts
new file mode 100644
index 00000000..cd2e4c99
--- /dev/null
+++ b/apps/workers/trpc.ts
@@ -0,0 +1,34 @@
+import { eq } from "drizzle-orm";
+
+import { db } from "@hoarder/db";
+import { users } from "@hoarder/db/schema";
+import { createCallerFactory } from "@hoarder/trpc";
+import { appRouter } from "@hoarder/trpc/routers/_app";
+
+/**
+ * Builds a TRPC caller that executes requests as the given user, bypassing
+ * authentication entirely. This is only safe to use in the context of a worker.
+ */
+export async function buildImpersonatingTRPCClient(userId: string) {
+  const createCaller = createCallerFactory(appRouter);
+
+  const user = await db.query.users.findFirst({
+    where: eq(users.id, userId),
+  });
+  if (!user) {
+    throw new Error(`User ${userId} not found`);
+  }
+
+  return createCaller({
+    user: {
+      id: user.id,
+      name: user.name,
+      email: user.email,
+      role: user.role,
+    },
+    db,
+    req: {
+      ip: null,
+    },
+  });
+}
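
A short usage sketch of the caller built above, mirroring how feedWorker.ts uses it (the wrapper function is illustrative):

import { BookmarkTypes } from "@hoarder/shared/types/bookmarks";
import { buildImpersonatingTRPCClient } from "trpc";

// Create a link bookmark on behalf of a user, the same call the feed worker
// makes for each new feed entry.
async function importLinkFor(userId: string, url: string) {
  const caller = await buildImpersonatingTRPCClient(userId);
  return caller.bookmarks.createBookmark({ type: BookmarkTypes.LINK, url });
}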