Diffstat (limited to 'packages')
-rw-r--r--  packages/plugins/queue-liteque/src/index.ts             | 20
-rw-r--r--  packages/plugins/queue-restate/src/service.ts           | 49
-rw-r--r--  packages/plugins/queue-restate/src/tests/queue.test.ts  | 97
-rw-r--r--  packages/shared/queueing.ts                             | 15
4 files changed, 171 insertions(+), 10 deletions(-)
diff --git a/packages/plugins/queue-liteque/src/index.ts b/packages/plugins/queue-liteque/src/index.ts
index 94fa795f..b809d158 100644
--- a/packages/plugins/queue-liteque/src/index.ts
+++ b/packages/plugins/queue-liteque/src/index.ts
@@ -4,10 +4,12 @@ import {
SqliteQueue as LQ,
Runner as LQRunner,
migrateDB,
+ RetryAfterError,
} from "liteque";
import type { PluginProvider } from "@karakeep/shared/plugins";
import type {
+ DequeuedJob,
EnqueueOptions,
Queue,
QueueClient,
@@ -17,6 +19,7 @@ import type {
RunnerOptions,
} from "@karakeep/shared/queueing";
import serverConfig from "@karakeep/shared/config";
+import { QueueRetryAfterError } from "@karakeep/shared/queueing";
class LitequeQueueWrapper<T> implements Queue<T> {
constructor(
@@ -91,10 +94,25 @@ class LitequeQueueClient implements QueueClient {
throw new Error(`Queue ${name} not found`);
}
+ // Wrap the run function to translate QueueRetryAfterError to liteque's RetryAfterError
+ const wrappedRun = async (job: DequeuedJob<T>): Promise<R> => {
+ try {
+ return await funcs.run(job);
+ } catch (error) {
+ if (error instanceof QueueRetryAfterError) {
+ // Translate to liteque's native RetryAfterError
+ // This will cause liteque to retry after the delay without counting against attempts
+ throw new RetryAfterError(error.delayMs);
+ }
+ // Re-throw any other errors
+ throw error;
+ }
+ };
+
const runner = new LQRunner<T, R>(
wrapper._impl,
{
- run: funcs.run,
+ run: wrappedRun,
onComplete: funcs.onComplete,
onError: funcs.onError,
},
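
For illustration, a minimal sketch of a worker handler that relies on the translation above — the payload shape and the checkProviderQuota helper are hypothetical, not part of this change:

// Hypothetical handler: throwing the shared QueueRetryAfterError from run()
// is caught by wrappedRun above and converted into liteque's RetryAfterError,
// so liteque reschedules the job without consuming a retry attempt.
import { QueueRetryAfterError } from "@karakeep/shared/queueing";

interface InferenceJob {
  bookmarkId: string;
}

declare function checkProviderQuota(): Promise<number>; // hypothetical helper

async function run(job: { id: string; data: InferenceJob }): Promise<void> {
  if ((await checkProviderQuota()) === 0) {
    // Re-run this job after 10s; does not count against the attempt limit.
    throw new QueueRetryAfterError("provider quota exhausted", 10_000);
  }
  // ... actual work on job.data.bookmarkId ...
}
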
diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts
index 5ba7d1df..2b5716ee 100644
--- a/packages/plugins/queue-restate/src/service.ts
+++ b/packages/plugins/queue-restate/src/service.ts
@@ -6,6 +6,7 @@ import type {
RunnerFuncs,
RunnerOptions,
} from "@karakeep/shared/queueing";
+import { QueueRetryAfterError } from "@karakeep/shared/queueing";
import { tryCatch } from "@karakeep/shared/tryCatch";
import { genId } from "./idProvider";
@@ -65,7 +66,8 @@ export function buildRestateService<T, R>(
);
let lastError: Error | undefined;
- for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) {
+ let runNumber = 0;
+ while (runNumber <= NUM_RETRIES) {
const acquired = await semaphore.acquire(
priority,
data.groupId,
@@ -83,14 +85,24 @@ export function buildRestateService<T, R>(
abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000),
});
await semaphore.release();
- if (res.error) {
+
+ if (res.type === "rate_limit") {
+ // Rate limited: sleep for the requested delay, then retry without
+ // incrementing runNumber so the attempt doesn't count against the limit.
+ await ctx.sleep(res.delayMs, "rate limit retry");
+ continue;
+ }
+
+ if (res.type === "error") {
if (res.error instanceof restate.CancelledError) {
throw res.error;
}
lastError = res.error;
// TODO: add backoff
- await ctx.sleep(1000);
+ await ctx.sleep(1000, "error retry");
+ runNumber++;
} else {
+ // Success
break;
}
}
@@ -105,6 +117,11 @@ export function buildRestateService<T, R>(
});
}
+type RunResult<R> =
+ | { type: "success"; value: R }
+ | { type: "rate_limit"; delayMs: number }
+ | { type: "error"; error: Error };
+
async function runWorkerLogic<T, R>(
ctx: restate.Context,
{ run, onError, onComplete }: RunnerFuncs<T, R>,
@@ -116,18 +133,26 @@ async function runWorkerLogic<T, R>(
numRetriesLeft: number;
abortSignal: AbortSignal;
},
-) {
+): Promise<RunResult<R>> {
const res = await tryCatch(
ctx.run(
`main logic`,
async () => {
- return await run(data);
+ const res = await tryCatch(run(data));
+ if (res.error) {
+ if (res.error instanceof QueueRetryAfterError) {
+ return { type: "rate_limit" as const, delayMs: res.error.delayMs };
+ }
+ throw res.error; // Rethrow so ctx.run records a real failure
+ }
+ return { type: "success" as const, value: res.data };
},
{
maxRetryAttempts: 1,
},
),
);
+
if (res.error) {
await tryCatch(
ctx.run(
@@ -142,13 +167,21 @@ async function runWorkerLogic<T, R>(
},
),
);
- return res;
+ return { type: "error", error: res.error };
+ }
+
+ const result = res.data;
+
+ if (result.type === "rate_limit") {
+ // Don't call onError or onComplete for rate limit retries
+ return result;
}
+ // Success case - call onComplete
await tryCatch(
- ctx.run("onComplete", async () => await onComplete?.(data, res.data), {
+ ctx.run("onComplete", async () => await onComplete?.(data, result.value), {
maxRetryAttempts: 1,
}),
);
- return res;
+ return result;
}
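
Abstracted away from Restate, the loop above reduces to the following retry pattern — a self-contained sketch with invented names (runWithRetries, attempt) and a plain setTimeout standing in for ctx.sleep:

// Standalone sketch: "rate_limit" results sleep and loop without incrementing
// the attempt counter; only real errors consume one of the allowed retries.
type RunResult<R> =
  | { type: "success"; value: R }
  | { type: "rate_limit"; delayMs: number }
  | { type: "error"; error: Error };

async function runWithRetries<R>(
  attempt: () => Promise<RunResult<R>>,
  maxRetries: number,
): Promise<R> {
  const sleep = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));
  let lastError: Error | undefined;
  let runNumber = 0;
  while (runNumber <= maxRetries) {
    const res = await attempt();
    if (res.type === "rate_limit") {
      // Sleep for the requested delay; runNumber is not incremented, so this
      // does not consume a retry attempt.
      await sleep(res.delayMs);
      continue;
    }
    if (res.type === "success") {
      return res.value;
    }
    lastError = res.error;
    await sleep(1000); // fixed backoff, mirroring the TODO in the diff
    runNumber++;
  }
  throw lastError ?? new Error("retries exhausted");
}
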
diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts
index 2085d57b..7da3f18e 100644
--- a/packages/plugins/queue-restate/src/tests/queue.test.ts
+++ b/packages/plugins/queue-restate/src/tests/queue.test.ts
@@ -10,6 +10,7 @@ import {
} from "vitest";
import type { Queue, QueueClient } from "@karakeep/shared/queueing";
+import { QueueRetryAfterError } from "@karakeep/shared/queueing";
import { AdminClient } from "../admin";
import { RestateQueueProvider } from "../index";
@@ -49,7 +50,13 @@ type TestAction =
| { type: "val"; val: number }
| { type: "err"; err: string }
| { type: "stall"; durSec: number }
- | { type: "semaphore-acquire" };
+ | { type: "semaphore-acquire" }
+ | {
+ type: "rate-limit";
+ val: number;
+ delayMs: number;
+ attemptsBeforeSuccess: number;
+ };
describe("Restate Queue Provider", () => {
let queueClient: QueueClient;
@@ -62,6 +69,7 @@ describe("Restate Queue Provider", () => {
inFlight: 0,
maxInFlight: 0,
baton: new Baton(),
+ rateLimitAttempts: new Map<string, number>(),
};
async function waitUntilQueueEmpty() {
@@ -81,6 +89,7 @@ describe("Restate Queue Provider", () => {
testState.inFlight = 0;
testState.maxInFlight = 0;
testState.baton = new Baton();
+ testState.rateLimitAttempts = new Map<string, number>();
});
afterEach(async () => {
await waitUntilQueueEmpty();
@@ -133,6 +142,21 @@ describe("Restate Queue Provider", () => {
break;
case "semaphore-acquire":
await testState.baton.acquire();
+ break;
+ case "rate-limit": {
+ const attemptKey = `${job.id}`;
+ const currentAttempts =
+ testState.rateLimitAttempts.get(attemptKey) || 0;
+ testState.rateLimitAttempts.set(attemptKey, currentAttempts + 1);
+
+ if (currentAttempts < jobData.attemptsBeforeSuccess) {
+ throw new QueueRetryAfterError(
+ `Rate limited (attempt ${currentAttempts + 1})`,
+ jobData.delayMs,
+ );
+ }
+ return jobData.val;
+ }
}
},
onError: async (job) => {
@@ -517,4 +541,75 @@ describe("Restate Queue Provider", () => {
expect(testState.results).toEqual([102, 101, 100]);
}, 60000);
});
+
+ describe("QueueRetryAfterError handling", () => {
+ it("should retry after delay without counting against retry attempts", async () => {
+ const startTime = Date.now();
+
+ // This job will fail with QueueRetryAfterError twice before succeeding
+ await queue.enqueue({
+ type: "rate-limit",
+ val: 42,
+ delayMs: 500, // 500ms delay
+ attemptsBeforeSuccess: 2, // Fail twice, succeed on third try
+ });
+
+ await waitUntilQueueEmpty();
+
+ const duration = Date.now() - startTime;
+
+ // Should have succeeded
+ expect(testState.results).toEqual([42]);
+
+ // Should have been called 3 times (2 rate limit failures + 1 success)
+ expect(testState.rateLimitAttempts.size).toBe(1);
+ const attempts = Array.from(testState.rateLimitAttempts.values())[0];
+ expect(attempts).toBe(3);
+
+ // Should have waited at least 1 second total (2 x 500ms delays)
+ expect(duration).toBeGreaterThanOrEqual(1000);
+
+ // onError should NOT have been called for rate limit retries
+ expect(testState.errors).toEqual([]);
+ }, 60000);
+
+ it("should not exhaust retries when rate limited", async () => {
+ // This job will be rate limited many more times than the retry limit
+ // but should still eventually succeed
+ await queue.enqueue({
+ type: "rate-limit",
+ val: 100,
+ delayMs: 100, // Short delay for faster test
+ attemptsBeforeSuccess: 10, // Fail 10 times (more than the 3 retry limit)
+ });
+
+ await waitUntilQueueEmpty();
+
+ // Should have succeeded despite being "retried" more than the limit
+ expect(testState.results).toEqual([100]);
+
+ // Should have been called 11 times (10 rate limit failures + 1 success)
+ const attempts = Array.from(testState.rateLimitAttempts.values())[0];
+ expect(attempts).toBe(11);
+
+ // No errors should have been recorded
+ expect(testState.errors).toEqual([]);
+ }, 90000);
+
+ it("should still respect retry limit for non-rate-limit errors", async () => {
+ // Enqueue a regular error job that should fail permanently
+ await queue.enqueue({ type: "err", err: "Regular error" });
+
+ await waitUntilQueueEmpty();
+
+ // Should have failed 4 times (initial + 3 retries) and not succeeded
+ expect(testState.errors).toEqual([
+ "Regular error",
+ "Regular error",
+ "Regular error",
+ "Regular error",
+ ]);
+ expect(testState.results).toEqual([]);
+ }, 90000);
+ });
});
diff --git a/packages/shared/queueing.ts b/packages/shared/queueing.ts
index 0dd6ed6b..bc2c9cfa 100644
--- a/packages/shared/queueing.ts
+++ b/packages/shared/queueing.ts
@@ -2,6 +2,21 @@ import { ZodType } from "zod";
import { PluginManager, PluginType } from "./plugins";
+/**
+ * Special error that indicates a job should be retried after a delay
+ * without counting against the retry attempts limit.
+ * Useful for handling rate limiting scenarios.
+ */
+export class QueueRetryAfterError extends Error {
+ constructor(
+ message: string,
+ public readonly delayMs: number,
+ ) {
+ super(message);
+ this.name = "QueueRetryAfterError";
+ }
+}
+
export interface EnqueueOptions {
idempotencyKey?: string;
priority?: number;
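
To close the loop, a hedged example of how application code might raise this error — the endpoint and Retry-After handling are illustrative, not part of the diff:

// Illustrative only: convert an HTTP 429 with a Retry-After header into the
// shared error so whichever queue backend is running the job reschedules it.
// Assumes Retry-After is given in seconds (it can also be an HTTP date).
import { QueueRetryAfterError } from "@karakeep/shared/queueing";

async function fetchOrBackOff(url: string): Promise<Response> {
  const resp = await fetch(url);
  if (resp.status === 429) {
    const retryAfterSecs = Number(resp.headers.get("Retry-After") ?? "60");
    throw new QueueRetryAfterError(
      `rate limited by ${url}`,
      retryAfterSecs * 1000,
    );
  }
  return resp;
}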