From 36309aecbaab4ec94791fd1fce91676b30e6bd7c Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Mon, 9 Feb 2026 01:38:35 +0000 Subject: Revert "refactor: move assetdb to shared-server" This reverts commit a04d3c35fc9082e529a713605a038d236bb072c7. --- packages/api/utils/assets.ts | 2 +- packages/api/utils/upload.ts | 2 +- packages/e2e_tests/package.json | 1 - packages/e2e_tests/tests/assetdb/assetdb-utils.ts | 2 +- .../tests/assetdb/interface-compliance.test.ts | 2 +- .../tests/assetdb/local-filesystem-store.test.ts | 2 +- packages/e2e_tests/tests/assetdb/s3-store.test.ts | 2 +- packages/shared-server/assetdb.ts | 773 --------------------- packages/shared-server/package.json | 8 +- packages/shared/assetdb.ts | 761 ++++++++++++++++++++ packages/shared/package.json | 2 + packages/trpc/models/assets.ts | 2 +- packages/trpc/models/backups.ts | 2 +- packages/trpc/models/bookmarks.ts | 2 +- packages/trpc/models/users.ts | 2 +- packages/trpc/routers/bookmarks.ts | 2 +- 16 files changed, 776 insertions(+), 791 deletions(-) delete mode 100644 packages/shared-server/assetdb.ts create mode 100644 packages/shared/assetdb.ts (limited to 'packages') diff --git a/packages/api/utils/assets.ts b/packages/api/utils/assets.ts index 2ebd691d..7936f4ba 100644 --- a/packages/api/utils/assets.ts +++ b/packages/api/utils/assets.ts @@ -5,7 +5,7 @@ import { createAssetReadStream, getAssetSize, readAssetMetadata, -} from "@karakeep/shared-server/assetdb"; +} from "@karakeep/shared/assetdb"; import { toWebReadableStream } from "./upload"; diff --git a/packages/api/utils/upload.ts b/packages/api/utils/upload.ts index f532fc66..b82bc855 100644 --- a/packages/api/utils/upload.ts +++ b/packages/api/utils/upload.ts @@ -11,7 +11,7 @@ import { newAssetId, saveAssetFromFile, SUPPORTED_UPLOAD_ASSET_TYPES, -} from "@karakeep/shared-server/assetdb"; +} from "@karakeep/shared/assetdb"; import serverConfig from "@karakeep/shared/config"; import { AuthedContext } from "@karakeep/trpc"; diff --git a/packages/e2e_tests/package.json b/packages/e2e_tests/package.json index b1c472f4..d93318aa 100644 --- a/packages/e2e_tests/package.json +++ b/packages/e2e_tests/package.json @@ -18,7 +18,6 @@ "@aws-sdk/client-s3": "^3.842.0", "@karakeep/sdk": "workspace:*", "@karakeep/shared": "workspace:^0.1.0", - "@karakeep/shared-server": "workspace:^0.1.0", "@karakeep/trpc": "workspace:^0.1.0", "@trpc/client": "^11.9.0", "superjson": "^2.2.1", diff --git a/packages/e2e_tests/tests/assetdb/assetdb-utils.ts b/packages/e2e_tests/tests/assetdb/assetdb-utils.ts index 85a3f307..a8e29ab4 100644 --- a/packages/e2e_tests/tests/assetdb/assetdb-utils.ts +++ b/packages/e2e_tests/tests/assetdb/assetdb-utils.ts @@ -15,7 +15,7 @@ import { AssetStore, LocalFileSystemAssetStore, S3AssetStore, -} from "@karakeep/shared-server/assetdb"; +} from "@karakeep/shared/assetdb"; export interface TestAssetData { userId: string; diff --git a/packages/e2e_tests/tests/assetdb/interface-compliance.test.ts b/packages/e2e_tests/tests/assetdb/interface-compliance.test.ts index c9efaa5b..d5288c7a 100644 --- a/packages/e2e_tests/tests/assetdb/interface-compliance.test.ts +++ b/packages/e2e_tests/tests/assetdb/interface-compliance.test.ts @@ -1,7 +1,7 @@ import * as fs from "fs"; import { afterEach, beforeEach, describe, expect, it } from "vitest"; -import { ASSET_TYPES, AssetStore } from "@karakeep/shared-server/assetdb"; +import { ASSET_TYPES, AssetStore } from "@karakeep/shared/assetdb"; import { assertAssetExists, diff --git a/packages/e2e_tests/tests/assetdb/local-filesystem-store.test.ts b/packages/e2e_tests/tests/assetdb/local-filesystem-store.test.ts index a6a7e9ca..36ff837f 100644 --- a/packages/e2e_tests/tests/assetdb/local-filesystem-store.test.ts +++ b/packages/e2e_tests/tests/assetdb/local-filesystem-store.test.ts @@ -2,7 +2,7 @@ import * as fs from "fs"; import * as path from "path"; import { afterEach, beforeEach, describe, expect, it } from "vitest"; -import { LocalFileSystemAssetStore } from "@karakeep/shared-server/assetdb"; +import { LocalFileSystemAssetStore } from "@karakeep/shared/assetdb"; import { assertAssetNotExists, diff --git a/packages/e2e_tests/tests/assetdb/s3-store.test.ts b/packages/e2e_tests/tests/assetdb/s3-store.test.ts index 541b8791..c573750e 100644 --- a/packages/e2e_tests/tests/assetdb/s3-store.test.ts +++ b/packages/e2e_tests/tests/assetdb/s3-store.test.ts @@ -1,7 +1,7 @@ import { HeadObjectCommand, S3Client } from "@aws-sdk/client-s3"; import { afterEach, beforeEach, describe, expect, it } from "vitest"; -import { S3AssetStore } from "@karakeep/shared-server/assetdb"; +import { S3AssetStore } from "@karakeep/shared/assetdb"; import { assertAssetExists, diff --git a/packages/shared-server/assetdb.ts b/packages/shared-server/assetdb.ts deleted file mode 100644 index bb6bb75e..00000000 --- a/packages/shared-server/assetdb.ts +++ /dev/null @@ -1,773 +0,0 @@ -import * as fs from "fs"; -import * as path from "path"; -import { Readable } from "stream"; -import { - _Object, - DeleteObjectCommand, - DeleteObjectsCommand, - GetObjectCommand, - HeadObjectCommand, - ListObjectsV2Command, - PutObjectCommand, - S3Client, -} from "@aws-sdk/client-s3"; -import glob from "glob"; -import { z } from "zod"; - -import serverConfig from "@karakeep/shared/config"; -import logger from "@karakeep/shared/logger"; -import { QuotaApproved } from "@karakeep/shared/storageQuota"; - -const ROOT_PATH = serverConfig.assetsDir; - -export const enum ASSET_TYPES { - IMAGE_GIF = "image/gif", - IMAGE_JPEG = "image/jpeg", - IMAGE_PNG = "image/png", - IMAGE_WEBP = "image/webp", - APPLICATION_PDF = "application/pdf", - APPLICATION_ZIP = "application/zip", - TEXT_HTML = "text/html", - - VIDEO_MP4 = "video/mp4", - VIDEO_WEBM = "video/webm", - VIDEO_MKV = "video/x-matroska", -} - -export const VIDEO_ASSET_TYPES: Set = new Set([ - ASSET_TYPES.VIDEO_MP4, - ASSET_TYPES.VIDEO_WEBM, - ASSET_TYPES.VIDEO_MKV, -]); - -export const IMAGE_ASSET_TYPES: Set = new Set([ - ASSET_TYPES.IMAGE_GIF, - ASSET_TYPES.IMAGE_JPEG, - ASSET_TYPES.IMAGE_PNG, - ASSET_TYPES.IMAGE_WEBP, -]); - -// The assets that we allow the users to upload -export const SUPPORTED_UPLOAD_ASSET_TYPES: Set = new Set([ - ...IMAGE_ASSET_TYPES, - ...VIDEO_ASSET_TYPES, - ASSET_TYPES.TEXT_HTML, - ASSET_TYPES.APPLICATION_PDF, -]); - -// The assets that we allow as a bookmark of type asset -export const SUPPORTED_BOOKMARK_ASSET_TYPES: Set = new Set([ - ...IMAGE_ASSET_TYPES, - ASSET_TYPES.APPLICATION_PDF, -]); - -// The assets that we support saving in the asset db -export const SUPPORTED_ASSET_TYPES: Set = new Set([ - ...SUPPORTED_UPLOAD_ASSET_TYPES, - ASSET_TYPES.TEXT_HTML, - ASSET_TYPES.VIDEO_MP4, - ASSET_TYPES.APPLICATION_ZIP, -]); - -export const zAssetMetadataSchema = z.object({ - contentType: z.string(), - fileName: z.string().nullish(), -}); - -export type AssetMetadata = z.infer; - -export interface AssetInfo { - userId: string; - assetId: string; - contentType: string; - fileName?: string | null; - size: number; -} - -export interface AssetStore { - saveAsset(params: { - userId: string; - assetId: string; - asset: Buffer; - metadata: AssetMetadata; - }): Promise; - - saveAssetFromFile(params: { - userId: string; - assetId: string; - assetPath: string; - metadata: AssetMetadata; - }): Promise; - - readAsset(params: { - userId: string; - assetId: string; - }): Promise<{ asset: Buffer; metadata: AssetMetadata }>; - - createAssetReadStream(params: { - userId: string; - assetId: string; - start?: number; - end?: number; - }): Promise; - - readAssetMetadata(params: { - userId: string; - assetId: string; - }): Promise; - - getAssetSize(params: { userId: string; assetId: string }): Promise; - - deleteAsset(params: { userId: string; assetId: string }): Promise; - - deleteUserAssets(params: { userId: string }): Promise; - - getAllAssets(): AsyncGenerator; -} - -export function newAssetId() { - return crypto.randomUUID(); -} - -class LocalFileSystemAssetStore implements AssetStore { - private rootPath: string; - - constructor(rootPath: string) { - this.rootPath = rootPath; - } - - private getAssetDir(userId: string, assetId: string) { - return path.join(this.rootPath, userId, assetId); - } - - private async isPathExists(filePath: string) { - return fs.promises - .access(filePath) - .then(() => true) - .catch(() => false); - } - - async saveAsset({ - userId, - assetId, - asset, - metadata, - }: { - userId: string; - assetId: string; - asset: Buffer; - metadata: AssetMetadata; - }) { - if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) { - throw new Error("Unsupported asset type"); - } - const assetDir = this.getAssetDir(userId, assetId); - await fs.promises.mkdir(assetDir, { recursive: true }); - - await Promise.all([ - fs.promises.writeFile( - path.join(assetDir, "asset.bin"), - Uint8Array.from(asset), - ), - fs.promises.writeFile( - path.join(assetDir, "metadata.json"), - JSON.stringify(metadata), - ), - ]); - } - - async saveAssetFromFile({ - userId, - assetId, - assetPath, - metadata, - }: { - userId: string; - assetId: string; - assetPath: string; - metadata: AssetMetadata; - }) { - if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) { - throw new Error("Unsupported asset type"); - } - const assetDir = this.getAssetDir(userId, assetId); - await fs.promises.mkdir(assetDir, { recursive: true }); - - await Promise.all([ - fs.promises.copyFile(assetPath, path.join(assetDir, "asset.bin")), - fs.promises.writeFile( - path.join(assetDir, "metadata.json"), - JSON.stringify(metadata), - ), - ]); - await fs.promises.rm(assetPath); - } - - async readAsset({ userId, assetId }: { userId: string; assetId: string }) { - const assetDir = this.getAssetDir(userId, assetId); - - const [asset, metadataStr] = await Promise.all([ - fs.promises.readFile(path.join(assetDir, "asset.bin")), - fs.promises.readFile(path.join(assetDir, "metadata.json"), { - encoding: "utf8", - }), - ]); - - const metadata = zAssetMetadataSchema.parse(JSON.parse(metadataStr)); - return { asset, metadata }; - } - - async createAssetReadStream({ - userId, - assetId, - start, - end, - }: { - userId: string; - assetId: string; - start?: number; - end?: number; - }) { - const assetDir = this.getAssetDir(userId, assetId); - const assetPath = path.join(assetDir, "asset.bin"); - if (!(await this.isPathExists(assetPath))) { - throw new Error(`Asset ${assetId} not found`); - } - - return fs.createReadStream(path.join(assetDir, "asset.bin"), { - start, - end, - }); - } - - async readAssetMetadata({ - userId, - assetId, - }: { - userId: string; - assetId: string; - }) { - const assetDir = this.getAssetDir(userId, assetId); - - const metadataStr = await fs.promises.readFile( - path.join(assetDir, "metadata.json"), - { - encoding: "utf8", - }, - ); - - return zAssetMetadataSchema.parse(JSON.parse(metadataStr)); - } - - async getAssetSize({ userId, assetId }: { userId: string; assetId: string }) { - const assetDir = this.getAssetDir(userId, assetId); - const stat = await fs.promises.stat(path.join(assetDir, "asset.bin")); - return stat.size; - } - - async deleteAsset({ userId, assetId }: { userId: string; assetId: string }) { - const assetDir = this.getAssetDir(userId, assetId); - if (!(await this.isPathExists(assetDir))) { - return; - } - await fs.promises.rm(assetDir, { recursive: true }); - } - - async deleteUserAssets({ userId }: { userId: string }) { - const userDir = path.join(this.rootPath, userId); - const dirExists = await this.isPathExists(userDir); - if (!dirExists) { - return; - } - await fs.promises.rm(userDir, { recursive: true }); - } - - async *getAllAssets() { - const files = await new Promise((resolve, reject) => { - glob( - "*/*/asset.bin", - { - cwd: this.rootPath, - nodir: true, - }, - (err, matches) => { - if (err) { - reject(err); - return; - } - resolve(matches); - }, - ); - }); - - for (const file of files) { - const [userId, assetId] = file.split("/").slice(0, 2); - const [size, metadata] = await Promise.all([ - this.getAssetSize({ userId, assetId }), - this.readAssetMetadata({ userId, assetId }), - ]); - - yield { - userId, - assetId, - ...metadata, - size, - }; - } - } -} - -class S3AssetStore implements AssetStore { - private s3Client: S3Client; - private bucketName: string; - - constructor(s3Client: S3Client, bucketName: string) { - this.s3Client = s3Client; - this.bucketName = bucketName; - } - - private getAssetKey(userId: string, assetId: string) { - return `${userId}/${assetId}`; - } - - private metadataToS3Metadata( - metadata: AssetMetadata, - ): Record { - return { - ...(metadata.fileName - ? { "x-amz-meta-file-name": metadata.fileName } - : {}), - "x-amz-meta-content-type": metadata.contentType, - }; - } - - private s3MetadataToMetadata( - s3Metadata: Record | undefined, - ): AssetMetadata { - if (!s3Metadata) { - throw new Error("No metadata found in S3 object"); - } - - return { - contentType: s3Metadata["x-amz-meta-content-type"] || "", - fileName: s3Metadata["x-amz-meta-file-name"] ?? null, - }; - } - - async saveAsset({ - userId, - assetId, - asset, - metadata, - }: { - userId: string; - assetId: string; - asset: Buffer; - metadata: AssetMetadata; - }) { - if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) { - throw new Error("Unsupported asset type"); - } - - await this.s3Client.send( - new PutObjectCommand({ - Bucket: this.bucketName, - Key: this.getAssetKey(userId, assetId), - Body: asset, - ContentType: metadata.contentType, - Metadata: this.metadataToS3Metadata(metadata), - }), - ); - } - - async saveAssetFromFile({ - userId, - assetId, - assetPath, - metadata, - }: { - userId: string; - assetId: string; - assetPath: string; - metadata: AssetMetadata; - }) { - if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) { - throw new Error("Unsupported asset type"); - } - - const asset = fs.createReadStream(assetPath); - await this.s3Client.send( - new PutObjectCommand({ - Bucket: this.bucketName, - Key: this.getAssetKey(userId, assetId), - Body: asset, - ContentType: metadata.contentType, - Metadata: this.metadataToS3Metadata(metadata), - }), - ); - await fs.promises.rm(assetPath); - } - - async readAsset({ userId, assetId }: { userId: string; assetId: string }) { - const response = await this.s3Client.send( - new GetObjectCommand({ - Bucket: this.bucketName, - Key: this.getAssetKey(userId, assetId), - }), - ); - - if (!response.Body) { - throw new Error("Asset not found"); - } - - const assetBuffer = await this.streamToBuffer(response.Body as Readable); - const metadata = this.s3MetadataToMetadata(response.Metadata); - - return { asset: assetBuffer, metadata }; - } - - async createAssetReadStream({ - userId, - assetId, - start, - end, - }: { - userId: string; - assetId: string; - start?: number; - end?: number; - }) { - const range = - start !== undefined && end !== undefined - ? `bytes=${start}-${end}` - : undefined; - - const command = new GetObjectCommand({ - Bucket: this.bucketName, - Key: this.getAssetKey(userId, assetId), - Range: range, - }); - - const response = await this.s3Client.send(command); - if (!response.Body) { - throw new Error("Asset not found"); - } - return response.Body as NodeJS.ReadableStream; - } - - async readAssetMetadata({ - userId, - assetId, - }: { - userId: string; - assetId: string; - }) { - const response = await this.s3Client.send( - new HeadObjectCommand({ - Bucket: this.bucketName, - Key: this.getAssetKey(userId, assetId), - }), - ); - - return this.s3MetadataToMetadata(response.Metadata); - } - - async getAssetSize({ userId, assetId }: { userId: string; assetId: string }) { - const response = await this.s3Client.send( - new HeadObjectCommand({ - Bucket: this.bucketName, - Key: this.getAssetKey(userId, assetId), - }), - ); - - return response.ContentLength || 0; - } - - async deleteAsset({ userId, assetId }: { userId: string; assetId: string }) { - await this.s3Client.send( - new DeleteObjectCommand({ - Bucket: this.bucketName, - Key: this.getAssetKey(userId, assetId), - }), - ); - } - - async deleteUserAssets({ userId }: { userId: string }) { - let continuationToken: string | undefined; - - do { - const listResponse = await this.s3Client.send( - new ListObjectsV2Command({ - Bucket: this.bucketName, - Prefix: `${userId}/`, - ContinuationToken: continuationToken, - }), - ); - - if (listResponse.Contents && listResponse.Contents.length > 0) { - await this.s3Client.send( - new DeleteObjectsCommand({ - Bucket: this.bucketName, - Delete: { - Objects: listResponse.Contents.map((obj) => ({ - Key: obj.Key!, - })), - }, - }), - ); - } - - continuationToken = listResponse.NextContinuationToken; - } while (continuationToken); - } - - async *getAllAssets() { - let continuationToken: string | undefined; - - do { - const listResponse = await this.s3Client.send( - new ListObjectsV2Command({ - Bucket: this.bucketName, - ContinuationToken: continuationToken, - }), - ); - - if (listResponse.Contents) { - for (const obj of listResponse.Contents) { - if (!obj.Key) continue; - - const pathParts = obj.Key.split("/"); - if (pathParts.length === 2) { - const userId = pathParts[0]; - const assetId = pathParts[1]; - - try { - const headResponse = await this.s3Client.send( - new HeadObjectCommand({ - Bucket: this.bucketName, - Key: obj.Key, - }), - ); - - const metadata = this.s3MetadataToMetadata(headResponse.Metadata); - const size = headResponse.ContentLength || 0; - - yield { - userId, - assetId, - ...metadata, - size, - }; - } catch (error) { - logger.warn(`Failed to read asset ${userId}/${assetId}:`, error); - } - } - } - } - - continuationToken = listResponse.NextContinuationToken; - } while (continuationToken); - } - - private async streamToBuffer(stream: Readable): Promise { - const chunks: Buffer[] = []; - for await (const chunk of stream) { - chunks.push(chunk); - } - return Buffer.concat(chunks); - } -} - -function createDefaultAssetStore(): AssetStore { - const config = serverConfig.assetStore; - - if (config.type === "s3") { - if (!config.s3.bucket) { - throw new Error( - "ASSET_STORE_S3_BUCKET is required when using S3 asset store", - ); - } - if (!config.s3.accessKeyId || !config.s3.secretAccessKey) { - throw new Error( - "ASSET_STORE_S3_ACCESS_KEY_ID and ASSET_STORE_S3_SECRET_ACCESS_KEY are required when using S3 asset store", - ); - } - - const s3Client = new S3Client({ - region: config.s3.region, - endpoint: config.s3.endpoint, - forcePathStyle: config.s3.forcePathStyle, - credentials: { - accessKeyId: config.s3.accessKeyId, - secretAccessKey: config.s3.secretAccessKey, - }, - }); - - return new S3AssetStore(s3Client, config.s3.bucket); - } - - return new LocalFileSystemAssetStore(ROOT_PATH); -} - -const defaultAssetStore = createDefaultAssetStore(); - -export { LocalFileSystemAssetStore, S3AssetStore }; - -/** - * Example usage of S3AssetStore: - * - * import { S3Client } from "@aws-sdk/client-s3"; - * import { S3AssetStore } from "@karakeep/shared-server/assetdb"; - * - * const s3Client = new S3Client({ - * region: "us-east-1", - * credentials: { - * accessKeyId: "your-access-key", - * secretAccessKey: "your-secret-key" - * } - * }); - * - * const s3AssetStore = new S3AssetStore(s3Client, "your-bucket-name"); - * - * // Use s3AssetStore instead of the default file system store - * await s3AssetStore.saveAsset({ - * userId: "user123", - * assetId: "asset456", - * asset: buffer, - * metadata: { contentType: "image/jpeg", fileName: "photo.jpg" } - * }); - */ - -export async function saveAsset({ - userId, - assetId, - asset, - metadata, - quotaApproved, -}: { - userId: string; - assetId: string; - asset: Buffer; - metadata: z.infer; - quotaApproved: QuotaApproved; -}) { - // Verify the quota approval is for the correct user and size - if (quotaApproved.userId !== userId) { - throw new Error("Quota approval is for a different user"); - } - if (quotaApproved.approvedSize < asset.byteLength) { - throw new Error("Asset size exceeds approved quota"); - } - - return defaultAssetStore.saveAsset({ userId, assetId, asset, metadata }); -} - -export async function saveAssetFromFile({ - userId, - assetId, - assetPath, - metadata, - quotaApproved, -}: { - userId: string; - assetId: string; - assetPath: string; - metadata: z.infer; - quotaApproved: QuotaApproved; -}) { - // Verify the quota approval is for the correct user - if (quotaApproved.userId !== userId) { - throw new Error("Quota approval is for a different user"); - } - - // For file-based saves, we'll verify the file size matches the approved size - // when the underlying store implementation reads the file - - return defaultAssetStore.saveAssetFromFile({ - userId, - assetId, - assetPath, - metadata, - }); -} - -export async function readAsset({ - userId, - assetId, -}: { - userId: string; - assetId: string; -}) { - return defaultAssetStore.readAsset({ userId, assetId }); -} - -export async function createAssetReadStream({ - userId, - assetId, - start, - end, -}: { - userId: string; - assetId: string; - start?: number; - end?: number; -}) { - return defaultAssetStore.createAssetReadStream({ - userId, - assetId, - start, - end, - }); -} - -export async function readAssetMetadata({ - userId, - assetId, -}: { - userId: string; - assetId: string; -}) { - return defaultAssetStore.readAssetMetadata({ userId, assetId }); -} - -export async function getAssetSize({ - userId, - assetId, -}: { - userId: string; - assetId: string; -}) { - return defaultAssetStore.getAssetSize({ userId, assetId }); -} - -/** - * Deletes the passed in asset if it exists and ignores any errors - * @param userId the id of the user the asset belongs to - * @param assetId the id of the asset to delete - */ -export async function silentDeleteAsset( - userId: string, - assetId: string | undefined, -) { - if (assetId) { - await deleteAsset({ userId, assetId }).catch(() => ({})); - } -} - -export async function deleteAsset({ - userId, - assetId, -}: { - userId: string; - assetId: string; -}) { - return defaultAssetStore.deleteAsset({ userId, assetId }); -} - -export async function deleteUserAssets({ userId }: { userId: string }) { - return defaultAssetStore.deleteUserAssets({ userId }); -} - -export async function* getAllAssets() { - yield* defaultAssetStore.getAllAssets(); -} diff --git a/packages/shared-server/package.json b/packages/shared-server/package.json index 0afc2bf2..357248b4 100644 --- a/packages/shared-server/package.json +++ b/packages/shared-server/package.json @@ -5,7 +5,6 @@ "private": true, "type": "module", "dependencies": { - "@aws-sdk/client-s3": "^3.842.0", "@karakeep/db": "workspace:^0.1.0", "@karakeep/plugins": "workspace:^0.1.0", "@karakeep/shared": "workspace:^0.1.0", @@ -14,9 +13,7 @@ "@opentelemetry/resources": "^2.2.0", "@opentelemetry/sdk-trace-base": "^2.2.0", "@opentelemetry/sdk-trace-node": "^2.2.0", - "@opentelemetry/semantic-conventions": "^1.38.0", - "glob": "^11.0.0", - "zod": "^3.24.2" + "@opentelemetry/semantic-conventions": "^1.38.0" }, "devDependencies": { "@karakeep/prettier-config": "workspace:^0.1.0", @@ -32,8 +29,7 @@ }, "main": "index.ts", "exports": { - ".": "./index.ts", - "./assetdb": "./assetdb.ts" + ".": "./index.ts" }, "prettier": "@karakeep/prettier-config" } diff --git a/packages/shared/assetdb.ts b/packages/shared/assetdb.ts new file mode 100644 index 00000000..2e22faf7 --- /dev/null +++ b/packages/shared/assetdb.ts @@ -0,0 +1,761 @@ +import * as fs from "fs"; +import * as path from "path"; +import { Readable } from "stream"; +import { + _Object, + DeleteObjectCommand, + DeleteObjectsCommand, + GetObjectCommand, + HeadObjectCommand, + ListObjectsV2Command, + PutObjectCommand, + S3Client, +} from "@aws-sdk/client-s3"; +import { Glob } from "glob"; +import { z } from "zod"; + +import serverConfig from "./config"; +import logger from "./logger"; +import { QuotaApproved } from "./storageQuota"; + +const ROOT_PATH = serverConfig.assetsDir; + +export const enum ASSET_TYPES { + IMAGE_GIF = "image/gif", + IMAGE_JPEG = "image/jpeg", + IMAGE_PNG = "image/png", + IMAGE_WEBP = "image/webp", + APPLICATION_PDF = "application/pdf", + APPLICATION_ZIP = "application/zip", + TEXT_HTML = "text/html", + + VIDEO_MP4 = "video/mp4", + VIDEO_WEBM = "video/webm", + VIDEO_MKV = "video/x-matroska", +} + +export const VIDEO_ASSET_TYPES: Set = new Set([ + ASSET_TYPES.VIDEO_MP4, + ASSET_TYPES.VIDEO_WEBM, + ASSET_TYPES.VIDEO_MKV, +]); + +export const IMAGE_ASSET_TYPES: Set = new Set([ + ASSET_TYPES.IMAGE_GIF, + ASSET_TYPES.IMAGE_JPEG, + ASSET_TYPES.IMAGE_PNG, + ASSET_TYPES.IMAGE_WEBP, +]); + +// The assets that we allow the users to upload +export const SUPPORTED_UPLOAD_ASSET_TYPES: Set = new Set([ + ...IMAGE_ASSET_TYPES, + ...VIDEO_ASSET_TYPES, + ASSET_TYPES.TEXT_HTML, + ASSET_TYPES.APPLICATION_PDF, +]); + +// The assets that we allow as a bookmark of type asset +export const SUPPORTED_BOOKMARK_ASSET_TYPES: Set = new Set([ + ...IMAGE_ASSET_TYPES, + ASSET_TYPES.APPLICATION_PDF, +]); + +// The assets that we support saving in the asset db +export const SUPPORTED_ASSET_TYPES: Set = new Set([ + ...SUPPORTED_UPLOAD_ASSET_TYPES, + ASSET_TYPES.TEXT_HTML, + ASSET_TYPES.VIDEO_MP4, + ASSET_TYPES.APPLICATION_ZIP, +]); + +export const zAssetMetadataSchema = z.object({ + contentType: z.string(), + fileName: z.string().nullish(), +}); + +export type AssetMetadata = z.infer; + +export interface AssetInfo { + userId: string; + assetId: string; + contentType: string; + fileName?: string | null; + size: number; +} + +export interface AssetStore { + saveAsset(params: { + userId: string; + assetId: string; + asset: Buffer; + metadata: AssetMetadata; + }): Promise; + + saveAssetFromFile(params: { + userId: string; + assetId: string; + assetPath: string; + metadata: AssetMetadata; + }): Promise; + + readAsset(params: { + userId: string; + assetId: string; + }): Promise<{ asset: Buffer; metadata: AssetMetadata }>; + + createAssetReadStream(params: { + userId: string; + assetId: string; + start?: number; + end?: number; + }): Promise; + + readAssetMetadata(params: { + userId: string; + assetId: string; + }): Promise; + + getAssetSize(params: { userId: string; assetId: string }): Promise; + + deleteAsset(params: { userId: string; assetId: string }): Promise; + + deleteUserAssets(params: { userId: string }): Promise; + + getAllAssets(): AsyncGenerator; +} + +export function newAssetId() { + return crypto.randomUUID(); +} + +class LocalFileSystemAssetStore implements AssetStore { + private rootPath: string; + + constructor(rootPath: string) { + this.rootPath = rootPath; + } + + private getAssetDir(userId: string, assetId: string) { + return path.join(this.rootPath, userId, assetId); + } + + private async isPathExists(filePath: string) { + return fs.promises + .access(filePath) + .then(() => true) + .catch(() => false); + } + + async saveAsset({ + userId, + assetId, + asset, + metadata, + }: { + userId: string; + assetId: string; + asset: Buffer; + metadata: AssetMetadata; + }) { + if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) { + throw new Error("Unsupported asset type"); + } + const assetDir = this.getAssetDir(userId, assetId); + await fs.promises.mkdir(assetDir, { recursive: true }); + + await Promise.all([ + fs.promises.writeFile( + path.join(assetDir, "asset.bin"), + Uint8Array.from(asset), + ), + fs.promises.writeFile( + path.join(assetDir, "metadata.json"), + JSON.stringify(metadata), + ), + ]); + } + + async saveAssetFromFile({ + userId, + assetId, + assetPath, + metadata, + }: { + userId: string; + assetId: string; + assetPath: string; + metadata: AssetMetadata; + }) { + if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) { + throw new Error("Unsupported asset type"); + } + const assetDir = this.getAssetDir(userId, assetId); + await fs.promises.mkdir(assetDir, { recursive: true }); + + await Promise.all([ + fs.promises.copyFile(assetPath, path.join(assetDir, "asset.bin")), + fs.promises.writeFile( + path.join(assetDir, "metadata.json"), + JSON.stringify(metadata), + ), + ]); + await fs.promises.rm(assetPath); + } + + async readAsset({ userId, assetId }: { userId: string; assetId: string }) { + const assetDir = this.getAssetDir(userId, assetId); + + const [asset, metadataStr] = await Promise.all([ + fs.promises.readFile(path.join(assetDir, "asset.bin")), + fs.promises.readFile(path.join(assetDir, "metadata.json"), { + encoding: "utf8", + }), + ]); + + const metadata = zAssetMetadataSchema.parse(JSON.parse(metadataStr)); + return { asset, metadata }; + } + + async createAssetReadStream({ + userId, + assetId, + start, + end, + }: { + userId: string; + assetId: string; + start?: number; + end?: number; + }) { + const assetDir = this.getAssetDir(userId, assetId); + const assetPath = path.join(assetDir, "asset.bin"); + if (!(await this.isPathExists(assetPath))) { + throw new Error(`Asset ${assetId} not found`); + } + + return fs.createReadStream(path.join(assetDir, "asset.bin"), { + start, + end, + }); + } + + async readAssetMetadata({ + userId, + assetId, + }: { + userId: string; + assetId: string; + }) { + const assetDir = this.getAssetDir(userId, assetId); + + const metadataStr = await fs.promises.readFile( + path.join(assetDir, "metadata.json"), + { + encoding: "utf8", + }, + ); + + return zAssetMetadataSchema.parse(JSON.parse(metadataStr)); + } + + async getAssetSize({ userId, assetId }: { userId: string; assetId: string }) { + const assetDir = this.getAssetDir(userId, assetId); + const stat = await fs.promises.stat(path.join(assetDir, "asset.bin")); + return stat.size; + } + + async deleteAsset({ userId, assetId }: { userId: string; assetId: string }) { + const assetDir = this.getAssetDir(userId, assetId); + if (!(await this.isPathExists(assetDir))) { + return; + } + await fs.promises.rm(assetDir, { recursive: true }); + } + + async deleteUserAssets({ userId }: { userId: string }) { + const userDir = path.join(this.rootPath, userId); + const dirExists = await this.isPathExists(userDir); + if (!dirExists) { + return; + } + await fs.promises.rm(userDir, { recursive: true }); + } + + async *getAllAssets() { + const g = new Glob(`/**/**/asset.bin`, { + maxDepth: 3, + root: this.rootPath, + cwd: this.rootPath, + absolute: false, + }); + for await (const file of g) { + const [userId, assetId] = file.split("/").slice(0, 2); + const [size, metadata] = await Promise.all([ + this.getAssetSize({ userId, assetId }), + this.readAssetMetadata({ userId, assetId }), + ]); + yield { + userId, + assetId, + ...metadata, + size, + }; + } + } +} + +class S3AssetStore implements AssetStore { + private s3Client: S3Client; + private bucketName: string; + + constructor(s3Client: S3Client, bucketName: string) { + this.s3Client = s3Client; + this.bucketName = bucketName; + } + + private getAssetKey(userId: string, assetId: string) { + return `${userId}/${assetId}`; + } + + private metadataToS3Metadata( + metadata: AssetMetadata, + ): Record { + return { + ...(metadata.fileName + ? { "x-amz-meta-file-name": metadata.fileName } + : {}), + "x-amz-meta-content-type": metadata.contentType, + }; + } + + private s3MetadataToMetadata( + s3Metadata: Record | undefined, + ): AssetMetadata { + if (!s3Metadata) { + throw new Error("No metadata found in S3 object"); + } + + return { + contentType: s3Metadata["x-amz-meta-content-type"] || "", + fileName: s3Metadata["x-amz-meta-file-name"] ?? null, + }; + } + + async saveAsset({ + userId, + assetId, + asset, + metadata, + }: { + userId: string; + assetId: string; + asset: Buffer; + metadata: AssetMetadata; + }) { + if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) { + throw new Error("Unsupported asset type"); + } + + await this.s3Client.send( + new PutObjectCommand({ + Bucket: this.bucketName, + Key: this.getAssetKey(userId, assetId), + Body: asset, + ContentType: metadata.contentType, + Metadata: this.metadataToS3Metadata(metadata), + }), + ); + } + + async saveAssetFromFile({ + userId, + assetId, + assetPath, + metadata, + }: { + userId: string; + assetId: string; + assetPath: string; + metadata: AssetMetadata; + }) { + if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) { + throw new Error("Unsupported asset type"); + } + + const asset = fs.createReadStream(assetPath); + await this.s3Client.send( + new PutObjectCommand({ + Bucket: this.bucketName, + Key: this.getAssetKey(userId, assetId), + Body: asset, + ContentType: metadata.contentType, + Metadata: this.metadataToS3Metadata(metadata), + }), + ); + await fs.promises.rm(assetPath); + } + + async readAsset({ userId, assetId }: { userId: string; assetId: string }) { + const response = await this.s3Client.send( + new GetObjectCommand({ + Bucket: this.bucketName, + Key: this.getAssetKey(userId, assetId), + }), + ); + + if (!response.Body) { + throw new Error("Asset not found"); + } + + const assetBuffer = await this.streamToBuffer(response.Body as Readable); + const metadata = this.s3MetadataToMetadata(response.Metadata); + + return { asset: assetBuffer, metadata }; + } + + async createAssetReadStream({ + userId, + assetId, + start, + end, + }: { + userId: string; + assetId: string; + start?: number; + end?: number; + }) { + const range = + start !== undefined && end !== undefined + ? `bytes=${start}-${end}` + : undefined; + + const command = new GetObjectCommand({ + Bucket: this.bucketName, + Key: this.getAssetKey(userId, assetId), + Range: range, + }); + + const response = await this.s3Client.send(command); + if (!response.Body) { + throw new Error("Asset not found"); + } + return response.Body as NodeJS.ReadableStream; + } + + async readAssetMetadata({ + userId, + assetId, + }: { + userId: string; + assetId: string; + }) { + const response = await this.s3Client.send( + new HeadObjectCommand({ + Bucket: this.bucketName, + Key: this.getAssetKey(userId, assetId), + }), + ); + + return this.s3MetadataToMetadata(response.Metadata); + } + + async getAssetSize({ userId, assetId }: { userId: string; assetId: string }) { + const response = await this.s3Client.send( + new HeadObjectCommand({ + Bucket: this.bucketName, + Key: this.getAssetKey(userId, assetId), + }), + ); + + return response.ContentLength || 0; + } + + async deleteAsset({ userId, assetId }: { userId: string; assetId: string }) { + await this.s3Client.send( + new DeleteObjectCommand({ + Bucket: this.bucketName, + Key: this.getAssetKey(userId, assetId), + }), + ); + } + + async deleteUserAssets({ userId }: { userId: string }) { + let continuationToken: string | undefined; + + do { + const listResponse = await this.s3Client.send( + new ListObjectsV2Command({ + Bucket: this.bucketName, + Prefix: `${userId}/`, + ContinuationToken: continuationToken, + }), + ); + + if (listResponse.Contents && listResponse.Contents.length > 0) { + await this.s3Client.send( + new DeleteObjectsCommand({ + Bucket: this.bucketName, + Delete: { + Objects: listResponse.Contents.map((obj) => ({ + Key: obj.Key!, + })), + }, + }), + ); + } + + continuationToken = listResponse.NextContinuationToken; + } while (continuationToken); + } + + async *getAllAssets() { + let continuationToken: string | undefined; + + do { + const listResponse = await this.s3Client.send( + new ListObjectsV2Command({ + Bucket: this.bucketName, + ContinuationToken: continuationToken, + }), + ); + + if (listResponse.Contents) { + for (const obj of listResponse.Contents) { + if (!obj.Key) continue; + + const pathParts = obj.Key.split("/"); + if (pathParts.length === 2) { + const userId = pathParts[0]; + const assetId = pathParts[1]; + + try { + const headResponse = await this.s3Client.send( + new HeadObjectCommand({ + Bucket: this.bucketName, + Key: obj.Key, + }), + ); + + const metadata = this.s3MetadataToMetadata(headResponse.Metadata); + const size = headResponse.ContentLength || 0; + + yield { + userId, + assetId, + ...metadata, + size, + }; + } catch (error) { + logger.warn(`Failed to read asset ${userId}/${assetId}:`, error); + } + } + } + } + + continuationToken = listResponse.NextContinuationToken; + } while (continuationToken); + } + + private async streamToBuffer(stream: Readable): Promise { + const chunks: Buffer[] = []; + for await (const chunk of stream) { + chunks.push(chunk); + } + return Buffer.concat(chunks); + } +} + +function createDefaultAssetStore(): AssetStore { + const config = serverConfig.assetStore; + + if (config.type === "s3") { + if (!config.s3.bucket) { + throw new Error( + "ASSET_STORE_S3_BUCKET is required when using S3 asset store", + ); + } + if (!config.s3.accessKeyId || !config.s3.secretAccessKey) { + throw new Error( + "ASSET_STORE_S3_ACCESS_KEY_ID and ASSET_STORE_S3_SECRET_ACCESS_KEY are required when using S3 asset store", + ); + } + + const s3Client = new S3Client({ + region: config.s3.region, + endpoint: config.s3.endpoint, + forcePathStyle: config.s3.forcePathStyle, + credentials: { + accessKeyId: config.s3.accessKeyId, + secretAccessKey: config.s3.secretAccessKey, + }, + }); + + return new S3AssetStore(s3Client, config.s3.bucket); + } + + return new LocalFileSystemAssetStore(ROOT_PATH); +} + +const defaultAssetStore = createDefaultAssetStore(); + +export { LocalFileSystemAssetStore, S3AssetStore }; + +/** + * Example usage of S3AssetStore: + * + * import { S3Client } from "@aws-sdk/client-s3"; + * import { S3AssetStore } from "@karakeep/shared/assetdb"; + * + * const s3Client = new S3Client({ + * region: "us-east-1", + * credentials: { + * accessKeyId: "your-access-key", + * secretAccessKey: "your-secret-key" + * } + * }); + * + * const s3AssetStore = new S3AssetStore(s3Client, "your-bucket-name"); + * + * // Use s3AssetStore instead of the default file system store + * await s3AssetStore.saveAsset({ + * userId: "user123", + * assetId: "asset456", + * asset: buffer, + * metadata: { contentType: "image/jpeg", fileName: "photo.jpg" } + * }); + */ + +export async function saveAsset({ + userId, + assetId, + asset, + metadata, + quotaApproved, +}: { + userId: string; + assetId: string; + asset: Buffer; + metadata: z.infer; + quotaApproved: QuotaApproved; +}) { + // Verify the quota approval is for the correct user and size + if (quotaApproved.userId !== userId) { + throw new Error("Quota approval is for a different user"); + } + if (quotaApproved.approvedSize < asset.byteLength) { + throw new Error("Asset size exceeds approved quota"); + } + + return defaultAssetStore.saveAsset({ userId, assetId, asset, metadata }); +} + +export async function saveAssetFromFile({ + userId, + assetId, + assetPath, + metadata, + quotaApproved, +}: { + userId: string; + assetId: string; + assetPath: string; + metadata: z.infer; + quotaApproved: QuotaApproved; +}) { + // Verify the quota approval is for the correct user + if (quotaApproved.userId !== userId) { + throw new Error("Quota approval is for a different user"); + } + + // For file-based saves, we'll verify the file size matches the approved size + // when the underlying store implementation reads the file + + return defaultAssetStore.saveAssetFromFile({ + userId, + assetId, + assetPath, + metadata, + }); +} + +export async function readAsset({ + userId, + assetId, +}: { + userId: string; + assetId: string; +}) { + return defaultAssetStore.readAsset({ userId, assetId }); +} + +export async function createAssetReadStream({ + userId, + assetId, + start, + end, +}: { + userId: string; + assetId: string; + start?: number; + end?: number; +}) { + return defaultAssetStore.createAssetReadStream({ + userId, + assetId, + start, + end, + }); +} + +export async function readAssetMetadata({ + userId, + assetId, +}: { + userId: string; + assetId: string; +}) { + return defaultAssetStore.readAssetMetadata({ userId, assetId }); +} + +export async function getAssetSize({ + userId, + assetId, +}: { + userId: string; + assetId: string; +}) { + return defaultAssetStore.getAssetSize({ userId, assetId }); +} + +/** + * Deletes the passed in asset if it exists and ignores any errors + * @param userId the id of the user the asset belongs to + * @param assetId the id of the asset to delete + */ +export async function silentDeleteAsset( + userId: string, + assetId: string | undefined, +) { + if (assetId) { + await deleteAsset({ userId, assetId }).catch(() => ({})); + } +} + +export async function deleteAsset({ + userId, + assetId, +}: { + userId: string; + assetId: string; +}) { + return defaultAssetStore.deleteAsset({ userId, assetId }); +} + +export async function deleteUserAssets({ userId }: { userId: string }) { + return defaultAssetStore.deleteUserAssets({ userId }); +} + +export async function* getAllAssets() { + yield* defaultAssetStore.getAllAssets(); +} diff --git a/packages/shared/package.json b/packages/shared/package.json index a0f5608e..93739354 100644 --- a/packages/shared/package.json +++ b/packages/shared/package.json @@ -5,6 +5,8 @@ "private": true, "type": "module", "dependencies": { + "@aws-sdk/client-s3": "^3.842.0", + "glob": "^11.0.0", "html-to-text": "^9.0.5", "js-tiktoken": "^1.0.20", "nodemailer": "^7.0.4", diff --git a/packages/trpc/models/assets.ts b/packages/trpc/models/assets.ts index 63342896..f97cfffb 100644 --- a/packages/trpc/models/assets.ts +++ b/packages/trpc/models/assets.ts @@ -3,7 +3,7 @@ import { and, desc, eq, sql } from "drizzle-orm"; import { z } from "zod"; import { assets } from "@karakeep/db/schema"; -import { deleteAsset } from "@karakeep/shared-server/assetdb"; +import { deleteAsset } from "@karakeep/shared/assetdb"; import serverConfig from "@karakeep/shared/config"; import { createSignedToken } from "@karakeep/shared/signedTokens"; import { zAssetSignedTokenSchema } from "@karakeep/shared/types/assets"; diff --git a/packages/trpc/models/backups.ts b/packages/trpc/models/backups.ts index 2fd17301..c7ab99ba 100644 --- a/packages/trpc/models/backups.ts +++ b/packages/trpc/models/backups.ts @@ -4,7 +4,7 @@ import { z } from "zod"; import { assets, backupsTable } from "@karakeep/db/schema"; import { BackupQueue } from "@karakeep/shared-server"; -import { deleteAsset } from "@karakeep/shared-server/assetdb"; +import { deleteAsset } from "@karakeep/shared/assetdb"; import { zBackupSchema } from "@karakeep/shared/types/backups"; import { AuthedContext } from ".."; diff --git a/packages/trpc/models/bookmarks.ts b/packages/trpc/models/bookmarks.ts index e0669787..c8cd1f00 100644 --- a/packages/trpc/models/bookmarks.ts +++ b/packages/trpc/models/bookmarks.ts @@ -30,7 +30,7 @@ import { tagsOnBookmarks, } from "@karakeep/db/schema"; import { SearchIndexingQueue, triggerWebhook } from "@karakeep/shared-server"; -import { deleteAsset, readAsset } from "@karakeep/shared-server/assetdb"; +import { deleteAsset, readAsset } from "@karakeep/shared/assetdb"; import { getAlignedExpiry } from "@karakeep/shared/signedTokens"; import { BookmarkTypes, diff --git a/packages/trpc/models/users.ts b/packages/trpc/models/users.ts index c799f390..3340956a 100644 --- a/packages/trpc/models/users.ts +++ b/packages/trpc/models/users.ts @@ -18,7 +18,7 @@ import { users, verificationTokens, } from "@karakeep/db/schema"; -import { deleteAsset, deleteUserAssets } from "@karakeep/shared-server/assetdb"; +import { deleteAsset, deleteUserAssets } from "@karakeep/shared/assetdb"; import serverConfig from "@karakeep/shared/config"; import { zResetPasswordSchema, diff --git a/packages/trpc/routers/bookmarks.ts b/packages/trpc/routers/bookmarks.ts index db29bf02..782566cf 100644 --- a/packages/trpc/routers/bookmarks.ts +++ b/packages/trpc/routers/bookmarks.ts @@ -27,7 +27,7 @@ import { triggerSearchReindex, triggerWebhook, } from "@karakeep/shared-server"; -import { SUPPORTED_BOOKMARK_ASSET_TYPES } from "@karakeep/shared-server/assetdb"; +import { SUPPORTED_BOOKMARK_ASSET_TYPES } from "@karakeep/shared/assetdb"; import serverConfig from "@karakeep/shared/config"; import { InferenceClientFactory } from "@karakeep/shared/inference"; import { buildSummaryPrompt } from "@karakeep/shared/prompts.server"; -- cgit v1.2.3-70-g09d2