aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins/queue-restate/src/service.ts
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--packages/plugins/queue-restate/src/service.ts154
1 files changed, 13 insertions, 141 deletions
diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts
index 5ba7d1df..139e5738 100644
--- a/packages/plugins/queue-restate/src/service.ts
+++ b/packages/plugins/queue-restate/src/service.ts
@@ -1,154 +1,26 @@
-import * as restate from "@restatedev/restate-sdk";
-
import type {
Queue,
QueueOptions,
RunnerFuncs,
RunnerOptions,
} from "@karakeep/shared/queueing";
-import { tryCatch } from "@karakeep/shared/tryCatch";
-import { genId } from "./idProvider";
-import { RestateSemaphore } from "./semaphore";
+import { buildDispatcherService } from "./dispatcher";
+import { buildRunnerService } from "./runner";
+
+export interface RestateServicePair<T, R> {
+ dispatcher: ReturnType<typeof buildDispatcherService<T, R>>;
+ runner: ReturnType<typeof buildRunnerService<T, R>>;
+}
-export function buildRestateService<T, R>(
+export function buildRestateServices<T, R>(
queue: Queue<T>,
funcs: RunnerFuncs<T, R>,
opts: RunnerOptions<T>,
queueOpts: QueueOptions,
-) {
- const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries;
- return restate.service({
- name: queue.name(),
- options: {
- inactivityTimeout: {
- seconds: opts.timeoutSecs,
- },
- retryPolicy: {
- maxAttempts: NUM_RETRIES,
- initialInterval: {
- seconds: 5,
- },
- maxInterval: {
- minutes: 1,
- },
- },
- },
- handlers: {
- run: async (
- ctx: restate.Context,
- data: {
- payload: T;
- queuedIdempotencyKey?: string;
- priority: number;
- groupId?: string;
- },
- ) => {
- const id = `${await genId(ctx)}`;
- let payload = data.payload;
- if (opts.validator) {
- const res = opts.validator.safeParse(data.payload);
- if (!res.success) {
- throw new restate.TerminalError(res.error.message, {
- errorCode: 400,
- });
- }
- payload = res.data;
- }
-
- const priority = data.priority ?? 0;
-
- const semaphore = new RestateSemaphore(
- ctx,
- `queue:${queue.name()}`,
- opts.concurrency,
- );
-
- let lastError: Error | undefined;
- for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) {
- const acquired = await semaphore.acquire(
- priority,
- data.groupId,
- data.queuedIdempotencyKey,
- );
- if (!acquired) {
- return;
- }
- const res = await runWorkerLogic(ctx, funcs, {
- id,
- data: payload,
- priority,
- runNumber,
- numRetriesLeft: NUM_RETRIES - runNumber,
- abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000),
- });
- await semaphore.release();
- if (res.error) {
- if (res.error instanceof restate.CancelledError) {
- throw res.error;
- }
- lastError = res.error;
- // TODO: add backoff
- await ctx.sleep(1000);
- } else {
- break;
- }
- }
- if (lastError) {
- throw new restate.TerminalError(lastError.message, {
- errorCode: 500,
- cause: "cause" in lastError ? lastError.cause : undefined,
- });
- }
- },
- },
- });
-}
-
-async function runWorkerLogic<T, R>(
- ctx: restate.Context,
- { run, onError, onComplete }: RunnerFuncs<T, R>,
- data: {
- id: string;
- data: T;
- priority: number;
- runNumber: number;
- numRetriesLeft: number;
- abortSignal: AbortSignal;
- },
-) {
- const res = await tryCatch(
- ctx.run(
- `main logic`,
- async () => {
- return await run(data);
- },
- {
- maxRetryAttempts: 1,
- },
- ),
- );
- if (res.error) {
- await tryCatch(
- ctx.run(
- `onError`,
- async () =>
- onError?.({
- ...data,
- error: res.error,
- }),
- {
- maxRetryAttempts: 1,
- },
- ),
- );
- return res;
- }
-
- await tryCatch(
- ctx.run("onComplete", async () => await onComplete?.(data, res.data), {
- maxRetryAttempts: 1,
- }),
- );
- return res;
+): RestateServicePair<T, R> {
+ return {
+ dispatcher: buildDispatcherService<T, R>(queue, opts, queueOpts),
+ runner: buildRunnerService<T, R>(queue.name(), funcs, opts),
+ };
}