aboutsummaryrefslogtreecommitdiffstats
path: root/apps
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-11-27 16:02:18 +0000
committerGitHub <noreply@github.com>2025-11-27 16:02:18 +0000
commit6821257def80ffa655b6feb893dd74ca2a13b9f1 (patch)
tree34d02cefca4f9f4b69a07db268f7c1df289aa1fb /apps
parent54268759492df88644e4279fdcc600214f922f43 (diff)
downloadkarakeep-6821257def80ffa655b6feb893dd74ca2a13b9f1.tar.zst
fix: Propagate group ids in queue calls (#2177)
* fix: Propagate group ids * fix tests
Diffstat (limited to 'apps')
-rw-r--r--apps/workers/workers/assetPreprocessingWorker.ts1
-rw-r--r--apps/workers/workers/crawlerWorker.ts17
-rw-r--r--apps/workers/workers/feedWorker.ts2
-rw-r--r--apps/workers/workers/inference/summarize.ts1
-rw-r--r--apps/workers/workers/inference/tagging.ts1
5 files changed, 18 insertions, 4 deletions
diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts
index 42c0ff01..ff16906d 100644
--- a/apps/workers/workers/assetPreprocessingWorker.ts
+++ b/apps/workers/workers/assetPreprocessingWorker.ts
@@ -361,6 +361,7 @@ async function run(req: DequeuedJob<AssetPreprocessingRequest>) {
// Propagate priority to child jobs
const enqueueOpts: EnqueueOptions = {
priority: req.priority,
+ groupId: bookmark.userId,
};
if (!isFixMode || anythingChanged) {
await OpenAIQueue.enqueue(
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index 0ab3a3cc..44f8f0c3 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -986,10 +986,15 @@ async function handleAsAssetBookmark(
.where(eq(bookmarks.id, bookmarkId));
await trx.delete(bookmarkLinks).where(eq(bookmarkLinks.id, bookmarkId));
});
- await AssetPreprocessingQueue.enqueue({
- bookmarkId,
- fixMode: false,
- });
+ await AssetPreprocessingQueue.enqueue(
+ {
+ bookmarkId,
+ fixMode: false,
+ },
+ {
+ groupId: userId,
+ },
+ );
}
type StoreHtmlResult =
@@ -1286,6 +1291,7 @@ async function checkDomainRateLimit(
url: string,
jobId: string,
jobData: ZCrawlLinkRequest,
+ userId: string,
jobPriority?: number,
): Promise<boolean> {
const crawlerDomainRateLimitConfig = serverConfig.crawler.domainRatelimiting;
@@ -1319,6 +1325,7 @@ async function checkDomainRateLimit(
await LinkCrawlerQueue.enqueue(jobData, {
priority: jobPriority,
delayMs,
+ groupId: userId,
});
return false;
}
@@ -1354,6 +1361,7 @@ async function runCrawler(
url,
jobId,
job.data,
+ userId,
job.priority,
);
@@ -1411,6 +1419,7 @@ async function runCrawler(
// Propagate priority to child jobs
const enqueueOpts: EnqueueOptions = {
priority: job.priority,
+ groupId: userId,
};
// Enqueue openai job (if not set, assume it's true for backward compatibility)
diff --git a/apps/workers/workers/feedWorker.ts b/apps/workers/workers/feedWorker.ts
index 57358880..3382e980 100644
--- a/apps/workers/workers/feedWorker.ts
+++ b/apps/workers/workers/feedWorker.ts
@@ -22,6 +22,7 @@ export const FeedRefreshingWorker = cron.schedule(
.findMany({
columns: {
id: true,
+ userId: true,
},
where: eq(rssFeedsTable.enabled, true),
})
@@ -38,6 +39,7 @@ export const FeedRefreshingWorker = cron.schedule(
},
{
idempotencyKey,
+ groupId: feed.userId,
},
);
}
diff --git a/apps/workers/workers/inference/summarize.ts b/apps/workers/workers/inference/summarize.ts
index 0b25470e..0fa24869 100644
--- a/apps/workers/workers/inference/summarize.ts
+++ b/apps/workers/workers/inference/summarize.ts
@@ -137,5 +137,6 @@ URL: ${link.url ?? ""}
await triggerSearchReindex(bookmarkId, {
priority: job.priority,
+ groupId: bookmarkData.userId,
});
}
diff --git a/apps/workers/workers/inference/tagging.ts b/apps/workers/workers/inference/tagging.ts
index 789a30b4..50a03b00 100644
--- a/apps/workers/workers/inference/tagging.ts
+++ b/apps/workers/workers/inference/tagging.ts
@@ -460,6 +460,7 @@ export async function runTagging(
// Propagate priority to child jobs
const enqueueOpts: EnqueueOptions = {
priority: job.priority,
+ groupId: bookmark.userId,
};
// Trigger a webhook