aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--apps/web/components/dashboard/BulkBookmarksAction.tsx41
-rw-r--r--apps/web/components/dashboard/bookmarks/BulkManageListsModal.tsx15
-rw-r--r--apps/web/components/dashboard/bookmarks/BulkTagModal.tsx33
-rw-r--r--packages/shared/concurrency.test.ts170
-rw-r--r--packages/shared/concurrency.ts56
5 files changed, 285 insertions, 30 deletions
diff --git a/apps/web/components/dashboard/BulkBookmarksAction.tsx b/apps/web/components/dashboard/BulkBookmarksAction.tsx
index eb0e0cec..9d5ecdbe 100644
--- a/apps/web/components/dashboard/BulkBookmarksAction.tsx
+++ b/apps/web/components/dashboard/BulkBookmarksAction.tsx
@@ -27,12 +27,15 @@ import {
useRecrawlBookmark,
useUpdateBookmark,
} from "@hoarder/shared-react/hooks/bookmarks";
+import { limitConcurrency } from "@hoarder/shared/concurrency";
import { BookmarkTypes } from "@hoarder/shared/types/bookmarks";
import BulkManageListsModal from "./bookmarks/BulkManageListsModal";
import BulkTagModal from "./bookmarks/BulkTagModal";
import { ArchivedActionIcon, FavouritedActionIcon } from "./bookmarks/icons";
+const MAX_CONCURRENT_BULK_ACTIONS = 50;
+
export default function BulkBookmarksAction() {
const { t } = useTranslation();
const { selectedBookmarks, isBulkEditEnabled } = useBulkActionsStore();
@@ -100,11 +103,15 @@ export default function BulkBookmarksAction() {
(item) => item.content.type === BookmarkTypes.LINK,
);
await Promise.all(
- links.map((item) =>
- recrawlBookmarkMutator.mutateAsync({
- bookmarkId: item.id,
- archiveFullPage,
- }),
+ limitConcurrency(
+ links.map(
+ (item) => () =>
+ recrawlBookmarkMutator.mutateAsync({
+ bookmarkId: item.id,
+ archiveFullPage,
+ }),
+ ),
+ MAX_CONCURRENT_BULK_ACTIONS,
),
);
toast({
@@ -145,12 +152,16 @@ export default function BulkBookmarksAction() {
archived,
}: UpdateBookmarkProps) => {
await Promise.all(
- selectedBookmarks.map((item) =>
- updateBookmarkMutator.mutateAsync({
- bookmarkId: item.id,
- favourited,
- archived,
- }),
+ limitConcurrency(
+ selectedBookmarks.map(
+ (item) => () =>
+ updateBookmarkMutator.mutateAsync({
+ bookmarkId: item.id,
+ favourited,
+ archived,
+ }),
+ ),
+ MAX_CONCURRENT_BULK_ACTIONS,
),
);
toast({
@@ -160,8 +171,12 @@ export default function BulkBookmarksAction() {
const deleteBookmarks = async () => {
await Promise.all(
- selectedBookmarks.map((item) =>
- deleteBookmarkMutator.mutateAsync({ bookmarkId: item.id }),
+ limitConcurrency(
+ selectedBookmarks.map(
+ (item) => () =>
+ deleteBookmarkMutator.mutateAsync({ bookmarkId: item.id }),
+ ),
+ MAX_CONCURRENT_BULK_ACTIONS,
),
);
toast({
diff --git a/apps/web/components/dashboard/bookmarks/BulkManageListsModal.tsx b/apps/web/components/dashboard/bookmarks/BulkManageListsModal.tsx
index 9c1f05d2..27e5c5e2 100644
--- a/apps/web/components/dashboard/bookmarks/BulkManageListsModal.tsx
+++ b/apps/web/components/dashboard/bookmarks/BulkManageListsModal.tsx
@@ -21,6 +21,7 @@ import { useForm } from "react-hook-form";
import { z } from "zod";
import { useAddBookmarkToList } from "@hoarder/shared-react/hooks/lists";
+import { limitConcurrency } from "@hoarder/shared/concurrency";
import { BookmarkListSelector } from "../lists/BookmarkListSelector";
@@ -67,11 +68,15 @@ export default function BulkManageListsModal({
const onSubmit = async (value: z.infer<typeof formSchema>) => {
const results = await Promise.allSettled(
- bookmarkIds.map((bookmarkId) =>
- addToList({
- bookmarkId,
- listId: value.listId,
- }),
+ limitConcurrency(
+ bookmarkIds.map(
+ (bookmarkId) => () =>
+ addToList({
+ bookmarkId,
+ listId: value.listId,
+ }),
+ ),
+ 50,
),
);
diff --git a/apps/web/components/dashboard/bookmarks/BulkTagModal.tsx b/apps/web/components/dashboard/bookmarks/BulkTagModal.tsx
index 3c8e75e7..03af9e11 100644
--- a/apps/web/components/dashboard/bookmarks/BulkTagModal.tsx
+++ b/apps/web/components/dashboard/bookmarks/BulkTagModal.tsx
@@ -11,6 +11,7 @@ import { toast } from "@/components/ui/use-toast";
import { useUpdateBookmarkTags } from "@hoarder/shared-react/hooks/bookmarks";
import { api } from "@hoarder/shared-react/trpc";
+import { limitConcurrency } from "@hoarder/shared/concurrency";
import { ZBookmark } from "@hoarder/shared/types/bookmarks";
import { TagsEditor } from "./TagsEditor";
@@ -59,12 +60,16 @@ export default function BulkTagModal({
const onAttach = async (tag: { tagName: string; tagId?: string }) => {
const results = await Promise.allSettled(
- bookmarkIds.map((id) =>
- mutateAsync({
- bookmarkId: id,
- attach: [tag],
- detach: [],
- }),
+ limitConcurrency(
+ bookmarkIds.map(
+ (id) => () =>
+ mutateAsync({
+ bookmarkId: id,
+ attach: [tag],
+ detach: [],
+ }),
+ ),
+ 50,
),
);
const successes = results.filter((r) => r.status == "fulfilled").length;
@@ -81,12 +86,16 @@ export default function BulkTagModal({
tagName: string;
}) => {
const results = await Promise.allSettled(
- bookmarkIds.map((id) =>
- mutateAsync({
- bookmarkId: id,
- attach: [],
- detach: [{ tagId }],
- }),
+ limitConcurrency(
+ bookmarkIds.map(
+ (id) => () =>
+ mutateAsync({
+ bookmarkId: id,
+ attach: [],
+ detach: [{ tagId }],
+ }),
+ ),
+ 50,
),
);
const successes = results.filter((r) => r.status == "fulfilled").length;
diff --git a/packages/shared/concurrency.test.ts b/packages/shared/concurrency.test.ts
new file mode 100644
index 00000000..7ee1ccc3
--- /dev/null
+++ b/packages/shared/concurrency.test.ts
@@ -0,0 +1,170 @@
+// semaphore.test.ts
+
+import { describe, expect, it } from "vitest";
+
+import { AsyncSemaphore, limitConcurrency } from "./concurrency";
+
+describe("AsyncSemaphore", () => {
+ it("should acquire a permit if available", async () => {
+ const semaphore = new AsyncSemaphore(1);
+ await semaphore.acquire();
+ expect(semaphore.available).toBe(0);
+ });
+
+ it("should wait if no permit is available", async () => {
+ const semaphore = new AsyncSemaphore(1);
+ await semaphore.acquire();
+
+ let acquired = false;
+ const acquirePromise = semaphore.acquire().then(() => {
+ acquired = true;
+ });
+
+ expect(acquired).toBe(false); // Should not resolve right away
+ semaphore.release();
+
+ await acquirePromise; // wait for the resolution of the promise
+ expect(acquired).toBe(true);
+ expect(semaphore.available).toBe(0);
+ });
+
+ it("should release a permit", async () => {
+ const semaphore = new AsyncSemaphore(1);
+ await semaphore.acquire();
+ expect(semaphore.available).toBe(0);
+ semaphore.release();
+ expect(semaphore.available).toBe(1);
+ });
+
+ it("should handle multiple acquires and releases", async () => {
+ const semaphore = new AsyncSemaphore(2);
+ await semaphore.acquire();
+ await semaphore.acquire();
+ expect(semaphore.available).toBe(0);
+
+ let resolved1 = false;
+ let resolved2 = false;
+ const promise1 = semaphore.acquire().then(() => {
+ resolved1 = true;
+ });
+ const promise2 = semaphore.acquire().then(() => {
+ resolved2 = true;
+ });
+
+ expect(resolved1).toBe(false);
+ expect(resolved2).toBe(false);
+
+ semaphore.release();
+ await promise1;
+ expect(resolved1).toBe(true);
+ expect(resolved2).toBe(false);
+
+ semaphore.release();
+ await promise2;
+ expect(resolved2).toBe(true);
+ expect(semaphore.available).toBe(0);
+ });
+
+ it("should acquire immediately if there is an available permit", async () => {
+ const semaphore = new AsyncSemaphore(2);
+ await semaphore.acquire();
+ expect(semaphore.available).toBe(1);
+ await semaphore.acquire();
+ expect(semaphore.available).toBe(0);
+ });
+});
+
+describe("limitConcurrency", () => {
+ it("should execute all promises with concurrency limit", async () => {
+ const delay = (ms: number) => new Promise((res) => setTimeout(res, ms));
+
+ const promiseFunctions = [
+ async () => {
+ await delay(10);
+ return 1;
+ },
+ async () => {
+ await delay(5);
+ return 2;
+ },
+ async () => {
+ await delay(15);
+ return 3;
+ },
+ async () => {
+ await delay(10);
+ return 4;
+ },
+ ];
+
+ const concurrencyLimit = 2;
+ const results = limitConcurrency(promiseFunctions, concurrencyLimit);
+ expect(results).toHaveLength(promiseFunctions.length);
+
+ const resolvedResults = await Promise.all(results);
+ expect(resolvedResults).toEqual([1, 2, 3, 4]);
+ });
+
+ it("should limit concurrency", async () => {
+ const delay = (ms: number) => new Promise((res) => setTimeout(res, ms));
+ let runningCount = 0;
+ let maxCounter = 0;
+ const promiseFunctions = [...Array(50).keys()].map(() => async () => {
+ runningCount++;
+ maxCounter = Math.max(maxCounter, runningCount);
+ await delay(100);
+ runningCount--;
+ });
+ const concurrencyLimit = 2;
+ const results = limitConcurrency(promiseFunctions, concurrencyLimit);
+
+ await Promise.all(results);
+ expect(runningCount).toBe(0);
+ expect(maxCounter).toBe(concurrencyLimit);
+ });
+
+ it("should handle errors in promise functions", async () => {
+ const promiseFunctions = [
+ async () => {
+ return Promise.resolve(1);
+ },
+ async () => {
+ return Promise.resolve(2);
+ },
+ async () => {
+ return Promise.reject(new Error("Test Error"));
+ },
+ async () => {
+ return Promise.resolve(4);
+ },
+ ];
+ const concurrencyLimit = 2;
+
+ const results = limitConcurrency(promiseFunctions, concurrencyLimit);
+
+ await expect(Promise.all(results)).rejects.toThrow("Test Error"); // test that promise fails.
+
+ const resolveResults = await Promise.allSettled(results); // check that the other promises resolve even if the function fails
+
+ expect(resolveResults.map((r) => r.status)).toEqual([
+ "fulfilled",
+ "fulfilled",
+ "rejected",
+ "fulfilled",
+ ]);
+ expect(
+ resolveResults[0].status === "fulfilled" && resolveResults[0].value,
+ ).toBe(1);
+ expect(
+ resolveResults[1].status === "fulfilled" && resolveResults[1].value,
+ ).toBe(2);
+ expect(
+ resolveResults[2].status === "rejected" &&
+ // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
+ resolveResults[2].reason.message,
+ ).toBe("Test Error");
+ expect(
+ resolveResults[3].status === "fulfilled" && resolveResults[3].value,
+ ).toBe(4);
+ });
+});
diff --git a/packages/shared/concurrency.ts b/packages/shared/concurrency.ts
new file mode 100644
index 00000000..e2934f9d
--- /dev/null
+++ b/packages/shared/concurrency.ts
@@ -0,0 +1,56 @@
+export class AsyncSemaphore {
+ private permits: number;
+ private queue: (() => void)[] = [];
+
+ constructor(permits: number) {
+ this.permits = permits;
+ }
+
+ acquire(): Promise<void> {
+ if (this.permits > 0) {
+ this.permits--;
+ return Promise.resolve();
+ } else {
+ return new Promise<void>((resolve) => {
+ this.queue.push(resolve);
+ });
+ }
+ }
+
+ release(): void {
+ if (this.queue.length > 0) {
+ const resolve = this.queue.shift();
+ if (resolve) {
+ resolve();
+ }
+ } else {
+ this.permits++;
+ }
+ }
+
+ get available(): number {
+ return this.permits;
+ }
+}
+
+export function limitConcurrency<T>(
+ promises: (() => Promise<T>)[],
+ concurrencyLimit: number,
+): Promise<T>[] {
+ const semaphore = new AsyncSemaphore(concurrencyLimit);
+ const results: Promise<T>[] = [];
+
+ for (const promiseFunction of promises) {
+ results.push(
+ semaphore
+ .acquire()
+ .then(() => {
+ return promiseFunction();
+ })
+ .finally(() => {
+ semaphore.release();
+ }),
+ );
+ }
+ return results;
+}