author     Mohamed Bassem <me@mbassem.com>  2024-11-09 14:00:27 +0000
committer  Mohamed Bassem <me@mbassem.com>  2024-11-09 14:00:27 +0000
commit     f8bed574ae11a29c9c59f7d96721805188a507db (patch)
tree       1f897f80976c1264eb78d42b7aa9b912379b38a8 /apps/workers
parent     10070c1752b5efcbfdf351454de06fbbbe9fb0b6 (diff)
download   karakeep-f8bed574ae11a29c9c59f7d96721805188a507db.tar.zst
fix: Only update bookmark tagging/crawling status when worker is out of retries
Diffstat (limited to 'apps/workers')
-rw-r--r--   apps/workers/crawlerWorker.ts     8
-rw-r--r--   apps/workers/openaiWorker.ts     12
-rw-r--r--   apps/workers/searchWorker.ts      6
-rw-r--r--   apps/workers/tidyAssetsWorker.ts  6
-rw-r--r--   apps/workers/videoWorker.ts      13
5 files changed, 26 insertions, 19 deletions
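
The pattern this commit applies across the workers: an onError handler should only flip the bookmark's status to "failure" once the job has exhausted its retries, so an intermediate failure still leaves room for a later attempt to succeed. Below is a minimal TypeScript sketch of that guard; the DequeuedJob shape and the changeBookmarkStatus signature are assumptions based on how they are used in the diff that follows, not the actual definitions from the shared queue package.

// Sketch of the retry-aware failure handling introduced by this commit.
// DequeuedJob's shape and changeBookmarkStatus's signature are assumed from usage.
interface DequeuedJob<T> {
  id: string;
  data: T;
  error?: Error;
  numRetriesLeft: number;
}

declare function changeBookmarkStatus(
  bookmarkId: string,
  status: "success" | "failure",
): Promise<void>;

async function onError(job: DequeuedJob<{ bookmarkId?: string }>) {
  const bookmarkId = job.data.bookmarkId;
  // Only persist the failure once all retries are exhausted; earlier failures
  // leave the status untouched so a later retry can still mark it a success.
  if (bookmarkId && job.numRetriesLeft === 0) {
    await changeBookmarkStatus(bookmarkId, "failure");
  }
}
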
diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts
index f607234e..c8141d39 100644
--- a/apps/workers/crawlerWorker.ts
+++ b/apps/workers/crawlerWorker.ts
@@ -166,20 +166,20 @@ export class CrawlerWorker {
/* timeoutSec */ serverConfig.crawler.jobTimeoutSec,
),
onComplete: async (job) => {
- const jobId = job?.id ?? "unknown";
+ const jobId = job.id;
logger.info(`[Crawler][${jobId}] Completed successfully`);
- const bookmarkId = job?.data.bookmarkId;
+ const bookmarkId = job.data.bookmarkId;
if (bookmarkId) {
await changeBookmarkStatus(bookmarkId, "success");
}
},
onError: async (job) => {
- const jobId = job?.id ?? "unknown";
+ const jobId = job.id;
logger.error(
`[Crawler][${jobId}] Crawling job failed: ${job.error}\n${job.error.stack}`,
);
const bookmarkId = job.data?.bookmarkId;
- if (bookmarkId) {
+ if (bookmarkId && job.numRetriesLeft == 0) {
await changeBookmarkStatus(bookmarkId, "failure");
}
},
diff --git a/apps/workers/openaiWorker.ts b/apps/workers/openaiWorker.ts
index 571d5b73..948e92a7 100644
--- a/apps/workers/openaiWorker.ts
+++ b/apps/workers/openaiWorker.ts
@@ -68,16 +68,18 @@ export class OpenAiWorker {
{
run: runOpenAI,
onComplete: async (job) => {
- const jobId = job?.id ?? "unknown";
+ const jobId = job.id;
logger.info(`[inference][${jobId}] Completed successfully`);
- await attemptMarkTaggingStatus(job?.data, "success");
+ await attemptMarkTaggingStatus(job.data, "success");
},
onError: async (job) => {
- const jobId = job?.id ?? "unknown";
+ const jobId = job.id;
logger.error(
`[inference][${jobId}] inference job failed: ${job.error}\n${job.error.stack}`,
);
- await attemptMarkTaggingStatus(job?.data, "failure");
+ if (job.numRetriesLeft == 0) {
+ await attemptMarkTaggingStatus(job?.data, "failure");
+ }
},
},
{
@@ -387,7 +389,7 @@ async function connectTags(
}
async function runOpenAI(job: DequeuedJob<ZOpenAIRequest>) {
- const jobId = job.id ?? "unknown";
+ const jobId = job.id;
const inferenceClient = InferenceClientFactory.build();
if (!inferenceClient) {
diff --git a/apps/workers/searchWorker.ts b/apps/workers/searchWorker.ts
index d2f1dffc..1fbdbe73 100644
--- a/apps/workers/searchWorker.ts
+++ b/apps/workers/searchWorker.ts
@@ -19,12 +19,12 @@ export class SearchIndexingWorker {
{
run: runSearchIndexing,
onComplete: (job) => {
- const jobId = job?.id ?? "unknown";
+ const jobId = job.id;
logger.info(`[search][${jobId}] Completed successfully`);
return Promise.resolve();
},
onError: (job) => {
- const jobId = job?.id ?? "unknown";
+ const jobId = job.id;
logger.error(
`[search][${jobId}] search job failed: ${job.error}\n${job.error.stack}`,
);
@@ -117,7 +117,7 @@ async function runDelete(
}
async function runSearchIndexing(job: DequeuedJob<ZSearchIndexingRequest>) {
- const jobId = job.id ?? "unknown";
+ const jobId = job.id;
const request = zSearchIndexingRequestSchema.safeParse(job.data);
if (!request.success) {
diff --git a/apps/workers/tidyAssetsWorker.ts b/apps/workers/tidyAssetsWorker.ts
index c70736f2..bea0b7d9 100644
--- a/apps/workers/tidyAssetsWorker.ts
+++ b/apps/workers/tidyAssetsWorker.ts
@@ -19,12 +19,12 @@ export class TidyAssetsWorker {
{
run: runTidyAssets,
onComplete: (job) => {
- const jobId = job?.id ?? "unknown";
+ const jobId = job.id;
logger.info(`[tidyAssets][${jobId}] Completed successfully`);
return Promise.resolve();
},
onError: (job) => {
- const jobId = job?.id ?? "unknown";
+ const jobId = job.id;
logger.error(
`[tidyAssets][${jobId}] tidy assets job failed: ${job.error}\n${job.error.stack}`,
);
@@ -86,7 +86,7 @@ async function handleAsset(
}
async function runTidyAssets(job: DequeuedJob<ZTidyAssetsRequest>) {
- const jobId = job.id ?? "unknown";
+ const jobId = job.id;
const request = zTidyAssetsRequestSchema.safeParse(job.data);
if (!request.success) {
diff --git a/apps/workers/videoWorker.ts b/apps/workers/videoWorker.ts
index a85a8cae..444448f7 100644
--- a/apps/workers/videoWorker.ts
+++ b/apps/workers/videoWorker.ts
@@ -14,7 +14,11 @@ import {
} from "@hoarder/shared/assetdb";
import serverConfig from "@hoarder/shared/config";
import logger from "@hoarder/shared/logger";
-import { VideoWorkerQueue, ZVideoRequest } from "@hoarder/shared/queues";
+import {
+ VideoWorkerQueue,
+ ZVideoRequest,
+ zvideoRequestSchema,
+} from "@hoarder/shared/queues";
import { withTimeout } from "./utils";
import { getBookmarkDetails, updateAsset } from "./workerUtils";
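
The first videoWorker.ts hunk pulls in zvideoRequestSchema alongside the existing ZVideoRequest type. The real schema lives in @hoarder/shared/queues; the following is only a sketch of what such a zod schema and its inferred type could look like, with the field set assumed from how job.data is destructured later in this file.

import { z } from "zod";

// Hypothetical stand-in for the schema exported from @hoarder/shared/queues;
// the only field assumed here is bookmarkId, which runWorker destructures below.
export const zvideoRequestSchema = z.object({
  bookmarkId: z.string(),
});
export type ZVideoRequest = z.infer<typeof zvideoRequestSchema>;
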
@@ -33,14 +37,14 @@ export class VideoWorker {
/* timeoutSec */ serverConfig.crawler.downloadVideoTimeout,
),
onComplete: async (job) => {
- const jobId = job?.id ?? "unknown";
+ const jobId = job.id;
logger.info(
`[VideoCrawler][${jobId}] Video Download Completed successfully`,
);
return Promise.resolve();
},
onError: async (job) => {
- const jobId = job?.id ?? "unknown";
+ const jobId = job.id;
logger.error(
`[VideoCrawler][${jobId}] Video Download job failed: ${job.error}`,
);
@@ -51,6 +55,7 @@ export class VideoWorker {
pollIntervalMs: 1000,
timeoutSecs: serverConfig.crawler.downloadVideoTimeout,
concurrency: 1,
+ validator: zvideoRequestSchema,
},
);
}
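
The new validator option hands the same schema to the queue runner, presumably so malformed payloads are rejected before run() is invoked, mirroring the inline safeParse guard that searchWorker.ts and tidyAssetsWorker.ts perform themselves. A sketch of that kind of check, assuming a generic zod schema; the actual runner in @hoarder/shared/queues may wire this differently.

import { z } from "zod";

// Hypothetical illustration of what a validator option can centralize: parse the
// raw payload with the supplied zod schema and skip the job if it does not match.
function parsePayload<T>(schema: z.ZodType<T>, raw: unknown): T | null {
  const parsed = schema.safeParse(raw);
  return parsed.success ? parsed.data : null;
}
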
@@ -71,7 +76,7 @@ function prepareYtDlpArguments(url: string, assetPath: string) {
}
async function runWorker(job: DequeuedJob<ZVideoRequest>) {
- const jobId = job.id ?? "unknown";
+ const jobId = job.id;
const { bookmarkId } = job.data;
const {