diff options
Diffstat (limited to 'apps')
| -rw-r--r-- | apps/workers/index.ts | 3 | ||||
| -rw-r--r-- | apps/workers/package.json | 1 | ||||
| -rw-r--r-- | apps/workers/workers/assetPreprocessingWorker.ts | 70 | ||||
| -rw-r--r-- | apps/workers/workers/crawlerWorker.ts | 28 | ||||
| -rw-r--r-- | apps/workers/workers/feedWorker.ts | 11 | ||||
| -rw-r--r-- | apps/workers/workers/inference/inferenceWorker.ts | 10 | ||||
| -rw-r--r-- | apps/workers/workers/inference/summarize.ts | 4 | ||||
| -rw-r--r-- | apps/workers/workers/inference/tagging.ts | 14 | ||||
| -rw-r--r-- | apps/workers/workers/ruleEngineWorker.ts | 14 | ||||
| -rw-r--r-- | apps/workers/workers/searchWorker.ts | 59 | ||||
| -rw-r--r-- | apps/workers/workers/tidyAssetsWorker.ts | 12 | ||||
| -rw-r--r-- | apps/workers/workers/videoWorker.ts | 19 | ||||
| -rw-r--r-- | apps/workers/workers/webhookWorker.ts | 12 |
13 files changed, 134 insertions, 123 deletions
diff --git a/apps/workers/index.ts b/apps/workers/index.ts index cd860fc0..578ff6c8 100644 --- a/apps/workers/index.ts +++ b/apps/workers/index.ts @@ -2,10 +2,9 @@ import "dotenv/config"; import { buildServer } from "server"; -import { loadAllPlugins } from "@karakeep/shared-server"; +import { loadAllPlugins, runQueueDBMigrations } from "@karakeep/shared-server"; import serverConfig from "@karakeep/shared/config"; import logger from "@karakeep/shared/logger"; -import { runQueueDBMigrations } from "@karakeep/shared/queues"; import { shutdownPromise } from "./exit"; import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker"; diff --git a/apps/workers/package.json b/apps/workers/package.json index 6de801dd..030250e0 100644 --- a/apps/workers/package.json +++ b/apps/workers/package.json @@ -24,7 +24,6 @@ "http-proxy-agent": "^7.0.2", "https-proxy-agent": "^7.0.6", "jsdom": "^24.0.0", - "liteque": "^0.6.0", "metascraper": "^5.46.18", "metascraper-amazon": "^5.46.18", "metascraper-author": "5.46.18", diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts index d2df1724..2dcec59b 100644 --- a/apps/workers/workers/assetPreprocessingWorker.ts +++ b/apps/workers/workers/assetPreprocessingWorker.ts @@ -1,12 +1,11 @@ import os from "os"; import { eq } from "drizzle-orm"; -import { DequeuedJob, EnqueueOptions, Runner } from "liteque"; import { workerStatsCounter } from "metrics"; import PDFParser from "pdf2json"; import { fromBuffer } from "pdf2pic"; import { createWorker } from "tesseract.js"; -import type { AssetPreprocessingRequest } from "@karakeep/shared/queues"; +import type { AssetPreprocessingRequest } from "@karakeep/shared-server"; import { db } from "@karakeep/db"; import { assets, @@ -14,44 +13,53 @@ import { bookmarkAssets, bookmarks, } from "@karakeep/db/schema"; -import { QuotaService, StorageQuotaError } from "@karakeep/shared-server"; -import { newAssetId, readAsset, saveAsset } from "@karakeep/shared/assetdb"; -import serverConfig from "@karakeep/shared/config"; -import logger from "@karakeep/shared/logger"; import { AssetPreprocessingQueue, OpenAIQueue, + QuotaService, + StorageQuotaError, triggerSearchReindex, -} from "@karakeep/shared/queues"; +} from "@karakeep/shared-server"; +import { newAssetId, readAsset, saveAsset } from "@karakeep/shared/assetdb"; +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; +import { + DequeuedJob, + EnqueueOptions, + getQueueClient, +} from "@karakeep/shared/queueing"; export class AssetPreprocessingWorker { - static build() { + static async build() { logger.info("Starting asset preprocessing worker ..."); - const worker = new Runner<AssetPreprocessingRequest>( - AssetPreprocessingQueue, - { - run: run, - onComplete: async (job) => { - workerStatsCounter.labels("assetPreprocessing", "completed").inc(); - const jobId = job.id; - logger.info(`[assetPreprocessing][${jobId}] Completed successfully`); - return Promise.resolve(); + const worker = + (await getQueueClient())!.createRunner<AssetPreprocessingRequest>( + AssetPreprocessingQueue, + { + run: run, + onComplete: async (job) => { + workerStatsCounter.labels("assetPreprocessing", "completed").inc(); + const jobId = job.id; + logger.info( + `[assetPreprocessing][${jobId}] Completed successfully`, + ); + return Promise.resolve(); + }, + onError: async (job) => { + workerStatsCounter.labels("assetPreProcessing", "failed").inc(); + const jobId = job.id; + logger.error( + `[assetPreprocessing][${jobId}] Asset preprocessing failed: ${job.error}\n${job.error.stack}`, + ); + return Promise.resolve(); + }, }, - onError: async (job) => { - workerStatsCounter.labels("assetPreProcessing", "failed").inc(); - const jobId = job.id; - logger.error( - `[assetPreprocessing][${jobId}] Asset preprocessing failed: ${job.error}\n${job.error.stack}`, - ); - return Promise.resolve(); + { + concurrency: serverConfig.assetPreprocessing.numWorkers, + pollIntervalMs: 1000, + timeoutSecs: 30, }, - }, - { - concurrency: serverConfig.assetPreprocessing.numWorkers, - pollIntervalMs: 1000, - timeoutSecs: 30, - }, - ); + ); return worker; } diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts index 79d8e06a..baea1346 100644 --- a/apps/workers/workers/crawlerWorker.ts +++ b/apps/workers/workers/crawlerWorker.ts @@ -10,7 +10,6 @@ import { eq } from "drizzle-orm"; import { execa } from "execa"; import { exitAbortController } from "exit"; import { JSDOM, VirtualConsole } from "jsdom"; -import { DequeuedJob, EnqueueOptions, Runner } from "liteque"; import metascraper from "metascraper"; import metascraperAmazon from "metascraper-amazon"; import metascraperAuthor from "metascraper-author"; @@ -30,7 +29,7 @@ import { fetchWithProxy } from "utils"; import { getBookmarkDetails, updateAsset } from "workerUtils"; import { z } from "zod"; -import type { ZCrawlLinkRequest } from "@karakeep/shared/queues"; +import type { ZCrawlLinkRequest } from "@karakeep/shared-server"; import { db } from "@karakeep/db"; import { assets, @@ -40,7 +39,16 @@ import { bookmarks, users, } from "@karakeep/db/schema"; -import { QuotaService } from "@karakeep/shared-server"; +import { + AssetPreprocessingQueue, + LinkCrawlerQueue, + OpenAIQueue, + QuotaService, + triggerSearchReindex, + triggerWebhook, + VideoWorkerQueue, + zCrawlLinkRequestSchema, +} from "@karakeep/shared-server"; import { ASSET_TYPES, getAssetSize, @@ -55,14 +63,10 @@ import { import serverConfig from "@karakeep/shared/config"; import logger from "@karakeep/shared/logger"; import { - AssetPreprocessingQueue, - LinkCrawlerQueue, - OpenAIQueue, - triggerSearchReindex, - triggerWebhook, - VideoWorkerQueue, - zCrawlLinkRequestSchema, -} from "@karakeep/shared/queues"; + DequeuedJob, + EnqueueOptions, + getQueueClient, +} from "@karakeep/shared/queueing"; import { tryCatch } from "@karakeep/shared/tryCatch"; import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; @@ -247,7 +251,7 @@ export class CrawlerWorker { } logger.info("Starting crawler worker ..."); - const worker = new Runner<ZCrawlLinkRequest>( + const worker = (await getQueueClient())!.createRunner<ZCrawlLinkRequest>( LinkCrawlerQueue, { run: runCrawler, diff --git a/apps/workers/workers/feedWorker.ts b/apps/workers/workers/feedWorker.ts index 2ece4890..048d9696 100644 --- a/apps/workers/workers/feedWorker.ts +++ b/apps/workers/workers/feedWorker.ts @@ -1,5 +1,4 @@ import { and, eq, inArray } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; import { workerStatsCounter } from "metrics"; import cron from "node-cron"; import Parser from "rss-parser"; @@ -7,12 +6,12 @@ import { buildImpersonatingTRPCClient } from "trpc"; import { fetchWithProxy } from "utils"; import { z } from "zod"; -import type { ZFeedRequestSchema } from "@karakeep/shared/queues"; +import type { ZFeedRequestSchema } from "@karakeep/shared-server"; import { db } from "@karakeep/db"; import { rssFeedImportsTable, rssFeedsTable } from "@karakeep/db/schema"; -import { QuotaService } from "@karakeep/shared-server"; +import { FeedQueue, QuotaService } from "@karakeep/shared-server"; import logger from "@karakeep/shared/logger"; -import { FeedQueue } from "@karakeep/shared/queues"; +import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing"; import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; export const FeedRefreshingWorker = cron.schedule( @@ -46,9 +45,9 @@ export const FeedRefreshingWorker = cron.schedule( ); export class FeedWorker { - static build() { + static async build() { logger.info("Starting feed worker ..."); - const worker = new Runner<ZFeedRequestSchema>( + const worker = (await getQueueClient())!.createRunner<ZFeedRequestSchema>( FeedQueue, { run: run, diff --git a/apps/workers/workers/inference/inferenceWorker.ts b/apps/workers/workers/inference/inferenceWorker.ts index 32de3806..065462b3 100644 --- a/apps/workers/workers/inference/inferenceWorker.ts +++ b/apps/workers/workers/inference/inferenceWorker.ts @@ -1,14 +1,14 @@ import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; import { workerStatsCounter } from "metrics"; -import type { ZOpenAIRequest } from "@karakeep/shared/queues"; +import type { ZOpenAIRequest } from "@karakeep/shared-server"; import { db } from "@karakeep/db"; import { bookmarks } from "@karakeep/db/schema"; +import { OpenAIQueue, zOpenAIRequestSchema } from "@karakeep/shared-server"; import serverConfig from "@karakeep/shared/config"; import { InferenceClientFactory } from "@karakeep/shared/inference"; import logger from "@karakeep/shared/logger"; -import { OpenAIQueue, zOpenAIRequestSchema } from "@karakeep/shared/queues"; +import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing"; import { runSummarization } from "./summarize"; import { runTagging } from "./tagging"; @@ -37,9 +37,9 @@ async function attemptMarkStatus( } export class OpenAiWorker { - static build() { + static async build() { logger.info("Starting inference worker ..."); - const worker = new Runner<ZOpenAIRequest>( + const worker = (await getQueueClient())!.createRunner<ZOpenAIRequest>( OpenAIQueue, { run: runOpenAI, diff --git a/apps/workers/workers/inference/summarize.ts b/apps/workers/workers/inference/summarize.ts index 02a7f4e0..0b25470e 100644 --- a/apps/workers/workers/inference/summarize.ts +++ b/apps/workers/workers/inference/summarize.ts @@ -1,13 +1,13 @@ import { and, eq } from "drizzle-orm"; -import { DequeuedJob } from "liteque"; import { db } from "@karakeep/db"; import { bookmarks, customPrompts } from "@karakeep/db/schema"; +import { triggerSearchReindex, ZOpenAIRequest } from "@karakeep/shared-server"; import serverConfig from "@karakeep/shared/config"; import { InferenceClient } from "@karakeep/shared/inference"; import logger from "@karakeep/shared/logger"; import { buildSummaryPrompt } from "@karakeep/shared/prompts"; -import { triggerSearchReindex, ZOpenAIRequest } from "@karakeep/shared/queues"; +import { DequeuedJob } from "@karakeep/shared/queueing"; import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; import { Bookmark } from "@karakeep/trpc/models/bookmarks"; diff --git a/apps/workers/workers/inference/tagging.ts b/apps/workers/workers/inference/tagging.ts index 6dd146af..0d245644 100644 --- a/apps/workers/workers/inference/tagging.ts +++ b/apps/workers/workers/inference/tagging.ts @@ -1,13 +1,12 @@ import { and, Column, eq, inArray, sql } from "drizzle-orm"; -import { DequeuedJob, EnqueueOptions } from "liteque"; import { buildImpersonatingTRPCClient } from "trpc"; import { z } from "zod"; +import type { ZOpenAIRequest } from "@karakeep/shared-server"; import type { InferenceClient, InferenceResponse, } from "@karakeep/shared/inference"; -import type { ZOpenAIRequest } from "@karakeep/shared/queues"; import { db } from "@karakeep/db"; import { bookmarks, @@ -15,15 +14,16 @@ import { customPrompts, tagsOnBookmarks, } from "@karakeep/db/schema"; -import { ASSET_TYPES, readAsset } from "@karakeep/shared/assetdb"; -import serverConfig from "@karakeep/shared/config"; -import logger from "@karakeep/shared/logger"; -import { buildImagePrompt, buildTextPrompt } from "@karakeep/shared/prompts"; import { triggerRuleEngineOnEvent, triggerSearchReindex, triggerWebhook, -} from "@karakeep/shared/queues"; +} from "@karakeep/shared-server"; +import { ASSET_TYPES, readAsset } from "@karakeep/shared/assetdb"; +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; +import { buildImagePrompt, buildTextPrompt } from "@karakeep/shared/prompts"; +import { DequeuedJob, EnqueueOptions } from "@karakeep/shared/queueing"; import { Bookmark } from "@karakeep/trpc/models/bookmarks"; const openAIResponseSchema = z.object({ diff --git a/apps/workers/workers/ruleEngineWorker.ts b/apps/workers/workers/ruleEngineWorker.ts index 2a4fbb1a..37c7f595 100644 --- a/apps/workers/workers/ruleEngineWorker.ts +++ b/apps/workers/workers/ruleEngineWorker.ts @@ -1,23 +1,23 @@ import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; import { workerStatsCounter } from "metrics"; import { buildImpersonatingAuthedContext } from "trpc"; -import type { ZRuleEngineRequest } from "@karakeep/shared/queues"; +import type { ZRuleEngineRequest } from "@karakeep/shared-server"; import { db } from "@karakeep/db"; import { bookmarks } from "@karakeep/db/schema"; -import serverConfig from "@karakeep/shared/config"; -import logger from "@karakeep/shared/logger"; import { RuleEngineQueue, zRuleEngineRequestSchema, -} from "@karakeep/shared/queues"; +} from "@karakeep/shared-server"; +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; +import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing"; import { RuleEngine } from "@karakeep/trpc/lib/ruleEngine"; export class RuleEngineWorker { - static build() { + static async build() { logger.info("Starting rule engine worker ..."); - const worker = new Runner<ZRuleEngineRequest>( + const worker = (await getQueueClient())!.createRunner<ZRuleEngineRequest>( RuleEngineQueue, { run: runRuleEngine, diff --git a/apps/workers/workers/searchWorker.ts b/apps/workers/workers/searchWorker.ts index 1433ef3c..8f043c1d 100644 --- a/apps/workers/workers/searchWorker.ts +++ b/apps/workers/workers/searchWorker.ts @@ -1,16 +1,16 @@ import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; import { workerStatsCounter } from "metrics"; -import type { ZSearchIndexingRequest } from "@karakeep/shared/queues"; +import type { ZSearchIndexingRequest } from "@karakeep/shared-server"; import { db } from "@karakeep/db"; import { bookmarks } from "@karakeep/db/schema"; -import serverConfig from "@karakeep/shared/config"; -import logger from "@karakeep/shared/logger"; import { SearchIndexingQueue, zSearchIndexingRequestSchema, -} from "@karakeep/shared/queues"; +} from "@karakeep/shared-server"; +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; +import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing"; import { BookmarkSearchDocument, getSearchClient, @@ -19,33 +19,34 @@ import { import { Bookmark } from "@karakeep/trpc/models/bookmarks"; export class SearchIndexingWorker { - static build() { + static async build() { logger.info("Starting search indexing worker ..."); - const worker = new Runner<ZSearchIndexingRequest>( - SearchIndexingQueue, - { - run: runSearchIndexing, - onComplete: (job) => { - workerStatsCounter.labels("search", "completed").inc(); - const jobId = job.id; - logger.info(`[search][${jobId}] Completed successfully`); - return Promise.resolve(); + const worker = + (await getQueueClient())!.createRunner<ZSearchIndexingRequest>( + SearchIndexingQueue, + { + run: runSearchIndexing, + onComplete: (job) => { + workerStatsCounter.labels("search", "completed").inc(); + const jobId = job.id; + logger.info(`[search][${jobId}] Completed successfully`); + return Promise.resolve(); + }, + onError: (job) => { + workerStatsCounter.labels("search", "failed").inc(); + const jobId = job.id; + logger.error( + `[search][${jobId}] search job failed: ${job.error}\n${job.error.stack}`, + ); + return Promise.resolve(); + }, }, - onError: (job) => { - workerStatsCounter.labels("search", "failed").inc(); - const jobId = job.id; - logger.error( - `[search][${jobId}] search job failed: ${job.error}\n${job.error.stack}`, - ); - return Promise.resolve(); + { + concurrency: serverConfig.search.numWorkers, + pollIntervalMs: 1000, + timeoutSecs: 30, }, - }, - { - concurrency: serverConfig.search.numWorkers, - pollIntervalMs: 1000, - timeoutSecs: 30, - }, - ); + ); return worker; } diff --git a/apps/workers/workers/tidyAssetsWorker.ts b/apps/workers/workers/tidyAssetsWorker.ts index cf3e33b6..b5b95185 100644 --- a/apps/workers/workers/tidyAssetsWorker.ts +++ b/apps/workers/workers/tidyAssetsWorker.ts @@ -1,21 +1,21 @@ import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; import { workerStatsCounter } from "metrics"; import { db } from "@karakeep/db"; import { assets } from "@karakeep/db/schema"; -import { deleteAsset, getAllAssets } from "@karakeep/shared/assetdb"; -import logger from "@karakeep/shared/logger"; import { TidyAssetsQueue, ZTidyAssetsRequest, zTidyAssetsRequestSchema, -} from "@karakeep/shared/queues"; +} from "@karakeep/shared-server"; +import { deleteAsset, getAllAssets } from "@karakeep/shared/assetdb"; +import logger from "@karakeep/shared/logger"; +import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing"; export class TidyAssetsWorker { - static build() { + static async build() { logger.info("Starting tidy assets worker ..."); - const worker = new Runner<ZTidyAssetsRequest>( + const worker = (await getQueueClient())!.createRunner<ZTidyAssetsRequest>( TidyAssetsQueue, { run: runTidyAssets, diff --git a/apps/workers/workers/videoWorker.ts b/apps/workers/workers/videoWorker.ts index 68be0126..a41eb069 100644 --- a/apps/workers/workers/videoWorker.ts +++ b/apps/workers/workers/videoWorker.ts @@ -2,12 +2,17 @@ import fs from "fs"; import * as os from "os";
import path from "path";
import { execa } from "execa";
-import { DequeuedJob, Runner } from "liteque";
import { workerStatsCounter } from "metrics";
import { db } from "@karakeep/db";
import { AssetTypes } from "@karakeep/db/schema";
-import { QuotaService, StorageQuotaError } from "@karakeep/shared-server";
+import {
+ QuotaService,
+ StorageQuotaError,
+ VideoWorkerQueue,
+ ZVideoRequest,
+ zvideoRequestSchema,
+} from "@karakeep/shared-server";
import {
ASSET_TYPES,
newAssetId,
@@ -16,21 +21,17 @@ import { } from "@karakeep/shared/assetdb";
import serverConfig from "@karakeep/shared/config";
import logger from "@karakeep/shared/logger";
-import {
- VideoWorkerQueue,
- ZVideoRequest,
- zvideoRequestSchema,
-} from "@karakeep/shared/queues";
+import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
import { getBookmarkDetails, updateAsset } from "../workerUtils";
const TMP_FOLDER = path.join(os.tmpdir(), "video_downloads");
export class VideoWorker {
- static build() {
+ static async build() {
logger.info("Starting video worker ...");
- return new Runner<ZVideoRequest>(
+ return (await getQueueClient())!.createRunner<ZVideoRequest>(
VideoWorkerQueue,
{
run: runWorker,
diff --git a/apps/workers/workers/webhookWorker.ts b/apps/workers/workers/webhookWorker.ts index 96070e22..2bbef160 100644 --- a/apps/workers/workers/webhookWorker.ts +++ b/apps/workers/workers/webhookWorker.ts @@ -1,22 +1,22 @@ import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; import { workerStatsCounter } from "metrics"; import fetch from "node-fetch"; import { db } from "@karakeep/db"; import { bookmarks, webhooksTable } from "@karakeep/db/schema"; -import serverConfig from "@karakeep/shared/config"; -import logger from "@karakeep/shared/logger"; import { WebhookQueue, ZWebhookRequest, zWebhookRequestSchema, -} from "@karakeep/shared/queues"; +} from "@karakeep/shared-server"; +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; +import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing"; export class WebhookWorker { - static build() { + static async build() { logger.info("Starting webhook worker ..."); - const worker = new Runner<ZWebhookRequest>( + const worker = (await getQueueClient())!.createRunner<ZWebhookRequest>( WebhookQueue, { run: runWebhook, |
