diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-11-08 14:50:00 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-11-08 14:50:00 +0000 |
| commit | 99413db0e79a156a1b87eacd3c6a7b83e9df946e (patch) | |
| tree | 73f0a5fceb507f75f662a109b00beeb3fa6b16fb /packages/plugins-queue-restate/src/service.ts | |
| parent | 737b03172c2e063ba311c23d6552418bd2ab1955 (diff) | |
| download | karakeep-99413db0e79a156a1b87eacd3c6a7b83e9df946e.tar.zst | |
refactor: consolidate multiple karakeep plugins into one package (#2101)
* refactor: consolidate plugin packages into single plugins directory
- Create new `packages/plugins` directory with consolidated package.json
- Move queue-liteque, queue-restate, and search-meilisearch to subdirectories
- Update imports in packages/shared-server/src/plugins.ts
- Remove individual plugin package directories
- Update shared-server dependency to use @karakeep/plugins
This reduces overhead of maintaining multiple separate packages for plugins.
* refactor: consolidate plugin config files to root level
- Move .oxlintrc.json to packages/plugins root
- Move vitest.config.ts to packages/plugins root
- Update vitest config paths to work from root
- Remove individual config files from plugin subdirectories
This reduces configuration duplication across plugin subdirectories.
---------
Co-authored-by: Claude <noreply@anthropic.com>
Diffstat (limited to 'packages/plugins-queue-restate/src/service.ts')
| -rw-r--r-- | packages/plugins-queue-restate/src/service.ts | 133 |
1 files changed, 0 insertions, 133 deletions
diff --git a/packages/plugins-queue-restate/src/service.ts b/packages/plugins-queue-restate/src/service.ts deleted file mode 100644 index de5b070f..00000000 --- a/packages/plugins-queue-restate/src/service.ts +++ /dev/null @@ -1,133 +0,0 @@ -import * as restate from "@restatedev/restate-sdk"; - -import type { - Queue, - QueueOptions, - RunnerFuncs, - RunnerOptions, -} from "@karakeep/shared/queueing"; -import { tryCatch } from "@karakeep/shared/tryCatch"; - -import { genId } from "./idProvider"; -import { RestateSemaphore } from "./semaphore"; - -export function buildRestateService<T>( - queue: Queue<T>, - funcs: RunnerFuncs<T>, - opts: RunnerOptions<T>, - queueOpts: QueueOptions, -) { - const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries; - return restate.service({ - name: queue.name(), - options: { - inactivityTimeout: { - seconds: opts.timeoutSecs, - }, - }, - handlers: { - run: async ( - ctx: restate.Context, - data: { - payload: T; - priority: number; - }, - ) => { - const id = `${await genId(ctx)}`; - let payload = data.payload; - if (opts.validator) { - const res = opts.validator.safeParse(data.payload); - if (!res.success) { - throw new restate.TerminalError(res.error.message, { - errorCode: 400, - }); - } - payload = res.data; - } - - const priority = data.priority ?? 0; - - const semaphore = new RestateSemaphore( - ctx, - `queue:${queue.name()}`, - opts.concurrency, - ); - - let lastError: Error | undefined; - for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) { - await semaphore.acquire(priority); - const res = await runWorkerLogic(ctx, funcs, { - id, - data: payload, - priority, - runNumber, - numRetriesLeft: NUM_RETRIES - runNumber, - abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000), - }); - await semaphore.release(); - if (res.error) { - lastError = res.error; - // TODO: add backoff - await ctx.sleep(1000); - } else { - break; - } - } - if (lastError) { - throw new restate.TerminalError(lastError.message, { - errorCode: 500, - cause: "cause" in lastError ? lastError.cause : undefined, - }); - } - }, - }, - }); -} - -async function runWorkerLogic<T>( - ctx: restate.Context, - { run, onError, onComplete }: RunnerFuncs<T>, - data: { - id: string; - data: T; - priority: number; - runNumber: number; - numRetriesLeft: number; - abortSignal: AbortSignal; - }, -) { - const res = await tryCatch( - ctx.run( - `main logic`, - async () => { - await run(data); - }, - { - maxRetryAttempts: 1, - }, - ), - ); - if (res.error) { - await tryCatch( - ctx.run( - `onError`, - async () => - onError?.({ - ...data, - error: res.error, - }), - { - maxRetryAttempts: 1, - }, - ), - ); - return res; - } - - await tryCatch( - ctx.run("onComplete", async () => await onComplete?.(data), { - maxRetryAttempts: 1, - }), - ); - return res; -} |
