diff options
| author | Mohamed Bassem <me@mbassem.com> | 2026-02-09 01:22:30 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2026-02-09 01:22:30 +0000 |
| commit | a04d3c35fc9082e529a713605a038d236bb072c7 (patch) | |
| tree | 3e69735f9762c1739402afe30c26961b7d8e598a /packages/shared-server | |
| parent | 298a4277da7376ee6e7471f180c0030f35db49e4 (diff) | |
| download | karakeep-a04d3c35fc9082e529a713605a038d236bb072c7.tar.zst | |
refactor: move assetdb to shared-server
Diffstat (limited to 'packages/shared-server')
| -rw-r--r-- | packages/shared-server/assetdb.ts | 773 | ||||
| -rw-r--r-- | packages/shared-server/package.json | 8 |
2 files changed, 779 insertions, 2 deletions
diff --git a/packages/shared-server/assetdb.ts b/packages/shared-server/assetdb.ts new file mode 100644 index 00000000..bb6bb75e --- /dev/null +++ b/packages/shared-server/assetdb.ts @@ -0,0 +1,773 @@ +import * as fs from "fs"; +import * as path from "path"; +import { Readable } from "stream"; +import { + _Object, + DeleteObjectCommand, + DeleteObjectsCommand, + GetObjectCommand, + HeadObjectCommand, + ListObjectsV2Command, + PutObjectCommand, + S3Client, +} from "@aws-sdk/client-s3"; +import glob from "glob"; +import { z } from "zod"; + +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; +import { QuotaApproved } from "@karakeep/shared/storageQuota"; + +const ROOT_PATH = serverConfig.assetsDir; + +export const enum ASSET_TYPES { + IMAGE_GIF = "image/gif", + IMAGE_JPEG = "image/jpeg", + IMAGE_PNG = "image/png", + IMAGE_WEBP = "image/webp", + APPLICATION_PDF = "application/pdf", + APPLICATION_ZIP = "application/zip", + TEXT_HTML = "text/html", + + VIDEO_MP4 = "video/mp4", + VIDEO_WEBM = "video/webm", + VIDEO_MKV = "video/x-matroska", +} + +export const VIDEO_ASSET_TYPES: Set<string> = new Set<string>([ + ASSET_TYPES.VIDEO_MP4, + ASSET_TYPES.VIDEO_WEBM, + ASSET_TYPES.VIDEO_MKV, +]); + +export const IMAGE_ASSET_TYPES: Set<string> = new Set<string>([ + ASSET_TYPES.IMAGE_GIF, + ASSET_TYPES.IMAGE_JPEG, + ASSET_TYPES.IMAGE_PNG, + ASSET_TYPES.IMAGE_WEBP, +]); + +// The assets that we allow the users to upload +export const SUPPORTED_UPLOAD_ASSET_TYPES: Set<string> = new Set<string>([ + ...IMAGE_ASSET_TYPES, + ...VIDEO_ASSET_TYPES, + ASSET_TYPES.TEXT_HTML, + ASSET_TYPES.APPLICATION_PDF, +]); + +// The assets that we allow as a bookmark of type asset +export const SUPPORTED_BOOKMARK_ASSET_TYPES: Set<string> = new Set<string>([ + ...IMAGE_ASSET_TYPES, + ASSET_TYPES.APPLICATION_PDF, +]); + +// The assets that we support saving in the asset db +export const SUPPORTED_ASSET_TYPES: Set<string> = new Set<string>([ + ...SUPPORTED_UPLOAD_ASSET_TYPES, + ASSET_TYPES.TEXT_HTML, + ASSET_TYPES.VIDEO_MP4, + ASSET_TYPES.APPLICATION_ZIP, +]); + +export const zAssetMetadataSchema = z.object({ + contentType: z.string(), + fileName: z.string().nullish(), +}); + +export type AssetMetadata = z.infer<typeof zAssetMetadataSchema>; + +export interface AssetInfo { + userId: string; + assetId: string; + contentType: string; + fileName?: string | null; + size: number; +} + +export interface AssetStore { + saveAsset(params: { + userId: string; + assetId: string; + asset: Buffer; + metadata: AssetMetadata; + }): Promise<void>; + + saveAssetFromFile(params: { + userId: string; + assetId: string; + assetPath: string; + metadata: AssetMetadata; + }): Promise<void>; + + readAsset(params: { + userId: string; + assetId: string; + }): Promise<{ asset: Buffer; metadata: AssetMetadata }>; + + createAssetReadStream(params: { + userId: string; + assetId: string; + start?: number; + end?: number; + }): Promise<NodeJS.ReadableStream>; + + readAssetMetadata(params: { + userId: string; + assetId: string; + }): Promise<AssetMetadata>; + + getAssetSize(params: { userId: string; assetId: string }): Promise<number>; + + deleteAsset(params: { userId: string; assetId: string }): Promise<void>; + + deleteUserAssets(params: { userId: string }): Promise<void>; + + getAllAssets(): AsyncGenerator<AssetInfo>; +} + +export function newAssetId() { + return crypto.randomUUID(); +} + +class LocalFileSystemAssetStore implements AssetStore { + private rootPath: string; + + constructor(rootPath: string) { + this.rootPath = rootPath; + } + + private getAssetDir(userId: string, assetId: string) { + return path.join(this.rootPath, userId, assetId); + } + + private async isPathExists(filePath: string) { + return fs.promises + .access(filePath) + .then(() => true) + .catch(() => false); + } + + async saveAsset({ + userId, + assetId, + asset, + metadata, + }: { + userId: string; + assetId: string; + asset: Buffer; + metadata: AssetMetadata; + }) { + if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) { + throw new Error("Unsupported asset type"); + } + const assetDir = this.getAssetDir(userId, assetId); + await fs.promises.mkdir(assetDir, { recursive: true }); + + await Promise.all([ + fs.promises.writeFile( + path.join(assetDir, "asset.bin"), + Uint8Array.from(asset), + ), + fs.promises.writeFile( + path.join(assetDir, "metadata.json"), + JSON.stringify(metadata), + ), + ]); + } + + async saveAssetFromFile({ + userId, + assetId, + assetPath, + metadata, + }: { + userId: string; + assetId: string; + assetPath: string; + metadata: AssetMetadata; + }) { + if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) { + throw new Error("Unsupported asset type"); + } + const assetDir = this.getAssetDir(userId, assetId); + await fs.promises.mkdir(assetDir, { recursive: true }); + + await Promise.all([ + fs.promises.copyFile(assetPath, path.join(assetDir, "asset.bin")), + fs.promises.writeFile( + path.join(assetDir, "metadata.json"), + JSON.stringify(metadata), + ), + ]); + await fs.promises.rm(assetPath); + } + + async readAsset({ userId, assetId }: { userId: string; assetId: string }) { + const assetDir = this.getAssetDir(userId, assetId); + + const [asset, metadataStr] = await Promise.all([ + fs.promises.readFile(path.join(assetDir, "asset.bin")), + fs.promises.readFile(path.join(assetDir, "metadata.json"), { + encoding: "utf8", + }), + ]); + + const metadata = zAssetMetadataSchema.parse(JSON.parse(metadataStr)); + return { asset, metadata }; + } + + async createAssetReadStream({ + userId, + assetId, + start, + end, + }: { + userId: string; + assetId: string; + start?: number; + end?: number; + }) { + const assetDir = this.getAssetDir(userId, assetId); + const assetPath = path.join(assetDir, "asset.bin"); + if (!(await this.isPathExists(assetPath))) { + throw new Error(`Asset ${assetId} not found`); + } + + return fs.createReadStream(path.join(assetDir, "asset.bin"), { + start, + end, + }); + } + + async readAssetMetadata({ + userId, + assetId, + }: { + userId: string; + assetId: string; + }) { + const assetDir = this.getAssetDir(userId, assetId); + + const metadataStr = await fs.promises.readFile( + path.join(assetDir, "metadata.json"), + { + encoding: "utf8", + }, + ); + + return zAssetMetadataSchema.parse(JSON.parse(metadataStr)); + } + + async getAssetSize({ userId, assetId }: { userId: string; assetId: string }) { + const assetDir = this.getAssetDir(userId, assetId); + const stat = await fs.promises.stat(path.join(assetDir, "asset.bin")); + return stat.size; + } + + async deleteAsset({ userId, assetId }: { userId: string; assetId: string }) { + const assetDir = this.getAssetDir(userId, assetId); + if (!(await this.isPathExists(assetDir))) { + return; + } + await fs.promises.rm(assetDir, { recursive: true }); + } + + async deleteUserAssets({ userId }: { userId: string }) { + const userDir = path.join(this.rootPath, userId); + const dirExists = await this.isPathExists(userDir); + if (!dirExists) { + return; + } + await fs.promises.rm(userDir, { recursive: true }); + } + + async *getAllAssets() { + const files = await new Promise<string[]>((resolve, reject) => { + glob( + "*/*/asset.bin", + { + cwd: this.rootPath, + nodir: true, + }, + (err, matches) => { + if (err) { + reject(err); + return; + } + resolve(matches); + }, + ); + }); + + for (const file of files) { + const [userId, assetId] = file.split("/").slice(0, 2); + const [size, metadata] = await Promise.all([ + this.getAssetSize({ userId, assetId }), + this.readAssetMetadata({ userId, assetId }), + ]); + + yield { + userId, + assetId, + ...metadata, + size, + }; + } + } +} + +class S3AssetStore implements AssetStore { + private s3Client: S3Client; + private bucketName: string; + + constructor(s3Client: S3Client, bucketName: string) { + this.s3Client = s3Client; + this.bucketName = bucketName; + } + + private getAssetKey(userId: string, assetId: string) { + return `${userId}/${assetId}`; + } + + private metadataToS3Metadata( + metadata: AssetMetadata, + ): Record<string, string> { + return { + ...(metadata.fileName + ? { "x-amz-meta-file-name": metadata.fileName } + : {}), + "x-amz-meta-content-type": metadata.contentType, + }; + } + + private s3MetadataToMetadata( + s3Metadata: Record<string, string> | undefined, + ): AssetMetadata { + if (!s3Metadata) { + throw new Error("No metadata found in S3 object"); + } + + return { + contentType: s3Metadata["x-amz-meta-content-type"] || "", + fileName: s3Metadata["x-amz-meta-file-name"] ?? null, + }; + } + + async saveAsset({ + userId, + assetId, + asset, + metadata, + }: { + userId: string; + assetId: string; + asset: Buffer; + metadata: AssetMetadata; + }) { + if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) { + throw new Error("Unsupported asset type"); + } + + await this.s3Client.send( + new PutObjectCommand({ + Bucket: this.bucketName, + Key: this.getAssetKey(userId, assetId), + Body: asset, + ContentType: metadata.contentType, + Metadata: this.metadataToS3Metadata(metadata), + }), + ); + } + + async saveAssetFromFile({ + userId, + assetId, + assetPath, + metadata, + }: { + userId: string; + assetId: string; + assetPath: string; + metadata: AssetMetadata; + }) { + if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) { + throw new Error("Unsupported asset type"); + } + + const asset = fs.createReadStream(assetPath); + await this.s3Client.send( + new PutObjectCommand({ + Bucket: this.bucketName, + Key: this.getAssetKey(userId, assetId), + Body: asset, + ContentType: metadata.contentType, + Metadata: this.metadataToS3Metadata(metadata), + }), + ); + await fs.promises.rm(assetPath); + } + + async readAsset({ userId, assetId }: { userId: string; assetId: string }) { + const response = await this.s3Client.send( + new GetObjectCommand({ + Bucket: this.bucketName, + Key: this.getAssetKey(userId, assetId), + }), + ); + + if (!response.Body) { + throw new Error("Asset not found"); + } + + const assetBuffer = await this.streamToBuffer(response.Body as Readable); + const metadata = this.s3MetadataToMetadata(response.Metadata); + + return { asset: assetBuffer, metadata }; + } + + async createAssetReadStream({ + userId, + assetId, + start, + end, + }: { + userId: string; + assetId: string; + start?: number; + end?: number; + }) { + const range = + start !== undefined && end !== undefined + ? `bytes=${start}-${end}` + : undefined; + + const command = new GetObjectCommand({ + Bucket: this.bucketName, + Key: this.getAssetKey(userId, assetId), + Range: range, + }); + + const response = await this.s3Client.send(command); + if (!response.Body) { + throw new Error("Asset not found"); + } + return response.Body as NodeJS.ReadableStream; + } + + async readAssetMetadata({ + userId, + assetId, + }: { + userId: string; + assetId: string; + }) { + const response = await this.s3Client.send( + new HeadObjectCommand({ + Bucket: this.bucketName, + Key: this.getAssetKey(userId, assetId), + }), + ); + + return this.s3MetadataToMetadata(response.Metadata); + } + + async getAssetSize({ userId, assetId }: { userId: string; assetId: string }) { + const response = await this.s3Client.send( + new HeadObjectCommand({ + Bucket: this.bucketName, + Key: this.getAssetKey(userId, assetId), + }), + ); + + return response.ContentLength || 0; + } + + async deleteAsset({ userId, assetId }: { userId: string; assetId: string }) { + await this.s3Client.send( + new DeleteObjectCommand({ + Bucket: this.bucketName, + Key: this.getAssetKey(userId, assetId), + }), + ); + } + + async deleteUserAssets({ userId }: { userId: string }) { + let continuationToken: string | undefined; + + do { + const listResponse = await this.s3Client.send( + new ListObjectsV2Command({ + Bucket: this.bucketName, + Prefix: `${userId}/`, + ContinuationToken: continuationToken, + }), + ); + + if (listResponse.Contents && listResponse.Contents.length > 0) { + await this.s3Client.send( + new DeleteObjectsCommand({ + Bucket: this.bucketName, + Delete: { + Objects: listResponse.Contents.map((obj) => ({ + Key: obj.Key!, + })), + }, + }), + ); + } + + continuationToken = listResponse.NextContinuationToken; + } while (continuationToken); + } + + async *getAllAssets() { + let continuationToken: string | undefined; + + do { + const listResponse = await this.s3Client.send( + new ListObjectsV2Command({ + Bucket: this.bucketName, + ContinuationToken: continuationToken, + }), + ); + + if (listResponse.Contents) { + for (const obj of listResponse.Contents) { + if (!obj.Key) continue; + + const pathParts = obj.Key.split("/"); + if (pathParts.length === 2) { + const userId = pathParts[0]; + const assetId = pathParts[1]; + + try { + const headResponse = await this.s3Client.send( + new HeadObjectCommand({ + Bucket: this.bucketName, + Key: obj.Key, + }), + ); + + const metadata = this.s3MetadataToMetadata(headResponse.Metadata); + const size = headResponse.ContentLength || 0; + + yield { + userId, + assetId, + ...metadata, + size, + }; + } catch (error) { + logger.warn(`Failed to read asset ${userId}/${assetId}:`, error); + } + } + } + } + + continuationToken = listResponse.NextContinuationToken; + } while (continuationToken); + } + + private async streamToBuffer(stream: Readable): Promise<Buffer> { + const chunks: Buffer[] = []; + for await (const chunk of stream) { + chunks.push(chunk); + } + return Buffer.concat(chunks); + } +} + +function createDefaultAssetStore(): AssetStore { + const config = serverConfig.assetStore; + + if (config.type === "s3") { + if (!config.s3.bucket) { + throw new Error( + "ASSET_STORE_S3_BUCKET is required when using S3 asset store", + ); + } + if (!config.s3.accessKeyId || !config.s3.secretAccessKey) { + throw new Error( + "ASSET_STORE_S3_ACCESS_KEY_ID and ASSET_STORE_S3_SECRET_ACCESS_KEY are required when using S3 asset store", + ); + } + + const s3Client = new S3Client({ + region: config.s3.region, + endpoint: config.s3.endpoint, + forcePathStyle: config.s3.forcePathStyle, + credentials: { + accessKeyId: config.s3.accessKeyId, + secretAccessKey: config.s3.secretAccessKey, + }, + }); + + return new S3AssetStore(s3Client, config.s3.bucket); + } + + return new LocalFileSystemAssetStore(ROOT_PATH); +} + +const defaultAssetStore = createDefaultAssetStore(); + +export { LocalFileSystemAssetStore, S3AssetStore }; + +/** + * Example usage of S3AssetStore: + * + * import { S3Client } from "@aws-sdk/client-s3"; + * import { S3AssetStore } from "@karakeep/shared-server/assetdb"; + * + * const s3Client = new S3Client({ + * region: "us-east-1", + * credentials: { + * accessKeyId: "your-access-key", + * secretAccessKey: "your-secret-key" + * } + * }); + * + * const s3AssetStore = new S3AssetStore(s3Client, "your-bucket-name"); + * + * // Use s3AssetStore instead of the default file system store + * await s3AssetStore.saveAsset({ + * userId: "user123", + * assetId: "asset456", + * asset: buffer, + * metadata: { contentType: "image/jpeg", fileName: "photo.jpg" } + * }); + */ + +export async function saveAsset({ + userId, + assetId, + asset, + metadata, + quotaApproved, +}: { + userId: string; + assetId: string; + asset: Buffer; + metadata: z.infer<typeof zAssetMetadataSchema>; + quotaApproved: QuotaApproved; +}) { + // Verify the quota approval is for the correct user and size + if (quotaApproved.userId !== userId) { + throw new Error("Quota approval is for a different user"); + } + if (quotaApproved.approvedSize < asset.byteLength) { + throw new Error("Asset size exceeds approved quota"); + } + + return defaultAssetStore.saveAsset({ userId, assetId, asset, metadata }); +} + +export async function saveAssetFromFile({ + userId, + assetId, + assetPath, + metadata, + quotaApproved, +}: { + userId: string; + assetId: string; + assetPath: string; + metadata: z.infer<typeof zAssetMetadataSchema>; + quotaApproved: QuotaApproved; +}) { + // Verify the quota approval is for the correct user + if (quotaApproved.userId !== userId) { + throw new Error("Quota approval is for a different user"); + } + + // For file-based saves, we'll verify the file size matches the approved size + // when the underlying store implementation reads the file + + return defaultAssetStore.saveAssetFromFile({ + userId, + assetId, + assetPath, + metadata, + }); +} + +export async function readAsset({ + userId, + assetId, +}: { + userId: string; + assetId: string; +}) { + return defaultAssetStore.readAsset({ userId, assetId }); +} + +export async function createAssetReadStream({ + userId, + assetId, + start, + end, +}: { + userId: string; + assetId: string; + start?: number; + end?: number; +}) { + return defaultAssetStore.createAssetReadStream({ + userId, + assetId, + start, + end, + }); +} + +export async function readAssetMetadata({ + userId, + assetId, +}: { + userId: string; + assetId: string; +}) { + return defaultAssetStore.readAssetMetadata({ userId, assetId }); +} + +export async function getAssetSize({ + userId, + assetId, +}: { + userId: string; + assetId: string; +}) { + return defaultAssetStore.getAssetSize({ userId, assetId }); +} + +/** + * Deletes the passed in asset if it exists and ignores any errors + * @param userId the id of the user the asset belongs to + * @param assetId the id of the asset to delete + */ +export async function silentDeleteAsset( + userId: string, + assetId: string | undefined, +) { + if (assetId) { + await deleteAsset({ userId, assetId }).catch(() => ({})); + } +} + +export async function deleteAsset({ + userId, + assetId, +}: { + userId: string; + assetId: string; +}) { + return defaultAssetStore.deleteAsset({ userId, assetId }); +} + +export async function deleteUserAssets({ userId }: { userId: string }) { + return defaultAssetStore.deleteUserAssets({ userId }); +} + +export async function* getAllAssets() { + yield* defaultAssetStore.getAllAssets(); +} diff --git a/packages/shared-server/package.json b/packages/shared-server/package.json index 357248b4..0afc2bf2 100644 --- a/packages/shared-server/package.json +++ b/packages/shared-server/package.json @@ -5,6 +5,7 @@ "private": true, "type": "module", "dependencies": { + "@aws-sdk/client-s3": "^3.842.0", "@karakeep/db": "workspace:^0.1.0", "@karakeep/plugins": "workspace:^0.1.0", "@karakeep/shared": "workspace:^0.1.0", @@ -13,7 +14,9 @@ "@opentelemetry/resources": "^2.2.0", "@opentelemetry/sdk-trace-base": "^2.2.0", "@opentelemetry/sdk-trace-node": "^2.2.0", - "@opentelemetry/semantic-conventions": "^1.38.0" + "@opentelemetry/semantic-conventions": "^1.38.0", + "glob": "^11.0.0", + "zod": "^3.24.2" }, "devDependencies": { "@karakeep/prettier-config": "workspace:^0.1.0", @@ -29,7 +32,8 @@ }, "main": "index.ts", "exports": { - ".": "./index.ts" + ".": "./index.ts", + "./assetdb": "./assetdb.ts" }, "prettier": "@karakeep/prettier-config" } |
