From b28cd03a4a5f95f429a1429a59319c8a9ac986f8 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sun, 9 Nov 2025 20:13:39 +0000 Subject: refactor: Allow runner functions to return results to onComplete --- packages/plugins/package.json | 2 +- packages/plugins/queue-liteque/src/index.ts | 16 +++++----------- packages/plugins/queue-restate/src/index.ts | 4 ++-- packages/plugins/queue-restate/src/service.ts | 12 ++++++------ packages/plugins/queue-restate/src/tests/queue.test.ts | 8 +++++--- 5 files changed, 19 insertions(+), 23 deletions(-) (limited to 'packages/plugins') diff --git a/packages/plugins/package.json b/packages/plugins/package.json index 5931d1a7..daf46843 100644 --- a/packages/plugins/package.json +++ b/packages/plugins/package.json @@ -22,7 +22,7 @@ "@karakeep/shared": "workspace:*", "@restatedev/restate-sdk": "^1.9.0", "@restatedev/restate-sdk-clients": "^1.9.0", - "liteque": "^0.6.2", + "liteque": "^0.7.0", "meilisearch": "^0.45.0" }, "devDependencies": { diff --git a/packages/plugins/queue-liteque/src/index.ts b/packages/plugins/queue-liteque/src/index.ts index ddc2181c..94fa795f 100644 --- a/packages/plugins/queue-liteque/src/index.ts +++ b/packages/plugins/queue-liteque/src/index.ts @@ -8,8 +8,6 @@ import { import type { PluginProvider } from "@karakeep/shared/plugins"; import type { - DequeuedJob, - DequeuedJobError, EnqueueOptions, Queue, QueueClient, @@ -82,9 +80,9 @@ class LitequeQueueClient implements QueueClient { return wrapper; } - createRunner( + createRunner( queue: Queue, - funcs: RunnerFuncs, + funcs: RunnerFuncs, opts: RunnerOptions, ): Runner { const name = queue.name(); @@ -93,16 +91,12 @@ class LitequeQueueClient implements QueueClient { throw new Error(`Queue ${name} not found`); } - const runner = new LQRunner( + const runner = new LQRunner( wrapper._impl, { run: funcs.run, - onComplete: funcs.onComplete as - | ((job: DequeuedJob) => Promise) - | undefined, - onError: funcs.onError as - | ((job: DequeuedJobError) => Promise) - | undefined, + onComplete: funcs.onComplete, + onError: funcs.onError, }, { pollIntervalMs: opts.pollIntervalMs ?? 1000, diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts index bedc26af..98668872 100644 --- a/packages/plugins/queue-restate/src/index.ts +++ b/packages/plugins/queue-restate/src/index.ts @@ -162,9 +162,9 @@ class RestateQueueClient implements QueueClient { return wrapper; } - createRunner( + createRunner( queue: Queue, - funcs: RunnerFuncs, + funcs: RunnerFuncs, opts: RunnerOptions, ): Runner { const name = queue.name(); diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts index de5b070f..d19cad4c 100644 --- a/packages/plugins/queue-restate/src/service.ts +++ b/packages/plugins/queue-restate/src/service.ts @@ -11,9 +11,9 @@ import { tryCatch } from "@karakeep/shared/tryCatch"; import { genId } from "./idProvider"; import { RestateSemaphore } from "./semaphore"; -export function buildRestateService( +export function buildRestateService( queue: Queue, - funcs: RunnerFuncs, + funcs: RunnerFuncs, opts: RunnerOptions, queueOpts: QueueOptions, ) { @@ -84,9 +84,9 @@ export function buildRestateService( }); } -async function runWorkerLogic( +async function runWorkerLogic( ctx: restate.Context, - { run, onError, onComplete }: RunnerFuncs, + { run, onError, onComplete }: RunnerFuncs, data: { id: string; data: T; @@ -100,7 +100,7 @@ async function runWorkerLogic( ctx.run( `main logic`, async () => { - await run(data); + return await run(data); }, { maxRetryAttempts: 1, @@ -125,7 +125,7 @@ async function runWorkerLogic( } await tryCatch( - ctx.run("onComplete", async () => await onComplete?.(data), { + ctx.run("onComplete", async () => await onComplete?.(data, res.data), { maxRetryAttempts: 1, }), ); diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts index e59d47cb..d716671b 100644 --- a/packages/plugins/queue-restate/src/tests/queue.test.ts +++ b/packages/plugins/queue-restate/src/tests/queue.test.ts @@ -90,8 +90,7 @@ describe("Restate Queue Provider", () => { const jobData = job.data; switch (jobData.type) { case "val": - testState.results.push(jobData.val); - break; + return jobData.val; case "err": throw new Error(jobData.err); case "stall": @@ -108,8 +107,11 @@ describe("Restate Queue Provider", () => { testState.errors.push(jobData.err); } }, - onComplete: async () => { + onComplete: async (_j, res) => { testState.inFlight--; + if (res) { + testState.results.push(res); + } }, }, { -- cgit v1.2.3-70-g09d2