diff options
Diffstat (limited to 'packages/plugins')
| -rw-r--r-- | packages/plugins/queue-restate/src/index.ts | 2 | ||||
| -rw-r--r-- | packages/plugins/queue-restate/src/semaphore.ts | 113 | ||||
| -rw-r--r-- | packages/plugins/queue-restate/src/service.ts | 3 | ||||
| -rw-r--r-- | packages/plugins/queue-restate/src/tests/queue.test.ts | 292 |
4 files changed, 380 insertions, 30 deletions
diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts index 98668872..de37bbea 100644 --- a/packages/plugins/queue-restate/src/index.ts +++ b/packages/plugins/queue-restate/src/index.ts @@ -41,6 +41,7 @@ class RestateQueueWrapper<T> implements Queue<T> { data: { payload: T; priority: number; + groupId?: string; }, ) => Promise<void>; } @@ -49,6 +50,7 @@ class RestateQueueWrapper<T> implements Queue<T> { { payload, priority: options?.priority ?? 0, + groupId: options?.groupId, }, restateClient.rpc.sendOpts({ delay: options?.delayMs diff --git a/packages/plugins/queue-restate/src/semaphore.ts b/packages/plugins/queue-restate/src/semaphore.ts index 152604dc..9acecf28 100644 --- a/packages/plugins/queue-restate/src/semaphore.ts +++ b/packages/plugins/queue-restate/src/semaphore.ts @@ -8,21 +8,47 @@ interface QueueItem { priority: number; } -interface QueueState { +interface LegacyQueueState { items: QueueItem[]; + itemsv2: Record<string, GroupState>; + inFlight: number; +} + +interface QueueState { + groups: Record<string, GroupState>; inFlight: number; } +interface GroupState { + id: string; + items: QueueItem[]; + lastServedTimestamp: number; +} + export const semaphore = object({ name: "Semaphore", handlers: { acquire: async ( - ctx: ObjectContext<QueueState>, - req: { awakeableId: string; priority: number; capacity: number }, + ctx: ObjectContext<LegacyQueueState>, + req: { + awakeableId: string; + priority: number; + capacity: number; + groupId?: string; + }, ): Promise<void> => { const state = await getState(ctx); + req.groupId = req.groupId ?? "__ungrouped__"; + + if (state.groups[req.groupId] === undefined) { + state.groups[req.groupId] = { + id: req.groupId, + items: [], + lastServedTimestamp: Date.now(), + }; + } - state.items.push({ + state.groups[req.groupId].items.push({ awakeable: req.awakeableId, priority: req.priority, }); @@ -33,7 +59,7 @@ export const semaphore = object({ }, release: async ( - ctx: ObjectContext<QueueState>, + ctx: ObjectContext<LegacyQueueState>, capacity: number, ): Promise<void> => { const state = await getState(ctx); @@ -49,40 +75,84 @@ export const semaphore = object({ }); // Lower numbers represent higher priority, mirroring Liteque’s semantics. -function selectAndPopItem(items: QueueItem[]): QueueItem { - let selected = { priority: Number.MAX_SAFE_INTEGER, index: 0 }; - for (const [i, item] of items.entries()) { - if (item.priority < selected.priority) { - selected.priority = item.priority; - selected.index = i; +function selectAndPopItem(state: QueueState): { + item: QueueItem; + groupId: string; +} { + let selected: { + priority: number; + groupId: string; + index: number; + groupLastServedTimestamp: number; + } = { + priority: Number.MAX_SAFE_INTEGER, + groupId: "", + index: 0, + groupLastServedTimestamp: 0, + }; + + for (const [groupId, group] of Object.entries(state.groups)) { + for (const [i, item] of group.items.entries()) { + if (item.priority < selected.priority) { + selected.priority = item.priority; + selected.groupId = groupId; + selected.index = i; + selected.groupLastServedTimestamp = group.lastServedTimestamp; + } else if (item.priority === selected.priority) { + if (group.lastServedTimestamp < selected.groupLastServedTimestamp) { + selected.priority = item.priority; + selected.groupId = groupId; + selected.index = i; + selected.groupLastServedTimestamp = group.lastServedTimestamp; + } + } } } - const [item] = items.splice(selected.index, 1); - return item; + + const [item] = state.groups[selected.groupId].items.splice(selected.index, 1); + state.groups[selected.groupId].lastServedTimestamp = Date.now(); + if (state.groups[selected.groupId].items.length === 0) { + delete state.groups[selected.groupId]; + } + return { item, groupId: selected.groupId }; } function tick( - ctx: ObjectContext<QueueState>, + ctx: ObjectContext<LegacyQueueState>, state: QueueState, capacity: number, ) { - while (state.inFlight < capacity && state.items.length > 0) { - const item = selectAndPopItem(state.items); + while (state.inFlight < capacity && Object.keys(state.groups).length > 0) { + const { item } = selectAndPopItem(state); state.inFlight++; ctx.resolveAwakeable(item.awakeable); } } -async function getState(ctx: ObjectContext<QueueState>): Promise<QueueState> { +async function getState( + ctx: ObjectContext<LegacyQueueState>, +): Promise<QueueState> { + const groups = (await ctx.get("itemsv2")) ?? {}; + const items = (await ctx.get("items")) ?? []; + + if (items.length > 0) { + groups["__legacy__"] = { + id: "__legacy__", + items, + lastServedTimestamp: 0, + }; + } + return { - items: (await ctx.get("items")) ?? [], + groups, inFlight: (await ctx.get("inFlight")) ?? 0, }; } -function setState(ctx: ObjectContext<QueueState>, state: QueueState) { - ctx.set("items", state.items); +function setState(ctx: ObjectContext<LegacyQueueState>, state: QueueState) { + ctx.set("itemsv2", state.groups); ctx.set("inFlight", state.inFlight); + ctx.clear("items"); } export class RestateSemaphore { @@ -92,7 +162,7 @@ export class RestateSemaphore { private readonly capacity: number, ) {} - async acquire(priority: number) { + async acquire(priority: number, groupId?: string) { const awk = this.ctx.awakeable(); await this.ctx .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id) @@ -100,6 +170,7 @@ export class RestateSemaphore { awakeableId: awk.id, priority, capacity: this.capacity, + groupId, }); try { diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts index 06ed97f5..b26f66cf 100644 --- a/packages/plugins/queue-restate/src/service.ts +++ b/packages/plugins/queue-restate/src/service.ts @@ -40,6 +40,7 @@ export function buildRestateService<T, R>( data: { payload: T; priority: number; + groupId?: string; }, ) => { const id = `${await genId(ctx)}`; @@ -64,7 +65,7 @@ export function buildRestateService<T, R>( let lastError: Error | undefined; for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) { - await semaphore.acquire(priority); + await semaphore.acquire(priority, data.groupId); const res = await runWorkerLogic(ctx, funcs, { id, data: payload, diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts index d716671b..28e32394 100644 --- a/packages/plugins/queue-restate/src/tests/queue.test.ts +++ b/packages/plugins/queue-restate/src/tests/queue.test.ts @@ -11,14 +11,45 @@ import { import type { Queue, QueueClient } from "@karakeep/shared/queueing"; -import { AdminClient } from "../admin.js"; -import { RestateQueueProvider } from "../index.js"; -import { waitUntil } from "./utils.js"; +import { AdminClient } from "../admin"; +import { RestateQueueProvider } from "../index"; +import { waitUntil } from "./utils"; + +class Baton { + private promise: Promise<void>; + private resolve: () => void; + private waiting = 0; + + constructor() { + this.resolve = () => { + /* empty */ + }; + this.promise = new Promise<void>((resolve) => { + this.resolve = resolve; + }); + } + + async acquire() { + this.waiting++; + await this.promise; + } + + async waitUntilCountWaiting(count: number) { + while (this.waiting < count) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } + } + + release() { + this.resolve(); + } +} type TestAction = | { type: "val"; val: number } | { type: "err"; err: string } - | { type: "stall"; durSec: number }; + | { type: "stall"; durSec: number } + | { type: "semaphore-acquire" }; describe("Restate Queue Provider", () => { let queueClient: QueueClient; @@ -30,6 +61,7 @@ describe("Restate Queue Provider", () => { errors: [] as string[], inFlight: 0, maxInFlight: 0, + baton: new Baton(), }; async function waitUntilQueueEmpty() { @@ -48,6 +80,7 @@ describe("Restate Queue Provider", () => { testState.errors = []; testState.inFlight = 0; testState.maxInFlight = 0; + testState.baton = new Baton(); }); afterEach(async () => { await waitUntilQueueEmpty(); @@ -98,6 +131,8 @@ describe("Restate Queue Provider", () => { setTimeout(resolve, jobData.durSec * 1000), ); break; + case "semaphore-acquire": + await testState.baton.acquire(); } }, onError: async (job) => { @@ -195,12 +230,22 @@ describe("Restate Queue Provider", () => { }, 60000); it("should handle priorities", async () => { - // Hog the queue first + // hog the queue await Promise.all([ - queue.enqueue({ type: "stall", durSec: 1 }, { priority: 0 }), - queue.enqueue({ type: "stall", durSec: 1 }, { priority: 1 }), - queue.enqueue({ type: "stall", durSec: 1 }, { priority: 2 }), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), ]); + await testState.baton.waitUntilCountWaiting(3); // Then those will get reprioritized await Promise.all([ @@ -213,6 +258,10 @@ describe("Restate Queue Provider", () => { queue.enqueue({ type: "val", val: 302 }, { priority: 2 }), ]); + // Wait for all jobs to be enqueued + await new Promise((resolve) => setTimeout(resolve, 1000)); + testState.baton.release(); + await waitUntilQueueEmpty(); expect(testState.results).toEqual([ @@ -220,4 +269,231 @@ describe("Restate Queue Provider", () => { 202, 201, 200, 300, 301, 302, ]); }, 60000); + + describe("Group Fairness", () => { + it("should process jobs from different groups fairly with same priority", async () => { + // hog the queue + await Promise.all([ + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + ]); + await testState.baton.waitUntilCountWaiting(3); + + // Enqueue jobs from two different groups with same priority + // Group A has more jobs + await queue.enqueue( + { type: "val", val: 200 }, + { priority: 0, groupId: "B" }, + ); + await queue.enqueue( + { type: "val", val: 201 }, + { priority: 0, groupId: "B" }, + ); + await queue.enqueue( + { type: "val", val: 100 }, + { priority: 0, groupId: "A" }, + ); + await queue.enqueue( + { type: "val", val: 101 }, + { priority: 0, groupId: "A" }, + ); + await queue.enqueue( + { type: "val", val: 102 }, + { priority: 0, groupId: "A" }, + ); + await queue.enqueue( + { type: "val", val: 103 }, + { priority: 0, groupId: "A" }, + ); + await queue.enqueue( + { type: "val", val: 300 }, + { priority: 0, groupId: "C" }, + ); + await queue.enqueue( + { type: "val", val: 301 }, + { priority: 0, groupId: "C" }, + ); + + // Wait for all jobs to be enqueued + await new Promise((resolve) => setTimeout(resolve, 1000)); + testState.baton.release(); + + await waitUntilQueueEmpty(); + + expect(testState.results).toEqual([ + 200, 100, 300, 201, 101, 301, 102, 103, + ]); + }, 60000); + + it("should respect priority over group fairness", async () => { + // hog the queue + await Promise.all([ + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + ]); + await testState.baton.waitUntilCountWaiting(3); + + await queue.enqueue( + { type: "val", val: 100 }, + { priority: 1, groupId: "A" }, + ); + await queue.enqueue( + { type: "val", val: 101 }, + { priority: 1, groupId: "A" }, + ); + await queue.enqueue( + { type: "val", val: 200 }, + { priority: 0, groupId: "B" }, + ); + await queue.enqueue( + { type: "val", val: 201 }, + { priority: 0, groupId: "B" }, + ); + + // Wait for all jobs to be enqueued + await new Promise((resolve) => setTimeout(resolve, 1000)); + testState.baton.release(); + + await waitUntilQueueEmpty(); + + // Priority 0 (higher) should run before priority 1 (lower) + expect(testState.results).toEqual([200, 201, 100, 101]); + }, 60000); + + it("should handle jobs without groupId", async () => { + // hog the queue + await Promise.all([ + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + ]); + await testState.baton.waitUntilCountWaiting(3); + + // Mix of grouped and ungrouped jobs + await queue.enqueue({ type: "val", val: 100 }, { priority: 0 }); // ungrouped + await queue.enqueue({ type: "val", val: 101 }, { priority: 0 }); // ungrouped + await queue.enqueue( + { type: "val", val: 200 }, + { priority: 0, groupId: "A" }, + ); + await queue.enqueue( + { type: "val", val: 201 }, + { priority: 0, groupId: "A" }, + ); + + // Wait for all jobs to be enqueued + await new Promise((resolve) => setTimeout(resolve, 1000)); + testState.baton.release(); + + await waitUntilQueueEmpty(); + + // All jobs should complete successfully + expect(testState.results).toEqual([100, 200, 101, 201]); + }, 60000); + + it("should work with jobs that don't specify groupId", async () => { + // hog the queue + await Promise.all([ + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + ]); + + await testState.baton.waitUntilCountWaiting(3); + + // These should all go to the default "__ungrouped__" group + await queue.enqueue({ type: "val", val: 1 }, { priority: 0 }); + await queue.enqueue({ type: "val", val: 2 }, { priority: 1 }); + await queue.enqueue({ type: "val", val: 3 }, { priority: -1 }); + + // Wait for all jobs to be enqueued + await new Promise((resolve) => setTimeout(resolve, 1000)); + + testState.baton.release(); + + await waitUntilQueueEmpty(); + + // Should respect priority + expect(testState.results).toEqual([3, 1, 2]); + }, 60000); + + it("should handle same job in same group with different priorities", async () => { + // hog the queue + await Promise.all([ + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + ]); + await testState.baton.waitUntilCountWaiting(3); + + await queue.enqueue( + { type: "val", val: 100 }, + { priority: 2, groupId: "A" }, + ); + await queue.enqueue( + { type: "val", val: 101 }, + { priority: 1, groupId: "A" }, + ); + await queue.enqueue( + { type: "val", val: 102 }, + { priority: 0, groupId: "A" }, + ); + + // Wait for all jobs to be enqueued + await new Promise((resolve) => setTimeout(resolve, 1000)); + testState.baton.release(); + + await waitUntilQueueEmpty(); + + // Should respect priority even within the same group + expect(testState.results).toEqual([102, 101, 100]); + }, 60000); + }); }); |
