aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins-queue-restate/src/service.ts
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-11-08 14:50:00 +0000
committerGitHub <noreply@github.com>2025-11-08 14:50:00 +0000
commit99413db0e79a156a1b87eacd3c6a7b83e9df946e (patch)
tree73f0a5fceb507f75f662a109b00beeb3fa6b16fb /packages/plugins-queue-restate/src/service.ts
parent737b03172c2e063ba311c23d6552418bd2ab1955 (diff)
downloadkarakeep-99413db0e79a156a1b87eacd3c6a7b83e9df946e.tar.zst
refactor: consolidate multiple karakeep plugins into one package (#2101)
* refactor: consolidate plugin packages into single plugins directory - Create new `packages/plugins` directory with consolidated package.json - Move queue-liteque, queue-restate, and search-meilisearch to subdirectories - Update imports in packages/shared-server/src/plugins.ts - Remove individual plugin package directories - Update shared-server dependency to use @karakeep/plugins This reduces overhead of maintaining multiple separate packages for plugins. * refactor: consolidate plugin config files to root level - Move .oxlintrc.json to packages/plugins root - Move vitest.config.ts to packages/plugins root - Update vitest config paths to work from root - Remove individual config files from plugin subdirectories This reduces configuration duplication across plugin subdirectories. --------- Co-authored-by: Claude <noreply@anthropic.com>
Diffstat (limited to 'packages/plugins-queue-restate/src/service.ts')
-rw-r--r--packages/plugins-queue-restate/src/service.ts133
1 files changed, 0 insertions, 133 deletions
diff --git a/packages/plugins-queue-restate/src/service.ts b/packages/plugins-queue-restate/src/service.ts
deleted file mode 100644
index de5b070f..00000000
--- a/packages/plugins-queue-restate/src/service.ts
+++ /dev/null
@@ -1,133 +0,0 @@
-import * as restate from "@restatedev/restate-sdk";
-
-import type {
- Queue,
- QueueOptions,
- RunnerFuncs,
- RunnerOptions,
-} from "@karakeep/shared/queueing";
-import { tryCatch } from "@karakeep/shared/tryCatch";
-
-import { genId } from "./idProvider";
-import { RestateSemaphore } from "./semaphore";
-
-export function buildRestateService<T>(
- queue: Queue<T>,
- funcs: RunnerFuncs<T>,
- opts: RunnerOptions<T>,
- queueOpts: QueueOptions,
-) {
- const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries;
- return restate.service({
- name: queue.name(),
- options: {
- inactivityTimeout: {
- seconds: opts.timeoutSecs,
- },
- },
- handlers: {
- run: async (
- ctx: restate.Context,
- data: {
- payload: T;
- priority: number;
- },
- ) => {
- const id = `${await genId(ctx)}`;
- let payload = data.payload;
- if (opts.validator) {
- const res = opts.validator.safeParse(data.payload);
- if (!res.success) {
- throw new restate.TerminalError(res.error.message, {
- errorCode: 400,
- });
- }
- payload = res.data;
- }
-
- const priority = data.priority ?? 0;
-
- const semaphore = new RestateSemaphore(
- ctx,
- `queue:${queue.name()}`,
- opts.concurrency,
- );
-
- let lastError: Error | undefined;
- for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) {
- await semaphore.acquire(priority);
- const res = await runWorkerLogic(ctx, funcs, {
- id,
- data: payload,
- priority,
- runNumber,
- numRetriesLeft: NUM_RETRIES - runNumber,
- abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000),
- });
- await semaphore.release();
- if (res.error) {
- lastError = res.error;
- // TODO: add backoff
- await ctx.sleep(1000);
- } else {
- break;
- }
- }
- if (lastError) {
- throw new restate.TerminalError(lastError.message, {
- errorCode: 500,
- cause: "cause" in lastError ? lastError.cause : undefined,
- });
- }
- },
- },
- });
-}
-
-async function runWorkerLogic<T>(
- ctx: restate.Context,
- { run, onError, onComplete }: RunnerFuncs<T>,
- data: {
- id: string;
- data: T;
- priority: number;
- runNumber: number;
- numRetriesLeft: number;
- abortSignal: AbortSignal;
- },
-) {
- const res = await tryCatch(
- ctx.run(
- `main logic`,
- async () => {
- await run(data);
- },
- {
- maxRetryAttempts: 1,
- },
- ),
- );
- if (res.error) {
- await tryCatch(
- ctx.run(
- `onError`,
- async () =>
- onError?.({
- ...data,
- error: res.error,
- }),
- {
- maxRetryAttempts: 1,
- },
- ),
- );
- return res;
- }
-
- await tryCatch(
- ctx.run("onComplete", async () => await onComplete?.(data), {
- maxRetryAttempts: 1,
- }),
- );
- return res;
-}