diff options
Diffstat (limited to 'packages/queue')
| -rw-r--r-- | packages/queue/db.ts | 19 | ||||
| -rw-r--r-- | packages/queue/drizzle.config.ts | 10 | ||||
| -rw-r--r-- | packages/queue/drizzle/0000_wonderful_talisman.sql | 18 | ||||
| -rw-r--r-- | packages/queue/drizzle/meta/0000_snapshot.json | 130 | ||||
| -rw-r--r-- | packages/queue/drizzle/meta/_journal.json | 13 | ||||
| -rw-r--r-- | packages/queue/index.ts | 6 | ||||
| -rw-r--r-- | packages/queue/options.ts | 22 | ||||
| -rw-r--r-- | packages/queue/package.json | 35 | ||||
| -rw-r--r-- | packages/queue/queue.ts | 146 | ||||
| -rw-r--r-- | packages/queue/runner.test.ts | 440 | ||||
| -rw-r--r-- | packages/queue/runner.ts | 115 | ||||
| -rw-r--r-- | packages/queue/schema.ts | 36 | ||||
| -rw-r--r-- | packages/queue/tsconfig.json | 9 | ||||
| -rw-r--r-- | packages/queue/types.ts | 11 | ||||
| -rw-r--r-- | packages/queue/vitest.config.ts | 13 |
15 files changed, 0 insertions, 1023 deletions
diff --git a/packages/queue/db.ts b/packages/queue/db.ts deleted file mode 100644 index f1412fef..00000000 --- a/packages/queue/db.ts +++ /dev/null @@ -1,19 +0,0 @@ -import path from "node:path"; -import Database from "better-sqlite3"; -import { BetterSQLite3Database, drizzle } from "drizzle-orm/better-sqlite3"; -import { migrate } from "drizzle-orm/better-sqlite3/migrator"; - -import * as schema from "./schema"; - -export function buildDBClient(dbPath: string, runMigrations = false) { - const sqlite = new Database(dbPath); - const db = drizzle(sqlite, { schema }); - if (runMigrations) { - migrateDB(db); - } - return db; -} - -export function migrateDB(db: BetterSQLite3Database<typeof schema>) { - migrate(db, { migrationsFolder: path.join(__dirname, "drizzle") }); -} diff --git a/packages/queue/drizzle.config.ts b/packages/queue/drizzle.config.ts deleted file mode 100644 index 6ef01d1b..00000000 --- a/packages/queue/drizzle.config.ts +++ /dev/null @@ -1,10 +0,0 @@ -import type { Config } from "drizzle-kit"; - -export default { - schema: "./schema.ts", - out: "./drizzle", - driver: "better-sqlite", - dbCredentials: { - url: "data.db", - }, -} satisfies Config; diff --git a/packages/queue/drizzle/0000_wonderful_talisman.sql b/packages/queue/drizzle/0000_wonderful_talisman.sql deleted file mode 100644 index e042ab92..00000000 --- a/packages/queue/drizzle/0000_wonderful_talisman.sql +++ /dev/null @@ -1,18 +0,0 @@ -CREATE TABLE `tasks` ( - `id` integer PRIMARY KEY AUTOINCREMENT NOT NULL, - `queue` text NOT NULL, - `payload` text NOT NULL, - `createdAt` integer NOT NULL, - `status` text DEFAULT 'pending' NOT NULL, - `expireAt` integer, - `allocationId` text NOT NULL, - `numRunsLeft` integer NOT NULL, - `maxNumRuns` integer NOT NULL -); ---> statement-breakpoint -CREATE INDEX `tasks_queue_idx` ON `tasks` (`queue`);--> statement-breakpoint -CREATE INDEX `tasks_status_idx` ON `tasks` (`status`);--> statement-breakpoint -CREATE INDEX `tasks_expire_at_idx` ON `tasks` (`expireAt`);--> statement-breakpoint -CREATE INDEX `tasks_num_runs_left_idx` ON `tasks` (`numRunsLeft`);--> statement-breakpoint -CREATE INDEX `tasks_max_num_runs_idx` ON `tasks` (`maxNumRuns`);--> statement-breakpoint -CREATE INDEX `tasks_allocation_id_idx` ON `tasks` (`allocationId`);
\ No newline at end of file diff --git a/packages/queue/drizzle/meta/0000_snapshot.json b/packages/queue/drizzle/meta/0000_snapshot.json deleted file mode 100644 index 57c7c2f4..00000000 --- a/packages/queue/drizzle/meta/0000_snapshot.json +++ /dev/null @@ -1,130 +0,0 @@ -{ - "version": "5", - "dialect": "sqlite", - "id": "3094773c-0138-46b2-b617-4b10093b0f53", - "prevId": "00000000-0000-0000-0000-000000000000", - "tables": { - "tasks": { - "name": "tasks", - "columns": { - "id": { - "name": "id", - "type": "integer", - "primaryKey": true, - "notNull": true, - "autoincrement": true - }, - "queue": { - "name": "queue", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "payload": { - "name": "payload", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "createdAt": { - "name": "createdAt", - "type": "integer", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "status": { - "name": "status", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false, - "default": "'pending'" - }, - "expireAt": { - "name": "expireAt", - "type": "integer", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "allocationId": { - "name": "allocationId", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "numRunsLeft": { - "name": "numRunsLeft", - "type": "integer", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "maxNumRuns": { - "name": "maxNumRuns", - "type": "integer", - "primaryKey": false, - "notNull": true, - "autoincrement": false - } - }, - "indexes": { - "tasks_queue_idx": { - "name": "tasks_queue_idx", - "columns": [ - "queue" - ], - "isUnique": false - }, - "tasks_status_idx": { - "name": "tasks_status_idx", - "columns": [ - "status" - ], - "isUnique": false - }, - "tasks_expire_at_idx": { - "name": "tasks_expire_at_idx", - "columns": [ - "expireAt" - ], - "isUnique": false - }, - "tasks_num_runs_left_idx": { - "name": "tasks_num_runs_left_idx", - "columns": [ - "numRunsLeft" - ], - "isUnique": false - }, - "tasks_max_num_runs_idx": { - "name": "tasks_max_num_runs_idx", - "columns": [ - "maxNumRuns" - ], - "isUnique": false - }, - "tasks_allocation_id_idx": { - "name": "tasks_allocation_id_idx", - "columns": [ - "allocationId" - ], - "isUnique": false - } - }, - "foreignKeys": {}, - "compositePrimaryKeys": {}, - "uniqueConstraints": {} - } - }, - "enums": {}, - "_meta": { - "schemas": {}, - "tables": {}, - "columns": {} - } -}
\ No newline at end of file diff --git a/packages/queue/drizzle/meta/_journal.json b/packages/queue/drizzle/meta/_journal.json deleted file mode 100644 index 2b14f895..00000000 --- a/packages/queue/drizzle/meta/_journal.json +++ /dev/null @@ -1,13 +0,0 @@ -{ - "version": "5", - "dialect": "sqlite", - "entries": [ - { - "idx": 0, - "version": "5", - "when": 1720992922192, - "tag": "0000_wonderful_talisman", - "breakpoints": true - } - ] -}
\ No newline at end of file diff --git a/packages/queue/index.ts b/packages/queue/index.ts deleted file mode 100644 index c9144f29..00000000 --- a/packages/queue/index.ts +++ /dev/null @@ -1,6 +0,0 @@ -export { SqliteQueue } from "./queue"; -export { buildDBClient, migrateDB } from "./db"; -export type { SqliteQueueOptions, RunnerOptions, RunnerFuncs } from "./options"; -export { Runner } from "./runner"; - -export type { DequeuedJob, DequeuedJobError } from "./types"; diff --git a/packages/queue/options.ts b/packages/queue/options.ts deleted file mode 100644 index 18f8e52d..00000000 --- a/packages/queue/options.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { ZodType } from "zod"; - -import { DequeuedJob, DequeuedJobError } from "./types"; - -export interface SqliteQueueOptions { - defaultJobArgs: { - numRetries: number; - }; -} - -export interface RunnerFuncs<T> { - run: (job: DequeuedJob<T>) => Promise<void>; - onComplete?: (job: DequeuedJob<T>) => Promise<void>; - onError?: (job: DequeuedJobError<T>) => Promise<void>; -} - -export interface RunnerOptions<T> { - pollIntervalMs: number; - timeoutSecs: number; - concurrency: number; - validator?: ZodType<T>; -} diff --git a/packages/queue/package.json b/packages/queue/package.json deleted file mode 100644 index 146a88b7..00000000 --- a/packages/queue/package.json +++ /dev/null @@ -1,35 +0,0 @@ -{ - "$schema": "https://json.schemastore.org/package.json", - "name": "@hoarder/queue", - "version": "0.1.0", - "private": true, - "type": "module", - "dependencies": { - "async-mutex": "^0.4.1", - "better-sqlite3": "^11.3.0", - "drizzle-orm": "^0.33.0", - "zod": "^3.22.4" - }, - "devDependencies": { - "@hoarder/eslint-config": "workspace:^0.2.0", - "@hoarder/prettier-config": "workspace:^0.1.0", - "@hoarder/tsconfig": "workspace:^0.1.0", - "@types/better-sqlite3": "^7.6.11", - "drizzle-kit": "^0.20.14", - "vitest": "^1.3.1" - }, - "scripts": { - "typecheck": "tsc --noEmit", - "test": "vitest", - "format": "prettier . --ignore-path ../../.prettierignore", - "lint": "eslint ." - }, - "main": "index.ts", - "eslintConfig": { - "root": true, - "extends": [ - "@hoarder/eslint-config/base" - ] - }, - "prettier": "@hoarder/prettier-config" -} diff --git a/packages/queue/queue.ts b/packages/queue/queue.ts deleted file mode 100644 index ad486468..00000000 --- a/packages/queue/queue.ts +++ /dev/null @@ -1,146 +0,0 @@ -import assert from "node:assert"; -import { and, asc, count, eq, gt, lt, or } from "drizzle-orm"; - -import { buildDBClient } from "./db"; -import { SqliteQueueOptions } from "./options"; -import { Job, tasksTable } from "./schema"; - -// generate random id -function generateAllocationId() { - return Math.random().toString(36).substring(2, 15); -} - -export class SqliteQueue<T> { - queueName: string; - db: ReturnType<typeof buildDBClient>; - options: SqliteQueueOptions; - - constructor( - name: string, - db: ReturnType<typeof buildDBClient>, - options: SqliteQueueOptions, - ) { - this.queueName = name; - this.options = options; - this.db = db; - } - - name() { - return this.queueName; - } - - async enqueue(payload: T): Promise<Job> { - const job = await this.db - .insert(tasksTable) - .values({ - queue: this.queueName, - payload: JSON.stringify(payload), - numRunsLeft: this.options.defaultJobArgs.numRetries + 1, - maxNumRuns: this.options.defaultJobArgs.numRetries + 1, - allocationId: generateAllocationId(), - }) - .returning(); - - return job[0]; - } - - async stats() { - const res = await this.db - .select({ status: tasksTable.status, count: count() }) - .from(tasksTable) - .where(eq(tasksTable.queue, this.queueName)) - .groupBy(tasksTable.status); - - return res.reduce( - (acc, r) => { - acc[r.status] += r.count; - return acc; - }, - { - pending: 0, - pending_retry: 0, - running: 0, - failed: 0, - }, - ); - } - - async attemptDequeue(options: { timeoutSecs: number }): Promise<Job | null> { - return await this.db.transaction(async (txn) => { - const jobs = await txn - .select() - .from(tasksTable) - .where( - and( - eq(tasksTable.queue, this.queueName), - gt(tasksTable.numRunsLeft, 0), - or( - // Not picked by a worker yet - eq(tasksTable.status, "pending"), - - // Failed but still has attempts left - eq(tasksTable.status, "pending_retry"), - - // Expired and still has attempts left - and( - eq(tasksTable.status, "running"), - lt(tasksTable.expireAt, new Date()), - ), - ), - ), - ) - .orderBy(asc(tasksTable.createdAt)) - .limit(1); - - if (jobs.length == 0) { - return null; - } - assert(jobs.length == 1); - const job = jobs[0]; - - const result = await txn - .update(tasksTable) - .set({ - status: "running", - numRunsLeft: job.numRunsLeft - 1, - allocationId: generateAllocationId(), - expireAt: new Date(new Date().getTime() + options.timeoutSecs * 1000), - }) - .where( - and( - eq(tasksTable.id, job.id), - - // The compare and swap is necessary to avoid race conditions - eq(tasksTable.allocationId, job.allocationId), - ), - ) - .returning(); - if (result.length == 0) { - return null; - } - assert(result.length == 1); - return result[0]; - }); - } - - async finalize( - id: number, - alloctionId: string, - status: "completed" | "pending_retry" | "failed", - ) { - if (status == "completed") { - await this.db - .delete(tasksTable) - .where( - and(eq(tasksTable.id, id), eq(tasksTable.allocationId, alloctionId)), - ); - } else { - await this.db - .update(tasksTable) - .set({ status: status, expireAt: null }) - .where( - and(eq(tasksTable.id, id), eq(tasksTable.allocationId, alloctionId)), - ); - } - } -} diff --git a/packages/queue/runner.test.ts b/packages/queue/runner.test.ts deleted file mode 100644 index 7777b422..00000000 --- a/packages/queue/runner.test.ts +++ /dev/null @@ -1,440 +0,0 @@ -/* eslint-disable @typescript-eslint/require-await */ -import { Semaphore } from "async-mutex"; -import { eq } from "drizzle-orm"; -import { describe, expect, test } from "vitest"; -import { z } from "zod"; - -import { - buildDBClient, - DequeuedJob, - DequeuedJobError, - Runner, - RunnerOptions, - SqliteQueue, -} from "./"; -import { tasksTable } from "./schema"; - -class Baton { - semaphore: Semaphore; - constructor() { - this.semaphore = new Semaphore(0); - this.reset(); - } - post() { - this.semaphore.setValue(100000); - } - - async wait() { - await this.semaphore.acquire(); - } - - reset() { - this.semaphore.setValue(-Infinity); - } -} - -class Barrier { - semaphore: Semaphore; - baton: Baton; - constructor(numParticipants: number) { - this.semaphore = new Semaphore(numParticipants * -1 + 1); - this.baton = new Baton(); - this.reset(numParticipants * -1 + 1); - } - - async notifyReachedAndWait() { - this.semaphore.release(); - await this.baton.wait(); - } - - async waitUntilAllReached() { - await this.semaphore.waitForUnlock(); - } - - allowParticipantsToProceed() { - this.baton.post(); - } - - reset(numParticipants: number) { - this.semaphore.setValue(numParticipants); - this.baton.reset(); - } -} - -const defaultRunnerOpts = { - pollIntervalMs: 100, - timeoutSecs: 100, - concurrency: 2, - validator: z.object({ - increment: z.number(), - succeedAfter: z.number().optional().default(0), - blockForSec: z.number().optional().default(0), - }), -}; - -interface Work { - increment: number; - succeedAfter?: number; - blockForSec?: number; -} - -interface Results { - result: number; - numCalled: number; - numCompleted: number; - numFailed: number; -} - -async function waitUntilAllSettled(queue: SqliteQueue<Work>) { - let stats = await queue.stats(); - while (stats.running > 0 || stats.pending > 0 || stats.pending_retry > 0) { - await new Promise((resolve) => setTimeout(resolve, 100)); - stats = await queue.stats(); - console.log(stats); - } -} - -function buildRunner( - queue: SqliteQueue<Work>, - opts: RunnerOptions<Work>, - barrier: Barrier, - inputResults?: Results, -) { - const results = inputResults ?? { - result: 0, - numCalled: 0, - numCompleted: 0, - numFailed: 0, - }; - const runner = new Runner<Work>( - queue, - { - run: async (job: DequeuedJob<Work>) => { - console.log("STARTED:", job); - results.numCalled++; - await barrier.notifyReachedAndWait(); - if (job.runNumber < (job.data.succeedAfter ?? 0)) { - throw new Error("Failed"); - } - if (job.data.blockForSec !== undefined) { - await new Promise((resolve) => - setTimeout(resolve, job.data.blockForSec! * 1000), - ); - } - results.result += job.data.increment; - }, - onComplete: async (job: DequeuedJob<Work>) => { - console.log("COMPLETED:", job); - results.numCompleted++; - }, - onError: async (job: DequeuedJobError<Work>) => { - console.log("FAILED:", job); - results.numFailed++; - }, - }, - opts, - ); - - return { runner, results }; -} - -describe("SqiteQueueRunner", () => { - test("should run jobs with correct concurrency", async () => { - const queue = new SqliteQueue<Work>( - "queue1", - buildDBClient(":memory:", true), - { - defaultJobArgs: { - numRetries: 0, - }, - }, - ); - - const barrier = new Barrier(2); - const { runner, results } = buildRunner( - queue, - { ...defaultRunnerOpts, concurrency: 2 }, - barrier, - ); - - await queue.enqueue({ increment: 1 }); - await queue.enqueue({ increment: 2 }); - await queue.enqueue({ increment: 3 }); - - expect(await queue.stats()).toEqual({ - pending: 3, - running: 0, - pending_retry: 0, - failed: 0, - }); - - const runnerPromise = runner.runUntilEmpty(); - - // Wait until all runners reach the synchronization point - await barrier.waitUntilAllReached(); - - // Ensure that we have two "running" jobs given the concurrency of 2 - expect(await queue.stats()).toEqual({ - pending: 1, - running: 2, - pending_retry: 0, - failed: 0, - }); - - // Allow jobs to proceed - barrier.allowParticipantsToProceed(); - - // Wait until all jobs are consumed - await runnerPromise; - - expect(await queue.stats()).toEqual({ - pending: 0, - running: 0, - pending_retry: 0, - failed: 0, - }); - - expect(results.result).toEqual(6); - expect(results.numCalled).toEqual(3); - expect(results.numCompleted).toEqual(3); - expect(results.numFailed).toEqual(0); - }); - - test("should retry errors", async () => { - const queue = new SqliteQueue<Work>( - "queue1", - buildDBClient(":memory:", true), - { - defaultJobArgs: { - numRetries: 2, - }, - }, - ); - - const barrier = new Barrier(0); - barrier.allowParticipantsToProceed(); - const { runner, results } = buildRunner(queue, defaultRunnerOpts, barrier); - - await queue.enqueue({ increment: 1, succeedAfter: 2 }); - await queue.enqueue({ increment: 1, succeedAfter: 10 }); - await queue.enqueue({ increment: 3, succeedAfter: 0 }); - - const runnerPromise = runner.runUntilEmpty(); - - // Wait until all jobs are consumed - await runnerPromise; - - expect(await queue.stats()).toEqual({ - pending: 0, - pending_retry: 0, - running: 0, - failed: 1, - }); - - expect(results.result).toEqual(4); - expect(results.numCalled).toEqual(7); - expect(results.numCompleted).toEqual(2); - expect(results.numFailed).toEqual(1); - }); - - test("timeouts are respected", async () => { - const queue = new SqliteQueue<Work>( - "queue1", - buildDBClient(":memory:", true), - { - defaultJobArgs: { - numRetries: 1, - }, - }, - ); - - const barrier = new Barrier(1); - barrier.allowParticipantsToProceed(); - const { runner: runner, results } = buildRunner( - queue, - { ...defaultRunnerOpts, concurrency: 1, timeoutSecs: 1 }, - barrier, - ); - - await queue.enqueue({ increment: 1, blockForSec: 10 }); - await runner.runUntilEmpty(); - - expect(await queue.stats()).toEqual({ - pending: 0, - pending_retry: 0, - running: 0, - failed: 1, - }); - - expect(results.result).toEqual(0); - expect(results.numCalled).toEqual(2); - expect(results.numCompleted).toEqual(0); - expect(results.numFailed).toEqual(1); - }); - - test("serialization errors", async () => { - const queue = new SqliteQueue<Work>( - "queue1", - buildDBClient(":memory:", true), - { - defaultJobArgs: { - numRetries: 1, - }, - }, - ); - - const job = await queue.enqueue({ increment: 1 }); - // Corrupt the payload - await queue.db - .update(tasksTable) - .set({ payload: "{}" }) - .where(eq(tasksTable.id, job.id)); - - const barrier = new Barrier(1); - barrier.allowParticipantsToProceed(); - const { runner, results } = buildRunner( - queue, - { ...defaultRunnerOpts, concurrency: 1 }, - barrier, - ); - - const p = runner.run(); - await waitUntilAllSettled(queue); - runner.stop(); - await p; - - expect(await queue.stats()).toEqual({ - pending: 0, - pending_retry: 0, - running: 0, - failed: 1, - }); - - expect(results.result).toEqual(0); - expect(results.numCalled).toEqual(0); - expect(results.numCompleted).toEqual(0); - expect(results.numFailed).toEqual(1); - }); - - test("concurrent runners", async () => { - const queue = new SqliteQueue<Work>( - "queue1", - buildDBClient(":memory:", true), - { - defaultJobArgs: { - numRetries: 0, - }, - }, - ); - - await queue.enqueue({ increment: 1 }); - await queue.enqueue({ increment: 2 }); - await queue.enqueue({ increment: 3 }); - - const barrier = new Barrier(3); - const { runner: runner1, results } = buildRunner( - queue, - { ...defaultRunnerOpts, concurrency: 1 }, - barrier, - ); - const { runner: runner2 } = buildRunner( - queue, - { ...defaultRunnerOpts, concurrency: 1 }, - barrier, - results, - ); - const { runner: runner3 } = buildRunner( - queue, - { ...defaultRunnerOpts, concurrency: 1 }, - barrier, - results, - ); - - const runPromises = Promise.all([ - runner1.run(), - runner2.run(), - runner3.run(), - ]); - - await barrier.waitUntilAllReached(); - - expect(await queue.stats()).toEqual({ - pending: 0, - pending_retry: 0, - running: 3, - failed: 0, - }); - - barrier.allowParticipantsToProceed(); - - runner1.stop(); - runner2.stop(); - runner3.stop(); - - await runPromises; - - expect(results.result).toEqual(6); - expect(results.numCalled).toEqual(3); - expect(results.numCompleted).toEqual(3); - expect(results.numFailed).toEqual(0); - }); - - test("large test", async () => { - const db = buildDBClient(":memory:", true); - const queue1 = new SqliteQueue<Work>("queue1", db, { - defaultJobArgs: { - numRetries: 0, - }, - }); - const queue2 = new SqliteQueue<Work>("queue2", db, { - defaultJobArgs: { - numRetries: 0, - }, - }); - - const barrier = new Barrier(0); - barrier.allowParticipantsToProceed(); - const results = { - result: 0, - numCalled: 0, - numCompleted: 0, - numFailed: 0, - }; - const runners = []; - const runnerPromises = []; - - for (let i = 0; i < 10; i++) { - const { runner } = buildRunner( - i % 2 == 0 ? queue1 : queue2, - { ...defaultRunnerOpts, concurrency: 2 }, - barrier, - results, - ); - runners.push(runner); - runnerPromises.push(runner.run()); - } - - { - const enqueuePromises = []; - for (let i = 0; i < 1000; i++) { - enqueuePromises.push( - (i % 2 == 0 ? queue1 : queue2).enqueue({ increment: i }), - ); - } - await Promise.all(enqueuePromises); - } - - await Promise.all([ - waitUntilAllSettled(queue1), - waitUntilAllSettled(queue2), - ]); - - runners.forEach((runner) => runner.stop()); - await Promise.all(runnerPromises); - - expect(results.result).toEqual(499500); - expect(results.numCalled).toEqual(1000); - expect(results.numCompleted).toEqual(1000); - expect(results.numFailed).toEqual(0); - }); -}); diff --git a/packages/queue/runner.ts b/packages/queue/runner.ts deleted file mode 100644 index 1a90f969..00000000 --- a/packages/queue/runner.ts +++ /dev/null @@ -1,115 +0,0 @@ -import assert from "node:assert"; -import { Semaphore } from "async-mutex"; - -import { RunnerFuncs, RunnerOptions } from "./options"; -import { SqliteQueue } from "./queue"; -import { Job } from "./schema"; -import { DequeuedJob } from "./types"; - -export class Runner<T> { - queue: SqliteQueue<T>; - funcs: RunnerFuncs<T>; - opts: RunnerOptions<T>; - stopping = false; - - constructor( - queue: SqliteQueue<T>, - funcs: RunnerFuncs<T>, - opts: RunnerOptions<T>, - ) { - this.queue = queue; - this.funcs = funcs; - this.opts = opts; - } - - async run() { - return this.runImpl(false); - } - - stop() { - this.stopping = true; - } - - async runUntilEmpty() { - return this.runImpl(true); - } - - async runImpl(breakOnEmpty: boolean) { - const semaphore = new Semaphore(this.opts.concurrency); - const inFlight = new Map<number, Promise<void>>(); - while (!this.stopping) { - await semaphore.waitForUnlock(); - const job = await this.queue.attemptDequeue({ - timeoutSecs: this.opts.timeoutSecs, - }); - if (!job && breakOnEmpty && inFlight.size == 0) { - // No more jobs to process, and no ongoing jobs. - break; - } - if (!job) { - await new Promise((resolve) => - setTimeout(resolve, this.opts.pollIntervalMs), - ); - continue; - } - const [_, release] = await semaphore.acquire(); - inFlight.set( - job.id, - this.runOnce(job).finally(() => { - inFlight.delete(job.id); - release(); - }), - ); - } - await Promise.allSettled(inFlight.values()); - } - - async runOnce(job: Job) { - assert(job.allocationId); - - let parsed: T; - try { - parsed = JSON.parse(job.payload) as T; - if (this.opts.validator) { - parsed = this.opts.validator.parse(parsed); - } - } catch (e) { - if (job.numRunsLeft <= 0) { - await this.funcs.onError?.({ - id: job.id.toString(), - error: e as Error, - }); - await this.queue.finalize(job.id, job.allocationId, "failed"); - } else { - await this.queue.finalize(job.id, job.allocationId, "pending_retry"); - } - return; - } - - const dequeuedJob: DequeuedJob<T> = { - id: job.id.toString(), - data: parsed, - runNumber: job.maxNumRuns - job.numRunsLeft - 1, - }; - try { - await Promise.race([ - this.funcs.run(dequeuedJob), - new Promise((_, reject) => - setTimeout( - () => reject(new Error("Timeout")), - this.opts.timeoutSecs * 1000, - ), - ), - ]); - await this.funcs.onComplete?.(dequeuedJob); - await this.queue.finalize(job.id, job.allocationId, "completed"); - } catch (e) { - if (job.numRunsLeft <= 0) { - await this.funcs.onError?.({ ...dequeuedJob, error: e as Error }); - await this.queue.finalize(job.id, job.allocationId, "failed"); - } else { - await this.queue.finalize(job.id, job.allocationId, "pending_retry"); - } - } - } -} diff --git a/packages/queue/schema.ts b/packages/queue/schema.ts deleted file mode 100644 index 377c6b1c..00000000 --- a/packages/queue/schema.ts +++ /dev/null @@ -1,36 +0,0 @@ -import { index, integer, sqliteTable, text } from "drizzle-orm/sqlite-core"; - -function createdAtField() { - return integer("createdAt", { mode: "timestamp" }) - .notNull() - .$defaultFn(() => new Date()); -} - -export const tasksTable = sqliteTable( - "tasks", - { - id: integer("id").notNull().primaryKey({ autoIncrement: true }), - queue: text("queue").notNull(), - payload: text("payload").notNull(), - createdAt: createdAtField(), - status: text("status", { - enum: ["pending", "running", "pending_retry", "failed"], - }) - .notNull() - .default("pending"), - expireAt: integer("expireAt", { mode: "timestamp" }), - allocationId: text("allocationId").notNull(), - numRunsLeft: integer("numRunsLeft").notNull(), - maxNumRuns: integer("maxNumRuns").notNull(), - }, - (tasks) => ({ - queueIdx: index("tasks_queue_idx").on(tasks.queue), - statusIdx: index("tasks_status_idx").on(tasks.status), - expireAtIdx: index("tasks_expire_at_idx").on(tasks.expireAt), - numRunsLeftIdx: index("tasks_num_runs_left_idx").on(tasks.numRunsLeft), - maxNumRunsIdx: index("tasks_max_num_runs_idx").on(tasks.maxNumRuns), - allocationIdIdx: index("tasks_allocation_id_idx").on(tasks.allocationId), - }), -); - -export type Job = typeof tasksTable.$inferSelect; diff --git a/packages/queue/tsconfig.json b/packages/queue/tsconfig.json deleted file mode 100644 index 71bf61e7..00000000 --- a/packages/queue/tsconfig.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "$schema": "https://json.schemastore.org/tsconfig", - "extends": "@hoarder/tsconfig/node.json", - "include": ["**/*.ts"], - "exclude": ["node_modules"], - "compilerOptions": { - "tsBuildInfoFile": "node_modules/.cache/tsbuildinfo.json" - }, -} diff --git a/packages/queue/types.ts b/packages/queue/types.ts deleted file mode 100644 index 01975cc7..00000000 --- a/packages/queue/types.ts +++ /dev/null @@ -1,11 +0,0 @@ -export interface DequeuedJob<T> { - id: string; - data: T; - runNumber: number; -} - -export interface DequeuedJobError<T> { - id: string; - data?: T; - error: Error; -} diff --git a/packages/queue/vitest.config.ts b/packages/queue/vitest.config.ts deleted file mode 100644 index a206cfc4..00000000 --- a/packages/queue/vitest.config.ts +++ /dev/null @@ -1,13 +0,0 @@ -/// <reference types="vitest" /> - -import { defineConfig } from "vitest/config"; - -// https://vitejs.dev/config/ -export default defineConfig({ - plugins: [], - test: { - alias: { - "@/*": "./*", - }, - }, -}); |
