aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-11-27 16:02:18 +0000
committerGitHub <noreply@github.com>2025-11-27 16:02:18 +0000
commit6821257def80ffa655b6feb893dd74ca2a13b9f1 (patch)
tree34d02cefca4f9f4b69a07db268f7c1df289aa1fb
parent54268759492df88644e4279fdcc600214f922f43 (diff)
downloadkarakeep-6821257def80ffa655b6feb893dd74ca2a13b9f1.tar.zst
fix: Propagate group ids in queue calls (#2177)
* fix: Propagate group ids * fix tests
Diffstat (limited to '')
-rw-r--r--apps/workers/workers/assetPreprocessingWorker.ts1
-rw-r--r--apps/workers/workers/crawlerWorker.ts17
-rw-r--r--apps/workers/workers/feedWorker.ts2
-rw-r--r--apps/workers/workers/inference/summarize.ts1
-rw-r--r--apps/workers/workers/inference/tagging.ts1
-rw-r--r--packages/trpc/lib/__tests__/ruleEngine.test.ts15
-rw-r--r--packages/trpc/lib/ruleEngine.ts15
-rw-r--r--packages/trpc/models/bookmarks.ts17
-rw-r--r--packages/trpc/models/tags.ts14
-rw-r--r--packages/trpc/routers/bookmarks.ts46
-rw-r--r--packages/trpc/routers/feeds.ts11
11 files changed, 103 insertions, 37 deletions
diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts
index 42c0ff01..ff16906d 100644
--- a/apps/workers/workers/assetPreprocessingWorker.ts
+++ b/apps/workers/workers/assetPreprocessingWorker.ts
@@ -361,6 +361,7 @@ async function run(req: DequeuedJob<AssetPreprocessingRequest>) {
// Propagate priority to child jobs
const enqueueOpts: EnqueueOptions = {
priority: req.priority,
+ groupId: bookmark.userId,
};
if (!isFixMode || anythingChanged) {
await OpenAIQueue.enqueue(
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index 0ab3a3cc..44f8f0c3 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -986,10 +986,15 @@ async function handleAsAssetBookmark(
.where(eq(bookmarks.id, bookmarkId));
await trx.delete(bookmarkLinks).where(eq(bookmarkLinks.id, bookmarkId));
});
- await AssetPreprocessingQueue.enqueue({
- bookmarkId,
- fixMode: false,
- });
+ await AssetPreprocessingQueue.enqueue(
+ {
+ bookmarkId,
+ fixMode: false,
+ },
+ {
+ groupId: userId,
+ },
+ );
}
type StoreHtmlResult =
@@ -1286,6 +1291,7 @@ async function checkDomainRateLimit(
url: string,
jobId: string,
jobData: ZCrawlLinkRequest,
+ userId: string,
jobPriority?: number,
): Promise<boolean> {
const crawlerDomainRateLimitConfig = serverConfig.crawler.domainRatelimiting;
@@ -1319,6 +1325,7 @@ async function checkDomainRateLimit(
await LinkCrawlerQueue.enqueue(jobData, {
priority: jobPriority,
delayMs,
+ groupId: userId,
});
return false;
}
@@ -1354,6 +1361,7 @@ async function runCrawler(
url,
jobId,
job.data,
+ userId,
job.priority,
);
@@ -1411,6 +1419,7 @@ async function runCrawler(
// Propagate priority to child jobs
const enqueueOpts: EnqueueOptions = {
priority: job.priority,
+ groupId: userId,
};
// Enqueue openai job (if not set, assume it's true for backward compatibility)
diff --git a/apps/workers/workers/feedWorker.ts b/apps/workers/workers/feedWorker.ts
index 57358880..3382e980 100644
--- a/apps/workers/workers/feedWorker.ts
+++ b/apps/workers/workers/feedWorker.ts
@@ -22,6 +22,7 @@ export const FeedRefreshingWorker = cron.schedule(
.findMany({
columns: {
id: true,
+ userId: true,
},
where: eq(rssFeedsTable.enabled, true),
})
@@ -38,6 +39,7 @@ export const FeedRefreshingWorker = cron.schedule(
},
{
idempotencyKey,
+ groupId: feed.userId,
},
);
}
diff --git a/apps/workers/workers/inference/summarize.ts b/apps/workers/workers/inference/summarize.ts
index 0b25470e..0fa24869 100644
--- a/apps/workers/workers/inference/summarize.ts
+++ b/apps/workers/workers/inference/summarize.ts
@@ -137,5 +137,6 @@ URL: ${link.url ?? ""}
await triggerSearchReindex(bookmarkId, {
priority: job.priority,
+ groupId: bookmarkData.userId,
});
}
diff --git a/apps/workers/workers/inference/tagging.ts b/apps/workers/workers/inference/tagging.ts
index 789a30b4..50a03b00 100644
--- a/apps/workers/workers/inference/tagging.ts
+++ b/apps/workers/workers/inference/tagging.ts
@@ -460,6 +460,7 @@ export async function runTagging(
// Propagate priority to child jobs
const enqueueOpts: EnqueueOptions = {
priority: job.priority,
+ groupId: bookmark.userId,
};
// Trigger a webhook
diff --git a/packages/trpc/lib/__tests__/ruleEngine.test.ts b/packages/trpc/lib/__tests__/ruleEngine.test.ts
index a108ede7..ede22ec6 100644
--- a/packages/trpc/lib/__tests__/ruleEngine.test.ts
+++ b/packages/trpc/lib/__tests__/ruleEngine.test.ts
@@ -528,11 +528,16 @@ describe("RuleEngine", () => {
const action: RuleEngineAction = { type: "downloadFullPageArchive" };
const result = await engine.executeAction(action);
expect(result).toBe(`Enqueued full page archive`);
- expect(LinkCrawlerQueue.enqueue).toHaveBeenCalledWith({
- bookmarkId: bookmarkId,
- archiveFullPage: true,
- runInference: false,
- });
+ expect(LinkCrawlerQueue.enqueue).toHaveBeenCalledWith(
+ {
+ bookmarkId: bookmarkId,
+ archiveFullPage: true,
+ runInference: false,
+ },
+ {
+ groupId: userId,
+ },
+ );
});
it("should execute favouriteBookmark action", async () => {
diff --git a/packages/trpc/lib/ruleEngine.ts b/packages/trpc/lib/ruleEngine.ts
index 2d5deae6..c191619b 100644
--- a/packages/trpc/lib/ruleEngine.ts
+++ b/packages/trpc/lib/ruleEngine.ts
@@ -189,11 +189,16 @@ export class RuleEngine {
return `Removed from list ${action.listId}`;
}
case "downloadFullPageArchive": {
- await LinkCrawlerQueue.enqueue({
- bookmarkId: this.bookmark.id,
- archiveFullPage: true,
- runInference: false,
- });
+ await LinkCrawlerQueue.enqueue(
+ {
+ bookmarkId: this.bookmark.id,
+ archiveFullPage: true,
+ runInference: false,
+ },
+ {
+ groupId: this.bookmark.userId,
+ },
+ );
return `Enqueued full page archive`;
}
case "favouriteBookmark": {
diff --git a/packages/trpc/models/bookmarks.ts b/packages/trpc/models/bookmarks.ts
index bd696ee8..07fa8693 100644
--- a/packages/trpc/models/bookmarks.ts
+++ b/packages/trpc/models/bookmarks.ts
@@ -779,12 +779,19 @@ export class Bookmark extends BareBookmark {
),
);
- await SearchIndexingQueue.enqueue({
- bookmarkId: this.bookmark.id,
- type: "delete",
- });
+ await SearchIndexingQueue.enqueue(
+ {
+ bookmarkId: this.bookmark.id,
+ type: "delete",
+ },
+ {
+ groupId: this.ctx.user.id,
+ },
+ );
- await triggerWebhook(this.bookmark.id, "deleted", this.ctx.user.id);
+ await triggerWebhook(this.bookmark.id, "deleted", this.ctx.user.id, {
+ groupId: this.ctx.user.id,
+ });
if (deleted.changes > 0) {
await this.cleanupAssets();
}
diff --git a/packages/trpc/models/tags.ts b/packages/trpc/models/tags.ts
index b230b6b4..55532077 100644
--- a/packages/trpc/models/tags.ts
+++ b/packages/trpc/models/tags.ts
@@ -280,7 +280,11 @@ export class Tag {
try {
await Promise.all(
- affectedBookmarks.map((id) => triggerSearchReindex(id)),
+ affectedBookmarks.map((id) =>
+ triggerSearchReindex(id, {
+ groupId: ctx.user.id,
+ }),
+ ),
);
} catch (e) {
console.error("Failed to reindex affected bookmarks", e);
@@ -315,7 +319,9 @@ export class Tag {
await Promise.all(
affectedBookmarks.map(({ bookmarkId }) =>
- triggerSearchReindex(bookmarkId),
+ triggerSearchReindex(bookmarkId, {
+ groupId: this.ctx.user.id,
+ }),
),
);
}
@@ -352,7 +358,9 @@ export class Tag {
await Promise.all(
affectedBookmarks
.map((b) => b.bookmarkId)
- .map((id) => triggerSearchReindex(id)),
+ .map((id) =>
+ triggerSearchReindex(id, { groupId: this.ctx.user.id }),
+ ),
);
} catch (e) {
console.error("Failed to reindex affected bookmarks", e);
diff --git a/packages/trpc/routers/bookmarks.ts b/packages/trpc/routers/bookmarks.ts
index 389f026c..05ffa240 100644
--- a/packages/trpc/routers/bookmarks.ts
+++ b/packages/trpc/routers/bookmarks.ts
@@ -281,6 +281,7 @@ export const bookmarksAppRouter = router({
const enqueueOpts: EnqueueOptions = {
// The lower the priority number, the sooner the job will be processed
priority: input.crawlPriority === "low" ? 50 : 0,
+ groupId: ctx.user.id,
};
switch (bookmark.content.type) {
@@ -487,8 +488,12 @@ export const bookmarksAppRouter = router({
);
}
// Trigger re-indexing and webhooks
- await triggerSearchReindex(input.bookmarkId);
- await triggerWebhook(input.bookmarkId, "edited");
+ await triggerSearchReindex(input.bookmarkId, {
+ groupId: ctx.user.id,
+ });
+ await triggerWebhook(input.bookmarkId, "edited", ctx.user.id, {
+ groupId: ctx.user.id,
+ });
return updatedBookmark;
}),
@@ -527,8 +532,12 @@ export const bookmarksAppRouter = router({
),
);
});
- await triggerSearchReindex(input.bookmarkId);
- await triggerWebhook(input.bookmarkId, "edited");
+ await triggerSearchReindex(input.bookmarkId, {
+ groupId: ctx.user.id,
+ });
+ await triggerWebhook(input.bookmarkId, "edited", ctx.user.id, {
+ groupId: ctx.user.id,
+ });
}),
deleteBookmark: authedProcedure
@@ -561,10 +570,15 @@ export const bookmarksAppRouter = router({
crawlStatusCode: null,
})
.where(eq(bookmarkLinks.id, input.bookmarkId));
- await LinkCrawlerQueue.enqueue({
- bookmarkId: input.bookmarkId,
- archiveFullPage: input.archiveFullPage,
- });
+ await LinkCrawlerQueue.enqueue(
+ {
+ bookmarkId: input.bookmarkId,
+ archiveFullPage: input.archiveFullPage,
+ },
+ {
+ groupId: ctx.user.id,
+ },
+ );
}),
getBookmark: authedProcedure
.input(
@@ -818,8 +832,12 @@ export const bookmarksAppRouter = router({
tagId: t,
})),
]);
- await triggerSearchReindex(input.bookmarkId);
- await triggerWebhook(input.bookmarkId, "edited");
+ await triggerSearchReindex(input.bookmarkId, {
+ groupId: ctx.user.id,
+ });
+ await triggerWebhook(input.bookmarkId, "edited", ctx.user.id, {
+ groupId: ctx.user.id,
+ });
return {
bookmarkId: input.bookmarkId,
attached: allIds,
@@ -959,8 +977,12 @@ Author: ${bookmark.author ?? ""}
summary: summary.response,
})
.where(eq(bookmarks.id, input.bookmarkId));
- await triggerSearchReindex(input.bookmarkId);
- await triggerWebhook(input.bookmarkId, "edited");
+ await triggerSearchReindex(input.bookmarkId, {
+ groupId: ctx.user.id,
+ });
+ await triggerWebhook(input.bookmarkId, "edited", ctx.user.id, {
+ groupId: ctx.user.id,
+ });
return {
bookmarkId: input.bookmarkId,
diff --git a/packages/trpc/routers/feeds.ts b/packages/trpc/routers/feeds.ts
index 57c88084..8591c98c 100644
--- a/packages/trpc/routers/feeds.ts
+++ b/packages/trpc/routers/feeds.ts
@@ -57,8 +57,13 @@ export const feedsAppRouter = router({
.input(z.object({ feedId: z.string() }))
.mutation(async ({ input, ctx }) => {
await Feed.fromId(ctx, input.feedId);
- await FeedQueue.enqueue({
- feedId: input.feedId,
- });
+ await FeedQueue.enqueue(
+ {
+ feedId: input.feedId,
+ },
+ {
+ groupId: ctx.user.id,
+ },
+ );
}),
});