aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins/queue-liteque
diff options
context:
space:
mode:
Diffstat (limited to 'packages/plugins/queue-liteque')
-rw-r--r--packages/plugins/queue-liteque/src/index.ts20
1 files changed, 19 insertions, 1 deletions
diff --git a/packages/plugins/queue-liteque/src/index.ts b/packages/plugins/queue-liteque/src/index.ts
index 94fa795f..b809d158 100644
--- a/packages/plugins/queue-liteque/src/index.ts
+++ b/packages/plugins/queue-liteque/src/index.ts
@@ -4,10 +4,12 @@ import {
SqliteQueue as LQ,
Runner as LQRunner,
migrateDB,
+ RetryAfterError,
} from "liteque";
import type { PluginProvider } from "@karakeep/shared/plugins";
import type {
+ DequeuedJob,
EnqueueOptions,
Queue,
QueueClient,
@@ -17,6 +19,7 @@ import type {
RunnerOptions,
} from "@karakeep/shared/queueing";
import serverConfig from "@karakeep/shared/config";
+import { QueueRetryAfterError } from "@karakeep/shared/queueing";
class LitequeQueueWrapper<T> implements Queue<T> {
constructor(
@@ -91,10 +94,25 @@ class LitequeQueueClient implements QueueClient {
throw new Error(`Queue ${name} not found`);
}
+ // Wrap the run function to translate QueueRetryAfterError to liteque's RetryAfterError
+ const wrappedRun = async (job: DequeuedJob<T>): Promise<R> => {
+ try {
+ return await funcs.run(job);
+ } catch (error) {
+ if (error instanceof QueueRetryAfterError) {
+ // Translate to liteque's native RetryAfterError
+ // This will cause liteque to retry after the delay without counting against attempts
+ throw new RetryAfterError(error.delayMs);
+ }
+ // Re-throw any other errors
+ throw error;
+ }
+ };
+
const runner = new LQRunner<T, R>(
wrapper._impl,
{
- run: funcs.run,
+ run: wrappedRun,
onComplete: funcs.onComplete,
onError: funcs.onError,
},