aboutsummaryrefslogtreecommitdiffstats
path: root/apps
diff options
context:
space:
mode:
Diffstat (limited to 'apps')
-rw-r--r--apps/workers/workers/importWorker.ts23
1 files changed, 16 insertions, 7 deletions
diff --git a/apps/workers/workers/importWorker.ts b/apps/workers/workers/importWorker.ts
index 654de9e5..2acbdcc1 100644
--- a/apps/workers/workers/importWorker.ts
+++ b/apps/workers/workers/importWorker.ts
@@ -27,39 +27,39 @@ import { registry } from "../metrics";
// Prometheus metrics
const importStagingProcessedCounter = new Counter({
- name: "import_staging_processed_total",
+ name: "karakeep_import_staging_processed_total",
help: "Total number of staged items processed",
labelNames: ["result"],
registers: [registry],
});
const importStagingStaleResetCounter = new Counter({
- name: "import_staging_stale_reset_total",
+ name: "karakeep_import_staging_stale_reset_total",
help: "Total number of stale processing items reset to pending",
registers: [registry],
});
const importStagingInFlightGauge = new Gauge({
- name: "import_staging_in_flight",
+ name: "karakeep_import_staging_in_flight",
help: "Current number of in-flight items (processing + recently completed)",
registers: [registry],
});
const importSessionsGauge = new Gauge({
- name: "import_sessions_active",
+ name: "karakeep_import_sessions_active",
help: "Number of active import sessions by status",
labelNames: ["status"],
registers: [registry],
});
const importStagingPendingGauge = new Gauge({
- name: "import_staging_pending_total",
+ name: "karakeep_import_staging_pending_total",
help: "Total number of pending items in staging table",
registers: [registry],
});
const importBatchDurationHistogram = new Histogram({
- name: "import_batch_duration_seconds",
+ name: "karakeep_import_batch_duration_seconds",
help: "Time taken to process a batch of staged items",
buckets: [0.1, 0.5, 1, 2, 5, 10, 30],
registers: [registry],
@@ -252,7 +252,16 @@ export class ImportWorker {
const res = await db
.select({ count: count() })
.from(importStagingBookmarks)
- .where(eq(importStagingBookmarks.status, "pending"));
+ .innerJoin(
+ importSessions,
+ eq(importStagingBookmarks.importSessionId, importSessions.id),
+ )
+ .where(
+ and(
+ eq(importStagingBookmarks.status, "pending"),
+ inArray(importSessions.status, ["pending", "running"]),
+ ),
+ );
return res[0]?.count ?? 0;
}