diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-11-08 14:50:00 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-11-08 14:50:00 +0000 |
| commit | 99413db0e79a156a1b87eacd3c6a7b83e9df946e (patch) | |
| tree | 73f0a5fceb507f75f662a109b00beeb3fa6b16fb /packages/plugins-queue-restate/src/semaphore.ts | |
| parent | 737b03172c2e063ba311c23d6552418bd2ab1955 (diff) | |
| download | karakeep-99413db0e79a156a1b87eacd3c6a7b83e9df946e.tar.zst | |
refactor: consolidate multiple karakeep plugins into one package (#2101)
* refactor: consolidate plugin packages into single plugins directory
- Create new `packages/plugins` directory with consolidated package.json
- Move queue-liteque, queue-restate, and search-meilisearch to subdirectories
- Update imports in packages/shared-server/src/plugins.ts
- Remove individual plugin package directories
- Update shared-server dependency to use @karakeep/plugins
This reduces overhead of maintaining multiple separate packages for plugins.
* refactor: consolidate plugin config files to root level
- Move .oxlintrc.json to packages/plugins root
- Move vitest.config.ts to packages/plugins root
- Update vitest config paths to work from root
- Remove individual config files from plugin subdirectories
This reduces configuration duplication across plugin subdirectories.
---------
Co-authored-by: Claude <noreply@anthropic.com>
Diffstat (limited to 'packages/plugins-queue-restate/src/semaphore.ts')
| -rw-r--r-- | packages/plugins-queue-restate/src/semaphore.ts | 109 |
1 files changed, 0 insertions, 109 deletions
diff --git a/packages/plugins-queue-restate/src/semaphore.ts b/packages/plugins-queue-restate/src/semaphore.ts deleted file mode 100644 index ad636f98..00000000 --- a/packages/plugins-queue-restate/src/semaphore.ts +++ /dev/null @@ -1,109 +0,0 @@ -// Inspired from https://github.com/restatedev/examples/blob/main/typescript/patterns-use-cases/src/priorityqueue/queue.ts - -import { Context, object, ObjectContext } from "@restatedev/restate-sdk"; - -interface QueueItem { - awakeable: string; - priority: number; -} - -interface QueueState { - items: QueueItem[]; - inFlight: number; -} - -export const semaphore = object({ - name: "Semaphore", - handlers: { - acquire: async ( - ctx: ObjectContext<QueueState>, - req: { awakeableId: string; priority: number; capacity: number }, - ): Promise<void> => { - const state = await getState(ctx); - - state.items.push({ - awakeable: req.awakeableId, - priority: req.priority, - }); - - tick(ctx, state, req.capacity); - - setState(ctx, state); - }, - - release: async ( - ctx: ObjectContext<QueueState>, - capacity: number, - ): Promise<void> => { - const state = await getState(ctx); - state.inFlight--; - tick(ctx, state, capacity); - setState(ctx, state); - }, - }, - options: { - ingressPrivate: true, - }, -}); - -// Lower numbers represent higher priority, mirroring Liteque’s semantics. -function selectAndPopItem(items: QueueItem[]): QueueItem { - let selected = { priority: Number.MAX_SAFE_INTEGER, index: 0 }; - for (const [i, item] of items.entries()) { - if (item.priority < selected.priority) { - selected.priority = item.priority; - selected.index = i; - } - } - const [item] = items.splice(selected.index, 1); - return item; -} - -function tick( - ctx: ObjectContext<QueueState>, - state: QueueState, - capacity: number, -) { - while (state.inFlight < capacity && state.items.length > 0) { - const item = selectAndPopItem(state.items); - state.inFlight++; - ctx.resolveAwakeable(item.awakeable); - } -} - -async function getState(ctx: ObjectContext<QueueState>): Promise<QueueState> { - return { - items: (await ctx.get("items")) ?? [], - inFlight: (await ctx.get("inFlight")) ?? 0, - }; -} - -function setState(ctx: ObjectContext<QueueState>, state: QueueState) { - ctx.set("items", state.items); - ctx.set("inFlight", state.inFlight); -} - -export class RestateSemaphore { - constructor( - private readonly ctx: Context, - private readonly id: string, - private readonly capacity: number, - ) {} - - async acquire(priority: number) { - const awk = this.ctx.awakeable(); - await this.ctx - .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id) - .acquire({ - awakeableId: awk.id, - priority, - capacity: this.capacity, - }); - await awk.promise; - } - async release() { - await this.ctx - .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id) - .release(this.capacity); - } -} |
