From 38d403bcc26244e778a6e7a2f75ee39a9ec7ed27 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sun, 12 Jan 2025 23:47:25 +0000 Subject: fix: Limit concurrency of bulk actions. Fix #773 --- packages/shared/concurrency.ts | 56 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 packages/shared/concurrency.ts (limited to 'packages/shared/concurrency.ts') diff --git a/packages/shared/concurrency.ts b/packages/shared/concurrency.ts new file mode 100644 index 00000000..e2934f9d --- /dev/null +++ b/packages/shared/concurrency.ts @@ -0,0 +1,56 @@ +export class AsyncSemaphore { + private permits: number; + private queue: (() => void)[] = []; + + constructor(permits: number) { + this.permits = permits; + } + + acquire(): Promise { + if (this.permits > 0) { + this.permits--; + return Promise.resolve(); + } else { + return new Promise((resolve) => { + this.queue.push(resolve); + }); + } + } + + release(): void { + if (this.queue.length > 0) { + const resolve = this.queue.shift(); + if (resolve) { + resolve(); + } + } else { + this.permits++; + } + } + + get available(): number { + return this.permits; + } +} + +export function limitConcurrency( + promises: (() => Promise)[], + concurrencyLimit: number, +): Promise[] { + const semaphore = new AsyncSemaphore(concurrencyLimit); + const results: Promise[] = []; + + for (const promiseFunction of promises) { + results.push( + semaphore + .acquire() + .then(() => { + return promiseFunction(); + }) + .finally(() => { + semaphore.release(); + }), + ); + } + return results; +} -- cgit v1.2.3-70-g09d2