From cf1a25131fd45ab7c9a72b837be525c24457cd8b Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sun, 3 Nov 2024 17:09:47 +0000 Subject: feature: Add support for subscribing to RSS feeds. Fixes #202 --- apps/web/app/settings/feeds/page.tsx | 5 + apps/web/components/settings/FeedSettings.tsx | 398 +++++++++++++++++++++++++ apps/web/components/settings/sidebar/items.tsx | 14 +- apps/workers/feedWorker.ts | 149 +++++++++ apps/workers/index.ts | 8 +- apps/workers/package.json | 2 + apps/workers/trpc.ts | 33 ++ 7 files changed, 606 insertions(+), 3 deletions(-) create mode 100644 apps/web/app/settings/feeds/page.tsx create mode 100644 apps/web/components/settings/FeedSettings.tsx create mode 100644 apps/workers/feedWorker.ts create mode 100644 apps/workers/trpc.ts (limited to 'apps') diff --git a/apps/web/app/settings/feeds/page.tsx b/apps/web/app/settings/feeds/page.tsx new file mode 100644 index 00000000..d2d1f182 --- /dev/null +++ b/apps/web/app/settings/feeds/page.tsx @@ -0,0 +1,5 @@ +import FeedSettings from "@/components/settings/FeedSettings"; + +export default function FeedSettingsPage() { + return ; +} diff --git a/apps/web/components/settings/FeedSettings.tsx b/apps/web/components/settings/FeedSettings.tsx new file mode 100644 index 00000000..9f2dbda9 --- /dev/null +++ b/apps/web/components/settings/FeedSettings.tsx @@ -0,0 +1,398 @@ +"use client"; + +import React from "react"; +import { ActionButton } from "@/components/ui/action-button"; +import { + Form, + FormControl, + FormField, + FormItem, + FormLabel, + FormMessage, +} from "@/components/ui/form"; +import { FullPageSpinner } from "@/components/ui/full-page-spinner"; +import { Input } from "@/components/ui/input"; +import { toast } from "@/components/ui/use-toast"; +import { api } from "@/lib/trpc"; +import { zodResolver } from "@hookform/resolvers/zod"; +import { + ArrowDownToLine, + CheckCircle, + CircleDashed, + Edit, + Plus, + Save, + Trash2, + XCircle, +} from "lucide-react"; +import { useForm } from "react-hook-form"; +import { z } from "zod"; + +import { + ZFeed, + zNewFeedSchema, + zUpdateFeedSchema, +} from "@hoarder/shared/types/feeds"; + +import ActionConfirmingDialog from "../ui/action-confirming-dialog"; +import { Button } from "../ui/button"; +import { + Dialog, + DialogClose, + DialogContent, + DialogFooter, + DialogHeader, + DialogTitle, + DialogTrigger, +} from "../ui/dialog"; +import { + Table, + TableBody, + TableCell, + TableHead, + TableHeader, + TableRow, +} from "../ui/table"; + +export function FeedsEditorDialog() { + const [open, setOpen] = React.useState(false); + const apiUtils = api.useUtils(); + + const form = useForm>({ + resolver: zodResolver(zNewFeedSchema), + defaultValues: { + name: "", + url: "", + }, + }); + + React.useEffect(() => { + if (open) { + form.reset(); + } + }, [open]); + + const { mutateAsync: createFeed, isPending: isCreating } = + api.feeds.create.useMutation({ + onSuccess: () => { + toast({ + description: "Feed has been created!", + }); + apiUtils.feeds.list.invalidate(); + setOpen(false); + }, + }); + + return ( + + + + + + + Subscribe to a new Feed + +
+ { + await createFeed(value); + form.resetField("name"); + form.resetField("url"); + })} + > + { + return ( + + Name + + + + + + ); + }} + /> + { + return ( + + URL + + + + + + ); + }} + /> + + + + + + + { + await createFeed(value); + })} + loading={isCreating} + variant="default" + className="items-center" + > + + Add + + +
+
+ ); +} + +export function EditFeedDialog({ feed }: { feed: ZFeed }) { + const apiUtils = api.useUtils(); + const [open, setOpen] = React.useState(false); + React.useEffect(() => { + if (open) { + form.reset({ + feedId: feed.id, + name: feed.name, + url: feed.url, + }); + } + }, [open]); + const { mutateAsync: updateFeed, isPending: isUpdating } = + api.feeds.update.useMutation({ + onSuccess: () => { + toast({ + description: "Feed has been updated!", + }); + setOpen(false); + apiUtils.feeds.list.invalidate(); + }, + }); + const form = useForm>({ + resolver: zodResolver(zUpdateFeedSchema), + defaultValues: { + feedId: feed.id, + name: feed.name, + url: feed.url, + }, + }); + return ( + + + + + + + Edit Feed + + +
+ { + await updateFeed(value); + })} + > + { + return ( + + + + + + + ); + }} + /> + { + return ( + + Name + + + + + + ); + }} + /> + { + return ( + + URL + + + + + + ); + }} + /> + + + + + + + { + await updateFeed(value); + })} + type="submit" + className="items-center" + > + + Save + + +
+
+ ); +} + +export function FeedRow({ feed }: { feed: ZFeed }) { + const apiUtils = api.useUtils(); + const { mutate: deleteFeed, isPending: isDeleting } = + api.feeds.delete.useMutation({ + onSuccess: () => { + toast({ + description: "Feed has been deleted!", + }); + apiUtils.feeds.list.invalidate(); + }, + }); + + const { mutate: fetchNow, isPending: isFetching } = + api.feeds.fetchNow.useMutation({ + onSuccess: () => { + toast({ + description: "Feed fetch has been enqueued!", + }); + apiUtils.feeds.list.invalidate(); + }, + }); + + return ( + + {feed.name} + {feed.url} + {feed.lastFetchedAt?.toLocaleString()} + + {feed.lastFetchedStatus === "success" ? ( + + + + ) : feed.lastFetchedStatus === "failure" ? ( + + + + ) : ( + + + + )} + + + + fetchNow({ feedId: feed.id })} + > + + Fetch Now + + ( + deleteFeed({ feedId: feed.id })} + className="items-center" + type="button" + > + + Delete + + )} + > + + + + + ); +} + +export default function FeedSettings() { + const { data: feeds, isLoading } = api.feeds.list.useQuery(); + return ( + <> +
+
+
+
RSS Subscriptions
+ +
+ {isLoading && } + {feeds && feeds.feeds.length == 0 && ( +

+ You don't have any RSS subscriptions yet. +

+ )} + {feeds && feeds.feeds.length > 0 && ( + + + + Name + URL + Last Fetch + Last Status + Actions + + + + {feeds.feeds.map((feed) => ( + + ))} + +
+ )} +
+
+ + ); +} diff --git a/apps/web/components/settings/sidebar/items.tsx b/apps/web/components/settings/sidebar/items.tsx index 999825db..047ee233 100644 --- a/apps/web/components/settings/sidebar/items.tsx +++ b/apps/web/components/settings/sidebar/items.tsx @@ -1,5 +1,12 @@ import React from "react"; -import { ArrowLeft, Download, KeyRound, Sparkles, User } from "lucide-react"; +import { + ArrowLeft, + Download, + KeyRound, + Rss, + Sparkles, + User, +} from "lucide-react"; export const settingsSidebarItems: { name: string; @@ -21,6 +28,11 @@ export const settingsSidebarItems: { icon: , path: "/settings/ai", }, + { + name: "RSS Subscriptions", + icon: , + path: "/settings/feeds", + }, { name: "Import / Export", icon: , diff --git a/apps/workers/feedWorker.ts b/apps/workers/feedWorker.ts new file mode 100644 index 00000000..1bd24641 --- /dev/null +++ b/apps/workers/feedWorker.ts @@ -0,0 +1,149 @@ +import { and, eq, inArray } from "drizzle-orm"; +import { DequeuedJob, Runner } from "liteque"; +import Parser from "rss-parser"; +import { buildImpersonatingTRPCClient } from "trpc"; + +import type { ZFeedRequestSchema } from "@hoarder/shared/queues"; +import { db } from "@hoarder/db"; +import { rssFeedImportsTable, rssFeedsTable } from "@hoarder/db/schema"; +import logger from "@hoarder/shared/logger"; +import { FeedQueue } from "@hoarder/shared/queues"; +import { BookmarkTypes } from "@hoarder/shared/types/bookmarks"; + +export class FeedWorker { + static build() { + logger.info("Starting feed worker ..."); + const worker = new Runner( + FeedQueue, + { + run: run, + onComplete: async (job) => { + const jobId = job.id; + logger.info(`[feed][${jobId}] Completed successfully`); + await db + .update(rssFeedsTable) + .set({ lastFetchedStatus: "success", lastFetchedAt: new Date() }) + .where(eq(rssFeedsTable.id, job.data?.feedId)); + }, + onError: async (job) => { + const jobId = job.id; + logger.error( + `[feed][${jobId}] Feed fetch job failed: ${job.error}\n${job.error.stack}`, + ); + if (job.data) { + await db + .update(rssFeedsTable) + .set({ lastFetchedStatus: "failure", lastFetchedAt: new Date() }) + .where(eq(rssFeedsTable.id, job.data?.feedId)); + } + }, + }, + { + concurrency: 1, + pollIntervalMs: 1000, + timeoutSecs: 30, + }, + ); + + return worker; + } +} + +async function run(req: DequeuedJob) { + const jobId = req.id; + const feed = await db.query.rssFeedsTable.findFirst({ + where: eq(rssFeedsTable.id, req.data.feedId), + }); + if (!feed) { + throw new Error( + `[feed][${jobId}] Feed with id ${req.data.feedId} not found`, + ); + } + + const response = await fetch(feed.url, { + signal: AbortSignal.timeout(5000), + }); + const contentType = response.headers.get("content-type"); + if (!contentType || !contentType.includes("application/xml")) { + throw new Error( + `[feed][${jobId}] Feed with id ${req.data.feedId} is not a valid RSS feed`, + ); + } + const xmlData = await response.text(); + + logger.info( + `[feed][${jobId}] Successfully fetched feed "${feed.name}" (${feed.id}) ...`, + ); + + const parser = new Parser(); + const feedData = await parser.parseString(xmlData); + + logger.info( + `[feed][${jobId}] Found ${feedData.items.length} entries in feed "${feed.name}" (${feed.id}) ...`, + ); + + if (feedData.items.length === 0) { + logger.info(`[feed][${jobId}] No entries found.`); + return; + } + + const exitingEntries = await db.query.rssFeedImportsTable.findMany({ + where: and( + eq(rssFeedImportsTable.rssFeedId, feed.id), + inArray( + rssFeedImportsTable.entryId, + feedData.items + .map((item) => item.guid) + .filter((id): id is string => !!id), + ), + ), + }); + + const newEntries = feedData.items.filter( + (item) => + !exitingEntries.some((entry) => entry.entryId === item.guid) && item.link, + ); + + if (newEntries.length === 0) { + logger.info( + `[feed][${jobId}] No new entries found in feed "${feed.name}" (${feed.id}).`, + ); + return; + } + + logger.info( + `[feed][${jobId}] Found ${newEntries.length} new entries in feed "${feed.name}" (${feed.id}) ...`, + ); + + const trpcClient = await buildImpersonatingTRPCClient(feed.userId); + + const createdBookmarks = await Promise.allSettled( + newEntries.map((item) => + trpcClient.bookmarks.createBookmark({ + type: BookmarkTypes.LINK, + url: item.link!, + }), + ), + ); + + // It's ok if this is not transactional as the bookmarks will get linked in the next iteration. + await db + .insert(rssFeedImportsTable) + .values( + newEntries.map((item, idx) => { + const b = createdBookmarks[idx]; + return { + entryId: item.guid!, + bookmarkId: b.status === "fulfilled" ? b.value.id : null, + rssFeedId: feed.id, + }; + }), + ) + .onConflictDoNothing(); + + logger.info( + `[feed][${jobId}] Successfully imported ${newEntries.length} new enteries from feed "${feed.name}" (${feed.id}).`, + ); + + return Promise.resolve(); +} diff --git a/apps/workers/index.ts b/apps/workers/index.ts index 3b5896e4..c8978adc 100644 --- a/apps/workers/index.ts +++ b/apps/workers/index.ts @@ -1,5 +1,6 @@ import "dotenv/config"; +import { FeedWorker } from "feedWorker"; import { TidyAssetsWorker } from "tidyAssetsWorker"; import serverConfig from "@hoarder/shared/config"; @@ -16,12 +17,13 @@ async function main() { logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`); runQueueDBMigrations(); - const [crawler, openai, search, tidyAssets, video] = [ + const [crawler, openai, search, tidyAssets, video, feed] = [ await CrawlerWorker.build(), OpenAiWorker.build(), SearchIndexingWorker.build(), TidyAssetsWorker.build(), VideoWorker.build(), + FeedWorker.build(), ]; await Promise.any([ @@ -31,11 +33,12 @@ async function main() { search.run(), tidyAssets.run(), video.run(), + feed.run(), ]), shutdownPromise, ]); logger.info( - "Shutting down crawler, openai, tidyAssets, video and search workers ...", + "Shutting down crawler, openai, tidyAssets, video, feed and search workers ...", ); crawler.stop(); @@ -43,6 +46,7 @@ async function main() { search.stop(); tidyAssets.stop(); video.stop(); + feed.stop(); } main(); diff --git a/apps/workers/package.json b/apps/workers/package.json index 7f64e715..a7579319 100644 --- a/apps/workers/package.json +++ b/apps/workers/package.json @@ -6,6 +6,7 @@ "dependencies": { "@hoarder/db": "workspace:^0.1.0", "@hoarder/shared": "workspace:^0.1.0", + "@hoarder/trpc": "workspace:^0.1.0", "@hoarder/tsconfig": "workspace:^0.1.0", "@mozilla/readability": "^0.5.0", "@tsconfig/node21": "^21.0.1", @@ -32,6 +33,7 @@ "puppeteer-extra": "^3.3.6", "puppeteer-extra-plugin-adblocker": "^2.13.6", "puppeteer-extra-plugin-stealth": "^2.11.2", + "rss-parser": "^3.13.0", "tesseract.js": "^5.1.1", "tsx": "^4.7.1", "typescript": "^5.3.3", diff --git a/apps/workers/trpc.ts b/apps/workers/trpc.ts new file mode 100644 index 00000000..cd2e4c99 --- /dev/null +++ b/apps/workers/trpc.ts @@ -0,0 +1,33 @@ +import { eq } from "drizzle-orm"; + +import { db } from "@hoarder/db"; +import { users } from "@hoarder/db/schema"; +import { createCallerFactory } from "@hoarder/trpc"; +import { appRouter } from "@hoarder/trpc/routers/_app"; + +/** + * This is only safe to use in the context of a worker. + */ +export async function buildImpersonatingTRPCClient(userId: string) { + const createCaller = createCallerFactory(appRouter); + + const user = await db.query.users.findFirst({ + where: eq(users.id, userId), + }); + if (!user) { + throw new Error("User not found"); + } + + return createCaller({ + user: { + id: user.id, + name: user.name, + email: user.email, + role: user.role, + }, + db, + req: { + ip: null, + }, + }); +} -- cgit v1.2.3-70-g09d2