author    玄猫 <hanguofeng@gmail.com>    2025-01-19 20:34:42 +0800
committer GitHub <noreply@github.com>   2025-01-19 12:34:42 +0000
commit    b9cce5d12baa40deb21ab4f36be19e3a41e18ad4 (patch)
tree      f4ec46c026c35ec2c70393b92f52b87e468746ef
parent    b323573047dee4c358f513fbb9b50174e9e98a99 (diff)
feat(webhook): Implement webhook functionality for bookmark events (#852)
* feat(webhook): Implement webhook functionality for bookmark events

  - Added WebhookWorker to handle webhook requests.
  - Integrated webhook triggering in crawlerWorker after video processing.
  - Updated main worker initialization to include WebhookWorker.
  - Enhanced configuration to support webhook URLs, token, and timeout.
  - Documented webhook configuration options in the documentation.
  - Introduced zWebhookRequestSchema for validating webhook requests.

* feat(webhook): Update webhook handling and configuration

  - Changed webhook operation type from "create" to "crawled" in crawlerWorker and documentation.
  - Enhanced webhook retry logic in WebhookWorker to support multiple attempts.
  - Updated Docker configuration to include new webhook environment variables.
  - Improved validation for webhook configuration in shared config.
  - Adjusted zWebhookRequestSchema to reflect the new operation type.
  - Updated documentation to clarify webhook configuration options and usage.

* minor modifications

---------

Co-authored-by: Mohamed Bassem <me@mbassem.com>
-rw-r--r--  apps/workers/crawlerWorker.ts   |   4
-rw-r--r--  apps/workers/index.ts           |  34
-rw-r--r--  apps/workers/webhookWorker.ts   | 136
-rw-r--r--  docs/docs/03-configuration.md   |  33
-rw-r--r--  packages/shared/config.ts       |  14
-rw-r--r--  packages/shared/queues.ts       |  27
6 files changed, 237 insertions(+), 11 deletions(-)
diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts
index 16b1f4ae..9666299d 100644
--- a/apps/workers/crawlerWorker.ts
+++ b/apps/workers/crawlerWorker.ts
@@ -55,6 +55,7 @@ import {
OpenAIQueue,
triggerSearchReindex,
triggerVideoWorker,
+ triggerWebhookWorker,
zCrawlLinkRequestSchema,
} from "@hoarder/shared/queues";
import { BookmarkTypes } from "@hoarder/shared/types/bookmarks";
@@ -770,6 +771,9 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
// Trigger a potential download of a video from the URL
await triggerVideoWorker(bookmarkId, url);
+ // Trigger a webhook
+ await triggerWebhookWorker(bookmarkId, "crawled");
+
// Do the archival as a separate last step as it has the potential for failure
await archivalLogic();
}
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index c2d3f28a..3997b423 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -13,21 +13,31 @@ import { shutdownPromise } from "./exit";
import { OpenAiWorker } from "./openaiWorker";
import { SearchIndexingWorker } from "./searchWorker";
import { VideoWorker } from "./videoWorker";
+import { WebhookWorker } from "./webhookWorker";
async function main() {
logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
runQueueDBMigrations();
- const [crawler, openai, search, tidyAssets, video, feed, assetPreprocessing] =
- [
- await CrawlerWorker.build(),
- OpenAiWorker.build(),
- SearchIndexingWorker.build(),
- TidyAssetsWorker.build(),
- VideoWorker.build(),
- FeedWorker.build(),
- AssetPreprocessingWorker.build(),
- ];
+ const [
+ crawler,
+ openai,
+ search,
+ tidyAssets,
+ video,
+ feed,
+ assetPreprocessing,
+ webhook,
+ ] = [
+ await CrawlerWorker.build(),
+ OpenAiWorker.build(),
+ SearchIndexingWorker.build(),
+ TidyAssetsWorker.build(),
+ VideoWorker.build(),
+ FeedWorker.build(),
+ AssetPreprocessingWorker.build(),
+ WebhookWorker.build(),
+ ];
FeedRefreshingWorker.start();
await Promise.any([
@@ -39,11 +49,12 @@ async function main() {
video.run(),
feed.run(),
assetPreprocessing.run(),
+ webhook.run(),
]),
shutdownPromise,
]);
logger.info(
- "Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing and search workers ...",
+ "Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing, webhook and search workers ...",
);
FeedRefreshingWorker.stop();
@@ -54,6 +65,7 @@ async function main() {
video.stop();
feed.stop();
assetPreprocessing.stop();
+ webhook.stop();
}
main();
diff --git a/apps/workers/webhookWorker.ts b/apps/workers/webhookWorker.ts
new file mode 100644
index 00000000..5124f8a4
--- /dev/null
+++ b/apps/workers/webhookWorker.ts
@@ -0,0 +1,136 @@
+import { eq } from "drizzle-orm";
+import { DequeuedJob, Runner } from "liteque";
+import fetch from "node-fetch";
+
+import { db } from "@hoarder/db";
+import { bookmarks } from "@hoarder/db/schema";
+import serverConfig from "@hoarder/shared/config";
+import logger from "@hoarder/shared/logger";
+import {
+ WebhookQueue,
+ ZWebhookRequest,
+ zWebhookRequestSchema,
+} from "@hoarder/shared/queues";
+
+export class WebhookWorker {
+ static build() {
+ logger.info("Starting webhook worker ...");
+ const worker = new Runner<ZWebhookRequest>(
+ WebhookQueue,
+ {
+ run: runWebhook,
+ onComplete: async (job) => {
+ const jobId = job.id;
+ logger.info(`[webhook][${jobId}] Completed successfully`);
+ return Promise.resolve();
+ },
+ onError: async (job) => {
+ const jobId = job.id;
+ logger.error(
+ `[webhook][${jobId}] webhook job failed: ${job.error}\n${job.error.stack}`,
+ );
+ return Promise.resolve();
+ },
+ },
+ {
+ concurrency: 1,
+ pollIntervalMs: 1000,
+ timeoutSecs:
+ serverConfig.webhook.timeoutSec *
+ (serverConfig.webhook.retryTimes + 1) +
+ 1, // budget for all attempts: timeout * (retries + 1), plus 1 extra second of slack
+ validator: zWebhookRequestSchema,
+ },
+ );
+
+ return worker;
+ }
+}
+
+async function fetchBookmark(linkId: string) {
+ return await db.query.bookmarks.findFirst({
+ where: eq(bookmarks.id, linkId),
+ with: {
+ link: true,
+ text: true,
+ asset: true,
+ },
+ });
+}
+
+async function runWebhook(job: DequeuedJob<ZWebhookRequest>) {
+ const jobId = job.id;
+ const webhookUrls = serverConfig.webhook.urls;
+ if (!webhookUrls) {
+ logger.info(
+ `[webhook][${jobId}] No webhook urls configured. Skipping webhook job.`,
+ );
+ return;
+ }
+ const webhookToken = serverConfig.webhook.token;
+ const webhookTimeoutSec = serverConfig.webhook.timeoutSec;
+
+ const { bookmarkId } = job.data;
+ const bookmark = await fetchBookmark(bookmarkId);
+ if (!bookmark) {
+ throw new Error(
+ `[webhook][${jobId}] bookmark with id ${bookmarkId} was not found`,
+ );
+ }
+
+ logger.info(
+ `[webhook][${jobId}] Starting a webhook job for bookmark with id "${bookmark.id}"`,
+ );
+
+ await Promise.allSettled(
+ webhookUrls.map(async (url) => {
+ const maxRetries = serverConfig.webhook.retryTimes;
+ let attempt = 0;
+ let success = false;
+
+ while (attempt < maxRetries && !success) {
+ try {
+ const response = await fetch(url, {
+ method: "POST",
+ headers: {
+ "Content-Type": "application/json",
+ ...(webhookToken
+ ? {
+ Authorization: `Bearer ${webhookToken}`,
+ }
+ : {}),
+ },
+ body: JSON.stringify({
+ jobId,
+ bookmarkId,
+ userId: bookmark.userId,
+ url: bookmark.link ? bookmark.link.url : undefined,
+ type: bookmark.type,
+ operation: job.data.operation,
+ }),
+ signal: AbortSignal.timeout(webhookTimeoutSec * 1000),
+ });
+
+ if (!response.ok) {
+ logger.error(
+ `[webhook][${jobId}] Webhook call to ${url} failed with status: ${response.status}`,
+ );
+ } else {
+ logger.info(`[webhook][${jobId}] Webhook call to ${url} succeeded`);
+ success = true;
+ }
+ } catch (error) {
+ logger.error(
+ `[webhook][${jobId}] Webhook to ${url} call failed: ${error}`,
+ );
+ }
+ attempt++;
+ if (!success && attempt < maxRetries) {
+ logger.info(
+ `[webhook][${jobId}] Retrying webhook call to ${url}, attempt ${attempt + 1}`,
+ );
+ }
+ }
+ }),
+ );
+}
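
A note on the runner options above: the job-level `timeoutSecs` is sized so that a single job can cover every retry attempt against a URL. A minimal sketch of that arithmetic using the default values from the configuration below (the constants here are illustrative, not part of the diff):

```ts
// Defaults: WEBHOOK_TIMEOUT_SEC = 5, WEBHOOK_RETRY_TIMES = 3 (see packages/shared/config.ts below).
const timeoutSec = 5;
const retryTimes = 3;

// Each attempt is capped at `timeoutSec` seconds via AbortSignal.timeout(timeoutSec * 1000),
// and the worker budgets for retryTimes + 1 attempts plus one extra second of slack.
const jobTimeoutSecs = timeoutSec * (retryTimes + 1) + 1;
console.log(jobTimeoutSecs); // 21
```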
diff --git a/docs/docs/03-configuration.md b/docs/docs/03-configuration.md
index 82438dbf..bf326aff 100644
--- a/docs/docs/03-configuration.md
+++ b/docs/docs/03-configuration.md
@@ -95,3 +95,36 @@ Hoarder uses [tesseract.js](https://github.com/naptha/tesseract.js) to extract t
| OCR_CACHE_DIR | No | $TEMP_DIR | The dir where tesseract will download its models. By default, those models are not persisted and stored in the OS' temp dir. |
| OCR_LANGS | No | eng | Comma separated list of the language codes that you want tesseract to support. You can find the language codes [here](https://tesseract-ocr.github.io/tessdoc/Data-Files-in-different-versions.html). Set to empty string to disable OCR. |
| OCR_CONFIDENCE_THRESHOLD | No | 50 | A number between 0 and 100 indicating the minimum acceptable confidence from tessaract. If tessaract's confidence is lower than this value, extracted text won't be stored. |
+
+## Webhook Configs
+
+You can use webhooks to trigger actions when bookmarks change (only the _crawled_ event is supported for now).
+
+| Name | Required | Default | Description |
+| ------------------- | -------- | ------- | ---------------------------------------------------------------------------------------------- |
+| WEBHOOK_URLS        | No       |         | The URLs of the webhooks to trigger, separated by commas.                                           |
+| WEBHOOK_TOKEN       | No       |         | The token to use for authentication. It will appear in the Authorization header as a Bearer token.  |
+| WEBHOOK_TIMEOUT_SEC | No | 5 | The timeout for the webhook request in seconds. |
+| WEBHOOK_RETRY_TIMES | No | 3 | The number of times to retry the webhook request. |
+
+:::info
+
+- When a URL is added to Hoarder, the webhook will be triggered after it has been crawled.
+- The WEBHOOK_TOKEN is used for authentication. It will appear in the Authorization header as a Bearer token:
+ ```
+ Authorization: Bearer <WEBHOOK_TOKEN>
+ ```
+- The webhook request body is JSON containing the job id (useful for idempotency), the bookmark id, the bookmark type, the user id, the url, and the operation:
+
+ ```json
+ {
+ "jobId": 123,
+ "type": "link",
+ "bookmarkId": "exampleBookmarkId",
+ "userId": "exampleUserId",
+ "url": "https://example.com",
+ "operation": "crawled"
+ }
+ ```
+
+ :::
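
For readers wiring this up, a minimal receiving endpoint might look like the sketch below. It is not part of this commit and only illustrates the documented contract: it checks the Bearer token and reads the JSON payload described above (Node 18+ assumed; the port is arbitrary).

```ts
import { createServer } from "node:http";

// Same secret that was configured as WEBHOOK_TOKEN on the Hoarder side.
const EXPECTED_TOKEN = process.env.WEBHOOK_TOKEN;

createServer((req, res) => {
  if (req.method !== "POST") {
    res.writeHead(405).end();
    return;
  }
  // Reject requests that don't carry the expected Bearer token.
  if (EXPECTED_TOKEN && req.headers.authorization !== `Bearer ${EXPECTED_TOKEN}`) {
    res.writeHead(401).end();
    return;
  }
  let body = "";
  req.setEncoding("utf8");
  req.on("data", (chunk) => (body += chunk));
  req.on("end", () => {
    // { jobId, bookmarkId, userId, url, type, operation } as documented above.
    const payload = JSON.parse(body);
    console.log(`bookmark ${payload.bookmarkId}: ${payload.operation}`);
    res.writeHead(200).end();
  });
}).listen(8080);
```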
diff --git a/packages/shared/config.ts b/packages/shared/config.ts
index 7b74fc21..df9a5764 100644
--- a/packages/shared/config.ts
+++ b/packages/shared/config.ts
@@ -56,6 +56,14 @@ const allEnv = z.object({
DATA_DIR: z.string().default(""),
MAX_ASSET_SIZE_MB: z.coerce.number().default(4),
INFERENCE_LANG: z.string().default("english"),
+ WEBHOOK_URLS: z
+ .string()
+ .transform((val) => val.split(","))
+ .pipe(z.array(z.string().url()))
+ .optional(),
+ WEBHOOK_TOKEN: z.string().optional(),
+ WEBHOOK_TIMEOUT_SEC: z.coerce.number().default(5),
+ WEBHOOK_RETRY_TIMES: z.coerce.number().int().min(0).default(3),
// Build only flag
SERVER_VERSION: z.string().optional(),
DISABLE_NEW_RELEASE_CHECK: stringBool("false"),
@@ -134,6 +142,12 @@ const serverConfigSchema = allEnv.transform((val) => {
serverVersion: val.SERVER_VERSION,
disableNewReleaseCheck: val.DISABLE_NEW_RELEASE_CHECK,
usingLegacySeparateContainers: val.USING_LEGACY_SEPARATE_CONTAINERS,
+ webhook: {
+ urls: val.WEBHOOK_URLS,
+ token: val.WEBHOOK_TOKEN,
+ timeoutSec: val.WEBHOOK_TIMEOUT_SEC,
+ retryTimes: val.WEBHOOK_RETRY_TIMES,
+ },
};
});
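
A side note on the `WEBHOOK_URLS` schema added above: the comma-separated string is split and every entry must pass URL validation, so a trailing comma or an empty entry makes the whole variable invalid. A standalone sketch of the same shape (mirroring the diff rather than importing from it):

```ts
import { z } from "zod";

// Comma-separated string -> array of validated URLs, or undefined when unset.
const webhookUrls = z
  .string()
  .transform((val) => val.split(","))
  .pipe(z.array(z.string().url()))
  .optional();

console.log(webhookUrls.parse("https://a.example/hook,https://b.example/hook"));
// -> [ "https://a.example/hook", "https://b.example/hook" ]

console.log(webhookUrls.parse(undefined));
// -> undefined, because the schema is optional
```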
diff --git a/packages/shared/queues.ts b/packages/shared/queues.ts
index 7afb8774..b0002a29 100644
--- a/packages/shared/queues.ts
+++ b/packages/shared/queues.ts
@@ -158,3 +158,30 @@ export const AssetPreprocessingQueue =
keepFailedJobs: false,
},
);
+
+//Webhook worker
+export const zWebhookRequestSchema = z.object({
+ bookmarkId: z.string(),
+ operation: z.enum(["crawled"]),
+});
+export type ZWebhookRequest = z.infer<typeof zWebhookRequestSchema>;
+export const WebhookQueue = new SqliteQueue<ZWebhookRequest>(
+ "webhook_queue",
+ queueDB,
+ {
+ defaultJobArgs: {
+ numRetries: 3,
+ },
+ keepFailedJobs: false,
+ },
+);
+
+export async function triggerWebhookWorker(
+ bookmarkId: string,
+ operation: "crawled",
+) {
+ await WebhookQueue.enqueue({
+ bookmarkId,
+ operation,
+ });
+}
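
Finally, on how the pieces fit together: producers call `triggerWebhookWorker`, and the runner validates dequeued payloads against `zWebhookRequestSchema`. A small usage sketch (the bookmark id is made up; the crawler change earlier in this diff is the real call site):

```ts
import {
  triggerWebhookWorker,
  zWebhookRequestSchema,
} from "@hoarder/shared/queues";

async function example() {
  // Shape of what ends up on the queue; "crawled" is the only operation so far.
  const payload = zWebhookRequestSchema.parse({
    bookmarkId: "exampleBookmarkId",
    operation: "crawled",
  });

  // Equivalent to what the crawler does once a link finishes crawling.
  await triggerWebhookWorker(payload.bookmarkId, payload.operation);
}
```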