diff options
| author | MohamedBassem <me@mbassem.com> | 2025-08-22 20:09:52 +0300 |
|---|---|---|
| committer | MohamedBassem <me@mbassem.com> | 2025-08-22 21:20:37 +0300 |
| commit | 52d018c872d0db30c4d54d89fefa8543ee9ff93e (patch) | |
| tree | b9301240e40e128074c52bad5e51a23e354c5805 /apps/workers | |
| parent | 9d6b0ef2df7757b3fed99c39cb6d92e4ff1b14df (diff) | |
| download | karakeep-52d018c872d0db30c4d54d89fefa8543ee9ff93e.tar.zst | |
feat: Export prometheus metrics from the workers
Diffstat (limited to 'apps/workers')
| -rw-r--r-- | apps/workers/exit.ts | 4 | ||||
| -rw-r--r-- | apps/workers/index.ts | 7 | ||||
| -rw-r--r-- | apps/workers/metrics.ts | 17 | ||||
| -rw-r--r-- | apps/workers/package.json | 4 | ||||
| -rw-r--r-- | apps/workers/server.ts | 51 | ||||
| -rw-r--r-- | apps/workers/workers/assetPreprocessingWorker.ts | 3 | ||||
| -rw-r--r-- | apps/workers/workers/crawlerWorker.ts | 9 | ||||
| -rw-r--r-- | apps/workers/workers/feedWorker.ts | 3 | ||||
| -rw-r--r-- | apps/workers/workers/inference/inferenceWorker.ts | 3 | ||||
| -rw-r--r-- | apps/workers/workers/ruleEngineWorker.ts | 3 | ||||
| -rw-r--r-- | apps/workers/workers/searchWorker.ts | 3 | ||||
| -rw-r--r-- | apps/workers/workers/tidyAssetsWorker.ts | 3 | ||||
| -rw-r--r-- | apps/workers/workers/videoWorker.ts | 3 | ||||
| -rw-r--r-- | apps/workers/workers/webhookWorker.ts | 3 |
14 files changed, 111 insertions, 5 deletions
diff --git a/apps/workers/exit.ts b/apps/workers/exit.ts index cd50a323..d4bc84f1 100644 --- a/apps/workers/exit.ts +++ b/apps/workers/exit.ts @@ -1,11 +1,11 @@ import logger from "@karakeep/shared/logger"; -export let isShuttingDown = false; +export const exitAbortController = new AbortController(); export const shutdownPromise = new Promise((resolve) => { process.on("SIGTERM", () => { logger.info("Received SIGTERM, shutting down ..."); - isShuttingDown = true; + exitAbortController.abort(); resolve(""); }); }); diff --git a/apps/workers/index.ts b/apps/workers/index.ts index a21b9c2d..f34e4722 100644 --- a/apps/workers/index.ts +++ b/apps/workers/index.ts @@ -1,5 +1,7 @@ import "dotenv/config"; +import { buildServer } from "server"; + import { loadAllPlugins } from "@karakeep/shared-server"; import serverConfig from "@karakeep/shared/config"; import logger from "@karakeep/shared/logger"; @@ -31,6 +33,7 @@ async function main() { assetPreprocessing, webhook, ruleEngine, + httpServer, ] = [ await CrawlerWorker.build(), OpenAiWorker.build(), @@ -41,6 +44,7 @@ async function main() { AssetPreprocessingWorker.build(), WebhookWorker.build(), RuleEngineWorker.build(), + buildServer(), ]; FeedRefreshingWorker.start(); @@ -55,6 +59,7 @@ async function main() { assetPreprocessing.run(), webhook.run(), ruleEngine.run(), + httpServer.serve(), ]), shutdownPromise, ]); @@ -72,6 +77,8 @@ async function main() { assetPreprocessing.stop(); webhook.stop(); ruleEngine.stop(); + await httpServer.stop(); + process.exit(0); } main(); diff --git a/apps/workers/metrics.ts b/apps/workers/metrics.ts new file mode 100644 index 00000000..04eec1fb --- /dev/null +++ b/apps/workers/metrics.ts @@ -0,0 +1,17 @@ +import { prometheus } from "@hono/prometheus"; +import { Counter, Registry } from "prom-client"; + +const registry = new Registry(); + +export const { printMetrics } = prometheus({ + registry: registry, + prefix: "karakeep_", +}); + +export const workerStatsCounter = new Counter({ + 
name: "karakeep_worker_stats", + help: "Stats for each worker", + labelNames: ["worker_name", "status"], +}); + +registry.registerMetric(workerStatsCounter); diff --git a/apps/workers/package.json b/apps/workers/package.json index a771c710..2c0b9a77 100644 --- a/apps/workers/package.json +++ b/apps/workers/package.json @@ -6,6 +6,8 @@ "type": "module", "dependencies": { "@ghostery/adblocker-playwright": "^2.5.1", + "@hono/node-server": "^1.19.0", + "@hono/prometheus": "^1.0.2", "@karakeep/db": "workspace:^0.1.0", "@karakeep/shared": "workspace:^0.1.0", "@karakeep/shared-server": "workspace:^0.1.0", @@ -18,6 +20,7 @@ "dotenv": "^16.4.1", "drizzle-orm": "^0.44.2", "execa": "9.3.1", + "hono": "^4.7.10", "http-proxy-agent": "^7.0.2", "https-proxy-agent": "^7.0.6", "jsdom": "^24.0.0", @@ -42,6 +45,7 @@ "pdfjs-dist": "^4.2.67", "playwright": "^1.42.1", "playwright-extra": "^4.3.6", + "prom-client": "^15.1.3", "puppeteer-extra-plugin-stealth": "^2.11.2", "rss-parser": "^3.13.0", "tesseract.js": "^5.1.1", diff --git a/apps/workers/server.ts b/apps/workers/server.ts new file mode 100644 index 00000000..f1b8a11d --- /dev/null +++ b/apps/workers/server.ts @@ -0,0 +1,51 @@ +import { serve } from "@hono/node-server"; +import { Hono } from "hono"; +import { bearerAuth } from "hono/bearer-auth"; + +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; + +import { printMetrics } from "./metrics"; + +const app = new Hono() + .get("/health", (c) => + c.json({ status: "ok", timestamp: new Date().toISOString() }), + ) + .get( + "/metrics", + bearerAuth({ token: serverConfig.prometheus.metricsToken }), + printMetrics, + ); + +export function buildServer() { + const server = serve( + { + fetch: app.fetch, + port: serverConfig.workers.port, + hostname: serverConfig.workers.host, + }, + (info) => { + logger.info(`Listening on http://${info.address}:${info.port}`); + }, + ); + return { + _server: server, + stop: () => + new 
Promise<void>((resolve, reject) => { + server.close((err) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }), + serve: () => + new Promise<void>((resolve, reject) => { + server.on("error", reject); + server.on("close", () => resolve()); + }), + }; +} + +export default app; diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts index c3ecd1e0..73cf8bb5 100644 --- a/apps/workers/workers/assetPreprocessingWorker.ts +++ b/apps/workers/workers/assetPreprocessingWorker.ts @@ -1,6 +1,7 @@ import os from "os"; import { eq } from "drizzle-orm"; import { DequeuedJob, EnqueueOptions, Runner } from "liteque"; +import { workerStatsCounter } from "metrics"; import PDFParser from "pdf2json"; import { fromBuffer } from "pdf2pic"; import { createWorker } from "tesseract.js"; @@ -34,11 +35,13 @@ export class AssetPreprocessingWorker { { run: run, onComplete: async (job) => { + workerStatsCounter.labels("assetPreprocessing", "completed").inc(); const jobId = job.id; logger.info(`[assetPreprocessing][${jobId}] Completed successfully`); return Promise.resolve(); }, onError: async (job) => { + workerStatsCounter.labels("assetPreprocessing", "failed").inc(); const jobId = job.id; logger.error( `[assetPreprocessing][${jobId}] Asset preprocessing failed: ${job.error}\n${job.error.stack}`, diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts index f22a68d6..e032ab16 100644 --- a/apps/workers/workers/crawlerWorker.ts +++ b/apps/workers/workers/crawlerWorker.ts @@ -8,7 +8,7 @@ import { Mutex } from "async-mutex"; import DOMPurify from "dompurify"; import { eq } from "drizzle-orm"; import { execa } from "execa"; -import { isShuttingDown } from "exit"; +import { exitAbortController } from "exit"; import { HttpProxyAgent } from "http-proxy-agent"; import { HttpsProxyAgent } from "https-proxy-agent"; import { JSDOM, VirtualConsole } from "jsdom"; @@ -24,6 +24,7 @@ import 
metascraperPublisher from "metascraper-publisher"; import metascraperTitle from "metascraper-title"; import metascraperTwitter from "metascraper-twitter"; import metascraperUrl from "metascraper-url"; +import { workerStatsCounter } from "metrics"; import fetch from "node-fetch"; import { Browser, BrowserContextOptions } from "playwright"; import { chromium } from "playwright-extra"; @@ -203,7 +204,7 @@ async function launchBrowser() { logger.error( `[Crawler] Failed to connect to the browser instance, will retry in 5 secs: ${globalBrowserResult.error.stack}`, ); - if (isShuttingDown) { + if (exitAbortController.signal.aborted) { logger.info("[Crawler] We're shutting down so won't retry."); return; } @@ -214,7 +215,7 @@ async function launchBrowser() { } globalBrowser = globalBrowserResult.data; globalBrowser?.on("disconnected", () => { - if (isShuttingDown) { + if (exitAbortController.signal.aborted) { logger.info( "[Crawler] The Playwright browser got disconnected. But we're shutting down so won't restart it.", ); @@ -265,6 +266,7 @@ export class CrawlerWorker { /* timeoutSec */ serverConfig.crawler.jobTimeoutSec, ), onComplete: async (job) => { + workerStatsCounter.labels("crawler", "completed").inc(); const jobId = job.id; logger.info(`[Crawler][${jobId}] Completed successfully`); const bookmarkId = job.data.bookmarkId; @@ -273,6 +275,7 @@ export class CrawlerWorker { } }, onError: async (job) => { + workerStatsCounter.labels("crawler", "failed").inc(); const jobId = job.id; logger.error( `[Crawler][${jobId}] Crawling job failed: ${job.error}\n${job.error.stack}`, diff --git a/apps/workers/workers/feedWorker.ts b/apps/workers/workers/feedWorker.ts index 682889f0..62106fc8 100644 --- a/apps/workers/workers/feedWorker.ts +++ b/apps/workers/workers/feedWorker.ts @@ -1,5 +1,6 @@ import { and, eq, inArray } from "drizzle-orm"; import { DequeuedJob, Runner } from "liteque"; +import { workerStatsCounter } from "metrics"; import cron from "node-cron"; import Parser from 
"rss-parser"; import { buildImpersonatingTRPCClient } from "trpc"; @@ -50,6 +51,7 @@ export class FeedWorker { { run: run, onComplete: async (job) => { + workerStatsCounter.labels("feed", "completed").inc(); const jobId = job.id; logger.info(`[feed][${jobId}] Completed successfully`); await db @@ -58,6 +60,7 @@ export class FeedWorker { .where(eq(rssFeedsTable.id, job.data?.feedId)); }, onError: async (job) => { + workerStatsCounter.labels("feed", "failed").inc(); const jobId = job.id; logger.error( `[feed][${jobId}] Feed fetch job failed: ${job.error}\n${job.error.stack}`, diff --git a/apps/workers/workers/inference/inferenceWorker.ts b/apps/workers/workers/inference/inferenceWorker.ts index 0dba6f58..32de3806 100644 --- a/apps/workers/workers/inference/inferenceWorker.ts +++ b/apps/workers/workers/inference/inferenceWorker.ts @@ -1,5 +1,6 @@ import { eq } from "drizzle-orm"; import { DequeuedJob, Runner } from "liteque"; +import { workerStatsCounter } from "metrics"; import type { ZOpenAIRequest } from "@karakeep/shared/queues"; import { db } from "@karakeep/db"; @@ -43,11 +44,13 @@ export class OpenAiWorker { { run: runOpenAI, onComplete: async (job) => { + workerStatsCounter.labels("inference", "completed").inc(); const jobId = job.id; logger.info(`[inference][${jobId}] Completed successfully`); await attemptMarkStatus(job.data, "success"); }, onError: async (job) => { + workerStatsCounter.labels("inference", "failed").inc(); const jobId = job.id; logger.error( `[inference][${jobId}] inference job failed: ${job.error}\n${job.error.stack}`, diff --git a/apps/workers/workers/ruleEngineWorker.ts b/apps/workers/workers/ruleEngineWorker.ts index 39f0a523..2a4fbb1a 100644 --- a/apps/workers/workers/ruleEngineWorker.ts +++ b/apps/workers/workers/ruleEngineWorker.ts @@ -1,5 +1,6 @@ import { eq } from "drizzle-orm"; import { DequeuedJob, Runner } from "liteque"; +import { workerStatsCounter } from "metrics"; import { buildImpersonatingAuthedContext } from "trpc"; import 
type { ZRuleEngineRequest } from "@karakeep/shared/queues"; @@ -21,11 +22,13 @@ export class RuleEngineWorker { { run: runRuleEngine, onComplete: (job) => { + workerStatsCounter.labels("ruleEngine", "completed").inc(); const jobId = job.id; logger.info(`[ruleEngine][${jobId}] Completed successfully`); return Promise.resolve(); }, onError: (job) => { + workerStatsCounter.labels("ruleEngine", "failed").inc(); const jobId = job.id; logger.error( `[ruleEngine][${jobId}] rule engine job failed: ${job.error}\n${job.error.stack}`, diff --git a/apps/workers/workers/searchWorker.ts b/apps/workers/workers/searchWorker.ts index 4c924ceb..7dd25ee8 100644 --- a/apps/workers/workers/searchWorker.ts +++ b/apps/workers/workers/searchWorker.ts @@ -1,5 +1,6 @@ import { eq } from "drizzle-orm"; import { DequeuedJob, Runner } from "liteque"; +import { workerStatsCounter } from "metrics"; import type { ZSearchIndexingRequest } from "@karakeep/shared/queues"; import { db } from "@karakeep/db"; @@ -25,11 +26,13 @@ export class SearchIndexingWorker { { run: runSearchIndexing, onComplete: (job) => { + workerStatsCounter.labels("search", "completed").inc(); const jobId = job.id; logger.info(`[search][${jobId}] Completed successfully`); return Promise.resolve(); }, onError: (job) => { + workerStatsCounter.labels("search", "failed").inc(); const jobId = job.id; logger.error( `[search][${jobId}] search job failed: ${job.error}\n${job.error.stack}`, diff --git a/apps/workers/workers/tidyAssetsWorker.ts b/apps/workers/workers/tidyAssetsWorker.ts index d4c8abdb..cf3e33b6 100644 --- a/apps/workers/workers/tidyAssetsWorker.ts +++ b/apps/workers/workers/tidyAssetsWorker.ts @@ -1,5 +1,6 @@ import { eq } from "drizzle-orm"; import { DequeuedJob, Runner } from "liteque"; +import { workerStatsCounter } from "metrics"; import { db } from "@karakeep/db"; import { assets } from "@karakeep/db/schema"; @@ -19,11 +20,13 @@ export class TidyAssetsWorker { { run: runTidyAssets, onComplete: (job) => { + 
workerStatsCounter.labels("tidyAssets", "completed").inc(); const jobId = job.id; logger.info(`[tidyAssets][${jobId}] Completed successfully`); return Promise.resolve(); }, onError: (job) => { + workerStatsCounter.labels("tidyAssets", "failed").inc(); const jobId = job.id; logger.error( `[tidyAssets][${jobId}] tidy assets job failed: ${job.error}\n${job.error.stack}`, diff --git a/apps/workers/workers/videoWorker.ts b/apps/workers/workers/videoWorker.ts index d25c1948..ca46fcee 100644 --- a/apps/workers/workers/videoWorker.ts +++ b/apps/workers/workers/videoWorker.ts @@ -3,6 +3,7 @@ import * as os from "os"; import path from "path";
import { execa } from "execa";
import { DequeuedJob, Runner } from "liteque";
+import { workerStatsCounter } from "metrics";
import { db } from "@karakeep/db";
import { AssetTypes } from "@karakeep/db/schema";
@@ -41,6 +42,7 @@ export class VideoWorker { /* timeoutSec */ serverConfig.crawler.downloadVideoTimeout,
),
onComplete: async (job) => {
+ workerStatsCounter.labels("video", "completed").inc();
const jobId = job.id;
logger.info(
`[VideoCrawler][${jobId}] Video Download Completed successfully`,
@@ -48,6 +50,7 @@ export class VideoWorker { return Promise.resolve();
},
onError: async (job) => {
+ workerStatsCounter.labels("video", "failed").inc();
const jobId = job.id;
logger.error(
`[VideoCrawler][${jobId}] Video Download job failed: ${job.error}`,
diff --git a/apps/workers/workers/webhookWorker.ts b/apps/workers/workers/webhookWorker.ts index 504f7f9b..96070e22 100644 --- a/apps/workers/workers/webhookWorker.ts +++ b/apps/workers/workers/webhookWorker.ts @@ -1,5 +1,6 @@ import { eq } from "drizzle-orm"; import { DequeuedJob, Runner } from "liteque"; +import { workerStatsCounter } from "metrics"; import fetch from "node-fetch"; import { db } from "@karakeep/db"; @@ -20,11 +21,13 @@ export class WebhookWorker { { run: runWebhook, onComplete: async (job) => { + workerStatsCounter.labels("webhook", "completed").inc(); const jobId = job.id; logger.info(`[webhook][${jobId}] Completed successfully`); return Promise.resolve(); }, onError: async (job) => { + workerStatsCounter.labels("webhook", "failed").inc(); const jobId = job.id; logger.error( `[webhook][${jobId}] webhook job failed: ${job.error}\n${job.error.stack}`, |
