| author | Mohamed Bassem <me@mbassem.com> | 2024-07-21 19:18:58 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-07-21 19:18:58 +0100 |
| commit | 9edd154440c18bcc4542560e229eb293f9e0c2d4 (patch) | |
| tree | 2423f82619d48656f8dc60870fab8b152eef4401 /packages | |
| parent | edbd98d7841388d1169a3a3b159367487bda431e (diff) | |
| download | karakeep-9edd154440c18bcc4542560e229eb293f9e0c2d4.tar.zst | |
refactor: Replace the usage of bullMQ with the hoarder sqlite-based queue (#309)
Diffstat (limited to 'packages')
| -rw-r--r-- | packages/shared/config.ts | 10 |
| -rw-r--r-- | packages/shared/package.json | 2 |
| -rw-r--r-- | packages/shared/queues.ts | 60 |
| -rw-r--r-- | packages/trpc/routers/admin.ts | 26 |
| -rw-r--r-- | packages/trpc/routers/bookmarks.ts | 6 |
5 files changed, 44 insertions, 60 deletions
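
Before the raw diff, a minimal sketch of the new queue wiring at a call site, assuming only the API surface visible in the diff below (`buildDBClient`, `migrateDB`, `SqliteQueue`, `enqueue`). The schema fields are inferred from the enqueue call sites, and the data-directory path is a made-up placeholder:

```typescript
import path from "node:path";

import { z } from "zod";

import { buildDBClient, migrateDB, SqliteQueue } from "@hoarder/queue";

// A single SQLite file backs every queue; the diff puts it at <dataDir>/queue.db.
// "/tmp/hoarder-data" is a placeholder, not the project's real data dir.
const queueDB = buildDBClient(path.join("/tmp/hoarder-data", "queue.db"));

// Apply the queue package's own schema migrations before any enqueue.
migrateDB(queueDB);

// Payload schema; field names inferred from the enqueue call sites below.
const zCrawlLinkRequestSchema = z.object({
  bookmarkId: z.string(),
  runInference: z.boolean().optional(),
});
type ZCrawlLinkRequest = z.infer<typeof zCrawlLinkRequestSchema>;

// The retry policy shrinks from bullMQ's attempts/exponential-backoff pair
// to a single numRetries knob.
const crawlerQueue = new SqliteQueue<ZCrawlLinkRequest>(
  "link_crawler_queue",
  queueDB,
  { defaultJobArgs: { numRetries: 5 } },
);

// bullMQ's queue.add("crawl", payload) becomes a name-less enqueue(payload).
await crawlerQueue.enqueue({ bookmarkId: "some-bookmark-id" });
```

The operational upshot, visible in the config.ts hunk, is that Redis (and the four `REDIS_*` environment variables) drops out of the deployment entirely; queue state now lives in the data directory alongside everything else.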
```diff
diff --git a/packages/shared/config.ts b/packages/shared/config.ts
index 2c739a0c..28bfdbdb 100644
--- a/packages/shared/config.ts
+++ b/packages/shared/config.ts
@@ -15,10 +15,6 @@ const allEnv = z.object({
   OLLAMA_BASE_URL: z.string().url().optional(),
   INFERENCE_TEXT_MODEL: z.string().default("gpt-3.5-turbo-0125"),
   INFERENCE_IMAGE_MODEL: z.string().default("gpt-4o-2024-05-13"),
-  REDIS_HOST: z.string().default("localhost"),
-  REDIS_PORT: z.coerce.number().default(6379),
-  REDIS_DB_IDX: z.coerce.number().optional(),
-  REDIS_PASSWORD: z.string().optional(),
   CRAWLER_HEADLESS_BROWSER: stringBool("true"),
   BROWSER_WEB_URL: z.string().url().optional(),
   BROWSER_WEBSOCKET_URL: z.string().url().optional(),
@@ -58,12 +54,6 @@ const serverConfigSchema = allEnv.transform((val) => {
       imageModel: val.INFERENCE_IMAGE_MODEL,
       inferredTagLang: val.INFERENCE_LANG,
     },
-    bullMQ: {
-      redisHost: val.REDIS_HOST,
-      redisPort: val.REDIS_PORT,
-      redisDBIdx: val.REDIS_DB_IDX,
-      redisPassword: val.REDIS_PASSWORD,
-    },
     crawler: {
       numWorkers: val.CRAWLER_NUM_WORKERS,
       headlessBrowser: val.CRAWLER_HEADLESS_BROWSER,
diff --git a/packages/shared/package.json b/packages/shared/package.json
index 032f3db5..2b1ae973 100644
--- a/packages/shared/package.json
+++ b/packages/shared/package.json
@@ -5,7 +5,7 @@
   "private": true,
   "type": "module",
   "dependencies": {
-    "bullmq": "^5.1.9",
+    "@hoarder/queue": "workspace:^0.1.0",
     "meilisearch": "^0.37.0",
     "winston": "^3.11.0",
     "zod": "^3.22.4"
diff --git a/packages/shared/queues.ts b/packages/shared/queues.ts
index 2b890755..8747fb3f 100644
--- a/packages/shared/queues.ts
+++ b/packages/shared/queues.ts
@@ -1,14 +1,17 @@
-import { Queue } from "bullmq";
+import path from "node:path";
 import { z } from "zod";
 
+import { buildDBClient, migrateDB, SqliteQueue } from "@hoarder/queue";
+
 import serverConfig from "./config";
 
-export const queueConnectionDetails = {
-  host: serverConfig.bullMQ.redisHost,
-  port: serverConfig.bullMQ.redisPort,
-  db: serverConfig.bullMQ.redisDBIdx,
-  password: serverConfig.bullMQ.redisPassword,
-};
+const QUEUE_DB_PATH = path.join(serverConfig.dataDir, "queue.db");
+
+const queueDB = buildDBClient(QUEUE_DB_PATH);
+
+export function runQueueDBMigrations() {
+  migrateDB(queueDB);
+}
 
 // Link Crawler
 export const zCrawlLinkRequestSchema = z.object({
@@ -17,16 +20,12 @@
 });
 export type ZCrawlLinkRequest = z.infer<typeof zCrawlLinkRequestSchema>;
 
-export const LinkCrawlerQueue = new Queue<ZCrawlLinkRequest, void>(
+export const LinkCrawlerQueue = new SqliteQueue<ZCrawlLinkRequest>(
   "link_crawler_queue",
+  queueDB,
   {
-    connection: queueConnectionDetails,
-    defaultJobOptions: {
-      attempts: 5,
-      backoff: {
-        type: "exponential",
-        delay: 1000,
-      },
+    defaultJobArgs: {
+      numRetries: 5,
     },
   },
 );
@@ -37,16 +36,15 @@
 });
 export type ZOpenAIRequest = z.infer<typeof zOpenAIRequestSchema>;
 
-export const OpenAIQueue = new Queue<ZOpenAIRequest, void>("openai_queue", {
-  connection: queueConnectionDetails,
-  defaultJobOptions: {
-    attempts: 3,
-    backoff: {
-      type: "exponential",
-      delay: 500,
+export const OpenAIQueue = new SqliteQueue<ZOpenAIRequest>(
+  "openai_queue",
+  queueDB,
+  {
+    defaultJobArgs: {
+      numRetries: 3,
     },
   },
-});
+);
 
 // Search Indexing Worker
 export const zSearchIndexingRequestSchema = z.object({
@@ -56,29 +54,25 @@
 export type ZSearchIndexingRequest = z.infer<
   typeof zSearchIndexingRequestSchema
 >;
-export const SearchIndexingQueue = new Queue<ZSearchIndexingRequest, void>(
+export const SearchIndexingQueue = new SqliteQueue<ZSearchIndexingRequest>(
   "searching_indexing",
+  queueDB,
   {
-    connection: queueConnectionDetails,
-    defaultJobOptions: {
-      attempts: 5,
-      backoff: {
-        type: "exponential",
-        delay: 1000,
-      },
+    defaultJobArgs: {
+      numRetries: 5,
     },
   },
 );
 
 export function triggerSearchReindex(bookmarkId: string) {
-  SearchIndexingQueue.add("search_indexing", {
+  SearchIndexingQueue.enqueue({
     bookmarkId,
     type: "index",
   });
 }
 
 export function triggerSearchDeletion(bookmarkId: string) {
-  SearchIndexingQueue.add("search_indexing", {
+  SearchIndexingQueue.enqueue({
     bookmarkId: bookmarkId,
     type: "delete",
   });
diff --git a/packages/trpc/routers/admin.ts b/packages/trpc/routers/admin.ts
index 05831b92..14cb4ac9 100644
--- a/packages/trpc/routers/admin.ts
+++ b/packages/trpc/routers/admin.ts
@@ -18,17 +18,17 @@ export const adminAppRouter = router({
       numUsers: z.number(),
       numBookmarks: z.number(),
       crawlStats: z.object({
-        queuedInRedis: z.number(),
+        queued: z.number(),
         pending: z.number(),
         failed: z.number(),
       }),
       inferenceStats: z.object({
-        queuedInRedis: z.number(),
+        queued: z.number(),
         pending: z.number(),
         failed: z.number(),
       }),
       indexingStats: z.object({
-        queuedInRedis: z.number(),
+        queued: z.number(),
       }),
     }),
   )
@@ -38,15 +38,15 @@
       [{ value: numBookmarks }],
 
       // Crawls
-      pendingCrawlsInRedis,
+      queuedCrawls,
       [{ value: pendingCrawls }],
       [{ value: failedCrawls }],
 
       // Indexing
-      pendingIndexingInRedis,
+      queuedIndexing,
 
       // Inference
-      pendingInferenceInRedis,
+      queuedInferences,
       [{ value: pendingInference }],
       [{ value: failedInference }],
     ] = await Promise.all([
@@ -54,7 +54,7 @@
       ctx.db.select({ value: count() }).from(bookmarks),
 
       // Crawls
-      LinkCrawlerQueue.getWaitingCount(),
+      LinkCrawlerQueue.stats(),
       ctx.db
         .select({ value: count() })
         .from(bookmarkLinks)
@@ -65,10 +65,10 @@
         .where(eq(bookmarkLinks.crawlStatus, "failure")),
 
       // Indexing
-      SearchIndexingQueue.getWaitingCount(),
+      SearchIndexingQueue.stats(),
 
       // Inference
-      OpenAIQueue.getWaitingCount(),
+      OpenAIQueue.stats(),
       ctx.db
         .select({ value: count() })
         .from(bookmarks)
@@ -83,17 +83,17 @@
       numUsers,
       numBookmarks,
       crawlStats: {
-        queuedInRedis: pendingCrawlsInRedis,
+        queued: queuedCrawls.pending + queuedCrawls.pending_retry,
         pending: pendingCrawls,
         failed: failedCrawls,
       },
       inferenceStats: {
-        queuedInRedis: pendingInferenceInRedis,
+        queued: queuedInferences.pending + queuedInferences.pending_retry,
         pending: pendingInference,
         failed: failedInference,
       },
       indexingStats: {
-        queuedInRedis: pendingIndexingInRedis,
+        queued: queuedIndexing.pending + queuedIndexing.pending_retry,
       },
     };
   }),
@@ -116,7 +116,7 @@
 
     await Promise.all(
       bookmarkIds.map((b) =>
-        LinkCrawlerQueue.add("crawl", {
+        LinkCrawlerQueue.enqueue({
          bookmarkId: b.id,
          runInference: input.runInference,
        }),
diff --git a/packages/trpc/routers/bookmarks.ts b/packages/trpc/routers/bookmarks.ts
index 1e5e7dfc..43bb4db7 100644
--- a/packages/trpc/routers/bookmarks.ts
+++ b/packages/trpc/routers/bookmarks.ts
@@ -310,14 +310,14 @@
     switch (bookmark.content.type) {
       case BookmarkTypes.LINK: {
         // The crawling job triggers openai when it's done
-        await LinkCrawlerQueue.add("crawl", {
+        await LinkCrawlerQueue.enqueue({
           bookmarkId: bookmark.id,
         });
         break;
       }
       case BookmarkTypes.TEXT:
       case BookmarkTypes.ASSET: {
-        await OpenAIQueue.add("openai", {
+        await OpenAIQueue.enqueue({
           bookmarkId: bookmark.id,
         });
         break;
@@ -418,7 +418,7 @@
     .input(z.object({ bookmarkId: z.string() }))
     .use(ensureBookmarkOwnership)
     .mutation(async ({ input }) => {
-      await LinkCrawlerQueue.add("crawl", {
+      await LinkCrawlerQueue.enqueue({
        bookmarkId: input.bookmarkId,
      });
    }),
```
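
A note on the admin-stats hunks above: bullMQ's `getWaitingCount()` returned a single number, while the SQLite queue's `stats()` evidently reports per-state counts, so "queued" is now the sum of the first-attempt and retry backlogs. A sketch of that aggregation, assuming only the two fields the diff actually reads:

```typescript
// Only `pending` and `pending_retry` are visible in the diff; any other
// fields on the stats object are unknown here.
interface QueueStats {
  pending: number; // jobs waiting for their first attempt
  pending_retry: number; // failed jobs waiting to be retried
}

// "queued" in the admin dashboard now covers both states, where the old
// code reported only bullMQ's Redis-side waiting count.
function queuedCount(stats: QueueStats): number {
  return stats.pending + stats.pending_retry;
}
```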
