diff options
| author | Mohamed Bassem <me@mbassem.com> | 2024-07-21 19:18:58 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-07-21 19:18:58 +0100 |
| commit | 9edd154440c18bcc4542560e229eb293f9e0c2d4 (patch) | |
| tree | 2423f82619d48656f8dc60870fab8b152eef4401 /packages/shared/queues.ts | |
| parent | edbd98d7841388d1169a3a3b159367487bda431e (diff) | |
| download | karakeep-9edd154440c18bcc4542560e229eb293f9e0c2d4.tar.zst | |
refactor: Replace the usage of bullMQ with the hoarder sqlite-based queue (#309)
Diffstat (limited to 'packages/shared/queues.ts')
| -rw-r--r-- | packages/shared/queues.ts | 60 |
1 files changed, 27 insertions, 33 deletions
diff --git a/packages/shared/queues.ts b/packages/shared/queues.ts index 2b890755..8747fb3f 100644 --- a/packages/shared/queues.ts +++ b/packages/shared/queues.ts @@ -1,14 +1,17 @@ -import { Queue } from "bullmq"; +import path from "node:path"; import { z } from "zod"; +import { buildDBClient, migrateDB, SqliteQueue } from "@hoarder/queue"; + import serverConfig from "./config"; -export const queueConnectionDetails = { - host: serverConfig.bullMQ.redisHost, - port: serverConfig.bullMQ.redisPort, - db: serverConfig.bullMQ.redisDBIdx, - password: serverConfig.bullMQ.redisPassword, -}; +const QUEUE_DB_PATH = path.join(serverConfig.dataDir, "queue.db"); + +const queueDB = buildDBClient(QUEUE_DB_PATH); + +export function runQueueDBMigrations() { + migrateDB(queueDB); +} // Link Crawler export const zCrawlLinkRequestSchema = z.object({ @@ -17,16 +20,12 @@ export const zCrawlLinkRequestSchema = z.object({ }); export type ZCrawlLinkRequest = z.infer<typeof zCrawlLinkRequestSchema>; -export const LinkCrawlerQueue = new Queue<ZCrawlLinkRequest, void>( +export const LinkCrawlerQueue = new SqliteQueue<ZCrawlLinkRequest>( "link_crawler_queue", + queueDB, { - connection: queueConnectionDetails, - defaultJobOptions: { - attempts: 5, - backoff: { - type: "exponential", - delay: 1000, - }, + defaultJobArgs: { + numRetries: 5, }, }, ); @@ -37,16 +36,15 @@ export const zOpenAIRequestSchema = z.object({ }); export type ZOpenAIRequest = z.infer<typeof zOpenAIRequestSchema>; -export const OpenAIQueue = new Queue<ZOpenAIRequest, void>("openai_queue", { - connection: queueConnectionDetails, - defaultJobOptions: { - attempts: 3, - backoff: { - type: "exponential", - delay: 500, +export const OpenAIQueue = new SqliteQueue<ZOpenAIRequest>( + "openai_queue", + queueDB, + { + defaultJobArgs: { + numRetries: 3, }, }, -}); +); // Search Indexing Worker export const zSearchIndexingRequestSchema = z.object({ @@ -56,29 +54,25 @@ export const zSearchIndexingRequestSchema = z.object({ export type ZSearchIndexingRequest = z.infer< typeof zSearchIndexingRequestSchema >; -export const SearchIndexingQueue = new Queue<ZSearchIndexingRequest, void>( +export const SearchIndexingQueue = new SqliteQueue<ZSearchIndexingRequest>( "searching_indexing", + queueDB, { - connection: queueConnectionDetails, - defaultJobOptions: { - attempts: 5, - backoff: { - type: "exponential", - delay: 1000, - }, + defaultJobArgs: { + numRetries: 5, }, }, ); export function triggerSearchReindex(bookmarkId: string) { - SearchIndexingQueue.add("search_indexing", { + SearchIndexingQueue.enqueue({ bookmarkId, type: "index", }); } export function triggerSearchDeletion(bookmarkId: string) { - SearchIndexingQueue.add("search_indexing", { + SearchIndexingQueue.enqueue({ bookmarkId: bookmarkId, type: "delete", }); |
