aboutsummaryrefslogtreecommitdiffstats
path: root/apps/workers/searchWorker.ts
diff options
context:
space:
mode:
Diffstat (limited to 'apps/workers/searchWorker.ts')
-rw-r--r--apps/workers/searchWorker.ts39
1 files changed, 20 insertions, 19 deletions
diff --git a/apps/workers/searchWorker.ts b/apps/workers/searchWorker.ts
index 75c057c6..56c2aac4 100644
--- a/apps/workers/searchWorker.ts
+++ b/apps/workers/searchWorker.ts
@@ -1,13 +1,11 @@
-import type { Job } from "bullmq";
-import { Worker } from "bullmq";
import { eq } from "drizzle-orm";
import type { ZSearchIndexingRequest } from "@hoarder/shared/queues";
import { db } from "@hoarder/db";
import { bookmarks } from "@hoarder/db/schema";
+import { DequeuedJob, Runner } from "@hoarder/queue";
import logger from "@hoarder/shared/logger";
import {
- queueConnectionDetails,
SearchIndexingQueue,
zSearchIndexingRequestSchema,
} from "@hoarder/shared/queues";
@@ -16,25 +14,28 @@ import { getSearchIdxClient } from "@hoarder/shared/search";
export class SearchIndexingWorker {
static build() {
logger.info("Starting search indexing worker ...");
- const worker = new Worker<ZSearchIndexingRequest, void>(
- SearchIndexingQueue.name,
- runSearchIndexing,
+ const worker = new Runner<ZSearchIndexingRequest>(
+ SearchIndexingQueue,
{
- connection: queueConnectionDetails,
- autorun: false,
+ run: runSearchIndexing,
+ onComplete: (job) => {
+ const jobId = job?.id ?? "unknown";
+ logger.info(`[search][${jobId}] Completed successfully`);
+ return Promise.resolve();
+ },
+ onError: (job) => {
+ const jobId = job?.id ?? "unknown";
+ logger.error(`[search][${jobId}] search job failed: ${job.error}`);
+ return Promise.resolve();
+ },
+ },
+ {
+ concurrency: 1,
+ pollIntervalMs: 1000,
+ timeoutSecs: 30,
},
);
- worker.on("completed", (job) => {
- const jobId = job?.id ?? "unknown";
- logger.info(`[search][${jobId}] Completed successfully`);
- });
-
- worker.on("failed", (job, error) => {
- const jobId = job?.id ?? "unknown";
- logger.error(`[search][${jobId}] search job failed: ${error}`);
- });
-
return worker;
}
}
@@ -112,7 +113,7 @@ async function runDelete(
await ensureTaskSuccess(searchClient, task.taskUid);
}
-async function runSearchIndexing(job: Job<ZSearchIndexingRequest, void>) {
+async function runSearchIndexing(job: DequeuedJob<ZSearchIndexingRequest>) {
const jobId = job.id ?? "unknown";
const request = zSearchIndexingRequestSchema.safeParse(job.data);