aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins-queue-restate/src/semaphore.ts
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-11-08 14:50:00 +0000
committerGitHub <noreply@github.com>2025-11-08 14:50:00 +0000
commit99413db0e79a156a1b87eacd3c6a7b83e9df946e (patch)
tree73f0a5fceb507f75f662a109b00beeb3fa6b16fb /packages/plugins-queue-restate/src/semaphore.ts
parent737b03172c2e063ba311c23d6552418bd2ab1955 (diff)
downloadkarakeep-99413db0e79a156a1b87eacd3c6a7b83e9df946e.tar.zst
refactor: consolidate multiple karakeep plugins into one package (#2101)
* refactor: consolidate plugin packages into single plugins directory - Create new `packages/plugins` directory with consolidated package.json - Move queue-liteque, queue-restate, and search-meilisearch to subdirectories - Update imports in packages/shared-server/src/plugins.ts - Remove individual plugin package directories - Update shared-server dependency to use @karakeep/plugins This reduces overhead of maintaining multiple separate packages for plugins. * refactor: consolidate plugin config files to root level - Move .oxlintrc.json to packages/plugins root - Move vitest.config.ts to packages/plugins root - Update vitest config paths to work from root - Remove individual config files from plugin subdirectories This reduces configuration duplication across plugin subdirectories. --------- Co-authored-by: Claude <noreply@anthropic.com>
Diffstat (limited to 'packages/plugins-queue-restate/src/semaphore.ts')
-rw-r--r--packages/plugins-queue-restate/src/semaphore.ts109
1 files changed, 0 insertions, 109 deletions
diff --git a/packages/plugins-queue-restate/src/semaphore.ts b/packages/plugins-queue-restate/src/semaphore.ts
deleted file mode 100644
index ad636f98..00000000
--- a/packages/plugins-queue-restate/src/semaphore.ts
+++ /dev/null
@@ -1,109 +0,0 @@
-// Inspired from https://github.com/restatedev/examples/blob/main/typescript/patterns-use-cases/src/priorityqueue/queue.ts
-
-import { Context, object, ObjectContext } from "@restatedev/restate-sdk";
-
-interface QueueItem {
- awakeable: string;
- priority: number;
-}
-
-interface QueueState {
- items: QueueItem[];
- inFlight: number;
-}
-
-export const semaphore = object({
- name: "Semaphore",
- handlers: {
- acquire: async (
- ctx: ObjectContext<QueueState>,
- req: { awakeableId: string; priority: number; capacity: number },
- ): Promise<void> => {
- const state = await getState(ctx);
-
- state.items.push({
- awakeable: req.awakeableId,
- priority: req.priority,
- });
-
- tick(ctx, state, req.capacity);
-
- setState(ctx, state);
- },
-
- release: async (
- ctx: ObjectContext<QueueState>,
- capacity: number,
- ): Promise<void> => {
- const state = await getState(ctx);
- state.inFlight--;
- tick(ctx, state, capacity);
- setState(ctx, state);
- },
- },
- options: {
- ingressPrivate: true,
- },
-});
-
-// Lower numbers represent higher priority, mirroring Liteque’s semantics.
-function selectAndPopItem(items: QueueItem[]): QueueItem {
- let selected = { priority: Number.MAX_SAFE_INTEGER, index: 0 };
- for (const [i, item] of items.entries()) {
- if (item.priority < selected.priority) {
- selected.priority = item.priority;
- selected.index = i;
- }
- }
- const [item] = items.splice(selected.index, 1);
- return item;
-}
-
-function tick(
- ctx: ObjectContext<QueueState>,
- state: QueueState,
- capacity: number,
-) {
- while (state.inFlight < capacity && state.items.length > 0) {
- const item = selectAndPopItem(state.items);
- state.inFlight++;
- ctx.resolveAwakeable(item.awakeable);
- }
-}
-
-async function getState(ctx: ObjectContext<QueueState>): Promise<QueueState> {
- return {
- items: (await ctx.get("items")) ?? [],
- inFlight: (await ctx.get("inFlight")) ?? 0,
- };
-}
-
-function setState(ctx: ObjectContext<QueueState>, state: QueueState) {
- ctx.set("items", state.items);
- ctx.set("inFlight", state.inFlight);
-}
-
-export class RestateSemaphore {
- constructor(
- private readonly ctx: Context,
- private readonly id: string,
- private readonly capacity: number,
- ) {}
-
- async acquire(priority: number) {
- const awk = this.ctx.awakeable();
- await this.ctx
- .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id)
- .acquire({
- awakeableId: awk.id,
- priority,
- capacity: this.capacity,
- });
- await awk.promise;
- }
- async release() {
- await this.ctx
- .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id)
- .release(this.capacity);
- }
-}