diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-11-09 20:13:39 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2025-11-09 20:13:39 +0000 |
| commit | b28cd03a4a5f95f429a1429a59319c8a9ac986f8 (patch) | |
| tree | f8b6a50338675bd0073643b5ff31fe447795fad4 | |
| parent | 03161482b44bd67f6eafb3e3d51107811b638d4b (diff) | |
| download | karakeep-b28cd03a4a5f95f429a1429a59319c8a9ac986f8.tar.zst | |
refactor: Allow runner functions to return results to onComplete
| -rw-r--r-- | apps/workers/package.json | 2 | ||||
| -rw-r--r-- | packages/plugins/package.json | 2 | ||||
| -rw-r--r-- | packages/plugins/queue-liteque/src/index.ts | 16 | ||||
| -rw-r--r-- | packages/plugins/queue-restate/src/index.ts | 4 | ||||
| -rw-r--r-- | packages/plugins/queue-restate/src/service.ts | 12 | ||||
| -rw-r--r-- | packages/plugins/queue-restate/src/tests/queue.test.ts | 8 | ||||
| -rw-r--r-- | packages/shared/queueing.ts | 10 | ||||
| -rw-r--r-- | pnpm-lock.yaml | 14 |
8 files changed, 32 insertions, 36 deletions
diff --git a/apps/workers/package.json b/apps/workers/package.json index f35a52f4..ecbc7684 100644 --- a/apps/workers/package.json +++ b/apps/workers/package.json @@ -25,7 +25,7 @@ "https-proxy-agent": "^7.0.6", "ipaddr.js": "^2.2.0", "jsdom": "^24.0.0", - "liteque": "^0.6.2", + "liteque": "^0.7.0", "lru-cache": "^11.2.2", "metascraper": "^5.49.5", "metascraper-amazon": "^5.49.5", diff --git a/packages/plugins/package.json b/packages/plugins/package.json index 5931d1a7..daf46843 100644 --- a/packages/plugins/package.json +++ b/packages/plugins/package.json @@ -22,7 +22,7 @@ "@karakeep/shared": "workspace:*", "@restatedev/restate-sdk": "^1.9.0", "@restatedev/restate-sdk-clients": "^1.9.0", - "liteque": "^0.6.2", + "liteque": "^0.7.0", "meilisearch": "^0.45.0" }, "devDependencies": { diff --git a/packages/plugins/queue-liteque/src/index.ts b/packages/plugins/queue-liteque/src/index.ts index ddc2181c..94fa795f 100644 --- a/packages/plugins/queue-liteque/src/index.ts +++ b/packages/plugins/queue-liteque/src/index.ts @@ -8,8 +8,6 @@ import { import type { PluginProvider } from "@karakeep/shared/plugins"; import type { - DequeuedJob, - DequeuedJobError, EnqueueOptions, Queue, QueueClient, @@ -82,9 +80,9 @@ class LitequeQueueClient implements QueueClient { return wrapper; } - createRunner<T>( + createRunner<T, R = void>( queue: Queue<T>, - funcs: RunnerFuncs<T>, + funcs: RunnerFuncs<T, R>, opts: RunnerOptions<T>, ): Runner<T> { const name = queue.name(); @@ -93,16 +91,12 @@ class LitequeQueueClient implements QueueClient { throw new Error(`Queue ${name} not found`); } - const runner = new LQRunner<T>( + const runner = new LQRunner<T, R>( wrapper._impl, { run: funcs.run, - onComplete: funcs.onComplete as - | ((job: DequeuedJob<T>) => Promise<void>) - | undefined, - onError: funcs.onError as - | ((job: DequeuedJobError<T>) => Promise<void>) - | undefined, + onComplete: funcs.onComplete, + onError: funcs.onError, }, { pollIntervalMs: opts.pollIntervalMs ?? 1000, diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts index bedc26af..98668872 100644 --- a/packages/plugins/queue-restate/src/index.ts +++ b/packages/plugins/queue-restate/src/index.ts @@ -162,9 +162,9 @@ class RestateQueueClient implements QueueClient { return wrapper; } - createRunner<T>( + createRunner<T, R = void>( queue: Queue<T>, - funcs: RunnerFuncs<T>, + funcs: RunnerFuncs<T, R>, opts: RunnerOptions<T>, ): Runner<T> { const name = queue.name(); diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts index de5b070f..d19cad4c 100644 --- a/packages/plugins/queue-restate/src/service.ts +++ b/packages/plugins/queue-restate/src/service.ts @@ -11,9 +11,9 @@ import { tryCatch } from "@karakeep/shared/tryCatch"; import { genId } from "./idProvider"; import { RestateSemaphore } from "./semaphore"; -export function buildRestateService<T>( +export function buildRestateService<T, R>( queue: Queue<T>, - funcs: RunnerFuncs<T>, + funcs: RunnerFuncs<T, R>, opts: RunnerOptions<T>, queueOpts: QueueOptions, ) { @@ -84,9 +84,9 @@ export function buildRestateService<T>( }); } -async function runWorkerLogic<T>( +async function runWorkerLogic<T, R>( ctx: restate.Context, - { run, onError, onComplete }: RunnerFuncs<T>, + { run, onError, onComplete }: RunnerFuncs<T, R>, data: { id: string; data: T; @@ -100,7 +100,7 @@ async function runWorkerLogic<T>( ctx.run( `main logic`, async () => { - await run(data); + return await run(data); }, { maxRetryAttempts: 1, @@ -125,7 +125,7 @@ async function runWorkerLogic<T>( } await tryCatch( - ctx.run("onComplete", async () => await onComplete?.(data), { + ctx.run("onComplete", async () => await onComplete?.(data, res.data), { maxRetryAttempts: 1, }), ); diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts index e59d47cb..d716671b 100644 --- a/packages/plugins/queue-restate/src/tests/queue.test.ts +++ b/packages/plugins/queue-restate/src/tests/queue.test.ts @@ -90,8 +90,7 @@ describe("Restate Queue Provider", () => { const jobData = job.data; switch (jobData.type) { case "val": - testState.results.push(jobData.val); - break; + return jobData.val; case "err": throw new Error(jobData.err); case "stall": @@ -108,8 +107,11 @@ describe("Restate Queue Provider", () => { testState.errors.push(jobData.err); } }, - onComplete: async () => { + onComplete: async (_j, res) => { testState.inFlight--; + if (res) { + testState.results.push(res); + } }, }, { diff --git a/packages/shared/queueing.ts b/packages/shared/queueing.ts index e401972b..ed6759dd 100644 --- a/packages/shared/queueing.ts +++ b/packages/shared/queueing.ts @@ -32,9 +32,9 @@ export interface DequeuedJobError<T> { numRetriesLeft: number; } -export interface RunnerFuncs<T> { - run: (job: DequeuedJob<T>) => Promise<void>; - onComplete?: (job: DequeuedJob<T>) => Promise<void>; +export interface RunnerFuncs<T, R = void> { + run: (job: DequeuedJob<T>) => Promise<R>; + onComplete?: (job: DequeuedJob<T>, result: R) => Promise<void>; onError?: (job: DequeuedJobError<T>) => Promise<void>; } @@ -68,9 +68,9 @@ export interface QueueClient { prepare(): Promise<void>; start(): Promise<void>; createQueue<T>(name: string, options: QueueOptions): Queue<T>; - createRunner<T>( + createRunner<T, R = void>( queue: Queue<T>, - funcs: RunnerFuncs<T>, + funcs: RunnerFuncs<T, R>, opts: RunnerOptions<T>, ): Runner<T>; shutdown?(): Promise<void>; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 47387568..4a920330 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -839,8 +839,8 @@ importers: specifier: ^24.0.0 version: 24.1.3 liteque: - specifier: ^0.6.2 - version: 0.6.2(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.2.2)(better-sqlite3@11.3.0)(kysely@0.28.5)(react@19.1.0) + specifier: ^0.7.0 + version: 0.7.0(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.2.2)(better-sqlite3@11.3.0)(kysely@0.28.5)(react@19.1.0) lru-cache: specifier: ^11.2.2 version: 11.2.2 @@ -1160,8 +1160,8 @@ importers: specifier: ^1.9.0 version: 1.9.0 liteque: - specifier: ^0.6.2 - version: 0.6.2(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.2.2)(better-sqlite3@11.3.0)(kysely@0.28.5)(react@19.1.0) + specifier: ^0.7.0 + version: 0.7.0(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.2.2)(better-sqlite3@11.3.0)(kysely@0.28.5)(react@19.1.0) meilisearch: specifier: ^0.45.0 version: 0.45.0 @@ -9957,8 +9957,8 @@ packages: resolution: {integrity: sha512-wUayTU8MS827Dam6MxgD72Ui+KOSF+u/eIqpatOtjnvgJ0+mnDq33uC2M7J0tPK+upe/DpUAuK4JUU89iBoNKQ==} engines: {node: '>=4'} - liteque@0.6.2: - resolution: {integrity: sha512-q99LW+XXHsy3/r1YFTwEmFz1q9Fd8gd9bnBw03gcIlrXjqcmW3av2OrmSKytwUIMdQwbHB0zI5fk/nC/J6xIUQ==} + liteque@0.7.0: + resolution: {integrity: sha512-3eov7hUb+fsQgX2E1Xl1/egxdjnA8opSh+xmB8iIpJthcHYPvFdyI4UIB7JzXnYbMjiNCKDELrk+AjqGLlvp1Q==} peerDependencies: better-sqlite3: '>=7' @@ -25678,7 +25678,7 @@ snapshots: liquid-json@0.3.1: {} - liteque@0.6.2(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.2.2)(better-sqlite3@11.3.0)(kysely@0.28.5)(react@19.1.0): + liteque@0.7.0(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.2.2)(better-sqlite3@11.3.0)(kysely@0.28.5)(react@19.1.0): dependencies: async-mutex: 0.4.1 better-sqlite3: 11.3.0 |
