aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins-queue-restate/src/service.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/plugins-queue-restate/src/service.ts')
-rw-r--r--packages/plugins-queue-restate/src/service.ts133
1 files changed, 0 insertions, 133 deletions
diff --git a/packages/plugins-queue-restate/src/service.ts b/packages/plugins-queue-restate/src/service.ts
deleted file mode 100644
index de5b070f..00000000
--- a/packages/plugins-queue-restate/src/service.ts
+++ /dev/null
@@ -1,133 +0,0 @@
-import * as restate from "@restatedev/restate-sdk";
-
-import type {
- Queue,
- QueueOptions,
- RunnerFuncs,
- RunnerOptions,
-} from "@karakeep/shared/queueing";
-import { tryCatch } from "@karakeep/shared/tryCatch";
-
-import { genId } from "./idProvider";
-import { RestateSemaphore } from "./semaphore";
-
-export function buildRestateService<T>(
- queue: Queue<T>,
- funcs: RunnerFuncs<T>,
- opts: RunnerOptions<T>,
- queueOpts: QueueOptions,
-) {
- const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries;
- return restate.service({
- name: queue.name(),
- options: {
- inactivityTimeout: {
- seconds: opts.timeoutSecs,
- },
- },
- handlers: {
- run: async (
- ctx: restate.Context,
- data: {
- payload: T;
- priority: number;
- },
- ) => {
- const id = `${await genId(ctx)}`;
- let payload = data.payload;
- if (opts.validator) {
- const res = opts.validator.safeParse(data.payload);
- if (!res.success) {
- throw new restate.TerminalError(res.error.message, {
- errorCode: 400,
- });
- }
- payload = res.data;
- }
-
- const priority = data.priority ?? 0;
-
- const semaphore = new RestateSemaphore(
- ctx,
- `queue:${queue.name()}`,
- opts.concurrency,
- );
-
- let lastError: Error | undefined;
- for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) {
- await semaphore.acquire(priority);
- const res = await runWorkerLogic(ctx, funcs, {
- id,
- data: payload,
- priority,
- runNumber,
- numRetriesLeft: NUM_RETRIES - runNumber,
- abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000),
- });
- await semaphore.release();
- if (res.error) {
- lastError = res.error;
- // TODO: add backoff
- await ctx.sleep(1000);
- } else {
- break;
- }
- }
- if (lastError) {
- throw new restate.TerminalError(lastError.message, {
- errorCode: 500,
- cause: "cause" in lastError ? lastError.cause : undefined,
- });
- }
- },
- },
- });
-}
-
-async function runWorkerLogic<T>(
- ctx: restate.Context,
- { run, onError, onComplete }: RunnerFuncs<T>,
- data: {
- id: string;
- data: T;
- priority: number;
- runNumber: number;
- numRetriesLeft: number;
- abortSignal: AbortSignal;
- },
-) {
- const res = await tryCatch(
- ctx.run(
- `main logic`,
- async () => {
- await run(data);
- },
- {
- maxRetryAttempts: 1,
- },
- ),
- );
- if (res.error) {
- await tryCatch(
- ctx.run(
- `onError`,
- async () =>
- onError?.({
- ...data,
- error: res.error,
- }),
- {
- maxRetryAttempts: 1,
- },
- ),
- );
- return res;
- }
-
- await tryCatch(
- ctx.run("onComplete", async () => await onComplete?.(data), {
- maxRetryAttempts: 1,
- }),
- );
- return res;
-}