about summary refs log tree commit diff stats
path: root/packages/shared
diff options
context:
space:
mode:
author    Mohamed Bassem <me@mbassem.com>  2024-07-21 19:18:58 +0100
committer GitHub <noreply@github.com>      2024-07-21 19:18:58 +0100
commit    9edd154440c18bcc4542560e229eb293f9e0c2d4 (patch)
tree      2423f82619d48656f8dc60870fab8b152eef4401 /packages/shared
parent    edbd98d7841388d1169a3a3b159367487bda431e (diff)
download  karakeep-9edd154440c18bcc4542560e229eb293f9e0c2d4.tar.zst
refactor: Replace the usage of bullMQ with the hoarder sqlite-based queue (#309)
Diffstat (limited to 'packages/shared')
-rw-r--r--  packages/shared/config.ts    10
-rw-r--r--  packages/shared/package.json  2
-rw-r--r--  packages/shared/queues.ts    60
3 files changed, 28 insertions, 44 deletions
diff --git a/packages/shared/config.ts b/packages/shared/config.ts
index 2c739a0c..28bfdbdb 100644
--- a/packages/shared/config.ts
+++ b/packages/shared/config.ts
@@ -15,10 +15,6 @@ const allEnv = z.object({
OLLAMA_BASE_URL: z.string().url().optional(),
INFERENCE_TEXT_MODEL: z.string().default("gpt-3.5-turbo-0125"),
INFERENCE_IMAGE_MODEL: z.string().default("gpt-4o-2024-05-13"),
- REDIS_HOST: z.string().default("localhost"),
- REDIS_PORT: z.coerce.number().default(6379),
- REDIS_DB_IDX: z.coerce.number().optional(),
- REDIS_PASSWORD: z.string().optional(),
CRAWLER_HEADLESS_BROWSER: stringBool("true"),
BROWSER_WEB_URL: z.string().url().optional(),
BROWSER_WEBSOCKET_URL: z.string().url().optional(),
@@ -58,12 +54,6 @@ const serverConfigSchema = allEnv.transform((val) => {
imageModel: val.INFERENCE_IMAGE_MODEL,
inferredTagLang: val.INFERENCE_LANG,
},
- bullMQ: {
- redisHost: val.REDIS_HOST,
- redisPort: val.REDIS_PORT,
- redisDBIdx: val.REDIS_DB_IDX,
- redisPassword: val.REDIS_PASSWORD,
- },
crawler: {
numWorkers: val.CRAWLER_NUM_WORKERS,
headlessBrowser: val.CRAWLER_HEADLESS_BROWSER,
diff --git a/packages/shared/package.json b/packages/shared/package.json
index 032f3db5..2b1ae973 100644
--- a/packages/shared/package.json
+++ b/packages/shared/package.json
@@ -5,7 +5,7 @@
"private": true,
"type": "module",
"dependencies": {
- "bullmq": "^5.1.9",
+ "@hoarder/queue": "workspace:^0.1.0",
"meilisearch": "^0.37.0",
"winston": "^3.11.0",
"zod": "^3.22.4"
diff --git a/packages/shared/queues.ts b/packages/shared/queues.ts
index 2b890755..8747fb3f 100644
--- a/packages/shared/queues.ts
+++ b/packages/shared/queues.ts
@@ -1,14 +1,17 @@
-import { Queue } from "bullmq";
+import path from "node:path";
import { z } from "zod";
+import { buildDBClient, migrateDB, SqliteQueue } from "@hoarder/queue";
+
import serverConfig from "./config";
-export const queueConnectionDetails = {
- host: serverConfig.bullMQ.redisHost,
- port: serverConfig.bullMQ.redisPort,
- db: serverConfig.bullMQ.redisDBIdx,
- password: serverConfig.bullMQ.redisPassword,
-};
+const QUEUE_DB_PATH = path.join(serverConfig.dataDir, "queue.db");
+
+const queueDB = buildDBClient(QUEUE_DB_PATH);
+
+export function runQueueDBMigrations() {
+ migrateDB(queueDB);
+}
// Link Crawler
export const zCrawlLinkRequestSchema = z.object({
@@ -17,16 +20,12 @@ export const zCrawlLinkRequestSchema = z.object({
});
export type ZCrawlLinkRequest = z.infer<typeof zCrawlLinkRequestSchema>;
-export const LinkCrawlerQueue = new Queue<ZCrawlLinkRequest, void>(
+export const LinkCrawlerQueue = new SqliteQueue<ZCrawlLinkRequest>(
"link_crawler_queue",
+ queueDB,
{
- connection: queueConnectionDetails,
- defaultJobOptions: {
- attempts: 5,
- backoff: {
- type: "exponential",
- delay: 1000,
- },
+ defaultJobArgs: {
+ numRetries: 5,
},
},
);
@@ -37,16 +36,15 @@ export const zOpenAIRequestSchema = z.object({
});
export type ZOpenAIRequest = z.infer<typeof zOpenAIRequestSchema>;
-export const OpenAIQueue = new Queue<ZOpenAIRequest, void>("openai_queue", {
- connection: queueConnectionDetails,
- defaultJobOptions: {
- attempts: 3,
- backoff: {
- type: "exponential",
- delay: 500,
+export const OpenAIQueue = new SqliteQueue<ZOpenAIRequest>(
+ "openai_queue",
+ queueDB,
+ {
+ defaultJobArgs: {
+ numRetries: 3,
},
},
-});
+);
// Search Indexing Worker
export const zSearchIndexingRequestSchema = z.object({
@@ -56,29 +54,25 @@ export const zSearchIndexingRequestSchema = z.object({
export type ZSearchIndexingRequest = z.infer<
typeof zSearchIndexingRequestSchema
>;
-export const SearchIndexingQueue = new Queue<ZSearchIndexingRequest, void>(
+export const SearchIndexingQueue = new SqliteQueue<ZSearchIndexingRequest>(
"searching_indexing",
+ queueDB,
{
- connection: queueConnectionDetails,
- defaultJobOptions: {
- attempts: 5,
- backoff: {
- type: "exponential",
- delay: 1000,
- },
+ defaultJobArgs: {
+ numRetries: 5,
},
},
);
export function triggerSearchReindex(bookmarkId: string) {
- SearchIndexingQueue.add("search_indexing", {
+ SearchIndexingQueue.enqueue({
bookmarkId,
type: "index",
});
}
export function triggerSearchDeletion(bookmarkId: string) {
- SearchIndexingQueue.add("search_indexing", {
+ SearchIndexingQueue.enqueue({
bookmarkId: bookmarkId,
type: "delete",
});