aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkamtschatka <simon.schatka@gmx.at>2024-07-21 23:08:06 +0200
committerGitHub <noreply@github.com>2024-07-21 22:08:06 +0100
commitc5c62de28944077004f01960ee9f7e12b7d26c2c (patch)
treea31c88ff70938c4afe062afbdb078ea3cb83df9a
parent4c23ea931dc01e0d99869eb518c85f891d006baa (diff)
downloadkarakeep-c5c62de28944077004f01960ee9f7e12b7d26c2c.tar.zst
fix: async/await issues with the new queue (#319)
-rw-r--r--apps/workers/crawlerWorker.ts4
-rw-r--r--apps/workers/openaiWorker.ts2
-rw-r--r--packages/queue/runner.test.ts14
-rw-r--r--packages/shared/queues.ts8
-rw-r--r--packages/trpc/routers/bookmarks.ts10
-rw-r--r--packages/trpc/routers/tags.ts14
6 files changed, 27 insertions, 25 deletions
diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts
index a1917523..bb226a27 100644
--- a/apps/workers/crawlerWorker.ts
+++ b/apps/workers/crawlerWorker.ts
@@ -653,13 +653,13 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
// Enqueue openai job (if not set, assume it's true for backward compatibility)
if (job.data.runInference !== false) {
- OpenAIQueue.enqueue({
+ await OpenAIQueue.enqueue({
bookmarkId,
});
}
// Update the search index
- triggerSearchReindex(bookmarkId);
+ await triggerSearchReindex(bookmarkId);
// Do the archival as a separate last step as it has the potential for failure
await archivalLogic();
diff --git a/apps/workers/openaiWorker.ts b/apps/workers/openaiWorker.ts
index 9e6e2f23..55695938 100644
--- a/apps/workers/openaiWorker.ts
+++ b/apps/workers/openaiWorker.ts
@@ -397,5 +397,5 @@ async function runOpenAI(job: DequeuedJob<ZOpenAIRequest>) {
await connectTags(bookmarkId, tags, bookmark.userId);
// Update the search index
- triggerSearchReindex(bookmarkId);
+ await triggerSearchReindex(bookmarkId);
}
diff --git a/packages/queue/runner.test.ts b/packages/queue/runner.test.ts
index 9e50c9a5..7777b422 100644
--- a/packages/queue/runner.test.ts
+++ b/packages/queue/runner.test.ts
@@ -157,9 +157,9 @@ describe("SqiteQueueRunner", () => {
barrier,
);
- queue.enqueue({ increment: 1 });
- queue.enqueue({ increment: 2 });
- queue.enqueue({ increment: 3 });
+ await queue.enqueue({ increment: 1 });
+ await queue.enqueue({ increment: 2 });
+ await queue.enqueue({ increment: 3 });
expect(await queue.stats()).toEqual({
pending: 3,
@@ -215,9 +215,9 @@ describe("SqiteQueueRunner", () => {
barrier.allowParticipantsToProceed();
const { runner, results } = buildRunner(queue, defaultRunnerOpts, barrier);
- queue.enqueue({ increment: 1, succeedAfter: 2 });
- queue.enqueue({ increment: 1, succeedAfter: 10 });
- queue.enqueue({ increment: 3, succeedAfter: 0 });
+ await queue.enqueue({ increment: 1, succeedAfter: 2 });
+ await queue.enqueue({ increment: 1, succeedAfter: 10 });
+ await queue.enqueue({ increment: 3, succeedAfter: 0 });
const runnerPromise = runner.runUntilEmpty();
@@ -256,7 +256,7 @@ describe("SqiteQueueRunner", () => {
barrier,
);
- queue.enqueue({ increment: 1, blockForSec: 10 });
+ await queue.enqueue({ increment: 1, blockForSec: 10 });
await runner.runUntilEmpty();
expect(await queue.stats()).toEqual({
diff --git a/packages/shared/queues.ts b/packages/shared/queues.ts
index 8747fb3f..756f095d 100644
--- a/packages/shared/queues.ts
+++ b/packages/shared/queues.ts
@@ -64,15 +64,15 @@ export const SearchIndexingQueue = new SqliteQueue<ZSearchIndexingRequest>(
},
);
-export function triggerSearchReindex(bookmarkId: string) {
- SearchIndexingQueue.enqueue({
+export async function triggerSearchReindex(bookmarkId: string) {
+ await SearchIndexingQueue.enqueue({
bookmarkId,
type: "index",
});
}
-export function triggerSearchDeletion(bookmarkId: string) {
- SearchIndexingQueue.enqueue({
+export async function triggerSearchDeletion(bookmarkId: string) {
+ await SearchIndexingQueue.enqueue({
bookmarkId: bookmarkId,
type: "delete",
});
diff --git a/packages/trpc/routers/bookmarks.ts b/packages/trpc/routers/bookmarks.ts
index 43bb4db7..d2aa36bb 100644
--- a/packages/trpc/routers/bookmarks.ts
+++ b/packages/trpc/routers/bookmarks.ts
@@ -323,7 +323,7 @@ export const bookmarksAppRouter = router({
break;
}
}
- triggerSearchReindex(bookmark.id);
+ await triggerSearchReindex(bookmark.id);
return bookmark;
}),
@@ -353,7 +353,7 @@ export const bookmarksAppRouter = router({
message: "Bookmark not found",
});
}
- triggerSearchReindex(input.bookmarkId);
+ await triggerSearchReindex(input.bookmarkId);
return res[0];
}),
@@ -379,7 +379,7 @@ export const bookmarksAppRouter = router({
message: "Bookmark not found",
});
}
- triggerSearchReindex(input.bookmarkId);
+ await triggerSearchReindex(input.bookmarkId);
}),
deleteBookmark: authedProcedure
@@ -405,7 +405,7 @@ export const bookmarksAppRouter = router({
eq(bookmarks.id, input.bookmarkId),
),
);
- triggerSearchDeletion(input.bookmarkId);
+ await triggerSearchDeletion(input.bookmarkId);
if (deleted.changes > 0 && bookmark) {
await cleanupAssetForBookmark({
asset: bookmark.asset,
@@ -747,7 +747,7 @@ export const bookmarksAppRouter = router({
})),
)
.onConflictDoNothing();
- triggerSearchReindex(input.bookmarkId);
+ await triggerSearchReindex(input.bookmarkId);
return {
bookmarkId: input.bookmarkId,
attached: allIds,
diff --git a/packages/trpc/routers/tags.ts b/packages/trpc/routers/tags.ts
index 7cb2c971..e76a09ec 100644
--- a/packages/trpc/routers/tags.ts
+++ b/packages/trpc/routers/tags.ts
@@ -115,8 +115,10 @@ export const tagsAppRouter = router({
if (res.changes == 0) {
throw new TRPCError({ code: "NOT_FOUND" });
}
- affectedBookmarks.forEach(({ bookmarkId }) =>
- triggerSearchReindex(bookmarkId),
+ await Promise.all(
+ affectedBookmarks.map(({ bookmarkId }) =>
+ triggerSearchReindex(bookmarkId),
+ ),
);
}),
deleteUnused: authedProcedure
@@ -185,11 +187,11 @@ export const tagsAppRouter = router({
},
},
);
- await Promise.all([
+ await Promise.all(
affectedBookmarks
.map((b) => b.bookmarkId)
.map((id) => triggerSearchReindex(id)),
- ]);
+ );
} catch (e) {
// Best Effort attempt to reindex affected bookmarks
console.error("Failed to reindex affected bookmarks", e);
@@ -304,9 +306,9 @@ export const tagsAppRouter = router({
);
try {
- await Promise.all([
+ await Promise.all(
affectedBookmarks.map((id) => triggerSearchReindex(id)),
- ]);
+ );
} catch (e) {
// Best Effort attempt to reindex affected bookmarks
console.error("Failed to reindex affected bookmarks", e);