Diffstat:
-rw-r--r--  apps/workers/workers/crawlerWorker.ts                   | 51
-rw-r--r--  packages/plugins/queue-liteque/src/index.ts             | 20
-rw-r--r--  packages/plugins/queue-restate/src/service.ts           | 49
-rw-r--r--  packages/plugins/queue-restate/src/tests/queue.test.ts  | 97
-rw-r--r--  packages/shared/queueing.ts                             | 15
5 files changed, 184 insertions(+), 48 deletions(-)
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index aedf4aa0..3591474e 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -77,6 +77,7 @@ import {
DequeuedJob,
EnqueueOptions,
getQueueClient,
+ QueueRetryAfterError,
} from "@karakeep/shared/queueing";
import { getRateLimitClient } from "@karakeep/shared/ratelimiting";
import { tryCatch } from "@karakeep/shared/tryCatch";
@@ -187,7 +188,7 @@ const cookieSchema = z.object({
const cookiesSchema = z.array(cookieSchema);
interface CrawlerRunResult {
- status: "completed" | "rescheduled";
+ status: "completed";
}
function getPlaywrightProxyConfig(): BrowserContextOptions["proxy"] {
@@ -325,13 +326,7 @@ export class CrawlerWorker {
LinkCrawlerQueue,
{
run: runCrawler,
- onComplete: async (job, result) => {
- if (result.status === "rescheduled") {
- logger.info(
- `[Crawler][${job.id}] Rescheduled due to domain rate limiting`,
- );
- return;
- }
+ onComplete: async (job) => {
workerStatsCounter.labels("crawler", "completed").inc();
const jobId = job.id;
logger.info(`[Crawler][${jobId}] Completed successfully`);
@@ -1308,24 +1303,18 @@ async function crawlAndParseUrl(
}
/**
- * Checks if the domain should be rate limited and reschedules the job if needed.
- * @returns true if the job should continue, false if it was rescheduled
+ * Checks if the domain should be rate limited and throws QueueRetryAfterError if needed.
+ * @throws {QueueRetryAfterError} if the domain is rate limited
*/
-async function checkDomainRateLimit(
- url: string,
- jobId: string,
- jobData: ZCrawlLinkRequest,
- userId: string,
- jobPriority?: number,
-): Promise<boolean> {
+async function checkDomainRateLimit(url: string, jobId: string): Promise<void> {
const crawlerDomainRateLimitConfig = serverConfig.crawler.domainRatelimiting;
if (!crawlerDomainRateLimitConfig) {
- return true;
+ return;
}
const rateLimitClient = await getRateLimitClient();
if (!rateLimitClient) {
- return true;
+ return;
}
const hostname = new URL(url).hostname;
@@ -1344,17 +1333,13 @@ async function checkDomainRateLimit(
const jitterFactor = 1.0 + Math.random() * 0.4; // Random value between 1.0 and 1.4
const delayMs = Math.floor(resetInSeconds * 1000 * jitterFactor);
logger.info(
- `[Crawler][${jobId}] Domain "${hostname}" is rate limited. Rescheduling in ${(delayMs / 1000).toFixed(2)} seconds (with jitter).`,
+ `[Crawler][${jobId}] Domain "${hostname}" is rate limited. Will retry in ${(delayMs / 1000).toFixed(2)} seconds (with jitter).`,
);
- await LinkCrawlerQueue.enqueue(jobData, {
- priority: jobPriority,
+ throw new QueueRetryAfterError(
+ `Domain "${hostname}" is rate limited`,
delayMs,
- groupId: userId,
- });
- return false;
+ );
}
-
- return true;
}
async function runCrawler(
@@ -1381,17 +1366,7 @@ async function runCrawler(
precrawledArchiveAssetId,
} = await getBookmarkDetails(bookmarkId);
- const shouldContinue = await checkDomainRateLimit(
- url,
- jobId,
- job.data,
- userId,
- job.priority,
- );
-
- if (!shouldContinue) {
- return { status: "rescheduled" };
- }
+ await checkDomainRateLimit(url, jobId);
logger.info(
`[Crawler][${jobId}] Will crawl "${url}" for link with id "${bookmarkId}"`,
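The net effect of this file's changes: the crawler no longer re-enqueues itself when a domain is rate limited; it simply throws and lets the queue plugin schedule the delayed retry. A standalone sketch of that pattern (only QueueRetryAfterError and its constructor come from this diff; the helper and function names here are hypothetical, for illustration):

import { QueueRetryAfterError } from "@karakeep/shared/queueing";

// Hypothetical rate-limit lookup used for illustration only: returns the
// remaining cool-down in milliseconds, or null when the domain may be
// crawled immediately.
async function remainingCooldownMs(hostname: string): Promise<number | null> {
  return null;
}

// A run function now signals "try again later" by throwing; the queue
// backend turns the throw into a delayed retry that does not consume an
// attempt, so the worker no longer enqueues a replacement job itself.
export async function crawlWithDomainRateLimit(url: string): Promise<void> {
  const hostname = new URL(url).hostname;
  const delayMs = await remainingCooldownMs(hostname);
  if (delayMs !== null) {
    throw new QueueRetryAfterError(`Domain "${hostname}" is rate limited`, delayMs);
  }
  // ...actual crawling would happen here...
}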
diff --git a/packages/plugins/queue-liteque/src/index.ts b/packages/plugins/queue-liteque/src/index.ts
index 94fa795f..b809d158 100644
--- a/packages/plugins/queue-liteque/src/index.ts
+++ b/packages/plugins/queue-liteque/src/index.ts
@@ -4,10 +4,12 @@ import {
SqliteQueue as LQ,
Runner as LQRunner,
migrateDB,
+ RetryAfterError,
} from "liteque";
import type { PluginProvider } from "@karakeep/shared/plugins";
import type {
+ DequeuedJob,
EnqueueOptions,
Queue,
QueueClient,
@@ -17,6 +19,7 @@ import type {
RunnerOptions,
} from "@karakeep/shared/queueing";
import serverConfig from "@karakeep/shared/config";
+import { QueueRetryAfterError } from "@karakeep/shared/queueing";
class LitequeQueueWrapper<T> implements Queue<T> {
constructor(
@@ -91,10 +94,25 @@ class LitequeQueueClient implements QueueClient {
throw new Error(`Queue ${name} not found`);
}
+ // Wrap the run function to translate QueueRetryAfterError to liteque's RetryAfterError
+ const wrappedRun = async (job: DequeuedJob<T>): Promise<R> => {
+ try {
+ return await funcs.run(job);
+ } catch (error) {
+ if (error instanceof QueueRetryAfterError) {
+ // Translate to liteque's native RetryAfterError
+ // This will cause liteque to retry after the delay without counting against attempts
+ throw new RetryAfterError(error.delayMs);
+ }
+ // Re-throw any other errors
+ throw error;
+ }
+ };
+
const runner = new LQRunner<T, R>(
wrapper._impl,
{
- run: funcs.run,
+ run: wrappedRun,
onComplete: funcs.onComplete,
onError: funcs.onError,
},
diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts
index 5ba7d1df..2b5716ee 100644
--- a/packages/plugins/queue-restate/src/service.ts
+++ b/packages/plugins/queue-restate/src/service.ts
@@ -6,6 +6,7 @@ import type {
RunnerFuncs,
RunnerOptions,
} from "@karakeep/shared/queueing";
+import { QueueRetryAfterError } from "@karakeep/shared/queueing";
import { tryCatch } from "@karakeep/shared/tryCatch";
import { genId } from "./idProvider";
@@ -65,7 +66,8 @@ export function buildRestateService<T, R>(
);
let lastError: Error | undefined;
- for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) {
+ let runNumber = 0;
+ while (runNumber <= NUM_RETRIES) {
const acquired = await semaphore.acquire(
priority,
data.groupId,
@@ -83,14 +85,24 @@ export function buildRestateService<T, R>(
abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000),
});
await semaphore.release();
- if (res.error) {
+
+ if (res.type === "rate_limit") {
+ // Handle rate limit retries without counting against retry attempts
+ await ctx.sleep(res.delayMs, "rate limit retry");
+ // Don't increment runNumber - retry without counting against attempts
+ continue;
+ }
+
+ if (res.type === "error") {
if (res.error instanceof restate.CancelledError) {
throw res.error;
}
lastError = res.error;
// TODO: add backoff
- await ctx.sleep(1000);
+ await ctx.sleep(1000, "error retry");
+ runNumber++;
} else {
+ // Success
break;
}
}
@@ -105,6 +117,11 @@ export function buildRestateService<T, R>(
});
}
+type RunResult<R> =
+ | { type: "success"; value: R }
+ | { type: "rate_limit"; delayMs: number }
+ | { type: "error"; error: Error };
+
async function runWorkerLogic<T, R>(
ctx: restate.Context,
{ run, onError, onComplete }: RunnerFuncs<T, R>,
@@ -116,18 +133,26 @@ async function runWorkerLogic<T, R>(
numRetriesLeft: number;
abortSignal: AbortSignal;
},
-) {
+): Promise<RunResult<R>> {
const res = await tryCatch(
ctx.run(
`main logic`,
async () => {
- return await run(data);
+ const res = await tryCatch(run(data));
+ if (res.error) {
+ if (res.error instanceof QueueRetryAfterError) {
+ return { type: "rate_limit" as const, delayMs: res.error.delayMs };
+ }
+ throw res.error; // Rethrow
+ }
+ return { type: "success" as const, value: res.data };
},
{
maxRetryAttempts: 1,
},
),
);
+
if (res.error) {
await tryCatch(
ctx.run(
@@ -142,13 +167,21 @@ async function runWorkerLogic<T, R>(
},
),
);
- return res;
+ return { type: "error", error: res.error };
+ }
+
+ const result = res.data;
+
+ if (result.type === "rate_limit") {
+ // Don't call onError or onComplete for rate limit retries
+ return result;
}
+ // Success case - call onComplete
await tryCatch(
- ctx.run("onComplete", async () => await onComplete?.(data, res.data), {
+ ctx.run("onComplete", async () => await onComplete?.(data, result.value), {
maxRetryAttempts: 1,
}),
);
- return res;
+ return result;
}
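With these changes each attempt resolves to one of three outcomes, and only plain errors consume a retry. A simplified, self-contained sketch of that control flow (not the service code itself; Restate context, semaphore, and timeout plumbing are replaced with plain async helpers, and RunResult mirrors the union introduced above):

type RunResult<R> =
  | { type: "success"; value: R }
  | { type: "rate_limit"; delayMs: number }
  | { type: "error"; error: Error };

async function retryLoop<R>(
  attempt: () => Promise<RunResult<R>>,
  maxRetries: number,
  sleep: (ms: number) => Promise<void>,
): Promise<R> {
  let lastError: Error | undefined;
  let runNumber = 0;
  while (runNumber <= maxRetries) {
    const res = await attempt();
    if (res.type === "rate_limit") {
      // Rate-limit retries wait out the requested delay and do not
      // increment runNumber.
      await sleep(res.delayMs);
      continue;
    }
    if (res.type === "error") {
      lastError = res.error;
      await sleep(1000); // the service still carries a TODO to add backoff
      runNumber++;
      continue;
    }
    return res.value;
  }
  throw lastError ?? new Error("retries exhausted");
}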
diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts
index 2085d57b..7da3f18e 100644
--- a/packages/plugins/queue-restate/src/tests/queue.test.ts
+++ b/packages/plugins/queue-restate/src/tests/queue.test.ts
@@ -10,6 +10,7 @@ import {
} from "vitest";
import type { Queue, QueueClient } from "@karakeep/shared/queueing";
+import { QueueRetryAfterError } from "@karakeep/shared/queueing";
import { AdminClient } from "../admin";
import { RestateQueueProvider } from "../index";
@@ -49,7 +50,13 @@ type TestAction =
| { type: "val"; val: number }
| { type: "err"; err: string }
| { type: "stall"; durSec: number }
- | { type: "semaphore-acquire" };
+ | { type: "semaphore-acquire" }
+ | {
+ type: "rate-limit";
+ val: number;
+ delayMs: number;
+ attemptsBeforeSuccess: number;
+ };
describe("Restate Queue Provider", () => {
let queueClient: QueueClient;
@@ -62,6 +69,7 @@ describe("Restate Queue Provider", () => {
inFlight: 0,
maxInFlight: 0,
baton: new Baton(),
+ rateLimitAttempts: new Map<string, number>(),
};
async function waitUntilQueueEmpty() {
@@ -81,6 +89,7 @@ describe("Restate Queue Provider", () => {
testState.inFlight = 0;
testState.maxInFlight = 0;
testState.baton = new Baton();
+ testState.rateLimitAttempts = new Map<string, number>();
});
afterEach(async () => {
await waitUntilQueueEmpty();
@@ -133,6 +142,21 @@ describe("Restate Queue Provider", () => {
break;
case "semaphore-acquire":
await testState.baton.acquire();
+ break;
+ case "rate-limit": {
+ const attemptKey = `${job.id}`;
+ const currentAttempts =
+ testState.rateLimitAttempts.get(attemptKey) || 0;
+ testState.rateLimitAttempts.set(attemptKey, currentAttempts + 1);
+
+ if (currentAttempts < jobData.attemptsBeforeSuccess) {
+ throw new QueueRetryAfterError(
+ `Rate limited (attempt ${currentAttempts + 1})`,
+ jobData.delayMs,
+ );
+ }
+ return jobData.val;
+ }
}
},
onError: async (job) => {
@@ -517,4 +541,75 @@ describe("Restate Queue Provider", () => {
expect(testState.results).toEqual([102, 101, 100]);
}, 60000);
});
+
+ describe("QueueRetryAfterError handling", () => {
+ it("should retry after delay without counting against retry attempts", async () => {
+ const startTime = Date.now();
+
+ // This job will fail with QueueRetryAfterError twice before succeeding
+ await queue.enqueue({
+ type: "rate-limit",
+ val: 42,
+ delayMs: 500, // 500ms delay
+ attemptsBeforeSuccess: 2, // Fail twice, succeed on third try
+ });
+
+ await waitUntilQueueEmpty();
+
+ const duration = Date.now() - startTime;
+
+ // Should have succeeded
+ expect(testState.results).toEqual([42]);
+
+ // Should have been called 3 times (2 rate limit failures + 1 success)
+ expect(testState.rateLimitAttempts.size).toBe(1);
+ const attempts = Array.from(testState.rateLimitAttempts.values())[0];
+ expect(attempts).toBe(3);
+
+ // Should have waited at least 1 second total (2 x 500ms delays)
+ expect(duration).toBeGreaterThanOrEqual(1000);
+
+ // onError should NOT have been called for rate limit retries
+ expect(testState.errors).toEqual([]);
+ }, 60000);
+
+ it("should not exhaust retries when rate limited", async () => {
+ // This job will be rate limited many more times than the retry limit
+ // but should still eventually succeed
+ await queue.enqueue({
+ type: "rate-limit",
+ val: 100,
+ delayMs: 100, // Short delay for faster test
+ attemptsBeforeSuccess: 10, // Fail 10 times (more than the 3 retry limit)
+ });
+
+ await waitUntilQueueEmpty();
+
+ // Should have succeeded despite being "retried" more than the limit
+ expect(testState.results).toEqual([100]);
+
+ // Should have been called 11 times (10 rate limit failures + 1 success)
+ const attempts = Array.from(testState.rateLimitAttempts.values())[0];
+ expect(attempts).toBe(11);
+
+ // No errors should have been recorded
+ expect(testState.errors).toEqual([]);
+ }, 90000);
+
+ it("should still respect retry limit for non-rate-limit errors", async () => {
+ // Enqueue a regular error job that should fail permanently
+ await queue.enqueue({ type: "err", err: "Regular error" });
+
+ await waitUntilQueueEmpty();
+
+ // Should have failed 4 times (initial + 3 retries) and not succeeded
+ expect(testState.errors).toEqual([
+ "Regular error",
+ "Regular error",
+ "Regular error",
+ "Regular error",
+ ]);
+ expect(testState.results).toEqual([]);
+ }, 90000);
+ });
});
diff --git a/packages/shared/queueing.ts b/packages/shared/queueing.ts
index 0dd6ed6b..bc2c9cfa 100644
--- a/packages/shared/queueing.ts
+++ b/packages/shared/queueing.ts
@@ -2,6 +2,21 @@ import { ZodType } from "zod";
import { PluginManager, PluginType } from "./plugins";
+/**
+ * Special error that indicates a job should be retried after a delay
+ * without counting against the retry attempts limit.
+ * Useful for handling rate limiting scenarios.
+ */
+export class QueueRetryAfterError extends Error {
+ constructor(
+ message: string,
+ public readonly delayMs: number,
+ ) {
+ super(message);
+ this.name = "QueueRetryAfterError";
+ }
+}
+
export interface EnqueueOptions {
idempotencyKey?: string;
priority?: number;