aboutsummaryrefslogtreecommitdiffstats
path: root/apps/workers/webhookWorker.ts
diff options
context:
space:
mode:
Diffstat (limited to 'apps/workers/webhookWorker.ts')
-rw-r--r--apps/workers/webhookWorker.ts136
1 files changed, 136 insertions, 0 deletions
diff --git a/apps/workers/webhookWorker.ts b/apps/workers/webhookWorker.ts
new file mode 100644
index 00000000..5124f8a4
--- /dev/null
+++ b/apps/workers/webhookWorker.ts
@@ -0,0 +1,136 @@
+import { eq } from "drizzle-orm";
+import { DequeuedJob, Runner } from "liteque";
+import fetch from "node-fetch";
+
+import { db } from "@hoarder/db";
+import { bookmarks } from "@hoarder/db/schema";
+import serverConfig from "@hoarder/shared/config";
+import logger from "@hoarder/shared/logger";
+import {
+ WebhookQueue,
+ ZWebhookRequest,
+ zWebhookRequestSchema,
+} from "@hoarder/shared/queues";
+
+export class WebhookWorker {
+ static build() {
+ logger.info("Starting webhook worker ...");
+ const worker = new Runner<ZWebhookRequest>(
+ WebhookQueue,
+ {
+ run: runWebhook,
+ onComplete: async (job) => {
+ const jobId = job.id;
+ logger.info(`[webhook][${jobId}] Completed successfully`);
+ return Promise.resolve();
+ },
+ onError: async (job) => {
+ const jobId = job.id;
+ logger.error(
+ `[webhook][${jobId}] webhook job failed: ${job.error}\n${job.error.stack}`,
+ );
+ return Promise.resolve();
+ },
+ },
+ {
+ concurrency: 1,
+ pollIntervalMs: 1000,
+ timeoutSecs:
+ serverConfig.webhook.timeoutSec *
+ (serverConfig.webhook.retryTimes + 1) +
+ 1, //consider retry times, and timeout and add 1 second for other stuff
+ validator: zWebhookRequestSchema,
+ },
+ );
+
+ return worker;
+ }
+}
+
+async function fetchBookmark(linkId: string) {
+ return await db.query.bookmarks.findFirst({
+ where: eq(bookmarks.id, linkId),
+ with: {
+ link: true,
+ text: true,
+ asset: true,
+ },
+ });
+}
+
+async function runWebhook(job: DequeuedJob<ZWebhookRequest>) {
+ const jobId = job.id;
+ const webhookUrls = serverConfig.webhook.urls;
+ if (!webhookUrls) {
+ logger.info(
+ `[webhook][${jobId}] No webhook urls configured. Skipping webhook job.`,
+ );
+ return;
+ }
+ const webhookToken = serverConfig.webhook.token;
+ const webhookTimeoutSec = serverConfig.webhook.timeoutSec;
+
+ const { bookmarkId } = job.data;
+ const bookmark = await fetchBookmark(bookmarkId);
+ if (!bookmark) {
+ throw new Error(
+ `[webhook][${jobId}] bookmark with id ${bookmarkId} was not found`,
+ );
+ }
+
+ logger.info(
+ `[webhook][${jobId}] Starting a webhook job for bookmark with id "${bookmark.id}"`,
+ );
+
+ await Promise.allSettled(
+ webhookUrls.map(async (url) => {
+ const maxRetries = serverConfig.webhook.retryTimes;
+ let attempt = 0;
+ let success = false;
+
+ while (attempt < maxRetries && !success) {
+ try {
+ const response = await fetch(url, {
+ method: "POST",
+ headers: {
+ "Content-Type": "application/json",
+ ...(webhookToken
+ ? {
+ Authorization: `Bearer ${webhookToken}`,
+ }
+ : {}),
+ },
+ body: JSON.stringify({
+ jobId,
+ bookmarkId,
+ userId: bookmark.userId,
+ url: bookmark.link ? bookmark.link.url : undefined,
+ type: bookmark.type,
+ operation: job.data.operation,
+ }),
+ signal: AbortSignal.timeout(webhookTimeoutSec * 1000),
+ });
+
+ if (!response.ok) {
+ logger.error(
+ `Webhook call to ${url} failed with status: ${response.status}`,
+ );
+ } else {
+ logger.info(`[webhook][${jobId}] Webhook to ${url} call succeeded`);
+ success = true;
+ }
+ } catch (error) {
+ logger.error(
+ `[webhook][${jobId}] Webhook to ${url} call failed: ${error}`,
+ );
+ }
+ attempt++;
+ if (!success && attempt < maxRetries) {
+ logger.info(
+ `[webhook][${jobId}] Retrying webhook call to ${url}, attempt ${attempt + 1}`,
+ );
+ }
+ }
+ }),
+ );
+}