aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins/queue-restate/src/service.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/plugins/queue-restate/src/service.ts')
-rw-r--r--packages/plugins/queue-restate/src/service.ts49
1 files changed, 41 insertions, 8 deletions
diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts
index 5ba7d1df..2b5716ee 100644
--- a/packages/plugins/queue-restate/src/service.ts
+++ b/packages/plugins/queue-restate/src/service.ts
@@ -6,6 +6,7 @@ import type {
RunnerFuncs,
RunnerOptions,
} from "@karakeep/shared/queueing";
+import { QueueRetryAfterError } from "@karakeep/shared/queueing";
import { tryCatch } from "@karakeep/shared/tryCatch";
import { genId } from "./idProvider";
@@ -65,7 +66,8 @@ export function buildRestateService<T, R>(
);
let lastError: Error | undefined;
- for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) {
+ let runNumber = 0;
+ while (runNumber <= NUM_RETRIES) {
const acquired = await semaphore.acquire(
priority,
data.groupId,
@@ -83,14 +85,24 @@ export function buildRestateService<T, R>(
abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000),
});
await semaphore.release();
- if (res.error) {
+
+ if (res.type === "rate_limit") {
+ // Handle rate limit retries without counting against retry attempts
+ await ctx.sleep(res.delayMs, "rate limit retry");
+ // Don't increment runNumber - retry without counting against attempts
+ continue;
+ }
+
+ if (res.type === "error") {
if (res.error instanceof restate.CancelledError) {
throw res.error;
}
lastError = res.error;
// TODO: add backoff
- await ctx.sleep(1000);
+ await ctx.sleep(1000, "error retry");
+ runNumber++;
} else {
+ // Success
break;
}
}
@@ -105,6 +117,11 @@ export function buildRestateService<T, R>(
});
}
+type RunResult<R> =
+ | { type: "success"; value: R }
+ | { type: "rate_limit"; delayMs: number }
+ | { type: "error"; error: Error };
+
async function runWorkerLogic<T, R>(
ctx: restate.Context,
{ run, onError, onComplete }: RunnerFuncs<T, R>,
@@ -116,18 +133,26 @@ async function runWorkerLogic<T, R>(
numRetriesLeft: number;
abortSignal: AbortSignal;
},
-) {
+): Promise<RunResult<R>> {
const res = await tryCatch(
ctx.run(
`main logic`,
async () => {
- return await run(data);
+ const res = await tryCatch(run(data));
+ if (res.error) {
+ if (res.error instanceof QueueRetryAfterError) {
+ return { type: "rate_limit" as const, delayMs: res.error.delayMs };
+ }
+ throw res.error; // Rethrow
+ }
+ return { type: "success" as const, value: res.data };
},
{
maxRetryAttempts: 1,
},
),
);
+
if (res.error) {
await tryCatch(
ctx.run(
@@ -142,13 +167,21 @@ async function runWorkerLogic<T, R>(
},
),
);
- return res;
+ return { type: "error", error: res.error };
+ }
+
+ const result = res.data;
+
+ if (result.type === "rate_limit") {
+ // Don't call onError or onComplete for rate limit retries
+ return result;
}
+ // Success case - call onComplete
await tryCatch(
- ctx.run("onComplete", async () => await onComplete?.(data, res.data), {
+ ctx.run("onComplete", async () => await onComplete?.(data, result.value), {
maxRetryAttempts: 1,
}),
);
- return res;
+ return result;
}