aboutsummaryrefslogtreecommitdiffstats
path: root/apps/workers/openaiWorker.ts
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2024-07-21 19:18:58 +0100
committerGitHub <noreply@github.com>2024-07-21 19:18:58 +0100
commit9edd154440c18bcc4542560e229eb293f9e0c2d4 (patch)
tree2423f82619d48656f8dc60870fab8b152eef4401 /apps/workers/openaiWorker.ts
parentedbd98d7841388d1169a3a3b159367487bda431e (diff)
downloadkarakeep-9edd154440c18bcc4542560e229eb293f9e0c2d4.tar.zst
refactor: Replace the usage of bullMQ with the hoarder sqlite-based queue (#309)
Diffstat (limited to 'apps/workers/openaiWorker.ts')
-rw-r--r--apps/workers/openaiWorker.ts43
1 files changed, 22 insertions, 21 deletions
diff --git a/apps/workers/openaiWorker.ts b/apps/workers/openaiWorker.ts
index 776d6828..9e6e2f23 100644
--- a/apps/workers/openaiWorker.ts
+++ b/apps/workers/openaiWorker.ts
@@ -1,5 +1,3 @@
-import type { Job } from "bullmq";
-import { Worker } from "bullmq";
import { and, Column, eq, inArray, sql } from "drizzle-orm";
import { z } from "zod";
@@ -11,12 +9,12 @@ import {
bookmarkTags,
tagsOnBookmarks,
} from "@hoarder/db/schema";
+import { DequeuedJob, Runner } from "@hoarder/queue";
import { readAsset } from "@hoarder/shared/assetdb";
import serverConfig from "@hoarder/shared/config";
import logger from "@hoarder/shared/logger";
import {
OpenAIQueue,
- queueConnectionDetails,
triggerSearchReindex,
zOpenAIRequestSchema,
} from "@hoarder/shared/queues";
@@ -63,27 +61,30 @@ async function attemptMarkTaggingStatus(
export class OpenAiWorker {
static build() {
logger.info("Starting inference worker ...");
- const worker = new Worker<ZOpenAIRequest, void>(
- OpenAIQueue.name,
- runOpenAI,
+ const worker = new Runner<ZOpenAIRequest>(
+ OpenAIQueue,
{
- connection: queueConnectionDetails,
- autorun: false,
+ run: runOpenAI,
+ onComplete: async (job) => {
+ const jobId = job?.id ?? "unknown";
+ logger.info(`[inference][${jobId}] Completed successfully`);
+ await attemptMarkTaggingStatus(job?.data, "success");
+ },
+ onError: async (job) => {
+ const jobId = job?.id ?? "unknown";
+ logger.error(
+ `[inference][${jobId}] inference job failed: ${job.error}`,
+ );
+ await attemptMarkTaggingStatus(job?.data, "failure");
+ },
+ },
+ {
+ concurrency: 1,
+ pollIntervalMs: 1000,
+ timeoutSecs: 30,
},
);
- worker.on("completed", (job) => {
- const jobId = job?.id ?? "unknown";
- logger.info(`[inference][${jobId}] Completed successfully`);
- attemptMarkTaggingStatus(job?.data, "success");
- });
-
- worker.on("failed", (job, error) => {
- const jobId = job?.id ?? "unknown";
- logger.error(`[inference][${jobId}] inference job failed: ${error}`);
- attemptMarkTaggingStatus(job?.data, "failure");
- });
-
return worker;
}
}
@@ -361,7 +362,7 @@ async function connectTags(
});
}
-async function runOpenAI(job: Job<ZOpenAIRequest, void>) {
+async function runOpenAI(job: DequeuedJob<ZOpenAIRequest>) {
const jobId = job.id ?? "unknown";
const inferenceClient = InferenceClientFactory.build();