diff options
| author | MohamedBassem <me@mbassem.com> | 2025-07-27 19:37:11 +0100 |
|---|---|---|
| committer | MohamedBassem <me@mbassem.com> | 2025-07-27 19:37:11 +0100 |
| commit | b94896a0f8fa43b957a9bdd6ab57ada0ab8101af (patch) | |
| tree | ed8f79ce7d407379fa0d8210db52959f849fac0e /apps/workers | |
| parent | 7bb7f18fbf8e374efde2fe28bacfc29157b9fa19 (diff) | |
| download | karakeep-b94896a0f8fa43b957a9bdd6ab57ada0ab8101af.tar.zst | |
refactor: Extract meilisearch as a plugin
Diffstat (limited to 'apps/workers')
| -rw-r--r-- | apps/workers/index.ts | 2 | ||||
| -rw-r--r-- | apps/workers/package.json | 1 | ||||
| -rw-r--r-- | apps/workers/workers/searchWorker.ts | 103 |
3 files changed, 45 insertions, 61 deletions
diff --git a/apps/workers/index.ts b/apps/workers/index.ts index 1cc1ce49..a21b9c2d 100644 --- a/apps/workers/index.ts +++ b/apps/workers/index.ts @@ -1,5 +1,6 @@ import "dotenv/config"; +import { loadAllPlugins } from "@karakeep/shared-server"; import serverConfig from "@karakeep/shared/config"; import logger from "@karakeep/shared/logger"; import { runQueueDBMigrations } from "@karakeep/shared/queues"; @@ -16,6 +17,7 @@ import { VideoWorker } from "./workers/videoWorker"; import { WebhookWorker } from "./workers/webhookWorker"; async function main() { + await loadAllPlugins(); logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`); runQueueDBMigrations(); diff --git a/apps/workers/package.json b/apps/workers/package.json index aa30878a..a771c710 100644 --- a/apps/workers/package.json +++ b/apps/workers/package.json @@ -8,6 +8,7 @@ "@ghostery/adblocker-playwright": "^2.5.1", "@karakeep/db": "workspace:^0.1.0", "@karakeep/shared": "workspace:^0.1.0", + "@karakeep/shared-server": "workspace:^0.1.0", "@karakeep/trpc": "workspace:^0.1.0", "@karakeep/tsconfig": "workspace:^0.1.0", "@mozilla/readability": "^0.6.0", diff --git a/apps/workers/workers/searchWorker.ts b/apps/workers/workers/searchWorker.ts index 74fcfc42..4c924ceb 100644 --- a/apps/workers/workers/searchWorker.ts +++ b/apps/workers/workers/searchWorker.ts @@ -10,7 +10,11 @@ import { SearchIndexingQueue, zSearchIndexingRequestSchema, } from "@karakeep/shared/queues"; -import { getSearchIdxClient } from "@karakeep/shared/search"; +import { + BookmarkSearchDocument, + getSearchClient, + SearchIndexClient, +} from "@karakeep/shared/search"; import { Bookmark } from "@karakeep/trpc/models/bookmarks"; export class SearchIndexingWorker { @@ -44,20 +48,7 @@ export class SearchIndexingWorker { } } -async function ensureTaskSuccess( - searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>, - taskUid: number, -) { - const task = await searchClient.waitForTask(taskUid); - if (task.error) { - throw new Error(`Search task failed: ${task.error.message}`); - } -} - -async function runIndex( - searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>, - bookmarkId: string, -) { +async function runIndex(searchClient: SearchIndexClient, bookmarkId: string) { const bookmark = await db.query.bookmarks.findFirst({ where: eq(bookmarks.id, bookmarkId), with: { @@ -76,53 +67,43 @@ async function runIndex( throw new Error(`Bookmark ${bookmarkId} not found`); } - const task = await searchClient.addDocuments( - [ - { - id: bookmark.id, - userId: bookmark.userId, - ...(bookmark.link - ? { - url: bookmark.link.url, - linkTitle: bookmark.link.title, - description: bookmark.link.description, - content: await Bookmark.getBookmarkPlainTextContent( - bookmark.link, - bookmark.userId, - ), - publisher: bookmark.link.publisher, - author: bookmark.link.author, - datePublished: bookmark.link.datePublished, - dateModified: bookmark.link.dateModified, - } - : undefined), - ...(bookmark.asset - ? { - content: bookmark.asset.content, - metadata: bookmark.asset.metadata, - } - : undefined), - ...(bookmark.text ? { content: bookmark.text.text } : undefined), - note: bookmark.note, - summary: bookmark.summary, - title: bookmark.title, - createdAt: bookmark.createdAt.toISOString(), - tags: bookmark.tagsOnBookmarks.map((t) => t.tag.name), - }, - ], - { - primaryKey: "id", - }, - ); - await ensureTaskSuccess(searchClient, task.taskUid); + const document: BookmarkSearchDocument = { + id: bookmark.id, + userId: bookmark.userId, + ...(bookmark.link + ? { + url: bookmark.link.url, + linkTitle: bookmark.link.title, + description: bookmark.link.description, + content: await Bookmark.getBookmarkPlainTextContent( + bookmark.link, + bookmark.userId, + ), + publisher: bookmark.link.publisher, + author: bookmark.link.author, + datePublished: bookmark.link.datePublished, + dateModified: bookmark.link.dateModified, + } + : {}), + ...(bookmark.asset + ? { + content: bookmark.asset.content, + metadata: bookmark.asset.metadata, + } + : {}), + ...(bookmark.text ? { content: bookmark.text.text } : {}), + note: bookmark.note, + summary: bookmark.summary, + title: bookmark.title, + createdAt: bookmark.createdAt.toISOString(), + tags: bookmark.tagsOnBookmarks.map((t) => t.tag.name), + }; + + await searchClient.addDocuments([document]); } -async function runDelete( - searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>, - bookmarkId: string, -) { - const task = await searchClient.deleteDocument(bookmarkId); - await ensureTaskSuccess(searchClient, task.taskUid); +async function runDelete(searchClient: SearchIndexClient, bookmarkId: string) { + await searchClient.deleteDocument(bookmarkId); } async function runSearchIndexing(job: DequeuedJob<ZSearchIndexingRequest>) { @@ -135,7 +116,7 @@ async function runSearchIndexing(job: DequeuedJob<ZSearchIndexingRequest>) { ); } - const searchClient = await getSearchIdxClient(); + const searchClient = await getSearchClient(); if (!searchClient) { logger.debug( `[search][${jobId}] Search is not configured, nothing to do now`, |
