| field | value | date |
|---|---|---|
| author | 玄猫 <hanguofeng@gmail.com> | 2025-01-19 20:34:42 +0800 |
| committer | GitHub <noreply@github.com> | 2025-01-19 12:34:42 +0000 |
| commit | b9cce5d12baa40deb21ab4f36be19e3a41e18ad4 (patch) | |
| tree | f4ec46c026c35ec2c70393b92f52b87e468746ef | |
| parent | b323573047dee4c358f513fbb9b50174e9e98a99 (diff) | |
| download | karakeep-b9cce5d12baa40deb21ab4f36be19e3a41e18ad4.tar.zst | |
feat(webhook): Implement webhook functionality for bookmark events (#852)
* feat(webhook): Implement webhook functionality for bookmark events
- Added WebhookWorker to handle webhook requests.
- Integrated webhook triggering in crawlerWorker after video processing.
- Updated main worker initialization to include WebhookWorker.
- Enhanced configuration to support webhook URLs, token, and timeout.
- Documented webhook configuration options in the documentation.
- Introduced zWebhookRequestSchema for validating webhook requests.
* feat(webhook): Update webhook handling and configuration
- Changed webhook operation type from "create" to "crawled" in crawlerWorker and documentation.
- Enhanced webhook retry logic in WebhookWorker to support multiple attempts.
- Updated Docker configuration to include new webhook environment variables.
- Improved validation for webhook configuration in shared config.
- Adjusted zWebhookRequestSchema to reflect the new operation type.
- Updated documentation to clarify webhook configuration options and usage.
* minor modifications
---------
Co-authored-by: Mohamed Bassem <me@mbassem.com>
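As a quick illustration of the flow described in the commit message, the sketch below shows how a worker enqueues a webhook event through the new `triggerWebhookWorker` helper exported from `@hoarder/shared/queues` (this mirrors the call added to `crawlerWorker`; the `onBookmarkCrawled` wrapper is a hypothetical name used only for illustration):

```ts
import { triggerWebhookWorker } from "@hoarder/shared/queues";

// Hypothetical call site: once a bookmark has been crawled, enqueue a
// "crawled" webhook job. The WebhookWorker drains the queue and POSTs the
// payload to every URL configured in WEBHOOK_URLS, attempting each URL up to
// WEBHOOK_RETRY_TIMES times.
async function onBookmarkCrawled(bookmarkId: string) {
  await triggerWebhookWorker(bookmarkId, "crawled");
}
```

With the default settings (WEBHOOK_TIMEOUT_SEC=5, WEBHOOK_RETRY_TIMES=3), the worker's own job timeout works out to 5 × (3 + 1) + 1 = 21 seconds, so the per-request retries cannot outlive the job.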
| -rw-r--r-- | apps/workers/crawlerWorker.ts | 4 |
| -rw-r--r-- | apps/workers/index.ts | 34 |
| -rw-r--r-- | apps/workers/webhookWorker.ts | 136 |
| -rw-r--r-- | docs/docs/03-configuration.md | 33 |
| -rw-r--r-- | packages/shared/config.ts | 14 |
| -rw-r--r-- | packages/shared/queues.ts | 27 |
6 files changed, 237 insertions, 11 deletions
diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts
index 16b1f4ae..9666299d 100644
--- a/apps/workers/crawlerWorker.ts
+++ b/apps/workers/crawlerWorker.ts
@@ -55,6 +55,7 @@ import {
   OpenAIQueue,
   triggerSearchReindex,
   triggerVideoWorker,
+  triggerWebhookWorker,
   zCrawlLinkRequestSchema,
 } from "@hoarder/shared/queues";
 import { BookmarkTypes } from "@hoarder/shared/types/bookmarks";
@@ -770,6 +771,9 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
   // Trigger a potential download of a video from the URL
   await triggerVideoWorker(bookmarkId, url);
 
+  // Trigger a webhook
+  await triggerWebhookWorker(bookmarkId, "crawled");
+
   // Do the archival as a separate last step as it has the potential for failure
   await archivalLogic();
 }
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index c2d3f28a..3997b423 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -13,21 +13,31 @@ import { shutdownPromise } from "./exit";
 import { OpenAiWorker } from "./openaiWorker";
 import { SearchIndexingWorker } from "./searchWorker";
 import { VideoWorker } from "./videoWorker";
+import { WebhookWorker } from "./webhookWorker";
 
 async function main() {
   logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
   runQueueDBMigrations();
 
-  const [crawler, openai, search, tidyAssets, video, feed, assetPreprocessing] =
-    [
-      await CrawlerWorker.build(),
-      OpenAiWorker.build(),
-      SearchIndexingWorker.build(),
-      TidyAssetsWorker.build(),
-      VideoWorker.build(),
-      FeedWorker.build(),
-      AssetPreprocessingWorker.build(),
-    ];
+  const [
+    crawler,
+    openai,
+    search,
+    tidyAssets,
+    video,
+    feed,
+    assetPreprocessing,
+    webhook,
+  ] = [
+    await CrawlerWorker.build(),
+    OpenAiWorker.build(),
+    SearchIndexingWorker.build(),
+    TidyAssetsWorker.build(),
+    VideoWorker.build(),
+    FeedWorker.build(),
+    AssetPreprocessingWorker.build(),
+    WebhookWorker.build(),
+  ];
 
   FeedRefreshingWorker.start();
   await Promise.any([
@@ -39,11 +49,12 @@ async function main() {
       video.run(),
       feed.run(),
       assetPreprocessing.run(),
+      webhook.run(),
     ]),
     shutdownPromise,
   ]);
 
   logger.info(
-    "Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing and search workers ...",
+    "Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing, webhook and search workers ...",
   );
   FeedRefreshingWorker.stop();
@@ -54,6 +65,7 @@ async function main() {
   video.stop();
   feed.stop();
   assetPreprocessing.stop();
+  webhook.stop();
 }
 
 main();
diff --git a/apps/workers/webhookWorker.ts b/apps/workers/webhookWorker.ts
new file mode 100644
index 00000000..5124f8a4
--- /dev/null
+++ b/apps/workers/webhookWorker.ts
@@ -0,0 +1,136 @@
+import { eq } from "drizzle-orm";
+import { DequeuedJob, Runner } from "liteque";
+import fetch from "node-fetch";
+
+import { db } from "@hoarder/db";
+import { bookmarks } from "@hoarder/db/schema";
+import serverConfig from "@hoarder/shared/config";
+import logger from "@hoarder/shared/logger";
+import {
+  WebhookQueue,
+  ZWebhookRequest,
+  zWebhookRequestSchema,
+} from "@hoarder/shared/queues";
+
+export class WebhookWorker {
+  static build() {
+    logger.info("Starting webhook worker ...");
+    const worker = new Runner<ZWebhookRequest>(
+      WebhookQueue,
+      {
+        run: runWebhook,
+        onComplete: async (job) => {
+          const jobId = job.id;
+          logger.info(`[webhook][${jobId}] Completed successfully`);
+          return Promise.resolve();
+        },
+        onError: async (job) => {
+          const jobId = job.id;
+          logger.error(
+            `[webhook][${jobId}] webhook job failed: ${job.error}\n${job.error.stack}`,
+          );
+          return Promise.resolve();
+        },
+      },
+      {
+        concurrency: 1,
+        pollIntervalMs: 1000,
+        timeoutSecs:
+          serverConfig.webhook.timeoutSec *
+            (serverConfig.webhook.retryTimes + 1) +
+          1, //consider retry times, and timeout and add 1 second for other stuff
+        validator: zWebhookRequestSchema,
+      },
+    );
+
+    return worker;
+  }
+}
+
+async function fetchBookmark(linkId: string) {
+  return await db.query.bookmarks.findFirst({
+    where: eq(bookmarks.id, linkId),
+    with: {
+      link: true,
+      text: true,
+      asset: true,
+    },
+  });
+}
+
+async function runWebhook(job: DequeuedJob<ZWebhookRequest>) {
+  const jobId = job.id;
+  const webhookUrls = serverConfig.webhook.urls;
+  if (!webhookUrls) {
+    logger.info(
+      `[webhook][${jobId}] No webhook urls configured. Skipping webhook job.`,
+    );
+    return;
+  }
+  const webhookToken = serverConfig.webhook.token;
+  const webhookTimeoutSec = serverConfig.webhook.timeoutSec;
+
+  const { bookmarkId } = job.data;
+  const bookmark = await fetchBookmark(bookmarkId);
+  if (!bookmark) {
+    throw new Error(
+      `[webhook][${jobId}] bookmark with id ${bookmarkId} was not found`,
+    );
+  }
+
+  logger.info(
+    `[webhook][${jobId}] Starting a webhook job for bookmark with id "${bookmark.id}"`,
+  );
+
+  await Promise.allSettled(
+    webhookUrls.map(async (url) => {
+      const maxRetries = serverConfig.webhook.retryTimes;
+      let attempt = 0;
+      let success = false;
+
+      while (attempt < maxRetries && !success) {
+        try {
+          const response = await fetch(url, {
+            method: "POST",
+            headers: {
+              "Content-Type": "application/json",
+              ...(webhookToken
+                ? {
+                    Authorization: `Bearer ${webhookToken}`,
+                  }
+                : {}),
+            },
+            body: JSON.stringify({
+              jobId,
+              bookmarkId,
+              userId: bookmark.userId,
+              url: bookmark.link ? bookmark.link.url : undefined,
+              type: bookmark.type,
+              operation: job.data.operation,
+            }),
+            signal: AbortSignal.timeout(webhookTimeoutSec * 1000),
+          });
+
+          if (!response.ok) {
+            logger.error(
+              `Webhook call to ${url} failed with status: ${response.status}`,
+            );
+          } else {
+            logger.info(`[webhook][${jobId}] Webhook to ${url} call succeeded`);
+            success = true;
+          }
+        } catch (error) {
+          logger.error(
+            `[webhook][${jobId}] Webhook to ${url} call failed: ${error}`,
+          );
+        }
+        attempt++;
+        if (!success && attempt < maxRetries) {
+          logger.info(
+            `[webhook][${jobId}] Retrying webhook call to ${url}, attempt ${attempt + 1}`,
+          );
+        }
+      }
+    }),
+  );
+}
diff --git a/docs/docs/03-configuration.md b/docs/docs/03-configuration.md
index 82438dbf..bf326aff 100644
--- a/docs/docs/03-configuration.md
+++ b/docs/docs/03-configuration.md
@@ -95,3 +95,36 @@ Hoarder uses [tesseract.js](https://github.com/naptha/tesseract.js) to extract t
 | OCR_CACHE_DIR | No | $TEMP_DIR | The dir where tesseract will download its models. By default, those models are not persisted and stored in the OS' temp dir. |
 | OCR_LANGS | No | eng | Comma separated list of the language codes that you want tesseract to support. You can find the language codes [here](https://tesseract-ocr.github.io/tessdoc/Data-Files-in-different-versions.html). Set to empty string to disable OCR. |
 | OCR_CONFIDENCE_THRESHOLD | No | 50 | A number between 0 and 100 indicating the minimum acceptable confidence from tessaract. If tessaract's confidence is lower than this value, extracted text won't be stored. |
+
+## Webhook Configs
+
+You can use webhooks to trigger actions when bookmarks are changed (currently only the _crawled_ event is supported).
+
+| Name | Required | Default | Description |
+| ------------------- | -------- | ------- | ------------------------------------------------------------------------------------------------- |
+| WEBHOOK_URLS | No | | The URLs of the webhooks to trigger, separated by commas. |
+| WEBHOOK_TOKEN | No | | The token to use for authentication. It will appear in the Authorization header as a Bearer token. |
+| WEBHOOK_TIMEOUT_SEC | No | 5 | The timeout for the webhook request in seconds. |
+| WEBHOOK_RETRY_TIMES | No | 3 | The number of times to retry the webhook request. |
+
+:::info
+
+- If a URL is added to Hoarder, the webhook will be triggered after it is crawled.
+- The WEBHOOK_TOKEN is used for authentication. It will appear in the Authorization header as a Bearer token.
+  ```
+  Authorization: Bearer <WEBHOOK_TOKEN>
+  ```
+- The webhook will be triggered with the job id (useful for idempotence), the bookmark id, the bookmark type, the user id, the url, and the operation, in JSON format in the body.
+
+  ```json
+  {
+    "jobId": 123,
+    "type": "link",
+    "bookmarkId": "exampleBookmarkId",
+    "userId": "exampleUserId",
+    "url": "https://example.com",
+    "operation": "crawled"
+  }
+  ```
+
+  :::
diff --git a/packages/shared/config.ts b/packages/shared/config.ts
index 7b74fc21..df9a5764 100644
--- a/packages/shared/config.ts
+++ b/packages/shared/config.ts
@@ -56,6 +56,14 @@ const allEnv = z.object({
   DATA_DIR: z.string().default(""),
   MAX_ASSET_SIZE_MB: z.coerce.number().default(4),
   INFERENCE_LANG: z.string().default("english"),
+  WEBHOOK_URLS: z
+    .string()
+    .transform((val) => val.split(","))
+    .pipe(z.array(z.string().url()))
+    .optional(),
+  WEBHOOK_TOKEN: z.string().optional(),
+  WEBHOOK_TIMEOUT_SEC: z.coerce.number().default(5),
+  WEBHOOK_RETRY_TIMES: z.coerce.number().int().min(0).default(3),
   // Build only flag
   SERVER_VERSION: z.string().optional(),
   DISABLE_NEW_RELEASE_CHECK: stringBool("false"),
@@ -134,6 +142,12 @@ const serverConfigSchema = allEnv.transform((val) => {
     serverVersion: val.SERVER_VERSION,
     disableNewReleaseCheck: val.DISABLE_NEW_RELEASE_CHECK,
     usingLegacySeparateContainers: val.USING_LEGACY_SEPARATE_CONTAINERS,
+    webhook: {
+      urls: val.WEBHOOK_URLS,
+      token: val.WEBHOOK_TOKEN,
+      timeoutSec: val.WEBHOOK_TIMEOUT_SEC,
+      retryTimes: val.WEBHOOK_RETRY_TIMES,
+    },
   };
 });
 
diff --git a/packages/shared/queues.ts b/packages/shared/queues.ts
index 7afb8774..b0002a29 100644
--- a/packages/shared/queues.ts
+++ b/packages/shared/queues.ts
@@ -158,3 +158,30 @@ export const AssetPreprocessingQueue =
     keepFailedJobs: false,
   },
 );
+
+//Webhook worker
+export const zWebhookRequestSchema = z.object({
+  bookmarkId: z.string(),
+  operation: z.enum(["crawled"]),
+});
+export type ZWebhookRequest = z.infer<typeof zWebhookRequestSchema>;
+export const WebhookQueue = new SqliteQueue<ZWebhookRequest>(
+  "webhook_queue",
+  queueDB,
+  {
+    defaultJobArgs: {
+      numRetries: 3,
+    },
+    keepFailedJobs: false,
+  },
+);
+
+export async function triggerWebhookWorker(
+  bookmarkId: string,
+  operation: "crawled",
+) {
+  await WebhookQueue.enqueue({
+    bookmarkId,
+    operation,
+  });
+}
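The documentation added above describes the request from the sender's side. For completeness, here is a minimal sketch of a receiving endpoint, assuming an Express-based consumer; the route path, port, and the receiver-side `WEBHOOK_TOKEN` environment variable are illustrative choices, while the Authorization header and payload fields follow the documentation in this commit. A deployment would then point Hoarder at this endpoint with something like `WEBHOOK_URLS=https://example.com/hoarder-webhook` and a matching `WEBHOOK_TOKEN` (illustrative values).

```ts
import express from "express";

const app = express();
app.use(express.json());

// Hypothetical consumer endpoint. Its URL would be listed in WEBHOOK_URLS,
// and WEBHOOK_TOKEN would be set to the same secret on the Hoarder side.
app.post("/hoarder-webhook", (req, res) => {
  // Verify the Bearer token documented above.
  if (req.header("authorization") !== `Bearer ${process.env.WEBHOOK_TOKEN}`) {
    res.status(401).end();
    return;
  }

  // Payload fields as documented: jobId, bookmarkId, userId, url, type, operation.
  const { jobId, bookmarkId, operation, url } = req.body as {
    jobId: number;
    bookmarkId: string;
    userId: string;
    url?: string;
    type: string;
    operation: "crawled";
  };

  // The job id can be used for idempotence: skip jobs that were already processed.
  console.log(
    `${operation} webhook for bookmark ${bookmarkId} (job ${jobId}, url: ${url ?? "n/a"})`,
  );
  res.status(200).end();
});

app.listen(3100);
```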
