aboutsummaryrefslogtreecommitdiffstats
path: root/packages/shared/concurrency.test.ts
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-01-12 23:47:25 +0000
committerMohamed Bassem <me@mbassem.com>2025-01-12 23:47:25 +0000
commit38d403bcc26244e778a6e7a2f75ee39a9ec7ed27 (patch)
tree192d990e67a0b95aece9381a149513d14f6b0464 /packages/shared/concurrency.test.ts
parent9fd26b472b18924ab11afcebace90329b0fe3abf (diff)
downloadkarakeep-38d403bcc26244e778a6e7a2f75ee39a9ec7ed27.tar.zst
fix: Limit concurrency of bulk actions. Fix #773
Diffstat (limited to 'packages/shared/concurrency.test.ts')
-rw-r--r--packages/shared/concurrency.test.ts170
1 files changed, 170 insertions, 0 deletions
diff --git a/packages/shared/concurrency.test.ts b/packages/shared/concurrency.test.ts
new file mode 100644
index 00000000..7ee1ccc3
--- /dev/null
+++ b/packages/shared/concurrency.test.ts
@@ -0,0 +1,170 @@
+// semaphore.test.ts
+
+import { describe, expect, it } from "vitest";
+
+import { AsyncSemaphore, limitConcurrency } from "./concurrency";
+
+describe("AsyncSemaphore", () => {
+ it("should acquire a permit if available", async () => {
+ const semaphore = new AsyncSemaphore(1);
+ await semaphore.acquire();
+ expect(semaphore.available).toBe(0);
+ });
+
+ it("should wait if no permit is available", async () => {
+ const semaphore = new AsyncSemaphore(1);
+ await semaphore.acquire();
+
+ let acquired = false;
+ const acquirePromise = semaphore.acquire().then(() => {
+ acquired = true;
+ });
+
+ expect(acquired).toBe(false); // Should not resolve right away
+ semaphore.release();
+
+ await acquirePromise; // wait for the resolution of the promise
+ expect(acquired).toBe(true);
+ expect(semaphore.available).toBe(0);
+ });
+
+ it("should release a permit", async () => {
+ const semaphore = new AsyncSemaphore(1);
+ await semaphore.acquire();
+ expect(semaphore.available).toBe(0);
+ semaphore.release();
+ expect(semaphore.available).toBe(1);
+ });
+
+ it("should handle multiple acquires and releases", async () => {
+ const semaphore = new AsyncSemaphore(2);
+ await semaphore.acquire();
+ await semaphore.acquire();
+ expect(semaphore.available).toBe(0);
+
+ let resolved1 = false;
+ let resolved2 = false;
+ const promise1 = semaphore.acquire().then(() => {
+ resolved1 = true;
+ });
+ const promise2 = semaphore.acquire().then(() => {
+ resolved2 = true;
+ });
+
+ expect(resolved1).toBe(false);
+ expect(resolved2).toBe(false);
+
+ semaphore.release();
+ await promise1;
+ expect(resolved1).toBe(true);
+ expect(resolved2).toBe(false);
+
+ semaphore.release();
+ await promise2;
+ expect(resolved2).toBe(true);
+ expect(semaphore.available).toBe(0);
+ });
+
+ it("should acquire immediately if there is an available permit", async () => {
+ const semaphore = new AsyncSemaphore(2);
+ await semaphore.acquire();
+ expect(semaphore.available).toBe(1);
+ await semaphore.acquire();
+ expect(semaphore.available).toBe(0);
+ });
+});
+
+describe("limitConcurrency", () => {
+ it("should execute all promises with concurrency limit", async () => {
+ const delay = (ms: number) => new Promise((res) => setTimeout(res, ms));
+
+ const promiseFunctions = [
+ async () => {
+ await delay(10);
+ return 1;
+ },
+ async () => {
+ await delay(5);
+ return 2;
+ },
+ async () => {
+ await delay(15);
+ return 3;
+ },
+ async () => {
+ await delay(10);
+ return 4;
+ },
+ ];
+
+ const concurrencyLimit = 2;
+ const results = limitConcurrency(promiseFunctions, concurrencyLimit);
+ expect(results).toHaveLength(promiseFunctions.length);
+
+ const resolvedResults = await Promise.all(results);
+ expect(resolvedResults).toEqual([1, 2, 3, 4]);
+ });
+
+ it("should limit concurrency", async () => {
+ const delay = (ms: number) => new Promise((res) => setTimeout(res, ms));
+ let runningCount = 0;
+ let maxCounter = 0;
+ const promiseFunctions = [...Array(50).keys()].map(() => async () => {
+ runningCount++;
+ maxCounter = Math.max(maxCounter, runningCount);
+ await delay(100);
+ runningCount--;
+ });
+ const concurrencyLimit = 2;
+ const results = limitConcurrency(promiseFunctions, concurrencyLimit);
+
+ await Promise.all(results);
+ expect(runningCount).toBe(0);
+ expect(maxCounter).toBe(concurrencyLimit);
+ });
+
+ it("should handle errors in promise functions", async () => {
+ const promiseFunctions = [
+ async () => {
+ return Promise.resolve(1);
+ },
+ async () => {
+ return Promise.resolve(2);
+ },
+ async () => {
+ return Promise.reject(new Error("Test Error"));
+ },
+ async () => {
+ return Promise.resolve(4);
+ },
+ ];
+ const concurrencyLimit = 2;
+
+ const results = limitConcurrency(promiseFunctions, concurrencyLimit);
+
+ await expect(Promise.all(results)).rejects.toThrow("Test Error"); // test that promise fails.
+
+ const resolveResults = await Promise.allSettled(results); // check that the other promises resolve even if the function fails
+
+ expect(resolveResults.map((r) => r.status)).toEqual([
+ "fulfilled",
+ "fulfilled",
+ "rejected",
+ "fulfilled",
+ ]);
+ expect(
+ resolveResults[0].status === "fulfilled" && resolveResults[0].value,
+ ).toBe(1);
+ expect(
+ resolveResults[1].status === "fulfilled" && resolveResults[1].value,
+ ).toBe(2);
+ expect(
+ resolveResults[2].status === "rejected" &&
+ // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
+ resolveResults[2].reason.message,
+ ).toBe("Test Error");
+ expect(
+ resolveResults[3].status === "fulfilled" && resolveResults[3].value,
+ ).toBe(4);
+ });
+});