diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-05-18 20:22:59 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2025-05-18 20:22:59 +0000 |
| commit | 2743d9e38ecfdbf757d4d2f97bcf09d601245b59 (patch) | |
| tree | 3119581aafce5321aaba9719ba3b2597d000d564 /apps/workers/searchWorker.ts | |
| parent | a5ae67c241d8cdd452acd4d98800ec61740c041f (diff) | |
| download | karakeep-2743d9e38ecfdbf757d4d2f97bcf09d601245b59.tar.zst | |
feat: Add AI auto summarization. Fixes #1163
Diffstat (limited to 'apps/workers/searchWorker.ts')
| -rw-r--r-- | apps/workers/searchWorker.ts | 156 |
1 files changed, 0 insertions, 156 deletions
diff --git a/apps/workers/searchWorker.ts b/apps/workers/searchWorker.ts deleted file mode 100644 index e7b827a9..00000000 --- a/apps/workers/searchWorker.ts +++ /dev/null @@ -1,156 +0,0 @@ -import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; - -import type { ZSearchIndexingRequest } from "@karakeep/shared/queues"; -import { db } from "@karakeep/db"; -import { bookmarks } from "@karakeep/db/schema"; -import logger from "@karakeep/shared/logger"; -import { - SearchIndexingQueue, - zSearchIndexingRequestSchema, -} from "@karakeep/shared/queues"; -import { getSearchIdxClient } from "@karakeep/shared/search"; - -export class SearchIndexingWorker { - static build() { - logger.info("Starting search indexing worker ..."); - const worker = new Runner<ZSearchIndexingRequest>( - SearchIndexingQueue, - { - run: runSearchIndexing, - onComplete: (job) => { - const jobId = job.id; - logger.info(`[search][${jobId}] Completed successfully`); - return Promise.resolve(); - }, - onError: (job) => { - const jobId = job.id; - logger.error( - `[search][${jobId}] search job failed: ${job.error}\n${job.error.stack}`, - ); - return Promise.resolve(); - }, - }, - { - concurrency: 1, - pollIntervalMs: 1000, - timeoutSecs: 30, - }, - ); - - return worker; - } -} - -async function ensureTaskSuccess( - searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>, - taskUid: number, -) { - const task = await searchClient.waitForTask(taskUid); - if (task.error) { - throw new Error(`Search task failed: ${task.error.message}`); - } -} - -async function runIndex( - searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>, - bookmarkId: string, -) { - const bookmark = await db.query.bookmarks.findFirst({ - where: eq(bookmarks.id, bookmarkId), - with: { - link: true, - text: true, - asset: true, - tagsOnBookmarks: { - with: { - tag: true, - }, - }, - }, - }); - - if (!bookmark) { - throw new Error(`Bookmark ${bookmarkId} not found`); - } - - const task = await searchClient.addDocuments( - [ - { - id: bookmark.id, - userId: bookmark.userId, - ...(bookmark.link - ? { - url: bookmark.link.url, - linkTitle: bookmark.link.title, - description: bookmark.link.description, - content: bookmark.link.content, - publisher: bookmark.link.publisher, - author: bookmark.link.author, - datePublished: bookmark.link.datePublished, - dateModified: bookmark.link.dateModified, - } - : undefined), - ...(bookmark.asset - ? { - content: bookmark.asset.content, - metadata: bookmark.asset.metadata, - } - : undefined), - ...(bookmark.text ? { content: bookmark.text.text } : undefined), - note: bookmark.note, - summary: bookmark.summary, - title: bookmark.title, - createdAt: bookmark.createdAt.toISOString(), - tags: bookmark.tagsOnBookmarks.map((t) => t.tag.name), - }, - ], - { - primaryKey: "id", - }, - ); - await ensureTaskSuccess(searchClient, task.taskUid); -} - -async function runDelete( - searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>, - bookmarkId: string, -) { - const task = await searchClient.deleteDocument(bookmarkId); - await ensureTaskSuccess(searchClient, task.taskUid); -} - -async function runSearchIndexing(job: DequeuedJob<ZSearchIndexingRequest>) { - const jobId = job.id; - - const request = zSearchIndexingRequestSchema.safeParse(job.data); - if (!request.success) { - throw new Error( - `[search][${jobId}] Got malformed job request: ${request.error.toString()}`, - ); - } - - const searchClient = await getSearchIdxClient(); - if (!searchClient) { - logger.debug( - `[search][${jobId}] Search is not configured, nothing to do now`, - ); - return; - } - - const bookmarkId = request.data.bookmarkId; - logger.info( - `[search][${jobId}] Attempting to index bookmark with id ${bookmarkId} ...`, - ); - - switch (request.data.type) { - case "index": { - await runIndex(searchClient, bookmarkId); - break; - } - case "delete": { - await runDelete(searchClient, bookmarkId); - break; - } - } -} |
