aboutsummaryrefslogtreecommitdiffstats
path: root/packages/queue/runner.ts
blob: 1a90f969d212f7f00d1f091da1e017b6d14e99d7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import assert from "node:assert";
import { Semaphore } from "async-mutex";

import { RunnerFuncs, RunnerOptions } from "./options";
import { SqliteQueue } from "./queue";
import { Job } from "./schema";
import { DequeuedJob } from "./types";

/**
 * Pulls jobs from a {@link SqliteQueue} and executes them with the callbacks
 * supplied in `funcs`, running at most `opts.concurrency` jobs at a time.
 */
export class Runner<T> {
  queue: SqliteQueue<T>;
  funcs: RunnerFuncs<T>;
  opts: RunnerOptions<T>;
  /** Set by {@link stop}; checked at the top of every polling iteration. */
  stopping = false;

  /**
   * @param queue - Queue that jobs are dequeued from and finalized against.
   * @param funcs - Job callbacks: `run`, plus optional `onComplete` / `onError`.
   * @param opts - Runner settings read by this class: `concurrency`,
   *   `timeoutSecs`, `pollIntervalMs` and an optional `validator`.
   */
  constructor(
    queue: SqliteQueue<T>,
    funcs: RunnerFuncs<T>,
    opts: RunnerOptions<T>,
  ) {
    this.queue = queue;
    this.funcs = funcs;
    this.opts = opts;
  }

  /**
   * Processes jobs until {@link stop} is called, polling the queue whenever
   * it is empty.
   */
  async run() {
    return this.runImpl(false);
  }

  /**
   * Asks the polling loop to exit. The flag is only checked between
   * iterations; jobs that are already in flight are still awaited before the
   * promise returned by `run()` / `runUntilEmpty()` resolves.
   */
  stop() {
    this.stopping = true;
  }

  /**
   * Processes jobs until a dequeue attempt returns nothing while no job is in
   * flight, or until {@link stop} is called.
   */
  async runUntilEmpty() {
    return this.runImpl(true);
  }

  /**
   * Polling loop shared by {@link run} and {@link runUntilEmpty}.
   *
   * @param breakOnEmpty - When true, the loop exits as soon as the queue
   *   yields no job and nothing is currently running.
   */
  async runImpl(breakOnEmpty: boolean) {
    const semaphore = new Semaphore(this.opts.concurrency);
    const inFlight = new Map<number, Promise<void>>();
    while (!this.stopping) {
      // Wait for a free slot *before* dequeuing so that a job is never
      // allocated while there is no capacity to run it.
      await semaphore.waitForUnlock();
      const job = await this.queue.attemptDequeue({
        timeoutSecs: this.opts.timeoutSecs,
      });
      if (!job && breakOnEmpty && inFlight.size === 0) {
        // No more jobs to process, and no ongoing jobs.
        break;
      }
      if (!job) {
        // Nothing available right now: back off before polling again.
        await new Promise((resolve) =>
          setTimeout(resolve, this.opts.pollIntervalMs),
        );
        continue;
      }
      const [, release] = await semaphore.acquire();
      // Deliberately not awaited: the job runs in the background while the
      // loop goes on to fetch the next one.
      // NOTE(review): if runOnce rejects (e.g. `finalize` or `onError`
      // throws), the entry has already been removed from `inFlight` by the
      // time the rejection surfaces, so nothing observes it — confirm whether
      // this should be caught and reported here.
      inFlight.set(
        job.id,
        this.runOnce(job).finally(() => {
          inFlight.delete(job.id);
          release();
        }),
      );
    }
    // Let the jobs that are still running finish before returning.
    await Promise.allSettled(inFlight.values());
  }

  /**
   * Executes a single dequeued job and records its outcome in the queue.
   *
   * The payload is JSON-parsed (and passed through `opts.validator` when one
   * is configured), then `funcs.run` is raced against a `opts.timeoutSecs`
   * timer. On success `funcs.onComplete` is invoked and the job is finalized
   * as `"completed"`. On any failure (bad payload, `run`/`onComplete`
   * throwing, or timeout) the job is finalized as `"pending_retry"` while
   * `numRunsLeft` is positive; otherwise `funcs.onError` is invoked and the
   * job is finalized as `"failed"`.
   *
   * @param job - Row returned by the queue; must carry an `allocationId`.
   * @throws AssertionError when `job.allocationId` is missing.
   */
  async runOnce(job: Job) {
    assert(job.allocationId);

    let parsed: T;
    try {
      parsed = JSON.parse(job.payload) as T;
      if (this.opts.validator) {
        parsed = this.opts.validator.parse(parsed);
      }
    } catch (e) {
      if (job.numRunsLeft <= 0) {
        // The payload never became a valid T, so only the id and the error
        // are reported.
        await this.funcs.onError?.({
          id: job.id.toString(),
          error: e as Error,
        });
        await this.queue.finalize(job.id, job.allocationId, "failed");
      } else {
        await this.queue.finalize(job.id, job.allocationId, "pending_retry");
      }
      return;
    }

    const dequeuedJob: DequeuedJob<T> = {
      id: job.id.toString(),
      data: parsed,
      // Presumably the zero-based index of the current attempt — this relies
      // on how the queue maintains numRunsLeft; confirm against the queue.
      runNumber: job.maxNumRuns - job.numRunsLeft - 1,
    };

    let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
    try {
      try {
        await Promise.race([
          this.funcs.run(dequeuedJob),
          new Promise<never>((_, reject) => {
            timeoutHandle = setTimeout(
              () => reject(new Error("Timeout")),
              this.opts.timeoutSecs * 1000,
            );
          }),
        ]);
      } finally {
        // Disarm the timer once the race has settled. Otherwise every job
        // that finishes early leaves a pending timer behind that keeps the
        // event loop alive for the remainder of `timeoutSecs`.
        clearTimeout(timeoutHandle);
      }
      await this.funcs.onComplete?.(dequeuedJob);
      await this.queue.finalize(job.id, job.allocationId, "completed");
    } catch (e) {
      if (job.numRunsLeft <= 0) {
        await this.funcs.onError?.({ ...dequeuedJob, error: e as Error });
        await this.queue.finalize(job.id, job.allocationId, "failed");
      } else {
        await this.queue.finalize(job.id, job.allocationId, "pending_retry");
      }
    }
  }
}