import * as restate from "@restatedev/restate-sdk";

import type {
  Queue,
  QueueOptions,
  RunnerOptions,
} from "@karakeep/shared/queueing";
import logger from "@karakeep/shared/logger";
import { tryCatch } from "@karakeep/shared/tryCatch";

import type { RunnerJobData, RunnerResult, SerializedError } from "./types";
import { runnerServiceName } from "./runner";
import { RestateSemaphore } from "./semaphore";
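
/**
 * Builds the Restate dispatcher service for a queue.
 *
 * The dispatcher owns the application-level lifecycle of a job: it acquires a
 * semaphore lease to bound concurrency, invokes the runner service, and then
 * either completes, retries (on RPC failure or runner error), or backs off and
 * retries without consuming an attempt (on rate limit).
 *
 * A minimal registration sketch (`myQueue`, `runnerOpts`, and `queueOpts` are
 * illustrative placeholders, not exports of this module):
 *
 *   const dispatcher = buildDispatcherService(myQueue, runnerOpts, queueOpts);
 *   restate.endpoint().bind(dispatcher).listen(9080);
 */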
export function buildDispatcherService<T, R>(
  queue: Queue<T>,
  opts: RunnerOptions<T>,
  queueOpts: QueueOptions,
) {
  const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries;
  const runnerName = runnerServiceName(queue.name());

  // Type definition for the runner service client.
  // Note: the ctx parameter is required for the Restate SDK to correctly
  // infer client method signatures.
  interface RunnerService {
    run: (
      ctx: restate.Context,
      data: RunnerJobData<T>,
    ) => Promise<RunnerResult<R>>;
    onCompleted: (
      ctx: restate.Context,
      data: { job: RunnerJobData<T>; result: R },
    ) => Promise<void>;
    onError: (
      ctx: restate.Context,
      data: { job: RunnerJobData<T>; error: SerializedError },
    ) => Promise<void>;
  }
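
  // Restate's retryPolicy below governs platform-level retries of a single
  // handler invocation; application-level job retries are driven explicitly
  // by the loop in the `run` handler.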
  return restate.service({
    name: queue.name(),
    options: {
      inactivityTimeout: {
        seconds: opts.timeoutSecs * 2,
      },
      retryPolicy: {
        maxAttempts: NUM_RETRIES,
        initialInterval: {
          seconds: 5,
        },
        maxInterval: {
          minutes: 1,
        },
      },
      journalRetention: {
        days: 3,
      },
    },
    handlers: {
      run: async (
        ctx: restate.Context,
        data: {
          payload: T;
          queuedIdempotencyKey?: string;
          priority?: number;
          groupId?: string;
        },
      ) => {
        const id = ctx.rand.uuidv4();
        const priority = data.priority ?? 0;

        const logDebug = async (message: string) => {
          await ctx.run(
            "log",
            async () => {
              logger.debug(message);
            },
            {
              maxRetryAttempts: 1,
            },
          );
        };
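
        // The lease TTL is 1.5x the job timeout (converted to ms), so a
        // crashed or hung run eventually returns its concurrency slot.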
        const semaphore = new RestateSemaphore(
          ctx,
          `queue:${queue.name()}`,
          opts.concurrency,
          Math.ceil(opts.timeoutSecs * 1.5 * 1000),
        );
        const runner = ctx.serviceClient<RunnerService>({ name: runnerName });
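
        // Each loop iteration is one attempt: RPC failures and runner errors
        // consume a retry (runNumber++), while rate limits sleep and retry
        // without counting against NUM_RETRIES.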
        let runNumber = 0;
        while (runNumber <= NUM_RETRIES) {
          await logDebug(
            `Dispatcher attempt ${runNumber} for queue ${queue.name()} job ${id} (priority=${priority}, groupId=${data.groupId ?? "none"})`,
          );
          const leaseId = await semaphore.acquire(
            priority,
            data.groupId,
            data.queuedIdempotencyKey,
          );
          if (!leaseId) {
            // The idempotency key already exists, so this job is a duplicate; skip it.
            await logDebug(
              `Dispatcher skipping queue ${queue.name()} job ${id} due to existing idempotency key`,
            );
            return;
          }
          await logDebug(
            `Dispatcher acquired lease ${leaseId} for queue ${queue.name()} job ${id}`,
          );

          const jobData: RunnerJobData<T> = {
            id,
            data: data.payload,
            priority,
            runNumber,
            numRetriesLeft: NUM_RETRIES - runNumber,
            timeoutSecs: opts.timeoutSecs,
          };

          // Call the runner service.
          const res = await tryCatch(runner.run(jobData));

          // Handle RPC-level errors (e.g., runner service unavailable).
          if (res.error) {
            await logDebug(
              `Dispatcher RPC error for queue ${queue.name()} job ${id}: ${res.error instanceof Error ? res.error.message : String(res.error)}`,
            );
            await semaphore.release(leaseId);
            if (res.error instanceof restate.CancelledError) {
              throw res.error;
            }
            // Retry after a short fixed delay.
            await ctx.sleep(1000, "rpc error retry");
            runNumber++;
            continue;
          }
          const result = res.data;

          if (result.type === "rate_limit") {
            // Rate limited: release the semaphore, sleep, and retry without
            // incrementing runNumber.
            await logDebug(
              `Dispatcher rate limit for queue ${queue.name()} job ${id} (delayMs=${result.delayMs})`,
            );
            await semaphore.release(leaseId);
            await ctx.sleep(result.delayMs, "rate limit retry");
            continue;
          }

          if (result.type === "error") {
            // Call onError on the runner BEFORE releasing the semaphore.
            // This ensures inFlight tracking stays consistent.
            await logDebug(
              `Dispatcher runner error for queue ${queue.name()} job ${id}: ${result.error.message}`,
            );
            await tryCatch(
              runner.onError({
                job: jobData,
                error: result.error,
              }),
            );
            await semaphore.release(leaseId);
            // Retry after a short fixed delay.
            await ctx.sleep(1000, "error retry");
            runNumber++;
            continue;
          }
          // Success: call onCompleted BEFORE releasing the semaphore.
          // This ensures inFlight tracking stays consistent.
          await logDebug(`Dispatcher completed queue ${queue.name()} job ${id}`);
          await tryCatch(
            runner.onCompleted({
              job: jobData,
              result: result.value,
            }),
          );
          await semaphore.release(leaseId);
          break;
        }
      },
    },
  });
}