about summary refs log tree commit diff stats
path: root/apps/workers
diff options
context:
space:
mode:
author MohamedBassem <me@mbassem.com> 2025-08-22 20:09:52 +0300
committer MohamedBassem <me@mbassem.com> 2025-08-22 21:20:37 +0300
commit 52d018c872d0db30c4d54d89fefa8543ee9ff93e (patch)
tree b9301240e40e128074c52bad5e51a23e354c5805 /apps/workers
parent 9d6b0ef2df7757b3fed99c39cb6d92e4ff1b14df (diff)
download karakeep-52d018c872d0db30c4d54d89fefa8543ee9ff93e.tar.zst
feat: Export prometheus metrics from the workers
Diffstat (limited to 'apps/workers')
-rw-r--r-- apps/workers/exit.ts 4
-rw-r--r-- apps/workers/index.ts 7
-rw-r--r-- apps/workers/metrics.ts 17
-rw-r--r-- apps/workers/package.json 4
-rw-r--r-- apps/workers/server.ts 51
-rw-r--r-- apps/workers/workers/assetPreprocessingWorker.ts 3
-rw-r--r-- apps/workers/workers/crawlerWorker.ts 9
-rw-r--r-- apps/workers/workers/feedWorker.ts 3
-rw-r--r-- apps/workers/workers/inference/inferenceWorker.ts 3
-rw-r--r-- apps/workers/workers/ruleEngineWorker.ts 3
-rw-r--r-- apps/workers/workers/searchWorker.ts 3
-rw-r--r-- apps/workers/workers/tidyAssetsWorker.ts 3
-rw-r--r-- apps/workers/workers/videoWorker.ts 3
-rw-r--r-- apps/workers/workers/webhookWorker.ts 3
14 files changed, 111 insertions, 5 deletions
diff --git a/apps/workers/exit.ts b/apps/workers/exit.ts
index cd50a323..d4bc84f1 100644
--- a/apps/workers/exit.ts
+++ b/apps/workers/exit.ts
@@ -1,11 +1,11 @@
import logger from "@karakeep/shared/logger";
-export let isShuttingDown = false;
+export const exitAbortController = new AbortController();
export const shutdownPromise = new Promise((resolve) => {
process.on("SIGTERM", () => {
logger.info("Received SIGTERM, shutting down ...");
- isShuttingDown = true;
+ exitAbortController.abort();
resolve("");
});
});
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index a21b9c2d..f34e4722 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -1,5 +1,7 @@
import "dotenv/config";
+import { buildServer } from "server";
+
import { loadAllPlugins } from "@karakeep/shared-server";
import serverConfig from "@karakeep/shared/config";
import logger from "@karakeep/shared/logger";
@@ -31,6 +33,7 @@ async function main() {
assetPreprocessing,
webhook,
ruleEngine,
+ httpServer,
] = [
await CrawlerWorker.build(),
OpenAiWorker.build(),
@@ -41,6 +44,7 @@ async function main() {
AssetPreprocessingWorker.build(),
WebhookWorker.build(),
RuleEngineWorker.build(),
+ buildServer(),
];
FeedRefreshingWorker.start();
@@ -55,6 +59,7 @@ async function main() {
assetPreprocessing.run(),
webhook.run(),
ruleEngine.run(),
+ httpServer.serve(),
]),
shutdownPromise,
]);
@@ -72,6 +77,8 @@ async function main() {
assetPreprocessing.stop();
webhook.stop();
ruleEngine.stop();
+ await httpServer.stop();
+ process.exit(0);
}
main();
diff --git a/apps/workers/metrics.ts b/apps/workers/metrics.ts
new file mode 100644
index 00000000..04eec1fb
--- /dev/null
+++ b/apps/workers/metrics.ts
@@ -0,0 +1,17 @@
+import { prometheus } from "@hono/prometheus";
+import { Counter, Registry } from "prom-client";
+
+// Dedicated registry that backs the workers' /metrics output.
+const registry = new Registry();
+
+const honoPrometheus = prometheus({ registry, prefix: "karakeep_" });
+export const printMetrics = honoPrometheus.printMetrics;
+
+// Counts jobs per worker and outcome; incremented from each worker's callbacks.
+export const workerStatsCounter = new Counter({
+  name: "karakeep_worker_stats",
+  help: "Stats for each worker",
+  labelNames: ["worker_name", "status"],
+});
+
+registry.registerMetric(workerStatsCounter);
diff --git a/apps/workers/package.json b/apps/workers/package.json
index a771c710..2c0b9a77 100644
--- a/apps/workers/package.json
+++ b/apps/workers/package.json
@@ -6,6 +6,8 @@
"type": "module",
"dependencies": {
"@ghostery/adblocker-playwright": "^2.5.1",
+ "@hono/node-server": "^1.19.0",
+ "@hono/prometheus": "^1.0.2",
"@karakeep/db": "workspace:^0.1.0",
"@karakeep/shared": "workspace:^0.1.0",
"@karakeep/shared-server": "workspace:^0.1.0",
@@ -18,6 +20,7 @@
"dotenv": "^16.4.1",
"drizzle-orm": "^0.44.2",
"execa": "9.3.1",
+ "hono": "^4.7.10",
"http-proxy-agent": "^7.0.2",
"https-proxy-agent": "^7.0.6",
"jsdom": "^24.0.0",
@@ -42,6 +45,7 @@
"pdfjs-dist": "^4.2.67",
"playwright": "^1.42.1",
"playwright-extra": "^4.3.6",
+ "prom-client": "^15.1.3",
"puppeteer-extra-plugin-stealth": "^2.11.2",
"rss-parser": "^3.13.0",
"tesseract.js": "^5.1.1",
diff --git a/apps/workers/server.ts b/apps/workers/server.ts
new file mode 100644
index 00000000..f1b8a11d
--- /dev/null
+++ b/apps/workers/server.ts
@@ -0,0 +1,51 @@
+import { serve } from "@hono/node-server";
+import { Hono } from "hono";
+import { bearerAuth } from "hono/bearer-auth";
+
+import serverConfig from "@karakeep/shared/config";
+import logger from "@karakeep/shared/logger";
+
+import { printMetrics } from "./metrics";
+
+// Callers must present the configured bearer token to read /metrics.
+const metricsAuth = bearerAuth({ token: serverConfig.prometheus.metricsToken });
+
+// /health has no auth middleware; /metrics is guarded by the bearer-token check above.
+const app = new Hono()
+  .get("/health", (c) => {
+    return c.json({ status: "ok", timestamp: new Date().toISOString() });
+  })
+  .get("/metrics", metricsAuth, printMetrics);
+
+/**
+ * Starts the workers' HTTP server on the configured host/port.
+ *
+ * @returns the raw server plus `stop()` (close it) and `serve()` (wait for it to exit).
+ */
+export function buildServer() {
+  const listenOptions = {
+    fetch: app.fetch,
+    port: serverConfig.workers.port,
+    hostname: serverConfig.workers.host,
+  };
+  const server = serve(listenOptions, ({ address, port }) => {
+    logger.info(`Listening on http://${address}:${port}`);
+  });
+
+  // Resolves once the listener has closed; rejects with the error `close` reports.
+  const stop = () =>
+    new Promise<void>((resolve, reject) => {
+      server.close((closeError) => (closeError ? reject(closeError) : resolve()));
+    });
+
+  // Settles when the server emits "close" (resolve) or "error" (reject).
+  const waitForExit = () =>
+    new Promise<void>((resolve, reject) => {
+      server.on("error", reject);
+      server.on("close", () => resolve());
+    });
+
+  return { _server: server, stop, serve: waitForExit };
+}
+
+export default app;
diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts
index c3ecd1e0..73cf8bb5 100644
--- a/apps/workers/workers/assetPreprocessingWorker.ts
+++ b/apps/workers/workers/assetPreprocessingWorker.ts
@@ -1,6 +1,7 @@
import os from "os";
import { eq } from "drizzle-orm";
import { DequeuedJob, EnqueueOptions, Runner } from "liteque";
+import { workerStatsCounter } from "metrics";
import PDFParser from "pdf2json";
import { fromBuffer } from "pdf2pic";
import { createWorker } from "tesseract.js";
@@ -34,11 +35,13 @@ export class AssetPreprocessingWorker {
{
run: run,
onComplete: async (job) => {
+ workerStatsCounter.labels("assetPreprocessing", "completed").inc();
const jobId = job.id;
logger.info(`[assetPreprocessing][${jobId}] Completed successfully`);
return Promise.resolve();
},
onError: async (job) => {
+ workerStatsCounter.labels("assetPreprocessing", "failed").inc();
const jobId = job.id;
logger.error(
`[assetPreprocessing][${jobId}] Asset preprocessing failed: ${job.error}\n${job.error.stack}`,
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index f22a68d6..e032ab16 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -8,7 +8,7 @@ import { Mutex } from "async-mutex";
import DOMPurify from "dompurify";
import { eq } from "drizzle-orm";
import { execa } from "execa";
-import { isShuttingDown } from "exit";
+import { exitAbortController } from "exit";
import { HttpProxyAgent } from "http-proxy-agent";
import { HttpsProxyAgent } from "https-proxy-agent";
import { JSDOM, VirtualConsole } from "jsdom";
@@ -24,6 +24,7 @@ import metascraperPublisher from "metascraper-publisher";
import metascraperTitle from "metascraper-title";
import metascraperTwitter from "metascraper-twitter";
import metascraperUrl from "metascraper-url";
+import { workerStatsCounter } from "metrics";
import fetch from "node-fetch";
import { Browser, BrowserContextOptions } from "playwright";
import { chromium } from "playwright-extra";
@@ -203,7 +204,7 @@ async function launchBrowser() {
logger.error(
`[Crawler] Failed to connect to the browser instance, will retry in 5 secs: ${globalBrowserResult.error.stack}`,
);
- if (isShuttingDown) {
+ if (exitAbortController.signal.aborted) {
logger.info("[Crawler] We're shutting down so won't retry.");
return;
}
@@ -214,7 +215,7 @@ async function launchBrowser() {
}
globalBrowser = globalBrowserResult.data;
globalBrowser?.on("disconnected", () => {
- if (isShuttingDown) {
+ if (exitAbortController.signal.aborted) {
logger.info(
"[Crawler] The Playwright browser got disconnected. But we're shutting down so won't restart it.",
);
@@ -265,6 +266,7 @@ export class CrawlerWorker {
/* timeoutSec */ serverConfig.crawler.jobTimeoutSec,
),
onComplete: async (job) => {
+ workerStatsCounter.labels("crawler", "completed").inc();
const jobId = job.id;
logger.info(`[Crawler][${jobId}] Completed successfully`);
const bookmarkId = job.data.bookmarkId;
@@ -273,6 +275,7 @@ export class CrawlerWorker {
}
},
onError: async (job) => {
+ workerStatsCounter.labels("crawler", "failed").inc();
const jobId = job.id;
logger.error(
`[Crawler][${jobId}] Crawling job failed: ${job.error}\n${job.error.stack}`,
diff --git a/apps/workers/workers/feedWorker.ts b/apps/workers/workers/feedWorker.ts
index 682889f0..62106fc8 100644
--- a/apps/workers/workers/feedWorker.ts
+++ b/apps/workers/workers/feedWorker.ts
@@ -1,5 +1,6 @@
import { and, eq, inArray } from "drizzle-orm";
import { DequeuedJob, Runner } from "liteque";
+import { workerStatsCounter } from "metrics";
import cron from "node-cron";
import Parser from "rss-parser";
import { buildImpersonatingTRPCClient } from "trpc";
@@ -50,6 +51,7 @@ export class FeedWorker {
{
run: run,
onComplete: async (job) => {
+ workerStatsCounter.labels("feed", "completed").inc();
const jobId = job.id;
logger.info(`[feed][${jobId}] Completed successfully`);
await db
@@ -58,6 +60,7 @@ export class FeedWorker {
.where(eq(rssFeedsTable.id, job.data?.feedId));
},
onError: async (job) => {
+ workerStatsCounter.labels("feed", "failed").inc();
const jobId = job.id;
logger.error(
`[feed][${jobId}] Feed fetch job failed: ${job.error}\n${job.error.stack}`,
diff --git a/apps/workers/workers/inference/inferenceWorker.ts b/apps/workers/workers/inference/inferenceWorker.ts
index 0dba6f58..32de3806 100644
--- a/apps/workers/workers/inference/inferenceWorker.ts
+++ b/apps/workers/workers/inference/inferenceWorker.ts
@@ -1,5 +1,6 @@
import { eq } from "drizzle-orm";
import { DequeuedJob, Runner } from "liteque";
+import { workerStatsCounter } from "metrics";
import type { ZOpenAIRequest } from "@karakeep/shared/queues";
import { db } from "@karakeep/db";
@@ -43,11 +44,13 @@ export class OpenAiWorker {
{
run: runOpenAI,
onComplete: async (job) => {
+ workerStatsCounter.labels("inference", "completed").inc();
const jobId = job.id;
logger.info(`[inference][${jobId}] Completed successfully`);
await attemptMarkStatus(job.data, "success");
},
onError: async (job) => {
+ workerStatsCounter.labels("inference", "failed").inc();
const jobId = job.id;
logger.error(
`[inference][${jobId}] inference job failed: ${job.error}\n${job.error.stack}`,
diff --git a/apps/workers/workers/ruleEngineWorker.ts b/apps/workers/workers/ruleEngineWorker.ts
index 39f0a523..2a4fbb1a 100644
--- a/apps/workers/workers/ruleEngineWorker.ts
+++ b/apps/workers/workers/ruleEngineWorker.ts
@@ -1,5 +1,6 @@
import { eq } from "drizzle-orm";
import { DequeuedJob, Runner } from "liteque";
+import { workerStatsCounter } from "metrics";
import { buildImpersonatingAuthedContext } from "trpc";
import type { ZRuleEngineRequest } from "@karakeep/shared/queues";
@@ -21,11 +22,13 @@ export class RuleEngineWorker {
{
run: runRuleEngine,
onComplete: (job) => {
+ workerStatsCounter.labels("ruleEngine", "completed").inc();
const jobId = job.id;
logger.info(`[ruleEngine][${jobId}] Completed successfully`);
return Promise.resolve();
},
onError: (job) => {
+ workerStatsCounter.labels("ruleEngine", "failed").inc();
const jobId = job.id;
logger.error(
`[ruleEngine][${jobId}] rule engine job failed: ${job.error}\n${job.error.stack}`,
diff --git a/apps/workers/workers/searchWorker.ts b/apps/workers/workers/searchWorker.ts
index 4c924ceb..7dd25ee8 100644
--- a/apps/workers/workers/searchWorker.ts
+++ b/apps/workers/workers/searchWorker.ts
@@ -1,5 +1,6 @@
import { eq } from "drizzle-orm";
import { DequeuedJob, Runner } from "liteque";
+import { workerStatsCounter } from "metrics";
import type { ZSearchIndexingRequest } from "@karakeep/shared/queues";
import { db } from "@karakeep/db";
@@ -25,11 +26,13 @@ export class SearchIndexingWorker {
{
run: runSearchIndexing,
onComplete: (job) => {
+ workerStatsCounter.labels("search", "completed").inc();
const jobId = job.id;
logger.info(`[search][${jobId}] Completed successfully`);
return Promise.resolve();
},
onError: (job) => {
+ workerStatsCounter.labels("search", "failed").inc();
const jobId = job.id;
logger.error(
`[search][${jobId}] search job failed: ${job.error}\n${job.error.stack}`,
diff --git a/apps/workers/workers/tidyAssetsWorker.ts b/apps/workers/workers/tidyAssetsWorker.ts
index d4c8abdb..cf3e33b6 100644
--- a/apps/workers/workers/tidyAssetsWorker.ts
+++ b/apps/workers/workers/tidyAssetsWorker.ts
@@ -1,5 +1,6 @@
import { eq } from "drizzle-orm";
import { DequeuedJob, Runner } from "liteque";
+import { workerStatsCounter } from "metrics";
import { db } from "@karakeep/db";
import { assets } from "@karakeep/db/schema";
@@ -19,11 +20,13 @@ export class TidyAssetsWorker {
{
run: runTidyAssets,
onComplete: (job) => {
+ workerStatsCounter.labels("tidyAssets", "completed").inc();
const jobId = job.id;
logger.info(`[tidyAssets][${jobId}] Completed successfully`);
return Promise.resolve();
},
onError: (job) => {
+ workerStatsCounter.labels("tidyAssets", "failed").inc();
const jobId = job.id;
logger.error(
`[tidyAssets][${jobId}] tidy assets job failed: ${job.error}\n${job.error.stack}`,
diff --git a/apps/workers/workers/videoWorker.ts b/apps/workers/workers/videoWorker.ts
index d25c1948..ca46fcee 100644
--- a/apps/workers/workers/videoWorker.ts
+++ b/apps/workers/workers/videoWorker.ts
@@ -3,6 +3,7 @@ import * as os from "os";
import path from "path";
import { execa } from "execa";
import { DequeuedJob, Runner } from "liteque";
+import { workerStatsCounter } from "metrics";
import { db } from "@karakeep/db";
import { AssetTypes } from "@karakeep/db/schema";
@@ -41,6 +42,7 @@ export class VideoWorker {
/* timeoutSec */ serverConfig.crawler.downloadVideoTimeout,
),
onComplete: async (job) => {
+ workerStatsCounter.labels("video", "completed").inc();
const jobId = job.id;
logger.info(
`[VideoCrawler][${jobId}] Video Download Completed successfully`,
@@ -48,6 +50,7 @@ export class VideoWorker {
return Promise.resolve();
},
onError: async (job) => {
+ workerStatsCounter.labels("video", "failed").inc();
const jobId = job.id;
logger.error(
`[VideoCrawler][${jobId}] Video Download job failed: ${job.error}`,
diff --git a/apps/workers/workers/webhookWorker.ts b/apps/workers/workers/webhookWorker.ts
index 504f7f9b..96070e22 100644
--- a/apps/workers/workers/webhookWorker.ts
+++ b/apps/workers/workers/webhookWorker.ts
@@ -1,5 +1,6 @@
import { eq } from "drizzle-orm";
import { DequeuedJob, Runner } from "liteque";
+import { workerStatsCounter } from "metrics";
import fetch from "node-fetch";
import { db } from "@karakeep/db";
@@ -20,11 +21,13 @@ export class WebhookWorker {
{
run: runWebhook,
onComplete: async (job) => {
+ workerStatsCounter.labels("webhook", "completed").inc();
const jobId = job.id;
logger.info(`[webhook][${jobId}] Completed successfully`);
return Promise.resolve();
},
onError: async (job) => {
+ workerStatsCounter.labels("webhook", "failed").inc();
const jobId = job.id;
logger.error(
`[webhook][${jobId}] webhook job failed: ${job.error}\n${job.error.stack}`,