diff options
| -rw-r--r-- | apps/web/components/dashboard/BulkBookmarksAction.tsx | 41 | ||||
| -rw-r--r-- | apps/web/components/dashboard/bookmarks/BulkManageListsModal.tsx | 15 | ||||
| -rw-r--r-- | apps/web/components/dashboard/bookmarks/BulkTagModal.tsx | 33 | ||||
| -rw-r--r-- | packages/shared/concurrency.test.ts | 170 | ||||
| -rw-r--r-- | packages/shared/concurrency.ts | 56 |
5 files changed, 285 insertions, 30 deletions
diff --git a/apps/web/components/dashboard/BulkBookmarksAction.tsx b/apps/web/components/dashboard/BulkBookmarksAction.tsx index eb0e0cec..9d5ecdbe 100644 --- a/apps/web/components/dashboard/BulkBookmarksAction.tsx +++ b/apps/web/components/dashboard/BulkBookmarksAction.tsx @@ -27,12 +27,15 @@ import { useRecrawlBookmark, useUpdateBookmark, } from "@hoarder/shared-react/hooks/bookmarks"; +import { limitConcurrency } from "@hoarder/shared/concurrency"; import { BookmarkTypes } from "@hoarder/shared/types/bookmarks"; import BulkManageListsModal from "./bookmarks/BulkManageListsModal"; import BulkTagModal from "./bookmarks/BulkTagModal"; import { ArchivedActionIcon, FavouritedActionIcon } from "./bookmarks/icons"; +const MAX_CONCURRENT_BULK_ACTIONS = 50; + export default function BulkBookmarksAction() { const { t } = useTranslation(); const { selectedBookmarks, isBulkEditEnabled } = useBulkActionsStore(); @@ -100,11 +103,15 @@ export default function BulkBookmarksAction() { (item) => item.content.type === BookmarkTypes.LINK, ); await Promise.all( - links.map((item) => - recrawlBookmarkMutator.mutateAsync({ - bookmarkId: item.id, - archiveFullPage, - }), + limitConcurrency( + links.map( + (item) => () => + recrawlBookmarkMutator.mutateAsync({ + bookmarkId: item.id, + archiveFullPage, + }), + ), + MAX_CONCURRENT_BULK_ACTIONS, ), ); toast({ @@ -145,12 +152,16 @@ export default function BulkBookmarksAction() { archived, }: UpdateBookmarkProps) => { await Promise.all( - selectedBookmarks.map((item) => - updateBookmarkMutator.mutateAsync({ - bookmarkId: item.id, - favourited, - archived, - }), + limitConcurrency( + selectedBookmarks.map( + (item) => () => + updateBookmarkMutator.mutateAsync({ + bookmarkId: item.id, + favourited, + archived, + }), + ), + MAX_CONCURRENT_BULK_ACTIONS, ), ); toast({ @@ -160,8 +171,12 @@ export default function BulkBookmarksAction() { const deleteBookmarks = async () => { await Promise.all( - selectedBookmarks.map((item) => - deleteBookmarkMutator.mutateAsync({ bookmarkId: item.id }), + limitConcurrency( + selectedBookmarks.map( + (item) => () => + deleteBookmarkMutator.mutateAsync({ bookmarkId: item.id }), + ), + MAX_CONCURRENT_BULK_ACTIONS, ), ); toast({ diff --git a/apps/web/components/dashboard/bookmarks/BulkManageListsModal.tsx b/apps/web/components/dashboard/bookmarks/BulkManageListsModal.tsx index 9c1f05d2..27e5c5e2 100644 --- a/apps/web/components/dashboard/bookmarks/BulkManageListsModal.tsx +++ b/apps/web/components/dashboard/bookmarks/BulkManageListsModal.tsx @@ -21,6 +21,7 @@ import { useForm } from "react-hook-form"; import { z } from "zod"; import { useAddBookmarkToList } from "@hoarder/shared-react/hooks/lists"; +import { limitConcurrency } from "@hoarder/shared/concurrency"; import { BookmarkListSelector } from "../lists/BookmarkListSelector"; @@ -67,11 +68,15 @@ export default function BulkManageListsModal({ const onSubmit = async (value: z.infer<typeof formSchema>) => { const results = await Promise.allSettled( - bookmarkIds.map((bookmarkId) => - addToList({ - bookmarkId, - listId: value.listId, - }), + limitConcurrency( + bookmarkIds.map( + (bookmarkId) => () => + addToList({ + bookmarkId, + listId: value.listId, + }), + ), + 50, ), ); diff --git a/apps/web/components/dashboard/bookmarks/BulkTagModal.tsx b/apps/web/components/dashboard/bookmarks/BulkTagModal.tsx index 3c8e75e7..03af9e11 100644 --- a/apps/web/components/dashboard/bookmarks/BulkTagModal.tsx +++ b/apps/web/components/dashboard/bookmarks/BulkTagModal.tsx @@ -11,6 +11,7 @@ import { toast } from "@/components/ui/use-toast"; import { useUpdateBookmarkTags } from "@hoarder/shared-react/hooks/bookmarks"; import { api } from "@hoarder/shared-react/trpc"; +import { limitConcurrency } from "@hoarder/shared/concurrency"; import { ZBookmark } from "@hoarder/shared/types/bookmarks"; import { TagsEditor } from "./TagsEditor"; @@ -59,12 +60,16 @@ export default function BulkTagModal({ const onAttach = async (tag: { tagName: string; tagId?: string }) => { const results = await Promise.allSettled( - bookmarkIds.map((id) => - mutateAsync({ - bookmarkId: id, - attach: [tag], - detach: [], - }), + limitConcurrency( + bookmarkIds.map( + (id) => () => + mutateAsync({ + bookmarkId: id, + attach: [tag], + detach: [], + }), + ), + 50, ), ); const successes = results.filter((r) => r.status == "fulfilled").length; @@ -81,12 +86,16 @@ export default function BulkTagModal({ tagName: string; }) => { const results = await Promise.allSettled( - bookmarkIds.map((id) => - mutateAsync({ - bookmarkId: id, - attach: [], - detach: [{ tagId }], - }), + limitConcurrency( + bookmarkIds.map( + (id) => () => + mutateAsync({ + bookmarkId: id, + attach: [], + detach: [{ tagId }], + }), + ), + 50, ), ); const successes = results.filter((r) => r.status == "fulfilled").length; diff --git a/packages/shared/concurrency.test.ts b/packages/shared/concurrency.test.ts new file mode 100644 index 00000000..7ee1ccc3 --- /dev/null +++ b/packages/shared/concurrency.test.ts @@ -0,0 +1,170 @@ +// semaphore.test.ts + +import { describe, expect, it } from "vitest"; + +import { AsyncSemaphore, limitConcurrency } from "./concurrency"; + +describe("AsyncSemaphore", () => { + it("should acquire a permit if available", async () => { + const semaphore = new AsyncSemaphore(1); + await semaphore.acquire(); + expect(semaphore.available).toBe(0); + }); + + it("should wait if no permit is available", async () => { + const semaphore = new AsyncSemaphore(1); + await semaphore.acquire(); + + let acquired = false; + const acquirePromise = semaphore.acquire().then(() => { + acquired = true; + }); + + expect(acquired).toBe(false); // Should not resolve right away + semaphore.release(); + + await acquirePromise; // wait for the resolution of the promise + expect(acquired).toBe(true); + expect(semaphore.available).toBe(0); + }); + + it("should release a permit", async () => { + const semaphore = new AsyncSemaphore(1); + await semaphore.acquire(); + expect(semaphore.available).toBe(0); + semaphore.release(); + expect(semaphore.available).toBe(1); + }); + + it("should handle multiple acquires and releases", async () => { + const semaphore = new AsyncSemaphore(2); + await semaphore.acquire(); + await semaphore.acquire(); + expect(semaphore.available).toBe(0); + + let resolved1 = false; + let resolved2 = false; + const promise1 = semaphore.acquire().then(() => { + resolved1 = true; + }); + const promise2 = semaphore.acquire().then(() => { + resolved2 = true; + }); + + expect(resolved1).toBe(false); + expect(resolved2).toBe(false); + + semaphore.release(); + await promise1; + expect(resolved1).toBe(true); + expect(resolved2).toBe(false); + + semaphore.release(); + await promise2; + expect(resolved2).toBe(true); + expect(semaphore.available).toBe(0); + }); + + it("should acquire immediately if there is an available permit", async () => { + const semaphore = new AsyncSemaphore(2); + await semaphore.acquire(); + expect(semaphore.available).toBe(1); + await semaphore.acquire(); + expect(semaphore.available).toBe(0); + }); +}); + +describe("limitConcurrency", () => { + it("should execute all promises with concurrency limit", async () => { + const delay = (ms: number) => new Promise((res) => setTimeout(res, ms)); + + const promiseFunctions = [ + async () => { + await delay(10); + return 1; + }, + async () => { + await delay(5); + return 2; + }, + async () => { + await delay(15); + return 3; + }, + async () => { + await delay(10); + return 4; + }, + ]; + + const concurrencyLimit = 2; + const results = limitConcurrency(promiseFunctions, concurrencyLimit); + expect(results).toHaveLength(promiseFunctions.length); + + const resolvedResults = await Promise.all(results); + expect(resolvedResults).toEqual([1, 2, 3, 4]); + }); + + it("should limit concurrency", async () => { + const delay = (ms: number) => new Promise((res) => setTimeout(res, ms)); + let runningCount = 0; + let maxCounter = 0; + const promiseFunctions = [...Array(50).keys()].map(() => async () => { + runningCount++; + maxCounter = Math.max(maxCounter, runningCount); + await delay(100); + runningCount--; + }); + const concurrencyLimit = 2; + const results = limitConcurrency(promiseFunctions, concurrencyLimit); + + await Promise.all(results); + expect(runningCount).toBe(0); + expect(maxCounter).toBe(concurrencyLimit); + }); + + it("should handle errors in promise functions", async () => { + const promiseFunctions = [ + async () => { + return Promise.resolve(1); + }, + async () => { + return Promise.resolve(2); + }, + async () => { + return Promise.reject(new Error("Test Error")); + }, + async () => { + return Promise.resolve(4); + }, + ]; + const concurrencyLimit = 2; + + const results = limitConcurrency(promiseFunctions, concurrencyLimit); + + await expect(Promise.all(results)).rejects.toThrow("Test Error"); // test that promise fails. + + const resolveResults = await Promise.allSettled(results); // check that the other promises resolve even if the function fails + + expect(resolveResults.map((r) => r.status)).toEqual([ + "fulfilled", + "fulfilled", + "rejected", + "fulfilled", + ]); + expect( + resolveResults[0].status === "fulfilled" && resolveResults[0].value, + ).toBe(1); + expect( + resolveResults[1].status === "fulfilled" && resolveResults[1].value, + ).toBe(2); + expect( + resolveResults[2].status === "rejected" && + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + resolveResults[2].reason.message, + ).toBe("Test Error"); + expect( + resolveResults[3].status === "fulfilled" && resolveResults[3].value, + ).toBe(4); + }); +}); diff --git a/packages/shared/concurrency.ts b/packages/shared/concurrency.ts new file mode 100644 index 00000000..e2934f9d --- /dev/null +++ b/packages/shared/concurrency.ts @@ -0,0 +1,56 @@ +export class AsyncSemaphore { + private permits: number; + private queue: (() => void)[] = []; + + constructor(permits: number) { + this.permits = permits; + } + + acquire(): Promise<void> { + if (this.permits > 0) { + this.permits--; + return Promise.resolve(); + } else { + return new Promise<void>((resolve) => { + this.queue.push(resolve); + }); + } + } + + release(): void { + if (this.queue.length > 0) { + const resolve = this.queue.shift(); + if (resolve) { + resolve(); + } + } else { + this.permits++; + } + } + + get available(): number { + return this.permits; + } +} + +export function limitConcurrency<T>( + promises: (() => Promise<T>)[], + concurrencyLimit: number, +): Promise<T>[] { + const semaphore = new AsyncSemaphore(concurrencyLimit); + const results: Promise<T>[] = []; + + for (const promiseFunction of promises) { + results.push( + semaphore + .acquire() + .then(() => { + return promiseFunction(); + }) + .finally(() => { + semaphore.release(); + }), + ); + } + return results; +} |
