diff options
Diffstat (limited to 'packages/plugins/queue-liteque/src')
| -rw-r--r-- | packages/plugins/queue-liteque/src/index.ts | 20 |
1 files changed, 19 insertions, 1 deletions
diff --git a/packages/plugins/queue-liteque/src/index.ts b/packages/plugins/queue-liteque/src/index.ts index 94fa795f..b809d158 100644 --- a/packages/plugins/queue-liteque/src/index.ts +++ b/packages/plugins/queue-liteque/src/index.ts @@ -4,10 +4,12 @@ import { SqliteQueue as LQ, Runner as LQRunner, migrateDB, + RetryAfterError, } from "liteque"; import type { PluginProvider } from "@karakeep/shared/plugins"; import type { + DequeuedJob, EnqueueOptions, Queue, QueueClient, @@ -17,6 +19,7 @@ import type { RunnerOptions, } from "@karakeep/shared/queueing"; import serverConfig from "@karakeep/shared/config"; +import { QueueRetryAfterError } from "@karakeep/shared/queueing"; class LitequeQueueWrapper<T> implements Queue<T> { constructor( @@ -91,10 +94,25 @@ class LitequeQueueClient implements QueueClient { throw new Error(`Queue ${name} not found`); } + // Wrap the run function to translate QueueRetryAfterError to liteque's RetryAfterError + const wrappedRun = async (job: DequeuedJob<T>): Promise<R> => { + try { + return await funcs.run(job); + } catch (error) { + if (error instanceof QueueRetryAfterError) { + // Translate to liteque's native RetryAfterError + // This will cause liteque to retry after the delay without counting against attempts + throw new RetryAfterError(error.delayMs); + } + // Re-throw any other errors + throw error; + } + }; + const runner = new LQRunner<T, R>( wrapper._impl, { - run: funcs.run, + run: wrappedRun, onComplete: funcs.onComplete, onError: funcs.onError, }, |
