diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-01-12 23:47:25 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2025-01-12 23:47:25 +0000 |
| commit | 38d403bcc26244e778a6e7a2f75ee39a9ec7ed27 (patch) | |
| tree | 192d990e67a0b95aece9381a149513d14f6b0464 /packages/shared/concurrency.ts | |
| parent | 9fd26b472b18924ab11afcebace90329b0fe3abf (diff) | |
| download | karakeep-38d403bcc26244e778a6e7a2f75ee39a9ec7ed27.tar.zst | |
fix: Limit concurrency of bulk actions. Fix #773
Diffstat (limited to 'packages/shared/concurrency.ts')
| -rw-r--r-- | packages/shared/concurrency.ts | 56 |
1 files changed, 56 insertions, 0 deletions
diff --git a/packages/shared/concurrency.ts b/packages/shared/concurrency.ts new file mode 100644 index 00000000..e2934f9d --- /dev/null +++ b/packages/shared/concurrency.ts @@ -0,0 +1,56 @@ +export class AsyncSemaphore { + private permits: number; + private queue: (() => void)[] = []; + + constructor(permits: number) { + this.permits = permits; + } + + acquire(): Promise<void> { + if (this.permits > 0) { + this.permits--; + return Promise.resolve(); + } else { + return new Promise<void>((resolve) => { + this.queue.push(resolve); + }); + } + } + + release(): void { + if (this.queue.length > 0) { + const resolve = this.queue.shift(); + if (resolve) { + resolve(); + } + } else { + this.permits++; + } + } + + get available(): number { + return this.permits; + } +} + +export function limitConcurrency<T>( + promises: (() => Promise<T>)[], + concurrencyLimit: number, +): Promise<T>[] { + const semaphore = new AsyncSemaphore(concurrencyLimit); + const results: Promise<T>[] = []; + + for (const promiseFunction of promises) { + results.push( + semaphore + .acquire() + .then(() => { + return promiseFunction(); + }) + .finally(() => { + semaphore.release(); + }), + ); + } + return results; +} |
