diff options
| author | Mohamed Bassem <me@mbassem.com> | 2024-07-21 19:18:58 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-07-21 19:18:58 +0100 |
| commit | 9edd154440c18bcc4542560e229eb293f9e0c2d4 (patch) | |
| tree | 2423f82619d48656f8dc60870fab8b152eef4401 /apps/workers/searchWorker.ts | |
| parent | edbd98d7841388d1169a3a3b159367487bda431e (diff) | |
| download | karakeep-9edd154440c18bcc4542560e229eb293f9e0c2d4.tar.zst | |
refactor: Replace the usage of bullMQ with the hoarder sqlite-based queue (#309)
Diffstat (limited to 'apps/workers/searchWorker.ts')
| -rw-r--r-- | apps/workers/searchWorker.ts | 39 |
1 files changed, 20 insertions, 19 deletions
diff --git a/apps/workers/searchWorker.ts b/apps/workers/searchWorker.ts index 75c057c6..56c2aac4 100644 --- a/apps/workers/searchWorker.ts +++ b/apps/workers/searchWorker.ts @@ -1,13 +1,11 @@ -import type { Job } from "bullmq"; -import { Worker } from "bullmq"; import { eq } from "drizzle-orm"; import type { ZSearchIndexingRequest } from "@hoarder/shared/queues"; import { db } from "@hoarder/db"; import { bookmarks } from "@hoarder/db/schema"; +import { DequeuedJob, Runner } from "@hoarder/queue"; import logger from "@hoarder/shared/logger"; import { - queueConnectionDetails, SearchIndexingQueue, zSearchIndexingRequestSchema, } from "@hoarder/shared/queues"; @@ -16,25 +14,28 @@ import { getSearchIdxClient } from "@hoarder/shared/search"; export class SearchIndexingWorker { static build() { logger.info("Starting search indexing worker ..."); - const worker = new Worker<ZSearchIndexingRequest, void>( - SearchIndexingQueue.name, - runSearchIndexing, + const worker = new Runner<ZSearchIndexingRequest>( + SearchIndexingQueue, { - connection: queueConnectionDetails, - autorun: false, + run: runSearchIndexing, + onComplete: (job) => { + const jobId = job?.id ?? "unknown"; + logger.info(`[search][${jobId}] Completed successfully`); + return Promise.resolve(); + }, + onError: (job) => { + const jobId = job?.id ?? "unknown"; + logger.error(`[search][${jobId}] search job failed: ${job.error}`); + return Promise.resolve(); + }, + }, + { + concurrency: 1, + pollIntervalMs: 1000, + timeoutSecs: 30, }, ); - worker.on("completed", (job) => { - const jobId = job?.id ?? "unknown"; - logger.info(`[search][${jobId}] Completed successfully`); - }); - - worker.on("failed", (job, error) => { - const jobId = job?.id ?? "unknown"; - logger.error(`[search][${jobId}] search job failed: ${error}`); - }); - return worker; } } @@ -112,7 +113,7 @@ async function runDelete( await ensureTaskSuccess(searchClient, task.taskUid); } -async function runSearchIndexing(job: Job<ZSearchIndexingRequest, void>) { +async function runSearchIndexing(job: DequeuedJob<ZSearchIndexingRequest>) { const jobId = job.id ?? "unknown"; const request = zSearchIndexingRequestSchema.safeParse(job.data); |
