diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-11-27 16:02:18 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-11-27 16:02:18 +0000 |
| commit | 6821257def80ffa655b6feb893dd74ca2a13b9f1 (patch) | |
| tree | 34d02cefca4f9f4b69a07db268f7c1df289aa1fb /apps/workers | |
| parent | 54268759492df88644e4279fdcc600214f922f43 (diff) | |
| download | karakeep-6821257def80ffa655b6feb893dd74ca2a13b9f1.tar.zst | |
fix: Propagate group ids in queue calls (#2177)
* fix: Propagate group ids
* fix tests
Diffstat (limited to 'apps/workers')
| -rw-r--r-- | apps/workers/workers/assetPreprocessingWorker.ts | 1 | ||||
| -rw-r--r-- | apps/workers/workers/crawlerWorker.ts | 17 | ||||
| -rw-r--r-- | apps/workers/workers/feedWorker.ts | 2 | ||||
| -rw-r--r-- | apps/workers/workers/inference/summarize.ts | 1 | ||||
| -rw-r--r-- | apps/workers/workers/inference/tagging.ts | 1 |
5 files changed, 18 insertions, 4 deletions
diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts index 42c0ff01..ff16906d 100644 --- a/apps/workers/workers/assetPreprocessingWorker.ts +++ b/apps/workers/workers/assetPreprocessingWorker.ts @@ -361,6 +361,7 @@ async function run(req: DequeuedJob<AssetPreprocessingRequest>) { // Propagate priority to child jobs const enqueueOpts: EnqueueOptions = { priority: req.priority, + groupId: bookmark.userId, }; if (!isFixMode || anythingChanged) { await OpenAIQueue.enqueue( diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts index 0ab3a3cc..44f8f0c3 100644 --- a/apps/workers/workers/crawlerWorker.ts +++ b/apps/workers/workers/crawlerWorker.ts @@ -986,10 +986,15 @@ async function handleAsAssetBookmark( .where(eq(bookmarks.id, bookmarkId)); await trx.delete(bookmarkLinks).where(eq(bookmarkLinks.id, bookmarkId)); }); - await AssetPreprocessingQueue.enqueue({ - bookmarkId, - fixMode: false, - }); + await AssetPreprocessingQueue.enqueue( + { + bookmarkId, + fixMode: false, + }, + { + groupId: userId, + }, + ); } type StoreHtmlResult = @@ -1286,6 +1291,7 @@ async function checkDomainRateLimit( url: string, jobId: string, jobData: ZCrawlLinkRequest, + userId: string, jobPriority?: number, ): Promise<boolean> { const crawlerDomainRateLimitConfig = serverConfig.crawler.domainRatelimiting; @@ -1319,6 +1325,7 @@ async function checkDomainRateLimit( await LinkCrawlerQueue.enqueue(jobData, { priority: jobPriority, delayMs, + groupId: userId, }); return false; } @@ -1354,6 +1361,7 @@ async function runCrawler( url, jobId, job.data, + userId, job.priority, ); @@ -1411,6 +1419,7 @@ async function runCrawler( // Propagate priority to child jobs const enqueueOpts: EnqueueOptions = { priority: job.priority, + groupId: userId, }; // Enqueue openai job (if not set, assume it's true for backward compatibility) diff --git a/apps/workers/workers/feedWorker.ts b/apps/workers/workers/feedWorker.ts index 57358880..3382e980 100644 --- a/apps/workers/workers/feedWorker.ts +++ b/apps/workers/workers/feedWorker.ts @@ -22,6 +22,7 @@ export const FeedRefreshingWorker = cron.schedule( .findMany({ columns: { id: true, + userId: true, }, where: eq(rssFeedsTable.enabled, true), }) @@ -38,6 +39,7 @@ export const FeedRefreshingWorker = cron.schedule( }, { idempotencyKey, + groupId: feed.userId, }, ); } diff --git a/apps/workers/workers/inference/summarize.ts b/apps/workers/workers/inference/summarize.ts index 0b25470e..0fa24869 100644 --- a/apps/workers/workers/inference/summarize.ts +++ b/apps/workers/workers/inference/summarize.ts @@ -137,5 +137,6 @@ URL: ${link.url ?? ""} await triggerSearchReindex(bookmarkId, { priority: job.priority, + groupId: bookmarkData.userId, }); } diff --git a/apps/workers/workers/inference/tagging.ts b/apps/workers/workers/inference/tagging.ts index 789a30b4..50a03b00 100644 --- a/apps/workers/workers/inference/tagging.ts +++ b/apps/workers/workers/inference/tagging.ts @@ -460,6 +460,7 @@ export async function runTagging( // Propagate priority to child jobs const enqueueOpts: EnqueueOptions = { priority: job.priority, + groupId: bookmark.userId, }; // Trigger a webhook |
