Diffstat (limited to 'apps/workers')
-rw-r--r--  apps/workers/workers/crawlerWorker.ts | 51
1 file changed, 13 insertions(+), 38 deletions(-)
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index aedf4aa0..3591474e 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -77,6 +77,7 @@ import {
DequeuedJob,
EnqueueOptions,
getQueueClient,
+ QueueRetryAfterError,
} from "@karakeep/shared/queueing";
import { getRateLimitClient } from "@karakeep/shared/ratelimiting";
import { tryCatch } from "@karakeep/shared/tryCatch";
@@ -187,7 +188,7 @@ const cookieSchema = z.object({
const cookiesSchema = z.array(cookieSchema);
interface CrawlerRunResult {
- status: "completed" | "rescheduled";
+ status: "completed";
}
function getPlaywrightProxyConfig(): BrowserContextOptions["proxy"] {
@@ -325,13 +326,7 @@ export class CrawlerWorker {
LinkCrawlerQueue,
{
run: runCrawler,
- onComplete: async (job, result) => {
- if (result.status === "rescheduled") {
- logger.info(
- `[Crawler][${job.id}] Rescheduled due to domain rate limiting`,
- );
- return;
- }
+ onComplete: async (job) => {
workerStatsCounter.labels("crawler", "completed").inc();
const jobId = job.id;
logger.info(`[Crawler][${jobId}] Completed successfully`);
@@ -1308,24 +1303,18 @@ async function crawlAndParseUrl(
}
/**
- * Checks if the domain should be rate limited and reschedules the job if needed.
- * @returns true if the job should continue, false if it was rescheduled
+ * Checks if the domain should be rate limited and throws QueueRetryAfterError if needed.
+ * @throws {QueueRetryAfterError} if the domain is rate limited
*/
-async function checkDomainRateLimit(
- url: string,
- jobId: string,
- jobData: ZCrawlLinkRequest,
- userId: string,
- jobPriority?: number,
-): Promise<boolean> {
+async function checkDomainRateLimit(url: string, jobId: string): Promise<void> {
const crawlerDomainRateLimitConfig = serverConfig.crawler.domainRatelimiting;
if (!crawlerDomainRateLimitConfig) {
- return true;
+ return;
}
const rateLimitClient = await getRateLimitClient();
if (!rateLimitClient) {
- return true;
+ return;
}
const hostname = new URL(url).hostname;
@@ -1344,17 +1333,13 @@ async function checkDomainRateLimit(
const jitterFactor = 1.0 + Math.random() * 0.4; // Random value between 1.0 and 1.4
const delayMs = Math.floor(resetInSeconds * 1000 * jitterFactor);
logger.info(
- `[Crawler][${jobId}] Domain "${hostname}" is rate limited. Rescheduling in ${(delayMs / 1000).toFixed(2)} seconds (with jitter).`,
+ `[Crawler][${jobId}] Domain "${hostname}" is rate limited. Will retry in ${(delayMs / 1000).toFixed(2)} seconds (with jitter).`,
);
- await LinkCrawlerQueue.enqueue(jobData, {
- priority: jobPriority,
+ throw new QueueRetryAfterError(
+ `Domain "${hostname}" is rate limited`,
delayMs,
- groupId: userId,
- });
- return false;
+ );
}
-
- return true;
}
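For a concrete feel for the delay computation in the hunk above: the jitter factor is drawn uniformly from [1.0, 1.4), so a rate-limit window of 30 seconds yields a retry delay between 30 and 42 seconds. A minimal standalone sketch of the same math (the resetInSeconds value is hypothetical):

// Reproduces the delay computation from the hunk above.
// resetInSeconds is an illustrative value, not taken from the code.
const resetInSeconds = 30;
const jitterFactor = 1.0 + Math.random() * 0.4; // uniform in [1.0, 1.4)
const delayMs = Math.floor(resetInSeconds * 1000 * jitterFactor);
// delayMs lands in [30000, 42000); the jitter spreads retries so that jobs
// rate-limited at the same moment do not all wake up simultaneously.
console.log(`retry in ${(delayMs / 1000).toFixed(2)}s`);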
@@ -1381,17 +1366,7 @@ async function runCrawler(
precrawledArchiveAssetId,
} = await getBookmarkDetails(bookmarkId);
- const shouldContinue = await checkDomainRateLimit(
- url,
- jobId,
- job.data,
- userId,
- job.priority,
- );
-
- if (!shouldContinue) {
- return { status: "rescheduled" };
- }
+ await checkDomainRateLimit(url, jobId);
logger.info(
`[Crawler][${jobId}] Will crawl "${url}" for link with id "${bookmarkId}"`,
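Taken together, runCrawler no longer re-enqueues itself when a domain is rate limited; it throws, and the queueing layer is expected to turn QueueRetryAfterError into a delayed retry. A minimal sketch of what that handling could look like, assuming only the constructor shape visible in the diff (message first, then delayMs) and assuming the error exposes that delay as a property; the real dispatch logic lives in @karakeep/shared/queueing and may differ:

import { QueueRetryAfterError } from "@karakeep/shared/queueing";

// Hypothetical dispatch wrapper; names and signatures are illustrative.
async function dispatchJob<T>(
  job: { id: string; data: T },
  run: (job: { id: string; data: T }) => Promise<unknown>,
  reenqueue: (data: T, opts: { delayMs: number }) => Promise<void>,
): Promise<void> {
  try {
    await run(job);
  } catch (err) {
    if (err instanceof QueueRetryAfterError) {
      // Assumption: the error carries the delay it was constructed with.
      await reenqueue(job.data, { delayMs: err.delayMs });
      return; // a retry-after is not a failure
    }
    throw err; // genuine failures still propagate
  }
}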