aboutsummaryrefslogtreecommitdiffstats
path: root/packages/queue
diff options
context:
space:
mode:
Diffstat (limited to 'packages/queue')
-rw-r--r--packages/queue/db.ts19
-rw-r--r--packages/queue/drizzle.config.ts10
-rw-r--r--packages/queue/drizzle/0000_wonderful_talisman.sql18
-rw-r--r--packages/queue/drizzle/meta/0000_snapshot.json130
-rw-r--r--packages/queue/drizzle/meta/_journal.json13
-rw-r--r--packages/queue/index.ts6
-rw-r--r--packages/queue/options.ts22
-rw-r--r--packages/queue/package.json35
-rw-r--r--packages/queue/queue.ts146
-rw-r--r--packages/queue/runner.test.ts440
-rw-r--r--packages/queue/runner.ts115
-rw-r--r--packages/queue/schema.ts36
-rw-r--r--packages/queue/tsconfig.json9
-rw-r--r--packages/queue/types.ts11
-rw-r--r--packages/queue/vitest.config.ts13
15 files changed, 0 insertions, 1023 deletions
diff --git a/packages/queue/db.ts b/packages/queue/db.ts
deleted file mode 100644
index f1412fef..00000000
--- a/packages/queue/db.ts
+++ /dev/null
@@ -1,19 +0,0 @@
-import path from "node:path";
-import Database from "better-sqlite3";
-import { BetterSQLite3Database, drizzle } from "drizzle-orm/better-sqlite3";
-import { migrate } from "drizzle-orm/better-sqlite3/migrator";
-
-import * as schema from "./schema";
-
-export function buildDBClient(dbPath: string, runMigrations = false) {
- const sqlite = new Database(dbPath);
- const db = drizzle(sqlite, { schema });
- if (runMigrations) {
- migrateDB(db);
- }
- return db;
-}
-
-export function migrateDB(db: BetterSQLite3Database<typeof schema>) {
- migrate(db, { migrationsFolder: path.join(__dirname, "drizzle") });
-}
diff --git a/packages/queue/drizzle.config.ts b/packages/queue/drizzle.config.ts
deleted file mode 100644
index 6ef01d1b..00000000
--- a/packages/queue/drizzle.config.ts
+++ /dev/null
@@ -1,10 +0,0 @@
-import type { Config } from "drizzle-kit";
-
-export default {
- schema: "./schema.ts",
- out: "./drizzle",
- driver: "better-sqlite",
- dbCredentials: {
- url: "data.db",
- },
-} satisfies Config;
diff --git a/packages/queue/drizzle/0000_wonderful_talisman.sql b/packages/queue/drizzle/0000_wonderful_talisman.sql
deleted file mode 100644
index e042ab92..00000000
--- a/packages/queue/drizzle/0000_wonderful_talisman.sql
+++ /dev/null
@@ -1,18 +0,0 @@
-CREATE TABLE `tasks` (
- `id` integer PRIMARY KEY AUTOINCREMENT NOT NULL,
- `queue` text NOT NULL,
- `payload` text NOT NULL,
- `createdAt` integer NOT NULL,
- `status` text DEFAULT 'pending' NOT NULL,
- `expireAt` integer,
- `allocationId` text NOT NULL,
- `numRunsLeft` integer NOT NULL,
- `maxNumRuns` integer NOT NULL
-);
---> statement-breakpoint
-CREATE INDEX `tasks_queue_idx` ON `tasks` (`queue`);--> statement-breakpoint
-CREATE INDEX `tasks_status_idx` ON `tasks` (`status`);--> statement-breakpoint
-CREATE INDEX `tasks_expire_at_idx` ON `tasks` (`expireAt`);--> statement-breakpoint
-CREATE INDEX `tasks_num_runs_left_idx` ON `tasks` (`numRunsLeft`);--> statement-breakpoint
-CREATE INDEX `tasks_max_num_runs_idx` ON `tasks` (`maxNumRuns`);--> statement-breakpoint
-CREATE INDEX `tasks_allocation_id_idx` ON `tasks` (`allocationId`); \ No newline at end of file
diff --git a/packages/queue/drizzle/meta/0000_snapshot.json b/packages/queue/drizzle/meta/0000_snapshot.json
deleted file mode 100644
index 57c7c2f4..00000000
--- a/packages/queue/drizzle/meta/0000_snapshot.json
+++ /dev/null
@@ -1,130 +0,0 @@
-{
- "version": "5",
- "dialect": "sqlite",
- "id": "3094773c-0138-46b2-b617-4b10093b0f53",
- "prevId": "00000000-0000-0000-0000-000000000000",
- "tables": {
- "tasks": {
- "name": "tasks",
- "columns": {
- "id": {
- "name": "id",
- "type": "integer",
- "primaryKey": true,
- "notNull": true,
- "autoincrement": true
- },
- "queue": {
- "name": "queue",
- "type": "text",
- "primaryKey": false,
- "notNull": true,
- "autoincrement": false
- },
- "payload": {
- "name": "payload",
- "type": "text",
- "primaryKey": false,
- "notNull": true,
- "autoincrement": false
- },
- "createdAt": {
- "name": "createdAt",
- "type": "integer",
- "primaryKey": false,
- "notNull": true,
- "autoincrement": false
- },
- "status": {
- "name": "status",
- "type": "text",
- "primaryKey": false,
- "notNull": true,
- "autoincrement": false,
- "default": "'pending'"
- },
- "expireAt": {
- "name": "expireAt",
- "type": "integer",
- "primaryKey": false,
- "notNull": false,
- "autoincrement": false
- },
- "allocationId": {
- "name": "allocationId",
- "type": "text",
- "primaryKey": false,
- "notNull": true,
- "autoincrement": false
- },
- "numRunsLeft": {
- "name": "numRunsLeft",
- "type": "integer",
- "primaryKey": false,
- "notNull": true,
- "autoincrement": false
- },
- "maxNumRuns": {
- "name": "maxNumRuns",
- "type": "integer",
- "primaryKey": false,
- "notNull": true,
- "autoincrement": false
- }
- },
- "indexes": {
- "tasks_queue_idx": {
- "name": "tasks_queue_idx",
- "columns": [
- "queue"
- ],
- "isUnique": false
- },
- "tasks_status_idx": {
- "name": "tasks_status_idx",
- "columns": [
- "status"
- ],
- "isUnique": false
- },
- "tasks_expire_at_idx": {
- "name": "tasks_expire_at_idx",
- "columns": [
- "expireAt"
- ],
- "isUnique": false
- },
- "tasks_num_runs_left_idx": {
- "name": "tasks_num_runs_left_idx",
- "columns": [
- "numRunsLeft"
- ],
- "isUnique": false
- },
- "tasks_max_num_runs_idx": {
- "name": "tasks_max_num_runs_idx",
- "columns": [
- "maxNumRuns"
- ],
- "isUnique": false
- },
- "tasks_allocation_id_idx": {
- "name": "tasks_allocation_id_idx",
- "columns": [
- "allocationId"
- ],
- "isUnique": false
- }
- },
- "foreignKeys": {},
- "compositePrimaryKeys": {},
- "uniqueConstraints": {}
- }
- },
- "enums": {},
- "_meta": {
- "schemas": {},
- "tables": {},
- "columns": {}
- }
-} \ No newline at end of file
diff --git a/packages/queue/drizzle/meta/_journal.json b/packages/queue/drizzle/meta/_journal.json
deleted file mode 100644
index 2b14f895..00000000
--- a/packages/queue/drizzle/meta/_journal.json
+++ /dev/null
@@ -1,13 +0,0 @@
-{
- "version": "5",
- "dialect": "sqlite",
- "entries": [
- {
- "idx": 0,
- "version": "5",
- "when": 1720992922192,
- "tag": "0000_wonderful_talisman",
- "breakpoints": true
- }
- ]
-} \ No newline at end of file
diff --git a/packages/queue/index.ts b/packages/queue/index.ts
deleted file mode 100644
index c9144f29..00000000
--- a/packages/queue/index.ts
+++ /dev/null
@@ -1,6 +0,0 @@
-export { SqliteQueue } from "./queue";
-export { buildDBClient, migrateDB } from "./db";
-export type { SqliteQueueOptions, RunnerOptions, RunnerFuncs } from "./options";
-export { Runner } from "./runner";
-
-export type { DequeuedJob, DequeuedJobError } from "./types";
diff --git a/packages/queue/options.ts b/packages/queue/options.ts
deleted file mode 100644
index 18f8e52d..00000000
--- a/packages/queue/options.ts
+++ /dev/null
@@ -1,22 +0,0 @@
-import { ZodType } from "zod";
-
-import { DequeuedJob, DequeuedJobError } from "./types";
-
-export interface SqliteQueueOptions {
- defaultJobArgs: {
- numRetries: number;
- };
-}
-
-export interface RunnerFuncs<T> {
- run: (job: DequeuedJob<T>) => Promise<void>;
- onComplete?: (job: DequeuedJob<T>) => Promise<void>;
- onError?: (job: DequeuedJobError<T>) => Promise<void>;
-}
-
-export interface RunnerOptions<T> {
- pollIntervalMs: number;
- timeoutSecs: number;
- concurrency: number;
- validator?: ZodType<T>;
-}
diff --git a/packages/queue/package.json b/packages/queue/package.json
deleted file mode 100644
index 146a88b7..00000000
--- a/packages/queue/package.json
+++ /dev/null
@@ -1,35 +0,0 @@
-{
- "$schema": "https://json.schemastore.org/package.json",
- "name": "@hoarder/queue",
- "version": "0.1.0",
- "private": true,
- "type": "module",
- "dependencies": {
- "async-mutex": "^0.4.1",
- "better-sqlite3": "^11.3.0",
- "drizzle-orm": "^0.33.0",
- "zod": "^3.22.4"
- },
- "devDependencies": {
- "@hoarder/eslint-config": "workspace:^0.2.0",
- "@hoarder/prettier-config": "workspace:^0.1.0",
- "@hoarder/tsconfig": "workspace:^0.1.0",
- "@types/better-sqlite3": "^7.6.11",
- "drizzle-kit": "^0.20.14",
- "vitest": "^1.3.1"
- },
- "scripts": {
- "typecheck": "tsc --noEmit",
- "test": "vitest",
- "format": "prettier . --ignore-path ../../.prettierignore",
- "lint": "eslint ."
- },
- "main": "index.ts",
- "eslintConfig": {
- "root": true,
- "extends": [
- "@hoarder/eslint-config/base"
- ]
- },
- "prettier": "@hoarder/prettier-config"
-}
diff --git a/packages/queue/queue.ts b/packages/queue/queue.ts
deleted file mode 100644
index ad486468..00000000
--- a/packages/queue/queue.ts
+++ /dev/null
@@ -1,146 +0,0 @@
-import assert from "node:assert";
-import { and, asc, count, eq, gt, lt, or } from "drizzle-orm";
-
-import { buildDBClient } from "./db";
-import { SqliteQueueOptions } from "./options";
-import { Job, tasksTable } from "./schema";
-
-// generate random id
-function generateAllocationId() {
- return Math.random().toString(36).substring(2, 15);
-}
-
-export class SqliteQueue<T> {
- queueName: string;
- db: ReturnType<typeof buildDBClient>;
- options: SqliteQueueOptions;
-
- constructor(
- name: string,
- db: ReturnType<typeof buildDBClient>,
- options: SqliteQueueOptions,
- ) {
- this.queueName = name;
- this.options = options;
- this.db = db;
- }
-
- name() {
- return this.queueName;
- }
-
- async enqueue(payload: T): Promise<Job> {
- const job = await this.db
- .insert(tasksTable)
- .values({
- queue: this.queueName,
- payload: JSON.stringify(payload),
- numRunsLeft: this.options.defaultJobArgs.numRetries + 1,
- maxNumRuns: this.options.defaultJobArgs.numRetries + 1,
- allocationId: generateAllocationId(),
- })
- .returning();
-
- return job[0];
- }
-
- async stats() {
- const res = await this.db
- .select({ status: tasksTable.status, count: count() })
- .from(tasksTable)
- .where(eq(tasksTable.queue, this.queueName))
- .groupBy(tasksTable.status);
-
- return res.reduce(
- (acc, r) => {
- acc[r.status] += r.count;
- return acc;
- },
- {
- pending: 0,
- pending_retry: 0,
- running: 0,
- failed: 0,
- },
- );
- }
-
- async attemptDequeue(options: { timeoutSecs: number }): Promise<Job | null> {
- return await this.db.transaction(async (txn) => {
- const jobs = await txn
- .select()
- .from(tasksTable)
- .where(
- and(
- eq(tasksTable.queue, this.queueName),
- gt(tasksTable.numRunsLeft, 0),
- or(
- // Not picked by a worker yet
- eq(tasksTable.status, "pending"),
-
- // Failed but still has attempts left
- eq(tasksTable.status, "pending_retry"),
-
- // Expired and still has attempts left
- and(
- eq(tasksTable.status, "running"),
- lt(tasksTable.expireAt, new Date()),
- ),
- ),
- ),
- )
- .orderBy(asc(tasksTable.createdAt))
- .limit(1);
-
- if (jobs.length == 0) {
- return null;
- }
- assert(jobs.length == 1);
- const job = jobs[0];
-
- const result = await txn
- .update(tasksTable)
- .set({
- status: "running",
- numRunsLeft: job.numRunsLeft - 1,
- allocationId: generateAllocationId(),
- expireAt: new Date(new Date().getTime() + options.timeoutSecs * 1000),
- })
- .where(
- and(
- eq(tasksTable.id, job.id),
-
- // The compare and swap is necessary to avoid race conditions
- eq(tasksTable.allocationId, job.allocationId),
- ),
- )
- .returning();
- if (result.length == 0) {
- return null;
- }
- assert(result.length == 1);
- return result[0];
- });
- }
-
- async finalize(
- id: number,
- alloctionId: string,
- status: "completed" | "pending_retry" | "failed",
- ) {
- if (status == "completed") {
- await this.db
- .delete(tasksTable)
- .where(
- and(eq(tasksTable.id, id), eq(tasksTable.allocationId, alloctionId)),
- );
- } else {
- await this.db
- .update(tasksTable)
- .set({ status: status, expireAt: null })
- .where(
- and(eq(tasksTable.id, id), eq(tasksTable.allocationId, alloctionId)),
- );
- }
- }
-}
diff --git a/packages/queue/runner.test.ts b/packages/queue/runner.test.ts
deleted file mode 100644
index 7777b422..00000000
--- a/packages/queue/runner.test.ts
+++ /dev/null
@@ -1,440 +0,0 @@
-/* eslint-disable @typescript-eslint/require-await */
-import { Semaphore } from "async-mutex";
-import { eq } from "drizzle-orm";
-import { describe, expect, test } from "vitest";
-import { z } from "zod";
-
-import {
- buildDBClient,
- DequeuedJob,
- DequeuedJobError,
- Runner,
- RunnerOptions,
- SqliteQueue,
-} from "./";
-import { tasksTable } from "./schema";
-
-class Baton {
- semaphore: Semaphore;
- constructor() {
- this.semaphore = new Semaphore(0);
- this.reset();
- }
- post() {
- this.semaphore.setValue(100000);
- }
-
- async wait() {
- await this.semaphore.acquire();
- }
-
- reset() {
- this.semaphore.setValue(-Infinity);
- }
-}
-
-class Barrier {
- semaphore: Semaphore;
- baton: Baton;
- constructor(numParticipants: number) {
- this.semaphore = new Semaphore(numParticipants * -1 + 1);
- this.baton = new Baton();
- this.reset(numParticipants * -1 + 1);
- }
-
- async notifyReachedAndWait() {
- this.semaphore.release();
- await this.baton.wait();
- }
-
- async waitUntilAllReached() {
- await this.semaphore.waitForUnlock();
- }
-
- allowParticipantsToProceed() {
- this.baton.post();
- }
-
- reset(numParticipants: number) {
- this.semaphore.setValue(numParticipants);
- this.baton.reset();
- }
-}
-
-const defaultRunnerOpts = {
- pollIntervalMs: 100,
- timeoutSecs: 100,
- concurrency: 2,
- validator: z.object({
- increment: z.number(),
- succeedAfter: z.number().optional().default(0),
- blockForSec: z.number().optional().default(0),
- }),
-};
-
-interface Work {
- increment: number;
- succeedAfter?: number;
- blockForSec?: number;
-}
-
-interface Results {
- result: number;
- numCalled: number;
- numCompleted: number;
- numFailed: number;
-}
-
-async function waitUntilAllSettled(queue: SqliteQueue<Work>) {
- let stats = await queue.stats();
- while (stats.running > 0 || stats.pending > 0 || stats.pending_retry > 0) {
- await new Promise((resolve) => setTimeout(resolve, 100));
- stats = await queue.stats();
- console.log(stats);
- }
-}
-
-function buildRunner(
- queue: SqliteQueue<Work>,
- opts: RunnerOptions<Work>,
- barrier: Barrier,
- inputResults?: Results,
-) {
- const results = inputResults ?? {
- result: 0,
- numCalled: 0,
- numCompleted: 0,
- numFailed: 0,
- };
- const runner = new Runner<Work>(
- queue,
- {
- run: async (job: DequeuedJob<Work>) => {
- console.log("STARTED:", job);
- results.numCalled++;
- await barrier.notifyReachedAndWait();
- if (job.runNumber < (job.data.succeedAfter ?? 0)) {
- throw new Error("Failed");
- }
- if (job.data.blockForSec !== undefined) {
- await new Promise((resolve) =>
- setTimeout(resolve, job.data.blockForSec! * 1000),
- );
- }
- results.result += job.data.increment;
- },
- onComplete: async (job: DequeuedJob<Work>) => {
- console.log("COMPLETED:", job);
- results.numCompleted++;
- },
- onError: async (job: DequeuedJobError<Work>) => {
- console.log("FAILED:", job);
- results.numFailed++;
- },
- },
- opts,
- );
-
- return { runner, results };
-}
-
-describe("SqiteQueueRunner", () => {
- test("should run jobs with correct concurrency", async () => {
- const queue = new SqliteQueue<Work>(
- "queue1",
- buildDBClient(":memory:", true),
- {
- defaultJobArgs: {
- numRetries: 0,
- },
- },
- );
-
- const barrier = new Barrier(2);
- const { runner, results } = buildRunner(
- queue,
- { ...defaultRunnerOpts, concurrency: 2 },
- barrier,
- );
-
- await queue.enqueue({ increment: 1 });
- await queue.enqueue({ increment: 2 });
- await queue.enqueue({ increment: 3 });
-
- expect(await queue.stats()).toEqual({
- pending: 3,
- running: 0,
- pending_retry: 0,
- failed: 0,
- });
-
- const runnerPromise = runner.runUntilEmpty();
-
- // Wait until all runners reach the synchronization point
- await barrier.waitUntilAllReached();
-
- // Ensure that we have two "running" jobs given the concurrency of 2
- expect(await queue.stats()).toEqual({
- pending: 1,
- running: 2,
- pending_retry: 0,
- failed: 0,
- });
-
- // Allow jobs to proceed
- barrier.allowParticipantsToProceed();
-
- // Wait until all jobs are consumed
- await runnerPromise;
-
- expect(await queue.stats()).toEqual({
- pending: 0,
- running: 0,
- pending_retry: 0,
- failed: 0,
- });
-
- expect(results.result).toEqual(6);
- expect(results.numCalled).toEqual(3);
- expect(results.numCompleted).toEqual(3);
- expect(results.numFailed).toEqual(0);
- });
-
- test("should retry errors", async () => {
- const queue = new SqliteQueue<Work>(
- "queue1",
- buildDBClient(":memory:", true),
- {
- defaultJobArgs: {
- numRetries: 2,
- },
- },
- );
-
- const barrier = new Barrier(0);
- barrier.allowParticipantsToProceed();
- const { runner, results } = buildRunner(queue, defaultRunnerOpts, barrier);
-
- await queue.enqueue({ increment: 1, succeedAfter: 2 });
- await queue.enqueue({ increment: 1, succeedAfter: 10 });
- await queue.enqueue({ increment: 3, succeedAfter: 0 });
-
- const runnerPromise = runner.runUntilEmpty();
-
- // Wait until all jobs are consumed
- await runnerPromise;
-
- expect(await queue.stats()).toEqual({
- pending: 0,
- pending_retry: 0,
- running: 0,
- failed: 1,
- });
-
- expect(results.result).toEqual(4);
- expect(results.numCalled).toEqual(7);
- expect(results.numCompleted).toEqual(2);
- expect(results.numFailed).toEqual(1);
- });
-
- test("timeouts are respected", async () => {
- const queue = new SqliteQueue<Work>(
- "queue1",
- buildDBClient(":memory:", true),
- {
- defaultJobArgs: {
- numRetries: 1,
- },
- },
- );
-
- const barrier = new Barrier(1);
- barrier.allowParticipantsToProceed();
- const { runner: runner, results } = buildRunner(
- queue,
- { ...defaultRunnerOpts, concurrency: 1, timeoutSecs: 1 },
- barrier,
- );
-
- await queue.enqueue({ increment: 1, blockForSec: 10 });
- await runner.runUntilEmpty();
-
- expect(await queue.stats()).toEqual({
- pending: 0,
- pending_retry: 0,
- running: 0,
- failed: 1,
- });
-
- expect(results.result).toEqual(0);
- expect(results.numCalled).toEqual(2);
- expect(results.numCompleted).toEqual(0);
- expect(results.numFailed).toEqual(1);
- });
-
- test("serialization errors", async () => {
- const queue = new SqliteQueue<Work>(
- "queue1",
- buildDBClient(":memory:", true),
- {
- defaultJobArgs: {
- numRetries: 1,
- },
- },
- );
-
- const job = await queue.enqueue({ increment: 1 });
- // Corrupt the payload
- await queue.db
- .update(tasksTable)
- .set({ payload: "{}" })
- .where(eq(tasksTable.id, job.id));
-
- const barrier = new Barrier(1);
- barrier.allowParticipantsToProceed();
- const { runner, results } = buildRunner(
- queue,
- { ...defaultRunnerOpts, concurrency: 1 },
- barrier,
- );
-
- const p = runner.run();
- await waitUntilAllSettled(queue);
- runner.stop();
- await p;
-
- expect(await queue.stats()).toEqual({
- pending: 0,
- pending_retry: 0,
- running: 0,
- failed: 1,
- });
-
- expect(results.result).toEqual(0);
- expect(results.numCalled).toEqual(0);
- expect(results.numCompleted).toEqual(0);
- expect(results.numFailed).toEqual(1);
- });
-
- test("concurrent runners", async () => {
- const queue = new SqliteQueue<Work>(
- "queue1",
- buildDBClient(":memory:", true),
- {
- defaultJobArgs: {
- numRetries: 0,
- },
- },
- );
-
- await queue.enqueue({ increment: 1 });
- await queue.enqueue({ increment: 2 });
- await queue.enqueue({ increment: 3 });
-
- const barrier = new Barrier(3);
- const { runner: runner1, results } = buildRunner(
- queue,
- { ...defaultRunnerOpts, concurrency: 1 },
- barrier,
- );
- const { runner: runner2 } = buildRunner(
- queue,
- { ...defaultRunnerOpts, concurrency: 1 },
- barrier,
- results,
- );
- const { runner: runner3 } = buildRunner(
- queue,
- { ...defaultRunnerOpts, concurrency: 1 },
- barrier,
- results,
- );
-
- const runPromises = Promise.all([
- runner1.run(),
- runner2.run(),
- runner3.run(),
- ]);
-
- await barrier.waitUntilAllReached();
-
- expect(await queue.stats()).toEqual({
- pending: 0,
- pending_retry: 0,
- running: 3,
- failed: 0,
- });
-
- barrier.allowParticipantsToProceed();
-
- runner1.stop();
- runner2.stop();
- runner3.stop();
-
- await runPromises;
-
- expect(results.result).toEqual(6);
- expect(results.numCalled).toEqual(3);
- expect(results.numCompleted).toEqual(3);
- expect(results.numFailed).toEqual(0);
- });
-
- test("large test", async () => {
- const db = buildDBClient(":memory:", true);
- const queue1 = new SqliteQueue<Work>("queue1", db, {
- defaultJobArgs: {
- numRetries: 0,
- },
- });
- const queue2 = new SqliteQueue<Work>("queue2", db, {
- defaultJobArgs: {
- numRetries: 0,
- },
- });
-
- const barrier = new Barrier(0);
- barrier.allowParticipantsToProceed();
- const results = {
- result: 0,
- numCalled: 0,
- numCompleted: 0,
- numFailed: 0,
- };
- const runners = [];
- const runnerPromises = [];
-
- for (let i = 0; i < 10; i++) {
- const { runner } = buildRunner(
- i % 2 == 0 ? queue1 : queue2,
- { ...defaultRunnerOpts, concurrency: 2 },
- barrier,
- results,
- );
- runners.push(runner);
- runnerPromises.push(runner.run());
- }
-
- {
- const enqueuePromises = [];
- for (let i = 0; i < 1000; i++) {
- enqueuePromises.push(
- (i % 2 == 0 ? queue1 : queue2).enqueue({ increment: i }),
- );
- }
- await Promise.all(enqueuePromises);
- }
-
- await Promise.all([
- waitUntilAllSettled(queue1),
- waitUntilAllSettled(queue2),
- ]);
-
- runners.forEach((runner) => runner.stop());
- await Promise.all(runnerPromises);
-
- expect(results.result).toEqual(499500);
- expect(results.numCalled).toEqual(1000);
- expect(results.numCompleted).toEqual(1000);
- expect(results.numFailed).toEqual(0);
- });
-});
diff --git a/packages/queue/runner.ts b/packages/queue/runner.ts
deleted file mode 100644
index 1a90f969..00000000
--- a/packages/queue/runner.ts
+++ /dev/null
@@ -1,115 +0,0 @@
-import assert from "node:assert";
-import { Semaphore } from "async-mutex";
-
-import { RunnerFuncs, RunnerOptions } from "./options";
-import { SqliteQueue } from "./queue";
-import { Job } from "./schema";
-import { DequeuedJob } from "./types";
-
-export class Runner<T> {
- queue: SqliteQueue<T>;
- funcs: RunnerFuncs<T>;
- opts: RunnerOptions<T>;
- stopping = false;
-
- constructor(
- queue: SqliteQueue<T>,
- funcs: RunnerFuncs<T>,
- opts: RunnerOptions<T>,
- ) {
- this.queue = queue;
- this.funcs = funcs;
- this.opts = opts;
- }
-
- async run() {
- return this.runImpl(false);
- }
-
- stop() {
- this.stopping = true;
- }
-
- async runUntilEmpty() {
- return this.runImpl(true);
- }
-
- async runImpl(breakOnEmpty: boolean) {
- const semaphore = new Semaphore(this.opts.concurrency);
- const inFlight = new Map<number, Promise<void>>();
- while (!this.stopping) {
- await semaphore.waitForUnlock();
- const job = await this.queue.attemptDequeue({
- timeoutSecs: this.opts.timeoutSecs,
- });
- if (!job && breakOnEmpty && inFlight.size == 0) {
- // No more jobs to process, and no ongoing jobs.
- break;
- }
- if (!job) {
- await new Promise((resolve) =>
- setTimeout(resolve, this.opts.pollIntervalMs),
- );
- continue;
- }
- const [_, release] = await semaphore.acquire();
- inFlight.set(
- job.id,
- this.runOnce(job).finally(() => {
- inFlight.delete(job.id);
- release();
- }),
- );
- }
- await Promise.allSettled(inFlight.values());
- }
-
- async runOnce(job: Job) {
- assert(job.allocationId);
-
- let parsed: T;
- try {
- parsed = JSON.parse(job.payload) as T;
- if (this.opts.validator) {
- parsed = this.opts.validator.parse(parsed);
- }
- } catch (e) {
- if (job.numRunsLeft <= 0) {
- await this.funcs.onError?.({
- id: job.id.toString(),
- error: e as Error,
- });
- await this.queue.finalize(job.id, job.allocationId, "failed");
- } else {
- await this.queue.finalize(job.id, job.allocationId, "pending_retry");
- }
- return;
- }
-
- const dequeuedJob: DequeuedJob<T> = {
- id: job.id.toString(),
- data: parsed,
- runNumber: job.maxNumRuns - job.numRunsLeft - 1,
- };
- try {
- await Promise.race([
- this.funcs.run(dequeuedJob),
- new Promise((_, reject) =>
- setTimeout(
- () => reject(new Error("Timeout")),
- this.opts.timeoutSecs * 1000,
- ),
- ),
- ]);
- await this.funcs.onComplete?.(dequeuedJob);
- await this.queue.finalize(job.id, job.allocationId, "completed");
- } catch (e) {
- if (job.numRunsLeft <= 0) {
- await this.funcs.onError?.({ ...dequeuedJob, error: e as Error });
- await this.queue.finalize(job.id, job.allocationId, "failed");
- } else {
- await this.queue.finalize(job.id, job.allocationId, "pending_retry");
- }
- }
- }
-}
diff --git a/packages/queue/schema.ts b/packages/queue/schema.ts
deleted file mode 100644
index 377c6b1c..00000000
--- a/packages/queue/schema.ts
+++ /dev/null
@@ -1,36 +0,0 @@
-import { index, integer, sqliteTable, text } from "drizzle-orm/sqlite-core";
-
-function createdAtField() {
- return integer("createdAt", { mode: "timestamp" })
- .notNull()
- .$defaultFn(() => new Date());
-}
-
-export const tasksTable = sqliteTable(
- "tasks",
- {
- id: integer("id").notNull().primaryKey({ autoIncrement: true }),
- queue: text("queue").notNull(),
- payload: text("payload").notNull(),
- createdAt: createdAtField(),
- status: text("status", {
- enum: ["pending", "running", "pending_retry", "failed"],
- })
- .notNull()
- .default("pending"),
- expireAt: integer("expireAt", { mode: "timestamp" }),
- allocationId: text("allocationId").notNull(),
- numRunsLeft: integer("numRunsLeft").notNull(),
- maxNumRuns: integer("maxNumRuns").notNull(),
- },
- (tasks) => ({
- queueIdx: index("tasks_queue_idx").on(tasks.queue),
- statusIdx: index("tasks_status_idx").on(tasks.status),
- expireAtIdx: index("tasks_expire_at_idx").on(tasks.expireAt),
- numRunsLeftIdx: index("tasks_num_runs_left_idx").on(tasks.numRunsLeft),
- maxNumRunsIdx: index("tasks_max_num_runs_idx").on(tasks.maxNumRuns),
- allocationIdIdx: index("tasks_allocation_id_idx").on(tasks.allocationId),
- }),
-);
-
-export type Job = typeof tasksTable.$inferSelect;
diff --git a/packages/queue/tsconfig.json b/packages/queue/tsconfig.json
deleted file mode 100644
index 71bf61e7..00000000
--- a/packages/queue/tsconfig.json
+++ /dev/null
@@ -1,9 +0,0 @@
-{
- "$schema": "https://json.schemastore.org/tsconfig",
- "extends": "@hoarder/tsconfig/node.json",
- "include": ["**/*.ts"],
- "exclude": ["node_modules"],
- "compilerOptions": {
- "tsBuildInfoFile": "node_modules/.cache/tsbuildinfo.json"
- },
-}
diff --git a/packages/queue/types.ts b/packages/queue/types.ts
deleted file mode 100644
index 01975cc7..00000000
--- a/packages/queue/types.ts
+++ /dev/null
@@ -1,11 +0,0 @@
-export interface DequeuedJob<T> {
- id: string;
- data: T;
- runNumber: number;
-}
-
-export interface DequeuedJobError<T> {
- id: string;
- data?: T;
- error: Error;
-}
diff --git a/packages/queue/vitest.config.ts b/packages/queue/vitest.config.ts
deleted file mode 100644
index a206cfc4..00000000
--- a/packages/queue/vitest.config.ts
+++ /dev/null
@@ -1,13 +0,0 @@
-/// <reference types="vitest" />
-
-import { defineConfig } from "vitest/config";
-
-// https://vitejs.dev/config/
-export default defineConfig({
- plugins: [],
- test: {
- alias: {
- "@/*": "./*",
- },
- },
-});