| author | 玄猫 <hanguofeng@gmail.com> | 2025-01-19 20:34:42 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-01-19 12:34:42 +0000 |
| commit | b9cce5d12baa40deb21ab4f36be19e3a41e18ad4 | |
| tree | f4ec46c026c35ec2c70393b92f52b87e468746ef /apps | |
| parent | b323573047dee4c358f513fbb9b50174e9e98a99 | |
feat(webhook): Implement webhook functionality for bookmark events (#852)
* feat(webhook): Implement webhook functionality for bookmark events
- Added WebhookWorker to handle webhook requests.
- Integrated webhook triggering in crawlerWorker after video processing.
- Updated main worker initialization to include WebhookWorker.
- Enhanced configuration to support webhook URLs, token, and timeout.
- Documented webhook configuration options in the documentation.
- Introduced zWebhookRequestSchema for validating webhook requests (a sketch of the schema follows the commit message).
* feat(webhook): Update webhook handling and configuration
- Changed webhook operation type from "create" to "crawled" in crawlerWorker and documentation.
- Enhanced webhook retry logic in WebhookWorker to support multiple attempts.
- Updated Docker configuration to include new webhook environment variables (a hypothetical example follows the diffstat).
- Improved validation for webhook configuration in shared config.
- Adjusted zWebhookRequestSchema to reflect the new operation type.
- Updated documentation to clarify webhook configuration options and usage.
* minor modifications
---------
Co-authored-by: Mohamed Bassem <me@mbassem.com>
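
The diffstat below is limited to apps/, so the matching additions in @hoarder/shared/queues (WebhookQueue, triggerWebhookWorker, zWebhookRequestSchema) do not appear in this commit view. A minimal sketch of what they plausibly look like, inferred from the imports and call sites in the diff; the queue name, the queueDB import path, and the job options are assumptions modeled on the package's existing queues:

```ts
// Hypothetical reconstruction of the queue-side additions in
// @hoarder/shared/queues; this commit view only covers apps/, so these
// definitions are inferred from the imports the diff uses.
import { SqliteQueue } from "liteque";
import { z } from "zod";

import { queueDB } from "./queueing"; // assumed internal DB handle

export const zWebhookRequestSchema = z.object({
  bookmarkId: z.string(),
  // Per the commit message, the operation type was renamed "create" -> "crawled".
  operation: z.enum(["crawled"]),
});
export type ZWebhookRequest = z.infer<typeof zWebhookRequestSchema>;

export const WebhookQueue = new SqliteQueue<ZWebhookRequest>(
  "webhook_queue", // assumed queue name
  queueDB,
  { defaultJobArgs: { numRetries: 3 } }, // assumed options
);

// Called from crawlerWorker.ts after a page is crawled (see the diff below).
export async function triggerWebhookWorker(
  bookmarkId: string,
  operation: ZWebhookRequest["operation"],
) {
  await WebhookQueue.enqueue({ bookmarkId, operation });
}
```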
Diffstat (limited to 'apps')
| -rw-r--r-- | apps/workers/crawlerWorker.ts | 4 |
|---|---|---|
| -rw-r--r-- | apps/workers/index.ts | 34 |
| -rw-r--r-- | apps/workers/webhookWorker.ts | 136 |
3 files changed, 163 insertions, 11 deletions
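
The worker reads serverConfig.webhook.urls, .token, .timeoutSec, and .retryTimes (see webhookWorker.ts below). As a rough guide to the Docker environment variables mentioned in the commit message, a hypothetical .env excerpt with names inferred from those config keys:

```
# Hypothetical .env excerpt; variable names inferred from the
# serverConfig.webhook keys, not copied from the shipped docs.
WEBHOOK_URLS="https://example.com/hooks/karakeep"  # one or more target URLs
WEBHOOK_TOKEN="some-secret"       # sent as "Authorization: Bearer <token>" when set
WEBHOOK_TIMEOUT_SEC=5             # per-request timeout, in seconds
WEBHOOK_RETRY_TIMES=3             # max attempts per URL
```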
```diff
diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts
index 16b1f4ae..9666299d 100644
--- a/apps/workers/crawlerWorker.ts
+++ b/apps/workers/crawlerWorker.ts
@@ -55,6 +55,7 @@ import {
   OpenAIQueue,
   triggerSearchReindex,
   triggerVideoWorker,
+  triggerWebhookWorker,
   zCrawlLinkRequestSchema,
 } from "@hoarder/shared/queues";
 import { BookmarkTypes } from "@hoarder/shared/types/bookmarks";
@@ -770,6 +771,9 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
   // Trigger a potential download of a video from the URL
   await triggerVideoWorker(bookmarkId, url);
 
+  // Trigger a webhook
+  await triggerWebhookWorker(bookmarkId, "crawled");
+
   // Do the archival as a separate last step as it has the potential for failure
   await archivalLogic();
 }
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index c2d3f28a..3997b423 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -13,21 +13,31 @@ import { shutdownPromise } from "./exit";
 import { OpenAiWorker } from "./openaiWorker";
 import { SearchIndexingWorker } from "./searchWorker";
 import { VideoWorker } from "./videoWorker";
+import { WebhookWorker } from "./webhookWorker";
 
 async function main() {
   logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
   runQueueDBMigrations();
-  const [crawler, openai, search, tidyAssets, video, feed, assetPreprocessing] =
-    [
-      await CrawlerWorker.build(),
-      OpenAiWorker.build(),
-      SearchIndexingWorker.build(),
-      TidyAssetsWorker.build(),
-      VideoWorker.build(),
-      FeedWorker.build(),
-      AssetPreprocessingWorker.build(),
-    ];
+  const [
+    crawler,
+    openai,
+    search,
+    tidyAssets,
+    video,
+    feed,
+    assetPreprocessing,
+    webhook,
+  ] = [
+    await CrawlerWorker.build(),
+    OpenAiWorker.build(),
+    SearchIndexingWorker.build(),
+    TidyAssetsWorker.build(),
+    VideoWorker.build(),
+    FeedWorker.build(),
+    AssetPreprocessingWorker.build(),
+    WebhookWorker.build(),
+  ];
 
   FeedRefreshingWorker.start();
 
   await Promise.any([
@@ -39,11 +49,12 @@ async function main() {
       video.run(),
       feed.run(),
       assetPreprocessing.run(),
+      webhook.run(),
     ]),
     shutdownPromise,
   ]);
 
   logger.info(
-    "Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing and search workers ...",
+    "Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing, webhook and search workers ...",
   );
   FeedRefreshingWorker.stop();
@@ -54,6 +65,7 @@ async function main() {
   video.stop();
   feed.stop();
   assetPreprocessing.stop();
+  webhook.stop();
 }
 
 main();
diff --git a/apps/workers/webhookWorker.ts b/apps/workers/webhookWorker.ts
new file mode 100644
index 00000000..5124f8a4
--- /dev/null
+++ b/apps/workers/webhookWorker.ts
@@ -0,0 +1,136 @@
+import { eq } from "drizzle-orm";
+import { DequeuedJob, Runner } from "liteque";
+import fetch from "node-fetch";
+
+import { db } from "@hoarder/db";
+import { bookmarks } from "@hoarder/db/schema";
+import serverConfig from "@hoarder/shared/config";
+import logger from "@hoarder/shared/logger";
+import {
+  WebhookQueue,
+  ZWebhookRequest,
+  zWebhookRequestSchema,
+} from "@hoarder/shared/queues";
+
+export class WebhookWorker {
+  static build() {
+    logger.info("Starting webhook worker ...");
+    const worker = new Runner<ZWebhookRequest>(
+      WebhookQueue,
+      {
+        run: runWebhook,
+        onComplete: async (job) => {
+          const jobId = job.id;
+          logger.info(`[webhook][${jobId}] Completed successfully`);
+          return Promise.resolve();
+        },
+        onError: async (job) => {
+          const jobId = job.id;
+          logger.error(
+            `[webhook][${jobId}] webhook job failed: ${job.error}\n${job.error.stack}`,
+          );
+          return Promise.resolve();
+        },
+      },
+      {
+        concurrency: 1,
+        pollIntervalMs: 1000,
+        timeoutSecs:
+          serverConfig.webhook.timeoutSec *
+            (serverConfig.webhook.retryTimes + 1) +
+          1, // consider retry times and timeout, and add 1 second for other stuff
+        validator: zWebhookRequestSchema,
+      },
+    );
+
+    return worker;
+  }
+}
+
+async function fetchBookmark(linkId: string) {
+  return await db.query.bookmarks.findFirst({
+    where: eq(bookmarks.id, linkId),
+    with: {
+      link: true,
+      text: true,
+      asset: true,
+    },
+  });
+}
+
+async function runWebhook(job: DequeuedJob<ZWebhookRequest>) {
+  const jobId = job.id;
+  const webhookUrls = serverConfig.webhook.urls;
+  if (!webhookUrls) {
+    logger.info(
+      `[webhook][${jobId}] No webhook urls configured. Skipping webhook job.`,
+    );
+    return;
+  }
+  const webhookToken = serverConfig.webhook.token;
+  const webhookTimeoutSec = serverConfig.webhook.timeoutSec;
+
+  const { bookmarkId } = job.data;
+  const bookmark = await fetchBookmark(bookmarkId);
+  if (!bookmark) {
+    throw new Error(
+      `[webhook][${jobId}] bookmark with id ${bookmarkId} was not found`,
+    );
+  }
+
+  logger.info(
+    `[webhook][${jobId}] Starting a webhook job for bookmark with id "${bookmark.id}"`,
+  );
+
+  await Promise.allSettled(
+    webhookUrls.map(async (url) => {
+      const maxRetries = serverConfig.webhook.retryTimes;
+      let attempt = 0;
+      let success = false;
+
+      while (attempt < maxRetries && !success) {
+        try {
+          const response = await fetch(url, {
+            method: "POST",
+            headers: {
+              "Content-Type": "application/json",
+              ...(webhookToken
+                ? {
+                    Authorization: `Bearer ${webhookToken}`,
+                  }
+                : {}),
+            },
+            body: JSON.stringify({
+              jobId,
+              bookmarkId,
+              userId: bookmark.userId,
+              url: bookmark.link ? bookmark.link.url : undefined,
+              type: bookmark.type,
+              operation: job.data.operation,
+            }),
+            signal: AbortSignal.timeout(webhookTimeoutSec * 1000),
+          });
+
+          if (!response.ok) {
+            logger.error(
+              `Webhook call to ${url} failed with status: ${response.status}`,
+            );
+          } else {
+            logger.info(`[webhook][${jobId}] Webhook to ${url} call succeeded`);
+            success = true;
+          }
+        } catch (error) {
+          logger.error(
+            `[webhook][${jobId}] Webhook to ${url} call failed: ${error}`,
+          );
+        }
+        attempt++;
+        if (!success && attempt < maxRetries) {
+          logger.info(
+            `[webhook][${jobId}] Retrying webhook call to ${url}, attempt ${attempt + 1}`,
+          );
+        }
+      }
+    }),
+  );
+}
```
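
On the receiving side, each configured URL gets a JSON POST whose body mirrors the JSON.stringify call above (jobId, bookmarkId, userId, url, type, operation), with an optional bearer token. A minimal receiver sketch using Node's built-in http module; the port and expected token are placeholders:

```ts
// Minimal webhook receiver sketch; the port and expected token are
// placeholders, and the payload fields mirror the JSON.stringify call
// in webhookWorker.ts above.
import http from "node:http";

const EXPECTED_TOKEN = "some-secret"; // must match WEBHOOK_TOKEN, if configured

http
  .createServer((req, res) => {
    if (req.method !== "POST") {
      res.writeHead(405).end();
      return;
    }
    // Reject requests that don't carry the shared bearer token.
    if (req.headers.authorization !== `Bearer ${EXPECTED_TOKEN}`) {
      res.writeHead(401).end();
      return;
    }
    let body = "";
    req.on("data", (chunk) => (body += chunk));
    req.on("end", () => {
      const event = JSON.parse(body) as {
        jobId: number | string;
        bookmarkId: string;
        userId: string;
        url?: string;
        type: string;
        operation: string;
      };
      console.log(`bookmark ${event.bookmarkId} was ${event.operation}`);
      // Respond 2xx quickly; the worker logs non-ok statuses as failures
      // and retries up to the configured retry count.
      res.writeHead(200).end("ok");
    });
  })
  .listen(8080);
```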
