aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-10-05 07:04:29 +0100
committerGitHub <noreply@github.com>2025-10-05 07:04:29 +0100
commit74a1f7b6b600d4cb53352dde7def374c3125721a (patch)
tree70b79ebae61456f6ff2cb02a37351fa9817fb342
parent4a580d713621f99abb8baabc9b847ce039d44842 (diff)
downloadkarakeep-74a1f7b6b600d4cb53352dde7def374c3125721a.tar.zst
feat: Restate-based queue plugin (#2011)
* WIP: Initial restate integration * add retry * add delay + idempotency * implement concurrency limits * add admin stats * add todos * add id provider * handle onComplete failures * add tests * add pub key and fix logging * add priorities * fail call after retries * more fixes * fix retries left * some refactoring * fix package.json * upgrade sdk * some test cleanups
-rw-r--r--apps/workers/index.ts10
-rw-r--r--packages/plugins-queue-liteque/package.json5
-rw-r--r--packages/plugins-queue-liteque/src/index.ts9
-rw-r--r--packages/plugins-queue-restate/.oxlintrc.json19
-rw-r--r--packages/plugins-queue-restate/index.ts12
-rw-r--r--packages/plugins-queue-restate/package.json27
-rw-r--r--packages/plugins-queue-restate/src/admin.ts75
-rw-r--r--packages/plugins-queue-restate/src/env.ts13
-rw-r--r--packages/plugins-queue-restate/src/idProvider.ts18
-rw-r--r--packages/plugins-queue-restate/src/index.ts201
-rw-r--r--packages/plugins-queue-restate/src/semaphore.ts105
-rw-r--r--packages/plugins-queue-restate/src/service.ts133
-rw-r--r--packages/plugins-queue-restate/src/tests/docker-compose.yml8
-rw-r--r--packages/plugins-queue-restate/src/tests/queue.test.ts221
-rw-r--r--packages/plugins-queue-restate/src/tests/setup/startContainers.ts90
-rw-r--r--packages/plugins-queue-restate/src/tests/utils.ts23
-rw-r--r--packages/plugins-queue-restate/tsconfig.json10
-rw-r--r--packages/plugins-queue-restate/vitest.config.ts14
-rw-r--r--packages/plugins-search-meilisearch/package.json4
-rw-r--r--packages/shared-server/package.json1
-rw-r--r--packages/shared-server/src/plugins.ts1
-rw-r--r--packages/shared-server/src/queues.ts8
-rw-r--r--packages/shared/queueing.ts5
-rw-r--r--pnpm-lock.yaml50
24 files changed, 1049 insertions, 13 deletions
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index 578ff6c8..c0270f0d 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -2,7 +2,11 @@ import "dotenv/config";
import { buildServer } from "server";
-import { loadAllPlugins, runQueueDBMigrations } from "@karakeep/shared-server";
+import {
+ loadAllPlugins,
+ prepareQueue,
+ startQueue,
+} from "@karakeep/shared-server";
import serverConfig from "@karakeep/shared/config";
import logger from "@karakeep/shared/logger";
@@ -46,7 +50,7 @@ function isWorkerEnabled(name: WorkerName) {
async function main() {
await loadAllPlugins();
logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
- runQueueDBMigrations();
+ await prepareQueue();
const httpServer = buildServer();
@@ -59,6 +63,8 @@ async function main() {
})),
);
+ await startQueue();
+
if (workers.some((w) => w.name === "feed")) {
FeedRefreshingWorker.start();
}
diff --git a/packages/plugins-queue-liteque/package.json b/packages/plugins-queue-liteque/package.json
index a31c9707..bb4b1aac 100644
--- a/packages/plugins-queue-liteque/package.json
+++ b/packages/plugins-queue-liteque/package.json
@@ -6,8 +6,8 @@
"type": "module",
"scripts": {
"typecheck": "tsc --noEmit",
- "format": "prettier . --ignore-path ../../.prettierignore",
- "format:fix": "prettier . --write --ignore-path ../../.prettierignore",
+ "format": "prettier . --cache --ignore-path ../../.prettierignore --check",
+ "format:fix": "prettier . --cache --ignore-path ../../.prettierignore --write",
"lint": "oxlint .",
"lint:fix": "oxlint . --fix",
"test": "vitest"
@@ -24,4 +24,3 @@
},
"prettier": "@karakeep/prettier-config"
}
-
diff --git a/packages/plugins-queue-liteque/src/index.ts b/packages/plugins-queue-liteque/src/index.ts
index 16e6e20a..ddc2181c 100644
--- a/packages/plugins-queue-liteque/src/index.ts
+++ b/packages/plugins-queue-liteque/src/index.ts
@@ -24,6 +24,7 @@ class LitequeQueueWrapper<T> implements Queue<T> {
constructor(
private readonly _name: string,
private readonly lq: LQ<T>,
+ public readonly opts: QueueOptions,
) {}
name(): string {
@@ -60,10 +61,14 @@ class LitequeQueueClient implements QueueClient {
private queues = new Map<string, LitequeQueueWrapper<unknown>>();
- async init(): Promise<void> {
+ async prepare(): Promise<void> {
migrateDB(this.db);
}
+ async start(): Promise<void> {
+ // No-op for sqlite
+ }
+
createQueue<T>(name: string, options: QueueOptions): Queue<T> {
if (this.queues.has(name)) {
throw new Error(`Queue ${name} already exists`);
@@ -72,7 +77,7 @@ class LitequeQueueClient implements QueueClient {
defaultJobArgs: { numRetries: options.defaultJobArgs.numRetries },
keepFailedJobs: options.keepFailedJobs,
});
- const wrapper = new LitequeQueueWrapper<T>(name, lq);
+ const wrapper = new LitequeQueueWrapper<T>(name, lq, options);
this.queues.set(name, wrapper);
return wrapper;
}
diff --git a/packages/plugins-queue-restate/.oxlintrc.json b/packages/plugins-queue-restate/.oxlintrc.json
new file mode 100644
index 00000000..79ba0255
--- /dev/null
+++ b/packages/plugins-queue-restate/.oxlintrc.json
@@ -0,0 +1,19 @@
+{
+ "$schema": "../../node_modules/oxlint/configuration_schema.json",
+ "extends": [
+ "../../tooling/oxlint/oxlint-base.json"
+ ],
+ "env": {
+ "builtin": true,
+ "commonjs": true
+ },
+ "ignorePatterns": [
+ "**/*.config.js",
+ "**/*.config.cjs",
+ "**/.eslintrc.cjs",
+ "**/.next",
+ "**/dist",
+ "**/build",
+ "**/pnpm-lock.yaml"
+ ]
+}
diff --git a/packages/plugins-queue-restate/index.ts b/packages/plugins-queue-restate/index.ts
new file mode 100644
index 00000000..d313615c
--- /dev/null
+++ b/packages/plugins-queue-restate/index.ts
@@ -0,0 +1,12 @@
+// Auto-register the Restate queue provider when this package is imported
+import { PluginManager, PluginType } from "@karakeep/shared/plugins";
+
+import { RestateQueueProvider } from "./src";
+
+if (RestateQueueProvider.isConfigured()) {
+ PluginManager.register({
+ type: PluginType.Queue,
+ name: "Restate",
+ provider: new RestateQueueProvider(),
+ });
+}
diff --git a/packages/plugins-queue-restate/package.json b/packages/plugins-queue-restate/package.json
new file mode 100644
index 00000000..16681150
--- /dev/null
+++ b/packages/plugins-queue-restate/package.json
@@ -0,0 +1,27 @@
+{
+ "$schema": "https://json.schemastore.org/package.json",
+ "name": "@karakeep/plugins-queue-restate",
+ "version": "0.1.0",
+ "private": true,
+ "type": "module",
+ "scripts": {
+ "typecheck": "tsc --noEmit",
+ "format": "prettier . --cache --ignore-path ../../.prettierignore --check",
+ "format:fix": "prettier . --cache --ignore-path ../../.prettierignore --write",
+ "lint": "oxlint .",
+ "lint:fix": "oxlint . --fix",
+ "test": "vitest"
+ },
+ "dependencies": {
+ "@karakeep/shared": "workspace:*",
+ "@restatedev/restate-sdk": "^1.9.0",
+ "@restatedev/restate-sdk-clients": "^1.9.0"
+ },
+ "devDependencies": {
+ "@karakeep/prettier-config": "workspace:^0.1.0",
+ "@karakeep/tsconfig": "workspace:^0.1.0",
+ "vite-tsconfig-paths": "^4.3.1",
+ "vitest": "^3.2.4"
+ },
+ "prettier": "@karakeep/prettier-config"
+}
diff --git a/packages/plugins-queue-restate/src/admin.ts b/packages/plugins-queue-restate/src/admin.ts
new file mode 100644
index 00000000..dddc8f00
--- /dev/null
+++ b/packages/plugins-queue-restate/src/admin.ts
@@ -0,0 +1,75 @@
+import { z } from "zod";
+
+export class AdminClient {
+ constructor(private addr: string) {}
+
+ async upsertDeployment(deploymentAddr: string) {
+ const res = await fetch(`${this.addr}/deployments`, {
+ method: "POST",
+ body: JSON.stringify({
+ uri: deploymentAddr,
+ force: true,
+ }),
+ headers: {
+ "Content-Type": "application/json",
+ },
+ });
+
+ if (!res.ok) {
+ throw new Error(`Failed to upsert deployment: ${res.status}`);
+ }
+ }
+
+ async getStats(serviceName: string) {
+ const query = `select status, count(*) as count from sys_invocation where target_service_name='${serviceName}' group by status`;
+ const res = await fetch(`${this.addr}/query`, {
+ method: "POST",
+ body: JSON.stringify({
+ query,
+ }),
+ headers: {
+ "Content-Type": "application/json",
+ Accept: "application/json",
+ },
+ });
+
+ if (!res.ok) {
+ throw new Error(`Failed to get stats: ${res.status}`);
+ }
+ const zStatus = z.enum([
+ "pending",
+ "scheduled",
+ "ready",
+ "running",
+ "paused",
+ "backing-off",
+ "suspended",
+ "completed",
+ ]);
+ const zSchema = z.object({
+ rows: z.array(
+ z.object({
+ status: zStatus,
+ count: z.number(),
+ }),
+ ),
+ });
+
+ return zSchema.parse(await res.json()).rows.reduce(
+ (acc, cur) => {
+ acc[cur.status] = cur.count;
+ return acc;
+ },
+ {
+ pending: 0,
+ scheduled: 0,
+ ready: 0,
+ running: 0,
+ paused: 0,
+ "backing-off": 0,
+ suspended: 0,
+ completed: 0,
+ } as Record<z.infer<typeof zStatus>, number>,
+ );
+ }
+}
diff --git a/packages/plugins-queue-restate/src/env.ts b/packages/plugins-queue-restate/src/env.ts
new file mode 100644
index 00000000..01175e86
--- /dev/null
+++ b/packages/plugins-queue-restate/src/env.ts
@@ -0,0 +1,13 @@
+import { z } from "zod";
+
+export const envConfig = z
+ .object({
+ RESTATE_LISTEN_PORT: z.coerce.number().optional(),
+ RESTATE_INGRESS_ADDR: z
+ .string()
+ .optional()
+ .default("http://localhost:8080"),
+ RESTATE_ADMIN_ADDR: z.string().optional().default("http://localhost:9070"),
+ RESTATE_PUB_KEY: z.string().optional(),
+ })
+ .parse(process.env);
diff --git a/packages/plugins-queue-restate/src/idProvider.ts b/packages/plugins-queue-restate/src/idProvider.ts
new file mode 100644
index 00000000..72ebc860
--- /dev/null
+++ b/packages/plugins-queue-restate/src/idProvider.ts
@@ -0,0 +1,18 @@
+import { Context, object, ObjectContext } from "@restatedev/restate-sdk";
+
+export const idProvider = object({
+ name: "IdProvider",
+ handlers: {
+ get: async (ctx: ObjectContext<{ nextId: number }>): Promise<number> => {
+ const state = (await ctx.get("nextId")) ?? 0;
+ ctx.set("nextId", state + 1);
+ return state;
+ },
+ },
+});
+
+export async function genId(ctx: Context) {
+ return ctx
+ .objectClient<typeof idProvider>({ name: "IdProvider" }, "IdProvider")
+ .get();
+}
diff --git a/packages/plugins-queue-restate/src/index.ts b/packages/plugins-queue-restate/src/index.ts
new file mode 100644
index 00000000..bedc26af
--- /dev/null
+++ b/packages/plugins-queue-restate/src/index.ts
@@ -0,0 +1,201 @@
+import * as restate from "@restatedev/restate-sdk";
+import * as restateClient from "@restatedev/restate-sdk-clients";
+
+import type { PluginProvider } from "@karakeep/shared/plugins";
+import type {
+ EnqueueOptions,
+ Queue,
+ QueueClient,
+ QueueOptions,
+ Runner,
+ RunnerFuncs,
+ RunnerOptions,
+} from "@karakeep/shared/queueing";
+import logger from "@karakeep/shared/logger";
+
+import { AdminClient } from "./admin";
+import { envConfig } from "./env";
+import { idProvider } from "./idProvider";
+import { semaphore } from "./semaphore";
+import { buildRestateService } from "./service";
+
+class RestateQueueWrapper<T> implements Queue<T> {
+ constructor(
+ private readonly _name: string,
+ private readonly client: restateClient.Ingress,
+ private readonly adminClient: AdminClient,
+ public readonly opts: QueueOptions,
+ ) {}
+
+ name(): string {
+ return this._name;
+ }
+
+ async enqueue(
+ payload: T,
+ options?: EnqueueOptions,
+ ): Promise<string | undefined> {
+ interface MyService {
+ run: (
+ ctx: restate.Context,
+ data: {
+ payload: T;
+ priority: number;
+ },
+ ) => Promise<void>;
+ }
+ const cl = this.client.serviceSendClient<MyService>({ name: this.name() });
+ const res = await cl.run(
+ {
+ payload,
+ priority: options?.priority ?? 0,
+ },
+ restateClient.rpc.sendOpts({
+ delay: options?.delayMs
+ ? {
+ milliseconds: options.delayMs,
+ }
+ : undefined,
+ idempotencyKey: options?.idempotencyKey,
+ }),
+ );
+ return res.invocationId;
+ }
+
+ async stats(): Promise<{
+ pending: number;
+ pending_retry: number;
+ running: number;
+ failed: number;
+ }> {
+ const res = await this.adminClient.getStats(this.name());
+ return {
+ pending: res.pending + res.ready,
+ pending_retry: res["backing-off"] + res.paused + res.suspended,
+ running: res.running,
+ failed: 0,
+ };
+ }
+
+ async cancelAllNonRunning(): Promise<number> {
+ throw new Error("Method not implemented.");
+ }
+}
+
+class RestateRunnerWrapper<T> implements Runner<T> {
+ constructor(
+ private readonly wf: restate.ServiceDefinition<
+ string,
+ {
+ run: (ctx: restate.Context, data: T) => Promise<void>;
+ }
+ >,
+ ) {}
+
+ async run(): Promise<void> {
+ // No-op for restate
+ }
+
+ async stop(): Promise<void> {
+ // No-op for restate
+ }
+
+ async runUntilEmpty(): Promise<void> {
+ throw new Error("Method not implemented.");
+ }
+
+ get def(): restate.WorkflowDefinition<string, unknown> {
+ return this.wf;
+ }
+}
+
+class RestateQueueClient implements QueueClient {
+ private client: restateClient.Ingress;
+ private adminClient: AdminClient;
+ private queues = new Map<string, RestateQueueWrapper<unknown>>();
+ private services = new Map<string, RestateRunnerWrapper<unknown>>();
+
+ constructor() {
+ this.client = restateClient.connect({
+ url: envConfig.RESTATE_INGRESS_ADDR,
+ });
+ this.adminClient = new AdminClient(envConfig.RESTATE_ADMIN_ADDR);
+ }
+
+ async prepare(): Promise<void> {
+ // No-op for restate
+ }
+
+ async start(): Promise<void> {
+ const port = await restate.serve({
+ port: envConfig.RESTATE_LISTEN_PORT ?? 0,
+ services: [
+ ...[...this.services.values()].map((svc) => svc.def),
+ semaphore,
+ idProvider,
+ ],
+ identityKeys: envConfig.RESTATE_PUB_KEY
+ ? [envConfig.RESTATE_PUB_KEY]
+ : undefined,
+ logger: (meta, msg) => {
+ if (meta.context) {
+ // No need to log invocation logs
+ } else {
+ logger.log(meta.level, `[restate] ${msg}`);
+ }
+ },
+ });
+ logger.info(`Restate listening on port ${port}`);
+ }
+
+ createQueue<T>(name: string, opts: QueueOptions): Queue<T> {
+ if (this.queues.has(name)) {
+ throw new Error(`Queue ${name} already exists`);
+ }
+ const wrapper = new RestateQueueWrapper<T>(
+ name,
+ this.client,
+ this.adminClient,
+ opts,
+ );
+ this.queues.set(name, wrapper);
+ return wrapper;
+ }
+
+ createRunner<T>(
+ queue: Queue<T>,
+ funcs: RunnerFuncs<T>,
+ opts: RunnerOptions<T>,
+ ): Runner<T> {
+ const name = queue.name();
+ let wrapper = this.services.get(name);
+ if (wrapper) {
+ throw new Error(`Queue ${name} already exists`);
+ }
+ const svc = new RestateRunnerWrapper<T>(
+ buildRestateService(queue, funcs, opts, queue.opts),
+ );
+ this.services.set(name, svc);
+ return svc;
+ }
+
+ async shutdown(): Promise<void> {
+ // No-op for sqlite
+ }
+}
+
+export class RestateQueueProvider implements PluginProvider<QueueClient> {
+ private client: QueueClient | null = null;
+
+ static isConfigured(): boolean {
+ return envConfig.RESTATE_LISTEN_PORT !== undefined;
+ }
+
+ async getClient(): Promise<QueueClient | null> {
+ if (!this.client) {
+ const client = new RestateQueueClient();
+ this.client = client;
+ }
+ return this.client;
+ }
+}
diff --git a/packages/plugins-queue-restate/src/semaphore.ts b/packages/plugins-queue-restate/src/semaphore.ts
new file mode 100644
index 00000000..253dbe33
--- /dev/null
+++ b/packages/plugins-queue-restate/src/semaphore.ts
@@ -0,0 +1,105 @@
+// Inspired from https://github.com/restatedev/examples/blob/main/typescript/patterns-use-cases/src/priorityqueue/queue.ts
+
+import { Context, object, ObjectContext } from "@restatedev/restate-sdk";
+
+interface QueueItem {
+ awakeable: string;
+ priority: number;
+}
+
+interface QueueState {
+ items: QueueItem[];
+ inFlight: number;
+}
+
+export const semaphore = object({
+ name: "Semaphore",
+ handlers: {
+ acquire: async (
+ ctx: ObjectContext<QueueState>,
+ req: { awakeableId: string; priority: number; capacity: number },
+ ): Promise<void> => {
+ const state = await getState(ctx);
+
+ state.items.push({
+ awakeable: req.awakeableId,
+ priority: req.priority,
+ });
+
+ tick(ctx, state, req.capacity);
+
+ setState(ctx, state);
+ },
+
+ release: async (
+ ctx: ObjectContext<QueueState>,
+ capacity: number,
+ ): Promise<void> => {
+ const state = await getState(ctx);
+ state.inFlight--;
+ tick(ctx, state, capacity);
+ setState(ctx, state);
+ },
+ },
+});
+
+function selectAndPopItem(items: QueueItem[]): QueueItem {
+ let highest = { priority: Number.MIN_SAFE_INTEGER, index: 0 };
+ for (const [i, item] of items.entries()) {
+ if (item.priority > highest.priority) {
+ highest.priority = item.priority;
+ highest.index = i;
+ }
+ }
+ const [item] = items.splice(highest.index, 1);
+ return item;
+}
+
+function tick(
+ ctx: ObjectContext<QueueState>,
+ state: QueueState,
+ capacity: number,
+) {
+ while (state.inFlight < capacity && state.items.length > 0) {
+ const item = selectAndPopItem(state.items);
+ state.inFlight++;
+ ctx.resolveAwakeable(item.awakeable);
+ }
+}
+
+async function getState(ctx: ObjectContext<QueueState>): Promise<QueueState> {
+ return {
+ items: (await ctx.get("items")) ?? [],
+ inFlight: (await ctx.get("inFlight")) ?? 0,
+ };
+}
+
+function setState(ctx: ObjectContext<QueueState>, state: QueueState) {
+ ctx.set("items", state.items);
+ ctx.set("inFlight", state.inFlight);
+}
+
+export class RestateSemaphore {
+ constructor(
+ private readonly ctx: Context,
+ private readonly id: string,
+ private readonly capacity: number,
+ ) {}
+
+ async acquire(priority: number) {
+ const awk = this.ctx.awakeable();
+ await this.ctx
+ .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id)
+ .acquire({
+ awakeableId: awk.id,
+ priority,
+ capacity: this.capacity,
+ });
+ await awk.promise;
+ }
+ async release() {
+ await this.ctx
+ .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id)
+ .release(this.capacity);
+ }
+}
diff --git a/packages/plugins-queue-restate/src/service.ts b/packages/plugins-queue-restate/src/service.ts
new file mode 100644
index 00000000..de5b070f
--- /dev/null
+++ b/packages/plugins-queue-restate/src/service.ts
@@ -0,0 +1,133 @@
+import * as restate from "@restatedev/restate-sdk";
+
+import type {
+ Queue,
+ QueueOptions,
+ RunnerFuncs,
+ RunnerOptions,
+} from "@karakeep/shared/queueing";
+import { tryCatch } from "@karakeep/shared/tryCatch";
+
+import { genId } from "./idProvider";
+import { RestateSemaphore } from "./semaphore";
+
+export function buildRestateService<T>(
+ queue: Queue<T>,
+ funcs: RunnerFuncs<T>,
+ opts: RunnerOptions<T>,
+ queueOpts: QueueOptions,
+) {
+ const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries;
+ return restate.service({
+ name: queue.name(),
+ options: {
+ inactivityTimeout: {
+ seconds: opts.timeoutSecs,
+ },
+ },
+ handlers: {
+ run: async (
+ ctx: restate.Context,
+ data: {
+ payload: T;
+ priority: number;
+ },
+ ) => {
+ const id = `${await genId(ctx)}`;
+ let payload = data.payload;
+ if (opts.validator) {
+ const res = opts.validator.safeParse(data.payload);
+ if (!res.success) {
+ throw new restate.TerminalError(res.error.message, {
+ errorCode: 400,
+ });
+ }
+ payload = res.data;
+ }
+
+ const priority = data.priority ?? 0;
+
+ const semaphore = new RestateSemaphore(
+ ctx,
+ `queue:${queue.name()}`,
+ opts.concurrency,
+ );
+
+ let lastError: Error | undefined;
+ for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) {
+ await semaphore.acquire(priority);
+ const res = await runWorkerLogic(ctx, funcs, {
+ id,
+ data: payload,
+ priority,
+ runNumber,
+ numRetriesLeft: NUM_RETRIES - runNumber,
+ abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000),
+ });
+ await semaphore.release();
+ if (res.error) {
+ lastError = res.error;
+ // TODO: add backoff
+ await ctx.sleep(1000);
+ } else {
+ break;
+ }
+ }
+ if (lastError) {
+ throw new restate.TerminalError(lastError.message, {
+ errorCode: 500,
+ cause: "cause" in lastError ? lastError.cause : undefined,
+ });
+ }
+ },
+ },
+ });
+}
+
+async function runWorkerLogic<T>(
+ ctx: restate.Context,
+ { run, onError, onComplete }: RunnerFuncs<T>,
+ data: {
+ id: string;
+ data: T;
+ priority: number;
+ runNumber: number;
+ numRetriesLeft: number;
+ abortSignal: AbortSignal;
+ },
+) {
+ const res = await tryCatch(
+ ctx.run(
+ `main logic`,
+ async () => {
+ await run(data);
+ },
+ {
+ maxRetryAttempts: 1,
+ },
+ ),
+ );
+ if (res.error) {
+ await tryCatch(
+ ctx.run(
+ `onError`,
+ async () =>
+ onError?.({
+ ...data,
+ error: res.error,
+ }),
+ {
+ maxRetryAttempts: 1,
+ },
+ ),
+ );
+ return res;
+ }
+
+ await tryCatch(
+ ctx.run("onComplete", async () => await onComplete?.(data), {
+ maxRetryAttempts: 1,
+ }),
+ );
+ return res;
+}
diff --git a/packages/plugins-queue-restate/src/tests/docker-compose.yml b/packages/plugins-queue-restate/src/tests/docker-compose.yml
new file mode 100644
index 00000000..f24c2921
--- /dev/null
+++ b/packages/plugins-queue-restate/src/tests/docker-compose.yml
@@ -0,0 +1,8 @@
+services:
+ restate:
+ image: docker.restate.dev/restatedev/restate:1.5
+ ports:
+ - "${RESTATE_INGRESS_PORT:-8080}:8080"
+ - "${RESTATE_ADMIN_PORT:-9070}:9070"
+ extra_hosts:
+ - "host.docker.internal:host-gateway"
diff --git a/packages/plugins-queue-restate/src/tests/queue.test.ts b/packages/plugins-queue-restate/src/tests/queue.test.ts
new file mode 100644
index 00000000..692581d6
--- /dev/null
+++ b/packages/plugins-queue-restate/src/tests/queue.test.ts
@@ -0,0 +1,221 @@
+import {
+ afterAll,
+ afterEach,
+ beforeAll,
+ beforeEach,
+ describe,
+ expect,
+ inject,
+ it,
+} from "vitest";
+
+import type { Queue, QueueClient } from "@karakeep/shared/queueing";
+
+import { AdminClient } from "../admin.js";
+import { RestateQueueProvider } from "../index.js";
+import { waitUntil } from "./utils.js";
+
+type TestAction =
+ | { type: "val"; val: number }
+ | { type: "err"; err: string }
+ | { type: "stall"; durSec: number };
+
+describe("Restate Queue Provider", () => {
+ let queueClient: QueueClient;
+ let queue: Queue<TestAction>;
+ let adminClient: AdminClient;
+
+ const testState = {
+ results: [] as number[],
+ errors: [] as string[],
+ inFlight: 0,
+ maxInFlight: 0,
+ };
+
+ async function waitUntilQueueEmpty() {
+ await waitUntil(
+ async () => {
+ const stats = await queue.stats();
+ return stats.pending + stats.pending_retry + stats.running === 0;
+ },
+ "Queue to be empty",
+ 60000,
+ );
+ }
+
+ beforeEach(async () => {
+ testState.results = [];
+ testState.errors = [];
+ testState.inFlight = 0;
+ testState.maxInFlight = 0;
+ });
+ afterEach(async () => {
+ await waitUntilQueueEmpty();
+ });
+
+ beforeAll(async () => {
+ const ingressPort = inject("restateIngressPort");
+ const adminPort = inject("restateAdminPort");
+
+ process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`;
+ process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`;
+ process.env.RESTATE_LISTEN_PORT = "9080";
+
+ const provider = new RestateQueueProvider();
+ const client = await provider.getClient();
+
+ if (!client) {
+ throw new Error("Failed to create queue client");
+ }
+
+ queueClient = client;
+ adminClient = new AdminClient(process.env.RESTATE_ADMIN_ADDR);
+
+ queue = queueClient.createQueue<TestAction>("test-queue", {
+ defaultJobArgs: {
+ numRetries: 3,
+ },
+ keepFailedJobs: false,
+ });
+
+ queueClient.createRunner(
+ queue,
+ {
+ run: async (job) => {
+ testState.inFlight++;
+ testState.maxInFlight = Math.max(
+ testState.maxInFlight,
+ testState.inFlight,
+ );
+ const jobData = job.data;
+ switch (jobData.type) {
+ case "val":
+ testState.results.push(jobData.val);
+ break;
+ case "err":
+ throw new Error(jobData.err);
+ case "stall":
+ await new Promise((resolve) =>
+ setTimeout(resolve, jobData.durSec * 1000),
+ );
+ break;
+ }
+ },
+ onError: async (job) => {
+ testState.inFlight--;
+ const jobData = job.data;
+ if (jobData && jobData.type === "err") {
+ testState.errors.push(jobData.err);
+ }
+ },
+ onComplete: async () => {
+ testState.inFlight--;
+ },
+ },
+ {
+ concurrency: 3,
+ timeoutSecs: 2,
+ pollIntervalMs: 0 /* Doesn't matter */,
+ },
+ );
+
+ await queueClient.prepare();
+ await queueClient.start();
+
+ await adminClient.upsertDeployment("http://host.docker.internal:9080");
+ }, 90000);
+
+ afterAll(async () => {
+ if (queueClient?.shutdown) {
+ await queueClient.shutdown();
+ }
+ });
+
+ it("should enqueue and process a job", async () => {
+ const jobId = await queue.enqueue({ type: "val", val: 42 });
+
+ expect(jobId).toBeDefined();
+ expect(typeof jobId).toBe("string");
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results).toEqual([42]);
+ }, 60000);
+
+ it("should process multiple jobs", async () => {
+ await queue.enqueue({ type: "val", val: 1 });
+ await queue.enqueue({ type: "val", val: 2 });
+ await queue.enqueue({ type: "val", val: 3 });
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results.length).toEqual(3);
+ expect(testState.results).toContain(1);
+ expect(testState.results).toContain(2);
+ expect(testState.results).toContain(3);
+ }, 60000);
+
+ it("should retry failed jobs", async () => {
+ await queue.enqueue({ type: "err", err: "Test error" });
+
+ await waitUntilQueueEmpty();
+
+ // Initial attempt + 3 retries
+ expect(testState.errors).toEqual([
+ "Test error",
+ "Test error",
+ "Test error",
+ "Test error",
+ ]);
+ }, 90000);
+
+ it("should use idempotency key", async () => {
+ const idempotencyKey = `test-${Date.now()}`;
+
+ await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey });
+ await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey });
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results).toEqual([200]);
+ }, 60000);
+
+ it("should handle concurrent jobs", async () => {
+ const promises = [];
+ for (let i = 300; i < 320; i++) {
+ promises.push(queue.enqueue({ type: "stall", durSec: 0.1 }));
+ }
+ await Promise.all(promises);
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.maxInFlight).toEqual(3);
+ }, 60000);
+
+ it("should handle priorities", async () => {
+ // Hog the queue first
+ await Promise.all([
+ queue.enqueue({ type: "stall", durSec: 1 }, { priority: 0 }),
+ queue.enqueue({ type: "stall", durSec: 1 }, { priority: 1 }),
+ queue.enqueue({ type: "stall", durSec: 1 }, { priority: 2 }),
+ ]);
+
+ // Then those will get reprioritized
+ await Promise.all([
+ queue.enqueue({ type: "val", val: 200 }, { priority: -1 }),
+ queue.enqueue({ type: "val", val: 201 }, { priority: -2 }),
+ queue.enqueue({ type: "val", val: 202 }, { priority: -3 }),
+
+ queue.enqueue({ type: "val", val: 300 }, { priority: 0 }),
+ queue.enqueue({ type: "val", val: 301 }, { priority: 1 }),
+ queue.enqueue({ type: "val", val: 302 }, { priority: 2 }),
+ ]);
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results).toEqual([
+ // Then in order of increasing priority
+ 302, 301, 300, 200, 201, 202,
+ ]);
+ }, 60000);
+});
diff --git a/packages/plugins-queue-restate/src/tests/setup/startContainers.ts b/packages/plugins-queue-restate/src/tests/setup/startContainers.ts
new file mode 100644
index 00000000..7d9dea5c
--- /dev/null
+++ b/packages/plugins-queue-restate/src/tests/setup/startContainers.ts
@@ -0,0 +1,90 @@
+import { execSync } from "child_process";
+import net from "net";
+import path from "path";
+import { fileURLToPath } from "url";
+import type { GlobalSetupContext } from "vitest/node";
+
+import { waitUntil } from "../utils.js";
+
+async function getRandomPort(): Promise<number> {
+ const server = net.createServer();
+ return new Promise<number>((resolve, reject) => {
+ server.unref();
+ server.on("error", reject);
+ server.listen(0, () => {
+ const port = (server.address() as net.AddressInfo).port;
+ server.close(() => resolve(port));
+ });
+ });
+}
+
+async function waitForHealthy(
+ ingressPort: number,
+ adminPort: number,
+ timeout = 60000,
+): Promise<void> {
+ await waitUntil(
+ async () => {
+ const response = await fetch(`http://localhost:${adminPort}/health`);
+ return response.ok;
+ },
+ "Restate admin API is healthy",
+ timeout,
+ );
+
+ await waitUntil(
+ async () => {
+ const response = await fetch(
+ `http://localhost:${ingressPort}/restate/health`,
+ );
+ return response.ok;
+ },
+ "Restate ingress is healthy",
+ timeout,
+ );
+}
+
+export default async function ({ provide }: GlobalSetupContext) {
+ const __dirname = path.dirname(fileURLToPath(import.meta.url));
+ const ingressPort = await getRandomPort();
+ const adminPort = await getRandomPort();
+
+ console.log(
+ `Starting Restate on ports ${ingressPort} (ingress) and ${adminPort} (admin)...`,
+ );
+ execSync(`docker compose up -d`, {
+ cwd: path.join(__dirname, ".."),
+ stdio: "ignore",
+ env: {
+ ...process.env,
+ RESTATE_INGRESS_PORT: ingressPort.toString(),
+ RESTATE_ADMIN_PORT: adminPort.toString(),
+ },
+ });
+
+ console.log("Waiting for Restate to become healthy...");
+ await waitForHealthy(ingressPort, adminPort);
+
+ provide("restateIngressPort", ingressPort);
+ provide("restateAdminPort", adminPort);
+
+ process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`;
+ process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`;
+ process.env.RESTATE_LISTEN_PORT = "9080";
+
+ return async () => {
+ console.log("Stopping Restate...");
+ execSync("docker compose down", {
+ cwd: path.join(__dirname, ".."),
+ stdio: "ignore",
+ });
+ return Promise.resolve();
+ };
+}
+
+declare module "vitest" {
+ export interface ProvidedContext {
+ restateIngressPort: number;
+ restateAdminPort: number;
+ }
+}
diff --git a/packages/plugins-queue-restate/src/tests/utils.ts b/packages/plugins-queue-restate/src/tests/utils.ts
new file mode 100644
index 00000000..e02d2dee
--- /dev/null
+++ b/packages/plugins-queue-restate/src/tests/utils.ts
@@ -0,0 +1,23 @@
+export async function waitUntil(
+ f: () => Promise<boolean>,
+ description: string,
+ timeoutMs = 60000,
+): Promise<void> {
+ const startTime = Date.now();
+
+ while (Date.now() - startTime < timeoutMs) {
+ console.log(`Waiting for ${description}...`);
+ try {
+ const res = await f();
+ if (res) {
+ console.log(`${description}: success`);
+ return;
+ }
+ } catch (error) {
+ console.log(`${description}: error, retrying...: ${error}`);
+ }
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+ }
+
+ throw new Error(`${description}: timeout after ${timeoutMs}ms`);
+}
diff --git a/packages/plugins-queue-restate/tsconfig.json b/packages/plugins-queue-restate/tsconfig.json
new file mode 100644
index 00000000..3bfa695c
--- /dev/null
+++ b/packages/plugins-queue-restate/tsconfig.json
@@ -0,0 +1,10 @@
+{
+ "$schema": "https://json.schemastore.org/tsconfig",
+ "extends": "@karakeep/tsconfig/node.json",
+ "include": ["**/*.ts"],
+ "exclude": ["node_modules"],
+ "compilerOptions": {
+ "tsBuildInfoFile": "node_modules/.cache/tsbuildinfo.json"
+ }
+}
+
diff --git a/packages/plugins-queue-restate/vitest.config.ts b/packages/plugins-queue-restate/vitest.config.ts
new file mode 100644
index 00000000..73e0e1b9
--- /dev/null
+++ b/packages/plugins-queue-restate/vitest.config.ts
@@ -0,0 +1,14 @@
+/// <reference types="vitest" />
+
+import tsconfigPaths from "vite-tsconfig-paths";
+import { defineConfig } from "vitest/config";
+
+export default defineConfig({
+ plugins: [tsconfigPaths()],
+ test: {
+ globalSetup: ["./src/tests/setup/startContainers.ts"],
+ teardownTimeout: 30000,
+ include: ["src/tests/**/*.test.ts"],
+ testTimeout: 60000,
+ },
+});
diff --git a/packages/plugins-search-meilisearch/package.json b/packages/plugins-search-meilisearch/package.json
index 3bc9db80..c9482731 100644
--- a/packages/plugins-search-meilisearch/package.json
+++ b/packages/plugins-search-meilisearch/package.json
@@ -6,8 +6,8 @@
"type": "module",
"scripts": {
"typecheck": "tsc --noEmit",
- "format": "prettier . --ignore-path ../../.prettierignore",
- "format:fix": "prettier . --write --ignore-path ../../.prettierignore",
+ "format": "prettier . --cache --ignore-path ../../.prettierignore --check",
+ "format:fix": "prettier . --cache --ignore-path ../../.prettierignore --write",
"lint": "oxlint .",
"lint:fix": "oxlint . --fix",
"test": "vitest"
diff --git a/packages/shared-server/package.json b/packages/shared-server/package.json
index 9c1b52a8..d18eb4a0 100644
--- a/packages/shared-server/package.json
+++ b/packages/shared-server/package.json
@@ -7,6 +7,7 @@
"dependencies": {
"@karakeep/db": "workspace:^0.1.0",
"@karakeep/plugins-queue-liteque": "workspace:^0.1.0",
+ "@karakeep/plugins-queue-restate": "workspace:^0.1.0",
"@karakeep/plugins-search-meilisearch": "workspace:^0.1.0",
"@karakeep/shared": "workspace:^0.1.0"
},
diff --git a/packages/shared-server/src/plugins.ts b/packages/shared-server/src/plugins.ts
index b6a88462..de08b9b0 100644
--- a/packages/shared-server/src/plugins.ts
+++ b/packages/shared-server/src/plugins.ts
@@ -8,6 +8,7 @@ export async function loadAllPlugins() {
// Load plugins here. Order of plugin loading matter.
// Queue provider(s)
await import("@karakeep/plugins-queue-liteque");
+ await import("@karakeep/plugins-queue-restate");
await import("@karakeep/plugins-search-meilisearch");
PluginManager.logAllPlugins();
pluginsLoaded = true;
diff --git a/packages/shared-server/src/queues.ts b/packages/shared-server/src/queues.ts
index 1c4e0452..c9f8276d 100644
--- a/packages/shared-server/src/queues.ts
+++ b/packages/shared-server/src/queues.ts
@@ -8,8 +8,12 @@ import { loadAllPlugins } from ".";
await loadAllPlugins();
const QUEUE_CLIENT = await getQueueClient();
-export function runQueueDBMigrations() {
- QUEUE_CLIENT.init();
+export async function prepareQueue() {
+ await QUEUE_CLIENT.prepare();
+}
+
+export async function startQueue() {
+ await QUEUE_CLIENT.start();
}
// Link Crawler
diff --git a/packages/shared/queueing.ts b/packages/shared/queueing.ts
index dfe3b31a..e401972b 100644
--- a/packages/shared/queueing.ts
+++ b/packages/shared/queueing.ts
@@ -3,7 +3,6 @@ import { ZodType } from "zod";
import { PluginManager, PluginType } from "./plugins";
export interface EnqueueOptions {
- numRetries?: number;
idempotencyKey?: string;
priority?: number;
delayMs?: number;
@@ -47,6 +46,7 @@ export interface RunnerOptions<T> {
}
export interface Queue<T> {
+ opts: QueueOptions;
name(): string;
enqueue(payload: T, options?: EnqueueOptions): Promise<string | undefined>;
stats(): Promise<{
@@ -65,7 +65,8 @@ export interface Runner<_T> {
}
export interface QueueClient {
- init(): Promise<void>;
+ prepare(): Promise<void>;
+ start(): Promise<void>;
createQueue<T>(name: string, options: QueueOptions): Queue<T>;
createRunner<T>(
queue: Queue<T>,
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index f04745e5..6846af89 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -1158,6 +1158,31 @@ importers:
specifier: ^3.2.4
version: 3.2.4(@types/debug@4.1.12)(@types/node@22.15.30)(happy-dom@17.4.9)(jiti@2.4.2)(jsdom@26.1.0)(lightningcss@1.30.1)(sass@1.89.1)(terser@5.41.0)(tsx@4.20.3)(yaml@2.8.0)
+ packages/plugins-queue-restate:
+ dependencies:
+ '@karakeep/shared':
+ specifier: workspace:*
+ version: link:../shared
+ '@restatedev/restate-sdk':
+ specifier: ^1.9.0
+ version: 1.9.0
+ '@restatedev/restate-sdk-clients':
+ specifier: ^1.9.0
+ version: 1.9.0
+ devDependencies:
+ '@karakeep/prettier-config':
+ specifier: workspace:^0.1.0
+ version: link:../../tooling/prettier
+ '@karakeep/tsconfig':
+ specifier: workspace:^0.1.0
+ version: link:../../tooling/typescript
+ vite-tsconfig-paths:
+ specifier: ^4.3.1
+ version: 4.3.2(typescript@5.8.3)(vite@7.0.6(@types/node@22.15.30)(jiti@2.4.2)(lightningcss@1.30.1)(sass@1.89.1)(terser@5.41.0)(tsx@4.20.3)(yaml@2.8.0))
+ vitest:
+ specifier: ^3.2.4
+ version: 3.2.4(@types/debug@4.1.12)(@types/node@22.15.30)(happy-dom@17.4.9)(jiti@2.4.2)(jsdom@26.1.0)(lightningcss@1.30.1)(sass@1.89.1)(terser@5.41.0)(tsx@4.20.3)(yaml@2.8.0)
+
packages/plugins-search-meilisearch:
dependencies:
'@karakeep/shared':
@@ -1302,6 +1327,9 @@ importers:
'@karakeep/plugins-queue-liteque':
specifier: workspace:^0.1.0
version: link:../plugins-queue-liteque
+ '@karakeep/plugins-queue-restate':
+ specifier: workspace:^0.1.0
+ version: link:../plugins-queue-restate
'@karakeep/plugins-search-meilisearch':
specifier: workspace:^0.1.0
version: link:../plugins-search-meilisearch
@@ -4895,6 +4923,18 @@ packages:
'@remusao/trie@2.1.0':
resolution: {integrity: sha512-Er3Q8q0/2OcCJPQYJOPLmCuqO0wu7cav3SPtpjlxSbjFi1x+A1pZkkLD6c9q2rGEkGW/tkrRzfrhNMt8VQjzXg==}
+ '@restatedev/restate-sdk-clients@1.9.0':
+ resolution: {integrity: sha512-vgKlDXQ4fkR9EU+MF/37edxmrLfFlGlorXsbumhkriGgBVimTKigdl/sdSphcAqdnPIq/tBd3H18Qj83uxKezQ==}
+ engines: {node: '>= 20.19'}
+
+ '@restatedev/restate-sdk-core@1.9.0':
+ resolution: {integrity: sha512-q3S7hnxcP3zU8cDcaAkYmO50zT8dykzaBm65D+hyJgba74gs/h5HMjbS8gbZgRezbJHtRrRsJmnNFet/qCj4pQ==}
+ engines: {node: '>= 20.19'}
+
+ '@restatedev/restate-sdk@1.9.0':
+ resolution: {integrity: sha512-PSvFpiD7qGN1DVBWuFHR4x5j8Mk6rAl5r75/XUAfW/YN/YchuTK4jIO4nqBLvbJ4dzioDlFbLmLZprKBVfX2tA==}
+ engines: {node: '>= 20.19'}
+
'@rn-primitives/hooks@1.3.0':
resolution: {integrity: sha512-BR97reSu7uVDpyMeQdRJHT0w8KdS6jdYnOL6xQtqS2q3H6N7vXBlX4LFERqJZphD+aziJFIAJ3HJF1vtt6XlpQ==}
peerDependencies:
@@ -19779,6 +19819,16 @@ snapshots:
'@remusao/trie@2.1.0': {}
+ '@restatedev/restate-sdk-clients@1.9.0':
+ dependencies:
+ '@restatedev/restate-sdk-core': 1.9.0
+
+ '@restatedev/restate-sdk-core@1.9.0': {}
+
+ '@restatedev/restate-sdk@1.9.0':
+ dependencies:
+ '@restatedev/restate-sdk-core': 1.9.0
+
'@rn-primitives/hooks@1.3.0(react-native@0.79.3(@babel/core@7.26.0)(@types/react@19.1.8)(react@19.1.0))(react@19.1.0)':
dependencies:
'@rn-primitives/types': 1.2.0(react-native@0.79.3(@babel/core@7.26.0)(@types/react@19.1.8)(react@19.1.0))(react@19.1.0)