aboutsummaryrefslogtreecommitdiffstats
path: root/packages/queue
diff options
context:
space:
mode:
authorMohamedBassem <me@mbassem.com>2024-07-14 20:42:06 +0000
committerMohamedBassem <me@mbassem.com>2024-07-14 22:25:23 +0000
commitf77a41a75237b8c816e0c9ca7217dfacc32cc7d0 (patch)
tree69557b2001183cd915a81e0ea9d276e10d262147 /packages/queue
parentaa3dce09ff68c212ac1fad33adfbfaba96290a59 (diff)
downloadkarakeep-f77a41a75237b8c816e0c9ca7217dfacc32cc7d0.tar.zst
chore: Add a new sqlite based queue package
Diffstat (limited to 'packages/queue')
-rw-r--r--packages/queue/db.ts19
-rw-r--r--packages/queue/drizzle.config.ts10
-rw-r--r--packages/queue/drizzle/0000_wonderful_talisman.sql18
-rw-r--r--packages/queue/drizzle/meta/0000_snapshot.json130
-rw-r--r--packages/queue/drizzle/meta/_journal.json13
-rw-r--r--packages/queue/index.ts6
-rw-r--r--packages/queue/options.ts22
-rw-r--r--packages/queue/package.json35
-rw-r--r--packages/queue/queue.ts146
-rw-r--r--packages/queue/runner.test.ts440
-rw-r--r--packages/queue/runner.ts115
-rw-r--r--packages/queue/schema.ts36
-rw-r--r--packages/queue/tsconfig.json9
-rw-r--r--packages/queue/types.ts11
-rw-r--r--packages/queue/vitest.config.ts13
15 files changed, 1023 insertions, 0 deletions
diff --git a/packages/queue/db.ts b/packages/queue/db.ts
new file mode 100644
index 00000000..f1412fef
--- /dev/null
+++ b/packages/queue/db.ts
@@ -0,0 +1,19 @@
+import path from "node:path";
+import Database from "better-sqlite3";
+import { BetterSQLite3Database, drizzle } from "drizzle-orm/better-sqlite3";
+import { migrate } from "drizzle-orm/better-sqlite3/migrator";
+
+import * as schema from "./schema";
+
+export function buildDBClient(dbPath: string, runMigrations = false) {
+ const sqlite = new Database(dbPath);
+ const db = drizzle(sqlite, { schema });
+ if (runMigrations) {
+ migrateDB(db);
+ }
+ return db;
+}
+
/**
 * Applies all pending drizzle migrations that live in the `drizzle/` folder
 * next to this module.
 *
 * NOTE(review): `__dirname` is a CJS global, but package.json declares
 * `"type": "module"` — presumably the consumer's loader/bundler supplies it.
 * TODO confirm; pure ESM would need `new URL(".", import.meta.url)` instead.
 */
export function migrateDB(db: BetterSQLite3Database<typeof schema>) {
  migrate(db, { migrationsFolder: path.join(__dirname, "drizzle") });
}
diff --git a/packages/queue/drizzle.config.ts b/packages/queue/drizzle.config.ts
new file mode 100644
index 00000000..6ef01d1b
--- /dev/null
+++ b/packages/queue/drizzle.config.ts
@@ -0,0 +1,10 @@
+import type { Config } from "drizzle-kit";
+
// drizzle-kit configuration used to (re)generate the migrations in ./drizzle.
export default {
  schema: "./schema.ts",
  out: "./drizzle",
  driver: "better-sqlite",
  dbCredentials: {
    // Scratch database for drizzle-kit tooling; presumably not used at
    // runtime (buildDBClient takes an explicit path) — TODO confirm.
    url: "data.db",
  },
} satisfies Config;
diff --git a/packages/queue/drizzle/0000_wonderful_talisman.sql b/packages/queue/drizzle/0000_wonderful_talisman.sql
new file mode 100644
index 00000000..e042ab92
--- /dev/null
+++ b/packages/queue/drizzle/0000_wonderful_talisman.sql
@@ -0,0 +1,18 @@
+CREATE TABLE `tasks` (
+ `id` integer PRIMARY KEY AUTOINCREMENT NOT NULL,
+ `queue` text NOT NULL,
+ `payload` text NOT NULL,
+ `createdAt` integer NOT NULL,
+ `status` text DEFAULT 'pending' NOT NULL,
+ `expireAt` integer,
+ `allocationId` text NOT NULL,
+ `numRunsLeft` integer NOT NULL,
+ `maxNumRuns` integer NOT NULL
+);
+--> statement-breakpoint
+CREATE INDEX `tasks_queue_idx` ON `tasks` (`queue`);--> statement-breakpoint
+CREATE INDEX `tasks_status_idx` ON `tasks` (`status`);--> statement-breakpoint
+CREATE INDEX `tasks_expire_at_idx` ON `tasks` (`expireAt`);--> statement-breakpoint
+CREATE INDEX `tasks_num_runs_left_idx` ON `tasks` (`numRunsLeft`);--> statement-breakpoint
+CREATE INDEX `tasks_max_num_runs_idx` ON `tasks` (`maxNumRuns`);--> statement-breakpoint
+CREATE INDEX `tasks_allocation_id_idx` ON `tasks` (`allocationId`); \ No newline at end of file
diff --git a/packages/queue/drizzle/meta/0000_snapshot.json b/packages/queue/drizzle/meta/0000_snapshot.json
new file mode 100644
index 00000000..57c7c2f4
--- /dev/null
+++ b/packages/queue/drizzle/meta/0000_snapshot.json
@@ -0,0 +1,130 @@
+{
+ "version": "5",
+ "dialect": "sqlite",
+ "id": "3094773c-0138-46b2-b617-4b10093b0f53",
+ "prevId": "00000000-0000-0000-0000-000000000000",
+ "tables": {
+ "tasks": {
+ "name": "tasks",
+ "columns": {
+ "id": {
+ "name": "id",
+ "type": "integer",
+ "primaryKey": true,
+ "notNull": true,
+ "autoincrement": true
+ },
+ "queue": {
+ "name": "queue",
+ "type": "text",
+ "primaryKey": false,
+ "notNull": true,
+ "autoincrement": false
+ },
+ "payload": {
+ "name": "payload",
+ "type": "text",
+ "primaryKey": false,
+ "notNull": true,
+ "autoincrement": false
+ },
+ "createdAt": {
+ "name": "createdAt",
+ "type": "integer",
+ "primaryKey": false,
+ "notNull": true,
+ "autoincrement": false
+ },
+ "status": {
+ "name": "status",
+ "type": "text",
+ "primaryKey": false,
+ "notNull": true,
+ "autoincrement": false,
+ "default": "'pending'"
+ },
+ "expireAt": {
+ "name": "expireAt",
+ "type": "integer",
+ "primaryKey": false,
+ "notNull": false,
+ "autoincrement": false
+ },
+ "allocationId": {
+ "name": "allocationId",
+ "type": "text",
+ "primaryKey": false,
+ "notNull": true,
+ "autoincrement": false
+ },
+ "numRunsLeft": {
+ "name": "numRunsLeft",
+ "type": "integer",
+ "primaryKey": false,
+ "notNull": true,
+ "autoincrement": false
+ },
+ "maxNumRuns": {
+ "name": "maxNumRuns",
+ "type": "integer",
+ "primaryKey": false,
+ "notNull": true,
+ "autoincrement": false
+ }
+ },
+ "indexes": {
+ "tasks_queue_idx": {
+ "name": "tasks_queue_idx",
+ "columns": [
+ "queue"
+ ],
+ "isUnique": false
+ },
+ "tasks_status_idx": {
+ "name": "tasks_status_idx",
+ "columns": [
+ "status"
+ ],
+ "isUnique": false
+ },
+ "tasks_expire_at_idx": {
+ "name": "tasks_expire_at_idx",
+ "columns": [
+ "expireAt"
+ ],
+ "isUnique": false
+ },
+ "tasks_num_runs_left_idx": {
+ "name": "tasks_num_runs_left_idx",
+ "columns": [
+ "numRunsLeft"
+ ],
+ "isUnique": false
+ },
+ "tasks_max_num_runs_idx": {
+ "name": "tasks_max_num_runs_idx",
+ "columns": [
+ "maxNumRuns"
+ ],
+ "isUnique": false
+ },
+ "tasks_allocation_id_idx": {
+ "name": "tasks_allocation_id_idx",
+ "columns": [
+ "allocationId"
+ ],
+ "isUnique": false
+ }
+ },
+ "foreignKeys": {},
+ "compositePrimaryKeys": {},
+ "uniqueConstraints": {}
+ }
+ },
+ "enums": {},
+ "_meta": {
+ "schemas": {},
+ "tables": {},
+ "columns": {}
+ }
+} \ No newline at end of file
diff --git a/packages/queue/drizzle/meta/_journal.json b/packages/queue/drizzle/meta/_journal.json
new file mode 100644
index 00000000..2b14f895
--- /dev/null
+++ b/packages/queue/drizzle/meta/_journal.json
@@ -0,0 +1,13 @@
+{
+ "version": "5",
+ "dialect": "sqlite",
+ "entries": [
+ {
+ "idx": 0,
+ "version": "5",
+ "when": 1720992922192,
+ "tag": "0000_wonderful_talisman",
+ "breakpoints": true
+ }
+ ]
+} \ No newline at end of file
diff --git a/packages/queue/index.ts b/packages/queue/index.ts
new file mode 100644
index 00000000..c9144f29
--- /dev/null
+++ b/packages/queue/index.ts
@@ -0,0 +1,6 @@
// Public API surface of the sqlite-backed queue package.
export { SqliteQueue } from "./queue";
export { buildDBClient, migrateDB } from "./db";
export type { SqliteQueueOptions, RunnerOptions, RunnerFuncs } from "./options";
export { Runner } from "./runner";

export type { DequeuedJob, DequeuedJobError } from "./types";
diff --git a/packages/queue/options.ts b/packages/queue/options.ts
new file mode 100644
index 00000000..18f8e52d
--- /dev/null
+++ b/packages/queue/options.ts
@@ -0,0 +1,22 @@
+import { ZodType } from "zod";
+
+import { DequeuedJob, DequeuedJobError } from "./types";
+
/** Per-queue configuration. */
export interface SqliteQueueOptions {
  defaultJobArgs: {
    // Retries after the first attempt; total allowed runs = numRetries + 1.
    numRetries: number;
  };
}

/** User callbacks invoked by the Runner for each job. */
export interface RunnerFuncs<T> {
  // Executes one job. Throwing (or exceeding the timeout) counts as a failed run.
  run: (job: DequeuedJob<T>) => Promise<void>;
  // Called after run() resolves successfully.
  onComplete?: (job: DequeuedJob<T>) => Promise<void>;
  // Called once a job has exhausted all of its allowed runs.
  onError?: (job: DequeuedJobError<T>) => Promise<void>;
}

/** Execution parameters for a Runner instance. */
export interface RunnerOptions<T> {
  // Sleep between polls when the queue is empty.
  pollIntervalMs: number;
  // Per-run timeout; also used as the visibility timeout when dequeuing.
  timeoutSecs: number;
  // Max number of jobs this runner executes concurrently.
  concurrency: number;
  // Optional zod schema applied to the JSON-parsed payload.
  validator?: ZodType<T>;
}
diff --git a/packages/queue/package.json b/packages/queue/package.json
new file mode 100644
index 00000000..e0e9d5d1
--- /dev/null
+++ b/packages/queue/package.json
@@ -0,0 +1,35 @@
+{
+ "$schema": "https://json.schemastore.org/package.json",
+ "name": "@hoarder/queue",
+ "version": "0.1.0",
+ "private": true,
+ "type": "module",
+ "dependencies": {
+ "better-sqlite3": "^9.4.3",
+ "drizzle-orm": "^0.29.4",
+ "zod": "^3.22.4",
+ "async-mutex": "^0.4.1"
+ },
+ "devDependencies": {
+ "@hoarder/eslint-config": "workspace:^0.2.0",
+ "@hoarder/prettier-config": "workspace:^0.1.0",
+ "@hoarder/tsconfig": "workspace:^0.1.0",
+ "@types/better-sqlite3": "^7.6.9",
+ "drizzle-kit": "^0.20.14",
+ "vitest": "^1.3.1"
+ },
+ "scripts": {
+ "typecheck": "tsc --noEmit",
+ "test": "vitest",
+ "format": "prettier . --ignore-path ../../.prettierignore",
+ "lint": "eslint ."
+ },
+ "main": "index.ts",
+ "eslintConfig": {
+ "root": true,
+ "extends": [
+ "@hoarder/eslint-config/base"
+ ]
+ },
+ "prettier": "@hoarder/prettier-config"
+}
diff --git a/packages/queue/queue.ts b/packages/queue/queue.ts
new file mode 100644
index 00000000..ad486468
--- /dev/null
+++ b/packages/queue/queue.ts
@@ -0,0 +1,146 @@
+import assert from "node:assert";
+import { and, asc, count, eq, gt, lt, or } from "drizzle-orm";
+
+import { buildDBClient } from "./db";
+import { SqliteQueueOptions } from "./options";
+import { Job, tasksTable } from "./schema";
+
+// generate random id
+function generateAllocationId() {
+ return Math.random().toString(36).substring(2, 15);
+}
+
+export class SqliteQueue<T> {
+ queueName: string;
+ db: ReturnType<typeof buildDBClient>;
+ options: SqliteQueueOptions;
+
+ constructor(
+ name: string,
+ db: ReturnType<typeof buildDBClient>,
+ options: SqliteQueueOptions,
+ ) {
+ this.queueName = name;
+ this.options = options;
+ this.db = db;
+ }
+
+ name() {
+ return this.queueName;
+ }
+
+ async enqueue(payload: T): Promise<Job> {
+ const job = await this.db
+ .insert(tasksTable)
+ .values({
+ queue: this.queueName,
+ payload: JSON.stringify(payload),
+ numRunsLeft: this.options.defaultJobArgs.numRetries + 1,
+ maxNumRuns: this.options.defaultJobArgs.numRetries + 1,
+ allocationId: generateAllocationId(),
+ })
+ .returning();
+
+ return job[0];
+ }
+
+ async stats() {
+ const res = await this.db
+ .select({ status: tasksTable.status, count: count() })
+ .from(tasksTable)
+ .where(eq(tasksTable.queue, this.queueName))
+ .groupBy(tasksTable.status);
+
+ return res.reduce(
+ (acc, r) => {
+ acc[r.status] += r.count;
+ return acc;
+ },
+ {
+ pending: 0,
+ pending_retry: 0,
+ running: 0,
+ failed: 0,
+ },
+ );
+ }
+
+ async attemptDequeue(options: { timeoutSecs: number }): Promise<Job | null> {
+ return await this.db.transaction(async (txn) => {
+ const jobs = await txn
+ .select()
+ .from(tasksTable)
+ .where(
+ and(
+ eq(tasksTable.queue, this.queueName),
+ gt(tasksTable.numRunsLeft, 0),
+ or(
+ // Not picked by a worker yet
+ eq(tasksTable.status, "pending"),
+
+ // Failed but still has attempts left
+ eq(tasksTable.status, "pending_retry"),
+
+ // Expired and still has attempts left
+ and(
+ eq(tasksTable.status, "running"),
+ lt(tasksTable.expireAt, new Date()),
+ ),
+ ),
+ ),
+ )
+ .orderBy(asc(tasksTable.createdAt))
+ .limit(1);
+
+ if (jobs.length == 0) {
+ return null;
+ }
+ assert(jobs.length == 1);
+ const job = jobs[0];
+
+ const result = await txn
+ .update(tasksTable)
+ .set({
+ status: "running",
+ numRunsLeft: job.numRunsLeft - 1,
+ allocationId: generateAllocationId(),
+ expireAt: new Date(new Date().getTime() + options.timeoutSecs * 1000),
+ })
+ .where(
+ and(
+ eq(tasksTable.id, job.id),
+
+ // The compare and swap is necessary to avoid race conditions
+ eq(tasksTable.allocationId, job.allocationId),
+ ),
+ )
+ .returning();
+ if (result.length == 0) {
+ return null;
+ }
+ assert(result.length == 1);
+ return result[0];
+ });
+ }
+
+ async finalize(
+ id: number,
+ alloctionId: string,
+ status: "completed" | "pending_retry" | "failed",
+ ) {
+ if (status == "completed") {
+ await this.db
+ .delete(tasksTable)
+ .where(
+ and(eq(tasksTable.id, id), eq(tasksTable.allocationId, alloctionId)),
+ );
+ } else {
+ await this.db
+ .update(tasksTable)
+ .set({ status: status, expireAt: null })
+ .where(
+ and(eq(tasksTable.id, id), eq(tasksTable.allocationId, alloctionId)),
+ );
+ }
+ }
+}
diff --git a/packages/queue/runner.test.ts b/packages/queue/runner.test.ts
new file mode 100644
index 00000000..9e50c9a5
--- /dev/null
+++ b/packages/queue/runner.test.ts
@@ -0,0 +1,440 @@
+/* eslint-disable @typescript-eslint/require-await */
+import { Semaphore } from "async-mutex";
+import { eq } from "drizzle-orm";
+import { describe, expect, test } from "vitest";
+import { z } from "zod";
+
+import {
+ buildDBClient,
+ DequeuedJob,
+ DequeuedJobError,
+ Runner,
+ RunnerOptions,
+ SqliteQueue,
+} from "./";
+import { tasksTable } from "./schema";
+
/**
 * One-shot gate built on a counting semaphore: wait() blocks until post()
 * floods the semaphore with permits.
 */
class Baton {
  semaphore: Semaphore;
  constructor() {
    this.semaphore = new Semaphore(0);
    this.reset();
  }
  // Releases all current and future waiters. 100000 is simply "more permits
  // than any test will ever consume".
  post() {
    this.semaphore.setValue(100000);
  }

  async wait() {
    await this.semaphore.acquire();
  }

  // Blocks every subsequent wait() until the next post().
  reset() {
    this.semaphore.setValue(-Infinity);
  }
}
+
/**
 * Test barrier: each participant calls notifyReachedAndWait(); the test body
 * waits for everyone with waitUntilAllReached() and then releases them via
 * allowParticipantsToProceed().
 *
 * The semaphore starts at (1 - numParticipants); each arrival releases (+1),
 * so it unlocks (value > 0) exactly when all participants have arrived.
 */
class Barrier {
  semaphore: Semaphore;
  baton: Baton;
  constructor(numParticipants: number) {
    this.semaphore = new Semaphore(numParticipants * -1 + 1);
    this.baton = new Baton();
    this.reset(numParticipants * -1 + 1);
  }

  async notifyReachedAndWait() {
    this.semaphore.release();
    await this.baton.wait();
  }

  async waitUntilAllReached() {
    await this.semaphore.waitForUnlock();
  }

  allowParticipantsToProceed() {
    this.baton.post();
  }

  // NOTE(review): despite the name, the argument is the raw semaphore value
  // (the constructor passes 1 - numParticipants), not a participant count.
  reset(numParticipants: number) {
    this.semaphore.setValue(numParticipants);
    this.baton.reset();
  }
}
+
// Baseline runner options shared by every test; individual tests override
// fields via object spread. The zod validator both checks the payload shape
// and fills in defaults for the optional Work fields.
const defaultRunnerOpts = {
  pollIntervalMs: 100,
  timeoutSecs: 100,
  concurrency: 2,
  validator: z.object({
    increment: z.number(),
    succeedAfter: z.number().optional().default(0),
    blockForSec: z.number().optional().default(0),
  }),
};
+
// Payload type enqueued by the tests.
interface Work {
  increment: number; // added to Results.result when a run succeeds
  succeedAfter?: number; // runs with runNumber < succeedAfter throw
  blockForSec?: number; // sleep inside run() to exercise timeouts
}

// Mutable accumulator shared across runners to observe outcomes.
interface Results {
  result: number;
  numCalled: number;
  numCompleted: number;
  numFailed: number;
}
+
+async function waitUntilAllSettled(queue: SqliteQueue<Work>) {
+ let stats = await queue.stats();
+ while (stats.running > 0 || stats.pending > 0 || stats.pending_retry > 0) {
+ await new Promise((resolve) => setTimeout(resolve, 100));
+ stats = await queue.stats();
+ console.log(stats);
+ }
+}
+
// Constructs a Runner wired to a Results accumulator. Passing `inputResults`
// lets several runners report into the same shared counters.
function buildRunner(
  queue: SqliteQueue<Work>,
  opts: RunnerOptions<Work>,
  barrier: Barrier,
  inputResults?: Results,
) {
  const results = inputResults ?? {
    result: 0,
    numCalled: 0,
    numCompleted: 0,
    numFailed: 0,
  };
  const runner = new Runner<Work>(
    queue,
    {
      run: async (job: DequeuedJob<Work>) => {
        console.log("STARTED:", job);
        results.numCalled++;
        // Park here until the test releases the barrier.
        await barrier.notifyReachedAndWait();
        // Simulate failures for the first `succeedAfter` runs.
        if (job.runNumber < (job.data.succeedAfter ?? 0)) {
          throw new Error("Failed");
        }
        // Optionally stall to exercise the runner's timeout handling.
        if (job.data.blockForSec !== undefined) {
          await new Promise((resolve) =>
            setTimeout(resolve, job.data.blockForSec! * 1000),
          );
        }
        results.result += job.data.increment;
      },
      onComplete: async (job: DequeuedJob<Work>) => {
        console.log("COMPLETED:", job);
        results.numCompleted++;
      },
      onError: async (job: DequeuedJobError<Work>) => {
        console.log("FAILED:", job);
        results.numFailed++;
      },
    },
    opts,
  );

  return { runner, results };
}
+
+describe("SqiteQueueRunner", () => {
+ test("should run jobs with correct concurrency", async () => {
+ const queue = new SqliteQueue<Work>(
+ "queue1",
+ buildDBClient(":memory:", true),
+ {
+ defaultJobArgs: {
+ numRetries: 0,
+ },
+ },
+ );
+
+ const barrier = new Barrier(2);
+ const { runner, results } = buildRunner(
+ queue,
+ { ...defaultRunnerOpts, concurrency: 2 },
+ barrier,
+ );
+
+ queue.enqueue({ increment: 1 });
+ queue.enqueue({ increment: 2 });
+ queue.enqueue({ increment: 3 });
+
+ expect(await queue.stats()).toEqual({
+ pending: 3,
+ running: 0,
+ pending_retry: 0,
+ failed: 0,
+ });
+
+ const runnerPromise = runner.runUntilEmpty();
+
+ // Wait until all runners reach the synchronization point
+ await barrier.waitUntilAllReached();
+
+ // Ensure that we have two "running" jobs given the concurrency of 2
+ expect(await queue.stats()).toEqual({
+ pending: 1,
+ running: 2,
+ pending_retry: 0,
+ failed: 0,
+ });
+
+ // Allow jobs to proceed
+ barrier.allowParticipantsToProceed();
+
+ // Wait until all jobs are consumed
+ await runnerPromise;
+
+ expect(await queue.stats()).toEqual({
+ pending: 0,
+ running: 0,
+ pending_retry: 0,
+ failed: 0,
+ });
+
+ expect(results.result).toEqual(6);
+ expect(results.numCalled).toEqual(3);
+ expect(results.numCompleted).toEqual(3);
+ expect(results.numFailed).toEqual(0);
+ });
+
+ test("should retry errors", async () => {
+ const queue = new SqliteQueue<Work>(
+ "queue1",
+ buildDBClient(":memory:", true),
+ {
+ defaultJobArgs: {
+ numRetries: 2,
+ },
+ },
+ );
+
+ const barrier = new Barrier(0);
+ barrier.allowParticipantsToProceed();
+ const { runner, results } = buildRunner(queue, defaultRunnerOpts, barrier);
+
+ queue.enqueue({ increment: 1, succeedAfter: 2 });
+ queue.enqueue({ increment: 1, succeedAfter: 10 });
+ queue.enqueue({ increment: 3, succeedAfter: 0 });
+
+ const runnerPromise = runner.runUntilEmpty();
+
+ // Wait until all jobs are consumed
+ await runnerPromise;
+
+ expect(await queue.stats()).toEqual({
+ pending: 0,
+ pending_retry: 0,
+ running: 0,
+ failed: 1,
+ });
+
+ expect(results.result).toEqual(4);
+ expect(results.numCalled).toEqual(7);
+ expect(results.numCompleted).toEqual(2);
+ expect(results.numFailed).toEqual(1);
+ });
+
+ test("timeouts are respected", async () => {
+ const queue = new SqliteQueue<Work>(
+ "queue1",
+ buildDBClient(":memory:", true),
+ {
+ defaultJobArgs: {
+ numRetries: 1,
+ },
+ },
+ );
+
+ const barrier = new Barrier(1);
+ barrier.allowParticipantsToProceed();
+ const { runner: runner, results } = buildRunner(
+ queue,
+ { ...defaultRunnerOpts, concurrency: 1, timeoutSecs: 1 },
+ barrier,
+ );
+
+ queue.enqueue({ increment: 1, blockForSec: 10 });
+ await runner.runUntilEmpty();
+
+ expect(await queue.stats()).toEqual({
+ pending: 0,
+ pending_retry: 0,
+ running: 0,
+ failed: 1,
+ });
+
+ expect(results.result).toEqual(0);
+ expect(results.numCalled).toEqual(2);
+ expect(results.numCompleted).toEqual(0);
+ expect(results.numFailed).toEqual(1);
+ });
+
  // Payloads that fail JSON/zod validation never reach run(); they burn their
  // run budget and are finally reported through onError without `data`.
  test("serialization errors", async () => {
    const queue = new SqliteQueue<Work>(
      "queue1",
      buildDBClient(":memory:", true),
      {
        defaultJobArgs: {
          numRetries: 1,
        },
      },
    );

    const job = await queue.enqueue({ increment: 1 });
    // Corrupt the payload
    await queue.db
      .update(tasksTable)
      .set({ payload: "{}" })
      .where(eq(tasksTable.id, job.id));

    const barrier = new Barrier(1);
    barrier.allowParticipantsToProceed();
    const { runner, results } = buildRunner(
      queue,
      { ...defaultRunnerOpts, concurrency: 1 },
      barrier,
    );

    const p = runner.run();
    await waitUntilAllSettled(queue);
    runner.stop();
    await p;

    expect(await queue.stats()).toEqual({
      pending: 0,
      pending_retry: 0,
      running: 0,
      failed: 1,
    });

    // run() is never invoked for an unparsable payload.
    expect(results.result).toEqual(0);
    expect(results.numCalled).toEqual(0);
    expect(results.numCompleted).toEqual(0);
    expect(results.numFailed).toEqual(1);
  });
+
  // Three single-concurrency runners sharing one queue: the allocation-id
  // compare-and-swap must hand each job to exactly one runner.
  test("concurrent runners", async () => {
    const queue = new SqliteQueue<Work>(
      "queue1",
      buildDBClient(":memory:", true),
      {
        defaultJobArgs: {
          numRetries: 0,
        },
      },
    );

    await queue.enqueue({ increment: 1 });
    await queue.enqueue({ increment: 2 });
    await queue.enqueue({ increment: 3 });

    const barrier = new Barrier(3);
    const { runner: runner1, results } = buildRunner(
      queue,
      { ...defaultRunnerOpts, concurrency: 1 },
      barrier,
    );
    const { runner: runner2 } = buildRunner(
      queue,
      { ...defaultRunnerOpts, concurrency: 1 },
      barrier,
      results,
    );
    const { runner: runner3 } = buildRunner(
      queue,
      { ...defaultRunnerOpts, concurrency: 1 },
      barrier,
      results,
    );

    const runPromises = Promise.all([
      runner1.run(),
      runner2.run(),
      runner3.run(),
    ]);

    await barrier.waitUntilAllReached();

    // Each of the three runners should hold exactly one job.
    expect(await queue.stats()).toEqual({
      pending: 0,
      pending_retry: 0,
      running: 3,
      failed: 0,
    });

    barrier.allowParticipantsToProceed();

    // stop() only halts polling; in-flight jobs still drain before run() returns.
    runner1.stop();
    runner2.stop();
    runner3.stop();

    await runPromises;

    expect(results.result).toEqual(6);
    expect(results.numCalled).toEqual(3);
    expect(results.numCompleted).toEqual(3);
    expect(results.numFailed).toEqual(0);
  });
+
  // Stress test: 10 runners (5 per queue, concurrency 2 each) drain 1000 jobs
  // split across two logical queues that share one database.
  test("large test", async () => {
    const db = buildDBClient(":memory:", true);
    const queue1 = new SqliteQueue<Work>("queue1", db, {
      defaultJobArgs: {
        numRetries: 0,
      },
    });
    const queue2 = new SqliteQueue<Work>("queue2", db, {
      defaultJobArgs: {
        numRetries: 0,
      },
    });

    const barrier = new Barrier(0);
    barrier.allowParticipantsToProceed();
    const results = {
      result: 0,
      numCalled: 0,
      numCompleted: 0,
      numFailed: 0,
    };
    const runners = [];
    const runnerPromises = [];

    for (let i = 0; i < 10; i++) {
      const { runner } = buildRunner(
        i % 2 == 0 ? queue1 : queue2,
        { ...defaultRunnerOpts, concurrency: 2 },
        barrier,
        results,
      );
      runners.push(runner);
      runnerPromises.push(runner.run());
    }

    {
      const enqueuePromises = [];
      for (let i = 0; i < 1000; i++) {
        enqueuePromises.push(
          (i % 2 == 0 ? queue1 : queue2).enqueue({ increment: i }),
        );
      }
      await Promise.all(enqueuePromises);
    }

    await Promise.all([
      waitUntilAllSettled(queue1),
      waitUntilAllSettled(queue2),
    ]);

    runners.forEach((runner) => runner.stop());
    await Promise.all(runnerPromises);

    // sum(0..999) = 499500
    expect(results.result).toEqual(499500);
    expect(results.numCalled).toEqual(1000);
    expect(results.numCompleted).toEqual(1000);
    expect(results.numFailed).toEqual(0);
  });
+});
diff --git a/packages/queue/runner.ts b/packages/queue/runner.ts
new file mode 100644
index 00000000..1a90f969
--- /dev/null
+++ b/packages/queue/runner.ts
@@ -0,0 +1,115 @@
+import assert from "node:assert";
+import { Semaphore } from "async-mutex";
+
+import { RunnerFuncs, RunnerOptions } from "./options";
+import { SqliteQueue } from "./queue";
+import { Job } from "./schema";
+import { DequeuedJob } from "./types";
+
+export class Runner<T> {
+ queue: SqliteQueue<T>;
+ funcs: RunnerFuncs<T>;
+ opts: RunnerOptions<T>;
+ stopping = false;
+
+ constructor(
+ queue: SqliteQueue<T>,
+ funcs: RunnerFuncs<T>,
+ opts: RunnerOptions<T>,
+ ) {
+ this.queue = queue;
+ this.funcs = funcs;
+ this.opts = opts;
+ }
+
+ async run() {
+ return this.runImpl(false);
+ }
+
+ stop() {
+ this.stopping = true;
+ }
+
+ async runUntilEmpty() {
+ return this.runImpl(true);
+ }
+
+ async runImpl(breakOnEmpty: boolean) {
+ const semaphore = new Semaphore(this.opts.concurrency);
+ const inFlight = new Map<number, Promise<void>>();
+ while (!this.stopping) {
+ await semaphore.waitForUnlock();
+ const job = await this.queue.attemptDequeue({
+ timeoutSecs: this.opts.timeoutSecs,
+ });
+ if (!job && breakOnEmpty && inFlight.size == 0) {
+ // No more jobs to process, and no ongoing jobs.
+ break;
+ }
+ if (!job) {
+ await new Promise((resolve) =>
+ setTimeout(resolve, this.opts.pollIntervalMs),
+ );
+ continue;
+ }
+ const [_, release] = await semaphore.acquire();
+ inFlight.set(
+ job.id,
+ this.runOnce(job).finally(() => {
+ inFlight.delete(job.id);
+ release();
+ }),
+ );
+ }
+ await Promise.allSettled(inFlight.values());
+ }
+
+ async runOnce(job: Job) {
+ assert(job.allocationId);
+
+ let parsed: T;
+ try {
+ parsed = JSON.parse(job.payload) as T;
+ if (this.opts.validator) {
+ parsed = this.opts.validator.parse(parsed);
+ }
+ } catch (e) {
+ if (job.numRunsLeft <= 0) {
+ await this.funcs.onError?.({
+ id: job.id.toString(),
+ error: e as Error,
+ });
+ await this.queue.finalize(job.id, job.allocationId, "failed");
+ } else {
+ await this.queue.finalize(job.id, job.allocationId, "pending_retry");
+ }
+ return;
+ }
+
+ const dequeuedJob: DequeuedJob<T> = {
+ id: job.id.toString(),
+ data: parsed,
+ runNumber: job.maxNumRuns - job.numRunsLeft - 1,
+ };
+ try {
+ await Promise.race([
+ this.funcs.run(dequeuedJob),
+ new Promise((_, reject) =>
+ setTimeout(
+ () => reject(new Error("Timeout")),
+ this.opts.timeoutSecs * 1000,
+ ),
+ ),
+ ]);
+ await this.funcs.onComplete?.(dequeuedJob);
+ await this.queue.finalize(job.id, job.allocationId, "completed");
+ } catch (e) {
+ if (job.numRunsLeft <= 0) {
+ await this.funcs.onError?.({ ...dequeuedJob, error: e as Error });
+ await this.queue.finalize(job.id, job.allocationId, "failed");
+ } else {
+ await this.queue.finalize(job.id, job.allocationId, "pending_retry");
+ }
+ }
+ }
+}
diff --git a/packages/queue/schema.ts b/packages/queue/schema.ts
new file mode 100644
index 00000000..377c6b1c
--- /dev/null
+++ b/packages/queue/schema.ts
@@ -0,0 +1,36 @@
+import { index, integer, sqliteTable, text } from "drizzle-orm/sqlite-core";
+
// Builds a createdAt column that defaults to "now" at insert time
// (computed in JS via $defaultFn, stored as a unix timestamp).
function createdAtField() {
  return integer("createdAt", { mode: "timestamp" })
    .notNull()
    .$defaultFn(() => new Date());
}

// One row per queued job. A single table backs every logical queue; rows are
// scoped by the `queue` column. Completed jobs are deleted rather than kept
// with a terminal status.
export const tasksTable = sqliteTable(
  "tasks",
  {
    id: integer("id").notNull().primaryKey({ autoIncrement: true }),
    // Name of the logical queue this job belongs to.
    queue: text("queue").notNull(),
    // JSON-serialized job payload.
    payload: text("payload").notNull(),
    createdAt: createdAtField(),
    status: text("status", {
      enum: ["pending", "running", "pending_retry", "failed"],
    })
      .notNull()
      .default("pending"),
    // Deadline after which a "running" job is considered abandoned and
    // becomes claimable again.
    expireAt: integer("expireAt", { mode: "timestamp" }),
    // Ownership token, re-rolled on every dequeue (compare-and-swap guard).
    allocationId: text("allocationId").notNull(),
    // Remaining and total allowed runs (retries + 1).
    numRunsLeft: integer("numRunsLeft").notNull(),
    maxNumRuns: integer("maxNumRuns").notNull(),
  },
  (tasks) => ({
    queueIdx: index("tasks_queue_idx").on(tasks.queue),
    statusIdx: index("tasks_status_idx").on(tasks.status),
    expireAtIdx: index("tasks_expire_at_idx").on(tasks.expireAt),
    numRunsLeftIdx: index("tasks_num_runs_left_idx").on(tasks.numRunsLeft),
    maxNumRunsIdx: index("tasks_max_num_runs_idx").on(tasks.maxNumRuns),
    allocationIdIdx: index("tasks_allocation_id_idx").on(tasks.allocationId),
  }),
);

export type Job = typeof tasksTable.$inferSelect;
diff --git a/packages/queue/tsconfig.json b/packages/queue/tsconfig.json
new file mode 100644
index 00000000..71bf61e7
--- /dev/null
+++ b/packages/queue/tsconfig.json
@@ -0,0 +1,9 @@
{
  "$schema": "https://json.schemastore.org/tsconfig",
  "extends": "@hoarder/tsconfig/node.json",
  "include": ["**/*.ts"],
  "exclude": ["node_modules"],
  "compilerOptions": {
    "tsBuildInfoFile": "node_modules/.cache/tsbuildinfo.json"
  }
}
diff --git a/packages/queue/types.ts b/packages/queue/types.ts
new file mode 100644
index 00000000..01975cc7
--- /dev/null
+++ b/packages/queue/types.ts
@@ -0,0 +1,11 @@
+export interface DequeuedJob<T> {
+ id: string;
+ data: T;
+ runNumber: number;
+}
+
+export interface DequeuedJobError<T> {
+ id: string;
+ data?: T;
+ error: Error;
+}
diff --git a/packages/queue/vitest.config.ts b/packages/queue/vitest.config.ts
new file mode 100644
index 00000000..a206cfc4
--- /dev/null
+++ b/packages/queue/vitest.config.ts
@@ -0,0 +1,13 @@
+/// <reference types="vitest" />
+
+import { defineConfig } from "vitest/config";
+
+// https://vitejs.dev/config/
// https://vitejs.dev/config/
export default defineConfig({
  plugins: [],
  test: {
    alias: {
      // NOTE(review): vite-style aliases do not expand "*" wildcards in plain
      // string `find` entries, so this mapping may be a no-op — verify it is
      // actually needed.
      "@/*": "./*",
    },
  },
});