diff options
| author | MohamedBassem <me@mbassem.com> | 2024-07-14 20:42:06 +0000 |
|---|---|---|
| committer | MohamedBassem <me@mbassem.com> | 2024-07-14 22:25:23 +0000 |
| commit | f77a41a75237b8c816e0c9ca7217dfacc32cc7d0 (patch) | |
| tree | 69557b2001183cd915a81e0ea9d276e10d262147 /packages/queue/queue.ts | |
| parent | aa3dce09ff68c212ac1fad33adfbfaba96290a59 (diff) | |
| download | karakeep-f77a41a75237b8c816e0c9ca7217dfacc32cc7d0.tar.zst | |
chore: Add a new sqlite based queue package
Diffstat (limited to 'packages/queue/queue.ts')
| -rw-r--r-- | packages/queue/queue.ts | 146 |
1 files changed, 146 insertions, 0 deletions
diff --git a/packages/queue/queue.ts b/packages/queue/queue.ts new file mode 100644 index 00000000..ad486468 --- /dev/null +++ b/packages/queue/queue.ts @@ -0,0 +1,146 @@ +import assert from "node:assert"; +import { and, asc, count, eq, gt, lt, or } from "drizzle-orm"; + +import { buildDBClient } from "./db"; +import { SqliteQueueOptions } from "./options"; +import { Job, tasksTable } from "./schema"; + +// generate random id +function generateAllocationId() { + return Math.random().toString(36).substring(2, 15); +} + +export class SqliteQueue<T> { + queueName: string; + db: ReturnType<typeof buildDBClient>; + options: SqliteQueueOptions; + + constructor( + name: string, + db: ReturnType<typeof buildDBClient>, + options: SqliteQueueOptions, + ) { + this.queueName = name; + this.options = options; + this.db = db; + } + + name() { + return this.queueName; + } + + async enqueue(payload: T): Promise<Job> { + const job = await this.db + .insert(tasksTable) + .values({ + queue: this.queueName, + payload: JSON.stringify(payload), + numRunsLeft: this.options.defaultJobArgs.numRetries + 1, + maxNumRuns: this.options.defaultJobArgs.numRetries + 1, + allocationId: generateAllocationId(), + }) + .returning(); + + return job[0]; + } + + async stats() { + const res = await this.db + .select({ status: tasksTable.status, count: count() }) + .from(tasksTable) + .where(eq(tasksTable.queue, this.queueName)) + .groupBy(tasksTable.status); + + return res.reduce( + (acc, r) => { + acc[r.status] += r.count; + return acc; + }, + { + pending: 0, + pending_retry: 0, + running: 0, + failed: 0, + }, + ); + } + + async attemptDequeue(options: { timeoutSecs: number }): Promise<Job | null> { + return await this.db.transaction(async (txn) => { + const jobs = await txn + .select() + .from(tasksTable) + .where( + and( + eq(tasksTable.queue, this.queueName), + gt(tasksTable.numRunsLeft, 0), + or( + // Not picked by a worker yet + eq(tasksTable.status, "pending"), + + // Failed but still has attempts left + eq(tasksTable.status, "pending_retry"), + + // Expired and still has attempts left + and( + eq(tasksTable.status, "running"), + lt(tasksTable.expireAt, new Date()), + ), + ), + ), + ) + .orderBy(asc(tasksTable.createdAt)) + .limit(1); + + if (jobs.length == 0) { + return null; + } + assert(jobs.length == 1); + const job = jobs[0]; + + const result = await txn + .update(tasksTable) + .set({ + status: "running", + numRunsLeft: job.numRunsLeft - 1, + allocationId: generateAllocationId(), + expireAt: new Date(new Date().getTime() + options.timeoutSecs * 1000), + }) + .where( + and( + eq(tasksTable.id, job.id), + + // The compare and swap is necessary to avoid race conditions + eq(tasksTable.allocationId, job.allocationId), + ), + ) + .returning(); + if (result.length == 0) { + return null; + } + assert(result.length == 1); + return result[0]; + }); + } + + async finalize( + id: number, + alloctionId: string, + status: "completed" | "pending_retry" | "failed", + ) { + if (status == "completed") { + await this.db + .delete(tasksTable) + .where( + and(eq(tasksTable.id, id), eq(tasksTable.allocationId, alloctionId)), + ); + } else { + await this.db + .update(tasksTable) + .set({ status: status, expireAt: null }) + .where( + and(eq(tasksTable.id, id), eq(tasksTable.allocationId, alloctionId)), + ); + } + } +} |
