aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins/queue-restate/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'packages/plugins/queue-restate/src/tests')
-rw-r--r--packages/plugins/queue-restate/src/tests/queue.test.ts292
1 files changed, 284 insertions, 8 deletions
diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts
index d716671b..28e32394 100644
--- a/packages/plugins/queue-restate/src/tests/queue.test.ts
+++ b/packages/plugins/queue-restate/src/tests/queue.test.ts
@@ -11,14 +11,45 @@ import {
import type { Queue, QueueClient } from "@karakeep/shared/queueing";
-import { AdminClient } from "../admin.js";
-import { RestateQueueProvider } from "../index.js";
-import { waitUntil } from "./utils.js";
+import { AdminClient } from "../admin";
+import { RestateQueueProvider } from "../index";
+import { waitUntil } from "./utils";
+
+class Baton {
+ private promise: Promise<void>;
+ private resolve: () => void;
+ private waiting = 0;
+
+ constructor() {
+ this.resolve = () => {
+ /* empty */
+ };
+ this.promise = new Promise<void>((resolve) => {
+ this.resolve = resolve;
+ });
+ }
+
+ async acquire() {
+ this.waiting++;
+ await this.promise;
+ }
+
+ async waitUntilCountWaiting(count: number) {
+ while (this.waiting < count) {
+ await new Promise((resolve) => setTimeout(resolve, 100));
+ }
+ }
+
+ release() {
+ this.resolve();
+ }
+}
type TestAction =
| { type: "val"; val: number }
| { type: "err"; err: string }
- | { type: "stall"; durSec: number };
+ | { type: "stall"; durSec: number }
+ | { type: "semaphore-acquire" };
describe("Restate Queue Provider", () => {
let queueClient: QueueClient;
@@ -30,6 +61,7 @@ describe("Restate Queue Provider", () => {
errors: [] as string[],
inFlight: 0,
maxInFlight: 0,
+ baton: new Baton(),
};
async function waitUntilQueueEmpty() {
@@ -48,6 +80,7 @@ describe("Restate Queue Provider", () => {
testState.errors = [];
testState.inFlight = 0;
testState.maxInFlight = 0;
+ testState.baton = new Baton();
});
afterEach(async () => {
await waitUntilQueueEmpty();
@@ -98,6 +131,8 @@ describe("Restate Queue Provider", () => {
setTimeout(resolve, jobData.durSec * 1000),
);
break;
+ case "semaphore-acquire":
+ await testState.baton.acquire();
}
},
onError: async (job) => {
@@ -195,12 +230,22 @@ describe("Restate Queue Provider", () => {
}, 60000);
it("should handle priorities", async () => {
- // Hog the queue first
+ // hog the queue
await Promise.all([
- queue.enqueue({ type: "stall", durSec: 1 }, { priority: 0 }),
- queue.enqueue({ type: "stall", durSec: 1 }, { priority: 1 }),
- queue.enqueue({ type: "stall", durSec: 1 }, { priority: 2 }),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
]);
+ await testState.baton.waitUntilCountWaiting(3);
// Then those will get reprioritized
await Promise.all([
@@ -213,6 +258,10 @@ describe("Restate Queue Provider", () => {
queue.enqueue({ type: "val", val: 302 }, { priority: 2 }),
]);
+ // Wait for all jobs to be enqueued
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+ testState.baton.release();
+
await waitUntilQueueEmpty();
expect(testState.results).toEqual([
@@ -220,4 +269,231 @@ describe("Restate Queue Provider", () => {
202, 201, 200, 300, 301, 302,
]);
}, 60000);
+
+ describe("Group Fairness", () => {
+ it("should process jobs from different groups fairly with same priority", async () => {
+ // hog the queue
+ await Promise.all([
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ ]);
+ await testState.baton.waitUntilCountWaiting(3);
+
+ // Enqueue jobs from two different groups with same priority
+ // Group A has more jobs
+ await queue.enqueue(
+ { type: "val", val: 200 },
+ { priority: 0, groupId: "B" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 201 },
+ { priority: 0, groupId: "B" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 100 },
+ { priority: 0, groupId: "A" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 101 },
+ { priority: 0, groupId: "A" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 102 },
+ { priority: 0, groupId: "A" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 103 },
+ { priority: 0, groupId: "A" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 300 },
+ { priority: 0, groupId: "C" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 301 },
+ { priority: 0, groupId: "C" },
+ );
+
+ // Wait for all jobs to be enqueued
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+ testState.baton.release();
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results).toEqual([
+ 200, 100, 300, 201, 101, 301, 102, 103,
+ ]);
+ }, 60000);
+
+ it("should respect priority over group fairness", async () => {
+ // hog the queue
+ await Promise.all([
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ ]);
+ await testState.baton.waitUntilCountWaiting(3);
+
+ await queue.enqueue(
+ { type: "val", val: 100 },
+ { priority: 1, groupId: "A" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 101 },
+ { priority: 1, groupId: "A" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 200 },
+ { priority: 0, groupId: "B" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 201 },
+ { priority: 0, groupId: "B" },
+ );
+
+ // Wait for all jobs to be enqueued
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+ testState.baton.release();
+
+ await waitUntilQueueEmpty();
+
+ // Priority 0 (higher) should run before priority 1 (lower)
+ expect(testState.results).toEqual([200, 201, 100, 101]);
+ }, 60000);
+
+ it("should handle jobs without groupId", async () => {
+ // hog the queue
+ await Promise.all([
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ ]);
+ await testState.baton.waitUntilCountWaiting(3);
+
+ // Mix of grouped and ungrouped jobs
+ await queue.enqueue({ type: "val", val: 100 }, { priority: 0 }); // ungrouped
+ await queue.enqueue({ type: "val", val: 101 }, { priority: 0 }); // ungrouped
+ await queue.enqueue(
+ { type: "val", val: 200 },
+ { priority: 0, groupId: "A" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 201 },
+ { priority: 0, groupId: "A" },
+ );
+
+ // Wait for all jobs to be enqueued
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+ testState.baton.release();
+
+ await waitUntilQueueEmpty();
+
+ // All jobs should complete successfully
+ expect(testState.results).toEqual([100, 200, 101, 201]);
+ }, 60000);
+
+ it("should work with jobs that don't specify groupId", async () => {
+ // hog the queue
+ await Promise.all([
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ ]);
+
+ await testState.baton.waitUntilCountWaiting(3);
+
+ // These should all go to the default "__ungrouped__" group
+ await queue.enqueue({ type: "val", val: 1 }, { priority: 0 });
+ await queue.enqueue({ type: "val", val: 2 }, { priority: 1 });
+ await queue.enqueue({ type: "val", val: 3 }, { priority: -1 });
+
+ // Wait for all jobs to be enqueued
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+
+ testState.baton.release();
+
+ await waitUntilQueueEmpty();
+
+ // Should respect priority
+ expect(testState.results).toEqual([3, 1, 2]);
+ }, 60000);
+
+ it("should handle same job in same group with different priorities", async () => {
+ // hog the queue
+ await Promise.all([
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ ]);
+ await testState.baton.waitUntilCountWaiting(3);
+
+ await queue.enqueue(
+ { type: "val", val: 100 },
+ { priority: 2, groupId: "A" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 101 },
+ { priority: 1, groupId: "A" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 102 },
+ { priority: 0, groupId: "A" },
+ );
+
+ // Wait for all jobs to be enqueued
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+ testState.baton.release();
+
+ await waitUntilQueueEmpty();
+
+ // Should respect priority even within the same group
+ expect(testState.results).toEqual([102, 101, 100]);
+ }, 60000);
+ });
});