aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins/queue-restate/src/index.ts
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2026-01-10 15:31:30 +0000
committerGitHub <noreply@github.com>2026-01-10 15:31:30 +0000
commitf48e98e16ae588ee5004531bf9a5aed757ed3786 (patch)
treefc3b9ca6f0512fef90124e45cbe59dd4c305d5e7 /packages/plugins/queue-restate/src/index.ts
parentaace8864d7eab5c858a92064b0ac59c122377830 (diff)
downloadkarakeep-f48e98e16ae588ee5004531bf9a5aed757ed3786.tar.zst
fix: harden the restate implementation (#2370)
* fix: parallelize queue enqueues in bookmark routes * fix: guard meilisearch client init with mutex * fix: fix propagation of last error in restate * fix: don't fail invocations when the job fails * fix: add a timeout around the worker runner logic * fix: add leases to handle dangling semaphores * feat: separate dispatchers and runners * add a test * fix silent promise failure
Diffstat (limited to 'packages/plugins/queue-restate/src/index.ts')
-rw-r--r--packages/plugins/queue-restate/src/index.ts39
1 files changed, 28 insertions, 11 deletions
diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts
index 07082955..64572b03 100644
--- a/packages/plugins/queue-restate/src/index.ts
+++ b/packages/plugins/queue-restate/src/index.ts
@@ -17,7 +17,7 @@ import { AdminClient } from "./admin";
import { envConfig } from "./env";
import { idProvider } from "./idProvider";
import { semaphore } from "./semaphore";
-import { buildRestateService } from "./service";
+import { buildRestateServices } from "./service";
class RestateQueueWrapper<T> implements Queue<T> {
constructor(
@@ -87,12 +87,13 @@ class RestateQueueWrapper<T> implements Queue<T> {
class RestateRunnerWrapper<T> implements Runner<T> {
constructor(
- private readonly wf: restate.ServiceDefinition<
+ private readonly dispatcherDef: restate.ServiceDefinition<
string,
{
run: (ctx: restate.Context, data: T) => Promise<void>;
}
>,
+ private readonly runnerDef: restate.ServiceDefinition<string, unknown>,
) {}
async run(): Promise<void> {
@@ -107,8 +108,12 @@ class RestateRunnerWrapper<T> implements Runner<T> {
throw new Error("Method not implemented.");
}
- get def(): restate.WorkflowDefinition<string, unknown> {
- return this.wf;
+ get dispatcherService(): restate.ServiceDefinition<string, unknown> {
+ return this.dispatcherDef;
+ }
+
+ get runnerService(): restate.ServiceDefinition<string, unknown> {
+ return this.runnerDef;
}
}
@@ -130,14 +135,24 @@ class RestateQueueClient implements QueueClient {
}
async start(): Promise<void> {
+ const servicesToExpose: restate.ServiceDefinition<string, unknown>[] = [];
+
+ for (const svc of this.services.values()) {
+ if (envConfig.RESTATE_ENABLE_DISPATCHERS) {
+ servicesToExpose.push(svc.dispatcherService);
+ }
+ if (envConfig.RESTATE_ENABLE_RUNNERS) {
+ servicesToExpose.push(svc.runnerService);
+ }
+ }
+
+ if (envConfig.RESTATE_EXPOSE_CORE_SERVICES) {
+ servicesToExpose.push(semaphore, idProvider);
+ }
+
const port = await restate.serve({
port: envConfig.RESTATE_LISTEN_PORT ?? 0,
- services: [
- ...[...this.services.values()].map((svc) => svc.def),
- ...(envConfig.RESTATE_EXPOSE_CORE_SERVICES
- ? [semaphore, idProvider]
- : []),
- ],
+ services: servicesToExpose,
identityKeys: envConfig.RESTATE_PUB_KEY
? [envConfig.RESTATE_PUB_KEY]
: undefined,
@@ -176,8 +191,10 @@ class RestateQueueClient implements QueueClient {
if (wrapper) {
throw new Error(`Queue ${name} already exists`);
}
+ const services = buildRestateServices(queue, funcs, opts, queue.opts);
const svc = new RestateRunnerWrapper<T>(
- buildRestateService(queue, funcs, opts, queue.opts),
+ services.dispatcher,
+ services.runner,
);
this.services.set(name, svc);
return svc;