aboutsummaryrefslogtreecommitdiffstats
path: root/packages/shared/assetdb.ts
diff options
context:
space:
mode:
author: Mohamed Bassem <me@mbassem.com>, 2025-07-04 23:58:42 +0100
committer: GitHub <noreply@github.com>, 2025-07-04 23:58:42 +0100
commit: d66b3b8619e8fff36c0243f7cc67eef864c5009b (patch)
tree: 6f555ad31cfc44aebffab1db3edb6134c10878d0 /packages/shared/assetdb.ts
parent: 53b6b3c24d9669ba240c1f9c5fb58672b6cf8666 (diff)
download: karakeep-d66b3b8619e8fff36c0243f7cc67eef864c5009b.tar.zst
feat: Add support for S3 as an asset storage layer (#1703)
* feat: Add support for S3 as an asset storage layer. Fixes #305 * some minor fixes * use bulk deletion api * stream the file to s3
Diffstat (limited to 'packages/shared/assetdb.ts')
-rw-r--r-- packages/shared/assetdb.ts 670
1 files changed, 576 insertions, 94 deletions
diff --git a/packages/shared/assetdb.ts b/packages/shared/assetdb.ts
index 3ac92067..77050406 100644
--- a/packages/shared/assetdb.ts
+++ b/packages/shared/assetdb.ts
@@ -1,5 +1,16 @@
import * as fs from "fs";
import * as path from "path";
+import { Readable } from "stream";
+import {
+ _Object,
+ DeleteObjectCommand,
+ DeleteObjectsCommand,
+ GetObjectCommand,
+ HeadObjectCommand,
+ ListObjectsV2Command,
+ PutObjectCommand,
+ S3Client,
+} from "@aws-sdk/client-s3";
import { Glob } from "glob";
import { z } from "zod";
@@ -43,19 +54,563 @@ export const SUPPORTED_ASSET_TYPES: Set<string> = new Set<string>([
ASSET_TYPES.VIDEO_MP4,
]);
-function getAssetDir(userId: string, assetId: string) {
- return path.join(ROOT_PATH, userId, assetId);
-}
-
+// Schema of the metadata record stored with every asset: a mandatory content
+// type and an optional (possibly null) original file name.
export const zAssetMetadataSchema = z.object({
contentType: z.string(),
fileName: z.string().nullish(),
});
+/** Static type inferred from `zAssetMetadataSchema`. */
+export type AssetMetadata = z.infer<typeof zAssetMetadataSchema>;
+
+/**
+ * Summary record for one stored asset: its owner, its id, the metadata fields
+ * (`contentType`, `fileName`) and the size of the stored payload.
+ * This is the element type yielded by `AssetStore.getAllAssets()`.
+ */
+export interface AssetInfo {
+ userId: string;
+ assetId: string;
+ contentType: string;
+ fileName?: string | null;
+ // Both stores in this file report the payload size here
+ // (`stat.size` on disk, `ContentLength` on S3).
+ size: number;
+}
+
+/**
+ * Storage backend contract for assets. An asset is addressed by the pair
+ * (`userId`, `assetId`) and consists of a binary payload plus `AssetMetadata`.
+ * Implemented in this file by `LocalFileSystemAssetStore` and `S3AssetStore`.
+ */
+export interface AssetStore {
+ /**
+ * Stores an in-memory payload together with its metadata.
+ * Both implementations in this file throw "Unsupported asset type" when
+ * `metadata.contentType` is not in `SUPPORTED_ASSET_TYPES`.
+ */
+ saveAsset(params: {
+ userId: string;
+ assetId: string;
+ asset: Buffer;
+ metadata: AssetMetadata;
+ }): Promise<void>;
+
+ /**
+ * Stores the file at `assetPath` as the asset payload.
+ * Both implementations in this file remove `assetPath` once the asset has
+ * been stored, and apply the same content-type check as `saveAsset`.
+ */
+ saveAssetFromFile(params: {
+ userId: string;
+ assetId: string;
+ assetPath: string;
+ metadata: AssetMetadata;
+ }): Promise<void>;
+
+ /** Loads the whole payload into a Buffer and returns it with its metadata. */
+ readAsset(params: {
+ userId: string;
+ assetId: string;
+ }): Promise<{ asset: Buffer; metadata: AssetMetadata }>;
+
+ /**
+ * Opens a readable stream over the payload.
+ * `start` / `end` are optional byte offsets selecting a sub-range.
+ */
+ createAssetReadStream(params: {
+ userId: string;
+ assetId: string;
+ start?: number;
+ end?: number;
+ }): Promise<NodeJS.ReadableStream>;
+
+ /** Returns only the metadata of an asset, without reading the payload. */
+ readAssetMetadata(params: {
+ userId: string;
+ assetId: string;
+ }): Promise<AssetMetadata>;
+
+ /** Returns the size of the stored payload. */
+ getAssetSize(params: { userId: string; assetId: string }): Promise<number>;
+
+ /** Removes a single asset. */
+ deleteAsset(params: { userId: string; assetId: string }): Promise<void>;
+
+ /** Removes every asset belonging to `userId`. */
+ deleteUserAssets(params: { userId: string }): Promise<void>;
+
+ /** Iterates over all stored assets, yielding one `AssetInfo` per asset. */
+ getAllAssets(): AsyncGenerator<AssetInfo>;
+}
+
/** Generates a fresh random UUID to be used as an asset id. */
export function newAssetId() {
  const assetId = crypto.randomUUID();
  return assetId;
}
+/**
+ * Asset store backed by the local file system.
+ *
+ * Every asset lives in its own directory `<rootPath>/<userId>/<assetId>/`
+ * holding two files: `asset.bin` (the payload) and `metadata.json` (the
+ * JSON-serialized `AssetMetadata`).
+ */
+class LocalFileSystemAssetStore implements AssetStore {
+  private readonly rootPath: string;
+
+  constructor(rootPath: string) {
+    this.rootPath = rootPath;
+  }
+
+  /** Directory that holds the files of one asset. */
+  private getAssetDir(userId: string, assetId: string) {
+    return path.join(this.rootPath, userId, assetId);
+  }
+
+  /** Resolves to true when `filePath` can be accessed, false otherwise. */
+  private async isPathExists(filePath: string) {
+    try {
+      await fs.promises.access(filePath);
+      return true;
+    } catch {
+      return false;
+    }
+  }
+
+  /**
+   * Writes the payload and its metadata into the asset directory, creating the
+   * directory when needed.
+   *
+   * @throws Error("Unsupported asset type") when the content type is not in
+   *   `SUPPORTED_ASSET_TYPES`.
+   */
+  async saveAsset({
+    userId,
+    assetId,
+    asset,
+    metadata,
+  }: {
+    userId: string;
+    assetId: string;
+    asset: Buffer;
+    metadata: AssetMetadata;
+  }) {
+    if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) {
+      throw new Error("Unsupported asset type");
+    }
+    const assetDir = this.getAssetDir(userId, assetId);
+    await fs.promises.mkdir(assetDir, { recursive: true });
+
+    await Promise.all([
+      fs.promises.writeFile(
+        path.join(assetDir, "asset.bin"),
+        Uint8Array.from(asset),
+      ),
+      fs.promises.writeFile(
+        path.join(assetDir, "metadata.json"),
+        JSON.stringify(metadata),
+      ),
+    ]);
+  }
+
+  /**
+   * Stores the file at `assetPath` as the asset payload and removes the source
+   * file afterwards.
+   *
+   * @throws Error("Unsupported asset type") when the content type is not in
+   *   `SUPPORTED_ASSET_TYPES`.
+   */
+  async saveAssetFromFile({
+    userId,
+    assetId,
+    assetPath,
+    metadata,
+  }: {
+    userId: string;
+    assetId: string;
+    assetPath: string;
+    metadata: AssetMetadata;
+  }) {
+    if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) {
+      throw new Error("Unsupported asset type");
+    }
+    const assetDir = this.getAssetDir(userId, assetId);
+    await fs.promises.mkdir(assetDir, { recursive: true });
+
+    await Promise.all([
+      // We'll have to copy first then delete the original file as inside the
+      // docker container we can't move file between mounts.
+      fs.promises.copyFile(assetPath, path.join(assetDir, "asset.bin")),
+      fs.promises.writeFile(
+        path.join(assetDir, "metadata.json"),
+        JSON.stringify(metadata),
+      ),
+    ]);
+    await fs.promises.rm(assetPath);
+  }
+
+  /** Reads the whole payload and the validated metadata of an asset. */
+  async readAsset({ userId, assetId }: { userId: string; assetId: string }) {
+    const assetDir = this.getAssetDir(userId, assetId);
+
+    const [asset, metadataStr] = await Promise.all([
+      fs.promises.readFile(path.join(assetDir, "asset.bin")),
+      fs.promises.readFile(path.join(assetDir, "metadata.json"), {
+        encoding: "utf8",
+      }),
+    ]);
+
+    const metadata = zAssetMetadataSchema.parse(JSON.parse(metadataStr));
+    return { asset, metadata };
+  }
+
+  /**
+   * Opens a read stream over the payload; `start` / `end` are forwarded
+   * unchanged to `fs.createReadStream`.
+   *
+   * @throws Error when the payload file cannot be accessed.
+   */
+  async createAssetReadStream({
+    userId,
+    assetId,
+    start,
+    end,
+  }: {
+    userId: string;
+    assetId: string;
+    start?: number;
+    end?: number;
+  }) {
+    const assetPath = path.join(this.getAssetDir(userId, assetId), "asset.bin");
+    // Fail here with a clear message rather than later as a stream 'error' event.
+    if (!(await this.isPathExists(assetPath))) {
+      throw new Error(`Asset ${assetId} not found`);
+    }
+
+    return fs.createReadStream(assetPath, { start, end });
+  }
+
+  /** Reads and validates `metadata.json` of an asset. */
+  async readAssetMetadata({
+    userId,
+    assetId,
+  }: {
+    userId: string;
+    assetId: string;
+  }) {
+    const assetDir = this.getAssetDir(userId, assetId);
+
+    const metadataStr = await fs.promises.readFile(
+      path.join(assetDir, "metadata.json"),
+      {
+        encoding: "utf8",
+      },
+    );
+
+    return zAssetMetadataSchema.parse(JSON.parse(metadataStr));
+  }
+
+  /** Size in bytes of the payload file. */
+  async getAssetSize({ userId, assetId }: { userId: string; assetId: string }) {
+    const assetDir = this.getAssetDir(userId, assetId);
+    const stat = await fs.promises.stat(path.join(assetDir, "asset.bin"));
+    return stat.size;
+  }
+
+  /** Removes the asset directory; a missing directory is not an error. */
+  async deleteAsset({ userId, assetId }: { userId: string; assetId: string }) {
+    // `force` ignores a missing path atomically, so there is no window between
+    // an existence check and the removal in which a concurrent delete could
+    // make this call fail.
+    await fs.promises.rm(this.getAssetDir(userId, assetId), {
+      recursive: true,
+      force: true,
+    });
+  }
+
+  /** Removes the whole directory of a user; a missing directory is not an error. */
+  async deleteUserAssets({ userId }: { userId: string }) {
+    await fs.promises.rm(path.join(this.rootPath, userId), {
+      recursive: true,
+      force: true,
+    });
+  }
+
+  /**
+   * Walks `<rootPath>/<userId>/<assetId>/asset.bin` and yields one `AssetInfo`
+   * per payload file found.
+   */
+  async *getAllAssets() {
+    const g = new Glob(`/**/**/asset.bin`, {
+      maxDepth: 3,
+      root: this.rootPath,
+      cwd: this.rootPath,
+      absolute: false,
+    });
+    for await (const file of g) {
+      // Accept either separator so the relative path is split correctly
+      // regardless of which one the glob result uses.
+      const [userId, assetId] = file.split(/[\\/]/).slice(0, 2);
+      const [size, metadata] = await Promise.all([
+        this.getAssetSize({ userId, assetId }),
+        this.readAssetMetadata({ userId, assetId }),
+      ]);
+      yield {
+        userId,
+        assetId,
+        ...metadata,
+        size,
+      };
+    }
+  }
+}
+
+/**
+ * Asset store backed by an S3 bucket.
+ *
+ * Every asset is a single object stored under the key `<userId>/<assetId>`;
+ * its `AssetMetadata` is carried in the object's S3 metadata map.
+ */
+class S3AssetStore implements AssetStore {
+  private readonly s3Client: S3Client;
+  private readonly bucketName: string;
+
+  constructor(s3Client: S3Client, bucketName: string) {
+    this.s3Client = s3Client;
+    this.bucketName = bucketName;
+  }
+
+  /** Object key of an asset. */
+  private getAssetKey(userId: string, assetId: string) {
+    return `${userId}/${assetId}`;
+  }
+
+  /**
+   * Converts `AssetMetadata` to the string map sent as S3 object metadata.
+   * The file name entry is omitted when there is no file name.
+   *
+   * NOTE(review): the key names are kept as-is because `s3MetadataToMetadata`
+   * reads the same names back. Whether non-ASCII file names survive as S3
+   * metadata values has not been verified here — confirm before relying on it.
+   */
+  private metadataToS3Metadata(
+    metadata: AssetMetadata,
+  ): Record<string, string> {
+    return {
+      ...(metadata.fileName
+        ? { "x-amz-meta-file-name": metadata.fileName }
+        : {}),
+      "x-amz-meta-content-type": metadata.contentType,
+    };
+  }
+
+  /**
+   * Converts the S3 metadata map back to `AssetMetadata`.
+   *
+   * @throws Error when the response carried no metadata map at all.
+   */
+  private s3MetadataToMetadata(
+    s3Metadata: Record<string, string> | undefined,
+  ): AssetMetadata {
+    if (!s3Metadata) {
+      throw new Error("No metadata found in S3 object");
+    }
+
+    return {
+      contentType: s3Metadata["x-amz-meta-content-type"] || "",
+      fileName: s3Metadata["x-amz-meta-file-name"] ?? null,
+    };
+  }
+
+  /**
+   * Builds the HTTP `Range` header value for an optional byte window.
+   * Mirrors `fs.createReadStream` semantics: a missing `start` means offset 0,
+   * a missing `end` means "until the end of the object". Returns undefined
+   * when neither bound is given so the whole object is requested.
+   */
+  private buildRange(start?: number, end?: number): string | undefined {
+    if (start === undefined && end === undefined) {
+      return undefined;
+    }
+    return `bytes=${start ?? 0}-${end ?? ""}`;
+  }
+
+  /**
+   * Uploads an in-memory payload with its metadata.
+   *
+   * @throws Error("Unsupported asset type") when the content type is not in
+   *   `SUPPORTED_ASSET_TYPES`.
+   */
+  async saveAsset({
+    userId,
+    assetId,
+    asset,
+    metadata,
+  }: {
+    userId: string;
+    assetId: string;
+    asset: Buffer;
+    metadata: AssetMetadata;
+  }) {
+    if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) {
+      throw new Error("Unsupported asset type");
+    }
+
+    await this.s3Client.send(
+      new PutObjectCommand({
+        Bucket: this.bucketName,
+        Key: this.getAssetKey(userId, assetId),
+        Body: asset,
+        ContentType: metadata.contentType,
+        Metadata: this.metadataToS3Metadata(metadata),
+      }),
+    );
+  }
+
+  /**
+   * Streams the file at `assetPath` to S3 and removes the local file once the
+   * upload has succeeded.
+   *
+   * @throws Error("Unsupported asset type") when the content type is not in
+   *   `SUPPORTED_ASSET_TYPES`.
+   */
+  async saveAssetFromFile({
+    userId,
+    assetId,
+    assetPath,
+    metadata,
+  }: {
+    userId: string;
+    assetId: string;
+    assetPath: string;
+    metadata: AssetMetadata;
+  }) {
+    if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) {
+      throw new Error("Unsupported asset type");
+    }
+
+    const asset = fs.createReadStream(assetPath);
+    await this.s3Client.send(
+      new PutObjectCommand({
+        Bucket: this.bucketName,
+        Key: this.getAssetKey(userId, assetId),
+        Body: asset,
+        ContentType: metadata.contentType,
+        Metadata: this.metadataToS3Metadata(metadata),
+      }),
+    );
+    await fs.promises.rm(assetPath);
+  }
+
+  /**
+   * Downloads the whole object into memory and returns it with its metadata.
+   *
+   * @throws Error("Asset not found") when the response has no body.
+   */
+  async readAsset({ userId, assetId }: { userId: string; assetId: string }) {
+    const response = await this.s3Client.send(
+      new GetObjectCommand({
+        Bucket: this.bucketName,
+        Key: this.getAssetKey(userId, assetId),
+      }),
+    );
+
+    if (!response.Body) {
+      throw new Error("Asset not found");
+    }
+
+    const assetBuffer = await this.streamToBuffer(response.Body as Readable);
+    const metadata = this.s3MetadataToMetadata(response.Metadata);
+
+    return { asset: assetBuffer, metadata };
+  }
+
+  /**
+   * Opens a stream over the object, optionally limited to a byte window.
+   * A window with only one bound is honoured too (see `buildRange`), so the
+   * result matches what the file-system store returns for the same arguments.
+   *
+   * @throws Error("Asset not found") when the response has no body.
+   */
+  async createAssetReadStream({
+    userId,
+    assetId,
+    start,
+    end,
+  }: {
+    userId: string;
+    assetId: string;
+    start?: number;
+    end?: number;
+  }) {
+    const response = await this.s3Client.send(
+      new GetObjectCommand({
+        Bucket: this.bucketName,
+        Key: this.getAssetKey(userId, assetId),
+        Range: this.buildRange(start, end),
+      }),
+    );
+    if (!response.Body) {
+      throw new Error("Asset not found");
+    }
+    return response.Body as NodeJS.ReadableStream;
+  }
+
+  /** Fetches only the object's metadata (HEAD request). */
+  async readAssetMetadata({
+    userId,
+    assetId,
+  }: {
+    userId: string;
+    assetId: string;
+  }) {
+    const response = await this.s3Client.send(
+      new HeadObjectCommand({
+        Bucket: this.bucketName,
+        Key: this.getAssetKey(userId, assetId),
+      }),
+    );
+
+    return this.s3MetadataToMetadata(response.Metadata);
+  }
+
+  /** Returns the object's `ContentLength`, or 0 when the response has none. */
+  async getAssetSize({ userId, assetId }: { userId: string; assetId: string }) {
+    const response = await this.s3Client.send(
+      new HeadObjectCommand({
+        Bucket: this.bucketName,
+        Key: this.getAssetKey(userId, assetId),
+      }),
+    );
+
+    return response.ContentLength ?? 0;
+  }
+
+  /** Deletes the object of a single asset. */
+  async deleteAsset({ userId, assetId }: { userId: string; assetId: string }) {
+    await this.s3Client.send(
+      new DeleteObjectCommand({
+        Bucket: this.bucketName,
+        Key: this.getAssetKey(userId, assetId),
+      }),
+    );
+  }
+
+  /**
+   * Deletes every object under the `<userId>/` prefix, one listing page at a
+   * time, using the bulk deletion API.
+   *
+   * @throws Error when the bulk delete reports per-key failures, so that a
+   *   partially failed clean-up is not mistaken for success.
+   */
+  async deleteUserAssets({ userId }: { userId: string }) {
+    let continuationToken: string | undefined;
+
+    do {
+      const listResponse = await this.s3Client.send(
+        new ListObjectsV2Command({
+          Bucket: this.bucketName,
+          Prefix: `${userId}/`,
+          ContinuationToken: continuationToken,
+        }),
+      );
+
+      // Drop entries without a key instead of asserting that they have one.
+      const objects = (listResponse.Contents ?? []).flatMap((obj) =>
+        obj.Key ? [{ Key: obj.Key }] : [],
+      );
+
+      if (objects.length > 0) {
+        const deleteResponse = await this.s3Client.send(
+          new DeleteObjectsCommand({
+            Bucket: this.bucketName,
+            Delete: { Objects: objects },
+          }),
+        );
+
+        const failures = deleteResponse.Errors ?? [];
+        if (failures.length > 0) {
+          const first = failures[0];
+          throw new Error(
+            `Failed to delete ${failures.length} object(s) for user ${userId}` +
+              ` (first: ${first?.Key ?? "unknown key"}: ${first?.Message ?? first?.Code ?? "unknown error"})`,
+          );
+        }
+      }
+
+      continuationToken = listResponse.NextContinuationToken;
+    } while (continuationToken);
+  }
+
+  /**
+   * Lists the whole bucket and yields one `AssetInfo` for every key of the
+   * form `<userId>/<assetId>`. Objects whose HEAD request or metadata
+   * conversion fails are logged and skipped.
+   */
+  async *getAllAssets() {
+    let continuationToken: string | undefined;
+
+    do {
+      const listResponse = await this.s3Client.send(
+        new ListObjectsV2Command({
+          Bucket: this.bucketName,
+          ContinuationToken: continuationToken,
+        }),
+      );
+
+      for (const obj of listResponse.Contents ?? []) {
+        if (!obj.Key) continue;
+
+        // Only keys with exactly two segments are assets written by this store.
+        const pathParts = obj.Key.split("/");
+        if (pathParts.length !== 2) continue;
+
+        const userId = pathParts[0];
+        const assetId = pathParts[1];
+
+        try {
+          const headResponse = await this.s3Client.send(
+            new HeadObjectCommand({
+              Bucket: this.bucketName,
+              Key: obj.Key,
+            }),
+          );
+
+          const metadata = this.s3MetadataToMetadata(headResponse.Metadata);
+          const size = headResponse.ContentLength ?? 0;
+
+          yield {
+            userId,
+            assetId,
+            ...metadata,
+            size,
+          };
+        } catch (error) {
+          logger.warn(`Failed to read asset ${userId}/${assetId}:`, error);
+        }
+      }
+
+      continuationToken = listResponse.NextContinuationToken;
+    } while (continuationToken);
+  }
+
+  /** Drains a readable stream into a single Buffer. */
+  private async streamToBuffer(stream: Readable): Promise<Buffer> {
+    const chunks: Buffer[] = [];
+    for await (const chunk of stream) {
+      // A stream may emit strings or plain Uint8Arrays; normalise them so
+      // `Buffer.concat` always receives Buffers.
+      chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
+    }
+    return Buffer.concat(chunks);
+  }
+}
+
+/**
+ * Builds the asset store selected by `serverConfig.assetStore`: an
+ * `S3AssetStore` when the configured type is "s3", otherwise a
+ * `LocalFileSystemAssetStore` rooted at `ROOT_PATH`.
+ *
+ * @throws Error when the S3 store is selected but the bucket or the
+ *   credentials are missing from the configuration.
+ */
+function createDefaultAssetStore(): AssetStore {
+  const assetStoreConfig = serverConfig.assetStore;
+
+  // Anything other than "s3" falls back to the local file system.
+  if (assetStoreConfig.type !== "s3") {
+    return new LocalFileSystemAssetStore(ROOT_PATH);
+  }
+
+  const {
+    bucket,
+    accessKeyId,
+    secretAccessKey,
+    region,
+    endpoint,
+    forcePathStyle,
+  } = assetStoreConfig.s3;
+
+  if (!bucket) {
+    throw new Error(
+      "ASSET_STORE_S3_BUCKET is required when using S3 asset store",
+    );
+  }
+  if (!accessKeyId || !secretAccessKey) {
+    throw new Error(
+      "ASSET_STORE_S3_ACCESS_KEY_ID and ASSET_STORE_S3_SECRET_ACCESS_KEY are required when using S3 asset store",
+    );
+  }
+
+  const client = new S3Client({
+    region,
+    endpoint,
+    forcePathStyle,
+    credentials: { accessKeyId, secretAccessKey },
+  });
+
+  return new S3AssetStore(client, bucket);
+}
+
+// Store instance the exported helper functions below delegate to. It is built
+// once, when this module is loaded, from `serverConfig.assetStore`.
+const defaultAssetStore = createDefaultAssetStore();
+
+// Export the concrete implementations so a store can also be constructed directly.
+export { LocalFileSystemAssetStore, S3AssetStore };
+
+/**
+ * Example usage of S3AssetStore:
+ *
+ * import { S3Client } from "@aws-sdk/client-s3";
+ * import { S3AssetStore } from "@karakeep/shared/assetdb";
+ *
+ * const s3Client = new S3Client({
+ * region: "us-east-1",
+ * credentials: {
+ * accessKeyId: "your-access-key",
+ * secretAccessKey: "your-secret-key"
+ * }
+ * });
+ *
+ * const s3AssetStore = new S3AssetStore(s3Client, "your-bucket-name");
+ *
+ * // Use s3AssetStore instead of the default file system store
+ * await s3AssetStore.saveAsset({
+ * userId: "user123",
+ * assetId: "asset456",
+ * asset: buffer,
+ * metadata: { contentType: "image/jpeg", fileName: "photo.jpg" }
+ * });
+ */
+
export async function saveAsset({
userId,
assetId,
@@ -67,22 +622,7 @@ export async function saveAsset({
asset: Buffer;
metadata: z.infer<typeof zAssetMetadataSchema>;
}) {
- if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) {
- throw new Error("Unsupported asset type");
- }
- const assetDir = getAssetDir(userId, assetId);
- await fs.promises.mkdir(assetDir, { recursive: true });
-
- await Promise.all([
- fs.promises.writeFile(
- path.join(assetDir, "asset.bin"),
- Uint8Array.from(asset),
- ),
- fs.promises.writeFile(
- path.join(assetDir, "metadata.json"),
- JSON.stringify(metadata),
- ),
- ]);
+ return defaultAssetStore.saveAsset({ userId, assetId, asset, metadata });
}
export async function saveAssetFromFile({
@@ -96,22 +636,12 @@ export async function saveAssetFromFile({
assetPath: string;
metadata: z.infer<typeof zAssetMetadataSchema>;
}) {
- if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) {
- throw new Error("Unsupported asset type");
- }
- const assetDir = getAssetDir(userId, assetId);
- await fs.promises.mkdir(assetDir, { recursive: true });
-
- await Promise.all([
- // We'll have to copy first then delete the original file as inside the docker container
- // we can't move file between mounts.
- fs.promises.copyFile(assetPath, path.join(assetDir, "asset.bin")),
- fs.promises.writeFile(
- path.join(assetDir, "metadata.json"),
- JSON.stringify(metadata),
- ),
- ]);
- await fs.promises.rm(assetPath);
+ return defaultAssetStore.saveAssetFromFile({
+ userId,
+ assetId,
+ assetPath,
+ metadata,
+ });
}
export async function readAsset({
@@ -121,20 +651,10 @@ export async function readAsset({
userId: string;
assetId: string;
}) {
- const assetDir = getAssetDir(userId, assetId);
-
- const [asset, metadataStr] = await Promise.all([
- fs.promises.readFile(path.join(assetDir, "asset.bin")),
- fs.promises.readFile(path.join(assetDir, "metadata.json"), {
- encoding: "utf8",
- }),
- ]);
-
- const metadata = zAssetMetadataSchema.parse(JSON.parse(metadataStr));
- return { asset, metadata };
+ return defaultAssetStore.readAsset({ userId, assetId });
}
-export function createAssetReadStream({
+export async function createAssetReadStream({
userId,
assetId,
start,
@@ -145,9 +665,9 @@ export function createAssetReadStream({
start?: number;
end?: number;
}) {
- const assetDir = getAssetDir(userId, assetId);
-
- return fs.createReadStream(path.join(assetDir, "asset.bin"), {
+ return defaultAssetStore.createAssetReadStream({
+ userId,
+ assetId,
start,
end,
});
@@ -160,16 +680,7 @@ export async function readAssetMetadata({
userId: string;
assetId: string;
}) {
- const assetDir = getAssetDir(userId, assetId);
-
- const metadataStr = await fs.promises.readFile(
- path.join(assetDir, "metadata.json"),
- {
- encoding: "utf8",
- },
- );
-
- return zAssetMetadataSchema.parse(JSON.parse(metadataStr));
+ return defaultAssetStore.readAssetMetadata({ userId, assetId });
}
export async function getAssetSize({
@@ -179,9 +690,7 @@ export async function getAssetSize({
userId: string;
assetId: string;
}) {
- const assetDir = getAssetDir(userId, assetId);
- const stat = await fs.promises.stat(path.join(assetDir, "asset.bin"));
- return stat.size;
+ return defaultAssetStore.getAssetSize({ userId, assetId });
}
/**
@@ -205,42 +714,15 @@ export async function deleteAsset({
userId: string;
assetId: string;
}) {
- const assetDir = getAssetDir(userId, assetId);
- await fs.promises.rm(path.join(assetDir), { recursive: true });
+ return defaultAssetStore.deleteAsset({ userId, assetId });
}
export async function deleteUserAssets({ userId }: { userId: string }) {
- const userDir = path.join(ROOT_PATH, userId);
- const dirExists = await fs.promises
- .access(userDir)
- .then(() => true)
- .catch(() => false);
- if (!dirExists) {
- return;
- }
- await fs.promises.rm(userDir, { recursive: true });
+ return defaultAssetStore.deleteUserAssets({ userId });
}
export async function* getAllAssets() {
- const g = new Glob(`/**/**/asset.bin`, {
- maxDepth: 3,
- root: ROOT_PATH,
- cwd: ROOT_PATH,
- absolute: false,
- });
- for await (const file of g) {
- const [userId, assetId] = file.split("/").slice(0, 2);
- const [size, metadata] = await Promise.all([
- getAssetSize({ userId, assetId }),
- readAssetMetadata({ userId, assetId }),
- ]);
- yield {
- userId,
- assetId,
- ...metadata,
- size,
- };
- }
+ yield* defaultAssetStore.getAllAssets();
}
export async function storeScreenshot(