diff options
| -rw-r--r-- | apps/workers/workers/assetPreprocessingWorker.ts | 1 | ||||
| -rw-r--r-- | apps/workers/workers/crawlerWorker.ts | 17 | ||||
| -rw-r--r-- | apps/workers/workers/feedWorker.ts | 2 | ||||
| -rw-r--r-- | apps/workers/workers/inference/summarize.ts | 1 | ||||
| -rw-r--r-- | apps/workers/workers/inference/tagging.ts | 1 | ||||
| -rw-r--r-- | packages/trpc/lib/__tests__/ruleEngine.test.ts | 15 | ||||
| -rw-r--r-- | packages/trpc/lib/ruleEngine.ts | 15 | ||||
| -rw-r--r-- | packages/trpc/models/bookmarks.ts | 17 | ||||
| -rw-r--r-- | packages/trpc/models/tags.ts | 14 | ||||
| -rw-r--r-- | packages/trpc/routers/bookmarks.ts | 46 | ||||
| -rw-r--r-- | packages/trpc/routers/feeds.ts | 11 |
11 files changed, 103 insertions, 37 deletions
diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts index 42c0ff01..ff16906d 100644 --- a/apps/workers/workers/assetPreprocessingWorker.ts +++ b/apps/workers/workers/assetPreprocessingWorker.ts @@ -361,6 +361,7 @@ async function run(req: DequeuedJob<AssetPreprocessingRequest>) { // Propagate priority to child jobs const enqueueOpts: EnqueueOptions = { priority: req.priority, + groupId: bookmark.userId, }; if (!isFixMode || anythingChanged) { await OpenAIQueue.enqueue( diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts index 0ab3a3cc..44f8f0c3 100644 --- a/apps/workers/workers/crawlerWorker.ts +++ b/apps/workers/workers/crawlerWorker.ts @@ -986,10 +986,15 @@ async function handleAsAssetBookmark( .where(eq(bookmarks.id, bookmarkId)); await trx.delete(bookmarkLinks).where(eq(bookmarkLinks.id, bookmarkId)); }); - await AssetPreprocessingQueue.enqueue({ - bookmarkId, - fixMode: false, - }); + await AssetPreprocessingQueue.enqueue( + { + bookmarkId, + fixMode: false, + }, + { + groupId: userId, + }, + ); } type StoreHtmlResult = @@ -1286,6 +1291,7 @@ async function checkDomainRateLimit( url: string, jobId: string, jobData: ZCrawlLinkRequest, + userId: string, jobPriority?: number, ): Promise<boolean> { const crawlerDomainRateLimitConfig = serverConfig.crawler.domainRatelimiting; @@ -1319,6 +1325,7 @@ async function checkDomainRateLimit( await LinkCrawlerQueue.enqueue(jobData, { priority: jobPriority, delayMs, + groupId: userId, }); return false; } @@ -1354,6 +1361,7 @@ async function runCrawler( url, jobId, job.data, + userId, job.priority, ); @@ -1411,6 +1419,7 @@ async function runCrawler( // Propagate priority to child jobs const enqueueOpts: EnqueueOptions = { priority: job.priority, + groupId: userId, }; // Enqueue openai job (if not set, assume it's true for backward compatibility) diff --git a/apps/workers/workers/feedWorker.ts b/apps/workers/workers/feedWorker.ts index 57358880..3382e980 100644 --- a/apps/workers/workers/feedWorker.ts +++ b/apps/workers/workers/feedWorker.ts @@ -22,6 +22,7 @@ export const FeedRefreshingWorker = cron.schedule( .findMany({ columns: { id: true, + userId: true, }, where: eq(rssFeedsTable.enabled, true), }) @@ -38,6 +39,7 @@ export const FeedRefreshingWorker = cron.schedule( }, { idempotencyKey, + groupId: feed.userId, }, ); } diff --git a/apps/workers/workers/inference/summarize.ts b/apps/workers/workers/inference/summarize.ts index 0b25470e..0fa24869 100644 --- a/apps/workers/workers/inference/summarize.ts +++ b/apps/workers/workers/inference/summarize.ts @@ -137,5 +137,6 @@ URL: ${link.url ?? ""} await triggerSearchReindex(bookmarkId, { priority: job.priority, + groupId: bookmarkData.userId, }); } diff --git a/apps/workers/workers/inference/tagging.ts b/apps/workers/workers/inference/tagging.ts index 789a30b4..50a03b00 100644 --- a/apps/workers/workers/inference/tagging.ts +++ b/apps/workers/workers/inference/tagging.ts @@ -460,6 +460,7 @@ export async function runTagging( // Propagate priority to child jobs const enqueueOpts: EnqueueOptions = { priority: job.priority, + groupId: bookmark.userId, }; // Trigger a webhook diff --git a/packages/trpc/lib/__tests__/ruleEngine.test.ts b/packages/trpc/lib/__tests__/ruleEngine.test.ts index a108ede7..ede22ec6 100644 --- a/packages/trpc/lib/__tests__/ruleEngine.test.ts +++ b/packages/trpc/lib/__tests__/ruleEngine.test.ts @@ -528,11 +528,16 @@ describe("RuleEngine", () => { const action: RuleEngineAction = { type: "downloadFullPageArchive" }; const result = await engine.executeAction(action); expect(result).toBe(`Enqueued full page archive`); - expect(LinkCrawlerQueue.enqueue).toHaveBeenCalledWith({ - bookmarkId: bookmarkId, - archiveFullPage: true, - runInference: false, - }); + expect(LinkCrawlerQueue.enqueue).toHaveBeenCalledWith( + { + bookmarkId: bookmarkId, + archiveFullPage: true, + runInference: false, + }, + { + groupId: userId, + }, + ); }); it("should execute favouriteBookmark action", async () => { diff --git a/packages/trpc/lib/ruleEngine.ts b/packages/trpc/lib/ruleEngine.ts index 2d5deae6..c191619b 100644 --- a/packages/trpc/lib/ruleEngine.ts +++ b/packages/trpc/lib/ruleEngine.ts @@ -189,11 +189,16 @@ export class RuleEngine { return `Removed from list ${action.listId}`; } case "downloadFullPageArchive": { - await LinkCrawlerQueue.enqueue({ - bookmarkId: this.bookmark.id, - archiveFullPage: true, - runInference: false, - }); + await LinkCrawlerQueue.enqueue( + { + bookmarkId: this.bookmark.id, + archiveFullPage: true, + runInference: false, + }, + { + groupId: this.bookmark.userId, + }, + ); return `Enqueued full page archive`; } case "favouriteBookmark": { diff --git a/packages/trpc/models/bookmarks.ts b/packages/trpc/models/bookmarks.ts index bd696ee8..07fa8693 100644 --- a/packages/trpc/models/bookmarks.ts +++ b/packages/trpc/models/bookmarks.ts @@ -779,12 +779,19 @@ export class Bookmark extends BareBookmark { ), ); - await SearchIndexingQueue.enqueue({ - bookmarkId: this.bookmark.id, - type: "delete", - }); + await SearchIndexingQueue.enqueue( + { + bookmarkId: this.bookmark.id, + type: "delete", + }, + { + groupId: this.ctx.user.id, + }, + ); - await triggerWebhook(this.bookmark.id, "deleted", this.ctx.user.id); + await triggerWebhook(this.bookmark.id, "deleted", this.ctx.user.id, { + groupId: this.ctx.user.id, + }); if (deleted.changes > 0) { await this.cleanupAssets(); } diff --git a/packages/trpc/models/tags.ts b/packages/trpc/models/tags.ts index b230b6b4..55532077 100644 --- a/packages/trpc/models/tags.ts +++ b/packages/trpc/models/tags.ts @@ -280,7 +280,11 @@ export class Tag { try { await Promise.all( - affectedBookmarks.map((id) => triggerSearchReindex(id)), + affectedBookmarks.map((id) => + triggerSearchReindex(id, { + groupId: ctx.user.id, + }), + ), ); } catch (e) { console.error("Failed to reindex affected bookmarks", e); @@ -315,7 +319,9 @@ export class Tag { await Promise.all( affectedBookmarks.map(({ bookmarkId }) => - triggerSearchReindex(bookmarkId), + triggerSearchReindex(bookmarkId, { + groupId: this.ctx.user.id, + }), ), ); } @@ -352,7 +358,9 @@ export class Tag { await Promise.all( affectedBookmarks .map((b) => b.bookmarkId) - .map((id) => triggerSearchReindex(id)), + .map((id) => + triggerSearchReindex(id, { groupId: this.ctx.user.id }), + ), ); } catch (e) { console.error("Failed to reindex affected bookmarks", e); diff --git a/packages/trpc/routers/bookmarks.ts b/packages/trpc/routers/bookmarks.ts index 389f026c..05ffa240 100644 --- a/packages/trpc/routers/bookmarks.ts +++ b/packages/trpc/routers/bookmarks.ts @@ -281,6 +281,7 @@ export const bookmarksAppRouter = router({ const enqueueOpts: EnqueueOptions = { // The lower the priority number, the sooner the job will be processed priority: input.crawlPriority === "low" ? 50 : 0, + groupId: ctx.user.id, }; switch (bookmark.content.type) { @@ -487,8 +488,12 @@ export const bookmarksAppRouter = router({ ); } // Trigger re-indexing and webhooks - await triggerSearchReindex(input.bookmarkId); - await triggerWebhook(input.bookmarkId, "edited"); + await triggerSearchReindex(input.bookmarkId, { + groupId: ctx.user.id, + }); + await triggerWebhook(input.bookmarkId, "edited", ctx.user.id, { + groupId: ctx.user.id, + }); return updatedBookmark; }), @@ -527,8 +532,12 @@ export const bookmarksAppRouter = router({ ), ); }); - await triggerSearchReindex(input.bookmarkId); - await triggerWebhook(input.bookmarkId, "edited"); + await triggerSearchReindex(input.bookmarkId, { + groupId: ctx.user.id, + }); + await triggerWebhook(input.bookmarkId, "edited", ctx.user.id, { + groupId: ctx.user.id, + }); }), deleteBookmark: authedProcedure @@ -561,10 +570,15 @@ export const bookmarksAppRouter = router({ crawlStatusCode: null, }) .where(eq(bookmarkLinks.id, input.bookmarkId)); - await LinkCrawlerQueue.enqueue({ - bookmarkId: input.bookmarkId, - archiveFullPage: input.archiveFullPage, - }); + await LinkCrawlerQueue.enqueue( + { + bookmarkId: input.bookmarkId, + archiveFullPage: input.archiveFullPage, + }, + { + groupId: ctx.user.id, + }, + ); }), getBookmark: authedProcedure .input( @@ -818,8 +832,12 @@ export const bookmarksAppRouter = router({ tagId: t, })), ]); - await triggerSearchReindex(input.bookmarkId); - await triggerWebhook(input.bookmarkId, "edited"); + await triggerSearchReindex(input.bookmarkId, { + groupId: ctx.user.id, + }); + await triggerWebhook(input.bookmarkId, "edited", ctx.user.id, { + groupId: ctx.user.id, + }); return { bookmarkId: input.bookmarkId, attached: allIds, @@ -959,8 +977,12 @@ Author: ${bookmark.author ?? ""} summary: summary.response, }) .where(eq(bookmarks.id, input.bookmarkId)); - await triggerSearchReindex(input.bookmarkId); - await triggerWebhook(input.bookmarkId, "edited"); + await triggerSearchReindex(input.bookmarkId, { + groupId: ctx.user.id, + }); + await triggerWebhook(input.bookmarkId, "edited", ctx.user.id, { + groupId: ctx.user.id, + }); return { bookmarkId: input.bookmarkId, diff --git a/packages/trpc/routers/feeds.ts b/packages/trpc/routers/feeds.ts index 57c88084..8591c98c 100644 --- a/packages/trpc/routers/feeds.ts +++ b/packages/trpc/routers/feeds.ts @@ -57,8 +57,13 @@ export const feedsAppRouter = router({ .input(z.object({ feedId: z.string() })) .mutation(async ({ input, ctx }) => { await Feed.fromId(ctx, input.feedId); - await FeedQueue.enqueue({ - feedId: input.feedId, - }); + await FeedQueue.enqueue( + { + feedId: input.feedId, + }, + { + groupId: ctx.user.id, + }, + ); }), }); |
