aboutsummaryrefslogtreecommitdiffstats
path: root/packages/shared/queues.ts
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2024-07-21 19:18:58 +0100
committerGitHub <noreply@github.com>2024-07-21 19:18:58 +0100
commit9edd154440c18bcc4542560e229eb293f9e0c2d4 (patch)
tree2423f82619d48656f8dc60870fab8b152eef4401 /packages/shared/queues.ts
parentedbd98d7841388d1169a3a3b159367487bda431e (diff)
downloadkarakeep-9edd154440c18bcc4542560e229eb293f9e0c2d4.tar.zst
refactor: Replace the usage of bullMQ with the hoarder sqlite-based queue (#309)
Diffstat (limited to 'packages/shared/queues.ts')
-rw-r--r--packages/shared/queues.ts60
1 files changed, 27 insertions, 33 deletions
diff --git a/packages/shared/queues.ts b/packages/shared/queues.ts
index 2b890755..8747fb3f 100644
--- a/packages/shared/queues.ts
+++ b/packages/shared/queues.ts
@@ -1,14 +1,17 @@
-import { Queue } from "bullmq";
+import path from "node:path";
import { z } from "zod";
+import { buildDBClient, migrateDB, SqliteQueue } from "@hoarder/queue";
+
import serverConfig from "./config";
-export const queueConnectionDetails = {
- host: serverConfig.bullMQ.redisHost,
- port: serverConfig.bullMQ.redisPort,
- db: serverConfig.bullMQ.redisDBIdx,
- password: serverConfig.bullMQ.redisPassword,
-};
+const QUEUE_DB_PATH = path.join(serverConfig.dataDir, "queue.db");
+
+const queueDB = buildDBClient(QUEUE_DB_PATH);
+
+export function runQueueDBMigrations() {
+ migrateDB(queueDB);
+}
// Link Crawler
export const zCrawlLinkRequestSchema = z.object({
@@ -17,16 +20,12 @@ export const zCrawlLinkRequestSchema = z.object({
});
export type ZCrawlLinkRequest = z.infer<typeof zCrawlLinkRequestSchema>;
-export const LinkCrawlerQueue = new Queue<ZCrawlLinkRequest, void>(
+export const LinkCrawlerQueue = new SqliteQueue<ZCrawlLinkRequest>(
"link_crawler_queue",
+ queueDB,
{
- connection: queueConnectionDetails,
- defaultJobOptions: {
- attempts: 5,
- backoff: {
- type: "exponential",
- delay: 1000,
- },
+ defaultJobArgs: {
+ numRetries: 5,
},
},
);
@@ -37,16 +36,15 @@ export const zOpenAIRequestSchema = z.object({
});
export type ZOpenAIRequest = z.infer<typeof zOpenAIRequestSchema>;
-export const OpenAIQueue = new Queue<ZOpenAIRequest, void>("openai_queue", {
- connection: queueConnectionDetails,
- defaultJobOptions: {
- attempts: 3,
- backoff: {
- type: "exponential",
- delay: 500,
+export const OpenAIQueue = new SqliteQueue<ZOpenAIRequest>(
+ "openai_queue",
+ queueDB,
+ {
+ defaultJobArgs: {
+ numRetries: 3,
},
},
-});
+);
// Search Indexing Worker
export const zSearchIndexingRequestSchema = z.object({
@@ -56,29 +54,25 @@ export const zSearchIndexingRequestSchema = z.object({
export type ZSearchIndexingRequest = z.infer<
typeof zSearchIndexingRequestSchema
>;
-export const SearchIndexingQueue = new Queue<ZSearchIndexingRequest, void>(
+export const SearchIndexingQueue = new SqliteQueue<ZSearchIndexingRequest>(
"searching_indexing",
+ queueDB,
{
- connection: queueConnectionDetails,
- defaultJobOptions: {
- attempts: 5,
- backoff: {
- type: "exponential",
- delay: 1000,
- },
+ defaultJobArgs: {
+ numRetries: 5,
},
},
);
export function triggerSearchReindex(bookmarkId: string) {
- SearchIndexingQueue.add("search_indexing", {
+ SearchIndexingQueue.enqueue({
bookmarkId,
type: "index",
});
}
export function triggerSearchDeletion(bookmarkId: string) {
- SearchIndexingQueue.add("search_indexing", {
+ SearchIndexingQueue.enqueue({
bookmarkId: bookmarkId,
type: "delete",
});