Diffstat (limited to 'packages/workers')
-rw-r--r--  packages/workers/crawler.ts |   8
-rw-r--r--  packages/workers/index.ts   |   6
-rw-r--r--  packages/workers/openai.ts  |  24
-rw-r--r--  packages/workers/search.ts  | 115
4 files changed, 144 insertions(+), 9 deletions(-)
diff --git a/packages/workers/crawler.ts b/packages/workers/crawler.ts
index bfb46218..7be014a7 100644
--- a/packages/workers/crawler.ts
+++ b/packages/workers/crawler.ts
@@ -2,6 +2,7 @@ import logger from "@hoarder/shared/logger";
 import {
   LinkCrawlerQueue,
   OpenAIQueue,
+  SearchIndexingQueue,
   ZCrawlLinkRequest,
   queueConnectionDetails,
   zCrawlLinkRequestSchema,
@@ -30,6 +31,7 @@ import assert from "assert";
 import serverConfig from "@hoarder/shared/config";
 import { bookmarkLinks } from "@hoarder/db/schema";
 import { eq } from "drizzle-orm";
+import { SearchIndexingWorker } from "./search";
 
 const metascraperParser = metascraper([
   metascraperReadability(),
@@ -172,4 +174,10 @@ async function runCrawler(job: Job<ZCrawlLinkRequest, void>) {
   OpenAIQueue.add("openai", {
     bookmarkId,
   });
+
+  // Update the search index
+  SearchIndexingQueue.add("search_indexing", {
+    bookmarkId,
+    type: "index",
+  });
 }
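
The crawler now enqueues an "index" job for every successfully crawled link, alongside the existing OpenAI job. The queue itself lives in @hoarder/shared/queues and is not part of this diff; a minimal sketch of what that definition might look like, assuming bullmq and zod (the queue name "search_indexing" and the { bookmarkId, type } payload are taken from how the workers below consume it):

import { Queue } from "bullmq";
import { z } from "zod";

// Payload accepted by the search indexing queue.
export const zSearchIndexingRequestSchema = z.object({
  bookmarkId: z.string(),
  type: z.enum(["index", "delete"]),
});
export type ZSearchIndexingRequest = z.infer<
  typeof zSearchIndexingRequestSchema
>;

// Hypothetical queue definition; queueConnectionDetails is the shared redis
// connection config this module already exports for the other queues.
export const SearchIndexingQueue = new Queue<ZSearchIndexingRequest, void>(
  "search_indexing",
  { connection: queueConnectionDetails },
);
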
diff --git a/packages/workers/index.ts b/packages/workers/index.ts
index 67be7af2..295eeaef 100644
--- a/packages/workers/index.ts
+++ b/packages/workers/index.ts
@@ -1,14 +1,16 @@
 import "dotenv/config";
 import { CrawlerWorker } from "./crawler";
 import { OpenAiWorker } from "./openai";
+import { SearchIndexingWorker } from "./search";
 
 async function main() {
-  const [crawler, openai] = [
+  const [crawler, openai, search] = [
     await CrawlerWorker.build(),
     await OpenAiWorker.build(),
+    await SearchIndexingWorker.build(),
   ];
 
-  await Promise.all([crawler.run(), openai.run()]);
+  await Promise.all([crawler.run(), openai.run(), search.run()]);
 }
 
 main();
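
All three workers are constructed with autorun: false and only start consuming jobs once run() is called, so main() controls when processing begins. The diff does not handle shutdown; a hypothetical addition inside main() that would stop the workers gracefully on SIGTERM (bullmq's Worker.close() stops fetching new jobs and waits for in-flight ones):

process.on("SIGTERM", async () => {
  // Stop accepting new jobs and wait for active ones to complete.
  await Promise.all([crawler.close(), openai.close(), search.close()]);
  process.exit(0);
});
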
diff --git a/packages/workers/openai.ts b/packages/workers/openai.ts
index 8f85c4ec..cc456616 100644
--- a/packages/workers/openai.ts
+++ b/packages/workers/openai.ts
@@ -3,6 +3,7 @@ import logger from "@hoarder/shared/logger";
 import serverConfig from "@hoarder/shared/config";
 import {
   OpenAIQueue,
+  SearchIndexingQueue,
   ZOpenAIRequest,
   queueConnectionDetails,
   zOpenAIRequestSchema,
@@ -159,13 +160,16 @@ async function connectTags(bookmarkId: string, tagIds: string[]) {
   if (tagIds.length == 0) {
     return;
   }
-  await db.insert(tagsOnBookmarks).values(
-    tagIds.map((tagId) => ({
-      tagId,
-      bookmarkId,
-      attachedBy: "ai" as const,
-    })),
-  );
+  await db
+    .insert(tagsOnBookmarks)
+    .values(
+      tagIds.map((tagId) => ({
+        tagId,
+        bookmarkId,
+        attachedBy: "ai" as const,
+      })),
+    )
+    .onConflictDoNothing();
 }
 
 async function runOpenAI(job: Job<ZOpenAIRequest, void>) {
@@ -203,4 +207,10 @@ async function runOpenAI(job: Job<ZOpenAIRequest, void>) {
 
   const tagIds = await createTags(tags, bookmark.userId);
   await connectTags(bookmarkId, tagIds);
+
+  // Update the search index
+  SearchIndexingQueue.add("search_indexing", {
+    bookmarkId,
+    type: "index",
+  });
 }
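
The connectTags change makes AI tagging idempotent: re-running the OpenAI job for a bookmark can produce tag ids that are already attached, and the plain insert previously failed on those duplicate rows. onConflictDoNothing() only helps if the table has a uniqueness constraint for the insert to collide with; a hypothetical sketch of the tagsOnBookmarks table, assuming drizzle-orm's sqlite-core and a composite primary key (the real schema lives in @hoarder/db/schema and is not shown in this diff):

import { primaryKey, sqliteTable, text } from "drizzle-orm/sqlite-core";

export const tagsOnBookmarks = sqliteTable(
  "tagsOnBookmarks",
  {
    tagId: text("tagId").notNull(),
    bookmarkId: text("bookmarkId").notNull(),
    attachedBy: text("attachedBy", { enum: ["ai", "human"] }).notNull(),
  },
  // Assumed composite key: one row per (tag, bookmark) pair. This is the
  // conflict target that turns duplicate attachments into no-ops.
  (t) => ({ pk: primaryKey({ columns: [t.tagId, t.bookmarkId] }) }),
);
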
diff --git a/packages/workers/search.ts b/packages/workers/search.ts
new file mode 100644
index 00000000..a628b2ed
--- /dev/null
+++ b/packages/workers/search.ts
@@ -0,0 +1,115 @@
+import { db } from "@hoarder/db";
+import logger from "@hoarder/shared/logger";
+import { getSearchIdxClient } from "@hoarder/shared/search";
+import {
+  SearchIndexingQueue,
+  ZSearchIndexingRequest,
+  queueConnectionDetails,
+  zSearchIndexingRequestSchema,
+} from "@hoarder/shared/queues";
+import { Job } from "bullmq";
+import { Worker } from "bullmq";
+import { bookmarks } from "@hoarder/db/schema";
+import { eq } from "drizzle-orm";
+
+export class SearchIndexingWorker {
+  static async build() {
+    logger.info("Starting search indexing worker ...");
+    const worker = new Worker<ZSearchIndexingRequest, void>(
+      SearchIndexingQueue.name,
+      runSearchIndexing,
+      {
+        connection: queueConnectionDetails,
+        autorun: false,
+      },
+    );
+
+    worker.on("completed", (job) => {
+      const jobId = job?.id || "unknown";
+      logger.info(`[search][${jobId}] Completed successfully`);
+    });
+
+    worker.on("failed", (job, error) => {
+      const jobId = job?.id || "unknown";
+      logger.error(`[search][${jobId}] search indexing job failed: ${error}`);
+    });
+
+    return worker;
+  }
+}
+
+async function runIndex(
+  searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>,
+  bookmarkId: string,
+) {
+  const bookmark = await db.query.bookmarks.findFirst({
+    where: eq(bookmarks.id, bookmarkId),
+    with: {
+      link: true,
+      text: true,
+      tagsOnBookmarks: {
+        with: {
+          tag: true,
+        },
+      },
+    },
+  });
+
+  if (!bookmark) {
+    throw new Error(`Bookmark ${bookmarkId} not found`);
+  }
+
+  await searchClient.addDocuments([
+    {
+      id: bookmark.id,
+      userId: bookmark.userId,
+      ...(bookmark.link
+        ? {
+            url: bookmark.link.url,
+            title: bookmark.link.title,
+            description: bookmark.link.description,
+          }
+        : undefined),
+      ...(bookmark.text ? { content: bookmark.text.text } : undefined),
+      tags: bookmark.tagsOnBookmarks.map((t) => t.tag.name),
+    },
+  ]);
+}
+
+async function runDelete(
+  searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>,
+  bookmarkId: string,
+) {
+  await searchClient.deleteDocument(bookmarkId);
+}
+
+async function runSearchIndexing(job: Job<ZSearchIndexingRequest, void>) {
+  const jobId = job.id || "unknown";
+
+  const request = zSearchIndexingRequestSchema.safeParse(job.data);
+  if (!request.success) {
+    throw new Error(
+      `[search][${jobId}] Got malformed job request: ${request.error.toString()}`,
+    );
+  }
+
+  const searchClient = await getSearchIdxClient();
+  if (!searchClient) {
+    logger.debug(
+      `[search][${jobId}] Search is not configured, nothing to do now`,
+    );
+    return;
+  }
+
+  const bookmarkId = request.data.bookmarkId;
+  switch (request.data.type) {
+    case "index": {
+      await runIndex(searchClient, bookmarkId);
+      break;
+    }
+    case "delete": {
+      await runDelete(searchClient, bookmarkId);
+      break;
+    }
+  }
+}
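
getSearchIdxClient() returning null is what keeps search optional: when no search backend is configured, indexing jobs are consumed and dropped with a debug log instead of failing. The client is defined in @hoarder/shared/search, outside this diff; a minimal sketch assuming Meilisearch, whose JS Index API matches the addDocuments/deleteDocument calls above (the env variable names and the "bookmarks" index name are hypothetical):

import { MeiliSearch } from "meilisearch";

export async function getSearchIdxClient() {
  // Hypothetical config check: search is an opt-in feature.
  if (!process.env.MEILI_ADDR) {
    return null;
  }
  const client = new MeiliSearch({
    host: process.env.MEILI_ADDR,
    apiKey: process.env.MEILI_MASTER_KEY,
  });
  return client.index("bookmarks");
}

Note that the worker's "delete" branch has no producer in this diff; the expected call site, mirroring the "index" jobs queued by the crawler and OpenAI workers, would be:

SearchIndexingQueue.add("search_indexing", {
  bookmarkId,
  type: "delete",
});
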