aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'packages/plugins')
-rw-r--r--packages/plugins/queue-restate/src/index.ts2
-rw-r--r--packages/plugins/queue-restate/src/semaphore.ts113
-rw-r--r--packages/plugins/queue-restate/src/service.ts3
-rw-r--r--packages/plugins/queue-restate/src/tests/queue.test.ts292
4 files changed, 380 insertions, 30 deletions
diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts
index 98668872..de37bbea 100644
--- a/packages/plugins/queue-restate/src/index.ts
+++ b/packages/plugins/queue-restate/src/index.ts
@@ -41,6 +41,7 @@ class RestateQueueWrapper<T> implements Queue<T> {
data: {
payload: T;
priority: number;
+ groupId?: string;
},
) => Promise<void>;
}
@@ -49,6 +50,7 @@ class RestateQueueWrapper<T> implements Queue<T> {
{
payload,
priority: options?.priority ?? 0,
+ groupId: options?.groupId,
},
restateClient.rpc.sendOpts({
delay: options?.delayMs
diff --git a/packages/plugins/queue-restate/src/semaphore.ts b/packages/plugins/queue-restate/src/semaphore.ts
index 152604dc..9acecf28 100644
--- a/packages/plugins/queue-restate/src/semaphore.ts
+++ b/packages/plugins/queue-restate/src/semaphore.ts
@@ -8,21 +8,47 @@ interface QueueItem {
priority: number;
}
-interface QueueState {
+interface LegacyQueueState {
items: QueueItem[];
+ itemsv2: Record<string, GroupState>;
+ inFlight: number;
+}
+
+interface QueueState {
+ groups: Record<string, GroupState>;
inFlight: number;
}
+interface GroupState {
+ id: string;
+ items: QueueItem[];
+ lastServedTimestamp: number;
+}
+
export const semaphore = object({
name: "Semaphore",
handlers: {
acquire: async (
- ctx: ObjectContext<QueueState>,
- req: { awakeableId: string; priority: number; capacity: number },
+ ctx: ObjectContext<LegacyQueueState>,
+ req: {
+ awakeableId: string;
+ priority: number;
+ capacity: number;
+ groupId?: string;
+ },
): Promise<void> => {
const state = await getState(ctx);
+ req.groupId = req.groupId ?? "__ungrouped__";
+
+ if (state.groups[req.groupId] === undefined) {
+ state.groups[req.groupId] = {
+ id: req.groupId,
+ items: [],
+ lastServedTimestamp: Date.now(),
+ };
+ }
- state.items.push({
+ state.groups[req.groupId].items.push({
awakeable: req.awakeableId,
priority: req.priority,
});
@@ -33,7 +59,7 @@ export const semaphore = object({
},
release: async (
- ctx: ObjectContext<QueueState>,
+ ctx: ObjectContext<LegacyQueueState>,
capacity: number,
): Promise<void> => {
const state = await getState(ctx);
@@ -49,40 +75,84 @@ export const semaphore = object({
});
// Lower numbers represent higher priority, mirroring Liteque’s semantics.
-function selectAndPopItem(items: QueueItem[]): QueueItem {
- let selected = { priority: Number.MAX_SAFE_INTEGER, index: 0 };
- for (const [i, item] of items.entries()) {
- if (item.priority < selected.priority) {
- selected.priority = item.priority;
- selected.index = i;
+function selectAndPopItem(state: QueueState): {
+ item: QueueItem;
+ groupId: string;
+} {
+ let selected: {
+ priority: number;
+ groupId: string;
+ index: number;
+ groupLastServedTimestamp: number;
+ } = {
+ priority: Number.MAX_SAFE_INTEGER,
+ groupId: "",
+ index: 0,
+ groupLastServedTimestamp: 0,
+ };
+
+ for (const [groupId, group] of Object.entries(state.groups)) {
+ for (const [i, item] of group.items.entries()) {
+ if (item.priority < selected.priority) {
+ selected.priority = item.priority;
+ selected.groupId = groupId;
+ selected.index = i;
+ selected.groupLastServedTimestamp = group.lastServedTimestamp;
+ } else if (item.priority === selected.priority) {
+ if (group.lastServedTimestamp < selected.groupLastServedTimestamp) {
+ selected.priority = item.priority;
+ selected.groupId = groupId;
+ selected.index = i;
+ selected.groupLastServedTimestamp = group.lastServedTimestamp;
+ }
+ }
}
}
- const [item] = items.splice(selected.index, 1);
- return item;
+
+ const [item] = state.groups[selected.groupId].items.splice(selected.index, 1);
+ state.groups[selected.groupId].lastServedTimestamp = Date.now();
+ if (state.groups[selected.groupId].items.length === 0) {
+ delete state.groups[selected.groupId];
+ }
+ return { item, groupId: selected.groupId };
}
function tick(
- ctx: ObjectContext<QueueState>,
+ ctx: ObjectContext<LegacyQueueState>,
state: QueueState,
capacity: number,
) {
- while (state.inFlight < capacity && state.items.length > 0) {
- const item = selectAndPopItem(state.items);
+ while (state.inFlight < capacity && Object.keys(state.groups).length > 0) {
+ const { item } = selectAndPopItem(state);
state.inFlight++;
ctx.resolveAwakeable(item.awakeable);
}
}
-async function getState(ctx: ObjectContext<QueueState>): Promise<QueueState> {
+async function getState(
+ ctx: ObjectContext<LegacyQueueState>,
+): Promise<QueueState> {
+ const groups = (await ctx.get("itemsv2")) ?? {};
+ const items = (await ctx.get("items")) ?? [];
+
+ if (items.length > 0) {
+ groups["__legacy__"] = {
+ id: "__legacy__",
+ items,
+ lastServedTimestamp: 0,
+ };
+ }
+
return {
- items: (await ctx.get("items")) ?? [],
+ groups,
inFlight: (await ctx.get("inFlight")) ?? 0,
};
}
-function setState(ctx: ObjectContext<QueueState>, state: QueueState) {
- ctx.set("items", state.items);
+function setState(ctx: ObjectContext<LegacyQueueState>, state: QueueState) {
+ ctx.set("itemsv2", state.groups);
ctx.set("inFlight", state.inFlight);
+ ctx.clear("items");
}
export class RestateSemaphore {
@@ -92,7 +162,7 @@ export class RestateSemaphore {
private readonly capacity: number,
) {}
- async acquire(priority: number) {
+ async acquire(priority: number, groupId?: string) {
const awk = this.ctx.awakeable();
await this.ctx
.objectClient<typeof semaphore>({ name: "Semaphore" }, this.id)
@@ -100,6 +170,7 @@ export class RestateSemaphore {
awakeableId: awk.id,
priority,
capacity: this.capacity,
+ groupId,
});
try {
diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts
index 06ed97f5..b26f66cf 100644
--- a/packages/plugins/queue-restate/src/service.ts
+++ b/packages/plugins/queue-restate/src/service.ts
@@ -40,6 +40,7 @@ export function buildRestateService<T, R>(
data: {
payload: T;
priority: number;
+ groupId?: string;
},
) => {
const id = `${await genId(ctx)}`;
@@ -64,7 +65,7 @@ export function buildRestateService<T, R>(
let lastError: Error | undefined;
for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) {
- await semaphore.acquire(priority);
+ await semaphore.acquire(priority, data.groupId);
const res = await runWorkerLogic(ctx, funcs, {
id,
data: payload,
diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts
index d716671b..28e32394 100644
--- a/packages/plugins/queue-restate/src/tests/queue.test.ts
+++ b/packages/plugins/queue-restate/src/tests/queue.test.ts
@@ -11,14 +11,45 @@ import {
import type { Queue, QueueClient } from "@karakeep/shared/queueing";
-import { AdminClient } from "../admin.js";
-import { RestateQueueProvider } from "../index.js";
-import { waitUntil } from "./utils.js";
+import { AdminClient } from "../admin";
+import { RestateQueueProvider } from "../index";
+import { waitUntil } from "./utils";
+
+class Baton {
+ private promise: Promise<void>;
+ private resolve: () => void;
+ private waiting = 0;
+
+ constructor() {
+ this.resolve = () => {
+ /* empty */
+ };
+ this.promise = new Promise<void>((resolve) => {
+ this.resolve = resolve;
+ });
+ }
+
+ async acquire() {
+ this.waiting++;
+ await this.promise;
+ }
+
+ async waitUntilCountWaiting(count: number) {
+ while (this.waiting < count) {
+ await new Promise((resolve) => setTimeout(resolve, 100));
+ }
+ }
+
+ release() {
+ this.resolve();
+ }
+}
type TestAction =
| { type: "val"; val: number }
| { type: "err"; err: string }
- | { type: "stall"; durSec: number };
+ | { type: "stall"; durSec: number }
+ | { type: "semaphore-acquire" };
describe("Restate Queue Provider", () => {
let queueClient: QueueClient;
@@ -30,6 +61,7 @@ describe("Restate Queue Provider", () => {
errors: [] as string[],
inFlight: 0,
maxInFlight: 0,
+ baton: new Baton(),
};
async function waitUntilQueueEmpty() {
@@ -48,6 +80,7 @@ describe("Restate Queue Provider", () => {
testState.errors = [];
testState.inFlight = 0;
testState.maxInFlight = 0;
+ testState.baton = new Baton();
});
afterEach(async () => {
await waitUntilQueueEmpty();
@@ -98,6 +131,8 @@ describe("Restate Queue Provider", () => {
setTimeout(resolve, jobData.durSec * 1000),
);
break;
+ case "semaphore-acquire":
+ await testState.baton.acquire();
}
},
onError: async (job) => {
@@ -195,12 +230,22 @@ describe("Restate Queue Provider", () => {
}, 60000);
it("should handle priorities", async () => {
- // Hog the queue first
+ // hog the queue
await Promise.all([
- queue.enqueue({ type: "stall", durSec: 1 }, { priority: 0 }),
- queue.enqueue({ type: "stall", durSec: 1 }, { priority: 1 }),
- queue.enqueue({ type: "stall", durSec: 1 }, { priority: 2 }),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
]);
+ await testState.baton.waitUntilCountWaiting(3);
// Then those will get reprioritized
await Promise.all([
@@ -213,6 +258,10 @@ describe("Restate Queue Provider", () => {
queue.enqueue({ type: "val", val: 302 }, { priority: 2 }),
]);
+ // Wait for all jobs to be enqueued
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+ testState.baton.release();
+
await waitUntilQueueEmpty();
expect(testState.results).toEqual([
@@ -220,4 +269,231 @@ describe("Restate Queue Provider", () => {
202, 201, 200, 300, 301, 302,
]);
}, 60000);
+
+ describe("Group Fairness", () => {
+ it("should process jobs from different groups fairly with same priority", async () => {
+ // hog the queue
+ await Promise.all([
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ ]);
+ await testState.baton.waitUntilCountWaiting(3);
+
+ // Enqueue jobs from two different groups with same priority
+ // Group A has more jobs
+ await queue.enqueue(
+ { type: "val", val: 200 },
+ { priority: 0, groupId: "B" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 201 },
+ { priority: 0, groupId: "B" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 100 },
+ { priority: 0, groupId: "A" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 101 },
+ { priority: 0, groupId: "A" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 102 },
+ { priority: 0, groupId: "A" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 103 },
+ { priority: 0, groupId: "A" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 300 },
+ { priority: 0, groupId: "C" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 301 },
+ { priority: 0, groupId: "C" },
+ );
+
+ // Wait for all jobs to be enqueued
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+ testState.baton.release();
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results).toEqual([
+ 200, 100, 300, 201, 101, 301, 102, 103,
+ ]);
+ }, 60000);
+
+ it("should respect priority over group fairness", async () => {
+ // hog the queue
+ await Promise.all([
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ ]);
+ await testState.baton.waitUntilCountWaiting(3);
+
+ await queue.enqueue(
+ { type: "val", val: 100 },
+ { priority: 1, groupId: "A" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 101 },
+ { priority: 1, groupId: "A" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 200 },
+ { priority: 0, groupId: "B" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 201 },
+ { priority: 0, groupId: "B" },
+ );
+
+ // Wait for all jobs to be enqueued
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+ testState.baton.release();
+
+ await waitUntilQueueEmpty();
+
+ // Priority 0 (higher) should run before priority 1 (lower)
+ expect(testState.results).toEqual([200, 201, 100, 101]);
+ }, 60000);
+
+ it("should handle jobs without groupId", async () => {
+ // hog the queue
+ await Promise.all([
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ ]);
+ await testState.baton.waitUntilCountWaiting(3);
+
+ // Mix of grouped and ungrouped jobs
+ await queue.enqueue({ type: "val", val: 100 }, { priority: 0 }); // ungrouped
+ await queue.enqueue({ type: "val", val: 101 }, { priority: 0 }); // ungrouped
+ await queue.enqueue(
+ { type: "val", val: 200 },
+ { priority: 0, groupId: "A" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 201 },
+ { priority: 0, groupId: "A" },
+ );
+
+ // Wait for all jobs to be enqueued
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+ testState.baton.release();
+
+ await waitUntilQueueEmpty();
+
+ // All jobs should complete successfully
+ expect(testState.results).toEqual([100, 200, 101, 201]);
+ }, 60000);
+
+ it("should work with jobs that don't specify groupId", async () => {
+ // hog the queue
+ await Promise.all([
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ ]);
+
+ await testState.baton.waitUntilCountWaiting(3);
+
+ // These should all go to the default "__ungrouped__" group
+ await queue.enqueue({ type: "val", val: 1 }, { priority: 0 });
+ await queue.enqueue({ type: "val", val: 2 }, { priority: 1 });
+ await queue.enqueue({ type: "val", val: 3 }, { priority: -1 });
+
+ // Wait for all jobs to be enqueued
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+
+ testState.baton.release();
+
+ await waitUntilQueueEmpty();
+
+ // Should respect priority
+ expect(testState.results).toEqual([3, 1, 2]);
+ }, 60000);
+
+ it("should handle same job in same group with different priorities", async () => {
+ // hog the queue
+ await Promise.all([
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ ]);
+ await testState.baton.waitUntilCountWaiting(3);
+
+ await queue.enqueue(
+ { type: "val", val: 100 },
+ { priority: 2, groupId: "A" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 101 },
+ { priority: 1, groupId: "A" },
+ );
+ await queue.enqueue(
+ { type: "val", val: 102 },
+ { priority: 0, groupId: "A" },
+ );
+
+ // Wait for all jobs to be enqueued
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+ testState.baton.release();
+
+ await waitUntilQueueEmpty();
+
+ // Should respect priority even within the same group
+ expect(testState.results).toEqual([102, 101, 100]);
+ }, 60000);
+ });
});