From 9edd154440c18bcc4542560e229eb293f9e0c2d4 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sun, 21 Jul 2024 19:18:58 +0100 Subject: refactor: Replace the usage of bullMQ with the hoarder sqlite-based queue (#309) --- packages/trpc/routers/admin.ts | 26 +++++++++++++------------- packages/trpc/routers/bookmarks.ts | 6 +++--- 2 files changed, 16 insertions(+), 16 deletions(-) (limited to 'packages/trpc') diff --git a/packages/trpc/routers/admin.ts b/packages/trpc/routers/admin.ts index 05831b92..14cb4ac9 100644 --- a/packages/trpc/routers/admin.ts +++ b/packages/trpc/routers/admin.ts @@ -18,17 +18,17 @@ export const adminAppRouter = router({ numUsers: z.number(), numBookmarks: z.number(), crawlStats: z.object({ - queuedInRedis: z.number(), + queued: z.number(), pending: z.number(), failed: z.number(), }), inferenceStats: z.object({ - queuedInRedis: z.number(), + queued: z.number(), pending: z.number(), failed: z.number(), }), indexingStats: z.object({ - queuedInRedis: z.number(), + queued: z.number(), }), }), ) @@ -38,15 +38,15 @@ export const adminAppRouter = router({ [{ value: numBookmarks }], // Crawls - pendingCrawlsInRedis, + queuedCrawls, [{ value: pendingCrawls }], [{ value: failedCrawls }], // Indexing - pendingIndexingInRedis, + queuedIndexing, // Inference - pendingInferenceInRedis, + queuedInferences, [{ value: pendingInference }], [{ value: failedInference }], ] = await Promise.all([ @@ -54,7 +54,7 @@ export const adminAppRouter = router({ ctx.db.select({ value: count() }).from(bookmarks), // Crawls - LinkCrawlerQueue.getWaitingCount(), + LinkCrawlerQueue.stats(), ctx.db .select({ value: count() }) .from(bookmarkLinks) @@ -65,10 +65,10 @@ export const adminAppRouter = router({ .where(eq(bookmarkLinks.crawlStatus, "failure")), // Indexing - SearchIndexingQueue.getWaitingCount(), + SearchIndexingQueue.stats(), // Inference - OpenAIQueue.getWaitingCount(), + OpenAIQueue.stats(), ctx.db .select({ value: count() }) .from(bookmarks) @@ -83,17 +83,17 @@ export const adminAppRouter = router({ numUsers, numBookmarks, crawlStats: { - queuedInRedis: pendingCrawlsInRedis, + queued: queuedCrawls.pending + queuedCrawls.pending_retry, pending: pendingCrawls, failed: failedCrawls, }, inferenceStats: { - queuedInRedis: pendingInferenceInRedis, + queued: queuedInferences.pending + queuedInferences.pending_retry, pending: pendingInference, failed: failedInference, }, indexingStats: { - queuedInRedis: pendingIndexingInRedis, + queued: queuedIndexing.pending + queuedIndexing.pending_retry, }, }; }), @@ -116,7 +116,7 @@ export const adminAppRouter = router({ await Promise.all( bookmarkIds.map((b) => - LinkCrawlerQueue.add("crawl", { + LinkCrawlerQueue.enqueue({ bookmarkId: b.id, runInference: input.runInference, }), diff --git a/packages/trpc/routers/bookmarks.ts b/packages/trpc/routers/bookmarks.ts index 1e5e7dfc..43bb4db7 100644 --- a/packages/trpc/routers/bookmarks.ts +++ b/packages/trpc/routers/bookmarks.ts @@ -310,14 +310,14 @@ export const bookmarksAppRouter = router({ switch (bookmark.content.type) { case BookmarkTypes.LINK: { // The crawling job triggers openai when it's done - await LinkCrawlerQueue.add("crawl", { + await LinkCrawlerQueue.enqueue({ bookmarkId: bookmark.id, }); break; } case BookmarkTypes.TEXT: case BookmarkTypes.ASSET: { - await OpenAIQueue.add("openai", { + await OpenAIQueue.enqueue({ bookmarkId: bookmark.id, }); break; @@ -418,7 +418,7 @@ export const bookmarksAppRouter = router({ .input(z.object({ bookmarkId: z.string() })) .use(ensureBookmarkOwnership) .mutation(async ({ input }) => { - await LinkCrawlerQueue.add("crawl", { + await LinkCrawlerQueue.enqueue({ bookmarkId: input.bookmarkId, }); }), -- cgit v1.2.3-70-g09d2