From 04f93941ca4d10f09befb4d1b219899ecbe792b1 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sat, 21 Jun 2025 14:34:09 +0000 Subject: fix: Fix webhook not firing on deletion. Fixes #1613 --- apps/workers/workers/webhookWorker.ts | 37 ++++++++++++++++++----------------- packages/shared/queues.ts | 3 +++ packages/trpc/routers/bookmarks.ts | 2 +- 3 files changed, 23 insertions(+), 19 deletions(-) diff --git a/apps/workers/workers/webhookWorker.ts b/apps/workers/workers/webhookWorker.ts index 9d3ed2c1..f42266dd 100644 --- a/apps/workers/workers/webhookWorker.ts +++ b/apps/workers/workers/webhookWorker.ts @@ -3,7 +3,7 @@ import { DequeuedJob, Runner } from "liteque"; import fetch from "node-fetch"; import { db } from "@karakeep/db"; -import { bookmarks } from "@karakeep/db/schema"; +import { bookmarks, webhooksTable } from "@karakeep/db/schema"; import serverConfig from "@karakeep/shared/config"; import logger from "@karakeep/shared/logger"; import { @@ -56,38 +56,39 @@ async function fetchBookmark(bookmarkId: string) { url: true, }, }, - user: { - columns: {}, - with: { - webhooks: true, - }, - }, }, }); } +async function fetchUserWebhooks(userId: string) { + return await db.query.webhooksTable.findMany({ + where: eq(webhooksTable.userId, userId), + }); +} + async function runWebhook(job: DequeuedJob) { const jobId = job.id; const webhookTimeoutSec = serverConfig.webhook.timeoutSec; const { bookmarkId } = job.data; const bookmark = await fetchBookmark(bookmarkId); - if (!bookmark) { - throw new Error( - `[webhook][${jobId}] bookmark with id ${bookmarkId} was not found`, - ); - } - if (!bookmark.user.webhooks) { + const userId = job.data.userId ?? bookmark?.userId; + if (!userId) { + logger.error( + `[webhook][${jobId}] Failed to find user for bookmark with id ${bookmarkId}. Skipping webhook`, + ); return; } + const webhooks = await fetchUserWebhooks(userId); + logger.info( - `[webhook][${jobId}] Starting a webhook job for bookmark with id "${bookmark.id} for operation "${job.data.operation}"`, + `[webhook][${jobId}] Starting a webhook job for bookmark with id "${bookmarkId} for operation "${job.data.operation}"`, ); await Promise.allSettled( - bookmark.user.webhooks + webhooks .filter((w) => w.events.includes(job.data.operation)) .map(async (webhook) => { const url = webhook.url; @@ -111,9 +112,9 @@ async function runWebhook(job: DequeuedJob) { body: JSON.stringify({ jobId, bookmarkId, - userId: bookmark.userId, - url: bookmark.link ? bookmark.link.url : undefined, - type: bookmark.type, + userId, + url: bookmark?.link ? bookmark.link.url : undefined, + type: bookmark?.type, operation: job.data.operation, }), signal: AbortSignal.timeout(webhookTimeoutSec * 1000), diff --git a/packages/shared/queues.ts b/packages/shared/queues.ts index a434414c..a2fdc6b4 100644 --- a/packages/shared/queues.ts +++ b/packages/shared/queues.ts @@ -173,6 +173,7 @@ export const AssetPreprocessingQueue = export const zWebhookRequestSchema = z.object({ bookmarkId: z.string(), operation: z.enum(["crawled", "created", "edited", "ai tagged", "deleted"]), + userId: z.string().optional(), }); export type ZWebhookRequest = z.infer; export const WebhookQueue = new SqliteQueue( @@ -189,9 +190,11 @@ export const WebhookQueue = new SqliteQueue( export async function triggerWebhook( bookmarkId: string, operation: ZWebhookRequest["operation"], + userId?: string, ) { await WebhookQueue.enqueue({ bookmarkId, + userId, operation, }); } diff --git a/packages/trpc/routers/bookmarks.ts b/packages/trpc/routers/bookmarks.ts index 04d15d1f..815cf90d 100644 --- a/packages/trpc/routers/bookmarks.ts +++ b/packages/trpc/routers/bookmarks.ts @@ -646,7 +646,7 @@ export const bookmarksAppRouter = router({ ), ); await triggerSearchDeletion(input.bookmarkId); - await triggerWebhook(input.bookmarkId, "deleted"); + await triggerWebhook(input.bookmarkId, "deleted", ctx.user.id); if (deleted.changes > 0 && bookmark) { await cleanupAssetForBookmark({ asset: bookmark.asset, -- cgit v1.2.3-70-g09d2