aboutsummaryrefslogtreecommitdiffstats
path: root/packages/shared/queueing.ts
blob: ed6759dd9f063c5112ea0a3f1288c6a9e9d1096c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import { ZodType } from "zod";

import { PluginManager, PluginType } from "./plugins";

export interface EnqueueOptions {
  idempotencyKey?: string;
  priority?: number;
  delayMs?: number;
}

export interface QueueOptions {
  defaultJobArgs: {
    numRetries: number;
  };
  keepFailedJobs: boolean;
}

export interface DequeuedJob<T> {
  id: string;
  data: T;
  priority: number;
  runNumber: number;
  abortSignal: AbortSignal;
}

export interface DequeuedJobError<T> {
  id: string;
  data?: T;
  priority: number;
  error: Error;
  runNumber: number;
  numRetriesLeft: number;
}

export interface RunnerFuncs<T, R = void> {
  run: (job: DequeuedJob<T>) => Promise<R>;
  onComplete?: (job: DequeuedJob<T>, result: R) => Promise<void>;
  onError?: (job: DequeuedJobError<T>) => Promise<void>;
}

export interface RunnerOptions<T> {
  pollIntervalMs?: number;
  timeoutSecs: number;
  concurrency: number;
  validator?: ZodType<T>;
}

export interface Queue<T> {
  opts: QueueOptions;
  name(): string;
  enqueue(payload: T, options?: EnqueueOptions): Promise<string | undefined>;
  stats(): Promise<{
    pending: number;
    pending_retry: number;
    running: number;
    failed: number;
  }>;
  cancelAllNonRunning?(): Promise<number>;
}

export interface Runner<_T> {
  run(): Promise<void>;
  stop(): void;
  runUntilEmpty?(): Promise<void>;
}

export interface QueueClient {
  prepare(): Promise<void>;
  start(): Promise<void>;
  createQueue<T>(name: string, options: QueueOptions): Queue<T>;
  createRunner<T, R = void>(
    queue: Queue<T>,
    funcs: RunnerFuncs<T, R>,
    opts: RunnerOptions<T>,
  ): Runner<T>;
  shutdown?(): Promise<void>;
}

export async function getQueueClient(): Promise<QueueClient> {
  const client = await PluginManager.getClient(PluginType.Queue);
  if (!client) {
    throw new Error("Failed to get queue client");
  }
  return client;
}