aboutsummaryrefslogtreecommitdiffstats
path: root/packages/shared/concurrency.test.ts
blob: 7ee1ccc33bdc0eb61e9b4eb19085b1d2908bb8a2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
// concurrency.test.ts

import { describe, expect, it } from "vitest";

import { AsyncSemaphore, limitConcurrency } from "./concurrency";

/**
 * Behavioural tests for `AsyncSemaphore`, exercised through `acquire()`,
 * `release()` and the `available` permit counter.
 */
describe("AsyncSemaphore", () => {
  /**
   * Yields until a zero-delay timer fires, by which point every promise
   * callback that was already runnable has executed.
   *
   * A `.then` callback never runs synchronously, so checking a "not yet
   * acquired" flag immediately after calling `acquire()` would pass even if
   * the semaphore did not block at all. Awaiting this helper first makes
   * those checks meaningful.
   */
  const flushPendingCallbacks = () =>
    new Promise((resolve) => setTimeout(resolve, 0));

  // Taking the only permit must succeed and leave none available.
  it("should acquire a permit if available", async () => {
    const semaphore = new AsyncSemaphore(1);
    await semaphore.acquire();
    expect(semaphore.available).toBe(0);
  });

  // A second acquire on an exhausted semaphore must stay pending until a
  // release happens.
  it("should wait if no permit is available", async () => {
    const semaphore = new AsyncSemaphore(1);
    await semaphore.acquire();

    let acquired = false;
    const acquirePromise = semaphore.acquire().then(() => {
      acquired = true;
    });

    // Give an (incorrectly) already-resolved acquire the chance to run its
    // callback before asserting that it is still pending.
    await flushPendingCallbacks();
    expect(acquired).toBe(false);

    semaphore.release();

    await acquirePromise; // wait for the resolution of the promise
    expect(acquired).toBe(true);
    // The released permit is expected to go to the waiter, not the pool.
    expect(semaphore.available).toBe(0);
  });

  // Releasing with nobody waiting must return the permit to the pool.
  it("should release a permit", async () => {
    const semaphore = new AsyncSemaphore(1);
    await semaphore.acquire();
    expect(semaphore.available).toBe(0);
    semaphore.release();
    expect(semaphore.available).toBe(1);
  });

  // Two queued waiters are expected to be woken one per release, in the
  // order in which they asked for a permit.
  it("should handle multiple acquires and releases", async () => {
    const semaphore = new AsyncSemaphore(2);
    await semaphore.acquire();
    await semaphore.acquire();
    expect(semaphore.available).toBe(0);

    let resolved1 = false;
    let resolved2 = false;
    const promise1 = semaphore.acquire().then(() => {
      resolved1 = true;
    });
    const promise2 = semaphore.acquire().then(() => {
      resolved2 = true;
    });

    // Without this flush the two checks below could never fail.
    await flushPendingCallbacks();
    expect(resolved1).toBe(false);
    expect(resolved2).toBe(false);

    semaphore.release();
    await promise1;
    expect(resolved1).toBe(true);
    expect(resolved2).toBe(false);

    semaphore.release();
    await promise2;
    expect(resolved2).toBe(true);
    expect(semaphore.available).toBe(0);
  });

  // With two permits, two consecutive acquires succeed and the counter
  // drops by one each time.
  it("should acquire immediately if there is an available permit", async () => {
    const semaphore = new AsyncSemaphore(2);
    await semaphore.acquire();
    expect(semaphore.available).toBe(1);
    await semaphore.acquire();
    expect(semaphore.available).toBe(0);
  });
});

/**
 * Tests for `limitConcurrency`. It is called with an array of
 * promise-returning functions and a concurrency limit, and these tests
 * expect it to return one promise per function, in input order.
 */
describe("limitConcurrency", () => {
  /** Resolves after roughly `ms` milliseconds. */
  const delay = (ms: number) =>
    new Promise((resolve) => setTimeout(resolve, ms));

  // Tasks finish out of order (5 ms before 10 ms, etc.), yet the results
  // must line up with the order of the input functions.
  it("should execute all promises with concurrency limit", async () => {
    const promiseFunctions = [
      async () => {
        await delay(10);
        return 1;
      },
      async () => {
        await delay(5);
        return 2;
      },
      async () => {
        await delay(15);
        return 3;
      },
      async () => {
        await delay(10);
        return 4;
      },
    ];

    const concurrencyLimit = 2;
    const results = limitConcurrency(promiseFunctions, concurrencyLimit);
    expect(results).toHaveLength(promiseFunctions.length);

    const resolvedResults = await Promise.all(results);
    expect(resolvedResults).toEqual([1, 2, 3, 4]);
  });

  // Every task bumps a shared counter while it runs; the highest value seen
  // must equal the limit (never more, and the limit must actually be reached).
  it("should limit concurrency", async () => {
    const taskCount = 50;
    const concurrencyLimit = 2;
    // Only the number of tasks in flight matters here, not how long each one
    // takes. Total wall-clock time is roughly
    // taskCount / concurrencyLimit * delay, so the delay is kept short to
    // stay far away from the runner's per-test timeout.
    const taskDurationMs = 5;

    let runningCount = 0;
    let maxCounter = 0;
    const promiseFunctions = Array.from(
      { length: taskCount },
      () => async () => {
        runningCount++;
        maxCounter = Math.max(maxCounter, runningCount);
        await delay(taskDurationMs);
        runningCount--;
      },
    );

    const results = limitConcurrency(promiseFunctions, concurrencyLimit);

    await Promise.all(results);
    expect(runningCount).toBe(0);
    expect(maxCounter).toBe(concurrencyLimit);
  });

  // One failing task must reject only its own promise; the tasks before and
  // after it are still expected to fulfil with their values.
  it("should handle errors in promise functions", async () => {
    const promiseFunctions = [
      async () => {
        return Promise.resolve(1);
      },
      async () => {
        return Promise.resolve(2);
      },
      async () => {
        return Promise.reject(new Error("Test Error"));
      },
      async () => {
        return Promise.resolve(4);
      },
    ];
    const concurrencyLimit = 2;

    const results = limitConcurrency(promiseFunctions, concurrencyLimit);

    await expect(Promise.all(results)).rejects.toThrow("Test Error"); // test that promise fails.

    const resolveResults = await Promise.allSettled(results); // check that the other promises resolve even if the function fails

    expect(resolveResults.map((r) => r.status)).toEqual([
      "fulfilled",
      "fulfilled",
      "rejected",
      "fulfilled",
    ]);
    expect(
      resolveResults[0].status === "fulfilled" && resolveResults[0].value,
    ).toBe(1);
    expect(
      resolveResults[1].status === "fulfilled" && resolveResults[1].value,
    ).toBe(2);

    // `reason` is typed `any`; narrowing it with `instanceof` makes the
    // `.message` access type-safe without a lint suppression.
    const failure = resolveResults[2];
    expect(
      failure.status === "rejected" &&
        failure.reason instanceof Error &&
        failure.reason.message,
    ).toBe("Test Error");

    expect(
      resolveResults[3].status === "fulfilled" && resolveResults[3].value,
    ).toBe(4);
  });
});