diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-10-20 00:44:46 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2025-10-20 00:44:46 +0000 |
| commit | 8c0aae33b878827ca0978d9979bb4f2b51ef2f6e (patch) | |
| tree | cee4452c0c76e1f2301a932f2276eaa0375920aa /packages/plugins-queue-restate/src | |
| parent | cdf8121338aae0aaf69edc9c8b4244ba493cb7fc (diff) | |
| download | karakeep-8c0aae33b878827ca0978d9979bb4f2b51ef2f6e.tar.zst | |
fix(restate): Fix priority for restate queue
Diffstat (limited to 'packages/plugins-queue-restate/src')
| -rw-r--r-- | packages/plugins-queue-restate/src/semaphore.ts | 11 | ||||
| -rw-r--r-- | packages/plugins-queue-restate/src/tests/queue.test.ts | 4 |
2 files changed, 8 insertions, 7 deletions
diff --git a/packages/plugins-queue-restate/src/semaphore.ts b/packages/plugins-queue-restate/src/semaphore.ts index eb78f6b6..ad636f98 100644 --- a/packages/plugins-queue-restate/src/semaphore.ts +++ b/packages/plugins-queue-restate/src/semaphore.ts @@ -46,15 +46,16 @@ export const semaphore = object({ }, }); +// Lower numbers represent higher priority, mirroring Liteque’s semantics. function selectAndPopItem(items: QueueItem[]): QueueItem { - let highest = { priority: Number.MIN_SAFE_INTEGER, index: 0 }; + let selected = { priority: Number.MAX_SAFE_INTEGER, index: 0 }; for (const [i, item] of items.entries()) { - if (item.priority > highest.priority) { - highest.priority = item.priority; - highest.index = i; + if (item.priority < selected.priority) { + selected.priority = item.priority; + selected.index = i; } } - const [item] = items.splice(highest.index, 1); + const [item] = items.splice(selected.index, 1); return item; } diff --git a/packages/plugins-queue-restate/src/tests/queue.test.ts b/packages/plugins-queue-restate/src/tests/queue.test.ts index 692581d6..e59d47cb 100644 --- a/packages/plugins-queue-restate/src/tests/queue.test.ts +++ b/packages/plugins-queue-restate/src/tests/queue.test.ts @@ -214,8 +214,8 @@ describe("Restate Queue Provider", () => { await waitUntilQueueEmpty(); expect(testState.results).toEqual([ - // Then in order of increasing priority - 302, 301, 300, 200, 201, 202, + // Lower numeric priority value should run first + 202, 201, 200, 300, 301, 302, ]); }, 60000); }); |
