From 9edd154440c18bcc4542560e229eb293f9e0c2d4 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sun, 21 Jul 2024 19:18:58 +0100 Subject: refactor: Replace the usage of bullMQ with the hoarder sqlite-based queue (#309) --- apps/workers/openaiWorker.ts | 43 ++++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 21 deletions(-) (limited to 'apps/workers/openaiWorker.ts') diff --git a/apps/workers/openaiWorker.ts b/apps/workers/openaiWorker.ts index 776d6828..9e6e2f23 100644 --- a/apps/workers/openaiWorker.ts +++ b/apps/workers/openaiWorker.ts @@ -1,5 +1,3 @@ -import type { Job } from "bullmq"; -import { Worker } from "bullmq"; import { and, Column, eq, inArray, sql } from "drizzle-orm"; import { z } from "zod"; @@ -11,12 +9,12 @@ import { bookmarkTags, tagsOnBookmarks, } from "@hoarder/db/schema"; +import { DequeuedJob, Runner } from "@hoarder/queue"; import { readAsset } from "@hoarder/shared/assetdb"; import serverConfig from "@hoarder/shared/config"; import logger from "@hoarder/shared/logger"; import { OpenAIQueue, - queueConnectionDetails, triggerSearchReindex, zOpenAIRequestSchema, } from "@hoarder/shared/queues"; @@ -63,27 +61,30 @@ async function attemptMarkTaggingStatus( export class OpenAiWorker { static build() { logger.info("Starting inference worker ..."); - const worker = new Worker( - OpenAIQueue.name, - runOpenAI, + const worker = new Runner( + OpenAIQueue, { - connection: queueConnectionDetails, - autorun: false, + run: runOpenAI, + onComplete: async (job) => { + const jobId = job?.id ?? "unknown"; + logger.info(`[inference][${jobId}] Completed successfully`); + await attemptMarkTaggingStatus(job?.data, "success"); + }, + onError: async (job) => { + const jobId = job?.id ?? "unknown"; + logger.error( + `[inference][${jobId}] inference job failed: ${job.error}`, + ); + await attemptMarkTaggingStatus(job?.data, "failure"); + }, + }, + { + concurrency: 1, + pollIntervalMs: 1000, + timeoutSecs: 30, }, ); - worker.on("completed", (job) => { - const jobId = job?.id ?? "unknown"; - logger.info(`[inference][${jobId}] Completed successfully`); - attemptMarkTaggingStatus(job?.data, "success"); - }); - - worker.on("failed", (job, error) => { - const jobId = job?.id ?? "unknown"; - logger.error(`[inference][${jobId}] inference job failed: ${error}`); - attemptMarkTaggingStatus(job?.data, "failure"); - }); - return worker; } } @@ -361,7 +362,7 @@ async function connectTags( }); } -async function runOpenAI(job: Job) { +async function runOpenAI(job: DequeuedJob) { const jobId = job.id ?? "unknown"; const inferenceClient = InferenceClientFactory.build(); -- cgit v1.2.3-70-g09d2