import type { Index } from "meilisearch";
import { Mutex } from "async-mutex";
import { MeiliSearch } from "meilisearch";

import type {
  BookmarkSearchDocument,
  FilterQuery,
  IndexingOptions,
  SearchIndexClient,
  SearchOptions,
  SearchResponse,
} from "@karakeep/shared/search";
import serverConfig from "@karakeep/shared/config";
import { PluginProvider } from "@karakeep/shared/plugins";

import { envConfig } from "./env";

/**
 * Wraps a filter value in double quotes, escaping backslashes and double
 * quotes so the value cannot terminate the string early and alter the filter
 * expression.
 */
function quoteFilterValue(value: unknown): string {
  const escaped = String(value).replace(/\\/g, "\\\\").replace(/"/g, '\\"');
  return `"${escaped}"`;
}

/**
 * Translates a backend-agnostic filter into a Meilisearch filter expression.
 *
 * @param filter - The filter to translate ("eq" or "in").
 * @returns The filter expression string.
 * @throws Error if the filter type is not one of the handled variants.
 */
function filterToMeiliSearchFilter(filter: FilterQuery): string {
  switch (filter.type) {
    case "eq":
      return `${filter.field} = ${quoteFilterValue(filter.value)}`;
    case "in":
      // Quote every value, consistent with the "eq" branch, so values
      // containing whitespace or commas stay a single list element.
      return `${filter.field} IN [${filter.values
        .map((value) => quoteFilterValue(value))
        .join(",")}]`;
    default: {
      const exhaustiveCheck: never = filter;
      throw new Error(
        `Unhandled filter type: ${JSON.stringify(exhaustiveCheck)}`,
      );
    }
  }
}

/** Normalises an arbitrary thrown value into an `Error` instance. */
function toError(error: unknown): Error {
  return error instanceof Error ? error : new Error(String(error));
}

/**
 * Waits for an index task to finish and throws if it reports an error.
 *
 * The wait is capped at 90% of `jobTimeoutSec` and polls every 200ms.
 *
 * @throws Error with the task's error message when the task failed.
 */
async function waitForTaskSuccess(
  index: Index<BookmarkSearchDocument>,
  taskUid: number,
  jobTimeoutSec: number,
): Promise<void> {
  const task = await index.waitForTask(taskUid, {
    intervalMs: 200,
    timeOutMs: jobTimeoutSec * 1000 * 0.9,
  });
  if (task.error) {
    throw new Error(`Search task failed: ${task.error.message}`);
  }
}

/**
 * Reports whether `current` holds exactly the entries of `desiredSorted`,
 * ignoring order. `current` is copied before sorting so the caller's array is
 * left untouched. A missing `current` never matches.
 */
function hasSameAttributes(
  current: readonly unknown[] | null | undefined,
  desiredSorted: readonly string[],
): boolean {
  if (!current) {
    return false;
  }
  return JSON.stringify([...current].sort()) === JSON.stringify(desiredSorted);
}

/** A queued index mutation plus the callbacks that settle its caller's promise. */
type PendingOperation =
  | {
      type: "add";
      document: BookmarkSearchDocument;
      resolve: () => void;
      reject: (error: Error) => void;
    }
  | {
      type: "delete";
      id: string;
      resolve: () => void;
      reject: (error: Error) => void;
    };

type AddOperation = Extract<PendingOperation, { type: "add" }>;
type DeleteOperation = Extract<PendingOperation, { type: "delete" }>;

/**
 * Collects add/delete operations and sends them to the index in batches.
 *
 * A flush is triggered either by a timer (`batchTimeoutMs` after the first
 * queued operation) or as soon as the queue reaches the batch size. Operations
 * are processed in submission order; only consecutive operations of the same
 * type are merged into one request.
 */
class BatchingDocumentQueue {
  private pendingOperations: PendingOperation[] = [];
  private flushTimeout: ReturnType<typeof setTimeout> | null = null;
  private readonly mutex = new Mutex();
  private readonly maxBatchSize: number;

  constructor(
    private readonly index: Index<BookmarkSearchDocument>,
    private readonly jobTimeoutSec: number,
    batchSize: number,
    private readonly batchTimeoutMs: number,
  ) {
    // A batch size below 1 (or NaN) would make flush() build empty batches
    // forever without draining the queue, so clamp it to at least 1.
    this.maxBatchSize =
      Number.isFinite(batchSize) && batchSize >= 1 ? Math.floor(batchSize) : 1;
  }

  /** Queues a document for indexing; settles once its batch has been processed. */
  addDocument(document: BookmarkSearchDocument): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      this.enqueue({ type: "add", document, resolve, reject });
    });
  }

  /** Queues a document id for deletion; settles once its batch has been processed. */
  deleteDocument(id: string): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      this.enqueue({ type: "delete", id, resolve, reject });
    });
  }

  /** Appends an operation, arms the flush timer and flushes early when full. */
  private enqueue(operation: PendingOperation): void {
    this.pendingOperations.push(operation);
    this.scheduleFlush();
    if (this.pendingOperations.length >= this.maxBatchSize) {
      void this.flush();
    }
  }

  /** Arms the flush timer unless one is already pending. */
  private scheduleFlush(): void {
    if (this.flushTimeout === null) {
      this.flushTimeout = setTimeout(() => {
        void this.flush();
      }, this.batchTimeoutMs);
    }
  }

  /** Drains the queue under the mutex so only one flush runs at a time. */
  private async flush(): Promise<void> {
    await this.mutex.runExclusive(async () => {
      if (this.flushTimeout !== null) {
        clearTimeout(this.flushTimeout);
        this.flushTimeout = null;
      }

      // Process operations in order, batching consecutive operations of the
      // same type (up to the batch size).
      while (this.pendingOperations.length > 0) {
        const head = this.pendingOperations[0];
        if (head === undefined) {
          break;
        }
        const currentType = head.type;

        let count = 0;
        while (
          count < this.maxBatchSize &&
          this.pendingOperations[count]?.type === currentType
        ) {
          count++;
        }
        const batch = this.pendingOperations.splice(0, count);

        if (currentType === "add") {
          await this.flushAddBatch(batch as AddOperation[]);
        } else {
          await this.flushDeleteBatch(batch as DeleteOperation[]);
        }
      }
    });
  }

  /** Indexes one batch of documents and settles every operation in it. */
  private async flushAddBatch(batch: AddOperation[]): Promise<void> {
    if (batch.length === 0) return;

    try {
      const documents = batch.map((p) => p.document);
      const task = await this.index.addDocuments(documents, {
        primaryKey: "id",
      });
      await waitForTaskSuccess(this.index, task.taskUid, this.jobTimeoutSec);
      batch.forEach((p) => p.resolve());
    } catch (error) {
      // Failures are delivered to the callers' promises instead of being
      // thrown, so the flush loop continues with the remaining operations.
      const failure = toError(error);
      batch.forEach((p) => p.reject(failure));
    }
  }

  /** Deletes one batch of ids and settles every operation in it. */
  private async flushDeleteBatch(batch: DeleteOperation[]): Promise<void> {
    if (batch.length === 0) return;

    try {
      const ids = batch.map((p) => p.id);
      const task = await this.index.deleteDocuments(ids);
      await waitForTaskSuccess(this.index, task.taskUid, this.jobTimeoutSec);
      batch.forEach((p) => p.resolve());
    } catch (error) {
      const failure = toError(error);
      batch.forEach((p) => p.reject(failure));
    }
  }
}

/**
 * `SearchIndexClient` implementation on top of a single Meilisearch index.
 *
 * Writes go through the batching queue unless the caller passes
 * `{ batch: false }`, in which case they are sent to the index directly.
 */
class MeiliSearchIndexClient implements SearchIndexClient {
  private batchQueue: BatchingDocumentQueue;
  private jobTimeoutSec: number;

  constructor(
    private index: Index<BookmarkSearchDocument>,
    jobTimeoutSec: number,
    batchSize: number,
    batchTimeoutMs: number,
  ) {
    this.jobTimeoutSec = jobTimeoutSec;
    this.batchQueue = new BatchingDocumentQueue(
      index,
      jobTimeoutSec,
      batchSize,
      batchTimeoutMs,
    );
  }

  /** Adds documents, waiting until the underlying index task(s) succeeded. */
  async addDocuments(
    documents: BookmarkSearchDocument[],
    options?: IndexingOptions,
  ): Promise<void> {
    const shouldBatch = options?.batch !== false;

    if (shouldBatch) {
      await Promise.all(
        documents.map((doc) => this.batchQueue.addDocument(doc)),
      );
    } else {
      // Direct indexing without batching
      const task = await this.index.addDocuments(documents, {
        primaryKey: "id",
      });
      await waitForTaskSuccess(this.index, task.taskUid, this.jobTimeoutSec);
    }
  }

  /** Deletes documents by id, waiting until the underlying index task(s) succeeded. */
  async deleteDocuments(
    ids: string[],
    options?: IndexingOptions,
  ): Promise<void> {
    const shouldBatch = options?.batch !== false;

    if (shouldBatch) {
      await Promise.all(ids.map((id) => this.batchQueue.deleteDocument(id)));
    } else {
      // Direct deletion without batching
      const task = await this.index.deleteDocuments(ids);
      await waitForTaskSuccess(this.index, task.taskUid, this.jobTimeoutSec);
    }
  }

  /**
   * Runs a search and returns only document ids with their ranking scores.
   */
  async search(options: SearchOptions): Promise<SearchResponse> {
    const result = await this.index.search(options.query, {
      filter: options.filter?.map((f) => filterToMeiliSearchFilter(f)),
      limit: options.limit,
      offset: options.offset,
      sort: options.sort?.map((s) => `${s.field}:${s.order}`),
      attributesToRetrieve: ["id"],
      showRankingScore: true,
    });

    return {
      hits: result.hits.map((hit) => ({
        id: hit.id,
        score: hit._rankingScore,
      })),
      totalHits: result.estimatedTotalHits ?? 0,
      processingTimeMs: result.processingTimeMs,
    };
  }

  /** Removes every document from the index. */
  async clearIndex(): Promise<void> {
    const task = await this.index.deleteAllDocuments();
    await waitForTaskSuccess(this.index, task.taskUid, this.jobTimeoutSec);
  }
}

/**
 * Plugin provider that lazily creates and configures the "bookmarks" index
 * and hands out a shared `SearchIndexClient` for it.
 */
export class MeiliSearchProvider implements PluginProvider<SearchIndexClient> {
  private client: MeiliSearch | undefined;
  private indexClient: SearchIndexClient | undefined;
  private initPromise: Promise<SearchIndexClient | null> | undefined;
  private readonly indexName = "bookmarks";

  constructor() {
    if (MeiliSearchProvider.isConfigured()) {
      this.client = new MeiliSearch({
        host: envConfig.MEILI_ADDR!,
        apiKey: envConfig.MEILI_MASTER_KEY,
      });
    }
  }

  /** Returns true when a Meilisearch address is present in the environment config. */
  static isConfigured(): boolean {
    return !!envConfig.MEILI_ADDR;
  }

  /**
   * Returns the index client, initialising it on first use.
   *
   * Concurrent callers share one in-flight initialisation. The in-flight
   * promise is cleared once it settles, whether it succeeded or failed, so a
   * failed initialisation is retried on the next call instead of handing back
   * the same rejected promise forever.
   *
   * @returns The client, or `null` when Meilisearch is not configured.
   */
  async getClient(): Promise<SearchIndexClient | null> {
    if (this.indexClient) {
      return this.indexClient;
    }

    if (!this.initPromise) {
      this.initPromise = this.initClient().finally(() => {
        this.initPromise = undefined;
      });
    }
    return this.initPromise;
  }

  /** Ensures the index exists and is configured, then builds the index client. */
  private async initClient(): Promise<SearchIndexClient | null> {
    if (!this.client) {
      return null;
    }

    const indices = await this.client.getIndexes();
    let indexFound = indices.results.find((i) => i.uid === this.indexName);

    if (!indexFound) {
      const idx = await this.client.createIndex(this.indexName, {
        primaryKey: "id",
      });
      await this.client.waitForTask(idx.taskUid);
      indexFound = await this.client.getIndex<BookmarkSearchDocument>(
        this.indexName,
      );
    }

    await this.configureIndex(indexFound);
    this.indexClient = new MeiliSearchIndexClient(
      indexFound,
      serverConfig.search.jobTimeoutSec,
      envConfig.MEILI_BATCH_SIZE,
      envConfig.MEILI_BATCH_TIMEOUT_MS,
    );
    return this.indexClient;
  }

  /**
   * Brings the index's filterable and sortable attributes in line with the
   * desired lists, issuing an update only when they differ.
   */
  private async configureIndex(
    index: Index<BookmarkSearchDocument>,
  ): Promise<void> {
    const desiredFilterableAttributes = ["id", "userId"].sort();
    const desiredSortableAttributes = ["createdAt"].sort();

    const settings = await index.getSettings();

    if (
      !hasSameAttributes(
        settings.filterableAttributes,
        desiredFilterableAttributes,
      )
    ) {
      console.log(
        `[meilisearch] Updating desired filterable attributes to ${desiredFilterableAttributes} from ${settings.filterableAttributes}`,
      );
      const taskId = await index.updateFilterableAttributes(
        desiredFilterableAttributes,
      );
      await index.waitForTask(taskId.taskUid);
    }

    if (
      !hasSameAttributes(settings.sortableAttributes, desiredSortableAttributes)
    ) {
      console.log(
        `[meilisearch] Updating desired sortable attributes to ${desiredSortableAttributes} from ${settings.sortableAttributes}`,
      );
      const taskId = await index.updateSortableAttributes(
        desiredSortableAttributes,
      );
      await index.waitForTask(taskId.taskUid);
    }
  }
}