From 54243b8cc5ccd76fe23821f6e159b954a2166578 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sun, 1 Feb 2026 22:42:17 +0000 Subject: feat: batch meilisearch requests (#2441) * feat: batch meilisearch requests * more fixes --- packages/plugins/package.json | 1 + packages/plugins/search-meilisearch/src/env.ts | 2 + packages/plugins/search-meilisearch/src/index.ts | 200 +++++++++++++++++++++-- packages/shared/search.ts | 15 +- 4 files changed, 205 insertions(+), 13 deletions(-) (limited to 'packages') diff --git a/packages/plugins/package.json b/packages/plugins/package.json index daf46843..f263d593 100644 --- a/packages/plugins/package.json +++ b/packages/plugins/package.json @@ -22,6 +22,7 @@ "@karakeep/shared": "workspace:*", "@restatedev/restate-sdk": "^1.9.0", "@restatedev/restate-sdk-clients": "^1.9.0", + "async-mutex": "^0.4.1", "liteque": "^0.7.0", "meilisearch": "^0.45.0" }, diff --git a/packages/plugins/search-meilisearch/src/env.ts b/packages/plugins/search-meilisearch/src/env.ts index c06fdd55..50800950 100644 --- a/packages/plugins/search-meilisearch/src/env.ts +++ b/packages/plugins/search-meilisearch/src/env.ts @@ -4,5 +4,7 @@ export const envConfig = z .object({ MEILI_ADDR: z.string().optional(), MEILI_MASTER_KEY: z.string().default(""), + MEILI_BATCH_SIZE: z.coerce.number().int().positive().default(50), + MEILI_BATCH_TIMEOUT_MS: z.coerce.number().int().positive().default(500), }) .parse(process.env); diff --git a/packages/plugins/search-meilisearch/src/index.ts b/packages/plugins/search-meilisearch/src/index.ts index 8ffd48cf..5ebbd2ec 100644 --- a/packages/plugins/search-meilisearch/src/index.ts +++ b/packages/plugins/search-meilisearch/src/index.ts @@ -1,9 +1,11 @@ import type { Index } from "meilisearch"; +import { Mutex } from "async-mutex"; import { MeiliSearch } from "meilisearch"; import type { BookmarkSearchDocument, FilterQuery, + IndexingOptions, SearchIndexClient, SearchOptions, SearchResponse, @@ -26,19 +28,190 @@ function filterToMeiliSearchFilter(filter: FilterQuery): string { } } -class MeiliSearchIndexClient implements SearchIndexClient { - constructor(private index: Index) {} +type PendingOperation = + | { + type: "add"; + document: BookmarkSearchDocument; + resolve: () => void; + reject: (error: Error) => void; + } + | { + type: "delete"; + id: string; + resolve: () => void; + reject: (error: Error) => void; + }; + +class BatchingDocumentQueue { + private pendingOperations: PendingOperation[] = []; + private flushTimeout: ReturnType | null = null; + private mutex = new Mutex(); - async addDocuments(documents: BookmarkSearchDocument[]): Promise { - const task = await this.index.addDocuments(documents, { - primaryKey: "id", + constructor( + private index: Index, + private jobTimeoutSec: number, + private batchSize: number, + private batchTimeoutMs: number, + ) {} + + async addDocument(document: BookmarkSearchDocument): Promise { + return new Promise((resolve, reject) => { + this.pendingOperations.push({ type: "add", document, resolve, reject }); + this.scheduleFlush(); + + if (this.pendingOperations.length >= this.batchSize) { + void this.flush(); + } }); - await this.ensureTaskSuccess(task.taskUid); } - async deleteDocuments(ids: string[]): Promise { - const task = await this.index.deleteDocuments(ids); - await this.ensureTaskSuccess(task.taskUid); + async deleteDocument(id: string): Promise { + return new Promise((resolve, reject) => { + this.pendingOperations.push({ type: "delete", id, resolve, reject }); + this.scheduleFlush(); + + if (this.pendingOperations.length >= this.batchSize) { + void this.flush(); + } + }); + } + + private scheduleFlush(): void { + if (this.flushTimeout === null) { + this.flushTimeout = setTimeout(() => { + void this.flush(); + }, this.batchTimeoutMs); + } + } + + private async flush(): Promise { + await this.mutex.runExclusive(async () => { + if (this.flushTimeout) { + clearTimeout(this.flushTimeout); + this.flushTimeout = null; + } + + // Process operations in order, batching consecutive operations of the same type + while (this.pendingOperations.length > 0) { + const currentType = this.pendingOperations[0].type; + + // Collect consecutive operations of the same type (up to batchSize) + const batch: PendingOperation[] = []; + while ( + batch.length < this.batchSize && + this.pendingOperations.length > 0 && + this.pendingOperations[0].type === currentType + ) { + batch.push(this.pendingOperations.shift()!); + } + + if (currentType === "add") { + await this.flushAddBatch( + batch as Extract[], + ); + } else { + await this.flushDeleteBatch( + batch as Extract[], + ); + } + } + }); + } + + private async flushAddBatch( + batch: Extract[], + ): Promise { + if (batch.length === 0) return; + + try { + const documents = batch.map((p) => p.document); + const task = await this.index.addDocuments(documents, { + primaryKey: "id", + }); + await this.ensureTaskSuccess(task.taskUid); + batch.forEach((p) => p.resolve()); + } catch (error) { + batch.forEach((p) => p.reject(error as Error)); + } + } + + private async flushDeleteBatch( + batch: Extract[], + ): Promise { + if (batch.length === 0) return; + + try { + const ids = batch.map((p) => p.id); + const task = await this.index.deleteDocuments(ids); + await this.ensureTaskSuccess(task.taskUid); + batch.forEach((p) => p.resolve()); + } catch (error) { + batch.forEach((p) => p.reject(error as Error)); + } + } + + private async ensureTaskSuccess(taskUid: number): Promise { + const task = await this.index.waitForTask(taskUid, { + intervalMs: 200, + timeOutMs: this.jobTimeoutSec * 1000 * 0.9, + }); + if (task.error) { + throw new Error(`Search task failed: ${task.error.message}`); + } + } +} + +class MeiliSearchIndexClient implements SearchIndexClient { + private batchQueue: BatchingDocumentQueue; + private jobTimeoutSec: number; + + constructor( + private index: Index, + jobTimeoutSec: number, + batchSize: number, + batchTimeoutMs: number, + ) { + this.jobTimeoutSec = jobTimeoutSec; + this.batchQueue = new BatchingDocumentQueue( + index, + jobTimeoutSec, + batchSize, + batchTimeoutMs, + ); + } + + async addDocuments( + documents: BookmarkSearchDocument[], + options?: IndexingOptions, + ): Promise { + const shouldBatch = options?.batch !== false; + + if (shouldBatch) { + await Promise.all( + documents.map((doc) => this.batchQueue.addDocument(doc)), + ); + } else { + // Direct indexing without batching + const task = await this.index.addDocuments(documents, { + primaryKey: "id", + }); + await this.ensureTaskSuccess(task.taskUid); + } + } + + async deleteDocuments( + ids: string[], + options?: IndexingOptions, + ): Promise { + const shouldBatch = options?.batch !== false; + + if (shouldBatch) { + await Promise.all(ids.map((id) => this.batchQueue.deleteDocument(id))); + } else { + // Direct deletion without batching + const task = await this.index.deleteDocuments(ids); + await this.ensureTaskSuccess(task.taskUid); + } } async search(options: SearchOptions): Promise { @@ -69,7 +242,7 @@ class MeiliSearchIndexClient implements SearchIndexClient { private async ensureTaskSuccess(taskUid: number): Promise { const task = await this.index.waitForTask(taskUid, { intervalMs: 200, - timeOutMs: serverConfig.search.jobTimeoutSec * 1000 * 0.9, + timeOutMs: this.jobTimeoutSec * 1000 * 0.9, }); if (task.error) { throw new Error(`Search task failed: ${task.error.message}`); @@ -130,7 +303,12 @@ export class MeiliSearchProvider implements PluginProvider { } await this.configureIndex(indexFound); - this.indexClient = new MeiliSearchIndexClient(indexFound); + this.indexClient = new MeiliSearchIndexClient( + indexFound, + serverConfig.search.jobTimeoutSec, + envConfig.MEILI_BATCH_SIZE, + envConfig.MEILI_BATCH_TIMEOUT_MS, + ); return this.indexClient; } diff --git a/packages/shared/search.ts b/packages/shared/search.ts index d23ab29f..651b5245 100644 --- a/packages/shared/search.ts +++ b/packages/shared/search.ts @@ -60,9 +60,20 @@ export interface SearchResponse { processingTimeMs: number; } +export interface IndexingOptions { + /** + * Whether to batch requests. Defaults to true. + * Set to false to bypass batching for improved reliability (e.g., on retries). + */ + batch?: boolean; +} + export interface SearchIndexClient { - addDocuments(documents: BookmarkSearchDocument[]): Promise; - deleteDocuments(ids: string[]): Promise; + addDocuments( + documents: BookmarkSearchDocument[], + options?: IndexingOptions, + ): Promise; + deleteDocuments(ids: string[], options?: IndexingOptions): Promise; search(options: SearchOptions): Promise; clearIndex(): Promise; } -- cgit v1.2.3-70-g09d2