aboutsummaryrefslogtreecommitdiffstats
path: root/packages/trpc
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2024-07-21 19:18:58 +0100
committerGitHub <noreply@github.com>2024-07-21 19:18:58 +0100
commit9edd154440c18bcc4542560e229eb293f9e0c2d4 (patch)
tree2423f82619d48656f8dc60870fab8b152eef4401 /packages/trpc
parentedbd98d7841388d1169a3a3b159367487bda431e (diff)
downloadkarakeep-9edd154440c18bcc4542560e229eb293f9e0c2d4.tar.zst
refactor: Replace the usage of bullMQ with the hoarder sqlite-based queue (#309)
Diffstat (limited to 'packages/trpc')
-rw-r--r--packages/trpc/routers/admin.ts26
-rw-r--r--packages/trpc/routers/bookmarks.ts6
2 files changed, 16 insertions, 16 deletions
diff --git a/packages/trpc/routers/admin.ts b/packages/trpc/routers/admin.ts
index 05831b92..14cb4ac9 100644
--- a/packages/trpc/routers/admin.ts
+++ b/packages/trpc/routers/admin.ts
@@ -18,17 +18,17 @@ export const adminAppRouter = router({
numUsers: z.number(),
numBookmarks: z.number(),
crawlStats: z.object({
- queuedInRedis: z.number(),
+ queued: z.number(),
pending: z.number(),
failed: z.number(),
}),
inferenceStats: z.object({
- queuedInRedis: z.number(),
+ queued: z.number(),
pending: z.number(),
failed: z.number(),
}),
indexingStats: z.object({
- queuedInRedis: z.number(),
+ queued: z.number(),
}),
}),
)
@@ -38,15 +38,15 @@ export const adminAppRouter = router({
[{ value: numBookmarks }],
// Crawls
- pendingCrawlsInRedis,
+ queuedCrawls,
[{ value: pendingCrawls }],
[{ value: failedCrawls }],
// Indexing
- pendingIndexingInRedis,
+ queuedIndexing,
// Inference
- pendingInferenceInRedis,
+ queuedInferences,
[{ value: pendingInference }],
[{ value: failedInference }],
] = await Promise.all([
@@ -54,7 +54,7 @@ export const adminAppRouter = router({
ctx.db.select({ value: count() }).from(bookmarks),
// Crawls
- LinkCrawlerQueue.getWaitingCount(),
+ LinkCrawlerQueue.stats(),
ctx.db
.select({ value: count() })
.from(bookmarkLinks)
@@ -65,10 +65,10 @@ export const adminAppRouter = router({
.where(eq(bookmarkLinks.crawlStatus, "failure")),
// Indexing
- SearchIndexingQueue.getWaitingCount(),
+ SearchIndexingQueue.stats(),
// Inference
- OpenAIQueue.getWaitingCount(),
+ OpenAIQueue.stats(),
ctx.db
.select({ value: count() })
.from(bookmarks)
@@ -83,17 +83,17 @@ export const adminAppRouter = router({
numUsers,
numBookmarks,
crawlStats: {
- queuedInRedis: pendingCrawlsInRedis,
+ queued: queuedCrawls.pending + queuedCrawls.pending_retry,
pending: pendingCrawls,
failed: failedCrawls,
},
inferenceStats: {
- queuedInRedis: pendingInferenceInRedis,
+ queued: queuedInferences.pending + queuedInferences.pending_retry,
pending: pendingInference,
failed: failedInference,
},
indexingStats: {
- queuedInRedis: pendingIndexingInRedis,
+ queued: queuedIndexing.pending + queuedIndexing.pending_retry,
},
};
}),
@@ -116,7 +116,7 @@ export const adminAppRouter = router({
await Promise.all(
bookmarkIds.map((b) =>
- LinkCrawlerQueue.add("crawl", {
+ LinkCrawlerQueue.enqueue({
bookmarkId: b.id,
runInference: input.runInference,
}),
diff --git a/packages/trpc/routers/bookmarks.ts b/packages/trpc/routers/bookmarks.ts
index 1e5e7dfc..43bb4db7 100644
--- a/packages/trpc/routers/bookmarks.ts
+++ b/packages/trpc/routers/bookmarks.ts
@@ -310,14 +310,14 @@ export const bookmarksAppRouter = router({
switch (bookmark.content.type) {
case BookmarkTypes.LINK: {
// The crawling job triggers openai when it's done
- await LinkCrawlerQueue.add("crawl", {
+ await LinkCrawlerQueue.enqueue({
bookmarkId: bookmark.id,
});
break;
}
case BookmarkTypes.TEXT:
case BookmarkTypes.ASSET: {
- await OpenAIQueue.add("openai", {
+ await OpenAIQueue.enqueue({
bookmarkId: bookmark.id,
});
break;
@@ -418,7 +418,7 @@ export const bookmarksAppRouter = router({
.input(z.object({ bookmarkId: z.string() }))
.use(ensureBookmarkOwnership)
.mutation(async ({ input }) => {
- await LinkCrawlerQueue.add("crawl", {
+ await LinkCrawlerQueue.enqueue({
bookmarkId: input.bookmarkId,
});
}),