diff options
Diffstat (limited to 'packages/workers/openai.ts')
| -rw-r--r-- | packages/workers/openai.ts | 34 |
1 files changed, 31 insertions, 3 deletions
diff --git a/packages/workers/openai.ts b/packages/workers/openai.ts index a2f90c8a..999f2827 100644 --- a/packages/workers/openai.ts +++ b/packages/workers/openai.ts @@ -1,13 +1,41 @@ -import prisma, { BookmarkedLink } from "@remember/db"; +import { prisma, BookmarkedLink } from "@remember/db"; import logger from "@remember/shared/logger"; -import { ZOpenAIRequest, zOpenAIRequestSchema } from "@remember/shared/queues"; +import { OpenAIQueue, ZOpenAIRequest, queueConnectionDetails, zOpenAIRequestSchema } from "@remember/shared/queues"; import { Job } from "bullmq"; import OpenAI from "openai"; import { z } from "zod"; +import { Worker } from "bullmq"; const openAIResponseSchema = z.object({ tags: z.array(z.string()), }); + + +export class OpenAiWorker { + static async build() { + logger.info("Starting openai worker ..."); + const worker = new Worker<ZOpenAIRequest, void>( + OpenAIQueue.name, + runOpenAI, + { + connection: queueConnectionDetails, + autorun: false, + }, + ); + + worker.on("completed", (job) => { + const jobId = job?.id || "unknown"; + logger.info(`[openai][${jobId}] Completed successfully`); + }); + + worker.on("failed", (job, error) => { + const jobId = job?.id || "unknown"; + logger.error(`[openai][${jobId}] openai job failed: ${error}`); + }); + + return worker; + } +} function buildPrompt(url: string, description: string) { return ` @@ -121,7 +149,7 @@ async function connectTags(bookmarkId: string, tagIds: string[]) { ); } -export default async function runOpenAI(job: Job<ZOpenAIRequest, void>) { +async function runOpenAI(job: Job<ZOpenAIRequest, void>) { const jobId = job.id || "unknown"; if (!process.env.OPENAI_API_KEY || !process.env.OPENAI_ENABLED) { |
