diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-10-05 07:04:29 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-10-05 07:04:29 +0100 |
| commit | 74a1f7b6b600d4cb53352dde7def374c3125721a (patch) | |
| tree | 70b79ebae61456f6ff2cb02a37351fa9817fb342 /packages/plugins-queue-restate/src/service.ts | |
| parent | 4a580d713621f99abb8baabc9b847ce039d44842 (diff) | |
| download | karakeep-74a1f7b6b600d4cb53352dde7def374c3125721a.tar.zst | |
feat: Restate-based queue plugin (#2011)
* WIP: Initial restate integration
* add retry
* add delay + idempotency
* implement concurrency limits
* add admin stats
* add todos
* add id provider
* handle onComplete failures
* add tests
* add pub key and fix logging
* add priorities
* fail call after retries
* more fixes
* fix retries left
* some refactoring
* fix package.json
* upgrade sdk
* some test cleanups
Diffstat (limited to 'packages/plugins-queue-restate/src/service.ts')
| -rw-r--r-- | packages/plugins-queue-restate/src/service.ts | 133 |
1 files changed, 133 insertions, 0 deletions
diff --git a/packages/plugins-queue-restate/src/service.ts b/packages/plugins-queue-restate/src/service.ts new file mode 100644 index 00000000..de5b070f --- /dev/null +++ b/packages/plugins-queue-restate/src/service.ts @@ -0,0 +1,133 @@ +import * as restate from "@restatedev/restate-sdk"; + +import type { + Queue, + QueueOptions, + RunnerFuncs, + RunnerOptions, +} from "@karakeep/shared/queueing"; +import { tryCatch } from "@karakeep/shared/tryCatch"; + +import { genId } from "./idProvider"; +import { RestateSemaphore } from "./semaphore"; + +export function buildRestateService<T>( + queue: Queue<T>, + funcs: RunnerFuncs<T>, + opts: RunnerOptions<T>, + queueOpts: QueueOptions, +) { + const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries; + return restate.service({ + name: queue.name(), + options: { + inactivityTimeout: { + seconds: opts.timeoutSecs, + }, + }, + handlers: { + run: async ( + ctx: restate.Context, + data: { + payload: T; + priority: number; + }, + ) => { + const id = `${await genId(ctx)}`; + let payload = data.payload; + if (opts.validator) { + const res = opts.validator.safeParse(data.payload); + if (!res.success) { + throw new restate.TerminalError(res.error.message, { + errorCode: 400, + }); + } + payload = res.data; + } + + const priority = data.priority ?? 0; + + const semaphore = new RestateSemaphore( + ctx, + `queue:${queue.name()}`, + opts.concurrency, + ); + + let lastError: Error | undefined; + for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) { + await semaphore.acquire(priority); + const res = await runWorkerLogic(ctx, funcs, { + id, + data: payload, + priority, + runNumber, + numRetriesLeft: NUM_RETRIES - runNumber, + abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000), + }); + await semaphore.release(); + if (res.error) { + lastError = res.error; + // TODO: add backoff + await ctx.sleep(1000); + } else { + break; + } + } + if (lastError) { + throw new restate.TerminalError(lastError.message, { + errorCode: 500, + cause: "cause" in lastError ? lastError.cause : undefined, + }); + } + }, + }, + }); +} + +async function runWorkerLogic<T>( + ctx: restate.Context, + { run, onError, onComplete }: RunnerFuncs<T>, + data: { + id: string; + data: T; + priority: number; + runNumber: number; + numRetriesLeft: number; + abortSignal: AbortSignal; + }, +) { + const res = await tryCatch( + ctx.run( + `main logic`, + async () => { + await run(data); + }, + { + maxRetryAttempts: 1, + }, + ), + ); + if (res.error) { + await tryCatch( + ctx.run( + `onError`, + async () => + onError?.({ + ...data, + error: res.error, + }), + { + maxRetryAttempts: 1, + }, + ), + ); + return res; + } + + await tryCatch( + ctx.run("onComplete", async () => await onComplete?.(data), { + maxRetryAttempts: 1, + }), + ); + return res; +} |
