From 6821257def80ffa655b6feb893dd74ca2a13b9f1 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Thu, 27 Nov 2025 16:02:18 +0000 Subject: fix: Propagate group ids in queue calls (#2177) * fix: Propagate group ids * fix tests --- apps/workers/workers/assetPreprocessingWorker.ts | 1 + apps/workers/workers/crawlerWorker.ts | 17 +++++++++++++---- apps/workers/workers/feedWorker.ts | 2 ++ apps/workers/workers/inference/summarize.ts | 1 + apps/workers/workers/inference/tagging.ts | 1 + 5 files changed, 18 insertions(+), 4 deletions(-) (limited to 'apps') diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts index 42c0ff01..ff16906d 100644 --- a/apps/workers/workers/assetPreprocessingWorker.ts +++ b/apps/workers/workers/assetPreprocessingWorker.ts @@ -361,6 +361,7 @@ async function run(req: DequeuedJob) { // Propagate priority to child jobs const enqueueOpts: EnqueueOptions = { priority: req.priority, + groupId: bookmark.userId, }; if (!isFixMode || anythingChanged) { await OpenAIQueue.enqueue( diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts index 0ab3a3cc..44f8f0c3 100644 --- a/apps/workers/workers/crawlerWorker.ts +++ b/apps/workers/workers/crawlerWorker.ts @@ -986,10 +986,15 @@ async function handleAsAssetBookmark( .where(eq(bookmarks.id, bookmarkId)); await trx.delete(bookmarkLinks).where(eq(bookmarkLinks.id, bookmarkId)); }); - await AssetPreprocessingQueue.enqueue({ - bookmarkId, - fixMode: false, - }); + await AssetPreprocessingQueue.enqueue( + { + bookmarkId, + fixMode: false, + }, + { + groupId: userId, + }, + ); } type StoreHtmlResult = @@ -1286,6 +1291,7 @@ async function checkDomainRateLimit( url: string, jobId: string, jobData: ZCrawlLinkRequest, + userId: string, jobPriority?: number, ): Promise { const crawlerDomainRateLimitConfig = serverConfig.crawler.domainRatelimiting; @@ -1319,6 +1325,7 @@ async function checkDomainRateLimit( await LinkCrawlerQueue.enqueue(jobData, { priority: jobPriority, delayMs, + groupId: userId, }); return false; } @@ -1354,6 +1361,7 @@ async function runCrawler( url, jobId, job.data, + userId, job.priority, ); @@ -1411,6 +1419,7 @@ async function runCrawler( // Propagate priority to child jobs const enqueueOpts: EnqueueOptions = { priority: job.priority, + groupId: userId, }; // Enqueue openai job (if not set, assume it's true for backward compatibility) diff --git a/apps/workers/workers/feedWorker.ts b/apps/workers/workers/feedWorker.ts index 57358880..3382e980 100644 --- a/apps/workers/workers/feedWorker.ts +++ b/apps/workers/workers/feedWorker.ts @@ -22,6 +22,7 @@ export const FeedRefreshingWorker = cron.schedule( .findMany({ columns: { id: true, + userId: true, }, where: eq(rssFeedsTable.enabled, true), }) @@ -38,6 +39,7 @@ export const FeedRefreshingWorker = cron.schedule( }, { idempotencyKey, + groupId: feed.userId, }, ); } diff --git a/apps/workers/workers/inference/summarize.ts b/apps/workers/workers/inference/summarize.ts index 0b25470e..0fa24869 100644 --- a/apps/workers/workers/inference/summarize.ts +++ b/apps/workers/workers/inference/summarize.ts @@ -137,5 +137,6 @@ URL: ${link.url ?? ""} await triggerSearchReindex(bookmarkId, { priority: job.priority, + groupId: bookmarkData.userId, }); } diff --git a/apps/workers/workers/inference/tagging.ts b/apps/workers/workers/inference/tagging.ts index 789a30b4..50a03b00 100644 --- a/apps/workers/workers/inference/tagging.ts +++ b/apps/workers/workers/inference/tagging.ts @@ -460,6 +460,7 @@ export async function runTagging( // Propagate priority to child jobs const enqueueOpts: EnqueueOptions = { priority: job.priority, + groupId: bookmark.userId, }; // Trigger a webhook -- cgit v1.2.3-70-g09d2