Diffstat (limited to 'packages/workers')
-rw-r--r--  packages/workers/crawler.ts |   8
-rw-r--r--  packages/workers/index.ts   |   6
-rw-r--r--  packages/workers/openai.ts  |  24
-rw-r--r--  packages/workers/search.ts  | 115
4 files changed, 144 insertions, 9 deletions
diff --git a/packages/workers/crawler.ts b/packages/workers/crawler.ts
index bfb46218..7be014a7 100644
--- a/packages/workers/crawler.ts
+++ b/packages/workers/crawler.ts
@@ -2,6 +2,7 @@ import logger from "@hoarder/shared/logger";
 import {
   LinkCrawlerQueue,
   OpenAIQueue,
+  SearchIndexingQueue,
   ZCrawlLinkRequest,
   queueConnectionDetails,
   zCrawlLinkRequestSchema,
@@ -30,6 +31,7 @@ import assert from "assert";
 import serverConfig from "@hoarder/shared/config";
 import { bookmarkLinks } from "@hoarder/db/schema";
 import { eq } from "drizzle-orm";
+import { SearchIndexingWorker } from "./search";
 
 const metascraperParser = metascraper([
   metascraperReadability(),
@@ -172,4 +174,10 @@ async function runCrawler(job: Job<ZCrawlLinkRequest, void>) {
   OpenAIQueue.add("openai", {
     bookmarkId,
   });
+
+  // Update the search index
+  SearchIndexingQueue.add("search_indexing", {
+    bookmarkId,
+    type: "index",
+  });
 }
diff --git a/packages/workers/index.ts b/packages/workers/index.ts
index 67be7af2..295eeaef 100644
--- a/packages/workers/index.ts
+++ b/packages/workers/index.ts
@@ -1,14 +1,16 @@
 import "dotenv/config";
 import { CrawlerWorker } from "./crawler";
 import { OpenAiWorker } from "./openai";
+import { SearchIndexingWorker } from "./search";
 
 async function main() {
-  const [crawler, openai] = [
+  const [crawler, openai, search] = [
     await CrawlerWorker.build(),
     await OpenAiWorker.build(),
+    await SearchIndexingWorker.build(),
   ];
 
-  await Promise.all([crawler.run(), openai.run()]);
+  await Promise.all([crawler.run(), openai.run(), search.run()]);
 }
 
 main();
diff --git a/packages/workers/openai.ts b/packages/workers/openai.ts
index 8f85c4ec..cc456616 100644
--- a/packages/workers/openai.ts
+++ b/packages/workers/openai.ts
@@ -3,6 +3,7 @@ import logger from "@hoarder/shared/logger";
 import serverConfig from "@hoarder/shared/config";
 import {
   OpenAIQueue,
+  SearchIndexingQueue,
   ZOpenAIRequest,
   queueConnectionDetails,
   zOpenAIRequestSchema,
@@ -159,13 +160,16 @@ async function connectTags(bookmarkId: string, tagIds: string[]) {
   if (tagIds.length == 0) {
     return;
   }
-  await db.insert(tagsOnBookmarks).values(
-    tagIds.map((tagId) => ({
-      tagId,
-      bookmarkId,
-      attachedBy: "ai" as const,
-    })),
-  );
+  await db
+    .insert(tagsOnBookmarks)
+    .values(
+      tagIds.map((tagId) => ({
+        tagId,
+        bookmarkId,
+        attachedBy: "ai" as const,
+      })),
+    )
+    .onConflictDoNothing();
 }
 
 async function runOpenAI(job: Job<ZOpenAIRequest, void>) {
@@ -203,4 +207,10 @@ async function runOpenAI(job: Job<ZOpenAIRequest, void>) {
 
   const tagIds = await createTags(tags, bookmark.userId);
   await connectTags(bookmarkId, tagIds);
+
+  // Update the search index
+  SearchIndexingQueue.add("search_indexing", {
+    bookmarkId,
+    type: "index",
+  });
 }
diff --git a/packages/workers/search.ts b/packages/workers/search.ts
new file mode 100644
index 00000000..a628b2ed
--- /dev/null
+++ b/packages/workers/search.ts
@@ -0,0 +1,115 @@
+import { db } from "@hoarder/db";
+import logger from "@hoarder/shared/logger";
+import { getSearchIdxClient } from "@hoarder/shared/search";
+import {
+  SearchIndexingQueue,
+  ZSearchIndexingRequest,
+  queueConnectionDetails,
+  zSearchIndexingRequestSchema,
+} from "@hoarder/shared/queues";
+import { Job } from "bullmq";
+import { Worker } from "bullmq";
+import { bookmarks } from "@hoarder/db/schema";
+import { eq } from "drizzle-orm";
+
+export class SearchIndexingWorker {
+  static async build() {
+    logger.info("Starting search indexing worker ...");
+    const worker = new Worker<ZSearchIndexingRequest, void>(
+      SearchIndexingQueue.name,
+      runSearchIndexing,
+      {
+        connection: queueConnectionDetails,
+        autorun: false,
+      },
+    );
+
+    worker.on("completed", (job) => {
+      const jobId = job?.id || "unknown";
+      logger.info(`[search][${jobId}] Completed successfully`);
+    });
+
+    worker.on("failed", (job, error) => {
+      const jobId = job?.id || "unknown";
+      logger.error(`[search][${jobId}] search indexing job failed: ${error}`);
+    });
+
+    return worker;
+  }
+}
+
+async function runIndex(
+  searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>,
+  bookmarkId: string,
+) {
+  const bookmark = await db.query.bookmarks.findFirst({
+    where: eq(bookmarks.id, bookmarkId),
+    with: {
+      link: true,
+      text: true,
+      tagsOnBookmarks: {
+        with: {
+          tag: true,
+        },
+      },
+    },
+  });
+
+  if (!bookmark) {
+    throw new Error(`Bookmark ${bookmarkId} not found`);
+  }
+
+  searchClient.addDocuments([
+    {
+      id: bookmark.id,
+      userId: bookmark.userId,
+      ...(bookmark.link
+        ? {
+            url: bookmark.link.url,
+            title: bookmark.link.title,
+            description: bookmark.link.description,
+          }
+        : undefined),
+      ...(bookmark.text ? { content: bookmark.text.text } : undefined),
+      tags: bookmark.tagsOnBookmarks.map((t) => t.tag.name),
+    },
+  ]);
+}
+
+async function runDelete(
+  searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>,
+  bookmarkId: string,
+) {
+  await searchClient.deleteDocument(bookmarkId);
+}
+
+async function runSearchIndexing(job: Job<ZSearchIndexingRequest, void>) {
+  const jobId = job.id || "unknown";
+
+  const request = zSearchIndexingRequestSchema.safeParse(job.data);
+  if (!request.success) {
+    throw new Error(
+      `[search][${jobId}] Got malformed job request: ${request.error.toString()}`,
+    );
+  }
+
+  const searchClient = await getSearchIdxClient();
+  if (!searchClient) {
+    logger.debug(
+      `[search][${jobId}] Search is not configured, nothing to do now`,
+    );
+    return;
+  }
+
+  const bookmarkId = request.data.bookmarkId;
+  switch (request.data.type) {
+    case "index": {
+      await runIndex(searchClient, bookmarkId);
+      break;
+    }
+    case "delete": {
+      await runDelete(searchClient, bookmarkId);
+      break;
+    }
+  }
+}
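
The new worker validates each payload with zSearchIndexingRequestSchema from @hoarder/shared/queues, which is outside this diff. Judging from the fields the worker and the two producers actually use, that schema is presumably a zod object along these lines (an inferred sketch, not the real definition):

import { z } from "zod";

// Inferred from usage in search.ts, crawler.ts, and openai.ts — the real
// definition lives in @hoarder/shared/queues and may differ.
export const zSearchIndexingRequestSchema = z.object({
  bookmarkId: z.string(),
  type: z.enum(["index", "delete"]),
});
export type ZSearchIndexingRequest = z.infer<typeof zSearchIndexingRequestSchema>;

Note also that runIndex does not await searchClient.addDocuments(), so a failed indexing call would not mark the BullMQ job as failed.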

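Nothing in this commit enqueues a "delete" job yet: both producers added here (the crawler and the OpenAI worker) only send type: "index". Whatever code path handles bookmark deletion (not shown in this diff) would presumably enqueue the counterpart the same way, for example:

// Hypothetical producer — the bookmark-deletion path is not part of this diff.
SearchIndexingQueue.add("search_indexing", {
  bookmarkId,
  type: "delete",
});

With that in place, the switch in runSearchIndexing routes the job to runDelete, which removes the document from the search index by its bookmark id.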