From 99413db0e79a156a1b87eacd3c6a7b83e9df946e Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sat, 8 Nov 2025 14:50:00 +0000 Subject: refactor: consolidate multiple karakeep plugins into one package (#2101) * refactor: consolidate plugin packages into single plugins directory - Create new `packages/plugins` directory with consolidated package.json - Move queue-liteque, queue-restate, and search-meilisearch to subdirectories - Update imports in packages/shared-server/src/plugins.ts - Remove individual plugin package directories - Update shared-server dependency to use @karakeep/plugins This reduces overhead of maintaining multiple separate packages for plugins. * refactor: consolidate plugin config files to root level - Move .oxlintrc.json to packages/plugins root - Move vitest.config.ts to packages/plugins root - Update vitest config paths to work from root - Remove individual config files from plugin subdirectories This reduces configuration duplication across plugin subdirectories. --------- Co-authored-by: Claude --- packages/plugins-queue-restate/src/service.ts | 133 -------------------------- 1 file changed, 133 deletions(-) delete mode 100644 packages/plugins-queue-restate/src/service.ts (limited to 'packages/plugins-queue-restate/src/service.ts') diff --git a/packages/plugins-queue-restate/src/service.ts b/packages/plugins-queue-restate/src/service.ts deleted file mode 100644 index de5b070f..00000000 --- a/packages/plugins-queue-restate/src/service.ts +++ /dev/null @@ -1,133 +0,0 @@ -import * as restate from "@restatedev/restate-sdk"; - -import type { - Queue, - QueueOptions, - RunnerFuncs, - RunnerOptions, -} from "@karakeep/shared/queueing"; -import { tryCatch } from "@karakeep/shared/tryCatch"; - -import { genId } from "./idProvider"; -import { RestateSemaphore } from "./semaphore"; - -export function buildRestateService( - queue: Queue, - funcs: RunnerFuncs, - opts: RunnerOptions, - queueOpts: QueueOptions, -) { - const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries; - return restate.service({ - name: queue.name(), - options: { - inactivityTimeout: { - seconds: opts.timeoutSecs, - }, - }, - handlers: { - run: async ( - ctx: restate.Context, - data: { - payload: T; - priority: number; - }, - ) => { - const id = `${await genId(ctx)}`; - let payload = data.payload; - if (opts.validator) { - const res = opts.validator.safeParse(data.payload); - if (!res.success) { - throw new restate.TerminalError(res.error.message, { - errorCode: 400, - }); - } - payload = res.data; - } - - const priority = data.priority ?? 0; - - const semaphore = new RestateSemaphore( - ctx, - `queue:${queue.name()}`, - opts.concurrency, - ); - - let lastError: Error | undefined; - for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) { - await semaphore.acquire(priority); - const res = await runWorkerLogic(ctx, funcs, { - id, - data: payload, - priority, - runNumber, - numRetriesLeft: NUM_RETRIES - runNumber, - abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000), - }); - await semaphore.release(); - if (res.error) { - lastError = res.error; - // TODO: add backoff - await ctx.sleep(1000); - } else { - break; - } - } - if (lastError) { - throw new restate.TerminalError(lastError.message, { - errorCode: 500, - cause: "cause" in lastError ? lastError.cause : undefined, - }); - } - }, - }, - }); -} - -async function runWorkerLogic( - ctx: restate.Context, - { run, onError, onComplete }: RunnerFuncs, - data: { - id: string; - data: T; - priority: number; - runNumber: number; - numRetriesLeft: number; - abortSignal: AbortSignal; - }, -) { - const res = await tryCatch( - ctx.run( - `main logic`, - async () => { - await run(data); - }, - { - maxRetryAttempts: 1, - }, - ), - ); - if (res.error) { - await tryCatch( - ctx.run( - `onError`, - async () => - onError?.({ - ...data, - error: res.error, - }), - { - maxRetryAttempts: 1, - }, - ), - ); - return res; - } - - await tryCatch( - ctx.run("onComplete", async () => await onComplete?.(data), { - maxRetryAttempts: 1, - }), - ); - return res; -} -- cgit v1.2.3-70-g09d2