aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--apps/workers/package.json2
-rw-r--r--packages/plugins/package.json2
-rw-r--r--packages/plugins/queue-liteque/src/index.ts16
-rw-r--r--packages/plugins/queue-restate/src/index.ts4
-rw-r--r--packages/plugins/queue-restate/src/service.ts12
-rw-r--r--packages/plugins/queue-restate/src/tests/queue.test.ts8
-rw-r--r--packages/shared/queueing.ts10
-rw-r--r--pnpm-lock.yaml14
8 files changed, 32 insertions, 36 deletions
diff --git a/apps/workers/package.json b/apps/workers/package.json
index f35a52f4..ecbc7684 100644
--- a/apps/workers/package.json
+++ b/apps/workers/package.json
@@ -25,7 +25,7 @@
"https-proxy-agent": "^7.0.6",
"ipaddr.js": "^2.2.0",
"jsdom": "^24.0.0",
- "liteque": "^0.6.2",
+ "liteque": "^0.7.0",
"lru-cache": "^11.2.2",
"metascraper": "^5.49.5",
"metascraper-amazon": "^5.49.5",
diff --git a/packages/plugins/package.json b/packages/plugins/package.json
index 5931d1a7..daf46843 100644
--- a/packages/plugins/package.json
+++ b/packages/plugins/package.json
@@ -22,7 +22,7 @@
"@karakeep/shared": "workspace:*",
"@restatedev/restate-sdk": "^1.9.0",
"@restatedev/restate-sdk-clients": "^1.9.0",
- "liteque": "^0.6.2",
+ "liteque": "^0.7.0",
"meilisearch": "^0.45.0"
},
"devDependencies": {
diff --git a/packages/plugins/queue-liteque/src/index.ts b/packages/plugins/queue-liteque/src/index.ts
index ddc2181c..94fa795f 100644
--- a/packages/plugins/queue-liteque/src/index.ts
+++ b/packages/plugins/queue-liteque/src/index.ts
@@ -8,8 +8,6 @@ import {
import type { PluginProvider } from "@karakeep/shared/plugins";
import type {
- DequeuedJob,
- DequeuedJobError,
EnqueueOptions,
Queue,
QueueClient,
@@ -82,9 +80,9 @@ class LitequeQueueClient implements QueueClient {
return wrapper;
}
- createRunner<T>(
+ createRunner<T, R = void>(
queue: Queue<T>,
- funcs: RunnerFuncs<T>,
+ funcs: RunnerFuncs<T, R>,
opts: RunnerOptions<T>,
): Runner<T> {
const name = queue.name();
@@ -93,16 +91,12 @@ class LitequeQueueClient implements QueueClient {
throw new Error(`Queue ${name} not found`);
}
- const runner = new LQRunner<T>(
+ const runner = new LQRunner<T, R>(
wrapper._impl,
{
run: funcs.run,
- onComplete: funcs.onComplete as
- | ((job: DequeuedJob<T>) => Promise<void>)
- | undefined,
- onError: funcs.onError as
- | ((job: DequeuedJobError<T>) => Promise<void>)
- | undefined,
+ onComplete: funcs.onComplete,
+ onError: funcs.onError,
},
{
pollIntervalMs: opts.pollIntervalMs ?? 1000,
diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts
index bedc26af..98668872 100644
--- a/packages/plugins/queue-restate/src/index.ts
+++ b/packages/plugins/queue-restate/src/index.ts
@@ -162,9 +162,9 @@ class RestateQueueClient implements QueueClient {
return wrapper;
}
- createRunner<T>(
+ createRunner<T, R = void>(
queue: Queue<T>,
- funcs: RunnerFuncs<T>,
+ funcs: RunnerFuncs<T, R>,
opts: RunnerOptions<T>,
): Runner<T> {
const name = queue.name();
diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts
index de5b070f..d19cad4c 100644
--- a/packages/plugins/queue-restate/src/service.ts
+++ b/packages/plugins/queue-restate/src/service.ts
@@ -11,9 +11,9 @@ import { tryCatch } from "@karakeep/shared/tryCatch";
import { genId } from "./idProvider";
import { RestateSemaphore } from "./semaphore";
-export function buildRestateService<T>(
+export function buildRestateService<T, R>(
queue: Queue<T>,
- funcs: RunnerFuncs<T>,
+ funcs: RunnerFuncs<T, R>,
opts: RunnerOptions<T>,
queueOpts: QueueOptions,
) {
@@ -84,9 +84,9 @@ export function buildRestateService<T>(
});
}
-async function runWorkerLogic<T>(
+async function runWorkerLogic<T, R>(
ctx: restate.Context,
- { run, onError, onComplete }: RunnerFuncs<T>,
+ { run, onError, onComplete }: RunnerFuncs<T, R>,
data: {
id: string;
data: T;
@@ -100,7 +100,7 @@ async function runWorkerLogic<T>(
ctx.run(
`main logic`,
async () => {
- await run(data);
+ return await run(data);
},
{
maxRetryAttempts: 1,
@@ -125,7 +125,7 @@ async function runWorkerLogic<T>(
}
await tryCatch(
- ctx.run("onComplete", async () => await onComplete?.(data), {
+ ctx.run("onComplete", async () => await onComplete?.(data, res.data), {
maxRetryAttempts: 1,
}),
);
diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts
index e59d47cb..d716671b 100644
--- a/packages/plugins/queue-restate/src/tests/queue.test.ts
+++ b/packages/plugins/queue-restate/src/tests/queue.test.ts
@@ -90,8 +90,7 @@ describe("Restate Queue Provider", () => {
const jobData = job.data;
switch (jobData.type) {
case "val":
- testState.results.push(jobData.val);
- break;
+ return jobData.val;
case "err":
throw new Error(jobData.err);
case "stall":
@@ -108,8 +107,11 @@ describe("Restate Queue Provider", () => {
testState.errors.push(jobData.err);
}
},
- onComplete: async () => {
+ onComplete: async (_j, res) => {
testState.inFlight--;
+ if (res) {
+ testState.results.push(res);
+ }
},
},
{
diff --git a/packages/shared/queueing.ts b/packages/shared/queueing.ts
index e401972b..ed6759dd 100644
--- a/packages/shared/queueing.ts
+++ b/packages/shared/queueing.ts
@@ -32,9 +32,9 @@ export interface DequeuedJobError<T> {
numRetriesLeft: number;
}
-export interface RunnerFuncs<T> {
- run: (job: DequeuedJob<T>) => Promise<void>;
- onComplete?: (job: DequeuedJob<T>) => Promise<void>;
+export interface RunnerFuncs<T, R = void> {
+ run: (job: DequeuedJob<T>) => Promise<R>;
+ onComplete?: (job: DequeuedJob<T>, result: R) => Promise<void>;
onError?: (job: DequeuedJobError<T>) => Promise<void>;
}
@@ -68,9 +68,9 @@ export interface QueueClient {
prepare(): Promise<void>;
start(): Promise<void>;
createQueue<T>(name: string, options: QueueOptions): Queue<T>;
- createRunner<T>(
+ createRunner<T, R = void>(
queue: Queue<T>,
- funcs: RunnerFuncs<T>,
+ funcs: RunnerFuncs<T, R>,
opts: RunnerOptions<T>,
): Runner<T>;
shutdown?(): Promise<void>;
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 47387568..4a920330 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -839,8 +839,8 @@ importers:
specifier: ^24.0.0
version: 24.1.3
liteque:
- specifier: ^0.6.2
- version: 0.6.2(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.2.2)(better-sqlite3@11.3.0)(kysely@0.28.5)(react@19.1.0)
+ specifier: ^0.7.0
+ version: 0.7.0(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.2.2)(better-sqlite3@11.3.0)(kysely@0.28.5)(react@19.1.0)
lru-cache:
specifier: ^11.2.2
version: 11.2.2
@@ -1160,8 +1160,8 @@ importers:
specifier: ^1.9.0
version: 1.9.0
liteque:
- specifier: ^0.6.2
- version: 0.6.2(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.2.2)(better-sqlite3@11.3.0)(kysely@0.28.5)(react@19.1.0)
+ specifier: ^0.7.0
+ version: 0.7.0(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.2.2)(better-sqlite3@11.3.0)(kysely@0.28.5)(react@19.1.0)
meilisearch:
specifier: ^0.45.0
version: 0.45.0
@@ -9957,8 +9957,8 @@ packages:
resolution: {integrity: sha512-wUayTU8MS827Dam6MxgD72Ui+KOSF+u/eIqpatOtjnvgJ0+mnDq33uC2M7J0tPK+upe/DpUAuK4JUU89iBoNKQ==}
engines: {node: '>=4'}
- liteque@0.6.2:
- resolution: {integrity: sha512-q99LW+XXHsy3/r1YFTwEmFz1q9Fd8gd9bnBw03gcIlrXjqcmW3av2OrmSKytwUIMdQwbHB0zI5fk/nC/J6xIUQ==}
+ liteque@0.7.0:
+ resolution: {integrity: sha512-3eov7hUb+fsQgX2E1Xl1/egxdjnA8opSh+xmB8iIpJthcHYPvFdyI4UIB7JzXnYbMjiNCKDELrk+AjqGLlvp1Q==}
peerDependencies:
better-sqlite3: '>=7'
@@ -25678,7 +25678,7 @@ snapshots:
liquid-json@0.3.1: {}
- liteque@0.6.2(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.2.2)(better-sqlite3@11.3.0)(kysely@0.28.5)(react@19.1.0):
+ liteque@0.7.0(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.2.2)(better-sqlite3@11.3.0)(kysely@0.28.5)(react@19.1.0):
dependencies:
async-mutex: 0.4.1
better-sqlite3: 11.3.0