author    Mohamed Bassem <me@mbassem.com>  2026-02-01 22:42:17 +0000
committer GitHub <noreply@github.com>      2026-02-01 22:42:17 +0000
commit    54243b8cc5ccd76fe23821f6e159b954a2166578 (patch)
tree      45de1fa3324f8022825d521996ebc8b7ba890817 /packages/plugins/search-meilisearch
parent    e86188000147e0178ac6ca971f061b37daa40132 (diff)
feat: batch meilisearch requests (#2441)
* feat: batch meilisearch requests
* more fixes
Diffstat (limited to 'packages/plugins/search-meilisearch')
-rw-r--r--  packages/plugins/search-meilisearch/src/env.ts      2
-rw-r--r--  packages/plugins/search-meilisearch/src/index.ts  200
2 files changed, 191 insertions(+), 11 deletions(-)
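
At a glance: instead of issuing one Meilisearch request per indexed bookmark, the client now queues addDocument/deleteDocument calls and flushes them in a single request once MEILI_BATCH_SIZE operations have accumulated, or after MEILI_BATCH_TIMEOUT_MS if the batch stays small. The sketch below is a simplified, standalone illustration of that size-or-timeout pattern; the names (TinyBatcher, Doc, sendBatch) are placeholders, not the plugin's actual code.

// Simplified sketch of the queue-and-flush pattern introduced by this commit.
// `Doc` and `sendBatch` are hypothetical stand-ins; sendBatch would correspond
// to one index.addDocuments call in the real plugin.
type Doc = { id: string };

class TinyBatcher {
  private pending: { doc: Doc; resolve: () => void; reject: (e: Error) => void }[] = [];
  private timer: ReturnType<typeof setTimeout> | null = null;

  constructor(
    private sendBatch: (docs: Doc[]) => Promise<void>,
    private batchSize = 50, // mirrors the MEILI_BATCH_SIZE default
    private batchTimeoutMs = 500, // mirrors the MEILI_BATCH_TIMEOUT_MS default
  ) {}

  add(doc: Doc): Promise<void> {
    return new Promise((resolve, reject) => {
      this.pending.push({ doc, resolve, reject });
      if (this.pending.length >= this.batchSize) {
        // Batch is full: flush immediately.
        void this.flush();
      } else if (this.timer === null) {
        // Otherwise arm a timeout so small batches still get sent promptly.
        this.timer = setTimeout(() => void this.flush(), this.batchTimeoutMs);
      }
    });
  }

  private async flush(): Promise<void> {
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    const batch = this.pending.splice(0, this.pending.length);
    if (batch.length === 0) return;
    try {
      await this.sendBatch(batch.map((p) => p.doc)); // one request for the whole batch
      batch.forEach((p) => p.resolve());
    } catch (err) {
      batch.forEach((p) => p.reject(err as Error));
    }
  }
}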
diff --git a/packages/plugins/search-meilisearch/src/env.ts b/packages/plugins/search-meilisearch/src/env.ts
index c06fdd55..50800950 100644
--- a/packages/plugins/search-meilisearch/src/env.ts
+++ b/packages/plugins/search-meilisearch/src/env.ts
@@ -4,5 +4,7 @@ export const envConfig = z
.object({
MEILI_ADDR: z.string().optional(),
MEILI_MASTER_KEY: z.string().default(""),
+ MEILI_BATCH_SIZE: z.coerce.number().int().positive().default(50),
+ MEILI_BATCH_TIMEOUT_MS: z.coerce.number().int().positive().default(500),
})
.parse(process.env);
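
For reference, both new variables are read through zod's coercion, so string values from the environment become positive integers and the defaults (50 documents, 500 ms) apply when a variable is unset. A minimal standalone sketch of that parsing behaviour (the input object below is illustrative, not the plugin's envConfig):

import { z } from "zod";

// Same shape as the two fields added above; the input values are made up.
const batchEnv = z
  .object({
    MEILI_BATCH_SIZE: z.coerce.number().int().positive().default(50),
    MEILI_BATCH_TIMEOUT_MS: z.coerce.number().int().positive().default(500),
  })
  .parse({ MEILI_BATCH_SIZE: "100" }); // env vars arrive as strings

// batchEnv.MEILI_BATCH_SIZE === 100 (coerced from the string "100")
// batchEnv.MEILI_BATCH_TIMEOUT_MS === 500 (default applied when unset)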
diff --git a/packages/plugins/search-meilisearch/src/index.ts b/packages/plugins/search-meilisearch/src/index.ts
index 8ffd48cf..5ebbd2ec 100644
--- a/packages/plugins/search-meilisearch/src/index.ts
+++ b/packages/plugins/search-meilisearch/src/index.ts
@@ -1,9 +1,11 @@
import type { Index } from "meilisearch";
+import { Mutex } from "async-mutex";
import { MeiliSearch } from "meilisearch";
import type {
BookmarkSearchDocument,
FilterQuery,
+ IndexingOptions,
SearchIndexClient,
SearchOptions,
SearchResponse,
@@ -26,19 +28,190 @@ function filterToMeiliSearchFilter(filter: FilterQuery): string {
}
}
-class MeiliSearchIndexClient implements SearchIndexClient {
- constructor(private index: Index<BookmarkSearchDocument>) {}
+type PendingOperation =
+ | {
+ type: "add";
+ document: BookmarkSearchDocument;
+ resolve: () => void;
+ reject: (error: Error) => void;
+ }
+ | {
+ type: "delete";
+ id: string;
+ resolve: () => void;
+ reject: (error: Error) => void;
+ };
+
+class BatchingDocumentQueue {
+ private pendingOperations: PendingOperation[] = [];
+ private flushTimeout: ReturnType<typeof setTimeout> | null = null;
+ private mutex = new Mutex();
- async addDocuments(documents: BookmarkSearchDocument[]): Promise<void> {
- const task = await this.index.addDocuments(documents, {
- primaryKey: "id",
+ constructor(
+ private index: Index<BookmarkSearchDocument>,
+ private jobTimeoutSec: number,
+ private batchSize: number,
+ private batchTimeoutMs: number,
+ ) {}
+
+ async addDocument(document: BookmarkSearchDocument): Promise<void> {
+ return new Promise((resolve, reject) => {
+ this.pendingOperations.push({ type: "add", document, resolve, reject });
+ this.scheduleFlush();
+
+ if (this.pendingOperations.length >= this.batchSize) {
+ void this.flush();
+ }
});
- await this.ensureTaskSuccess(task.taskUid);
}
- async deleteDocuments(ids: string[]): Promise<void> {
- const task = await this.index.deleteDocuments(ids);
- await this.ensureTaskSuccess(task.taskUid);
+ async deleteDocument(id: string): Promise<void> {
+ return new Promise((resolve, reject) => {
+ this.pendingOperations.push({ type: "delete", id, resolve, reject });
+ this.scheduleFlush();
+
+ if (this.pendingOperations.length >= this.batchSize) {
+ void this.flush();
+ }
+ });
+ }
+
+ private scheduleFlush(): void {
+ if (this.flushTimeout === null) {
+ this.flushTimeout = setTimeout(() => {
+ void this.flush();
+ }, this.batchTimeoutMs);
+ }
+ }
+
+ private async flush(): Promise<void> {
+ await this.mutex.runExclusive(async () => {
+ if (this.flushTimeout) {
+ clearTimeout(this.flushTimeout);
+ this.flushTimeout = null;
+ }
+
+ // Process operations in order, batching consecutive operations of the same type
+ while (this.pendingOperations.length > 0) {
+ const currentType = this.pendingOperations[0].type;
+
+ // Collect consecutive operations of the same type (up to batchSize)
+ const batch: PendingOperation[] = [];
+ while (
+ batch.length < this.batchSize &&
+ this.pendingOperations.length > 0 &&
+ this.pendingOperations[0].type === currentType
+ ) {
+ batch.push(this.pendingOperations.shift()!);
+ }
+
+ if (currentType === "add") {
+ await this.flushAddBatch(
+ batch as Extract<PendingOperation, { type: "add" }>[],
+ );
+ } else {
+ await this.flushDeleteBatch(
+ batch as Extract<PendingOperation, { type: "delete" }>[],
+ );
+ }
+ }
+ });
+ }
+
+ private async flushAddBatch(
+ batch: Extract<PendingOperation, { type: "add" }>[],
+ ): Promise<void> {
+ if (batch.length === 0) return;
+
+ try {
+ const documents = batch.map((p) => p.document);
+ const task = await this.index.addDocuments(documents, {
+ primaryKey: "id",
+ });
+ await this.ensureTaskSuccess(task.taskUid);
+ batch.forEach((p) => p.resolve());
+ } catch (error) {
+ batch.forEach((p) => p.reject(error as Error));
+ }
+ }
+
+ private async flushDeleteBatch(
+ batch: Extract<PendingOperation, { type: "delete" }>[],
+ ): Promise<void> {
+ if (batch.length === 0) return;
+
+ try {
+ const ids = batch.map((p) => p.id);
+ const task = await this.index.deleteDocuments(ids);
+ await this.ensureTaskSuccess(task.taskUid);
+ batch.forEach((p) => p.resolve());
+ } catch (error) {
+ batch.forEach((p) => p.reject(error as Error));
+ }
+ }
+
+ private async ensureTaskSuccess(taskUid: number): Promise<void> {
+ const task = await this.index.waitForTask(taskUid, {
+ intervalMs: 200,
+ timeOutMs: this.jobTimeoutSec * 1000 * 0.9,
+ });
+ if (task.error) {
+ throw new Error(`Search task failed: ${task.error.message}`);
+ }
+ }
+}
+
+class MeiliSearchIndexClient implements SearchIndexClient {
+ private batchQueue: BatchingDocumentQueue;
+ private jobTimeoutSec: number;
+
+ constructor(
+ private index: Index<BookmarkSearchDocument>,
+ jobTimeoutSec: number,
+ batchSize: number,
+ batchTimeoutMs: number,
+ ) {
+ this.jobTimeoutSec = jobTimeoutSec;
+ this.batchQueue = new BatchingDocumentQueue(
+ index,
+ jobTimeoutSec,
+ batchSize,
+ batchTimeoutMs,
+ );
+ }
+
+ async addDocuments(
+ documents: BookmarkSearchDocument[],
+ options?: IndexingOptions,
+ ): Promise<void> {
+ const shouldBatch = options?.batch !== false;
+
+ if (shouldBatch) {
+ await Promise.all(
+ documents.map((doc) => this.batchQueue.addDocument(doc)),
+ );
+ } else {
+ // Direct indexing without batching
+ const task = await this.index.addDocuments(documents, {
+ primaryKey: "id",
+ });
+ await this.ensureTaskSuccess(task.taskUid);
+ }
+ }
+
+ async deleteDocuments(
+ ids: string[],
+ options?: IndexingOptions,
+ ): Promise<void> {
+ const shouldBatch = options?.batch !== false;
+
+ if (shouldBatch) {
+ await Promise.all(ids.map((id) => this.batchQueue.deleteDocument(id)));
+ } else {
+ // Direct deletion without batching
+ const task = await this.index.deleteDocuments(ids);
+ await this.ensureTaskSuccess(task.taskUid);
+ }
}
async search(options: SearchOptions): Promise<SearchResponse> {
@@ -69,7 +242,7 @@ class MeiliSearchIndexClient implements SearchIndexClient {
private async ensureTaskSuccess(taskUid: number): Promise<void> {
const task = await this.index.waitForTask(taskUid, {
intervalMs: 200,
- timeOutMs: serverConfig.search.jobTimeoutSec * 1000 * 0.9,
+ timeOutMs: this.jobTimeoutSec * 1000 * 0.9,
});
if (task.error) {
throw new Error(`Search task failed: ${task.error.message}`);
@@ -130,7 +303,12 @@ export class MeiliSearchProvider implements PluginProvider<SearchIndexClient> {
}
await this.configureIndex(indexFound);
- this.indexClient = new MeiliSearchIndexClient(indexFound);
+ this.indexClient = new MeiliSearchIndexClient(
+ indexFound,
+ serverConfig.search.jobTimeoutSec,
+ envConfig.MEILI_BATCH_SIZE,
+ envConfig.MEILI_BATCH_TIMEOUT_MS,
+ );
return this.indexClient;
}
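
Usage-wise, callers opt out of batching per call with { batch: false }; otherwise concurrent indexing requests are coalesced through the shared queue. A hypothetical caller sketch, assuming `client` is the SearchIndexClient returned by MeiliSearchProvider and docA/docB/docC are BookmarkSearchDocument values (none of which appear in this diff):

// These concurrent calls end up in one Meilisearch addDocuments task once
// MEILI_BATCH_SIZE is reached or MEILI_BATCH_TIMEOUT_MS elapses.
await Promise.all([client.addDocuments([docA]), client.addDocuments([docB])]);

// Passing batch: false skips the queue and indexes immediately, at the cost
// of one Meilisearch task per call.
await client.addDocuments([docC], { batch: false });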