diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-11-09 20:13:39 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2025-11-09 20:13:39 +0000 |
| commit | b28cd03a4a5f95f429a1429a59319c8a9ac986f8 (patch) | |
| tree | f8b6a50338675bd0073643b5ff31fe447795fad4 /packages/plugins/queue-restate/src | |
| parent | 03161482b44bd67f6eafb3e3d51107811b638d4b (diff) | |
| download | karakeep-b28cd03a4a5f95f429a1429a59319c8a9ac986f8.tar.zst | |
refactor: Allow runner functions to return results to onComplete
Diffstat (limited to 'packages/plugins/queue-restate/src')
| -rw-r--r-- | packages/plugins/queue-restate/src/index.ts | 4 | ||||
| -rw-r--r-- | packages/plugins/queue-restate/src/service.ts | 12 | ||||
| -rw-r--r-- | packages/plugins/queue-restate/src/tests/queue.test.ts | 8 |
3 files changed, 13 insertions, 11 deletions
diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts index bedc26af..98668872 100644 --- a/packages/plugins/queue-restate/src/index.ts +++ b/packages/plugins/queue-restate/src/index.ts @@ -162,9 +162,9 @@ class RestateQueueClient implements QueueClient { return wrapper; } - createRunner<T>( + createRunner<T, R = void>( queue: Queue<T>, - funcs: RunnerFuncs<T>, + funcs: RunnerFuncs<T, R>, opts: RunnerOptions<T>, ): Runner<T> { const name = queue.name(); diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts index de5b070f..d19cad4c 100644 --- a/packages/plugins/queue-restate/src/service.ts +++ b/packages/plugins/queue-restate/src/service.ts @@ -11,9 +11,9 @@ import { tryCatch } from "@karakeep/shared/tryCatch"; import { genId } from "./idProvider"; import { RestateSemaphore } from "./semaphore"; -export function buildRestateService<T>( +export function buildRestateService<T, R>( queue: Queue<T>, - funcs: RunnerFuncs<T>, + funcs: RunnerFuncs<T, R>, opts: RunnerOptions<T>, queueOpts: QueueOptions, ) { @@ -84,9 +84,9 @@ export function buildRestateService<T>( }); } -async function runWorkerLogic<T>( +async function runWorkerLogic<T, R>( ctx: restate.Context, - { run, onError, onComplete }: RunnerFuncs<T>, + { run, onError, onComplete }: RunnerFuncs<T, R>, data: { id: string; data: T; @@ -100,7 +100,7 @@ async function runWorkerLogic<T>( ctx.run( `main logic`, async () => { - await run(data); + return await run(data); }, { maxRetryAttempts: 1, @@ -125,7 +125,7 @@ async function runWorkerLogic<T>( } await tryCatch( - ctx.run("onComplete", async () => await onComplete?.(data), { + ctx.run("onComplete", async () => await onComplete?.(data, res.data), { maxRetryAttempts: 1, }), ); diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts index e59d47cb..d716671b 100644 --- a/packages/plugins/queue-restate/src/tests/queue.test.ts +++ b/packages/plugins/queue-restate/src/tests/queue.test.ts @@ -90,8 +90,7 @@ describe("Restate Queue Provider", () => { const jobData = job.data; switch (jobData.type) { case "val": - testState.results.push(jobData.val); - break; + return jobData.val; case "err": throw new Error(jobData.err); case "stall": @@ -108,8 +107,11 @@ describe("Restate Queue Provider", () => { testState.errors.push(jobData.err); } }, - onComplete: async () => { + onComplete: async (_j, res) => { testState.inFlight--; + if (res) { + testState.results.push(res); + } }, }, { |
