aboutsummaryrefslogtreecommitdiffstats
path: root/packages/shared/concurrency.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/shared/concurrency.ts')
-rw-r--r--packages/shared/concurrency.ts56
1 files changed, 56 insertions, 0 deletions
diff --git a/packages/shared/concurrency.ts b/packages/shared/concurrency.ts
new file mode 100644
index 00000000..e2934f9d
--- /dev/null
+++ b/packages/shared/concurrency.ts
@@ -0,0 +1,56 @@
+export class AsyncSemaphore {
+ private permits: number;
+ private queue: (() => void)[] = [];
+
+ constructor(permits: number) {
+ this.permits = permits;
+ }
+
+ acquire(): Promise<void> {
+ if (this.permits > 0) {
+ this.permits--;
+ return Promise.resolve();
+ } else {
+ return new Promise<void>((resolve) => {
+ this.queue.push(resolve);
+ });
+ }
+ }
+
+ release(): void {
+ if (this.queue.length > 0) {
+ const resolve = this.queue.shift();
+ if (resolve) {
+ resolve();
+ }
+ } else {
+ this.permits++;
+ }
+ }
+
+ get available(): number {
+ return this.permits;
+ }
+}
+
+export function limitConcurrency<T>(
+ promises: (() => Promise<T>)[],
+ concurrencyLimit: number,
+): Promise<T>[] {
+ const semaphore = new AsyncSemaphore(concurrencyLimit);
+ const results: Promise<T>[] = [];
+
+ for (const promiseFunction of promises) {
+ results.push(
+ semaphore
+ .acquire()
+ .then(() => {
+ return promiseFunction();
+ })
+ .finally(() => {
+ semaphore.release();
+ }),
+ );
+ }
+ return results;
+}