diff options
| author | MohamedBassem <me@mbassem.com> | 2026-02-04 05:40:19 -0800 |
|---|---|---|
| committer | MohamedBassem <me@mbassem.com> | 2026-02-04 05:40:19 -0800 |
| commit | d9329e89adc6ca579a299d42d115c850fc9305dd (patch) | |
| tree | 3b46f35c750b8f0038d7ec30f4b68be7d4001493 | |
| parent | e48bae7ce75437e7a10940b21bd8fc0c30d3914d (diff) | |
| download | karakeep-d9329e89adc6ca579a299d42d115c850fc9305dd.tar.zst | |
fix(import): skip counting pending items for paushed sessions
| -rw-r--r-- | apps/workers/workers/importWorker.ts | 23 |
1 files changed, 16 insertions, 7 deletions
diff --git a/apps/workers/workers/importWorker.ts b/apps/workers/workers/importWorker.ts index 654de9e5..2acbdcc1 100644 --- a/apps/workers/workers/importWorker.ts +++ b/apps/workers/workers/importWorker.ts @@ -27,39 +27,39 @@ import { registry } from "../metrics"; // Prometheus metrics const importStagingProcessedCounter = new Counter({ - name: "import_staging_processed_total", + name: "karakeep_import_staging_processed_total", help: "Total number of staged items processed", labelNames: ["result"], registers: [registry], }); const importStagingStaleResetCounter = new Counter({ - name: "import_staging_stale_reset_total", + name: "karakeep_import_staging_stale_reset_total", help: "Total number of stale processing items reset to pending", registers: [registry], }); const importStagingInFlightGauge = new Gauge({ - name: "import_staging_in_flight", + name: "karakeep_import_staging_in_flight", help: "Current number of in-flight items (processing + recently completed)", registers: [registry], }); const importSessionsGauge = new Gauge({ - name: "import_sessions_active", + name: "karakeep_import_sessions_active", help: "Number of active import sessions by status", labelNames: ["status"], registers: [registry], }); const importStagingPendingGauge = new Gauge({ - name: "import_staging_pending_total", + name: "karakeep_import_staging_pending_total", help: "Total number of pending items in staging table", registers: [registry], }); const importBatchDurationHistogram = new Histogram({ - name: "import_batch_duration_seconds", + name: "karakeep_import_batch_duration_seconds", help: "Time taken to process a batch of staged items", buckets: [0.1, 0.5, 1, 2, 5, 10, 30], registers: [registry], @@ -252,7 +252,16 @@ export class ImportWorker { const res = await db .select({ count: count() }) .from(importStagingBookmarks) - .where(eq(importStagingBookmarks.status, "pending")); + .innerJoin( + importSessions, + eq(importStagingBookmarks.importSessionId, importSessions.id), + ) + .where( + and( + eq(importStagingBookmarks.status, "pending"), + inArray(importSessions.status, ["pending", "running"]), + ), + ); return res[0]?.count ?? 0; } |
