// Origin: commit a5434730ede1272f195d6a4b13207b840a5ac2cf
// Author: MohamedBassem, Fri, 1 Mar 2024 21:01:00 +0000
// Subject: feature: Add full text search support
// File: packages/workers/search.ts (new file in that commit)

import { db } from "@hoarder/db";
import logger from "@hoarder/shared/logger";
import { getSearchIdxClient } from "@hoarder/shared/search";
import {
  SearchIndexingQueue,
  ZSearchIndexingRequest,
  queueConnectionDetails,
  zSearchIndexingRequestSchema,
} from "@hoarder/shared/queues";
import { Job } from "bullmq";
import { Worker } from "bullmq";
import { bookmarks } from "@hoarder/db/schema";
import { eq } from "drizzle-orm";

/**
 * The non-null client resolved by `getSearchIdxClient()`. The helpers below
 * only run after the caller has checked that a client is available.
 */
type SearchIdxClient = NonNullable<
  Awaited<ReturnType<typeof getSearchIdxClient>>
>;

/**
 * Builds the queue worker that processes search indexing requests.
 */
export class SearchIndexingWorker {
  /**
   * Creates a worker bound to `SearchIndexingQueue` that handles each job
   * with `runSearchIndexing`, and attaches completion/failure logging.
   *
   * The worker is created with `autorun: false`, so it is returned without
   * being started here; the caller is expected to start it.
   *
   * @returns The configured worker instance.
   */
  static async build() {
    logger.info("Starting search indexing worker ...");
    const worker = new Worker<ZSearchIndexingRequest, void>(
      SearchIndexingQueue.name,
      runSearchIndexing,
      {
        connection: queueConnectionDetails,
        autorun: false,
      },
    );

    worker.on("completed", (job) => {
      const jobId = job?.id || "unknown";
      logger.info(`[search][${jobId}] Completed successfully`);
    });

    worker.on("failed", (job, error) => {
      const jobId = job?.id || "unknown";
      // This is the search worker, so the message names the search job.
      logger.error(`[search][${jobId}] search job failed: ${error}`);
    });

    return worker;
  }
}

/**
 * Loads a bookmark (with its link, text and tags) from the database and
 * writes it to the search index as a single document.
 *
 * @param searchClient - Search index client to write to.
 * @param bookmarkId - Id of the bookmark to index.
 * @throws Error if no bookmark with `bookmarkId` exists.
 */
async function runIndex(searchClient: SearchIdxClient, bookmarkId: string) {
  const bookmark = await db.query.bookmarks.findFirst({
    where: eq(bookmarks.id, bookmarkId),
    with: {
      link: true,
      text: true,
      tagsOnBookmarks: {
        with: {
          tag: true,
        },
      },
    },
  });

  if (!bookmark) {
    throw new Error(`Bookmark ${bookmarkId} not found`);
  }

  // Await the write (as runDelete does for deleteDocument) so that a failure
  // rejects this function and fails the job, instead of being left behind as
  // an unhandled floating promise while the job is reported as completed.
  await searchClient.addDocuments([
    {
      id: bookmark.id,
      userId: bookmark.userId,
      // Link fields are only present when the bookmark has a link.
      ...(bookmark.link
        ? {
            url: bookmark.link.url,
            title: bookmark.link.title,
            description: bookmark.link.description,
          }
        : undefined),
      // Text content is only present when the bookmark has text.
      ...(bookmark.text ? { content: bookmark.text.text } : undefined),
      tags: bookmark.tagsOnBookmarks.map((t) => t.tag.name),
    },
  ]);
}

/**
 * Removes the document for the given bookmark from the search index.
 *
 * @param searchClient - Search index client to delete from.
 * @param bookmarkId - Id of the bookmark whose document is removed.
 */
async function runDelete(searchClient: SearchIdxClient, bookmarkId: string) {
  await searchClient.deleteDocument(bookmarkId);
}

/**
 * Job processor: validates the job payload and dispatches it to `runIndex`
 * or `runDelete` depending on the request type.
 *
 * If no search client is available the job is a no-op: a debug message is
 * logged and the function returns normally.
 *
 * @param job - Queue job whose `data` is validated against
 *   `zSearchIndexingRequestSchema`.
 * @throws Error if the payload does not match the schema.
 */
async function runSearchIndexing(job: Job<ZSearchIndexingRequest, void>) {
  const jobId = job.id || "unknown";

  const request = zSearchIndexingRequestSchema.safeParse(job.data);
  if (!request.success) {
    throw new Error(
      `[search][${jobId}] Got malformed job request: ${request.error.toString()}`,
    );
  }

  const searchClient = await getSearchIdxClient();
  if (!searchClient) {
    logger.debug(
      `[search][${jobId}] Search is not configured, nothing to do now`,
    );
    return;
  }

  const bookmarkId = request.data.bookmarkId;
  switch (request.data.type) {
    case "index": {
      await runIndex(searchClient, bookmarkId);
      break;
    }
    case "delete": {
      await runDelete(searchClient, bookmarkId);
      break;
    }
  }
}