Diffstat (limited to 'packages/workers/index.ts')
-rw-r--r--  packages/workers/index.ts  58
1 file changed, 58 insertions, 0 deletions
diff --git a/packages/workers/index.ts b/packages/workers/index.ts
new file mode 100644
index 00000000..bf092953
--- /dev/null
+++ b/packages/workers/index.ts
@@ -0,0 +1,58 @@
+import { Worker } from "bullmq";
+
+import {
+ LinkCrawlerQueue,
+ OpenAIQueue,
+ ZCrawlLinkRequest,
+ ZOpenAIRequest,
+ queueConnectionDetails,
+} from "@remember/shared/queues";
+import logger from "@remember/shared/logger";
+import runCrawler from "./crawler";
+import runOpenAI from "./openai";
+
+function crawlerWorker() {
+ logger.info("Starting crawler worker ...");
+ const worker = new Worker<ZCrawlLinkRequest, void>(
+ LinkCrawlerQueue.name,
+ runCrawler,
+ {
+ connection: queueConnectionDetails,
+ autorun: false,
+ },
+ );
+
+ worker.on("completed", (job) => {
+ const jobId = job?.id || "unknown";
+ logger.info(`[Crawler][${jobId}] Completed successfully`);
+ });
+
+ worker.on("failed", (job, error) => {
+ const jobId = job?.id || "unknown";
+ logger.error(`[Crawler][${jobId}] Crawling job failed: ${error}`);
+ });
+
+ return worker;
+}
+
+function openaiWorker() {
+ logger.info("Starting openai worker ...");
+ const worker = new Worker<ZOpenAIRequest, void>(OpenAIQueue.name, runOpenAI, {
+ connection: queueConnectionDetails,
+ autorun: false,
+ });
+
+ worker.on("completed", (job) => {
+ const jobId = job?.id || "unknown";
+ logger.info(`[OpenAI][${jobId}] Completed successfully`);
+ });
+
+ worker.on("failed", (job, error) => {
+ const jobId = job?.id || "unknown";
+ logger.error(`[OpenAI][${jobId}] OpenAI job failed: ${error}`);
+ });
+
+ return worker;
+}
+
+await Promise.all([crawlerWorker().run(), openaiWorker().run()]);
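
The runCrawler and runOpenAI processors imported above live outside this diff. As a rough sketch only (assuming BullMQ's standard processor signature; the body and the "unknown" fallback are illustrative, not part of this change), packages/workers/crawler.ts would export something along these lines:

import { Job } from "bullmq";

import { ZCrawlLinkRequest } from "@remember/shared/queues";
import logger from "@remember/shared/logger";

// BullMQ invokes the processor once per job pulled from LinkCrawlerQueue.
// The Worker<ZCrawlLinkRequest, void> generics in index.ts imply this
// signature: the job carries a ZCrawlLinkRequest payload and resolves to void.
export default async function runCrawler(
  job: Job<ZCrawlLinkRequest, void>,
): Promise<void> {
  logger.info(`[Crawler][${job.id ?? "unknown"}] Received crawl request`);
  // Actual crawling logic (fetching and parsing the link) would go here.
}

Because both workers are constructed with autorun: false, nothing is processed until the final Promise.all awaits worker.run() on each of them, which keeps the process alive while both workers poll their queues.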