diff options
Diffstat (limited to 'packages/plugins')
| -rw-r--r-- | packages/plugins/queue-liteque/src/index.ts | 20 | ||||
| -rw-r--r-- | packages/plugins/queue-restate/src/service.ts | 49 | ||||
| -rw-r--r-- | packages/plugins/queue-restate/src/tests/queue.test.ts | 97 |
3 files changed, 156 insertions, 10 deletions
diff --git a/packages/plugins/queue-liteque/src/index.ts b/packages/plugins/queue-liteque/src/index.ts index 94fa795f..b809d158 100644 --- a/packages/plugins/queue-liteque/src/index.ts +++ b/packages/plugins/queue-liteque/src/index.ts @@ -4,10 +4,12 @@ import { SqliteQueue as LQ, Runner as LQRunner, migrateDB, + RetryAfterError, } from "liteque"; import type { PluginProvider } from "@karakeep/shared/plugins"; import type { + DequeuedJob, EnqueueOptions, Queue, QueueClient, @@ -17,6 +19,7 @@ import type { RunnerOptions, } from "@karakeep/shared/queueing"; import serverConfig from "@karakeep/shared/config"; +import { QueueRetryAfterError } from "@karakeep/shared/queueing"; class LitequeQueueWrapper<T> implements Queue<T> { constructor( @@ -91,10 +94,25 @@ class LitequeQueueClient implements QueueClient { throw new Error(`Queue ${name} not found`); } + // Wrap the run function to translate QueueRetryAfterError to liteque's RetryAfterError + const wrappedRun = async (job: DequeuedJob<T>): Promise<R> => { + try { + return await funcs.run(job); + } catch (error) { + if (error instanceof QueueRetryAfterError) { + // Translate to liteque's native RetryAfterError + // This will cause liteque to retry after the delay without counting against attempts + throw new RetryAfterError(error.delayMs); + } + // Re-throw any other errors + throw error; + } + }; + const runner = new LQRunner<T, R>( wrapper._impl, { - run: funcs.run, + run: wrappedRun, onComplete: funcs.onComplete, onError: funcs.onError, }, diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts index 5ba7d1df..2b5716ee 100644 --- a/packages/plugins/queue-restate/src/service.ts +++ b/packages/plugins/queue-restate/src/service.ts @@ -6,6 +6,7 @@ import type { RunnerFuncs, RunnerOptions, } from "@karakeep/shared/queueing"; +import { QueueRetryAfterError } from "@karakeep/shared/queueing"; import { tryCatch } from "@karakeep/shared/tryCatch"; import { genId } from "./idProvider"; @@ -65,7 +66,8 @@ export function buildRestateService<T, R>( ); let lastError: Error | undefined; - for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) { + let runNumber = 0; + while (runNumber <= NUM_RETRIES) { const acquired = await semaphore.acquire( priority, data.groupId, @@ -83,14 +85,24 @@ export function buildRestateService<T, R>( abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000), }); await semaphore.release(); - if (res.error) { + + if (res.type === "rate_limit") { + // Handle rate limit retries without counting against retry attempts + await ctx.sleep(res.delayMs, "rate limit retry"); + // Don't increment runNumber - retry without counting against attempts + continue; + } + + if (res.type === "error") { if (res.error instanceof restate.CancelledError) { throw res.error; } lastError = res.error; // TODO: add backoff - await ctx.sleep(1000); + await ctx.sleep(1000, "error retry"); + runNumber++; } else { + // Success break; } } @@ -105,6 +117,11 @@ export function buildRestateService<T, R>( }); } +type RunResult<R> = + | { type: "success"; value: R } + | { type: "rate_limit"; delayMs: number } + | { type: "error"; error: Error }; + async function runWorkerLogic<T, R>( ctx: restate.Context, { run, onError, onComplete }: RunnerFuncs<T, R>, @@ -116,18 +133,26 @@ async function runWorkerLogic<T, R>( numRetriesLeft: number; abortSignal: AbortSignal; }, -) { +): Promise<RunResult<R>> { const res = await tryCatch( ctx.run( `main logic`, async () => { - return await run(data); + const res = await tryCatch(run(data)); + if (res.error) { + if (res.error instanceof QueueRetryAfterError) { + return { type: "rate_limit" as const, delayMs: res.error.delayMs }; + } + throw res.error; // Rethrow + } + return { type: "success" as const, value: res.data }; }, { maxRetryAttempts: 1, }, ), ); + if (res.error) { await tryCatch( ctx.run( @@ -142,13 +167,21 @@ async function runWorkerLogic<T, R>( }, ), ); - return res; + return { type: "error", error: res.error }; + } + + const result = res.data; + + if (result.type === "rate_limit") { + // Don't call onError or onComplete for rate limit retries + return result; } + // Success case - call onComplete await tryCatch( - ctx.run("onComplete", async () => await onComplete?.(data, res.data), { + ctx.run("onComplete", async () => await onComplete?.(data, result.value), { maxRetryAttempts: 1, }), ); - return res; + return result; } diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts index 2085d57b..7da3f18e 100644 --- a/packages/plugins/queue-restate/src/tests/queue.test.ts +++ b/packages/plugins/queue-restate/src/tests/queue.test.ts @@ -10,6 +10,7 @@ import { } from "vitest"; import type { Queue, QueueClient } from "@karakeep/shared/queueing"; +import { QueueRetryAfterError } from "@karakeep/shared/queueing"; import { AdminClient } from "../admin"; import { RestateQueueProvider } from "../index"; @@ -49,7 +50,13 @@ type TestAction = | { type: "val"; val: number } | { type: "err"; err: string } | { type: "stall"; durSec: number } - | { type: "semaphore-acquire" }; + | { type: "semaphore-acquire" } + | { + type: "rate-limit"; + val: number; + delayMs: number; + attemptsBeforeSuccess: number; + }; describe("Restate Queue Provider", () => { let queueClient: QueueClient; @@ -62,6 +69,7 @@ describe("Restate Queue Provider", () => { inFlight: 0, maxInFlight: 0, baton: new Baton(), + rateLimitAttempts: new Map<string, number>(), }; async function waitUntilQueueEmpty() { @@ -81,6 +89,7 @@ describe("Restate Queue Provider", () => { testState.inFlight = 0; testState.maxInFlight = 0; testState.baton = new Baton(); + testState.rateLimitAttempts = new Map<string, number>(); }); afterEach(async () => { await waitUntilQueueEmpty(); @@ -133,6 +142,21 @@ describe("Restate Queue Provider", () => { break; case "semaphore-acquire": await testState.baton.acquire(); + break; + case "rate-limit": { + const attemptKey = `${job.id}`; + const currentAttempts = + testState.rateLimitAttempts.get(attemptKey) || 0; + testState.rateLimitAttempts.set(attemptKey, currentAttempts + 1); + + if (currentAttempts < jobData.attemptsBeforeSuccess) { + throw new QueueRetryAfterError( + `Rate limited (attempt ${currentAttempts + 1})`, + jobData.delayMs, + ); + } + return jobData.val; + } } }, onError: async (job) => { @@ -517,4 +541,75 @@ describe("Restate Queue Provider", () => { expect(testState.results).toEqual([102, 101, 100]); }, 60000); }); + + describe("QueueRetryAfterError handling", () => { + it("should retry after delay without counting against retry attempts", async () => { + const startTime = Date.now(); + + // This job will fail with QueueRetryAfterError twice before succeeding + await queue.enqueue({ + type: "rate-limit", + val: 42, + delayMs: 500, // 500ms delay + attemptsBeforeSuccess: 2, // Fail twice, succeed on third try + }); + + await waitUntilQueueEmpty(); + + const duration = Date.now() - startTime; + + // Should have succeeded + expect(testState.results).toEqual([42]); + + // Should have been called 3 times (2 rate limit failures + 1 success) + expect(testState.rateLimitAttempts.size).toBe(1); + const attempts = Array.from(testState.rateLimitAttempts.values())[0]; + expect(attempts).toBe(3); + + // Should have waited at least 1 second total (2 x 500ms delays) + expect(duration).toBeGreaterThanOrEqual(1000); + + // onError should NOT have been called for rate limit retries + expect(testState.errors).toEqual([]); + }, 60000); + + it("should not exhaust retries when rate limited", async () => { + // This job will be rate limited many more times than the retry limit + // but should still eventually succeed + await queue.enqueue({ + type: "rate-limit", + val: 100, + delayMs: 100, // Short delay for faster test + attemptsBeforeSuccess: 10, // Fail 10 times (more than the 3 retry limit) + }); + + await waitUntilQueueEmpty(); + + // Should have succeeded despite being "retried" more than the limit + expect(testState.results).toEqual([100]); + + // Should have been called 11 times (10 rate limit failures + 1 success) + const attempts = Array.from(testState.rateLimitAttempts.values())[0]; + expect(attempts).toBe(11); + + // No errors should have been recorded + expect(testState.errors).toEqual([]); + }, 90000); + + it("should still respect retry limit for non-rate-limit errors", async () => { + // Enqueue a regular error job that should fail permanently + await queue.enqueue({ type: "err", err: "Regular error" }); + + await waitUntilQueueEmpty(); + + // Should have failed 4 times (initial + 3 retries) and not succeeded + expect(testState.errors).toEqual([ + "Regular error", + "Regular error", + "Regular error", + "Regular error", + ]); + expect(testState.results).toEqual([]); + }, 90000); + }); }); |
