aboutsummaryrefslogtreecommitdiffstats
path: root/apps/workers
diff options
context:
space:
mode:
Diffstat (limited to 'apps/workers')
-rw-r--r--apps/workers/crawlerWorker.ts4
-rw-r--r--apps/workers/index.ts34
-rw-r--r--apps/workers/webhookWorker.ts136
3 files changed, 163 insertions, 11 deletions
diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts
index 16b1f4ae..9666299d 100644
--- a/apps/workers/crawlerWorker.ts
+++ b/apps/workers/crawlerWorker.ts
@@ -55,6 +55,7 @@ import {
OpenAIQueue,
triggerSearchReindex,
triggerVideoWorker,
+ triggerWebhookWorker,
zCrawlLinkRequestSchema,
} from "@hoarder/shared/queues";
import { BookmarkTypes } from "@hoarder/shared/types/bookmarks";
@@ -770,6 +771,9 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
// Trigger a potential download of a video from the URL
await triggerVideoWorker(bookmarkId, url);
+ // Trigger a webhook
+ await triggerWebhookWorker(bookmarkId, "crawled");
+
// Do the archival as a separate last step as it has the potential for failure
await archivalLogic();
}
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index c2d3f28a..3997b423 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -13,21 +13,31 @@ import { shutdownPromise } from "./exit";
import { OpenAiWorker } from "./openaiWorker";
import { SearchIndexingWorker } from "./searchWorker";
import { VideoWorker } from "./videoWorker";
+import { WebhookWorker } from "./webhookWorker";
async function main() {
logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
runQueueDBMigrations();
- const [crawler, openai, search, tidyAssets, video, feed, assetPreprocessing] =
- [
- await CrawlerWorker.build(),
- OpenAiWorker.build(),
- SearchIndexingWorker.build(),
- TidyAssetsWorker.build(),
- VideoWorker.build(),
- FeedWorker.build(),
- AssetPreprocessingWorker.build(),
- ];
+ const [
+ crawler,
+ openai,
+ search,
+ tidyAssets,
+ video,
+ feed,
+ assetPreprocessing,
+ webhook,
+ ] = [
+ await CrawlerWorker.build(),
+ OpenAiWorker.build(),
+ SearchIndexingWorker.build(),
+ TidyAssetsWorker.build(),
+ VideoWorker.build(),
+ FeedWorker.build(),
+ AssetPreprocessingWorker.build(),
+ WebhookWorker.build(),
+ ];
FeedRefreshingWorker.start();
await Promise.any([
@@ -39,11 +49,12 @@ async function main() {
video.run(),
feed.run(),
assetPreprocessing.run(),
+ webhook.run(),
]),
shutdownPromise,
]);
logger.info(
- "Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing and search workers ...",
+ "Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing, webhook and search workers ...",
);
FeedRefreshingWorker.stop();
@@ -54,6 +65,7 @@ async function main() {
video.stop();
feed.stop();
assetPreprocessing.stop();
+ webhook.stop();
}
main();
diff --git a/apps/workers/webhookWorker.ts b/apps/workers/webhookWorker.ts
new file mode 100644
index 00000000..5124f8a4
--- /dev/null
+++ b/apps/workers/webhookWorker.ts
@@ -0,0 +1,136 @@
+import { eq } from "drizzle-orm";
+import { DequeuedJob, Runner } from "liteque";
+import fetch from "node-fetch";
+
+import { db } from "@hoarder/db";
+import { bookmarks } from "@hoarder/db/schema";
+import serverConfig from "@hoarder/shared/config";
+import logger from "@hoarder/shared/logger";
+import {
+ WebhookQueue,
+ ZWebhookRequest,
+ zWebhookRequestSchema,
+} from "@hoarder/shared/queues";
+
+export class WebhookWorker {
+ static build() {
+ logger.info("Starting webhook worker ...");
+ const worker = new Runner<ZWebhookRequest>(
+ WebhookQueue,
+ {
+ run: runWebhook,
+ onComplete: async (job) => {
+ const jobId = job.id;
+ logger.info(`[webhook][${jobId}] Completed successfully`);
+ return Promise.resolve();
+ },
+ onError: async (job) => {
+ const jobId = job.id;
+ logger.error(
+ `[webhook][${jobId}] webhook job failed: ${job.error}\n${job.error.stack}`,
+ );
+ return Promise.resolve();
+ },
+ },
+ {
+ concurrency: 1,
+ pollIntervalMs: 1000,
+ timeoutSecs:
+ serverConfig.webhook.timeoutSec *
+ (serverConfig.webhook.retryTimes + 1) +
+ 1, // budget one full timeout per attempt (initial try + each retry), plus 1 second of overhead
+ validator: zWebhookRequestSchema,
+ },
+ );
+
+ return worker;
+ }
+}
+
+async function fetchBookmark(linkId: string) {
+ return await db.query.bookmarks.findFirst({
+ where: eq(bookmarks.id, linkId),
+ with: {
+ link: true,
+ text: true,
+ asset: true,
+ },
+ });
+}
+
+async function runWebhook(job: DequeuedJob<ZWebhookRequest>) {
+ const jobId = job.id;
+ const webhookUrls = serverConfig.webhook.urls;
+ if (!webhookUrls) {
+ logger.info(
+ `[webhook][${jobId}] No webhook urls configured. Skipping webhook job.`,
+ );
+ return;
+ }
+ const webhookToken = serverConfig.webhook.token;
+ const webhookTimeoutSec = serverConfig.webhook.timeoutSec;
+
+ const { bookmarkId } = job.data;
+ const bookmark = await fetchBookmark(bookmarkId);
+ if (!bookmark) {
+ throw new Error(
+ `[webhook][${jobId}] bookmark with id ${bookmarkId} was not found`,
+ );
+ }
+
+ logger.info(
+ `[webhook][${jobId}] Starting a webhook job for bookmark with id "${bookmark.id}"`,
+ );
+
+ await Promise.allSettled(
+ webhookUrls.map(async (url) => {
+ const maxRetries = serverConfig.webhook.retryTimes;
+ let attempt = 0;
+ let success = false;
+
+ while (attempt < maxRetries && !success) {
+ try {
+ const response = await fetch(url, {
+ method: "POST",
+ headers: {
+ "Content-Type": "application/json",
+ ...(webhookToken
+ ? {
+ Authorization: `Bearer ${webhookToken}`,
+ }
+ : {}),
+ },
+ body: JSON.stringify({
+ jobId,
+ bookmarkId,
+ userId: bookmark.userId,
+ url: bookmark.link ? bookmark.link.url : undefined,
+ type: bookmark.type,
+ operation: job.data.operation,
+ }),
+ signal: AbortSignal.timeout(webhookTimeoutSec * 1000),
+ });
+
+ if (!response.ok) {
+ logger.error(
+ `Webhook call to ${url} failed with status: ${response.status}`,
+ );
+ } else {
+ logger.info(`[webhook][${jobId}] Webhook to ${url} call succeeded`);
+ success = true;
+ }
+ } catch (error) {
+ logger.error(
+ `[webhook][${jobId}] Webhook to ${url} call failed: ${error}`,
+ );
+ }
+ attempt++;
+ if (!success && attempt < maxRetries) {
+ logger.info(
+ `[webhook][${jobId}] Retrying webhook call to ${url}, attempt ${attempt + 1}`,
+ );
+ }
+ }
+ }),
+ );
+}