diff options
| author | 玄猫 <hanguofeng@gmail.com> | 2025-01-19 20:34:42 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-01-19 12:34:42 +0000 |
| commit | b9cce5d12baa40deb21ab4f36be19e3a41e18ad4 (patch) | |
| tree | f4ec46c026c35ec2c70393b92f52b87e468746ef /apps/workers/webhookWorker.ts | |
| parent | b323573047dee4c358f513fbb9b50174e9e98a99 (diff) | |
| download | karakeep-b9cce5d12baa40deb21ab4f36be19e3a41e18ad4.tar.zst | |
feat(webhook): Implement webhook functionality for bookmark events (#852)
* feat(webhook): Implement webhook functionality for bookmark events
- Added WebhookWorker to handle webhook requests.
- Integrated webhook triggering in crawlerWorker after video processing.
- Updated main worker initialization to include WebhookWorker.
- Enhanced configuration to support webhook URLs, token, and timeout.
- Documented webhook configuration options in the documentation.
- Introduced zWebhookRequestSchema for validating webhook requests.
* feat(webhook): Update webhook handling and configuration
- Changed webhook operation type from "create" to "crawled" in crawlerWorker and documentation.
- Enhanced webhook retry logic in WebhookWorker to support multiple attempts.
- Updated Docker configuration to include new webhook environment variables.
- Improved validation for webhook configuration in shared config.
- Adjusted zWebhookRequestSchema to reflect the new operation type.
- Updated documentation to clarify webhook configuration options and usage.
* minor modifications
---------
Co-authored-by: Mohamed Bassem <me@mbassem.com>
Diffstat (limited to 'apps/workers/webhookWorker.ts')
| -rw-r--r-- | apps/workers/webhookWorker.ts | 136 |
1 files changed, 136 insertions, 0 deletions
diff --git a/apps/workers/webhookWorker.ts b/apps/workers/webhookWorker.ts new file mode 100644 index 00000000..5124f8a4 --- /dev/null +++ b/apps/workers/webhookWorker.ts @@ -0,0 +1,136 @@ +import { eq } from "drizzle-orm"; +import { DequeuedJob, Runner } from "liteque"; +import fetch from "node-fetch"; + +import { db } from "@hoarder/db"; +import { bookmarks } from "@hoarder/db/schema"; +import serverConfig from "@hoarder/shared/config"; +import logger from "@hoarder/shared/logger"; +import { + WebhookQueue, + ZWebhookRequest, + zWebhookRequestSchema, +} from "@hoarder/shared/queues"; + +export class WebhookWorker { + static build() { + logger.info("Starting webhook worker ..."); + const worker = new Runner<ZWebhookRequest>( + WebhookQueue, + { + run: runWebhook, + onComplete: async (job) => { + const jobId = job.id; + logger.info(`[webhook][${jobId}] Completed successfully`); + return Promise.resolve(); + }, + onError: async (job) => { + const jobId = job.id; + logger.error( + `[webhook][${jobId}] webhook job failed: ${job.error}\n${job.error.stack}`, + ); + return Promise.resolve(); + }, + }, + { + concurrency: 1, + pollIntervalMs: 1000, + timeoutSecs: + serverConfig.webhook.timeoutSec * + (serverConfig.webhook.retryTimes + 1) + + 1, //consider retry times, and timeout and add 1 second for other stuff + validator: zWebhookRequestSchema, + }, + ); + + return worker; + } +} + +async function fetchBookmark(linkId: string) { + return await db.query.bookmarks.findFirst({ + where: eq(bookmarks.id, linkId), + with: { + link: true, + text: true, + asset: true, + }, + }); +} + +async function runWebhook(job: DequeuedJob<ZWebhookRequest>) { + const jobId = job.id; + const webhookUrls = serverConfig.webhook.urls; + if (!webhookUrls) { + logger.info( + `[webhook][${jobId}] No webhook urls configured. Skipping webhook job.`, + ); + return; + } + const webhookToken = serverConfig.webhook.token; + const webhookTimeoutSec = serverConfig.webhook.timeoutSec; + + const { bookmarkId } = job.data; + const bookmark = await fetchBookmark(bookmarkId); + if (!bookmark) { + throw new Error( + `[webhook][${jobId}] bookmark with id ${bookmarkId} was not found`, + ); + } + + logger.info( + `[webhook][${jobId}] Starting a webhook job for bookmark with id "${bookmark.id}"`, + ); + + await Promise.allSettled( + webhookUrls.map(async (url) => { + const maxRetries = serverConfig.webhook.retryTimes; + let attempt = 0; + let success = false; + + while (attempt < maxRetries && !success) { + try { + const response = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/json", + ...(webhookToken + ? { + Authorization: `Bearer ${webhookToken}`, + } + : {}), + }, + body: JSON.stringify({ + jobId, + bookmarkId, + userId: bookmark.userId, + url: bookmark.link ? bookmark.link.url : undefined, + type: bookmark.type, + operation: job.data.operation, + }), + signal: AbortSignal.timeout(webhookTimeoutSec * 1000), + }); + + if (!response.ok) { + logger.error( + `Webhook call to ${url} failed with status: ${response.status}`, + ); + } else { + logger.info(`[webhook][${jobId}] Webhook to ${url} call succeeded`); + success = true; + } + } catch (error) { + logger.error( + `[webhook][${jobId}] Webhook to ${url} call failed: ${error}`, + ); + } + attempt++; + if (!success && attempt < maxRetries) { + logger.info( + `[webhook][${jobId}] Retrying webhook call to ${url}, attempt ${attempt + 1}`, + ); + } + } + }), + ); +} |
