aboutsummaryrefslogtreecommitdiffstats
path: root/packages/shared-server
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2026-02-09 01:22:30 +0000
committerMohamed Bassem <me@mbassem.com>2026-02-09 01:22:30 +0000
commita04d3c35fc9082e529a713605a038d236bb072c7 (patch)
tree3e69735f9762c1739402afe30c26961b7d8e598a /packages/shared-server
parent298a4277da7376ee6e7471f180c0030f35db49e4 (diff)
downloadkarakeep-a04d3c35fc9082e529a713605a038d236bb072c7.tar.zst
refactor: move assetdb to shared-server
Diffstat (limited to 'packages/shared-server')
-rw-r--r--packages/shared-server/assetdb.ts773
-rw-r--r--packages/shared-server/package.json8
2 files changed, 779 insertions, 2 deletions
diff --git a/packages/shared-server/assetdb.ts b/packages/shared-server/assetdb.ts
new file mode 100644
index 00000000..bb6bb75e
--- /dev/null
+++ b/packages/shared-server/assetdb.ts
@@ -0,0 +1,773 @@
+import * as fs from "fs";
+import * as path from "path";
+import { Readable } from "stream";
+import {
+ _Object,
+ DeleteObjectCommand,
+ DeleteObjectsCommand,
+ GetObjectCommand,
+ HeadObjectCommand,
+ ListObjectsV2Command,
+ PutObjectCommand,
+ S3Client,
+} from "@aws-sdk/client-s3";
+import glob from "glob";
+import { z } from "zod";
+
+import serverConfig from "@karakeep/shared/config";
+import logger from "@karakeep/shared/logger";
+import { QuotaApproved } from "@karakeep/shared/storageQuota";
+
+const ROOT_PATH = serverConfig.assetsDir;
+
+export const enum ASSET_TYPES {
+ IMAGE_GIF = "image/gif",
+ IMAGE_JPEG = "image/jpeg",
+ IMAGE_PNG = "image/png",
+ IMAGE_WEBP = "image/webp",
+ APPLICATION_PDF = "application/pdf",
+ APPLICATION_ZIP = "application/zip",
+ TEXT_HTML = "text/html",
+
+ VIDEO_MP4 = "video/mp4",
+ VIDEO_WEBM = "video/webm",
+ VIDEO_MKV = "video/x-matroska",
+}
+
+export const VIDEO_ASSET_TYPES: Set<string> = new Set<string>([
+ ASSET_TYPES.VIDEO_MP4,
+ ASSET_TYPES.VIDEO_WEBM,
+ ASSET_TYPES.VIDEO_MKV,
+]);
+
+export const IMAGE_ASSET_TYPES: Set<string> = new Set<string>([
+ ASSET_TYPES.IMAGE_GIF,
+ ASSET_TYPES.IMAGE_JPEG,
+ ASSET_TYPES.IMAGE_PNG,
+ ASSET_TYPES.IMAGE_WEBP,
+]);
+
+// The assets that we allow the users to upload
+export const SUPPORTED_UPLOAD_ASSET_TYPES: Set<string> = new Set<string>([
+ ...IMAGE_ASSET_TYPES,
+ ...VIDEO_ASSET_TYPES,
+ ASSET_TYPES.TEXT_HTML,
+ ASSET_TYPES.APPLICATION_PDF,
+]);
+
+// The assets that we allow as a bookmark of type asset
+export const SUPPORTED_BOOKMARK_ASSET_TYPES: Set<string> = new Set<string>([
+ ...IMAGE_ASSET_TYPES,
+ ASSET_TYPES.APPLICATION_PDF,
+]);
+
+// The assets that we support saving in the asset db
+export const SUPPORTED_ASSET_TYPES: Set<string> = new Set<string>([
+ ...SUPPORTED_UPLOAD_ASSET_TYPES,
+ ASSET_TYPES.TEXT_HTML,
+ ASSET_TYPES.VIDEO_MP4,
+ ASSET_TYPES.APPLICATION_ZIP,
+]);
+
+export const zAssetMetadataSchema = z.object({
+ contentType: z.string(),
+ fileName: z.string().nullish(),
+});
+
+export type AssetMetadata = z.infer<typeof zAssetMetadataSchema>;
+
+export interface AssetInfo {
+ userId: string;
+ assetId: string;
+ contentType: string;
+ fileName?: string | null;
+ size: number;
+}
+
+export interface AssetStore {
+ saveAsset(params: {
+ userId: string;
+ assetId: string;
+ asset: Buffer;
+ metadata: AssetMetadata;
+ }): Promise<void>;
+
+ saveAssetFromFile(params: {
+ userId: string;
+ assetId: string;
+ assetPath: string;
+ metadata: AssetMetadata;
+ }): Promise<void>;
+
+ readAsset(params: {
+ userId: string;
+ assetId: string;
+ }): Promise<{ asset: Buffer; metadata: AssetMetadata }>;
+
+ createAssetReadStream(params: {
+ userId: string;
+ assetId: string;
+ start?: number;
+ end?: number;
+ }): Promise<NodeJS.ReadableStream>;
+
+ readAssetMetadata(params: {
+ userId: string;
+ assetId: string;
+ }): Promise<AssetMetadata>;
+
+ getAssetSize(params: { userId: string; assetId: string }): Promise<number>;
+
+ deleteAsset(params: { userId: string; assetId: string }): Promise<void>;
+
+ deleteUserAssets(params: { userId: string }): Promise<void>;
+
+ getAllAssets(): AsyncGenerator<AssetInfo>;
+}
+
+export function newAssetId() {
+ return crypto.randomUUID();
+}
+
+class LocalFileSystemAssetStore implements AssetStore {
+ private rootPath: string;
+
+ constructor(rootPath: string) {
+ this.rootPath = rootPath;
+ }
+
+ private getAssetDir(userId: string, assetId: string) {
+ return path.join(this.rootPath, userId, assetId);
+ }
+
+ private async isPathExists(filePath: string) {
+ return fs.promises
+ .access(filePath)
+ .then(() => true)
+ .catch(() => false);
+ }
+
+ async saveAsset({
+ userId,
+ assetId,
+ asset,
+ metadata,
+ }: {
+ userId: string;
+ assetId: string;
+ asset: Buffer;
+ metadata: AssetMetadata;
+ }) {
+ if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) {
+ throw new Error("Unsupported asset type");
+ }
+ const assetDir = this.getAssetDir(userId, assetId);
+ await fs.promises.mkdir(assetDir, { recursive: true });
+
+ await Promise.all([
+ fs.promises.writeFile(
+ path.join(assetDir, "asset.bin"),
+ Uint8Array.from(asset),
+ ),
+ fs.promises.writeFile(
+ path.join(assetDir, "metadata.json"),
+ JSON.stringify(metadata),
+ ),
+ ]);
+ }
+
+ async saveAssetFromFile({
+ userId,
+ assetId,
+ assetPath,
+ metadata,
+ }: {
+ userId: string;
+ assetId: string;
+ assetPath: string;
+ metadata: AssetMetadata;
+ }) {
+ if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) {
+ throw new Error("Unsupported asset type");
+ }
+ const assetDir = this.getAssetDir(userId, assetId);
+ await fs.promises.mkdir(assetDir, { recursive: true });
+
+ await Promise.all([
+ fs.promises.copyFile(assetPath, path.join(assetDir, "asset.bin")),
+ fs.promises.writeFile(
+ path.join(assetDir, "metadata.json"),
+ JSON.stringify(metadata),
+ ),
+ ]);
+ await fs.promises.rm(assetPath);
+ }
+
+ async readAsset({ userId, assetId }: { userId: string; assetId: string }) {
+ const assetDir = this.getAssetDir(userId, assetId);
+
+ const [asset, metadataStr] = await Promise.all([
+ fs.promises.readFile(path.join(assetDir, "asset.bin")),
+ fs.promises.readFile(path.join(assetDir, "metadata.json"), {
+ encoding: "utf8",
+ }),
+ ]);
+
+ const metadata = zAssetMetadataSchema.parse(JSON.parse(metadataStr));
+ return { asset, metadata };
+ }
+
+ async createAssetReadStream({
+ userId,
+ assetId,
+ start,
+ end,
+ }: {
+ userId: string;
+ assetId: string;
+ start?: number;
+ end?: number;
+ }) {
+ const assetDir = this.getAssetDir(userId, assetId);
+ const assetPath = path.join(assetDir, "asset.bin");
+ if (!(await this.isPathExists(assetPath))) {
+ throw new Error(`Asset ${assetId} not found`);
+ }
+
+ return fs.createReadStream(path.join(assetDir, "asset.bin"), {
+ start,
+ end,
+ });
+ }
+
+ async readAssetMetadata({
+ userId,
+ assetId,
+ }: {
+ userId: string;
+ assetId: string;
+ }) {
+ const assetDir = this.getAssetDir(userId, assetId);
+
+ const metadataStr = await fs.promises.readFile(
+ path.join(assetDir, "metadata.json"),
+ {
+ encoding: "utf8",
+ },
+ );
+
+ return zAssetMetadataSchema.parse(JSON.parse(metadataStr));
+ }
+
+ async getAssetSize({ userId, assetId }: { userId: string; assetId: string }) {
+ const assetDir = this.getAssetDir(userId, assetId);
+ const stat = await fs.promises.stat(path.join(assetDir, "asset.bin"));
+ return stat.size;
+ }
+
+ async deleteAsset({ userId, assetId }: { userId: string; assetId: string }) {
+ const assetDir = this.getAssetDir(userId, assetId);
+ if (!(await this.isPathExists(assetDir))) {
+ return;
+ }
+ await fs.promises.rm(assetDir, { recursive: true });
+ }
+
+ async deleteUserAssets({ userId }: { userId: string }) {
+ const userDir = path.join(this.rootPath, userId);
+ const dirExists = await this.isPathExists(userDir);
+ if (!dirExists) {
+ return;
+ }
+ await fs.promises.rm(userDir, { recursive: true });
+ }
+
+ async *getAllAssets() {
+ const files = await new Promise<string[]>((resolve, reject) => {
+ glob(
+ "*/*/asset.bin",
+ {
+ cwd: this.rootPath,
+ nodir: true,
+ },
+ (err, matches) => {
+ if (err) {
+ reject(err);
+ return;
+ }
+ resolve(matches);
+ },
+ );
+ });
+
+ for (const file of files) {
+ const [userId, assetId] = file.split("/").slice(0, 2);
+ const [size, metadata] = await Promise.all([
+ this.getAssetSize({ userId, assetId }),
+ this.readAssetMetadata({ userId, assetId }),
+ ]);
+
+ yield {
+ userId,
+ assetId,
+ ...metadata,
+ size,
+ };
+ }
+ }
+}
+
+class S3AssetStore implements AssetStore {
+ private s3Client: S3Client;
+ private bucketName: string;
+
+ constructor(s3Client: S3Client, bucketName: string) {
+ this.s3Client = s3Client;
+ this.bucketName = bucketName;
+ }
+
+ private getAssetKey(userId: string, assetId: string) {
+ return `${userId}/${assetId}`;
+ }
+
+ private metadataToS3Metadata(
+ metadata: AssetMetadata,
+ ): Record<string, string> {
+ return {
+ ...(metadata.fileName
+ ? { "x-amz-meta-file-name": metadata.fileName }
+ : {}),
+ "x-amz-meta-content-type": metadata.contentType,
+ };
+ }
+
+ private s3MetadataToMetadata(
+ s3Metadata: Record<string, string> | undefined,
+ ): AssetMetadata {
+ if (!s3Metadata) {
+ throw new Error("No metadata found in S3 object");
+ }
+
+ return {
+ contentType: s3Metadata["x-amz-meta-content-type"] || "",
+ fileName: s3Metadata["x-amz-meta-file-name"] ?? null,
+ };
+ }
+
+ async saveAsset({
+ userId,
+ assetId,
+ asset,
+ metadata,
+ }: {
+ userId: string;
+ assetId: string;
+ asset: Buffer;
+ metadata: AssetMetadata;
+ }) {
+ if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) {
+ throw new Error("Unsupported asset type");
+ }
+
+ await this.s3Client.send(
+ new PutObjectCommand({
+ Bucket: this.bucketName,
+ Key: this.getAssetKey(userId, assetId),
+ Body: asset,
+ ContentType: metadata.contentType,
+ Metadata: this.metadataToS3Metadata(metadata),
+ }),
+ );
+ }
+
+ async saveAssetFromFile({
+ userId,
+ assetId,
+ assetPath,
+ metadata,
+ }: {
+ userId: string;
+ assetId: string;
+ assetPath: string;
+ metadata: AssetMetadata;
+ }) {
+ if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) {
+ throw new Error("Unsupported asset type");
+ }
+
+ const asset = fs.createReadStream(assetPath);
+ await this.s3Client.send(
+ new PutObjectCommand({
+ Bucket: this.bucketName,
+ Key: this.getAssetKey(userId, assetId),
+ Body: asset,
+ ContentType: metadata.contentType,
+ Metadata: this.metadataToS3Metadata(metadata),
+ }),
+ );
+ await fs.promises.rm(assetPath);
+ }
+
+ async readAsset({ userId, assetId }: { userId: string; assetId: string }) {
+ const response = await this.s3Client.send(
+ new GetObjectCommand({
+ Bucket: this.bucketName,
+ Key: this.getAssetKey(userId, assetId),
+ }),
+ );
+
+ if (!response.Body) {
+ throw new Error("Asset not found");
+ }
+
+ const assetBuffer = await this.streamToBuffer(response.Body as Readable);
+ const metadata = this.s3MetadataToMetadata(response.Metadata);
+
+ return { asset: assetBuffer, metadata };
+ }
+
+ async createAssetReadStream({
+ userId,
+ assetId,
+ start,
+ end,
+ }: {
+ userId: string;
+ assetId: string;
+ start?: number;
+ end?: number;
+ }) {
+ const range =
+ start !== undefined && end !== undefined
+ ? `bytes=${start}-${end}`
+ : undefined;
+
+ const command = new GetObjectCommand({
+ Bucket: this.bucketName,
+ Key: this.getAssetKey(userId, assetId),
+ Range: range,
+ });
+
+ const response = await this.s3Client.send(command);
+ if (!response.Body) {
+ throw new Error("Asset not found");
+ }
+ return response.Body as NodeJS.ReadableStream;
+ }
+
+ async readAssetMetadata({
+ userId,
+ assetId,
+ }: {
+ userId: string;
+ assetId: string;
+ }) {
+ const response = await this.s3Client.send(
+ new HeadObjectCommand({
+ Bucket: this.bucketName,
+ Key: this.getAssetKey(userId, assetId),
+ }),
+ );
+
+ return this.s3MetadataToMetadata(response.Metadata);
+ }
+
+ async getAssetSize({ userId, assetId }: { userId: string; assetId: string }) {
+ const response = await this.s3Client.send(
+ new HeadObjectCommand({
+ Bucket: this.bucketName,
+ Key: this.getAssetKey(userId, assetId),
+ }),
+ );
+
+ return response.ContentLength || 0;
+ }
+
+ async deleteAsset({ userId, assetId }: { userId: string; assetId: string }) {
+ await this.s3Client.send(
+ new DeleteObjectCommand({
+ Bucket: this.bucketName,
+ Key: this.getAssetKey(userId, assetId),
+ }),
+ );
+ }
+
+ async deleteUserAssets({ userId }: { userId: string }) {
+ let continuationToken: string | undefined;
+
+ do {
+ const listResponse = await this.s3Client.send(
+ new ListObjectsV2Command({
+ Bucket: this.bucketName,
+ Prefix: `${userId}/`,
+ ContinuationToken: continuationToken,
+ }),
+ );
+
+ if (listResponse.Contents && listResponse.Contents.length > 0) {
+ await this.s3Client.send(
+ new DeleteObjectsCommand({
+ Bucket: this.bucketName,
+ Delete: {
+ Objects: listResponse.Contents.map((obj) => ({
+ Key: obj.Key!,
+ })),
+ },
+ }),
+ );
+ }
+
+ continuationToken = listResponse.NextContinuationToken;
+ } while (continuationToken);
+ }
+
+ async *getAllAssets() {
+ let continuationToken: string | undefined;
+
+ do {
+ const listResponse = await this.s3Client.send(
+ new ListObjectsV2Command({
+ Bucket: this.bucketName,
+ ContinuationToken: continuationToken,
+ }),
+ );
+
+ if (listResponse.Contents) {
+ for (const obj of listResponse.Contents) {
+ if (!obj.Key) continue;
+
+ const pathParts = obj.Key.split("/");
+ if (pathParts.length === 2) {
+ const userId = pathParts[0];
+ const assetId = pathParts[1];
+
+ try {
+ const headResponse = await this.s3Client.send(
+ new HeadObjectCommand({
+ Bucket: this.bucketName,
+ Key: obj.Key,
+ }),
+ );
+
+ const metadata = this.s3MetadataToMetadata(headResponse.Metadata);
+ const size = headResponse.ContentLength || 0;
+
+ yield {
+ userId,
+ assetId,
+ ...metadata,
+ size,
+ };
+ } catch (error) {
+ logger.warn(`Failed to read asset ${userId}/${assetId}:`, error);
+ }
+ }
+ }
+ }
+
+ continuationToken = listResponse.NextContinuationToken;
+ } while (continuationToken);
+ }
+
+ private async streamToBuffer(stream: Readable): Promise<Buffer> {
+ const chunks: Buffer[] = [];
+ for await (const chunk of stream) {
+ chunks.push(chunk);
+ }
+ return Buffer.concat(chunks);
+ }
+}
+
+function createDefaultAssetStore(): AssetStore {
+ const config = serverConfig.assetStore;
+
+ if (config.type === "s3") {
+ if (!config.s3.bucket) {
+ throw new Error(
+ "ASSET_STORE_S3_BUCKET is required when using S3 asset store",
+ );
+ }
+ if (!config.s3.accessKeyId || !config.s3.secretAccessKey) {
+ throw new Error(
+ "ASSET_STORE_S3_ACCESS_KEY_ID and ASSET_STORE_S3_SECRET_ACCESS_KEY are required when using S3 asset store",
+ );
+ }
+
+ const s3Client = new S3Client({
+ region: config.s3.region,
+ endpoint: config.s3.endpoint,
+ forcePathStyle: config.s3.forcePathStyle,
+ credentials: {
+ accessKeyId: config.s3.accessKeyId,
+ secretAccessKey: config.s3.secretAccessKey,
+ },
+ });
+
+ return new S3AssetStore(s3Client, config.s3.bucket);
+ }
+
+ return new LocalFileSystemAssetStore(ROOT_PATH);
+}
+
+const defaultAssetStore = createDefaultAssetStore();
+
+export { LocalFileSystemAssetStore, S3AssetStore };
+
+/**
+ * Example usage of S3AssetStore:
+ *
+ * import { S3Client } from "@aws-sdk/client-s3";
+ * import { S3AssetStore } from "@karakeep/shared-server/assetdb";
+ *
+ * const s3Client = new S3Client({
+ * region: "us-east-1",
+ * credentials: {
+ * accessKeyId: "your-access-key",
+ * secretAccessKey: "your-secret-key"
+ * }
+ * });
+ *
+ * const s3AssetStore = new S3AssetStore(s3Client, "your-bucket-name");
+ *
+ * // Use s3AssetStore instead of the default file system store
+ * await s3AssetStore.saveAsset({
+ * userId: "user123",
+ * assetId: "asset456",
+ * asset: buffer,
+ * metadata: { contentType: "image/jpeg", fileName: "photo.jpg" }
+ * });
+ */
+
+export async function saveAsset({
+ userId,
+ assetId,
+ asset,
+ metadata,
+ quotaApproved,
+}: {
+ userId: string;
+ assetId: string;
+ asset: Buffer;
+ metadata: z.infer<typeof zAssetMetadataSchema>;
+ quotaApproved: QuotaApproved;
+}) {
+ // Verify the quota approval is for the correct user and size
+ if (quotaApproved.userId !== userId) {
+ throw new Error("Quota approval is for a different user");
+ }
+ if (quotaApproved.approvedSize < asset.byteLength) {
+ throw new Error("Asset size exceeds approved quota");
+ }
+
+ return defaultAssetStore.saveAsset({ userId, assetId, asset, metadata });
+}
+
+export async function saveAssetFromFile({
+ userId,
+ assetId,
+ assetPath,
+ metadata,
+ quotaApproved,
+}: {
+ userId: string;
+ assetId: string;
+ assetPath: string;
+ metadata: z.infer<typeof zAssetMetadataSchema>;
+ quotaApproved: QuotaApproved;
+}) {
+ // Verify the quota approval is for the correct user
+ if (quotaApproved.userId !== userId) {
+ throw new Error("Quota approval is for a different user");
+ }
+
+ // For file-based saves, we'll verify the file size matches the approved size
+ // when the underlying store implementation reads the file
+
+ return defaultAssetStore.saveAssetFromFile({
+ userId,
+ assetId,
+ assetPath,
+ metadata,
+ });
+}
+
+export async function readAsset({
+ userId,
+ assetId,
+}: {
+ userId: string;
+ assetId: string;
+}) {
+ return defaultAssetStore.readAsset({ userId, assetId });
+}
+
+export async function createAssetReadStream({
+ userId,
+ assetId,
+ start,
+ end,
+}: {
+ userId: string;
+ assetId: string;
+ start?: number;
+ end?: number;
+}) {
+ return defaultAssetStore.createAssetReadStream({
+ userId,
+ assetId,
+ start,
+ end,
+ });
+}
+
+export async function readAssetMetadata({
+ userId,
+ assetId,
+}: {
+ userId: string;
+ assetId: string;
+}) {
+ return defaultAssetStore.readAssetMetadata({ userId, assetId });
+}
+
+export async function getAssetSize({
+ userId,
+ assetId,
+}: {
+ userId: string;
+ assetId: string;
+}) {
+ return defaultAssetStore.getAssetSize({ userId, assetId });
+}
+
+/**
+ * Deletes the passed in asset if it exists and ignores any errors
+ * @param userId the id of the user the asset belongs to
+ * @param assetId the id of the asset to delete
+ */
+export async function silentDeleteAsset(
+ userId: string,
+ assetId: string | undefined,
+) {
+ if (assetId) {
+ await deleteAsset({ userId, assetId }).catch(() => ({}));
+ }
+}
+
+export async function deleteAsset({
+ userId,
+ assetId,
+}: {
+ userId: string;
+ assetId: string;
+}) {
+ return defaultAssetStore.deleteAsset({ userId, assetId });
+}
+
+export async function deleteUserAssets({ userId }: { userId: string }) {
+ return defaultAssetStore.deleteUserAssets({ userId });
+}
+
+export async function* getAllAssets() {
+ yield* defaultAssetStore.getAllAssets();
+}
diff --git a/packages/shared-server/package.json b/packages/shared-server/package.json
index 357248b4..0afc2bf2 100644
--- a/packages/shared-server/package.json
+++ b/packages/shared-server/package.json
@@ -5,6 +5,7 @@
"private": true,
"type": "module",
"dependencies": {
+ "@aws-sdk/client-s3": "^3.842.0",
"@karakeep/db": "workspace:^0.1.0",
"@karakeep/plugins": "workspace:^0.1.0",
"@karakeep/shared": "workspace:^0.1.0",
@@ -13,7 +14,9 @@
"@opentelemetry/resources": "^2.2.0",
"@opentelemetry/sdk-trace-base": "^2.2.0",
"@opentelemetry/sdk-trace-node": "^2.2.0",
- "@opentelemetry/semantic-conventions": "^1.38.0"
+ "@opentelemetry/semantic-conventions": "^1.38.0",
+ "glob": "^11.0.0",
+ "zod": "^3.24.2"
},
"devDependencies": {
"@karakeep/prettier-config": "workspace:^0.1.0",
@@ -29,7 +32,8 @@
},
"main": "index.ts",
"exports": {
- ".": "./index.ts"
+ ".": "./index.ts",
+ "./assetdb": "./assetdb.ts"
},
"prettier": "@karakeep/prettier-config"
}