diff options
Diffstat (limited to 'packages/shared/concurrency.ts')
| -rw-r--r-- | packages/shared/concurrency.ts | 56 |
1 files changed, 56 insertions, 0 deletions
diff --git a/packages/shared/concurrency.ts b/packages/shared/concurrency.ts new file mode 100644 index 00000000..e2934f9d --- /dev/null +++ b/packages/shared/concurrency.ts @@ -0,0 +1,56 @@ +export class AsyncSemaphore { + private permits: number; + private queue: (() => void)[] = []; + + constructor(permits: number) { + this.permits = permits; + } + + acquire(): Promise<void> { + if (this.permits > 0) { + this.permits--; + return Promise.resolve(); + } else { + return new Promise<void>((resolve) => { + this.queue.push(resolve); + }); + } + } + + release(): void { + if (this.queue.length > 0) { + const resolve = this.queue.shift(); + if (resolve) { + resolve(); + } + } else { + this.permits++; + } + } + + get available(): number { + return this.permits; + } +} + +export function limitConcurrency<T>( + promises: (() => Promise<T>)[], + concurrencyLimit: number, +): Promise<T>[] { + const semaphore = new AsyncSemaphore(concurrencyLimit); + const results: Promise<T>[] = []; + + for (const promiseFunction of promises) { + results.push( + semaphore + .acquire() + .then(() => { + return promiseFunction(); + }) + .finally(() => { + semaphore.release(); + }), + ); + } + return results; +} |
