diff options
Diffstat (limited to 'packages/plugins/queue-restate/src/service.ts')
| -rw-r--r-- | packages/plugins/queue-restate/src/service.ts | 49 |
1 files changed, 41 insertions, 8 deletions
diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts index 5ba7d1df..2b5716ee 100644 --- a/packages/plugins/queue-restate/src/service.ts +++ b/packages/plugins/queue-restate/src/service.ts @@ -6,6 +6,7 @@ import type { RunnerFuncs, RunnerOptions, } from "@karakeep/shared/queueing"; +import { QueueRetryAfterError } from "@karakeep/shared/queueing"; import { tryCatch } from "@karakeep/shared/tryCatch"; import { genId } from "./idProvider"; @@ -65,7 +66,8 @@ export function buildRestateService<T, R>( ); let lastError: Error | undefined; - for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) { + let runNumber = 0; + while (runNumber <= NUM_RETRIES) { const acquired = await semaphore.acquire( priority, data.groupId, @@ -83,14 +85,24 @@ export function buildRestateService<T, R>( abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000), }); await semaphore.release(); - if (res.error) { + + if (res.type === "rate_limit") { + // Handle rate limit retries without counting against retry attempts + await ctx.sleep(res.delayMs, "rate limit retry"); + // Don't increment runNumber - retry without counting against attempts + continue; + } + + if (res.type === "error") { if (res.error instanceof restate.CancelledError) { throw res.error; } lastError = res.error; // TODO: add backoff - await ctx.sleep(1000); + await ctx.sleep(1000, "error retry"); + runNumber++; } else { + // Success break; } } @@ -105,6 +117,11 @@ export function buildRestateService<T, R>( }); } +type RunResult<R> = + | { type: "success"; value: R } + | { type: "rate_limit"; delayMs: number } + | { type: "error"; error: Error }; + async function runWorkerLogic<T, R>( ctx: restate.Context, { run, onError, onComplete }: RunnerFuncs<T, R>, @@ -116,18 +133,26 @@ async function runWorkerLogic<T, R>( numRetriesLeft: number; abortSignal: AbortSignal; }, -) { +): Promise<RunResult<R>> { const res = await tryCatch( ctx.run( `main logic`, async () => { - return await run(data); + const res = await tryCatch(run(data)); + if (res.error) { + if (res.error instanceof QueueRetryAfterError) { + return { type: "rate_limit" as const, delayMs: res.error.delayMs }; + } + throw res.error; // Rethrow + } + return { type: "success" as const, value: res.data }; }, { maxRetryAttempts: 1, }, ), ); + if (res.error) { await tryCatch( ctx.run( @@ -142,13 +167,21 @@ async function runWorkerLogic<T, R>( }, ), ); - return res; + return { type: "error", error: res.error }; + } + + const result = res.data; + + if (result.type === "rate_limit") { + // Don't call onError or onComplete for rate limit retries + return result; } + // Success case - call onComplete await tryCatch( - ctx.run("onComplete", async () => await onComplete?.(data, res.data), { + ctx.run("onComplete", async () => await onComplete?.(data, result.value), { maxRetryAttempts: 1, }), ); - return res; + return result; } |
