diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-11-24 01:23:06 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-11-24 01:23:06 +0000 |
| commit | 54268759492df88644e4279fdcc600214f922f43 (patch) | |
| tree | 610cb784662ca2c7513f67aa7a74f3a8d3e40b3f /packages/plugins/queue-restate/src/tests/queue.test.ts | |
| parent | 38842f77e549fd0946e43a40c65abe0f196c3f04 (diff) | |
| download | karakeep-54268759492df88644e4279fdcc600214f922f43.tar.zst | |
feat: Introduce groupId in restate queue (#2168)
* feat: Introduce groupId in restate queue
* add group ids to the interface
* use last served timestamp
Diffstat (limited to 'packages/plugins/queue-restate/src/tests/queue.test.ts')
| -rw-r--r-- | packages/plugins/queue-restate/src/tests/queue.test.ts | 292 |
1 files changed, 284 insertions, 8 deletions
diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts index d716671b..28e32394 100644 --- a/packages/plugins/queue-restate/src/tests/queue.test.ts +++ b/packages/plugins/queue-restate/src/tests/queue.test.ts @@ -11,14 +11,45 @@ import { import type { Queue, QueueClient } from "@karakeep/shared/queueing"; -import { AdminClient } from "../admin.js"; -import { RestateQueueProvider } from "../index.js"; -import { waitUntil } from "./utils.js"; +import { AdminClient } from "../admin"; +import { RestateQueueProvider } from "../index"; +import { waitUntil } from "./utils"; + +class Baton { + private promise: Promise<void>; + private resolve: () => void; + private waiting = 0; + + constructor() { + this.resolve = () => { + /* empty */ + }; + this.promise = new Promise<void>((resolve) => { + this.resolve = resolve; + }); + } + + async acquire() { + this.waiting++; + await this.promise; + } + + async waitUntilCountWaiting(count: number) { + while (this.waiting < count) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } + } + + release() { + this.resolve(); + } +} type TestAction = | { type: "val"; val: number } | { type: "err"; err: string } - | { type: "stall"; durSec: number }; + | { type: "stall"; durSec: number } + | { type: "semaphore-acquire" }; describe("Restate Queue Provider", () => { let queueClient: QueueClient; @@ -30,6 +61,7 @@ describe("Restate Queue Provider", () => { errors: [] as string[], inFlight: 0, maxInFlight: 0, + baton: new Baton(), }; async function waitUntilQueueEmpty() { @@ -48,6 +80,7 @@ describe("Restate Queue Provider", () => { testState.errors = []; testState.inFlight = 0; testState.maxInFlight = 0; + testState.baton = new Baton(); }); afterEach(async () => { await waitUntilQueueEmpty(); @@ -98,6 +131,8 @@ describe("Restate Queue Provider", () => { setTimeout(resolve, jobData.durSec * 1000), ); break; + case "semaphore-acquire": + await testState.baton.acquire(); } }, onError: async (job) => { @@ -195,12 +230,22 @@ describe("Restate Queue Provider", () => { }, 60000); it("should handle priorities", async () => { - // Hog the queue first + // hog the queue await Promise.all([ - queue.enqueue({ type: "stall", durSec: 1 }, { priority: 0 }), - queue.enqueue({ type: "stall", durSec: 1 }, { priority: 1 }), - queue.enqueue({ type: "stall", durSec: 1 }, { priority: 2 }), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), ]); + await testState.baton.waitUntilCountWaiting(3); // Then those will get reprioritized await Promise.all([ @@ -213,6 +258,10 @@ describe("Restate Queue Provider", () => { queue.enqueue({ type: "val", val: 302 }, { priority: 2 }), ]); + // Wait for all jobs to be enqueued + await new Promise((resolve) => setTimeout(resolve, 1000)); + testState.baton.release(); + await waitUntilQueueEmpty(); expect(testState.results).toEqual([ @@ -220,4 +269,231 @@ describe("Restate Queue Provider", () => { 202, 201, 200, 300, 301, 302, ]); }, 60000); + + describe("Group Fairness", () => { + it("should process jobs from different groups fairly with same priority", async () => { + // hog the queue + await Promise.all([ + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + ]); + await testState.baton.waitUntilCountWaiting(3); + + // Enqueue jobs from two different groups with same priority + // Group A has more jobs + await queue.enqueue( + { type: "val", val: 200 }, + { priority: 0, groupId: "B" }, + ); + await queue.enqueue( + { type: "val", val: 201 }, + { priority: 0, groupId: "B" }, + ); + await queue.enqueue( + { type: "val", val: 100 }, + { priority: 0, groupId: "A" }, + ); + await queue.enqueue( + { type: "val", val: 101 }, + { priority: 0, groupId: "A" }, + ); + await queue.enqueue( + { type: "val", val: 102 }, + { priority: 0, groupId: "A" }, + ); + await queue.enqueue( + { type: "val", val: 103 }, + { priority: 0, groupId: "A" }, + ); + await queue.enqueue( + { type: "val", val: 300 }, + { priority: 0, groupId: "C" }, + ); + await queue.enqueue( + { type: "val", val: 301 }, + { priority: 0, groupId: "C" }, + ); + + // Wait for all jobs to be enqueued + await new Promise((resolve) => setTimeout(resolve, 1000)); + testState.baton.release(); + + await waitUntilQueueEmpty(); + + expect(testState.results).toEqual([ + 200, 100, 300, 201, 101, 301, 102, 103, + ]); + }, 60000); + + it("should respect priority over group fairness", async () => { + // hog the queue + await Promise.all([ + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + ]); + await testState.baton.waitUntilCountWaiting(3); + + await queue.enqueue( + { type: "val", val: 100 }, + { priority: 1, groupId: "A" }, + ); + await queue.enqueue( + { type: "val", val: 101 }, + { priority: 1, groupId: "A" }, + ); + await queue.enqueue( + { type: "val", val: 200 }, + { priority: 0, groupId: "B" }, + ); + await queue.enqueue( + { type: "val", val: 201 }, + { priority: 0, groupId: "B" }, + ); + + // Wait for all jobs to be enqueued + await new Promise((resolve) => setTimeout(resolve, 1000)); + testState.baton.release(); + + await waitUntilQueueEmpty(); + + // Priority 0 (higher) should run before priority 1 (lower) + expect(testState.results).toEqual([200, 201, 100, 101]); + }, 60000); + + it("should handle jobs without groupId", async () => { + // hog the queue + await Promise.all([ + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + ]); + await testState.baton.waitUntilCountWaiting(3); + + // Mix of grouped and ungrouped jobs + await queue.enqueue({ type: "val", val: 100 }, { priority: 0 }); // ungrouped + await queue.enqueue({ type: "val", val: 101 }, { priority: 0 }); // ungrouped + await queue.enqueue( + { type: "val", val: 200 }, + { priority: 0, groupId: "A" }, + ); + await queue.enqueue( + { type: "val", val: 201 }, + { priority: 0, groupId: "A" }, + ); + + // Wait for all jobs to be enqueued + await new Promise((resolve) => setTimeout(resolve, 1000)); + testState.baton.release(); + + await waitUntilQueueEmpty(); + + // All jobs should complete successfully + expect(testState.results).toEqual([100, 200, 101, 201]); + }, 60000); + + it("should work with jobs that don't specify groupId", async () => { + // hog the queue + await Promise.all([ + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + ]); + + await testState.baton.waitUntilCountWaiting(3); + + // These should all go to the default "__ungrouped__" group + await queue.enqueue({ type: "val", val: 1 }, { priority: 0 }); + await queue.enqueue({ type: "val", val: 2 }, { priority: 1 }); + await queue.enqueue({ type: "val", val: 3 }, { priority: -1 }); + + // Wait for all jobs to be enqueued + await new Promise((resolve) => setTimeout(resolve, 1000)); + + testState.baton.release(); + + await waitUntilQueueEmpty(); + + // Should respect priority + expect(testState.results).toEqual([3, 1, 2]); + }, 60000); + + it("should handle same job in same group with different priorities", async () => { + // hog the queue + await Promise.all([ + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + queue.enqueue( + { type: "semaphore-acquire" }, + { groupId: "init", priority: -10 }, + ), + ]); + await testState.baton.waitUntilCountWaiting(3); + + await queue.enqueue( + { type: "val", val: 100 }, + { priority: 2, groupId: "A" }, + ); + await queue.enqueue( + { type: "val", val: 101 }, + { priority: 1, groupId: "A" }, + ); + await queue.enqueue( + { type: "val", val: 102 }, + { priority: 0, groupId: "A" }, + ); + + // Wait for all jobs to be enqueued + await new Promise((resolve) => setTimeout(resolve, 1000)); + testState.baton.release(); + + await waitUntilQueueEmpty(); + + // Should respect priority even within the same group + expect(testState.results).toEqual([102, 101, 100]); + }, 60000); + }); }); |
