aboutsummaryrefslogtreecommitdiffstats
path: root/packages/queue/runner.ts
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2024-10-27 23:37:54 +0000
committerMohamed Bassem <me@mbassem.com>2024-10-27 23:40:10 +0000
commita746e9a38e53e4a9114d786c7fb1b3ef4ca67870 (patch)
treebf1d819f6faca01bc7c3db1efe91c0879caaf4a8 /packages/queue/runner.ts
parenteb7da996a7c2d617d276f296cac07a6fd5648664 (diff)
downloadkarakeep-a746e9a38e53e4a9114d786c7fb1b3ef4ca67870.tar.zst
deps: Extract the queue implementation into its own repos
Diffstat (limited to 'packages/queue/runner.ts')
-rw-r--r--packages/queue/runner.ts115
1 files changed, 0 insertions, 115 deletions
diff --git a/packages/queue/runner.ts b/packages/queue/runner.ts
deleted file mode 100644
index 1a90f969..00000000
--- a/packages/queue/runner.ts
+++ /dev/null
@@ -1,115 +0,0 @@
-import assert from "node:assert";
-import { Semaphore } from "async-mutex";
-
-import { RunnerFuncs, RunnerOptions } from "./options";
-import { SqliteQueue } from "./queue";
-import { Job } from "./schema";
-import { DequeuedJob } from "./types";
-
-export class Runner<T> {
- queue: SqliteQueue<T>;
- funcs: RunnerFuncs<T>;
- opts: RunnerOptions<T>;
- stopping = false;
-
- constructor(
- queue: SqliteQueue<T>,
- funcs: RunnerFuncs<T>,
- opts: RunnerOptions<T>,
- ) {
- this.queue = queue;
- this.funcs = funcs;
- this.opts = opts;
- }
-
- async run() {
- return this.runImpl(false);
- }
-
- stop() {
- this.stopping = true;
- }
-
- async runUntilEmpty() {
- return this.runImpl(true);
- }
-
- async runImpl(breakOnEmpty: boolean) {
- const semaphore = new Semaphore(this.opts.concurrency);
- const inFlight = new Map<number, Promise<void>>();
- while (!this.stopping) {
- await semaphore.waitForUnlock();
- const job = await this.queue.attemptDequeue({
- timeoutSecs: this.opts.timeoutSecs,
- });
- if (!job && breakOnEmpty && inFlight.size == 0) {
- // No more jobs to process, and no ongoing jobs.
- break;
- }
- if (!job) {
- await new Promise((resolve) =>
- setTimeout(resolve, this.opts.pollIntervalMs),
- );
- continue;
- }
- const [_, release] = await semaphore.acquire();
- inFlight.set(
- job.id,
- this.runOnce(job).finally(() => {
- inFlight.delete(job.id);
- release();
- }),
- );
- }
- await Promise.allSettled(inFlight.values());
- }
-
- async runOnce(job: Job) {
- assert(job.allocationId);
-
- let parsed: T;
- try {
- parsed = JSON.parse(job.payload) as T;
- if (this.opts.validator) {
- parsed = this.opts.validator.parse(parsed);
- }
- } catch (e) {
- if (job.numRunsLeft <= 0) {
- await this.funcs.onError?.({
- id: job.id.toString(),
- error: e as Error,
- });
- await this.queue.finalize(job.id, job.allocationId, "failed");
- } else {
- await this.queue.finalize(job.id, job.allocationId, "pending_retry");
- }
- return;
- }
-
- const dequeuedJob: DequeuedJob<T> = {
- id: job.id.toString(),
- data: parsed,
- runNumber: job.maxNumRuns - job.numRunsLeft - 1,
- };
- try {
- await Promise.race([
- this.funcs.run(dequeuedJob),
- new Promise((_, reject) =>
- setTimeout(
- () => reject(new Error("Timeout")),
- this.opts.timeoutSecs * 1000,
- ),
- ),
- ]);
- await this.funcs.onComplete?.(dequeuedJob);
- await this.queue.finalize(job.id, job.allocationId, "completed");
- } catch (e) {
- if (job.numRunsLeft <= 0) {
- await this.funcs.onError?.({ ...dequeuedJob, error: e as Error });
- await this.queue.finalize(job.id, job.allocationId, "failed");
- } else {
- await this.queue.finalize(job.id, job.allocationId, "pending_retry");
- }
- }
- }
-}