import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, inject, it, } from "vitest"; import type { Queue, QueueClient } from "@karakeep/shared/queueing"; import { QueueRetryAfterError } from "@karakeep/shared/queueing"; import { AdminClient } from "../admin"; import { RestateQueueProvider } from "../index"; import { waitUntil } from "./utils"; class Baton { private promise: Promise; private resolve: () => void; private waiting = 0; constructor() { this.resolve = () => { /* empty */ }; this.promise = new Promise((resolve) => { this.resolve = resolve; }); } async acquire() { this.waiting++; await this.promise; } async waitUntilCountWaiting(count: number) { while (this.waiting < count) { await new Promise((resolve) => setTimeout(resolve, 100)); } } release() { this.resolve(); } } type TestAction = | { type: "val"; val: number } | { type: "err"; err: string } | { type: "stall"; durSec: number } | { type: "semaphore-acquire" } | { type: "rate-limit"; val: number; delayMs: number; attemptsBeforeSuccess: number; } | { type: "timeout-then-succeed"; val: number; timeoutDurSec: number; attemptsBeforeSuccess: number; }; describe("Restate Queue Provider", () => { let queueClient: QueueClient; let queue: Queue; let adminClient: AdminClient; const testState = { results: [] as number[], errors: [] as string[], inFlight: 0, maxInFlight: 0, baton: new Baton(), rateLimitAttempts: new Map(), timeoutAttempts: new Map(), }; async function waitUntilQueueEmpty() { await waitUntil( async () => { const stats = await queue.stats(); return stats.pending + stats.pending_retry + stats.running === 0; }, "Queue to be empty", 60000, ); } beforeEach(async () => { testState.results = []; testState.errors = []; testState.inFlight = 0; testState.maxInFlight = 0; testState.baton = new Baton(); testState.rateLimitAttempts = new Map(); testState.timeoutAttempts = new Map(); }); afterEach(async () => { await waitUntilQueueEmpty(); }); beforeAll(async () => { const ingressPort = inject("restateIngressPort"); const adminPort = inject("restateAdminPort"); process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`; process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`; process.env.RESTATE_LISTEN_PORT = "9080"; const provider = new RestateQueueProvider(); const client = await provider.getClient(); if (!client) { throw new Error("Failed to create queue client"); } queueClient = client; adminClient = new AdminClient(process.env.RESTATE_ADMIN_ADDR); queue = queueClient.createQueue("test-queue", { defaultJobArgs: { numRetries: 3, }, keepFailedJobs: false, }); queueClient.createRunner( queue, { run: async (job) => { testState.inFlight++; testState.maxInFlight = Math.max( testState.maxInFlight, testState.inFlight, ); const jobData = job.data; switch (jobData.type) { case "val": return jobData.val; case "err": throw new Error(jobData.err); case "stall": await new Promise((resolve) => setTimeout(resolve, jobData.durSec * 1000), ); break; case "semaphore-acquire": await testState.baton.acquire(); break; case "rate-limit": { const attemptKey = `${job.id}`; const currentAttempts = testState.rateLimitAttempts.get(attemptKey) || 0; testState.rateLimitAttempts.set(attemptKey, currentAttempts + 1); if (currentAttempts < jobData.attemptsBeforeSuccess) { throw new QueueRetryAfterError( `Rate limited (attempt ${currentAttempts + 1})`, jobData.delayMs, ); } return jobData.val; } case "timeout-then-succeed": { const attemptKey = `${job.id}`; const currentAttempts = testState.timeoutAttempts.get(attemptKey) || 0; testState.timeoutAttempts.set(attemptKey, currentAttempts + 1); if (currentAttempts < jobData.attemptsBeforeSuccess) { // Stall longer than the timeout to trigger a timeout await new Promise((resolve) => setTimeout(resolve, jobData.timeoutDurSec * 1000), ); // This should not be reached if timeout works correctly throw new Error("Should have timed out"); } return jobData.val; } } }, onError: async (job) => { testState.inFlight--; const jobData = job.data; if (jobData && jobData.type === "err") { testState.errors.push(jobData.err); } }, onComplete: async (_j, res) => { testState.inFlight--; if (res) { testState.results.push(res); } }, }, { concurrency: 3, timeoutSecs: 2, pollIntervalMs: 0 /* Doesn't matter */, }, ); await queueClient.prepare(); await queueClient.start(); await adminClient.upsertDeployment("http://host.docker.internal:9080"); }, 90000); afterAll(async () => { if (queueClient?.shutdown) { await queueClient.shutdown(); } }); it("should enqueue and process a job", async () => { const jobId = await queue.enqueue({ type: "val", val: 42 }); expect(jobId).toBeDefined(); expect(typeof jobId).toBe("string"); await waitUntilQueueEmpty(); expect(testState.results).toEqual([42]); }, 60000); it("should process multiple jobs", async () => { await queue.enqueue({ type: "val", val: 1 }); await queue.enqueue({ type: "val", val: 2 }); await queue.enqueue({ type: "val", val: 3 }); await waitUntilQueueEmpty(); expect(testState.results.length).toEqual(3); expect(testState.results).toContain(1); expect(testState.results).toContain(2); expect(testState.results).toContain(3); }, 60000); it("should retry failed jobs", async () => { await queue.enqueue({ type: "err", err: "Test error" }); await waitUntilQueueEmpty(); // Initial attempt + 3 retries expect(testState.errors).toEqual([ "Test error", "Test error", "Test error", "Test error", ]); }, 90000); it("should use idempotency key", async () => { const idempotencyKey = `test-${Date.now()}`; // hog the queue await Promise.all([ queue.enqueue( { type: "semaphore-acquire" }, { groupId: "init", priority: -10 }, ), queue.enqueue( { type: "semaphore-acquire" }, { groupId: "init", priority: -10 }, ), queue.enqueue( { type: "semaphore-acquire" }, { groupId: "init", priority: -10 }, ), ]); await testState.baton.waitUntilCountWaiting(3); await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey }); await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey }); await new Promise((resolve) => setTimeout(resolve, 1000)); testState.baton.release(); await waitUntilQueueEmpty(); expect(testState.results).toEqual([200]); }, 60000); it("should handle concurrent jobs", async () => { const promises = []; for (let i = 300; i < 320; i++) { promises.push(queue.enqueue({ type: "stall", durSec: 0.1 })); } await Promise.all(promises); await waitUntilQueueEmpty(); expect(testState.maxInFlight).toEqual(3); }, 60000); it("should handle priorities", async () => { // hog the queue await Promise.all([ queue.enqueue( { type: "semaphore-acquire" }, { groupId: "init", priority: -10 }, ), queue.enqueue( { type: "semaphore-acquire" }, { groupId: "init", priority: -10 }, ), queue.enqueue( { type: "semaphore-acquire" }, { groupId: "init", priority: -10 }, ), ]); await testState.baton.waitUntilCountWaiting(3); // Then those will get reprioritized await Promise.all([ queue.enqueue({ type: "val", val: 200 }, { priority: -1 }), queue.enqueue({ type: "val", val: 201 }, { priority: -2 }), queue.enqueue({ type: "val", val: 202 }, { priority: -3 }), queue.enqueue({ type: "val", val: 300 }, { priority: 0 }), queue.enqueue({ type: "val", val: 301 }, { priority: 1 }), queue.enqueue({ type: "val", val: 302 }, { priority: 2 }), ]); // Wait for all jobs to be enqueued await new Promise((resolve) => setTimeout(resolve, 1000)); testState.baton.release(); await waitUntilQueueEmpty(); expect(testState.results).toEqual([ // Lower numeric priority value should run first 202, 201, 200, 300, 301, 302, ]); }, 60000); describe("Group Fairness", () => { it("should process jobs from different groups fairly with same priority", async () => { // hog the queue await Promise.all([ queue.enqueue( { type: "semaphore-acquire" }, { groupId: "init", priority: -10 }, ), queue.enqueue( { type: "semaphore-acquire" }, { groupId: "init", priority: -10 }, ), queue.enqueue( { type: "semaphore-acquire" }, { groupId: "init", priority: -10 }, ), ]); await testState.baton.waitUntilCountWaiting(3); // Enqueue jobs from two different groups with same priority // Group A has more jobs await queue.enqueue( { type: "val", val: 200 }, { priority: 0, groupId: "B" }, ); await queue.enqueue( { type: "val", val: 201 }, { priority: 0, groupId: "B" }, ); await queue.enqueue( { type: "val", val: 100 }, { priority: 0, groupId: "A" }, ); await queue.enqueue( { type: "val", val: 101 }, { priority: 0, groupId: "A" }, ); await queue.enqueue( { type: "val", val: 102 }, { priority: 0, groupId: "A" }, ); await queue.enqueue( { type: "val", val: 103 }, { priority: 0, groupId: "A" }, ); await queue.enqueue( { type: "val", val: 300 }, { priority: 0, groupId: "C" }, ); await queue.enqueue( { type: "val", val: 301 }, { priority: 0, groupId: "C" }, ); // Wait for all jobs to be enqueued await new Promise((resolve) => setTimeout(resolve, 1000)); testState.baton.release(); await waitUntilQueueEmpty(); expect(testState.results).toEqual([ 200, 100, 300, 201, 101, 301, 102, 103, ]); }, 60000); it("should respect priority over group fairness", async () => { // hog the queue await Promise.all([ queue.enqueue( { type: "semaphore-acquire" }, { groupId: "init", priority: -10 }, ), queue.enqueue( { type: "semaphore-acquire" }, { groupId: "init", priority: -10 }, ), queue.enqueue( { type: "semaphore-acquire" }, { groupId: "init", priority: -10 }, ), ]); await testState.baton.waitUntilCountWaiting(3); await queue.enqueue( { type: "val", val: 100 }, { priority: 1, groupId: "A" }, ); await queue.enqueue( { type: "val", val: 101 }, { priority: 1, groupId: "A" }, ); await queue.enqueue( { type: "val", val: 200 }, { priority: 0, groupId: "B" }, ); await queue.enqueue( { type: "val", val: 201 }, { priority: 0, groupId: "B" }, ); // Wait for all jobs to be enqueued await new Promise((resolve) => setTimeout(resolve, 1000)); testState.baton.release(); await waitUntilQueueEmpty(); // Priority 0 (higher) should run before priority 1 (lower) expect(testState.results).toEqual([200, 201, 100, 101]); }, 60000); it("should handle jobs without groupId", async () => { // hog the queue await Promise.all([ queue.enqueue( { type: "semaphore-acquire" }, { groupId: "init", priority: -10 }, ), queue.enqueue( { type: "semaphore-acquire" }, { groupId: "init", priority: -10 }, ), queue.enqueue( { type: "semaphore-acquire" }, { groupId: "init", priority: -10 }, ), ]); await testState.baton.waitUntilCountWaiting(3); // Mix of grouped and ungrouped jobs await queue.enqueue({ type: "val", val: 100 }, { priority: 0 }); // ungrouped await queue.enqueue({ type: "val", val: 101 }, { priority: 0 }); // ungrouped await queue.enqueue( { type: "val", val: 200 }, { priority: 0, groupId: "A" }, ); await queue.enqueue( { type: "val", val: 201 }, { priority: 0, groupId: "A" }, ); // Wait for all jobs to be enqueued await new Promise((resolve) => setTimeout(resolve, 1000)); testState.baton.release(); await waitUntilQueueEmpty(); // All jobs should complete successfully expect(testState.results).toEqual([100, 200, 101, 201]); }, 60000); it("should work with jobs that don't specify groupId", async () => { // hog the queue await Promise.all([ queue.enqueue( { type: "semaphore-acquire" }, { groupId: "init", priority: -10 }, ), queue.enqueue( { type: "semaphore-acquire" }, { groupId: "init", priority: -10 }, ), queue.enqueue( { type: "semaphore-acquire" }, { groupId: "init", priority: -10 }, ), ]); await testState.baton.waitUntilCountWaiting(3); // These should all go to the default "__ungrouped__" group await queue.enqueue({ type: "val", val: 1 }, { priority: 0 }); await queue.enqueue({ type: "val", val: 2 }, { priority: 1 }); await queue.enqueue({ type: "val", val: 3 }, { priority: -1 }); // Wait for all jobs to be enqueued await new Promise((resolve) => setTimeout(resolve, 1000)); testState.baton.release(); await waitUntilQueueEmpty(); // Should respect priority expect(testState.results).toEqual([3, 1, 2]); }, 60000); it("should handle same job in same group with different priorities", async () => { // hog the queue await Promise.all([ queue.enqueue( { type: "semaphore-acquire" }, { groupId: "init", priority: -10 }, ), queue.enqueue( { type: "semaphore-acquire" }, { groupId: "init", priority: -10 }, ), queue.enqueue( { type: "semaphore-acquire" }, { groupId: "init", priority: -10 }, ), ]); await testState.baton.waitUntilCountWaiting(3); await queue.enqueue( { type: "val", val: 100 }, { priority: 2, groupId: "A" }, ); await queue.enqueue( { type: "val", val: 101 }, { priority: 1, groupId: "A" }, ); await queue.enqueue( { type: "val", val: 102 }, { priority: 0, groupId: "A" }, ); // Wait for all jobs to be enqueued await new Promise((resolve) => setTimeout(resolve, 1000)); testState.baton.release(); await waitUntilQueueEmpty(); // Should respect priority even within the same group expect(testState.results).toEqual([102, 101, 100]); }, 60000); }); describe("QueueRetryAfterError handling", () => { it("should retry after delay without counting against retry attempts", async () => { const startTime = Date.now(); // This job will fail with QueueRetryAfterError twice before succeeding await queue.enqueue({ type: "rate-limit", val: 42, delayMs: 500, // 500ms delay attemptsBeforeSuccess: 2, // Fail twice, succeed on third try }); await waitUntilQueueEmpty(); const duration = Date.now() - startTime; // Should have succeeded expect(testState.results).toEqual([42]); // Should have been called 3 times (2 rate limit failures + 1 success) expect(testState.rateLimitAttempts.size).toBe(1); const attempts = Array.from(testState.rateLimitAttempts.values())[0]; expect(attempts).toBe(3); // Should have waited at least 1 second total (2 x 500ms delays) expect(duration).toBeGreaterThanOrEqual(1000); // onError should NOT have been called for rate limit retries expect(testState.errors).toEqual([]); }, 60000); it("should not exhaust retries when rate limited", async () => { // This job will be rate limited many more times than the retry limit // but should still eventually succeed await queue.enqueue({ type: "rate-limit", val: 100, delayMs: 100, // Short delay for faster test attemptsBeforeSuccess: 10, // Fail 10 times (more than the 3 retry limit) }); await waitUntilQueueEmpty(); // Should have succeeded despite being "retried" more than the limit expect(testState.results).toEqual([100]); // Should have been called 11 times (10 rate limit failures + 1 success) const attempts = Array.from(testState.rateLimitAttempts.values())[0]; expect(attempts).toBe(11); // No errors should have been recorded expect(testState.errors).toEqual([]); }, 90000); it("should still respect retry limit for non-rate-limit errors", async () => { // Enqueue a regular error job that should fail permanently await queue.enqueue({ type: "err", err: "Regular error" }); await waitUntilQueueEmpty(); // Should have failed 4 times (initial + 3 retries) and not succeeded expect(testState.errors).toEqual([ "Regular error", "Regular error", "Regular error", "Regular error", ]); expect(testState.results).toEqual([]); }, 90000); }); describe("Timeout handling", () => { it("should retry timed out jobs and not waste semaphore slots", async () => { // This test verifies that: // 1. Jobs that timeout get retried correctly // 2. Semaphore slots are freed when jobs timeout (via lease expiry) // 3. Other jobs can still run while a job is being retried // Enqueue a job that will timeout on first attempt, succeed on second // timeoutSecs is 2, so we stall for 5 seconds to ensure timeout await queue.enqueue({ type: "timeout-then-succeed", val: 42, timeoutDurSec: 5, attemptsBeforeSuccess: 1, // Timeout once, then succeed }); // Wait a bit for the first attempt to start await new Promise((resolve) => setTimeout(resolve, 500)); // Enqueue more jobs to verify semaphore slots are eventually freed // With concurrency=3, these should be able to run after the timeout await queue.enqueue({ type: "val", val: 100 }); await queue.enqueue({ type: "val", val: 101 }); await queue.enqueue({ type: "val", val: 102 }); await waitUntilQueueEmpty(); // The timeout job should have succeeded after retry expect(testState.results).toContain(42); // All other jobs should have completed expect(testState.results).toContain(100); expect(testState.results).toContain(101); expect(testState.results).toContain(102); // The timeout job should have been attempted twice const attempts = Array.from(testState.timeoutAttempts.values())[0]; expect(attempts).toBe(2); // Concurrency should not have exceeded the limit expect(testState.maxInFlight).toBeLessThanOrEqual(3); }, 120000); it("should handle job that times out multiple times before succeeding", async () => { // Enqueue a single job that times out twice before succeeding // This tests that the retry mechanism works correctly for timeouts await queue.enqueue({ type: "timeout-then-succeed", val: 99, timeoutDurSec: 5, attemptsBeforeSuccess: 2, // Timeout twice, then succeed }); await waitUntilQueueEmpty(); // Job should eventually succeed expect(testState.results).toEqual([99]); // Should have been attempted 3 times (2 timeouts + 1 success) const attempts = Array.from(testState.timeoutAttempts.values())[0]; expect(attempts).toBe(3); }, 180000); }); });