aboutsummaryrefslogtreecommitdiffstats
path: root/apps/workers
diff options
context:
space:
mode:
authorMohamedBassem <me@mbassem.com>2025-07-27 19:37:11 +0100
committerMohamedBassem <me@mbassem.com>2025-07-27 19:37:11 +0100
commitb94896a0f8fa43b957a9bdd6ab57ada0ab8101af (patch)
treeed8f79ce7d407379fa0d8210db52959f849fac0e /apps/workers
parent7bb7f18fbf8e374efde2fe28bacfc29157b9fa19 (diff)
downloadkarakeep-b94896a0f8fa43b957a9bdd6ab57ada0ab8101af.tar.zst
refactor: Extract meilisearch as a plugin
Diffstat (limited to 'apps/workers')
-rw-r--r--apps/workers/index.ts2
-rw-r--r--apps/workers/package.json1
-rw-r--r--apps/workers/workers/searchWorker.ts103
3 files changed, 45 insertions, 61 deletions
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index 1cc1ce49..a21b9c2d 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -1,5 +1,6 @@
import "dotenv/config";
+import { loadAllPlugins } from "@karakeep/shared-server";
import serverConfig from "@karakeep/shared/config";
import logger from "@karakeep/shared/logger";
import { runQueueDBMigrations } from "@karakeep/shared/queues";
@@ -16,6 +17,7 @@ import { VideoWorker } from "./workers/videoWorker";
import { WebhookWorker } from "./workers/webhookWorker";
async function main() {
+ await loadAllPlugins();
logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
runQueueDBMigrations();
diff --git a/apps/workers/package.json b/apps/workers/package.json
index aa30878a..a771c710 100644
--- a/apps/workers/package.json
+++ b/apps/workers/package.json
@@ -8,6 +8,7 @@
"@ghostery/adblocker-playwright": "^2.5.1",
"@karakeep/db": "workspace:^0.1.0",
"@karakeep/shared": "workspace:^0.1.0",
+ "@karakeep/shared-server": "workspace:^0.1.0",
"@karakeep/trpc": "workspace:^0.1.0",
"@karakeep/tsconfig": "workspace:^0.1.0",
"@mozilla/readability": "^0.6.0",
diff --git a/apps/workers/workers/searchWorker.ts b/apps/workers/workers/searchWorker.ts
index 74fcfc42..4c924ceb 100644
--- a/apps/workers/workers/searchWorker.ts
+++ b/apps/workers/workers/searchWorker.ts
@@ -10,7 +10,11 @@ import {
SearchIndexingQueue,
zSearchIndexingRequestSchema,
} from "@karakeep/shared/queues";
-import { getSearchIdxClient } from "@karakeep/shared/search";
+import {
+ BookmarkSearchDocument,
+ getSearchClient,
+ SearchIndexClient,
+} from "@karakeep/shared/search";
import { Bookmark } from "@karakeep/trpc/models/bookmarks";
export class SearchIndexingWorker {
@@ -44,20 +48,7 @@ export class SearchIndexingWorker {
}
}
-async function ensureTaskSuccess(
- searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>,
- taskUid: number,
-) {
- const task = await searchClient.waitForTask(taskUid);
- if (task.error) {
- throw new Error(`Search task failed: ${task.error.message}`);
- }
-}
-
-async function runIndex(
- searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>,
- bookmarkId: string,
-) {
+async function runIndex(searchClient: SearchIndexClient, bookmarkId: string) {
const bookmark = await db.query.bookmarks.findFirst({
where: eq(bookmarks.id, bookmarkId),
with: {
@@ -76,53 +67,43 @@ async function runIndex(
throw new Error(`Bookmark ${bookmarkId} not found`);
}
- const task = await searchClient.addDocuments(
- [
- {
- id: bookmark.id,
- userId: bookmark.userId,
- ...(bookmark.link
- ? {
- url: bookmark.link.url,
- linkTitle: bookmark.link.title,
- description: bookmark.link.description,
- content: await Bookmark.getBookmarkPlainTextContent(
- bookmark.link,
- bookmark.userId,
- ),
- publisher: bookmark.link.publisher,
- author: bookmark.link.author,
- datePublished: bookmark.link.datePublished,
- dateModified: bookmark.link.dateModified,
- }
- : undefined),
- ...(bookmark.asset
- ? {
- content: bookmark.asset.content,
- metadata: bookmark.asset.metadata,
- }
- : undefined),
- ...(bookmark.text ? { content: bookmark.text.text } : undefined),
- note: bookmark.note,
- summary: bookmark.summary,
- title: bookmark.title,
- createdAt: bookmark.createdAt.toISOString(),
- tags: bookmark.tagsOnBookmarks.map((t) => t.tag.name),
- },
- ],
- {
- primaryKey: "id",
- },
- );
- await ensureTaskSuccess(searchClient, task.taskUid);
+ const document: BookmarkSearchDocument = {
+ id: bookmark.id,
+ userId: bookmark.userId,
+ ...(bookmark.link
+ ? {
+ url: bookmark.link.url,
+ linkTitle: bookmark.link.title,
+ description: bookmark.link.description,
+ content: await Bookmark.getBookmarkPlainTextContent(
+ bookmark.link,
+ bookmark.userId,
+ ),
+ publisher: bookmark.link.publisher,
+ author: bookmark.link.author,
+ datePublished: bookmark.link.datePublished,
+ dateModified: bookmark.link.dateModified,
+ }
+ : {}),
+ ...(bookmark.asset
+ ? {
+ content: bookmark.asset.content,
+ metadata: bookmark.asset.metadata,
+ }
+ : {}),
+ ...(bookmark.text ? { content: bookmark.text.text } : {}),
+ note: bookmark.note,
+ summary: bookmark.summary,
+ title: bookmark.title,
+ createdAt: bookmark.createdAt.toISOString(),
+ tags: bookmark.tagsOnBookmarks.map((t) => t.tag.name),
+ };
+
+ await searchClient.addDocuments([document]);
}
-async function runDelete(
- searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>,
- bookmarkId: string,
-) {
- const task = await searchClient.deleteDocument(bookmarkId);
- await ensureTaskSuccess(searchClient, task.taskUid);
+async function runDelete(searchClient: SearchIndexClient, bookmarkId: string) {
+ await searchClient.deleteDocument(bookmarkId);
}
async function runSearchIndexing(job: DequeuedJob<ZSearchIndexingRequest>) {
@@ -135,7 +116,7 @@ async function runSearchIndexing(job: DequeuedJob<ZSearchIndexingRequest>) {
);
}
- const searchClient = await getSearchIdxClient();
+ const searchClient = await getSearchClient();
if (!searchClient) {
logger.debug(
`[search][${jobId}] Search is not configured, nothing to do now`,